From 3990b8e064a81a1449a8bdbeaf49835211560e68 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Tue, 14 Feb 2023 21:36:04 +0100 Subject: [PATCH] Properly handle blocking tasks spawned during async tasks in new scheduler --- packages/lib/src/lua/task/scheduler.rs | 94 ++++++++++++++++---------- 1 file changed, 60 insertions(+), 34 deletions(-) diff --git a/packages/lib/src/lua/task/scheduler.rs b/packages/lib/src/lua/task/scheduler.rs index 318759d..ab32cac 100644 --- a/packages/lib/src/lua/task/scheduler.rs +++ b/packages/lib/src/lua/task/scheduler.rs @@ -91,11 +91,11 @@ pub struct Task { #[derive(Debug)] pub struct TaskSchedulerBackgroundTaskHandle { unregistered: bool, - sender: mpsc::UnboundedSender, + sender: mpsc::UnboundedSender, } impl TaskSchedulerBackgroundTaskHandle { - pub fn new(sender: mpsc::UnboundedSender) -> Self { + pub fn new(sender: mpsc::UnboundedSender) -> Self { Self { unregistered: false, sender, @@ -105,7 +105,7 @@ impl TaskSchedulerBackgroundTaskHandle { pub fn unregister(mut self, result: LuaResult<()>) { self.unregistered = true; self.sender - .send(TaskSchedulerRegistrationMessage::Terminated(result)) + .send(TaskSchedulerMessage::Terminated(result)) .unwrap_or_else(|_| { panic!( "\ @@ -285,7 +285,8 @@ impl fmt::Display for TaskSchedulerResult { } #[derive(Debug, Clone)] -pub enum TaskSchedulerRegistrationMessage { +pub enum TaskSchedulerMessage { + NewBlockingTaskReady, Spawned, Terminated(LuaResult<()>), } @@ -297,8 +298,8 @@ pub struct TaskScheduler<'fut> { lua: &'static Lua, tasks: Arc>>, futures: Arc>>>, - futures_tx: mpsc::UnboundedSender, - futures_rx: Arc>>, + futures_tx: mpsc::UnboundedSender, + futures_rx: Arc>>, futures_in_background: AtomicUsize, task_queue_instant: TaskSchedulerQueue, task_queue_deferred: TaskSchedulerQueue, @@ -424,7 +425,7 @@ impl<'fut> TaskScheduler<'fut> { -- Here we have either yielded or finished the above task ``` */ - fn queue_task( + fn queue_blocking_task( &self, kind: TaskKind, thread_or_function: LuaValue<'_>, @@ -436,25 +437,49 @@ impl<'fut> TaskScheduler<'fut> { panic!("Tried to schedule future using normal task schedule method") } let task_ref = self.create_task(kind, thread_or_function, thread_args, guid_to_reuse)?; - match kind { - TaskKind::Instant => { - let mut queue = self.task_queue_instant.lock().unwrap(); - if after_current_resume { - assert!( - queue.len() > 0, - "Cannot schedule a task after the first instant when task queue is empty" - ); - queue.insert(1, task_ref); - } else { - queue.push_front(task_ref); - } - } - TaskKind::Deferred => { - // Deferred tasks should always schedule at the end of the deferred queue - let mut queue = self.task_queue_deferred.lock().unwrap(); + // Note that we create two new inner new + // scopes to drop mutexes as fast as possible + let num_prev_blocking_tasks = { + let (should_defer, num_prev_tasks, mut queue) = { + let queue_instant = self.task_queue_instant.lock().unwrap(); + let queue_deferred = self.task_queue_deferred.lock().unwrap(); + let num_prev_tasks = queue_instant.len() + queue_deferred.len(); + ( + kind == TaskKind::Deferred, + num_prev_tasks, + match kind { + TaskKind::Instant => queue_instant, + TaskKind::Deferred => queue_deferred, + TaskKind::Future => unreachable!(), + }, + ) + }; + if should_defer { queue.push_back(task_ref); + } else if after_current_resume { + assert!( + queue.len() > 0, + "Cannot schedule a task after the first instant when task queue is empty" + ); + queue.insert(1, task_ref); + } else { + queue.push_front(task_ref); } - TaskKind::Future => unreachable!(), + num_prev_tasks + }; + /* + If we had any previous task and are currently async + waiting on tasks, we should send a signal to wake up + and run the new blocking task that was just queued + + This can happen in cases such as an async http + server waking up from a connection and then wanting to + run a lua callback in response, to create the.. response + */ + if num_prev_blocking_tasks == 0 { + self.futures_tx + .send(TaskSchedulerMessage::NewBlockingTaskReady) + .expect("Futures waker channel was closed") } Ok(task_ref) } @@ -462,7 +487,7 @@ impl<'fut> TaskScheduler<'fut> { /** Queues a new future to run on the task scheduler. */ - fn queue_async( + fn queue_async_task( &self, thread_or_function: LuaValue<'_>, thread_args: Option>, @@ -498,7 +523,7 @@ impl<'fut> TaskScheduler<'fut> { thread_or_function: LuaValue<'_>, thread_args: LuaMultiValue<'_>, ) -> LuaResult { - self.queue_task( + self.queue_blocking_task( TaskKind::Instant, thread_or_function, Some(thread_args), @@ -519,7 +544,7 @@ impl<'fut> TaskScheduler<'fut> { thread_or_function: LuaValue<'_>, thread_args: LuaMultiValue<'_>, ) -> LuaResult { - self.queue_task( + self.queue_blocking_task( TaskKind::Instant, thread_or_function, Some(thread_args), @@ -546,7 +571,7 @@ impl<'fut> TaskScheduler<'fut> { thread_or_function: LuaValue<'_>, thread_args: LuaMultiValue<'_>, ) -> LuaResult { - self.queue_task( + self.queue_blocking_task( TaskKind::Deferred, thread_or_function, Some(thread_args), @@ -568,7 +593,7 @@ impl<'fut> TaskScheduler<'fut> { thread_or_function: LuaValue<'_>, thread_args: LuaMultiValue<'_>, ) -> LuaResult { - self.queue_async(thread_or_function, Some(thread_args), None, async move { + self.queue_async_task(thread_or_function, Some(thread_args), None, async move { sleep(Duration::from_secs_f64(after_secs)).await; Ok(None) }) @@ -586,7 +611,7 @@ impl<'fut> TaskScheduler<'fut> { after_secs: f64, thread_or_function: LuaValue<'_>, ) -> LuaResult { - self.queue_async( + self.queue_async_task( thread_or_function, None, // Wait should recycle the guid of the current task, @@ -615,7 +640,7 @@ impl<'fut> TaskScheduler<'fut> { thread_or_function: LuaValue<'_>, fut: impl Future> + 'fut, ) -> LuaResult { - self.queue_async(thread_or_function, None, None, fut) + self.queue_async_task(thread_or_function, None, None, fut) } /** @@ -768,7 +793,7 @@ impl<'fut> TaskScheduler<'fut> { pub fn register_background_task(&self) -> TaskSchedulerBackgroundTaskHandle { let sender = self.futures_tx.clone(); sender - .send(TaskSchedulerRegistrationMessage::Spawned) + .send(TaskSchedulerMessage::Spawned) .unwrap_or_else(|e| { panic!( "\ @@ -876,11 +901,12 @@ impl<'fut> TaskScheduler<'fut> { }; if let Some(message) = message_opt { match message { - TaskSchedulerRegistrationMessage::Spawned => { + TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerResult::new(self), + TaskSchedulerMessage::Spawned => { self.futures_in_background.fetch_add(1, Ordering::Relaxed); TaskSchedulerResult::new(self) } - TaskSchedulerRegistrationMessage::Terminated(result) => { + TaskSchedulerMessage::Terminated(result) => { let prev = self.futures_in_background.fetch_sub(1, Ordering::Relaxed); if prev == 0 { panic!(