Use event primitive instead of smol channels in thread queue

This commit is contained in:
Filip Tibell 2024-01-26 09:57:48 +01:00
parent df2747bae3
commit bc6909acd4
No known key found for this signature in database
4 changed files with 20 additions and 38 deletions

1
Cargo.lock generated
View file

@ -518,6 +518,7 @@ name = "smol-mlua"
version = "0.0.0" version = "0.0.0"
dependencies = [ dependencies = [
"concurrent-queue", "concurrent-queue",
"event-listener 4.0.3",
"mlua", "mlua",
"smol", "smol",
] ]

View file

@ -5,6 +5,7 @@ edition = "2021"
[dependencies] [dependencies]
concurrent-queue = "2.4" concurrent-queue = "2.4"
event-listener = "4.0"
smol = "2.0" smol = "2.0"
mlua = { version = "0.9", features = ["luau", "luau-jit", "async"] } mlua = { version = "0.9", features = ["luau", "luau-jit", "async"] }

View file

@ -1,8 +1,8 @@
use std::sync::Arc; use std::sync::Arc;
use concurrent_queue::ConcurrentQueue; use concurrent_queue::ConcurrentQueue;
use event_listener::Event;
use mlua::prelude::*; use mlua::prelude::*;
use smol::channel::{unbounded, Receiver, Sender};
use crate::IntoLuaThread; use crate::IntoLuaThread;
@ -17,22 +17,17 @@ const ERR_OOM: &str = "out of memory";
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ThreadQueue { pub struct ThreadQueue {
queue: Arc<ConcurrentQueue<ThreadWithArgs>>, queue: Arc<ConcurrentQueue<ThreadWithArgs>>,
signal_tx: Sender<()>, event: Arc<Event>,
signal_rx: Receiver<()>,
} }
impl ThreadQueue { impl ThreadQueue {
pub fn new() -> Self { pub fn new() -> Self {
let queue = Arc::new(ConcurrentQueue::unbounded()); let queue = Arc::new(ConcurrentQueue::unbounded());
let (signal_tx, signal_rx) = unbounded(); let event = Arc::new(Event::new());
Self { Self { queue, event }
queue,
signal_tx,
signal_rx,
}
} }
pub fn push<'lua>( pub fn push_item<'lua>(
&self, &self,
lua: &'lua Lua, lua: &'lua Lua,
thread: impl IntoLuaThread<'lua>, thread: impl IntoLuaThread<'lua>,
@ -43,12 +38,12 @@ impl ThreadQueue {
let stored = ThreadWithArgs::new(lua, thread, args); let stored = ThreadWithArgs::new(lua, thread, args);
self.queue.push(stored).unwrap(); self.queue.push(stored).unwrap();
self.signal_tx.try_send(()).unwrap(); self.event.notify(usize::MAX);
Ok(()) Ok(())
} }
pub fn drain<'outer, 'lua>( pub fn drain_items<'outer, 'lua>(
&'outer self, &'outer self,
lua: &'lua Lua, lua: &'lua Lua,
) -> impl Iterator<Item = (LuaThread<'lua>, LuaMultiValue<'lua>)> + 'outer ) -> impl Iterator<Item = (LuaThread<'lua>, LuaMultiValue<'lua>)> + 'outer
@ -58,14 +53,9 @@ impl ThreadQueue {
self.queue.try_iter().map(|stored| stored.into_inner(lua)) self.queue.try_iter().map(|stored| stored.into_inner(lua))
} }
pub async fn listen(&self) { pub async fn wait_for_item(&self) {
self.signal_rx.recv().await.unwrap(); if self.queue.is_empty() {
// Drain any pending receives self.event.listen().await;
loop {
match self.signal_rx.try_recv() {
Ok(_) => continue,
Err(_) => break,
}
} }
} }
} }

View file

@ -67,12 +67,7 @@ impl<'lua> Runtime<'lua> {
thread: impl IntoLuaThread<'lua>, thread: impl IntoLuaThread<'lua>,
args: impl IntoLuaMulti<'lua>, args: impl IntoLuaMulti<'lua>,
) -> LuaResult<()> { ) -> LuaResult<()> {
let thread = thread.into_lua_thread(self.lua)?; self.queue_spawn.push_item(self.lua, thread, args)
let args = args.into_lua_multi(self.lua)?;
self.queue_spawn.push(self.lua, thread, args)?;
Ok(())
} }
/** /**
@ -87,12 +82,7 @@ impl<'lua> Runtime<'lua> {
thread: impl IntoLuaThread<'lua>, thread: impl IntoLuaThread<'lua>,
args: impl IntoLuaMulti<'lua>, args: impl IntoLuaMulti<'lua>,
) -> LuaResult<()> { ) -> LuaResult<()> {
let thread = thread.into_lua_thread(self.lua)?; self.queue_defer.push_item(self.lua, thread, args)
let args = args.into_lua_multi(self.lua)?;
self.queue_defer.push(self.lua, thread, args)?;
Ok(())
} }
/** /**
@ -115,7 +105,7 @@ impl<'lua> Runtime<'lua> {
.map(|l| l == Lua::poll_pending()) .map(|l| l == Lua::poll_pending())
.unwrap_or_default() .unwrap_or_default()
{ {
spawn_queue.push(lua, &thread, args)?; spawn_queue.push_item(lua, &thread, args)?;
} }
} }
Err(e) => { Err(e) => {
@ -141,7 +131,7 @@ impl<'lua> Runtime<'lua> {
move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| { move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
let thread = tof.into_thread(lua)?; let thread = tof.into_thread(lua)?;
if thread.status() == LuaThreadStatus::Resumable { if thread.status() == LuaThreadStatus::Resumable {
defer_queue.push(lua, &thread, args)?; defer_queue.push_item(lua, &thread, args)?;
} }
Ok(thread) Ok(thread)
}, },
@ -184,8 +174,8 @@ impl<'lua> Runtime<'lua> {
loop { loop {
// Wait for a new thread to arrive __or__ next futures step, prioritizing // Wait for a new thread to arrive __or__ next futures step, prioritizing
// new threads, so we don't accidentally exit when there is more work to do // new threads, so we don't accidentally exit when there is more work to do
let fut_spawn = self.queue_spawn.listen(); let fut_spawn = self.queue_spawn.wait_for_item();
let fut_defer = self.queue_defer.listen(); let fut_defer = self.queue_defer.wait_for_item();
let fut_tick = async { let fut_tick = async {
lua_exec.tick().await; lua_exec.tick().await;
// Do as much work as possible // Do as much work as possible
@ -221,10 +211,10 @@ impl<'lua> Runtime<'lua> {
}; };
// Process spawned threads first, then deferred threads // Process spawned threads first, then deferred threads
for (thread, args) in self.queue_spawn.drain(self.lua) { for (thread, args) in self.queue_spawn.drain_items(self.lua) {
process_thread(thread, args); process_thread(thread, args);
} }
for (thread, args) in self.queue_defer.drain(self.lua) { for (thread, args) in self.queue_defer.drain_items(self.lua) {
process_thread(thread, args); process_thread(thread, args);
} }