Add cache
This commit is contained in:
parent
4a6ce7c366
commit
313982a720
1528
Cargo.lock
generated
1528
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -33,8 +33,11 @@ members = [
|
|||||||
# EVENT BUS
|
# EVENT BUS
|
||||||
"crates/event-bus-messages",
|
"crates/event-bus-messages",
|
||||||
"crates/event-bus-adapter",
|
"crates/event-bus-adapter",
|
||||||
"crates/local-event-bus",
|
"crates/event-bus-redis",
|
||||||
"crates/redis-event-bus",
|
# CACHE
|
||||||
|
"crates/cache-adapter",
|
||||||
|
"crates/cache-adapter-redis",
|
||||||
|
"crates/cache-adapter-embedded-memory",
|
||||||
]
|
]
|
||||||
exclude = [
|
exclude = [
|
||||||
"crates/web"
|
"crates/web"
|
||||||
|
14
crates/cache-adapter-embedded-memory/Cargo.toml
Normal file
14
crates/cache-adapter-embedded-memory/Cargo.toml
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
[package]
|
||||||
|
name = "cache-adapter-embedded-memory"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
cache-adapter = { path = "../cache-adapter" }
|
||||||
|
serde = { version = "1.0.163", features = ['derive'] }
|
||||||
|
async-trait = { version = "0.1.68" }
|
||||||
|
chrono = { version = "0.4.26" }
|
||||||
|
bincode = { version = "1" }
|
||||||
|
tracing = { version = "0" }
|
||||||
|
futures-executor = { version = "0.3.28", features = [] }
|
||||||
|
tokio = { version = "1.28.2", features = ['full'] }
|
146
crates/cache-adapter-embedded-memory/src/lib.rs
Normal file
146
crates/cache-adapter-embedded-memory/src/lib.rs
Normal file
@ -0,0 +1,146 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use cache_adapter::{CResult, CacheAdapter, Config, Error, InvalidatePattern};
|
||||||
|
use serde::de::DeserializeOwned;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tracing::error;
|
||||||
|
|
||||||
|
pub struct CacheEntry {
|
||||||
|
pub expires_at: Option<chrono::NaiveDateTime>,
|
||||||
|
pub payload: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct MemoryConfig {
|
||||||
|
ttl_secs: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct EmbeddedMemoryCacheAdapter {
|
||||||
|
storage: Arc<RwLock<HashMap<String, CacheEntry>>>,
|
||||||
|
ttl: Option<Duration>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl CacheAdapter for EmbeddedMemoryCacheAdapter {
|
||||||
|
async fn new(config: Config) -> CResult<Self> {
|
||||||
|
let config: MemoryConfig = config.config::<MemoryConfig>().map_err(|e| {
|
||||||
|
error!("Failed to parse memory cache config: {e}");
|
||||||
|
Error::InvalidConfig
|
||||||
|
})?;
|
||||||
|
Ok(Self {
|
||||||
|
storage: Arc::new(Default::default()),
|
||||||
|
ttl: config.ttl_secs.map(|n| Duration::from_secs(n)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read<T>(&mut self, key: &str) -> CResult<Option<T>>
|
||||||
|
where
|
||||||
|
T: DeserializeOwned,
|
||||||
|
{
|
||||||
|
let (valid, data) = {
|
||||||
|
let lock = self.storage.read().unwrap();
|
||||||
|
let entry = match lock.get(key) {
|
||||||
|
None => return Ok(None),
|
||||||
|
Some(entry) => entry,
|
||||||
|
};
|
||||||
|
let valid = entry
|
||||||
|
.expires_at
|
||||||
|
.map(|exp| exp >= chrono::Utc::now().naive_utc())
|
||||||
|
.unwrap_or(true);
|
||||||
|
let data = if valid {
|
||||||
|
match bincode::deserialize::<T>(&entry.payload) {
|
||||||
|
Ok(t) => Some(t),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("Malformed embedded cache entry {key:?}: {e}");
|
||||||
|
return Err(Error::InvalidEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
(valid, data)
|
||||||
|
};
|
||||||
|
if valid {
|
||||||
|
Ok(data)
|
||||||
|
} else {
|
||||||
|
self.storage.write().unwrap().remove(key);
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn set<T>(&mut self, key: &str, data: T, expires_in: Option<Duration>) -> CResult<()>
|
||||||
|
where
|
||||||
|
T: Serialize + Send,
|
||||||
|
{
|
||||||
|
let expires_at = if let Some(duration) = expires_in.or(self.ttl) {
|
||||||
|
let storage = self.storage.clone();
|
||||||
|
let exp =
|
||||||
|
chrono::Utc::now().naive_utc() + chrono::Duration::from_std(duration).unwrap();
|
||||||
|
|
||||||
|
let key = key.to_owned();
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
tokio::time::sleep(duration).await;
|
||||||
|
storage.write().unwrap().remove(&key);
|
||||||
|
});
|
||||||
|
|
||||||
|
Some(exp)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
self.storage.write().unwrap().insert(
|
||||||
|
key.into(),
|
||||||
|
CacheEntry {
|
||||||
|
expires_at,
|
||||||
|
payload: bincode::serialize(&data).map_err(|e| {
|
||||||
|
tracing::warn!("Malformed embedded cache entry {key:?}: {e}");
|
||||||
|
Error::InvalidEntry
|
||||||
|
})?,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn invalidate(&mut self, pattern: InvalidatePattern<'_>) -> CResult<u64> {
|
||||||
|
let keys = self
|
||||||
|
.storage
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.keys()
|
||||||
|
.filter(|k| pattern.matches(k))
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let mut lock = self.storage.write().unwrap();
|
||||||
|
let len = keys.len();
|
||||||
|
for key in keys {
|
||||||
|
lock.remove(&key);
|
||||||
|
}
|
||||||
|
Ok(len as u64)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn clear(&mut self) -> CResult<u64> {
|
||||||
|
let mut lock = self.storage.write().unwrap();
|
||||||
|
let size = lock.len() as u64;
|
||||||
|
lock.clear();
|
||||||
|
Ok(size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trait MatchesKey {
|
||||||
|
fn matches(&self, key: &str) -> bool;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'s> MatchesKey for InvalidatePattern<'s> {
|
||||||
|
fn matches(&self, key: &str) -> bool {
|
||||||
|
match self {
|
||||||
|
InvalidatePattern::StartsWith(pattern) => key.starts_with(pattern.as_ref()),
|
||||||
|
InvalidatePattern::EndsWith(pattern) => key.ends_with(pattern.as_ref()),
|
||||||
|
InvalidatePattern::Contains(pattern) => key.contains(pattern.as_ref()),
|
||||||
|
InvalidatePattern::Const(pattern) => pattern.as_ref().eq(key),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
17
crates/cache-adapter-redis/Cargo.toml
Normal file
17
crates/cache-adapter-redis/Cargo.toml
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
[package]
|
||||||
|
name = "cache-adapter-redis"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
redis-async-pool = { git = "https://github.com/Eraden/redis-async-pool.git", branch = "upgrade-dependencies" }
|
||||||
|
cache-adapter = { path = "../cache-adapter" }
|
||||||
|
serde = { version = "1.0.163", features = ['derive'] }
|
||||||
|
async-trait = { version = "0.1.68" }
|
||||||
|
chrono = { version = "0.4.26" }
|
||||||
|
bincode = { version = "1" }
|
||||||
|
tracing = { version = "0" }
|
||||||
|
tokio = { version = "1.28.2", features = ['full'] }
|
||||||
|
redis = { version = "0.23.0" }
|
||||||
|
deadpool = { version = "0.9.5" }
|
||||||
|
futures = { version = "0.3.28" }
|
163
crates/cache-adapter-redis/src/lib.rs
Normal file
163
crates/cache-adapter-redis/src/lib.rs
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use cache_adapter::{CResult, CacheAdapter, Config, Error, InvalidatePattern};
|
||||||
|
use redis::AsyncCommands;
|
||||||
|
use redis_async_pool::*;
|
||||||
|
use serde::de::DeserializeOwned;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tracing::{error, warn};
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct RedisConfig {
|
||||||
|
host: String,
|
||||||
|
port: u16,
|
||||||
|
namespace: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
type RedisPool = deadpool::managed::Pool<RedisConnectionManager>;
|
||||||
|
|
||||||
|
pub struct RedisCacheAdapter {
|
||||||
|
pub client: deadpool::managed::Pool<RedisConnectionManager>,
|
||||||
|
pub namespace: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RedisCacheAdapter {
|
||||||
|
pub async fn connect(config: RedisConfig) -> CResult<RedisCacheAdapter> {
|
||||||
|
let client = redis::Client::open((config.host, config.port)).map_err(|e| {
|
||||||
|
error!("Failed to connect to redis: {e}");
|
||||||
|
Error::Connect
|
||||||
|
})?;
|
||||||
|
let manager = RedisConnectionManager::new(client, true, None);
|
||||||
|
let client = RedisPool::builder(manager)
|
||||||
|
.max_size(5)
|
||||||
|
.build()
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("Unable to connect with Redis: {e}");
|
||||||
|
Error::Connect
|
||||||
|
})?;
|
||||||
|
Ok(Self {
|
||||||
|
client,
|
||||||
|
namespace: config.namespace,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn format_key(&self, key: &str) -> String {
|
||||||
|
self.namespace
|
||||||
|
.as_ref()
|
||||||
|
.map(|s| format!("{s}:{key}"))
|
||||||
|
.unwrap_or(key.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl CacheAdapter for RedisCacheAdapter {
|
||||||
|
async fn new(config: Config) -> CResult<Self> {
|
||||||
|
RedisCacheAdapter::connect(config.config().map_err(|e| {
|
||||||
|
error!("Failed to parse redis cache config: {e}");
|
||||||
|
Error::InvalidConfig
|
||||||
|
})?)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read<T>(&mut self, key: &str) -> CResult<Option<T>>
|
||||||
|
where
|
||||||
|
T: DeserializeOwned,
|
||||||
|
{
|
||||||
|
let mut conn = self.client.get().await.map_err(|e| {
|
||||||
|
error!("Failed to use pool connection to redis: {e}");
|
||||||
|
Error::Connect
|
||||||
|
})?;
|
||||||
|
let key = self.format_key(key);
|
||||||
|
let exists: bool = conn.exists(&key).await.map_err(|e| {
|
||||||
|
warn!("Failed to fetch {key:?} from redis: {e}");
|
||||||
|
Error::Connect
|
||||||
|
})?;
|
||||||
|
if !exists {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
let data: Vec<u8> = conn.get(&key).await.map_err(|e| {
|
||||||
|
warn!("Failed to fetch {key:?} from redis: {e}");
|
||||||
|
Error::Connect
|
||||||
|
})?;
|
||||||
|
let entry: T = bincode::deserialize(&data).map_err(|e| {
|
||||||
|
warn!("Malformed redis cache entry {key:?}: {e}");
|
||||||
|
Error::InvalidEntry
|
||||||
|
})?;
|
||||||
|
Ok(Some(entry))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn set<T>(&mut self, key: &str, data: T, expires_in: Option<Duration>) -> CResult<()>
|
||||||
|
where
|
||||||
|
T: Serialize + Send,
|
||||||
|
{
|
||||||
|
let mut conn = self.client.get().await.map_err(|e| {
|
||||||
|
error!("Failed to use pool connection to redis: {e}");
|
||||||
|
Error::Connect
|
||||||
|
})?;
|
||||||
|
let data = bincode::serialize(&data).map_err(|e| {
|
||||||
|
warn!("Malformed redis cache entry {key:?}: {e}");
|
||||||
|
Error::InvalidEntry
|
||||||
|
})?;
|
||||||
|
let key = self.format_key(&*key);
|
||||||
|
match expires_in {
|
||||||
|
Some(duration) => {
|
||||||
|
conn.set_ex(&*key, data, duration.as_secs() as usize)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
warn!("Failed to fetch {key:?} from redis: {e}");
|
||||||
|
Error::Connect
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
conn.set(&*key, data).await.map_err(|e| {
|
||||||
|
warn!("Failed to fetch {key:?} from redis: {e}");
|
||||||
|
Error::Connect
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn invalidate(&mut self, pattern: InvalidatePattern<'_>) -> CResult<u64> {
|
||||||
|
let mut conn = self.client.get().await.map_err(|e| {
|
||||||
|
error!("Failed to use pool connection to redis: {e}");
|
||||||
|
Error::Connect
|
||||||
|
})?;
|
||||||
|
let keys: Vec<String> = conn
|
||||||
|
.keys::<String, Vec<String>>(pattern.as_string())
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
warn!("Failed to fetch keys with pattern {pattern:?} from redis: {e}");
|
||||||
|
Error::Connect
|
||||||
|
})?
|
||||||
|
.iter()
|
||||||
|
.map(|s| self.format_key(s))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let size = conn.del::<'_, _, u64>(&keys).await.map_err(|e| {
|
||||||
|
warn!("Failed to invalidate keys with pattern {pattern:?} from redis: {e}");
|
||||||
|
Error::Invalidate(keys)
|
||||||
|
})?;
|
||||||
|
Ok(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn clear(&mut self) -> CResult<u64> {
|
||||||
|
self.invalidate(InvalidatePattern::StartsWith("".into()))
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trait AsRedisArgs {
|
||||||
|
fn as_string(&self) -> String;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsRedisArgs for InvalidatePattern<'_> {
|
||||||
|
fn as_string(&self) -> String {
|
||||||
|
match self {
|
||||||
|
InvalidatePattern::StartsWith(s) => format!("{s}*"),
|
||||||
|
InvalidatePattern::EndsWith(s) => format!("*{s}"),
|
||||||
|
InvalidatePattern::Contains(s) => format!("*{s}*"),
|
||||||
|
InvalidatePattern::Const(s) => format!("{s}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
10
crates/cache-adapter/Cargo.toml
Normal file
10
crates/cache-adapter/Cargo.toml
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
[package]
|
||||||
|
name = "cache-adapter"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
async-trait = { version = "0.1.68" }
|
||||||
|
serde = { version = "1.0.163", feeatures = ['derive'] }
|
||||||
|
thiserror = { version = "1.0.40" }
|
||||||
|
toml = { version = "0.7.3" }
|
57
crates/cache-adapter/src/lib.rs
Normal file
57
crates/cache-adapter/src/lib.rs
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
use std::borrow::Cow;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("Unable to connect")]
|
||||||
|
Connect,
|
||||||
|
#[error("Entry is invalid")]
|
||||||
|
InvalidEntry,
|
||||||
|
#[error("Can't invalidate keys {0:?}")]
|
||||||
|
Invalidate(Vec<String>),
|
||||||
|
#[error("Failed to parse config")]
|
||||||
|
InvalidConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type CResult<T> = Result<T, Error>;
|
||||||
|
|
||||||
|
pub struct Config(pub String);
|
||||||
|
|
||||||
|
impl Config {
|
||||||
|
pub fn config<S: serde::de::DeserializeOwned>(self) -> Result<S, toml::de::Error> {
|
||||||
|
toml::from_str(&self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum InvalidatePattern<'s> {
|
||||||
|
StartsWith(Cow<'s, str>),
|
||||||
|
EndsWith(Cow<'s, str>),
|
||||||
|
Contains(Cow<'s, str>),
|
||||||
|
Const(Cow<'s, str>),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait CacheAdapter: Sized {
|
||||||
|
async fn new(config: Config) -> CResult<Self>;
|
||||||
|
|
||||||
|
async fn read<T>(&mut self, key: &str) -> CResult<Option<T>>
|
||||||
|
where
|
||||||
|
T: serde::de::DeserializeOwned;
|
||||||
|
|
||||||
|
async fn set<T>(
|
||||||
|
&mut self,
|
||||||
|
key: &str,
|
||||||
|
data: T,
|
||||||
|
expires_in: Option<std::time::Duration>,
|
||||||
|
) -> CResult<()>
|
||||||
|
where
|
||||||
|
T: serde::Serialize + Send;
|
||||||
|
|
||||||
|
async fn invalidate(&mut self, pattern: InvalidatePattern<'_>) -> CResult<u64>;
|
||||||
|
|
||||||
|
async fn clear(&mut self) -> CResult<u64>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct CacheStorage {}
|
@ -1,6 +1,6 @@
|
|||||||
#![feature(async_fn_in_trait)]
|
#![feature(async_fn_in_trait)]
|
||||||
|
|
||||||
pub use event_bus_messages::{Message, Msg};
|
pub use event_bus_messages::*;
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum EBError {
|
pub enum EBError {
|
||||||
@ -23,7 +23,7 @@ impl Config {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub trait MessageSend {
|
pub trait MessageSend {
|
||||||
async fn send(&mut self, msg: Msg) -> EBResult<()>;
|
async fn send(&mut self, topic: Topic, msg: Msg) -> EBResult<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait EventBus<Stream, Sender>
|
pub trait EventBus<Stream, Sender>
|
||||||
@ -33,3 +33,20 @@ where
|
|||||||
{
|
{
|
||||||
async fn connect(config: Config) -> Result<(Stream, Sender), ()>;
|
async fn connect(config: Config) -> Result<(Stream, Sender), ()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// For client-server purpose only
|
||||||
|
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub struct ClientMsg {
|
||||||
|
pub topic: Topic,
|
||||||
|
pub msg: Msg,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ClientMsg {
|
||||||
|
pub fn from_bytes(v: &[u8]) -> bincode::Result<Self> {
|
||||||
|
bincode::deserialize(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn to_bytes(&self) -> bincode::Result<Vec<u8>> {
|
||||||
|
bincode::serialize(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -7,3 +7,4 @@ edition = "2021"
|
|||||||
thiserror = { version = "1.0.40" }
|
thiserror = { version = "1.0.40" }
|
||||||
serde = { version = "1.0.162", features = ['derive'] }
|
serde = { version = "1.0.162", features = ['derive'] }
|
||||||
bincode = { version = "1.3.3" }
|
bincode = { version = "1.3.3" }
|
||||||
|
serde_json = { version = "1.0.96" }
|
||||||
|
@ -1,13 +1,48 @@
|
|||||||
|
use std::fmt::Debug;
|
||||||
|
|
||||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||||
pub enum Msg {
|
pub enum TestMsg {
|
||||||
Seek(usize),
|
|
||||||
Ping,
|
|
||||||
Pong,
|
|
||||||
Ack,
|
|
||||||
Acked,
|
|
||||||
Test1(usize, usize, usize, usize),
|
Test1(usize, usize, usize, usize),
|
||||||
Test2(u8, u16, u32, char, usize),
|
Test2(u8, u16, u32, char, usize),
|
||||||
LenControl(usize, usize, usize, usize),
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub enum CursorMsg {
|
||||||
|
Seek(u64),
|
||||||
|
Aligned,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub enum AckMsg {
|
||||||
|
Ack,
|
||||||
|
Acked,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub enum Msg {
|
||||||
|
Ping,
|
||||||
|
Pong,
|
||||||
|
Ack(AckMsg),
|
||||||
|
Cursor(CursorMsg),
|
||||||
|
Test(TestMsg),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Msg {
|
||||||
|
pub fn is_ack(&self) -> bool {
|
||||||
|
matches!(self, Msg::Ack(_))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_aligned(&self) -> bool {
|
||||||
|
matches!(self, Msg::Cursor(CursorMsg::Aligned))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Debug, PartialEq, Clone, Copy, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub enum Topic {
|
||||||
|
Test,
|
||||||
|
PingPong,
|
||||||
|
#[default]
|
||||||
|
Default,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Msg {
|
impl Msg {
|
||||||
@ -23,6 +58,7 @@ impl Msg {
|
|||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||||
pub struct Message {
|
pub struct Message {
|
||||||
pub offset: u64,
|
pub offset: u64,
|
||||||
|
pub topic: Topic,
|
||||||
pub payload: Msg,
|
pub payload: Msg,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "redis-event-bus"
|
name = "event-bus-redis"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
@ -3,7 +3,7 @@
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use event_bus_adapter::{Config, EBError, EBResult, EventBus, Message, MessageSend, Msg};
|
use event_bus_adapter::{Config, EBError, EBResult, EventBus, Message, MessageSend, Msg, Topic};
|
||||||
use futures_util::stream::{SplitSink, SplitStream};
|
use futures_util::stream::{SplitSink, SplitStream};
|
||||||
use futures_util::{SinkExt, StreamExt};
|
use futures_util::{SinkExt, StreamExt};
|
||||||
use redis_async::client::connect::RespConnection;
|
use redis_async::client::connect::RespConnection;
|
||||||
@ -14,10 +14,11 @@ use tracing::warn;
|
|||||||
pub struct MessageSender(SplitSink<RespConnection, RespValue>);
|
pub struct MessageSender(SplitSink<RespConnection, RespValue>);
|
||||||
|
|
||||||
impl MessageSend for MessageSender {
|
impl MessageSend for MessageSender {
|
||||||
async fn send(&mut self, msg: Msg) -> EBResult<()> {
|
async fn send(&mut self, topic: Topic, msg: Msg) -> EBResult<()> {
|
||||||
match (Message {
|
match (Message {
|
||||||
payload: msg,
|
payload: msg,
|
||||||
offset: 0,
|
offset: 0,
|
||||||
|
topic,
|
||||||
})
|
})
|
||||||
.to_bytes()
|
.to_bytes()
|
||||||
{
|
{
|
@ -1,35 +0,0 @@
|
|||||||
[package]
|
|
||||||
name = "local-event-bus"
|
|
||||||
version = "0.1.0"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
[[bin]]
|
|
||||||
name = "leb"
|
|
||||||
path = "./src/main.rs"
|
|
||||||
|
|
||||||
[[bin]]
|
|
||||||
name = "lebc"
|
|
||||||
path = "./src/check1.rs"
|
|
||||||
|
|
||||||
[lib]
|
|
||||||
name = "leb"
|
|
||||||
path = "./src/lib.rs"
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
event-bus-adapter = { path = "../event-bus-adapter" }
|
|
||||||
thiserror = { version = "1.0.40" }
|
|
||||||
futures-util = { version = "0.3.28" }
|
|
||||||
futures = { version = "0.3.28" }
|
|
||||||
tracing = { version = "0" }
|
|
||||||
serde = { version = "1.0.162", features = ['derive'] }
|
|
||||||
flumedb = { version = "*" }
|
|
||||||
serde_json = { version = "*" }
|
|
||||||
async-std = { version = "*", features = ["attributes"] }
|
|
||||||
tide = { version = "0.16.0" }
|
|
||||||
gumdrop = { version = "*" }
|
|
||||||
tide-websockets = "0.4.0"
|
|
||||||
bincode = { version = "1" }
|
|
||||||
uuid = { version = "1.3.2", features = ['v4'] }
|
|
||||||
crossbeam-channel = { version = "0.5.8" }
|
|
||||||
tracing-subscriber = { version = "0.3.17", features = ['env-filter'] }
|
|
||||||
memmap2 = { version = "0.6.2" }
|
|
@ -1,39 +0,0 @@
|
|||||||
use async_std::stream::StreamExt;
|
|
||||||
use async_std::task::spawn;
|
|
||||||
use event_bus_adapter::{Config, EventBus, MessageSend, Msg};
|
|
||||||
use leb::{LocalEventBus, MessageSender, MessageStream};
|
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
#[async_std::main]
|
|
||||||
async fn main() {
|
|
||||||
tracing_subscriber::fmt::init();
|
|
||||||
|
|
||||||
let config = Config("bind = \"localhost\"\nport = 8686".into());
|
|
||||||
let (mut stream, mut sender) =
|
|
||||||
<LocalEventBus as EventBus<MessageStream, MessageSender>>::connect(config)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let mut sender2 = sender.clone();
|
|
||||||
|
|
||||||
info!("Send seek 0");
|
|
||||||
MessageSend::send(&mut sender, Msg::Seek(0)).await.unwrap();
|
|
||||||
info!("After seek 0");
|
|
||||||
|
|
||||||
info!("Send test1");
|
|
||||||
MessageSend::send(&mut sender, Msg::Test1(1, 2, 3, 4))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
info!("after test1");
|
|
||||||
|
|
||||||
info!("Send test2");
|
|
||||||
MessageSend::send(&mut sender, Msg::Test2(1, 2, 3, 'a', 5))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
info!("after test2");
|
|
||||||
|
|
||||||
while let Some(msg) = stream.next().await {
|
|
||||||
info!("Received: {msg:?}");
|
|
||||||
sender2.ack().await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,140 +0,0 @@
|
|||||||
#![feature(async_fn_in_trait)]
|
|
||||||
|
|
||||||
use std::future::Future;
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::sync::{Arc, RwLock};
|
|
||||||
use std::task::{Context, Poll};
|
|
||||||
|
|
||||||
use async_std::io::ReadExt;
|
|
||||||
use async_std::task::{sleep, spawn};
|
|
||||||
use event_bus_adapter::*;
|
|
||||||
use futures_util::AsyncWriteExt;
|
|
||||||
use tracing::{debug, error, warn};
|
|
||||||
|
|
||||||
pub struct MessageStream {
|
|
||||||
client: async_std::net::TcpStream,
|
|
||||||
connected: Arc<RwLock<bool>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl futures::stream::Stream for MessageStream {
|
|
||||||
type Item = Message;
|
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
||||||
let mut_self: &mut Self = unsafe { Pin::get_unchecked_mut(self) };
|
|
||||||
|
|
||||||
if !*mut_self.connected.read().unwrap() {
|
|
||||||
return Poll::Ready(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
const LEN_SIZE: usize = std::mem::size_of::<usize>();
|
|
||||||
let mut len_buf = [0; LEN_SIZE];
|
|
||||||
|
|
||||||
let mut f = mut_self.client.read_exact(&mut len_buf);
|
|
||||||
let r = Pin::new(&mut f).poll(cx);
|
|
||||||
let len = match r {
|
|
||||||
Poll::Ready(Ok(())) => usize::from_le_bytes(len_buf),
|
|
||||||
Poll::Ready(Err(e)) => {
|
|
||||||
error!("Failed to read from message stream: {e}");
|
|
||||||
return Poll::Ready(None);
|
|
||||||
}
|
|
||||||
Poll::Pending => return Poll::Pending,
|
|
||||||
};
|
|
||||||
let mut buffer = vec![0; len];
|
|
||||||
let mut f = mut_self.client.read_exact(&mut buffer[..len]);
|
|
||||||
let r = Pin::new(&mut f).poll(cx);
|
|
||||||
match r {
|
|
||||||
Poll::Ready(Ok(())) => match Message::from_bytes(&buffer[..]) {
|
|
||||||
Ok(msg) => Poll::Ready(Some(msg)),
|
|
||||||
_ => Poll::Ready(None),
|
|
||||||
},
|
|
||||||
Poll::Ready(Err(e)) => {
|
|
||||||
error!("Failed to read from message stream: {e}");
|
|
||||||
Poll::Ready(None)
|
|
||||||
}
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct MessageSender {
|
|
||||||
client: async_std::net::TcpStream,
|
|
||||||
connected: Arc<RwLock<bool>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MessageSender {
|
|
||||||
pub async fn ack(&mut self) -> EBResult<()> {
|
|
||||||
self.send(Msg::Ack).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MessageSend for MessageSender {
|
|
||||||
async fn send(&mut self, msg: Msg) -> EBResult<()> {
|
|
||||||
if !*self.connected.read().unwrap() {
|
|
||||||
return Err(EBError::Closed);
|
|
||||||
}
|
|
||||||
let Ok(msg_buf) = msg.to_bytes() else {
|
|
||||||
return Err(EBError::SendFailed);
|
|
||||||
};
|
|
||||||
let len = msg_buf.len();
|
|
||||||
let mut len_buf = len.to_le_bytes().to_vec();
|
|
||||||
len_buf.extend_from_slice(&msg_buf);
|
|
||||||
debug!("Sending {msg:?} with {len} as {len_buf:?}");
|
|
||||||
// debug!("Parts {len_buf:?} {msg_buf:?}");
|
|
||||||
|
|
||||||
if let Err(e) = self.client.write(&len_buf).await {
|
|
||||||
warn!("Failed to write message {msg:?}: {e}");
|
|
||||||
return Err(EBError::SendFailed);
|
|
||||||
}
|
|
||||||
// if let Err(e) = self.client.write(&v).await {
|
|
||||||
// warn!("Failed to write message {msg:?}: {e}");
|
|
||||||
// return Err(EBError::SendFailed);
|
|
||||||
// }
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
|
||||||
pub struct LebConfig {
|
|
||||||
bind: String,
|
|
||||||
port: u16,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct LocalEventBus;
|
|
||||||
|
|
||||||
impl EventBus<MessageStream, MessageSender> for LocalEventBus {
|
|
||||||
async fn connect(config: Config) -> Result<(MessageStream, MessageSender), ()> {
|
|
||||||
let config: LebConfig = config.config().expect("Invalid Local Event Bus config");
|
|
||||||
let client = async_std::net::TcpStream::connect(format!("{}:{}", config.bind, config.port))
|
|
||||||
.await
|
|
||||||
.expect("Failed tp connect to event bus");
|
|
||||||
client.set_nodelay(false).unwrap();
|
|
||||||
|
|
||||||
let connected = Arc::new(RwLock::new(true));
|
|
||||||
let stream = MessageStream {
|
|
||||||
client: client.clone(),
|
|
||||||
connected: connected.clone(),
|
|
||||||
};
|
|
||||||
let sender = MessageSender {
|
|
||||||
client,
|
|
||||||
connected: connected.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
{
|
|
||||||
let connected = connected.clone();
|
|
||||||
let mut sender = sender.clone();
|
|
||||||
spawn(async move {
|
|
||||||
loop {
|
|
||||||
if sender.send(Msg::Pong).await.is_err() {
|
|
||||||
*connected.write().unwrap() = false;
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
sleep(std::time::Duration::from_millis(300)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((stream, sender))
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,332 +0,0 @@
|
|||||||
use std::collections::HashMap;
|
|
||||||
use std::net::Shutdown;
|
|
||||||
use std::sync::{Arc, RwLock};
|
|
||||||
|
|
||||||
use async_std::net::{TcpListener, TcpStream};
|
|
||||||
use async_std::task::sleep;
|
|
||||||
use event_bus_adapter::*;
|
|
||||||
use flumedb::*;
|
|
||||||
use futures_util::{AsyncReadExt, AsyncWriteExt, StreamExt};
|
|
||||||
use gumdrop::Options;
|
|
||||||
use tracing::{debug, error, info, warn};
|
|
||||||
use tracing_subscriber::EnvFilter;
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
|
||||||
enum Error {
|
|
||||||
#[error("Reserved")]
|
|
||||||
Reversed,
|
|
||||||
#[error("Client closed connection")]
|
|
||||||
BrokenPipe,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Options)]
|
|
||||||
struct Opts {
|
|
||||||
help: bool,
|
|
||||||
log_path: Option<String>,
|
|
||||||
bind: Option<String>,
|
|
||||||
port: Option<u16>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_std::main]
|
|
||||||
async fn main() -> Result<(), Error> {
|
|
||||||
tracing_subscriber::fmt()
|
|
||||||
.with_env_filter(EnvFilter::from_default_env())
|
|
||||||
.init();
|
|
||||||
|
|
||||||
let opts: Opts = Options::parse_args_default_or_exit();
|
|
||||||
let path = opts
|
|
||||||
.log_path
|
|
||||||
.unwrap_or_else(|| std::env::var("LOG_PATH").expect("No path to save file"));
|
|
||||||
|
|
||||||
let listener = TcpListener::bind(format!(
|
|
||||||
"{}:{}",
|
|
||||||
opts.bind
|
|
||||||
.unwrap_or_else(|| std::env::var("BIND").expect("No bind parameter")),
|
|
||||||
opts.port.unwrap_or_else(|| std::env::var("PORT")
|
|
||||||
.expect("No port parameter")
|
|
||||||
.parse()
|
|
||||||
.expect("Invalid port format. Expect number"))
|
|
||||||
))
|
|
||||||
.await
|
|
||||||
.expect("Failed to start server");
|
|
||||||
|
|
||||||
let mut incoming = listener.incoming();
|
|
||||||
|
|
||||||
let clients = Arc::new(RwLock::new(HashMap::with_capacity(4086)));
|
|
||||||
let (tx, rx) = std::sync::mpsc::channel();
|
|
||||||
|
|
||||||
{
|
|
||||||
let clients = clients.clone();
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
loop {
|
|
||||||
let Ok(msg) = rx.recv() else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
match msg {
|
|
||||||
InnerMsg::Drop(uuid) => {
|
|
||||||
clients.write().unwrap().remove(&uuid);
|
|
||||||
}
|
|
||||||
InnerMsg::Register(client) => {
|
|
||||||
clients.write().unwrap().insert(client.id, client);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
while let Some(stream) = incoming.next().await {
|
|
||||||
let path = path.clone();
|
|
||||||
let tx = tx.clone();
|
|
||||||
let clients = clients.clone();
|
|
||||||
|
|
||||||
async_std::task::spawn(async move {
|
|
||||||
let Ok(mut stream) = stream else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
stream.set_nodelay(false).unwrap();
|
|
||||||
|
|
||||||
let mut log = OffsetLog::<u64>::from_file(
|
|
||||||
std::fs::OpenOptions::new()
|
|
||||||
.append(true)
|
|
||||||
.create(true)
|
|
||||||
.write(true)
|
|
||||||
.read(true)
|
|
||||||
.open(&path)
|
|
||||||
.expect("Failed to open log file"),
|
|
||||||
)
|
|
||||||
.expect("Failed to open log file");
|
|
||||||
|
|
||||||
let mut client = Client {
|
|
||||||
stream,
|
|
||||||
id: Uuid::new_v4(),
|
|
||||||
alive: Arc::new(RwLock::new(true)),
|
|
||||||
offset: None,
|
|
||||||
};
|
|
||||||
let mut buffer = Vec::with_capacity(4086);
|
|
||||||
|
|
||||||
if let Err(e) = tx.send(InnerMsg::Register(client.clone())) {
|
|
||||||
warn!("Failed to send register: {e}");
|
|
||||||
}
|
|
||||||
|
|
||||||
while client.is_alive() {
|
|
||||||
let Ok(Some(msg)) = read_msg(client.clone(), &mut buffer).await else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
match &msg {
|
|
||||||
Msg::Ping | Msg::Pong => {
|
|
||||||
if let Err(e) = client
|
|
||||||
.stream
|
|
||||||
.write_all(&Msg::Ping.to_bytes().unwrap_or_default())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
warn!("Failed to write to client. Closing. {e}");
|
|
||||||
sleep(std::time::Duration::from_millis(300)).await;
|
|
||||||
client.stream.close().await.ok();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Msg::Seek(offset) => {
|
|
||||||
let log_file = match OffsetLog::<u64>::open_read_only(&path) {
|
|
||||||
Ok(f) => f,
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to open file for read: {e}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let mut offset = (*offset) as u64;
|
|
||||||
|
|
||||||
let iter = log_file.bidir_iter_at_offset(offset);
|
|
||||||
let mut iter =
|
|
||||||
iter.filter_map(|e| Msg::from_bytes(&e.data).ok().zip(Some(e.offset)));
|
|
||||||
|
|
||||||
while let Some((msg, new_offset)) = iter.next() {
|
|
||||||
match send_directly(client.clone(), new_offset, msg).await {
|
|
||||||
Err(Error::BrokenPipe) => break,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed to send message: {e}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
if let Err(Error::BrokenPipe) =
|
|
||||||
wait_ack(client.clone(), &mut buffer).await
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
offset = new_offset;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
client.offset = Some(Arc::new(RwLock::new(offset)));
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
info!("Incoming message: {msg:?}");
|
|
||||||
if client.offset.is_none() {
|
|
||||||
warn!("Offset is not set. Skipping...");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let bytes = match msg.to_bytes() {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed to serialize message: {e}");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
debug!("Msg bytes {:?}", bytes.as_slice());
|
|
||||||
let offset = match log.append(&bytes[..]) {
|
|
||||||
Ok(offset) => offset,
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to write message to file: {e}");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if let Err(e) = broadcast(clients.clone(), &mut buffer, offset, msg).await {
|
|
||||||
warn!("Failed to broadcast message: {e}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
client.close();
|
|
||||||
if let Err(e) = tx.send(InnerMsg::Drop(client.id)) {
|
|
||||||
warn!("Failed to send close client: {e}");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn broadcast(
|
|
||||||
clients: Arc<RwLock<HashMap<Uuid, Client>>>,
|
|
||||||
buffer: &mut Vec<u8>,
|
|
||||||
offset: u64,
|
|
||||||
msg: Msg,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let clients = clients
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.values()
|
|
||||||
.filter(|c| *c.alive.read().unwrap())
|
|
||||||
.map(Clone::clone)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
for client in clients {
|
|
||||||
let id = client.id;
|
|
||||||
if let Err(e) = send_directly(client.clone(), offset, msg.clone()).await {
|
|
||||||
warn!("Failed to send message to {}: {e}", id);
|
|
||||||
}
|
|
||||||
if let Err(Error::BrokenPipe) = wait_ack(client, buffer).await {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send_directly(mut client: Client, offset: u64, msg: Msg) -> Result<(), Error> {
|
|
||||||
let msg_buf = match (Message {
|
|
||||||
offset,
|
|
||||||
payload: msg,
|
|
||||||
})
|
|
||||||
.to_bytes()
|
|
||||||
{
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed to serialize message: {e}");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let len = msg_buf.len();
|
|
||||||
let mut len_buf = len.to_le_bytes().to_vec();
|
|
||||||
len_buf.extend_from_slice(&msg_buf);
|
|
||||||
|
|
||||||
if let Err(e) = client.stream.write(&len_buf).await {
|
|
||||||
warn!("Failed to write message to client: {e}");
|
|
||||||
client.close();
|
|
||||||
Err(Error::BrokenPipe)
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_msg(mut client: Client, buffer: &mut Vec<u8>) -> Result<Option<Msg>, Error> {
|
|
||||||
const LEN_SIZE: usize = std::mem::size_of::<usize>();
|
|
||||||
|
|
||||||
let mut len_buf = [0; LEN_SIZE];
|
|
||||||
let len = match AsyncReadExt::read_exact(&mut client.stream, &mut len_buf).await {
|
|
||||||
Ok(()) => {
|
|
||||||
// debug!("Received length bytes {len_buf:?}");
|
|
||||||
usize::from_le_bytes(len_buf)
|
|
||||||
}
|
|
||||||
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
|
|
||||||
client.close();
|
|
||||||
return Err(Error::BrokenPipe);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed to read msg len from client: {e}");
|
|
||||||
return Err(Error::BrokenPipe);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
buffer.clear();
|
|
||||||
buffer.reserve(len);
|
|
||||||
buffer.resize(len, 0);
|
|
||||||
// debug!("Length is {len}");
|
|
||||||
// debug!("FRAME: {len_buf:?} {buffer:?}");
|
|
||||||
|
|
||||||
let _l = match AsyncReadExt::read_exact(&mut client.stream, &mut buffer[..len]).await {
|
|
||||||
Ok(n) => n,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed to read from client: {e}");
|
|
||||||
return Err(Error::BrokenPipe);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// debug!("Incoming data: {len} {buffer:?}");
|
|
||||||
match Msg::from_bytes(&buffer) {
|
|
||||||
Ok(msg) => {
|
|
||||||
if !matches!(msg, Msg::Pong) {
|
|
||||||
debug!("Message {msg:?}");
|
|
||||||
}
|
|
||||||
Ok(Some(msg))
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Invalid incoming message {buffer:?}: {e}");
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn wait_ack(client: Client, buffer: &mut Vec<u8>) -> Result<(), Error> {
|
|
||||||
loop {
|
|
||||||
buffer.clear();
|
|
||||||
let msg = read_msg(client.clone(), buffer).await?;
|
|
||||||
if let Some(Msg::Ack) = msg {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
enum InnerMsg {
|
|
||||||
Register(Client),
|
|
||||||
Drop(Uuid),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct Client {
|
|
||||||
stream: TcpStream,
|
|
||||||
id: Uuid,
|
|
||||||
alive: Arc<RwLock<bool>>,
|
|
||||||
offset: Option<Arc<RwLock<u64>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Client {
|
|
||||||
fn close(&mut self) {
|
|
||||||
*self.alive.write().unwrap() = false;
|
|
||||||
self.stream.shutdown(Shutdown::Both).ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_alive(&self) -> bool {
|
|
||||||
*self.alive.read().unwrap()
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user