From 7699d06647c63f7384ff6d2f3ad81df6a20ae3a0 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Thu, 9 Feb 2023 22:23:22 +0100 Subject: [PATCH] Proper implementation of async process spawn --- Cargo.lock | 32 +++++++---- aftman.toml | 1 + packages/lib/Cargo.toml | 2 +- packages/lib/src/utils/futures.rs | 64 +++++++++++++++++++++ packages/lib/src/utils/mod.rs | 1 + packages/lib/src/utils/process.rs | 95 +++++-------------------------- tests/process/spawn.luau | 61 ++++++++++++++------ 7 files changed, 146 insertions(+), 110 deletions(-) create mode 100644 packages/lib/src/utils/futures.rs diff --git a/Cargo.lock b/Cargo.lock index d45e3db..8b57198 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -579,11 +579,11 @@ dependencies = [ "lazy_static", "mlua", "os_str_bytes", + "pin-project", "reqwest", "serde", "serde_json", "tokio", - "tokio-pipe", ] [[package]] @@ -727,6 +727,26 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +[[package]] +name = "pin-project" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -1144,16 +1164,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-pipe" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784" -dependencies = [ - "libc", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.23.4" diff --git a/aftman.toml b/aftman.toml index 4644129..82dcca1 100644 --- a/aftman.toml +++ b/aftman.toml @@ -2,3 +2,4 @@ luau-lsp = "JohnnyMorganz/luau-lsp@1.15.0" selene = "Kampfkarren/selene@0.24.0" stylua = "JohnnyMorganz/StyLua@0.16.0" +rojo = "rojo-rbx/rojo@7.2.1" diff --git a/packages/lib/Cargo.toml b/packages/lib/Cargo.toml index 583e57d..5bab490 100644 --- a/packages/lib/Cargo.toml +++ b/packages/lib/Cargo.toml @@ -23,8 +23,8 @@ reqwest.workspace = true console = "0.15.5" dialoguer = "0.10.3" lazy_static = "1.4.0" +pin-project = "1.0.12" os_str_bytes = "6.4.1" -tokio-pipe = "0.2.12" hyper = { version = "0.14.24", features = ["full"] } mlua = { version = "0.8.7", features = ["luau", "async", "serialize"] } diff --git a/packages/lib/src/utils/futures.rs b/packages/lib/src/utils/futures.rs new file mode 100644 index 0000000..5c3124f --- /dev/null +++ b/packages/lib/src/utils/futures.rs @@ -0,0 +1,64 @@ +use std::{ + io::Write, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project::pin_project; +use tokio::io::{self, AsyncWrite}; + +#[pin_project] +pub struct AsyncTeeWriter<'a, W> +where + W: AsyncWrite + Unpin, +{ + #[pin] + writer: &'a mut W, + buffer: Vec, +} + +impl<'a, W> AsyncTeeWriter<'a, W> +where + W: AsyncWrite + Unpin, +{ + pub fn new(writer: &'a mut W) -> Self { + Self { + writer, + buffer: Vec::new(), + } + } + + pub fn into_vec(self) -> Vec { + self.buffer + } +} + +impl<'a, W> AsyncWrite for AsyncTeeWriter<'a, W> +where + W: AsyncWrite + Unpin, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let mut this = self.project(); + match this.writer.as_mut().poll_write(cx, buf) { + Poll::Ready(res) => { + this.buffer + .write_all(buf) + .expect("Failed to write to internal tee buffer"); + Poll::Ready(res) + } + Poll::Pending => Poll::Pending, + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().writer.as_mut().poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().writer.as_mut().poll_flush(cx) + } +} diff --git a/packages/lib/src/utils/mod.rs b/packages/lib/src/utils/mod.rs index a43306a..56c7591 100644 --- a/packages/lib/src/utils/mod.rs +++ b/packages/lib/src/utils/mod.rs @@ -1,4 +1,5 @@ pub mod formatting; +pub mod futures; pub mod net; pub mod process; pub mod table; diff --git a/packages/lib/src/utils/process.rs b/packages/lib/src/utils/process.rs index 0780f88..af4e344 100644 --- a/packages/lib/src/utils/process.rs +++ b/packages/lib/src/utils/process.rs @@ -1,113 +1,46 @@ -// https://stackoverflow.com/questions/71141122/- - -use std::{ - pin::Pin, - process::ExitStatus, - sync::Weak, - task::{Context, Poll}, - time::Duration, -}; +use std::{process::ExitStatus, sync::Weak, time::Duration}; use mlua::prelude::*; -use tokio::{ - io::{self, AsyncWrite, AsyncWriteExt}, - process::Child, - sync::mpsc::Sender, - task, time, -}; +use tokio::{io, process::Child, sync::mpsc::Sender, task::spawn, time::sleep}; use crate::LuneMessage; -pub struct TeeWriter<'a, L, R> -where - L: AsyncWrite + Unpin, - R: AsyncWrite + Unpin, -{ - left: &'a mut L, - right: &'a mut R, -} - -impl<'a, L, R> TeeWriter<'a, L, R> -where - L: AsyncWrite + Unpin, - R: AsyncWrite + Unpin, -{ - pub fn new(left: &'a mut L, right: &'a mut R) -> Self { - Self { left, right } - } -} - -impl<'a, L, R> AsyncWrite for TeeWriter<'a, L, R> -where - L: AsyncWrite + Unpin, - R: AsyncWrite + Unpin, -{ - fn poll_write( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - // TODO: Figure out how to poll both of these - // futures, we can't use await in this trait impl - // It might be better to split the generic TeeWriter out - // and instead make TeeStdoutWriter and TeeStderrWriter - // structs that use Stdout and Stderr + Vec directly, - // all of which already implement these traits for us - self.left.write_all(buf); - self.right.write_all(buf); - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - todo!() - } - - fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - todo!() - } -} +use super::futures::AsyncTeeWriter; pub async fn pipe_and_inherit_child_process_stdio( mut child: Child, ) -> LuaResult<(ExitStatus, Vec, Vec)> { - // https://stackoverflow.com/questions/71141122/- let mut child_stdout = child.stdout.take().unwrap(); let mut child_stderr = child.stderr.take().unwrap(); - // TODO: Or maybe we could just spawn four local tasks instead, - // one for each vec and one each for Stdout and Stderr, then we - // join the local tasks for the vecs to get out our results - - let stdout_thread = task::spawn_local(async move { + let stdout_thread = spawn(async move { let mut stdout = io::stdout(); - let mut log = Vec::new(); - let mut tee = TeeWriter::new(&mut stdout, &mut log); + let mut tee = AsyncTeeWriter::new(&mut stdout); io::copy(&mut child_stdout, &mut tee) .await .map_err(LuaError::external)?; - Ok(log) + Ok::<_, LuaError>(tee.into_vec()) }); - let stderr_thread = task::spawn_local(async move { + let stderr_thread = spawn(async move { let mut stderr = io::stderr(); - let mut log = Vec::new(); - let mut tee = TeeWriter::new(&mut stderr, &mut log); + let mut tee = AsyncTeeWriter::new(&mut stderr); io::copy(&mut child_stderr, &mut tee) .await .map_err(LuaError::external)?; - Ok(log) + Ok::<_, LuaError>(tee.into_vec()) }); - let status = child.wait().await.expect("child wasn't running"); + let status = child.wait().await.expect("Child process failed to start"); - let stdout_log: Result<_, LuaError> = stdout_thread.await.expect("stdout thread panicked"); - let stderr_log: Result<_, LuaError> = stderr_thread.await.expect("stderr thread panicked"); + let stdout_buffer = stdout_thread.await.expect("Tee writer for stdout errored"); + let stderr_buffer = stderr_thread.await.expect("Tee writer for stderr errored"); - Ok::<_, LuaError>((status, stdout_log?, stderr_log?)) + Ok::<_, LuaError>((status, stdout_buffer?, stderr_buffer?)) } pub async fn exit_and_yield_forever(lua: &Lua, exit_code: Option) -> LuaResult<()> { @@ -124,6 +57,6 @@ pub async fn exit_and_yield_forever(lua: &Lua, exit_code: Option) -> LuaResu .map_err(LuaError::external)?; // Make sure to block the rest of this thread indefinitely since // the main thread may not register the exit signal right away - time::sleep(Duration::MAX).await; + sleep(Duration::MAX).await; Ok(()) } diff --git a/tests/process/spawn.luau b/tests/process/spawn.luau index e7bcc55..042210b 100644 --- a/tests/process/spawn.luau +++ b/tests/process/spawn.luau @@ -46,27 +46,54 @@ end -- Setting cwd should not change the cwd of this process -local before = process.spawn("pwd").stdout +local pwdBefore = process.spawn("pwd").stdout process.spawn("ls", {}, { cwd = "/", shell = true, }) -local after = process.spawn("pwd").stdout -assert(before == after, "Current working directory changed after running child process") +local pwdAfter = process.spawn("pwd").stdout +assert(pwdBefore == pwdAfter, "Current working directory changed after running child process") -- Inheriting stdio & environment variables should work -task.delay(2, function() - local message = "Hello from child process!" - local result = process.spawn("echo", { - '"$TEST_VAR"', - }, { - env = { TEST_VAR = message }, - shell = "bash", - stdio = "inherit", - }) - assert( - result.stdout == (message .. "\n"), -- Note that echo adds a newline - "Inheriting stdio did not return proper output" - ) -end) +local echoMessage = "Hello from child process!" +local echoResult = process.spawn("echo", { + '"$TEST_VAR"', +}, { + env = { TEST_VAR = echoMessage }, + shell = "bash", + stdio = "inherit", +}) +assert( + echoResult.stdout == (echoMessage .. "\n"), -- Note that echo adds a newline + "Inheriting stdio did not return proper output" +) + +--[[ + Spawning a process should not block any lua thread(s) + + We test this by sleeping more than once concurrently + and then ensuring that the total time slept is more + than a single sleep but also less than 1.5 sleeps +]] + +local SLEEP_DURATION = 1 / 4 +local SLEEP_SAMPLES = 2 + +local sleepStart = os.clock() +local sleepCounter = 0 +for i = 1, SLEEP_SAMPLES, 1 do + task.spawn(function() + process.spawn("sleep", { tostring(SLEEP_DURATION) }) + sleepCounter += 1 + end) +end +while sleepCounter < SLEEP_SAMPLES do + task.wait() +end + +local sleepElapsed = os.clock() - sleepStart +assert( + (sleepElapsed >= SLEEP_DURATION) and (sleepElapsed < SLEEP_DURATION * 1.5), + "Coroutine yielded the main lua thread during process yield" +)