From e0567d8479736575fa401fac70f8d26d0995afb6 Mon Sep 17 00:00:00 2001 From: Donald Knuth Date: Fri, 2 Sep 2022 16:41:40 -0400 Subject: [PATCH] Progress --- src/client.rs | 199 +++++++++++++++------------------------------- src/conf.rs | 7 +- src/connection.rs | 69 +++++++++++++++- src/crypto.rs | 10 +-- src/file.rs | 99 +++++++++++++++-------- src/message.rs | 24 +++--- src/ui.rs | 24 +++++- 7 files changed, 241 insertions(+), 191 deletions(-) diff --git a/src/client.rs b/src/client.rs index a5677a9..69bdcac 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,20 +1,16 @@ -use crate::conf::BUFFER_SIZE; use crate::connection::Connection; -use crate::file::{FileHandle, FileInfo}; +use crate::file::{ChunkHeader, FileHandle, FileOffer, StdFileHandle}; use crate::handshake::Handshake; -use crate::message::{FileNegotiationPayload, FileTransferPayload, Message}; -use crate::ui::prompt_user_input; +use crate::message::{FileOfferPayload, FileRequestPayload, Message}; +use crate::ui::prompt_user_for_file_confirmation; use anyhow::{anyhow, Result}; -use bytes::{Bytes, BytesMut}; -use std::collections::HashMap; + use std::ffi::OsStr; use std::path::PathBuf; use tokio::fs::File; -use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpStream; -use tokio::sync::mpsc; -use tokio_util::codec::{FramedRead, LinesCodec}; pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Fail early if there are problems generating file handles @@ -28,11 +24,12 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { let (socket, key) = handshake.negotiate(socket, s1).await?; let mut connection = Connection::new(socket, key); - // Complete file negotiation - let handles = negotiate_files_up(&mut connection, handles).await?; + // Offer files, wait for requested file response + let requested_chunks = offer_files(&mut connection, &handles).await?; // Upload negotiated files - upload_encrypted_files(&mut connection, handles).await?; + let std_file_handles = FileHandle::to_stds(handles, requested_chunks).await; + connection.upload_files(std_file_handles).await?; println!("Done uploading."); // Exit @@ -40,144 +37,76 @@ pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { } pub async fn receive(password: &String) -> Result<()> { + // Establish connection to server let socket = TcpStream::connect("127.0.0.1:8080").await?; let (handshake, s1) = Handshake::from_password(password); + // Complete handshake, returning key used for encryption let (socket, key) = handshake.negotiate(socket, s1).await?; let mut connection = Connection::new(socket, key); - let files = negotiate_files_down(&mut connection).await?; - - download_files(&mut connection, files).await?; + // Wait for offered files, respond with desired files + let desired_files = request_specific_files(&mut connection).await?; + // Create files + let std_file_handles = create_files(desired_files).await?; + // Download them + connection.download_files(std_file_handles).await?; return Ok(()); } -pub async fn negotiate_files_up( +pub async fn offer_files( conn: &mut Connection, - file_handles: Vec, -) -> Result> { - let files = file_handles.iter().map(|fh| fh.to_file_info()).collect(); - let msg = Message::FileNegotiationMessage(FileNegotiationPayload { files }); + file_handles: &Vec, +) -> Result> { + // Collect file offer + let files = file_handles.iter().map(|fh| fh.to_file_offer()).collect(); + let msg = Message::FileOffer(FileOfferPayload { files }); + // Send file offer conn.send_msg(msg).await?; + // Wait for reply let reply = conn.await_msg().await?; - let requested_paths: Vec = match reply { - Message::FileNegotiationMessage(fnm) => fnm.files.into_iter().map(|f| f.path).collect(), - _ => return Err(anyhow!("Expecting file negotiation message back")), - }; - Ok(file_handles - .into_iter() - .filter(|fh| requested_paths.contains(&fh.path)) - .collect()) + // Return requested chunks + match reply { + Message::FileRequest(file_request_payload) => Ok(file_request_payload.chunks), + _ => Err(anyhow!("Expecting file request message back")), + } } -pub async fn negotiate_files_down(conn: &mut Connection) -> Result> { - let offer = conn.await_msg().await?; - let requested_infos: Vec = match offer { - Message::FileNegotiationMessage(fnm) => fnm.files, - _ => return Err(anyhow!("Expecting file negotiation message back")), +pub async fn request_specific_files(conn: &mut Connection) -> Result> { + // Wait for offer message + let offer_message = conn.await_msg().await?; + let offered_files: Vec = match offer_message { + Message::FileOffer(file_offer_payload) => file_offer_payload.files, + _ => return Err(anyhow!("Expecting file offer message")), }; - let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new()); - let mut files = vec![]; - for file_info in requested_infos.into_iter() { - let mut reply = prompt_user_input(&mut stdin, &file_info).await; - while reply.is_none() { - reply = prompt_user_input(&mut stdin, &file_info).await; - } - match reply { - Some(true) => files.push(file_info), - _ => {} - } - } - let msg = Message::FileNegotiationMessage(FileNegotiationPayload { - files: files.clone(), + // Prompt user for confirmation of files + let desired_files = prompt_user_for_file_confirmation(offered_files).await; + let file_request_msg = Message::FileRequest(FileRequestPayload { + chunks: desired_files + .iter() + .map(|file| ChunkHeader { + id: file.id, + start: 0, + }) + .collect(), }); - conn.send_msg(msg).await?; - Ok(files) + conn.send_msg(file_request_msg).await?; + Ok(desired_files) } -pub async fn upload_encrypted_files(conn: &mut Connection, handles: Vec) -> Result<()> { - for mut handle in handles { - enqueue_file_chunks(conn, &mut handle).await?; +pub async fn create_files(desired_files: Vec) -> Result> { + let mut v = Vec::new(); + for desired_file in desired_files { + let filename = desired_file + .path + .file_name() + .unwrap_or(OsStr::new("random.txt")); + let file = File::create(filename).await?; + let std_file = file.into_std().await; + let std_file_handle = StdFileHandle { + id: desired_file.id, + file: std_file, + start: 0, + }; + v.push(std_file_handle) } - println!("Files uploaded."); - Ok(()) -} - -pub async fn enqueue_file_chunks(conn: &mut Connection, fh: &mut FileHandle) -> Result<()> { - let mut chunk_num = 0; - let mut bytes_read = 1; - while bytes_read != 0 { - let mut buf = BytesMut::with_capacity(BUFFER_SIZE); - bytes_read = fh.file.read_buf(&mut buf).await?; - // println!("Bytes_read: {:?}, The bytes: {:?}", bytes_read, &buf[..]); - if bytes_read != 0 { - let chunk = buf.freeze(); - let file_info = fh.to_file_info(); - let ftp = Message::FileTransferMessage(FileTransferPayload { - chunk, - chunk_num, - file_info, - }); - conn.send_msg(ftp).await?; - chunk_num += 1; - } - } - - Ok(()) -} - -pub async fn download_files(conn: &mut Connection, file_infos: Vec) -> Result<()> { - // for each file_info - let mut info_handles: HashMap> = HashMap::new(); - for fi in file_infos { - let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>(); - let path = fi.path.clone(); - tokio::spawn(async move { download_file(fi, rx).await }); - info_handles.insert(path, tx); - } - loop { - tokio::select! { - result = conn.await_msg() => match result { - Ok(msg) => { - match msg { - Message::FileTransferMessage(payload) => { - if let Some(tx) = info_handles.get(&payload.file_info.path) { - tx.send((payload.chunk_num, payload.chunk))? - } - }, - _ => { - println!("Wrong message type"); - return Err(anyhow!("wrong message type")); - } - } - }, - Err(e) => return Err(anyhow!(e.to_string())), - } - } - } -} - -pub async fn download_file( - file_info: FileInfo, - rx: mpsc::UnboundedReceiver<(u64, Bytes)>, -) -> Result<()> { - println!("in download file"); - let mut rx = rx; - let filename = match file_info.path.file_name() { - Some(f) => { - println!("matched filename"); - f - } - None => { - println!("didnt match filename"); - OsStr::new("random.txt") - } - }; - println!("trying to create file...filename={:?}", filename); - let mut file = File::create(filename).await?; - println!("file created ok! filename={:?}", filename); - while let Some((_chunk_num, chunk)) = rx.recv().await { - // println!("rx got message! chunk={:?}", chunk); - file.write_all(&chunk).await?; - } - println!("done receiving messages"); - Ok(()) + return Ok(v); } diff --git a/src/conf.rs b/src/conf.rs index 6fb5d0e..0d007f0 100644 --- a/src/conf.rs +++ b/src/conf.rs @@ -1,5 +1,6 @@ pub const ID_SIZE: usize = 32; // Blake256 of password pub const HANDSHAKE_MSG_SIZE: usize = 33; // generated by Spake2 -pub const PER_CLIENT_BUFFER: usize = 1024 * 1024; // buffer size allocated by server for each client -pub const BUFFER_SIZE: usize = 1024 * 1024 * 1024; // chunk size for files sent over wire -pub const NONCE_SIZE_IN_BYTES: usize = 96 / 8; // used for every encrypted message +pub const PER_CLIENT_BUFFER: usize = 1024 * 64; // buffer size allocated by server for each client +pub const BUFFER_SIZE: usize = 1024 * 64; // chunk size for files sent over wire +pub const NONCE_SIZE: usize = 96 / 8; // used for every encrypted message +pub const CHUNK_HEADER_SIZE: usize = 10; // used for every chunk header diff --git a/src/connection.rs b/src/connection.rs index aef11cb..361141c 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,10 +1,17 @@ +use crate::conf::{BUFFER_SIZE, CHUNK_HEADER_SIZE}; use crate::crypto::Crypt; +use crate::file::{ChunkHeader, FileHandle, StdFileHandle}; use crate::message::{Message, MessageStream}; use anyhow::{anyhow, Result}; -use bytes::Bytes; + use futures::{SinkExt, StreamExt}; use tokio::net::TcpStream; +use bytes::{Bytes, BytesMut}; +use flate2::write::{GzDecoder, GzEncoder}; +use flate2::Compression; +use std::io::{Read, Write}; + pub struct Connection { ms: MessageStream, crypt: Crypt, @@ -17,7 +24,7 @@ impl Connection { Connection { ms, crypt } } - async fn send_bytes(&mut self, bytes: Bytes) -> Result<()> { + pub async fn send_bytes(&mut self, bytes: Bytes) -> Result<()> { match self.ms.send(bytes).await { Ok(_) => Ok(()), Err(e) => Err(anyhow!(e.to_string())), @@ -40,4 +47,62 @@ impl Connection { None => Err(anyhow!("Error awaiting msg")), } } + + pub async fn upload_files(mut self, handles: Vec) -> Result<()> { + let mut socket = self.ms.into_inner().into_std()?; + tokio::task::spawn_blocking(move || { + for mut handle in handles { + let mut buffer = BytesMut::with_capacity(BUFFER_SIZE); + let mut start = handle.start; + loop { + let end = + FileHandle::to_message(handle.id, &mut handle.file, &mut buffer, start)?; + let mut compressor = GzEncoder::new(Vec::new(), Compression::fast()); + compressor.write_all(&buffer[..])?; + let compressed_bytes = compressor.finish()?; + let encrypted_bytes = self.crypt.encrypt(Bytes::from(compressed_bytes))?; + start = end; + socket.write(&encrypted_bytes[..])?; + if end == 0 { + break; + } + } + } + Ok(()) + }) + .await? + } + + pub async fn download_files(mut self, handles: Vec) -> Result<()> { + let mut socket = self.ms.into_inner().into_std()?; + tokio::task::spawn_blocking(move || { + for mut handle in handles { + let mut buffer = BytesMut::with_capacity(BUFFER_SIZE); + let mut start = handle.start; + loop { + // read bytes + match socket.read(&mut buffer) { + Ok(0) => { + break; + } + Ok(n) => { + let decrypted_bytes = + self.crypt.decrypt(Bytes::from(&mut buffer[0..n]))?; + let mut writer = Vec::new(); + let mut decompressor = GzDecoder::new(writer); + decompressor.write_all(&decrypted_bytes[..])?; + decompressor.try_finish()?; + writer = decompressor.finish()?; + let chunk_header: ChunkHeader = + bincode::deserialize(&writer[..CHUNK_HEADER_SIZE])?; + handle.file.write_all(&writer) + } + Err(e) => return Err(anyhow!(e.to_string())), + }; + } + } + Ok(()) + }) + .await? + } } diff --git a/src/crypto.rs b/src/crypto.rs index ae66d03..e95b5fa 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -1,4 +1,4 @@ -use crate::conf::NONCE_SIZE_IN_BYTES; +use crate::conf::NONCE_SIZE; use aes_gcm::aead::{Aead, NewAead}; use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm` use anyhow::{anyhow, Result}; @@ -8,7 +8,7 @@ use rand::{thread_rng, Rng}; pub struct Crypt { cipher: Aes256Gcm, - arr: [u8; NONCE_SIZE_IN_BYTES], + arr: [u8; NONCE_SIZE], } impl Crypt { @@ -16,7 +16,7 @@ impl Crypt { let key = Key::from_slice(&key[..]); Crypt { cipher: Aes256Gcm::new(key), - arr: [0u8; NONCE_SIZE_IN_BYTES], + arr: [0u8; NONCE_SIZE], } } @@ -26,7 +26,7 @@ impl Crypt { let nonce = Nonce::from_slice(&self.arr); match self.cipher.encrypt(nonce, plaintext.as_ref()) { Ok(body) => { - let mut buffer = BytesMut::with_capacity(NONCE_SIZE_IN_BYTES + body.len()); + let mut buffer = BytesMut::with_capacity(NONCE_SIZE + body.len()); buffer.extend_from_slice(nonce); buffer.extend_from_slice(&body); Ok(buffer.freeze()) @@ -38,7 +38,7 @@ impl Crypt { // Accepts wire format, includes nonce as prefix pub fn decrypt(&self, ciphertext: Bytes) -> Result { let mut ciphertext_body = ciphertext; - let nonce_bytes = ciphertext_body.split_to(NONCE_SIZE_IN_BYTES); + let nonce_bytes = ciphertext_body.split_to(NONCE_SIZE); let nonce = Nonce::from_slice(&nonce_bytes); match self.cipher.decrypt(nonce, ciphertext_body.as_ref()) { Ok(payload) => Ok(Bytes::from(payload)), diff --git a/src/file.rs b/src/file.rs index 514ecd1..4967cd4 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,28 +1,32 @@ use anyhow::Result; - use futures::future::try_join_all; use serde::{Deserialize, Serialize}; use std::fs::Metadata; use std::path::PathBuf; -use flate2::bufread::GzEncoder; -use flate2::Compression; -use std::io::{copy, BufRead, BufReader, Read, Write}; -use std::net::TcpStream; +use bytes::BytesMut; +use std::io::{Read, Seek, SeekFrom}; + use tokio::fs::File; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ChunkHeader { pub id: u8, pub start: u64, - pub end: u64, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct FileInfo { +pub struct FileOffer { + pub id: u8, pub path: PathBuf, - pub chunk_header: ChunkHeader, + pub size: u64, +} + +pub struct StdFileHandle { + pub id: u8, + pub file: std::fs::File, + pub start: u64, } pub struct FileHandle { @@ -32,12 +36,6 @@ pub struct FileHandle { pub path: PathBuf, } -impl Read for FileHandle { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - Ok(2) - } -} - impl FileHandle { pub async fn new(id: u8, path: PathBuf) -> Result { let file = File::open(&path).await?; @@ -46,14 +44,45 @@ impl FileHandle { return Ok(fh); } - pub fn to_file_info(&self) -> FileInfo { - FileInfo { + pub async fn to_stds( + file_handles: Vec, + chunk_headers: Vec, + ) -> Vec { + let mut ret = Vec::new(); + for handle in file_handles { + let chunk = chunk_headers.iter().find(|chunk| handle.id == chunk.id); + match chunk { + Some(chunk) => { + match handle.to_std(chunk).await { + Ok(std_file_handle) => { + ret.push(std_file_handle); + } + _ => println!("Error seeking in file"), + }; + } + None => { + println!("Skipping file b/c not in requested chunks"); + } + } + } + ret + } + + async fn to_std(self, chunk_header: &ChunkHeader) -> Result { + let mut std_file = self.file.into_std().await; + std_file.seek(SeekFrom::Start(chunk_header.start))?; + Ok(StdFileHandle { + id: self.id, + file: std_file, + start: chunk_header.start, + }) + } + + pub fn to_file_offer(&self) -> FileOffer { + FileOffer { + id: self.id, path: self.path.clone(), - chunk_header: ChunkHeader { - id: self.id, - start: 0, - end: self.md.len(), - }, + size: self.md.len(), } } @@ -66,16 +95,24 @@ impl FileHandle { Ok(handles) } - async fn count_lines(self, socket: TcpStream) -> Result { - let file = self.file.into_std().await; - let mut socket = socket; - tokio::task::spawn_blocking(move || { - let reader = BufReader::new(file); - let mut gz = GzEncoder::new(reader, Compression::fast()); - copy(&mut gz, &mut socket)?; - Ok(socket) - }) - .await? + pub fn to_message( + id: u8, + file: &mut std::fs::File, + buffer: &mut BytesMut, + start: u64, + ) -> Result { + // reads the next chunk of the file + // packs it into the buffer, with the header taking up the first X bytes + let chunk_header = ChunkHeader { id, start }; + let chunk_bytes = bincode::serialize(&chunk_header)?; + println!( + "chunk_bytes = {:?}, len = {:?}", + chunk_bytes.clone(), + chunk_bytes.len() + ); + buffer.extend_from_slice(&chunk_bytes[..]); + let n = file.read(buffer)? as u64; + Ok(n) } } diff --git a/src/message.rs b/src/message.rs index 5abf618..0e27365 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,4 +1,4 @@ -use crate::file::FileInfo; +use crate::file::{ChunkHeader, FileOffer}; use anyhow::{anyhow, Result}; use bytes::Bytes; @@ -8,28 +8,30 @@ use tokio_util::codec::{Framed, LengthDelimitedCodec}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum Message { - FileNegotiationMessage(FileNegotiationPayload), - FileTransferMessage(FileTransferPayload), + FileOffer(FileOfferPayload), + FileRequest(FileRequestPayload), + FileTransfer(FileTransferPayload), } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct FileNegotiationPayload { - pub files: Vec, +pub struct FileRequestPayload { + pub chunks: Vec, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct FileOfferPayload { + pub files: Vec, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct FileTransferPayload { - pub file_info: FileInfo, - pub chunk_num: u64, + pub chunk_header: ChunkHeader, pub chunk: Bytes, } impl Message { pub fn serialize(&self) -> Result { - match bincode::serialize(&self) { - Ok(vec) => Ok(Bytes::from(vec)), - Err(e) => Err(anyhow!(e.to_string())), - } + bincode::serialize(&self).map(|vec| Ok(Bytes::from(vec)))? } pub fn deserialize(bytes: Bytes) -> Result { match bincode::deserialize(bytes.as_ref()) { diff --git a/src/ui.rs b/src/ui.rs index 2d4f005..9b94a41 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -1,4 +1,4 @@ -use crate::file::{to_size_string, FileInfo}; +use crate::file::{to_size_string, FileOffer}; use futures::prelude::*; @@ -6,15 +6,31 @@ use tokio::io::{self}; use tokio_util::codec::{FramedRead, LinesCodec}; +pub async fn prompt_user_for_file_confirmation(file_offers: Vec) -> Vec { + let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new()); + let mut files = vec![]; + for file_offer in file_offers.into_iter() { + let mut reply = prompt_user_input(&mut stdin, &file_offer).await; + while reply.is_none() { + reply = prompt_user_input(&mut stdin, &file_offer).await; + } + match reply { + Some(true) => files.push(file_offer), + _ => {} + } + } + files +} + pub async fn prompt_user_input( stdin: &mut FramedRead, - file_info: &FileInfo, + file_offer: &FileOffer, ) -> Option { - let prompt_name = file_info.path.file_name().unwrap(); + let prompt_name = file_offer.path.file_name().unwrap(); println!( "Accept {:?}? ({:?}). (Y/n)", prompt_name, - to_size_string(file_info.chunk_header.end) + to_size_string(file_offer.size) ); match stdin.next().await { Some(Ok(line)) => match line.as_str() {