diff --git a/examples/basic_spawn.rs b/examples/basic_spawn.rs index abb378f..deae81e 100644 --- a/examples/basic_spawn.rs +++ b/examples/basic_spawn.rs @@ -26,7 +26,19 @@ pub fn main() -> LuaResult<()> { Err(e) => Err(e), } }); - task.await.into_lua_err() + + // Wait for it to complete + let result = task.await.into_lua_err(); + + // We can also spawn local tasks that do take up resources + // on the Lua thread, but that do not have the Send bound + if result.is_ok() { + lua.spawn_local(async move { + println!("File read successfully!"); + }); + } + + result })?, )?; diff --git a/examples/lots_of_threads.rs b/examples/lots_of_threads.rs index f290efe..684cace 100644 --- a/examples/lots_of_threads.rs +++ b/examples/lots_of_threads.rs @@ -5,7 +5,7 @@ use std::time::Duration; use async_io::{block_on, Timer}; use mlua::prelude::*; -use mlua_luau_runtime::Runtime; +use mlua_luau_runtime::{Functions, Runtime}; const MAIN_SCRIPT: &str = include_str!("./lua/lots_of_threads.luau"); @@ -17,9 +17,9 @@ pub fn main() -> LuaResult<()> { // Set up persistent Lua environment let lua = Lua::new(); let rt = Runtime::new(&lua); + let fns = Functions::new(&lua)?; - let rt_fns = rt.create_functions()?; - lua.globals().set("spawn", rt_fns.spawn)?; + lua.globals().set("spawn", fns.spawn)?; lua.globals().set( "sleep", lua.create_async_function(|_, ()| async move { diff --git a/examples/scheduler_ordering.rs b/examples/scheduler_ordering.rs index 4d3508e..d91b3dd 100644 --- a/examples/scheduler_ordering.rs +++ b/examples/scheduler_ordering.rs @@ -6,7 +6,7 @@ use std::time::{Duration, Instant}; use async_io::{block_on, Timer}; use mlua::prelude::*; -use mlua_luau_runtime::Runtime; +use mlua_luau_runtime::{Functions, Runtime}; const MAIN_SCRIPT: &str = include_str!("./lua/scheduler_ordering.luau"); @@ -16,10 +16,10 @@ pub fn main() -> LuaResult<()> { // Set up persistent Lua environment let lua = Lua::new(); let rt = Runtime::new(&lua); + let fns = Functions::new(&lua)?; - let rt_fns = rt.create_functions()?; - lua.globals().set("spawn", rt_fns.spawn)?; - lua.globals().set("defer", rt_fns.defer)?; + lua.globals().set("spawn", fns.spawn)?; + lua.globals().set("defer", fns.defer)?; lua.globals().set( "sleep", lua.create_async_function(|_, duration: Option| async move { diff --git a/lib/functions.rs b/lib/functions.rs new file mode 100644 index 0000000..7f50d60 --- /dev/null +++ b/lib/functions.rs @@ -0,0 +1,120 @@ +#![allow(unused_imports)] +#![allow(clippy::module_name_repetitions)] + +use mlua::prelude::*; + +use crate::{ + error_callback::ThreadErrorCallback, + queue::{DeferredThreadQueue, SpawnedThreadQueue}, + runtime::Runtime, + util::LuaThreadOrFunction, +}; + +const ERR_METADATA_NOT_ATTACHED: &str = "\ +Lua state does not have runtime metadata attached!\ +\nThis is most likely caused by creating functions outside of a runtime.\ +\nRuntime functions must always be created from within an active runtime.\ +"; + +/** + A collection of lua functions that may be called to interact with a [`Runtime`]. +*/ +pub struct Functions<'lua> { + /** + Resumes a function / thread once instantly, and runs until first yield. + + Spawns onto the runtime queue if not completed. + */ + pub spawn: LuaFunction<'lua>, + /** + Defers a function / thread onto the runtime queue. + + Does not resume instantly, only adds to the queue. + */ + pub defer: LuaFunction<'lua>, + /** + Cancels a function / thread, removing it from the queue. + */ + pub cancel: LuaFunction<'lua>, +} + +impl<'lua> Functions<'lua> { + /** + Creates a new collection of Lua functions that may be called to interact with a [`Runtime`]. + + # Errors + + Errors when out of memory, or if default Lua globals are missing. + + # Panics + + Panics when the given [`Lua`] instance does not have an attached [`Runtime`]. + */ + pub fn new(lua: &'lua Lua) -> LuaResult { + let spawn_queue = lua + .app_data_ref::() + .expect(ERR_METADATA_NOT_ATTACHED) + .clone(); + let defer_queue = lua + .app_data_ref::() + .expect(ERR_METADATA_NOT_ATTACHED) + .clone(); + let error_callback = lua + .app_data_ref::() + .expect(ERR_METADATA_NOT_ATTACHED) + .clone(); + + let spawn = lua.create_function( + move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| { + let thread = tof.into_thread(lua)?; + if thread.status() == LuaThreadStatus::Resumable { + // NOTE: We need to resume the thread once instantly for correct behavior, + // and only if we get the pending value back we can spawn to async executor + match thread.resume::<_, LuaValue>(args.clone()) { + Ok(v) => { + if v.as_light_userdata() + .map(|l| l == Lua::poll_pending()) + .unwrap_or_default() + { + spawn_queue.push_item(lua, &thread, args)?; + } + } + Err(e) => { + error_callback.call(&e); + } + }; + } + Ok(thread) + }, + )?; + + let defer = lua.create_function( + move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| { + let thread = tof.into_thread(lua)?; + if thread.status() == LuaThreadStatus::Resumable { + defer_queue.push_item(lua, &thread, args)?; + } + Ok(thread) + }, + )?; + + let close = lua + .globals() + .get::<_, LuaTable>("coroutine")? + .get::<_, LuaFunction>("close")?; + let close_key = lua.create_registry_value(close)?; + let cancel = lua.create_function(move |lua, thread: LuaThread| { + let close: LuaFunction = lua.registry_value(&close_key)?; + match close.call(thread) { + Err(LuaError::CoroutineInactive) | Ok(()) => Ok(()), + Err(e) => Err(e), + } + })?; + + Ok(Self { + spawn, + defer, + cancel, + }) + } +} diff --git a/lib/lib.rs b/lib/lib.rs index bc326a2..ccc41c9 100644 --- a/lib/lib.rs +++ b/lib/lib.rs @@ -1,4 +1,5 @@ mod error_callback; +mod functions; mod handle; mod queue; mod runtime; @@ -6,7 +7,8 @@ mod status; mod traits; mod util; +pub use functions::Functions; pub use handle::Handle; -pub use runtime::{Functions, Runtime}; +pub use runtime::Runtime; pub use status::Status; pub use traits::{IntoLuaThread, LuaRuntimeExt}; diff --git a/lib/queue.rs b/lib/queue.rs index bb9ec40..54837f4 100644 --- a/lib/queue.rs +++ b/lib/queue.rs @@ -1,8 +1,9 @@ -use std::sync::Arc; +use std::{pin::Pin, rc::Rc, sync::Arc}; use concurrent_queue::ConcurrentQueue; use derive_more::{Deref, DerefMut}; use event_listener::Event; +use futures_lite::{Future, FutureExt}; use mlua::prelude::*; use crate::{handle::Handle, traits::IntoLuaThread, util::ThreadWithArgs}; @@ -98,3 +99,42 @@ impl DeferredThreadQueue { Self(ThreadQueue::new()) } } + +pub type LocalBoxFuture<'fut> = Pin + 'fut>>; + +/** + Queue for storing local futures. + + Provides methods for pushing and draining the queue, as + well as listening for new items being pushed to the queue. +*/ +#[derive(Debug, Clone)] +pub(crate) struct FuturesQueue<'fut> { + queue: Rc>>, + event: Arc, +} + +impl<'fut> FuturesQueue<'fut> { + pub fn new() -> Self { + let queue = Rc::new(ConcurrentQueue::unbounded()); + let event = Arc::new(Event::new()); + Self { queue, event } + } + + pub fn push_item(&self, fut: impl Future + 'fut) { + let _ = self.queue.push(fut.boxed_local()); + self.event.notify(usize::MAX); + } + + pub fn drain_items<'outer>( + &'outer self, + ) -> impl Iterator> + 'outer { + self.queue.try_iter() + } + + pub async fn wait_for_item(&self) { + if self.queue.is_empty() { + self.event.listen().await; + } + } +} diff --git a/lib/runtime.rs b/lib/runtime.rs index 49a1a0c..b00f355 100644 --- a/lib/runtime.rs +++ b/lib/runtime.rs @@ -2,8 +2,8 @@ use std::{ cell::Cell, - rc::Rc, - sync::{Arc, Weak}, + rc::{Rc, Weak as WeakRc}, + sync::{Arc, Weak as WeakArc}, }; use futures_lite::prelude::*; @@ -15,10 +15,10 @@ use tracing::Instrument; use crate::{ error_callback::ThreadErrorCallback, handle::Handle, - queue::{DeferredThreadQueue, SpawnedThreadQueue}, + queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue}, status::Status, traits::IntoLuaThread, - util::{run_until_yield, LuaThreadOrFunction}, + util::run_until_yield, }; const ERR_METADATA_ALREADY_ATTACHED: &str = "\ @@ -32,6 +32,10 @@ Lua state runtime metadata was unexpectedly removed!\ \nThis should never happen, and is likely a bug in the runtime.\ "; +const ERR_SET_CALLBACK_WHEN_RUNNING: &str = "\ +Cannot set error callback when runtime is running!\ +"; + /** A runtime for running Lua threads and async tasks. */ @@ -49,6 +53,10 @@ impl<'lua> Runtime<'lua> { Creates a new runtime for the given Lua state. This runtime will have a default error callback that prints errors to stderr. + + # Panics + + Panics if the given Lua state already has a runtime attached to it. */ #[must_use] pub fn new(lua: &'lua Lua) -> Runtime<'lua> { @@ -56,6 +64,24 @@ impl<'lua> Runtime<'lua> { let queue_defer = DeferredThreadQueue::new(); let error_callback = ThreadErrorCallback::default(); let status = Rc::new(Cell::new(Status::NotStarted)); + + assert!( + lua.app_data_ref::().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + assert!( + lua.app_data_ref::().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + assert!( + lua.app_data_ref::().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + + lua.set_app_data(queue_spawn.clone()); + lua.set_app_data(queue_defer.clone()); + lua.set_app_data(error_callback.clone()); + Runtime { lua, queue_spawn, @@ -79,8 +105,16 @@ impl<'lua> Runtime<'lua> { This callback will be called whenever a Lua thread errors. Overwrites any previous error callback. + + # Panics + + Panics if the runtime is currently running. */ pub fn set_error_callback(&self, callback: impl Fn(LuaError) + Send + 'static) { + assert!( + !self.status().is_running(), + "{ERR_SET_CALLBACK_WHEN_RUNNING}" + ); self.error_callback.replace(callback); } @@ -88,22 +122,19 @@ impl<'lua> Runtime<'lua> { Clears the error callback for this runtime. This will remove any current error callback, including default(s). + + # Panics + + Panics if the runtime is currently running. */ pub fn remove_error_callback(&self) { + assert!( + !self.status().is_running(), + "{ERR_SET_CALLBACK_WHEN_RUNNING}" + ); self.error_callback.clear(); } - /** - Creates a collection of lua functions that may be called to interact with the runtime. - - # Errors - - Errors when out of memory. - */ - pub fn create_functions(&self) -> LuaResult { - Functions::new(self) - } - /** Spawns a chunk / function / thread onto the runtime queue. @@ -166,6 +197,7 @@ impl<'lua> Runtime<'lua> { Panics if the given Lua state already has a runtime attached to it. */ + #[allow(clippy::too_many_lines)] pub async fn run(&self) { /* Create new executors to use - note that we do not need create multiple executors @@ -178,30 +210,27 @@ impl<'lua> Runtime<'lua> { We also use the main executor to drive the main loop below forward, saving a tiny bit of processing from going on the Lua executor itself. */ - let lua_exec = LocalExecutor::new(); + let local_exec = LocalExecutor::new(); let main_exec = Arc::new(Executor::new()); + let fut_queue = Rc::new(FuturesQueue::new()); /* - Store the main executor and queues in Lua, so that they may be used with LuaRuntimeExt. + Store the main executor and queue in Lua, so that they may be used with LuaRuntimeExt. Also ensure we do not already have an executor or queues - these are definite user errors and may happen if the user tries to run multiple runtimes on the same Lua state at once. */ assert!( - self.lua.app_data_ref::>().is_none(), + self.lua.app_data_ref::>().is_none(), "{ERR_METADATA_ALREADY_ATTACHED}" ); assert!( - self.lua.app_data_ref::().is_none(), - "{ERR_METADATA_ALREADY_ATTACHED}" - ); - assert!( - self.lua.app_data_ref::().is_none(), + self.lua.app_data_ref::>().is_none(), "{ERR_METADATA_ALREADY_ATTACHED}" ); + self.lua.set_app_data(Arc::downgrade(&main_exec)); - self.lua.set_app_data(self.queue_spawn.clone()); - self.lua.set_app_data(self.queue_defer.clone()); + self.lua.set_app_data(Rc::downgrade(&fut_queue.clone())); /* Manually tick the Lua executor, while running under the main executor. @@ -209,7 +238,8 @@ impl<'lua> Runtime<'lua> { 1. A Lua thread is available to run on the spawned queue 2. A Lua thread is available to run on the deferred queue - 3. Task(s) scheduled on the Lua executor have made progress and should be polled again + 3. A new thread-local future is available to run on the local executor + 4. Task(s) scheduled on the Lua executor have made progress and should be polled again This ordering is vital to ensure that we don't accidentally exit the main loop when there are new Lua threads to enqueue and potentially more work to be done. @@ -219,7 +249,7 @@ impl<'lua> Runtime<'lua> { // NOTE: Thread may have been cancelled from Lua // before we got here, so we need to check it again if thread.status() == LuaThreadStatus::Resumable { - lua_exec + local_exec .spawn(async move { if let Err(e) = run_until_yield(thread, args).await { self.error_callback.call(&e); @@ -232,22 +262,24 @@ impl<'lua> Runtime<'lua> { loop { let fut_spawn = self.queue_spawn.wait_for_item(); // 1 let fut_defer = self.queue_defer.wait_for_item(); // 2 + let fut_futs = fut_queue.wait_for_item(); // 3 - // 3 + // 4 let mut num_processed = 0; let span_tick = tracing::debug_span!("tick_executor"); let fut_tick = async { - lua_exec.tick().await; + local_exec.tick().await; // NOTE: Try to do as much work as possible instead of just a single tick() num_processed += 1; - while lua_exec.try_tick() { + while local_exec.try_tick() { num_processed += 1; } }; - // 1 + 2 + 3 + // 1 + 2 + 3 + 4 fut_spawn .or(fut_defer) + .or(fut_futs) .or(fut_tick.instrument(span_tick.or_current())) .await; @@ -271,9 +303,19 @@ impl<'lua> Runtime<'lua> { tracing::trace!(num_spawned, num_deferred, "tasks_spawned"); } + // Process spawned futures + let mut num_futs = 0; + for fut in fut_queue.drain_items() { + local_exec.spawn(fut).detach(); + num_futs += 1; + } + if num_futs > 0 { + tracing::trace!(num_futs, "futures_spawned"); + } + // Empty executor = we didn't spawn any new Lua tasks // above, and there are no remaining tasks to run later - if lua_exec.is_empty() { + if local_exec.is_empty() { break; } } @@ -291,95 +333,24 @@ impl<'lua> Runtime<'lua> { // Clean up self.lua - .remove_app_data::>() + .remove_app_data::>() .expect(ERR_METADATA_REMOVED); + self.lua + .remove_app_data::>() + .expect(ERR_METADATA_REMOVED); + } +} + +impl Drop for Runtime<'_> { + fn drop(&mut self) { self.lua .remove_app_data::() .expect(ERR_METADATA_REMOVED); self.lua .remove_app_data::() .expect(ERR_METADATA_REMOVED); - } -} - -/** - A collection of lua functions that may be called to interact with a [`Runtime`]. -*/ -pub struct Functions<'lua> { - /** - Spawns a function / thread onto the runtime queue. - Resumes once instantly, and runs until first yield. - Adds to the queue if not completed. - */ - pub spawn: LuaFunction<'lua>, - /** - Defers a function / thread onto the runtime queue. - Does not resume instantly, only adds to the queue. - */ - pub defer: LuaFunction<'lua>, - /** - Cancels a function / thread, removing it from the queue. - */ - pub cancel: LuaFunction<'lua>, -} - -impl<'lua> Functions<'lua> { - fn new(rt: &Runtime<'lua>) -> LuaResult { - let error_callback = rt.error_callback.clone(); - let spawn_queue = rt.queue_spawn.clone(); - let spawn = rt.lua.create_function( - move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| { - let thread = tof.into_thread(lua)?; - if thread.status() == LuaThreadStatus::Resumable { - // NOTE: We need to resume the thread once instantly for correct behavior, - // and only if we get the pending value back we can spawn to async executor - match thread.resume::<_, LuaValue>(args.clone()) { - Ok(v) => { - if v.as_light_userdata() - .map(|l| l == Lua::poll_pending()) - .unwrap_or_default() - { - spawn_queue.push_item(lua, &thread, args)?; - } - } - Err(e) => { - error_callback.call(&e); - } - }; - } - Ok(thread) - }, - )?; - - let defer_queue = rt.queue_defer.clone(); - let defer = rt.lua.create_function( - move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| { - let thread = tof.into_thread(lua)?; - if thread.status() == LuaThreadStatus::Resumable { - defer_queue.push_item(lua, &thread, args)?; - } - Ok(thread) - }, - )?; - - let close = rt - .lua - .globals() - .get::<_, LuaTable>("coroutine")? - .get::<_, LuaFunction>("close")?; - let close_key = rt.lua.create_registry_value(close)?; - let cancel = rt.lua.create_function(move |lua, thread: LuaThread| { - let close: LuaFunction = lua.registry_value(&close_key)?; - match close.call(thread) { - Err(LuaError::CoroutineInactive) | Ok(()) => Ok(()), - Err(e) => Err(e), - } - })?; - - Ok(Self { - spawn, - defer, - cancel, - }) + self.lua + .remove_app_data::() + .expect(ERR_METADATA_REMOVED); } } diff --git a/lib/traits.rs b/lib/traits.rs index 2dfc21b..e370aa0 100644 --- a/lib/traits.rs +++ b/lib/traits.rs @@ -1,6 +1,7 @@ +#![allow(unused_imports)] #![allow(clippy::missing_errors_doc)] -use std::{future::Future, sync::Weak}; +use std::{future::Future, rc::Weak as WeakRc, sync::Weak as WeakArc}; use mlua::prelude::*; @@ -8,7 +9,8 @@ use async_executor::{Executor, Task}; use crate::{ handle::Handle, - queue::{DeferredThreadQueue, SpawnedThreadQueue}, + queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue}, + runtime::Runtime, }; /** @@ -58,10 +60,7 @@ where } /** - Trait for scheduling Lua threads and spawning `Send` futures on the current executor. - - For spawning `!Send` futures on the same local executor as a [`Lua`] - VM instance, [`Lua::create_async_function`] should be used instead. + Trait for scheduling Lua threads and spawning futures on the current executor. */ pub trait LuaRuntimeExt<'lua> { /** @@ -115,7 +114,7 @@ pub trait LuaRuntimeExt<'lua> { lua.globals().set( "spawnBackgroundTask", lua.create_async_function(|lua, ()| async move { - lua.spawn_future(async move { + lua.spawn(async move { println!("Hello from background task!"); }).await; Ok(()) @@ -133,6 +132,47 @@ pub trait LuaRuntimeExt<'lua> { [`Runtime`]: crate::Runtime */ fn spawn(&self, fut: impl Future + Send + 'static) -> Task; + + /** + Spawns the given thread-local future on the current executor. + + Note that this future will run detached and always to completion, + preventing the [`Runtime`] was spawned on from completing until done. + + # Panics + + Panics if called outside of a running [`Runtime`]. + + # Example usage + + ```rust + use async_io::block_on; + + use mlua::prelude::*; + use mlua_luau_runtime::*; + + fn main() -> LuaResult<()> { + let lua = Lua::new(); + + lua.globals().set( + "spawnLocalTask", + lua.create_async_function(|lua, ()| async move { + lua.spawn_local(async move { + println!("Hello from local task!"); + }); + Ok(()) + })? + )?; + + let rt = Runtime::new(&lua); + rt.push_thread_front(lua.load("spawnLocalTask()"), ()); + block_on(rt.run()); + + Ok(()) + } + ``` + */ + fn spawn_local(&self, fut: impl Future + 'static); } impl<'lua> LuaRuntimeExt<'lua> for Lua { @@ -160,11 +200,21 @@ impl<'lua> LuaRuntimeExt<'lua> for Lua { fn spawn(&self, fut: impl Future + Send + 'static) -> Task { let exec = self - .app_data_ref::>() + .app_data_ref::>() .expect("futures can only be spawned within a runtime") .upgrade() .expect("executor was dropped"); tracing::trace!("spawning future on executor"); exec.spawn(fut) } + + fn spawn_local(&self, fut: impl Future + 'static) { + let queue = self + .app_data_ref::>() + .expect("futures can only be spawned within a runtime") + .upgrade() + .expect("executor was dropped"); + tracing::trace!("spawning local future on executor"); + queue.push_item(fut); + } }