Redis Event Stream

This commit is contained in:
Adrian Woźniak 2023-05-25 15:56:15 +02:00
parent 445e9ac97c
commit 84e5e3343a
17 changed files with 703 additions and 419 deletions

284
Cargo.lock generated
View File

@ -38,10 +38,10 @@ dependencies = [
"bitflags",
"bytes 1.4.0",
"crossbeam-channel",
"futures-core",
"futures-sink",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-task",
"futures-util",
"futures-util 0.3.28",
"log",
"once_cell",
"parking_lot 0.12.1",
@ -59,8 +59,8 @@ checksum = "57a7559404a7f3573127aab53c08ce37a6c6a315c374a31070f3c91cd1b4a7fe"
dependencies = [
"bitflags",
"bytes 1.4.0",
"futures-core",
"futures-sink",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"log",
"memchr",
"pin-project-lite 0.2.9",
@ -87,7 +87,7 @@ dependencies = [
"derive_more",
"encoding_rs",
"flate2",
"futures-core",
"futures-core 0.3.28",
"h2 0.3.18",
"http",
"httparse",
@ -137,7 +137,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15265b6b8e2347670eb363c47fc8c75208b4a4994b27192f345fcbe707804f3e"
dependencies = [
"actix-macros",
"futures-core",
"futures-core 0.3.28",
"tokio 1.28.0",
]
@ -150,8 +150,8 @@ dependencies = [
"actix-rt",
"actix-service",
"actix-utils",
"futures-core",
"futures-util",
"futures-core 0.3.28",
"futures-util 0.3.28",
"mio 0.8.6",
"num_cpus",
"socket2 0.4.9",
@ -165,7 +165,7 @@ version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a"
dependencies = [
"futures-core",
"futures-core 0.3.28",
"paste",
"pin-project-lite 0.2.9",
]
@ -202,8 +202,8 @@ dependencies = [
"cookie 0.16.2",
"derive_more",
"encoding_rs",
"futures-core",
"futures-util",
"futures-core 0.3.28",
"futures-util 0.3.28",
"http",
"itoa 1.0.6",
"language-tags",
@ -368,7 +368,7 @@ checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
"futures-core 0.3.28",
]
[[package]]
@ -378,7 +378,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bb25e3145c6216eafb3eca6f8cd6e016f43c9d4416d0af7984de46acf4f288"
dependencies = [
"chrono",
"futures-util",
"futures-util 0.3.28",
"hex",
"hmac",
"http-types",
@ -851,7 +851,7 @@ dependencies = [
"actix-web",
"cookie 0.17.0",
"parking_lot 0.12.1",
"password-hash 0.5.0",
"password-hash 0.4.2",
"serde",
"serde_json",
"thiserror",
@ -1496,10 +1496,23 @@ dependencies = [
[[package]]
name = "event-bus-adapter"
version = "0.1.0"
dependencies = [
"bincode",
"event-bus-messages",
"futures",
"serde",
"thiserror",
"toml 0.7.3",
]
[[package]]
name = "event-bus-messages"
version = "0.1.0"
dependencies = [
"bincode",
"serde",
"thiserror",
]
[[package]]
name = "event-listener"
@ -1631,8 +1644,8 @@ version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"nanorand",
"pin-project",
"spin 0.9.8",
@ -1722,13 +1735,32 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-executor 0.3.28",
"futures-io 0.3.28",
"futures-sink 0.3.28",
"futures-task",
"futures-util",
"futures-util 0.3.28",
]
[[package]]
name = "futures-async-runtime"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab8d196f9bcbc3b33148960602889bf44c74afe5bea20ee970aaa08df2db2f5"
dependencies = [
"futures-core 0.2.1",
"futures-stable",
]
[[package]]
name = "futures-channel"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb37ec6418c577b25f5b129c0f4456ad7ce8714ec43c59712aa7e4cd2cb6b85"
dependencies = [
"futures-core 0.2.1",
]
[[package]]
@ -1737,8 +1769,17 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
dependencies = [
"futures-core",
"futures-sink",
"futures-core 0.3.28",
"futures-sink 0.3.28",
]
[[package]]
name = "futures-core"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7455c91eb2eae38f33b013f77ebe766c75761af333efd9d550e154045c63e225"
dependencies = [
"either",
]
[[package]]
@ -1747,15 +1788,28 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
[[package]]
name = "futures-executor"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5db1dd3979745f5e50b28fd604602f2715f9d5a28ab835a5f9686a9d84cd1315"
dependencies = [
"futures-channel 0.2.1",
"futures-core 0.2.1",
"futures-util 0.2.1",
"lazy_static",
"num_cpus",
]
[[package]]
name = "futures-executor"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
dependencies = [
"futures-core",
"futures-core 0.3.28",
"futures-task",
"futures-util",
"futures-util 0.3.28",
]
[[package]]
@ -1764,11 +1818,21 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5"
dependencies = [
"futures-core",
"futures-core 0.3.28",
"lock_api",
"parking_lot 0.11.2",
]
[[package]]
name = "futures-io"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6a0470fdba9dc87c27a3564ad6d5cc04e080f3afa26c93549728cce46ab21a2"
dependencies = [
"futures-core 0.2.1",
"iovec",
]
[[package]]
name = "futures-io"
version = "0.3.28"
@ -1782,8 +1846,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"futures-core 0.3.28",
"futures-io 0.3.28",
"memchr",
"parking",
"pin-project-lite 0.2.9",
@ -1801,29 +1865,63 @@ dependencies = [
"syn 2.0.15",
]
[[package]]
name = "futures-sink"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8a93a7c480876b8e02cdd70022e7eb9c8423575ea6a25a0b749b18834c16412"
dependencies = [
"either",
"futures-channel 0.2.1",
"futures-core 0.2.1",
]
[[package]]
name = "futures-sink"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
[[package]]
name = "futures-stable"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a222f540db94a09c275be08a406b5cd82f4311422b940a684795cdaef3d02e81"
dependencies = [
"futures-core 0.2.1",
"futures-executor 0.2.1",
]
[[package]]
name = "futures-task"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
[[package]]
name = "futures-util"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cf12a3fc1ccaf1bc2901ec6e0ed6ed407a4f16eaa20dd838f40cabf5f7b31f1"
dependencies = [
"either",
"futures-channel 0.2.1",
"futures-core 0.2.1",
"futures-io 0.2.1",
"futures-sink 0.2.1",
]
[[package]]
name = "futures-util"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-io 0.3.28",
"futures-macro",
"futures-sink",
"futures-sink 0.3.28",
"futures-task",
"memchr",
"pin-project-lite 0.2.9",
@ -1931,7 +2029,7 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8d5564e570a38b43d78bdc063374a0c3098c4f0d64005b12f9bbe87e869b6d7"
dependencies = [
"futures-channel",
"futures-channel 0.3.28",
"gloo-events",
"js-sys",
"wasm-bindgen",
@ -1944,8 +2042,8 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c"
dependencies = [
"futures-channel",
"futures-core",
"futures-channel 0.3.28",
"futures-core 0.3.28",
"js-sys",
"wasm-bindgen",
]
@ -1991,9 +2089,9 @@ checksum = "5e4728fd124914ad25e99e3d15a9361a879f6620f63cb56bbb08f95abb97a535"
dependencies = [
"bytes 0.5.6",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-util 0.3.28",
"http",
"indexmap",
"slab",
@ -2011,9 +2109,9 @@ checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21"
dependencies = [
"bytes 1.4.0",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-util 0.3.28",
"http",
"indexmap",
"slab",
@ -2227,9 +2325,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a6f157065790a3ed2f88679250419b5cdd96e714a0d65f7797fd337186e96bb"
dependencies = [
"bytes 0.5.6",
"futures-channel",
"futures-core",
"futures-util",
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-util 0.3.28",
"h2 0.2.7",
"http",
"http-body 0.3.1",
@ -2251,9 +2349,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4"
dependencies = [
"bytes 1.4.0",
"futures-channel",
"futures-core",
"futures-util",
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-util 0.3.28",
"h2 0.3.18",
"http",
"http-body 0.4.5",
@ -2731,15 +2829,24 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f303ec0e94c6c54447f84f3b0ef7af769858a9c4ef56ef2a986d3dcd4c3fc9c"
dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"futures-util 0.3.28",
"local-waker",
]
[[package]]
name = "local-event-bus"
version = "0.1.0"
dependencies = [
"event-bus-adapter",
"futures",
"futures-async-runtime",
"futures-util 0.3.28",
"serde",
"thiserror",
"tracing",
]
[[package]]
name = "local-waker"
@ -3142,8 +3249,8 @@ version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22"
dependencies = [
"futures-channel",
"futures-util",
"futures-channel 0.3.28",
"futures-util 0.3.28",
"indexmap",
"js-sys",
"once_cell",
@ -3159,9 +3266,9 @@ checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113"
dependencies = [
"async-trait",
"crossbeam-channel",
"futures-channel",
"futures-executor",
"futures-util",
"futures-channel 0.3.28",
"futures-executor 0.3.28",
"futures-util 0.3.28",
"once_cell",
"opentelemetry_api",
"percent-encoding",
@ -3291,6 +3398,7 @@ dependencies = [
"payment_adapter",
"serde",
"serde_json",
"thiserror",
"tracing",
]
@ -3780,9 +3888,35 @@ dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "redis-async"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c70d5058251eb4b8acee55aad429a7a8a4ef2b61fe1f7be39ac6cfcd2934f334"
dependencies = [
"bytes 1.4.0",
"futures-channel 0.3.28",
"futures-sink 0.3.28",
"futures-util 0.3.28",
"log",
"pin-project",
"tokio 1.28.0",
"tokio-util 0.7.8",
]
[[package]]
name = "redis-event-bus"
version = "0.1.0"
dependencies = [
"event-bus-adapter",
"futures",
"futures-async-runtime",
"futures-util 0.3.28",
"redis-async",
"serde",
"thiserror",
"tracing",
]
[[package]]
name = "redox_syscall"
@ -3875,8 +4009,8 @@ dependencies = [
"base64 0.13.1",
"bytes 0.5.6",
"encoding_rs",
"futures-core",
"futures-util",
"futures-core 0.3.28",
"futures-util 0.3.28",
"http",
"http-body 0.3.1",
"hyper 0.13.10",
@ -3910,8 +4044,8 @@ dependencies = [
"base64 0.21.0",
"bytes 1.4.0",
"encoding_rs",
"futures-core",
"futures-util",
"futures-core 0.3.28",
"futures-util 0.3.28",
"h2 0.3.18",
"http",
"http-body 0.4.5",
@ -4153,8 +4287,8 @@ checksum = "4ed36cdb20de66d89a17ea04b8883fc7a386f2cf877aaedca5005583ce4876ff"
dependencies = [
"crossbeam-channel",
"futures",
"futures-channel",
"futures-executor",
"futures-channel 0.3.28",
"futures-executor 0.3.28",
"num_cpus",
]
@ -4610,10 +4744,10 @@ dependencies = [
"dotenvy",
"either",
"event-listener",
"futures-channel",
"futures-core",
"futures-channel 0.3.28",
"futures-core 0.3.28",
"futures-intrusive",
"futures-util",
"futures-util 0.3.28",
"git2",
"hashlink",
"hex",
@ -5051,7 +5185,7 @@ dependencies = [
"dotenv",
"fake",
"futures",
"futures-util",
"futures-util 0.3.28",
"gumdrop",
"hmac",
"jwt",
@ -5079,7 +5213,7 @@ checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092"
dependencies = [
"bytes 0.5.6",
"fnv",
"futures-core",
"futures-core 0.3.28",
"iovec",
"lazy_static",
"memchr",
@ -5149,8 +5283,8 @@ dependencies = [
"bincode",
"bytes 1.4.0",
"educe",
"futures-core",
"futures-sink",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"pin-project",
"serde",
"serde_json",
@ -5162,7 +5296,7 @@ version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core",
"futures-core 0.3.28",
"pin-project-lite 0.2.9",
"tokio 1.28.0",
]
@ -5184,8 +5318,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499"
dependencies = [
"bytes 0.5.6",
"futures-core",
"futures-sink",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"log",
"pin-project-lite 0.1.12",
"tokio 0.2.25",
@ -5198,8 +5332,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [
"bytes 1.4.0",
"futures-core",
"futures-sink",
"futures-core 0.3.28",
"futures-sink 0.3.28",
"pin-project-lite 0.2.9",
"slab",
"tokio 1.28.0",

View File

@ -77,13 +77,11 @@ pub async fn create_account(
db: &Database,
config: SharedAppConfig,
) -> Result<FullAccount> {
let hash = msg
.password
.encrypt(&config.lock().web().pass_salt())
.map_err(|e| {
tracing::error!("{e:?}");
Error::Hashing
})?;
let salt = config.lock().web().pass_salt();
let hash = msg.password.encrypt(&salt).map_err(|e| {
tracing::error!("{e:?}");
Error::Hashing
})?;
let mut t = begin_t!(db, Error::DbCritical);

View File

@ -11,7 +11,7 @@ default = []
actix-web = { version = "4", features = [], optional = true }
cookie = { version = "0", features = ["signed"], optional = true }
parking_lot = { version = "0", features = [] }
password-hash = { version = "0", features = ["alloc"] }
password-hash = { version = "=0.4.2", features = ["alloc"] }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1", features = [] }
thiserror = { version = "1" }

View File

@ -88,7 +88,7 @@ impl WebConfig {
}
pub fn pass_salt(&self) -> SaltString {
SaltString::from_b64(
SaltString::new(
&self
.pass_salt
.as_ref()

View File

@ -6,3 +6,7 @@ edition = "2021"
[dependencies]
serde = { version = "1.0.162", features = ['derive'] }
bincode = { version = "1.3.3" }
thiserror = { version = "1.0.40" }
event-bus-messages = { path = "../event-bus-messages" }
futures = { version = "0.3.28" }
toml = { version = "0.7.3" }

View File

@ -1,14 +1,28 @@
pub fn add(left: usize, right: usize) -> usize {
left + right
}
#![feature(async_fn_in_trait)]
#[cfg(test)]
mod tests {
use super::*;
pub use event_bus_messages::{Message, Msg};
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
#[derive(Debug, thiserror::Error)]
pub enum EBError {}
pub type EBResult<T> = Result<T, EBError>;
pub struct Config(String);
impl Config {
pub fn config<S: serde::de::DeserializeOwned>(self) -> Result<S, toml::de::Error> {
toml::from_str(&self.0)
}
}
pub trait MessageSend {
async fn send(&mut self, msg: Msg);
}
pub trait EventBus<Stream, Sender>
where
Stream: futures::stream::Stream<Item = Message>,
Sender: MessageSend,
{
async fn connect(config: Config) -> Result<(Stream, Sender), ()>;
}

View File

@ -3,6 +3,7 @@ name = "event-bus-messages"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
thiserror = { version = "1.0.40" }
serde = { version = "1.0.162", features = ['derive'] }
bincode = { version = "1.3.3" }

View File

@ -1,14 +1,20 @@
pub fn add(left: usize, right: usize) -> usize {
left + right
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum Msg {
Seek(usize),
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct Message {
pub index: usize,
pub payload: Msg,
}
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
impl Message {
pub fn from_bytes(v: &[u8]) -> bincode::Result<Self> {
bincode::deserialize(v)
}
pub fn to_bytes(&self) -> bincode::Result<Vec<u8>> {
bincode::serialize(self)
}
}

View File

@ -3,6 +3,11 @@ name = "local-event-bus"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
event-bus-adapter = { path = "../event-bus-adapter" }
thiserror = { version = "1.0.40" }
futures-util = { version = "0.3.28" }
futures = { version = "0.3.28" }
futures-async-runtime = { version = "0.2.1" }
tracing = { version = "0" }
serde = { version = "1.0.162" }

View File

@ -19,3 +19,4 @@ fulfillment_adapter = { path = "../fulfillment_adapter" }
serde = { version = "1", features = ['derive'] }
serde_json = { version = "1" }
async-trait = { version = "0.1.68" }
thiserror = { version = "1.0.40" }

View File

@ -1,4 +1,5 @@
#![crate_type = "rlib"]
#![feature(async_fn_in_trait)]
mod credit;
mod deserialize;
@ -8,7 +9,6 @@ mod req;
mod res;
mod serialize;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use fulfillment_adapter::{
@ -16,8 +16,6 @@ use fulfillment_adapter::{
};
use payment_adapter::*;
use crate::req::OrderCreate;
pub struct PayUPayment {
sandbox: bool,
merchant_pos_id: i32,
@ -57,72 +55,95 @@ impl PayUPayment {
}
}
#[derive(Default, Debug, serde::Serialize, serde::Deserialize)]
pub struct PayUConfig {
sandbox: bool,
merchant_pos_id: i32,
client_id: String,
client_secret: String,
bearer: Option<String>,
bearer_expires_at: chrono::DateTime<chrono::Utc>,
}
#[async_trait::async_trait]
impl PaymentAdapter for PayUPayment {
async fn new(config: Config) -> Self {
let config: PayUConfig = config.config().expect("Malformed Stripe PayU config");
Self {
sandbox: config.sandbox,
merchant_pos_id: 0,
client_id: "".to_string(),
client_secret: "".to_string(),
bearer: None,
bearer_expires_at: Default::default(),
}
}
fn identifier(&self) -> &'static str {
todo!()
}
async fn initialize_payment(
&mut self,
ctx: PaymentProcessorContext,
ctx: &mut PaymentProcessorContext,
) -> PResult<PaymentProcessorSessionResponse> {
todo!()
}
async fn update_payment(
&mut self,
ctx: PaymentProcessorContext,
ctx: &mut PaymentProcessorContext,
) -> PResult<Option<PaymentProcessorSessionResponse>> {
todo!()
}
async fn refund_payment(
&mut self,
payment_session_data: PaymentSessionData,
payment_session_data: &mut Box<dyn PaymentSessionData>,
refund_amount: Amount,
) -> PResult<PaymentSessionData> {
) -> PResult<()> {
todo!()
}
async fn authorize_payment(
&mut self,
payment_session_data: PaymentSessionData,
payment_session_data: &mut Box<dyn PaymentSessionData>,
data: PaymentProcessCtx,
) -> PResult<(PaymentSessionStatus, PaymentSessionData)> {
) -> PResult<(PaymentSessionStatus, ())> {
todo!()
}
async fn capture_payment(
&mut self,
payment_session_data: PaymentSessionData,
) -> PResult<PaymentSessionData> {
payment_session_data: &mut Box<dyn PaymentSessionData>,
) -> PResult<()> {
todo!()
}
async fn delete_payment(
&mut self,
payment_session_data: PaymentSessionData,
) -> PResult<PaymentSessionData> {
payment_session_data: &mut Box<dyn PaymentSessionData>,
) -> PResult<()> {
todo!()
}
async fn retrieve_payment(
&mut self,
payment_session_data: PaymentSessionData,
) -> PResult<PaymentSessionData> {
payment_session_data: &mut Box<dyn PaymentSessionData>,
) -> PResult<Box<dyn PaymentSessionData>> {
todo!()
}
async fn cancel_payment(
&mut self,
payment_session_data: PaymentSessionData,
) -> PResult<PaymentSessionData> {
payment_session_data: &mut Box<dyn PaymentSessionData>,
) -> PResult<()> {
todo!()
}
async fn payment_status(
&mut self,
payment_session_data: &PaymentSessionData,
payment_session_data: &mut Box<dyn PaymentSessionData>,
) -> PResult<PaymentSessionStatus> {
todo!()
}
@ -130,66 +151,67 @@ impl PaymentAdapter for PayUPayment {
pub struct PayUFulfillment {}
impl FulfillmentAdapter for PayUFulfillment {
fn identifier() -> &'static str {
"pay-u"
}
fn fulfillment_options(&mut self) -> FResult<&[FulfillmentOption]> {
todo!()
}
fn validate_fulfillment_payload<P: FulfillmentPayload, C: FulfillmentCart>(
&mut self,
payload: P,
cart: C,
) -> FResult<bool> {
todo!()
}
fn validate_option<P: FulfillmentPayload>(&mut self, payload: P) -> FResult<bool> {
todo!()
}
fn can_calculate<P: FulfillmentPayload>(&mut self, payload: P) -> FResult<bool> {
todo!()
}
fn calculate_price<P: FulfillmentPayload, C: FulfillmentCart>(
data: P,
cart: C,
) -> FResult<u64> {
todo!()
}
fn create_fulfillment(&mut self) {
todo!()
}
fn cancel_fulfillment(&mut self) {
todo!()
}
fn fulfillment_documents(&mut self) {
todo!()
}
fn create_return(&mut self) {
todo!()
}
fn return_documents(&mut self) {
todo!()
}
fn shipment_documents(&mut self) {
todo!()
}
fn documents(&mut self) {
todo!()
}
}
// #[async_trait::async_trait]
// impl FulfillmentAdapter for PayUFulfillment {
// fn identifier() -> &'static str {
// "pay-u"
// }
//
// fn fulfillment_options(&mut self) -> FResult<&[FulfillmentOption]> {
// todo!()
// }
//
// fn validate_fulfillment_payload<P: FulfillmentPayload, C:
// FulfillmentCart>( &mut self,
// payload: P,
// cart: C,
// ) -> FResult<bool> {
// todo!()
// }
//
// fn validate_option<P: FulfillmentPayload>(&mut self, payload: P) ->
// FResult<bool> { todo!()
// }
//
// fn can_calculate<P: FulfillmentPayload>(&mut self, payload: P) ->
// FResult<bool> { todo!()
// }
//
// fn calculate_price<P: FulfillmentPayload, C: FulfillmentCart>(
// data: P,
// cart: C,
// ) -> FResult<u64> {
// todo!()
// }
//
// fn create_fulfillment(&mut self) {
// todo!()
// }
//
// fn cancel_fulfillment(&mut self) {
// todo!()
// }
//
// fn fulfillment_documents(&mut self) {
// todo!()
// }
//
// fn create_return(&mut self) {
// todo!()
// }
//
// fn return_documents(&mut self) {
// todo!()
// }
//
// fn shipment_documents(&mut self) {
// todo!()
// }
//
// fn documents(&mut self) {
// todo!()
// }
// }
static mut CONFIG: Option<Arc<Mutex<PayUPayment>>> = None;

View File

@ -3,6 +3,12 @@ name = "redis-event-bus"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
redis-async = { version = "0.16.0" }
event-bus-adapter = { path = "../event-bus-adapter" }
thiserror = { version = "1.0.40" }
futures-util = { version = "0.3.28" }
futures = { version = "0.3.28" }
futures-async-runtime = { version = "0.2.1" }
tracing = { version = "0" }
serde = { version = "1.0.162" }

View File

@ -0,0 +1,91 @@
#![feature(async_fn_in_trait)]
use std::pin::Pin;
use std::task::{Context, Poll};
use event_bus_adapter::{Config, EventBus, Message, MessageSend, Msg};
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use redis_async::client::connect::RespConnection;
use redis_async::resp::RespValue;
use redis_async::resp::RespValue::BulkString;
use tracing::log::warn;
pub struct MessageSender(SplitSink<RespConnection, RespValue>);
impl MessageSend for MessageSender {
async fn send(&mut self, msg: Msg) {
match (Message {
payload: msg,
index: 0,
})
.to_bytes()
{
Ok(v) => {
if let Err(e) = self.0.send(BulkString(v)).await {
warn!("Failed to send serialized message: {e}");
}
}
Err(e) => {
warn!("Failed to serialize message while sending: {e}");
}
}
}
}
pub struct MessageStream(SplitStream<RespConnection>);
impl futures::stream::Stream for MessageStream {
type Item = Message;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut_self: &mut Self = unsafe { Pin::get_unchecked_mut(self) };
match Pin::new(&mut mut_self.0).poll_next(cx) {
Poll::Ready(value) => match value {
Some(msg) => {
let value = match msg {
Ok(v) => v,
Err(e) => {
warn!("reading from redis event stream failed: {e}");
return Poll::Pending;
}
};
let BulkString(v) = value else {
return Poll::Pending;
};
let msg = match Message::from_bytes(&v) {
Ok(msg) => msg,
Err(e) => {
warn!("Invalid message: {e}");
return Poll::Pending;
}
};
return Poll::Ready(Some(msg));
}
None => Poll::Ready(None),
},
Poll::Pending => Poll::Pending,
}
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct RedisEventBusConfig {
host: String,
port: u16,
}
pub struct RedisEventBus;
impl EventBus<MessageStream, MessageSender> for RedisEventBus {
async fn connect(config: Config) -> Result<(MessageStream, MessageSender), ()> {
let RedisEventBusConfig { host, port } = config.config().expect("Invalid redis bus config");
let client = redis_async::client::connect(&host, port)
.await
.expect("Failed to connect to redis event bus");
let (sink, stream) = client.split();
Ok((MessageStream(stream), MessageSender(sink)))
}
}

View File

@ -1,3 +0,0 @@
fn main() {
println!("Hello, world!");
}

View File

@ -1,7 +1,5 @@
#![feature(structural_match)]
use std::env;
use config::UpdateConfig;
pub use context::*;

View File

@ -1,7 +1,7 @@
use std::convert::Infallible;
use std::ops::FromResidual;
use seed::fetch::{FetchError, Request};
// use seed::{FetchError, Request};
pub mod admin;
pub mod public;

View File

@ -1,226 +1,233 @@
#![feature(try_trait_v2)]
pub mod api;
#[cfg(debug_assertions)]
mod debug;
mod i18n;
mod model;
mod pages;
pub mod session;
pub mod shared;
pub mod shopping_cart;
use seed::empty;
use seed::prelude::*;
use crate::api::NetRes;
use crate::i18n::I18n;
use crate::model::Model;
use crate::pages::{AdminPage, Msg, Page, PublicPage};
use crate::session::SessionMsg;
#[macro_export]
macro_rules! fetch_page {
(public $model: expr, $page: ident, $ret: expr) => {{
let p = match &mut $model.page {
$crate::pages::Page::Public(p) => p,
_ => return $ret,
};
match p {
$crate::pages::PublicPage::$page(p) => p,
_ => return $ret,
}
}};
(admin $model: expr, $page: ident, $ret: expr) => {{
let p = match &mut $model.page {
$crate::pages::Page::Admin(p) => p,
_ => return $ret,
};
match p {
$crate::pages::AdminPage::$page(p) => p,
_ => return $ret,
}
}};
(public $model: expr, $page: ident) => {{
let p = match &mut $model.page {
crate::pages::Page::Public(p) => p,
_ => return,
};
match p {
crate::pages::PublicPage::$page(p) => p,
_ => return,
}
}};
(admin $model: expr, $page: ident) => {{
let p = match &mut $model.page {
crate::pages::Page::Admin(p) => p,
_ => return,
};
match p {
crate::pages::AdminPage::$page(p) => p,
_ => return,
}
}};
(public page $page: expr, $page_name: ident) => {{
let p = match $page {
crate::pages::Page::Public(p) => p,
_ => return,
};
match p {
crate::pages::PublicPage::$page_name(p) => p,
_ => return,
}
}};
(public page $page: expr, $page_name: ident, $ret: expr) => {{
let p = match $page {
crate::pages::Page::Public(p) => p,
_ => {
*$page = $ret;
return;
}
};
match p {
crate::pages::PublicPage::$page_name(p) => p,
_ => {
*$page = $ret;
return;
}
}
}};
(admin page $page: expr, $page_name: ident, $ret: expr) => {{
let p = match $page {
crate::pages::Page::Admin(p) => p,
_ => {
*$page = $ret;
return;
}
};
match p {
crate::pages::AdminPage::$page_name(p) => p,
_ => {
*$page = $ret;
return;
}
}
}};
}
fn init(url: Url, orders: &mut impl Orders<Msg>) -> Model {
orders
.subscribe(Msg::UrlChanged)
.perform_cmd(async { Msg::Config(crate::api::public::config().await) });
let mut model = Model {
url: url.clone().set_path(&[] as &[&str]),
token: LocalStorage::get("auth-token").ok(),
page: Page::init(url, orders),
logo: seed::document()
.query_selector("link[rel=icon]")
.ok()
.flatten()
.and_then(|el: web_sys::Element| el.get_attribute("href")),
shared: shared::Model::default(),
i18n: I18n::load(),
cart: Default::default(),
config: model::Config::default(),
#[cfg(debug_assertions)]
debug_modal: false,
};
shopping_cart::init(&mut model, orders);
session::init(&mut model, orders);
shared::init(&mut model, orders);
#[cfg(debug_assertions)]
crate::debug::init(&mut model, orders);
model
}
fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders<Msg>) {
#[cfg(debug_assertions)]
if !matches!(msg, Msg::Session(SessionMsg::CheckSession)) {
seed::log!("msg", msg);
}
match msg {
#[cfg(debug_assertions)]
Msg::NoOp => {
orders.skip();
}
Msg::Config(res) => {
if let NetRes::Success(config) = res {
model.config = config.into();
}
}
Msg::Shared(msg) => {
shared::update(msg, model, orders);
}
Msg::UrlChanged(subs::UrlChanged(url)) => model.page.page_changed(url, orders),
Msg::Public(pages::public::PublicMsg::Listing(msg)) => {
let page = fetch_page!(public model, Listing);
pages::public::listing::update(msg, page, &mut orders.proxy(Into::into));
}
Msg::Public(pages::public::PublicMsg::Product(msg)) => {
let page = fetch_page!(public model, Product);
pages::public::product::update(msg, page, &mut orders.proxy(Into::into))
}
Msg::Public(pages::public::PublicMsg::SignIn(msg)) => {
let page = fetch_page!(public model, SignIn);
pages::public::sign_in::update(msg, page, &mut orders.proxy(Into::into))
}
Msg::Public(pages::public::PublicMsg::SignUp(msg)) => {
let page = fetch_page!(public model, SignUp);
pages::public::sign_up::update(msg, page, &mut orders.proxy(Into::into))
}
Msg::Public(pages::public::PublicMsg::ShoppingCart(msg)) => {
let page = fetch_page!(public model, ShoppingCart);
pages::public::shopping_cart::update(msg, page, &mut orders.proxy(Into::into))
}
Msg::Public(pages::public::PublicMsg::Checkout(msg)) => {
pages::public::checkout::update(msg, model, &mut orders.proxy(Into::into))
}
// Admin
Msg::Admin(pages::admin::Msg::Landing(msg)) => {
let page = fetch_page!(admin model, Landing);
pages::admin::landing::update(msg, page, &mut orders.proxy(Into::into))
}
Msg::Session(msg) => {
session::update(msg, model, orders);
}
Msg::Cart(msg) => {
shopping_cart::update(msg, model, orders);
}
#[cfg(debug_assertions)]
Msg::Debug(msg) => {
debug::update(msg, model);
}
}
}
fn view(model: &Model) -> Node<Msg> {
let view = match &model.page {
Page::Public(PublicPage::Listing(page)) => pages::public::listing::view(model, page),
Page::Public(PublicPage::Product(page)) => pages::public::product::view(model, page),
Page::Public(PublicPage::SignIn(page)) => pages::public::sign_in::view(model, page),
Page::Public(PublicPage::SignUp(page)) => pages::public::sign_up::view(model, page),
Page::Public(PublicPage::ShoppingCart(page)) => {
pages::public::shopping_cart::view(model, page)
}
Page::Public(PublicPage::Checkout(page)) => pages::public::checkout::view(model, page),
Page::Admin(AdminPage::Landing(page)) => pages::admin::landing::view(model, page),
_ => empty![],
};
if cfg!(debug_assertions) {
use seed::*;
div![crate::debug::view(model), view]
} else {
view
}
}
#[wasm_bindgen(start)]
pub fn start() {
App::start("main", init, update, view);
}
// #![feature(try_trait_v2)]
//
// pub mod api;
// #[cfg(debug_assertions)]
// mod debug;
// mod i18n;
// mod model;
// mod pages;
// pub mod session;
// pub mod shared;
// pub mod shopping_cart;
//
// use seed::empty;
// use seed::prelude::*;
//
// use crate::api::NetRes;
// use crate::i18n::I18n;
// use crate::model::Model;
// use crate::pages::{AdminPage, Msg, Page, PublicPage};
// use crate::session::SessionMsg;
//
// #[macro_export]
// macro_rules! fetch_page {
// (public $model: expr, $page: ident, $ret: expr) => {{
// let p = match &mut $model.page {
// $crate::pages::Page::Public(p) => p,
// _ => return $ret,
// };
// match p {
// $crate::pages::PublicPage::$page(p) => p,
// _ => return $ret,
// }
// }};
// (admin $model: expr, $page: ident, $ret: expr) => {{
// let p = match &mut $model.page {
// $crate::pages::Page::Admin(p) => p,
// _ => return $ret,
// };
// match p {
// $crate::pages::AdminPage::$page(p) => p,
// _ => return $ret,
// }
// }};
// (public $model: expr, $page: ident) => {{
// let p = match &mut $model.page {
// crate::pages::Page::Public(p) => p,
// _ => return,
// };
// match p {
// crate::pages::PublicPage::$page(p) => p,
// _ => return,
// }
// }};
// (admin $model: expr, $page: ident) => {{
// let p = match &mut $model.page {
// crate::pages::Page::Admin(p) => p,
// _ => return,
// };
// match p {
// crate::pages::AdminPage::$page(p) => p,
// _ => return,
// }
// }};
// (public page $page: expr, $page_name: ident) => {{
// let p = match $page {
// crate::pages::Page::Public(p) => p,
// _ => return,
// };
// match p {
// crate::pages::PublicPage::$page_name(p) => p,
// _ => return,
// }
// }};
// (public page $page: expr, $page_name: ident, $ret: expr) => {{
// let p = match $page {
// crate::pages::Page::Public(p) => p,
// _ => {
// *$page = $ret;
// return;
// }
// };
// match p {
// crate::pages::PublicPage::$page_name(p) => p,
// _ => {
// *$page = $ret;
// return;
// }
// }
// }};
// (admin page $page: expr, $page_name: ident, $ret: expr) => {{
// let p = match $page {
// crate::pages::Page::Admin(p) => p,
// _ => {
// *$page = $ret;
// return;
// }
// };
// match p {
// crate::pages::AdminPage::$page_name(p) => p,
// _ => {
// *$page = $ret;
// return;
// }
// }
// }};
// }
//
// fn init(url: Url, orders: &mut impl Orders<Msg>) -> Model {
// orders
// .subscribe(Msg::UrlChanged)
// .perform_cmd(async { Msg::Config(crate::api::public::config().await)
// });
//
// let mut model = Model {
// url: url.clone().set_path(&[] as &[&str]),
// token: LocalStorage::get("auth-token").ok(),
// page: Page::init(url, orders),
// logo: seed::document()
// .query_selector("link[rel=icon]")
// .ok()
// .flatten()
// .and_then(|el: web_sys::Element| el.get_attribute("href")),
// shared: shared::Model::default(),
// i18n: I18n::load(),
// cart: Default::default(),
//
// config: model::Config::default(),
// #[cfg(debug_assertions)]
// debug_modal: false,
// };
//
// shopping_cart::init(&mut model, orders);
// session::init(&mut model, orders);
// shared::init(&mut model, orders);
//
// #[cfg(debug_assertions)]
// crate::debug::init(&mut model, orders);
//
// model
// }
//
// fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders<Msg>) {
// #[cfg(debug_assertions)]
// if !matches!(msg, Msg::Session(SessionMsg::CheckSession)) {
// seed::log!("msg", msg);
// }
// match msg {
// #[cfg(debug_assertions)]
// Msg::NoOp => {
// orders.skip();
// }
// Msg::Config(res) => {
// if let NetRes::Success(config) = res {
// model.config = config.into();
// }
// }
// Msg::Shared(msg) => {
// shared::update(msg, model, orders);
// }
// Msg::UrlChanged(subs::UrlChanged(url)) =>
// model.page.page_changed(url, orders),
// Msg::Public(pages::public::PublicMsg::Listing(msg)) => {
// let page = fetch_page!(public model, Listing);
// pages::public::listing::update(msg, page, &mut
// orders.proxy(Into::into)); }
// Msg::Public(pages::public::PublicMsg::Product(msg)) => {
// let page = fetch_page!(public model, Product);
// pages::public::product::update(msg, page, &mut
// orders.proxy(Into::into)) }
// Msg::Public(pages::public::PublicMsg::SignIn(msg)) => {
// let page = fetch_page!(public model, SignIn);
// pages::public::sign_in::update(msg, page, &mut
// orders.proxy(Into::into)) }
// Msg::Public(pages::public::PublicMsg::SignUp(msg)) => {
// let page = fetch_page!(public model, SignUp);
// pages::public::sign_up::update(msg, page, &mut
// orders.proxy(Into::into)) }
// Msg::Public(pages::public::PublicMsg::ShoppingCart(msg)) => {
// let page = fetch_page!(public model, ShoppingCart);
// pages::public::shopping_cart::update(msg, page, &mut
// orders.proxy(Into::into)) }
// Msg::Public(pages::public::PublicMsg::Checkout(msg)) => {
// pages::public::checkout::update(msg, model, &mut
// orders.proxy(Into::into)) }
// // Admin
// Msg::Admin(pages::admin::Msg::Landing(msg)) => {
// let page = fetch_page!(admin model, Landing);
// pages::admin::landing::update(msg, page, &mut
// orders.proxy(Into::into)) }
// Msg::Session(msg) => {
// session::update(msg, model, orders);
// }
// Msg::Cart(msg) => {
// shopping_cart::update(msg, model, orders);
// }
// #[cfg(debug_assertions)]
// Msg::Debug(msg) => {
// debug::update(msg, model);
// }
// }
// }
//
// fn view(model: &Model) -> Node<Msg> {
// let view = match &model.page {
// Page::Public(PublicPage::Listing(page)) =>
// pages::public::listing::view(model, page),
// Page::Public(PublicPage::Product(page)) =>
// pages::public::product::view(model, page),
// Page::Public(PublicPage::SignIn(page)) =>
// pages::public::sign_in::view(model, page),
// Page::Public(PublicPage::SignUp(page)) =>
// pages::public::sign_up::view(model, page),
// Page::Public(PublicPage::ShoppingCart(page)) => {
// pages::public::shopping_cart::view(model, page)
// }
// Page::Public(PublicPage::Checkout(page)) =>
// pages::public::checkout::view(model, page),
// Page::Admin(AdminPage::Landing(page)) =>
// pages::admin::landing::view(model, page), _ => empty![],
// };
//
// if cfg!(debug_assertions) {
// use seed::*;
// div![crate::debug::view(model), view]
// } else {
// view
// }
// }
//
// #[wasm_bindgen(start)]
// pub fn start() {
// App::start("main", init, update, view);
// }