#![feature(async_fn_in_trait)] use std::pin::Pin; use std::task::{Context, Poll}; use async_trait::async_trait; use event_bus_adapter::{ EBError, EBResult, EventBus, Message, MessageSend, Msg, PluginConfig, Topic, }; use futures_util::stream::{SplitSink, SplitStream}; use futures_util::{SinkExt, StreamExt}; use plugin_api::{AppConfig, EventBusRegister, PluginType}; use redis_async::client::connect::RespConnection; use redis_async::resp::RespValue; use redis_async::resp::RespValue::BulkString; use tracing::warn; pub struct MessageSender(SplitSink); #[async_trait] impl MessageSend for MessageSender { async fn send(&mut self, topic: Topic, msg: Msg) -> EBResult<()> { match (Message { payload: msg, offset: 0, topic, }) .to_bytes() { Ok(v) => { if let Err(e) = self.0.send(BulkString(v)).await { warn!("Failed to send serialized message: {e}"); return Err(EBError::SendFailed); } } Err(e) => { warn!("Failed to serialize message while sending: {e}"); return Err(EBError::SendFailed); } } Ok(()) } } pub struct MessageStream(SplitStream); impl futures::stream::Stream for MessageStream { type Item = Message; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut_self: &mut Self = unsafe { Pin::get_unchecked_mut(self) }; match Pin::new(&mut mut_self.0).poll_next(cx) { Poll::Ready(value) => match value { Some(msg) => { let value = match msg { Ok(v) => v, Err(e) => { warn!("reading from redis event stream failed: {e}"); return Poll::Pending; } }; let BulkString(v) = value else { return Poll::Pending; }; let msg = match Message::from_bytes(&v) { Ok(msg) => msg, Err(e) => { warn!("Invalid message: {e}"); return Poll::Pending; } }; return Poll::Ready(Some(msg)); } None => Poll::Ready(None), }, Poll::Pending => Poll::Pending, } } } #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct RedisEventBusConfig { host: String, port: u16, } pub struct RedisEventBus; impl RedisEventBus { async fn connect(config: PluginConfig) -> Result<(Box, Box), ()> { let RedisEventBusConfig { host, port } = config.config().expect("Invalid redis bus config"); let client = redis_async::client::connect(&host, port) .await .expect("Failed to connect to redis event bus"); let (sink, stream) = client.split(); Ok(( Box::new(MessageStream(stream)), Box::new(MessageSender(sink)), )) } } pub static PLUGIN_NAME: &str = "event-bus-redis"; pub struct RedisEventBusPlugin { plugin_config: PluginConfig, } #[async_trait] impl plugin_api::Plugin for RedisEventBusPlugin { fn plugin_type() -> PluginType where Self: Sized, { PluginType::EventBus } fn name(&self) -> &'static str { PLUGIN_NAME } async fn initialize(app_config: &AppConfig) -> Self where Self: Sized, { Self { plugin_config: app_config.plugin_config(PLUGIN_NAME).unwrap_or_default(), } } async fn register_event_bus(&mut self, register: &'static mut EventBusRegister) { let Ok((stream, sender)) = RedisEventBus::connect(self.plugin_config.clone()).await else { return }; register.register(PLUGIN_NAME, EventBus::new(stream, sender)); } }