From 7befc61ab34a1103267de152d94a07354cea144f Mon Sep 17 00:00:00 2001 From: rictorlome Date: Sun, 13 Feb 2022 17:45:58 -0500 Subject: [PATCH] Scaffold for concurrent file reads --- README.md | 6 +++++ src/client.rs | 61 +++++++++++++++++++++++++++++++++++++++++++++----- src/file.rs | 2 ++ src/message.rs | 8 +++++++ 4 files changed, 72 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index d556140..e68c617 100644 --- a/README.md +++ b/README.md @@ -15,3 +15,9 @@ - https://github.com/rust-lang/flate2-rs - https://crates.io/crates/async-compression + + +- https://pdos.csail.mit.edu/papers/chord:sigcomm01/chord_sigcomm.pdf +- https://pdos.csail.mit.edu/6.824/schedule.html +- https://flylib.com/books/en/2.292.1.105/1/ +- https://en.wikipedia.org/wiki/Two_Generals%27_Problem \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index ef63dd5..74988b0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,8 @@ use crate::crypto::handshake; use crate::file::{to_size_string, FileHandle, FileInfo}; -use crate::message::{EncryptedMessage, FileNegotiationPayload, Message, MessageStream}; +use crate::message::{ + EncryptedMessage, FileNegotiationPayload, FileTransferPayload, Message, MessageStream, +}; use aes_gcm::Aes256Gcm; use anyhow::{anyhow, Result}; @@ -8,9 +10,13 @@ use blake2::{Blake2s256, Digest}; use bytes::{Bytes, BytesMut}; use futures::future::try_join_all; use futures::prelude::*; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use std::path::PathBuf; -use tokio::io; +use std::pin::Pin; +use tokio::io::{self, AsyncReadExt}; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio_util::codec::{FramedRead, LinesCodec}; fn pass_to_bytes(password: &String) -> Bytes { @@ -135,10 +141,55 @@ pub async fn negotiate_files_down(stream: &mut MessageStream, cipher: &Aes256Gcm pub async fn upload_encrypted_files( stream: &mut MessageStream, - file_paths: &Vec, - key: Bytes, + handles: Vec, + cipher: &Aes256Gcm, ) -> Result<()> { - return Ok(()); + let (tx, mut rx) = mpsc::unbounded_channel::(); + //turn foo into something more concrete + for mut handle in handles { + let txc = tx.clone(); + tokio::spawn(async move { + let _ = enqueue_file_chunks(&mut handle, txc).await; + }); + } + + loop { + tokio::select! { + Some(msg) = rx.recv() => { + println!("message received to client.rx {:?}", msg); + let x = msg.to_encrypted_message(cipher)?; + stream.send(x).await? + } + else => break, + } + } + Ok(()) +} +const BUFFER_SIZE: usize = 1024 * 64; +pub async fn enqueue_file_chunks( + fh: &mut FileHandle, + tx: mpsc::UnboundedSender, +) -> Result<()> { + // let mut buf = BytesMut::with_capacity(BUFFER_SIZE); + + // // The `read` method is defined by this trait. + // let mut chunk_num = 0; + // while { + // let n = fh.file.read(&mut buf[..]).await?; + // n == 0 + // } { + // let chunk = buf.freeze(); + // let file_info = fh.to_file_info(); + // let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload { + // chunk, + // chunk_num, + // file_info, + // }); + // tx.send(ftp); + // chunk_num += 1; + // } + + Ok(()) } pub async fn prompt_user_input( diff --git a/src/file.rs b/src/file.rs index 79f6c02..362c3f2 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,3 +1,5 @@ +use crate::message::FileTransferPayload; + use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; use std::fs::Metadata; diff --git a/src/message.rs b/src/message.rs index 769d015..5819978 100644 --- a/src/message.rs +++ b/src/message.rs @@ -35,6 +35,7 @@ pub struct EncryptedPayload { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum EncryptedMessage { FileNegotiationMessage(FileNegotiationPayload), + FileTransferMessage(FileTransferPayload), } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -42,6 +43,13 @@ pub struct FileNegotiationPayload { pub files: Vec, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct FileTransferPayload { + pub file_info: FileInfo, + pub chunk_num: u64, + pub chunk: Bytes, +} + impl EncryptedMessage { pub fn from_encrypted_message(cipher: &Aes256Gcm, payload: &EncryptedPayload) -> Result { let raw = decrypt(cipher, payload)?;