diff --git a/packages/lib/src/globals/process.rs b/packages/lib/src/globals/process.rs index 433fe65..7b4a218 100644 --- a/packages/lib/src/globals/process.rs +++ b/packages/lib/src/globals/process.rs @@ -58,6 +58,7 @@ pub fn create(lua: &'static Lua, args_vec: Vec) -> LuaResult { })?; let process_exit = lua .load(PROCESS_EXIT_IMPL_LUA) + .set_name("=process.exit")? .set_environment( TableBuilder::new(lua)? .with_value("yield", process_exit_env_yield)? diff --git a/packages/lib/src/globals/task.rs b/packages/lib/src/globals/task.rs index ce38641..07127a1 100644 --- a/packages/lib/src/globals/task.rs +++ b/packages/lib/src/globals/task.rs @@ -1,4 +1,7 @@ +use std::time::Duration; + use mlua::prelude::*; +use tokio::time::{sleep, Instant}; use crate::{ lua::task::{TaskKind, TaskReference, TaskScheduler, TaskSchedulerScheduleExt}, @@ -7,13 +10,6 @@ use crate::{ const ERR_MISSING_SCHEDULER: &str = "Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"; -const TASK_WAIT_IMPL_LUA: &str = r#" -local seconds = ... -local current = thread() -resumeAfter(seconds, current) -return yield() -"#; - const TASK_SPAWN_IMPL_LUA: &str = r#" -- Schedule the current thread at the front scheduleNext(thread()) @@ -27,58 +23,14 @@ return task "#; pub fn create(lua: &'static Lua) -> LuaResult> { - // Create a user-accessible function that cancels a task - let task_cancel = lua.create_function(|lua, task: TaskReference| { - let sched = lua - .app_data_ref::<&TaskScheduler>() - .expect(ERR_MISSING_SCHEDULER); - sched.remove_task(task)?; - Ok(()) - })?; - // Create functions that manipulate non-blocking tasks in the scheduler - let task_defer = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| { - let sched = lua - .app_data_ref::<&TaskScheduler>() - .expect(ERR_MISSING_SCHEDULER); - sched.schedule_blocking_deferred(tof, args) - })?; - let task_delay = - lua.create_function(|lua, (secs, tof, args): (f64, LuaValue, LuaMultiValue)| { - let sched = lua - .app_data_ref::<&TaskScheduler>() - .expect(ERR_MISSING_SCHEDULER); - sched.schedule_delayed(secs, tof, args) - })?; - // Create our task wait function, this is a bit different since - // we have no way to yield from c / rust, we need to load a - // lua chunk that schedules and yields for us instead - let task_wait_env_thread: LuaFunction = lua.named_registry_value("co.thread")?; - let task_wait_env_yield: LuaFunction = lua.named_registry_value("co.yield")?; - let task_wait = lua - .load(TASK_WAIT_IMPL_LUA) - .set_environment( - TableBuilder::new(lua)? - .with_value("thread", task_wait_env_thread)? - .with_value("yield", task_wait_env_yield)? - .with_function( - "resumeAfter", - |lua, (secs, thread): (Option, LuaThread)| { - let sched = lua - .app_data_ref::<&TaskScheduler>() - .expect(ERR_MISSING_SCHEDULER); - sched.schedule_wait(secs.unwrap_or_default(), LuaValue::Thread(thread)) - }, - )? - .build_readonly()?, - )? - .into_function()?; - // The spawn function also needs special treatment, + // The spawn function needs special treatment, // we need to yield right away to allow the // spawned task to run until first yield let task_spawn_env_thread: LuaFunction = lua.named_registry_value("co.thread")?; let task_spawn_env_yield: LuaFunction = lua.named_registry_value("co.yield")?; let task_spawn = lua .load(TASK_SPAWN_IMPL_LUA) + .set_name("=task.spawn")? .set_environment( TableBuilder::new(lua)? .with_value("thread", task_spawn_env_thread)? @@ -164,10 +116,41 @@ pub fn create(lua: &'static Lua) -> LuaResult> { )?; // All good, return the task scheduler lib TableBuilder::new(lua)? - .with_value("cancel", task_cancel)? .with_value("spawn", task_spawn)? - .with_value("defer", task_defer)? - .with_value("delay", task_delay)? - .with_value("wait", task_wait)? + .with_function("cancel", task_cancel)? + .with_function("defer", task_defer)? + .with_function("delay", task_delay)? + .with_async_function("wait", task_wait)? .build_readonly() } + +fn task_cancel(lua: &Lua, task: TaskReference) -> LuaResult<()> { + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER); + sched.remove_task(task)?; + Ok(()) +} + +fn task_defer(lua: &Lua, (tof, args): (LuaValue, LuaMultiValue)) -> LuaResult { + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER); + sched.schedule_blocking_deferred(tof, args) +} + +fn task_delay( + lua: &Lua, + (secs, tof, args): (f64, LuaValue, LuaMultiValue), +) -> LuaResult { + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER); + sched.schedule_blocking_after_seconds(secs, tof, args) +} + +async fn task_wait(_: &Lua, secs: Option) -> LuaResult { + let start = Instant::now(); + sleep(Duration::from_secs_f64(secs.unwrap_or_default())).await; + Ok(start.elapsed().as_secs_f64()) +} diff --git a/packages/lib/src/lua/create.rs b/packages/lib/src/lua/create.rs index 7708c81..5379b24 100644 --- a/packages/lib/src/lua/create.rs +++ b/packages/lib/src/lua/create.rs @@ -28,8 +28,6 @@ for level = 2, 2^8 do end end if #lines > 0 then - push(lines, 1, "Stack Begin") - push(lines, "Stack End") return concat(lines, "\n") else return nil @@ -43,6 +41,7 @@ end --- * `"require"` -> `require` + * `"select"` -> `select` --- * `"print"` -> `print` * `"error"` -> `error` @@ -68,6 +67,7 @@ pub fn create() -> LuaResult<&'static Lua> { // Store original lua global functions in the registry so we can use // them later without passing them around and dealing with lifetimes lua.set_named_registry_value("require", globals.get::<_, LuaFunction>("require")?)?; + lua.set_named_registry_value("select", globals.get::<_, LuaFunction>("select")?)?; lua.set_named_registry_value("print", globals.get::<_, LuaFunction>("print")?)?; lua.set_named_registry_value("error", globals.get::<_, LuaFunction>("error")?)?; lua.set_named_registry_value("type", globals.get::<_, LuaFunction>("type")?)?; @@ -85,6 +85,7 @@ pub fn create() -> LuaResult<&'static Lua> { trace_env.set("format", string.get::<_, LuaFunction>("format")?)?; let trace_fn = lua .load(TRACE_IMPL_LUA) + .set_name("=dbg.trace")? .set_environment(trace_env)? .into_function()?; lua.set_named_registry_value("dbg.trace", trace_fn)?; diff --git a/packages/lib/src/lua/ext.rs b/packages/lib/src/lua/ext.rs new file mode 100644 index 0000000..ce715ef --- /dev/null +++ b/packages/lib/src/lua/ext.rs @@ -0,0 +1,61 @@ +use async_trait::async_trait; +use futures_util::Future; +use mlua::prelude::*; + +use crate::{lua::task::TaskScheduler, utils::table::TableBuilder}; + +#[async_trait(?Send)] +pub trait LuaAsyncExt { + fn create_async_function<'lua, A, R, F, FR>(self, func: F) -> LuaResult> + where + A: FromLuaMulti<'static>, + R: ToLuaMulti<'static>, + F: 'static + Fn(&'static Lua, A) -> FR, + FR: 'static + Future>; +} + +impl LuaAsyncExt for &'static Lua { + /** + Creates a function callable from Lua that runs an async + closure and returns the results of it to the call site. + */ + fn create_async_function<'lua, A, R, F, FR>(self, func: F) -> LuaResult> + where + A: FromLuaMulti<'static>, + R: ToLuaMulti<'static>, + F: 'static + Fn(&'static Lua, A) -> FR, + FR: 'static + Future>, + { + let async_env_thread: LuaFunction = self.named_registry_value("co.thread")?; + let async_env_yield: LuaFunction = self.named_registry_value("co.yield")?; + let async_env = TableBuilder::new(self)? + .with_value("thread", async_env_thread)? + .with_value("yield", async_env_yield)? + .with_function( + "resumeAsync", + move |lua: &Lua, (thread, args): (LuaThread, A)| { + let fut = func(lua, args); + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect("Missing task scheduler as a lua app data"); + sched.queue_async_task(LuaValue::Thread(thread), None, None, async { + let rets = fut.await?; + let mult = rets.to_lua_multi(lua)?; + Ok(Some(mult)) + }) + }, + )? + .build_readonly()?; + let async_func = self + .load( + " + resumeAsync(thread(), ...) + return yield() + ", + ) + .set_name("asyncWrapper")? + .set_environment(async_env)? + .into_function()?; + Ok(async_func) + } +} diff --git a/packages/lib/src/lua/mod.rs b/packages/lib/src/lua/mod.rs index c48ac29..752b458 100644 --- a/packages/lib/src/lua/mod.rs +++ b/packages/lib/src/lua/mod.rs @@ -1,5 +1,6 @@ mod create; +pub mod ext; pub mod net; pub mod stdio; pub mod task; diff --git a/packages/lib/src/lua/task/ext/async_ext.rs b/packages/lib/src/lua/task/ext/async_ext.rs index f0cc5b7..97223bd 100644 --- a/packages/lib/src/lua/task/ext/async_ext.rs +++ b/packages/lib/src/lua/task/ext/async_ext.rs @@ -3,18 +3,11 @@ use async_trait::async_trait; use futures_util::Future; use mlua::prelude::*; -use crate::utils::table::TableBuilder; - use super::super::{ async_handle::TaskSchedulerAsyncHandle, message::TaskSchedulerMessage, scheduler::TaskReference, scheduler::TaskScheduler, }; -const TASK_ASYNC_IMPL_LUA: &str = r#" -resumeAsync(thread(), ...) -return yield() -"#; - /* ────────────────────────────────────────────────────────── Trait definition - same as the implementation, ignore this @@ -37,13 +30,6 @@ pub trait TaskSchedulerAsyncExt<'fut> { R: ToLuaMulti<'static>, F: 'static + Fn(&'static Lua) -> FR, FR: 'static + Future>; - - fn make_scheduled_async_fn(&self, func: F) -> LuaResult - where - A: FromLuaMulti<'static>, - R: ToLuaMulti<'static>, - F: 'static + Fn(&'static Lua, A) -> FR, - FR: 'static + Future>; } /* @@ -106,42 +92,4 @@ impl<'fut> TaskSchedulerAsyncExt<'fut> for TaskScheduler<'fut> { } }) } - - /** - Creates a function callable from Lua that runs an async - closure and returns the results of it to the call site. - */ - fn make_scheduled_async_fn(&self, func: F) -> LuaResult - where - A: FromLuaMulti<'static>, - R: ToLuaMulti<'static>, - F: 'static + Fn(&'static Lua, A) -> FR, - FR: 'static + Future>, - { - let async_env_thread: LuaFunction = self.lua.named_registry_value("co.thread")?; - let async_env_yield: LuaFunction = self.lua.named_registry_value("co.yield")?; - self.lua - .load(TASK_ASYNC_IMPL_LUA) - .set_environment( - TableBuilder::new(self.lua)? - .with_value("thread", async_env_thread)? - .with_value("yield", async_env_yield)? - .with_function( - "resumeAsync", - move |lua: &Lua, (thread, args): (LuaThread, A)| { - let fut = func(lua, args); - let sched = lua - .app_data_ref::<&TaskScheduler>() - .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"); - sched.queue_async_task(LuaValue::Thread(thread), None, None, async { - let rets = fut.await?; - let mult = rets.to_lua_multi(lua)?; - Ok(Some(mult)) - }) - }, - )? - .build_readonly()?, - )? - .into_function() - } } diff --git a/packages/lib/src/lua/task/ext/schedule_ext.rs b/packages/lib/src/lua/task/ext/schedule_ext.rs index e55bdae..9455a53 100644 --- a/packages/lib/src/lua/task/ext/schedule_ext.rs +++ b/packages/lib/src/lua/task/ext/schedule_ext.rs @@ -26,18 +26,12 @@ pub trait TaskSchedulerScheduleExt { thread_args: LuaMultiValue<'_>, ) -> LuaResult; - fn schedule_delayed( + fn schedule_blocking_after_seconds( &self, after_secs: f64, thread_or_function: LuaValue<'_>, thread_args: LuaMultiValue<'_>, ) -> LuaResult; - - fn schedule_wait( - &self, - after_secs: f64, - thread_or_function: LuaValue<'_>, - ) -> LuaResult; } /* @@ -93,7 +87,7 @@ impl TaskSchedulerScheduleExt for TaskScheduler<'_> { The given lua thread or function will be resumed using the given `thread_args` as its argument(s). */ - fn schedule_delayed( + fn schedule_blocking_after_seconds( &self, after_secs: f64, thread_or_function: LuaValue<'_>, @@ -104,30 +98,4 @@ impl TaskSchedulerScheduleExt for TaskScheduler<'_> { Ok(None) }) } - - /** - Schedules a lua thread or function to - be resumed after waiting asynchronously. - - The given lua thread or function will be resumed - using the elapsed time as its one and only argument. - */ - fn schedule_wait( - &self, - after_secs: f64, - thread_or_function: LuaValue<'_>, - ) -> LuaResult { - self.queue_async_task( - thread_or_function, - None, - // Wait should recycle the guid of the current task, - // which ensures that the TaskReference is identical and - // that any waits inside of spawned tasks will also cancel - self.guid_running.get(), - async move { - sleep(Duration::from_secs_f64(after_secs)).await; - Ok(None) - }, - ) - } } diff --git a/packages/lib/src/lua/task/scheduler.rs b/packages/lib/src/lua/task/scheduler.rs index a78b199..9af693a 100644 --- a/packages/lib/src/lua/task/scheduler.rs +++ b/packages/lib/src/lua/task/scheduler.rs @@ -8,10 +8,7 @@ use std::{ use futures_util::{future::LocalBoxFuture, stream::FuturesUnordered, Future}; use mlua::prelude::*; -use tokio::{ - sync::{mpsc, Mutex as AsyncMutex}, - time::Instant, -}; +use tokio::sync::{mpsc, Mutex as AsyncMutex}; use super::message::TaskSchedulerMessage; pub use super::{task_kind::TaskKind, task_reference::TaskReference}; @@ -24,7 +21,6 @@ type TaskFuture<'fut> = LocalBoxFuture<'fut, (TaskReference, TaskFutureRets<'fut pub struct Task { thread: LuaRegistryKey, args: LuaRegistryKey, - queued_at: Instant, } /// A task scheduler that implements task queues @@ -147,11 +143,9 @@ impl<'fut> TaskScheduler<'fut> { let task_args_key: LuaRegistryKey = self.lua.create_registry_value(task_args_vec)?; let task_thread_key: LuaRegistryKey = self.lua.create_registry_value(task_thread)?; // Create the full task struct - let queued_at = Instant::now(); let task = Task { thread: task_thread_key, args: task_args_key, - queued_at, }; // Create the task ref to use let task_ref = if let Some(reusable_guid) = guid_to_reuse { @@ -239,33 +233,16 @@ impl<'fut> TaskScheduler<'fut> { }); self.lua.remove_registry_value(task.thread)?; self.lua.remove_registry_value(task.args)?; - if let Some(args_res) = args_opt_res { - match args_res { - Err(e) => Err(e), // FIXME: We need to throw this error in lua to let pcall & friends handle it properly - Ok(args) => { - self.guid_running.set(Some(reference.id())); - let rets = thread.resume::<_, LuaMultiValue>(args); - self.guid_running.set(None); - rets - } - } - } else { - /* - The tasks did not get any arguments from either: - - - Providing arguments at the call site for creating the task - - Returning arguments from a future that created this task - - The only tasks that do not get any arguments from either - of those sources are waiting tasks, and waiting tasks - want the amount of time waited returned to them. - */ - let elapsed = task.queued_at.elapsed().as_secs_f64(); - self.guid_running.set(Some(reference.id())); - let rets = thread.resume::<_, LuaMultiValue>(elapsed); - self.guid_running.set(None); - rets - } + self.guid_running.set(Some(reference.id())); + let rets = match args_opt_res { + Some(args_res) => match args_res { + Err(err) => Err(err), // FIXME: We need to throw this error in lua to let pcall & friends handle it properly + Ok(args) => thread.resume::<_, LuaMultiValue>(args), + }, + None => thread.resume(()), + }; + self.guid_running.set(None); + rets } /** @@ -284,7 +261,7 @@ impl<'fut> TaskScheduler<'fut> { -- Here we have either yielded or finished the above task ``` */ - pub(super) fn queue_blocking_task( + pub(crate) fn queue_blocking_task( &self, kind: TaskKind, thread_or_function: LuaValue<'_>, @@ -324,7 +301,7 @@ impl<'fut> TaskScheduler<'fut> { /** Queues a new future to run on the task scheduler. */ - pub(super) fn queue_async_task( + pub(crate) fn queue_async_task( &self, thread_or_function: LuaValue<'_>, thread_args: Option>, diff --git a/packages/lib/src/utils/table.rs b/packages/lib/src/utils/table.rs index c9c60f2..e11ddce 100644 --- a/packages/lib/src/utils/table.rs +++ b/packages/lib/src/utils/table.rs @@ -2,7 +2,7 @@ use std::future::Future; use mlua::prelude::*; -use crate::lua::task::{TaskScheduler, TaskSchedulerAsyncExt}; +use crate::lua::ext::LuaAsyncExt; pub struct TableBuilder { lua: &'static Lua, @@ -78,12 +78,8 @@ impl TableBuilder { F: 'static + Fn(&'static Lua, A) -> FR, FR: 'static + Future>, { - let sched = self - .lua - .app_data_ref::<&TaskScheduler>() - .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"); - let func = sched.make_scheduled_async_fn(func)?; - self.with_value(key, LuaValue::Function(func)) + let f = self.lua.create_async_function(func)?; + self.with_value(key, LuaValue::Function(f)) } pub fn build_readonly(self) -> LuaResult> {