From 32fd04ef85d9d6fa56449c0eff37666cd0de311d Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Tue, 22 Aug 2023 19:41:20 -0500 Subject: [PATCH] Add notes for future scheduler improvements, add wrapper structs for messaging --- src/lune/mod.rs | 9 ++-- src/lune/scheduler/impl_async.rs | 20 ++------ src/lune/scheduler/impl_runner.rs | 9 ++-- src/lune/scheduler/impl_threads.rs | 20 ++------ src/lune/scheduler/message.rs | 79 +++++++++++++++++++++++++++++- src/lune/scheduler/mod.rs | 28 +++++++---- src/lune/scheduler/state.rs | 43 ++++++++++++++-- 7 files changed, 154 insertions(+), 54 deletions(-) diff --git a/src/lune/mod.rs b/src/lune/mod.rs index 2ef866e..63261d8 100644 --- a/src/lune/mod.rs +++ b/src/lune/mod.rs @@ -25,9 +25,12 @@ impl Lune { */ #[allow(clippy::new_without_default)] pub fn new() -> Self { - // FIXME: Leaking these here does not feel great... is there - // any way for us to create a scheduler, store it in app data, and - // guarantee it has the same lifetime as Lua without using any unsafe? + /* + FUTURE: Stop leaking these when we have removed the lifetime + on the scheduler and can place them in lua app data using arc + + See the scheduler struct for more notes + */ let lua = Lua::new().into_static(); let scheduler = Scheduler::new().into_static(); diff --git a/src/lune/scheduler/impl_async.rs b/src/lune/scheduler/impl_async.rs index 8db9e37..019b743 100644 --- a/src/lune/scheduler/impl_async.rs +++ b/src/lune/scheduler/impl_async.rs @@ -5,7 +5,7 @@ use tokio::{ task, }; -use super::{IntoLuaThread, Scheduler, SchedulerMessage}; +use super::{IntoLuaThread, Scheduler}; impl<'fut> Scheduler<'fut> { /** @@ -63,11 +63,7 @@ impl<'fut> Scheduler<'fut> { // NOTE: We might be resuming lua futures, need to signal that a // new background future is ready to break out of futures resumption - if self.futures_signal.receiver_count() > 0 { - self.futures_signal - .send(SchedulerMessage::SpawnedBackgroundFuture) - .ok(); - } + self.state.message_sender().send_spawned_background_future(); rx } @@ -94,11 +90,7 @@ impl<'fut> Scheduler<'fut> { // NOTE: We might be resuming lua futures, need to signal that a // new background future is ready to break out of futures resumption - if self.futures_signal.receiver_count() > 0 { - self.futures_signal - .send(SchedulerMessage::SpawnedBackgroundFuture) - .ok(); - } + self.state.message_sender().send_spawned_background_future(); rx } @@ -139,11 +131,7 @@ impl<'fut> Scheduler<'fut> { // NOTE: We might be resuming background futures, need to signal that a // new background future is ready to break out of futures resumption - if self.futures_signal.receiver_count() > 0 { - self.futures_signal - .send(SchedulerMessage::SpawnedLuaFuture) - .ok(); - } + self.state.message_sender().send_spawned_lua_future(); Ok(()) } diff --git a/src/lune/scheduler/impl_runner.rs b/src/lune/scheduler/impl_runner.rs index 588d194..b4c1c61 100644 --- a/src/lune/scheduler/impl_runner.rs +++ b/src/lune/scheduler/impl_runner.rs @@ -147,15 +147,16 @@ impl<'fut> Scheduler<'fut> { return; } - let mut rx = self.futures_signal.subscribe(); + let mut rx = self.state.message_receiver(); let mut count = 0; + while has_lua || has_background { if has_lua && has_background { tokio::select! { _ = self.run_future_lua() => {}, _ = self.run_future_background() => {}, msg = rx.recv() => { - if let Ok(msg) = msg { + if let Some(msg) = msg { if msg.should_break_futures() { break; } @@ -167,7 +168,7 @@ impl<'fut> Scheduler<'fut> { tokio::select! { _ = self.run_future_lua() => {}, msg = rx.recv() => { - if let Ok(msg) = msg { + if let Some(msg) = msg { if msg.should_break_lua_futures() { break; } @@ -179,7 +180,7 @@ impl<'fut> Scheduler<'fut> { tokio::select! { _ = self.run_future_background() => {}, msg = rx.recv() => { - if let Ok(msg) = msg { + if let Some(msg) = msg { if msg.should_break_background_futures() { break; } diff --git a/src/lune/scheduler/impl_threads.rs b/src/lune/scheduler/impl_threads.rs index c633cd0..850d3c2 100644 --- a/src/lune/scheduler/impl_threads.rs +++ b/src/lune/scheduler/impl_threads.rs @@ -4,7 +4,7 @@ use mlua::prelude::*; use super::{ thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender}, - IntoLuaThread, Scheduler, SchedulerMessage, + IntoLuaThread, Scheduler, }; impl<'fut> Scheduler<'fut> { @@ -61,11 +61,7 @@ impl<'fut> Scheduler<'fut> { // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption - if self.futures_signal.receiver_count() > 0 { - self.futures_signal - .send(SchedulerMessage::PushedLuaThread) - .ok(); - } + self.state.message_sender().send_pushed_lua_thread(); Ok(()) } @@ -102,11 +98,7 @@ impl<'fut> Scheduler<'fut> { // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption - if self.futures_signal.receiver_count() > 0 { - self.futures_signal - .send(SchedulerMessage::PushedLuaThread) - .ok(); - } + self.state.message_sender().send_pushed_lua_thread(); Ok(thread_id) } @@ -143,11 +135,7 @@ impl<'fut> Scheduler<'fut> { // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption - if self.futures_signal.receiver_count() > 0 { - self.futures_signal - .send(SchedulerMessage::PushedLuaThread) - .ok(); - } + self.state.message_sender().send_pushed_lua_thread(); Ok(thread_id) } diff --git a/src/lune/scheduler/message.rs b/src/lune/scheduler/message.rs index c4463ff..4d05343 100644 --- a/src/lune/scheduler/message.rs +++ b/src/lune/scheduler/message.rs @@ -1,5 +1,11 @@ +use std::sync::{MutexGuard, TryLockError}; + +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + +use super::state::SchedulerState; + #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] -pub enum SchedulerMessage { +pub(crate) enum SchedulerMessage { ExitCodeSet, PushedLuaThread, SpawnedLuaFuture, @@ -19,3 +25,74 @@ impl SchedulerMessage { self.should_break_futures() || matches!(self, Self::SpawnedLuaFuture) } } + +/** + A message sender for the scheduler. + + As long as this sender is not dropped, the scheduler + will be kept alive, waiting for more messages to arrive. +*/ +pub(crate) struct SchedulerMessageSender(UnboundedSender); + +impl SchedulerMessageSender { + /** + Creates a new message sender for the scheduler. + */ + pub fn new(state: &SchedulerState) -> Self { + Self( + state + .message_sender + .lock() + .expect("Scheduler state was poisoned") + .clone(), + ) + } + + pub fn send_exit_code_set(&self) { + self.0.send(SchedulerMessage::ExitCodeSet).ok(); + } + + pub fn send_pushed_lua_thread(&self) { + self.0.send(SchedulerMessage::PushedLuaThread).ok(); + } + + pub fn send_spawned_lua_future(&self) { + self.0.send(SchedulerMessage::SpawnedLuaFuture).ok(); + } + + pub fn send_spawned_background_future(&self) { + self.0.send(SchedulerMessage::SpawnedBackgroundFuture).ok(); + } +} + +/** + A message receiver for the scheduler. + + Only one message receiver may exist per scheduler. +*/ +pub(crate) struct SchedulerMessageReceiver<'a>(MutexGuard<'a, UnboundedReceiver>); + +impl<'a> SchedulerMessageReceiver<'a> { + /** + Creates a new message receiver for the scheduler. + + Panics if the message receiver is already being used. + */ + pub fn new(state: &'a SchedulerState) -> Self { + Self(match state.message_receiver.try_lock() { + Err(TryLockError::Poisoned(_)) => panic!("Sheduler state was poisoned"), + Err(TryLockError::WouldBlock) => { + panic!("Message receiver may only be borrowed once at a time") + } + Ok(guard) => guard, + }) + } + + // NOTE: Holding this lock across await points is fine, since we + // can only ever create lock exactly one SchedulerMessageReceiver + // See above constructor for details on this + #[allow(clippy::await_holding_lock)] + pub async fn recv(&mut self) -> Option { + self.0.recv().await + } +} diff --git a/src/lune/scheduler/mod.rs b/src/lune/scheduler/mod.rs index d84b0f7..e153234 100644 --- a/src/lune/scheduler/mod.rs +++ b/src/lune/scheduler/mod.rs @@ -7,10 +7,7 @@ use std::{ use futures_util::{stream::FuturesUnordered, Future}; use mlua::prelude::*; -use tokio::sync::{ - broadcast::{channel, Sender}, - Mutex as AsyncMutex, -}; +use tokio::sync::Mutex as AsyncMutex; mod message; mod state; @@ -21,7 +18,6 @@ mod impl_async; mod impl_runner; mod impl_threads; -pub use self::message::SchedulerMessage; pub use self::thread::SchedulerThreadId; pub use self::traits::*; @@ -43,9 +39,25 @@ pub(crate) struct Scheduler<'fut> { state: Arc, threads: Arc>>, thread_senders: Arc>>, + /* + FUTURE: Get rid of these, let the tokio runtime handle running + and resumption of futures completely, just use our scheduler + state and receiver to know when we have run to completion. + If we have no senders left, we have run to completion. + + Once we no longer store futures in our scheduler, we can + get rid of the lifetime on it, store it in our lua app + data as a Weak, together with a Weak. + + In our lua async functions we can then get a reference to this, + upgrade it to an Arc and Arc to extend lifetimes, + and hopefully get rid of Box::leak and 'static lifetimes for good. + + Relevant comment on the mlua repository: + https://github.com/khvzak/mlua/issues/169#issuecomment-1138863979 + */ futures_lua: Arc>>>, futures_background: Arc>>>, - futures_signal: Sender, } impl<'fut> Scheduler<'fut> { @@ -53,15 +65,12 @@ impl<'fut> Scheduler<'fut> { Creates a new scheduler. */ pub fn new() -> Self { - let (futures_signal, _) = channel(1); - Self { state: Arc::new(SchedulerState::new()), threads: Arc::new(RefCell::new(VecDeque::new())), thread_senders: Arc::new(RefCell::new(HashMap::new())), futures_lua: Arc::new(AsyncMutex::new(FuturesUnordered::new())), futures_background: Arc::new(AsyncMutex::new(FuturesUnordered::new())), - futures_signal, } } @@ -98,7 +107,6 @@ impl<'fut> Scheduler<'fut> { "Exit code may only be set exactly once" ); self.state.set_exit_code(code.into()); - self.futures_signal.send(SchedulerMessage::ExitCodeSet).ok(); } #[doc(hidden)] diff --git a/src/lune/scheduler/state.rs b/src/lune/scheduler/state.rs index f2df7ee..d6682f1 100644 --- a/src/lune/scheduler/state.rs +++ b/src/lune/scheduler/state.rs @@ -8,7 +8,12 @@ use std::{ use mlua::Error as LuaError; -use super::SchedulerThreadId; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +use super::{ + message::{SchedulerMessage, SchedulerMessageReceiver, SchedulerMessageSender}, + SchedulerThreadId, +}; /** Internal state for a [`Scheduler`]. @@ -16,14 +21,16 @@ use super::SchedulerThreadId; This scheduler state uses atomic operations for everything except lua error storage, and is completely thread safe. */ -#[derive(Debug, Default)] -pub struct SchedulerState { +#[derive(Debug)] +pub(crate) struct SchedulerState { exit_state: AtomicBool, exit_code: AtomicU8, num_resumptions: AtomicUsize, num_errors: AtomicUsize, thread_id: Arc>>, thread_errors: Arc>>, + pub(super) message_sender: Arc>>, + pub(super) message_receiver: Arc>>, } impl SchedulerState { @@ -31,7 +38,18 @@ impl SchedulerState { Creates a new scheduler state. */ pub fn new() -> Self { - Self::default() + let (message_sender, message_receiver) = unbounded_channel(); + + Self { + exit_state: AtomicBool::new(false), + exit_code: AtomicU8::new(0), + num_resumptions: AtomicUsize::new(0), + num_errors: AtomicUsize::new(0), + thread_id: Arc::new(Mutex::new(None)), + thread_errors: Arc::new(Mutex::new(HashMap::new())), + message_sender: Arc::new(Mutex::new(message_sender)), + message_receiver: Arc::new(Mutex::new(message_receiver)), + } } /** @@ -78,6 +96,7 @@ impl SchedulerState { pub fn set_exit_code(&self, code: impl Into) { self.exit_state.store(true, Ordering::SeqCst); self.exit_code.store(code.into(), Ordering::SeqCst); + self.message_sender().send_exit_code_set(); } /** @@ -138,4 +157,20 @@ impl SchedulerState { .expect("Failed to lock thread errors"); thread_errors.insert(id, err); } + + /** + Creates a new message sender for the scheduler. + */ + pub fn message_sender(&self) -> SchedulerMessageSender { + SchedulerMessageSender::new(self) + } + + /** + Tries to borrow the message receiver for the scheduler. + + Panics if the message receiver is already being used. + */ + pub fn message_receiver(&self) -> SchedulerMessageReceiver { + SchedulerMessageReceiver::new(self) + } }