Framed messages
This commit is contained in:
parent
e7daeee326
commit
4a6ce7c366
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -3370,6 +3370,7 @@ dependencies = [
|
|||||||
"futures",
|
"futures",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"gumdrop",
|
"gumdrop",
|
||||||
|
"memmap2",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
@ -3482,6 +3483,15 @@ dependencies = [
|
|||||||
"rustix 0.37.19",
|
"rustix 0.37.19",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "memmap2"
|
||||||
|
version = "0.6.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6d28bba84adfe6646737845bc5ebbfa2c08424eb1c37e94a1fd2a82adb56a872"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "memoffset"
|
name = "memoffset"
|
||||||
version = "0.6.5"
|
version = "0.6.5"
|
||||||
|
@ -3,11 +3,18 @@
|
|||||||
pub use event_bus_messages::{Message, Msg};
|
pub use event_bus_messages::{Message, Msg};
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum EBError {}
|
pub enum EBError {
|
||||||
|
#[error("Failed to send message")]
|
||||||
|
SendFailed,
|
||||||
|
#[error("Failed to send Pong")]
|
||||||
|
PongFailed,
|
||||||
|
#[error("Connection is closed")]
|
||||||
|
Closed,
|
||||||
|
}
|
||||||
|
|
||||||
pub type EBResult<T> = Result<T, EBError>;
|
pub type EBResult<T> = Result<T, EBError>;
|
||||||
|
|
||||||
pub struct Config(String);
|
pub struct Config(pub String);
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
pub fn config<S: serde::de::DeserializeOwned>(self) -> Result<S, toml::de::Error> {
|
pub fn config<S: serde::de::DeserializeOwned>(self) -> Result<S, toml::de::Error> {
|
||||||
@ -16,7 +23,7 @@ impl Config {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub trait MessageSend {
|
pub trait MessageSend {
|
||||||
async fn send(&mut self, msg: Msg);
|
async fn send(&mut self, msg: Msg) -> EBResult<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait EventBus<Stream, Sender>
|
pub trait EventBus<Stream, Sender>
|
||||||
|
@ -4,8 +4,10 @@ pub enum Msg {
|
|||||||
Ping,
|
Ping,
|
||||||
Pong,
|
Pong,
|
||||||
Ack,
|
Ack,
|
||||||
Test1,
|
Acked,
|
||||||
Test2,
|
Test1(usize, usize, usize, usize),
|
||||||
|
Test2(u8, u16, u32, char, usize),
|
||||||
|
LenControl(usize, usize, usize, usize),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Msg {
|
impl Msg {
|
||||||
|
@ -32,3 +32,4 @@ bincode = { version = "1" }
|
|||||||
uuid = { version = "1.3.2", features = ['v4'] }
|
uuid = { version = "1.3.2", features = ['v4'] }
|
||||||
crossbeam-channel = { version = "0.5.8" }
|
crossbeam-channel = { version = "0.5.8" }
|
||||||
tracing-subscriber = { version = "0.3.17", features = ['env-filter'] }
|
tracing-subscriber = { version = "0.3.17", features = ['env-filter'] }
|
||||||
|
memmap2 = { version = "0.6.2" }
|
||||||
|
@ -1,4 +1,39 @@
|
|||||||
|
use async_std::stream::StreamExt;
|
||||||
|
use async_std::task::spawn;
|
||||||
|
use event_bus_adapter::{Config, EventBus, MessageSend, Msg};
|
||||||
|
use leb::{LocalEventBus, MessageSender, MessageStream};
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
#[async_std::main]
|
#[async_std::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
|
let config = Config("bind = \"localhost\"\nport = 8686".into());
|
||||||
|
let (mut stream, mut sender) =
|
||||||
|
<LocalEventBus as EventBus<MessageStream, MessageSender>>::connect(config)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut sender2 = sender.clone();
|
||||||
|
|
||||||
|
info!("Send seek 0");
|
||||||
|
MessageSend::send(&mut sender, Msg::Seek(0)).await.unwrap();
|
||||||
|
info!("After seek 0");
|
||||||
|
|
||||||
|
info!("Send test1");
|
||||||
|
MessageSend::send(&mut sender, Msg::Test1(1, 2, 3, 4))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
info!("after test1");
|
||||||
|
|
||||||
|
info!("Send test2");
|
||||||
|
MessageSend::send(&mut sender, Msg::Test2(1, 2, 3, 'a', 5))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
info!("after test2");
|
||||||
|
|
||||||
|
while let Some(msg) = stream.next().await {
|
||||||
|
info!("Received: {msg:?}");
|
||||||
|
sender2.ack().await.unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
@ -2,15 +2,18 @@
|
|||||||
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use async_std::io::ReadExt;
|
use async_std::io::ReadExt;
|
||||||
|
use async_std::task::{sleep, spawn};
|
||||||
use event_bus_adapter::*;
|
use event_bus_adapter::*;
|
||||||
use futures_util::AsyncWriteExt;
|
use futures_util::AsyncWriteExt;
|
||||||
use tracing::{error, warn};
|
use tracing::{debug, error, warn};
|
||||||
|
|
||||||
pub struct MessageStream {
|
pub struct MessageStream {
|
||||||
client: async_std::net::TcpStream,
|
client: async_std::net::TcpStream,
|
||||||
|
connected: Arc<RwLock<bool>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl futures::stream::Stream for MessageStream {
|
impl futures::stream::Stream for MessageStream {
|
||||||
@ -18,11 +21,29 @@ impl futures::stream::Stream for MessageStream {
|
|||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
let mut_self: &mut Self = unsafe { Pin::get_unchecked_mut(self) };
|
let mut_self: &mut Self = unsafe { Pin::get_unchecked_mut(self) };
|
||||||
let mut buffer = Vec::with_capacity(4086);
|
|
||||||
let mut f = mut_self.client.read_to_end(&mut buffer);
|
if !*mut_self.connected.read().unwrap() {
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
const LEN_SIZE: usize = std::mem::size_of::<usize>();
|
||||||
|
let mut len_buf = [0; LEN_SIZE];
|
||||||
|
|
||||||
|
let mut f = mut_self.client.read_exact(&mut len_buf);
|
||||||
|
let r = Pin::new(&mut f).poll(cx);
|
||||||
|
let len = match r {
|
||||||
|
Poll::Ready(Ok(())) => usize::from_le_bytes(len_buf),
|
||||||
|
Poll::Ready(Err(e)) => {
|
||||||
|
error!("Failed to read from message stream: {e}");
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
|
Poll::Pending => return Poll::Pending,
|
||||||
|
};
|
||||||
|
let mut buffer = vec![0; len];
|
||||||
|
let mut f = mut_self.client.read_exact(&mut buffer[..len]);
|
||||||
let r = Pin::new(&mut f).poll(cx);
|
let r = Pin::new(&mut f).poll(cx);
|
||||||
match r {
|
match r {
|
||||||
Poll::Ready(Ok(n)) => match Message::from_bytes(&buffer[..n]) {
|
Poll::Ready(Ok(())) => match Message::from_bytes(&buffer[..]) {
|
||||||
Ok(msg) => Poll::Ready(Some(msg)),
|
Ok(msg) => Poll::Ready(Some(msg)),
|
||||||
_ => Poll::Ready(None),
|
_ => Poll::Ready(None),
|
||||||
},
|
},
|
||||||
@ -35,24 +56,41 @@ impl futures::stream::Stream for MessageStream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
pub struct MessageSender {
|
pub struct MessageSender {
|
||||||
client: async_std::net::TcpStream,
|
client: async_std::net::TcpStream,
|
||||||
|
connected: Arc<RwLock<bool>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MessageSender {
|
impl MessageSender {
|
||||||
pub async fn ack(&mut self) {
|
pub async fn ack(&mut self) -> EBResult<()> {
|
||||||
self.send(Msg::Ack).await;
|
self.send(Msg::Ack).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MessageSend for MessageSender {
|
impl MessageSend for MessageSender {
|
||||||
async fn send(&mut self, msg: Msg) {
|
async fn send(&mut self, msg: Msg) -> EBResult<()> {
|
||||||
let Ok(v) = msg.to_bytes() else {
|
if !*self.connected.read().unwrap() {
|
||||||
return;
|
return Err(EBError::Closed);
|
||||||
};
|
|
||||||
if let Err(e) = self.client.write(&v).await {
|
|
||||||
warn!("Failed to write message {msg:?}: {e}");
|
|
||||||
}
|
}
|
||||||
|
let Ok(msg_buf) = msg.to_bytes() else {
|
||||||
|
return Err(EBError::SendFailed);
|
||||||
|
};
|
||||||
|
let len = msg_buf.len();
|
||||||
|
let mut len_buf = len.to_le_bytes().to_vec();
|
||||||
|
len_buf.extend_from_slice(&msg_buf);
|
||||||
|
debug!("Sending {msg:?} with {len} as {len_buf:?}");
|
||||||
|
// debug!("Parts {len_buf:?} {msg_buf:?}");
|
||||||
|
|
||||||
|
if let Err(e) = self.client.write(&len_buf).await {
|
||||||
|
warn!("Failed to write message {msg:?}: {e}");
|
||||||
|
return Err(EBError::SendFailed);
|
||||||
|
}
|
||||||
|
// if let Err(e) = self.client.write(&v).await {
|
||||||
|
// warn!("Failed to write message {msg:?}: {e}");
|
||||||
|
// return Err(EBError::SendFailed);
|
||||||
|
// }
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,12 +108,33 @@ impl EventBus<MessageStream, MessageSender> for LocalEventBus {
|
|||||||
let client = async_std::net::TcpStream::connect(format!("{}:{}", config.bind, config.port))
|
let client = async_std::net::TcpStream::connect(format!("{}:{}", config.bind, config.port))
|
||||||
.await
|
.await
|
||||||
.expect("Failed tp connect to event bus");
|
.expect("Failed tp connect to event bus");
|
||||||
|
client.set_nodelay(false).unwrap();
|
||||||
|
|
||||||
Ok((
|
let connected = Arc::new(RwLock::new(true));
|
||||||
MessageStream {
|
let stream = MessageStream {
|
||||||
client: client.clone(),
|
client: client.clone(),
|
||||||
},
|
connected: connected.clone(),
|
||||||
MessageSender { client },
|
};
|
||||||
))
|
let sender = MessageSender {
|
||||||
|
client,
|
||||||
|
connected: connected.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
let connected = connected.clone();
|
||||||
|
let mut sender = sender.clone();
|
||||||
|
spawn(async move {
|
||||||
|
loop {
|
||||||
|
if sender.send(Msg::Pong).await.is_err() {
|
||||||
|
*connected.write().unwrap() = false;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
sleep(std::time::Duration::from_millis(300)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((stream, sender))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,17 +1,21 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::net::Shutdown;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
use async_std::net::{TcpListener, TcpStream};
|
use async_std::net::{TcpListener, TcpStream};
|
||||||
|
use async_std::task::sleep;
|
||||||
use event_bus_adapter::*;
|
use event_bus_adapter::*;
|
||||||
use flumedb::*;
|
use flumedb::*;
|
||||||
use futures_util::{AsyncReadExt, AsyncWriteExt, StreamExt};
|
use futures_util::{AsyncReadExt, AsyncWriteExt, StreamExt};
|
||||||
use gumdrop::Options;
|
use gumdrop::Options;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
enum Error {
|
enum Error {
|
||||||
|
#[error("Reserved")]
|
||||||
|
Reversed,
|
||||||
#[error("Client closed connection")]
|
#[error("Client closed connection")]
|
||||||
BrokenPipe,
|
BrokenPipe,
|
||||||
}
|
}
|
||||||
@ -77,10 +81,12 @@ async fn main() -> Result<(), Error> {
|
|||||||
let clients = clients.clone();
|
let clients = clients.clone();
|
||||||
|
|
||||||
async_std::task::spawn(async move {
|
async_std::task::spawn(async move {
|
||||||
let Ok(stream) = stream else {
|
let Ok(mut stream) = stream else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
let mut log = OffsetLog::<Msg>::from_file(
|
stream.set_nodelay(false).unwrap();
|
||||||
|
|
||||||
|
let mut log = OffsetLog::<u64>::from_file(
|
||||||
std::fs::OpenOptions::new()
|
std::fs::OpenOptions::new()
|
||||||
.append(true)
|
.append(true)
|
||||||
.create(true)
|
.create(true)
|
||||||
@ -103,21 +109,11 @@ async fn main() -> Result<(), Error> {
|
|||||||
warn!("Failed to send register: {e}");
|
warn!("Failed to send register: {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
while client.is_alive() {
|
||||||
let len = match AsyncReadExt::read_to_end(&mut client.stream, &mut buffer).await {
|
let Ok(Some(msg)) = read_msg(client.clone(), &mut buffer).await else {
|
||||||
Ok(n) => n,
|
continue;
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed to read from client: {e}");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let msg = match Msg::from_bytes(&buffer[..len]) {
|
|
||||||
Ok(msg) => msg,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Invalid incoming message {buffer:?}: {e}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
match &msg {
|
match &msg {
|
||||||
Msg::Ping | Msg::Pong => {
|
Msg::Ping | Msg::Pong => {
|
||||||
if let Err(e) = client
|
if let Err(e) = client
|
||||||
@ -126,26 +122,27 @@ async fn main() -> Result<(), Error> {
|
|||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
warn!("Failed to write to client. Closing. {e}");
|
warn!("Failed to write to client. Closing. {e}");
|
||||||
|
sleep(std::time::Duration::from_millis(300)).await;
|
||||||
client.stream.close().await.ok();
|
client.stream.close().await.ok();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Msg::Seek(offset) => {
|
Msg::Seek(offset) => {
|
||||||
let log_file = match OffsetLog::<Msg>::open_read_only(&path) {
|
let log_file = match OffsetLog::<u64>::open_read_only(&path) {
|
||||||
Ok(f) => f,
|
Ok(f) => f,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Failed to open file for read: {e}");
|
error!("Failed to open file for read: {e}");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let offset = (*offset) as u64;
|
let mut offset = (*offset) as u64;
|
||||||
client.offset = Some(Arc::new(RwLock::new(offset)));
|
|
||||||
|
|
||||||
let iter = log_file.bidir_iter_at_offset(offset);
|
let iter = log_file.bidir_iter_at_offset(offset);
|
||||||
let mut iter =
|
let mut iter =
|
||||||
iter.filter_map(|e| Msg::from_bytes(&e.data).ok().zip(Some(e.offset)));
|
iter.filter_map(|e| Msg::from_bytes(&e.data).ok().zip(Some(e.offset)));
|
||||||
while let Some((msg, offset)) = iter.next() {
|
|
||||||
match send_directly(client.clone(), &mut buffer, offset, msg).await {
|
while let Some((msg, new_offset)) = iter.next() {
|
||||||
|
match send_directly(client.clone(), new_offset, msg).await {
|
||||||
Err(Error::BrokenPipe) => break,
|
Err(Error::BrokenPipe) => break,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to send message: {e}");
|
warn!("Failed to send message: {e}");
|
||||||
@ -157,12 +154,11 @@ async fn main() -> Result<(), Error> {
|
|||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if let Some(o) = client.offset.as_mut() {
|
offset = new_offset;
|
||||||
*o.write().unwrap() = offset;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
client.offset = Some(Arc::new(RwLock::new(offset)));
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
info!("Incoming message: {msg:?}");
|
info!("Incoming message: {msg:?}");
|
||||||
@ -177,7 +173,8 @@ async fn main() -> Result<(), Error> {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let offset = match log.append(&bytes) {
|
debug!("Msg bytes {:?}", bytes.as_slice());
|
||||||
|
let offset = match log.append(&bytes[..]) {
|
||||||
Ok(offset) => offset,
|
Ok(offset) => offset,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Failed to write message to file: {e}");
|
error!("Failed to write message to file: {e}");
|
||||||
@ -191,7 +188,7 @@ async fn main() -> Result<(), Error> {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
*client.alive.write().unwrap() = false;
|
client.close();
|
||||||
if let Err(e) = tx.send(InnerMsg::Drop(client.id)) {
|
if let Err(e) = tx.send(InnerMsg::Drop(client.id)) {
|
||||||
warn!("Failed to send close client: {e}");
|
warn!("Failed to send close client: {e}");
|
||||||
}
|
}
|
||||||
@ -216,7 +213,7 @@ async fn broadcast(
|
|||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
for client in clients {
|
for client in clients {
|
||||||
let id = client.id;
|
let id = client.id;
|
||||||
if let Err(e) = send_directly(client.clone(), buffer, offset, msg.clone()).await {
|
if let Err(e) = send_directly(client.clone(), offset, msg.clone()).await {
|
||||||
warn!("Failed to send message to {}: {e}", id);
|
warn!("Failed to send message to {}: {e}", id);
|
||||||
}
|
}
|
||||||
if let Err(Error::BrokenPipe) = wait_ack(client, buffer).await {
|
if let Err(Error::BrokenPipe) = wait_ack(client, buffer).await {
|
||||||
@ -226,23 +223,26 @@ async fn broadcast(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_directly(
|
async fn send_directly(mut client: Client, offset: u64, msg: Msg) -> Result<(), Error> {
|
||||||
mut client: Client,
|
let msg_buf = match (Message {
|
||||||
buffer: &mut Vec<u8>,
|
|
||||||
offset: u64,
|
|
||||||
msg: Msg,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
if let Err(e) = (Message {
|
|
||||||
offset,
|
offset,
|
||||||
payload: msg,
|
payload: msg,
|
||||||
})
|
})
|
||||||
.to_bytes_into(buffer)
|
.to_bytes()
|
||||||
{
|
{
|
||||||
warn!("Failed to serialize message: {e}");
|
Ok(v) => v,
|
||||||
return Ok(());
|
Err(e) => {
|
||||||
|
warn!("Failed to serialize message: {e}");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
};
|
};
|
||||||
if let Err(e) = client.stream.write(buffer).await {
|
let len = msg_buf.len();
|
||||||
|
let mut len_buf = len.to_le_bytes().to_vec();
|
||||||
|
len_buf.extend_from_slice(&msg_buf);
|
||||||
|
|
||||||
|
if let Err(e) = client.stream.write(&len_buf).await {
|
||||||
warn!("Failed to write message to client: {e}");
|
warn!("Failed to write message to client: {e}");
|
||||||
|
client.close();
|
||||||
Err(Error::BrokenPipe)
|
Err(Error::BrokenPipe)
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -250,15 +250,45 @@ async fn send_directly(
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn read_msg(mut client: Client, buffer: &mut Vec<u8>) -> Result<Option<Msg>, Error> {
|
async fn read_msg(mut client: Client, buffer: &mut Vec<u8>) -> Result<Option<Msg>, Error> {
|
||||||
let len = match AsyncReadExt::read_to_end(&mut client.stream, buffer).await {
|
const LEN_SIZE: usize = std::mem::size_of::<usize>();
|
||||||
|
|
||||||
|
let mut len_buf = [0; LEN_SIZE];
|
||||||
|
let len = match AsyncReadExt::read_exact(&mut client.stream, &mut len_buf).await {
|
||||||
|
Ok(()) => {
|
||||||
|
// debug!("Received length bytes {len_buf:?}");
|
||||||
|
usize::from_le_bytes(len_buf)
|
||||||
|
}
|
||||||
|
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
|
||||||
|
client.close();
|
||||||
|
return Err(Error::BrokenPipe);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to read msg len from client: {e}");
|
||||||
|
return Err(Error::BrokenPipe);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
buffer.clear();
|
||||||
|
buffer.reserve(len);
|
||||||
|
buffer.resize(len, 0);
|
||||||
|
// debug!("Length is {len}");
|
||||||
|
// debug!("FRAME: {len_buf:?} {buffer:?}");
|
||||||
|
|
||||||
|
let _l = match AsyncReadExt::read_exact(&mut client.stream, &mut buffer[..len]).await {
|
||||||
Ok(n) => n,
|
Ok(n) => n,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to read from client: {e}");
|
warn!("Failed to read from client: {e}");
|
||||||
return Err(Error::BrokenPipe);
|
return Err(Error::BrokenPipe);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
match Msg::from_bytes(&buffer[..len]) {
|
// debug!("Incoming data: {len} {buffer:?}");
|
||||||
Ok(msg) => Ok(Some(msg)),
|
match Msg::from_bytes(&buffer) {
|
||||||
|
Ok(msg) => {
|
||||||
|
if !matches!(msg, Msg::Pong) {
|
||||||
|
debug!("Message {msg:?}");
|
||||||
|
}
|
||||||
|
Ok(Some(msg))
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Invalid incoming message {buffer:?}: {e}");
|
warn!("Invalid incoming message {buffer:?}: {e}");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
@ -268,6 +298,7 @@ async fn read_msg(mut client: Client, buffer: &mut Vec<u8>) -> Result<Option<Msg
|
|||||||
|
|
||||||
async fn wait_ack(client: Client, buffer: &mut Vec<u8>) -> Result<(), Error> {
|
async fn wait_ack(client: Client, buffer: &mut Vec<u8>) -> Result<(), Error> {
|
||||||
loop {
|
loop {
|
||||||
|
buffer.clear();
|
||||||
let msg = read_msg(client.clone(), buffer).await?;
|
let msg = read_msg(client.clone(), buffer).await?;
|
||||||
if let Some(Msg::Ack) = msg {
|
if let Some(Msg::Ack) = msg {
|
||||||
break;
|
break;
|
||||||
@ -288,3 +319,14 @@ struct Client {
|
|||||||
alive: Arc<RwLock<bool>>,
|
alive: Arc<RwLock<bool>>,
|
||||||
offset: Option<Arc<RwLock<u64>>>,
|
offset: Option<Arc<RwLock<u64>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Client {
|
||||||
|
fn close(&mut self) {
|
||||||
|
*self.alive.write().unwrap() = false;
|
||||||
|
self.stream.shutdown(Shutdown::Both).ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_alive(&self) -> bool {
|
||||||
|
*self.alive.read().unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use event_bus_adapter::{Config, EventBus, Message, MessageSend, Msg};
|
use event_bus_adapter::{Config, EBError, EBResult, EventBus, Message, MessageSend, Msg};
|
||||||
use futures_util::stream::{SplitSink, SplitStream};
|
use futures_util::stream::{SplitSink, SplitStream};
|
||||||
use futures_util::{SinkExt, StreamExt};
|
use futures_util::{SinkExt, StreamExt};
|
||||||
use redis_async::client::connect::RespConnection;
|
use redis_async::client::connect::RespConnection;
|
||||||
@ -14,7 +14,7 @@ use tracing::warn;
|
|||||||
pub struct MessageSender(SplitSink<RespConnection, RespValue>);
|
pub struct MessageSender(SplitSink<RespConnection, RespValue>);
|
||||||
|
|
||||||
impl MessageSend for MessageSender {
|
impl MessageSend for MessageSender {
|
||||||
async fn send(&mut self, msg: Msg) {
|
async fn send(&mut self, msg: Msg) -> EBResult<()> {
|
||||||
match (Message {
|
match (Message {
|
||||||
payload: msg,
|
payload: msg,
|
||||||
offset: 0,
|
offset: 0,
|
||||||
@ -24,12 +24,15 @@ impl MessageSend for MessageSender {
|
|||||||
Ok(v) => {
|
Ok(v) => {
|
||||||
if let Err(e) = self.0.send(BulkString(v)).await {
|
if let Err(e) = self.0.send(BulkString(v)).await {
|
||||||
warn!("Failed to send serialized message: {e}");
|
warn!("Failed to send serialized message: {e}");
|
||||||
|
return Err(EBError::SendFailed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to serialize message while sending: {e}");
|
warn!("Failed to serialize message while sending: {e}");
|
||||||
|
return Err(EBError::SendFailed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
|
Loading…
Reference in New Issue
Block a user