This is the way, part 2

This commit is contained in:
Filip Tibell 2024-01-18 10:47:24 +01:00
parent ff812a5a77
commit 85da5e5646
No known key found for this signature in database
5 changed files with 307 additions and 107 deletions

View file

@ -1,73 +0,0 @@
use std::{rc::Rc, sync::Arc};
use mlua::prelude::*;
use smol::*;
struct LuaSmol<'ex> {
lua_exec: Rc<LocalExecutor<'ex>>,
main_exec: Arc<Executor<'ex>>,
}
// HACK: self_cell is not actually used to make a self-referential struct here,
// it is instead used to guarantee the lifetime of the executors. It does not
// need to refer to Lua during construction at all but the end result is the
// same and we let the self_cell crate handle all the unsafe code for us.
self_cell::self_cell!(
struct LuaExecutorInner {
owner: Rc<Lua>,
#[not_covariant]
dependent: LuaSmol,
}
);
impl LuaExecutorInner {
fn create(lua: Rc<Lua>) -> Self {
LuaExecutorInner::new(lua, |_| {
let lua_exec = Rc::new(LocalExecutor::new());
let main_exec = Arc::new(Executor::new());
LuaSmol {
lua_exec,
main_exec,
}
})
}
}
pub struct LuaExecutor {
_lua: Rc<Lua>,
inner: LuaExecutorInner,
}
impl LuaExecutor {
pub fn new(lua: Rc<Lua>) -> Self {
Self {
_lua: Rc::clone(&lua),
inner: LuaExecutorInner::create(lua),
}
}
pub fn run<'outer_fn, F>(&'outer_fn self, futures_spawner: F) -> LuaResult<()>
where
F: for<'lua> FnOnce(
&'lua Lua,
&'outer_fn LocalExecutor<'lua>,
&'outer_fn Executor<'lua>,
) -> LuaResult<()>,
{
self.inner.with_dependent(|lua, rt_executors| {
// 1. Spawn futures using the provided function
let lua_exec = &rt_executors.lua_exec;
let main_exec = &rt_executors.main_exec;
futures_spawner(lua, lua_exec, main_exec)?;
// 2. Run them all until lua executor completes
block_on(main_exec.run(async {
while !lua_exec.is_empty() {
lua_exec.tick().await;
}
}));
// 3. Yay!
Ok(())
})
}
}

View file

@ -1,25 +1,38 @@
use std::{
rc::Rc,
time::{Duration, Instant},
};
use std::time::{Duration, Instant};
use mlua::prelude::*;
use smol::*;
const NUM_TEST_BATCHES: usize = 20;
const NUM_TEST_THREADS: usize = 50_000;
const MAIN_CHUNK: &str = r#"
wait(0.01 * math.random())
for i = 1, 5 do
print("iteration " .. tostring(i) .. " of 5")
local thread = coroutine.running()
local counter = 0
for j = 1, 10_000 do
__scheduler__spawn(function()
wait(0.1 * math.random())
counter += 1
if counter == 10_000 then
print("completed iteration " .. tostring(i) .. " of 5")
end
end)
end
coroutine.yield() -- FIXME: This resumes instantly with mlua "async" feature
end
"#;
mod executor;
use executor::*;
mod thread_runtime;
mod thread_storage;
mod thread_util;
use thread_runtime::*;
use thread_storage::*;
pub fn main() -> LuaResult<()> {
let lua = Rc::new(Lua::new());
let rt = LuaExecutor::new(Rc::clone(&lua));
let start = Instant::now();
let lua = Lua::new();
// Set up persistent lua environment
lua.globals().set(
"wait",
lua.create_async_function(|_, duration: f64| async move {
@ -29,28 +42,10 @@ pub fn main() -> LuaResult<()> {
})?,
)?;
let start = Instant::now();
let main_fn = lua.load(MAIN_CHUNK).into_function()?;
for _ in 0..NUM_TEST_BATCHES {
rt.run(|lua, lua_exec, _| {
// TODO: Figure out how to create a scheduler queue that we can
// append threads to, both front and back, and resume them in order
for _ in 0..NUM_TEST_THREADS {
let thread = lua.create_thread(main_fn.clone())?;
let task = lua_exec.spawn(async move {
if let Err(err) = thread.into_async::<_, ()>(()).await {
println!("error: {}", err);
}
Ok::<_, LuaError>(())
});
task.detach();
}
Ok(())
})?;
}
// Set up runtime (thread queue / async executors) and run main script until end
let rt = ThreadRuntime::new(&lua)?;
rt.push_main(&lua, lua.load(MAIN_CHUNK), ());
rt.run_blocking(&lua);
println!("elapsed: {:?}", start.elapsed());

167
src/smol/thread_runtime.rs Normal file
View file

@ -0,0 +1,167 @@
use std::{collections::VecDeque, rc::Rc};
use mlua::prelude::*;
use smol::{
channel::{Receiver, Sender},
future::race,
lock::Mutex,
*,
};
use super::{
thread_util::{IntoLuaThread, LuaThreadOrFunction},
ThreadWithArgs,
};
pub struct ThreadRuntime {
queue: Rc<Mutex<VecDeque<ThreadWithArgs>>>,
tx: Sender<()>,
rx: Receiver<()>,
}
impl ThreadRuntime {
/**
Creates a new runtime for the given Lua state.
This will inject some functions to interact with the scheduler / executor.
*/
pub fn new(lua: &Lua) -> LuaResult<ThreadRuntime> {
let queue = Rc::new(Mutex::new(VecDeque::new()));
let (tx, rx) = channel::unbounded();
// Create spawn function (push to start of queue)
let queue_spawn = Rc::clone(&queue);
let tx_spawn = tx.clone();
let fn_spawn = lua.create_function(
move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
let thread = tof.into_thread(lua)?;
if thread.status() == LuaThreadStatus::Resumable {
let stored = ThreadWithArgs::new(lua, thread.clone(), args);
queue_spawn.lock_blocking().push_front(stored);
tx_spawn.try_send(()).map_err(|_| {
LuaError::runtime("Tried to spawn thread to a dropped queue")
})?;
Ok(thread)
} else {
Err(LuaError::runtime("Tried to spawn non-resumable thread"))
}
},
)?;
// Create defer function (push to end of queue)
let queue_defer = Rc::clone(&queue);
let tx_defer = tx.clone();
let fn_defer = lua.create_function(
move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
let thread = tof.into_thread(lua)?;
if thread.status() == LuaThreadStatus::Resumable {
let stored = ThreadWithArgs::new(lua, thread.clone(), args);
queue_defer.lock_blocking().push_back(stored);
tx_defer.try_send(()).map_err(|_| {
LuaError::runtime("Tried to defer thread to a dropped queue")
})?;
Ok(thread)
} else {
Err(LuaError::runtime("Tried to defer non-resumable thread"))
}
},
)?;
// FUTURE: Store these as named registry values instead
// so that they are not accessible from within user code
lua.globals().set("__scheduler__spawn", fn_spawn)?;
lua.globals().set("__scheduler__defer", fn_defer)?;
Ok(ThreadRuntime { queue, tx, rx })
}
/**
Pushes a chunk / function / thread to the front of the runtime.
*/
pub fn push_main<'lua>(
&self,
lua: &'lua Lua,
thread: impl IntoLuaThread<'lua>,
args: impl IntoLuaMulti<'lua>,
) {
let thread = thread
.into_lua_thread(lua)
.expect("failed to create thread");
let args = args.into_lua_multi(lua).expect("failed to create args");
let stored = ThreadWithArgs::new(lua, thread, args);
self.queue.lock_blocking().push_front(stored);
self.tx.try_send(()).unwrap();
}
/**
Runs the runtime until all Lua threads have completed.
Note that the given Lua state must be the same one that was
used to create this runtime, otherwise this method may panic.
*/
pub async fn run_async(&self, lua: &Lua) {
// Create new executors to use
let lua_exec = LocalExecutor::new();
let main_exec = Executor::new();
// Tick local lua executor while also driving main
// executor forward, until all lua threads finish
let fut = async {
loop {
let did_spawn = race(
// Wait for next futures step...
async {
lua_exec.tick().await;
false
},
// ...or for a new thread to arrive
async {
self.rx.recv().await.ok();
true
},
)
.await;
// If a new thread was spawned onto queue, we
// must drain it and schedule on the executor
if did_spawn {
let queued_threads = self.queue.lock().await.drain(..).collect::<Vec<_>>();
for queued_thread in queued_threads {
// NOTE: Thread may have been cancelled from lua
// before we got here, so we need to check it again
let (thread, args) = queued_thread.into_inner(lua);
if thread.status() == LuaThreadStatus::Resumable {
let fut = thread.into_async::<_, ()>(args);
lua_exec
.spawn(async move {
match fut.await {
Ok(()) => {}
Err(e) => eprintln!("{e}"),
}
})
.detach();
}
}
}
// Empty executor = no remaining threads
if lua_exec.is_empty() {
break;
}
}
};
main_exec.run(fut).await
}
/**
Runs the runtime until all Lua threads have completed, blocking the thread.
See [`ThreadRuntime::run_async`] for more info.
*/
pub fn run_blocking(&self, lua: &Lua) {
block_on(self.run_async(lua))
}
}

View file

@ -0,0 +1,43 @@
use mlua::prelude::*;
#[derive(Debug)]
pub struct ThreadWithArgs {
key_thread: LuaRegistryKey,
key_args: LuaRegistryKey,
}
impl ThreadWithArgs {
pub fn new<'lua>(lua: &'lua Lua, thread: LuaThread<'lua>, args: LuaMultiValue<'lua>) -> Self {
let args_vec = args.into_vec();
let key_thread = lua
.create_registry_value(thread)
.expect("Failed to store thread in registry - out of memory");
let key_args = lua
.create_registry_value(args_vec)
.expect("Failed to store thread args in registry - out of memory");
Self {
key_thread,
key_args,
}
}
pub fn into_inner(self, lua: &Lua) -> (LuaThread<'_>, LuaMultiValue<'_>) {
let thread = lua
.registry_value(&self.key_thread)
.expect("Failed to get thread from registry");
let args_vec = lua
.registry_value(&self.key_args)
.expect("Failed to get thread args from registry");
let args = LuaMultiValue::from_vec(args_vec);
lua.remove_registry_value(self.key_thread)
.expect("Failed to remove thread from registry");
lua.remove_registry_value(self.key_args)
.expect("Failed to remove thread args from registry");
(thread, args)
}
}

68
src/smol/thread_util.rs Normal file
View file

@ -0,0 +1,68 @@
use mlua::prelude::*;
/**
Wrapper struct to accept either a Lua thread or a Lua function as function argument.
[`LuaThreadOrFunction::into_thread`] may be used to convert the value into a Lua thread.
*/
#[derive(Clone)]
pub enum LuaThreadOrFunction<'lua> {
Thread(LuaThread<'lua>),
Function(LuaFunction<'lua>),
}
impl<'lua> LuaThreadOrFunction<'lua> {
pub(super) fn into_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
match self {
Self::Thread(t) => Ok(t),
Self::Function(f) => lua.create_thread(f),
}
}
}
impl<'lua> FromLua<'lua> for LuaThreadOrFunction<'lua> {
fn from_lua(value: LuaValue<'lua>, _: &'lua Lua) -> LuaResult<Self> {
match value {
LuaValue::Thread(t) => Ok(Self::Thread(t)),
LuaValue::Function(f) => Ok(Self::Function(f)),
value => Err(LuaError::FromLuaConversionError {
from: value.type_name(),
to: "LuaThreadOrFunction",
message: Some("Expected thread or function".to_string()),
}),
}
}
}
/**
Trait for any struct that can be turned into an [`LuaThread`]
and given to the scheduler, implemented for the following types:
- Lua threads ([`LuaThread`])
- Lua functions ([`LuaFunction`])
- Lua chunks ([`LuaChunk`])
*/
pub trait IntoLuaThread<'lua> {
/**
Converts the value into a Lua thread.
*/
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>>;
}
impl<'lua> IntoLuaThread<'lua> for LuaThread<'lua> {
fn into_lua_thread(self, _: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
Ok(self)
}
}
impl<'lua> IntoLuaThread<'lua> for LuaFunction<'lua> {
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
lua.create_thread(self)
}
}
impl<'lua, 'a> IntoLuaThread<'lua> for LuaChunk<'lua, 'a> {
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
lua.create_thread(self.into_function()?)
}
}