diff --git a/packages/lib/src/globals/mod.rs b/packages/lib/src/globals/mod.rs
index 6688de2..a0b27de 100644
--- a/packages/lib/src/globals/mod.rs
+++ b/packages/lib/src/globals/mod.rs
@@ -2,8 +2,6 @@ use std::fmt::{Display, Formatter, Result as FmtResult};
 
 use mlua::prelude::*;
 
-use crate::lua::task::TaskScheduler;
-
 mod fs;
 mod net;
 mod process;
@@ -80,18 +78,14 @@ impl LuneGlobal {
         Note that proxy globals should be handled with special care and that [`LuneGlobal::inject()`]
         should be preferred over manually creating and manipulating the value(s) of any Lune global.
     */
-    pub fn value(
-        &self,
-        lua: &'static Lua,
-        scheduler: &'static TaskScheduler,
-    ) -> LuaResult<LuaTable> {
+    pub fn value(&self, lua: &'static Lua) -> LuaResult<LuaTable> {
         match self {
             LuneGlobal::Fs => fs::create(lua),
             LuneGlobal::Net => net::create(lua),
             LuneGlobal::Process { args } => process::create(lua, args.clone()),
             LuneGlobal::Require => require::create(lua),
             LuneGlobal::Stdio => stdio::create(lua),
-            LuneGlobal::Task => task::create(lua, scheduler),
+            LuneGlobal::Task => task::create(lua),
             LuneGlobal::TopLevel => top_level::create(lua),
         }
     }
@@ -104,9 +98,9 @@ impl LuneGlobal {
 
         Refer to [`LuneGlobal::is_top_level()`] for more info on proxy globals.
     */
-    pub fn inject(self, lua: &'static Lua, scheduler: &'static TaskScheduler) -> LuaResult<()> {
+    pub fn inject(self, lua: &'static Lua) -> LuaResult<()> {
         let globals = lua.globals();
-        let table = self.value(lua, scheduler)?;
+        let table = self.value(lua)?;
         // NOTE: Top level globals are special, the values
         // *in* the table they return should be set directly,
         // instead of setting the table itself as the global
diff --git a/packages/lib/src/globals/net.rs b/packages/lib/src/globals/net.rs
index 0c257c9..4b31d21 100644
--- a/packages/lib/src/globals/net.rs
+++ b/packages/lib/src/globals/net.rs
@@ -164,6 +164,8 @@ async fn net_serve<'a>(
             .expect("Failed to store websocket handler")
     }));
     let server = Server::bind(&([127, 0, 0, 1], port).into())
+        .http1_only(true)
+        .http1_keepalive(true)
         .executor(LocalExec)
         .serve(MakeNetService(
             lua,
diff --git a/packages/lib/src/globals/process.rs b/packages/lib/src/globals/process.rs
index 38708a2..00ed28e 100644
--- a/packages/lib/src/globals/process.rs
+++ b/packages/lib/src/globals/process.rs
@@ -50,7 +50,7 @@ pub fn create(lua: &'static Lua, args_vec: Vec<String>) -> LuaResult<LuaTable> {
     let process_exit_env_yield: LuaFunction = lua.named_registry_value("co.yield")?;
     let process_exit_env_exit: LuaFunction = lua.create_function(|lua, code: Option<u8>| {
         let exit_code = code.map_or(ExitCode::SUCCESS, ExitCode::from);
-        let sched = &mut lua.app_data_mut::<&TaskScheduler>().unwrap();
+        let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
         sched.set_exit_code(exit_code);
         Ok(())
     })?;
diff --git a/packages/lib/src/globals/task.rs b/packages/lib/src/globals/task.rs
index 379548a..55b5376 100644
--- a/packages/lib/src/globals/task.rs
+++ b/packages/lib/src/globals/task.rs
@@ -10,23 +10,27 @@
 resume_after(thread(), ...)
 return yield()
 "#;
 
-pub fn create(
-    lua: &'static Lua,
-    scheduler: &'static TaskScheduler,
-) -> LuaResult<LuaTable<'static>> {
-    lua.set_app_data(scheduler);
+const TASK_SPAWN_IMPL_LUA: &str = r#"
+local task = resume_first(...)
+resume_second(thread())
+yield()
+return task
+"#;
+
+pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
     // Create task spawning functions that add tasks to the scheduler
-    let task_spawn = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| {
-        let sched = &mut lua.app_data_mut::<&TaskScheduler>().unwrap();
-        sched.schedule_instant(tof, args)
+    let task_cancel = lua.create_function(|lua, task: TaskReference| {
+        let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
+        sched.cancel_task(task)?;
+        Ok(())
     })?;
     let task_defer = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| {
-        let sched = &mut lua.app_data_mut::<&TaskScheduler>().unwrap();
+        let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
         sched.schedule_deferred(tof, args)
     })?;
     let task_delay = lua.create_function(|lua, (secs, tof, args): (f64, LuaValue, LuaMultiValue)| {
-        let sched = &mut lua.app_data_mut::<&TaskScheduler>().unwrap();
+        let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
         sched.schedule_delayed(secs, tof, args)
     })?;
     // Create our task wait function, this is a bit different since
@@ -43,13 +47,41 @@ pub fn create(
                 .with_function(
                     "resume_after",
                     |lua, (thread, secs): (LuaThread, Option<f64>)| {
-                        let sched = &mut lua.app_data_mut::<&TaskScheduler>().unwrap();
-                        sched.resume_after(secs.unwrap_or(0f64), thread)
+                        let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
+                        sched.schedule_wait(secs.unwrap_or(0f64), LuaValue::Thread(thread))
                     },
                 )?
                 .build_readonly()?,
         )?
         .into_function()?;
+    // The spawn function also needs special treatment,
+    // we need to yield right away to allow the
+    // spawned task to run until first yield
+    let task_spawn_env_thread: LuaFunction = lua.named_registry_value("co.thread")?;
+    let task_spawn_env_yield: LuaFunction = lua.named_registry_value("co.yield")?;
+    let task_spawn = lua
+        .load(TASK_SPAWN_IMPL_LUA)
+        .set_environment(
+            TableBuilder::new(lua)?
+                .with_value("thread", task_spawn_env_thread)?
+                .with_value("yield", task_spawn_env_yield)?
+                .with_function(
+                    "resume_first",
+                    |lua, (tof, args): (LuaValue, LuaMultiValue)| {
+                        let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
+                        sched.schedule_current_resume(tof, args)
+                    },
+                )?
+                .with_function("resume_second", |lua, thread: LuaThread| {
+                    let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
+                    sched.schedule_after_current_resume(
+                        LuaValue::Thread(thread),
+                        LuaMultiValue::new(),
+                    )
+                })?
+                .build_readonly()?,
+        )?
+        .into_function()?;
     // We want the task scheduler to be transparent,
     // but it does not return real lua threads, so
     // we need to override some globals to fake it
@@ -76,6 +108,7 @@ pub fn create(
     globals.set("typeof", typeof_proxy)?;
     // All good, return the task scheduler lib
     TableBuilder::new(lua)?
+        .with_value("cancel", task_cancel)?
        .with_value("spawn", task_spawn)?
        .with_value("defer", task_defer)?
        .with_value("delay", task_delay)?
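
For reviewers, a rough sketch of the user-facing behavior the new spawn and cancel implementations aim for. This is illustrative Luau only, not part of the patch, and assumes the Roblox-style task library semantics that Lune mirrors:

    local thread = task.spawn(function()
        print("one") -- the spawned thread runs right away, up to its first yield
        task.wait(1)
        print("three")
    end)
    print("two") -- the calling thread resumes once the spawned thread yields

    local delayed = task.delay(5, function()
        print("never printed")
    end)
    task.cancel(delayed) -- removes the delayed task from the scheduler before it runs
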
diff --git a/packages/lib/src/lib.rs b/packages/lib/src/lib.rs
index 36080e7..20afaaf 100644
--- a/packages/lib/src/lib.rs
+++ b/packages/lib/src/lib.rs
@@ -1,6 +1,6 @@ use std::{collections::HashSet, process::ExitCode};
 
-use lua::task::TaskScheduler;
+use lua::task::{TaskScheduler, TaskSchedulerResult};
 use mlua::prelude::*;
 use tokio::task::LocalSet;
 
@@ -88,10 +88,7 @@ impl Lune {
         script_name: &str,
         script_contents: &str,
     ) -> Result<ExitCode, LuaError> {
-        let set = LocalSet::new();
         let lua = Lua::new().into_static();
-        let sched = TaskScheduler::new(lua)?.into_static();
-        lua.set_app_data(sched);
         // Store original lua global functions in the registry so we can use
         // them later without passing them around and dealing with lifetimes
         lua.set_named_registry_value("require", lua.globals().get::<_, LuaFunction>("require")?)?;
@@ -105,11 +102,13 @@ impl Lune {
         // Add in wanted lune globals
         for global in self.includes.clone() {
             if !self.excludes.contains(&global) {
-                global.inject(lua, sched)?;
+                global.inject(lua)?;
             }
         }
-        // Schedule the main thread on the task scheduler
-        sched.schedule_instant(
+        // Create our task scheduler and schedule the main thread on it
+        let sched = TaskScheduler::new(lua)?.into_static();
+        lua.set_app_data(sched);
+        sched.schedule_current_resume(
             LuaValue::Function(
                 lua.load(script_contents)
                     .set_name(script_name)
@@ -120,30 +119,30 @@ impl Lune {
             LuaValue::Nil.to_lua_multi(lua)?,
         )?;
         // Keep running the scheduler until there are either no tasks
-        // left to run, or until some task requests to exit the process
-        let exit_code = set
-            .run_until(async {
-                let mut got_error = false;
-                while let Some(result) = sched.resume_queue().await {
-                    match result {
-                        Err(e) => {
-                            eprintln!("{}", pretty_format_luau_error(&e));
+        // left to run, or until a task requests to exit the process
+        let exit_code = LocalSet::new()
+            .run_until(async move {
+                let mut got_error = false;
+                loop {
+                    let state = match sched.resume_queue().await {
+                        TaskSchedulerResult::TaskSuccessful { state } => state,
+                        TaskSchedulerResult::TaskErrored { state, error } => {
+                            eprintln!("{}", pretty_format_luau_error(&error));
                             got_error = true;
+                            state
                         }
-                        Ok(status) => {
-                            if let Some(exit_code) = status.exit_code {
-                                return exit_code;
-                            } else if status.num_total == 0 {
-                                return ExitCode::SUCCESS;
-                            }
+                        TaskSchedulerResult::Finished { state } => state,
+                    };
+                    if let Some(exit_code) = state.exit_code {
+                        return exit_code;
+                    } else if state.num_total == 0 {
+                        if got_error {
+                            return ExitCode::FAILURE;
+                        } else {
+                            return ExitCode::SUCCESS;
                        }
                    }
                }
-            if got_error {
-                ExitCode::FAILURE
-            } else {
-                ExitCode::SUCCESS
-            }
             })
             .await;
         Ok(exit_code)
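
The loop above stops resuming the queue as soon as some task stores an exit code, and otherwise exits with success once the task list is empty (or failure if any task errored along the way). As an illustrative Luau snippet, not part of the patch, a script like the following exits immediately with code 1 and its delayed task never runs:

    task.delay(1, function()
        print("never reached") -- scheduling stops once an exit code has been set
    end)
    process.exit(1) -- stored via TaskScheduler::set_exit_code and picked up by the run loop
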
diff --git a/packages/lib/src/lua/task/scheduler.rs b/packages/lib/src/lua/task/scheduler.rs
index 5230111..a1e89e0 100644
--- a/packages/lib/src/lua/task/scheduler.rs
+++ b/packages/lib/src/lua/task/scheduler.rs
@@ -1,3 +1,4 @@
+use core::panic;
 use std::{
     collections::{HashMap, VecDeque},
     fmt,
@@ -9,18 +10,26 @@ use std::{
     time::Duration,
 };
 
+use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
 use mlua::prelude::*;
-use tokio::time::{sleep, Instant};
+use tokio::{
+    sync::Mutex as AsyncMutex,
+    time::{sleep, Instant},
+};
 
 type TaskSchedulerQueue = Arc<Mutex<VecDeque<TaskReference>>>;
 
+type TaskFutureArgsOverride<'fut> = Option<Vec<LuaValue<'fut>>>;
+type TaskFutureResult<'fut> = (TaskReference, LuaResult<TaskFutureArgsOverride<'fut>>);
+type TaskFuture<'fut> = BoxFuture<'fut, TaskFutureResult<'fut>>;
+
 /// An enum representing different kinds of tasks
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub enum TaskKind {
     Instant,
     Deferred,
-    Yielded,
+    Future,
 }
 
 impl fmt::Display for TaskKind {
@@ -28,7 +37,7 @@ impl fmt::Display for TaskKind {
         let name: &'static str = match self {
             TaskKind::Instant => "Instant",
             TaskKind::Deferred => "Deferred",
-            TaskKind::Yielded => "Yielded",
+            TaskKind::Future => "Future",
         };
         write!(f, "{name}")
     }
@@ -40,16 +49,11 @@ pub struct TaskReference {
     kind: TaskKind,
     guid: usize,
-    queued_target: Option<Instant>,
 }
 
 impl TaskReference {
-    pub const fn new(kind: TaskKind, guid: usize, queued_target: Option<Instant>) -> Self {
-        Self {
-            kind,
-            guid,
-            queued_target,
-        }
+    pub const fn new(kind: TaskKind, guid: usize) -> Self {
+        Self { kind, guid }
     }
 }
@@ -61,114 +65,140 @@ impl fmt::Display for TaskReference {
 
 impl LuaUserData for TaskReference {}
 
-impl From<&Task> for TaskReference {
-    fn from(value: &Task) -> Self {
-        Self::new(value.kind, value.guid, value.queued_target)
-    }
-}
-
 /// A struct representing a task contained in the task scheduler
 #[derive(Debug)]
 pub struct Task {
-    kind: TaskKind,
-    guid: usize,
     thread: LuaRegistryKey,
     args: LuaRegistryKey,
     queued_at: Instant,
-    queued_target: Option<Instant>,
 }
 
 /// A struct representing the current status of the task scheduler
 #[derive(Debug, Clone, Copy)]
-pub struct TaskSchedulerStatus {
+pub struct TaskSchedulerState {
     pub exit_code: Option<ExitCode>,
     pub num_instant: usize,
     pub num_deferred: usize,
-    pub num_yielded: usize,
+    pub num_future: usize,
     pub num_total: usize,
 }
 
-impl fmt::Display for TaskSchedulerStatus {
+impl fmt::Display for TaskSchedulerState {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(
             f,
             "TaskSchedulerStatus(\nInstant: {}\nDeferred: {}\nYielded: {}\nTotal: {})",
-            self.num_instant, self.num_deferred, self.num_yielded, self.num_total
+            self.num_instant, self.num_deferred, self.num_future, self.num_total
         )
     }
 }
 
+#[derive(Debug, Clone)]
+pub enum TaskSchedulerResult {
+    Finished {
+        state: TaskSchedulerState,
+    },
+    TaskErrored {
+        error: LuaError,
+        state: TaskSchedulerState,
+    },
+    TaskSuccessful {
+        state: TaskSchedulerState,
+    },
+}
+
 /// A task scheduler that implements task queues
 /// with instant, deferred, and delayed tasks
 #[derive(Debug)]
-pub struct TaskScheduler {
+pub struct TaskScheduler<'fut> {
     lua: &'static Lua,
     guid: AtomicUsize,
-    running: bool,
     tasks: Arc<Mutex<HashMap<TaskReference, Task>>>,
+    futures: Arc<AsyncMutex<FuturesUnordered<TaskFuture<'fut>>>>,
     task_queue_instant: TaskSchedulerQueue,
     task_queue_deferred: TaskSchedulerQueue,
-    task_queue_yielded: TaskSchedulerQueue,
     exit_code_set: AtomicBool,
     exit_code: Arc<Mutex<ExitCode>>,
 }
 
-impl TaskScheduler {
+impl<'fut> TaskScheduler<'fut> {
+    /**
+        Creates a new task scheduler.
+    */
     pub fn new(lua: &'static Lua) -> LuaResult<Self> {
         Ok(Self {
             lua,
             guid: AtomicUsize::new(0),
-            running: false,
             tasks: Arc::new(Mutex::new(HashMap::new())),
+            futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
             task_queue_instant: Arc::new(Mutex::new(VecDeque::new())),
             task_queue_deferred: Arc::new(Mutex::new(VecDeque::new())),
-            task_queue_yielded: Arc::new(Mutex::new(VecDeque::new())),
             exit_code_set: AtomicBool::new(false),
             exit_code: Arc::new(Mutex::new(ExitCode::SUCCESS)),
         })
     }
 
+    /**
+        Consumes and leaks the task scheduler,
+        returning a static reference `&'static TaskScheduler`.
+
+        This function is useful when the task scheduler object is
+        supposed to live for the remainder of the program's life.
+
+        Note that dropping the returned reference will cause a memory leak.
+    */
     pub fn into_static(self) -> &'static Self {
         Box::leak(Box::new(self))
     }
 
-    pub fn status(&self) -> TaskSchedulerStatus {
-        let counts = {
-            (
-                self.task_queue_instant.lock().unwrap().len(),
-                self.task_queue_deferred.lock().unwrap().len(),
-                self.task_queue_yielded.lock().unwrap().len(),
-            )
-        };
-        let num_total = counts.0 + counts.1 + counts.2;
-        let exit_code = if self.exit_code_set.load(Ordering::Relaxed) {
-            Some(*self.exit_code.lock().unwrap())
-        } else {
-            None
-        };
-        TaskSchedulerStatus {
-            exit_code,
-            num_instant: counts.0,
-            num_deferred: counts.1,
-            num_yielded: counts.2,
-            num_total,
+    /**
+        Gets the current state of the task scheduler.
+
+        Panics if called during any of the task scheduler resumption phases.
+    */
+    pub fn state(&self) -> TaskSchedulerState {
+        const MESSAGE: &str =
+            "Failed to get lock - make sure not to call during task scheduler resumption";
+        TaskSchedulerState {
+            exit_code: if self.exit_code_set.load(Ordering::Relaxed) {
+                Some(*self.exit_code.try_lock().expect(MESSAGE))
+            } else {
+                None
+            },
+            num_instant: self.task_queue_instant.try_lock().expect(MESSAGE).len(),
+            num_deferred: self.task_queue_deferred.try_lock().expect(MESSAGE).len(),
+            num_future: self.futures.try_lock().expect(MESSAGE).len(),
+            num_total: self.tasks.try_lock().expect(MESSAGE).len(),
         }
     }
 
+    /**
+        Stores the exit code for the task scheduler.
+
+        This will be passed back to the Rust thread that is running the task scheduler,
+        in the [`TaskSchedulerState`] returned on resumption of the task scheduler queue.
+
+        Setting this exit code will signal to that thread that it
+        should stop resuming tasks, and gracefully terminate the program.
+    */
     pub fn set_exit_code(&self, code: ExitCode) {
+        *self.exit_code.lock().unwrap() = code;
         self.exit_code_set.store(true, Ordering::Relaxed);
-        *self.exit_code.lock().unwrap() = code
     }
 
-    fn schedule<'a>(
+    /**
+        Creates a new task, storing a new Lua thread
+        for it, as well as the arguments to give the
+        thread on resumption, in the Lua registry.
+    */
+    fn create_task<'a>(
         &self,
         kind: TaskKind,
-        tof: LuaValue<'a>,
-        args: Option<LuaMultiValue<'a>>,
-        delay: Option<f64>,
+        thread_or_function: LuaValue<'a>,
+        thread_args: Option<LuaMultiValue<'a>>,
     ) -> LuaResult<TaskReference> {
         // Get or create a thread from the given argument
-        let task_thread = match tof {
+        let task_thread = match thread_or_function {
             LuaValue::Thread(t) => t,
             LuaValue::Function(f) => self.lua.create_thread(f)?,
             value => {
@@ -179,138 +209,232 @@ impl TaskScheduler {
             }
         };
         // Store the thread and its arguments in the registry
-        let task_args_vec = args.map(|opt| opt.into_vec());
-        let task_thread_key = self.lua.create_registry_value(task_thread)?;
-        let task_args_key = self.lua.create_registry_value(task_args_vec)?;
+        let task_args_vec: Option<Vec<LuaValue>> = thread_args.map(|opt| opt.into_vec());
+        let task_args_key: LuaRegistryKey = self.lua.create_registry_value(task_args_vec)?;
+        let task_thread_key: LuaRegistryKey = self.lua.create_registry_value(task_thread)?;
         // Create the full task struct
         let guid = self.guid.fetch_add(1, Ordering::Relaxed) + 1;
         let queued_at = Instant::now();
-        let queued_target = delay.map(|secs| queued_at + Duration::from_secs_f64(secs));
         let task = Task {
-            kind,
-            guid,
             thread: task_thread_key,
             args: task_args_key,
             queued_at,
-            queued_target,
         };
-        // Create the task ref (before adding the task to the scheduler)
-        let task_ref = TaskReference::from(&task);
         // Add it to the scheduler
         {
             let mut tasks = self.tasks.lock().unwrap();
-            tasks.insert(task_ref, task);
+            tasks.insert(TaskReference::new(kind, guid), task);
         }
+        Ok(TaskReference::new(kind, guid))
+    }
+
+    /**
+        Schedules a new task to run on the task scheduler.
+
+        When we want to schedule a task to resume instantly after the
+        currently running task we should pass `after_current_resume = true`.
+
+        This is useful in cases such as our task.spawn implementation:
+
+        ```lua
+        task.spawn(function()
+            -- This will be a new task, but it should
+            -- also run right away, until the first yield
+        end)
+        -- Here we have either yielded or finished the above task
+        ```
+    */
+    fn schedule<'a>(
+        &self,
+        kind: TaskKind,
+        thread_or_function: LuaValue<'a>,
+        thread_args: Option<LuaMultiValue<'a>>,
+        after_current_resume: bool,
+    ) -> LuaResult<TaskReference> {
+        if kind == TaskKind::Future {
+            panic!("Tried to schedule future using normal task schedule method")
+        }
+        let task_ref = self.create_task(kind, thread_or_function, thread_args)?;
         match kind {
             TaskKind::Instant => {
-                // If we have a currently running task and we spawned an
-                // instant task here it should run right after the currently
-                // running task, so put it at the front of the task queue
                 let mut queue = self.task_queue_instant.lock().unwrap();
-                if self.running {
-                    queue.push_front(task_ref);
+                if after_current_resume {
+                    assert!(
+                        queue.len() > 0,
+                        "Cannot schedule a task after the first instant when task queue is empty"
+                    );
+                    queue.insert(1, task_ref);
                 } else {
-                    queue.push_back(task_ref);
+                    queue.push_front(task_ref);
                 }
             }
             TaskKind::Deferred => {
-                // Deferred tasks should always schedule
-                // at the very end of the deferred queue
+                // Deferred tasks should always schedule at the end of the deferred queue
                 let mut queue = self.task_queue_deferred.lock().unwrap();
                 queue.push_back(task_ref);
             }
-            TaskKind::Yielded => {
-                // Find the first task that is scheduled after this one and insert before it,
-                // this will ensure that our list of delayed tasks is sorted and we can grab
-                // the very first one to figure out how long to yield until the next cycle
-                let mut queue = self.task_queue_yielded.lock().unwrap();
-                let idx = queue
-                    .iter()
-                    .enumerate()
-                    .find_map(|(idx, t)| {
-                        if t.queued_target > queued_target {
-                            Some(idx)
-                        } else {
-                            None
-                        }
-                    })
-                    .unwrap_or(queue.len());
-                queue.insert(idx, task_ref);
-            }
+            TaskKind::Future => unreachable!(),
         }
         Ok(task_ref)
     }
 
-    pub fn schedule_instant<'a>(
+    pub fn schedule_current_resume<'a>(
         &self,
-        tof: LuaValue<'a>,
-        args: LuaMultiValue<'a>,
+        thread_or_function: LuaValue<'a>,
+        thread_args: LuaMultiValue<'a>,
     ) -> LuaResult<TaskReference> {
-        self.schedule(TaskKind::Instant, tof, Some(args), None)
+        self.schedule(
+            TaskKind::Instant,
+            thread_or_function,
+            Some(thread_args),
+            false,
+        )
+    }
+
+    pub fn schedule_after_current_resume<'a>(
+        &self,
+        thread_or_function: LuaValue<'a>,
+        thread_args: LuaMultiValue<'a>,
+    ) -> LuaResult<TaskReference> {
+        self.schedule(
+            TaskKind::Instant,
+            thread_or_function,
+            Some(thread_args),
+            true,
+        )
     }
 
     pub fn schedule_deferred<'a>(
         &self,
-        tof: LuaValue<'a>,
-        args: LuaMultiValue<'a>,
+        thread_or_function: LuaValue<'a>,
+        thread_args: LuaMultiValue<'a>,
     ) -> LuaResult<TaskReference> {
-        self.schedule(TaskKind::Deferred, tof, Some(args), None)
+        self.schedule(
+            TaskKind::Deferred,
+            thread_or_function,
+            Some(thread_args),
+            false,
+        )
     }
 
     pub fn schedule_delayed<'a>(
         &self,
-        secs: f64,
-        tof: LuaValue<'a>,
-        args: LuaMultiValue<'a>,
+        after_secs: f64,
+        thread_or_function: LuaValue<'a>,
+        thread_args: LuaMultiValue<'a>,
     ) -> LuaResult<TaskReference> {
-        self.schedule(TaskKind::Yielded, tof, Some(args), Some(secs))
+        let task_ref = self.create_task(TaskKind::Future, thread_or_function, Some(thread_args))?;
+        let futs = self
+            .futures
+            .try_lock()
+            .expect("Failed to get lock on futures");
+        futs.push(Box::pin(async move {
+            sleep(Duration::from_secs_f64(after_secs)).await;
+            (task_ref, Ok(None))
+        }));
+        Ok(task_ref)
     }
 
-    pub fn resume_after(&self, secs: f64, thread: LuaThread<'_>) -> LuaResult<TaskReference> {
-        self.schedule(
-            TaskKind::Yielded,
-            LuaValue::Thread(thread),
-            None,
-            Some(secs),
-        )
+    pub fn schedule_wait(
+        &self,
+        after_secs: f64,
+        thread_or_function: LuaValue<'_>,
+    ) -> LuaResult<TaskReference> {
+        // TODO: Wait should inherit the guid of the current task,
+        // this will ensure that TaskReferences are identical and
+        // that any waits inside of spawned tasks will also cancel
+        let task_ref = self.create_task(TaskKind::Future, thread_or_function, None)?;
+        let futs = self
+            .futures
+            .try_lock()
+            .expect("Failed to get lock on futures");
+        futs.push(Box::pin(async move {
+            sleep(Duration::from_secs_f64(after_secs)).await;
+            (task_ref, Ok(None))
+        }));
+        Ok(task_ref)
     }
 
-    pub fn cancel(&self, reference: TaskReference) -> bool {
-        let queue_mutex = match reference.kind {
-            TaskKind::Instant => &self.task_queue_instant,
-            TaskKind::Deferred => &self.task_queue_deferred,
-            TaskKind::Yielded => &self.task_queue_yielded,
-        };
-        let mut queue = queue_mutex.lock().unwrap();
-        let mut found = false;
-        queue.retain(|task| {
-            if task.guid == reference.guid {
-                found = true;
-                false
-            } else {
-                true
-            }
-        });
-        found
+    /**
+        Checks if a task still exists in the scheduler.
+
+        A task may no longer exist in the scheduler if it has been manually
+        cancelled and removed by calling [`TaskScheduler::cancel_task()`].
+    */
+    #[allow(dead_code)]
+    pub fn contains_task(&self, reference: TaskReference) -> bool {
+        let tasks = self.tasks.lock().unwrap();
+        tasks.contains_key(&reference)
     }
 
-    pub fn resume_task(&self, reference: TaskReference) -> LuaResult<()> {
+    /**
+        Cancels a task, if the task still exists in the scheduler.
+
+        It is possible to hold one or more task references that point
+        to a task that no longer exists in the scheduler, and calling
+        this method with one of those references will return `false`.
+    */
+    pub fn cancel_task(&self, reference: TaskReference) -> LuaResult<bool> {
+        /*
+            Remove the task from the task list and the Lua registry
+
+            This is all we need to do since resume_task will always
+            ignore resumption of any task that no longer exists there
+
+            This does lead to having some amount of "junk" tasks and futures
+            built up in the queues, but these will get cleaned up and will not
+            block the program from exiting, since the scheduler only runs until
+            there are no tasks left in the task list; the queues do not matter there
+        */
+        let mut tasks = self.tasks.lock().unwrap();
+        if let Some(task) = tasks.remove(&reference) {
+            self.lua.remove_registry_value(task.thread)?;
+            self.lua.remove_registry_value(task.args)?;
+            Ok(true)
+        } else {
+            Ok(false)
+        }
+    }
+
+    /**
+        Resumes a task, if the task still exists in the scheduler.
+
+        A task may no longer exist in the scheduler if it has been manually
+        cancelled and removed by calling [`TaskScheduler::cancel_task()`].
+
+        This will be a no-op if the task no longer exists.
+    */
+    pub fn resume_task(
+        &self,
+        reference: TaskReference,
+        override_args: Option<Vec<LuaValue>>,
+    ) -> LuaResult<()> {
         let task = {
             let mut tasks = self.tasks.lock().unwrap();
             match tasks.remove(&reference) {
                 Some(task) => task,
-                None => {
-                    return Err(LuaError::RuntimeError(format!(
-                        "Task does not exist in scheduler: {reference}"
-                    )))
-                }
+                None => return Ok(()), // Task was removed
             }
         };
         let thread: LuaThread = self.lua.registry_value(&task.thread)?;
-        let args: Option<Vec<LuaValue>> = self.lua.registry_value(&task.args)?;
+        let args = override_args.or_else(|| {
+            self.lua
+                .registry_value::<Option<Vec<LuaValue>>>(&task.args)
+                .expect("Failed to get stored args for task")
+        });
         if let Some(args) = args {
             thread.resume::<_, LuaMultiValue>(LuaMultiValue::from_vec(args))?;
         } else {
+            /*
+                The task did not get any arguments from either:
+
+                - Providing arguments at the call site for creating the task
+                - Returning arguments from a future that created this task
+
+                The only tasks that do not get any arguments from either
+                of those sources are waiting tasks, and waiting tasks
+                want the amount of time waited returned to them.
+            */
             let elapsed = task.queued_at.elapsed().as_secs_f64();
             thread.resume::<_, LuaMultiValue>(elapsed)?;
         }
@@ -319,88 +443,146 @@ impl TaskScheduler {
         Ok(())
     }
 
+    /**
+        Retrieves the queue for a specific kind of task.
+
+        Panics for [`TaskKind::Future`] since
+        futures do not use the normal task queue.
+    */
     fn get_queue(&self, kind: TaskKind) -> &TaskSchedulerQueue {
         match kind {
             TaskKind::Instant => &self.task_queue_instant,
             TaskKind::Deferred => &self.task_queue_deferred,
-            TaskKind::Yielded => &self.task_queue_yielded,
+            TaskKind::Future => {
+                panic!("Future tasks do not use the normal task queue")
+            }
         }
     }
 
-    fn next_queue_task(&self, kind: TaskKind) -> Option<TaskReference> {
-        let task = {
-            let queue_guard = self.get_queue(kind).lock().unwrap();
-            queue_guard.front().copied()
-        };
-        task
+    /**
+        Checks if a future exists in the task queue.
+
+        Panics if called during resumption of the futures task queue.
+    */
+    fn next_queue_future_exists(&self) -> bool {
+        let futs = self.futures.try_lock().expect(
+            "Failed to get lock on futures - make sure not to call during futures resumption",
+        );
+        !futs.is_empty()
     }
 
-    fn resume_next_queue_task(&self, kind: TaskKind) -> Option<LuaResult<TaskSchedulerStatus>> {
+    /**
+        Resumes the next queued Lua task, if one exists, blocking
+        the current thread until it either yields or finishes.
+    */
+    fn resume_next_queue_task(
+        &self,
+        kind: TaskKind,
+        override_args: Option<Vec<LuaValue>>,
+    ) -> TaskSchedulerResult {
         match {
             let mut queue_guard = self.get_queue(kind).lock().unwrap();
             queue_guard.pop_front()
         } {
             None => {
-                let status = self.status();
+                let status = self.state();
                 if status.num_total > 0 {
-                    Some(Ok(status))
+                    TaskSchedulerResult::TaskSuccessful {
+                        state: self.state(),
+                    }
                 } else {
-                    None
+                    TaskSchedulerResult::Finished {
+                        state: self.state(),
+                    }
                 }
             }
-            Some(t) => match self.resume_task(t) {
-                Ok(_) => Some(Ok(self.status())),
-                Err(e) => Some(Err(e)),
+            Some(task) => match self.resume_task(task, override_args) {
+                Ok(()) => TaskSchedulerResult::TaskSuccessful {
+                    state: self.state(),
+                },
+                Err(task_err) => TaskSchedulerResult::TaskErrored {
+                    error: task_err,
+                    state: self.state(),
+                },
             },
         }
     }
 
-    pub async fn resume_queue(&self) -> Option<LuaResult<TaskSchedulerStatus>> {
-        let now = Instant::now();
-        let status = self.status();
+    /**
+        Awaits the first available queued future, and resumes its
+        associated Lua task which will then be ready for resumption.
+
+        Panics if there are no futures currently queued.
+
+        Use [`TaskScheduler::next_queue_future_exists`]
+        to check if there are any queued futures.
+    */
+    async fn resume_next_queue_future(&self) -> TaskSchedulerResult {
+        let result = {
+            let mut futs = self
+                .futures
+                .try_lock()
+                .expect("Failed to get lock on futures");
+            futs.next()
+                .await
+                .expect("Tried to resume next queued future but none are queued")
+        };
+        match result {
+            (task, Err(fut_err)) => {
+                // Future errored, don't resume its associated task
+                // and make sure to cancel / remove it completely
+                let error_prefer_cancel = match self.cancel_task(task) {
+                    Err(cancel_err) => cancel_err,
+                    Ok(_) => fut_err,
+                };
+                TaskSchedulerResult::TaskErrored {
+                    error: error_prefer_cancel,
+                    state: self.state(),
+                }
+            }
+            (task, Ok(args)) => {
+                // Promote this future task to an instant task
+                // and resume the instant queue right away, taking
+                // care to not deadlock by dropping the mutex guard
+                let mut queue_guard = self.get_queue(TaskKind::Instant).lock().unwrap();
+                queue_guard.push_front(task);
+                drop(queue_guard);
+                self.resume_next_queue_task(TaskKind::Instant, args)
+            }
+        }
+    }
+
+    /**
+        Resumes the task scheduler queue.
+
+        This will run any spawned or deferred Lua tasks in a blocking manner.
+
+        Once all spawned and / or deferred Lua tasks have finished running,
+        this will process delayed tasks, waiting tasks, and native Rust
+        futures concurrently, awaiting the first one to be ready for resumption.
+    */
+    pub async fn resume_queue(&self) -> TaskSchedulerResult {
+        let status = self.state();
         /*
             Resume tasks in the internal queue, in this order:
 
             1. Tasks from task.spawn, this includes the main thread
             2. Tasks from task.defer
-            3. Tasks from task.delay OR futures, whichever comes first
-            4. Tasks from futures
+            3. Tasks from task.delay / task.wait / native futures, first ready first resumed
         */
         if status.num_instant > 0 {
-            self.resume_next_queue_task(TaskKind::Instant)
+            self.resume_next_queue_task(TaskKind::Instant, None)
         } else if status.num_deferred > 0 {
-            self.resume_next_queue_task(TaskKind::Deferred)
-        } else if status.num_yielded > 0 {
-            // 3. Threads from task.delay or task.wait, futures
-            let next_yield_target = self
-                .next_queue_task(TaskKind::Yielded)
-                .expect("Yielded task missing but status count is > 0")
-                .queued_target
-                .expect("Yielded task is missing queued target");
-            // Resume this yielding task if its target time has passed
-            if now >= next_yield_target {
-                self.resume_next_queue_task(TaskKind::Yielded)
-            } else {
-                /*
-                    Await the first future to be ready
-
-                    - If it is the sleep fut then we will return and the next
-                    call to resume_queue will then resume that yielded task
-
-                    - If it is a future then we resume the corresponding task
-                    that is has stored in the future-specific task queue
-                */
-                sleep(next_yield_target - now).await;
-                // TODO: Implement this, for now we only await sleep
-                // since the task scheduler doesn't support futures
-                Some(Ok(self.status()))
-            }
+            self.resume_next_queue_task(TaskKind::Deferred, None)
         } else {
-            // 4. Just futures
-
-            // TODO: Await the first future to be ready
-            // and resume the corresponding task for it
-            None
+            // 3. Threads from task.delay or task.wait, futures
+            if self.next_queue_future_exists() {
+                self.resume_next_queue_future().await
+            } else {
+                TaskSchedulerResult::Finished {
+                    state: self.state(),
+                }
+            }
         }
     }
 }
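
The resumption order documented in resume_queue above can be seen end to end with a small Luau snippet (illustrative only, not part of the patch): instant tasks run first, then deferred tasks, and only then are delayed or waiting futures awaited.

    task.delay(0, function()
        print("third") -- a future, resumed only after the instant and deferred queues drain
    end)
    task.defer(function()
        print("second") -- deferred, runs after every instant task
    end)
    task.spawn(function()
        print("first") -- instant, runs immediately up to its first yield
    end)
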
diff --git a/tests/task/cancel.luau b/tests/task/cancel.luau
index dfbd0ce..f466434 100644
--- a/tests/task/cancel.luau
+++ b/tests/task/cancel.luau
@@ -21,10 +21,13 @@ assert(not flag2, "Cancel should handle delayed threads")
 
 local flag3: number = 1
 local thread3 = task.spawn(function()
+	print("1")
 	task.wait(0.1)
 	flag3 = 2
+	print("2")
 	task.wait(0.2)
 	flag3 = 3
+	print("3")
 end)
 task.wait(0.2)
 task.cancel(thread3)
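
One behavioral detail worth calling out from resume_task above: a waiting task has no stored arguments, so the scheduler resumes it with the elapsed time instead. In Luau terms (illustrative, not part of the patch):

    local elapsed = task.wait(0.1)
    print(elapsed) -- roughly 0.1; the time actually waited, as measured by the scheduler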