From 5eeef3fec1874e3fe338338499b967212359eacd Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Fri, 26 Jan 2024 09:16:39 +0100 Subject: [PATCH] Add safety check and resumption batching to runtime --- lib/queue.rs | 7 +++++++ lib/runtime.rs | 20 +++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/lib/queue.rs b/lib/queue.rs index 6ac9f27..633287e 100644 --- a/lib/queue.rs +++ b/lib/queue.rs @@ -68,6 +68,13 @@ impl ThreadQueue { pub async fn recv(&self) { self.signal_rx.recv().await.unwrap(); + // Drain any pending receives + loop { + match self.signal_rx.try_recv() { + Ok(_) => continue, + Err(_) => break, + } + } } } diff --git a/lib/runtime.rs b/lib/runtime.rs index b537974..a359212 100644 --- a/lib/runtime.rs +++ b/lib/runtime.rs @@ -1,7 +1,8 @@ use std::sync::{Arc, Weak}; +use std::time::Duration; use mlua::prelude::*; -use smol::prelude::*; +use smol::{prelude::*, Timer}; use smol::{block_on, Executor, LocalExecutor}; @@ -154,6 +155,15 @@ impl<'lua> Runtime<'lua> { used to create this runtime, otherwise this method may panic. */ pub async fn run_async(&self) { + // Make sure we do not already have an executor - this is a definite user error + // and may happen if the user tries to run multiple runtimes on the same lua state + if self.lua.app_data_ref::>().is_some() { + panic!( + "Lua state already has an executor attached!\ + \nOnly one runtime can be used per lua state." + ); + } + // Create new executors to use - note that we do not need to create multiple executors // for work stealing, using the `spawn` global function that smol provides will work // just fine, as long as anything spawned by it is awaited from lua async functions @@ -163,6 +173,11 @@ impl<'lua> Runtime<'lua> { // Store the main executor in lua for spawner trait self.lua.set_app_data(Arc::downgrade(&main_exec)); + // Create a timer for a resumption cycle / throttling mechanism, waiting on this + // will allow us to batch more work together when the runtime is under high load, + // and adds an acceptable amount of latency for new async tasks (we run at 250hz) + let mut cycle = Timer::interval(Duration::from_millis(4)); + // Tick local lua executor while also driving main // executor forward, until all lua threads finish let fut = async { @@ -214,6 +229,9 @@ impl<'lua> Runtime<'lua> { if lua_exec.is_empty() { break; } + + // Wait for next resumption cycle + cycle.next().await; } };