Get rid of lua lifetime in scheduler, add scheduler documentation

This commit is contained in:
Filip Tibell 2023-08-21 10:47:43 -05:00
parent 8e4fc4b65e
commit acae6a6369
9 changed files with 95 additions and 67 deletions

View file

@ -60,6 +60,7 @@ impl Service<Request<Body>> for NetServiceInner {
let ws = ws.await.into_lua_err()?; let ws = ws.await.into_lua_err()?;
let sock = NetWebSocket::new(ws).into_lua_table(lua)?; let sock = NetWebSocket::new(ws).into_lua_table(lua)?;
sched.push_front( sched.push_front(
lua,
lua.create_thread(handler)?, lua.create_thread(handler)?,
LuaMultiValue::from_vec(vec![LuaValue::Table(sock)]), LuaMultiValue::from_vec(vec![LuaValue::Table(sock)]),
) )

View file

@ -42,7 +42,7 @@ pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'_>> {
let sched = lua let sched = lua
.app_data_ref::<&Scheduler>() .app_data_ref::<&Scheduler>()
.expect("Lua struct is missing scheduler"); .expect("Lua struct is missing scheduler");
sched.push_front(thread.clone(), args)?; sched.push_front(lua, thread.clone(), args)?;
Ok(thread) Ok(thread)
})?; })?;
let task_spawn_env = TableBuilder::new(lua)? let task_spawn_env = TableBuilder::new(lua)?
@ -85,7 +85,7 @@ fn task_defer<'lua>(
let sched = lua let sched = lua
.app_data_ref::<&Scheduler>() .app_data_ref::<&Scheduler>()
.expect("Lua struct is missing scheduler"); .expect("Lua struct is missing scheduler");
sched.push_back(thread.clone(), args)?; sched.push_back(lua, thread.clone(), args)?;
Ok(thread) Ok(thread)
} }
@ -105,10 +105,10 @@ where
.expect("Lua struct is missing scheduler"); .expect("Lua struct is missing scheduler");
let thread2 = thread.clone(); let thread2 = thread.clone();
sched.spawn_thread(thread.clone(), async move { sched.spawn_thread(lua, thread.clone(), async move {
let duration = Duration::from_secs_f64(secs); let duration = Duration::from_secs_f64(secs);
time::sleep(duration).await; time::sleep(duration).await;
sched.push_back(thread2, args)?; sched.push_back(lua, thread2, args)?;
Ok(()) Ok(())
})?; })?;

View file

@ -188,8 +188,8 @@ impl<'lua> RequireContext<'lua> {
.into_lua_thread(self.lua)?; .into_lua_thread(self.lua)?;
// Schedule the thread to run, wait for it to finish running // Schedule the thread to run, wait for it to finish running
let thread_id = sched.push_back(file_thread, ())?; let thread_id = sched.push_back(self.lua, file_thread, ())?;
let thread_res = sched.wait_for_thread(thread_id).await; let thread_res = sched.wait_for_thread(self.lua, thread_id).await;
// Return the result of the thread, storing any lua value(s) in the registry // Return the result of the thread, storing any lua value(s) in the registry
match thread_res { match thread_res {

View file

@ -6,7 +6,7 @@ mod globals;
mod scheduler; mod scheduler;
mod util; mod util;
use self::scheduler::Scheduler; use self::scheduler::{LuaSchedulerExt, Scheduler};
pub use error::LuneError; pub use error::LuneError;
use mlua::Lua; use mlua::Lua;
@ -14,7 +14,7 @@ use mlua::Lua;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Lune { pub struct Lune {
lua: &'static Lua, lua: &'static Lua,
scheduler: &'static Scheduler<'static, 'static>, scheduler: &'static Scheduler<'static>,
args: Vec<String>, args: Vec<String>,
} }
@ -28,9 +28,9 @@ impl Lune {
// any way for us to create a scheduler, store it in app data, and // any way for us to create a scheduler, store it in app data, and
// guarantee it has the same lifetime as Lua without using any unsafe? // guarantee it has the same lifetime as Lua without using any unsafe?
let lua = Lua::new().into_static(); let lua = Lua::new().into_static();
let scheduler = Scheduler::new(lua).into_static(); let scheduler = Scheduler::new().into_static();
lua.set_app_data(scheduler); lua.set_scheduler(scheduler);
globals::inject_all(lua).expect("Failed to inject lua globals"); globals::inject_all(lua).expect("Failed to inject lua globals");
Self { Self {
@ -65,7 +65,7 @@ impl Lune {
.load(script_contents.as_ref()) .load(script_contents.as_ref())
.set_name(script_name.as_ref()); .set_name(script_name.as_ref());
self.scheduler.push_back(main, ())?; self.scheduler.push_back(self.lua, main, ())?;
Ok(self.scheduler.run_to_completion().await) Ok(self.scheduler.run_to_completion(self.lua).await)
} }
} }

View file

@ -7,10 +7,7 @@ use tokio::{
use super::{IntoLuaThread, Scheduler}; use super::{IntoLuaThread, Scheduler};
impl<'lua, 'fut> Scheduler<'lua, 'fut> impl<'fut> Scheduler<'fut> {
where
'lua: 'fut,
{
/** /**
Checks if there are any futures to run, for Checks if there are any futures to run, for
lua futures and background futures respectively. lua futures and background futures respectively.
@ -97,6 +94,7 @@ where
*/ */
pub fn spawn_thread<F, FR>( pub fn spawn_thread<F, FR>(
&'fut self, &'fut self,
lua: &'fut Lua,
thread: impl IntoLuaThread<'fut>, thread: impl IntoLuaThread<'fut>,
fut: F, fut: F,
) -> LuaResult<()> ) -> LuaResult<()>
@ -104,20 +102,20 @@ where
FR: IntoLuaMulti<'fut>, FR: IntoLuaMulti<'fut>,
F: Future<Output = LuaResult<FR>> + 'fut, F: Future<Output = LuaResult<FR>> + 'fut,
{ {
let thread = thread.into_lua_thread(self.lua)?; let thread = thread.into_lua_thread(lua)?;
let futs = self.futures_lua.try_lock().expect( let futs = self.futures_lua.try_lock().expect(
"Failed to lock futures queue - \ "Failed to lock futures queue - \
can't schedule future lua threads during futures resumption", can't schedule future lua threads during futures resumption",
); );
futs.push(Box::pin(async move { futs.push(Box::pin(async move {
match fut.await.and_then(|rets| rets.into_lua_multi(self.lua)) { match fut.await.and_then(|rets| rets.into_lua_multi(lua)) {
Err(e) => { Err(e) => {
self.push_err(thread, e) self.push_err(lua, thread, e)
.expect("Failed to schedule future err thread"); .expect("Failed to schedule future err thread");
} }
Ok(v) => { Ok(v) => {
self.push_back(thread, v) self.push_back(lua, thread, v)
.expect("Failed to schedule future thread"); .expect("Failed to schedule future thread");
} }
} }

View file

@ -9,16 +9,13 @@ use crate::lune::util::traits::LuaEmitErrorExt;
use super::Scheduler; use super::Scheduler;
impl<'lua, 'fut> Scheduler<'lua, 'fut> impl<'fut> Scheduler<'fut> {
where
'lua: 'fut,
{
/** /**
Runs all lua threads to completion. Runs all lua threads to completion.
Returns `true` if any thread was resumed, `false` otherwise. Returns `true` if any thread was resumed, `false` otherwise.
*/ */
fn run_lua_threads(&self) -> bool { fn run_lua_threads(&self, lua: &Lua) -> bool {
if self.state.has_exit_code() { if self.state.has_exit_code() {
return false; return false;
} }
@ -32,7 +29,7 @@ where
{ {
// Deconstruct the scheduler thread into its parts // Deconstruct the scheduler thread into its parts
let thread_id = thread.id(); let thread_id = thread.id();
let (thread, args) = thread.into_inner(self.lua); let (thread, args) = thread.into_inner(lua);
// Make sure this thread is still resumable, it might have // Make sure this thread is still resumable, it might have
// been resumed somewhere else or even have been cancelled // been resumed somewhere else or even have been cancelled
@ -53,7 +50,7 @@ where
// a non-zero exit code, and print it out to stderr // a non-zero exit code, and print it out to stderr
if let Err(err) = &res { if let Err(err) = &res {
self.state.increment_error_count(); self.state.increment_error_count();
self.lua.emit_error(err.clone()); lua.emit_error(err.clone());
} }
// If the thread has finished running completely, // If the thread has finished running completely,
@ -65,11 +62,9 @@ where
if sender.receiver_count() > 0 { if sender.receiver_count() > 0 {
let stored = match res { let stored = match res {
Err(e) => Err(e), Err(e) => Err(e),
Ok(v) => Ok(Arc::new( Ok(v) => Ok(Arc::new(lua.create_registry_value(v.into_vec()).expect(
self.lua.create_registry_value(v.into_vec()).expect(
"Failed to store thread results in registry - out of memory", "Failed to store thread results in registry - out of memory",
), ))),
)),
}; };
sender sender
.send(stored) .send(stored)
@ -135,13 +130,17 @@ where
Will emit lua output and errors to stdout and stderr. Will emit lua output and errors to stdout and stderr.
*/ */
pub async fn run_to_completion(&self) -> ExitCode { pub async fn run_to_completion(&self, lua: &Lua) -> ExitCode {
if let Some(code) = self.state.exit_code() {
return ExitCode::from(code);
}
let set = LocalSet::new(); let set = LocalSet::new();
let _guard = set.enter(); let _guard = set.enter();
loop { loop {
// 1. Run lua threads until exit or there are none left // 1. Run lua threads until exit or there are none left
self.run_lua_threads(); self.run_lua_threads(lua);
// 2. If we got a manual exit code from lua we should // 2. If we got a manual exit code from lua we should
// not try to wait for any pending futures to complete // not try to wait for any pending futures to complete

View file

@ -7,10 +7,7 @@ use super::{
IntoLuaThread, Scheduler, IntoLuaThread, Scheduler,
}; };
impl<'lua, 'fut> Scheduler<'lua, 'fut> impl<'fut> Scheduler<'fut> {
where
'lua: 'fut,
{
/** /**
Checks if there are any lua threads to run. Checks if there are any lua threads to run.
*/ */
@ -43,11 +40,16 @@ where
/** /**
Schedules the `thread` to be resumed with the given [`LuaError`]. Schedules the `thread` to be resumed with the given [`LuaError`].
*/ */
pub fn push_err<'a>(&'a self, thread: impl IntoLuaThread<'a>, err: LuaError) -> LuaResult<()> { pub fn push_err<'a>(
let thread = thread.into_lua_thread(self.lua)?; &self,
lua: &'a Lua,
thread: impl IntoLuaThread<'a>,
err: LuaError,
) -> LuaResult<()> {
let thread = thread.into_lua_thread(lua)?;
let args = LuaMultiValue::new(); // Will be resumed with error, don't need real args let args = LuaMultiValue::new(); // Will be resumed with error, don't need real args
let thread = SchedulerThread::new(self.lua, thread, args); let thread = SchedulerThread::new(lua, thread, args);
let thread_id = thread.id(); let thread_id = thread.id();
self.state.set_thread_error(thread_id, err); self.state.set_thread_error(thread_id, err);
@ -71,14 +73,15 @@ where
right away, before any other currently scheduled threads. right away, before any other currently scheduled threads.
*/ */
pub fn push_front<'a>( pub fn push_front<'a>(
&'a self, &self,
lua: &'a Lua,
thread: impl IntoLuaThread<'a>, thread: impl IntoLuaThread<'a>,
args: impl IntoLuaMulti<'a>, args: impl IntoLuaMulti<'a>,
) -> LuaResult<SchedulerThreadId> { ) -> LuaResult<SchedulerThreadId> {
let thread = thread.into_lua_thread(self.lua)?; let thread = thread.into_lua_thread(lua)?;
let args = args.into_lua_multi(self.lua)?; let args = args.into_lua_multi(lua)?;
let thread = SchedulerThread::new(self.lua, thread, args); let thread = SchedulerThread::new(lua, thread, args);
let thread_id = thread.id(); let thread_id = thread.id();
self.threads self.threads
@ -109,14 +112,15 @@ where
after all other current threads have been resumed. after all other current threads have been resumed.
*/ */
pub fn push_back<'a>( pub fn push_back<'a>(
&'a self, &self,
lua: &'a Lua,
thread: impl IntoLuaThread<'a>, thread: impl IntoLuaThread<'a>,
args: impl IntoLuaMulti<'a>, args: impl IntoLuaMulti<'a>,
) -> LuaResult<SchedulerThreadId> { ) -> LuaResult<SchedulerThreadId> {
let thread = thread.into_lua_thread(self.lua)?; let thread = thread.into_lua_thread(lua)?;
let args = args.into_lua_multi(self.lua)?; let args = args.into_lua_multi(lua)?;
let thread = SchedulerThread::new(self.lua, thread, args); let thread = SchedulerThread::new(lua, thread, args);
let thread_id = thread.id(); let thread_id = thread.id();
self.threads self.threads
@ -145,10 +149,11 @@ where
/** /**
Waits for the given thread to finish running, and returns its result. Waits for the given thread to finish running, and returns its result.
*/ */
pub async fn wait_for_thread( pub async fn wait_for_thread<'a>(
&self, &self,
lua: &'a Lua,
thread_id: SchedulerThreadId, thread_id: SchedulerThreadId,
) -> LuaResult<LuaMultiValue<'_>> { ) -> LuaResult<LuaMultiValue<'a>> {
let mut recv = { let mut recv = {
let senders = self.thread_senders.borrow(); let senders = self.thread_senders.borrow();
let sender = senders let sender = senders
@ -163,8 +168,7 @@ where
match res { match res {
Err(e) => Err(e), Err(e) => Err(e),
Ok(k) => { Ok(k) => {
let vals = self let vals = lua
.lua
.registry_value::<Vec<LuaValue>>(&k) .registry_value::<Vec<LuaValue>>(&k)
.expect("Received invalid registry key for thread"); .expect("Received invalid registry key for thread");
@ -172,8 +176,7 @@ where
// up registry values on its own, but doing this will add // up registry values on its own, but doing this will add
// some extra safety and clean up registry values faster // some extra safety and clean up registry values faster
if let Some(key) = Arc::into_inner(k) { if let Some(key) = Arc::into_inner(k) {
self.lua lua.remove_registry_value(key)
.remove_registry_value(key)
.expect("Failed to remove registry key for thread"); .expect("Failed to remove registry key for thread");
} }

View file

@ -37,8 +37,7 @@ type SchedulerFuture<'fut> = Pin<Box<dyn Future<Output = ()> + 'fut>>;
and data will remain unchanged and accessible from all clones. and data will remain unchanged and accessible from all clones.
*/ */
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct Scheduler<'lua, 'fut> { pub(crate) struct Scheduler<'fut> {
lua: &'lua Lua,
state: Arc<SchedulerState>, state: Arc<SchedulerState>,
threads: Arc<RefCell<VecDeque<SchedulerThread>>>, threads: Arc<RefCell<VecDeque<SchedulerThread>>>,
thread_senders: Arc<RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>>, thread_senders: Arc<RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>>,
@ -47,23 +46,33 @@ pub(crate) struct Scheduler<'lua, 'fut> {
futures_break_signal: Sender<()>, futures_break_signal: Sender<()>,
} }
impl<'lua, 'fut> Scheduler<'lua, 'fut> { impl<'fut> Scheduler<'fut> {
pub fn new(lua: &'lua Lua) -> Self { /**
Creates a new scheduler.
*/
pub fn new() -> Self {
let (futures_break_signal, _) = channel(1); let (futures_break_signal, _) = channel(1);
let this = Self { Self {
lua,
state: Arc::new(SchedulerState::new()), state: Arc::new(SchedulerState::new()),
threads: Arc::new(RefCell::new(VecDeque::new())), threads: Arc::new(RefCell::new(VecDeque::new())),
thread_senders: Arc::new(RefCell::new(HashMap::new())), thread_senders: Arc::new(RefCell::new(HashMap::new())),
futures_lua: Arc::new(AsyncMutex::new(FuturesUnordered::new())), futures_lua: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
futures_background: Arc::new(AsyncMutex::new(FuturesUnordered::new())), futures_background: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
futures_break_signal, futures_break_signal,
}; }
}
/**
Sets the luau interrupt for this scheduler.
This will propagate errors from any lua-spawned
futures back to the lua threads that spawned them.
*/
pub fn set_interrupt_for(&self, lua: &Lua) {
// Propagate errors given to the scheduler back to their lua threads // Propagate errors given to the scheduler back to their lua threads
// FUTURE: Do profiling and anything else we need inside of this interrupt // FUTURE: Do profiling and anything else we need inside of this interrupt
let state = this.state.clone(); let state = self.state.clone();
lua.set_interrupt(move |_| { lua.set_interrupt(move |_| {
if let Some(id) = state.get_current_thread_id() { if let Some(id) = state.get_current_thread_id() {
if let Some(err) = state.get_thread_error(id) { if let Some(err) = state.get_thread_error(id) {
@ -72,10 +81,15 @@ impl<'lua, 'fut> Scheduler<'lua, 'fut> {
} }
Ok(LuaVmState::Continue) Ok(LuaVmState::Continue)
}); });
this
} }
/**
Sets the exit code for the scheduler.
This will stop the scheduler from resuming any more lua threads or futures.
Panics if the exit code is set more than once.
*/
pub fn set_exit_code(&self, code: impl Into<u8>) { pub fn set_exit_code(&self, code: impl Into<u8>) {
assert!( assert!(
self.state.exit_code().is_none(), self.state.exit_code().is_none(),

View file

@ -13,7 +13,12 @@ return yield()
for access to the scheduler without having to import for access to the scheduler without having to import
it or handle registry / app data references manually. it or handle registry / app data references manually.
*/ */
pub trait LuaSchedulerExt<'lua> { pub(crate) trait LuaSchedulerExt<'lua> {
/**
Sets the scheduler for the [`Lua`] struct.
*/
fn set_scheduler(&'lua self, scheduler: &'lua Scheduler);
/** /**
Creates a function callable from Lua that runs an async Creates a function callable from Lua that runs an async
closure and returns the results of it to the call site. closure and returns the results of it to the call site.
@ -33,6 +38,11 @@ impl<'lua> LuaSchedulerExt<'lua> for Lua
where where
'lua: 'static, 'lua: 'static,
{ {
fn set_scheduler(&'lua self, scheduler: &'lua Scheduler) {
self.set_app_data(scheduler);
scheduler.set_interrupt_for(self);
}
fn create_async_function<A, R, F, FR>(&'lua self, func: F) -> LuaResult<LuaFunction<'lua>> fn create_async_function<A, R, F, FR>(&'lua self, func: F) -> LuaResult<LuaFunction<'lua>>
where where
A: FromLuaMulti<'lua>, A: FromLuaMulti<'lua>,
@ -40,6 +50,9 @@ where
F: Fn(&'lua Lua, A) -> FR + 'lua, F: Fn(&'lua Lua, A) -> FR + 'lua,
FR: Future<Output = LuaResult<R>> + 'lua, FR: Future<Output = LuaResult<R>> + 'lua,
{ {
self.app_data_ref::<&Scheduler>()
.expect("Lua must have a scheduler to create async functions");
let async_env = self.create_table_with_capacity(0, 2)?; let async_env = self.create_table_with_capacity(0, 2)?;
async_env.set( async_env.set(
@ -57,7 +70,7 @@ where
let sched = lua let sched = lua
.app_data_ref::<&Scheduler>() .app_data_ref::<&Scheduler>()
.expect("Lua struct is missing scheduler"); .expect("Lua struct is missing scheduler");
sched.spawn_thread(thread, future)?; sched.spawn_thread(lua, thread, future)?;
Ok(()) Ok(())
}), }),
)?; )?;