From 4c2bbcf425e7e5116173a031bc7eac6f959cfd11 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Wed, 30 Apr 2025 14:21:48 +0200 Subject: [PATCH] Optimize thread queue storage for spawned and deferred threads --- .../mlua-luau-scheduler/src/queue/deferred.rs | 2 +- crates/mlua-luau-scheduler/src/queue/mod.rs | 2 +- .../mlua-luau-scheduler/src/queue/spawned.rs | 2 +- .../src/queue/{generic.rs => threads.rs} | 59 ++++++++++++------- 4 files changed, 42 insertions(+), 23 deletions(-) rename crates/mlua-luau-scheduler/src/queue/{generic.rs => threads.rs} (52%) diff --git a/crates/mlua-luau-scheduler/src/queue/deferred.rs b/crates/mlua-luau-scheduler/src/queue/deferred.rs index f2ad9e2..dbdb536 100644 --- a/crates/mlua-luau-scheduler/src/queue/deferred.rs +++ b/crates/mlua-luau-scheduler/src/queue/deferred.rs @@ -1,6 +1,6 @@ use std::ops::{Deref, DerefMut}; -use super::generic::ThreadQueue; +use super::threads::ThreadQueue; /** Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. diff --git a/crates/mlua-luau-scheduler/src/queue/mod.rs b/crates/mlua-luau-scheduler/src/queue/mod.rs index 21d6e10..97dc2e5 100644 --- a/crates/mlua-luau-scheduler/src/queue/mod.rs +++ b/crates/mlua-luau-scheduler/src/queue/mod.rs @@ -1,8 +1,8 @@ mod deferred; mod event; mod futures; -mod generic; mod spawned; +mod threads; pub(crate) use self::deferred::DeferredThreadQueue; pub(crate) use self::futures::FuturesQueue; diff --git a/crates/mlua-luau-scheduler/src/queue/spawned.rs b/crates/mlua-luau-scheduler/src/queue/spawned.rs index 5c918a5..2e3bc42 100644 --- a/crates/mlua-luau-scheduler/src/queue/spawned.rs +++ b/crates/mlua-luau-scheduler/src/queue/spawned.rs @@ -1,6 +1,6 @@ use std::ops::{Deref, DerefMut}; -use super::generic::ThreadQueue; +use super::threads::ThreadQueue; /** Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. diff --git a/crates/mlua-luau-scheduler/src/queue/generic.rs b/crates/mlua-luau-scheduler/src/queue/threads.rs similarity index 52% rename from crates/mlua-luau-scheduler/src/queue/generic.rs rename to crates/mlua-luau-scheduler/src/queue/threads.rs index b069e8f..6c3f100 100644 --- a/crates/mlua-luau-scheduler/src/queue/generic.rs +++ b/crates/mlua-luau-scheduler/src/queue/threads.rs @@ -1,6 +1,7 @@ -use std::rc::Rc; +#![allow(clippy::inline_always)] + +use std::{cell::RefCell, collections::VecDeque, rc::Rc}; -use concurrent_queue::ConcurrentQueue; use mlua::prelude::*; use crate::{threads::ThreadId, traits::IntoLuaThread}; @@ -9,15 +10,16 @@ use super::event::QueueEvent; #[derive(Debug)] struct ThreadQueueInner { - queue: ConcurrentQueue<(LuaThread, LuaMultiValue)>, + queue: RefCell>, event: QueueEvent, } impl ThreadQueueInner { fn new() -> Self { - let queue = ConcurrentQueue::unbounded(); - let event = QueueEvent::new(); - Self { queue, event } + Self { + queue: RefCell::new(VecDeque::new()), + event: QueueEvent::new(), + } } } @@ -50,31 +52,48 @@ impl ThreadQueue { tracing::trace!("pushing item to queue with {} args", args.len()); let id = ThreadId::from(&thread); - let _ = self.inner.queue.push((thread, args)); + self.inner.queue.borrow_mut().push_back((thread, args)); self.inner.event.notify(); Ok(id) } - #[inline] - pub fn drain_items(&self) -> impl Iterator + '_ { - self.inner.queue.try_iter() + #[inline(always)] + pub fn drain_items(&self) -> ThreadQueueDrain<'_> { + ThreadQueueDrain::new(self) } - #[inline] + #[inline(always)] pub async fn wait_for_item(&self) { - if self.inner.queue.is_empty() { - let listener = self.inner.event.listen(); - // NOTE: Need to check again, we could have gotten - // new queued items while creating our listener - if self.inner.queue.is_empty() { - listener.await; - } + if self.inner.queue.borrow().is_empty() { + self.inner.event.listen().await; } } - #[inline] + #[inline(always)] pub fn is_empty(&self) -> bool { - self.inner.queue.is_empty() + self.inner.queue.borrow().is_empty() + } +} + +/** + Iterator that drains the thread queue, + popping items from the front first. +*/ +pub(crate) struct ThreadQueueDrain<'a> { + queue: &'a ThreadQueue, +} + +impl<'a> ThreadQueueDrain<'a> { + pub fn new(queue: &'a ThreadQueue) -> Self { + Self { queue } + } +} + +impl Iterator for ThreadQueueDrain<'_> { + type Item = (LuaThread, LuaMultiValue); + + fn next(&mut self) -> Option { + self.queue.inner.queue.borrow_mut().pop_front() } }