Final optimizations for future and thread queues

This commit is contained in:
Filip Tibell 2025-04-30 14:47:16 +02:00
parent 4c2bbcf425
commit d425d2568a
No known key found for this signature in database
3 changed files with 27 additions and 49 deletions

View file

@ -1,22 +1,22 @@
use std::{pin::Pin, rc::Rc}; use std::{cell::RefCell, mem, pin::Pin, rc::Rc};
use concurrent_queue::ConcurrentQueue; use futures_lite::prelude::*;
use event_listener::Event;
use futures_lite::{Future, FutureExt}; use super::event::QueueEvent;
pub type LocalBoxFuture<'fut> = Pin<Box<dyn Future<Output = ()> + 'fut>>; pub type LocalBoxFuture<'fut> = Pin<Box<dyn Future<Output = ()> + 'fut>>;
#[derive(Debug)]
struct FuturesQueueInner<'fut> { struct FuturesQueueInner<'fut> {
queue: ConcurrentQueue<LocalBoxFuture<'fut>>, queue: RefCell<Vec<LocalBoxFuture<'fut>>>,
event: Event, event: QueueEvent,
} }
impl FuturesQueueInner<'_> { impl FuturesQueueInner<'_> {
pub fn new() -> Self { pub fn new() -> Self {
let queue = ConcurrentQueue::unbounded(); Self {
let event = Event::new(); queue: RefCell::new(Vec::new()),
Self { queue, event } event: QueueEvent::new(),
}
} }
} }
@ -26,7 +26,7 @@ impl FuturesQueueInner<'_> {
Provides methods for pushing and draining the queue, as Provides methods for pushing and draining the queue, as
well as listening for new items being pushed to the queue. well as listening for new items being pushed to the queue.
*/ */
#[derive(Debug, Clone)] #[derive(Clone)]
pub(crate) struct FuturesQueue<'fut> { pub(crate) struct FuturesQueue<'fut> {
inner: Rc<FuturesQueueInner<'fut>>, inner: Rc<FuturesQueueInner<'fut>>,
} }
@ -38,18 +38,17 @@ impl<'fut> FuturesQueue<'fut> {
} }
pub fn push_item(&self, fut: impl Future<Output = ()> + 'fut) { pub fn push_item(&self, fut: impl Future<Output = ()> + 'fut) {
let _ = self.inner.queue.push(fut.boxed_local()); self.inner.queue.borrow_mut().push(fut.boxed_local());
self.inner.event.notify(usize::MAX); self.inner.event.notify();
} }
pub fn drain_items<'outer>( pub fn take_items(&self) -> Vec<LocalBoxFuture<'fut>> {
&'outer self, let mut queue = self.inner.queue.borrow_mut();
) -> impl Iterator<Item = LocalBoxFuture<'fut>> + 'outer { mem::take(&mut *queue)
self.inner.queue.try_iter()
} }
pub async fn wait_for_item(&self) { pub async fn wait_for_item(&self) {
if self.inner.queue.is_empty() { if self.inner.queue.borrow().is_empty() {
self.inner.event.listen().await; self.inner.event.listen().await;
} }
} }

View file

@ -1,6 +1,6 @@
#![allow(clippy::inline_always)] #![allow(clippy::inline_always)]
use std::{cell::RefCell, collections::VecDeque, rc::Rc}; use std::{cell::RefCell, mem, rc::Rc};
use mlua::prelude::*; use mlua::prelude::*;
@ -10,14 +10,14 @@ use super::event::QueueEvent;
#[derive(Debug)] #[derive(Debug)]
struct ThreadQueueInner { struct ThreadQueueInner {
queue: RefCell<VecDeque<(LuaThread, LuaMultiValue)>>, queue: RefCell<Vec<(LuaThread, LuaMultiValue)>>,
event: QueueEvent, event: QueueEvent,
} }
impl ThreadQueueInner { impl ThreadQueueInner {
fn new() -> Self { fn new() -> Self {
Self { Self {
queue: RefCell::new(VecDeque::new()), queue: RefCell::new(Vec::new()),
event: QueueEvent::new(), event: QueueEvent::new(),
} }
} }
@ -52,15 +52,16 @@ impl ThreadQueue {
tracing::trace!("pushing item to queue with {} args", args.len()); tracing::trace!("pushing item to queue with {} args", args.len());
let id = ThreadId::from(&thread); let id = ThreadId::from(&thread);
self.inner.queue.borrow_mut().push_back((thread, args)); self.inner.queue.borrow_mut().push((thread, args));
self.inner.event.notify(); self.inner.event.notify();
Ok(id) Ok(id)
} }
#[inline(always)] #[inline(always)]
pub fn drain_items(&self) -> ThreadQueueDrain<'_> { pub fn take_items(&self) -> Vec<(LuaThread, LuaMultiValue)> {
ThreadQueueDrain::new(self) let mut queue = self.inner.queue.borrow_mut();
mem::take(&mut *queue)
} }
#[inline(always)] #[inline(always)]
@ -75,25 +76,3 @@ impl ThreadQueue {
self.inner.queue.borrow().is_empty() self.inner.queue.borrow().is_empty()
} }
} }
/**
Iterator that drains the thread queue,
popping items from the front first.
*/
pub(crate) struct ThreadQueueDrain<'a> {
queue: &'a ThreadQueue,
}
impl<'a> ThreadQueueDrain<'a> {
pub fn new(queue: &'a ThreadQueue) -> Self {
Self { queue }
}
}
impl Iterator for ThreadQueueDrain<'_> {
type Item = (LuaThread, LuaMultiValue);
fn next(&mut self) -> Option<Self::Item> {
self.queue.inner.queue.borrow_mut().pop_front()
}
}

View file

@ -396,21 +396,21 @@ impl Scheduler {
let mut num_futures = 0; let mut num_futures = 0;
{ {
let _span = trace_span!("Scheduler::drain_spawned").entered(); let _span = trace_span!("Scheduler::drain_spawned").entered();
for (thread, args) in self.queue_spawn.drain_items() { for (thread, args) in self.queue_spawn.take_items() {
process_thread(thread, args); process_thread(thread, args);
num_spawned += 1; num_spawned += 1;
} }
} }
{ {
let _span = trace_span!("Scheduler::drain_deferred").entered(); let _span = trace_span!("Scheduler::drain_deferred").entered();
for (thread, args) in self.queue_defer.drain_items() { for (thread, args) in self.queue_defer.take_items() {
process_thread(thread, args); process_thread(thread, args);
num_deferred += 1; num_deferred += 1;
} }
} }
{ {
let _span = trace_span!("Scheduler::drain_futures").entered(); let _span = trace_span!("Scheduler::drain_futures").entered();
for fut in fut_queue.drain_items() { for fut in fut_queue.take_items() {
local_exec.spawn(fut).detach(); local_exec.spawn(fut).detach();
num_futures += 1; num_futures += 1;
} }