This commit is contained in:
Donald Knuth 2022-09-02 16:41:40 -04:00
parent 7544c0fc39
commit e0567d8479
7 changed files with 241 additions and 191 deletions

View file

@ -1,20 +1,16 @@
use crate::conf::BUFFER_SIZE;
use crate::connection::Connection;
use crate::file::{FileHandle, FileInfo};
use crate::file::{ChunkHeader, FileHandle, FileOffer, StdFileHandle};
use crate::handshake::Handshake;
use crate::message::{FileNegotiationPayload, FileTransferPayload, Message};
use crate::ui::prompt_user_input;
use crate::message::{FileOfferPayload, FileRequestPayload, Message};
use crate::ui::prompt_user_for_file_confirmation;
use anyhow::{anyhow, Result};
use bytes::{Bytes, BytesMut};
use std::collections::HashMap;
use std::ffi::OsStr;
use std::path::PathBuf;
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, LinesCodec};
pub async fn send(file_paths: &Vec<PathBuf>, password: &String) -> Result<()> {
// Fail early if there are problems generating file handles
@ -28,11 +24,12 @@ pub async fn send(file_paths: &Vec<PathBuf>, password: &String) -> Result<()> {
let (socket, key) = handshake.negotiate(socket, s1).await?;
let mut connection = Connection::new(socket, key);
// Complete file negotiation
let handles = negotiate_files_up(&mut connection, handles).await?;
// Offer files, wait for requested file response
let requested_chunks = offer_files(&mut connection, &handles).await?;
// Upload negotiated files
upload_encrypted_files(&mut connection, handles).await?;
let std_file_handles = FileHandle::to_stds(handles, requested_chunks).await;
connection.upload_files(std_file_handles).await?;
println!("Done uploading.");
// Exit
@ -40,144 +37,76 @@ pub async fn send(file_paths: &Vec<PathBuf>, password: &String) -> Result<()> {
}
pub async fn receive(password: &String) -> Result<()> {
// Establish connection to server
let socket = TcpStream::connect("127.0.0.1:8080").await?;
let (handshake, s1) = Handshake::from_password(password);
// Complete handshake, returning key used for encryption
let (socket, key) = handshake.negotiate(socket, s1).await?;
let mut connection = Connection::new(socket, key);
let files = negotiate_files_down(&mut connection).await?;
download_files(&mut connection, files).await?;
// Wait for offered files, respond with desired files
let desired_files = request_specific_files(&mut connection).await?;
// Create files
let std_file_handles = create_files(desired_files).await?;
// Download them
connection.download_files(std_file_handles).await?;
return Ok(());
}
pub async fn negotiate_files_up(
pub async fn offer_files(
conn: &mut Connection,
file_handles: Vec<FileHandle>,
) -> Result<Vec<FileHandle>> {
let files = file_handles.iter().map(|fh| fh.to_file_info()).collect();
let msg = Message::FileNegotiationMessage(FileNegotiationPayload { files });
file_handles: &Vec<FileHandle>,
) -> Result<Vec<ChunkHeader>> {
// Collect file offer
let files = file_handles.iter().map(|fh| fh.to_file_offer()).collect();
let msg = Message::FileOffer(FileOfferPayload { files });
// Send file offer
conn.send_msg(msg).await?;
// Wait for reply
let reply = conn.await_msg().await?;
let requested_paths: Vec<PathBuf> = match reply {
Message::FileNegotiationMessage(fnm) => fnm.files.into_iter().map(|f| f.path).collect(),
_ => return Err(anyhow!("Expecting file negotiation message back")),
};
Ok(file_handles
.into_iter()
.filter(|fh| requested_paths.contains(&fh.path))
.collect())
// Return requested chunks
match reply {
Message::FileRequest(file_request_payload) => Ok(file_request_payload.chunks),
_ => Err(anyhow!("Expecting file request message back")),
}
}
pub async fn negotiate_files_down(conn: &mut Connection) -> Result<Vec<FileInfo>> {
let offer = conn.await_msg().await?;
let requested_infos: Vec<FileInfo> = match offer {
Message::FileNegotiationMessage(fnm) => fnm.files,
_ => return Err(anyhow!("Expecting file negotiation message back")),
pub async fn request_specific_files(conn: &mut Connection) -> Result<Vec<FileOffer>> {
// Wait for offer message
let offer_message = conn.await_msg().await?;
let offered_files: Vec<FileOffer> = match offer_message {
Message::FileOffer(file_offer_payload) => file_offer_payload.files,
_ => return Err(anyhow!("Expecting file offer message")),
};
let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new());
let mut files = vec![];
for file_info in requested_infos.into_iter() {
let mut reply = prompt_user_input(&mut stdin, &file_info).await;
while reply.is_none() {
reply = prompt_user_input(&mut stdin, &file_info).await;
}
match reply {
Some(true) => files.push(file_info),
_ => {}
}
}
let msg = Message::FileNegotiationMessage(FileNegotiationPayload {
files: files.clone(),
// Prompt user for confirmation of files
let desired_files = prompt_user_for_file_confirmation(offered_files).await;
let file_request_msg = Message::FileRequest(FileRequestPayload {
chunks: desired_files
.iter()
.map(|file| ChunkHeader {
id: file.id,
start: 0,
})
.collect(),
});
conn.send_msg(msg).await?;
Ok(files)
conn.send_msg(file_request_msg).await?;
Ok(desired_files)
}
pub async fn upload_encrypted_files(conn: &mut Connection, handles: Vec<FileHandle>) -> Result<()> {
for mut handle in handles {
enqueue_file_chunks(conn, &mut handle).await?;
pub async fn create_files(desired_files: Vec<FileOffer>) -> Result<Vec<StdFileHandle>> {
let mut v = Vec::new();
for desired_file in desired_files {
let filename = desired_file
.path
.file_name()
.unwrap_or(OsStr::new("random.txt"));
let file = File::create(filename).await?;
let std_file = file.into_std().await;
let std_file_handle = StdFileHandle {
id: desired_file.id,
file: std_file,
start: 0,
};
v.push(std_file_handle)
}
println!("Files uploaded.");
Ok(())
}
pub async fn enqueue_file_chunks(conn: &mut Connection, fh: &mut FileHandle) -> Result<()> {
let mut chunk_num = 0;
let mut bytes_read = 1;
while bytes_read != 0 {
let mut buf = BytesMut::with_capacity(BUFFER_SIZE);
bytes_read = fh.file.read_buf(&mut buf).await?;
// println!("Bytes_read: {:?}, The bytes: {:?}", bytes_read, &buf[..]);
if bytes_read != 0 {
let chunk = buf.freeze();
let file_info = fh.to_file_info();
let ftp = Message::FileTransferMessage(FileTransferPayload {
chunk,
chunk_num,
file_info,
});
conn.send_msg(ftp).await?;
chunk_num += 1;
}
}
Ok(())
}
pub async fn download_files(conn: &mut Connection, file_infos: Vec<FileInfo>) -> Result<()> {
// for each file_info
let mut info_handles: HashMap<PathBuf, mpsc::UnboundedSender<(u64, Bytes)>> = HashMap::new();
for fi in file_infos {
let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>();
let path = fi.path.clone();
tokio::spawn(async move { download_file(fi, rx).await });
info_handles.insert(path, tx);
}
loop {
tokio::select! {
result = conn.await_msg() => match result {
Ok(msg) => {
match msg {
Message::FileTransferMessage(payload) => {
if let Some(tx) = info_handles.get(&payload.file_info.path) {
tx.send((payload.chunk_num, payload.chunk))?
}
},
_ => {
println!("Wrong message type");
return Err(anyhow!("wrong message type"));
}
}
},
Err(e) => return Err(anyhow!(e.to_string())),
}
}
}
}
pub async fn download_file(
file_info: FileInfo,
rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
) -> Result<()> {
println!("in download file");
let mut rx = rx;
let filename = match file_info.path.file_name() {
Some(f) => {
println!("matched filename");
f
}
None => {
println!("didnt match filename");
OsStr::new("random.txt")
}
};
println!("trying to create file...filename={:?}", filename);
let mut file = File::create(filename).await?;
println!("file created ok! filename={:?}", filename);
while let Some((_chunk_num, chunk)) = rx.recv().await {
// println!("rx got message! chunk={:?}", chunk);
file.write_all(&chunk).await?;
}
println!("done receiving messages");
Ok(())
return Ok(v);
}

View file

@ -1,5 +1,6 @@
pub const ID_SIZE: usize = 32; // Blake256 of password
pub const HANDSHAKE_MSG_SIZE: usize = 33; // generated by Spake2
pub const PER_CLIENT_BUFFER: usize = 1024 * 1024; // buffer size allocated by server for each client
pub const BUFFER_SIZE: usize = 1024 * 1024 * 1024; // chunk size for files sent over wire
pub const NONCE_SIZE_IN_BYTES: usize = 96 / 8; // used for every encrypted message
pub const PER_CLIENT_BUFFER: usize = 1024 * 64; // buffer size allocated by server for each client
pub const BUFFER_SIZE: usize = 1024 * 64; // chunk size for files sent over wire
pub const NONCE_SIZE: usize = 96 / 8; // used for every encrypted message
pub const CHUNK_HEADER_SIZE: usize = 10; // used for every chunk header

View file

@ -1,10 +1,17 @@
use crate::conf::{BUFFER_SIZE, CHUNK_HEADER_SIZE};
use crate::crypto::Crypt;
use crate::file::{ChunkHeader, FileHandle, StdFileHandle};
use crate::message::{Message, MessageStream};
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use bytes::{Bytes, BytesMut};
use flate2::write::{GzDecoder, GzEncoder};
use flate2::Compression;
use std::io::{Read, Write};
pub struct Connection {
ms: MessageStream,
crypt: Crypt,
@ -17,7 +24,7 @@ impl Connection {
Connection { ms, crypt }
}
async fn send_bytes(&mut self, bytes: Bytes) -> Result<()> {
pub async fn send_bytes(&mut self, bytes: Bytes) -> Result<()> {
match self.ms.send(bytes).await {
Ok(_) => Ok(()),
Err(e) => Err(anyhow!(e.to_string())),
@ -40,4 +47,62 @@ impl Connection {
None => Err(anyhow!("Error awaiting msg")),
}
}
pub async fn upload_files(mut self, handles: Vec<StdFileHandle>) -> Result<()> {
let mut socket = self.ms.into_inner().into_std()?;
tokio::task::spawn_blocking(move || {
for mut handle in handles {
let mut buffer = BytesMut::with_capacity(BUFFER_SIZE);
let mut start = handle.start;
loop {
let end =
FileHandle::to_message(handle.id, &mut handle.file, &mut buffer, start)?;
let mut compressor = GzEncoder::new(Vec::new(), Compression::fast());
compressor.write_all(&buffer[..])?;
let compressed_bytes = compressor.finish()?;
let encrypted_bytes = self.crypt.encrypt(Bytes::from(compressed_bytes))?;
start = end;
socket.write(&encrypted_bytes[..])?;
if end == 0 {
break;
}
}
}
Ok(())
})
.await?
}
pub async fn download_files(mut self, handles: Vec<StdFileHandle>) -> Result<()> {
let mut socket = self.ms.into_inner().into_std()?;
tokio::task::spawn_blocking(move || {
for mut handle in handles {
let mut buffer = BytesMut::with_capacity(BUFFER_SIZE);
let mut start = handle.start;
loop {
// read bytes
match socket.read(&mut buffer) {
Ok(0) => {
break;
}
Ok(n) => {
let decrypted_bytes =
self.crypt.decrypt(Bytes::from(&mut buffer[0..n]))?;
let mut writer = Vec::new();
let mut decompressor = GzDecoder::new(writer);
decompressor.write_all(&decrypted_bytes[..])?;
decompressor.try_finish()?;
writer = decompressor.finish()?;
let chunk_header: ChunkHeader =
bincode::deserialize(&writer[..CHUNK_HEADER_SIZE])?;
handle.file.write_all(&writer)
}
Err(e) => return Err(anyhow!(e.to_string())),
};
}
}
Ok(())
})
.await?
}
}

View file

@ -1,4 +1,4 @@
use crate::conf::NONCE_SIZE_IN_BYTES;
use crate::conf::NONCE_SIZE;
use aes_gcm::aead::{Aead, NewAead};
use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm`
use anyhow::{anyhow, Result};
@ -8,7 +8,7 @@ use rand::{thread_rng, Rng};
pub struct Crypt {
cipher: Aes256Gcm,
arr: [u8; NONCE_SIZE_IN_BYTES],
arr: [u8; NONCE_SIZE],
}
impl Crypt {
@ -16,7 +16,7 @@ impl Crypt {
let key = Key::from_slice(&key[..]);
Crypt {
cipher: Aes256Gcm::new(key),
arr: [0u8; NONCE_SIZE_IN_BYTES],
arr: [0u8; NONCE_SIZE],
}
}
@ -26,7 +26,7 @@ impl Crypt {
let nonce = Nonce::from_slice(&self.arr);
match self.cipher.encrypt(nonce, plaintext.as_ref()) {
Ok(body) => {
let mut buffer = BytesMut::with_capacity(NONCE_SIZE_IN_BYTES + body.len());
let mut buffer = BytesMut::with_capacity(NONCE_SIZE + body.len());
buffer.extend_from_slice(nonce);
buffer.extend_from_slice(&body);
Ok(buffer.freeze())
@ -38,7 +38,7 @@ impl Crypt {
// Accepts wire format, includes nonce as prefix
pub fn decrypt(&self, ciphertext: Bytes) -> Result<Bytes> {
let mut ciphertext_body = ciphertext;
let nonce_bytes = ciphertext_body.split_to(NONCE_SIZE_IN_BYTES);
let nonce_bytes = ciphertext_body.split_to(NONCE_SIZE);
let nonce = Nonce::from_slice(&nonce_bytes);
match self.cipher.decrypt(nonce, ciphertext_body.as_ref()) {
Ok(payload) => Ok(Bytes::from(payload)),

View file

@ -1,28 +1,32 @@
use anyhow::Result;
use futures::future::try_join_all;
use serde::{Deserialize, Serialize};
use std::fs::Metadata;
use std::path::PathBuf;
use flate2::bufread::GzEncoder;
use flate2::Compression;
use std::io::{copy, BufRead, BufReader, Read, Write};
use std::net::TcpStream;
use bytes::BytesMut;
use std::io::{Read, Seek, SeekFrom};
use tokio::fs::File;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ChunkHeader {
pub id: u8,
pub start: u64,
pub end: u64,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FileInfo {
pub struct FileOffer {
pub id: u8,
pub path: PathBuf,
pub chunk_header: ChunkHeader,
pub size: u64,
}
pub struct StdFileHandle {
pub id: u8,
pub file: std::fs::File,
pub start: u64,
}
pub struct FileHandle {
@ -32,12 +36,6 @@ pub struct FileHandle {
pub path: PathBuf,
}
impl Read for FileHandle {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
Ok(2)
}
}
impl FileHandle {
pub async fn new(id: u8, path: PathBuf) -> Result<FileHandle> {
let file = File::open(&path).await?;
@ -46,14 +44,45 @@ impl FileHandle {
return Ok(fh);
}
pub fn to_file_info(&self) -> FileInfo {
FileInfo {
pub async fn to_stds(
file_handles: Vec<FileHandle>,
chunk_headers: Vec<ChunkHeader>,
) -> Vec<StdFileHandle> {
let mut ret = Vec::new();
for handle in file_handles {
let chunk = chunk_headers.iter().find(|chunk| handle.id == chunk.id);
match chunk {
Some(chunk) => {
match handle.to_std(chunk).await {
Ok(std_file_handle) => {
ret.push(std_file_handle);
}
_ => println!("Error seeking in file"),
};
}
None => {
println!("Skipping file b/c not in requested chunks");
}
}
}
ret
}
async fn to_std(self, chunk_header: &ChunkHeader) -> Result<StdFileHandle> {
let mut std_file = self.file.into_std().await;
std_file.seek(SeekFrom::Start(chunk_header.start))?;
Ok(StdFileHandle {
id: self.id,
file: std_file,
start: chunk_header.start,
})
}
pub fn to_file_offer(&self) -> FileOffer {
FileOffer {
id: self.id,
path: self.path.clone(),
chunk_header: ChunkHeader {
id: self.id,
start: 0,
end: self.md.len(),
},
size: self.md.len(),
}
}
@ -66,16 +95,24 @@ impl FileHandle {
Ok(handles)
}
async fn count_lines(self, socket: TcpStream) -> Result<TcpStream, std::io::Error> {
let file = self.file.into_std().await;
let mut socket = socket;
tokio::task::spawn_blocking(move || {
let reader = BufReader::new(file);
let mut gz = GzEncoder::new(reader, Compression::fast());
copy(&mut gz, &mut socket)?;
Ok(socket)
})
.await?
pub fn to_message(
id: u8,
file: &mut std::fs::File,
buffer: &mut BytesMut,
start: u64,
) -> Result<u64> {
// reads the next chunk of the file
// packs it into the buffer, with the header taking up the first X bytes
let chunk_header = ChunkHeader { id, start };
let chunk_bytes = bincode::serialize(&chunk_header)?;
println!(
"chunk_bytes = {:?}, len = {:?}",
chunk_bytes.clone(),
chunk_bytes.len()
);
buffer.extend_from_slice(&chunk_bytes[..]);
let n = file.read(buffer)? as u64;
Ok(n)
}
}

View file

@ -1,4 +1,4 @@
use crate::file::FileInfo;
use crate::file::{ChunkHeader, FileOffer};
use anyhow::{anyhow, Result};
use bytes::Bytes;
@ -8,28 +8,30 @@ use tokio_util::codec::{Framed, LengthDelimitedCodec};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Message {
FileNegotiationMessage(FileNegotiationPayload),
FileTransferMessage(FileTransferPayload),
FileOffer(FileOfferPayload),
FileRequest(FileRequestPayload),
FileTransfer(FileTransferPayload),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FileNegotiationPayload {
pub files: Vec<FileInfo>,
pub struct FileRequestPayload {
pub chunks: Vec<ChunkHeader>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FileOfferPayload {
pub files: Vec<FileOffer>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FileTransferPayload {
pub file_info: FileInfo,
pub chunk_num: u64,
pub chunk_header: ChunkHeader,
pub chunk: Bytes,
}
impl Message {
pub fn serialize(&self) -> Result<Bytes> {
match bincode::serialize(&self) {
Ok(vec) => Ok(Bytes::from(vec)),
Err(e) => Err(anyhow!(e.to_string())),
}
bincode::serialize(&self).map(|vec| Ok(Bytes::from(vec)))?
}
pub fn deserialize(bytes: Bytes) -> Result<Self> {
match bincode::deserialize(bytes.as_ref()) {

View file

@ -1,4 +1,4 @@
use crate::file::{to_size_string, FileInfo};
use crate::file::{to_size_string, FileOffer};
use futures::prelude::*;
@ -6,15 +6,31 @@ use tokio::io::{self};
use tokio_util::codec::{FramedRead, LinesCodec};
pub async fn prompt_user_for_file_confirmation(file_offers: Vec<FileOffer>) -> Vec<FileOffer> {
let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new());
let mut files = vec![];
for file_offer in file_offers.into_iter() {
let mut reply = prompt_user_input(&mut stdin, &file_offer).await;
while reply.is_none() {
reply = prompt_user_input(&mut stdin, &file_offer).await;
}
match reply {
Some(true) => files.push(file_offer),
_ => {}
}
}
files
}
pub async fn prompt_user_input(
stdin: &mut FramedRead<io::Stdin, LinesCodec>,
file_info: &FileInfo,
file_offer: &FileOffer,
) -> Option<bool> {
let prompt_name = file_info.path.file_name().unwrap();
let prompt_name = file_offer.path.file_name().unwrap();
println!(
"Accept {:?}? ({:?}). (Y/n)",
prompt_name,
to_size_string(file_info.chunk_header.end)
to_size_string(file_offer.size)
);
match stdin.next().await {
Some(Ok(line)) => match line.as_str() {