mirror of
https://github.com/lune-org/lune.git
synced 2024-12-12 04:50:36 +00:00
Move mlua-luau-scheduler into this repository
This commit is contained in:
parent
5a292aabc5
commit
2a85532448
35 changed files with 2436 additions and 10 deletions
163
Cargo.lock
generated
163
Cargo.lock
generated
|
@ -188,6 +188,47 @@ dependencies = [
|
|||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-fs"
|
||||
version = "2.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ebcd09b382f40fcd159c2d695175b2ae620ffa5f3bd6f664131efff4e8b9e04a"
|
||||
dependencies = [
|
||||
"async-lock",
|
||||
"blocking",
|
||||
"futures-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-io"
|
||||
version = "2.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0d6baa8f0178795da0e71bc42c9e5d13261aac7ee549853162e66a241ba17964"
|
||||
dependencies = [
|
||||
"async-lock",
|
||||
"cfg-if",
|
||||
"concurrent-queue",
|
||||
"futures-io",
|
||||
"futures-lite",
|
||||
"parking",
|
||||
"polling",
|
||||
"rustix",
|
||||
"slab",
|
||||
"tracing",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-lock"
|
||||
version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18"
|
||||
dependencies = [
|
||||
"event-listener 5.3.1",
|
||||
"event-listener-strategy",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-task"
|
||||
version = "4.7.1"
|
||||
|
@ -946,6 +987,20 @@ dependencies = [
|
|||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "generator"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "186014d53bc231d0090ef8d6f03e0920c54d85a5ed22f4f2f74315ec56cf83fb"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"log",
|
||||
"rustversion",
|
||||
"windows",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "generic-array"
|
||||
version = "0.14.7"
|
||||
|
@ -1243,7 +1298,7 @@ dependencies = [
|
|||
"iana-time-zone-haiku",
|
||||
"js-sys",
|
||||
"wasm-bindgen",
|
||||
"windows-core",
|
||||
"windows-core 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1425,6 +1480,19 @@ version = "0.4.21"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
|
||||
|
||||
[[package]]
|
||||
name = "loom"
|
||||
version = "0.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"generator",
|
||||
"scoped-tls",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "luau0-src"
|
||||
version = "0.9.1+luau625"
|
||||
|
@ -1746,10 +1814,10 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "mlua-luau-scheduler"
|
||||
version = "0.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a13eabdbc57fa38cf0b604d98ce3431573c79a964aac56e09c16c240d36cb1bf"
|
||||
dependencies = [
|
||||
"async-executor",
|
||||
"async-fs",
|
||||
"async-io",
|
||||
"blocking",
|
||||
"concurrent-queue",
|
||||
"derive_more",
|
||||
|
@ -1758,6 +1826,8 @@ dependencies = [
|
|||
"mlua",
|
||||
"rustc-hash",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"tracing-tracy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -2029,6 +2099,21 @@ dependencies = [
|
|||
"time 0.3.36",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "polling"
|
||||
version = "3.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e6a007746f34ed64099e88783b0ae369eaa3da6392868ba262e2af9b8fbaea1"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"concurrent-queue",
|
||||
"hermit-abi",
|
||||
"pin-project-lite",
|
||||
"rustix",
|
||||
"tracing",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "powerfmt"
|
||||
version = "0.2.0"
|
||||
|
@ -2510,6 +2595,12 @@ dependencies = [
|
|||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustversion"
|
||||
version = "1.0.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6"
|
||||
|
||||
[[package]]
|
||||
name = "rustyline"
|
||||
version = "14.0.0"
|
||||
|
@ -2547,6 +2638,12 @@ dependencies = [
|
|||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scoped-tls"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.2.0"
|
||||
|
@ -3233,6 +3330,37 @@ dependencies = [
|
|||
"tracing-log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-tracy"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6024d04f84a69fd0d1dc1eee3a2b070bd246530a0582f9982ae487cb6c703614"
|
||||
dependencies = [
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
"tracy-client",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracy-client"
|
||||
version = "0.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "59fb931a64ff88984f86d3e9bcd1ae8843aa7fe44dd0f8097527bc172351741d"
|
||||
dependencies = [
|
||||
"loom",
|
||||
"once_cell",
|
||||
"tracy-client-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracy-client-sys"
|
||||
version = "0.22.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d104d610dfa9dd154535102cc9c6164ae1fa37842bc2d9e83f9ac82b0ae0882"
|
||||
dependencies = [
|
||||
"cc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "try-lock"
|
||||
version = "0.2.5"
|
||||
|
@ -3511,6 +3639,16 @@ version = "0.4.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||
|
||||
[[package]]
|
||||
name = "windows"
|
||||
version = "0.54.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9252e5725dbed82865af151df558e754e4a3c2c30818359eb17465f1346a1b49"
|
||||
dependencies = [
|
||||
"windows-core 0.54.0",
|
||||
"windows-targets 0.52.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-core"
|
||||
version = "0.52.0"
|
||||
|
@ -3520,6 +3658,25 @@ dependencies = [
|
|||
"windows-targets 0.52.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-core"
|
||||
version = "0.54.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "12661b9c89351d684a50a8a643ce5f608e20243b9fb84687800163429f161d65"
|
||||
dependencies = [
|
||||
"windows-result",
|
||||
"windows-targets 0.52.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-result"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "749f0da9cc72d82e600d8d2e44cadd0b9eedb9038f71a1c58556ac1c5791813b"
|
||||
dependencies = [
|
||||
"windows-targets 0.52.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.48.0"
|
||||
|
|
|
@ -16,6 +16,7 @@ members = [
|
|||
"crates/lune-std-stdio",
|
||||
"crates/lune-std-task",
|
||||
"crates/lune-utils",
|
||||
"crates/mlua-luau-scheduler",
|
||||
]
|
||||
|
||||
# Profile for building the release binary, with the following options set:
|
||||
|
|
|
@ -14,7 +14,7 @@ workspace = true
|
|||
|
||||
[dependencies]
|
||||
mlua = { version = "0.9.7", features = ["luau"] }
|
||||
mlua-luau-scheduler = "0.0.2"
|
||||
mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" }
|
||||
|
||||
bstr = "1.9"
|
||||
futures-util = "0.3"
|
||||
|
|
|
@ -14,7 +14,7 @@ workspace = true
|
|||
|
||||
[dependencies]
|
||||
mlua = { version = "0.9.7", features = ["luau"] }
|
||||
mlua-luau-scheduler = "0.0.2"
|
||||
mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" }
|
||||
|
||||
directories = "5.0"
|
||||
pin-project = "1.0"
|
||||
|
|
|
@ -14,7 +14,7 @@ workspace = true
|
|||
|
||||
[dependencies]
|
||||
mlua = { version = "0.9.7", features = ["luau"] }
|
||||
mlua-luau-scheduler = "0.0.2"
|
||||
mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" }
|
||||
|
||||
once_cell = "1.17"
|
||||
rbx_cookie = { version = "0.1.4", default-features = false }
|
||||
|
|
|
@ -15,7 +15,7 @@ workspace = true
|
|||
[dependencies]
|
||||
dialoguer = "0.11"
|
||||
mlua = { version = "0.9.7", features = ["luau"] }
|
||||
mlua-luau-scheduler = "0.0.2"
|
||||
mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" }
|
||||
|
||||
tokio = { version = "1", default-features = false, features = [
|
||||
"io-std",
|
||||
|
|
|
@ -14,7 +14,7 @@ workspace = true
|
|||
|
||||
[dependencies]
|
||||
mlua = { version = "0.9.7", features = ["luau"] }
|
||||
mlua-luau-scheduler = "0.0.2"
|
||||
mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" }
|
||||
|
||||
tokio = { version = "1", default-features = false, features = ["time"] }
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ task = ["dep:lune-std-task"]
|
|||
|
||||
[dependencies]
|
||||
mlua = { version = "0.9.7", features = ["luau"] }
|
||||
mlua-luau-scheduler = "0.0.2"
|
||||
mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" }
|
||||
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
|
|
|
@ -51,7 +51,7 @@ workspace = true
|
|||
|
||||
[dependencies]
|
||||
mlua = { version = "0.9.7", features = ["luau"] }
|
||||
mlua-luau-scheduler = "0.0.2"
|
||||
mlua-luau-scheduler = { version = "0.0.2", path = "../mlua-luau-scheduler" }
|
||||
|
||||
anyhow = "1.0"
|
||||
console = "0.15"
|
||||
|
|
67
crates/mlua-luau-scheduler/Cargo.toml
Normal file
67
crates/mlua-luau-scheduler/Cargo.toml
Normal file
|
@ -0,0 +1,67 @@
|
|||
[package]
|
||||
name = "mlua-luau-scheduler"
|
||||
version = "0.0.2"
|
||||
edition = "2021"
|
||||
license = "MPL-2.0"
|
||||
repository = "https://github.com/lune-org/lune"
|
||||
description = "Luau-based async scheduler, using mlua and async-executor"
|
||||
readme = "README.md"
|
||||
keywords = ["async", "luau", "scheduler"]
|
||||
categories = ["async"]
|
||||
|
||||
[lib]
|
||||
path = "src/lib.rs"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-executor = "1.8"
|
||||
blocking = "1.5"
|
||||
concurrent-queue = "2.4"
|
||||
derive_more = "0.99"
|
||||
event-listener = "4.0"
|
||||
futures-lite = "2.2"
|
||||
rustc-hash = "1.1"
|
||||
tracing = "0.1"
|
||||
|
||||
mlua = { version = "0.9.6", features = [
|
||||
"luau",
|
||||
"luau-jit",
|
||||
"async",
|
||||
"serialize",
|
||||
] }
|
||||
|
||||
[dev-dependencies]
|
||||
async-fs = "2.1"
|
||||
async-io = "2.3"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
tracing-tracy = "0.11"
|
||||
|
||||
[[example]]
|
||||
name = "basic_sleep"
|
||||
test = true
|
||||
|
||||
[[example]]
|
||||
name = "basic_spawn"
|
||||
test = true
|
||||
|
||||
[[example]]
|
||||
name = "callbacks"
|
||||
test = true
|
||||
|
||||
[[example]]
|
||||
name = "exit_code"
|
||||
test = true
|
||||
|
||||
[[example]]
|
||||
name = "lots_of_threads"
|
||||
test = true
|
||||
|
||||
[[example]]
|
||||
name = "scheduler_ordering"
|
||||
test = true
|
||||
|
||||
[[example]]
|
||||
name = "tracy"
|
||||
test = false
|
78
crates/mlua-luau-scheduler/README.md
Normal file
78
crates/mlua-luau-scheduler/README.md
Normal file
|
@ -0,0 +1,78 @@
|
|||
<!-- markdownlint-disable MD033 -->
|
||||
<!-- markdownlint-disable MD041 -->
|
||||
|
||||
# `mlua-luau-scheduler`
|
||||
|
||||
An async scheduler for Luau, using [`mlua`][mlua] and built on top of [`async-executor`][async-executor].
|
||||
|
||||
This crate is runtime-agnostic and is compatible with any async runtime, including [Tokio][tokio], [smol][smol], [async-std][async-std], and others. </br>
|
||||
However, since many dependencies are shared with [smol][smol], depending on it over other runtimes may be preferred.
|
||||
|
||||
[async-executor]: https://crates.io/crates/async-executor
|
||||
[async-std]: https://async.rs
|
||||
[mlua]: https://crates.io/crates/mlua
|
||||
[smol]: https://github.com/smol-rs/smol
|
||||
[tokio]: https://tokio.rs
|
||||
|
||||
## Example Usage
|
||||
|
||||
### 1. Import dependencies
|
||||
|
||||
```rs
|
||||
use std::time::{Duration, Instant};
|
||||
use std::io::ErrorKind;
|
||||
|
||||
use async_io::{block_on, Timer};
|
||||
use async_fs::read_to_string;
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::*;
|
||||
```
|
||||
|
||||
### 2. Set up Lua environment
|
||||
|
||||
```rs
|
||||
let lua = Lua::new();
|
||||
|
||||
lua.globals().set(
|
||||
"sleep",
|
||||
lua.create_async_function(|_, duration: f64| async move {
|
||||
let before = Instant::now();
|
||||
let after = Timer::after(Duration::from_secs_f64(duration)).await;
|
||||
Ok((after - before).as_secs_f64())
|
||||
})?,
|
||||
)?;
|
||||
|
||||
lua.globals().set(
|
||||
"readFile",
|
||||
lua.create_async_function(|lua, path: String| async move {
|
||||
// Spawn background task that does not take up resources on the lua thread
|
||||
// Normally, futures in mlua can not be shared across threads, but this can
|
||||
let task = lua.spawn(async move {
|
||||
match read_to_string(path).await {
|
||||
Ok(s) => Ok(Some(s)),
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
});
|
||||
task.await.into_lua_err()
|
||||
})?,
|
||||
)?;
|
||||
```
|
||||
|
||||
### 3. Set up scheduler, run threads
|
||||
|
||||
```rs
|
||||
let sched = Scheduler::new(&lua)?;
|
||||
|
||||
// We can create multiple lua threads ...
|
||||
let sleepThread = lua.load("sleep(0.1)");
|
||||
let fileThread = lua.load("readFile(\"Cargo.toml\")");
|
||||
|
||||
// ... spawn them both onto the scheduler ...
|
||||
sched.push_thread_front(sleepThread, ());
|
||||
sched.push_thread_front(fileThread, ());
|
||||
|
||||
// ... and run until they finish
|
||||
block_on(sched.run());
|
||||
```
|
45
crates/mlua-luau-scheduler/examples/basic_sleep.rs
Normal file
45
crates/mlua-luau-scheduler/examples/basic_sleep.rs
Normal file
|
@ -0,0 +1,45 @@
|
|||
#![allow(clippy::missing_errors_doc)]
|
||||
#![allow(clippy::cargo_common_metadata)]
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_io::{block_on, Timer};
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::Scheduler;
|
||||
|
||||
const MAIN_SCRIPT: &str = include_str!("./lua/basic_sleep.luau");
|
||||
|
||||
pub fn main() -> LuaResult<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.with_target(false)
|
||||
.without_time()
|
||||
.init();
|
||||
|
||||
// Set up persistent Lua environment
|
||||
let lua = Lua::new();
|
||||
lua.globals().set(
|
||||
"sleep",
|
||||
lua.create_async_function(|_, duration: f64| async move {
|
||||
let before = Instant::now();
|
||||
let after = Timer::after(Duration::from_secs_f64(duration)).await;
|
||||
Ok((after - before).as_secs_f64())
|
||||
})?,
|
||||
)?;
|
||||
|
||||
// Load the main script into a scheduler
|
||||
let sched = Scheduler::new(&lua);
|
||||
let main = lua.load(MAIN_SCRIPT);
|
||||
sched.push_thread_front(main, ())?;
|
||||
|
||||
// Run until completion
|
||||
block_on(sched.run());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_basic_sleep() -> LuaResult<()> {
|
||||
main()
|
||||
}
|
64
crates/mlua-luau-scheduler/examples/basic_spawn.rs
Normal file
64
crates/mlua-luau-scheduler/examples/basic_spawn.rs
Normal file
|
@ -0,0 +1,64 @@
|
|||
#![allow(clippy::missing_errors_doc)]
|
||||
#![allow(clippy::cargo_common_metadata)]
|
||||
|
||||
use std::io::ErrorKind;
|
||||
|
||||
use async_fs::read_to_string;
|
||||
use async_io::block_on;
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::{LuaSpawnExt, Scheduler};
|
||||
|
||||
const MAIN_SCRIPT: &str = include_str!("./lua/basic_spawn.luau");
|
||||
|
||||
pub fn main() -> LuaResult<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.with_target(false)
|
||||
.without_time()
|
||||
.init();
|
||||
|
||||
// Set up persistent Lua environment
|
||||
let lua = Lua::new();
|
||||
lua.globals().set(
|
||||
"readFile",
|
||||
lua.create_async_function(|lua, path: String| async move {
|
||||
// Spawn background task that does not take up resources on the Lua thread
|
||||
let task = lua.spawn(async move {
|
||||
match read_to_string(path).await {
|
||||
Ok(s) => Ok(Some(s)),
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for it to complete
|
||||
let result = task.await.into_lua_err();
|
||||
|
||||
// We can also spawn local tasks that do take up resources
|
||||
// on the Lua thread, but that do not have the Send bound
|
||||
if result.is_ok() {
|
||||
lua.spawn_local(async move {
|
||||
println!("File read successfully!");
|
||||
});
|
||||
}
|
||||
|
||||
result
|
||||
})?,
|
||||
)?;
|
||||
|
||||
// Load the main script into a scheduler
|
||||
let sched = Scheduler::new(&lua);
|
||||
let main = lua.load(MAIN_SCRIPT);
|
||||
sched.push_thread_front(main, ())?;
|
||||
|
||||
// Run until completion
|
||||
block_on(sched.run());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_basic_spawn() -> LuaResult<()> {
|
||||
main()
|
||||
}
|
48
crates/mlua-luau-scheduler/examples/callbacks.rs
Normal file
48
crates/mlua-luau-scheduler/examples/callbacks.rs
Normal file
|
@ -0,0 +1,48 @@
|
|||
#![allow(clippy::missing_errors_doc)]
|
||||
#![allow(clippy::missing_panics_doc)]
|
||||
#![allow(clippy::cargo_common_metadata)]
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::Scheduler;
|
||||
|
||||
use async_io::block_on;
|
||||
|
||||
const MAIN_SCRIPT: &str = include_str!("./lua/callbacks.luau");
|
||||
|
||||
pub fn main() -> LuaResult<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.with_target(false)
|
||||
.without_time()
|
||||
.init();
|
||||
|
||||
// Set up persistent Lua environment
|
||||
let lua = Lua::new();
|
||||
|
||||
// Create a new scheduler with custom callbacks
|
||||
let sched = Scheduler::new(&lua);
|
||||
sched.set_error_callback(|e| {
|
||||
println!(
|
||||
"Captured error from Lua!\n{}\n{e}\n{}",
|
||||
"-".repeat(15),
|
||||
"-".repeat(15)
|
||||
);
|
||||
});
|
||||
|
||||
// Load the main script into the scheduler, and keep track of the thread we spawn
|
||||
let main = lua.load(MAIN_SCRIPT);
|
||||
let id = sched.push_thread_front(main, ())?;
|
||||
|
||||
// Run until completion
|
||||
block_on(sched.run());
|
||||
|
||||
// We should have gotten the error back from our script
|
||||
assert!(sched.get_thread_result(id).unwrap().is_err());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_callbacks() -> LuaResult<()> {
|
||||
main()
|
||||
}
|
43
crates/mlua-luau-scheduler/examples/exit_code.rs
Normal file
43
crates/mlua-luau-scheduler/examples/exit_code.rs
Normal file
|
@ -0,0 +1,43 @@
|
|||
#![allow(clippy::missing_errors_doc)]
|
||||
#![allow(clippy::missing_panics_doc)]
|
||||
#![allow(clippy::cargo_common_metadata)]
|
||||
|
||||
use async_io::block_on;
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::{Functions, Scheduler};
|
||||
|
||||
const MAIN_SCRIPT: &str = include_str!("./lua/exit_code.luau");
|
||||
|
||||
pub fn main() -> LuaResult<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.with_target(false)
|
||||
.without_time()
|
||||
.init();
|
||||
|
||||
// Set up persistent Lua environment
|
||||
let lua = Lua::new();
|
||||
let sched = Scheduler::new(&lua);
|
||||
let fns = Functions::new(&lua)?;
|
||||
|
||||
lua.globals().set("exit", fns.exit)?;
|
||||
|
||||
// Load the main script into the scheduler
|
||||
let main = lua.load(MAIN_SCRIPT);
|
||||
sched.push_thread_front(main, ())?;
|
||||
|
||||
// Run until completion
|
||||
block_on(sched.run());
|
||||
|
||||
// Verify that we got a correct exit code
|
||||
let code = sched.get_exit_code().unwrap_or_default();
|
||||
assert!(format!("{code:?}").contains("(1)"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_exit_code() -> LuaResult<()> {
|
||||
main()
|
||||
}
|
51
crates/mlua-luau-scheduler/examples/lots_of_threads.rs
Normal file
51
crates/mlua-luau-scheduler/examples/lots_of_threads.rs
Normal file
|
@ -0,0 +1,51 @@
|
|||
#![allow(clippy::missing_errors_doc)]
|
||||
#![allow(clippy::cargo_common_metadata)]
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use async_io::{block_on, Timer};
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::{Functions, Scheduler};
|
||||
|
||||
const MAIN_SCRIPT: &str = include_str!("./lua/lots_of_threads.luau");
|
||||
|
||||
const ONE_NANOSECOND: Duration = Duration::from_nanos(1);
|
||||
|
||||
pub fn main() -> LuaResult<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.with_target(false)
|
||||
.without_time()
|
||||
.init();
|
||||
|
||||
// Set up persistent Lua environment
|
||||
let lua = Lua::new();
|
||||
let sched = Scheduler::new(&lua);
|
||||
let fns = Functions::new(&lua)?;
|
||||
|
||||
lua.globals().set("spawn", fns.spawn)?;
|
||||
lua.globals().set(
|
||||
"sleep",
|
||||
lua.create_async_function(|_, ()| async move {
|
||||
// Obviously we can't sleep for a single nanosecond since
|
||||
// this uses OS scheduling under the hood, but we can try
|
||||
Timer::after(ONE_NANOSECOND).await;
|
||||
Ok(())
|
||||
})?,
|
||||
)?;
|
||||
|
||||
// Load the main script into the scheduler
|
||||
let main = lua.load(MAIN_SCRIPT);
|
||||
sched.push_thread_front(main, ())?;
|
||||
|
||||
// Run until completion
|
||||
block_on(sched.run());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_lots_of_threads() -> LuaResult<()> {
|
||||
main()
|
||||
}
|
13
crates/mlua-luau-scheduler/examples/lua/basic_sleep.luau
Normal file
13
crates/mlua-luau-scheduler/examples/lua/basic_sleep.luau
Normal file
|
@ -0,0 +1,13 @@
|
|||
--!nocheck
|
||||
--!nolint UnknownGlobal
|
||||
|
||||
print("Sleeping for 3 seconds...")
|
||||
|
||||
sleep(1)
|
||||
print("1 second passed")
|
||||
|
||||
sleep(1)
|
||||
print("2 seconds passed")
|
||||
|
||||
sleep(1)
|
||||
print("3 seconds passed")
|
17
crates/mlua-luau-scheduler/examples/lua/basic_spawn.luau
Normal file
17
crates/mlua-luau-scheduler/examples/lua/basic_spawn.luau
Normal file
|
@ -0,0 +1,17 @@
|
|||
--!nocheck
|
||||
--!nolint UnknownGlobal
|
||||
|
||||
local _, err = pcall(function()
|
||||
local file = readFile("Cargo.toml")
|
||||
if file ~= nil then
|
||||
print("Cargo.toml found!")
|
||||
print("Contents:")
|
||||
print(file)
|
||||
else
|
||||
print("Cargo.toml not found!")
|
||||
end
|
||||
end)
|
||||
|
||||
if err ~= nil then
|
||||
print("Error while reading file: " .. err)
|
||||
end
|
4
crates/mlua-luau-scheduler/examples/lua/callbacks.luau
Normal file
4
crates/mlua-luau-scheduler/examples/lua/callbacks.luau
Normal file
|
@ -0,0 +1,4 @@
|
|||
--!nocheck
|
||||
--!nolint UnknownGlobal
|
||||
|
||||
error("Oh no! Something went very very wrong!")
|
8
crates/mlua-luau-scheduler/examples/lua/exit_code.luau
Normal file
8
crates/mlua-luau-scheduler/examples/lua/exit_code.luau
Normal file
|
@ -0,0 +1,8 @@
|
|||
--!nocheck
|
||||
--!nolint UnknownGlobal
|
||||
|
||||
print("Setting exit code manually")
|
||||
|
||||
exit(1)
|
||||
|
||||
error("unreachable")
|
29
crates/mlua-luau-scheduler/examples/lua/lots_of_threads.luau
Normal file
29
crates/mlua-luau-scheduler/examples/lua/lots_of_threads.luau
Normal file
|
@ -0,0 +1,29 @@
|
|||
--!nocheck
|
||||
--!nolint UnknownGlobal
|
||||
|
||||
local NUM_BATCHES = 10
|
||||
local NUM_THREADS = 100_000
|
||||
|
||||
print(`Spawning {NUM_BATCHES * NUM_THREADS} threads split into {NUM_BATCHES} batches\n`)
|
||||
|
||||
local before = os.clock()
|
||||
for i = 1, NUM_BATCHES do
|
||||
print(`Batch {i} of {NUM_BATCHES}`)
|
||||
local thread = coroutine.running()
|
||||
|
||||
local counter = 0
|
||||
for j = 1, NUM_THREADS do
|
||||
spawn(function()
|
||||
sleep(0.1)
|
||||
counter += 1
|
||||
if counter == NUM_THREADS then
|
||||
spawn(thread)
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
coroutine.yield()
|
||||
end
|
||||
local after = os.clock()
|
||||
|
||||
print(`\nSpawned {NUM_BATCHES * NUM_THREADS} sleeping threads in {after - before}s`)
|
|
@ -0,0 +1,34 @@
|
|||
--!nocheck
|
||||
--!nolint UnknownGlobal
|
||||
|
||||
local nums = {}
|
||||
local function insert(n: number)
|
||||
table.insert(nums, n)
|
||||
print(n)
|
||||
end
|
||||
|
||||
insert(1)
|
||||
|
||||
-- Defer will run at the end of the resumption cycle, but without yielding
|
||||
defer(function()
|
||||
insert(5)
|
||||
end)
|
||||
|
||||
-- Spawn will instantly run up until the first yield, and must then be resumed manually ...
|
||||
spawn(function()
|
||||
insert(2)
|
||||
coroutine.yield()
|
||||
error("unreachable code")
|
||||
end)
|
||||
|
||||
-- ... unless calling functions created using `lua.create_async_function(...)`,
|
||||
-- which will resume their calling thread with their result automatically
|
||||
spawn(function()
|
||||
insert(3)
|
||||
sleep(1)
|
||||
insert(6)
|
||||
end)
|
||||
|
||||
insert(4)
|
||||
|
||||
return nums
|
56
crates/mlua-luau-scheduler/examples/scheduler_ordering.rs
Normal file
56
crates/mlua-luau-scheduler/examples/scheduler_ordering.rs
Normal file
|
@ -0,0 +1,56 @@
|
|||
#![allow(clippy::missing_errors_doc)]
|
||||
#![allow(clippy::missing_panics_doc)]
|
||||
#![allow(clippy::cargo_common_metadata)]
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_io::{block_on, Timer};
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::{Functions, Scheduler};
|
||||
|
||||
const MAIN_SCRIPT: &str = include_str!("./lua/scheduler_ordering.luau");
|
||||
|
||||
pub fn main() -> LuaResult<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.with_target(false)
|
||||
.without_time()
|
||||
.init();
|
||||
|
||||
// Set up persistent Lua environment
|
||||
let lua = Lua::new();
|
||||
let sched = Scheduler::new(&lua);
|
||||
let fns = Functions::new(&lua)?;
|
||||
|
||||
lua.globals().set("spawn", fns.spawn)?;
|
||||
lua.globals().set("defer", fns.defer)?;
|
||||
lua.globals().set(
|
||||
"sleep",
|
||||
lua.create_async_function(|_, duration: Option<f64>| async move {
|
||||
let duration = duration.unwrap_or_default().max(1.0 / 250.0);
|
||||
let before = Instant::now();
|
||||
let after = Timer::after(Duration::from_secs_f64(duration)).await;
|
||||
Ok((after - before).as_secs_f64())
|
||||
})?,
|
||||
)?;
|
||||
|
||||
// Load the main script into the scheduler, and keep track of the thread we spawn
|
||||
let main = lua.load(MAIN_SCRIPT);
|
||||
let id = sched.push_thread_front(main, ())?;
|
||||
|
||||
// Run until completion
|
||||
block_on(sched.run());
|
||||
|
||||
// We should have gotten proper values back from our script
|
||||
let res = sched.get_thread_result(id).unwrap().unwrap();
|
||||
let nums = Vec::<usize>::from_lua_multi(res, &lua)?;
|
||||
assert_eq!(nums, vec![1, 2, 3, 4, 5, 6]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scheduler_ordering() -> LuaResult<()> {
|
||||
main()
|
||||
}
|
61
crates/mlua-luau-scheduler/examples/tracy.rs
Normal file
61
crates/mlua-luau-scheduler/examples/tracy.rs
Normal file
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
NOTE: This example is the same as "lots_of_threads", but with tracy set up for performance profiling.
|
||||
|
||||
How to run:
|
||||
|
||||
1. Install tracy
|
||||
- Follow the instructions at https://github.com/wolfpld/tracy
|
||||
- Or install via something like homebrew: `brew install tracy`
|
||||
2. Run the server (`tracy`) in a terminal
|
||||
3. Run the example in another terminal
|
||||
- `export RUST_LOG=trace`
|
||||
- `cargo run --example tracy`
|
||||
*/
|
||||
|
||||
#![allow(clippy::missing_errors_doc)]
|
||||
#![allow(clippy::cargo_common_metadata)]
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use async_io::{block_on, Timer};
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_tracy::{client::Client as TracyClient, TracyLayer};
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::{Functions, Scheduler};
|
||||
|
||||
const MAIN_SCRIPT: &str = include_str!("./lua/lots_of_threads.luau");
|
||||
|
||||
const ONE_NANOSECOND: Duration = Duration::from_nanos(1);
|
||||
|
||||
pub fn main() -> LuaResult<()> {
|
||||
let _client = TracyClient::start();
|
||||
let _ = tracing::subscriber::set_global_default(
|
||||
tracing_subscriber::registry().with(TracyLayer::default()),
|
||||
);
|
||||
|
||||
// Set up persistent Lua environment
|
||||
let lua = Lua::new();
|
||||
let sched = Scheduler::new(&lua);
|
||||
let fns = Functions::new(&lua)?;
|
||||
|
||||
lua.globals().set("spawn", fns.spawn)?;
|
||||
lua.globals().set(
|
||||
"sleep",
|
||||
lua.create_async_function(|_, ()| async move {
|
||||
// Obviously we can't sleep for a single nanosecond since
|
||||
// this uses OS scheduling under the hood, but we can try
|
||||
Timer::after(ONE_NANOSECOND).await;
|
||||
Ok(())
|
||||
})?,
|
||||
)?;
|
||||
|
||||
// Load the main script into the scheduler
|
||||
let main = lua.load(MAIN_SCRIPT);
|
||||
sched.push_thread_front(main, ())?;
|
||||
|
||||
// Run until completion
|
||||
block_on(sched.run());
|
||||
|
||||
Ok(())
|
||||
}
|
45
crates/mlua-luau-scheduler/src/error_callback.rs
Normal file
45
crates/mlua-luau-scheduler/src/error_callback.rs
Normal file
|
@ -0,0 +1,45 @@
|
|||
use std::{cell::RefCell, rc::Rc};
|
||||
|
||||
use mlua::prelude::*;
|
||||
|
||||
type ErrorCallback = Box<dyn Fn(LuaError) + Send + 'static>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ThreadErrorCallback {
|
||||
inner: Rc<RefCell<Option<ErrorCallback>>>,
|
||||
}
|
||||
|
||||
impl ThreadErrorCallback {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inner: Rc::new(RefCell::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn replace(&self, callback: impl Fn(LuaError) + Send + 'static) {
|
||||
self.inner.borrow_mut().replace(Box::new(callback));
|
||||
}
|
||||
|
||||
pub fn clear(&self) {
|
||||
self.inner.borrow_mut().take();
|
||||
}
|
||||
|
||||
pub fn call(&self, error: &LuaError) {
|
||||
if let Some(cb) = &*self.inner.borrow() {
|
||||
cb(error.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::needless_pass_by_value)]
|
||||
fn default_error_callback(e: LuaError) {
|
||||
eprintln!("{e}");
|
||||
}
|
||||
|
||||
impl Default for ThreadErrorCallback {
|
||||
fn default() -> Self {
|
||||
let this = Self::new();
|
||||
this.replace(default_error_callback);
|
||||
this
|
||||
}
|
||||
}
|
31
crates/mlua-luau-scheduler/src/exit.rs
Normal file
31
crates/mlua-luau-scheduler/src/exit.rs
Normal file
|
@ -0,0 +1,31 @@
|
|||
use std::{cell::Cell, process::ExitCode, rc::Rc};
|
||||
|
||||
use event_listener::Event;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct Exit {
|
||||
code: Rc<Cell<Option<ExitCode>>>,
|
||||
event: Rc<Event>,
|
||||
}
|
||||
|
||||
impl Exit {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
code: Rc::new(Cell::new(None)),
|
||||
event: Rc::new(Event::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set(&self, code: ExitCode) {
|
||||
self.code.set(Some(code));
|
||||
self.event.notify(usize::MAX);
|
||||
}
|
||||
|
||||
pub fn get(&self) -> Option<ExitCode> {
|
||||
self.code.get()
|
||||
}
|
||||
|
||||
pub async fn listen(&self) {
|
||||
self.event.listen().await;
|
||||
}
|
||||
}
|
283
crates/mlua-luau-scheduler/src/functions.rs
Normal file
283
crates/mlua-luau-scheduler/src/functions.rs
Normal file
|
@ -0,0 +1,283 @@
|
|||
#![allow(unused_imports)]
|
||||
#![allow(clippy::too_many_lines)]
|
||||
|
||||
use std::process::ExitCode;
|
||||
|
||||
use mlua::prelude::*;
|
||||
|
||||
use crate::{
|
||||
error_callback::ThreadErrorCallback,
|
||||
queue::{DeferredThreadQueue, SpawnedThreadQueue},
|
||||
result_map::ThreadResultMap,
|
||||
scheduler::Scheduler,
|
||||
thread_id::ThreadId,
|
||||
traits::LuaSchedulerExt,
|
||||
util::{is_poll_pending, LuaThreadOrFunction, ThreadResult},
|
||||
};
|
||||
|
||||
const ERR_METADATA_NOT_ATTACHED: &str = "\
|
||||
Lua state does not have scheduler metadata attached!\
|
||||
\nThis is most likely caused by creating functions outside of a scheduler.\
|
||||
\nScheduler functions must always be created from within an active scheduler.\
|
||||
";
|
||||
|
||||
const EXIT_IMPL_LUA: &str = r"
|
||||
exit(...)
|
||||
yield()
|
||||
";
|
||||
|
||||
const WRAP_IMPL_LUA: &str = r"
|
||||
local t = create(...)
|
||||
return function(...)
|
||||
local r = { resume(t, ...) }
|
||||
if r[1] then
|
||||
return select(2, unpack(r))
|
||||
else
|
||||
error(r[2], 2)
|
||||
end
|
||||
end
|
||||
";
|
||||
|
||||
/**
|
||||
A collection of lua functions that may be called to interact with a [`Scheduler`].
|
||||
|
||||
Note that these may all be implemented using [`LuaSchedulerExt`], however, this struct
|
||||
is implemented using internal (non-public) APIs, and generally has better performance.
|
||||
*/
|
||||
pub struct Functions<'lua> {
|
||||
/**
|
||||
Implementation of `coroutine.resume` that handles async polling properly.
|
||||
|
||||
Defers onto the scheduler queue if the thread calls an async function.
|
||||
*/
|
||||
pub resume: LuaFunction<'lua>,
|
||||
/**
|
||||
Implementation of `coroutine.wrap` that handles async polling properly.
|
||||
|
||||
Defers onto the scheduler queue if the thread calls an async function.
|
||||
*/
|
||||
pub wrap: LuaFunction<'lua>,
|
||||
/**
|
||||
Resumes a function / thread once instantly, and runs until first yield.
|
||||
|
||||
Spawns onto the scheduler queue if not completed.
|
||||
*/
|
||||
pub spawn: LuaFunction<'lua>,
|
||||
/**
|
||||
Defers a function / thread onto the scheduler queue.
|
||||
|
||||
Does not resume instantly, only adds to the queue.
|
||||
*/
|
||||
pub defer: LuaFunction<'lua>,
|
||||
/**
|
||||
Cancels a function / thread, removing it from the queue.
|
||||
*/
|
||||
pub cancel: LuaFunction<'lua>,
|
||||
/**
|
||||
Exits the scheduler, stopping all other threads and closing the scheduler.
|
||||
|
||||
Yields the calling thread to ensure that it does not continue.
|
||||
*/
|
||||
pub exit: LuaFunction<'lua>,
|
||||
}
|
||||
|
||||
impl<'lua> Functions<'lua> {
|
||||
/**
|
||||
Creates a new collection of Lua functions that may be called to interact with a [`Scheduler`].
|
||||
|
||||
# Errors
|
||||
|
||||
Errors when out of memory, or if default Lua globals are missing.
|
||||
|
||||
# Panics
|
||||
|
||||
Panics when the given [`Lua`] instance does not have an attached [`Scheduler`].
|
||||
*/
|
||||
pub fn new(lua: &'lua Lua) -> LuaResult<Self> {
|
||||
let spawn_queue = lua
|
||||
.app_data_ref::<SpawnedThreadQueue>()
|
||||
.expect(ERR_METADATA_NOT_ATTACHED)
|
||||
.clone();
|
||||
let defer_queue = lua
|
||||
.app_data_ref::<DeferredThreadQueue>()
|
||||
.expect(ERR_METADATA_NOT_ATTACHED)
|
||||
.clone();
|
||||
let error_callback = lua
|
||||
.app_data_ref::<ThreadErrorCallback>()
|
||||
.expect(ERR_METADATA_NOT_ATTACHED)
|
||||
.clone();
|
||||
let result_map = lua
|
||||
.app_data_ref::<ThreadResultMap>()
|
||||
.expect(ERR_METADATA_NOT_ATTACHED)
|
||||
.clone();
|
||||
|
||||
let resume_queue = defer_queue.clone();
|
||||
let resume_map = result_map.clone();
|
||||
let resume =
|
||||
lua.create_function(move |lua, (thread, args): (LuaThread, LuaMultiValue)| {
|
||||
let _span = tracing::trace_span!("Scheduler::fn_resume").entered();
|
||||
match thread.resume::<_, LuaMultiValue>(args.clone()) {
|
||||
Ok(v) => {
|
||||
if v.get(0).is_some_and(is_poll_pending) {
|
||||
// Pending, defer to scheduler and return nil
|
||||
resume_queue.push_item(lua, &thread, args)?;
|
||||
(true, LuaValue::Nil).into_lua_multi(lua)
|
||||
} else {
|
||||
// Not pending, store the value if thread is done
|
||||
if thread.status() != LuaThreadStatus::Resumable {
|
||||
let id = ThreadId::from(&thread);
|
||||
if resume_map.is_tracked(id) {
|
||||
let res = ThreadResult::new(Ok(v.clone()), lua);
|
||||
resume_map.insert(id, res);
|
||||
}
|
||||
}
|
||||
(true, v).into_lua_multi(lua)
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// Not pending, store the error
|
||||
let id = ThreadId::from(&thread);
|
||||
if resume_map.is_tracked(id) {
|
||||
let res = ThreadResult::new(Err(e.clone()), lua);
|
||||
resume_map.insert(id, res);
|
||||
}
|
||||
(false, e.to_string()).into_lua_multi(lua)
|
||||
}
|
||||
}
|
||||
})?;
|
||||
|
||||
let wrap_env = lua.create_table_from(vec![
|
||||
("resume", resume.clone()),
|
||||
("error", lua.globals().get::<_, LuaFunction>("error")?),
|
||||
("select", lua.globals().get::<_, LuaFunction>("select")?),
|
||||
("unpack", lua.globals().get::<_, LuaFunction>("unpack")?),
|
||||
(
|
||||
"create",
|
||||
lua.globals()
|
||||
.get::<_, LuaTable>("coroutine")?
|
||||
.get::<_, LuaFunction>("create")?,
|
||||
),
|
||||
])?;
|
||||
let wrap = lua
|
||||
.load(WRAP_IMPL_LUA)
|
||||
.set_name("=__scheduler_wrap")
|
||||
.set_environment(wrap_env)
|
||||
.into_function()?;
|
||||
|
||||
let spawn_map = result_map.clone();
|
||||
let spawn = lua.create_function(
|
||||
move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
|
||||
let _span = tracing::trace_span!("Scheduler::fn_spawn").entered();
|
||||
let thread = tof.into_thread(lua)?;
|
||||
if thread.status() == LuaThreadStatus::Resumable {
|
||||
// NOTE: We need to resume the thread once instantly for correct behavior,
|
||||
// and only if we get the pending value back we can spawn to async executor
|
||||
match thread.resume::<_, LuaMultiValue>(args.clone()) {
|
||||
Ok(v) => {
|
||||
if v.get(0).is_some_and(is_poll_pending) {
|
||||
spawn_queue.push_item(lua, &thread, args)?;
|
||||
} else {
|
||||
// Not pending, store the value if thread is done
|
||||
if thread.status() != LuaThreadStatus::Resumable {
|
||||
let id = ThreadId::from(&thread);
|
||||
if spawn_map.is_tracked(id) {
|
||||
let res = ThreadResult::new(Ok(v), lua);
|
||||
spawn_map.insert(id, res);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error_callback.call(&e);
|
||||
// Not pending, store the error
|
||||
let id = ThreadId::from(&thread);
|
||||
if spawn_map.is_tracked(id) {
|
||||
let res = ThreadResult::new(Err(e), lua);
|
||||
spawn_map.insert(id, res);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(thread)
|
||||
},
|
||||
)?;
|
||||
|
||||
let defer = lua.create_function(
|
||||
move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
|
||||
let _span = tracing::trace_span!("Scheduler::fn_defer").entered();
|
||||
let thread = tof.into_thread(lua)?;
|
||||
if thread.status() == LuaThreadStatus::Resumable {
|
||||
defer_queue.push_item(lua, &thread, args)?;
|
||||
}
|
||||
Ok(thread)
|
||||
},
|
||||
)?;
|
||||
|
||||
let close = lua
|
||||
.globals()
|
||||
.get::<_, LuaTable>("coroutine")?
|
||||
.get::<_, LuaFunction>("close")?;
|
||||
let close_key = lua.create_registry_value(close)?;
|
||||
let cancel = lua.create_function(move |lua, thread: LuaThread| {
|
||||
let _span = tracing::trace_span!("Scheduler::fn_cancel").entered();
|
||||
let close: LuaFunction = lua.registry_value(&close_key)?;
|
||||
match close.call(thread) {
|
||||
Err(LuaError::CoroutineInactive) | Ok(()) => Ok(()),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
})?;
|
||||
|
||||
let exit_env = lua.create_table_from(vec![
|
||||
(
|
||||
"exit",
|
||||
lua.create_function(|lua, code: Option<u8>| {
|
||||
let _span = tracing::trace_span!("Scheduler::fn_exit").entered();
|
||||
let code = code.map(ExitCode::from).unwrap_or_default();
|
||||
lua.set_exit_code(code);
|
||||
Ok(())
|
||||
})?,
|
||||
),
|
||||
(
|
||||
"yield",
|
||||
lua.globals()
|
||||
.get::<_, LuaTable>("coroutine")?
|
||||
.get::<_, LuaFunction>("yield")?,
|
||||
),
|
||||
])?;
|
||||
let exit = lua
|
||||
.load(EXIT_IMPL_LUA)
|
||||
.set_name("=__scheduler_exit")
|
||||
.set_environment(exit_env)
|
||||
.into_function()?;
|
||||
|
||||
Ok(Self {
|
||||
resume,
|
||||
wrap,
|
||||
spawn,
|
||||
defer,
|
||||
cancel,
|
||||
exit,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Functions<'_> {
|
||||
/**
|
||||
Injects [`Scheduler`]-compatible functions into the given [`Lua`] instance.
|
||||
|
||||
This will overwrite the following functions:
|
||||
|
||||
- `coroutine.resume`
|
||||
- `coroutine.wrap`
|
||||
|
||||
# Errors
|
||||
|
||||
Errors when out of memory, or if default Lua globals are missing.
|
||||
*/
|
||||
pub fn inject_compat(&self, lua: &Lua) -> LuaResult<()> {
|
||||
let co: LuaTable = lua.globals().get("coroutine")?;
|
||||
co.set("resume", self.resume.clone())?;
|
||||
co.set("wrap", self.wrap.clone())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
18
crates/mlua-luau-scheduler/src/lib.rs
Normal file
18
crates/mlua-luau-scheduler/src/lib.rs
Normal file
|
@ -0,0 +1,18 @@
|
|||
#![allow(clippy::cargo_common_metadata)]
|
||||
|
||||
mod error_callback;
|
||||
mod exit;
|
||||
mod functions;
|
||||
mod queue;
|
||||
mod result_map;
|
||||
mod scheduler;
|
||||
mod status;
|
||||
mod thread_id;
|
||||
mod traits;
|
||||
mod util;
|
||||
|
||||
pub use functions::Functions;
|
||||
pub use scheduler::Scheduler;
|
||||
pub use status::Status;
|
||||
pub use thread_id::ThreadId;
|
||||
pub use traits::{IntoLuaThread, LuaSchedulerExt, LuaSpawnExt};
|
139
crates/mlua-luau-scheduler/src/queue.rs
Normal file
139
crates/mlua-luau-scheduler/src/queue.rs
Normal file
|
@ -0,0 +1,139 @@
|
|||
use std::{pin::Pin, rc::Rc};
|
||||
|
||||
use concurrent_queue::ConcurrentQueue;
|
||||
use derive_more::{Deref, DerefMut};
|
||||
use event_listener::Event;
|
||||
use futures_lite::{Future, FutureExt};
|
||||
use mlua::prelude::*;
|
||||
|
||||
use crate::{traits::IntoLuaThread, util::ThreadWithArgs, ThreadId};
|
||||
|
||||
/**
|
||||
Queue for storing [`LuaThread`]s with associated arguments.
|
||||
|
||||
Provides methods for pushing and draining the queue, as
|
||||
well as listening for new items being pushed to the queue.
|
||||
*/
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct ThreadQueue {
|
||||
queue: Rc<ConcurrentQueue<ThreadWithArgs>>,
|
||||
event: Rc<Event>,
|
||||
}
|
||||
|
||||
impl ThreadQueue {
|
||||
pub fn new() -> Self {
|
||||
let queue = Rc::new(ConcurrentQueue::unbounded());
|
||||
let event = Rc::new(Event::new());
|
||||
Self { queue, event }
|
||||
}
|
||||
|
||||
pub fn push_item<'lua>(
|
||||
&self,
|
||||
lua: &'lua Lua,
|
||||
thread: impl IntoLuaThread<'lua>,
|
||||
args: impl IntoLuaMulti<'lua>,
|
||||
) -> LuaResult<ThreadId> {
|
||||
let thread = thread.into_lua_thread(lua)?;
|
||||
let args = args.into_lua_multi(lua)?;
|
||||
|
||||
tracing::trace!("pushing item to queue with {} args", args.len());
|
||||
let id = ThreadId::from(&thread);
|
||||
let stored = ThreadWithArgs::new(lua, thread, args)?;
|
||||
|
||||
self.queue.push(stored).into_lua_err()?;
|
||||
self.event.notify(usize::MAX);
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn drain_items<'outer, 'lua>(
|
||||
&'outer self,
|
||||
lua: &'lua Lua,
|
||||
) -> impl Iterator<Item = (LuaThread<'lua>, LuaMultiValue<'lua>)> + 'outer
|
||||
where
|
||||
'lua: 'outer,
|
||||
{
|
||||
self.queue.try_iter().map(|stored| stored.into_inner(lua))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn wait_for_item(&self) {
|
||||
if self.queue.is_empty() {
|
||||
let listener = self.event.listen();
|
||||
// NOTE: Need to check again, we could have gotten
|
||||
// new queued items while creating our listener
|
||||
if self.queue.is_empty() {
|
||||
listener.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.queue.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Alias for [`ThreadQueue`], providing a newtype to store in Lua app data.
|
||||
*/
|
||||
#[derive(Debug, Clone, Deref, DerefMut)]
|
||||
pub(crate) struct SpawnedThreadQueue(ThreadQueue);
|
||||
|
||||
impl SpawnedThreadQueue {
|
||||
pub fn new() -> Self {
|
||||
Self(ThreadQueue::new())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Alias for [`ThreadQueue`], providing a newtype to store in Lua app data.
|
||||
*/
|
||||
#[derive(Debug, Clone, Deref, DerefMut)]
|
||||
pub(crate) struct DeferredThreadQueue(ThreadQueue);
|
||||
|
||||
impl DeferredThreadQueue {
|
||||
pub fn new() -> Self {
|
||||
Self(ThreadQueue::new())
|
||||
}
|
||||
}
|
||||
|
||||
pub type LocalBoxFuture<'fut> = Pin<Box<dyn Future<Output = ()> + 'fut>>;
|
||||
|
||||
/**
|
||||
Queue for storing local futures.
|
||||
|
||||
Provides methods for pushing and draining the queue, as
|
||||
well as listening for new items being pushed to the queue.
|
||||
*/
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct FuturesQueue<'fut> {
|
||||
queue: Rc<ConcurrentQueue<LocalBoxFuture<'fut>>>,
|
||||
event: Rc<Event>,
|
||||
}
|
||||
|
||||
impl<'fut> FuturesQueue<'fut> {
|
||||
pub fn new() -> Self {
|
||||
let queue = Rc::new(ConcurrentQueue::unbounded());
|
||||
let event = Rc::new(Event::new());
|
||||
Self { queue, event }
|
||||
}
|
||||
|
||||
pub fn push_item(&self, fut: impl Future<Output = ()> + 'fut) {
|
||||
let _ = self.queue.push(fut.boxed_local());
|
||||
self.event.notify(usize::MAX);
|
||||
}
|
||||
|
||||
pub fn drain_items<'outer>(
|
||||
&'outer self,
|
||||
) -> impl Iterator<Item = LocalBoxFuture<'fut>> + 'outer {
|
||||
self.queue.try_iter()
|
||||
}
|
||||
|
||||
pub async fn wait_for_item(&self) {
|
||||
if self.queue.is_empty() {
|
||||
self.event.listen().await;
|
||||
}
|
||||
}
|
||||
}
|
64
crates/mlua-luau-scheduler/src/result_map.rs
Normal file
64
crates/mlua-luau-scheduler/src/result_map.rs
Normal file
|
@ -0,0 +1,64 @@
|
|||
#![allow(clippy::inline_always)]
|
||||
|
||||
use std::{cell::RefCell, rc::Rc};
|
||||
|
||||
use event_listener::Event;
|
||||
// NOTE: This is the hash algorithm that mlua also uses, so we
|
||||
// are not adding any additional dependencies / bloat by using it.
|
||||
use rustc_hash::{FxHashMap, FxHashSet};
|
||||
|
||||
use crate::{thread_id::ThreadId, util::ThreadResult};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ThreadResultMap {
|
||||
tracked: Rc<RefCell<FxHashSet<ThreadId>>>,
|
||||
results: Rc<RefCell<FxHashMap<ThreadId, ThreadResult>>>,
|
||||
events: Rc<RefCell<FxHashMap<ThreadId, Rc<Event>>>>,
|
||||
}
|
||||
|
||||
impl ThreadResultMap {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
tracked: Rc::new(RefCell::new(FxHashSet::default())),
|
||||
results: Rc::new(RefCell::new(FxHashMap::default())),
|
||||
events: Rc::new(RefCell::new(FxHashMap::default())),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn track(&self, id: ThreadId) {
|
||||
self.tracked.borrow_mut().insert(id);
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn is_tracked(&self, id: ThreadId) -> bool {
|
||||
self.tracked.borrow().contains(&id)
|
||||
}
|
||||
|
||||
pub fn insert(&self, id: ThreadId, result: ThreadResult) {
|
||||
debug_assert!(self.is_tracked(id), "Thread must be tracked");
|
||||
self.results.borrow_mut().insert(id, result);
|
||||
if let Some(event) = self.events.borrow_mut().remove(&id) {
|
||||
event.notify(usize::MAX);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn listen(&self, id: ThreadId) {
|
||||
debug_assert!(self.is_tracked(id), "Thread must be tracked");
|
||||
if !self.results.borrow().contains_key(&id) {
|
||||
let listener = {
|
||||
let mut events = self.events.borrow_mut();
|
||||
let event = events.entry(id).or_insert_with(|| Rc::new(Event::new()));
|
||||
event.listen()
|
||||
};
|
||||
listener.await;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove(&self, id: ThreadId) -> Option<ThreadResult> {
|
||||
let res = self.results.borrow_mut().remove(&id)?;
|
||||
self.tracked.borrow_mut().remove(&id);
|
||||
self.events.borrow_mut().remove(&id);
|
||||
Some(res)
|
||||
}
|
||||
}
|
484
crates/mlua-luau-scheduler/src/scheduler.rs
Normal file
484
crates/mlua-luau-scheduler/src/scheduler.rs
Normal file
|
@ -0,0 +1,484 @@
|
|||
#![allow(clippy::module_name_repetitions)]
|
||||
|
||||
use std::{
|
||||
cell::Cell,
|
||||
process::ExitCode,
|
||||
rc::{Rc, Weak as WeakRc},
|
||||
sync::{Arc, Weak as WeakArc},
|
||||
thread::panicking,
|
||||
};
|
||||
|
||||
use futures_lite::prelude::*;
|
||||
use mlua::prelude::*;
|
||||
|
||||
use async_executor::{Executor, LocalExecutor};
|
||||
use tracing::{debug, instrument, trace, trace_span, Instrument};
|
||||
|
||||
use crate::{
|
||||
error_callback::ThreadErrorCallback,
|
||||
exit::Exit,
|
||||
queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue},
|
||||
result_map::ThreadResultMap,
|
||||
status::Status,
|
||||
thread_id::ThreadId,
|
||||
traits::IntoLuaThread,
|
||||
util::{run_until_yield, ThreadResult},
|
||||
};
|
||||
|
||||
const ERR_METADATA_ALREADY_ATTACHED: &str = "\
|
||||
Lua state already has scheduler metadata attached!\
|
||||
\nThis may be caused by running multiple schedulers on the same Lua state, or a call to Scheduler::run being cancelled.\
|
||||
\nOnly one scheduler can be used per Lua state at once, and schedulers must always run until completion.\
|
||||
";
|
||||
|
||||
const ERR_METADATA_REMOVED: &str = "\
|
||||
Lua state scheduler metadata was unexpectedly removed!\
|
||||
\nThis should never happen, and is likely a bug in the scheduler.\
|
||||
";
|
||||
|
||||
const ERR_SET_CALLBACK_WHEN_RUNNING: &str = "\
|
||||
Cannot set error callback when scheduler is running!\
|
||||
";
|
||||
|
||||
/**
|
||||
A scheduler for running Lua threads and async tasks.
|
||||
*/
|
||||
#[derive(Clone)]
|
||||
pub struct Scheduler<'lua> {
|
||||
lua: &'lua Lua,
|
||||
queue_spawn: SpawnedThreadQueue,
|
||||
queue_defer: DeferredThreadQueue,
|
||||
error_callback: ThreadErrorCallback,
|
||||
result_map: ThreadResultMap,
|
||||
status: Rc<Cell<Status>>,
|
||||
exit: Exit,
|
||||
}
|
||||
|
||||
impl<'lua> Scheduler<'lua> {
|
||||
/**
|
||||
Creates a new scheduler for the given Lua state.
|
||||
|
||||
This scheduler will have a default error callback that prints errors to stderr.
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if the given Lua state already has a scheduler attached to it.
|
||||
*/
|
||||
#[must_use]
|
||||
pub fn new(lua: &'lua Lua) -> Scheduler<'lua> {
|
||||
let queue_spawn = SpawnedThreadQueue::new();
|
||||
let queue_defer = DeferredThreadQueue::new();
|
||||
let error_callback = ThreadErrorCallback::default();
|
||||
let result_map = ThreadResultMap::new();
|
||||
let exit = Exit::new();
|
||||
|
||||
assert!(
|
||||
lua.app_data_ref::<SpawnedThreadQueue>().is_none(),
|
||||
"{ERR_METADATA_ALREADY_ATTACHED}"
|
||||
);
|
||||
assert!(
|
||||
lua.app_data_ref::<DeferredThreadQueue>().is_none(),
|
||||
"{ERR_METADATA_ALREADY_ATTACHED}"
|
||||
);
|
||||
assert!(
|
||||
lua.app_data_ref::<ThreadErrorCallback>().is_none(),
|
||||
"{ERR_METADATA_ALREADY_ATTACHED}"
|
||||
);
|
||||
assert!(
|
||||
lua.app_data_ref::<ThreadResultMap>().is_none(),
|
||||
"{ERR_METADATA_ALREADY_ATTACHED}"
|
||||
);
|
||||
assert!(
|
||||
lua.app_data_ref::<Exit>().is_none(),
|
||||
"{ERR_METADATA_ALREADY_ATTACHED}"
|
||||
);
|
||||
|
||||
lua.set_app_data(queue_spawn.clone());
|
||||
lua.set_app_data(queue_defer.clone());
|
||||
lua.set_app_data(error_callback.clone());
|
||||
lua.set_app_data(result_map.clone());
|
||||
lua.set_app_data(exit.clone());
|
||||
|
||||
let status = Rc::new(Cell::new(Status::NotStarted));
|
||||
|
||||
Scheduler {
|
||||
lua,
|
||||
queue_spawn,
|
||||
queue_defer,
|
||||
error_callback,
|
||||
result_map,
|
||||
status,
|
||||
exit,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Sets the current status of this scheduler and emits relevant tracing events.
|
||||
*/
|
||||
fn set_status(&self, status: Status) {
|
||||
debug!(status = ?status, "status");
|
||||
self.status.set(status);
|
||||
}
|
||||
|
||||
/**
|
||||
Returns the current status of this scheduler.
|
||||
*/
|
||||
#[must_use]
|
||||
pub fn status(&self) -> Status {
|
||||
self.status.get()
|
||||
}
|
||||
|
||||
/**
|
||||
Sets the error callback for this scheduler.
|
||||
|
||||
This callback will be called whenever a Lua thread errors.
|
||||
|
||||
Overwrites any previous error callback.
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if the scheduler is currently running.
|
||||
*/
|
||||
pub fn set_error_callback(&self, callback: impl Fn(LuaError) + Send + 'static) {
|
||||
assert!(
|
||||
!self.status().is_running(),
|
||||
"{ERR_SET_CALLBACK_WHEN_RUNNING}"
|
||||
);
|
||||
self.error_callback.replace(callback);
|
||||
}
|
||||
|
||||
/**
|
||||
Clears the error callback for this scheduler.
|
||||
|
||||
This will remove any current error callback, including default(s).
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if the scheduler is currently running.
|
||||
*/
|
||||
pub fn remove_error_callback(&self) {
|
||||
assert!(
|
||||
!self.status().is_running(),
|
||||
"{ERR_SET_CALLBACK_WHEN_RUNNING}"
|
||||
);
|
||||
self.error_callback.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
Gets the exit code for this scheduler, if one has been set.
|
||||
*/
|
||||
#[must_use]
|
||||
pub fn get_exit_code(&self) -> Option<ExitCode> {
|
||||
self.exit.get()
|
||||
}
|
||||
|
||||
/**
|
||||
Sets the exit code for this scheduler.
|
||||
|
||||
This will cause [`Scheduler::run`] to exit immediately.
|
||||
*/
|
||||
pub fn set_exit_code(&self, code: ExitCode) {
|
||||
self.exit.set(code);
|
||||
}
|
||||
|
||||
/**
|
||||
Spawns a chunk / function / thread onto the scheduler queue.
|
||||
|
||||
Threads are guaranteed to be resumed in the order that they were pushed to the queue.
|
||||
|
||||
# Returns
|
||||
|
||||
Returns a [`ThreadId`] that can be used to retrieve the result of the thread.
|
||||
|
||||
Note that the result may not be available until [`Scheduler::run`] completes.
|
||||
|
||||
# Errors
|
||||
|
||||
Errors when out of memory.
|
||||
*/
|
||||
pub fn push_thread_front(
|
||||
&self,
|
||||
thread: impl IntoLuaThread<'lua>,
|
||||
args: impl IntoLuaMulti<'lua>,
|
||||
) -> LuaResult<ThreadId> {
|
||||
let id = self.queue_spawn.push_item(self.lua, thread, args)?;
|
||||
self.result_map.track(id);
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/**
|
||||
Defers a chunk / function / thread onto the scheduler queue.
|
||||
|
||||
Deferred threads are guaranteed to run after all spawned threads either yield or complete.
|
||||
|
||||
Threads are guaranteed to be resumed in the order that they were pushed to the queue.
|
||||
|
||||
# Returns
|
||||
|
||||
Returns a [`ThreadId`] that can be used to retrieve the result of the thread.
|
||||
|
||||
Note that the result may not be available until [`Scheduler::run`] completes.
|
||||
|
||||
# Errors
|
||||
|
||||
Errors when out of memory.
|
||||
*/
|
||||
pub fn push_thread_back(
|
||||
&self,
|
||||
thread: impl IntoLuaThread<'lua>,
|
||||
args: impl IntoLuaMulti<'lua>,
|
||||
) -> LuaResult<ThreadId> {
|
||||
let id = self.queue_defer.push_item(self.lua, thread, args)?;
|
||||
self.result_map.track(id);
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/**
|
||||
Gets the tracked result for the [`LuaThread`] with the given [`ThreadId`].
|
||||
|
||||
Depending on the current [`Scheduler::status`], this method will return:
|
||||
|
||||
- [`Status::NotStarted`]: returns `None`.
|
||||
- [`Status::Running`]: may return `Some(Ok(v))` or `Some(Err(e))`, but it is not guaranteed.
|
||||
- [`Status::Completed`]: returns `Some(Ok(v))` or `Some(Err(e))`.
|
||||
|
||||
Note that this method also takes the value out of the scheduler and
|
||||
stops tracking the given thread, so it may only be called once.
|
||||
|
||||
Any subsequent calls after this method returns `Some` will return `None`.
|
||||
*/
|
||||
#[must_use]
|
||||
pub fn get_thread_result(&self, id: ThreadId) -> Option<LuaResult<LuaMultiValue<'lua>>> {
|
||||
self.result_map.remove(id).map(|r| r.value(self.lua))
|
||||
}
|
||||
|
||||
/**
|
||||
Waits for the [`LuaThread`] with the given [`ThreadId`] to complete.
|
||||
|
||||
This will return instantly if the thread has already completed.
|
||||
*/
|
||||
pub async fn wait_for_thread(&self, id: ThreadId) {
|
||||
self.result_map.listen(id).await;
|
||||
}
|
||||
|
||||
/**
|
||||
Runs the scheduler until all Lua threads have completed.
|
||||
|
||||
Note that the given Lua state must be the same one that was
|
||||
used to create this scheduler, otherwise this method will panic.
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if the given Lua state already has a scheduler attached to it.
|
||||
*/
|
||||
#[allow(clippy::too_many_lines)]
|
||||
#[instrument(level = "debug", name = "Scheduler::run", skip(self))]
|
||||
pub async fn run(&self) {
|
||||
/*
|
||||
Create new executors to use - note that we do not need create multiple executors
|
||||
for work stealing, the user may do that themselves if they want to and it will work
|
||||
just fine, as long as anything async is .await-ed from within a Lua async function.
|
||||
|
||||
The main purpose of the two executors here is just to have one with
|
||||
the Send bound, and another (local) one without it, for Lua scheduling.
|
||||
|
||||
We also use the main executor to drive the main loop below forward,
|
||||
saving a tiny bit of processing from going on the Lua executor itself.
|
||||
*/
|
||||
let local_exec = LocalExecutor::new();
|
||||
let main_exec = Arc::new(Executor::new());
|
||||
let fut_queue = Rc::new(FuturesQueue::new());
|
||||
|
||||
/*
|
||||
Store the main executor and queue in Lua, so that they may be used with LuaSchedulerExt.
|
||||
|
||||
Also ensure we do not already have an executor or queues - these are definite user errors
|
||||
and may happen if the user tries to run multiple schedulers on the same Lua state at once.
|
||||
*/
|
||||
assert!(
|
||||
self.lua.app_data_ref::<WeakArc<Executor>>().is_none(),
|
||||
"{ERR_METADATA_ALREADY_ATTACHED}"
|
||||
);
|
||||
assert!(
|
||||
self.lua.app_data_ref::<WeakRc<FuturesQueue>>().is_none(),
|
||||
"{ERR_METADATA_ALREADY_ATTACHED}"
|
||||
);
|
||||
|
||||
self.lua.set_app_data(Arc::downgrade(&main_exec));
|
||||
self.lua.set_app_data(Rc::downgrade(&fut_queue.clone()));
|
||||
|
||||
/*
|
||||
Manually tick the Lua executor, while running under the main executor.
|
||||
Each tick we wait for the next action to perform, in prioritized order:
|
||||
|
||||
1. The exit event is triggered by setting an exit code
|
||||
2. A Lua thread is available to run on the spawned queue
|
||||
3. A Lua thread is available to run on the deferred queue
|
||||
4. A new thread-local future is available to run on the local executor
|
||||
5. Task(s) scheduled on the Lua executor have made progress and should be polled again
|
||||
|
||||
This ordering is vital to ensure that we don't accidentally exit the main loop
|
||||
when there are new Lua threads to enqueue and potentially more work to be done.
|
||||
*/
|
||||
let fut = async {
|
||||
let result_map = self.result_map.clone();
|
||||
let process_thread = |thread: LuaThread<'lua>, args| {
|
||||
// NOTE: Thread may have been cancelled from Lua
|
||||
// before we got here, so we need to check it again
|
||||
if thread.status() == LuaThreadStatus::Resumable {
|
||||
// Check if we should be tracking this thread
|
||||
let id = ThreadId::from(&thread);
|
||||
let id_tracked = result_map.is_tracked(id);
|
||||
let result_map_inner = if id_tracked {
|
||||
Some(result_map.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Create our future which will run the thread and store its final result
|
||||
let fut = async move {
|
||||
if id_tracked {
|
||||
// Run until yield and check if we got a final result
|
||||
if let Some(res) = run_until_yield(thread.clone(), args).await {
|
||||
if let Err(e) = res.as_ref() {
|
||||
self.error_callback.call(e);
|
||||
}
|
||||
if thread.status() != LuaThreadStatus::Resumable {
|
||||
let thread_res = ThreadResult::new(res, self.lua);
|
||||
result_map_inner.unwrap().insert(id, thread_res);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Just run until yield
|
||||
if let Some(res) = run_until_yield(thread, args).await {
|
||||
if let Err(e) = res.as_ref() {
|
||||
self.error_callback.call(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
// Spawn it on the executor
|
||||
local_exec.spawn(fut).detach();
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
let fut_exit = self.exit.listen(); // 1
|
||||
let fut_spawn = self.queue_spawn.wait_for_item(); // 2
|
||||
let fut_defer = self.queue_defer.wait_for_item(); // 3
|
||||
let fut_futs = fut_queue.wait_for_item(); // 4
|
||||
|
||||
// 5
|
||||
let mut num_processed = 0;
|
||||
let span_tick = trace_span!("Scheduler::tick");
|
||||
let fut_tick = async {
|
||||
local_exec.tick().await;
|
||||
// NOTE: Try to do as much work as possible instead of just a single tick()
|
||||
num_processed += 1;
|
||||
while local_exec.try_tick() {
|
||||
num_processed += 1;
|
||||
}
|
||||
};
|
||||
|
||||
// 1 + 2 + 3 + 4 + 5
|
||||
fut_exit
|
||||
.or(fut_spawn)
|
||||
.or(fut_defer)
|
||||
.or(fut_futs)
|
||||
.or(fut_tick.instrument(span_tick.or_current()))
|
||||
.await;
|
||||
|
||||
// Check if we should exit
|
||||
if self.exit.get().is_some() {
|
||||
debug!("exit signal received");
|
||||
break;
|
||||
}
|
||||
|
||||
// Process spawned threads first, then deferred threads, then futures
|
||||
let mut num_spawned = 0;
|
||||
let mut num_deferred = 0;
|
||||
let mut num_futures = 0;
|
||||
{
|
||||
let _span = trace_span!("Scheduler::drain_spawned").entered();
|
||||
for (thread, args) in self.queue_spawn.drain_items(self.lua) {
|
||||
process_thread(thread, args);
|
||||
num_spawned += 1;
|
||||
}
|
||||
}
|
||||
{
|
||||
let _span = trace_span!("Scheduler::drain_deferred").entered();
|
||||
for (thread, args) in self.queue_defer.drain_items(self.lua) {
|
||||
process_thread(thread, args);
|
||||
num_deferred += 1;
|
||||
}
|
||||
}
|
||||
{
|
||||
let _span = trace_span!("Scheduler::drain_futures").entered();
|
||||
for fut in fut_queue.drain_items() {
|
||||
local_exec.spawn(fut).detach();
|
||||
num_futures += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Empty executor = we didn't spawn any new Lua tasks
|
||||
// above, and there are no remaining tasks to run later
|
||||
let completed = local_exec.is_empty()
|
||||
&& self.queue_spawn.is_empty()
|
||||
&& self.queue_defer.is_empty();
|
||||
trace!(
|
||||
futures_spawned = num_futures,
|
||||
futures_processed = num_processed,
|
||||
lua_threads_spawned = num_spawned,
|
||||
lua_threads_deferred = num_deferred,
|
||||
"loop"
|
||||
);
|
||||
if completed {
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Run the executor inside a span until all lua threads complete
|
||||
self.set_status(Status::Running);
|
||||
main_exec.run(fut).await;
|
||||
self.set_status(Status::Completed);
|
||||
|
||||
// Clean up
|
||||
self.lua
|
||||
.remove_app_data::<WeakArc<Executor>>()
|
||||
.expect(ERR_METADATA_REMOVED);
|
||||
self.lua
|
||||
.remove_app_data::<WeakRc<FuturesQueue>>()
|
||||
.expect(ERR_METADATA_REMOVED);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Scheduler<'_> {
|
||||
fn drop(&mut self) {
|
||||
if panicking() {
|
||||
// Do not cause further panics if already panicking, as
|
||||
// this may abort the program instead of safely unwinding
|
||||
self.lua.remove_app_data::<SpawnedThreadQueue>();
|
||||
self.lua.remove_app_data::<DeferredThreadQueue>();
|
||||
self.lua.remove_app_data::<ThreadErrorCallback>();
|
||||
self.lua.remove_app_data::<ThreadResultMap>();
|
||||
self.lua.remove_app_data::<Exit>();
|
||||
} else {
|
||||
// In any other case we panic if metadata was removed incorrectly
|
||||
self.lua
|
||||
.remove_app_data::<SpawnedThreadQueue>()
|
||||
.expect(ERR_METADATA_REMOVED);
|
||||
self.lua
|
||||
.remove_app_data::<DeferredThreadQueue>()
|
||||
.expect(ERR_METADATA_REMOVED);
|
||||
self.lua
|
||||
.remove_app_data::<ThreadErrorCallback>()
|
||||
.expect(ERR_METADATA_REMOVED);
|
||||
self.lua
|
||||
.remove_app_data::<ThreadResultMap>()
|
||||
.expect(ERR_METADATA_REMOVED);
|
||||
self.lua
|
||||
.remove_app_data::<Exit>()
|
||||
.expect(ERR_METADATA_REMOVED);
|
||||
}
|
||||
}
|
||||
}
|
31
crates/mlua-luau-scheduler/src/status.rs
Normal file
31
crates/mlua-luau-scheduler/src/status.rs
Normal file
|
@ -0,0 +1,31 @@
|
|||
#![allow(clippy::module_name_repetitions)]
|
||||
|
||||
/**
|
||||
The current status of a scheduler.
|
||||
*/
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub enum Status {
|
||||
/// The scheduler has not yet started running.
|
||||
NotStarted,
|
||||
/// The scheduler is currently running.
|
||||
Running,
|
||||
/// The scheduler has completed.
|
||||
Completed,
|
||||
}
|
||||
|
||||
impl Status {
|
||||
#[must_use]
|
||||
pub const fn is_not_started(self) -> bool {
|
||||
matches!(self, Self::NotStarted)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn is_running(self) -> bool {
|
||||
matches!(self, Self::Running)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn is_completed(self) -> bool {
|
||||
matches!(self, Self::Completed)
|
||||
}
|
||||
}
|
30
crates/mlua-luau-scheduler/src/thread_id.rs
Normal file
30
crates/mlua-luau-scheduler/src/thread_id.rs
Normal file
|
@ -0,0 +1,30 @@
|
|||
use std::hash::{Hash, Hasher};
|
||||
|
||||
use mlua::prelude::*;
|
||||
|
||||
/**
|
||||
Opaque and unique ID representing a [`LuaThread`].
|
||||
|
||||
Typically used for associating metadata with a thread in a structure such as a `HashMap<ThreadId, ...>`.
|
||||
|
||||
Note that holding a `ThreadId` does not prevent the thread from being garbage collected.
|
||||
The actual thread may or may not still exist and be active at any given point in time.
|
||||
*/
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct ThreadId {
|
||||
inner: usize,
|
||||
}
|
||||
|
||||
impl From<&LuaThread<'_>> for ThreadId {
|
||||
fn from(thread: &LuaThread) -> Self {
|
||||
Self {
|
||||
inner: thread.to_pointer() as usize,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Hash for ThreadId {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.inner.hash(state);
|
||||
}
|
||||
}
|
378
crates/mlua-luau-scheduler/src/traits.rs
Normal file
378
crates/mlua-luau-scheduler/src/traits.rs
Normal file
|
@ -0,0 +1,378 @@
|
|||
#![allow(unused_imports)]
|
||||
#![allow(clippy::missing_errors_doc)]
|
||||
|
||||
use std::{
|
||||
cell::Cell, future::Future, process::ExitCode, rc::Weak as WeakRc, sync::Weak as WeakArc,
|
||||
};
|
||||
|
||||
use async_executor::{Executor, Task};
|
||||
use mlua::prelude::*;
|
||||
use tracing::trace;
|
||||
|
||||
use crate::{
|
||||
exit::Exit,
|
||||
queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue},
|
||||
result_map::ThreadResultMap,
|
||||
scheduler::Scheduler,
|
||||
thread_id::ThreadId,
|
||||
};
|
||||
|
||||
/**
|
||||
Trait for any struct that can be turned into an [`LuaThread`]
|
||||
and passed to the scheduler, implemented for the following types:
|
||||
|
||||
- Lua threads ([`LuaThread`])
|
||||
- Lua functions ([`LuaFunction`])
|
||||
- Lua chunks ([`LuaChunk`])
|
||||
*/
|
||||
pub trait IntoLuaThread<'lua> {
|
||||
/**
|
||||
Converts the value into a Lua thread.
|
||||
|
||||
# Errors
|
||||
|
||||
Errors when out of memory.
|
||||
*/
|
||||
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>>;
|
||||
}
|
||||
|
||||
impl<'lua> IntoLuaThread<'lua> for LuaThread<'lua> {
|
||||
fn into_lua_thread(self, _: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'lua> IntoLuaThread<'lua> for LuaFunction<'lua> {
|
||||
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
|
||||
lua.create_thread(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'lua> IntoLuaThread<'lua> for LuaChunk<'lua, '_> {
|
||||
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
|
||||
lua.create_thread(self.into_function()?)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'lua, T> IntoLuaThread<'lua> for &T
|
||||
where
|
||||
T: IntoLuaThread<'lua> + Clone,
|
||||
{
|
||||
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
|
||||
self.clone().into_lua_thread(lua)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Trait for interacting with the current [`Scheduler`].
|
||||
|
||||
Provides extra methods on the [`Lua`] struct for:
|
||||
|
||||
- Setting the exit code and forcibly stopping the scheduler
|
||||
- Pushing (spawning) and deferring (pushing to the back) lua threads
|
||||
- Tracking and getting the result of lua threads
|
||||
*/
|
||||
pub trait LuaSchedulerExt<'lua> {
|
||||
/**
|
||||
Sets the exit code of the current scheduler.
|
||||
|
||||
See [`Scheduler::set_exit_code`] for more information.
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if called outside of a running [`Scheduler`].
|
||||
*/
|
||||
fn set_exit_code(&self, code: ExitCode);
|
||||
|
||||
/**
|
||||
Pushes (spawns) a lua thread to the **front** of the current scheduler.
|
||||
|
||||
See [`Scheduler::push_thread_front`] for more information.
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if called outside of a running [`Scheduler`].
|
||||
*/
|
||||
fn push_thread_front(
|
||||
&'lua self,
|
||||
thread: impl IntoLuaThread<'lua>,
|
||||
args: impl IntoLuaMulti<'lua>,
|
||||
) -> LuaResult<ThreadId>;
|
||||
|
||||
/**
|
||||
Pushes (defers) a lua thread to the **back** of the current scheduler.
|
||||
|
||||
See [`Scheduler::push_thread_back`] for more information.
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if called outside of a running [`Scheduler`].
|
||||
*/
|
||||
fn push_thread_back(
|
||||
&'lua self,
|
||||
thread: impl IntoLuaThread<'lua>,
|
||||
args: impl IntoLuaMulti<'lua>,
|
||||
) -> LuaResult<ThreadId>;
|
||||
|
||||
/**
|
||||
Registers the given thread to be tracked within the current scheduler.
|
||||
|
||||
Must be called before waiting for a thread to complete or getting its result.
|
||||
*/
|
||||
fn track_thread(&'lua self, id: ThreadId);
|
||||
|
||||
/**
|
||||
Gets the result of the given thread.
|
||||
|
||||
See [`Scheduler::get_thread_result`] for more information.
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if called outside of a running [`Scheduler`].
|
||||
*/
|
||||
fn get_thread_result(&'lua self, id: ThreadId) -> Option<LuaResult<LuaMultiValue<'lua>>>;
|
||||
|
||||
/**
|
||||
Waits for the given thread to complete.
|
||||
|
||||
See [`Scheduler::wait_for_thread`] for more information.
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if called outside of a running [`Scheduler`].
|
||||
*/
|
||||
fn wait_for_thread(&'lua self, id: ThreadId) -> impl Future<Output = ()>;
|
||||
}
|
||||
|
||||
/**
|
||||
Trait for interacting with the [`Executor`] for the current [`Scheduler`].
|
||||
|
||||
Provides extra methods on the [`Lua`] struct for:
|
||||
|
||||
- Spawning thread-local (`!Send`) futures on the current executor
|
||||
- Spawning background (`Send`) futures on the current executor
|
||||
- Spawning blocking tasks on a separate thread pool
|
||||
*/
|
||||
pub trait LuaSpawnExt<'lua> {
|
||||
/**
|
||||
Spawns the given future on the current executor and returns its [`Task`].
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if called outside of a running [`Scheduler`].
|
||||
|
||||
# Example usage
|
||||
|
||||
```rust
|
||||
use async_io::block_on;
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::*;
|
||||
|
||||
fn main() -> LuaResult<()> {
|
||||
let lua = Lua::new();
|
||||
|
||||
lua.globals().set(
|
||||
"spawnBackgroundTask",
|
||||
lua.create_async_function(|lua, ()| async move {
|
||||
lua.spawn(async move {
|
||||
println!("Hello from background task!");
|
||||
}).await;
|
||||
Ok(())
|
||||
})?
|
||||
)?;
|
||||
|
||||
let sched = Scheduler::new(&lua);
|
||||
sched.push_thread_front(lua.load("spawnBackgroundTask()"), ());
|
||||
block_on(sched.run());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
*/
|
||||
fn spawn<F, T>(&self, fut: F) -> Task<T>
|
||||
where
|
||||
F: Future<Output = T> + Send + 'static,
|
||||
T: Send + 'static;
|
||||
|
||||
/**
|
||||
Spawns the given thread-local future on the current executor.
|
||||
|
||||
Note that this future will run detached and always to completion,
|
||||
preventing the [`Scheduler`] was spawned on from completing until done.
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if called outside of a running [`Scheduler`].
|
||||
|
||||
# Example usage
|
||||
|
||||
```rust
|
||||
use async_io::block_on;
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::*;
|
||||
|
||||
fn main() -> LuaResult<()> {
|
||||
let lua = Lua::new();
|
||||
|
||||
lua.globals().set(
|
||||
"spawnLocalTask",
|
||||
lua.create_async_function(|lua, ()| async move {
|
||||
lua.spawn_local(async move {
|
||||
println!("Hello from local task!");
|
||||
});
|
||||
Ok(())
|
||||
})?
|
||||
)?;
|
||||
|
||||
let sched = Scheduler::new(&lua);
|
||||
sched.push_thread_front(lua.load("spawnLocalTask()"), ());
|
||||
block_on(sched.run());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
*/
|
||||
fn spawn_local<F>(&self, fut: F)
|
||||
where
|
||||
F: Future<Output = ()> + 'static;
|
||||
|
||||
/**
|
||||
Spawns the given blocking function and returns its [`Task`].
|
||||
|
||||
This function will run on a separate thread pool and not block the current executor.
|
||||
|
||||
# Panics
|
||||
|
||||
Panics if called outside of a running [`Scheduler`].
|
||||
|
||||
# Example usage
|
||||
|
||||
```rust
|
||||
use async_io::block_on;
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::*;
|
||||
|
||||
fn main() -> LuaResult<()> {
|
||||
let lua = Lua::new();
|
||||
|
||||
lua.globals().set(
|
||||
"spawnBlockingTask",
|
||||
lua.create_async_function(|lua, ()| async move {
|
||||
lua.spawn_blocking(|| {
|
||||
println!("Hello from blocking task!");
|
||||
}).await;
|
||||
Ok(())
|
||||
})?
|
||||
)?;
|
||||
|
||||
let sched = Scheduler::new(&lua);
|
||||
sched.push_thread_front(lua.load("spawnBlockingTask()"), ());
|
||||
block_on(sched.run());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
*/
|
||||
fn spawn_blocking<F, T>(&self, f: F) -> Task<T>
|
||||
where
|
||||
F: FnOnce() -> T + Send + 'static,
|
||||
T: Send + 'static;
|
||||
}
|
||||
|
||||
impl<'lua> LuaSchedulerExt<'lua> for Lua {
|
||||
fn set_exit_code(&self, code: ExitCode) {
|
||||
let exit = self
|
||||
.app_data_ref::<Exit>()
|
||||
.expect("exit code can only be set from within an active scheduler");
|
||||
exit.set(code);
|
||||
}
|
||||
|
||||
fn push_thread_front(
|
||||
&'lua self,
|
||||
thread: impl IntoLuaThread<'lua>,
|
||||
args: impl IntoLuaMulti<'lua>,
|
||||
) -> LuaResult<ThreadId> {
|
||||
let queue = self
|
||||
.app_data_ref::<SpawnedThreadQueue>()
|
||||
.expect("lua threads can only be pushed from within an active scheduler");
|
||||
queue.push_item(self, thread, args)
|
||||
}
|
||||
|
||||
fn push_thread_back(
|
||||
&'lua self,
|
||||
thread: impl IntoLuaThread<'lua>,
|
||||
args: impl IntoLuaMulti<'lua>,
|
||||
) -> LuaResult<ThreadId> {
|
||||
let queue = self
|
||||
.app_data_ref::<DeferredThreadQueue>()
|
||||
.expect("lua threads can only be pushed from within an active scheduler");
|
||||
queue.push_item(self, thread, args)
|
||||
}
|
||||
|
||||
fn track_thread(&'lua self, id: ThreadId) {
|
||||
let map = self
|
||||
.app_data_ref::<ThreadResultMap>()
|
||||
.expect("lua threads can only be tracked from within an active scheduler");
|
||||
map.track(id);
|
||||
}
|
||||
|
||||
fn get_thread_result(&'lua self, id: ThreadId) -> Option<LuaResult<LuaMultiValue<'lua>>> {
|
||||
let map = self
|
||||
.app_data_ref::<ThreadResultMap>()
|
||||
.expect("lua threads results can only be retrieved from within an active scheduler");
|
||||
map.remove(id).map(|r| r.value(self))
|
||||
}
|
||||
|
||||
fn wait_for_thread(&'lua self, id: ThreadId) -> impl Future<Output = ()> {
|
||||
let map = self
|
||||
.app_data_ref::<ThreadResultMap>()
|
||||
.expect("lua threads results can only be retrieved from within an active scheduler");
|
||||
async move { map.listen(id).await }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'lua> LuaSpawnExt<'lua> for Lua {
|
||||
fn spawn<F, T>(&self, fut: F) -> Task<T>
|
||||
where
|
||||
F: Future<Output = T> + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
let exec = self
|
||||
.app_data_ref::<WeakArc<Executor>>()
|
||||
.expect("tasks can only be spawned within an active scheduler")
|
||||
.upgrade()
|
||||
.expect("executor was dropped");
|
||||
trace!("spawning future on executor");
|
||||
exec.spawn(fut)
|
||||
}
|
||||
|
||||
fn spawn_local<F>(&self, fut: F)
|
||||
where
|
||||
F: Future<Output = ()> + 'static,
|
||||
{
|
||||
let queue = self
|
||||
.app_data_ref::<WeakRc<FuturesQueue>>()
|
||||
.expect("tasks can only be spawned within an active scheduler")
|
||||
.upgrade()
|
||||
.expect("executor was dropped");
|
||||
trace!("spawning local task on executor");
|
||||
queue.push_item(fut);
|
||||
}
|
||||
|
||||
fn spawn_blocking<F, T>(&self, f: F) -> Task<T>
|
||||
where
|
||||
F: FnOnce() -> T + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
let exec = self
|
||||
.app_data_ref::<WeakArc<Executor>>()
|
||||
.expect("tasks can only be spawned within an active scheduler")
|
||||
.upgrade()
|
||||
.expect("executor was dropped");
|
||||
trace!("spawning blocking task on executor");
|
||||
exec.spawn(blocking::unblock(f))
|
||||
}
|
||||
}
|
147
crates/mlua-luau-scheduler/src/util.rs
Normal file
147
crates/mlua-luau-scheduler/src/util.rs
Normal file
|
@ -0,0 +1,147 @@
|
|||
use futures_lite::StreamExt;
|
||||
use mlua::prelude::*;
|
||||
use tracing::instrument;
|
||||
|
||||
/**
|
||||
Runs a Lua thread until it manually yields (using coroutine.yield), errors, or completes.
|
||||
|
||||
May return `None` if the thread was cancelled.
|
||||
|
||||
Otherwise returns the values yielded by the thread, or the error that caused it to stop.
|
||||
*/
|
||||
#[instrument(level = "trace", name = "Scheduler::run_until_yield", skip_all)]
|
||||
pub(crate) async fn run_until_yield<'lua>(
|
||||
thread: LuaThread<'lua>,
|
||||
args: LuaMultiValue<'lua>,
|
||||
) -> Option<LuaResult<LuaMultiValue<'lua>>> {
|
||||
let mut stream = thread.into_async(args);
|
||||
/*
|
||||
NOTE: It is very important that we drop the thread/stream as
|
||||
soon as we are done, it takes up valuable Lua registry space
|
||||
and detached tasks will not drop until the executor does
|
||||
|
||||
https://github.com/smol-rs/smol/issues/294
|
||||
|
||||
We also do not unwrap here since returning `None` is expected behavior for cancellation.
|
||||
|
||||
Even though we are converting into a stream, and then immediately running it,
|
||||
the future may still be cancelled before it is polled, which gives us None.
|
||||
*/
|
||||
stream.next().await
|
||||
}
|
||||
|
||||
/**
|
||||
Checks if the given [`LuaValue`] is the async `POLL_PENDING` constant.
|
||||
*/
|
||||
#[inline]
|
||||
pub(crate) fn is_poll_pending(value: &LuaValue) -> bool {
|
||||
value
|
||||
.as_light_userdata()
|
||||
.is_some_and(|l| l == Lua::poll_pending())
|
||||
}
|
||||
|
||||
/**
|
||||
Representation of a [`LuaResult`] with an associated [`LuaMultiValue`] currently stored in the Lua registry.
|
||||
*/
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ThreadResult {
|
||||
inner: LuaResult<LuaRegistryKey>,
|
||||
}
|
||||
|
||||
impl ThreadResult {
|
||||
pub fn new(result: LuaResult<LuaMultiValue>, lua: &Lua) -> Self {
|
||||
Self {
|
||||
inner: match result {
|
||||
Ok(v) => Ok({
|
||||
let vec = v.into_vec();
|
||||
lua.create_registry_value(vec).expect("out of memory")
|
||||
}),
|
||||
Err(e) => Err(e),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn value(self, lua: &Lua) -> LuaResult<LuaMultiValue> {
|
||||
match self.inner {
|
||||
Ok(key) => {
|
||||
let vec = lua.registry_value(&key).unwrap();
|
||||
lua.remove_registry_value(key).unwrap();
|
||||
Ok(LuaMultiValue::from_vec(vec))
|
||||
}
|
||||
Err(e) => Err(e.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Representation of a [`LuaThread`] with its associated arguments currently stored in the Lua registry.
|
||||
*/
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ThreadWithArgs {
|
||||
key_thread: LuaRegistryKey,
|
||||
key_args: LuaRegistryKey,
|
||||
}
|
||||
|
||||
impl ThreadWithArgs {
|
||||
pub fn new<'lua>(
|
||||
lua: &'lua Lua,
|
||||
thread: LuaThread<'lua>,
|
||||
args: LuaMultiValue<'lua>,
|
||||
) -> LuaResult<Self> {
|
||||
let argsv = args.into_vec();
|
||||
|
||||
let key_thread = lua.create_registry_value(thread)?;
|
||||
let key_args = lua.create_registry_value(argsv)?;
|
||||
|
||||
Ok(Self {
|
||||
key_thread,
|
||||
key_args,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn into_inner(self, lua: &Lua) -> (LuaThread<'_>, LuaMultiValue<'_>) {
|
||||
let thread = lua.registry_value(&self.key_thread).unwrap();
|
||||
let argsv = lua.registry_value(&self.key_args).unwrap();
|
||||
|
||||
let args = LuaMultiValue::from_vec(argsv);
|
||||
|
||||
lua.remove_registry_value(self.key_thread).unwrap();
|
||||
lua.remove_registry_value(self.key_args).unwrap();
|
||||
|
||||
(thread, args)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Wrapper struct to accept either a Lua thread or a Lua function as function argument.
|
||||
|
||||
[`LuaThreadOrFunction::into_thread`] may be used to convert the value into a Lua thread.
|
||||
*/
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum LuaThreadOrFunction<'lua> {
|
||||
Thread(LuaThread<'lua>),
|
||||
Function(LuaFunction<'lua>),
|
||||
}
|
||||
|
||||
impl<'lua> LuaThreadOrFunction<'lua> {
|
||||
pub(super) fn into_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
|
||||
match self {
|
||||
Self::Thread(t) => Ok(t),
|
||||
Self::Function(f) => lua.create_thread(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'lua> FromLua<'lua> for LuaThreadOrFunction<'lua> {
|
||||
fn from_lua(value: LuaValue<'lua>, _: &'lua Lua) -> LuaResult<Self> {
|
||||
match value {
|
||||
LuaValue::Thread(t) => Ok(Self::Thread(t)),
|
||||
LuaValue::Function(f) => Ok(Self::Function(f)),
|
||||
value => Err(LuaError::FromLuaConversionError {
|
||||
from: value.type_name(),
|
||||
to: "LuaThreadOrFunction",
|
||||
message: Some("Expected thread or function".to_string()),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue