mirror of
https://github.com/lune-org/mlua-luau-scheduler.git
synced 2025-04-10 21:40:55 +01:00
Implement exit codes for runtime
This commit is contained in:
parent
a5ae251fa3
commit
4117cfba75
8 changed files with 199 additions and 19 deletions
|
@ -50,6 +50,10 @@ test = true
|
||||||
name = "callbacks"
|
name = "callbacks"
|
||||||
test = true
|
test = true
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "exit_code"
|
||||||
|
test = true
|
||||||
|
|
||||||
[[example]]
|
[[example]]
|
||||||
name = "lots_of_threads"
|
name = "lots_of_threads"
|
||||||
test = true
|
test = true
|
||||||
|
|
68
examples/exit_code.rs
Normal file
68
examples/exit_code.rs
Normal file
|
@ -0,0 +1,68 @@
|
||||||
|
#![allow(clippy::missing_errors_doc)]
|
||||||
|
#![allow(clippy::missing_panics_doc)]
|
||||||
|
|
||||||
|
use std::process::ExitCode;
|
||||||
|
|
||||||
|
use async_io::block_on;
|
||||||
|
|
||||||
|
use mlua::prelude::*;
|
||||||
|
use mlua_luau_runtime::{LuaRuntimeExt, Runtime};
|
||||||
|
|
||||||
|
const MAIN_SCRIPT: &str = include_str!("./lua/exit_code.luau");
|
||||||
|
|
||||||
|
const EXIT_IMPL_LUA: &str = r"
|
||||||
|
exit(...)
|
||||||
|
yield()
|
||||||
|
";
|
||||||
|
|
||||||
|
pub fn main() -> LuaResult<()> {
|
||||||
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
|
// Set up persistent Lua environment
|
||||||
|
let lua = Lua::new();
|
||||||
|
|
||||||
|
// Note that our exit function is partially implemented in Lua
|
||||||
|
// because we need to also yield the thread that called it, this
|
||||||
|
// is not possible to do in Rust because of crossing C-call boundary
|
||||||
|
let exit_fn_env = lua.create_table_from(vec![
|
||||||
|
(
|
||||||
|
"exit",
|
||||||
|
lua.create_function(|lua, code: Option<u8>| {
|
||||||
|
let code = code.map(ExitCode::from).unwrap_or_default();
|
||||||
|
lua.set_exit_code(code);
|
||||||
|
Ok(())
|
||||||
|
})?,
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"yield",
|
||||||
|
lua.globals()
|
||||||
|
.get::<_, LuaTable>("coroutine")?
|
||||||
|
.get::<_, LuaFunction>("yield")?,
|
||||||
|
),
|
||||||
|
])?;
|
||||||
|
|
||||||
|
let exit_fn = lua
|
||||||
|
.load(EXIT_IMPL_LUA)
|
||||||
|
.set_environment(exit_fn_env)
|
||||||
|
.into_function()?;
|
||||||
|
lua.globals().set("exit", exit_fn)?;
|
||||||
|
|
||||||
|
// Load the main script into a runtime
|
||||||
|
let rt = Runtime::new(&lua);
|
||||||
|
let main = lua.load(MAIN_SCRIPT);
|
||||||
|
rt.push_thread_front(main, ())?;
|
||||||
|
|
||||||
|
// Run until completion
|
||||||
|
block_on(rt.run());
|
||||||
|
|
||||||
|
// Verify that we got a correct exit code
|
||||||
|
let code = rt.get_exit_code().unwrap_or_default();
|
||||||
|
assert!(format!("{code:?}").contains("(1)"));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_exit_code() -> LuaResult<()> {
|
||||||
|
main()
|
||||||
|
}
|
8
examples/lua/exit_code.luau
Normal file
8
examples/lua/exit_code.luau
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
--!nocheck
|
||||||
|
--!nolint UnknownGlobal
|
||||||
|
|
||||||
|
print("Setting exit code manually")
|
||||||
|
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
error("unreachable")
|
31
lib/exit.rs
Normal file
31
lib/exit.rs
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
use std::{cell::Cell, process::ExitCode, rc::Rc};
|
||||||
|
|
||||||
|
use event_listener::Event;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub(crate) struct Exit {
|
||||||
|
code: Rc<Cell<Option<ExitCode>>>,
|
||||||
|
event: Rc<Event>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Exit {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
code: Rc::new(Cell::new(None)),
|
||||||
|
event: Rc::new(Event::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set(&self, code: ExitCode) {
|
||||||
|
self.code.set(Some(code));
|
||||||
|
self.event.notify(usize::MAX);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self) -> Option<ExitCode> {
|
||||||
|
self.code.get()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn listen(&self) {
|
||||||
|
self.event.listen().await;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,5 @@
|
||||||
mod error_callback;
|
mod error_callback;
|
||||||
|
mod exit;
|
||||||
mod functions;
|
mod functions;
|
||||||
mod queue;
|
mod queue;
|
||||||
mod result_map;
|
mod result_map;
|
||||||
|
|
14
lib/queue.rs
14
lib/queue.rs
|
@ -1,4 +1,4 @@
|
||||||
use std::{pin::Pin, rc::Rc, sync::Arc};
|
use std::{pin::Pin, rc::Rc};
|
||||||
|
|
||||||
use concurrent_queue::ConcurrentQueue;
|
use concurrent_queue::ConcurrentQueue;
|
||||||
use derive_more::{Deref, DerefMut};
|
use derive_more::{Deref, DerefMut};
|
||||||
|
@ -16,14 +16,14 @@ use crate::{traits::IntoLuaThread, util::ThreadWithArgs, ThreadId};
|
||||||
*/
|
*/
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub(crate) struct ThreadQueue {
|
pub(crate) struct ThreadQueue {
|
||||||
queue: Arc<ConcurrentQueue<ThreadWithArgs>>,
|
queue: Rc<ConcurrentQueue<ThreadWithArgs>>,
|
||||||
event: Arc<Event>,
|
event: Rc<Event>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ThreadQueue {
|
impl ThreadQueue {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
let queue = Arc::new(ConcurrentQueue::unbounded());
|
let queue = Rc::new(ConcurrentQueue::unbounded());
|
||||||
let event = Arc::new(Event::new());
|
let event = Rc::new(Event::new());
|
||||||
Self { queue, event }
|
Self { queue, event }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,13 +98,13 @@ pub type LocalBoxFuture<'fut> = Pin<Box<dyn Future<Output = ()> + 'fut>>;
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub(crate) struct FuturesQueue<'fut> {
|
pub(crate) struct FuturesQueue<'fut> {
|
||||||
queue: Rc<ConcurrentQueue<LocalBoxFuture<'fut>>>,
|
queue: Rc<ConcurrentQueue<LocalBoxFuture<'fut>>>,
|
||||||
event: Arc<Event>,
|
event: Rc<Event>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'fut> FuturesQueue<'fut> {
|
impl<'fut> FuturesQueue<'fut> {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
let queue = Rc::new(ConcurrentQueue::unbounded());
|
let queue = Rc::new(ConcurrentQueue::unbounded());
|
||||||
let event = Arc::new(Event::new());
|
let event = Rc::new(Event::new());
|
||||||
Self { queue, event }
|
Self { queue, event }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
cell::Cell,
|
cell::Cell,
|
||||||
|
process::ExitCode,
|
||||||
rc::{Rc, Weak as WeakRc},
|
rc::{Rc, Weak as WeakRc},
|
||||||
sync::{Arc, Weak as WeakArc},
|
sync::{Arc, Weak as WeakArc},
|
||||||
};
|
};
|
||||||
|
@ -14,6 +15,7 @@ use tracing::Instrument;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
error_callback::ThreadErrorCallback,
|
error_callback::ThreadErrorCallback,
|
||||||
|
exit::Exit,
|
||||||
queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue},
|
queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue},
|
||||||
result_map::ThreadResultMap,
|
result_map::ThreadResultMap,
|
||||||
status::Status,
|
status::Status,
|
||||||
|
@ -48,6 +50,7 @@ pub struct Runtime<'lua> {
|
||||||
error_callback: ThreadErrorCallback,
|
error_callback: ThreadErrorCallback,
|
||||||
result_map: ThreadResultMap,
|
result_map: ThreadResultMap,
|
||||||
status: Rc<Cell<Status>>,
|
status: Rc<Cell<Status>>,
|
||||||
|
exit: Exit,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'lua> Runtime<'lua> {
|
impl<'lua> Runtime<'lua> {
|
||||||
|
@ -66,6 +69,7 @@ impl<'lua> Runtime<'lua> {
|
||||||
let queue_defer = DeferredThreadQueue::new();
|
let queue_defer = DeferredThreadQueue::new();
|
||||||
let error_callback = ThreadErrorCallback::default();
|
let error_callback = ThreadErrorCallback::default();
|
||||||
let result_map = ThreadResultMap::new();
|
let result_map = ThreadResultMap::new();
|
||||||
|
let exit = Exit::new();
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
lua.app_data_ref::<SpawnedThreadQueue>().is_none(),
|
lua.app_data_ref::<SpawnedThreadQueue>().is_none(),
|
||||||
|
@ -83,11 +87,16 @@ impl<'lua> Runtime<'lua> {
|
||||||
lua.app_data_ref::<ThreadResultMap>().is_none(),
|
lua.app_data_ref::<ThreadResultMap>().is_none(),
|
||||||
"{ERR_METADATA_ALREADY_ATTACHED}"
|
"{ERR_METADATA_ALREADY_ATTACHED}"
|
||||||
);
|
);
|
||||||
|
assert!(
|
||||||
|
lua.app_data_ref::<Exit>().is_none(),
|
||||||
|
"{ERR_METADATA_ALREADY_ATTACHED}"
|
||||||
|
);
|
||||||
|
|
||||||
lua.set_app_data(queue_spawn.clone());
|
lua.set_app_data(queue_spawn.clone());
|
||||||
lua.set_app_data(queue_defer.clone());
|
lua.set_app_data(queue_defer.clone());
|
||||||
lua.set_app_data(error_callback.clone());
|
lua.set_app_data(error_callback.clone());
|
||||||
lua.set_app_data(result_map.clone());
|
lua.set_app_data(result_map.clone());
|
||||||
|
lua.set_app_data(exit.clone());
|
||||||
|
|
||||||
let status = Rc::new(Cell::new(Status::NotStarted));
|
let status = Rc::new(Cell::new(Status::NotStarted));
|
||||||
|
|
||||||
|
@ -98,6 +107,7 @@ impl<'lua> Runtime<'lua> {
|
||||||
error_callback,
|
error_callback,
|
||||||
result_map,
|
result_map,
|
||||||
status,
|
status,
|
||||||
|
exit,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,6 +155,23 @@ impl<'lua> Runtime<'lua> {
|
||||||
self.error_callback.clear();
|
self.error_callback.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
Gets the exit code for this runtime, if one has been set.
|
||||||
|
*/
|
||||||
|
#[must_use]
|
||||||
|
pub fn get_exit_code(&self) -> Option<ExitCode> {
|
||||||
|
self.exit.get()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
Sets the exit code for this runtime.
|
||||||
|
|
||||||
|
This will cause [`Runtime::run`] to exit immediately.
|
||||||
|
*/
|
||||||
|
pub fn set_exit_code(&self, code: ExitCode) {
|
||||||
|
self.exit.set(code);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Spawns a chunk / function / thread onto the runtime queue.
|
Spawns a chunk / function / thread onto the runtime queue.
|
||||||
|
|
||||||
|
@ -276,10 +303,11 @@ impl<'lua> Runtime<'lua> {
|
||||||
Manually tick the Lua executor, while running under the main executor.
|
Manually tick the Lua executor, while running under the main executor.
|
||||||
Each tick we wait for the next action to perform, in prioritized order:
|
Each tick we wait for the next action to perform, in prioritized order:
|
||||||
|
|
||||||
1. A Lua thread is available to run on the spawned queue
|
1. The exit event is triggered by setting an exit code
|
||||||
2. A Lua thread is available to run on the deferred queue
|
2. A Lua thread is available to run on the spawned queue
|
||||||
3. A new thread-local future is available to run on the local executor
|
3. A Lua thread is available to run on the deferred queue
|
||||||
4. Task(s) scheduled on the Lua executor have made progress and should be polled again
|
4. A new thread-local future is available to run on the local executor
|
||||||
|
5. Task(s) scheduled on the Lua executor have made progress and should be polled again
|
||||||
|
|
||||||
This ordering is vital to ensure that we don't accidentally exit the main loop
|
This ordering is vital to ensure that we don't accidentally exit the main loop
|
||||||
when there are new Lua threads to enqueue and potentially more work to be done.
|
when there are new Lua threads to enqueue and potentially more work to be done.
|
||||||
|
@ -315,11 +343,12 @@ impl<'lua> Runtime<'lua> {
|
||||||
};
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let fut_spawn = self.queue_spawn.wait_for_item(); // 1
|
let fut_exit = self.exit.listen(); // 1
|
||||||
let fut_defer = self.queue_defer.wait_for_item(); // 2
|
let fut_spawn = self.queue_spawn.wait_for_item(); // 2
|
||||||
let fut_futs = fut_queue.wait_for_item(); // 3
|
let fut_defer = self.queue_defer.wait_for_item(); // 3
|
||||||
|
let fut_futs = fut_queue.wait_for_item(); // 4
|
||||||
|
|
||||||
// 4
|
// 5
|
||||||
let mut num_processed = 0;
|
let mut num_processed = 0;
|
||||||
let span_tick = tracing::debug_span!("tick_executor");
|
let span_tick = tracing::debug_span!("tick_executor");
|
||||||
let fut_tick = async {
|
let fut_tick = async {
|
||||||
|
@ -331,13 +360,20 @@ impl<'lua> Runtime<'lua> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// 1 + 2 + 3 + 4
|
// 1 + 2 + 3 + 4 + 5
|
||||||
fut_spawn
|
fut_exit
|
||||||
|
.or(fut_spawn)
|
||||||
.or(fut_defer)
|
.or(fut_defer)
|
||||||
.or(fut_futs)
|
.or(fut_futs)
|
||||||
.or(fut_tick.instrument(span_tick.or_current()))
|
.or(fut_tick.instrument(span_tick.or_current()))
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
// Check if we should exit
|
||||||
|
if self.exit.get().is_some() {
|
||||||
|
tracing::trace!("exited with code");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// Emit traces
|
// Emit traces
|
||||||
if num_processed > 0 {
|
if num_processed > 0 {
|
||||||
tracing::trace!(num_processed, "tasks_processed");
|
tracing::trace!(num_processed, "tasks_processed");
|
||||||
|
@ -410,5 +446,8 @@ impl Drop for Runtime<'_> {
|
||||||
self.lua
|
self.lua
|
||||||
.remove_app_data::<ThreadResultMap>()
|
.remove_app_data::<ThreadResultMap>()
|
||||||
.expect(ERR_METADATA_REMOVED);
|
.expect(ERR_METADATA_REMOVED);
|
||||||
|
self.lua
|
||||||
|
.remove_app_data::<Exit>()
|
||||||
|
.expect(ERR_METADATA_REMOVED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,16 @@
|
||||||
#![allow(unused_imports)]
|
#![allow(unused_imports)]
|
||||||
#![allow(clippy::missing_errors_doc)]
|
#![allow(clippy::missing_errors_doc)]
|
||||||
|
|
||||||
use std::{future::Future, rc::Weak as WeakRc, sync::Weak as WeakArc};
|
use std::{
|
||||||
|
cell::Cell, future::Future, process::ExitCode, rc::Weak as WeakRc, sync::Weak as WeakArc,
|
||||||
|
};
|
||||||
|
|
||||||
use mlua::prelude::*;
|
use mlua::prelude::*;
|
||||||
|
|
||||||
use async_executor::{Executor, Task};
|
use async_executor::{Executor, Task};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
exit::Exit,
|
||||||
queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue},
|
queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue},
|
||||||
result_map::ThreadResultMap,
|
result_map::ThreadResultMap,
|
||||||
runtime::Runtime,
|
runtime::Runtime,
|
||||||
|
@ -61,9 +64,28 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Trait for scheduling Lua threads and spawning futures on the current executor.
|
Trait for interacting with the current [`Runtime`].
|
||||||
|
|
||||||
|
Provides extra methods on the [`Lua`] struct for:
|
||||||
|
|
||||||
|
- Setting the exit code and forcibly stopping the runtime
|
||||||
|
- Pushing (spawning) and deferring (pushing to the back) lua threads
|
||||||
|
- Tracking and getting the result of lua threads
|
||||||
|
- Spawning thread-local (`!Send`) futures on the current executor
|
||||||
|
- Spawning background (`Send`) futures on the current executor
|
||||||
*/
|
*/
|
||||||
pub trait LuaRuntimeExt<'lua> {
|
pub trait LuaRuntimeExt<'lua> {
|
||||||
|
/**
|
||||||
|
Sets the exit code of the current runtime.
|
||||||
|
|
||||||
|
See [`Runtime::set_exit_code`] for more information.
|
||||||
|
|
||||||
|
# Panics
|
||||||
|
|
||||||
|
Panics if called outside of a running [`Runtime`].
|
||||||
|
*/
|
||||||
|
fn set_exit_code(&self, code: ExitCode);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Pushes (spawns) a lua thread to the **front** of the current runtime.
|
Pushes (spawns) a lua thread to the **front** of the current runtime.
|
||||||
|
|
||||||
|
@ -206,6 +228,13 @@ pub trait LuaRuntimeExt<'lua> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'lua> LuaRuntimeExt<'lua> for Lua {
|
impl<'lua> LuaRuntimeExt<'lua> for Lua {
|
||||||
|
fn set_exit_code(&self, code: ExitCode) {
|
||||||
|
let exit = self
|
||||||
|
.app_data_ref::<Exit>()
|
||||||
|
.expect("exit code can only be set within a runtime");
|
||||||
|
exit.set(code);
|
||||||
|
}
|
||||||
|
|
||||||
fn push_thread_front(
|
fn push_thread_front(
|
||||||
&'lua self,
|
&'lua self,
|
||||||
thread: impl IntoLuaThread<'lua>,
|
thread: impl IntoLuaThread<'lua>,
|
||||||
|
|
Loading…
Add table
Reference in a new issue