Fix internal task scheduler counter bug, do some cleanup

Filip Tibell 2023-02-21 11:45:09 +01:00
parent 8fd39fcf9d
commit 0737c3254e
8 changed files with 54 additions and 54 deletions

View file

@@ -71,6 +71,9 @@ impl LuaAsyncExt for &'static Lua {
         Creates a special async function that waits the
         desired amount of time, inheriting the guid of the
         current thread / task for proper cancellation.
+
+        This will yield the lua thread calling the function until the
+        desired time has passed and the scheduler resumes the thread.
     */
     fn create_waiter_function<'lua>(self) -> LuaResult<LuaFunction<'lua>> {
         let async_env_yield: LuaFunction = self.named_registry_value("co.yield")?;
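For readers unfamiliar with mlua's async support, here is a minimal, hypothetical sketch of a waiter built directly on `Lua::create_async_function` and Tokio's `sleep`. It is not the Lune implementation above, which instead yields through the task scheduler and inherits the task GUID so the wait stays cancellable; the function name and the Tokio/`async`-feature dependency are assumptions.

```rust
use std::time::Duration;

use mlua::prelude::*;
use tokio::time::sleep;

// Hypothetical, simplified waiter: suspends the calling Lua coroutine via
// mlua's async machinery and resumes it once the delay has elapsed.
fn create_simple_waiter<'lua>(lua: &'lua Lua) -> LuaResult<LuaFunction<'lua>> {
    lua.create_async_function(|_, secs: Option<f64>| async move {
        // Treat nil / negative arguments as "wait as short as possible"
        sleep(Duration::from_secs_f64(secs.unwrap_or(0.0).max(0.0))).await;
        Ok(())
    })
}
```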

View file

@@ -9,8 +9,8 @@ use tokio::time::{sleep, Instant};
 use crate::lua::task::TaskKind;

 use super::super::{
-    async_handle::TaskSchedulerAsyncHandle, message::TaskSchedulerMessage,
-    scheduler::TaskReference, scheduler::TaskScheduler,
+    scheduler::TaskReference, scheduler::TaskScheduler, scheduler_handle::TaskSchedulerAsyncHandle,
+    scheduler_message::TaskSchedulerMessage,
 };

 /*
@@ -120,7 +120,6 @@ impl<'fut> TaskSchedulerAsyncExt<'fut> for TaskScheduler<'fut> {
             .futures
             .try_lock()
             .expect("Tried to add future to queue during futures resumption");
-        self.futures_count.set(self.futures_count.get() + 1);
         futs.push(Box::pin(async move {
             let before = Instant::now();
             sleep(Duration::from_secs_f64(

View file

@@ -7,7 +7,9 @@ use mlua::prelude::*;
 use futures_util::StreamExt;
 use tokio::time::sleep;

-use super::super::{message::TaskSchedulerMessage, result::TaskSchedulerState, TaskScheduler};
+use super::super::{
+    scheduler_message::TaskSchedulerMessage, scheduler_state::TaskSchedulerState, TaskScheduler,
+};

 /*
@@ -30,19 +32,21 @@ pub trait TaskSchedulerResumeExt {
 #[async_trait(?Send)]
 impl TaskSchedulerResumeExt for TaskScheduler<'_> {
     /**
-        Awaits the next background task registration
-        message, if any messages exist in the queue.
+        Resumes the task scheduler queue.

-        This is a no-op if there are no background tasks left running
-        and / or the background task messages channel was closed.
+        This will run any spawned or deferred Lua tasks in a blocking manner.
+
+        Once all spawned and / or deferred Lua tasks have finished running,
+        this will process delayed tasks, waiting tasks, and native Rust
+        futures concurrently, awaiting the first one to be ready for resumption.
     */
     async fn resume_queue(&self) -> TaskSchedulerState {
         let current = TaskSchedulerState::new(self);
-        let result = if current.has_blocking_tasks() {
+        let result = if current.num_blocking > 0 {
             // 1. Blocking tasks
             resume_next_blocking_task(self, None)
-        } else if current.has_future_tasks() || current.has_background_tasks() {
-            // 2. Async + background tasks
+        } else if current.num_futures > 0 || current.num_background > 0 {
+            // 2. Async and/or background tasks
             tokio::select! {
                 result = resume_next_async_task(self) => result,
                 result = receive_next_message(self) => result,
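The new doc comment spells out the resumption priority. The following is a self-contained sketch of the same pattern (illustrative names and types, not Lune's), assuming Tokio with the `macros`, `rt`, `sync`, and `time` features: blocking work drains first, and only then does `tokio::select!` race the async sources, resuming whichever is ready.

```rust
use std::collections::VecDeque;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::sleep;

// Resumes one "step" of a toy scheduler: blocking tasks take priority,
// otherwise the async sources are raced against each other.
async fn resume_once(
    blocking: &mut VecDeque<&'static str>,
    messages: &mut mpsc::UnboundedReceiver<&'static str>,
) {
    if let Some(task) = blocking.pop_front() {
        // 1. Spawned / deferred (blocking) tasks always run first
        println!("resumed blocking task: {task}");
    } else {
        // 2. Async and/or background tasks, whichever becomes ready first
        tokio::select! {
            _ = sleep(Duration::from_millis(10)) => println!("a future was ready first"),
            msg = messages.recv() => println!("background message: {msg:?}"),
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let mut blocking = VecDeque::from(["spawned lua task"]);
    tx.send("background task registered").unwrap();

    resume_once(&mut blocking, &mut rx).await; // runs the blocking task
    resume_once(&mut blocking, &mut rx).await; // queue empty, so select! races
}
```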
@@ -111,11 +115,6 @@ async fn resume_next_async_task(scheduler: &TaskScheduler<'_>) -> TaskSchedulerState {
     };
     // The future might not return a reference that it wants to resume
     if let Some(task) = task {
-        // Decrement the counter since the future has completed,
-        // meaning it has been removed from the futures queue
-        scheduler
-            .futures_count
-            .set(scheduler.futures_count.get() - 1);
         // Promote this future task to a blocking task and resume it
         // right away, also taking care to not borrow mutably twice
         // by dropping this guard before trying to resume it
@@ -127,13 +126,11 @@ async fn resume_next_async_task(scheduler: &TaskScheduler<'_>) -> TaskSchedulerState {
 }

 /**
-    Resumes the task scheduler queue.
+    Awaits the next background task registration
+    message, if any messages exist in the queue.

-    This will run any spawned or deferred Lua tasks in a blocking manner.
-
-    Once all spawned and / or deferred Lua tasks have finished running,
-    this will process delayed tasks, waiting tasks, and native Rust
-    futures concurrently, awaiting the first one to be ready for resumption.
+    This is a no-op if there are no background tasks left running
+    and / or the background task messages channel was closed.
 */
 async fn receive_next_message(scheduler: &TaskScheduler<'_>) -> TaskSchedulerState {
     let message_opt = {
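The relocated doc comment for `receive_next_message` describes waiting on the background registration channel and treating a closed channel as a no-op. Below is a hedged, self-contained sketch of that behaviour with a Tokio unbounded channel; the message enum and function body are stand-ins, not Lune's actual types.

```rust
use tokio::sync::mpsc;

// Stand-in for Lune's background task message type.
#[derive(Debug)]
enum SchedulerMessage {
    TaskRegistered,
}

async fn receive_next_message(rx: &mut mpsc::UnboundedReceiver<SchedulerMessage>) {
    match rx.recv().await {
        Some(msg) => println!("received {msg:?}"),
        // `None` means every sender was dropped (channel closed),
        // so this becomes a no-op and the caller can wind down.
        None => println!("message channel closed, nothing to do"),
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tx.send(SchedulerMessage::TaskRegistered).unwrap();
    receive_next_message(&mut rx).await; // prints the registration message
    drop(tx);
    receive_next_message(&mut rx).await; // channel closed -> no-op branch
}
```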

View file

@@ -1,13 +1,14 @@
-mod async_handle;
 mod ext;
-mod message;
 mod proxy;
-mod result;
 mod scheduler;
+mod scheduler_handle;
+mod scheduler_message;
+mod scheduler_state;
 mod task_kind;
 mod task_reference;

-pub use async_handle::*;
 pub use ext::*;
 pub use proxy::*;
 pub use scheduler::*;
+pub use scheduler_handle::*;
+pub use scheduler_state::*;

View file

@@ -11,7 +11,7 @@ use mlua::prelude::*;
 use tokio::sync::{mpsc, Mutex as AsyncMutex};

-use super::message::TaskSchedulerMessage;
+use super::scheduler_message::TaskSchedulerMessage;
 pub use super::{task_kind::TaskKind, task_reference::TaskReference};

 type TaskFutureRets<'fut> = LuaResult<Option<LuaMultiValue<'fut>>>;
@@ -20,6 +20,7 @@ type TaskFuture<'fut> = LocalBoxFuture<'fut, (Option<TaskReference>, TaskFutureRets<'fut>)>;
 /// A struct representing a task contained in the task scheduler
 #[derive(Debug)]
 pub struct Task {
+    kind: TaskKind,
     thread: LuaRegistryKey,
     args: LuaRegistryKey,
 }
@@ -44,6 +45,7 @@ pub struct TaskScheduler<'fut> {
     pub(super) exit_code: Cell<Option<ExitCode>>,
     // Blocking tasks
     pub(super) tasks: RefCell<HashMap<TaskReference, Task>>,
+    pub(super) tasks_count: Cell<usize>,
     pub(super) tasks_current: Cell<Option<TaskReference>>,
     pub(super) tasks_queue_blocking: RefCell<VecDeque<TaskReference>>,
     pub(super) tasks_current_lua_error: Arc<RefCell<Option<LuaError>>>,
@@ -72,6 +74,7 @@ impl<'fut> TaskScheduler<'fut> {
             guid: Cell::new(0),
             exit_code: Cell::new(None),
             tasks: RefCell::new(HashMap::new()),
+            tasks_count: Cell::new(0),
             tasks_current: Cell::new(None),
             tasks_queue_blocking: RefCell::new(VecDeque::new()),
             tasks_current_lua_error,
@@ -203,6 +206,7 @@ impl<'fut> TaskScheduler<'fut> {
         let task_thread_key: LuaRegistryKey = self.lua.create_registry_value(thread)?;
         // Create the full task struct
         let task = Task {
+            kind,
             thread: task_thread_key,
             args: task_args_key,
         };
@@ -217,6 +221,11 @@ impl<'fut> TaskScheduler<'fut> {
             guid
         };
         let reference = TaskReference::new(kind, guid);
+        // Increment the corresponding task counter
+        match kind {
+            TaskKind::Future => self.futures_count.set(self.futures_count.get() + 1),
+            _ => self.tasks_count.set(self.tasks_count.get() + 1),
+        }
         // Add the task to the scheduler
         {
             let mut tasks = self.tasks.borrow_mut();
@@ -256,6 +265,11 @@ impl<'fut> TaskScheduler<'fut> {
             .collect();
         for task_ref in &tasks_to_remove {
             if let Some(task) = tasks.remove(task_ref) {
+                // Decrement the corresponding task counter
+                match task.kind {
+                    TaskKind::Future => self.futures_count.set(self.futures_count.get() - 1),
+                    _ => self.tasks_count.set(self.tasks_count.get() - 1),
+                }
                 // NOTE: We need to close the thread here to
                 // make 100% sure that nothing can resume it
                 let close: LuaFunction = self.lua.named_registry_value("co.close")?;
@@ -291,6 +305,11 @@ impl<'fut> TaskScheduler<'fut> {
                 None => return Ok(LuaMultiValue::new()),
             }
         };
+        // Decrement the corresponding task counter
+        match task.kind {
+            TaskKind::Future => self.futures_count.set(self.futures_count.get() - 1),
+            _ => self.tasks_count.set(self.tasks_count.get() - 1),
+        }
         // Fetch and remove the thread to resume + its arguments
         let thread: LuaThread = self.lua.registry_value(&task.thread)?;
         let args_opt_res = override_args.or_else(|| {
@@ -389,7 +408,6 @@ impl<'fut> TaskScheduler<'fut> {
             .futures
             .try_lock()
             .expect("Tried to add future to queue during futures resumption");
-        self.futures_count.set(self.futures_count.get() + 1);
         futs.push(Box::pin(async move {
             let result = fut.await;
             (Some(task_ref), result)
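Taken together, the scheduler changes move counter updates to the two places where tasks actually enter and leave the scheduler, instead of incrementing when a future is pushed onto the queue and decrementing when it completes. A minimal sketch of that invariant using `Cell` counters follows; the types are stand-ins, not the real `TaskScheduler` or `TaskKind`.

```rust
use std::cell::Cell;

// Which queue a task belongs to; stand-in for Lune's TaskKind.
#[derive(Clone, Copy)]
enum TaskKind {
    Instant,
    Future,
}

// Counters updated exactly where tasks are created and removed, so reading
// them never needs to borrow the task queues mid-resumption.
#[derive(Default)]
struct Counters {
    tasks_count: Cell<usize>,
    futures_count: Cell<usize>,
}

impl Counters {
    fn on_task_added(&self, kind: TaskKind) {
        match kind {
            TaskKind::Future => self.futures_count.set(self.futures_count.get() + 1),
            _ => self.tasks_count.set(self.tasks_count.get() + 1),
        }
    }

    fn on_task_removed(&self, kind: TaskKind) {
        match kind {
            TaskKind::Future => self.futures_count.set(self.futures_count.get() - 1),
            _ => self.tasks_count.set(self.tasks_count.get() - 1),
        }
    }
}

fn main() {
    let counters = Counters::default();
    counters.on_task_added(TaskKind::Instant);
    counters.on_task_added(TaskKind::Future);
    counters.on_task_removed(TaskKind::Future);
    assert_eq!(counters.tasks_count.get(), 1);
    assert_eq!(counters.futures_count.get(), 0);
}
```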

View file

@@ -4,7 +4,7 @@ use mlua::prelude::*;
 use tokio::sync::mpsc;

-use super::message::TaskSchedulerMessage;
+use super::scheduler_message::TaskSchedulerMessage;

 /**
     A handle to a registered asynchronous background task.

View file

@@ -6,27 +6,21 @@ use super::scheduler::TaskScheduler;
 /// Struct representing the current state of the task scheduler
 #[derive(Debug, Clone)]
+#[must_use = "Scheduler state must be checked after every resumption"]
 pub struct TaskSchedulerState {
-    lua_error: Option<LuaError>,
-    exit_code: Option<ExitCode>,
-    num_blocking: usize,
-    num_futures: usize,
-    num_background: usize,
+    pub(super) lua_error: Option<LuaError>,
+    pub(super) exit_code: Option<ExitCode>,
+    pub(super) num_blocking: usize,
+    pub(super) num_futures: usize,
+    pub(super) num_background: usize,
 }

 impl TaskSchedulerState {
     pub(super) fn new(sched: &TaskScheduler) -> Self {
-        const MESSAGE: &str = "\
-            Failed to get lock on or borrow internal scheduler state!\
-            \nMake sure not to call during task scheduler resumption";
-
         Self {
             lua_error: None,
             exit_code: sched.exit_code.get(),
-            num_blocking: sched
-                .tasks_queue_blocking
-                .try_borrow()
-                .expect(MESSAGE)
-                .len(),
+            num_blocking: sched.tasks_count.get(),
             num_futures: sched.futures_count.get(),
             num_background: sched.futures_background_count.get(),
         }
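The new `#[must_use]` attribute means a caller that drops the returned state without inspecting it now gets a compiler warning, which pairs well with the counter fix: the state is cheap to build and always worth checking. A tiny self-contained demo of the effect, with stand-in types rather than the real `TaskSchedulerState`:

```rust
// Stand-in for the scheduler state; ignoring it now warns at compile time.
#[must_use = "Scheduler state must be checked after every resumption"]
struct State {
    num_blocking: usize,
    num_futures: usize,
    num_background: usize,
}

impl State {
    fn is_done(&self) -> bool {
        self.num_blocking == 0 && self.num_futures == 0 && self.num_background == 0
    }
}

fn resume_queue() -> State {
    State { num_blocking: 0, num_futures: 0, num_background: 0 }
}

fn main() {
    // resume_queue(); // <- uncommenting this triggers an `unused_must_use` warning
    let state = resume_queue();
    assert!(state.is_done());
}
```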
@@ -38,18 +32,6 @@ impl TaskSchedulerState {
         this
     }

-    pub(super) fn has_blocking_tasks(&self) -> bool {
-        self.num_blocking > 0
-    }
-
-    pub(super) fn has_future_tasks(&self) -> bool {
-        self.num_futures > 0
-    }
-
-    pub(super) fn has_background_tasks(&self) -> bool {
-        self.num_background > 0
-    }
-
     /**
         Returns a clone of the error from
         this task scheduler result, if any.