From 59aef5c1707f485b8a0b2eb2e72dfc83dd537dcc Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Sun, 20 Aug 2023 18:54:34 -0500 Subject: [PATCH] Split scheduler futures into lua & background, improve async tests --- src/lune/builtins/net/mod.rs | 2 +- src/lune/builtins/net/server.rs | 2 +- src/lune/scheduler/impl_async.rs | 34 +++++++++--- src/lune/scheduler/impl_runner.rs | 84 ++++++++++++++++-------------- src/lune/scheduler/impl_threads.rs | 6 +-- src/lune/scheduler/mod.rs | 8 +-- tests/net/serve/requests.luau | 8 +-- tests/net/serve/websockets.luau | 47 ++++++++++------- tests/process/spawn.luau | 25 ++++++++- 9 files changed, 140 insertions(+), 76 deletions(-) diff --git a/src/lune/builtins/net/mod.rs b/src/lune/builtins/net/mod.rs index c504cde..552fd55 100644 --- a/src/lune/builtins/net/mod.rs +++ b/src/lune/builtins/net/mod.rs @@ -172,7 +172,7 @@ where Ok(bound) => bound, }; // Start up our web server - sched.schedule_future(async move { + sched.schedule_future_background(async move { bound .http1_only(true) // Web sockets can only use http1 .http1_keepalive(true) // Web sockets must be kept alive diff --git a/src/lune/builtins/net/server.rs b/src/lune/builtins/net/server.rs index d896560..ef3d48f 100644 --- a/src/lune/builtins/net/server.rs +++ b/src/lune/builtins/net/server.rs @@ -53,7 +53,7 @@ impl Service> for NetServiceInner { let sched = lua .app_data_ref::<&Scheduler>() .expect("Lua struct is missing scheduler"); - sched.schedule_future(async move { + sched.schedule_future_background(async move { // Create our new full websocket object, then // schedule our handler to get called asap let res = async move { diff --git a/src/lune/scheduler/impl_async.rs b/src/lune/scheduler/impl_async.rs index 925dd03..1a2a9c9 100644 --- a/src/lune/scheduler/impl_async.rs +++ b/src/lune/scheduler/impl_async.rs @@ -8,16 +8,38 @@ where 'lua: 'fut, { /** - Schedules a plain future to run whenever the scheduler is available. + Checks if there are any futures to run, for + lua futures and background futures respectively. */ - pub fn schedule_future(&'fut self, fut: F) + pub(super) fn has_futures(&self) -> (bool, bool) { + ( + self.futures_lua + .try_lock() + .expect("Failed to lock lua futures for check") + .len() + > 0, + self.futures_background + .try_lock() + .expect("Failed to lock background futures for check") + .len() + > 0, + ) + } + + /** + Schedules a plain future to run in the background. + + Note that this will keep the scheduler alive even + if the future does not spawn any new lua threads. + */ + pub fn schedule_future_background(&self, fut: F) where - F: Future + 'fut, + F: Future + 'static, { let futs = self - .futures + .futures_background .try_lock() - .expect("TODO: Make scheduling futures during resumption work"); + .expect("Failed to lock futures queue for background tasks"); futs.push(Box::pin(fut)) } @@ -36,7 +58,7 @@ where F: Future> + 'fut, { let thread = thread.into_lua_thread(self.lua)?; - let futs = self.futures.try_lock().expect( + let futs = self.futures_lua.try_lock().expect( "Failed to lock futures queue - \ can't schedule future lua threads during futures resumption", ); diff --git a/src/lune/scheduler/impl_runner.rs b/src/lune/scheduler/impl_runner.rs index efcb39b..440c02b 100644 --- a/src/lune/scheduler/impl_runner.rs +++ b/src/lune/scheduler/impl_runner.rs @@ -88,42 +88,44 @@ where /** Runs futures until none are left or a future spawned a new lua thread. - - Returns `true` if any future was resumed, `false` otherwise. */ - async fn run_futures(&self) -> bool { - let mut resumed_any = false; - - loop { - let mut rx = self.futures_break_signal.subscribe(); - let mut futs = self - .futures - .try_lock() - .expect("Failed to lock futures for resumption"); - - // Wait until we either manually break out of resumption or a future completes - tokio::select! { - res = rx.recv() => { - if res.is_err() { - panic!( - "Futures break signal was dropped but futures still remain - \ - this may cause memory unsafety if a future accesses lua struct" - ) - } - break; - }, - res = futs.next() => match res { - Some(_) => resumed_any = true, - None => break, - }, - } + async fn run_futures_lua(&self) { + let mut futs = self + .futures_lua + .try_lock() + .expect("Failed to lock lua futures for resumption"); + while futs.next().await.is_some() { if self.has_thread() { break; } } + } - resumed_any + /** + Runs background futures until none are left or a future spawned a new lua thread. + */ + async fn run_futures_background(&self) { + let mut futs = self + .futures_background + .try_lock() + .expect("Failed to lock background futures for resumption"); + + while futs.next().await.is_some() { + if self.has_thread() { + break; + } + } + } + + async fn run_futures(&self) { + let mut rx = self.futures_break_signal.subscribe(); + + tokio::select! { + _ = self.run_futures_lua() => {}, + _ = self.run_futures_background() => {}, + _ = rx.recv() => {}, + }; } /** @@ -138,9 +140,8 @@ where let _guard = set.enter(); loop { - // 1. Run lua threads until exit or there are none left, - // if any thread was resumed it may have spawned futures - let resumed_lua = self.run_lua_threads(); + // 1. Run lua threads until exit or there are none left + self.run_lua_threads(); // 2. If we got a manual exit code from lua we should // not try to wait for any pending futures to complete @@ -148,13 +149,20 @@ where break; } - // 3. Keep resuming futures until we get a new lua thread to - // resume, or until we don't have any futures left to wait for - let resumed_fut = self.run_futures().await; + // 3. Keep resuming futures until there are no futures left to + // resume, or until we manually break out of resumption for any + // reason, this may be because a future spawned a new lua thread + self.run_futures().await; - // 4. If we did not resume any lua threads, and we have no futures - // remaining either, we have now run the scheduler until completion - if !resumed_lua && !resumed_fut { + // 4. Once again, check for an exit code, in case a future sets one + if self.state.has_exit_code() { + break; + } + + // 5. If we have no lua threads or futures remaining, + // we have now run the scheduler until completion + let (has_future_lua, has_future_background) = self.has_futures(); + if !has_future_lua && !has_future_background && !self.has_thread() { break; } } diff --git a/src/lune/scheduler/impl_threads.rs b/src/lune/scheduler/impl_threads.rs index d8530ca..dc5a2f8 100644 --- a/src/lune/scheduler/impl_threads.rs +++ b/src/lune/scheduler/impl_threads.rs @@ -60,7 +60,7 @@ where // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption if self.futures_break_signal.receiver_count() > 0 { - self.futures_break_signal.send(false).ok(); + self.futures_break_signal.send(()).ok(); } Ok(()) @@ -98,7 +98,7 @@ where // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption if self.futures_break_signal.receiver_count() > 0 { - self.futures_break_signal.send(false).ok(); + self.futures_break_signal.send(()).ok(); } Ok(thread_id) @@ -136,7 +136,7 @@ where // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption if self.futures_break_signal.receiver_count() > 0 { - self.futures_break_signal.send(false).ok(); + self.futures_break_signal.send(()).ok(); } Ok(thread_id) diff --git a/src/lune/scheduler/mod.rs b/src/lune/scheduler/mod.rs index 69d90b3..b011878 100644 --- a/src/lune/scheduler/mod.rs +++ b/src/lune/scheduler/mod.rs @@ -42,8 +42,9 @@ pub(crate) struct Scheduler<'lua, 'fut> { state: Arc, threads: Arc>>, thread_senders: Arc>>, - futures: Arc>>>, - futures_break_signal: Sender, + futures_lua: Arc>>>, + futures_background: Arc>>>, + futures_break_signal: Sender<()>, } impl<'lua, 'fut> Scheduler<'lua, 'fut> { @@ -55,7 +56,8 @@ impl<'lua, 'fut> Scheduler<'lua, 'fut> { state: Arc::new(SchedulerState::new()), threads: Arc::new(RefCell::new(VecDeque::new())), thread_senders: Arc::new(RefCell::new(HashMap::new())), - futures: Arc::new(AsyncMutex::new(FuturesUnordered::new())), + futures_lua: Arc::new(AsyncMutex::new(FuturesUnordered::new())), + futures_background: Arc::new(AsyncMutex::new(FuturesUnordered::new())), futures_break_signal, }; diff --git a/tests/net/serve/requests.luau b/tests/net/serve/requests.luau index a2dc551..1762248 100644 --- a/tests/net/serve/requests.luau +++ b/tests/net/serve/requests.luau @@ -9,9 +9,9 @@ local RESPONSE = "Hello, lune!" -- Serve should not block the thread from continuing -local thread = task.delay(0.2, function() +local thread = task.delay(1, function() stdio.ewrite("Serve must not block the current thread\n") - task.wait() + task.wait(1) process.exit(1) end) @@ -28,9 +28,9 @@ task.cancel(thread) -- Serve should respond to a request we send to it -local thread2 = task.delay(0.2, function() +local thread2 = task.delay(1, function() stdio.ewrite("Serve should respond to requests in a reasonable amount of time\n") - task.wait() + task.wait(1) process.exit(1) end) diff --git a/tests/net/serve/websockets.luau b/tests/net/serve/websockets.luau index 0e7e997..cf42cee 100644 --- a/tests/net/serve/websockets.luau +++ b/tests/net/serve/websockets.luau @@ -1,5 +1,6 @@ local net = require("@lune/net") local process = require("@lune/process") +local stdio = require("@lune/stdio") local task = require("@lune/task") local PORT = 8081 @@ -8,26 +9,15 @@ local WS_URL = `ws://127.0.0.1:{PORT}` local REQUEST = "Hello from client!" local RESPONSE = "Hello, lune!" +-- Serve should not block the thread from continuing + local thread = task.delay(0.2, function() - task.spawn(error, "Serve must not block the current thread") + stdio.ewrite("Serve must not block the current thread\n") + task.wait(1) process.exit(1) end) ---[[ - Serve should also take a full config with handler functions - - A server should also be able to start on a previously closed port -]] - -local handle = net.serve(PORT, function(request) - return RESPONSE -end) - -task.cancel(thread) -handle.stop() -task.wait() - -local handle2 = net.serve(PORT, { +local handle = net.serve(PORT, { handleRequest = function() return RESPONSE end, @@ -39,10 +29,29 @@ local handle2 = net.serve(PORT, { end, }) +task.cancel(thread) + +-- Serve should respond to a request we send to it + +local thread2 = task.delay(1, function() + stdio.ewrite("Serve should respond to requests in a reasonable amount of time\n") + task.wait(1) + process.exit(1) +end) + local response = net.request(URL).body assert(response == RESPONSE, "Invalid response from server") --- Web socket client should work +task.cancel(thread2) + +-- Web socket responses should also be responded to + +local thread3 = task.delay(1, function() + stdio.ewrite("Serve should respond to websockets in a reasonable amount of time\n") + task.wait(1) + process.exit(1) +end) + local socket = net.socket(WS_URL) socket.send(REQUEST) @@ -53,6 +62,8 @@ assert(socketMessage == RESPONSE, "Invalid web socket response from server") socket.close() +task.cancel(thread3) + -- Wait for the socket to close and make sure we can't send messages afterwards task.wait() local success3, err2 = (pcall :: any)(socket.send, "") @@ -64,4 +75,4 @@ assert( ) -- Stop the server to end the test -handle2.stop() +handle.stop() diff --git a/tests/process/spawn.luau b/tests/process/spawn.luau index 6878e8e..c3537df 100644 --- a/tests/process/spawn.luau +++ b/tests/process/spawn.luau @@ -1,12 +1,21 @@ local process = require("@lune/process") +local stdio = require("@lune/stdio") local task = require("@lune/task") --- Spawning a child process should work with options +-- Spawning a child process should work, with options + +local thread = task.delay(1, function() + stdio.ewrite("Spawning a process should take a reasonable amount of time\n") + task.wait(1) + process.exit(1) +end) local result = process.spawn("ls", { "-a", }) +task.cancel(thread) + assert(result.ok, "Failed to spawn child process") assert(result.stderr == "", "Stderr was not empty") @@ -85,6 +94,12 @@ assert(homeDir1 == homeDir2, "Home dirs did not match when performing tilde subs local SLEEP_DURATION = 1 / 4 local SLEEP_SAMPLES = 2 +local thread2 = task.delay(SLEEP_DURATION * 1.5, function() + stdio.ewrite("Spawning a sleep process should take a reasonable amount of time\n") + task.wait(1) + process.exit(1) +end) + local sleepStart = os.clock() local sleepCounter = 0 for i = 1, SLEEP_SAMPLES, 1 do @@ -97,9 +112,15 @@ while sleepCounter < SLEEP_SAMPLES do task.wait() end +task.cancel(thread2) + local sleepElapsed = os.clock() - sleepStart assert( - (sleepElapsed >= SLEEP_DURATION) and (sleepElapsed < SLEEP_DURATION * 1.5), + sleepElapsed >= SLEEP_DURATION, + "Spawning a process that does blocking sleep did not sleep enough" +) +assert( + sleepElapsed < SLEEP_DURATION * 1.5, "Coroutine yielded the main lua thread during process yield" )