From bc6909acd487ce65cc177fe93104acc8c40da09f Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Fri, 26 Jan 2024 09:57:48 +0100 Subject: [PATCH] Use event primitive instead of smol channels in thread queue --- Cargo.lock | 1 + Cargo.toml | 1 + lib/queue.rs | 30 ++++++++++-------------------- lib/runtime.rs | 26 ++++++++------------------ 4 files changed, 20 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 923e87f..303c4d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -518,6 +518,7 @@ name = "smol-mlua" version = "0.0.0" dependencies = [ "concurrent-queue", + "event-listener 4.0.3", "mlua", "smol", ] diff --git a/Cargo.toml b/Cargo.toml index 633eedb..8e283f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] concurrent-queue = "2.4" +event-listener = "4.0" smol = "2.0" mlua = { version = "0.9", features = ["luau", "luau-jit", "async"] } diff --git a/lib/queue.rs b/lib/queue.rs index b2075dc..56ace40 100644 --- a/lib/queue.rs +++ b/lib/queue.rs @@ -1,8 +1,8 @@ use std::sync::Arc; use concurrent_queue::ConcurrentQueue; +use event_listener::Event; use mlua::prelude::*; -use smol::channel::{unbounded, Receiver, Sender}; use crate::IntoLuaThread; @@ -17,22 +17,17 @@ const ERR_OOM: &str = "out of memory"; #[derive(Debug, Clone)] pub struct ThreadQueue { queue: Arc>, - signal_tx: Sender<()>, - signal_rx: Receiver<()>, + event: Arc, } impl ThreadQueue { pub fn new() -> Self { let queue = Arc::new(ConcurrentQueue::unbounded()); - let (signal_tx, signal_rx) = unbounded(); - Self { - queue, - signal_tx, - signal_rx, - } + let event = Arc::new(Event::new()); + Self { queue, event } } - pub fn push<'lua>( + pub fn push_item<'lua>( &self, lua: &'lua Lua, thread: impl IntoLuaThread<'lua>, @@ -43,12 +38,12 @@ impl ThreadQueue { let stored = ThreadWithArgs::new(lua, thread, args); self.queue.push(stored).unwrap(); - self.signal_tx.try_send(()).unwrap(); + self.event.notify(usize::MAX); Ok(()) } - pub fn drain<'outer, 'lua>( + pub fn drain_items<'outer, 'lua>( &'outer self, lua: &'lua Lua, ) -> impl Iterator, LuaMultiValue<'lua>)> + 'outer @@ -58,14 +53,9 @@ impl ThreadQueue { self.queue.try_iter().map(|stored| stored.into_inner(lua)) } - pub async fn listen(&self) { - self.signal_rx.recv().await.unwrap(); - // Drain any pending receives - loop { - match self.signal_rx.try_recv() { - Ok(_) => continue, - Err(_) => break, - } + pub async fn wait_for_item(&self) { + if self.queue.is_empty() { + self.event.listen().await; } } } diff --git a/lib/runtime.rs b/lib/runtime.rs index d27f902..b049481 100644 --- a/lib/runtime.rs +++ b/lib/runtime.rs @@ -67,12 +67,7 @@ impl<'lua> Runtime<'lua> { thread: impl IntoLuaThread<'lua>, args: impl IntoLuaMulti<'lua>, ) -> LuaResult<()> { - let thread = thread.into_lua_thread(self.lua)?; - let args = args.into_lua_multi(self.lua)?; - - self.queue_spawn.push(self.lua, thread, args)?; - - Ok(()) + self.queue_spawn.push_item(self.lua, thread, args) } /** @@ -87,12 +82,7 @@ impl<'lua> Runtime<'lua> { thread: impl IntoLuaThread<'lua>, args: impl IntoLuaMulti<'lua>, ) -> LuaResult<()> { - let thread = thread.into_lua_thread(self.lua)?; - let args = args.into_lua_multi(self.lua)?; - - self.queue_defer.push(self.lua, thread, args)?; - - Ok(()) + self.queue_defer.push_item(self.lua, thread, args) } /** @@ -115,7 +105,7 @@ impl<'lua> Runtime<'lua> { .map(|l| l == Lua::poll_pending()) .unwrap_or_default() { - spawn_queue.push(lua, &thread, args)?; + spawn_queue.push_item(lua, &thread, args)?; } } Err(e) => { @@ -141,7 +131,7 @@ impl<'lua> Runtime<'lua> { move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| { let thread = tof.into_thread(lua)?; if thread.status() == LuaThreadStatus::Resumable { - defer_queue.push(lua, &thread, args)?; + defer_queue.push_item(lua, &thread, args)?; } Ok(thread) }, @@ -184,8 +174,8 @@ impl<'lua> Runtime<'lua> { loop { // Wait for a new thread to arrive __or__ next futures step, prioritizing // new threads, so we don't accidentally exit when there is more work to do - let fut_spawn = self.queue_spawn.listen(); - let fut_defer = self.queue_defer.listen(); + let fut_spawn = self.queue_spawn.wait_for_item(); + let fut_defer = self.queue_defer.wait_for_item(); let fut_tick = async { lua_exec.tick().await; // Do as much work as possible @@ -221,10 +211,10 @@ impl<'lua> Runtime<'lua> { }; // Process spawned threads first, then deferred threads - for (thread, args) in self.queue_spawn.drain(self.lua) { + for (thread, args) in self.queue_spawn.drain_items(self.lua) { process_thread(thread, args); } - for (thread, args) in self.queue_defer.drain(self.lua) { + for (thread, args) in self.queue_defer.drain_items(self.lua) { process_thread(thread, args); }