Add safety check and resumption batching to runtime

This commit is contained in:
Filip Tibell 2024-01-26 09:16:39 +01:00
parent fe4563b4f8
commit 5eeef3fec1
No known key found for this signature in database
2 changed files with 26 additions and 1 deletions

View file

@ -68,6 +68,13 @@ impl ThreadQueue {
pub async fn recv(&self) {
self.signal_rx.recv().await.unwrap();
// Drain any pending receives
loop {
match self.signal_rx.try_recv() {
Ok(_) => continue,
Err(_) => break,
}
}
}
}

View file

@ -1,7 +1,8 @@
use std::sync::{Arc, Weak};
use std::time::Duration;
use mlua::prelude::*;
use smol::prelude::*;
use smol::{prelude::*, Timer};
use smol::{block_on, Executor, LocalExecutor};
@ -154,6 +155,15 @@ impl<'lua> Runtime<'lua> {
used to create this runtime, otherwise this method may panic.
*/
pub async fn run_async(&self) {
// Make sure we do not already have an executor - this is a definite user error
// and may happen if the user tries to run multiple runtimes on the same lua state
if self.lua.app_data_ref::<Weak<Executor>>().is_some() {
panic!(
"Lua state already has an executor attached!\
\nOnly one runtime can be used per lua state."
);
}
// Create new executors to use - note that we do not need to create multiple executors
// for work stealing, using the `spawn` global function that smol provides will work
// just fine, as long as anything spawned by it is awaited from lua async functions
@ -163,6 +173,11 @@ impl<'lua> Runtime<'lua> {
// Store the main executor in lua for spawner trait
self.lua.set_app_data(Arc::downgrade(&main_exec));
// Create a timer for a resumption cycle / throttling mechanism, waiting on this
// will allow us to batch more work together when the runtime is under high load,
// and adds an acceptable amount of latency for new async tasks (we run at 250hz)
let mut cycle = Timer::interval(Duration::from_millis(4));
// Tick local lua executor while also driving main
// executor forward, until all lua threads finish
let fut = async {
@ -214,6 +229,9 @@ impl<'lua> Runtime<'lua> {
if lua_exec.is_empty() {
break;
}
// Wait for next resumption cycle
cycle.next().await;
}
};