From bf0bd20ffc6e349ef2922bde108647f298d13a98 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Thu, 18 Jan 2024 10:52:30 +0100 Subject: [PATCH] Get rid of old stuff --- Cargo.lock | 293 +------------------------------ Cargo.toml | 6 - src/main.luau | 17 ++ src/main.rs | 40 ++++- src/smol/mod.rs | 53 ------ src/{smol => }/thread_runtime.rs | 4 +- src/{smol => }/thread_storage.rs | 0 src/{smol => }/thread_util.rs | 0 src/tokio/error_storage.rs | 36 ---- src/tokio/lua.rs | 41 ----- src/tokio/lua_ext.rs | 113 ------------ src/tokio/message.rs | 16 -- src/tokio/mod.rs | 226 ------------------------ src/tokio/stats.rs | 40 ----- src/tokio/thread_id.rs | 34 ---- src/tokio/value.rs | 287 ------------------------------ 16 files changed, 55 insertions(+), 1151 deletions(-) create mode 100644 src/main.luau delete mode 100644 src/smol/mod.rs rename src/{smol => }/thread_runtime.rs (97%) rename src/{smol => }/thread_storage.rs (100%) rename src/{smol => }/thread_util.rs (100%) delete mode 100644 src/tokio/error_storage.rs delete mode 100644 src/tokio/lua.rs delete mode 100644 src/tokio/lua_ext.rs delete mode 100644 src/tokio/message.rs delete mode 100644 src/tokio/mod.rs delete mode 100644 src/tokio/stats.rs delete mode 100644 src/tokio/thread_id.rs delete mode 100644 src/tokio/value.rs diff --git a/Cargo.lock b/Cargo.lock index 1994fc0..1031946 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,27 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "addr2line" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" -dependencies = [ - "gimli", -] - -[[package]] -name = "adler" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" - -[[package]] -name = "anyhow" -version = "1.0.79" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" - [[package]] name = "async-channel" version = "2.1.1" @@ -165,27 +144,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" -[[package]] -name = "backtrace" -version = "0.3.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" -dependencies = [ - "addr2line", - "cc", - "cfg-if", - "libc", - "miniz_oxide", - "object", - "rustc-demangle", -] - -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - [[package]] name = "bitflags" version = "2.4.2" @@ -218,12 +176,6 @@ dependencies = [ "serde", ] -[[package]] -name = "bytes" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" - [[package]] name = "cc" version = "1.0.83" @@ -254,19 +206,6 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" -[[package]] -name = "dashmap" -version = "5.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" -dependencies = [ - "cfg-if", - "hashbrown", - "lock_api", - "once_cell", - "parking_lot_core", -] - [[package]] name = "errno" version = "0.3.8" @@ -354,44 +293,6 @@ dependencies = [ "slab", ] -[[package]] -name = "getrandom" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" -dependencies = [ - "cfg-if", - "libc", - "wasi", -] - -[[package]] -name = "gimli" -version = "0.28.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" - -[[package]] -name = "gxhash" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09f0c897148ec6ff3ca864b7c886df75e6ba09972d206bd9a89af0c18c992253" -dependencies = [ - "rand", -] - -[[package]] -name = "hashbrown" -version = "0.14.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" - -[[package]] -name = "hermit-abi" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" - [[package]] name = "libc" version = "0.2.152" @@ -414,27 +315,12 @@ version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" -[[package]] -name = "lock_api" -version = "0.4.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" -dependencies = [ - "autocfg", - "scopeguard", -] - [[package]] name = "luau-scheduler-experiments" version = "0.0.0" dependencies = [ - "anyhow", - "dashmap", - "gxhash", "mlua", - "self_cell", "smol", - "tokio", ] [[package]] @@ -452,26 +338,6 @@ version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" -[[package]] -name = "miniz_oxide" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" -dependencies = [ - "adler", -] - -[[package]] -name = "mio" -version = "0.8.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" -dependencies = [ - "libc", - "wasi", - "windows-sys 0.48.0", -] - [[package]] name = "mlua" version = "0.9.4" @@ -508,25 +374,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi", - "libc", -] - -[[package]] -name = "object" -version = "0.32.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" -dependencies = [ - "memchr", -] - [[package]] name = "once_cell" version = "1.19.0" @@ -539,29 +386,6 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" -[[package]] -name = "parking_lot" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-targets 0.48.5", -] - [[package]] name = "pin-project-lite" version = "0.2.13" @@ -605,12 +429,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "ppv-lite86" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" - [[package]] name = "proc-macro2" version = "1.0.76" @@ -629,51 +447,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha", - "rand_core", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom", -] - -[[package]] -name = "redox_syscall" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" -dependencies = [ - "bitflags 1.3.2", -] - -[[package]] -name = "rustc-demangle" -version = "0.1.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" - [[package]] name = "rustc-hash" version = "1.1.0" @@ -686,25 +459,13 @@ version = "0.38.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" dependencies = [ - "bitflags 2.4.2", + "bitflags", "errno", "libc", "linux-raw-sys", "windows-sys 0.52.0", ] -[[package]] -name = "scopeguard" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" - -[[package]] -name = "self_cell" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58bf37232d3bb9a2c4e641ca2a11d83b5062066f88df7fed36c28772046d65ba" - [[package]] name = "serde" version = "1.0.195" @@ -743,12 +504,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "smallvec" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2593d31f82ead8df961d8bd23a64c2ccf2eb5dd34b0a34bfb4dd54011c72009e" - [[package]] name = "smol" version = "2.0.0" @@ -766,16 +521,6 @@ dependencies = [ "futures-lite", ] -[[package]] -name = "socket2" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" -dependencies = [ - "libc", - "windows-sys 0.48.0", -] - [[package]] name = "syn" version = "2.0.48" @@ -787,36 +532,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "tokio" -version = "1.35.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" -dependencies = [ - "backtrace", - "bytes", - "libc", - "mio", - "num_cpus", - "parking_lot", - "pin-project-lite", - "signal-hook-registry", - "socket2", - "tokio-macros", - "windows-sys 0.48.0", -] - -[[package]] -name = "tokio-macros" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "tracing" version = "0.1.40" @@ -839,12 +554,6 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" -[[package]] -name = "wasi" -version = "0.11.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" - [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index a310b4a..5133fdc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,11 +4,5 @@ version = "0.0.0" edition = "2021" [dependencies] -anyhow = "1.0" -dashmap = "5.5" -gxhash = "2.3" -self_cell = "1.0" - smol = "2.0" -tokio = { version = "1.0", features = ["full"] } mlua = { version = "0.9", features = ["luau", "luau-jit", "async"] } diff --git a/src/main.luau b/src/main.luau new file mode 100644 index 0000000..7a399ec --- /dev/null +++ b/src/main.luau @@ -0,0 +1,17 @@ +for i = 1, 5 do + print("iteration " .. tostring(i) .. " of 5") + + local counter = 0 + for j = 1, 10_000 do + spawn(function() + wait(0.1 * math.random()) + counter += 1 + if counter == 10_000 then + print("completed iteration " .. tostring(i) .. " of 5") + end + end) + end + + -- FIXME: This resumes instantly with mlua "async" feature + coroutine.yield() +end diff --git a/src/main.rs b/src/main.rs index 92a3461..02d1429 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,37 @@ -mod smol; -// mod tokio; +use std::time::{Duration, Instant}; -fn main() { - let _ = smol::main(); - // let _ = tokio::main(); +use mlua::prelude::*; +use smol::*; + +const MAIN_SCRIPT: &str = include_str!("./main.luau"); + +mod thread_runtime; +mod thread_storage; +mod thread_util; + +use thread_runtime::*; +use thread_storage::*; + +pub fn main() -> LuaResult<()> { + let start = Instant::now(); + let lua = Lua::new(); + + // Set up persistent lua environment + lua.globals().set( + "wait", + lua.create_async_function(|_, duration: f64| async move { + let before = Instant::now(); + let after = Timer::after(Duration::from_secs_f64(duration)).await; + Ok((after - before).as_secs_f64()) + })?, + )?; + + // Set up runtime (thread queue / async executors) and run main script until end + let rt = ThreadRuntime::new(&lua)?; + rt.push_main(&lua, lua.load(MAIN_SCRIPT), ()); + rt.run_blocking(&lua); + + println!("elapsed: {:?}", start.elapsed()); + + Ok(()) } diff --git a/src/smol/mod.rs b/src/smol/mod.rs deleted file mode 100644 index d67eeb9..0000000 --- a/src/smol/mod.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::time::{Duration, Instant}; - -use mlua::prelude::*; -use smol::*; - -const MAIN_CHUNK: &str = r#" -for i = 1, 5 do - print("iteration " .. tostring(i) .. " of 5") - local thread = coroutine.running() - local counter = 0 - for j = 1, 10_000 do - __scheduler__spawn(function() - wait(0.1 * math.random()) - counter += 1 - if counter == 10_000 then - print("completed iteration " .. tostring(i) .. " of 5") - end - end) - end - coroutine.yield() -- FIXME: This resumes instantly with mlua "async" feature -end -"#; - -mod thread_runtime; -mod thread_storage; -mod thread_util; - -use thread_runtime::*; -use thread_storage::*; - -pub fn main() -> LuaResult<()> { - let start = Instant::now(); - let lua = Lua::new(); - - // Set up persistent lua environment - lua.globals().set( - "wait", - lua.create_async_function(|_, duration: f64| async move { - let before = Instant::now(); - let after = Timer::after(Duration::from_secs_f64(duration)).await; - Ok((after - before).as_secs_f64()) - })?, - )?; - - // Set up runtime (thread queue / async executors) and run main script until end - let rt = ThreadRuntime::new(&lua)?; - rt.push_main(&lua, lua.load(MAIN_CHUNK), ()); - rt.run_blocking(&lua); - - println!("elapsed: {:?}", start.elapsed()); - - Ok(()) -} diff --git a/src/smol/thread_runtime.rs b/src/thread_runtime.rs similarity index 97% rename from src/smol/thread_runtime.rs rename to src/thread_runtime.rs index 7cf4fac..43f1804 100644 --- a/src/smol/thread_runtime.rs +++ b/src/thread_runtime.rs @@ -69,8 +69,8 @@ impl ThreadRuntime { // FUTURE: Store these as named registry values instead // so that they are not accessible from within user code - lua.globals().set("__scheduler__spawn", fn_spawn)?; - lua.globals().set("__scheduler__defer", fn_defer)?; + lua.globals().set("spawn", fn_spawn)?; + lua.globals().set("defer", fn_defer)?; Ok(ThreadRuntime { queue, tx, rx }) } diff --git a/src/smol/thread_storage.rs b/src/thread_storage.rs similarity index 100% rename from src/smol/thread_storage.rs rename to src/thread_storage.rs diff --git a/src/smol/thread_util.rs b/src/thread_util.rs similarity index 100% rename from src/smol/thread_util.rs rename to src/thread_util.rs diff --git a/src/tokio/error_storage.rs b/src/tokio/error_storage.rs deleted file mode 100644 index a3fda8e..0000000 --- a/src/tokio/error_storage.rs +++ /dev/null @@ -1,36 +0,0 @@ -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, -}; - -use mlua::prelude::*; - -#[derive(Debug, Clone)] -pub struct ErrorStorage { - is_some: Arc, - inner: Arc>>, -} - -impl ErrorStorage { - pub fn new() -> Self { - Self { - is_some: Arc::new(AtomicBool::new(false)), - inner: Arc::new(Mutex::new(None)), - } - } - - #[inline] - pub fn take(&self) -> Option { - if self.is_some.load(Ordering::Relaxed) { - self.inner.lock().unwrap().take() - } else { - None - } - } - - #[inline] - pub fn replace(&self, e: LuaError) { - self.is_some.store(true, Ordering::Relaxed); - self.inner.lock().unwrap().replace(e); - } -} diff --git a/src/tokio/lua.rs b/src/tokio/lua.rs deleted file mode 100644 index 5375147..0000000 --- a/src/tokio/lua.rs +++ /dev/null @@ -1,41 +0,0 @@ -use mlua::prelude::*; - -use crate::tokio::{Message, MessageSender, ThreadId}; - -pub fn create_lua(lua_tx: MessageSender, async_tx: MessageSender) -> LuaResult { - let lua = Lua::new(); - lua.enable_jit(true); - lua.set_app_data(async_tx.clone()); - - // Cancellation - let cancel_tx = lua_tx.clone(); - lua.globals().set( - "__scheduler__cancel", - LuaFunction::wrap(move |_, thread: LuaThread| { - let thread_id = ThreadId::from(thread); - cancel_tx.send(Message::Cancel(thread_id)).into_lua_err() - }), - )?; - - // Stdout - let stdout_tx = async_tx.clone(); - lua.globals().set( - "__scheduler__writeStdout", - LuaFunction::wrap(move |_, s: LuaString| { - let bytes = s.as_bytes().to_vec(); - stdout_tx.send(Message::WriteStdout(bytes)).into_lua_err() - }), - )?; - - // Stderr - let stderr_tx = async_tx.clone(); - lua.globals().set( - "__scheduler__writeStderr", - LuaFunction::wrap(move |_, s: LuaString| { - let bytes = s.as_bytes().to_vec(); - stderr_tx.send(Message::WriteStderr(bytes)).into_lua_err() - }), - )?; - - Ok(lua) -} diff --git a/src/tokio/lua_ext.rs b/src/tokio/lua_ext.rs deleted file mode 100644 index 305dfb7..0000000 --- a/src/tokio/lua_ext.rs +++ /dev/null @@ -1,113 +0,0 @@ -use std::future::Future; - -use mlua::prelude::*; -use tokio::{spawn, task::spawn_local}; - -use crate::tokio::{AsyncValues, Message, MessageSender, ThreadId}; - -const ASYNC_IMPL: &str = r#" -run(...) -return yield() -"#; - -pub trait LuaAsyncExt<'lua> { - fn current_thread_id(&'lua self) -> ThreadId; - - fn create_async_function(&'lua self, f: F) -> LuaResult> - where - A: FromLuaMulti<'lua>, - R: Into + Send + 'static, - F: Fn(&'lua Lua, A) -> FR + 'static, - FR: Future> + Send + 'static; - - fn create_local_async_function(&'lua self, f: F) -> LuaResult> - where - A: FromLuaMulti<'lua>, - R: Into + 'static, - F: Fn(&'lua Lua, A) -> FR + 'static, - FR: Future> + 'static; -} - -impl<'lua> LuaAsyncExt<'lua> for Lua { - fn current_thread_id(&'lua self) -> ThreadId { - ThreadId::from(self.current_thread()) - } - - fn create_async_function(&'lua self, f: F) -> LuaResult> - where - A: FromLuaMulti<'lua>, - R: Into + Send + 'static, - F: Fn(&'lua Lua, A) -> FR + 'static, - FR: Future> + Send + 'static, - { - let tx = self.app_data_ref::().unwrap().clone(); - - let yld = self - .globals() - .get::<_, LuaTable>("coroutine")? - .get::<_, LuaFunction>("yield")?; - - let run = self.create_function(move |lua, args: A| { - let thread_id = lua.current_thread_id(); - let fut = f(lua, args); - let tx = tx.clone(); - - spawn(async move { - tx.send(match fut.await { - Ok(args) => Message::Resume(thread_id, Ok(args.into())), - Err(e) => Message::Resume(thread_id, Err(e)), - }) - }); - - Ok(()) - })?; - - let env = self.create_table()?; - env.set("yield", yld)?; - env.set("run", run)?; - - self.load(ASYNC_IMPL) - .set_environment(env) - .set_name("async") - .into_function() - } - - fn create_local_async_function(&'lua self, f: F) -> LuaResult> - where - A: FromLuaMulti<'lua>, - R: Into + 'static, - F: Fn(&'lua Lua, A) -> FR + 'static, - FR: Future> + 'static, - { - let tx = self.app_data_ref::().unwrap().clone(); - - let yld = self - .globals() - .get::<_, LuaTable>("coroutine")? - .get::<_, LuaFunction>("yield")?; - - let run = self.create_function(move |lua, args: A| { - let thread_id = lua.current_thread_id(); - let fut = f(lua, args); - let tx = tx.clone(); - - spawn_local(async move { - tx.send(match fut.await { - Ok(args) => Message::Resume(thread_id, Ok(args.into())), - Err(e) => Message::Resume(thread_id, Err(e)), - }) - }); - - Ok(()) - })?; - - let env = self.create_table()?; - env.set("yield", yld)?; - env.set("run", run)?; - - self.load(ASYNC_IMPL) - .set_environment(env) - .set_name("async") - .into_function() - } -} diff --git a/src/tokio/message.rs b/src/tokio/message.rs deleted file mode 100644 index aaceb9a..0000000 --- a/src/tokio/message.rs +++ /dev/null @@ -1,16 +0,0 @@ -use mlua::prelude::*; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; - -use crate::tokio::{AsyncValues, ThreadId}; - -pub type MessageSender = UnboundedSender; -pub type MessageReceiver = UnboundedReceiver; - -#[derive(Debug)] -pub enum Message { - Resume(ThreadId, LuaResult), - Cancel(ThreadId), - WriteError(LuaError), - WriteStdout(Vec), - WriteStderr(Vec), -} diff --git a/src/tokio/mod.rs b/src/tokio/mod.rs deleted file mode 100644 index 195fa08..0000000 --- a/src/tokio/mod.rs +++ /dev/null @@ -1,226 +0,0 @@ -use std::time::Duration; - -use gxhash::GxHashMap; -use mlua::prelude::*; -use tokio::{ - io::{self, AsyncWriteExt}, - runtime::Runtime as TokioRuntime, - select, spawn, - sync::mpsc::{unbounded_channel, UnboundedReceiver}, - task::{spawn_blocking, LocalSet}, - time::{sleep, Instant}, -}; - -mod error_storage; -mod lua; -mod lua_ext; -mod message; -mod stats; -mod thread_id; -mod value; - -use error_storage::*; -use lua::*; -use message::*; -use stats::*; -use thread_id::*; -use value::*; - -use crate::tokio::lua_ext::LuaAsyncExt; - -const NUM_TEST_BATCHES: usize = 20; -const NUM_TEST_THREADS: usize = 50_000; - -const MAIN_CHUNK: &str = r#" -wait(0.01 * math.random()) -"#; - -pub fn main() { - let rt = TokioRuntime::new().unwrap(); - let set = LocalSet::new(); - let _guard = set.enter(); - - let (async_tx, lua_rx) = unbounded_channel::(); - let (lua_tx, async_rx) = unbounded_channel::(); - - let stats = Stats::new(); - let stats_inner = stats.clone(); - - set.block_on(&rt, async { - let res = select! { - r = spawn(main_async_task(async_rx, stats_inner.clone())) => r, - r = spawn_blocking(move || main_lua_task(lua_rx, lua_tx, async_tx, stats_inner)) => r, - }; - if let Err(e) = res { - eprintln!("Runtime fatal error: {e}"); - } - }); - - println!("Finished running in {:?}", stats.elapsed()); - println!("Thread counters: {:#?}", stats.counters); -} - -fn main_lua_task( - mut lua_rx: MessageReceiver, - lua_tx: MessageSender, - async_tx: MessageSender, - stats: Stats, -) -> LuaResult<()> { - let lua = create_lua(lua_tx.clone(), async_tx.clone())?; - - let error_storage = ErrorStorage::new(); - let error_storage_interrupt = error_storage.clone(); - lua.set_interrupt(move |_| match error_storage_interrupt.take() { - Some(e) => Err(e), - None => Ok(LuaVmState::Continue), - }); - - lua.globals().set( - "wait", - lua.create_async_function(|_, duration: f64| async move { - let before = Instant::now(); - sleep(Duration::from_secs_f64(duration)).await; - Ok(Instant::now() - before) - })?, - )?; - - let mut yielded_threads = GxHashMap::default(); - let mut runnable_threads = GxHashMap::default(); - - println!("Running {NUM_TEST_BATCHES} batches"); - for _ in 0..NUM_TEST_BATCHES { - let main_fn = lua.load(MAIN_CHUNK).into_function()?; - for _ in 0..NUM_TEST_THREADS { - let thread = lua.create_thread(main_fn.clone())?; - runnable_threads.insert(ThreadId::from(&thread), (thread, Ok(AsyncValues::new()))); - } - - loop { - // Runnable / yielded threads may be empty because of cancellation - if runnable_threads.is_empty() && yielded_threads.is_empty() { - break; - } - - // Resume as many threads as possible - for (thread_id, (thread, res)) in runnable_threads.drain() { - stats.incr(StatsCounter::ThreadResumed); - // NOTE: If we got an error we don't need to resume with any args - let args = match res { - Ok(a) => a, - Err(e) => { - error_storage.replace(e); - AsyncValues::from(()) - } - }; - if let Err(e) = thread.resume::<_, ()>(args) { - stats.incr(StatsCounter::ThreadErrored); - async_tx.send(Message::WriteError(e)).unwrap(); - } else if thread.status() == LuaThreadStatus::Resumable { - stats.incr(StatsCounter::ThreadYielded); - yielded_threads.insert(thread_id, thread); - } - } - - if yielded_threads.is_empty() { - break; // All threads ran, and we don't have any async task that can spawn more - } - - // Set up message processor - we mutably borrow both yielded_threads and runnable_threads - // so we can't really do this outside of the loop, but it compiles down to the same thing - let mut process_message = |message| match message { - Message::Resume(thread_id, res) => { - if let Some(thread) = yielded_threads.remove(&thread_id) { - runnable_threads.insert(thread_id, (thread, res)); - } - } - Message::Cancel(thread_id) => { - yielded_threads.remove(&thread_id); - runnable_threads.remove(&thread_id); - stats.incr(StatsCounter::ThreadCancelled); - } - m => unreachable!("got non-lua message: {m:?}"), - }; - - // Wait for at least one message, but try to receive as many as possible - if let Some(message) = lua_rx.blocking_recv() { - process_message(message); - while let Ok(message) = lua_rx.try_recv() { - process_message(message); - } - } else { - break; // Scheduler exited - } - } - } - - Ok(()) -} - -async fn main_async_task(mut async_rx: MessageReceiver, stats: Stats) -> LuaResult<()> { - // Give stdio its own task, we don't need it to block the scheduler - let (stdout_tx, stdout_rx) = unbounded_channel(); - let (stderr_tx, stderr_rx) = unbounded_channel(); - let forward_stdout = |data| stdout_tx.send(data).ok(); - let forward_stderr = |data| stderr_tx.send(data).ok(); - spawn(async move { - if let Err(e) = async_stdio_task(stdout_rx, stderr_rx).await { - eprintln!("Stdio fatal error: {e}"); - } - }); - - // Set up message processor - let process_message = |message| match message { - Message::WriteError(e) => { - forward_stderr(b"Lua error: ".to_vec()); - forward_stderr(e.to_string().as_bytes().to_vec()); - } - Message::WriteStdout(data) => { - forward_stdout(data); - stats.incr(StatsCounter::WriteStdout); - } - Message::WriteStderr(data) => { - forward_stderr(data); - stats.incr(StatsCounter::WriteStderr); - } - _ => unreachable!(), - }; - - // Wait for at least one message, but try to receive as many as possible - while let Some(message) = async_rx.recv().await { - process_message(message); - while let Ok(message) = async_rx.try_recv() { - process_message(message); - } - } - - Ok(()) -} - -async fn async_stdio_task( - mut stdout_rx: UnboundedReceiver>, - mut stderr_rx: UnboundedReceiver>, -) -> LuaResult<()> { - let mut stdout = io::stdout(); - let mut stderr = io::stderr(); - - loop { - select! { - data = stdout_rx.recv() => match data { - None => break, // Main task exited - Some(data) => { - stdout.write_all(&data).await?; - stdout.flush().await?; - } - }, - data = stderr_rx.recv() => match data { - None => break, // Main task exited - Some(data) => { - stderr.write_all(&data).await?; - stderr.flush().await?; - } - } - } - } - - Ok(()) -} diff --git a/src/tokio/stats.rs b/src/tokio/stats.rs deleted file mode 100644 index 55eaeca..0000000 --- a/src/tokio/stats.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use dashmap::DashMap; -use tokio::time::Instant; - -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] -pub enum StatsCounter { - ThreadResumed, - ThreadYielded, - ThreadCancelled, - ThreadErrored, - WriteStdout, - WriteStderr, -} - -#[derive(Debug, Clone)] -pub struct Stats { - start: Instant, - pub counters: Arc>, -} - -impl Stats { - pub fn new() -> Self { - Self { - start: Instant::now(), - counters: Arc::new(DashMap::new()), - } - } - - pub fn incr(&self, counter: StatsCounter) { - self.counters - .entry(counter) - .and_modify(|c| *c += 1) - .or_insert(1); - } - - pub fn elapsed(&self) -> Duration { - Instant::now() - self.start - } -} diff --git a/src/tokio/thread_id.rs b/src/tokio/thread_id.rs deleted file mode 100644 index 3201437..0000000 --- a/src/tokio/thread_id.rs +++ /dev/null @@ -1,34 +0,0 @@ -use mlua::prelude::*; - -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] -pub struct ThreadId(usize); - -impl ThreadId { - fn new(value: &LuaThread) -> Self { - // HACK: We rely on the debug format of mlua - // thread refs here, but currently this is the - // only way to get a proper unique id using mlua - let addr_string = format!("{value:?}"); - let addr = addr_string - .strip_prefix("Thread(Ref(0x") - .expect("Invalid thread address format - unknown prefix") - .split_once(')') - .map(|(s, _)| s) - .expect("Invalid thread address format - missing ')'"); - let id = usize::from_str_radix(addr, 16) - .expect("Failed to parse thread address as hexadecimal into usize"); - Self(id) - } -} - -impl From> for ThreadId { - fn from(value: LuaThread) -> Self { - Self::new(&value) - } -} - -impl From<&LuaThread<'_>> for ThreadId { - fn from(value: &LuaThread) -> Self { - Self::new(value) - } -} diff --git a/src/tokio/value.rs b/src/tokio/value.rs deleted file mode 100644 index f6181a4..0000000 --- a/src/tokio/value.rs +++ /dev/null @@ -1,287 +0,0 @@ -#![allow(dead_code)] - -use std::time::Duration; - -use mlua::prelude::*; - -#[derive(Debug, Default)] -pub enum AsyncValue { - #[default] - Nil, - Bool(bool), - Number(f64), - String(String), - Bytes(Vec), -} - -impl IntoLua<'_> for AsyncValue { - #[inline] - fn into_lua(self, lua: &Lua) -> LuaResult { - match self { - AsyncValue::Nil => Ok(LuaValue::Nil), - AsyncValue::Bool(b) => Ok(LuaValue::Boolean(b)), - AsyncValue::Number(n) => Ok(LuaValue::Number(n)), - AsyncValue::String(s) => Ok(LuaValue::String(lua.create_string(&s)?)), - AsyncValue::Bytes(b) => Ok(LuaValue::String(lua.create_string(&b)?)), - } - } -} - -// Primitives - -impl From<()> for AsyncValue { - #[inline] - fn from(_: ()) -> Self { - AsyncValue::Nil - } -} - -impl From for AsyncValue { - #[inline] - fn from(b: bool) -> Self { - AsyncValue::Bool(b) - } -} - -impl From for AsyncValue { - #[inline] - fn from(u: u8) -> Self { - AsyncValue::Number(u as f64) - } -} - -impl From for AsyncValue { - #[inline] - fn from(u: u16) -> Self { - AsyncValue::Number(u as f64) - } -} - -impl From for AsyncValue { - #[inline] - fn from(u: u32) -> Self { - AsyncValue::Number(u as f64) - } -} - -impl From for AsyncValue { - #[inline] - fn from(u: u64) -> Self { - AsyncValue::Number(u as f64) - } -} - -impl From for AsyncValue { - #[inline] - fn from(i: i8) -> Self { - AsyncValue::Number(i as f64) - } -} - -impl From for AsyncValue { - #[inline] - fn from(i: i16) -> Self { - AsyncValue::Number(i as f64) - } -} - -impl From for AsyncValue { - #[inline] - fn from(i: i32) -> Self { - AsyncValue::Number(i as f64) - } -} - -impl From for AsyncValue { - #[inline] - fn from(i: i64) -> Self { - AsyncValue::Number(i as f64) - } -} - -impl From for AsyncValue { - #[inline] - fn from(n: f32) -> Self { - AsyncValue::Number(n as f64) - } -} - -impl From for AsyncValue { - #[inline] - fn from(n: f64) -> Self { - AsyncValue::Number(n) - } -} - -impl From for AsyncValue { - #[inline] - fn from(s: String) -> Self { - AsyncValue::String(s) - } -} - -impl From<&String> for AsyncValue { - #[inline] - fn from(s: &String) -> Self { - AsyncValue::String(s.to_owned()) - } -} - -impl From<&str> for AsyncValue { - #[inline] - fn from(s: &str) -> Self { - AsyncValue::String(s.to_owned()) - } -} - -impl From> for AsyncValue { - #[inline] - fn from(b: Vec) -> Self { - AsyncValue::Bytes(b) - } -} - -impl From<&Vec> for AsyncValue { - #[inline] - fn from(b: &Vec) -> Self { - AsyncValue::Bytes(b.to_owned()) - } -} - -impl From<&[u8]> for AsyncValue { - #[inline] - fn from(b: &[u8]) -> Self { - AsyncValue::Bytes(b.to_owned()) - } -} - -// Other types - -impl From for AsyncValue { - #[inline] - fn from(d: Duration) -> Self { - AsyncValue::Number(d.as_secs_f64()) - } -} - -// Multi args - -#[derive(Debug, Default)] -pub struct AsyncValues { - inner: Vec, -} - -impl AsyncValues { - #[inline] - pub fn new() -> Self { - Self::default() - } -} - -impl IntoLuaMulti<'_> for AsyncValues { - #[inline] - fn into_lua_multi(self, lua: &Lua) -> LuaResult { - Ok(LuaMultiValue::from_vec( - self.inner - .into_iter() - .map(|arg| arg.into_lua(lua)) - .collect::>>()?, - )) - } -} - -// Boilerplate - -impl From for AsyncValues -where - T: Into, -{ - #[inline] - fn from(t: T) -> Self { - AsyncValues { - inner: vec![t.into()], - } - } -} - -impl From<(T0, T1)> for AsyncValues -where - T0: Into, - T1: Into, -{ - #[inline] - fn from((t0, t1): (T0, T1)) -> Self { - AsyncValues { - inner: vec![t0.into(), t1.into()], - } - } -} - -impl From<(T0, T1, T2)> for AsyncValues -where - T0: Into, - T1: Into, - T2: Into, -{ - #[inline] - fn from((t0, t1, t2): (T0, T1, T2)) -> Self { - AsyncValues { - inner: vec![t0.into(), t1.into(), t2.into()], - } - } -} - -impl From<(T0, T1, T2, T3)> for AsyncValues -where - T0: Into, - T1: Into, - T2: Into, - T3: Into, -{ - #[inline] - fn from((t0, t1, t2, t3): (T0, T1, T2, T3)) -> Self { - AsyncValues { - inner: vec![t0.into(), t1.into(), t2.into(), t3.into()], - } - } -} - -impl From<(T0, T1, T2, T3, T4)> for AsyncValues -where - T0: Into, - T1: Into, - T2: Into, - T3: Into, - T4: Into, -{ - #[inline] - fn from((t0, t1, t2, t3, t4): (T0, T1, T2, T3, T4)) -> Self { - AsyncValues { - inner: vec![t0.into(), t1.into(), t2.into(), t3.into(), t4.into()], - } - } -} - -impl From<(T0, T1, T2, T3, T4, T5)> for AsyncValues -where - T0: Into, - T1: Into, - T2: Into, - T3: Into, - T4: Into, - T5: Into, -{ - #[inline] - fn from((t0, t1, t2, t3, t4, t5): (T0, T1, T2, T3, T4, T5)) -> Self { - AsyncValues { - inner: vec![ - t0.into(), - t1.into(), - t2.into(), - t3.into(), - t4.into(), - t5.into(), - ], - } - } -}