Get rid of unnecessary work in mlua-luau-scheduler after latest mlua upgrade + remove tracing in net

This commit is contained in:
Filip Tibell 2025-04-28 14:20:47 +02:00
parent 1d5535dac0
commit 68d1c2bb39
No known key found for this signature in database
10 changed files with 168 additions and 245 deletions

53
Cargo.lock generated
View file

@ -238,7 +238,7 @@ version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18"
dependencies = [ dependencies = [
"event-listener 5.4.0", "event-listener",
"event-listener-strategy", "event-listener-strategy",
"pin-project-lite", "pin-project-lite",
] ]
@ -267,7 +267,7 @@ dependencies = [
"async-task", "async-task",
"blocking", "blocking",
"cfg-if 1.0.0", "cfg-if 1.0.0",
"event-listener 5.4.0", "event-listener",
"futures-lite", "futures-lite",
"rustix 0.38.44", "rustix 0.38.44",
"tracing", "tracing",
@ -736,12 +736,6 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
[[package]]
name = "convert_case"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]] [[package]]
name = "cookie" name = "cookie"
version = "0.15.2" version = "0.15.2"
@ -849,19 +843,6 @@ dependencies = [
"syn 2.0.100", "syn 2.0.100",
] ]
[[package]]
name = "derive_more"
version = "0.99.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6edb4b64a43d977b8e99788fe3a04d483834fba1215a7e02caa415b626497f7f"
dependencies = [
"convert_case",
"proc-macro2",
"quote",
"rustc_version 0.4.1",
"syn 2.0.100",
]
[[package]] [[package]]
name = "dialoguer" name = "dialoguer"
version = "0.11.0" version = "0.11.0"
@ -1021,17 +1002,6 @@ version = "3.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5d9305ccc6942a704f4335694ecd3de2ea531b114ac2d51f5f843750787a92f" checksum = "a5d9305ccc6942a704f4335694ecd3de2ea531b114ac2d51f5f843750787a92f"
[[package]]
name = "event-listener"
version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]] [[package]]
name = "event-listener" name = "event-listener"
version = "5.4.0" version = "5.4.0"
@ -1049,7 +1019,7 @@ version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93"
dependencies = [ dependencies = [
"event-listener 5.4.0", "event-listener",
"pin-project-lite", "pin-project-lite",
] ]
@ -1913,7 +1883,6 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
"rustls 0.23.26", "rustls 0.23.26",
"rustls-pki-types", "rustls-pki-types",
"tracing",
"url", "url",
"urlencoding", "urlencoding",
"webpki", "webpki",
@ -2149,11 +2118,10 @@ dependencies = [
"async-io", "async-io",
"blocking", "blocking",
"concurrent-queue", "concurrent-queue",
"derive_more", "event-listener",
"event-listener 4.0.3",
"futures-lite", "futures-lite",
"mlua", "mlua",
"rustc-hash 1.1.0", "rustc-hash 2.1.1",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"tracing-tracy", "tracing-tracy",
@ -2889,15 +2857,6 @@ dependencies = [
"semver 0.9.0", "semver 0.9.0",
] ]
[[package]]
name = "rustc_version"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
dependencies = [
"semver 1.0.26",
]
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.38.44" version = "0.38.44"
@ -3287,7 +3246,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d022496b16281348b52d0e30ae99e01a73d737b2f45d38fed4edf79f9325a1d5" checksum = "d022496b16281348b52d0e30ae99e01a73d737b2f45d38fed4edf79f9325a1d5"
dependencies = [ dependencies = [
"discard", "discard",
"rustc_version 0.2.3", "rustc_version",
"stdweb-derive", "stdweb-derive",
"stdweb-internal-macros", "stdweb-internal-macros",
"stdweb-internal-runtime", "stdweb-internal-runtime",

View file

@ -32,7 +32,6 @@ hyper = { version = "1.6", features = ["http1", "client", "server"] }
pin-project-lite = "0.2" pin-project-lite = "0.2"
rustls = "0.23" rustls = "0.23"
rustls-pki-types = "1.11" rustls-pki-types = "1.11"
tracing = "0.1"
url = "2.5" url = "2.5"
urlencoding = "2.1" urlencoding = "2.1"
webpki = "0.22" webpki = "0.22"

View file

@ -78,66 +78,41 @@ impl HyperService<HyperRequest<Incoming>> for Service {
} }
} }
#[tracing::instrument(skip_all)]
async fn handle_request( async fn handle_request(
lua: Lua, lua: Lua,
handler: LuaFunction, handler: LuaFunction,
request: HyperRequest<Incoming>, request: HyperRequest<Incoming>,
address: SocketAddr, address: SocketAddr,
) -> LuaResult<HyperResponse<Full<Bytes>>> { ) -> LuaResult<HyperResponse<Full<Bytes>>> {
let request = tracing::debug_span!("request") let request = Request::from_incoming(request, true)
.in_scope(|| async { .await?
let request = Request::from_incoming(request, true) .with_address(address);
.await?
.with_address(address);
Ok::<_, LuaError>(request)
})
.await?;
let thread_res = tracing::debug_span!("run") let thread_id = lua.push_thread_back(handler, request)?;
.in_scope(|| async { lua.track_thread(thread_id);
let thread_id = lua.push_thread_back(handler, request)?; lua.wait_for_thread(thread_id).await;
lua.track_thread(thread_id);
lua.wait_for_thread(thread_id).await;
let thread_res = lua let thread_res = lua
.get_thread_result(thread_id) .get_thread_result(thread_id)
.expect("Missing handler thread result")?; .expect("Missing handler thread result")?;
Ok::<_, LuaError>(thread_res)
})
.await?;
let response = tracing::debug_span!("response").in_scope(|| { let config = ResponseConfig::from_lua_multi(thread_res, &lua)?;
let config = ResponseConfig::from_lua_multi(thread_res, &lua)?; let response = Response::try_from(config)?;
let response = Response::try_from(config)?; Ok(response.into_full())
Ok::<_, LuaError>(response.into_full())
})?;
Ok(response)
} }
#[tracing::instrument(skip_all)]
async fn handle_websocket( async fn handle_websocket(
lua: Lua, lua: Lua,
handler: LuaFunction, handler: LuaFunction,
request: HyperRequest<Incoming>, request: HyperRequest<Incoming>,
) -> LuaResult<()> { ) -> LuaResult<()> {
let upgraded = tracing::debug_span!("upgrade") let upgraded = hyper::upgrade::on(request).await.into_lua_err()?;
.in_scope(|| async { hyper::upgrade::on(request).await.into_lua_err() })
.await?;
let stream = tracing::debug_span!("stream") let stream =
.in_scope(|| async { WebSocketStream::from_raw_socket(HyperIo::from(upgraded), Role::Server, None).await;
WebSocketStream::from_raw_socket(HyperIo::from(upgraded), Role::Server, None).await
})
.await;
tracing::debug_span!("run") let websocket = Websocket::from(stream);
.in_scope(|| async { lua.push_thread_back(handler, websocket)?;
let websocket = Websocket::from(stream);
lua.push_thread_back(handler, websocket)
})
.await?;
Ok(()) Ok(())
} }

View file

@ -16,13 +16,12 @@ path = "src/lib.rs"
workspace = true workspace = true
[dependencies] [dependencies]
async-executor = "1.8" async-executor = "1.13"
blocking = "1.5" blocking = "1.6"
concurrent-queue = "2.4" concurrent-queue = "2.5"
derive_more = "0.99" event-listener = "5.4"
event-listener = "4.0" futures-lite = "2.6"
futures-lite = "2.2" rustc-hash = "2.1"
rustc-hash = "1.1"
tracing = "0.1" tracing = "0.1"
mlua = { version = "0.10.3", features = [ mlua = { version = "0.10.3", features = [
@ -34,7 +33,7 @@ mlua = { version = "0.10.3", features = [
[dev-dependencies] [dev-dependencies]
async-fs = "2.1" async-fs = "2.1"
async-io = "2.3" async-io = "2.4"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-tracy = "0.11" tracing-tracy = "0.11"

View file

@ -8,7 +8,7 @@ use crate::{
result_map::ThreadResultMap, result_map::ThreadResultMap,
thread_id::ThreadId, thread_id::ThreadId,
traits::LuaSchedulerExt, traits::LuaSchedulerExt,
util::{is_poll_pending, LuaThreadOrFunction, ThreadResult}, util::{is_poll_pending, LuaThreadOrFunction},
}; };
const ERR_METADATA_NOT_ATTACHED: &str = "\ const ERR_METADATA_NOT_ATTACHED: &str = "\
@ -123,8 +123,7 @@ impl Functions {
if thread.status() != LuaThreadStatus::Resumable { if thread.status() != LuaThreadStatus::Resumable {
let id = ThreadId::from(&thread); let id = ThreadId::from(&thread);
if resume_map.is_tracked(id) { if resume_map.is_tracked(id) {
let res = ThreadResult::new(Ok(v.clone()), lua); resume_map.insert(id, Ok(v.clone()));
resume_map.insert(id, res);
} }
} }
(true, v).into_lua_multi(lua) (true, v).into_lua_multi(lua)
@ -134,8 +133,7 @@ impl Functions {
// Not pending, store the error // Not pending, store the error
let id = ThreadId::from(&thread); let id = ThreadId::from(&thread);
if resume_map.is_tracked(id) { if resume_map.is_tracked(id) {
let res = ThreadResult::new(Err(e.clone()), lua); resume_map.insert(id, Err(e.clone()));
resume_map.insert(id, res);
} }
(false, e.to_string()).into_lua_multi(lua) (false, e.to_string()).into_lua_multi(lua)
} }
@ -177,8 +175,7 @@ impl Functions {
if thread.status() != LuaThreadStatus::Resumable { if thread.status() != LuaThreadStatus::Resumable {
let id = ThreadId::from(&thread); let id = ThreadId::from(&thread);
if spawn_map.is_tracked(id) { if spawn_map.is_tracked(id) {
let res = ThreadResult::new(Ok(v), lua); spawn_map.insert(id, Ok(v));
spawn_map.insert(id, res);
} }
} }
} }
@ -188,8 +185,7 @@ impl Functions {
// Not pending, store the error // Not pending, store the error
let id = ThreadId::from(&thread); let id = ThreadId::from(&thread);
if spawn_map.is_tracked(id) { if spawn_map.is_tracked(id) {
let res = ThreadResult::new(Err(e), lua); spawn_map.insert(id, Err(e));
spawn_map.insert(id, res);
} }
} }
} }

View file

@ -1,12 +1,15 @@
use std::{pin::Pin, rc::Rc}; use std::{
ops::{Deref, DerefMut},
pin::Pin,
rc::Rc,
};
use concurrent_queue::ConcurrentQueue; use concurrent_queue::ConcurrentQueue;
use derive_more::{Deref, DerefMut};
use event_listener::Event; use event_listener::Event;
use futures_lite::{Future, FutureExt}; use futures_lite::{Future, FutureExt};
use mlua::prelude::*; use mlua::prelude::*;
use crate::{traits::IntoLuaThread, util::ThreadWithArgs, ThreadId}; use crate::{traits::IntoLuaThread, ThreadId};
/** /**
Queue for storing [`LuaThread`]s with associated arguments. Queue for storing [`LuaThread`]s with associated arguments.
@ -16,15 +19,13 @@ use crate::{traits::IntoLuaThread, util::ThreadWithArgs, ThreadId};
*/ */
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct ThreadQueue { pub(crate) struct ThreadQueue {
queue: Rc<ConcurrentQueue<ThreadWithArgs>>, inner: Rc<ThreadQueueInner>,
event: Rc<Event>,
} }
impl ThreadQueue { impl ThreadQueue {
pub fn new() -> Self { pub fn new() -> Self {
let queue = Rc::new(ConcurrentQueue::unbounded()); let inner = Rc::new(ThreadQueueInner::new());
let event = Rc::new(Event::new()); Self { inner }
Self { queue, event }
} }
pub fn push_item( pub fn push_item(
@ -38,32 +39,25 @@ impl ThreadQueue {
tracing::trace!("pushing item to queue with {} args", args.len()); tracing::trace!("pushing item to queue with {} args", args.len());
let id = ThreadId::from(&thread); let id = ThreadId::from(&thread);
let stored = ThreadWithArgs::new(lua, thread, args)?;
self.queue.push(stored).into_lua_err()?; let _ = self.inner.queue.push((thread, args));
self.event.notify(usize::MAX); self.inner.event.notify(usize::MAX);
Ok(id) Ok(id)
} }
#[inline] #[inline]
pub fn drain_items<'outer, 'lua>( pub fn drain_items(&self) -> impl Iterator<Item = (LuaThread, LuaMultiValue)> + '_ {
&'outer self, self.inner.queue.try_iter()
lua: &'lua Lua,
) -> impl Iterator<Item = (LuaThread, LuaMultiValue)> + 'outer
where
'lua: 'outer,
{
self.queue.try_iter().map(|stored| stored.into_inner(lua))
} }
#[inline] #[inline]
pub async fn wait_for_item(&self) { pub async fn wait_for_item(&self) {
if self.queue.is_empty() { if self.inner.queue.is_empty() {
let listener = self.event.listen(); let listener = self.inner.event.listen();
// NOTE: Need to check again, we could have gotten // NOTE: Need to check again, we could have gotten
// new queued items while creating our listener // new queued items while creating our listener
if self.queue.is_empty() { if self.inner.queue.is_empty() {
listener.await; listener.await;
} }
} }
@ -71,14 +65,14 @@ impl ThreadQueue {
#[inline] #[inline]
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.queue.is_empty() self.inner.queue.is_empty()
} }
} }
/** /**
Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. Alias for [`ThreadQueue`], providing a newtype to store in Lua app data.
*/ */
#[derive(Debug, Clone, Deref, DerefMut)] #[derive(Debug, Clone)]
pub(crate) struct SpawnedThreadQueue(ThreadQueue); pub(crate) struct SpawnedThreadQueue(ThreadQueue);
impl SpawnedThreadQueue { impl SpawnedThreadQueue {
@ -87,10 +81,23 @@ impl SpawnedThreadQueue {
} }
} }
impl Deref for SpawnedThreadQueue {
type Target = ThreadQueue;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for SpawnedThreadQueue {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/** /**
Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. Alias for [`ThreadQueue`], providing a newtype to store in Lua app data.
*/ */
#[derive(Debug, Clone, Deref, DerefMut)] #[derive(Debug, Clone)]
pub(crate) struct DeferredThreadQueue(ThreadQueue); pub(crate) struct DeferredThreadQueue(ThreadQueue);
impl DeferredThreadQueue { impl DeferredThreadQueue {
@ -99,6 +106,19 @@ impl DeferredThreadQueue {
} }
} }
impl Deref for DeferredThreadQueue {
type Target = ThreadQueue;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for DeferredThreadQueue {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
pub type LocalBoxFuture<'fut> = Pin<Box<dyn Future<Output = ()> + 'fut>>; pub type LocalBoxFuture<'fut> = Pin<Box<dyn Future<Output = ()> + 'fut>>;
/** /**
@ -109,31 +129,60 @@ pub type LocalBoxFuture<'fut> = Pin<Box<dyn Future<Output = ()> + 'fut>>;
*/ */
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct FuturesQueue<'fut> { pub(crate) struct FuturesQueue<'fut> {
queue: Rc<ConcurrentQueue<LocalBoxFuture<'fut>>>, inner: Rc<FuturesQueueInner<'fut>>,
event: Rc<Event>,
} }
impl<'fut> FuturesQueue<'fut> { impl<'fut> FuturesQueue<'fut> {
pub fn new() -> Self { pub fn new() -> Self {
let queue = Rc::new(ConcurrentQueue::unbounded()); let inner = Rc::new(FuturesQueueInner::new());
let event = Rc::new(Event::new()); Self { inner }
Self { queue, event }
} }
pub fn push_item(&self, fut: impl Future<Output = ()> + 'fut) { pub fn push_item(&self, fut: impl Future<Output = ()> + 'fut) {
let _ = self.queue.push(fut.boxed_local()); let _ = self.inner.queue.push(fut.boxed_local());
self.event.notify(usize::MAX); self.inner.event.notify(usize::MAX);
} }
pub fn drain_items<'outer>( pub fn drain_items<'outer>(
&'outer self, &'outer self,
) -> impl Iterator<Item = LocalBoxFuture<'fut>> + 'outer { ) -> impl Iterator<Item = LocalBoxFuture<'fut>> + 'outer {
self.queue.try_iter() self.inner.queue.try_iter()
} }
pub async fn wait_for_item(&self) { pub async fn wait_for_item(&self) {
if self.queue.is_empty() { if self.inner.queue.is_empty() {
self.event.listen().await; self.inner.event.listen().await;
} }
} }
} }
// Inner structs without ref counting so that outer structs
// have only a single ref counter for extremely cheap clones
#[derive(Debug)]
struct ThreadQueueInner {
queue: ConcurrentQueue<(LuaThread, LuaMultiValue)>,
event: Event,
}
impl ThreadQueueInner {
fn new() -> Self {
let queue = ConcurrentQueue::unbounded();
let event = Event::new();
Self { queue, event }
}
}
#[derive(Debug)]
struct FuturesQueueInner<'fut> {
queue: ConcurrentQueue<LocalBoxFuture<'fut>>,
event: Event,
}
impl FuturesQueueInner<'_> {
pub fn new() -> Self {
let queue = ConcurrentQueue::unbounded();
let event = Event::new();
Self { queue, event }
}
}

View file

@ -5,60 +5,77 @@ use std::{cell::RefCell, rc::Rc};
use event_listener::Event; use event_listener::Event;
// NOTE: This is the hash algorithm that mlua also uses, so we // NOTE: This is the hash algorithm that mlua also uses, so we
// are not adding any additional dependencies / bloat by using it. // are not adding any additional dependencies / bloat by using it.
use mlua::prelude::*;
use rustc_hash::{FxHashMap, FxHashSet}; use rustc_hash::{FxHashMap, FxHashSet};
use crate::{thread_id::ThreadId, util::ThreadResult}; use crate::thread_id::ThreadId;
struct ThreadResultMapInner {
tracked: FxHashSet<ThreadId>,
results: FxHashMap<ThreadId, LuaResult<LuaMultiValue>>,
events: FxHashMap<ThreadId, Rc<Event>>,
}
impl ThreadResultMapInner {
fn new() -> Self {
Self {
tracked: FxHashSet::default(),
results: FxHashMap::default(),
events: FxHashMap::default(),
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct ThreadResultMap { pub(crate) struct ThreadResultMap {
tracked: Rc<RefCell<FxHashSet<ThreadId>>>, inner: Rc<RefCell<ThreadResultMapInner>>,
results: Rc<RefCell<FxHashMap<ThreadId, ThreadResult>>>,
events: Rc<RefCell<FxHashMap<ThreadId, Rc<Event>>>>,
} }
impl ThreadResultMap { impl ThreadResultMap {
pub fn new() -> Self { pub fn new() -> Self {
Self { let inner = Rc::new(RefCell::new(ThreadResultMapInner::new()));
tracked: Rc::new(RefCell::new(FxHashSet::default())), Self { inner }
results: Rc::new(RefCell::new(FxHashMap::default())),
events: Rc::new(RefCell::new(FxHashMap::default())),
}
} }
#[inline(always)] #[inline(always)]
pub fn track(&self, id: ThreadId) { pub fn track(&self, id: ThreadId) {
self.tracked.borrow_mut().insert(id); self.inner.borrow_mut().tracked.insert(id);
} }
#[inline(always)] #[inline(always)]
pub fn is_tracked(&self, id: ThreadId) -> bool { pub fn is_tracked(&self, id: ThreadId) -> bool {
self.tracked.borrow().contains(&id) self.inner.borrow().tracked.contains(&id)
} }
pub fn insert(&self, id: ThreadId, result: ThreadResult) { pub fn insert(&self, id: ThreadId, result: LuaResult<LuaMultiValue>) {
debug_assert!(self.is_tracked(id), "Thread must be tracked"); debug_assert!(self.is_tracked(id), "Thread must be tracked");
self.results.borrow_mut().insert(id, result); let mut inner = self.inner.borrow_mut();
if let Some(event) = self.events.borrow_mut().remove(&id) { inner.results.insert(id, result);
if let Some(event) = inner.events.remove(&id) {
event.notify(usize::MAX); event.notify(usize::MAX);
} }
} }
pub async fn listen(&self, id: ThreadId) { pub async fn listen(&self, id: ThreadId) {
debug_assert!(self.is_tracked(id), "Thread must be tracked"); debug_assert!(self.is_tracked(id), "Thread must be tracked");
if !self.results.borrow().contains_key(&id) { if !self.inner.borrow().results.contains_key(&id) {
let listener = { let listener = {
let mut events = self.events.borrow_mut(); let mut inner = self.inner.borrow_mut();
let event = events.entry(id).or_insert_with(|| Rc::new(Event::new())); let event = inner
.events
.entry(id)
.or_insert_with(|| Rc::new(Event::new()));
event.listen() event.listen()
}; };
listener.await; listener.await;
} }
} }
pub fn remove(&self, id: ThreadId) -> Option<ThreadResult> { pub fn remove(&self, id: ThreadId) -> Option<LuaResult<LuaMultiValue>> {
let res = self.results.borrow_mut().remove(&id)?; let mut inner = self.inner.borrow_mut();
self.tracked.borrow_mut().remove(&id); let res = inner.results.remove(&id)?;
self.events.borrow_mut().remove(&id); inner.tracked.remove(&id);
inner.events.remove(&id);
Some(res) Some(res)
} }
} }

View file

@ -21,7 +21,7 @@ use crate::{
status::Status, status::Status,
thread_id::ThreadId, thread_id::ThreadId,
traits::IntoLuaThread, traits::IntoLuaThread,
util::{run_until_yield, ThreadResult}, util::run_until_yield,
}; };
const ERR_METADATA_ALREADY_ATTACHED: &str = "\ const ERR_METADATA_ALREADY_ATTACHED: &str = "\
@ -248,7 +248,7 @@ impl Scheduler {
*/ */
#[must_use] #[must_use]
pub fn get_thread_result(&self, id: ThreadId) -> Option<LuaResult<LuaMultiValue>> { pub fn get_thread_result(&self, id: ThreadId) -> Option<LuaResult<LuaMultiValue>> {
self.result_map.remove(id).map(|r| r.value(&self.lua)) self.result_map.remove(id)
} }
/** /**
@ -286,7 +286,7 @@ impl Scheduler {
*/ */
let local_exec = LocalExecutor::new(); let local_exec = LocalExecutor::new();
let main_exec = Arc::new(Executor::new()); let main_exec = Arc::new(Executor::new());
let fut_queue = Rc::new(FuturesQueue::new()); let fut_queue = FuturesQueue::new();
/* /*
Store the main executor and queue in Lua, so that they may be used with LuaSchedulerExt. Store the main executor and queue in Lua, so that they may be used with LuaSchedulerExt.
@ -299,12 +299,12 @@ impl Scheduler {
"{ERR_METADATA_ALREADY_ATTACHED}" "{ERR_METADATA_ALREADY_ATTACHED}"
); );
assert!( assert!(
self.lua.app_data_ref::<WeakRc<FuturesQueue>>().is_none(), self.lua.app_data_ref::<FuturesQueue>().is_none(),
"{ERR_METADATA_ALREADY_ATTACHED}" "{ERR_METADATA_ALREADY_ATTACHED}"
); );
self.lua.set_app_data(Arc::downgrade(&main_exec)); self.lua.set_app_data(Arc::downgrade(&main_exec));
self.lua.set_app_data(Rc::downgrade(&fut_queue.clone())); self.lua.set_app_data(fut_queue.clone());
/* /*
Manually tick the Lua executor, while running under the main executor. Manually tick the Lua executor, while running under the main executor.
@ -342,8 +342,7 @@ impl Scheduler {
self.error_callback.call(e); self.error_callback.call(e);
} }
if thread.status() != LuaThreadStatus::Resumable { if thread.status() != LuaThreadStatus::Resumable {
let thread_res = ThreadResult::new(res, &self.lua); result_map_inner.unwrap().insert(id, res);
result_map_inner.unwrap().insert(id, thread_res);
} }
} }
} else { } else {
@ -398,14 +397,14 @@ impl Scheduler {
let mut num_futures = 0; let mut num_futures = 0;
{ {
let _span = trace_span!("Scheduler::drain_spawned").entered(); let _span = trace_span!("Scheduler::drain_spawned").entered();
for (thread, args) in self.queue_spawn.drain_items(&self.lua) { for (thread, args) in self.queue_spawn.drain_items() {
process_thread(thread, args); process_thread(thread, args);
num_spawned += 1; num_spawned += 1;
} }
} }
{ {
let _span = trace_span!("Scheduler::drain_deferred").entered(); let _span = trace_span!("Scheduler::drain_deferred").entered();
for (thread, args) in self.queue_defer.drain_items(&self.lua) { for (thread, args) in self.queue_defer.drain_items() {
process_thread(thread, args); process_thread(thread, args);
num_deferred += 1; num_deferred += 1;
} }

View file

@ -323,7 +323,7 @@ impl LuaSchedulerExt for Lua {
let map = self let map = self
.app_data_ref::<ThreadResultMap>() .app_data_ref::<ThreadResultMap>()
.expect("lua threads results can only be retrieved from within an active scheduler"); .expect("lua threads results can only be retrieved from within an active scheduler");
map.remove(id).map(|r| r.value(self)) map.remove(id)
} }
fn wait_for_thread(&self, id: ThreadId) -> impl Future<Output = ()> { fn wait_for_thread(&self, id: ThreadId) -> impl Future<Output = ()> {
@ -354,10 +354,8 @@ impl LuaSpawnExt for Lua {
F: Future<Output = ()> + 'static, F: Future<Output = ()> + 'static,
{ {
let queue = self let queue = self
.app_data_ref::<WeakRc<FuturesQueue>>() .app_data_ref::<FuturesQueue>()
.expect("tasks can only be spawned within an active scheduler") .expect("tasks can only be spawned within an active scheduler");
.upgrade()
.expect("executor was dropped");
trace!("spawning local task on executor"); trace!("spawning local task on executor");
queue.push_item(fut); queue.push_item(fut);
} }

View file

@ -40,74 +40,6 @@ pub(crate) fn is_poll_pending(value: &LuaValue) -> bool {
.is_some_and(|l| l == Lua::poll_pending()) .is_some_and(|l| l == Lua::poll_pending())
} }
/**
Representation of a [`LuaResult`] with an associated [`LuaMultiValue`] currently stored in the Lua registry.
*/
#[derive(Debug)]
pub(crate) struct ThreadResult {
inner: LuaResult<LuaRegistryKey>,
}
impl ThreadResult {
pub fn new(result: LuaResult<LuaMultiValue>, lua: &Lua) -> Self {
Self {
inner: match result {
Ok(v) => Ok({
let vec = v.into_vec();
lua.create_registry_value(vec).expect("out of memory")
}),
Err(e) => Err(e),
},
}
}
pub fn value(self, lua: &Lua) -> LuaResult<LuaMultiValue> {
match self.inner {
Ok(key) => {
let vec = lua.registry_value(&key).unwrap();
lua.remove_registry_value(key).unwrap();
Ok(LuaMultiValue::from_vec(vec))
}
Err(e) => Err(e.clone()),
}
}
}
/**
Representation of a [`LuaThread`] with its associated arguments currently stored in the Lua registry.
*/
#[derive(Debug)]
pub(crate) struct ThreadWithArgs {
key_thread: LuaRegistryKey,
key_args: LuaRegistryKey,
}
impl ThreadWithArgs {
pub fn new(lua: &Lua, thread: LuaThread, args: LuaMultiValue) -> LuaResult<Self> {
let argsv = args.into_vec();
let key_thread = lua.create_registry_value(thread)?;
let key_args = lua.create_registry_value(argsv)?;
Ok(Self {
key_thread,
key_args,
})
}
pub fn into_inner(self, lua: &Lua) -> (LuaThread, LuaMultiValue) {
let thread = lua.registry_value(&self.key_thread).unwrap();
let argsv = lua.registry_value(&self.key_args).unwrap();
let args = LuaMultiValue::from_vec(argsv);
lua.remove_registry_value(self.key_thread).unwrap();
lua.remove_registry_value(self.key_args).unwrap();
(thread, args)
}
}
/** /**
Wrapper struct to accept either a Lua thread or a Lua function as function argument. Wrapper struct to accept either a Lua thread or a Lua function as function argument.