local event bus server, start client and check

This commit is contained in:
Adrian Woźniak 2023-05-26 21:16:53 +02:00
parent 4a557a7de3
commit a917b7c064
8 changed files with 537 additions and 215 deletions

341
Cargo.lock generated
View File

@ -38,10 +38,10 @@ dependencies = [
"bitflags",
"bytes 1.4.0",
"crossbeam-channel",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-core",
"futures-sink",
"futures-task",
"futures-util 0.3.28",
"futures-util",
"log",
"once_cell",
"parking_lot 0.12.1",
@ -59,8 +59,8 @@ checksum = "57a7559404a7f3573127aab53c08ce37a6c6a315c374a31070f3c91cd1b4a7fe"
dependencies = [
"bitflags",
"bytes 1.4.0",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-core",
"futures-sink",
"log",
"memchr",
"pin-project-lite 0.2.9",
@ -87,7 +87,7 @@ dependencies = [
"derive_more",
"encoding_rs",
"flate2",
"futures-core 0.3.28",
"futures-core",
"h2 0.3.18",
"http",
"httparse",
@ -137,7 +137,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15265b6b8e2347670eb363c47fc8c75208b4a4994b27192f345fcbe707804f3e"
dependencies = [
"actix-macros",
"futures-core 0.3.28",
"futures-core",
"tokio 1.28.0",
]
@ -150,8 +150,8 @@ dependencies = [
"actix-rt",
"actix-service",
"actix-utils",
"futures-core 0.3.28",
"futures-util 0.3.28",
"futures-core",
"futures-util",
"mio 0.8.6",
"num_cpus",
"socket2 0.4.9",
@ -165,7 +165,7 @@ version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a"
dependencies = [
"futures-core 0.3.28",
"futures-core",
"paste",
"pin-project-lite 0.2.9",
]
@ -202,8 +202,8 @@ dependencies = [
"cookie 0.16.2",
"derive_more",
"encoding_rs",
"futures-core 0.3.28",
"futures-util 0.3.28",
"futures-core",
"futures-util",
"http",
"itoa 1.0.6",
"language-tags",
@ -453,7 +453,7 @@ checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core 0.3.28",
"futures-core",
]
[[package]]
@ -462,7 +462,7 @@ version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7427a12b8dc09291528cfb1da2447059adb4a257388c2acd6497a79d55cf6f7c"
dependencies = [
"futures-io 0.3.28",
"futures-io",
"simple-mutex",
]
@ -504,7 +504,7 @@ dependencies = [
"async-channel",
"async-dup",
"async-std",
"futures-core 0.3.28",
"futures-core",
"http-types",
"httparse",
"log",
@ -606,9 +606,9 @@ dependencies = [
"async-lock",
"async-process",
"crossbeam-utils",
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-io 0.3.28",
"futures-channel",
"futures-core",
"futures-io",
"futures-lite",
"gloo-timers",
"kv-log-macro",
@ -628,7 +628,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bb25e3145c6216eafb3eca6f8cd6e016f43c9d4416d0af7984de46acf4f288"
dependencies = [
"chrono",
"futures-util 0.3.28",
"futures-util",
"hex",
"hmac 0.12.1",
"http-types",
@ -663,6 +663,19 @@ dependencies = [
"syn 2.0.15",
]
[[package]]
name = "async-tungstenite"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07b30ef0ea5c20caaa54baea49514a206308989c68be7ecd86c7f956e4da6378"
dependencies = [
"futures-io",
"futures-util",
"log",
"pin-project-lite 0.2.9",
"tungstenite",
]
[[package]]
name = "atoi"
version = "1.0.0"
@ -2167,8 +2180,8 @@ version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.8",
@ -2279,32 +2292,13 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-executor 0.3.28",
"futures-io 0.3.28",
"futures-sink 0.3.28",
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util 0.3.28",
]
[[package]]
name = "futures-async-runtime"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab8d196f9bcbc3b33148960602889bf44c74afe5bea20ee970aaa08df2db2f5"
dependencies = [
"futures-core 0.2.1",
"futures-stable",
]
[[package]]
name = "futures-channel"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb37ec6418c577b25f5b129c0f4456ad7ce8714ec43c59712aa7e4cd2cb6b85"
dependencies = [
"futures-core 0.2.1",
"futures-util",
]
[[package]]
@ -2313,17 +2307,8 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
dependencies = [
"futures-core 0.3.28",
"futures-sink 0.3.28",
]
[[package]]
name = "futures-core"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7455c91eb2eae38f33b013f77ebe766c75761af333efd9d550e154045c63e225"
dependencies = [
"either",
"futures-core",
"futures-sink",
]
[[package]]
@ -2332,28 +2317,15 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
[[package]]
name = "futures-executor"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5db1dd3979745f5e50b28fd604602f2715f9d5a28ab835a5f9686a9d84cd1315"
dependencies = [
"futures-channel 0.2.1",
"futures-core 0.2.1",
"futures-util 0.2.1",
"lazy_static",
"num_cpus",
]
[[package]]
name = "futures-executor"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
dependencies = [
"futures-core 0.3.28",
"futures-core",
"futures-task",
"futures-util 0.3.28",
"futures-util",
]
[[package]]
@ -2362,21 +2334,11 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5"
dependencies = [
"futures-core 0.3.28",
"futures-core",
"lock_api",
"parking_lot 0.11.2",
]
[[package]]
name = "futures-io"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6a0470fdba9dc87c27a3564ad6d5cc04e080f3afa26c93549728cce46ab21a2"
dependencies = [
"futures-core 0.2.1",
"iovec",
]
[[package]]
name = "futures-io"
version = "0.3.28"
@ -2390,8 +2352,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
dependencies = [
"fastrand",
"futures-core 0.3.28",
"futures-io 0.3.28",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite 0.2.9",
@ -2409,63 +2371,29 @@ dependencies = [
"syn 2.0.15",
]
[[package]]
name = "futures-sink"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8a93a7c480876b8e02cdd70022e7eb9c8423575ea6a25a0b749b18834c16412"
dependencies = [
"either",
"futures-channel 0.2.1",
"futures-core 0.2.1",
]
[[package]]
name = "futures-sink"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
[[package]]
name = "futures-stable"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a222f540db94a09c275be08a406b5cd82f4311422b940a684795cdaef3d02e81"
dependencies = [
"futures-core 0.2.1",
"futures-executor 0.2.1",
]
[[package]]
name = "futures-task"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
[[package]]
name = "futures-util"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cf12a3fc1ccaf1bc2901ec6e0ed6ed407a4f16eaa20dd838f40cabf5f7b31f1"
dependencies = [
"either",
"futures-channel 0.2.1",
"futures-core 0.2.1",
"futures-io 0.2.1",
"futures-sink 0.2.1",
]
[[package]]
name = "futures-util"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
dependencies = [
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-io 0.3.28",
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink 0.3.28",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite 0.2.9",
@ -2589,7 +2517,7 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8d5564e570a38b43d78bdc063374a0c3098c4f0d64005b12f9bbe87e869b6d7"
dependencies = [
"futures-channel 0.3.28",
"futures-channel",
"gloo-events",
"js-sys",
"wasm-bindgen",
@ -2602,8 +2530,8 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c"
dependencies = [
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
@ -2649,9 +2577,9 @@ checksum = "5e4728fd124914ad25e99e3d15a9361a879f6620f63cb56bbb08f95abb97a535"
dependencies = [
"bytes 0.5.6",
"fnv",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-util 0.3.28",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"slab",
@ -2669,9 +2597,9 @@ checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21"
dependencies = [
"bytes 1.4.0",
"fnv",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-util 0.3.28",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"slab",
@ -2944,9 +2872,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a6f157065790a3ed2f88679250419b5cdd96e714a0d65f7797fd337186e96bb"
dependencies = [
"bytes 0.5.6",
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-util 0.3.28",
"futures-channel",
"futures-core",
"futures-util",
"h2 0.2.7",
"http",
"http-body 0.3.1",
@ -2968,9 +2896,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4"
dependencies = [
"bytes 1.4.0",
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-util 0.3.28",
"futures-channel",
"futures-core",
"futures-util",
"h2 0.3.18",
"http",
"http-body 0.4.5",
@ -3079,6 +3007,15 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac"
[[package]]
name = "input_buffer"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413"
dependencies = [
"bytes 1.4.0",
]
[[package]]
name = "insta"
version = "1.29.0"
@ -3457,9 +3394,9 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f303ec0e94c6c54447f84f3b0ef7af769858a9c4ef56ef2a986d3dcd4c3fc9c"
dependencies = [
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-util 0.3.28",
"futures-core",
"futures-sink",
"futures-util",
"local-waker",
]
@ -3468,17 +3405,21 @@ name = "local-event-bus"
version = "0.1.0"
dependencies = [
"async-std",
"bincode",
"crossbeam-channel",
"event-bus-adapter",
"flumedb",
"futures",
"futures-async-runtime",
"futures-util 0.3.28",
"futures-util",
"gumdrop",
"serde",
"serde_json",
"thiserror",
"tide",
"tide-websockets",
"tracing",
"tracing-subscriber",
"uuid 1.3.2",
]
[[package]]
@ -3908,8 +3849,8 @@ version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22"
dependencies = [
"futures-channel 0.3.28",
"futures-util 0.3.28",
"futures-channel",
"futures-util",
"indexmap",
"js-sys",
"once_cell",
@ -3925,9 +3866,9 @@ checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113"
dependencies = [
"async-trait",
"crossbeam-channel",
"futures-channel 0.3.28",
"futures-executor 0.3.28",
"futures-util 0.3.28",
"futures-channel",
"futures-executor",
"futures-util",
"once_cell",
"opentelemetry_api",
"percent-encoding",
@ -4614,9 +4555,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c70d5058251eb4b8acee55aad429a7a8a4ef2b61fe1f7be39ac6cfcd2934f334"
dependencies = [
"bytes 1.4.0",
"futures-channel 0.3.28",
"futures-sink 0.3.28",
"futures-util 0.3.28",
"futures-channel",
"futures-sink",
"futures-util",
"log",
"pin-project",
"tokio 1.28.0",
@ -4629,8 +4570,7 @@ version = "0.1.0"
dependencies = [
"event-bus-adapter",
"futures",
"futures-async-runtime",
"futures-util 0.3.28",
"futures-util",
"redis-async",
"serde",
"thiserror",
@ -4728,8 +4668,8 @@ dependencies = [
"base64 0.13.1",
"bytes 0.5.6",
"encoding_rs",
"futures-core 0.3.28",
"futures-util 0.3.28",
"futures-core",
"futures-util",
"http",
"http-body 0.3.1",
"hyper 0.13.10",
@ -4763,8 +4703,8 @@ dependencies = [
"base64 0.21.0",
"bytes 1.4.0",
"encoding_rs",
"futures-core 0.3.28",
"futures-util 0.3.28",
"futures-core",
"futures-util",
"h2 0.3.18",
"http",
"http-body 0.4.5",
@ -5021,8 +4961,8 @@ checksum = "4ed36cdb20de66d89a17ea04b8883fc7a386f2cf877aaedca5005583ce4876ff"
dependencies = [
"crossbeam-channel",
"futures",
"futures-channel 0.3.28",
"futures-executor 0.3.28",
"futures-channel",
"futures-executor",
"num_cpus",
]
@ -5315,6 +5255,19 @@ dependencies = [
"serde",
]
[[package]]
name = "sha-1"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6"
dependencies = [
"block-buffer 0.9.0",
"cfg-if 1.0.0",
"cpufeatures",
"digest 0.9.0",
"opaque-debug",
]
[[package]]
name = "sha1"
version = "0.6.1"
@ -5579,10 +5532,10 @@ dependencies = [
"dotenvy",
"either",
"event-listener",
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-channel",
"futures-core",
"futures-intrusive",
"futures-util 0.3.28",
"futures-util",
"git2",
"hashlink",
"hex",
@ -6066,7 +6019,7 @@ dependencies = [
"async-std",
"async-trait",
"femme",
"futures-util 0.3.28",
"futures-util",
"http-client",
"http-types",
"kv-log-macro",
@ -6077,6 +6030,24 @@ dependencies = [
"serde_json",
]
[[package]]
name = "tide-websockets"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3592c5cb5cb1b7a2ff3a0e5353170c1bb5b104b2f66dd06f73304169b52cc725"
dependencies = [
"async-dup",
"async-std",
"async-tungstenite",
"base64 0.13.1",
"futures-util",
"pin-project",
"serde",
"serde_json",
"sha-1",
"tide",
]
[[package]]
name = "time"
version = "0.1.45"
@ -6190,7 +6161,7 @@ dependencies = [
"dotenv",
"fake",
"futures",
"futures-util 0.3.28",
"futures-util",
"gumdrop",
"hmac 0.12.1",
"jwt",
@ -6218,7 +6189,7 @@ checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092"
dependencies = [
"bytes 0.5.6",
"fnv",
"futures-core 0.3.28",
"futures-core",
"iovec",
"lazy_static",
"memchr",
@ -6288,8 +6259,8 @@ dependencies = [
"bincode",
"bytes 1.4.0",
"educe",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-core",
"futures-sink",
"pin-project",
"serde",
"serde_json",
@ -6301,7 +6272,7 @@ version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core 0.3.28",
"futures-core",
"pin-project-lite 0.2.9",
"tokio 1.28.0",
]
@ -6323,8 +6294,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499"
dependencies = [
"bytes 0.5.6",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-core",
"futures-sink",
"log",
"pin-project-lite 0.1.12",
"tokio 0.2.25",
@ -6337,8 +6308,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [
"bytes 1.4.0",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-core",
"futures-sink",
"pin-project-lite 0.2.9",
"slab",
"tokio 1.28.0",
@ -6524,6 +6495,26 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
[[package]]
name = "tungstenite"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fe8dada8c1a3aeca77d6b51a4f1314e0f4b8e438b7b1b71e3ddaca8080e4093"
dependencies = [
"base64 0.13.1",
"byteorder",
"bytes 1.4.0",
"http",
"httparse",
"input_buffer",
"log",
"rand 0.8.5",
"sha-1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "type-map"
version = "0.4.0"
@ -6651,6 +6642,12 @@ dependencies = [
"serde",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "uuid"
version = "0.8.2"

View File

@ -1,11 +1,26 @@
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum Msg {
Seek(usize),
Ping,
Pong,
Ack,
Test1,
Test2,
}
impl Msg {
pub fn from_bytes(v: &[u8]) -> bincode::Result<Self> {
bincode::deserialize(v)
}
pub fn to_bytes(&self) -> bincode::Result<Vec<u8>> {
bincode::serialize(self)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct Message {
pub offset: usize,
pub offset: u64,
pub payload: Msg,
}
@ -17,4 +32,8 @@ impl Message {
pub fn to_bytes(&self) -> bincode::Result<Vec<u8>> {
bincode::serialize(self)
}
pub fn to_bytes_into(&self, buffer: &mut [u8]) -> bincode::Result<()> {
bincode::serialize_into(buffer, self)
}
}

View File

@ -3,16 +3,32 @@ name = "local-event-bus"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "leb"
path = "./src/main.rs"
[[bin]]
name = "lebc"
path = "./src/check1.rs"
[lib]
name = "leb"
path = "./src/lib.rs"
[dependencies]
event-bus-adapter = { path = "../event-bus-adapter" }
thiserror = { version = "1.0.40" }
futures-util = { version = "0.3.28" }
futures = { version = "0.3.28" }
futures-async-runtime = { version = "0.2.1" }
tracing = { version = "0" }
serde = { version = "1.0.162", features = ['derive'] }
flumedb = "*"
serde_json = "*"
flumedb = { version = "*" }
serde_json = { version = "*" }
async-std = { version = "*", features = ["attributes"] }
tide = "*"
gumdrop = "*"
tide = { version = "0.16.0" }
gumdrop = { version = "*" }
tide-websockets = "0.4.0"
bincode = { version = "1" }
uuid = { version = "1.3.2", features = ['v4'] }
crossbeam-channel = { version = "0.5.8" }
tracing-subscriber = { version = "0.3.17", features = ['env-filter'] }

View File

@ -0,0 +1,4 @@
#[async_std::main]
async fn main() {
}

View File

@ -0,0 +1,50 @@
#![feature(async_fn_in_trait)]
use std::pin::Pin;
use std::task::{Context, Poll};
use event_bus_adapter::*;
use futures_util::AsyncReadExt;
pub struct L {}
pub struct MessageStream;
impl futures::stream::Stream for MessageStream {
type Item = Message;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!();
}
}
pub struct MessageSender;
impl MessageSender {
pub async fn ack(&mut self) {
//
}
}
impl MessageSend for MessageSender {
async fn send(&mut self, msg: Msg) {
todo!()
}
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct LebConfig {
bind: String,
port: u16,
}
impl EventBus<MessageStream, MessageSender> for L {
async fn connect(config: Config) -> Result<(MessageStream, MessageSender), ()> {
let config: LebConfig = config.config().expect("Invalid Local Event Bus config");
let client = async_std::net::TcpStream::connect(format!("{}:{}", config.bind, config.port))
.await
.expect("Failed tp connect to event bus");
todo!()
}
}

View File

@ -1,53 +1,290 @@
use flumedb::*;
use gumdrop::Options;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use async_std::net::{TcpListener, TcpStream};
use event_bus_adapter::*;
use flumedb::*;
use futures_util::{AsyncReadExt, AsyncWriteExt, StreamExt};
use gumdrop::Options;
use tracing::{error, info, warn};
use tracing_subscriber::EnvFilter;
use uuid::Uuid;
#[derive(Debug, thiserror::Error)]
enum Error {}
enum Error {
#[error("Client closed connection")]
BrokenPipe,
}
#[derive(Options)]
struct Opts {
help: bool,
log_path: String,
bind: String,
listen_port: u16,
broadcast_port: u16,
log_path: Option<String>,
bind: Option<String>,
port: Option<u16>,
}
#[async_std::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
let opts: Opts = Options::parse_args_default_or_exit();
let path = opts.log_path;
let mut log = OffsetLog::<u32>::from_file(
std::fs::OpenOptions::new()
.append(true)
.create(true)
.write(true)
.read(true)
.open(&path)
.expect("Failed to open log file"),
)
.expect("Failed to open log file");
let path = opts
.log_path
.unwrap_or_else(|| std::env::var("LOG_PATH").expect("No path to save file"));
let len = log.iter().count();
let listener = TcpListener::bind(format!(
"{}:{}",
opts.bind
.unwrap_or_else(|| std::env::var("BIND").expect("No bind parameter")),
opts.port.unwrap_or_else(|| std::env::var("PORT")
.expect("No port parameter")
.parse()
.expect("Invalid port format. Expect number"))
))
.await
.expect("Failed to start server");
log.append(&len.to_le_bytes()).expect("Failed to write");
let mut incoming = listener.incoming();
// Read the entry at offset 0
let r = log.read(0).expect("Failed to read at 0");
// `r.data` is a json string in a standard ssb log.
// `r.next` is the offset of the next entry.
let r = log.read(r.next);
let clients = Arc::new(RwLock::new(HashMap::with_capacity(4086)));
let (tx, rx) = std::sync::mpsc::channel();
log.iter().map(|r| {
eprintln!("{r:?}");
let (vec, _) = r.data.split_at(std::mem::size_of::<usize>());
usize::from_le_bytes(vec.try_into().unwrap())
})
.for_each(|v| eprintln!("value {v:?}"));
// log.iter()
// .map(|e| serde_json::from_slice::<serde_json::Value>(&e.data).unwrap())
// .for_each(|v| println!("{}", serde_json::to_string_pretty(&v).unwrap()));
{
let clients = clients.clone();
async_std::task::spawn(async move {
loop {
let Ok(msg) = rx.recv() else {
continue;
};
match msg {
InnerMsg::Drop(uuid) => {
clients.write().unwrap().remove(&uuid);
}
InnerMsg::Register(client) => {
clients.write().unwrap().insert(client.id, client);
}
}
}
});
}
while let Some(stream) = incoming.next().await {
let path = path.clone();
let tx = tx.clone();
let clients = clients.clone();
async_std::task::spawn(async move {
let Ok(stream) = stream else {
return;
};
let mut log = OffsetLog::<Msg>::from_file(
std::fs::OpenOptions::new()
.append(true)
.create(true)
.write(true)
.read(true)
.open(&path)
.expect("Failed to open log file"),
)
.expect("Failed to open log file");
let mut client = Client {
stream,
id: Uuid::new_v4(),
alive: Arc::new(RwLock::new(true)),
offset: None,
};
let mut buffer = [0; 4086];
if let Err(e) = tx.send(InnerMsg::Register(client.clone())) {
warn!("Failed to send register: {e}");
}
loop {
let len = match AsyncReadExt::read(&mut client.stream, &mut buffer).await {
Ok(n) => n,
Err(e) => {
warn!("Failed to read from client: {e}");
break;
}
};
let msg = match Msg::from_bytes(&buffer[..len]) {
Ok(msg) => msg,
Err(e) => {
warn!("Invalid incoming message {buffer:?}: {e}");
continue;
}
};
match &msg {
Msg::Ping | Msg::Pong => {
if let Err(e) = client
.stream
.write_all(&Msg::Ping.to_bytes().unwrap_or_default())
.await
{
warn!("Failed to write to client. Closing. {e}");
client.stream.close().await.ok();
break;
}
}
Msg::Seek(offset) => {
let log_file = match OffsetLog::<Msg>::open_read_only(&path) {
Ok(f) => f,
Err(e) => {
error!("Failed to open file for read: {e}");
continue;
}
};
let offset = (*offset) as u64;
client.offset = Some(Arc::new(RwLock::new(offset)));
let iter = log_file.bidir_iter_at_offset(offset);
let mut iter =
iter.filter_map(|e| Msg::from_bytes(&e.data).ok().zip(Some(e.offset)));
while let Some((msg, offset)) = iter.next() {
match send_directly(client.clone(), &mut buffer, offset, msg).await {
Err(Error::BrokenPipe) => break,
Err(e) => {
warn!("Failed to send message: {e}");
continue;
}
_ => {
if let Err(Error::BrokenPipe) =
wait_ack(client.clone(), &mut buffer).await
{
break;
}
if let Some(o) = client.offset.as_mut() {
*o.write().unwrap() = offset;
}
}
};
}
}
_ => {
info!("Incoming message: {msg:?}");
if client.offset.is_none() {
warn!("Offset is not set. Skipping...");
continue;
}
let bytes = match msg.to_bytes() {
Ok(v) => v,
Err(e) => {
warn!("Failed to serialize message: {e}");
break;
}
};
let offset = match log.append(&bytes) {
Ok(offset) => offset,
Err(e) => {
error!("Failed to write message to file: {e}");
break;
}
};
if let Err(e) = broadcast(clients.clone(), &mut buffer, offset, msg).await {
warn!("Failed to broadcast message: {e}");
}
}
};
}
*client.alive.write().unwrap() = false;
if let Err(e) = tx.send(InnerMsg::Drop(client.id)) {
warn!("Failed to send close client: {e}");
}
});
}
Ok(())
}
async fn broadcast(
clients: Arc<RwLock<HashMap<Uuid, Client>>>,
buffer: &mut [u8],
offset: u64,
msg: Msg,
) -> Result<(), Error> {
let clients = clients
.read()
.unwrap()
.values()
.filter(|c| *c.alive.read().unwrap())
.map(Clone::clone)
.collect::<Vec<_>>();
for client in clients {
let id = client.id;
if let Err(e) = send_directly(client.clone(), buffer, offset, msg.clone()).await {
warn!("Failed to send message to {}: {e}", id);
}
if let Err(Error::BrokenPipe) = wait_ack(client, buffer).await {
break;
}
}
Ok(())
}
async fn send_directly(
mut client: Client,
buffer: &mut [u8],
offset: u64,
msg: Msg,
) -> Result<(), Error> {
if let Err(e) = (Message {
offset,
payload: msg,
})
.to_bytes_into(buffer)
{
warn!("Failed to serialize message: {e}");
return Ok(());
};
if let Err(e) = client.stream.write(buffer).await {
warn!("Failed to write message to client: {e}");
Err(Error::BrokenPipe)
} else {
Ok(())
}
}
async fn read_msg(mut client: Client, buffer: &mut [u8]) -> Result<Option<Msg>, Error> {
let len = match AsyncReadExt::read(&mut client.stream, buffer).await {
Ok(n) => n,
Err(e) => {
warn!("Failed to read from client: {e}");
return Err(Error::BrokenPipe);
}
};
match Msg::from_bytes(&buffer[..len]) {
Ok(msg) => Ok(Some(msg)),
Err(e) => {
warn!("Invalid incoming message {buffer:?}: {e}");
return Ok(None);
}
}
}
async fn wait_ack(client: Client, buffer: &mut [u8]) -> Result<(), Error> {
loop {
let msg = read_msg(client.clone(), buffer).await?;
if let Some(Msg::Ack) = msg {
break;
}
}
Ok(())
}
enum InnerMsg {
Register(Client),
Drop(Uuid),
}
#[derive(Clone)]
struct Client {
stream: TcpStream,
id: Uuid,
alive: Arc<RwLock<bool>>,
offset: Option<Arc<RwLock<u64>>>,
}

View File

@ -9,6 +9,5 @@ event-bus-adapter = { path = "../event-bus-adapter" }
thiserror = { version = "1.0.40" }
futures-util = { version = "0.3.28" }
futures = { version = "0.3.28" }
futures-async-runtime = { version = "0.2.1" }
tracing = { version = "0" }
serde = { version = "1.0.162" }

View File

@ -9,7 +9,7 @@ use futures_util::{SinkExt, StreamExt};
use redis_async::client::connect::RespConnection;
use redis_async::resp::RespValue;
use redis_async::resp::RespValue::BulkString;
use tracing::log::warn;
use tracing::warn;
pub struct MessageSender(SplitSink<RespConnection, RespValue>);