diff --git a/src/thread_runtime.rs b/src/thread_runtime.rs index 70365a6..cee3dbf 100644 --- a/src/thread_runtime.rs +++ b/src/thread_runtime.rs @@ -3,7 +3,7 @@ use std::{cell::Cell, rc::Rc}; use mlua::prelude::*; use smol::{ channel::{Receiver, Sender}, - future::FutureExt, + future::{yield_now, FutureExt}, lock::Mutex, stream::StreamExt, *, @@ -34,6 +34,16 @@ impl ThreadRuntime { let queue_defer = Rc::new(Mutex::new(Vec::new())); let (tx, rx) = channel::unbounded(); + // HACK: Extract mlua "pending" constant value and store it + let pending = lua + .create_async_function(|_, ()| async move { + yield_now().await; + Ok(()) + })? + .into_lua_thread(lua)? + .resume::<_, LuaValue>(())?; + let pending_key = lua.create_registry_value(pending)?; + // Create spawn function (push to start of queue) let b_spawn = Rc::clone(&queue_status); let q_spawn = Rc::clone(&queue_spawn); @@ -42,12 +52,26 @@ impl ThreadRuntime { move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| { let thread = tof.into_thread(lua)?; if thread.status() == LuaThreadStatus::Resumable { - let stored = ThreadWithArgs::new(lua, thread.clone(), args); - q_spawn.lock_blocking().push(stored); - b_spawn.replace(true); - tx_spawn.try_send(()).map_err(|_| { - LuaError::runtime("Tried to spawn thread to a dropped queue") - })?; + // HACK: We need to resume the thread once instantly for correct behavior, + // and only if we get the pending value back we can spawn to async executor + let pending: LuaValue = lua.registry_value(&pending_key)?; + match thread.resume::<_, LuaValue>(args.clone()) { + Err(e) => { + eprintln!("{:?}", e); + // TODO: Forward error + } + Ok(v) if v == pending => { + let stored = ThreadWithArgs::new(lua, thread.clone(), args); + q_spawn.lock_blocking().push(stored); + b_spawn.replace(true); + tx_spawn.try_send(()).map_err(|_| { + LuaError::runtime("Tried to spawn thread to a dropped queue") + })?; + } + Ok(_) => { + // TODO: Forward value + } + } Ok(thread) } else { Err(LuaError::runtime("Tried to spawn non-resumable thread")) @@ -133,6 +157,12 @@ impl ThreadRuntime { }; let fut_tick = async { lua_exec.tick().await; + // Do as much work as possible + loop { + if !lua_exec.try_tick() { + break; + } + } }; fut_recv.or(fut_tick).await;