From 5e076f005244dd4d82248da0c8174dd1ce779438 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Fri, 18 Aug 2023 09:20:35 -0500 Subject: [PATCH] Scheduler now manages entire Lua struct, elide lifetimes where possible --- src/lune/mod.rs | 20 ++------ src/lune/scheduler/impl_async.rs | 11 ++-- src/lune/scheduler/impl_runner.rs | 81 ++++++++++++++++++++---------- src/lune/scheduler/impl_threads.rs | 24 ++++----- src/lune/scheduler/mod.rs | 46 +++-------------- src/lune/scheduler/traits.rs | 13 ----- 6 files changed, 81 insertions(+), 114 deletions(-) diff --git a/src/lune/mod.rs b/src/lune/mod.rs index c625719..b46e023 100644 --- a/src/lune/mod.rs +++ b/src/lune/mod.rs @@ -1,6 +1,4 @@ -use std::{process::ExitCode, sync::Arc}; - -use mlua::prelude::*; +use std::process::ExitCode; mod error; mod scheduler; @@ -41,18 +39,8 @@ impl Lune { script_name: impl AsRef, script_contents: impl AsRef<[u8]>, ) -> Result { - let lua = Arc::new(Lua::new()); - let sched = Scheduler::new(Arc::clone(&lua)); - - let main_fn = lua - .load(script_contents.as_ref()) - .set_name(script_name.as_ref()) - .into_function()?; - let main_thread = lua.create_thread(main_fn)?.into_owned(); - - sched - .push_back(main_thread, ()) - .expect("Failed to enqueue thread for main"); - Ok(sched.run_to_completion().await) + Ok(Scheduler::new() + .run_main(script_name, script_contents) + .await) } } diff --git a/src/lune/scheduler/impl_async.rs b/src/lune/scheduler/impl_async.rs index 7e1ca67..65c1970 100644 --- a/src/lune/scheduler/impl_async.rs +++ b/src/lune/scheduler/impl_async.rs @@ -1,16 +1,16 @@ use futures_util::Future; use mlua::prelude::*; -use super::SchedulerImpl; +use super::Scheduler; -impl<'lua, 'fut> SchedulerImpl<'fut> +impl<'lua, 'fut> Scheduler<'fut> where 'lua: 'fut, { /** Schedules a plain future to run whenever the scheduler is available. */ - pub fn schedule_future(&'lua self, fut: F) + pub fn schedule_future(&'fut self, fut: F) where F: 'fut + Future, { @@ -24,10 +24,9 @@ where /** Schedules the given `thread` to run when the given `fut` completes. */ - pub fn schedule_future_thread(&'lua self, thread: LuaOwnedThread, fut: F) -> LuaResult<()> + pub fn schedule_future_thread(&'fut self, thread: LuaOwnedThread, fut: F) -> LuaResult<()> where - R: IntoLuaMulti<'fut>, - F: 'fut + Future>, + F: 'fut + Future>>, { self.schedule_future(async move { let rets = fut.await.expect("Failed to receive result"); diff --git a/src/lune/scheduler/impl_runner.rs b/src/lune/scheduler/impl_runner.rs index 878bde7..ed84b92 100644 --- a/src/lune/scheduler/impl_runner.rs +++ b/src/lune/scheduler/impl_runner.rs @@ -2,11 +2,14 @@ use std::{process::ExitCode, sync::Arc}; use futures_util::StreamExt; use mlua::prelude::*; + use tokio::task::LocalSet; -use super::SchedulerImpl; +use super::{IntoLuaOwnedThread, Scheduler}; -impl<'lua, 'fut> SchedulerImpl<'fut> +const EMPTY_MULTI_VALUE: LuaMultiValue = LuaMultiValue::new(); + +impl<'lua, 'fut> Scheduler<'fut> where 'lua: 'fut, { @@ -15,7 +18,7 @@ where Returns `true` if any thread was resumed, `false` otherwise. */ - fn run_lua_threads(&'lua self) -> bool { + fn run_lua_threads(&self) -> bool { if self.state.has_exit_code() { return false; } @@ -60,7 +63,7 @@ where Returns `true` if any future was resumed, `false` otherwise. */ - async fn run_futures(&'lua self) -> bool { + async fn run_futures(&self) -> bool { let mut resumed_any = false; let mut futs = self @@ -84,32 +87,31 @@ where Will emit lua output and errors to stdout and stderr. */ - pub async fn run_to_completion(&'lua self) -> ExitCode { - let fut = async move { - loop { - // 1. Run lua threads until exit or there are none left, - // if any thread was resumed it may have spawned futures - let resumed_lua = self.run_lua_threads(); + pub async fn run_to_completion(&self) -> ExitCode { + let set = LocalSet::new(); + let _guard = set.enter(); - // 2. If we got a manual exit code from lua we should - // not try to wait for any pending futures to complete - if self.state.has_exit_code() { - break; - } + loop { + // 1. Run lua threads until exit or there are none left, + // if any thread was resumed it may have spawned futures + let resumed_lua = self.run_lua_threads(); - // 3. Keep resuming futures until we get a new lua thread to - // resume, or until we don't have any futures left to wait for - let resumed_fut = self.run_futures().await; - - // 4. If we did not resume any lua threads, and we have no futures - // remaining either, we have now run the scheduler until completion - if !resumed_lua && !resumed_fut { - break; - } + // 2. If we got a manual exit code from lua we should + // not try to wait for any pending futures to complete + if self.state.has_exit_code() { + break; } - }; - LocalSet::new().run_until(fut).await; + // 3. Keep resuming futures until we get a new lua thread to + // resume, or until we don't have any futures left to wait for + let resumed_fut = self.run_futures().await; + + // 4. If we did not resume any lua threads, and we have no futures + // remaining either, we have now run the scheduler until completion + if !resumed_lua && !resumed_fut { + break; + } + } if let Some(code) = self.state.exit_code() { ExitCode::from(code) @@ -119,4 +121,31 @@ where ExitCode::SUCCESS } } + + /** + Runs a script with the given `script_name` and `script_contents` to completion. + + Refer to [`run_to_completion`] for additional details. + */ + pub async fn run_main( + self, + script_name: impl AsRef, + script_contents: impl AsRef<[u8]>, + ) -> ExitCode { + let main_fn = self + .lua + .load(script_contents.as_ref()) + .set_name(script_name.as_ref()) + .into_function() + .expect("Failed to create function for main"); + + let main_thread = main_fn + .into_owned_lua_thread(&self.lua) + .expect("Failed to create thread for main"); + + self.push_back(main_thread, EMPTY_MULTI_VALUE) + .expect("Failed to enqueue thread for main"); + + self.run_to_completion().await + } } diff --git a/src/lune/scheduler/impl_threads.rs b/src/lune/scheduler/impl_threads.rs index b7d2b94..b7dea8a 100644 --- a/src/lune/scheduler/impl_threads.rs +++ b/src/lune/scheduler/impl_threads.rs @@ -4,10 +4,10 @@ use mlua::prelude::*; use super::{ thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender}, - SchedulerImpl, + Scheduler, }; -impl<'lua, 'fut> SchedulerImpl<'fut> +impl<'lua, 'fut> Scheduler<'fut> where 'lua: 'fut, { @@ -28,8 +28,8 @@ where Returns `None` if there are no threads left to run. */ pub(super) fn pop_thread( - &'lua self, - ) -> LuaResult, SchedulerThreadSender)>> { + &self, + ) -> LuaResult, SchedulerThreadSender)>> { match self .threads .try_borrow_mut() @@ -56,12 +56,10 @@ where right away, before any other currently scheduled threads. */ pub fn push_front( - &'lua self, + &self, thread: LuaOwnedThread, - args: impl IntoLuaMulti<'lua>, + args: LuaMultiValue<'_>, ) -> LuaResult { - let args = args.into_lua_multi(&self.lua)?; - let thread = SchedulerThread::new(&self.lua, thread, args)?; let thread_id = thread.id(); @@ -82,12 +80,10 @@ where after all other current threads have been resumed. */ pub fn push_back( - &'lua self, + &self, thread: LuaOwnedThread, - args: impl IntoLuaMulti<'lua>, + args: LuaMultiValue<'_>, ) -> LuaResult { - let args = args.into_lua_multi(&self.lua)?; - let thread = SchedulerThread::new(&self.lua, thread, args)?; let thread_id = thread.id(); @@ -107,9 +103,9 @@ where Waits for the given thread to finish running, and returns its result. */ pub async fn wait_for_thread( - &'lua self, + &self, thread_id: SchedulerThreadId, - ) -> LuaResult> { + ) -> LuaResult> { let mut recv = { let senders = self.thread_senders.borrow(); let sender = senders diff --git a/src/lune/scheduler/mod.rs b/src/lune/scheduler/mod.rs index a6937c3..e7edc1b 100644 --- a/src/lune/scheduler/mod.rs +++ b/src/lune/scheduler/mod.rs @@ -1,9 +1,7 @@ use std::{ cell::RefCell, collections::{HashMap, VecDeque}, - ops::Deref, pin::Pin, - sync::Arc, }; use futures_util::{stream::FuturesUnordered, Future}; @@ -28,51 +26,21 @@ use self::{ /** Scheduler for Lua threads. - Can be cheaply cloned, and any clone will refer - to the same underlying scheduler and Lua struct. -*/ -#[derive(Debug, Clone)] -pub(crate) struct Scheduler<'fut> { - inner: Arc>, -} - -impl<'fut> Scheduler<'fut> { - /** - Creates a new scheduler for the given [`Lua`] struct. - */ - pub fn new(lua: Arc) -> Self { - let sched_lua = Arc::clone(&lua); - let sched_impl = SchedulerImpl::new(sched_lua); - - let inner = Arc::new(sched_impl); - - Self { inner } - } -} - -impl<'fut> Deref for Scheduler<'fut> { - type Target = SchedulerImpl<'fut>; - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -/** - Implementation of scheduler for Lua threads. - - Not meant to be used directly, use [`Scheduler`] instead. + This wraps a [`Lua`] struct and exposes it as the `lua` property. */ #[derive(Debug)] -pub(crate) struct SchedulerImpl<'fut> { - lua: Arc, +pub(crate) struct Scheduler<'fut> { + pub(crate) lua: Lua, state: SchedulerState, threads: RefCell>, thread_senders: RefCell>, futures: AsyncMutex + 'fut>>>>, } -impl<'fut> SchedulerImpl<'fut> { - fn new(lua: Arc) -> Self { +impl<'fut> Scheduler<'fut> { + pub fn new() -> Self { + let lua = Lua::new(); + Self { lua, state: SchedulerState::new(), diff --git a/src/lune/scheduler/traits.rs b/src/lune/scheduler/traits.rs index bd24567..3bbd129 100644 --- a/src/lune/scheduler/traits.rs +++ b/src/lune/scheduler/traits.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use futures_util::Future; use mlua::prelude::*; @@ -19,11 +17,6 @@ pub trait LuaSchedulerExt<'lua, 'fut> where 'lua: 'fut, { - /** - Creates a new [`Lua`] struct with a [`Scheduler`]. - */ - fn new_with_scheduler() -> Arc; - /** Creates a function callable from Lua that runs an async closure and returns the results of it to the call site. @@ -40,12 +33,6 @@ impl<'lua, 'fut> LuaSchedulerExt<'lua, 'fut> for Lua where 'lua: 'fut, { - fn new_with_scheduler() -> Arc { - let lua = Arc::new(Lua::new()); - lua.set_app_data(Scheduler::new(Arc::clone(&lua))); - lua - } - fn create_async_function(&'lua self, func: F) -> LuaResult> where A: FromLuaMulti<'lua>,