Client lib

This commit is contained in:
eraden 2023-05-26 23:22:14 +02:00
parent a917b7c064
commit e7daeee326
4 changed files with 50 additions and 134 deletions

115
Cargo.lock generated
View File

@ -1889,12 +1889,6 @@ dependencies = [
"uuid 0.8.2", "uuid 0.8.2",
] ]
[[package]]
name = "enclose"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1056f553da426e9c025a662efa48b52e62e0a3a7648aa2d15aeaaf7f0d329357"
[[package]] [[package]]
name = "encode_unicode" name = "encode_unicode"
version = "0.3.6" version = "0.3.6"
@ -2501,29 +2495,6 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "gloo-events"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68b107f8abed8105e4182de63845afcc7b69c098b7852a813ea7462a320992fc"
dependencies = [
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "gloo-file"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8d5564e570a38b43d78bdc063374a0c3098c4f0d64005b12f9bbe87e869b6d7"
dependencies = [
"futures-channel",
"gloo-events",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "gloo-timers" name = "gloo-timers"
version = "0.2.6" version = "0.2.6"
@ -2536,19 +2507,6 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "gloo-utils"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8e8fc851e9c7b9852508bc6e3f690f452f474417e8545ec9857b7f7377036b5"
dependencies = [
"js-sys",
"serde",
"serde_json",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "gumdrop" name = "gumdrop"
version = "0.8.1" version = "0.8.1"
@ -4834,16 +4792,6 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "rust_decimal_macros"
version = "1.29.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e773fd3da1ed42472fdf3cfdb4972948a555bc3d73f5e0bdb99d17e7b54c687"
dependencies = [
"quote",
"rust_decimal",
]
[[package]] [[package]]
name = "rustc-demangle" name = "rustc-demangle"
version = "0.1.23" version = "0.1.23"
@ -4943,16 +4891,6 @@ version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06"
[[package]]
name = "rusty-money"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b28f881005eac7ad8d46b6f075da5f322bd7f4f83a38720fc069694ddadd683"
dependencies = [
"rust_decimal",
"rust_decimal_macros",
]
[[package]] [[package]]
name = "rusty_pool" name = "rusty_pool"
version = "0.7.0" version = "0.7.0"
@ -5069,28 +5007,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "seed"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c0e296ea0569d20467e9a1df3cb6ed66ce3b791a7eaf1e1110ae231f75e2b46"
dependencies = [
"enclose",
"futures",
"getrandom 0.2.9",
"gloo-file",
"gloo-timers",
"gloo-utils",
"indexmap",
"js-sys",
"rand 0.8.5",
"uuid 1.3.2",
"version_check",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]] [[package]]
name = "self_cell" name = "self_cell"
version = "0.10.2" version = "0.10.2"
@ -5141,17 +5057,6 @@ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]]
name = "serde-wasm-bindgen"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3b143e2833c57ab9ad3ea280d21fd34e285a42837aeb0ee301f4f41890fa00e"
dependencies = [
"js-sys",
"serde",
"wasm-bindgen",
]
[[package]] [[package]]
name = "serde_bytes" name = "serde_bytes"
version = "0.11.9" version = "0.11.9"
@ -7182,26 +7087,6 @@ dependencies = [
"wast 57.0.0", "wast 57.0.0",
] ]
[[package]]
name = "web"
version = "0.1.0"
dependencies = [
"chrono",
"gloo-timers",
"indexmap",
"js-sys",
"model",
"rusty-money",
"seed",
"serde",
"serde-wasm-bindgen",
"serde_json",
"thiserror",
"uuid 1.3.2",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "web-sys" name = "web-sys"
version = "0.3.61" version = "0.3.61"

View File

@ -24,7 +24,7 @@ members = [
# artifacts # artifacts
# "crates/db-seed", # "crates/db-seed",
# "crates/api", # "crates/api",
"crates/web", # "crates/web",
# vendor # vendor
# "vendor/t_pay", # "vendor/t_pay",
# "vendor/pay_u", # "vendor/pay_u",
@ -37,7 +37,7 @@ members = [
"crates/redis-event-bus", "crates/redis-event-bus",
] ]
exclude = [ exclude = [
"crates/web"
] ]
[profile.release] [profile.release]

View File

@ -1,34 +1,58 @@
#![feature(async_fn_in_trait)] #![feature(async_fn_in_trait)]
use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use async_std::io::ReadExt;
use event_bus_adapter::*; use event_bus_adapter::*;
use futures_util::AsyncReadExt; use futures_util::AsyncWriteExt;
use tracing::{error, warn};
pub struct L {} pub struct MessageStream {
client: async_std::net::TcpStream,
pub struct MessageStream; }
impl futures::stream::Stream for MessageStream { impl futures::stream::Stream for MessageStream {
type Item = Message; type Item = Message;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!(); let mut_self: &mut Self = unsafe { Pin::get_unchecked_mut(self) };
let mut buffer = Vec::with_capacity(4086);
let mut f = mut_self.client.read_to_end(&mut buffer);
let r = Pin::new(&mut f).poll(cx);
match r {
Poll::Ready(Ok(n)) => match Message::from_bytes(&buffer[..n]) {
Ok(msg) => Poll::Ready(Some(msg)),
_ => Poll::Ready(None),
},
Poll::Ready(Err(e)) => {
error!("Failed to read from message stream: {e}");
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
} }
} }
pub struct MessageSender; pub struct MessageSender {
client: async_std::net::TcpStream,
}
impl MessageSender { impl MessageSender {
pub async fn ack(&mut self) { pub async fn ack(&mut self) {
// self.send(Msg::Ack).await;
} }
} }
impl MessageSend for MessageSender { impl MessageSend for MessageSender {
async fn send(&mut self, msg: Msg) { async fn send(&mut self, msg: Msg) {
todo!() let Ok(v) = msg.to_bytes() else {
return;
};
if let Err(e) = self.client.write(&v).await {
warn!("Failed to write message {msg:?}: {e}");
}
} }
} }
@ -38,13 +62,20 @@ pub struct LebConfig {
port: u16, port: u16,
} }
impl EventBus<MessageStream, MessageSender> for L { pub struct LocalEventBus;
impl EventBus<MessageStream, MessageSender> for LocalEventBus {
async fn connect(config: Config) -> Result<(MessageStream, MessageSender), ()> { async fn connect(config: Config) -> Result<(MessageStream, MessageSender), ()> {
let config: LebConfig = config.config().expect("Invalid Local Event Bus config"); let config: LebConfig = config.config().expect("Invalid Local Event Bus config");
let client = async_std::net::TcpStream::connect(format!("{}:{}", config.bind, config.port)) let client = async_std::net::TcpStream::connect(format!("{}:{}", config.bind, config.port))
.await .await
.expect("Failed tp connect to event bus"); .expect("Failed tp connect to event bus");
todo!() Ok((
MessageStream {
client: client.clone(),
},
MessageSender { client },
))
} }
} }

View File

@ -97,14 +97,14 @@ async fn main() -> Result<(), Error> {
alive: Arc::new(RwLock::new(true)), alive: Arc::new(RwLock::new(true)),
offset: None, offset: None,
}; };
let mut buffer = [0; 4086]; let mut buffer = Vec::with_capacity(4086);
if let Err(e) = tx.send(InnerMsg::Register(client.clone())) { if let Err(e) = tx.send(InnerMsg::Register(client.clone())) {
warn!("Failed to send register: {e}"); warn!("Failed to send register: {e}");
} }
loop { loop {
let len = match AsyncReadExt::read(&mut client.stream, &mut buffer).await { let len = match AsyncReadExt::read_to_end(&mut client.stream, &mut buffer).await {
Ok(n) => n, Ok(n) => n,
Err(e) => { Err(e) => {
warn!("Failed to read from client: {e}"); warn!("Failed to read from client: {e}");
@ -203,7 +203,7 @@ async fn main() -> Result<(), Error> {
async fn broadcast( async fn broadcast(
clients: Arc<RwLock<HashMap<Uuid, Client>>>, clients: Arc<RwLock<HashMap<Uuid, Client>>>,
buffer: &mut [u8], buffer: &mut Vec<u8>,
offset: u64, offset: u64,
msg: Msg, msg: Msg,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -228,7 +228,7 @@ async fn broadcast(
async fn send_directly( async fn send_directly(
mut client: Client, mut client: Client,
buffer: &mut [u8], buffer: &mut Vec<u8>,
offset: u64, offset: u64,
msg: Msg, msg: Msg,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -249,8 +249,8 @@ async fn send_directly(
} }
} }
async fn read_msg(mut client: Client, buffer: &mut [u8]) -> Result<Option<Msg>, Error> { async fn read_msg(mut client: Client, buffer: &mut Vec<u8>) -> Result<Option<Msg>, Error> {
let len = match AsyncReadExt::read(&mut client.stream, buffer).await { let len = match AsyncReadExt::read_to_end(&mut client.stream, buffer).await {
Ok(n) => n, Ok(n) => n,
Err(e) => { Err(e) => {
warn!("Failed to read from client: {e}"); warn!("Failed to read from client: {e}");
@ -266,7 +266,7 @@ async fn read_msg(mut client: Client, buffer: &mut [u8]) -> Result<Option<Msg>,
} }
} }
async fn wait_ack(client: Client, buffer: &mut [u8]) -> Result<(), Error> { async fn wait_ack(client: Client, buffer: &mut Vec<u8>) -> Result<(), Error> {
loop { loop {
let msg = read_msg(client.clone(), buffer).await?; let msg = read_msg(client.clone(), buffer).await?;
if let Some(Msg::Ack) = msg { if let Some(Msg::Ack) = msg {