From 7befc61ab34a1103267de152d94a07354cea144f Mon Sep 17 00:00:00 2001 From: rictorlome Date: Sun, 13 Feb 2022 17:45:58 -0500 Subject: [PATCH 1/7] Scaffold for concurrent file reads --- README.md | 6 +++++ src/client.rs | 61 +++++++++++++++++++++++++++++++++++++++++++++----- src/file.rs | 2 ++ src/message.rs | 8 +++++++ 4 files changed, 72 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index d556140..e68c617 100644 --- a/README.md +++ b/README.md @@ -15,3 +15,9 @@ - https://github.com/rust-lang/flate2-rs - https://crates.io/crates/async-compression + + +- https://pdos.csail.mit.edu/papers/chord:sigcomm01/chord_sigcomm.pdf +- https://pdos.csail.mit.edu/6.824/schedule.html +- https://flylib.com/books/en/2.292.1.105/1/ +- https://en.wikipedia.org/wiki/Two_Generals%27_Problem \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index ef63dd5..74988b0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,8 @@ use crate::crypto::handshake; use crate::file::{to_size_string, FileHandle, FileInfo}; -use crate::message::{EncryptedMessage, FileNegotiationPayload, Message, MessageStream}; +use crate::message::{ + EncryptedMessage, FileNegotiationPayload, FileTransferPayload, Message, MessageStream, +}; use aes_gcm::Aes256Gcm; use anyhow::{anyhow, Result}; @@ -8,9 +10,13 @@ use blake2::{Blake2s256, Digest}; use bytes::{Bytes, BytesMut}; use futures::future::try_join_all; use futures::prelude::*; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use std::path::PathBuf; -use tokio::io; +use std::pin::Pin; +use tokio::io::{self, AsyncReadExt}; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio_util::codec::{FramedRead, LinesCodec}; fn pass_to_bytes(password: &String) -> Bytes { @@ -135,10 +141,55 @@ pub async fn negotiate_files_down(stream: &mut MessageStream, cipher: &Aes256Gcm pub async fn upload_encrypted_files( stream: &mut MessageStream, - file_paths: &Vec, - key: Bytes, + handles: Vec, + cipher: &Aes256Gcm, ) -> Result<()> { - return Ok(()); + let (tx, mut rx) = mpsc::unbounded_channel::(); + //turn foo into something more concrete + for mut handle in handles { + let txc = tx.clone(); + tokio::spawn(async move { + let _ = enqueue_file_chunks(&mut handle, txc).await; + }); + } + + loop { + tokio::select! { + Some(msg) = rx.recv() => { + println!("message received to client.rx {:?}", msg); + let x = msg.to_encrypted_message(cipher)?; + stream.send(x).await? + } + else => break, + } + } + Ok(()) +} +const BUFFER_SIZE: usize = 1024 * 64; +pub async fn enqueue_file_chunks( + fh: &mut FileHandle, + tx: mpsc::UnboundedSender, +) -> Result<()> { + // let mut buf = BytesMut::with_capacity(BUFFER_SIZE); + + // // The `read` method is defined by this trait. + // let mut chunk_num = 0; + // while { + // let n = fh.file.read(&mut buf[..]).await?; + // n == 0 + // } { + // let chunk = buf.freeze(); + // let file_info = fh.to_file_info(); + // let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload { + // chunk, + // chunk_num, + // file_info, + // }); + // tx.send(ftp); + // chunk_num += 1; + // } + + Ok(()) } pub async fn prompt_user_input( diff --git a/src/file.rs b/src/file.rs index 79f6c02..362c3f2 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,3 +1,5 @@ +use crate::message::FileTransferPayload; + use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; use std::fs::Metadata; diff --git a/src/message.rs b/src/message.rs index 769d015..5819978 100644 --- a/src/message.rs +++ b/src/message.rs @@ -35,6 +35,7 @@ pub struct EncryptedPayload { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum EncryptedMessage { FileNegotiationMessage(FileNegotiationPayload), + FileTransferMessage(FileTransferPayload), } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -42,6 +43,13 @@ pub struct FileNegotiationPayload { pub files: Vec, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct FileTransferPayload { + pub file_info: FileInfo, + pub chunk_num: u64, + pub chunk: Bytes, +} + impl EncryptedMessage { pub fn from_encrypted_message(cipher: &Aes256Gcm, payload: &EncryptedPayload) -> Result { let raw = decrypt(cipher, payload)?; From e701d69946991cdecd9ad4f2cd3214e874fb8408 Mon Sep 17 00:00:00 2001 From: rictorlome Date: Sun, 13 Feb 2022 22:02:47 -0500 Subject: [PATCH 2/7] Simplify connection stapling --- src/client.rs | 2 - src/crypto.rs | 2 - src/message.rs | 1 - src/server.rs | 149 +++++++++++++++++-------------------------------- 4 files changed, 52 insertions(+), 102 deletions(-) diff --git a/src/client.rs b/src/client.rs index 74988b0..35355ee 100644 --- a/src/client.rs +++ b/src/client.rs @@ -37,7 +37,6 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Complete handshake, returning cipher used for encryption let (stream, cipher) = handshake( &mut stream, - true, Bytes::from(password.to_string()), pass_to_bytes(password), ) @@ -57,7 +56,6 @@ pub async fn receive(password: &String) -> Result<()> { let mut stream = Message::to_stream(socket); let (stream, cipher) = handshake( &mut stream, - false, Bytes::from(password.to_string()), pass_to_bytes(password), ) diff --git a/src/crypto.rs b/src/crypto.rs index 789175a..7074718 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -10,7 +10,6 @@ use spake2::{Ed25519Group, Identity, Password, Spake2}; pub async fn handshake( stream: &mut MessageStream, - up: bool, password: Bytes, id: Bytes, ) -> Result<(&mut MessageStream, Aes256Gcm)> { @@ -18,7 +17,6 @@ pub async fn handshake( Spake2::::start_symmetric(&Password::new(password), &Identity::new(&id)); println!("client - sending handshake msg"); let handshake_msg = Message::HandshakeMessage(HandshakePayload { - up, id, msg: Bytes::from(outbound_msg), }); diff --git a/src/message.rs b/src/message.rs index 5819978..10c440f 100644 --- a/src/message.rs +++ b/src/message.rs @@ -21,7 +21,6 @@ pub enum Message { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct HandshakePayload { - pub up: bool, pub id: Bytes, pub msg: Bytes, } diff --git a/src/server.rs b/src/server.rs index cf155b0..8da91be 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,109 +13,74 @@ type Tx = mpsc::UnboundedSender; type Rx = mpsc::UnboundedReceiver; pub struct Shared { - handshakes: HashMap, - senders: HashMap, - receivers: HashMap, + handshake_cache: HashMap, } type State = Arc>; -struct Client<'a> { - up: bool, - id: Bytes, - messages: &'a mut MessageStream, +struct Client { + messages: MessageStream, rx: Rx, + peer_tx: Option, +} +struct StapledClient { + messages: MessageStream, + rx: Rx, + peer_tx: Tx, } impl Shared { fn new() -> Self { Shared { - handshakes: HashMap::new(), - senders: HashMap::new(), - receivers: HashMap::new(), + handshake_cache: HashMap::new(), } } - async fn relay<'a>(&self, client: &Client<'a>, message: Message) -> Result<()> { - println!("in relay - got client={:?}, msg {:?}", client.id, message); - match client.up { - true => match self.receivers.get(&client.id) { - Some(tx) => { - tx.send(message)?; - } - None => { - return Err(anyhow!(RuckError::PairDisconnected)); - } - }, - false => match self.senders.get(&client.id) { - Some(tx) => { - tx.send(message)?; - } - None => { - return Err(anyhow!(RuckError::PairDisconnected)); - } - }, - } - Ok(()) - } } -impl<'a> Client<'a> { - async fn new( - up: bool, - id: Bytes, - state: State, - messages: &'a mut MessageStream, - ) -> Result> { +impl Client { + async fn new(id: Bytes, state: State, messages: MessageStream) -> Result { let (tx, rx) = mpsc::unbounded_channel(); - println!("server - creating client up={:?}, id={:?}", up, id); - let shared = &mut state.lock().await; - match shared.senders.get(&id) { - Some(_) if up => { - messages - .send(Message::ErrorMessage(RuckError::SenderAlreadyConnected)) - .await?; - } - Some(_) => { - println!("server - adding client to receivers"); - shared.receivers.insert(id.clone(), tx); - } - None if up => { - println!("server - adding client to senders"); - shared.senders.insert(id.clone(), tx); - } - None => { - messages - .send(Message::ErrorMessage(RuckError::SenderNotConnected)) - .await?; - } - } - Ok(Client { - up, - id, + let mut shared = state.lock().await; + let client = Client { messages, rx, - }) + peer_tx: shared.handshake_cache.remove(&id), + }; + shared.handshake_cache.insert(id, tx); + Ok(client) } - async fn complete_handshake(&mut self, state: State, msg: Message) -> Result<()> { - match self.up { - true => { - let (tx, rx) = mpsc::unbounded_channel(); - tx.send(msg)?; - state.lock().await.handshakes.insert(self.id.clone(), rx); - } - false => { - let shared = &mut state.lock().await; - if let Some(tx) = shared.senders.get(&self.id) { - tx.send(msg)?; - } - if let Some(mut rx) = shared.handshakes.remove(&self.id) { - drop(shared); - if let Some(msg) = rx.recv().await { - self.messages.send(msg).await?; - } + + async fn upgrade( + client: Client, + state: State, + handshake_payload: HandshakePayload, + ) -> Result { + let mut client = client; + let peer_tx = match client.peer_tx { + // Receiver - already stapled at creation + Some(peer_tx) => peer_tx, + // Sender - needs to wait for the incoming msg to look up peer_tx + None => { + match client.rx.recv().await { + Some(msg) => client.messages.send(msg).await?, + None => return Err(anyhow!("Connection not stapled")), + }; + match state + .lock() + .await + .handshake_cache + .remove(&handshake_payload.id) + { + Some(peer_tx) => peer_tx, + None => return Err(anyhow!("Connection not stapled")), } } - } - Ok(()) + }; + peer_tx.send(Message::HandshakeMessage(handshake_payload))?; + Ok(StapledClient { + messages: client.messages, + rx: client.rx, + peer_tx, + }) } } @@ -157,16 +122,8 @@ pub async fn handle_connection( } }; println!("server - received msg from {:?}", addr); - let mut client = Client::new( - handshake_payload.up, - handshake_payload.id.clone(), - state.clone(), - &mut stream, - ) - .await?; - client - .complete_handshake(state.clone(), Message::HandshakeMessage(handshake_payload)) - .await?; + let client = Client::new(handshake_payload.id.clone(), state.clone(), stream).await?; + let mut client = Client::upgrade(client, state.clone(), handshake_payload).await?; // add client to state here loop { tokio::select! { @@ -176,9 +133,7 @@ pub async fn handle_connection( } result = client.messages.next() => match result { Some(Ok(msg)) => { - println!("GOT: {:?}", msg); - let state = state.lock().await; - state.relay(&client, msg).await?; + client.peer_tx.send(msg)? } Some(Err(e)) => { println!("Error {:?}", e); From 92067817911a0ff08d8de0909e46d2cabfef9bfe Mon Sep 17 00:00:00 2001 From: rictorlome Date: Mon, 14 Feb 2022 23:01:30 -0500 Subject: [PATCH 3/7] Broken sending prototype --- src/client.rs | 122 ++++++++++++++++++++++++++++++++++++++------------ src/server.rs | 2 - 2 files changed, 94 insertions(+), 30 deletions(-) diff --git a/src/client.rs b/src/client.rs index 35355ee..f6db18c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,9 +12,11 @@ use futures::future::try_join_all; use futures::prelude::*; use futures::stream::FuturesUnordered; use futures::StreamExt; +use std::collections::HashMap; +use std::ffi::OsStr; use std::path::PathBuf; -use std::pin::Pin; -use tokio::io::{self, AsyncReadExt}; +use tokio::fs::File; +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio_util::codec::{FramedRead, LinesCodec}; @@ -46,6 +48,7 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { let handles = negotiate_files_up(handles, stream, &cipher).await?; // Upload negotiated files + upload_encrypted_files(stream, handles, &cipher).await?; // Exit Ok(()) @@ -62,6 +65,8 @@ pub async fn receive(password: &String) -> Result<()> { .await?; let files = negotiate_files_down(stream, &cipher).await?; + + download_files(files, stream, &cipher).await?; return Ok(()); } @@ -104,7 +109,10 @@ pub async fn negotiate_files_up( .collect()) } -pub async fn negotiate_files_down(stream: &mut MessageStream, cipher: &Aes256Gcm) -> Result<()> { +pub async fn negotiate_files_down( + stream: &mut MessageStream, + cipher: &Aes256Gcm, +) -> Result<(Vec)> { let file_offer = match stream.next().await { Some(Ok(msg)) => match msg { Message::EncryptedMessage(response) => response, @@ -121,20 +129,22 @@ pub async fn negotiate_files_down(stream: &mut MessageStream, cipher: &Aes256Gcm }; let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new()); let mut files = vec![]; - for path in requested_infos.into_iter() { - let mut reply = prompt_user_input(&mut stdin, &path).await; + for file_info in requested_infos.into_iter() { + let mut reply = prompt_user_input(&mut stdin, &file_info).await; while reply.is_none() { - reply = prompt_user_input(&mut stdin, &path).await; + reply = prompt_user_input(&mut stdin, &file_info).await; } match reply { - Some(true) => files.push(path), + Some(true) => files.push(file_info), _ => {} } } - let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { files }); + let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { + files: files.clone(), + }); let server_msg = msg.to_encrypted_message(cipher)?; stream.send(server_msg).await?; - Ok(()) + Ok(files) } pub async fn upload_encrypted_files( @@ -143,7 +153,6 @@ pub async fn upload_encrypted_files( cipher: &Aes256Gcm, ) -> Result<()> { let (tx, mut rx) = mpsc::unbounded_channel::(); - //turn foo into something more concrete for mut handle in handles { let txc = tx.clone(); tokio::spawn(async move { @@ -168,25 +177,82 @@ pub async fn enqueue_file_chunks( fh: &mut FileHandle, tx: mpsc::UnboundedSender, ) -> Result<()> { - // let mut buf = BytesMut::with_capacity(BUFFER_SIZE); + let mut chunk_num = 0; + let mut buf: BytesMut; + while { + buf = BytesMut::with_capacity(BUFFER_SIZE); + let n = fh.file.read_exact(&mut buf[..]).await?; + n == 0 + } { + let chunk = buf.freeze(); + let file_info = fh.to_file_info(); + let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload { + chunk, + chunk_num, + file_info, + }); + tx.send(ftp)?; + chunk_num += 1; + } - // // The `read` method is defined by this trait. - // let mut chunk_num = 0; - // while { - // let n = fh.file.read(&mut buf[..]).await?; - // n == 0 - // } { - // let chunk = buf.freeze(); - // let file_info = fh.to_file_info(); - // let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload { - // chunk, - // chunk_num, - // file_info, - // }); - // tx.send(ftp); - // chunk_num += 1; - // } + Ok(()) +} +pub async fn download_files( + file_infos: Vec, + stream: &mut MessageStream, + cipher: &Aes256Gcm, +) -> Result<()> { + // for each file_info + let info_handles: HashMap<_, _> = file_infos + .into_iter() + .map(|fi| { + let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>(); + let path = fi.path.clone(); + tokio::spawn(async move { download_file(fi, rx).await }); + (path, tx) + }) + .collect(); + loop { + tokio::select! { + result = stream.next() => match result { + Some(Ok(Message::EncryptedMessage(payload))) => { + let ec = EncryptedMessage::from_encrypted_message(cipher, &payload)?; + match ec { + EncryptedMessage::FileTransferMessage(payload) => { + if let Some(tx) = info_handles.get(&payload.file_info.path) { + tx.send((payload.chunk_num, payload.chunk))? + }; + }, + _ => {println!("wrong msg")} + } + } + Some(Ok(_)) => { + println!("wrong msg"); + } + Some(Err(e)) => { + println!("Error {:?}", e); + } + None => break, + } + } + } + Ok(()) +} + +pub async fn download_file( + file_info: FileInfo, + rx: mpsc::UnboundedReceiver<(u64, Bytes)>, +) -> Result<()> { + let mut rx = rx; + let filename = match file_info.path.file_name() { + Some(f) => f, + None => OsStr::new("random.txt"), + }; + let mut file = File::open(filename).await?; + while let Some((chunk_num, chunk)) = rx.recv().await { + file.write_all(&chunk).await?; + } Ok(()) } @@ -196,7 +262,7 @@ pub async fn prompt_user_input( ) -> Option { let prompt_name = file_info.path.file_name().unwrap(); println!( - "Do you want to download {:?}? It's {:?}. (Y/n)", + "Accept {:?}? ({:?}). (Y/n)", prompt_name, to_size_string(file_info.size) ); diff --git a/src/server.rs b/src/server.rs index 8da91be..c52bc9f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -124,7 +124,6 @@ pub async fn handle_connection( println!("server - received msg from {:?}", addr); let client = Client::new(handshake_payload.id.clone(), state.clone(), stream).await?; let mut client = Client::upgrade(client, state.clone(), handshake_payload).await?; - // add client to state here loop { tokio::select! { Some(msg) = client.rx.recv() => { @@ -142,6 +141,5 @@ pub async fn handle_connection( } } } - // client is disconnected, let's remove them from the state Ok(()) } From d29015f0856a3e4a1d9189558e0714ea9671cd45 Mon Sep 17 00:00:00 2001 From: rictorlome Date: Tue, 15 Feb 2022 20:55:33 -0500 Subject: [PATCH 4/7] Fixed sending files prototype --- src/client.rs | 72 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/src/client.rs b/src/client.rs index f6db18c..91ad717 100644 --- a/src/client.rs +++ b/src/client.rs @@ -167,7 +167,10 @@ pub async fn upload_encrypted_files( let x = msg.to_encrypted_message(cipher)?; stream.send(x).await? } - else => break, + else => { + println!("breaking"); + break + }, } } Ok(()) @@ -178,21 +181,22 @@ pub async fn enqueue_file_chunks( tx: mpsc::UnboundedSender, ) -> Result<()> { let mut chunk_num = 0; - let mut buf: BytesMut; - while { - buf = BytesMut::with_capacity(BUFFER_SIZE); - let n = fh.file.read_exact(&mut buf[..]).await?; - n == 0 - } { - let chunk = buf.freeze(); - let file_info = fh.to_file_info(); - let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload { - chunk, - chunk_num, - file_info, - }); - tx.send(ftp)?; - chunk_num += 1; + let mut bytes_read = 1; + while bytes_read != 0 { + let mut buf = BytesMut::with_capacity(BUFFER_SIZE); + bytes_read = fh.file.read_buf(&mut buf).await?; + println!("Bytes_read: {:?}, The bytes: {:?}", bytes_read, &buf[..]); + if bytes_read != 0 { + let chunk = buf.freeze(); + let file_info = fh.to_file_info(); + let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload { + chunk, + chunk_num, + file_info, + }); + tx.send(ftp)?; + chunk_num += 1; + } } Ok(()) @@ -204,23 +208,24 @@ pub async fn download_files( cipher: &Aes256Gcm, ) -> Result<()> { // for each file_info - let info_handles: HashMap<_, _> = file_infos - .into_iter() - .map(|fi| { - let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>(); - let path = fi.path.clone(); - tokio::spawn(async move { download_file(fi, rx).await }); - (path, tx) - }) - .collect(); + let mut info_handles: HashMap> = HashMap::new(); + for fi in file_infos { + let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>(); + let path = fi.path.clone(); + tokio::spawn(async move { download_file(fi, rx).await }); + info_handles.insert(path, tx); + } loop { tokio::select! { result = stream.next() => match result { Some(Ok(Message::EncryptedMessage(payload))) => { let ec = EncryptedMessage::from_encrypted_message(cipher, &payload)?; + println!("encrypted message received! {:?}", ec); match ec { EncryptedMessage::FileTransferMessage(payload) => { + println!("matched file transfer message"); if let Some(tx) = info_handles.get(&payload.file_info.path) { + println!("matched on filetype, sending to tx"); tx.send((payload.chunk_num, payload.chunk))? }; }, @@ -244,15 +249,26 @@ pub async fn download_file( file_info: FileInfo, rx: mpsc::UnboundedReceiver<(u64, Bytes)>, ) -> Result<()> { + println!("in download file"); let mut rx = rx; let filename = match file_info.path.file_name() { - Some(f) => f, - None => OsStr::new("random.txt"), + Some(f) => { + println!("matched filename"); + f + } + None => { + println!("didnt match filename"); + OsStr::new("random.txt") + } }; - let mut file = File::open(filename).await?; + println!("trying to create file...filename={:?}", filename); + let mut file = File::create(filename).await?; + println!("file created ok! filename={:?}", filename); while let Some((chunk_num, chunk)) = rx.recv().await { + println!("rx got message! chunk={:?}", chunk); file.write_all(&chunk).await?; } + println!("done receiving messages"); Ok(()) } From d6e299b031a533fb2ac9d1ef4fce2baeeeded915 Mon Sep 17 00:00:00 2001 From: rictorlome Date: Wed, 16 Feb 2022 21:04:18 -0500 Subject: [PATCH 5/7] Reduce print statements; stub dockerfile --- Dockerfile | 32 ++++++++++++++++++++++++++++++++ README.md | 9 ++++++++- scripts/build.sh | 1 + scripts/run.sh | 1 + src/client.rs | 17 ++++++++--------- src/conf.rs | 1 + src/crypto.rs | 4 ++-- src/file.rs | 4 +--- src/main.rs | 4 +--- src/message.rs | 1 - src/server.rs | 6 +++--- src/tests.rs | 11 ----------- 12 files changed, 58 insertions(+), 33 deletions(-) create mode 100644 Dockerfile create mode 100755 scripts/build.sh create mode 100755 scripts/run.sh create mode 100644 src/conf.rs delete mode 100644 src/tests.rs diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..809ab65 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,32 @@ +FROM rust:1.58 as build + +# create a new empty shell project +RUN USER=root cargo new --bin ruck +WORKDIR /ruck + +# copy over your manifests +COPY ./Cargo.lock ./Cargo.lock +COPY ./Cargo.toml ./Cargo.toml + +# this build step will cache your dependencies +RUN cargo build --release + +# copy your source tree +RUN rm src/*.rs +COPY ./src ./src + +# build for release +RUN rm ./target/release/deps/ruck* +RUN cargo build --release + +# Copy the binary into a new container for a smaller docker image +FROM debian:buster-slim + +COPY --from=build ./target/release/ruck / +USER root + +ENV RUST_LOG=info +ENV RUST_BACKTRACE=full + +CMD ["/ruck", "relay"] + diff --git a/README.md b/README.md index e68c617..f4ca539 100644 --- a/README.md +++ b/README.md @@ -20,4 +20,11 @@ - https://pdos.csail.mit.edu/papers/chord:sigcomm01/chord_sigcomm.pdf - https://pdos.csail.mit.edu/6.824/schedule.html - https://flylib.com/books/en/2.292.1.105/1/ -- https://en.wikipedia.org/wiki/Two_Generals%27_Problem \ No newline at end of file +- https://en.wikipedia.org/wiki/Two_Generals%27_Problem + +## Todo + +- [ ] Use tracing +- [ ] Add progress bars +- [ ] Exit happily when transfer is complete +- [ ] Compress files \ No newline at end of file diff --git a/scripts/build.sh b/scripts/build.sh new file mode 100755 index 0000000..eda8514 --- /dev/null +++ b/scripts/build.sh @@ -0,0 +1 @@ +docker build -t ruck .. \ No newline at end of file diff --git a/scripts/run.sh b/scripts/run.sh new file mode 100755 index 0000000..7895090 --- /dev/null +++ b/scripts/run.sh @@ -0,0 +1 @@ +$ docker run -p 8080:3030 --rm --name ruck1 ruck \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index 91ad717..60c4da0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,3 +1,4 @@ +use crate::conf::BUFFER_SIZE; use crate::crypto::handshake; use crate::file::{to_size_string, FileHandle, FileInfo}; use crate::message::{ @@ -10,8 +11,6 @@ use blake2::{Blake2s256, Digest}; use bytes::{Bytes, BytesMut}; use futures::future::try_join_all; use futures::prelude::*; -use futures::stream::FuturesUnordered; -use futures::StreamExt; use std::collections::HashMap; use std::ffi::OsStr; use std::path::PathBuf; @@ -112,7 +111,7 @@ pub async fn negotiate_files_up( pub async fn negotiate_files_down( stream: &mut MessageStream, cipher: &Aes256Gcm, -) -> Result<(Vec)> { +) -> Result> { let file_offer = match stream.next().await { Some(Ok(msg)) => match msg { Message::EncryptedMessage(response) => response, @@ -163,7 +162,7 @@ pub async fn upload_encrypted_files( loop { tokio::select! { Some(msg) = rx.recv() => { - println!("message received to client.rx {:?}", msg); + // println!("message received to client.rx {:?}", msg); let x = msg.to_encrypted_message(cipher)?; stream.send(x).await? } @@ -175,7 +174,7 @@ pub async fn upload_encrypted_files( } Ok(()) } -const BUFFER_SIZE: usize = 1024 * 64; + pub async fn enqueue_file_chunks( fh: &mut FileHandle, tx: mpsc::UnboundedSender, @@ -185,7 +184,7 @@ pub async fn enqueue_file_chunks( while bytes_read != 0 { let mut buf = BytesMut::with_capacity(BUFFER_SIZE); bytes_read = fh.file.read_buf(&mut buf).await?; - println!("Bytes_read: {:?}, The bytes: {:?}", bytes_read, &buf[..]); + // println!("Bytes_read: {:?}, The bytes: {:?}", bytes_read, &buf[..]); if bytes_read != 0 { let chunk = buf.freeze(); let file_info = fh.to_file_info(); @@ -220,7 +219,7 @@ pub async fn download_files( result = stream.next() => match result { Some(Ok(Message::EncryptedMessage(payload))) => { let ec = EncryptedMessage::from_encrypted_message(cipher, &payload)?; - println!("encrypted message received! {:?}", ec); + // println!("encrypted message received! {:?}", ec); match ec { EncryptedMessage::FileTransferMessage(payload) => { println!("matched file transfer message"); @@ -264,8 +263,8 @@ pub async fn download_file( println!("trying to create file...filename={:?}", filename); let mut file = File::create(filename).await?; println!("file created ok! filename={:?}", filename); - while let Some((chunk_num, chunk)) = rx.recv().await { - println!("rx got message! chunk={:?}", chunk); + while let Some((_chunk_num, chunk)) = rx.recv().await { + // println!("rx got message! chunk={:?}", chunk); file.write_all(&chunk).await?; } println!("done receiving messages"); diff --git a/src/conf.rs b/src/conf.rs new file mode 100644 index 0000000..07a6f99 --- /dev/null +++ b/src/conf.rs @@ -0,0 +1 @@ +pub const BUFFER_SIZE: usize = 1024 * 1024; diff --git a/src/crypto.rs b/src/crypto.rs index 7074718..43cf8ef 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -20,7 +20,7 @@ pub async fn handshake( id, msg: Bytes::from(outbound_msg), }); - println!("client - handshake msg, {:?}", handshake_msg); + // println!("client - handshake msg, {:?}", handshake_msg); stream.send(handshake_msg).await?; let first_message = match stream.next().await { Some(Ok(msg)) => match msg { @@ -36,7 +36,7 @@ pub async fn handshake( Ok(key_bytes) => key_bytes, Err(e) => return Err(anyhow!(e.to_string())), }; - println!("Handshake successful. Key is {:?}", key); + // println!("Handshake successful. Key is {:?}", key); return Ok((stream, new_cipher(&key))); } diff --git a/src/file.rs b/src/file.rs index 362c3f2..3e03c82 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,6 +1,4 @@ -use crate::message::FileTransferPayload; - -use anyhow::{anyhow, Result}; +use anyhow::Result; use serde::{Deserialize, Serialize}; use std::fs::Metadata; use std::path::PathBuf; diff --git a/src/main.rs b/src/main.rs index d79940f..65c2518 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,11 @@ mod cli; mod client; +mod conf; mod crypto; mod file; mod message; mod server; -#[cfg(test)] -mod tests; - use clap::Parser; use cli::{Cli, Commands}; use client::{receive, send}; diff --git a/src/message.rs b/src/message.rs index 10c440f..10593a2 100644 --- a/src/message.rs +++ b/src/message.rs @@ -3,7 +3,6 @@ use crate::file::FileInfo; use aes_gcm::Aes256Gcm; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; -use bincode::config; use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::error::Error; diff --git a/src/server.rs b/src/server.rs index c52bc9f..0930ead 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::{mpsc, Mutex}; type Tx = mpsc::UnboundedSender; type Rx = mpsc::UnboundedReceiver; @@ -121,13 +121,13 @@ pub async fn handle_connection( return Ok(()); } }; - println!("server - received msg from {:?}", addr); + // println!("server - received msg from {:?}", addr); let client = Client::new(handshake_payload.id.clone(), state.clone(), stream).await?; let mut client = Client::upgrade(client, state.clone(), handshake_payload).await?; loop { tokio::select! { Some(msg) = client.rx.recv() => { - println!("message received to client.rx {:?}", msg); + // println!("message received to client.rx {:?}", msg); client.messages.send(msg).await? } result = client.messages.next() => match result { diff --git a/src/tests.rs b/src/tests.rs deleted file mode 100644 index 50e2f2d..0000000 --- a/src/tests.rs +++ /dev/null @@ -1,11 +0,0 @@ -use crate::crypto::new_cipher; -use crate::file::FileHandle; - -use std::path::PathBuf; - -#[tokio::test] -async fn test_file_handle_nonexistent_file() { - let pb = PathBuf::new(); - let fh = FileHandle::new(pb).await; - assert!(fh.is_err()); -} From 0c2041a6dbf0c2761475358edd04b8bf2fddf3aa Mon Sep 17 00:00:00 2001 From: rictorlome Date: Thu, 17 Feb 2022 14:01:40 -0500 Subject: [PATCH 6/7] Handle interrupted handshake better --- src/server.rs | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/src/server.rs b/src/server.rs index 0930ead..759e154 100644 --- a/src/server.rs +++ b/src/server.rs @@ -60,10 +60,15 @@ impl Client { Some(peer_tx) => peer_tx, // Sender - needs to wait for the incoming msg to look up peer_tx None => { - match client.rx.recv().await { - Some(msg) => client.messages.send(msg).await?, - None => return Err(anyhow!("Connection not stapled")), - }; + tokio::select! { + Some(msg) = client.rx.recv() => { + client.messages.send(msg).await? + } + result = client.messages.next() => match result { + Some(_) => return Err(anyhow!("Client sending more messages before handshake complete")), + None => return Err(anyhow!("Connection interrupted")), + } + } match state .lock() .await @@ -121,13 +126,20 @@ pub async fn handle_connection( return Ok(()); } }; - // println!("server - received msg from {:?}", addr); - let client = Client::new(handshake_payload.id.clone(), state.clone(), stream).await?; - let mut client = Client::upgrade(client, state.clone(), handshake_payload).await?; + let id = handshake_payload.id.clone(); + let client = Client::new(id.clone(), state.clone(), stream).await?; + let mut client = match Client::upgrade(client, state.clone(), handshake_payload).await { + Ok(client) => client, + Err(err) => { + // Clear handshake cache if staple is unsuccessful + state.lock().await.handshake_cache.remove(&id); + return Err(err); + } + }; + // The handshake cache should be empty for {id} at this point. loop { tokio::select! { Some(msg) = client.rx.recv() => { - // println!("message received to client.rx {:?}", msg); client.messages.send(msg).await? } result = client.messages.next() => match result { From 2a194eb1924d66ee4698407bd13bb778d0e300bc Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Thu, 17 Feb 2022 20:43:55 -0500 Subject: [PATCH 7/7] Partially correct dockerfile --- Dockerfile | 2 +- README.md | 6 +++--- scripts/build.sh | 2 +- scripts/run.sh | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Dockerfile b/Dockerfile index 809ab65..020b720 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,7 +22,7 @@ RUN cargo build --release # Copy the binary into a new container for a smaller docker image FROM debian:buster-slim -COPY --from=build ./target/release/ruck / +COPY --from=build /ruck/target/release/ruck / USER root ENV RUST_LOG=info diff --git a/README.md b/README.md index f4ca539..1465e2c 100644 --- a/README.md +++ b/README.md @@ -12,19 +12,19 @@ - https://kerkour.com/rust-symmetric-encryption-aead-benchmark/ - https://docs.rs/aes-gcm/latest/aes_gcm/ - - https://github.com/rust-lang/flate2-rs - https://crates.io/crates/async-compression - - https://pdos.csail.mit.edu/papers/chord:sigcomm01/chord_sigcomm.pdf - https://pdos.csail.mit.edu/6.824/schedule.html - https://flylib.com/books/en/2.292.1.105/1/ - https://en.wikipedia.org/wiki/Two_Generals%27_Problem +- https://kobzol.github.io/rust/ci/2021/05/07/building-rust-binaries-in-ci-that-work-with-older-glibc.html + ## Todo - [ ] Use tracing - [ ] Add progress bars - [ ] Exit happily when transfer is complete -- [ ] Compress files \ No newline at end of file +- [ ] Compress files diff --git a/scripts/build.sh b/scripts/build.sh index eda8514..dfceb4f 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -1 +1 @@ -docker build -t ruck .. \ No newline at end of file +docker build -t ruck . \ No newline at end of file diff --git a/scripts/run.sh b/scripts/run.sh index 7895090..5459a22 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -1 +1 @@ -$ docker run -p 8080:3030 --rm --name ruck1 ruck \ No newline at end of file +docker run -p 8080:3030 --rm --name ruck1 ruck \ No newline at end of file