Fix websockets memory leak

This commit is contained in:
Filip Tibell 2023-03-21 15:46:14 +01:00
parent 29a3b41e15
commit 1f887cef07
No known key found for this signature in database
2 changed files with 185 additions and 105 deletions

View file

@ -64,10 +64,14 @@ end
* `"co.yield"` -> `coroutine.yield` * `"co.yield"` -> `coroutine.yield`
* `"co.close"` -> `coroutine.close` * `"co.close"` -> `coroutine.close`
--- ---
* `"tab.pack"` -> `table.pack`
* `"tab.unpack"` -> `table.unpack`
* `"tab.freeze"` -> `table.freeze`
* `"tab.getmeta"` -> `getmetatable`
* `"tab.setmeta"` -> `setmetatable`
---
* `"dbg.info"` -> `debug.info` * `"dbg.info"` -> `debug.info`
* `"dbg.trace"` -> `debug.traceback` * `"dbg.trace"` -> `debug.traceback`
* `"dbg.iserr"` -> `<custom function>`
* `"dbg.makeerr"` -> `<custom function>`
--- ---
*/ */
pub fn create() -> LuaResult<&'static Lua> { pub fn create() -> LuaResult<&'static Lua> {
@ -93,6 +97,15 @@ pub fn create() -> LuaResult<&'static Lua> {
lua.set_named_registry_value("dbg.info", debug.get::<_, LuaFunction>("info")?)?; lua.set_named_registry_value("dbg.info", debug.get::<_, LuaFunction>("info")?)?;
lua.set_named_registry_value("tab.pack", table.get::<_, LuaFunction>("pack")?)?; lua.set_named_registry_value("tab.pack", table.get::<_, LuaFunction>("pack")?)?;
lua.set_named_registry_value("tab.unpack", table.get::<_, LuaFunction>("unpack")?)?; lua.set_named_registry_value("tab.unpack", table.get::<_, LuaFunction>("unpack")?)?;
lua.set_named_registry_value("tab.freeze", table.get::<_, LuaFunction>("freeze")?)?;
lua.set_named_registry_value(
"tab.getmeta",
globals.get::<_, LuaFunction>("getmetatable")?,
)?;
lua.set_named_registry_value(
"tab.setmeta",
globals.get::<_, LuaFunction>("setmetatable")?,
)?;
// Create a trace function that can be called to obtain a full stack trace from // Create a trace function that can be called to obtain a full stack trace from
// lua, this is not possible to do from rust when using our manual scheduler // lua, this is not possible to do from rust when using our manual scheduler
let dbg_trace_env = lua.create_table_with_capacity(0, 1)?; let dbg_trace_env = lua.create_table_with_capacity(0, 1)?;

View file

@ -1,7 +1,15 @@
use std::{cell::Cell, sync::Arc}; use std::{cell::Cell, sync::Arc};
use hyper::upgrade::Upgraded;
use mlua::prelude::*; use mlua::prelude::*;
use futures_util::{SinkExt, StreamExt};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
sync::Mutex as AsyncMutex,
};
use hyper_tungstenite::{ use hyper_tungstenite::{
tungstenite::{ tungstenite::{
protocol::{frame::coding::CloseCode as WsCloseCode, CloseFrame as WsCloseFrame}, protocol::{frame::coding::CloseCode as WsCloseCode, CloseFrame as WsCloseFrame},
@ -9,19 +17,43 @@ use hyper_tungstenite::{
}, },
WebSocketStream, WebSocketStream,
}; };
use tokio_tungstenite::MaybeTlsStream;
use futures_util::{SinkExt, StreamExt};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::Mutex,
};
use crate::lua::table::TableBuilder; use crate::lua::table::TableBuilder;
#[derive(Debug, Clone)] const WEB_SOCKET_IMPL_LUA: &str = r#"
return freeze(setmetatable({
close = function(...)
return close(websocket, ...)
end,
send = function(...)
return send(websocket, ...)
end,
next = function(...)
return next(websocket, ...)
end,
}, {
__index = function(self, key)
if key == "closeCode" then
return close_code()
end
end,
}))
"#;
#[derive(Debug)]
pub struct NetWebSocket<T> { pub struct NetWebSocket<T> {
close_code: Cell<Option<u16>>, close_code: Arc<Cell<Option<u16>>>,
stream: Arc<Mutex<WebSocketStream<T>>>, stream: Arc<AsyncMutex<WebSocketStream<T>>>,
}
impl<T> Clone for NetWebSocket<T> {
fn clone(&self) -> Self {
Self {
close_code: Arc::clone(&self.close_code),
stream: Arc::clone(&self.stream),
}
}
} }
impl<T> NetWebSocket<T> impl<T> NetWebSocket<T>
@ -30,107 +62,142 @@ where
{ {
pub fn new(value: WebSocketStream<T>) -> Self { pub fn new(value: WebSocketStream<T>) -> Self {
Self { Self {
close_code: Cell::new(None), close_code: Arc::new(Cell::new(None)),
stream: Arc::new(Mutex::new(value)), stream: Arc::new(AsyncMutex::new(value)),
} }
} }
pub fn get_lua_close_code(&self) -> LuaValue { fn into_lua_table_with_env<'lua>(
match self.close_code.get() { lua: &'lua Lua,
Some(code) => LuaValue::Number(code as f64), env: LuaTable<'lua>,
None => LuaValue::Nil, ) -> LuaResult<LuaTable<'lua>> {
} lua.load(WEB_SOCKET_IMPL_LUA)
} .set_name("websocket")?
.set_environment(env)?
pub async fn close(&self, code: Option<u16>) -> LuaResult<()> { .eval()
let mut ws = self.stream.lock().await;
let res = ws.close(Some(WsCloseFrame {
code: match code {
Some(code) if (1000..=4999).contains(&code) => WsCloseCode::from(code),
Some(code) => {
return Err(LuaError::RuntimeError(format!(
"Close code must be between 1000 and 4999, got {code}"
)))
}
None => WsCloseCode::Normal,
},
reason: "".into(),
}));
res.await.map_err(LuaError::external)
}
pub async fn send(&self, msg: WsMessage) -> LuaResult<()> {
let mut ws = self.stream.lock().await;
ws.send(msg).await.map_err(LuaError::external)
}
pub async fn send_lua_string<'lua>(
&self,
string: LuaString<'lua>,
as_binary: Option<bool>,
) -> LuaResult<()> {
let msg = if matches!(as_binary, Some(true)) {
WsMessage::Binary(string.as_bytes().to_vec())
} else {
let s = string.to_str().map_err(LuaError::external)?;
WsMessage::Text(s.to_string())
};
self.send(msg).await
}
pub async fn next(&self) -> LuaResult<Option<WsMessage>> {
let mut ws = self.stream.lock().await;
let item = ws.next().await.transpose().map_err(LuaError::external);
match item {
Ok(Some(WsMessage::Close(msg))) => {
if let Some(msg) = &msg {
self.close_code.replace(Some(msg.code.into()));
}
Ok(Some(WsMessage::Close(msg)))
}
val => val,
}
}
pub async fn next_lua_string<'lua>(&'lua self, lua: &'lua Lua) -> LuaResult<LuaValue> {
while let Some(msg) = self.next().await? {
let msg_string_opt = match msg {
WsMessage::Binary(bin) => Some(lua.create_string(&bin)?),
WsMessage::Text(txt) => Some(lua.create_string(&txt)?),
// Stop waiting for next message if we get a close message
WsMessage::Close(_) => return Ok(LuaValue::Nil),
// Ignore ping/pong/frame messages, they are handled by tungstenite
_ => None,
};
if let Some(msg_string) = msg_string_opt {
return Ok(LuaValue::String(msg_string));
}
}
Ok(LuaValue::Nil)
} }
} }
impl<T> NetWebSocket<T> type NetWebSocketStreamClient = MaybeTlsStream<TcpStream>;
where impl NetWebSocket<NetWebSocketStreamClient> {
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult<LuaTable> { pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult<LuaTable> {
let ws = Box::leak(Box::new(self)); let socket_env = TableBuilder::new(lua)?
TableBuilder::new(lua)? .with_value("websocket", self)?
.with_async_function("close", |_, code| ws.close(code))? .with_function("close_code", close_code::<NetWebSocketStreamClient>)?
.with_async_function("send", |_, (msg, bin)| ws.send_lua_string(msg, bin))? .with_async_function("close", close::<NetWebSocketStreamClient>)?
.with_async_function("next", |lua, _: ()| ws.next_lua_string(lua))? .with_async_function("send", send::<NetWebSocketStreamClient>)?
.with_metatable( .with_async_function("next", next::<NetWebSocketStreamClient>)?
TableBuilder::new(lua)? .with_value(
.with_function(LuaMetaMethod::Index.name(), |_, key: String| { "setmetatable",
if key == "closeCode" { lua.named_registry_value::<_, LuaFunction>("tab.setmeta")?,
Ok(ws.get_lua_close_code())
} else {
Ok(LuaValue::Nil)
}
})?
.build_readonly()?,
)? )?
.build_readonly() .with_value(
"freeze",
lua.named_registry_value::<_, LuaFunction>("tab.freeze")?,
)?
.build_readonly()?;
Self::into_lua_table_with_env(lua, socket_env)
} }
} }
type NetWebSocketStreamServer = Upgraded;
impl NetWebSocket<NetWebSocketStreamServer> {
pub fn into_lua_table(self, lua: &'static Lua) -> LuaResult<LuaTable> {
let socket_env = TableBuilder::new(lua)?
.with_value("websocket", self)?
.with_function("close_code", close_code::<NetWebSocketStreamServer>)?
.with_async_function("close", close::<NetWebSocketStreamServer>)?
.with_async_function("send", send::<NetWebSocketStreamServer>)?
.with_async_function("next", next::<NetWebSocketStreamServer>)?
.with_value(
"setmetatable",
lua.named_registry_value::<_, LuaFunction>("tab.setmeta")?,
)?
.with_value(
"freeze",
lua.named_registry_value::<_, LuaFunction>("tab.freeze")?,
)?
.build_readonly()?;
Self::into_lua_table_with_env(lua, socket_env)
}
}
impl<T> LuaUserData for NetWebSocket<T> {}
fn close_code<T>(_lua: &Lua, socket: NetWebSocket<T>) -> LuaResult<LuaValue>
where
T: AsyncRead + AsyncWrite + Unpin,
{
Ok(match socket.close_code.get() {
Some(code) => LuaValue::Number(code as f64),
None => LuaValue::Nil,
})
}
async fn close<T>(_lua: &Lua, (socket, code): (NetWebSocket<T>, Option<u16>)) -> LuaResult<()>
where
T: AsyncRead + AsyncWrite + Unpin,
{
let mut ws = socket.stream.lock().await;
let res = ws.close(Some(WsCloseFrame {
code: match code {
Some(code) if (1000..=4999).contains(&code) => WsCloseCode::from(code),
Some(code) => {
return Err(LuaError::RuntimeError(format!(
"Close code must be between 1000 and 4999, got {code}"
)))
}
None => WsCloseCode::Normal,
},
reason: "".into(),
}));
res.await.map_err(LuaError::external)
}
async fn send<T>(
_lua: &Lua,
(socket, string, as_binary): (NetWebSocket<T>, LuaString<'_>, Option<bool>),
) -> LuaResult<()>
where
T: AsyncRead + AsyncWrite + Unpin,
{
let msg = if matches!(as_binary, Some(true)) {
WsMessage::Binary(string.as_bytes().to_vec())
} else {
let s = string.to_str().map_err(LuaError::external)?;
WsMessage::Text(s.to_string())
};
let mut ws = socket.stream.lock().await;
ws.send(msg).await.map_err(LuaError::external)
}
async fn next<T>(lua: &Lua, socket: NetWebSocket<T>) -> LuaResult<LuaValue>
where
T: AsyncRead + AsyncWrite + Unpin,
{
let mut ws = socket.stream.lock().await;
let item = ws.next().await.transpose().map_err(LuaError::external);
let msg = match item {
Ok(Some(WsMessage::Close(msg))) => {
if let Some(msg) = &msg {
socket.close_code.replace(Some(msg.code.into()));
}
Ok(Some(WsMessage::Close(msg)))
}
val => val,
}?;
while let Some(msg) = &msg {
let msg_string_opt = match msg {
WsMessage::Binary(bin) => Some(lua.create_string(&bin)?),
WsMessage::Text(txt) => Some(lua.create_string(&txt)?),
// Stop waiting for next message if we get a close message
WsMessage::Close(_) => return Ok(LuaValue::Nil),
// Ignore ping/pong/frame messages, they are handled by tungstenite
_ => None,
};
if let Some(msg_string) = msg_string_opt {
return Ok(LuaValue::String(msg_string));
}
}
Ok(LuaValue::Nil)
}