From 091dc17337a444bf440a96397ed551c0b472b8e3 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Sat, 11 Feb 2023 23:29:17 +0100 Subject: [PATCH] Implement web socket client --- CHANGELOG.md | 53 +++++++++++++---- Cargo.lock | 13 ++++ lune.yml | 6 +- luneTypes.d.luau | 13 ++++ packages/lib/Cargo.toml | 2 + packages/lib/src/globals/net.rs | 12 +++- packages/lib/src/lua/net/mod.rs | 2 + packages/lib/src/lua/net/ws_client.rs | 86 +++++++++++++++++++++++++++ packages/lib/src/lua/net/ws_server.rs | 39 ++++++------ tests/net/serve.luau | 37 +++++++++--- 10 files changed, 224 insertions(+), 39 deletions(-) create mode 100644 packages/lib/src/lua/net/ws_client.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index ad97a7f..f2156ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ + + + # Changelog All notable changes to this project will be documented in this file. @@ -9,7 +12,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- `net.serve` now supports web sockets in addition to normal http requests! +- ### Web Sockets + + `net` now supports web sockets for both clients and servers!
+ Not that the web socket object is identical on both client and + server, but how you retrieve a web socket object is different. + + #### Server API + + The server web socket API is an extension of the existing `net.serve` function.
+ This allows for serving both normal HTTP requests and web socket requests on the same port. Example usage: @@ -23,17 +35,41 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 socket.send("Timed out!") socket.close() end) - -- This will yield waiting for new messages, and will break - -- when the socket was closed by either the server or client - for message in socket do - if message == "Ping" then + -- The message will be nil when the socket has closed + repeat + local messageFromClient = socket.next() + if messageFromClient == "Ping" then socket.send("Pong") end - end + until messageFromClient == nil end, }) ``` + #### Client API + + Example usage: + + ```lua + local socket = net.socket("ws://localhost:8080") + + socket.send("Ping") + + task.delay(5, function() + socket.close() + end) + + -- The message will be nil when the socket has closed + repeat + local messageFromServer = socket.next() + if messageFromServer == "Ping" then + socket.send("Pong") + end + until messageFromServer == nil + ``` + +### Changed + - `net.serve` now returns a `NetServeHandle` which can be used to stop serving requests safely. Example usage: @@ -49,11 +85,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 print("Shut down succesfully") ``` +- The third and optional argument of `process.spawn` is now a global type `ProcessSpawnOptions`. - Setting `cwd` in the options for `process.spawn` to a path starting with a tilde (`~`) will now use a path relative to the platform-specific home / user directory. -- Added a global type `ProcessSpawnOptions` for the third and optional argument of `process.spawn` - -### Changed - - `NetRequest` query parameters value has been changed to be a table of key-value pairs similar to `process.env`. If any query parameter is specified more than once in the request url, the value chosen will be the last one that was specified. - The internal http client for `net.request` now reuses headers and connections for more efficient requests. diff --git a/Cargo.lock b/Cargo.lock index 2fcfb99..ef60a7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,17 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800" +[[package]] +name = "async-trait" +version = "0.1.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -678,6 +689,7 @@ name = "lune" version = "0.3.0" dependencies = [ "anyhow", + "async-trait", "console", "dialoguer", "directories", @@ -692,6 +704,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-tungstenite", ] [[package]] diff --git a/lune.yml b/lune.yml index fefb73e..fd58899 100644 --- a/lune.yml +++ b/lune.yml @@ -44,7 +44,11 @@ globals: - type: string net.request: args: - - type: any + - type: string | table + net.socket: + must_use: true + args: + - type: string net.serve: args: - type: number diff --git a/luneTypes.d.luau b/luneTypes.d.luau index 31c9244..17b7714 100644 --- a/luneTypes.d.luau +++ b/luneTypes.d.luau @@ -168,6 +168,7 @@ export type NetServeHandle = { declare class NetWebSocket close: () -> () send: (message: string) -> () + next: () -> (string?) function __iter(self): () -> string end @@ -191,6 +192,18 @@ declare net: { --[=[ @within net + Connects to a web socket at the given URL. + + Throws an error if the server at the given URL does not support + web sockets, or if a miscellaneous network or I/O error occurs. + + @param url The URL to connect to + @return A web socket handle + ]=] + socket: (url: string) -> NetWebSocket, + --[=[ + @within net + Creates an HTTP server that listens on the given `port`. This will ***not*** block and will keep listening for requests on the given `port` diff --git a/packages/lib/Cargo.toml b/packages/lib/Cargo.toml index f4e18cb..ab3b756 100644 --- a/packages/lib/Cargo.toml +++ b/packages/lib/Cargo.toml @@ -22,6 +22,7 @@ serde.workspace = true tokio.workspace = true reqwest.workspace = true +async-trait = "0.1.64" dialoguer = "0.10.3" directories = "4.0.1" futures-util = "0.3.26" @@ -30,6 +31,7 @@ os_str_bytes = "6.4.1" hyper = { version = "0.14.24", features = ["full"] } hyper-tungstenite = { version = "0.9.0" } +tokio-tungstenite = { version = "0.18.0" } mlua = { version = "0.8.7", features = ["luau", "async", "serialize"] } [dev-dependencies] diff --git a/packages/lib/src/globals/net.rs b/packages/lib/src/globals/net.rs index 020cb61..e3e49c3 100644 --- a/packages/lib/src/globals/net.rs +++ b/packages/lib/src/globals/net.rs @@ -19,7 +19,7 @@ use tokio::{ }; use crate::{ - lua::net::{NetClient, NetClientBuilder, NetWebSocketServer, ServeConfig}, + lua::net::{NetClient, NetClientBuilder, NetWebSocketClient, NetWebSocketServer, ServeConfig}, utils::{message::LuneMessage, net::get_request_user_agent_header, table::TableBuilder}, }; @@ -35,6 +35,7 @@ pub fn create(lua: &'static Lua) -> LuaResult { .with_function("jsonEncode", net_json_encode)? .with_function("jsonDecode", net_json_decode)? .with_async_function("request", net_request)? + .with_async_function("socket", net_socket)? .with_async_function("serve", net_serve)? .build_readonly() } @@ -143,6 +144,15 @@ async fn net_request<'a>(lua: &'static Lua, config: LuaValue<'a>) -> LuaResult(lua: &'static Lua, url: String) -> LuaResult { + let (stream, _) = tokio_tungstenite::connect_async(url) + .await + .map_err(LuaError::external)?; + let ws_lua = NetWebSocketClient::from(stream); + let ws_proper = ws_lua.into_proper(lua).await?; + Ok(ws_proper) +} + async fn net_serve<'a>( lua: &'static Lua, (port, config): (u16, ServeConfig<'a>), diff --git a/packages/lib/src/lua/net/mod.rs b/packages/lib/src/lua/net/mod.rs index 32dbf46..dc9dfd4 100644 --- a/packages/lib/src/lua/net/mod.rs +++ b/packages/lib/src/lua/net/mod.rs @@ -1,7 +1,9 @@ mod client; mod config; +mod ws_client; mod ws_server; pub use client::{NetClient, NetClientBuilder}; pub use config::ServeConfig; +pub use ws_client::NetWebSocketClient; pub use ws_server::NetWebSocketServer; diff --git a/packages/lib/src/lua/net/ws_client.rs b/packages/lib/src/lua/net/ws_client.rs new file mode 100644 index 0000000..3cdf578 --- /dev/null +++ b/packages/lib/src/lua/net/ws_client.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use mlua::prelude::*; + +use hyper_tungstenite::{tungstenite::Message as WsMessage, WebSocketStream}; + +use futures_util::{SinkExt, StreamExt}; +use tokio::{net::TcpStream, sync::Mutex}; +use tokio_tungstenite::MaybeTlsStream; + +#[derive(Debug, Clone)] +pub struct NetWebSocketClient(Arc>>>); + +impl NetWebSocketClient { + pub async fn close(&self) -> LuaResult<()> { + let mut ws = self.0.lock().await; + ws.close(None).await.map_err(LuaError::external)?; + Ok(()) + } + + pub async fn send(&self, msg: WsMessage) -> LuaResult<()> { + let mut ws = self.0.lock().await; + ws.send(msg).await.map_err(LuaError::external)?; + Ok(()) + } + + pub async fn next(&self) -> LuaResult> { + let mut ws = self.0.lock().await; + let item = ws.next().await.transpose(); + item.map_err(LuaError::external) + } + + pub async fn into_proper(self, lua: &'static Lua) -> LuaResult { + // HACK: This creates a new userdata that consumes and proxies this one, + // since there's no great way to implement this in pure async Rust + // and as a plain table without tons of strange lifetime issues + let chunk = r#" + local ws = ... + local proxy = newproxy(true) + local meta = getmetatable(proxy) + meta.__index = { + close = function() + return ws:close() + end, + send = function(...) + return ws:send(...) + end, + next = function() + return ws:next() + end, + } + meta.__iter = function() + return function() + return ws:next() + end + end + return proxy + "#; + lua.load(chunk).call_async(self).await + } +} + +impl LuaUserData for NetWebSocketClient { + fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_async_method("close", |_, this, _: ()| async move { this.close().await }); + methods.add_async_method("send", |_, this, msg: String| async move { + this.send(WsMessage::Text(msg)).await + }); + methods.add_async_method("next", |lua, this, _: ()| async move { + match this.next().await? { + Some(msg) => Ok(match msg { + WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?), + WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?), + _ => LuaValue::Nil, + }), + None => Ok(LuaValue::Nil), + } + }) + } +} + +impl From>> for NetWebSocketClient { + fn from(value: WebSocketStream>) -> Self { + Self(Arc::new(Mutex::new(value))) + } +} diff --git a/packages/lib/src/lua/net/ws_server.rs b/packages/lib/src/lua/net/ws_server.rs index ad9a6c7..18966fa 100644 --- a/packages/lib/src/lua/net/ws_server.rs +++ b/packages/lib/src/lua/net/ws_server.rs @@ -30,32 +30,31 @@ impl NetWebSocketServer { item.map_err(LuaError::external) } - pub async fn into_proper(self, lua: &'static Lua) -> LuaResult { + pub async fn into_proper(self, lua: &'static Lua) -> LuaResult { // HACK: This creates a new userdata that consumes and proxies this one, // since there's no great way to implement this in pure async Rust // and as a plain table without tons of strange lifetime issues let chunk = r#" - return function(ws) - local proxy = newproxy(true) - local meta = getmetatable(proxy) - meta.__index = { - close = function() - return ws:close() - end, - send = function(...) - return ws:send(...) - end, - next = function() - return ws:next() - end, - } - meta.__iter = function() - return function() - return ws:next() - end + local ws = ... + local proxy = newproxy(true) + local meta = getmetatable(proxy) + meta.__index = { + close = function() + return ws:close() + end, + send = function(...) + return ws:send(...) + end, + next = function() + return ws:next() + end, + } + meta.__iter = function() + return function() + return ws:next() end - return proxy end + return proxy "#; lua.load(chunk).call_async(self).await } diff --git a/tests/net/serve.luau b/tests/net/serve.luau index a2f1587..75bcf1b 100644 --- a/tests/net/serve.luau +++ b/tests/net/serve.luau @@ -1,4 +1,7 @@ local PORT = 8080 +local URL = `http://127.0.0.1:{PORT}` +local WS_URL = `ws://127.0.0.1:{PORT}` +local REQUEST = "Hello from client!" local RESPONSE = "Hello, lune!" local handle = net.serve(PORT, function(request) @@ -10,8 +13,7 @@ local handle = net.serve(PORT, function(request) return RESPONSE end) -local response = - net.request(`http://127.0.0.1:{PORT}/some/path?key=param1&key=param2&key2=param3`).body +local response = net.request(URL .. "/some/path?key=param1&key=param2&key2=param3").body assert(response == RESPONSE, "Invalid response from server") handle.stop() @@ -22,7 +24,7 @@ task.wait() -- Sending a net request may error if there was -- a connection issue, we should handle that here -local success, response2 = pcall(net.request, `http://127.0.0.1:{PORT}/`) +local success, response2 = pcall(net.request, URL) if not success then local message = tostring(response2) assert( @@ -63,16 +65,37 @@ local handle2 = net.serve(PORT, { return RESPONSE end, handleWebSocket = function(socket) + local socketMessage = socket.next() + assert(socketMessage == REQUEST, "Invalid web socket request from client") + socket.send(RESPONSE) socket.close() end, }) -local response3 = net.request(`http://127.0.0.1:{PORT}/`).body +local response3 = net.request(URL).body assert(response3 == RESPONSE, "Invalid response from server") --- TODO: Test web sockets properly when we have a web socket client +-- Web socket client should work +local socket = net.socket(WS_URL) --- Stop the server and yield once more to end the test +socket.send(REQUEST) + +local socketMessage = socket.next() +assert(socketMessage ~= nil, "Got no web socket response from server") +assert(socketMessage == RESPONSE, "Invalid web socket response from server") + +socket.close() + +-- Wait for the socket to close and make sure we can't send messages afterwards +task.wait() +local success3, err2 = (pcall :: any)(socket.send, "") +assert(not success3, "Sending messages after the socket has been closed should error") +local message2 = tostring(err2) +assert( + string.find(message2, "close") or string.find(message2, "closing"), + "The error message for sending messages on a closed web socket should be descriptive" +) + +-- Stop the server to end the test handle2.stop() -task.wait()