bitque/actors/websocket-actor/src/lib.rs

349 lines
11 KiB
Rust
Raw Normal View History

#[macro_use]
extern crate log;
use {
crate::{
handlers::*,
server::{InnerMsg, WsServer},
},
actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Recipient, StreamHandler},
actix_web::{
get,
web::{self, Data},
Error, HttpRequest, HttpResponse,
},
actix_web_actors::ws,
database_actor::{projects::LoadCurrentProject, user_projects::CurrentUserProject, DbExecutor},
futures::executor::block_on,
jirs_data::{Project, User, UserProject, WsMsg},
log::*,
mail_actor::MailExecutor,
};
pub mod handlers;
pub mod prelude;
pub mod server;
2020-04-11 11:18:41 +02:00
pub type WsResult = std::result::Result<Option<WsMsg>, WsMsg>;
2020-04-06 22:59:33 +02:00
trait WsMessageSender {
2020-04-19 10:57:09 +02:00
fn send_msg(&mut self, msg: &jirs_data::WsMsg);
2020-04-06 22:59:33 +02:00
}
2020-04-06 08:38:08 +02:00
struct WebSocketActor {
db: Data<Addr<DbExecutor>>,
mail: Data<Addr<MailExecutor>>,
2020-04-19 10:57:09 +02:00
addr: Addr<WsServer>,
hi: Data<Addr<highlight_actor::HighlightActor>>,
2020-05-21 17:02:16 +02:00
current_user: Option<jirs_data::User>,
current_user_project: Option<jirs_data::UserProject>,
current_project: Option<jirs_data::Project>,
2020-04-06 08:38:08 +02:00
}
impl Actor for WebSocketActor {
2020-04-06 22:59:33 +02:00
type Context = ws::WebsocketContext<WebSocketActor>;
2020-04-05 15:15:09 +02:00
}
2020-04-06 22:59:33 +02:00
impl WsMessageSender for ws::WebsocketContext<WebSocketActor> {
2020-04-19 10:57:09 +02:00
fn send_msg(&mut self, msg: &WsMsg) {
self.binary(bincode::serialize(msg).unwrap())
}
}
impl Handler<InnerMsg> for WebSocketActor {
type Result = ();
2020-10-18 10:35:41 +02:00
fn handle(&mut self, msg: InnerMsg, ctx: &mut <Self as Actor>::Context) -> Self::Result {
if let InnerMsg::Transfer(msg) = msg {
ctx.send_msg(&msg)
2020-04-19 10:57:09 +02:00
};
2020-04-06 22:59:33 +02:00
}
}
impl WebSocketActor {
2020-04-19 10:57:09 +02:00
fn broadcast(&self, msg: &WsMsg) {
2020-05-21 17:02:16 +02:00
let project_id = match self.require_user_project() {
Ok(up) => up.project_id,
2020-04-19 10:57:09 +02:00
_ => return,
};
self.addr
2020-05-21 17:02:16 +02:00
.do_send(InnerMsg::BroadcastToChannel(project_id, msg.clone()));
2020-04-19 10:57:09 +02:00
}
2020-04-06 08:38:08 +02:00
2020-04-19 10:57:09 +02:00
fn handle_ws_msg(
&mut self,
msg: WsMsg,
ctx: &mut <WebSocketActor as Actor>::Context,
) -> WsResult {
2020-04-08 20:26:28 +02:00
if msg != WsMsg::Ping && msg != WsMsg::Pong {
2020-04-19 10:57:09 +02:00
debug!("incoming message: {:?}", msg);
2020-04-08 20:26:28 +02:00
}
2020-04-19 10:57:09 +02:00
let msg = match msg {
WsMsg::Ping => Some(WsMsg::Pong),
WsMsg::Pong => Some(WsMsg::Ping),
2020-05-06 22:24:58 +02:00
// issues
WsMsg::IssueUpdate(id, field_id, payload) => self.handle_msg(
2020-04-21 19:35:39 +02:00
UpdateIssueHandler {
id,
field_id,
payload,
},
ctx,
)?,
WsMsg::IssueCreate(payload) => self.handle_msg(payload, ctx)?,
WsMsg::IssueDelete(id) => self.handle_msg(DeleteIssue { id }, ctx)?,
WsMsg::IssueSyncListPosition(sync) => {
self.handle_msg(SyncIssueListPosition(sync), ctx)?
}
WsMsg::ProjectIssuesLoad => self.handle_msg(LoadIssues, ctx)?,
2020-05-06 22:24:58 +02:00
// issue statuses
WsMsg::IssueStatusesLoad => self.handle_msg(LoadIssueStatuses, ctx)?,
2020-05-07 17:08:40 +02:00
WsMsg::IssueStatusDelete(issue_status_id) => {
self.handle_msg(DeleteIssueStatus { issue_status_id }, ctx)?
}
WsMsg::IssueStatusUpdate(issue_status_id, name, position) => self.handle_msg(
UpdateIssueStatus {
issue_status_id,
name,
position,
},
ctx,
)?,
WsMsg::IssueStatusCreate(name, position) => {
self.handle_msg(CreateIssueStatus { name, position }, ctx)?
}
2020-05-06 22:24:58 +02:00
2020-04-19 10:57:09 +02:00
// projects
2020-05-21 21:38:46 +02:00
WsMsg::ProjectsLoad => self.handle_msg(LoadProjects, ctx)?,
WsMsg::ProjectUpdateLoad(payload) => self.handle_msg(payload, ctx)?,
2020-04-17 20:44:55 +02:00
2020-05-21 21:38:46 +02:00
// user projects
WsMsg::UserProjectsLoad => self.handle_msg(LoadUserProjects, ctx)?,
2020-05-28 10:30:07 +02:00
WsMsg::UserProjectSetCurrent(user_project_id) => self.handle_msg(
SetCurrentUserProject {
id: user_project_id,
},
ctx,
)?,
2020-05-21 21:38:46 +02:00
2020-04-19 10:57:09 +02:00
// auth
WsMsg::AuthorizeLoad(uuid) => self.handle_msg(CheckAuthToken { token: uuid }, ctx)?,
2020-04-21 19:35:39 +02:00
WsMsg::BindTokenCheck(uuid) => {
self.handle_msg(CheckBindToken { bind_token: uuid }, ctx)?
}
2020-04-19 10:57:09 +02:00
WsMsg::AuthenticateRequest(email, name) => {
2020-04-21 19:35:39 +02:00
self.handle_msg(Authenticate { name, email }, ctx)?
2020-04-19 10:57:09 +02:00
}
// register
2020-04-21 19:35:39 +02:00
WsMsg::SignUpRequest(email, username) => self.handle_msg(
Register {
2020-04-21 19:35:39 +02:00
name: username,
email,
},
ctx,
)?,
2020-04-19 10:57:09 +02:00
// users
WsMsg::ProjectUsersLoad => self.handle_msg(LoadProjectUsers, ctx)?,
2020-05-22 17:35:32 +02:00
WsMsg::InvitedUserRemoveRequest(user_id) => {
self.handle_msg(RemoveInvitedUser { user_id }, ctx)?
}
2020-04-19 10:57:09 +02:00
// comments
WsMsg::IssueCommentsLoad(issue_id) => {
self.handle_msg(LoadIssueComments { issue_id }, ctx)?
2020-04-21 19:35:39 +02:00
}
WsMsg::CommentCreate(payload) => self.handle_msg(payload, ctx)?,
WsMsg::CommentUpdate(payload) => self.handle_msg(payload, ctx)?,
WsMsg::CommentDelete(comment_id) => {
self.handle_msg(DeleteComment { comment_id }, ctx)?
2020-04-21 19:35:39 +02:00
}
// invitations
2020-05-21 21:38:46 +02:00
WsMsg::InvitationSendRequest { name, email, role } => {
self.handle_msg(CreateInvitation { name, email, role }, ctx)?
}
WsMsg::InvitationListLoad => self.handle_msg(ListInvitation, ctx)?,
2020-05-22 17:35:32 +02:00
WsMsg::InvitationAcceptRequest(invitation_token) => {
self.handle_msg(AcceptInvitation { invitation_token }, ctx)?
}
2020-04-21 19:35:39 +02:00
WsMsg::InvitationRevokeRequest(id) => self.handle_msg(RevokeInvitation { id }, ctx)?,
WsMsg::InvitedUsersLoad => self.handle_msg(LoadInvitedUsers, ctx)?,
2020-04-19 10:57:09 +02:00
2020-05-06 19:13:16 +02:00
// users
WsMsg::ProfileUpdate(email, name) => {
self.handle_msg(ProfileUpdate { email, name }, ctx)?
}
// messages
WsMsg::MessagesLoad => self.handle_msg(LoadMessages, ctx)?,
2020-05-29 21:14:07 +02:00
WsMsg::MessageMarkSeen(id) => self.handle_msg(MarkMessageSeen { id }, ctx)?,
// epics
WsMsg::EpicsLoad => self.handle_msg(epics::LoadEpics, ctx)?,
WsMsg::EpicCreate(name) => self.handle_msg(epics::CreateEpic { name }, ctx)?,
WsMsg::EpicUpdate(epic_id, name) => {
self.handle_msg(epics::UpdateEpic { epic_id, name }, ctx)?
}
WsMsg::EpicDelete(epic_id) => self.handle_msg(epics::DeleteEpic { epic_id }, ctx)?,
WsMsg::HighlightCode(lang, code) => {
self.handle_msg(hi::HighlightCode(lang, code), ctx)?
}
2020-04-19 10:57:09 +02:00
// else fail
_ => {
error!("No handle for {:?} specified", msg);
None
}
};
2020-04-14 16:20:05 +02:00
if msg.is_some() && msg != Some(WsMsg::Pong) {
info!("sending message {:?}", msg);
}
2020-04-08 20:26:28 +02:00
Ok(msg)
2020-04-06 22:59:33 +02:00
}
2020-04-19 10:57:09 +02:00
async fn join_channel(&self, addr: Recipient<InnerMsg>) {
info!("joining channel...");
info!(" current user {:?}", self.current_user);
2020-05-21 17:02:16 +02:00
2020-04-19 10:57:09 +02:00
let user = match self.current_user.as_ref() {
None => return,
Some(u) => u,
};
2020-05-21 17:02:16 +02:00
let project_id = match self.require_user_project() {
Ok(user_project) => user_project.project_id,
_ => return,
};
2020-04-19 10:57:09 +02:00
match self
.addr
2020-05-21 17:02:16 +02:00
.send(InnerMsg::Join(project_id, user.id, addr))
2020-04-19 10:57:09 +02:00
.await
{
Err(e) => error!("{}", e),
_ => info!(" joined channel"),
};
}
2020-05-21 17:02:16 +02:00
fn require_user(&self) -> Result<&User, WsMsg> {
2020-10-18 10:35:41 +02:00
self.current_user
.as_ref()
.map(|u| u)
.ok_or_else(|| WsMsg::AuthorizeExpired)
2020-04-19 22:31:01 +02:00
}
2020-05-21 17:02:16 +02:00
fn require_user_project(&self) -> Result<&UserProject, WsMsg> {
self.current_user_project
.as_ref()
.map(|u| u)
2020-10-21 23:59:17 +02:00
.ok_or_else(|| WsMsg::AuthorizeExpired)
2020-05-21 17:02:16 +02:00
}
fn load_user_project(&self) -> Result<UserProject, WsMsg> {
let user_id = self.require_user()?.id;
2020-05-21 17:02:16 +02:00
match block_on(self.db.send(CurrentUserProject { user_id })) {
Ok(Ok(user_project)) => Ok(user_project),
Ok(Err(e)) => {
2020-10-21 23:59:17 +02:00
error!("load_user_project encounter service error {:?}", e);
Err(WsMsg::AuthorizeExpired)
}
Err(e) => {
2020-10-21 23:59:17 +02:00
error!("load_user_project encounter mailbox error {}", e);
Err(WsMsg::AuthorizeExpired)
}
2020-05-21 17:02:16 +02:00
}
}
fn load_project(&self) -> Result<Project, WsMsg> {
let project_id = self.require_user_project()?.project_id;
match block_on(self.db.send(LoadCurrentProject { project_id })) {
Ok(Ok(project)) => Ok(project),
Ok(Err(e)) => {
error!("{:?}", e);
Err(WsMsg::AuthorizeExpired)
}
Err(e) => {
error!("{}", e);
Err(WsMsg::AuthorizeExpired)
}
2020-05-21 17:02:16 +02:00
}
}
2020-04-06 22:59:33 +02:00
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketActor {
2020-10-18 10:35:41 +02:00
fn handle(
&mut self,
msg: Result<ws::Message, ws::ProtocolError>,
ctx: &mut <Self as Actor>::Context,
) {
2020-04-05 15:15:09 +02:00
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => ctx.text(text),
2020-04-06 22:59:33 +02:00
2020-04-06 08:38:08 +02:00
Ok(ws::Message::Binary(bin)) => {
let ws_msg: bincode::Result<jirs_data::WsMsg> =
bincode::deserialize(bin.to_vec().as_slice());
2020-04-06 22:59:33 +02:00
let msg = match ws_msg {
2020-04-08 20:26:28 +02:00
Ok(m) => m,
2020-04-06 22:59:33 +02:00
_ => return,
};
2020-04-19 10:57:09 +02:00
match self.handle_ws_msg(msg, ctx) {
Ok(Some(msg)) => ctx.send_msg(&msg),
Err(e) => ctx.send_msg(&e),
2020-04-08 20:26:28 +02:00
_ => (),
2020-04-06 08:38:08 +02:00
};
}
2020-04-05 15:15:09 +02:00
_ => (),
}
}
2020-04-19 10:57:09 +02:00
2020-10-18 10:35:41 +02:00
fn finished(&mut self, ctx: &mut <Self as Actor>::Context) {
2020-04-19 10:57:09 +02:00
info!("Disconnected");
2020-05-21 17:02:16 +02:00
if let (Some(user), Some(up)) = (
self.current_user.as_ref(),
self.current_user_project.as_ref(),
) {
self.addr.do_send(InnerMsg::Leave(
2020-05-21 17:02:16 +02:00
up.project_id,
user.id,
ctx.address().recipient(),
));
2020-04-19 10:57:09 +02:00
}
ctx.stop()
}
}
2020-04-21 19:35:39 +02:00
pub trait WsHandler<Message>
where
Self: Actor,
{
fn handle_msg(&mut self, msg: Message, _ctx: &mut <Self as Actor>::Context) -> WsResult;
2020-04-21 13:27:54 +02:00
}
2020-04-05 15:15:09 +02:00
#[get("/ws/")]
2020-04-06 08:38:08 +02:00
pub async fn index(
req: HttpRequest,
stream: web::Payload,
db: Data<Addr<DbExecutor>>,
mail: Data<Addr<MailExecutor>>,
2020-04-19 10:57:09 +02:00
ws_server: Data<Addr<WsServer>>,
hi: Data<Addr<highlight_actor::HighlightActor>>,
2020-04-06 08:38:08 +02:00
) -> Result<HttpResponse, Error> {
2020-04-06 22:59:33 +02:00
ws::start(
WebSocketActor {
db,
mail,
hi,
2020-04-06 22:59:33 +02:00
current_user: None,
2020-05-21 17:02:16 +02:00
current_user_project: None,
current_project: None,
2020-04-19 10:57:09 +02:00
addr: ws_server.get_ref().clone(),
2020-04-06 22:59:33 +02:00
},
&req,
stream,
)
2020-04-05 15:15:09 +02:00
}