From 8d44e1e82799f252cd4c947599aee04497b6205c Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Tue, 14 Feb 2023 23:17:03 +0100 Subject: [PATCH] Support built-in coroutine global in new task scheduler --- CHANGELOG.md | 3 +- packages/lib/src/globals/mod.rs | 11 ++ packages/lib/src/globals/net.rs | 2 +- packages/lib/src/globals/process.rs | 2 +- packages/lib/src/globals/task.rs | 52 +++++- packages/lib/src/lua/net/server.rs | 2 +- packages/lib/src/lua/task/scheduler.rs | 244 +++++++++++++------------ packages/lib/src/utils/table.rs | 2 +- tests/task/wait.luau | 12 +- 9 files changed, 204 insertions(+), 126 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b7a944..4118f5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,8 +17,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Fixed `stdio.prompt` blocking all other lua threads while prompting for input +- Fixed remaining edge cases where the `task` and `coroutine` libraries weren't interoperable - Fixed `task.delay` keeping the script running even if it was cancelled using `task.cancel` +- Fixed `stdio.prompt` blocking all other lua threads while prompting for input ## `0.4.0` - February 11th, 2023 diff --git a/packages/lib/src/globals/mod.rs b/packages/lib/src/globals/mod.rs index a0b27de..2ddff96 100644 --- a/packages/lib/src/globals/mod.rs +++ b/packages/lib/src/globals/mod.rs @@ -72,6 +72,17 @@ impl LuneGlobal { matches!(self, Self::Require | Self::TopLevel) } + /** + Checks if this Lune global is an injector. + + An injector is similar to a proxy global but will inject + value(s) into the global lua environment during creation, + to ensure correct usage and compatibility with base Luau. + */ + pub fn is_injector(&self) -> bool { + matches!(self, Self::Task) + } + /** Creates the [`mlua::Table`] value for this Lune global. diff --git a/packages/lib/src/globals/net.rs b/packages/lib/src/globals/net.rs index 864d959..64cf15f 100644 --- a/packages/lib/src/globals/net.rs +++ b/packages/lib/src/globals/net.rs @@ -108,7 +108,7 @@ async fn net_serve<'a>( lua.create_registry_value(handler) .expect("Failed to store websocket handler") }); - let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); // Bind first to make sure that we can bind to this address let bound = match Server::try_bind(&([127, 0, 0, 1], port).into()) { Err(e) => { diff --git a/packages/lib/src/globals/process.rs b/packages/lib/src/globals/process.rs index 00ed28e..cccf165 100644 --- a/packages/lib/src/globals/process.rs +++ b/packages/lib/src/globals/process.rs @@ -50,7 +50,7 @@ pub fn create(lua: &'static Lua, args_vec: Vec) -> LuaResult { let process_exit_env_yield: LuaFunction = lua.named_registry_value("co.yield")?; let process_exit_env_exit: LuaFunction = lua.create_function(|lua, code: Option| { let exit_code = code.map_or(ExitCode::SUCCESS, ExitCode::from); - let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); sched.set_exit_code(exit_code); Ok(()) })?; diff --git a/packages/lib/src/globals/task.rs b/packages/lib/src/globals/task.rs index fb48f39..91a7a8b 100644 --- a/packages/lib/src/globals/task.rs +++ b/packages/lib/src/globals/task.rs @@ -1,7 +1,7 @@ use mlua::prelude::*; use crate::{ - lua::task::{TaskReference, TaskScheduler}, + lua::task::{TaskKind, TaskReference, TaskScheduler}, utils::table::TableBuilder, }; @@ -18,19 +18,20 @@ return task "#; pub fn create(lua: &'static Lua) -> LuaResult> { - // Create task spawning functions that add tasks to the scheduler + // Create a user-accessible function cancel tasks let task_cancel = lua.create_function(|lua, task: TaskReference| { - let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); sched.remove_task(task)?; Ok(()) })?; + // Create functions that manipulate non-blocking tasks in the scheduler let task_defer = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| { - let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); sched.schedule_deferred(tof, args) })?; let task_delay = lua.create_function(|lua, (secs, tof, args): (f64, LuaValue, LuaMultiValue)| { - let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); sched.schedule_delayed(secs, tof, args) })?; // Create our task wait function, this is a bit different since @@ -47,7 +48,7 @@ pub fn create(lua: &'static Lua) -> LuaResult> { .with_function( "resume_after", |lua, (thread, secs): (LuaThread, Option)| { - let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); sched.schedule_wait(secs.unwrap_or(0f64), LuaValue::Thread(thread)) }, )? @@ -68,12 +69,12 @@ pub fn create(lua: &'static Lua) -> LuaResult> { .with_function( "resume_first", |lua, (tof, args): (LuaValue, LuaMultiValue)| { - let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); sched.schedule_current_resume(tof, args) }, )? .with_function("resume_second", |lua, thread: LuaThread| { - let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); sched.schedule_after_current_resume( LuaValue::Thread(thread), LuaMultiValue::new(), @@ -106,6 +107,41 @@ pub fn create(lua: &'static Lua) -> LuaResult> { })?; globals.set("type", type_proxy)?; globals.set("typeof", typeof_proxy)?; + // Functions in the built-in coroutine library also need to be + // replaced, these are a bit different than the ones above because + // calling resume or the function that wrap returns must return + // whatever lua value(s) that the thread or task yielded back + let coroutine = globals.get::<_, LuaTable>("coroutine")?; + coroutine.set( + "resume", + lua.create_function(|lua, value: LuaValue| { + if let LuaValue::Thread(thread) = value { + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + let task = + sched.create_task(TaskKind::Instant, LuaValue::Thread(thread), None, None)?; + sched.resume_task(task, None) + } else if let Ok(task) = TaskReference::from_lua(value, lua) { + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + sched.resume_task(task, None) + } else { + Err(LuaError::RuntimeError( + "Argument #1 must be a thread".to_string(), + )) + } + })?, + )?; + coroutine.set( + "wrap", + lua.create_function(|lua, func: LuaFunction| { + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + let task = + sched.create_task(TaskKind::Instant, LuaValue::Function(func), None, None)?; + lua.create_function(move |lua, args: LuaMultiValue| { + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + sched.resume_task(task, Some(args.into_vec())) + }) + })?, + )?; // All good, return the task scheduler lib TableBuilder::new(lua)? .with_value("cancel", task_cancel)? diff --git a/packages/lib/src/lua/net/server.rs b/packages/lib/src/lua/net/server.rs index 7ec690d..624fcda 100644 --- a/packages/lib/src/lua/net/server.rs +++ b/packages/lib/src/lua/net/server.rs @@ -51,7 +51,7 @@ impl Service> for NetServiceInner { let _ws = ws.await.map_err(LuaError::external)?; // let sock = NetWebSocketServer::from(ws); // let table = sock.into_lua_table(lua)?; - // let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + // let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); // sched.schedule_current_resume( // LuaValue::Function(handler), // LuaMultiValue::from_vec(vec![LuaValue::Table(table)]), diff --git a/packages/lib/src/lua/task/scheduler.rs b/packages/lib/src/lua/task/scheduler.rs index ab32cac..ec56079 100644 --- a/packages/lib/src/lua/task/scheduler.rs +++ b/packages/lib/src/lua/task/scheduler.rs @@ -135,6 +135,9 @@ impl TaskSchedulerResult { fn new(sched: &TaskScheduler) -> Self { const MESSAGE: &str = "Failed to get lock - make sure not to call during task scheduler resumption"; + let num_instant = sched.task_queue_instant.try_lock().expect(MESSAGE).len(); + let num_deferred = sched.task_queue_deferred.try_lock().expect(MESSAGE).len(); + let num_active = num_instant + num_deferred; Self { lua_error: None, exit_code: if sched.exit_code_set.load(Ordering::Relaxed) { @@ -142,11 +145,11 @@ impl TaskSchedulerResult { } else { None }, - num_instant: sched.task_queue_instant.try_lock().expect(MESSAGE).len(), - num_deferred: sched.task_queue_deferred.try_lock().expect(MESSAGE).len(), + num_instant, + num_deferred, num_futures: sched.futures.try_lock().expect(MESSAGE).len(), num_background: sched.futures_in_background.load(Ordering::Relaxed), - num_active: sched.tasks.try_lock().expect(MESSAGE).len(), + num_active, } } @@ -176,18 +179,24 @@ impl TaskSchedulerResult { Returns `true` if the task scheduler is still busy, meaning it still has lua threads left to run. */ - #[allow(dead_code)] pub fn is_busy(&self) -> bool { self.num_active > 0 } + /** + Returns `true` if the task scheduler has finished all + blocking lua tasks, but still has yielding tasks running. + */ + pub fn is_yielding(&self) -> bool { + self.num_active == 0 && self.num_futures > 0 + } + /** Returns `true` if the task scheduler has finished all lua threads, but still has background tasks running. */ - #[allow(dead_code)] pub fn is_background(&self) -> bool { - self.num_active == 0 && self.num_background > 0 + self.num_active == 0 && self.num_futures == 0 && self.num_background > 0 } /** @@ -196,7 +205,7 @@ impl TaskSchedulerResult { no spawned tasks are running in the background. */ pub fn is_done(&self) -> bool { - self.num_active == 0 && self.num_background == 0 + self.num_active == 0 && self.num_futures == 0 && self.num_background == 0 } } @@ -204,6 +213,8 @@ impl fmt::Display for TaskSchedulerResult { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let status = if self.is_busy() { "Busy" + } else if self.is_yielding() { + "Yielding" } else if self.is_background() { "Background" } else { @@ -360,12 +371,28 @@ impl<'fut> TaskScheduler<'fut> { self.exit_code_set.store(true, Ordering::Relaxed); } + /** + Checks if a task still exists in the scheduler. + + A task may no longer exist in the scheduler if it has been manually + cancelled and removed by calling [`TaskScheduler::cancel_task()`]. + */ + #[allow(dead_code)] + pub fn contains_task(&self, reference: TaskReference) -> bool { + let tasks = self.tasks.lock().unwrap(); + tasks.contains_key(&reference) + } + /** Creates a new task, storing a new Lua thread for it, as well as the arguments to give the thread on resumption, in the Lua registry. + + Note that this task will ***not*** resume on its + own, it needs to be used together with either the + scheduling functions or [`TaskScheduler::resume_task`]. */ - fn create_task( + pub fn create_task( &self, kind: TaskKind, thread_or_function: LuaValue<'_>, @@ -409,6 +436,95 @@ impl<'fut> TaskScheduler<'fut> { Ok(task_ref) } + /** + Cancels a task, if the task still exists in the scheduler. + + It is possible to hold one or more task references that point + to a task that no longer exists in the scheduler, and calling + this method with one of those references will return `false`. + */ + pub fn remove_task(&self, reference: TaskReference) -> LuaResult { + /* + Remove the task from the task list and the Lua registry + + This is all we need to do since resume_task will always + ignore resumption of any task that no longer exists there + + This does lead to having some amount of "junk" futures that will + build up in the queue but these will get cleaned up and not block + the program from exiting since the scheduler only runs until there + are no tasks left in the task list, the futures do not matter there + */ + let mut found = false; + let mut tasks = self.tasks.lock().unwrap(); + // Unfortunately we have to loop through to find which task + // references to remove instead of removing directly since + // tasks can switch kinds between instant, deferred, future + let tasks_to_remove: Vec<_> = tasks + .keys() + .filter(|task_ref| task_ref.guid == reference.guid) + .copied() + .collect(); + for task_ref in tasks_to_remove { + if let Some(task) = tasks.remove(&task_ref) { + self.lua.remove_registry_value(task.thread)?; + self.lua.remove_registry_value(task.args)?; + found = true; + } + } + Ok(found) + } + + /** + Resumes a task, if the task still exists in the scheduler. + + A task may no longer exist in the scheduler if it has been manually + cancelled and removed by calling [`TaskScheduler::cancel_task()`]. + + This will be a no-op if the task no longer exists. + */ + pub fn resume_task<'a>( + &self, + reference: TaskReference, + override_args: Option>>, + ) -> LuaResult> { + self.guid_running_task + .store(reference.guid, Ordering::Relaxed); + let task = { + let mut tasks = self.tasks.lock().unwrap(); + match tasks.remove(&reference) { + Some(task) => task, + None => return Ok(LuaMultiValue::new()), // Task was removed + } + }; + let thread: LuaThread = self.lua.registry_value(&task.thread)?; + let args_vec_opt = override_args.or_else(|| { + self.lua + .registry_value::>>(&task.args) + .expect("Failed to get stored args for task") + }); + self.lua.remove_registry_value(task.thread)?; + self.lua.remove_registry_value(task.args)?; + let rets = if let Some(args) = args_vec_opt { + thread.resume::<_, LuaMultiValue>(LuaMultiValue::from_vec(args)) + } else { + /* + The tasks did not get any arguments from either: + + - Providing arguments at the call site for creating the task + - Returning arguments from a future that created this task + + The only tasks that do not get any arguments from either + of those sources are waiting tasks, and waiting tasks + want the amount of time waited returned to them. + */ + let elapsed = task.queued_at.elapsed().as_secs_f64(); + thread.resume::<_, LuaMultiValue>(elapsed) + }; + self.guid_running_task.store(0, Ordering::Relaxed); + rets + } + /** Queues a new task to run on the task scheduler. @@ -618,7 +734,12 @@ impl<'fut> TaskScheduler<'fut> { // which ensures that the TaskReference is identical and // that any waits inside of spawned tasks will also cancel match self.guid_running_task.load(Ordering::Relaxed) { - 0 => panic!("Tried to schedule waiting task with no task running"), + 0 => { + // NOTE: We had this here to verify the behavior of our task scheduler during development, + // but for re-implementing coroutine.resume (which is not registered) we must not panic here + // panic!("Tried to schedule waiting task with no registered task running") + None + } guid => Some(guid), }, async move { @@ -666,7 +787,7 @@ impl<'fut> TaskScheduler<'fut> { "resume_async", move |lua: &Lua, (thread, args): (LuaThread, A)| { let fut = func(lua, args); - let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); sched.schedule_async(LuaValue::Thread(thread), async { let rets = fut.await?; let mult = rets.to_lua_multi(lua)?; @@ -679,107 +800,6 @@ impl<'fut> TaskScheduler<'fut> { .into_function() } - /** - Checks if a task still exists in the scheduler. - - A task may no longer exist in the scheduler if it has been manually - cancelled and removed by calling [`TaskScheduler::cancel_task()`]. - */ - #[allow(dead_code)] - pub fn contains_task(&self, reference: TaskReference) -> bool { - let tasks = self.tasks.lock().unwrap(); - tasks.contains_key(&reference) - } - - /** - Cancels a task, if the task still exists in the scheduler. - - It is possible to hold one or more task references that point - to a task that no longer exists in the scheduler, and calling - this method with one of those references will return `false`. - */ - pub fn remove_task(&self, reference: TaskReference) -> LuaResult { - /* - Remove the task from the task list and the Lua registry - - This is all we need to do since resume_task will always - ignore resumption of any task that no longer exists there - - This does lead to having some amount of "junk" futures that will - build up in the queue but these will get cleaned up and not block - the program from exiting since the scheduler only runs until there - are no tasks left in the task list, the futures do not matter there - */ - let mut found = false; - let mut tasks = self.tasks.lock().unwrap(); - // Unfortunately we have to loop through to find which task - // references to remove instead of removing directly since - // tasks can switch kinds between instant, deferred, future - let tasks_to_remove: Vec<_> = tasks - .keys() - .filter(|task_ref| task_ref.guid == reference.guid) - .copied() - .collect(); - for task_ref in tasks_to_remove { - if let Some(task) = tasks.remove(&task_ref) { - self.lua.remove_registry_value(task.thread)?; - self.lua.remove_registry_value(task.args)?; - found = true; - } - } - Ok(found) - } - - /** - Resumes a task, if the task still exists in the scheduler. - - A task may no longer exist in the scheduler if it has been manually - cancelled and removed by calling [`TaskScheduler::cancel_task()`]. - - This will be a no-op if the task no longer exists. - */ - pub fn resume_task( - &self, - reference: TaskReference, - override_args: Option>, - ) -> LuaResult<()> { - self.guid_running_task - .store(reference.guid, Ordering::Relaxed); - let task = { - let mut tasks = self.tasks.lock().unwrap(); - match tasks.remove(&reference) { - Some(task) => task, - None => return Ok(()), // Task was removed - } - }; - let thread: LuaThread = self.lua.registry_value(&task.thread)?; - let args_vec_opt = override_args.or_else(|| { - self.lua - .registry_value::>>(&task.args) - .expect("Failed to get stored args for task") - }); - self.lua.remove_registry_value(task.thread)?; - self.lua.remove_registry_value(task.args)?; - if let Some(args) = args_vec_opt { - thread.resume::<_, LuaMultiValue>(LuaMultiValue::from_vec(args))?; - } else { - /* - The tasks did not get any arguments from either: - - - Providing arguments at the call site for creating the task - - Returning arguments from a future that created this task - - The only tasks that do not get any arguments from either - of those sources are waiting tasks, and waiting tasks - want the amount of time waited returned to them. - */ - let elapsed = task.queued_at.elapsed().as_secs_f64(); - thread.resume::<_, LuaMultiValue>(elapsed)?; - } - self.guid_running_task.store(0, Ordering::Relaxed); - Ok(()) - } - /** Registers a new background task with the task scheduler. @@ -838,7 +858,7 @@ impl<'fut> TaskScheduler<'fut> { } { None => TaskSchedulerResult::new(self), Some(task) => match self.resume_task(task, override_args) { - Ok(()) => TaskSchedulerResult::new(self), + Ok(_) => TaskSchedulerResult::new(self), Err(task_err) => TaskSchedulerResult::err(self, task_err), }, } diff --git a/packages/lib/src/utils/table.rs b/packages/lib/src/utils/table.rs index 207b72b..2ec39ab 100644 --- a/packages/lib/src/utils/table.rs +++ b/packages/lib/src/utils/table.rs @@ -78,7 +78,7 @@ impl TableBuilder { F: 'static + Fn(&'static Lua, A) -> FR, FR: 'static + Future>, { - let sched = self.lua.app_data_mut::<&TaskScheduler>().unwrap(); + let sched = self.lua.app_data_ref::<&TaskScheduler>().unwrap(); let func = sched.make_scheduled_async_fn(func)?; self.with_value(key, LuaValue::Function(func)) } diff --git a/tests/task/wait.luau b/tests/task/wait.luau index d7dde6f..437f30e 100644 --- a/tests/task/wait.luau +++ b/tests/task/wait.luau @@ -44,7 +44,8 @@ measure(1 / 30) measure(1 / 20) measure(1 / 10) --- Wait should work in other threads, too +-- Wait should work in other threads, including +-- ones created by the built-in coroutine library local flag: boolean = false task.spawn(function() @@ -63,3 +64,12 @@ end)) assert(not flag2, "Wait failed while inside coroutine (1)") task.wait(0.2) assert(flag2, "Wait failed while inside coroutine (2)") + +local flag3: boolean = false +coroutine.wrap(function() + task.wait(0.1) + flag3 = true +end)() +assert(not flag3, "Wait failed while inside wrap (1)") +task.wait(0.2) +assert(flag3, "Wait failed while inside wrap (2)")