Improve handling of async lua function creation

This commit is contained in:
Filip Tibell 2023-02-16 23:23:52 +01:00
parent 4ccaa52b87
commit 546ebbd349
No known key found for this signature in database
9 changed files with 124 additions and 188 deletions

View file

@ -58,6 +58,7 @@ pub fn create(lua: &'static Lua, args_vec: Vec<String>) -> LuaResult<LuaTable> {
})?; })?;
let process_exit = lua let process_exit = lua
.load(PROCESS_EXIT_IMPL_LUA) .load(PROCESS_EXIT_IMPL_LUA)
.set_name("=process.exit")?
.set_environment( .set_environment(
TableBuilder::new(lua)? TableBuilder::new(lua)?
.with_value("yield", process_exit_env_yield)? .with_value("yield", process_exit_env_yield)?

View file

@ -1,4 +1,7 @@
use std::time::Duration;
use mlua::prelude::*; use mlua::prelude::*;
use tokio::time::{sleep, Instant};
use crate::{ use crate::{
lua::task::{TaskKind, TaskReference, TaskScheduler, TaskSchedulerScheduleExt}, lua::task::{TaskKind, TaskReference, TaskScheduler, TaskSchedulerScheduleExt},
@ -7,13 +10,6 @@ use crate::{
const ERR_MISSING_SCHEDULER: &str = "Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"; const ERR_MISSING_SCHEDULER: &str = "Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption";
const TASK_WAIT_IMPL_LUA: &str = r#"
local seconds = ...
local current = thread()
resumeAfter(seconds, current)
return yield()
"#;
const TASK_SPAWN_IMPL_LUA: &str = r#" const TASK_SPAWN_IMPL_LUA: &str = r#"
-- Schedule the current thread at the front -- Schedule the current thread at the front
scheduleNext(thread()) scheduleNext(thread())
@ -27,58 +23,14 @@ return task
"#; "#;
pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> { pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
// Create a user-accessible function that cancels a task // The spawn function needs special treatment,
let task_cancel = lua.create_function(|lua, task: TaskReference| {
let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.remove_task(task)?;
Ok(())
})?;
// Create functions that manipulate non-blocking tasks in the scheduler
let task_defer = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| {
let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.schedule_blocking_deferred(tof, args)
})?;
let task_delay =
lua.create_function(|lua, (secs, tof, args): (f64, LuaValue, LuaMultiValue)| {
let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.schedule_delayed(secs, tof, args)
})?;
// Create our task wait function, this is a bit different since
// we have no way to yield from c / rust, we need to load a
// lua chunk that schedules and yields for us instead
let task_wait_env_thread: LuaFunction = lua.named_registry_value("co.thread")?;
let task_wait_env_yield: LuaFunction = lua.named_registry_value("co.yield")?;
let task_wait = lua
.load(TASK_WAIT_IMPL_LUA)
.set_environment(
TableBuilder::new(lua)?
.with_value("thread", task_wait_env_thread)?
.with_value("yield", task_wait_env_yield)?
.with_function(
"resumeAfter",
|lua, (secs, thread): (Option<f64>, LuaThread)| {
let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.schedule_wait(secs.unwrap_or_default(), LuaValue::Thread(thread))
},
)?
.build_readonly()?,
)?
.into_function()?;
// The spawn function also needs special treatment,
// we need to yield right away to allow the // we need to yield right away to allow the
// spawned task to run until first yield // spawned task to run until first yield
let task_spawn_env_thread: LuaFunction = lua.named_registry_value("co.thread")?; let task_spawn_env_thread: LuaFunction = lua.named_registry_value("co.thread")?;
let task_spawn_env_yield: LuaFunction = lua.named_registry_value("co.yield")?; let task_spawn_env_yield: LuaFunction = lua.named_registry_value("co.yield")?;
let task_spawn = lua let task_spawn = lua
.load(TASK_SPAWN_IMPL_LUA) .load(TASK_SPAWN_IMPL_LUA)
.set_name("=task.spawn")?
.set_environment( .set_environment(
TableBuilder::new(lua)? TableBuilder::new(lua)?
.with_value("thread", task_spawn_env_thread)? .with_value("thread", task_spawn_env_thread)?
@ -164,10 +116,41 @@ pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
)?; )?;
// All good, return the task scheduler lib // All good, return the task scheduler lib
TableBuilder::new(lua)? TableBuilder::new(lua)?
.with_value("cancel", task_cancel)?
.with_value("spawn", task_spawn)? .with_value("spawn", task_spawn)?
.with_value("defer", task_defer)? .with_function("cancel", task_cancel)?
.with_value("delay", task_delay)? .with_function("defer", task_defer)?
.with_value("wait", task_wait)? .with_function("delay", task_delay)?
.with_async_function("wait", task_wait)?
.build_readonly() .build_readonly()
} }
fn task_cancel(lua: &Lua, task: TaskReference) -> LuaResult<()> {
let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.remove_task(task)?;
Ok(())
}
fn task_defer(lua: &Lua, (tof, args): (LuaValue, LuaMultiValue)) -> LuaResult<TaskReference> {
let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.schedule_blocking_deferred(tof, args)
}
fn task_delay(
lua: &Lua,
(secs, tof, args): (f64, LuaValue, LuaMultiValue),
) -> LuaResult<TaskReference> {
let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.schedule_blocking_after_seconds(secs, tof, args)
}
async fn task_wait(_: &Lua, secs: Option<f64>) -> LuaResult<f64> {
let start = Instant::now();
sleep(Duration::from_secs_f64(secs.unwrap_or_default())).await;
Ok(start.elapsed().as_secs_f64())
}

View file

@ -28,8 +28,6 @@ for level = 2, 2^8 do
end end
end end
if #lines > 0 then if #lines > 0 then
push(lines, 1, "Stack Begin")
push(lines, "Stack End")
return concat(lines, "\n") return concat(lines, "\n")
else else
return nil return nil
@ -43,6 +41,7 @@ end
--- ---
* `"require"` -> `require` * `"require"` -> `require`
* `"select"` -> `select`
--- ---
* `"print"` -> `print` * `"print"` -> `print`
* `"error"` -> `error` * `"error"` -> `error`
@ -68,6 +67,7 @@ pub fn create() -> LuaResult<&'static Lua> {
// Store original lua global functions in the registry so we can use // Store original lua global functions in the registry so we can use
// them later without passing them around and dealing with lifetimes // them later without passing them around and dealing with lifetimes
lua.set_named_registry_value("require", globals.get::<_, LuaFunction>("require")?)?; lua.set_named_registry_value("require", globals.get::<_, LuaFunction>("require")?)?;
lua.set_named_registry_value("select", globals.get::<_, LuaFunction>("select")?)?;
lua.set_named_registry_value("print", globals.get::<_, LuaFunction>("print")?)?; lua.set_named_registry_value("print", globals.get::<_, LuaFunction>("print")?)?;
lua.set_named_registry_value("error", globals.get::<_, LuaFunction>("error")?)?; lua.set_named_registry_value("error", globals.get::<_, LuaFunction>("error")?)?;
lua.set_named_registry_value("type", globals.get::<_, LuaFunction>("type")?)?; lua.set_named_registry_value("type", globals.get::<_, LuaFunction>("type")?)?;
@ -85,6 +85,7 @@ pub fn create() -> LuaResult<&'static Lua> {
trace_env.set("format", string.get::<_, LuaFunction>("format")?)?; trace_env.set("format", string.get::<_, LuaFunction>("format")?)?;
let trace_fn = lua let trace_fn = lua
.load(TRACE_IMPL_LUA) .load(TRACE_IMPL_LUA)
.set_name("=dbg.trace")?
.set_environment(trace_env)? .set_environment(trace_env)?
.into_function()?; .into_function()?;
lua.set_named_registry_value("dbg.trace", trace_fn)?; lua.set_named_registry_value("dbg.trace", trace_fn)?;

View file

@ -0,0 +1,61 @@
use async_trait::async_trait;
use futures_util::Future;
use mlua::prelude::*;
use crate::{lua::task::TaskScheduler, utils::table::TableBuilder};
#[async_trait(?Send)]
pub trait LuaAsyncExt {
fn create_async_function<'lua, A, R, F, FR>(self, func: F) -> LuaResult<LuaFunction<'lua>>
where
A: FromLuaMulti<'static>,
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>;
}
impl LuaAsyncExt for &'static Lua {
/**
Creates a function callable from Lua that runs an async
closure and returns the results of it to the call site.
*/
fn create_async_function<'lua, A, R, F, FR>(self, func: F) -> LuaResult<LuaFunction<'lua>>
where
A: FromLuaMulti<'static>,
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>,
{
let async_env_thread: LuaFunction = self.named_registry_value("co.thread")?;
let async_env_yield: LuaFunction = self.named_registry_value("co.yield")?;
let async_env = TableBuilder::new(self)?
.with_value("thread", async_env_thread)?
.with_value("yield", async_env_yield)?
.with_function(
"resumeAsync",
move |lua: &Lua, (thread, args): (LuaThread, A)| {
let fut = func(lua, args);
let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect("Missing task scheduler as a lua app data");
sched.queue_async_task(LuaValue::Thread(thread), None, None, async {
let rets = fut.await?;
let mult = rets.to_lua_multi(lua)?;
Ok(Some(mult))
})
},
)?
.build_readonly()?;
let async_func = self
.load(
"
resumeAsync(thread(), ...)
return yield()
",
)
.set_name("asyncWrapper")?
.set_environment(async_env)?
.into_function()?;
Ok(async_func)
}
}

View file

@ -1,5 +1,6 @@
mod create; mod create;
pub mod ext;
pub mod net; pub mod net;
pub mod stdio; pub mod stdio;
pub mod task; pub mod task;

View file

@ -3,18 +3,11 @@ use async_trait::async_trait;
use futures_util::Future; use futures_util::Future;
use mlua::prelude::*; use mlua::prelude::*;
use crate::utils::table::TableBuilder;
use super::super::{ use super::super::{
async_handle::TaskSchedulerAsyncHandle, message::TaskSchedulerMessage, async_handle::TaskSchedulerAsyncHandle, message::TaskSchedulerMessage,
scheduler::TaskReference, scheduler::TaskScheduler, scheduler::TaskReference, scheduler::TaskScheduler,
}; };
const TASK_ASYNC_IMPL_LUA: &str = r#"
resumeAsync(thread(), ...)
return yield()
"#;
/* /*
Trait definition - same as the implementation, ignore this Trait definition - same as the implementation, ignore this
@ -37,13 +30,6 @@ pub trait TaskSchedulerAsyncExt<'fut> {
R: ToLuaMulti<'static>, R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua) -> FR, F: 'static + Fn(&'static Lua) -> FR,
FR: 'static + Future<Output = LuaResult<R>>; FR: 'static + Future<Output = LuaResult<R>>;
fn make_scheduled_async_fn<A, R, F, FR>(&self, func: F) -> LuaResult<LuaFunction>
where
A: FromLuaMulti<'static>,
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>;
} }
/* /*
@ -106,42 +92,4 @@ impl<'fut> TaskSchedulerAsyncExt<'fut> for TaskScheduler<'fut> {
} }
}) })
} }
/**
Creates a function callable from Lua that runs an async
closure and returns the results of it to the call site.
*/
fn make_scheduled_async_fn<A, R, F, FR>(&self, func: F) -> LuaResult<LuaFunction>
where
A: FromLuaMulti<'static>,
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>,
{
let async_env_thread: LuaFunction = self.lua.named_registry_value("co.thread")?;
let async_env_yield: LuaFunction = self.lua.named_registry_value("co.yield")?;
self.lua
.load(TASK_ASYNC_IMPL_LUA)
.set_environment(
TableBuilder::new(self.lua)?
.with_value("thread", async_env_thread)?
.with_value("yield", async_env_yield)?
.with_function(
"resumeAsync",
move |lua: &Lua, (thread, args): (LuaThread, A)| {
let fut = func(lua, args);
let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption");
sched.queue_async_task(LuaValue::Thread(thread), None, None, async {
let rets = fut.await?;
let mult = rets.to_lua_multi(lua)?;
Ok(Some(mult))
})
},
)?
.build_readonly()?,
)?
.into_function()
}
} }

View file

@ -26,18 +26,12 @@ pub trait TaskSchedulerScheduleExt {
thread_args: LuaMultiValue<'_>, thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference>; ) -> LuaResult<TaskReference>;
fn schedule_delayed( fn schedule_blocking_after_seconds(
&self, &self,
after_secs: f64, after_secs: f64,
thread_or_function: LuaValue<'_>, thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>, thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference>; ) -> LuaResult<TaskReference>;
fn schedule_wait(
&self,
after_secs: f64,
thread_or_function: LuaValue<'_>,
) -> LuaResult<TaskReference>;
} }
/* /*
@ -93,7 +87,7 @@ impl TaskSchedulerScheduleExt for TaskScheduler<'_> {
The given lua thread or function will be resumed The given lua thread or function will be resumed
using the given `thread_args` as its argument(s). using the given `thread_args` as its argument(s).
*/ */
fn schedule_delayed( fn schedule_blocking_after_seconds(
&self, &self,
after_secs: f64, after_secs: f64,
thread_or_function: LuaValue<'_>, thread_or_function: LuaValue<'_>,
@ -104,30 +98,4 @@ impl TaskSchedulerScheduleExt for TaskScheduler<'_> {
Ok(None) Ok(None)
}) })
} }
/**
Schedules a lua thread or function to
be resumed after waiting asynchronously.
The given lua thread or function will be resumed
using the elapsed time as its one and only argument.
*/
fn schedule_wait(
&self,
after_secs: f64,
thread_or_function: LuaValue<'_>,
) -> LuaResult<TaskReference> {
self.queue_async_task(
thread_or_function,
None,
// Wait should recycle the guid of the current task,
// which ensures that the TaskReference is identical and
// that any waits inside of spawned tasks will also cancel
self.guid_running.get(),
async move {
sleep(Duration::from_secs_f64(after_secs)).await;
Ok(None)
},
)
}
} }

View file

@ -8,10 +8,7 @@ use std::{
use futures_util::{future::LocalBoxFuture, stream::FuturesUnordered, Future}; use futures_util::{future::LocalBoxFuture, stream::FuturesUnordered, Future};
use mlua::prelude::*; use mlua::prelude::*;
use tokio::{ use tokio::sync::{mpsc, Mutex as AsyncMutex};
sync::{mpsc, Mutex as AsyncMutex},
time::Instant,
};
use super::message::TaskSchedulerMessage; use super::message::TaskSchedulerMessage;
pub use super::{task_kind::TaskKind, task_reference::TaskReference}; pub use super::{task_kind::TaskKind, task_reference::TaskReference};
@ -24,7 +21,6 @@ type TaskFuture<'fut> = LocalBoxFuture<'fut, (TaskReference, TaskFutureRets<'fut
pub struct Task { pub struct Task {
thread: LuaRegistryKey, thread: LuaRegistryKey,
args: LuaRegistryKey, args: LuaRegistryKey,
queued_at: Instant,
} }
/// A task scheduler that implements task queues /// A task scheduler that implements task queues
@ -147,11 +143,9 @@ impl<'fut> TaskScheduler<'fut> {
let task_args_key: LuaRegistryKey = self.lua.create_registry_value(task_args_vec)?; let task_args_key: LuaRegistryKey = self.lua.create_registry_value(task_args_vec)?;
let task_thread_key: LuaRegistryKey = self.lua.create_registry_value(task_thread)?; let task_thread_key: LuaRegistryKey = self.lua.create_registry_value(task_thread)?;
// Create the full task struct // Create the full task struct
let queued_at = Instant::now();
let task = Task { let task = Task {
thread: task_thread_key, thread: task_thread_key,
args: task_args_key, args: task_args_key,
queued_at,
}; };
// Create the task ref to use // Create the task ref to use
let task_ref = if let Some(reusable_guid) = guid_to_reuse { let task_ref = if let Some(reusable_guid) = guid_to_reuse {
@ -239,34 +233,17 @@ impl<'fut> TaskScheduler<'fut> {
}); });
self.lua.remove_registry_value(task.thread)?; self.lua.remove_registry_value(task.thread)?;
self.lua.remove_registry_value(task.args)?; self.lua.remove_registry_value(task.args)?;
if let Some(args_res) = args_opt_res {
match args_res {
Err(e) => Err(e), // FIXME: We need to throw this error in lua to let pcall & friends handle it properly
Ok(args) => {
self.guid_running.set(Some(reference.id())); self.guid_running.set(Some(reference.id()));
let rets = thread.resume::<_, LuaMultiValue>(args); let rets = match args_opt_res {
Some(args_res) => match args_res {
Err(err) => Err(err), // FIXME: We need to throw this error in lua to let pcall & friends handle it properly
Ok(args) => thread.resume::<_, LuaMultiValue>(args),
},
None => thread.resume(()),
};
self.guid_running.set(None); self.guid_running.set(None);
rets rets
} }
}
} else {
/*
The tasks did not get any arguments from either:
- Providing arguments at the call site for creating the task
- Returning arguments from a future that created this task
The only tasks that do not get any arguments from either
of those sources are waiting tasks, and waiting tasks
want the amount of time waited returned to them.
*/
let elapsed = task.queued_at.elapsed().as_secs_f64();
self.guid_running.set(Some(reference.id()));
let rets = thread.resume::<_, LuaMultiValue>(elapsed);
self.guid_running.set(None);
rets
}
}
/** /**
Queues a new task to run on the task scheduler. Queues a new task to run on the task scheduler.
@ -284,7 +261,7 @@ impl<'fut> TaskScheduler<'fut> {
-- Here we have either yielded or finished the above task -- Here we have either yielded or finished the above task
``` ```
*/ */
pub(super) fn queue_blocking_task( pub(crate) fn queue_blocking_task(
&self, &self,
kind: TaskKind, kind: TaskKind,
thread_or_function: LuaValue<'_>, thread_or_function: LuaValue<'_>,
@ -324,7 +301,7 @@ impl<'fut> TaskScheduler<'fut> {
/** /**
Queues a new future to run on the task scheduler. Queues a new future to run on the task scheduler.
*/ */
pub(super) fn queue_async_task( pub(crate) fn queue_async_task(
&self, &self,
thread_or_function: LuaValue<'_>, thread_or_function: LuaValue<'_>,
thread_args: Option<LuaMultiValue<'_>>, thread_args: Option<LuaMultiValue<'_>>,

View file

@ -2,7 +2,7 @@ use std::future::Future;
use mlua::prelude::*; use mlua::prelude::*;
use crate::lua::task::{TaskScheduler, TaskSchedulerAsyncExt}; use crate::lua::ext::LuaAsyncExt;
pub struct TableBuilder { pub struct TableBuilder {
lua: &'static Lua, lua: &'static Lua,
@ -78,12 +78,8 @@ impl TableBuilder {
F: 'static + Fn(&'static Lua, A) -> FR, F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>, FR: 'static + Future<Output = LuaResult<R>>,
{ {
let sched = self let f = self.lua.create_async_function(func)?;
.lua self.with_value(key, LuaValue::Function(f))
.app_data_ref::<&TaskScheduler>()
.expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption");
let func = sched.make_scheduled_async_fn(func)?;
self.with_value(key, LuaValue::Function(func))
} }
pub fn build_readonly(self) -> LuaResult<LuaTable<'static>> { pub fn build_readonly(self) -> LuaResult<LuaTable<'static>> {