bitque/jirs-server/src/ws/mod.rs

271 lines
8.5 KiB
Rust
Raw Normal View History

2020-04-11 09:20:40 +02:00
use std::collections::HashMap;
2020-04-06 08:38:08 +02:00
use actix::{Actor, Addr, StreamHandler};
use actix_web::web::Data;
2020-04-05 15:15:09 +02:00
use actix_web::{get, web, Error, HttpRequest, HttpResponse};
use actix_web_actors::ws;
2020-04-08 20:26:28 +02:00
use jirs_data::WsMsg;
2020-04-05 15:15:09 +02:00
2020-04-06 22:59:33 +02:00
use crate::db::authorize_user::AuthorizeUser;
2020-04-11 09:20:40 +02:00
use crate::db::issue_assignees::LoadAssignees;
2020-04-07 16:02:13 +02:00
use crate::db::issues::{LoadProjectIssues, UpdateIssue};
2020-04-06 08:38:08 +02:00
use crate::db::projects::LoadCurrentProject;
2020-04-06 22:59:33 +02:00
use crate::db::users::LoadProjectUsers;
2020-04-06 08:38:08 +02:00
use crate::db::DbExecutor;
2020-04-08 20:26:28 +02:00
/// Outcome of handling one websocket message.
///
/// `Ok(Some(msg))` carries a reply, `Ok(None)` means there is nothing to send,
/// and `Err(msg)` carries a message that is sent to the client instead of a
/// regular reply.
type WsResult = Result<Option<WsMsg>, WsMsg>;
2020-04-06 22:59:33 +02:00
trait WsMessageSender {
fn send_msg(&mut self, msg: jirs_data::WsMsg);
}
2020-04-06 08:38:08 +02:00
struct WebSocketActor {
db: Data<Addr<DbExecutor>>,
2020-04-06 22:59:33 +02:00
current_user: Option<jirs_data::User>,
2020-04-06 08:38:08 +02:00
}
impl Actor for WebSocketActor {
2020-04-06 22:59:33 +02:00
type Context = ws::WebsocketContext<WebSocketActor>;
2020-04-05 15:15:09 +02:00
}
2020-04-06 22:59:33 +02:00
impl WsMessageSender for ws::WebsocketContext<WebSocketActor> {
fn send_msg(&mut self, msg: WsMsg) {
self.binary(bincode::serialize(&msg).unwrap())
}
}
impl WebSocketActor {
2020-04-08 20:26:28 +02:00
fn handle_ws_msg(&mut self, msg: WsMsg) -> WsResult {
2020-04-06 08:38:08 +02:00
use futures::executor::block_on;
2020-04-08 20:26:28 +02:00
if msg != WsMsg::Ping && msg != WsMsg::Pong {
2020-04-10 22:33:07 +02:00
info!("incoming message: {:?}", msg);
2020-04-08 20:26:28 +02:00
}
let msg = match msg {
2020-04-06 22:59:33 +02:00
WsMsg::Ping => Some(WsMsg::Pong),
WsMsg::Pong => Some(WsMsg::Ping),
2020-04-08 20:26:28 +02:00
WsMsg::IssueUpdateRequest(id, payload) => block_on(self.update_issue(id, payload))?,
WsMsg::IssueCreateRequest(payload) => block_on(self.add_issue(payload))?,
WsMsg::IssueDeleteRequest(id) => block_on(self.delete_issue(id))?,
WsMsg::ProjectRequest => block_on(self.load_project())?,
WsMsg::AuthorizeRequest(uuid) => block_on(self.authorize(uuid))?,
WsMsg::ProjectIssuesRequest => block_on(self.load_issues())?,
WsMsg::ProjectUsersRequest => block_on(self.load_project_users())?,
2020-04-06 22:59:33 +02:00
_ => {
2020-04-09 08:56:12 +02:00
error!("No handle for {:?} specified", msg);
2020-04-06 22:59:33 +02:00
None
}
2020-04-08 20:26:28 +02:00
};
Ok(msg)
2020-04-06 22:59:33 +02:00
}
2020-04-08 20:26:28 +02:00
async fn authorize(&mut self, token: uuid::Uuid) -> WsResult {
let m = match self
2020-04-06 22:59:33 +02:00
.db
.send(AuthorizeUser {
access_token: token,
})
.await
{
2020-04-08 20:26:28 +02:00
Ok(Ok(u)) => {
let user: jirs_data::User = u.into();
self.current_user = Some(user.clone());
Some(WsMsg::AuthorizeLoaded(Ok(user)))
}
Ok(Err(_)) => Some(WsMsg::AuthorizeLoaded(
Err("Invalid auth token".to_string()),
)),
_ => Some(WsMsg::AuthorizeExpired),
};
Ok(m)
}
fn current_user(&mut self) -> Result<&jirs_data::User, WsMsg> {
self.current_user
.as_ref()
.map(|u| u)
.ok_or_else(|| WsMsg::AuthorizeExpired)
2020-04-06 22:59:33 +02:00
}
2020-04-08 20:26:28 +02:00
async fn load_project(&mut self) -> WsResult {
let project_id = self.current_user().map(|u| u.project_id)?;
2020-04-06 22:59:33 +02:00
match self.db.send(LoadCurrentProject { project_id }).await {
2020-04-08 20:26:28 +02:00
Ok(Ok(p)) => Ok(Some(WsMsg::ProjectLoaded(p.into()))),
_ => Ok(None),
2020-04-06 22:59:33 +02:00
}
}
2020-04-08 20:26:28 +02:00
async fn load_issues(&mut self) -> WsResult {
let project_id = self.current_user().map(|u| u.project_id)?;
2020-04-11 09:20:40 +02:00
let issues: Vec<jirs_data::Issue> =
match self.db.send(LoadProjectIssues { project_id }).await {
Ok(Ok(v)) => v.into_iter().map(|i| i.into()).collect(),
_ => return Ok(None),
};
let mut issue_map = HashMap::new();
let mut queue = vec![];
for issue in issues.into_iter() {
let f = self.db.send(LoadAssignees {
issue_id: issue.id.clone(),
});
queue.push(f);
issue_map.insert(issue.id.clone(), issue);
}
for f in queue {
match f.await {
Ok(Ok(assignees)) => {
for assignee in assignees {
if let Some(issue) = issue_map.get_mut(&assignee.issue_id) {
issue.user_ids.push(assignee.user_id);
}
}
}
_ => {}
};
2020-04-06 22:59:33 +02:00
}
2020-04-11 09:20:40 +02:00
let mut issues = vec![];
for (_, issue) in issue_map.into_iter() {
issues.push(issue);
}
Ok(Some(WsMsg::ProjectIssuesLoaded(issues)))
2020-04-06 22:59:33 +02:00
}
2020-04-08 20:26:28 +02:00
async fn load_project_users(&mut self) -> WsResult {
let project_id = self.current_user().map(|u| u.project_id)?;
let m = match self.db.send(LoadProjectUsers { project_id }).await {
2020-04-06 22:59:33 +02:00
Ok(Ok(v)) => Some(WsMsg::ProjectUsersLoaded(
v.into_iter().map(|i| i.into()).collect(),
)),
_ => None,
2020-04-08 20:26:28 +02:00
};
Ok(m)
2020-04-06 22:59:33 +02:00
}
2020-04-07 16:02:13 +02:00
async fn update_issue(
&mut self,
issue_id: i32,
payload: jirs_data::UpdateIssuePayload,
2020-04-08 20:26:28 +02:00
) -> WsResult {
self.current_user()?;
2020-04-11 09:20:40 +02:00
let mut issue: jirs_data::Issue = match self
2020-04-07 16:02:13 +02:00
.db
.send(UpdateIssue {
issue_id,
2020-04-10 08:09:40 +02:00
title: Some(payload.title),
issue_type: Some(payload.issue_type),
status: Some(payload.status),
priority: Some(payload.priority),
list_position: Some(payload.list_position),
description: Some(payload.description),
description_text: Some(payload.description_text),
estimate: Some(payload.estimate),
time_spent: Some(payload.time_spent),
time_remaining: Some(payload.time_remaining),
project_id: Some(payload.project_id),
user_ids: Some(payload.user_ids),
2020-04-11 09:20:40 +02:00
reporter_id: Some(payload.reporter_id),
2020-04-07 16:02:13 +02:00
})
.await
{
2020-04-11 09:20:40 +02:00
Ok(Ok(issue)) => issue.into(),
_ => return Ok(None),
2020-04-08 20:26:28 +02:00
};
2020-04-11 09:20:40 +02:00
let assignees = match self
.db
.send(LoadAssignees {
issue_id: issue.id.clone(),
})
.await
{
Ok(Ok(v)) => v,
_ => vec![],
};
for assignee in assignees {
issue.user_ids.push(assignee.user_id);
}
Ok(Some(WsMsg::IssueUpdated(issue.into())))
2020-04-08 20:26:28 +02:00
}
/// Creates a new issue from `payload` and replies with `IssueCreated`.
///
/// Yields no reply when the database request fails. Returns the
/// authorization error when no user is stored.
async fn add_issue(&mut self, payload: jirs_data::CreateIssuePayload) -> WsResult {
    self.current_user()?;

    let create = crate::db::issues::CreateIssue {
        title: payload.title,
        issue_type: payload.issue_type,
        status: payload.status,
        priority: payload.priority,
        description: payload.description,
        description_text: payload.description_text,
        estimate: payload.estimate,
        time_spent: payload.time_spent,
        time_remaining: payload.time_remaining,
        project_id: payload.project_id,
        reporter_id: payload.reporter_id,
        user_ids: payload.user_ids,
    };

    match self.db.send(create).await {
        Ok(Ok(issue)) => Ok(Some(WsMsg::IssueCreated(issue.into()))),
        _ => Ok(None),
    }
}
/// Deletes the issue `id` and replies with `IssueDeleted(id)`.
///
/// Yields no reply when the database request fails. Returns the
/// authorization error when no user is stored.
async fn delete_issue(&mut self, id: i32) -> WsResult {
    self.current_user()?;

    let request = crate::db::issues::DeleteIssue { issue_id: id };
    if let Ok(Ok(_)) = self.db.send(request).await {
        return Ok(Some(WsMsg::IssueDeleted(id)));
    }
    Ok(None)
}
2020-04-06 22:59:33 +02:00
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketActor {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
2020-04-05 15:15:09 +02:00
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => ctx.text(text),
2020-04-06 22:59:33 +02:00
2020-04-06 08:38:08 +02:00
Ok(ws::Message::Binary(bin)) => {
let ws_msg: bincode::Result<jirs_data::WsMsg> =
bincode::deserialize(bin.to_vec().as_slice());
2020-04-06 22:59:33 +02:00
let msg = match ws_msg {
2020-04-08 20:26:28 +02:00
Ok(m) => m,
2020-04-06 22:59:33 +02:00
_ => return,
};
2020-04-08 20:26:28 +02:00
match self.handle_ws_msg(msg) {
Ok(Some(msg)) => ctx.send_msg(msg),
Err(e) => ctx.send_msg(e),
_ => (),
2020-04-06 08:38:08 +02:00
};
}
2020-04-05 15:15:09 +02:00
_ => (),
}
}
}
#[get("/ws/")]
2020-04-06 08:38:08 +02:00
pub async fn index(
req: HttpRequest,
stream: web::Payload,
db: Data<Addr<DbExecutor>>,
) -> Result<HttpResponse, Error> {
2020-04-06 22:59:33 +02:00
ws::start(
WebSocketActor {
db,
current_user: None,
},
&req,
stream,
)
2020-04-05 15:15:09 +02:00
}