Add notes for future scheduler improvements, add wrapper structs for messaging

Filip Tibell 2023-08-22 19:41:20 -05:00
parent 5ae8f662b9
commit 83db30496d
7 changed files with 154 additions and 54 deletions

View file

@ -25,9 +25,12 @@ impl Lune {
*/
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
// FIXME: Leaking these here does not feel great... is there
// any way for us to create a scheduler, store it in app data, and
// guarantee it has the same lifetime as Lua without using any unsafe?
/*
FUTURE: Stop leaking these when we have removed the lifetime
on the scheduler and can place them in Lua app data using an Arc.
See the scheduler struct for more notes.
*/
let lua = Lua::new().into_static();
let scheduler = Scheduler::new().into_static();
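
For illustration, a minimal sketch of the Arc-based approach the FUTURE note describes, assuming a hypothetical lifetime-free Scheduler type; this is not code from this commit:

use std::sync::Arc;
use mlua::prelude::*;

// Placeholder standing in for a Scheduler that no longer carries a lifetime
struct Scheduler;

fn new_runtime() -> (Lua, Arc<Scheduler>) {
    let lua = Lua::new();
    let scheduler = Arc::new(Scheduler);
    // Storing an Arc clone in Lua app data ties the scheduler's lifetime
    // to the Lua instance, with no into_static / Box::leak required
    lua.set_app_data(Arc::clone(&scheduler));
    (lua, scheduler)
}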

View file

@ -5,7 +5,7 @@ use tokio::{
task,
};
use super::{IntoLuaThread, Scheduler, SchedulerMessage};
use super::{IntoLuaThread, Scheduler};
impl<'fut> Scheduler<'fut> {
/**
@ -63,11 +63,7 @@ impl<'fut> Scheduler<'fut> {
// NOTE: We might be resuming lua futures, need to signal that a
// new background future is ready to break out of futures resumption
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::SpawnedBackgroundFuture)
.ok();
}
self.state.message_sender().send_spawned_background_future();
rx
}
@ -94,11 +90,7 @@ impl<'fut> Scheduler<'fut> {
// NOTE: We might be resuming lua futures, need to signal that a
// new background future is ready to break out of futures resumption
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::SpawnedBackgroundFuture)
.ok();
}
self.state.message_sender().send_spawned_background_future();
rx
}
@ -139,11 +131,7 @@ impl<'fut> Scheduler<'fut> {
// NOTE: We might be resuming background futures, need to signal that a
// new background future is ready to break out of futures resumption
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::SpawnedLuaFuture)
.ok();
}
self.state.message_sender().send_spawned_lua_future();
Ok(())
}

View file

@ -147,15 +147,16 @@ impl<'fut> Scheduler<'fut> {
return;
}
let mut rx = self.futures_signal.subscribe();
let mut rx = self.state.message_receiver();
let mut count = 0;
while has_lua || has_background {
if has_lua && has_background {
tokio::select! {
_ = self.run_future_lua() => {},
_ = self.run_future_background() => {},
msg = rx.recv() => {
if let Ok(msg) = msg {
if let Some(msg) = msg {
if msg.should_break_futures() {
break;
}
@ -167,7 +168,7 @@ impl<'fut> Scheduler<'fut> {
tokio::select! {
_ = self.run_future_lua() => {},
msg = rx.recv() => {
if let Ok(msg) = msg {
if let Some(msg) = msg {
if msg.should_break_lua_futures() {
break;
}
@ -179,7 +180,7 @@ impl<'fut> Scheduler<'fut> {
tokio::select! {
_ = self.run_future_background() => {},
msg = rx.recv() => {
if let Ok(msg) = msg {
if let Some(msg) = msg {
if msg.should_break_background_futures() {
break;
}
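
A standalone sketch of the break-out pattern used above, with assumed placeholder names rather than the scheduler's real futures: a message arriving on the channel interrupts the select! so the loop can stop resuming futures and pick up new work.

use std::time::Duration;
use tokio::sync::mpsc::unbounded_channel;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = unbounded_channel::<&'static str>();

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        // Stand-in for send_pushed_lua_thread() and friends
        tx.send("pushed_lua_thread").ok();
    });

    loop {
        tokio::select! {
            // Stand-in for run_future_lua() / run_future_background()
            _ = tokio::time::sleep(Duration::from_millis(50)) => {},
            msg = rx.recv() => {
                // Some(_) means new work arrived, None means every sender
                // was dropped; either way we stop resuming futures here
                println!("interrupted by {msg:?}");
                break;
            }
        }
    }
}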

View file

@ -4,7 +4,7 @@ use mlua::prelude::*;
use super::{
thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender},
IntoLuaThread, Scheduler, SchedulerMessage,
IntoLuaThread, Scheduler,
};
impl<'fut> Scheduler<'fut> {
@ -61,11 +61,7 @@ impl<'fut> Scheduler<'fut> {
// NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::PushedLuaThread)
.ok();
}
self.state.message_sender().send_pushed_lua_thread();
Ok(())
}
@ -102,11 +98,7 @@ impl<'fut> Scheduler<'fut> {
// NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::PushedLuaThread)
.ok();
}
self.state.message_sender().send_pushed_lua_thread();
Ok(thread_id)
}
@ -143,11 +135,7 @@ impl<'fut> Scheduler<'fut> {
// NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::PushedLuaThread)
.ok();
}
self.state.message_sender().send_pushed_lua_thread();
Ok(thread_id)
}

View file

@ -1,5 +1,11 @@
use std::sync::{MutexGuard, TryLockError};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use super::state::SchedulerState;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum SchedulerMessage {
pub(crate) enum SchedulerMessage {
ExitCodeSet,
PushedLuaThread,
SpawnedLuaFuture,
@ -19,3 +25,74 @@ impl SchedulerMessage {
self.should_break_futures() || matches!(self, Self::SpawnedLuaFuture)
}
}
/**
A message sender for the scheduler.
As long as this sender is not dropped, the scheduler
will be kept alive, waiting for more messages to arrive.
*/
pub(crate) struct SchedulerMessageSender(UnboundedSender<SchedulerMessage>);
impl SchedulerMessageSender {
/**
Creates a new message sender for the scheduler.
*/
pub fn new(state: &SchedulerState) -> Self {
Self(
state
.message_sender
.lock()
.expect("Scheduler state was poisoned")
.clone(),
)
}
pub fn send_exit_code_set(&self) {
self.0.send(SchedulerMessage::ExitCodeSet).ok();
}
pub fn send_pushed_lua_thread(&self) {
self.0.send(SchedulerMessage::PushedLuaThread).ok();
}
pub fn send_spawned_lua_future(&self) {
self.0.send(SchedulerMessage::SpawnedLuaFuture).ok();
}
pub fn send_spawned_background_future(&self) {
self.0.send(SchedulerMessage::SpawnedBackgroundFuture).ok();
}
}
/**
A message receiver for the scheduler.
Only one message receiver may exist per scheduler.
*/
pub(crate) struct SchedulerMessageReceiver<'a>(MutexGuard<'a, UnboundedReceiver<SchedulerMessage>>);
impl<'a> SchedulerMessageReceiver<'a> {
/**
Creates a new message receiver for the scheduler.
Panics if the message receiver is already being used.
*/
pub fn new(state: &'a SchedulerState) -> Self {
Self(match state.message_receiver.try_lock() {
Err(TryLockError::Poisoned(_)) => panic!("Scheduler state was poisoned"),
Err(TryLockError::WouldBlock) => {
panic!("Message receiver may only be borrowed once at a time")
}
Ok(guard) => guard,
})
}
// NOTE: Holding this lock across await points is fine, since we
// can only ever create exactly one SchedulerMessageReceiver at a time.
// See the constructor above for details on this.
#[allow(clippy::await_holding_lock)]
pub async fn recv(&mut self) -> Option<SchedulerMessage> {
self.0.recv().await
}
}
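
The wrappers above lean on standard tokio mpsc semantics: recv() keeps yielding messages while at least one sender exists and returns None once every sender has been dropped, which is what lets the scheduler know nothing more is coming. A minimal standalone sketch of that behaviour, with assumed names and not part of this commit:

use tokio::sync::mpsc::unbounded_channel;

#[derive(Debug)]
enum Message {
    PushedLuaThread,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = unbounded_channel();

    let sender = tx.clone();
    tokio::spawn(async move {
        // Fire-and-forget send, mirroring the .ok() calls in the wrapper
        sender.send(Message::PushedLuaThread).ok();
    });

    drop(tx); // the channel closes once all senders are gone

    while let Some(msg) = rx.recv().await {
        println!("received {msg:?}");
    }
    // recv() returned None here: no senders left, nothing left to wait for
}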

View file

@ -7,10 +7,7 @@ use std::{
use futures_util::{stream::FuturesUnordered, Future};
use mlua::prelude::*;
use tokio::sync::{
broadcast::{channel, Sender},
Mutex as AsyncMutex,
};
use tokio::sync::Mutex as AsyncMutex;
mod message;
mod state;
@ -21,7 +18,6 @@ mod impl_async;
mod impl_runner;
mod impl_threads;
pub use self::message::SchedulerMessage;
pub use self::thread::SchedulerThreadId;
pub use self::traits::*;
@ -43,9 +39,25 @@ pub(crate) struct Scheduler<'fut> {
state: Arc<SchedulerState>,
threads: Arc<RefCell<VecDeque<SchedulerThread>>>,
thread_senders: Arc<RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>>,
/*
FUTURE: Get rid of these, let the tokio runtime handle running
and resumption of futures completely, just use our scheduler
state and receiver to know when we have run to completion.
If we have no senders left, we have run to completion.
Once we no longer store futures in our scheduler, we can
get rid of the lifetime on it, store it in our lua app
data as a Weak<Scheduler>, together with a Weak<Lua>.
In our lua async functions we can then get a reference to this,
upgrade it to an Arc<Scheduler> and Arc<Lua> to extend lifetimes,
and hopefully get rid of Box::leak and 'static lifetimes for good.
Relevant comment on the mlua repository:
https://github.com/khvzak/mlua/issues/169#issuecomment-1138863979
*/
futures_lua: Arc<AsyncMutex<FuturesUnordered<SchedulerFuture<'fut>>>>,
futures_background: Arc<AsyncMutex<FuturesUnordered<SchedulerFuture<'static>>>>,
futures_signal: Sender<SchedulerMessage>,
}
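
To make the FUTURE note above concrete, here is a hedged sketch of the Weak-based app data pattern it describes, using a hypothetical lifetime-free Scheduler and a helper name that is purely illustrative:

use std::sync::{Arc, Weak};
use mlua::prelude::*;

// Placeholder for a Scheduler without a 'fut lifetime
struct Scheduler;

fn register(lua: &Lua, scheduler: &Arc<Scheduler>) {
    // Only a Weak is stored, so app data alone never keeps the scheduler alive
    lua.set_app_data(Arc::downgrade(scheduler));
}

fn with_scheduler<R>(lua: &Lua, f: impl FnOnce(&Scheduler) -> R) -> LuaResult<R> {
    let weak = lua
        .app_data_ref::<Weak<Scheduler>>()
        .expect("scheduler was not registered as app data");
    // Upgrading extends the lifetime only for the duration of the call,
    // avoiding Box::leak and 'static borrows entirely
    let strong = weak
        .upgrade()
        .ok_or_else(|| LuaError::RuntimeError("scheduler was dropped".to_string()))?;
    Ok(f(&strong))
}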
impl<'fut> Scheduler<'fut> {
@ -53,15 +65,12 @@ impl<'fut> Scheduler<'fut> {
Creates a new scheduler.
*/
pub fn new() -> Self {
let (futures_signal, _) = channel(1);
Self {
state: Arc::new(SchedulerState::new()),
threads: Arc::new(RefCell::new(VecDeque::new())),
thread_senders: Arc::new(RefCell::new(HashMap::new())),
futures_lua: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
futures_background: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
futures_signal,
}
}
@ -98,7 +107,6 @@ impl<'fut> Scheduler<'fut> {
"Exit code may only be set exactly once"
);
self.state.set_exit_code(code.into());
self.futures_signal.send(SchedulerMessage::ExitCodeSet).ok();
}
#[doc(hidden)]

View file

@ -8,7 +8,12 @@ use std::{
use mlua::Error as LuaError;
use super::SchedulerThreadId;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use super::{
message::{SchedulerMessage, SchedulerMessageReceiver, SchedulerMessageSender},
SchedulerThreadId,
};
/**
Internal state for a [`Scheduler`].
@ -16,14 +21,16 @@ use super::SchedulerThreadId;
This scheduler state uses atomic operations for everything
except lua error storage, and is completely thread safe.
*/
#[derive(Debug, Default)]
pub struct SchedulerState {
#[derive(Debug)]
pub(crate) struct SchedulerState {
exit_state: AtomicBool,
exit_code: AtomicU8,
num_resumptions: AtomicUsize,
num_errors: AtomicUsize,
thread_id: Arc<Mutex<Option<SchedulerThreadId>>>,
thread_errors: Arc<Mutex<HashMap<SchedulerThreadId, LuaError>>>,
pub(super) message_sender: Arc<Mutex<UnboundedSender<SchedulerMessage>>>,
pub(super) message_receiver: Arc<Mutex<UnboundedReceiver<SchedulerMessage>>>,
}
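
As a side note on the "atomic operations for everything except lua error storage" design, a tiny self-contained sketch (assumed names, not from this commit) of the lock-free exit-state pattern the fields above use:

use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};

// Assumed stand-in for the exit_state / exit_code pair in SchedulerState
struct ExitState {
    is_set: AtomicBool,
    code: AtomicU8,
}

impl ExitState {
    const fn new() -> Self {
        Self {
            is_set: AtomicBool::new(false),
            code: AtomicU8::new(0),
        }
    }

    // Callable from any thread without locking
    fn set(&self, code: u8) {
        self.is_set.store(true, Ordering::SeqCst);
        self.code.store(code, Ordering::SeqCst);
    }

    fn get(&self) -> Option<u8> {
        if self.is_set.load(Ordering::SeqCst) {
            Some(self.code.load(Ordering::SeqCst))
        } else {
            None
        }
    }
}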
impl SchedulerState {
@ -31,7 +38,18 @@ impl SchedulerState {
Creates a new scheduler state.
*/
pub fn new() -> Self {
Self::default()
let (message_sender, message_receiver) = unbounded_channel();
Self {
exit_state: AtomicBool::new(false),
exit_code: AtomicU8::new(0),
num_resumptions: AtomicUsize::new(0),
num_errors: AtomicUsize::new(0),
thread_id: Arc::new(Mutex::new(None)),
thread_errors: Arc::new(Mutex::new(HashMap::new())),
message_sender: Arc::new(Mutex::new(message_sender)),
message_receiver: Arc::new(Mutex::new(message_receiver)),
}
}
/**
@ -78,6 +96,7 @@ impl SchedulerState {
pub fn set_exit_code(&self, code: impl Into<u8>) {
self.exit_state.store(true, Ordering::SeqCst);
self.exit_code.store(code.into(), Ordering::SeqCst);
self.message_sender().send_exit_code_set();
}
/**
@ -138,4 +157,20 @@ impl SchedulerState {
.expect("Failed to lock thread errors");
thread_errors.insert(id, err);
}
/**
Creates a new message sender for the scheduler.
*/
pub fn message_sender(&self) -> SchedulerMessageSender {
SchedulerMessageSender::new(self)
}
/**
Tries to borrow the message receiver for the scheduler.
Panics if the message receiver is already being used.
*/
pub fn message_receiver(&self) -> SchedulerMessageReceiver {
SchedulerMessageReceiver::new(self)
}
}