Implement bulk of async APIs

This commit is contained in:
Filip Tibell 2023-02-14 13:17:07 +01:00
parent b1b69c7d94
commit d2ff0783a5
No known key found for this signature in database
13 changed files with 246 additions and 182 deletions

3
Cargo.lock generated
View file

@ -757,9 +757,6 @@ dependencies = [
"bstr",
"cc",
"erased-serde",
"futures-core",
"futures-task",
"futures-util",
"luau0-src",
"num-traits",
"once_cell",

View file

@ -32,7 +32,7 @@ os_str_bytes = "6.4.1"
hyper = { version = "0.14.24", features = ["full"] }
hyper-tungstenite = { version = "0.9.0" }
tokio-tungstenite = { version = "0.18.0" }
mlua = { version = "0.8.7", features = ["luau", "async", "serialize"] }
mlua = { version = "0.8.7", features = ["luau", "serialize"] }
[dev-dependencies]
anyhow = "1.0.69"

View file

@ -16,7 +16,11 @@ use reqwest::Method;
use tokio::{sync::mpsc, task};
use crate::{
lua::net::{NetClient, NetClientBuilder, NetWebSocketClient, NetWebSocketServer, ServeConfig},
lua::{
// net::{NetWebSocketClient, NetWebSocketServer},
net::{NetClient, NetClientBuilder, ServeConfig},
task::TaskScheduler,
},
utils::{net::get_request_user_agent_header, table::TableBuilder},
};
@ -141,13 +145,14 @@ async fn net_request<'a>(lua: &'static Lua, config: LuaValue<'a>) -> LuaResult<L
.build_readonly()
}
async fn net_socket<'a>(lua: &'static Lua, url: String) -> LuaResult<LuaAnyUserData> {
let (stream, _) = tokio_tungstenite::connect_async(url)
async fn net_socket<'a>(lua: &'static Lua, url: String) -> LuaResult<LuaTable> {
let (ws, _) = tokio_tungstenite::connect_async(url)
.await
.map_err(LuaError::external)?;
let ws_lua = NetWebSocketClient::from(stream);
let ws_proper = ws_lua.into_proper(lua).await?;
Ok(ws_proper)
todo!()
// let sock = NetWebSocketClient::from(ws);
// let table = sock.into_lua_table(lua)?;
// Ok(table)
}
async fn net_serve<'a>(
@ -223,13 +228,21 @@ impl Service<Request<Body>> for NetService {
let key = kopt.as_ref().as_ref().unwrap();
let handler: LuaFunction = lua.registry_value(key).expect("Missing websocket handler");
let (response, ws) = ws_upgrade(&mut req, None).expect("Failed to upgrade websocket");
// TODO: This should be spawned as part of the scheduler,
// the scheduler may exit early and cancel this even though what
// we want here is a long-running task that keeps the program alive
task::spawn_local(async move {
// Create our new full websocket object
// Create our new full websocket object, then
// schedule our handler to get called asap
let ws = ws.await.map_err(LuaError::external)?;
let ws_lua = NetWebSocketServer::from(ws);
let ws_proper = ws_lua.into_proper(lua).await?;
// Call our handler with it
handler.call_async::<_, ()>(ws_proper).await
// let sock = NetWebSocketServer::from(ws);
// let table = sock.into_lua_table(lua)?;
// let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
// sched.schedule_current_resume(
// LuaValue::Function(handler),
// LuaMultiValue::from_vec(vec![LuaValue::Table(table)]),
// )
Ok::<_, LuaError>(())
});
Box::pin(async move { Ok(response) })
} else {
@ -274,7 +287,7 @@ impl Service<Request<Body>> for NetService {
.with_value("headers", header_map)?
.with_value("body", lua.create_string(&bytes)?)?
.build_readonly()?;
match handler.call_async(request).await {
match handler.call(request) {
// Plain strings from the handler are plaintext responses
Ok(LuaValue::String(s)) => Ok(Response::builder()
.status(200)

View file

@ -100,12 +100,6 @@ impl Lune {
lua.set_named_registry_value("co.close", coroutine.get::<_, LuaFunction>("close")?)?;
let debug: LuaTable = lua.globals().raw_get("debug")?;
lua.set_named_registry_value("dbg.info", debug.get::<_, LuaFunction>("info")?)?;
// Add in wanted lune globals
for global in self.includes.clone() {
if !self.excludes.contains(&global) {
global.inject(lua)?;
}
}
// Create our task scheduler and schedule the main thread on it
let sched = TaskScheduler::new(lua)?.into_static();
lua.set_app_data(sched);
@ -119,6 +113,13 @@ impl Lune {
),
LuaValue::Nil.to_lua_multi(lua)?,
)?;
// Create our wanted lune globals, some of these need
// the task scheduler be available during construction
for global in self.includes.clone() {
if !self.excludes.contains(&global) {
global.inject(lua)?;
}
}
// Keep running the scheduler until there are either no tasks
// left to run, or until a task requests to exit the process
let exit_code = LocalSet::new()

View file

@ -1,9 +1,9 @@
mod client;
mod config;
mod ws_client;
mod ws_server;
// mod ws_client;
// mod ws_server;
pub use client::{NetClient, NetClientBuilder};
pub use config::ServeConfig;
pub use ws_client::NetWebSocketClient;
pub use ws_server::NetWebSocketServer;
// pub use ws_client::NetWebSocketClient;
// pub use ws_server::NetWebSocketServer;

View file

@ -8,70 +8,58 @@ use futures_util::{SinkExt, StreamExt};
use tokio::{net::TcpStream, sync::Mutex};
use tokio_tungstenite::MaybeTlsStream;
use crate::utils::table::TableBuilder;
#[derive(Debug, Clone)]
pub struct NetWebSocketClient(Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>);
impl NetWebSocketClient {
pub async fn close(&self) -> LuaResult<()> {
let mut ws = self.0.lock().await;
ws.close(None).await.map_err(LuaError::external)?;
async fn close(&self) -> LuaResult<()> {
self.0.lock().await.close(None).await;
Ok(())
}
pub async fn send(&self, msg: WsMessage) -> LuaResult<()> {
let mut ws = self.0.lock().await;
ws.send(msg).await.map_err(LuaError::external)?;
Ok(())
async fn send(&self, msg: String) -> LuaResult<()> {
self.0
.lock()
.await
.send(WsMessage::Text(msg))
.await
.map_err(LuaError::external)
}
pub async fn next(&self) -> LuaResult<Option<WsMessage>> {
let mut ws = self.0.lock().await;
let item = ws.next().await.transpose();
item.map_err(LuaError::external)
async fn next<'a>(&self, lua: &'static Lua) -> LuaResult<LuaValue<'a>> {
let item = self
.0
.lock()
.await
.next()
.await
.transpose()
.map_err(LuaError::external)?;
Ok(match item {
None => LuaValue::Nil,
Some(msg) => match msg {
WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?),
WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?),
_ => LuaValue::Nil,
},
})
}
pub async fn into_proper(self, lua: &'static Lua) -> LuaResult<LuaAnyUserData> {
// HACK: This creates a new userdata that consumes and proxies this one,
// since there's no great way to implement this in pure async Rust
// and as a plain table without tons of strange lifetime issues
let chunk = r#"
local ws = ...
local proxy = newproxy(true)
local meta = getmetatable(proxy)
meta.__index = {
close = function() return ws:close() end,
send = function(...) return ws:send(...) end,
next = function() return ws:next() end,
}
meta.__iter = function()
return function()
return ws:next()
end
end
return proxy
"#;
lua.load(chunk).call_async(self).await
pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult<LuaTable> {
let inner_close = self.clone();
let inner_send = self.clone();
let inner_next = self.clone();
TableBuilder::new(lua)?
.with_async_function("close", move |_, _: ()| inner_close.close())?
.with_async_function("send", move |_, msg: String| inner_send.send(msg))?
.with_async_function("next", move |lua, _: ()| inner_next.next(lua))?
.build_readonly()
}
}
impl LuaUserData for NetWebSocketClient {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("close", |_, this, _: ()| async move { this.close().await });
methods.add_async_method("send", |_, this, msg: String| async move {
this.send(WsMessage::Text(msg)).await
});
methods.add_async_method("next", |lua, this, _: ()| async move {
match this.next().await? {
Some(msg) => Ok(match msg {
WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?),
WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?),
_ => LuaValue::Nil,
}),
None => Ok(LuaValue::Nil),
}
});
}
}
impl LuaUserData for NetWebSocketClient {}
impl From<WebSocketStream<MaybeTlsStream<TcpStream>>> for NetWebSocketClient {
fn from(value: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {

View file

@ -8,68 +8,56 @@ use hyper_tungstenite::{tungstenite::Message as WsMessage, WebSocketStream};
use futures_util::{SinkExt, StreamExt};
use tokio::sync::Mutex;
use crate::utils::table::TableBuilder;
type Inner = Arc<Mutex<WebSocketStream<Upgraded>>>;
#[derive(Debug, Clone)]
pub struct NetWebSocketServer(Arc<Mutex<WebSocketStream<Upgraded>>>);
pub struct NetWebSocketServer(Inner);
impl NetWebSocketServer {
pub async fn close(&self) -> LuaResult<()> {
let mut ws = self.0.lock().await;
ws.close(None).await.map_err(LuaError::external)?;
async fn close(&self) -> LuaResult<()> {
self.0.lock().await.close(None).await;
Ok(())
}
pub async fn send(&self, msg: WsMessage) -> LuaResult<()> {
let mut ws = self.0.lock().await;
ws.send(msg).await.map_err(LuaError::external)?;
Ok(())
async fn send(&self, msg: String) -> LuaResult<()> {
self.0
.lock()
.await
.send(WsMessage::Text(msg))
.await
.map_err(LuaError::external)
}
pub async fn next(&self) -> LuaResult<Option<WsMessage>> {
let mut ws = self.0.lock().await;
let item = ws.next().await.transpose();
item.map_err(LuaError::external)
async fn next<'a>(&self, lua: &'static Lua) -> LuaResult<LuaValue<'a>> {
let item = self
.0
.lock()
.await
.next()
.await
.transpose()
.map_err(LuaError::external)?;
Ok(match item {
None => LuaValue::Nil,
Some(msg) => match msg {
WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?),
WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?),
_ => LuaValue::Nil,
},
})
}
pub async fn into_proper(self, lua: &'static Lua) -> LuaResult<LuaAnyUserData> {
// HACK: This creates a new userdata that consumes and proxies this one,
// since there's no great way to implement this in pure async Rust
// and as a plain table without tons of strange lifetime issues
let chunk = r#"
local ws = ...
local proxy = newproxy(true)
local meta = getmetatable(proxy)
meta.__index = {
close = function() return ws:close() end,
send = function(...) return ws:send(...) end,
next = function() return ws:next() end,
}
meta.__iter = function()
return function()
return ws:next()
end
end
return proxy
"#;
lua.load(chunk).call_async(self).await
}
}
impl LuaUserData for NetWebSocketServer {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("close", |_, this, _: ()| async move { this.close().await });
methods.add_async_method("send", |_, this, msg: String| async move {
this.send(WsMessage::Text(msg)).await
});
methods.add_async_method("next", |lua, this, _: ()| async move {
match this.next().await? {
Some(msg) => Ok(match msg {
WsMessage::Binary(bin) => LuaValue::String(lua.create_string(&bin)?),
WsMessage::Text(txt) => LuaValue::String(lua.create_string(&txt)?),
_ => LuaValue::Nil,
}),
None => Ok(LuaValue::Nil),
}
});
pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult<LuaTable> {
let inner_close = self.clone();
let inner_send = self.clone();
let inner_next = self.clone();
TableBuilder::new(lua)?
.with_async_function("close", move |_, _: ()| inner_close.close())?
.with_async_function("send", move |_, msg: String| inner_send.send(msg))?
.with_async_function("next", move |lua, _: ()| inner_next.next(lua))?
.build_readonly()
}
}

View file

@ -18,12 +18,19 @@ use tokio::{
time::{sleep, Instant},
};
use crate::utils::table::TableBuilder;
type TaskSchedulerQueue = Arc<Mutex<VecDeque<TaskReference>>>;
type TaskFutureArgsOverride<'fut> = Option<Vec<LuaValue<'fut>>>;
type TaskFutureReturns<'fut> = LuaResult<TaskFutureArgsOverride<'fut>>;
type TaskFuture<'fut> = LocalBoxFuture<'fut, (TaskReference, TaskFutureReturns<'fut>)>;
const TASK_ASYNC_IMPL_LUA: &str = r#"
resume_async(thread(), ...)
return yield()
"#;
/// An enum representing different kinds of tasks
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TaskKind {
@ -441,7 +448,6 @@ impl<'fut> TaskScheduler<'fut> {
The given lua thread or function will be resumed
using the optional arguments returned by the future.
*/
#[allow(dead_code)]
pub fn schedule_async(
&self,
thread_or_function: LuaValue<'_>,
@ -450,6 +456,42 @@ impl<'fut> TaskScheduler<'fut> {
self.queue_async(thread_or_function, None, None, fut)
}
/**
Creates a function callable from Lua that runs an async
closure and returns the results of it to the call site.
*/
pub fn make_scheduled_async_fn<A, R, F, FR>(&self, func: F) -> LuaResult<LuaFunction>
where
A: FromLuaMulti<'static>,
R: ToLuaMulti<'static>,
F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>,
{
let async_env_thread: LuaFunction = self.lua.named_registry_value("co.thread")?;
let async_env_yield: LuaFunction = self.lua.named_registry_value("co.yield")?;
self.lua
.load(TASK_ASYNC_IMPL_LUA)
.set_environment(
TableBuilder::new(self.lua)?
.with_value("thread", async_env_thread)?
.with_value("yield", async_env_yield)?
.with_function(
"resume_async",
move |lua: &Lua, (thread, args): (LuaThread, A)| {
let fut = func(lua, args);
let sched = lua.app_data_mut::<&TaskScheduler>().unwrap();
sched.schedule_async(LuaValue::Thread(thread), async {
let rets = fut.await?;
let mult = rets.to_lua_multi(lua)?;
Ok(Some(mult.into_vec()))
})
},
)?
.build_readonly()?,
)?
.into_function()
}
/**
Checks if a task still exists in the scheduler.

View file

@ -48,7 +48,8 @@ create_tests! {
net_request_redirect: "net/request/redirect",
net_json_decode: "net/json/decode",
net_json_encode: "net/json/encode",
net_serve: "net/serve",
net_serve_requests: "net/serve/requests",
net_serve_websockets: "net/serve/websockets",
process_args: "process/args",
process_cwd: "process/cwd",
process_env: "process/env",

View file

@ -1,7 +1,7 @@
use std::process::ExitStatus;
use mlua::prelude::*;
use tokio::{io, process::Child, task::spawn};
use tokio::{io, process::Child, task};
use crate::utils::futures::AsyncTeeWriter;
@ -11,7 +11,15 @@ pub async fn pipe_and_inherit_child_process_stdio(
let mut child_stdout = child.stdout.take().unwrap();
let mut child_stderr = child.stderr.take().unwrap();
let stdout_thread = spawn(async move {
/*
NOTE: We do not need to register these
independent tasks spawning in the scheduler
This function is only used by `process.spawn` which in
turn registers a task with the scheduler that awaits this
*/
let stdout_thread = task::spawn(async move {
let mut stdout = io::stdout();
let mut tee = AsyncTeeWriter::new(&mut stdout);
@ -22,7 +30,7 @@ pub async fn pipe_and_inherit_child_process_stdio(
Ok::<_, LuaError>(tee.into_vec())
});
let stderr_thread = spawn(async move {
let stderr_thread = task::spawn(async move {
let mut stderr = io::stderr();
let mut tee = AsyncTeeWriter::new(&mut stderr);

View file

@ -2,6 +2,8 @@ use std::future::Future;
use mlua::prelude::*;
use crate::lua::task::TaskScheduler;
pub struct TableBuilder {
lua: &'static Lua,
tab: LuaTable<'static>,
@ -76,8 +78,9 @@ impl TableBuilder {
F: 'static + Fn(&'static Lua, A) -> FR,
FR: 'static + Future<Output = LuaResult<R>>,
{
let f = self.lua.create_async_function(func)?;
self.with_value(key, LuaValue::Function(f))
let sched = self.lua.app_data_mut::<&TaskScheduler>().unwrap();
let func = sched.make_scheduled_async_fn(func)?;
self.with_value(key, LuaValue::Function(func))
}
pub fn build_readonly(self) -> LuaResult<LuaTable<'static>> {

View file

@ -1,9 +1,12 @@
local PORT = 8080
local URL = `http://127.0.0.1:{PORT}`
local WS_URL = `ws://127.0.0.1:{PORT}`
local REQUEST = "Hello from client!"
local RESPONSE = "Hello, lune!"
local thread = task.delay(0.2, function()
task.spawn(error, "Serve must not block the current thread")
process.exit(1)
end)
local handle = net.serve(PORT, function(request)
-- info("Request:", request)
-- info("Responding with", RESPONSE)
@ -16,6 +19,7 @@ end)
local response = net.request(URL .. "/some/path?key=param1&key=param2&key2=param3").body
assert(response == RESPONSE, "Invalid response from server")
task.cancel(thread)
handle.stop()
-- Stopping is not guaranteed to happen instantly since it is async, but
@ -54,48 +58,3 @@ assert(
or string.find(message, "shut down"),
"The error message for calling stop twice on the net serve handle should be descriptive"
)
--[[
Serve should also take a full config with handler functions
A server should also be able to start on the previously closed port
]]
local handle2 = net.serve(PORT, {
handleRequest = function()
return RESPONSE
end,
handleWebSocket = function(socket)
local socketMessage = socket.next()
assert(socketMessage == REQUEST, "Invalid web socket request from client")
socket.send(RESPONSE)
socket.close()
end,
})
local response3 = net.request(URL).body
assert(response3 == RESPONSE, "Invalid response from server")
-- Web socket client should work
local socket = net.socket(WS_URL)
socket.send(REQUEST)
local socketMessage = socket.next()
assert(socketMessage ~= nil, "Got no web socket response from server")
assert(socketMessage == RESPONSE, "Invalid web socket response from server")
socket.close()
-- Wait for the socket to close and make sure we can't send messages afterwards
task.wait()
local success3, err2 = (pcall :: any)(socket.send, "")
assert(not success3, "Sending messages after the socket has been closed should error")
local message2 = tostring(err2)
assert(
string.find(message2, "close") or string.find(message2, "closing"),
"The error message for sending messages on a closed web socket should be descriptive"
)
-- Stop the server to end the test
handle2.stop()

View file

@ -0,0 +1,64 @@
local PORT = 8080
local URL = `http://127.0.0.1:{PORT}`
local WS_URL = `ws://127.0.0.1:{PORT}`
local REQUEST = "Hello from client!"
local RESPONSE = "Hello, lune!"
local thread = task.delay(0.2, function()
task.spawn(error, "Serve must not block the current thread")
process.exit(1)
end)
--[[
Serve should also take a full config with handler functions
A server should also be able to start on a previously closed port
]]
local handle = net.serve(PORT, function(request)
return RESPONSE
end)
task.cancel(thread)
handle.stop()
task.wait()
local handle2 = net.serve(PORT, {
handleRequest = function()
return RESPONSE
end,
handleWebSocket = function(socket)
local socketMessage = socket.next()
assert(socketMessage == REQUEST, "Invalid web socket request from client")
socket.send(RESPONSE)
socket.close()
end,
})
local response = net.request(URL).body
assert(response == RESPONSE, "Invalid response from server")
-- Web socket client should work
local socket = net.socket(WS_URL)
socket.send(REQUEST)
local socketMessage = socket.next()
assert(socketMessage ~= nil, "Got no web socket response from server")
assert(socketMessage == RESPONSE, "Invalid web socket response from server")
socket.close()
-- Wait for the socket to close and make sure we can't send messages afterwards
task.wait()
local success3, err2 = (pcall :: any)(socket.send, "")
assert(not success3, "Sending messages after the socket has been closed should error")
local message2 = tostring(err2)
assert(
string.find(message2, "close") or string.find(message2, "closing"),
"The error message for sending messages on a closed web socket should be descriptive"
)
-- Stop the server to end the test
handle2.stop()