diff --git a/Cargo.lock b/Cargo.lock index a95790f..3c28268 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,15 +8,21 @@ version = "0.1.0" dependencies = [ "actix 0.13.0", "actix-rt", + "bincode", "bus", + "bytes", "config", "database_manager", + "dotenv", "fibers_rpc", "model", "pretty_env_logger", + "rumqttc", "serde", "thiserror", + "tokio", "tracing", + "tracing-subscriber", ] [[package]] @@ -674,13 +680,13 @@ dependencies = [ "human-panic", "include_dir", "jemallocator", - "messagebus", "model", "oauth2", "order_manager", "parking_lot 0.12.1", "payment_manager", "pretty_env_logger", + "rumqttc", "search_manager", "serde", "serde_json", @@ -708,6 +714,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bit-vec" version = "0.6.3" @@ -778,8 +793,13 @@ checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" name = "bus" version = "0.1.0" dependencies = [ - "lifeline", - "tokio", + "bincode", + "bytes", + "model", + "rumqttc", + "serde", + "thiserror", + "tracing", ] [[package]] @@ -825,6 +845,7 @@ dependencies = [ "database_manager", "model", "pretty_env_logger", + "rumqttc", "serde", "thiserror", "tracing", @@ -1072,16 +1093,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "ctor" -version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "ctr" version = "0.9.2" @@ -1170,16 +1181,6 @@ dependencies = [ "syn", ] -[[package]] -name = "dashmap" -version = "4.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" -dependencies = [ - "cfg-if 1.0.0", - "num_cpus", -] - [[package]] name = "data-encoding" version = "2.3.2" @@ -1200,6 +1201,7 @@ dependencies = [ "model", "pretty_env_logger", "rand", + "rumqttc", "serde", "sqlx", "sqlx-core", @@ -1341,6 +1343,7 @@ dependencies = [ "config", "model", "pretty_env_logger", + "rumqttc", "sendgrid", "serde", "serde_json", @@ -1378,15 +1381,6 @@ dependencies = [ "termcolor", ] -[[package]] -name = "erased-serde" -version = "0.3.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54558e0ba96fbe24280072642eceb9d7d442e32c7ec0ea9e7ecd7b4ea2cf4e11" -dependencies = [ - "serde", -] - [[package]] name = "event-listener" version = "2.5.3" @@ -1518,6 +1512,19 @@ dependencies = [ "thiserror", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin 0.9.4", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1567,6 +1574,7 @@ dependencies = [ "fibers_rpc", "model", "pretty_env_logger", + "rumqttc", "serde", "thiserror", "tokio", @@ -2285,6 +2293,7 @@ dependencies = [ "fluent", "model", "pretty_env_logger", + "rumqttc", "thiserror", "tracing", "unic-langid", @@ -2332,23 +2341,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "lifeline" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b49b836e2676e5f6d926c09dc1a13b52c3315b459554df4514d76756bfa72a0e" -dependencies = [ - "anyhow", - "async-trait", - "futures-util", - "log", - "pin-project 0.4.30", - "postage", - "regex", - "thiserror", - "tokio", -] - [[package]] name = "link-cplusplus" version = "1.0.7" @@ -2435,39 +2427,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "messagebus" -version = "0.9.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81124c365f95dc69d0a6e572220dc57ca2d0148fd130bfb2b54a6aa1506102d5" -dependencies = [ - "async-trait", - "ctor", - "dashmap", - "erased-serde", - "futures 0.3.25", - "log", - "messagebus_derive", - "parking_lot 0.11.2", - "serde", - "serde_derive", - "sharded-slab", - "smallvec", - "thiserror", - "tokio", -] - -[[package]] -name = "messagebus_derive" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49f364a3b76588b4997b63e99e1bf73e2cf37e2546e97fbae4cb2a268a74e57e" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "mime" version = "0.3.16" @@ -2562,6 +2521,15 @@ dependencies = [ "validator 0.15.0", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.10" @@ -2796,7 +2764,7 @@ dependencies = [ "js-sys", "lazy_static", "percent-encoding", - "pin-project 1.0.12", + "pin-project", "rand", "thiserror", "tokio", @@ -2823,6 +2791,7 @@ dependencies = [ "database_manager", "model", "pretty_env_logger", + "rumqttc", "serde", "thiserror", "tracing", @@ -2947,6 +2916,7 @@ dependencies = [ "parking_lot 0.12.1", "pay_u", "pretty_env_logger", + "rumqttc", "serde", "testx", "thiserror", @@ -3043,33 +3013,13 @@ dependencies = [ "uncased", ] -[[package]] -name = "pin-project" -version = "0.4.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ef0f924a5ee7ea9cbcea77529dba45f8a9ba9f622419fe3386ca581a3ae9d5a" -dependencies = [ - "pin-project-internal 0.4.30", -] - [[package]] name = "pin-project" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" dependencies = [ - "pin-project-internal 1.0.12", -] - -[[package]] -name = "pin-project-internal" -version = "0.4.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "851c8d0ce9bebe43790dedfc86614c23494ac9f423dd618d3a61fc693eafe61e" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "pin-project-internal", ] [[package]] @@ -3119,21 +3069,6 @@ dependencies = [ "universal-hash", ] -[[package]] -name = "postage" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a63d25391d04a097954b76aba742b6b5b74f213dfe3dbaeeb36e8ddc1c657f0b" -dependencies = [ - "atomic", - "crossbeam-queue", - "log", - "pin-project 1.0.12", - "pollster", - "static_assertions", - "thiserror", -] - [[package]] name = "ppv-lite86" version = "0.2.16" @@ -3341,7 +3276,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls", - "rustls-pemfile", + "rustls-pemfile 1.0.1", "serde", "serde_json", "serde_urlencoded", @@ -3366,12 +3301,31 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi 0.3.9", ] +[[package]] +name = "rumqttc" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "499b7ab08ffa5a722958b6ce1b7c0270bea30909f589d12c5ec3a051afe423fc" +dependencies = [ + "bytes", + "flume", + "futures 0.3.25", + "http", + "log", + "pollster", + "rustls-native-certs", + "rustls-pemfile 0.3.0", + "thiserror", + "tokio", + "tokio-rustls", +] + [[package]] name = "rust_decimal" version = "1.26.1" @@ -3435,6 +3389,27 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.1", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360" +dependencies = [ + "base64", +] + [[package]] name = "rustls-pemfile" version = "1.0.1" @@ -3513,6 +3488,7 @@ dependencies = [ "model", "parking_lot 0.12.1", "pretty_env_logger", + "rumqttc", "serde", "sonic-channel", "thiserror", @@ -3782,6 +3758,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09" +dependencies = [ + "lock_api", +] + [[package]] name = "splay_tree" version = "0.2.10" @@ -3855,7 +3840,7 @@ dependencies = [ "rand", "rust_decimal", "rustls", - "rustls-pemfile", + "rustls-pemfile 1.0.1", "serde", "serde_json", "sha1", @@ -3904,12 +3889,6 @@ dependencies = [ "tokio-rustls", ] -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "stringprep" version = "0.1.2" @@ -4125,6 +4104,7 @@ dependencies = [ "password-hash", "pretty_env_logger", "rand_core", + "rumqttc", "serde", "sha2", "testx", diff --git a/actors/account_manager/Cargo.toml b/actors/account_manager/Cargo.toml index d3c8b29..724db7e 100644 --- a/actors/account_manager/Cargo.toml +++ b/actors/account_manager/Cargo.toml @@ -3,15 +3,25 @@ name = "account_manager" version = "0.1.0" edition = "2021" +[[bin]] +name = "account-manager" +path = "./src/main.rs" + [dependencies] actix = { version = "0.13", features = [] } actix-rt = { version = "2.7", features = [] } +bincode = { version = "1.3.3" } bus = { path = "../../shared/bus" } +bytes = { version = "1.2.1" } config = { path = "../../shared/config" } database_manager = { path = "../database_manager" } +dotenv = { version = "0.15.0" } fibers_rpc = { version = "0.3.4", features = [] } model = { path = "../../shared/model" } pretty_env_logger = { version = "0.4", features = [] } +rumqttc = { version = "*" } serde = { version = "1.0.137", features = ["derive"] } thiserror = { version = "1.0.31" } +tokio = { version = "1.21.2", features = ['full'] } tracing = { version = "0.1.6" } +tracing-subscriber = { version = "0.3.16" } diff --git a/actors/account_manager/src/main.rs b/actors/account_manager/src/main.rs new file mode 100644 index 0000000..5449cf8 --- /dev/null +++ b/actors/account_manager/src/main.rs @@ -0,0 +1,145 @@ +#![feature(structural_match)] + +use std::time::Duration; + +use bus::account::{AccountFailure, CreateAccount, Topic}; +use config::{SharedAppConfig, UpdateConfig}; +use model::{Encrypt, FullAccount}; +use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS}; + +pub type Result = std::result::Result; + +#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] +pub enum Error { + #[error("Unable to send or receive msg from database")] + DbCritical, + #[error("Failed to load account data")] + Account, + #[error("Failed to load account addresses")] + Addresses, + #[error("Unable to save record")] + Saving, + #[error("Unable to hash password")] + Hashing, + #[error("{0}")] + Db(#[from] database_manager::Error), +} + +pub struct Opts {} + +impl UpdateConfig for Opts {} + +#[actix::main] +async fn main() { + dotenv::dotenv().ok(); + tracing_subscriber::fmt::init(); + + let opts = Opts {}; + + let config = config::default_load(&opts); + + let db = database_manager::Database::build(config.clone()).await; + + let mut mqttoptions = MqttOptions::new(bus::account::CLIENT_NAME, "0.0.0.0", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut event_loop) = AsyncClient::new(mqttoptions, 10); + client + .subscribe(Topic::CreateAccount, QoS::AtLeastOnce) + .await + .unwrap(); + + let client = bus::AsyncClient(client); + loop { + let notification = event_loop.poll().await; + + match notification { + Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() { + topic if Topic::CreateAccount == topic => { + if let Ok(msg) = CreateAccount::try_from(publish.payload) { + match create_account(msg, &db, config.clone()).await { + Ok(account) => { + client + .publish_or_log( + Topic::AccountCreated, + QoS::AtLeastOnce, + true, + model::Account::from(account), + ) + .await; + } + Err(e) => { + tracing::error!("{}", e); + let m = match e { + Error::Hashing => Some(AccountFailure::FailedToHashPassword), + Error::Saving => Some(AccountFailure::SaveAccount), + Error::DbCritical => Some(AccountFailure::InternalServerError), + _ => None, + }; + if let Some(m) = m { + client.publish_or_log( + Topic::SignUpFailure, + QoS::AtLeastOnce, + true, + m, + ); + } + } + } + } + } + _ => {} + }, + Ok(Event::Incoming(_incoming)) => {} + Ok(Event::Outgoing(_outgoing)) => {} + Err(e) => { + tracing::error!("{}", e); + } + } + } +} + +pub(crate) async fn create_account( + msg: CreateAccount, + db: &database_manager::Database, + config: SharedAppConfig, +) -> Result { + let hash = { + match msg.password.encrypt(&config.lock().web().pass_salt()) { + Ok(hash) => hash, + Err(e) => { + tracing::error!("{e:?}"); + return Err(Error::Hashing); + } + } + }; + + let mut t = db.pool().begin().await.map_err(|e| { + tracing::error!("{}", e); + Error::DbCritical + })?; + let account: FullAccount = match database_manager::create_account( + database_manager::CreateAccount { + email: msg.email, + login: msg.login, + pass_hash: model::PassHash::new(hash), + role: msg.role, + }, + &mut t, + ) + .await + { + Ok(r) => r, + Err(e) => { + tracing::error!("{}", e); + t.rollback().await.ok(); + return Err(Error::Saving); + } + }; + t.commit().await.map_err(|e| { + tracing::error!("{}", e); + Error::DbCritical + })?; + + Ok(account) +} diff --git a/actors/cart_manager/Cargo.toml b/actors/cart_manager/Cargo.toml index 04da9f1..ae41b0a 100644 --- a/actors/cart_manager/Cargo.toml +++ b/actors/cart_manager/Cargo.toml @@ -12,6 +12,7 @@ config = { path = "../../shared/config" } database_manager = { path = "../database_manager" } model = { path = "../../shared/model" } pretty_env_logger = { version = "0.4", features = [] } +rumqttc = { version = "*" } serde = { version = "1.0.137", features = ["derive"] } thiserror = { version = "1.0.31" } tracing = { version = "0.1.34" } diff --git a/actors/database_manager/Cargo.toml b/actors/database_manager/Cargo.toml index 2b83fd1..536b8f7 100644 --- a/actors/database_manager/Cargo.toml +++ b/actors/database_manager/Cargo.toml @@ -17,6 +17,7 @@ itertools = { version = "0.10.3" } model = { path = "../../shared/model" } pretty_env_logger = { version = "0.4", features = [] } rand = { version = "0.8.5", optional = true } +rumqttc = { version = "*" } serde = { version = "1.0", features = ["derive"] } sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] } sqlx-core = { version = "0.6.2", features = [] } diff --git a/actors/database_manager/src/accounts.rs b/actors/database_manager/src/accounts.rs index 1e242ea..4309d31 100644 --- a/actors/database_manager/src/accounts.rs +++ b/actors/database_manager/src/accounts.rs @@ -65,7 +65,7 @@ db_async_handler!( inner_create_account ); -pub(crate) async fn create_account( +pub async fn create_account( msg: CreateAccount, pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> Result { diff --git a/actors/email_manager/Cargo.toml b/actors/email_manager/Cargo.toml index f7ad2dd..e82aa72 100644 --- a/actors/email_manager/Cargo.toml +++ b/actors/email_manager/Cargo.toml @@ -10,6 +10,7 @@ chrono = { version = "0.4", features = ["serde"] } config = { path = "../../shared/config" } model = { path = "../../shared/model" } pretty_env_logger = { version = "0.4", features = [] } +rumqttc = { version = "*" } sendgrid = { version = "0.17", features = ["async"] } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = [] } diff --git a/actors/fs_manager/Cargo.toml b/actors/fs_manager/Cargo.toml index 0fd3581..c534c99 100644 --- a/actors/fs_manager/Cargo.toml +++ b/actors/fs_manager/Cargo.toml @@ -13,8 +13,10 @@ config = { path = "../../shared/config" } fibers_rpc = { version = "0.3.4", features = [] } model = { path = "../../shared/model" } pretty_env_logger = { version = "0.4", features = [] } +rumqttc = { version = "*" } serde = { version = "1.0", features = ["derive"] } thiserror = { version = "1.0.31" } tokio = { version = "1.18.1", features = ["full"] } tracing = { version = "0.1.34" } uuid = { version = "1.2.1", features = ["serde"] } + diff --git a/actors/lang_provider/Cargo.toml b/actors/lang_provider/Cargo.toml index 1b2ceca..2765547 100644 --- a/actors/lang_provider/Cargo.toml +++ b/actors/lang_provider/Cargo.toml @@ -10,6 +10,7 @@ config = { path = "../../shared/config" } fluent = { version = "0.16.0" } model = { path = "../../shared/model" } pretty_env_logger = { version = "0.4", features = [] } +rumqttc = { version = "*" } thiserror = { version = "1.0.31" } tracing = { version = "0.1.34" } unic-langid = { version = "0.9.0" } diff --git a/actors/order_manager/Cargo.toml b/actors/order_manager/Cargo.toml index 1908680..834c62a 100644 --- a/actors/order_manager/Cargo.toml +++ b/actors/order_manager/Cargo.toml @@ -11,6 +11,7 @@ config = { path = "../../shared/config" } database_manager = { path = "../database_manager" } model = { path = "../../shared/model" } pretty_env_logger = { version = "0.4", features = [] } +rumqttc = { version = "*" } serde = { version = "1.0.137", features = ["derive"] } thiserror = { version = "1.0.31" } tracing = { version = "0.1.34" } diff --git a/actors/payment_manager/Cargo.toml b/actors/payment_manager/Cargo.toml index 97d7814..eded3bc 100644 --- a/actors/payment_manager/Cargo.toml +++ b/actors/payment_manager/Cargo.toml @@ -14,6 +14,7 @@ model = { path = "../../shared/model" } parking_lot = { version = "0.12", features = [] } pay_u = { version = '0.1', features = ["single-client"] } pretty_env_logger = { version = "0.4", features = [] } +rumqttc = { version = "*" } serde = { version = "1.0", features = ["derive"] } thiserror = { version = "1.0.31" } tracing = { version = "0.1.34" } diff --git a/actors/search_manager/Cargo.toml b/actors/search_manager/Cargo.toml index c1eda3e..1451ef3 100644 --- a/actors/search_manager/Cargo.toml +++ b/actors/search_manager/Cargo.toml @@ -12,6 +12,7 @@ derive_more = { version = "0.99", features = [] } model = { path = "../../shared/model" } parking_lot = { version = "0.12", features = [] } pretty_env_logger = { version = "0.4", features = [] } +rumqttc = { version = "*" } serde = { version = "1.0", features = ["derive"] } sonic-channel = { version = "1.1.0", features = ["ingest"] } thiserror = { version = "1.0.31" } diff --git a/actors/token_manager/Cargo.toml b/actors/token_manager/Cargo.toml index 80dbb09..087ce29 100644 --- a/actors/token_manager/Cargo.toml +++ b/actors/token_manager/Cargo.toml @@ -20,6 +20,7 @@ parking_lot = { version = "0.12", features = [] } password-hash = { version = "0.4", features = ["alloc"] } pretty_env_logger = { version = "0.4", features = [] } rand_core = { version = "0.6", features = ["std"] } +rumqttc = { version = "*" } serde = { version = "1.0", features = ["derive"] } sha2 = { version = "0.10", features = [] } thiserror = { version = "1.0.31" } diff --git a/api/Cargo.toml b/api/Cargo.toml index f1c0242..45942e1 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -4,55 +4,53 @@ version = "0.1.0" edition = "2021" [dependencies] -model = { path = "../shared/model", version = "0.1", features = ["db"] } -config = { path = "../shared/config" } -database_manager = { path = "../actors/database_manager" } -cart_manager = { path = "../actors/cart_manager" } -email_manager = { path = "../actors/email_manager" } -order_manager = { path = "../actors/order_manager" } -payment_manager = { path = "../actors/payment_manager" } -search_manager = { path = "../actors/search_manager" } -token_manager = { path = "../actors/token_manager" } -fs_manager = { path = "../actors/fs_manager" } account_manager = { path = "../actors/account_manager" } -human-panic = { version = "1.0.3" } -bytes = { version = "1.1.0" } actix = { version = "0.13", features = [] } +actix-broker = { version = "0.4", features = [] } +actix-cors = { version = "0.6", features = [] } +actix-files = { version = "0.6", features = [] } +actix-identity = { version = "0.4", features = [] } +actix-multipart = { version = "0.4", features = [] } +actix-redis = { version = "0.11", features = [] } actix-rt = { version = "2.7", features = [] } +actix-session = { version = "0.6", features = ["actix-redis", "redis-actor-session"] } actix-web = { version = "4.0", features = [] } actix-web-httpauth = { version = "0.6", features = [] } -actix-cors = { version = "0.6", features = [] } -actix-broker = { version = "0.4", features = [] } -actix-identity = { version = "0.4", features = [] } actix-web-opentelemetry = { version = "0.12", features = [] } -actix-session = { version = "0.6", features = ["actix-redis", "redis-actor-session"] } -actix-redis = { version = "0.11", features = [] } -actix-files = { version = "0.6", features = [] } -actix-multipart = { version = "0.4", features = [] } -gumdrop = { version = "0.8", features = [] } -tera = { version = "1.15", features = [] } -uuid = { version = "1.2.1", features = ["serde"] } +async-trait = { version = "0.1", features = [] } +bytes = { version = "1.1.0" } +cart_manager = { path = "../actors/cart_manager" } chrono = { version = "0.4", features = ["serde"] } -serde = { version = "1.0", features = ["derive"] } -serde_json = { version = "1.0", features = [] } -toml = { version = "0.5", features = [] } -sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] } -sqlx-core = { version = "0.6.2", features = [] } -thiserror = { version = "1.0", features = [] } -validator = { version = "0.14", features = [] } -tracing = { version = "0.1.34" } -tracing-subscriber = { version = "0.3.11" } -pretty_env_logger = { version = "0.4", features = [] } -dotenv = { version = "0.15", features = [] } +config = { path = "../shared/config" } +database_manager = { path = "../actors/database_manager" } derive_more = { version = "0.99", features = [] } -parking_lot = { version = "0.12", features = [] } -tokio = { version = "1.17", features = ["full"] } +dotenv = { version = "0.15", features = [] } +email_manager = { path = "../actors/email_manager" } +fs_manager = { path = "../actors/fs_manager" } futures = { version = "0.3", features = [] } futures-util = { version = "0.3", features = [] } -oauth2 = { version = "4.1", features = [] } -async-trait = { version = "0.1", features = [] } -jemallocator = { version = "0.3", features = [] } +gumdrop = { version = "0.8", features = [] } +human-panic = { version = "1.0.3" } include_dir = { version = "0.7.2", features = [] } - -# For rewrite into bus-based app -messagebus = { version = "0.9.13" } +jemallocator = { version = "0.3", features = [] } +model = { path = "../shared/model", version = "0.1", features = ["db"] } +oauth2 = { version = "4.1", features = [] } +order_manager = { path = "../actors/order_manager" } +parking_lot = { version = "0.12", features = [] } +payment_manager = { path = "../actors/payment_manager" } +pretty_env_logger = { version = "0.4", features = [] } +rumqttc = { version = "*" } +search_manager = { path = "../actors/search_manager" } +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0", features = [] } +sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] } +sqlx-core = { version = "0.6.2", features = [] } +tera = { version = "1.15", features = [] } +thiserror = { version = "1.0", features = [] } +token_manager = { path = "../actors/token_manager" } +tokio = { version = "1.17", features = ["full"] } +toml = { version = "0.5", features = [] } +tracing = { version = "0.1.34" } +tracing-subscriber = { version = "0.3.11" } +uuid = { version = "1.2.1", features = ["serde"] } +validator = { version = "0.14", features = [] } diff --git a/config/rumqqtd.conf b/config/rumqqtd.conf new file mode 100644 index 0000000..773f8cf --- /dev/null +++ b/config/rumqqtd.conf @@ -0,0 +1,27 @@ +# Broker id. Used to identify local node of the replication mesh +id = 0 + +# A commitlog read will pull full segment. Make sure that a segment isn't +# too big as async tcp writes readiness of one connection might affect tail +# latencies of other connection. Not a problem with preempting runtimes +[router] +id = 0 +dir = "/tmp/rumqttd" +max_segment_size = 10240 +max_segment_count = 10 +max_connections = 10001 + +# Configuration of server and connections that it accepts +[servers.1] +listen = "0.0.0.0:1883" +next_connection_delay_ms = 1 + [servers.1.connections] + connection_timeout_ms = 5000 + max_client_id_len = 256 + throttle_delay_ms = 0 + max_payload_size = 5120 + max_inflight_count = 200 + max_inflight_size = 1024 + +[console] +listen = "0.0.0.0:3030" diff --git a/config/rumqttd.toml b/config/rumqttd.toml new file mode 100644 index 0000000..6935ed6 --- /dev/null +++ b/config/rumqttd.toml @@ -0,0 +1,62 @@ +id = 0 + +# A commitlog read will pull full segment. Make sure that a segment isn't +# too big as async tcp writes readiness of one connection might affect tail +# latencies of other connection. Not a problem with preempting runtimes +[router] +id = 0 +instant_ack = true +max_segment_size = 104857600 +max_segment_count = 10 +max_read_len = 10240 +max_connections = 10001 +dir = "/tmp/rumqttd" + +# Configuration of server and connections that it accepts +[v4.1] +name = "v4-1" +listen = "0.0.0.0:1883" +next_connection_delay_ms = 1 + [v4.1.connections] + connection_timeout_ms = 60000 + max_client_id_len = 256 + throttle_delay_ms = 0 + max_payload_size = 20480 + max_inflight_count = 500 + max_inflight_size = 1024 + dynamic_filters = true + +# Example configuration for a TLS enabled server +# [v4.2] +# name = "v4-2" +# listen = "0.0.0.0:8883" +# next_connection_delay_ms = 10 +# # tls config for rustls +# [v4.2.tls] +# certpath = "./localhost.cert.pem" +# keypath = "./localhost.key.pem" +# capath = "./ca.cert.pem" +# # settings for all the connections on this server +# [v4.2.connections] +# connection_timeout_ms = 60000 +# throttle_delay_ms = 0 +# max_payload_size = 20480 +# max_inflight_count = 100 +# max_inflight_size = 1024 + +[v5.1] +name = "v5-1" +listen = "0.0.0.0:1884" +next_connection_delay_ms = 1 + [v5.1.connections] + connection_timeout_ms = 60000 + max_client_id_len = 256 + throttle_delay_ms = 0 + max_payload_size = 20480 + max_inflight_count = 500 + max_inflight_size = 1024 + +[ws] + +[console] +listen = "0.0.0.0:3030" diff --git a/shared/bus/Cargo.toml b/shared/bus/Cargo.toml index 0b3ad9e..121f194 100644 --- a/shared/bus/Cargo.toml +++ b/shared/bus/Cargo.toml @@ -4,5 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] -lifeline = { version = "0.6.1" } -tokio = { version = "1.18.2", features = ["full"] } +serde = { version = "*", features = ['derive'] } +bincode = { version = "*" } +model = { path = "../model" } +bytes = { version = "1.2.1" } +thiserror = { version = "1.0.37" } +tracing = { version = "0.1.37" } +rumqttc = { version = "0.17.0" } diff --git a/shared/bus/src/lib.rs b/shared/bus/src/lib.rs index e8df069..eb8d489 100644 --- a/shared/bus/src/lib.rs +++ b/shared/bus/src/lib.rs @@ -1,13 +1,118 @@ -pub mod messages; -pub use lifeline; -use lifeline::lifeline_bus; -pub use lifeline::Message; -pub use tokio::sync::mpsc; +#![feature(structural_match)] -pub use crate::messages::*; +pub struct AsyncClient(pub rumqttc::AsyncClient); -lifeline_bus!(pub struct MainBus); +impl AsyncClient { + pub async fn publish, T: serde::Serialize>( + &self, + topic: Topic, + qos: rumqttc::QoS, + retain: bool, + t: T, + ) -> Result<(), rumqttc::ClientError> { + let v = bincode::serialize(&t).unwrap_or_default(); + let bytes = bytes::Bytes::copy_from_slice(&v); + self.0.publish_bytes(topic, qos, retain, bytes).await + } -// impl Message for MainSend { -// type Channel = mpsc::Sender; -// } + pub async fn publish_or_log, T: serde::Serialize>( + &self, + topic: Topic, + qos: rumqttc::QoS, + retain: bool, + t: T, + ) { + if let Err(e) = self.publish(topic, qos, retain, t).await { + tracing::error!("{}", e); + } + } +} + +pub mod account { + use model::{Email, Login, Password, Role}; + + #[derive(Debug, thiserror::Error)] + pub enum Error { + #[error("mqtt payload has invalid create account data")] + InvalidCreateAccount, + #[error("mqtt payload has invalid account failure data")] + InvalidAccountFailure, + } + + pub static CLIENT_NAME: &str = "account-manager"; + + #[derive(Copy, Clone, Debug, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)] + pub enum Topic { + CreateAccount, + AccountCreated, + SignUpFailure, + } + + impl Into for Topic { + fn into(self) -> String { + String::from(self.to_str()) + } + } + + impl<'s> PartialEq<&'s str> for Topic { + fn eq(&self, other: &&'s str) -> bool { + self.to_str() == *other + } + } + + impl PartialEq for Topic { + fn eq(&self, other: &String) -> bool { + self.to_str() == other.as_str() + } + } + + impl Topic { + pub fn to_str(self) -> &'static str { + match self { + Topic::CreateAccount => "account/create", + Topic::AccountCreated => "account/created", + Topic::SignUpFailure => "account/failure", + } + } + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct CreateAccount { + pub email: Email, + pub login: Login, + pub password: Password, + pub role: Role, + } + + impl TryFrom for CreateAccount { + type Error = Error; + + fn try_from(value: bytes::Bytes) -> Result { + bincode::deserialize(value.as_ref()).map_err(|e| { + tracing::error!("{}", e); + Error::InvalidCreateAccount + }) + } + } + + #[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)] + pub enum AccountFailure { + #[error("Failed to hash password")] + FailedToHashPassword, + #[error("Failed to save account")] + SaveAccount, + #[error("Internal server error")] + InternalServerError, + } + + impl TryFrom for AccountFailure { + type Error = Error; + + fn try_from(value: bytes::Bytes) -> Result { + bincode::deserialize(value.as_ref()).map_err(|e| { + tracing::error!("{}", e); + Error::InvalidAccountFailure + }) + } + } +} diff --git a/shared/bus/src/messages.rs b/shared/bus/src/messages.rs deleted file mode 100644 index 84c907d..0000000 --- a/shared/bus/src/messages.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[derive(Debug, Clone, Eq, PartialEq)] -pub enum Main {}