Scaffold for concurrent file reads

This commit is contained in:
rictorlome 2022-02-13 17:45:58 -05:00
parent 9479653364
commit 7befc61ab3
4 changed files with 72 additions and 5 deletions

View file

@ -15,3 +15,9 @@
- https://github.com/rust-lang/flate2-rs
- https://crates.io/crates/async-compression
- https://pdos.csail.mit.edu/papers/chord:sigcomm01/chord_sigcomm.pdf
- https://pdos.csail.mit.edu/6.824/schedule.html
- https://flylib.com/books/en/2.292.1.105/1/
- https://en.wikipedia.org/wiki/Two_Generals%27_Problem

View file

@ -1,6 +1,8 @@
use crate::crypto::handshake;
use crate::file::{to_size_string, FileHandle, FileInfo};
use crate::message::{EncryptedMessage, FileNegotiationPayload, Message, MessageStream};
use crate::message::{
EncryptedMessage, FileNegotiationPayload, FileTransferPayload, Message, MessageStream,
};
use aes_gcm::Aes256Gcm;
use anyhow::{anyhow, Result};
@ -8,9 +10,13 @@ use blake2::{Blake2s256, Digest};
use bytes::{Bytes, BytesMut};
use futures::future::try_join_all;
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::path::PathBuf;
use tokio::io;
use std::pin::Pin;
use tokio::io::{self, AsyncReadExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, LinesCodec};
fn pass_to_bytes(password: &String) -> Bytes {
@ -135,10 +141,55 @@ pub async fn negotiate_files_down(stream: &mut MessageStream, cipher: &Aes256Gcm
pub async fn upload_encrypted_files(
stream: &mut MessageStream,
file_paths: &Vec<PathBuf>,
key: Bytes,
handles: Vec<FileHandle>,
cipher: &Aes256Gcm,
) -> Result<()> {
return Ok(());
let (tx, mut rx) = mpsc::unbounded_channel::<EncryptedMessage>();
//turn foo into something more concrete
for mut handle in handles {
let txc = tx.clone();
tokio::spawn(async move {
let _ = enqueue_file_chunks(&mut handle, txc).await;
});
}
loop {
tokio::select! {
Some(msg) = rx.recv() => {
println!("message received to client.rx {:?}", msg);
let x = msg.to_encrypted_message(cipher)?;
stream.send(x).await?
}
else => break,
}
}
Ok(())
}
const BUFFER_SIZE: usize = 1024 * 64;
pub async fn enqueue_file_chunks(
fh: &mut FileHandle,
tx: mpsc::UnboundedSender<EncryptedMessage>,
) -> Result<()> {
// let mut buf = BytesMut::with_capacity(BUFFER_SIZE);
// // The `read` method is defined by this trait.
// let mut chunk_num = 0;
// while {
// let n = fh.file.read(&mut buf[..]).await?;
// n == 0
// } {
// let chunk = buf.freeze();
// let file_info = fh.to_file_info();
// let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload {
// chunk,
// chunk_num,
// file_info,
// });
// tx.send(ftp);
// chunk_num += 1;
// }
Ok(())
}
pub async fn prompt_user_input(

View file

@ -1,3 +1,5 @@
use crate::message::FileTransferPayload;
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::fs::Metadata;

View file

@ -35,6 +35,7 @@ pub struct EncryptedPayload {
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum EncryptedMessage {
FileNegotiationMessage(FileNegotiationPayload),
FileTransferMessage(FileTransferPayload),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@ -42,6 +43,13 @@ pub struct FileNegotiationPayload {
pub files: Vec<FileInfo>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FileTransferPayload {
pub file_info: FileInfo,
pub chunk_num: u64,
pub chunk: Bytes,
}
impl EncryptedMessage {
pub fn from_encrypted_message(cipher: &Aes256Gcm, payload: &EncryptedPayload) -> Result<Self> {
let raw = decrypt(cipher, payload)?;