From 0737c3254e1e7868426c66815c23ee11708862c1 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Tue, 21 Feb 2023 11:45:09 +0100 Subject: [PATCH] Fix internal task scheduler counter bug, do some cleanup --- packages/lib/src/lua/async_ext.rs | 3 ++ packages/lib/src/lua/task/ext/async_ext.rs | 5 ++- packages/lib/src/lua/task/ext/resume_ext.rs | 35 +++++++++---------- packages/lib/src/lua/task/mod.rs | 9 ++--- packages/lib/src/lua/task/scheduler.rs | 22 ++++++++++-- .../{async_handle.rs => scheduler_handle.rs} | 2 +- .../task/{message.rs => scheduler_message.rs} | 0 .../task/{result.rs => scheduler_state.rs} | 32 ++++------------- 8 files changed, 54 insertions(+), 54 deletions(-) rename packages/lib/src/lua/task/{async_handle.rs => scheduler_handle.rs} (95%) rename packages/lib/src/lua/task/{message.rs => scheduler_message.rs} (100%) rename packages/lib/src/lua/task/{result.rs => scheduler_state.rs} (87%) diff --git a/packages/lib/src/lua/async_ext.rs b/packages/lib/src/lua/async_ext.rs index 4518cb2..b15d3d2 100644 --- a/packages/lib/src/lua/async_ext.rs +++ b/packages/lib/src/lua/async_ext.rs @@ -71,6 +71,9 @@ impl LuaAsyncExt for &'static Lua { Creates a special async function that waits the desired amount of time, inheriting the guid of the current thread / task for proper cancellation. + + This will yield the lua thread calling the function until the + desired time has passed and the scheduler resumes the thread. */ fn create_waiter_function<'lua>(self) -> LuaResult> { let async_env_yield: LuaFunction = self.named_registry_value("co.yield")?; diff --git a/packages/lib/src/lua/task/ext/async_ext.rs b/packages/lib/src/lua/task/ext/async_ext.rs index 46a6707..5b9f23e 100644 --- a/packages/lib/src/lua/task/ext/async_ext.rs +++ b/packages/lib/src/lua/task/ext/async_ext.rs @@ -9,8 +9,8 @@ use tokio::time::{sleep, Instant}; use crate::lua::task::TaskKind; use super::super::{ - async_handle::TaskSchedulerAsyncHandle, message::TaskSchedulerMessage, - scheduler::TaskReference, scheduler::TaskScheduler, + scheduler::TaskReference, scheduler::TaskScheduler, scheduler_handle::TaskSchedulerAsyncHandle, + scheduler_message::TaskSchedulerMessage, }; /* @@ -120,7 +120,6 @@ impl<'fut> TaskSchedulerAsyncExt<'fut> for TaskScheduler<'fut> { .futures .try_lock() .expect("Tried to add future to queue during futures resumption"); - self.futures_count.set(self.futures_count.get() + 1); futs.push(Box::pin(async move { let before = Instant::now(); sleep(Duration::from_secs_f64( diff --git a/packages/lib/src/lua/task/ext/resume_ext.rs b/packages/lib/src/lua/task/ext/resume_ext.rs index f6d099c..b7bb9a1 100644 --- a/packages/lib/src/lua/task/ext/resume_ext.rs +++ b/packages/lib/src/lua/task/ext/resume_ext.rs @@ -7,7 +7,9 @@ use mlua::prelude::*; use futures_util::StreamExt; use tokio::time::sleep; -use super::super::{message::TaskSchedulerMessage, result::TaskSchedulerState, TaskScheduler}; +use super::super::{ + scheduler_message::TaskSchedulerMessage, scheduler_state::TaskSchedulerState, TaskScheduler, +}; /* ────────────────────────────────────────────────────────── @@ -30,19 +32,21 @@ pub trait TaskSchedulerResumeExt { #[async_trait(?Send)] impl TaskSchedulerResumeExt for TaskScheduler<'_> { /** - Awaits the next background task registration - message, if any messages exist in the queue. + Resumes the task scheduler queue. - This is a no-op if there are no background tasks left running - and / or the background task messages channel was closed. + This will run any spawned or deferred Lua tasks in a blocking manner. + + Once all spawned and / or deferred Lua tasks have finished running, + this will process delayed tasks, waiting tasks, and native Rust + futures concurrently, awaiting the first one to be ready for resumption. */ async fn resume_queue(&self) -> TaskSchedulerState { let current = TaskSchedulerState::new(self); - let result = if current.has_blocking_tasks() { + let result = if current.num_blocking > 0 { // 1. Blocking tasks resume_next_blocking_task(self, None) - } else if current.has_future_tasks() || current.has_background_tasks() { - // 2. Async + background tasks + } else if current.num_futures > 0 || current.num_background > 0 { + // 2. Async and/or background tasks tokio::select! { result = resume_next_async_task(self) => result, result = receive_next_message(self) => result, @@ -111,11 +115,6 @@ async fn resume_next_async_task(scheduler: &TaskScheduler<'_>) -> TaskSchedulerS }; // The future might not return a reference that it wants to resume if let Some(task) = task { - // Decrement the counter since the future has completed, - // meaning it has been removed from the futures queue - scheduler - .futures_count - .set(scheduler.futures_count.get() - 1); // Promote this future task to a blocking task and resume it // right away, also taking care to not borrow mutably twice // by dropping this guard before trying to resume it @@ -127,13 +126,11 @@ async fn resume_next_async_task(scheduler: &TaskScheduler<'_>) -> TaskSchedulerS } /** - Resumes the task scheduler queue. + Awaits the next background task registration + message, if any messages exist in the queue. - This will run any spawned or deferred Lua tasks in a blocking manner. - - Once all spawned and / or deferred Lua tasks have finished running, - this will process delayed tasks, waiting tasks, and native Rust - futures concurrently, awaiting the first one to be ready for resumption. + This is a no-op if there are no background tasks left running + and / or the background task messages channel was closed. */ async fn receive_next_message(scheduler: &TaskScheduler<'_>) -> TaskSchedulerState { let message_opt = { diff --git a/packages/lib/src/lua/task/mod.rs b/packages/lib/src/lua/task/mod.rs index ea62066..f9d1813 100644 --- a/packages/lib/src/lua/task/mod.rs +++ b/packages/lib/src/lua/task/mod.rs @@ -1,13 +1,14 @@ -mod async_handle; mod ext; -mod message; mod proxy; -mod result; mod scheduler; +mod scheduler_handle; +mod scheduler_message; +mod scheduler_state; mod task_kind; mod task_reference; -pub use async_handle::*; pub use ext::*; pub use proxy::*; pub use scheduler::*; +pub use scheduler_handle::*; +pub use scheduler_state::*; diff --git a/packages/lib/src/lua/task/scheduler.rs b/packages/lib/src/lua/task/scheduler.rs index 6c6c53e..6752e64 100644 --- a/packages/lib/src/lua/task/scheduler.rs +++ b/packages/lib/src/lua/task/scheduler.rs @@ -11,7 +11,7 @@ use mlua::prelude::*; use tokio::sync::{mpsc, Mutex as AsyncMutex}; -use super::message::TaskSchedulerMessage; +use super::scheduler_message::TaskSchedulerMessage; pub use super::{task_kind::TaskKind, task_reference::TaskReference}; type TaskFutureRets<'fut> = LuaResult>>; @@ -20,6 +20,7 @@ type TaskFuture<'fut> = LocalBoxFuture<'fut, (Option, TaskFutureR /// A struct representing a task contained in the task scheduler #[derive(Debug)] pub struct Task { + kind: TaskKind, thread: LuaRegistryKey, args: LuaRegistryKey, } @@ -44,6 +45,7 @@ pub struct TaskScheduler<'fut> { pub(super) exit_code: Cell>, // Blocking tasks pub(super) tasks: RefCell>, + pub(super) tasks_count: Cell, pub(super) tasks_current: Cell>, pub(super) tasks_queue_blocking: RefCell>, pub(super) tasks_current_lua_error: Arc>>, @@ -72,6 +74,7 @@ impl<'fut> TaskScheduler<'fut> { guid: Cell::new(0), exit_code: Cell::new(None), tasks: RefCell::new(HashMap::new()), + tasks_count: Cell::new(0), tasks_current: Cell::new(None), tasks_queue_blocking: RefCell::new(VecDeque::new()), tasks_current_lua_error, @@ -203,6 +206,7 @@ impl<'fut> TaskScheduler<'fut> { let task_thread_key: LuaRegistryKey = self.lua.create_registry_value(thread)?; // Create the full task struct let task = Task { + kind, thread: task_thread_key, args: task_args_key, }; @@ -217,6 +221,11 @@ impl<'fut> TaskScheduler<'fut> { guid }; let reference = TaskReference::new(kind, guid); + // Increment the corresponding task counter + match kind { + TaskKind::Future => self.futures_count.set(self.futures_count.get() + 1), + _ => self.tasks_count.set(self.tasks_count.get() + 1), + } // Add the task to the scheduler { let mut tasks = self.tasks.borrow_mut(); @@ -256,6 +265,11 @@ impl<'fut> TaskScheduler<'fut> { .collect(); for task_ref in &tasks_to_remove { if let Some(task) = tasks.remove(task_ref) { + // Decrement the corresponding task counter + match task.kind { + TaskKind::Future => self.futures_count.set(self.futures_count.get() - 1), + _ => self.tasks_count.set(self.tasks_count.get() - 1), + } // NOTE: We need to close the thread here to // make 100% sure that nothing can resume it let close: LuaFunction = self.lua.named_registry_value("co.close")?; @@ -291,6 +305,11 @@ impl<'fut> TaskScheduler<'fut> { None => return Ok(LuaMultiValue::new()), } }; + // Decrement the corresponding task counter + match task.kind { + TaskKind::Future => self.futures_count.set(self.futures_count.get() - 1), + _ => self.tasks_count.set(self.tasks_count.get() - 1), + } // Fetch and remove the thread to resume + its arguments let thread: LuaThread = self.lua.registry_value(&task.thread)?; let args_opt_res = override_args.or_else(|| { @@ -389,7 +408,6 @@ impl<'fut> TaskScheduler<'fut> { .futures .try_lock() .expect("Tried to add future to queue during futures resumption"); - self.futures_count.set(self.futures_count.get() + 1); futs.push(Box::pin(async move { let result = fut.await; (Some(task_ref), result) diff --git a/packages/lib/src/lua/task/async_handle.rs b/packages/lib/src/lua/task/scheduler_handle.rs similarity index 95% rename from packages/lib/src/lua/task/async_handle.rs rename to packages/lib/src/lua/task/scheduler_handle.rs index 18bcb39..08993fc 100644 --- a/packages/lib/src/lua/task/async_handle.rs +++ b/packages/lib/src/lua/task/scheduler_handle.rs @@ -4,7 +4,7 @@ use mlua::prelude::*; use tokio::sync::mpsc; -use super::message::TaskSchedulerMessage; +use super::scheduler_message::TaskSchedulerMessage; /** A handle to a registered asynchronous background task. diff --git a/packages/lib/src/lua/task/message.rs b/packages/lib/src/lua/task/scheduler_message.rs similarity index 100% rename from packages/lib/src/lua/task/message.rs rename to packages/lib/src/lua/task/scheduler_message.rs diff --git a/packages/lib/src/lua/task/result.rs b/packages/lib/src/lua/task/scheduler_state.rs similarity index 87% rename from packages/lib/src/lua/task/result.rs rename to packages/lib/src/lua/task/scheduler_state.rs index 309981d..f36e889 100644 --- a/packages/lib/src/lua/task/result.rs +++ b/packages/lib/src/lua/task/scheduler_state.rs @@ -6,27 +6,21 @@ use super::scheduler::TaskScheduler; /// Struct representing the current state of the task scheduler #[derive(Debug, Clone)] +#[must_use = "Scheduler state must be checked after every resumption"] pub struct TaskSchedulerState { - lua_error: Option, - exit_code: Option, - num_blocking: usize, - num_futures: usize, - num_background: usize, + pub(super) lua_error: Option, + pub(super) exit_code: Option, + pub(super) num_blocking: usize, + pub(super) num_futures: usize, + pub(super) num_background: usize, } impl TaskSchedulerState { pub(super) fn new(sched: &TaskScheduler) -> Self { - const MESSAGE: &str = "\ - Failed to get lock on or borrow internal scheduler state!\ - \nMake sure not to call during task scheduler resumption"; Self { lua_error: None, exit_code: sched.exit_code.get(), - num_blocking: sched - .tasks_queue_blocking - .try_borrow() - .expect(MESSAGE) - .len(), + num_blocking: sched.tasks_count.get(), num_futures: sched.futures_count.get(), num_background: sched.futures_background_count.get(), } @@ -38,18 +32,6 @@ impl TaskSchedulerState { this } - pub(super) fn has_blocking_tasks(&self) -> bool { - self.num_blocking > 0 - } - - pub(super) fn has_future_tasks(&self) -> bool { - self.num_futures > 0 - } - - pub(super) fn has_background_tasks(&self) -> bool { - self.num_background > 0 - } - /** Returns a clone of the error from this task scheduler result, if any.