diff --git a/packages/lib/src/globals/net.rs b/packages/lib/src/globals/net.rs index 64cf15f..c27ead0 100644 --- a/packages/lib/src/globals/net.rs +++ b/packages/lib/src/globals/net.rs @@ -10,7 +10,7 @@ use crate::{ lua::{ // net::{NetWebSocketClient, NetWebSocketServer}, net::{NetClient, NetClientBuilder, NetLocalExec, NetService, RequestConfig, ServeConfig}, - task::TaskScheduler, + task::{TaskScheduler, TaskSchedulerAsyncExt}, }, utils::{net::get_request_user_agent_header, table::TableBuilder}, }; @@ -108,7 +108,9 @@ async fn net_serve<'a>( lua.create_registry_value(handler) .expect("Failed to store websocket handler") }); - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"); // Bind first to make sure that we can bind to this address let bound = match Server::try_bind(&([127, 0, 0, 1], port).into()) { Err(e) => { diff --git a/packages/lib/src/globals/process.rs b/packages/lib/src/globals/process.rs index cccf165..433fe65 100644 --- a/packages/lib/src/globals/process.rs +++ b/packages/lib/src/globals/process.rs @@ -50,7 +50,9 @@ pub fn create(lua: &'static Lua, args_vec: Vec) -> LuaResult { let process_exit_env_yield: LuaFunction = lua.named_registry_value("co.yield")?; let process_exit_env_exit: LuaFunction = lua.create_function(|lua, code: Option| { let exit_code = code.map_or(ExitCode::SUCCESS, ExitCode::from); - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"); sched.set_exit_code(exit_code); Ok(()) })?; diff --git a/packages/lib/src/globals/task.rs b/packages/lib/src/globals/task.rs index 70d54a4..33120f4 100644 --- a/packages/lib/src/globals/task.rs +++ b/packages/lib/src/globals/task.rs @@ -1,10 +1,12 @@ use mlua::prelude::*; use crate::{ - lua::task::{TaskKind, TaskReference, TaskScheduler}, + lua::task::{TaskKind, TaskReference, TaskScheduler, TaskSchedulerScheduleExt}, utils::table::TableBuilder, }; +const ERR_MISSING_SCHEDULER: &str = "Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"; + const TASK_WAIT_IMPL_LUA: &str = r#" local seconds = ... local current = thread() @@ -15,7 +17,7 @@ return yield() const TASK_SPAWN_IMPL_LUA: &str = r#" -- Schedule the current thread at the front scheduleNext(thread()) --- Schedule the thread to spawn at the front, +-- Schedule the wanted task arg at the front, -- the previous schedule now comes right after local task = scheduleNext(...) -- Give control over to the scheduler, which will @@ -25,20 +27,26 @@ return task "#; pub fn create(lua: &'static Lua) -> LuaResult> { - // Create a user-accessible function cancel tasks + // Create a user-accessible function that cancels a task let task_cancel = lua.create_function(|lua, task: TaskReference| { - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER); sched.remove_task(task)?; Ok(()) })?; // Create functions that manipulate non-blocking tasks in the scheduler let task_defer = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| { - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); - sched.schedule_deferred(tof, args) + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER); + sched.schedule_blocking_deferred(tof, args) })?; let task_delay = lua.create_function(|lua, (secs, tof, args): (f64, LuaValue, LuaMultiValue)| { - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER); sched.schedule_delayed(secs, tof, args) })?; // Create our task wait function, this is a bit different since @@ -55,7 +63,9 @@ pub fn create(lua: &'static Lua) -> LuaResult> { .with_function( "resumeAfter", |lua, (secs, thread): (Option, LuaThread)| { - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER); sched.schedule_wait(secs.unwrap_or(0f64), LuaValue::Thread(thread)) }, )? @@ -76,8 +86,10 @@ pub fn create(lua: &'static Lua) -> LuaResult> { .with_function( "scheduleNext", |lua, (tof, args): (LuaValue, LuaMultiValue)| { - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); - sched.schedule_next(tof, args) + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER); + sched.schedule_blocking(tof, args) }, )? .build_readonly()?, @@ -115,29 +127,37 @@ pub fn create(lua: &'static Lua) -> LuaResult> { coroutine.set( "resume", lua.create_function(|lua, value: LuaValue| { + let tname = value.type_name(); if let LuaValue::Thread(thread) = value { - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER); let task = sched.create_task(TaskKind::Instant, LuaValue::Thread(thread), None, None)?; sched.resume_task(task, None) } else if let Ok(task) = TaskReference::from_lua(value, lua) { - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); - sched.resume_task(task, None) + lua.app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER) + .resume_task(task, None) } else { - Err(LuaError::RuntimeError( - "Argument #1 must be a thread".to_string(), - )) + Err(LuaError::RuntimeError(format!( + "Argument #1 must be a thread, got {tname}", + ))) } })?, )?; coroutine.set( "wrap", lua.create_function(|lua, func: LuaFunction| { - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER); let task = sched.create_task(TaskKind::Instant, LuaValue::Function(func), None, None)?; lua.create_function(move |lua, args: LuaMultiValue| { - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect(ERR_MISSING_SCHEDULER); sched.resume_task(task, Some(Ok(args))) }) })?, diff --git a/packages/lib/src/lib.rs b/packages/lib/src/lib.rs index 75d5467..57b161b 100644 --- a/packages/lib/src/lib.rs +++ b/packages/lib/src/lib.rs @@ -1,6 +1,6 @@ use std::{collections::HashSet, process::ExitCode}; -use lua::task::TaskScheduler; +use lua::task::{TaskScheduler, TaskSchedulerResumeExt, TaskSchedulerScheduleExt}; use mlua::prelude::*; use tokio::task::LocalSet; @@ -103,7 +103,7 @@ impl Lune { // Create our task scheduler and schedule the main thread on it let sched = TaskScheduler::new(lua)?.into_static(); lua.set_app_data(sched); - sched.schedule_next( + sched.schedule_blocking( LuaValue::Function( lua.load(script_contents) .set_name(script_name) diff --git a/packages/lib/src/lua/net/server.rs b/packages/lib/src/lua/net/server.rs index 624fcda..17c23cf 100644 --- a/packages/lib/src/lua/net/server.rs +++ b/packages/lib/src/lua/net/server.rs @@ -51,7 +51,9 @@ impl Service> for NetServiceInner { let _ws = ws.await.map_err(LuaError::external)?; // let sock = NetWebSocketServer::from(ws); // let table = sock.into_lua_table(lua)?; - // let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); + // let sched = lua + // .app_data_ref::<&TaskScheduler>() + // .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"); // sched.schedule_current_resume( // LuaValue::Function(handler), // LuaMultiValue::from_vec(vec![LuaValue::Table(table)]), diff --git a/packages/lib/src/lua/task/ext/async_ext.rs b/packages/lib/src/lua/task/ext/async_ext.rs new file mode 100644 index 0000000..f0cc5b7 --- /dev/null +++ b/packages/lib/src/lua/task/ext/async_ext.rs @@ -0,0 +1,147 @@ +use async_trait::async_trait; + +use futures_util::Future; +use mlua::prelude::*; + +use crate::utils::table::TableBuilder; + +use super::super::{ + async_handle::TaskSchedulerAsyncHandle, message::TaskSchedulerMessage, + scheduler::TaskReference, scheduler::TaskScheduler, +}; + +const TASK_ASYNC_IMPL_LUA: &str = r#" +resumeAsync(thread(), ...) +return yield() +"#; + +/* + ────────────────────────────────────────────────────────── + Trait definition - same as the implementation, ignore this + + We use traits here to prevent misuse of certain scheduler + APIs, making importing of them as intentional as possible + ────────────────────────────────────────────────────────── +*/ +#[async_trait(?Send)] +pub trait TaskSchedulerAsyncExt<'fut> { + fn register_background_task(&self) -> TaskSchedulerAsyncHandle; + + fn schedule_async<'sched, R, F, FR>( + &'sched self, + thread_or_function: LuaValue<'_>, + func: F, + ) -> LuaResult + where + 'sched: 'fut, + R: ToLuaMulti<'static>, + F: 'static + Fn(&'static Lua) -> FR, + FR: 'static + Future>; + + fn make_scheduled_async_fn(&self, func: F) -> LuaResult + where + A: FromLuaMulti<'static>, + R: ToLuaMulti<'static>, + F: 'static + Fn(&'static Lua, A) -> FR, + FR: 'static + Future>; +} + +/* + ──────────────────── + Trait implementation + ──────────────────── +*/ +#[async_trait(?Send)] +impl<'fut> TaskSchedulerAsyncExt<'fut> for TaskScheduler<'fut> { + /** + Registers a new background task with the task scheduler. + + The returned [`TaskSchedulerAsyncHandle`] must have its + [`TaskSchedulerAsyncHandle::unregister`] method called + upon completion of the background task to prevent + the task scheduler from running indefinitely. + */ + fn register_background_task(&self) -> TaskSchedulerAsyncHandle { + let sender = self.futures_tx.clone(); + sender + .send(TaskSchedulerMessage::Spawned) + .unwrap_or_else(|e| { + panic!( + "\ + \nFailed to unregister background task - this is an internal error! \ + \nPlease report it at {} \ + \nDetails: {e} \ + ", + env!("CARGO_PKG_REPOSITORY") + ) + }); + TaskSchedulerAsyncHandle::new(sender) + } + + /** + Schedules a lua thread or function + to be resumed after running a future. + + The given lua thread or function will be resumed + using the optional arguments returned by the future. + */ + fn schedule_async<'sched, R, F, FR>( + &'sched self, + thread_or_function: LuaValue<'_>, + func: F, + ) -> LuaResult + where + 'sched: 'fut, // Scheduler must live at least as long as the future + R: ToLuaMulti<'static>, + F: 'static + Fn(&'static Lua) -> FR, + FR: 'static + Future>, + { + self.queue_async_task(thread_or_function, None, None, async move { + match func(self.lua).await { + Ok(res) => match res.to_lua_multi(self.lua) { + Ok(multi) => Ok(Some(multi)), + Err(e) => Err(e), + }, + Err(e) => Err(e), + } + }) + } + + /** + Creates a function callable from Lua that runs an async + closure and returns the results of it to the call site. + */ + fn make_scheduled_async_fn(&self, func: F) -> LuaResult + where + A: FromLuaMulti<'static>, + R: ToLuaMulti<'static>, + F: 'static + Fn(&'static Lua, A) -> FR, + FR: 'static + Future>, + { + let async_env_thread: LuaFunction = self.lua.named_registry_value("co.thread")?; + let async_env_yield: LuaFunction = self.lua.named_registry_value("co.yield")?; + self.lua + .load(TASK_ASYNC_IMPL_LUA) + .set_environment( + TableBuilder::new(self.lua)? + .with_value("thread", async_env_thread)? + .with_value("yield", async_env_yield)? + .with_function( + "resumeAsync", + move |lua: &Lua, (thread, args): (LuaThread, A)| { + let fut = func(lua, args); + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"); + sched.queue_async_task(LuaValue::Thread(thread), None, None, async { + let rets = fut.await?; + let mult = rets.to_lua_multi(lua)?; + Ok(Some(mult)) + }) + }, + )? + .build_readonly()?, + )? + .into_function() + } +} diff --git a/packages/lib/src/lua/task/ext/mod.rs b/packages/lib/src/lua/task/ext/mod.rs new file mode 100644 index 0000000..c6a6958 --- /dev/null +++ b/packages/lib/src/lua/task/ext/mod.rs @@ -0,0 +1,7 @@ +mod async_ext; +mod resume_ext; +mod schedule_ext; + +pub use async_ext::TaskSchedulerAsyncExt; +pub use resume_ext::TaskSchedulerResumeExt; +pub use schedule_ext::TaskSchedulerScheduleExt; diff --git a/packages/lib/src/lua/task/ext/resume_ext.rs b/packages/lib/src/lua/task/ext/resume_ext.rs new file mode 100644 index 0000000..4195ac9 --- /dev/null +++ b/packages/lib/src/lua/task/ext/resume_ext.rs @@ -0,0 +1,163 @@ +use async_trait::async_trait; + +use mlua::prelude::*; + +use futures_util::StreamExt; + +use super::super::{message::TaskSchedulerMessage, result::TaskSchedulerState, TaskScheduler}; + +/* + ────────────────────────────────────────────────────────── + Trait definition - same as the implementation, ignore this + + We use traits here to prevent misuse of certain scheduler + APIs, making importing of them as intentional as possible + ────────────────────────────────────────────────────────── +*/ +#[async_trait(?Send)] +pub trait TaskSchedulerResumeExt { + async fn resume_queue(&self) -> TaskSchedulerState; +} + +/* + ──────────────────── + Trait implementation + ──────────────────── +*/ +#[async_trait(?Send)] +impl TaskSchedulerResumeExt for TaskScheduler<'_> { + /** + Awaits the next background task registration + message, if any messages exist in the queue. + + This is a no-op if there are no background tasks left running + and / or the background task messages channel was closed. + */ + async fn resume_queue(&self) -> TaskSchedulerState { + let current = TaskSchedulerState::new(self); + if current.has_blocking_tasks() { + // 1. Blocking tasks + resume_next_blocking_task(self, None) + } else if current.has_future_tasks() && current.has_background_tasks() { + // 2. Async + background tasks + tokio::select! { + result = resume_next_async_task(self) => result, + result = receive_next_message(self) => result, + } + } else if current.has_future_tasks() { + // 3. Async tasks + resume_next_async_task(self).await + } else if current.has_background_tasks() { + // 4. Background tasks + receive_next_message(self).await + } else { + TaskSchedulerState::new(self) + } + } +} + +/* + ──────────────────────────────────────────────────────────────── + Private functions for the trait that operate on the task scheduler + + These could be implemented as normal methods but if we put them in the + trait they become public, and putting them in the task scheduler's + own implementation block will clutter that up unnecessarily + ──────────────────────────────────────────────────────────────── +*/ + +/** + Resumes the next queued Lua task, if one exists, blocking + the current thread until it either yields or finishes. +*/ +fn resume_next_blocking_task( + scheduler: &TaskScheduler<'_>, + override_args: Option>, +) -> TaskSchedulerState { + match { + let mut queue_guard = scheduler.tasks_queue_blocking.borrow_mut(); + let task = queue_guard.pop_front(); + drop(queue_guard); + task + } { + None => TaskSchedulerState::new(scheduler), + Some(task) => match scheduler.resume_task(task, override_args) { + Ok(_) => TaskSchedulerState::new(scheduler), + Err(task_err) => TaskSchedulerState::err(scheduler, task_err), + }, + } +} + +/** + Awaits the first available queued future, and resumes its associated + Lua task which will be ready for resumption when that future wakes. + + Panics if there are no futures currently queued. + + Use [`TaskScheduler::next_queue_future_exists`] + to check if there are any queued futures. +*/ +async fn resume_next_async_task(scheduler: &TaskScheduler<'_>) -> TaskSchedulerState { + let (task, result) = { + let mut futs = scheduler + .futures + .try_lock() + .expect("Tried to resume next queued future while already resuming or modifying"); + futs.next() + .await + .expect("Tried to resume next queued future but none are queued") + }; + // Promote this future task to a blocking task and resume it + // right away, also taking care to not borrow mutably twice + // by dropping this guard before trying to resume it + let mut queue_guard = scheduler.tasks_queue_blocking.borrow_mut(); + queue_guard.push_front(task); + drop(queue_guard); + resume_next_blocking_task(scheduler, result.transpose()) +} + +/** + Resumes the task scheduler queue. + + This will run any spawned or deferred Lua tasks in a blocking manner. + + Once all spawned and / or deferred Lua tasks have finished running, + this will process delayed tasks, waiting tasks, and native Rust + futures concurrently, awaiting the first one to be ready for resumption. +*/ +async fn receive_next_message(scheduler: &TaskScheduler<'_>) -> TaskSchedulerState { + let message_opt = { + let mut rx = scheduler.futures_rx.lock().await; + rx.recv().await + }; + if let Some(message) = message_opt { + match message { + TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerState::new(scheduler), + TaskSchedulerMessage::Spawned => { + let prev = scheduler.futures_registered_count.get(); + scheduler.futures_registered_count.set(prev + 1); + TaskSchedulerState::new(scheduler) + } + TaskSchedulerMessage::Terminated(result) => { + let prev = scheduler.futures_registered_count.get(); + scheduler.futures_registered_count.set(prev - 1); + if prev == 0 { + panic!( + r#" + Terminated a background task without it running - this is an internal error! + Please report it at {} + "#, + env!("CARGO_PKG_REPOSITORY") + ) + } + if let Err(e) = result { + TaskSchedulerState::err(scheduler, e) + } else { + TaskSchedulerState::new(scheduler) + } + } + } + } else { + TaskSchedulerState::new(scheduler) + } +} diff --git a/packages/lib/src/lua/task/ext/schedule_ext.rs b/packages/lib/src/lua/task/ext/schedule_ext.rs new file mode 100644 index 0000000..e55bdae --- /dev/null +++ b/packages/lib/src/lua/task/ext/schedule_ext.rs @@ -0,0 +1,133 @@ +use std::time::Duration; + +use mlua::prelude::*; +use tokio::time::sleep; + +use super::super::{scheduler::TaskKind, scheduler::TaskReference, scheduler::TaskScheduler}; + +/* + ────────────────────────────────────────────────────────── + Trait definition - same as the implementation, ignore this + + We use traits here to prevent misuse of certain scheduler + APIs, making importing of them as intentional as possible + ────────────────────────────────────────────────────────── +*/ +pub trait TaskSchedulerScheduleExt { + fn schedule_blocking( + &self, + thread_or_function: LuaValue<'_>, + thread_args: LuaMultiValue<'_>, + ) -> LuaResult; + + fn schedule_blocking_deferred( + &self, + thread_or_function: LuaValue<'_>, + thread_args: LuaMultiValue<'_>, + ) -> LuaResult; + + fn schedule_delayed( + &self, + after_secs: f64, + thread_or_function: LuaValue<'_>, + thread_args: LuaMultiValue<'_>, + ) -> LuaResult; + + fn schedule_wait( + &self, + after_secs: f64, + thread_or_function: LuaValue<'_>, + ) -> LuaResult; +} + +/* + ──────────────────── + Trait implementation + ──────────────────── +*/ +impl TaskSchedulerScheduleExt for TaskScheduler<'_> { + /** + Schedules a lua thread or function to resume ***first*** during this + resumption point, ***skipping ahead*** of any other currently queued tasks. + + The given lua thread or function will be resumed + using the given `thread_args` as its argument(s). + */ + fn schedule_blocking( + &self, + thread_or_function: LuaValue<'_>, + thread_args: LuaMultiValue<'_>, + ) -> LuaResult { + self.queue_blocking_task( + TaskKind::Instant, + thread_or_function, + Some(thread_args), + None, + ) + } + + /** + Schedules a lua thread or function to resume ***after all*** + currently resuming tasks, during this resumption point. + + The given lua thread or function will be resumed + using the given `thread_args` as its argument(s). + */ + fn schedule_blocking_deferred( + &self, + thread_or_function: LuaValue<'_>, + thread_args: LuaMultiValue<'_>, + ) -> LuaResult { + self.queue_blocking_task( + TaskKind::Deferred, + thread_or_function, + Some(thread_args), + None, + ) + } + + /** + Schedules a lua thread or function to + be resumed after waiting asynchronously. + + The given lua thread or function will be resumed + using the given `thread_args` as its argument(s). + */ + fn schedule_delayed( + &self, + after_secs: f64, + thread_or_function: LuaValue<'_>, + thread_args: LuaMultiValue<'_>, + ) -> LuaResult { + self.queue_async_task(thread_or_function, Some(thread_args), None, async move { + sleep(Duration::from_secs_f64(after_secs)).await; + Ok(None) + }) + } + + /** + Schedules a lua thread or function to + be resumed after waiting asynchronously. + + The given lua thread or function will be resumed + using the elapsed time as its one and only argument. + */ + fn schedule_wait( + &self, + after_secs: f64, + thread_or_function: LuaValue<'_>, + ) -> LuaResult { + self.queue_async_task( + thread_or_function, + None, + // Wait should recycle the guid of the current task, + // which ensures that the TaskReference is identical and + // that any waits inside of spawned tasks will also cancel + self.guid_running.get(), + async move { + sleep(Duration::from_secs_f64(after_secs)).await; + Ok(None) + }, + ) + } +} diff --git a/packages/lib/src/lua/task/message.rs b/packages/lib/src/lua/task/message.rs index e0af4f9..463aff7 100644 --- a/packages/lib/src/lua/task/message.rs +++ b/packages/lib/src/lua/task/message.rs @@ -1,5 +1,7 @@ use mlua::prelude::*; +/// Internal message enum for the task scheduler, used to notify +/// futures to wake up and schedule their respective blocking tasks #[derive(Debug, Clone)] pub enum TaskSchedulerMessage { NewBlockingTaskReady, diff --git a/packages/lib/src/lua/task/mod.rs b/packages/lib/src/lua/task/mod.rs index 516d5dd..666eb23 100644 --- a/packages/lib/src/lua/task/mod.rs +++ b/packages/lib/src/lua/task/mod.rs @@ -1,10 +1,10 @@ mod async_handle; +mod ext; mod message; mod result; mod scheduler; mod task_kind; mod task_reference; -pub use scheduler::TaskScheduler; -pub use task_kind::TaskKind; -pub use task_reference::TaskReference; +pub use ext::*; +pub use scheduler::*; diff --git a/packages/lib/src/lua/task/result.rs b/packages/lib/src/lua/task/result.rs index 0b67d8f..1068a84 100644 --- a/packages/lib/src/lua/task/result.rs +++ b/packages/lib/src/lua/task/result.rs @@ -2,9 +2,9 @@ use std::{fmt, process::ExitCode}; use mlua::prelude::*; -use super::TaskScheduler; +use super::scheduler::TaskScheduler; -/// A struct representing the current state of the task scheduler +/// Struct representing the current state of the task scheduler #[derive(Debug, Clone)] pub struct TaskSchedulerState { lua_error: Option, diff --git a/packages/lib/src/lua/task/scheduler.rs b/packages/lib/src/lua/task/scheduler.rs index b81214a..a78b199 100644 --- a/packages/lib/src/lua/task/scheduler.rs +++ b/packages/lib/src/lua/task/scheduler.rs @@ -3,32 +3,22 @@ use std::{ cell::{Cell, RefCell}, collections::{HashMap, VecDeque}, process::ExitCode, - time::Duration, }; -use futures_util::{future::LocalBoxFuture, stream::FuturesUnordered, Future, StreamExt}; +use futures_util::{future::LocalBoxFuture, stream::FuturesUnordered, Future}; use mlua::prelude::*; use tokio::{ sync::{mpsc, Mutex as AsyncMutex}, - time::{sleep, Instant}, + time::Instant, }; -use crate::utils::table::TableBuilder; - -use super::{ - async_handle::TaskSchedulerAsyncHandle, message::TaskSchedulerMessage, - result::TaskSchedulerState, task_kind::TaskKind, task_reference::TaskReference, -}; +use super::message::TaskSchedulerMessage; +pub use super::{task_kind::TaskKind, task_reference::TaskReference}; type TaskFutureRets<'fut> = LuaResult>>; type TaskFuture<'fut> = LocalBoxFuture<'fut, (TaskReference, TaskFutureRets<'fut>)>; -const TASK_ASYNC_IMPL_LUA: &str = r#" -resumeAsync(thread(), ...) -return yield() -"#; - /// A struct representing a task contained in the task scheduler #[derive(Debug)] pub struct Task { @@ -52,9 +42,9 @@ pub struct TaskScheduler<'fut> { which must use async-aware mutexes to be cancellation safe across await points. */ // Internal state & flags - lua: &'static Lua, - guid: Cell, - guid_running: Cell>, + pub(super) lua: &'static Lua, + pub(super) guid: Cell, + pub(super) guid_running: Cell>, pub(super) exit_code: Cell>, // Blocking tasks pub(super) tasks: RefCell>, @@ -62,8 +52,8 @@ pub struct TaskScheduler<'fut> { // Future tasks & objects for waking pub(super) futures: AsyncMutex>>, pub(super) futures_registered_count: Cell, - futures_tx: mpsc::UnboundedSender, - futures_rx: AsyncMutex>, + pub(super) futures_tx: mpsc::UnboundedSender, + pub(super) futures_rx: AsyncMutex>, } impl<'fut> TaskScheduler<'fut> { @@ -357,316 +347,4 @@ impl<'fut> TaskScheduler<'fut> { })); Ok(task_ref) } - - /** - Schedules a lua thread or function to resume ***first*** during this - resumption point, ***skipping ahead*** of any other currently queued tasks. - - The given lua thread or function will be resumed - using the given `thread_args` as its argument(s). - */ - pub fn schedule_next( - &self, - thread_or_function: LuaValue<'_>, - thread_args: LuaMultiValue<'_>, - ) -> LuaResult { - self.queue_blocking_task( - TaskKind::Instant, - thread_or_function, - Some(thread_args), - None, - ) - } - - /** - Schedules a lua thread or function to resume ***after all*** - currently resuming tasks, during this resumption point. - - The given lua thread or function will be resumed - using the given `thread_args` as its argument(s). - */ - pub fn schedule_deferred( - &self, - thread_or_function: LuaValue<'_>, - thread_args: LuaMultiValue<'_>, - ) -> LuaResult { - self.queue_blocking_task( - TaskKind::Deferred, - thread_or_function, - Some(thread_args), - None, - ) - } - - /** - Schedules a lua thread or function to - be resumed after waiting asynchronously. - - The given lua thread or function will be resumed - using the given `thread_args` as its argument(s). - */ - pub fn schedule_delayed( - &self, - after_secs: f64, - thread_or_function: LuaValue<'_>, - thread_args: LuaMultiValue<'_>, - ) -> LuaResult { - self.queue_async_task(thread_or_function, Some(thread_args), None, async move { - sleep(Duration::from_secs_f64(after_secs)).await; - Ok(None) - }) - } - - /** - Schedules a lua thread or function to - be resumed after waiting asynchronously. - - The given lua thread or function will be resumed - using the elapsed time as its one and only argument. - */ - pub fn schedule_wait( - &self, - after_secs: f64, - thread_or_function: LuaValue<'_>, - ) -> LuaResult { - self.queue_async_task( - thread_or_function, - None, - // Wait should recycle the guid of the current task, - // which ensures that the TaskReference is identical and - // that any waits inside of spawned tasks will also cancel - self.guid_running.get(), - async move { - sleep(Duration::from_secs_f64(after_secs)).await; - Ok(None) - }, - ) - } - - /** - Schedules a lua thread or function - to be resumed after running a future. - - The given lua thread or function will be resumed - using the optional arguments returned by the future. - */ - #[allow(dead_code)] - pub fn schedule_async<'sched, R, F, FR>( - &'sched self, - thread_or_function: LuaValue<'_>, - func: F, - ) -> LuaResult - where - 'sched: 'fut, // Scheduler must live at least as long as the future - R: ToLuaMulti<'static>, - F: 'static + Fn(&'static Lua) -> FR, - FR: 'static + Future>, - { - self.queue_async_task(thread_or_function, None, None, async move { - match func(self.lua).await { - Ok(res) => match res.to_lua_multi(self.lua) { - Ok(multi) => Ok(Some(multi)), - Err(e) => Err(e), - }, - Err(e) => Err(e), - } - }) - } - - /** - Creates a function callable from Lua that runs an async - closure and returns the results of it to the call site. - */ - pub fn make_scheduled_async_fn(&self, func: F) -> LuaResult - where - A: FromLuaMulti<'static>, - R: ToLuaMulti<'static>, - F: 'static + Fn(&'static Lua, A) -> FR, - FR: 'static + Future>, - { - let async_env_thread: LuaFunction = self.lua.named_registry_value("co.thread")?; - let async_env_yield: LuaFunction = self.lua.named_registry_value("co.yield")?; - self.lua - .load(TASK_ASYNC_IMPL_LUA) - .set_environment( - TableBuilder::new(self.lua)? - .with_value("thread", async_env_thread)? - .with_value("yield", async_env_yield)? - .with_function( - "resumeAsync", - move |lua: &Lua, (thread, args): (LuaThread, A)| { - let fut = func(lua, args); - let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); - sched.queue_async_task(LuaValue::Thread(thread), None, None, async { - let rets = fut.await?; - let mult = rets.to_lua_multi(lua)?; - Ok(Some(mult)) - }) - }, - )? - .build_readonly()?, - )? - .into_function() - } - - /** - Registers a new background task with the task scheduler. - - This will ensure that the task scheduler keeps running until a - call to [`TaskScheduler::deregister_background_task`] is made. - - The returned [`TaskSchedulerUnregistrar::unregister`] - must be called upon completion of the background task to - prevent the task scheduler from running indefinitely. - */ - pub fn register_background_task(&self) -> TaskSchedulerAsyncHandle { - let sender = self.futures_tx.clone(); - sender - .send(TaskSchedulerMessage::Spawned) - .unwrap_or_else(|e| { - panic!( - "\ - \nFailed to unregister background task - this is an internal error! \ - \nPlease report it at {} \ - \nDetails: {e} \ - ", - env!("CARGO_PKG_REPOSITORY") - ) - }); - TaskSchedulerAsyncHandle::new(sender) - } - - /** - Resumes the next queued Lua task, if one exists, blocking - the current thread until it either yields or finishes. - */ - fn resume_next_blocking_task( - &self, - override_args: Option>, - ) -> TaskSchedulerState { - match { - let mut queue_guard = self.tasks_queue_blocking.borrow_mut(); - let task = queue_guard.pop_front(); - drop(queue_guard); - task - } { - None => TaskSchedulerState::new(self), - Some(task) => match self.resume_task(task, override_args) { - Ok(_) => TaskSchedulerState::new(self), - Err(task_err) => TaskSchedulerState::err(self, task_err), - }, - } - } - - /** - Awaits the first available queued future, and resumes its associated - Lua task which will be ready for resumption when that future wakes. - - Panics if there are no futures currently queued. - - Use [`TaskScheduler::next_queue_future_exists`] - to check if there are any queued futures. - */ - async fn resume_next_async_task(&self) -> TaskSchedulerState { - let (task, result) = { - let mut futs = self - .futures - .try_lock() - .expect("Tried to resume next queued future while already resuming or modifying"); - futs.next() - .await - .expect("Tried to resume next queued future but none are queued") - }; - // Promote this future task to a blocking task and resume it - // right away, also taking care to not borrow mutably twice - // by dropping this guard before trying to resume it - let mut queue_guard = self.tasks_queue_blocking.borrow_mut(); - queue_guard.push_front(task); - drop(queue_guard); - self.resume_next_blocking_task(result.transpose()) - } - - /** - Awaits the next background task registration - message, if any messages exist in the queue. - - This is a no-op if there are no background tasks left running - and / or the background task messages channel was closed. - */ - async fn receive_next_message(&self) -> TaskSchedulerState { - let message_opt = { - let mut rx = self.futures_rx.lock().await; - rx.recv().await - }; - if let Some(message) = message_opt { - match message { - TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerState::new(self), - TaskSchedulerMessage::Spawned => { - let prev = self.futures_registered_count.get(); - self.futures_registered_count.set(prev + 1); - TaskSchedulerState::new(self) - } - TaskSchedulerMessage::Terminated(result) => { - let prev = self.futures_registered_count.get(); - self.futures_registered_count.set(prev - 1); - if prev == 0 { - panic!( - r#" - Terminated a background task without it running - this is an internal error! - Please report it at {} - "#, - env!("CARGO_PKG_REPOSITORY") - ) - } - if let Err(e) = result { - TaskSchedulerState::err(self, e) - } else { - TaskSchedulerState::new(self) - } - } - } - } else { - TaskSchedulerState::new(self) - } - } - - /** - Resumes the task scheduler queue. - - This will run any spawned or deferred Lua tasks in a blocking manner. - - Once all spawned and / or deferred Lua tasks have finished running, - this will process delayed tasks, waiting tasks, and native Rust - futures concurrently, awaiting the first one to be ready for resumption. - */ - pub async fn resume_queue(&self) -> TaskSchedulerState { - let current = TaskSchedulerState::new(self); - /* - Resume tasks in the internal queue, in this order: - - * 🛑 = blocking - lua tasks, in order - * ⏳ = async - first come, first serve - - 1. 🛑 Tasks from task.spawn / task.defer, the main thread - 2. ⏳ Tasks from task.delay / task.wait, spawned background tasks - */ - if current.has_blocking_tasks() { - self.resume_next_blocking_task(None) - } else if current.has_future_tasks() && current.has_background_tasks() { - // Futures, spawned background tasks - tokio::select! { - result = self.resume_next_async_task() => result, - result = self.receive_next_message() => result, - } - } else if current.has_future_tasks() { - // Futures - self.resume_next_async_task().await - } else if current.has_background_tasks() { - // Only spawned background tasks, these may then - // spawn new lua tasks and "wake up" the scheduler - self.receive_next_message().await - } else { - TaskSchedulerState::new(self) - } - } } diff --git a/packages/lib/src/lua/task/task_kind.rs b/packages/lib/src/lua/task/task_kind.rs index 7665153..6a82f49 100644 --- a/packages/lib/src/lua/task/task_kind.rs +++ b/packages/lib/src/lua/task/task_kind.rs @@ -1,6 +1,6 @@ use std::fmt; -/// An enum representing different kinds of tasks +/// Enum representing different kinds of tasks #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum TaskKind { Instant, diff --git a/packages/lib/src/utils/table.rs b/packages/lib/src/utils/table.rs index 2ec39ab..c9c60f2 100644 --- a/packages/lib/src/utils/table.rs +++ b/packages/lib/src/utils/table.rs @@ -2,7 +2,7 @@ use std::future::Future; use mlua::prelude::*; -use crate::lua::task::TaskScheduler; +use crate::lua::task::{TaskScheduler, TaskSchedulerAsyncExt}; pub struct TableBuilder { lua: &'static Lua, @@ -78,7 +78,10 @@ impl TableBuilder { F: 'static + Fn(&'static Lua, A) -> FR, FR: 'static + Future>, { - let sched = self.lua.app_data_ref::<&TaskScheduler>().unwrap(); + let sched = self + .lua + .app_data_ref::<&TaskScheduler>() + .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"); let func = sched.make_scheduled_async_fn(func)?; self.with_value(key, LuaValue::Function(func)) }