bazzar/crates/event-bus-redis-plugin/src/lib.rs

140 lines
4.1 KiB
Rust
Raw Normal View History

2023-05-25 15:56:15 +02:00
#![feature(async_fn_in_trait)]
use std::pin::Pin;
use std::task::{Context, Poll};
2023-06-03 13:31:57 +02:00
use async_trait::async_trait;
2023-06-01 22:02:47 +02:00
use event_bus_adapter::{
EBError, EBResult, EventBus, Message, MessageSend, Msg, PluginConfig, Topic,
};
2023-05-25 15:56:15 +02:00
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
2023-06-03 13:31:57 +02:00
use plugin_api::{AppConfig, EventBusRegister, PluginType};
2023-05-25 15:56:15 +02:00
use redis_async::client::connect::RespConnection;
use redis_async::resp::RespValue;
use redis_async::resp::RespValue::BulkString;
use tracing::warn;
2023-05-25 15:56:15 +02:00
pub struct MessageSender(SplitSink<RespConnection, RespValue>);
2023-06-03 13:31:57 +02:00
#[async_trait]
2023-05-25 15:56:15 +02:00
impl MessageSend for MessageSender {
2023-06-01 10:41:58 +02:00
async fn send(&mut self, topic: Topic, msg: Msg) -> EBResult<()> {
2023-05-25 15:56:15 +02:00
match (Message {
payload: msg,
2023-05-26 09:11:28 +02:00
offset: 0,
2023-06-01 10:41:58 +02:00
topic,
2023-05-25 15:56:15 +02:00
})
.to_bytes()
{
Ok(v) => {
if let Err(e) = self.0.send(BulkString(v)).await {
warn!("Failed to send serialized message: {e}");
2023-05-27 23:04:55 +02:00
return Err(EBError::SendFailed);
2023-05-25 15:56:15 +02:00
}
}
Err(e) => {
warn!("Failed to serialize message while sending: {e}");
2023-05-27 23:04:55 +02:00
return Err(EBError::SendFailed);
2023-05-25 15:56:15 +02:00
}
}
2023-05-27 23:04:55 +02:00
Ok(())
2023-05-25 15:56:15 +02:00
}
}
pub struct MessageStream(SplitStream<RespConnection>);
impl futures::stream::Stream for MessageStream {
type Item = Message;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut_self: &mut Self = unsafe { Pin::get_unchecked_mut(self) };
match Pin::new(&mut mut_self.0).poll_next(cx) {
Poll::Ready(value) => match value {
Some(msg) => {
let value = match msg {
Ok(v) => v,
Err(e) => {
warn!("reading from redis event stream failed: {e}");
return Poll::Pending;
}
};
let BulkString(v) = value else {
return Poll::Pending;
};
let msg = match Message::from_bytes(&v) {
Ok(msg) => msg,
Err(e) => {
warn!("Invalid message: {e}");
return Poll::Pending;
}
};
return Poll::Ready(Some(msg));
}
None => Poll::Ready(None),
},
Poll::Pending => Poll::Pending,
}
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct RedisEventBusConfig {
host: String,
port: u16,
}
pub struct RedisEventBus;
2023-06-03 13:31:57 +02:00
impl RedisEventBus {
async fn connect(config: PluginConfig) -> Result<(Box<MessageStream>, Box<MessageSender>), ()> {
2023-05-25 15:56:15 +02:00
let RedisEventBusConfig { host, port } = config.config().expect("Invalid redis bus config");
let client = redis_async::client::connect(&host, port)
.await
.expect("Failed to connect to redis event bus");
let (sink, stream) = client.split();
2023-06-03 13:31:57 +02:00
Ok((
Box::new(MessageStream(stream)),
Box::new(MessageSender(sink)),
))
}
}
pub static PLUGIN_NAME: &str = "event-bus-redis";
pub struct RedisEventBusPlugin {
plugin_config: PluginConfig,
}
#[async_trait]
impl plugin_api::Plugin for RedisEventBusPlugin {
fn plugin_type() -> PluginType
where
Self: Sized,
{
PluginType::EventBus
}
fn name(&self) -> &'static str {
PLUGIN_NAME
}
async fn initialize(app_config: &AppConfig) -> Self
where
Self: Sized,
{
Self {
plugin_config: app_config.plugin_config(PLUGIN_NAME).unwrap_or_default(),
}
}
async fn register_event_bus(&mut self, register: &'static mut EventBusRegister) {
let Ok((stream, sender)) = RedisEventBus::connect(self.plugin_config.clone()).await else {
2024-06-26 16:34:00 +02:00
return;
2023-06-03 13:31:57 +02:00
};
register.register(PLUGIN_NAME, EventBus::new(stream, sender));
2023-05-25 15:56:15 +02:00
}
}