Fix new lua threads not breaking scheduler out of futures resumption

This commit is contained in:
Filip Tibell 2023-08-20 14:01:34 -05:00
parent ee21921601
commit 3ab15e63e8
3 changed files with 45 additions and 7 deletions

View file

@ -89,12 +89,25 @@ where
async fn run_futures(&self) -> bool { async fn run_futures(&self) -> bool {
let mut resumed_any = false; let mut resumed_any = false;
let mut futs = self loop {
.futures let mut rx = self.new_thread_ready.subscribe();
.try_lock()
.expect("Failed to lock futures queue"); let mut futs = self
while futs.next().await.is_some() { .futures
resumed_any = true; .try_lock()
.expect("Failed to lock futures queue");
// Wait until we either get a new lua thread or a future completes
tokio::select! {
_res = rx.recv() => break,
res = futs.next() => {
match res {
Some(_) => resumed_any = true,
None => break,
}
},
}
if self.has_thread() { if self.has_thread() {
break; break;
} }

View file

@ -57,6 +57,12 @@ where
.context("Failed to borrow threads vec")? .context("Failed to borrow threads vec")?
.push_front(thread); .push_front(thread);
// NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption
if self.new_thread_ready.receiver_count() > 0 {
self.new_thread_ready.send(()).ok();
}
Ok(()) Ok(())
} }
@ -84,6 +90,12 @@ where
.borrow_mut() .borrow_mut()
.insert(thread_id, SchedulerThreadSender::new(1)); .insert(thread_id, SchedulerThreadSender::new(1));
// NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption
if self.new_thread_ready.receiver_count() > 0 {
self.new_thread_ready.send(()).ok();
}
Ok(thread_id) Ok(thread_id)
} }
@ -111,6 +123,12 @@ where
.borrow_mut() .borrow_mut()
.insert(thread_id, SchedulerThreadSender::new(1)); .insert(thread_id, SchedulerThreadSender::new(1));
// NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption
if self.new_thread_ready.receiver_count() > 0 {
self.new_thread_ready.send(()).ok();
}
Ok(thread_id) Ok(thread_id)
} }

View file

@ -7,7 +7,10 @@ use std::{
use futures_util::{stream::FuturesUnordered, Future}; use futures_util::{stream::FuturesUnordered, Future};
use mlua::prelude::*; use mlua::prelude::*;
use tokio::sync::Mutex as AsyncMutex; use tokio::sync::{
broadcast::{channel, Sender},
Mutex as AsyncMutex,
};
mod state; mod state;
mod thread; mod thread;
@ -40,16 +43,20 @@ pub(crate) struct Scheduler<'lua, 'fut> {
threads: Arc<RefCell<VecDeque<SchedulerThread>>>, threads: Arc<RefCell<VecDeque<SchedulerThread>>>,
thread_senders: Arc<RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>>, thread_senders: Arc<RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>>,
futures: Arc<AsyncMutex<FuturesUnordered<SchedulerFuture<'fut>>>>, futures: Arc<AsyncMutex<FuturesUnordered<SchedulerFuture<'fut>>>>,
new_thread_ready: Sender<()>,
} }
impl<'lua, 'fut> Scheduler<'lua, 'fut> { impl<'lua, 'fut> Scheduler<'lua, 'fut> {
pub fn new(lua: &'lua Lua) -> Self { pub fn new(lua: &'lua Lua) -> Self {
let (new_thread_ready, _) = channel(1);
let this = Self { let this = Self {
lua, lua,
state: Arc::new(SchedulerState::new()), state: Arc::new(SchedulerState::new()),
threads: Arc::new(RefCell::new(VecDeque::new())), threads: Arc::new(RefCell::new(VecDeque::new())),
thread_senders: Arc::new(RefCell::new(HashMap::new())), thread_senders: Arc::new(RefCell::new(HashMap::new())),
futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())), futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
new_thread_ready,
}; };
// Propagate errors given to the scheduler back to their lua threads // Propagate errors given to the scheduler back to their lua threads