Use async thread as stream, compare to mlua internal pending value

This commit is contained in:
Filip Tibell 2024-01-18 17:58:53 +01:00
parent 710c93d74b
commit cd090d877f
No known key found for this signature in database
2 changed files with 50 additions and 24 deletions

View file

@ -3,16 +3,18 @@ for i = 1, 5 do
local thread = coroutine.running() local thread = coroutine.running()
local counter = 0 local counter = 0
for j = 1, 10_000 do for j = 1, 50_000 do
spawn(function() spawn(function()
wait(0.1 * math.random()) wait(0.1 + 0.1 * math.random())
counter += 1 counter += 1
if counter == 10_000 then if counter == 50_000 then
print("completed iteration " .. tostring(i) .. " of 5") print("completed iteration " .. tostring(i) .. " of 5")
spawn(thread) spawn(thread)
end end
end) end)
end end
wait(0.1 * math.random())
coroutine.yield() coroutine.yield()
end end

View file

@ -3,8 +3,9 @@ use std::{collections::VecDeque, rc::Rc};
use mlua::prelude::*; use mlua::prelude::*;
use smol::{ use smol::{
channel::{Receiver, Sender}, channel::{Receiver, Sender},
future::race, future::{race, yield_now},
lock::Mutex, lock::Mutex,
stream::StreamExt,
*, *,
}; };
@ -14,6 +15,7 @@ use super::{
}; };
pub struct ThreadRuntime { pub struct ThreadRuntime {
pending_key: LuaRegistryKey,
queue: Rc<Mutex<VecDeque<ThreadWithArgs>>>, queue: Rc<Mutex<VecDeque<ThreadWithArgs>>>,
tx: Sender<()>, tx: Sender<()>,
rx: Receiver<()>, rx: Receiver<()>,
@ -72,7 +74,25 @@ impl ThreadRuntime {
lua.globals().set("spawn", fn_spawn)?; lua.globals().set("spawn", fn_spawn)?;
lua.globals().set("defer", fn_defer)?; lua.globals().set("defer", fn_defer)?;
Ok(ThreadRuntime { queue, tx, rx }) // HACK: Extract mlua "pending" constant value
let pending = lua
.create_async_function(|_, ()| async move {
yield_now().await;
Ok(())
})
.unwrap()
.into_lua_thread(lua)
.unwrap()
.resume::<_, LuaValue>(())
.unwrap();
let pending_key = lua.create_registry_value(pending).unwrap();
Ok(ThreadRuntime {
pending_key,
queue,
tx,
rx,
})
} }
/** /**
@ -110,39 +130,43 @@ impl ThreadRuntime {
// executor forward, until all lua threads finish // executor forward, until all lua threads finish
let fut = async { let fut = async {
loop { loop {
let did_spawn = race( race(
// Wait for next futures step... // Wait for next futures step...
async { async {
lua_exec.tick().await; lua_exec.tick().await;
false
}, },
// ...or for a new thread to arrive // ...or for a new thread to arrive
async { async {
self.rx.recv().await.ok(); self.rx.recv().await.ok();
true
}, },
) )
.await; .await;
// If a new thread was spawned onto queue, we // If a new thread was spawned onto queue, we
// must drain it and schedule on the executor // must drain it and schedule on the executor
if did_spawn { for queued_thread in self.queue.lock().await.drain(..) {
let queued_threads = self.queue.lock().await.drain(..).collect::<Vec<_>>(); // NOTE: Thread may have been cancelled from lua
for queued_thread in queued_threads { // before we got here, so we need to check it again
// NOTE: Thread may have been cancelled from lua let (thread, args) = queued_thread.into_inner(lua);
// before we got here, so we need to check it again if thread.status() == LuaThreadStatus::Resumable {
let (thread, args) = queued_thread.into_inner(lua); let pending = lua.registry_value(&self.pending_key).unwrap();
if thread.status() == LuaThreadStatus::Resumable { let mut stream = thread.into_async::<_, LuaValue>(args);
let fut = thread.into_async::<_, ()>(args);
lua_exec // Keep resuming the thread until we get a
.spawn(async move { // value that is not the mlua pending value
match fut.await { let fut = async move {
Ok(()) => {} while let Some(res) = stream.next().await {
Err(e) => eprintln!("{e}"), match res {
Err(e) => eprintln!("{e}"),
Ok(v) if v != pending => {
break;
} }
}) Ok(_) => {}
.detach(); }
} }
};
lua_exec.spawn(fut).detach();
} }
} }