Resume does not work

This commit is contained in:
Donald Knuth 2022-09-03 14:58:48 -04:00
parent bb32cf8686
commit 28a97f675e
5 changed files with 71 additions and 27 deletions

View file

@ -6,7 +6,6 @@ use crate::ui::prompt_user_for_file_confirmation;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use std::ffi::OsStr;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::fs::File; use tokio::fs::File;
@ -46,7 +45,7 @@ pub async fn receive(password: &String) -> Result<()> {
// Wait for offered files, respond with desired files // Wait for offered files, respond with desired files
let desired_files = request_specific_files(&mut connection).await?; let desired_files = request_specific_files(&mut connection).await?;
// Create files // Create files
let std_file_handles = create_files(desired_files).await?; let std_file_handles = create_or_find_files(desired_files).await?;
// Download them // Download them
connection.download_files(std_file_handles).await?; connection.download_files(std_file_handles).await?;
return Ok(()); return Ok(());
@ -57,7 +56,10 @@ pub async fn offer_files(
file_handles: &Vec<FileHandle>, file_handles: &Vec<FileHandle>,
) -> Result<Vec<ChunkHeader>> { ) -> Result<Vec<ChunkHeader>> {
// Collect file offer // Collect file offer
let files = file_handles.iter().map(|fh| fh.to_file_offer()).collect(); let mut files = vec![];
for handle in file_handles {
files.push(handle.to_file_offer()?);
}
let msg = Message::FileOffer(FileOfferPayload { files }); let msg = Message::FileOffer(FileOfferPayload { files });
// Send file offer // Send file offer
conn.send_msg(msg).await?; conn.send_msg(msg).await?;
@ -92,15 +94,24 @@ pub async fn request_specific_files(conn: &mut Connection) -> Result<Vec<FileOff
Ok(desired_files) Ok(desired_files)
} }
pub async fn create_files(desired_files: Vec<FileOffer>) -> Result<Vec<StdFileHandle>> { pub async fn create_or_find_files(desired_files: Vec<FileOffer>) -> Result<Vec<StdFileHandle>> {
let mut v = Vec::new(); let mut v = Vec::new();
for desired_file in desired_files { for desired_file in desired_files {
let filename = desired_file let mut filename = desired_file.path;
.path filename.push_str(".part");
.file_name() let file = match File::open(filename.clone()).await {
.unwrap_or(OsStr::new("random.txt")); Ok(file) => {
let file = File::create(filename).await?; println!(
let std_file_handle = StdFileHandle::new(desired_file.id, file, 0).await?; "File {:?} already exists. Attempting to resume download.",
filename
);
file
}
Err(_) => File::create(filename).await?,
};
let metadata = file.metadata().await?;
let std_file_handle =
StdFileHandle::new(desired_file.id, file, metadata.len(), desired_file.size).await?;
v.push(std_file_handle) v.push(std_file_handle)
} }
return Ok(v); return Ok(v);

View file

@ -77,6 +77,7 @@ impl Connection {
Err(e) => return Err(anyhow!(e.to_string())), Err(e) => return Err(anyhow!(e.to_string())),
} }
} }
self.send_msg(Message::FileTransferComplete).await?;
let elapsed = before.elapsed(); let elapsed = before.elapsed();
let mb_sent = bytes_sent / 1_048_576; let mb_sent = bytes_sent / 1_048_576;
println!( println!(
@ -105,24 +106,38 @@ impl Connection {
} }
pub async fn download_file(&mut self, handle: StdFileHandle) -> Result<()> { pub async fn download_file(&mut self, handle: StdFileHandle) -> Result<()> {
let clone = handle.file.try_clone()?;
let mut decoder = GzDecoder::new(handle.file); let mut decoder = GzDecoder::new(handle.file);
loop { loop {
let msg = self.await_msg().await?; let msg = self.await_msg().await?;
match msg { match msg {
Message::FileTransfer(payload) => { Message::FileTransfer(payload) => {
println!("In download");
if payload.chunk_header.id != handle.id { if payload.chunk_header.id != handle.id {
return Err(anyhow!("Wrong file")); return Err(anyhow!("Wrong file"));
} }
if payload.chunk.len() == 0 {
break;
}
decoder.write_all(&payload.chunk[..])? decoder.write_all(&payload.chunk[..])?
} }
Message::FileTransferComplete => {
break;
}
_ => return Err(anyhow!("Expecting file transfer message")), _ => return Err(anyhow!("Expecting file transfer message")),
} }
} }
decoder.finish()?; decoder.finish()?;
println!("Done downloading file."); println!("Done downloading file.");
Connection::check_and_finish_download(clone, handle.size).await?;
Ok(()) Ok(())
} }
pub async fn check_and_finish_download(file: std::fs::File, size: u64) -> Result<()> {
let metadata = file.metadata()?;
if metadata.len() == size {
println!("File looks good.");
return Ok(());
}
return Err(anyhow!(
"Downloaded file does not match expected size. Try again"
));
}
} }

View file

@ -1,4 +1,4 @@
use anyhow::Result; use anyhow::{anyhow, Result};
use futures::future::try_join_all; use futures::future::try_join_all;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -18,7 +18,7 @@ pub struct ChunkHeader {
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FileOffer { pub struct FileOffer {
pub id: u8, pub id: u8,
pub path: PathBuf, pub path: String,
pub size: u64, pub size: u64,
} }
@ -26,16 +26,21 @@ pub struct StdFileHandle {
pub id: u8, pub id: u8,
pub file: std::fs::File, pub file: std::fs::File,
pub start: u64, pub start: u64,
pub size: u64,
} }
impl StdFileHandle { impl StdFileHandle {
pub async fn new(id: u8, file: File, start: u64) -> Result<StdFileHandle> { pub async fn new(id: u8, file: File, start: u64, size: u64) -> Result<StdFileHandle> {
let mut std_file = file.into_std().await; let mut file = file.into_std().await;
std_file.seek(SeekFrom::Start(start))?; file.seek(SeekFrom::Start(start))?;
if start != 0 {
println!("Seeking to {:?}", start);
}
Ok(StdFileHandle { Ok(StdFileHandle {
id: id, id,
file: std_file, file,
start: start, start,
size,
}) })
} }
} }
@ -80,15 +85,15 @@ impl FileHandle {
} }
async fn to_std(self, chunk_header: &ChunkHeader) -> Result<StdFileHandle> { async fn to_std(self, chunk_header: &ChunkHeader) -> Result<StdFileHandle> {
StdFileHandle::new(self.id, self.file, chunk_header.start).await StdFileHandle::new(self.id, self.file, chunk_header.start, self.md.len()).await
} }
pub fn to_file_offer(&self) -> FileOffer { pub fn to_file_offer(&self) -> Result<FileOffer> {
FileOffer { Ok(FileOffer {
id: self.id, id: self.id,
path: self.path.clone(), path: pathbuf_to_string(&self.path)?,
size: self.md.len(), size: self.md.len(),
} })
} }
pub async fn get_file_handles(file_paths: &Vec<PathBuf>) -> Result<Vec<FileHandle>> { pub async fn get_file_handles(file_paths: &Vec<PathBuf>) -> Result<Vec<FileHandle>> {
@ -115,3 +120,15 @@ pub fn to_size_string(size: u64) -> String {
result result
} }
pub fn pathbuf_to_string(path: &PathBuf) -> Result<String> {
let filename = match path.file_name() {
Some(s) => s,
None => return Err(anyhow!("Could not get filename from file offer.")),
};
let filename = filename.to_os_string();
match filename.into_string() {
Ok(s) => Ok(s),
Err(e) => Err(anyhow!("Error converting {:?} to String", path)),
}
}

View file

@ -11,6 +11,7 @@ pub enum Message {
FileOffer(FileOfferPayload), FileOffer(FileOfferPayload),
FileRequest(FileRequestPayload), FileRequest(FileRequestPayload),
FileTransfer(FileTransferPayload), FileTransfer(FileTransferPayload),
FileTransferComplete,
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]

View file

@ -26,7 +26,7 @@ pub async fn prompt_user_input(
stdin: &mut FramedRead<io::Stdin, LinesCodec>, stdin: &mut FramedRead<io::Stdin, LinesCodec>,
file_offer: &FileOffer, file_offer: &FileOffer,
) -> Option<bool> { ) -> Option<bool> {
let prompt_name = file_offer.path.file_name().unwrap(); let prompt_name = &file_offer.path;
println!( println!(
"Accept {:?}? ({:?}). (Y/n)", "Accept {:?}? ({:?}). (Y/n)",
prompt_name, prompt_name,