Separate spawn and defer queues, optimize

This commit is contained in:
Filip Tibell 2024-01-19 09:33:42 +01:00
parent 4a6dbba1ed
commit 2a1972316a
No known key found for this signature in database
2 changed files with 60 additions and 40 deletions

View file

@ -20,7 +20,7 @@ pub fn main() -> LuaResult<()> {
lua.globals().set(
"wait",
lua.create_async_function(|_, duration: Option<f64>| async move {
let duration = duration.unwrap_or_default().min(1.0 / 250.0);
let duration = duration.unwrap_or_default().max(1.0 / 250.0);
let before = Instant::now();
let after = Timer::after(Duration::from_secs_f64(duration)).await;
Ok((after - before).as_secs_f64())

View file

@ -1,4 +1,4 @@
use std::{collections::VecDeque, rc::Rc};
use std::{cell::Cell, rc::Rc};
use mlua::prelude::*;
use smol::{
@ -15,7 +15,9 @@ use super::{
};
pub struct ThreadRuntime {
queue: Rc<Mutex<VecDeque<ThreadWithArgs>>>,
queue_status: Rc<Cell<bool>>,
queue_spawn: Rc<Mutex<Vec<ThreadWithArgs>>>,
queue_defer: Rc<Mutex<Vec<ThreadWithArgs>>>,
tx: Sender<()>,
rx: Receiver<()>,
}
@ -27,18 +29,22 @@ impl ThreadRuntime {
This will inject some functions to interact with the scheduler / executor.
*/
pub fn new(lua: &Lua) -> LuaResult<ThreadRuntime> {
let queue = Rc::new(Mutex::new(VecDeque::new()));
let queue_status = Rc::new(Cell::new(false));
let queue_spawn = Rc::new(Mutex::new(Vec::new()));
let queue_defer = Rc::new(Mutex::new(Vec::new()));
let (tx, rx) = channel::unbounded();
// Create spawn function (push to start of queue)
let queue_spawn = Rc::clone(&queue);
let b_spawn = Rc::clone(&queue_status);
let q_spawn = Rc::clone(&queue_spawn);
let tx_spawn = tx.clone();
let fn_spawn = lua.create_function(
move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
let thread = tof.into_thread(lua)?;
if thread.status() == LuaThreadStatus::Resumable {
let stored = ThreadWithArgs::new(lua, thread.clone(), args);
queue_spawn.lock_blocking().push_front(stored);
q_spawn.lock_blocking().push(stored);
b_spawn.replace(true);
tx_spawn.try_send(()).map_err(|_| {
LuaError::runtime("Tried to spawn thread to a dropped queue")
})?;
@ -50,14 +56,16 @@ impl ThreadRuntime {
)?;
// Create defer function (push to end of queue)
let queue_defer = Rc::clone(&queue);
let b_defer = Rc::clone(&queue_status);
let q_defer = Rc::clone(&queue_defer);
let tx_defer = tx.clone();
let fn_defer = lua.create_function(
move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
let thread = tof.into_thread(lua)?;
if thread.status() == LuaThreadStatus::Resumable {
let stored = ThreadWithArgs::new(lua, thread.clone(), args);
queue_defer.lock_blocking().push_back(stored);
q_defer.lock_blocking().push(stored);
b_defer.replace(true);
tx_defer.try_send(()).map_err(|_| {
LuaError::runtime("Tried to defer thread to a dropped queue")
})?;
@ -73,7 +81,13 @@ impl ThreadRuntime {
lua.globals().set("spawn", fn_spawn)?;
lua.globals().set("defer", fn_defer)?;
Ok(ThreadRuntime { queue, tx, rx })
Ok(ThreadRuntime {
queue_status,
queue_spawn,
queue_defer,
tx,
rx,
})
}
/**
@ -92,7 +106,8 @@ impl ThreadRuntime {
let stored = ThreadWithArgs::new(lua, thread, args);
self.queue.lock_blocking().push_front(stored);
self.queue_spawn.lock_blocking().push(stored);
self.queue_status.replace(true);
self.tx.try_send(()).unwrap(); // Unwrap is safe since this struct also holds the receiver
}
@ -113,38 +128,43 @@ impl ThreadRuntime {
loop {
// Wait for a new thread to arrive __or__ next futures step, prioritizing
// new threads, so we don't accidentally exit when there is more work to do
self.rx
.recv()
.or(async {
lua_exec.tick().await;
Ok(())
})
.await
.ok();
let fut_recv = async {
self.rx.recv().await.ok();
};
let fut_tick = async {
lua_exec.tick().await;
};
fut_recv.or(fut_tick).await;
// If a new thread was spawned onto queue, we
// must drain it and schedule on the executor
for queued_thread in self.queue.lock().await.drain(..) {
// NOTE: Thread may have been cancelled from lua
// before we got here, so we need to check it again
let (thread, args) = queued_thread.into_inner(lua);
if thread.status() == LuaThreadStatus::Resumable {
let mut stream = thread.into_async::<_, LuaValue>(args);
lua_exec
.spawn(async move {
// Only run stream until first coroutine.yield or completion,
// this will then get dropped right away and clear stack space
match stream.next().await.unwrap() {
Err(e) => {
eprintln!("{e}");
// TODO: Forward error
// If a new thread was spawned onto any queue, we
// must drain them and schedule on the executor
if self.queue_status.get() {
let mut queued_threads = Vec::new();
queued_threads.extend(self.queue_spawn.lock().await.drain(..));
queued_threads.extend(self.queue_defer.lock().await.drain(..));
for queued_thread in queued_threads {
// NOTE: Thread may have been cancelled from lua
// before we got here, so we need to check it again
let (thread, args) = queued_thread.into_inner(lua);
if thread.status() == LuaThreadStatus::Resumable {
let mut stream = thread.into_async::<_, LuaValue>(args);
lua_exec
.spawn(async move {
// Only run stream until first coroutine.yield or completion. We will
// drop it right away to clear stack space since detached tasks don't drop
// until the executor drops https://github.com/smol-rs/smol/issues/294
match stream.next().await.unwrap() {
Err(e) => {
eprintln!("{e}");
// TODO: Forward error
}
Ok(_) => {
// TODO: Forward value
}
}
Ok(_) => {
// TODO: Forward value
}
}
})
.detach();
})
.detach();
}
}
}