diff --git a/src/smol/executor.rs b/src/smol/executor.rs deleted file mode 100644 index cd3aed2..0000000 --- a/src/smol/executor.rs +++ /dev/null @@ -1,73 +0,0 @@ -use std::{rc::Rc, sync::Arc}; - -use mlua::prelude::*; -use smol::*; - -struct LuaSmol<'ex> { - lua_exec: Rc>, - main_exec: Arc>, -} - -// HACK: self_cell is not actually used to make a self-referential struct here, -// it is instead used to guarantee the lifetime of the executors. It does not -// need to refer to Lua during construction at all but the end result is the -// same and we let the self_cell crate handle all the unsafe code for us. -self_cell::self_cell!( - struct LuaExecutorInner { - owner: Rc, - - #[not_covariant] - dependent: LuaSmol, - } -); - -impl LuaExecutorInner { - fn create(lua: Rc) -> Self { - LuaExecutorInner::new(lua, |_| { - let lua_exec = Rc::new(LocalExecutor::new()); - let main_exec = Arc::new(Executor::new()); - LuaSmol { - lua_exec, - main_exec, - } - }) - } -} - -pub struct LuaExecutor { - _lua: Rc, - inner: LuaExecutorInner, -} - -impl LuaExecutor { - pub fn new(lua: Rc) -> Self { - Self { - _lua: Rc::clone(&lua), - inner: LuaExecutorInner::create(lua), - } - } - - pub fn run<'outer_fn, F>(&'outer_fn self, futures_spawner: F) -> LuaResult<()> - where - F: for<'lua> FnOnce( - &'lua Lua, - &'outer_fn LocalExecutor<'lua>, - &'outer_fn Executor<'lua>, - ) -> LuaResult<()>, - { - self.inner.with_dependent(|lua, rt_executors| { - // 1. Spawn futures using the provided function - let lua_exec = &rt_executors.lua_exec; - let main_exec = &rt_executors.main_exec; - futures_spawner(lua, lua_exec, main_exec)?; - // 2. Run them all until lua executor completes - block_on(main_exec.run(async { - while !lua_exec.is_empty() { - lua_exec.tick().await; - } - })); - // 3. Yay! - Ok(()) - }) - } -} diff --git a/src/smol/mod.rs b/src/smol/mod.rs index d1c9cd2..d67eeb9 100644 --- a/src/smol/mod.rs +++ b/src/smol/mod.rs @@ -1,25 +1,38 @@ -use std::{ - rc::Rc, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; use mlua::prelude::*; use smol::*; -const NUM_TEST_BATCHES: usize = 20; -const NUM_TEST_THREADS: usize = 50_000; - const MAIN_CHUNK: &str = r#" -wait(0.01 * math.random()) +for i = 1, 5 do + print("iteration " .. tostring(i) .. " of 5") + local thread = coroutine.running() + local counter = 0 + for j = 1, 10_000 do + __scheduler__spawn(function() + wait(0.1 * math.random()) + counter += 1 + if counter == 10_000 then + print("completed iteration " .. tostring(i) .. " of 5") + end + end) + end + coroutine.yield() -- FIXME: This resumes instantly with mlua "async" feature +end "#; -mod executor; -use executor::*; +mod thread_runtime; +mod thread_storage; +mod thread_util; + +use thread_runtime::*; +use thread_storage::*; pub fn main() -> LuaResult<()> { - let lua = Rc::new(Lua::new()); - let rt = LuaExecutor::new(Rc::clone(&lua)); + let start = Instant::now(); + let lua = Lua::new(); + // Set up persistent lua environment lua.globals().set( "wait", lua.create_async_function(|_, duration: f64| async move { @@ -29,28 +42,10 @@ pub fn main() -> LuaResult<()> { })?, )?; - let start = Instant::now(); - let main_fn = lua.load(MAIN_CHUNK).into_function()?; - - for _ in 0..NUM_TEST_BATCHES { - rt.run(|lua, lua_exec, _| { - // TODO: Figure out how to create a scheduler queue that we can - // append threads to, both front and back, and resume them in order - - for _ in 0..NUM_TEST_THREADS { - let thread = lua.create_thread(main_fn.clone())?; - let task = lua_exec.spawn(async move { - if let Err(err) = thread.into_async::<_, ()>(()).await { - println!("error: {}", err); - } - Ok::<_, LuaError>(()) - }); - task.detach(); - } - - Ok(()) - })?; - } + // Set up runtime (thread queue / async executors) and run main script until end + let rt = ThreadRuntime::new(&lua)?; + rt.push_main(&lua, lua.load(MAIN_CHUNK), ()); + rt.run_blocking(&lua); println!("elapsed: {:?}", start.elapsed()); diff --git a/src/smol/thread_runtime.rs b/src/smol/thread_runtime.rs new file mode 100644 index 0000000..7cf4fac --- /dev/null +++ b/src/smol/thread_runtime.rs @@ -0,0 +1,167 @@ +use std::{collections::VecDeque, rc::Rc}; + +use mlua::prelude::*; +use smol::{ + channel::{Receiver, Sender}, + future::race, + lock::Mutex, + *, +}; + +use super::{ + thread_util::{IntoLuaThread, LuaThreadOrFunction}, + ThreadWithArgs, +}; + +pub struct ThreadRuntime { + queue: Rc>>, + tx: Sender<()>, + rx: Receiver<()>, +} + +impl ThreadRuntime { + /** + Creates a new runtime for the given Lua state. + + This will inject some functions to interact with the scheduler / executor. + */ + pub fn new(lua: &Lua) -> LuaResult { + let queue = Rc::new(Mutex::new(VecDeque::new())); + let (tx, rx) = channel::unbounded(); + + // Create spawn function (push to start of queue) + let queue_spawn = Rc::clone(&queue); + let tx_spawn = tx.clone(); + let fn_spawn = lua.create_function( + move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| { + let thread = tof.into_thread(lua)?; + if thread.status() == LuaThreadStatus::Resumable { + let stored = ThreadWithArgs::new(lua, thread.clone(), args); + queue_spawn.lock_blocking().push_front(stored); + tx_spawn.try_send(()).map_err(|_| { + LuaError::runtime("Tried to spawn thread to a dropped queue") + })?; + Ok(thread) + } else { + Err(LuaError::runtime("Tried to spawn non-resumable thread")) + } + }, + )?; + + // Create defer function (push to end of queue) + let queue_defer = Rc::clone(&queue); + let tx_defer = tx.clone(); + let fn_defer = lua.create_function( + move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| { + let thread = tof.into_thread(lua)?; + if thread.status() == LuaThreadStatus::Resumable { + let stored = ThreadWithArgs::new(lua, thread.clone(), args); + queue_defer.lock_blocking().push_back(stored); + tx_defer.try_send(()).map_err(|_| { + LuaError::runtime("Tried to defer thread to a dropped queue") + })?; + Ok(thread) + } else { + Err(LuaError::runtime("Tried to defer non-resumable thread")) + } + }, + )?; + + // FUTURE: Store these as named registry values instead + // so that they are not accessible from within user code + lua.globals().set("__scheduler__spawn", fn_spawn)?; + lua.globals().set("__scheduler__defer", fn_defer)?; + + Ok(ThreadRuntime { queue, tx, rx }) + } + + /** + Pushes a chunk / function / thread to the front of the runtime. + */ + pub fn push_main<'lua>( + &self, + lua: &'lua Lua, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) { + let thread = thread + .into_lua_thread(lua) + .expect("failed to create thread"); + let args = args.into_lua_multi(lua).expect("failed to create args"); + + let stored = ThreadWithArgs::new(lua, thread, args); + + self.queue.lock_blocking().push_front(stored); + self.tx.try_send(()).unwrap(); + } + + /** + Runs the runtime until all Lua threads have completed. + + Note that the given Lua state must be the same one that was + used to create this runtime, otherwise this method may panic. + */ + pub async fn run_async(&self, lua: &Lua) { + // Create new executors to use + let lua_exec = LocalExecutor::new(); + let main_exec = Executor::new(); + + // Tick local lua executor while also driving main + // executor forward, until all lua threads finish + let fut = async { + loop { + let did_spawn = race( + // Wait for next futures step... + async { + lua_exec.tick().await; + false + }, + // ...or for a new thread to arrive + async { + self.rx.recv().await.ok(); + true + }, + ) + .await; + + // If a new thread was spawned onto queue, we + // must drain it and schedule on the executor + if did_spawn { + let queued_threads = self.queue.lock().await.drain(..).collect::>(); + for queued_thread in queued_threads { + // NOTE: Thread may have been cancelled from lua + // before we got here, so we need to check it again + let (thread, args) = queued_thread.into_inner(lua); + if thread.status() == LuaThreadStatus::Resumable { + let fut = thread.into_async::<_, ()>(args); + lua_exec + .spawn(async move { + match fut.await { + Ok(()) => {} + Err(e) => eprintln!("{e}"), + } + }) + .detach(); + } + } + } + + // Empty executor = no remaining threads + if lua_exec.is_empty() { + break; + } + } + }; + + main_exec.run(fut).await + } + + /** + Runs the runtime until all Lua threads have completed, blocking the thread. + + See [`ThreadRuntime::run_async`] for more info. + */ + pub fn run_blocking(&self, lua: &Lua) { + block_on(self.run_async(lua)) + } +} diff --git a/src/smol/thread_storage.rs b/src/smol/thread_storage.rs new file mode 100644 index 0000000..cb51c6a --- /dev/null +++ b/src/smol/thread_storage.rs @@ -0,0 +1,43 @@ +use mlua::prelude::*; + +#[derive(Debug)] +pub struct ThreadWithArgs { + key_thread: LuaRegistryKey, + key_args: LuaRegistryKey, +} + +impl ThreadWithArgs { + pub fn new<'lua>(lua: &'lua Lua, thread: LuaThread<'lua>, args: LuaMultiValue<'lua>) -> Self { + let args_vec = args.into_vec(); + + let key_thread = lua + .create_registry_value(thread) + .expect("Failed to store thread in registry - out of memory"); + let key_args = lua + .create_registry_value(args_vec) + .expect("Failed to store thread args in registry - out of memory"); + + Self { + key_thread, + key_args, + } + } + + pub fn into_inner(self, lua: &Lua) -> (LuaThread<'_>, LuaMultiValue<'_>) { + let thread = lua + .registry_value(&self.key_thread) + .expect("Failed to get thread from registry"); + let args_vec = lua + .registry_value(&self.key_args) + .expect("Failed to get thread args from registry"); + + let args = LuaMultiValue::from_vec(args_vec); + + lua.remove_registry_value(self.key_thread) + .expect("Failed to remove thread from registry"); + lua.remove_registry_value(self.key_args) + .expect("Failed to remove thread args from registry"); + + (thread, args) + } +} diff --git a/src/smol/thread_util.rs b/src/smol/thread_util.rs new file mode 100644 index 0000000..43e0858 --- /dev/null +++ b/src/smol/thread_util.rs @@ -0,0 +1,68 @@ +use mlua::prelude::*; + +/** + Wrapper struct to accept either a Lua thread or a Lua function as function argument. + + [`LuaThreadOrFunction::into_thread`] may be used to convert the value into a Lua thread. +*/ +#[derive(Clone)] +pub enum LuaThreadOrFunction<'lua> { + Thread(LuaThread<'lua>), + Function(LuaFunction<'lua>), +} + +impl<'lua> LuaThreadOrFunction<'lua> { + pub(super) fn into_thread(self, lua: &'lua Lua) -> LuaResult> { + match self { + Self::Thread(t) => Ok(t), + Self::Function(f) => lua.create_thread(f), + } + } +} + +impl<'lua> FromLua<'lua> for LuaThreadOrFunction<'lua> { + fn from_lua(value: LuaValue<'lua>, _: &'lua Lua) -> LuaResult { + match value { + LuaValue::Thread(t) => Ok(Self::Thread(t)), + LuaValue::Function(f) => Ok(Self::Function(f)), + value => Err(LuaError::FromLuaConversionError { + from: value.type_name(), + to: "LuaThreadOrFunction", + message: Some("Expected thread or function".to_string()), + }), + } + } +} + +/** + Trait for any struct that can be turned into an [`LuaThread`] + and given to the scheduler, implemented for the following types: + + - Lua threads ([`LuaThread`]) + - Lua functions ([`LuaFunction`]) + - Lua chunks ([`LuaChunk`]) +*/ +pub trait IntoLuaThread<'lua> { + /** + Converts the value into a Lua thread. + */ + fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult>; +} + +impl<'lua> IntoLuaThread<'lua> for LuaThread<'lua> { + fn into_lua_thread(self, _: &'lua Lua) -> LuaResult> { + Ok(self) + } +} + +impl<'lua> IntoLuaThread<'lua> for LuaFunction<'lua> { + fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult> { + lua.create_thread(self) + } +} + +impl<'lua, 'a> IntoLuaThread<'lua> for LuaChunk<'lua, 'a> { + fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult> { + lua.create_thread(self.into_function()?) + } +}