Fix task.delay keeping Lune alive even if cancelled

This commit is contained in:
Filip Tibell 2023-02-12 19:07:15 +01:00
parent 5157e60379
commit bb182033b9
No known key found for this signature in database
7 changed files with 76 additions and 72 deletions

View file

@ -8,6 +8,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### Fixed
- Fixed `task.delay` keeping the script running even if it was cancelled using `task.cancel`
## `0.4.0` - February 11th, 2023 ## `0.4.0` - February 11th, 2023
### Added ### Added

View file

@ -2,7 +2,7 @@ use std::{
collections::HashMap, collections::HashMap,
future::Future, future::Future,
pin::Pin, pin::Pin,
sync::{Arc, Weak}, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
@ -13,14 +13,14 @@ use hyper::{Body, Request, Response, Server};
use hyper_tungstenite::{is_upgrade_request as is_ws_upgrade_request, upgrade as ws_upgrade}; use hyper_tungstenite::{is_upgrade_request as is_ws_upgrade_request, upgrade as ws_upgrade};
use reqwest::Method; use reqwest::Method;
use tokio::{ use tokio::{sync::mpsc, task};
sync::mpsc::{self, Sender},
task,
};
use crate::{ use crate::{
lua::net::{NetClient, NetClientBuilder, NetWebSocketClient, NetWebSocketServer, ServeConfig}, lua::net::{NetClient, NetClientBuilder, NetWebSocketClient, NetWebSocketServer, ServeConfig},
utils::{message::LuneMessage, net::get_request_user_agent_header, table::TableBuilder}, utils::{
message::LuneMessage, net::get_request_user_agent_header, table::TableBuilder,
task::send_message,
},
}; };
pub fn create(lua: &'static Lua) -> LuaResult<LuaTable> { pub fn create(lua: &'static Lua) -> LuaResult<LuaTable> {
@ -179,18 +179,17 @@ async fn net_serve<'a>(
}); });
// Make sure we register the thread properly by sending messages // Make sure we register the thread properly by sending messages
// when the server starts up and when it shuts down or errors // when the server starts up and when it shuts down or errors
let server_sender = lua send_message(lua, LuneMessage::Spawned).await?;
.app_data_ref::<Weak<Sender<LuneMessage>>>()
.unwrap()
.upgrade()
.unwrap();
let _ = server_sender.send(LuneMessage::Spawned).await;
task::spawn_local(async move { task::spawn_local(async move {
let res = server.await.map_err(LuaError::external); let res = server.await.map_err(LuaError::external);
let _ = match res { let _ = send_message(
Err(e) => server_sender.try_send(LuneMessage::LuaError(e)), lua,
Ok(_) => server_sender.try_send(LuneMessage::Finished), match res {
}; Err(e) => LuneMessage::LuaError(e),
Ok(_) => LuneMessage::Finished,
},
)
.await;
}); });
// Create a new read-only table that contains methods // Create a new read-only table that contains methods
// for manipulating server behavior and shutting it down // for manipulating server behavior and shutting it down
@ -255,11 +254,6 @@ impl Service<Request<Body>> for NetService {
// function & lune message sender to use later // function & lune message sender to use later
let bytes = to_bytes(body).await.map_err(LuaError::external)?; let bytes = to_bytes(body).await.map_err(LuaError::external)?;
let handler: LuaFunction = lua.registry_value(&key)?; let handler: LuaFunction = lua.registry_value(&key)?;
let sender = lua
.app_data_ref::<Weak<Sender<LuneMessage>>>()
.unwrap()
.upgrade()
.unwrap();
// Create a readonly table for the request query params // Create a readonly table for the request query params
let query_params = TableBuilder::new(lua)? let query_params = TableBuilder::new(lua)?
.with_values( .with_values(
@ -320,10 +314,7 @@ impl Service<Request<Body>> for NetService {
} }
// If the handler returns an error, generate a 5xx response // If the handler returns an error, generate a 5xx response
Err(err) => { Err(err) => {
sender send_message(lua, LuneMessage::LuaError(err.to_lua_err())).await?;
.send(LuneMessage::LuaError(err.to_lua_err()))
.await
.map_err(LuaError::external)?;
Ok(Response::builder() Ok(Response::builder()
.status(500) .status(500)
.body(Body::from("Internal Server Error")) .body(Body::from("Internal Server Error"))
@ -332,13 +323,10 @@ impl Service<Request<Body>> for NetService {
// If the handler returns a value that is of an invalid type, // If the handler returns a value that is of an invalid type,
// this should also be an error, so generate a 5xx response // this should also be an error, so generate a 5xx response
Ok(value) => { Ok(value) => {
sender send_message(lua, LuneMessage::LuaError(LuaError::RuntimeError(format!(
.send(LuneMessage::LuaError(LuaError::RuntimeError(format!(
"Expected net serve handler to return a value of type 'string' or 'table', got '{}'", "Expected net serve handler to return a value of type 'string' or 'table', got '{}'",
value.type_name() value.type_name()
)))) )))).await?;
.await
.map_err(LuaError::external)?;
Ok(Response::builder() Ok(Response::builder()
.status(500) .status(500)
.body(Body::from("Internal Server Error")) .body(Body::from("Internal Server Error"))

View file

@ -10,6 +10,9 @@ use crate::utils::{
const MINIMUM_WAIT_OR_DELAY_DURATION: f32 = 10.0 / 1_000.0; // 10ms const MINIMUM_WAIT_OR_DELAY_DURATION: f32 = 10.0 / 1_000.0; // 10ms
// TODO: We should probably keep track of all threads in a scheduler userdata
// that takes care of scheduling in a better way, and it should keep resuming
// threads until it encounters a delayed / waiting thread, then task:sleep
pub fn create(lua: &'static Lua) -> LuaResult<LuaTable> { pub fn create(lua: &'static Lua) -> LuaResult<LuaTable> {
// HACK: There is no way to call coroutine.close directly from the mlua // HACK: There is no way to call coroutine.close directly from the mlua
// crate, so we need to fetch the function and store it in the registry // crate, so we need to fetch the function and store it in the registry
@ -81,12 +84,22 @@ async fn task_delay<'a>(
let task_thread_key = lua.create_registry_value(task_thread)?; let task_thread_key = lua.create_registry_value(task_thread)?;
let task_args_key = lua.create_registry_value(args.into_vec())?; let task_args_key = lua.create_registry_value(args.into_vec())?;
let lua_thread_to_return = lua.registry_value(&task_thread_key)?; let lua_thread_to_return = lua.registry_value(&task_thread_key)?;
run_registered_task(lua, TaskRunMode::Deferred, async move { let start = Instant::now();
task_wait(lua, duration).await?; let dur = Duration::from_secs_f32(
duration
.map(|d| d.max(MINIMUM_WAIT_OR_DELAY_DURATION))
.unwrap_or(MINIMUM_WAIT_OR_DELAY_DURATION),
);
run_registered_task(lua, TaskRunMode::Instant, async move {
let thread: LuaThread = lua.registry_value(&task_thread_key)?; let thread: LuaThread = lua.registry_value(&task_thread_key)?;
// NOTE: We are somewhat busy-waiting here, but we have to do this to make sure
// that delayed+cancelled threads do not prevent the tokio runtime from finishing
while thread.status() == LuaThreadStatus::Resumable && start.elapsed() < dur {
time::sleep(Duration::from_millis(1)).await;
}
if thread.status() == LuaThreadStatus::Resumable {
let argsv: Vec<LuaValue> = lua.registry_value(&task_args_key)?; let argsv: Vec<LuaValue> = lua.registry_value(&task_args_key)?;
let args = LuaMultiValue::from_vec(argsv); let args = LuaMultiValue::from_vec(argsv);
if thread.status() == LuaThreadStatus::Resumable {
let _: LuaMultiValue = thread.into_async(args).await?; let _: LuaMultiValue = thread.into_async(args).await?;
} }
lua.remove_registry_value(task_thread_key)?; lua.remove_registry_value(task_thread_key)?;

View file

@ -2,6 +2,7 @@ use std::{collections::HashSet, process::ExitCode, sync::Arc};
use mlua::prelude::*; use mlua::prelude::*;
use tokio::{sync::mpsc, task}; use tokio::{sync::mpsc, task};
use utils::task::send_message;
pub(crate) mod globals; pub(crate) mod globals;
pub(crate) mod lua; pub(crate) mod lua;
@ -101,11 +102,7 @@ impl Lune {
// Spawn the main thread from our entrypoint script // Spawn the main thread from our entrypoint script
let script_name = script_name.to_string(); let script_name = script_name.to_string();
let script_chunk = script_contents.to_string(); let script_chunk = script_contents.to_string();
let script_sender = snd.clone(); send_message(lua, LuneMessage::Spawned).await?;
script_sender
.send(LuneMessage::Spawned)
.await
.map_err(LuaError::external)?;
task_set.spawn_local(async move { task_set.spawn_local(async move {
let result = lua let result = lua
.load(&script_chunk) .load(&script_chunk)
@ -113,10 +110,14 @@ impl Lune {
.unwrap() .unwrap()
.eval_async::<LuaValue>() .eval_async::<LuaValue>()
.await; .await;
send_message(
lua,
match result { match result {
Err(e) => script_sender.send(LuneMessage::LuaError(e)).await, Err(e) => LuneMessage::LuaError(e),
Ok(_) => script_sender.send(LuneMessage::Finished).await, Ok(_) => LuneMessage::Finished,
} },
)
.await
}); });
// Run the executor until there are no tasks left, // Run the executor until there are no tasks left,
// taking care to not exit right away for errors // taking care to not exit right away for errors

View file

@ -1,10 +1,12 @@
use std::{process::ExitStatus, sync::Weak, time::Duration}; use std::{process::ExitStatus, time::Duration};
use mlua::prelude::*; use mlua::prelude::*;
use tokio::{io, process::Child, sync::mpsc::Sender, task::spawn, time::sleep}; use tokio::{io, process::Child, task::spawn, time::sleep};
use crate::utils::{futures::AsyncTeeWriter, message::LuneMessage}; use crate::utils::{futures::AsyncTeeWriter, message::LuneMessage};
use super::task::send_message;
pub async fn pipe_and_inherit_child_process_stdio( pub async fn pipe_and_inherit_child_process_stdio(
mut child: Child, mut child: Child,
) -> LuaResult<(ExitStatus, Vec<u8>, Vec<u8>)> { ) -> LuaResult<(ExitStatus, Vec<u8>, Vec<u8>)> {
@ -42,17 +44,9 @@ pub async fn pipe_and_inherit_child_process_stdio(
} }
pub async fn exit_and_yield_forever(lua: &'static Lua, exit_code: Option<u8>) -> LuaResult<()> { pub async fn exit_and_yield_forever(lua: &'static Lua, exit_code: Option<u8>) -> LuaResult<()> {
let sender = lua
.app_data_ref::<Weak<Sender<LuneMessage>>>()
.unwrap()
.upgrade()
.unwrap();
// Send an exit signal to the main thread, which // Send an exit signal to the main thread, which
// will try to exit safely and as soon as possible // will try to exit safely and as soon as possible
sender send_message(lua, LuneMessage::Exit(exit_code.unwrap_or(0))).await?;
.send(LuneMessage::Exit(exit_code.unwrap_or(0)))
.await
.map_err(LuaError::external)?;
// Make sure to block the rest of this thread indefinitely since // Make sure to block the rest of this thread indefinitely since
// the main thread may not register the exit signal right away // the main thread may not register the exit signal right away
sleep(Duration::MAX).await; sleep(Duration::MAX).await;

View file

@ -25,24 +25,23 @@ impl fmt::Display for TaskRunMode {
} }
} }
pub async fn run_registered_task<T>( pub async fn send_message(lua: &'static Lua, message: LuneMessage) -> LuaResult<()> {
lua: &'static Lua,
mode: TaskRunMode,
to_run: impl Future<Output = LuaResult<T>> + 'static,
) -> LuaResult<()> {
// Fetch global reference to message sender
let sender = lua let sender = lua
.app_data_ref::<Weak<Sender<LuneMessage>>>() .app_data_ref::<Weak<Sender<LuneMessage>>>()
.unwrap() .unwrap()
.upgrade() .upgrade()
.unwrap(); .unwrap();
sender.send(message).await.map_err(LuaError::external)
}
pub async fn run_registered_task<T>(
lua: &'static Lua,
mode: TaskRunMode,
to_run: impl Future<Output = LuaResult<T>> + 'static,
) -> LuaResult<()> {
// Send a message that we have started our task // Send a message that we have started our task
sender send_message(lua, LuneMessage::Spawned).await?;
.send(LuneMessage::Spawned)
.await
.map_err(LuaError::external)?;
// Run the new task separately from the current one using the executor // Run the new task separately from the current one using the executor
let sender = sender.clone();
let task = task::spawn_local(async move { let task = task::spawn_local(async move {
// HACK: For deferred tasks we yield a bunch of times to try and ensure // HACK: For deferred tasks we yield a bunch of times to try and ensure
// we run our task at the very end of the async queue, this can fail if // we run our task at the very end of the async queue, this can fail if
@ -52,12 +51,14 @@ pub async fn run_registered_task<T>(
task::yield_now().await; task::yield_now().await;
} }
} }
sender send_message(
.send(match to_run.await { lua,
match to_run.await {
Ok(_) => LuneMessage::Finished, Ok(_) => LuneMessage::Finished,
Err(LuaError::CoroutineInactive) => LuneMessage::Finished, // Task was canceled Err(LuaError::CoroutineInactive) => LuneMessage::Finished, // Task was canceled
Err(e) => LuneMessage::LuaError(e), Err(e) => LuneMessage::LuaError(e),
}) },
)
.await .await
}); });
// Wait for the task to complete if we want this call to be blocking // Wait for the task to complete if we want this call to be blocking

View file

@ -9,12 +9,13 @@ task.wait(0.1)
assert(not flag, "Cancel should handle deferred threads") assert(not flag, "Cancel should handle deferred threads")
local flag2: boolean = false local flag2: boolean = false
local thread2 = task.delay(0, function() local thread2 = task.delay(0.1, function()
flag2 = true flag2 = true
end) end)
task.wait(0)
task.cancel(thread2) task.cancel(thread2)
task.wait(0.1) task.wait(0.2)
assert(not flag2, "Cancel should handle deferred threads") assert(not flag2, "Cancel should handle delayed threads")
-- Cancellation should work with yields in spawned threads -- Cancellation should work with yields in spawned threads