Screenshot 2025-11-27 at 3.43.36 AM.jpg

use std::{collections::{HashMap, HashSet}, hash::Hash};

use snowflake::ProcessUniqueId;
use tokio::sync::mpsc::Sender;

/// A registered client, reachable through the sending half of its outbound channel.
pub struct User {
    /// Sender used to queue text messages destined for this client.
    pub tx: Sender<String>,
}

/// Registry of clients and of the rooms they are subscribed to.
pub struct RoomManager {
    /// Every registered client, keyed by its process-unique id.
    pub clients: HashMap<ProcessUniqueId, User>,
    /// Room id mapped to the ids of the clients subscribed to that room.
    pub subscriptions: HashMap<String, HashSet<ProcessUniqueId>>,
}

impl RoomManager {
    /// Creates a manager with no clients and no room subscriptions.
    pub fn new() -> Self {
        Self {
            clients: HashMap::new(),
            subscriptions: HashMap::new(),
        }
    }

    /// Sends `message` to every client subscribed to `room_id`.
    ///
    /// Does nothing when the room has no subscription entry. Sends are awaited
    /// one after another, so a subscriber whose channel is full delays delivery
    /// to the subscribers that follow it.
    pub async fn broadcast(&self, room_id: String, message: String) {
        if let Some(subscribers) = self.subscriptions.get(&room_id) {
            for client_id in subscribers {
                // A subscription may reference a client that is no longer
                // registered; skip it instead of panicking.
                if let Some(client) = self.clients.get(client_id) {
                    // A send error only means the receiving half was dropped,
                    // i.e. that client is gone; delivery is best-effort.
                    let _ = client.tx.send(message.clone()).await;
                }
            }
        }
    }
}

impl Default for RoomManager {
    /// Equivalent to [`RoomManager::new`].
    fn default() -> Self {
        Self::new()
    }
}

<aside> 💡

Good assignment: how would you parallelise these sends?

</aside>

/// Upgrades the request to a WebSocket connection and echoes every text frame
/// back to the client that sent it.
///
/// Two tasks are spawned: one reads incoming frames, the other forwards
/// messages received on an in-process channel to the socket.
///
/// # Errors
/// Returns the error produced by `actix_ws::handle` when the WebSocket
/// handshake fails.
async fn ws_handler(request: HttpRequest, body: web::Payload) -> Result<actix_web::HttpResponse, actix_web::error::Error> {
    let (response, mut session, mut stream) = actix_ws::handle(&request, body)?;
    // Nothing stores the sender yet, so it is dropped when this handler
    // returns and the forwarding task below ends as soon as it observes that.
    let (_tx, mut rx) = mpsc::channel::<String>(32);
    let mut session_clone = session.clone();

    rt::spawn(async move {
        let mut close_reason = None;
        // Stop at the end of the stream or on the first protocol error
        // instead of panicking inside the task.
        while let Some(Ok(frame)) = stream.recv().await {
            match frame {
                Message::Ping(data) => {
                    let _ = session_clone.pong(&data).await;
                }
                Message::Text(text) => {
                    let _ = session_clone.text(text).await;
                }
                Message::Close(reason) => {
                    close_reason = reason;
                    break;
                }
                _ => {}
            }
        }
        // Best-effort close; the peer may already have gone away.
        let _ = session_clone.close(close_reason).await;
    });

    rt::spawn(async move {
        while let Some(message) = rx.recv().await {
            // Once the session is closed every further send fails too.
            if session.text(message).await.is_err() {
                break;
            }
        }
    });

    Ok(response)
}
use serde::{Serialize, Deserialize};

/// A text message addressed to a single room.
#[derive(Serialize, Deserialize)]
pub struct RoomMessage {
    /// Id of the room the message is addressed to.
    pub room_id: String,
    /// Text content of the message.
    pub message: String,
}

/// Commands exchanged over the wire, (de)serialised with serde.
#[derive(Serialize, Deserialize)]
pub enum Message {
    /// Join the room identified by the contained string.
    JoinRoom(String),
    /// Leave the room identified by the contained string.
    LeaveRoom(String),
    /// A message addressed to a room.
    Message(RoomMessage),
}
use std::sync::Arc;

use actix_web::{App, HttpRequest, HttpServer, rt, web};
use actix_ws::Message;
use futures_util::lock::Mutex;
use snowflake::ProcessUniqueId;
use tokio::sync::mpsc;

use crate::{room_manager::{RoomManager, User}};
pub mod room_manager;
pub mod messages;

async fn ws_handler(request: HttpRequest, body: web::Payload, room_manager: web::Data<Arc<Mutex<RoomManager>>>) -> Result<actix_web::HttpResponse, actix_web::error::Error> {
    let (response, mut session, mut stream) = actix_ws::handle(&request, body).unwrap();
    let (tx, mut rx) = mpsc::channel::<String>(32);
    let mut session_clone = session.clone();
    let id = ProcessUniqueId::new();

    room_manager.lock().await.clients.insert(id, User {
        tx
    });

    rt::spawn(async move {
        while let Some(message) = stream.recv().await {
            match message.unwrap() {
                Message::Ping(data) => {
                    let _ = session_clone.pong(&data).await;
                }
                Message::Text(message) => {
                    let message: Result<messages::Message, serde_json::Error> = serde_json::from_slice(&message.as_bytes());
                    if let Ok(message) = message {
                        match message {
                            messages::Message::JoinRoom(room_id) => {
                                let mut room_manager = room_manager.lock().await;
                                room_manager.subscriptions.entry(room_id).or_default().insert(id);

                            }
                            messages::Message::LeaveRoom(room_id) => {
                                let mut room_manager = room_manager.lock().await;
                                room_manager.subscriptions.entry(room_id).or_default().remove(&id);
                            }
                            messages::Message::Message(room_message) => {
                                let room_manager = room_manager.lock().await;
                                room_manager.broadcast(room_message.room_id, room_message.message).await ;
                            }
                        }
                    }
                }
                _ => {

                }
            }
        }
    });

    rt::spawn(async move {
        while let Some(message) = rx.recv().await {
            let _ = session.text(message).await;
        }
    });

    Ok(response)

}

/// Starts the HTTP server on `0.0.0.0:3000` with the WebSocket endpoint at `/ws`.
///
/// A single [`RoomManager`] is created up front and a handle to it is given to
/// every `App` instance built by the server factory.
///
/// # Errors
/// Returns the I/O error if the address cannot be bound or if the server
/// stops with an error.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let room_manager = Arc::new(Mutex::new(RoomManager::new()));

    HttpServer::new(move || {
        App::new()
            .route("/ws", web::get().to(ws_handler))
            .app_data(web::Data::new(room_manager.clone()))
    })
    .bind("0.0.0.0:3000")?
    .run()
    .await
}