From 72f6b356c48eaf1d60a1cad204361f182ea13250 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Thu, 9 Feb 2023 17:23:06 +0100 Subject: [PATCH] Initial work on async impl of process spawn --- Cargo.lock | 11 +++ packages/lib/Cargo.toml | 1 + packages/lib/src/globals/process.rs | 18 ++-- packages/lib/src/lib.rs | 5 +- packages/lib/src/utils/formatting.rs | 10 +-- packages/lib/src/utils/process.rs | 120 +++++++++++++++++---------- 6 files changed, 99 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 888a504..d45e3db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -583,6 +583,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-pipe", ] [[package]] @@ -1143,6 +1144,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-pipe" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784" +dependencies = [ + "libc", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.4" diff --git a/packages/lib/Cargo.toml b/packages/lib/Cargo.toml index 49d3117..583e57d 100644 --- a/packages/lib/Cargo.toml +++ b/packages/lib/Cargo.toml @@ -24,6 +24,7 @@ console = "0.15.5" dialoguer = "0.10.3" lazy_static = "1.4.0" os_str_bytes = "6.4.1" +tokio-pipe = "0.2.12" hyper = { version = "0.14.24", features = ["full"] } mlua = { version = "0.8.7", features = ["luau", "async", "serialize"] } diff --git a/packages/lib/src/globals/process.rs b/packages/lib/src/globals/process.rs index d84b259..ac03491 100644 --- a/packages/lib/src/globals/process.rs +++ b/packages/lib/src/globals/process.rs @@ -1,12 +1,8 @@ -use std::{ - collections::HashMap, - env, - path::PathBuf, - process::{Command, Stdio}, -}; +use std::{collections::HashMap, env, path::PathBuf, process::Stdio}; use mlua::prelude::*; use os_str_bytes::RawOsString; +use tokio::process::Command; use crate::utils::{ process::{exit_and_yield_forever, pipe_and_inherit_child_process_stdio}, @@ -17,7 +13,7 @@ pub fn create(lua: &Lua, args_vec: Vec) -> LuaResult<()> { let cwd = env::current_dir()?.canonicalize()?; let mut cwd_str = cwd.to_string_lossy().to_string(); if !cwd_str.ends_with('/') { - cwd_str = format!("{}/", cwd_str); + cwd_str = format!("{cwd_str}/"); } // Create readonly args array let args_tab = TableBuilder::new(lua)? @@ -41,7 +37,7 @@ pub fn create(lua: &Lua, args_vec: Vec) -> LuaResult<()> { .with_value("cwd", cwd_str)? .with_value("env", env_tab)? .with_async_function("exit", process_exit)? - .with_function("spawn", process_spawn)? + .with_async_function("spawn", process_spawn)? .build_readonly()?, ) } @@ -116,7 +112,7 @@ async fn process_exit(lua: &Lua, exit_code: Option) -> LuaResult<()> { exit_and_yield_forever(lua, exit_code).await } -fn process_spawn<'a>( +async fn process_spawn<'a>( lua: &'a Lua, (mut program, args, options): (String, Option>, Option>), ) -> LuaResult> { @@ -233,9 +229,9 @@ fn process_spawn<'a>( .spawn()?; // Inherit the output and stderr if wanted let result = if child_stdio_inherit { - pipe_and_inherit_child_process_stdio(child) + pipe_and_inherit_child_process_stdio(child).await } else { - let output = child.wait_with_output()?; + let output = child.wait_with_output().await?; Ok((output.status, output.stdout, output.stderr)) }; // Extract result diff --git a/packages/lib/src/lib.rs b/packages/lib/src/lib.rs index 5c69acd..9efdc65 100644 --- a/packages/lib/src/lib.rs +++ b/packages/lib/src/lib.rs @@ -106,7 +106,7 @@ impl Lune { task_set.spawn_local(async move { let result = script_lua .load(&script_chunk) - .set_name(&format!("={}", script_name)) + .set_name(&format!("={script_name}")) .unwrap() .eval_async::() .await; @@ -132,8 +132,7 @@ impl Lune { message => { if task_count == 0 { return Err(format!( - "Got message while task count was 0!\nMessage: {:#?}", - message + "Got message while task count was 0!\nMessage: {message:#?}" )); } } diff --git a/packages/lib/src/utils/formatting.rs b/packages/lib/src/utils/formatting.rs index 7f1e089..6bfdb6d 100644 --- a/packages/lib/src/utils/formatting.rs +++ b/packages/lib/src/utils/formatting.rs @@ -242,19 +242,13 @@ pub fn pretty_format_luau_error(e: &LuaError) -> String { let msg = message .clone() .map_or_else(String::new, |m| format!("\nDetails:\n\t{m}")); - format!( - "Failed to convert Rust type '{}' into Luau type '{}'!{}", - from, to, msg - ) + format!("Failed to convert Rust type '{from}' into Luau type '{to}'!{msg}") } LuaError::FromLuaConversionError { from, to, message } => { let msg = message .clone() .map_or_else(String::new, |m| format!("\nDetails:\n\t{m}")); - format!( - "Failed to convert Luau type '{}' into Rust type '{}'!{}", - from, to, msg - ) + format!("Failed to convert Luau type '{from}' into Rust type '{to}'!{msg}") } e => format!("{e}"), }; diff --git a/packages/lib/src/utils/process.rs b/packages/lib/src/utils/process.rs index 5d7398d..0780f88 100644 --- a/packages/lib/src/utils/process.rs +++ b/packages/lib/src/utils/process.rs @@ -1,81 +1,113 @@ // https://stackoverflow.com/questions/71141122/- use std::{ - io, - io::Write, - process::{Child, ExitStatus}, + pin::Pin, + process::ExitStatus, sync::Weak, + task::{Context, Poll}, time::Duration, }; use mlua::prelude::*; -use tokio::{sync::mpsc::Sender, time}; +use tokio::{ + io::{self, AsyncWrite, AsyncWriteExt}, + process::Child, + sync::mpsc::Sender, + task, time, +}; use crate::LuneMessage; -pub struct TeeWriter<'a, W0: Write, W1: Write> { - w0: &'a mut W0, - w1: &'a mut W1, +pub struct TeeWriter<'a, L, R> +where + L: AsyncWrite + Unpin, + R: AsyncWrite + Unpin, +{ + left: &'a mut L, + right: &'a mut R, } -impl<'a, W0: Write, W1: Write> TeeWriter<'a, W0, W1> { - pub fn new(w0: &'a mut W0, w1: &'a mut W1) -> Self { - Self { w0, w1 } +impl<'a, L, R> TeeWriter<'a, L, R> +where + L: AsyncWrite + Unpin, + R: AsyncWrite + Unpin, +{ + pub fn new(left: &'a mut L, right: &'a mut R) -> Self { + Self { left, right } } } -impl<'a, W0: Write, W1: Write> Write for TeeWriter<'a, W0, W1> { - fn write(&mut self, buf: &[u8]) -> io::Result { - // We have to use write_all() otherwise what - // happens if different amounts are written? - self.w0.write_all(buf)?; - self.w1.write_all(buf)?; - Ok(buf.len()) +impl<'a, L, R> AsyncWrite for TeeWriter<'a, L, R> +where + L: AsyncWrite + Unpin, + R: AsyncWrite + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + // TODO: Figure out how to poll both of these + // futures, we can't use await in this trait impl + // It might be better to split the generic TeeWriter out + // and instead make TeeStdoutWriter and TeeStderrWriter + // structs that use Stdout and Stderr + Vec directly, + // all of which already implement these traits for us + self.left.write_all(buf); + self.right.write_all(buf); + Poll::Ready(Ok(buf.len())) } - fn flush(&mut self) -> io::Result<()> { - self.w0.flush()?; - self.w1.flush()?; - Ok(()) + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + todo!() + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + todo!() } } -pub fn pipe_and_inherit_child_process_stdio( +pub async fn pipe_and_inherit_child_process_stdio( mut child: Child, ) -> LuaResult<(ExitStatus, Vec, Vec)> { // https://stackoverflow.com/questions/71141122/- let mut child_stdout = child.stdout.take().unwrap(); let mut child_stderr = child.stderr.take().unwrap(); - std::thread::scope(|s| { - let stdout_thread = s.spawn(|| { - let stdout = io::stdout(); - let mut log = Vec::new(); - let mut stdout = stdout.lock(); - let mut tee = TeeWriter::new(&mut stdout, &mut log); - io::copy(&mut child_stdout, &mut tee).map_err(LuaError::external)?; + // TODO: Or maybe we could just spawn four local tasks instead, + // one for each vec and one each for Stdout and Stderr, then we + // join the local tasks for the vecs to get out our results - Ok(log) - }); + let stdout_thread = task::spawn_local(async move { + let mut stdout = io::stdout(); + let mut log = Vec::new(); + let mut tee = TeeWriter::new(&mut stdout, &mut log); - let stderr_thread = s.spawn(|| { - let stderr = io::stderr(); - let mut log = Vec::new(); - let mut stderr = stderr.lock(); - let mut tee = TeeWriter::new(&mut stderr, &mut log); + io::copy(&mut child_stdout, &mut tee) + .await + .map_err(LuaError::external)?; - io::copy(&mut child_stderr, &mut tee).map_err(LuaError::external)?; + Ok(log) + }); - Ok(log) - }); + let stderr_thread = task::spawn_local(async move { + let mut stderr = io::stderr(); + let mut log = Vec::new(); + let mut tee = TeeWriter::new(&mut stderr, &mut log); - let status = child.wait().expect("child wasn't running"); + io::copy(&mut child_stderr, &mut tee) + .await + .map_err(LuaError::external)?; - let stdout_log: Result<_, LuaError> = stdout_thread.join().expect("stdout thread panicked"); - let stderr_log: Result<_, LuaError> = stderr_thread.join().expect("stderr thread panicked"); + Ok(log) + }); - Ok::<_, LuaError>((status, stdout_log?, stderr_log?)) - }) + let status = child.wait().await.expect("child wasn't running"); + + let stdout_log: Result<_, LuaError> = stdout_thread.await.expect("stdout thread panicked"); + let stderr_log: Result<_, LuaError> = stderr_thread.await.expect("stderr thread panicked"); + + Ok::<_, LuaError>((status, stdout_log?, stderr_log?)) } pub async fn exit_and_yield_forever(lua: &Lua, exit_code: Option) -> LuaResult<()> {