From 4fa76aa27f9fd5ef0300c571c857f1726d75fc64 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Wed, 16 Aug 2023 22:00:15 -0500 Subject: [PATCH] Implement thread return value broadcasting in scheduler --- src/lune/scheduler/impl_runner.rs | 86 +++++++++++++++++++----------- src/lune/scheduler/impl_threads.rs | 66 +++++++++++++++++++---- src/lune/scheduler/mod.rs | 14 ++++- src/lune/scheduler/thread.rs | 36 +++++++++++++ 4 files changed, 161 insertions(+), 41 deletions(-) diff --git a/src/lune/scheduler/impl_runner.rs b/src/lune/scheduler/impl_runner.rs index 3abdee0..f822657 100644 --- a/src/lune/scheduler/impl_runner.rs +++ b/src/lune/scheduler/impl_runner.rs @@ -1,64 +1,90 @@ -use std::process::ExitCode; +use std::{process::ExitCode, sync::Arc}; use mlua::prelude::*; +use tokio::task::LocalSet; use super::SchedulerImpl; impl SchedulerImpl { /** - Runs all lua threads to completion, gathering any results they produce. - */ - fn run_threads(&self) -> Vec> { - let mut results = Vec::new(); + Runs all lua threads to completion. - while let Some((thread, args)) = self + Returns `true` if any thread was resumed, `false` otherwise. + */ + fn run_lua_threads(&self) -> bool { + if self.state.has_exit_code() { + return false; + } + + let mut resumed_any = false; + + while let Some((thread, args, sender)) = self .pop_thread() .expect("Failed to pop thread from scheduler") { - let res = thread.resume(args); + let res = thread.resume::<_, LuaMultiValue>(args); self.state.add_resumption(); + resumed_any = true; - if let Err(e) = &res { + if let Err(err) = &res { self.state.add_error(); - eprintln!("{e}"); // TODO: Pretty print the lua error here + eprint!("{err}"); // TODO: Pretty print the lua error here } - results.push(res); + if sender.receiver_count() > 0 { + sender + .send(res.map(|v| { + Arc::new( + self.lua + .create_registry_value(v.into_vec()) + .expect("Failed to store return values in registry"), + ) + })) + .expect("Failed to broadcast return values of thread"); + } if self.state.has_exit_code() { break; } } - results + resumed_any } /** - Runs the scheduler to completion, both normal lua threads and futures. + Runs the scheduler to completion in a [`LocalSet`], + both normal lua threads and futures, prioritizing + lua threads over completion of any pending futures. - This will emit lua output and errors to stdout and stderr. + Will emit lua output and errors to stdout and stderr. */ pub async fn run_to_completion(&self) -> ExitCode { - loop { - // 1. Run lua threads until exit or there are none left - let results = self.run_threads(); + let fut = async move { + loop { + // 1. Run lua threads until exit or there are none left, + // if any thread was resumed it may have spawned futures + let resumed_lua = self.run_lua_threads(); - // 2. If we got a manual exit code from lua we should not continue - if self.state.has_exit_code() { - break; + // 2. If we got a manual exit code from lua we should + // not try to wait for any pending futures to complete + if self.state.has_exit_code() { + break; + } + + // 3. Wait for the next future to complete, this may + // add more lua threads to run in the next iteration + + // TODO: Implement futures resumption + + // 4. If we did not resume any lua threads, and we have no futures + // queued either, we have now run the scheduler until completion + if !resumed_lua { + break; + } } + }; - // 3. Wait for the next future to complete, this may - // add more lua threads to run in the next iteration - - // TODO: Implement this - - // 4. If did not resume any lua threads, and we have no futures - // queued either, we have run the scheduler until completion - if results.is_empty() { - break; - } - } + LocalSet::new().run_until(fut).await; if let Some(code) = self.state.exit_code() { ExitCode::from(code) diff --git a/src/lune/scheduler/impl_threads.rs b/src/lune/scheduler/impl_threads.rs index d17873b..7343595 100644 --- a/src/lune/scheduler/impl_threads.rs +++ b/src/lune/scheduler/impl_threads.rs @@ -1,6 +1,10 @@ use mlua::prelude::*; -use super::{thread::SchedulerThread, traits::IntoLuaThread, SchedulerImpl}; +use super::{ + thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender}, + traits::IntoLuaThread, + SchedulerImpl, +}; impl<'lua> SchedulerImpl { /** @@ -10,7 +14,7 @@ impl<'lua> SchedulerImpl { */ pub(super) fn pop_thread( &'lua self, - ) -> LuaResult, LuaMultiValue<'lua>)>> { + ) -> LuaResult, LuaMultiValue<'lua>, SchedulerThreadSender)>> { match self .threads .try_borrow_mut() @@ -19,8 +23,14 @@ impl<'lua> SchedulerImpl { .pop_front() { Some(thread) => { + let thread_id = &thread.id(); let (thread, args) = thread.into_inner(&self.lua); - Ok(Some((thread, args))) + let sender = self + .thread_senders + .borrow_mut() + .remove(thread_id) + .expect("Missing thread sender"); + Ok(Some((thread, args, sender))) } None => Ok(None), } @@ -34,17 +44,23 @@ impl<'lua> SchedulerImpl { &'lua self, thread: impl IntoLuaThread<'lua>, args: impl IntoLuaMulti<'lua>, - ) -> LuaResult<()> { + ) -> LuaResult { let thread = thread.into_lua_thread(&self.lua)?; let args = args.into_lua_multi(&self.lua)?; + let thread = SchedulerThread::new(&self.lua, thread, args)?; + let thread_id = thread.id(); + self.threads .try_borrow_mut() .into_lua_err() .context("Failed to borrow threads vec")? - .push_front(SchedulerThread::new(&self.lua, thread, args)?); + .push_front(thread); + self.thread_senders + .borrow_mut() + .insert(thread_id, SchedulerThreadSender::new(1)); - Ok(()) + Ok(thread_id) } /** @@ -55,16 +71,48 @@ impl<'lua> SchedulerImpl { &'lua self, thread: impl IntoLuaThread<'lua>, args: impl IntoLuaMulti<'lua>, - ) -> LuaResult<()> { + ) -> LuaResult { let thread = thread.into_lua_thread(&self.lua)?; let args = args.into_lua_multi(&self.lua)?; + let thread = SchedulerThread::new(&self.lua, thread, args)?; + let thread_id = thread.id(); + self.threads .try_borrow_mut() .into_lua_err() .context("Failed to borrow threads vec")? - .push_back(SchedulerThread::new(&self.lua, thread, args)?); + .push_back(thread); + self.thread_senders + .borrow_mut() + .insert(thread_id, SchedulerThreadSender::new(1)); - Ok(()) + Ok(thread_id) + } + + /** + Waits for the given thread to finish running, and returns its result. + */ + pub async fn wait_for_thread( + &'lua self, + thread_id: SchedulerThreadId, + ) -> LuaResult> { + let mut recv = { + let senders = self.thread_senders.borrow(); + let sender = senders + .get(&thread_id) + .expect("Tried to wait for thread that is not queued"); + sender.subscribe() + }; + match recv.recv().await.expect("Failed to receive thread result") { + Err(e) => Err(e), + Ok(k) => { + let vals = self + .lua + .registry_value::>(&k) + .expect("Received invalid registry key for thread"); + Ok(LuaMultiValue::from_vec(vals)) + } + } } } diff --git a/src/lune/scheduler/mod.rs b/src/lune/scheduler/mod.rs index acc814c..0ce3de7 100644 --- a/src/lune/scheduler/mod.rs +++ b/src/lune/scheduler/mod.rs @@ -1,4 +1,9 @@ -use std::{cell::RefCell, collections::VecDeque, ops::Deref, sync::Arc}; +use std::{ + cell::RefCell, + collections::{HashMap, VecDeque}, + ops::Deref, + sync::Arc, +}; use mlua::prelude::*; @@ -9,7 +14,10 @@ mod traits; mod impl_runner; mod impl_threads; -use self::{state::SchedulerState, thread::SchedulerThread}; +use self::{ + state::SchedulerState, + thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender}, +}; /** Scheduler for Lua threads. @@ -57,6 +65,7 @@ pub struct SchedulerImpl { lua: Arc, state: SchedulerState, threads: RefCell>, + thread_senders: RefCell>, } impl SchedulerImpl { @@ -65,6 +74,7 @@ impl SchedulerImpl { lua, state: SchedulerState::new(), threads: RefCell::new(VecDeque::new()), + thread_senders: RefCell::new(HashMap::new()), } } } diff --git a/src/lune/scheduler/thread.rs b/src/lune/scheduler/thread.rs index da43880..9ddd5a7 100644 --- a/src/lune/scheduler/thread.rs +++ b/src/lune/scheduler/thread.rs @@ -1,10 +1,38 @@ +use std::sync::Arc; + use mlua::prelude::*; +use rand::Rng; +use tokio::sync::broadcast::Sender; + +/** + Type alias for a broadcast [`Sender`], which will + broadcast the result and return values of a lua thread. + + The return values are stored in the lua registry as a + `Vec>`, and the registry key pointing to + those values will be sent using the broadcast sender. +*/ +pub type SchedulerThreadSender = Sender>>; + +/** + Unique, randomly generated id for a scheduler thread. +*/ +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub struct SchedulerThreadId(u128); + +impl SchedulerThreadId { + fn gen() -> Self { + // FUTURE: Use a faster rng here? + Self(rand::thread_rng().gen()) + } +} /** Container for registry keys that point to a thread and thread arguments. */ #[derive(Debug)] pub(super) struct SchedulerThread { + scheduler_id: SchedulerThreadId, key_thread: LuaRegistryKey, key_args: LuaRegistryKey, } @@ -30,6 +58,7 @@ impl SchedulerThread { .context("Failed to store value in registry")?; Ok(Self { + scheduler_id: SchedulerThreadId::gen(), key_thread, key_args, }) @@ -55,4 +84,11 @@ impl SchedulerThread { (thread, args) } + + /** + Retrieves the unique, randomly generated id for this scheduler thread. + */ + pub(super) fn id(&self) -> SchedulerThreadId { + self.scheduler_id + } }