From d2ff0783a5f81c8ecbd38536163a6605c8764d05 Mon Sep 17 00:00:00 2001 From: Filip Tibell Date: Tue, 14 Feb 2023 13:17:07 +0100 Subject: [PATCH] Implement bulk of async APIs --- Cargo.lock | 3 - packages/lib/Cargo.toml | 2 +- packages/lib/src/globals/net.rs | 37 +++++--- packages/lib/src/lib.rs | 13 +-- packages/lib/src/lua/net/mod.rs | 8 +- packages/lib/src/lua/net/ws_client.rs | 88 ++++++++---------- packages/lib/src/lua/net/ws_server.rs | 92 ++++++++----------- packages/lib/src/lua/task/scheduler.rs | 44 ++++++++- packages/lib/src/tests.rs | 3 +- packages/lib/src/utils/process.rs | 14 ++- packages/lib/src/utils/table.rs | 7 +- tests/net/{serve.luau => serve/requests.luau} | 53 ++--------- tests/net/serve/websockets.luau | 64 +++++++++++++ 13 files changed, 246 insertions(+), 182 deletions(-) rename tests/net/{serve.luau => serve/requests.luau} (56%) create mode 100644 tests/net/serve/websockets.luau diff --git a/Cargo.lock b/Cargo.lock index 10e491e..174772d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -757,9 +757,6 @@ dependencies = [ "bstr", "cc", "erased-serde", - "futures-core", - "futures-task", - "futures-util", "luau0-src", "num-traits", "once_cell", diff --git a/packages/lib/Cargo.toml b/packages/lib/Cargo.toml index ab3b756..61b1c8f 100644 --- a/packages/lib/Cargo.toml +++ b/packages/lib/Cargo.toml @@ -32,7 +32,7 @@ os_str_bytes = "6.4.1" hyper = { version = "0.14.24", features = ["full"] } hyper-tungstenite = { version = "0.9.0" } tokio-tungstenite = { version = "0.18.0" } -mlua = { version = "0.8.7", features = ["luau", "async", "serialize"] } +mlua = { version = "0.8.7", features = ["luau", "serialize"] } [dev-dependencies] anyhow = "1.0.69" diff --git a/packages/lib/src/globals/net.rs b/packages/lib/src/globals/net.rs index 4b31d21..1724c2d 100644 --- a/packages/lib/src/globals/net.rs +++ b/packages/lib/src/globals/net.rs @@ -16,7 +16,11 @@ use reqwest::Method; use tokio::{sync::mpsc, task}; use crate::{ - lua::net::{NetClient, NetClientBuilder, NetWebSocketClient, NetWebSocketServer, ServeConfig}, + lua::{ + // net::{NetWebSocketClient, NetWebSocketServer}, + net::{NetClient, NetClientBuilder, ServeConfig}, + task::TaskScheduler, + }, utils::{net::get_request_user_agent_header, table::TableBuilder}, }; @@ -141,13 +145,14 @@ async fn net_request<'a>(lua: &'static Lua, config: LuaValue<'a>) -> LuaResult(lua: &'static Lua, url: String) -> LuaResult { - let (stream, _) = tokio_tungstenite::connect_async(url) +async fn net_socket<'a>(lua: &'static Lua, url: String) -> LuaResult { + let (ws, _) = tokio_tungstenite::connect_async(url) .await .map_err(LuaError::external)?; - let ws_lua = NetWebSocketClient::from(stream); - let ws_proper = ws_lua.into_proper(lua).await?; - Ok(ws_proper) + todo!() + // let sock = NetWebSocketClient::from(ws); + // let table = sock.into_lua_table(lua)?; + // Ok(table) } async fn net_serve<'a>( @@ -223,13 +228,21 @@ impl Service> for NetService { let key = kopt.as_ref().as_ref().unwrap(); let handler: LuaFunction = lua.registry_value(key).expect("Missing websocket handler"); let (response, ws) = ws_upgrade(&mut req, None).expect("Failed to upgrade websocket"); + // TODO: This should be spawned as part of the scheduler, + // the scheduler may exit early and cancel this even though what + // we want here is a long-running task that keeps the program alive task::spawn_local(async move { - // Create our new full websocket object + // Create our new full websocket object, then + // schedule our handler to get called asap let ws = ws.await.map_err(LuaError::external)?; - let ws_lua = NetWebSocketServer::from(ws); - let ws_proper = ws_lua.into_proper(lua).await?; - // Call our handler with it - handler.call_async::<_, ()>(ws_proper).await + // let sock = NetWebSocketServer::from(ws); + // let table = sock.into_lua_table(lua)?; + // let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + // sched.schedule_current_resume( + // LuaValue::Function(handler), + // LuaMultiValue::from_vec(vec![LuaValue::Table(table)]), + // ) + Ok::<_, LuaError>(()) }); Box::pin(async move { Ok(response) }) } else { @@ -274,7 +287,7 @@ impl Service> for NetService { .with_value("headers", header_map)? .with_value("body", lua.create_string(&bytes)?)? .build_readonly()?; - match handler.call_async(request).await { + match handler.call(request) { // Plain strings from the handler are plaintext responses Ok(LuaValue::String(s)) => Ok(Response::builder() .status(200) diff --git a/packages/lib/src/lib.rs b/packages/lib/src/lib.rs index d462c28..0cc0b82 100644 --- a/packages/lib/src/lib.rs +++ b/packages/lib/src/lib.rs @@ -100,12 +100,6 @@ impl Lune { lua.set_named_registry_value("co.close", coroutine.get::<_, LuaFunction>("close")?)?; let debug: LuaTable = lua.globals().raw_get("debug")?; lua.set_named_registry_value("dbg.info", debug.get::<_, LuaFunction>("info")?)?; - // Add in wanted lune globals - for global in self.includes.clone() { - if !self.excludes.contains(&global) { - global.inject(lua)?; - } - } // Create our task scheduler and schedule the main thread on it let sched = TaskScheduler::new(lua)?.into_static(); lua.set_app_data(sched); @@ -119,6 +113,13 @@ impl Lune { ), LuaValue::Nil.to_lua_multi(lua)?, )?; + // Create our wanted lune globals, some of these need + // the task scheduler be available during construction + for global in self.includes.clone() { + if !self.excludes.contains(&global) { + global.inject(lua)?; + } + } // Keep running the scheduler until there are either no tasks // left to run, or until a task requests to exit the process let exit_code = LocalSet::new() diff --git a/packages/lib/src/lua/net/mod.rs b/packages/lib/src/lua/net/mod.rs index dc9dfd4..637f49e 100644 --- a/packages/lib/src/lua/net/mod.rs +++ b/packages/lib/src/lua/net/mod.rs @@ -1,9 +1,9 @@ mod client; mod config; -mod ws_client; -mod ws_server; +// mod ws_client; +// mod ws_server; pub use client::{NetClient, NetClientBuilder}; pub use config::ServeConfig; -pub use ws_client::NetWebSocketClient; -pub use ws_server::NetWebSocketServer; +// pub use ws_client::NetWebSocketClient; +// pub use ws_server::NetWebSocketServer; diff --git a/packages/lib/src/lua/net/ws_client.rs b/packages/lib/src/lua/net/ws_client.rs index c27b423..6f2415d 100644 --- a/packages/lib/src/lua/net/ws_client.rs +++ b/packages/lib/src/lua/net/ws_client.rs @@ -8,70 +8,58 @@ use futures_util::{SinkExt, StreamExt}; use tokio::{net::TcpStream, sync::Mutex}; use tokio_tungstenite::MaybeTlsStream; +use crate::utils::table::TableBuilder; + #[derive(Debug, Clone)] pub struct NetWebSocketClient(Arc>>>); impl NetWebSocketClient { - pub async fn close(&self) -> LuaResult<()> { - let mut ws = self.0.lock().await; - ws.close(None).await.map_err(LuaError::external)?; + async fn close(&self) -> LuaResult<()> { + self.0.lock().await.close(None).await; Ok(()) } - pub async fn send(&self, msg: WsMessage) -> LuaResult<()> { - let mut ws = self.0.lock().await; - ws.send(msg).await.map_err(LuaError::external)?; - Ok(()) + async fn send(&self, msg: String) -> LuaResult<()> { + self.0 + .lock() + .await + .send(WsMessage::Text(msg)) + .await + .map_err(LuaError::external) } - pub async fn next(&self) -> LuaResult> { - let mut ws = self.0.lock().await; - let item = ws.next().await.transpose(); - item.map_err(LuaError::external) + async fn next<'a>(&self, lua: &'static Lua) -> LuaResult> { + let item = self + .0 + .lock() + .await + .next() + .await + .transpose() + .map_err(LuaError::external)?; + Ok(match item { + None => LuaValue::Nil, + Some(msg) => match msg { + WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?), + WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?), + _ => LuaValue::Nil, + }, + }) } - pub async fn into_proper(self, lua: &'static Lua) -> LuaResult { - // HACK: This creates a new userdata that consumes and proxies this one, - // since there's no great way to implement this in pure async Rust - // and as a plain table without tons of strange lifetime issues - let chunk = r#" - local ws = ... - local proxy = newproxy(true) - local meta = getmetatable(proxy) - meta.__index = { - close = function() return ws:close() end, - send = function(...) return ws:send(...) end, - next = function() return ws:next() end, - } - meta.__iter = function() - return function() - return ws:next() - end - end - return proxy - "#; - lua.load(chunk).call_async(self).await + pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult { + let inner_close = self.clone(); + let inner_send = self.clone(); + let inner_next = self.clone(); + TableBuilder::new(lua)? + .with_async_function("close", move |_, _: ()| inner_close.close())? + .with_async_function("send", move |_, msg: String| inner_send.send(msg))? + .with_async_function("next", move |lua, _: ()| inner_next.next(lua))? + .build_readonly() } } -impl LuaUserData for NetWebSocketClient { - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { - methods.add_async_method("close", |_, this, _: ()| async move { this.close().await }); - methods.add_async_method("send", |_, this, msg: String| async move { - this.send(WsMessage::Text(msg)).await - }); - methods.add_async_method("next", |lua, this, _: ()| async move { - match this.next().await? { - Some(msg) => Ok(match msg { - WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?), - WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?), - _ => LuaValue::Nil, - }), - None => Ok(LuaValue::Nil), - } - }); - } -} +impl LuaUserData for NetWebSocketClient {} impl From>> for NetWebSocketClient { fn from(value: WebSocketStream>) -> Self { diff --git a/packages/lib/src/lua/net/ws_server.rs b/packages/lib/src/lua/net/ws_server.rs index be8b42e..7a7894b 100644 --- a/packages/lib/src/lua/net/ws_server.rs +++ b/packages/lib/src/lua/net/ws_server.rs @@ -8,68 +8,56 @@ use hyper_tungstenite::{tungstenite::Message as WsMessage, WebSocketStream}; use futures_util::{SinkExt, StreamExt}; use tokio::sync::Mutex; +use crate::utils::table::TableBuilder; + +type Inner = Arc>>; + #[derive(Debug, Clone)] -pub struct NetWebSocketServer(Arc>>); +pub struct NetWebSocketServer(Inner); impl NetWebSocketServer { - pub async fn close(&self) -> LuaResult<()> { - let mut ws = self.0.lock().await; - ws.close(None).await.map_err(LuaError::external)?; + async fn close(&self) -> LuaResult<()> { + self.0.lock().await.close(None).await; Ok(()) } - pub async fn send(&self, msg: WsMessage) -> LuaResult<()> { - let mut ws = self.0.lock().await; - ws.send(msg).await.map_err(LuaError::external)?; - Ok(()) + async fn send(&self, msg: String) -> LuaResult<()> { + self.0 + .lock() + .await + .send(WsMessage::Text(msg)) + .await + .map_err(LuaError::external) } - pub async fn next(&self) -> LuaResult> { - let mut ws = self.0.lock().await; - let item = ws.next().await.transpose(); - item.map_err(LuaError::external) + async fn next<'a>(&self, lua: &'static Lua) -> LuaResult> { + let item = self + .0 + .lock() + .await + .next() + .await + .transpose() + .map_err(LuaError::external)?; + Ok(match item { + None => LuaValue::Nil, + Some(msg) => match msg { + WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?), + WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?), + _ => LuaValue::Nil, + }, + }) } - pub async fn into_proper(self, lua: &'static Lua) -> LuaResult { - // HACK: This creates a new userdata that consumes and proxies this one, - // since there's no great way to implement this in pure async Rust - // and as a plain table without tons of strange lifetime issues - let chunk = r#" - local ws = ... - local proxy = newproxy(true) - local meta = getmetatable(proxy) - meta.__index = { - close = function() return ws:close() end, - send = function(...) return ws:send(...) end, - next = function() return ws:next() end, - } - meta.__iter = function() - return function() - return ws:next() - end - end - return proxy - "#; - lua.load(chunk).call_async(self).await - } -} - -impl LuaUserData for NetWebSocketServer { - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { - methods.add_async_method("close", |_, this, _: ()| async move { this.close().await }); - methods.add_async_method("send", |_, this, msg: String| async move { - this.send(WsMessage::Text(msg)).await - }); - methods.add_async_method("next", |lua, this, _: ()| async move { - match this.next().await? { - Some(msg) => Ok(match msg { - WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?), - WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?), - _ => LuaValue::Nil, - }), - None => Ok(LuaValue::Nil), - } - }); + pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult { + let inner_close = self.clone(); + let inner_send = self.clone(); + let inner_next = self.clone(); + TableBuilder::new(lua)? + .with_async_function("close", move |_, _: ()| inner_close.close())? + .with_async_function("send", move |_, msg: String| inner_send.send(msg))? + .with_async_function("next", move |lua, _: ()| inner_next.next(lua))? + .build_readonly() } } diff --git a/packages/lib/src/lua/task/scheduler.rs b/packages/lib/src/lua/task/scheduler.rs index ef2a558..c116af3 100644 --- a/packages/lib/src/lua/task/scheduler.rs +++ b/packages/lib/src/lua/task/scheduler.rs @@ -18,12 +18,19 @@ use tokio::{ time::{sleep, Instant}, }; +use crate::utils::table::TableBuilder; + type TaskSchedulerQueue = Arc>>; type TaskFutureArgsOverride<'fut> = Option>>; type TaskFutureReturns<'fut> = LuaResult>; type TaskFuture<'fut> = LocalBoxFuture<'fut, (TaskReference, TaskFutureReturns<'fut>)>; +const TASK_ASYNC_IMPL_LUA: &str = r#" +resume_async(thread(), ...) +return yield() +"#; + /// An enum representing different kinds of tasks #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum TaskKind { @@ -441,7 +448,6 @@ impl<'fut> TaskScheduler<'fut> { The given lua thread or function will be resumed using the optional arguments returned by the future. */ - #[allow(dead_code)] pub fn schedule_async( &self, thread_or_function: LuaValue<'_>, @@ -450,6 +456,42 @@ impl<'fut> TaskScheduler<'fut> { self.queue_async(thread_or_function, None, None, fut) } + /** + Creates a function callable from Lua that runs an async + closure and returns the results of it to the call site. + */ + pub fn make_scheduled_async_fn(&self, func: F) -> LuaResult + where + A: FromLuaMulti<'static>, + R: ToLuaMulti<'static>, + F: 'static + Fn(&'static Lua, A) -> FR, + FR: 'static + Future>, + { + let async_env_thread: LuaFunction = self.lua.named_registry_value("co.thread")?; + let async_env_yield: LuaFunction = self.lua.named_registry_value("co.yield")?; + self.lua + .load(TASK_ASYNC_IMPL_LUA) + .set_environment( + TableBuilder::new(self.lua)? + .with_value("thread", async_env_thread)? + .with_value("yield", async_env_yield)? + .with_function( + "resume_async", + move |lua: &Lua, (thread, args): (LuaThread, A)| { + let fut = func(lua, args); + let sched = lua.app_data_mut::<&TaskScheduler>().unwrap(); + sched.schedule_async(LuaValue::Thread(thread), async { + let rets = fut.await?; + let mult = rets.to_lua_multi(lua)?; + Ok(Some(mult.into_vec())) + }) + }, + )? + .build_readonly()?, + )? + .into_function() + } + /** Checks if a task still exists in the scheduler. diff --git a/packages/lib/src/tests.rs b/packages/lib/src/tests.rs index da80e12..aa04bbb 100644 --- a/packages/lib/src/tests.rs +++ b/packages/lib/src/tests.rs @@ -48,7 +48,8 @@ create_tests! { net_request_redirect: "net/request/redirect", net_json_decode: "net/json/decode", net_json_encode: "net/json/encode", - net_serve: "net/serve", + net_serve_requests: "net/serve/requests", + net_serve_websockets: "net/serve/websockets", process_args: "process/args", process_cwd: "process/cwd", process_env: "process/env", diff --git a/packages/lib/src/utils/process.rs b/packages/lib/src/utils/process.rs index d211c38..7bf591a 100644 --- a/packages/lib/src/utils/process.rs +++ b/packages/lib/src/utils/process.rs @@ -1,7 +1,7 @@ use std::process::ExitStatus; use mlua::prelude::*; -use tokio::{io, process::Child, task::spawn}; +use tokio::{io, process::Child, task}; use crate::utils::futures::AsyncTeeWriter; @@ -11,7 +11,15 @@ pub async fn pipe_and_inherit_child_process_stdio( let mut child_stdout = child.stdout.take().unwrap(); let mut child_stderr = child.stderr.take().unwrap(); - let stdout_thread = spawn(async move { + /* + NOTE: We do not need to register these + independent tasks spawning in the scheduler + + This function is only used by `process.spawn` which in + turn registers a task with the scheduler that awaits this + */ + + let stdout_thread = task::spawn(async move { let mut stdout = io::stdout(); let mut tee = AsyncTeeWriter::new(&mut stdout); @@ -22,7 +30,7 @@ pub async fn pipe_and_inherit_child_process_stdio( Ok::<_, LuaError>(tee.into_vec()) }); - let stderr_thread = spawn(async move { + let stderr_thread = task::spawn(async move { let mut stderr = io::stderr(); let mut tee = AsyncTeeWriter::new(&mut stderr); diff --git a/packages/lib/src/utils/table.rs b/packages/lib/src/utils/table.rs index d6435df..207b72b 100644 --- a/packages/lib/src/utils/table.rs +++ b/packages/lib/src/utils/table.rs @@ -2,6 +2,8 @@ use std::future::Future; use mlua::prelude::*; +use crate::lua::task::TaskScheduler; + pub struct TableBuilder { lua: &'static Lua, tab: LuaTable<'static>, @@ -76,8 +78,9 @@ impl TableBuilder { F: 'static + Fn(&'static Lua, A) -> FR, FR: 'static + Future>, { - let f = self.lua.create_async_function(func)?; - self.with_value(key, LuaValue::Function(f)) + let sched = self.lua.app_data_mut::<&TaskScheduler>().unwrap(); + let func = sched.make_scheduled_async_fn(func)?; + self.with_value(key, LuaValue::Function(func)) } pub fn build_readonly(self) -> LuaResult> { diff --git a/tests/net/serve.luau b/tests/net/serve/requests.luau similarity index 56% rename from tests/net/serve.luau rename to tests/net/serve/requests.luau index 75bcf1b..317a6e9 100644 --- a/tests/net/serve.luau +++ b/tests/net/serve/requests.luau @@ -1,9 +1,12 @@ local PORT = 8080 local URL = `http://127.0.0.1:{PORT}` -local WS_URL = `ws://127.0.0.1:{PORT}` -local REQUEST = "Hello from client!" local RESPONSE = "Hello, lune!" +local thread = task.delay(0.2, function() + task.spawn(error, "Serve must not block the current thread") + process.exit(1) +end) + local handle = net.serve(PORT, function(request) -- info("Request:", request) -- info("Responding with", RESPONSE) @@ -16,6 +19,7 @@ end) local response = net.request(URL .. "/some/path?key=param1&key=param2&key2=param3").body assert(response == RESPONSE, "Invalid response from server") +task.cancel(thread) handle.stop() -- Stopping is not guaranteed to happen instantly since it is async, but @@ -54,48 +58,3 @@ assert( or string.find(message, "shut down"), "The error message for calling stop twice on the net serve handle should be descriptive" ) - ---[[ - Serve should also take a full config with handler functions - - A server should also be able to start on the previously closed port -]] -local handle2 = net.serve(PORT, { - handleRequest = function() - return RESPONSE - end, - handleWebSocket = function(socket) - local socketMessage = socket.next() - assert(socketMessage == REQUEST, "Invalid web socket request from client") - socket.send(RESPONSE) - socket.close() - end, -}) - -local response3 = net.request(URL).body -assert(response3 == RESPONSE, "Invalid response from server") - --- Web socket client should work -local socket = net.socket(WS_URL) - -socket.send(REQUEST) - -local socketMessage = socket.next() -assert(socketMessage ~= nil, "Got no web socket response from server") -assert(socketMessage == RESPONSE, "Invalid web socket response from server") - -socket.close() - --- Wait for the socket to close and make sure we can't send messages afterwards -task.wait() -local success3, err2 = (pcall :: any)(socket.send, "") -assert(not success3, "Sending messages after the socket has been closed should error") -local message2 = tostring(err2) -assert( - string.find(message2, "close") or string.find(message2, "closing"), - "The error message for sending messages on a closed web socket should be descriptive" -) - --- Stop the server to end the test - -handle2.stop() diff --git a/tests/net/serve/websockets.luau b/tests/net/serve/websockets.luau new file mode 100644 index 0000000..0881b66 --- /dev/null +++ b/tests/net/serve/websockets.luau @@ -0,0 +1,64 @@ +local PORT = 8080 +local URL = `http://127.0.0.1:{PORT}` +local WS_URL = `ws://127.0.0.1:{PORT}` +local REQUEST = "Hello from client!" +local RESPONSE = "Hello, lune!" + +local thread = task.delay(0.2, function() + task.spawn(error, "Serve must not block the current thread") + process.exit(1) +end) + +--[[ + Serve should also take a full config with handler functions + + A server should also be able to start on a previously closed port +]] + +local handle = net.serve(PORT, function(request) + return RESPONSE +end) + +task.cancel(thread) +handle.stop() +task.wait() + +local handle2 = net.serve(PORT, { + handleRequest = function() + return RESPONSE + end, + handleWebSocket = function(socket) + local socketMessage = socket.next() + assert(socketMessage == REQUEST, "Invalid web socket request from client") + socket.send(RESPONSE) + socket.close() + end, +}) + +local response = net.request(URL).body +assert(response == RESPONSE, "Invalid response from server") + +-- Web socket client should work +local socket = net.socket(WS_URL) + +socket.send(REQUEST) + +local socketMessage = socket.next() +assert(socketMessage ~= nil, "Got no web socket response from server") +assert(socketMessage == RESPONSE, "Invalid web socket response from server") + +socket.close() + +-- Wait for the socket to close and make sure we can't send messages afterwards +task.wait() +local success3, err2 = (pcall :: any)(socket.send, "") +assert(not success3, "Sending messages after the socket has been closed should error") +local message2 = tostring(err2) +assert( + string.find(message2, "close") or string.find(message2, "closing"), + "The error message for sending messages on a closed web socket should be descriptive" +) + +-- Stop the server to end the test + +handle2.stop()