diff --git a/Cargo.lock b/Cargo.lock index 13ae8cd..ca25bc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -188,6 +188,47 @@ dependencies = [ "slab", ] +[[package]] +name = "async-fs" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebcd09b382f40fcd159c2d695175b2ae620ffa5f3bd6f664131efff4e8b9e04a" +dependencies = [ + "async-lock", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-io" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6baa8f0178795da0e71bc42c9e5d13261aac7ee549853162e66a241ba17964" +dependencies = [ + "async-lock", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "tracing", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener 5.3.1", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-task" version = "4.7.1" @@ -946,6 +987,20 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186014d53bc231d0090ef8d6f03e0920c54d85a5ed22f4f2f74315ec56cf83fb" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -1243,7 +1298,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.52.0", ] [[package]] @@ -1425,6 +1480,19 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "luau0-src" version = "0.9.1+luau625" @@ -1746,10 +1814,10 @@ dependencies = [ [[package]] name = "mlua-luau-scheduler" version = "0.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a13eabdbc57fa38cf0b604d98ce3431573c79a964aac56e09c16c240d36cb1bf" dependencies = [ "async-executor", + "async-fs", + "async-io", "blocking", "concurrent-queue", "derive_more", @@ -1758,6 +1826,8 @@ dependencies = [ "mlua", "rustc-hash", "tracing", + "tracing-subscriber", + "tracing-tracy", ] [[package]] @@ -2029,6 +2099,21 @@ dependencies = [ "time 0.3.36", ] +[[package]] +name = "polling" +version = "3.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6a007746f34ed64099e88783b0ae369eaa3da6392868ba262e2af9b8fbaea1" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi", + "pin-project-lite", + "rustix", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -2510,6 +2595,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" + [[package]] name = "rustyline" version = "14.0.0" @@ -2547,6 +2638,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -3233,6 +3330,37 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tracing-tracy" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6024d04f84a69fd0d1dc1eee3a2b070bd246530a0582f9982ae487cb6c703614" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracy-client", +] + +[[package]] +name = "tracy-client" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59fb931a64ff88984f86d3e9bcd1ae8843aa7fe44dd0f8097527bc172351741d" +dependencies = [ + "loom", + "once_cell", + "tracy-client-sys", +] + +[[package]] +name = "tracy-client-sys" +version = "0.22.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d104d610dfa9dd154535102cc9c6164ae1fa37842bc2d9e83f9ac82b0ae0882" +dependencies = [ + "cc", +] + [[package]] name = "try-lock" version = "0.2.5" @@ -3511,6 +3639,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.54.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9252e5725dbed82865af151df558e754e4a3c2c30818359eb17465f1346a1b49" +dependencies = [ + "windows-core 0.54.0", + "windows-targets 0.52.5", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -3520,6 +3658,25 @@ dependencies = [ "windows-targets 0.52.5", ] +[[package]] +name = "windows-core" +version = "0.54.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12661b9c89351d684a50a8a643ce5f608e20243b9fb84687800163429f161d65" +dependencies = [ + "windows-result", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-result" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "749f0da9cc72d82e600d8d2e44cadd0b9eedb9038f71a1c58556ac1c5791813b" +dependencies = [ + "windows-targets 0.52.5", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 904de54..221a0fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "crates/lune-std-stdio", "crates/lune-std-task", "crates/lune-utils", + "crates/mlua-luau-scheduler", ] # Profile for building the release binary, with the following options set: diff --git a/crates/lune-std-net/Cargo.toml b/crates/lune-std-net/Cargo.toml index 2cf086e..fa25858 100644 --- a/crates/lune-std-net/Cargo.toml +++ b/crates/lune-std-net/Cargo.toml @@ -14,7 +14,7 @@ workspace = true [dependencies] mlua = { version = "0.9.7", features = ["luau"] } -mlua-luau-scheduler = "0.0.2" +mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" } bstr = "1.9" futures-util = "0.3" diff --git a/crates/lune-std-process/Cargo.toml b/crates/lune-std-process/Cargo.toml index a8d3fc5..83a792f 100644 --- a/crates/lune-std-process/Cargo.toml +++ b/crates/lune-std-process/Cargo.toml @@ -14,7 +14,7 @@ workspace = true [dependencies] mlua = { version = "0.9.7", features = ["luau"] } -mlua-luau-scheduler = "0.0.2" +mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" } directories = "5.0" pin-project = "1.0" diff --git a/crates/lune-std-roblox/Cargo.toml b/crates/lune-std-roblox/Cargo.toml index 924e61c..a2cc387 100644 --- a/crates/lune-std-roblox/Cargo.toml +++ b/crates/lune-std-roblox/Cargo.toml @@ -14,7 +14,7 @@ workspace = true [dependencies] mlua = { version = "0.9.7", features = ["luau"] } -mlua-luau-scheduler = "0.0.2" +mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" } once_cell = "1.17" rbx_cookie = { version = "0.1.4", default-features = false } diff --git a/crates/lune-std-stdio/Cargo.toml b/crates/lune-std-stdio/Cargo.toml index 7d3909e..2e26e98 100644 --- a/crates/lune-std-stdio/Cargo.toml +++ b/crates/lune-std-stdio/Cargo.toml @@ -15,7 +15,7 @@ workspace = true [dependencies] dialoguer = "0.11" mlua = { version = "0.9.7", features = ["luau"] } -mlua-luau-scheduler = "0.0.2" +mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" } tokio = { version = "1", default-features = false, features = [ "io-std", diff --git a/crates/lune-std-task/Cargo.toml b/crates/lune-std-task/Cargo.toml index edc6e5a..b18fc7b 100644 --- a/crates/lune-std-task/Cargo.toml +++ b/crates/lune-std-task/Cargo.toml @@ -14,7 +14,7 @@ workspace = true [dependencies] mlua = { version = "0.9.7", features = ["luau"] } -mlua-luau-scheduler = "0.0.2" +mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" } tokio = { version = "1", default-features = false, features = ["time"] } diff --git a/crates/lune-std/Cargo.toml b/crates/lune-std/Cargo.toml index da644c7..83ccab7 100644 --- a/crates/lune-std/Cargo.toml +++ b/crates/lune-std/Cargo.toml @@ -39,7 +39,7 @@ task = ["dep:lune-std-task"] [dependencies] mlua = { version = "0.9.7", features = ["luau"] } -mlua-luau-scheduler = "0.0.2" +mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/crates/lune/Cargo.toml b/crates/lune/Cargo.toml index 51fe1a4..ca5094f 100644 --- a/crates/lune/Cargo.toml +++ b/crates/lune/Cargo.toml @@ -51,7 +51,7 @@ workspace = true [dependencies] mlua = { version = "0.9.7", features = ["luau"] } -mlua-luau-scheduler = "0.0.2" +mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" } anyhow = "1.0" console = "0.15" diff --git a/crates/mlua-luau-scheduler/Cargo.toml b/crates/mlua-luau-scheduler/Cargo.toml new file mode 100644 index 0000000..589d36d --- /dev/null +++ b/crates/mlua-luau-scheduler/Cargo.toml @@ -0,0 +1,67 @@ +[package] +name = "mlua-luau-scheduler" +version = "0.0.2" +edition = "2021" +license = "MPL-2.0" +repository = "https://github.com/lune-org/lune" +description = "Luau-based async scheduler, using mlua and async-executor" +readme = "README.md" +keywords = ["async", "luau", "scheduler"] +categories = ["async"] + +[lib] +path = "src/lib.rs" + +[lints] +workspace = true + +[dependencies] +async-executor = "1.8" +blocking = "1.5" +concurrent-queue = "2.4" +derive_more = "0.99" +event-listener = "4.0" +futures-lite = "2.2" +rustc-hash = "1.1" +tracing = "0.1" + +mlua = { version = "0.9.6", features = [ + "luau", + "luau-jit", + "async", + "serialize", +] } + +[dev-dependencies] +async-fs = "2.1" +async-io = "2.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-tracy = "0.11" + +[[example]] +name = "basic_sleep" +test = true + +[[example]] +name = "basic_spawn" +test = true + +[[example]] +name = "callbacks" +test = true + +[[example]] +name = "exit_code" +test = true + +[[example]] +name = "lots_of_threads" +test = true + +[[example]] +name = "scheduler_ordering" +test = true + +[[example]] +name = "tracy" +test = false diff --git a/crates/mlua-luau-scheduler/README.md b/crates/mlua-luau-scheduler/README.md new file mode 100644 index 0000000..e18ed3c --- /dev/null +++ b/crates/mlua-luau-scheduler/README.md @@ -0,0 +1,78 @@ + + + +# `mlua-luau-scheduler` + +An async scheduler for Luau, using [`mlua`][mlua] and built on top of [`async-executor`][async-executor]. + +This crate is runtime-agnostic and is compatible with any async runtime, including [Tokio][tokio], [smol][smol], [async-std][async-std], and others.
+However, since many dependencies are shared with [smol][smol], depending on it over other runtimes may be preferred. + +[async-executor]: https://crates.io/crates/async-executor +[async-std]: https://async.rs +[mlua]: https://crates.io/crates/mlua +[smol]: https://github.com/smol-rs/smol +[tokio]: https://tokio.rs + +## Example Usage + +### 1. Import dependencies + +```rs +use std::time::{Duration, Instant}; +use std::io::ErrorKind; + +use async_io::{block_on, Timer}; +use async_fs::read_to_string; + +use mlua::prelude::*; +use mlua_luau_scheduler::*; +``` + +### 2. Set up Lua environment + +```rs +let lua = Lua::new(); + +lua.globals().set( + "sleep", + lua.create_async_function(|_, duration: f64| async move { + let before = Instant::now(); + let after = Timer::after(Duration::from_secs_f64(duration)).await; + Ok((after - before).as_secs_f64()) + })?, +)?; + +lua.globals().set( + "readFile", + lua.create_async_function(|lua, path: String| async move { + // Spawn background task that does not take up resources on the lua thread + // Normally, futures in mlua can not be shared across threads, but this can + let task = lua.spawn(async move { + match read_to_string(path).await { + Ok(s) => Ok(Some(s)), + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(e), + } + }); + task.await.into_lua_err() + })?, +)?; +``` + +### 3. Set up scheduler, run threads + +```rs +let sched = Scheduler::new(&lua)?; + +// We can create multiple lua threads ... +let sleepThread = lua.load("sleep(0.1)"); +let fileThread = lua.load("readFile(\"Cargo.toml\")"); + +// ... spawn them both onto the scheduler ... +sched.push_thread_front(sleepThread, ()); +sched.push_thread_front(fileThread, ()); + +// ... and run until they finish +block_on(sched.run()); +``` diff --git a/crates/mlua-luau-scheduler/examples/basic_sleep.rs b/crates/mlua-luau-scheduler/examples/basic_sleep.rs new file mode 100644 index 0000000..228591d --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/basic_sleep.rs @@ -0,0 +1,45 @@ +#![allow(clippy::missing_errors_doc)] +#![allow(clippy::cargo_common_metadata)] + +use std::time::{Duration, Instant}; + +use async_io::{block_on, Timer}; + +use mlua::prelude::*; +use mlua_luau_scheduler::Scheduler; + +const MAIN_SCRIPT: &str = include_str!("./lua/basic_sleep.luau"); + +pub fn main() -> LuaResult<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_target(false) + .without_time() + .init(); + + // Set up persistent Lua environment + let lua = Lua::new(); + lua.globals().set( + "sleep", + lua.create_async_function(|_, duration: f64| async move { + let before = Instant::now(); + let after = Timer::after(Duration::from_secs_f64(duration)).await; + Ok((after - before).as_secs_f64()) + })?, + )?; + + // Load the main script into a scheduler + let sched = Scheduler::new(&lua); + let main = lua.load(MAIN_SCRIPT); + sched.push_thread_front(main, ())?; + + // Run until completion + block_on(sched.run()); + + Ok(()) +} + +#[test] +fn test_basic_sleep() -> LuaResult<()> { + main() +} diff --git a/crates/mlua-luau-scheduler/examples/basic_spawn.rs b/crates/mlua-luau-scheduler/examples/basic_spawn.rs new file mode 100644 index 0000000..8e65a1a --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/basic_spawn.rs @@ -0,0 +1,64 @@ +#![allow(clippy::missing_errors_doc)] +#![allow(clippy::cargo_common_metadata)] + +use std::io::ErrorKind; + +use async_fs::read_to_string; +use async_io::block_on; + +use mlua::prelude::*; +use mlua_luau_scheduler::{LuaSpawnExt, Scheduler}; + +const MAIN_SCRIPT: &str = include_str!("./lua/basic_spawn.luau"); + +pub fn main() -> LuaResult<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_target(false) + .without_time() + .init(); + + // Set up persistent Lua environment + let lua = Lua::new(); + lua.globals().set( + "readFile", + lua.create_async_function(|lua, path: String| async move { + // Spawn background task that does not take up resources on the Lua thread + let task = lua.spawn(async move { + match read_to_string(path).await { + Ok(s) => Ok(Some(s)), + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(e), + } + }); + + // Wait for it to complete + let result = task.await.into_lua_err(); + + // We can also spawn local tasks that do take up resources + // on the Lua thread, but that do not have the Send bound + if result.is_ok() { + lua.spawn_local(async move { + println!("File read successfully!"); + }); + } + + result + })?, + )?; + + // Load the main script into a scheduler + let sched = Scheduler::new(&lua); + let main = lua.load(MAIN_SCRIPT); + sched.push_thread_front(main, ())?; + + // Run until completion + block_on(sched.run()); + + Ok(()) +} + +#[test] +fn test_basic_spawn() -> LuaResult<()> { + main() +} diff --git a/crates/mlua-luau-scheduler/examples/callbacks.rs b/crates/mlua-luau-scheduler/examples/callbacks.rs new file mode 100644 index 0000000..44a28fe --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/callbacks.rs @@ -0,0 +1,48 @@ +#![allow(clippy::missing_errors_doc)] +#![allow(clippy::missing_panics_doc)] +#![allow(clippy::cargo_common_metadata)] + +use mlua::prelude::*; +use mlua_luau_scheduler::Scheduler; + +use async_io::block_on; + +const MAIN_SCRIPT: &str = include_str!("./lua/callbacks.luau"); + +pub fn main() -> LuaResult<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_target(false) + .without_time() + .init(); + + // Set up persistent Lua environment + let lua = Lua::new(); + + // Create a new scheduler with custom callbacks + let sched = Scheduler::new(&lua); + sched.set_error_callback(|e| { + println!( + "Captured error from Lua!\n{}\n{e}\n{}", + "-".repeat(15), + "-".repeat(15) + ); + }); + + // Load the main script into the scheduler, and keep track of the thread we spawn + let main = lua.load(MAIN_SCRIPT); + let id = sched.push_thread_front(main, ())?; + + // Run until completion + block_on(sched.run()); + + // We should have gotten the error back from our script + assert!(sched.get_thread_result(id).unwrap().is_err()); + + Ok(()) +} + +#[test] +fn test_callbacks() -> LuaResult<()> { + main() +} diff --git a/crates/mlua-luau-scheduler/examples/exit_code.rs b/crates/mlua-luau-scheduler/examples/exit_code.rs new file mode 100644 index 0000000..ee4a9a4 --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/exit_code.rs @@ -0,0 +1,43 @@ +#![allow(clippy::missing_errors_doc)] +#![allow(clippy::missing_panics_doc)] +#![allow(clippy::cargo_common_metadata)] + +use async_io::block_on; + +use mlua::prelude::*; +use mlua_luau_scheduler::{Functions, Scheduler}; + +const MAIN_SCRIPT: &str = include_str!("./lua/exit_code.luau"); + +pub fn main() -> LuaResult<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_target(false) + .without_time() + .init(); + + // Set up persistent Lua environment + let lua = Lua::new(); + let sched = Scheduler::new(&lua); + let fns = Functions::new(&lua)?; + + lua.globals().set("exit", fns.exit)?; + + // Load the main script into the scheduler + let main = lua.load(MAIN_SCRIPT); + sched.push_thread_front(main, ())?; + + // Run until completion + block_on(sched.run()); + + // Verify that we got a correct exit code + let code = sched.get_exit_code().unwrap_or_default(); + assert!(format!("{code:?}").contains("(1)")); + + Ok(()) +} + +#[test] +fn test_exit_code() -> LuaResult<()> { + main() +} diff --git a/crates/mlua-luau-scheduler/examples/lots_of_threads.rs b/crates/mlua-luau-scheduler/examples/lots_of_threads.rs new file mode 100644 index 0000000..33451aa --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/lots_of_threads.rs @@ -0,0 +1,51 @@ +#![allow(clippy::missing_errors_doc)] +#![allow(clippy::cargo_common_metadata)] + +use std::time::Duration; + +use async_io::{block_on, Timer}; + +use mlua::prelude::*; +use mlua_luau_scheduler::{Functions, Scheduler}; + +const MAIN_SCRIPT: &str = include_str!("./lua/lots_of_threads.luau"); + +const ONE_NANOSECOND: Duration = Duration::from_nanos(1); + +pub fn main() -> LuaResult<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_target(false) + .without_time() + .init(); + + // Set up persistent Lua environment + let lua = Lua::new(); + let sched = Scheduler::new(&lua); + let fns = Functions::new(&lua)?; + + lua.globals().set("spawn", fns.spawn)?; + lua.globals().set( + "sleep", + lua.create_async_function(|_, ()| async move { + // Obviously we can't sleep for a single nanosecond since + // this uses OS scheduling under the hood, but we can try + Timer::after(ONE_NANOSECOND).await; + Ok(()) + })?, + )?; + + // Load the main script into the scheduler + let main = lua.load(MAIN_SCRIPT); + sched.push_thread_front(main, ())?; + + // Run until completion + block_on(sched.run()); + + Ok(()) +} + +#[test] +fn test_lots_of_threads() -> LuaResult<()> { + main() +} diff --git a/crates/mlua-luau-scheduler/examples/lua/basic_sleep.luau b/crates/mlua-luau-scheduler/examples/lua/basic_sleep.luau new file mode 100644 index 0000000..74418d0 --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/lua/basic_sleep.luau @@ -0,0 +1,13 @@ +--!nocheck +--!nolint UnknownGlobal + +print("Sleeping for 3 seconds...") + +sleep(1) +print("1 second passed") + +sleep(1) +print("2 seconds passed") + +sleep(1) +print("3 seconds passed") diff --git a/crates/mlua-luau-scheduler/examples/lua/basic_spawn.luau b/crates/mlua-luau-scheduler/examples/lua/basic_spawn.luau new file mode 100644 index 0000000..b8cce6b --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/lua/basic_spawn.luau @@ -0,0 +1,17 @@ +--!nocheck +--!nolint UnknownGlobal + +local _, err = pcall(function() + local file = readFile("Cargo.toml") + if file ~= nil then + print("Cargo.toml found!") + print("Contents:") + print(file) + else + print("Cargo.toml not found!") + end +end) + +if err ~= nil then + print("Error while reading file: " .. err) +end diff --git a/crates/mlua-luau-scheduler/examples/lua/callbacks.luau b/crates/mlua-luau-scheduler/examples/lua/callbacks.luau new file mode 100644 index 0000000..77e249e --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/lua/callbacks.luau @@ -0,0 +1,4 @@ +--!nocheck +--!nolint UnknownGlobal + +error("Oh no! Something went very very wrong!") diff --git a/crates/mlua-luau-scheduler/examples/lua/exit_code.luau b/crates/mlua-luau-scheduler/examples/lua/exit_code.luau new file mode 100644 index 0000000..0c627dd --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/lua/exit_code.luau @@ -0,0 +1,8 @@ +--!nocheck +--!nolint UnknownGlobal + +print("Setting exit code manually") + +exit(1) + +error("unreachable") diff --git a/crates/mlua-luau-scheduler/examples/lua/lots_of_threads.luau b/crates/mlua-luau-scheduler/examples/lua/lots_of_threads.luau new file mode 100644 index 0000000..d25bd25 --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/lua/lots_of_threads.luau @@ -0,0 +1,29 @@ +--!nocheck +--!nolint UnknownGlobal + +local NUM_BATCHES = 10 +local NUM_THREADS = 100_000 + +print(`Spawning {NUM_BATCHES * NUM_THREADS} threads split into {NUM_BATCHES} batches\n`) + +local before = os.clock() +for i = 1, NUM_BATCHES do + print(`Batch {i} of {NUM_BATCHES}`) + local thread = coroutine.running() + + local counter = 0 + for j = 1, NUM_THREADS do + spawn(function() + sleep(0.1) + counter += 1 + if counter == NUM_THREADS then + spawn(thread) + end + end) + end + + coroutine.yield() +end +local after = os.clock() + +print(`\nSpawned {NUM_BATCHES * NUM_THREADS} sleeping threads in {after - before}s`) diff --git a/crates/mlua-luau-scheduler/examples/lua/scheduler_ordering.luau b/crates/mlua-luau-scheduler/examples/lua/scheduler_ordering.luau new file mode 100644 index 0000000..b8aed74 --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/lua/scheduler_ordering.luau @@ -0,0 +1,34 @@ +--!nocheck +--!nolint UnknownGlobal + +local nums = {} +local function insert(n: number) + table.insert(nums, n) + print(n) +end + +insert(1) + +-- Defer will run at the end of the resumption cycle, but without yielding +defer(function() + insert(5) +end) + +-- Spawn will instantly run up until the first yield, and must then be resumed manually ... +spawn(function() + insert(2) + coroutine.yield() + error("unreachable code") +end) + +-- ... unless calling functions created using `lua.create_async_function(...)`, +-- which will resume their calling thread with their result automatically +spawn(function() + insert(3) + sleep(1) + insert(6) +end) + +insert(4) + +return nums diff --git a/crates/mlua-luau-scheduler/examples/scheduler_ordering.rs b/crates/mlua-luau-scheduler/examples/scheduler_ordering.rs new file mode 100644 index 0000000..2fd4181 --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/scheduler_ordering.rs @@ -0,0 +1,56 @@ +#![allow(clippy::missing_errors_doc)] +#![allow(clippy::missing_panics_doc)] +#![allow(clippy::cargo_common_metadata)] + +use std::time::{Duration, Instant}; + +use async_io::{block_on, Timer}; + +use mlua::prelude::*; +use mlua_luau_scheduler::{Functions, Scheduler}; + +const MAIN_SCRIPT: &str = include_str!("./lua/scheduler_ordering.luau"); + +pub fn main() -> LuaResult<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_target(false) + .without_time() + .init(); + + // Set up persistent Lua environment + let lua = Lua::new(); + let sched = Scheduler::new(&lua); + let fns = Functions::new(&lua)?; + + lua.globals().set("spawn", fns.spawn)?; + lua.globals().set("defer", fns.defer)?; + lua.globals().set( + "sleep", + lua.create_async_function(|_, duration: Option| async move { + let duration = duration.unwrap_or_default().max(1.0 / 250.0); + let before = Instant::now(); + let after = Timer::after(Duration::from_secs_f64(duration)).await; + Ok((after - before).as_secs_f64()) + })?, + )?; + + // Load the main script into the scheduler, and keep track of the thread we spawn + let main = lua.load(MAIN_SCRIPT); + let id = sched.push_thread_front(main, ())?; + + // Run until completion + block_on(sched.run()); + + // We should have gotten proper values back from our script + let res = sched.get_thread_result(id).unwrap().unwrap(); + let nums = Vec::::from_lua_multi(res, &lua)?; + assert_eq!(nums, vec![1, 2, 3, 4, 5, 6]); + + Ok(()) +} + +#[test] +fn test_scheduler_ordering() -> LuaResult<()> { + main() +} diff --git a/crates/mlua-luau-scheduler/examples/tracy.rs b/crates/mlua-luau-scheduler/examples/tracy.rs new file mode 100644 index 0000000..01732c3 --- /dev/null +++ b/crates/mlua-luau-scheduler/examples/tracy.rs @@ -0,0 +1,61 @@ +/* + NOTE: This example is the same as "lots_of_threads", but with tracy set up for performance profiling. + + How to run: + + 1. Install tracy + - Follow the instructions at https://github.com/wolfpld/tracy + - Or install via something like homebrew: `brew install tracy` + 2. Run the server (`tracy`) in a terminal + 3. Run the example in another terminal + - `export RUST_LOG=trace` + - `cargo run --example tracy` +*/ + +#![allow(clippy::missing_errors_doc)] +#![allow(clippy::cargo_common_metadata)] + +use std::time::Duration; + +use async_io::{block_on, Timer}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_tracy::{client::Client as TracyClient, TracyLayer}; + +use mlua::prelude::*; +use mlua_luau_scheduler::{Functions, Scheduler}; + +const MAIN_SCRIPT: &str = include_str!("./lua/lots_of_threads.luau"); + +const ONE_NANOSECOND: Duration = Duration::from_nanos(1); + +pub fn main() -> LuaResult<()> { + let _client = TracyClient::start(); + let _ = tracing::subscriber::set_global_default( + tracing_subscriber::registry().with(TracyLayer::default()), + ); + + // Set up persistent Lua environment + let lua = Lua::new(); + let sched = Scheduler::new(&lua); + let fns = Functions::new(&lua)?; + + lua.globals().set("spawn", fns.spawn)?; + lua.globals().set( + "sleep", + lua.create_async_function(|_, ()| async move { + // Obviously we can't sleep for a single nanosecond since + // this uses OS scheduling under the hood, but we can try + Timer::after(ONE_NANOSECOND).await; + Ok(()) + })?, + )?; + + // Load the main script into the scheduler + let main = lua.load(MAIN_SCRIPT); + sched.push_thread_front(main, ())?; + + // Run until completion + block_on(sched.run()); + + Ok(()) +} diff --git a/crates/mlua-luau-scheduler/src/error_callback.rs b/crates/mlua-luau-scheduler/src/error_callback.rs new file mode 100644 index 0000000..9d8e0a2 --- /dev/null +++ b/crates/mlua-luau-scheduler/src/error_callback.rs @@ -0,0 +1,45 @@ +use std::{cell::RefCell, rc::Rc}; + +use mlua::prelude::*; + +type ErrorCallback = Box; + +#[derive(Clone)] +pub(crate) struct ThreadErrorCallback { + inner: Rc>>, +} + +impl ThreadErrorCallback { + pub fn new() -> Self { + Self { + inner: Rc::new(RefCell::new(None)), + } + } + + pub fn replace(&self, callback: impl Fn(LuaError) + Send + 'static) { + self.inner.borrow_mut().replace(Box::new(callback)); + } + + pub fn clear(&self) { + self.inner.borrow_mut().take(); + } + + pub fn call(&self, error: &LuaError) { + if let Some(cb) = &*self.inner.borrow() { + cb(error.clone()); + } + } +} + +#[allow(clippy::needless_pass_by_value)] +fn default_error_callback(e: LuaError) { + eprintln!("{e}"); +} + +impl Default for ThreadErrorCallback { + fn default() -> Self { + let this = Self::new(); + this.replace(default_error_callback); + this + } +} diff --git a/crates/mlua-luau-scheduler/src/exit.rs b/crates/mlua-luau-scheduler/src/exit.rs new file mode 100644 index 0000000..a2794dd --- /dev/null +++ b/crates/mlua-luau-scheduler/src/exit.rs @@ -0,0 +1,31 @@ +use std::{cell::Cell, process::ExitCode, rc::Rc}; + +use event_listener::Event; + +#[derive(Debug, Clone)] +pub(crate) struct Exit { + code: Rc>>, + event: Rc, +} + +impl Exit { + pub fn new() -> Self { + Self { + code: Rc::new(Cell::new(None)), + event: Rc::new(Event::new()), + } + } + + pub fn set(&self, code: ExitCode) { + self.code.set(Some(code)); + self.event.notify(usize::MAX); + } + + pub fn get(&self) -> Option { + self.code.get() + } + + pub async fn listen(&self) { + self.event.listen().await; + } +} diff --git a/crates/mlua-luau-scheduler/src/functions.rs b/crates/mlua-luau-scheduler/src/functions.rs new file mode 100644 index 0000000..7230b99 --- /dev/null +++ b/crates/mlua-luau-scheduler/src/functions.rs @@ -0,0 +1,283 @@ +#![allow(unused_imports)] +#![allow(clippy::too_many_lines)] + +use std::process::ExitCode; + +use mlua::prelude::*; + +use crate::{ + error_callback::ThreadErrorCallback, + queue::{DeferredThreadQueue, SpawnedThreadQueue}, + result_map::ThreadResultMap, + scheduler::Scheduler, + thread_id::ThreadId, + traits::LuaSchedulerExt, + util::{is_poll_pending, LuaThreadOrFunction, ThreadResult}, +}; + +const ERR_METADATA_NOT_ATTACHED: &str = "\ +Lua state does not have scheduler metadata attached!\ +\nThis is most likely caused by creating functions outside of a scheduler.\ +\nScheduler functions must always be created from within an active scheduler.\ +"; + +const EXIT_IMPL_LUA: &str = r" +exit(...) +yield() +"; + +const WRAP_IMPL_LUA: &str = r" +local t = create(...) +return function(...) + local r = { resume(t, ...) } + if r[1] then + return select(2, unpack(r)) + else + error(r[2], 2) + end +end +"; + +/** + A collection of lua functions that may be called to interact with a [`Scheduler`]. + + Note that these may all be implemented using [`LuaSchedulerExt`], however, this struct + is implemented using internal (non-public) APIs, and generally has better performance. +*/ +pub struct Functions<'lua> { + /** + Implementation of `coroutine.resume` that handles async polling properly. + + Defers onto the scheduler queue if the thread calls an async function. + */ + pub resume: LuaFunction<'lua>, + /** + Implementation of `coroutine.wrap` that handles async polling properly. + + Defers onto the scheduler queue if the thread calls an async function. + */ + pub wrap: LuaFunction<'lua>, + /** + Resumes a function / thread once instantly, and runs until first yield. + + Spawns onto the scheduler queue if not completed. + */ + pub spawn: LuaFunction<'lua>, + /** + Defers a function / thread onto the scheduler queue. + + Does not resume instantly, only adds to the queue. + */ + pub defer: LuaFunction<'lua>, + /** + Cancels a function / thread, removing it from the queue. + */ + pub cancel: LuaFunction<'lua>, + /** + Exits the scheduler, stopping all other threads and closing the scheduler. + + Yields the calling thread to ensure that it does not continue. + */ + pub exit: LuaFunction<'lua>, +} + +impl<'lua> Functions<'lua> { + /** + Creates a new collection of Lua functions that may be called to interact with a [`Scheduler`]. + + # Errors + + Errors when out of memory, or if default Lua globals are missing. + + # Panics + + Panics when the given [`Lua`] instance does not have an attached [`Scheduler`]. + */ + pub fn new(lua: &'lua Lua) -> LuaResult { + let spawn_queue = lua + .app_data_ref::() + .expect(ERR_METADATA_NOT_ATTACHED) + .clone(); + let defer_queue = lua + .app_data_ref::() + .expect(ERR_METADATA_NOT_ATTACHED) + .clone(); + let error_callback = lua + .app_data_ref::() + .expect(ERR_METADATA_NOT_ATTACHED) + .clone(); + let result_map = lua + .app_data_ref::() + .expect(ERR_METADATA_NOT_ATTACHED) + .clone(); + + let resume_queue = defer_queue.clone(); + let resume_map = result_map.clone(); + let resume = + lua.create_function(move |lua, (thread, args): (LuaThread, LuaMultiValue)| { + let _span = tracing::trace_span!("Scheduler::fn_resume").entered(); + match thread.resume::<_, LuaMultiValue>(args.clone()) { + Ok(v) => { + if v.get(0).is_some_and(is_poll_pending) { + // Pending, defer to scheduler and return nil + resume_queue.push_item(lua, &thread, args)?; + (true, LuaValue::Nil).into_lua_multi(lua) + } else { + // Not pending, store the value if thread is done + if thread.status() != LuaThreadStatus::Resumable { + let id = ThreadId::from(&thread); + if resume_map.is_tracked(id) { + let res = ThreadResult::new(Ok(v.clone()), lua); + resume_map.insert(id, res); + } + } + (true, v).into_lua_multi(lua) + } + } + Err(e) => { + // Not pending, store the error + let id = ThreadId::from(&thread); + if resume_map.is_tracked(id) { + let res = ThreadResult::new(Err(e.clone()), lua); + resume_map.insert(id, res); + } + (false, e.to_string()).into_lua_multi(lua) + } + } + })?; + + let wrap_env = lua.create_table_from(vec![ + ("resume", resume.clone()), + ("error", lua.globals().get::<_, LuaFunction>("error")?), + ("select", lua.globals().get::<_, LuaFunction>("select")?), + ("unpack", lua.globals().get::<_, LuaFunction>("unpack")?), + ( + "create", + lua.globals() + .get::<_, LuaTable>("coroutine")? + .get::<_, LuaFunction>("create")?, + ), + ])?; + let wrap = lua + .load(WRAP_IMPL_LUA) + .set_name("=__scheduler_wrap") + .set_environment(wrap_env) + .into_function()?; + + let spawn_map = result_map.clone(); + let spawn = lua.create_function( + move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| { + let _span = tracing::trace_span!("Scheduler::fn_spawn").entered(); + let thread = tof.into_thread(lua)?; + if thread.status() == LuaThreadStatus::Resumable { + // NOTE: We need to resume the thread once instantly for correct behavior, + // and only if we get the pending value back we can spawn to async executor + match thread.resume::<_, LuaMultiValue>(args.clone()) { + Ok(v) => { + if v.get(0).is_some_and(is_poll_pending) { + spawn_queue.push_item(lua, &thread, args)?; + } else { + // Not pending, store the value if thread is done + if thread.status() != LuaThreadStatus::Resumable { + let id = ThreadId::from(&thread); + if spawn_map.is_tracked(id) { + let res = ThreadResult::new(Ok(v), lua); + spawn_map.insert(id, res); + } + } + } + } + Err(e) => { + error_callback.call(&e); + // Not pending, store the error + let id = ThreadId::from(&thread); + if spawn_map.is_tracked(id) { + let res = ThreadResult::new(Err(e), lua); + spawn_map.insert(id, res); + } + } + }; + } + Ok(thread) + }, + )?; + + let defer = lua.create_function( + move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| { + let _span = tracing::trace_span!("Scheduler::fn_defer").entered(); + let thread = tof.into_thread(lua)?; + if thread.status() == LuaThreadStatus::Resumable { + defer_queue.push_item(lua, &thread, args)?; + } + Ok(thread) + }, + )?; + + let close = lua + .globals() + .get::<_, LuaTable>("coroutine")? + .get::<_, LuaFunction>("close")?; + let close_key = lua.create_registry_value(close)?; + let cancel = lua.create_function(move |lua, thread: LuaThread| { + let _span = tracing::trace_span!("Scheduler::fn_cancel").entered(); + let close: LuaFunction = lua.registry_value(&close_key)?; + match close.call(thread) { + Err(LuaError::CoroutineInactive) | Ok(()) => Ok(()), + Err(e) => Err(e), + } + })?; + + let exit_env = lua.create_table_from(vec![ + ( + "exit", + lua.create_function(|lua, code: Option| { + let _span = tracing::trace_span!("Scheduler::fn_exit").entered(); + let code = code.map(ExitCode::from).unwrap_or_default(); + lua.set_exit_code(code); + Ok(()) + })?, + ), + ( + "yield", + lua.globals() + .get::<_, LuaTable>("coroutine")? + .get::<_, LuaFunction>("yield")?, + ), + ])?; + let exit = lua + .load(EXIT_IMPL_LUA) + .set_name("=__scheduler_exit") + .set_environment(exit_env) + .into_function()?; + + Ok(Self { + resume, + wrap, + spawn, + defer, + cancel, + exit, + }) + } +} + +impl Functions<'_> { + /** + Injects [`Scheduler`]-compatible functions into the given [`Lua`] instance. + + This will overwrite the following functions: + + - `coroutine.resume` + - `coroutine.wrap` + + # Errors + + Errors when out of memory, or if default Lua globals are missing. + */ + pub fn inject_compat(&self, lua: &Lua) -> LuaResult<()> { + let co: LuaTable = lua.globals().get("coroutine")?; + co.set("resume", self.resume.clone())?; + co.set("wrap", self.wrap.clone())?; + Ok(()) + } +} diff --git a/crates/mlua-luau-scheduler/src/lib.rs b/crates/mlua-luau-scheduler/src/lib.rs new file mode 100644 index 0000000..7b82595 --- /dev/null +++ b/crates/mlua-luau-scheduler/src/lib.rs @@ -0,0 +1,18 @@ +#![allow(clippy::cargo_common_metadata)] + +mod error_callback; +mod exit; +mod functions; +mod queue; +mod result_map; +mod scheduler; +mod status; +mod thread_id; +mod traits; +mod util; + +pub use functions::Functions; +pub use scheduler::Scheduler; +pub use status::Status; +pub use thread_id::ThreadId; +pub use traits::{IntoLuaThread, LuaSchedulerExt, LuaSpawnExt}; diff --git a/crates/mlua-luau-scheduler/src/queue.rs b/crates/mlua-luau-scheduler/src/queue.rs new file mode 100644 index 0000000..aabb259 --- /dev/null +++ b/crates/mlua-luau-scheduler/src/queue.rs @@ -0,0 +1,139 @@ +use std::{pin::Pin, rc::Rc}; + +use concurrent_queue::ConcurrentQueue; +use derive_more::{Deref, DerefMut}; +use event_listener::Event; +use futures_lite::{Future, FutureExt}; +use mlua::prelude::*; + +use crate::{traits::IntoLuaThread, util::ThreadWithArgs, ThreadId}; + +/** + Queue for storing [`LuaThread`]s with associated arguments. + + Provides methods for pushing and draining the queue, as + well as listening for new items being pushed to the queue. +*/ +#[derive(Debug, Clone)] +pub(crate) struct ThreadQueue { + queue: Rc>, + event: Rc, +} + +impl ThreadQueue { + pub fn new() -> Self { + let queue = Rc::new(ConcurrentQueue::unbounded()); + let event = Rc::new(Event::new()); + Self { queue, event } + } + + pub fn push_item<'lua>( + &self, + lua: &'lua Lua, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult { + let thread = thread.into_lua_thread(lua)?; + let args = args.into_lua_multi(lua)?; + + tracing::trace!("pushing item to queue with {} args", args.len()); + let id = ThreadId::from(&thread); + let stored = ThreadWithArgs::new(lua, thread, args)?; + + self.queue.push(stored).into_lua_err()?; + self.event.notify(usize::MAX); + + Ok(id) + } + + #[inline] + pub fn drain_items<'outer, 'lua>( + &'outer self, + lua: &'lua Lua, + ) -> impl Iterator, LuaMultiValue<'lua>)> + 'outer + where + 'lua: 'outer, + { + self.queue.try_iter().map(|stored| stored.into_inner(lua)) + } + + #[inline] + pub async fn wait_for_item(&self) { + if self.queue.is_empty() { + let listener = self.event.listen(); + // NOTE: Need to check again, we could have gotten + // new queued items while creating our listener + if self.queue.is_empty() { + listener.await; + } + } + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.queue.is_empty() + } +} + +/** + Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. +*/ +#[derive(Debug, Clone, Deref, DerefMut)] +pub(crate) struct SpawnedThreadQueue(ThreadQueue); + +impl SpawnedThreadQueue { + pub fn new() -> Self { + Self(ThreadQueue::new()) + } +} + +/** + Alias for [`ThreadQueue`], providing a newtype to store in Lua app data. +*/ +#[derive(Debug, Clone, Deref, DerefMut)] +pub(crate) struct DeferredThreadQueue(ThreadQueue); + +impl DeferredThreadQueue { + pub fn new() -> Self { + Self(ThreadQueue::new()) + } +} + +pub type LocalBoxFuture<'fut> = Pin + 'fut>>; + +/** + Queue for storing local futures. + + Provides methods for pushing and draining the queue, as + well as listening for new items being pushed to the queue. +*/ +#[derive(Debug, Clone)] +pub(crate) struct FuturesQueue<'fut> { + queue: Rc>>, + event: Rc, +} + +impl<'fut> FuturesQueue<'fut> { + pub fn new() -> Self { + let queue = Rc::new(ConcurrentQueue::unbounded()); + let event = Rc::new(Event::new()); + Self { queue, event } + } + + pub fn push_item(&self, fut: impl Future + 'fut) { + let _ = self.queue.push(fut.boxed_local()); + self.event.notify(usize::MAX); + } + + pub fn drain_items<'outer>( + &'outer self, + ) -> impl Iterator> + 'outer { + self.queue.try_iter() + } + + pub async fn wait_for_item(&self) { + if self.queue.is_empty() { + self.event.listen().await; + } + } +} diff --git a/crates/mlua-luau-scheduler/src/result_map.rs b/crates/mlua-luau-scheduler/src/result_map.rs new file mode 100644 index 0000000..fe08a5f --- /dev/null +++ b/crates/mlua-luau-scheduler/src/result_map.rs @@ -0,0 +1,64 @@ +#![allow(clippy::inline_always)] + +use std::{cell::RefCell, rc::Rc}; + +use event_listener::Event; +// NOTE: This is the hash algorithm that mlua also uses, so we +// are not adding any additional dependencies / bloat by using it. +use rustc_hash::{FxHashMap, FxHashSet}; + +use crate::{thread_id::ThreadId, util::ThreadResult}; + +#[derive(Clone)] +pub(crate) struct ThreadResultMap { + tracked: Rc>>, + results: Rc>>, + events: Rc>>>, +} + +impl ThreadResultMap { + pub fn new() -> Self { + Self { + tracked: Rc::new(RefCell::new(FxHashSet::default())), + results: Rc::new(RefCell::new(FxHashMap::default())), + events: Rc::new(RefCell::new(FxHashMap::default())), + } + } + + #[inline(always)] + pub fn track(&self, id: ThreadId) { + self.tracked.borrow_mut().insert(id); + } + + #[inline(always)] + pub fn is_tracked(&self, id: ThreadId) -> bool { + self.tracked.borrow().contains(&id) + } + + pub fn insert(&self, id: ThreadId, result: ThreadResult) { + debug_assert!(self.is_tracked(id), "Thread must be tracked"); + self.results.borrow_mut().insert(id, result); + if let Some(event) = self.events.borrow_mut().remove(&id) { + event.notify(usize::MAX); + } + } + + pub async fn listen(&self, id: ThreadId) { + debug_assert!(self.is_tracked(id), "Thread must be tracked"); + if !self.results.borrow().contains_key(&id) { + let listener = { + let mut events = self.events.borrow_mut(); + let event = events.entry(id).or_insert_with(|| Rc::new(Event::new())); + event.listen() + }; + listener.await; + } + } + + pub fn remove(&self, id: ThreadId) -> Option { + let res = self.results.borrow_mut().remove(&id)?; + self.tracked.borrow_mut().remove(&id); + self.events.borrow_mut().remove(&id); + Some(res) + } +} diff --git a/crates/mlua-luau-scheduler/src/scheduler.rs b/crates/mlua-luau-scheduler/src/scheduler.rs new file mode 100644 index 0000000..31f699e --- /dev/null +++ b/crates/mlua-luau-scheduler/src/scheduler.rs @@ -0,0 +1,484 @@ +#![allow(clippy::module_name_repetitions)] + +use std::{ + cell::Cell, + process::ExitCode, + rc::{Rc, Weak as WeakRc}, + sync::{Arc, Weak as WeakArc}, + thread::panicking, +}; + +use futures_lite::prelude::*; +use mlua::prelude::*; + +use async_executor::{Executor, LocalExecutor}; +use tracing::{debug, instrument, trace, trace_span, Instrument}; + +use crate::{ + error_callback::ThreadErrorCallback, + exit::Exit, + queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue}, + result_map::ThreadResultMap, + status::Status, + thread_id::ThreadId, + traits::IntoLuaThread, + util::{run_until_yield, ThreadResult}, +}; + +const ERR_METADATA_ALREADY_ATTACHED: &str = "\ +Lua state already has scheduler metadata attached!\ +\nThis may be caused by running multiple schedulers on the same Lua state, or a call to Scheduler::run being cancelled.\ +\nOnly one scheduler can be used per Lua state at once, and schedulers must always run until completion.\ +"; + +const ERR_METADATA_REMOVED: &str = "\ +Lua state scheduler metadata was unexpectedly removed!\ +\nThis should never happen, and is likely a bug in the scheduler.\ +"; + +const ERR_SET_CALLBACK_WHEN_RUNNING: &str = "\ +Cannot set error callback when scheduler is running!\ +"; + +/** + A scheduler for running Lua threads and async tasks. +*/ +#[derive(Clone)] +pub struct Scheduler<'lua> { + lua: &'lua Lua, + queue_spawn: SpawnedThreadQueue, + queue_defer: DeferredThreadQueue, + error_callback: ThreadErrorCallback, + result_map: ThreadResultMap, + status: Rc>, + exit: Exit, +} + +impl<'lua> Scheduler<'lua> { + /** + Creates a new scheduler for the given Lua state. + + This scheduler will have a default error callback that prints errors to stderr. + + # Panics + + Panics if the given Lua state already has a scheduler attached to it. + */ + #[must_use] + pub fn new(lua: &'lua Lua) -> Scheduler<'lua> { + let queue_spawn = SpawnedThreadQueue::new(); + let queue_defer = DeferredThreadQueue::new(); + let error_callback = ThreadErrorCallback::default(); + let result_map = ThreadResultMap::new(); + let exit = Exit::new(); + + assert!( + lua.app_data_ref::().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + assert!( + lua.app_data_ref::().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + assert!( + lua.app_data_ref::().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + assert!( + lua.app_data_ref::().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + assert!( + lua.app_data_ref::().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + + lua.set_app_data(queue_spawn.clone()); + lua.set_app_data(queue_defer.clone()); + lua.set_app_data(error_callback.clone()); + lua.set_app_data(result_map.clone()); + lua.set_app_data(exit.clone()); + + let status = Rc::new(Cell::new(Status::NotStarted)); + + Scheduler { + lua, + queue_spawn, + queue_defer, + error_callback, + result_map, + status, + exit, + } + } + + /** + Sets the current status of this scheduler and emits relevant tracing events. + */ + fn set_status(&self, status: Status) { + debug!(status = ?status, "status"); + self.status.set(status); + } + + /** + Returns the current status of this scheduler. + */ + #[must_use] + pub fn status(&self) -> Status { + self.status.get() + } + + /** + Sets the error callback for this scheduler. + + This callback will be called whenever a Lua thread errors. + + Overwrites any previous error callback. + + # Panics + + Panics if the scheduler is currently running. + */ + pub fn set_error_callback(&self, callback: impl Fn(LuaError) + Send + 'static) { + assert!( + !self.status().is_running(), + "{ERR_SET_CALLBACK_WHEN_RUNNING}" + ); + self.error_callback.replace(callback); + } + + /** + Clears the error callback for this scheduler. + + This will remove any current error callback, including default(s). + + # Panics + + Panics if the scheduler is currently running. + */ + pub fn remove_error_callback(&self) { + assert!( + !self.status().is_running(), + "{ERR_SET_CALLBACK_WHEN_RUNNING}" + ); + self.error_callback.clear(); + } + + /** + Gets the exit code for this scheduler, if one has been set. + */ + #[must_use] + pub fn get_exit_code(&self) -> Option { + self.exit.get() + } + + /** + Sets the exit code for this scheduler. + + This will cause [`Scheduler::run`] to exit immediately. + */ + pub fn set_exit_code(&self, code: ExitCode) { + self.exit.set(code); + } + + /** + Spawns a chunk / function / thread onto the scheduler queue. + + Threads are guaranteed to be resumed in the order that they were pushed to the queue. + + # Returns + + Returns a [`ThreadId`] that can be used to retrieve the result of the thread. + + Note that the result may not be available until [`Scheduler::run`] completes. + + # Errors + + Errors when out of memory. + */ + pub fn push_thread_front( + &self, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult { + let id = self.queue_spawn.push_item(self.lua, thread, args)?; + self.result_map.track(id); + Ok(id) + } + + /** + Defers a chunk / function / thread onto the scheduler queue. + + Deferred threads are guaranteed to run after all spawned threads either yield or complete. + + Threads are guaranteed to be resumed in the order that they were pushed to the queue. + + # Returns + + Returns a [`ThreadId`] that can be used to retrieve the result of the thread. + + Note that the result may not be available until [`Scheduler::run`] completes. + + # Errors + + Errors when out of memory. + */ + pub fn push_thread_back( + &self, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult { + let id = self.queue_defer.push_item(self.lua, thread, args)?; + self.result_map.track(id); + Ok(id) + } + + /** + Gets the tracked result for the [`LuaThread`] with the given [`ThreadId`]. + + Depending on the current [`Scheduler::status`], this method will return: + + - [`Status::NotStarted`]: returns `None`. + - [`Status::Running`]: may return `Some(Ok(v))` or `Some(Err(e))`, but it is not guaranteed. + - [`Status::Completed`]: returns `Some(Ok(v))` or `Some(Err(e))`. + + Note that this method also takes the value out of the scheduler and + stops tracking the given thread, so it may only be called once. + + Any subsequent calls after this method returns `Some` will return `None`. + */ + #[must_use] + pub fn get_thread_result(&self, id: ThreadId) -> Option>> { + self.result_map.remove(id).map(|r| r.value(self.lua)) + } + + /** + Waits for the [`LuaThread`] with the given [`ThreadId`] to complete. + + This will return instantly if the thread has already completed. + */ + pub async fn wait_for_thread(&self, id: ThreadId) { + self.result_map.listen(id).await; + } + + /** + Runs the scheduler until all Lua threads have completed. + + Note that the given Lua state must be the same one that was + used to create this scheduler, otherwise this method will panic. + + # Panics + + Panics if the given Lua state already has a scheduler attached to it. + */ + #[allow(clippy::too_many_lines)] + #[instrument(level = "debug", name = "Scheduler::run", skip(self))] + pub async fn run(&self) { + /* + Create new executors to use - note that we do not need create multiple executors + for work stealing, the user may do that themselves if they want to and it will work + just fine, as long as anything async is .await-ed from within a Lua async function. + + The main purpose of the two executors here is just to have one with + the Send bound, and another (local) one without it, for Lua scheduling. + + We also use the main executor to drive the main loop below forward, + saving a tiny bit of processing from going on the Lua executor itself. + */ + let local_exec = LocalExecutor::new(); + let main_exec = Arc::new(Executor::new()); + let fut_queue = Rc::new(FuturesQueue::new()); + + /* + Store the main executor and queue in Lua, so that they may be used with LuaSchedulerExt. + + Also ensure we do not already have an executor or queues - these are definite user errors + and may happen if the user tries to run multiple schedulers on the same Lua state at once. + */ + assert!( + self.lua.app_data_ref::>().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + assert!( + self.lua.app_data_ref::>().is_none(), + "{ERR_METADATA_ALREADY_ATTACHED}" + ); + + self.lua.set_app_data(Arc::downgrade(&main_exec)); + self.lua.set_app_data(Rc::downgrade(&fut_queue.clone())); + + /* + Manually tick the Lua executor, while running under the main executor. + Each tick we wait for the next action to perform, in prioritized order: + + 1. The exit event is triggered by setting an exit code + 2. A Lua thread is available to run on the spawned queue + 3. A Lua thread is available to run on the deferred queue + 4. A new thread-local future is available to run on the local executor + 5. Task(s) scheduled on the Lua executor have made progress and should be polled again + + This ordering is vital to ensure that we don't accidentally exit the main loop + when there are new Lua threads to enqueue and potentially more work to be done. + */ + let fut = async { + let result_map = self.result_map.clone(); + let process_thread = |thread: LuaThread<'lua>, args| { + // NOTE: Thread may have been cancelled from Lua + // before we got here, so we need to check it again + if thread.status() == LuaThreadStatus::Resumable { + // Check if we should be tracking this thread + let id = ThreadId::from(&thread); + let id_tracked = result_map.is_tracked(id); + let result_map_inner = if id_tracked { + Some(result_map.clone()) + } else { + None + }; + // Create our future which will run the thread and store its final result + let fut = async move { + if id_tracked { + // Run until yield and check if we got a final result + if let Some(res) = run_until_yield(thread.clone(), args).await { + if let Err(e) = res.as_ref() { + self.error_callback.call(e); + } + if thread.status() != LuaThreadStatus::Resumable { + let thread_res = ThreadResult::new(res, self.lua); + result_map_inner.unwrap().insert(id, thread_res); + } + } + } else { + // Just run until yield + if let Some(res) = run_until_yield(thread, args).await { + if let Err(e) = res.as_ref() { + self.error_callback.call(e); + } + } + } + }; + // Spawn it on the executor + local_exec.spawn(fut).detach(); + } + }; + + loop { + let fut_exit = self.exit.listen(); // 1 + let fut_spawn = self.queue_spawn.wait_for_item(); // 2 + let fut_defer = self.queue_defer.wait_for_item(); // 3 + let fut_futs = fut_queue.wait_for_item(); // 4 + + // 5 + let mut num_processed = 0; + let span_tick = trace_span!("Scheduler::tick"); + let fut_tick = async { + local_exec.tick().await; + // NOTE: Try to do as much work as possible instead of just a single tick() + num_processed += 1; + while local_exec.try_tick() { + num_processed += 1; + } + }; + + // 1 + 2 + 3 + 4 + 5 + fut_exit + .or(fut_spawn) + .or(fut_defer) + .or(fut_futs) + .or(fut_tick.instrument(span_tick.or_current())) + .await; + + // Check if we should exit + if self.exit.get().is_some() { + debug!("exit signal received"); + break; + } + + // Process spawned threads first, then deferred threads, then futures + let mut num_spawned = 0; + let mut num_deferred = 0; + let mut num_futures = 0; + { + let _span = trace_span!("Scheduler::drain_spawned").entered(); + for (thread, args) in self.queue_spawn.drain_items(self.lua) { + process_thread(thread, args); + num_spawned += 1; + } + } + { + let _span = trace_span!("Scheduler::drain_deferred").entered(); + for (thread, args) in self.queue_defer.drain_items(self.lua) { + process_thread(thread, args); + num_deferred += 1; + } + } + { + let _span = trace_span!("Scheduler::drain_futures").entered(); + for fut in fut_queue.drain_items() { + local_exec.spawn(fut).detach(); + num_futures += 1; + } + } + + // Empty executor = we didn't spawn any new Lua tasks + // above, and there are no remaining tasks to run later + let completed = local_exec.is_empty() + && self.queue_spawn.is_empty() + && self.queue_defer.is_empty(); + trace!( + futures_spawned = num_futures, + futures_processed = num_processed, + lua_threads_spawned = num_spawned, + lua_threads_deferred = num_deferred, + "loop" + ); + if completed { + break; + } + } + }; + + // Run the executor inside a span until all lua threads complete + self.set_status(Status::Running); + main_exec.run(fut).await; + self.set_status(Status::Completed); + + // Clean up + self.lua + .remove_app_data::>() + .expect(ERR_METADATA_REMOVED); + self.lua + .remove_app_data::>() + .expect(ERR_METADATA_REMOVED); + } +} + +impl Drop for Scheduler<'_> { + fn drop(&mut self) { + if panicking() { + // Do not cause further panics if already panicking, as + // this may abort the program instead of safely unwinding + self.lua.remove_app_data::(); + self.lua.remove_app_data::(); + self.lua.remove_app_data::(); + self.lua.remove_app_data::(); + self.lua.remove_app_data::(); + } else { + // In any other case we panic if metadata was removed incorrectly + self.lua + .remove_app_data::() + .expect(ERR_METADATA_REMOVED); + self.lua + .remove_app_data::() + .expect(ERR_METADATA_REMOVED); + self.lua + .remove_app_data::() + .expect(ERR_METADATA_REMOVED); + self.lua + .remove_app_data::() + .expect(ERR_METADATA_REMOVED); + self.lua + .remove_app_data::() + .expect(ERR_METADATA_REMOVED); + } + } +} diff --git a/crates/mlua-luau-scheduler/src/status.rs b/crates/mlua-luau-scheduler/src/status.rs new file mode 100644 index 0000000..e9c139b --- /dev/null +++ b/crates/mlua-luau-scheduler/src/status.rs @@ -0,0 +1,31 @@ +#![allow(clippy::module_name_repetitions)] + +/** + The current status of a scheduler. +*/ +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum Status { + /// The scheduler has not yet started running. + NotStarted, + /// The scheduler is currently running. + Running, + /// The scheduler has completed. + Completed, +} + +impl Status { + #[must_use] + pub const fn is_not_started(self) -> bool { + matches!(self, Self::NotStarted) + } + + #[must_use] + pub const fn is_running(self) -> bool { + matches!(self, Self::Running) + } + + #[must_use] + pub const fn is_completed(self) -> bool { + matches!(self, Self::Completed) + } +} diff --git a/crates/mlua-luau-scheduler/src/thread_id.rs b/crates/mlua-luau-scheduler/src/thread_id.rs new file mode 100644 index 0000000..e2efcaa --- /dev/null +++ b/crates/mlua-luau-scheduler/src/thread_id.rs @@ -0,0 +1,30 @@ +use std::hash::{Hash, Hasher}; + +use mlua::prelude::*; + +/** + Opaque and unique ID representing a [`LuaThread`]. + + Typically used for associating metadata with a thread in a structure such as a `HashMap`. + + Note that holding a `ThreadId` does not prevent the thread from being garbage collected. + The actual thread may or may not still exist and be active at any given point in time. +*/ +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ThreadId { + inner: usize, +} + +impl From<&LuaThread<'_>> for ThreadId { + fn from(thread: &LuaThread) -> Self { + Self { + inner: thread.to_pointer() as usize, + } + } +} + +impl Hash for ThreadId { + fn hash(&self, state: &mut H) { + self.inner.hash(state); + } +} diff --git a/crates/mlua-luau-scheduler/src/traits.rs b/crates/mlua-luau-scheduler/src/traits.rs new file mode 100644 index 0000000..cbe2e6e --- /dev/null +++ b/crates/mlua-luau-scheduler/src/traits.rs @@ -0,0 +1,378 @@ +#![allow(unused_imports)] +#![allow(clippy::missing_errors_doc)] + +use std::{ + cell::Cell, future::Future, process::ExitCode, rc::Weak as WeakRc, sync::Weak as WeakArc, +}; + +use async_executor::{Executor, Task}; +use mlua::prelude::*; +use tracing::trace; + +use crate::{ + exit::Exit, + queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue}, + result_map::ThreadResultMap, + scheduler::Scheduler, + thread_id::ThreadId, +}; + +/** + Trait for any struct that can be turned into an [`LuaThread`] + and passed to the scheduler, implemented for the following types: + + - Lua threads ([`LuaThread`]) + - Lua functions ([`LuaFunction`]) + - Lua chunks ([`LuaChunk`]) +*/ +pub trait IntoLuaThread<'lua> { + /** + Converts the value into a Lua thread. + + # Errors + + Errors when out of memory. + */ + fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult>; +} + +impl<'lua> IntoLuaThread<'lua> for LuaThread<'lua> { + fn into_lua_thread(self, _: &'lua Lua) -> LuaResult> { + Ok(self) + } +} + +impl<'lua> IntoLuaThread<'lua> for LuaFunction<'lua> { + fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult> { + lua.create_thread(self) + } +} + +impl<'lua> IntoLuaThread<'lua> for LuaChunk<'lua, '_> { + fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult> { + lua.create_thread(self.into_function()?) + } +} + +impl<'lua, T> IntoLuaThread<'lua> for &T +where + T: IntoLuaThread<'lua> + Clone, +{ + fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult> { + self.clone().into_lua_thread(lua) + } +} + +/** + Trait for interacting with the current [`Scheduler`]. + + Provides extra methods on the [`Lua`] struct for: + + - Setting the exit code and forcibly stopping the scheduler + - Pushing (spawning) and deferring (pushing to the back) lua threads + - Tracking and getting the result of lua threads +*/ +pub trait LuaSchedulerExt<'lua> { + /** + Sets the exit code of the current scheduler. + + See [`Scheduler::set_exit_code`] for more information. + + # Panics + + Panics if called outside of a running [`Scheduler`]. + */ + fn set_exit_code(&self, code: ExitCode); + + /** + Pushes (spawns) a lua thread to the **front** of the current scheduler. + + See [`Scheduler::push_thread_front`] for more information. + + # Panics + + Panics if called outside of a running [`Scheduler`]. + */ + fn push_thread_front( + &'lua self, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult; + + /** + Pushes (defers) a lua thread to the **back** of the current scheduler. + + See [`Scheduler::push_thread_back`] for more information. + + # Panics + + Panics if called outside of a running [`Scheduler`]. + */ + fn push_thread_back( + &'lua self, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult; + + /** + Registers the given thread to be tracked within the current scheduler. + + Must be called before waiting for a thread to complete or getting its result. + */ + fn track_thread(&'lua self, id: ThreadId); + + /** + Gets the result of the given thread. + + See [`Scheduler::get_thread_result`] for more information. + + # Panics + + Panics if called outside of a running [`Scheduler`]. + */ + fn get_thread_result(&'lua self, id: ThreadId) -> Option>>; + + /** + Waits for the given thread to complete. + + See [`Scheduler::wait_for_thread`] for more information. + + # Panics + + Panics if called outside of a running [`Scheduler`]. + */ + fn wait_for_thread(&'lua self, id: ThreadId) -> impl Future; +} + +/** + Trait for interacting with the [`Executor`] for the current [`Scheduler`]. + + Provides extra methods on the [`Lua`] struct for: + + - Spawning thread-local (`!Send`) futures on the current executor + - Spawning background (`Send`) futures on the current executor + - Spawning blocking tasks on a separate thread pool +*/ +pub trait LuaSpawnExt<'lua> { + /** + Spawns the given future on the current executor and returns its [`Task`]. + + # Panics + + Panics if called outside of a running [`Scheduler`]. + + # Example usage + + ```rust + use async_io::block_on; + + use mlua::prelude::*; + use mlua_luau_scheduler::*; + + fn main() -> LuaResult<()> { + let lua = Lua::new(); + + lua.globals().set( + "spawnBackgroundTask", + lua.create_async_function(|lua, ()| async move { + lua.spawn(async move { + println!("Hello from background task!"); + }).await; + Ok(()) + })? + )?; + + let sched = Scheduler::new(&lua); + sched.push_thread_front(lua.load("spawnBackgroundTask()"), ()); + block_on(sched.run()); + + Ok(()) + } + ``` + */ + fn spawn(&self, fut: F) -> Task + where + F: Future + Send + 'static, + T: Send + 'static; + + /** + Spawns the given thread-local future on the current executor. + + Note that this future will run detached and always to completion, + preventing the [`Scheduler`] was spawned on from completing until done. + + # Panics + + Panics if called outside of a running [`Scheduler`]. + + # Example usage + + ```rust + use async_io::block_on; + + use mlua::prelude::*; + use mlua_luau_scheduler::*; + + fn main() -> LuaResult<()> { + let lua = Lua::new(); + + lua.globals().set( + "spawnLocalTask", + lua.create_async_function(|lua, ()| async move { + lua.spawn_local(async move { + println!("Hello from local task!"); + }); + Ok(()) + })? + )?; + + let sched = Scheduler::new(&lua); + sched.push_thread_front(lua.load("spawnLocalTask()"), ()); + block_on(sched.run()); + + Ok(()) + } + ``` + */ + fn spawn_local(&self, fut: F) + where + F: Future + 'static; + + /** + Spawns the given blocking function and returns its [`Task`]. + + This function will run on a separate thread pool and not block the current executor. + + # Panics + + Panics if called outside of a running [`Scheduler`]. + + # Example usage + + ```rust + use async_io::block_on; + + use mlua::prelude::*; + use mlua_luau_scheduler::*; + + fn main() -> LuaResult<()> { + let lua = Lua::new(); + + lua.globals().set( + "spawnBlockingTask", + lua.create_async_function(|lua, ()| async move { + lua.spawn_blocking(|| { + println!("Hello from blocking task!"); + }).await; + Ok(()) + })? + )?; + + let sched = Scheduler::new(&lua); + sched.push_thread_front(lua.load("spawnBlockingTask()"), ()); + block_on(sched.run()); + + Ok(()) + } + ``` + */ + fn spawn_blocking(&self, f: F) -> Task + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static; +} + +impl<'lua> LuaSchedulerExt<'lua> for Lua { + fn set_exit_code(&self, code: ExitCode) { + let exit = self + .app_data_ref::() + .expect("exit code can only be set from within an active scheduler"); + exit.set(code); + } + + fn push_thread_front( + &'lua self, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult { + let queue = self + .app_data_ref::() + .expect("lua threads can only be pushed from within an active scheduler"); + queue.push_item(self, thread, args) + } + + fn push_thread_back( + &'lua self, + thread: impl IntoLuaThread<'lua>, + args: impl IntoLuaMulti<'lua>, + ) -> LuaResult { + let queue = self + .app_data_ref::() + .expect("lua threads can only be pushed from within an active scheduler"); + queue.push_item(self, thread, args) + } + + fn track_thread(&'lua self, id: ThreadId) { + let map = self + .app_data_ref::() + .expect("lua threads can only be tracked from within an active scheduler"); + map.track(id); + } + + fn get_thread_result(&'lua self, id: ThreadId) -> Option>> { + let map = self + .app_data_ref::() + .expect("lua threads results can only be retrieved from within an active scheduler"); + map.remove(id).map(|r| r.value(self)) + } + + fn wait_for_thread(&'lua self, id: ThreadId) -> impl Future { + let map = self + .app_data_ref::() + .expect("lua threads results can only be retrieved from within an active scheduler"); + async move { map.listen(id).await } + } +} + +impl<'lua> LuaSpawnExt<'lua> for Lua { + fn spawn(&self, fut: F) -> Task + where + F: Future + Send + 'static, + T: Send + 'static, + { + let exec = self + .app_data_ref::>() + .expect("tasks can only be spawned within an active scheduler") + .upgrade() + .expect("executor was dropped"); + trace!("spawning future on executor"); + exec.spawn(fut) + } + + fn spawn_local(&self, fut: F) + where + F: Future + 'static, + { + let queue = self + .app_data_ref::>() + .expect("tasks can only be spawned within an active scheduler") + .upgrade() + .expect("executor was dropped"); + trace!("spawning local task on executor"); + queue.push_item(fut); + } + + fn spawn_blocking(&self, f: F) -> Task + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let exec = self + .app_data_ref::>() + .expect("tasks can only be spawned within an active scheduler") + .upgrade() + .expect("executor was dropped"); + trace!("spawning blocking task on executor"); + exec.spawn(blocking::unblock(f)) + } +} diff --git a/crates/mlua-luau-scheduler/src/util.rs b/crates/mlua-luau-scheduler/src/util.rs new file mode 100644 index 0000000..2fe537b --- /dev/null +++ b/crates/mlua-luau-scheduler/src/util.rs @@ -0,0 +1,147 @@ +use futures_lite::StreamExt; +use mlua::prelude::*; +use tracing::instrument; + +/** + Runs a Lua thread until it manually yields (using coroutine.yield), errors, or completes. + + May return `None` if the thread was cancelled. + + Otherwise returns the values yielded by the thread, or the error that caused it to stop. +*/ +#[instrument(level = "trace", name = "Scheduler::run_until_yield", skip_all)] +pub(crate) async fn run_until_yield<'lua>( + thread: LuaThread<'lua>, + args: LuaMultiValue<'lua>, +) -> Option>> { + let mut stream = thread.into_async(args); + /* + NOTE: It is very important that we drop the thread/stream as + soon as we are done, it takes up valuable Lua registry space + and detached tasks will not drop until the executor does + + https://github.com/smol-rs/smol/issues/294 + + We also do not unwrap here since returning `None` is expected behavior for cancellation. + + Even though we are converting into a stream, and then immediately running it, + the future may still be cancelled before it is polled, which gives us None. + */ + stream.next().await +} + +/** + Checks if the given [`LuaValue`] is the async `POLL_PENDING` constant. +*/ +#[inline] +pub(crate) fn is_poll_pending(value: &LuaValue) -> bool { + value + .as_light_userdata() + .is_some_and(|l| l == Lua::poll_pending()) +} + +/** + Representation of a [`LuaResult`] with an associated [`LuaMultiValue`] currently stored in the Lua registry. +*/ +#[derive(Debug)] +pub(crate) struct ThreadResult { + inner: LuaResult, +} + +impl ThreadResult { + pub fn new(result: LuaResult, lua: &Lua) -> Self { + Self { + inner: match result { + Ok(v) => Ok({ + let vec = v.into_vec(); + lua.create_registry_value(vec).expect("out of memory") + }), + Err(e) => Err(e), + }, + } + } + + pub fn value(self, lua: &Lua) -> LuaResult { + match self.inner { + Ok(key) => { + let vec = lua.registry_value(&key).unwrap(); + lua.remove_registry_value(key).unwrap(); + Ok(LuaMultiValue::from_vec(vec)) + } + Err(e) => Err(e.clone()), + } + } +} + +/** + Representation of a [`LuaThread`] with its associated arguments currently stored in the Lua registry. +*/ +#[derive(Debug)] +pub(crate) struct ThreadWithArgs { + key_thread: LuaRegistryKey, + key_args: LuaRegistryKey, +} + +impl ThreadWithArgs { + pub fn new<'lua>( + lua: &'lua Lua, + thread: LuaThread<'lua>, + args: LuaMultiValue<'lua>, + ) -> LuaResult { + let argsv = args.into_vec(); + + let key_thread = lua.create_registry_value(thread)?; + let key_args = lua.create_registry_value(argsv)?; + + Ok(Self { + key_thread, + key_args, + }) + } + + pub fn into_inner(self, lua: &Lua) -> (LuaThread<'_>, LuaMultiValue<'_>) { + let thread = lua.registry_value(&self.key_thread).unwrap(); + let argsv = lua.registry_value(&self.key_args).unwrap(); + + let args = LuaMultiValue::from_vec(argsv); + + lua.remove_registry_value(self.key_thread).unwrap(); + lua.remove_registry_value(self.key_args).unwrap(); + + (thread, args) + } +} + +/** + Wrapper struct to accept either a Lua thread or a Lua function as function argument. + + [`LuaThreadOrFunction::into_thread`] may be used to convert the value into a Lua thread. +*/ +#[derive(Clone)] +pub(crate) enum LuaThreadOrFunction<'lua> { + Thread(LuaThread<'lua>), + Function(LuaFunction<'lua>), +} + +impl<'lua> LuaThreadOrFunction<'lua> { + pub(super) fn into_thread(self, lua: &'lua Lua) -> LuaResult> { + match self { + Self::Thread(t) => Ok(t), + Self::Function(f) => lua.create_thread(f), + } + } +} + +impl<'lua> FromLua<'lua> for LuaThreadOrFunction<'lua> { + fn from_lua(value: LuaValue<'lua>, _: &'lua Lua) -> LuaResult { + match value { + LuaValue::Thread(t) => Ok(Self::Thread(t)), + LuaValue::Function(f) => Ok(Self::Function(f)), + value => Err(LuaError::FromLuaConversionError { + from: value.type_name(), + to: "LuaThreadOrFunction", + message: Some("Expected thread or function".to_string()), + }), + } + } +}