From 3cd7a8945cf0b5fdb0408efb2683439603cfd8a0 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Sun, 20 Aug 2023 14:56:58 -0500 Subject: [PATCH] Fix wait_for_thread and remove debugging for it --- src/lune/globals/require/context.rs | 1 - src/lune/scheduler/impl_runner.rs | 31 +++++++++++++---------- src/lune/scheduler/impl_threads.rs | 20 ++++++++++++--- tests/require/tests/async_sequential.luau | 6 ----- 4 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/lune/globals/require/context.rs b/src/lune/globals/require/context.rs index 83cc458..8468803 100644 --- a/src/lune/globals/require/context.rs +++ b/src/lune/globals/require/context.rs @@ -191,7 +191,6 @@ impl<'lua> RequireContext<'lua> { // Schedule the thread to run, wait for it to finish running let thread_id = sched.push_back(file_thread, ())?; - println!("Waiting for thread with id {thread_id:?}"); let thread_res = sched.wait_for_thread(thread_id).await; // Return the result of the thread, storing any lua value(s) in the registry diff --git a/src/lune/scheduler/impl_runner.rs b/src/lune/scheduler/impl_runner.rs index 8e66333..78cc9d3 100644 --- a/src/lune/scheduler/impl_runner.rs +++ b/src/lune/scheduler/impl_runner.rs @@ -42,7 +42,6 @@ where // Resume the thread, ensuring that the schedulers // current thread id is set correctly for error catching - println!("Resuming thread with id {thread_id:?}"); self.state.set_current_thread_id(Some(thread_id)); let res = thread.resume::<_, LuaMultiValue>(args); self.state.set_current_thread_id(None); @@ -59,19 +58,23 @@ where // If the thread has finished running completely, // send results of final resume to any listeners - if let Some(sender) = self.thread_senders.borrow_mut().remove(&thread_id) { - if sender.receiver_count() > 0 { - let stored = match res { - Err(e) => Err(e), - Ok(v) => Ok(Arc::new( - self.lua - .create_registry_value(v.into_vec()) - .expect("Failed to store return values in registry"), - )), - }; - sender - .send(stored) - .expect("Failed to broadcast return values of thread"); + if thread.status() != LuaThreadStatus::Resumable { + // NOTE: Threads that were spawned to resume + // with an error will not have a result sender + if let Some(sender) = self.thread_senders.borrow_mut().remove(&thread_id) { + if sender.receiver_count() > 0 { + let stored = match res { + Err(e) => Err(e), + Ok(v) => Ok(Arc::new( + self.lua + .create_registry_value(v.into_vec()) + .expect("Failed to store return values in registry"), + )), + }; + sender + .send(stored) + .expect("Failed to broadcast return values of thread"); + } } } diff --git a/src/lune/scheduler/impl_threads.rs b/src/lune/scheduler/impl_threads.rs index 52cbe12..37be2fe 100644 --- a/src/lune/scheduler/impl_threads.rs +++ b/src/lune/scheduler/impl_threads.rs @@ -86,9 +86,14 @@ where .into_lua_err() .context("Failed to borrow threads vec")? .push_front(thread); + + // NOTE: We might be resuming the same thread several times and + // pushing it to the scheduler several times before it is done, + // and we should only ever create one result sender per thread self.thread_senders .borrow_mut() - .insert(thread_id, SchedulerThreadSender::new(1)); + .entry(thread_id) + .or_insert_with(|| SchedulerThreadSender::new(1)); // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption @@ -119,9 +124,14 @@ where .into_lua_err() .context("Failed to borrow threads vec")? .push_back(thread); + + // NOTE: We might be resuming the same thread several times and + // pushing it to the scheduler several times before it is done, + // and we should only ever create one result sender per thread self.thread_senders .borrow_mut() - .insert(thread_id, SchedulerThreadSender::new(1)); + .entry(thread_id) + .or_insert_with(|| SchedulerThreadSender::new(1)); // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption @@ -146,7 +156,11 @@ where .expect("Tried to wait for thread that is not queued"); sender.subscribe() }; - match recv.recv().await.expect("Failed to receive thread result") { + let res = match recv.recv().await { + Err(_) => panic!("Sender was dropped while waiting for {thread_id:?}"), + Ok(r) => r, + }; + match res { Err(e) => Err(e), Ok(k) => { let vals = self diff --git a/tests/require/tests/async_sequential.luau b/tests/require/tests/async_sequential.luau index 17e0b35..1d2b39f 100644 --- a/tests/require/tests/async_sequential.luau +++ b/tests/require/tests/async_sequential.luau @@ -1,15 +1,9 @@ local task = require("@lune/task") -print("Requiring 1") local module1 = require("./modules/async") -print("Required 1") -print("Requiring 2") local module2 = require("./modules/async") -print("Required 2") -print("Waiting") task.wait(1) -print("Waited") assert(type(module1) == "table", "Required module1 did not return a table") assert(module1.Foo == "Bar", "Required module1 did not contain correct values")