From b3b683074dfb81b48602f4ee35d0199f97889b81 Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Mon, 29 Aug 2022 14:01:56 -0400 Subject: [PATCH] Connection stapled; pared down server --- docs/Protocol.md | 12 ++---------- src/client.rs | 22 ++++++++++------------ src/crypto.rs | 48 +++++++++++++++++++++++------------------------- src/server.rs | 28 +++++++++++++++++----------- 4 files changed, 52 insertions(+), 58 deletions(-) diff --git a/docs/Protocol.md b/docs/Protocol.md index 425de11..efd77b4 100644 --- a/docs/Protocol.md +++ b/docs/Protocol.md @@ -8,11 +8,11 @@ This document refers to version `0.1.0` of `ruck` as defined by the `Cargo.toml` ## Server -The server in `ruck` exposes a TCP port, typically port `8080`. Its only functions are to staple connections and shuttle bytes between stapled connections. The first 33 bytes sent from a new client are stored in a HashMap. If the same 33 bytes are already in the Hashmap, the connections are then stapled. This 33 byte key is defined by the [Spake2](https://docs.rs/spake2/0.3.1/spake2/) handshake algorithm which the clients employ to negotiate a single use password to encrypt all their messages. Although from the server's perspective, the clients can agree on these 33 bytes in any way. +The server in `ruck` exposes a TCP port, typically port `8080`. Its only functions are to staple connections and shuttle bytes between stapled connections. The first 32 bytes sent from a new client are stored in a HashMap. If the same 32 bytes are already in the Hashmap, the connections are then stapled. This 32 byte key is defined by the [Spake2](https://docs.rs/spake2/0.3.1/spake2/) handshake algorithm which the clients employ to negotiate a single use password to encrypt all their messages. Although from the server's perspective, the clients can agree on these 32 bytes in any way. Once the connection is stapled, all bytes are piped across until a client disconnects or times out. The time out is set to remove idle connections. Beyond stapling connections, the file negotiation aspect of the protocol is managed by the clients. For this reason, `ruck` servers are very resistant to updates and protocol updates typically do not necessitate new deployments. -The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages, as long as the first 33 bytes sent over the wire match. Other than that, it is a private echo server. +The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages, as long as the first 32 bytes sent over the wire match. Other than that, it is a private echo server. ## Client @@ -37,11 +37,3 @@ Receive specifies which bytes it wants from these files. Send sends the specified bytes and waits. Receive sends heartbeats with progress updates. Send hangs up once the heartbeats stop or received a successful heartbeat. - -```rust -socket.readable().await?; -let mut socket = socket; -let mut buf = BytesMut::with_capacity(33); -socket.read_exact(&mut buf).await?; -let id = buf.freeze(); -``` diff --git a/src/client.rs b/src/client.rs index 60c4da0..dfe1c43 100644 --- a/src/client.rs +++ b/src/client.rs @@ -33,21 +33,20 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Establish connection to server let socket = TcpStream::connect("127.0.0.1:8080").await?; - let mut stream = Message::to_stream(socket); // Complete handshake, returning cipher used for encryption - let (stream, cipher) = handshake( - &mut stream, + let (socket, cipher) = handshake( + socket, Bytes::from(password.to_string()), pass_to_bytes(password), ) .await?; - + let mut stream = Message::to_stream(socket); // Complete file negotiation - let handles = negotiate_files_up(handles, stream, &cipher).await?; + let handles = negotiate_files_up(handles, &mut stream, &cipher).await?; // Upload negotiated files - upload_encrypted_files(stream, handles, &cipher).await?; + upload_encrypted_files(&mut stream, handles, &cipher).await?; // Exit Ok(()) @@ -55,17 +54,16 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { pub async fn receive(password: &String) -> Result<()> { let socket = TcpStream::connect("127.0.0.1:8080").await?; - let mut stream = Message::to_stream(socket); - let (stream, cipher) = handshake( - &mut stream, + let (socket, cipher) = handshake( + socket, Bytes::from(password.to_string()), pass_to_bytes(password), ) .await?; + let mut stream = Message::to_stream(socket); + let files = negotiate_files_down(&mut stream, &cipher).await?; - let files = negotiate_files_down(stream, &cipher).await?; - - download_files(files, stream, &cipher).await?; + download_files(files, &mut stream, &cipher).await?; return Ok(()); } diff --git a/src/crypto.rs b/src/crypto.rs index 0fe748c..482d37e 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -1,48 +1,46 @@ -use crate::message::{EncryptedPayload, HandshakePayload, Message, MessageStream}; +use crate::message::EncryptedPayload; use aes_gcm::aead::{Aead, NewAead}; use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use futures::prelude::*; use rand::{thread_rng, Rng}; use spake2::{Ed25519Group, Identity, Password, Spake2}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; pub async fn handshake( - stream: &mut MessageStream, + socket: TcpStream, password: Bytes, id: Bytes, -) -> Result<(&mut MessageStream, Aes256Gcm)> { +) -> Result<(TcpStream, Aes256Gcm)> { + let mut socket = socket; let (s1, outbound_msg) = Spake2::::start_symmetric(&Password::new(password), &Identity::new(&id)); println!("client - sending handshake msg"); - let handshake_msg = Message::HandshakeMessage(HandshakePayload { - id: id.clone(), - msg: Bytes::from(outbound_msg.clone()), - }); + let mut handshake_msg = BytesMut::with_capacity(32 + 33); + handshake_msg.extend_from_slice(&id); + handshake_msg.extend_from_slice(&outbound_msg); + let handshake_msg = handshake_msg.freeze(); println!("client - handshake msg, {:?}", handshake_msg); - println!( - "len id: {:?}. len msg: {:?}", - id.len(), - Bytes::from(outbound_msg).len() - ); - stream.send(handshake_msg).await?; - let first_message = match stream.next().await { - Some(Ok(msg)) => match msg { - Message::HandshakeMessage(response) => response.msg, - _ => return Err(anyhow!("Expecting handshake message response")), - }, - _ => { - return Err(anyhow!("No response to handshake message")); - } - }; - println!("client - handshake msg responded to"); + // println!( + // "len id: {:?}. len msg: {:?}", + // id.len(), + // Bytes::from(outbound_msg).len() + // ); + socket.write_all(&handshake_msg).await?; + let mut buffer = [0; 33]; + let n = socket.read_exact(&mut buffer).await?; + println!("The bytes: {:?}", &buffer[..n]); + let first_message = BytesMut::from(&buffer[..n]).freeze(); + println!("client - handshake msg responded to: {:?}", first_message); let key = match s1.finish(&first_message[..]) { Ok(key_bytes) => key_bytes, Err(e) => return Err(anyhow!(e.to_string())), }; // println!("Handshake successful. Key is {:?}", key); - return Ok((stream, new_cipher(&key))); + return Ok((socket, new_cipher(&key))); } pub fn new_cipher(key: &Vec) -> Aes256Gcm { diff --git a/src/server.rs b/src/server.rs index 498efd4..0d7f8c0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,7 @@ use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; use std::collections::HashMap; +use std::io; use std::net::SocketAddr; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -61,7 +62,7 @@ impl Client { None => { tokio::select! { Some(msg) = client.rx.recv() => { - client.socket.write_all(&msg).await? + client.socket.write_all(&msg[..]).await? } } match state.lock().await.handshake_cache.remove(&id) { @@ -103,13 +104,15 @@ pub async fn handle_connection( ) -> Result<()> { socket.readable().await?; let mut socket = socket; - let mut id_buffer = BytesMut::with_capacity(32); - let mut msg_buffer = BytesMut::with_capacity(33); - socket.read_exact(&mut id_buffer).await?; - socket.read_exact(&mut msg_buffer).await?; - let id = id_buffer.freeze(); - let msg = msg_buffer.freeze(); + // let mut handshake_buffer = BytesMut::with_capacity(55); + let mut buffer = [0; 32 + 33]; + let n = socket.read_exact(&mut buffer).await?; + println!("The bytes: {:?}", &buffer[..n]); + let mut msg = BytesMut::from(&buffer[..n]).freeze(); + let id = msg.split_to(32); + println!("New client with id={:?}, msg={:?}", id.clone(), msg.clone()); let client = Client::new(id.clone(), state.clone(), socket).await?; + println!("Client created"); let mut client = match Client::upgrade(client, state.clone(), id.clone(), msg).await { Ok(client) => client, Err(err) => { @@ -118,19 +121,21 @@ pub async fn handle_connection( return Err(err); } }; + println!("Client upgraded"); // The handshake cache should be empty for {id} at this point. - let mut channel_buffer = BytesMut::with_capacity(1024); + let mut client_buffer = BytesMut::with_capacity(1024); loop { tokio::select! { Some(msg) = client.rx.recv() => { - client.socket.write_all(&msg).await? + client.socket.write_all(&msg[..]).await? } - result = client.socket.read(&mut channel_buffer) => match result { + result = client.socket.read(&mut client_buffer) => match result { Ok(0) => { break }, Ok(n) => { - client.peer_tx.send(BytesMut::from(&channel_buffer[0..n]).freeze())? + println!("reading more"); + client.peer_tx.send(BytesMut::from(&client_buffer[0..n]).freeze())? }, Err(e) => { println!("Error {:?}", e); @@ -138,5 +143,6 @@ pub async fn handle_connection( } } } + println!("done with client"); Ok(()) }