ruck/src/client.rs

184 lines
6.3 KiB
Rust
Raw Normal View History

use crate::conf::BUFFER_SIZE;
2022-08-30 14:53:16 +01:00
use crate::connection::Connection;
use crate::file::{FileHandle, FileInfo};
2022-08-30 02:21:48 +01:00
use crate::handshake::Handshake;
2022-08-30 14:53:16 +01:00
use crate::message::{FileNegotiationPayload, FileTransferPayload, Message};
use crate::ui::prompt_user_input;
2022-02-06 19:04:36 +00:00
2022-02-12 19:59:04 +00:00
use anyhow::{anyhow, Result};
use bytes::{Bytes, BytesMut};
2022-02-15 04:01:30 +00:00
use std::collections::HashMap;
use std::ffi::OsStr;
2022-02-07 00:54:36 +00:00
use std::path::PathBuf;
2022-02-15 04:01:30 +00:00
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
2022-02-06 19:04:36 +00:00
use tokio::net::TcpStream;
2022-02-13 22:45:58 +00:00
use tokio::sync::mpsc;
2022-02-12 19:59:04 +00:00
use tokio_util::codec::{FramedRead, LinesCodec};
2022-02-06 19:04:36 +00:00
2022-02-10 19:52:28 +00:00
pub async fn send(file_paths: &Vec<PathBuf>, password: &String) -> Result<()> {
2022-02-12 19:59:04 +00:00
// Fail early if there are problems generating file handles
let handles = FileHandle::get_file_handles(file_paths).await?;
2022-02-12 17:18:24 +00:00
2022-02-12 19:59:04 +00:00
// Establish connection to server
2022-02-10 19:52:28 +00:00
let socket = TcpStream::connect("127.0.0.1:8080").await?;
2022-02-12 01:50:14 +00:00
2022-08-30 02:21:48 +01:00
let (handshake, s1) = Handshake::from_password(password);
2022-08-30 03:33:50 +01:00
// Complete handshake, returning key used for encryption
let (socket, key) = handshake.negotiate(socket, s1).await?;
2022-08-30 02:21:48 +01:00
2022-08-30 14:53:16 +01:00
let mut connection = Connection::new(socket, key);
2022-02-12 19:59:04 +00:00
// Complete file negotiation
2022-08-30 14:53:16 +01:00
let handles = negotiate_files_up(&mut connection, handles).await?;
2022-02-12 19:59:04 +00:00
// Upload negotiated files
2022-08-30 14:53:16 +01:00
upload_encrypted_files(&mut connection, handles).await?;
2022-08-30 03:33:50 +01:00
println!("Done uploading.");
2022-02-12 19:59:04 +00:00
// Exit
Ok(())
2022-02-10 19:52:28 +00:00
}
pub async fn receive(password: &String) -> Result<()> {
let socket = TcpStream::connect("127.0.0.1:8080").await?;
2022-08-30 02:21:48 +01:00
let (handshake, s1) = Handshake::from_password(password);
2022-08-30 03:33:50 +01:00
let (socket, key) = handshake.negotiate(socket, s1).await?;
2022-08-30 14:53:16 +01:00
let mut connection = Connection::new(socket, key);
let files = negotiate_files_down(&mut connection).await?;
2022-02-12 19:59:04 +00:00
2022-08-30 14:53:16 +01:00
download_files(&mut connection, files).await?;
2022-02-10 19:52:28 +00:00
return Ok(());
}
2022-02-12 19:59:04 +00:00
pub async fn negotiate_files_up(
2022-08-30 14:53:16 +01:00
conn: &mut Connection,
2022-02-12 19:59:04 +00:00
file_handles: Vec<FileHandle>,
) -> Result<Vec<FileHandle>> {
let files = file_handles.iter().map(|fh| fh.to_file_info()).collect();
2022-08-30 14:53:16 +01:00
let msg = Message::FileNegotiationMessage(FileNegotiationPayload { files });
conn.send_msg(msg).await?;
let reply = conn.await_msg().await?;
let requested_paths: Vec<PathBuf> = match reply {
Message::FileNegotiationMessage(fnm) => fnm.files.into_iter().map(|f| f.path).collect(),
2022-02-12 19:59:04 +00:00
_ => return Err(anyhow!("Expecting file negotiation message back")),
};
Ok(file_handles
.into_iter()
.filter(|fh| requested_paths.contains(&fh.path))
.collect())
}
2022-08-30 14:53:16 +01:00
pub async fn negotiate_files_down(conn: &mut Connection) -> Result<Vec<FileInfo>> {
let offer = conn.await_msg().await?;
let requested_infos: Vec<FileInfo> = match offer {
Message::FileNegotiationMessage(fnm) => fnm.files,
2022-02-12 19:59:04 +00:00
_ => return Err(anyhow!("Expecting file negotiation message back")),
};
let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new());
let mut files = vec![];
2022-02-15 04:01:30 +00:00
for file_info in requested_infos.into_iter() {
let mut reply = prompt_user_input(&mut stdin, &file_info).await;
2022-02-12 19:59:04 +00:00
while reply.is_none() {
2022-02-15 04:01:30 +00:00
reply = prompt_user_input(&mut stdin, &file_info).await;
2022-02-12 19:59:04 +00:00
}
match reply {
2022-02-15 04:01:30 +00:00
Some(true) => files.push(file_info),
2022-02-12 19:59:04 +00:00
_ => {}
}
}
2022-08-30 14:53:16 +01:00
let msg = Message::FileNegotiationMessage(FileNegotiationPayload {
2022-02-15 04:01:30 +00:00
files: files.clone(),
});
2022-08-30 14:53:16 +01:00
conn.send_msg(msg).await?;
2022-02-15 04:01:30 +00:00
Ok(files)
2022-02-12 19:59:04 +00:00
}
2022-08-30 14:53:16 +01:00
pub async fn upload_encrypted_files(conn: &mut Connection, handles: Vec<FileHandle>) -> Result<()> {
2022-02-13 22:45:58 +00:00
for mut handle in handles {
2022-08-30 14:53:16 +01:00
enqueue_file_chunks(conn, &mut handle).await?;
2022-02-13 22:45:58 +00:00
}
2022-08-30 14:53:16 +01:00
println!("Files uploaded.");
2022-02-13 22:45:58 +00:00
Ok(())
}
2022-08-30 14:53:16 +01:00
pub async fn enqueue_file_chunks(conn: &mut Connection, fh: &mut FileHandle) -> Result<()> {
2022-02-15 04:01:30 +00:00
let mut chunk_num = 0;
2022-02-16 01:55:33 +00:00
let mut bytes_read = 1;
while bytes_read != 0 {
let mut buf = BytesMut::with_capacity(BUFFER_SIZE);
bytes_read = fh.file.read_buf(&mut buf).await?;
// println!("Bytes_read: {:?}, The bytes: {:?}", bytes_read, &buf[..]);
2022-02-16 01:55:33 +00:00
if bytes_read != 0 {
let chunk = buf.freeze();
let file_info = fh.to_file_info();
2022-08-30 14:53:16 +01:00
let ftp = Message::FileTransferMessage(FileTransferPayload {
2022-02-16 01:55:33 +00:00
chunk,
chunk_num,
file_info,
});
2022-08-30 14:53:16 +01:00
conn.send_msg(ftp).await?;
2022-02-16 01:55:33 +00:00
chunk_num += 1;
}
2022-02-15 04:01:30 +00:00
}
2022-02-13 22:45:58 +00:00
Ok(())
2022-02-06 19:04:36 +00:00
}
2022-02-12 19:59:04 +00:00
2022-08-30 14:53:16 +01:00
pub async fn download_files(conn: &mut Connection, file_infos: Vec<FileInfo>) -> Result<()> {
2022-02-15 04:01:30 +00:00
// for each file_info
2022-02-16 01:55:33 +00:00
let mut info_handles: HashMap<PathBuf, mpsc::UnboundedSender<(u64, Bytes)>> = HashMap::new();
for fi in file_infos {
let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>();
let path = fi.path.clone();
tokio::spawn(async move { download_file(fi, rx).await });
info_handles.insert(path, tx);
}
2022-02-15 04:01:30 +00:00
loop {
tokio::select! {
2022-08-30 14:53:16 +01:00
result = conn.await_msg() => match result {
Ok(msg) => {
match msg {
Message::FileTransferMessage(payload) => {
2022-02-15 04:01:30 +00:00
if let Some(tx) = info_handles.get(&payload.file_info.path) {
tx.send((payload.chunk_num, payload.chunk))?
2022-08-30 14:53:16 +01:00
}
2022-02-15 04:01:30 +00:00
},
2022-08-30 14:53:16 +01:00
_ => {
println!("Wrong message type");
return Err(anyhow!("wrong message type"));
}
2022-02-15 04:01:30 +00:00
}
2022-08-30 14:53:16 +01:00
},
Err(e) => return Err(anyhow!(e.to_string())),
2022-02-15 04:01:30 +00:00
}
}
}
}
pub async fn download_file(
file_info: FileInfo,
rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
) -> Result<()> {
2022-02-16 01:55:33 +00:00
println!("in download file");
2022-02-15 04:01:30 +00:00
let mut rx = rx;
let filename = match file_info.path.file_name() {
2022-02-16 01:55:33 +00:00
Some(f) => {
println!("matched filename");
f
}
None => {
println!("didnt match filename");
OsStr::new("random.txt")
}
2022-02-15 04:01:30 +00:00
};
2022-02-16 01:55:33 +00:00
println!("trying to create file...filename={:?}", filename);
let mut file = File::create(filename).await?;
println!("file created ok! filename={:?}", filename);
while let Some((_chunk_num, chunk)) = rx.recv().await {
// println!("rx got message! chunk={:?}", chunk);
2022-02-15 04:01:30 +00:00
file.write_all(&chunk).await?;
}
2022-02-16 01:55:33 +00:00
println!("done receiving messages");
2022-02-15 04:01:30 +00:00
Ok(())
}