Fix websocket cleaning. Fix refresh avatar after change

This commit is contained in:
Adrian Wozniak 2020-05-05 22:56:55 +02:00
parent fe06c1e63e
commit 8ee6566e3b
3 changed files with 50 additions and 23 deletions

View File

@ -26,18 +26,18 @@ pub fn update(msg: Msg, model: &mut crate::model::Model, orders: &mut impl Order
_ => (), _ => (),
} }
let project_page = match &mut model.page_content { let profile_page = match &mut model.page_content {
PageContent::Profile(profile_page) => profile_page, PageContent::Profile(profile_page) => profile_page,
_ => return, _ => return,
}; };
project_page.name.update(&msg); profile_page.name.update(&msg);
project_page.email.update(&msg); profile_page.email.update(&msg);
project_page.avatar.update(&msg); profile_page.avatar.update(&msg);
match msg { match msg {
Msg::FileInputChanged(FieldId::Profile(UsersFieldId::Avatar), ..) => { Msg::FileInputChanged(FieldId::Profile(UsersFieldId::Avatar), ..) => {
let file = match project_page.avatar.file.as_ref() { let file = match profile_page.avatar.file.as_ref() {
Some(f) => f, Some(f) => f,
_ => return, _ => return,
}; };
@ -52,6 +52,13 @@ pub fn update(msg: Msg, model: &mut crate::model::Model, orders: &mut impl Order
orders.perform_cmd(update_avatar(fd)); orders.perform_cmd(update_avatar(fd));
orders.skip(); orders.skip();
} }
Msg::WsMsg(WsMsg::AvatarUrlChanged(user_id, avatar_url)) => {
if let Some(me) = model.user.as_mut() {
if me.id == user_id {
profile_page.avatar.url = Some(avatar_url.clone());
}
}
}
_ => (), _ => (),
} }
} }

View File

@ -1,4 +1,5 @@
#![feature(async_closure)] #![feature(async_closure)]
#![feature(vec_remove_item)]
#[macro_use] #[macro_use]
extern crate diesel; extern crate diesel;

View File

@ -1,6 +1,8 @@
use std::collections::{HashMap, HashSet}; use std::collections::HashMap;
use actix::{Actor, ActorContext, Addr, Context, Handler, Message, Recipient, StreamHandler}; use actix::{
Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, Recipient, StreamHandler,
};
use actix_web::web::Data; use actix_web::web::Data;
use actix_web::{get, web, Error, HttpRequest, HttpResponse}; use actix_web::{get, web, Error, HttpRequest, HttpResponse};
use actix_web_actors::ws; use actix_web_actors::ws;
@ -201,7 +203,11 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketActor {
fn finished(&mut self, ctx: &mut Self::Context) { fn finished(&mut self, ctx: &mut Self::Context) {
info!("Disconnected"); info!("Disconnected");
if let Some(user) = self.current_user.as_ref() { if let Some(user) = self.current_user.as_ref() {
self.addr.do_send(InnerMsg::Leave(user.project_id, user.id)); self.addr.do_send(InnerMsg::Leave(
user.project_id,
user.id,
ctx.address().recipient(),
));
} }
ctx.stop() ctx.stop()
} }
@ -218,14 +224,14 @@ where
#[rtype(result = "()")] #[rtype(result = "()")]
pub enum InnerMsg { pub enum InnerMsg {
Join(ProjectId, UserId, Recipient<InnerMsg>), Join(ProjectId, UserId, Recipient<InnerMsg>),
Leave(ProjectId, UserId), Leave(ProjectId, UserId, Recipient<InnerMsg>),
BroadcastToChannel(ProjectId, WsMsg), BroadcastToChannel(ProjectId, WsMsg),
Transfer(WsMsg), Transfer(WsMsg),
} }
pub struct WsServer { pub struct WsServer {
sessions: HashMap<i32, Recipient<InnerMsg>>, sessions: HashMap<UserId, Vec<Recipient<InnerMsg>>>,
rooms: HashMap<i32, HashSet<i32>>, rooms: HashMap<ProjectId, HashMap<UserId, i32>>,
} }
impl Default for WsServer { impl Default for WsServer {
@ -252,18 +258,29 @@ impl Handler<InnerMsg> for WsServer {
debug!("receive {:?}", msg); debug!("receive {:?}", msg);
match msg { match msg {
InnerMsg::Join(project_id, user_id, recipient) => { InnerMsg::Join(project_id, user_id, recipient) => {
self.sessions.insert(user_id, recipient); let v = self.sessions.entry(user_id).or_insert(vec![]);
v.push(recipient);
self.ensure_room(project_id); self.ensure_room(project_id);
if let Some(room) = self.rooms.get_mut(&project_id) { if let Some(room) = self.rooms.get_mut(&project_id) {
room.insert(user_id); let n = *room.entry(user_id).or_insert(0);
room.insert(user_id, n + 1);
} }
} }
InnerMsg::Leave(project_id, user_id) => { InnerMsg::Leave(project_id, user_id, recipient) => {
self.ensure_room(project_id); self.ensure_room(project_id);
if let Some(room) = self.rooms.get_mut(&project_id) { let room = match self.rooms.get_mut(&project_id) {
Some(room) => room,
None => return,
};
let n = *room.entry(user_id).or_insert(0);
if n <= 1 {
room.remove(&user_id); room.remove(&user_id);
self.sessions.remove(&user_id);
} else {
let v = self.sessions.entry(user_id).or_insert(vec![]);
v.remove_item(&recipient);
} }
self.sessions.remove(&user_id);
} }
InnerMsg::BroadcastToChannel(project_id, msg) => { InnerMsg::BroadcastToChannel(project_id, msg) => {
debug!("Begin broadcast to channel {} msg {:?}", project_id, msg); debug!("Begin broadcast to channel {} msg {:?}", project_id, msg);
@ -272,17 +289,19 @@ impl Handler<InnerMsg> for WsServer {
_ => return debug!(" channel not found, aborting..."), _ => return debug!(" channel not found, aborting..."),
}; };
let _s = set.len(); let _s = set.len();
for r in set { for r in set.keys() {
let recipient = match self.sessions.get(r) { let v = match self.sessions.get(r) {
Some(r) => r, Some(v) => v,
_ => { _ => {
debug!("recipient is dead, skipping..."); debug!("recipient is dead, skipping...");
continue; continue;
} }
}; };
match recipient.do_send(InnerMsg::Transfer(msg.clone())) { for recipient in v.iter() {
Ok(_) => debug!("msg sent"), match recipient.do_send(InnerMsg::Transfer(msg.clone())) {
Err(e) => error!("{}", e), Ok(_) => debug!("msg sent"),
Err(e) => error!("{}", e),
};
} }
} }
} }
@ -293,7 +312,7 @@ impl Handler<InnerMsg> for WsServer {
impl WsServer { impl WsServer {
pub fn ensure_room(&mut self, room: i32) { pub fn ensure_room(&mut self, room: i32) {
self.rooms.entry(room).or_insert_with(HashSet::new); self.rooms.entry(room).or_insert_with(HashMap::new);
} }
} }