mirror of
https://github.com/lune-org/lune.git
synced 2025-04-04 10:30:54 +01:00
Fully working implementation of new server
This commit is contained in:
parent
718572f537
commit
b9fcafba9d
4 changed files with 314 additions and 75 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1360,6 +1360,7 @@ dependencies = [
|
|||
"env_logger",
|
||||
"futures-util",
|
||||
"glam",
|
||||
"http 1.0.0",
|
||||
"http-body-util",
|
||||
"hyper 1.1.0",
|
||||
"hyper-tungstenite",
|
||||
|
|
|
@ -110,6 +110,7 @@ toml = { version = "0.8", features = ["preserve_order"] }
|
|||
|
||||
hyper = { version = "1.1", features = ["full"] }
|
||||
hyper-util = { version = "0.1", features = ["full"] }
|
||||
http = "1.0"
|
||||
http-body-util = { version = "0.1" }
|
||||
hyper-tungstenite = { version = "0.13" }
|
||||
|
||||
|
|
|
@ -1,96 +1,331 @@
|
|||
use std::{convert::Infallible, net::SocketAddr};
|
||||
|
||||
use http_body_util::Full;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use tokio::{net::TcpListener, spawn, sync::mpsc::channel};
|
||||
|
||||
use hyper::{
|
||||
body::{Bytes, Incoming},
|
||||
server::conn::http1,
|
||||
service::service_fn,
|
||||
Request, Response,
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
future::Future,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
rc::{Rc, Weak},
|
||||
str::FromStr,
|
||||
};
|
||||
|
||||
use http::request::Parts;
|
||||
use http_body_util::{BodyExt, Full};
|
||||
use hyper::{
|
||||
body::{Bytes, Incoming},
|
||||
header::{HeaderName, HeaderValue},
|
||||
server::conn::http1,
|
||||
service::Service,
|
||||
HeaderMap, Request, Response,
|
||||
};
|
||||
use hyper_tungstenite::{is_upgrade_request, upgrade};
|
||||
use hyper_util::rt::TokioIo;
|
||||
use tokio::{net::TcpListener, pin};
|
||||
|
||||
use mlua::prelude::*;
|
||||
use mlua_luau_scheduler::{LuaSchedulerExt, LuaSpawnExt};
|
||||
|
||||
use crate::lune::util::TableBuilder;
|
||||
|
||||
use super::config::ServeConfig;
|
||||
use super::{config::ServeConfig, websocket::NetWebSocket};
|
||||
|
||||
const SERVER_IMPL_LUA: &str = r#"
|
||||
spawn(function()
|
||||
while true do
|
||||
local id, request, socket, exit = server:next()
|
||||
if exit then
|
||||
break
|
||||
end
|
||||
spawn(function()
|
||||
if socket ~= nil then
|
||||
local handler = server:getRequestHandler()
|
||||
local response = handler(request)
|
||||
server:respond(id, response)
|
||||
elseif request ~= nil then
|
||||
local handler = server:getWebsocketHandler()
|
||||
handler(socket)
|
||||
end
|
||||
end)
|
||||
end
|
||||
end)
|
||||
"#;
|
||||
struct LuaRequest {
|
||||
_remote_addr: SocketAddr,
|
||||
head: Parts,
|
||||
body: Vec<u8>,
|
||||
}
|
||||
|
||||
pub(super) async fn serve<'lua>(
|
||||
impl LuaUserData for LuaRequest {
|
||||
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
|
||||
fields.add_field_method_get("method", |_, this| {
|
||||
Ok(this.head.method.as_str().to_string())
|
||||
});
|
||||
|
||||
fields.add_field_method_get("path", |_, this| Ok(this.head.uri.path().to_string()));
|
||||
|
||||
fields.add_field_method_get("query", |_, this| {
|
||||
let query: HashMap<String, String> = this
|
||||
.head
|
||||
.uri
|
||||
.query()
|
||||
.unwrap_or_default()
|
||||
.split('&')
|
||||
.filter_map(|q| q.split_once('='))
|
||||
.map(|(k, v)| (k.to_string(), v.to_string()))
|
||||
.collect();
|
||||
Ok(query)
|
||||
});
|
||||
|
||||
fields.add_field_method_get("headers", |_, this| {
|
||||
let headers: HashMap<String, Vec<u8>> = this
|
||||
.head
|
||||
.headers
|
||||
.iter()
|
||||
.map(|(k, v)| (k.as_str().to_string(), v.as_bytes().to_vec()))
|
||||
.collect();
|
||||
Ok(headers)
|
||||
});
|
||||
|
||||
fields.add_field_method_get("body", |lua, this| lua.create_string(&this.body));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum LuaResponseKind {
|
||||
PlainText,
|
||||
Table,
|
||||
}
|
||||
|
||||
struct LuaResponse {
|
||||
kind: LuaResponseKind,
|
||||
status: u16,
|
||||
headers: HeaderMap,
|
||||
body: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl LuaResponse {
|
||||
fn into_response(self) -> LuaResult<Response<Full<Bytes>>> {
|
||||
Ok(match self.kind {
|
||||
LuaResponseKind::PlainText => Response::builder()
|
||||
.status(200)
|
||||
.header("Content-Type", "text/plain")
|
||||
.body(Full::new(Bytes::from(self.body.unwrap())))
|
||||
.into_lua_err()?,
|
||||
LuaResponseKind::Table => {
|
||||
let mut response = Response::builder()
|
||||
.status(self.status)
|
||||
.body(Full::new(Bytes::from(self.body.unwrap_or_default())))
|
||||
.into_lua_err()?;
|
||||
response.headers_mut().extend(self.headers);
|
||||
response
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl FromLua<'_> for LuaResponse {
|
||||
fn from_lua(value: LuaValue, _: &Lua) -> LuaResult<Self> {
|
||||
match value {
|
||||
// Plain strings from the handler are plaintext responses
|
||||
LuaValue::String(s) => Ok(Self {
|
||||
kind: LuaResponseKind::PlainText,
|
||||
status: 200,
|
||||
headers: HeaderMap::new(),
|
||||
body: Some(s.as_bytes().to_vec()),
|
||||
}),
|
||||
// Tables are more detailed responses with potential status, headers, body
|
||||
LuaValue::Table(t) => {
|
||||
let status: Option<u16> = t.get("status")?;
|
||||
let headers: Option<LuaTable> = t.get("headers")?;
|
||||
let body: Option<LuaString> = t.get("body")?;
|
||||
|
||||
let mut headers_map = HeaderMap::new();
|
||||
if let Some(headers) = headers {
|
||||
for pair in headers.pairs::<String, LuaString>() {
|
||||
let (h, v) = pair?;
|
||||
let name = HeaderName::from_str(&h).into_lua_err()?;
|
||||
let value = HeaderValue::from_bytes(v.as_bytes()).into_lua_err()?;
|
||||
headers_map.insert(name, value);
|
||||
}
|
||||
}
|
||||
|
||||
let body_bytes = body.map(|s| s.as_bytes().to_vec());
|
||||
|
||||
Ok(Self {
|
||||
kind: LuaResponseKind::Table,
|
||||
status: status.unwrap_or(200),
|
||||
headers: headers_map,
|
||||
body: body_bytes,
|
||||
})
|
||||
}
|
||||
// Anything else is an error
|
||||
value => Err(LuaError::FromLuaConversionError {
|
||||
from: value.type_name(),
|
||||
to: "NetServeResponse",
|
||||
message: None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Svc {
|
||||
lua: Rc<Lua>,
|
||||
addr: SocketAddr,
|
||||
handler_request: LuaRegistryKey,
|
||||
handler_websocket: LuaRegistryKey,
|
||||
has_websocket_handler: bool,
|
||||
}
|
||||
|
||||
impl Svc {
|
||||
fn clone_registry_keys(&self) -> (LuaRegistryKey, LuaRegistryKey) {
|
||||
let cloned_request = self
|
||||
.lua
|
||||
.registry_value::<LuaFunction>(&self.handler_request)
|
||||
.expect("Failed to clone registry value");
|
||||
let cloned_websocket = self
|
||||
.lua
|
||||
.registry_value::<Option<LuaFunction>>(&self.handler_websocket)
|
||||
.expect("Failed to clone registry value");
|
||||
|
||||
let stored_request = self
|
||||
.lua
|
||||
.create_registry_value(cloned_request)
|
||||
.expect("Failed to clone registry value");
|
||||
let stored_websocket = self
|
||||
.lua
|
||||
.create_registry_value(cloned_websocket)
|
||||
.expect("Failed to clone registry value");
|
||||
|
||||
(stored_request, stored_websocket)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Svc {
|
||||
fn clone(&self) -> Self {
|
||||
let (handler_request, handler_websocket) = self.clone_registry_keys();
|
||||
Self {
|
||||
lua: self.lua.clone(),
|
||||
addr: self.addr,
|
||||
handler_request,
|
||||
handler_websocket,
|
||||
has_websocket_handler: self.has_websocket_handler,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Service<Request<Incoming>> for Svc {
|
||||
type Response = Response<Full<Bytes>>;
|
||||
type Error = LuaError;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
|
||||
|
||||
fn call(&self, req: Request<Incoming>) -> Self::Future {
|
||||
let addr = self.addr;
|
||||
let lua = self.lua.clone();
|
||||
|
||||
let (handler_request, handler_websocket) = self.clone_registry_keys();
|
||||
|
||||
if self.has_websocket_handler && is_upgrade_request(&req) {
|
||||
Box::pin(async move {
|
||||
let (res, sock) = upgrade(req, None).into_lua_err()?;
|
||||
|
||||
let lua_inner = lua.clone();
|
||||
lua.spawn_local(async move {
|
||||
let sock = sock.await.unwrap();
|
||||
let lua_sock = NetWebSocket::new(sock).into_lua_table(&lua_inner).unwrap();
|
||||
|
||||
let handler_websocket = lua_inner
|
||||
.registry_value::<LuaFunction>(&handler_websocket)
|
||||
.unwrap();
|
||||
|
||||
lua_inner
|
||||
.push_thread_back(handler_websocket, lua_sock)
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
Ok(res)
|
||||
})
|
||||
} else {
|
||||
let (head, body) = req.into_parts();
|
||||
|
||||
Box::pin(async move {
|
||||
let handler_request = lua.registry_value::<LuaFunction>(&handler_request)?;
|
||||
|
||||
let body = body.collect().await.into_lua_err()?.to_bytes().to_vec();
|
||||
|
||||
let lua_req = LuaRequest {
|
||||
_remote_addr: addr,
|
||||
head,
|
||||
body,
|
||||
};
|
||||
|
||||
let thread_id = lua.push_thread_back(handler_request, lua_req)?;
|
||||
lua.track_thread(thread_id);
|
||||
lua.wait_for_thread(thread_id).await;
|
||||
let thread_res = lua
|
||||
.get_thread_result(thread_id)
|
||||
.expect("Missing handler thread result")?;
|
||||
|
||||
LuaResponse::from_lua_multi(thread_res, &lua)?.into_response()
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn serve<'lua>(
|
||||
lua: &'lua Lua,
|
||||
port: u16,
|
||||
config: ServeConfig<'lua>,
|
||||
) -> LuaResult<LuaTable<'lua>> {
|
||||
let addr = SocketAddr::from((config.address, port));
|
||||
let listener = TcpListener::bind(addr).await.map_err(|e| {
|
||||
LuaError::external(format!(
|
||||
"Failed to bind to {addr}\n{}",
|
||||
e.to_string()
|
||||
.replace("error creating server listener: ", "> ")
|
||||
))
|
||||
})?;
|
||||
let addr: SocketAddr = (config.address, port).into();
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
|
||||
// Spawn a new task to accept incoming connections + listening for shutdown
|
||||
let (shutdown_tx, mut shutdown_rx) = channel::<()>(1);
|
||||
spawn(async move {
|
||||
let (lua_inner, lua_inner_2) = {
|
||||
let rc = lua
|
||||
.app_data_ref::<Weak<Lua>>()
|
||||
.expect("Missing weak lua ref")
|
||||
.upgrade()
|
||||
.expect("Lua was dropped unexpectedly");
|
||||
(Rc::clone(&rc), rc)
|
||||
};
|
||||
|
||||
let svc = Svc {
|
||||
lua: lua_inner,
|
||||
addr,
|
||||
has_websocket_handler: config.handle_web_socket.is_some(),
|
||||
handler_request: lua.create_registry_value(config.handle_request)?,
|
||||
handler_websocket: lua.create_registry_value(config.handle_web_socket)?,
|
||||
};
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
|
||||
lua.spawn_local(async move {
|
||||
let mut shutdown_rx_outer = shutdown_rx.clone();
|
||||
loop {
|
||||
tokio::select! {
|
||||
// If we receive a shutdown signal, break the loop
|
||||
_ = shutdown_rx.recv() => break,
|
||||
// Each connection gets its own task that forwards to lua
|
||||
accepted = listener.accept() => {
|
||||
match accepted {
|
||||
Err(e) => println!("Error accepting connection: {e}"),
|
||||
Ok((s, _)) => {
|
||||
let io = TokioIo::new(s);
|
||||
spawn(async move {
|
||||
if let Err(err) = http1::Builder::new()
|
||||
.serve_connection(io, service_fn(|_| async move {
|
||||
// TODO: Forward to lua somehow
|
||||
Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("Hello, World!"))))
|
||||
}))
|
||||
.await
|
||||
{
|
||||
println!("Error serving connection: {err:?}");
|
||||
}
|
||||
});
|
||||
// Create futures for accepting new connections and shutting down
|
||||
let fut_shutdown = shutdown_rx_outer.changed();
|
||||
let fut_accept = async {
|
||||
let stream = match listener.accept().await {
|
||||
Err(_) => return,
|
||||
Ok((s, _)) => s,
|
||||
};
|
||||
|
||||
let io = TokioIo::new(stream);
|
||||
let svc = svc.clone();
|
||||
let mut shutdown_rx_inner = shutdown_rx.clone();
|
||||
|
||||
lua_inner_2.spawn_local(async move {
|
||||
let conn = http1::Builder::new()
|
||||
.keep_alive(true) // Web sockets need this
|
||||
.serve_connection(io, svc)
|
||||
.with_upgrades();
|
||||
// NOTE: Because we need to use keep_alive for websockets, we need to
|
||||
// also manually poll this future and handle the shutdown signal here
|
||||
pin!(conn);
|
||||
tokio::select! {
|
||||
_ = conn.as_mut() => {}
|
||||
_ = shutdown_rx_inner.changed() => {
|
||||
conn.as_mut().graceful_shutdown();
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
// Wait for either a new connection or a shutdown signal
|
||||
tokio::select! {
|
||||
_ = fut_accept => {}
|
||||
res = fut_shutdown => {
|
||||
// NOTE: We will only get a RecvError here if the serve handle is dropped,
|
||||
// this means lua has garbage collected it and the user does not want
|
||||
// to manually stop the server using the serve handle. Run forever.
|
||||
if res.is_ok() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Create a new read-only table that contains methods
|
||||
// for manipulating server behavior and shutting it down
|
||||
let handle_stop = move |_, _: ()| match shutdown_tx.try_send(()) {
|
||||
Err(_) => Err(LuaError::runtime("Server has already been stopped")),
|
||||
Ok(_) => Ok(()),
|
||||
};
|
||||
|
||||
TableBuilder::new(lua)?
|
||||
.with_function("stop", handle_stop)?
|
||||
.with_function("stop", move |lua, _: ()| match shutdown_tx.send(true) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(_) => Err(LuaError::runtime("Server already stopped")),
|
||||
})?
|
||||
.build_readonly()
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use std::{
|
||||
process::ExitCode,
|
||||
rc::Rc,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
|
@ -7,6 +8,7 @@ use std::{
|
|||
};
|
||||
|
||||
use mlua::Lua;
|
||||
use mlua_luau_scheduler::Scheduler;
|
||||
|
||||
mod builtins;
|
||||
mod error;
|
||||
|
@ -15,11 +17,10 @@ mod globals;
|
|||
pub(crate) mod util;
|
||||
|
||||
pub use error::RuntimeError;
|
||||
use mlua_luau_scheduler::Scheduler;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Runtime {
|
||||
lua: Lua,
|
||||
lua: Rc<Lua>,
|
||||
args: Vec<String>,
|
||||
}
|
||||
|
||||
|
@ -29,8 +30,9 @@ impl Runtime {
|
|||
*/
|
||||
#[allow(clippy::new_without_default)]
|
||||
pub fn new() -> Self {
|
||||
let lua = Lua::new();
|
||||
let lua = Rc::new(Lua::new());
|
||||
|
||||
lua.set_app_data(Rc::downgrade(&lua));
|
||||
lua.set_app_data(Vec::<String>::new());
|
||||
|
||||
Self {
|
||||
|
|
Loading…
Add table
Reference in a new issue