Implement futures resumption

This commit is contained in:
Filip Tibell 2023-08-16 22:40:59 -05:00
parent d1a2dc2fa6
commit eafc42531f
3 changed files with 56 additions and 6 deletions

View file

@ -1,5 +1,6 @@
use std::{process::ExitCode, sync::Arc};
use futures_util::StreamExt;
use mlua::prelude::*;
use tokio::task::LocalSet;
@ -51,6 +52,28 @@ impl SchedulerImpl {
resumed_any
}
/**
Runs futures until none are left or a future spawned a new lua thread.
Returns `true` if any future was resumed, `false` otherwise.
*/
async fn run_futures(&self) -> bool {
let mut resumed_any = false;
let mut futs = self
.futures
.try_lock()
.expect("Failed to lock futures for resumption");
while futs.next().await.is_some() {
resumed_any = true;
if self.has_thread() {
break;
}
}
resumed_any
}
/**
Runs the scheduler to completion in a [`LocalSet`],
both normal lua threads and futures, prioritizing
@ -71,14 +94,13 @@ impl SchedulerImpl {
break;
}
// 3. Wait for the next future to complete, this may
// add more lua threads to run in the next iteration
// TODO: Implement futures resumption
// 3. Keep resuming futures until we get a new lua thread to
// resume, or until we don't have any futures left to wait for
let resumed_fut = self.run_futures().await;
// 4. If we did not resume any lua threads, and we have no futures
// queued either, we have now run the scheduler until completion
if !resumed_lua {
// remaining either, we have now run the scheduler until completion
if !resumed_lua && !resumed_fut {
break;
}
}

View file

@ -1,3 +1,5 @@
use std::sync::Arc;
use mlua::prelude::*;
use super::{
@ -7,6 +9,17 @@ use super::{
};
impl<'lua> SchedulerImpl {
/**
Checks if there are any lua threads to run.
*/
pub(super) fn has_thread(&self) -> bool {
!self
.threads
.try_borrow()
.expect("Failed to borrow threads vec")
.is_empty()
}
/**
Pops the next thread to run, from the front of the scheduler.
@ -111,6 +124,16 @@ impl<'lua> SchedulerImpl {
.lua
.registry_value::<Vec<LuaValue>>(&k)
.expect("Received invalid registry key for thread");
// NOTE: This is not strictly necessary, mlua can clean
// up registry values on its own, but doing this will add
// some extra safety and clean up registry values faster
if let Some(key) = Arc::into_inner(k) {
self.lua
.remove_registry_value(key)
.expect("Failed to remove registry key for thread");
}
Ok(LuaMultiValue::from_vec(vals))
}
}

View file

@ -2,10 +2,13 @@ use std::{
cell::RefCell,
collections::{HashMap, VecDeque},
ops::Deref,
pin::Pin,
sync::Arc,
};
use futures_util::{stream::FuturesUnordered, Future};
use mlua::prelude::*;
use tokio::sync::Mutex as AsyncMutex;
mod state;
mod thread;
@ -66,6 +69,7 @@ pub struct SchedulerImpl {
state: SchedulerState,
threads: RefCell<VecDeque<SchedulerThread>>,
thread_senders: RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>,
futures: AsyncMutex<FuturesUnordered<Pin<Box<dyn Future<Output = ()>>>>>,
}
impl SchedulerImpl {
@ -75,6 +79,7 @@ impl SchedulerImpl {
state: SchedulerState::new(),
threads: RefCell::new(VecDeque::new()),
thread_senders: RefCell::new(HashMap::new()),
futures: AsyncMutex::new(FuturesUnordered::new()),
}
}
}