Scaffold message passing

This commit is contained in:
Donald Knuth 2022-02-07 13:38:16 -05:00
parent c6b45dea57
commit eca6ddf3b8
3 changed files with 109 additions and 9 deletions

View file

@ -21,7 +21,10 @@ pub async fn send(paths: &Vec<PathBuf>) -> Result<(), Box<dyn std::error::Error>
let mut buf = BytesMut::with_capacity(1024);
buf.put(&b[..]);
let body = buf.freeze();
let m = Message { body: body };
let m = Message {
from_sender: true,
body: body,
};
stream.send(m).await.unwrap();
}
Ok(())

View file

@ -3,5 +3,6 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Message {
pub from_sender: bool,
pub body: Bytes,
}

View file

@ -1,17 +1,83 @@
use crate::message::Message;
use futures::prelude::*;
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use tokio::sync::{mpsc, Mutex};
use tokio_serde::SymmetricallyFramed;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
type Tx = mpsc::UnboundedSender<Message>;
type Rx = mpsc::UnboundedReceiver<Message>;
type MessageStream = SymmetricallyFramed<
Framed<TcpStream, LengthDelimitedCodec>,
Message,
tokio_serde::formats::SymmetricalBincode<Message>,
>;
pub struct Shared {
rooms: HashMap<String, RoomInfo>,
}
type State = Arc<Mutex<Shared>>;
struct RoomInfo {
sender_tx: Tx,
}
struct Client {
is_sender: bool,
messages: MessageStream,
rx: Rx,
}
impl Shared {
fn new() -> Self {
Shared {
rooms: HashMap::new(),
}
}
// async fn broadcast(&mut self, sender: SocketAddr, message: Message) {
// for peer in self.peers.iter_mut() {
// if *peer.0 != sender {
// let _ = peer.1.send(message.clone());
// }
// }
// }
}
impl Client {
async fn new(is_sender: bool, state: State, messages: MessageStream) -> io::Result<Client> {
let (tx, rx) = mpsc::unbounded_channel();
let room_info = RoomInfo { sender_tx: tx };
state
.lock()
.await
.rooms
.insert("abc".to_string(), room_info);
Ok(Client {
is_sender,
messages,
rx,
})
}
}
pub async fn serve() -> Result<(), Box<dyn std::error::Error>> {
let addr = "127.0.0.1:8080".to_string();
let server = TcpListener::bind(&addr).await?;
let listener = TcpListener::bind(&addr).await?;
let state = Arc::new(Mutex::new(Shared::new()));
println!("Listening on: {}", addr);
loop {
let (stream, _) = server.accept().await?;
let (stream, address) = listener.accept().await?;
let state = Arc::clone(&state);
tokio::spawn(async move {
match process(stream).await {
match handle_connection(state, stream, address).await {
Ok(_) => println!("ok"),
Err(_) => println!("err"),
}
@ -19,14 +85,44 @@ pub async fn serve() -> Result<(), Box<dyn std::error::Error>> {
}
}
pub async fn process(socket: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
let length_delimited = FramedRead::new(socket, LengthDelimitedCodec::new());
pub async fn handle_connection(
state: Arc<Mutex<Shared>>,
socket: TcpStream,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
let length_delimited = Framed::new(socket, LengthDelimitedCodec::new());
let mut stream = tokio_serde::SymmetricallyFramed::new(
length_delimited,
tokio_serde::formats::SymmetricalBincode::<Message>::default(),
);
while let Some(message) = stream.try_next().await? {
println!("GOT: {:?}", message);
let first_message = match stream.next().await {
Some(Ok(msg)) => {
println!("first msg: {:?}", msg);
msg
}
_ => {
println!("no first message");
return Ok(());
}
};
let mut client = Client::new(first_message.from_sender, state.clone(), stream).await?;
// add client to state here
loop {
tokio::select! {
Some(msg) = client.rx.recv() => {
println!("message received to client.rx {:?}", msg);
}
result = client.messages.next() => match result {
Some(Ok(msg)) => {
println!("GOT: {:?}", msg);
}
Some(Err(e)) => {
println!("Error {:?}", e);
}
None => break,
}
}
}
// client is disconnected, let's remove them from the state
Ok(())
}