Make websockets work again

This commit is contained in:
Filip Tibell 2023-02-20 14:43:14 +01:00
parent 801da61c0f
commit c57677bdd3
No known key found for this signature in database
6 changed files with 103 additions and 112 deletions

View file

@ -8,8 +8,10 @@ use tokio::{sync::mpsc, task};
use crate::{ use crate::{
lua::{ lua::{
// net::{NetWebSocketClient, NetWebSocketServer}, net::{
net::{NetClient, NetClientBuilder, NetLocalExec, NetService, RequestConfig, ServeConfig}, NetClient, NetClientBuilder, NetLocalExec, NetService, NetWebSocketClient,
RequestConfig, ServeConfig,
},
task::{TaskScheduler, TaskSchedulerAsyncExt}, task::{TaskScheduler, TaskSchedulerAsyncExt},
}, },
utils::{net::get_request_user_agent_header, table::TableBuilder}, utils::{net::get_request_user_agent_header, table::TableBuilder},
@ -78,27 +80,17 @@ async fn net_request<'a>(lua: &'static Lua, config: RequestConfig<'a>) -> LuaRes
.build_readonly() .build_readonly()
} }
async fn net_socket<'a>(_lua: &'static Lua, _url: String) -> LuaResult<LuaTable> { async fn net_socket<'a>(lua: &'static Lua, url: String) -> LuaResult<LuaTable> {
Err(LuaError::RuntimeError( let (ws, _) = tokio_tungstenite::connect_async(url)
"Client websockets are not yet implemented".to_string(), .await
)) .map_err(LuaError::external)?;
// let (ws, _) = tokio_tungstenite::connect_async(url) NetWebSocketClient::from(ws).into_lua_table(lua)
// .await
// .map_err(LuaError::external)?;
// let sock = NetWebSocketClient::from(ws);
// let table = sock.into_lua_table(lua)?;
// Ok(table)
} }
async fn net_serve<'a>( async fn net_serve<'a>(
lua: &'static Lua, lua: &'static Lua,
(port, config): (u16, ServeConfig<'a>), (port, config): (u16, ServeConfig<'a>),
) -> LuaResult<LuaTable<'a>> { ) -> LuaResult<LuaTable<'a>> {
if config.handle_web_socket.is_some() {
return Err(LuaError::RuntimeError(
"Server websockets are not yet implemented".to_string(),
));
}
// Note that we need to use a mpsc here and not // Note that we need to use a mpsc here and not
// a oneshot channel since we move the sender // a oneshot channel since we move the sender
// into our table with the stop function // into our table with the stop function

View file

@ -22,7 +22,7 @@ pub trait LuaAsyncExt {
where where
A: FromLuaMulti<'static>, A: FromLuaMulti<'static>,
R: ToLuaMulti<'static>, R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua, A) -> FR, F: 'static + Fn(&'lua Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>; FR: 'static + Future<Output = LuaResult<R>>;
fn create_waiter_function<'lua>(self) -> LuaResult<LuaFunction<'lua>>; fn create_waiter_function<'lua>(self) -> LuaResult<LuaFunction<'lua>>;
@ -37,7 +37,7 @@ impl LuaAsyncExt for &'static Lua {
where where
A: FromLuaMulti<'static>, A: FromLuaMulti<'static>,
R: ToLuaMulti<'static>, R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua, A) -> FR, F: 'static + Fn(&'lua Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>, FR: 'static + Future<Output = LuaResult<R>>,
{ {
let async_env_yield: LuaFunction = self.named_registry_value("co.yield")?; let async_env_yield: LuaFunction = self.named_registry_value("co.yield")?;

View file

@ -1,11 +1,11 @@
mod client; mod client;
mod config; mod config;
mod server; mod server;
// mod ws_client; mod ws_client;
// mod ws_server; mod ws_server;
pub use client::{NetClient, NetClientBuilder}; pub use client::{NetClient, NetClientBuilder};
pub use config::{RequestConfig, ServeConfig}; pub use config::{RequestConfig, ServeConfig};
pub use server::{NetLocalExec, NetService}; pub use server::{NetLocalExec, NetService};
// pub use ws_client::NetWebSocketClient; pub use ws_client::NetWebSocketClient;
// pub use ws_server::NetWebSocketServer; pub use ws_server::NetWebSocketServer;

View file

@ -12,7 +12,12 @@ use hyper::{Body, Request, Response};
use hyper_tungstenite::{is_upgrade_request as is_ws_upgrade_request, upgrade as ws_upgrade}; use hyper_tungstenite::{is_upgrade_request as is_ws_upgrade_request, upgrade as ws_upgrade};
use tokio::task; use tokio::task;
use crate::utils::table::TableBuilder; use crate::{
lua::task::{TaskScheduler, TaskSchedulerAsyncExt, TaskSchedulerScheduleExt},
utils::table::TableBuilder,
};
use super::NetWebSocketServer;
// Hyper service implementation for net, lots of boilerplate here // Hyper service implementation for net, lots of boilerplate here
// but make_svc and make_svc_function do not work for what we need // but make_svc and make_svc_function do not work for what we need
@ -40,25 +45,30 @@ impl Service<Request<Body>> for NetServiceInner {
// and then call our handler with a new socket object // and then call our handler with a new socket object
let kopt = self.2.clone(); let kopt = self.2.clone();
let key = kopt.as_ref().as_ref().unwrap(); let key = kopt.as_ref().as_ref().unwrap();
let _handler: LuaFunction = lua.registry_value(key).expect("Missing websocket handler"); let handler: LuaFunction = lua.registry_value(key).expect("Missing websocket handler");
let (response, ws) = ws_upgrade(&mut req, None).expect("Failed to upgrade websocket"); let (response, ws) = ws_upgrade(&mut req, None).expect("Failed to upgrade websocket");
// TODO: This should be spawned as part of the scheduler, // This should be spawned as a registered task, otherwise
// the scheduler may exit early and cancel this even though what // the scheduler may exit early and cancel this even though what
// we want here is a long-running task that keeps the program alive // we want here is a long-running task that keeps the program alive
let sched = lua
.app_data_ref::<&TaskScheduler>()
.expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption");
let handle = sched.register_background_task();
task::spawn_local(async move { task::spawn_local(async move {
// Create our new full websocket object, then // Create our new full websocket object, then
// schedule our handler to get called asap // schedule our handler to get called asap
let _ws = ws.await.map_err(LuaError::external)?; let ws = ws.await.map_err(LuaError::external)?;
// let sock = NetWebSocketServer::from(ws); let sock = NetWebSocketServer::from(ws);
// let table = sock.into_lua_table(lua)?; let table = sock.into_lua_table(lua)?;
// let sched = lua let sched = lua
// .app_data_ref::<&TaskScheduler>() .app_data_ref::<&TaskScheduler>()
// .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"); .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption");
// sched.schedule_current_resume( let result = sched.schedule_blocking(
// LuaValue::Function(handler), lua.create_thread(handler)?,
// LuaMultiValue::from_vec(vec![LuaValue::Table(table)]), LuaMultiValue::from_vec(vec![LuaValue::Table(table)]),
// ) );
Ok::<_, LuaError>(()) handle.unregister(Ok(()));
result
}); });
Box::pin(async move { Ok(response) }) Box::pin(async move { Ok(response) })
} else { } else {
@ -103,6 +113,9 @@ impl Service<Request<Body>> for NetServiceInner {
.with_value("headers", header_map)? .with_value("headers", header_map)?
.with_value("body", lua.create_string(&bytes)?)? .with_value("body", lua.create_string(&bytes)?)?
.build_readonly()?; .build_readonly()?;
// TODO: Make some kind of NetServeResponse type with a
// FromLua implementation instead, this is a bit messy
// and does not send errors to the scheduler properly
match handler.call(request) { match handler.call(request) {
// Plain strings from the handler are plaintext responses // Plain strings from the handler are plaintext responses
Ok(LuaValue::String(s)) => Ok(Response::builder() Ok(LuaValue::String(s)) => Ok(Response::builder()
@ -139,12 +152,8 @@ impl Service<Request<Body>> for NetServiceInner {
} }
// If the handler returns a value that is of an invalid type, // If the handler returns a value that is of an invalid type,
// this should also be an error, so generate a 5xx response // this should also be an error, so generate a 5xx response
Ok(value) => { Ok(_) => {
// TODO: Send below error to task scheduler so that it can emit properly // TODO: Implement the type in the above todo
let _ = LuaError::RuntimeError(format!(
"Expected net serve handler to return a value of type 'string' or 'table', got '{}'",
value.type_name()
));
Ok(Response::builder() Ok(Response::builder()
.status(500) .status(500)
.body(Body::from("Internal Server Error")) .body(Body::from("Internal Server Error"))

View file

@ -14,53 +14,48 @@ use crate::utils::table::TableBuilder;
pub struct NetWebSocketClient(Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>); pub struct NetWebSocketClient(Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>);
impl NetWebSocketClient { impl NetWebSocketClient {
async fn close(&self) -> LuaResult<()> { pub async fn close(&self) -> LuaResult<()> {
self.0.lock().await.close(None).await; let mut ws = self.0.lock().await;
ws.close(None).await.map_err(LuaError::external)?;
Ok(()) Ok(())
} }
async fn send(&self, msg: String) -> LuaResult<()> { pub async fn send(&self, msg: WsMessage) -> LuaResult<()> {
self.0 let mut ws = self.0.lock().await;
.lock() ws.send(msg).await.map_err(LuaError::external)?;
.await Ok(())
.send(WsMessage::Text(msg))
.await
.map_err(LuaError::external)
} }
async fn next<'a>(&self, lua: &'static Lua) -> LuaResult<LuaValue<'a>> { pub async fn next(&self) -> LuaResult<Option<WsMessage>> {
let item = self let mut ws = self.0.lock().await;
.0 let item = ws.next().await.transpose();
.lock() item.map_err(LuaError::external)
.await
.next()
.await
.transpose()
.map_err(LuaError::external)?;
Ok(match item {
None => LuaValue::Nil,
Some(msg) => match msg {
WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?),
WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?),
_ => LuaValue::Nil,
},
})
} }
pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult<LuaTable> { pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult<LuaTable> {
let inner_close = self.clone(); // FIXME: Deallocate when closed
let inner_send = self.clone(); let client = Box::leak(Box::new(self));
let inner_next = self.clone();
TableBuilder::new(lua)? TableBuilder::new(lua)?
.with_async_function("close", move |_, _: ()| inner_close.close())? .with_async_function("close", |_, ()| async {
.with_async_function("send", move |_, msg: String| inner_send.send(msg))? let result = client.close().await;
.with_async_function("next", move |lua, _: ()| inner_next.next(lua))? result
})?
.with_async_function("send", |_, message: String| async {
let result = client.send(WsMessage::Text(message)).await;
result
})?
.with_async_function("next", |lua, ()| async {
let result = client.next().await?;
Ok(match result {
Some(WsMessage::Binary(bin)) => LuaValue::String(lua.create_string(&bin)?),
Some(WsMessage::Text(txt)) => LuaValue::String(lua.create_string(&txt)?),
_ => LuaValue::Nil,
})
})?
.build_readonly() .build_readonly()
} }
} }
impl LuaUserData for NetWebSocketClient {}
impl From<WebSocketStream<MaybeTlsStream<TcpStream>>> for NetWebSocketClient { impl From<WebSocketStream<MaybeTlsStream<TcpStream>>> for NetWebSocketClient {
fn from(value: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self { fn from(value: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
Self(Arc::new(Mutex::new(value))) Self(Arc::new(Mutex::new(value)))

View file

@ -10,53 +10,48 @@ use tokio::sync::Mutex;
use crate::utils::table::TableBuilder; use crate::utils::table::TableBuilder;
type Inner = Arc<Mutex<WebSocketStream<Upgraded>>>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct NetWebSocketServer(Inner); pub struct NetWebSocketServer(Arc<Mutex<WebSocketStream<Upgraded>>>);
impl NetWebSocketServer { impl NetWebSocketServer {
async fn close(&self) -> LuaResult<()> { pub async fn close(&self) -> LuaResult<()> {
self.0.lock().await.close(None).await; let mut ws = self.0.lock().await;
ws.close(None).await.map_err(LuaError::external)?;
Ok(()) Ok(())
} }
async fn send(&self, msg: String) -> LuaResult<()> { pub async fn send(&self, msg: WsMessage) -> LuaResult<()> {
self.0 let mut ws = self.0.lock().await;
.lock() ws.send(msg).await.map_err(LuaError::external)?;
.await Ok(())
.send(WsMessage::Text(msg))
.await
.map_err(LuaError::external)
} }
async fn next<'a>(&self, lua: &'static Lua) -> LuaResult<LuaValue<'a>> { pub async fn next(&self) -> LuaResult<Option<WsMessage>> {
let item = self let mut ws = self.0.lock().await;
.0 let item = ws.next().await.transpose();
.lock() item.map_err(LuaError::external)
.await
.next()
.await
.transpose()
.map_err(LuaError::external)?;
Ok(match item {
None => LuaValue::Nil,
Some(msg) => match msg {
WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?),
WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?),
_ => LuaValue::Nil,
},
})
} }
pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult<LuaTable> { pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult<LuaTable> {
let inner_close = self.clone(); // FIXME: Deallocate when closed
let inner_send = self.clone(); let server = Box::leak(Box::new(self));
let inner_next = self.clone();
TableBuilder::new(lua)? TableBuilder::new(lua)?
.with_async_function("close", move |_, _: ()| inner_close.close())? .with_async_function("close", |_, ()| async {
.with_async_function("send", move |_, msg: String| inner_send.send(msg))? let result = server.close().await;
.with_async_function("next", move |lua, _: ()| inner_next.next(lua))? result
})?
.with_async_function("send", |_, message: String| async {
let result = server.send(WsMessage::Text(message)).await;
result
})?
.with_async_function("next", |lua, ()| async {
let result = server.next().await?;
Ok(match result {
Some(WsMessage::Binary(bin)) => LuaValue::String(lua.create_string(&bin)?),
Some(WsMessage::Text(txt)) => LuaValue::String(lua.create_string(&txt)?),
_ => LuaValue::Nil,
})
})?
.build_readonly() .build_readonly()
} }
} }