From d593b0abd3729e49248614f71c5a6f6a53d8f182 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Mon, 21 Aug 2023 12:39:43 -0500 Subject: [PATCH] New net.serve implementation --- src/lune/builtins/net/mod.rs | 40 +++-------- src/lune/builtins/net/server.rs | 120 ++++++++++++++++++++++++++++++++ tests/net/serve/websockets.luau | 2 +- 3 files changed, 129 insertions(+), 33 deletions(-) create mode 100644 src/lune/builtins/net/server.rs diff --git a/src/lune/builtins/net/mod.rs b/src/lune/builtins/net/mod.rs index 87f7945..85aae45 100644 --- a/src/lune/builtins/net/mod.rs +++ b/src/lune/builtins/net/mod.rs @@ -2,15 +2,12 @@ use std::collections::HashMap; use mlua::prelude::*; -use console::style; -use hyper::{ - header::{CONTENT_ENCODING, CONTENT_LENGTH}, - Server, -}; -use tokio::sync::mpsc::{self, channel}; +use hyper::header::{CONTENT_ENCODING, CONTENT_LENGTH}; use crate::lune::{scheduler::Scheduler, util::TableBuilder}; +use self::server::create_server; + use super::serde::{ compress_decompress::{decompress, CompressDecompressFormat}, encode_decode::{EncodeDecodeConfig, EncodeDecodeFormat}, @@ -19,11 +16,13 @@ use super::serde::{ mod client; mod config; mod response; +mod server; mod websocket; use client::{NetClient, NetClientBuilder}; use config::{RequestConfig, ServeConfig}; use response::NetServeResponse; +use server::bind_to_localhost; use websocket::NetWebSocket; pub fn create(lua: &'static Lua) -> LuaResult { @@ -140,36 +139,13 @@ async fn net_serve<'lua>( where 'lua: 'static, // FIXME: Get rid of static lifetime bound here { - // Note that we need to use a mpsc here and not - // a oneshot channel since we move the sender - // into our table with the stop function - let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); - let server_request_callback = lua - .create_registry_value(config.handle_request) - .expect("Failed to store request handler in registry - out of memory"); - let server_websocket_callback = config.handle_web_socket.map(|handler| { - lua.create_registry_value(handler) - .expect("Failed to store websocket handler in registry - out of memory") - }); let sched = lua .app_data_ref::<&Scheduler>() .expect("Lua struct is missing scheduler"); - // TODO: Spawn a scheduler background task here, communicate using - // mpsc channels, do any heavy lifting possible in the other thread - let (tx_request, mut rx_request) = channel::<()>(64); - let (tx_websocket, mut rx_websocket) = channel::<()>(64); - // Create a new read-only table that contains methods - // for manipulating server behavior and shutting it down - let handle_stop = move |_, _: ()| match shutdown_tx.try_send(()) { - Ok(_) => Ok(()), - Err(_) => Err(LuaError::RuntimeError( - "Server has already been stopped".to_string(), - )), - }; - TableBuilder::new(lua)? - .with_function("stop", handle_stop)? - .build_readonly() + let builder = bind_to_localhost(port)?; + + create_server(lua, &sched, config, builder) } fn net_url_encode<'lua>( diff --git a/src/lune/builtins/net/server.rs b/src/lune/builtins/net/server.rs new file mode 100644 index 0000000..c99a225 --- /dev/null +++ b/src/lune/builtins/net/server.rs @@ -0,0 +1,120 @@ +use std::{convert::Infallible, net::SocketAddr}; + +use hyper::{ + server::{conn::AddrIncoming, Builder}, + service::{make_service_fn, service_fn}, + Response, Server, +}; +use mlua::prelude::*; +use tokio::sync::mpsc; + +use crate::lune::{scheduler::Scheduler, util::TableBuilder}; + +use super::config::ServeConfig; + +pub(super) fn bind_to_localhost(port: u16) -> LuaResult> { + let addr = match SocketAddr::try_from(([127, 0, 0, 1], port)) { + Ok(a) => a, + Err(e) => { + return Err(LuaError::external(format!( + "Failed to bind to localhost on port {port}\n{e}" + ))) + } + }; + match Server::try_bind(&addr) { + Ok(b) => Ok(b), + Err(e) => Err(LuaError::external(format!( + "Failed to bind to localhost on port {port}\n{}", + e.to_string() + .replace("error creating server listener: ", "> ") + ))), + } +} + +pub(super) fn create_server<'lua>( + lua: &'lua Lua, + sched: &'lua Scheduler, + config: ServeConfig<'lua>, + builder: Builder, +) -> LuaResult> +where + 'lua: 'static, // FIXME: Get rid of static lifetime bound here +{ + // Note that we need to use a mpsc here and not + // a oneshot channel since we move the sender + // into our table with the stop function + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); + + // Spawn a scheduler background task, communicate using mpsc + // channels, do any heavy lifting possible in background thread + let (tx_request, mut rx_request) = mpsc::channel::<()>(64); + let (tx_websocket, mut rx_websocket) = mpsc::channel::<()>(64); + sched.spawn(async move { + let result = builder + .serve(make_service_fn(|_| async move { + Ok::<_, Infallible>(service_fn(|_req| async move { + // TODO: Send this request back to lua + let res = Response::new("TODO".to_string()); + Ok::<_, Infallible>(res) + })) + })) + .with_graceful_shutdown(async move { + shutdown_rx.recv().await; + }); + if let Err(e) = result.await { + eprintln!("Net serve error: {e}") + } + }); + + // Spawn a local thread with access to lua, this will get + // requests and sockets to handle using our lua handlers + sched.spawn_local(async move { + loop { + let (req, sock) = tokio::select! { + req = rx_request.recv() => (req, None), + sock = rx_websocket.recv() => (None, sock), + }; + if req.is_none() && sock.is_none() { + break; + } + if let Some(_req) = req { + // TODO: Convert request into lua request struct + let thread_id = sched + .push_back(lua, config.handle_request.clone(), ()) + .expect("Failed to spawn net serve handler"); + // TODO: Send response back to other thread somehow + match sched.wait_for_thread(lua, thread_id).await { + Err(e) => eprintln!("Net serve handler error: {e}"), + Ok(v) => println!("Net serve handler result: {v:?}"), + }; + } + if let Some(_sock) = sock { + let handle_web_socket = config + .handle_web_socket + .as_ref() + .expect("Got web socket but web socket handler is missing"); + // TODO: Convert request into lua request struct + let thread_id = sched + .push_back(lua, handle_web_socket.clone(), ()) + .expect("Failed to spawn net websocket handler"); + // TODO: Send response back to other thread somehow + match sched.wait_for_thread(lua, thread_id).await { + Err(e) => eprintln!("Net websocket handler error: {e}"), + Ok(v) => println!("Net websocket handler result: {v:?}"), + }; + } + } + }); + + // Create a new read-only table that contains methods + // for manipulating server behavior and shutting it down + let handle_stop = move |_, _: ()| match shutdown_tx.try_send(()) { + Ok(_) => Ok(()), + Err(_) => Err(LuaError::RuntimeError( + "Server has already been stopped".to_string(), + )), + }; + TableBuilder::new(lua)? + .with_function("stop", handle_stop)? + .build_readonly() +} diff --git a/tests/net/serve/websockets.luau b/tests/net/serve/websockets.luau index cf42cee..0a49e74 100644 --- a/tests/net/serve/websockets.luau +++ b/tests/net/serve/websockets.luau @@ -11,7 +11,7 @@ local RESPONSE = "Hello, lune!" -- Serve should not block the thread from continuing -local thread = task.delay(0.2, function() +local thread = task.delay(1, function() stdio.ewrite("Serve must not block the current thread\n") task.wait(1) process.exit(1)