diff --git a/crates/mlua-luau-scheduler/src/queue/event.rs b/crates/mlua-luau-scheduler/src/queue/event.rs new file mode 100644 index 0000000..ad012ce --- /dev/null +++ b/crates/mlua-luau-scheduler/src/queue/event.rs @@ -0,0 +1,91 @@ +use std::{ + cell::{Cell, RefCell}, + future::Future, + mem, + pin::Pin, + rc::Rc, + task::{Context, Poll, Waker}, +}; + +/** + Internal state for queue events. +*/ +#[derive(Debug, Default)] +struct QueueEventState { + generation: Cell, + wakers: RefCell>, +} + +/** + A single-threaded event signal that can be notified multiple times. +*/ +#[derive(Debug, Clone, Default)] +pub(crate) struct QueueEvent { + state: Rc, +} + +impl QueueEvent { + /** + Creates a new event. + */ + pub fn new() -> Self { + Self::default() + } + + /** + Notifies all waiting listeners. + */ + pub fn notify(&self) { + self.state.generation.set(self.state.generation.get() + 1); + + let wakers = { + let mut wakers = self.state.wakers.borrow_mut(); + mem::take(&mut *wakers) + }; + + for waker in wakers { + waker.wake(); + } + } + + /** + Creates a listener that implements `Future` and resolves when `notify` is called. + */ + pub fn listen(&self) -> QueueListener { + QueueListener { + state: self.state.clone(), + generation: self.state.generation.get(), + } + } +} + +/** + A listener future that resolves when the corresponding [`QueueEvent`] is notified. +*/ +#[derive(Debug)] +pub(crate) struct QueueListener { + state: Rc, + generation: u64, +} + +impl Future for QueueListener { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Check if notify was called (generation is more recent) + let current = self.state.generation.get(); + if current > self.generation { + self.get_mut().generation = current; + return Poll::Ready(()); + } + + // No notification observed yet + let mut wakers = self.state.wakers.borrow_mut(); + if !wakers.iter().any(|w| w.will_wake(cx.waker())) { + wakers.push(cx.waker().clone()); + } + Poll::Pending + } +} + +impl Unpin for QueueListener {} diff --git a/crates/mlua-luau-scheduler/src/queue/generic.rs b/crates/mlua-luau-scheduler/src/queue/generic.rs index c9917ce..b069e8f 100644 --- a/crates/mlua-luau-scheduler/src/queue/generic.rs +++ b/crates/mlua-luau-scheduler/src/queue/generic.rs @@ -1,21 +1,22 @@ use std::rc::Rc; use concurrent_queue::ConcurrentQueue; -use event_listener::Event; use mlua::prelude::*; -use crate::{traits::IntoLuaThread, ThreadId}; +use crate::{threads::ThreadId, traits::IntoLuaThread}; + +use super::event::QueueEvent; #[derive(Debug)] struct ThreadQueueInner { queue: ConcurrentQueue<(LuaThread, LuaMultiValue)>, - event: Event, + event: QueueEvent, } impl ThreadQueueInner { fn new() -> Self { let queue = ConcurrentQueue::unbounded(); - let event = Event::new(); + let event = QueueEvent::new(); Self { queue, event } } } @@ -50,7 +51,7 @@ impl ThreadQueue { let id = ThreadId::from(&thread); let _ = self.inner.queue.push((thread, args)); - self.inner.event.notify(usize::MAX); + self.inner.event.notify(); Ok(id) } diff --git a/crates/mlua-luau-scheduler/src/queue/mod.rs b/crates/mlua-luau-scheduler/src/queue/mod.rs index d81648f..21d6e10 100644 --- a/crates/mlua-luau-scheduler/src/queue/mod.rs +++ b/crates/mlua-luau-scheduler/src/queue/mod.rs @@ -1,4 +1,5 @@ mod deferred; +mod event; mod futures; mod generic; mod spawned;