From 05e8baa6fb716cba405d40d9c1380e76fb5cf144 Mon Sep 17 00:00:00 2001 From: eraden Date: Sat, 16 Sep 2023 21:55:18 +0200 Subject: [PATCH] Working basic server --- Cargo.lock | 282 ++++++++++++++++++++++++++++++++- Cargo.toml | 8 + src/main.rs | 445 ++++++++++++++++------------------------------------ 3 files changed, 420 insertions(+), 315 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fbf0ade..4c4fdb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aho-corasick" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" +dependencies = [ + "memchr", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -71,6 +80,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + [[package]] name = "deranged" version = "0.3.8" @@ -80,6 +95,19 @@ dependencies = [ "serde", ] +[[package]] +name = "derive_more" +version = "0.99.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn 1.0.109", +] + [[package]] name = "futures" version = "0.3.28" @@ -136,7 +164,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.29", ] [[package]] @@ -186,6 +214,26 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" +[[package]] +name = "gumdrop" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bc700f989d2f6f0248546222d9b4258f5b02a171a431f8285a81c08142629e3" +dependencies = [ + "gumdrop_derive", +] + +[[package]] +name = "gumdrop_derive" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "729f9bd3449d77e7831a18abfb7ba2f99ee813dfd15b8c2167c9a54ba20aa99d" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "hermit-abi" version = "0.3.2" @@ -198,6 +246,12 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.147" @@ -214,6 +268,21 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "memchr" version = "2.6.2" @@ -259,8 +328,10 @@ checksum = "1d6b1cf1c2ae7c8c3898cbf8354ee836bc7037e35592d3739a9901d53c97b6a2" name = "nats-server" version = "0.1.0" dependencies = [ + "derive_more", "futures", "futures-channel", + "gumdrop", "multipeek", "nix", "pin-project-lite", @@ -269,6 +340,8 @@ dependencies = [ "thiserror", "time", "tokio", + "tracing", + "tracing-subscriber", "uuid", ] @@ -284,6 +357,16 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -303,6 +386,18 @@ dependencies = [ "memchr", ] +[[package]] +name = "once_cell" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" + +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.1" @@ -365,12 +460,65 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "regex" +version = "1.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata 0.3.8", + "regex-syntax 0.7.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-automata" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax 0.7.5", +] + +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + +[[package]] +name = "regex-syntax" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" + [[package]] name = "rustc-demangle" version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "ryu" version = "1.0.15" @@ -383,6 +531,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" + [[package]] name = "serde" version = "1.0.188" @@ -400,7 +554,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.29", ] [[package]] @@ -414,6 +568,15 @@ dependencies = [ "serde", ] +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -448,6 +611,17 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.29" @@ -476,7 +650,17 @@ checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.29", +] + +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", ] [[package]] @@ -533,7 +717,69 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.29", +] + +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + +[[package]] +name = "tracing-core" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -551,12 +797,40 @@ dependencies = [ "getrandom", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index acf5de0..14c773a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,9 +3,15 @@ name = "nats-server" version = "0.1.0" edition = "2021" +[[test]] +name = "test_connect" +path = "./tests/test_connect.rs" + [dependencies] +derive_more = "0.99.17" futures = "0.3.28" futures-channel = "0.3.28" +gumdrop = "0.8.1" multipeek = "0.1.2" nix = { version = "0.27.1", features = ["socket", "hostname", "net", "dir"] } pin-project-lite = "0.2.13" @@ -14,4 +20,6 @@ serde_json = "1.0.105" thiserror = "1.0.47" time = { version = "0.3.28", features = ["serde"] } tokio = { version = "1.32.0", features = ["full"] } +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } uuid = { version = "1.4.1", features = ["v4"] } diff --git a/src/main.rs b/src/main.rs index 51f4e6c..05bf65b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,13 @@ #![allow(dead_code)] +#![feature(async_fn_in_trait)] -use std::borrow::Cow; +use std::collections::BTreeSet; use std::collections::HashMap; +use std::net::SocketAddr; use std::sync::Arc; use time::OffsetDateTime; +use tokio::sync::mpsc::Sender; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::{ @@ -14,196 +17,56 @@ use tokio::{ spawn, sync::{Mutex, RwLock}, }; -use uuid::Uuid; -use multipeek::multipeek; +use tracing::{debug, info}; -#[derive(Debug)] -pub struct Sub(String); +mod input; +use input::*; +mod output; +use output::*; +mod model; +use model::*; +mod client; +use client::*; +mod server; +use server::*; +mod error; +use error::*; -impl Sub { - pub fn new>(s: S) -> Self {Self(s.into())} - - pub fn is_wildcard(&self) -> bool { - self.0.contains('*') - } - - fn eq_with_wild(&self, chan: &str) -> bool { - let mut left = self.0.chars().peekable(); - let mut right = multipeek(chan.chars()); - - while let Some(c) = left.peek() { - if *c == '*' { - while let Some('*') = left.peek() { - eprintln!("while let * l => {:?}", left.next()); - eprintln!("while let * r => {:?}", right.next()); // consume right to match number of wildcard - } - // check non-wildcard - let pair = (left.peek(), right.peek()); - eprintln!("after wild {pair:?}"); - let current = match pair { - (Some(c), _) => *c, - (None, _) => return true, - }; - - while let Some(c) = right.peek_nth(0).cloned() { - eprintln!("skipping to {current} -> {c}"); - let second = right.peek_nth(1).cloned(); - if c == current && second != Some(current) { - break; - } else { - right.next(); - } - } - let pair = (left.next(), right.next()); - eprintln!("after wild {pair:?}"); - match pair { - (None, _) => return true, - (Some(l), Some(r)) if l != r => return false, - (Some(_), None) => return false, - _ => {} - }; - } else { - let pair = (left.next(), right.next()); - eprintln!("non wild {pair:?}"); - match pair { - (None, None) => return true, - (Some(l), Some(r)) if l != r => return false, - _ => {} - }; - } - } - left.next() == right.next() - } - - pub fn eq_with_chan(&self, chan: &str) -> bool { - if self.is_wildcard() { - println!("is wild"); - self.eq_with_wild(chan) - } else { - println!("not wild {} {}", self.0.len(), chan.len()); - if self.0.len() != chan.len() { - return false; - } - self.0.eq_ignore_ascii_case(&chan) - } - } +#[derive(gumdrop::Options)] +struct Args { + help: bool, + port: Option, + addr: Option, + max_payload: Option, } -#[cfg(test)] -mod sub_tests { - use super::*; - - #[test] - fn check_simple() { - println!("================================================"); - println!("check simple"); - assert_eq!(Sub::new("foo").eq_with_chan("foo"), true); - assert_eq!(Sub::new("foo").eq_with_chan("fo"), false); - assert_eq!(Sub::new("foo").eq_with_chan("oo"), false); - } - - #[test] - fn check_ends_wild() { - println!("================================================"); - println!("check ends"); - assert_eq!(Sub::new("fo*").eq_with_chan("foo"), true); - assert_eq!(Sub::new("f*").eq_with_chan("foo"), true); - assert_eq!(Sub::new("f*").eq_with_chan("foz"), true); - assert_eq!(Sub::new("f*").eq_with_chan("ff"), true); - assert_eq!(Sub::new("f*").eq_with_chan("of"), false); - assert_eq!(Sub::new("f*").eq_with_chan("ofz"), false); - } - - #[test] - fn check_starts_wild() { - println!("================================================"); - println!("check starts"); - assert_eq!(Sub::new("*oo").eq_with_chan("foo"), true); - assert_eq!(Sub::new("*ofo").eq_with_chan("acfofo"), true); - assert_eq!(Sub::new("*o").eq_with_chan("foo"), true); - assert_eq!(Sub::new("*o").eq_with_chan("bar"), false); - assert_eq!(Sub::new("*o").eq_with_chan("foz"), false); - } - - #[test] - fn check_middle_wild() { - println!("================================================"); - println!("check middle"); - assert_eq!(Sub::new("a*z").eq_with_chan("ahz"), true); - assert_eq!(Sub::new("a*z").eq_with_chan("ahkz"), true); - assert_eq!(Sub::new("a*z").eq_with_chan("hakz"), false); - assert_eq!(Sub::new("a*z").eq_with_chan("akza"), false); - } - - #[test] - fn check_multiple_wild() { - println!("================================================"); - println!("check multi"); - assert_eq!(Sub::new("*a*z").eq_with_chan("gahz"), true); - assert_eq!(Sub::new("*a*z").eq_with_chan("ahz"), false); - - assert_eq!(Sub::new("a*k*z").eq_with_chan("asklz"), true); - assert_eq!(Sub::new("a*k*z").eq_with_chan("sklz"), false); - assert_eq!(Sub::new("a*k*z").eq_with_chan("saklz"), false); - assert_eq!(Sub::new("a*k*z").eq_with_chan("asklzl"), false); - } -} - -#[derive(Debug, Clone)] -struct Server { - clients: Arc>>, - subcriptions: Arc>>, -} - -impl Server { - pub async fn register(&mut self, client: Client) { - self.clients.write().await.insert(client.id, client); - } - - pub async fn remove(&mut self, client: Client) { - *client.dead.write().await = true; - self.clients.write().await.remove(&client.id); - } -} - -#[derive(Debug, Clone)] -struct Client { - id: uuid::Uuid, - writer: Arc>, - dead: Arc>, -} - -impl Client { - pub fn new(write: OwnedWriteHalf) -> Self { - Self { - id: uuid::Uuid::new_v4(), - writer: Arc::new(Mutex::new(write)), - dead: Arc::new(RwLock::new(false)), - } - } - - pub async fn write(&self, s: Cow<'static, str>) -> Result<(), CmdErr> { - let mut l = self.writer.lock().await; - l.write(format!("{s}\r\n").as_bytes()) - .await - .map_err(|e| { - eprintln!("{e}"); - CmdErr::WriteFailed - }) - .map(|_| ()) - } +pub fn init_logger() { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env()) + .init(); } #[tokio::main] async fn main() -> std::io::Result<()> { - let listener = tokio::net::TcpListener::bind(("0.0.0.0", 4222)) + init_logger(); + let args: Args = gumdrop::Options::parse_args_default_or_exit(); + let server = Server::new( + &args.addr.unwrap_or_else(|| "0.0.0.0".into()), + args.port.unwrap_or(4222), + args.max_payload.unwrap_or(4096), + ); + info!("Starting server at {}:{}", server.addr, server.port); + + let listener = tokio::net::TcpListener::bind((server.addr.clone().as_str(), server.port)) .await .unwrap(); listener.set_ttl(100).expect("Failed to set TTL"); - while let Ok((stream, _addr)) = listener.accept().await { + while let Ok((stream, addr)) = listener.accept().await { + info!("new connection from {addr:?}"); + let server = server.clone(); spawn(async move { - handle_connection(stream).await; + handle_connection(server.clone(), addr, stream).await; }); } Ok(()) @@ -214,13 +77,14 @@ struct Context { last_live_msg: Arc>, } -async fn handle_connection(stream: TcpStream) { +async fn handle_connection(server: Server, addr: SocketAddr, stream: TcpStream) { let ctx = Context { last_live_msg: Arc::new(RwLock::new(time::OffsetDateTime::now_utc())), }; - let is_alive = spawn(start_stream_health_check(ctx.clone())); - let handle_commands = spawn(handle_commands(stream, ctx)); + let (read, client, dead_notif) = server.create_client(stream).await; + let is_alive = spawn(start_stream_health_check(ctx.clone(), dead_notif)); + let handle_commands = spawn(handle_commands(server.clone(), addr, ctx, read, client)); tokio::select!( _ = is_alive => (), @@ -228,38 +92,68 @@ async fn handle_connection(stream: TcpStream) { ); } -async fn handle_commands(stream: TcpStream, ctx: Context) { - let (mut rx, mut tx) = stream.into_split(); - let mut buf = Vec::with_capacity(4096); +async fn handle_commands( + server: Server, + addr: SocketAddr, + ctx: Context, + mut rx: OwnedReadHalf, + client: Client, +) { + let mut buf = Vec::with_capacity(server.max_payload); - while let Ok(_) = handle_incomming_cmd(&mut rx, &mut tx, &mut buf, ctx.clone()).await {} + let conn_info = ConnectionInfo { + client_ip: addr.to_string(), + server_id: "asdajsdjasdjas".into(), + server_name: "foo.bar".into(), + version: env!("CARGO_PKG_VERSION"), + proto: 1, + host: server.addr.clone(), + port: server.port, + client_id: client.id().await, + max_payload: server.max_payload, + }; + client + .write(format!("{}", Info::new(conn_info))) + .await + .unwrap(); + + while let Ok(_) = handle_incomming_cmd( + &mut rx, + client.clone(), + &mut buf, + ctx.clone(), + server.clone(), + ) + .await + {} } async fn handle_incomming_cmd( rx: &mut OwnedReadHalf, - tx: &mut OwnedWriteHalf, + client: Client, buf: &mut Vec, ctx: Context, + server: Server, ) -> Result<(), CmdErr> { - let Ok(_n) = rx.read_buf(buf).await else { + buf.clear(); + debug!("Reading message...."); + let Some(pos) = read_to_msg_end(rx, buf).await else { return Ok(()); }; - let pos = match buf.iter().position(|i| *i == b'\r') { - Some(0) | None => return Ok(()), - Some(n) => n, - }; + let Ok(msg) = std::str::from_utf8(&buf[..pos]) else { return Ok(()); }; - eprintln!("buffer msg is: {msg:?}"); + debug!("buffer msg is: {msg:?}"); for msg in msg.to_owned().split('\r') { - eprintln!("Message is: {:?}", msg); - match handle_msg(msg, buf, rx, tx).await { + debug!("Message is: {:?}", msg); + let msg = msg.trim(); + match handle_msg(msg, buf, rx, client.clone(), server.clone()).await { Err(e) => { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; return Err(e); } - Ok(Op::NoOp) => {} + Ok(()) => {} }; } *ctx.last_live_msg.write().await = time::OffsetDateTime::now_utc(); @@ -267,11 +161,37 @@ async fn handle_incomming_cmd( Ok(()) } -async fn start_stream_health_check(ctx: Context) { +async fn read_to_msg_end(rx: &mut OwnedReadHalf, buf: &mut Vec) -> Option { + loop { + match rx.read_buf(buf).await { + Ok(n) => { + debug!("received {n} bytes"); + } + Err(e) => { + debug!("Failed to read from rx: {e}"); + return None; + } + }; + let caret_pos = buf.iter().rposition(|i| *i == b'\r'); + match caret_pos { + None => continue, + Some(0) => { + debug!("Return caret does not exists or is head. Skipping...: {caret_pos:?}"); + return None; + } + Some(n) => return Some(n), + }; + } +} + +async fn start_stream_health_check(ctx: Context, dead_notif: Sender<()>) { + const PING_PONG_SPAN: time::Duration = time::Duration::seconds(1); loop { if *ctx.last_live_msg.read().await + PING_PONG_SPAN >= OffsetDateTime::now_utc() { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; continue; + } else { + dead_notif.send(()).await.unwrap(); } break; } @@ -281,129 +201,32 @@ async fn handle_msg<'stream>( msg: &str, buf: &mut Vec, rx: &mut OwnedReadHalf, - tx: &mut OwnedWriteHalf, -) -> Result { - let cmd: Cmd = match Cmd::parse_command(msg, buf, rx).await { + client: Client, + server: Server, +) -> Result<(), CmdErr> { + let fut = Cmd::parse_command(msg, buf, rx); + let cmd: Cmd = match fut.await { Ok(cmd) => cmd, Err(e) => { - let msg = format!("-ERR '{e}'\r\n"); - tx.write(msg.as_bytes()).await.unwrap(); + let msg = format!("-ERR '{e}'"); + client.write(msg).await.unwrap(); return Err(e); } }; match cmd { Cmd::Connect(_) => { - tx.write(b"+OK\r\n").await.unwrap(); - Ok(Op::NoOp) + client.ok().await; } - Cmd::Publish { - channel_name, - payload, - len, - } => { - tx.write(b"+OK\r\n").await.unwrap(); - tx.write(format!("received {channel_name:?} {len:?} {payload:?} \r\n").as_bytes()) - .await - .unwrap(); - Ok(Op::NoOp) + Cmd::Publish(publish) => { + client.ok().await; + server.publish(publish).await; } - Cmd::Ping | Cmd::Pong => { - // - Ok(Op::NoOp) + Cmd::Ping | Cmd::Pong => {} + Cmd::Subscribe(subscribe) => { + server.add_sub(client.uid().await, subscribe).await; + client.ok().await; } - _ => Ok(Op::NoOp), - } -} - -enum Op { - NoOp, -} - -const PING_PONG_SPAN: time::Duration = time::Duration::seconds(1); - -#[derive(Debug)] -struct ChannelName(String); - -#[derive(Debug)] -struct UniqueId(String); - -#[derive(Debug)] -struct ConnectArgs {} - -enum Cmd { - Connect(ConnectArgs), - Ping, - Pong, - Subscribe { - channel_name: ChannelName, - uid: UniqueId, - }, - UnSubscribe { - uid: UniqueId, - }, - Publish { - channel_name: ChannelName, - payload: String, - len: usize, - }, - Message { - channel_name: ChannelName, - target_uid: UniqueId, - payload: String, - len: usize, - }, -} - -impl Cmd { - async fn parse_command( - s: &str, - buf: &mut Vec, - rx: &mut OwnedReadHalf, - ) -> Result { - let mut parts = s.split_whitespace(); - let cmd = parts.next().ok_or(CmdErr::UnknownCmd)?; - if cmd.eq_ignore_ascii_case("connect") { - let args = ConnectArgs {}; - // TODO: args??? - Ok(Cmd::Connect(args)) - } else if cmd.eq_ignore_ascii_case("ping") { - Ok(Cmd::Ping) - } else if cmd.eq_ignore_ascii_case("pong") { - Ok(Cmd::Pong) - } else if cmd.eq_ignore_ascii_case("pub") { - let channel_name = ChannelName(parts.next().ok_or(CmdErr::MissingChanName)?.to_owned()); - let len: usize = parts - .next() - .ok_or(CmdErr::ExpectLen)? - .parse() - .map_err(|_| CmdErr::ExpectLen)?; - let Ok(_) = rx.read_exact(&mut buf[..len]).await else { - return Err(CmdErr::MissingArg); - }; - let Ok(payload) = std::str::from_utf8(&buf[..len]) else { - return Err(CmdErr::MissingArg); - }; - Ok(Cmd::Publish { - channel_name, - payload: payload.to_owned(), - len, - }) - } else { - Err(CmdErr::UnknownCmd) - } - } -} - -#[derive(Debug, thiserror::Error)] -enum CmdErr { - #[error("Unknown Protocol Operation")] - UnknownCmd, - #[error("Missing argument")] - MissingArg, - #[error("Missing channel name")] - MissingChanName, - #[error("Missing payload length")] - ExpectLen, - #[error("Unable to write to client")] - WriteFailed, + _ => (), + }; + Ok(()) }