Make it easier to construct functions and spawn local futures on lua runtime

This commit is contained in:
Filip Tibell 2024-01-28 20:14:10 +01:00
parent 81308b65e3
commit f4ecf7e018
No known key found for this signature in database
8 changed files with 328 additions and 133 deletions

View file

@ -26,7 +26,19 @@ pub fn main() -> LuaResult<()> {
Err(e) => Err(e),
}
});
task.await.into_lua_err()
// Wait for it to complete
let result = task.await.into_lua_err();
// We can also spawn local tasks that do take up resources
// on the Lua thread, but that do not have the Send bound
if result.is_ok() {
lua.spawn_local(async move {
println!("File read successfully!");
});
}
result
})?,
)?;

View file

@ -5,7 +5,7 @@ use std::time::Duration;
use async_io::{block_on, Timer};
use mlua::prelude::*;
use mlua_luau_runtime::Runtime;
use mlua_luau_runtime::{Functions, Runtime};
const MAIN_SCRIPT: &str = include_str!("./lua/lots_of_threads.luau");
@ -17,9 +17,9 @@ pub fn main() -> LuaResult<()> {
// Set up persistent Lua environment
let lua = Lua::new();
let rt = Runtime::new(&lua);
let fns = Functions::new(&lua)?;
let rt_fns = rt.create_functions()?;
lua.globals().set("spawn", rt_fns.spawn)?;
lua.globals().set("spawn", fns.spawn)?;
lua.globals().set(
"sleep",
lua.create_async_function(|_, ()| async move {

View file

@ -6,7 +6,7 @@ use std::time::{Duration, Instant};
use async_io::{block_on, Timer};
use mlua::prelude::*;
use mlua_luau_runtime::Runtime;
use mlua_luau_runtime::{Functions, Runtime};
const MAIN_SCRIPT: &str = include_str!("./lua/scheduler_ordering.luau");
@ -16,10 +16,10 @@ pub fn main() -> LuaResult<()> {
// Set up persistent Lua environment
let lua = Lua::new();
let rt = Runtime::new(&lua);
let fns = Functions::new(&lua)?;
let rt_fns = rt.create_functions()?;
lua.globals().set("spawn", rt_fns.spawn)?;
lua.globals().set("defer", rt_fns.defer)?;
lua.globals().set("spawn", fns.spawn)?;
lua.globals().set("defer", fns.defer)?;
lua.globals().set(
"sleep",
lua.create_async_function(|_, duration: Option<f64>| async move {

120
lib/functions.rs Normal file
View file

@ -0,0 +1,120 @@
#![allow(unused_imports)]
#![allow(clippy::module_name_repetitions)]
use mlua::prelude::*;
use crate::{
error_callback::ThreadErrorCallback,
queue::{DeferredThreadQueue, SpawnedThreadQueue},
runtime::Runtime,
util::LuaThreadOrFunction,
};
const ERR_METADATA_NOT_ATTACHED: &str = "\
Lua state does not have runtime metadata attached!\
\nThis is most likely caused by creating functions outside of a runtime.\
\nRuntime functions must always be created from within an active runtime.\
";
/**
A collection of lua functions that may be called to interact with a [`Runtime`].
*/
pub struct Functions<'lua> {
/**
Resumes a function / thread once instantly, and runs until first yield.
Spawns onto the runtime queue if not completed.
*/
pub spawn: LuaFunction<'lua>,
/**
Defers a function / thread onto the runtime queue.
Does not resume instantly, only adds to the queue.
*/
pub defer: LuaFunction<'lua>,
/**
Cancels a function / thread, removing it from the queue.
*/
pub cancel: LuaFunction<'lua>,
}
impl<'lua> Functions<'lua> {
/**
Creates a new collection of Lua functions that may be called to interact with a [`Runtime`].
# Errors
Errors when out of memory, or if default Lua globals are missing.
# Panics
Panics when the given [`Lua`] instance does not have an attached [`Runtime`].
*/
pub fn new(lua: &'lua Lua) -> LuaResult<Self> {
let spawn_queue = lua
.app_data_ref::<SpawnedThreadQueue>()
.expect(ERR_METADATA_NOT_ATTACHED)
.clone();
let defer_queue = lua
.app_data_ref::<DeferredThreadQueue>()
.expect(ERR_METADATA_NOT_ATTACHED)
.clone();
let error_callback = lua
.app_data_ref::<ThreadErrorCallback>()
.expect(ERR_METADATA_NOT_ATTACHED)
.clone();
let spawn = lua.create_function(
move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
let thread = tof.into_thread(lua)?;
if thread.status() == LuaThreadStatus::Resumable {
// NOTE: We need to resume the thread once instantly for correct behavior,
// and only if we get the pending value back we can spawn to async executor
match thread.resume::<_, LuaValue>(args.clone()) {
Ok(v) => {
if v.as_light_userdata()
.map(|l| l == Lua::poll_pending())
.unwrap_or_default()
{
spawn_queue.push_item(lua, &thread, args)?;
}
}
Err(e) => {
error_callback.call(&e);
}
};
}
Ok(thread)
},
)?;
let defer = lua.create_function(
move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
let thread = tof.into_thread(lua)?;
if thread.status() == LuaThreadStatus::Resumable {
defer_queue.push_item(lua, &thread, args)?;
}
Ok(thread)
},
)?;
let close = lua
.globals()
.get::<_, LuaTable>("coroutine")?
.get::<_, LuaFunction>("close")?;
let close_key = lua.create_registry_value(close)?;
let cancel = lua.create_function(move |lua, thread: LuaThread| {
let close: LuaFunction = lua.registry_value(&close_key)?;
match close.call(thread) {
Err(LuaError::CoroutineInactive) | Ok(()) => Ok(()),
Err(e) => Err(e),
}
})?;
Ok(Self {
spawn,
defer,
cancel,
})
}
}

View file

@ -1,4 +1,5 @@
mod error_callback;
mod functions;
mod handle;
mod queue;
mod runtime;
@ -6,7 +7,8 @@ mod status;
mod traits;
mod util;
pub use functions::Functions;
pub use handle::Handle;
pub use runtime::{Functions, Runtime};
pub use runtime::Runtime;
pub use status::Status;
pub use traits::{IntoLuaThread, LuaRuntimeExt};

View file

@ -1,8 +1,9 @@
use std::sync::Arc;
use std::{pin::Pin, rc::Rc, sync::Arc};
use concurrent_queue::ConcurrentQueue;
use derive_more::{Deref, DerefMut};
use event_listener::Event;
use futures_lite::{Future, FutureExt};
use mlua::prelude::*;
use crate::{handle::Handle, traits::IntoLuaThread, util::ThreadWithArgs};
@ -98,3 +99,42 @@ impl DeferredThreadQueue {
Self(ThreadQueue::new())
}
}
pub type LocalBoxFuture<'fut> = Pin<Box<dyn Future<Output = ()> + 'fut>>;
/**
Queue for storing local futures.
Provides methods for pushing and draining the queue, as
well as listening for new items being pushed to the queue.
*/
#[derive(Debug, Clone)]
pub(crate) struct FuturesQueue<'fut> {
queue: Rc<ConcurrentQueue<LocalBoxFuture<'fut>>>,
event: Arc<Event>,
}
impl<'fut> FuturesQueue<'fut> {
pub fn new() -> Self {
let queue = Rc::new(ConcurrentQueue::unbounded());
let event = Arc::new(Event::new());
Self { queue, event }
}
pub fn push_item(&self, fut: impl Future<Output = ()> + 'fut) {
let _ = self.queue.push(fut.boxed_local());
self.event.notify(usize::MAX);
}
pub fn drain_items<'outer>(
&'outer self,
) -> impl Iterator<Item = LocalBoxFuture<'fut>> + 'outer {
self.queue.try_iter()
}
pub async fn wait_for_item(&self) {
if self.queue.is_empty() {
self.event.listen().await;
}
}
}

View file

@ -2,8 +2,8 @@
use std::{
cell::Cell,
rc::Rc,
sync::{Arc, Weak},
rc::{Rc, Weak as WeakRc},
sync::{Arc, Weak as WeakArc},
};
use futures_lite::prelude::*;
@ -15,10 +15,10 @@ use tracing::Instrument;
use crate::{
error_callback::ThreadErrorCallback,
handle::Handle,
queue::{DeferredThreadQueue, SpawnedThreadQueue},
queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue},
status::Status,
traits::IntoLuaThread,
util::{run_until_yield, LuaThreadOrFunction},
util::run_until_yield,
};
const ERR_METADATA_ALREADY_ATTACHED: &str = "\
@ -32,6 +32,10 @@ Lua state runtime metadata was unexpectedly removed!\
\nThis should never happen, and is likely a bug in the runtime.\
";
const ERR_SET_CALLBACK_WHEN_RUNNING: &str = "\
Cannot set error callback when runtime is running!\
";
/**
A runtime for running Lua threads and async tasks.
*/
@ -49,6 +53,10 @@ impl<'lua> Runtime<'lua> {
Creates a new runtime for the given Lua state.
This runtime will have a default error callback that prints errors to stderr.
# Panics
Panics if the given Lua state already has a runtime attached to it.
*/
#[must_use]
pub fn new(lua: &'lua Lua) -> Runtime<'lua> {
@ -56,6 +64,24 @@ impl<'lua> Runtime<'lua> {
let queue_defer = DeferredThreadQueue::new();
let error_callback = ThreadErrorCallback::default();
let status = Rc::new(Cell::new(Status::NotStarted));
assert!(
lua.app_data_ref::<SpawnedThreadQueue>().is_none(),
"{ERR_METADATA_ALREADY_ATTACHED}"
);
assert!(
lua.app_data_ref::<DeferredThreadQueue>().is_none(),
"{ERR_METADATA_ALREADY_ATTACHED}"
);
assert!(
lua.app_data_ref::<ThreadErrorCallback>().is_none(),
"{ERR_METADATA_ALREADY_ATTACHED}"
);
lua.set_app_data(queue_spawn.clone());
lua.set_app_data(queue_defer.clone());
lua.set_app_data(error_callback.clone());
Runtime {
lua,
queue_spawn,
@ -79,8 +105,16 @@ impl<'lua> Runtime<'lua> {
This callback will be called whenever a Lua thread errors.
Overwrites any previous error callback.
# Panics
Panics if the runtime is currently running.
*/
pub fn set_error_callback(&self, callback: impl Fn(LuaError) + Send + 'static) {
assert!(
!self.status().is_running(),
"{ERR_SET_CALLBACK_WHEN_RUNNING}"
);
self.error_callback.replace(callback);
}
@ -88,22 +122,19 @@ impl<'lua> Runtime<'lua> {
Clears the error callback for this runtime.
This will remove any current error callback, including default(s).
# Panics
Panics if the runtime is currently running.
*/
pub fn remove_error_callback(&self) {
assert!(
!self.status().is_running(),
"{ERR_SET_CALLBACK_WHEN_RUNNING}"
);
self.error_callback.clear();
}
/**
Creates a collection of lua functions that may be called to interact with the runtime.
# Errors
Errors when out of memory.
*/
pub fn create_functions(&self) -> LuaResult<Functions> {
Functions::new(self)
}
/**
Spawns a chunk / function / thread onto the runtime queue.
@ -166,6 +197,7 @@ impl<'lua> Runtime<'lua> {
Panics if the given Lua state already has a runtime attached to it.
*/
#[allow(clippy::too_many_lines)]
pub async fn run(&self) {
/*
Create new executors to use - note that we do not need create multiple executors
@ -178,30 +210,27 @@ impl<'lua> Runtime<'lua> {
We also use the main executor to drive the main loop below forward,
saving a tiny bit of processing from going on the Lua executor itself.
*/
let lua_exec = LocalExecutor::new();
let local_exec = LocalExecutor::new();
let main_exec = Arc::new(Executor::new());
let fut_queue = Rc::new(FuturesQueue::new());
/*
Store the main executor and queues in Lua, so that they may be used with LuaRuntimeExt.
Store the main executor and queue in Lua, so that they may be used with LuaRuntimeExt.
Also ensure we do not already have an executor or queues - these are definite user errors
and may happen if the user tries to run multiple runtimes on the same Lua state at once.
*/
assert!(
self.lua.app_data_ref::<Weak<Executor>>().is_none(),
self.lua.app_data_ref::<WeakArc<Executor>>().is_none(),
"{ERR_METADATA_ALREADY_ATTACHED}"
);
assert!(
self.lua.app_data_ref::<SpawnedThreadQueue>().is_none(),
"{ERR_METADATA_ALREADY_ATTACHED}"
);
assert!(
self.lua.app_data_ref::<DeferredThreadQueue>().is_none(),
self.lua.app_data_ref::<WeakRc<FuturesQueue>>().is_none(),
"{ERR_METADATA_ALREADY_ATTACHED}"
);
self.lua.set_app_data(Arc::downgrade(&main_exec));
self.lua.set_app_data(self.queue_spawn.clone());
self.lua.set_app_data(self.queue_defer.clone());
self.lua.set_app_data(Rc::downgrade(&fut_queue.clone()));
/*
Manually tick the Lua executor, while running under the main executor.
@ -209,7 +238,8 @@ impl<'lua> Runtime<'lua> {
1. A Lua thread is available to run on the spawned queue
2. A Lua thread is available to run on the deferred queue
3. Task(s) scheduled on the Lua executor have made progress and should be polled again
3. A new thread-local future is available to run on the local executor
4. Task(s) scheduled on the Lua executor have made progress and should be polled again
This ordering is vital to ensure that we don't accidentally exit the main loop
when there are new Lua threads to enqueue and potentially more work to be done.
@ -219,7 +249,7 @@ impl<'lua> Runtime<'lua> {
// NOTE: Thread may have been cancelled from Lua
// before we got here, so we need to check it again
if thread.status() == LuaThreadStatus::Resumable {
lua_exec
local_exec
.spawn(async move {
if let Err(e) = run_until_yield(thread, args).await {
self.error_callback.call(&e);
@ -232,22 +262,24 @@ impl<'lua> Runtime<'lua> {
loop {
let fut_spawn = self.queue_spawn.wait_for_item(); // 1
let fut_defer = self.queue_defer.wait_for_item(); // 2
let fut_futs = fut_queue.wait_for_item(); // 3
// 3
// 4
let mut num_processed = 0;
let span_tick = tracing::debug_span!("tick_executor");
let fut_tick = async {
lua_exec.tick().await;
local_exec.tick().await;
// NOTE: Try to do as much work as possible instead of just a single tick()
num_processed += 1;
while lua_exec.try_tick() {
while local_exec.try_tick() {
num_processed += 1;
}
};
// 1 + 2 + 3
// 1 + 2 + 3 + 4
fut_spawn
.or(fut_defer)
.or(fut_futs)
.or(fut_tick.instrument(span_tick.or_current()))
.await;
@ -271,9 +303,19 @@ impl<'lua> Runtime<'lua> {
tracing::trace!(num_spawned, num_deferred, "tasks_spawned");
}
// Process spawned futures
let mut num_futs = 0;
for fut in fut_queue.drain_items() {
local_exec.spawn(fut).detach();
num_futs += 1;
}
if num_futs > 0 {
tracing::trace!(num_futs, "futures_spawned");
}
// Empty executor = we didn't spawn any new Lua tasks
// above, and there are no remaining tasks to run later
if lua_exec.is_empty() {
if local_exec.is_empty() {
break;
}
}
@ -291,95 +333,24 @@ impl<'lua> Runtime<'lua> {
// Clean up
self.lua
.remove_app_data::<Weak<Executor>>()
.remove_app_data::<WeakArc<Executor>>()
.expect(ERR_METADATA_REMOVED);
self.lua
.remove_app_data::<WeakRc<FuturesQueue>>()
.expect(ERR_METADATA_REMOVED);
}
}
impl Drop for Runtime<'_> {
fn drop(&mut self) {
self.lua
.remove_app_data::<SpawnedThreadQueue>()
.expect(ERR_METADATA_REMOVED);
self.lua
.remove_app_data::<DeferredThreadQueue>()
.expect(ERR_METADATA_REMOVED);
}
}
/**
A collection of lua functions that may be called to interact with a [`Runtime`].
*/
pub struct Functions<'lua> {
/**
Spawns a function / thread onto the runtime queue.
Resumes once instantly, and runs until first yield.
Adds to the queue if not completed.
*/
pub spawn: LuaFunction<'lua>,
/**
Defers a function / thread onto the runtime queue.
Does not resume instantly, only adds to the queue.
*/
pub defer: LuaFunction<'lua>,
/**
Cancels a function / thread, removing it from the queue.
*/
pub cancel: LuaFunction<'lua>,
}
impl<'lua> Functions<'lua> {
fn new(rt: &Runtime<'lua>) -> LuaResult<Self> {
let error_callback = rt.error_callback.clone();
let spawn_queue = rt.queue_spawn.clone();
let spawn = rt.lua.create_function(
move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
let thread = tof.into_thread(lua)?;
if thread.status() == LuaThreadStatus::Resumable {
// NOTE: We need to resume the thread once instantly for correct behavior,
// and only if we get the pending value back we can spawn to async executor
match thread.resume::<_, LuaValue>(args.clone()) {
Ok(v) => {
if v.as_light_userdata()
.map(|l| l == Lua::poll_pending())
.unwrap_or_default()
{
spawn_queue.push_item(lua, &thread, args)?;
}
}
Err(e) => {
error_callback.call(&e);
}
};
}
Ok(thread)
},
)?;
let defer_queue = rt.queue_defer.clone();
let defer = rt.lua.create_function(
move |lua, (tof, args): (LuaThreadOrFunction, LuaMultiValue)| {
let thread = tof.into_thread(lua)?;
if thread.status() == LuaThreadStatus::Resumable {
defer_queue.push_item(lua, &thread, args)?;
}
Ok(thread)
},
)?;
let close = rt
.lua
.globals()
.get::<_, LuaTable>("coroutine")?
.get::<_, LuaFunction>("close")?;
let close_key = rt.lua.create_registry_value(close)?;
let cancel = rt.lua.create_function(move |lua, thread: LuaThread| {
let close: LuaFunction = lua.registry_value(&close_key)?;
match close.call(thread) {
Err(LuaError::CoroutineInactive) | Ok(()) => Ok(()),
Err(e) => Err(e),
}
})?;
Ok(Self {
spawn,
defer,
cancel,
})
self.lua
.remove_app_data::<ThreadErrorCallback>()
.expect(ERR_METADATA_REMOVED);
}
}

View file

@ -1,6 +1,7 @@
#![allow(unused_imports)]
#![allow(clippy::missing_errors_doc)]
use std::{future::Future, sync::Weak};
use std::{future::Future, rc::Weak as WeakRc, sync::Weak as WeakArc};
use mlua::prelude::*;
@ -8,7 +9,8 @@ use async_executor::{Executor, Task};
use crate::{
handle::Handle,
queue::{DeferredThreadQueue, SpawnedThreadQueue},
queue::{DeferredThreadQueue, FuturesQueue, SpawnedThreadQueue},
runtime::Runtime,
};
/**
@ -58,10 +60,7 @@ where
}
/**
Trait for scheduling Lua threads and spawning `Send` futures on the current executor.
For spawning `!Send` futures on the same local executor as a [`Lua`]
VM instance, [`Lua::create_async_function`] should be used instead.
Trait for scheduling Lua threads and spawning futures on the current executor.
*/
pub trait LuaRuntimeExt<'lua> {
/**
@ -115,7 +114,7 @@ pub trait LuaRuntimeExt<'lua> {
lua.globals().set(
"spawnBackgroundTask",
lua.create_async_function(|lua, ()| async move {
lua.spawn_future(async move {
lua.spawn(async move {
println!("Hello from background task!");
}).await;
Ok(())
@ -133,6 +132,47 @@ pub trait LuaRuntimeExt<'lua> {
[`Runtime`]: crate::Runtime
*/
fn spawn<T: Send + 'static>(&self, fut: impl Future<Output = T> + Send + 'static) -> Task<T>;
/**
Spawns the given thread-local future on the current executor.
Note that this future will run detached and always to completion,
preventing the [`Runtime`] was spawned on from completing until done.
# Panics
Panics if called outside of a running [`Runtime`].
# Example usage
```rust
use async_io::block_on;
use mlua::prelude::*;
use mlua_luau_runtime::*;
fn main() -> LuaResult<()> {
let lua = Lua::new();
lua.globals().set(
"spawnLocalTask",
lua.create_async_function(|lua, ()| async move {
lua.spawn_local(async move {
println!("Hello from local task!");
});
Ok(())
})?
)?;
let rt = Runtime::new(&lua);
rt.push_thread_front(lua.load("spawnLocalTask()"), ());
block_on(rt.run());
Ok(())
}
```
*/
fn spawn_local(&self, fut: impl Future<Output = ()> + 'static);
}
impl<'lua> LuaRuntimeExt<'lua> for Lua {
@ -160,11 +200,21 @@ impl<'lua> LuaRuntimeExt<'lua> for Lua {
fn spawn<T: Send + 'static>(&self, fut: impl Future<Output = T> + Send + 'static) -> Task<T> {
let exec = self
.app_data_ref::<Weak<Executor>>()
.app_data_ref::<WeakArc<Executor>>()
.expect("futures can only be spawned within a runtime")
.upgrade()
.expect("executor was dropped");
tracing::trace!("spawning future on executor");
exec.spawn(fut)
}
fn spawn_local(&self, fut: impl Future<Output = ()> + 'static) {
let queue = self
.app_data_ref::<WeakRc<FuturesQueue>>()
.expect("futures can only be spawned within a runtime")
.upgrade()
.expect("executor was dropped");
tracing::trace!("spawning local future on executor");
queue.push_item(fut);
}
}