diff --git a/Cargo.lock b/Cargo.lock index 696c93f..1302301 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3370,6 +3370,7 @@ dependencies = [ "futures", "futures-util", "gumdrop", + "memmap2", "serde", "serde_json", "thiserror", @@ -3482,6 +3483,15 @@ dependencies = [ "rustix 0.37.19", ] +[[package]] +name = "memmap2" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d28bba84adfe6646737845bc5ebbfa2c08424eb1c37e94a1fd2a82adb56a872" +dependencies = [ + "libc", +] + [[package]] name = "memoffset" version = "0.6.5" diff --git a/crates/event-bus-adapter/src/lib.rs b/crates/event-bus-adapter/src/lib.rs index 70288bf..4cf3b39 100644 --- a/crates/event-bus-adapter/src/lib.rs +++ b/crates/event-bus-adapter/src/lib.rs @@ -3,11 +3,18 @@ pub use event_bus_messages::{Message, Msg}; #[derive(Debug, thiserror::Error)] -pub enum EBError {} +pub enum EBError { + #[error("Failed to send message")] + SendFailed, + #[error("Failed to send Pong")] + PongFailed, + #[error("Connection is closed")] + Closed, +} pub type EBResult = Result; -pub struct Config(String); +pub struct Config(pub String); impl Config { pub fn config(self) -> Result { @@ -16,7 +23,7 @@ impl Config { } pub trait MessageSend { - async fn send(&mut self, msg: Msg); + async fn send(&mut self, msg: Msg) -> EBResult<()>; } pub trait EventBus diff --git a/crates/event-bus-messages/src/lib.rs b/crates/event-bus-messages/src/lib.rs index 61388bb..663a40a 100644 --- a/crates/event-bus-messages/src/lib.rs +++ b/crates/event-bus-messages/src/lib.rs @@ -4,8 +4,10 @@ pub enum Msg { Ping, Pong, Ack, - Test1, - Test2, + Acked, + Test1(usize, usize, usize, usize), + Test2(u8, u16, u32, char, usize), + LenControl(usize, usize, usize, usize), } impl Msg { diff --git a/crates/local-event-bus/Cargo.toml b/crates/local-event-bus/Cargo.toml index e99ee16..f509ca6 100644 --- a/crates/local-event-bus/Cargo.toml +++ b/crates/local-event-bus/Cargo.toml @@ -32,3 +32,4 @@ bincode = { version = "1" } uuid = { version = "1.3.2", features = ['v4'] } crossbeam-channel = { version = "0.5.8" } tracing-subscriber = { version = "0.3.17", features = ['env-filter'] } +memmap2 = { version = "0.6.2" } diff --git a/crates/local-event-bus/src/check1.rs b/crates/local-event-bus/src/check1.rs index a204e8a..f2c2498 100644 --- a/crates/local-event-bus/src/check1.rs +++ b/crates/local-event-bus/src/check1.rs @@ -1,4 +1,39 @@ +use async_std::stream::StreamExt; +use async_std::task::spawn; +use event_bus_adapter::{Config, EventBus, MessageSend, Msg}; +use leb::{LocalEventBus, MessageSender, MessageStream}; +use tracing::info; + #[async_std::main] async fn main() { - -} \ No newline at end of file + tracing_subscriber::fmt::init(); + + let config = Config("bind = \"localhost\"\nport = 8686".into()); + let (mut stream, mut sender) = + >::connect(config) + .await + .unwrap(); + + let mut sender2 = sender.clone(); + + info!("Send seek 0"); + MessageSend::send(&mut sender, Msg::Seek(0)).await.unwrap(); + info!("After seek 0"); + + info!("Send test1"); + MessageSend::send(&mut sender, Msg::Test1(1, 2, 3, 4)) + .await + .unwrap(); + info!("after test1"); + + info!("Send test2"); + MessageSend::send(&mut sender, Msg::Test2(1, 2, 3, 'a', 5)) + .await + .unwrap(); + info!("after test2"); + + while let Some(msg) = stream.next().await { + info!("Received: {msg:?}"); + sender2.ack().await.unwrap(); + } +} diff --git a/crates/local-event-bus/src/lib.rs b/crates/local-event-bus/src/lib.rs index 4aa10a9..ef9cddc 100644 --- a/crates/local-event-bus/src/lib.rs +++ b/crates/local-event-bus/src/lib.rs @@ -2,15 +2,18 @@ use std::future::Future; use std::pin::Pin; +use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; use async_std::io::ReadExt; +use async_std::task::{sleep, spawn}; use event_bus_adapter::*; use futures_util::AsyncWriteExt; -use tracing::{error, warn}; +use tracing::{debug, error, warn}; pub struct MessageStream { client: async_std::net::TcpStream, + connected: Arc>, } impl futures::stream::Stream for MessageStream { @@ -18,11 +21,29 @@ impl futures::stream::Stream for MessageStream { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut_self: &mut Self = unsafe { Pin::get_unchecked_mut(self) }; - let mut buffer = Vec::with_capacity(4086); - let mut f = mut_self.client.read_to_end(&mut buffer); + + if !*mut_self.connected.read().unwrap() { + return Poll::Ready(None); + } + + const LEN_SIZE: usize = std::mem::size_of::(); + let mut len_buf = [0; LEN_SIZE]; + + let mut f = mut_self.client.read_exact(&mut len_buf); + let r = Pin::new(&mut f).poll(cx); + let len = match r { + Poll::Ready(Ok(())) => usize::from_le_bytes(len_buf), + Poll::Ready(Err(e)) => { + error!("Failed to read from message stream: {e}"); + return Poll::Ready(None); + } + Poll::Pending => return Poll::Pending, + }; + let mut buffer = vec![0; len]; + let mut f = mut_self.client.read_exact(&mut buffer[..len]); let r = Pin::new(&mut f).poll(cx); match r { - Poll::Ready(Ok(n)) => match Message::from_bytes(&buffer[..n]) { + Poll::Ready(Ok(())) => match Message::from_bytes(&buffer[..]) { Ok(msg) => Poll::Ready(Some(msg)), _ => Poll::Ready(None), }, @@ -35,24 +56,41 @@ impl futures::stream::Stream for MessageStream { } } +#[derive(Debug, Clone)] pub struct MessageSender { client: async_std::net::TcpStream, + connected: Arc>, } impl MessageSender { - pub async fn ack(&mut self) { - self.send(Msg::Ack).await; + pub async fn ack(&mut self) -> EBResult<()> { + self.send(Msg::Ack).await } } impl MessageSend for MessageSender { - async fn send(&mut self, msg: Msg) { - let Ok(v) = msg.to_bytes() else { - return; - }; - if let Err(e) = self.client.write(&v).await { - warn!("Failed to write message {msg:?}: {e}"); + async fn send(&mut self, msg: Msg) -> EBResult<()> { + if !*self.connected.read().unwrap() { + return Err(EBError::Closed); } + let Ok(msg_buf) = msg.to_bytes() else { + return Err(EBError::SendFailed); + }; + let len = msg_buf.len(); + let mut len_buf = len.to_le_bytes().to_vec(); + len_buf.extend_from_slice(&msg_buf); + debug!("Sending {msg:?} with {len} as {len_buf:?}"); + // debug!("Parts {len_buf:?} {msg_buf:?}"); + + if let Err(e) = self.client.write(&len_buf).await { + warn!("Failed to write message {msg:?}: {e}"); + return Err(EBError::SendFailed); + } + // if let Err(e) = self.client.write(&v).await { + // warn!("Failed to write message {msg:?}: {e}"); + // return Err(EBError::SendFailed); + // } + Ok(()) } } @@ -70,12 +108,33 @@ impl EventBus for LocalEventBus { let client = async_std::net::TcpStream::connect(format!("{}:{}", config.bind, config.port)) .await .expect("Failed tp connect to event bus"); + client.set_nodelay(false).unwrap(); - Ok(( - MessageStream { - client: client.clone(), - }, - MessageSender { client }, - )) + let connected = Arc::new(RwLock::new(true)); + let stream = MessageStream { + client: client.clone(), + connected: connected.clone(), + }; + let sender = MessageSender { + client, + connected: connected.clone(), + }; + + { + let connected = connected.clone(); + let mut sender = sender.clone(); + spawn(async move { + loop { + if sender.send(Msg::Pong).await.is_err() { + *connected.write().unwrap() = false; + break; + } else { + sleep(std::time::Duration::from_millis(300)).await; + } + } + }) + }; + + Ok((stream, sender)) } } diff --git a/crates/local-event-bus/src/main.rs b/crates/local-event-bus/src/main.rs index 9e370db..500d0a6 100644 --- a/crates/local-event-bus/src/main.rs +++ b/crates/local-event-bus/src/main.rs @@ -1,17 +1,21 @@ use std::collections::HashMap; +use std::net::Shutdown; use std::sync::{Arc, RwLock}; use async_std::net::{TcpListener, TcpStream}; +use async_std::task::sleep; use event_bus_adapter::*; use flumedb::*; use futures_util::{AsyncReadExt, AsyncWriteExt, StreamExt}; use gumdrop::Options; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use tracing_subscriber::EnvFilter; use uuid::Uuid; #[derive(Debug, thiserror::Error)] enum Error { + #[error("Reserved")] + Reversed, #[error("Client closed connection")] BrokenPipe, } @@ -77,10 +81,12 @@ async fn main() -> Result<(), Error> { let clients = clients.clone(); async_std::task::spawn(async move { - let Ok(stream) = stream else { + let Ok(mut stream) = stream else { return; }; - let mut log = OffsetLog::::from_file( + stream.set_nodelay(false).unwrap(); + + let mut log = OffsetLog::::from_file( std::fs::OpenOptions::new() .append(true) .create(true) @@ -103,21 +109,11 @@ async fn main() -> Result<(), Error> { warn!("Failed to send register: {e}"); } - loop { - let len = match AsyncReadExt::read_to_end(&mut client.stream, &mut buffer).await { - Ok(n) => n, - Err(e) => { - warn!("Failed to read from client: {e}"); - break; - } - }; - let msg = match Msg::from_bytes(&buffer[..len]) { - Ok(msg) => msg, - Err(e) => { - warn!("Invalid incoming message {buffer:?}: {e}"); - continue; - } + while client.is_alive() { + let Ok(Some(msg)) = read_msg(client.clone(), &mut buffer).await else { + continue; }; + match &msg { Msg::Ping | Msg::Pong => { if let Err(e) = client @@ -126,26 +122,27 @@ async fn main() -> Result<(), Error> { .await { warn!("Failed to write to client. Closing. {e}"); + sleep(std::time::Duration::from_millis(300)).await; client.stream.close().await.ok(); break; } } Msg::Seek(offset) => { - let log_file = match OffsetLog::::open_read_only(&path) { + let log_file = match OffsetLog::::open_read_only(&path) { Ok(f) => f, Err(e) => { error!("Failed to open file for read: {e}"); continue; } }; - let offset = (*offset) as u64; - client.offset = Some(Arc::new(RwLock::new(offset))); + let mut offset = (*offset) as u64; let iter = log_file.bidir_iter_at_offset(offset); let mut iter = iter.filter_map(|e| Msg::from_bytes(&e.data).ok().zip(Some(e.offset))); - while let Some((msg, offset)) = iter.next() { - match send_directly(client.clone(), &mut buffer, offset, msg).await { + + while let Some((msg, new_offset)) = iter.next() { + match send_directly(client.clone(), new_offset, msg).await { Err(Error::BrokenPipe) => break, Err(e) => { warn!("Failed to send message: {e}"); @@ -157,12 +154,11 @@ async fn main() -> Result<(), Error> { { break; } - if let Some(o) = client.offset.as_mut() { - *o.write().unwrap() = offset; - } + offset = new_offset; } }; } + client.offset = Some(Arc::new(RwLock::new(offset))); } _ => { info!("Incoming message: {msg:?}"); @@ -177,7 +173,8 @@ async fn main() -> Result<(), Error> { break; } }; - let offset = match log.append(&bytes) { + debug!("Msg bytes {:?}", bytes.as_slice()); + let offset = match log.append(&bytes[..]) { Ok(offset) => offset, Err(e) => { error!("Failed to write message to file: {e}"); @@ -191,7 +188,7 @@ async fn main() -> Result<(), Error> { }; } - *client.alive.write().unwrap() = false; + client.close(); if let Err(e) = tx.send(InnerMsg::Drop(client.id)) { warn!("Failed to send close client: {e}"); } @@ -216,7 +213,7 @@ async fn broadcast( .collect::>(); for client in clients { let id = client.id; - if let Err(e) = send_directly(client.clone(), buffer, offset, msg.clone()).await { + if let Err(e) = send_directly(client.clone(), offset, msg.clone()).await { warn!("Failed to send message to {}: {e}", id); } if let Err(Error::BrokenPipe) = wait_ack(client, buffer).await { @@ -226,23 +223,26 @@ async fn broadcast( Ok(()) } -async fn send_directly( - mut client: Client, - buffer: &mut Vec, - offset: u64, - msg: Msg, -) -> Result<(), Error> { - if let Err(e) = (Message { +async fn send_directly(mut client: Client, offset: u64, msg: Msg) -> Result<(), Error> { + let msg_buf = match (Message { offset, payload: msg, }) - .to_bytes_into(buffer) + .to_bytes() { - warn!("Failed to serialize message: {e}"); - return Ok(()); + Ok(v) => v, + Err(e) => { + warn!("Failed to serialize message: {e}"); + return Ok(()); + } }; - if let Err(e) = client.stream.write(buffer).await { + let len = msg_buf.len(); + let mut len_buf = len.to_le_bytes().to_vec(); + len_buf.extend_from_slice(&msg_buf); + + if let Err(e) = client.stream.write(&len_buf).await { warn!("Failed to write message to client: {e}"); + client.close(); Err(Error::BrokenPipe) } else { Ok(()) @@ -250,15 +250,45 @@ async fn send_directly( } async fn read_msg(mut client: Client, buffer: &mut Vec) -> Result, Error> { - let len = match AsyncReadExt::read_to_end(&mut client.stream, buffer).await { + const LEN_SIZE: usize = std::mem::size_of::(); + + let mut len_buf = [0; LEN_SIZE]; + let len = match AsyncReadExt::read_exact(&mut client.stream, &mut len_buf).await { + Ok(()) => { + // debug!("Received length bytes {len_buf:?}"); + usize::from_le_bytes(len_buf) + } + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + client.close(); + return Err(Error::BrokenPipe); + } + Err(e) => { + warn!("Failed to read msg len from client: {e}"); + return Err(Error::BrokenPipe); + } + }; + + buffer.clear(); + buffer.reserve(len); + buffer.resize(len, 0); + // debug!("Length is {len}"); + // debug!("FRAME: {len_buf:?} {buffer:?}"); + + let _l = match AsyncReadExt::read_exact(&mut client.stream, &mut buffer[..len]).await { Ok(n) => n, Err(e) => { warn!("Failed to read from client: {e}"); return Err(Error::BrokenPipe); } }; - match Msg::from_bytes(&buffer[..len]) { - Ok(msg) => Ok(Some(msg)), + // debug!("Incoming data: {len} {buffer:?}"); + match Msg::from_bytes(&buffer) { + Ok(msg) => { + if !matches!(msg, Msg::Pong) { + debug!("Message {msg:?}"); + } + Ok(Some(msg)) + } Err(e) => { warn!("Invalid incoming message {buffer:?}: {e}"); return Ok(None); @@ -268,6 +298,7 @@ async fn read_msg(mut client: Client, buffer: &mut Vec) -> Result) -> Result<(), Error> { loop { + buffer.clear(); let msg = read_msg(client.clone(), buffer).await?; if let Some(Msg::Ack) = msg { break; @@ -288,3 +319,14 @@ struct Client { alive: Arc>, offset: Option>>, } + +impl Client { + fn close(&mut self) { + *self.alive.write().unwrap() = false; + self.stream.shutdown(Shutdown::Both).ok(); + } + + fn is_alive(&self) -> bool { + *self.alive.read().unwrap() + } +} diff --git a/crates/redis-event-bus/src/lib.rs b/crates/redis-event-bus/src/lib.rs index c8deb13..f22c983 100644 --- a/crates/redis-event-bus/src/lib.rs +++ b/crates/redis-event-bus/src/lib.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use event_bus_adapter::{Config, EventBus, Message, MessageSend, Msg}; +use event_bus_adapter::{Config, EBError, EBResult, EventBus, Message, MessageSend, Msg}; use futures_util::stream::{SplitSink, SplitStream}; use futures_util::{SinkExt, StreamExt}; use redis_async::client::connect::RespConnection; @@ -14,7 +14,7 @@ use tracing::warn; pub struct MessageSender(SplitSink); impl MessageSend for MessageSender { - async fn send(&mut self, msg: Msg) { + async fn send(&mut self, msg: Msg) -> EBResult<()> { match (Message { payload: msg, offset: 0, @@ -24,12 +24,15 @@ impl MessageSend for MessageSender { Ok(v) => { if let Err(e) = self.0.send(BulkString(v)).await { warn!("Failed to send serialized message: {e}"); + return Err(EBError::SendFailed); } } Err(e) => { warn!("Failed to serialize message while sending: {e}"); + return Err(EBError::SendFailed); } } + Ok(()) } } diff --git a/crates/token_manager/src/context.rs b/crates/token_manager/src/context.rs index e69de29..8b13789 100644 --- a/crates/token_manager/src/context.rs +++ b/crates/token_manager/src/context.rs @@ -0,0 +1 @@ +