commit b05120c91dfe411b3131603fb2e03e16797e329e Author: eraden Date: Mon Sep 11 21:46:48 2023 +0200 Working wildcard diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..fbf0ade --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,624 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bitflags" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" + +[[package]] +name = "bytes" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" + +[[package]] +name = "cc" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +dependencies = [ + "libc", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "deranged" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" +dependencies = [ + "serde", +] + +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + +[[package]] +name = "futures-task" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" + +[[package]] +name = "futures-util" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "gimli" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" + +[[package]] +name = "hermit-abi" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" + +[[package]] +name = "itoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" + +[[package]] +name = "libc" +version = "0.2.147" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" + +[[package]] +name = "lock_api" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "memchr" +version = "2.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5486aed0026218e61b8a01d5fbd5a0a134649abb71a0e53b7bc088529dced86e" + +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + +[[package]] +name = "mio" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "multipeek" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d6b1cf1c2ae7c8c3898cbf8354ee836bc7037e35592d3739a9901d53c97b6a2" + +[[package]] +name = "nats-server" +version = "0.1.0" +dependencies = [ + "futures", + "futures-channel", + "multipeek", + "nix", + "pin-project-lite", + "serde", + "serde_json", + "thiserror", + "time", + "tokio", + "uuid", +] + +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags 2.4.0", + "cfg-if", + "libc", + "memoffset", +] + +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "object" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe" +dependencies = [ + "memchr", +] + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "ryu" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "serde" +version = "1.0.188" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.188" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.105" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smallvec" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" + +[[package]] +name = "socket2" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "syn" +version = "2.0.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "1.0.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "time" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" +dependencies = [ + "deranged", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" + +[[package]] +name = "time-macros" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572" +dependencies = [ + "time-core", +] + +[[package]] +name = "tokio" +version = "1.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "num_cpus", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" + +[[package]] +name = "uuid" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" +dependencies = [ + "getrandom", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..acf5de0 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "nats-server" +version = "0.1.0" +edition = "2021" + +[dependencies] +futures = "0.3.28" +futures-channel = "0.3.28" +multipeek = "0.1.2" +nix = { version = "0.27.1", features = ["socket", "hostname", "net", "dir"] } +pin-project-lite = "0.2.13" +serde = { version = "1.0.188", features = ["derive"] } +serde_json = "1.0.105" +thiserror = "1.0.47" +time = { version = "0.3.28", features = ["serde"] } +tokio = { version = "1.32.0", features = ["full"] } +uuid = { version = "1.4.1", features = ["v4"] } diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..51f4e6c --- /dev/null +++ b/src/main.rs @@ -0,0 +1,409 @@ +#![allow(dead_code)] + +use std::borrow::Cow; +use std::collections::HashMap; +use std::sync::Arc; + +use time::OffsetDateTime; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{ + tcp::{OwnedReadHalf, OwnedWriteHalf}, + TcpStream, + }, + spawn, + sync::{Mutex, RwLock}, +}; +use uuid::Uuid; +use multipeek::multipeek; + +#[derive(Debug)] +pub struct Sub(String); + +impl Sub { + pub fn new>(s: S) -> Self {Self(s.into())} + + pub fn is_wildcard(&self) -> bool { + self.0.contains('*') + } + + fn eq_with_wild(&self, chan: &str) -> bool { + let mut left = self.0.chars().peekable(); + let mut right = multipeek(chan.chars()); + + while let Some(c) = left.peek() { + if *c == '*' { + while let Some('*') = left.peek() { + eprintln!("while let * l => {:?}", left.next()); + eprintln!("while let * r => {:?}", right.next()); // consume right to match number of wildcard + } + // check non-wildcard + let pair = (left.peek(), right.peek()); + eprintln!("after wild {pair:?}"); + let current = match pair { + (Some(c), _) => *c, + (None, _) => return true, + }; + + while let Some(c) = right.peek_nth(0).cloned() { + eprintln!("skipping to {current} -> {c}"); + let second = right.peek_nth(1).cloned(); + if c == current && second != Some(current) { + break; + } else { + right.next(); + } + } + let pair = (left.next(), right.next()); + eprintln!("after wild {pair:?}"); + match pair { + (None, _) => return true, + (Some(l), Some(r)) if l != r => return false, + (Some(_), None) => return false, + _ => {} + }; + } else { + let pair = (left.next(), right.next()); + eprintln!("non wild {pair:?}"); + match pair { + (None, None) => return true, + (Some(l), Some(r)) if l != r => return false, + _ => {} + }; + } + } + left.next() == right.next() + } + + pub fn eq_with_chan(&self, chan: &str) -> bool { + if self.is_wildcard() { + println!("is wild"); + self.eq_with_wild(chan) + } else { + println!("not wild {} {}", self.0.len(), chan.len()); + if self.0.len() != chan.len() { + return false; + } + self.0.eq_ignore_ascii_case(&chan) + } + } +} + +#[cfg(test)] +mod sub_tests { + use super::*; + + #[test] + fn check_simple() { + println!("================================================"); + println!("check simple"); + assert_eq!(Sub::new("foo").eq_with_chan("foo"), true); + assert_eq!(Sub::new("foo").eq_with_chan("fo"), false); + assert_eq!(Sub::new("foo").eq_with_chan("oo"), false); + } + + #[test] + fn check_ends_wild() { + println!("================================================"); + println!("check ends"); + assert_eq!(Sub::new("fo*").eq_with_chan("foo"), true); + assert_eq!(Sub::new("f*").eq_with_chan("foo"), true); + assert_eq!(Sub::new("f*").eq_with_chan("foz"), true); + assert_eq!(Sub::new("f*").eq_with_chan("ff"), true); + assert_eq!(Sub::new("f*").eq_with_chan("of"), false); + assert_eq!(Sub::new("f*").eq_with_chan("ofz"), false); + } + + #[test] + fn check_starts_wild() { + println!("================================================"); + println!("check starts"); + assert_eq!(Sub::new("*oo").eq_with_chan("foo"), true); + assert_eq!(Sub::new("*ofo").eq_with_chan("acfofo"), true); + assert_eq!(Sub::new("*o").eq_with_chan("foo"), true); + assert_eq!(Sub::new("*o").eq_with_chan("bar"), false); + assert_eq!(Sub::new("*o").eq_with_chan("foz"), false); + } + + #[test] + fn check_middle_wild() { + println!("================================================"); + println!("check middle"); + assert_eq!(Sub::new("a*z").eq_with_chan("ahz"), true); + assert_eq!(Sub::new("a*z").eq_with_chan("ahkz"), true); + assert_eq!(Sub::new("a*z").eq_with_chan("hakz"), false); + assert_eq!(Sub::new("a*z").eq_with_chan("akza"), false); + } + + #[test] + fn check_multiple_wild() { + println!("================================================"); + println!("check multi"); + assert_eq!(Sub::new("*a*z").eq_with_chan("gahz"), true); + assert_eq!(Sub::new("*a*z").eq_with_chan("ahz"), false); + + assert_eq!(Sub::new("a*k*z").eq_with_chan("asklz"), true); + assert_eq!(Sub::new("a*k*z").eq_with_chan("sklz"), false); + assert_eq!(Sub::new("a*k*z").eq_with_chan("saklz"), false); + assert_eq!(Sub::new("a*k*z").eq_with_chan("asklzl"), false); + } +} + +#[derive(Debug, Clone)] +struct Server { + clients: Arc>>, + subcriptions: Arc>>, +} + +impl Server { + pub async fn register(&mut self, client: Client) { + self.clients.write().await.insert(client.id, client); + } + + pub async fn remove(&mut self, client: Client) { + *client.dead.write().await = true; + self.clients.write().await.remove(&client.id); + } +} + +#[derive(Debug, Clone)] +struct Client { + id: uuid::Uuid, + writer: Arc>, + dead: Arc>, +} + +impl Client { + pub fn new(write: OwnedWriteHalf) -> Self { + Self { + id: uuid::Uuid::new_v4(), + writer: Arc::new(Mutex::new(write)), + dead: Arc::new(RwLock::new(false)), + } + } + + pub async fn write(&self, s: Cow<'static, str>) -> Result<(), CmdErr> { + let mut l = self.writer.lock().await; + l.write(format!("{s}\r\n").as_bytes()) + .await + .map_err(|e| { + eprintln!("{e}"); + CmdErr::WriteFailed + }) + .map(|_| ()) + } +} + +#[tokio::main] +async fn main() -> std::io::Result<()> { + let listener = tokio::net::TcpListener::bind(("0.0.0.0", 4222)) + .await + .unwrap(); + listener.set_ttl(100).expect("Failed to set TTL"); + + while let Ok((stream, _addr)) = listener.accept().await { + spawn(async move { + handle_connection(stream).await; + }); + } + Ok(()) +} + +#[derive(Clone)] +struct Context { + last_live_msg: Arc>, +} + +async fn handle_connection(stream: TcpStream) { + let ctx = Context { + last_live_msg: Arc::new(RwLock::new(time::OffsetDateTime::now_utc())), + }; + + let is_alive = spawn(start_stream_health_check(ctx.clone())); + let handle_commands = spawn(handle_commands(stream, ctx)); + + tokio::select!( + _ = is_alive => (), + _ = handle_commands => (), + ); +} + +async fn handle_commands(stream: TcpStream, ctx: Context) { + let (mut rx, mut tx) = stream.into_split(); + let mut buf = Vec::with_capacity(4096); + + while let Ok(_) = handle_incomming_cmd(&mut rx, &mut tx, &mut buf, ctx.clone()).await {} +} + +async fn handle_incomming_cmd( + rx: &mut OwnedReadHalf, + tx: &mut OwnedWriteHalf, + buf: &mut Vec, + ctx: Context, +) -> Result<(), CmdErr> { + let Ok(_n) = rx.read_buf(buf).await else { + return Ok(()); + }; + let pos = match buf.iter().position(|i| *i == b'\r') { + Some(0) | None => return Ok(()), + Some(n) => n, + }; + let Ok(msg) = std::str::from_utf8(&buf[..pos]) else { + return Ok(()); + }; + eprintln!("buffer msg is: {msg:?}"); + for msg in msg.to_owned().split('\r') { + eprintln!("Message is: {:?}", msg); + match handle_msg(msg, buf, rx, tx).await { + Err(e) => { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + return Err(e); + } + Ok(Op::NoOp) => {} + }; + } + *ctx.last_live_msg.write().await = time::OffsetDateTime::now_utc(); + buf.clear(); + Ok(()) +} + +async fn start_stream_health_check(ctx: Context) { + loop { + if *ctx.last_live_msg.read().await + PING_PONG_SPAN >= OffsetDateTime::now_utc() { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + continue; + } + break; + } +} + +async fn handle_msg<'stream>( + msg: &str, + buf: &mut Vec, + rx: &mut OwnedReadHalf, + tx: &mut OwnedWriteHalf, +) -> Result { + let cmd: Cmd = match Cmd::parse_command(msg, buf, rx).await { + Ok(cmd) => cmd, + Err(e) => { + let msg = format!("-ERR '{e}'\r\n"); + tx.write(msg.as_bytes()).await.unwrap(); + return Err(e); + } + }; + match cmd { + Cmd::Connect(_) => { + tx.write(b"+OK\r\n").await.unwrap(); + Ok(Op::NoOp) + } + Cmd::Publish { + channel_name, + payload, + len, + } => { + tx.write(b"+OK\r\n").await.unwrap(); + tx.write(format!("received {channel_name:?} {len:?} {payload:?} \r\n").as_bytes()) + .await + .unwrap(); + Ok(Op::NoOp) + } + Cmd::Ping | Cmd::Pong => { + // + Ok(Op::NoOp) + } + _ => Ok(Op::NoOp), + } +} + +enum Op { + NoOp, +} + +const PING_PONG_SPAN: time::Duration = time::Duration::seconds(1); + +#[derive(Debug)] +struct ChannelName(String); + +#[derive(Debug)] +struct UniqueId(String); + +#[derive(Debug)] +struct ConnectArgs {} + +enum Cmd { + Connect(ConnectArgs), + Ping, + Pong, + Subscribe { + channel_name: ChannelName, + uid: UniqueId, + }, + UnSubscribe { + uid: UniqueId, + }, + Publish { + channel_name: ChannelName, + payload: String, + len: usize, + }, + Message { + channel_name: ChannelName, + target_uid: UniqueId, + payload: String, + len: usize, + }, +} + +impl Cmd { + async fn parse_command( + s: &str, + buf: &mut Vec, + rx: &mut OwnedReadHalf, + ) -> Result { + let mut parts = s.split_whitespace(); + let cmd = parts.next().ok_or(CmdErr::UnknownCmd)?; + if cmd.eq_ignore_ascii_case("connect") { + let args = ConnectArgs {}; + // TODO: args??? + Ok(Cmd::Connect(args)) + } else if cmd.eq_ignore_ascii_case("ping") { + Ok(Cmd::Ping) + } else if cmd.eq_ignore_ascii_case("pong") { + Ok(Cmd::Pong) + } else if cmd.eq_ignore_ascii_case("pub") { + let channel_name = ChannelName(parts.next().ok_or(CmdErr::MissingChanName)?.to_owned()); + let len: usize = parts + .next() + .ok_or(CmdErr::ExpectLen)? + .parse() + .map_err(|_| CmdErr::ExpectLen)?; + let Ok(_) = rx.read_exact(&mut buf[..len]).await else { + return Err(CmdErr::MissingArg); + }; + let Ok(payload) = std::str::from_utf8(&buf[..len]) else { + return Err(CmdErr::MissingArg); + }; + Ok(Cmd::Publish { + channel_name, + payload: payload.to_owned(), + len, + }) + } else { + Err(CmdErr::UnknownCmd) + } + } +} + +#[derive(Debug, thiserror::Error)] +enum CmdErr { + #[error("Unknown Protocol Operation")] + UnknownCmd, + #[error("Missing argument")] + MissingArg, + #[error("Missing channel name")] + MissingChanName, + #[error("Missing payload length")] + ExpectLen, + #[error("Unable to write to client")] + WriteFailed, +}