From 8ee6566e3b21862313cbbdedbb64c73cf4b9db87 Mon Sep 17 00:00:00 2001 From: Adrian Wozniak Date: Tue, 5 May 2020 22:56:55 +0200 Subject: [PATCH] Fix websocket cleaning. Fix refresh avatar after change --- jirs-client/src/profile.rs | 17 ++++++++---- jirs-server/src/main.rs | 1 + jirs-server/src/ws/mod.rs | 55 +++++++++++++++++++++++++------------- 3 files changed, 50 insertions(+), 23 deletions(-) diff --git a/jirs-client/src/profile.rs b/jirs-client/src/profile.rs index f40da3ed..3e54033b 100644 --- a/jirs-client/src/profile.rs +++ b/jirs-client/src/profile.rs @@ -26,18 +26,18 @@ pub fn update(msg: Msg, model: &mut crate::model::Model, orders: &mut impl Order _ => (), } - let project_page = match &mut model.page_content { + let profile_page = match &mut model.page_content { PageContent::Profile(profile_page) => profile_page, _ => return, }; - project_page.name.update(&msg); - project_page.email.update(&msg); - project_page.avatar.update(&msg); + profile_page.name.update(&msg); + profile_page.email.update(&msg); + profile_page.avatar.update(&msg); match msg { Msg::FileInputChanged(FieldId::Profile(UsersFieldId::Avatar), ..) => { - let file = match project_page.avatar.file.as_ref() { + let file = match profile_page.avatar.file.as_ref() { Some(f) => f, _ => return, }; @@ -52,6 +52,13 @@ pub fn update(msg: Msg, model: &mut crate::model::Model, orders: &mut impl Order orders.perform_cmd(update_avatar(fd)); orders.skip(); } + Msg::WsMsg(WsMsg::AvatarUrlChanged(user_id, avatar_url)) => { + if let Some(me) = model.user.as_mut() { + if me.id == user_id { + profile_page.avatar.url = Some(avatar_url.clone()); + } + } + } _ => (), } } diff --git a/jirs-server/src/main.rs b/jirs-server/src/main.rs index 6ae9440d..306df337 100644 --- a/jirs-server/src/main.rs +++ b/jirs-server/src/main.rs @@ -1,4 +1,5 @@ #![feature(async_closure)] +#![feature(vec_remove_item)] #[macro_use] extern crate diesel; diff --git a/jirs-server/src/ws/mod.rs b/jirs-server/src/ws/mod.rs index 8ad494dd..9c5b9fcf 100644 --- a/jirs-server/src/ws/mod.rs +++ b/jirs-server/src/ws/mod.rs @@ -1,6 +1,8 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; -use actix::{Actor, ActorContext, Addr, Context, Handler, Message, Recipient, StreamHandler}; +use actix::{ + Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, Recipient, StreamHandler, +}; use actix_web::web::Data; use actix_web::{get, web, Error, HttpRequest, HttpResponse}; use actix_web_actors::ws; @@ -201,7 +203,11 @@ impl StreamHandler> for WebSocketActor { fn finished(&mut self, ctx: &mut Self::Context) { info!("Disconnected"); if let Some(user) = self.current_user.as_ref() { - self.addr.do_send(InnerMsg::Leave(user.project_id, user.id)); + self.addr.do_send(InnerMsg::Leave( + user.project_id, + user.id, + ctx.address().recipient(), + )); } ctx.stop() } @@ -218,14 +224,14 @@ where #[rtype(result = "()")] pub enum InnerMsg { Join(ProjectId, UserId, Recipient), - Leave(ProjectId, UserId), + Leave(ProjectId, UserId, Recipient), BroadcastToChannel(ProjectId, WsMsg), Transfer(WsMsg), } pub struct WsServer { - sessions: HashMap>, - rooms: HashMap>, + sessions: HashMap>>, + rooms: HashMap>, } impl Default for WsServer { @@ -252,18 +258,29 @@ impl Handler for WsServer { debug!("receive {:?}", msg); match msg { InnerMsg::Join(project_id, user_id, recipient) => { - self.sessions.insert(user_id, recipient); + let v = self.sessions.entry(user_id).or_insert(vec![]); + v.push(recipient); self.ensure_room(project_id); + if let Some(room) = self.rooms.get_mut(&project_id) { - room.insert(user_id); + let n = *room.entry(user_id).or_insert(0); + room.insert(user_id, n + 1); } } - InnerMsg::Leave(project_id, user_id) => { + InnerMsg::Leave(project_id, user_id, recipient) => { self.ensure_room(project_id); - if let Some(room) = self.rooms.get_mut(&project_id) { + let room = match self.rooms.get_mut(&project_id) { + Some(room) => room, + None => return, + }; + let n = *room.entry(user_id).or_insert(0); + if n <= 1 { room.remove(&user_id); + self.sessions.remove(&user_id); + } else { + let v = self.sessions.entry(user_id).or_insert(vec![]); + v.remove_item(&recipient); } - self.sessions.remove(&user_id); } InnerMsg::BroadcastToChannel(project_id, msg) => { debug!("Begin broadcast to channel {} msg {:?}", project_id, msg); @@ -272,17 +289,19 @@ impl Handler for WsServer { _ => return debug!(" channel not found, aborting..."), }; let _s = set.len(); - for r in set { - let recipient = match self.sessions.get(r) { - Some(r) => r, + for r in set.keys() { + let v = match self.sessions.get(r) { + Some(v) => v, _ => { debug!("recipient is dead, skipping..."); continue; } }; - match recipient.do_send(InnerMsg::Transfer(msg.clone())) { - Ok(_) => debug!("msg sent"), - Err(e) => error!("{}", e), + for recipient in v.iter() { + match recipient.do_send(InnerMsg::Transfer(msg.clone())) { + Ok(_) => debug!("msg sent"), + Err(e) => error!("{}", e), + }; } } } @@ -293,7 +312,7 @@ impl Handler for WsServer { impl WsServer { pub fn ensure_room(&mut self, room: i32) { - self.rooms.entry(room).or_insert_with(HashSet::new); + self.rooms.entry(room).or_insert_with(HashMap::new); } }