Fix wait_for_thread and remove debugging for it

This commit is contained in:
Filip Tibell 2023-08-20 14:56:58 -05:00
parent 98bb475afe
commit 3cd7a8945c
4 changed files with 34 additions and 24 deletions

View file

@ -191,7 +191,6 @@ impl<'lua> RequireContext<'lua> {
// Schedule the thread to run, wait for it to finish running
let thread_id = sched.push_back(file_thread, ())?;
println!("Waiting for thread with id {thread_id:?}");
let thread_res = sched.wait_for_thread(thread_id).await;
// Return the result of the thread, storing any lua value(s) in the registry

View file

@ -42,7 +42,6 @@ where
// Resume the thread, ensuring that the schedulers
// current thread id is set correctly for error catching
println!("Resuming thread with id {thread_id:?}");
self.state.set_current_thread_id(Some(thread_id));
let res = thread.resume::<_, LuaMultiValue>(args);
self.state.set_current_thread_id(None);
@ -59,19 +58,23 @@ where
// If the thread has finished running completely,
// send results of final resume to any listeners
if let Some(sender) = self.thread_senders.borrow_mut().remove(&thread_id) {
if sender.receiver_count() > 0 {
let stored = match res {
Err(e) => Err(e),
Ok(v) => Ok(Arc::new(
self.lua
.create_registry_value(v.into_vec())
.expect("Failed to store return values in registry"),
)),
};
sender
.send(stored)
.expect("Failed to broadcast return values of thread");
if thread.status() != LuaThreadStatus::Resumable {
// NOTE: Threads that were spawned to resume
// with an error will not have a result sender
if let Some(sender) = self.thread_senders.borrow_mut().remove(&thread_id) {
if sender.receiver_count() > 0 {
let stored = match res {
Err(e) => Err(e),
Ok(v) => Ok(Arc::new(
self.lua
.create_registry_value(v.into_vec())
.expect("Failed to store return values in registry"),
)),
};
sender
.send(stored)
.expect("Failed to broadcast return values of thread");
}
}
}

View file

@ -86,9 +86,14 @@ where
.into_lua_err()
.context("Failed to borrow threads vec")?
.push_front(thread);
// NOTE: We might be resuming the same thread several times and
// pushing it to the scheduler several times before it is done,
// and we should only ever create one result sender per thread
self.thread_senders
.borrow_mut()
.insert(thread_id, SchedulerThreadSender::new(1));
.entry(thread_id)
.or_insert_with(|| SchedulerThreadSender::new(1));
// NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption
@ -119,9 +124,14 @@ where
.into_lua_err()
.context("Failed to borrow threads vec")?
.push_back(thread);
// NOTE: We might be resuming the same thread several times and
// pushing it to the scheduler several times before it is done,
// and we should only ever create one result sender per thread
self.thread_senders
.borrow_mut()
.insert(thread_id, SchedulerThreadSender::new(1));
.entry(thread_id)
.or_insert_with(|| SchedulerThreadSender::new(1));
// NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption
@ -146,7 +156,11 @@ where
.expect("Tried to wait for thread that is not queued");
sender.subscribe()
};
match recv.recv().await.expect("Failed to receive thread result") {
let res = match recv.recv().await {
Err(_) => panic!("Sender was dropped while waiting for {thread_id:?}"),
Ok(r) => r,
};
match res {
Err(e) => Err(e),
Ok(k) => {
let vals = self

View file

@ -1,15 +1,9 @@
local task = require("@lune/task")
print("Requiring 1")
local module1 = require("./modules/async")
print("Required 1")
print("Requiring 2")
local module2 = require("./modules/async")
print("Required 2")
print("Waiting")
task.wait(1)
print("Waited")
assert(type(module1) == "table", "Required module1 did not return a table")
assert(module1.Foo == "Bar", "Required module1 did not contain correct values")