From 31e4fe7c7ba0376ec30dc45017010488d65a77cb Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Mon, 29 Aug 2022 12:51:37 -0400 Subject: [PATCH 01/13] Docs --- docs/Protocol.md | 47 ++++++++++++++++++++++++++ src/crypto.rs | 11 +++++-- src/server.rs | 85 ++++++++++++++++++++---------------------------- 3 files changed, 90 insertions(+), 53 deletions(-) create mode 100644 docs/Protocol.md diff --git a/docs/Protocol.md b/docs/Protocol.md new file mode 100644 index 0000000..425de11 --- /dev/null +++ b/docs/Protocol.md @@ -0,0 +1,47 @@ +# Protocol + +`ruck` is a command line tool used for hosting relay servers and sending end-to-end encrypted files between clients. This document describes the protocol `ruck` uses to support this functionality. + +### Version + +This document refers to version `0.1.0` of `ruck` as defined by the `Cargo.toml` file. + +## Server + +The server in `ruck` exposes a TCP port, typically port `8080`. Its only functions are to staple connections and shuttle bytes between stapled connections. The first 33 bytes sent from a new client are stored in a HashMap. If the same 33 bytes are already in the Hashmap, the connections are then stapled. This 33 byte key is defined by the [Spake2](https://docs.rs/spake2/0.3.1/spake2/) handshake algorithm which the clients employ to negotiate a single use password to encrypt all their messages. Although from the server's perspective, the clients can agree on these 33 bytes in any way. + +Once the connection is stapled, all bytes are piped across until a client disconnects or times out. The time out is set to remove idle connections. Beyond stapling connections, the file negotiation aspect of the protocol is managed by the clients. For this reason, `ruck` servers are very resistant to updates and protocol updates typically do not necessitate new deployments. + +The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages, as long as the first 33 bytes sent over the wire match. Other than that, it is a private echo server. + +## Client + +There are two types of clients - `send` and `receive` clients. The following state machine describes the protocol. All the messages after the exchange of passwords are typically bzip compressed, encrypted with Aes256Gcm using a Spake2 key derived from the exchanged password. They are sent over the wire as bincode. Each message has a fixed size of 1024 \* 1024 bytes. + +Message Types: + +- Vec +- + +- Set a timeout for new messages. + +Send or receive. + +If send: + +- Send message with file info + +They exchange passwords. +Send offers a list of files. +Receive specifies which bytes it wants from these files. +Send sends the specified bytes and waits. +Receive sends heartbeats with progress updates. +Send hangs up once the heartbeats stop or received a successful heartbeat. + +```rust +socket.readable().await?; +let mut socket = socket; +let mut buf = BytesMut::with_capacity(33); +socket.read_exact(&mut buf).await?; +let id = buf.freeze(); +``` diff --git a/src/crypto.rs b/src/crypto.rs index 43cf8ef..0fe748c 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -17,10 +17,15 @@ pub async fn handshake( Spake2::::start_symmetric(&Password::new(password), &Identity::new(&id)); println!("client - sending handshake msg"); let handshake_msg = Message::HandshakeMessage(HandshakePayload { - id, - msg: Bytes::from(outbound_msg), + id: id.clone(), + msg: Bytes::from(outbound_msg.clone()), }); - // println!("client - handshake msg, {:?}", handshake_msg); + println!("client - handshake msg, {:?}", handshake_msg); + println!( + "len id: {:?}. len msg: {:?}", + id.len(), + Bytes::from(outbound_msg).len() + ); stream.send(handshake_msg).await?; let first_message = match stream.next().await { Some(Ok(msg)) => match msg { diff --git a/src/server.rs b/src/server.rs index 759e154..498efd4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,16 +1,14 @@ -use crate::message::{HandshakePayload, Message, MessageStream, RuckError}; - use anyhow::{anyhow, Result}; -use bytes::Bytes; -use futures::prelude::*; +use bytes::{Bytes, BytesMut}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex}; -type Tx = mpsc::UnboundedSender; -type Rx = mpsc::UnboundedReceiver; +type Tx = mpsc::UnboundedSender; +type Rx = mpsc::UnboundedReceiver; pub struct Shared { handshake_cache: HashMap, @@ -18,12 +16,12 @@ pub struct Shared { type State = Arc>; struct Client { - messages: MessageStream, + socket: TcpStream, rx: Rx, peer_tx: Option, } struct StapledClient { - messages: MessageStream, + socket: TcpStream, rx: Rx, peer_tx: Tx, } @@ -37,11 +35,11 @@ impl Shared { } impl Client { - async fn new(id: Bytes, state: State, messages: MessageStream) -> Result { + async fn new(id: Bytes, state: State, socket: TcpStream) -> Result { let (tx, rx) = mpsc::unbounded_channel(); let mut shared = state.lock().await; let client = Client { - messages, + socket, rx, peer_tx: shared.handshake_cache.remove(&id), }; @@ -52,7 +50,8 @@ impl Client { async fn upgrade( client: Client, state: State, - handshake_payload: HandshakePayload, + id: Bytes, + handshake_msg: Bytes, ) -> Result { let mut client = client; let peer_tx = match client.peer_tx { @@ -62,27 +61,18 @@ impl Client { None => { tokio::select! { Some(msg) = client.rx.recv() => { - client.messages.send(msg).await? - } - result = client.messages.next() => match result { - Some(_) => return Err(anyhow!("Client sending more messages before handshake complete")), - None => return Err(anyhow!("Connection interrupted")), + client.socket.write_all(&msg).await? } } - match state - .lock() - .await - .handshake_cache - .remove(&handshake_payload.id) - { + match state.lock().await.handshake_cache.remove(&id) { Some(peer_tx) => peer_tx, None => return Err(anyhow!("Connection not stapled")), } } }; - peer_tx.send(Message::HandshakeMessage(handshake_payload))?; + peer_tx.send(handshake_msg)?; Ok(StapledClient { - messages: client.messages, + socket: client.socket, rx: client.rx, peer_tx, }) @@ -109,26 +99,18 @@ pub async fn serve() -> Result<()> { pub async fn handle_connection( state: Arc>, socket: TcpStream, - addr: SocketAddr, + _addr: SocketAddr, ) -> Result<()> { - let mut stream = Message::to_stream(socket); - println!("server - new conn from {:?}", addr); - let handshake_payload = match stream.next().await { - Some(Ok(Message::HandshakeMessage(payload))) => payload, - Some(Ok(_)) => { - stream - .send(Message::ErrorMessage(RuckError::NotHandshake)) - .await?; - return Ok(()); - } - _ => { - println!("No first message"); - return Ok(()); - } - }; - let id = handshake_payload.id.clone(); - let client = Client::new(id.clone(), state.clone(), stream).await?; - let mut client = match Client::upgrade(client, state.clone(), handshake_payload).await { + socket.readable().await?; + let mut socket = socket; + let mut id_buffer = BytesMut::with_capacity(32); + let mut msg_buffer = BytesMut::with_capacity(33); + socket.read_exact(&mut id_buffer).await?; + socket.read_exact(&mut msg_buffer).await?; + let id = id_buffer.freeze(); + let msg = msg_buffer.freeze(); + let client = Client::new(id.clone(), state.clone(), socket).await?; + let mut client = match Client::upgrade(client, state.clone(), id.clone(), msg).await { Ok(client) => client, Err(err) => { // Clear handshake cache if staple is unsuccessful @@ -137,19 +119,22 @@ pub async fn handle_connection( } }; // The handshake cache should be empty for {id} at this point. + let mut channel_buffer = BytesMut::with_capacity(1024); loop { tokio::select! { Some(msg) = client.rx.recv() => { - client.messages.send(msg).await? + client.socket.write_all(&msg).await? } - result = client.messages.next() => match result { - Some(Ok(msg)) => { - client.peer_tx.send(msg)? - } - Some(Err(e)) => { + result = client.socket.read(&mut channel_buffer) => match result { + Ok(0) => { + break + }, + Ok(n) => { + client.peer_tx.send(BytesMut::from(&channel_buffer[0..n]).freeze())? + }, + Err(e) => { println!("Error {:?}", e); } - None => break, } } } From b3b683074dfb81b48602f4ee35d0199f97889b81 Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Mon, 29 Aug 2022 14:01:56 -0400 Subject: [PATCH 02/13] Connection stapled; pared down server --- docs/Protocol.md | 12 ++---------- src/client.rs | 22 ++++++++++------------ src/crypto.rs | 48 +++++++++++++++++++++++------------------------- src/server.rs | 28 +++++++++++++++++----------- 4 files changed, 52 insertions(+), 58 deletions(-) diff --git a/docs/Protocol.md b/docs/Protocol.md index 425de11..efd77b4 100644 --- a/docs/Protocol.md +++ b/docs/Protocol.md @@ -8,11 +8,11 @@ This document refers to version `0.1.0` of `ruck` as defined by the `Cargo.toml` ## Server -The server in `ruck` exposes a TCP port, typically port `8080`. Its only functions are to staple connections and shuttle bytes between stapled connections. The first 33 bytes sent from a new client are stored in a HashMap. If the same 33 bytes are already in the Hashmap, the connections are then stapled. This 33 byte key is defined by the [Spake2](https://docs.rs/spake2/0.3.1/spake2/) handshake algorithm which the clients employ to negotiate a single use password to encrypt all their messages. Although from the server's perspective, the clients can agree on these 33 bytes in any way. +The server in `ruck` exposes a TCP port, typically port `8080`. Its only functions are to staple connections and shuttle bytes between stapled connections. The first 32 bytes sent from a new client are stored in a HashMap. If the same 32 bytes are already in the Hashmap, the connections are then stapled. This 32 byte key is defined by the [Spake2](https://docs.rs/spake2/0.3.1/spake2/) handshake algorithm which the clients employ to negotiate a single use password to encrypt all their messages. Although from the server's perspective, the clients can agree on these 32 bytes in any way. Once the connection is stapled, all bytes are piped across until a client disconnects or times out. The time out is set to remove idle connections. Beyond stapling connections, the file negotiation aspect of the protocol is managed by the clients. For this reason, `ruck` servers are very resistant to updates and protocol updates typically do not necessitate new deployments. -The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages, as long as the first 33 bytes sent over the wire match. Other than that, it is a private echo server. +The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages, as long as the first 32 bytes sent over the wire match. Other than that, it is a private echo server. ## Client @@ -37,11 +37,3 @@ Receive specifies which bytes it wants from these files. Send sends the specified bytes and waits. Receive sends heartbeats with progress updates. Send hangs up once the heartbeats stop or received a successful heartbeat. - -```rust -socket.readable().await?; -let mut socket = socket; -let mut buf = BytesMut::with_capacity(33); -socket.read_exact(&mut buf).await?; -let id = buf.freeze(); -``` diff --git a/src/client.rs b/src/client.rs index 60c4da0..dfe1c43 100644 --- a/src/client.rs +++ b/src/client.rs @@ -33,21 +33,20 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Establish connection to server let socket = TcpStream::connect("127.0.0.1:8080").await?; - let mut stream = Message::to_stream(socket); // Complete handshake, returning cipher used for encryption - let (stream, cipher) = handshake( - &mut stream, + let (socket, cipher) = handshake( + socket, Bytes::from(password.to_string()), pass_to_bytes(password), ) .await?; - + let mut stream = Message::to_stream(socket); // Complete file negotiation - let handles = negotiate_files_up(handles, stream, &cipher).await?; + let handles = negotiate_files_up(handles, &mut stream, &cipher).await?; // Upload negotiated files - upload_encrypted_files(stream, handles, &cipher).await?; + upload_encrypted_files(&mut stream, handles, &cipher).await?; // Exit Ok(()) @@ -55,17 +54,16 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { pub async fn receive(password: &String) -> Result<()> { let socket = TcpStream::connect("127.0.0.1:8080").await?; - let mut stream = Message::to_stream(socket); - let (stream, cipher) = handshake( - &mut stream, + let (socket, cipher) = handshake( + socket, Bytes::from(password.to_string()), pass_to_bytes(password), ) .await?; + let mut stream = Message::to_stream(socket); + let files = negotiate_files_down(&mut stream, &cipher).await?; - let files = negotiate_files_down(stream, &cipher).await?; - - download_files(files, stream, &cipher).await?; + download_files(files, &mut stream, &cipher).await?; return Ok(()); } diff --git a/src/crypto.rs b/src/crypto.rs index 0fe748c..482d37e 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -1,48 +1,46 @@ -use crate::message::{EncryptedPayload, HandshakePayload, Message, MessageStream}; +use crate::message::EncryptedPayload; use aes_gcm::aead::{Aead, NewAead}; use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use futures::prelude::*; use rand::{thread_rng, Rng}; use spake2::{Ed25519Group, Identity, Password, Spake2}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; pub async fn handshake( - stream: &mut MessageStream, + socket: TcpStream, password: Bytes, id: Bytes, -) -> Result<(&mut MessageStream, Aes256Gcm)> { +) -> Result<(TcpStream, Aes256Gcm)> { + let mut socket = socket; let (s1, outbound_msg) = Spake2::::start_symmetric(&Password::new(password), &Identity::new(&id)); println!("client - sending handshake msg"); - let handshake_msg = Message::HandshakeMessage(HandshakePayload { - id: id.clone(), - msg: Bytes::from(outbound_msg.clone()), - }); + let mut handshake_msg = BytesMut::with_capacity(32 + 33); + handshake_msg.extend_from_slice(&id); + handshake_msg.extend_from_slice(&outbound_msg); + let handshake_msg = handshake_msg.freeze(); println!("client - handshake msg, {:?}", handshake_msg); - println!( - "len id: {:?}. len msg: {:?}", - id.len(), - Bytes::from(outbound_msg).len() - ); - stream.send(handshake_msg).await?; - let first_message = match stream.next().await { - Some(Ok(msg)) => match msg { - Message::HandshakeMessage(response) => response.msg, - _ => return Err(anyhow!("Expecting handshake message response")), - }, - _ => { - return Err(anyhow!("No response to handshake message")); - } - }; - println!("client - handshake msg responded to"); + // println!( + // "len id: {:?}. len msg: {:?}", + // id.len(), + // Bytes::from(outbound_msg).len() + // ); + socket.write_all(&handshake_msg).await?; + let mut buffer = [0; 33]; + let n = socket.read_exact(&mut buffer).await?; + println!("The bytes: {:?}", &buffer[..n]); + let first_message = BytesMut::from(&buffer[..n]).freeze(); + println!("client - handshake msg responded to: {:?}", first_message); let key = match s1.finish(&first_message[..]) { Ok(key_bytes) => key_bytes, Err(e) => return Err(anyhow!(e.to_string())), }; // println!("Handshake successful. Key is {:?}", key); - return Ok((stream, new_cipher(&key))); + return Ok((socket, new_cipher(&key))); } pub fn new_cipher(key: &Vec) -> Aes256Gcm { diff --git a/src/server.rs b/src/server.rs index 498efd4..0d7f8c0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,7 @@ use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; use std::collections::HashMap; +use std::io; use std::net::SocketAddr; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -61,7 +62,7 @@ impl Client { None => { tokio::select! { Some(msg) = client.rx.recv() => { - client.socket.write_all(&msg).await? + client.socket.write_all(&msg[..]).await? } } match state.lock().await.handshake_cache.remove(&id) { @@ -103,13 +104,15 @@ pub async fn handle_connection( ) -> Result<()> { socket.readable().await?; let mut socket = socket; - let mut id_buffer = BytesMut::with_capacity(32); - let mut msg_buffer = BytesMut::with_capacity(33); - socket.read_exact(&mut id_buffer).await?; - socket.read_exact(&mut msg_buffer).await?; - let id = id_buffer.freeze(); - let msg = msg_buffer.freeze(); + // let mut handshake_buffer = BytesMut::with_capacity(55); + let mut buffer = [0; 32 + 33]; + let n = socket.read_exact(&mut buffer).await?; + println!("The bytes: {:?}", &buffer[..n]); + let mut msg = BytesMut::from(&buffer[..n]).freeze(); + let id = msg.split_to(32); + println!("New client with id={:?}, msg={:?}", id.clone(), msg.clone()); let client = Client::new(id.clone(), state.clone(), socket).await?; + println!("Client created"); let mut client = match Client::upgrade(client, state.clone(), id.clone(), msg).await { Ok(client) => client, Err(err) => { @@ -118,19 +121,21 @@ pub async fn handle_connection( return Err(err); } }; + println!("Client upgraded"); // The handshake cache should be empty for {id} at this point. - let mut channel_buffer = BytesMut::with_capacity(1024); + let mut client_buffer = BytesMut::with_capacity(1024); loop { tokio::select! { Some(msg) = client.rx.recv() => { - client.socket.write_all(&msg).await? + client.socket.write_all(&msg[..]).await? } - result = client.socket.read(&mut channel_buffer) => match result { + result = client.socket.read(&mut client_buffer) => match result { Ok(0) => { break }, Ok(n) => { - client.peer_tx.send(BytesMut::from(&channel_buffer[0..n]).freeze())? + println!("reading more"); + client.peer_tx.send(BytesMut::from(&client_buffer[0..n]).freeze())? }, Err(e) => { println!("Error {:?}", e); @@ -138,5 +143,6 @@ pub async fn handle_connection( } } } + println!("done with client"); Ok(()) } From 33745f7b444a4c2e4daaf0fc34842908aeb5c760 Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Mon, 29 Aug 2022 16:14:30 -0400 Subject: [PATCH 03/13] Fix server; parity with main --- src/client.rs | 1 + src/conf.rs | 2 +- src/crypto.rs | 9 ++------- src/server.rs | 15 +++++++++------ 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/client.rs b/src/client.rs index dfe1c43..08ea686 100644 --- a/src/client.rs +++ b/src/client.rs @@ -83,6 +83,7 @@ pub async fn negotiate_files_up( let files = file_handles.iter().map(|fh| fh.to_file_info()).collect(); let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { files }); let server_msg = msg.to_encrypted_message(cipher)?; + println!("server_msg encrypted: {:?}", server_msg); stream.send(server_msg).await?; let reply_payload = match stream.next().await { Some(Ok(msg)) => match msg { diff --git a/src/conf.rs b/src/conf.rs index 07a6f99..b4db236 100644 --- a/src/conf.rs +++ b/src/conf.rs @@ -1 +1 @@ -pub const BUFFER_SIZE: usize = 1024 * 1024; +pub const BUFFER_SIZE: usize = 1024 * 1024 * 10; diff --git a/src/crypto.rs b/src/crypto.rs index 482d37e..1b555db 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -4,7 +4,6 @@ use aes_gcm::aead::{Aead, NewAead}; use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; -use futures::prelude::*; use rand::{thread_rng, Rng}; use spake2::{Ed25519Group, Identity, Password, Spake2}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -24,11 +23,7 @@ pub async fn handshake( handshake_msg.extend_from_slice(&outbound_msg); let handshake_msg = handshake_msg.freeze(); println!("client - handshake msg, {:?}", handshake_msg); - // println!( - // "len id: {:?}. len msg: {:?}", - // id.len(), - // Bytes::from(outbound_msg).len() - // ); + println!("id: {:?}. msg: {:?}", id.clone(), outbound_msg.clone()); socket.write_all(&handshake_msg).await?; let mut buffer = [0; 33]; let n = socket.read_exact(&mut buffer).await?; @@ -39,7 +34,7 @@ pub async fn handshake( Ok(key_bytes) => key_bytes, Err(e) => return Err(anyhow!(e.to_string())), }; - // println!("Handshake successful. Key is {:?}", key); + println!("Handshake successful. Key is {:?}", key); return Ok((socket, new_cipher(&key))); } diff --git a/src/server.rs b/src/server.rs index 0d7f8c0..102d0f4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,7 @@ +use crate::conf::BUFFER_SIZE; use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; use std::collections::HashMap; -use std::io; use std::net::SocketAddr; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -123,19 +123,22 @@ pub async fn handle_connection( }; println!("Client upgraded"); // The handshake cache should be empty for {id} at this point. - let mut client_buffer = BytesMut::with_capacity(1024); + let mut client_buffer = BytesMut::with_capacity(BUFFER_SIZE); loop { tokio::select! { Some(msg) = client.rx.recv() => { + // println!("piping bytes= {:?}", msg); client.socket.write_all(&msg[..]).await? } - result = client.socket.read(&mut client_buffer) => match result { + result = client.socket.read_buf(&mut client_buffer) => match result { Ok(0) => { - break + break; }, Ok(n) => { - println!("reading more"); - client.peer_tx.send(BytesMut::from(&client_buffer[0..n]).freeze())? + let b = BytesMut::from(&client_buffer[0..n]).freeze(); + // println!("reading more = {:?}", b); + client_buffer.clear(); + client.peer_tx.send(b)? }, Err(e) => { println!("Error {:?}", e); From 995d07282c551f0a7b5abe5a34eb656fd92d5b75 Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Mon, 29 Aug 2022 21:21:48 -0400 Subject: [PATCH 04/13] Handshake refactored not working --- src/client.rs | 27 ++++------------- src/conf.rs | 2 ++ src/crypto.rs | 65 +++++++++++++++++++--------------------- src/handshake.rs | 78 ++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + src/server.rs | 24 +++++---------- 6 files changed, 125 insertions(+), 72 deletions(-) create mode 100644 src/handshake.rs diff --git a/src/client.rs b/src/client.rs index 08ea686..0e37446 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,13 +1,12 @@ use crate::conf::BUFFER_SIZE; -use crate::crypto::handshake; use crate::file::{to_size_string, FileHandle, FileInfo}; +use crate::handshake::Handshake; use crate::message::{ EncryptedMessage, FileNegotiationPayload, FileTransferPayload, Message, MessageStream, }; use aes_gcm::Aes256Gcm; use anyhow::{anyhow, Result}; -use blake2::{Blake2s256, Digest}; use bytes::{Bytes, BytesMut}; use futures::future::try_join_all; use futures::prelude::*; @@ -20,13 +19,6 @@ use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio_util::codec::{FramedRead, LinesCodec}; -fn pass_to_bytes(password: &String) -> Bytes { - let mut hasher = Blake2s256::new(); - hasher.update(password.as_bytes()); - let res = hasher.finalize(); - BytesMut::from(&res[..]).freeze() -} - pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Fail early if there are problems generating file handles let handles = get_file_handles(file_paths).await?; @@ -35,12 +27,9 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { let socket = TcpStream::connect("127.0.0.1:8080").await?; // Complete handshake, returning cipher used for encryption - let (socket, cipher) = handshake( - socket, - Bytes::from(password.to_string()), - pass_to_bytes(password), - ) - .await?; + let (handshake, s1) = Handshake::from_password(password); + let (socket, cipher) = handshake.negotiate(socket, s1).await?; + let mut stream = Message::to_stream(socket); // Complete file negotiation let handles = negotiate_files_up(handles, &mut stream, &cipher).await?; @@ -54,12 +43,8 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { pub async fn receive(password: &String) -> Result<()> { let socket = TcpStream::connect("127.0.0.1:8080").await?; - let (socket, cipher) = handshake( - socket, - Bytes::from(password.to_string()), - pass_to_bytes(password), - ) - .await?; + let (handshake, s1) = Handshake::from_password(password); + let (socket, cipher) = handshake.negotiate(socket, s1).await?; let mut stream = Message::to_stream(socket); let files = negotiate_files_down(&mut stream, &cipher).await?; diff --git a/src/conf.rs b/src/conf.rs index b4db236..9ab88ad 100644 --- a/src/conf.rs +++ b/src/conf.rs @@ -1 +1,3 @@ +pub const ID_SIZE: usize = 32; +pub const HANDSHAKE_MSG_SIZE: usize = 33; pub const BUFFER_SIZE: usize = 1024 * 1024 * 10; diff --git a/src/crypto.rs b/src/crypto.rs index 1b555db..60dd513 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -1,42 +1,39 @@ use crate::message::EncryptedPayload; - use aes_gcm::aead::{Aead, NewAead}; use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; -use bytes::{Bytes, BytesMut}; -use rand::{thread_rng, Rng}; -use spake2::{Ed25519Group, Identity, Password, Spake2}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::TcpStream; +use bytes::Bytes; -pub async fn handshake( - socket: TcpStream, - password: Bytes, - id: Bytes, -) -> Result<(TcpStream, Aes256Gcm)> { - let mut socket = socket; - let (s1, outbound_msg) = - Spake2::::start_symmetric(&Password::new(password), &Identity::new(&id)); - println!("client - sending handshake msg"); - let mut handshake_msg = BytesMut::with_capacity(32 + 33); - handshake_msg.extend_from_slice(&id); - handshake_msg.extend_from_slice(&outbound_msg); - let handshake_msg = handshake_msg.freeze(); - println!("client - handshake msg, {:?}", handshake_msg); - println!("id: {:?}. msg: {:?}", id.clone(), outbound_msg.clone()); - socket.write_all(&handshake_msg).await?; - let mut buffer = [0; 33]; - let n = socket.read_exact(&mut buffer).await?; - println!("The bytes: {:?}", &buffer[..n]); - let first_message = BytesMut::from(&buffer[..n]).freeze(); - println!("client - handshake msg responded to: {:?}", first_message); - let key = match s1.finish(&first_message[..]) { - Ok(key_bytes) => key_bytes, - Err(e) => return Err(anyhow!(e.to_string())), - }; - println!("Handshake successful. Key is {:?}", key); - return Ok((socket, new_cipher(&key))); -} +use rand::{thread_rng, Rng}; + +// pub async fn handshake( +// socket: TcpStream, +// password: Bytes, +// id: Bytes, +// ) -> Result<(TcpStream, Aes256Gcm)> { +// let mut socket = socket; +// let (s1, outbound_msg) = +// Spake2::::start_symmetric(&Password::new(password), &Identity::new(&id)); +// println!("client - sending handshake msg"); +// let mut handshake_msg = BytesMut::with_capacity(32 + 33); +// handshake_msg.extend_from_slice(&id); +// handshake_msg.extend_from_slice(&outbound_msg); +// let handshake_msg = handshake_msg.freeze(); +// println!("client - handshake msg, {:?}", handshake_msg); +// println!("id: {:?}. msg: {:?}", id.clone(), outbound_msg.clone()); +// socket.write_all(&handshake_msg).await?; +// let mut buffer = [0; 33]; +// let n = socket.read_exact(&mut buffer).await?; +// println!("The bytes: {:?}", &buffer[..n]); +// let first_message = BytesMut::from(&buffer[..n]).freeze(); +// println!("client - handshake msg responded to: {:?}", first_message); +// let key = match s1.finish(&first_message[..]) { +// Ok(key_bytes) => key_bytes, +// Err(e) => return Err(anyhow!(e.to_string())), +// }; +// println!("Handshake successful. Key is {:?}", key); +// return Ok((socket, new_cipher(&key))); +// } pub fn new_cipher(key: &Vec) -> Aes256Gcm { let key = Key::from_slice(&key[..]); diff --git a/src/handshake.rs b/src/handshake.rs new file mode 100644 index 0000000..1a04fa5 --- /dev/null +++ b/src/handshake.rs @@ -0,0 +1,78 @@ +use crate::conf::{HANDSHAKE_MSG_SIZE, ID_SIZE}; +use crate::crypto::new_cipher; + +use aes_gcm::Aes256Gcm; // Or `Aes128Gcm` +use anyhow::{anyhow, Result}; +use blake2::{Blake2s256, Digest}; +use bytes::{Bytes, BytesMut}; +use spake2::{Ed25519Group, Identity, Password, Spake2}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; + +pub struct Handshake { + pub id: Bytes, + pub outbound_msg: Bytes, +} + +impl Handshake { + pub fn from_password(pw: &String) -> (Handshake, spake2::Spake2) { + let password = Bytes::from(pw.to_string()); + let id = Handshake::pass_to_bytes(&pw); + let (s1, outbound_msg) = + Spake2::::start_symmetric(&Password::new(&password), &Identity::new(&id)); + let mut buffer = BytesMut::with_capacity(HANDSHAKE_MSG_SIZE); + buffer.extend_from_slice(&outbound_msg[..HANDSHAKE_MSG_SIZE]); + let outbound_msg = buffer.freeze(); + let handshake = Handshake { id, outbound_msg }; + (handshake, s1) + } + + pub async fn from_socket(socket: TcpStream) -> Result<(Handshake, TcpStream)> { + let mut socket = socket; + let mut buffer = BytesMut::with_capacity(ID_SIZE + HANDSHAKE_MSG_SIZE); + match socket.read_exact(&mut buffer).await? { + 65 => Ok((Handshake::from_buffer(buffer), socket)), // magic number to catch correct capacity + _ => return Err(anyhow!("invalid handshake buffer pulled from socket")), + } + } + + pub fn from_buffer(buffer: BytesMut) -> Handshake { + let mut outbound_msg = BytesMut::from(&buffer[..ID_SIZE + HANDSHAKE_MSG_SIZE]).freeze(); + let id = outbound_msg.split_to(32); + Handshake { id, outbound_msg } + } + + pub fn to_bytes(self) -> Bytes { + let mut buffer = BytesMut::with_capacity(ID_SIZE + HANDSHAKE_MSG_SIZE); + buffer.extend_from_slice(&self.id); + buffer.extend_from_slice(&self.outbound_msg); + buffer.freeze() + } + + pub async fn negotiate( + self, + socket: TcpStream, + s1: spake2::Spake2, + ) -> Result<(TcpStream, Aes256Gcm)> { + let mut socket = socket; + // println!("client - sending handshake msg"); + socket.write_all(&self.to_bytes()).await?; + let mut buffer = [0; HANDSHAKE_MSG_SIZE]; + let n = socket.read_exact(&mut buffer).await?; + let response = BytesMut::from(&buffer[..n]).freeze(); + // println!("client - handshake msg, {:?}", response); + let key = match s1.finish(&response[..]) { + Ok(key_bytes) => key_bytes, + Err(e) => return Err(anyhow!(e.to_string())), + }; + println!("Handshake successful. Key is {:?}", key); + return Ok((socket, new_cipher(&key))); + } + + fn pass_to_bytes(password: &String) -> Bytes { + let mut hasher = Blake2s256::new(); + hasher.update(password.as_bytes()); + let res = hasher.finalize(); + BytesMut::from(&res[..]).freeze() + } +} diff --git a/src/main.rs b/src/main.rs index 65c2518..c33a731 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod client; mod conf; mod crypto; mod file; +mod handshake; mod message; mod server; diff --git a/src/server.rs b/src/server.rs index 102d0f4..85e8720 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,5 @@ use crate::conf::BUFFER_SIZE; +use crate::handshake::Handshake; use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; use std::collections::HashMap; @@ -48,12 +49,7 @@ impl Client { Ok(client) } - async fn upgrade( - client: Client, - state: State, - id: Bytes, - handshake_msg: Bytes, - ) -> Result { + async fn upgrade(client: Client, state: State, handshake: Handshake) -> Result { let mut client = client; let peer_tx = match client.peer_tx { // Receiver - already stapled at creation @@ -65,13 +61,13 @@ impl Client { client.socket.write_all(&msg[..]).await? } } - match state.lock().await.handshake_cache.remove(&id) { + match state.lock().await.handshake_cache.remove(&handshake.id) { Some(peer_tx) => peer_tx, None => return Err(anyhow!("Connection not stapled")), } } }; - peer_tx.send(handshake_msg)?; + peer_tx.send(handshake.outbound_msg)?; Ok(StapledClient { socket: client.socket, rx: client.rx, @@ -103,17 +99,11 @@ pub async fn handle_connection( _addr: SocketAddr, ) -> Result<()> { socket.readable().await?; - let mut socket = socket; - // let mut handshake_buffer = BytesMut::with_capacity(55); - let mut buffer = [0; 32 + 33]; - let n = socket.read_exact(&mut buffer).await?; - println!("The bytes: {:?}", &buffer[..n]); - let mut msg = BytesMut::from(&buffer[..n]).freeze(); - let id = msg.split_to(32); - println!("New client with id={:?}, msg={:?}", id.clone(), msg.clone()); + let (handshake, socket) = Handshake::from_socket(socket).await?; + let id = handshake.id.clone(); let client = Client::new(id.clone(), state.clone(), socket).await?; println!("Client created"); - let mut client = match Client::upgrade(client, state.clone(), id.clone(), msg).await { + let mut client = match Client::upgrade(client, state.clone(), handshake).await { Ok(client) => client, Err(err) => { // Clear handshake cache if staple is unsuccessful From e79510cafe0890b18e06eff4449937fed35b4b5a Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Mon, 29 Aug 2022 22:33:50 -0400 Subject: [PATCH 05/13] Refactor handshake --- src/client.rs | 44 ++++++++++++------------- src/conf.rs | 8 +++-- src/connection.rs | 4 +++ src/crypto.rs | 83 ++++++++++++++++++----------------------------- src/handshake.rs | 28 +++++++--------- src/message.rs | 19 +++-------- src/server.rs | 4 +-- 7 files changed, 79 insertions(+), 111 deletions(-) create mode 100644 src/connection.rs diff --git a/src/client.rs b/src/client.rs index 0e37446..bf15038 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,11 +1,11 @@ use crate::conf::BUFFER_SIZE; +use crate::crypto::Crypt; use crate::file::{to_size_string, FileHandle, FileInfo}; use crate::handshake::Handshake; use crate::message::{ EncryptedMessage, FileNegotiationPayload, FileTransferPayload, Message, MessageStream, }; -use aes_gcm::Aes256Gcm; use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; use futures::future::try_join_all; @@ -26,16 +26,18 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Establish connection to server let socket = TcpStream::connect("127.0.0.1:8080").await?; - // Complete handshake, returning cipher used for encryption let (handshake, s1) = Handshake::from_password(password); - let (socket, cipher) = handshake.negotiate(socket, s1).await?; + // Complete handshake, returning key used for encryption + let (socket, key) = handshake.negotiate(socket, s1).await?; let mut stream = Message::to_stream(socket); + let crypt = Crypt::new(&key); // Complete file negotiation - let handles = negotiate_files_up(handles, &mut stream, &cipher).await?; + let handles = negotiate_files_up(handles, &mut stream, &crypt).await?; // Upload negotiated files - upload_encrypted_files(&mut stream, handles, &cipher).await?; + upload_encrypted_files(&mut stream, handles, &crypt).await?; + println!("Done uploading."); // Exit Ok(()) @@ -44,11 +46,12 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { pub async fn receive(password: &String) -> Result<()> { let socket = TcpStream::connect("127.0.0.1:8080").await?; let (handshake, s1) = Handshake::from_password(password); - let (socket, cipher) = handshake.negotiate(socket, s1).await?; + let (socket, key) = handshake.negotiate(socket, s1).await?; let mut stream = Message::to_stream(socket); - let files = negotiate_files_down(&mut stream, &cipher).await?; + let crypt = Crypt::new(&key); + let files = negotiate_files_down(&mut stream, &crypt).await?; - download_files(files, &mut stream, &cipher).await?; + download_files(files, &mut stream, &crypt).await?; return Ok(()); } @@ -63,23 +66,22 @@ pub async fn get_file_handles(file_paths: &Vec) -> Result, stream: &mut MessageStream, - cipher: &Aes256Gcm, + crypt: &Crypt, ) -> Result> { let files = file_handles.iter().map(|fh| fh.to_file_info()).collect(); let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { files }); - let server_msg = msg.to_encrypted_message(cipher)?; + let server_msg = msg.to_encrypted_message(crypt)?; println!("server_msg encrypted: {:?}", server_msg); stream.send(server_msg).await?; let reply_payload = match stream.next().await { Some(Ok(msg)) => match msg { Message::EncryptedMessage(response) => response, - _ => return Err(anyhow!("Expecting encrypted message back")), }, _ => { return Err(anyhow!("No response to negotiation message")); } }; - let plaintext_reply = EncryptedMessage::from_encrypted_message(cipher, &reply_payload)?; + let plaintext_reply = EncryptedMessage::from_encrypted_message(crypt, &reply_payload)?; let requested_paths: Vec = match plaintext_reply { EncryptedMessage::FileNegotiationMessage(fnm) => { fnm.files.into_iter().map(|f| f.path).collect() @@ -94,18 +96,17 @@ pub async fn negotiate_files_up( pub async fn negotiate_files_down( stream: &mut MessageStream, - cipher: &Aes256Gcm, + crypt: &Crypt, ) -> Result> { let file_offer = match stream.next().await { Some(Ok(msg)) => match msg { Message::EncryptedMessage(response) => response, - _ => return Err(anyhow!("Expecting encrypted message back")), }, _ => { return Err(anyhow!("No response to negotiation message")); } }; - let plaintext_offer = EncryptedMessage::from_encrypted_message(cipher, &file_offer)?; + let plaintext_offer = EncryptedMessage::from_encrypted_message(crypt, &file_offer)?; let requested_infos: Vec = match plaintext_offer { EncryptedMessage::FileNegotiationMessage(fnm) => fnm.files, _ => return Err(anyhow!("Expecting file negotiation message back")), @@ -125,7 +126,7 @@ pub async fn negotiate_files_down( let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { files: files.clone(), }); - let server_msg = msg.to_encrypted_message(cipher)?; + let server_msg = msg.to_encrypted_message(crypt)?; stream.send(server_msg).await?; Ok(files) } @@ -133,7 +134,7 @@ pub async fn negotiate_files_down( pub async fn upload_encrypted_files( stream: &mut MessageStream, handles: Vec, - cipher: &Aes256Gcm, + cipher: &Crypt, ) -> Result<()> { let (tx, mut rx) = mpsc::unbounded_channel::(); for mut handle in handles { @@ -188,7 +189,7 @@ pub async fn enqueue_file_chunks( pub async fn download_files( file_infos: Vec, stream: &mut MessageStream, - cipher: &Aes256Gcm, + cipher: &Crypt, ) -> Result<()> { // for each file_info let mut info_handles: HashMap> = HashMap::new(); @@ -206,18 +207,15 @@ pub async fn download_files( // println!("encrypted message received! {:?}", ec); match ec { EncryptedMessage::FileTransferMessage(payload) => { - println!("matched file transfer message"); + // println!("matched file transfer message"); if let Some(tx) = info_handles.get(&payload.file_info.path) { - println!("matched on filetype, sending to tx"); + // println!("matched on filetype, sending to tx"); tx.send((payload.chunk_num, payload.chunk))? }; }, _ => {println!("wrong msg")} } } - Some(Ok(_)) => { - println!("wrong msg"); - } Some(Err(e)) => { println!("Error {:?}", e); } diff --git a/src/conf.rs b/src/conf.rs index 9ab88ad..6fb5d0e 100644 --- a/src/conf.rs +++ b/src/conf.rs @@ -1,3 +1,5 @@ -pub const ID_SIZE: usize = 32; -pub const HANDSHAKE_MSG_SIZE: usize = 33; -pub const BUFFER_SIZE: usize = 1024 * 1024 * 10; +pub const ID_SIZE: usize = 32; // Blake256 of password +pub const HANDSHAKE_MSG_SIZE: usize = 33; // generated by Spake2 +pub const PER_CLIENT_BUFFER: usize = 1024 * 1024; // buffer size allocated by server for each client +pub const BUFFER_SIZE: usize = 1024 * 1024 * 1024; // chunk size for files sent over wire +pub const NONCE_SIZE_IN_BYTES: usize = 96 / 8; // used for every encrypted message diff --git a/src/connection.rs b/src/connection.rs new file mode 100644 index 0000000..413ad0b --- /dev/null +++ b/src/connection.rs @@ -0,0 +1,4 @@ +pub struct Connection { + ms: MessageStream; + crypt: Crypt +} \ No newline at end of file diff --git a/src/crypto.rs b/src/crypto.rs index 60dd513..6fb97d6 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -1,3 +1,4 @@ +use crate::conf::NONCE_SIZE_IN_BYTES; use crate::message::EncryptedPayload; use aes_gcm::aead::{Aead, NewAead}; use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm` @@ -6,59 +7,37 @@ use bytes::Bytes; use rand::{thread_rng, Rng}; -// pub async fn handshake( -// socket: TcpStream, -// password: Bytes, -// id: Bytes, -// ) -> Result<(TcpStream, Aes256Gcm)> { -// let mut socket = socket; -// let (s1, outbound_msg) = -// Spake2::::start_symmetric(&Password::new(password), &Identity::new(&id)); -// println!("client - sending handshake msg"); -// let mut handshake_msg = BytesMut::with_capacity(32 + 33); -// handshake_msg.extend_from_slice(&id); -// handshake_msg.extend_from_slice(&outbound_msg); -// let handshake_msg = handshake_msg.freeze(); -// println!("client - handshake msg, {:?}", handshake_msg); -// println!("id: {:?}. msg: {:?}", id.clone(), outbound_msg.clone()); -// socket.write_all(&handshake_msg).await?; -// let mut buffer = [0; 33]; -// let n = socket.read_exact(&mut buffer).await?; -// println!("The bytes: {:?}", &buffer[..n]); -// let first_message = BytesMut::from(&buffer[..n]).freeze(); -// println!("client - handshake msg responded to: {:?}", first_message); -// let key = match s1.finish(&first_message[..]) { -// Ok(key_bytes) => key_bytes, -// Err(e) => return Err(anyhow!(e.to_string())), -// }; -// println!("Handshake successful. Key is {:?}", key); -// return Ok((socket, new_cipher(&key))); -// } - -pub fn new_cipher(key: &Vec) -> Aes256Gcm { - let key = Key::from_slice(&key[..]); - Aes256Gcm::new(key) +pub struct Crypt { + cipher: Aes256Gcm, } -pub const NONCE_SIZE_IN_BYTES: usize = 96 / 8; -pub fn encrypt(cipher: &Aes256Gcm, body: &Vec) -> Result { - let mut arr = [0u8; NONCE_SIZE_IN_BYTES]; - thread_rng().try_fill(&mut arr[..])?; - let nonce = Nonce::from_slice(&arr); - let plaintext = body.as_ref(); - match cipher.encrypt(nonce, plaintext) { - Ok(body) => Ok(EncryptedPayload { - nonce: arr.to_vec(), - body, - }), - Err(_) => Err(anyhow!("Encryption error")), - } -} - -pub fn decrypt(cipher: &Aes256Gcm, payload: &EncryptedPayload) -> Result { - let nonce = Nonce::from_slice(payload.nonce.as_ref()); - match cipher.decrypt(nonce, payload.body.as_ref()) { - Ok(payload) => Ok(Bytes::from(payload)), - Err(_) => Err(anyhow!("Decryption error")), +impl Crypt { + pub fn new(key: &Vec) -> Crypt { + let key = Key::from_slice(&key[..]); + Crypt { + cipher: Aes256Gcm::new(key), + } + } + + pub fn encrypt(&self, body: &Vec) -> Result { + let mut arr = [0u8; NONCE_SIZE_IN_BYTES]; + thread_rng().try_fill(&mut arr[..])?; + let nonce = Nonce::from_slice(&arr); + let plaintext = body.as_ref(); + match self.cipher.encrypt(nonce, plaintext) { + Ok(body) => Ok(EncryptedPayload { + nonce: arr.to_vec(), + body, + }), + Err(_) => Err(anyhow!("Encryption error")), + } + } + + pub fn decrypt(&self, payload: &EncryptedPayload) -> Result { + let nonce = Nonce::from_slice(payload.nonce.as_ref()); + match self.cipher.decrypt(nonce, payload.body.as_ref()) { + Ok(payload) => Ok(Bytes::from(payload)), + Err(_) => Err(anyhow!("Decryption error")), + } } } diff --git a/src/handshake.rs b/src/handshake.rs index 1a04fa5..1b1b409 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -1,7 +1,5 @@ use crate::conf::{HANDSHAKE_MSG_SIZE, ID_SIZE}; -use crate::crypto::new_cipher; -use aes_gcm::Aes256Gcm; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; use blake2::{Blake2s256, Digest}; use bytes::{Bytes, BytesMut}; @@ -29,17 +27,12 @@ impl Handshake { pub async fn from_socket(socket: TcpStream) -> Result<(Handshake, TcpStream)> { let mut socket = socket; - let mut buffer = BytesMut::with_capacity(ID_SIZE + HANDSHAKE_MSG_SIZE); - match socket.read_exact(&mut buffer).await? { - 65 => Ok((Handshake::from_buffer(buffer), socket)), // magic number to catch correct capacity - _ => return Err(anyhow!("invalid handshake buffer pulled from socket")), - } - } - - pub fn from_buffer(buffer: BytesMut) -> Handshake { - let mut outbound_msg = BytesMut::from(&buffer[..ID_SIZE + HANDSHAKE_MSG_SIZE]).freeze(); - let id = outbound_msg.split_to(32); - Handshake { id, outbound_msg } + let mut buffer = [0; ID_SIZE + HANDSHAKE_MSG_SIZE]; + let n = socket.read_exact(&mut buffer).await?; + println!("The bytes: {:?}", &buffer[..n]); + let mut outbound_msg = BytesMut::from(&buffer[..n]).freeze(); + let id = outbound_msg.split_to(ID_SIZE); + Ok((Handshake { id, outbound_msg }, socket)) } pub fn to_bytes(self) -> Bytes { @@ -53,10 +46,11 @@ impl Handshake { self, socket: TcpStream, s1: spake2::Spake2, - ) -> Result<(TcpStream, Aes256Gcm)> { + ) -> Result<(TcpStream, Vec)> { let mut socket = socket; - // println!("client - sending handshake msg"); - socket.write_all(&self.to_bytes()).await?; + let bytes = &self.to_bytes(); + println!("client - sending handshake msg= {:?}", &bytes); + socket.write_all(&bytes).await?; let mut buffer = [0; HANDSHAKE_MSG_SIZE]; let n = socket.read_exact(&mut buffer).await?; let response = BytesMut::from(&buffer[..n]).freeze(); @@ -66,7 +60,7 @@ impl Handshake { Err(e) => return Err(anyhow!(e.to_string())), }; println!("Handshake successful. Key is {:?}", key); - return Ok((socket, new_cipher(&key))); + return Ok((socket, key)); } fn pass_to_bytes(password: &String) -> Bytes { diff --git a/src/message.rs b/src/message.rs index 10593a2..b815a1d 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,7 +1,6 @@ -use crate::crypto::{decrypt, encrypt}; +use crate::crypto::Crypt; use crate::file::FileInfo; -use aes_gcm::Aes256Gcm; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -13,15 +12,7 @@ use tokio_util::codec::{Framed, LengthDelimitedCodec}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum Message { - HandshakeMessage(HandshakePayload), EncryptedMessage(EncryptedPayload), - ErrorMessage(RuckError), -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct HandshakePayload { - pub id: Bytes, - pub msg: Bytes, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -49,8 +40,8 @@ pub struct FileTransferPayload { } impl EncryptedMessage { - pub fn from_encrypted_message(cipher: &Aes256Gcm, payload: &EncryptedPayload) -> Result { - let raw = decrypt(cipher, payload)?; + pub fn from_encrypted_message(crypt: &Crypt, payload: &EncryptedPayload) -> Result { + let raw = crypt.decrypt(payload)?; let res = match bincode::deserialize(raw.as_ref()) { Ok(result) => result, Err(e) => { @@ -60,7 +51,7 @@ impl EncryptedMessage { }; Ok(res) } - pub fn to_encrypted_message(&self, cipher: &Aes256Gcm) -> Result { + pub fn to_encrypted_message(&self, crypt: &Crypt) -> Result { let raw = match bincode::serialize(&self) { Ok(result) => result, Err(e) => { @@ -68,7 +59,7 @@ impl EncryptedMessage { return Err(anyhow!("serialize error")); } }; - let payload = encrypt(cipher, &raw)?; + let payload = crypt.encrypt(&raw)?; Ok(Message::EncryptedMessage(payload)) } } diff --git a/src/server.rs b/src/server.rs index 85e8720..3509c47 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,4 @@ -use crate::conf::BUFFER_SIZE; +use crate::conf::PER_CLIENT_BUFFER; use crate::handshake::Handshake; use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; @@ -113,7 +113,7 @@ pub async fn handle_connection( }; println!("Client upgraded"); // The handshake cache should be empty for {id} at this point. - let mut client_buffer = BytesMut::with_capacity(BUFFER_SIZE); + let mut client_buffer = BytesMut::with_capacity(PER_CLIENT_BUFFER); loop { tokio::select! { Some(msg) = client.rx.recv() => { From 3c7c5fe29d3213319041e8a9681cb0ffce8635fd Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Tue, 30 Aug 2022 09:53:16 -0400 Subject: [PATCH 06/13] Refactor connection --- src/client.rs | 138 +++++++++++++--------------------------------- src/connection.rs | 40 +++++++++++++- src/crypto.rs | 30 +++++----- src/main.rs | 1 + src/message.rs | 78 +++++--------------------- src/server.rs | 5 +- 6 files changed, 108 insertions(+), 184 deletions(-) diff --git a/src/client.rs b/src/client.rs index bf15038..fa1ce0a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,10 +1,8 @@ use crate::conf::BUFFER_SIZE; -use crate::crypto::Crypt; +use crate::connection::Connection; use crate::file::{to_size_string, FileHandle, FileInfo}; use crate::handshake::Handshake; -use crate::message::{ - EncryptedMessage, FileNegotiationPayload, FileTransferPayload, Message, MessageStream, -}; +use crate::message::{FileNegotiationPayload, FileTransferPayload, Message}; use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; @@ -30,13 +28,12 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Complete handshake, returning key used for encryption let (socket, key) = handshake.negotiate(socket, s1).await?; - let mut stream = Message::to_stream(socket); - let crypt = Crypt::new(&key); + let mut connection = Connection::new(socket, key); // Complete file negotiation - let handles = negotiate_files_up(handles, &mut stream, &crypt).await?; + let handles = negotiate_files_up(&mut connection, handles).await?; // Upload negotiated files - upload_encrypted_files(&mut stream, handles, &crypt).await?; + upload_encrypted_files(&mut connection, handles).await?; println!("Done uploading."); // Exit @@ -47,11 +44,10 @@ pub async fn receive(password: &String) -> Result<()> { let socket = TcpStream::connect("127.0.0.1:8080").await?; let (handshake, s1) = Handshake::from_password(password); let (socket, key) = handshake.negotiate(socket, s1).await?; - let mut stream = Message::to_stream(socket); - let crypt = Crypt::new(&key); - let files = negotiate_files_down(&mut stream, &crypt).await?; + let mut connection = Connection::new(socket, key); + let files = negotiate_files_down(&mut connection).await?; - download_files(files, &mut stream, &crypt).await?; + download_files(&mut connection, files).await?; return Ok(()); } @@ -64,28 +60,15 @@ pub async fn get_file_handles(file_paths: &Vec) -> Result, - stream: &mut MessageStream, - crypt: &Crypt, ) -> Result> { let files = file_handles.iter().map(|fh| fh.to_file_info()).collect(); - let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { files }); - let server_msg = msg.to_encrypted_message(crypt)?; - println!("server_msg encrypted: {:?}", server_msg); - stream.send(server_msg).await?; - let reply_payload = match stream.next().await { - Some(Ok(msg)) => match msg { - Message::EncryptedMessage(response) => response, - }, - _ => { - return Err(anyhow!("No response to negotiation message")); - } - }; - let plaintext_reply = EncryptedMessage::from_encrypted_message(crypt, &reply_payload)?; - let requested_paths: Vec = match plaintext_reply { - EncryptedMessage::FileNegotiationMessage(fnm) => { - fnm.files.into_iter().map(|f| f.path).collect() - } + let msg = Message::FileNegotiationMessage(FileNegotiationPayload { files }); + conn.send_msg(msg).await?; + let reply = conn.await_msg().await?; + let requested_paths: Vec = match reply { + Message::FileNegotiationMessage(fnm) => fnm.files.into_iter().map(|f| f.path).collect(), _ => return Err(anyhow!("Expecting file negotiation message back")), }; Ok(file_handles @@ -94,21 +77,10 @@ pub async fn negotiate_files_up( .collect()) } -pub async fn negotiate_files_down( - stream: &mut MessageStream, - crypt: &Crypt, -) -> Result> { - let file_offer = match stream.next().await { - Some(Ok(msg)) => match msg { - Message::EncryptedMessage(response) => response, - }, - _ => { - return Err(anyhow!("No response to negotiation message")); - } - }; - let plaintext_offer = EncryptedMessage::from_encrypted_message(crypt, &file_offer)?; - let requested_infos: Vec = match plaintext_offer { - EncryptedMessage::FileNegotiationMessage(fnm) => fnm.files, +pub async fn negotiate_files_down(conn: &mut Connection) -> Result> { + let offer = conn.await_msg().await?; + let requested_infos: Vec = match offer { + Message::FileNegotiationMessage(fnm) => fnm.files, _ => return Err(anyhow!("Expecting file negotiation message back")), }; let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new()); @@ -123,47 +95,22 @@ pub async fn negotiate_files_down( _ => {} } } - let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { + let msg = Message::FileNegotiationMessage(FileNegotiationPayload { files: files.clone(), }); - let server_msg = msg.to_encrypted_message(crypt)?; - stream.send(server_msg).await?; + conn.send_msg(msg).await?; Ok(files) } -pub async fn upload_encrypted_files( - stream: &mut MessageStream, - handles: Vec, - cipher: &Crypt, -) -> Result<()> { - let (tx, mut rx) = mpsc::unbounded_channel::(); +pub async fn upload_encrypted_files(conn: &mut Connection, handles: Vec) -> Result<()> { for mut handle in handles { - let txc = tx.clone(); - tokio::spawn(async move { - let _ = enqueue_file_chunks(&mut handle, txc).await; - }); - } - - loop { - tokio::select! { - Some(msg) = rx.recv() => { - // println!("message received to client.rx {:?}", msg); - let x = msg.to_encrypted_message(cipher)?; - stream.send(x).await? - } - else => { - println!("breaking"); - break - }, - } + enqueue_file_chunks(conn, &mut handle).await?; } + println!("Files uploaded."); Ok(()) } -pub async fn enqueue_file_chunks( - fh: &mut FileHandle, - tx: mpsc::UnboundedSender, -) -> Result<()> { +pub async fn enqueue_file_chunks(conn: &mut Connection, fh: &mut FileHandle) -> Result<()> { let mut chunk_num = 0; let mut bytes_read = 1; while bytes_read != 0 { @@ -173,12 +120,12 @@ pub async fn enqueue_file_chunks( if bytes_read != 0 { let chunk = buf.freeze(); let file_info = fh.to_file_info(); - let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload { + let ftp = Message::FileTransferMessage(FileTransferPayload { chunk, chunk_num, file_info, }); - tx.send(ftp)?; + conn.send_msg(ftp).await?; chunk_num += 1; } } @@ -186,11 +133,7 @@ pub async fn enqueue_file_chunks( Ok(()) } -pub async fn download_files( - file_infos: Vec, - stream: &mut MessageStream, - cipher: &Crypt, -) -> Result<()> { +pub async fn download_files(conn: &mut Connection, file_infos: Vec) -> Result<()> { // for each file_info let mut info_handles: HashMap> = HashMap::new(); for fi in file_infos { @@ -201,29 +144,24 @@ pub async fn download_files( } loop { tokio::select! { - result = stream.next() => match result { - Some(Ok(Message::EncryptedMessage(payload))) => { - let ec = EncryptedMessage::from_encrypted_message(cipher, &payload)?; - // println!("encrypted message received! {:?}", ec); - match ec { - EncryptedMessage::FileTransferMessage(payload) => { - // println!("matched file transfer message"); + result = conn.await_msg() => match result { + Ok(msg) => { + match msg { + Message::FileTransferMessage(payload) => { if let Some(tx) = info_handles.get(&payload.file_info.path) { - // println!("matched on filetype, sending to tx"); tx.send((payload.chunk_num, payload.chunk))? - }; + } }, - _ => {println!("wrong msg")} + _ => { + println!("Wrong message type"); + return Err(anyhow!("wrong message type")); + } } - } - Some(Err(e)) => { - println!("Error {:?}", e); - } - None => break, + }, + Err(e) => return Err(anyhow!(e.to_string())), } } } - Ok(()) } pub async fn download_file( diff --git a/src/connection.rs b/src/connection.rs index 413ad0b..3bca2a2 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,4 +1,38 @@ +use crate::crypto::Crypt; +use crate::message::{Message, MessageStream}; +use anyhow::{anyhow, Result}; +use futures::{SinkExt, StreamExt}; +use tokio::net::TcpStream; + pub struct Connection { - ms: MessageStream; - crypt: Crypt -} \ No newline at end of file + ms: MessageStream, + crypt: Crypt, +} + +impl Connection { + pub fn new(socket: TcpStream, key: Vec) -> Self { + let ms = Message::to_stream(socket); + let crypt = Crypt::new(&key); + Connection { ms, crypt } + } + pub async fn send_msg(&mut self, msg: Message) -> Result<()> { + let msg = msg.serialize()?; + let bytes = self.crypt.encrypt(msg)?; + match self.ms.send(bytes).await { + Ok(_) => Ok(()), + Err(e) => Err(anyhow!(e.to_string())), + } + } + + pub async fn await_msg(&mut self) -> Result { + match self.ms.next().await { + Some(Ok(msg)) => { + let decrypted_bytes = self.crypt.decrypt(msg.freeze())?; + Message::deserialize(decrypted_bytes) + } + _ => { + return Err(anyhow!("No response to negotiation message")); + } + } + } +} diff --git a/src/crypto.rs b/src/crypto.rs index 6fb97d6..fc78e80 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -1,9 +1,8 @@ use crate::conf::NONCE_SIZE_IN_BYTES; -use crate::message::EncryptedPayload; use aes_gcm::aead::{Aead, NewAead}; use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use rand::{thread_rng, Rng}; @@ -19,25 +18,28 @@ impl Crypt { } } - pub fn encrypt(&self, body: &Vec) -> Result { + pub fn encrypt(&self, plaintext: Bytes) -> Result { let mut arr = [0u8; NONCE_SIZE_IN_BYTES]; thread_rng().try_fill(&mut arr[..])?; let nonce = Nonce::from_slice(&arr); - let plaintext = body.as_ref(); - match self.cipher.encrypt(nonce, plaintext) { - Ok(body) => Ok(EncryptedPayload { - nonce: arr.to_vec(), - body, - }), - Err(_) => Err(anyhow!("Encryption error")), + match self.cipher.encrypt(nonce, plaintext.as_ref()) { + Ok(body) => { + let mut buffer = BytesMut::with_capacity(NONCE_SIZE_IN_BYTES + body.len()); + buffer.extend_from_slice(nonce); + buffer.extend_from_slice(&body); + Ok(buffer.freeze()) + } + Err(e) => Err(anyhow!(e.to_string())), } } - pub fn decrypt(&self, payload: &EncryptedPayload) -> Result { - let nonce = Nonce::from_slice(payload.nonce.as_ref()); - match self.cipher.decrypt(nonce, payload.body.as_ref()) { + pub fn decrypt(&self, body: Bytes) -> Result { + let mut body = body; + let nonce_bytes = body.split_to(NONCE_SIZE_IN_BYTES); + let nonce = Nonce::from_slice(&nonce_bytes); + match self.cipher.decrypt(nonce, body.as_ref()) { Ok(payload) => Ok(Bytes::from(payload)), - Err(_) => Err(anyhow!("Decryption error")), + Err(e) => Err(anyhow!(e.to_string())), } } } diff --git a/src/main.rs b/src/main.rs index c33a731..e481fd1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod cli; mod client; mod conf; +mod connection; mod crypto; mod file; mod handshake; diff --git a/src/message.rs b/src/message.rs index b815a1d..5abf618 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,28 +1,13 @@ -use crate::crypto::Crypt; use crate::file::FileInfo; use anyhow::{anyhow, Result}; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use std::error::Error; -use std::fmt; use tokio::net::TcpStream; -use tokio_serde::{formats::SymmetricalBincode, SymmetricallyFramed}; use tokio_util::codec::{Framed, LengthDelimitedCodec}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum Message { - EncryptedMessage(EncryptedPayload), -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct EncryptedPayload { - pub nonce: Vec, - pub body: Vec, -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub enum EncryptedMessage { FileNegotiationMessage(FileNegotiationPayload), FileTransferMessage(FileTransferPayload), } @@ -39,62 +24,25 @@ pub struct FileTransferPayload { pub chunk: Bytes, } -impl EncryptedMessage { - pub fn from_encrypted_message(crypt: &Crypt, payload: &EncryptedPayload) -> Result { - let raw = crypt.decrypt(payload)?; - let res = match bincode::deserialize(raw.as_ref()) { - Ok(result) => result, - Err(e) => { - println!("deserialize error {:?}", e); - return Err(anyhow!("deser error")); - } - }; - Ok(res) +impl Message { + pub fn serialize(&self) -> Result { + match bincode::serialize(&self) { + Ok(vec) => Ok(Bytes::from(vec)), + Err(e) => Err(anyhow!(e.to_string())), + } } - pub fn to_encrypted_message(&self, crypt: &Crypt) -> Result { - let raw = match bincode::serialize(&self) { - Ok(result) => result, - Err(e) => { - println!("serialize error {:?}", e); - return Err(anyhow!("serialize error")); - } - }; - let payload = crypt.encrypt(&raw)?; - Ok(Message::EncryptedMessage(payload)) - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub enum RuckError { - NotHandshake, - SenderNotConnected, - SenderAlreadyConnected, - PairDisconnected, -} - -impl fmt::Display for RuckError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "RuckError is here!") - } -} - -impl Error for RuckError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - Some(self) + pub fn deserialize(bytes: Bytes) -> Result { + match bincode::deserialize(bytes.as_ref()) { + Ok(msg) => Ok(msg), + Err(e) => Err(anyhow!(e.to_string())), + } } } impl Message { pub fn to_stream(stream: TcpStream) -> MessageStream { - tokio_serde::SymmetricallyFramed::new( - Framed::new(stream, LengthDelimitedCodec::new()), - tokio_serde::formats::SymmetricalBincode::::default(), - ) + Framed::new(stream, LengthDelimitedCodec::new()) } } -pub type MessageStream = SymmetricallyFramed< - Framed, - Message, - SymmetricalBincode, ->; +pub type MessageStream = Framed; diff --git a/src/server.rs b/src/server.rs index 3509c47..f4cf567 100644 --- a/src/server.rs +++ b/src/server.rs @@ -57,7 +57,9 @@ impl Client { // Sender - needs to wait for the incoming msg to look up peer_tx None => { tokio::select! { + // Client reads handshake message sent over channel Some(msg) = client.rx.recv() => { + // Writes parnter handshake message over wire client.socket.write_all(&msg[..]).await? } } @@ -126,9 +128,8 @@ pub async fn handle_connection( }, Ok(n) => { let b = BytesMut::from(&client_buffer[0..n]).freeze(); - // println!("reading more = {:?}", b); + client.peer_tx.send(b)?; client_buffer.clear(); - client.peer_tx.send(b)? }, Err(e) => { println!("Error {:?}", e); From 0dec881a2de57c7ee0b910e3bf2dcd5cdb100b24 Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Tue, 30 Aug 2022 10:23:50 -0400 Subject: [PATCH 07/13] Update readme --- README.md | 49 ++++++++++++++++++++++++++---------------------- docs/Protocol.md | 39 -------------------------------------- 2 files changed, 27 insertions(+), 61 deletions(-) delete mode 100644 docs/Protocol.md diff --git a/README.md b/README.md index 1465e2c..97d7408 100644 --- a/README.md +++ b/README.md @@ -1,30 +1,35 @@ -- https://github.com/mcginty/snow/tree/master/examples -- https://github.com/black-binary/snowstorm/tree/master/examples -- https://cryptoservices.github.io/cryptography/protocols/2016/04/27/noise-protocol.html +# ruck -- https://github.com/libp2p +`ruck` is a command line tool used for hosting relay servers and sending end-to-end encrypted files between clients. It was heavily inspired by [croc](https://github.com/schollz/croc), one of the easiest ways to send files between peers. This document describes the protocol `ruck` uses to support this functionality. -- https://docs.rs/spake2/latest/spake2/ -- https://crates.io/crates/chacha20poly1305 -- https://briansmith.org/rustdoc/ring/aead/index.html -- https://libsodium.gitbook.io/doc/secret-key_cryptography/secretstream +### Version -- https://kerkour.com/rust-symmetric-encryption-aead-benchmark/ -- https://docs.rs/aes-gcm/latest/aes_gcm/ +This document refers to version `0.1.0` of `ruck` as defined by the `Cargo.toml` file. -- https://github.com/rust-lang/flate2-rs -- https://crates.io/crates/async-compression +## Server -- https://pdos.csail.mit.edu/papers/chord:sigcomm01/chord_sigcomm.pdf -- https://pdos.csail.mit.edu/6.824/schedule.html -- https://flylib.com/books/en/2.292.1.105/1/ -- https://en.wikipedia.org/wiki/Two_Generals%27_Problem +The server in `ruck` exposes a TCP port. +Its only functions are to staple connections and shuttle bytes between stapled connections. +The first 32 bytes sent over the wire from a new client are used as its unique identifier. +When a new client joins, if the server has another open connection with the same identifier, the connections are then stapled. +The clients have some mechanism for agreeing on these identifiers, however, from the server's perspective it doesn't matter how they agree. -- https://kobzol.github.io/rust/ci/2021/05/07/building-rust-binaries-in-ci-that-work-with-older-glibc.html +Once the connection is stapled, all bytes are piped across until a client disconnects or times out. +The time out is set to remove idle connections. +The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages. +For this reason, updates to the `ruck` protocol do not typically necessitate server redeployments. -## Todo +## Client -- [ ] Use tracing -- [ ] Add progress bars -- [ ] Exit happily when transfer is complete -- [ ] Compress files +There are two types of clients - `send` and `receive` clients. +Out of band, the clients agree on a relay server and password, from which they can derive the 32 byte identifier used by the server to staple their connections. +Clients have the option of using the single-use, automatically generated passwords which `ruck` supplies by default. +Using the passwords per the [Spake2](https://docs.rs/spake2/0.3.1/spake2/) handshake algorithm, clients generate a symmetric key with which to encrypt their subsequent messages. +Once the handshake is complete, `send` and `receive` negotiate and exchange files per the following: + +- `send` offers a list of files and waits. +- `receive` specifies which bytes it wants from these files. +- `send` sends the specified bytes and waits. +- `receive` sends heartbeats with progress updates. +- `send` hangs up once the heartbeats stop or received a successful heartbeat. +- `receive` hangs up once the downloads are complete. diff --git a/docs/Protocol.md b/docs/Protocol.md deleted file mode 100644 index efd77b4..0000000 --- a/docs/Protocol.md +++ /dev/null @@ -1,39 +0,0 @@ -# Protocol - -`ruck` is a command line tool used for hosting relay servers and sending end-to-end encrypted files between clients. This document describes the protocol `ruck` uses to support this functionality. - -### Version - -This document refers to version `0.1.0` of `ruck` as defined by the `Cargo.toml` file. - -## Server - -The server in `ruck` exposes a TCP port, typically port `8080`. Its only functions are to staple connections and shuttle bytes between stapled connections. The first 32 bytes sent from a new client are stored in a HashMap. If the same 32 bytes are already in the Hashmap, the connections are then stapled. This 32 byte key is defined by the [Spake2](https://docs.rs/spake2/0.3.1/spake2/) handshake algorithm which the clients employ to negotiate a single use password to encrypt all their messages. Although from the server's perspective, the clients can agree on these 32 bytes in any way. - -Once the connection is stapled, all bytes are piped across until a client disconnects or times out. The time out is set to remove idle connections. Beyond stapling connections, the file negotiation aspect of the protocol is managed by the clients. For this reason, `ruck` servers are very resistant to updates and protocol updates typically do not necessitate new deployments. - -The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages, as long as the first 32 bytes sent over the wire match. Other than that, it is a private echo server. - -## Client - -There are two types of clients - `send` and `receive` clients. The following state machine describes the protocol. All the messages after the exchange of passwords are typically bzip compressed, encrypted with Aes256Gcm using a Spake2 key derived from the exchanged password. They are sent over the wire as bincode. Each message has a fixed size of 1024 \* 1024 bytes. - -Message Types: - -- Vec -- - -- Set a timeout for new messages. - -Send or receive. - -If send: - -- Send message with file info - -They exchange passwords. -Send offers a list of files. -Receive specifies which bytes it wants from these files. -Send sends the specified bytes and waits. -Receive sends heartbeats with progress updates. -Send hangs up once the heartbeats stop or received a successful heartbeat. From 7bcbe7c3ae81be5a0c5ccc717a7ab3a1e0c526da Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Tue, 30 Aug 2022 10:40:41 -0400 Subject: [PATCH 08/13] Remove unused deps --- Cargo.lock | 103 ----------------------------------------------------- Cargo.toml | 2 -- 2 files changed, 105 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d59848..ebb6412 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -213,31 +213,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "educe" -version = "0.4.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f86b50932a01e7ec5c06160492ab660fb19b6bb2a7878030dd6cd68d21df9d4d" -dependencies = [ - "enum-ordinalize", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "enum-ordinalize" -version = "3.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b166c9e378360dd5a6666a9604bb4f54ae0cac39023ffbac425e917a2a04fef" -dependencies = [ - "num-bigint", - "num-traits", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "futures" version = "0.3.21" @@ -495,36 +470,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "num-bigint" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", -] - -[[package]] -name = "num-integer" -version = "0.1.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" -dependencies = [ - "autocfg", - "num-traits", -] - -[[package]] -name = "num-traits" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" -dependencies = [ - "autocfg", -] - [[package]] name = "num_cpus" version = "1.13.1" @@ -581,26 +526,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "pin-project" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pin-project-lite" version = "0.2.8" @@ -746,8 +671,6 @@ dependencies = [ "serde", "spake2", "tokio", - "tokio-serde", - "tokio-stream", "tokio-util", ] @@ -889,32 +812,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-serde" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "911a61637386b789af998ee23f50aa30d5fd7edcec8d6d3dedae5e5815205466" -dependencies = [ - "bincode", - "bytes", - "educe", - "futures-core", - "futures-sink", - "pin-project", - "serde", -] - -[[package]] -name = "tokio-stream" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.6.9" diff --git a/Cargo.toml b/Cargo.toml index 8d71e61..7272996 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,4 @@ rand = "0.8.4" serde = { version = "1.0", features = ["derive"] } spake2 = "0.3.1" tokio = { version = "1.16.1", features = ["full"] } -tokio-serde = { version = "~0.8", features = ["bincode"] } -tokio-stream = { version = "~0.1.6", features = ["net"]} tokio-util = { version = "0.6.3", features = ["full"]} \ No newline at end of file From d752e8f16ef42301500bf722ca9b4845b6527c70 Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Tue, 30 Aug 2022 11:36:45 -0400 Subject: [PATCH 09/13] Tweaks --- .gitignore | 1 + src/connection.rs | 17 +++++++++++------ src/crypto.rs | 19 +++++++++++-------- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index ea8c4bf..d03e6b9 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +Notes.md \ No newline at end of file diff --git a/src/connection.rs b/src/connection.rs index 3bca2a2..aef11cb 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,6 +1,7 @@ use crate::crypto::Crypt; use crate::message::{Message, MessageStream}; use anyhow::{anyhow, Result}; +use bytes::Bytes; use futures::{SinkExt, StreamExt}; use tokio::net::TcpStream; @@ -15,24 +16,28 @@ impl Connection { let crypt = Crypt::new(&key); Connection { ms, crypt } } - pub async fn send_msg(&mut self, msg: Message) -> Result<()> { - let msg = msg.serialize()?; - let bytes = self.crypt.encrypt(msg)?; + + async fn send_bytes(&mut self, bytes: Bytes) -> Result<()> { match self.ms.send(bytes).await { Ok(_) => Ok(()), Err(e) => Err(anyhow!(e.to_string())), } } + pub async fn send_msg(&mut self, msg: Message) -> Result<()> { + let msg = msg.serialize()?; + let bytes = self.crypt.encrypt(msg)?; + self.send_bytes(bytes).await + } + pub async fn await_msg(&mut self) -> Result { match self.ms.next().await { Some(Ok(msg)) => { let decrypted_bytes = self.crypt.decrypt(msg.freeze())?; Message::deserialize(decrypted_bytes) } - _ => { - return Err(anyhow!("No response to negotiation message")); - } + Some(Err(e)) => Err(anyhow!(e.to_string())), + None => Err(anyhow!("Error awaiting msg")), } } } diff --git a/src/crypto.rs b/src/crypto.rs index fc78e80..ae66d03 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -8,6 +8,7 @@ use rand::{thread_rng, Rng}; pub struct Crypt { cipher: Aes256Gcm, + arr: [u8; NONCE_SIZE_IN_BYTES], } impl Crypt { @@ -15,13 +16,14 @@ impl Crypt { let key = Key::from_slice(&key[..]); Crypt { cipher: Aes256Gcm::new(key), + arr: [0u8; NONCE_SIZE_IN_BYTES], } } - pub fn encrypt(&self, plaintext: Bytes) -> Result { - let mut arr = [0u8; NONCE_SIZE_IN_BYTES]; - thread_rng().try_fill(&mut arr[..])?; - let nonce = Nonce::from_slice(&arr); + // Returns wire format, includes nonce as prefix + pub fn encrypt(&mut self, plaintext: Bytes) -> Result { + thread_rng().try_fill(&mut self.arr[..])?; + let nonce = Nonce::from_slice(&self.arr); match self.cipher.encrypt(nonce, plaintext.as_ref()) { Ok(body) => { let mut buffer = BytesMut::with_capacity(NONCE_SIZE_IN_BYTES + body.len()); @@ -33,11 +35,12 @@ impl Crypt { } } - pub fn decrypt(&self, body: Bytes) -> Result { - let mut body = body; - let nonce_bytes = body.split_to(NONCE_SIZE_IN_BYTES); + // Accepts wire format, includes nonce as prefix + pub fn decrypt(&self, ciphertext: Bytes) -> Result { + let mut ciphertext_body = ciphertext; + let nonce_bytes = ciphertext_body.split_to(NONCE_SIZE_IN_BYTES); let nonce = Nonce::from_slice(&nonce_bytes); - match self.cipher.decrypt(nonce, body.as_ref()) { + match self.cipher.decrypt(nonce, ciphertext_body.as_ref()) { Ok(payload) => Ok(Bytes::from(payload)), Err(e) => Err(anyhow!(e.to_string())), } From 7544c0fc397c7d62c9358ec2ff530316ac885122 Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Wed, 31 Aug 2022 14:05:58 -0400 Subject: [PATCH 10/13] Attempt to clean up further; speed up file upload --- Cargo.lock | 35 ++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/client.rs | 38 +++-------------------------------- src/file.rs | 55 +++++++++++++++++++++++++++++++++++++++++++++++---- src/main.rs | 1 + src/ui.rs | 30 ++++++++++++++++++++++++++++ 6 files changed, 121 insertions(+), 39 deletions(-) create mode 100644 src/ui.rs diff --git a/Cargo.lock b/Cargo.lock index ebb6412..be1143e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aead" version = "0.4.3" @@ -162,6 +168,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.2" @@ -213,6 +228,16 @@ dependencies = [ "subtle", ] +[[package]] +name = "flate2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "futures" version = "0.3.21" @@ -439,6 +464,15 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "miniz_oxide" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f5c75688da582b8ffc1f1799e9db273f32133c49e048f614d22ec3256773ccc" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.7.14" @@ -666,6 +700,7 @@ dependencies = [ "blake2", "bytes", "clap", + "flate2", "futures", "rand", "serde", diff --git a/Cargo.toml b/Cargo.toml index 7272996..3fba138 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ blake2 = "0.10.2" bytes = { version = "1", features = ["serde"] } bincode = "1.3.3" clap = { version = "3.0.14", features = ["derive"] } +flate2 = "1.0" futures = { version = "0.3.0", features = ["thread-pool"]} rand = "0.8.4" serde = { version = "1.0", features = ["derive"] } diff --git a/src/client.rs b/src/client.rs index fa1ce0a..a5677a9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,13 +1,12 @@ use crate::conf::BUFFER_SIZE; use crate::connection::Connection; -use crate::file::{to_size_string, FileHandle, FileInfo}; +use crate::file::{FileHandle, FileInfo}; use crate::handshake::Handshake; use crate::message::{FileNegotiationPayload, FileTransferPayload, Message}; +use crate::ui::prompt_user_input; use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; -use futures::future::try_join_all; -use futures::prelude::*; use std::collections::HashMap; use std::ffi::OsStr; use std::path::PathBuf; @@ -19,7 +18,7 @@ use tokio_util::codec::{FramedRead, LinesCodec}; pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Fail early if there are problems generating file handles - let handles = get_file_handles(file_paths).await?; + let handles = FileHandle::get_file_handles(file_paths).await?; // Establish connection to server let socket = TcpStream::connect("127.0.0.1:8080").await?; @@ -51,14 +50,6 @@ pub async fn receive(password: &String) -> Result<()> { return Ok(()); } -pub async fn get_file_handles(file_paths: &Vec) -> Result> { - let tasks = file_paths - .into_iter() - .map(|path| FileHandle::new(path.to_path_buf())); - let handles = try_join_all(tasks).await?; - Ok(handles) -} - pub async fn negotiate_files_up( conn: &mut Connection, file_handles: Vec, @@ -190,26 +181,3 @@ pub async fn download_file( println!("done receiving messages"); Ok(()) } - -pub async fn prompt_user_input( - stdin: &mut FramedRead, - file_info: &FileInfo, -) -> Option { - let prompt_name = file_info.path.file_name().unwrap(); - println!( - "Accept {:?}? ({:?}). (Y/n)", - prompt_name, - to_size_string(file_info.size) - ); - match stdin.next().await { - Some(Ok(line)) => match line.as_str() { - "" | "Y" | "y" | "yes" | "Yes" | "YES" => Some(true), - "N" | "n" | "NO" | "no" | "No" => Some(false), - _ => { - println!("Invalid input. Please enter one of the following characters: [YyNn]"); - return None; - } - }, - _ => None, - } -} diff --git a/src/file.rs b/src/file.rs index 3e03c82..514ecd1 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,35 +1,82 @@ use anyhow::Result; + +use futures::future::try_join_all; + use serde::{Deserialize, Serialize}; use std::fs::Metadata; use std::path::PathBuf; + +use flate2::bufread::GzEncoder; +use flate2::Compression; +use std::io::{copy, BufRead, BufReader, Read, Write}; +use std::net::TcpStream; use tokio::fs::File; +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ChunkHeader { + pub id: u8, + pub start: u64, + pub end: u64, +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct FileInfo { pub path: PathBuf, - pub size: u64, + pub chunk_header: ChunkHeader, } pub struct FileHandle { + pub id: u8, pub file: File, pub md: Metadata, pub path: PathBuf, } +impl Read for FileHandle { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + Ok(2) + } +} + impl FileHandle { - pub async fn new(path: PathBuf) -> Result { + pub async fn new(id: u8, path: PathBuf) -> Result { let file = File::open(&path).await?; let md = file.metadata().await?; - let fh = FileHandle { file, md, path }; + let fh = FileHandle { id, file, md, path }; return Ok(fh); } pub fn to_file_info(&self) -> FileInfo { FileInfo { path: self.path.clone(), - size: self.md.len(), + chunk_header: ChunkHeader { + id: self.id, + start: 0, + end: self.md.len(), + }, } } + + pub async fn get_file_handles(file_paths: &Vec) -> Result> { + let tasks = file_paths + .into_iter() + .enumerate() + .map(|(idx, path)| FileHandle::new(idx.try_into().unwrap(), path.to_path_buf())); + let handles = try_join_all(tasks).await?; + Ok(handles) + } + + async fn count_lines(self, socket: TcpStream) -> Result { + let file = self.file.into_std().await; + let mut socket = socket; + tokio::task::spawn_blocking(move || { + let reader = BufReader::new(file); + let mut gz = GzEncoder::new(reader, Compression::fast()); + copy(&mut gz, &mut socket)?; + Ok(socket) + }) + .await? + } } const SUFFIX: [&'static str; 9] = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"]; diff --git a/src/main.rs b/src/main.rs index e481fd1..f880896 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ mod file; mod handshake; mod message; mod server; +mod ui; use clap::Parser; use cli::{Cli, Commands}; diff --git a/src/ui.rs b/src/ui.rs new file mode 100644 index 0000000..2d4f005 --- /dev/null +++ b/src/ui.rs @@ -0,0 +1,30 @@ +use crate::file::{to_size_string, FileInfo}; + +use futures::prelude::*; + +use tokio::io::{self}; + +use tokio_util::codec::{FramedRead, LinesCodec}; + +pub async fn prompt_user_input( + stdin: &mut FramedRead, + file_info: &FileInfo, +) -> Option { + let prompt_name = file_info.path.file_name().unwrap(); + println!( + "Accept {:?}? ({:?}). (Y/n)", + prompt_name, + to_size_string(file_info.chunk_header.end) + ); + match stdin.next().await { + Some(Ok(line)) => match line.as_str() { + "" | "Y" | "y" | "yes" | "Yes" | "YES" => Some(true), + "N" | "n" | "NO" | "no" | "No" => Some(false), + _ => { + println!("Invalid input. Please enter one of the following characters: [YyNn]"); + return None; + } + }, + _ => None, + } +} From e0567d8479736575fa401fac70f8d26d0995afb6 Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Fri, 2 Sep 2022 16:41:40 -0400 Subject: [PATCH 11/13] Progress --- src/client.rs | 199 +++++++++++++++------------------------------- src/conf.rs | 7 +- src/connection.rs | 69 +++++++++++++++- src/crypto.rs | 10 +-- src/file.rs | 99 +++++++++++++++-------- src/message.rs | 24 +++--- src/ui.rs | 24 +++++- 7 files changed, 241 insertions(+), 191 deletions(-) diff --git a/src/client.rs b/src/client.rs index a5677a9..69bdcac 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,20 +1,16 @@ -use crate::conf::BUFFER_SIZE; use crate::connection::Connection; -use crate::file::{FileHandle, FileInfo}; +use crate::file::{ChunkHeader, FileHandle, FileOffer, StdFileHandle}; use crate::handshake::Handshake; -use crate::message::{FileNegotiationPayload, FileTransferPayload, Message}; -use crate::ui::prompt_user_input; +use crate::message::{FileOfferPayload, FileRequestPayload, Message}; +use crate::ui::prompt_user_for_file_confirmation; use anyhow::{anyhow, Result}; -use bytes::{Bytes, BytesMut}; -use std::collections::HashMap; + use std::ffi::OsStr; use std::path::PathBuf; use tokio::fs::File; -use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpStream; -use tokio::sync::mpsc; -use tokio_util::codec::{FramedRead, LinesCodec}; pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Fail early if there are problems generating file handles @@ -28,11 +24,12 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { let (socket, key) = handshake.negotiate(socket, s1).await?; let mut connection = Connection::new(socket, key); - // Complete file negotiation - let handles = negotiate_files_up(&mut connection, handles).await?; + // Offer files, wait for requested file response + let requested_chunks = offer_files(&mut connection, &handles).await?; // Upload negotiated files - upload_encrypted_files(&mut connection, handles).await?; + let std_file_handles = FileHandle::to_stds(handles, requested_chunks).await; + connection.upload_files(std_file_handles).await?; println!("Done uploading."); // Exit @@ -40,144 +37,76 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { } pub async fn receive(password: &String) -> Result<()> { + // Establish connection to server let socket = TcpStream::connect("127.0.0.1:8080").await?; let (handshake, s1) = Handshake::from_password(password); + // Complete handshake, returning key used for encryption let (socket, key) = handshake.negotiate(socket, s1).await?; let mut connection = Connection::new(socket, key); - let files = negotiate_files_down(&mut connection).await?; - - download_files(&mut connection, files).await?; + // Wait for offered files, respond with desired files + let desired_files = request_specific_files(&mut connection).await?; + // Create files + let std_file_handles = create_files(desired_files).await?; + // Download them + connection.download_files(std_file_handles).await?; return Ok(()); } -pub async fn negotiate_files_up( +pub async fn offer_files( conn: &mut Connection, - file_handles: Vec, -) -> Result> { - let files = file_handles.iter().map(|fh| fh.to_file_info()).collect(); - let msg = Message::FileNegotiationMessage(FileNegotiationPayload { files }); + file_handles: &Vec, +) -> Result> { + // Collect file offer + let files = file_handles.iter().map(|fh| fh.to_file_offer()).collect(); + let msg = Message::FileOffer(FileOfferPayload { files }); + // Send file offer conn.send_msg(msg).await?; + // Wait for reply let reply = conn.await_msg().await?; - let requested_paths: Vec = match reply { - Message::FileNegotiationMessage(fnm) => fnm.files.into_iter().map(|f| f.path).collect(), - _ => return Err(anyhow!("Expecting file negotiation message back")), - }; - Ok(file_handles - .into_iter() - .filter(|fh| requested_paths.contains(&fh.path)) - .collect()) + // Return requested chunks + match reply { + Message::FileRequest(file_request_payload) => Ok(file_request_payload.chunks), + _ => Err(anyhow!("Expecting file request message back")), + } } -pub async fn negotiate_files_down(conn: &mut Connection) -> Result> { - let offer = conn.await_msg().await?; - let requested_infos: Vec = match offer { - Message::FileNegotiationMessage(fnm) => fnm.files, - _ => return Err(anyhow!("Expecting file negotiation message back")), +pub async fn request_specific_files(conn: &mut Connection) -> Result> { + // Wait for offer message + let offer_message = conn.await_msg().await?; + let offered_files: Vec = match offer_message { + Message::FileOffer(file_offer_payload) => file_offer_payload.files, + _ => return Err(anyhow!("Expecting file offer message")), }; - let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new()); - let mut files = vec![]; - for file_info in requested_infos.into_iter() { - let mut reply = prompt_user_input(&mut stdin, &file_info).await; - while reply.is_none() { - reply = prompt_user_input(&mut stdin, &file_info).await; - } - match reply { - Some(true) => files.push(file_info), - _ => {} - } - } - let msg = Message::FileNegotiationMessage(FileNegotiationPayload { - files: files.clone(), + // Prompt user for confirmation of files + let desired_files = prompt_user_for_file_confirmation(offered_files).await; + let file_request_msg = Message::FileRequest(FileRequestPayload { + chunks: desired_files + .iter() + .map(|file| ChunkHeader { + id: file.id, + start: 0, + }) + .collect(), }); - conn.send_msg(msg).await?; - Ok(files) + conn.send_msg(file_request_msg).await?; + Ok(desired_files) } -pub async fn upload_encrypted_files(conn: &mut Connection, handles: Vec) -> Result<()> { - for mut handle in handles { - enqueue_file_chunks(conn, &mut handle).await?; +pub async fn create_files(desired_files: Vec) -> Result> { + let mut v = Vec::new(); + for desired_file in desired_files { + let filename = desired_file + .path + .file_name() + .unwrap_or(OsStr::new("random.txt")); + let file = File::create(filename).await?; + let std_file = file.into_std().await; + let std_file_handle = StdFileHandle { + id: desired_file.id, + file: std_file, + start: 0, + }; + v.push(std_file_handle) } - println!("Files uploaded."); - Ok(()) -} - -pub async fn enqueue_file_chunks(conn: &mut Connection, fh: &mut FileHandle) -> Result<()> { - let mut chunk_num = 0; - let mut bytes_read = 1; - while bytes_read != 0 { - let mut buf = BytesMut::with_capacity(BUFFER_SIZE); - bytes_read = fh.file.read_buf(&mut buf).await?; - // println!("Bytes_read: {:?}, The bytes: {:?}", bytes_read, &buf[..]); - if bytes_read != 0 { - let chunk = buf.freeze(); - let file_info = fh.to_file_info(); - let ftp = Message::FileTransferMessage(FileTransferPayload { - chunk, - chunk_num, - file_info, - }); - conn.send_msg(ftp).await?; - chunk_num += 1; - } - } - - Ok(()) -} - -pub async fn download_files(conn: &mut Connection, file_infos: Vec) -> Result<()> { - // for each file_info - let mut info_handles: HashMap> = HashMap::new(); - for fi in file_infos { - let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>(); - let path = fi.path.clone(); - tokio::spawn(async move { download_file(fi, rx).await }); - info_handles.insert(path, tx); - } - loop { - tokio::select! { - result = conn.await_msg() => match result { - Ok(msg) => { - match msg { - Message::FileTransferMessage(payload) => { - if let Some(tx) = info_handles.get(&payload.file_info.path) { - tx.send((payload.chunk_num, payload.chunk))? - } - }, - _ => { - println!("Wrong message type"); - return Err(anyhow!("wrong message type")); - } - } - }, - Err(e) => return Err(anyhow!(e.to_string())), - } - } - } -} - -pub async fn download_file( - file_info: FileInfo, - rx: mpsc::UnboundedReceiver<(u64, Bytes)>, -) -> Result<()> { - println!("in download file"); - let mut rx = rx; - let filename = match file_info.path.file_name() { - Some(f) => { - println!("matched filename"); - f - } - None => { - println!("didnt match filename"); - OsStr::new("random.txt") - } - }; - println!("trying to create file...filename={:?}", filename); - let mut file = File::create(filename).await?; - println!("file created ok! filename={:?}", filename); - while let Some((_chunk_num, chunk)) = rx.recv().await { - // println!("rx got message! chunk={:?}", chunk); - file.write_all(&chunk).await?; - } - println!("done receiving messages"); - Ok(()) + return Ok(v); } diff --git a/src/conf.rs b/src/conf.rs index 6fb5d0e..0d007f0 100644 --- a/src/conf.rs +++ b/src/conf.rs @@ -1,5 +1,6 @@ pub const ID_SIZE: usize = 32; // Blake256 of password pub const HANDSHAKE_MSG_SIZE: usize = 33; // generated by Spake2 -pub const PER_CLIENT_BUFFER: usize = 1024 * 1024; // buffer size allocated by server for each client -pub const BUFFER_SIZE: usize = 1024 * 1024 * 1024; // chunk size for files sent over wire -pub const NONCE_SIZE_IN_BYTES: usize = 96 / 8; // used for every encrypted message +pub const PER_CLIENT_BUFFER: usize = 1024 * 64; // buffer size allocated by server for each client +pub const BUFFER_SIZE: usize = 1024 * 64; // chunk size for files sent over wire +pub const NONCE_SIZE: usize = 96 / 8; // used for every encrypted message +pub const CHUNK_HEADER_SIZE: usize = 10; // used for every chunk header diff --git a/src/connection.rs b/src/connection.rs index aef11cb..361141c 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,10 +1,17 @@ +use crate::conf::{BUFFER_SIZE, CHUNK_HEADER_SIZE}; use crate::crypto::Crypt; +use crate::file::{ChunkHeader, FileHandle, StdFileHandle}; use crate::message::{Message, MessageStream}; use anyhow::{anyhow, Result}; -use bytes::Bytes; + use futures::{SinkExt, StreamExt}; use tokio::net::TcpStream; +use bytes::{Bytes, BytesMut}; +use flate2::write::{GzDecoder, GzEncoder}; +use flate2::Compression; +use std::io::{Read, Write}; + pub struct Connection { ms: MessageStream, crypt: Crypt, @@ -17,7 +24,7 @@ impl Connection { Connection { ms, crypt } } - async fn send_bytes(&mut self, bytes: Bytes) -> Result<()> { + pub async fn send_bytes(&mut self, bytes: Bytes) -> Result<()> { match self.ms.send(bytes).await { Ok(_) => Ok(()), Err(e) => Err(anyhow!(e.to_string())), @@ -40,4 +47,62 @@ impl Connection { None => Err(anyhow!("Error awaiting msg")), } } + + pub async fn upload_files(mut self, handles: Vec) -> Result<()> { + let mut socket = self.ms.into_inner().into_std()?; + tokio::task::spawn_blocking(move || { + for mut handle in handles { + let mut buffer = BytesMut::with_capacity(BUFFER_SIZE); + let mut start = handle.start; + loop { + let end = + FileHandle::to_message(handle.id, &mut handle.file, &mut buffer, start)?; + let mut compressor = GzEncoder::new(Vec::new(), Compression::fast()); + compressor.write_all(&buffer[..])?; + let compressed_bytes = compressor.finish()?; + let encrypted_bytes = self.crypt.encrypt(Bytes::from(compressed_bytes))?; + start = end; + socket.write(&encrypted_bytes[..])?; + if end == 0 { + break; + } + } + } + Ok(()) + }) + .await? + } + + pub async fn download_files(mut self, handles: Vec) -> Result<()> { + let mut socket = self.ms.into_inner().into_std()?; + tokio::task::spawn_blocking(move || { + for mut handle in handles { + let mut buffer = BytesMut::with_capacity(BUFFER_SIZE); + let mut start = handle.start; + loop { + // read bytes + match socket.read(&mut buffer) { + Ok(0) => { + break; + } + Ok(n) => { + let decrypted_bytes = + self.crypt.decrypt(Bytes::from(&mut buffer[0..n]))?; + let mut writer = Vec::new(); + let mut decompressor = GzDecoder::new(writer); + decompressor.write_all(&decrypted_bytes[..])?; + decompressor.try_finish()?; + writer = decompressor.finish()?; + let chunk_header: ChunkHeader = + bincode::deserialize(&writer[..CHUNK_HEADER_SIZE])?; + handle.file.write_all(&writer) + } + Err(e) => return Err(anyhow!(e.to_string())), + }; + } + } + Ok(()) + }) + .await? + } } diff --git a/src/crypto.rs b/src/crypto.rs index ae66d03..e95b5fa 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -1,4 +1,4 @@ -use crate::conf::NONCE_SIZE_IN_BYTES; +use crate::conf::NONCE_SIZE; use aes_gcm::aead::{Aead, NewAead}; use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; @@ -8,7 +8,7 @@ use rand::{thread_rng, Rng}; pub struct Crypt { cipher: Aes256Gcm, - arr: [u8; NONCE_SIZE_IN_BYTES], + arr: [u8; NONCE_SIZE], } impl Crypt { @@ -16,7 +16,7 @@ impl Crypt { let key = Key::from_slice(&key[..]); Crypt { cipher: Aes256Gcm::new(key), - arr: [0u8; NONCE_SIZE_IN_BYTES], + arr: [0u8; NONCE_SIZE], } } @@ -26,7 +26,7 @@ impl Crypt { let nonce = Nonce::from_slice(&self.arr); match self.cipher.encrypt(nonce, plaintext.as_ref()) { Ok(body) => { - let mut buffer = BytesMut::with_capacity(NONCE_SIZE_IN_BYTES + body.len()); + let mut buffer = BytesMut::with_capacity(NONCE_SIZE + body.len()); buffer.extend_from_slice(nonce); buffer.extend_from_slice(&body); Ok(buffer.freeze()) @@ -38,7 +38,7 @@ impl Crypt { // Accepts wire format, includes nonce as prefix pub fn decrypt(&self, ciphertext: Bytes) -> Result { let mut ciphertext_body = ciphertext; - let nonce_bytes = ciphertext_body.split_to(NONCE_SIZE_IN_BYTES); + let nonce_bytes = ciphertext_body.split_to(NONCE_SIZE); let nonce = Nonce::from_slice(&nonce_bytes); match self.cipher.decrypt(nonce, ciphertext_body.as_ref()) { Ok(payload) => Ok(Bytes::from(payload)), diff --git a/src/file.rs b/src/file.rs index 514ecd1..4967cd4 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,28 +1,32 @@ use anyhow::Result; - use futures::future::try_join_all; use serde::{Deserialize, Serialize}; use std::fs::Metadata; use std::path::PathBuf; -use flate2::bufread::GzEncoder; -use flate2::Compression; -use std::io::{copy, BufRead, BufReader, Read, Write}; -use std::net::TcpStream; +use bytes::BytesMut; +use std::io::{Read, Seek, SeekFrom}; + use tokio::fs::File; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ChunkHeader { pub id: u8, pub start: u64, - pub end: u64, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct FileInfo { +pub struct FileOffer { + pub id: u8, pub path: PathBuf, - pub chunk_header: ChunkHeader, + pub size: u64, +} + +pub struct StdFileHandle { + pub id: u8, + pub file: std::fs::File, + pub start: u64, } pub struct FileHandle { @@ -32,12 +36,6 @@ pub struct FileHandle { pub path: PathBuf, } -impl Read for FileHandle { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - Ok(2) - } -} - impl FileHandle { pub async fn new(id: u8, path: PathBuf) -> Result { let file = File::open(&path).await?; @@ -46,14 +44,45 @@ impl FileHandle { return Ok(fh); } - pub fn to_file_info(&self) -> FileInfo { - FileInfo { + pub async fn to_stds( + file_handles: Vec, + chunk_headers: Vec, + ) -> Vec { + let mut ret = Vec::new(); + for handle in file_handles { + let chunk = chunk_headers.iter().find(|chunk| handle.id == chunk.id); + match chunk { + Some(chunk) => { + match handle.to_std(chunk).await { + Ok(std_file_handle) => { + ret.push(std_file_handle); + } + _ => println!("Error seeking in file"), + }; + } + None => { + println!("Skipping file b/c not in requested chunks"); + } + } + } + ret + } + + async fn to_std(self, chunk_header: &ChunkHeader) -> Result { + let mut std_file = self.file.into_std().await; + std_file.seek(SeekFrom::Start(chunk_header.start))?; + Ok(StdFileHandle { + id: self.id, + file: std_file, + start: chunk_header.start, + }) + } + + pub fn to_file_offer(&self) -> FileOffer { + FileOffer { + id: self.id, path: self.path.clone(), - chunk_header: ChunkHeader { - id: self.id, - start: 0, - end: self.md.len(), - }, + size: self.md.len(), } } @@ -66,16 +95,24 @@ impl FileHandle { Ok(handles) } - async fn count_lines(self, socket: TcpStream) -> Result { - let file = self.file.into_std().await; - let mut socket = socket; - tokio::task::spawn_blocking(move || { - let reader = BufReader::new(file); - let mut gz = GzEncoder::new(reader, Compression::fast()); - copy(&mut gz, &mut socket)?; - Ok(socket) - }) - .await? + pub fn to_message( + id: u8, + file: &mut std::fs::File, + buffer: &mut BytesMut, + start: u64, + ) -> Result { + // reads the next chunk of the file + // packs it into the buffer, with the header taking up the first X bytes + let chunk_header = ChunkHeader { id, start }; + let chunk_bytes = bincode::serialize(&chunk_header)?; + println!( + "chunk_bytes = {:?}, len = {:?}", + chunk_bytes.clone(), + chunk_bytes.len() + ); + buffer.extend_from_slice(&chunk_bytes[..]); + let n = file.read(buffer)? as u64; + Ok(n) } } diff --git a/src/message.rs b/src/message.rs index 5abf618..0e27365 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,4 +1,4 @@ -use crate::file::FileInfo; +use crate::file::{ChunkHeader, FileOffer}; use anyhow::{anyhow, Result}; use bytes::Bytes; @@ -8,28 +8,30 @@ use tokio_util::codec::{Framed, LengthDelimitedCodec}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum Message { - FileNegotiationMessage(FileNegotiationPayload), - FileTransferMessage(FileTransferPayload), + FileOffer(FileOfferPayload), + FileRequest(FileRequestPayload), + FileTransfer(FileTransferPayload), } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct FileNegotiationPayload { - pub files: Vec, +pub struct FileRequestPayload { + pub chunks: Vec, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct FileOfferPayload { + pub files: Vec, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct FileTransferPayload { - pub file_info: FileInfo, - pub chunk_num: u64, + pub chunk_header: ChunkHeader, pub chunk: Bytes, } impl Message { pub fn serialize(&self) -> Result { - match bincode::serialize(&self) { - Ok(vec) => Ok(Bytes::from(vec)), - Err(e) => Err(anyhow!(e.to_string())), - } + bincode::serialize(&self).map(|vec| Ok(Bytes::from(vec)))? } pub fn deserialize(bytes: Bytes) -> Result { match bincode::deserialize(bytes.as_ref()) { diff --git a/src/ui.rs b/src/ui.rs index 2d4f005..9b94a41 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -1,4 +1,4 @@ -use crate::file::{to_size_string, FileInfo}; +use crate::file::{to_size_string, FileOffer}; use futures::prelude::*; @@ -6,15 +6,31 @@ use tokio::io::{self}; use tokio_util::codec::{FramedRead, LinesCodec}; +pub async fn prompt_user_for_file_confirmation(file_offers: Vec) -> Vec { + let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new()); + let mut files = vec![]; + for file_offer in file_offers.into_iter() { + let mut reply = prompt_user_input(&mut stdin, &file_offer).await; + while reply.is_none() { + reply = prompt_user_input(&mut stdin, &file_offer).await; + } + match reply { + Some(true) => files.push(file_offer), + _ => {} + } + } + files +} + pub async fn prompt_user_input( stdin: &mut FramedRead, - file_info: &FileInfo, + file_offer: &FileOffer, ) -> Option { - let prompt_name = file_info.path.file_name().unwrap(); + let prompt_name = file_offer.path.file_name().unwrap(); println!( "Accept {:?}? ({:?}). (Y/n)", prompt_name, - to_size_string(file_info.chunk_header.end) + to_size_string(file_offer.size) ); match stdin.next().await { Some(Ok(line)) => match line.as_str() { From 6bc011c910ab5298d1827685fbdd029b22da3e95 Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Fri, 2 Sep 2022 20:29:18 -0400 Subject: [PATCH 12/13] Back to parity --- src/client.rs | 7 +-- src/conf.rs | 1 - src/connection.rs | 112 ++++++++++++++++++++++++---------------------- src/file.rs | 43 ++++++------------ 4 files changed, 73 insertions(+), 90 deletions(-) diff --git a/src/client.rs b/src/client.rs index 69bdcac..61e6fc2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -100,12 +100,7 @@ pub async fn create_files(desired_files: Vec) -> Result) -> Result<()> { - let mut socket = self.ms.into_inner().into_std()?; - tokio::task::spawn_blocking(move || { - for mut handle in handles { - let mut buffer = BytesMut::with_capacity(BUFFER_SIZE); - let mut start = handle.start; - loop { - let end = - FileHandle::to_message(handle.id, &mut handle.file, &mut buffer, start)?; - let mut compressor = GzEncoder::new(Vec::new(), Compression::fast()); - compressor.write_all(&buffer[..])?; - let compressed_bytes = compressor.finish()?; - let encrypted_bytes = self.crypt.encrypt(Bytes::from(compressed_bytes))?; - start = end; - socket.write(&encrypted_bytes[..])?; - if end == 0 { - break; - } + pub async fn upload_file(&mut self, handle: StdFileHandle) -> Result<()> { + let mut buffer = [0; BUFFER_SIZE]; + let reader = BufReader::new(handle.file); + let mut gz = GzEncoder::new(reader, Compression::fast()); + loop { + match gz.read(&mut buffer) { + Ok(0) => { + break; } + Ok(n) => { + let message = Message::FileTransfer(FileTransferPayload { + chunk: BytesMut::from(&buffer[..n]).freeze(), + chunk_header: ChunkHeader { + id: handle.id, + start: 0, + }, + }); + self.send_msg(message).await?; + } + Err(e) => return Err(anyhow!(e.to_string())), } - Ok(()) - }) - .await? + } + Ok(()) + } + + pub async fn upload_files(mut self, handles: Vec) -> Result<()> { + for handle in handles { + self.upload_file(handle).await?; + } + Ok(()) } pub async fn download_files(mut self, handles: Vec) -> Result<()> { - let mut socket = self.ms.into_inner().into_std()?; - tokio::task::spawn_blocking(move || { - for mut handle in handles { - let mut buffer = BytesMut::with_capacity(BUFFER_SIZE); - let mut start = handle.start; - loop { - // read bytes - match socket.read(&mut buffer) { - Ok(0) => { - break; - } - Ok(n) => { - let decrypted_bytes = - self.crypt.decrypt(Bytes::from(&mut buffer[0..n]))?; - let mut writer = Vec::new(); - let mut decompressor = GzDecoder::new(writer); - decompressor.write_all(&decrypted_bytes[..])?; - decompressor.try_finish()?; - writer = decompressor.finish()?; - let chunk_header: ChunkHeader = - bincode::deserialize(&writer[..CHUNK_HEADER_SIZE])?; - handle.file.write_all(&writer) - } - Err(e) => return Err(anyhow!(e.to_string())), - }; + for handle in handles { + self.download_file(handle).await?; + } + Ok(()) + } + + pub async fn download_file(&mut self, handle: StdFileHandle) -> Result<()> { + let mut decoder = GzDecoder::new(handle.file); + loop { + let msg = self.await_msg().await?; + match msg { + Message::FileTransfer(payload) => { + if payload.chunk_header.id != handle.id { + return Err(anyhow!("Wrong file")); + } + if payload.chunk.len() == 0 { + break; + } + decoder.write_all(&payload.chunk[..])? } + _ => return Err(anyhow!("Expecting file transfer message")), } - Ok(()) - }) - .await? + } + decoder.finish()?; + println!("Done downloading file."); + Ok(()) } } diff --git a/src/file.rs b/src/file.rs index 4967cd4..c63e094 100644 --- a/src/file.rs +++ b/src/file.rs @@ -5,8 +5,7 @@ use serde::{Deserialize, Serialize}; use std::fs::Metadata; use std::path::PathBuf; -use bytes::BytesMut; -use std::io::{Read, Seek, SeekFrom}; +use std::io::{Seek, SeekFrom}; use tokio::fs::File; @@ -29,6 +28,18 @@ pub struct StdFileHandle { pub start: u64, } +impl StdFileHandle { + pub async fn new(id: u8, file: File, start: u64) -> Result { + let mut std_file = file.into_std().await; + std_file.seek(SeekFrom::Start(start))?; + Ok(StdFileHandle { + id: id, + file: std_file, + start: start, + }) + } +} + pub struct FileHandle { pub id: u8, pub file: File, @@ -69,13 +80,7 @@ impl FileHandle { } async fn to_std(self, chunk_header: &ChunkHeader) -> Result { - let mut std_file = self.file.into_std().await; - std_file.seek(SeekFrom::Start(chunk_header.start))?; - Ok(StdFileHandle { - id: self.id, - file: std_file, - start: chunk_header.start, - }) + StdFileHandle::new(self.id, self.file, chunk_header.start).await } pub fn to_file_offer(&self) -> FileOffer { @@ -94,26 +99,6 @@ impl FileHandle { let handles = try_join_all(tasks).await?; Ok(handles) } - - pub fn to_message( - id: u8, - file: &mut std::fs::File, - buffer: &mut BytesMut, - start: u64, - ) -> Result { - // reads the next chunk of the file - // packs it into the buffer, with the header taking up the first X bytes - let chunk_header = ChunkHeader { id, start }; - let chunk_bytes = bincode::serialize(&chunk_header)?; - println!( - "chunk_bytes = {:?}, len = {:?}", - chunk_bytes.clone(), - chunk_bytes.len() - ); - buffer.extend_from_slice(&chunk_bytes[..]); - let n = file.read(buffer)? as u64; - Ok(n) - } } const SUFFIX: [&'static str; 9] = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"]; From bb32cf8686fe570d1fbd870f99ba39175e5117a0 Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Fri, 2 Sep 2022 22:51:32 -0400 Subject: [PATCH 13/13] Add some benchmark for upload --- src/connection.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/connection.rs b/src/connection.rs index fdaac43..80f5df4 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -12,6 +12,7 @@ use flate2::bufread::GzEncoder; use flate2::write::GzDecoder; use flate2::Compression; use std::io::{BufReader, Read, Write}; +use std::time::Instant; pub struct Connection { ms: MessageStream, @@ -53,12 +54,17 @@ impl Connection { let mut buffer = [0; BUFFER_SIZE]; let reader = BufReader::new(handle.file); let mut gz = GzEncoder::new(reader, Compression::fast()); + let before = Instant::now(); + let mut count = 0; + let mut bytes_sent: u64 = 0; loop { + count += 1; match gz.read(&mut buffer) { Ok(0) => { break; } Ok(n) => { + bytes_sent += n as u64; let message = Message::FileTransfer(FileTransferPayload { chunk: BytesMut::from(&buffer[..n]).freeze(), chunk_header: ChunkHeader { @@ -71,6 +77,16 @@ impl Connection { Err(e) => return Err(anyhow!(e.to_string())), } } + let elapsed = before.elapsed(); + let mb_sent = bytes_sent / 1_048_576; + println!( + "{:?} mb sent, {:?} iterations. {:?} total time, {:?} avg per iteration, {:?} avg mb/sec", + mb_sent, + count, + elapsed, + elapsed / count, + mb_sent / elapsed.as_secs() + ); Ok(()) }