From 7fd390deadda070d3a286b736c6a5464c594988c Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Wed, 30 Apr 2025 13:53:43 +0200 Subject: [PATCH] Organize queue-related files a bit better --- crates/mlua-luau-scheduler/src/queue.rs | 188 ------------------ .../mlua-luau-scheduler/src/queue/deferred.rs | 28 +++ .../mlua-luau-scheduler/src/queue/futures.rs | 56 ++++++ .../mlua-luau-scheduler/src/queue/generic.rs | 79 ++++++++ crates/mlua-luau-scheduler/src/queue/mod.rs | 8 + .../mlua-luau-scheduler/src/queue/spawned.rs | 28 +++ 6 files changed, 199 insertions(+), 188 deletions(-) delete mode 100644 crates/mlua-luau-scheduler/src/queue.rs create mode 100644 crates/mlua-luau-scheduler/src/queue/deferred.rs create mode 100644 crates/mlua-luau-scheduler/src/queue/futures.rs create mode 100644 crates/mlua-luau-scheduler/src/queue/generic.rs create mode 100644 crates/mlua-luau-scheduler/src/queue/mod.rs create mode 100644 crates/mlua-luau-scheduler/src/queue/spawned.rs diff --git a/crates/mlua-luau-scheduler/src/queue.rs b/crates/mlua-luau-scheduler/src/queue.rs deleted file mode 100644 index 4f22f28..0000000 --- a/crates/mlua-luau-scheduler/src/queue.rs +++ /dev/null @@ -1,188 +0,0 @@ -use std::{ - ops::{Deref, DerefMut}, - pin::Pin, - rc::Rc, -}; - -use concurrent_queue::ConcurrentQueue; -use event_listener::Event; -use futures_lite::{Future, FutureExt}; -use mlua::prelude::*; - -use crate::{traits::IntoLuaThread, ThreadId}; - -/** - Queue for storing [`LuaThread`]s with associated arguments. - - Provides methods for pushing and draining the queue, as - well as listening for new items being pushed to the queue. -*/ -#[derive(Debug, Clone)] -pub(crate) struct ThreadQueue { - inner: Rc, -} - -impl ThreadQueue { - pub fn new() -> Self { - let inner = Rc::new(ThreadQueueInner::new()); - Self { inner } - } - - pub fn push_item( - &self, - lua: &Lua, - thread: impl IntoLuaThread, - args: impl IntoLuaMulti, - ) -> LuaResult { - let thread = thread.into_lua_thread(lua)?; - let args = args.into_lua_multi(lua)?; - - tracing::trace!("pushing item to queue with {} args", args.len()); - let id = ThreadId::from(&thread); - - let _ = self.inner.queue.push((thread, args)); - self.inner.event.notify(usize::MAX); - - Ok(id) - } - - #[inline] - pub fn drain_items(&self) -> impl Iterator + '_ { - self.inner.queue.try_iter() - } - - #[inline] - pub async fn wait_for_item(&self) { - if self.inner.queue.is_empty() { - let listener = self.inner.event.listen(); - // NOTE: Need to check again, we could have gotten - // new queued items while creating our listener - if self.inner.queue.is_empty() { - listener.await; - } - } - } - - #[inline] - pub fn is_empty(&self) -> bool { - self.inner.queue.is_empty() - } -} - -/** - Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. -*/ -#[derive(Debug, Clone)] -pub(crate) struct SpawnedThreadQueue(ThreadQueue); - -impl SpawnedThreadQueue { - pub fn new() -> Self { - Self(ThreadQueue::new()) - } -} - -impl Deref for SpawnedThreadQueue { - type Target = ThreadQueue; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for SpawnedThreadQueue { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/** - Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. -*/ -#[derive(Debug, Clone)] -pub(crate) struct DeferredThreadQueue(ThreadQueue); - -impl DeferredThreadQueue { - pub fn new() -> Self { - Self(ThreadQueue::new()) - } -} - -impl Deref for DeferredThreadQueue { - type Target = ThreadQueue; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for DeferredThreadQueue { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -pub type LocalBoxFuture<'fut> = Pin + 'fut>>; - -/** - Queue for storing local futures. - - Provides methods for pushing and draining the queue, as - well as listening for new items being pushed to the queue. -*/ -#[derive(Debug, Clone)] -pub(crate) struct FuturesQueue<'fut> { - inner: Rc>, -} - -impl<'fut> FuturesQueue<'fut> { - pub fn new() -> Self { - let inner = Rc::new(FuturesQueueInner::new()); - Self { inner } - } - - pub fn push_item(&self, fut: impl Future + 'fut) { - let _ = self.inner.queue.push(fut.boxed_local()); - self.inner.event.notify(usize::MAX); - } - - pub fn drain_items<'outer>( - &'outer self, - ) -> impl Iterator> + 'outer { - self.inner.queue.try_iter() - } - - pub async fn wait_for_item(&self) { - if self.inner.queue.is_empty() { - self.inner.event.listen().await; - } - } -} - -// Inner structs without ref counting so that outer structs -// have only a single ref counter for extremely cheap clones - -#[derive(Debug)] -struct ThreadQueueInner { - queue: ConcurrentQueue<(LuaThread, LuaMultiValue)>, - event: Event, -} - -impl ThreadQueueInner { - fn new() -> Self { - let queue = ConcurrentQueue::unbounded(); - let event = Event::new(); - Self { queue, event } - } -} - -#[derive(Debug)] -struct FuturesQueueInner<'fut> { - queue: ConcurrentQueue>, - event: Event, -} - -impl FuturesQueueInner<'_> { - pub fn new() -> Self { - let queue = ConcurrentQueue::unbounded(); - let event = Event::new(); - Self { queue, event } - } -} diff --git a/crates/mlua-luau-scheduler/src/queue/deferred.rs b/crates/mlua-luau-scheduler/src/queue/deferred.rs new file mode 100644 index 0000000..f2ad9e2 --- /dev/null +++ b/crates/mlua-luau-scheduler/src/queue/deferred.rs @@ -0,0 +1,28 @@ +use std::ops::{Deref, DerefMut}; + +use super::generic::ThreadQueue; + +/** + Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. +*/ +#[derive(Debug, Clone)] +pub(crate) struct DeferredThreadQueue(ThreadQueue); + +impl DeferredThreadQueue { + pub fn new() -> Self { + Self(ThreadQueue::new()) + } +} + +impl Deref for DeferredThreadQueue { + type Target = ThreadQueue; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DeferredThreadQueue { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} diff --git a/crates/mlua-luau-scheduler/src/queue/futures.rs b/crates/mlua-luau-scheduler/src/queue/futures.rs new file mode 100644 index 0000000..eab6b1c --- /dev/null +++ b/crates/mlua-luau-scheduler/src/queue/futures.rs @@ -0,0 +1,56 @@ +use std::{pin::Pin, rc::Rc}; + +use concurrent_queue::ConcurrentQueue; +use event_listener::Event; +use futures_lite::{Future, FutureExt}; + +pub type LocalBoxFuture<'fut> = Pin + 'fut>>; + +#[derive(Debug)] +struct FuturesQueueInner<'fut> { + queue: ConcurrentQueue>, + event: Event, +} + +impl FuturesQueueInner<'_> { + pub fn new() -> Self { + let queue = ConcurrentQueue::unbounded(); + let event = Event::new(); + Self { queue, event } + } +} + +/** + Queue for storing local futures. + + Provides methods for pushing and draining the queue, as + well as listening for new items being pushed to the queue. +*/ +#[derive(Debug, Clone)] +pub(crate) struct FuturesQueue<'fut> { + inner: Rc>, +} + +impl<'fut> FuturesQueue<'fut> { + pub fn new() -> Self { + let inner = Rc::new(FuturesQueueInner::new()); + Self { inner } + } + + pub fn push_item(&self, fut: impl Future + 'fut) { + let _ = self.inner.queue.push(fut.boxed_local()); + self.inner.event.notify(usize::MAX); + } + + pub fn drain_items<'outer>( + &'outer self, + ) -> impl Iterator> + 'outer { + self.inner.queue.try_iter() + } + + pub async fn wait_for_item(&self) { + if self.inner.queue.is_empty() { + self.inner.event.listen().await; + } + } +} diff --git a/crates/mlua-luau-scheduler/src/queue/generic.rs b/crates/mlua-luau-scheduler/src/queue/generic.rs new file mode 100644 index 0000000..c9917ce --- /dev/null +++ b/crates/mlua-luau-scheduler/src/queue/generic.rs @@ -0,0 +1,79 @@ +use std::rc::Rc; + +use concurrent_queue::ConcurrentQueue; +use event_listener::Event; +use mlua::prelude::*; + +use crate::{traits::IntoLuaThread, ThreadId}; + +#[derive(Debug)] +struct ThreadQueueInner { + queue: ConcurrentQueue<(LuaThread, LuaMultiValue)>, + event: Event, +} + +impl ThreadQueueInner { + fn new() -> Self { + let queue = ConcurrentQueue::unbounded(); + let event = Event::new(); + Self { queue, event } + } +} + +/** + Queue for storing [`LuaThread`]s with associated arguments. + + Provides methods for pushing and draining the queue, as + well as listening for new items being pushed to the queue. +*/ +#[derive(Debug, Clone)] +pub(crate) struct ThreadQueue { + inner: Rc, +} + +impl ThreadQueue { + pub fn new() -> Self { + let inner = Rc::new(ThreadQueueInner::new()); + Self { inner } + } + + pub fn push_item( + &self, + lua: &Lua, + thread: impl IntoLuaThread, + args: impl IntoLuaMulti, + ) -> LuaResult { + let thread = thread.into_lua_thread(lua)?; + let args = args.into_lua_multi(lua)?; + + tracing::trace!("pushing item to queue with {} args", args.len()); + let id = ThreadId::from(&thread); + + let _ = self.inner.queue.push((thread, args)); + self.inner.event.notify(usize::MAX); + + Ok(id) + } + + #[inline] + pub fn drain_items(&self) -> impl Iterator + '_ { + self.inner.queue.try_iter() + } + + #[inline] + pub async fn wait_for_item(&self) { + if self.inner.queue.is_empty() { + let listener = self.inner.event.listen(); + // NOTE: Need to check again, we could have gotten + // new queued items while creating our listener + if self.inner.queue.is_empty() { + listener.await; + } + } + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.inner.queue.is_empty() + } +} diff --git a/crates/mlua-luau-scheduler/src/queue/mod.rs b/crates/mlua-luau-scheduler/src/queue/mod.rs new file mode 100644 index 0000000..d81648f --- /dev/null +++ b/crates/mlua-luau-scheduler/src/queue/mod.rs @@ -0,0 +1,8 @@ +mod deferred; +mod futures; +mod generic; +mod spawned; + +pub(crate) use self::deferred::DeferredThreadQueue; +pub(crate) use self::futures::FuturesQueue; +pub(crate) use self::spawned::SpawnedThreadQueue; diff --git a/crates/mlua-luau-scheduler/src/queue/spawned.rs b/crates/mlua-luau-scheduler/src/queue/spawned.rs new file mode 100644 index 0000000..5c918a5 --- /dev/null +++ b/crates/mlua-luau-scheduler/src/queue/spawned.rs @@ -0,0 +1,28 @@ +use std::ops::{Deref, DerefMut}; + +use super::generic::ThreadQueue; + +/** + Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. +*/ +#[derive(Debug, Clone)] +pub(crate) struct SpawnedThreadQueue(ThreadQueue); + +impl SpawnedThreadQueue { + pub fn new() -> Self { + Self(ThreadQueue::new()) + } +} + +impl Deref for SpawnedThreadQueue { + type Target = ThreadQueue; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for SpawnedThreadQueue { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +}