mirror of
https://github.com/lune-org/lune.git
synced 2025-05-04 10:43:57 +01:00
Properly fix http server dying when serve handle is GCed + document it
This commit is contained in:
parent
bf819d1980
commit
11bce8412c
2 changed files with 72 additions and 29 deletions
|
@ -1,4 +1,10 @@
|
||||||
use std::net::SocketAddr;
|
use std::{
|
||||||
|
net::SocketAddr,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
use async_net::TcpListener;
|
use async_net::TcpListener;
|
||||||
use futures_lite::pin;
|
use futures_lite::pin;
|
||||||
|
@ -39,51 +45,76 @@ pub async fn serve(lua: Lua, port: u16, config: ServeConfig) -> LuaResult<ServeH
|
||||||
lua.spawn_local({
|
lua.spawn_local({
|
||||||
let lua = lua.clone();
|
let lua = lua.clone();
|
||||||
async move {
|
async move {
|
||||||
let mut running_forever = false;
|
let handle_dropped = Arc::new(AtomicBool::new(false));
|
||||||
loop {
|
loop {
|
||||||
let accepted = if running_forever {
|
// 1. Keep accepting new connections until we should shutdown
|
||||||
listener.accept().await
|
let (conn, addr) = if handle_dropped.load(Ordering::SeqCst) {
|
||||||
} else {
|
// 1a. Handle has been dropped, and we don't need to listen for shutdown
|
||||||
match either(shutdown_rx.recv(), listener.accept()).await {
|
match listener.accept().await {
|
||||||
Either::Left(res) => {
|
Ok(acc) => acc,
|
||||||
if res.is_ok() {
|
Err(_err) => {
|
||||||
break;
|
// TODO: Propagate error somehow
|
||||||
}
|
continue;
|
||||||
// NOTE: We will only get a RecvError if the serve handle is dropped,
|
}
|
||||||
// this means lua has garbage collected it and the user does not want
|
}
|
||||||
// to manually stop the server using the serve handle. Run forever.
|
} else {
|
||||||
running_forever = true;
|
// 1b. Handle is possibly active, we must listen for shutdown
|
||||||
|
match either(shutdown_rx.recv(), listener.accept()).await {
|
||||||
|
Either::Left(Ok(())) => break,
|
||||||
|
Either::Left(Err(_)) => {
|
||||||
|
// NOTE #1: We will only get a RecvError if the serve handle is dropped,
|
||||||
|
// this means lua has garbage collected it and the user does not want
|
||||||
|
// to manually stop the server using the serve handle. Run forever.
|
||||||
|
handle_dropped.store(true, Ordering::SeqCst);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Either::Right(Ok(acc)) => acc,
|
||||||
|
Either::Right(Err(_err)) => {
|
||||||
|
// TODO: Propagate error somehow
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Either::Right(acc) => acc,
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let (conn, addr) = match accepted {
|
|
||||||
Ok((conn, addr)) => (conn, addr),
|
|
||||||
Err(err) => {
|
|
||||||
eprintln!("Error while accepting connection: {err}");
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// 2. For each connection, spawn a new task to handle it
|
||||||
lua.spawn_local({
|
lua.spawn_local({
|
||||||
let rx = shutdown_rx.clone();
|
let rx = shutdown_rx.clone();
|
||||||
let io = HyperIo::from(conn);
|
let io = HyperIo::from(conn);
|
||||||
|
|
||||||
let mut svc = service.clone();
|
let mut svc = service.clone();
|
||||||
svc.address = addr;
|
svc.address = addr;
|
||||||
|
|
||||||
|
let handle_dropped = Arc::clone(&handle_dropped);
|
||||||
async move {
|
async move {
|
||||||
let conn = Http1Builder::new()
|
let conn = Http1Builder::new()
|
||||||
.timer(HyperTimer)
|
.timer(HyperTimer)
|
||||||
.keep_alive(true)
|
.keep_alive(true)
|
||||||
.serve_connection(io, svc)
|
.serve_connection(io, svc)
|
||||||
.with_upgrades();
|
.with_upgrades();
|
||||||
// NOTE: Because we use keep_alive for websockets above, we need to
|
if handle_dropped.load(Ordering::SeqCst) {
|
||||||
// also manually poll this future and handle the shutdown signal here
|
if let Err(_err) = conn.await {
|
||||||
pin!(conn);
|
// TODO: Propagate error somehow
|
||||||
match either(rx.recv(), conn.as_mut()).await {
|
}
|
||||||
Either::Left(_) => conn.as_mut().graceful_shutdown(),
|
} else {
|
||||||
Either::Right(_) => {}
|
// NOTE #2: Because we use keep_alive for websockets above, we need to
|
||||||
|
// also manually poll this future and handle the graceful shutdown,
|
||||||
|
// otherwise the already accepted connection will linger and run
|
||||||
|
// even if the stop method has been called on the serve handle
|
||||||
|
pin!(conn);
|
||||||
|
match either(rx.recv(), conn.as_mut()).await {
|
||||||
|
Either::Left(Ok(())) => conn.as_mut().graceful_shutdown(),
|
||||||
|
Either::Left(Err(_)) => {
|
||||||
|
// Same as note #1
|
||||||
|
handle_dropped.store(true, Ordering::SeqCst);
|
||||||
|
if let Err(_err) = conn.await {
|
||||||
|
// TODO: Propagate error somehow
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Either::Right(Ok(())) => {}
|
||||||
|
Either::Right(Err(_err)) => {
|
||||||
|
// TODO: Propagate error somehow
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
12
test.luau
Normal file
12
test.luau
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
local net = require("@lune/net")
|
||||||
|
|
||||||
|
local counter = 0
|
||||||
|
net.serve(8080, function()
|
||||||
|
counter += 1
|
||||||
|
return {
|
||||||
|
status = 200,
|
||||||
|
body = "Hello! This is response #" .. tostring(counter),
|
||||||
|
}
|
||||||
|
end)
|
||||||
|
|
||||||
|
print("Listening on port 8080 🚀")
|
Loading…
Add table
Reference in a new issue