This commit is contained in:
Donald Knuth 2022-08-29 12:51:37 -04:00
parent 764a6d4687
commit 31e4fe7c7b
3 changed files with 90 additions and 53 deletions

47
docs/Protocol.md Normal file
View file

@ -0,0 +1,47 @@
# Protocol
`ruck` is a command line tool used for hosting relay servers and sending end-to-end encrypted files between clients. This document describes the protocol `ruck` uses to support this functionality.
### Version
This document refers to version `0.1.0` of `ruck` as defined by the `Cargo.toml` file.
## Server
The server in `ruck` exposes a TCP port, typically port `8080`. Its only functions are to staple connections and shuttle bytes between stapled connections. The first 33 bytes sent from a new client are stored in a HashMap. If the same 33 bytes are already in the Hashmap, the connections are then stapled. This 33 byte key is defined by the [Spake2](https://docs.rs/spake2/0.3.1/spake2/) handshake algorithm which the clients employ to negotiate a single use password to encrypt all their messages. Although from the server's perspective, the clients can agree on these 33 bytes in any way.
Once the connection is stapled, all bytes are piped across until a client disconnects or times out. The time out is set to remove idle connections. Beyond stapling connections, the file negotiation aspect of the protocol is managed by the clients. For this reason, `ruck` servers are very resistant to updates and protocol updates typically do not necessitate new deployments.
The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages, as long as the first 33 bytes sent over the wire match. Other than that, it is a private echo server.
## Client
There are two types of clients - `send` and `receive` clients. The following state machine describes the protocol. All the messages after the exchange of passwords are typically bzip compressed, encrypted with Aes256Gcm using a Spake2 key derived from the exchanged password. They are sent over the wire as bincode. Each message has a fixed size of 1024 \* 1024 bytes.
Message Types:
- Vec<File Info>
-
- Set a timeout for new messages.
Send or receive.
If send:
- Send message with file info
They exchange passwords.
Send offers a list of files.
Receive specifies which bytes it wants from these files.
Send sends the specified bytes and waits.
Receive sends heartbeats with progress updates.
Send hangs up once the heartbeats stop or received a successful heartbeat.
```rust
socket.readable().await?;
let mut socket = socket;
let mut buf = BytesMut::with_capacity(33);
socket.read_exact(&mut buf).await?;
let id = buf.freeze();
```

View file

@ -17,10 +17,15 @@ pub async fn handshake(
Spake2::<Ed25519Group>::start_symmetric(&Password::new(password), &Identity::new(&id));
println!("client - sending handshake msg");
let handshake_msg = Message::HandshakeMessage(HandshakePayload {
id,
msg: Bytes::from(outbound_msg),
id: id.clone(),
msg: Bytes::from(outbound_msg.clone()),
});
// println!("client - handshake msg, {:?}", handshake_msg);
println!("client - handshake msg, {:?}", handshake_msg);
println!(
"len id: {:?}. len msg: {:?}",
id.len(),
Bytes::from(outbound_msg).len()
);
stream.send(handshake_msg).await?;
let first_message = match stream.next().await {
Some(Ok(msg)) => match msg {

View file

@ -1,16 +1,14 @@
use crate::message::{HandshakePayload, Message, MessageStream, RuckError};
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::prelude::*;
use bytes::{Bytes, BytesMut};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
type Tx = mpsc::UnboundedSender<Message>;
type Rx = mpsc::UnboundedReceiver<Message>;
type Tx = mpsc::UnboundedSender<Bytes>;
type Rx = mpsc::UnboundedReceiver<Bytes>;
pub struct Shared {
handshake_cache: HashMap<Bytes, Tx>,
@ -18,12 +16,12 @@ pub struct Shared {
type State = Arc<Mutex<Shared>>;
struct Client {
messages: MessageStream,
socket: TcpStream,
rx: Rx,
peer_tx: Option<Tx>,
}
struct StapledClient {
messages: MessageStream,
socket: TcpStream,
rx: Rx,
peer_tx: Tx,
}
@ -37,11 +35,11 @@ impl Shared {
}
impl Client {
async fn new(id: Bytes, state: State, messages: MessageStream) -> Result<Client> {
async fn new(id: Bytes, state: State, socket: TcpStream) -> Result<Client> {
let (tx, rx) = mpsc::unbounded_channel();
let mut shared = state.lock().await;
let client = Client {
messages,
socket,
rx,
peer_tx: shared.handshake_cache.remove(&id),
};
@ -52,7 +50,8 @@ impl Client {
async fn upgrade(
client: Client,
state: State,
handshake_payload: HandshakePayload,
id: Bytes,
handshake_msg: Bytes,
) -> Result<StapledClient> {
let mut client = client;
let peer_tx = match client.peer_tx {
@ -62,27 +61,18 @@ impl Client {
None => {
tokio::select! {
Some(msg) = client.rx.recv() => {
client.messages.send(msg).await?
}
result = client.messages.next() => match result {
Some(_) => return Err(anyhow!("Client sending more messages before handshake complete")),
None => return Err(anyhow!("Connection interrupted")),
client.socket.write_all(&msg).await?
}
}
match state
.lock()
.await
.handshake_cache
.remove(&handshake_payload.id)
{
match state.lock().await.handshake_cache.remove(&id) {
Some(peer_tx) => peer_tx,
None => return Err(anyhow!("Connection not stapled")),
}
}
};
peer_tx.send(Message::HandshakeMessage(handshake_payload))?;
peer_tx.send(handshake_msg)?;
Ok(StapledClient {
messages: client.messages,
socket: client.socket,
rx: client.rx,
peer_tx,
})
@ -109,26 +99,18 @@ pub async fn serve() -> Result<()> {
pub async fn handle_connection(
state: Arc<Mutex<Shared>>,
socket: TcpStream,
addr: SocketAddr,
_addr: SocketAddr,
) -> Result<()> {
let mut stream = Message::to_stream(socket);
println!("server - new conn from {:?}", addr);
let handshake_payload = match stream.next().await {
Some(Ok(Message::HandshakeMessage(payload))) => payload,
Some(Ok(_)) => {
stream
.send(Message::ErrorMessage(RuckError::NotHandshake))
.await?;
return Ok(());
}
_ => {
println!("No first message");
return Ok(());
}
};
let id = handshake_payload.id.clone();
let client = Client::new(id.clone(), state.clone(), stream).await?;
let mut client = match Client::upgrade(client, state.clone(), handshake_payload).await {
socket.readable().await?;
let mut socket = socket;
let mut id_buffer = BytesMut::with_capacity(32);
let mut msg_buffer = BytesMut::with_capacity(33);
socket.read_exact(&mut id_buffer).await?;
socket.read_exact(&mut msg_buffer).await?;
let id = id_buffer.freeze();
let msg = msg_buffer.freeze();
let client = Client::new(id.clone(), state.clone(), socket).await?;
let mut client = match Client::upgrade(client, state.clone(), id.clone(), msg).await {
Ok(client) => client,
Err(err) => {
// Clear handshake cache if staple is unsuccessful
@ -137,19 +119,22 @@ pub async fn handle_connection(
}
};
// The handshake cache should be empty for {id} at this point.
let mut channel_buffer = BytesMut::with_capacity(1024);
loop {
tokio::select! {
Some(msg) = client.rx.recv() => {
client.messages.send(msg).await?
client.socket.write_all(&msg).await?
}
result = client.messages.next() => match result {
Some(Ok(msg)) => {
client.peer_tx.send(msg)?
}
Some(Err(e)) => {
result = client.socket.read(&mut channel_buffer) => match result {
Ok(0) => {
break
},
Ok(n) => {
client.peer_tx.send(BytesMut::from(&channel_buffer[0..n]).freeze())?
},
Err(e) => {
println!("Error {:?}", e);
}
None => break,
}
}
}