diff --git a/docs/Protocol.md b/docs/Protocol.md new file mode 100644 index 0000000..425de11 --- /dev/null +++ b/docs/Protocol.md @@ -0,0 +1,47 @@ +# Protocol + +`ruck` is a command line tool used for hosting relay servers and sending end-to-end encrypted files between clients. This document describes the protocol `ruck` uses to support this functionality. + +### Version + +This document refers to version `0.1.0` of `ruck` as defined by the `Cargo.toml` file. + +## Server + +The server in `ruck` exposes a TCP port, typically port `8080`. Its only functions are to staple connections and shuttle bytes between stapled connections. The first 33 bytes sent from a new client are stored in a HashMap. If the same 33 bytes are already in the Hashmap, the connections are then stapled. This 33 byte key is defined by the [Spake2](https://docs.rs/spake2/0.3.1/spake2/) handshake algorithm which the clients employ to negotiate a single use password to encrypt all their messages. Although from the server's perspective, the clients can agree on these 33 bytes in any way. + +Once the connection is stapled, all bytes are piped across until a client disconnects or times out. The time out is set to remove idle connections. Beyond stapling connections, the file negotiation aspect of the protocol is managed by the clients. For this reason, `ruck` servers are very resistant to updates and protocol updates typically do not necessitate new deployments. + +The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages, as long as the first 33 bytes sent over the wire match. Other than that, it is a private echo server. + +## Client + +There are two types of clients - `send` and `receive` clients. The following state machine describes the protocol. All the messages after the exchange of passwords are typically bzip compressed, encrypted with Aes256Gcm using a Spake2 key derived from the exchanged password. They are sent over the wire as bincode. Each message has a fixed size of 1024 \* 1024 bytes. + +Message Types: + +- Vec +- + +- Set a timeout for new messages. + +Send or receive. + +If send: + +- Send message with file info + +They exchange passwords. +Send offers a list of files. +Receive specifies which bytes it wants from these files. +Send sends the specified bytes and waits. +Receive sends heartbeats with progress updates. +Send hangs up once the heartbeats stop or received a successful heartbeat. + +```rust +socket.readable().await?; +let mut socket = socket; +let mut buf = BytesMut::with_capacity(33); +socket.read_exact(&mut buf).await?; +let id = buf.freeze(); +``` diff --git a/src/crypto.rs b/src/crypto.rs index 43cf8ef..0fe748c 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -17,10 +17,15 @@ pub async fn handshake( Spake2::::start_symmetric(&Password::new(password), &Identity::new(&id)); println!("client - sending handshake msg"); let handshake_msg = Message::HandshakeMessage(HandshakePayload { - id, - msg: Bytes::from(outbound_msg), + id: id.clone(), + msg: Bytes::from(outbound_msg.clone()), }); - // println!("client - handshake msg, {:?}", handshake_msg); + println!("client - handshake msg, {:?}", handshake_msg); + println!( + "len id: {:?}. len msg: {:?}", + id.len(), + Bytes::from(outbound_msg).len() + ); stream.send(handshake_msg).await?; let first_message = match stream.next().await { Some(Ok(msg)) => match msg { diff --git a/src/server.rs b/src/server.rs index 759e154..498efd4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,16 +1,14 @@ -use crate::message::{HandshakePayload, Message, MessageStream, RuckError}; - use anyhow::{anyhow, Result}; -use bytes::Bytes; -use futures::prelude::*; +use bytes::{Bytes, BytesMut}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex}; -type Tx = mpsc::UnboundedSender; -type Rx = mpsc::UnboundedReceiver; +type Tx = mpsc::UnboundedSender; +type Rx = mpsc::UnboundedReceiver; pub struct Shared { handshake_cache: HashMap, @@ -18,12 +16,12 @@ pub struct Shared { type State = Arc>; struct Client { - messages: MessageStream, + socket: TcpStream, rx: Rx, peer_tx: Option, } struct StapledClient { - messages: MessageStream, + socket: TcpStream, rx: Rx, peer_tx: Tx, } @@ -37,11 +35,11 @@ impl Shared { } impl Client { - async fn new(id: Bytes, state: State, messages: MessageStream) -> Result { + async fn new(id: Bytes, state: State, socket: TcpStream) -> Result { let (tx, rx) = mpsc::unbounded_channel(); let mut shared = state.lock().await; let client = Client { - messages, + socket, rx, peer_tx: shared.handshake_cache.remove(&id), }; @@ -52,7 +50,8 @@ impl Client { async fn upgrade( client: Client, state: State, - handshake_payload: HandshakePayload, + id: Bytes, + handshake_msg: Bytes, ) -> Result { let mut client = client; let peer_tx = match client.peer_tx { @@ -62,27 +61,18 @@ impl Client { None => { tokio::select! { Some(msg) = client.rx.recv() => { - client.messages.send(msg).await? - } - result = client.messages.next() => match result { - Some(_) => return Err(anyhow!("Client sending more messages before handshake complete")), - None => return Err(anyhow!("Connection interrupted")), + client.socket.write_all(&msg).await? } } - match state - .lock() - .await - .handshake_cache - .remove(&handshake_payload.id) - { + match state.lock().await.handshake_cache.remove(&id) { Some(peer_tx) => peer_tx, None => return Err(anyhow!("Connection not stapled")), } } }; - peer_tx.send(Message::HandshakeMessage(handshake_payload))?; + peer_tx.send(handshake_msg)?; Ok(StapledClient { - messages: client.messages, + socket: client.socket, rx: client.rx, peer_tx, }) @@ -109,26 +99,18 @@ pub async fn serve() -> Result<()> { pub async fn handle_connection( state: Arc>, socket: TcpStream, - addr: SocketAddr, + _addr: SocketAddr, ) -> Result<()> { - let mut stream = Message::to_stream(socket); - println!("server - new conn from {:?}", addr); - let handshake_payload = match stream.next().await { - Some(Ok(Message::HandshakeMessage(payload))) => payload, - Some(Ok(_)) => { - stream - .send(Message::ErrorMessage(RuckError::NotHandshake)) - .await?; - return Ok(()); - } - _ => { - println!("No first message"); - return Ok(()); - } - }; - let id = handshake_payload.id.clone(); - let client = Client::new(id.clone(), state.clone(), stream).await?; - let mut client = match Client::upgrade(client, state.clone(), handshake_payload).await { + socket.readable().await?; + let mut socket = socket; + let mut id_buffer = BytesMut::with_capacity(32); + let mut msg_buffer = BytesMut::with_capacity(33); + socket.read_exact(&mut id_buffer).await?; + socket.read_exact(&mut msg_buffer).await?; + let id = id_buffer.freeze(); + let msg = msg_buffer.freeze(); + let client = Client::new(id.clone(), state.clone(), socket).await?; + let mut client = match Client::upgrade(client, state.clone(), id.clone(), msg).await { Ok(client) => client, Err(err) => { // Clear handshake cache if staple is unsuccessful @@ -137,19 +119,22 @@ pub async fn handle_connection( } }; // The handshake cache should be empty for {id} at this point. + let mut channel_buffer = BytesMut::with_capacity(1024); loop { tokio::select! { Some(msg) = client.rx.recv() => { - client.messages.send(msg).await? + client.socket.write_all(&msg).await? } - result = client.messages.next() => match result { - Some(Ok(msg)) => { - client.peer_tx.send(msg)? - } - Some(Err(e)) => { + result = client.socket.read(&mut channel_buffer) => match result { + Ok(0) => { + break + }, + Ok(n) => { + client.peer_tx.send(BytesMut::from(&channel_buffer[0..n]).freeze())? + }, + Err(e) => { println!("Error {:?}", e); } - None => break, } } }