diff --git a/jirs-server/src/ws/auth.rs b/jirs-server/src/ws/auth.rs index 1266dcda..86bc3421 100644 --- a/jirs-server/src/ws/auth.rs +++ b/jirs-server/src/ws/auth.rs @@ -1,62 +1,101 @@ -use actix::Addr; -use actix_web::web::Data; +use actix::AsyncContext; +use futures::executor::block_on; -use jirs_data::WsMsg; +use jirs_data::{Token, WsMsg}; -use crate::db::tokens::CreateBindToken; +use crate::db::authorize_user::AuthorizeUser; +use crate::db::tokens::{CreateBindToken, FindBindToken}; use crate::db::users::FindUser; -use crate::db::DbExecutor; use crate::mail::welcome::Welcome; -use crate::mail::MailExecutor; -use crate::ws::WsResult; +use crate::ws::{WebSocketActor, WsHandler, WsResult}; -pub async fn authenticate( - db: &Data>, - mail: &Data>, - name: String, - email: String, -) -> WsResult { - // TODO check attempt number, allow only 5 times per day - let user = match db.send(FindUser { name, email }).await { - Ok(Ok(user)) => user, - Ok(Err(e)) => { - error!("{:?}", e); - return Ok(None); - } - Err(e) => { - error!("{:?}", e); - return Ok(None); - } - }; - let token = match db.send(CreateBindToken { user_id: user.id }).await { - Ok(Ok(token)) => token, - Ok(Err(e)) => { - error!("{:?}", e); - return Ok(None); - } - Err(e) => { - error!("{:?}", e); - return Ok(None); - } - }; - if let Some(bind_token) = token.bind_token.as_ref().cloned() { - match mail - .send(Welcome { - bind_token, - email: user.email.clone(), - }) - .await - { - Ok(Ok(_)) => (), +pub struct Authenticate { + pub name: String, + pub email: String, +} + +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: Authenticate, _ctx: &mut Self::Context) -> WsResult { + let Authenticate { name, email } = msg; + // TODO check attempt number, allow only 5 times per day + let user = match block_on(self.db.send(FindUser { name, email })) { + Ok(Ok(user)) => user, Ok(Err(e)) => { - error!("{}", e); + error!("{:?}", e); return Ok(None); } Err(e) => { - error!("{}", e); + error!("{:?}", e); return Ok(None); } + }; + let token = match block_on(self.db.send(CreateBindToken { user_id: user.id })) { + Ok(Ok(token)) => token, + Ok(Err(e)) => { + error!("{:?}", e); + return Ok(None); + } + Err(e) => { + error!("{:?}", e); + return Ok(None); + } + }; + if let Some(bind_token) = token.bind_token.as_ref().cloned() { + match block_on(self.mail.send(Welcome { + bind_token, + email: user.email.clone(), + })) { + Ok(Ok(_)) => (), + Ok(Err(e)) => { + error!("{}", e); + return Ok(None); + } + Err(e) => { + error!("{}", e); + return Ok(None); + } + } } + Ok(Some(WsMsg::AuthenticateSuccess)) + } +} + +pub struct CheckAuthToken { + pub token: uuid::Uuid, +} + +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: CheckAuthToken, ctx: &mut Self::Context) -> WsResult { + let user: jirs_data::User = match block_on(self.db.send(AuthorizeUser { + access_token: msg.token, + })) { + Ok(Ok(u)) => u.into(), + Ok(Err(_)) => { + return Ok(Some(WsMsg::AuthorizeLoaded(Err( + "Invalid auth token".to_string() + )))) + } + _ => return Ok(Some(WsMsg::AuthorizeExpired)), + }; + self.current_user = Some(user.clone()); + block_on(self.join_channel(ctx.address().recipient())); + Ok(Some(WsMsg::AuthorizeLoaded(Ok(user)))) + } +} + +pub struct CheckBindToken { + pub bind_token: uuid::Uuid, +} + +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: CheckBindToken, _ctx: &mut Self::Context) -> WsResult { + let token: Token = match block_on(self.db.send(FindBindToken { + token: msg.bind_token, + })) { + Ok(Ok(token)) => token, + Ok(Err(_)) => return Ok(Some(WsMsg::BindTokenBad)), + _ => return Ok(None), + }; + Ok(Some(WsMsg::BindTokenOk(token.access_token))) } - Ok(Some(WsMsg::AuthenticateSuccess)) } diff --git a/jirs-server/src/ws/comments.rs b/jirs-server/src/ws/comments.rs index 7d388701..93a3c473 100644 --- a/jirs-server/src/ws/comments.rs +++ b/jirs-server/src/ws/comments.rs @@ -1,98 +1,98 @@ use actix::Addr; use actix_web::web::Data; +use futures::executor::block_on; use jirs_data::{CommentId, CreateCommentPayload, IssueId, UpdateCommentPayload, WsMsg}; -use crate::db::comments::LoadIssueComments; use crate::db::DbExecutor; -use crate::ws::{current_user, WsResult}; +use crate::ws::{current_user, WebSocketActor, WsHandler, WsResult}; -pub async fn load_issues( - db: &Data>, - user: &Option, - issue_id: IssueId, -) -> WsResult { - current_user(user)?; - let comments = match db.send(LoadIssueComments { issue_id }).await { - Ok(Ok(comments)) => comments.into_iter().map(|c| c.into()).collect(), - _ => return Ok(None), - }; - - Ok(Some(WsMsg::IssueCommentsLoaded(comments))) +pub struct LoadIssueComments { + pub issue_id: IssueId, } -pub async fn create_comment( - db: &Data>, - user: &Option, - mut payload: CreateCommentPayload, -) -> WsResult { - use crate::db::comments::CreateComment; +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: LoadIssueComments, _ctx: Self::Context) -> WsResult { + self.require_user()?; - let user_id = current_user(user)?.id; - if payload.user_id.is_none() { - payload.user_id = Some(user_id); + let comments = match block_on(self.db.send(crate::db::comments::LoadIssueComments { + issue_id: msg.issue_id, + })) { + Ok(Ok(comments)) => comments.into_iter().map(|c| c.into()).collect(), + _ => return Ok(None), + }; + + Ok(Some(WsMsg::IssueCommentsLoaded(comments))) } - let issue_id = payload.issue_id; - match db - .send(CreateComment { +} + +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, mut msg: CreateCommentPayload, _ctx: Self::Context) -> WsResult { + use crate::db::comments::CreateComment; + + let user_id = self.require_user()?.id; + if msg.user_id.is_none() { + msg.user_id = Some(user_id); + } + let issue_id = msg.issue_id; + match block_on(self.db.send(CreateComment { user_id, issue_id, - body: payload.body, - }) - .await - { - Ok(Ok(_)) => (), - _ => return Ok(None), - }; - load_issues(db, user, issue_id).await + body: msg.body, + })) { + Ok(Ok(_)) => (), + _ => return Ok(None), + }; + self.handle_msg(LoadIssueComments { issue_id }) + } } -pub async fn update_comment( - db: &Data>, - user: &Option, - payload: UpdateCommentPayload, -) -> WsResult { - use crate::db::comments::UpdateComment; +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: UpdateCommentPayload, _ctx: Self::Context) -> WsResult { + use crate::db::comments::UpdateComment; - info!("{:?}", payload); - let user_id = current_user(user)?.id; + info!("{:?}", msg); + let user_id = self.require_user()?.id; - let UpdateCommentPayload { - id: comment_id, - body, - } = payload; + let UpdateCommentPayload { + id: comment_id, + body, + } = msg; - let issue_id = match db - .send(UpdateComment { + let issue_id = match block_on(self.db.send(UpdateComment { comment_id, user_id, body, - }) - .await - { - Ok(Ok(comment)) => comment.issue_id, - _ => return Ok(None), - }; - load_issues(db, user, issue_id).await + })) { + Ok(Ok(comment)) => comment.issue_id, + _ => return Ok(None), + }; + if let Some(v) = self.handle_msg(LoadIssueComments { issue_id })? { + self.broadcast(&v); + } + Ok(None) + } } -pub async fn delete_comment( - db: &Data>, - user: &Option, - comment_id: CommentId, -) -> WsResult { - use crate::db::comments::DeleteComment; - - let user_id = current_user(user)?.id; - - let msg = DeleteComment { - comment_id, - user_id, - }; - match db.send(msg).await { - Ok(Ok(_)) => (), - _ => return Ok(None), - }; - - Ok(Some(WsMsg::CommentDeleted(comment_id))) +pub struct DeleteComment { + pub comment_id: CommentId, +} + +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: DeleteComment, _ctx: Self::Context) -> WsResult { + use crate::db::comments::DeleteComment; + + let user_id = self.require_user()?.id; + + let m = DeleteComment { + comment_id: msg.comment_id, + user_id, + }; + match block_on(self.db.send(m)) { + Ok(Ok(_)) => (), + _ => return Ok(None), + }; + + Ok(Some(WsMsg::CommentDeleted(msg.comment_id))) + } } diff --git a/jirs-server/src/ws/invitations.rs b/jirs-server/src/ws/invitations.rs index 94bfb201..72af2998 100644 --- a/jirs-server/src/ws/invitations.rs +++ b/jirs-server/src/ws/invitations.rs @@ -1,21 +1,14 @@ -use actix::{Handler, Message}; use futures::executor::block_on; use jirs_data::{EmailString, InvitationId, UsernameString, WsMsg}; use crate::db::invitations; -use crate::ws::{WebSocketActor, WsResult}; +use crate::ws::{WebSocketActor, WsHandler, WsResult}; pub struct ListInvitation; -impl Message for ListInvitation { - type Result = WsResult; -} - -impl Handler for WebSocketActor { - type Result = WsResult; - - fn handle(&mut self, _msg: ListInvitation, _ctx: &mut Self::Context) -> Self::Result { +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, _msg: ListInvitation, _ctx: &mut Self::Context) -> WsResult { let user_id = match self.current_user.as_ref().map(|u| u.id) { Some(id) => id, _ => return Ok(None), @@ -33,14 +26,8 @@ pub struct CreateInvitation { pub name: UsernameString, } -impl Message for CreateInvitation { - type Result = WsResult; -} - -impl Handler for WebSocketActor { - type Result = WsResult; - - fn handle(&mut self, msg: CreateInvitation, _ctx: &mut Self::Context) -> Self::Result { +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: CreateInvitation, _ctx: &mut Self::Context) -> WsResult { let (user_id, project_id) = match self.current_user.as_ref().map(|u| (u.id, u.project_id)) { Some(id) => id, _ => return Ok(None), @@ -63,14 +50,8 @@ pub struct DeleteInvitation { pub id: InvitationId, } -impl Message for DeleteInvitation { - type Result = WsResult; -} - -impl Handler for WebSocketActor { - type Result = WsResult; - - fn handle(&mut self, msg: DeleteInvitation, _ctx: &mut Self::Context) -> Self::Result { +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: DeleteInvitation, _ctx: &mut Self::Context) -> WsResult { self.require_user()?; let DeleteInvitation { id } = msg; let res = match block_on(self.db.send(invitations::DeleteInvitation { id })) { @@ -85,14 +66,8 @@ pub struct RevokeInvitation { pub id: InvitationId, } -impl Message for RevokeInvitation { - type Result = WsResult; -} - -impl Handler for WebSocketActor { - type Result = WsResult; - - fn handle(&mut self, msg: RevokeInvitation, _ctx: &mut Self::Context) -> Self::Result { +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: RevokeInvitation, _ctx: &mut Self::Context) -> WsResult { self.require_user()?; let RevokeInvitation { id } = msg; let res = match block_on(self.db.send(invitations::RevokeInvitation { id })) { @@ -107,14 +82,8 @@ pub struct AcceptInvitation { pub id: InvitationId, } -impl Message for AcceptInvitation { - type Result = WsResult; -} - -impl Handler for WebSocketActor { - type Result = WsResult; - - fn handle(&mut self, msg: AcceptInvitation, _ctx: &mut Self::Context) -> Self::Result { +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: AcceptInvitation, _ctx: &mut Self::Context) -> WsResult { self.require_user()?; let AcceptInvitation { id } = msg; let res = match block_on(self.db.send(invitations::AcceptInvitation { id })) { diff --git a/jirs-server/src/ws/issues.rs b/jirs-server/src/ws/issues.rs index 042b779b..3d33b711 100644 --- a/jirs-server/src/ws/issues.rs +++ b/jirs-server/src/ws/issues.rs @@ -1,15 +1,12 @@ use std::collections::HashMap; -use actix::*; -use actix_web::web::Data; use futures::executor::block_on; -use jirs_data::{IssueAssignee, IssueFieldId, PayloadVariant, WsMsg}; +use jirs_data::{CreateIssuePayload, IssueAssignee, IssueFieldId, IssueId, PayloadVariant, WsMsg}; use crate::db::issue_assignees::LoadAssignees; use crate::db::issues::{LoadProjectIssues, UpdateIssue}; -use crate::db::DbExecutor; -use crate::ws::{current_user, WebSocketActor, WsResult}; +use crate::ws::{WebSocketActor, WsHandler, WsResult}; pub struct UpdateIssueHandler { pub id: i32, @@ -17,18 +14,8 @@ pub struct UpdateIssueHandler { pub payload: PayloadVariant, } -impl Message for UpdateIssueHandler { - type Result = WsResult; -} - -impl Actor for UpdateIssueHandler { - type Context = Context; -} - -impl Handler for WebSocketActor { - type Result = WsResult; - - fn handle(&mut self, msg: UpdateIssueHandler, _ctx: &mut Self::Context) -> Self::Result { +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: UpdateIssueHandler, _ctx: &mut Self::Context) -> WsResult { self.require_user()?; let UpdateIssueHandler { @@ -90,145 +77,87 @@ impl Handler for WebSocketActor { for assignee in assignees { issue.user_ids.push(assignee.user_id); } + self.broadcast(&WsMsg::IssueUpdated(issue)); - Ok(Some(WsMsg::IssueUpdated(issue))) + Ok(None) } } -pub async fn update_issue( - db: &Data>, - user: &Option, - issue_id: i32, - issue_field_id: IssueFieldId, - payload: PayloadVariant, -) -> WsResult { - current_user(user)?; - - let mut msg = UpdateIssue::default(); - msg.issue_id = issue_id; - match (issue_field_id, payload) { - (IssueFieldId::Type, PayloadVariant::IssueType(t)) => { - msg.issue_type = Some(t); - } - (IssueFieldId::Title, PayloadVariant::String(s)) => { - msg.title = Some(s); - } - (IssueFieldId::Description, PayloadVariant::String(s)) => { - msg.description = Some(s); - } - (IssueFieldId::Status, PayloadVariant::IssueStatus(s)) => { - msg.status = Some(s); - } - (IssueFieldId::ListPosition, PayloadVariant::I32(i)) => { - msg.list_position = Some(i); - } - (IssueFieldId::Assignees, PayloadVariant::VecI32(v)) => { - msg.user_ids = Some(v); - } - (IssueFieldId::Reporter, PayloadVariant::I32(i)) => { - msg.reporter_id = Some(i); - } - (IssueFieldId::Priority, PayloadVariant::IssuePriority(p)) => { - msg.priority = Some(p); - } - (IssueFieldId::Estimate, PayloadVariant::OptionI32(o)) => { - msg.estimate = o; - } - (IssueFieldId::TimeSpend, PayloadVariant::OptionI32(o)) => { - msg.time_spent = o; - } - (IssueFieldId::TimeRemaining, PayloadVariant::OptionI32(o)) => { - msg.time_remaining = o; - } - _ => (), - }; - - let mut issue: jirs_data::Issue = match db.send(msg).await { - Ok(Ok(issue)) => issue.into(), - _ => return Ok(None), - }; - - let assignees = match db.send(LoadAssignees { issue_id: issue.id }).await { - Ok(Ok(v)) => v, - _ => vec![], - }; - for assignee in assignees { - issue.user_ids.push(assignee.user_id); - } - - Ok(Some(WsMsg::IssueUpdated(issue))) -} - -pub async fn add_issue( - db: &Data>, - user: &Option, - payload: jirs_data::CreateIssuePayload, -) -> WsResult { - current_user(user)?; - let msg = crate::db::issues::CreateIssue { - title: payload.title, - issue_type: payload.issue_type, - status: payload.status, - priority: payload.priority, - description: payload.description, - description_text: payload.description_text, - estimate: payload.estimate, - time_spent: payload.time_spent, - time_remaining: payload.time_remaining, - project_id: payload.project_id, - reporter_id: payload.reporter_id, - user_ids: payload.user_ids, - }; - let m = match db.send(msg).await { - Ok(Ok(issue)) => Some(WsMsg::IssueCreated(issue.into())), - _ => None, - }; - Ok(m) -} - -pub async fn delete_issue( - db: &Data>, - user: &Option, - id: i32, -) -> WsResult { - current_user(user)?; - let m = match db - .send(crate::db::issues::DeleteIssue { issue_id: id }) - .await - { - Ok(Ok(_)) => Some(WsMsg::IssueDeleted(id)), - _ => None, - }; - Ok(m) -} - -pub async fn load_issues(db: &Data>, user: &Option) -> WsResult { - let project_id = current_user(user).map(|u| u.project_id)?; - - let issues: Vec = match db.send(LoadProjectIssues { project_id }).await { - Ok(Ok(v)) => v.into_iter().map(|i| i.into()).collect(), - _ => return Ok(None), - }; - let mut issue_map = HashMap::new(); - let mut queue = vec![]; - for issue in issues.into_iter() { - let f = db.send(LoadAssignees { issue_id: issue.id }); - queue.push(f); - issue_map.insert(issue.id, issue); - } - for f in queue { - if let Ok(Ok(assignees)) = f.await { - for assignee in assignees { - if let Some(issue) = issue_map.get_mut(&assignee.issue_id) { - issue.user_ids.push(assignee.user_id); - } - } +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: CreateIssuePayload, _ctx: &mut Self::Context) -> WsResult { + self.require_user()?; + let msg = crate::db::issues::CreateIssue { + title: msg.title, + issue_type: msg.issue_type, + status: msg.status, + priority: msg.priority, + description: msg.description, + description_text: msg.description_text, + estimate: msg.estimate, + time_spent: msg.time_spent, + time_remaining: msg.time_remaining, + project_id: msg.project_id, + reporter_id: msg.reporter_id, + user_ids: msg.user_ids, }; + let m = match block_on(self.db.send(msg)) { + Ok(Ok(issue)) => Some(WsMsg::IssueCreated(issue.into())), + _ => None, + }; + Ok(m) + } +} + +pub struct DeleteIssue { + pub id: IssueId, +} + +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: DeleteIssue, _ctx: &mut Self::Context) -> WsResult { + self.require_user()?; + let m = match block_on( + self.db + .send(crate::db::issues::DeleteIssue { issue_id: msg.id }), + ) { + Ok(Ok(_)) => Some(WsMsg::IssueDeleted(msg.id)), + _ => None, + }; + Ok(m) + } +} + +pub struct LoadIssues; + +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, _msg: LoadIssues, _ctx: &mut Self::Context) -> WsResult { + let project_id = self.require_user()?.project_id; + + let issues: Vec = + match block_on(self.db.send(LoadProjectIssues { project_id })) { + Ok(Ok(v)) => v.into_iter().map(|i| i.into()).collect(), + _ => return Ok(None), + }; + let mut issue_map = HashMap::new(); + let mut queue = vec![]; + for issue in issues.into_iter() { + let f = self.db.send(LoadAssignees { issue_id: issue.id }); + queue.push(f); + issue_map.insert(issue.id, issue); + } + for f in queue { + if let Ok(Ok(assignees)) = block_on(f) { + for assignee in assignees { + if let Some(issue) = issue_map.get_mut(&assignee.issue_id) { + issue.user_ids.push(assignee.user_id); + } + } + }; + } + let mut issues = vec![]; + for (_, issue) in issue_map.into_iter() { + issues.push(issue); + } + + Ok(Some(WsMsg::ProjectIssuesLoaded(issues))) } - let mut issues = vec![]; - for (_, issue) in issue_map.into_iter() { - issues.push(issue); - } - - Ok(Some(WsMsg::ProjectIssuesLoaded(issues))) } diff --git a/jirs-server/src/ws/mod.rs b/jirs-server/src/ws/mod.rs index 297a36a7..0fa5490c 100644 --- a/jirs-server/src/ws/mod.rs +++ b/jirs-server/src/ws/mod.rs @@ -1,20 +1,18 @@ use std::collections::{HashMap, HashSet}; -use actix::{ - Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, Recipient, StreamHandler, -}; +use actix::{Actor, ActorContext, Addr, Context, Handler, Message, Recipient, StreamHandler}; use actix_web::web::Data; use actix_web::{get, web, Error, HttpRequest, HttpResponse}; use actix_web_actors::ws; use futures::executor::block_on; -use futures::SinkExt; -use jirs_data::{ProjectId, Token, UserId, WsMsg}; +use jirs_data::{ProjectId, UserId, WsMsg}; -use crate::db::authorize_user::AuthorizeUser; -use crate::db::tokens::FindBindToken; use crate::db::DbExecutor; use crate::mail::MailExecutor; +use crate::ws::auth::{Authenticate, CheckAuthToken, CheckBindToken}; +use crate::ws::invitations::*; +use crate::ws::issues::UpdateIssueHandler; pub mod auth; pub mod comments; @@ -88,83 +86,69 @@ impl WebSocketActor { WsMsg::Pong => Some(WsMsg::Ping), // Issues - WsMsg::IssueUpdateRequest(id, field_id, payload) => match block_on( - issues::update_issue(&self.db, &self.current_user, id, field_id, payload), - ) { - Ok(Some(msg)) => { - self.broadcast(&msg); - None - } - _ => None, - }, - WsMsg::IssueCreateRequest(payload) => { - block_on(issues::add_issue(&self.db, &self.current_user, payload))? - } - WsMsg::IssueDeleteRequest(id) => { - block_on(issues::delete_issue(&self.db, &self.current_user, id))? - } - WsMsg::ProjectIssuesRequest => { - block_on(issues::load_issues(&self.db, &self.current_user))? - } + WsMsg::IssueUpdateRequest(id, field_id, payload) => self.handle_msg( + UpdateIssueHandler { + id, + field_id, + payload, + }, + ctx, + )?, + WsMsg::IssueCreateRequest(payload) => self.handle_msg(payload, ctx)?, + WsMsg::IssueDeleteRequest(id) => self.handle_msg(issues::DeleteIssue { id }, ctx)?, + WsMsg::ProjectIssuesRequest => self.handle_msg(issues::LoadIssues, ctx)?, // projects - WsMsg::ProjectRequest => { - block_on(projects::current_project(&self.db, &self.current_user))? - } - - WsMsg::ProjectUpdateRequest(payload) => block_on(projects::update_project( - &self.db, - &self.current_user, - payload, - ))?, + WsMsg::ProjectRequest => self.handle_msg(projects::CurrentProject, ctx)?, + WsMsg::ProjectUpdateRequest(payload) => self.handle_msg(payload, ctx)?, // auth - WsMsg::AuthorizeRequest(uuid) => block_on(self.check_auth_token(uuid, ctx))?, - WsMsg::BindTokenCheck(uuid) => block_on(self.check_bind_token(uuid))?, + WsMsg::AuthorizeRequest(uuid) => { + self.handle_msg(CheckAuthToken { token: uuid }, ctx)? + } + WsMsg::BindTokenCheck(uuid) => { + self.handle_msg(CheckBindToken { bind_token: uuid }, ctx)? + } WsMsg::AuthenticateRequest(email, name) => { - block_on(auth::authenticate(&self.db, &self.mail, name, email))? + self.handle_msg(Authenticate { name, email }, ctx)? } // register - WsMsg::SignUpRequest(email, username) => { - block_on(users::register(&self.db, &self.mail, username, email))? - } + WsMsg::SignUpRequest(email, username) => self.handle_msg( + users::Register { + name: username, + email, + }, + ctx, + )?, // users - WsMsg::ProjectUsersRequest => { - block_on(users::load_project_users(&self.db, &self.current_user))? - } + WsMsg::ProjectUsersRequest => self.handle_msg(users::LoadProjectUsers, ctx)?, // comments - WsMsg::IssueCommentsRequest(issue_id) => block_on(comments::load_issues( - &self.db, - &self.current_user, - issue_id, - ))?, + WsMsg::IssueCommentsRequest(issue_id) => { + self.handle_msg(comments::LoadIssueComments { issue_id }, ctx)? + } + WsMsg::CreateComment(payload) => self.handle_msg(payload, ctx)?, + WsMsg::UpdateComment(payload) => self.handle_msg(payload, ctx)?, + WsMsg::CommentDeleteRequest(comment_id) => { + self.handle_msg(comments::DeleteComment { comment_id }, ctx)? + } - WsMsg::CreateComment(payload) => block_on(comments::create_comment( - &self.db, - &self.current_user, - payload, - ))?, + // invitations + WsMsg::InvitationSendRequest { name, email } => self.handle_msg( + CreateInvitation { + name: name.clone(), + email: email.clone(), + }, + ctx, + )?, + WsMsg::InvitationListRequest => self.handle_msg(ListInvitation, ctx)?, + WsMsg::InvitationAcceptRequest(id) => self.handle_msg(AcceptInvitation { id }, ctx)?, - WsMsg::UpdateComment(payload) => match block_on(comments::update_comment( - &self.db, - &self.current_user, - payload, - )) { - Ok(Some(msg)) => { - self.broadcast(&msg); - None - } - _ => None, - }, + WsMsg::InvitationRevokeRequest(id) => self.handle_msg(RevokeInvitation { id }, ctx)?, - WsMsg::CommentDeleteRequest(comment_id) => block_on(comments::delete_comment( - &self.db, - &self.current_user, - comment_id, - ))?, + WsMsg::InvitedUsersRequest => None, // else fail _ => { @@ -178,41 +162,6 @@ impl WebSocketActor { Ok(msg) } - async fn check_auth_token( - &mut self, - token: uuid::Uuid, - ctx: &mut ::Context, - ) -> WsResult { - let m = match self - .db - .send(AuthorizeUser { - access_token: token, - }) - .await - { - Ok(Ok(u)) => { - let user: jirs_data::User = u.into(); - self.current_user = Some(user.clone()); - self.join_channel(ctx.address().recipient()).await; - Some(WsMsg::AuthorizeLoaded(Ok(user))) - } - Ok(Err(_)) => Some(WsMsg::AuthorizeLoaded( - Err("Invalid auth token".to_string()), - )), - _ => Some(WsMsg::AuthorizeExpired), - }; - Ok(m) - } - - async fn check_bind_token(&mut self, bind_token: uuid::Uuid) -> WsResult { - let token: Token = match self.db.send(FindBindToken { token: bind_token }).await { - Ok(Ok(token)) => token, - Ok(Err(_)) => return Ok(Some(WsMsg::BindTokenBad)), - _ => return Ok(None), - }; - Ok(Some(WsMsg::BindTokenOk(token.access_token))) - } - async fn join_channel(&self, addr: Recipient) { info!("joining channel..."); info!(" current user {:?}", self.current_user); @@ -267,35 +216,11 @@ impl StreamHandler> for WebSocketActor { } } -impl WebSocketActor { - fn try_handle_message( - &mut self, - msg: WsMsg, - _ctx: &mut ::Context, - ) -> WsResult - where - Self: Actor, - { - match msg { - WsMsg::InvitationSendRequest { name, email } => { - use invitations::*; - - let m = CreateInvitation { - name: name.clone(), - email: email.clone(), - }; - // Handler::handle(&mut self, m, _ctx); - Ok(None) - // Handler::::handle(&mut self, m, _ctx) - // >.handle(m, ctx) - } - // WsMsg::InvitationListRequest => self.handle(ListInvitation, ctx), - // WsMsg::InvitationAcceptRequest(id) => Ok(None), - // WsMsg::InvitationRevokeRequest(id) => self.handle(RevokeInvitation { id: *id }, ctx), - // WsMsg::InvitedUsersRequest => Ok(None), - _ => Ok(None), - } - } +pub trait WsHandler +where + Self: Actor, +{ + fn handle_msg(&mut self, msg: Message, _ctx: &mut ::Context) -> WsResult; } #[derive(Message, Debug)] diff --git a/jirs-server/src/ws/projects.rs b/jirs-server/src/ws/projects.rs index f579bf6b..203a8f58 100644 --- a/jirs-server/src/ws/projects.rs +++ b/jirs-server/src/ws/projects.rs @@ -1,50 +1,44 @@ -use actix::Addr; -use actix_web::web::Data; +use futures::executor::block_on; use jirs_data::{UpdateProjectPayload, WsMsg}; use crate::db::projects::LoadCurrentProject; -use crate::db::DbExecutor; -use crate::ws::{current_user, WsResult}; +use crate::ws::{WebSocketActor, WsHandler, WsResult}; -pub async fn current_project( - db: &Data>, - user: &Option, -) -> WsResult { - let project_id = current_user(user).map(|u| u.project_id)?; +pub struct CurrentProject; - let m = match db.send(LoadCurrentProject { project_id }).await { - Ok(Ok(project)) => Some(WsMsg::ProjectLoaded(project.into())), - Ok(Err(e)) => { - error!("{:?}", e); - None - } - Err(e) => { - error!("{:?}", e); - None - } - }; - Ok(m) +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, _msg: CurrentProject, _ctx: &mut Self::Context) -> WsResult { + let project_id = self.require_user()?.project_id; + + let m = match block_on(self.db.send(LoadCurrentProject { project_id })) { + Ok(Ok(project)) => Some(WsMsg::ProjectLoaded(project.into())), + Ok(Err(e)) => { + error!("{:?}", e); + None + } + Err(e) => { + error!("{:?}", e); + None + } + }; + Ok(m) + } } -pub async fn update_project( - db: &Data>, - user: &Option, - payload: UpdateProjectPayload, -) -> WsResult { - let project_id = current_user(user).map(|u| u.project_id)?; - let project = match db - .send(crate::db::projects::UpdateProject { +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: UpdateProjectPayload, _ctx: &mut Self::Context) -> WsResult { + let project_id = self.require_user()?.project_id; + let project = match block_on(self.db.send(crate::db::projects::UpdateProject { project_id, - name: payload.name, - url: payload.url, - description: payload.description, - category: payload.category, - }) - .await - { - Ok(Ok(project)) => project, - _ => return Ok(None), - }; - Ok(Some(WsMsg::ProjectLoaded(project.into()))) + name: msg.name, + url: msg.url, + description: msg.description, + category: msg.category, + })) { + Ok(Ok(project)) => project, + _ => return Ok(None), + }; + Ok(Some(WsMsg::ProjectLoaded(project.into()))) + } } diff --git a/jirs-server/src/ws/users.rs b/jirs-server/src/ws/users.rs index ade94a9c..927ba4bb 100644 --- a/jirs-server/src/ws/users.rs +++ b/jirs-server/src/ws/users.rs @@ -1,50 +1,50 @@ -use actix::Addr; -use actix_web::web::Data; +use futures::executor::block_on; use jirs_data::WsMsg; -use crate::db::users::{LoadProjectUsers, Register}; -use crate::db::DbExecutor; -use crate::mail::MailExecutor; -use crate::ws::auth::authenticate; -use crate::ws::{current_user, WsResult}; +use crate::db::users::Register as DbRegister; +use crate::ws::auth::Authenticate; +use crate::ws::{current_user, WebSocketActor, WsHandler, WsResult}; -pub async fn load_project_users( - db: &Data>, - user: &Option, -) -> WsResult { - let project_id = current_user(user).map(|u| u.project_id)?; - let m = match db.send(LoadProjectUsers { project_id }).await { - Ok(Ok(v)) => Some(WsMsg::ProjectUsersLoaded( - v.into_iter().map(|i| i.into()).collect(), - )), - _ => None, - }; - Ok(m) +pub struct LoadProjectUsers; + +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, _msg: LoadProjectUsers, _ctx: &mut Self::Context) -> WsResult { + use crate::db::users::LoadProjectUsers as Msg; + + let project_id = current_user(&self.current_user).map(|u| u.project_id)?; + let m = match block_on(self.db.send(Msg { project_id })) { + Ok(Ok(v)) => Some(WsMsg::ProjectUsersLoaded( + v.into_iter().map(|i| i.into()).collect(), + )), + _ => None, + }; + Ok(m) + } } -pub async fn register( - db: &Data>, - mail: &Data>, - name: String, - email: String, -) -> WsResult { - let msg = match db - .send(Register { +pub struct Register { + pub name: String, + pub email: String, +} + +impl WsHandler for WebSocketActor { + fn handle_msg(&mut self, msg: Register, ctx: &mut Self::Context) -> WsResult { + let Register { name, email } = msg; + let msg = match block_on(self.db.send(DbRegister { name: name.clone(), email: email.clone(), - }) - .await - { - Ok(Ok(_)) => Some(WsMsg::SignUpSuccess), - Ok(Err(_)) => Some(WsMsg::SignUpPairTaken), - _ => None, - }; + })) { + Ok(Ok(_)) => Some(WsMsg::SignUpSuccess), + Ok(Err(_)) => Some(WsMsg::SignUpPairTaken), + _ => None, + }; - match authenticate(db, mail, name, email).await { - Ok(_) => (), - Err(e) => return Ok(Some(e)), - }; + match self.handle_msg(Authenticate { name, email }, ctx) { + Ok(_) => (), + Err(e) => return Ok(Some(e)), + }; - Ok(msg) + Ok(msg) + } }