bazzar/crates/event-bus-redis-plugin/src/lib.rs
2023-06-03 13:31:57 +02:00

140 lines
4.1 KiB
Rust

#![feature(async_fn_in_trait)]
use std::pin::Pin;
use std::task::{Context, Poll};
use async_trait::async_trait;
use event_bus_adapter::{
EBError, EBResult, EventBus, Message, MessageSend, Msg, PluginConfig, Topic,
};
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use plugin_api::{AppConfig, EventBusRegister, PluginType};
use redis_async::client::connect::RespConnection;
use redis_async::resp::RespValue;
use redis_async::resp::RespValue::BulkString;
use tracing::warn;
/// Sending half of the Redis event bus.
///
/// Wraps the sink side of a split [`RespConnection`]; every outgoing message
/// is written to it as a single RESP bulk string.
pub struct MessageSender(SplitSink<RespConnection, RespValue>);

#[async_trait]
impl MessageSend for MessageSender {
    /// Wraps `msg` in a [`Message`] for `topic` (with `offset` set to `0`),
    /// serializes it and writes the bytes to the Redis connection.
    ///
    /// # Errors
    ///
    /// Returns [`EBError::SendFailed`] when either serialization or the write
    /// fails; the underlying cause is logged as a warning.
    async fn send(&mut self, topic: Topic, msg: Msg) -> EBResult<()> {
        let bytes = Message {
            payload: msg,
            offset: 0,
            topic,
        }
        .to_bytes()
        .map_err(|e| {
            warn!("Failed to serialize message while sending: {e}");
            EBError::SendFailed
        })?;

        self.0.send(BulkString(bytes)).await.map_err(|e| {
            warn!("Failed to send serialized message: {e}");
            EBError::SendFailed
        })
    }
}
/// Receiving half of the Redis event bus.
///
/// Wraps the stream side of a split [`RespConnection`] and yields the
/// [`Message`]s decoded from incoming RESP bulk strings.
pub struct MessageStream(SplitStream<RespConnection>);

impl futures::stream::Stream for MessageStream {
    type Item = Message;

    /// Polls the underlying connection until it produces a decodable message,
    /// ends, or has nothing more to offer right now.
    ///
    /// Frames that cannot be turned into a [`Message`] (read errors, values
    /// that are not bulk strings, undecodable payloads) are skipped. Read
    /// errors and undecodable payloads are logged as warnings.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The wrapped `SplitStream` is `Unpin`, so the pin can be released
        // safely; no `unsafe` projection is needed.
        let this = self.get_mut();

        // `Poll::Pending` may only be returned when the inner stream itself
        // reported `Pending`, because only then has a waker been registered
        // for this task. After discarding a bad frame we therefore poll again
        // instead of returning `Pending`, which could stall the consumer.
        loop {
            let frame = match this.0.poll_next_unpin(cx) {
                Poll::Ready(Some(frame)) => frame,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            };

            let value = match frame {
                Ok(value) => value,
                Err(e) => {
                    warn!("reading from redis event stream failed: {e}");
                    continue;
                }
            };

            // Only bulk strings carry serialized messages; ignore anything else.
            let BulkString(bytes) = value else {
                continue;
            };

            match Message::from_bytes(&bytes) {
                Ok(msg) => return Poll::Ready(Some(msg)),
                Err(e) => warn!("Invalid message: {e}"),
            }
        }
    }
}
/// Connection settings for the Redis event bus.
///
/// Extracted from the plugin's `PluginConfig` in `RedisEventBus::connect`.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct RedisEventBusConfig {
/// Host passed to `redis_async::client::connect`.
host: String,
/// Port passed to `redis_async::client::connect`.
port: u16,
}
pub struct RedisEventBus;
impl RedisEventBus {
async fn connect(config: PluginConfig) -> Result<(Box<MessageStream>, Box<MessageSender>), ()> {
let RedisEventBusConfig { host, port } = config.config().expect("Invalid redis bus config");
let client = redis_async::client::connect(&host, port)
.await
.expect("Failed to connect to redis event bus");
let (sink, stream) = client.split();
Ok((
Box::new(MessageStream(stream)),
Box::new(MessageSender(sink)),
))
}
}
/// Name of this plugin: used to look up its section in the application config,
/// reported by `Plugin::name`, and used as the key when registering the bus.
pub static PLUGIN_NAME: &str = "event-bus-redis";
pub struct RedisEventBusPlugin {
plugin_config: PluginConfig,
}
#[async_trait]
impl plugin_api::Plugin for RedisEventBusPlugin {
fn plugin_type() -> PluginType
where
Self: Sized,
{
PluginType::EventBus
}
fn name(&self) -> &'static str {
PLUGIN_NAME
}
async fn initialize(app_config: &AppConfig) -> Self
where
Self: Sized,
{
Self {
plugin_config: app_config.plugin_config(PLUGIN_NAME).unwrap_or_default(),
}
}
async fn register_event_bus(&mut self, register: &'static mut EventBusRegister) {
let Ok((stream, sender)) = RedisEventBus::connect(self.plugin_config.clone()).await else {
return
};
register.register(PLUGIN_NAME, EventBus::new(stream, sender));
}
}