diff --git a/src/lune/builtins/net/mod.rs b/src/lune/builtins/net/mod.rs index 95aecd9..c504cde 100644 --- a/src/lune/builtins/net/mod.rs +++ b/src/lune/builtins/net/mod.rs @@ -7,10 +7,7 @@ use hyper::{ header::{CONTENT_ENCODING, CONTENT_LENGTH}, Server, }; -use tokio::{ - sync::{mpsc, oneshot}, - task, -}; +use tokio::sync::mpsc; use crate::lune::{scheduler::Scheduler, util::TableBuilder}; @@ -174,32 +171,27 @@ where } Ok(bound) => bound, }; - // Register a background task to prevent the task scheduler from - // exiting early and start up our web server on the bound address - // TODO: Implement background task registration in scheduler - let (background_tx, background_rx) = oneshot::channel(); + // Start up our web server sched.schedule_future(async move { - let _ = background_rx.await; + bound + .http1_only(true) // Web sockets can only use http1 + .http1_keepalive(true) // Web sockets must be kept alive + .executor(NetLocalExec) + .serve(NetService::new( + lua, + server_request_callback, + server_websocket_callback, + )) + .with_graceful_shutdown(async move { + shutdown_rx + .recv() + .await + .expect("Server was stopped instantly"); + shutdown_rx.close(); + }) + .await + .unwrap(); }); - let server = bound - .http1_only(true) // Web sockets can only use http1 - .http1_keepalive(true) // Web sockets must be kept alive - .executor(NetLocalExec) - .serve(NetService::new( - lua, - server_request_callback, - server_websocket_callback, - )) - .with_graceful_shutdown(async move { - let _ = background_tx.send(()); - shutdown_rx - .recv() - .await - .expect("Server was stopped instantly"); - shutdown_rx.close(); - }); - // Spawn a new tokio task so we don't block - task::spawn_local(server); // Create a new read-only table that contains methods // for manipulating server behavior and shutting it down let handle_stop = move |_, _: ()| match shutdown_tx.try_send(()) { diff --git a/src/lune/builtins/net/server.rs b/src/lune/builtins/net/server.rs index 5f9c364..d896560 100644 --- a/src/lune/builtins/net/server.rs +++ b/src/lune/builtins/net/server.rs @@ -10,7 +10,7 @@ use mlua::prelude::*; use hyper::{body::to_bytes, server::conn::AddrStream, service::Service}; use hyper::{Body, Request, Response}; use hyper_tungstenite::{is_upgrade_request as is_ws_upgrade_request, upgrade as ws_upgrade}; -use tokio::{sync::oneshot, task}; +use tokio::task; use crate::lune::{ scheduler::Scheduler, @@ -47,31 +47,26 @@ impl Service> for NetServiceInner { let key = kopt.as_ref().as_ref().unwrap(); let handler: LuaFunction = lua.registry_value(key).expect("Missing websocket handler"); let (response, ws) = ws_upgrade(&mut req, None).expect("Failed to upgrade websocket"); - // This should be spawned as a registered task, otherwise + // This should be spawned as a scheduler task, otherwise // the scheduler may exit early and cancel this even though what // we want here is a long-running task that keeps the program alive let sched = lua .app_data_ref::<&Scheduler>() .expect("Lua struct is missing scheduler"); - // TODO: Implement background task registration in scheduler - let (background_tx, background_rx) = oneshot::channel(); sched.schedule_future(async move { - let _ = background_rx.await; - }); - task::spawn_local(async move { // Create our new full websocket object, then // schedule our handler to get called asap - let ws = ws.await.into_lua_err()?; - let sock = NetWebSocket::new(ws).into_lua_table(lua)?; - let sched = lua - .app_data_ref::<&Scheduler>() - .expect("Lua struct is missing scheduler"); - let result = sched.push_front( - lua.create_thread(handler)?, - LuaMultiValue::from_vec(vec![LuaValue::Table(sock)]), - ); - let _ = background_tx.send(()); - result + let res = async move { + let ws = ws.await.into_lua_err()?; + let sock = NetWebSocket::new(ws).into_lua_table(lua)?; + sched.push_front( + lua.create_thread(handler)?, + LuaMultiValue::from_vec(vec![LuaValue::Table(sock)]), + ) + }; + if let Err(e) = res.await { + lua.emit_error(e); + } }); Box::pin(async move { Ok(response) }) } else { diff --git a/src/lune/mod.rs b/src/lune/mod.rs index 3aaa11c..8071762 100644 --- a/src/lune/mod.rs +++ b/src/lune/mod.rs @@ -24,10 +24,9 @@ impl Lune { */ #[allow(clippy::new_without_default)] pub fn new() -> Self { - // FIXME: Leaking these and using a manual drop implementation - // does not feel great... is there any way for us to create a - // scheduler, store it in app data, and guarantee it has - // the same lifetime as Lua without using any unsafe? + // FIXME: Leaking these here does not feel great... is there + // any way for us to create a scheduler, store it in app data, and + // guarantee it has the same lifetime as Lua without using any unsafe? let lua = Lua::new().into_static(); let scheduler = Scheduler::new(lua).into_static(); @@ -70,16 +69,3 @@ impl Lune { Ok(self.scheduler.run_to_completion().await) } } - -impl Drop for Lune { - fn drop(&mut self) { - // SAFETY: When the Lune struct is dropped, it is guaranteed - // that the Lua and Scheduler structs are no longer being used, - // since all the methods that reference them (eg. `run`) - // take an exclusive / mutable reference - unsafe { - Lua::from_static(self.lua); - Scheduler::from_static(self.scheduler); - } - } -} diff --git a/src/lune/scheduler/impl_runner.rs b/src/lune/scheduler/impl_runner.rs index 3261401..86efcf1 100644 --- a/src/lune/scheduler/impl_runner.rs +++ b/src/lune/scheduler/impl_runner.rs @@ -104,12 +104,18 @@ where // Wait until we either manually break out of resumption or a future completes tokio::select! { - _res = rx.recv() => break, - res = futs.next() => { - match res { - Some(_) => resumed_any = true, - None => break, + res = rx.recv() => { + if res.is_err() { + panic!( + "Futures break signal was dropped but futures still remain - \ + this may cause memory unsafety if a future accesses lua struct" + ) } + break; + }, + res = futs.next() => match res { + Some(_) => resumed_any = true, + None => break, }, } diff --git a/src/lune/scheduler/impl_threads.rs b/src/lune/scheduler/impl_threads.rs index dc5a2f8..d8530ca 100644 --- a/src/lune/scheduler/impl_threads.rs +++ b/src/lune/scheduler/impl_threads.rs @@ -60,7 +60,7 @@ where // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption if self.futures_break_signal.receiver_count() > 0 { - self.futures_break_signal.send(()).ok(); + self.futures_break_signal.send(false).ok(); } Ok(()) @@ -98,7 +98,7 @@ where // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption if self.futures_break_signal.receiver_count() > 0 { - self.futures_break_signal.send(()).ok(); + self.futures_break_signal.send(false).ok(); } Ok(thread_id) @@ -136,7 +136,7 @@ where // NOTE: We might be resuming futures, need to signal that a // new lua thread is ready to break out of futures resumption if self.futures_break_signal.receiver_count() > 0 { - self.futures_break_signal.send(()).ok(); + self.futures_break_signal.send(false).ok(); } Ok(thread_id) diff --git a/src/lune/scheduler/mod.rs b/src/lune/scheduler/mod.rs index 0e06b43..69d90b3 100644 --- a/src/lune/scheduler/mod.rs +++ b/src/lune/scheduler/mod.rs @@ -43,7 +43,7 @@ pub(crate) struct Scheduler<'lua, 'fut> { threads: Arc>>, thread_senders: Arc>>, futures: Arc>>>, - futures_break_signal: Sender<()>, + futures_break_signal: Sender, } impl<'lua, 'fut> Scheduler<'lua, 'fut> { @@ -88,9 +88,4 @@ impl<'lua, 'fut> Scheduler<'lua, 'fut> { pub fn into_static(self) -> &'static Self { Box::leak(Box::new(self)) } - - #[doc(hidden)] - pub unsafe fn from_static(lua: &'static Scheduler) -> Self { - *Box::from_raw(lua as *const Scheduler as *mut Scheduler) - } } diff --git a/src/tests.rs b/src/tests.rs index c5e6715..5380e5d 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -55,9 +55,8 @@ create_tests! { net_request_redirect: "net/request/redirect", net_url_encode: "net/url/encode", net_url_decode: "net/url/decode", - // FIXME: Net library has futures running past the lifetime of lua, causing sigsegv - // net_serve_requests: "net/serve/requests", - // net_serve_websockets: "net/serve/websockets", + net_serve_requests: "net/serve/requests", + net_serve_websockets: "net/serve/websockets", net_socket_wss: "net/socket/wss", net_socket_wss_rw: "net/socket/wss_rw",