
snowflake as a dependency for generating unique ids through the process.use std::{collections::{HashMap, HashSet}, hash::Hash};
use snowflake::ProcessUniqueId;
use tokio::sync::mpsc::Sender;
pub struct User {
pub tx: Sender<String>
}
pub struct RoomManager {
pub clients: HashMap<ProcessUniqueId, User>,
pub subscriptions: HashMap<String, HashSet<ProcessUniqueId>>
}
impl RoomManager {
pub fn new() -> Self {
Self {
clients: HashMap::new(),
subscriptions: HashMap::new()
}
}
}
broadcast method that broadcasts a message to the room argument.pub async fn broadcast(&self, room_id: String, message: String) {
let subscriptions = self.subscriptions.get(&room_id);
if let Some(subscriptions) = subscriptions {
for client_id in subscriptions {
let _ = self.clients.get(client_id).unwrap().tx.send(message.clone()).await;
}
}
}
<aside> 💡
Good assignment - How to parrallelise these tasks?
</aside>
broadcast method and relays it back to the user.async fn ws_handler(request: HttpRequest, body: web::Payload) -> Result<actix_web::HttpResponse, actix_web::error::Error> {
let (response, mut session, mut stream) = actix_ws::handle(&request, body).unwrap();
let (tx, mut rx) = mpsc::channel::<String>(32);
let mut session_clone = session.clone();
rt::spawn(async move {
while let Some(message) = stream.recv().await {
match message.unwrap() {
Message::Ping(data) => {
let _ = session_clone.pong(&data).await;
}
Message::Text(message) => {
let _ = session_clone.text(message).await;
}
_ => {
}
}
}
});
rt::spawn(async move {
while let Some(message) = rx.recv().await {
let _ = session.text(message).await;
}
});
Ok(response)
}
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
pub struct RoomMessage {
pub room_id: String,
pub message: String
}
#[derive(Serialize, Deserialize)]
pub enum Message {
JoinRoom(String),
LeaveRoom(String),
Message(RoomMessage)
}
use std::sync::Arc;
use actix_web::{App, HttpRequest, HttpServer, rt, web};
use actix_ws::Message;
use futures_util::lock::Mutex;
use snowflake::ProcessUniqueId;
use tokio::sync::mpsc;
use crate::{room_manager::{RoomManager, User}};
pub mod room_manager;
pub mod messages;
async fn ws_handler(request: HttpRequest, body: web::Payload, room_manager: web::Data<Arc<Mutex<RoomManager>>>) -> Result<actix_web::HttpResponse, actix_web::error::Error> {
let (response, mut session, mut stream) = actix_ws::handle(&request, body).unwrap();
let (tx, mut rx) = mpsc::channel::<String>(32);
let mut session_clone = session.clone();
let id = ProcessUniqueId::new();
room_manager.lock().await.clients.insert(id, User {
tx
});
rt::spawn(async move {
while let Some(message) = stream.recv().await {
match message.unwrap() {
Message::Ping(data) => {
let _ = session_clone.pong(&data).await;
}
Message::Text(message) => {
let message: Result<messages::Message, serde_json::Error> = serde_json::from_slice(&message.as_bytes());
if let Ok(message) = message {
match message {
messages::Message::JoinRoom(room_id) => {
let mut room_manager = room_manager.lock().await;
room_manager.subscriptions.entry(room_id).or_default().insert(id);
}
messages::Message::LeaveRoom(room_id) => {
let mut room_manager = room_manager.lock().await;
room_manager.subscriptions.entry(room_id).or_default().remove(&id);
}
messages::Message::Message(room_message) => {
let room_manager = room_manager.lock().await;
room_manager.broadcast(room_message.room_id, room_message.message).await ;
}
}
}
}
_ => {
}
}
}
});
rt::spawn(async move {
while let Some(message) = rx.recv().await {
let _ = session.text(message).await;
}
});
Ok(response)
}
#[actix_web::main]
async fn main() {
let room_manager = Arc::new(Mutex::new(RoomManager::new()));
let _ = HttpServer::new(move || {
App::new()
.route("/ws", web::get().to(ws_handler))
.app_data(web::Data::new(room_manager.clone()))
})
.bind("0.0.0.0:3000")
.unwrap()
.run()
.await;
}