#![feature(async_fn_in_trait)] use std::pin::Pin; use std::task::{Context, Poll}; use event_bus_adapter::{Config, EventBus, Message, MessageSend, Msg}; use futures_util::stream::{SplitSink, SplitStream}; use futures_util::{SinkExt, StreamExt}; use redis_async::client::connect::RespConnection; use redis_async::resp::RespValue; use redis_async::resp::RespValue::BulkString; use tracing::log::warn; pub struct MessageSender(SplitSink); impl MessageSend for MessageSender { async fn send(&mut self, msg: Msg) { match (Message { payload: msg, offset: 0, }) .to_bytes() { Ok(v) => { if let Err(e) = self.0.send(BulkString(v)).await { warn!("Failed to send serialized message: {e}"); } } Err(e) => { warn!("Failed to serialize message while sending: {e}"); } } } } pub struct MessageStream(SplitStream); impl futures::stream::Stream for MessageStream { type Item = Message; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut_self: &mut Self = unsafe { Pin::get_unchecked_mut(self) }; match Pin::new(&mut mut_self.0).poll_next(cx) { Poll::Ready(value) => match value { Some(msg) => { let value = match msg { Ok(v) => v, Err(e) => { warn!("reading from redis event stream failed: {e}"); return Poll::Pending; } }; let BulkString(v) = value else { return Poll::Pending; }; let msg = match Message::from_bytes(&v) { Ok(msg) => msg, Err(e) => { warn!("Invalid message: {e}"); return Poll::Pending; } }; return Poll::Ready(Some(msg)); } None => Poll::Ready(None), }, Poll::Pending => Poll::Pending, } } } #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct RedisEventBusConfig { host: String, port: u16, } pub struct RedisEventBus; impl EventBus for RedisEventBus { async fn connect(config: Config) -> Result<(MessageStream, MessageSender), ()> { let RedisEventBusConfig { host, port } = config.config().expect("Invalid redis bus config"); let client = redis_async::client::connect(&host, port) .await .expect("Failed to connect to redis event bus"); let (sink, stream) = client.split(); Ok((MessageStream(stream), MessageSender(sink))) } }