diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..020b720 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,32 @@ +FROM rust:1.58 as build + +# create a new empty shell project +RUN USER=root cargo new --bin ruck +WORKDIR /ruck + +# copy over your manifests +COPY ./Cargo.lock ./Cargo.lock +COPY ./Cargo.toml ./Cargo.toml + +# this build step will cache your dependencies +RUN cargo build --release + +# copy your source tree +RUN rm src/*.rs +COPY ./src ./src + +# build for release +RUN rm ./target/release/deps/ruck* +RUN cargo build --release + +# Copy the binary into a new container for a smaller docker image +FROM debian:buster-slim + +COPY --from=build /ruck/target/release/ruck / +USER root + +ENV RUST_LOG=info +ENV RUST_BACKTRACE=full + +CMD ["/ruck", "relay"] + diff --git a/README.md b/README.md index d556140..1465e2c 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,19 @@ - https://kerkour.com/rust-symmetric-encryption-aead-benchmark/ - https://docs.rs/aes-gcm/latest/aes_gcm/ - - https://github.com/rust-lang/flate2-rs - https://crates.io/crates/async-compression + +- https://pdos.csail.mit.edu/papers/chord:sigcomm01/chord_sigcomm.pdf +- https://pdos.csail.mit.edu/6.824/schedule.html +- https://flylib.com/books/en/2.292.1.105/1/ +- https://en.wikipedia.org/wiki/Two_Generals%27_Problem + +- https://kobzol.github.io/rust/ci/2021/05/07/building-rust-binaries-in-ci-that-work-with-older-glibc.html + +## Todo + +- [ ] Use tracing +- [ ] Add progress bars +- [ ] Exit happily when transfer is complete +- [ ] Compress files diff --git a/scripts/build.sh b/scripts/build.sh new file mode 100755 index 0000000..dfceb4f --- /dev/null +++ b/scripts/build.sh @@ -0,0 +1 @@ +docker build -t ruck . \ No newline at end of file diff --git a/scripts/run.sh b/scripts/run.sh new file mode 100755 index 0000000..5459a22 --- /dev/null +++ b/scripts/run.sh @@ -0,0 +1 @@ +docker run -p 8080:3030 --rm --name ruck1 ruck \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index ef63dd5..60c4da0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,9 @@ +use crate::conf::BUFFER_SIZE; use crate::crypto::handshake; use crate::file::{to_size_string, FileHandle, FileInfo}; -use crate::message::{EncryptedMessage, FileNegotiationPayload, Message, MessageStream}; +use crate::message::{ + EncryptedMessage, FileNegotiationPayload, FileTransferPayload, Message, MessageStream, +}; use aes_gcm::Aes256Gcm; use anyhow::{anyhow, Result}; @@ -8,9 +11,13 @@ use blake2::{Blake2s256, Digest}; use bytes::{Bytes, BytesMut}; use futures::future::try_join_all; use futures::prelude::*; +use std::collections::HashMap; +use std::ffi::OsStr; use std::path::PathBuf; -use tokio::io; +use tokio::fs::File; +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio_util::codec::{FramedRead, LinesCodec}; fn pass_to_bytes(password: &String) -> Bytes { @@ -31,7 +38,6 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Complete handshake, returning cipher used for encryption let (stream, cipher) = handshake( &mut stream, - true, Bytes::from(password.to_string()), pass_to_bytes(password), ) @@ -41,6 +47,7 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { let handles = negotiate_files_up(handles, stream, &cipher).await?; // Upload negotiated files + upload_encrypted_files(stream, handles, &cipher).await?; // Exit Ok(()) @@ -51,13 +58,14 @@ pub async fn receive(password: &String) -> Result<()> { let mut stream = Message::to_stream(socket); let (stream, cipher) = handshake( &mut stream, - false, Bytes::from(password.to_string()), pass_to_bytes(password), ) .await?; let files = negotiate_files_down(stream, &cipher).await?; + + download_files(files, stream, &cipher).await?; return Ok(()); } @@ -100,7 +108,10 @@ pub async fn negotiate_files_up( .collect()) } -pub async fn negotiate_files_down(stream: &mut MessageStream, cipher: &Aes256Gcm) -> Result<()> { +pub async fn negotiate_files_down( + stream: &mut MessageStream, + cipher: &Aes256Gcm, +) -> Result> { let file_offer = match stream.next().await { Some(Ok(msg)) => match msg { Message::EncryptedMessage(response) => response, @@ -117,28 +128,147 @@ pub async fn negotiate_files_down(stream: &mut MessageStream, cipher: &Aes256Gcm }; let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new()); let mut files = vec![]; - for path in requested_infos.into_iter() { - let mut reply = prompt_user_input(&mut stdin, &path).await; + for file_info in requested_infos.into_iter() { + let mut reply = prompt_user_input(&mut stdin, &file_info).await; while reply.is_none() { - reply = prompt_user_input(&mut stdin, &path).await; + reply = prompt_user_input(&mut stdin, &file_info).await; } match reply { - Some(true) => files.push(path), + Some(true) => files.push(file_info), _ => {} } } - let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { files }); + let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { + files: files.clone(), + }); let server_msg = msg.to_encrypted_message(cipher)?; stream.send(server_msg).await?; - Ok(()) + Ok(files) } pub async fn upload_encrypted_files( stream: &mut MessageStream, - file_paths: &Vec, - key: Bytes, + handles: Vec, + cipher: &Aes256Gcm, ) -> Result<()> { - return Ok(()); + let (tx, mut rx) = mpsc::unbounded_channel::(); + for mut handle in handles { + let txc = tx.clone(); + tokio::spawn(async move { + let _ = enqueue_file_chunks(&mut handle, txc).await; + }); + } + + loop { + tokio::select! { + Some(msg) = rx.recv() => { + // println!("message received to client.rx {:?}", msg); + let x = msg.to_encrypted_message(cipher)?; + stream.send(x).await? + } + else => { + println!("breaking"); + break + }, + } + } + Ok(()) +} + +pub async fn enqueue_file_chunks( + fh: &mut FileHandle, + tx: mpsc::UnboundedSender, +) -> Result<()> { + let mut chunk_num = 0; + let mut bytes_read = 1; + while bytes_read != 0 { + let mut buf = BytesMut::with_capacity(BUFFER_SIZE); + bytes_read = fh.file.read_buf(&mut buf).await?; + // println!("Bytes_read: {:?}, The bytes: {:?}", bytes_read, &buf[..]); + if bytes_read != 0 { + let chunk = buf.freeze(); + let file_info = fh.to_file_info(); + let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload { + chunk, + chunk_num, + file_info, + }); + tx.send(ftp)?; + chunk_num += 1; + } + } + + Ok(()) +} + +pub async fn download_files( + file_infos: Vec, + stream: &mut MessageStream, + cipher: &Aes256Gcm, +) -> Result<()> { + // for each file_info + let mut info_handles: HashMap> = HashMap::new(); + for fi in file_infos { + let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>(); + let path = fi.path.clone(); + tokio::spawn(async move { download_file(fi, rx).await }); + info_handles.insert(path, tx); + } + loop { + tokio::select! { + result = stream.next() => match result { + Some(Ok(Message::EncryptedMessage(payload))) => { + let ec = EncryptedMessage::from_encrypted_message(cipher, &payload)?; + // println!("encrypted message received! {:?}", ec); + match ec { + EncryptedMessage::FileTransferMessage(payload) => { + println!("matched file transfer message"); + if let Some(tx) = info_handles.get(&payload.file_info.path) { + println!("matched on filetype, sending to tx"); + tx.send((payload.chunk_num, payload.chunk))? + }; + }, + _ => {println!("wrong msg")} + } + } + Some(Ok(_)) => { + println!("wrong msg"); + } + Some(Err(e)) => { + println!("Error {:?}", e); + } + None => break, + } + } + } + Ok(()) +} + +pub async fn download_file( + file_info: FileInfo, + rx: mpsc::UnboundedReceiver<(u64, Bytes)>, +) -> Result<()> { + println!("in download file"); + let mut rx = rx; + let filename = match file_info.path.file_name() { + Some(f) => { + println!("matched filename"); + f + } + None => { + println!("didnt match filename"); + OsStr::new("random.txt") + } + }; + println!("trying to create file...filename={:?}", filename); + let mut file = File::create(filename).await?; + println!("file created ok! filename={:?}", filename); + while let Some((_chunk_num, chunk)) = rx.recv().await { + // println!("rx got message! chunk={:?}", chunk); + file.write_all(&chunk).await?; + } + println!("done receiving messages"); + Ok(()) } pub async fn prompt_user_input( @@ -147,7 +277,7 @@ pub async fn prompt_user_input( ) -> Option { let prompt_name = file_info.path.file_name().unwrap(); println!( - "Do you want to download {:?}? It's {:?}. (Y/n)", + "Accept {:?}? ({:?}). (Y/n)", prompt_name, to_size_string(file_info.size) ); diff --git a/src/conf.rs b/src/conf.rs new file mode 100644 index 0000000..07a6f99 --- /dev/null +++ b/src/conf.rs @@ -0,0 +1 @@ +pub const BUFFER_SIZE: usize = 1024 * 1024; diff --git a/src/crypto.rs b/src/crypto.rs index 789175a..43cf8ef 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -10,7 +10,6 @@ use spake2::{Ed25519Group, Identity, Password, Spake2}; pub async fn handshake( stream: &mut MessageStream, - up: bool, password: Bytes, id: Bytes, ) -> Result<(&mut MessageStream, Aes256Gcm)> { @@ -18,11 +17,10 @@ pub async fn handshake( Spake2::::start_symmetric(&Password::new(password), &Identity::new(&id)); println!("client - sending handshake msg"); let handshake_msg = Message::HandshakeMessage(HandshakePayload { - up, id, msg: Bytes::from(outbound_msg), }); - println!("client - handshake msg, {:?}", handshake_msg); + // println!("client - handshake msg, {:?}", handshake_msg); stream.send(handshake_msg).await?; let first_message = match stream.next().await { Some(Ok(msg)) => match msg { @@ -38,7 +36,7 @@ pub async fn handshake( Ok(key_bytes) => key_bytes, Err(e) => return Err(anyhow!(e.to_string())), }; - println!("Handshake successful. Key is {:?}", key); + // println!("Handshake successful. Key is {:?}", key); return Ok((stream, new_cipher(&key))); } diff --git a/src/file.rs b/src/file.rs index 79f6c02..3e03c82 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, Result}; +use anyhow::Result; use serde::{Deserialize, Serialize}; use std::fs::Metadata; use std::path::PathBuf; diff --git a/src/main.rs b/src/main.rs index d79940f..65c2518 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,11 @@ mod cli; mod client; +mod conf; mod crypto; mod file; mod message; mod server; -#[cfg(test)] -mod tests; - use clap::Parser; use cli::{Cli, Commands}; use client::{receive, send}; diff --git a/src/message.rs b/src/message.rs index 769d015..10593a2 100644 --- a/src/message.rs +++ b/src/message.rs @@ -3,7 +3,6 @@ use crate::file::FileInfo; use aes_gcm::Aes256Gcm; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; -use bincode::config; use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::error::Error; @@ -21,7 +20,6 @@ pub enum Message { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct HandshakePayload { - pub up: bool, pub id: Bytes, pub msg: Bytes, } @@ -35,6 +33,7 @@ pub struct EncryptedPayload { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum EncryptedMessage { FileNegotiationMessage(FileNegotiationPayload), + FileTransferMessage(FileTransferPayload), } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -42,6 +41,13 @@ pub struct FileNegotiationPayload { pub files: Vec, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct FileTransferPayload { + pub file_info: FileInfo, + pub chunk_num: u64, + pub chunk: Bytes, +} + impl EncryptedMessage { pub fn from_encrypted_message(cipher: &Aes256Gcm, payload: &EncryptedPayload) -> Result { let raw = decrypt(cipher, payload)?; diff --git a/src/server.rs b/src/server.rs index cf155b0..759e154 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,115 +7,85 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::{mpsc, Mutex}; type Tx = mpsc::UnboundedSender; type Rx = mpsc::UnboundedReceiver; pub struct Shared { - handshakes: HashMap, - senders: HashMap, - receivers: HashMap, + handshake_cache: HashMap, } type State = Arc>; -struct Client<'a> { - up: bool, - id: Bytes, - messages: &'a mut MessageStream, +struct Client { + messages: MessageStream, rx: Rx, + peer_tx: Option, +} +struct StapledClient { + messages: MessageStream, + rx: Rx, + peer_tx: Tx, } impl Shared { fn new() -> Self { Shared { - handshakes: HashMap::new(), - senders: HashMap::new(), - receivers: HashMap::new(), + handshake_cache: HashMap::new(), } } - async fn relay<'a>(&self, client: &Client<'a>, message: Message) -> Result<()> { - println!("in relay - got client={:?}, msg {:?}", client.id, message); - match client.up { - true => match self.receivers.get(&client.id) { - Some(tx) => { - tx.send(message)?; - } - None => { - return Err(anyhow!(RuckError::PairDisconnected)); - } - }, - false => match self.senders.get(&client.id) { - Some(tx) => { - tx.send(message)?; - } - None => { - return Err(anyhow!(RuckError::PairDisconnected)); - } - }, - } - Ok(()) - } } -impl<'a> Client<'a> { - async fn new( - up: bool, - id: Bytes, - state: State, - messages: &'a mut MessageStream, - ) -> Result> { +impl Client { + async fn new(id: Bytes, state: State, messages: MessageStream) -> Result { let (tx, rx) = mpsc::unbounded_channel(); - println!("server - creating client up={:?}, id={:?}", up, id); - let shared = &mut state.lock().await; - match shared.senders.get(&id) { - Some(_) if up => { - messages - .send(Message::ErrorMessage(RuckError::SenderAlreadyConnected)) - .await?; - } - Some(_) => { - println!("server - adding client to receivers"); - shared.receivers.insert(id.clone(), tx); - } - None if up => { - println!("server - adding client to senders"); - shared.senders.insert(id.clone(), tx); - } - None => { - messages - .send(Message::ErrorMessage(RuckError::SenderNotConnected)) - .await?; - } - } - Ok(Client { - up, - id, + let mut shared = state.lock().await; + let client = Client { messages, rx, - }) + peer_tx: shared.handshake_cache.remove(&id), + }; + shared.handshake_cache.insert(id, tx); + Ok(client) } - async fn complete_handshake(&mut self, state: State, msg: Message) -> Result<()> { - match self.up { - true => { - let (tx, rx) = mpsc::unbounded_channel(); - tx.send(msg)?; - state.lock().await.handshakes.insert(self.id.clone(), rx); - } - false => { - let shared = &mut state.lock().await; - if let Some(tx) = shared.senders.get(&self.id) { - tx.send(msg)?; - } - if let Some(mut rx) = shared.handshakes.remove(&self.id) { - drop(shared); - if let Some(msg) = rx.recv().await { - self.messages.send(msg).await?; + + async fn upgrade( + client: Client, + state: State, + handshake_payload: HandshakePayload, + ) -> Result { + let mut client = client; + let peer_tx = match client.peer_tx { + // Receiver - already stapled at creation + Some(peer_tx) => peer_tx, + // Sender - needs to wait for the incoming msg to look up peer_tx + None => { + tokio::select! { + Some(msg) = client.rx.recv() => { + client.messages.send(msg).await? + } + result = client.messages.next() => match result { + Some(_) => return Err(anyhow!("Client sending more messages before handshake complete")), + None => return Err(anyhow!("Connection interrupted")), } } + match state + .lock() + .await + .handshake_cache + .remove(&handshake_payload.id) + { + Some(peer_tx) => peer_tx, + None => return Err(anyhow!("Connection not stapled")), + } } - } - Ok(()) + }; + peer_tx.send(Message::HandshakeMessage(handshake_payload))?; + Ok(StapledClient { + messages: client.messages, + rx: client.rx, + peer_tx, + }) } } @@ -156,29 +126,25 @@ pub async fn handle_connection( return Ok(()); } }; - println!("server - received msg from {:?}", addr); - let mut client = Client::new( - handshake_payload.up, - handshake_payload.id.clone(), - state.clone(), - &mut stream, - ) - .await?; - client - .complete_handshake(state.clone(), Message::HandshakeMessage(handshake_payload)) - .await?; - // add client to state here + let id = handshake_payload.id.clone(); + let client = Client::new(id.clone(), state.clone(), stream).await?; + let mut client = match Client::upgrade(client, state.clone(), handshake_payload).await { + Ok(client) => client, + Err(err) => { + // Clear handshake cache if staple is unsuccessful + state.lock().await.handshake_cache.remove(&id); + return Err(err); + } + }; + // The handshake cache should be empty for {id} at this point. loop { tokio::select! { Some(msg) = client.rx.recv() => { - println!("message received to client.rx {:?}", msg); client.messages.send(msg).await? } result = client.messages.next() => match result { Some(Ok(msg)) => { - println!("GOT: {:?}", msg); - let state = state.lock().await; - state.relay(&client, msg).await?; + client.peer_tx.send(msg)? } Some(Err(e)) => { println!("Error {:?}", e); @@ -187,6 +153,5 @@ pub async fn handle_connection( } } } - // client is disconnected, let's remove them from the state Ok(()) } diff --git a/src/tests.rs b/src/tests.rs deleted file mode 100644 index 50e2f2d..0000000 --- a/src/tests.rs +++ /dev/null @@ -1,11 +0,0 @@ -use crate::crypto::new_cipher; -use crate::file::FileHandle; - -use std::path::PathBuf; - -#[tokio::test] -async fn test_file_handle_nonexistent_file() { - let pb = PathBuf::new(); - let fh = FileHandle::new(pb).await; - assert!(fh.is_err()); -}