From e7daeee326f44816268c707cc368c3cfd1e2fea3 Mon Sep 17 00:00:00 2001 From: eraden Date: Fri, 26 May 2023 23:22:14 +0200 Subject: [PATCH] Client lib --- Cargo.lock | 115 ----------------------------- Cargo.toml | 4 +- crates/local-event-bus/src/lib.rs | 51 ++++++++++--- crates/local-event-bus/src/main.rs | 14 ++-- 4 files changed, 50 insertions(+), 134 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf614c0..696c93f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1889,12 +1889,6 @@ dependencies = [ "uuid 0.8.2", ] -[[package]] -name = "enclose" -version = "1.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1056f553da426e9c025a662efa48b52e62e0a3a7648aa2d15aeaaf7f0d329357" - [[package]] name = "encode_unicode" version = "0.3.6" @@ -2501,29 +2495,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" -[[package]] -name = "gloo-events" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68b107f8abed8105e4182de63845afcc7b69c098b7852a813ea7462a320992fc" -dependencies = [ - "wasm-bindgen", - "web-sys", -] - -[[package]] -name = "gloo-file" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8d5564e570a38b43d78bdc063374a0c3098c4f0d64005b12f9bbe87e869b6d7" -dependencies = [ - "futures-channel", - "gloo-events", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "gloo-timers" version = "0.2.6" @@ -2536,19 +2507,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "gloo-utils" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8e8fc851e9c7b9852508bc6e3f690f452f474417e8545ec9857b7f7377036b5" -dependencies = [ - "js-sys", - "serde", - "serde_json", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "gumdrop" version = "0.8.1" @@ -4834,16 +4792,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "rust_decimal_macros" -version = "1.29.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e773fd3da1ed42472fdf3cfdb4972948a555bc3d73f5e0bdb99d17e7b54c687" -dependencies = [ - "quote", - "rust_decimal", -] - [[package]] name = "rustc-demangle" version = "0.1.23" @@ -4943,16 +4891,6 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" -[[package]] -name = "rusty-money" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b28f881005eac7ad8d46b6f075da5f322bd7f4f83a38720fc069694ddadd683" -dependencies = [ - "rust_decimal", - "rust_decimal_macros", -] - [[package]] name = "rusty_pool" version = "0.7.0" @@ -5069,28 +5007,6 @@ dependencies = [ "libc", ] -[[package]] -name = "seed" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c0e296ea0569d20467e9a1df3cb6ed66ce3b791a7eaf1e1110ae231f75e2b46" -dependencies = [ - "enclose", - "futures", - "getrandom 0.2.9", - "gloo-file", - "gloo-timers", - "gloo-utils", - "indexmap", - "js-sys", - "rand 0.8.5", - "uuid 1.3.2", - "version_check", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "self_cell" version = "0.10.2" @@ -5141,17 +5057,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde-wasm-bindgen" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3b143e2833c57ab9ad3ea280d21fd34e285a42837aeb0ee301f4f41890fa00e" -dependencies = [ - "js-sys", - "serde", - "wasm-bindgen", -] - [[package]] name = "serde_bytes" version = "0.11.9" @@ -7182,26 +7087,6 @@ dependencies = [ "wast 57.0.0", ] -[[package]] -name = "web" -version = "0.1.0" -dependencies = [ - "chrono", - "gloo-timers", - "indexmap", - "js-sys", - "model", - "rusty-money", - "seed", - "serde", - "serde-wasm-bindgen", - "serde_json", - "thiserror", - "uuid 1.3.2", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "web-sys" version = "0.3.61" diff --git a/Cargo.toml b/Cargo.toml index 3a8fd54..fa3230a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ members = [ # artifacts # "crates/db-seed", # "crates/api", - "crates/web", +# "crates/web", # vendor # "vendor/t_pay", # "vendor/pay_u", @@ -37,7 +37,7 @@ members = [ "crates/redis-event-bus", ] exclude = [ - + "crates/web" ] [profile.release] diff --git a/crates/local-event-bus/src/lib.rs b/crates/local-event-bus/src/lib.rs index 888d80b..4aa10a9 100644 --- a/crates/local-event-bus/src/lib.rs +++ b/crates/local-event-bus/src/lib.rs @@ -1,34 +1,58 @@ #![feature(async_fn_in_trait)] +use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use async_std::io::ReadExt; use event_bus_adapter::*; -use futures_util::AsyncReadExt; +use futures_util::AsyncWriteExt; +use tracing::{error, warn}; -pub struct L {} - -pub struct MessageStream; +pub struct MessageStream { + client: async_std::net::TcpStream, +} impl futures::stream::Stream for MessageStream { type Item = Message; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - todo!(); + let mut_self: &mut Self = unsafe { Pin::get_unchecked_mut(self) }; + let mut buffer = Vec::with_capacity(4086); + let mut f = mut_self.client.read_to_end(&mut buffer); + let r = Pin::new(&mut f).poll(cx); + match r { + Poll::Ready(Ok(n)) => match Message::from_bytes(&buffer[..n]) { + Ok(msg) => Poll::Ready(Some(msg)), + _ => Poll::Ready(None), + }, + Poll::Ready(Err(e)) => { + error!("Failed to read from message stream: {e}"); + Poll::Ready(None) + } + Poll::Pending => Poll::Pending, + } } } -pub struct MessageSender; +pub struct MessageSender { + client: async_std::net::TcpStream, +} impl MessageSender { pub async fn ack(&mut self) { - // + self.send(Msg::Ack).await; } } impl MessageSend for MessageSender { async fn send(&mut self, msg: Msg) { - todo!() + let Ok(v) = msg.to_bytes() else { + return; + }; + if let Err(e) = self.client.write(&v).await { + warn!("Failed to write message {msg:?}: {e}"); + } } } @@ -38,13 +62,20 @@ pub struct LebConfig { port: u16, } -impl EventBus for L { +pub struct LocalEventBus; + +impl EventBus for LocalEventBus { async fn connect(config: Config) -> Result<(MessageStream, MessageSender), ()> { let config: LebConfig = config.config().expect("Invalid Local Event Bus config"); let client = async_std::net::TcpStream::connect(format!("{}:{}", config.bind, config.port)) .await .expect("Failed tp connect to event bus"); - todo!() + Ok(( + MessageStream { + client: client.clone(), + }, + MessageSender { client }, + )) } } diff --git a/crates/local-event-bus/src/main.rs b/crates/local-event-bus/src/main.rs index 9f5e342..9e370db 100644 --- a/crates/local-event-bus/src/main.rs +++ b/crates/local-event-bus/src/main.rs @@ -97,14 +97,14 @@ async fn main() -> Result<(), Error> { alive: Arc::new(RwLock::new(true)), offset: None, }; - let mut buffer = [0; 4086]; + let mut buffer = Vec::with_capacity(4086); if let Err(e) = tx.send(InnerMsg::Register(client.clone())) { warn!("Failed to send register: {e}"); } loop { - let len = match AsyncReadExt::read(&mut client.stream, &mut buffer).await { + let len = match AsyncReadExt::read_to_end(&mut client.stream, &mut buffer).await { Ok(n) => n, Err(e) => { warn!("Failed to read from client: {e}"); @@ -203,7 +203,7 @@ async fn main() -> Result<(), Error> { async fn broadcast( clients: Arc>>, - buffer: &mut [u8], + buffer: &mut Vec, offset: u64, msg: Msg, ) -> Result<(), Error> { @@ -228,7 +228,7 @@ async fn broadcast( async fn send_directly( mut client: Client, - buffer: &mut [u8], + buffer: &mut Vec, offset: u64, msg: Msg, ) -> Result<(), Error> { @@ -249,8 +249,8 @@ async fn send_directly( } } -async fn read_msg(mut client: Client, buffer: &mut [u8]) -> Result, Error> { - let len = match AsyncReadExt::read(&mut client.stream, buffer).await { +async fn read_msg(mut client: Client, buffer: &mut Vec) -> Result, Error> { + let len = match AsyncReadExt::read_to_end(&mut client.stream, buffer).await { Ok(n) => n, Err(e) => { warn!("Failed to read from client: {e}"); @@ -266,7 +266,7 @@ async fn read_msg(mut client: Client, buffer: &mut [u8]) -> Result, } } -async fn wait_ack(client: Client, buffer: &mut [u8]) -> Result<(), Error> { +async fn wait_ack(client: Client, buffer: &mut Vec) -> Result<(), Error> { loop { let msg = read_msg(client.clone(), buffer).await?; if let Some(Msg::Ack) = msg {