diff --git a/.gitignore b/.gitignore index ea8c4bf..d03e6b9 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +Notes.md \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 7d59848..be1143e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aead" version = "0.4.3" @@ -162,6 +168,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.2" @@ -214,28 +229,13 @@ dependencies = [ ] [[package]] -name = "educe" -version = "0.4.18" +name = "flate2" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f86b50932a01e7ec5c06160492ab660fb19b6bb2a7878030dd6cd68d21df9d4d" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" dependencies = [ - "enum-ordinalize", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "enum-ordinalize" -version = "3.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b166c9e378360dd5a6666a9604bb4f54ae0cac39023ffbac425e917a2a04fef" -dependencies = [ - "num-bigint", - "num-traits", - "proc-macro2", - "quote", - "syn", + "crc32fast", + "miniz_oxide", ] [[package]] @@ -464,6 +464,15 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "miniz_oxide" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f5c75688da582b8ffc1f1799e9db273f32133c49e048f614d22ec3256773ccc" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.7.14" @@ -495,36 +504,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "num-bigint" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", -] - -[[package]] -name = "num-integer" -version = "0.1.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" -dependencies = [ - "autocfg", - "num-traits", -] - -[[package]] -name = "num-traits" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" -dependencies = [ - "autocfg", -] - [[package]] name = "num_cpus" version = "1.13.1" @@ -581,26 +560,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "pin-project" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pin-project-lite" version = "0.2.8" @@ -741,13 +700,12 @@ dependencies = [ "blake2", "bytes", "clap", + "flate2", "futures", "rand", "serde", "spake2", "tokio", - "tokio-serde", - "tokio-stream", "tokio-util", ] @@ -889,32 +847,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-serde" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "911a61637386b789af998ee23f50aa30d5fd7edcec8d6d3dedae5e5815205466" -dependencies = [ - "bincode", - "bytes", - "educe", - "futures-core", - "futures-sink", - "pin-project", - "serde", -] - -[[package]] -name = "tokio-stream" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.6.9" diff --git a/Cargo.toml b/Cargo.toml index 8d71e61..3fba138 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,11 +11,10 @@ blake2 = "0.10.2" bytes = { version = "1", features = ["serde"] } bincode = "1.3.3" clap = { version = "3.0.14", features = ["derive"] } +flate2 = "1.0" futures = { version = "0.3.0", features = ["thread-pool"]} rand = "0.8.4" serde = { version = "1.0", features = ["derive"] } spake2 = "0.3.1" tokio = { version = "1.16.1", features = ["full"] } -tokio-serde = { version = "~0.8", features = ["bincode"] } -tokio-stream = { version = "~0.1.6", features = ["net"]} tokio-util = { version = "0.6.3", features = ["full"]} \ No newline at end of file diff --git a/README.md b/README.md index 1465e2c..97d7408 100644 --- a/README.md +++ b/README.md @@ -1,30 +1,35 @@ -- https://github.com/mcginty/snow/tree/master/examples -- https://github.com/black-binary/snowstorm/tree/master/examples -- https://cryptoservices.github.io/cryptography/protocols/2016/04/27/noise-protocol.html +# ruck -- https://github.com/libp2p +`ruck` is a command line tool used for hosting relay servers and sending end-to-end encrypted files between clients. It was heavily inspired by [croc](https://github.com/schollz/croc), one of the easiest ways to send files between peers. This document describes the protocol `ruck` uses to support this functionality. -- https://docs.rs/spake2/latest/spake2/ -- https://crates.io/crates/chacha20poly1305 -- https://briansmith.org/rustdoc/ring/aead/index.html -- https://libsodium.gitbook.io/doc/secret-key_cryptography/secretstream +### Version -- https://kerkour.com/rust-symmetric-encryption-aead-benchmark/ -- https://docs.rs/aes-gcm/latest/aes_gcm/ +This document refers to version `0.1.0` of `ruck` as defined by the `Cargo.toml` file. -- https://github.com/rust-lang/flate2-rs -- https://crates.io/crates/async-compression +## Server -- https://pdos.csail.mit.edu/papers/chord:sigcomm01/chord_sigcomm.pdf -- https://pdos.csail.mit.edu/6.824/schedule.html -- https://flylib.com/books/en/2.292.1.105/1/ -- https://en.wikipedia.org/wiki/Two_Generals%27_Problem +The server in `ruck` exposes a TCP port. +Its only functions are to staple connections and shuttle bytes between stapled connections. +The first 32 bytes sent over the wire from a new client are used as its unique identifier. +When a new client joins, if the server has another open connection with the same identifier, the connections are then stapled. +The clients have some mechanism for agreeing on these identifiers, however, from the server's perspective it doesn't matter how they agree. -- https://kobzol.github.io/rust/ci/2021/05/07/building-rust-binaries-in-ci-that-work-with-older-glibc.html +Once the connection is stapled, all bytes are piped across until a client disconnects or times out. +The time out is set to remove idle connections. +The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages. +For this reason, updates to the `ruck` protocol do not typically necessitate server redeployments. -## Todo +## Client -- [ ] Use tracing -- [ ] Add progress bars -- [ ] Exit happily when transfer is complete -- [ ] Compress files +There are two types of clients - `send` and `receive` clients. +Out of band, the clients agree on a relay server and password, from which they can derive the 32 byte identifier used by the server to staple their connections. +Clients have the option of using the single-use, automatically generated passwords which `ruck` supplies by default. +Using the passwords per the [Spake2](https://docs.rs/spake2/0.3.1/spake2/) handshake algorithm, clients generate a symmetric key with which to encrypt their subsequent messages. +Once the handshake is complete, `send` and `receive` negotiate and exchange files per the following: + +- `send` offers a list of files and waits. +- `receive` specifies which bytes it wants from these files. +- `send` sends the specified bytes and waits. +- `receive` sends heartbeats with progress updates. +- `send` hangs up once the heartbeats stop or received a successful heartbeat. +- `receive` hangs up once the downloads are complete. diff --git a/src/client.rs b/src/client.rs index 60c4da0..61e6fc2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,295 +1,107 @@ -use crate::conf::BUFFER_SIZE; -use crate::crypto::handshake; -use crate::file::{to_size_string, FileHandle, FileInfo}; -use crate::message::{ - EncryptedMessage, FileNegotiationPayload, FileTransferPayload, Message, MessageStream, -}; +use crate::connection::Connection; +use crate::file::{ChunkHeader, FileHandle, FileOffer, StdFileHandle}; +use crate::handshake::Handshake; +use crate::message::{FileOfferPayload, FileRequestPayload, Message}; +use crate::ui::prompt_user_for_file_confirmation; -use aes_gcm::Aes256Gcm; use anyhow::{anyhow, Result}; -use blake2::{Blake2s256, Digest}; -use bytes::{Bytes, BytesMut}; -use futures::future::try_join_all; -use futures::prelude::*; -use std::collections::HashMap; + use std::ffi::OsStr; use std::path::PathBuf; use tokio::fs::File; -use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; -use tokio::net::TcpStream; -use tokio::sync::mpsc; -use tokio_util::codec::{FramedRead, LinesCodec}; -fn pass_to_bytes(password: &String) -> Bytes { - let mut hasher = Blake2s256::new(); - hasher.update(password.as_bytes()); - let res = hasher.finalize(); - BytesMut::from(&res[..]).freeze() -} +use tokio::net::TcpStream; pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Fail early if there are problems generating file handles - let handles = get_file_handles(file_paths).await?; + let handles = FileHandle::get_file_handles(file_paths).await?; // Establish connection to server let socket = TcpStream::connect("127.0.0.1:8080").await?; - let mut stream = Message::to_stream(socket); - // Complete handshake, returning cipher used for encryption - let (stream, cipher) = handshake( - &mut stream, - Bytes::from(password.to_string()), - pass_to_bytes(password), - ) - .await?; + let (handshake, s1) = Handshake::from_password(password); + // Complete handshake, returning key used for encryption + let (socket, key) = handshake.negotiate(socket, s1).await?; - // Complete file negotiation - let handles = negotiate_files_up(handles, stream, &cipher).await?; + let mut connection = Connection::new(socket, key); + // Offer files, wait for requested file response + let requested_chunks = offer_files(&mut connection, &handles).await?; // Upload negotiated files - upload_encrypted_files(stream, handles, &cipher).await?; + let std_file_handles = FileHandle::to_stds(handles, requested_chunks).await; + connection.upload_files(std_file_handles).await?; + println!("Done uploading."); // Exit Ok(()) } pub async fn receive(password: &String) -> Result<()> { + // Establish connection to server let socket = TcpStream::connect("127.0.0.1:8080").await?; - let mut stream = Message::to_stream(socket); - let (stream, cipher) = handshake( - &mut stream, - Bytes::from(password.to_string()), - pass_to_bytes(password), - ) - .await?; - - let files = negotiate_files_down(stream, &cipher).await?; - - download_files(files, stream, &cipher).await?; + let (handshake, s1) = Handshake::from_password(password); + // Complete handshake, returning key used for encryption + let (socket, key) = handshake.negotiate(socket, s1).await?; + let mut connection = Connection::new(socket, key); + // Wait for offered files, respond with desired files + let desired_files = request_specific_files(&mut connection).await?; + // Create files + let std_file_handles = create_files(desired_files).await?; + // Download them + connection.download_files(std_file_handles).await?; return Ok(()); } -pub async fn get_file_handles(file_paths: &Vec) -> Result> { - let tasks = file_paths - .into_iter() - .map(|path| FileHandle::new(path.to_path_buf())); - let handles = try_join_all(tasks).await?; - Ok(handles) -} - -pub async fn negotiate_files_up( - file_handles: Vec, - stream: &mut MessageStream, - cipher: &Aes256Gcm, -) -> Result> { - let files = file_handles.iter().map(|fh| fh.to_file_info()).collect(); - let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { files }); - let server_msg = msg.to_encrypted_message(cipher)?; - stream.send(server_msg).await?; - let reply_payload = match stream.next().await { - Some(Ok(msg)) => match msg { - Message::EncryptedMessage(response) => response, - _ => return Err(anyhow!("Expecting encrypted message back")), - }, - _ => { - return Err(anyhow!("No response to negotiation message")); - } - }; - let plaintext_reply = EncryptedMessage::from_encrypted_message(cipher, &reply_payload)?; - let requested_paths: Vec = match plaintext_reply { - EncryptedMessage::FileNegotiationMessage(fnm) => { - fnm.files.into_iter().map(|f| f.path).collect() - } - _ => return Err(anyhow!("Expecting file negotiation message back")), - }; - Ok(file_handles - .into_iter() - .filter(|fh| requested_paths.contains(&fh.path)) - .collect()) -} - -pub async fn negotiate_files_down( - stream: &mut MessageStream, - cipher: &Aes256Gcm, -) -> Result> { - let file_offer = match stream.next().await { - Some(Ok(msg)) => match msg { - Message::EncryptedMessage(response) => response, - _ => return Err(anyhow!("Expecting encrypted message back")), - }, - _ => { - return Err(anyhow!("No response to negotiation message")); - } - }; - let plaintext_offer = EncryptedMessage::from_encrypted_message(cipher, &file_offer)?; - let requested_infos: Vec = match plaintext_offer { - EncryptedMessage::FileNegotiationMessage(fnm) => fnm.files, - _ => return Err(anyhow!("Expecting file negotiation message back")), - }; - let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new()); - let mut files = vec![]; - for file_info in requested_infos.into_iter() { - let mut reply = prompt_user_input(&mut stdin, &file_info).await; - while reply.is_none() { - reply = prompt_user_input(&mut stdin, &file_info).await; - } - match reply { - Some(true) => files.push(file_info), - _ => {} - } +pub async fn offer_files( + conn: &mut Connection, + file_handles: &Vec, +) -> Result> { + // Collect file offer + let files = file_handles.iter().map(|fh| fh.to_file_offer()).collect(); + let msg = Message::FileOffer(FileOfferPayload { files }); + // Send file offer + conn.send_msg(msg).await?; + // Wait for reply + let reply = conn.await_msg().await?; + // Return requested chunks + match reply { + Message::FileRequest(file_request_payload) => Ok(file_request_payload.chunks), + _ => Err(anyhow!("Expecting file request message back")), } - let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { - files: files.clone(), +} + +pub async fn request_specific_files(conn: &mut Connection) -> Result> { + // Wait for offer message + let offer_message = conn.await_msg().await?; + let offered_files: Vec = match offer_message { + Message::FileOffer(file_offer_payload) => file_offer_payload.files, + _ => return Err(anyhow!("Expecting file offer message")), + }; + // Prompt user for confirmation of files + let desired_files = prompt_user_for_file_confirmation(offered_files).await; + let file_request_msg = Message::FileRequest(FileRequestPayload { + chunks: desired_files + .iter() + .map(|file| ChunkHeader { + id: file.id, + start: 0, + }) + .collect(), }); - let server_msg = msg.to_encrypted_message(cipher)?; - stream.send(server_msg).await?; - Ok(files) + conn.send_msg(file_request_msg).await?; + Ok(desired_files) } -pub async fn upload_encrypted_files( - stream: &mut MessageStream, - handles: Vec, - cipher: &Aes256Gcm, -) -> Result<()> { - let (tx, mut rx) = mpsc::unbounded_channel::(); - for mut handle in handles { - let txc = tx.clone(); - tokio::spawn(async move { - let _ = enqueue_file_chunks(&mut handle, txc).await; - }); - } - - loop { - tokio::select! { - Some(msg) = rx.recv() => { - // println!("message received to client.rx {:?}", msg); - let x = msg.to_encrypted_message(cipher)?; - stream.send(x).await? - } - else => { - println!("breaking"); - break - }, - } - } - Ok(()) -} - -pub async fn enqueue_file_chunks( - fh: &mut FileHandle, - tx: mpsc::UnboundedSender, -) -> Result<()> { - let mut chunk_num = 0; - let mut bytes_read = 1; - while bytes_read != 0 { - let mut buf = BytesMut::with_capacity(BUFFER_SIZE); - bytes_read = fh.file.read_buf(&mut buf).await?; - // println!("Bytes_read: {:?}, The bytes: {:?}", bytes_read, &buf[..]); - if bytes_read != 0 { - let chunk = buf.freeze(); - let file_info = fh.to_file_info(); - let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload { - chunk, - chunk_num, - file_info, - }); - tx.send(ftp)?; - chunk_num += 1; - } - } - - Ok(()) -} - -pub async fn download_files( - file_infos: Vec, - stream: &mut MessageStream, - cipher: &Aes256Gcm, -) -> Result<()> { - // for each file_info - let mut info_handles: HashMap> = HashMap::new(); - for fi in file_infos { - let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>(); - let path = fi.path.clone(); - tokio::spawn(async move { download_file(fi, rx).await }); - info_handles.insert(path, tx); - } - loop { - tokio::select! { - result = stream.next() => match result { - Some(Ok(Message::EncryptedMessage(payload))) => { - let ec = EncryptedMessage::from_encrypted_message(cipher, &payload)?; - // println!("encrypted message received! {:?}", ec); - match ec { - EncryptedMessage::FileTransferMessage(payload) => { - println!("matched file transfer message"); - if let Some(tx) = info_handles.get(&payload.file_info.path) { - println!("matched on filetype, sending to tx"); - tx.send((payload.chunk_num, payload.chunk))? - }; - }, - _ => {println!("wrong msg")} - } - } - Some(Ok(_)) => { - println!("wrong msg"); - } - Some(Err(e)) => { - println!("Error {:?}", e); - } - None => break, - } - } - } - Ok(()) -} - -pub async fn download_file( - file_info: FileInfo, - rx: mpsc::UnboundedReceiver<(u64, Bytes)>, -) -> Result<()> { - println!("in download file"); - let mut rx = rx; - let filename = match file_info.path.file_name() { - Some(f) => { - println!("matched filename"); - f - } - None => { - println!("didnt match filename"); - OsStr::new("random.txt") - } - }; - println!("trying to create file...filename={:?}", filename); - let mut file = File::create(filename).await?; - println!("file created ok! filename={:?}", filename); - while let Some((_chunk_num, chunk)) = rx.recv().await { - // println!("rx got message! chunk={:?}", chunk); - file.write_all(&chunk).await?; - } - println!("done receiving messages"); - Ok(()) -} - -pub async fn prompt_user_input( - stdin: &mut FramedRead, - file_info: &FileInfo, -) -> Option { - let prompt_name = file_info.path.file_name().unwrap(); - println!( - "Accept {:?}? ({:?}). (Y/n)", - prompt_name, - to_size_string(file_info.size) - ); - match stdin.next().await { - Some(Ok(line)) => match line.as_str() { - "" | "Y" | "y" | "yes" | "Yes" | "YES" => Some(true), - "N" | "n" | "NO" | "no" | "No" => Some(false), - _ => { - println!("Invalid input. Please enter one of the following characters: [YyNn]"); - return None; - } - }, - _ => None, +pub async fn create_files(desired_files: Vec) -> Result> { + let mut v = Vec::new(); + for desired_file in desired_files { + let filename = desired_file + .path + .file_name() + .unwrap_or(OsStr::new("random.txt")); + let file = File::create(filename).await?; + let std_file_handle = StdFileHandle::new(desired_file.id, file, 0).await?; + v.push(std_file_handle) } + return Ok(v); } diff --git a/src/conf.rs b/src/conf.rs index 07a6f99..cf506ec 100644 --- a/src/conf.rs +++ b/src/conf.rs @@ -1 +1,5 @@ -pub const BUFFER_SIZE: usize = 1024 * 1024; +pub const ID_SIZE: usize = 32; // Blake256 of password +pub const HANDSHAKE_MSG_SIZE: usize = 33; // generated by Spake2 +pub const PER_CLIENT_BUFFER: usize = 1024 * 64; // buffer size allocated by server for each client +pub const BUFFER_SIZE: usize = 1024 * 64; // chunk size for files sent over wire +pub const NONCE_SIZE: usize = 96 / 8; // used for every encrypted message diff --git a/src/connection.rs b/src/connection.rs new file mode 100644 index 0000000..80f5df4 --- /dev/null +++ b/src/connection.rs @@ -0,0 +1,128 @@ +use crate::conf::BUFFER_SIZE; +use crate::crypto::Crypt; +use crate::file::{ChunkHeader, StdFileHandle}; +use crate::message::{FileTransferPayload, Message, MessageStream}; +use anyhow::{anyhow, Result}; + +use futures::{SinkExt, StreamExt}; +use tokio::net::TcpStream; + +use bytes::{Bytes, BytesMut}; +use flate2::bufread::GzEncoder; +use flate2::write::GzDecoder; +use flate2::Compression; +use std::io::{BufReader, Read, Write}; +use std::time::Instant; + +pub struct Connection { + ms: MessageStream, + crypt: Crypt, +} + +impl Connection { + pub fn new(socket: TcpStream, key: Vec) -> Self { + let ms = Message::to_stream(socket); + let crypt = Crypt::new(&key); + Connection { ms, crypt } + } + + pub async fn send_bytes(&mut self, bytes: Bytes) -> Result<()> { + match self.ms.send(bytes).await { + Ok(_) => Ok(()), + Err(e) => Err(anyhow!(e.to_string())), + } + } + + pub async fn send_msg(&mut self, msg: Message) -> Result<()> { + let msg = msg.serialize()?; + let bytes = self.crypt.encrypt(msg)?; + self.send_bytes(bytes).await + } + + pub async fn await_msg(&mut self) -> Result { + match self.ms.next().await { + Some(Ok(msg)) => { + let decrypted_bytes = self.crypt.decrypt(msg.freeze())?; + Message::deserialize(decrypted_bytes) + } + Some(Err(e)) => Err(anyhow!(e.to_string())), + None => Err(anyhow!("Error awaiting msg")), + } + } + + pub async fn upload_file(&mut self, handle: StdFileHandle) -> Result<()> { + let mut buffer = [0; BUFFER_SIZE]; + let reader = BufReader::new(handle.file); + let mut gz = GzEncoder::new(reader, Compression::fast()); + let before = Instant::now(); + let mut count = 0; + let mut bytes_sent: u64 = 0; + loop { + count += 1; + match gz.read(&mut buffer) { + Ok(0) => { + break; + } + Ok(n) => { + bytes_sent += n as u64; + let message = Message::FileTransfer(FileTransferPayload { + chunk: BytesMut::from(&buffer[..n]).freeze(), + chunk_header: ChunkHeader { + id: handle.id, + start: 0, + }, + }); + self.send_msg(message).await?; + } + Err(e) => return Err(anyhow!(e.to_string())), + } + } + let elapsed = before.elapsed(); + let mb_sent = bytes_sent / 1_048_576; + println!( + "{:?} mb sent, {:?} iterations. {:?} total time, {:?} avg per iteration, {:?} avg mb/sec", + mb_sent, + count, + elapsed, + elapsed / count, + mb_sent / elapsed.as_secs() + ); + Ok(()) + } + + pub async fn upload_files(mut self, handles: Vec) -> Result<()> { + for handle in handles { + self.upload_file(handle).await?; + } + Ok(()) + } + + pub async fn download_files(mut self, handles: Vec) -> Result<()> { + for handle in handles { + self.download_file(handle).await?; + } + Ok(()) + } + + pub async fn download_file(&mut self, handle: StdFileHandle) -> Result<()> { + let mut decoder = GzDecoder::new(handle.file); + loop { + let msg = self.await_msg().await?; + match msg { + Message::FileTransfer(payload) => { + if payload.chunk_header.id != handle.id { + return Err(anyhow!("Wrong file")); + } + if payload.chunk.len() == 0 { + break; + } + decoder.write_all(&payload.chunk[..])? + } + _ => return Err(anyhow!("Expecting file transfer message")), + } + } + decoder.finish()?; + println!("Done downloading file."); + Ok(()) + } +} diff --git a/src/crypto.rs b/src/crypto.rs index 43cf8ef..e95b5fa 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -1,69 +1,48 @@ -use crate::message::{EncryptedPayload, HandshakePayload, Message, MessageStream}; - +use crate::conf::NONCE_SIZE; use aes_gcm::aead::{Aead, NewAead}; use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; -use bytes::Bytes; -use futures::prelude::*; +use bytes::{Bytes, BytesMut}; + use rand::{thread_rng, Rng}; -use spake2::{Ed25519Group, Identity, Password, Spake2}; -pub async fn handshake( - stream: &mut MessageStream, - password: Bytes, - id: Bytes, -) -> Result<(&mut MessageStream, Aes256Gcm)> { - let (s1, outbound_msg) = - Spake2::::start_symmetric(&Password::new(password), &Identity::new(&id)); - println!("client - sending handshake msg"); - let handshake_msg = Message::HandshakeMessage(HandshakePayload { - id, - msg: Bytes::from(outbound_msg), - }); - // println!("client - handshake msg, {:?}", handshake_msg); - stream.send(handshake_msg).await?; - let first_message = match stream.next().await { - Some(Ok(msg)) => match msg { - Message::HandshakeMessage(response) => response.msg, - _ => return Err(anyhow!("Expecting handshake message response")), - }, - _ => { - return Err(anyhow!("No response to handshake message")); +pub struct Crypt { + cipher: Aes256Gcm, + arr: [u8; NONCE_SIZE], +} + +impl Crypt { + pub fn new(key: &Vec) -> Crypt { + let key = Key::from_slice(&key[..]); + Crypt { + cipher: Aes256Gcm::new(key), + arr: [0u8; NONCE_SIZE], } - }; - println!("client - handshake msg responded to"); - let key = match s1.finish(&first_message[..]) { - Ok(key_bytes) => key_bytes, - Err(e) => return Err(anyhow!(e.to_string())), - }; - // println!("Handshake successful. Key is {:?}", key); - return Ok((stream, new_cipher(&key))); -} + } -pub fn new_cipher(key: &Vec) -> Aes256Gcm { - let key = Key::from_slice(&key[..]); - Aes256Gcm::new(key) -} + // Returns wire format, includes nonce as prefix + pub fn encrypt(&mut self, plaintext: Bytes) -> Result { + thread_rng().try_fill(&mut self.arr[..])?; + let nonce = Nonce::from_slice(&self.arr); + match self.cipher.encrypt(nonce, plaintext.as_ref()) { + Ok(body) => { + let mut buffer = BytesMut::with_capacity(NONCE_SIZE + body.len()); + buffer.extend_from_slice(nonce); + buffer.extend_from_slice(&body); + Ok(buffer.freeze()) + } + Err(e) => Err(anyhow!(e.to_string())), + } + } -pub const NONCE_SIZE_IN_BYTES: usize = 96 / 8; -pub fn encrypt(cipher: &Aes256Gcm, body: &Vec) -> Result { - let mut arr = [0u8; NONCE_SIZE_IN_BYTES]; - thread_rng().try_fill(&mut arr[..])?; - let nonce = Nonce::from_slice(&arr); - let plaintext = body.as_ref(); - match cipher.encrypt(nonce, plaintext) { - Ok(body) => Ok(EncryptedPayload { - nonce: arr.to_vec(), - body, - }), - Err(_) => Err(anyhow!("Encryption error")), - } -} - -pub fn decrypt(cipher: &Aes256Gcm, payload: &EncryptedPayload) -> Result { - let nonce = Nonce::from_slice(payload.nonce.as_ref()); - match cipher.decrypt(nonce, payload.body.as_ref()) { - Ok(payload) => Ok(Bytes::from(payload)), - Err(_) => Err(anyhow!("Decryption error")), + // Accepts wire format, includes nonce as prefix + pub fn decrypt(&self, ciphertext: Bytes) -> Result { + let mut ciphertext_body = ciphertext; + let nonce_bytes = ciphertext_body.split_to(NONCE_SIZE); + let nonce = Nonce::from_slice(&nonce_bytes); + match self.cipher.decrypt(nonce, ciphertext_body.as_ref()) { + Ok(payload) => Ok(Bytes::from(payload)), + Err(e) => Err(anyhow!(e.to_string())), + } } } diff --git a/src/file.rs b/src/file.rs index 3e03c82..c63e094 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,35 +1,104 @@ use anyhow::Result; +use futures::future::try_join_all; + use serde::{Deserialize, Serialize}; use std::fs::Metadata; use std::path::PathBuf; + +use std::io::{Seek, SeekFrom}; + use tokio::fs::File; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct FileInfo { +pub struct ChunkHeader { + pub id: u8, + pub start: u64, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct FileOffer { + pub id: u8, pub path: PathBuf, pub size: u64, } +pub struct StdFileHandle { + pub id: u8, + pub file: std::fs::File, + pub start: u64, +} + +impl StdFileHandle { + pub async fn new(id: u8, file: File, start: u64) -> Result { + let mut std_file = file.into_std().await; + std_file.seek(SeekFrom::Start(start))?; + Ok(StdFileHandle { + id: id, + file: std_file, + start: start, + }) + } +} + pub struct FileHandle { + pub id: u8, pub file: File, pub md: Metadata, pub path: PathBuf, } impl FileHandle { - pub async fn new(path: PathBuf) -> Result { + pub async fn new(id: u8, path: PathBuf) -> Result { let file = File::open(&path).await?; let md = file.metadata().await?; - let fh = FileHandle { file, md, path }; + let fh = FileHandle { id, file, md, path }; return Ok(fh); } - pub fn to_file_info(&self) -> FileInfo { - FileInfo { + pub async fn to_stds( + file_handles: Vec, + chunk_headers: Vec, + ) -> Vec { + let mut ret = Vec::new(); + for handle in file_handles { + let chunk = chunk_headers.iter().find(|chunk| handle.id == chunk.id); + match chunk { + Some(chunk) => { + match handle.to_std(chunk).await { + Ok(std_file_handle) => { + ret.push(std_file_handle); + } + _ => println!("Error seeking in file"), + }; + } + None => { + println!("Skipping file b/c not in requested chunks"); + } + } + } + ret + } + + async fn to_std(self, chunk_header: &ChunkHeader) -> Result { + StdFileHandle::new(self.id, self.file, chunk_header.start).await + } + + pub fn to_file_offer(&self) -> FileOffer { + FileOffer { + id: self.id, path: self.path.clone(), size: self.md.len(), } } + + pub async fn get_file_handles(file_paths: &Vec) -> Result> { + let tasks = file_paths + .into_iter() + .enumerate() + .map(|(idx, path)| FileHandle::new(idx.try_into().unwrap(), path.to_path_buf())); + let handles = try_join_all(tasks).await?; + Ok(handles) + } } const SUFFIX: [&'static str; 9] = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"]; diff --git a/src/handshake.rs b/src/handshake.rs new file mode 100644 index 0000000..1b1b409 --- /dev/null +++ b/src/handshake.rs @@ -0,0 +1,72 @@ +use crate::conf::{HANDSHAKE_MSG_SIZE, ID_SIZE}; + +use anyhow::{anyhow, Result}; +use blake2::{Blake2s256, Digest}; +use bytes::{Bytes, BytesMut}; +use spake2::{Ed25519Group, Identity, Password, Spake2}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; + +pub struct Handshake { + pub id: Bytes, + pub outbound_msg: Bytes, +} + +impl Handshake { + pub fn from_password(pw: &String) -> (Handshake, spake2::Spake2) { + let password = Bytes::from(pw.to_string()); + let id = Handshake::pass_to_bytes(&pw); + let (s1, outbound_msg) = + Spake2::::start_symmetric(&Password::new(&password), &Identity::new(&id)); + let mut buffer = BytesMut::with_capacity(HANDSHAKE_MSG_SIZE); + buffer.extend_from_slice(&outbound_msg[..HANDSHAKE_MSG_SIZE]); + let outbound_msg = buffer.freeze(); + let handshake = Handshake { id, outbound_msg }; + (handshake, s1) + } + + pub async fn from_socket(socket: TcpStream) -> Result<(Handshake, TcpStream)> { + let mut socket = socket; + let mut buffer = [0; ID_SIZE + HANDSHAKE_MSG_SIZE]; + let n = socket.read_exact(&mut buffer).await?; + println!("The bytes: {:?}", &buffer[..n]); + let mut outbound_msg = BytesMut::from(&buffer[..n]).freeze(); + let id = outbound_msg.split_to(ID_SIZE); + Ok((Handshake { id, outbound_msg }, socket)) + } + + pub fn to_bytes(self) -> Bytes { + let mut buffer = BytesMut::with_capacity(ID_SIZE + HANDSHAKE_MSG_SIZE); + buffer.extend_from_slice(&self.id); + buffer.extend_from_slice(&self.outbound_msg); + buffer.freeze() + } + + pub async fn negotiate( + self, + socket: TcpStream, + s1: spake2::Spake2, + ) -> Result<(TcpStream, Vec)> { + let mut socket = socket; + let bytes = &self.to_bytes(); + println!("client - sending handshake msg= {:?}", &bytes); + socket.write_all(&bytes).await?; + let mut buffer = [0; HANDSHAKE_MSG_SIZE]; + let n = socket.read_exact(&mut buffer).await?; + let response = BytesMut::from(&buffer[..n]).freeze(); + // println!("client - handshake msg, {:?}", response); + let key = match s1.finish(&response[..]) { + Ok(key_bytes) => key_bytes, + Err(e) => return Err(anyhow!(e.to_string())), + }; + println!("Handshake successful. Key is {:?}", key); + return Ok((socket, key)); + } + + fn pass_to_bytes(password: &String) -> Bytes { + let mut hasher = Blake2s256::new(); + hasher.update(password.as_bytes()); + let res = hasher.finalize(); + BytesMut::from(&res[..]).freeze() + } +} diff --git a/src/main.rs b/src/main.rs index 65c2518..f880896 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,13 @@ mod cli; mod client; mod conf; +mod connection; mod crypto; mod file; +mod handshake; mod message; mod server; +mod ui; use clap::Parser; use cli::{Cli, Commands}; diff --git a/src/message.rs b/src/message.rs index 10593a2..0e27365 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,109 +1,50 @@ -use crate::crypto::{decrypt, encrypt}; -use crate::file::FileInfo; +use crate::file::{ChunkHeader, FileOffer}; -use aes_gcm::Aes256Gcm; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use std::error::Error; -use std::fmt; use tokio::net::TcpStream; -use tokio_serde::{formats::SymmetricalBincode, SymmetricallyFramed}; use tokio_util::codec::{Framed, LengthDelimitedCodec}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum Message { - HandshakeMessage(HandshakePayload), - EncryptedMessage(EncryptedPayload), - ErrorMessage(RuckError), + FileOffer(FileOfferPayload), + FileRequest(FileRequestPayload), + FileTransfer(FileTransferPayload), } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct HandshakePayload { - pub id: Bytes, - pub msg: Bytes, +pub struct FileRequestPayload { + pub chunks: Vec, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct EncryptedPayload { - pub nonce: Vec, - pub body: Vec, -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub enum EncryptedMessage { - FileNegotiationMessage(FileNegotiationPayload), - FileTransferMessage(FileTransferPayload), -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct FileNegotiationPayload { - pub files: Vec, +pub struct FileOfferPayload { + pub files: Vec, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct FileTransferPayload { - pub file_info: FileInfo, - pub chunk_num: u64, + pub chunk_header: ChunkHeader, pub chunk: Bytes, } -impl EncryptedMessage { - pub fn from_encrypted_message(cipher: &Aes256Gcm, payload: &EncryptedPayload) -> Result { - let raw = decrypt(cipher, payload)?; - let res = match bincode::deserialize(raw.as_ref()) { - Ok(result) => result, - Err(e) => { - println!("deserialize error {:?}", e); - return Err(anyhow!("deser error")); - } - }; - Ok(res) +impl Message { + pub fn serialize(&self) -> Result { + bincode::serialize(&self).map(|vec| Ok(Bytes::from(vec)))? } - pub fn to_encrypted_message(&self, cipher: &Aes256Gcm) -> Result { - let raw = match bincode::serialize(&self) { - Ok(result) => result, - Err(e) => { - println!("serialize error {:?}", e); - return Err(anyhow!("serialize error")); - } - }; - let payload = encrypt(cipher, &raw)?; - Ok(Message::EncryptedMessage(payload)) - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub enum RuckError { - NotHandshake, - SenderNotConnected, - SenderAlreadyConnected, - PairDisconnected, -} - -impl fmt::Display for RuckError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "RuckError is here!") - } -} - -impl Error for RuckError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - Some(self) + pub fn deserialize(bytes: Bytes) -> Result { + match bincode::deserialize(bytes.as_ref()) { + Ok(msg) => Ok(msg), + Err(e) => Err(anyhow!(e.to_string())), + } } } impl Message { pub fn to_stream(stream: TcpStream) -> MessageStream { - tokio_serde::SymmetricallyFramed::new( - Framed::new(stream, LengthDelimitedCodec::new()), - tokio_serde::formats::SymmetricalBincode::::default(), - ) + Framed::new(stream, LengthDelimitedCodec::new()) } } -pub type MessageStream = SymmetricallyFramed< - Framed, - Message, - SymmetricalBincode, ->; +pub type MessageStream = Framed; diff --git a/src/server.rs b/src/server.rs index 759e154..f4cf567 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,16 +1,16 @@ -use crate::message::{HandshakePayload, Message, MessageStream, RuckError}; - +use crate::conf::PER_CLIENT_BUFFER; +use crate::handshake::Handshake; use anyhow::{anyhow, Result}; -use bytes::Bytes; -use futures::prelude::*; +use bytes::{Bytes, BytesMut}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex}; -type Tx = mpsc::UnboundedSender; -type Rx = mpsc::UnboundedReceiver; +type Tx = mpsc::UnboundedSender; +type Rx = mpsc::UnboundedReceiver; pub struct Shared { handshake_cache: HashMap, @@ -18,12 +18,12 @@ pub struct Shared { type State = Arc>; struct Client { - messages: MessageStream, + socket: TcpStream, rx: Rx, peer_tx: Option, } struct StapledClient { - messages: MessageStream, + socket: TcpStream, rx: Rx, peer_tx: Tx, } @@ -37,11 +37,11 @@ impl Shared { } impl Client { - async fn new(id: Bytes, state: State, messages: MessageStream) -> Result { + async fn new(id: Bytes, state: State, socket: TcpStream) -> Result { let (tx, rx) = mpsc::unbounded_channel(); let mut shared = state.lock().await; let client = Client { - messages, + socket, rx, peer_tx: shared.handshake_cache.remove(&id), }; @@ -49,11 +49,7 @@ impl Client { Ok(client) } - async fn upgrade( - client: Client, - state: State, - handshake_payload: HandshakePayload, - ) -> Result { + async fn upgrade(client: Client, state: State, handshake: Handshake) -> Result { let mut client = client; let peer_tx = match client.peer_tx { // Receiver - already stapled at creation @@ -61,28 +57,21 @@ impl Client { // Sender - needs to wait for the incoming msg to look up peer_tx None => { tokio::select! { + // Client reads handshake message sent over channel Some(msg) = client.rx.recv() => { - client.messages.send(msg).await? - } - result = client.messages.next() => match result { - Some(_) => return Err(anyhow!("Client sending more messages before handshake complete")), - None => return Err(anyhow!("Connection interrupted")), + // Writes parnter handshake message over wire + client.socket.write_all(&msg[..]).await? } } - match state - .lock() - .await - .handshake_cache - .remove(&handshake_payload.id) - { + match state.lock().await.handshake_cache.remove(&handshake.id) { Some(peer_tx) => peer_tx, None => return Err(anyhow!("Connection not stapled")), } } }; - peer_tx.send(Message::HandshakeMessage(handshake_payload))?; + peer_tx.send(handshake.outbound_msg)?; Ok(StapledClient { - messages: client.messages, + socket: client.socket, rx: client.rx, peer_tx, }) @@ -109,26 +98,14 @@ pub async fn serve() -> Result<()> { pub async fn handle_connection( state: Arc>, socket: TcpStream, - addr: SocketAddr, + _addr: SocketAddr, ) -> Result<()> { - let mut stream = Message::to_stream(socket); - println!("server - new conn from {:?}", addr); - let handshake_payload = match stream.next().await { - Some(Ok(Message::HandshakeMessage(payload))) => payload, - Some(Ok(_)) => { - stream - .send(Message::ErrorMessage(RuckError::NotHandshake)) - .await?; - return Ok(()); - } - _ => { - println!("No first message"); - return Ok(()); - } - }; - let id = handshake_payload.id.clone(); - let client = Client::new(id.clone(), state.clone(), stream).await?; - let mut client = match Client::upgrade(client, state.clone(), handshake_payload).await { + socket.readable().await?; + let (handshake, socket) = Handshake::from_socket(socket).await?; + let id = handshake.id.clone(); + let client = Client::new(id.clone(), state.clone(), socket).await?; + println!("Client created"); + let mut client = match Client::upgrade(client, state.clone(), handshake).await { Ok(client) => client, Err(err) => { // Clear handshake cache if staple is unsuccessful @@ -136,22 +113,30 @@ pub async fn handle_connection( return Err(err); } }; + println!("Client upgraded"); // The handshake cache should be empty for {id} at this point. + let mut client_buffer = BytesMut::with_capacity(PER_CLIENT_BUFFER); loop { tokio::select! { Some(msg) = client.rx.recv() => { - client.messages.send(msg).await? + // println!("piping bytes= {:?}", msg); + client.socket.write_all(&msg[..]).await? } - result = client.messages.next() => match result { - Some(Ok(msg)) => { - client.peer_tx.send(msg)? - } - Some(Err(e)) => { + result = client.socket.read_buf(&mut client_buffer) => match result { + Ok(0) => { + break; + }, + Ok(n) => { + let b = BytesMut::from(&client_buffer[0..n]).freeze(); + client.peer_tx.send(b)?; + client_buffer.clear(); + }, + Err(e) => { println!("Error {:?}", e); } - None => break, } } } + println!("done with client"); Ok(()) } diff --git a/src/ui.rs b/src/ui.rs new file mode 100644 index 0000000..9b94a41 --- /dev/null +++ b/src/ui.rs @@ -0,0 +1,46 @@ +use crate::file::{to_size_string, FileOffer}; + +use futures::prelude::*; + +use tokio::io::{self}; + +use tokio_util::codec::{FramedRead, LinesCodec}; + +pub async fn prompt_user_for_file_confirmation(file_offers: Vec) -> Vec { + let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new()); + let mut files = vec![]; + for file_offer in file_offers.into_iter() { + let mut reply = prompt_user_input(&mut stdin, &file_offer).await; + while reply.is_none() { + reply = prompt_user_input(&mut stdin, &file_offer).await; + } + match reply { + Some(true) => files.push(file_offer), + _ => {} + } + } + files +} + +pub async fn prompt_user_input( + stdin: &mut FramedRead, + file_offer: &FileOffer, +) -> Option { + let prompt_name = file_offer.path.file_name().unwrap(); + println!( + "Accept {:?}? ({:?}). (Y/n)", + prompt_name, + to_size_string(file_offer.size) + ); + match stdin.next().await { + Some(Ok(line)) => match line.as_str() { + "" | "Y" | "y" | "yes" | "Yes" | "YES" => Some(true), + "N" | "n" | "NO" | "no" | "No" => Some(false), + _ => { + println!("Invalid input. Please enter one of the following characters: [YyNn]"); + return None; + } + }, + _ => None, + } +}