Merge pull request #4 from rictorlome/speed-up

Speed up
This commit is contained in:
rictorlome 2022-09-03 15:13:18 -04:00 committed by GitHub
commit b41985773a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 558 additions and 582 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
/target
Notes.md

128
Cargo.lock generated
View file

@ -2,6 +2,12 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aead"
version = "0.4.3"
@ -162,6 +168,15 @@ dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
dependencies = [
"cfg-if",
]
[[package]]
name = "crypto-common"
version = "0.1.2"
@ -214,28 +229,13 @@ dependencies = [
]
[[package]]
name = "educe"
version = "0.4.18"
name = "flate2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f86b50932a01e7ec5c06160492ab660fb19b6bb2a7878030dd6cd68d21df9d4d"
checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6"
dependencies = [
"enum-ordinalize",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "enum-ordinalize"
version = "3.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b166c9e378360dd5a6666a9604bb4f54ae0cac39023ffbac425e917a2a04fef"
dependencies = [
"num-bigint",
"num-traits",
"proc-macro2",
"quote",
"syn",
"crc32fast",
"miniz_oxide",
]
[[package]]
@ -464,6 +464,15 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "miniz_oxide"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f5c75688da582b8ffc1f1799e9db273f32133c49e048f614d22ec3256773ccc"
dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.7.14"
@ -495,36 +504,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.1"
@ -581,26 +560,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "pin-project"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.8"
@ -741,13 +700,12 @@ dependencies = [
"blake2",
"bytes",
"clap",
"flate2",
"futures",
"rand",
"serde",
"spake2",
"tokio",
"tokio-serde",
"tokio-stream",
"tokio-util",
]
@ -889,32 +847,6 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-serde"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "911a61637386b789af998ee23f50aa30d5fd7edcec8d6d3dedae5e5815205466"
dependencies = [
"bincode",
"bytes",
"educe",
"futures-core",
"futures-sink",
"pin-project",
"serde",
]
[[package]]
name = "tokio-stream"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.6.9"

View file

@ -11,11 +11,10 @@ blake2 = "0.10.2"
bytes = { version = "1", features = ["serde"] }
bincode = "1.3.3"
clap = { version = "3.0.14", features = ["derive"] }
flate2 = "1.0"
futures = { version = "0.3.0", features = ["thread-pool"]}
rand = "0.8.4"
serde = { version = "1.0", features = ["derive"] }
spake2 = "0.3.1"
tokio = { version = "1.16.1", features = ["full"] }
tokio-serde = { version = "~0.8", features = ["bincode"] }
tokio-stream = { version = "~0.1.6", features = ["net"]}
tokio-util = { version = "0.6.3", features = ["full"]}

View file

@ -1,30 +1,35 @@
- https://github.com/mcginty/snow/tree/master/examples
- https://github.com/black-binary/snowstorm/tree/master/examples
- https://cryptoservices.github.io/cryptography/protocols/2016/04/27/noise-protocol.html
# ruck
- https://github.com/libp2p
`ruck` is a command line tool used for hosting relay servers and sending end-to-end encrypted files between clients. It was heavily inspired by [croc](https://github.com/schollz/croc), one of the easiest ways to send files between peers. This document describes the protocol `ruck` uses to support this functionality.
- https://docs.rs/spake2/latest/spake2/
- https://crates.io/crates/chacha20poly1305
- https://briansmith.org/rustdoc/ring/aead/index.html
- https://libsodium.gitbook.io/doc/secret-key_cryptography/secretstream
### Version
- https://kerkour.com/rust-symmetric-encryption-aead-benchmark/
- https://docs.rs/aes-gcm/latest/aes_gcm/
This document refers to version `0.1.0` of `ruck` as defined by the `Cargo.toml` file.
- https://github.com/rust-lang/flate2-rs
- https://crates.io/crates/async-compression
## Server
- https://pdos.csail.mit.edu/papers/chord:sigcomm01/chord_sigcomm.pdf
- https://pdos.csail.mit.edu/6.824/schedule.html
- https://flylib.com/books/en/2.292.1.105/1/
- https://en.wikipedia.org/wiki/Two_Generals%27_Problem
The server in `ruck` exposes a TCP port.
Its only functions are to staple connections and shuttle bytes between stapled connections.
The first 32 bytes sent over the wire from a new client are used as its unique identifier.
When a new client joins, if the server has another open connection with the same identifier, the connections are then stapled.
The clients have some mechanism for agreeing on these identifiers, however, from the server's perspective it doesn't matter how they agree.
- https://kobzol.github.io/rust/ci/2021/05/07/building-rust-binaries-in-ci-that-work-with-older-glibc.html
Once the connection is stapled, all bytes are piped across until a client disconnects or times out.
The time out is set to remove idle connections.
The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages.
For this reason, updates to the `ruck` protocol do not typically necessitate server redeployments.
## Todo
## Client
- [ ] Use tracing
- [ ] Add progress bars
- [ ] Exit happily when transfer is complete
- [ ] Compress files
There are two types of clients - `send` and `receive` clients.
Out of band, the clients agree on a relay server and password, from which they can derive the 32 byte identifier used by the server to staple their connections.
Clients have the option of using the single-use, automatically generated passwords which `ruck` supplies by default.
Using the passwords per the [Spake2](https://docs.rs/spake2/0.3.1/spake2/) handshake algorithm, clients generate a symmetric key with which to encrypt their subsequent messages.
Once the handshake is complete, `send` and `receive` negotiate and exchange files per the following:
- `send` offers a list of files and waits.
- `receive` specifies which bytes it wants from these files.
- `send` sends the specified bytes and waits.
- `receive` sends heartbeats with progress updates.
- `send` hangs up once the heartbeats stop or received a successful heartbeat.
- `receive` hangs up once the downloads are complete.

View file

@ -1,295 +1,107 @@
use crate::conf::BUFFER_SIZE;
use crate::crypto::handshake;
use crate::file::{to_size_string, FileHandle, FileInfo};
use crate::message::{
EncryptedMessage, FileNegotiationPayload, FileTransferPayload, Message, MessageStream,
};
use crate::connection::Connection;
use crate::file::{ChunkHeader, FileHandle, FileOffer, StdFileHandle};
use crate::handshake::Handshake;
use crate::message::{FileOfferPayload, FileRequestPayload, Message};
use crate::ui::prompt_user_for_file_confirmation;
use aes_gcm::Aes256Gcm;
use anyhow::{anyhow, Result};
use blake2::{Blake2s256, Digest};
use bytes::{Bytes, BytesMut};
use futures::future::try_join_all;
use futures::prelude::*;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::path::PathBuf;
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, LinesCodec};
fn pass_to_bytes(password: &String) -> Bytes {
let mut hasher = Blake2s256::new();
hasher.update(password.as_bytes());
let res = hasher.finalize();
BytesMut::from(&res[..]).freeze()
}
use tokio::net::TcpStream;
pub async fn send(file_paths: &Vec<PathBuf>, password: &String) -> Result<()> {
// Fail early if there are problems generating file handles
let handles = get_file_handles(file_paths).await?;
let handles = FileHandle::get_file_handles(file_paths).await?;
// Establish connection to server
let socket = TcpStream::connect("127.0.0.1:8080").await?;
let mut stream = Message::to_stream(socket);
// Complete handshake, returning cipher used for encryption
let (stream, cipher) = handshake(
&mut stream,
Bytes::from(password.to_string()),
pass_to_bytes(password),
)
.await?;
let (handshake, s1) = Handshake::from_password(password);
// Complete handshake, returning key used for encryption
let (socket, key) = handshake.negotiate(socket, s1).await?;
// Complete file negotiation
let handles = negotiate_files_up(handles, stream, &cipher).await?;
let mut connection = Connection::new(socket, key);
// Offer files, wait for requested file response
let requested_chunks = offer_files(&mut connection, &handles).await?;
// Upload negotiated files
upload_encrypted_files(stream, handles, &cipher).await?;
let std_file_handles = FileHandle::to_stds(handles, requested_chunks).await;
connection.upload_files(std_file_handles).await?;
println!("Done uploading.");
// Exit
Ok(())
}
pub async fn receive(password: &String) -> Result<()> {
// Establish connection to server
let socket = TcpStream::connect("127.0.0.1:8080").await?;
let mut stream = Message::to_stream(socket);
let (stream, cipher) = handshake(
&mut stream,
Bytes::from(password.to_string()),
pass_to_bytes(password),
)
.await?;
let files = negotiate_files_down(stream, &cipher).await?;
download_files(files, stream, &cipher).await?;
let (handshake, s1) = Handshake::from_password(password);
// Complete handshake, returning key used for encryption
let (socket, key) = handshake.negotiate(socket, s1).await?;
let mut connection = Connection::new(socket, key);
// Wait for offered files, respond with desired files
let desired_files = request_specific_files(&mut connection).await?;
// Create files
let std_file_handles = create_files(desired_files).await?;
// Download them
connection.download_files(std_file_handles).await?;
return Ok(());
}
pub async fn get_file_handles(file_paths: &Vec<PathBuf>) -> Result<Vec<FileHandle>> {
let tasks = file_paths
.into_iter()
.map(|path| FileHandle::new(path.to_path_buf()));
let handles = try_join_all(tasks).await?;
Ok(handles)
}
pub async fn negotiate_files_up(
file_handles: Vec<FileHandle>,
stream: &mut MessageStream,
cipher: &Aes256Gcm,
) -> Result<Vec<FileHandle>> {
let files = file_handles.iter().map(|fh| fh.to_file_info()).collect();
let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { files });
let server_msg = msg.to_encrypted_message(cipher)?;
stream.send(server_msg).await?;
let reply_payload = match stream.next().await {
Some(Ok(msg)) => match msg {
Message::EncryptedMessage(response) => response,
_ => return Err(anyhow!("Expecting encrypted message back")),
},
_ => {
return Err(anyhow!("No response to negotiation message"));
}
};
let plaintext_reply = EncryptedMessage::from_encrypted_message(cipher, &reply_payload)?;
let requested_paths: Vec<PathBuf> = match plaintext_reply {
EncryptedMessage::FileNegotiationMessage(fnm) => {
fnm.files.into_iter().map(|f| f.path).collect()
}
_ => return Err(anyhow!("Expecting file negotiation message back")),
};
Ok(file_handles
.into_iter()
.filter(|fh| requested_paths.contains(&fh.path))
.collect())
}
pub async fn negotiate_files_down(
stream: &mut MessageStream,
cipher: &Aes256Gcm,
) -> Result<Vec<FileInfo>> {
let file_offer = match stream.next().await {
Some(Ok(msg)) => match msg {
Message::EncryptedMessage(response) => response,
_ => return Err(anyhow!("Expecting encrypted message back")),
},
_ => {
return Err(anyhow!("No response to negotiation message"));
}
};
let plaintext_offer = EncryptedMessage::from_encrypted_message(cipher, &file_offer)?;
let requested_infos: Vec<FileInfo> = match plaintext_offer {
EncryptedMessage::FileNegotiationMessage(fnm) => fnm.files,
_ => return Err(anyhow!("Expecting file negotiation message back")),
};
let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new());
let mut files = vec![];
for file_info in requested_infos.into_iter() {
let mut reply = prompt_user_input(&mut stdin, &file_info).await;
while reply.is_none() {
reply = prompt_user_input(&mut stdin, &file_info).await;
}
match reply {
Some(true) => files.push(file_info),
_ => {}
}
pub async fn offer_files(
conn: &mut Connection,
file_handles: &Vec<FileHandle>,
) -> Result<Vec<ChunkHeader>> {
// Collect file offer
let files = file_handles.iter().map(|fh| fh.to_file_offer()).collect();
let msg = Message::FileOffer(FileOfferPayload { files });
// Send file offer
conn.send_msg(msg).await?;
// Wait for reply
let reply = conn.await_msg().await?;
// Return requested chunks
match reply {
Message::FileRequest(file_request_payload) => Ok(file_request_payload.chunks),
_ => Err(anyhow!("Expecting file request message back")),
}
let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload {
files: files.clone(),
}
pub async fn request_specific_files(conn: &mut Connection) -> Result<Vec<FileOffer>> {
// Wait for offer message
let offer_message = conn.await_msg().await?;
let offered_files: Vec<FileOffer> = match offer_message {
Message::FileOffer(file_offer_payload) => file_offer_payload.files,
_ => return Err(anyhow!("Expecting file offer message")),
};
// Prompt user for confirmation of files
let desired_files = prompt_user_for_file_confirmation(offered_files).await;
let file_request_msg = Message::FileRequest(FileRequestPayload {
chunks: desired_files
.iter()
.map(|file| ChunkHeader {
id: file.id,
start: 0,
})
.collect(),
});
let server_msg = msg.to_encrypted_message(cipher)?;
stream.send(server_msg).await?;
Ok(files)
conn.send_msg(file_request_msg).await?;
Ok(desired_files)
}
pub async fn upload_encrypted_files(
stream: &mut MessageStream,
handles: Vec<FileHandle>,
cipher: &Aes256Gcm,
) -> Result<()> {
let (tx, mut rx) = mpsc::unbounded_channel::<EncryptedMessage>();
for mut handle in handles {
let txc = tx.clone();
tokio::spawn(async move {
let _ = enqueue_file_chunks(&mut handle, txc).await;
});
}
loop {
tokio::select! {
Some(msg) = rx.recv() => {
// println!("message received to client.rx {:?}", msg);
let x = msg.to_encrypted_message(cipher)?;
stream.send(x).await?
}
else => {
println!("breaking");
break
},
}
}
Ok(())
}
pub async fn enqueue_file_chunks(
fh: &mut FileHandle,
tx: mpsc::UnboundedSender<EncryptedMessage>,
) -> Result<()> {
let mut chunk_num = 0;
let mut bytes_read = 1;
while bytes_read != 0 {
let mut buf = BytesMut::with_capacity(BUFFER_SIZE);
bytes_read = fh.file.read_buf(&mut buf).await?;
// println!("Bytes_read: {:?}, The bytes: {:?}", bytes_read, &buf[..]);
if bytes_read != 0 {
let chunk = buf.freeze();
let file_info = fh.to_file_info();
let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload {
chunk,
chunk_num,
file_info,
});
tx.send(ftp)?;
chunk_num += 1;
}
}
Ok(())
}
pub async fn download_files(
file_infos: Vec<FileInfo>,
stream: &mut MessageStream,
cipher: &Aes256Gcm,
) -> Result<()> {
// for each file_info
let mut info_handles: HashMap<PathBuf, mpsc::UnboundedSender<(u64, Bytes)>> = HashMap::new();
for fi in file_infos {
let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>();
let path = fi.path.clone();
tokio::spawn(async move { download_file(fi, rx).await });
info_handles.insert(path, tx);
}
loop {
tokio::select! {
result = stream.next() => match result {
Some(Ok(Message::EncryptedMessage(payload))) => {
let ec = EncryptedMessage::from_encrypted_message(cipher, &payload)?;
// println!("encrypted message received! {:?}", ec);
match ec {
EncryptedMessage::FileTransferMessage(payload) => {
println!("matched file transfer message");
if let Some(tx) = info_handles.get(&payload.file_info.path) {
println!("matched on filetype, sending to tx");
tx.send((payload.chunk_num, payload.chunk))?
};
},
_ => {println!("wrong msg")}
}
}
Some(Ok(_)) => {
println!("wrong msg");
}
Some(Err(e)) => {
println!("Error {:?}", e);
}
None => break,
}
}
}
Ok(())
}
pub async fn download_file(
file_info: FileInfo,
rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
) -> Result<()> {
println!("in download file");
let mut rx = rx;
let filename = match file_info.path.file_name() {
Some(f) => {
println!("matched filename");
f
}
None => {
println!("didnt match filename");
OsStr::new("random.txt")
}
};
println!("trying to create file...filename={:?}", filename);
let mut file = File::create(filename).await?;
println!("file created ok! filename={:?}", filename);
while let Some((_chunk_num, chunk)) = rx.recv().await {
// println!("rx got message! chunk={:?}", chunk);
file.write_all(&chunk).await?;
}
println!("done receiving messages");
Ok(())
}
pub async fn prompt_user_input(
stdin: &mut FramedRead<io::Stdin, LinesCodec>,
file_info: &FileInfo,
) -> Option<bool> {
let prompt_name = file_info.path.file_name().unwrap();
println!(
"Accept {:?}? ({:?}). (Y/n)",
prompt_name,
to_size_string(file_info.size)
);
match stdin.next().await {
Some(Ok(line)) => match line.as_str() {
"" | "Y" | "y" | "yes" | "Yes" | "YES" => Some(true),
"N" | "n" | "NO" | "no" | "No" => Some(false),
_ => {
println!("Invalid input. Please enter one of the following characters: [YyNn]");
return None;
}
},
_ => None,
pub async fn create_files(desired_files: Vec<FileOffer>) -> Result<Vec<StdFileHandle>> {
let mut v = Vec::new();
for desired_file in desired_files {
let filename = desired_file
.path
.file_name()
.unwrap_or(OsStr::new("random.txt"));
let file = File::create(filename).await?;
let std_file_handle = StdFileHandle::new(desired_file.id, file, 0).await?;
v.push(std_file_handle)
}
return Ok(v);
}

View file

@ -1 +1,5 @@
pub const BUFFER_SIZE: usize = 1024 * 1024;
pub const ID_SIZE: usize = 32; // Blake256 of password
pub const HANDSHAKE_MSG_SIZE: usize = 33; // generated by Spake2
pub const PER_CLIENT_BUFFER: usize = 1024 * 64; // buffer size allocated by server for each client
pub const BUFFER_SIZE: usize = 1024 * 64; // chunk size for files sent over wire
pub const NONCE_SIZE: usize = 96 / 8; // used for every encrypted message

128
src/connection.rs Normal file
View file

@ -0,0 +1,128 @@
use crate::conf::BUFFER_SIZE;
use crate::crypto::Crypt;
use crate::file::{ChunkHeader, StdFileHandle};
use crate::message::{FileTransferPayload, Message, MessageStream};
use anyhow::{anyhow, Result};
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use bytes::{Bytes, BytesMut};
use flate2::bufread::GzEncoder;
use flate2::write::GzDecoder;
use flate2::Compression;
use std::io::{BufReader, Read, Write};
use std::time::Instant;
pub struct Connection {
ms: MessageStream,
crypt: Crypt,
}
impl Connection {
pub fn new(socket: TcpStream, key: Vec<u8>) -> Self {
let ms = Message::to_stream(socket);
let crypt = Crypt::new(&key);
Connection { ms, crypt }
}
pub async fn send_bytes(&mut self, bytes: Bytes) -> Result<()> {
match self.ms.send(bytes).await {
Ok(_) => Ok(()),
Err(e) => Err(anyhow!(e.to_string())),
}
}
pub async fn send_msg(&mut self, msg: Message) -> Result<()> {
let msg = msg.serialize()?;
let bytes = self.crypt.encrypt(msg)?;
self.send_bytes(bytes).await
}
pub async fn await_msg(&mut self) -> Result<Message> {
match self.ms.next().await {
Some(Ok(msg)) => {
let decrypted_bytes = self.crypt.decrypt(msg.freeze())?;
Message::deserialize(decrypted_bytes)
}
Some(Err(e)) => Err(anyhow!(e.to_string())),
None => Err(anyhow!("Error awaiting msg")),
}
}
pub async fn upload_file(&mut self, handle: StdFileHandle) -> Result<()> {
let mut buffer = [0; BUFFER_SIZE];
let reader = BufReader::new(handle.file);
let mut gz = GzEncoder::new(reader, Compression::fast());
let before = Instant::now();
let mut count = 0;
let mut bytes_sent: u64 = 0;
loop {
count += 1;
match gz.read(&mut buffer) {
Ok(0) => {
break;
}
Ok(n) => {
bytes_sent += n as u64;
let message = Message::FileTransfer(FileTransferPayload {
chunk: BytesMut::from(&buffer[..n]).freeze(),
chunk_header: ChunkHeader {
id: handle.id,
start: 0,
},
});
self.send_msg(message).await?;
}
Err(e) => return Err(anyhow!(e.to_string())),
}
}
let elapsed = before.elapsed();
let mb_sent = bytes_sent / 1_048_576;
println!(
"{:?} mb sent, {:?} iterations. {:?} total time, {:?} avg per iteration, {:?} avg mb/sec",
mb_sent,
count,
elapsed,
elapsed / count,
mb_sent / elapsed.as_secs()
);
Ok(())
}
pub async fn upload_files(mut self, handles: Vec<StdFileHandle>) -> Result<()> {
for handle in handles {
self.upload_file(handle).await?;
}
Ok(())
}
pub async fn download_files(mut self, handles: Vec<StdFileHandle>) -> Result<()> {
for handle in handles {
self.download_file(handle).await?;
}
Ok(())
}
pub async fn download_file(&mut self, handle: StdFileHandle) -> Result<()> {
let mut decoder = GzDecoder::new(handle.file);
loop {
let msg = self.await_msg().await?;
match msg {
Message::FileTransfer(payload) => {
if payload.chunk_header.id != handle.id {
return Err(anyhow!("Wrong file"));
}
if payload.chunk.len() == 0 {
break;
}
decoder.write_all(&payload.chunk[..])?
}
_ => return Err(anyhow!("Expecting file transfer message")),
}
}
decoder.finish()?;
println!("Done downloading file.");
Ok(())
}
}

View file

@ -1,69 +1,48 @@
use crate::message::{EncryptedPayload, HandshakePayload, Message, MessageStream};
use crate::conf::NONCE_SIZE;
use aes_gcm::aead::{Aead, NewAead};
use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm`
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::prelude::*;
use bytes::{Bytes, BytesMut};
use rand::{thread_rng, Rng};
use spake2::{Ed25519Group, Identity, Password, Spake2};
pub async fn handshake(
stream: &mut MessageStream,
password: Bytes,
id: Bytes,
) -> Result<(&mut MessageStream, Aes256Gcm)> {
let (s1, outbound_msg) =
Spake2::<Ed25519Group>::start_symmetric(&Password::new(password), &Identity::new(&id));
println!("client - sending handshake msg");
let handshake_msg = Message::HandshakeMessage(HandshakePayload {
id,
msg: Bytes::from(outbound_msg),
});
// println!("client - handshake msg, {:?}", handshake_msg);
stream.send(handshake_msg).await?;
let first_message = match stream.next().await {
Some(Ok(msg)) => match msg {
Message::HandshakeMessage(response) => response.msg,
_ => return Err(anyhow!("Expecting handshake message response")),
},
_ => {
return Err(anyhow!("No response to handshake message"));
pub struct Crypt {
cipher: Aes256Gcm,
arr: [u8; NONCE_SIZE],
}
impl Crypt {
pub fn new(key: &Vec<u8>) -> Crypt {
let key = Key::from_slice(&key[..]);
Crypt {
cipher: Aes256Gcm::new(key),
arr: [0u8; NONCE_SIZE],
}
};
println!("client - handshake msg responded to");
let key = match s1.finish(&first_message[..]) {
Ok(key_bytes) => key_bytes,
Err(e) => return Err(anyhow!(e.to_string())),
};
// println!("Handshake successful. Key is {:?}", key);
return Ok((stream, new_cipher(&key)));
}
}
pub fn new_cipher(key: &Vec<u8>) -> Aes256Gcm {
let key = Key::from_slice(&key[..]);
Aes256Gcm::new(key)
}
// Returns wire format, includes nonce as prefix
pub fn encrypt(&mut self, plaintext: Bytes) -> Result<Bytes> {
thread_rng().try_fill(&mut self.arr[..])?;
let nonce = Nonce::from_slice(&self.arr);
match self.cipher.encrypt(nonce, plaintext.as_ref()) {
Ok(body) => {
let mut buffer = BytesMut::with_capacity(NONCE_SIZE + body.len());
buffer.extend_from_slice(nonce);
buffer.extend_from_slice(&body);
Ok(buffer.freeze())
}
Err(e) => Err(anyhow!(e.to_string())),
}
}
pub const NONCE_SIZE_IN_BYTES: usize = 96 / 8;
pub fn encrypt(cipher: &Aes256Gcm, body: &Vec<u8>) -> Result<EncryptedPayload> {
let mut arr = [0u8; NONCE_SIZE_IN_BYTES];
thread_rng().try_fill(&mut arr[..])?;
let nonce = Nonce::from_slice(&arr);
let plaintext = body.as_ref();
match cipher.encrypt(nonce, plaintext) {
Ok(body) => Ok(EncryptedPayload {
nonce: arr.to_vec(),
body,
}),
Err(_) => Err(anyhow!("Encryption error")),
}
}
pub fn decrypt(cipher: &Aes256Gcm, payload: &EncryptedPayload) -> Result<Bytes> {
let nonce = Nonce::from_slice(payload.nonce.as_ref());
match cipher.decrypt(nonce, payload.body.as_ref()) {
Ok(payload) => Ok(Bytes::from(payload)),
Err(_) => Err(anyhow!("Decryption error")),
// Accepts wire format, includes nonce as prefix
pub fn decrypt(&self, ciphertext: Bytes) -> Result<Bytes> {
let mut ciphertext_body = ciphertext;
let nonce_bytes = ciphertext_body.split_to(NONCE_SIZE);
let nonce = Nonce::from_slice(&nonce_bytes);
match self.cipher.decrypt(nonce, ciphertext_body.as_ref()) {
Ok(payload) => Ok(Bytes::from(payload)),
Err(e) => Err(anyhow!(e.to_string())),
}
}
}

View file

@ -1,35 +1,104 @@
use anyhow::Result;
use futures::future::try_join_all;
use serde::{Deserialize, Serialize};
use std::fs::Metadata;
use std::path::PathBuf;
use std::io::{Seek, SeekFrom};
use tokio::fs::File;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FileInfo {
pub struct ChunkHeader {
pub id: u8,
pub start: u64,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FileOffer {
pub id: u8,
pub path: PathBuf,
pub size: u64,
}
pub struct StdFileHandle {
pub id: u8,
pub file: std::fs::File,
pub start: u64,
}
impl StdFileHandle {
pub async fn new(id: u8, file: File, start: u64) -> Result<StdFileHandle> {
let mut std_file = file.into_std().await;
std_file.seek(SeekFrom::Start(start))?;
Ok(StdFileHandle {
id: id,
file: std_file,
start: start,
})
}
}
pub struct FileHandle {
pub id: u8,
pub file: File,
pub md: Metadata,
pub path: PathBuf,
}
impl FileHandle {
pub async fn new(path: PathBuf) -> Result<FileHandle> {
pub async fn new(id: u8, path: PathBuf) -> Result<FileHandle> {
let file = File::open(&path).await?;
let md = file.metadata().await?;
let fh = FileHandle { file, md, path };
let fh = FileHandle { id, file, md, path };
return Ok(fh);
}
pub fn to_file_info(&self) -> FileInfo {
FileInfo {
pub async fn to_stds(
file_handles: Vec<FileHandle>,
chunk_headers: Vec<ChunkHeader>,
) -> Vec<StdFileHandle> {
let mut ret = Vec::new();
for handle in file_handles {
let chunk = chunk_headers.iter().find(|chunk| handle.id == chunk.id);
match chunk {
Some(chunk) => {
match handle.to_std(chunk).await {
Ok(std_file_handle) => {
ret.push(std_file_handle);
}
_ => println!("Error seeking in file"),
};
}
None => {
println!("Skipping file b/c not in requested chunks");
}
}
}
ret
}
async fn to_std(self, chunk_header: &ChunkHeader) -> Result<StdFileHandle> {
StdFileHandle::new(self.id, self.file, chunk_header.start).await
}
pub fn to_file_offer(&self) -> FileOffer {
FileOffer {
id: self.id,
path: self.path.clone(),
size: self.md.len(),
}
}
pub async fn get_file_handles(file_paths: &Vec<PathBuf>) -> Result<Vec<FileHandle>> {
let tasks = file_paths
.into_iter()
.enumerate()
.map(|(idx, path)| FileHandle::new(idx.try_into().unwrap(), path.to_path_buf()));
let handles = try_join_all(tasks).await?;
Ok(handles)
}
}
const SUFFIX: [&'static str; 9] = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"];

72
src/handshake.rs Normal file
View file

@ -0,0 +1,72 @@
use crate::conf::{HANDSHAKE_MSG_SIZE, ID_SIZE};
use anyhow::{anyhow, Result};
use blake2::{Blake2s256, Digest};
use bytes::{Bytes, BytesMut};
use spake2::{Ed25519Group, Identity, Password, Spake2};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
pub struct Handshake {
pub id: Bytes,
pub outbound_msg: Bytes,
}
impl Handshake {
pub fn from_password(pw: &String) -> (Handshake, spake2::Spake2<spake2::Ed25519Group>) {
let password = Bytes::from(pw.to_string());
let id = Handshake::pass_to_bytes(&pw);
let (s1, outbound_msg) =
Spake2::<Ed25519Group>::start_symmetric(&Password::new(&password), &Identity::new(&id));
let mut buffer = BytesMut::with_capacity(HANDSHAKE_MSG_SIZE);
buffer.extend_from_slice(&outbound_msg[..HANDSHAKE_MSG_SIZE]);
let outbound_msg = buffer.freeze();
let handshake = Handshake { id, outbound_msg };
(handshake, s1)
}
pub async fn from_socket(socket: TcpStream) -> Result<(Handshake, TcpStream)> {
let mut socket = socket;
let mut buffer = [0; ID_SIZE + HANDSHAKE_MSG_SIZE];
let n = socket.read_exact(&mut buffer).await?;
println!("The bytes: {:?}", &buffer[..n]);
let mut outbound_msg = BytesMut::from(&buffer[..n]).freeze();
let id = outbound_msg.split_to(ID_SIZE);
Ok((Handshake { id, outbound_msg }, socket))
}
pub fn to_bytes(self) -> Bytes {
let mut buffer = BytesMut::with_capacity(ID_SIZE + HANDSHAKE_MSG_SIZE);
buffer.extend_from_slice(&self.id);
buffer.extend_from_slice(&self.outbound_msg);
buffer.freeze()
}
pub async fn negotiate(
self,
socket: TcpStream,
s1: spake2::Spake2<spake2::Ed25519Group>,
) -> Result<(TcpStream, Vec<u8>)> {
let mut socket = socket;
let bytes = &self.to_bytes();
println!("client - sending handshake msg= {:?}", &bytes);
socket.write_all(&bytes).await?;
let mut buffer = [0; HANDSHAKE_MSG_SIZE];
let n = socket.read_exact(&mut buffer).await?;
let response = BytesMut::from(&buffer[..n]).freeze();
// println!("client - handshake msg, {:?}", response);
let key = match s1.finish(&response[..]) {
Ok(key_bytes) => key_bytes,
Err(e) => return Err(anyhow!(e.to_string())),
};
println!("Handshake successful. Key is {:?}", key);
return Ok((socket, key));
}
fn pass_to_bytes(password: &String) -> Bytes {
let mut hasher = Blake2s256::new();
hasher.update(password.as_bytes());
let res = hasher.finalize();
BytesMut::from(&res[..]).freeze()
}
}

View file

@ -1,10 +1,13 @@
mod cli;
mod client;
mod conf;
mod connection;
mod crypto;
mod file;
mod handshake;
mod message;
mod server;
mod ui;
use clap::Parser;
use cli::{Cli, Commands};

View file

@ -1,109 +1,50 @@
use crate::crypto::{decrypt, encrypt};
use crate::file::FileInfo;
use crate::file::{ChunkHeader, FileOffer};
use aes_gcm::Aes256Gcm; // Or `Aes128Gcm`
use anyhow::{anyhow, Result};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::fmt;
use tokio::net::TcpStream;
use tokio_serde::{formats::SymmetricalBincode, SymmetricallyFramed};
use tokio_util::codec::{Framed, LengthDelimitedCodec};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Message {
HandshakeMessage(HandshakePayload),
EncryptedMessage(EncryptedPayload),
ErrorMessage(RuckError),
FileOffer(FileOfferPayload),
FileRequest(FileRequestPayload),
FileTransfer(FileTransferPayload),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct HandshakePayload {
pub id: Bytes,
pub msg: Bytes,
pub struct FileRequestPayload {
pub chunks: Vec<ChunkHeader>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EncryptedPayload {
pub nonce: Vec<u8>,
pub body: Vec<u8>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum EncryptedMessage {
FileNegotiationMessage(FileNegotiationPayload),
FileTransferMessage(FileTransferPayload),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FileNegotiationPayload {
pub files: Vec<FileInfo>,
pub struct FileOfferPayload {
pub files: Vec<FileOffer>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FileTransferPayload {
pub file_info: FileInfo,
pub chunk_num: u64,
pub chunk_header: ChunkHeader,
pub chunk: Bytes,
}
impl EncryptedMessage {
pub fn from_encrypted_message(cipher: &Aes256Gcm, payload: &EncryptedPayload) -> Result<Self> {
let raw = decrypt(cipher, payload)?;
let res = match bincode::deserialize(raw.as_ref()) {
Ok(result) => result,
Err(e) => {
println!("deserialize error {:?}", e);
return Err(anyhow!("deser error"));
}
};
Ok(res)
impl Message {
pub fn serialize(&self) -> Result<Bytes> {
bincode::serialize(&self).map(|vec| Ok(Bytes::from(vec)))?
}
pub fn to_encrypted_message(&self, cipher: &Aes256Gcm) -> Result<Message> {
let raw = match bincode::serialize(&self) {
Ok(result) => result,
Err(e) => {
println!("serialize error {:?}", e);
return Err(anyhow!("serialize error"));
}
};
let payload = encrypt(cipher, &raw)?;
Ok(Message::EncryptedMessage(payload))
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RuckError {
NotHandshake,
SenderNotConnected,
SenderAlreadyConnected,
PairDisconnected,
}
impl fmt::Display for RuckError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "RuckError is here!")
}
}
impl Error for RuckError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(self)
pub fn deserialize(bytes: Bytes) -> Result<Self> {
match bincode::deserialize(bytes.as_ref()) {
Ok(msg) => Ok(msg),
Err(e) => Err(anyhow!(e.to_string())),
}
}
}
impl Message {
pub fn to_stream(stream: TcpStream) -> MessageStream {
tokio_serde::SymmetricallyFramed::new(
Framed::new(stream, LengthDelimitedCodec::new()),
tokio_serde::formats::SymmetricalBincode::<Message>::default(),
)
Framed::new(stream, LengthDelimitedCodec::new())
}
}
pub type MessageStream = SymmetricallyFramed<
Framed<TcpStream, LengthDelimitedCodec>,
Message,
SymmetricalBincode<Message>,
>;
pub type MessageStream = Framed<TcpStream, LengthDelimitedCodec>;

View file

@ -1,16 +1,16 @@
use crate::message::{HandshakePayload, Message, MessageStream, RuckError};
use crate::conf::PER_CLIENT_BUFFER;
use crate::handshake::Handshake;
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::prelude::*;
use bytes::{Bytes, BytesMut};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
type Tx = mpsc::UnboundedSender<Message>;
type Rx = mpsc::UnboundedReceiver<Message>;
type Tx = mpsc::UnboundedSender<Bytes>;
type Rx = mpsc::UnboundedReceiver<Bytes>;
pub struct Shared {
handshake_cache: HashMap<Bytes, Tx>,
@ -18,12 +18,12 @@ pub struct Shared {
type State = Arc<Mutex<Shared>>;
struct Client {
messages: MessageStream,
socket: TcpStream,
rx: Rx,
peer_tx: Option<Tx>,
}
struct StapledClient {
messages: MessageStream,
socket: TcpStream,
rx: Rx,
peer_tx: Tx,
}
@ -37,11 +37,11 @@ impl Shared {
}
impl Client {
async fn new(id: Bytes, state: State, messages: MessageStream) -> Result<Client> {
async fn new(id: Bytes, state: State, socket: TcpStream) -> Result<Client> {
let (tx, rx) = mpsc::unbounded_channel();
let mut shared = state.lock().await;
let client = Client {
messages,
socket,
rx,
peer_tx: shared.handshake_cache.remove(&id),
};
@ -49,11 +49,7 @@ impl Client {
Ok(client)
}
async fn upgrade(
client: Client,
state: State,
handshake_payload: HandshakePayload,
) -> Result<StapledClient> {
async fn upgrade(client: Client, state: State, handshake: Handshake) -> Result<StapledClient> {
let mut client = client;
let peer_tx = match client.peer_tx {
// Receiver - already stapled at creation
@ -61,28 +57,21 @@ impl Client {
// Sender - needs to wait for the incoming msg to look up peer_tx
None => {
tokio::select! {
// Client reads handshake message sent over channel
Some(msg) = client.rx.recv() => {
client.messages.send(msg).await?
}
result = client.messages.next() => match result {
Some(_) => return Err(anyhow!("Client sending more messages before handshake complete")),
None => return Err(anyhow!("Connection interrupted")),
// Writes parnter handshake message over wire
client.socket.write_all(&msg[..]).await?
}
}
match state
.lock()
.await
.handshake_cache
.remove(&handshake_payload.id)
{
match state.lock().await.handshake_cache.remove(&handshake.id) {
Some(peer_tx) => peer_tx,
None => return Err(anyhow!("Connection not stapled")),
}
}
};
peer_tx.send(Message::HandshakeMessage(handshake_payload))?;
peer_tx.send(handshake.outbound_msg)?;
Ok(StapledClient {
messages: client.messages,
socket: client.socket,
rx: client.rx,
peer_tx,
})
@ -109,26 +98,14 @@ pub async fn serve() -> Result<()> {
pub async fn handle_connection(
state: Arc<Mutex<Shared>>,
socket: TcpStream,
addr: SocketAddr,
_addr: SocketAddr,
) -> Result<()> {
let mut stream = Message::to_stream(socket);
println!("server - new conn from {:?}", addr);
let handshake_payload = match stream.next().await {
Some(Ok(Message::HandshakeMessage(payload))) => payload,
Some(Ok(_)) => {
stream
.send(Message::ErrorMessage(RuckError::NotHandshake))
.await?;
return Ok(());
}
_ => {
println!("No first message");
return Ok(());
}
};
let id = handshake_payload.id.clone();
let client = Client::new(id.clone(), state.clone(), stream).await?;
let mut client = match Client::upgrade(client, state.clone(), handshake_payload).await {
socket.readable().await?;
let (handshake, socket) = Handshake::from_socket(socket).await?;
let id = handshake.id.clone();
let client = Client::new(id.clone(), state.clone(), socket).await?;
println!("Client created");
let mut client = match Client::upgrade(client, state.clone(), handshake).await {
Ok(client) => client,
Err(err) => {
// Clear handshake cache if staple is unsuccessful
@ -136,22 +113,30 @@ pub async fn handle_connection(
return Err(err);
}
};
println!("Client upgraded");
// The handshake cache should be empty for {id} at this point.
let mut client_buffer = BytesMut::with_capacity(PER_CLIENT_BUFFER);
loop {
tokio::select! {
Some(msg) = client.rx.recv() => {
client.messages.send(msg).await?
// println!("piping bytes= {:?}", msg);
client.socket.write_all(&msg[..]).await?
}
result = client.messages.next() => match result {
Some(Ok(msg)) => {
client.peer_tx.send(msg)?
}
Some(Err(e)) => {
result = client.socket.read_buf(&mut client_buffer) => match result {
Ok(0) => {
break;
},
Ok(n) => {
let b = BytesMut::from(&client_buffer[0..n]).freeze();
client.peer_tx.send(b)?;
client_buffer.clear();
},
Err(e) => {
println!("Error {:?}", e);
}
None => break,
}
}
}
println!("done with client");
Ok(())
}

46
src/ui.rs Normal file
View file

@ -0,0 +1,46 @@
use crate::file::{to_size_string, FileOffer};
use futures::prelude::*;
use tokio::io::{self};
use tokio_util::codec::{FramedRead, LinesCodec};
pub async fn prompt_user_for_file_confirmation(file_offers: Vec<FileOffer>) -> Vec<FileOffer> {
let mut stdin = FramedRead::new(io::stdin(), LinesCodec::new());
let mut files = vec![];
for file_offer in file_offers.into_iter() {
let mut reply = prompt_user_input(&mut stdin, &file_offer).await;
while reply.is_none() {
reply = prompt_user_input(&mut stdin, &file_offer).await;
}
match reply {
Some(true) => files.push(file_offer),
_ => {}
}
}
files
}
pub async fn prompt_user_input(
stdin: &mut FramedRead<io::Stdin, LinesCodec>,
file_offer: &FileOffer,
) -> Option<bool> {
let prompt_name = file_offer.path.file_name().unwrap();
println!(
"Accept {:?}? ({:?}). (Y/n)",
prompt_name,
to_size_string(file_offer.size)
);
match stdin.next().await {
Some(Ok(line)) => match line.as_str() {
"" | "Y" | "y" | "yes" | "Yes" | "YES" => Some(true),
"N" | "n" | "NO" | "no" | "No" => Some(false),
_ => {
println!("Invalid input. Please enter one of the following characters: [YyNn]");
return None;
}
},
_ => None,
}
}