diff --git a/src/lune/scheduler/impl_runner.rs b/src/lune/scheduler/impl_runner.rs
index f822657..80e21e0 100644
--- a/src/lune/scheduler/impl_runner.rs
+++ b/src/lune/scheduler/impl_runner.rs
@@ -1,5 +1,6 @@
 use std::{process::ExitCode, sync::Arc};
 
+use futures_util::StreamExt;
 use mlua::prelude::*;
 use tokio::task::LocalSet;
 
@@ -51,6 +52,28 @@ impl SchedulerImpl {
         resumed_any
     }
 
+    /**
+        Runs futures until none are left or a future spawned a new lua thread.
+
+        Returns `true` if any future was resumed, `false` otherwise.
+    */
+    async fn run_futures(&self) -> bool {
+        let mut resumed_any = false;
+
+        let mut futs = self
+            .futures
+            .try_lock()
+            .expect("Failed to lock futures for resumption");
+        while futs.next().await.is_some() {
+            resumed_any = true;
+            if self.has_thread() {
+                break;
+            }
+        }
+
+        resumed_any
+    }
+
     /**
         Runs the scheduler to completion in a [`LocalSet`],
         both normal lua threads and futures, prioritizing
@@ -71,14 +94,13 @@ impl SchedulerImpl {
                 break;
             }
 
-            // 3. Wait for the next future to complete, this may
-            // add more lua threads to run in the next iteration
-
-            // TODO: Implement futures resumption
+            // 3. Keep resuming futures until we get a new lua thread to
+            // resume, or until we don't have any futures left to wait for
+            let resumed_fut = self.run_futures().await;
 
             // 4. If we did not resume any lua threads, and we have no futures
-            // queued either, we have now run the scheduler until completion
-            if !resumed_lua {
+            // remaining either, we have now run the scheduler until completion
+            if !resumed_lua && !resumed_fut {
                 break;
             }
         }
diff --git a/src/lune/scheduler/impl_threads.rs b/src/lune/scheduler/impl_threads.rs
index 7343595..5377dcc 100644
--- a/src/lune/scheduler/impl_threads.rs
+++ b/src/lune/scheduler/impl_threads.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use mlua::prelude::*;
 
 use super::{
@@ -7,6 +9,17 @@
 };
 
 impl<'lua> SchedulerImpl {
+    /**
+        Checks if there are any lua threads to run.
+    */
+    pub(super) fn has_thread(&self) -> bool {
+        !self
+            .threads
+            .try_borrow()
+            .expect("Failed to borrow threads vec")
+            .is_empty()
+    }
+
     /**
         Pops the next thread to run, from the front of the scheduler.
 
@@ -111,6 +124,16 @@ impl<'lua> SchedulerImpl {
                 .lua
                 .registry_value::<Vec<LuaValue>>(&k)
                 .expect("Received invalid registry key for thread");
+
+            // NOTE: This is not strictly necessary, mlua can clean
+            // up registry values on its own, but doing this will add
+            // some extra safety and clean up registry values faster
+            if let Some(key) = Arc::into_inner(k) {
+                self.lua
+                    .remove_registry_value(key)
+                    .expect("Failed to remove registry key for thread");
+            }
+
             Ok(LuaMultiValue::from_vec(vals))
         }
     }
diff --git a/src/lune/scheduler/mod.rs b/src/lune/scheduler/mod.rs
index 0ce3de7..0ada0c2 100644
--- a/src/lune/scheduler/mod.rs
+++ b/src/lune/scheduler/mod.rs
@@ -2,10 +2,13 @@ use std::{
     cell::RefCell,
     collections::{HashMap, VecDeque},
     ops::Deref,
+    pin::Pin,
     sync::Arc,
 };
 
+use futures_util::{stream::FuturesUnordered, Future};
 use mlua::prelude::*;
+use tokio::sync::Mutex as AsyncMutex;
 
 mod state;
 mod thread;
@@ -66,6 +69,7 @@ pub struct SchedulerImpl {
     state: SchedulerState,
     threads: RefCell<VecDeque<SchedulerThread>>,
     thread_senders: RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>,
+    futures: AsyncMutex<FuturesUnordered<Pin<Box<dyn Future<Output = ()>>>>>,
 }
 
 impl SchedulerImpl {
@@ -75,6 +79,7 @@
             state: SchedulerState::new(),
             threads: RefCell::new(VecDeque::new()),
             thread_senders: RefCell::new(HashMap::new()),
+            futures: AsyncMutex::new(FuturesUnordered::new()),
         }
     }
 }