diff --git a/Cargo.lock b/Cargo.lock index 46f7994..bf614c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,10 +38,10 @@ dependencies = [ "bitflags", "bytes 1.4.0", "crossbeam-channel", - "futures-core 0.3.28", - "futures-sink 0.3.28", + "futures-core", + "futures-sink", "futures-task", - "futures-util 0.3.28", + "futures-util", "log", "once_cell", "parking_lot 0.12.1", @@ -59,8 +59,8 @@ checksum = "57a7559404a7f3573127aab53c08ce37a6c6a315c374a31070f3c91cd1b4a7fe" dependencies = [ "bitflags", "bytes 1.4.0", - "futures-core 0.3.28", - "futures-sink 0.3.28", + "futures-core", + "futures-sink", "log", "memchr", "pin-project-lite 0.2.9", @@ -87,7 +87,7 @@ dependencies = [ "derive_more", "encoding_rs", "flate2", - "futures-core 0.3.28", + "futures-core", "h2 0.3.18", "http", "httparse", @@ -137,7 +137,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "15265b6b8e2347670eb363c47fc8c75208b4a4994b27192f345fcbe707804f3e" dependencies = [ "actix-macros", - "futures-core 0.3.28", + "futures-core", "tokio 1.28.0", ] @@ -150,8 +150,8 @@ dependencies = [ "actix-rt", "actix-service", "actix-utils", - "futures-core 0.3.28", - "futures-util 0.3.28", + "futures-core", + "futures-util", "mio 0.8.6", "num_cpus", "socket2 0.4.9", @@ -165,7 +165,7 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a" dependencies = [ - "futures-core 0.3.28", + "futures-core", "paste", "pin-project-lite 0.2.9", ] @@ -202,8 +202,8 @@ dependencies = [ "cookie 0.16.2", "derive_more", "encoding_rs", - "futures-core 0.3.28", - "futures-util 0.3.28", + "futures-core", + "futures-util", "http", "itoa 1.0.6", "language-tags", @@ -453,7 +453,7 @@ checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" dependencies = [ "concurrent-queue", "event-listener", - "futures-core 0.3.28", + "futures-core", ] [[package]] @@ -462,7 +462,7 @@ version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7427a12b8dc09291528cfb1da2447059adb4a257388c2acd6497a79d55cf6f7c" dependencies = [ - "futures-io 0.3.28", + "futures-io", "simple-mutex", ] @@ -504,7 +504,7 @@ dependencies = [ "async-channel", "async-dup", "async-std", - "futures-core 0.3.28", + "futures-core", "http-types", "httparse", "log", @@ -606,9 +606,9 @@ dependencies = [ "async-lock", "async-process", "crossbeam-utils", - "futures-channel 0.3.28", - "futures-core 0.3.28", - "futures-io 0.3.28", + "futures-channel", + "futures-core", + "futures-io", "futures-lite", "gloo-timers", "kv-log-macro", @@ -628,7 +628,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "65bb25e3145c6216eafb3eca6f8cd6e016f43c9d4416d0af7984de46acf4f288" dependencies = [ "chrono", - "futures-util 0.3.28", + "futures-util", "hex", "hmac 0.12.1", "http-types", @@ -663,6 +663,19 @@ dependencies = [ "syn 2.0.15", ] +[[package]] +name = "async-tungstenite" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07b30ef0ea5c20caaa54baea49514a206308989c68be7ecd86c7f956e4da6378" +dependencies = [ + "futures-io", + "futures-util", + "log", + "pin-project-lite 0.2.9", + "tungstenite", +] + [[package]] name = "atoi" version = "1.0.0" @@ -2167,8 +2180,8 @@ version = "0.10.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" dependencies = [ - "futures-core 0.3.28", - "futures-sink 0.3.28", + "futures-core", + "futures-sink", "nanorand", "pin-project", "spin 0.9.8", @@ -2279,32 +2292,13 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" dependencies = [ - "futures-channel 0.3.28", - "futures-core 0.3.28", - "futures-executor 0.3.28", - "futures-io 0.3.28", - "futures-sink 0.3.28", + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", "futures-task", - "futures-util 0.3.28", -] - -[[package]] -name = "futures-async-runtime" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bab8d196f9bcbc3b33148960602889bf44c74afe5bea20ee970aaa08df2db2f5" -dependencies = [ - "futures-core 0.2.1", - "futures-stable", -] - -[[package]] -name = "futures-channel" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb37ec6418c577b25f5b129c0f4456ad7ce8714ec43c59712aa7e4cd2cb6b85" -dependencies = [ - "futures-core 0.2.1", + "futures-util", ] [[package]] @@ -2313,17 +2307,8 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ - "futures-core 0.3.28", - "futures-sink 0.3.28", -] - -[[package]] -name = "futures-core" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7455c91eb2eae38f33b013f77ebe766c75761af333efd9d550e154045c63e225" -dependencies = [ - "either", + "futures-core", + "futures-sink", ] [[package]] @@ -2332,28 +2317,15 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" -[[package]] -name = "futures-executor" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5db1dd3979745f5e50b28fd604602f2715f9d5a28ab835a5f9686a9d84cd1315" -dependencies = [ - "futures-channel 0.2.1", - "futures-core 0.2.1", - "futures-util 0.2.1", - "lazy_static", - "num_cpus", -] - [[package]] name = "futures-executor" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" dependencies = [ - "futures-core 0.3.28", + "futures-core", "futures-task", - "futures-util 0.3.28", + "futures-util", ] [[package]] @@ -2362,21 +2334,11 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5" dependencies = [ - "futures-core 0.3.28", + "futures-core", "lock_api", "parking_lot 0.11.2", ] -[[package]] -name = "futures-io" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6a0470fdba9dc87c27a3564ad6d5cc04e080f3afa26c93549728cce46ab21a2" -dependencies = [ - "futures-core 0.2.1", - "iovec", -] - [[package]] name = "futures-io" version = "0.3.28" @@ -2390,8 +2352,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" dependencies = [ "fastrand", - "futures-core 0.3.28", - "futures-io 0.3.28", + "futures-core", + "futures-io", "memchr", "parking", "pin-project-lite 0.2.9", @@ -2409,63 +2371,29 @@ dependencies = [ "syn 2.0.15", ] -[[package]] -name = "futures-sink" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a93a7c480876b8e02cdd70022e7eb9c8423575ea6a25a0b749b18834c16412" -dependencies = [ - "either", - "futures-channel 0.2.1", - "futures-core 0.2.1", -] - [[package]] name = "futures-sink" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" -[[package]] -name = "futures-stable" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a222f540db94a09c275be08a406b5cd82f4311422b940a684795cdaef3d02e81" -dependencies = [ - "futures-core 0.2.1", - "futures-executor 0.2.1", -] - [[package]] name = "futures-task" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" -[[package]] -name = "futures-util" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cf12a3fc1ccaf1bc2901ec6e0ed6ed407a4f16eaa20dd838f40cabf5f7b31f1" -dependencies = [ - "either", - "futures-channel 0.2.1", - "futures-core 0.2.1", - "futures-io 0.2.1", - "futures-sink 0.2.1", -] - [[package]] name = "futures-util" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ - "futures-channel 0.3.28", - "futures-core 0.3.28", - "futures-io 0.3.28", + "futures-channel", + "futures-core", + "futures-io", "futures-macro", - "futures-sink 0.3.28", + "futures-sink", "futures-task", "memchr", "pin-project-lite 0.2.9", @@ -2589,7 +2517,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d5564e570a38b43d78bdc063374a0c3098c4f0d64005b12f9bbe87e869b6d7" dependencies = [ - "futures-channel 0.3.28", + "futures-channel", "gloo-events", "js-sys", "wasm-bindgen", @@ -2602,8 +2530,8 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" dependencies = [ - "futures-channel 0.3.28", - "futures-core 0.3.28", + "futures-channel", + "futures-core", "js-sys", "wasm-bindgen", ] @@ -2649,9 +2577,9 @@ checksum = "5e4728fd124914ad25e99e3d15a9361a879f6620f63cb56bbb08f95abb97a535" dependencies = [ "bytes 0.5.6", "fnv", - "futures-core 0.3.28", - "futures-sink 0.3.28", - "futures-util 0.3.28", + "futures-core", + "futures-sink", + "futures-util", "http", "indexmap", "slab", @@ -2669,9 +2597,9 @@ checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21" dependencies = [ "bytes 1.4.0", "fnv", - "futures-core 0.3.28", - "futures-sink 0.3.28", - "futures-util 0.3.28", + "futures-core", + "futures-sink", + "futures-util", "http", "indexmap", "slab", @@ -2944,9 +2872,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a6f157065790a3ed2f88679250419b5cdd96e714a0d65f7797fd337186e96bb" dependencies = [ "bytes 0.5.6", - "futures-channel 0.3.28", - "futures-core 0.3.28", - "futures-util 0.3.28", + "futures-channel", + "futures-core", + "futures-util", "h2 0.2.7", "http", "http-body 0.3.1", @@ -2968,9 +2896,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4" dependencies = [ "bytes 1.4.0", - "futures-channel 0.3.28", - "futures-core 0.3.28", - "futures-util 0.3.28", + "futures-channel", + "futures-core", + "futures-util", "h2 0.3.18", "http", "http-body 0.4.5", @@ -3079,6 +3007,15 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" +[[package]] +name = "input_buffer" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413" +dependencies = [ + "bytes 1.4.0", +] + [[package]] name = "insta" version = "1.29.0" @@ -3457,9 +3394,9 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f303ec0e94c6c54447f84f3b0ef7af769858a9c4ef56ef2a986d3dcd4c3fc9c" dependencies = [ - "futures-core 0.3.28", - "futures-sink 0.3.28", - "futures-util 0.3.28", + "futures-core", + "futures-sink", + "futures-util", "local-waker", ] @@ -3468,17 +3405,21 @@ name = "local-event-bus" version = "0.1.0" dependencies = [ "async-std", + "bincode", + "crossbeam-channel", "event-bus-adapter", "flumedb", "futures", - "futures-async-runtime", - "futures-util 0.3.28", + "futures-util", "gumdrop", "serde", "serde_json", "thiserror", "tide", + "tide-websockets", "tracing", + "tracing-subscriber", + "uuid 1.3.2", ] [[package]] @@ -3908,8 +3849,8 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22" dependencies = [ - "futures-channel 0.3.28", - "futures-util 0.3.28", + "futures-channel", + "futures-util", "indexmap", "js-sys", "once_cell", @@ -3925,9 +3866,9 @@ checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113" dependencies = [ "async-trait", "crossbeam-channel", - "futures-channel 0.3.28", - "futures-executor 0.3.28", - "futures-util 0.3.28", + "futures-channel", + "futures-executor", + "futures-util", "once_cell", "opentelemetry_api", "percent-encoding", @@ -4614,9 +4555,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c70d5058251eb4b8acee55aad429a7a8a4ef2b61fe1f7be39ac6cfcd2934f334" dependencies = [ "bytes 1.4.0", - "futures-channel 0.3.28", - "futures-sink 0.3.28", - "futures-util 0.3.28", + "futures-channel", + "futures-sink", + "futures-util", "log", "pin-project", "tokio 1.28.0", @@ -4629,8 +4570,7 @@ version = "0.1.0" dependencies = [ "event-bus-adapter", "futures", - "futures-async-runtime", - "futures-util 0.3.28", + "futures-util", "redis-async", "serde", "thiserror", @@ -4728,8 +4668,8 @@ dependencies = [ "base64 0.13.1", "bytes 0.5.6", "encoding_rs", - "futures-core 0.3.28", - "futures-util 0.3.28", + "futures-core", + "futures-util", "http", "http-body 0.3.1", "hyper 0.13.10", @@ -4763,8 +4703,8 @@ dependencies = [ "base64 0.21.0", "bytes 1.4.0", "encoding_rs", - "futures-core 0.3.28", - "futures-util 0.3.28", + "futures-core", + "futures-util", "h2 0.3.18", "http", "http-body 0.4.5", @@ -5021,8 +4961,8 @@ checksum = "4ed36cdb20de66d89a17ea04b8883fc7a386f2cf877aaedca5005583ce4876ff" dependencies = [ "crossbeam-channel", "futures", - "futures-channel 0.3.28", - "futures-executor 0.3.28", + "futures-channel", + "futures-executor", "num_cpus", ] @@ -5315,6 +5255,19 @@ dependencies = [ "serde", ] +[[package]] +name = "sha-1" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if 1.0.0", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", +] + [[package]] name = "sha1" version = "0.6.1" @@ -5579,10 +5532,10 @@ dependencies = [ "dotenvy", "either", "event-listener", - "futures-channel 0.3.28", - "futures-core 0.3.28", + "futures-channel", + "futures-core", "futures-intrusive", - "futures-util 0.3.28", + "futures-util", "git2", "hashlink", "hex", @@ -6066,7 +6019,7 @@ dependencies = [ "async-std", "async-trait", "femme", - "futures-util 0.3.28", + "futures-util", "http-client", "http-types", "kv-log-macro", @@ -6077,6 +6030,24 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tide-websockets" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3592c5cb5cb1b7a2ff3a0e5353170c1bb5b104b2f66dd06f73304169b52cc725" +dependencies = [ + "async-dup", + "async-std", + "async-tungstenite", + "base64 0.13.1", + "futures-util", + "pin-project", + "serde", + "serde_json", + "sha-1", + "tide", +] + [[package]] name = "time" version = "0.1.45" @@ -6190,7 +6161,7 @@ dependencies = [ "dotenv", "fake", "futures", - "futures-util 0.3.28", + "futures-util", "gumdrop", "hmac 0.12.1", "jwt", @@ -6218,7 +6189,7 @@ checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092" dependencies = [ "bytes 0.5.6", "fnv", - "futures-core 0.3.28", + "futures-core", "iovec", "lazy_static", "memchr", @@ -6288,8 +6259,8 @@ dependencies = [ "bincode", "bytes 1.4.0", "educe", - "futures-core 0.3.28", - "futures-sink 0.3.28", + "futures-core", + "futures-sink", "pin-project", "serde", "serde_json", @@ -6301,7 +6272,7 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" dependencies = [ - "futures-core 0.3.28", + "futures-core", "pin-project-lite 0.2.9", "tokio 1.28.0", ] @@ -6323,8 +6294,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ "bytes 0.5.6", - "futures-core 0.3.28", - "futures-sink 0.3.28", + "futures-core", + "futures-sink", "log", "pin-project-lite 0.1.12", "tokio 0.2.25", @@ -6337,8 +6308,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ "bytes 1.4.0", - "futures-core 0.3.28", - "futures-sink 0.3.28", + "futures-core", + "futures-sink", "pin-project-lite 0.2.9", "slab", "tokio 1.28.0", @@ -6524,6 +6495,26 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tungstenite" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fe8dada8c1a3aeca77d6b51a4f1314e0f4b8e438b7b1b71e3ddaca8080e4093" +dependencies = [ + "base64 0.13.1", + "byteorder", + "bytes 1.4.0", + "http", + "httparse", + "input_buffer", + "log", + "rand 0.8.5", + "sha-1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "type-map" version = "0.4.0" @@ -6651,6 +6642,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "uuid" version = "0.8.2" diff --git a/crates/event-bus-messages/src/lib.rs b/crates/event-bus-messages/src/lib.rs index 9f79368..61388bb 100644 --- a/crates/event-bus-messages/src/lib.rs +++ b/crates/event-bus-messages/src/lib.rs @@ -1,11 +1,26 @@ -#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum Msg { Seek(usize), + Ping, + Pong, + Ack, + Test1, + Test2, +} + +impl Msg { + pub fn from_bytes(v: &[u8]) -> bincode::Result { + bincode::deserialize(v) + } + + pub fn to_bytes(&self) -> bincode::Result> { + bincode::serialize(self) + } } #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct Message { - pub offset: usize, + pub offset: u64, pub payload: Msg, } @@ -17,4 +32,8 @@ impl Message { pub fn to_bytes(&self) -> bincode::Result> { bincode::serialize(self) } + + pub fn to_bytes_into(&self, buffer: &mut [u8]) -> bincode::Result<()> { + bincode::serialize_into(buffer, self) + } } diff --git a/crates/local-event-bus/Cargo.toml b/crates/local-event-bus/Cargo.toml index daf5394..e99ee16 100644 --- a/crates/local-event-bus/Cargo.toml +++ b/crates/local-event-bus/Cargo.toml @@ -3,16 +3,32 @@ name = "local-event-bus" version = "0.1.0" edition = "2021" +[[bin]] +name = "leb" +path = "./src/main.rs" + +[[bin]] +name = "lebc" +path = "./src/check1.rs" + +[lib] +name = "leb" +path = "./src/lib.rs" + [dependencies] event-bus-adapter = { path = "../event-bus-adapter" } thiserror = { version = "1.0.40" } futures-util = { version = "0.3.28" } futures = { version = "0.3.28" } -futures-async-runtime = { version = "0.2.1" } tracing = { version = "0" } serde = { version = "1.0.162", features = ['derive'] } -flumedb = "*" -serde_json = "*" +flumedb = { version = "*" } +serde_json = { version = "*" } async-std = { version = "*", features = ["attributes"] } -tide = "*" -gumdrop = "*" +tide = { version = "0.16.0" } +gumdrop = { version = "*" } +tide-websockets = "0.4.0" +bincode = { version = "1" } +uuid = { version = "1.3.2", features = ['v4'] } +crossbeam-channel = { version = "0.5.8" } +tracing-subscriber = { version = "0.3.17", features = ['env-filter'] } diff --git a/crates/local-event-bus/src/check1.rs b/crates/local-event-bus/src/check1.rs new file mode 100644 index 0000000..a204e8a --- /dev/null +++ b/crates/local-event-bus/src/check1.rs @@ -0,0 +1,4 @@ +#[async_std::main] +async fn main() { + +} \ No newline at end of file diff --git a/crates/local-event-bus/src/lib.rs b/crates/local-event-bus/src/lib.rs new file mode 100644 index 0000000..888d80b --- /dev/null +++ b/crates/local-event-bus/src/lib.rs @@ -0,0 +1,50 @@ +#![feature(async_fn_in_trait)] + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use event_bus_adapter::*; +use futures_util::AsyncReadExt; + +pub struct L {} + +pub struct MessageStream; + +impl futures::stream::Stream for MessageStream { + type Item = Message; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + todo!(); + } +} + +pub struct MessageSender; + +impl MessageSender { + pub async fn ack(&mut self) { + // + } +} + +impl MessageSend for MessageSender { + async fn send(&mut self, msg: Msg) { + todo!() + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct LebConfig { + bind: String, + port: u16, +} + +impl EventBus for L { + async fn connect(config: Config) -> Result<(MessageStream, MessageSender), ()> { + let config: LebConfig = config.config().expect("Invalid Local Event Bus config"); + let client = async_std::net::TcpStream::connect(format!("{}:{}", config.bind, config.port)) + .await + .expect("Failed tp connect to event bus"); + + todo!() + } +} diff --git a/crates/local-event-bus/src/main.rs b/crates/local-event-bus/src/main.rs index 87067f3..9f5e342 100644 --- a/crates/local-event-bus/src/main.rs +++ b/crates/local-event-bus/src/main.rs @@ -1,53 +1,290 @@ -use flumedb::*; -use gumdrop::Options; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use async_std::net::{TcpListener, TcpStream}; use event_bus_adapter::*; +use flumedb::*; +use futures_util::{AsyncReadExt, AsyncWriteExt, StreamExt}; +use gumdrop::Options; +use tracing::{error, info, warn}; +use tracing_subscriber::EnvFilter; +use uuid::Uuid; #[derive(Debug, thiserror::Error)] -enum Error {} +enum Error { + #[error("Client closed connection")] + BrokenPipe, +} #[derive(Options)] struct Opts { help: bool, - log_path: String, - bind: String, - listen_port: u16, - broadcast_port: u16, + log_path: Option, + bind: Option, + port: Option, } #[async_std::main] async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + let opts: Opts = Options::parse_args_default_or_exit(); - let path = opts.log_path; - let mut log = OffsetLog::::from_file( - std::fs::OpenOptions::new() - .append(true) - .create(true) - .write(true) - .read(true) - .open(&path) - .expect("Failed to open log file"), - ) - .expect("Failed to open log file"); + let path = opts + .log_path + .unwrap_or_else(|| std::env::var("LOG_PATH").expect("No path to save file")); - let len = log.iter().count(); + let listener = TcpListener::bind(format!( + "{}:{}", + opts.bind + .unwrap_or_else(|| std::env::var("BIND").expect("No bind parameter")), + opts.port.unwrap_or_else(|| std::env::var("PORT") + .expect("No port parameter") + .parse() + .expect("Invalid port format. Expect number")) + )) + .await + .expect("Failed to start server"); - log.append(&len.to_le_bytes()).expect("Failed to write"); + let mut incoming = listener.incoming(); - // Read the entry at offset 0 - let r = log.read(0).expect("Failed to read at 0"); - // `r.data` is a json string in a standard ssb log. - // `r.next` is the offset of the next entry. - let r = log.read(r.next); + let clients = Arc::new(RwLock::new(HashMap::with_capacity(4086))); + let (tx, rx) = std::sync::mpsc::channel(); - log.iter().map(|r| { - eprintln!("{r:?}"); - let (vec, _) = r.data.split_at(std::mem::size_of::()); - usize::from_le_bytes(vec.try_into().unwrap()) - }) - .for_each(|v| eprintln!("value {v:?}")); - // log.iter() - // .map(|e| serde_json::from_slice::(&e.data).unwrap()) - // .for_each(|v| println!("{}", serde_json::to_string_pretty(&v).unwrap())); + { + let clients = clients.clone(); + async_std::task::spawn(async move { + loop { + let Ok(msg) = rx.recv() else { + continue; + }; + match msg { + InnerMsg::Drop(uuid) => { + clients.write().unwrap().remove(&uuid); + } + InnerMsg::Register(client) => { + clients.write().unwrap().insert(client.id, client); + } + } + } + }); + } + + while let Some(stream) = incoming.next().await { + let path = path.clone(); + let tx = tx.clone(); + let clients = clients.clone(); + + async_std::task::spawn(async move { + let Ok(stream) = stream else { + return; + }; + let mut log = OffsetLog::::from_file( + std::fs::OpenOptions::new() + .append(true) + .create(true) + .write(true) + .read(true) + .open(&path) + .expect("Failed to open log file"), + ) + .expect("Failed to open log file"); + + let mut client = Client { + stream, + id: Uuid::new_v4(), + alive: Arc::new(RwLock::new(true)), + offset: None, + }; + let mut buffer = [0; 4086]; + + if let Err(e) = tx.send(InnerMsg::Register(client.clone())) { + warn!("Failed to send register: {e}"); + } + + loop { + let len = match AsyncReadExt::read(&mut client.stream, &mut buffer).await { + Ok(n) => n, + Err(e) => { + warn!("Failed to read from client: {e}"); + break; + } + }; + let msg = match Msg::from_bytes(&buffer[..len]) { + Ok(msg) => msg, + Err(e) => { + warn!("Invalid incoming message {buffer:?}: {e}"); + continue; + } + }; + match &msg { + Msg::Ping | Msg::Pong => { + if let Err(e) = client + .stream + .write_all(&Msg::Ping.to_bytes().unwrap_or_default()) + .await + { + warn!("Failed to write to client. Closing. {e}"); + client.stream.close().await.ok(); + break; + } + } + Msg::Seek(offset) => { + let log_file = match OffsetLog::::open_read_only(&path) { + Ok(f) => f, + Err(e) => { + error!("Failed to open file for read: {e}"); + continue; + } + }; + let offset = (*offset) as u64; + client.offset = Some(Arc::new(RwLock::new(offset))); + + let iter = log_file.bidir_iter_at_offset(offset); + let mut iter = + iter.filter_map(|e| Msg::from_bytes(&e.data).ok().zip(Some(e.offset))); + while let Some((msg, offset)) = iter.next() { + match send_directly(client.clone(), &mut buffer, offset, msg).await { + Err(Error::BrokenPipe) => break, + Err(e) => { + warn!("Failed to send message: {e}"); + continue; + } + _ => { + if let Err(Error::BrokenPipe) = + wait_ack(client.clone(), &mut buffer).await + { + break; + } + if let Some(o) = client.offset.as_mut() { + *o.write().unwrap() = offset; + } + } + }; + } + } + _ => { + info!("Incoming message: {msg:?}"); + if client.offset.is_none() { + warn!("Offset is not set. Skipping..."); + continue; + } + let bytes = match msg.to_bytes() { + Ok(v) => v, + Err(e) => { + warn!("Failed to serialize message: {e}"); + break; + } + }; + let offset = match log.append(&bytes) { + Ok(offset) => offset, + Err(e) => { + error!("Failed to write message to file: {e}"); + break; + } + }; + if let Err(e) = broadcast(clients.clone(), &mut buffer, offset, msg).await { + warn!("Failed to broadcast message: {e}"); + } + } + }; + } + + *client.alive.write().unwrap() = false; + if let Err(e) = tx.send(InnerMsg::Drop(client.id)) { + warn!("Failed to send close client: {e}"); + } + }); + } Ok(()) } + +async fn broadcast( + clients: Arc>>, + buffer: &mut [u8], + offset: u64, + msg: Msg, +) -> Result<(), Error> { + let clients = clients + .read() + .unwrap() + .values() + .filter(|c| *c.alive.read().unwrap()) + .map(Clone::clone) + .collect::>(); + for client in clients { + let id = client.id; + if let Err(e) = send_directly(client.clone(), buffer, offset, msg.clone()).await { + warn!("Failed to send message to {}: {e}", id); + } + if let Err(Error::BrokenPipe) = wait_ack(client, buffer).await { + break; + } + } + Ok(()) +} + +async fn send_directly( + mut client: Client, + buffer: &mut [u8], + offset: u64, + msg: Msg, +) -> Result<(), Error> { + if let Err(e) = (Message { + offset, + payload: msg, + }) + .to_bytes_into(buffer) + { + warn!("Failed to serialize message: {e}"); + return Ok(()); + }; + if let Err(e) = client.stream.write(buffer).await { + warn!("Failed to write message to client: {e}"); + Err(Error::BrokenPipe) + } else { + Ok(()) + } +} + +async fn read_msg(mut client: Client, buffer: &mut [u8]) -> Result, Error> { + let len = match AsyncReadExt::read(&mut client.stream, buffer).await { + Ok(n) => n, + Err(e) => { + warn!("Failed to read from client: {e}"); + return Err(Error::BrokenPipe); + } + }; + match Msg::from_bytes(&buffer[..len]) { + Ok(msg) => Ok(Some(msg)), + Err(e) => { + warn!("Invalid incoming message {buffer:?}: {e}"); + return Ok(None); + } + } +} + +async fn wait_ack(client: Client, buffer: &mut [u8]) -> Result<(), Error> { + loop { + let msg = read_msg(client.clone(), buffer).await?; + if let Some(Msg::Ack) = msg { + break; + } + } + Ok(()) +} + +enum InnerMsg { + Register(Client), + Drop(Uuid), +} + +#[derive(Clone)] +struct Client { + stream: TcpStream, + id: Uuid, + alive: Arc>, + offset: Option>>, +} diff --git a/crates/redis-event-bus/Cargo.toml b/crates/redis-event-bus/Cargo.toml index 42c37d8..459f22d 100644 --- a/crates/redis-event-bus/Cargo.toml +++ b/crates/redis-event-bus/Cargo.toml @@ -9,6 +9,5 @@ event-bus-adapter = { path = "../event-bus-adapter" } thiserror = { version = "1.0.40" } futures-util = { version = "0.3.28" } futures = { version = "0.3.28" } -futures-async-runtime = { version = "0.2.1" } tracing = { version = "0" } serde = { version = "1.0.162" } diff --git a/crates/redis-event-bus/src/lib.rs b/crates/redis-event-bus/src/lib.rs index 89cbdf0..c8deb13 100644 --- a/crates/redis-event-bus/src/lib.rs +++ b/crates/redis-event-bus/src/lib.rs @@ -9,7 +9,7 @@ use futures_util::{SinkExt, StreamExt}; use redis_async::client::connect::RespConnection; use redis_async::resp::RespValue; use redis_async::resp::RespValue::BulkString; -use tracing::log::warn; +use tracing::warn; pub struct MessageSender(SplitSink);