From 900b6715b6d5cb0cefb8c71a264055ebf61f8f90 Mon Sep 17 00:00:00 2001 From: Erica Marigold Date: Sun, 9 Jun 2024 13:01:34 +0530 Subject: [PATCH] feat: initial revamped process.spawn impl --- Cargo.lock | 2 + crates/lune-std-process/Cargo.toml | 3 + crates/lune-std-process/src/lib.rs | 125 +++++++++++++++++++++++--- crates/lune-std-process/src/stream.rs | 41 +++++++++ test.luau | 7 ++ 5 files changed, 167 insertions(+), 11 deletions(-) create mode 100644 crates/lune-std-process/src/stream.rs create mode 100644 test.luau diff --git a/Cargo.lock b/Cargo.lock index e6c68e5..434f2e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1625,6 +1625,8 @@ dependencies = [ name = "lune-std-process" version = "0.1.1" dependencies = [ + "bstr", + "bytes", "directories", "lune-utils", "mlua", diff --git a/crates/lune-std-process/Cargo.toml b/crates/lune-std-process/Cargo.toml index 83a792f..7af5ad2 100644 --- a/crates/lune-std-process/Cargo.toml +++ b/crates/lune-std-process/Cargo.toml @@ -20,6 +20,8 @@ directories = "5.0" pin-project = "1.0" os_str_bytes = { version = "7.0", features = ["conversions"] } +bstr = "1.9" + tokio = { version = "1", default-features = false, features = [ "io-std", "io-util", @@ -29,3 +31,4 @@ tokio = { version = "1", default-features = false, features = [ ] } lune-utils = { version = "0.1.0", path = "../lune-utils" } +bytes = "1.6.0" diff --git a/crates/lune-std-process/src/lib.rs b/crates/lune-std-process/src/lib.rs index d3fd502..04286ff 100644 --- a/crates/lune-std-process/src/lib.rs +++ b/crates/lune-std-process/src/lib.rs @@ -1,12 +1,14 @@ #![allow(clippy::cargo_common_metadata)] use std::{ + cell::RefCell, env::{ self, consts::{ARCH, OS}, }, path::MAIN_SEPARATOR, process::Stdio, + rc::Rc, }; use mlua::prelude::*; @@ -14,14 +16,16 @@ use mlua::prelude::*; use lune_utils::TableBuilder; use mlua_luau_scheduler::{Functions, LuaSpawnExt}; use os_str_bytes::RawOsString; -use tokio::io::AsyncWriteExt; +use stream::{ChildProcessReader, ChildProcessWriter}; +use tokio::{io::AsyncWriteExt, process::Child}; mod options; +mod stream; mod tee_writer; mod wait_for_child; use self::options::ProcessSpawnOptions; -use self::wait_for_child::{wait_for_child, WaitForChildResult}; +use self::wait_for_child::wait_for_child; use lune_utils::path::get_current_dir; @@ -73,6 +77,7 @@ pub fn module(lua: &Lua) -> LuaResult { .with_value("cwd", cwd_str)? .with_value("env", env_tab)? .with_value("exit", process_exit)? + .with_async_function("exec", process_exec)? .with_async_function("spawn", process_spawn)? .build_readonly() } @@ -141,11 +146,16 @@ fn process_env_iter<'lua>( }) } -async fn process_spawn( +async fn process_exec( lua: &Lua, (program, args, options): (String, Option>, ProcessSpawnOptions), ) -> LuaResult { - let res = lua.spawn(spawn_command(program, args, options)).await?; + let res = lua + .spawn(async move { + let cmd = spawn_command(program, args, options.clone()).await?; + wait_for_child(cmd, options.stdio.stdout, options.stdio.stderr).await + }) + .await?; /* NOTE: If an exit code was not given by the child process, @@ -168,22 +178,115 @@ async fn process_spawn( .build_readonly() } +#[allow(clippy::await_holding_refcell_ref)] +async fn process_spawn( + lua: &Lua, + (program, args, options): (String, Option>, ProcessSpawnOptions), +) -> LuaResult { + let mut spawn_options = options.clone(); + spawn_options.stdio.stdin = None; + + let (stdin_tx, stdin_rx) = tokio::sync::oneshot::channel(); + let (stdout_tx, stdout_rx) = tokio::sync::oneshot::channel(); + let (stderr_tx, stderr_rx) = tokio::sync::oneshot::channel(); + let (code_tx, code_rx) = tokio::sync::broadcast::channel(100); + let code_rx_rc = Rc::new(RefCell::new(code_rx)); + + tokio::spawn(async move { + let mut child = spawn_command(program, args, spawn_options) + .await + .expect("Could not spawn child process"); + stdin_tx + .send(child.stdin.take()) + .expect("Stdin receiver was unexpectedly dropped"); + stdout_tx + .send(child.stdout.take()) + .expect("Stdout receiver was unexpectedly dropped"); + stderr_tx + .send(child.stderr.take()) + .expect("Stderr receiver was unexpectedly dropped"); + + let res = child + .wait_with_output() + .await + .expect("Failed to get status and output of spawned child process"); + + let code = res + .status + .code() + .unwrap_or(i32::from(!res.stderr.is_empty())); + + code_tx + .send(code) + .expect("ExitCode receiver was unexpectedly dropped"); + }); + + TableBuilder::new(lua)? + .with_value( + "stdout", + ChildProcessReader( + stdout_rx + .await + .expect("Stdout sender unexpectedly dropped") + .ok_or(LuaError::runtime( + "Cannot read from stdout when it is not piped", + ))?, + ), + )? + .with_value( + "stderr", + ChildProcessReader( + stderr_rx + .await + .expect("Stderr sender unexpectedly dropped") + .ok_or(LuaError::runtime( + "Cannot read from stderr when it is not piped", + ))?, + ), + )? + .with_value( + "stdin", + ChildProcessWriter( + stdin_rx + .await + .expect("Stdin sender unexpectedly dropped") + .unwrap(), + ), + )? + .with_async_function("status", move |lua, ()| { + let code_rx_rc_clone = Rc::clone(&code_rx_rc); + async move { + let code = code_rx_rc_clone + .borrow_mut() + .recv() + .await + .expect("Code sender unexpectedly dropped"); + + TableBuilder::new(lua)? + .with_value("code", code)? + .with_value("success", code == 0)? + .build_readonly() + } + })? + .build_readonly() +} + async fn spawn_command( program: String, args: Option>, mut options: ProcessSpawnOptions, -) -> LuaResult { +) -> LuaResult { let stdout = options.stdio.stdout; let stderr = options.stdio.stderr; let stdin = options.stdio.stdin.take(); + // TODO: Have an stdin_kind which the user can supply as piped or not + // TODO: Maybe even revamp the stdout/stderr kinds? User should only use + // piped when they are sure they want to read the stdout. Currently we default + // to piped let mut child = options .into_command(program, args) - .stdin(if stdin.is_some() { - Stdio::piped() - } else { - Stdio::null() - }) + .stdin(Stdio::piped()) .stdout(stdout.as_stdio()) .stderr(stderr.as_stdio()) .spawn()?; @@ -193,5 +296,5 @@ async fn spawn_command( child_stdin.write_all(&stdin).await.into_lua_err()?; } - wait_for_child(child, stdout, stderr).await + Ok(child) } diff --git a/crates/lune-std-process/src/stream.rs b/crates/lune-std-process/src/stream.rs new file mode 100644 index 0000000..df46d6e --- /dev/null +++ b/crates/lune-std-process/src/stream.rs @@ -0,0 +1,41 @@ +use bstr::BString; +use bytes::BytesMut; +use mlua::prelude::*; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +const CHUNK_SIZE: usize = 8; + +#[derive(Debug, Clone)] +pub struct ChildProcessReader(pub R); +#[derive(Debug, Clone)] +pub struct ChildProcessWriter(pub W); + +impl ChildProcessReader { + pub async fn read(&mut self) -> LuaResult> { + let mut buf = BytesMut::with_capacity(CHUNK_SIZE); + self.0.read_buf(&mut buf).await?; + + Ok(buf.to_vec()) + } +} + +impl LuaUserData for ChildProcessReader { + fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_async_method_mut("read", |lua, this, ()| async { + Ok(lua.create_buffer(this.read().await?)) + }); + } +} + +impl ChildProcessWriter { + pub async fn write(&mut self, data: BString) -> LuaResult<()> { + self.0.write_all(data.as_ref()).await?; + Ok(()) + } +} + +impl LuaUserData for ChildProcessWriter { + fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_async_method_mut("write", |_, this, data| async { this.write(data).await }); + } +} diff --git a/test.luau b/test.luau new file mode 100644 index 0000000..983c0a3 --- /dev/null +++ b/test.luau @@ -0,0 +1,7 @@ +local process = require("@lune/process") + +local a = process.spawn("yes", {}) + +print(a) +print(buffer.tostring(a.stdout:read())) +print(a.status()) \ No newline at end of file