Proper implementation of async process spawn

This commit is contained in:
Filip Tibell 2023-02-09 22:23:22 +01:00
parent 72f6b356c4
commit 7699d06647
No known key found for this signature in database
7 changed files with 146 additions and 110 deletions

32
Cargo.lock generated
View file

@ -579,11 +579,11 @@ dependencies = [
"lazy_static", "lazy_static",
"mlua", "mlua",
"os_str_bytes", "os_str_bytes",
"pin-project",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-pipe",
] ]
[[package]] [[package]]
@ -727,6 +727,26 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pin-project"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.9" version = "0.2.9"
@ -1144,16 +1164,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio-pipe"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784"
dependencies = [
"libc",
"tokio",
]
[[package]] [[package]]
name = "tokio-rustls" name = "tokio-rustls"
version = "0.23.4" version = "0.23.4"

View file

@ -2,3 +2,4 @@
luau-lsp = "JohnnyMorganz/luau-lsp@1.15.0" luau-lsp = "JohnnyMorganz/luau-lsp@1.15.0"
selene = "Kampfkarren/selene@0.24.0" selene = "Kampfkarren/selene@0.24.0"
stylua = "JohnnyMorganz/StyLua@0.16.0" stylua = "JohnnyMorganz/StyLua@0.16.0"
rojo = "rojo-rbx/rojo@7.2.1"

View file

@ -23,8 +23,8 @@ reqwest.workspace = true
console = "0.15.5" console = "0.15.5"
dialoguer = "0.10.3" dialoguer = "0.10.3"
lazy_static = "1.4.0" lazy_static = "1.4.0"
pin-project = "1.0.12"
os_str_bytes = "6.4.1" os_str_bytes = "6.4.1"
tokio-pipe = "0.2.12"
hyper = { version = "0.14.24", features = ["full"] } hyper = { version = "0.14.24", features = ["full"] }
mlua = { version = "0.8.7", features = ["luau", "async", "serialize"] } mlua = { version = "0.8.7", features = ["luau", "async", "serialize"] }

View file

@ -0,0 +1,64 @@
use std::{
io::Write,
pin::Pin,
task::{Context, Poll},
};
use pin_project::pin_project;
use tokio::io::{self, AsyncWrite};
#[pin_project]
pub struct AsyncTeeWriter<'a, W>
where
W: AsyncWrite + Unpin,
{
#[pin]
writer: &'a mut W,
buffer: Vec<u8>,
}
impl<'a, W> AsyncTeeWriter<'a, W>
where
W: AsyncWrite + Unpin,
{
pub fn new(writer: &'a mut W) -> Self {
Self {
writer,
buffer: Vec::new(),
}
}
pub fn into_vec(self) -> Vec<u8> {
self.buffer
}
}
impl<'a, W> AsyncWrite for AsyncTeeWriter<'a, W>
where
W: AsyncWrite + Unpin,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let mut this = self.project();
match this.writer.as_mut().poll_write(cx, buf) {
Poll::Ready(res) => {
this.buffer
.write_all(buf)
.expect("Failed to write to internal tee buffer");
Poll::Ready(res)
}
Poll::Pending => Poll::Pending,
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().writer.as_mut().poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().writer.as_mut().poll_flush(cx)
}
}

View file

@ -1,4 +1,5 @@
pub mod formatting; pub mod formatting;
pub mod futures;
pub mod net; pub mod net;
pub mod process; pub mod process;
pub mod table; pub mod table;

View file

@ -1,113 +1,46 @@
// https://stackoverflow.com/questions/71141122/- use std::{process::ExitStatus, sync::Weak, time::Duration};
use std::{
pin::Pin,
process::ExitStatus,
sync::Weak,
task::{Context, Poll},
time::Duration,
};
use mlua::prelude::*; use mlua::prelude::*;
use tokio::{ use tokio::{io, process::Child, sync::mpsc::Sender, task::spawn, time::sleep};
io::{self, AsyncWrite, AsyncWriteExt},
process::Child,
sync::mpsc::Sender,
task, time,
};
use crate::LuneMessage; use crate::LuneMessage;
pub struct TeeWriter<'a, L, R> use super::futures::AsyncTeeWriter;
where
L: AsyncWrite + Unpin,
R: AsyncWrite + Unpin,
{
left: &'a mut L,
right: &'a mut R,
}
impl<'a, L, R> TeeWriter<'a, L, R>
where
L: AsyncWrite + Unpin,
R: AsyncWrite + Unpin,
{
pub fn new(left: &'a mut L, right: &'a mut R) -> Self {
Self { left, right }
}
}
impl<'a, L, R> AsyncWrite for TeeWriter<'a, L, R>
where
L: AsyncWrite + Unpin,
R: AsyncWrite + Unpin,
{
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
// TODO: Figure out how to poll both of these
// futures, we can't use await in this trait impl
// It might be better to split the generic TeeWriter out
// and instead make TeeStdoutWriter and TeeStderrWriter
// structs that use Stdout and Stderr + Vec directly,
// all of which already implement these traits for us
self.left.write_all(buf);
self.right.write_all(buf);
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
todo!()
}
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
todo!()
}
}
pub async fn pipe_and_inherit_child_process_stdio( pub async fn pipe_and_inherit_child_process_stdio(
mut child: Child, mut child: Child,
) -> LuaResult<(ExitStatus, Vec<u8>, Vec<u8>)> { ) -> LuaResult<(ExitStatus, Vec<u8>, Vec<u8>)> {
// https://stackoverflow.com/questions/71141122/-
let mut child_stdout = child.stdout.take().unwrap(); let mut child_stdout = child.stdout.take().unwrap();
let mut child_stderr = child.stderr.take().unwrap(); let mut child_stderr = child.stderr.take().unwrap();
// TODO: Or maybe we could just spawn four local tasks instead, let stdout_thread = spawn(async move {
// one for each vec and one each for Stdout and Stderr, then we
// join the local tasks for the vecs to get out our results
let stdout_thread = task::spawn_local(async move {
let mut stdout = io::stdout(); let mut stdout = io::stdout();
let mut log = Vec::new(); let mut tee = AsyncTeeWriter::new(&mut stdout);
let mut tee = TeeWriter::new(&mut stdout, &mut log);
io::copy(&mut child_stdout, &mut tee) io::copy(&mut child_stdout, &mut tee)
.await .await
.map_err(LuaError::external)?; .map_err(LuaError::external)?;
Ok(log) Ok::<_, LuaError>(tee.into_vec())
}); });
let stderr_thread = task::spawn_local(async move { let stderr_thread = spawn(async move {
let mut stderr = io::stderr(); let mut stderr = io::stderr();
let mut log = Vec::new(); let mut tee = AsyncTeeWriter::new(&mut stderr);
let mut tee = TeeWriter::new(&mut stderr, &mut log);
io::copy(&mut child_stderr, &mut tee) io::copy(&mut child_stderr, &mut tee)
.await .await
.map_err(LuaError::external)?; .map_err(LuaError::external)?;
Ok(log) Ok::<_, LuaError>(tee.into_vec())
}); });
let status = child.wait().await.expect("child wasn't running"); let status = child.wait().await.expect("Child process failed to start");
let stdout_log: Result<_, LuaError> = stdout_thread.await.expect("stdout thread panicked"); let stdout_buffer = stdout_thread.await.expect("Tee writer for stdout errored");
let stderr_log: Result<_, LuaError> = stderr_thread.await.expect("stderr thread panicked"); let stderr_buffer = stderr_thread.await.expect("Tee writer for stderr errored");
Ok::<_, LuaError>((status, stdout_log?, stderr_log?)) Ok::<_, LuaError>((status, stdout_buffer?, stderr_buffer?))
} }
pub async fn exit_and_yield_forever(lua: &Lua, exit_code: Option<u8>) -> LuaResult<()> { pub async fn exit_and_yield_forever(lua: &Lua, exit_code: Option<u8>) -> LuaResult<()> {
@ -124,6 +57,6 @@ pub async fn exit_and_yield_forever(lua: &Lua, exit_code: Option<u8>) -> LuaResu
.map_err(LuaError::external)?; .map_err(LuaError::external)?;
// Make sure to block the rest of this thread indefinitely since // Make sure to block the rest of this thread indefinitely since
// the main thread may not register the exit signal right away // the main thread may not register the exit signal right away
time::sleep(Duration::MAX).await; sleep(Duration::MAX).await;
Ok(()) Ok(())
} }

View file

@ -46,27 +46,54 @@ end
-- Setting cwd should not change the cwd of this process -- Setting cwd should not change the cwd of this process
local before = process.spawn("pwd").stdout local pwdBefore = process.spawn("pwd").stdout
process.spawn("ls", {}, { process.spawn("ls", {}, {
cwd = "/", cwd = "/",
shell = true, shell = true,
}) })
local after = process.spawn("pwd").stdout local pwdAfter = process.spawn("pwd").stdout
assert(before == after, "Current working directory changed after running child process") assert(pwdBefore == pwdAfter, "Current working directory changed after running child process")
-- Inheriting stdio & environment variables should work -- Inheriting stdio & environment variables should work
task.delay(2, function() local echoMessage = "Hello from child process!"
local message = "Hello from child process!" local echoResult = process.spawn("echo", {
local result = process.spawn("echo", {
'"$TEST_VAR"', '"$TEST_VAR"',
}, { }, {
env = { TEST_VAR = message }, env = { TEST_VAR = echoMessage },
shell = "bash", shell = "bash",
stdio = "inherit", stdio = "inherit",
}) })
assert( assert(
result.stdout == (message .. "\n"), -- Note that echo adds a newline echoResult.stdout == (echoMessage .. "\n"), -- Note that echo adds a newline
"Inheriting stdio did not return proper output" "Inheriting stdio did not return proper output"
) )
--[[
Spawning a process should not block any lua thread(s)
We test this by sleeping more than once concurrently
and then ensuring that the total time slept is more
than a single sleep but also less than 1.5 sleeps
]]
local SLEEP_DURATION = 1 / 4
local SLEEP_SAMPLES = 2
local sleepStart = os.clock()
local sleepCounter = 0
for i = 1, SLEEP_SAMPLES, 1 do
task.spawn(function()
process.spawn("sleep", { tostring(SLEEP_DURATION) })
sleepCounter += 1
end) end)
end
while sleepCounter < SLEEP_SAMPLES do
task.wait()
end
local sleepElapsed = os.clock() - sleepStart
assert(
(sleepElapsed >= SLEEP_DURATION) and (sleepElapsed < SLEEP_DURATION * 1.5),
"Coroutine yielded the main lua thread during process yield"
)