bazzar/crates/redis-event-bus/src/lib.rs

92 lines
2.9 KiB
Rust
Raw Normal View History

2023-05-25 15:56:15 +02:00
#![feature(async_fn_in_trait)]
use std::pin::Pin;
use std::task::{Context, Poll};
use event_bus_adapter::{Config, EventBus, Message, MessageSend, Msg};
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use redis_async::client::connect::RespConnection;
use redis_async::resp::RespValue;
use redis_async::resp::RespValue::BulkString;
use tracing::log::warn;
/// Sending half of the redis event bus.
///
/// Wraps the sink half of a split [`RespConnection`]; outgoing values are
/// written to it as [`RespValue`]s.
pub struct MessageSender(SplitSink<RespConnection, RespValue>);
impl MessageSend for MessageSender {
    /// Serializes `msg` and writes it to the redis connection.
    ///
    /// The payload is wrapped in a [`Message`] with `index` set to `0`,
    /// converted to bytes and sent as a single bulk string. This method is
    /// best-effort: serialization and transport failures are logged as
    /// warnings and otherwise ignored, because the signature has no way to
    /// report them to the caller.
    async fn send(&mut self, msg: Msg) {
        let envelope = Message {
            payload: msg,
            index: 0,
        };

        // Bail out early when the message cannot be serialized; there is
        // nothing to put on the wire in that case.
        let bytes = match envelope.to_bytes() {
            Ok(bytes) => bytes,
            Err(e) => {
                warn!("Failed to serialize message while sending: {e}");
                return;
            }
        };

        if let Err(e) = self.0.send(BulkString(bytes)).await {
            warn!("Failed to send serialized message: {e}");
        }
    }
}
/// Receiving half of the redis event bus.
///
/// Wraps the stream half of a split [`RespConnection`] and is exposed as a
/// stream of decoded [`Message`]s.
pub struct MessageStream(SplitStream<RespConnection>);
impl futures::stream::Stream for MessageStream {
    type Item = Message;

    /// Polls the underlying redis connection for the next decodable message.
    ///
    /// Frames that cannot be turned into a [`Message`] are skipped:
    /// * read errors reported by the connection are logged and skipped,
    /// * values that are not bulk strings are silently skipped,
    /// * bulk strings that fail [`Message::from_bytes`] are logged and skipped.
    ///
    /// Returns `Poll::Ready(None)` once the underlying stream is exhausted and
    /// `Poll::Pending` only when the underlying stream itself is pending.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The only field is a `SplitStream`, which is `Unpin`, so the wrapper is
        // `Unpin` as well and can be accessed without `unsafe` pin projection.
        let this = self.get_mut();

        // Keep polling until we either produce a message, hit the end of the
        // stream, or the inner stream reports `Pending`. Returning `Pending`
        // after the inner stream yielded `Ready` would violate the `Stream`
        // contract: no waker has been registered at that point, so the task
        // might never be polled again.
        loop {
            let frame = match this.0.poll_next_unpin(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(frame)) => frame,
            };

            let value = match frame {
                Ok(value) => value,
                Err(e) => {
                    warn!("reading from redis event stream failed: {e}");
                    continue;
                }
            };

            // Only bulk strings carry serialized messages; ignore anything else.
            let BulkString(bytes) = value else {
                continue;
            };

            match Message::from_bytes(&bytes) {
                Ok(msg) => return Poll::Ready(Some(msg)),
                Err(e) => {
                    warn!("Invalid message: {e}");
                }
            }
        }
    }
}
/// Connection settings for the redis event bus.
///
/// Deserialized from the generic event bus [`Config`] when connecting.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct RedisEventBusConfig {
    /// Host passed to `redis_async::client::connect`.
    host: String,
    /// Port passed to `redis_async::client::connect`.
    port: u16,
}
/// Marker type providing the redis-backed [`EventBus`] implementation.
pub struct RedisEventBus;
impl EventBus<MessageStream, MessageSender> for RedisEventBus {
async fn connect(config: Config) -> Result<(MessageStream, MessageSender), ()> {
let RedisEventBusConfig { host, port } = config.config().expect("Invalid redis bus config");
let client = redis_async::client::connect(&host, port)
.await
.expect("Failed to connect to redis event bus");
let (sink, stream) = client.split();
Ok((MessageStream(stream), MessageSender(sink)))
}
}