diff --git a/packages/lib/src/globals/net.rs b/packages/lib/src/globals/net.rs index c27ead0..9a14d9f 100644 --- a/packages/lib/src/globals/net.rs +++ b/packages/lib/src/globals/net.rs @@ -8,8 +8,10 @@ use tokio::{sync::mpsc, task}; use crate::{ lua::{ - // net::{NetWebSocketClient, NetWebSocketServer}, - net::{NetClient, NetClientBuilder, NetLocalExec, NetService, RequestConfig, ServeConfig}, + net::{ + NetClient, NetClientBuilder, NetLocalExec, NetService, NetWebSocketClient, + RequestConfig, ServeConfig, + }, task::{TaskScheduler, TaskSchedulerAsyncExt}, }, utils::{net::get_request_user_agent_header, table::TableBuilder}, @@ -78,27 +80,17 @@ async fn net_request<'a>(lua: &'static Lua, config: RequestConfig<'a>) -> LuaRes .build_readonly() } -async fn net_socket<'a>(_lua: &'static Lua, _url: String) -> LuaResult { - Err(LuaError::RuntimeError( - "Client websockets are not yet implemented".to_string(), - )) - // let (ws, _) = tokio_tungstenite::connect_async(url) - // .await - // .map_err(LuaError::external)?; - // let sock = NetWebSocketClient::from(ws); - // let table = sock.into_lua_table(lua)?; - // Ok(table) +async fn net_socket<'a>(lua: &'static Lua, url: String) -> LuaResult { + let (ws, _) = tokio_tungstenite::connect_async(url) + .await + .map_err(LuaError::external)?; + NetWebSocketClient::from(ws).into_lua_table(lua) } async fn net_serve<'a>( lua: &'static Lua, (port, config): (u16, ServeConfig<'a>), ) -> LuaResult> { - if config.handle_web_socket.is_some() { - return Err(LuaError::RuntimeError( - "Server websockets are not yet implemented".to_string(), - )); - } // Note that we need to use a mpsc here and not // a oneshot channel since we move the sender // into our table with the stop function diff --git a/packages/lib/src/lua/async_ext.rs b/packages/lib/src/lua/async_ext.rs index 628f630..4518cb2 100644 --- a/packages/lib/src/lua/async_ext.rs +++ b/packages/lib/src/lua/async_ext.rs @@ -22,7 +22,7 @@ pub trait LuaAsyncExt { where A: FromLuaMulti<'static>, R: ToLuaMulti<'static>, - F: 'static + Fn(&'static Lua, A) -> FR, + F: 'static + Fn(&'lua Lua, A) -> FR, FR: 'static + Future>; fn create_waiter_function<'lua>(self) -> LuaResult>; @@ -37,7 +37,7 @@ impl LuaAsyncExt for &'static Lua { where A: FromLuaMulti<'static>, R: ToLuaMulti<'static>, - F: 'static + Fn(&'static Lua, A) -> FR, + F: 'static + Fn(&'lua Lua, A) -> FR, FR: 'static + Future>, { let async_env_yield: LuaFunction = self.named_registry_value("co.yield")?; diff --git a/packages/lib/src/lua/net/mod.rs b/packages/lib/src/lua/net/mod.rs index f75970c..492ebf3 100644 --- a/packages/lib/src/lua/net/mod.rs +++ b/packages/lib/src/lua/net/mod.rs @@ -1,11 +1,11 @@ mod client; mod config; mod server; -// mod ws_client; -// mod ws_server; +mod ws_client; +mod ws_server; pub use client::{NetClient, NetClientBuilder}; pub use config::{RequestConfig, ServeConfig}; pub use server::{NetLocalExec, NetService}; -// pub use ws_client::NetWebSocketClient; -// pub use ws_server::NetWebSocketServer; +pub use ws_client::NetWebSocketClient; +pub use ws_server::NetWebSocketServer; diff --git a/packages/lib/src/lua/net/server.rs b/packages/lib/src/lua/net/server.rs index 17c23cf..6e58822 100644 --- a/packages/lib/src/lua/net/server.rs +++ b/packages/lib/src/lua/net/server.rs @@ -12,7 +12,12 @@ use hyper::{Body, Request, Response}; use hyper_tungstenite::{is_upgrade_request as is_ws_upgrade_request, upgrade as ws_upgrade}; use tokio::task; -use crate::utils::table::TableBuilder; +use crate::{ + lua::task::{TaskScheduler, TaskSchedulerAsyncExt, TaskSchedulerScheduleExt}, + utils::table::TableBuilder, +}; + +use super::NetWebSocketServer; // Hyper service implementation for net, lots of boilerplate here // but make_svc and make_svc_function do not work for what we need @@ -40,25 +45,30 @@ impl Service> for NetServiceInner { // and then call our handler with a new socket object let kopt = self.2.clone(); let key = kopt.as_ref().as_ref().unwrap(); - let _handler: LuaFunction = lua.registry_value(key).expect("Missing websocket handler"); + let handler: LuaFunction = lua.registry_value(key).expect("Missing websocket handler"); let (response, ws) = ws_upgrade(&mut req, None).expect("Failed to upgrade websocket"); - // TODO: This should be spawned as part of the scheduler, + // This should be spawned as a registered task, otherwise // the scheduler may exit early and cancel this even though what // we want here is a long-running task that keeps the program alive + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"); + let handle = sched.register_background_task(); task::spawn_local(async move { // Create our new full websocket object, then // schedule our handler to get called asap - let _ws = ws.await.map_err(LuaError::external)?; - // let sock = NetWebSocketServer::from(ws); - // let table = sock.into_lua_table(lua)?; - // let sched = lua - // .app_data_ref::<&TaskScheduler>() - // .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"); - // sched.schedule_current_resume( - // LuaValue::Function(handler), - // LuaMultiValue::from_vec(vec![LuaValue::Table(table)]), - // ) - Ok::<_, LuaError>(()) + let ws = ws.await.map_err(LuaError::external)?; + let sock = NetWebSocketServer::from(ws); + let table = sock.into_lua_table(lua)?; + let sched = lua + .app_data_ref::<&TaskScheduler>() + .expect("Missing task scheduler - make sure it is added as a lua app data before the first scheduler resumption"); + let result = sched.schedule_blocking( + lua.create_thread(handler)?, + LuaMultiValue::from_vec(vec![LuaValue::Table(table)]), + ); + handle.unregister(Ok(())); + result }); Box::pin(async move { Ok(response) }) } else { @@ -103,6 +113,9 @@ impl Service> for NetServiceInner { .with_value("headers", header_map)? .with_value("body", lua.create_string(&bytes)?)? .build_readonly()?; + // TODO: Make some kind of NetServeResponse type with a + // FromLua implementation instead, this is a bit messy + // and does not send errors to the scheduler properly match handler.call(request) { // Plain strings from the handler are plaintext responses Ok(LuaValue::String(s)) => Ok(Response::builder() @@ -139,12 +152,8 @@ impl Service> for NetServiceInner { } // If the handler returns a value that is of an invalid type, // this should also be an error, so generate a 5xx response - Ok(value) => { - // TODO: Send below error to task scheduler so that it can emit properly - let _ = LuaError::RuntimeError(format!( - "Expected net serve handler to return a value of type 'string' or 'table', got '{}'", - value.type_name() - )); + Ok(_) => { + // TODO: Implement the type in the above todo Ok(Response::builder() .status(500) .body(Body::from("Internal Server Error")) diff --git a/packages/lib/src/lua/net/ws_client.rs b/packages/lib/src/lua/net/ws_client.rs index 6f2415d..bc11111 100644 --- a/packages/lib/src/lua/net/ws_client.rs +++ b/packages/lib/src/lua/net/ws_client.rs @@ -14,53 +14,48 @@ use crate::utils::table::TableBuilder; pub struct NetWebSocketClient(Arc>>>); impl NetWebSocketClient { - async fn close(&self) -> LuaResult<()> { - self.0.lock().await.close(None).await; + pub async fn close(&self) -> LuaResult<()> { + let mut ws = self.0.lock().await; + ws.close(None).await.map_err(LuaError::external)?; Ok(()) } - async fn send(&self, msg: String) -> LuaResult<()> { - self.0 - .lock() - .await - .send(WsMessage::Text(msg)) - .await - .map_err(LuaError::external) + pub async fn send(&self, msg: WsMessage) -> LuaResult<()> { + let mut ws = self.0.lock().await; + ws.send(msg).await.map_err(LuaError::external)?; + Ok(()) } - async fn next<'a>(&self, lua: &'static Lua) -> LuaResult> { - let item = self - .0 - .lock() - .await - .next() - .await - .transpose() - .map_err(LuaError::external)?; - Ok(match item { - None => LuaValue::Nil, - Some(msg) => match msg { - WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?), - WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?), - _ => LuaValue::Nil, - }, - }) + pub async fn next(&self) -> LuaResult> { + let mut ws = self.0.lock().await; + let item = ws.next().await.transpose(); + item.map_err(LuaError::external) } pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult { - let inner_close = self.clone(); - let inner_send = self.clone(); - let inner_next = self.clone(); + // FIXME: Deallocate when closed + let client = Box::leak(Box::new(self)); TableBuilder::new(lua)? - .with_async_function("close", move |_, _: ()| inner_close.close())? - .with_async_function("send", move |_, msg: String| inner_send.send(msg))? - .with_async_function("next", move |lua, _: ()| inner_next.next(lua))? + .with_async_function("close", |_, ()| async { + let result = client.close().await; + result + })? + .with_async_function("send", |_, message: String| async { + let result = client.send(WsMessage::Text(message)).await; + result + })? + .with_async_function("next", |lua, ()| async { + let result = client.next().await?; + Ok(match result { + Some(WsMessage::Binary(bin)) => LuaValue::String(lua.create_string(&bin)?), + Some(WsMessage::Text(txt)) => LuaValue::String(lua.create_string(&txt)?), + _ => LuaValue::Nil, + }) + })? .build_readonly() } } -impl LuaUserData for NetWebSocketClient {} - impl From>> for NetWebSocketClient { fn from(value: WebSocketStream>) -> Self { Self(Arc::new(Mutex::new(value))) diff --git a/packages/lib/src/lua/net/ws_server.rs b/packages/lib/src/lua/net/ws_server.rs index 7a7894b..5b289be 100644 --- a/packages/lib/src/lua/net/ws_server.rs +++ b/packages/lib/src/lua/net/ws_server.rs @@ -10,53 +10,48 @@ use tokio::sync::Mutex; use crate::utils::table::TableBuilder; -type Inner = Arc>>; - #[derive(Debug, Clone)] -pub struct NetWebSocketServer(Inner); +pub struct NetWebSocketServer(Arc>>); impl NetWebSocketServer { - async fn close(&self) -> LuaResult<()> { - self.0.lock().await.close(None).await; + pub async fn close(&self) -> LuaResult<()> { + let mut ws = self.0.lock().await; + ws.close(None).await.map_err(LuaError::external)?; Ok(()) } - async fn send(&self, msg: String) -> LuaResult<()> { - self.0 - .lock() - .await - .send(WsMessage::Text(msg)) - .await - .map_err(LuaError::external) + pub async fn send(&self, msg: WsMessage) -> LuaResult<()> { + let mut ws = self.0.lock().await; + ws.send(msg).await.map_err(LuaError::external)?; + Ok(()) } - async fn next<'a>(&self, lua: &'static Lua) -> LuaResult> { - let item = self - .0 - .lock() - .await - .next() - .await - .transpose() - .map_err(LuaError::external)?; - Ok(match item { - None => LuaValue::Nil, - Some(msg) => match msg { - WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?), - WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?), - _ => LuaValue::Nil, - }, - }) + pub async fn next(&self) -> LuaResult> { + let mut ws = self.0.lock().await; + let item = ws.next().await.transpose(); + item.map_err(LuaError::external) } pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult { - let inner_close = self.clone(); - let inner_send = self.clone(); - let inner_next = self.clone(); + // FIXME: Deallocate when closed + let server = Box::leak(Box::new(self)); TableBuilder::new(lua)? - .with_async_function("close", move |_, _: ()| inner_close.close())? - .with_async_function("send", move |_, msg: String| inner_send.send(msg))? - .with_async_function("next", move |lua, _: ()| inner_next.next(lua))? + .with_async_function("close", |_, ()| async { + let result = server.close().await; + result + })? + .with_async_function("send", |_, message: String| async { + let result = server.send(WsMessage::Text(message)).await; + result + })? + .with_async_function("next", |lua, ()| async { + let result = server.next().await?; + Ok(match result { + Some(WsMessage::Binary(bin)) => LuaValue::String(lua.create_string(&bin)?), + Some(WsMessage::Text(txt)) => LuaValue::String(lua.create_string(&txt)?), + _ => LuaValue::Nil, + }) + })? .build_readonly() } }