Merge pull request #5 from rictorlome/resume

Resume
This commit is contained in:
rictorlome 2022-09-03 15:35:24 -04:00 committed by GitHub
commit 09a67f9944
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 124 additions and 44 deletions

View file

@ -2,11 +2,22 @@
`ruck` is a command line tool used for hosting relay servers and sending end-to-end encrypted files between clients. It was heavily inspired by [croc](https://github.com/schollz/croc), one of the easiest ways to send files between peers. This document describes the protocol `ruck` uses to support this functionality. `ruck` is a command line tool used for hosting relay servers and sending end-to-end encrypted files between clients. It was heavily inspired by [croc](https://github.com/schollz/croc), one of the easiest ways to send files between peers. This document describes the protocol `ruck` uses to support this functionality.
### Version ## Usage
This document refers to version `0.1.0` of `ruck` as defined by the `Cargo.toml` file. ```bash
## tab 1
cargo run relay # this starts the server
## Server ## tab 2
cargo run send /path/to/file1.md /path/to/file2.md supersecretpassword
## tab 3
cargo run receive supersecretpassword
```
## Protocol
### Server
The server in `ruck` exposes a TCP port. The server in `ruck` exposes a TCP port.
Its only functions are to staple connections and shuttle bytes between stapled connections. Its only functions are to staple connections and shuttle bytes between stapled connections.
@ -19,7 +30,7 @@ The time out is set to remove idle connections.
The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages. The server does nothing else with the bytes, so the clients are free to end-to-end encrypt their messages.
For this reason, updates to the `ruck` protocol do not typically necessitate server redeployments. For this reason, updates to the `ruck` protocol do not typically necessitate server redeployments.
## Client ### Client
There are two types of clients - `send` and `receive` clients. There are two types of clients - `send` and `receive` clients.
Out of band, the clients agree on a relay server and password, from which they can derive the 32 byte identifier used by the server to staple their connections. Out of band, the clients agree on a relay server and password, from which they can derive the 32 byte identifier used by the server to staple their connections.
@ -29,7 +40,5 @@ Once the handshake is complete, `send` and `receive` negotiate and exchange file
- `send` offers a list of files and waits. - `send` offers a list of files and waits.
- `receive` specifies which bytes it wants from these files. - `receive` specifies which bytes it wants from these files.
- `send` sends the specified bytes and waits. - `send` sends the specified bytes, then a completion message and hangs up.
- `receive` sends heartbeats with progress updates.
- `send` hangs up once the heartbeats stop or received a successful heartbeat.
- `receive` hangs up once the downloads are complete. - `receive` hangs up once the downloads are complete.

View file

@ -6,7 +6,6 @@ use crate::ui::prompt_user_for_file_confirmation;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use std::ffi::OsStr;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::fs::File; use tokio::fs::File;
@ -44,9 +43,7 @@ pub async fn receive(password: &String) -> Result<()> {
let (socket, key) = handshake.negotiate(socket, s1).await?; let (socket, key) = handshake.negotiate(socket, s1).await?;
let mut connection = Connection::new(socket, key); let mut connection = Connection::new(socket, key);
// Wait for offered files, respond with desired files // Wait for offered files, respond with desired files
let desired_files = request_specific_files(&mut connection).await?; let std_file_handles = request_specific_files(&mut connection).await?;
// Create files
let std_file_handles = create_files(desired_files).await?;
// Download them // Download them
connection.download_files(std_file_handles).await?; connection.download_files(std_file_handles).await?;
return Ok(()); return Ok(());
@ -57,7 +54,10 @@ pub async fn offer_files(
file_handles: &Vec<FileHandle>, file_handles: &Vec<FileHandle>,
) -> Result<Vec<ChunkHeader>> { ) -> Result<Vec<ChunkHeader>> {
// Collect file offer // Collect file offer
let files = file_handles.iter().map(|fh| fh.to_file_offer()).collect(); let mut files = vec![];
for handle in file_handles {
files.push(handle.to_file_offer()?);
}
let msg = Message::FileOffer(FileOfferPayload { files }); let msg = Message::FileOffer(FileOfferPayload { files });
// Send file offer // Send file offer
conn.send_msg(msg).await?; conn.send_msg(msg).await?;
@ -70,7 +70,7 @@ pub async fn offer_files(
} }
} }
pub async fn request_specific_files(conn: &mut Connection) -> Result<Vec<FileOffer>> { pub async fn request_specific_files(conn: &mut Connection) -> Result<Vec<StdFileHandle>> {
// Wait for offer message // Wait for offer message
let offer_message = conn.await_msg().await?; let offer_message = conn.await_msg().await?;
let offered_files: Vec<FileOffer> = match offer_message { let offered_files: Vec<FileOffer> = match offer_message {
@ -79,28 +79,50 @@ pub async fn request_specific_files(conn: &mut Connection) -> Result<Vec<FileOff
}; };
// Prompt user for confirmation of files // Prompt user for confirmation of files
let desired_files = prompt_user_for_file_confirmation(offered_files).await; let desired_files = prompt_user_for_file_confirmation(offered_files).await;
let std_file_handles = create_or_find_files(desired_files).await?;
let file_request_msg = Message::FileRequest(FileRequestPayload { let file_request_msg = Message::FileRequest(FileRequestPayload {
chunks: desired_files chunks: std_file_handles
.iter() .iter()
.map(|file| ChunkHeader { .map(|file| ChunkHeader {
id: file.id, id: file.id,
start: 0, start: file.start,
}) })
.collect(), .collect(),
}); });
conn.send_msg(file_request_msg).await?; conn.send_msg(file_request_msg).await?;
Ok(desired_files) Ok(std_file_handles)
} }
pub async fn create_files(desired_files: Vec<FileOffer>) -> Result<Vec<StdFileHandle>> { pub async fn create_or_find_files(desired_files: Vec<FileOffer>) -> Result<Vec<StdFileHandle>> {
let mut v = Vec::new(); let mut v = Vec::new();
for desired_file in desired_files { for desired_file in desired_files {
let filename = desired_file let filename = desired_file.path;
.path // filename.push_str(".part");
.file_name() let file = match File::open(filename.clone()).await {
.unwrap_or(OsStr::new("random.txt")); Ok(file) => {
let file = File::create(filename).await?; println!(
let std_file_handle = StdFileHandle::new(desired_file.id, file, 0).await?; "File {:?} already exists. Attempting to resume download.",
filename
);
file
}
Err(_) => File::create(&filename).await?,
};
let metadata = file.metadata().await?;
println!(
"File: {:?}. Current len: {:?}, Full Size: {:?}",
filename.clone(),
metadata.len(),
desired_file.size
);
let std_file_handle = StdFileHandle::new(
desired_file.id,
filename,
file,
metadata.len(),
desired_file.size,
)
.await?;
v.push(std_file_handle) v.push(std_file_handle)
} }
return Ok(v); return Ok(v);

View file

@ -77,15 +77,17 @@ impl Connection {
Err(e) => return Err(anyhow!(e.to_string())), Err(e) => return Err(anyhow!(e.to_string())),
} }
} }
self.send_msg(Message::FileTransferComplete).await?;
let elapsed = before.elapsed(); let elapsed = before.elapsed();
let mb_sent = bytes_sent / 1_048_576; let mb_sent = bytes_sent / 1_048_576;
println!( println!(
"{:?} mb sent, {:?} iterations. {:?} total time, {:?} avg per iteration, {:?} avg mb/sec", "{:?}: {:?} mb sent (compressed), {:?} iterations. {:?} total time, {:?} avg per iteration, {:?} avg mb/sec",
handle.name,
mb_sent, mb_sent,
count, count,
elapsed, elapsed,
elapsed / count, elapsed / count,
mb_sent / elapsed.as_secs() 1000 * mb_sent as u128 / elapsed.as_millis()
); );
Ok(()) Ok(())
} }
@ -105,6 +107,7 @@ impl Connection {
} }
pub async fn download_file(&mut self, handle: StdFileHandle) -> Result<()> { pub async fn download_file(&mut self, handle: StdFileHandle) -> Result<()> {
let clone = handle.file.try_clone()?;
let mut decoder = GzDecoder::new(handle.file); let mut decoder = GzDecoder::new(handle.file);
loop { loop {
let msg = self.await_msg().await?; let msg = self.await_msg().await?;
@ -113,16 +116,32 @@ impl Connection {
if payload.chunk_header.id != handle.id { if payload.chunk_header.id != handle.id {
return Err(anyhow!("Wrong file")); return Err(anyhow!("Wrong file"));
} }
if payload.chunk.len() == 0 {
break;
}
decoder.write_all(&payload.chunk[..])? decoder.write_all(&payload.chunk[..])?
} }
Message::FileTransferComplete => {
break;
}
_ => return Err(anyhow!("Expecting file transfer message")), _ => return Err(anyhow!("Expecting file transfer message")),
} }
} }
decoder.finish()?; decoder.finish()?;
println!("Done downloading file."); println!("Done downloading {:?}.", handle.name);
Connection::check_and_finish_download(clone, handle.name, handle.size).await?;
Ok(()) Ok(())
} }
pub async fn check_and_finish_download(
file: std::fs::File,
filename: String,
size: u64,
) -> Result<()> {
let metadata = file.metadata()?;
if metadata.len() == size {
println!("OK: downloaded {:?} matches advertised size.", filename);
return Ok(());
}
return Err(anyhow!(
"Downloaded file does not match expected size. Try again"
));
}
} }

View file

@ -1,4 +1,4 @@
use anyhow::Result; use anyhow::{anyhow, Result};
use futures::future::try_join_all; use futures::future::try_join_all;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -18,24 +18,34 @@ pub struct ChunkHeader {
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FileOffer { pub struct FileOffer {
pub id: u8, pub id: u8,
pub path: PathBuf, pub path: String,
pub size: u64, pub size: u64,
} }
pub struct StdFileHandle { pub struct StdFileHandle {
pub id: u8, pub id: u8,
pub name: String,
pub file: std::fs::File, pub file: std::fs::File,
pub start: u64, pub start: u64,
pub size: u64,
} }
impl StdFileHandle { impl StdFileHandle {
pub async fn new(id: u8, file: File, start: u64) -> Result<StdFileHandle> { pub async fn new(
let mut std_file = file.into_std().await; id: u8,
std_file.seek(SeekFrom::Start(start))?; name: String,
file: File,
start: u64,
size: u64,
) -> Result<StdFileHandle> {
let mut file = file.into_std().await;
file.seek(SeekFrom::Start(start))?;
Ok(StdFileHandle { Ok(StdFileHandle {
id: id, id,
file: std_file, name,
start: start, file,
start,
size,
}) })
} }
} }
@ -80,15 +90,22 @@ impl FileHandle {
} }
async fn to_std(self, chunk_header: &ChunkHeader) -> Result<StdFileHandle> { async fn to_std(self, chunk_header: &ChunkHeader) -> Result<StdFileHandle> {
StdFileHandle::new(self.id, self.file, chunk_header.start).await StdFileHandle::new(
self.id,
pathbuf_to_string(&self.path)?,
self.file,
chunk_header.start,
self.md.len(),
)
.await
} }
pub fn to_file_offer(&self) -> FileOffer { pub fn to_file_offer(&self) -> Result<FileOffer> {
FileOffer { Ok(FileOffer {
id: self.id, id: self.id,
path: self.path.clone(), path: pathbuf_to_string(&self.path)?,
size: self.md.len(), size: self.md.len(),
} })
} }
pub async fn get_file_handles(file_paths: &Vec<PathBuf>) -> Result<Vec<FileHandle>> { pub async fn get_file_handles(file_paths: &Vec<PathBuf>) -> Result<Vec<FileHandle>> {
@ -115,3 +132,15 @@ pub fn to_size_string(size: u64) -> String {
result result
} }
pub fn pathbuf_to_string(path: &PathBuf) -> Result<String> {
let filename = match path.file_name() {
Some(s) => s,
None => return Err(anyhow!("Could not get filename from file offer.")),
};
let filename = filename.to_os_string();
match filename.into_string() {
Ok(s) => Ok(s),
Err(_) => Err(anyhow!("Error converting {:?} to String", path)),
}
}

View file

@ -49,7 +49,7 @@ impl Handshake {
) -> Result<(TcpStream, Vec<u8>)> { ) -> Result<(TcpStream, Vec<u8>)> {
let mut socket = socket; let mut socket = socket;
let bytes = &self.to_bytes(); let bytes = &self.to_bytes();
println!("client - sending handshake msg= {:?}", &bytes); // println!("client - sending handshake msg= {:?}", &bytes);
socket.write_all(&bytes).await?; socket.write_all(&bytes).await?;
let mut buffer = [0; HANDSHAKE_MSG_SIZE]; let mut buffer = [0; HANDSHAKE_MSG_SIZE];
let n = socket.read_exact(&mut buffer).await?; let n = socket.read_exact(&mut buffer).await?;

View file

@ -11,6 +11,7 @@ pub enum Message {
FileOffer(FileOfferPayload), FileOffer(FileOfferPayload),
FileRequest(FileRequestPayload), FileRequest(FileRequestPayload),
FileTransfer(FileTransferPayload), FileTransfer(FileTransferPayload),
FileTransferComplete,
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]

View file

@ -26,7 +26,7 @@ pub async fn prompt_user_input(
stdin: &mut FramedRead<io::Stdin, LinesCodec>, stdin: &mut FramedRead<io::Stdin, LinesCodec>,
file_offer: &FileOffer, file_offer: &FileOffer,
) -> Option<bool> { ) -> Option<bool> {
let prompt_name = file_offer.path.file_name().unwrap(); let prompt_name = &file_offer.path;
println!( println!(
"Accept {:?}? ({:?}). (Y/n)", "Accept {:?}? ({:?}). (Y/n)",
prompt_name, prompt_name,