Give stdio its own task

This commit is contained in:
Filip Tibell 2024-01-17 11:29:54 +01:00
parent 084c8a1a21
commit 6143267ea5
No known key found for this signature in database

View file

@ -189,7 +189,8 @@ fn main_lua_task(mut rx: MessageReceiver, tx: MessageSender, stats: Stats) -> Lu
break; // All threads ran, and we don't have any async task that can spawn more break; // All threads ran, and we don't have any async task that can spawn more
} }
// Wait for at least one message, but try to receive as many as possible // Set up message processor - we mutably borrow both yielded_threads and runnable_threads
// so we can't really do this outside of the loop, but it compiles down to the same thing
let mut process_message = |message| match message { let mut process_message = |message| match message {
Message::Resume(thread_id, args) => { Message::Resume(thread_id, args) => {
if let Some(thread) = yielded_threads.remove(&thread_id) { if let Some(thread) = yielded_threads.remove(&thread_id) {
@ -203,6 +204,8 @@ fn main_lua_task(mut rx: MessageReceiver, tx: MessageSender, stats: Stats) -> Lu
} }
_ => unreachable!(), _ => unreachable!(),
}; };
// Wait for at least one message, but try to receive as many as possible
if let Some(message) = rx.blocking_recv() { if let Some(message) = rx.blocking_recv() {
process_message(message); process_message(message);
while let Ok(message) = rx.try_recv() { while let Ok(message) = rx.try_recv() {
@ -222,21 +225,19 @@ async fn main_async_task(
tx: MessageSender, tx: MessageSender,
stats: Stats, stats: Stats,
) -> LuaResult<()> { ) -> LuaResult<()> {
let mut handle_stdout = io::stdout(); // Give stdio its own task, we don't need it to block the scheduler
let mut handle_stderr = io::stderr(); let (tx_stdout, rx_stdout) = unbounded_channel();
let (tx_stderr, rx_stderr) = unbounded_channel();
// Wait for at least one message, but try to receive as many as possible let forward_stdout = |data| tx_stdout.send(data).ok();
let mut messages = Vec::new(); let forward_stderr = |data| tx_stderr.send(data).ok();
while let Some(message) = rx.recv().await { spawn(async move {
messages.push(message); if let Err(e) = async_stdio_task(rx_stdout, rx_stderr).await {
while let Ok(message) = rx.try_recv() { eprintln!("Stdio fatal error: {e}");
messages.push(message);
} }
});
// Handle all messages // Set up message processor
let mut wrote_stdout = false; let process_message = |message| {
let mut wrote_stderr = false;
for message in messages.drain(..) {
match message { match message {
Message::Sleep(_, _, _) => stats.incr(StatsCounter::ThreadSlept), Message::Sleep(_, _, _) => stats.incr(StatsCounter::ThreadSlept),
Message::Error(_, _) => stats.incr(StatsCounter::ThreadErrored), Message::Error(_, _) => stats.incr(StatsCounter::ThreadErrored),
@ -255,34 +256,55 @@ async fn main_async_task(
}); });
} }
Message::Error(_, e) => { Message::Error(_, e) => {
wrote_stderr = true; forward_stderr(b"Lua error: ".to_vec());
handle_stderr.write_all(b"Lua error: ").await?; forward_stderr(e.to_string().as_bytes().to_vec());
handle_stderr.write_all(e.to_string().as_bytes()).await?;
} }
Message::WriteStdout(data) => { Message::WriteStdout(data) => {
wrote_stdout = true; forward_stdout(data);
handle_stdout.write_all(&data).await?;
} }
Message::WriteStderr(data) => { Message::WriteStderr(data) => {
wrote_stderr = true; forward_stderr(data);
handle_stderr.write_all(&data).await?;
} }
_ => unreachable!(), _ => unreachable!(),
} }
} };
// Flush streams if we wrote anything to them // Wait for at least one message, but try to receive as many as possible
if wrote_stdout { while let Some(message) = rx.recv().await {
handle_stdout.flush().await?; process_message(message);
} while let Ok(message) = rx.try_recv() {
if wrote_stderr { process_message(message);
handle_stderr.flush().await?; }
}
Ok(())
}
async fn async_stdio_task(
mut rx_stdout: UnboundedReceiver<Vec<u8>>,
mut rx_stderr: UnboundedReceiver<Vec<u8>>,
) -> LuaResult<()> {
let mut stdout = io::stdout();
let mut stderr = io::stderr();
loop {
select! {
data = rx_stdout.recv() => match data {
None => break, // Main task exited
Some(data) => {
stdout.write_all(&data).await?;
stdout.flush().await?;
}
},
data = rx_stderr.recv() => match data {
None => break, // Main task exited
Some(data) => {
stderr.write_all(&data).await?;
stderr.flush().await?;
}
}
} }
} }
// Flush stdio one extra final time, just in case
handle_stdout.flush().await?;
handle_stderr.flush().await?;
Ok(()) Ok(())
} }