diff --git a/Cargo.lock b/Cargo.lock index ebb6412..be1143e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aead" version = "0.4.3" @@ -162,6 +168,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.2" @@ -213,6 +228,16 @@ dependencies = [ "subtle", ] +[[package]] +name = "flate2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "futures" version = "0.3.21" @@ -439,6 +464,15 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "miniz_oxide" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f5c75688da582b8ffc1f1799e9db273f32133c49e048f614d22ec3256773ccc" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.7.14" @@ -666,6 +700,7 @@ dependencies = [ "blake2", "bytes", "clap", + "flate2", "futures", "rand", "serde", diff --git a/Cargo.toml b/Cargo.toml index 7272996..3fba138 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ blake2 = "0.10.2" bytes = { version = "1", features = ["serde"] } bincode = "1.3.3" clap = { version = "3.0.14", features = ["derive"] } +flate2 = "1.0" futures = { version = "0.3.0", features = ["thread-pool"]} rand = "0.8.4" serde = { version = "1.0", features = ["derive"] } diff --git a/src/client.rs b/src/client.rs index fa1ce0a..a5677a9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,13 +1,12 @@ use crate::conf::BUFFER_SIZE; use crate::connection::Connection; -use crate::file::{to_size_string, FileHandle, FileInfo}; +use crate::file::{FileHandle, FileInfo}; use crate::handshake::Handshake; use crate::message::{FileNegotiationPayload, FileTransferPayload, Message}; +use crate::ui::prompt_user_input; use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; -use futures::future::try_join_all; -use futures::prelude::*; use std::collections::HashMap; use std::ffi::OsStr; use std::path::PathBuf; @@ -19,7 +18,7 @@ use tokio_util::codec::{FramedRead, LinesCodec}; pub async fn send(file_paths: &Vec, password: &String) -> Result<()> { // Fail early if there are problems generating file handles - let handles = get_file_handles(file_paths).await?; + let handles = FileHandle::get_file_handles(file_paths).await?; // Establish connection to server let socket = TcpStream::connect("127.0.0.1:8080").await?; @@ -51,14 +50,6 @@ pub async fn receive(password: &String) -> Result<()> { return Ok(()); } -pub async fn get_file_handles(file_paths: &Vec) -> Result> { - let tasks = file_paths - .into_iter() - .map(|path| FileHandle::new(path.to_path_buf())); - let handles = try_join_all(tasks).await?; - Ok(handles) -} - pub async fn negotiate_files_up( conn: &mut Connection, file_handles: Vec, @@ -190,26 +181,3 @@ pub async fn download_file( println!("done receiving messages"); Ok(()) } - -pub async fn prompt_user_input( - stdin: &mut FramedRead, - file_info: &FileInfo, -) -> Option { - let prompt_name = file_info.path.file_name().unwrap(); - println!( - "Accept {:?}? ({:?}). (Y/n)", - prompt_name, - to_size_string(file_info.size) - ); - match stdin.next().await { - Some(Ok(line)) => match line.as_str() { - "" | "Y" | "y" | "yes" | "Yes" | "YES" => Some(true), - "N" | "n" | "NO" | "no" | "No" => Some(false), - _ => { - println!("Invalid input. Please enter one of the following characters: [YyNn]"); - return None; - } - }, - _ => None, - } -} diff --git a/src/file.rs b/src/file.rs index 3e03c82..514ecd1 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,35 +1,82 @@ use anyhow::Result; + +use futures::future::try_join_all; + use serde::{Deserialize, Serialize}; use std::fs::Metadata; use std::path::PathBuf; + +use flate2::bufread::GzEncoder; +use flate2::Compression; +use std::io::{copy, BufRead, BufReader, Read, Write}; +use std::net::TcpStream; use tokio::fs::File; +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ChunkHeader { + pub id: u8, + pub start: u64, + pub end: u64, +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct FileInfo { pub path: PathBuf, - pub size: u64, + pub chunk_header: ChunkHeader, } pub struct FileHandle { + pub id: u8, pub file: File, pub md: Metadata, pub path: PathBuf, } +impl Read for FileHandle { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + Ok(2) + } +} + impl FileHandle { - pub async fn new(path: PathBuf) -> Result { + pub async fn new(id: u8, path: PathBuf) -> Result { let file = File::open(&path).await?; let md = file.metadata().await?; - let fh = FileHandle { file, md, path }; + let fh = FileHandle { id, file, md, path }; return Ok(fh); } pub fn to_file_info(&self) -> FileInfo { FileInfo { path: self.path.clone(), - size: self.md.len(), + chunk_header: ChunkHeader { + id: self.id, + start: 0, + end: self.md.len(), + }, } } + + pub async fn get_file_handles(file_paths: &Vec) -> Result> { + let tasks = file_paths + .into_iter() + .enumerate() + .map(|(idx, path)| FileHandle::new(idx.try_into().unwrap(), path.to_path_buf())); + let handles = try_join_all(tasks).await?; + Ok(handles) + } + + async fn count_lines(self, socket: TcpStream) -> Result { + let file = self.file.into_std().await; + let mut socket = socket; + tokio::task::spawn_blocking(move || { + let reader = BufReader::new(file); + let mut gz = GzEncoder::new(reader, Compression::fast()); + copy(&mut gz, &mut socket)?; + Ok(socket) + }) + .await? + } } const SUFFIX: [&'static str; 9] = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"]; diff --git a/src/main.rs b/src/main.rs index e481fd1..f880896 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ mod file; mod handshake; mod message; mod server; +mod ui; use clap::Parser; use cli::{Cli, Commands}; diff --git a/src/ui.rs b/src/ui.rs new file mode 100644 index 0000000..2d4f005 --- /dev/null +++ b/src/ui.rs @@ -0,0 +1,30 @@ +use crate::file::{to_size_string, FileInfo}; + +use futures::prelude::*; + +use tokio::io::{self}; + +use tokio_util::codec::{FramedRead, LinesCodec}; + +pub async fn prompt_user_input( + stdin: &mut FramedRead, + file_info: &FileInfo, +) -> Option { + let prompt_name = file_info.path.file_name().unwrap(); + println!( + "Accept {:?}? ({:?}). (Y/n)", + prompt_name, + to_size_string(file_info.chunk_header.end) + ); + match stdin.next().await { + Some(Ok(line)) => match line.as_str() { + "" | "Y" | "y" | "yes" | "Yes" | "YES" => Some(true), + "N" | "n" | "NO" | "no" | "No" => Some(false), + _ => { + println!("Invalid input. Please enter one of the following characters: [YyNn]"); + return None; + } + }, + _ => None, + } +}