From b03a0101e2750a879260e4a7ef7da84996a43f7a Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Sun, 28 Jan 2024 14:51:40 +0100 Subject: [PATCH] Expand handle struct and runtime ext trait to be able to use from inside lua --- Cargo.lock | 50 ++++++++++++++++++++++++- Cargo.toml | 1 + README.md | 2 +- examples/basic_spawn.rs | 4 +- lib/handle.rs | 45 ++++++++++++++++++---- lib/lib.rs | 2 +- lib/queue.rs | 41 +++++++++++++++++++- lib/runtime.rs | 81 ++++++++++++++++++++++++---------------- lib/traits.rs | 83 ++++++++++++++++++++++++++++++++++++----- 9 files changed, 254 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1903b1e..9db513f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,12 +144,31 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + [[package]] name = "crossbeam-utils" version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +[[package]] +name = "derive_more" +version = "0.99.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn 1.0.109", +] + [[package]] name = "erased-serde" version = "0.4.2" @@ -315,6 +334,7 @@ dependencies = [ "async-fs", "async-io", "concurrent-queue", + "derive_more", "event-listener", "futures-lite", "mlua", @@ -447,6 +467,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.30" @@ -460,6 +489,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "semver" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" + [[package]] name = "serde" version = "1.0.195" @@ -487,7 +522,7 @@ checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -514,6 +549,17 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.48" @@ -554,7 +600,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 353926e..d954505 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ categories = ["async"] [dependencies] async-executor = "1.8" concurrent-queue = "2.4" +derive_more = "0.99" event-listener = "4.0" futures-lite = "2.2" tracing = "0.1" diff --git a/README.md b/README.md index 53ba5b3..9af6d6c 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ lua.globals().set( lua.create_async_function(|lua, path: String| async move { // Spawn background task that does not take up resources on the lua thread // Normally, futures in mlua can not be shared across threads, but this can - let task = lua.spawn(async move { + let task = lua.spawn_future(async move { match read_to_string(path).await { Ok(s) => Ok(Some(s)), Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), diff --git a/examples/basic_spawn.rs b/examples/basic_spawn.rs index 07643c3..83a5d89 100644 --- a/examples/basic_spawn.rs +++ b/examples/basic_spawn.rs @@ -6,7 +6,7 @@ use async_fs::read_to_string; use async_io::block_on; use mlua::prelude::*; -use mlua_luau_runtime::{LuaSpawnExt, Runtime}; +use mlua_luau_runtime::{LuaRuntimeExt, Runtime}; const MAIN_SCRIPT: &str = include_str!("./lua/basic_spawn.luau"); @@ -19,7 +19,7 @@ pub fn main() -> LuaResult<()> { "readFile", lua.create_async_function(|lua, path: String| async move { // Spawn background task that does not take up resources on the Lua thread - let task = lua.spawn(async move { + let task = lua.spawn_future(async move { match read_to_string(path).await { Ok(s) => Ok(Some(s)), Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), diff --git a/lib/handle.rs b/lib/handle.rs index 39acc6c..d23671a 100644 --- a/lib/handle.rs +++ b/lib/handle.rs @@ -2,27 +2,35 @@ #![allow(clippy::missing_panics_doc)] #![allow(clippy::module_name_repetitions)] -use std::{cell::RefCell, rc::Rc}; +use std::{ + cell::{Cell, RefCell}, + rc::Rc, +}; +use event_listener::Event; use mlua::prelude::*; use crate::{ runtime::Runtime, status::Status, + traits::IntoLuaThread, util::{run_until_yield, ThreadWithArgs}, - IntoLuaThread, }; /** A handle to a thread that has been spawned onto a [`Runtime`]. - This handle contains a single public method, [`Handle::result`], which may - be used to extract the result of the thread, once it has finished running. + This handle contains a public method, [`Handle::result`], which may + be used to extract the result of the thread, once it finishes running. + + A result may be waited for using the [`Handle::listen`] method. */ #[derive(Debug, Clone)] pub struct Handle { thread: Rc>>, result: Rc>>, + status: Rc>, + event: Rc, } impl Handle { @@ -39,6 +47,8 @@ impl Handle { Ok(Self { thread: Rc::new(RefCell::new(Some(packed))), result: Rc::new(RefCell::new(None)), + status: Rc::new(Cell::new(false)), + event: Rc::new(Event::new()), }) } @@ -59,7 +69,12 @@ impl Handle { .into_inner(lua) } - fn set<'lua>(&self, lua: &'lua Lua, result: &LuaResult>) -> LuaResult<()> { + fn set<'lua>( + &self, + lua: &'lua Lua, + result: &LuaResult>, + is_final: bool, + ) -> LuaResult<()> { self.result.borrow_mut().replace(( result.is_ok(), match &result { @@ -67,6 +82,10 @@ impl Handle { Err(e) => lua.create_registry_value(e.clone())?, }, )); + self.status.replace(is_final); + if is_final { + self.event.notify(usize::MAX); + } Ok(()) } @@ -90,6 +109,17 @@ impl Handle { Err(lua.registry_value(key).unwrap()) }) } + + /** + Waits for this handle to have its final result available. + + Does not wait if the final result is already available. + */ + pub async fn listen(&self) { + if !self.status.get() { + self.event.listen().await; + } + } } impl LuaUserData for Handle { @@ -103,8 +133,9 @@ impl LuaUserData for Handle { it may be caught using the runtime and any error callback(s) */ let (thread, args) = this.take(lua); - let result = run_until_yield(thread, args).await; - this.set(lua, &result)?; + let result = run_until_yield(thread.clone(), args).await; + let is_final = thread.status() != LuaThreadStatus::Resumable; + this.set(lua, &result, is_final)?; result }); } diff --git a/lib/lib.rs b/lib/lib.rs index de4cc5e..7833ecd 100644 --- a/lib/lib.rs +++ b/lib/lib.rs @@ -9,4 +9,4 @@ mod util; pub use handle::Handle; pub use runtime::Runtime; pub use status::Status; -pub use traits::{IntoLuaThread, LuaSpawnExt}; +pub use traits::{IntoLuaThread, LuaRuntimeExt}; diff --git a/lib/queue.rs b/lib/queue.rs index e0a6eeb..bb9ec40 100644 --- a/lib/queue.rs +++ b/lib/queue.rs @@ -1,10 +1,11 @@ use std::sync::Arc; use concurrent_queue::ConcurrentQueue; +use derive_more::{Deref, DerefMut}; use event_listener::Event; use mlua::prelude::*; -use crate::{util::ThreadWithArgs, IntoLuaThread}; +use crate::{handle::Handle, traits::IntoLuaThread, util::ThreadWithArgs}; /** Queue for storing [`LuaThread`]s with associated arguments. @@ -43,6 +44,20 @@ impl ThreadQueue { Ok(()) } + pub fn push_item_with_handle<'lua>( + &self, + lua: &'lua Lua, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult { + let handle = Handle::new(lua, thread, args)?; + let handle_thread = handle.create_thread(lua)?; + + self.push_item(lua, handle_thread, ())?; + + Ok(handle) + } + pub fn drain_items<'outer, 'lua>( &'outer self, lua: &'lua Lua, @@ -59,3 +74,27 @@ impl ThreadQueue { } } } + +/** + Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. +*/ +#[derive(Debug, Clone, Deref, DerefMut)] +pub(crate) struct SpawnedThreadQueue(ThreadQueue); + +impl SpawnedThreadQueue { + pub fn new() -> Self { + Self(ThreadQueue::new()) + } +} + +/** + Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. +*/ +#[derive(Debug, Clone, Deref, DerefMut)] +pub(crate) struct DeferredThreadQueue(ThreadQueue); + +impl DeferredThreadQueue { + pub fn new() -> Self { + Self(ThreadQueue::new()) + } +} diff --git a/lib/runtime.rs b/lib/runtime.rs index 59d92ef..f10963f 100644 --- a/lib/runtime.rs +++ b/lib/runtime.rs @@ -12,20 +12,33 @@ use mlua::prelude::*; use async_executor::{Executor, LocalExecutor}; use tracing::Instrument; -use crate::{status::Status, util::run_until_yield, Handle}; - -use super::{ - error_callback::ThreadErrorCallback, queue::ThreadQueue, traits::IntoLuaThread, - util::LuaThreadOrFunction, +use crate::{ + error_callback::ThreadErrorCallback, + handle::Handle, + queue::{DeferredThreadQueue, SpawnedThreadQueue}, + status::Status, + traits::IntoLuaThread, + util::{run_until_yield, LuaThreadOrFunction}, }; +const ERR_METADATA_ALREADY_ATTACHED: &str = "\ +Lua state already has runtime metadata attached!\ +\nThis may be caused by running multiple runtimes on the same Lua state, or a call to Runtime::run being cancelled.\ +\nOnly one runtime can be used per Lua state at once, and runtimes must always run until completion.\ +"; + +const ERR_METADATA_REMOVED: &str = "\ +Lua state runtime metadata was unexpectedly removed!\ +\nThis should never happen, and is likely a bug in the runtime.\ +"; + /** A runtime for running Lua threads and async tasks. */ pub struct Runtime<'lua> { lua: &'lua Lua, - queue_spawn: ThreadQueue, - queue_defer: ThreadQueue, + queue_spawn: SpawnedThreadQueue, + queue_defer: DeferredThreadQueue, error_callback: ThreadErrorCallback, status: Rc>, } @@ -38,8 +51,8 @@ impl<'lua> Runtime<'lua> { */ #[must_use] pub fn new(lua: &'lua Lua) -> Runtime<'lua> { - let queue_spawn = ThreadQueue::new(); - let queue_defer = ThreadQueue::new(); + let queue_spawn = SpawnedThreadQueue::new(); + let queue_defer = DeferredThreadQueue::new(); let error_callback = ThreadErrorCallback::default(); let status = Rc::new(Cell::new(Status::NotStarted)); Runtime { @@ -100,13 +113,8 @@ impl<'lua> Runtime<'lua> { args: impl IntoLuaMulti<'lua>, ) -> LuaResult { tracing::debug!(deferred = false, "new runtime thread"); - - let handle = Handle::new(self.lua, thread, args)?; - let handle_thread = handle.create_thread(self.lua)?; - - self.queue_spawn.push_item(self.lua, handle_thread, ())?; - - Ok(handle) + self.queue_spawn + .push_item_with_handle(self.lua, thread, args) } /** @@ -132,13 +140,8 @@ impl<'lua> Runtime<'lua> { args: impl IntoLuaMulti<'lua>, ) -> LuaResult { tracing::debug!(deferred = true, "new runtime thread"); - - let handle = Handle::new(self.lua, thread, args)?; - let handle_thread = handle.create_thread(self.lua)?; - - self.queue_defer.push_item(self.lua, handle_thread, ())?; - - Ok(handle) + self.queue_defer + .push_item_with_handle(self.lua, thread, args) } /** @@ -228,20 +231,26 @@ impl<'lua> Runtime<'lua> { let main_exec = Arc::new(Executor::new()); /* - Store the main executor in Lua, so that it may be used with LuaSpawnExt. + Store the main executor and queues in Lua, so that they may be used with LuaRuntimeExt. - Also ensure we do not already have an executor - this is a definite user error - and may happen if the user tries to run multiple runtimes on the same Lua state. + Also ensure we do not already have an executor or queues - these are definite user errors + and may happen if the user tries to run multiple runtimes on the same Lua state at once. */ assert!( self.lua.app_data_ref::>().is_none(), - "\ - Lua state already has an executor attached!\ - \nThis may be caused by running multiple runtimes on the same Lua state, or a call to Runtime::run being cancelled.\ - \nOnly one runtime can be used per Lua state at once, and runtimes must always run until completion.\ - " + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + assert!( + self.lua.app_data_ref::().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + assert!( + self.lua.app_data_ref::().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" ); self.lua.set_app_data(Arc::downgrade(&main_exec)); + self.lua.set_app_data(self.queue_spawn.clone()); + self.lua.set_app_data(self.queue_defer.clone()); /* Manually tick the Lua executor, while running under the main executor. @@ -330,6 +339,14 @@ impl<'lua> Runtime<'lua> { self.status.set(Status::Completed); // Clean up - self.lua.remove_app_data::>(); + self.lua + .remove_app_data::>() + .expect(ERR_METADATA_REMOVED); + self.lua + .remove_app_data::() + .expect(ERR_METADATA_REMOVED); + self.lua + .remove_app_data::() + .expect(ERR_METADATA_REMOVED); } } diff --git a/lib/traits.rs b/lib/traits.rs index 7625af6..bd906e5 100644 --- a/lib/traits.rs +++ b/lib/traits.rs @@ -1,9 +1,16 @@ +#![allow(clippy::missing_errors_doc)] + use std::{future::Future, sync::Weak}; use mlua::prelude::*; use async_executor::{Executor, Task}; +use crate::{ + handle::Handle, + queue::{DeferredThreadQueue, SpawnedThreadQueue}, +}; + /** Trait for any struct that can be turned into an [`LuaThread`] and passed to the runtime, implemented for the following types: @@ -51,20 +58,50 @@ where } /** - Trait for spawning `Send` futures on the current executor. + Trait for scheduling Lua threads and spawning `Send` futures on the current executor. For spawning `!Send` futures on the same local executor as a [`Lua`] VM instance, [`Lua::create_async_function`] should be used instead. */ -pub trait LuaSpawnExt<'lua> { +pub trait LuaRuntimeExt<'lua> { + /** + Spawns a lua thread onto the current runtime. + + See [`Runtime::spawn_thread`] for more information. + + # Panics + + Panics if called outside of a running [`Runtime`]. + */ + fn spawn_thread( + &'lua self, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult; + + /** + Defers a lua thread onto the current runtime. + + See [`Runtime::defer_thread`] for more information. + + # Panics + + Panics if called outside of a running [`Runtime`]. + */ + fn defer_thread( + &'lua self, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult; + /** Spawns the given future on the current executor and returns its [`Task`]. - ### Panics + # Panics - Panics if called outside of a [`Runtime`]. + Panics if called outside of a running [`Runtime`]. - ### Example usage + # Example usage ```rust use async_io::block_on; @@ -78,7 +115,7 @@ pub trait LuaSpawnExt<'lua> { lua.globals().set( "spawnBackgroundTask", lua.create_async_function(|lua, ()| async move { - lua.spawn(async move { + lua.spawn_future(async move { println!("Hello from background task!"); }).await; Ok(()) @@ -95,11 +132,39 @@ pub trait LuaSpawnExt<'lua> { [`Runtime`]: crate::Runtime */ - fn spawn(&self, fut: impl Future + Send + 'static) -> Task; + fn spawn_future( + &self, + fut: impl Future + Send + 'static, + ) -> Task; } -impl<'lua> LuaSpawnExt<'lua> for Lua { - fn spawn(&self, fut: impl Future + Send + 'static) -> Task { +impl<'lua> LuaRuntimeExt<'lua> for Lua { + fn spawn_thread( + &'lua self, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult { + let queue = self + .app_data_ref::() + .expect("lua threads can only be spawned within a runtime"); + queue.push_item_with_handle(self, thread, args) + } + + fn defer_thread( + &'lua self, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult { + let queue = self + .app_data_ref::() + .expect("lua threads can only be deferred within a runtime"); + queue.push_item_with_handle(self, thread, args) + } + + fn spawn_future( + &self, + fut: impl Future + Send + 'static, + ) -> Task { let exec = self .app_data_ref::>() .expect("futures can only be spawned within a runtime")