Implement async background tasks for scheduler

This commit is contained in:
Filip Tibell 2023-02-14 15:27:10 +01:00
parent d2ff0783a5
commit b5f6e6da98
No known key found for this signature in database
4 changed files with 254 additions and 129 deletions

View file

@ -149,7 +149,9 @@ async fn net_socket<'a>(lua: &'static Lua, url: String) -> LuaResult<LuaTable> {
let (ws, _) = tokio_tungstenite::connect_async(url) let (ws, _) = tokio_tungstenite::connect_async(url)
.await .await
.map_err(LuaError::external)?; .map_err(LuaError::external)?;
todo!() Err(LuaError::RuntimeError(
"Client websockets are not yet implemented".to_string(),
))
// let sock = NetWebSocketClient::from(ws); // let sock = NetWebSocketClient::from(ws);
// let table = sock.into_lua_table(lua)?; // let table = sock.into_lua_table(lua)?;
// Ok(table) // Ok(table)
@ -159,6 +161,11 @@ async fn net_serve<'a>(
lua: &'static Lua, lua: &'static Lua,
(port, config): (u16, ServeConfig<'a>), (port, config): (u16, ServeConfig<'a>),
) -> LuaResult<LuaTable<'a>> { ) -> LuaResult<LuaTable<'a>> {
if config.handle_web_socket.is_some() {
return Err(LuaError::RuntimeError(
"Server websockets are not yet implemented".to_string(),
));
}
// Note that we need to use a mpsc here and not // Note that we need to use a mpsc here and not
// a oneshot channel since we move the sender // a oneshot channel since we move the sender
// into our table with the stop function // into our table with the stop function
@ -168,6 +175,10 @@ async fn net_serve<'a>(
lua.create_registry_value(handler) lua.create_registry_value(handler)
.expect("Failed to store websocket handler") .expect("Failed to store websocket handler")
})); }));
// Register a background task to prevent
// the task scheduler from exiting early
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
let task = sched.register_background_task();
let server = Server::bind(&([127, 0, 0, 1], port).into()) let server = Server::bind(&([127, 0, 0, 1], port).into())
.http1_only(true) .http1_only(true)
.http1_keepalive(true) .http1_keepalive(true)
@ -178,12 +189,15 @@ async fn net_serve<'a>(
server_websocket_callback, server_websocket_callback,
)) ))
.with_graceful_shutdown(async move { .with_graceful_shutdown(async move {
shutdown_rx.recv().await.unwrap(); shutdown_rx
.recv()
.await
.expect("Server was stopped instantly");
shutdown_rx.close(); shutdown_rx.close();
task.unregister(Ok(()));
}); });
// TODO: Spawn a new scheduler future with this so we don't block // Spawn a new tokio task so we don't block
// and make sure that we register it properly to prevent shutdown task::spawn_local(server);
server.await.map_err(LuaError::external)?;
// Create a new read-only table that contains methods // Create a new read-only table that contains methods
// for manipulating server behavior and shutting it down // for manipulating server behavior and shutting it down
let handle_stop = move |_, _: ()| { let handle_stop = move |_, _: ()| {

View file

@ -1,6 +1,6 @@
use std::{collections::HashSet, process::ExitCode}; use std::{collections::HashSet, process::ExitCode};
use lua::task::{TaskScheduler, TaskSchedulerResult}; use lua::task::TaskScheduler;
use mlua::prelude::*; use mlua::prelude::*;
use tokio::task::LocalSet; use tokio::task::LocalSet;
@ -124,26 +124,21 @@ impl Lune {
// left to run, or until a task requests to exit the process // left to run, or until a task requests to exit the process
let exit_code = LocalSet::new() let exit_code = LocalSet::new()
.run_until(async move { .run_until(async move {
loop { let mut got_error = false;
let mut got_error = false; let mut result = sched.resume_queue().await;
let state = match sched.resume_queue().await { while !result.is_done() {
TaskSchedulerResult::TaskSuccessful { state } => state, if let Some(err) = result.get_lua_error() {
TaskSchedulerResult::TaskErrored { state, error } => { eprintln!("{}", pretty_format_luau_error(&err));
eprintln!("{}", pretty_format_luau_error(&error)); got_error = true;
got_error = true;
state
}
TaskSchedulerResult::Finished { state } => state,
};
if let Some(exit_code) = state.exit_code {
return exit_code;
} else if state.num_total == 0 {
if got_error {
return ExitCode::FAILURE;
} else {
return ExitCode::SUCCESS;
}
} }
result = sched.resume_queue().await;
}
if let Some(exit_code) = result.get_exit_code() {
exit_code
} else if got_error {
ExitCode::FAILURE
} else {
ExitCode::SUCCESS
} }
}) })
.await; .await;

View file

@ -14,7 +14,7 @@ use futures_util::{future::LocalBoxFuture, stream::FuturesUnordered, Future, Str
use mlua::prelude::*; use mlua::prelude::*;
use tokio::{ use tokio::{
sync::Mutex as AsyncMutex, sync::{mpsc, Mutex as AsyncMutex},
time::{sleep, Instant}, time::{sleep, Instant},
}; };
@ -50,7 +50,7 @@ impl fmt::Display for TaskKind {
} }
} }
/// A lightweight, clonable struct that represents a /// A lightweight, copyable struct that represents a
/// task in the scheduler and is accessible from Lua /// task in the scheduler and is accessible from Lua
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TaskReference { pub struct TaskReference {
@ -80,38 +80,121 @@ pub struct Task {
queued_at: Instant, queued_at: Instant,
} }
/// A struct representing the current status of the task scheduler /**
#[derive(Debug, Clone, Copy)] A handle to a registered background task.
pub struct TaskSchedulerState {
pub exit_code: Option<ExitCode>, [`TaskSchedulerUnregistrar::unregister`] must be
pub num_instant: usize, called upon completion of the background task to
pub num_deferred: usize, prevent the task scheduler from running indefinitely.
pub num_future: usize, */
pub num_total: usize, #[must_use = "Background tasks must be unregistered"]
#[derive(Debug)]
pub struct TaskSchedulerBackgroundTaskHandle {
sender: mpsc::UnboundedSender<TaskSchedulerRegistrationMessage>,
} }
impl fmt::Display for TaskSchedulerState { impl TaskSchedulerBackgroundTaskHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { pub fn unregister(self, result: LuaResult<()>) {
write!( self.sender
f, .send(TaskSchedulerRegistrationMessage::Terminated(result))
"TaskSchedulerStatus(\nInstant: {}\nDeferred: {}\nYielded: {}\nTotal: {})", .unwrap_or_else(|_| {
self.num_instant, self.num_deferred, self.num_future, self.num_total panic!(
) "\
\nFailed to unregister background task - this is an internal error! \
\nPlease report it at {} \
\nDetails: Manual \
",
env!("CARGO_PKG_REPOSITORY")
)
});
}
}
/// A struct representing the current state of the task scheduler
#[derive(Debug, Clone)]
pub struct TaskSchedulerResult {
lua_error: Option<LuaError>,
exit_code: Option<ExitCode>,
num_instant: usize,
num_deferred: usize,
num_futures: usize,
num_spawned: usize,
num_total: usize,
}
impl TaskSchedulerResult {
fn new(sched: &TaskScheduler) -> Self {
const MESSAGE: &str =
"Failed to get lock - make sure not to call during task scheduler resumption";
Self {
lua_error: None,
exit_code: if sched.exit_code_set.load(Ordering::Relaxed) {
Some(*sched.exit_code.try_lock().expect(MESSAGE))
} else {
None
},
num_instant: sched.task_queue_instant.try_lock().expect(MESSAGE).len(),
num_deferred: sched.task_queue_deferred.try_lock().expect(MESSAGE).len(),
num_futures: sched.futures.try_lock().expect(MESSAGE).len(),
num_spawned: sched.futures_counter.load(Ordering::Relaxed),
num_total: sched.tasks.try_lock().expect(MESSAGE).len(),
}
}
fn err(sched: &TaskScheduler, err: LuaError) -> Self {
let mut this = Self::new(sched);
this.lua_error = Some(err);
this
}
/**
Returns a clone of the error from
this task scheduler result, if any.
*/
pub fn get_lua_error(&self) -> Option<LuaError> {
self.lua_error.clone()
}
/**
Returns a clone of the exit code from
this task scheduler result, if any.
*/
pub fn get_exit_code(&self) -> Option<ExitCode> {
self.exit_code
}
/**
Returns `true` if the task scheduler is still busy,
meaning it still has lua threads left to run.
*/
#[allow(dead_code)]
pub fn is_busy(&self) -> bool {
self.num_total > 0
}
/**
Returns `true` if the task scheduler is done,
meaning it has no lua threads left to run, and
no spawned tasks are running in the background.
*/
pub fn is_done(&self) -> bool {
self.num_total == 0 && self.num_spawned == 0
}
/**
Returns `true` if the task scheduler has finished all
lua threads, but still has background tasks running.
*/
#[allow(dead_code)]
pub fn is_background(&self) -> bool {
self.num_total == 0 && self.num_spawned > 0
} }
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum TaskSchedulerResult { pub enum TaskSchedulerRegistrationMessage {
Finished { Spawned,
state: TaskSchedulerState, Terminated(LuaResult<()>),
},
TaskErrored {
error: LuaError,
state: TaskSchedulerState,
},
TaskSuccessful {
state: TaskSchedulerState,
},
} }
/// A task scheduler that implements task queues /// A task scheduler that implements task queues
@ -121,6 +204,9 @@ pub struct TaskScheduler<'fut> {
lua: &'static Lua, lua: &'static Lua,
tasks: Arc<Mutex<HashMap<TaskReference, Task>>>, tasks: Arc<Mutex<HashMap<TaskReference, Task>>>,
futures: Arc<AsyncMutex<FuturesUnordered<TaskFuture<'fut>>>>, futures: Arc<AsyncMutex<FuturesUnordered<TaskFuture<'fut>>>>,
futures_tx: mpsc::UnboundedSender<TaskSchedulerRegistrationMessage>,
futures_rx: Arc<AsyncMutex<mpsc::UnboundedReceiver<TaskSchedulerRegistrationMessage>>>,
futures_counter: AtomicUsize,
task_queue_instant: TaskSchedulerQueue, task_queue_instant: TaskSchedulerQueue,
task_queue_deferred: TaskSchedulerQueue, task_queue_deferred: TaskSchedulerQueue,
exit_code_set: AtomicBool, exit_code_set: AtomicBool,
@ -134,10 +220,14 @@ impl<'fut> TaskScheduler<'fut> {
Creates a new task scheduler. Creates a new task scheduler.
*/ */
pub fn new(lua: &'static Lua) -> LuaResult<Self> { pub fn new(lua: &'static Lua) -> LuaResult<Self> {
let (tx, rx) = mpsc::unbounded_channel();
Ok(Self { Ok(Self {
lua, lua,
tasks: Arc::new(Mutex::new(HashMap::new())), tasks: Arc::new(Mutex::new(HashMap::new())),
futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())), futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
futures_tx: tx,
futures_rx: Arc::new(AsyncMutex::new(rx)),
futures_counter: AtomicUsize::new(0),
task_queue_instant: Arc::new(Mutex::new(VecDeque::new())), task_queue_instant: Arc::new(Mutex::new(VecDeque::new())),
task_queue_deferred: Arc::new(Mutex::new(VecDeque::new())), task_queue_deferred: Arc::new(Mutex::new(VecDeque::new())),
exit_code_set: AtomicBool::new(false), exit_code_set: AtomicBool::new(false),
@ -162,27 +252,6 @@ impl<'fut> TaskScheduler<'fut> {
Box::leak(Box::new(self)) Box::leak(Box::new(self))
} }
/**
Gets the current state of the task scheduler.
Panics if called during any of the task scheduler resumption phases.
*/
pub fn state(&self) -> TaskSchedulerState {
const MESSAGE: &str =
"Failed to get lock - make sure not to call during task scheduler resumption";
TaskSchedulerState {
exit_code: if self.exit_code_set.load(Ordering::Relaxed) {
Some(*self.exit_code.try_lock().expect(MESSAGE))
} else {
None
},
num_instant: self.task_queue_instant.try_lock().expect(MESSAGE).len(),
num_deferred: self.task_queue_deferred.try_lock().expect(MESSAGE).len(),
num_future: self.futures.try_lock().expect(MESSAGE).len(),
num_total: self.tasks.try_lock().expect(MESSAGE).len(),
}
}
/** /**
Stores the exit code for the task scheduler. Stores the exit code for the task scheduler.
@ -593,6 +662,33 @@ impl<'fut> TaskScheduler<'fut> {
Ok(()) Ok(())
} }
/**
Registers a new background task with the task scheduler.
This will ensure that the task scheduler keeps running until a
call to [`TaskScheduler::deregister_background_task`] is made.
The returned [`TaskSchedulerUnregistrar::unregister`]
must be called upon completion of the background task to
prevent the task scheduler from running indefinitely.
*/
pub fn register_background_task(&self) -> TaskSchedulerBackgroundTaskHandle {
let sender = self.futures_tx.clone();
sender
.send(TaskSchedulerRegistrationMessage::Spawned)
.unwrap_or_else(|e| {
panic!(
"\
\nFailed to unregister background task - this is an internal error! \
\nPlease report it at {} \
\nDetails: {e} \
",
env!("CARGO_PKG_REPOSITORY")
)
});
TaskSchedulerBackgroundTaskHandle { sender }
}
/** /**
Retrieves the queue for a specific kind of task. Retrieves the queue for a specific kind of task.
@ -609,18 +705,6 @@ impl<'fut> TaskScheduler<'fut> {
} }
} }
/**
Checks if a future exists in the task queue.
Panics if called during resumption of the futures task queue.
*/
fn next_queue_future_exists(&self) -> bool {
let futs = self.futures.try_lock().expect(
"Failed to get lock on futures - make sure not to call during futures resumption",
);
!futs.is_empty()
}
/** /**
Resumes the next queued Lua task, if one exists, blocking Resumes the next queued Lua task, if one exists, blocking
the current thread until it either yields or finishes. the current thread until it either yields or finishes.
@ -634,26 +718,10 @@ impl<'fut> TaskScheduler<'fut> {
let mut queue_guard = self.get_queue(kind).lock().unwrap(); let mut queue_guard = self.get_queue(kind).lock().unwrap();
queue_guard.pop_front() queue_guard.pop_front()
} { } {
None => { None => TaskSchedulerResult::new(self),
let status = self.state();
if status.num_total > 0 {
TaskSchedulerResult::TaskSuccessful {
state: self.state(),
}
} else {
TaskSchedulerResult::Finished {
state: self.state(),
}
}
}
Some(task) => match self.resume_task(task, override_args) { Some(task) => match self.resume_task(task, override_args) {
Ok(()) => TaskSchedulerResult::TaskSuccessful { Ok(()) => TaskSchedulerResult::new(self),
state: self.state(), Err(task_err) => TaskSchedulerResult::err(self, task_err),
},
Err(task_err) => TaskSchedulerResult::TaskErrored {
error: task_err,
state: self.state(),
},
}, },
} }
} }
@ -680,15 +748,15 @@ impl<'fut> TaskScheduler<'fut> {
match result { match result {
(task, Err(fut_err)) => { (task, Err(fut_err)) => {
// Future errored, don't resume its associated task // Future errored, don't resume its associated task
// and make sure to cancel / remove it completely // and make sure to cancel / remove it completely, if removal
let error_prefer_cancel = match self.remove_task(task) { // also errors then we send that error back instead of the future's error
Err(cancel_err) => cancel_err, TaskSchedulerResult::err(
Ok(_) => fut_err, self,
}; match self.remove_task(task) {
TaskSchedulerResult::TaskErrored { Err(cancel_err) => cancel_err,
error: error_prefer_cancel, Ok(_) => fut_err,
state: self.state(), },
} )
} }
(task, Ok(args)) => { (task, Ok(args)) => {
// Promote this future task to an instant task // Promote this future task to an instant task
@ -702,6 +770,46 @@ impl<'fut> TaskScheduler<'fut> {
} }
} }
/**
Awaits the next background task registration
message, if any messages exist in the queue.
This is a no-op if there are no messages.
*/
async fn receive_next_message(&self) -> TaskSchedulerResult {
let message_opt = {
let mut rx = self.futures_rx.lock().await;
rx.recv().await
};
if let Some(message) = message_opt {
match message {
TaskSchedulerRegistrationMessage::Spawned => {
self.futures_counter.fetch_add(1, Ordering::Relaxed);
TaskSchedulerResult::new(self)
}
TaskSchedulerRegistrationMessage::Terminated(result) => {
let prev = self.futures_counter.fetch_sub(1, Ordering::Relaxed);
if prev == 0 {
panic!(
r#"
Terminated a background task without it running - this is an internal error!
Please report it at {}
"#,
env!("CARGO_PKG_REPOSITORY")
)
}
if let Err(e) = result {
TaskSchedulerResult::err(self, e)
} else {
TaskSchedulerResult::new(self)
}
}
}
} else {
TaskSchedulerResult::new(self)
}
}
/** /**
Resumes the task scheduler queue. Resumes the task scheduler queue.
@ -712,27 +820,36 @@ impl<'fut> TaskScheduler<'fut> {
futures concurrently, awaiting the first one to be ready for resumption. futures concurrently, awaiting the first one to be ready for resumption.
*/ */
pub async fn resume_queue(&self) -> TaskSchedulerResult { pub async fn resume_queue(&self) -> TaskSchedulerResult {
let status = self.state(); let current = TaskSchedulerResult::new(self);
/* /*
Resume tasks in the internal queue, in this order: Resume tasks in the internal queue, in this order:
1. Tasks from task.spawn, this includes the main thread * 🛑 = blocking - lua tasks, in order
2. Tasks from task.defer * = async - first come, first serve
3. Tasks from task.delay / task.wait / native futures, first ready first resumed
1. 🛑 Tasks from task.spawn and the main thread
2. 🛑 Tasks from task.defer
3. Tasks from task.delay / task.wait, spawned background tasks
*/ */
if status.num_instant > 0 { if current.num_instant > 0 {
self.resume_next_queue_task(TaskKind::Instant, None) self.resume_next_queue_task(TaskKind::Instant, None)
} else if status.num_deferred > 0 { } else if current.num_deferred > 0 {
self.resume_next_queue_task(TaskKind::Deferred, None) self.resume_next_queue_task(TaskKind::Deferred, None)
} else { } else if current.num_futures > 0 && current.num_spawned > 0 {
// 3. Threads from task.delay or task.wait, futures // Futures, spawned background tasks
if self.next_queue_future_exists() { tokio::select! {
self.resume_next_queue_future().await result = self.resume_next_queue_future() => result,
} else { result = self.receive_next_message() => result,
TaskSchedulerResult::Finished {
state: self.state(),
}
} }
} else if current.num_futures > 0 {
// Futures
self.resume_next_queue_future().await
} else if current.num_spawned > 0 {
// Only spawned background tasks, these may then
// spawn new lua tasks and "wake up" the scheduler
self.receive_next_message().await
} else {
TaskSchedulerResult::new(self)
} }
} }
} }

View file

@ -60,5 +60,4 @@ assert(
) )
-- Stop the server to end the test -- Stop the server to end the test
handle2.stop() handle2.stop()