bitque/jirs-server/src/ws/mod.rs

372 lines
11 KiB
Rust
Raw Normal View History

2020-04-19 10:57:09 +02:00
use std::collections::{HashMap, HashSet};
use actix::{
Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, Recipient, StreamHandler,
};
2020-04-06 08:38:08 +02:00
use actix_web::web::Data;
2020-04-05 15:15:09 +02:00
use actix_web::{get, web, Error, HttpRequest, HttpResponse};
use actix_web_actors::ws;
2020-04-19 10:57:09 +02:00
use futures::executor::block_on;
2020-04-05 15:15:09 +02:00
2020-04-21 09:19:15 +02:00
use jirs_data::{ProjectId, Token, UserId, WsMsg};
2020-04-05 15:15:09 +02:00
2020-04-06 22:59:33 +02:00
use crate::db::authorize_user::AuthorizeUser;
2020-04-16 16:11:19 +02:00
use crate::db::tokens::FindBindToken;
2020-04-06 08:38:08 +02:00
use crate::db::DbExecutor;
use crate::mail::MailExecutor;
2020-04-06 08:38:08 +02:00
2020-04-16 16:11:19 +02:00
pub mod auth;
2020-04-11 11:18:41 +02:00
pub mod comments;
2020-04-21 11:49:45 +02:00
pub mod invitations;
2020-04-11 11:18:41 +02:00
pub mod issues;
pub mod projects;
pub mod users;
pub type WsResult = std::result::Result<Option<WsMsg>, WsMsg>;
pub fn current_user(current_user: &Option<jirs_data::User>) -> Result<&jirs_data::User, WsMsg> {
current_user
.as_ref()
.map(|u| u)
.ok_or_else(|| WsMsg::AuthorizeExpired)
}
2020-04-08 20:26:28 +02:00
2020-04-06 22:59:33 +02:00
trait WsMessageSender {
2020-04-19 10:57:09 +02:00
fn send_msg(&mut self, msg: &jirs_data::WsMsg);
2020-04-06 22:59:33 +02:00
}
2020-04-06 08:38:08 +02:00
struct WebSocketActor {
db: Data<Addr<DbExecutor>>,
mail: Data<Addr<MailExecutor>>,
2020-04-06 22:59:33 +02:00
current_user: Option<jirs_data::User>,
2020-04-19 10:57:09 +02:00
addr: Addr<WsServer>,
2020-04-06 08:38:08 +02:00
}
impl Actor for WebSocketActor {
2020-04-06 22:59:33 +02:00
type Context = ws::WebsocketContext<WebSocketActor>;
2020-04-05 15:15:09 +02:00
}
2020-04-06 22:59:33 +02:00
impl WsMessageSender for ws::WebsocketContext<WebSocketActor> {
2020-04-19 10:57:09 +02:00
fn send_msg(&mut self, msg: &WsMsg) {
self.binary(bincode::serialize(msg).unwrap())
}
}
impl Handler<InnerMsg> for WebSocketActor {
type Result = ();
fn handle(&mut self, msg: InnerMsg, ctx: &mut Self::Context) -> Self::Result {
match msg {
InnerMsg::Transfer(msg) => ctx.send_msg(&msg),
_ => {}
};
2020-04-06 22:59:33 +02:00
}
}
impl WebSocketActor {
2020-04-19 10:57:09 +02:00
fn broadcast(&self, msg: &WsMsg) {
let user = match self.current_user.as_ref() {
Some(u) => u,
_ => return,
};
self.addr
.do_send(InnerMsg::BroadcastToChannel(user.project_id, msg.clone()));
}
2020-04-06 08:38:08 +02:00
2020-04-19 10:57:09 +02:00
fn handle_ws_msg(
&mut self,
msg: WsMsg,
ctx: &mut <WebSocketActor as Actor>::Context,
) -> WsResult {
2020-04-08 20:26:28 +02:00
if msg != WsMsg::Ping && msg != WsMsg::Pong {
2020-04-19 10:57:09 +02:00
debug!("incoming message: {:?}", msg);
2020-04-08 20:26:28 +02:00
}
2020-04-19 10:57:09 +02:00
let msg = match msg {
WsMsg::Ping => Some(WsMsg::Pong),
WsMsg::Pong => Some(WsMsg::Ping),
2020-04-19 10:57:09 +02:00
// Issues
WsMsg::IssueUpdateRequest(id, field_id, payload) => match block_on(
issues::update_issue(&self.db, &self.current_user, id, field_id, payload),
) {
Ok(Some(msg)) => {
self.broadcast(&msg);
None
}
2020-04-19 10:57:09 +02:00
_ => None,
},
WsMsg::IssueCreateRequest(payload) => {
block_on(issues::add_issue(&self.db, &self.current_user, payload))?
}
WsMsg::IssueDeleteRequest(id) => {
block_on(issues::delete_issue(&self.db, &self.current_user, id))?
}
WsMsg::ProjectIssuesRequest => {
block_on(issues::load_issues(&self.db, &self.current_user))?
}
2020-04-19 10:57:09 +02:00
// projects
WsMsg::ProjectRequest => {
block_on(projects::current_project(&self.db, &self.current_user))?
}
2020-04-19 10:57:09 +02:00
WsMsg::ProjectUpdateRequest(payload) => block_on(projects::update_project(
&self.db,
&self.current_user,
payload,
))?,
2020-04-17 20:44:55 +02:00
2020-04-19 10:57:09 +02:00
// auth
WsMsg::AuthorizeRequest(uuid) => block_on(self.check_auth_token(uuid, ctx))?,
WsMsg::BindTokenCheck(uuid) => block_on(self.check_bind_token(uuid))?,
WsMsg::AuthenticateRequest(email, name) => {
block_on(auth::authenticate(&self.db, &self.mail, name, email))?
}
// register
WsMsg::SignUpRequest(email, username) => {
block_on(users::register(&self.db, &self.mail, username, email))?
}
// users
WsMsg::ProjectUsersRequest => {
block_on(users::load_project_users(&self.db, &self.current_user))?
}
// comments
WsMsg::IssueCommentsRequest(issue_id) => block_on(comments::load_issues(
&self.db,
&self.current_user,
issue_id,
))?,
WsMsg::CreateComment(payload) => block_on(comments::create_comment(
&self.db,
&self.current_user,
payload,
))?,
2020-04-19 10:57:09 +02:00
WsMsg::UpdateComment(payload) => match block_on(comments::update_comment(
&self.db,
&self.current_user,
payload,
)) {
Ok(Some(msg)) => {
self.broadcast(&msg);
None
}
2020-04-19 10:57:09 +02:00
_ => None,
},
WsMsg::CommentDeleteRequest(comment_id) => block_on(comments::delete_comment(
&self.db,
&self.current_user,
comment_id,
))?,
// else fail
_ => {
error!("No handle for {:?} specified", msg);
None
}
};
2020-04-14 16:20:05 +02:00
if msg.is_some() && msg != Some(WsMsg::Pong) {
info!("sending message {:?}", msg);
}
2020-04-08 20:26:28 +02:00
Ok(msg)
2020-04-06 22:59:33 +02:00
}
2020-04-19 10:57:09 +02:00
async fn check_auth_token(
&mut self,
token: uuid::Uuid,
ctx: &mut <WebSocketActor as Actor>::Context,
) -> WsResult {
2020-04-08 20:26:28 +02:00
let m = match self
2020-04-06 22:59:33 +02:00
.db
.send(AuthorizeUser {
access_token: token,
})
.await
{
2020-04-08 20:26:28 +02:00
Ok(Ok(u)) => {
let user: jirs_data::User = u.into();
self.current_user = Some(user.clone());
2020-04-19 10:57:09 +02:00
self.join_channel(ctx.address().recipient()).await;
2020-04-08 20:26:28 +02:00
Some(WsMsg::AuthorizeLoaded(Ok(user)))
}
Ok(Err(_)) => Some(WsMsg::AuthorizeLoaded(
Err("Invalid auth token".to_string()),
)),
_ => Some(WsMsg::AuthorizeExpired),
};
Ok(m)
}
2020-04-16 16:11:19 +02:00
async fn check_bind_token(&mut self, bind_token: uuid::Uuid) -> WsResult {
2020-04-21 09:19:15 +02:00
let token: Token = match self.db.send(FindBindToken { token: bind_token }).await {
Ok(Ok(token)) => token,
Ok(Err(_)) => return Ok(Some(WsMsg::BindTokenBad)),
_ => return Ok(None),
};
2020-04-16 16:11:19 +02:00
Ok(Some(WsMsg::BindTokenOk(token.access_token)))
}
2020-04-19 10:57:09 +02:00
async fn join_channel(&self, addr: Recipient<InnerMsg>) {
info!("joining channel...");
info!(" current user {:?}", self.current_user);
let user = match self.current_user.as_ref() {
None => return,
Some(u) => u,
};
match self
.addr
.send(InnerMsg::Join(user.project_id, user.id, addr))
.await
{
Err(e) => error!("{}", e),
_ => info!(" joined channel"),
};
}
2020-04-19 22:31:01 +02:00
fn require_user(&self) -> Result<&jirs_data::User, WsMsg> {
current_user(&self.current_user)
}
2020-04-06 22:59:33 +02:00
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketActor {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
2020-04-05 15:15:09 +02:00
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => ctx.text(text),
2020-04-06 22:59:33 +02:00
2020-04-06 08:38:08 +02:00
Ok(ws::Message::Binary(bin)) => {
let ws_msg: bincode::Result<jirs_data::WsMsg> =
bincode::deserialize(bin.to_vec().as_slice());
2020-04-06 22:59:33 +02:00
let msg = match ws_msg {
2020-04-08 20:26:28 +02:00
Ok(m) => m,
2020-04-06 22:59:33 +02:00
_ => return,
};
2020-04-19 10:57:09 +02:00
match self.handle_ws_msg(msg, ctx) {
Ok(Some(msg)) => ctx.send_msg(&msg),
Err(e) => ctx.send_msg(&e),
2020-04-08 20:26:28 +02:00
_ => (),
2020-04-06 08:38:08 +02:00
};
}
2020-04-05 15:15:09 +02:00
_ => (),
}
}
2020-04-19 10:57:09 +02:00
fn finished(&mut self, ctx: &mut Self::Context) {
info!("Disconnected");
if let Some(user) = self.current_user.as_ref() {
self.addr.do_send(InnerMsg::Leave(user.project_id, user.id));
}
ctx.stop()
}
}
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub enum InnerMsg {
Join(ProjectId, UserId, Recipient<InnerMsg>),
Leave(ProjectId, UserId),
BroadcastToChannel(ProjectId, WsMsg),
Transfer(WsMsg),
}
pub struct WsServer {
sessions: HashMap<i32, Recipient<InnerMsg>>,
rooms: HashMap<i32, HashSet<i32>>,
}
impl Default for WsServer {
fn default() -> Self {
Self {
sessions: HashMap::new(),
rooms: HashMap::new(),
}
}
}
impl Message for WsServer {
type Result = ();
}
impl Actor for WsServer {
type Context = Context<Self>;
}
impl Handler<InnerMsg> for WsServer {
type Result = ();
fn handle(&mut self, msg: InnerMsg, _ctx: &mut Self::Context) -> Self::Result {
debug!("receive {:?}", msg);
match msg {
InnerMsg::Join(project_id, user_id, recipient) => {
self.sessions.insert(user_id, recipient);
self.ensure_room(project_id);
if let Some(room) = self.rooms.get_mut(&project_id) {
room.insert(user_id);
}
}
InnerMsg::Leave(project_id, user_id) => {
self.ensure_room(project_id);
if let Some(room) = self.rooms.get_mut(&project_id) {
room.remove(&user_id);
}
self.sessions.remove(&user_id);
}
InnerMsg::BroadcastToChannel(project_id, msg) => {
debug!("Begin broadcast to channel {} msg {:?}", project_id, msg);
let set = match self.rooms.get(&project_id) {
Some(s) => s,
_ => return debug!(" channel not found, aborting..."),
};
for r in set {
let recipient = match self.sessions.get(r) {
Some(r) => r,
_ => {
debug!("recipient is dead, skipping...");
continue;
}
};
match recipient.do_send(InnerMsg::Transfer(msg.clone())) {
Ok(_) => debug!("msg sent"),
Err(e) => error!("{}", e),
}
}
}
_ => (),
}
}
}
impl WsServer {
pub fn ensure_room(&mut self, room: i32) {
if !self.rooms.contains_key(&room) {
self.rooms.insert(room, HashSet::new());
}
}
2020-04-05 15:15:09 +02:00
}
#[get("/ws/")]
2020-04-06 08:38:08 +02:00
pub async fn index(
req: HttpRequest,
stream: web::Payload,
db: Data<Addr<DbExecutor>>,
mail: Data<Addr<MailExecutor>>,
2020-04-19 10:57:09 +02:00
ws_server: Data<Addr<WsServer>>,
2020-04-06 08:38:08 +02:00
) -> Result<HttpResponse, Error> {
2020-04-06 22:59:33 +02:00
ws::start(
WebSocketActor {
db,
mail,
2020-04-06 22:59:33 +02:00
current_user: None,
2020-04-19 10:57:09 +02:00
addr: ws_server.get_ref().clone(),
2020-04-06 22:59:33 +02:00
},
&req,
stream,
)
2020-04-05 15:15:09 +02:00
}