Implement thread return value broadcasting in scheduler

This commit is contained in:
Filip Tibell 2023-08-16 22:00:15 -05:00
parent 6757e1a1a8
commit 4fa76aa27f
4 changed files with 161 additions and 41 deletions

View file

@ -1,64 +1,90 @@
use std::process::ExitCode;
use std::{process::ExitCode, sync::Arc};
use mlua::prelude::*;
use tokio::task::LocalSet;
use super::SchedulerImpl;
impl SchedulerImpl {
/**
Runs all lua threads to completion, gathering any results they produce.
*/
fn run_threads(&self) -> Vec<LuaResult<()>> {
let mut results = Vec::new();
Runs all lua threads to completion.
while let Some((thread, args)) = self
Returns `true` if any thread was resumed, `false` otherwise.
*/
fn run_lua_threads(&self) -> bool {
if self.state.has_exit_code() {
return false;
}
let mut resumed_any = false;
while let Some((thread, args, sender)) = self
.pop_thread()
.expect("Failed to pop thread from scheduler")
{
let res = thread.resume(args);
let res = thread.resume::<_, LuaMultiValue>(args);
self.state.add_resumption();
resumed_any = true;
if let Err(e) = &res {
if let Err(err) = &res {
self.state.add_error();
eprintln!("{e}"); // TODO: Pretty print the lua error here
eprint!("{err}"); // TODO: Pretty print the lua error here
}
results.push(res);
if sender.receiver_count() > 0 {
sender
.send(res.map(|v| {
Arc::new(
self.lua
.create_registry_value(v.into_vec())
.expect("Failed to store return values in registry"),
)
}))
.expect("Failed to broadcast return values of thread");
}
if self.state.has_exit_code() {
break;
}
}
results
resumed_any
}
/**
Runs the scheduler to completion, both normal lua threads and futures.
Runs the scheduler to completion in a [`LocalSet`],
both normal lua threads and futures, prioritizing
lua threads over completion of any pending futures.
This will emit lua output and errors to stdout and stderr.
Will emit lua output and errors to stdout and stderr.
*/
pub async fn run_to_completion(&self) -> ExitCode {
loop {
// 1. Run lua threads until exit or there are none left
let results = self.run_threads();
let fut = async move {
loop {
// 1. Run lua threads until exit or there are none left,
// if any thread was resumed it may have spawned futures
let resumed_lua = self.run_lua_threads();
// 2. If we got a manual exit code from lua we should not continue
if self.state.has_exit_code() {
break;
// 2. If we got a manual exit code from lua we should
// not try to wait for any pending futures to complete
if self.state.has_exit_code() {
break;
}
// 3. Wait for the next future to complete, this may
// add more lua threads to run in the next iteration
// TODO: Implement futures resumption
// 4. If we did not resume any lua threads, and we have no futures
// queued either, we have now run the scheduler until completion
if !resumed_lua {
break;
}
}
};
// 3. Wait for the next future to complete, this may
// add more lua threads to run in the next iteration
// TODO: Implement this
// 4. If did not resume any lua threads, and we have no futures
// queued either, we have run the scheduler until completion
if results.is_empty() {
break;
}
}
LocalSet::new().run_until(fut).await;
if let Some(code) = self.state.exit_code() {
ExitCode::from(code)

View file

@ -1,6 +1,10 @@
use mlua::prelude::*;
use super::{thread::SchedulerThread, traits::IntoLuaThread, SchedulerImpl};
use super::{
thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender},
traits::IntoLuaThread,
SchedulerImpl,
};
impl<'lua> SchedulerImpl {
/**
@ -10,7 +14,7 @@ impl<'lua> SchedulerImpl {
*/
pub(super) fn pop_thread(
&'lua self,
) -> LuaResult<Option<(LuaThread<'lua>, LuaMultiValue<'lua>)>> {
) -> LuaResult<Option<(LuaThread<'lua>, LuaMultiValue<'lua>, SchedulerThreadSender)>> {
match self
.threads
.try_borrow_mut()
@ -19,8 +23,14 @@ impl<'lua> SchedulerImpl {
.pop_front()
{
Some(thread) => {
let thread_id = &thread.id();
let (thread, args) = thread.into_inner(&self.lua);
Ok(Some((thread, args)))
let sender = self
.thread_senders
.borrow_mut()
.remove(thread_id)
.expect("Missing thread sender");
Ok(Some((thread, args, sender)))
}
None => Ok(None),
}
@ -34,17 +44,23 @@ impl<'lua> SchedulerImpl {
&'lua self,
thread: impl IntoLuaThread<'lua>,
args: impl IntoLuaMulti<'lua>,
) -> LuaResult<()> {
) -> LuaResult<SchedulerThreadId> {
let thread = thread.into_lua_thread(&self.lua)?;
let args = args.into_lua_multi(&self.lua)?;
let thread = SchedulerThread::new(&self.lua, thread, args)?;
let thread_id = thread.id();
self.threads
.try_borrow_mut()
.into_lua_err()
.context("Failed to borrow threads vec")?
.push_front(SchedulerThread::new(&self.lua, thread, args)?);
.push_front(thread);
self.thread_senders
.borrow_mut()
.insert(thread_id, SchedulerThreadSender::new(1));
Ok(())
Ok(thread_id)
}
/**
@ -55,16 +71,48 @@ impl<'lua> SchedulerImpl {
&'lua self,
thread: impl IntoLuaThread<'lua>,
args: impl IntoLuaMulti<'lua>,
) -> LuaResult<()> {
) -> LuaResult<SchedulerThreadId> {
let thread = thread.into_lua_thread(&self.lua)?;
let args = args.into_lua_multi(&self.lua)?;
let thread = SchedulerThread::new(&self.lua, thread, args)?;
let thread_id = thread.id();
self.threads
.try_borrow_mut()
.into_lua_err()
.context("Failed to borrow threads vec")?
.push_back(SchedulerThread::new(&self.lua, thread, args)?);
.push_back(thread);
self.thread_senders
.borrow_mut()
.insert(thread_id, SchedulerThreadSender::new(1));
Ok(())
Ok(thread_id)
}
/**
Waits for the given thread to finish running, and returns its result.
*/
pub async fn wait_for_thread(
&'lua self,
thread_id: SchedulerThreadId,
) -> LuaResult<LuaMultiValue<'lua>> {
let mut recv = {
let senders = self.thread_senders.borrow();
let sender = senders
.get(&thread_id)
.expect("Tried to wait for thread that is not queued");
sender.subscribe()
};
match recv.recv().await.expect("Failed to receive thread result") {
Err(e) => Err(e),
Ok(k) => {
let vals = self
.lua
.registry_value::<Vec<LuaValue>>(&k)
.expect("Received invalid registry key for thread");
Ok(LuaMultiValue::from_vec(vals))
}
}
}
}

View file

@ -1,4 +1,9 @@
use std::{cell::RefCell, collections::VecDeque, ops::Deref, sync::Arc};
use std::{
cell::RefCell,
collections::{HashMap, VecDeque},
ops::Deref,
sync::Arc,
};
use mlua::prelude::*;
@ -9,7 +14,10 @@ mod traits;
mod impl_runner;
mod impl_threads;
use self::{state::SchedulerState, thread::SchedulerThread};
use self::{
state::SchedulerState,
thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender},
};
/**
Scheduler for Lua threads.
@ -57,6 +65,7 @@ pub struct SchedulerImpl {
lua: Arc<Lua>,
state: SchedulerState,
threads: RefCell<VecDeque<SchedulerThread>>,
thread_senders: RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>,
}
impl SchedulerImpl {
@ -65,6 +74,7 @@ impl SchedulerImpl {
lua,
state: SchedulerState::new(),
threads: RefCell::new(VecDeque::new()),
thread_senders: RefCell::new(HashMap::new()),
}
}
}

View file

@ -1,10 +1,38 @@
use std::sync::Arc;
use mlua::prelude::*;
use rand::Rng;
use tokio::sync::broadcast::Sender;
/**
Type alias for a broadcast [`Sender`], which will
broadcast the result and return values of a lua thread.
The return values are stored in the lua registry as a
`Vec<LuaValue<'_>>`, and the registry key pointing to
those values will be sent using the broadcast sender.
*/
pub type SchedulerThreadSender = Sender<LuaResult<Arc<LuaRegistryKey>>>;
/**
Unique, randomly generated id for a scheduler thread.
*/
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct SchedulerThreadId(u128);
impl SchedulerThreadId {
fn gen() -> Self {
// FUTURE: Use a faster rng here?
Self(rand::thread_rng().gen())
}
}
/**
Container for registry keys that point to a thread and thread arguments.
*/
#[derive(Debug)]
pub(super) struct SchedulerThread {
scheduler_id: SchedulerThreadId,
key_thread: LuaRegistryKey,
key_args: LuaRegistryKey,
}
@ -30,6 +58,7 @@ impl SchedulerThread {
.context("Failed to store value in registry")?;
Ok(Self {
scheduler_id: SchedulerThreadId::gen(),
key_thread,
key_args,
})
@ -55,4 +84,11 @@ impl SchedulerThread {
(thread, args)
}
/**
Retrieves the unique, randomly generated id for this scheduler thread.
*/
pub(super) fn id(&self) -> SchedulerThreadId {
self.scheduler_id
}
}