mirror of
https://github.com/CompeyDev/ruck.git
synced 2025-01-08 11:49:09 +00:00
Fixed sending files prototype
This commit is contained in:
parent
9206781791
commit
d29015f085
1 changed files with 44 additions and 28 deletions
|
@ -167,7 +167,10 @@ pub async fn upload_encrypted_files(
|
||||||
let x = msg.to_encrypted_message(cipher)?;
|
let x = msg.to_encrypted_message(cipher)?;
|
||||||
stream.send(x).await?
|
stream.send(x).await?
|
||||||
}
|
}
|
||||||
else => break,
|
else => {
|
||||||
|
println!("breaking");
|
||||||
|
break
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -178,21 +181,22 @@ pub async fn enqueue_file_chunks(
|
||||||
tx: mpsc::UnboundedSender<EncryptedMessage>,
|
tx: mpsc::UnboundedSender<EncryptedMessage>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut chunk_num = 0;
|
let mut chunk_num = 0;
|
||||||
let mut buf: BytesMut;
|
let mut bytes_read = 1;
|
||||||
while {
|
while bytes_read != 0 {
|
||||||
buf = BytesMut::with_capacity(BUFFER_SIZE);
|
let mut buf = BytesMut::with_capacity(BUFFER_SIZE);
|
||||||
let n = fh.file.read_exact(&mut buf[..]).await?;
|
bytes_read = fh.file.read_buf(&mut buf).await?;
|
||||||
n == 0
|
println!("Bytes_read: {:?}, The bytes: {:?}", bytes_read, &buf[..]);
|
||||||
} {
|
if bytes_read != 0 {
|
||||||
let chunk = buf.freeze();
|
let chunk = buf.freeze();
|
||||||
let file_info = fh.to_file_info();
|
let file_info = fh.to_file_info();
|
||||||
let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload {
|
let ftp = EncryptedMessage::FileTransferMessage(FileTransferPayload {
|
||||||
chunk,
|
chunk,
|
||||||
chunk_num,
|
chunk_num,
|
||||||
file_info,
|
file_info,
|
||||||
});
|
});
|
||||||
tx.send(ftp)?;
|
tx.send(ftp)?;
|
||||||
chunk_num += 1;
|
chunk_num += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -204,23 +208,24 @@ pub async fn download_files(
|
||||||
cipher: &Aes256Gcm,
|
cipher: &Aes256Gcm,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// for each file_info
|
// for each file_info
|
||||||
let info_handles: HashMap<_, _> = file_infos
|
let mut info_handles: HashMap<PathBuf, mpsc::UnboundedSender<(u64, Bytes)>> = HashMap::new();
|
||||||
.into_iter()
|
for fi in file_infos {
|
||||||
.map(|fi| {
|
let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>();
|
||||||
let (tx, rx) = mpsc::unbounded_channel::<(u64, Bytes)>();
|
let path = fi.path.clone();
|
||||||
let path = fi.path.clone();
|
tokio::spawn(async move { download_file(fi, rx).await });
|
||||||
tokio::spawn(async move { download_file(fi, rx).await });
|
info_handles.insert(path, tx);
|
||||||
(path, tx)
|
}
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = stream.next() => match result {
|
result = stream.next() => match result {
|
||||||
Some(Ok(Message::EncryptedMessage(payload))) => {
|
Some(Ok(Message::EncryptedMessage(payload))) => {
|
||||||
let ec = EncryptedMessage::from_encrypted_message(cipher, &payload)?;
|
let ec = EncryptedMessage::from_encrypted_message(cipher, &payload)?;
|
||||||
|
println!("encrypted message received! {:?}", ec);
|
||||||
match ec {
|
match ec {
|
||||||
EncryptedMessage::FileTransferMessage(payload) => {
|
EncryptedMessage::FileTransferMessage(payload) => {
|
||||||
|
println!("matched file transfer message");
|
||||||
if let Some(tx) = info_handles.get(&payload.file_info.path) {
|
if let Some(tx) = info_handles.get(&payload.file_info.path) {
|
||||||
|
println!("matched on filetype, sending to tx");
|
||||||
tx.send((payload.chunk_num, payload.chunk))?
|
tx.send((payload.chunk_num, payload.chunk))?
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
|
@ -244,15 +249,26 @@ pub async fn download_file(
|
||||||
file_info: FileInfo,
|
file_info: FileInfo,
|
||||||
rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
|
rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
println!("in download file");
|
||||||
let mut rx = rx;
|
let mut rx = rx;
|
||||||
let filename = match file_info.path.file_name() {
|
let filename = match file_info.path.file_name() {
|
||||||
Some(f) => f,
|
Some(f) => {
|
||||||
None => OsStr::new("random.txt"),
|
println!("matched filename");
|
||||||
|
f
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
println!("didnt match filename");
|
||||||
|
OsStr::new("random.txt")
|
||||||
|
}
|
||||||
};
|
};
|
||||||
let mut file = File::open(filename).await?;
|
println!("trying to create file...filename={:?}", filename);
|
||||||
|
let mut file = File::create(filename).await?;
|
||||||
|
println!("file created ok! filename={:?}", filename);
|
||||||
while let Some((chunk_num, chunk)) = rx.recv().await {
|
while let Some((chunk_num, chunk)) = rx.recv().await {
|
||||||
|
println!("rx got message! chunk={:?}", chunk);
|
||||||
file.write_all(&chunk).await?;
|
file.write_all(&chunk).await?;
|
||||||
}
|
}
|
||||||
|
println!("done receiving messages");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue