From df2747bae31f23b0003819f19dd3740b278fe0a9 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Fri, 26 Jan 2024 09:39:21 +0100 Subject: [PATCH] Use lock-free storage for runtime thread queue --- Cargo.lock | 1 + Cargo.toml | 1 + lib/queue.rs | 40 ++++++++++++++++----------------------- lib/runtime.rs | 51 ++++++++++++++++++++++++++------------------------ 4 files changed, 45 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e174644..923e87f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -517,6 +517,7 @@ dependencies = [ name = "smol-mlua" version = "0.0.0" dependencies = [ + "concurrent-queue", "mlua", "smol", ] diff --git a/Cargo.toml b/Cargo.toml index 660115e..633eedb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.0.0" edition = "2021" [dependencies] +concurrent-queue = "2.4" smol = "2.0" mlua = { version = "0.9", features = ["luau", "luau-jit", "async"] } diff --git a/lib/queue.rs b/lib/queue.rs index 633287e..b2075dc 100644 --- a/lib/queue.rs +++ b/lib/queue.rs @@ -1,13 +1,8 @@ -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; +use std::sync::Arc; +use concurrent_queue::ConcurrentQueue; use mlua::prelude::*; -use smol::{ - channel::{unbounded, Receiver, Sender}, - lock::Mutex, -}; +use smol::channel::{unbounded, Receiver, Sender}; use crate::IntoLuaThread; @@ -21,27 +16,22 @@ const ERR_OOM: &str = "out of memory"; */ #[derive(Debug, Clone)] pub struct ThreadQueue { - queue: Arc>>, - status: Arc, + queue: Arc>, signal_tx: Sender<()>, signal_rx: Receiver<()>, } impl ThreadQueue { pub fn new() -> Self { + let queue = Arc::new(ConcurrentQueue::unbounded()); let (signal_tx, signal_rx) = unbounded(); Self { - queue: Arc::new(Mutex::new(Vec::new())), - status: Arc::new(AtomicBool::new(false)), + queue, signal_tx, signal_rx, } } - pub fn has_threads(&self) -> bool { - self.status.load(Ordering::SeqCst) - } - pub fn push<'lua>( &self, lua: &'lua Lua, @@ -52,21 +42,23 @@ impl ThreadQueue { let args = args.into_lua_multi(lua)?; let stored = ThreadWithArgs::new(lua, thread, args); - self.queue.lock_blocking().push(stored); - self.status.store(true, Ordering::SeqCst); + self.queue.push(stored).unwrap(); self.signal_tx.try_send(()).unwrap(); Ok(()) } - pub async fn drain<'lua>(&self, lua: &'lua Lua) -> Vec<(LuaThread<'lua>, LuaMultiValue<'lua>)> { - let mut queue = self.queue.lock().await; - let drained = queue.drain(..).map(|s| s.into_inner(lua)).collect(); - self.status.store(false, Ordering::SeqCst); - drained + pub fn drain<'outer, 'lua>( + &'outer self, + lua: &'lua Lua, + ) -> impl Iterator, LuaMultiValue<'lua>)> + 'outer + where + 'lua: 'outer, + { + self.queue.try_iter().map(|stored| stored.into_inner(lua)) } - pub async fn recv(&self) { + pub async fn listen(&self) { self.signal_rx.recv().await.unwrap(); // Drain any pending receives loop { diff --git a/lib/runtime.rs b/lib/runtime.rs index a359212..d27f902 100644 --- a/lib/runtime.rs +++ b/lib/runtime.rs @@ -184,8 +184,8 @@ impl<'lua> Runtime<'lua> { loop { // Wait for a new thread to arrive __or__ next futures step, prioritizing // new threads, so we don't accidentally exit when there is more work to do - let fut_spawn = self.queue_spawn.recv(); - let fut_defer = self.queue_defer.recv(); + let fut_spawn = self.queue_spawn.listen(); + let fut_defer = self.queue_defer.listen(); let fut_tick = async { lua_exec.tick().await; // Do as much work as possible @@ -200,29 +200,32 @@ impl<'lua> Runtime<'lua> { // If a new thread was spawned onto any queue, // we must drain them and schedule on the executor - if self.queue_spawn.has_threads() || self.queue_defer.has_threads() { - let mut queued_threads = Vec::new(); - queued_threads.extend(self.queue_spawn.drain(self.lua).await); - queued_threads.extend(self.queue_defer.drain(self.lua).await); - for (thread, args) in queued_threads { - // NOTE: Thread may have been cancelled from lua - // before we got here, so we need to check it again - if thread.status() == LuaThreadStatus::Resumable { - let mut stream = thread.clone().into_async::<_, LuaValue>(args); - lua_exec - .spawn(async move { - // Only run stream until first coroutine.yield or completion. We will - // drop it right away to clear stack space since detached tasks dont drop - // until the executor drops https://github.com/smol-rs/smol/issues/294 - let res = stream.next().await.unwrap(); - if let Err(e) = &res { - self.error_callback.call(e); - } - // TODO: Figure out how to give this result to caller of spawn_thread/defer_thread - }) - .detach(); - } + let process_thread = |thread: LuaThread<'lua>, args| { + // NOTE: Thread may have been cancelled from lua + // before we got here, so we need to check it again + if thread.status() == LuaThreadStatus::Resumable { + let mut stream = thread.clone().into_async::<_, LuaValue>(args); + lua_exec + .spawn(async move { + // Only run stream until first coroutine.yield or completion. We will + // drop it right away to clear stack space since detached tasks dont drop + // until the executor drops https://github.com/smol-rs/smol/issues/294 + let res = stream.next().await.unwrap(); + if let Err(e) = &res { + self.error_callback.call(e); + } + // TODO: Figure out how to give this result to caller of spawn_thread/defer_thread + }) + .detach(); } + }; + + // Process spawned threads first, then deferred threads + for (thread, args) in self.queue_spawn.drain(self.lua) { + process_thread(thread, args); + } + for (thread, args) in self.queue_defer.drain(self.lua) { + process_thread(thread, args); } // Empty executor = no remaining threads