Use lock-free storage for runtime thread queue

This commit is contained in:
Filip Tibell 2024-01-26 09:39:21 +01:00
parent 5eeef3fec1
commit df2747bae3
No known key found for this signature in database
4 changed files with 45 additions and 48 deletions

1
Cargo.lock generated
View file

@ -517,6 +517,7 @@ dependencies = [
name = "smol-mlua" name = "smol-mlua"
version = "0.0.0" version = "0.0.0"
dependencies = [ dependencies = [
"concurrent-queue",
"mlua", "mlua",
"smol", "smol",
] ]

View file

@ -4,6 +4,7 @@ version = "0.0.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
concurrent-queue = "2.4"
smol = "2.0" smol = "2.0"
mlua = { version = "0.9", features = ["luau", "luau-jit", "async"] } mlua = { version = "0.9", features = ["luau", "luau-jit", "async"] }

View file

@ -1,13 +1,8 @@
use std::sync::{ use std::sync::Arc;
atomic::{AtomicBool, Ordering},
Arc,
};
use concurrent_queue::ConcurrentQueue;
use mlua::prelude::*; use mlua::prelude::*;
use smol::{ use smol::channel::{unbounded, Receiver, Sender};
channel::{unbounded, Receiver, Sender},
lock::Mutex,
};
use crate::IntoLuaThread; use crate::IntoLuaThread;
@ -21,27 +16,22 @@ const ERR_OOM: &str = "out of memory";
*/ */
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ThreadQueue { pub struct ThreadQueue {
queue: Arc<Mutex<Vec<ThreadWithArgs>>>, queue: Arc<ConcurrentQueue<ThreadWithArgs>>,
status: Arc<AtomicBool>,
signal_tx: Sender<()>, signal_tx: Sender<()>,
signal_rx: Receiver<()>, signal_rx: Receiver<()>,
} }
impl ThreadQueue { impl ThreadQueue {
pub fn new() -> Self { pub fn new() -> Self {
let queue = Arc::new(ConcurrentQueue::unbounded());
let (signal_tx, signal_rx) = unbounded(); let (signal_tx, signal_rx) = unbounded();
Self { Self {
queue: Arc::new(Mutex::new(Vec::new())), queue,
status: Arc::new(AtomicBool::new(false)),
signal_tx, signal_tx,
signal_rx, signal_rx,
} }
} }
pub fn has_threads(&self) -> bool {
self.status.load(Ordering::SeqCst)
}
pub fn push<'lua>( pub fn push<'lua>(
&self, &self,
lua: &'lua Lua, lua: &'lua Lua,
@ -52,21 +42,23 @@ impl ThreadQueue {
let args = args.into_lua_multi(lua)?; let args = args.into_lua_multi(lua)?;
let stored = ThreadWithArgs::new(lua, thread, args); let stored = ThreadWithArgs::new(lua, thread, args);
self.queue.lock_blocking().push(stored); self.queue.push(stored).unwrap();
self.status.store(true, Ordering::SeqCst);
self.signal_tx.try_send(()).unwrap(); self.signal_tx.try_send(()).unwrap();
Ok(()) Ok(())
} }
pub async fn drain<'lua>(&self, lua: &'lua Lua) -> Vec<(LuaThread<'lua>, LuaMultiValue<'lua>)> { pub fn drain<'outer, 'lua>(
let mut queue = self.queue.lock().await; &'outer self,
let drained = queue.drain(..).map(|s| s.into_inner(lua)).collect(); lua: &'lua Lua,
self.status.store(false, Ordering::SeqCst); ) -> impl Iterator<Item = (LuaThread<'lua>, LuaMultiValue<'lua>)> + 'outer
drained where
'lua: 'outer,
{
self.queue.try_iter().map(|stored| stored.into_inner(lua))
} }
pub async fn recv(&self) { pub async fn listen(&self) {
self.signal_rx.recv().await.unwrap(); self.signal_rx.recv().await.unwrap();
// Drain any pending receives // Drain any pending receives
loop { loop {

View file

@ -184,8 +184,8 @@ impl<'lua> Runtime<'lua> {
loop { loop {
// Wait for a new thread to arrive __or__ next futures step, prioritizing // Wait for a new thread to arrive __or__ next futures step, prioritizing
// new threads, so we don't accidentally exit when there is more work to do // new threads, so we don't accidentally exit when there is more work to do
let fut_spawn = self.queue_spawn.recv(); let fut_spawn = self.queue_spawn.listen();
let fut_defer = self.queue_defer.recv(); let fut_defer = self.queue_defer.listen();
let fut_tick = async { let fut_tick = async {
lua_exec.tick().await; lua_exec.tick().await;
// Do as much work as possible // Do as much work as possible
@ -200,29 +200,32 @@ impl<'lua> Runtime<'lua> {
// If a new thread was spawned onto any queue, // If a new thread was spawned onto any queue,
// we must drain them and schedule on the executor // we must drain them and schedule on the executor
if self.queue_spawn.has_threads() || self.queue_defer.has_threads() { let process_thread = |thread: LuaThread<'lua>, args| {
let mut queued_threads = Vec::new(); // NOTE: Thread may have been cancelled from lua
queued_threads.extend(self.queue_spawn.drain(self.lua).await); // before we got here, so we need to check it again
queued_threads.extend(self.queue_defer.drain(self.lua).await); if thread.status() == LuaThreadStatus::Resumable {
for (thread, args) in queued_threads { let mut stream = thread.clone().into_async::<_, LuaValue>(args);
// NOTE: Thread may have been cancelled from lua lua_exec
// before we got here, so we need to check it again .spawn(async move {
if thread.status() == LuaThreadStatus::Resumable { // Only run stream until first coroutine.yield or completion. We will
let mut stream = thread.clone().into_async::<_, LuaValue>(args); // drop it right away to clear stack space since detached tasks dont drop
lua_exec // until the executor drops https://github.com/smol-rs/smol/issues/294
.spawn(async move { let res = stream.next().await.unwrap();
// Only run stream until first coroutine.yield or completion. We will if let Err(e) = &res {
// drop it right away to clear stack space since detached tasks dont drop self.error_callback.call(e);
// until the executor drops https://github.com/smol-rs/smol/issues/294 }
let res = stream.next().await.unwrap(); // TODO: Figure out how to give this result to caller of spawn_thread/defer_thread
if let Err(e) = &res { })
self.error_callback.call(e); .detach();
}
// TODO: Figure out how to give this result to caller of spawn_thread/defer_thread
})
.detach();
}
} }
};
// Process spawned threads first, then deferred threads
for (thread, args) in self.queue_spawn.drain(self.lua) {
process_thread(thread, args);
}
for (thread, args) in self.queue_defer.drain(self.lua) {
process_thread(thread, args);
} }
// Empty executor = no remaining threads // Empty executor = no remaining threads