diff --git a/src/main.rs b/src/main.rs
index ebed917..af60383 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -20,7 +20,7 @@ pub fn main() -> LuaResult<()> {
     lua.globals().set(
         "wait",
         lua.create_async_function(|_, duration: Option<f64>| async move {
-            let duration = duration.unwrap_or_default().min(1.0 / 250.0);
+            let duration = duration.unwrap_or_default().max(1.0 / 250.0);
             let before = Instant::now();
             let after = Timer::after(Duration::from_secs_f64(duration)).await;
             Ok((after - before).as_secs_f64())
diff --git a/src/thread_runtime.rs b/src/thread_runtime.rs
index 61da3b5..70365a6 100644
--- a/src/thread_runtime.rs
+++ b/src/thread_runtime.rs
@@ -1,4 +1,4 @@
-use std::{collections::VecDeque, rc::Rc};
+use std::{cell::Cell, rc::Rc};

 use mlua::prelude::*;
 use smol::{
@@ -15,7 +15,9 @@ use super::{
 };

 pub struct ThreadRuntime {
-    queue: Rc<Mutex<VecDeque<ThreadWithArgs>>>,
+    queue_status: Rc<Cell<bool>>,
+    queue_spawn: Rc<Mutex<Vec<ThreadWithArgs>>>,
+    queue_defer: Rc<Mutex<Vec<ThreadWithArgs>>>,
     tx: Sender<()>,
     rx: Receiver<()>,
 }
@@ -27,18 +29,22 @@ impl ThreadRuntime {
         This will inject some functions to interact with the scheduler / executor.
     */
     pub fn new(lua: &Lua) -> LuaResult<Self> {
-        let queue = Rc::new(Mutex::new(VecDeque::new()));
+        let queue_status = Rc::new(Cell::new(false));
+        let queue_spawn = Rc::new(Mutex::new(Vec::new()));
+        let queue_defer = Rc::new(Mutex::new(Vec::new()));
         let (tx, rx) = channel::unbounded();

         // Create spawn function (push to start of queue)
-        let queue_spawn = Rc::clone(&queue);
+        let b_spawn = Rc::clone(&queue_status);
+        let q_spawn = Rc::clone(&queue_spawn);
         let tx_spawn = tx.clone();
         let fn_spawn = lua.create_function(
             move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
                 let thread = tof.into_thread(lua)?;
                 if thread.status() == LuaThreadStatus::Resumable {
                     let stored = ThreadWithArgs::new(lua, thread.clone(), args);
-                    queue_spawn.lock_blocking().push_front(stored);
+                    q_spawn.lock_blocking().push(stored);
+                    b_spawn.replace(true);
                     tx_spawn.try_send(()).map_err(|_| {
                         LuaError::runtime("Tried to spawn thread to a dropped queue")
                     })?;
@@ -50,14 +56,16 @@ impl ThreadRuntime {
         )?;

         // Create defer function (push to end of queue)
-        let queue_defer = Rc::clone(&queue);
+        let b_defer = Rc::clone(&queue_status);
+        let q_defer = Rc::clone(&queue_defer);
         let tx_defer = tx.clone();
         let fn_defer = lua.create_function(
             move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
                 let thread = tof.into_thread(lua)?;
                 if thread.status() == LuaThreadStatus::Resumable {
                     let stored = ThreadWithArgs::new(lua, thread.clone(), args);
-                    queue_defer.lock_blocking().push_back(stored);
+                    q_defer.lock_blocking().push(stored);
+                    b_defer.replace(true);
                     tx_defer.try_send(()).map_err(|_| {
                         LuaError::runtime("Tried to defer thread to a dropped queue")
                     })?;
@@ -73,7 +81,13 @@ impl ThreadRuntime {
         lua.globals().set("spawn", fn_spawn)?;
         lua.globals().set("defer", fn_defer)?;

-        Ok(ThreadRuntime { queue, tx, rx })
+        Ok(ThreadRuntime {
+            queue_status,
+            queue_spawn,
+            queue_defer,
+            tx,
+            rx,
+        })
     }

     /**
@@ -92,7 +106,8 @@ impl ThreadRuntime {

         let stored = ThreadWithArgs::new(lua, thread, args);

-        self.queue.lock_blocking().push_front(stored);
+        self.queue_spawn.lock_blocking().push(stored);
+        self.queue_status.replace(true);
         self.tx.try_send(()).unwrap(); // Unwrap is safe since this struct also holds the receiver
     }

@@ -113,38 +128,43 @@ impl ThreadRuntime {
         loop {
             // Wait for a new thread to arrive __or__ next futures step, prioritizing
             // new threads, so we don't accidentally exit when there is more work to do
-            self.rx
-                .recv()
-                .or(async {
-                    lua_exec.tick().await;
-                    Ok(())
-                })
-                .await
-                .ok();
+            let fut_recv = async {
+                self.rx.recv().await.ok();
+            };
+            let fut_tick = async {
+                lua_exec.tick().await;
+            };
+            fut_recv.or(fut_tick).await;

-            // If a new thread was spawned onto queue, we
-            // must drain it and schedule on the executor
-            for queued_thread in self.queue.lock().await.drain(..) {
-                // NOTE: Thread may have been cancelled from lua
-                // before we got here, so we need to check it again
-                let (thread, args) = queued_thread.into_inner(lua);
-                if thread.status() == LuaThreadStatus::Resumable {
-                    let mut stream = thread.into_async::<_, LuaValue>(args);
-                    lua_exec
-                        .spawn(async move {
-                            // Only run stream until first coroutine.yield or completion,
-                            // this will then get dropped right away and clear stack space
-                            match stream.next().await.unwrap() {
-                                Err(e) => {
-                                    eprintln!("{e}");
-                                    // TODO: Forward error
-                                }
-                                Ok(_) => {
-                                    // TODO: Forward value
-                                }
-                            }
-                        })
-                        .detach();
-                }
+            // If a new thread was spawned onto any queue, we
+            // must drain them and schedule on the executor
+            if self.queue_status.get() {
+                let mut queued_threads = Vec::new();
+                queued_threads.extend(self.queue_spawn.lock().await.drain(..));
+                queued_threads.extend(self.queue_defer.lock().await.drain(..));
+                for queued_thread in queued_threads {
+                    // NOTE: Thread may have been cancelled from lua
+                    // before we got here, so we need to check it again
+                    let (thread, args) = queued_thread.into_inner(lua);
+                    if thread.status() == LuaThreadStatus::Resumable {
+                        let mut stream = thread.into_async::<_, LuaValue>(args);
+                        lua_exec
+                            .spawn(async move {
+                                // Only run stream until first coroutine.yield or completion. We will
+                                // drop it right away to clear stack space since detached tasks don't drop
+                                // until the executor drops https://github.com/smol-rs/smol/issues/294
+                                match stream.next().await.unwrap() {
+                                    Err(e) => {
+                                        eprintln!("{e}");
+                                        // TODO: Forward error
+                                    }
+                                    Ok(_) => {
+                                        // TODO: Forward value
+                                    }
+                                }
+                            })
+                            .detach();
+                    }
+                }
             }
         }
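
The core change above replaces the single shared `VecDeque` with two plain `Vec` queues plus an `Rc<Cell<bool>>` flag: `spawn` and `defer` each push to their own queue and set the flag, and the run loop drains spawned threads before deferred ones. Below is a minimal, dependency-free sketch of that ordering, not part of the patch, using `RefCell` in place of the async `Mutex`; the type and method names are illustrative only, not taken from this crate.

```rust
use std::cell::{Cell, RefCell};
use std::rc::Rc;

// Stand-in for ThreadWithArgs; just carries a label for the demo
#[derive(Debug)]
struct Queued(&'static str);

struct Queues {
    queue_status: Rc<Cell<bool>>,          // "something was pushed" flag
    queue_spawn: Rc<RefCell<Vec<Queued>>>, // spawned threads, drained first
    queue_defer: Rc<RefCell<Vec<Queued>>>, // deferred threads, drained after
}

impl Queues {
    fn new() -> Self {
        Self {
            queue_status: Rc::new(Cell::new(false)),
            queue_spawn: Rc::new(RefCell::new(Vec::new())),
            queue_defer: Rc::new(RefCell::new(Vec::new())),
        }
    }

    fn push_spawn(&self, t: Queued) {
        self.queue_spawn.borrow_mut().push(t);
        self.queue_status.replace(true);
    }

    fn push_defer(&self, t: Queued) {
        self.queue_defer.borrow_mut().push(t);
        self.queue_status.replace(true);
    }

    fn drain(&self) -> Vec<Queued> {
        // Cheap flag check before touching either queue, mirroring the run loop
        if !self.queue_status.get() {
            return Vec::new();
        }
        let mut queued = Vec::new();
        queued.extend(self.queue_spawn.borrow_mut().drain(..));
        queued.extend(self.queue_defer.borrow_mut().drain(..));
        queued
    }
}

fn main() {
    let queues = Queues::new();
    queues.push_defer(Queued("deferred"));
    queues.push_spawn(Queued("spawned"));
    // Spawned work comes out ahead of deferred work even though it was pushed later
    println!("{:?}", queues.drain());
}
```

Running it prints `[Queued("spawned"), Queued("deferred")]`, i.e. spawned work is always scheduled ahead of deferred work within one pass of the loop.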
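The loop also no longer wraps the executor tick in a dummy `Ok(())` just to match the receiver's result type: both sides become futures with output `()` and are raced with `or`, which polls its first argument first and therefore keeps giving newly queued threads priority over ticking the executor. A small standalone sketch of that racing pattern, assuming smol's re-exported `channel`, `Timer`, and `FutureExt` (the future names here are illustrative):

```rust
use std::time::Duration;

use smol::{channel, future::FutureExt, Timer};

fn main() {
    smol::block_on(async {
        let (tx, rx) = channel::unbounded::<()>();
        tx.try_send(()).unwrap();

        // Both futures are shaped to output `()`, so they can be raced directly,
        // with the receiver side polled first and thus preferred when both are ready
        let fut_recv = async {
            rx.recv().await.ok();
        };
        let fut_tick = async {
            Timer::after(Duration::from_millis(10)).await;
        };
        fut_recv.or(fut_tick).await;
    });
}
```

Here the channel side wins because a message is already queued; with an empty channel the timer side would complete the race instead.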