Do some debugging, fix scheduler not always exiting with the correct code/error

Filip Tibell 2023-02-14 16:39:15 +01:00
parent b5f6e6da98
commit a9261f889f
2 changed files with 117 additions and 31 deletions


@@ -125,20 +125,22 @@ impl Lune {
         let exit_code = LocalSet::new()
             .run_until(async move {
                 let mut got_error = false;
-                let mut result = sched.resume_queue().await;
-                while !result.is_done() {
+                loop {
+                    let result = sched.resume_queue().await;
+                    // println!("{result}");
                     if let Some(err) = result.get_lua_error() {
                         eprintln!("{}", pretty_format_luau_error(&err));
                         got_error = true;
                     }
-                    result = sched.resume_queue().await;
-                }
-                if let Some(exit_code) = result.get_exit_code() {
-                    exit_code
-                } else if got_error {
-                    ExitCode::FAILURE
-                } else {
-                    ExitCode::SUCCESS
+                    if result.is_done() {
+                        if let Some(exit_code) = result.get_exit_code() {
+                            break exit_code;
+                        } else if got_error {
+                            break ExitCode::FAILURE;
+                        } else {
+                            break ExitCode::SUCCESS;
+                        }
+                    }
                 }
             })
             .await;
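The hunk above replaces the old resume-then-check loop with a single `loop` that only exits via `break <value>`, so the exit code is decided at exactly one point after the scheduler reports it is done. The following is a minimal, self-contained sketch of that break-with-value pattern; the `Step` struct and `poll` helper are illustrative stand-ins, not Lune's actual scheduler API.

// Sketch of the control flow introduced above: keep resuming work, record any
// error, and only break out with an exit code once the scheduler is done.
use std::process::ExitCode;

struct Step {
    done: bool,
    error: Option<String>,
    exit_code: Option<ExitCode>,
}

// Illustrative stand-in for resuming the scheduler queue once
fn poll(steps: &mut Vec<Step>) -> Step {
    steps.pop().unwrap()
}

fn run(mut steps: Vec<Step>) -> ExitCode {
    let mut got_error = false;
    loop {
        let result = poll(&mut steps);
        if let Some(err) = result.error {
            eprintln!("{err}");
            got_error = true;
        }
        if result.done {
            // `break <value>` makes the loop itself evaluate to the exit code,
            // so an explicit code, an error, and a clean run all leave through
            // the same exit point.
            if let Some(code) = result.exit_code {
                break code;
            } else if got_error {
                break ExitCode::FAILURE;
            } else {
                break ExitCode::SUCCESS;
            }
        }
    }
}

fn main() -> ExitCode {
    run(vec![Step { done: true, error: None, exit_code: None }])
}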


@@ -118,8 +118,8 @@ pub struct TaskSchedulerResult {
     num_instant: usize,
     num_deferred: usize,
     num_futures: usize,
-    num_spawned: usize,
-    num_total: usize,
+    num_background: usize,
+    num_active: usize,
 }
 
 impl TaskSchedulerResult {
@@ -136,8 +136,8 @@ impl TaskSchedulerResult {
             num_instant: sched.task_queue_instant.try_lock().expect(MESSAGE).len(),
             num_deferred: sched.task_queue_deferred.try_lock().expect(MESSAGE).len(),
             num_futures: sched.futures.try_lock().expect(MESSAGE).len(),
-            num_spawned: sched.futures_counter.load(Ordering::Relaxed),
-            num_total: sched.tasks.try_lock().expect(MESSAGE).len(),
+            num_background: sched.futures_in_background.load(Ordering::Relaxed),
+            num_active: sched.tasks.try_lock().expect(MESSAGE).len(),
         }
     }
@@ -169,16 +169,7 @@ impl TaskSchedulerResult {
     */
     #[allow(dead_code)]
     pub fn is_busy(&self) -> bool {
-        self.num_total > 0
-    }
-
-    /**
-        Returns `true` if the task scheduler is done,
-        meaning it has no lua threads left to run, and
-        no spawned tasks are running in the background.
-    */
-    pub fn is_done(&self) -> bool {
-        self.num_total == 0 && self.num_spawned == 0
+        self.num_active > 0
     }
 
     /**
@@ -187,7 +178,100 @@ impl TaskSchedulerResult {
     */
     #[allow(dead_code)]
     pub fn is_background(&self) -> bool {
-        self.num_total == 0 && self.num_spawned > 0
+        self.num_active == 0 && self.num_background > 0
+    }
+
+    /**
+        Returns `true` if the task scheduler is done,
+        meaning it has no lua threads left to run, and
+        no spawned tasks are running in the background.
+    */
+    pub fn is_done(&self) -> bool {
+        self.num_active == 0 && self.num_background == 0
+    }
+}
+
+impl fmt::Display for TaskSchedulerResult {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let status = if self.is_busy() {
+            "Busy"
+        } else if self.is_background() {
+            "Background"
+        } else {
+            "Done"
+        };
+        let code = match self.get_exit_code() {
+            Some(code) => format!("{code:?}"),
+            None => "-".to_string(),
+        };
+        let err = match self.get_lua_error() {
+            Some(e) => format!("{e:?}")
+                .as_bytes()
+                .chunks(42) // Kinda arbitrary but should fit in most terminals
+                .enumerate()
+                .map(|(idx, buf)| {
+                    format!(
+                        "{}{}{}{}{}",
+                        if idx == 0 { "" } else { "\n" },
+                        if idx == 0 {
+                            "".to_string()
+                        } else {
+                            " ".repeat(16)
+                        },
+                        if idx == 0 { "" } else { "" },
+                        String::from_utf8_lossy(buf),
+                        if buf.len() == 42 { "" } else { "" },
+                    )
+                })
+                .collect::<String>(),
+            None => "-".to_string(),
+        };
+        let parts = vec![
+            format!("Status │ {status}"),
+            format!("Tasks active │ {}", self.num_active),
+            format!("Tasks background │ {}", self.num_background),
+            format!("Status code │ {code}"),
+            format!("Lua error │ {err}"),
+        ];
+        let lengths = parts
+            .iter()
+            .map(|part| {
+                part.lines()
+                    .next()
+                    .unwrap()
+                    .trim_end_matches("")
+                    .chars()
+                    .count()
+            })
+            .collect::<Vec<_>>();
+        let longest = &parts
+            .iter()
+            .enumerate()
+            .fold(0, |acc, (index, _)| acc.max(lengths[index]));
+        let sep = "─".repeat(longest + 2);
+        writeln!(f, "┌{}┐", &sep)?;
+        for (index, part) in parts.iter().enumerate() {
+            writeln!(
+                f,
+                "│ {}{} │",
+                part.trim_end_matches(""),
+                " ".repeat(
+                    longest
+                        - part
+                            .lines()
+                            .last()
+                            .unwrap()
+                            .trim_end_matches("")
+                            .chars()
+                            .count()
+                )
+            )?;
+            if index < parts.len() - 1 {
+                writeln!(f, "┝{}┥", &sep)?;
+            }
+        }
+        writeln!(f, "└{}┘", &sep)?;
+        Ok(())
     }
 }
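The predicates added above derive three mutually exclusive states from two counters: active lua threads (`num_active`) and spawned background tasks (`num_background`). A minimal standalone sketch of that mapping follows; the `SchedulerState` enum and `classify` function are illustrative only, not part of the actual code.

#[derive(Debug, PartialEq)]
enum SchedulerState {
    Busy,       // lua threads still queued or running
    Background, // no lua threads, but spawned tasks still alive
    Done,       // nothing left at all
}

// Mirrors is_busy / is_background / is_done from the hunk above
fn classify(num_active: usize, num_background: usize) -> SchedulerState {
    if num_active > 0 {
        SchedulerState::Busy
    } else if num_background > 0 {
        SchedulerState::Background
    } else {
        SchedulerState::Done
    }
}

fn main() {
    assert_eq!(classify(2, 0), SchedulerState::Busy);
    assert_eq!(classify(0, 1), SchedulerState::Background);
    assert_eq!(classify(0, 0), SchedulerState::Done);
}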
@@ -206,7 +290,7 @@ pub struct TaskScheduler<'fut> {
     futures: Arc<AsyncMutex<FuturesUnordered<TaskFuture<'fut>>>>,
     futures_tx: mpsc::UnboundedSender<TaskSchedulerRegistrationMessage>,
     futures_rx: Arc<AsyncMutex<mpsc::UnboundedReceiver<TaskSchedulerRegistrationMessage>>>,
-    futures_counter: AtomicUsize,
+    futures_in_background: AtomicUsize,
     task_queue_instant: TaskSchedulerQueue,
     task_queue_deferred: TaskSchedulerQueue,
     exit_code_set: AtomicBool,
@@ -227,7 +311,7 @@ impl<'fut> TaskScheduler<'fut> {
             futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
             futures_tx: tx,
             futures_rx: Arc::new(AsyncMutex::new(rx)),
-            futures_counter: AtomicUsize::new(0),
+            futures_in_background: AtomicUsize::new(0),
             task_queue_instant: Arc::new(Mutex::new(VecDeque::new())),
             task_queue_deferred: Arc::new(Mutex::new(VecDeque::new())),
             exit_code_set: AtomicBool::new(false),
@@ -784,11 +868,11 @@ impl<'fut> TaskScheduler<'fut> {
         if let Some(message) = message_opt {
             match message {
                 TaskSchedulerRegistrationMessage::Spawned => {
-                    self.futures_counter.fetch_add(1, Ordering::Relaxed);
+                    self.futures_in_background.fetch_add(1, Ordering::Relaxed);
                     TaskSchedulerResult::new(self)
                 }
                 TaskSchedulerRegistrationMessage::Terminated(result) => {
-                    let prev = self.futures_counter.fetch_sub(1, Ordering::Relaxed);
+                    let prev = self.futures_in_background.fetch_sub(1, Ordering::Relaxed);
                     if prev == 0 {
                         panic!(
                             r#"
@@ -835,7 +919,7 @@ impl<'fut> TaskScheduler<'fut> {
             self.resume_next_queue_task(TaskKind::Instant, None)
         } else if current.num_deferred > 0 {
             self.resume_next_queue_task(TaskKind::Deferred, None)
-        } else if current.num_futures > 0 && current.num_spawned > 0 {
+        } else if current.num_futures > 0 && current.num_background > 0 {
             // Futures, spawned background tasks
             tokio::select! {
                 result = self.resume_next_queue_future() => result,
@@ -844,7 +928,7 @@
         } else if current.num_futures > 0 {
             // Futures
             self.resume_next_queue_future().await
-        } else if current.num_spawned > 0 {
+        } else if current.num_background > 0 {
             // Only spawned background tasks, these may then
             // spawn new lua tasks and "wake up" the scheduler
             self.receive_next_message().await
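When both queued futures and spawned background tasks exist, the branch above races them with `tokio::select!` so that whichever completes first drives the scheduler forward. A minimal sketch of that racing pattern follows, using a sleep and an unbounded channel as stand-ins for `resume_next_queue_future()` and `receive_next_message()`; it assumes the `tokio` crate with the "full" feature and is not the actual scheduler code.

use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<&'static str>();
    tx.send("background task finished").unwrap();

    // Stand-in for resuming the next queued future
    let next_queue_future = tokio::time::sleep(Duration::from_millis(50));

    tokio::select! {
        _ = next_queue_future => {
            println!("a queued future resumed first");
        }
        message = rx.recv() => {
            // Stand-in for receive_next_message(): a spawned background task
            // reported in, which may queue new lua threads and wake the scheduler
            println!("woken by background task: {message:?}");
        }
    }
}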