Optimize thread queue storage for spawned and deferred threads

This commit is contained in:
Filip Tibell 2025-04-30 14:21:48 +02:00
parent 461ca24c33
commit 4c2bbcf425
No known key found for this signature in database
4 changed files with 42 additions and 23 deletions

View file

@ -1,6 +1,6 @@
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use super::generic::ThreadQueue; use super::threads::ThreadQueue;
/** /**
Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. Alias for [`ThreadQueue`], providing a newtype to store in Lua app data.

View file

@ -1,8 +1,8 @@
mod deferred; mod deferred;
mod event; mod event;
mod futures; mod futures;
mod generic;
mod spawned; mod spawned;
mod threads;
pub(crate) use self::deferred::DeferredThreadQueue; pub(crate) use self::deferred::DeferredThreadQueue;
pub(crate) use self::futures::FuturesQueue; pub(crate) use self::futures::FuturesQueue;

View file

@ -1,6 +1,6 @@
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use super::generic::ThreadQueue; use super::threads::ThreadQueue;
/** /**
Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. Alias for [`ThreadQueue`], providing a newtype to store in Lua app data.

View file

@ -1,6 +1,7 @@
use std::rc::Rc; #![allow(clippy::inline_always)]
use std::{cell::RefCell, collections::VecDeque, rc::Rc};
use concurrent_queue::ConcurrentQueue;
use mlua::prelude::*; use mlua::prelude::*;
use crate::{threads::ThreadId, traits::IntoLuaThread}; use crate::{threads::ThreadId, traits::IntoLuaThread};
@ -9,15 +10,16 @@ use super::event::QueueEvent;
#[derive(Debug)] #[derive(Debug)]
struct ThreadQueueInner { struct ThreadQueueInner {
queue: ConcurrentQueue<(LuaThread, LuaMultiValue)>, queue: RefCell<VecDeque<(LuaThread, LuaMultiValue)>>,
event: QueueEvent, event: QueueEvent,
} }
impl ThreadQueueInner { impl ThreadQueueInner {
fn new() -> Self { fn new() -> Self {
let queue = ConcurrentQueue::unbounded(); Self {
let event = QueueEvent::new(); queue: RefCell::new(VecDeque::new()),
Self { queue, event } event: QueueEvent::new(),
}
} }
} }
@ -50,31 +52,48 @@ impl ThreadQueue {
tracing::trace!("pushing item to queue with {} args", args.len()); tracing::trace!("pushing item to queue with {} args", args.len());
let id = ThreadId::from(&thread); let id = ThreadId::from(&thread);
let _ = self.inner.queue.push((thread, args)); self.inner.queue.borrow_mut().push_back((thread, args));
self.inner.event.notify(); self.inner.event.notify();
Ok(id) Ok(id)
} }
#[inline] #[inline(always)]
pub fn drain_items(&self) -> impl Iterator<Item = (LuaThread, LuaMultiValue)> + '_ { pub fn drain_items(&self) -> ThreadQueueDrain<'_> {
self.inner.queue.try_iter() ThreadQueueDrain::new(self)
} }
#[inline] #[inline(always)]
pub async fn wait_for_item(&self) { pub async fn wait_for_item(&self) {
if self.inner.queue.is_empty() { if self.inner.queue.borrow().is_empty() {
let listener = self.inner.event.listen(); self.inner.event.listen().await;
// NOTE: Need to check again, we could have gotten
// new queued items while creating our listener
if self.inner.queue.is_empty() {
listener.await;
}
} }
} }
#[inline] #[inline(always)]
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.inner.queue.is_empty() self.inner.queue.borrow().is_empty()
}
}
/**
Iterator that drains the thread queue,
popping items from the front first.
*/
pub(crate) struct ThreadQueueDrain<'a> {
queue: &'a ThreadQueue,
}
impl<'a> ThreadQueueDrain<'a> {
pub fn new(queue: &'a ThreadQueue) -> Self {
Self { queue }
}
}
impl Iterator for ThreadQueueDrain<'_> {
type Item = (LuaThread, LuaMultiValue);
fn next(&mut self) -> Option<Self::Item> {
self.queue.inner.queue.borrow_mut().pop_front()
} }
} }