Attach wbe sockets

This commit is contained in:
Adrian Wozniak 2020-04-06 08:38:08 +02:00
parent 780a0c498a
commit e3bd542063
15 changed files with 246 additions and 130 deletions

1
Cargo.lock generated
View File

@ -1068,7 +1068,6 @@ dependencies = [
"futures 0.1.29",
"jirs-data",
"js-sys",
"lazy_static",
"seed",
"serde",
"serde_json",

View File

@ -21,15 +21,14 @@ serde_json = "*"
bincode = "1.2.1"
chrono = { version = "*", features = [ "serde" ] }
uuid = { version = "*", features = [ "serde" ] }
wasm-bindgen = "*"
wasm-bindgen = "0.2.60"
futures = "^0.1.26"
lazy_static = "*"
[dependencies.js-sys]
js-sys = "*"
version = "*"
[dependencies.web-sys]
version = "*"
version = "0.3.22"
features = [
"Window",
"DataTransfer",
@ -43,4 +42,6 @@ features = [
"WebSocket",
"BinaryType",
"Blob",
"MessageEvent",
"ErrorEvent",
]

View File

@ -32,7 +32,7 @@
margin-top: 24px;
}
#projectPage > #projectBoardFilters > #searchInput {
#projectPage > #projectBoardFilters > .textFilterBoard {
margin-right: 18px;
width: 160px;
}

View File

@ -1,7 +1,83 @@
import "./styles.css";
const getWsHostName = () => process.env.JIRS_SERVER_BIND === "0.0.0.0" ? 'localhost' : process.env.JIRS_SERVER_BIND;
const getProtocol = () => window.location.protocol.replace(/^http/, 'ws');
const wsUrl = () => `${ getProtocol() }//${ getWsHostName() }:${ process.env.JIRS_SERVER_PORT }/ws/`;
import("../pkg/index.js").then(module => {
const host_url = `${location.protocol}//${process.env.JIRS_SERVER_BIND}:${process.env.JIRS_SERVER_PORT}`;
let queue = [];
let ws;
const buildWebSocket = () => {
ws = new WebSocket(wsUrl());
ws.binaryType = 'blob';
ws.onopen = event => {
console.log('open', event);
};
ws.onerror = event => {
console.error(event);
};
ws.onmessage = async event => {
const arrayBuffer = await event.data.arrayBuffer();
const array = new Uint8Array(arrayBuffer);
module.handle_ws_message(array);
};
};
buildWebSocket();
window.send_bin_code = code => queue.push(code);
let wsCheckDelay = 100;
const flush = () => {
if (queue.length >= 1000) {
ws.close();
throw new Error("Message queue overflow");
}
// if (queue.length && wsCheckDelay <= 0) console.log(ws.readyState, queue);
switch (ws.readyState) {
case 1: {
const [ code, ...rest ] = queue;
queue = rest;
if (code) {
// console.log('open', code);
ws.send(Uint8Array.from(code).buffer);
}
break;
}
default:
break;
}
window.requestAnimationFrame(flush);
};
window.flush = flush;
const keepWsOpen = () => {
if (wsCheckDelay > 0) {
wsCheckDelay -= 1;
} else {
wsCheckDelay = 100;
switch (ws.readyState) {
case 1: {
// console.log('sending ping');
// ws.send(Uint8Array.from([ 0, 0, 0, 0 ]).buffer);
break;
}
case 0:
case 2:
break;
case 3:
throw new Error('web socket has been closed');
buildWebSocket();
break;
}
}
window.requestAnimationFrame(keepWsOpen);
};
keepWsOpen();
flush();
const host_url = `${ location.protocol }//${ process.env.JIRS_SERVER_BIND }:${ process.env.JIRS_SERVER_PORT }`;
module.set_host_url(host_url);
module.render();
});

View File

@ -1,13 +1,18 @@
use seed::Method;
use wasm_bindgen::prelude::*;
use jirs_data::{UpdateIssuePayload, WsMsg};
use jirs_data::*;
use crate::shared::host_client;
use crate::Msg;
use seed::prelude::Closure;
use std::sync::Once;
use wasm_bindgen::JsCast;
pub fn send_ws_msg(msg: WsMsg) {
use crate::send_bin_code;
use wasm_bindgen::JsValue;
let binary = bincode::serialize(&msg).unwrap();
let data = JsValue::from_serde(&binary).unwrap();
send_bin_code(data);
}
pub async fn fetch_current_project(host_url: String) -> Result<Msg, Msg> {
match host_client(host_url, "/project") {
@ -53,93 +58,3 @@ pub async fn delete_issue(host_url: String, id: i32) -> Result<Msg, Msg> {
Err(e) => return Ok(Msg::InternalFailure(e)),
}
}
pub struct WebSocket {
ws: web_sys::WebSocket,
queue: Vec<WsMsg>,
}
impl Default for WebSocket {
fn default() -> WebSocket {
use js_sys::*;
use seed::prelude::*;
use web_sys::*;
let native = web_sys::WebSocket::new("ws://localhost:5000/ws/").unwrap();
native.set_binary_type(web_sys::BinaryType::Arraybuffer);
let onmessage_callback =
Closure::wrap(Box::new(move |e: MessageEvent| {}) as Box<dyn FnMut(MessageEvent)>);
native.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
onmessage_callback.forget();
// let onerror_callback = Closure::wrap(Box::new(move |e: ErrorEvent| {
// seed::log!("error event: {:?}", e);
// }) as Box<dyn FnMut(ErrorEvent)>);
// native.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
// onerror_callback.forget();
let cloned_ws = native.clone();
let onopen_callback = Closure::wrap(Box::new(move |_| {
seed::log!("socket opened");
match cloned_ws.send_with_str("ping") {
Ok(_) => seed::log!("message successfully sent"),
Err(err) => seed::log!("error sending message: {:?}", err),
}
}) as Box<dyn FnMut(JsValue)>);
native.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();
Self {
ws: native,
queue: vec![],
}
}
}
impl WebSocket {
pub fn send_with_u8_array(&self, buffer: &[u8]) {
use seed::*;
self.ws
.send_with_u8_array(buffer)
.unwrap_or_else(|e| error!(e));
}
pub fn send(&mut self) {
use bincode;
for msg in self.queue.iter() {
let encoded: Vec<u8> = bincode::serialize(msg).unwrap();
self.send_with_u8_array(encoded.as_slice());
}
self.queue.clear();
}
}
static INIT_WS: Once = Once::new();
static mut WS: Option<WebSocket> = None;
pub fn ws() -> &'static mut WebSocket {
unsafe {
INIT_WS.call_once(|| WS = Some(WebSocket::default()));
let ws_ping = Box::new(|| match WS.as_mut().map(|ws| ws.ws.ready_state()) {
Some(0) => {}
Some(1) => {
ws_send(WsMsg::Ping);
WS.as_mut().unwrap().send();
}
_ => {
WS = Some(WebSocket::default());
}
}) as Box<dyn Fn()>;
seed::set_interval(ws_ping, 10_000);
WS.as_mut().unwrap()
}
}
// pub fn ws_received() {}
//
pub fn ws_send(msg: jirs_data::WsMsg) {
ws().queue.push(msg);
}

View File

@ -1,12 +1,10 @@
#[macro_use]
extern crate lazy_static;
use std::sync::RwLock;
use seed::fetch::FetchObject;
use seed::{prelude::*, *};
use jirs_data::IssueStatus;
use jirs_data::{IssueStatus, WsMsg};
use crate::api::ws;
use crate::model::{ModalType, Model, Page};
use crate::shared::styled_select::StyledSelectChange;
@ -19,6 +17,7 @@ mod project;
mod project_settings;
mod register;
mod shared;
mod ws;
pub type UserId = i32;
pub type IssueId = i32;
@ -38,6 +37,19 @@ pub enum FieldId {
DescriptionAddIssueModal,
}
impl std::fmt::Display for FieldId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FieldId::IssueTypeEditModalTop => f.write_str("issueTypeEditModalTop"),
FieldId::TextFilterBoard => f.write_str("textFilterBoard"),
FieldId::CopyButtonLabel => f.write_str("copyButtonLabel"),
FieldId::IssueTypeAddIssueModal => f.write_str("issueTypeAddIssueModal"),
FieldId::SummaryAddIssueModal => f.write_str("summaryAddIssueModal"),
FieldId::DescriptionAddIssueModal => f.write_str("descriptionAddIssueModal"),
}
}
}
#[derive(Clone, Debug)]
pub enum FieldChange {
LinkCopied(FieldId, bool),
@ -78,18 +90,21 @@ pub enum Msg {
ModalOpened(ModalType),
ModalDropped,
ModalChanged(FieldChange),
WsMsg(jirs_data::WsMsg),
}
fn update(msg: Msg, model: &mut model::Model, orders: &mut impl Orders<Msg>) {
if cfg!(debug_assertions) {
log!(msg);
}
match msg {
match &msg {
Msg::ChangePage(page) => {
model.page = page;
model.page = page.clone();
}
_ => (),
}
crate::ws::update(&msg, model, orders);
crate::shared::update(&msg, model, orders);
crate::modal::update(&msg, model, orders);
match model.page {
@ -134,6 +149,7 @@ fn routes(url: Url) -> Option<Msg> {
}
pub static mut HOST_URL: String = String::new();
pub static mut APP: Option<RwLock<App<Msg, Model, Node<Msg>>>> = None;
#[wasm_bindgen]
pub fn set_host_url(url: String) {
@ -142,16 +158,45 @@ pub fn set_host_url(url: String) {
}
}
fn after_mount(_url: Url, _orders: &mut impl Orders<Msg>) -> AfterMount<Model> {
ws();
let model = Model::default();
AfterMount::new(model).url_handling(UrlHandling::None)
#[wasm_bindgen]
pub fn handle_ws_message(value: &wasm_bindgen::JsValue) {
let a = js_sys::Uint8Array::new(value);
let mut v = Vec::new();
for idx in 0..a.length() {
v.push(a.get_index(idx));
}
match bincode::deserialize(v.as_slice()) {
Ok(msg) => unsafe {
ws::handle(msg);
},
_ => (),
};
}
#[wasm_bindgen]
extern "C" {
pub fn send_bin_code(data: wasm_bindgen::JsValue);
}
#[wasm_bindgen]
pub fn render() {
App::builder(update, view)
use seed::*;
seed::set_interval(
Box::new(|| {
let binary = bincode::serialize(&jirs_data::WsMsg::Ping).unwrap();
let data = JsValue::from_serde(&binary).unwrap();
send_bin_code(data);
}) as Box<dyn Fn()>,
5000,
);
let app = seed::App::builder(update, view)
.routes(routes)
.after_mount(after_mount)
.build_and_start();
let cell_app = std::sync::RwLock::new(app);
unsafe {
APP = Some(cell_app);
};
}

View File

@ -48,6 +48,7 @@ pub fn view(_model: &Model, modal: &AddIssueModal) -> Node<Msg> {
.into_node();
let description = StyledTextarea::build()
.on_change(input_ev(Ev::Change, |_| Msg::NoOp))
.height(110)
.build(FieldId::DescriptionAddIssueModal)
.into_node();

View File

@ -108,6 +108,8 @@ pub struct Model {
pub host_url: String,
pub project_page: ProjectPage,
pub modals: Vec<ModalType>,
pub current_project: Option<Project>,
}
impl Default for Model {
@ -133,6 +135,7 @@ impl Default for Model {
dragged_issue_id: None,
},
modals: vec![],
current_project: None,
}
}
}

View File

@ -2,6 +2,7 @@ use seed::{prelude::*, *};
use jirs_data::*;
use crate::api::send_ws_msg;
use crate::model::{Model, Page};
use crate::shared::styled_avatar::StyledAvatar;
use crate::shared::styled_button::StyledButton;
@ -15,6 +16,8 @@ pub fn update(msg: Msg, model: &mut crate::model::Model, orders: &mut impl Order
Msg::ChangePage(Page::Project)
| Msg::ChangePage(Page::AddIssue)
| Msg::ChangePage(Page::EditIssue(..)) => {
send_ws_msg(jirs_data::WsMsg::ProjectRequest);
orders
.skip()
.perform_cmd(crate::api::fetch_current_project(model.host_url.clone()));
@ -176,7 +179,6 @@ fn project_board_filters(model: &Model) -> Node<Msg> {
.icon(Icon::Search)
.valid(true)
.on_change(input_ev(Ev::Change, |value| {
crate::api::ws_send(WsMsg::Ping);
Msg::ProjectTextFilterChanged(value)
}))
.build()

View File

@ -71,14 +71,14 @@ pub fn render(values: StyledInput) -> Node<Msg> {
on_change,
} = values;
let mut wrapper_class_list = vec!["styledInput"];
let mut wrapper_class_list = vec!["styledInput".to_string(), format!("{}", id)];
if !valid {
wrapper_class_list.push("invalid");
wrapper_class_list.push("invalid".to_string());
}
let mut input_class_list = vec!["inputElement"];
let mut input_class_list = vec!["inputElement".to_string()];
if icon.is_some() {
input_class_list.push("withIcon");
input_class_list.push("withIcon".to_string());
}
let icon = match icon {

30
jirs-client/src/ws/mod.rs Normal file
View File

@ -0,0 +1,30 @@
use std::sync::RwLock;
use seed::{prelude::*, *};
use jirs_data::WsMsg;
use crate::model::Model;
use crate::{model, Msg, APP, RECEIVED};
pub fn handle(msg: WsMsg) {
let app = match unsafe { APP.as_mut().unwrap() }.write() {
Ok(app) => app,
_ => return,
};
match msg {
WsMsg::Ping | WsMsg::Pong => {}
_ => app.update(Msg::WsMsg(msg)),
}
}
pub fn update(msg: &Msg, model: &mut model::Model, _orders: &mut impl Orders<Msg>) {
match msg {
Msg::WsMsg(WsMsg::ProjectLoaded(project)) => {
model.current_project = Some(project.clone());
log!(model);
}
_ => (),
}
}

View File

@ -387,8 +387,12 @@ pub struct UpdateProjectPayload {
pub category: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Copy, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum WsMsg {
Ping,
Pong,
ProjectRequest,
ProjectLoaded(Project),
ProjectIssuesRequest(i32),
ProjectIssuesLoaded(Vec<Issue>),
}

View File

@ -5,6 +5,7 @@ use jirs_data::ErrorResponse;
const TOKEN_NOT_FOUND: &str = "Token not found";
const DATABASE_CONNECTION_FAILED: &str = "Database connection failed";
#[derive(Debug)]
pub enum ServiceErrors {
Unauthorized,
DatabaseConnectionLost,

View File

@ -1,3 +1,5 @@
#![feature(async_closure)]
#[macro_use]
extern crate diesel;
#[macro_use]

View File

@ -1,27 +1,64 @@
use actix::{Actor, StreamHandler};
use actix::{Actor, Addr, StreamHandler};
use actix_web::web::Data;
use actix_web::{get, web, Error, HttpRequest, HttpResponse};
use actix_web_actors::ws;
struct MyWs;
use jirs_data::{Project, WsMsg};
impl Actor for MyWs {
use crate::db::projects::LoadCurrentProject;
use crate::db::DbExecutor;
struct WebSocketActor {
db: Data<Addr<DbExecutor>>,
}
impl Actor for WebSocketActor {
type Context = ws::WebsocketContext<Self>;
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketActor {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
use futures::executor::block_on;
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Binary(bin)) => {
let ws_msg: bincode::Result<jirs_data::WsMsg> =
bincode::deserialize(bin.to_vec().as_slice());
match ws_msg {
Ok(WsMsg::Ping) => ctx.binary(bincode::serialize(&WsMsg::Pong).unwrap()),
Ok(WsMsg::Pong) => ctx.binary(bincode::serialize(&WsMsg::Ping).unwrap()),
Ok(WsMsg::ProjectRequest) => match block_on(load_project(self.db.clone())) {
Some(p) => {
ctx.binary(bincode::serialize(&WsMsg::ProjectLoaded(p)).unwrap())
}
_ => eprintln!("Failed to load project"),
},
_ => eprintln!("Failed to resolve message"),
};
}
_ => (),
}
}
}
#[get("/ws/")]
pub async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let resp = ws::start(MyWs {}, &req, stream);
println!("{:?}", resp);
resp
pub async fn load_project(db: Data<Addr<DbExecutor>>) -> Option<Project> {
match db.send(LoadCurrentProject { project_id: 1 }).await {
Ok(Ok(p)) => Some(p.into()),
Ok(e) => {
eprintln!("{:?}", e);
None
}
_ => None,
}
}
#[get("/ws/")]
pub async fn index(
req: HttpRequest,
stream: web::Payload,
db: Data<Addr<DbExecutor>>,
) -> Result<HttpResponse, Error> {
ws::start(WebSocketActor { db }, &req, stream)
}