Working basic server

This commit is contained in:
eraden 2023-09-16 21:55:18 +02:00
parent b05120c91d
commit 05e8baa6fb
3 changed files with 420 additions and 315 deletions

282
Cargo.lock generated
View File

@ -17,6 +17,15 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aho-corasick"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783"
dependencies = [
"memchr",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -71,6 +80,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "convert_case"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]]
name = "deranged"
version = "0.3.8"
@ -80,6 +95,19 @@ dependencies = [
"serde",
]
[[package]]
name = "derive_more"
version = "0.99.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
dependencies = [
"convert_case",
"proc-macro2",
"quote",
"rustc_version",
"syn 1.0.109",
]
[[package]]
name = "futures"
version = "0.3.28"
@ -136,7 +164,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.29",
]
[[package]]
@ -186,6 +214,26 @@ version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
[[package]]
name = "gumdrop"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bc700f989d2f6f0248546222d9b4258f5b02a171a431f8285a81c08142629e3"
dependencies = [
"gumdrop_derive",
]
[[package]]
name = "gumdrop_derive"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729f9bd3449d77e7831a18abfb7ba2f99ee813dfd15b8c2167c9a54ba20aa99d"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "hermit-abi"
version = "0.3.2"
@ -198,6 +246,12 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.147"
@ -214,6 +268,21 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "memchr"
version = "2.6.2"
@ -259,8 +328,10 @@ checksum = "1d6b1cf1c2ae7c8c3898cbf8354ee836bc7037e35592d3739a9901d53c97b6a2"
name = "nats-server"
version = "0.1.0"
dependencies = [
"derive_more",
"futures",
"futures-channel",
"gumdrop",
"multipeek",
"nix",
"pin-project-lite",
@ -269,6 +340,8 @@ dependencies = [
"thiserror",
"time",
"tokio",
"tracing",
"tracing-subscriber",
"uuid",
]
@ -284,6 +357,16 @@ dependencies = [
"memoffset",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
@ -303,6 +386,18 @@ dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -365,12 +460,65 @@ dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "regex"
version = "1.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.3.8",
"regex-syntax 0.7.5",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
name = "regex-automata"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.7.5",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da"
[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]]
name = "ryu"
version = "1.0.15"
@ -383,6 +531,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "semver"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918"
[[package]]
name = "serde"
version = "1.0.188"
@ -400,7 +554,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.29",
]
[[package]]
@ -414,6 +568,15 @@ dependencies = [
"serde",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
@ -448,6 +611,17 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "syn"
version = "1.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "syn"
version = "2.0.29"
@ -476,7 +650,17 @@ checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.29",
]
[[package]]
name = "thread_local"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]]
@ -533,7 +717,69 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.29",
]
[[package]]
name = "tracing"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
]
[[package]]
name = "tracing-core"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
@ -551,12 +797,40 @@ dependencies = [
"getrandom",
]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.48.0"

View File

@ -3,9 +3,15 @@ name = "nats-server"
version = "0.1.0"
edition = "2021"
[[test]]
name = "test_connect"
path = "./tests/test_connect.rs"
[dependencies]
derive_more = "0.99.17"
futures = "0.3.28"
futures-channel = "0.3.28"
gumdrop = "0.8.1"
multipeek = "0.1.2"
nix = { version = "0.27.1", features = ["socket", "hostname", "net", "dir"] }
pin-project-lite = "0.2.13"
@ -14,4 +20,6 @@ serde_json = "1.0.105"
thiserror = "1.0.47"
time = { version = "0.3.28", features = ["serde"] }
tokio = { version = "1.32.0", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
uuid = { version = "1.4.1", features = ["v4"] }

View File

@ -1,10 +1,13 @@
#![allow(dead_code)]
#![feature(async_fn_in_trait)]
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::mpsc::Sender;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{
@ -14,196 +17,56 @@ use tokio::{
spawn,
sync::{Mutex, RwLock},
};
use uuid::Uuid;
use multipeek::multipeek;
use tracing::{debug, info};
#[derive(Debug)]
pub struct Sub(String);
mod input;
use input::*;
mod output;
use output::*;
mod model;
use model::*;
mod client;
use client::*;
mod server;
use server::*;
mod error;
use error::*;
impl Sub {
pub fn new<S: Into<String>>(s: S) -> Self {Self(s.into())}
pub fn is_wildcard(&self) -> bool {
self.0.contains('*')
}
fn eq_with_wild(&self, chan: &str) -> bool {
let mut left = self.0.chars().peekable();
let mut right = multipeek(chan.chars());
while let Some(c) = left.peek() {
if *c == '*' {
while let Some('*') = left.peek() {
eprintln!("while let * l => {:?}", left.next());
eprintln!("while let * r => {:?}", right.next()); // consume right to match number of wildcard
}
// check non-wildcard
let pair = (left.peek(), right.peek());
eprintln!("after wild {pair:?}");
let current = match pair {
(Some(c), _) => *c,
(None, _) => return true,
};
while let Some(c) = right.peek_nth(0).cloned() {
eprintln!("skipping to {current} -> {c}");
let second = right.peek_nth(1).cloned();
if c == current && second != Some(current) {
break;
} else {
right.next();
}
}
let pair = (left.next(), right.next());
eprintln!("after wild {pair:?}");
match pair {
(None, _) => return true,
(Some(l), Some(r)) if l != r => return false,
(Some(_), None) => return false,
_ => {}
};
} else {
let pair = (left.next(), right.next());
eprintln!("non wild {pair:?}");
match pair {
(None, None) => return true,
(Some(l), Some(r)) if l != r => return false,
_ => {}
};
}
}
left.next() == right.next()
}
pub fn eq_with_chan(&self, chan: &str) -> bool {
if self.is_wildcard() {
println!("is wild");
self.eq_with_wild(chan)
} else {
println!("not wild {} {}", self.0.len(), chan.len());
if self.0.len() != chan.len() {
return false;
}
self.0.eq_ignore_ascii_case(&chan)
}
}
#[derive(gumdrop::Options)]
struct Args {
help: bool,
port: Option<u16>,
addr: Option<String>,
max_payload: Option<usize>,
}
#[cfg(test)]
mod sub_tests {
use super::*;
#[test]
fn check_simple() {
println!("================================================");
println!("check simple");
assert_eq!(Sub::new("foo").eq_with_chan("foo"), true);
assert_eq!(Sub::new("foo").eq_with_chan("fo"), false);
assert_eq!(Sub::new("foo").eq_with_chan("oo"), false);
}
#[test]
fn check_ends_wild() {
println!("================================================");
println!("check ends");
assert_eq!(Sub::new("fo*").eq_with_chan("foo"), true);
assert_eq!(Sub::new("f*").eq_with_chan("foo"), true);
assert_eq!(Sub::new("f*").eq_with_chan("foz"), true);
assert_eq!(Sub::new("f*").eq_with_chan("ff"), true);
assert_eq!(Sub::new("f*").eq_with_chan("of"), false);
assert_eq!(Sub::new("f*").eq_with_chan("ofz"), false);
}
#[test]
fn check_starts_wild() {
println!("================================================");
println!("check starts");
assert_eq!(Sub::new("*oo").eq_with_chan("foo"), true);
assert_eq!(Sub::new("*ofo").eq_with_chan("acfofo"), true);
assert_eq!(Sub::new("*o").eq_with_chan("foo"), true);
assert_eq!(Sub::new("*o").eq_with_chan("bar"), false);
assert_eq!(Sub::new("*o").eq_with_chan("foz"), false);
}
#[test]
fn check_middle_wild() {
println!("================================================");
println!("check middle");
assert_eq!(Sub::new("a*z").eq_with_chan("ahz"), true);
assert_eq!(Sub::new("a*z").eq_with_chan("ahkz"), true);
assert_eq!(Sub::new("a*z").eq_with_chan("hakz"), false);
assert_eq!(Sub::new("a*z").eq_with_chan("akza"), false);
}
#[test]
fn check_multiple_wild() {
println!("================================================");
println!("check multi");
assert_eq!(Sub::new("*a*z").eq_with_chan("gahz"), true);
assert_eq!(Sub::new("*a*z").eq_with_chan("ahz"), false);
assert_eq!(Sub::new("a*k*z").eq_with_chan("asklz"), true);
assert_eq!(Sub::new("a*k*z").eq_with_chan("sklz"), false);
assert_eq!(Sub::new("a*k*z").eq_with_chan("saklz"), false);
assert_eq!(Sub::new("a*k*z").eq_with_chan("asklzl"), false);
}
}
#[derive(Debug, Clone)]
struct Server {
clients: Arc<RwLock<HashMap<Uuid, Client>>>,
subcriptions: Arc<RwLock<Vec<(Sub, Uuid)>>>,
}
impl Server {
pub async fn register(&mut self, client: Client) {
self.clients.write().await.insert(client.id, client);
}
pub async fn remove(&mut self, client: Client) {
*client.dead.write().await = true;
self.clients.write().await.remove(&client.id);
}
}
#[derive(Debug, Clone)]
struct Client {
id: uuid::Uuid,
writer: Arc<Mutex<OwnedWriteHalf>>,
dead: Arc<RwLock<bool>>,
}
impl Client {
pub fn new(write: OwnedWriteHalf) -> Self {
Self {
id: uuid::Uuid::new_v4(),
writer: Arc::new(Mutex::new(write)),
dead: Arc::new(RwLock::new(false)),
}
}
pub async fn write(&self, s: Cow<'static, str>) -> Result<(), CmdErr> {
let mut l = self.writer.lock().await;
l.write(format!("{s}\r\n").as_bytes())
.await
.map_err(|e| {
eprintln!("{e}");
CmdErr::WriteFailed
})
.map(|_| ())
}
pub fn init_logger() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
.init();
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let listener = tokio::net::TcpListener::bind(("0.0.0.0", 4222))
init_logger();
let args: Args = gumdrop::Options::parse_args_default_or_exit();
let server = Server::new(
&args.addr.unwrap_or_else(|| "0.0.0.0".into()),
args.port.unwrap_or(4222),
args.max_payload.unwrap_or(4096),
);
info!("Starting server at {}:{}", server.addr, server.port);
let listener = tokio::net::TcpListener::bind((server.addr.clone().as_str(), server.port))
.await
.unwrap();
listener.set_ttl(100).expect("Failed to set TTL");
while let Ok((stream, _addr)) = listener.accept().await {
while let Ok((stream, addr)) = listener.accept().await {
info!("new connection from {addr:?}");
let server = server.clone();
spawn(async move {
handle_connection(stream).await;
handle_connection(server.clone(), addr, stream).await;
});
}
Ok(())
@ -214,13 +77,14 @@ struct Context {
last_live_msg: Arc<RwLock<time::OffsetDateTime>>,
}
async fn handle_connection(stream: TcpStream) {
async fn handle_connection(server: Server, addr: SocketAddr, stream: TcpStream) {
let ctx = Context {
last_live_msg: Arc::new(RwLock::new(time::OffsetDateTime::now_utc())),
};
let is_alive = spawn(start_stream_health_check(ctx.clone()));
let handle_commands = spawn(handle_commands(stream, ctx));
let (read, client, dead_notif) = server.create_client(stream).await;
let is_alive = spawn(start_stream_health_check(ctx.clone(), dead_notif));
let handle_commands = spawn(handle_commands(server.clone(), addr, ctx, read, client));
tokio::select!(
_ = is_alive => (),
@ -228,38 +92,68 @@ async fn handle_connection(stream: TcpStream) {
);
}
async fn handle_commands(stream: TcpStream, ctx: Context) {
let (mut rx, mut tx) = stream.into_split();
let mut buf = Vec::with_capacity(4096);
async fn handle_commands(
server: Server,
addr: SocketAddr,
ctx: Context,
mut rx: OwnedReadHalf,
client: Client,
) {
let mut buf = Vec::with_capacity(server.max_payload);
while let Ok(_) = handle_incomming_cmd(&mut rx, &mut tx, &mut buf, ctx.clone()).await {}
let conn_info = ConnectionInfo {
client_ip: addr.to_string(),
server_id: "asdajsdjasdjas".into(),
server_name: "foo.bar".into(),
version: env!("CARGO_PKG_VERSION"),
proto: 1,
host: server.addr.clone(),
port: server.port,
client_id: client.id().await,
max_payload: server.max_payload,
};
client
.write(format!("{}", Info::new(conn_info)))
.await
.unwrap();
while let Ok(_) = handle_incomming_cmd(
&mut rx,
client.clone(),
&mut buf,
ctx.clone(),
server.clone(),
)
.await
{}
}
async fn handle_incomming_cmd(
rx: &mut OwnedReadHalf,
tx: &mut OwnedWriteHalf,
client: Client,
buf: &mut Vec<u8>,
ctx: Context,
server: Server,
) -> Result<(), CmdErr> {
let Ok(_n) = rx.read_buf(buf).await else {
buf.clear();
debug!("Reading message....");
let Some(pos) = read_to_msg_end(rx, buf).await else {
return Ok(());
};
let pos = match buf.iter().position(|i| *i == b'\r') {
Some(0) | None => return Ok(()),
Some(n) => n,
};
let Ok(msg) = std::str::from_utf8(&buf[..pos]) else {
return Ok(());
};
eprintln!("buffer msg is: {msg:?}");
debug!("buffer msg is: {msg:?}");
for msg in msg.to_owned().split('\r') {
eprintln!("Message is: {:?}", msg);
match handle_msg(msg, buf, rx, tx).await {
debug!("Message is: {:?}", msg);
let msg = msg.trim();
match handle_msg(msg, buf, rx, client.clone(), server.clone()).await {
Err(e) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
return Err(e);
}
Ok(Op::NoOp) => {}
Ok(()) => {}
};
}
*ctx.last_live_msg.write().await = time::OffsetDateTime::now_utc();
@ -267,11 +161,37 @@ async fn handle_incomming_cmd(
Ok(())
}
async fn start_stream_health_check(ctx: Context) {
async fn read_to_msg_end(rx: &mut OwnedReadHalf, buf: &mut Vec<u8>) -> Option<usize> {
loop {
match rx.read_buf(buf).await {
Ok(n) => {
debug!("received {n} bytes");
}
Err(e) => {
debug!("Failed to read from rx: {e}");
return None;
}
};
let caret_pos = buf.iter().rposition(|i| *i == b'\r');
match caret_pos {
None => continue,
Some(0) => {
debug!("Return caret does not exists or is head. Skipping...: {caret_pos:?}");
return None;
}
Some(n) => return Some(n),
};
}
}
async fn start_stream_health_check(ctx: Context, dead_notif: Sender<()>) {
const PING_PONG_SPAN: time::Duration = time::Duration::seconds(1);
loop {
if *ctx.last_live_msg.read().await + PING_PONG_SPAN >= OffsetDateTime::now_utc() {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
continue;
} else {
dead_notif.send(()).await.unwrap();
}
break;
}
@ -281,129 +201,32 @@ async fn handle_msg<'stream>(
msg: &str,
buf: &mut Vec<u8>,
rx: &mut OwnedReadHalf,
tx: &mut OwnedWriteHalf,
) -> Result<Op, CmdErr> {
let cmd: Cmd = match Cmd::parse_command(msg, buf, rx).await {
client: Client,
server: Server,
) -> Result<(), CmdErr> {
let fut = Cmd::parse_command(msg, buf, rx);
let cmd: Cmd = match fut.await {
Ok(cmd) => cmd,
Err(e) => {
let msg = format!("-ERR '{e}'\r\n");
tx.write(msg.as_bytes()).await.unwrap();
let msg = format!("-ERR '{e}'");
client.write(msg).await.unwrap();
return Err(e);
}
};
match cmd {
Cmd::Connect(_) => {
tx.write(b"+OK\r\n").await.unwrap();
Ok(Op::NoOp)
client.ok().await;
}
Cmd::Publish {
channel_name,
payload,
len,
} => {
tx.write(b"+OK\r\n").await.unwrap();
tx.write(format!("received {channel_name:?} {len:?} {payload:?} \r\n").as_bytes())
.await
.unwrap();
Ok(Op::NoOp)
Cmd::Publish(publish) => {
client.ok().await;
server.publish(publish).await;
}
Cmd::Ping | Cmd::Pong => {
//
Ok(Op::NoOp)
Cmd::Ping | Cmd::Pong => {}
Cmd::Subscribe(subscribe) => {
server.add_sub(client.uid().await, subscribe).await;
client.ok().await;
}
_ => Ok(Op::NoOp),
}
}
enum Op {
NoOp,
}
const PING_PONG_SPAN: time::Duration = time::Duration::seconds(1);
#[derive(Debug)]
struct ChannelName(String);
#[derive(Debug)]
struct UniqueId(String);
#[derive(Debug)]
struct ConnectArgs {}
enum Cmd {
Connect(ConnectArgs),
Ping,
Pong,
Subscribe {
channel_name: ChannelName,
uid: UniqueId,
},
UnSubscribe {
uid: UniqueId,
},
Publish {
channel_name: ChannelName,
payload: String,
len: usize,
},
Message {
channel_name: ChannelName,
target_uid: UniqueId,
payload: String,
len: usize,
},
}
impl Cmd {
async fn parse_command(
s: &str,
buf: &mut Vec<u8>,
rx: &mut OwnedReadHalf,
) -> Result<Self, CmdErr> {
let mut parts = s.split_whitespace();
let cmd = parts.next().ok_or(CmdErr::UnknownCmd)?;
if cmd.eq_ignore_ascii_case("connect") {
let args = ConnectArgs {};
// TODO: args???
Ok(Cmd::Connect(args))
} else if cmd.eq_ignore_ascii_case("ping") {
Ok(Cmd::Ping)
} else if cmd.eq_ignore_ascii_case("pong") {
Ok(Cmd::Pong)
} else if cmd.eq_ignore_ascii_case("pub") {
let channel_name = ChannelName(parts.next().ok_or(CmdErr::MissingChanName)?.to_owned());
let len: usize = parts
.next()
.ok_or(CmdErr::ExpectLen)?
.parse()
.map_err(|_| CmdErr::ExpectLen)?;
let Ok(_) = rx.read_exact(&mut buf[..len]).await else {
return Err(CmdErr::MissingArg);
};
let Ok(payload) = std::str::from_utf8(&buf[..len]) else {
return Err(CmdErr::MissingArg);
};
Ok(Cmd::Publish {
channel_name,
payload: payload.to_owned(),
len,
})
} else {
Err(CmdErr::UnknownCmd)
}
}
}
#[derive(Debug, thiserror::Error)]
enum CmdErr {
#[error("Unknown Protocol Operation")]
UnknownCmd,
#[error("Missing argument")]
MissingArg,
#[error("Missing channel name")]
MissingChanName,
#[error("Missing payload length")]
ExpectLen,
#[error("Unable to write to client")]
WriteFailed,
_ => (),
};
Ok(())
}