Move scheduler fns into traits, improve error handling for scheduler not existing

This commit is contained in:
Filip Tibell 2023-02-15 13:22:13 +01:00
parent fa14e4a02b
commit b1e08bf813
No known key found for this signature in database
15 changed files with 522 additions and 363 deletions

View file

@ -10,7 +10,7 @@ use crate::{
lua::{ lua::{
// net::{NetWebSocketClient, NetWebSocketServer}, // net::{NetWebSocketClient, NetWebSocketServer},
net::{NetClient, NetClientBuilder, NetLocalExec, NetService, RequestConfig, ServeConfig}, net::{NetClient, NetClientBuilder, NetLocalExec, NetService, RequestConfig, ServeConfig},
task::TaskScheduler, task::{TaskScheduler, TaskSchedulerAsyncExt},
}, },
utils::{net::get_request_user_agent_header, table::TableBuilder}, utils::{net::get_request_user_agent_header, table::TableBuilder},
}; };
@ -108,7 +108,9 @@ async fn net_serve<'a>(
lua.create_registry_value(handler) lua.create_registry_value(handler)
.expect("Failed to store websocket handler") .expect("Failed to store websocket handler")
}); });
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption");
// Bind first to make sure that we can bind to this address // Bind first to make sure that we can bind to this address
let bound = match Server::try_bind(&([127, 0, 0, 1], port).into()) { let bound = match Server::try_bind(&([127, 0, 0, 1], port).into()) {
Err(e) => { Err(e) => {

View file

@ -50,7 +50,9 @@ pub fn create(lua: &'static Lua, args_vec: Vec<String>) -> LuaResult<LuaTable> {
let process_exit_env_yield: LuaFunction = lua.named_registry_value("co.yield")?; let process_exit_env_yield: LuaFunction = lua.named_registry_value("co.yield")?;
let process_exit_env_exit: LuaFunction = lua.create_function(|lua, code: Option<u8>| { let process_exit_env_exit: LuaFunction = lua.create_function(|lua, code: Option<u8>| {
let exit_code = code.map_or(ExitCode::SUCCESS, ExitCode::from); let exit_code = code.map_or(ExitCode::SUCCESS, ExitCode::from);
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption");
sched.set_exit_code(exit_code); sched.set_exit_code(exit_code);
Ok(()) Ok(())
})?; })?;

View file

@ -1,10 +1,12 @@
use mlua::prelude::*; use mlua::prelude::*;
use crate::{ use crate::{
lua::task::{TaskKind, TaskReference, TaskScheduler}, lua::task::{TaskKind, TaskReference, TaskScheduler, TaskSchedulerScheduleExt},
utils::table::TableBuilder, utils::table::TableBuilder,
}; };
const ERR_MISSING_SCHEDULER: &str = "Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption";
const TASK_WAIT_IMPL_LUA: &str = r#" const TASK_WAIT_IMPL_LUA: &str = r#"
local seconds = ... local seconds = ...
local current = thread() local current = thread()
@ -15,7 +17,7 @@ return yield()
const TASK_SPAWN_IMPL_LUA: &str = r#" const TASK_SPAWN_IMPL_LUA: &str = r#"
-- Schedule the current thread at the front -- Schedule the current thread at the front
scheduleNext(thread()) scheduleNext(thread())
-- Schedule the thread to spawn at the front, -- Schedule the wanted task arg at the front,
-- the previous schedule now comes right after -- the previous schedule now comes right after
local task = scheduleNext(...) local task = scheduleNext(...)
-- Give control over to the scheduler, which will -- Give control over to the scheduler, which will
@ -25,20 +27,26 @@ return task
"#; "#;
pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> { pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
// Create a user-accessible function cancel tasks // Create a user-accessible function that cancels a task
let task_cancel = lua.create_function(|lua, task: TaskReference| { let task_cancel = lua.create_function(|lua, task: TaskReference| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.remove_task(task)?; sched.remove_task(task)?;
Ok(()) Ok(())
})?; })?;
// Create functions that manipulate non-blocking tasks in the scheduler // Create functions that manipulate non-blocking tasks in the scheduler
let task_defer = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| { let task_defer = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); let sched = lua
sched.schedule_deferred(tof, args) .app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.schedule_blocking_deferred(tof, args)
})?; })?;
let task_delay = let task_delay =
lua.create_function(|lua, (secs, tof, args): (f64, LuaValue, LuaMultiValue)| { lua.create_function(|lua, (secs, tof, args): (f64, LuaValue, LuaMultiValue)| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.schedule_delayed(secs, tof, args) sched.schedule_delayed(secs, tof, args)
})?; })?;
// Create our task wait function, this is a bit different since // Create our task wait function, this is a bit different since
@ -55,7 +63,9 @@ pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
.with_function( .with_function(
"resumeAfter", "resumeAfter",
|lua, (secs, thread): (Option<f64>, LuaThread)| { |lua, (secs, thread): (Option<f64>, LuaThread)| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.schedule_wait(secs.unwrap_or(0f64), LuaValue::Thread(thread)) sched.schedule_wait(secs.unwrap_or(0f64), LuaValue::Thread(thread))
}, },
)? )?
@ -76,8 +86,10 @@ pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
.with_function( .with_function(
"scheduleNext", "scheduleNext",
|lua, (tof, args): (LuaValue, LuaMultiValue)| { |lua, (tof, args): (LuaValue, LuaMultiValue)| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); let sched = lua
sched.schedule_next(tof, args) .app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.schedule_blocking(tof, args)
}, },
)? )?
.build_readonly()?, .build_readonly()?,
@ -115,29 +127,37 @@ pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
coroutine.set( coroutine.set(
"resume", "resume",
lua.create_function(|lua, value: LuaValue| { lua.create_function(|lua, value: LuaValue| {
let tname = value.type_name();
if let LuaValue::Thread(thread) = value { if let LuaValue::Thread(thread) = value {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
let task = let task =
sched.create_task(TaskKind::Instant, LuaValue::Thread(thread), None, None)?; sched.create_task(TaskKind::Instant, LuaValue::Thread(thread), None, None)?;
sched.resume_task(task, None) sched.resume_task(task, None)
} else if let Ok(task) = TaskReference::from_lua(value, lua) { } else if let Ok(task) = TaskReference::from_lua(value, lua) {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); lua.app_data_ref::<&TaskScheduler>()
sched.resume_task(task, None) .expect(ERR_MISSING_SCHEDULER)
.resume_task(task, None)
} else { } else {
Err(LuaError::RuntimeError( Err(LuaError::RuntimeError(format!(
"Argument #1 must be a thread".to_string(), "Argument #1 must be a thread, got {tname}",
)) )))
} }
})?, })?,
)?; )?;
coroutine.set( coroutine.set(
"wrap", "wrap",
lua.create_function(|lua, func: LuaFunction| { lua.create_function(|lua, func: LuaFunction| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
let task = let task =
sched.create_task(TaskKind::Instant, LuaValue::Function(func), None, None)?; sched.create_task(TaskKind::Instant, LuaValue::Function(func), None, None)?;
lua.create_function(move |lua, args: LuaMultiValue| { lua.create_function(move |lua, args: LuaMultiValue| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect(ERR_MISSING_SCHEDULER);
sched.resume_task(task, Some(Ok(args))) sched.resume_task(task, Some(Ok(args)))
}) })
})?, })?,

View file

@ -1,6 +1,6 @@
use std::{collections::HashSet, process::ExitCode}; use std::{collections::HashSet, process::ExitCode};
use lua::task::TaskScheduler; use lua::task::{TaskScheduler, TaskSchedulerResumeExt, TaskSchedulerScheduleExt};
use mlua::prelude::*; use mlua::prelude::*;
use tokio::task::LocalSet; use tokio::task::LocalSet;
@ -103,7 +103,7 @@ impl Lune {
// Create our task scheduler and schedule the main thread on it // Create our task scheduler and schedule the main thread on it
let sched = TaskScheduler::new(lua)?.into_static(); let sched = TaskScheduler::new(lua)?.into_static();
lua.set_app_data(sched); lua.set_app_data(sched);
sched.schedule_next( sched.schedule_blocking(
LuaValue::Function( LuaValue::Function(
lua.load(script_contents) lua.load(script_contents)
.set_name(script_name) .set_name(script_name)

View file

@ -51,7 +51,9 @@ impl Service<Request<Body>> for NetServiceInner {
let _ws = ws.await.map_err(LuaError::external)?; let _ws = ws.await.map_err(LuaError::external)?;
// let sock = NetWebSocketServer::from(ws); // let sock = NetWebSocketServer::from(ws);
// let table = sock.into_lua_table(lua)?; // let table = sock.into_lua_table(lua)?;
// let sched = lua.app_data_ref::<&TaskScheduler>().unwrap(); // let sched = lua
// .app_data_ref::<&TaskScheduler>()
// .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption");
// sched.schedule_current_resume( // sched.schedule_current_resume(
// LuaValue::Function(handler), // LuaValue::Function(handler),
// LuaMultiValue::from_vec(vec![LuaValue::Table(table)]), // LuaMultiValue::from_vec(vec![LuaValue::Table(table)]),

View file

@ -0,0 +1,147 @@
use async_trait::async_trait;
use futures_util::Future;
use mlua::prelude::*;
use crate::utils::table::TableBuilder;
use super::super::{
async_handle::TaskSchedulerAsyncHandle, message::TaskSchedulerMessage,
scheduler::TaskReference, scheduler::TaskScheduler,
};
const TASK_ASYNC_IMPL_LUA: &str = r#"
resumeAsync(thread(), ...)
return yield()
"#;
/*
Trait definition - same as the implementation, ignore this
We use traits here to prevent misuse of certain scheduler
APIs, making importing of them as intentional as possible
*/
#[async_trait(?Send)]
pub trait TaskSchedulerAsyncExt<'fut> {
fn register_background_task(&self) -> TaskSchedulerAsyncHandle;
fn schedule_async<'sched, R, F, FR>(
&'sched self,
thread_or_function: LuaValue<'_>,
func: F,
) -> LuaResult<TaskReference>
where
'sched: 'fut,
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua) -> FR,
FR: 'static + Future<Output = LuaResult<R>>;
fn make_scheduled_async_fn<A, R, F, FR>(&self, func: F) -> LuaResult<LuaFunction>
where
A: FromLuaMulti<'static>,
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>;
}
/*
Trait implementation
*/
#[async_trait(?Send)]
impl<'fut> TaskSchedulerAsyncExt<'fut> for TaskScheduler<'fut> {
/**
Registers a new background task with the task scheduler.
The returned [`TaskSchedulerAsyncHandle`] must have its
[`TaskSchedulerAsyncHandle::unregister`] method called
upon completion of the background task to prevent
the task scheduler from running indefinitely.
*/
fn register_background_task(&self) -> TaskSchedulerAsyncHandle {
let sender = self.futures_tx.clone();
sender
.send(TaskSchedulerMessage::Spawned)
.unwrap_or_else(|e| {
panic!(
"\
\nFailed to unregister background task - this is an internal error! \
\nPlease report it at {} \
\nDetails: {e} \
",
env!("CARGO_PKG_REPOSITORY")
)
});
TaskSchedulerAsyncHandle::new(sender)
}
/**
Schedules a lua thread or function
to be resumed after running a future.
The given lua thread or function will be resumed
using the optional arguments returned by the future.
*/
fn schedule_async<'sched, R, F, FR>(
&'sched self,
thread_or_function: LuaValue<'_>,
func: F,
) -> LuaResult<TaskReference>
where
'sched: 'fut, // Scheduler must live at least as long as the future
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua) -> FR,
FR: 'static + Future<Output = LuaResult<R>>,
{
self.queue_async_task(thread_or_function, None, None, async move {
match func(self.lua).await {
Ok(res) => match res.to_lua_multi(self.lua) {
Ok(multi) => Ok(Some(multi)),
Err(e) => Err(e),
},
Err(e) => Err(e),
}
})
}
/**
Creates a function callable from Lua that runs an async
closure and returns the results of it to the call site.
*/
fn make_scheduled_async_fn<A, R, F, FR>(&self, func: F) -> LuaResult<LuaFunction>
where
A: FromLuaMulti<'static>,
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>,
{
let async_env_thread: LuaFunction = self.lua.named_registry_value("co.thread")?;
let async_env_yield: LuaFunction = self.lua.named_registry_value("co.yield")?;
self.lua
.load(TASK_ASYNC_IMPL_LUA)
.set_environment(
TableBuilder::new(self.lua)?
.with_value("thread", async_env_thread)?
.with_value("yield", async_env_yield)?
.with_function(
"resumeAsync",
move |lua: &Lua, (thread, args): (LuaThread, A)| {
let fut = func(lua, args);
let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption");
sched.queue_async_task(LuaValue::Thread(thread), None, None, async {
let rets = fut.await?;
let mult = rets.to_lua_multi(lua)?;
Ok(Some(mult))
})
},
)?
.build_readonly()?,
)?
.into_function()
}
}

View file

@ -0,0 +1,7 @@
mod async_ext;
mod resume_ext;
mod schedule_ext;
pub use async_ext::TaskSchedulerAsyncExt;
pub use resume_ext::TaskSchedulerResumeExt;
pub use schedule_ext::TaskSchedulerScheduleExt;

View file

@ -0,0 +1,163 @@
use async_trait::async_trait;
use mlua::prelude::*;
use futures_util::StreamExt;
use super::super::{message::TaskSchedulerMessage, result::TaskSchedulerState, TaskScheduler};
/*
Trait definition - same as the implementation, ignore this
We use traits here to prevent misuse of certain scheduler
APIs, making importing of them as intentional as possible
*/
#[async_trait(?Send)]
pub trait TaskSchedulerResumeExt {
async fn resume_queue(&self) -> TaskSchedulerState;
}
/*
Trait implementation
*/
#[async_trait(?Send)]
impl TaskSchedulerResumeExt for TaskScheduler<'_> {
/**
Awaits the next background task registration
message, if any messages exist in the queue.
This is a no-op if there are no background tasks left running
and / or the background task messages channel was closed.
*/
async fn resume_queue(&self) -> TaskSchedulerState {
let current = TaskSchedulerState::new(self);
if current.has_blocking_tasks() {
// 1. Blocking tasks
resume_next_blocking_task(self, None)
} else if current.has_future_tasks() && current.has_background_tasks() {
// 2. Async + background tasks
tokio::select! {
result = resume_next_async_task(self) => result,
result = receive_next_message(self) => result,
}
} else if current.has_future_tasks() {
// 3. Async tasks
resume_next_async_task(self).await
} else if current.has_background_tasks() {
// 4. Background tasks
receive_next_message(self).await
} else {
TaskSchedulerState::new(self)
}
}
}
/*
Private functions for the trait that operate on the task scheduler
These could be implemented as normal methods but if we put them in the
trait they become public, and putting them in the task scheduler's
own implementation block will clutter that up unnecessarily
*/
/**
Resumes the next queued Lua task, if one exists, blocking
the current thread until it either yields or finishes.
*/
fn resume_next_blocking_task(
scheduler: &TaskScheduler<'_>,
override_args: Option<LuaResult<LuaMultiValue>>,
) -> TaskSchedulerState {
match {
let mut queue_guard = scheduler.tasks_queue_blocking.borrow_mut();
let task = queue_guard.pop_front();
drop(queue_guard);
task
} {
None => TaskSchedulerState::new(scheduler),
Some(task) => match scheduler.resume_task(task, override_args) {
Ok(_) => TaskSchedulerState::new(scheduler),
Err(task_err) => TaskSchedulerState::err(scheduler, task_err),
},
}
}
/**
Awaits the first available queued future, and resumes its associated
Lua task which will be ready for resumption when that future wakes.
Panics if there are no futures currently queued.
Use [`TaskScheduler::next_queue_future_exists`]
to check if there are any queued futures.
*/
async fn resume_next_async_task(scheduler: &TaskScheduler<'_>) -> TaskSchedulerState {
let (task, result) = {
let mut futs = scheduler
.futures
.try_lock()
.expect("Tried to resume next queued future while already resuming or modifying");
futs.next()
.await
.expect("Tried to resume next queued future but none are queued")
};
// Promote this future task to a blocking task and resume it
// right away, also taking care to not borrow mutably twice
// by dropping this guard before trying to resume it
let mut queue_guard = scheduler.tasks_queue_blocking.borrow_mut();
queue_guard.push_front(task);
drop(queue_guard);
resume_next_blocking_task(scheduler, result.transpose())
}
/**
Resumes the task scheduler queue.
This will run any spawned or deferred Lua tasks in a blocking manner.
Once all spawned and / or deferred Lua tasks have finished running,
this will process delayed tasks, waiting tasks, and native Rust
futures concurrently, awaiting the first one to be ready for resumption.
*/
async fn receive_next_message(scheduler: &TaskScheduler<'_>) -> TaskSchedulerState {
let message_opt = {
let mut rx = scheduler.futures_rx.lock().await;
rx.recv().await
};
if let Some(message) = message_opt {
match message {
TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerState::new(scheduler),
TaskSchedulerMessage::Spawned => {
let prev = scheduler.futures_registered_count.get();
scheduler.futures_registered_count.set(prev + 1);
TaskSchedulerState::new(scheduler)
}
TaskSchedulerMessage::Terminated(result) => {
let prev = scheduler.futures_registered_count.get();
scheduler.futures_registered_count.set(prev - 1);
if prev == 0 {
panic!(
r#"
Terminated a background task without it running - this is an internal error!
Please report it at {}
"#,
env!("CARGO_PKG_REPOSITORY")
)
}
if let Err(e) = result {
TaskSchedulerState::err(scheduler, e)
} else {
TaskSchedulerState::new(scheduler)
}
}
}
} else {
TaskSchedulerState::new(scheduler)
}
}

View file

@ -0,0 +1,133 @@
use std::time::Duration;
use mlua::prelude::*;
use tokio::time::sleep;
use super::super::{scheduler::TaskKind, scheduler::TaskReference, scheduler::TaskScheduler};
/*
Trait definition - same as the implementation, ignore this
We use traits here to prevent misuse of certain scheduler
APIs, making importing of them as intentional as possible
*/
pub trait TaskSchedulerScheduleExt {
fn schedule_blocking(
&self,
thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference>;
fn schedule_blocking_deferred(
&self,
thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference>;
fn schedule_delayed(
&self,
after_secs: f64,
thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference>;
fn schedule_wait(
&self,
after_secs: f64,
thread_or_function: LuaValue<'_>,
) -> LuaResult<TaskReference>;
}
/*
Trait implementation
*/
impl TaskSchedulerScheduleExt for TaskScheduler<'_> {
/**
Schedules a lua thread or function to resume ***first*** during this
resumption point, ***skipping ahead*** of any other currently queued tasks.
The given lua thread or function will be resumed
using the given `thread_args` as its argument(s).
*/
fn schedule_blocking(
&self,
thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference> {
self.queue_blocking_task(
TaskKind::Instant,
thread_or_function,
Some(thread_args),
None,
)
}
/**
Schedules a lua thread or function to resume ***after all***
currently resuming tasks, during this resumption point.
The given lua thread or function will be resumed
using the given `thread_args` as its argument(s).
*/
fn schedule_blocking_deferred(
&self,
thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference> {
self.queue_blocking_task(
TaskKind::Deferred,
thread_or_function,
Some(thread_args),
None,
)
}
/**
Schedules a lua thread or function to
be resumed after waiting asynchronously.
The given lua thread or function will be resumed
using the given `thread_args` as its argument(s).
*/
fn schedule_delayed(
&self,
after_secs: f64,
thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference> {
self.queue_async_task(thread_or_function, Some(thread_args), None, async move {
sleep(Duration::from_secs_f64(after_secs)).await;
Ok(None)
})
}
/**
Schedules a lua thread or function to
be resumed after waiting asynchronously.
The given lua thread or function will be resumed
using the elapsed time as its one and only argument.
*/
fn schedule_wait(
&self,
after_secs: f64,
thread_or_function: LuaValue<'_>,
) -> LuaResult<TaskReference> {
self.queue_async_task(
thread_or_function,
None,
// Wait should recycle the guid of the current task,
// which ensures that the TaskReference is identical and
// that any waits inside of spawned tasks will also cancel
self.guid_running.get(),
async move {
sleep(Duration::from_secs_f64(after_secs)).await;
Ok(None)
},
)
}
}

View file

@ -1,5 +1,7 @@
use mlua::prelude::*; use mlua::prelude::*;
/// Internal message enum for the task scheduler, used to notify
/// futures to wake up and schedule their respective blocking tasks
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum TaskSchedulerMessage { pub enum TaskSchedulerMessage {
NewBlockingTaskReady, NewBlockingTaskReady,

View file

@ -1,10 +1,10 @@
mod async_handle; mod async_handle;
mod ext;
mod message; mod message;
mod result; mod result;
mod scheduler; mod scheduler;
mod task_kind; mod task_kind;
mod task_reference; mod task_reference;
pub use scheduler::TaskScheduler; pub use ext::*;
pub use task_kind::TaskKind; pub use scheduler::*;
pub use task_reference::TaskReference;

View file

@ -2,9 +2,9 @@ use std::{fmt, process::ExitCode};
use mlua::prelude::*; use mlua::prelude::*;
use super::TaskScheduler; use super::scheduler::TaskScheduler;
/// A struct representing the current state of the task scheduler /// Struct representing the current state of the task scheduler
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct TaskSchedulerState { pub struct TaskSchedulerState {
lua_error: Option<LuaError>, lua_error: Option<LuaError>,

View file

@ -3,32 +3,22 @@ use std::{
cell::{Cell, RefCell}, cell::{Cell, RefCell},
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
process::ExitCode, process::ExitCode,
time::Duration,
}; };
use futures_util::{future::LocalBoxFuture, stream::FuturesUnordered, Future, StreamExt}; use futures_util::{future::LocalBoxFuture, stream::FuturesUnordered, Future};
use mlua::prelude::*; use mlua::prelude::*;
use tokio::{ use tokio::{
sync::{mpsc, Mutex as AsyncMutex}, sync::{mpsc, Mutex as AsyncMutex},
time::{sleep, Instant}, time::Instant,
}; };
use crate::utils::table::TableBuilder; use super::message::TaskSchedulerMessage;
pub use super::{task_kind::TaskKind, task_reference::TaskReference};
use super::{
async_handle::TaskSchedulerAsyncHandle, message::TaskSchedulerMessage,
result::TaskSchedulerState, task_kind::TaskKind, task_reference::TaskReference,
};
type TaskFutureRets<'fut> = LuaResult<Option<LuaMultiValue<'fut>>>; type TaskFutureRets<'fut> = LuaResult<Option<LuaMultiValue<'fut>>>;
type TaskFuture<'fut> = LocalBoxFuture<'fut, (TaskReference, TaskFutureRets<'fut>)>; type TaskFuture<'fut> = LocalBoxFuture<'fut, (TaskReference, TaskFutureRets<'fut>)>;
const TASK_ASYNC_IMPL_LUA: &str = r#"
resumeAsync(thread(), ...)
return yield()
"#;
/// A struct representing a task contained in the task scheduler /// A struct representing a task contained in the task scheduler
#[derive(Debug)] #[derive(Debug)]
pub struct Task { pub struct Task {
@ -52,9 +42,9 @@ pub struct TaskScheduler<'fut> {
which must use async-aware mutexes to be cancellation safe across await points. which must use async-aware mutexes to be cancellation safe across await points.
*/ */
// Internal state & flags // Internal state & flags
lua: &'static Lua, pub(super) lua: &'static Lua,
guid: Cell<usize>, pub(super) guid: Cell<usize>,
guid_running: Cell<Option<usize>>, pub(super) guid_running: Cell<Option<usize>>,
pub(super) exit_code: Cell<Option<ExitCode>>, pub(super) exit_code: Cell<Option<ExitCode>>,
// Blocking tasks // Blocking tasks
pub(super) tasks: RefCell<HashMap<TaskReference, Task>>, pub(super) tasks: RefCell<HashMap<TaskReference, Task>>,
@ -62,8 +52,8 @@ pub struct TaskScheduler<'fut> {
// Future tasks & objects for waking // Future tasks & objects for waking
pub(super) futures: AsyncMutex<FuturesUnordered<TaskFuture<'fut>>>, pub(super) futures: AsyncMutex<FuturesUnordered<TaskFuture<'fut>>>,
pub(super) futures_registered_count: Cell<usize>, pub(super) futures_registered_count: Cell<usize>,
futures_tx: mpsc::UnboundedSender<TaskSchedulerMessage>, pub(super) futures_tx: mpsc::UnboundedSender<TaskSchedulerMessage>,
futures_rx: AsyncMutex<mpsc::UnboundedReceiver<TaskSchedulerMessage>>, pub(super) futures_rx: AsyncMutex<mpsc::UnboundedReceiver<TaskSchedulerMessage>>,
} }
impl<'fut> TaskScheduler<'fut> { impl<'fut> TaskScheduler<'fut> {
@ -357,316 +347,4 @@ impl<'fut> TaskScheduler<'fut> {
})); }));
Ok(task_ref) Ok(task_ref)
} }
/**
Schedules a lua thread or function to resume ***first*** during this
resumption point, ***skipping ahead*** of any other currently queued tasks.
The given lua thread or function will be resumed
using the given `thread_args` as its argument(s).
*/
pub fn schedule_next(
&self,
thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference> {
self.queue_blocking_task(
TaskKind::Instant,
thread_or_function,
Some(thread_args),
None,
)
}
/**
Schedules a lua thread or function to resume ***after all***
currently resuming tasks, during this resumption point.
The given lua thread or function will be resumed
using the given `thread_args` as its argument(s).
*/
pub fn schedule_deferred(
&self,
thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference> {
self.queue_blocking_task(
TaskKind::Deferred,
thread_or_function,
Some(thread_args),
None,
)
}
/**
Schedules a lua thread or function to
be resumed after waiting asynchronously.
The given lua thread or function will be resumed
using the given `thread_args` as its argument(s).
*/
pub fn schedule_delayed(
&self,
after_secs: f64,
thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference> {
self.queue_async_task(thread_or_function, Some(thread_args), None, async move {
sleep(Duration::from_secs_f64(after_secs)).await;
Ok(None)
})
}
/**
Schedules a lua thread or function to
be resumed after waiting asynchronously.
The given lua thread or function will be resumed
using the elapsed time as its one and only argument.
*/
pub fn schedule_wait(
&self,
after_secs: f64,
thread_or_function: LuaValue<'_>,
) -> LuaResult<TaskReference> {
self.queue_async_task(
thread_or_function,
None,
// Wait should recycle the guid of the current task,
// which ensures that the TaskReference is identical and
// that any waits inside of spawned tasks will also cancel
self.guid_running.get(),
async move {
sleep(Duration::from_secs_f64(after_secs)).await;
Ok(None)
},
)
}
/**
Schedules a lua thread or function
to be resumed after running a future.
The given lua thread or function will be resumed
using the optional arguments returned by the future.
*/
#[allow(dead_code)]
pub fn schedule_async<'sched, R, F, FR>(
&'sched self,
thread_or_function: LuaValue<'_>,
func: F,
) -> LuaResult<TaskReference>
where
'sched: 'fut, // Scheduler must live at least as long as the future
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua) -> FR,
FR: 'static + Future<Output = LuaResult<R>>,
{
self.queue_async_task(thread_or_function, None, None, async move {
match func(self.lua).await {
Ok(res) => match res.to_lua_multi(self.lua) {
Ok(multi) => Ok(Some(multi)),
Err(e) => Err(e),
},
Err(e) => Err(e),
}
})
}
/**
Creates a function callable from Lua that runs an async
closure and returns the results of it to the call site.
*/
pub fn make_scheduled_async_fn<A, R, F, FR>(&self, func: F) -> LuaResult<LuaFunction>
where
A: FromLuaMulti<'static>,
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>,
{
let async_env_thread: LuaFunction = self.lua.named_registry_value("co.thread")?;
let async_env_yield: LuaFunction = self.lua.named_registry_value("co.yield")?;
self.lua
.load(TASK_ASYNC_IMPL_LUA)
.set_environment(
TableBuilder::new(self.lua)?
.with_value("thread", async_env_thread)?
.with_value("yield", async_env_yield)?
.with_function(
"resumeAsync",
move |lua: &Lua, (thread, args): (LuaThread, A)| {
let fut = func(lua, args);
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.queue_async_task(LuaValue::Thread(thread), None, None, async {
let rets = fut.await?;
let mult = rets.to_lua_multi(lua)?;
Ok(Some(mult))
})
},
)?
.build_readonly()?,
)?
.into_function()
}
/**
Registers a new background task with the task scheduler.
This will ensure that the task scheduler keeps running until a
call to [`TaskScheduler::deregister_background_task`] is made.
The returned [`TaskSchedulerUnregistrar::unregister`]
must be called upon completion of the background task to
prevent the task scheduler from running indefinitely.
*/
pub fn register_background_task(&self) -> TaskSchedulerAsyncHandle {
let sender = self.futures_tx.clone();
sender
.send(TaskSchedulerMessage::Spawned)
.unwrap_or_else(|e| {
panic!(
"\
\nFailed to unregister background task - this is an internal error! \
\nPlease report it at {} \
\nDetails: {e} \
",
env!("CARGO_PKG_REPOSITORY")
)
});
TaskSchedulerAsyncHandle::new(sender)
}
/**
Resumes the next queued Lua task, if one exists, blocking
the current thread until it either yields or finishes.
*/
fn resume_next_blocking_task(
&self,
override_args: Option<LuaResult<LuaMultiValue>>,
) -> TaskSchedulerState {
match {
let mut queue_guard = self.tasks_queue_blocking.borrow_mut();
let task = queue_guard.pop_front();
drop(queue_guard);
task
} {
None => TaskSchedulerState::new(self),
Some(task) => match self.resume_task(task, override_args) {
Ok(_) => TaskSchedulerState::new(self),
Err(task_err) => TaskSchedulerState::err(self, task_err),
},
}
}
/**
Awaits the first available queued future, and resumes its associated
Lua task which will be ready for resumption when that future wakes.
Panics if there are no futures currently queued.
Use [`TaskScheduler::next_queue_future_exists`]
to check if there are any queued futures.
*/
async fn resume_next_async_task(&self) -> TaskSchedulerState {
let (task, result) = {
let mut futs = self
.futures
.try_lock()
.expect("Tried to resume next queued future while already resuming or modifying");
futs.next()
.await
.expect("Tried to resume next queued future but none are queued")
};
// Promote this future task to a blocking task and resume it
// right away, also taking care to not borrow mutably twice
// by dropping this guard before trying to resume it
let mut queue_guard = self.tasks_queue_blocking.borrow_mut();
queue_guard.push_front(task);
drop(queue_guard);
self.resume_next_blocking_task(result.transpose())
}
/**
Awaits the next background task registration
message, if any messages exist in the queue.
This is a no-op if there are no background tasks left running
and / or the background task messages channel was closed.
*/
async fn receive_next_message(&self) -> TaskSchedulerState {
let message_opt = {
let mut rx = self.futures_rx.lock().await;
rx.recv().await
};
if let Some(message) = message_opt {
match message {
TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerState::new(self),
TaskSchedulerMessage::Spawned => {
let prev = self.futures_registered_count.get();
self.futures_registered_count.set(prev + 1);
TaskSchedulerState::new(self)
}
TaskSchedulerMessage::Terminated(result) => {
let prev = self.futures_registered_count.get();
self.futures_registered_count.set(prev - 1);
if prev == 0 {
panic!(
r#"
Terminated a background task without it running - this is an internal error!
Please report it at {}
"#,
env!("CARGO_PKG_REPOSITORY")
)
}
if let Err(e) = result {
TaskSchedulerState::err(self, e)
} else {
TaskSchedulerState::new(self)
}
}
}
} else {
TaskSchedulerState::new(self)
}
}
/**
Resumes the task scheduler queue.
This will run any spawned or deferred Lua tasks in a blocking manner.
Once all spawned and / or deferred Lua tasks have finished running,
this will process delayed tasks, waiting tasks, and native Rust
futures concurrently, awaiting the first one to be ready for resumption.
*/
pub async fn resume_queue(&self) -> TaskSchedulerState {
let current = TaskSchedulerState::new(self);
/*
Resume tasks in the internal queue, in this order:
* 🛑 = blocking - lua tasks, in order
* = async - first come, first serve
1. 🛑 Tasks from task.spawn / task.defer, the main thread
2. Tasks from task.delay / task.wait, spawned background tasks
*/
if current.has_blocking_tasks() {
self.resume_next_blocking_task(None)
} else if current.has_future_tasks() && current.has_background_tasks() {
// Futures, spawned background tasks
tokio::select! {
result = self.resume_next_async_task() => result,
result = self.receive_next_message() => result,
}
} else if current.has_future_tasks() {
// Futures
self.resume_next_async_task().await
} else if current.has_background_tasks() {
// Only spawned background tasks, these may then
// spawn new lua tasks and "wake up" the scheduler
self.receive_next_message().await
} else {
TaskSchedulerState::new(self)
}
}
} }

View file

@ -1,6 +1,6 @@
use std::fmt; use std::fmt;
/// An enum representing different kinds of tasks /// Enum representing different kinds of tasks
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TaskKind { pub enum TaskKind {
Instant, Instant,

View file

@ -2,7 +2,7 @@ use std::future::Future;
use mlua::prelude::*; use mlua::prelude::*;
use crate::lua::task::TaskScheduler; use crate::lua::task::{TaskScheduler, TaskSchedulerAsyncExt};
pub struct TableBuilder { pub struct TableBuilder {
lua: &'static Lua, lua: &'static Lua,
@ -78,7 +78,10 @@ impl TableBuilder {
F: 'static + Fn(&'static Lua, A) -> FR, F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>, FR: 'static + Future<Output = LuaResult<R>>,
{ {
let sched = self.lua.app_data_ref::<&TaskScheduler>().unwrap(); let sched = self
.lua
.app_data_ref::<&TaskScheduler>()
.expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption");
let func = sched.make_scheduled_async_fn(func)?; let func = sched.make_scheduled_async_fn(func)?;
self.with_value(key, LuaValue::Function(func)) self.with_value(key, LuaValue::Function(func))
} }