Split scheduler futures into lua & background, improve async tests

This commit is contained in:
Filip Tibell 2023-08-20 18:54:34 -05:00
parent 9bb3854554
commit 59aef5c170
9 changed files with 140 additions and 76 deletions

View file

@ -172,7 +172,7 @@ where
Ok(bound) => bound, Ok(bound) => bound,
}; };
// Start up our web server // Start up our web server
sched.schedule_future(async move { sched.schedule_future_background(async move {
bound bound
.http1_only(true) // Web sockets can only use http1 .http1_only(true) // Web sockets can only use http1
.http1_keepalive(true) // Web sockets must be kept alive .http1_keepalive(true) // Web sockets must be kept alive

View file

@ -53,7 +53,7 @@ impl Service<Request<Body>> for NetServiceInner {
let sched = lua let sched = lua
.app_data_ref::<&Scheduler>() .app_data_ref::<&Scheduler>()
.expect("Lua struct is missing scheduler"); .expect("Lua struct is missing scheduler");
sched.schedule_future(async move { sched.schedule_future_background(async move {
// Create our new full websocket object, then // Create our new full websocket object, then
// schedule our handler to get called asap // schedule our handler to get called asap
let res = async move { let res = async move {

View file

@ -8,16 +8,38 @@ where
'lua: 'fut, 'lua: 'fut,
{ {
/** /**
Schedules a plain future to run whenever the scheduler is available. Checks if there are any futures to run, for
lua futures and background futures respectively.
*/ */
pub fn schedule_future<F>(&'fut self, fut: F) pub(super) fn has_futures(&self) -> (bool, bool) {
(
self.futures_lua
.try_lock()
.expect("Failed to lock lua futures for check")
.len()
> 0,
self.futures_background
.try_lock()
.expect("Failed to lock background futures for check")
.len()
> 0,
)
}
/**
Schedules a plain future to run in the background.
Note that this will keep the scheduler alive even
if the future does not spawn any new lua threads.
*/
pub fn schedule_future_background<F>(&self, fut: F)
where where
F: Future<Output = ()> + 'fut, F: Future<Output = ()> + 'static,
{ {
let futs = self let futs = self
.futures .futures_background
.try_lock() .try_lock()
.expect("TODO: Make scheduling futures during resumption work"); .expect("Failed to lock futures queue for background tasks");
futs.push(Box::pin(fut)) futs.push(Box::pin(fut))
} }
@ -36,7 +58,7 @@ where
F: Future<Output = LuaResult<FR>> + 'fut, F: Future<Output = LuaResult<FR>> + 'fut,
{ {
let thread = thread.into_lua_thread(self.lua)?; let thread = thread.into_lua_thread(self.lua)?;
let futs = self.futures.try_lock().expect( let futs = self.futures_lua.try_lock().expect(
"Failed to lock futures queue - \ "Failed to lock futures queue - \
can't schedule future lua threads during futures resumption", can't schedule future lua threads during futures resumption",
); );

View file

@ -88,42 +88,44 @@ where
/** /**
Runs futures until none are left or a future spawned a new lua thread. Runs futures until none are left or a future spawned a new lua thread.
Returns `true` if any future was resumed, `false` otherwise.
*/ */
async fn run_futures(&self) -> bool { async fn run_futures_lua(&self) {
let mut resumed_any = false; let mut futs = self
.futures_lua
loop { .try_lock()
let mut rx = self.futures_break_signal.subscribe(); .expect("Failed to lock lua futures for resumption");
let mut futs = self
.futures
.try_lock()
.expect("Failed to lock futures for resumption");
// Wait until we either manually break out of resumption or a future completes
tokio::select! {
res = rx.recv() => {
if res.is_err() {
panic!(
"Futures break signal was dropped but futures still remain - \
this may cause memory unsafety if a future accesses lua struct"
)
}
break;
},
res = futs.next() => match res {
Some(_) => resumed_any = true,
None => break,
},
}
while futs.next().await.is_some() {
if self.has_thread() { if self.has_thread() {
break; break;
} }
} }
}
resumed_any /**
Runs background futures until none are left or a future spawned a new lua thread.
*/
async fn run_futures_background(&self) {
let mut futs = self
.futures_background
.try_lock()
.expect("Failed to lock background futures for resumption");
while futs.next().await.is_some() {
if self.has_thread() {
break;
}
}
}
async fn run_futures(&self) {
let mut rx = self.futures_break_signal.subscribe();
tokio::select! {
_ = self.run_futures_lua() => {},
_ = self.run_futures_background() => {},
_ = rx.recv() => {},
};
} }
/** /**
@ -138,9 +140,8 @@ where
let _guard = set.enter(); let _guard = set.enter();
loop { loop {
// 1. Run lua threads until exit or there are none left, // 1. Run lua threads until exit or there are none left
// if any thread was resumed it may have spawned futures self.run_lua_threads();
let resumed_lua = self.run_lua_threads();
// 2. If we got a manual exit code from lua we should // 2. If we got a manual exit code from lua we should
// not try to wait for any pending futures to complete // not try to wait for any pending futures to complete
@ -148,13 +149,20 @@ where
break; break;
} }
// 3. Keep resuming futures until we get a new lua thread to // 3. Keep resuming futures until there are no futures left to
// resume, or until we don't have any futures left to wait for // resume, or until we manually break out of resumption for any
let resumed_fut = self.run_futures().await; // reason, this may be because a future spawned a new lua thread
self.run_futures().await;
// 4. If we did not resume any lua threads, and we have no futures // 4. Once again, check for an exit code, in case a future sets one
// remaining either, we have now run the scheduler until completion if self.state.has_exit_code() {
if !resumed_lua && !resumed_fut { break;
}
// 5. If we have no lua threads or futures remaining,
// we have now run the scheduler until completion
let (has_future_lua, has_future_background) = self.has_futures();
if !has_future_lua && !has_future_background && !self.has_thread() {
break; break;
} }
} }

View file

@ -60,7 +60,7 @@ where
// NOTE: We might be resuming futures, need to signal that a // NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption // new lua thread is ready to break out of futures resumption
if self.futures_break_signal.receiver_count() > 0 { if self.futures_break_signal.receiver_count() > 0 {
self.futures_break_signal.send(false).ok(); self.futures_break_signal.send(()).ok();
} }
Ok(()) Ok(())
@ -98,7 +98,7 @@ where
// NOTE: We might be resuming futures, need to signal that a // NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption // new lua thread is ready to break out of futures resumption
if self.futures_break_signal.receiver_count() > 0 { if self.futures_break_signal.receiver_count() > 0 {
self.futures_break_signal.send(false).ok(); self.futures_break_signal.send(()).ok();
} }
Ok(thread_id) Ok(thread_id)
@ -136,7 +136,7 @@ where
// NOTE: We might be resuming futures, need to signal that a // NOTE: We might be resuming futures, need to signal that a
// new lua thread is ready to break out of futures resumption // new lua thread is ready to break out of futures resumption
if self.futures_break_signal.receiver_count() > 0 { if self.futures_break_signal.receiver_count() > 0 {
self.futures_break_signal.send(false).ok(); self.futures_break_signal.send(()).ok();
} }
Ok(thread_id) Ok(thread_id)

View file

@ -42,8 +42,9 @@ pub(crate) struct Scheduler<'lua, 'fut> {
state: Arc<SchedulerState>, state: Arc<SchedulerState>,
threads: Arc<RefCell<VecDeque<SchedulerThread>>>, threads: Arc<RefCell<VecDeque<SchedulerThread>>>,
thread_senders: Arc<RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>>, thread_senders: Arc<RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>>,
futures: Arc<AsyncMutex<FuturesUnordered<SchedulerFuture<'fut>>>>, futures_lua: Arc<AsyncMutex<FuturesUnordered<SchedulerFuture<'fut>>>>,
futures_break_signal: Sender<bool>, futures_background: Arc<AsyncMutex<FuturesUnordered<SchedulerFuture<'static>>>>,
futures_break_signal: Sender<()>,
} }
impl<'lua, 'fut> Scheduler<'lua, 'fut> { impl<'lua, 'fut> Scheduler<'lua, 'fut> {
@ -55,7 +56,8 @@ impl<'lua, 'fut> Scheduler<'lua, 'fut> {
state: Arc::new(SchedulerState::new()), state: Arc::new(SchedulerState::new()),
threads: Arc::new(RefCell::new(VecDeque::new())), threads: Arc::new(RefCell::new(VecDeque::new())),
thread_senders: Arc::new(RefCell::new(HashMap::new())), thread_senders: Arc::new(RefCell::new(HashMap::new())),
futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())), futures_lua: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
futures_background: Arc::new(AsyncMutex::new(FuturesUnordered::new())),
futures_break_signal, futures_break_signal,
}; };

View file

@ -9,9 +9,9 @@ local RESPONSE = "Hello, lune!"
-- Serve should not block the thread from continuing -- Serve should not block the thread from continuing
local thread = task.delay(0.2, function() local thread = task.delay(1, function()
stdio.ewrite("Serve must not block the current thread\n") stdio.ewrite("Serve must not block the current thread\n")
task.wait() task.wait(1)
process.exit(1) process.exit(1)
end) end)
@ -28,9 +28,9 @@ task.cancel(thread)
-- Serve should respond to a request we send to it -- Serve should respond to a request we send to it
local thread2 = task.delay(0.2, function() local thread2 = task.delay(1, function()
stdio.ewrite("Serve should respond to requests in a reasonable amount of time\n") stdio.ewrite("Serve should respond to requests in a reasonable amount of time\n")
task.wait() task.wait(1)
process.exit(1) process.exit(1)
end) end)

View file

@ -1,5 +1,6 @@
local net = require("@lune/net") local net = require("@lune/net")
local process = require("@lune/process") local process = require("@lune/process")
local stdio = require("@lune/stdio")
local task = require("@lune/task") local task = require("@lune/task")
local PORT = 8081 local PORT = 8081
@ -8,26 +9,15 @@ local WS_URL = `ws://127.0.0.1:{PORT}`
local REQUEST = "Hello from client!" local REQUEST = "Hello from client!"
local RESPONSE = "Hello, lune!" local RESPONSE = "Hello, lune!"
-- Serve should not block the thread from continuing
local thread = task.delay(0.2, function() local thread = task.delay(0.2, function()
task.spawn(error, "Serve must not block the current thread") stdio.ewrite("Serve must not block the current thread\n")
task.wait(1)
process.exit(1) process.exit(1)
end) end)
--[[ local handle = net.serve(PORT, {
Serve should also take a full config with handler functions
A server should also be able to start on a previously closed port
]]
local handle = net.serve(PORT, function(request)
return RESPONSE
end)
task.cancel(thread)
handle.stop()
task.wait()
local handle2 = net.serve(PORT, {
handleRequest = function() handleRequest = function()
return RESPONSE return RESPONSE
end, end,
@ -39,10 +29,29 @@ local handle2 = net.serve(PORT, {
end, end,
}) })
task.cancel(thread)
-- Serve should respond to a request we send to it
local thread2 = task.delay(1, function()
stdio.ewrite("Serve should respond to requests in a reasonable amount of time\n")
task.wait(1)
process.exit(1)
end)
local response = net.request(URL).body local response = net.request(URL).body
assert(response == RESPONSE, "Invalid response from server") assert(response == RESPONSE, "Invalid response from server")
-- Web socket client should work task.cancel(thread2)
-- Web socket responses should also be responded to
local thread3 = task.delay(1, function()
stdio.ewrite("Serve should respond to websockets in a reasonable amount of time\n")
task.wait(1)
process.exit(1)
end)
local socket = net.socket(WS_URL) local socket = net.socket(WS_URL)
socket.send(REQUEST) socket.send(REQUEST)
@ -53,6 +62,8 @@ assert(socketMessage == RESPONSE, "Invalid web socket response from server")
socket.close() socket.close()
task.cancel(thread3)
-- Wait for the socket to close and make sure we can't send messages afterwards -- Wait for the socket to close and make sure we can't send messages afterwards
task.wait() task.wait()
local success3, err2 = (pcall :: any)(socket.send, "") local success3, err2 = (pcall :: any)(socket.send, "")
@ -64,4 +75,4 @@ assert(
) )
-- Stop the server to end the test -- Stop the server to end the test
handle2.stop() handle.stop()

View file

@ -1,12 +1,21 @@
local process = require("@lune/process") local process = require("@lune/process")
local stdio = require("@lune/stdio")
local task = require("@lune/task") local task = require("@lune/task")
-- Spawning a child process should work with options -- Spawning a child process should work, with options
local thread = task.delay(1, function()
stdio.ewrite("Spawning a process should take a reasonable amount of time\n")
task.wait(1)
process.exit(1)
end)
local result = process.spawn("ls", { local result = process.spawn("ls", {
"-a", "-a",
}) })
task.cancel(thread)
assert(result.ok, "Failed to spawn child process") assert(result.ok, "Failed to spawn child process")
assert(result.stderr == "", "Stderr was not empty") assert(result.stderr == "", "Stderr was not empty")
@ -85,6 +94,12 @@ assert(homeDir1 == homeDir2, "Home dirs did not match when performing tilde subs
local SLEEP_DURATION = 1 / 4 local SLEEP_DURATION = 1 / 4
local SLEEP_SAMPLES = 2 local SLEEP_SAMPLES = 2
local thread2 = task.delay(SLEEP_DURATION * 1.5, function()
stdio.ewrite("Spawning a sleep process should take a reasonable amount of time\n")
task.wait(1)
process.exit(1)
end)
local sleepStart = os.clock() local sleepStart = os.clock()
local sleepCounter = 0 local sleepCounter = 0
for i = 1, SLEEP_SAMPLES, 1 do for i = 1, SLEEP_SAMPLES, 1 do
@ -97,9 +112,15 @@ while sleepCounter < SLEEP_SAMPLES do
task.wait() task.wait()
end end
task.cancel(thread2)
local sleepElapsed = os.clock() - sleepStart local sleepElapsed = os.clock() - sleepStart
assert( assert(
(sleepElapsed >= SLEEP_DURATION) and (sleepElapsed < SLEEP_DURATION * 1.5), sleepElapsed >= SLEEP_DURATION,
"Spawning a process that does blocking sleep did not sleep enough"
)
assert(
sleepElapsed < SLEEP_DURATION * 1.5,
"Coroutine yielded the main lua thread during process yield" "Coroutine yielded the main lua thread during process yield"
) )