Fix scheduler not always waking up

This commit is contained in:
Filip Tibell 2023-02-20 17:27:33 +01:00
parent 5f3169c1bb
commit cdf7db51f6
No known key found for this signature in database
6 changed files with 30 additions and 32 deletions

View file

@ -53,7 +53,7 @@ impl Service<Request<Body>> for NetServiceInner {
let sched = lua let sched = lua
.app_data_ref::<&TaskScheduler>() .app_data_ref::<&TaskScheduler>()
.expect("Missing task scheduler"); .expect("Missing task scheduler");
let handle = sched.register_background_task(); let task = sched.register_background_task();
task::spawn_local(async move { task::spawn_local(async move {
// Create our new full websocket object, then // Create our new full websocket object, then
// schedule our handler to get called asap // schedule our handler to get called asap
@ -66,7 +66,7 @@ impl Service<Request<Body>> for NetServiceInner {
lua.create_thread(handler)?, lua.create_thread(handler)?,
LuaMultiValue::from_vec(vec![LuaValue::Table(sock)]), LuaMultiValue::from_vec(vec![LuaValue::Table(sock)]),
); );
handle.unregister(Ok(())); task.unregister(Ok(()));
result result
}); });
Box::pin(async move { Ok(response) }) Box::pin(async move { Ok(response) })

View file

@ -120,9 +120,13 @@ impl<'fut> TaskSchedulerAsyncExt<'fut> for TaskScheduler<'fut> {
.futures .futures
.try_lock() .try_lock()
.expect("Tried to add future to queue during futures resumption"); .expect("Tried to add future to queue during futures resumption");
self.futures_count.set(self.futures_count.get() + 1);
futs.push(Box::pin(async move { futs.push(Box::pin(async move {
let before = Instant::now(); let before = Instant::now();
sleep(Duration::from_secs_f64(duration.unwrap_or_default())).await; sleep(Duration::from_secs_f64(
duration.unwrap_or_default().max(0.0),
))
.await;
let elapsed_secs = before.elapsed().as_secs_f64(); let elapsed_secs = before.elapsed().as_secs_f64();
let args = elapsed_secs.to_lua_multi(self.lua).unwrap(); let args = elapsed_secs.to_lua_multi(self.lua).unwrap();
(Some(reference), Ok(Some(args))) (Some(reference), Ok(Some(args)))

View file

@ -38,38 +38,23 @@ impl TaskSchedulerResumeExt for TaskScheduler<'_> {
*/ */
async fn resume_queue(&self) -> TaskSchedulerState { async fn resume_queue(&self) -> TaskSchedulerState {
let current = TaskSchedulerState::new(self); let current = TaskSchedulerState::new(self);
if current.has_blocking_tasks() { let result = if current.has_blocking_tasks() {
// 1. Blocking tasks // 1. Blocking tasks
resume_next_blocking_task(self, None) resume_next_blocking_task(self, None)
} else if current.has_future_tasks() && current.has_background_tasks() { } else if current.has_future_tasks() || current.has_background_tasks() {
// 2. Async + background tasks // 2. Async + background tasks
tokio::select! { tokio::select! {
result = resume_next_async_task(self) => result, result = resume_next_async_task(self) => result,
result = receive_next_message(self) => result, result = receive_next_message(self) => result,
} }
} else if current.has_future_tasks() {
// 3. Async tasks
resume_next_async_task(self).await
} else if current.has_background_tasks() {
// 4. Only background tasks left, meaning the task scheduler will
// get woken up by things such as a new network connection, we can
// take advantage of this and perform a GC cycle right away since
// lua threads don't care about that performance hit right now
if self.lua.gc_is_running() {
// Finish current (maybe partial) GC cycle if it is already running
self.lua.gc_collect().expect("Failed to garbage collect");
}
self.lua.gc_collect().expect("Failed to garbage collect");
self.lua.expire_registry_values();
// All cleaned up, wait for the next background task to wake
receive_next_message(self).await
} else { } else {
// 5. No tasks left, here we sleep one millisecond in case // 3. No tasks left, here we sleep one millisecond in case
// the caller of resume_queue accidentally calls this in // the caller of resume_queue accidentally calls this in
// a busy loop to prevent cpu usage from going to 100% // a busy loop to prevent cpu usage from going to 100%
sleep(Duration::from_millis(1)).await; sleep(Duration::from_millis(1)).await;
TaskSchedulerState::new(self) TaskSchedulerState::new(self)
} };
result
} }
} }
@ -126,6 +111,11 @@ async fn resume_next_async_task(scheduler: &TaskScheduler<'_>) -> TaskSchedulerS
}; };
// The future might not return a reference that it wants to resume // The future might not return a reference that it wants to resume
if let Some(task) = task { if let Some(task) = task {
// Decrement the counter since the future has completed,
// meaning it has been removed from the futures queue
scheduler
.futures_count
.set(scheduler.futures_count.get() - 1);
// Promote this future task to a blocking task and resume it // Promote this future task to a blocking task and resume it
// right away, also taking care to not borrow mutably twice // right away, also taking care to not borrow mutably twice
// by dropping this guard before trying to resume it // by dropping this guard before trying to resume it
@ -154,13 +144,13 @@ async fn receive_next_message(scheduler: &TaskScheduler<'_>) -> TaskSchedulerSta
match message { match message {
TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerState::new(scheduler), TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerState::new(scheduler),
TaskSchedulerMessage::Spawned => { TaskSchedulerMessage::Spawned => {
let prev = scheduler.futures_registered_count.get(); let prev = scheduler.futures_background_count.get();
scheduler.futures_registered_count.set(prev + 1); scheduler.futures_background_count.set(prev + 1);
TaskSchedulerState::new(scheduler) TaskSchedulerState::new(scheduler)
} }
TaskSchedulerMessage::Terminated(result) => { TaskSchedulerMessage::Terminated(result) => {
let prev = scheduler.futures_registered_count.get(); let prev = scheduler.futures_background_count.get();
scheduler.futures_registered_count.set(prev - 1); scheduler.futures_background_count.set(prev - 1);
if prev == 0 { if prev == 0 {
panic!( panic!(
r#" r#"

View file

@ -7,6 +7,7 @@ mod scheduler;
mod task_kind; mod task_kind;
mod task_reference; mod task_reference;
pub use async_handle::*;
pub use ext::*; pub use ext::*;
pub use proxy::*; pub use proxy::*;
pub use scheduler::*; pub use scheduler::*;

View file

@ -27,8 +27,8 @@ impl TaskSchedulerState {
.try_borrow() .try_borrow()
.expect(MESSAGE) .expect(MESSAGE)
.len(), .len(),
num_futures: sched.futures.try_lock().expect(MESSAGE).len(), num_futures: sched.futures_count.get(),
num_background: sched.futures_registered_count.get(), num_background: sched.futures_background_count.get(),
} }
} }

View file

@ -49,7 +49,8 @@ pub struct TaskScheduler<'fut> {
pub(super) tasks_current_lua_error: Arc<RefCell<Option<LuaError>>>, pub(super) tasks_current_lua_error: Arc<RefCell<Option<LuaError>>>,
// Future tasks & objects for waking // Future tasks & objects for waking
pub(super) futures: AsyncMutex<FuturesUnordered<TaskFuture<'fut>>>, pub(super) futures: AsyncMutex<FuturesUnordered<TaskFuture<'fut>>>,
pub(super) futures_registered_count: Cell<usize>, pub(super) futures_count: Cell<usize>,
pub(super) futures_background_count: Cell<usize>,
pub(super) futures_tx: mpsc::UnboundedSender<TaskSchedulerMessage>, pub(super) futures_tx: mpsc::UnboundedSender<TaskSchedulerMessage>,
pub(super) futures_rx: AsyncMutex<mpsc::UnboundedReceiver<TaskSchedulerMessage>>, pub(super) futures_rx: AsyncMutex<mpsc::UnboundedReceiver<TaskSchedulerMessage>>,
} }
@ -77,7 +78,8 @@ impl<'fut> TaskScheduler<'fut> {
futures: AsyncMutex::new(FuturesUnordered::new()), futures: AsyncMutex::new(FuturesUnordered::new()),
futures_tx: tx, futures_tx: tx,
futures_rx: AsyncMutex::new(rx), futures_rx: AsyncMutex::new(rx),
futures_registered_count: Cell::new(0), futures_count: Cell::new(0),
futures_background_count: Cell::new(0),
}) })
} }
@ -281,7 +283,7 @@ impl<'fut> TaskScheduler<'fut> {
// NOTE: Setting this error here means that when the thread // NOTE: Setting this error here means that when the thread
// is resumed it will error instantly, so we don't need // is resumed it will error instantly, so we don't need
// to call it with proper args, empty args is fine // to call it with proper args, empty args is fine
*self.tasks_current_lua_error.borrow_mut() = Some(e); self.tasks_current_lua_error.replace(Some(e));
thread.resume(()) thread.resume(())
} }
Ok(args) => thread.resume(args), Ok(args) => thread.resume(args),
@ -358,6 +360,7 @@ impl<'fut> TaskScheduler<'fut> {
.futures .futures
.try_lock() .try_lock()
.expect("Tried to add future to queue during futures resumption"); .expect("Tried to add future to queue during futures resumption");
self.futures_count.set(self.futures_count.get() + 1);
futs.push(Box::pin(async move { futs.push(Box::pin(async move {
let result = fut.await; let result = fut.await;
(Some(task_ref), result) (Some(task_ref), result)