Support built-in coroutine global in new task scheduler

This commit is contained in:
Filip Tibell 2023-02-14 23:17:03 +01:00
parent 3990b8e064
commit 8d44e1e827
No known key found for this signature in database
9 changed files with 204 additions and 126 deletions

View file

@ -17,8 +17,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed ### Fixed
- Fixed `stdio.prompt` blocking all other lua threads while prompting for input - Fixed remaining edge cases where the `task` and `coroutine` libraries weren't interoperable
- Fixed `task.delay` keeping the script running even if it was cancelled using `task.cancel` - Fixed `task.delay` keeping the script running even if it was cancelled using `task.cancel`
- Fixed `stdio.prompt` blocking all other lua threads while prompting for input
## `0.4.0` - February 11th, 2023 ## `0.4.0` - February 11th, 2023

View file

@ -72,6 +72,17 @@ impl LuneGlobal {
matches!(self, Self::Require | Self::TopLevel) matches!(self, Self::Require | Self::TopLevel)
} }
/**
Checks if this Lune global is an injector.
An injector is similar to a proxy global but will inject
value(s) into the global lua environment during creation,
to ensure correct usage and compatibility with base Luau.
*/
pub fn is_injector(&self) -> bool {
matches!(self, Self::Task)
}
/** /**
Creates the [`mlua::Table`] value for this Lune global. Creates the [`mlua::Table`] value for this Lune global.

View file

@ -108,7 +108,7 @@ async fn net_serve<'a>(
lua.create_registry_value(handler) lua.create_registry_value(handler)
.expect("Failed to store websocket handler") .expect("Failed to store websocket handler")
}); });
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
// Bind first to make sure that we can bind to this address // Bind first to make sure that we can bind to this address
let bound = match Server::try_bind(&([127, 0, 0, 1], port).into()) { let bound = match Server::try_bind(&([127, 0, 0, 1], port).into()) {
Err(e) => { Err(e) => {

View file

@ -50,7 +50,7 @@ pub fn create(lua: &'static Lua, args_vec: Vec<String>) -> LuaResult<LuaTable> {
let process_exit_env_yield: LuaFunction = lua.named_registry_value("co.yield")?; let process_exit_env_yield: LuaFunction = lua.named_registry_value("co.yield")?;
let process_exit_env_exit: LuaFunction = lua.create_function(|lua, code: Option<u8>| { let process_exit_env_exit: LuaFunction = lua.create_function(|lua, code: Option<u8>| {
let exit_code = code.map_or(ExitCode::SUCCESS, ExitCode::from); let exit_code = code.map_or(ExitCode::SUCCESS, ExitCode::from);
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.set_exit_code(exit_code); sched.set_exit_code(exit_code);
Ok(()) Ok(())
})?; })?;

View file

@ -1,7 +1,7 @@
use mlua::prelude::*; use mlua::prelude::*;
use crate::{ use crate::{
lua::task::{TaskReference, TaskScheduler}, lua::task::{TaskKind, TaskReference, TaskScheduler},
utils::table::TableBuilder, utils::table::TableBuilder,
}; };
@ -18,19 +18,20 @@ return task
"#; "#;
pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> { pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
// Create task spawning functions that add tasks to the scheduler // Create a user-accessible function cancel tasks
let task_cancel = lua.create_function(|lua, task: TaskReference| { let task_cancel = lua.create_function(|lua, task: TaskReference| {
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.remove_task(task)?; sched.remove_task(task)?;
Ok(()) Ok(())
})?; })?;
// Create functions that manipulate non-blocking tasks in the scheduler
let task_defer = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| { let task_defer = lua.create_function(|lua, (tof, args): (LuaValue, LuaMultiValue)| {
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.schedule_deferred(tof, args) sched.schedule_deferred(tof, args)
})?; })?;
let task_delay = let task_delay =
lua.create_function(|lua, (secs, tof, args): (f64, LuaValue, LuaMultiValue)| { lua.create_function(|lua, (secs, tof, args): (f64, LuaValue, LuaMultiValue)| {
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.schedule_delayed(secs, tof, args) sched.schedule_delayed(secs, tof, args)
})?; })?;
// Create our task wait function, this is a bit different since // Create our task wait function, this is a bit different since
@ -47,7 +48,7 @@ pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
.with_function( .with_function(
"resume_after", "resume_after",
|lua, (thread, secs): (LuaThread, Option<f64>)| { |lua, (thread, secs): (LuaThread, Option<f64>)| {
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.schedule_wait(secs.unwrap_or(0f64), LuaValue::Thread(thread)) sched.schedule_wait(secs.unwrap_or(0f64), LuaValue::Thread(thread))
}, },
)? )?
@ -68,12 +69,12 @@ pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
.with_function( .with_function(
"resume_first", "resume_first",
|lua, (tof, args): (LuaValue, LuaMultiValue)| { |lua, (tof, args): (LuaValue, LuaMultiValue)| {
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.schedule_current_resume(tof, args) sched.schedule_current_resume(tof, args)
}, },
)? )?
.with_function("resume_second", |lua, thread: LuaThread| { .with_function("resume_second", |lua, thread: LuaThread| {
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.schedule_after_current_resume( sched.schedule_after_current_resume(
LuaValue::Thread(thread), LuaValue::Thread(thread),
LuaMultiValue::new(), LuaMultiValue::new(),
@ -106,6 +107,41 @@ pub fn create(lua: &'static Lua) -> LuaResult<LuaTable<'static>> {
})?; })?;
globals.set("type", type_proxy)?; globals.set("type", type_proxy)?;
globals.set("typeof", typeof_proxy)?; globals.set("typeof", typeof_proxy)?;
// Functions in the built-in coroutine library also need to be
// replaced, these are a bit different than the ones above because
// calling resume or the function that wrap returns must return
// whatever lua value(s) that the thread or task yielded back
let coroutine = globals.get::<_, LuaTable>("coroutine")?;
coroutine.set(
"resume",
lua.create_function(|lua, value: LuaValue| {
if let LuaValue::Thread(thread) = value {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
let task =
sched.create_task(TaskKind::Instant, LuaValue::Thread(thread), None, None)?;
sched.resume_task(task, None)
} else if let Ok(task) = TaskReference::from_lua(value, lua) {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.resume_task(task, None)
} else {
Err(LuaError::RuntimeError(
"Argument #1 must be a thread".to_string(),
))
}
})?,
)?;
coroutine.set(
"wrap",
lua.create_function(|lua, func: LuaFunction| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
let task =
sched.create_task(TaskKind::Instant, LuaValue::Function(func), None, None)?;
lua.create_function(move |lua, args: LuaMultiValue| {
let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.resume_task(task, Some(args.into_vec()))
})
})?,
)?;
// All good, return the task scheduler lib // All good, return the task scheduler lib
TableBuilder::new(lua)? TableBuilder::new(lua)?
.with_value("cancel", task_cancel)? .with_value("cancel", task_cancel)?

View file

@ -51,7 +51,7 @@ impl Service<Request<Body>> for NetServiceInner {
let _ws = ws.await.map_err(LuaError::external)?; let _ws = ws.await.map_err(LuaError::external)?;
// let sock = NetWebSocketServer::from(ws); // let sock = NetWebSocketServer::from(ws);
// let table = sock.into_lua_table(lua)?; // let table = sock.into_lua_table(lua)?;
// let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); // let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
// sched.schedule_current_resume( // sched.schedule_current_resume(
// LuaValue::Function(handler), // LuaValue::Function(handler),
// LuaMultiValue::from_vec(vec![LuaValue::Table(table)]), // LuaMultiValue::from_vec(vec![LuaValue::Table(table)]),

View file

@ -135,6 +135,9 @@ impl TaskSchedulerResult {
fn new(sched: &TaskScheduler) -> Self { fn new(sched: &TaskScheduler) -> Self {
const MESSAGE: &str = const MESSAGE: &str =
"Failed to get lock - make sure not to call during task scheduler resumption"; "Failed to get lock - make sure not to call during task scheduler resumption";
let num_instant = sched.task_queue_instant.try_lock().expect(MESSAGE).len();
let num_deferred = sched.task_queue_deferred.try_lock().expect(MESSAGE).len();
let num_active = num_instant + num_deferred;
Self { Self {
lua_error: None, lua_error: None,
exit_code: if sched.exit_code_set.load(Ordering::Relaxed) { exit_code: if sched.exit_code_set.load(Ordering::Relaxed) {
@ -142,11 +145,11 @@ impl TaskSchedulerResult {
} else { } else {
None None
}, },
num_instant: sched.task_queue_instant.try_lock().expect(MESSAGE).len(), num_instant,
num_deferred: sched.task_queue_deferred.try_lock().expect(MESSAGE).len(), num_deferred,
num_futures: sched.futures.try_lock().expect(MESSAGE).len(), num_futures: sched.futures.try_lock().expect(MESSAGE).len(),
num_background: sched.futures_in_background.load(Ordering::Relaxed), num_background: sched.futures_in_background.load(Ordering::Relaxed),
num_active: sched.tasks.try_lock().expect(MESSAGE).len(), num_active,
} }
} }
@ -176,18 +179,24 @@ impl TaskSchedulerResult {
Returns `true` if the task scheduler is still busy, Returns `true` if the task scheduler is still busy,
meaning it still has lua threads left to run. meaning it still has lua threads left to run.
*/ */
#[allow(dead_code)]
pub fn is_busy(&self) -> bool { pub fn is_busy(&self) -> bool {
self.num_active > 0 self.num_active > 0
} }
/**
Returns `true` if the task scheduler has finished all
blocking lua tasks, but still has yielding tasks running.
*/
pub fn is_yielding(&self) -> bool {
self.num_active == 0 && self.num_futures > 0
}
/** /**
Returns `true` if the task scheduler has finished all Returns `true` if the task scheduler has finished all
lua threads, but still has background tasks running. lua threads, but still has background tasks running.
*/ */
#[allow(dead_code)]
pub fn is_background(&self) -> bool { pub fn is_background(&self) -> bool {
self.num_active == 0 && self.num_background > 0 self.num_active == 0 && self.num_futures == 0 && self.num_background > 0
} }
/** /**
@ -196,7 +205,7 @@ impl TaskSchedulerResult {
no spawned tasks are running in the background. no spawned tasks are running in the background.
*/ */
pub fn is_done(&self) -> bool { pub fn is_done(&self) -> bool {
self.num_active == 0 && self.num_background == 0 self.num_active == 0 && self.num_futures == 0 && self.num_background == 0
} }
} }
@ -204,6 +213,8 @@ impl fmt::Display for TaskSchedulerResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let status = if self.is_busy() { let status = if self.is_busy() {
"Busy" "Busy"
} else if self.is_yielding() {
"Yielding"
} else if self.is_background() { } else if self.is_background() {
"Background" "Background"
} else { } else {
@ -360,12 +371,28 @@ impl<'fut> TaskScheduler<'fut> {
self.exit_code_set.store(true, Ordering::Relaxed); self.exit_code_set.store(true, Ordering::Relaxed);
} }
/**
Checks if a task still exists in the scheduler.
A task may no longer exist in the scheduler if it has been manually
cancelled and removed by calling [`TaskScheduler::cancel_task()`].
*/
#[allow(dead_code)]
pub fn contains_task(&self, reference: TaskReference) -> bool {
let tasks = self.tasks.lock().unwrap();
tasks.contains_key(&reference)
}
/** /**
Creates a new task, storing a new Lua thread Creates a new task, storing a new Lua thread
for it, as well as the arguments to give the for it, as well as the arguments to give the
thread on resumption, in the Lua registry. thread on resumption, in the Lua registry.
Note that this task will ***not*** resume on its
own, it needs to be used together with either the
scheduling functions or [`TaskScheduler::resume_task`].
*/ */
fn create_task( pub fn create_task(
&self, &self,
kind: TaskKind, kind: TaskKind,
thread_or_function: LuaValue<'_>, thread_or_function: LuaValue<'_>,
@ -409,6 +436,95 @@ impl<'fut> TaskScheduler<'fut> {
Ok(task_ref) Ok(task_ref)
} }
/**
Cancels a task, if the task still exists in the scheduler.
It is possible to hold one or more task references that point
to a task that no longer exists in the scheduler, and calling
this method with one of those references will return `false`.
*/
pub fn remove_task(&self, reference: TaskReference) -> LuaResult<bool> {
/*
Remove the task from the task list and the Lua registry
This is all we need to do since resume_task will always
ignore resumption of any task that no longer exists there
This does lead to having some amount of "junk" futures that will
build up in the queue but these will get cleaned up and not block
the program from exiting since the scheduler only runs until there
are no tasks left in the task list, the futures do not matter there
*/
let mut found = false;
let mut tasks = self.tasks.lock().unwrap();
// Unfortunately we have to loop through to find which task
// references to remove instead of removing directly since
// tasks can switch kinds between instant, deferred, future
let tasks_to_remove: Vec<_> = tasks
.keys()
.filter(|task_ref| task_ref.guid == reference.guid)
.copied()
.collect();
for task_ref in tasks_to_remove {
if let Some(task) = tasks.remove(&task_ref) {
self.lua.remove_registry_value(task.thread)?;
self.lua.remove_registry_value(task.args)?;
found = true;
}
}
Ok(found)
}
/**
Resumes a task, if the task still exists in the scheduler.
A task may no longer exist in the scheduler if it has been manually
cancelled and removed by calling [`TaskScheduler::cancel_task()`].
This will be a no-op if the task no longer exists.
*/
pub fn resume_task<'a>(
&self,
reference: TaskReference,
override_args: Option<Vec<LuaValue<'a>>>,
) -> LuaResult<LuaMultiValue<'a>> {
self.guid_running_task
.store(reference.guid, Ordering::Relaxed);
let task = {
let mut tasks = self.tasks.lock().unwrap();
match tasks.remove(&reference) {
Some(task) => task,
None => return Ok(LuaMultiValue::new()), // Task was removed
}
};
let thread: LuaThread = self.lua.registry_value(&task.thread)?;
let args_vec_opt = override_args.or_else(|| {
self.lua
.registry_value::<Option<Vec<LuaValue>>>(&task.args)
.expect("Failed to get stored args for task")
});
self.lua.remove_registry_value(task.thread)?;
self.lua.remove_registry_value(task.args)?;
let rets = if let Some(args) = args_vec_opt {
thread.resume::<_, LuaMultiValue>(LuaMultiValue::from_vec(args))
} else {
/*
The tasks did not get any arguments from either:
- Providing arguments at the call site for creating the task
- Returning arguments from a future that created this task
The only tasks that do not get any arguments from either
of those sources are waiting tasks, and waiting tasks
want the amount of time waited returned to them.
*/
let elapsed = task.queued_at.elapsed().as_secs_f64();
thread.resume::<_, LuaMultiValue>(elapsed)
};
self.guid_running_task.store(0, Ordering::Relaxed);
rets
}
/** /**
Queues a new task to run on the task scheduler. Queues a new task to run on the task scheduler.
@ -618,7 +734,12 @@ impl<'fut> TaskScheduler<'fut> {
// which ensures that the TaskReference is identical and // which ensures that the TaskReference is identical and
// that any waits inside of spawned tasks will also cancel // that any waits inside of spawned tasks will also cancel
match self.guid_running_task.load(Ordering::Relaxed) { match self.guid_running_task.load(Ordering::Relaxed) {
0 => panic!("Tried to schedule waiting task with no task running"), 0 => {
// NOTE: We had this here to verify the behavior of our task scheduler during development,
// but for re-implementing coroutine.resume (which is not registered) we must not panic here
// panic!("Tried to schedule waiting task with no registered task running")
None
}
guid => Some(guid), guid => Some(guid),
}, },
async move { async move {
@ -666,7 +787,7 @@ impl<'fut> TaskScheduler<'fut> {
"resume_async", "resume_async",
move |lua: &Lua, (thread, args): (LuaThread, A)| { move |lua: &Lua, (thread, args): (LuaThread, A)| {
let fut = func(lua, args); let fut = func(lua, args);
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = lua.app_data_ref::<&TaskScheduler>().unwrap();
sched.schedule_async(LuaValue::Thread(thread), async { sched.schedule_async(LuaValue::Thread(thread), async {
let rets = fut.await?; let rets = fut.await?;
let mult = rets.to_lua_multi(lua)?; let mult = rets.to_lua_multi(lua)?;
@ -679,107 +800,6 @@ impl<'fut> TaskScheduler<'fut> {
.into_function() .into_function()
} }
/**
Checks if a task still exists in the scheduler.
A task may no longer exist in the scheduler if it has been manually
cancelled and removed by calling [`TaskScheduler::cancel_task()`].
*/
#[allow(dead_code)]
pub fn contains_task(&self, reference: TaskReference) -> bool {
let tasks = self.tasks.lock().unwrap();
tasks.contains_key(&reference)
}
/**
Cancels a task, if the task still exists in the scheduler.
It is possible to hold one or more task references that point
to a task that no longer exists in the scheduler, and calling
this method with one of those references will return `false`.
*/
pub fn remove_task(&self, reference: TaskReference) -> LuaResult<bool> {
/*
Remove the task from the task list and the Lua registry
This is all we need to do since resume_task will always
ignore resumption of any task that no longer exists there
This does lead to having some amount of "junk" futures that will
build up in the queue but these will get cleaned up and not block
the program from exiting since the scheduler only runs until there
are no tasks left in the task list, the futures do not matter there
*/
let mut found = false;
let mut tasks = self.tasks.lock().unwrap();
// Unfortunately we have to loop through to find which task
// references to remove instead of removing directly since
// tasks can switch kinds between instant, deferred, future
let tasks_to_remove: Vec<_> = tasks
.keys()
.filter(|task_ref| task_ref.guid == reference.guid)
.copied()
.collect();
for task_ref in tasks_to_remove {
if let Some(task) = tasks.remove(&task_ref) {
self.lua.remove_registry_value(task.thread)?;
self.lua.remove_registry_value(task.args)?;
found = true;
}
}
Ok(found)
}
/**
Resumes a task, if the task still exists in the scheduler.
A task may no longer exist in the scheduler if it has been manually
cancelled and removed by calling [`TaskScheduler::cancel_task()`].
This will be a no-op if the task no longer exists.
*/
pub fn resume_task(
&self,
reference: TaskReference,
override_args: Option<Vec<LuaValue>>,
) -> LuaResult<()> {
self.guid_running_task
.store(reference.guid, Ordering::Relaxed);
let task = {
let mut tasks = self.tasks.lock().unwrap();
match tasks.remove(&reference) {
Some(task) => task,
None => return Ok(()), // Task was removed
}
};
let thread: LuaThread = self.lua.registry_value(&task.thread)?;
let args_vec_opt = override_args.or_else(|| {
self.lua
.registry_value::<Option<Vec<LuaValue>>>(&task.args)
.expect("Failed to get stored args for task")
});
self.lua.remove_registry_value(task.thread)?;
self.lua.remove_registry_value(task.args)?;
if let Some(args) = args_vec_opt {
thread.resume::<_, LuaMultiValue>(LuaMultiValue::from_vec(args))?;
} else {
/*
The tasks did not get any arguments from either:
- Providing arguments at the call site for creating the task
- Returning arguments from a future that created this task
The only tasks that do not get any arguments from either
of those sources are waiting tasks, and waiting tasks
want the amount of time waited returned to them.
*/
let elapsed = task.queued_at.elapsed().as_secs_f64();
thread.resume::<_, LuaMultiValue>(elapsed)?;
}
self.guid_running_task.store(0, Ordering::Relaxed);
Ok(())
}
/** /**
Registers a new background task with the task scheduler. Registers a new background task with the task scheduler.
@ -838,7 +858,7 @@ impl<'fut> TaskScheduler<'fut> {
} { } {
None => TaskSchedulerResult::new(self), None => TaskSchedulerResult::new(self),
Some(task) => match self.resume_task(task, override_args) { Some(task) => match self.resume_task(task, override_args) {
Ok(()) => TaskSchedulerResult::new(self), Ok(_) => TaskSchedulerResult::new(self),
Err(task_err) => TaskSchedulerResult::err(self, task_err), Err(task_err) => TaskSchedulerResult::err(self, task_err),
}, },
} }

View file

@ -78,7 +78,7 @@ impl TableBuilder {
F: 'static + Fn(&'static Lua, A) -> FR, F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>, FR: 'static + Future<Output = LuaResult<R>>,
{ {
let sched = self.lua.app_data_mut::<&TaskScheduler>().unwrap(); let sched = self.lua.app_data_ref::<&TaskScheduler>().unwrap();
let func = sched.make_scheduled_async_fn(func)?; let func = sched.make_scheduled_async_fn(func)?;
self.with_value(key, LuaValue::Function(func)) self.with_value(key, LuaValue::Function(func))
} }

View file

@ -44,7 +44,8 @@ measure(1 / 30)
measure(1 / 20) measure(1 / 20)
measure(1 / 10) measure(1 / 10)
-- Wait should work in other threads, too -- Wait should work in other threads, including
-- ones created by the built-in coroutine library
local flag: boolean = false local flag: boolean = false
task.spawn(function() task.spawn(function()
@ -63,3 +64,12 @@ end))
assert(not flag2, "Wait failed while inside coroutine (1)") assert(not flag2, "Wait failed while inside coroutine (1)")
task.wait(0.2) task.wait(0.2)
assert(flag2, "Wait failed while inside coroutine (2)") assert(flag2, "Wait failed while inside coroutine (2)")
local flag3: boolean = false
coroutine.wrap(function()
task.wait(0.1)
flag3 = true
end)()
assert(not flag3, "Wait failed while inside wrap (1)")
task.wait(0.2)
assert(flag3, "Wait failed while inside wrap (2)")