mirror of
https://github.com/lune-org/lune.git
synced 2025-05-04 10:43:57 +01:00
Start migrating lune-std-process to use async-process instead of tokio
This commit is contained in:
parent
2a701e919c
commit
8059026251
7 changed files with 78 additions and 104 deletions
42
Cargo.lock
generated
42
Cargo.lock
generated
|
@ -242,6 +242,43 @@ dependencies = [
|
|||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-process"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "63255f1dc2381611000436537bbedfe83183faa303a5a0edaf191edef06526bb"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"async-io",
|
||||
"async-lock",
|
||||
"async-signal",
|
||||
"async-task",
|
||||
"blocking",
|
||||
"cfg-if 1.0.0",
|
||||
"event-listener 5.3.1",
|
||||
"futures-lite",
|
||||
"rustix",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-signal"
|
||||
version = "0.2.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "637e00349800c0bdf8bfc21ebbc0b6524abea702b0da4168ac00d070d0c0b9f3"
|
||||
dependencies = [
|
||||
"async-io",
|
||||
"async-lock",
|
||||
"atomic-waker",
|
||||
"cfg-if 1.0.0",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"rustix",
|
||||
"signal-hook-registry",
|
||||
"slab",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-task"
|
||||
version = "4.7.1"
|
||||
|
@ -1672,15 +1709,18 @@ dependencies = [
|
|||
name = "lune-std-process"
|
||||
version = "0.1.3"
|
||||
dependencies = [
|
||||
"async-io",
|
||||
"async-process",
|
||||
"blocking",
|
||||
"bstr",
|
||||
"bytes",
|
||||
"directories",
|
||||
"futures-lite",
|
||||
"lune-utils",
|
||||
"mlua",
|
||||
"mlua-luau-scheduler",
|
||||
"os_str_bytes",
|
||||
"pin-project",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
@ -23,12 +23,9 @@ os_str_bytes = { version = "7.0", features = ["conversions"] }
|
|||
bstr = "1.9"
|
||||
bytes = "1.6.0"
|
||||
|
||||
tokio = { version = "1", default-features = false, features = [
|
||||
"io-std",
|
||||
"io-util",
|
||||
"process",
|
||||
"rt",
|
||||
"sync",
|
||||
] }
|
||||
async-io = "2.4"
|
||||
async-process = "2.3"
|
||||
blocking = "1.6"
|
||||
futures-lite = "2.6"
|
||||
|
||||
lune-utils = { version = "0.1.3", path = "../lune-utils" }
|
||||
|
|
|
@ -1,25 +1,22 @@
|
|||
#![allow(clippy::cargo_common_metadata)]
|
||||
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
env::{
|
||||
self,
|
||||
consts::{ARCH, OS},
|
||||
},
|
||||
path::MAIN_SEPARATOR,
|
||||
process::Stdio,
|
||||
rc::Rc,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use mlua::prelude::*;
|
||||
|
||||
use lune_utils::TableBuilder;
|
||||
use mlua_luau_scheduler::{Functions, LuaSpawnExt};
|
||||
use options::ProcessSpawnOptionsStdio;
|
||||
|
||||
use async_process::Child;
|
||||
use futures_lite::prelude::*;
|
||||
use os_str_bytes::RawOsString;
|
||||
use stream::{ChildProcessReader, ChildProcessWriter};
|
||||
use tokio::{io::AsyncWriteExt, process::Child, sync::RwLock};
|
||||
|
||||
use lune_utils::{path::get_current_dir, TableBuilder};
|
||||
|
||||
mod options;
|
||||
mod stream;
|
||||
|
@ -29,8 +26,6 @@ mod wait_for_child;
|
|||
use self::options::ProcessSpawnOptions;
|
||||
use self::wait_for_child::wait_for_child;
|
||||
|
||||
use lune_utils::path::get_current_dir;
|
||||
|
||||
/**
|
||||
Creates the `process` standard library module.
|
||||
|
||||
|
@ -179,69 +174,10 @@ async fn process_exec(
|
|||
|
||||
#[allow(clippy::await_holding_refcell_ref)]
|
||||
fn process_create(
|
||||
lua: &Lua,
|
||||
(program, args, options): (String, Option<Vec<String>>, ProcessSpawnOptions),
|
||||
_lua: &Lua,
|
||||
(_program, _args, _options): (String, Option<Vec<String>>, ProcessSpawnOptions),
|
||||
) -> LuaResult<LuaTable> {
|
||||
// We do not want the user to provide stdio options for process.create,
|
||||
// so we reset the options, regardless of what the user provides us
|
||||
let mut spawn_options = options.clone();
|
||||
spawn_options.stdio = ProcessSpawnOptionsStdio::default();
|
||||
|
||||
let (code_tx, code_rx) = tokio::sync::broadcast::channel(4);
|
||||
let code_rx_rc = Rc::new(RefCell::new(code_rx));
|
||||
|
||||
let child = spawn_command(program, args, spawn_options)?;
|
||||
|
||||
let child_arc = Arc::new(RwLock::new(child));
|
||||
|
||||
let child_arc_clone = Arc::clone(&child_arc);
|
||||
let mut child_lock = tokio::task::block_in_place(|| child_arc_clone.blocking_write());
|
||||
|
||||
let stdin = child_lock.stdin.take().unwrap();
|
||||
let stdout = child_lock.stdout.take().unwrap();
|
||||
let stderr = child_lock.stderr.take().unwrap();
|
||||
|
||||
let child_arc_inner = Arc::clone(&child_arc);
|
||||
|
||||
// Spawn a background task to wait for the child to exit and send the exit code
|
||||
let status_handle = tokio::spawn(async move {
|
||||
let res = child_arc_inner.write().await.wait().await;
|
||||
|
||||
if let Ok(output) = res {
|
||||
let code = output.code().unwrap_or_default();
|
||||
|
||||
code_tx
|
||||
.send(code)
|
||||
.expect("ExitCode receiver was unexpectedly dropped");
|
||||
}
|
||||
});
|
||||
|
||||
TableBuilder::new(lua.clone())?
|
||||
.with_value("stdout", ChildProcessReader(stdout))?
|
||||
.with_value("stderr", ChildProcessReader(stderr))?
|
||||
.with_value("stdin", ChildProcessWriter(stdin))?
|
||||
.with_async_function("kill", move |_, ()| {
|
||||
// First, stop the status task so the RwLock is dropped
|
||||
status_handle.abort();
|
||||
let child_arc_clone = Arc::clone(&child_arc);
|
||||
|
||||
// Then get another RwLock to write to the child process and kill it
|
||||
async move { Ok(child_arc_clone.write().await.kill().await?) }
|
||||
})?
|
||||
.with_async_function("status", move |lua, ()| {
|
||||
let code_rx_rc_clone = Rc::clone(&code_rx_rc);
|
||||
async move {
|
||||
// Exit code of 9 corresponds to SIGKILL, which should be the only case where
|
||||
// the receiver gets suddenly dropped
|
||||
let code = code_rx_rc_clone.borrow_mut().recv().await.unwrap_or(9);
|
||||
|
||||
TableBuilder::new(lua)?
|
||||
.with_value("code", code)?
|
||||
.with_value("ok", code == 0)?
|
||||
.build_readonly()
|
||||
}
|
||||
})?
|
||||
.build_readonly()
|
||||
Err(LuaError::runtime("unimplemented"))
|
||||
}
|
||||
|
||||
async fn spawn_command_with_stdin(
|
||||
|
|
|
@ -4,9 +4,10 @@ use std::{
|
|||
path::PathBuf,
|
||||
};
|
||||
|
||||
use directories::UserDirs;
|
||||
use mlua::prelude::*;
|
||||
use tokio::process::Command;
|
||||
|
||||
use async_process::Command;
|
||||
use directories::UserDirs;
|
||||
|
||||
mod kind;
|
||||
mod stdio;
|
||||
|
|
|
@ -1,21 +1,23 @@
|
|||
use bstr::BString;
|
||||
use bytes::BytesMut;
|
||||
use futures_lite::prelude::*;
|
||||
use mlua::prelude::*;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
|
||||
const CHUNK_SIZE: usize = 8;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ChildProcessReader<R: AsyncRead>(pub R);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ChildProcessWriter<W: AsyncWrite>(pub W);
|
||||
|
||||
impl<R: AsyncRead + Unpin> ChildProcessReader<R> {
|
||||
pub async fn read(&mut self, chunk_size: Option<usize>) -> LuaResult<Vec<u8>> {
|
||||
let mut buf = BytesMut::with_capacity(chunk_size.unwrap_or(CHUNK_SIZE));
|
||||
self.0.read_buf(&mut buf).await?;
|
||||
let mut buf = vec![0u8; chunk_size.unwrap_or(CHUNK_SIZE)];
|
||||
|
||||
Ok(buf.to_vec())
|
||||
let read = self.0.read(&mut buf).await?;
|
||||
buf.truncate(read);
|
||||
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub async fn read_to_end(&mut self) -> LuaResult<Vec<u8>> {
|
||||
|
|
|
@ -4,8 +4,8 @@ use std::{
|
|||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures_lite::{io, prelude::*};
|
||||
use pin_project::pin_project;
|
||||
use tokio::io::{self, AsyncWrite};
|
||||
|
||||
#[pin_project]
|
||||
pub struct AsyncTeeWriter<'a, W>
|
||||
|
@ -45,8 +45,7 @@ where
|
|||
let mut this = self.project();
|
||||
match this.writer.as_mut().poll_write(cx, buf) {
|
||||
Poll::Ready(res) => {
|
||||
this.buffer
|
||||
.write_all(buf)
|
||||
Write::write_all(&mut this.buffer, buf)
|
||||
.expect("Failed to write to internal tee buffer");
|
||||
Poll::Ready(res)
|
||||
}
|
||||
|
@ -58,7 +57,7 @@ where
|
|||
self.project().writer.as_mut().poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.project().writer.as_mut().poll_shutdown(cx)
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.project().writer.as_mut().poll_close(cx)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,11 +1,10 @@
|
|||
use std::process::ExitStatus;
|
||||
use std::{io::stdout, process::ExitStatus};
|
||||
|
||||
use mlua::prelude::*;
|
||||
use tokio::{
|
||||
io::{self, AsyncRead, AsyncReadExt},
|
||||
process::Child,
|
||||
task,
|
||||
};
|
||||
|
||||
use async_process::Child;
|
||||
use blocking::Unblock;
|
||||
use futures_lite::{io, prelude::*};
|
||||
|
||||
use super::{options::ProcessSpawnOptionsStdioKind, tee_writer::AsyncTeeWriter};
|
||||
|
||||
|
@ -39,7 +38,7 @@ where
|
|||
let mut read_from =
|
||||
read_from.expect("read_from must be Some when stdio kind is Inherit");
|
||||
|
||||
let mut stdout = io::stdout();
|
||||
let mut stdout = Unblock::new(stdout());
|
||||
let mut tee = AsyncTeeWriter::new(&mut stdout);
|
||||
|
||||
io::copy(&mut read_from, &mut tee).await.into_lua_err()?;
|
||||
|
@ -57,13 +56,13 @@ pub(super) async fn wait_for_child(
|
|||
let stdout_opt = child.stdout.take();
|
||||
let stderr_opt = child.stderr.take();
|
||||
|
||||
let stdout_task = task::spawn(read_with_stdio_kind(stdout_opt, stdout_kind));
|
||||
let stderr_task = task::spawn(read_with_stdio_kind(stderr_opt, stderr_kind));
|
||||
let stdout_task = read_with_stdio_kind(stdout_opt, stdout_kind);
|
||||
let stderr_task = read_with_stdio_kind(stderr_opt, stderr_kind);
|
||||
|
||||
let status = child.wait().await.expect("Child process failed to start");
|
||||
let status = child.status().await.into_lua_err()?;
|
||||
|
||||
let stdout_buffer = stdout_task.await.into_lua_err()??;
|
||||
let stderr_buffer = stderr_task.await.into_lua_err()??;
|
||||
let stdout_buffer = stdout_task.await.into_lua_err()?;
|
||||
let stderr_buffer = stderr_task.await.into_lua_err()?;
|
||||
|
||||
Ok(WaitForChildResult {
|
||||
status,
|
||||
|
|
Loading…
Add table
Reference in a new issue