diff --git a/crates/mlua-luau-scheduler/src/queue/futures.rs b/crates/mlua-luau-scheduler/src/queue/futures.rs index eab6b1c..9edab7e 100644 --- a/crates/mlua-luau-scheduler/src/queue/futures.rs +++ b/crates/mlua-luau-scheduler/src/queue/futures.rs @@ -1,22 +1,22 @@ -use std::{pin::Pin, rc::Rc}; +use std::{cell::RefCell, mem, pin::Pin, rc::Rc}; -use concurrent_queue::ConcurrentQueue; -use event_listener::Event; -use futures_lite::{Future, FutureExt}; +use futures_lite::prelude::*; + +use super::event::QueueEvent; pub type LocalBoxFuture<'fut> = Pin + 'fut>>; -#[derive(Debug)] struct FuturesQueueInner<'fut> { - queue: ConcurrentQueue>, - event: Event, + queue: RefCell>>, + event: QueueEvent, } impl FuturesQueueInner<'_> { pub fn new() -> Self { - let queue = ConcurrentQueue::unbounded(); - let event = Event::new(); - Self { queue, event } + Self { + queue: RefCell::new(Vec::new()), + event: QueueEvent::new(), + } } } @@ -26,7 +26,7 @@ impl FuturesQueueInner<'_> { Provides methods for pushing and draining the queue, as well as listening for new items being pushed to the queue. */ -#[derive(Debug, Clone)] +#[derive(Clone)] pub(crate) struct FuturesQueue<'fut> { inner: Rc>, } @@ -38,18 +38,17 @@ impl<'fut> FuturesQueue<'fut> { } pub fn push_item(&self, fut: impl Future + 'fut) { - let _ = self.inner.queue.push(fut.boxed_local()); - self.inner.event.notify(usize::MAX); + self.inner.queue.borrow_mut().push(fut.boxed_local()); + self.inner.event.notify(); } - pub fn drain_items<'outer>( - &'outer self, - ) -> impl Iterator> + 'outer { - self.inner.queue.try_iter() + pub fn take_items(&self) -> Vec> { + let mut queue = self.inner.queue.borrow_mut(); + mem::take(&mut *queue) } pub async fn wait_for_item(&self) { - if self.inner.queue.is_empty() { + if self.inner.queue.borrow().is_empty() { self.inner.event.listen().await; } } diff --git a/crates/mlua-luau-scheduler/src/queue/threads.rs b/crates/mlua-luau-scheduler/src/queue/threads.rs index 6c3f100..122d196 100644 --- a/crates/mlua-luau-scheduler/src/queue/threads.rs +++ b/crates/mlua-luau-scheduler/src/queue/threads.rs @@ -1,6 +1,6 @@ #![allow(clippy::inline_always)] -use std::{cell::RefCell, collections::VecDeque, rc::Rc}; +use std::{cell::RefCell, mem, rc::Rc}; use mlua::prelude::*; @@ -10,14 +10,14 @@ use super::event::QueueEvent; #[derive(Debug)] struct ThreadQueueInner { - queue: RefCell>, + queue: RefCell>, event: QueueEvent, } impl ThreadQueueInner { fn new() -> Self { Self { - queue: RefCell::new(VecDeque::new()), + queue: RefCell::new(Vec::new()), event: QueueEvent::new(), } } @@ -52,15 +52,16 @@ impl ThreadQueue { tracing::trace!("pushing item to queue with {} args", args.len()); let id = ThreadId::from(&thread); - self.inner.queue.borrow_mut().push_back((thread, args)); + self.inner.queue.borrow_mut().push((thread, args)); self.inner.event.notify(); Ok(id) } #[inline(always)] - pub fn drain_items(&self) -> ThreadQueueDrain<'_> { - ThreadQueueDrain::new(self) + pub fn take_items(&self) -> Vec<(LuaThread, LuaMultiValue)> { + let mut queue = self.inner.queue.borrow_mut(); + mem::take(&mut *queue) } #[inline(always)] @@ -75,25 +76,3 @@ impl ThreadQueue { self.inner.queue.borrow().is_empty() } } - -/** - Iterator that drains the thread queue, - popping items from the front first. -*/ -pub(crate) struct ThreadQueueDrain<'a> { - queue: &'a ThreadQueue, -} - -impl<'a> ThreadQueueDrain<'a> { - pub fn new(queue: &'a ThreadQueue) -> Self { - Self { queue } - } -} - -impl Iterator for ThreadQueueDrain<'_> { - type Item = (LuaThread, LuaMultiValue); - - fn next(&mut self) -> Option { - self.queue.inner.queue.borrow_mut().pop_front() - } -} diff --git a/crates/mlua-luau-scheduler/src/scheduler.rs b/crates/mlua-luau-scheduler/src/scheduler.rs index 00bd2fd..5191a8b 100644 --- a/crates/mlua-luau-scheduler/src/scheduler.rs +++ b/crates/mlua-luau-scheduler/src/scheduler.rs @@ -396,21 +396,21 @@ impl Scheduler { let mut num_futures = 0; { let _span = trace_span!("Scheduler::drain_spawned").entered(); - for (thread, args) in self.queue_spawn.drain_items() { + for (thread, args) in self.queue_spawn.take_items() { process_thread(thread, args); num_spawned += 1; } } { let _span = trace_span!("Scheduler::drain_deferred").entered(); - for (thread, args) in self.queue_defer.drain_items() { + for (thread, args) in self.queue_defer.take_items() { process_thread(thread, args); num_deferred += 1; } } { let _span = trace_span!("Scheduler::drain_futures").entered(); - for fut in fut_queue.drain_items() { + for fut in fut_queue.take_items() { local_exec.spawn(fut).detach(); num_futures += 1; }