diff --git a/packages/lib/src/globals/task.rs b/packages/lib/src/globals/task.rs index 91a7a8b..70d54a4 100644 --- a/packages/lib/src/globals/task.rs +++ b/packages/lib/src/globals/task.rs @@ -6,13 +6,20 @@ use crate::{ }; const TASK_WAIT_IMPL_LUA: &str = r#" -resume_after(thread(), ...) +local seconds = ... +local current = thread() +resumeAfter(seconds, current) return yield() "#; const TASK_SPAWN_IMPL_LUA: &str = r#" -local task = resume_first(...) -resume_second(thread()) +-- Schedule the current thread at the front +scheduleNext(thread()) +-- Schedule the thread to spawn at the front, +-- the previous schedule now comes right after +local task = scheduleNext(...) +-- Give control over to the scheduler, which will +-- resume the above tasks in order when its ready yield() return task "#; @@ -46,8 +53,8 @@ pub fn create(lua: &'static Lua) -> LuaResult> { .with_value("thread", task_wait_env_thread)? .with_value("yield", task_wait_env_yield)? .with_function( - "resume_after", - |lua, (thread, secs): (LuaThread, Option)| { + "resumeAfter", + |lua, (secs, thread): (Option, LuaThread)| { let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); sched.schedule_wait(secs.unwrap_or(0f64), LuaValue::Thread(thread)) }, @@ -67,19 +74,12 @@ pub fn create(lua: &'static Lua) -> LuaResult> { .with_value("thread", task_spawn_env_thread)? .with_value("yield", task_spawn_env_yield)? .with_function( - "resume_first", + "scheduleNext", |lua, (tof, args): (LuaValue, LuaMultiValue)| { let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); - sched.schedule_current_resume(tof, args) + sched.schedule_next(tof, args) }, )? - .with_function("resume_second", |lua, thread: LuaThread| { - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); - sched.schedule_after_current_resume( - LuaValue::Thread(thread), - LuaMultiValue::new(), - ) - })? .build_readonly()?, )? .into_function()?; @@ -138,7 +138,7 @@ pub fn create(lua: &'static Lua) -> LuaResult> { sched.create_task(TaskKind::Instant, LuaValue::Function(func), None, None)?; lua.create_function(move |lua, args: LuaMultiValue| { let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); - sched.resume_task(task, Some(args.into_vec())) + sched.resume_task(task, Some(Ok(args))) }) })?, )?; diff --git a/packages/lib/src/lib.rs b/packages/lib/src/lib.rs index 8b87127..75d5467 100644 --- a/packages/lib/src/lib.rs +++ b/packages/lib/src/lib.rs @@ -103,7 +103,7 @@ impl Lune { // Create our task scheduler and schedule the main thread on it let sched = TaskScheduler::new(lua)?.into_static(); lua.set_app_data(sched); - sched.schedule_current_resume( + sched.schedule_next( LuaValue::Function( lua.load(script_contents) .set_name(script_name) diff --git a/packages/lib/src/lua/task/async_handle.rs b/packages/lib/src/lua/task/async_handle.rs new file mode 100644 index 0000000..18bcb39 --- /dev/null +++ b/packages/lib/src/lua/task/async_handle.rs @@ -0,0 +1,46 @@ +use core::panic; + +use mlua::prelude::*; + +use tokio::sync::mpsc; + +use super::message::TaskSchedulerMessage; + +/** + A handle to a registered asynchronous background task. + + [`TaskSchedulerAsyncHandle::unregister`] must be + called upon completion of the background task to + prevent the task scheduler from running indefinitely. +*/ +#[must_use = "Background tasks must be unregistered"] +#[derive(Debug)] +pub struct TaskSchedulerAsyncHandle { + unregistered: bool, + sender: mpsc::UnboundedSender, +} + +impl TaskSchedulerAsyncHandle { + pub fn new(sender: mpsc::UnboundedSender) -> Self { + Self { + unregistered: false, + sender, + } + } + + pub fn unregister(mut self, result: LuaResult<()>) { + self.unregistered = true; + self.sender + .send(TaskSchedulerMessage::Terminated(result)) + .unwrap_or_else(|_| { + panic!( + "\ + \nFailed to unregister background task - this is an internal error! \ + \nPlease report it at {} \ + \nDetails: Manual \ + ", + env!("CARGO_PKG_REPOSITORY") + ) + }); + } +} diff --git a/packages/lib/src/lua/task/message.rs b/packages/lib/src/lua/task/message.rs new file mode 100644 index 0000000..e0af4f9 --- /dev/null +++ b/packages/lib/src/lua/task/message.rs @@ -0,0 +1,8 @@ +use mlua::prelude::*; + +#[derive(Debug, Clone)] +pub enum TaskSchedulerMessage { + NewBlockingTaskReady, + Spawned, + Terminated(LuaResult<()>), +} diff --git a/packages/lib/src/lua/task/mod.rs b/packages/lib/src/lua/task/mod.rs index 968d87d..516d5dd 100644 --- a/packages/lib/src/lua/task/mod.rs +++ b/packages/lib/src/lua/task/mod.rs @@ -1,3 +1,10 @@ +mod async_handle; +mod message; +mod result; mod scheduler; +mod task_kind; +mod task_reference; -pub use scheduler::*; +pub use scheduler::TaskScheduler; +pub use task_kind::TaskKind; +pub use task_reference::TaskReference; diff --git a/packages/lib/src/lua/task/result.rs b/packages/lib/src/lua/task/result.rs new file mode 100644 index 0000000..0b67d8f --- /dev/null +++ b/packages/lib/src/lua/task/result.rs @@ -0,0 +1,187 @@ +use std::{fmt, process::ExitCode}; + +use mlua::prelude::*; + +use super::TaskScheduler; + +/// A struct representing the current state of the task scheduler +#[derive(Debug, Clone)] +pub struct TaskSchedulerState { + lua_error: Option, + exit_code: Option, + num_blocking: usize, + num_futures: usize, + num_background: usize, +} + +impl TaskSchedulerState { + pub(super) fn new(sched: &TaskScheduler) -> Self { + const MESSAGE: &str = "\ + Failed to get lock on or borrow internal scheduler state!\ + \nMake sure not to call during task scheduler resumption"; + Self { + lua_error: None, + exit_code: sched.exit_code.get(), + num_blocking: sched + .tasks_queue_blocking + .try_borrow() + .expect(MESSAGE) + .len(), + num_futures: sched.futures.try_lock().expect(MESSAGE).len(), + num_background: sched.futures_registered_count.get(), + } + } + + pub(super) fn err(sched: &TaskScheduler, err: LuaError) -> Self { + let mut this = Self::new(sched); + this.lua_error = Some(err); + this + } + + pub(super) fn has_blocking_tasks(&self) -> bool { + self.num_blocking > 0 + } + + pub(super) fn has_future_tasks(&self) -> bool { + self.num_futures > 0 + } + + pub(super) fn has_background_tasks(&self) -> bool { + self.num_background > 0 + } + + /** + Returns a clone of the error from + this task scheduler result, if any. + */ + pub fn get_lua_error(&self) -> Option { + self.lua_error.clone() + } + + /** + Returns a clone of the exit code from + this task scheduler result, if any. + */ + pub fn get_exit_code(&self) -> Option { + self.exit_code + } + + /** + Returns `true` if the task scheduler still + has blocking lua threads left to run. + */ + pub fn is_blocking(&self) -> bool { + self.num_blocking > 0 + } + + /** + Returns `true` if the task scheduler has finished all + blocking lua tasks, but still has yielding tasks running. + */ + pub fn is_yielding(&self) -> bool { + self.num_blocking == 0 && self.num_futures > 0 + } + + /** + Returns `true` if the task scheduler has finished all + lua threads, but still has background tasks running. + */ + pub fn is_background(&self) -> bool { + self.num_blocking == 0 && self.num_futures == 0 && self.num_background > 0 + } + + /** + Returns `true` if the task scheduler is done, + meaning it has no lua threads left to run, and + no spawned tasks are running in the background. + */ + pub fn is_done(&self) -> bool { + self.num_blocking == 0 && self.num_futures == 0 && self.num_background == 0 + } +} + +impl fmt::Display for TaskSchedulerState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let status = if self.is_blocking() { + "Busy" + } else if self.is_yielding() { + "Yielding" + } else if self.is_background() { + "Background" + } else { + "Done" + }; + let code = match self.get_exit_code() { + Some(code) => format!("{code:?}"), + None => "-".to_string(), + }; + let err = match self.get_lua_error() { + Some(e) => format!("{e:?}") + .as_bytes() + .chunks(42) // Kinda arbitrary but should fit in most terminals + .enumerate() + .map(|(idx, buf)| { + format!( + "{}{}{}{}{}", + if idx == 0 { "" } else { "\n│ " }, + if idx == 0 { + "".to_string() + } else { + " ".repeat(16) + }, + if idx == 0 { "" } else { " │ " }, + String::from_utf8_lossy(buf), + if buf.len() == 42 { " │" } else { "" }, + ) + }) + .collect::(), + None => "-".to_string(), + }; + let parts = vec![ + format!("Status │ {status}"), + format!("Tasks active │ {}", self.num_blocking), + format!("Tasks background │ {}", self.num_background), + format!("Status code │ {code}"), + format!("Lua error │ {err}"), + ]; + let lengths = parts + .iter() + .map(|part| { + part.lines() + .next() + .unwrap() + .trim_end_matches(" │") + .chars() + .count() + }) + .collect::>(); + let longest = &parts + .iter() + .enumerate() + .fold(0, |acc, (index, _)| acc.max(lengths[index])); + let sep = "─".repeat(longest + 2); + writeln!(f, "┌{}┐", &sep)?; + for (index, part) in parts.iter().enumerate() { + writeln!( + f, + "│ {}{} │", + part.trim_end_matches(" │"), + " ".repeat( + longest + - part + .lines() + .last() + .unwrap() + .trim_end_matches(" │") + .chars() + .count() + ) + )?; + if index < parts.len() - 1 { + writeln!(f, "┝{}┥", &sep)?; + } + } + write!(f, "└{}┘", &sep)?; + Ok(()) + } +} diff --git a/packages/lib/src/lua/task/scheduler.rs b/packages/lib/src/lua/task/scheduler.rs index ec56079..b81214a 100644 --- a/packages/lib/src/lua/task/scheduler.rs +++ b/packages/lib/src/lua/task/scheduler.rs @@ -1,12 +1,8 @@ use core::panic; use std::{ + cell::{Cell, RefCell}, collections::{HashMap, VecDeque}, - fmt, process::ExitCode, - sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, - }, time::Duration, }; @@ -20,58 +16,19 @@ use tokio::{ use crate::utils::table::TableBuilder; -type TaskSchedulerQueue = Arc>>; +use super::{ + async_handle::TaskSchedulerAsyncHandle, message::TaskSchedulerMessage, + result::TaskSchedulerState, task_kind::TaskKind, task_reference::TaskReference, +}; -type TaskFutureArgsOverride<'fut> = Option>>; -type TaskFutureReturns<'fut> = LuaResult>; -type TaskFuture<'fut> = LocalBoxFuture<'fut, (TaskReference, TaskFutureReturns<'fut>)>; +type TaskFutureRets<'fut> = LuaResult>>; +type TaskFuture<'fut> = LocalBoxFuture<'fut, (TaskReference, TaskFutureRets<'fut>)>; const TASK_ASYNC_IMPL_LUA: &str = r#" -resume_async(thread(), ...) +resumeAsync(thread(), ...) return yield() "#; -/// An enum representing different kinds of tasks -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum TaskKind { - Instant, - Deferred, - Future, -} - -impl fmt::Display for TaskKind { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let name: &'static str = match self { - TaskKind::Instant => "Instant", - TaskKind::Deferred => "Deferred", - TaskKind::Future => "Future", - }; - write!(f, "{name}") - } -} - -/// A lightweight, copyable struct that represents a -/// task in the scheduler and is accessible from Lua -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct TaskReference { - kind: TaskKind, - guid: usize, -} - -impl TaskReference { - pub const fn new(kind: TaskKind, guid: usize) -> Self { - Self { kind, guid } - } -} - -impl fmt::Display for TaskReference { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "TaskReference({} - {})", self.kind, self.guid) - } -} - -impl LuaUserData for TaskReference {} - /// A struct representing a task contained in the task scheduler #[derive(Debug)] pub struct Task { @@ -80,244 +37,33 @@ pub struct Task { queued_at: Instant, } -/** - A handle to a registered background task. - - [`TaskSchedulerUnregistrar::unregister`] must be - called upon completion of the background task to - prevent the task scheduler from running indefinitely. -*/ -#[must_use = "Background tasks must be unregistered"] -#[derive(Debug)] -pub struct TaskSchedulerBackgroundTaskHandle { - unregistered: bool, - sender: mpsc::UnboundedSender, -} - -impl TaskSchedulerBackgroundTaskHandle { - pub fn new(sender: mpsc::UnboundedSender) -> Self { - Self { - unregistered: false, - sender, - } - } - - pub fn unregister(mut self, result: LuaResult<()>) { - self.unregistered = true; - self.sender - .send(TaskSchedulerMessage::Terminated(result)) - .unwrap_or_else(|_| { - panic!( - "\ - \nFailed to unregister background task - this is an internal error! \ - \nPlease report it at {} \ - \nDetails: Manual \ - ", - env!("CARGO_PKG_REPOSITORY") - ) - }); - } -} - -/// A struct representing the current state of the task scheduler -#[derive(Debug, Clone)] -pub struct TaskSchedulerResult { - lua_error: Option, - exit_code: Option, - num_instant: usize, - num_deferred: usize, - num_futures: usize, - num_background: usize, - num_active: usize, -} - -impl TaskSchedulerResult { - fn new(sched: &TaskScheduler) -> Self { - const MESSAGE: &str = - "Failed to get lock - make sure not to call during task scheduler resumption"; - let num_instant = sched.task_queue_instant.try_lock().expect(MESSAGE).len(); - let num_deferred = sched.task_queue_deferred.try_lock().expect(MESSAGE).len(); - let num_active = num_instant + num_deferred; - Self { - lua_error: None, - exit_code: if sched.exit_code_set.load(Ordering::Relaxed) { - Some(*sched.exit_code.try_lock().expect(MESSAGE)) - } else { - None - }, - num_instant, - num_deferred, - num_futures: sched.futures.try_lock().expect(MESSAGE).len(), - num_background: sched.futures_in_background.load(Ordering::Relaxed), - num_active, - } - } - - fn err(sched: &TaskScheduler, err: LuaError) -> Self { - let mut this = Self::new(sched); - this.lua_error = Some(err); - this - } - - /** - Returns a clone of the error from - this task scheduler result, if any. - */ - pub fn get_lua_error(&self) -> Option { - self.lua_error.clone() - } - - /** - Returns a clone of the exit code from - this task scheduler result, if any. - */ - pub fn get_exit_code(&self) -> Option { - self.exit_code - } - - /** - Returns `true` if the task scheduler is still busy, - meaning it still has lua threads left to run. - */ - pub fn is_busy(&self) -> bool { - self.num_active > 0 - } - - /** - Returns `true` if the task scheduler has finished all - blocking lua tasks, but still has yielding tasks running. - */ - pub fn is_yielding(&self) -> bool { - self.num_active == 0 && self.num_futures > 0 - } - - /** - Returns `true` if the task scheduler has finished all - lua threads, but still has background tasks running. - */ - pub fn is_background(&self) -> bool { - self.num_active == 0 && self.num_futures == 0 && self.num_background > 0 - } - - /** - Returns `true` if the task scheduler is done, - meaning it has no lua threads left to run, and - no spawned tasks are running in the background. - */ - pub fn is_done(&self) -> bool { - self.num_active == 0 && self.num_futures == 0 && self.num_background == 0 - } -} - -impl fmt::Display for TaskSchedulerResult { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let status = if self.is_busy() { - "Busy" - } else if self.is_yielding() { - "Yielding" - } else if self.is_background() { - "Background" - } else { - "Done" - }; - let code = match self.get_exit_code() { - Some(code) => format!("{code:?}"), - None => "-".to_string(), - }; - let err = match self.get_lua_error() { - Some(e) => format!("{e:?}") - .as_bytes() - .chunks(42) // Kinda arbitrary but should fit in most terminals - .enumerate() - .map(|(idx, buf)| { - format!( - "{}{}{}{}{}", - if idx == 0 { "" } else { "\n│ " }, - if idx == 0 { - "".to_string() - } else { - " ".repeat(16) - }, - if idx == 0 { "" } else { " │ " }, - String::from_utf8_lossy(buf), - if buf.len() == 42 { " │" } else { "" }, - ) - }) - .collect::(), - None => "-".to_string(), - }; - let parts = vec![ - format!("Status │ {status}"), - format!("Tasks active │ {}", self.num_active), - format!("Tasks background │ {}", self.num_background), - format!("Status code │ {code}"), - format!("Lua error │ {err}"), - ]; - let lengths = parts - .iter() - .map(|part| { - part.lines() - .next() - .unwrap() - .trim_end_matches(" │") - .chars() - .count() - }) - .collect::>(); - let longest = &parts - .iter() - .enumerate() - .fold(0, |acc, (index, _)| acc.max(lengths[index])); - let sep = "─".repeat(longest + 2); - writeln!(f, "┌{}┐", &sep)?; - for (index, part) in parts.iter().enumerate() { - writeln!( - f, - "│ {}{} │", - part.trim_end_matches(" │"), - " ".repeat( - longest - - part - .lines() - .last() - .unwrap() - .trim_end_matches(" │") - .chars() - .count() - ) - )?; - if index < parts.len() - 1 { - writeln!(f, "┝{}┥", &sep)?; - } - } - writeln!(f, "└{}┘", &sep)?; - Ok(()) - } -} - -#[derive(Debug, Clone)] -pub enum TaskSchedulerMessage { - NewBlockingTaskReady, - Spawned, - Terminated(LuaResult<()>), -} - /// A task scheduler that implements task queues /// with instant, deferred, and delayed tasks #[derive(Debug)] pub struct TaskScheduler<'fut> { + /* + Lots of cell and refcell here, however we need full interior mutability and never outer + since the scheduler struct may be accessed from lua more than once at the same time. + + An example of this is the implementation of coroutine.resume, which instantly resumes the given + task, where the task getting resumed may also create new scheduler tasks during its resumption. + + The same goes for values used during resumption of futures (`futures` and `futures_rx`) + which must use async-aware mutexes to be cancellation safe across await points. + */ + // Internal state & flags lua: &'static Lua, - tasks: Arc>>, - futures: Arc>>>, + guid: Cell, + guid_running: Cell>, + pub(super) exit_code: Cell>, + // Blocking tasks + pub(super) tasks: RefCell>, + pub(super) tasks_queue_blocking: RefCell>, + // Future tasks & objects for waking + pub(super) futures: AsyncMutex>>, + pub(super) futures_registered_count: Cell, futures_tx: mpsc::UnboundedSender, - futures_rx: Arc>>, - futures_in_background: AtomicUsize, - task_queue_instant: TaskSchedulerQueue, - task_queue_deferred: TaskSchedulerQueue, - exit_code_set: AtomicBool, - exit_code: Arc>, - guid: AtomicUsize, - guid_running_task: AtomicUsize, + futures_rx: AsyncMutex>, } impl<'fut> TaskScheduler<'fut> { @@ -328,19 +74,15 @@ impl<'fut> TaskScheduler<'fut> { let (tx, rx) = mpsc::unbounded_channel(); Ok(Self { lua, - tasks: Arc::new(Mutex::new(HashMap::new())), - futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())), + guid: Cell::new(0), + guid_running: Cell::new(None), + exit_code: Cell::new(None), + tasks: RefCell::new(HashMap::new()), + tasks_queue_blocking: RefCell::new(VecDeque::new()), + futures: AsyncMutex::new(FuturesUnordered::new()), futures_tx: tx, - futures_rx: Arc::new(AsyncMutex::new(rx)), - futures_in_background: AtomicUsize::new(0), - task_queue_instant: Arc::new(Mutex::new(VecDeque::new())), - task_queue_deferred: Arc::new(Mutex::new(VecDeque::new())), - exit_code_set: AtomicBool::new(false), - exit_code: Arc::new(Mutex::new(ExitCode::SUCCESS)), - // Global ids must start at 1, since 0 is a special - // value for guid_running_task that means "no task" - guid: AtomicUsize::new(1), - guid_running_task: AtomicUsize::new(0), + futures_rx: AsyncMutex::new(rx), + futures_registered_count: Cell::new(0), }) } @@ -367,8 +109,7 @@ impl<'fut> TaskScheduler<'fut> { should stop resuming tasks, and gracefully terminate the program. */ pub fn set_exit_code(&self, code: ExitCode) { - *self.exit_code.lock().unwrap() = code; - self.exit_code_set.store(true, Ordering::Relaxed); + self.exit_code.set(Some(code)); } /** @@ -379,8 +120,7 @@ impl<'fut> TaskScheduler<'fut> { */ #[allow(dead_code)] pub fn contains_task(&self, reference: TaskReference) -> bool { - let tasks = self.tasks.lock().unwrap(); - tasks.contains_key(&reference) + self.tasks.borrow().contains_key(&reference) } /** @@ -411,6 +151,8 @@ impl<'fut> TaskScheduler<'fut> { } }; // Store the thread and its arguments in the registry + // NOTE: We must convert to a vec since multis + // can't be stored in the registry directly let task_args_vec: Option> = thread_args.map(|opt| opt.into_vec()); let task_args_key: LuaRegistryKey = self.lua.create_registry_value(task_args_vec)?; let task_thread_key: LuaRegistryKey = self.lua.create_registry_value(task_thread)?; @@ -425,12 +167,13 @@ impl<'fut> TaskScheduler<'fut> { let task_ref = if let Some(reusable_guid) = guid_to_reuse { TaskReference::new(kind, reusable_guid) } else { - let guid = self.guid.fetch_add(1, Ordering::Relaxed); + let guid = self.guid.get(); + self.guid.set(guid + 1); TaskReference::new(kind, guid) }; // Add the task to the scheduler { - let mut tasks = self.tasks.lock().unwrap(); + let mut tasks = self.tasks.borrow_mut(); tasks.insert(task_ref, task); } Ok(task_ref) @@ -456,13 +199,13 @@ impl<'fut> TaskScheduler<'fut> { are no tasks left in the task list, the futures do not matter there */ let mut found = false; - let mut tasks = self.tasks.lock().unwrap(); + let mut tasks = self.tasks.borrow_mut(); // Unfortunately we have to loop through to find which task // references to remove instead of removing directly since // tasks can switch kinds between instant, deferred, future let tasks_to_remove: Vec<_> = tasks .keys() - .filter(|task_ref| task_ref.guid == reference.guid) + .filter(|task_ref| task_ref.id() == reference.id()) .copied() .collect(); for task_ref in tasks_to_remove { @@ -486,27 +229,36 @@ impl<'fut> TaskScheduler<'fut> { pub fn resume_task<'a>( &self, reference: TaskReference, - override_args: Option>>, + override_args: Option>>, ) -> LuaResult> { - self.guid_running_task - .store(reference.guid, Ordering::Relaxed); let task = { - let mut tasks = self.tasks.lock().unwrap(); + let mut tasks = self.tasks.borrow_mut(); match tasks.remove(&reference) { Some(task) => task, None => return Ok(LuaMultiValue::new()), // Task was removed } }; let thread: LuaThread = self.lua.registry_value(&task.thread)?; - let args_vec_opt = override_args.or_else(|| { - self.lua + let args_opt_res = override_args.or_else(|| { + Ok(self + .lua .registry_value::>>(&task.args) .expect("Failed to get stored args for task") + .map(LuaMultiValue::from_vec)) + .transpose() }); self.lua.remove_registry_value(task.thread)?; self.lua.remove_registry_value(task.args)?; - let rets = if let Some(args) = args_vec_opt { - thread.resume::<_, LuaMultiValue>(LuaMultiValue::from_vec(args)) + if let Some(args_res) = args_opt_res { + match args_res { + Err(e) => Err(e), // FIXME: We need to throw this error in lua to let pcall & friends handle it properly + Ok(args) => { + self.guid_running.set(Some(reference.id())); + let rets = thread.resume::<_, LuaMultiValue>(args); + self.guid_running.set(None); + rets + } + } } else { /* The tasks did not get any arguments from either: @@ -519,10 +271,11 @@ impl<'fut> TaskScheduler<'fut> { want the amount of time waited returned to them. */ let elapsed = task.queued_at.elapsed().as_secs_f64(); - thread.resume::<_, LuaMultiValue>(elapsed) - }; - self.guid_running_task.store(0, Ordering::Relaxed); - rets + self.guid_running.set(Some(reference.id())); + let rets = thread.resume::<_, LuaMultiValue>(elapsed); + self.guid_running.set(None); + rets + } } /** @@ -541,48 +294,26 @@ impl<'fut> TaskScheduler<'fut> { -- Here we have either yielded or finished the above task ``` */ - fn queue_blocking_task( + pub(super) fn queue_blocking_task( &self, kind: TaskKind, thread_or_function: LuaValue<'_>, thread_args: Option>, guid_to_reuse: Option, - after_current_resume: bool, ) -> LuaResult { if kind == TaskKind::Future { panic!("Tried to schedule future using normal task schedule method") } let task_ref = self.create_task(kind, thread_or_function, thread_args, guid_to_reuse)?; - // Note that we create two new inner new - // scopes to drop mutexes as fast as possible - let num_prev_blocking_tasks = { - let (should_defer, num_prev_tasks, mut queue) = { - let queue_instant = self.task_queue_instant.lock().unwrap(); - let queue_deferred = self.task_queue_deferred.lock().unwrap(); - let num_prev_tasks = queue_instant.len() + queue_deferred.len(); - ( - kind == TaskKind::Deferred, - num_prev_tasks, - match kind { - TaskKind::Instant => queue_instant, - TaskKind::Deferred => queue_deferred, - TaskKind::Future => unreachable!(), - }, - ) - }; - if should_defer { - queue.push_back(task_ref); - } else if after_current_resume { - assert!( - queue.len() > 0, - "Cannot schedule a task after the first instant when task queue is empty" - ); - queue.insert(1, task_ref); - } else { - queue.push_front(task_ref); - } - num_prev_tasks - }; + // Add the task to the front of the queue, unless it + // should be deferred, in that case add it to the back + let mut queue = self.tasks_queue_blocking.borrow_mut(); + let num_prev_blocking_tasks = queue.len(); + if kind == TaskKind::Deferred { + queue.push_back(task_ref); + } else { + queue.push_front(task_ref); + } /* If we had any previous task and are currently async waiting on tasks, we should send a signal to wake up @@ -603,12 +334,12 @@ impl<'fut> TaskScheduler<'fut> { /** Queues a new future to run on the task scheduler. */ - fn queue_async_task( + pub(super) fn queue_async_task( &self, thread_or_function: LuaValue<'_>, thread_args: Option>, guid_to_reuse: Option, - fut: impl Future> + 'fut, + fut: impl Future> + 'fut, ) -> LuaResult { let task_ref = self.create_task( TaskKind::Future, @@ -634,7 +365,7 @@ impl<'fut> TaskScheduler<'fut> { The given lua thread or function will be resumed using the given `thread_args` as its argument(s). */ - pub fn schedule_current_resume( + pub fn schedule_next( &self, thread_or_function: LuaValue<'_>, thread_args: LuaMultiValue<'_>, @@ -644,34 +375,6 @@ impl<'fut> TaskScheduler<'fut> { thread_or_function, Some(thread_args), None, - false, - ) - } - - /** - Schedules a lua thread or function to resume ***after the first*** - currently resuming task, during this resumption point. - - The given lua thread or function will be resumed - using the given `thread_args` as its argument(s). - */ - pub fn schedule_after_current_resume( - &self, - thread_or_function: LuaValue<'_>, - thread_args: LuaMultiValue<'_>, - ) -> LuaResult { - self.queue_blocking_task( - TaskKind::Instant, - thread_or_function, - Some(thread_args), - // This should recycle the guid of the current task, - // since it will only be called to schedule resuming - // current thread after it gives resumption to another - match self.guid_running_task.load(Ordering::Relaxed) { - 0 => panic!("Tried to schedule with no task running"), - guid => Some(guid), - }, - true, ) } @@ -692,7 +395,6 @@ impl<'fut> TaskScheduler<'fut> { thread_or_function, Some(thread_args), None, - false, ) } @@ -733,15 +435,7 @@ impl<'fut> TaskScheduler<'fut> { // Wait should recycle the guid of the current task, // which ensures that the TaskReference is identical and // that any waits inside of spawned tasks will also cancel - match self.guid_running_task.load(Ordering::Relaxed) { - 0 => { - // NOTE: We had this here to verify the behavior of our task scheduler during development, - // but for re-implementing coroutine.resume (which is not registered) we must not panic here - // panic!("Tried to schedule waiting task with no registered task running") - None - } - guid => Some(guid), - }, + self.guid_running.get(), async move { sleep(Duration::from_secs_f64(after_secs)).await; Ok(None) @@ -756,12 +450,27 @@ impl<'fut> TaskScheduler<'fut> { The given lua thread or function will be resumed using the optional arguments returned by the future. */ - pub fn schedule_async( - &self, + #[allow(dead_code)] + pub fn schedule_async<'sched, R, F, FR>( + &'sched self, thread_or_function: LuaValue<'_>, - fut: impl Future> + 'fut, - ) -> LuaResult { - self.queue_async_task(thread_or_function, None, None, fut) + func: F, + ) -> LuaResult + where + 'sched: 'fut, // Scheduler must live at least as long as the future + R: ToLuaMulti<'static>, + F: 'static + Fn(&'static Lua) -> FR, + FR: 'static + Future>, + { + self.queue_async_task(thread_or_function, None, None, async move { + match func(self.lua).await { + Ok(res) => match res.to_lua_multi(self.lua) { + Ok(multi) => Ok(Some(multi)), + Err(e) => Err(e), + }, + Err(e) => Err(e), + } + }) } /** @@ -784,14 +493,14 @@ impl<'fut> TaskScheduler<'fut> { .with_value("thread", async_env_thread)? .with_value("yield", async_env_yield)? .with_function( - "resume_async", + "resumeAsync", move |lua: &Lua, (thread, args): (LuaThread, A)| { let fut = func(lua, args); let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); - sched.schedule_async(LuaValue::Thread(thread), async { + sched.queue_async_task(LuaValue::Thread(thread), None, None, async { let rets = fut.await?; let mult = rets.to_lua_multi(lua)?; - Ok(Some(mult.into_vec())) + Ok(Some(mult)) }) }, )? @@ -810,7 +519,7 @@ impl<'fut> TaskScheduler<'fut> { must be called upon completion of the background task to prevent the task scheduler from running indefinitely. */ - pub fn register_background_task(&self) -> TaskSchedulerBackgroundTaskHandle { + pub fn register_background_task(&self) -> TaskSchedulerAsyncHandle { let sender = self.futures_tx.clone(); sender .send(TaskSchedulerMessage::Spawned) @@ -824,110 +533,82 @@ impl<'fut> TaskScheduler<'fut> { env!("CARGO_PKG_REPOSITORY") ) }); - TaskSchedulerBackgroundTaskHandle::new(sender) - } - - /** - Retrieves the queue for a specific kind of task. - - Panics for [`TaskKind::Future`] since - futures do not use the normal task queue. - */ - fn get_queue(&self, kind: TaskKind) -> &TaskSchedulerQueue { - match kind { - TaskKind::Instant => &self.task_queue_instant, - TaskKind::Deferred => &self.task_queue_deferred, - TaskKind::Future => { - panic!("Future tasks do not use the normal task queue") - } - } + TaskSchedulerAsyncHandle::new(sender) } /** Resumes the next queued Lua task, if one exists, blocking the current thread until it either yields or finishes. */ - fn resume_next_queue_task( + fn resume_next_blocking_task( &self, - kind: TaskKind, - override_args: Option>, - ) -> TaskSchedulerResult { + override_args: Option>, + ) -> TaskSchedulerState { match { - let mut queue_guard = self.get_queue(kind).lock().unwrap(); - queue_guard.pop_front() + let mut queue_guard = self.tasks_queue_blocking.borrow_mut(); + let task = queue_guard.pop_front(); + drop(queue_guard); + task } { - None => TaskSchedulerResult::new(self), + None => TaskSchedulerState::new(self), Some(task) => match self.resume_task(task, override_args) { - Ok(_) => TaskSchedulerResult::new(self), - Err(task_err) => TaskSchedulerResult::err(self, task_err), + Ok(_) => TaskSchedulerState::new(self), + Err(task_err) => TaskSchedulerState::err(self, task_err), }, } } /** - Awaits the first available queued future, and resumes its - associated Lua task which will then be ready for resumption. + Awaits the first available queued future, and resumes its associated + Lua task which will be ready for resumption when that future wakes. Panics if there are no futures currently queued. Use [`TaskScheduler::next_queue_future_exists`] to check if there are any queued futures. */ - async fn resume_next_queue_future(&self) -> TaskSchedulerResult { - let result = { + async fn resume_next_async_task(&self) -> TaskSchedulerState { + let (task, result) = { let mut futs = self .futures .try_lock() - .expect("Failed to get lock on futures"); + .expect("Tried to resume next queued future while already resuming or modifying"); futs.next() .await .expect("Tried to resume next queued future but none are queued") }; - match result { - (task, Err(fut_err)) => { - // Future errored, don't resume its associated task - // and make sure to cancel / remove it completely, if removal - // also errors then we send that error back instead of the future's error - TaskSchedulerResult::err( - self, - match self.remove_task(task) { - Err(cancel_err) => cancel_err, - Ok(_) => fut_err, - }, - ) - } - (task, Ok(args)) => { - // Promote this future task to an instant task - // and resume the instant queue right away, taking - // care to not deadlock by dropping the mutex guard - let mut queue_guard = self.get_queue(TaskKind::Instant).lock().unwrap(); - queue_guard.push_front(task); - drop(queue_guard); - self.resume_next_queue_task(TaskKind::Instant, args) - } - } + // Promote this future task to a blocking task and resume it + // right away, also taking care to not borrow mutably twice + // by dropping this guard before trying to resume it + let mut queue_guard = self.tasks_queue_blocking.borrow_mut(); + queue_guard.push_front(task); + drop(queue_guard); + self.resume_next_blocking_task(result.transpose()) } /** Awaits the next background task registration message, if any messages exist in the queue. - This is a no-op if there are no messages. + This is a no-op if there are no background tasks left running + and / or the background task messages channel was closed. */ - async fn receive_next_message(&self) -> TaskSchedulerResult { + async fn receive_next_message(&self) -> TaskSchedulerState { let message_opt = { let mut rx = self.futures_rx.lock().await; rx.recv().await }; if let Some(message) = message_opt { match message { - TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerResult::new(self), + TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerState::new(self), TaskSchedulerMessage::Spawned => { - self.futures_in_background.fetch_add(1, Ordering::Relaxed); - TaskSchedulerResult::new(self) + let prev = self.futures_registered_count.get(); + self.futures_registered_count.set(prev + 1); + TaskSchedulerState::new(self) } TaskSchedulerMessage::Terminated(result) => { - let prev = self.futures_in_background.fetch_sub(1, Ordering::Relaxed); + let prev = self.futures_registered_count.get(); + self.futures_registered_count.set(prev - 1); if prev == 0 { panic!( r#" @@ -938,14 +619,14 @@ impl<'fut> TaskScheduler<'fut> { ) } if let Err(e) = result { - TaskSchedulerResult::err(self, e) + TaskSchedulerState::err(self, e) } else { - TaskSchedulerResult::new(self) + TaskSchedulerState::new(self) } } } } else { - TaskSchedulerResult::new(self) + TaskSchedulerState::new(self) } } @@ -958,37 +639,34 @@ impl<'fut> TaskScheduler<'fut> { this will process delayed tasks, waiting tasks, and native Rust futures concurrently, awaiting the first one to be ready for resumption. */ - pub async fn resume_queue(&self) -> TaskSchedulerResult { - let current = TaskSchedulerResult::new(self); + pub async fn resume_queue(&self) -> TaskSchedulerState { + let current = TaskSchedulerState::new(self); /* Resume tasks in the internal queue, in this order: * 🛑 = blocking - lua tasks, in order * ⏳ = async - first come, first serve - 1. 🛑 Tasks from task.spawn and the main thread - 2. 🛑 Tasks from task.defer - 3. ⏳ Tasks from task.delay / task.wait, spawned background tasks + 1. 🛑 Tasks from task.spawn / task.defer, the main thread + 2. ⏳ Tasks from task.delay / task.wait, spawned background tasks */ - if current.num_instant > 0 { - self.resume_next_queue_task(TaskKind::Instant, None) - } else if current.num_deferred > 0 { - self.resume_next_queue_task(TaskKind::Deferred, None) - } else if current.num_futures > 0 && current.num_background > 0 { + if current.has_blocking_tasks() { + self.resume_next_blocking_task(None) + } else if current.has_future_tasks() && current.has_background_tasks() { // Futures, spawned background tasks tokio::select! { - result = self.resume_next_queue_future() => result, + result = self.resume_next_async_task() => result, result = self.receive_next_message() => result, } - } else if current.num_futures > 0 { + } else if current.has_future_tasks() { // Futures - self.resume_next_queue_future().await - } else if current.num_background > 0 { + self.resume_next_async_task().await + } else if current.has_background_tasks() { // Only spawned background tasks, these may then // spawn new lua tasks and "wake up" the scheduler self.receive_next_message().await } else { - TaskSchedulerResult::new(self) + TaskSchedulerState::new(self) } } } diff --git a/packages/lib/src/lua/task/task_kind.rs b/packages/lib/src/lua/task/task_kind.rs new file mode 100644 index 0000000..7665153 --- /dev/null +++ b/packages/lib/src/lua/task/task_kind.rs @@ -0,0 +1,39 @@ +use std::fmt; + +/// An enum representing different kinds of tasks +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum TaskKind { + Instant, + Deferred, + Future, +} + +#[allow(dead_code)] +impl TaskKind { + pub fn is_instant(&self) -> bool { + *self == Self::Instant + } + + pub fn is_deferred(&self) -> bool { + *self == Self::Deferred + } + + pub fn is_blocking(&self) -> bool { + *self != Self::Future + } + + pub fn is_future(&self) -> bool { + *self == Self::Future + } +} + +impl fmt::Display for TaskKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let name: &'static str = match self { + TaskKind::Instant => "Instant", + TaskKind::Deferred => "Deferred", + TaskKind::Future => "Future", + }; + write!(f, "{name}") + } +} diff --git a/packages/lib/src/lua/task/task_reference.rs b/packages/lib/src/lua/task/task_reference.rs new file mode 100644 index 0000000..bfc6fbf --- /dev/null +++ b/packages/lib/src/lua/task/task_reference.rs @@ -0,0 +1,31 @@ +use std::fmt; + +use mlua::prelude::*; + +use super::task_kind::TaskKind; + +/// A lightweight, copyable struct that represents a +/// task in the scheduler and is accessible from Lua +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct TaskReference { + kind: TaskKind, + guid: usize, +} + +impl TaskReference { + pub const fn new(kind: TaskKind, guid: usize) -> Self { + Self { kind, guid } + } + + pub const fn id(&self) -> usize { + self.guid + } +} + +impl fmt::Display for TaskReference { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "TaskReference({} - {})", self.kind, self.guid) + } +} + +impl LuaUserData for TaskReference {} diff --git a/tests/net/serve/websockets.luau b/tests/net/serve/websockets.luau index 5e5985e..aa63903 100644 --- a/tests/net/serve/websockets.luau +++ b/tests/net/serve/websockets.luau @@ -1,4 +1,4 @@ -local PORT = 8080 +local PORT = 8081 local URL = `http://127.0.0.1:{PORT}` local WS_URL = `ws://127.0.0.1:{PORT}` local REQUEST = "Hello from client!"