diff --git a/crates/lune-std-net/src/server/mod.rs b/crates/lune-std-net/src/server/mod.rs index 241cbb7..57ee2b6 100644 --- a/crates/lune-std-net/src/server/mod.rs +++ b/crates/lune-std-net/src/server/mod.rs @@ -1,4 +1,10 @@ -use std::net::SocketAddr; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; use async_net::TcpListener; use futures_lite::pin; @@ -39,51 +45,76 @@ pub async fn serve(lua: Lua, port: u16, config: ServeConfig) -> LuaResult { - if res.is_ok() { - break; - } - // NOTE: We will only get a RecvError if the serve handle is dropped, - // this means lua has garbage collected it and the user does not want - // to manually stop the server using the serve handle. Run forever. - running_forever = true; + // 1. Keep accepting new connections until we should shutdown + let (conn, addr) = if handle_dropped.load(Ordering::SeqCst) { + // 1a. Handle has been dropped, and we don't need to listen for shutdown + match listener.accept().await { + Ok(acc) => acc, + Err(_err) => { + // TODO: Propagate error somehow + continue; + } + } + } else { + // 1b. Handle is possibly active, we must listen for shutdown + match either(shutdown_rx.recv(), listener.accept()).await { + Either::Left(Ok(())) => break, + Either::Left(Err(_)) => { + // NOTE #1: We will only get a RecvError if the serve handle is dropped, + // this means lua has garbage collected it and the user does not want + // to manually stop the server using the serve handle. Run forever. + handle_dropped.store(true, Ordering::SeqCst); + continue; + } + Either::Right(Ok(acc)) => acc, + Either::Right(Err(_err)) => { + // TODO: Propagate error somehow continue; } - Either::Right(acc) => acc, - } - }; - - let (conn, addr) = match accepted { - Ok((conn, addr)) => (conn, addr), - Err(err) => { - eprintln!("Error while accepting connection: {err}"); - continue; } }; + // 2. For each connection, spawn a new task to handle it lua.spawn_local({ let rx = shutdown_rx.clone(); let io = HyperIo::from(conn); + let mut svc = service.clone(); svc.address = addr; + + let handle_dropped = Arc::clone(&handle_dropped); async move { let conn = Http1Builder::new() .timer(HyperTimer) .keep_alive(true) .serve_connection(io, svc) .with_upgrades(); - // NOTE: Because we use keep_alive for websockets above, we need to - // also manually poll this future and handle the shutdown signal here - pin!(conn); - match either(rx.recv(), conn.as_mut()).await { - Either::Left(_) => conn.as_mut().graceful_shutdown(), - Either::Right(_) => {} + if handle_dropped.load(Ordering::SeqCst) { + if let Err(_err) = conn.await { + // TODO: Propagate error somehow + } + } else { + // NOTE #2: Because we use keep_alive for websockets above, we need to + // also manually poll this future and handle the graceful shutdown, + // otherwise the already accepted connection will linger and run + // even if the stop method has been called on the serve handle + pin!(conn); + match either(rx.recv(), conn.as_mut()).await { + Either::Left(Ok(())) => conn.as_mut().graceful_shutdown(), + Either::Left(Err(_)) => { + // Same as note #1 + handle_dropped.store(true, Ordering::SeqCst); + if let Err(_err) = conn.await { + // TODO: Propagate error somehow + } + } + Either::Right(Ok(())) => {} + Either::Right(Err(_err)) => { + // TODO: Propagate error somehow + } + } } } }); diff --git a/test.luau b/test.luau new file mode 100644 index 0000000..be34e69 --- /dev/null +++ b/test.luau @@ -0,0 +1,12 @@ +local net = require("@lune/net") + +local counter = 0 +net.serve(8080, function() + counter += 1 + return { + status = 200, + body = "Hello! This is response #" .. tostring(counter), + } +end) + +print("Listening on port 8080 🚀")