From cdf7db51f68438004849a62257d243772ce41987 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Mon, 20 Feb 2023 17:27:33 +0100 Subject: [PATCH] Fix scheduler not always waking up --- packages/lib/src/lua/net/server.rs | 4 +-- packages/lib/src/lua/task/ext/async_ext.rs | 6 +++- packages/lib/src/lua/task/ext/resume_ext.rs | 38 ++++++++------------- packages/lib/src/lua/task/mod.rs | 1 + packages/lib/src/lua/task/result.rs | 4 +-- packages/lib/src/lua/task/scheduler.rs | 9 +++-- 6 files changed, 30 insertions(+), 32 deletions(-) diff --git a/packages/lib/src/lua/net/server.rs b/packages/lib/src/lua/net/server.rs index 58e3751..a530acb 100644 --- a/packages/lib/src/lua/net/server.rs +++ b/packages/lib/src/lua/net/server.rs @@ -53,7 +53,7 @@ impl Service> for NetServiceInner { let sched = lua .app_data_ref::<&TaskScheduler>() .expect("Missing task scheduler"); - let handle = sched.register_background_task(); + let task = sched.register_background_task(); task::spawn_local(async move { // Create our new full websocket object, then // schedule our handler to get called asap @@ -66,7 +66,7 @@ impl Service> for NetServiceInner { lua.create_thread(handler)?, LuaMultiValue::from_vec(vec![LuaValue::Table(sock)]), ); - handle.unregister(Ok(())); + task.unregister(Ok(())); result }); Box::pin(async move { Ok(response) }) diff --git a/packages/lib/src/lua/task/ext/async_ext.rs b/packages/lib/src/lua/task/ext/async_ext.rs index 2fbf8a2..46a6707 100644 --- a/packages/lib/src/lua/task/ext/async_ext.rs +++ b/packages/lib/src/lua/task/ext/async_ext.rs @@ -120,9 +120,13 @@ impl<'fut> TaskSchedulerAsyncExt<'fut> for TaskScheduler<'fut> { .futures .try_lock() .expect("Tried to add future to queue during futures resumption"); + self.futures_count.set(self.futures_count.get() + 1); futs.push(Box::pin(async move { let before = Instant::now(); - sleep(Duration::from_secs_f64(duration.unwrap_or_default())).await; + sleep(Duration::from_secs_f64( + duration.unwrap_or_default().max(0.0), + )) + .await; let elapsed_secs = before.elapsed().as_secs_f64(); let args = elapsed_secs.to_lua_multi(self.lua).unwrap(); (Some(reference), Ok(Some(args))) diff --git a/packages/lib/src/lua/task/ext/resume_ext.rs b/packages/lib/src/lua/task/ext/resume_ext.rs index 91066bf..107a9b4 100644 --- a/packages/lib/src/lua/task/ext/resume_ext.rs +++ b/packages/lib/src/lua/task/ext/resume_ext.rs @@ -38,38 +38,23 @@ impl TaskSchedulerResumeExt for TaskScheduler<'_> { */ async fn resume_queue(&self) -> TaskSchedulerState { let current = TaskSchedulerState::new(self); - if current.has_blocking_tasks() { + let result = if current.has_blocking_tasks() { // 1. Blocking tasks resume_next_blocking_task(self, None) - } else if current.has_future_tasks() && current.has_background_tasks() { + } else if current.has_future_tasks() || current.has_background_tasks() { // 2. Async + background tasks tokio::select! { result = resume_next_async_task(self) => result, result = receive_next_message(self) => result, } - } else if current.has_future_tasks() { - // 3. Async tasks - resume_next_async_task(self).await - } else if current.has_background_tasks() { - // 4. Only background tasks left, meaning the task scheduler will - // get woken up by things such as a new network connection, we can - // take advantage of this and perform a GC cycle right away since - // lua threads don't care about that performance hit right now - if self.lua.gc_is_running() { - // Finish current (maybe partial) GC cycle if it is already running - self.lua.gc_collect().expect("Failed to garbage collect"); - } - self.lua.gc_collect().expect("Failed to garbage collect"); - self.lua.expire_registry_values(); - // All cleaned up, wait for the next background task to wake - receive_next_message(self).await } else { - // 5. No tasks left, here we sleep one millisecond in case + // 3. No tasks left, here we sleep one millisecond in case // the caller of resume_queue accidentally calls this in // a busy loop to prevent cpu usage from going to 100% sleep(Duration::from_millis(1)).await; TaskSchedulerState::new(self) - } + }; + result } } @@ -126,6 +111,11 @@ async fn resume_next_async_task(scheduler: &TaskScheduler<'_>) -> TaskSchedulerS }; // The future might not return a reference that it wants to resume if let Some(task) = task { + // Decrement the counter since the future has completed, + // meaning it has been removed from the futures queue + scheduler + .futures_count + .set(scheduler.futures_count.get() - 1); // Promote this future task to a blocking task and resume it // right away, also taking care to not borrow mutably twice // by dropping this guard before trying to resume it @@ -154,13 +144,13 @@ async fn receive_next_message(scheduler: &TaskScheduler<'_>) -> TaskSchedulerSta match message { TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerState::new(scheduler), TaskSchedulerMessage::Spawned => { - let prev = scheduler.futures_registered_count.get(); - scheduler.futures_registered_count.set(prev + 1); + let prev = scheduler.futures_background_count.get(); + scheduler.futures_background_count.set(prev + 1); TaskSchedulerState::new(scheduler) } TaskSchedulerMessage::Terminated(result) => { - let prev = scheduler.futures_registered_count.get(); - scheduler.futures_registered_count.set(prev - 1); + let prev = scheduler.futures_background_count.get(); + scheduler.futures_background_count.set(prev - 1); if prev == 0 { panic!( r#" diff --git a/packages/lib/src/lua/task/mod.rs b/packages/lib/src/lua/task/mod.rs index dae3ac5..ea62066 100644 --- a/packages/lib/src/lua/task/mod.rs +++ b/packages/lib/src/lua/task/mod.rs @@ -7,6 +7,7 @@ mod scheduler; mod task_kind; mod task_reference; +pub use async_handle::*; pub use ext::*; pub use proxy::*; pub use scheduler::*; diff --git a/packages/lib/src/lua/task/result.rs b/packages/lib/src/lua/task/result.rs index 895311a..309981d 100644 --- a/packages/lib/src/lua/task/result.rs +++ b/packages/lib/src/lua/task/result.rs @@ -27,8 +27,8 @@ impl TaskSchedulerState { .try_borrow() .expect(MESSAGE) .len(), - num_futures: sched.futures.try_lock().expect(MESSAGE).len(), - num_background: sched.futures_registered_count.get(), + num_futures: sched.futures_count.get(), + num_background: sched.futures_background_count.get(), } } diff --git a/packages/lib/src/lua/task/scheduler.rs b/packages/lib/src/lua/task/scheduler.rs index e2f5e26..7573f65 100644 --- a/packages/lib/src/lua/task/scheduler.rs +++ b/packages/lib/src/lua/task/scheduler.rs @@ -49,7 +49,8 @@ pub struct TaskScheduler<'fut> { pub(super) tasks_current_lua_error: Arc>>, // Future tasks & objects for waking pub(super) futures: AsyncMutex>>, - pub(super) futures_registered_count: Cell, + pub(super) futures_count: Cell, + pub(super) futures_background_count: Cell, pub(super) futures_tx: mpsc::UnboundedSender, pub(super) futures_rx: AsyncMutex>, } @@ -77,7 +78,8 @@ impl<'fut> TaskScheduler<'fut> { futures: AsyncMutex::new(FuturesUnordered::new()), futures_tx: tx, futures_rx: AsyncMutex::new(rx), - futures_registered_count: Cell::new(0), + futures_count: Cell::new(0), + futures_background_count: Cell::new(0), }) } @@ -281,7 +283,7 @@ impl<'fut> TaskScheduler<'fut> { // NOTE: Setting this error here means that when the thread // is resumed it will error instantly, so we don't need // to call it with proper args, empty args is fine - *self.tasks_current_lua_error.borrow_mut() = Some(e); + self.tasks_current_lua_error.replace(Some(e)); thread.resume(()) } Ok(args) => thread.resume(args), @@ -358,6 +360,7 @@ impl<'fut> TaskScheduler<'fut> { .futures .try_lock() .expect("Tried to add future to queue during futures resumption"); + self.futures_count.set(self.futures_count.get() + 1); futs.push(Box::pin(async move { let result = fut.await; (Some(task_ref), result)