diff --git a/src/lune/scheduler/impl_async.rs b/src/lune/scheduler/impl_async.rs index 416f632..8db9e37 100644 --- a/src/lune/scheduler/impl_async.rs +++ b/src/lune/scheduler/impl_async.rs @@ -5,7 +5,7 @@ use tokio::{ task, }; -use super::{IntoLuaThread, Scheduler}; +use super::{IntoLuaThread, Scheduler, SchedulerMessage}; impl<'fut> Scheduler<'fut> { /** @@ -61,6 +61,14 @@ impl<'fut> Scheduler<'fut> { handle.await.ok(); })); + // NOTE: We might be resuming lua futures, need to signal that a + // new background future is ready to break out of futures resumption + if self.futures_signal.receiver_count() > 0 { + self.futures_signal + .send(SchedulerMessage::SpawnedBackgroundFuture) + .ok(); + } + rx } @@ -84,6 +92,14 @@ impl<'fut> Scheduler<'fut> { tx.send(res).ok(); })); + // NOTE: We might be resuming lua futures, need to signal that a + // new background future is ready to break out of futures resumption + if self.futures_signal.receiver_count() > 0 { + self.futures_signal + .send(SchedulerMessage::SpawnedBackgroundFuture) + .ok(); + } + rx } @@ -121,6 +137,14 @@ impl<'fut> Scheduler<'fut> { } })); + // NOTE: We might be resuming background futures, need to signal that a + // new background future is ready to break out of futures resumption + if self.futures_signal.receiver_count() > 0 { + self.futures_signal + .send(SchedulerMessage::SpawnedLuaFuture) + .ok(); + } + Ok(()) } } diff --git a/src/lune/scheduler/impl_runner.rs b/src/lune/scheduler/impl_runner.rs index 0f4866c..588d194 100644 --- a/src/lune/scheduler/impl_runner.rs +++ b/src/lune/scheduler/impl_runner.rs @@ -13,15 +13,13 @@ use super::Scheduler; impl<'fut> Scheduler<'fut> { /** Runs all lua threads to completion. - - Returns the number of threads that were resumed. */ - fn run_lua_threads(&self, lua: &Lua) -> usize { + fn run_lua_threads(&self, lua: &Lua) { if self.state.has_exit_code() { - return 0; + return; } - let mut resumed_count = 0; + let mut count = 0; // Pop threads from the scheduler until there are none left while let Some(thread) = self @@ -44,7 +42,7 @@ impl<'fut> Scheduler<'fut> { let res = thread.resume::<_, LuaMultiValue>(args); self.state.set_current_thread_id(None); - resumed_count += 1; + count += 1; // If we got any resumption (lua-side) error, increment // the error count of the scheduler so we can exit with @@ -79,54 +77,125 @@ impl<'fut> Scheduler<'fut> { } } - resumed_count + if count > 0 { + debug! { + %count, + "resumed lua" + } + } } /** - Runs futures until none are left or a future spawned a new lua thread. + Runs the next lua future to completion. + + Panics if no lua future is queued. */ - async fn run_futures_lua(&self) -> usize { + async fn run_future_lua(&self) { let mut futs = self .futures_lua .try_lock() .expect("Failed to lock lua futures for resumption"); - - let mut fut_count = 0; - while futs.next().await.is_some() { - fut_count += 1; - if self.has_thread() { - break; - } - } - fut_count + assert!(futs.len() > 0, "No lua futures are queued"); + futs.next().await; } /** - Runs background futures until none are left or a future spawned a new lua thread. + Runs the next background future to completion. + + Panics if no background future is queued. */ - async fn run_futures_background(&self) -> usize { + async fn run_future_background(&self) { let mut futs = self .futures_background .try_lock() .expect("Failed to lock background futures for resumption"); - - let mut fut_count = 0; - while futs.next().await.is_some() { - fut_count += 1; - if self.has_thread() { - break; - } - } - fut_count + assert!(futs.len() > 0, "No background futures are queued"); + futs.next().await; } - async fn run_futures(&self) -> usize { - let mut rx = self.futures_break_signal.subscribe(); + /** + Runs as many futures as possible, until a new lua thread + is ready, or an exit code has been set for the scheduler. - tokio::select! { - ran = self.run_futures_lua() => ran, - ran = self.run_futures_background() => ran, - _ = rx.recv() => 0, + ### Implementation details + + Running futures on our scheduler consists of a couple moving parts: + + 1. An unordered futures queue for lua (main thread, local) futures + 2. An unordered futures queue for background (multithreaded, 'static lifetime) futures + 3. A signal for breaking out of futures resumption + + The two unordered futures queues need to run concurrently, + but since `FuturesUnordered` returns instantly if it does + not currently have any futures queued on it, we need to do + this branching loop, checking if each queue has futures first. + + We also need to listen for our signal, to see if we should break out of resumption: + + * Always break out of resumption if a new lua thread is ready + * Always break out of resumption if an exit code has been set + * Break out of lua futures resumption if we have a new background future + * Break out of background futures resumption if we have a new lua future + + We need to listen for both future queues concurrently, + and break out whenever the other corresponding queue has + a new future, since the other queue may resume sooner. + */ + async fn run_futures(&self) { + let (mut has_lua, mut has_background) = self.has_futures(); + if !has_lua && !has_background { + return; + } + + let mut rx = self.futures_signal.subscribe(); + let mut count = 0; + while has_lua || has_background { + if has_lua && has_background { + tokio::select! { + _ = self.run_future_lua() => {}, + _ = self.run_future_background() => {}, + msg = rx.recv() => { + if let Ok(msg) = msg { + if msg.should_break_futures() { + break; + } + } + } + } + count += 1; + } else if has_lua { + tokio::select! { + _ = self.run_future_lua() => {}, + msg = rx.recv() => { + if let Ok(msg) = msg { + if msg.should_break_lua_futures() { + break; + } + } + } + } + count += 1; + } else if has_background { + tokio::select! { + _ = self.run_future_background() => {}, + msg = rx.recv() => { + if let Ok(msg) = msg { + if msg.should_break_background_futures() { + break; + } + } + } + } + count += 1; + } + (has_lua, has_background) = self.has_futures(); + } + + if count > 0 { + debug! { + %count, + "resumed lua futures" + } } } @@ -147,10 +216,7 @@ impl<'fut> Scheduler<'fut> { loop { // 1. Run lua threads until exit or there are none left - let lua_count = self.run_lua_threads(lua); - if lua_count > 0 { - debug!("Ran {lua_count} lua threads"); - } + self.run_lua_threads(lua); // 2. If we got a manual exit code from lua we should // not try to wait for any pending futures to complete @@ -161,10 +227,7 @@ impl<'fut> Scheduler<'fut> { // 3. Keep resuming futures until there are no futures left to // resume, or until we manually break out of resumption for any // reason, this may be because a future spawned a new lua thread - let fut_count = self.run_futures().await; - if fut_count > 0 { - debug!("Ran {fut_count} futures"); - } + self.run_futures().await; // 4. Once again, check for an exit code, in case a future sets one if self.state.has_exit_code() { @@ -180,13 +243,16 @@ impl<'fut> Scheduler<'fut> { } if let Some(code) = self.state.exit_code() { - debug!("Scheduler ran to completion, exit code {}", code); + debug! { + %code, + "scheduler ran to completion" + }; ExitCode::from(code) } else if self.state.has_errored() { - debug!("Scheduler ran to completion, with failure"); + debug!("scheduler ran to completion, with failure"); ExitCode::FAILURE } else { - debug!("Scheduler ran to completion, with success"); + debug!("scheduler ran to completion, with success"); ExitCode::SUCCESS } } diff --git a/src/lune/scheduler/impl_threads.rs b/src/lune/scheduler/impl_threads.rs index b72145e..c633cd0 100644 --- a/src/lune/scheduler/impl_threads.rs +++ b/src/lune/scheduler/impl_threads.rs @@ -4,7 +4,7 @@ use mlua::prelude::*; use super::{ thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender}, - IntoLuaThread, Scheduler, + IntoLuaThread, Scheduler, SchedulerMessage, }; impl<'fut> Scheduler<'fut> { @@ -61,8 +61,10 @@ impl<'fut> Scheduler<'fut> { // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption - if self.futures_break_signal.receiver_count() > 0 { - self.futures_break_signal.send(()).ok(); + if self.futures_signal.receiver_count() > 0 { + self.futures_signal + .send(SchedulerMessage::PushedLuaThread) + .ok(); } Ok(()) @@ -100,8 +102,10 @@ impl<'fut> Scheduler<'fut> { // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption - if self.futures_break_signal.receiver_count() > 0 { - self.futures_break_signal.send(()).ok(); + if self.futures_signal.receiver_count() > 0 { + self.futures_signal + .send(SchedulerMessage::PushedLuaThread) + .ok(); } Ok(thread_id) @@ -139,8 +143,10 @@ impl<'fut> Scheduler<'fut> { // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption - if self.futures_break_signal.receiver_count() > 0 { - self.futures_break_signal.send(()).ok(); + if self.futures_signal.receiver_count() > 0 { + self.futures_signal + .send(SchedulerMessage::PushedLuaThread) + .ok(); } Ok(thread_id) diff --git a/src/lune/scheduler/message.rs b/src/lune/scheduler/message.rs new file mode 100644 index 0000000..c4463ff --- /dev/null +++ b/src/lune/scheduler/message.rs @@ -0,0 +1,21 @@ +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub enum SchedulerMessage { + ExitCodeSet, + PushedLuaThread, + SpawnedLuaFuture, + SpawnedBackgroundFuture, +} + +impl SchedulerMessage { + pub fn should_break_futures(self) -> bool { + matches!(self, Self::ExitCodeSet | Self::PushedLuaThread) + } + + pub fn should_break_lua_futures(self) -> bool { + self.should_break_futures() || matches!(self, Self::SpawnedBackgroundFuture) + } + + pub fn should_break_background_futures(self) -> bool { + self.should_break_futures() || matches!(self, Self::SpawnedLuaFuture) + } +} diff --git a/src/lune/scheduler/mod.rs b/src/lune/scheduler/mod.rs index 7fb417d..d84b0f7 100644 --- a/src/lune/scheduler/mod.rs +++ b/src/lune/scheduler/mod.rs @@ -12,6 +12,7 @@ use tokio::sync::{ Mutex as AsyncMutex, }; +mod message; mod state; mod thread; mod traits; @@ -20,6 +21,7 @@ mod impl_async; mod impl_runner; mod impl_threads; +pub use self::message::SchedulerMessage; pub use self::thread::SchedulerThreadId; pub use self::traits::*; @@ -43,7 +45,7 @@ pub(crate) struct Scheduler<'fut> { thread_senders: Arc>>, futures_lua: Arc>>>, futures_background: Arc>>>, - futures_break_signal: Sender<()>, + futures_signal: Sender, } impl<'fut> Scheduler<'fut> { @@ -51,7 +53,7 @@ impl<'fut> Scheduler<'fut> { Creates a new scheduler. */ pub fn new() -> Self { - let (futures_break_signal, _) = channel(1); + let (futures_signal, _) = channel(1); Self { state: Arc::new(SchedulerState::new()), @@ -59,7 +61,7 @@ impl<'fut> Scheduler<'fut> { thread_senders: Arc::new(RefCell::new(HashMap::new())), futures_lua: Arc::new(AsyncMutex::new(FuturesUnordered::new())), futures_background: Arc::new(AsyncMutex::new(FuturesUnordered::new())), - futures_break_signal, + futures_signal, } } @@ -95,7 +97,8 @@ impl<'fut> Scheduler<'fut> { self.state.exit_code().is_none(), "Exit code may only be set exactly once" ); - self.state.set_exit_code(code.into()) + self.state.set_exit_code(code.into()); + self.futures_signal.send(SchedulerMessage::ExitCodeSet).ok(); } #[doc(hidden)]