feat: initial revamped process.spawn impl

This commit is contained in:
Erica Marigold 2024-06-09 13:01:34 +05:30
parent 59a7955132
commit 900b6715b6
No known key found for this signature in database
GPG key ID: 2768CC0C23D245D1
5 changed files with 167 additions and 11 deletions

2
Cargo.lock generated
View file

@ -1625,6 +1625,8 @@ dependencies = [
name = "lune-std-process"
version = "0.1.1"
dependencies = [
"bstr",
"bytes",
"directories",
"lune-utils",
"mlua",

View file

@ -20,6 +20,8 @@ directories = "5.0"
pin-project = "1.0"
os_str_bytes = { version = "7.0", features = ["conversions"] }
bstr = "1.9"
tokio = { version = "1", default-features = false, features = [
"io-std",
"io-util",
@ -29,3 +31,4 @@ tokio = { version = "1", default-features = false, features = [
] }
lune-utils = { version = "0.1.0", path = "../lune-utils" }
bytes = "1.6.0"

View file

@ -1,12 +1,14 @@
#![allow(clippy::cargo_common_metadata)]
use std::{
cell::RefCell,
env::{
self,
consts::{ARCH, OS},
},
path::MAIN_SEPARATOR,
process::Stdio,
rc::Rc,
};
use mlua::prelude::*;
@ -14,14 +16,16 @@ use mlua::prelude::*;
use lune_utils::TableBuilder;
use mlua_luau_scheduler::{Functions, LuaSpawnExt};
use os_str_bytes::RawOsString;
use tokio::io::AsyncWriteExt;
use stream::{ChildProcessReader, ChildProcessWriter};
use tokio::{io::AsyncWriteExt, process::Child};
mod options;
mod stream;
mod tee_writer;
mod wait_for_child;
use self::options::ProcessSpawnOptions;
use self::wait_for_child::{wait_for_child, WaitForChildResult};
use self::wait_for_child::wait_for_child;
use lune_utils::path::get_current_dir;
@ -73,6 +77,7 @@ pub fn module(lua: &Lua) -> LuaResult<LuaTable> {
.with_value("cwd", cwd_str)?
.with_value("env", env_tab)?
.with_value("exit", process_exit)?
.with_async_function("exec", process_exec)?
.with_async_function("spawn", process_spawn)?
.build_readonly()
}
@ -141,11 +146,16 @@ fn process_env_iter<'lua>(
})
}
async fn process_spawn(
async fn process_exec(
lua: &Lua,
(program, args, options): (String, Option<Vec<String>>, ProcessSpawnOptions),
) -> LuaResult<LuaTable> {
let res = lua.spawn(spawn_command(program, args, options)).await?;
let res = lua
.spawn(async move {
let cmd = spawn_command(program, args, options.clone()).await?;
wait_for_child(cmd, options.stdio.stdout, options.stdio.stderr).await
})
.await?;
/*
NOTE: If an exit code was not given by the child process,
@ -168,22 +178,115 @@ async fn process_spawn(
.build_readonly()
}
#[allow(clippy::await_holding_refcell_ref)]
async fn process_spawn(
lua: &Lua,
(program, args, options): (String, Option<Vec<String>>, ProcessSpawnOptions),
) -> LuaResult<LuaTable> {
let mut spawn_options = options.clone();
spawn_options.stdio.stdin = None;
let (stdin_tx, stdin_rx) = tokio::sync::oneshot::channel();
let (stdout_tx, stdout_rx) = tokio::sync::oneshot::channel();
let (stderr_tx, stderr_rx) = tokio::sync::oneshot::channel();
let (code_tx, code_rx) = tokio::sync::broadcast::channel(100);
let code_rx_rc = Rc::new(RefCell::new(code_rx));
tokio::spawn(async move {
let mut child = spawn_command(program, args, spawn_options)
.await
.expect("Could not spawn child process");
stdin_tx
.send(child.stdin.take())
.expect("Stdin receiver was unexpectedly dropped");
stdout_tx
.send(child.stdout.take())
.expect("Stdout receiver was unexpectedly dropped");
stderr_tx
.send(child.stderr.take())
.expect("Stderr receiver was unexpectedly dropped");
let res = child
.wait_with_output()
.await
.expect("Failed to get status and output of spawned child process");
let code = res
.status
.code()
.unwrap_or(i32::from(!res.stderr.is_empty()));
code_tx
.send(code)
.expect("ExitCode receiver was unexpectedly dropped");
});
TableBuilder::new(lua)?
.with_value(
"stdout",
ChildProcessReader(
stdout_rx
.await
.expect("Stdout sender unexpectedly dropped")
.ok_or(LuaError::runtime(
"Cannot read from stdout when it is not piped",
))?,
),
)?
.with_value(
"stderr",
ChildProcessReader(
stderr_rx
.await
.expect("Stderr sender unexpectedly dropped")
.ok_or(LuaError::runtime(
"Cannot read from stderr when it is not piped",
))?,
),
)?
.with_value(
"stdin",
ChildProcessWriter(
stdin_rx
.await
.expect("Stdin sender unexpectedly dropped")
.unwrap(),
),
)?
.with_async_function("status", move |lua, ()| {
let code_rx_rc_clone = Rc::clone(&code_rx_rc);
async move {
let code = code_rx_rc_clone
.borrow_mut()
.recv()
.await
.expect("Code sender unexpectedly dropped");
TableBuilder::new(lua)?
.with_value("code", code)?
.with_value("success", code == 0)?
.build_readonly()
}
})?
.build_readonly()
}
async fn spawn_command(
program: String,
args: Option<Vec<String>>,
mut options: ProcessSpawnOptions,
) -> LuaResult<WaitForChildResult> {
) -> LuaResult<Child> {
let stdout = options.stdio.stdout;
let stderr = options.stdio.stderr;
let stdin = options.stdio.stdin.take();
// TODO: Have an stdin_kind which the user can supply as piped or not
// TODO: Maybe even revamp the stdout/stderr kinds? User should only use
// piped when they are sure they want to read the stdout. Currently we default
// to piped
let mut child = options
.into_command(program, args)
.stdin(if stdin.is_some() {
Stdio::piped()
} else {
Stdio::null()
})
.stdin(Stdio::piped())
.stdout(stdout.as_stdio())
.stderr(stderr.as_stdio())
.spawn()?;
@ -193,5 +296,5 @@ async fn spawn_command(
child_stdin.write_all(&stdin).await.into_lua_err()?;
}
wait_for_child(child, stdout, stderr).await
Ok(child)
}

View file

@ -0,0 +1,41 @@
use bstr::BString;
use bytes::BytesMut;
use mlua::prelude::*;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
const CHUNK_SIZE: usize = 8;
#[derive(Debug, Clone)]
pub struct ChildProcessReader<R: AsyncRead>(pub R);
#[derive(Debug, Clone)]
pub struct ChildProcessWriter<W: AsyncWrite>(pub W);
impl<R: AsyncRead + Unpin> ChildProcessReader<R> {
pub async fn read(&mut self) -> LuaResult<Vec<u8>> {
let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
self.0.read_buf(&mut buf).await?;
Ok(buf.to_vec())
}
}
impl<R: AsyncRead + Unpin + 'static> LuaUserData for ChildProcessReader<R> {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method_mut("read", |lua, this, ()| async {
Ok(lua.create_buffer(this.read().await?))
});
}
}
impl<W: AsyncWrite + Unpin> ChildProcessWriter<W> {
pub async fn write(&mut self, data: BString) -> LuaResult<()> {
self.0.write_all(data.as_ref()).await?;
Ok(())
}
}
impl<W: AsyncWrite + Unpin + 'static> LuaUserData for ChildProcessWriter<W> {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method_mut("write", |_, this, data| async { this.write(data).await });
}
}

7
test.luau Normal file
View file

@ -0,0 +1,7 @@
local process = require("@lune/process")
local a = process.spawn("yes", {})
print(a)
print(buffer.tostring(a.stdout:read()))
print(a.status())