Implement optimized event listener for thread queues

This commit is contained in:
Filip Tibell 2025-04-30 14:10:35 +02:00
parent 7fd390dead
commit 461ca24c33
No known key found for this signature in database
3 changed files with 98 additions and 5 deletions

View file

@ -0,0 +1,91 @@
use std::{
cell::{Cell, RefCell},
future::Future,
mem,
pin::Pin,
rc::Rc,
task::{Context, Poll, Waker},
};
/**
Internal state for queue events.
*/
#[derive(Debug, Default)]
struct QueueEventState {
generation: Cell<u64>,
wakers: RefCell<Vec<Waker>>,
}
/**
A single-threaded event signal that can be notified multiple times.
*/
#[derive(Debug, Clone, Default)]
pub(crate) struct QueueEvent {
state: Rc<QueueEventState>,
}
impl QueueEvent {
/**
Creates a new event.
*/
pub fn new() -> Self {
Self::default()
}
/**
Notifies all waiting listeners.
*/
pub fn notify(&self) {
self.state.generation.set(self.state.generation.get() + 1);
let wakers = {
let mut wakers = self.state.wakers.borrow_mut();
mem::take(&mut *wakers)
};
for waker in wakers {
waker.wake();
}
}
/**
Creates a listener that implements `Future` and resolves when `notify` is called.
*/
pub fn listen(&self) -> QueueListener {
QueueListener {
state: self.state.clone(),
generation: self.state.generation.get(),
}
}
}
/**
A listener future that resolves when the corresponding [`QueueEvent`] is notified.
*/
#[derive(Debug)]
pub(crate) struct QueueListener {
state: Rc<QueueEventState>,
generation: u64,
}
impl Future for QueueListener {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Check if notify was called (generation is more recent)
let current = self.state.generation.get();
if current > self.generation {
self.get_mut().generation = current;
return Poll::Ready(());
}
// No notification observed yet
let mut wakers = self.state.wakers.borrow_mut();
if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
wakers.push(cx.waker().clone());
}
Poll::Pending
}
}
impl Unpin for QueueListener {}

View file

@ -1,21 +1,22 @@
use std::rc::Rc;
use concurrent_queue::ConcurrentQueue;
use event_listener::Event;
use mlua::prelude::*;
use crate::{traits::IntoLuaThread, ThreadId};
use crate::{threads::ThreadId, traits::IntoLuaThread};
use super::event::QueueEvent;
#[derive(Debug)]
struct ThreadQueueInner {
queue: ConcurrentQueue<(LuaThread, LuaMultiValue)>,
event: Event,
event: QueueEvent,
}
impl ThreadQueueInner {
fn new() -> Self {
let queue = ConcurrentQueue::unbounded();
let event = Event::new();
let event = QueueEvent::new();
Self { queue, event }
}
}
@ -50,7 +51,7 @@ impl ThreadQueue {
let id = ThreadId::from(&thread);
let _ = self.inner.queue.push((thread, args));
self.inner.event.notify(usize::MAX);
self.inner.event.notify();
Ok(id)
}

View file

@ -1,4 +1,5 @@
mod deferred;
mod event;
mod futures;
mod generic;
mod spawned;