Improvements to scheduler futures resumption

This commit is contained in:
Filip Tibell 2023-08-22 10:36:45 -05:00
parent 38a91a3dc3
commit e5ed668a33
5 changed files with 178 additions and 58 deletions

View file

@ -5,7 +5,7 @@ use tokio::{
task,
};
use super::{IntoLuaThread, Scheduler};
use super::{IntoLuaThread, Scheduler, SchedulerMessage};
impl<'fut> Scheduler<'fut> {
/**
@ -61,6 +61,14 @@ impl<'fut> Scheduler<'fut> {
handle.await.ok();
}));
// NOTE: We might be resuming lua futures, need to signal that a
// new background future is ready to break out of futures resumption
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::SpawnedBackgroundFuture)
.ok();
}
rx
}
@ -84,6 +92,14 @@ impl<'fut> Scheduler<'fut> {
tx.send(res).ok();
}));
// NOTE: We might be resuming lua futures, need to signal that a
// new background future is ready to break out of futures resumption
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::SpawnedBackgroundFuture)
.ok();
}
rx
}
@ -121,6 +137,14 @@ impl<'fut> Scheduler<'fut> {
}
}));
// NOTE: We might be resuming background futures, need to signal that a
// new background future is ready to break out of futures resumption
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::SpawnedLuaFuture)
.ok();
}
Ok(())
}
}

View file

@ -13,15 +13,13 @@ use super::Scheduler;
impl<'fut> Scheduler<'fut> {
/**
Runs all lua threads to completion.
Returns the number of threads that were resumed.
*/
fn run_lua_threads(&self, lua: &Lua) -> usize {
fn run_lua_threads(&self, lua: &Lua) {
if self.state.has_exit_code() {
return 0;
return;
}
let mut resumed_count = 0;
let mut count = 0;
// Pop threads from the scheduler until there are none left
while let Some(thread) = self
@ -44,7 +42,7 @@ impl<'fut> Scheduler<'fut> {
let res = thread.resume::<_, LuaMultiValue>(args);
self.state.set_current_thread_id(None);
resumed_count += 1;
count += 1;
// If we got any resumption (lua-side) error, increment
// the error count of the scheduler so we can exit with
@ -79,54 +77,125 @@ impl<'fut> Scheduler<'fut> {
}
}
resumed_count
if count > 0 {
debug! {
%count,
"resumed lua"
}
}
}
/**
Runs futures until none are left or a future spawned a new lua thread.
Runs the next lua future to completion.
Panics if no lua future is queued.
*/
async fn run_futures_lua(&self) -> usize {
async fn run_future_lua(&self) {
let mut futs = self
.futures_lua
.try_lock()
.expect("Failed to lock lua futures for resumption");
let mut fut_count = 0;
while futs.next().await.is_some() {
fut_count += 1;
if self.has_thread() {
break;
}
}
fut_count
assert!(futs.len() > 0, "No lua futures are queued");
futs.next().await;
}
/**
Runs background futures until none are left or a future spawned a new lua thread.
Runs the next background future to completion.
Panics if no background future is queued.
*/
async fn run_futures_background(&self) -> usize {
async fn run_future_background(&self) {
let mut futs = self
.futures_background
.try_lock()
.expect("Failed to lock background futures for resumption");
let mut fut_count = 0;
while futs.next().await.is_some() {
fut_count += 1;
if self.has_thread() {
break;
}
}
fut_count
assert!(futs.len() > 0, "No background futures are queued");
futs.next().await;
}
async fn run_futures(&self) -> usize {
let mut rx = self.futures_break_signal.subscribe();
/**
Runs as many futures as possible, until a new lua thread
is ready, or an exit code has been set for the scheduler.
tokio::select! {
ran = self.run_futures_lua() => ran,
ran = self.run_futures_background() => ran,
_ = rx.recv() => 0,
### Implementation details
Running futures on our scheduler consists of a couple moving parts:
1. An unordered futures queue for lua (main thread, local) futures
2. An unordered futures queue for background (multithreaded, 'static lifetime) futures
3. A signal for breaking out of futures resumption
The two unordered futures queues need to run concurrently,
but since `FuturesUnordered` returns instantly if it does
not currently have any futures queued on it, we need to do
this branching loop, checking if each queue has futures first.
We also need to listen for our signal, to see if we should break out of resumption:
* Always break out of resumption if a new lua thread is ready
* Always break out of resumption if an exit code has been set
* Break out of lua futures resumption if we have a new background future
* Break out of background futures resumption if we have a new lua future
We need to listen for both future queues concurrently,
and break out whenever the other corresponding queue has
a new future, since the other queue may resume sooner.
*/
async fn run_futures(&self) {
let (mut has_lua, mut has_background) = self.has_futures();
if !has_lua && !has_background {
return;
}
let mut rx = self.futures_signal.subscribe();
let mut count = 0;
while has_lua || has_background {
if has_lua && has_background {
tokio::select! {
_ = self.run_future_lua() => {},
_ = self.run_future_background() => {},
msg = rx.recv() => {
if let Ok(msg) = msg {
if msg.should_break_futures() {
break;
}
}
}
}
count += 1;
} else if has_lua {
tokio::select! {
_ = self.run_future_lua() => {},
msg = rx.recv() => {
if let Ok(msg) = msg {
if msg.should_break_lua_futures() {
break;
}
}
}
}
count += 1;
} else if has_background {
tokio::select! {
_ = self.run_future_background() => {},
msg = rx.recv() => {
if let Ok(msg) = msg {
if msg.should_break_background_futures() {
break;
}
}
}
}
count += 1;
}
(has_lua, has_background) = self.has_futures();
}
if count > 0 {
debug! {
%count,
"resumed lua futures"
}
}
}
@ -147,10 +216,7 @@ impl<'fut> Scheduler<'fut> {
loop {
// 1. Run lua threads until exit or there are none left
let lua_count = self.run_lua_threads(lua);
if lua_count > 0 {
debug!("Ran {lua_count} lua threads");
}
self.run_lua_threads(lua);
// 2. If we got a manual exit code from lua we should
// not try to wait for any pending futures to complete
@ -161,10 +227,7 @@ impl<'fut> Scheduler<'fut> {
// 3. Keep resuming futures until there are no futures left to
// resume, or until we manually break out of resumption for any
// reason, this may be because a future spawned a new lua thread
let fut_count = self.run_futures().await;
if fut_count > 0 {
debug!("Ran {fut_count} futures");
}
self.run_futures().await;
// 4. Once again, check for an exit code, in case a future sets one
if self.state.has_exit_code() {
@ -180,13 +243,16 @@ impl<'fut> Scheduler<'fut> {
}
if let Some(code) = self.state.exit_code() {
debug!("Scheduler ran to completion, exit code {}", code);
debug! {
%code,
"scheduler ran to completion"
};
ExitCode::from(code)
} else if self.state.has_errored() {
debug!("Scheduler ran to completion, with failure");
debug!("scheduler ran to completion, with failure");
ExitCode::FAILURE
} else {
debug!("Scheduler ran to completion, with success");
debug!("scheduler ran to completion, with success");
ExitCode::SUCCESS
}
}

View file

@ -4,7 +4,7 @@ use mlua::prelude::*;
use super::{
thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender},
IntoLuaThread, Scheduler,
IntoLuaThread, Scheduler, SchedulerMessage,
};
impl<'fut> Scheduler<'fut> {
@ -61,8 +61,10 @@ impl<'fut> Scheduler<'fut> {
// NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption
if self.futures_break_signal.receiver_count() > 0 {
self.futures_break_signal.send(()).ok();
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::PushedLuaThread)
.ok();
}
Ok(())
@ -100,8 +102,10 @@ impl<'fut> Scheduler<'fut> {
// NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption
if self.futures_break_signal.receiver_count() > 0 {
self.futures_break_signal.send(()).ok();
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::PushedLuaThread)
.ok();
}
Ok(thread_id)
@ -139,8 +143,10 @@ impl<'fut> Scheduler<'fut> {
// NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption
if self.futures_break_signal.receiver_count() > 0 {
self.futures_break_signal.send(()).ok();
if self.futures_signal.receiver_count() > 0 {
self.futures_signal
.send(SchedulerMessage::PushedLuaThread)
.ok();
}
Ok(thread_id)

View file

@ -0,0 +1,21 @@
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum SchedulerMessage {
ExitCodeSet,
PushedLuaThread,
SpawnedLuaFuture,
SpawnedBackgroundFuture,
}
impl SchedulerMessage {
pub fn should_break_futures(self) -> bool {
matches!(self, Self::ExitCodeSet | Self::PushedLuaThread)
}
pub fn should_break_lua_futures(self) -> bool {
self.should_break_futures() || matches!(self, Self::SpawnedBackgroundFuture)
}
pub fn should_break_background_futures(self) -> bool {
self.should_break_futures() || matches!(self, Self::SpawnedLuaFuture)
}
}

View file

@ -12,6 +12,7 @@ use tokio::sync::{
Mutex as AsyncMutex,
};
mod message;
mod state;
mod thread;
mod traits;
@ -20,6 +21,7 @@ mod impl_async;
mod impl_runner;
mod impl_threads;
pub use self::message::SchedulerMessage;
pub use self::thread::SchedulerThreadId;
pub use self::traits::*;
@ -43,7 +45,7 @@ pub(crate) struct Scheduler<'fut> {
thread_senders: Arc<RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>>,
futures_lua: Arc<AsyncMutex<FuturesUnordered<SchedulerFuture<'fut>>>>,
futures_background: Arc<AsyncMutex<FuturesUnordered<SchedulerFuture<'static>>>>,
futures_break_signal: Sender<()>,
futures_signal: Sender<SchedulerMessage>,
}
impl<'fut> Scheduler<'fut> {
@ -51,7 +53,7 @@ impl<'fut> Scheduler<'fut> {
Creates a new scheduler.
*/
pub fn new() -> Self {
let (futures_break_signal, _) = channel(1);
let (futures_signal, _) = channel(1);
Self {
state: Arc::new(SchedulerState::new()),
@ -59,7 +61,7 @@ impl<'fut> Scheduler<'fut> {
thread_senders: Arc::new(RefCell::new(HashMap::new())),
futures_lua: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
futures_background: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
futures_break_signal,
futures_signal,
}
}
@ -95,7 +97,8 @@ impl<'fut> Scheduler<'fut> {
self.state.exit_code().is_none(),
"Exit code may only be set exactly once"
);
self.state.set_exit_code(code.into())
self.state.set_exit_code(code.into());
self.futures_signal.send(SchedulerMessage::ExitCodeSet).ok();
}
#[doc(hidden)]