More work on scheduler async stuff

This commit is contained in:
Filip Tibell 2023-08-17 20:00:01 -05:00
parent e08908e22e
commit 6416ef5fb7
7 changed files with 73 additions and 77 deletions

View file

@ -78,10 +78,11 @@ urlencoding = "2.1"
### RUNTIME
mlua = { version = "0.9.0", features = [
"macros",
"luau",
"luau-jit",
"serialize",
"macros",
"unstable",
] }
tokio = { version = "1.24", features = ["full"] }

View file

@ -1,9 +1,12 @@
use futures_util::Future;
use mlua::prelude::*;
use super::{traits::IntoLuaThread, SchedulerImpl};
use super::{traits::IntoLuaOwnedThread, SchedulerImpl};
impl<'lua> SchedulerImpl {
impl<'lua, 'fut> SchedulerImpl
where
'lua: 'fut,
{
/**
Schedules a plain future to run whenever the scheduler is available.
*/
@ -11,30 +14,30 @@ impl<'lua> SchedulerImpl {
where
F: 'static + Future<Output = ()>,
{
self.futures
let futs = self
.futures
.try_lock()
.expect("Failed to lock futures queue")
.push(Box::pin(fut))
.expect("Failed to lock futures queue");
futs.push(Box::pin(fut))
}
/**
Schedules the given `thread` to run when the given `fut` completes.
*/
pub fn schedule_thread<T, R, F>(&'lua self, thread: T, fut: F) -> LuaResult<()>
pub fn schedule_future_thread<T, F, R>(&'lua self, thread: T, fut: F) -> LuaResult<()>
where
T: IntoLuaThread<'lua>,
R: IntoLuaMulti<'lua>,
T: 'static + IntoLuaOwnedThread,
F: 'static + Future<Output = LuaResult<R>>,
R: IntoLuaMulti<'fut>,
{
let thread = thread.into_lua_thread(&self.lua)?;
let thread = thread.into_owned_lua_thread(&self.lua)?;
let fut = async move {
// FIXME: We use self in the future below, so this doesn't compile... how to fix?
self.schedule_future(async move {
let rets = fut.await.expect("Failed to receive result");
self.push_back(thread, rets)
.expect("Failed to schedule future thread");
};
// TODO: Lifetime issues
// self.schedule_future(fut);
});
Ok(())
}

View file

@ -6,7 +6,7 @@ use tokio::task::LocalSet;
use super::SchedulerImpl;
impl SchedulerImpl {
impl<'lua> SchedulerImpl {
/**
Runs all lua threads to completion.

View file

@ -4,7 +4,7 @@ use mlua::prelude::*;
use super::{
thread::{SchedulerThread, SchedulerThreadId, SchedulerThreadSender},
traits::IntoLuaThread,
traits::IntoLuaOwnedThread,
SchedulerImpl,
};
@ -27,7 +27,7 @@ impl<'lua> SchedulerImpl {
*/
pub(super) fn pop_thread(
&'lua self,
) -> LuaResult<Option<(LuaThread<'lua>, LuaMultiValue<'lua>, SchedulerThreadSender)>> {
) -> LuaResult<Option<(LuaOwnedThread, LuaMultiValue<'lua>, SchedulerThreadSender)>> {
match self
.threads
.try_borrow_mut()
@ -55,10 +55,10 @@ impl<'lua> SchedulerImpl {
*/
pub fn push_front(
&'lua self,
thread: impl IntoLuaThread<'lua>,
thread: impl IntoLuaOwnedThread,
args: impl IntoLuaMulti<'lua>,
) -> LuaResult<SchedulerThreadId> {
let thread = thread.into_lua_thread(&self.lua)?;
let thread = thread.into_owned_lua_thread(&self.lua)?;
let args = args.into_lua_multi(&self.lua)?;
let thread = SchedulerThread::new(&self.lua, thread, args)?;
@ -82,10 +82,10 @@ impl<'lua> SchedulerImpl {
*/
pub fn push_back(
&'lua self,
thread: impl IntoLuaThread<'lua>,
thread: impl IntoLuaOwnedThread,
args: impl IntoLuaMulti<'lua>,
) -> LuaResult<SchedulerThreadId> {
let thread = thread.into_lua_thread(&self.lua)?;
let thread = thread.into_owned_lua_thread(&self.lua)?;
let args = args.into_lua_multi(&self.lua)?;
let thread = SchedulerThread::new(&self.lua, thread, args)?;

View file

@ -30,32 +30,29 @@ use self::{
to the same underlying scheduler and Lua struct.
*/
#[derive(Debug, Clone)]
pub struct Scheduler(Arc<SchedulerImpl>);
pub struct Scheduler {
lua: Arc<Lua>,
inner: Arc<SchedulerImpl>,
}
impl Scheduler {
/**
Creates a new scheduler for the given [`Lua`] struct.
*/
pub fn new(lua: Arc<Lua>) -> Self {
assert!(
lua.app_data_ref::<Self>().is_none() && lua.app_data_ref::<&Self>().is_none(),
"Only one scheduler may be created per Lua struct"
);
let sched_lua = Arc::clone(&lua);
let sched_impl = SchedulerImpl::new(sched_lua);
let inner = SchedulerImpl::new(Arc::clone(&lua));
let sched = Self(Arc::new(inner));
let inner = Arc::new(sched_impl);
lua.set_app_data(sched.clone());
lua.set_interrupt(move |_| Ok(LuaVmState::Continue));
sched
Self { lua, inner }
}
}
impl Deref for Scheduler {
type Target = SchedulerImpl;
fn deref(&self) -> &Self::Target {
&self.0
&self.inner
}
}

View file

@ -33,8 +33,8 @@ impl SchedulerThreadId {
#[derive(Debug)]
pub(super) struct SchedulerThread {
scheduler_id: SchedulerThreadId,
key_thread: LuaRegistryKey,
key_args: LuaRegistryKey,
thread: LuaOwnedThread,
args: LuaRegistryKey,
}
impl SchedulerThread {
@ -45,44 +45,36 @@ impl SchedulerThread {
*/
pub(super) fn new<'lua>(
lua: &'lua Lua,
thread: LuaThread<'lua>,
thread: LuaOwnedThread,
args: LuaMultiValue<'lua>,
) -> LuaResult<Self> {
let args_vec = args.into_vec();
let key_thread = lua
.create_registry_value(thread)
.context("Failed to store value in registry")?;
let key_args = lua
let args = lua
.create_registry_value(args_vec)
.context("Failed to store value in registry")?;
Ok(Self {
scheduler_id: SchedulerThreadId::gen(),
key_thread,
key_args,
thread,
args,
})
}
/**
Extracts the inner thread and args from the container.
*/
pub(super) fn into_inner(self, lua: &Lua) -> (LuaThread<'_>, LuaMultiValue<'_>) {
let thread = lua
.registry_value(&self.key_thread)
.expect("Failed to get thread from registry");
pub(super) fn into_inner(self, lua: &Lua) -> (LuaOwnedThread, LuaMultiValue<'_>) {
let args_vec = lua
.registry_value(&self.key_args)
.registry_value(&self.args)
.expect("Failed to get thread args from registry");
let args = LuaMultiValue::from_vec(args_vec);
lua.remove_registry_value(self.key_thread)
.expect("Failed to remove thread from registry");
lua.remove_registry_value(self.key_args)
lua.remove_registry_value(self.args)
.expect("Failed to remove thread args from registry");
(thread, args)
(self.thread, args)
}
/**

View file

@ -10,12 +10,9 @@ use super::Scheduler;
*/
pub trait LuaSchedulerExt {
/**
Get a strong reference to the scheduler for the [`Lua`] struct.
Note that if this reference is not dropped, `Lua` can
not be dropped either because of the strong reference.
Get a reference to the scheduler for the [`Lua`] struct.
*/
fn scheduler(&self) -> Scheduler;
fn scheduler(&self) -> &Scheduler;
/**
Creates a function callable from Lua that runs an async
@ -33,10 +30,10 @@ pub trait LuaSchedulerExt {
}
impl LuaSchedulerExt for Lua {
fn scheduler(&self) -> Scheduler {
self.app_data_ref::<Scheduler>()
fn scheduler(&self) -> &Scheduler {
*self
.app_data_ref::<&Scheduler>()
.expect("Lua struct is missing scheduler")
.clone()
}
fn create_async_function<'lua, A, R, F, FR>(&'lua self, func: F) -> LuaResult<LuaFunction<'lua>>
@ -46,21 +43,21 @@ impl LuaSchedulerExt for Lua {
F: 'static + Fn(&'lua Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>,
{
let async_yield = self
let coroutine_yield = self
.globals()
.get::<_, LuaTable>("coroutine")?
.get::<_, LuaFunction>("yield")?;
let async_schedule = self.create_function(move |lua: &Lua, args: A| {
let thread = lua.current_thread();
let schedule = LuaFunction::wrap(move |lua: &Lua, args: A| {
let thread = lua.current_thread().into_owned();
let future = func(lua, args);
// TODO: Add to scheduler
lua.scheduler().schedule_future_thread(thread, future);
Ok(())
})?;
});
let async_func = self
.load(chunk!({
$async_schedule(...)
return $async_yield()
$schedule(...)
return $coroutine_yield()
}))
.set_name("async")
.into_function()?;
@ -76,27 +73,33 @@ impl LuaSchedulerExt for Lua {
- Lua functions ([`LuaFunction`])
- Lua chunks ([`LuaChunk`])
*/
pub trait IntoLuaThread<'lua> {
pub trait IntoLuaOwnedThread {
/**
Converts the value into a lua thread.
*/
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>>;
fn into_owned_lua_thread(self, lua: &Lua) -> LuaResult<LuaOwnedThread>;
}
impl<'lua> IntoLuaThread<'lua> for LuaThread<'lua> {
fn into_lua_thread(self, _: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
impl IntoLuaOwnedThread for LuaOwnedThread {
fn into_owned_lua_thread(self, _lua: &Lua) -> LuaResult<LuaOwnedThread> {
Ok(self)
}
}
impl<'lua> IntoLuaThread<'lua> for LuaFunction<'lua> {
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
lua.create_thread(self)
impl<'lua> IntoLuaOwnedThread for LuaThread<'lua> {
fn into_owned_lua_thread(self, _lua: &Lua) -> LuaResult<LuaOwnedThread> {
Ok(self.into_owned())
}
}
impl<'lua, 'a> IntoLuaThread<'lua> for LuaChunk<'lua, 'a> {
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
lua.create_thread(self.into_function()?)
impl<'lua> IntoLuaOwnedThread for LuaFunction<'lua> {
fn into_owned_lua_thread(self, lua: &Lua) -> LuaResult<LuaOwnedThread> {
Ok(lua.create_thread(self)?.into_owned())
}
}
impl<'lua, 'a> IntoLuaOwnedThread for LuaChunk<'lua, 'a> {
fn into_owned_lua_thread(self, lua: &Lua) -> LuaResult<LuaOwnedThread> {
Ok(lua.create_thread(self.into_function()?)?.into_owned())
}
}