Initial work on async impl of process spawn

This commit is contained in:
Filip Tibell 2023-02-09 17:23:06 +01:00
parent 7aa8896997
commit 72f6b356c4
No known key found for this signature in database
6 changed files with 99 additions and 66 deletions

11
Cargo.lock generated
View file

@ -583,6 +583,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-pipe",
] ]
[[package]] [[package]]
@ -1143,6 +1144,16 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio-pipe"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784"
dependencies = [
"libc",
"tokio",
]
[[package]] [[package]]
name = "tokio-rustls" name = "tokio-rustls"
version = "0.23.4" version = "0.23.4"

View file

@ -24,6 +24,7 @@ console = "0.15.5"
dialoguer = "0.10.3" dialoguer = "0.10.3"
lazy_static = "1.4.0" lazy_static = "1.4.0"
os_str_bytes = "6.4.1" os_str_bytes = "6.4.1"
tokio-pipe = "0.2.12"
hyper = { version = "0.14.24", features = ["full"] } hyper = { version = "0.14.24", features = ["full"] }
mlua = { version = "0.8.7", features = ["luau", "async", "serialize"] } mlua = { version = "0.8.7", features = ["luau", "async", "serialize"] }

View file

@ -1,12 +1,8 @@
use std::{ use std::{collections::HashMap, env, path::PathBuf, process::Stdio};
collections::HashMap,
env,
path::PathBuf,
process::{Command, Stdio},
};
use mlua::prelude::*; use mlua::prelude::*;
use os_str_bytes::RawOsString; use os_str_bytes::RawOsString;
use tokio::process::Command;
use crate::utils::{ use crate::utils::{
process::{exit_and_yield_forever, pipe_and_inherit_child_process_stdio}, process::{exit_and_yield_forever, pipe_and_inherit_child_process_stdio},
@ -17,7 +13,7 @@ pub fn create(lua: &Lua, args_vec: Vec<String>) -> LuaResult<()> {
let cwd = env::current_dir()?.canonicalize()?; let cwd = env::current_dir()?.canonicalize()?;
let mut cwd_str = cwd.to_string_lossy().to_string(); let mut cwd_str = cwd.to_string_lossy().to_string();
if !cwd_str.ends_with('/') { if !cwd_str.ends_with('/') {
cwd_str = format!("{}/", cwd_str); cwd_str = format!("{cwd_str}/");
} }
// Create readonly args array // Create readonly args array
let args_tab = TableBuilder::new(lua)? let args_tab = TableBuilder::new(lua)?
@ -41,7 +37,7 @@ pub fn create(lua: &Lua, args_vec: Vec<String>) -> LuaResult<()> {
.with_value("cwd", cwd_str)? .with_value("cwd", cwd_str)?
.with_value("env", env_tab)? .with_value("env", env_tab)?
.with_async_function("exit", process_exit)? .with_async_function("exit", process_exit)?
.with_function("spawn", process_spawn)? .with_async_function("spawn", process_spawn)?
.build_readonly()?, .build_readonly()?,
) )
} }
@ -116,7 +112,7 @@ async fn process_exit(lua: &Lua, exit_code: Option<u8>) -> LuaResult<()> {
exit_and_yield_forever(lua, exit_code).await exit_and_yield_forever(lua, exit_code).await
} }
fn process_spawn<'a>( async fn process_spawn<'a>(
lua: &'a Lua, lua: &'a Lua,
(mut program, args, options): (String, Option<Vec<String>>, Option<LuaTable<'a>>), (mut program, args, options): (String, Option<Vec<String>>, Option<LuaTable<'a>>),
) -> LuaResult<LuaTable<'a>> { ) -> LuaResult<LuaTable<'a>> {
@ -233,9 +229,9 @@ fn process_spawn<'a>(
.spawn()?; .spawn()?;
// Inherit the output and stderr if wanted // Inherit the output and stderr if wanted
let result = if child_stdio_inherit { let result = if child_stdio_inherit {
pipe_and_inherit_child_process_stdio(child) pipe_and_inherit_child_process_stdio(child).await
} else { } else {
let output = child.wait_with_output()?; let output = child.wait_with_output().await?;
Ok((output.status, output.stdout, output.stderr)) Ok((output.status, output.stdout, output.stderr))
}; };
// Extract result // Extract result

View file

@ -106,7 +106,7 @@ impl Lune {
task_set.spawn_local(async move { task_set.spawn_local(async move {
let result = script_lua let result = script_lua
.load(&script_chunk) .load(&script_chunk)
.set_name(&format!("={}", script_name)) .set_name(&format!("={script_name}"))
.unwrap() .unwrap()
.eval_async::<LuaValue>() .eval_async::<LuaValue>()
.await; .await;
@ -132,8 +132,7 @@ impl Lune {
message => { message => {
if task_count == 0 { if task_count == 0 {
return Err(format!( return Err(format!(
"Got message while task count was 0!\nMessage: {:#?}", "Got message while task count was 0!\nMessage: {message:#?}"
message
)); ));
} }
} }

View file

@ -242,19 +242,13 @@ pub fn pretty_format_luau_error(e: &LuaError) -> String {
let msg = message let msg = message
.clone() .clone()
.map_or_else(String::new, |m| format!("\nDetails:\n\t{m}")); .map_or_else(String::new, |m| format!("\nDetails:\n\t{m}"));
format!( format!("Failed to convert Rust type '{from}' into Luau type '{to}'!{msg}")
"Failed to convert Rust type '{}' into Luau type '{}'!{}",
from, to, msg
)
} }
LuaError::FromLuaConversionError { from, to, message } => { LuaError::FromLuaConversionError { from, to, message } => {
let msg = message let msg = message
.clone() .clone()
.map_or_else(String::new, |m| format!("\nDetails:\n\t{m}")); .map_or_else(String::new, |m| format!("\nDetails:\n\t{m}"));
format!( format!("Failed to convert Luau type '{from}' into Rust type '{to}'!{msg}")
"Failed to convert Luau type '{}' into Rust type '{}'!{}",
from, to, msg
)
} }
e => format!("{e}"), e => format!("{e}"),
}; };

View file

@ -1,81 +1,113 @@
// https://stackoverflow.com/questions/71141122/- // https://stackoverflow.com/questions/71141122/-
use std::{ use std::{
io, pin::Pin,
io::Write, process::ExitStatus,
process::{Child, ExitStatus},
sync::Weak, sync::Weak,
task::{Context, Poll},
time::Duration, time::Duration,
}; };
use mlua::prelude::*; use mlua::prelude::*;
use tokio::{sync::mpsc::Sender, time}; use tokio::{
io::{self, AsyncWrite, AsyncWriteExt},
process::Child,
sync::mpsc::Sender,
task, time,
};
use crate::LuneMessage; use crate::LuneMessage;
pub struct TeeWriter<'a, W0: Write, W1: Write> { pub struct TeeWriter<'a, L, R>
w0: &'a mut W0, where
w1: &'a mut W1, L: AsyncWrite + Unpin,
R: AsyncWrite + Unpin,
{
left: &'a mut L,
right: &'a mut R,
} }
impl<'a, W0: Write, W1: Write> TeeWriter<'a, W0, W1> { impl<'a, L, R> TeeWriter<'a, L, R>
pub fn new(w0: &'a mut W0, w1: &'a mut W1) -> Self { where
Self { w0, w1 } L: AsyncWrite + Unpin,
R: AsyncWrite + Unpin,
{
pub fn new(left: &'a mut L, right: &'a mut R) -> Self {
Self { left, right }
} }
} }
impl<'a, W0: Write, W1: Write> Write for TeeWriter<'a, W0, W1> { impl<'a, L, R> AsyncWrite for TeeWriter<'a, L, R>
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { where
// We have to use write_all() otherwise what L: AsyncWrite + Unpin,
// happens if different amounts are written? R: AsyncWrite + Unpin,
self.w0.write_all(buf)?; {
self.w1.write_all(buf)?; fn poll_write(
Ok(buf.len()) mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
// TODO: Figure out how to poll both of these
// futures, we can't use await in this trait impl
// It might be better to split the generic TeeWriter out
// and instead make TeeStdoutWriter and TeeStderrWriter
// structs that use Stdout and Stderr + Vec directly,
// all of which already implement these traits for us
self.left.write_all(buf);
self.right.write_all(buf);
Poll::Ready(Ok(buf.len()))
} }
fn flush(&mut self) -> io::Result<()> { fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
self.w0.flush()?; todo!()
self.w1.flush()?; }
Ok(())
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
todo!()
} }
} }
pub fn pipe_and_inherit_child_process_stdio( pub async fn pipe_and_inherit_child_process_stdio(
mut child: Child, mut child: Child,
) -> LuaResult<(ExitStatus, Vec<u8>, Vec<u8>)> { ) -> LuaResult<(ExitStatus, Vec<u8>, Vec<u8>)> {
// https://stackoverflow.com/questions/71141122/- // https://stackoverflow.com/questions/71141122/-
let mut child_stdout = child.stdout.take().unwrap(); let mut child_stdout = child.stdout.take().unwrap();
let mut child_stderr = child.stderr.take().unwrap(); let mut child_stderr = child.stderr.take().unwrap();
std::thread::scope(|s| {
let stdout_thread = s.spawn(|| { // TODO: Or maybe we could just spawn four local tasks instead,
let stdout = io::stdout(); // one for each vec and one each for Stdout and Stderr, then we
// join the local tasks for the vecs to get out our results
let stdout_thread = task::spawn_local(async move {
let mut stdout = io::stdout();
let mut log = Vec::new(); let mut log = Vec::new();
let mut stdout = stdout.lock();
let mut tee = TeeWriter::new(&mut stdout, &mut log); let mut tee = TeeWriter::new(&mut stdout, &mut log);
io::copy(&mut child_stdout, &mut tee).map_err(LuaError::external)?; io::copy(&mut child_stdout, &mut tee)
.await
.map_err(LuaError::external)?;
Ok(log) Ok(log)
}); });
let stderr_thread = s.spawn(|| { let stderr_thread = task::spawn_local(async move {
let stderr = io::stderr(); let mut stderr = io::stderr();
let mut log = Vec::new(); let mut log = Vec::new();
let mut stderr = stderr.lock();
let mut tee = TeeWriter::new(&mut stderr, &mut log); let mut tee = TeeWriter::new(&mut stderr, &mut log);
io::copy(&mut child_stderr, &mut tee).map_err(LuaError::external)?; io::copy(&mut child_stderr, &mut tee)
.await
.map_err(LuaError::external)?;
Ok(log) Ok(log)
}); });
let status = child.wait().expect("child wasn't running"); let status = child.wait().await.expect("child wasn't running");
let stdout_log: Result<_, LuaError> = stdout_thread.join().expect("stdout thread panicked"); let stdout_log: Result<_, LuaError> = stdout_thread.await.expect("stdout thread panicked");
let stderr_log: Result<_, LuaError> = stderr_thread.join().expect("stderr thread panicked"); let stderr_log: Result<_, LuaError> = stderr_thread.await.expect("stderr thread panicked");
Ok::<_, LuaError>((status, stdout_log?, stderr_log?)) Ok::<_, LuaError>((status, stdout_log?, stderr_log?))
})
} }
pub async fn exit_and_yield_forever(lua: &Lua, exit_code: Option<u8>) -> LuaResult<()> { pub async fn exit_and_yield_forever(lua: &Lua, exit_code: Option<u8>) -> LuaResult<()> {