From acae6a6369a43558fd9b2b5df92b4ce35e8754c5 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Mon, 21 Aug 2023 10:47:43 -0500 Subject: [PATCH] Get rid of lua lifetime in scheduler, add scheduler documentation --- src/lune/builtins/net/server.rs | 1 + src/lune/builtins/task/mod.rs | 8 ++--- src/lune/globals/require/context.rs | 4 +-- src/lune/mod.rs | 12 ++++---- src/lune/scheduler/impl_async.rs | 14 ++++----- src/lune/scheduler/impl_runner.rs | 27 +++++++++-------- src/lune/scheduler/impl_threads.rs | 45 +++++++++++++++-------------- src/lune/scheduler/mod.rs | 34 +++++++++++++++------- src/lune/scheduler/traits.rs | 17 +++++++++-- 9 files changed, 95 insertions(+), 67 deletions(-) diff --git a/src/lune/builtins/net/server.rs b/src/lune/builtins/net/server.rs index 971243d..0d6cec0 100644 --- a/src/lune/builtins/net/server.rs +++ b/src/lune/builtins/net/server.rs @@ -60,6 +60,7 @@ impl Service> for NetServiceInner { let ws = ws.await.into_lua_err()?; let sock = NetWebSocket::new(ws).into_lua_table(lua)?; sched.push_front( + lua, lua.create_thread(handler)?, LuaMultiValue::from_vec(vec![LuaValue::Table(sock)]), ) diff --git a/src/lune/builtins/task/mod.rs b/src/lune/builtins/task/mod.rs index f757d84..189df32 100644 --- a/src/lune/builtins/task/mod.rs +++ b/src/lune/builtins/task/mod.rs @@ -42,7 +42,7 @@ pub fn create(lua: &'static Lua) -> LuaResult> { let sched = lua .app_data_ref::<&Scheduler>() .expect("Lua struct is missing scheduler"); - sched.push_front(thread.clone(), args)?; + sched.push_front(lua, thread.clone(), args)?; Ok(thread) })?; let task_spawn_env = TableBuilder::new(lua)? @@ -85,7 +85,7 @@ fn task_defer<'lua>( let sched = lua .app_data_ref::<&Scheduler>() .expect("Lua struct is missing scheduler"); - sched.push_back(thread.clone(), args)?; + sched.push_back(lua, thread.clone(), args)?; Ok(thread) } @@ -105,10 +105,10 @@ where .expect("Lua struct is missing scheduler"); let thread2 = thread.clone(); - sched.spawn_thread(thread.clone(), async move { + sched.spawn_thread(lua, thread.clone(), async move { let duration = Duration::from_secs_f64(secs); time::sleep(duration).await; - sched.push_back(thread2, args)?; + sched.push_back(lua, thread2, args)?; Ok(()) })?; diff --git a/src/lune/globals/require/context.rs b/src/lune/globals/require/context.rs index 761a5f7..5f17f00 100644 --- a/src/lune/globals/require/context.rs +++ b/src/lune/globals/require/context.rs @@ -188,8 +188,8 @@ impl<'lua> RequireContext<'lua> { .into_lua_thread(self.lua)?; // Schedule the thread to run, wait for it to finish running - let thread_id = sched.push_back(file_thread, ())?; - let thread_res = sched.wait_for_thread(thread_id).await; + let thread_id = sched.push_back(self.lua, file_thread, ())?; + let thread_res = sched.wait_for_thread(self.lua, thread_id).await; // Return the result of the thread, storing any lua value(s) in the registry match thread_res { diff --git a/src/lune/mod.rs b/src/lune/mod.rs index 8071762..2eb9270 100644 --- a/src/lune/mod.rs +++ b/src/lune/mod.rs @@ -6,7 +6,7 @@ mod globals; mod scheduler; mod util; -use self::scheduler::Scheduler; +use self::scheduler::{LuaSchedulerExt, Scheduler}; pub use error::LuneError; use mlua::Lua; @@ -14,7 +14,7 @@ use mlua::Lua; #[derive(Debug, Clone)] pub struct Lune { lua: &'static Lua, - scheduler: &'static Scheduler<'static, 'static>, + scheduler: &'static Scheduler<'static>, args: Vec, } @@ -28,9 +28,9 @@ impl Lune { // any way for us to create a scheduler, store it in app data, and // guarantee it has the same lifetime as Lua without using any unsafe? let lua = Lua::new().into_static(); - let scheduler = Scheduler::new(lua).into_static(); + let scheduler = Scheduler::new().into_static(); - lua.set_app_data(scheduler); + lua.set_scheduler(scheduler); globals::inject_all(lua).expect("Failed to inject lua globals"); Self { @@ -65,7 +65,7 @@ impl Lune { .load(script_contents.as_ref()) .set_name(script_name.as_ref()); - self.scheduler.push_back(main, ())?; - Ok(self.scheduler.run_to_completion().await) + self.scheduler.push_back(self.lua, main, ())?; + Ok(self.scheduler.run_to_completion(self.lua).await) } } diff --git a/src/lune/scheduler/impl_async.rs b/src/lune/scheduler/impl_async.rs index 1a61414..416f632 100644 --- a/src/lune/scheduler/impl_async.rs +++ b/src/lune/scheduler/impl_async.rs @@ -7,10 +7,7 @@ use tokio::{ use super::{IntoLuaThread, Scheduler}; -impl<'lua, 'fut> Scheduler<'lua, 'fut> -where - 'lua: 'fut, -{ +impl<'fut> Scheduler<'fut> { /** Checks if there are any futures to run, for lua futures and background futures respectively. @@ -97,6 +94,7 @@ where */ pub fn spawn_thread( &'fut self, + lua: &'fut Lua, thread: impl IntoLuaThread<'fut>, fut: F, ) -> LuaResult<()> @@ -104,20 +102,20 @@ where FR: IntoLuaMulti<'fut>, F: Future> + 'fut, { - let thread = thread.into_lua_thread(self.lua)?; + let thread = thread.into_lua_thread(lua)?; let futs = self.futures_lua.try_lock().expect( "Failed to lock futures queue - \ can't schedule future lua threads during futures resumption", ); futs.push(Box::pin(async move { - match fut.await.and_then(|rets| rets.into_lua_multi(self.lua)) { + match fut.await.and_then(|rets| rets.into_lua_multi(lua)) { Err(e) => { - self.push_err(thread, e) + self.push_err(lua, thread, e) .expect("Failed to schedule future err thread"); } Ok(v) => { - self.push_back(thread, v) + self.push_back(lua, thread, v) .expect("Failed to schedule future thread"); } } diff --git a/src/lune/scheduler/impl_runner.rs b/src/lune/scheduler/impl_runner.rs index 674b283..73780d9 100644 --- a/src/lune/scheduler/impl_runner.rs +++ b/src/lune/scheduler/impl_runner.rs @@ -9,16 +9,13 @@ use crate::lune::util::traits::LuaEmitErrorExt; use super::Scheduler; -impl<'lua, 'fut> Scheduler<'lua, 'fut> -where - 'lua: 'fut, -{ +impl<'fut> Scheduler<'fut> { /** Runs all lua threads to completion. Returns `true` if any thread was resumed, `false` otherwise. */ - fn run_lua_threads(&self) -> bool { + fn run_lua_threads(&self, lua: &Lua) -> bool { if self.state.has_exit_code() { return false; } @@ -32,7 +29,7 @@ where { // Deconstruct the scheduler thread into its parts let thread_id = thread.id(); - let (thread, args) = thread.into_inner(self.lua); + let (thread, args) = thread.into_inner(lua); // Make sure this thread is still resumable, it might have // been resumed somewhere else or even have been cancelled @@ -53,7 +50,7 @@ where // a non-zero exit code, and print it out to stderr if let Err(err) = &res { self.state.increment_error_count(); - self.lua.emit_error(err.clone()); + lua.emit_error(err.clone()); } // If the thread has finished running completely, @@ -65,11 +62,9 @@ where if sender.receiver_count() > 0 { let stored = match res { Err(e) => Err(e), - Ok(v) => Ok(Arc::new( - self.lua.create_registry_value(v.into_vec()).expect( - "Failed to store thread results in registry - out of memory", - ), - )), + Ok(v) => Ok(Arc::new(lua.create_registry_value(v.into_vec()).expect( + "Failed to store thread results in registry - out of memory", + ))), }; sender .send(stored) @@ -135,13 +130,17 @@ where Will emit lua output and errors to stdout and stderr. */ - pub async fn run_to_completion(&self) -> ExitCode { + pub async fn run_to_completion(&self, lua: &Lua) -> ExitCode { + if let Some(code) = self.state.exit_code() { + return ExitCode::from(code); + } + let set = LocalSet::new(); let _guard = set.enter(); loop { // 1. Run lua threads until exit or there are none left - self.run_lua_threads(); + self.run_lua_threads(lua); // 2. If we got a manual exit code from lua we should // not try to wait for any pending futures to complete diff --git a/src/lune/scheduler/impl_threads.rs b/src/lune/scheduler/impl_threads.rs index 165a37b..b72145e 100644 --- a/src/lune/scheduler/impl_threads.rs +++ b/src/lune/scheduler/impl_threads.rs @@ -7,10 +7,7 @@ use super::{ IntoLuaThread, Scheduler, }; -impl<'lua, 'fut> Scheduler<'lua, 'fut> -where - 'lua: 'fut, -{ +impl<'fut> Scheduler<'fut> { /** Checks if there are any lua threads to run. */ @@ -43,11 +40,16 @@ where /** Schedules the `thread` to be resumed with the given [`LuaError`]. */ - pub fn push_err<'a>(&'a self, thread: impl IntoLuaThread<'a>, err: LuaError) -> LuaResult<()> { - let thread = thread.into_lua_thread(self.lua)?; + pub fn push_err<'a>( + &self, + lua: &'a Lua, + thread: impl IntoLuaThread<'a>, + err: LuaError, + ) -> LuaResult<()> { + let thread = thread.into_lua_thread(lua)?; let args = LuaMultiValue::new(); // Will be resumed with error, don't need real args - let thread = SchedulerThread::new(self.lua, thread, args); + let thread = SchedulerThread::new(lua, thread, args); let thread_id = thread.id(); self.state.set_thread_error(thread_id, err); @@ -71,14 +73,15 @@ where right away, before any other currently scheduled threads. */ pub fn push_front<'a>( - &'a self, + &self, + lua: &'a Lua, thread: impl IntoLuaThread<'a>, args: impl IntoLuaMulti<'a>, ) -> LuaResult { - let thread = thread.into_lua_thread(self.lua)?; - let args = args.into_lua_multi(self.lua)?; + let thread = thread.into_lua_thread(lua)?; + let args = args.into_lua_multi(lua)?; - let thread = SchedulerThread::new(self.lua, thread, args); + let thread = SchedulerThread::new(lua, thread, args); let thread_id = thread.id(); self.threads @@ -109,14 +112,15 @@ where after all other current threads have been resumed. */ pub fn push_back<'a>( - &'a self, + &self, + lua: &'a Lua, thread: impl IntoLuaThread<'a>, args: impl IntoLuaMulti<'a>, ) -> LuaResult { - let thread = thread.into_lua_thread(self.lua)?; - let args = args.into_lua_multi(self.lua)?; + let thread = thread.into_lua_thread(lua)?; + let args = args.into_lua_multi(lua)?; - let thread = SchedulerThread::new(self.lua, thread, args); + let thread = SchedulerThread::new(lua, thread, args); let thread_id = thread.id(); self.threads @@ -145,10 +149,11 @@ where /** Waits for the given thread to finish running, and returns its result. */ - pub async fn wait_for_thread( + pub async fn wait_for_thread<'a>( &self, + lua: &'a Lua, thread_id: SchedulerThreadId, - ) -> LuaResult> { + ) -> LuaResult> { let mut recv = { let senders = self.thread_senders.borrow(); let sender = senders @@ -163,8 +168,7 @@ where match res { Err(e) => Err(e), Ok(k) => { - let vals = self - .lua + let vals = lua .registry_value::>(&k) .expect("Received invalid registry key for thread"); @@ -172,8 +176,7 @@ where // up registry values on its own, but doing this will add // some extra safety and clean up registry values faster if let Some(key) = Arc::into_inner(k) { - self.lua - .remove_registry_value(key) + lua.remove_registry_value(key) .expect("Failed to remove registry key for thread"); } diff --git a/src/lune/scheduler/mod.rs b/src/lune/scheduler/mod.rs index 6f5030f..7fb417d 100644 --- a/src/lune/scheduler/mod.rs +++ b/src/lune/scheduler/mod.rs @@ -37,8 +37,7 @@ type SchedulerFuture<'fut> = Pin + 'fut>>; and data will remain unchanged and accessible from all clones. */ #[derive(Debug, Clone)] -pub(crate) struct Scheduler<'lua, 'fut> { - lua: &'lua Lua, +pub(crate) struct Scheduler<'fut> { state: Arc, threads: Arc>>, thread_senders: Arc>>, @@ -47,23 +46,33 @@ pub(crate) struct Scheduler<'lua, 'fut> { futures_break_signal: Sender<()>, } -impl<'lua, 'fut> Scheduler<'lua, 'fut> { - pub fn new(lua: &'lua Lua) -> Self { +impl<'fut> Scheduler<'fut> { + /** + Creates a new scheduler. + */ + pub fn new() -> Self { let (futures_break_signal, _) = channel(1); - let this = Self { - lua, + Self { state: Arc::new(SchedulerState::new()), threads: Arc::new(RefCell::new(VecDeque::new())), thread_senders: Arc::new(RefCell::new(HashMap::new())), futures_lua: Arc::new(AsyncMutex::new(FuturesUnordered::new())), futures_background: Arc::new(AsyncMutex::new(FuturesUnordered::new())), futures_break_signal, - }; + } + } + /** + Sets the luau interrupt for this scheduler. + + This will propagate errors from any lua-spawned + futures back to the lua threads that spawned them. + */ + pub fn set_interrupt_for(&self, lua: &Lua) { // Propagate errors given to the scheduler back to their lua threads // FUTURE: Do profiling and anything else we need inside of this interrupt - let state = this.state.clone(); + let state = self.state.clone(); lua.set_interrupt(move |_| { if let Some(id) = state.get_current_thread_id() { if let Some(err) = state.get_thread_error(id) { @@ -72,10 +81,15 @@ impl<'lua, 'fut> Scheduler<'lua, 'fut> { } Ok(LuaVmState::Continue) }); - - this } + /** + Sets the exit code for the scheduler. + + This will stop the scheduler from resuming any more lua threads or futures. + + Panics if the exit code is set more than once. + */ pub fn set_exit_code(&self, code: impl Into) { assert!( self.state.exit_code().is_none(), diff --git a/src/lune/scheduler/traits.rs b/src/lune/scheduler/traits.rs index c3f2c4b..f95c253 100644 --- a/src/lune/scheduler/traits.rs +++ b/src/lune/scheduler/traits.rs @@ -13,7 +13,12 @@ return yield() for access to the scheduler without having to import it or handle registry / app data references manually. */ -pub trait LuaSchedulerExt<'lua> { +pub(crate) trait LuaSchedulerExt<'lua> { + /** + Sets the scheduler for the [`Lua`] struct. + */ + fn set_scheduler(&'lua self, scheduler: &'lua Scheduler); + /** Creates a function callable from Lua that runs an async closure and returns the results of it to the call site. @@ -33,6 +38,11 @@ impl<'lua> LuaSchedulerExt<'lua> for Lua where 'lua: 'static, { + fn set_scheduler(&'lua self, scheduler: &'lua Scheduler) { + self.set_app_data(scheduler); + scheduler.set_interrupt_for(self); + } + fn create_async_function(&'lua self, func: F) -> LuaResult> where A: FromLuaMulti<'lua>, @@ -40,6 +50,9 @@ where F: Fn(&'lua Lua, A) -> FR + 'lua, FR: Future> + 'lua, { + self.app_data_ref::<&Scheduler>() + .expect("Lua must have a scheduler to create async functions"); + let async_env = self.create_table_with_capacity(0, 2)?; async_env.set( @@ -57,7 +70,7 @@ where let sched = lua .app_data_ref::<&Scheduler>() .expect("Lua struct is missing scheduler"); - sched.spawn_thread(thread, future)?; + sched.spawn_thread(lua, thread, future)?; Ok(()) }), )?;