Start work on background tasks

Filip Tibell 2023-08-20 16:20:45 -05:00
parent f0099ac5e8
commit a3b364ae23
7 changed files with 53 additions and 80 deletions

View file

@@ -7,10 +7,7 @@ use hyper::{
     header::{CONTENT_ENCODING, CONTENT_LENGTH},
     Server,
 };
-use tokio::{
-    sync::{mpsc, oneshot},
-    task,
-};
+use tokio::sync::mpsc;
 
 use crate::lune::{scheduler::Scheduler, util::TableBuilder};
@@ -174,14 +171,9 @@ where
        }
        Ok(bound) => bound,
    };
-    // Register a background task to prevent the task scheduler from
-    // exiting early and start up our web server on the bound address
-    // TODO: Implement background task registration in scheduler
-    let (background_tx, background_rx) = oneshot::channel();
+    // Start up our web server
     sched.schedule_future(async move {
-        let _ = background_rx.await;
-    });
-    let server = bound
+        bound
            .http1_only(true) // Web sockets can only use http1
            .http1_keepalive(true) // Web sockets must be kept alive
            .executor(NetLocalExec)
@@ -191,15 +183,15 @@ where
                server_websocket_callback,
            ))
            .with_graceful_shutdown(async move {
-                let _ = background_tx.send(());
                shutdown_rx
                    .recv()
                    .await
                    .expect("Server was stopped instantly");
                shutdown_rx.close();
+            })
+            .await
+            .unwrap();
     });
-    // Spawn a new tokio task so we don't block
-    task::spawn_local(server);
     // Create a new read-only table that contains methods
     // for manipulating server behavior and shutting it down
     let handle_stop = move |_, _: ()| match shutdown_tx.try_send(()) {
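For context, the pattern the new code moves to, awaiting the hyper server inside a scheduled future and stopping it through a channel-driven graceful shutdown, can be sketched outside of Lune's scheduler roughly like this (hyper 0.14 and tokio are assumed; the hello handler, the port, and the one-second timer are illustrative stand-ins, not part of the commit):

use std::convert::Infallible;
use std::net::SocketAddr;
use std::time::Duration;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use tokio::sync::mpsc;

// Trivial request handler, standing in for NetService
async fn hello(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok(Response::new(Body::from("Hello!")))
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    // Channel used to request shutdown, mirroring the shutdown_tx / shutdown_rx pair above
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);

    let server = Server::bind(&addr)
        .http1_only(true)
        .http1_keepalive(true)
        .serve(make_service_fn(|_conn| async {
            Ok::<_, Infallible>(service_fn(hello))
        }))
        .with_graceful_shutdown(async move {
            // Stop accepting new connections once a shutdown message arrives
            shutdown_rx.recv().await;
        });

    // Simulate a caller asking the server to stop after one second
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(1)).await;
        let _ = shutdown_tx.send(()).await;
    });

    // Await the server future to completion instead of detaching it,
    // the same shape as awaiting it inside sched.schedule_future(...) above
    server.await.unwrap();
}

Awaiting the server inside the scheduled future, rather than detaching it with task::spawn_local, presumably lets the scheduler treat the running server itself as the background work, replacing the earlier oneshot-based keep-alive.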

View file

@@ -10,7 +10,7 @@ use mlua::prelude::*;
 use hyper::{body::to_bytes, server::conn::AddrStream, service::Service};
 use hyper::{Body, Request, Response};
 use hyper_tungstenite::{is_upgrade_request as is_ws_upgrade_request, upgrade as ws_upgrade};
-use tokio::{sync::oneshot, task};
+use tokio::task;
 
 use crate::lune::{
     scheduler::Scheduler,
@@ -47,31 +47,26 @@ impl Service<Request<Body>> for NetServiceInner {
            let key = kopt.as_ref().as_ref().unwrap();
            let handler: LuaFunction = lua.registry_value(key).expect("Missing websocket handler");
            let (response, ws) = ws_upgrade(&mut req, None).expect("Failed to upgrade websocket");
-            // This should be spawned as a registered task, otherwise
+            // This should be spawned as a scheduler task, otherwise
            // the scheduler may exit early and cancel this even though what
            // we want here is a long-running task that keeps the program alive
            let sched = lua
                .app_data_ref::<&Scheduler>()
                .expect("Lua struct is missing scheduler");
-            // TODO: Implement background task registration in scheduler
-            let (background_tx, background_rx) = oneshot::channel();
            sched.schedule_future(async move {
-                let _ = background_rx.await;
-            });
-            task::spawn_local(async move {
                // Create our new full websocket object, then
                // schedule our handler to get called asap
+                let res = async move {
                    let ws = ws.await.into_lua_err()?;
                    let sock = NetWebSocket::new(ws).into_lua_table(lua)?;
-                let sched = lua
-                    .app_data_ref::<&Scheduler>()
-                    .expect("Lua struct is missing scheduler");
-                let result = sched.push_front(
+                    sched.push_front(
                        lua.create_thread(handler)?,
                        LuaMultiValue::from_vec(vec![LuaValue::Table(sock)]),
-                );
-                let _ = background_tx.send(());
-                result
+                    )
+                };
+                if let Err(e) = res.await {
+                    lua.emit_error(e);
+                }
            });
            Box::pin(async move { Ok(response) })
        } else {
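The rewritten websocket branch wraps all the fallible setup in an inner async block and reports the error from the outer future instead of propagating `?` out of a detached task. A minimal, self-contained sketch of that shape, where the parse call and eprintln! stand in for the Lua-specific pieces such as lua.emit_error:

// Collect every `?`-style failure into one Result, then handle it in the
// outer future, which itself never fails.
async fn run_background_step() {
    let res = async {
        let value: i32 = "42".parse().map_err(|e| format!("parse failed: {e}"))?;
        Ok::<_, String>(value * 2)
    };
    if let Err(e) = res.await {
        // In the diff above this is lua.emit_error(e)
        eprintln!("background task failed: {e}");
    }
}

#[tokio::main]
async fn main() {
    run_background_step().await;
}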

View file

@@ -24,10 +24,9 @@ impl Lune {
     */
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
-        // FIXME: Leaking these and using a manual drop implementation
-        // does not feel great... is there any way for us to create a
-        // scheduler, store it in app data, and guarantee it has
-        // the same lifetime as Lua without using any unsafe?
+        // FIXME: Leaking these here does not feel great... is there
+        // any way for us to create a scheduler, store it in app data, and
+        // guarantee it has the same lifetime as Lua without using any unsafe?
        let lua = Lua::new().into_static();
        let scheduler = Scheduler::new(lua).into_static();
@@ -70,16 +69,3 @@ impl Lune {
        Ok(self.scheduler.run_to_completion().await)
    }
 }
-
-impl Drop for Lune {
-    fn drop(&mut self) {
-        // SAFETY: When the Lune struct is dropped, it is guaranteed
-        // that the Lua and Scheduler structs are no longer being used,
-        // since all the methods that reference them (eg. `run`)
-        // take an exclusive / mutable reference
-        unsafe {
-            Lua::from_static(self.lua);
-            Scheduler::from_static(self.scheduler);
-        }
-    }
-}
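With the Drop implementation gone, the leaked Lua and Scheduler allocations are simply never reclaimed. The into_static pattern referenced by the FIXME is just Box::leak, as the scheduler diff further down shows; a small stand-alone illustration, where the Scheduler struct below is a stand-in rather than Lune's:

// Leaking a heap allocation yields a &'static reference at the cost of the
// allocation living for the remainder of the process.
struct Scheduler {
    name: String,
}

impl Scheduler {
    fn into_static(self) -> &'static Self {
        Box::leak(Box::new(self))
    }
}

fn main() {
    let sched: &'static Scheduler = Scheduler { name: "main".into() }.into_static();
    // The reference satisfies any 'static bound (for example Lua app data)
    // without unsafe code, but there is no longer a matching Box::from_raw to free it.
    println!("scheduler: {}", sched.name);
}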

View file

@@ -104,12 +104,18 @@ where
            // Wait until we either manually break out of resumption or a future completes
            tokio::select! {
-                _res = rx.recv() => break,
-                res = futs.next() => {
-                    match res {
+                res = rx.recv() => {
+                    if res.is_err() {
+                        panic!(
+                            "Futures break signal was dropped but futures still remain - \
+                            this may cause memory unsafety if a future accesses lua struct"
+                        )
+                    }
+                    break;
+                },
+                res = futs.next() => match res {
                    Some(_) => resumed_any = true,
                    None => break,
-                    }
                },
            }
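The reshaped select! loop polls the pending futures while also listening for a break signal whose recv now returns a Result. That, together with the receiver_count() and bool payload seen below, suggests a tokio broadcast channel; a runnable sketch of the loop shape, with the channel type and the meaning of the payload treated as assumptions:

use futures_util::{stream::FuturesUnordered, StreamExt};
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Break signal: `false` is assumed to mean "a new thread is ready,
    // stop resuming futures for now" rather than "shut everything down".
    let (tx, mut rx) = broadcast::channel::<bool>(16);

    let mut futs = FuturesUnordered::new();
    for n in 1..=3 {
        futs.push(async move { n });
    }

    // Pretend another part of the program signals readiness
    tx.send(false).ok();

    loop {
        tokio::select! {
            res = rx.recv() => {
                // An Err here means every sender was dropped while futures
                // were still pending, which is what the panic above guards against
                let _should_break = res.expect("break signal was dropped");
                break;
            },
            res = futs.next() => match res {
                Some(n) => println!("future completed with {n}"),
                None => break,
            },
        }
    }
}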

View file

@@ -60,7 +60,7 @@ where
        // NOTE: We might be resuming futures, need to signal that a
        // new lua thread is ready to break out of futures resumption
        if self.futures_break_signal.receiver_count() > 0 {
-            self.futures_break_signal.send(()).ok();
+            self.futures_break_signal.send(false).ok();
        }
 
        Ok(())
@@ -98,7 +98,7 @@ where
        // NOTE: We might be resuming futures, need to signal that a
        // new lua thread is ready to break out of futures resumption
        if self.futures_break_signal.receiver_count() > 0 {
-            self.futures_break_signal.send(()).ok();
+            self.futures_break_signal.send(false).ok();
        }
 
        Ok(thread_id)
@@ -136,7 +136,7 @@ where
        // NOTE: We might be resuming futures, need to signal that a
        // new lua thread is ready to break out of futures resumption
        if self.futures_break_signal.receiver_count() > 0 {
-            self.futures_break_signal.send(()).ok();
+            self.futures_break_signal.send(false).ok();
        }
 
        Ok(thread_id)
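Each of these three call sites only fires the signal when someone is listening, so the send cannot fail for lack of receivers. A small sketch of that guarded send, again assuming a tokio broadcast channel as in the loop sketch earlier, with the meaning of the false payload an assumption based on the surrounding code:

use tokio::sync::broadcast;

// Only broadcast the break signal when a receiver exists; otherwise the
// send would fail and the .ok() would silently discard the error anyway.
fn signal_thread_ready(futures_break_signal: &broadcast::Sender<bool>) {
    if futures_break_signal.receiver_count() > 0 {
        futures_break_signal.send(false).ok();
    }
}

fn main() {
    let (tx, _rx) = broadcast::channel::<bool>(16);
    signal_thread_ready(&tx);
}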

View file

@@ -43,7 +43,7 @@ pub(crate) struct Scheduler<'lua, 'fut> {
    threads: Arc<RefCell<VecDeque<SchedulerThread>>>,
    thread_senders: Arc<RefCell<HashMap<SchedulerThreadId, SchedulerThreadSender>>>,
    futures: Arc<AsyncMutex<FuturesUnordered<SchedulerFuture<'fut>>>>,
-    futures_break_signal: Sender<()>,
+    futures_break_signal: Sender<bool>,
 }
 
 impl<'lua, 'fut> Scheduler<'lua, 'fut> {
@@ -88,9 +88,4 @@ impl<'lua, 'fut> Scheduler<'lua, 'fut> {
    pub fn into_static(self) -> &'static Self {
        Box::leak(Box::new(self))
    }
-
-    #[doc(hidden)]
-    pub unsafe fn from_static(lua: &'static Scheduler) -> Self {
-        *Box::from_raw(lua as *const Scheduler as *mut Scheduler)
-    }
 }

View file

@@ -55,9 +55,8 @@ create_tests! {
    net_request_redirect: "net/request/redirect",
    net_url_encode: "net/url/encode",
    net_url_decode: "net/url/decode",
-    // FIXME: Net library has futures running past the lifetime of lua, causing sigsegv
-    // net_serve_requests: "net/serve/requests",
-    // net_serve_websockets: "net/serve/websockets",
+    net_serve_requests: "net/serve/requests",
+    net_serve_websockets: "net/serve/websockets",
    net_socket_wss: "net/socket/wss",
    net_socket_wss_rw: "net/socket/wss_rw",