Scheduler now manages entire Lua struct, elide lifetimes where possible

This commit is contained in:
Filip Tibell 2023-08-18 09:20:35 -05:00
parent 1b7287a742
commit dc80b1c28f
6 changed files with 81 additions and 114 deletions

View file

@ -1,6 +1,4 @@
use std::{process::ExitCode, sync::Arc}; use std::process::ExitCode;
use mlua::prelude::*;
mod error; mod error;
mod scheduler; mod scheduler;
@ -41,18 +39,8 @@ impl Lune {
script_name: impl AsRef<str>, script_name: impl AsRef<str>,
script_contents: impl AsRef<[u8]>, script_contents: impl AsRef<[u8]>,
) -> Result<ExitCode, LuneError> { ) -> Result<ExitCode, LuneError> {
let lua = Arc::new(Lua::new()); Ok(Scheduler::new()
let sched = Scheduler::new(Arc::clone(&lua)); .run_main(script_name, script_contents)
.await)
let main_fn = lua
.load(script_contents.as_ref())
.set_name(script_name.as_ref())
.into_function()?;
let main_thread = lua.create_thread(main_fn)?.into_owned();
sched
.push_back(main_thread, ())
.expect("Failed to enqueue thread for main");
Ok(sched.run_to_completion().await)
} }
} }

View file

@ -1,16 +1,16 @@
use futures_util::Future; use futures_util::Future;
use mlua::prelude::*; use mlua::prelude::*;
use super::SchedulerImpl; use super::Scheduler;
impl<'lua, 'fut> SchedulerImpl<'fut> impl<'lua, 'fut> Scheduler<'fut>
where where
'lua: 'fut, 'lua: 'fut,
{ {
/** /**
Schedules a plain future to run whenever the scheduler is available. Schedules a plain future to run whenever the scheduler is available.
*/ */
pub fn schedule_future<F>(&'lua self, fut: F) pub fn schedule_future<F>(&'fut self, fut: F)
where where
F: 'fut + Future<Output = ()>, F: 'fut + Future<Output = ()>,
{ {
@ -24,10 +24,9 @@ where
/** /**
Schedules the given `thread` to run when the given `fut` completes. Schedules the given `thread` to run when the given `fut` completes.
*/ */
pub fn schedule_future_thread<R, F>(&'lua self, thread: LuaOwnedThread, fut: F) -> LuaResult<()> pub fn schedule_future_thread<F>(&'fut self, thread: LuaOwnedThread, fut: F) -> LuaResult<()>
where where
R: IntoLuaMulti<'fut>, F: 'fut + Future<Output = LuaResult<LuaMultiValue<'fut>>>,
F: 'fut + Future<Output = LuaResult<R>>,
{ {
self.schedule_future(async move { self.schedule_future(async move {
let rets = fut.await.expect("Failed to receive result"); let rets = fut.await.expect("Failed to receive result");

View file

@ -2,11 +2,14 @@ use std::{process::ExitCode, sync::Arc};
use futures_util::StreamExt; use futures_util::StreamExt;
use mlua::prelude::*; use mlua::prelude::*;
use tokio::task::LocalSet; use tokio::task::LocalSet;
use super::SchedulerImpl; use super::{IntoLuaOwnedThread, Scheduler};
impl<'lua, 'fut> SchedulerImpl<'fut> const EMPTY_MULTI_VALUE: LuaMultiValue = LuaMultiValue::new();
impl<'lua, 'fut> Scheduler<'fut>
where where
'lua: 'fut, 'lua: 'fut,
{ {
@ -15,7 +18,7 @@ where
Returns `true` if any thread was resumed, `false` otherwise. Returns `true` if any thread was resumed, `false` otherwise.
*/ */
fn run_lua_threads(&'lua self) -> bool { fn run_lua_threads(&self) -> bool {
if self.state.has_exit_code() { if self.state.has_exit_code() {
return false; return false;
} }
@ -60,7 +63,7 @@ where
Returns `true` if any future was resumed, `false` otherwise. Returns `true` if any future was resumed, `false` otherwise.
*/ */
async fn run_futures(&'lua self) -> bool { async fn run_futures(&self) -> bool {
let mut resumed_any = false; let mut resumed_any = false;
let mut futs = self let mut futs = self
@ -84,32 +87,31 @@ where
Will emit lua output and errors to stdout and stderr. Will emit lua output and errors to stdout and stderr.
*/ */
pub async fn run_to_completion(&'lua self) -> ExitCode { pub async fn run_to_completion(&self) -> ExitCode {
let fut = async move { let set = LocalSet::new();
loop { let _guard = set.enter();
// 1. Run lua threads until exit or there are none left,
// if any thread was resumed it may have spawned futures
let resumed_lua = self.run_lua_threads();
// 2. If we got a manual exit code from lua we should loop {
// not try to wait for any pending futures to complete // 1. Run lua threads until exit or there are none left,
if self.state.has_exit_code() { // if any thread was resumed it may have spawned futures
break; let resumed_lua = self.run_lua_threads();
}
// 3. Keep resuming futures until we get a new lua thread to // 2. If we got a manual exit code from lua we should
// resume, or until we don't have any futures left to wait for // not try to wait for any pending futures to complete
let resumed_fut = self.run_futures().await; if self.state.has_exit_code() {
break;
// 4. If we did not resume any lua threads, and we have no futures
// remaining either, we have now run the scheduler until completion
if !resumed_lua && !resumed_fut {
break;
}
} }
};
LocalSet::new().run_until(fut).await; // 3. Keep resuming futures until we get a new lua thread to
// resume, or until we don't have any futures left to wait for
let resumed_fut = self.run_futures().await;
// 4. If we did not resume any lua threads, and we have no futures
// remaining either, we have now run the scheduler until completion
if !resumed_lua && !resumed_fut {
break;
}
}
if let Some(code) = self.state.exit_code() { if let Some(code) = self.state.exit_code() {
ExitCode::from(code) ExitCode::from(code)
@ -119,4 +121,31 @@ where
ExitCode::SUCCESS ExitCode::SUCCESS
} }
} }
/**
Runs a script with the given `script_name` and `script_contents` to completion.
Refer to [`run_to_completion`] for additional details.
*/
pub async fn run_main(
self,
script_name: impl AsRef<str>,
script_contents: impl AsRef<[u8]>,
) -> ExitCode {
let main_fn = self
.lua
.load(script_contents.as_ref())
.set_name(script_name.as_ref())
.into_function()
.expect("Failed to create function for main");
let main_thread = main_fn
.into_owned_lua_thread(&self.lua)
.expect("Failed to create thread for main");
self.push_back(main_thread, EMPTY_MULTI_VALUE)
.expect("Failed to enqueue thread for main");
self.run_to_completion().await
}
} }

View file

@ -4,10 +4,10 @@ use mlua::prelude::*;
use super::{ use super::{
thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender}, thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender},
SchedulerImpl, Scheduler,
}; };
impl<'lua, 'fut> SchedulerImpl<'fut> impl<'lua, 'fut> Scheduler<'fut>
where where
'lua: 'fut, 'lua: 'fut,
{ {
@ -28,8 +28,8 @@ where
Returns `None` if there are no threads left to run. Returns `None` if there are no threads left to run.
*/ */
pub(super) fn pop_thread( pub(super) fn pop_thread(
&'lua self, &self,
) -> LuaResult<Option<(LuaOwnedThread, LuaMultiValue<'lua>, SchedulerThreadSender)>> { ) -> LuaResult<Option<(LuaOwnedThread, LuaMultiValue<'_>, SchedulerThreadSender)>> {
match self match self
.threads .threads
.try_borrow_mut() .try_borrow_mut()
@ -56,12 +56,10 @@ where
right away, before any other currently scheduled threads. right away, before any other currently scheduled threads.
*/ */
pub fn push_front( pub fn push_front(
&'lua self, &self,
thread: LuaOwnedThread, thread: LuaOwnedThread,
args: impl IntoLuaMulti<'lua>, args: LuaMultiValue<'_>,
) -> LuaResult<SchedulerThreadId> { ) -> LuaResult<SchedulerThreadId> {
let args = args.into_lua_multi(&self.lua)?;
let thread = SchedulerThread::new(&self.lua, thread, args)?; let thread = SchedulerThread::new(&self.lua, thread, args)?;
let thread_id = thread.id(); let thread_id = thread.id();
@ -82,12 +80,10 @@ where
after all other current threads have been resumed. after all other current threads have been resumed.
*/ */
pub fn push_back( pub fn push_back(
&'lua self, &self,
thread: LuaOwnedThread, thread: LuaOwnedThread,
args: impl IntoLuaMulti<'lua>, args: LuaMultiValue<'_>,
) -> LuaResult<SchedulerThreadId> { ) -> LuaResult<SchedulerThreadId> {
let args = args.into_lua_multi(&self.lua)?;
let thread = SchedulerThread::new(&self.lua, thread, args)?; let thread = SchedulerThread::new(&self.lua, thread, args)?;
let thread_id = thread.id(); let thread_id = thread.id();
@ -107,9 +103,9 @@ where
Waits for the given thread to finish running, and returns its result. Waits for the given thread to finish running, and returns its result.
*/ */
pub async fn wait_for_thread( pub async fn wait_for_thread(
&'lua self, &self,
thread_id: SchedulerThreadId, thread_id: SchedulerThreadId,
) -> LuaResult<LuaMultiValue<'lua>> { ) -> LuaResult<LuaMultiValue<'_>> {
let mut recv = { let mut recv = {
let senders = self.thread_senders.borrow(); let senders = self.thread_senders.borrow();
let sender = senders let sender = senders

View file

@ -1,9 +1,7 @@
use std::{ use std::{
cell::RefCell, cell::RefCell,
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
ops::Deref,
pin::Pin, pin::Pin,
sync::Arc,
}; };
use futures_util::{stream::FuturesUnordered, Future}; use futures_util::{stream::FuturesUnordered, Future};
@ -28,51 +26,21 @@ use self::{
/** /**
Scheduler for Lua threads. Scheduler for Lua threads.
Can be cheaply cloned, and any clone will refer This wraps a [`Lua`] struct and exposes it as the `lua` property.
to the same underlying scheduler and Lua struct.
*/
#[derive(Debug, Clone)]
pub(crate) struct Scheduler<'fut> {
inner: Arc<SchedulerImpl<'fut>>,
}
impl<'fut> Scheduler<'fut> {
/**
Creates a new scheduler for the given [`Lua`] struct.
*/
pub fn new(lua: Arc<Lua>) -> Self {
let sched_lua = Arc::clone(&lua);
let sched_impl = SchedulerImpl::new(sched_lua);
let inner = Arc::new(sched_impl);
Self { inner }
}
}
impl<'fut> Deref for Scheduler<'fut> {
type Target = SchedulerImpl<'fut>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
/**
Implementation of scheduler for Lua threads.
Not meant to be used directly, use [`Scheduler`] instead.
*/ */
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct SchedulerImpl<'fut> { pub(crate) struct Scheduler<'fut> {
lua: Arc<Lua>, pub(crate) lua: Lua,
state: SchedulerState, state: SchedulerState,
threads: RefCell<VecDeque<SchedulerThread>>, threads: RefCell<VecDeque<SchedulerThread>>,
thread_senders: RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>, thread_senders: RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>,
futures: AsyncMutex<FuturesUnordered<Pin<Box<dyn Future<Output = ()> + 'fut>>>>, futures: AsyncMutex<FuturesUnordered<Pin<Box<dyn Future<Output = ()> + 'fut>>>>,
} }
impl<'fut> SchedulerImpl<'fut> { impl<'fut> Scheduler<'fut> {
fn new(lua: Arc<Lua>) -> Self { pub fn new() -> Self {
let lua = Lua::new();
Self { Self {
lua, lua,
state: SchedulerState::new(), state: SchedulerState::new(),

View file

@ -1,5 +1,3 @@
use std::sync::Arc;
use futures_util::Future; use futures_util::Future;
use mlua::prelude::*; use mlua::prelude::*;
@ -19,11 +17,6 @@ pub trait LuaSchedulerExt<'lua, 'fut>
where where
'lua: 'fut, 'lua: 'fut,
{ {
/**
Creates a new [`Lua`] struct with a [`Scheduler`].
*/
fn new_with_scheduler() -> Arc<Self>;
/** /**
Creates a function callable from Lua that runs an async Creates a function callable from Lua that runs an async
closure and returns the results of it to the call site. closure and returns the results of it to the call site.
@ -40,12 +33,6 @@ impl<'lua, 'fut> LuaSchedulerExt<'lua, 'fut> for Lua
where where
'lua: 'fut, 'lua: 'fut,
{ {
fn new_with_scheduler() -> Arc<Self> {
let lua = Arc::new(Lua::new());
lua.set_app_data(Scheduler::new(Arc::clone(&lua)));
lua
}
fn create_async_function<A, R, F, FR>(&'lua self, func: F) -> LuaResult<LuaFunction<'lua>> fn create_async_function<A, R, F, FR>(&'lua self, func: F) -> LuaResult<LuaFunction<'lua>>
where where
A: FromLuaMulti<'lua>, A: FromLuaMulti<'lua>,