Fully implement and document the task scheduler

This commit is contained in:
Filip Tibell 2023-02-13 21:29:05 +01:00
parent fc5de3c8d5
commit 805b9d89ad
No known key found for this signature in database
7 changed files with 446 additions and 233 deletions

View file

@ -2,8 +2,6 @@ use std::fmt::{Display, Formatter, Result as FmtResult};
use mlua::prelude::*; use mlua::prelude::*;
use crate::lua::task::TaskScheduler;
mod fs; mod fs;
mod net; mod net;
mod process; mod process;
@ -80,18 +78,14 @@ impl LuneGlobal {
Note that proxy globals should be handled with special care and that [`LuneGlobal::inject()`] Note that proxy globals should be handled with special care and that [`LuneGlobal::inject()`]
should be preferred over manually creating and manipulating the value(s) of any Lune global. should be preferred over manually creating and manipulating the value(s) of any Lune global.
*/ */
pub fn value( pub fn value(&self, lua: &'static Lua) -> LuaResult<LuaTable> {
&self,
lua: &'static Lua,
scheduler: &'static TaskScheduler,
) -> LuaResult<LuaTable> {
match self { match self {
LuneGlobal::Fs => fs::create(lua), LuneGlobal::Fs => fs::create(lua),
LuneGlobal::Net => net::create(lua), LuneGlobal::Net => net::create(lua),
LuneGlobal::Process { args } => process::create(lua, args.clone()), LuneGlobal::Process { args } => process::create(lua, args.clone()),
LuneGlobal::Require => require::create(lua), LuneGlobal::Require => require::create(lua),
LuneGlobal::Stdio => stdio::create(lua), LuneGlobal::Stdio => stdio::create(lua),
LuneGlobal::Task => task::create(lua, scheduler), LuneGlobal::Task => task::create(lua),
LuneGlobal::TopLevel => top_level::create(lua), LuneGlobal::TopLevel => top_level::create(lua),
} }
} }
@ -104,9 +98,9 @@ impl LuneGlobal {
Refer to [`LuneGlobal::is_top_level()`] for more info on proxy globals. Refer to [`LuneGlobal::is_top_level()`] for more info on proxy globals.
*/ */
pub fn inject(self, lua: &'static Lua, scheduler: &'static TaskScheduler) -> LuaResult<()> { pub fn inject(self, lua: &'static Lua) -> LuaResult<()> {
let globals = lua.globals(); let globals = lua.globals();
let table = self.value(lua, scheduler)?; let table = self.value(lua)?;
// NOTE: Top level globals are special, the values // NOTE: Top level globals are special, the values
// *in* the table they return should be set directly, // *in* the table they return should be set directly,
// instead of setting the table itself as the global // instead of setting the table itself as the global

View file

@ -164,6 +164,8 @@ async fn net_serve<'a>(
.expect("Failed to store websocket handler") .expect("Failed to store websocket handler")
})); }));
let server = Server::bind(&([127, 0, 0, 1], port).into()) let server = Server::bind(&([127, 0, 0, 1], port).into())
.http1_only(true)
.http1_keepalive(true)
.executor(LocalExec) .executor(LocalExec)
.serve(MakeNetService( .serve(MakeNetService(
lua, lua,

View file

@ -50,7 +50,7 @@ pub fn create(lua: &'static Lua, args_vec: Vec<String>) -> LuaResult<LuaTable> {
let process_exit_env_yield: LuaFunction = lua.named_registry_value("co.yield")?; let process_exit_env_yield: LuaFunction = lua.named_registry_value("co.yield")?;
let process_exit_env_exit: LuaFunction = lua.create_function(|lua, code: Option<u8>| { let process_exit_env_exit: LuaFunction = lua.create_function(|lua, code: Option<u8>| {
let exit_code = code.map_or(ExitCode::SUCCESS, ExitCode::from); let exit_code = code.map_or(ExitCode::SUCCESS, ExitCode::from);
let sched = &mut lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
sched.set_exit_code(exit_code); sched.set_exit_code(exit_code);
Ok(()) Ok(())
})?; })?;

View file

@ -10,23 +10,27 @@ resume_after(thread(), ...)
return yield() return yield()
"#; "#;
pub fn create( const TASK_SPAWN_IMPL_LUA: &str = r#"
lua: &'static Lua, local task = resume_first(...)
scheduler: &'static TaskScheduler, resume_second(thread())
) -> LuaResult<LuaTable<'static>> { yield()
lua.set_app_data(scheduler); return task
"#;
pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
// Create task spawning functions that add tasks to the scheduler // Create task spawning functions that add tasks to the scheduler
let task_spawn = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| { let task_cancel = lua.create_function(|lua, task: TaskReference| {
let sched = &mut lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
sched.schedule_instant(tof, args) sched.cancel_task(task)?;
Ok(())
})?; })?;
let task_defer = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| { let task_defer = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| {
let sched = &mut lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
sched.schedule_deferred(tof, args) sched.schedule_deferred(tof, args)
})?; })?;
let task_delay = let task_delay =
lua.create_function(|lua, (secs, tof, args): (f64, LuaValue, LuaMultiValue)| { lua.create_function(|lua, (secs, tof, args): (f64, LuaValue, LuaMultiValue)| {
let sched = &mut lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
sched.schedule_delayed(secs, tof, args) sched.schedule_delayed(secs, tof, args)
})?; })?;
// Create our task wait function, this is a bit different since // Create our task wait function, this is a bit different since
@ -43,13 +47,41 @@ pub fn create(
.with_function( .with_function(
"resume_after", "resume_after",
|lua, (thread, secs): (LuaThread, Option<f64>)| { |lua, (thread, secs): (LuaThread, Option<f64>)| {
let sched = &mut lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
sched.resume_after(secs.unwrap_or(0f64), thread) sched.schedule_wait(secs.unwrap_or(0f64), LuaValue::Thread(thread))
}, },
)? )?
.build_readonly()?, .build_readonly()?,
)? )?
.into_function()?; .into_function()?;
// The spawn function also needs special treatment,
// we need to yield right away to allow the
// spawned task to run until first yield
let task_spawn_env_thread: LuaFunction = lua.named_registry_value("co.thread")?;
let task_spawn_env_yield: LuaFunction = lua.named_registry_value("co.yield")?;
let task_spawn = lua
.load(TASK_SPAWN_IMPL_LUA)
.set_environment(
TableBuilder::new(lua)?
.with_value("thread", task_spawn_env_thread)?
.with_value("yield", task_spawn_env_yield)?
.with_function(
"resume_first",
|lua, (tof, args): (LuaValue, LuaMultiValue)| {
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
sched.schedule_current_resume(tof, args)
},
)?
.with_function("resume_second", |lua, thread: LuaThread| {
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
sched.schedule_after_current_resume(
LuaValue::Thread(thread),
LuaMultiValue::new(),
)
})?
.build_readonly()?,
)?
.into_function()?;
// We want the task scheduler to be transparent, // We want the task scheduler to be transparent,
// but it does not return real lua threads, so // but it does not return real lua threads, so
// we need to override some globals to fake it // we need to override some globals to fake it
@ -76,6 +108,7 @@ pub fn create(
globals.set("typeof", typeof_proxy)?; globals.set("typeof", typeof_proxy)?;
// All good, return the task scheduler lib // All good, return the task scheduler lib
TableBuilder::new(lua)? TableBuilder::new(lua)?
.with_value("cancel", task_cancel)?
.with_value("spawn", task_spawn)? .with_value("spawn", task_spawn)?
.with_value("defer", task_defer)? .with_value("defer", task_defer)?
.with_value("delay", task_delay)? .with_value("delay", task_delay)?

View file

@ -1,6 +1,6 @@
use std::{collections::HashSet, process::ExitCode}; use std::{collections::HashSet, process::ExitCode};
use lua::task::TaskScheduler; use lua::task::{TaskScheduler, TaskSchedulerResult};
use mlua::prelude::*; use mlua::prelude::*;
use tokio::task::LocalSet; use tokio::task::LocalSet;
@ -88,10 +88,7 @@ impl Lune {
script_name: &str, script_name: &str,
script_contents: &str, script_contents: &str,
) -> Result<ExitCode, LuaError> { ) -> Result<ExitCode, LuaError> {
let set = LocalSet::new();
let lua = Lua::new().into_static(); let lua = Lua::new().into_static();
let sched = TaskScheduler::new(lua)?.into_static();
lua.set_app_data(sched);
// Store original lua global functions in the registry so we can use // Store original lua global functions in the registry so we can use
// them later without passing them around and dealing with lifetimes // them later without passing them around and dealing with lifetimes
lua.set_named_registry_value("require", lua.globals().get::<_, LuaFunction>("require")?)?; lua.set_named_registry_value("require", lua.globals().get::<_, LuaFunction>("require")?)?;
@ -105,11 +102,13 @@ impl Lune {
// Add in wanted lune globals // Add in wanted lune globals
for global in self.includes.clone() { for global in self.includes.clone() {
if !self.excludes.contains(&global) { if !self.excludes.contains(&global) {
global.inject(lua, sched)?; global.inject(lua)?;
} }
} }
// Schedule the main thread on the task scheduler // Create our task scheduler and schedule the main thread on it
sched.schedule_instant( let sched = TaskScheduler::new(lua)?.into_static();
lua.set_app_data(sched);
sched.schedule_current_resume(
LuaValue::Function( LuaValue::Function(
lua.load(script_contents) lua.load(script_contents)
.set_name(script_name) .set_name(script_name)
@ -120,30 +119,30 @@ impl Lune {
LuaValue::Nil.to_lua_multi(lua)?, LuaValue::Nil.to_lua_multi(lua)?,
)?; )?;
// Keep running the scheduler until there are either no tasks // Keep running the scheduler until there are either no tasks
// left to run, or until some task requests to exit the process // left to run, or until a task requests to exit the process
let exit_code = set let exit_code = LocalSet::new()
.run_until(async { .run_until(async move {
loop {
let mut got_error = false; let mut got_error = false;
while let Some(result) = sched.resume_queue().await { let state = match sched.resume_queue().await {
match result { TaskSchedulerResult::TaskSuccessful { state } => state,
Err(e) => { TaskSchedulerResult::TaskErrored { state, error } => {
eprintln!("{}", pretty_format_luau_error(&e)); eprintln!("{}", pretty_format_luau_error(&error));
got_error = true; got_error = true;
state
} }
Ok(status) => { TaskSchedulerResult::Finished { state } => state,
if let Some(exit_code) = status.exit_code { };
if let Some(exit_code) = state.exit_code {
return exit_code; return exit_code;
} else if status.num_total == 0 { } else if state.num_total == 0 {
if got_error {
return ExitCode::FAILURE;
} else {
return ExitCode::SUCCESS; return ExitCode::SUCCESS;
} }
} }
} }
}
if got_error {
ExitCode::FAILURE
} else {
ExitCode::SUCCESS
}
}) })
.await; .await;
Ok(exit_code) Ok(exit_code)

View file

@ -1,3 +1,4 @@
use core::panic;
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
fmt, fmt,
@ -9,18 +10,26 @@ use std::{
time::Duration, time::Duration,
}; };
use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use mlua::prelude::*; use mlua::prelude::*;
use tokio::time::{sleep, Instant}; use tokio::{
sync::Mutex as AsyncMutex,
time::{sleep, Instant},
};
type TaskSchedulerQueue = Arc<Mutex<VecDeque<TaskReference>>>; type TaskSchedulerQueue = Arc<Mutex<VecDeque<TaskReference>>>;
type TaskFutureArgsOverride<'fut> = Option<Vec<LuaValue<'fut>>>;
type TaskFutureResult<'fut> = (TaskReference, LuaResult<TaskFutureArgsOverride<'fut>>);
type TaskFuture<'fut> = BoxFuture<'fut, TaskFutureResult<'fut>>;
/// An enum representing different kinds of tasks /// An enum representing different kinds of tasks
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TaskKind { pub enum TaskKind {
Instant, Instant,
Deferred, Deferred,
Yielded, Future,
} }
impl fmt::Display for TaskKind { impl fmt::Display for TaskKind {
@ -28,7 +37,7 @@ impl fmt::Display for TaskKind {
let name: &'static str = match self { let name: &'static str = match self {
TaskKind::Instant => "Instant", TaskKind::Instant => "Instant",
TaskKind::Deferred => "Deferred", TaskKind::Deferred => "Deferred",
TaskKind::Yielded => "Yielded", TaskKind::Future => "Future",
}; };
write!(f, "{name}") write!(f, "{name}")
} }
@ -40,16 +49,11 @@ impl fmt::Display for TaskKind {
pub struct TaskReference { pub struct TaskReference {
kind: TaskKind, kind: TaskKind,
guid: usize, guid: usize,
queued_target: Option<Instant>,
} }
impl TaskReference { impl TaskReference {
pub const fn new(kind: TaskKind, guid: usize, queued_target: Option<Instant>) -> Self { pub const fn new(kind: TaskKind, guid: usize) -> Self {
Self { Self { kind, guid }
kind,
guid,
queued_target,
}
} }
} }
@ -61,114 +65,140 @@ impl fmt::Display for TaskReference {
impl LuaUserData for TaskReference {} impl LuaUserData for TaskReference {}
impl From<&Task> for TaskReference {
fn from(value: &Task) -> Self {
Self::new(value.kind, value.guid, value.queued_target)
}
}
/// A struct representing a task contained in the task scheduler /// A struct representing a task contained in the task scheduler
#[derive(Debug)] #[derive(Debug)]
pub struct Task { pub struct Task {
kind: TaskKind,
guid: usize,
thread: LuaRegistryKey, thread: LuaRegistryKey,
args: LuaRegistryKey, args: LuaRegistryKey,
queued_at: Instant, queued_at: Instant,
queued_target: Option<Instant>,
} }
/// A struct representing the current status of the task scheduler /// A struct representing the current status of the task scheduler
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub struct TaskSchedulerStatus { pub struct TaskSchedulerState {
pub exit_code: Option<ExitCode>, pub exit_code: Option<ExitCode>,
pub num_instant: usize, pub num_instant: usize,
pub num_deferred: usize, pub num_deferred: usize,
pub num_yielded: usize, pub num_future: usize,
pub num_total: usize, pub num_total: usize,
} }
impl fmt::Display for TaskSchedulerStatus { impl fmt::Display for TaskSchedulerState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!( write!(
f, f,
"TaskSchedulerStatus(\nInstant: {}\nDeferred: {}\nYielded: {}\nTotal: {})", "TaskSchedulerStatus(\nInstant: {}\nDeferred: {}\nYielded: {}\nTotal: {})",
self.num_instant, self.num_deferred, self.num_yielded, self.num_total self.num_instant, self.num_deferred, self.num_future, self.num_total
) )
} }
} }
#[derive(Debug, Clone)]
pub enum TaskSchedulerResult {
Finished {
state: TaskSchedulerState,
},
TaskErrored {
error: LuaError,
state: TaskSchedulerState,
},
TaskSuccessful {
state: TaskSchedulerState,
},
}
/// A task scheduler that implements task queues /// A task scheduler that implements task queues
/// with instant, deferred, and delayed tasks /// with instant, deferred, and delayed tasks
#[derive(Debug)] #[derive(Debug)]
pub struct TaskScheduler { pub struct TaskScheduler<'fut> {
lua: &'static Lua, lua: &'static Lua,
guid: AtomicUsize, guid: AtomicUsize,
running: bool,
tasks: Arc<Mutex<HashMap<TaskReference, Task>>>, tasks: Arc<Mutex<HashMap<TaskReference, Task>>>,
futures: Arc<AsyncMutex<FuturesUnordered<TaskFuture<'fut>>>>,
task_queue_instant: TaskSchedulerQueue, task_queue_instant: TaskSchedulerQueue,
task_queue_deferred: TaskSchedulerQueue, task_queue_deferred: TaskSchedulerQueue,
task_queue_yielded: TaskSchedulerQueue,
exit_code_set: AtomicBool, exit_code_set: AtomicBool,
exit_code: Arc<Mutex<ExitCode>>, exit_code: Arc<Mutex<ExitCode>>,
} }
impl TaskScheduler { impl<'fut> TaskScheduler<'fut> {
/**
Creates a new task scheduler.
*/
pub fn new(lua: &'static Lua) -> LuaResult<Self> { pub fn new(lua: &'static Lua) -> LuaResult<Self> {
Ok(Self { Ok(Self {
lua, lua,
guid: AtomicUsize::new(0), guid: AtomicUsize::new(0),
running: false,
tasks: Arc::new(Mutex::new(HashMap::new())), tasks: Arc::new(Mutex::new(HashMap::new())),
futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
task_queue_instant: Arc::new(Mutex::new(VecDeque::new())), task_queue_instant: Arc::new(Mutex::new(VecDeque::new())),
task_queue_deferred: Arc::new(Mutex::new(VecDeque::new())), task_queue_deferred: Arc::new(Mutex::new(VecDeque::new())),
task_queue_yielded: Arc::new(Mutex::new(VecDeque::new())),
exit_code_set: AtomicBool::new(false), exit_code_set: AtomicBool::new(false),
exit_code: Arc::new(Mutex::new(ExitCode::SUCCESS)), exit_code: Arc::new(Mutex::new(ExitCode::SUCCESS)),
}) })
} }
/**
Consumes and leaks the task scheduler,
returning a static reference `&'static TaskScheduler`.
This function is useful when the task scheduler object is
supposed to live for the remainder of the program's life.
Note that dropping the returned reference will cause a memory leak.
*/
pub fn into_static(self) -> &'static Self { pub fn into_static(self) -> &'static Self {
Box::leak(Box::new(self)) Box::leak(Box::new(self))
} }
pub fn status(&self) -> TaskSchedulerStatus { /**
let counts = { Gets the current state of the task scheduler.
(
self.task_queue_instant.lock().unwrap().len(), Panics if called during any of the task scheduler resumption phases.
self.task_queue_deferred.lock().unwrap().len(), */
self.task_queue_yielded.lock().unwrap().len(), pub fn state(&self) -> TaskSchedulerState {
) const MESSAGE: &str =
}; "Failed to get lock - make sure not to call during task scheduler resumption";
let num_total = counts.0 + counts.1 + counts.2; TaskSchedulerState {
let exit_code = if self.exit_code_set.load(Ordering::Relaxed) { exit_code: if self.exit_code_set.load(Ordering::Relaxed) {
Some(*self.exit_code.lock().unwrap()) Some(*self.exit_code.try_lock().expect(MESSAGE))
} else { } else {
None None
}; },
TaskSchedulerStatus { num_instant: self.task_queue_instant.try_lock().expect(MESSAGE).len(),
exit_code, num_deferred: self.task_queue_deferred.try_lock().expect(MESSAGE).len(),
num_instant: counts.0, num_future: self.futures.try_lock().expect(MESSAGE).len(),
num_deferred: counts.1, num_total: self.tasks.try_lock().expect(MESSAGE).len(),
num_yielded: counts.2,
num_total,
} }
} }
/**
Stores the exit code for the task scheduler.
This will be passed back to the Rust thread that is running the task scheduler,
in the [`TaskSchedulerState`] returned on resumption of the task scheduler queue.
Setting this exit code will signal to that thread that it
should stop resuming tasks, and gracefully terminate the program.
*/
pub fn set_exit_code(&self, code: ExitCode) { pub fn set_exit_code(&self, code: ExitCode) {
*self.exit_code.lock().unwrap() = code;
self.exit_code_set.store(true, Ordering::Relaxed); self.exit_code_set.store(true, Ordering::Relaxed);
*self.exit_code.lock().unwrap() = code
} }
fn schedule<'a>( /**
Creates a new task, storing a new Lua thread
for it, as well as the arguments to give the
thread on resumption, in the Lua registry.
*/
fn create_task<'a>(
&self, &self,
kind: TaskKind, kind: TaskKind,
tof: LuaValue<'a>, thread_or_function: LuaValue<'a>,
args: Option<LuaMultiValue<'a>>, thread_args: Option<LuaMultiValue<'a>>,
delay: Option<f64>,
) -> LuaResult<TaskReference> { ) -> LuaResult<TaskReference> {
// Get or create a thread from the given argument // Get or create a thread from the given argument
let task_thread = match tof { let task_thread = match thread_or_function {
LuaValue::Thread(t) => t, LuaValue::Thread(t) => t,
LuaValue::Function(f) => self.lua.create_thread(f)?, LuaValue::Function(f) => self.lua.create_thread(f)?,
value => { value => {
@ -179,138 +209,232 @@ impl TaskScheduler {
} }
}; };
// Store the thread and its arguments in the registry // Store the thread and its arguments in the registry
let task_args_vec = args.map(|opt| opt.into_vec()); let task_args_vec: Option<Vec<LuaValue>> = thread_args.map(|opt| opt.into_vec());
let task_thread_key = self.lua.create_registry_value(task_thread)?; let task_args_key: LuaRegistryKey = self.lua.create_registry_value(task_args_vec)?;
let task_args_key = self.lua.create_registry_value(task_args_vec)?; let task_thread_key: LuaRegistryKey = self.lua.create_registry_value(task_thread)?;
// Create the full task struct // Create the full task struct
let guid = self.guid.fetch_add(1, Ordering::Relaxed) + 1; let guid = self.guid.fetch_add(1, Ordering::Relaxed) + 1;
let queued_at = Instant::now(); let queued_at = Instant::now();
let queued_target = delay.map(|secs| queued_at + Duration::from_secs_f64(secs));
let task = Task { let task = Task {
kind,
guid,
thread: task_thread_key, thread: task_thread_key,
args: task_args_key, args: task_args_key,
queued_at, queued_at,
queued_target,
}; };
// Create the task ref (before adding the task to the scheduler)
let task_ref = TaskReference::from(&task);
// Add it to the scheduler // Add it to the scheduler
{ {
let mut tasks = self.tasks.lock().unwrap(); let mut tasks = self.tasks.lock().unwrap();
tasks.insert(task_ref, task); tasks.insert(TaskReference::new(kind, guid), task);
} }
Ok(TaskReference::new(kind, guid))
}
/**
Schedules a new task to run on the task scheduler.
When we want to schedule a task to resume instantly after the
currently running task we should pass `after_current_resume = true`.
This is useful in cases such as our task.spawn implementation:
```lua
task.spawn(function()
-- This will be a new task, but it should
-- also run right away, until the first yield
end)
-- Here we have either yielded or finished the above task
```
*/
fn schedule<'a>(
&self,
kind: TaskKind,
thread_or_function: LuaValue<'a>,
thread_args: Option<LuaMultiValue<'a>>,
after_current_resume: bool,
) -> LuaResult<TaskReference> {
if kind == TaskKind::Future {
panic!("Tried to schedule future using normal task schedule method")
}
let task_ref = self.create_task(kind, thread_or_function, thread_args)?;
match kind { match kind {
TaskKind::Instant => { TaskKind::Instant => {
// If we have a currently running task and we spawned an
// instant task here it should run right after the currently
// running task, so put it at the front of the task queue
let mut queue = self.task_queue_instant.lock().unwrap(); let mut queue = self.task_queue_instant.lock().unwrap();
if self.running { if after_current_resume {
queue.push_front(task_ref); assert!(
queue.len() > 0,
"Cannot schedule a task after the first instant when task queue is empty"
);
queue.insert(1, task_ref);
} else { } else {
queue.push_back(task_ref); queue.push_front(task_ref);
} }
} }
TaskKind::Deferred => { TaskKind::Deferred => {
// Deferred tasks should always schedule // Deferred tasks should always schedule at the end of the deferred queue
// at the very end of the deferred queue
let mut queue = self.task_queue_deferred.lock().unwrap(); let mut queue = self.task_queue_deferred.lock().unwrap();
queue.push_back(task_ref); queue.push_back(task_ref);
} }
TaskKind::Yielded => { TaskKind::Future => unreachable!(),
// Find the first task that is scheduled after this one and insert before it,
// this will ensure that our list of delayed tasks is sorted and we can grab
// the very first one to figure out how long to yield until the next cycle
let mut queue = self.task_queue_yielded.lock().unwrap();
let idx = queue
.iter()
.enumerate()
.find_map(|(idx, t)| {
if t.queued_target > queued_target {
Some(idx)
} else {
None
}
})
.unwrap_or(queue.len());
queue.insert(idx, task_ref);
}
} }
Ok(task_ref) Ok(task_ref)
} }
pub fn schedule_instant<'a>( pub fn schedule_current_resume<'a>(
&self, &self,
tof: LuaValue<'a>, thread_or_function: LuaValue<'a>,
args: LuaMultiValue<'a>, thread_args: LuaMultiValue<'a>,
) -> LuaResult<TaskReference> { ) -> LuaResult<TaskReference> {
self.schedule(TaskKind::Instant, tof, Some(args), None) self.schedule(
TaskKind::Instant,
thread_or_function,
Some(thread_args),
false,
)
}
pub fn schedule_after_current_resume<'a>(
&self,
thread_or_function: LuaValue<'a>,
thread_args: LuaMultiValue<'a>,
) -> LuaResult<TaskReference> {
self.schedule(
TaskKind::Instant,
thread_or_function,
Some(thread_args),
true,
)
} }
pub fn schedule_deferred<'a>( pub fn schedule_deferred<'a>(
&self, &self,
tof: LuaValue<'a>, thread_or_function: LuaValue<'a>,
args: LuaMultiValue<'a>, thread_args: LuaMultiValue<'a>,
) -> LuaResult<TaskReference> { ) -> LuaResult<TaskReference> {
self.schedule(TaskKind::Deferred, tof, Some(args), None) self.schedule(
TaskKind::Deferred,
thread_or_function,
Some(thread_args),
false,
)
} }
pub fn schedule_delayed<'a>( pub fn schedule_delayed<'a>(
&self, &self,
secs: f64, after_secs: f64,
tof: LuaValue<'a>, thread_or_function: LuaValue<'a>,
args: LuaMultiValue<'a>, thread_args: LuaMultiValue<'a>,
) -> LuaResult<TaskReference> { ) -> LuaResult<TaskReference> {
self.schedule(TaskKind::Yielded, tof, Some(args), Some(secs)) let task_ref = self.create_task(TaskKind::Future, thread_or_function, Some(thread_args))?;
let futs = self
.futures
.try_lock()
.expect("Failed to get lock on futures");
futs.push(Box::pin(async move {
sleep(Duration::from_secs_f64(after_secs)).await;
(task_ref, Ok(None))
}));
Ok(task_ref)
} }
pub fn resume_after(&self, secs: f64, thread: LuaThread<'_>) -> LuaResult<TaskReference> { pub fn schedule_wait(
self.schedule( &self,
TaskKind::Yielded, after_secs: f64,
LuaValue::Thread(thread), thread_or_function: LuaValue<'_>,
None, ) -> LuaResult<TaskReference> {
Some(secs), // TODO: Wait should inherit the guid of the current task,
) // this will ensure that TaskReferences are identical and
// that any waits inside of spawned tasks will also cancel
let task_ref = self.create_task(TaskKind::Future, thread_or_function, None)?;
let futs = self
.futures
.try_lock()
.expect("Failed to get lock on futures");
futs.push(Box::pin(async move {
sleep(Duration::from_secs_f64(after_secs)).await;
(task_ref, Ok(None))
}));
Ok(task_ref)
} }
pub fn cancel(&self, reference: TaskReference) -> bool { /**
let queue_mutex = match reference.kind { Checks if a task still exists in the scheduler.
TaskKind::Instant => &self.task_queue_instant,
TaskKind::Deferred => &self.task_queue_deferred, A task may no longer exist in the scheduler if it has been manually
TaskKind::Yielded => &self.task_queue_yielded, cancelled and removed by calling [`TaskScheduler::cancel_task()`].
}; */
let mut queue = queue_mutex.lock().unwrap(); #[allow(dead_code)]
let mut found = false; pub fn contains_task(&self, reference: TaskReference) -> bool {
queue.retain(|task| { let tasks = self.tasks.lock().unwrap();
if task.guid == reference.guid { tasks.contains_key(&reference)
found = true; }
false
/**
Cancels a task, if the task still exists in the scheduler.
It is possible to hold one or more task references that point
to a task that no longer exists in the scheduler, and calling
this method with one of those references will return `false`.
*/
pub fn cancel_task(&self, reference: TaskReference) -> LuaResult<bool> {
/*
Remove the task from the task list and the Lua registry
This is all we need to do since resume_task will always
ignore resumption of any task that no longer exists there
This does lead to having some amount of "junk" tasks and futures
built up in the queues but these will get cleaned up and not block
the program from exiting since the scheduler only runs until there
are no tasks left in the task list, the queues do not matter there
*/
let mut tasks = self.tasks.lock().unwrap();
if let Some(task) = tasks.remove(&reference) {
self.lua.remove_registry_value(task.thread)?;
self.lua.remove_registry_value(task.args)?;
Ok(true)
} else { } else {
true Ok(false)
} }
});
found
} }
pub fn resume_task(&self, reference: TaskReference) -> LuaResult<()> { /**
Resumes a task, if the task still exists in the scheduler.
A task may no longer exist in the scheduler if it has been manually
cancelled and removed by calling [`TaskScheduler::cancel_task()`].
This will be a no-op if the task no longer exists.
*/
pub fn resume_task(
&self,
reference: TaskReference,
override_args: Option<Vec<LuaValue>>,
) -> LuaResult<()> {
let task = { let task = {
let mut tasks = self.tasks.lock().unwrap(); let mut tasks = self.tasks.lock().unwrap();
match tasks.remove(&reference) { match tasks.remove(&reference) {
Some(task) => task, Some(task) => task,
None => { None => return Ok(()), // Task was removed
return Err(LuaError::RuntimeError(format!(
"Task does not exist in scheduler: {reference}"
)))
}
} }
}; };
let thread: LuaThread = self.lua.registry_value(&task.thread)?; let thread: LuaThread = self.lua.registry_value(&task.thread)?;
let args: Option<Vec<LuaValue>> = self.lua.registry_value(&task.args)?; let args = override_args.or_else(|| {
self.lua
.registry_value::<Option<Vec<LuaValue>>>(&task.args)
.expect("Failed to get stored args for task")
});
if let Some(args) = args { if let Some(args) = args {
thread.resume::<_, LuaMultiValue>(LuaMultiValue::from_vec(args))?; thread.resume::<_, LuaMultiValue>(LuaMultiValue::from_vec(args))?;
} else { } else {
/*
The tasks did not get any arguments from either:
- Providing arguments at the call site for creating the task
- Returning arguments from a future that created this task
The only tasks that do not get any arguments from either
of those sources are waiting tasks, and waiting tasks
want the amount of time waited returned to them.
*/
let elapsed = task.queued_at.elapsed().as_secs_f64(); let elapsed = task.queued_at.elapsed().as_secs_f64();
thread.resume::<_, LuaMultiValue>(elapsed)?; thread.resume::<_, LuaMultiValue>(elapsed)?;
} }
@ -319,88 +443,146 @@ impl TaskScheduler {
Ok(()) Ok(())
} }
/**
Retrieves the queue for a specific kind of task.
Panics for [`TaskKind::Future`] since
futures do not use the normal task queue.
*/
fn get_queue(&self, kind: TaskKind) -> &TaskSchedulerQueue { fn get_queue(&self, kind: TaskKind) -> &TaskSchedulerQueue {
match kind { match kind {
TaskKind::Instant => &self.task_queue_instant, TaskKind::Instant => &self.task_queue_instant,
TaskKind::Deferred => &self.task_queue_deferred, TaskKind::Deferred => &self.task_queue_deferred,
TaskKind::Yielded => &self.task_queue_yielded, TaskKind::Future => {
panic!("Future tasks do not use the normal task queue")
}
} }
} }
fn next_queue_task(&self, kind: TaskKind) -> Option<TaskReference> { /**
let task = { Checks if a future exists in the task queue.
let queue_guard = self.get_queue(kind).lock().unwrap();
queue_guard.front().copied() Panics if called during resumption of the futures task queue.
}; */
task fn next_queue_future_exists(&self) -> bool {
let futs = self.futures.try_lock().expect(
"Failed to get lock on futures - make sure not to call during futures resumption",
);
!futs.is_empty()
} }
fn resume_next_queue_task(&self, kind: TaskKind) -> Option<LuaResult<TaskSchedulerStatus>> { /**
Resumes the next queued Lua task, if one exists, blocking
the current thread until it either yields or finishes.
*/
fn resume_next_queue_task(
&self,
kind: TaskKind,
override_args: Option<Vec<LuaValue>>,
) -> TaskSchedulerResult {
match { match {
let mut queue_guard = self.get_queue(kind).lock().unwrap(); let mut queue_guard = self.get_queue(kind).lock().unwrap();
queue_guard.pop_front() queue_guard.pop_front()
} { } {
None => { None => {
let status = self.status(); let status = self.state();
if status.num_total > 0 { if status.num_total > 0 {
Some(Ok(status)) TaskSchedulerResult::TaskSuccessful {
state: self.state(),
}
} else { } else {
None TaskSchedulerResult::Finished {
state: self.state(),
} }
} }
Some(t) => match self.resume_task(t) { }
Ok(_) => Some(Ok(self.status())), Some(task) => match self.resume_task(task, override_args) {
Err(e) => Some(Err(e)), Ok(()) => TaskSchedulerResult::TaskSuccessful {
state: self.state(),
},
Err(task_err) => TaskSchedulerResult::TaskErrored {
error: task_err,
state: self.state(),
},
}, },
} }
} }
pub async fn resume_queue(&self) -> Option<LuaResult<TaskSchedulerStatus>> { /**
let now = Instant::now(); Awaits the first available queued future, and resumes its
let status = self.status(); associated Lua task which will then be ready for resumption.
Panics if there are no futures currently queued.
Use [`TaskScheduler::next_queue_future_exists`]
to check if there are any queued futures.
*/
async fn resume_next_queue_future(&self) -> TaskSchedulerResult {
let result = {
let mut futs = self
.futures
.try_lock()
.expect("Failed to get lock on futures");
futs.next()
.await
.expect("Tried to resume next queued future but none are queued")
};
match result {
(task, Err(fut_err)) => {
// Future errored, don't resume its associated task
// and make sure to cancel / remove it completely
let error_prefer_cancel = match self.cancel_task(task) {
Err(cancel_err) => cancel_err,
Ok(_) => fut_err,
};
TaskSchedulerResult::TaskErrored {
error: error_prefer_cancel,
state: self.state(),
}
}
(task, Ok(args)) => {
// Promote this future task to an instant task
// and resume the instant queue right away, taking
// care to not deadlock by dropping the mutex guard
let mut queue_guard = self.get_queue(TaskKind::Instant).lock().unwrap();
queue_guard.push_front(task);
drop(queue_guard);
self.resume_next_queue_task(TaskKind::Instant, args)
}
}
}
/**
Resumes the task scheduler queue.
This will run any spawned or deferred Lua tasks in a blocking manner.
Once all spawned and / or deferred Lua tasks have finished running,
this will process delayed tasks, waiting tasks, and native Rust
futures concurrently, awaiting the first one to be ready for resumption.
*/
pub async fn resume_queue(&self) -> TaskSchedulerResult {
let status = self.state();
/* /*
Resume tasks in the internal queue, in this order: Resume tasks in the internal queue, in this order:
1. Tasks from task.spawn, this includes the main thread 1. Tasks from task.spawn, this includes the main thread
2. Tasks from task.defer 2. Tasks from task.defer
3. Tasks from task.delay OR futures, whichever comes first 3. Tasks from task.delay / task.wait / native futures, first ready first resumed
4. Tasks from futures
*/ */
if status.num_instant > 0 { if status.num_instant > 0 {
self.resume_next_queue_task(TaskKind::Instant) self.resume_next_queue_task(TaskKind::Instant, None)
} else if status.num_deferred > 0 { } else if status.num_deferred > 0 {
self.resume_next_queue_task(TaskKind::Deferred) self.resume_next_queue_task(TaskKind::Deferred, None)
} else if status.num_yielded > 0 { } else {
// 3. Threads from task.delay or task.wait, futures // 3. Threads from task.delay or task.wait, futures
let next_yield_target = self if self.next_queue_future_exists() {
.next_queue_task(TaskKind::Yielded) self.resume_next_queue_future().await
.expect("Yielded task missing but status count is > 0")
.queued_target
.expect("Yielded task is missing queued target");
// Resume this yielding task if its target time has passed
if now >= next_yield_target {
self.resume_next_queue_task(TaskKind::Yielded)
} else { } else {
/* TaskSchedulerResult::Finished {
Await the first future to be ready state: self.state(),
}
- If it is the sleep fut then we will return and the next }
call to resume_queue will then resume that yielded task
- If it is a future then we resume the corresponding task
that is has stored in the future-specific task queue
*/
sleep(next_yield_target - now).await;
// TODO: Implement this, for now we only await sleep
// since the task scheduler doesn't support futures
Some(Ok(self.status()))
}
} else {
// 4. Just futures
// TODO: Await the first future to be ready
// and resume the corresponding task for it
None
} }
} }
} }

View file

@ -21,10 +21,13 @@ assert(not flag2, "Cancel should handle delayed threads")
local flag3: number = 1 local flag3: number = 1
local thread3 = task.spawn(function() local thread3 = task.spawn(function()
print("1")
task.wait(0.1) task.wait(0.1)
flag3 = 2 flag3 = 2
print("2")
task.wait(0.2) task.wait(0.2)
flag3 = 3 flag3 = 3
print("3")
end) end)
task.wait(0.2) task.wait(0.2)
task.cancel(thread3) task.cancel(thread3)