Working wildcard

This commit is contained in:
eraden 2023-09-11 21:46:48 +02:00
commit b05120c91d
4 changed files with 1051 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

624
Cargo.lock generated Normal file
View File

@ -0,0 +1,624 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backtrace"
version = "0.3.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
[[package]]
name = "bytes"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "cc"
version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
dependencies = [
"libc",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "deranged"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946"
dependencies = [
"serde",
]
[[package]]
name = "futures"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
[[package]]
name = "futures-executor"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
[[package]]
name = "futures-macro"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
[[package]]
name = "futures-task"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
[[package]]
name = "futures-util"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "gimli"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
[[package]]
name = "hermit-abi"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b"
[[package]]
name = "itoa"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
[[package]]
name = "libc"
version = "0.2.147"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
[[package]]
name = "lock_api"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "memchr"
version = "2.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5486aed0026218e61b8a01d5fbd5a0a134649abb71a0e53b7bc088529dced86e"
[[package]]
name = "memoffset"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c"
dependencies = [
"autocfg",
]
[[package]]
name = "miniz_oxide"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
dependencies = [
"libc",
"wasi",
"windows-sys",
]
[[package]]
name = "multipeek"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d6b1cf1c2ae7c8c3898cbf8354ee836bc7037e35592d3739a9901d53c97b6a2"
[[package]]
name = "nats-server"
version = "0.1.0"
dependencies = [
"futures",
"futures-channel",
"multipeek",
"nix",
"pin-project-lite",
"serde",
"serde_json",
"thiserror",
"time",
"tokio",
"uuid",
]
[[package]]
name = "nix"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags 2.4.0",
"cfg-if",
"libc",
"memoffset",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "object"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe"
dependencies = [
"memchr",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets",
]
[[package]]
name = "pin-project-lite"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro2"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "ryu"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
[[package]]
name = "socket2"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
dependencies = [
"libc",
"windows-sys",
]
[[package]]
name = "syn"
version = "2.0.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "thiserror"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "time"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48"
dependencies = [
"deranged",
"serde",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb"
[[package]]
name = "time-macros"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572"
dependencies = [
"time-core",
]
[[package]]
name = "tokio"
version = "1.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys",
]
[[package]]
name = "tokio-macros"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "unicode-ident"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c"
[[package]]
name = "uuid"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
dependencies = [
"getrandom",
]
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_i686_gnu"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"

17
Cargo.toml Normal file
View File

@ -0,0 +1,17 @@
[package]
name = "nats-server"
version = "0.1.0"
edition = "2021"
[dependencies]
futures = "0.3.28"
futures-channel = "0.3.28"
multipeek = "0.1.2"
nix = { version = "0.27.1", features = ["socket", "hostname", "net", "dir"] }
pin-project-lite = "0.2.13"
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105"
thiserror = "1.0.47"
time = { version = "0.3.28", features = ["serde"] }
tokio = { version = "1.32.0", features = ["full"] }
uuid = { version = "1.4.1", features = ["v4"] }

409
src/main.rs Normal file
View File

@ -0,0 +1,409 @@
#![allow(dead_code)]
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
},
spawn,
sync::{Mutex, RwLock},
};
use uuid::Uuid;
use multipeek::multipeek;
#[derive(Debug)]
pub struct Sub(String);
impl Sub {
pub fn new<S: Into<String>>(s: S) -> Self {Self(s.into())}
pub fn is_wildcard(&self) -> bool {
self.0.contains('*')
}
fn eq_with_wild(&self, chan: &str) -> bool {
let mut left = self.0.chars().peekable();
let mut right = multipeek(chan.chars());
while let Some(c) = left.peek() {
if *c == '*' {
while let Some('*') = left.peek() {
eprintln!("while let * l => {:?}", left.next());
eprintln!("while let * r => {:?}", right.next()); // consume right to match number of wildcard
}
// check non-wildcard
let pair = (left.peek(), right.peek());
eprintln!("after wild {pair:?}");
let current = match pair {
(Some(c), _) => *c,
(None, _) => return true,
};
while let Some(c) = right.peek_nth(0).cloned() {
eprintln!("skipping to {current} -> {c}");
let second = right.peek_nth(1).cloned();
if c == current && second != Some(current) {
break;
} else {
right.next();
}
}
let pair = (left.next(), right.next());
eprintln!("after wild {pair:?}");
match pair {
(None, _) => return true,
(Some(l), Some(r)) if l != r => return false,
(Some(_), None) => return false,
_ => {}
};
} else {
let pair = (left.next(), right.next());
eprintln!("non wild {pair:?}");
match pair {
(None, None) => return true,
(Some(l), Some(r)) if l != r => return false,
_ => {}
};
}
}
left.next() == right.next()
}
pub fn eq_with_chan(&self, chan: &str) -> bool {
if self.is_wildcard() {
println!("is wild");
self.eq_with_wild(chan)
} else {
println!("not wild {} {}", self.0.len(), chan.len());
if self.0.len() != chan.len() {
return false;
}
self.0.eq_ignore_ascii_case(&chan)
}
}
}
#[cfg(test)]
mod sub_tests {
use super::*;
#[test]
fn check_simple() {
println!("================================================");
println!("check simple");
assert_eq!(Sub::new("foo").eq_with_chan("foo"), true);
assert_eq!(Sub::new("foo").eq_with_chan("fo"), false);
assert_eq!(Sub::new("foo").eq_with_chan("oo"), false);
}
#[test]
fn check_ends_wild() {
println!("================================================");
println!("check ends");
assert_eq!(Sub::new("fo*").eq_with_chan("foo"), true);
assert_eq!(Sub::new("f*").eq_with_chan("foo"), true);
assert_eq!(Sub::new("f*").eq_with_chan("foz"), true);
assert_eq!(Sub::new("f*").eq_with_chan("ff"), true);
assert_eq!(Sub::new("f*").eq_with_chan("of"), false);
assert_eq!(Sub::new("f*").eq_with_chan("ofz"), false);
}
#[test]
fn check_starts_wild() {
println!("================================================");
println!("check starts");
assert_eq!(Sub::new("*oo").eq_with_chan("foo"), true);
assert_eq!(Sub::new("*ofo").eq_with_chan("acfofo"), true);
assert_eq!(Sub::new("*o").eq_with_chan("foo"), true);
assert_eq!(Sub::new("*o").eq_with_chan("bar"), false);
assert_eq!(Sub::new("*o").eq_with_chan("foz"), false);
}
#[test]
fn check_middle_wild() {
println!("================================================");
println!("check middle");
assert_eq!(Sub::new("a*z").eq_with_chan("ahz"), true);
assert_eq!(Sub::new("a*z").eq_with_chan("ahkz"), true);
assert_eq!(Sub::new("a*z").eq_with_chan("hakz"), false);
assert_eq!(Sub::new("a*z").eq_with_chan("akza"), false);
}
#[test]
fn check_multiple_wild() {
println!("================================================");
println!("check multi");
assert_eq!(Sub::new("*a*z").eq_with_chan("gahz"), true);
assert_eq!(Sub::new("*a*z").eq_with_chan("ahz"), false);
assert_eq!(Sub::new("a*k*z").eq_with_chan("asklz"), true);
assert_eq!(Sub::new("a*k*z").eq_with_chan("sklz"), false);
assert_eq!(Sub::new("a*k*z").eq_with_chan("saklz"), false);
assert_eq!(Sub::new("a*k*z").eq_with_chan("asklzl"), false);
}
}
#[derive(Debug, Clone)]
struct Server {
clients: Arc<RwLock<HashMap<Uuid, Client>>>,
subcriptions: Arc<RwLock<Vec<(Sub, Uuid)>>>,
}
impl Server {
pub async fn register(&mut self, client: Client) {
self.clients.write().await.insert(client.id, client);
}
pub async fn remove(&mut self, client: Client) {
*client.dead.write().await = true;
self.clients.write().await.remove(&client.id);
}
}
#[derive(Debug, Clone)]
struct Client {
id: uuid::Uuid,
writer: Arc<Mutex<OwnedWriteHalf>>,
dead: Arc<RwLock<bool>>,
}
impl Client {
pub fn new(write: OwnedWriteHalf) -> Self {
Self {
id: uuid::Uuid::new_v4(),
writer: Arc::new(Mutex::new(write)),
dead: Arc::new(RwLock::new(false)),
}
}
pub async fn write(&self, s: Cow<'static, str>) -> Result<(), CmdErr> {
let mut l = self.writer.lock().await;
l.write(format!("{s}\r\n").as_bytes())
.await
.map_err(|e| {
eprintln!("{e}");
CmdErr::WriteFailed
})
.map(|_| ())
}
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let listener = tokio::net::TcpListener::bind(("0.0.0.0", 4222))
.await
.unwrap();
listener.set_ttl(100).expect("Failed to set TTL");
while let Ok((stream, _addr)) = listener.accept().await {
spawn(async move {
handle_connection(stream).await;
});
}
Ok(())
}
#[derive(Clone)]
struct Context {
last_live_msg: Arc<RwLock<time::OffsetDateTime>>,
}
async fn handle_connection(stream: TcpStream) {
let ctx = Context {
last_live_msg: Arc::new(RwLock::new(time::OffsetDateTime::now_utc())),
};
let is_alive = spawn(start_stream_health_check(ctx.clone()));
let handle_commands = spawn(handle_commands(stream, ctx));
tokio::select!(
_ = is_alive => (),
_ = handle_commands => (),
);
}
async fn handle_commands(stream: TcpStream, ctx: Context) {
let (mut rx, mut tx) = stream.into_split();
let mut buf = Vec::with_capacity(4096);
while let Ok(_) = handle_incomming_cmd(&mut rx, &mut tx, &mut buf, ctx.clone()).await {}
}
async fn handle_incomming_cmd(
rx: &mut OwnedReadHalf,
tx: &mut OwnedWriteHalf,
buf: &mut Vec<u8>,
ctx: Context,
) -> Result<(), CmdErr> {
let Ok(_n) = rx.read_buf(buf).await else {
return Ok(());
};
let pos = match buf.iter().position(|i| *i == b'\r') {
Some(0) | None => return Ok(()),
Some(n) => n,
};
let Ok(msg) = std::str::from_utf8(&buf[..pos]) else {
return Ok(());
};
eprintln!("buffer msg is: {msg:?}");
for msg in msg.to_owned().split('\r') {
eprintln!("Message is: {:?}", msg);
match handle_msg(msg, buf, rx, tx).await {
Err(e) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
return Err(e);
}
Ok(Op::NoOp) => {}
};
}
*ctx.last_live_msg.write().await = time::OffsetDateTime::now_utc();
buf.clear();
Ok(())
}
async fn start_stream_health_check(ctx: Context) {
loop {
if *ctx.last_live_msg.read().await + PING_PONG_SPAN >= OffsetDateTime::now_utc() {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
continue;
}
break;
}
}
async fn handle_msg<'stream>(
msg: &str,
buf: &mut Vec<u8>,
rx: &mut OwnedReadHalf,
tx: &mut OwnedWriteHalf,
) -> Result<Op, CmdErr> {
let cmd: Cmd = match Cmd::parse_command(msg, buf, rx).await {
Ok(cmd) => cmd,
Err(e) => {
let msg = format!("-ERR '{e}'\r\n");
tx.write(msg.as_bytes()).await.unwrap();
return Err(e);
}
};
match cmd {
Cmd::Connect(_) => {
tx.write(b"+OK\r\n").await.unwrap();
Ok(Op::NoOp)
}
Cmd::Publish {
channel_name,
payload,
len,
} => {
tx.write(b"+OK\r\n").await.unwrap();
tx.write(format!("received {channel_name:?} {len:?} {payload:?} \r\n").as_bytes())
.await
.unwrap();
Ok(Op::NoOp)
}
Cmd::Ping | Cmd::Pong => {
//
Ok(Op::NoOp)
}
_ => Ok(Op::NoOp),
}
}
enum Op {
NoOp,
}
const PING_PONG_SPAN: time::Duration = time::Duration::seconds(1);
#[derive(Debug)]
struct ChannelName(String);
#[derive(Debug)]
struct UniqueId(String);
#[derive(Debug)]
struct ConnectArgs {}
enum Cmd {
Connect(ConnectArgs),
Ping,
Pong,
Subscribe {
channel_name: ChannelName,
uid: UniqueId,
},
UnSubscribe {
uid: UniqueId,
},
Publish {
channel_name: ChannelName,
payload: String,
len: usize,
},
Message {
channel_name: ChannelName,
target_uid: UniqueId,
payload: String,
len: usize,
},
}
impl Cmd {
async fn parse_command(
s: &str,
buf: &mut Vec<u8>,
rx: &mut OwnedReadHalf,
) -> Result<Self, CmdErr> {
let mut parts = s.split_whitespace();
let cmd = parts.next().ok_or(CmdErr::UnknownCmd)?;
if cmd.eq_ignore_ascii_case("connect") {
let args = ConnectArgs {};
// TODO: args???
Ok(Cmd::Connect(args))
} else if cmd.eq_ignore_ascii_case("ping") {
Ok(Cmd::Ping)
} else if cmd.eq_ignore_ascii_case("pong") {
Ok(Cmd::Pong)
} else if cmd.eq_ignore_ascii_case("pub") {
let channel_name = ChannelName(parts.next().ok_or(CmdErr::MissingChanName)?.to_owned());
let len: usize = parts
.next()
.ok_or(CmdErr::ExpectLen)?
.parse()
.map_err(|_| CmdErr::ExpectLen)?;
let Ok(_) = rx.read_exact(&mut buf[..len]).await else {
return Err(CmdErr::MissingArg);
};
let Ok(payload) = std::str::from_utf8(&buf[..len]) else {
return Err(CmdErr::MissingArg);
};
Ok(Cmd::Publish {
channel_name,
payload: payload.to_owned(),
len,
})
} else {
Err(CmdErr::UnknownCmd)
}
}
}
#[derive(Debug, thiserror::Error)]
enum CmdErr {
#[error("Unknown Protocol Operation")]
UnknownCmd,
#[error("Missing argument")]
MissingArg,
#[error("Missing channel name")]
MissingChanName,
#[error("Missing payload length")]
ExpectLen,
#[error("Unable to write to client")]
WriteFailed,
}