Fix server; parity with main

This commit is contained in:
Donald Knuth 2022-08-29 16:14:30 -04:00
parent b3b683074d
commit 33745f7b44
4 changed files with 13 additions and 14 deletions

View file

@ -83,6 +83,7 @@ pub async fn negotiate_files_up(
let files = file_handles.iter().map(|fh| fh.to_file_info()).collect(); let files = file_handles.iter().map(|fh| fh.to_file_info()).collect();
let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { files }); let msg = EncryptedMessage::FileNegotiationMessage(FileNegotiationPayload { files });
let server_msg = msg.to_encrypted_message(cipher)?; let server_msg = msg.to_encrypted_message(cipher)?;
println!("server_msg encrypted: {:?}", server_msg);
stream.send(server_msg).await?; stream.send(server_msg).await?;
let reply_payload = match stream.next().await { let reply_payload = match stream.next().await {
Some(Ok(msg)) => match msg { Some(Ok(msg)) => match msg {

View file

@ -1 +1 @@
pub const BUFFER_SIZE: usize = 1024 * 1024; pub const BUFFER_SIZE: usize = 1024 * 1024 * 10;

View file

@ -4,7 +4,6 @@ use aes_gcm::aead::{Aead, NewAead};
use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm` use aes_gcm::{Aes256Gcm, Key, Nonce}; // Or `Aes128Gcm`
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::prelude::*;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use spake2::{Ed25519Group, Identity, Password, Spake2}; use spake2::{Ed25519Group, Identity, Password, Spake2};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -24,11 +23,7 @@ pub async fn handshake(
handshake_msg.extend_from_slice(&outbound_msg); handshake_msg.extend_from_slice(&outbound_msg);
let handshake_msg = handshake_msg.freeze(); let handshake_msg = handshake_msg.freeze();
println!("client - handshake msg, {:?}", handshake_msg); println!("client - handshake msg, {:?}", handshake_msg);
// println!( println!("id: {:?}. msg: {:?}", id.clone(), outbound_msg.clone());
// "len id: {:?}. len msg: {:?}",
// id.len(),
// Bytes::from(outbound_msg).len()
// );
socket.write_all(&handshake_msg).await?; socket.write_all(&handshake_msg).await?;
let mut buffer = [0; 33]; let mut buffer = [0; 33];
let n = socket.read_exact(&mut buffer).await?; let n = socket.read_exact(&mut buffer).await?;
@ -39,7 +34,7 @@ pub async fn handshake(
Ok(key_bytes) => key_bytes, Ok(key_bytes) => key_bytes,
Err(e) => return Err(anyhow!(e.to_string())), Err(e) => return Err(anyhow!(e.to_string())),
}; };
// println!("Handshake successful. Key is {:?}", key); println!("Handshake successful. Key is {:?}", key);
return Ok((socket, new_cipher(&key))); return Ok((socket, new_cipher(&key)));
} }

View file

@ -1,7 +1,7 @@
use crate::conf::BUFFER_SIZE;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use std::collections::HashMap; use std::collections::HashMap;
use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -123,19 +123,22 @@ pub async fn handle_connection(
}; };
println!("Client upgraded"); println!("Client upgraded");
// The handshake cache should be empty for {id} at this point. // The handshake cache should be empty for {id} at this point.
let mut client_buffer = BytesMut::with_capacity(1024); let mut client_buffer = BytesMut::with_capacity(BUFFER_SIZE);
loop { loop {
tokio::select! { tokio::select! {
Some(msg) = client.rx.recv() => { Some(msg) = client.rx.recv() => {
// println!("piping bytes= {:?}", msg);
client.socket.write_all(&msg[..]).await? client.socket.write_all(&msg[..]).await?
} }
result = client.socket.read(&mut client_buffer) => match result { result = client.socket.read_buf(&mut client_buffer) => match result {
Ok(0) => { Ok(0) => {
break break;
}, },
Ok(n) => { Ok(n) => {
println!("reading more"); let b = BytesMut::from(&client_buffer[0..n]).freeze();
client.peer_tx.send(BytesMut::from(&client_buffer[0..n]).freeze())? // println!("reading more = {:?}", b);
client_buffer.clear();
client.peer_tx.send(b)?
}, },
Err(e) => { Err(e) => {
println!("Error {:?}", e); println!("Error {:?}", e);