diff --git a/src/lune/scheduler/impl_runner.rs b/src/lune/scheduler/impl_runner.rs index 43b096b..f3b2f29 100644 --- a/src/lune/scheduler/impl_runner.rs +++ b/src/lune/scheduler/impl_runner.rs @@ -89,12 +89,25 @@ where async fn run_futures(&self) -> bool { let mut resumed_any = false; - let mut futs = self - .futures - .try_lock() - .expect("Failed to lock futures queue"); - while futs.next().await.is_some() { - resumed_any = true; + loop { + let mut rx = self.new_thread_ready.subscribe(); + + let mut futs = self + .futures + .try_lock() + .expect("Failed to lock futures queue"); + + // Wait until we either get a new lua thread or a future completes + tokio::select! { + _res = rx.recv() => break, + res = futs.next() => { + match res { + Some(_) => resumed_any = true, + None => break, + } + }, + } + if self.has_thread() { break; } diff --git a/src/lune/scheduler/impl_threads.rs b/src/lune/scheduler/impl_threads.rs index 765bbb7..52cbe12 100644 --- a/src/lune/scheduler/impl_threads.rs +++ b/src/lune/scheduler/impl_threads.rs @@ -57,6 +57,12 @@ where .context("Failed to borrow threads vec")? .push_front(thread); + // NOTE: We might be resuming futures, need to signal that a + // new lua thread is ready to break out of futures resumption + if self.new_thread_ready.receiver_count() > 0 { + self.new_thread_ready.send(()).ok(); + } + Ok(()) } @@ -84,6 +90,12 @@ where .borrow_mut() .insert(thread_id, SchedulerThreadSender::new(1)); + // NOTE: We might be resuming futures, need to signal that a + // new lua thread is ready to break out of futures resumption + if self.new_thread_ready.receiver_count() > 0 { + self.new_thread_ready.send(()).ok(); + } + Ok(thread_id) } @@ -111,6 +123,12 @@ where .borrow_mut() .insert(thread_id, SchedulerThreadSender::new(1)); + // NOTE: We might be resuming futures, need to signal that a + // new lua thread is ready to break out of futures resumption + if self.new_thread_ready.receiver_count() > 0 { + self.new_thread_ready.send(()).ok(); + } + Ok(thread_id) } diff --git a/src/lune/scheduler/mod.rs b/src/lune/scheduler/mod.rs index 3e6a58e..b00e2b8 100644 --- a/src/lune/scheduler/mod.rs +++ b/src/lune/scheduler/mod.rs @@ -7,7 +7,10 @@ use std::{ use futures_util::{stream::FuturesUnordered, Future}; use mlua::prelude::*; -use tokio::sync::Mutex as AsyncMutex; +use tokio::sync::{ + broadcast::{channel, Sender}, + Mutex as AsyncMutex, +}; mod state; mod thread; @@ -40,16 +43,20 @@ pub(crate) struct Scheduler<'lua, 'fut> { threads: Arc>>, thread_senders: Arc>>, futures: Arc>>>, + new_thread_ready: Sender<()>, } impl<'lua, 'fut> Scheduler<'lua, 'fut> { pub fn new(lua: &'lua Lua) -> Self { + let (new_thread_ready, _) = channel(1); + let this = Self { lua, state: Arc::new(SchedulerState::new()), threads: Arc::new(RefCell::new(VecDeque::new())), thread_senders: Arc::new(RefCell::new(HashMap::new())), futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())), + new_thread_ready, }; // Propagate errors given to the scheduler back to their lua threads