Initial barebones scheduler implementation and interface

This commit is contained in:
Filip Tibell 2023-08-16 20:38:05 -05:00
parent c80e504283
commit 6757e1a1a8
8 changed files with 381 additions and 14 deletions

View file

@ -1,8 +1,11 @@
use std::process::ExitCode;
use std::{process::ExitCode, sync::Arc};
use mlua::prelude::*;
mod error;
mod scheduler;
use self::scheduler::Scheduler;
pub use error::LuneError;
@ -38,16 +41,14 @@ impl Lune {
script_name: impl AsRef<str>,
script_contents: impl AsRef<[u8]>,
) -> Result<ExitCode, LuneError> {
self.run_inner(script_name, script_contents)
.await
.map_err(LuneError::from)
}
let lua = Arc::new(Lua::new());
let sched = Scheduler::new(Arc::clone(&lua));
async fn run_inner(
&self,
_script_name: impl AsRef<str>,
_script_contents: impl AsRef<[u8]>,
) -> LuaResult<ExitCode> {
Ok(ExitCode::SUCCESS)
let main = lua
.load(script_contents.as_ref())
.set_name(script_name.as_ref());
sched.push_front(main, ())?;
Ok(sched.run_to_completion().await)
}
}

View file

@ -0,0 +1,71 @@
use std::process::ExitCode;
use mlua::prelude::*;
use super::SchedulerImpl;
impl SchedulerImpl {
/**
Runs all lua threads to completion, gathering any results they produce.
*/
fn run_threads(&self) -> Vec<LuaResult<()>> {
let mut results = Vec::new();
while let Some((thread, args)) = self
.pop_thread()
.expect("Failed to pop thread from scheduler")
{
let res = thread.resume(args);
self.state.add_resumption();
if let Err(e) = &res {
self.state.add_error();
eprintln!("{e}"); // TODO: Pretty print the lua error here
}
results.push(res);
if self.state.has_exit_code() {
break;
}
}
results
}
/**
Runs the scheduler to completion, both normal lua threads and futures.
This will emit lua output and errors to stdout and stderr.
*/
pub async fn run_to_completion(&self) -> ExitCode {
loop {
// 1. Run lua threads until exit or there are none left
let results = self.run_threads();
// 2. If we got a manual exit code from lua we should not continue
if self.state.has_exit_code() {
break;
}
// 3. Wait for the next future to complete, this may
// add more lua threads to run in the next iteration
// TODO: Implement this
// 4. If did not resume any lua threads, and we have no futures
// queued either, we have run the scheduler until completion
if results.is_empty() {
break;
}
}
if let Some(code) = self.state.exit_code() {
ExitCode::from(code)
} else if self.state.has_errored() {
ExitCode::FAILURE
} else {
ExitCode::SUCCESS
}
}
}

View file

@ -0,0 +1,70 @@
use mlua::prelude::*;
use super::{thread::SchedulerThread, traits::IntoLuaThread, SchedulerImpl};
impl<'lua> SchedulerImpl {
/**
Pops the next thread to run, from the front of the scheduler.
Returns `None` if there are no threads left to run.
*/
pub(super) fn pop_thread(
&'lua self,
) -> LuaResult<Option<(LuaThread<'lua>, LuaMultiValue<'lua>)>> {
match self
.threads
.try_borrow_mut()
.into_lua_err()
.context("Failed to borrow threads vec")?
.pop_front()
{
Some(thread) => {
let (thread, args) = thread.into_inner(&self.lua);
Ok(Some((thread, args)))
}
None => Ok(None),
}
}
/**
Schedules the `thread` to be resumed with the given `args`
right away, before any other currently scheduled threads.
*/
pub fn push_front(
&'lua self,
thread: impl IntoLuaThread<'lua>,
args: impl IntoLuaMulti<'lua>,
) -> LuaResult<()> {
let thread = thread.into_lua_thread(&self.lua)?;
let args = args.into_lua_multi(&self.lua)?;
self.threads
.try_borrow_mut()
.into_lua_err()
.context("Failed to borrow threads vec")?
.push_front(SchedulerThread::new(&self.lua, thread, args)?);
Ok(())
}
/**
Schedules the `thread` to be resumed with the given `args`
after all other current threads have been resumed.
*/
pub fn push_back(
&'lua self,
thread: impl IntoLuaThread<'lua>,
args: impl IntoLuaMulti<'lua>,
) -> LuaResult<()> {
let thread = thread.into_lua_thread(&self.lua)?;
let args = args.into_lua_multi(&self.lua)?;
self.threads
.try_borrow_mut()
.into_lua_err()
.context("Failed to borrow threads vec")?
.push_back(SchedulerThread::new(&self.lua, thread, args)?);
Ok(())
}
}

70
src/lune/scheduler/mod.rs Normal file
View file

@ -0,0 +1,70 @@
use std::{cell::RefCell, collections::VecDeque, ops::Deref, sync::Arc};
use mlua::prelude::*;
mod state;
mod thread;
mod traits;
mod impl_runner;
mod impl_threads;
use self::{state::SchedulerState, thread::SchedulerThread};
/**
Scheduler for Lua threads.
Can be cheaply cloned, and any clone will refer
to the same underlying scheduler and Lua struct.
*/
#[derive(Debug, Clone)]
pub struct Scheduler(Arc<SchedulerImpl>);
impl Scheduler {
/**
Creates a new scheduler for the given [`Lua`] struct.
*/
pub fn new(lua: Arc<Lua>) -> Self {
assert!(
lua.app_data_ref::<Self>().is_none() && lua.app_data_ref::<&Self>().is_none(),
"Only one scheduler may be created per Lua struct"
);
let inner = SchedulerImpl::new(Arc::clone(&lua));
let sched = Self(Arc::new(inner));
lua.set_app_data(sched.clone());
lua.set_interrupt(move |_| Ok(LuaVmState::Continue));
sched
}
}
impl Deref for Scheduler {
type Target = SchedulerImpl;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/**
Implementation of scheduler for Lua threads.
Not meant to be used directly, use [`Scheduler`] instead.
*/
#[derive(Debug)]
pub struct SchedulerImpl {
lua: Arc<Lua>,
state: SchedulerState,
threads: RefCell<VecDeque<SchedulerThread>>,
}
impl SchedulerImpl {
fn new(lua: Arc<Lua>) -> Self {
Self {
lua,
state: SchedulerState::new(),
threads: RefCell::new(VecDeque::new()),
}
}
}

View file

@ -0,0 +1,38 @@
use std::cell::RefCell;
#[derive(Debug, Default)]
pub struct SchedulerState {
exit_code: RefCell<Option<u8>>,
num_resumptions: RefCell<usize>,
num_errors: RefCell<usize>,
}
impl SchedulerState {
pub fn new() -> Self {
Self::default()
}
pub fn add_resumption(&self) {
*self.num_resumptions.borrow_mut() += 1;
}
pub fn add_error(&self) {
*self.num_errors.borrow_mut() += 1;
}
pub fn has_errored(&self) -> bool {
*self.num_errors.borrow() > 0
}
pub fn exit_code(&self) -> Option<u8> {
*self.exit_code.borrow()
}
pub fn has_exit_code(&self) -> bool {
self.exit_code.borrow().is_some()
}
pub fn set_exit_code(&self, code: impl Into<u8>) {
self.exit_code.replace(Some(code.into()));
}
}

View file

@ -0,0 +1,58 @@
use mlua::prelude::*;
/**
Container for registry keys that point to a thread and thread arguments.
*/
#[derive(Debug)]
pub(super) struct SchedulerThread {
key_thread: LuaRegistryKey,
key_args: LuaRegistryKey,
}
impl SchedulerThread {
/**
Creates a new scheduler thread container from the given thread and arguments.
May fail if an allocation error occurs, is not fallible otherwise.
*/
pub(super) fn new<'lua>(
lua: &'lua Lua,
thread: LuaThread<'lua>,
args: LuaMultiValue<'lua>,
) -> LuaResult<Self> {
let args_vec = args.into_vec();
let key_thread = lua
.create_registry_value(thread)
.context("Failed to store value in registry")?;
let key_args = lua
.create_registry_value(args_vec)
.context("Failed to store value in registry")?;
Ok(Self {
key_thread,
key_args,
})
}
/**
Extracts the inner thread and args from the container.
*/
pub(super) fn into_inner(self, lua: &Lua) -> (LuaThread<'_>, LuaMultiValue<'_>) {
let thread = lua
.registry_value(&self.key_thread)
.expect("Failed to get thread from registry");
let args_vec = lua
.registry_value(&self.key_args)
.expect("Failed to get thread args from registry");
let args = LuaMultiValue::from_vec(args_vec);
lua.remove_registry_value(self.key_thread)
.expect("Failed to remove thread from registry");
lua.remove_registry_value(self.key_args)
.expect("Failed to remove thread args from registry");
(thread, args)
}
}

View file

@ -0,0 +1,59 @@
use mlua::prelude::*;
use super::Scheduler;
/**
Trait for extensions to the [`Lua`] struct, allowing
for access to the scheduler without having to import
it or handle registry / app data references manually.
*/
pub trait LuaSchedulerExt {
/**
Get a strong reference to the scheduler for the [`Lua`] struct.
Note that if this reference is not dropped, `Lua` can
not be dropped either because of the strong reference.
*/
fn scheduler(&self) -> Scheduler;
}
impl LuaSchedulerExt for Lua {
fn scheduler(&self) -> Scheduler {
self.app_data_ref::<Scheduler>()
.expect("Lua struct is missing scheduler")
.clone()
}
}
/**
Trait for any struct that can be turned into an [`LuaThread`]
and given to the scheduler, implemented for the following types:
- Lua threads ([`LuaThread`])
- Lua functions ([`LuaFunction`])
- Lua chunks ([`LuaChunk`])
*/
pub trait IntoLuaThread<'lua> {
/**
Converts the value into a lua thread.
*/
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>>;
}
impl<'lua> IntoLuaThread<'lua> for LuaThread<'lua> {
fn into_lua_thread(self, _: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
Ok(self)
}
}
impl<'lua> IntoLuaThread<'lua> for LuaFunction<'lua> {
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
lua.create_thread(self)
}
}
impl<'lua, 'a> IntoLuaThread<'lua> for LuaChunk<'lua, 'a> {
fn into_lua_thread(self, lua: &'lua Lua) -> LuaResult<LuaThread<'lua>> {
lua.create_thread(self.into_function()?)
}
}

View file

@ -1,5 +1,3 @@
local task = require("@lune/task")
-- Coroutines should return true, ret values OR false, error
local function pass()
@ -55,7 +53,9 @@ local success2 = coroutine.resume(thread2)
assert(success1 == false, "Coroutine resume on dead coroutines should return false")
assert(success2 == false, "Coroutine resume on dead coroutines should return false")
-- Wait should work inside native lua coroutines
-- Task library wait should work inside native lua coroutines
local task = require("@lune/task")
local flag: boolean = false
coroutine.resume(coroutine.create(function()