Refactor new task scheduler

* Use cell & refcell, we don't need mutexes

* Split out into several modules
This commit is contained in:
Filip Tibell 2023-02-15 12:18:25 +01:00
parent 8d44e1e827
commit fa14e4a02b
No known key found for this signature in database
10 changed files with 489 additions and 493 deletions

View file

@ -6,13 +6,20 @@ use crate::{
};
const TASK_WAIT_IMPL_LUA: &str = r#"
resume_after(thread(), ...)
local seconds = ...
local current = thread()
resumeAfter(seconds, current)
return yield()
"#;
const TASK_SPAWN_IMPL_LUA: &str = r#"
local task = resume_first(...)
resume_second(thread())
-- Schedule the current thread at the front
scheduleNext(thread())
-- Schedule the thread to spawn at the front,
-- the previous schedule now comes right after
local task = scheduleNext(...)
-- Give control over to the scheduler, which will
-- resume the above tasks in order when its ready
yield()
return task
"#;
@ -46,8 +53,8 @@ pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
.with_value("thread", task_wait_env_thread)?
.with_value("yield", task_wait_env_yield)?
.with_function(
"resume_after",
|lua, (thread, secs): (LuaThread, Option<f64>)| {
"resumeAfter",
|lua, (secs, thread): (Option<f64>, LuaThread)| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.schedule_wait(secs.unwrap_or(0f64), LuaValue::Thread(thread))
},
@ -67,19 +74,12 @@ pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
.with_value("thread", task_spawn_env_thread)?
.with_value("yield", task_spawn_env_yield)?
.with_function(
"resume_first",
"scheduleNext",
|lua, (tof, args): (LuaValue, LuaMultiValue)| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.schedule_current_resume(tof, args)
sched.schedule_next(tof, args)
},
)?
.with_function("resume_second", |lua, thread: LuaThread| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.schedule_after_current_resume(
LuaValue::Thread(thread),
LuaMultiValue::new(),
)
})?
.build_readonly()?,
)?
.into_function()?;
@ -138,7 +138,7 @@ pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
sched.create_task(TaskKind::Instant, LuaValue::Function(func), None, None)?;
lua.create_function(move |lua, args: LuaMultiValue| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.resume_task(task, Some(args.into_vec()))
sched.resume_task(task, Some(Ok(args)))
})
})?,
)?;

View file

@ -103,7 +103,7 @@ impl Lune {
// Create our task scheduler and schedule the main thread on it
let sched = TaskScheduler::new(lua)?.into_static();
lua.set_app_data(sched);
sched.schedule_current_resume(
sched.schedule_next(
LuaValue::Function(
lua.load(script_contents)
.set_name(script_name)

View file

@ -0,0 +1,46 @@
use core::panic;
use mlua::prelude::*;
use tokio::sync::mpsc;
use super::message::TaskSchedulerMessage;
/**
A handle to a registered asynchronous background task.
[`TaskSchedulerAsyncHandle::unregister`] must be
called upon completion of the background task to
prevent the task scheduler from running indefinitely.
*/
#[must_use = "Background tasks must be unregistered"]
#[derive(Debug)]
pub struct TaskSchedulerAsyncHandle {
unregistered: bool,
sender: mpsc::UnboundedSender<TaskSchedulerMessage>,
}
impl TaskSchedulerAsyncHandle {
pub fn new(sender: mpsc::UnboundedSender<TaskSchedulerMessage>) -> Self {
Self {
unregistered: false,
sender,
}
}
pub fn unregister(mut self, result: LuaResult<()>) {
self.unregistered = true;
self.sender
.send(TaskSchedulerMessage::Terminated(result))
.unwrap_or_else(|_| {
panic!(
"\
\nFailed to unregister background task - this is an internal error! \
\nPlease report it at {} \
\nDetails: Manual \
",
env!("CARGO_PKG_REPOSITORY")
)
});
}
}

View file

@ -0,0 +1,8 @@
use mlua::prelude::*;
#[derive(Debug, Clone)]
pub enum TaskSchedulerMessage {
NewBlockingTaskReady,
Spawned,
Terminated(LuaResult<()>),
}

View file

@ -1,3 +1,10 @@
mod async_handle;
mod message;
mod result;
mod scheduler;
mod task_kind;
mod task_reference;
pub use scheduler::*;
pub use scheduler::TaskScheduler;
pub use task_kind::TaskKind;
pub use task_reference::TaskReference;

View file

@ -0,0 +1,187 @@
use std::{fmt, process::ExitCode};
use mlua::prelude::*;
use super::TaskScheduler;
/// A struct representing the current state of the task scheduler
#[derive(Debug, Clone)]
pub struct TaskSchedulerState {
lua_error: Option<LuaError>,
exit_code: Option<ExitCode>,
num_blocking: usize,
num_futures: usize,
num_background: usize,
}
impl TaskSchedulerState {
pub(super) fn new(sched: &TaskScheduler) -> Self {
const MESSAGE: &str = "\
Failed to get lock on or borrow internal scheduler state!\
\nMake sure not to call during task scheduler resumption";
Self {
lua_error: None,
exit_code: sched.exit_code.get(),
num_blocking: sched
.tasks_queue_blocking
.try_borrow()
.expect(MESSAGE)
.len(),
num_futures: sched.futures.try_lock().expect(MESSAGE).len(),
num_background: sched.futures_registered_count.get(),
}
}
pub(super) fn err(sched: &TaskScheduler, err: LuaError) -> Self {
let mut this = Self::new(sched);
this.lua_error = Some(err);
this
}
pub(super) fn has_blocking_tasks(&self) -> bool {
self.num_blocking > 0
}
pub(super) fn has_future_tasks(&self) -> bool {
self.num_futures > 0
}
pub(super) fn has_background_tasks(&self) -> bool {
self.num_background > 0
}
/**
Returns a clone of the error from
this task scheduler result, if any.
*/
pub fn get_lua_error(&self) -> Option<LuaError> {
self.lua_error.clone()
}
/**
Returns a clone of the exit code from
this task scheduler result, if any.
*/
pub fn get_exit_code(&self) -> Option<ExitCode> {
self.exit_code
}
/**
Returns `true` if the task scheduler still
has blocking lua threads left to run.
*/
pub fn is_blocking(&self) -> bool {
self.num_blocking > 0
}
/**
Returns `true` if the task scheduler has finished all
blocking lua tasks, but still has yielding tasks running.
*/
pub fn is_yielding(&self) -> bool {
self.num_blocking == 0 && self.num_futures > 0
}
/**
Returns `true` if the task scheduler has finished all
lua threads, but still has background tasks running.
*/
pub fn is_background(&self) -> bool {
self.num_blocking == 0 && self.num_futures == 0 && self.num_background > 0
}
/**
Returns `true` if the task scheduler is done,
meaning it has no lua threads left to run, and
no spawned tasks are running in the background.
*/
pub fn is_done(&self) -> bool {
self.num_blocking == 0 && self.num_futures == 0 && self.num_background == 0
}
}
impl fmt::Display for TaskSchedulerState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let status = if self.is_blocking() {
"Busy"
} else if self.is_yielding() {
"Yielding"
} else if self.is_background() {
"Background"
} else {
"Done"
};
let code = match self.get_exit_code() {
Some(code) => format!("{code:?}"),
None => "-".to_string(),
};
let err = match self.get_lua_error() {
Some(e) => format!("{e:?}")
.as_bytes()
.chunks(42) // Kinda arbitrary but should fit in most terminals
.enumerate()
.map(|(idx, buf)| {
format!(
"{}{}{}{}{}",
if idx == 0 { "" } else { "\n" },
if idx == 0 {
"".to_string()
} else {
" ".repeat(16)
},
if idx == 0 { "" } else { "" },
String::from_utf8_lossy(buf),
if buf.len() == 42 { "" } else { "" },
)
})
.collect::<String>(),
None => "-".to_string(),
};
let parts = vec![
format!("Status │ {status}"),
format!("Tasks active │ {}", self.num_blocking),
format!("Tasks background │ {}", self.num_background),
format!("Status code │ {code}"),
format!("Lua error │ {err}"),
];
let lengths = parts
.iter()
.map(|part| {
part.lines()
.next()
.unwrap()
.trim_end_matches("")
.chars()
.count()
})
.collect::<Vec<_>>();
let longest = &parts
.iter()
.enumerate()
.fold(0, |acc, (index, _)| acc.max(lengths[index]));
let sep = "".repeat(longest + 2);
writeln!(f, "┌{}┐", &sep)?;
for (index, part) in parts.iter().enumerate() {
writeln!(
f,
"│ {}{} │",
part.trim_end_matches(""),
" ".repeat(
longest
- part
.lines()
.last()
.unwrap()
.trim_end_matches("")
.chars()
.count()
)
)?;
if index < parts.len() - 1 {
writeln!(f, "┝{}┥", &sep)?;
}
}
write!(f, "└{}┘", &sep)?;
Ok(())
}
}

View file

@ -1,12 +1,8 @@
use core::panic;
use std::{
cell::{Cell, RefCell},
collections::{HashMap, VecDeque},
fmt,
process::ExitCode,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
},
time::Duration,
};
@ -20,58 +16,19 @@ use tokio::{
use crate::utils::table::TableBuilder;
type TaskSchedulerQueue = Arc<Mutex<VecDeque<TaskReference>>>;
use super::{
async_handle::TaskSchedulerAsyncHandle, message::TaskSchedulerMessage,
result::TaskSchedulerState, task_kind::TaskKind, task_reference::TaskReference,
};
type TaskFutureArgsOverride<'fut> = Option<Vec<LuaValue<'fut>>>;
type TaskFutureReturns<'fut> = LuaResult<TaskFutureArgsOverride<'fut>>;
type TaskFuture<'fut> = LocalBoxFuture<'fut, (TaskReference, TaskFutureReturns<'fut>)>;
type TaskFutureRets<'fut> = LuaResult<Option<LuaMultiValue<'fut>>>;
type TaskFuture<'fut> = LocalBoxFuture<'fut, (TaskReference, TaskFutureRets<'fut>)>;
const TASK_ASYNC_IMPL_LUA: &str = r#"
resume_async(thread(), ...)
resumeAsync(thread(), ...)
return yield()
"#;
/// An enum representing different kinds of tasks
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TaskKind {
Instant,
Deferred,
Future,
}
impl fmt::Display for TaskKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let name: &'static str = match self {
TaskKind::Instant => "Instant",
TaskKind::Deferred => "Deferred",
TaskKind::Future => "Future",
};
write!(f, "{name}")
}
}
/// A lightweight, copyable struct that represents a
/// task in the scheduler and is accessible from Lua
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TaskReference {
kind: TaskKind,
guid: usize,
}
impl TaskReference {
pub const fn new(kind: TaskKind, guid: usize) -> Self {
Self { kind, guid }
}
}
impl fmt::Display for TaskReference {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TaskReference({} - {})", self.kind, self.guid)
}
}
impl LuaUserData for TaskReference {}
/// A struct representing a task contained in the task scheduler
#[derive(Debug)]
pub struct Task {
@ -80,244 +37,33 @@ pub struct Task {
queued_at: Instant,
}
/**
A handle to a registered background task.
[`TaskSchedulerUnregistrar::unregister`] must be
called upon completion of the background task to
prevent the task scheduler from running indefinitely.
*/
#[must_use = "Background tasks must be unregistered"]
#[derive(Debug)]
pub struct TaskSchedulerBackgroundTaskHandle {
unregistered: bool,
sender: mpsc::UnboundedSender<TaskSchedulerMessage>,
}
impl TaskSchedulerBackgroundTaskHandle {
pub fn new(sender: mpsc::UnboundedSender<TaskSchedulerMessage>) -> Self {
Self {
unregistered: false,
sender,
}
}
pub fn unregister(mut self, result: LuaResult<()>) {
self.unregistered = true;
self.sender
.send(TaskSchedulerMessage::Terminated(result))
.unwrap_or_else(|_| {
panic!(
"\
\nFailed to unregister background task - this is an internal error! \
\nPlease report it at {} \
\nDetails: Manual \
",
env!("CARGO_PKG_REPOSITORY")
)
});
}
}
/// A struct representing the current state of the task scheduler
#[derive(Debug, Clone)]
pub struct TaskSchedulerResult {
lua_error: Option<LuaError>,
exit_code: Option<ExitCode>,
num_instant: usize,
num_deferred: usize,
num_futures: usize,
num_background: usize,
num_active: usize,
}
impl TaskSchedulerResult {
fn new(sched: &TaskScheduler) -> Self {
const MESSAGE: &str =
"Failed to get lock - make sure not to call during task scheduler resumption";
let num_instant = sched.task_queue_instant.try_lock().expect(MESSAGE).len();
let num_deferred = sched.task_queue_deferred.try_lock().expect(MESSAGE).len();
let num_active = num_instant + num_deferred;
Self {
lua_error: None,
exit_code: if sched.exit_code_set.load(Ordering::Relaxed) {
Some(*sched.exit_code.try_lock().expect(MESSAGE))
} else {
None
},
num_instant,
num_deferred,
num_futures: sched.futures.try_lock().expect(MESSAGE).len(),
num_background: sched.futures_in_background.load(Ordering::Relaxed),
num_active,
}
}
fn err(sched: &TaskScheduler, err: LuaError) -> Self {
let mut this = Self::new(sched);
this.lua_error = Some(err);
this
}
/**
Returns a clone of the error from
this task scheduler result, if any.
*/
pub fn get_lua_error(&self) -> Option<LuaError> {
self.lua_error.clone()
}
/**
Returns a clone of the exit code from
this task scheduler result, if any.
*/
pub fn get_exit_code(&self) -> Option<ExitCode> {
self.exit_code
}
/**
Returns `true` if the task scheduler is still busy,
meaning it still has lua threads left to run.
*/
pub fn is_busy(&self) -> bool {
self.num_active > 0
}
/**
Returns `true` if the task scheduler has finished all
blocking lua tasks, but still has yielding tasks running.
*/
pub fn is_yielding(&self) -> bool {
self.num_active == 0 && self.num_futures > 0
}
/**
Returns `true` if the task scheduler has finished all
lua threads, but still has background tasks running.
*/
pub fn is_background(&self) -> bool {
self.num_active == 0 && self.num_futures == 0 && self.num_background > 0
}
/**
Returns `true` if the task scheduler is done,
meaning it has no lua threads left to run, and
no spawned tasks are running in the background.
*/
pub fn is_done(&self) -> bool {
self.num_active == 0 && self.num_futures == 0 && self.num_background == 0
}
}
impl fmt::Display for TaskSchedulerResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let status = if self.is_busy() {
"Busy"
} else if self.is_yielding() {
"Yielding"
} else if self.is_background() {
"Background"
} else {
"Done"
};
let code = match self.get_exit_code() {
Some(code) => format!("{code:?}"),
None => "-".to_string(),
};
let err = match self.get_lua_error() {
Some(e) => format!("{e:?}")
.as_bytes()
.chunks(42) // Kinda arbitrary but should fit in most terminals
.enumerate()
.map(|(idx, buf)| {
format!(
"{}{}{}{}{}",
if idx == 0 { "" } else { "\n" },
if idx == 0 {
"".to_string()
} else {
" ".repeat(16)
},
if idx == 0 { "" } else { "" },
String::from_utf8_lossy(buf),
if buf.len() == 42 { "" } else { "" },
)
})
.collect::<String>(),
None => "-".to_string(),
};
let parts = vec![
format!("Status │ {status}"),
format!("Tasks active │ {}", self.num_active),
format!("Tasks background │ {}", self.num_background),
format!("Status code │ {code}"),
format!("Lua error │ {err}"),
];
let lengths = parts
.iter()
.map(|part| {
part.lines()
.next()
.unwrap()
.trim_end_matches("")
.chars()
.count()
})
.collect::<Vec<_>>();
let longest = &parts
.iter()
.enumerate()
.fold(0, |acc, (index, _)| acc.max(lengths[index]));
let sep = "".repeat(longest + 2);
writeln!(f, "┌{}┐", &sep)?;
for (index, part) in parts.iter().enumerate() {
writeln!(
f,
"│ {}{} │",
part.trim_end_matches(""),
" ".repeat(
longest
- part
.lines()
.last()
.unwrap()
.trim_end_matches("")
.chars()
.count()
)
)?;
if index < parts.len() - 1 {
writeln!(f, "┝{}┥", &sep)?;
}
}
writeln!(f, "└{}┘", &sep)?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub enum TaskSchedulerMessage {
NewBlockingTaskReady,
Spawned,
Terminated(LuaResult<()>),
}
/// A task scheduler that implements task queues
/// with instant, deferred, and delayed tasks
#[derive(Debug)]
pub struct TaskScheduler<'fut> {
/*
Lots of cell and refcell here, however we need full interior mutability and never outer
since the scheduler struct may be accessed from lua more than once at the same time.
An example of this is the implementation of coroutine.resume, which instantly resumes the given
task, where the task getting resumed may also create new scheduler tasks during its resumption.
The same goes for values used during resumption of futures (`futures` and `futures_rx`)
which must use async-aware mutexes to be cancellation safe across await points.
*/
// Internal state & flags
lua: &'static Lua,
tasks: Arc<Mutex<HashMap<TaskReference, Task>>>,
futures: Arc<AsyncMutex<FuturesUnordered<TaskFuture<'fut>>>>,
guid: Cell<usize>,
guid_running: Cell<Option<usize>>,
pub(super) exit_code: Cell<Option<ExitCode>>,
// Blocking tasks
pub(super) tasks: RefCell<HashMap<TaskReference, Task>>,
pub(super) tasks_queue_blocking: RefCell<VecDeque<TaskReference>>,
// Future tasks & objects for waking
pub(super) futures: AsyncMutex<FuturesUnordered<TaskFuture<'fut>>>,
pub(super) futures_registered_count: Cell<usize>,
futures_tx: mpsc::UnboundedSender<TaskSchedulerMessage>,
futures_rx: Arc<AsyncMutex<mpsc::UnboundedReceiver<TaskSchedulerMessage>>>,
futures_in_background: AtomicUsize,
task_queue_instant: TaskSchedulerQueue,
task_queue_deferred: TaskSchedulerQueue,
exit_code_set: AtomicBool,
exit_code: Arc<Mutex<ExitCode>>,
guid: AtomicUsize,
guid_running_task: AtomicUsize,
futures_rx: AsyncMutex<mpsc::UnboundedReceiver<TaskSchedulerMessage>>,
}
impl<'fut> TaskScheduler<'fut> {
@ -328,19 +74,15 @@ impl<'fut> TaskScheduler<'fut> {
let (tx, rx) = mpsc::unbounded_channel();
Ok(Self {
lua,
tasks: Arc::new(Mutex::new(HashMap::new())),
futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
guid: Cell::new(0),
guid_running: Cell::new(None),
exit_code: Cell::new(None),
tasks: RefCell::new(HashMap::new()),
tasks_queue_blocking: RefCell::new(VecDeque::new()),
futures: AsyncMutex::new(FuturesUnordered::new()),
futures_tx: tx,
futures_rx: Arc::new(AsyncMutex::new(rx)),
futures_in_background: AtomicUsize::new(0),
task_queue_instant: Arc::new(Mutex::new(VecDeque::new())),
task_queue_deferred: Arc::new(Mutex::new(VecDeque::new())),
exit_code_set: AtomicBool::new(false),
exit_code: Arc::new(Mutex::new(ExitCode::SUCCESS)),
// Global ids must start at 1, since 0 is a special
// value for guid_running_task that means "no task"
guid: AtomicUsize::new(1),
guid_running_task: AtomicUsize::new(0),
futures_rx: AsyncMutex::new(rx),
futures_registered_count: Cell::new(0),
})
}
@ -367,8 +109,7 @@ impl<'fut> TaskScheduler<'fut> {
should stop resuming tasks, and gracefully terminate the program.
*/
pub fn set_exit_code(&self, code: ExitCode) {
*self.exit_code.lock().unwrap() = code;
self.exit_code_set.store(true, Ordering::Relaxed);
self.exit_code.set(Some(code));
}
/**
@ -379,8 +120,7 @@ impl<'fut> TaskScheduler<'fut> {
*/
#[allow(dead_code)]
pub fn contains_task(&self, reference: TaskReference) -> bool {
let tasks = self.tasks.lock().unwrap();
tasks.contains_key(&reference)
self.tasks.borrow().contains_key(&reference)
}
/**
@ -411,6 +151,8 @@ impl<'fut> TaskScheduler<'fut> {
}
};
// Store the thread and its arguments in the registry
// NOTE: We must convert to a vec since multis
// can't be stored in the registry directly
let task_args_vec: Option<Vec<LuaValue>> = thread_args.map(|opt| opt.into_vec());
let task_args_key: LuaRegistryKey = self.lua.create_registry_value(task_args_vec)?;
let task_thread_key: LuaRegistryKey = self.lua.create_registry_value(task_thread)?;
@ -425,12 +167,13 @@ impl<'fut> TaskScheduler<'fut> {
let task_ref = if let Some(reusable_guid) = guid_to_reuse {
TaskReference::new(kind, reusable_guid)
} else {
let guid = self.guid.fetch_add(1, Ordering::Relaxed);
let guid = self.guid.get();
self.guid.set(guid + 1);
TaskReference::new(kind, guid)
};
// Add the task to the scheduler
{
let mut tasks = self.tasks.lock().unwrap();
let mut tasks = self.tasks.borrow_mut();
tasks.insert(task_ref, task);
}
Ok(task_ref)
@ -456,13 +199,13 @@ impl<'fut> TaskScheduler<'fut> {
are no tasks left in the task list, the futures do not matter there
*/
let mut found = false;
let mut tasks = self.tasks.lock().unwrap();
let mut tasks = self.tasks.borrow_mut();
// Unfortunately we have to loop through to find which task
// references to remove instead of removing directly since
// tasks can switch kinds between instant, deferred, future
let tasks_to_remove: Vec<_> = tasks
.keys()
.filter(|task_ref| task_ref.guid == reference.guid)
.filter(|task_ref| task_ref.id() == reference.id())
.copied()
.collect();
for task_ref in tasks_to_remove {
@ -486,27 +229,36 @@ impl<'fut> TaskScheduler<'fut> {
pub fn resume_task<'a>(
&self,
reference: TaskReference,
override_args: Option<Vec<LuaValue<'a>>>,
override_args: Option<LuaResult<LuaMultiValue<'a>>>,
) -> LuaResult<LuaMultiValue<'a>> {
self.guid_running_task
.store(reference.guid, Ordering::Relaxed);
let task = {
let mut tasks = self.tasks.lock().unwrap();
let mut tasks = self.tasks.borrow_mut();
match tasks.remove(&reference) {
Some(task) => task,
None => return Ok(LuaMultiValue::new()), // Task was removed
}
};
let thread: LuaThread = self.lua.registry_value(&task.thread)?;
let args_vec_opt = override_args.or_else(|| {
self.lua
let args_opt_res = override_args.or_else(|| {
Ok(self
.lua
.registry_value::<Option<Vec<LuaValue>>>(&task.args)
.expect("Failed to get stored args for task")
.map(LuaMultiValue::from_vec))
.transpose()
});
self.lua.remove_registry_value(task.thread)?;
self.lua.remove_registry_value(task.args)?;
let rets = if let Some(args) = args_vec_opt {
thread.resume::<_, LuaMultiValue>(LuaMultiValue::from_vec(args))
if let Some(args_res) = args_opt_res {
match args_res {
Err(e) => Err(e), // FIXME: We need to throw this error in lua to let pcall & friends handle it properly
Ok(args) => {
self.guid_running.set(Some(reference.id()));
let rets = thread.resume::<_, LuaMultiValue>(args);
self.guid_running.set(None);
rets
}
}
} else {
/*
The tasks did not get any arguments from either:
@ -519,11 +271,12 @@ impl<'fut> TaskScheduler<'fut> {
want the amount of time waited returned to them.
*/
let elapsed = task.queued_at.elapsed().as_secs_f64();
thread.resume::<_, LuaMultiValue>(elapsed)
};
self.guid_running_task.store(0, Ordering::Relaxed);
self.guid_running.set(Some(reference.id()));
let rets = thread.resume::<_, LuaMultiValue>(elapsed);
self.guid_running.set(None);
rets
}
}
/**
Queues a new task to run on the task scheduler.
@ -541,48 +294,26 @@ impl<'fut> TaskScheduler<'fut> {
-- Here we have either yielded or finished the above task
```
*/
fn queue_blocking_task(
pub(super) fn queue_blocking_task(
&self,
kind: TaskKind,
thread_or_function: LuaValue<'_>,
thread_args: Option<LuaMultiValue<'_>>,
guid_to_reuse: Option<usize>,
after_current_resume: bool,
) -> LuaResult<TaskReference> {
if kind == TaskKind::Future {
panic!("Tried to schedule future using normal task schedule method")
}
let task_ref = self.create_task(kind, thread_or_function, thread_args, guid_to_reuse)?;
// Note that we create two new inner new
// scopes to drop mutexes as fast as possible
let num_prev_blocking_tasks = {
let (should_defer, num_prev_tasks, mut queue) = {
let queue_instant = self.task_queue_instant.lock().unwrap();
let queue_deferred = self.task_queue_deferred.lock().unwrap();
let num_prev_tasks = queue_instant.len() + queue_deferred.len();
(
kind == TaskKind::Deferred,
num_prev_tasks,
match kind {
TaskKind::Instant => queue_instant,
TaskKind::Deferred => queue_deferred,
TaskKind::Future => unreachable!(),
},
)
};
if should_defer {
// Add the task to the front of the queue, unless it
// should be deferred, in that case add it to the back
let mut queue = self.tasks_queue_blocking.borrow_mut();
let num_prev_blocking_tasks = queue.len();
if kind == TaskKind::Deferred {
queue.push_back(task_ref);
} else if after_current_resume {
assert!(
queue.len() > 0,
"Cannot schedule a task after the first instant when task queue is empty"
);
queue.insert(1, task_ref);
} else {
queue.push_front(task_ref);
}
num_prev_tasks
};
/*
If we had any previous task and are currently async
waiting on tasks, we should send a signal to wake up
@ -603,12 +334,12 @@ impl<'fut> TaskScheduler<'fut> {
/**
Queues a new future to run on the task scheduler.
*/
fn queue_async_task(
pub(super) fn queue_async_task(
&self,
thread_or_function: LuaValue<'_>,
thread_args: Option<LuaMultiValue<'_>>,
guid_to_reuse: Option<usize>,
fut: impl Future<Output = TaskFutureReturns<'fut>> + 'fut,
fut: impl Future<Output = TaskFutureRets<'fut>> + 'fut,
) -> LuaResult<TaskReference> {
let task_ref = self.create_task(
TaskKind::Future,
@ -634,7 +365,7 @@ impl<'fut> TaskScheduler<'fut> {
The given lua thread or function will be resumed
using the given `thread_args` as its argument(s).
*/
pub fn schedule_current_resume(
pub fn schedule_next(
&self,
thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>,
@ -644,34 +375,6 @@ impl<'fut> TaskScheduler<'fut> {
thread_or_function,
Some(thread_args),
None,
false,
)
}
/**
Schedules a lua thread or function to resume ***after the first***
currently resuming task, during this resumption point.
The given lua thread or function will be resumed
using the given `thread_args` as its argument(s).
*/
pub fn schedule_after_current_resume(
&self,
thread_or_function: LuaValue<'_>,
thread_args: LuaMultiValue<'_>,
) -> LuaResult<TaskReference> {
self.queue_blocking_task(
TaskKind::Instant,
thread_or_function,
Some(thread_args),
// This should recycle the guid of the current task,
// since it will only be called to schedule resuming
// current thread after it gives resumption to another
match self.guid_running_task.load(Ordering::Relaxed) {
0 => panic!("Tried to schedule with no task running"),
guid => Some(guid),
},
true,
)
}
@ -692,7 +395,6 @@ impl<'fut> TaskScheduler<'fut> {
thread_or_function,
Some(thread_args),
None,
false,
)
}
@ -733,15 +435,7 @@ impl<'fut> TaskScheduler<'fut> {
// Wait should recycle the guid of the current task,
// which ensures that the TaskReference is identical and
// that any waits inside of spawned tasks will also cancel
match self.guid_running_task.load(Ordering::Relaxed) {
0 => {
// NOTE: We had this here to verify the behavior of our task scheduler during development,
// but for re-implementing coroutine.resume (which is not registered) we must not panic here
// panic!("Tried to schedule waiting task with no registered task running")
None
}
guid => Some(guid),
},
self.guid_running.get(),
async move {
sleep(Duration::from_secs_f64(after_secs)).await;
Ok(None)
@ -756,12 +450,27 @@ impl<'fut> TaskScheduler<'fut> {
The given lua thread or function will be resumed
using the optional arguments returned by the future.
*/
pub fn schedule_async(
&self,
#[allow(dead_code)]
pub fn schedule_async<'sched, R, F, FR>(
&'sched self,
thread_or_function: LuaValue<'_>,
fut: impl Future<Output = TaskFutureReturns<'fut>> + 'fut,
) -> LuaResult<TaskReference> {
self.queue_async_task(thread_or_function, None, None, fut)
func: F,
) -> LuaResult<TaskReference>
where
'sched: 'fut, // Scheduler must live at least as long as the future
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua) -> FR,
FR: 'static + Future<Output = LuaResult<R>>,
{
self.queue_async_task(thread_or_function, None, None, async move {
match func(self.lua).await {
Ok(res) => match res.to_lua_multi(self.lua) {
Ok(multi) => Ok(Some(multi)),
Err(e) => Err(e),
},
Err(e) => Err(e),
}
})
}
/**
@ -784,14 +493,14 @@ impl<'fut> TaskScheduler<'fut> {
.with_value("thread", async_env_thread)?
.with_value("yield", async_env_yield)?
.with_function(
"resume_async",
"resumeAsync",
move |lua: &Lua, (thread, args): (LuaThread, A)| {
let fut = func(lua, args);
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.schedule_async(LuaValue::Thread(thread), async {
sched.queue_async_task(LuaValue::Thread(thread), None, None, async {
let rets = fut.await?;
let mult = rets.to_lua_multi(lua)?;
Ok(Some(mult.into_vec()))
Ok(Some(mult))
})
},
)?
@ -810,7 +519,7 @@ impl<'fut> TaskScheduler<'fut> {
must be called upon completion of the background task to
prevent the task scheduler from running indefinitely.
*/
pub fn register_background_task(&self) -> TaskSchedulerBackgroundTaskHandle {
pub fn register_background_task(&self) -> TaskSchedulerAsyncHandle {
let sender = self.futures_tx.clone();
sender
.send(TaskSchedulerMessage::Spawned)
@ -824,110 +533,82 @@ impl<'fut> TaskScheduler<'fut> {
env!("CARGO_PKG_REPOSITORY")
)
});
TaskSchedulerBackgroundTaskHandle::new(sender)
}
/**
Retrieves the queue for a specific kind of task.
Panics for [`TaskKind::Future`] since
futures do not use the normal task queue.
*/
fn get_queue(&self, kind: TaskKind) -> &TaskSchedulerQueue {
match kind {
TaskKind::Instant => &self.task_queue_instant,
TaskKind::Deferred => &self.task_queue_deferred,
TaskKind::Future => {
panic!("Future tasks do not use the normal task queue")
}
}
TaskSchedulerAsyncHandle::new(sender)
}
/**
Resumes the next queued Lua task, if one exists, blocking
the current thread until it either yields or finishes.
*/
fn resume_next_queue_task(
fn resume_next_blocking_task(
&self,
kind: TaskKind,
override_args: Option<Vec<LuaValue>>,
) -> TaskSchedulerResult {
override_args: Option<LuaResult<LuaMultiValue>>,
) -> TaskSchedulerState {
match {
let mut queue_guard = self.get_queue(kind).lock().unwrap();
queue_guard.pop_front()
let mut queue_guard = self.tasks_queue_blocking.borrow_mut();
let task = queue_guard.pop_front();
drop(queue_guard);
task
} {
None => TaskSchedulerResult::new(self),
None => TaskSchedulerState::new(self),
Some(task) => match self.resume_task(task, override_args) {
Ok(_) => TaskSchedulerResult::new(self),
Err(task_err) => TaskSchedulerResult::err(self, task_err),
Ok(_) => TaskSchedulerState::new(self),
Err(task_err) => TaskSchedulerState::err(self, task_err),
},
}
}
/**
Awaits the first available queued future, and resumes its
associated Lua task which will then be ready for resumption.
Awaits the first available queued future, and resumes its associated
Lua task which will be ready for resumption when that future wakes.
Panics if there are no futures currently queued.
Use [`TaskScheduler::next_queue_future_exists`]
to check if there are any queued futures.
*/
async fn resume_next_queue_future(&self) -> TaskSchedulerResult {
let result = {
async fn resume_next_async_task(&self) -> TaskSchedulerState {
let (task, result) = {
let mut futs = self
.futures
.try_lock()
.expect("Failed to get lock on futures");
.expect("Tried to resume next queued future while already resuming or modifying");
futs.next()
.await
.expect("Tried to resume next queued future but none are queued")
};
match result {
(task, Err(fut_err)) => {
// Future errored, don't resume its associated task
// and make sure to cancel / remove it completely, if removal
// also errors then we send that error back instead of the future's error
TaskSchedulerResult::err(
self,
match self.remove_task(task) {
Err(cancel_err) => cancel_err,
Ok(_) => fut_err,
},
)
}
(task, Ok(args)) => {
// Promote this future task to an instant task
// and resume the instant queue right away, taking
// care to not deadlock by dropping the mutex guard
let mut queue_guard = self.get_queue(TaskKind::Instant).lock().unwrap();
// Promote this future task to a blocking task and resume it
// right away, also taking care to not borrow mutably twice
// by dropping this guard before trying to resume it
let mut queue_guard = self.tasks_queue_blocking.borrow_mut();
queue_guard.push_front(task);
drop(queue_guard);
self.resume_next_queue_task(TaskKind::Instant, args)
}
}
self.resume_next_blocking_task(result.transpose())
}
/**
Awaits the next background task registration
message, if any messages exist in the queue.
This is a no-op if there are no messages.
This is a no-op if there are no background tasks left running
and / or the background task messages channel was closed.
*/
async fn receive_next_message(&self) -> TaskSchedulerResult {
async fn receive_next_message(&self) -> TaskSchedulerState {
let message_opt = {
let mut rx = self.futures_rx.lock().await;
rx.recv().await
};
if let Some(message) = message_opt {
match message {
TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerResult::new(self),
TaskSchedulerMessage::NewBlockingTaskReady => TaskSchedulerState::new(self),
TaskSchedulerMessage::Spawned => {
self.futures_in_background.fetch_add(1, Ordering::Relaxed);
TaskSchedulerResult::new(self)
let prev = self.futures_registered_count.get();
self.futures_registered_count.set(prev + 1);
TaskSchedulerState::new(self)
}
TaskSchedulerMessage::Terminated(result) => {
let prev = self.futures_in_background.fetch_sub(1, Ordering::Relaxed);
let prev = self.futures_registered_count.get();
self.futures_registered_count.set(prev - 1);
if prev == 0 {
panic!(
r#"
@ -938,14 +619,14 @@ impl<'fut> TaskScheduler<'fut> {
)
}
if let Err(e) = result {
TaskSchedulerResult::err(self, e)
TaskSchedulerState::err(self, e)
} else {
TaskSchedulerResult::new(self)
TaskSchedulerState::new(self)
}
}
}
} else {
TaskSchedulerResult::new(self)
TaskSchedulerState::new(self)
}
}
@ -958,37 +639,34 @@ impl<'fut> TaskScheduler<'fut> {
this will process delayed tasks, waiting tasks, and native Rust
futures concurrently, awaiting the first one to be ready for resumption.
*/
pub async fn resume_queue(&self) -> TaskSchedulerResult {
let current = TaskSchedulerResult::new(self);
pub async fn resume_queue(&self) -> TaskSchedulerState {
let current = TaskSchedulerState::new(self);
/*
Resume tasks in the internal queue, in this order:
* 🛑 = blocking - lua tasks, in order
* = async - first come, first serve
1. 🛑 Tasks from task.spawn and the main thread
2. 🛑 Tasks from task.defer
3. Tasks from task.delay / task.wait, spawned background tasks
1. 🛑 Tasks from task.spawn / task.defer, the main thread
2. Tasks from task.delay / task.wait, spawned background tasks
*/
if current.num_instant > 0 {
self.resume_next_queue_task(TaskKind::Instant, None)
} else if current.num_deferred > 0 {
self.resume_next_queue_task(TaskKind::Deferred, None)
} else if current.num_futures > 0 && current.num_background > 0 {
if current.has_blocking_tasks() {
self.resume_next_blocking_task(None)
} else if current.has_future_tasks() && current.has_background_tasks() {
// Futures, spawned background tasks
tokio::select! {
result = self.resume_next_queue_future() => result,
result = self.resume_next_async_task() => result,
result = self.receive_next_message() => result,
}
} else if current.num_futures > 0 {
} else if current.has_future_tasks() {
// Futures
self.resume_next_queue_future().await
} else if current.num_background > 0 {
self.resume_next_async_task().await
} else if current.has_background_tasks() {
// Only spawned background tasks, these may then
// spawn new lua tasks and "wake up" the scheduler
self.receive_next_message().await
} else {
TaskSchedulerResult::new(self)
TaskSchedulerState::new(self)
}
}
}

View file

@ -0,0 +1,39 @@
use std::fmt;
/// An enum representing different kinds of tasks
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TaskKind {
Instant,
Deferred,
Future,
}
#[allow(dead_code)]
impl TaskKind {
pub fn is_instant(&self) -> bool {
*self == Self::Instant
}
pub fn is_deferred(&self) -> bool {
*self == Self::Deferred
}
pub fn is_blocking(&self) -> bool {
*self != Self::Future
}
pub fn is_future(&self) -> bool {
*self == Self::Future
}
}
impl fmt::Display for TaskKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let name: &'static str = match self {
TaskKind::Instant => "Instant",
TaskKind::Deferred => "Deferred",
TaskKind::Future => "Future",
};
write!(f, "{name}")
}
}

View file

@ -0,0 +1,31 @@
use std::fmt;
use mlua::prelude::*;
use super::task_kind::TaskKind;
/// A lightweight, copyable struct that represents a
/// task in the scheduler and is accessible from Lua
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TaskReference {
kind: TaskKind,
guid: usize,
}
impl TaskReference {
pub const fn new(kind: TaskKind, guid: usize) -> Self {
Self { kind, guid }
}
pub const fn id(&self) -> usize {
self.guid
}
}
impl fmt::Display for TaskReference {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TaskReference({} - {})", self.kind, self.guid)
}
}
impl LuaUserData for TaskReference {}

View file

@ -1,4 +1,4 @@
local PORT = 8080
local PORT = 8081
local URL = `http://127.0.0.1:{PORT}`
local WS_URL = `ws://127.0.0.1:{PORT}`
local REQUEST = "Hello from client!"