diff --git a/Cargo.lock b/Cargo.lock index 3256bee..ba2c279 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,14 +9,17 @@ dependencies = [ "actix 0.13.0", "actix-rt", "bincode", - "bus", "bytes", + "channels", "config", "database_manager", "dotenv", "futures 0.3.25", + "gumdrop", "json", "model", + "opentelemetry 0.17.0", + "opentelemetry-jaeger", "pretty_env_logger", "rumqttc", "serde", @@ -24,6 +27,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "tracing-opentelemetry", "tracing-subscriber", ] @@ -419,8 +423,8 @@ dependencies = [ "actix-http", "actix-web", "futures-util", - "opentelemetry", - "opentelemetry-semantic-conventions", + "opentelemetry 0.17.0", + "opentelemetry-semantic-conventions 0.9.0", "serde", ] @@ -653,7 +657,6 @@ checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf" name = "bazzar" version = "0.1.0" dependencies = [ - "account_manager", "actix 0.13.0", "actix-broker", "actix-cors", @@ -669,6 +672,7 @@ dependencies = [ "async-trait", "bytes", "cart_manager", + "channels", "chrono", "config", "database_manager", @@ -694,6 +698,7 @@ dependencies = [ "serde_json", "sqlx", "sqlx-core", + "tarpc", "tera", "thiserror", "token_manager", @@ -791,19 +796,6 @@ version = "3.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" -[[package]] -name = "bus" -version = "0.1.0" -dependencies = [ - "bincode", - "bytes", - "model", - "rumqttc", - "serde", - "thiserror", - "tracing", -] - [[package]] name = "bytecodec" version = "0.4.15" @@ -841,7 +833,7 @@ version = "0.1.0" dependencies = [ "actix 0.13.0", "actix-rt", - "bus", + "channels", "chrono", "config", "database_manager", @@ -875,6 +867,21 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "channels" +version = "0.1.0" +dependencies = [ + "bincode", + "bytes", + "config", + "model", + "rumqttc", + "serde", + "tarpc", + "thiserror", + "tracing", +] + [[package]] name = "chrono" version = "0.4.22" @@ -1329,6 +1336,18 @@ dependencies = [ "syn", ] +[[package]] +name = "educe" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb0188e3c3ba8df5753894d54461f0e39bc91741dc5b22e1c46999ec2c71f4e4" +dependencies = [ + "enum-ordinalize", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "either" version = "1.8.0" @@ -1370,6 +1389,20 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "enum-ordinalize" +version = "3.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a62bb1df8b45ecb7ffa78dca1c17a438fb193eb083db0b1b494d2a61bcb5096a" +dependencies = [ + "num-bigint", + "num-traits", + "proc-macro2", + "quote", + "rustc_version 0.4.0", + "syn", +] + [[package]] name = "env_logger" version = "0.7.1" @@ -2172,6 +2205,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "intl-memoizer" version = "0.5.1" @@ -2411,6 +2450,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.9" @@ -2785,13 +2833,81 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "opentelemetry" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", +] + +[[package]] +name = "opentelemetry-jaeger" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e785d273968748578931e4dc3b4f5ec86b26e09d9e0d66b55adda7fce742f7a" +dependencies = [ + "async-trait", + "futures 0.3.25", + "futures-executor", + "once_cell", + "opentelemetry 0.18.0", + "opentelemetry-semantic-conventions 0.10.0", + "thiserror", + "thrift", +] + [[package]] name = "opentelemetry-semantic-conventions" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd" dependencies = [ - "opentelemetry", + "opentelemetry 0.17.0", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b02e0230abb0ab6636d18e2ba8fa02903ea63772281340ccac18e0af3ec9eeb" +dependencies = [ + "opentelemetry 0.18.0", +] + +[[package]] +name = "opentelemetry_api" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22" +dependencies = [ + "futures-channel", + "futures-util", + "indexmap", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand", + "thiserror", ] [[package]] @@ -2812,6 +2928,15 @@ dependencies = [ "uuid 1.2.1", ] +[[package]] +name = "ordered-float" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7" +dependencies = [ + "num-traits", +] + [[package]] name = "os_type" version = "2.6.0" @@ -3248,6 +3373,15 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] + [[package]] name = "regex-syntax" version = "0.6.27" @@ -3952,13 +4086,15 @@ dependencies = [ "fnv", "futures 0.3.25", "humantime 2.1.0", - "opentelemetry", + "opentelemetry 0.17.0", "pin-project", "rand", + "serde", "static_assertions", "tarpc-plugins", "thiserror", "tokio", + "tokio-serde", "tokio-util 0.7.4", "tracing", "tracing-opentelemetry", @@ -4063,6 +4199,28 @@ dependencies = [ "once_cell", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + +[[package]] +name = "thrift" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09678c4cdbb4eed72e18b7c2af1329c69825ed16fcbac62d083fc3e2b0590ff0" +dependencies = [ + "byteorder", + "integer-encoding", + "log", + "ordered-float", + "threadpool", +] + [[package]] name = "time" version = "0.1.44" @@ -4219,6 +4377,22 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-serde" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "911a61637386b789af998ee23f50aa30d5fd7edcec8d6d3dedae5e5815205466" +dependencies = [ + "bincode", + "bytes", + "educe", + "futures-core", + "futures-sink", + "pin-project", + "serde", + "serde_json", +] + [[package]] name = "tokio-stream" version = "0.1.11" @@ -4326,9 +4500,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbbe89715c1dbbb790059e2565353978564924ee85017b5fff365c872ff6721f" dependencies = [ "once_cell", - "opentelemetry", + "opentelemetry 0.17.0", "tracing", "tracing-core", + "tracing-log", "tracing-subscriber", ] @@ -4338,10 +4513,14 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/Cargo.toml b/Cargo.toml index 8d6788f..34cb6af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ members = [ # shared "shared/model", - "shared/bus", + "shared/channels", "shared/config", "shared/testx", # actors diff --git a/actors/account_manager/Cargo.toml b/actors/account_manager/Cargo.toml index cdb720f..4d3e8e4 100644 --- a/actors/account_manager/Cargo.toml +++ b/actors/account_manager/Cargo.toml @@ -11,19 +11,23 @@ path = "./src/main.rs" actix = { version = "0.13", features = [] } actix-rt = { version = "2.7", features = [] } bincode = { version = "1.3.3" } -bus = { path = "../../shared/bus" } bytes = { version = "1.2.1" } +channels = { path = "../../shared/channels" } config = { path = "../../shared/config" } database_manager = { path = "../database_manager" } dotenv = { version = "0.15.0" } futures = { version = "0.3.25" } +gumdrop = { version = "0.8.1" } json = { version = "0.12.4" } model = { path = "../../shared/model" } +opentelemetry = { version = "0.17.0" } +opentelemetry-jaeger = { version = "0.17.0" } pretty_env_logger = { version = "0.4", features = [] } rumqttc = { version = "*" } serde = { version = "1.0.137", features = ["derive"] } -tarpc = { version = "0.30.0", features = ["tokio1"] } +tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } thiserror = { version = "1.0.31" } tokio = { version = "1.21.2", features = ['full'] } tracing = { version = "0.1.6" } -tracing-subscriber = { version = "0.3.16" } +tracing-opentelemetry = { version = "0.17.4" } +tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } diff --git a/actors/account_manager/src/actions.rs b/actors/account_manager/src/actions.rs new file mode 100644 index 0000000..456dad0 --- /dev/null +++ b/actors/account_manager/src/actions.rs @@ -0,0 +1,82 @@ +use channels::account::{CreateAccount, MeResult}; +use config::SharedAppConfig; +use database_manager::Database; +use model::{Encrypt, FullAccount}; + +use crate::{Error, Result}; + +#[allow(unused)] +pub async fn me(account_id: model::AccountId, db: Database) -> MeResult { + use channels::account::Error; + + let msg = database_manager::FindAccount { account_id }; + let account: model::FullAccount = match msg.inner_find_account(db.pool().clone()).await { + Ok(account) => account, + Err(e) => { + tracing::error!("{}", e); + return MeResult { + error: Some(Error::Account), + ..Default::default() + }; + } + }; + let msg = database_manager::AccountAddresses { account_id }; + let addresses = match msg.inner_account_addresses(db.pool().clone()).await { + Ok(v) => v, + Err(e) => { + tracing::error!("{}", e); + return MeResult { + error: Some(Error::Addresses), + ..Default::default() + }; + } + }; + MeResult { + account: Some(account), + addresses: Some(addresses), + ..Default::default() + } +} + +pub async fn create_account( + msg: CreateAccount, + db: &Database, + config: SharedAppConfig, +) -> Result { + let hash = msg + .password + .encrypt(&config.lock().web().pass_salt()) + .map_err(|e| { + tracing::error!("{e:?}"); + Error::Hashing + })?; + + let mut t = db.pool().begin().await.map_err(|e| { + tracing::error!("{}", e); + Error::DbCritical + })?; + let account: FullAccount = match database_manager::create_account( + database_manager::CreateAccount { + email: msg.email, + login: msg.login, + pass_hash: model::PassHash::new(hash), + role: msg.role, + }, + &mut t, + ) + .await + { + Ok(r) => r, + Err(e) => { + tracing::error!("{}", e); + t.rollback().await.ok(); + return Err(Error::Saving); + } + }; + t.commit().await.map_err(|e| { + tracing::error!("{}", e); + Error::DbCritical + })?; + + Ok(account) +} diff --git a/actors/account_manager/src/bin/account-client.rs b/actors/account_manager/src/bin/account-client.rs new file mode 100644 index 0000000..682cf4d --- /dev/null +++ b/actors/account_manager/src/bin/account-client.rs @@ -0,0 +1,43 @@ +use std::net::SocketAddr; +use std::time::Duration; + +use config::UpdateConfig; +use tarpc::tokio_serde::formats::Json; +use tarpc::{client, context}; +use tokio::time::sleep; + +#[derive(gumdrop::Options)] +struct Flags { + help: bool, + /// Sets the name to say hello to. + name: String, +} + +impl UpdateConfig for Flags {} + +#[tokio::main] +async fn main() -> std::io::Result<()> { + let opts: Flags = gumdrop::Options::parse_args_default_or_exit(); + + let config = config::default_load(&opts); + let client = channels::account::rpc::create_client(config).await; + + let r = client.me(context::current(), 1.into()).await; + println!("{:?}", r); + + let hello = async move { + tokio::join! { + client.me(context::current(), 1.into()), + client.me(context::current(), 2.into()), + } + } + .await; + + eprintln!("{:?}", hello); + + // Let the background span processor finish. + sleep(Duration::from_micros(1)).await; + opentelemetry::global::shutdown_tracer_provider(); + + Ok(()) +} diff --git a/actors/account_manager/src/lib.rs b/actors/account_manager/src/lib.rs deleted file mode 100644 index 03cffa7..0000000 --- a/actors/account_manager/src/lib.rs +++ /dev/null @@ -1,164 +0,0 @@ -use actix::Addr; -use config::SharedAppConfig; -use database_manager::query_db; -use model::{Email, Encrypt, FullAccount, Login, Password, Role}; - -#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] -#[serde(rename_all = "kebab-case", tag = "account")] -pub enum Error { - #[error("Unable to send or receive msg from database")] - DbCritical, - #[error("Failed to load account data")] - Account, - #[error("Failed to load account addresses")] - Addresses, - #[error("Unable to save record")] - Saving, - #[error("Unable to hash password")] - Hashing, - #[error("{0}")] - Db(#[from] database_manager::Error), -} - -pub type Result = std::result::Result; - -#[macro_export] -macro_rules! account_async_handler { - ($msg: ty, $async: ident, $res: ty) => { - impl actix::Handler<$msg> for AccountManager { - type Result = actix::ResponseActFuture>; - - fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result { - use actix::WrapFuture; - let db = self.db.clone(); - let config = self.config.clone(); - Box::pin(async { $async(msg, db, config).await }.into_actor(self)) - } - } - }; -} - -#[macro_export] -macro_rules! query_account { - ($cart: expr, $msg: expr, default $fail: expr) => { - match $cart.send($msg).await { - Ok(Ok(r)) => r, - Ok(Err(e)) => { - tracing::error!("{e}"); - $fail - } - Err(e) => { - tracing::error!("{e:?}"); - $fail - } - } - }; - - ($cart: expr, $msg: expr, $fail: expr) => { - $crate::query_cart!($cart, $msg, $fail, $fail) - }; - - ($cart: expr, $msg: expr, $db_fail: expr, $act_fail: expr) => { - match $cart.send($msg).await { - Ok(Ok(r)) => r, - Ok(Err(e)) => { - tracing::error!("{e}"); - return Err($db_fail); - } - Err(e) => { - tracing::error!("{e:?}"); - return Err($act_fail); - } - } - }; -} - -pub struct AccountManager { - db: Addr, - config: SharedAppConfig, -} - -impl AccountManager { - pub fn new(config: SharedAppConfig, db: Addr) -> Self { - Self { config, db } - } -} - -impl actix::Actor for AccountManager { - type Context = actix::Context; -} - -pub struct MeResult { - pub account: FullAccount, - pub addresses: Vec, -} - -#[derive(actix::Message, Debug)] -#[rtype(result = "Result")] -pub struct Me { - pub account_id: model::AccountId, -} - -account_async_handler!(Me, me, MeResult); - -pub(crate) async fn me( - msg: Me, - db: Addr, - _config: SharedAppConfig, -) -> Result { - let account: FullAccount = query_db!( - db, - database_manager::FindAccount { - account_id: msg.account_id - }, - Error::Account - ); - let addresses = query_db!( - db, - database_manager::AccountAddresses { - account_id: msg.account_id - }, - Error::Addresses - ); - Ok(MeResult { account, addresses }) -} - -#[derive(actix::Message)] -#[rtype(result = "Result")] -pub struct CreateAccount { - pub email: Email, - pub login: Login, - pub password: Password, - pub role: Role, -} - -account_async_handler!(CreateAccount, create_account, FullAccount); - -pub(crate) async fn create_account( - msg: CreateAccount, - db: Addr, - config: SharedAppConfig, -) -> Result { - let hash = { - match msg.password.encrypt(&config.lock().web().pass_salt()) { - Ok(hash) => hash, - Err(e) => { - tracing::error!("{e:?}"); - return Err(Error::Hashing); - } - } - }; - - let account: FullAccount = query_db!( - db, - database_manager::CreateAccount { - email: msg.email, - login: msg.login, - pass_hash: model::PassHash::new(hash), - role: msg.role, - }, - Error::DbCritical, - Error::Saving - ); - Ok(account) -} diff --git a/actors/account_manager/src/main.rs b/actors/account_manager/src/main.rs index 200dcf5..e7484b0 100644 --- a/actors/account_manager/src/main.rs +++ b/actors/account_manager/src/main.rs @@ -1,12 +1,16 @@ #![feature(structural_match)] -use std::time::Duration; +use std::env; -use bus::account::{AccountFailure, CreateAccount, Topic}; -use config::{SharedAppConfig, UpdateConfig}; +use config::UpdateConfig; use database_manager::Database; -use model::{Encrypt, FullAccount}; -use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS}; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +pub mod actions; +pub mod mqtt; +pub mod rpc; pub type Result = std::result::Result; @@ -33,7 +37,7 @@ impl UpdateConfig for Opts {} #[actix::main] async fn main() { dotenv::dotenv().ok(); - tracing_subscriber::fmt::init(); + init_tracing("account-manager"); let opts = Opts {}; @@ -41,213 +45,26 @@ async fn main() { let db = Database::build(config.clone()).await; - mqtt::start(config, &db).await; + let mqtt_client = mqtt::start(config.clone(), db.clone()).await; + rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await; } -mod grpc { - use config::SharedAppConfig; - use database_manager::Database; - use futures::future::{self, Ready}; - use futures::prelude::*; - use futures::stream::StreamExt; - use json::JsonValue; - use tarpc::server::incoming::Incoming; - use tarpc::server::{self, Channel}; - use tarpc::{client, context}; +pub fn init_tracing(_service_name: &str) { + env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12"); - #[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] - #[serde(rename_all = "kebab-case", tag = "account")] - pub enum Error { - #[error("Unable to send or receive msg from database")] - DbCritical, - #[error("Failed to load account data")] - Account, - #[error("Failed to load account addresses")] - Addresses, - #[error("Unable to save record")] - Saving, - #[error("Unable to hash password")] - Hashing, - #[error("{0}")] - Db(#[from] database_manager::Error), - } + let tracer = { + use opentelemetry::sdk::export::trace::stdout::new_pipeline; + use opentelemetry::sdk::trace::Config; + new_pipeline() + .with_trace_config(Config::default()) + .with_pretty_print(true) + .install_simple() + }; - pub type Result = std::result::Result; - - pub struct MeResult { - pub account: model::FullAccount, - pub addresses: Vec, - } - - #[tarpc::service] - trait Accounts { - /// Returns a greeting for name. - async fn me(account_id: model::AccountId) -> String; - } - - #[derive(Clone)] - struct AccountsServer { - db: Database, - } - - impl Accounts for AccountsServer { - // Each defined rpc generates two items in the trait, a fn that serves the RPC, - // and an associated type representing the future output by the fn. - - type AccountsFut = Ready; - - fn me(self, _: context::Context, account_id: model::AccountId) -> Self::AccountsFut { - future::ready(format!("Hello, {name}!")) - } - } - - async fn me( - account_id: model::AccountId, - db: Database, - _config: SharedAppConfig, - ) -> Result { - let account: model::FullAccount = query_db!( - db, - database_manager::FindAccount { - account_id: msg.account_id - }, - Error::Account - ); - let addresses = query_db!( - db, - database_manager::AccountAddresses { - account_id: msg.account_id - }, - Error::Addresses - ); - Ok(MeResult { account, addresses }) - } - - async fn start(config: SharedAppConfig) { - let port = { config.lock().account_manager().port }; - } -} - -mod mqtt { - use std::time::Duration; - - use account_manager::CreateAccount; - use bus::account::{AccountFailure, Topic}; - use config::SharedAppConfig; - use database_manager::Database; - use model::{Encrypt, FullAccount}; - use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS}; - - use crate::{Error, Result}; - - pub async fn start(config: SharedAppConfig, db: &Database) { - let mut mqtt_options = MqttOptions::new(bus::account::CLIENT_NAME, "0.0.0.0", 1883); - mqtt_options.set_keep_alive(Duration::from_secs(5)); - - let (client, mut event_loop) = AsyncClient::new(mqtt_options, 10); - client - .subscribe(Topic::CreateAccount, QoS::AtLeastOnce) - .await - .unwrap(); - - let client = bus::AsyncClient(client); - loop { - let notification = event_loop.poll().await; - - match notification { - Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() { - topic if Topic::CreateAccount == topic => { - if let Ok(msg) = CreateAccount::try_from(publish.payload) { - match create_account(msg, &db, config.clone()).await { - Ok(account) => { - client - .publish_or_log( - Topic::AccountCreated, - QoS::AtLeastOnce, - true, - model::Account::from(account), - ) - .await; - } - Err(e) => { - tracing::error!("{}", e); - let m = match e { - Error::Hashing => { - Some(AccountFailure::FailedToHashPassword) - } - Error::Saving => Some(AccountFailure::SaveAccount), - Error::DbCritical => { - Some(AccountFailure::InternalServerError) - } - _ => None, - }; - if let Some(m) = m { - client - .publish_or_log( - Topic::SignUpFailure, - QoS::AtLeastOnce, - true, - m, - ) - .await; - } - } - } - } - } - _ => {} - }, - Ok(Event::Incoming(_incoming)) => {} - Ok(Event::Outgoing(_outgoing)) => {} - Err(e) => { - tracing::error!("{}", e); - } - } - } - } - - pub(crate) async fn create_account( - msg: CreateAccount, - db: &database_manager::Database, - config: SharedAppConfig, - ) -> Result { - let hash = { - match msg.password.encrypt(&config.lock().web().pass_salt()) { - Ok(hash) => hash, - Err(e) => { - tracing::error!("{e:?}"); - return Err(Error::Hashing); - } - } - }; - - let mut t = db.pool().begin().await.map_err(|e| { - tracing::error!("{}", e); - Error::DbCritical - })?; - let account: FullAccount = match database_manager::create_account( - database_manager::CreateAccount { - email: msg.email, - login: msg.login, - pass_hash: model::PassHash::new(hash), - role: msg.role, - }, - &mut t, - ) - .await - { - Ok(r) => r, - Err(e) => { - tracing::error!("{}", e); - t.rollback().await.ok(); - return Err(Error::Saving); - } - }; - t.commit().await.map_err(|e| { - tracing::error!("{}", e); - Error::DbCritical - })?; - - Ok(account) - } + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::from_default_env()) + .with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)) + .with(tracing_opentelemetry::layer().with_tracer(tracer)) + .try_init() + .unwrap(); } diff --git a/actors/account_manager/src/mqtt.rs b/actors/account_manager/src/mqtt.rs new file mode 100644 index 0000000..b77d9c5 --- /dev/null +++ b/actors/account_manager/src/mqtt.rs @@ -0,0 +1,99 @@ +use std::time::Duration; + +use channels::account::{AccountFailure, CreateAccount, Topic}; +use config::SharedAppConfig; +use database_manager::Database; +use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS}; + +use crate::{actions, Error}; + +pub async fn start(config: SharedAppConfig, db: Database) -> channels::AsyncClient { + tracing::info!("Starting account mqtt at 0.0.0.0:1883"); + let mut mqtt_options = MqttOptions::new(channels::account::CLIENT_NAME, "0.0.0.0", 1883); + mqtt_options.set_keep_alive(Duration::from_secs(5)); + + let (client, mut event_loop) = AsyncClient::new(mqtt_options, 10); + client + .subscribe(Topic::CreateAccount, QoS::AtLeastOnce) + .await + .unwrap(); + + let client = channels::AsyncClient(client); + let spawn_client = client.clone(); + tokio::spawn(async move { + let client = spawn_client.clone(); + loop { + let notification = event_loop.poll().await; + + match notification { + Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() { + topic if Topic::CreateAccount == topic => { + if let Ok(channels::account::CreateAccount { + email, + login, + password, + role, + }) = channels::account::CreateAccount::try_from(publish.payload) + { + match actions::create_account( + CreateAccount { + email, + login, + password, + role, + }, + &db, + config.clone(), + ) + .await + { + Ok(account) => { + client + .publish_or_log( + Topic::AccountCreated, + QoS::AtLeastOnce, + true, + model::Account::from(account), + ) + .await; + } + Err(e) => { + tracing::error!("{}", e); + let m = match e { + Error::Hashing => { + Some(AccountFailure::FailedToHashPassword) + } + Error::Saving => Some(AccountFailure::SaveAccount), + Error::DbCritical => { + Some(AccountFailure::InternalServerError) + } + _ => None, + }; + if let Some(m) = m { + client + .publish_or_log( + Topic::SignUpFailure, + QoS::AtLeastOnce, + true, + m, + ) + .await; + } + } + } + } + } + _ => {} + }, + Ok(Event::Incoming(_incoming)) => {} + Ok(Event::Outgoing(_outgoing)) => {} + Err(e) => { + tracing::error!("{}", e); + } + } + } + }); + + client + // tracing::info!("Mqtt channel closed"); +} diff --git a/actors/account_manager/src/rpc.rs b/actors/account_manager/src/rpc.rs new file mode 100644 index 0000000..af430d1 --- /dev/null +++ b/actors/account_manager/src/rpc.rs @@ -0,0 +1,111 @@ +use std::net::{IpAddr, Ipv4Addr}; + +use channels::account::{CreateAccount, MeResult, RegisterResult}; +use channels::AsyncClient; +use config::SharedAppConfig; +use database_manager::Database; +use futures::future::{self}; +use futures::stream::StreamExt; +use rumqttc::QoS; +use tarpc::context; +use tarpc::server::incoming::Incoming; +use tarpc::server::{self, Channel}; +use tarpc::tokio_serde::formats::Json; + +use crate::actions; + +#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] +#[serde(rename_all = "kebab-case", tag = "account")] +pub enum Error { + #[error("Unable to send or receive msg from database")] + DbCritical, + #[error("Failed to load account data")] + Account, + #[error("Failed to load account addresses")] + Addresses, + #[error("Unable to save record")] + Saving, + #[error("Unable to hash password")] + Hashing, + #[error("{0}")] + Db(#[from] database_manager::Error), +} + +#[derive(Clone)] +struct AccountsServer { + db: Database, + config: SharedAppConfig, + mqtt_client: AsyncClient, +} + +#[tarpc::server] +impl channels::account::rpc::Accounts for AccountsServer { + async fn me(self, _: context::Context, account_id: model::AccountId) -> MeResult { + let res = actions::me(account_id, self.db).await; + tracing::info!("ME result: {:?}", res); + res + } + + async fn register_account(self, _: context::Context, details: CreateAccount) -> RegisterResult { + let res = actions::create_account(details, &self.db, self.config).await; + tracing::info!("REGISTER result: {:?}", res); + match res { + Ok(account) => { + self.mqtt_client + .publish_or_log( + channels::account::Topic::AccountCreated, + QoS::AtLeastOnce, + true, + &account, + ) + .await; + RegisterResult { + account: Some(account), + error: None, + } + } + Err(_e) => RegisterResult { + account: None, + error: Some(channels::account::Error::Account), + }, + } + } +} + +pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) { + use channels::account::rpc::Accounts; + + let port = { config.lock().account_manager().port }; + + let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port); + + let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default) + .await + .unwrap(); + tracing::info!("Starting account rpc at {}", listener.local_addr()); + listener.config_mut().max_frame_length(usize::MAX); + listener + // Ignore accept errors. + .filter_map(|r| future::ready(r.ok())) + .map(server::BaseChannel::with_defaults) + // Limit channels to 8 per IP. + .max_channels_per_key(8, |t| t.transport().peer_addr().unwrap().ip()) + .max_concurrent_requests_per_channel(20) + // serve is generated by the service attribute. It takes as input any type implementing + // the generated World trait. + .map(|channel| { + channel.execute( + AccountsServer { + db: db.clone(), + config: config.clone(), + mqtt_client: mqtt_client.clone(), + } + .serve(), + ) + }) + // Max 10 channels. + .buffer_unordered(10) + .for_each(|_| async {}) + .await; + tracing::info!("RPC channel closed"); +} diff --git a/actors/cart_manager/Cargo.toml b/actors/cart_manager/Cargo.toml index ae41b0a..640acb2 100644 --- a/actors/cart_manager/Cargo.toml +++ b/actors/cart_manager/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] actix = { version = "0.13", features = [] } actix-rt = { version = "2.7", features = [] } -bus = { path = "../../shared/bus" } +channels = { path = "../../shared/channels" } chrono = { version = "0.4", features = ["serde"] } config = { path = "../../shared/config" } database_manager = { path = "../database_manager" } diff --git a/actors/database_manager/src/lib.rs b/actors/database_manager/src/lib.rs index 00077a5..d5f3ba3 100644 --- a/actors/database_manager/src/lib.rs +++ b/actors/database_manager/src/lib.rs @@ -43,22 +43,24 @@ macro_rules! db_async_handler { } }; ($msg: ty, $async: ident, $res: ty, $inner_async: ident) => { - async fn $inner_async(msg: $msg, pool: sqlx::PgPool) -> Result<$res> { - let mut t = pool.begin().await.map_err(|e| { - tracing::error!("{:?}", e); - $crate::Error::TransactionFailed - })?; - match $async(msg, &mut t).await { - Ok(res) => { - t.commit().await.map_err(|e| { - tracing::error!("{:?}", e); - $crate::Error::TransactionFailed - })?; - Ok(res) - } - Err(e) => { - let _ = t.rollback().await; - Err(e) + impl $msg { + pub async fn $inner_async(self, pool: sqlx::PgPool) -> Result<$res> { + let mut t = pool.begin().await.map_err(|e| { + tracing::error!("{:?}", e); + $crate::Error::TransactionFailed + })?; + match $async(self, &mut t).await { + Ok(res) => { + t.commit().await.map_err(|e| { + tracing::error!("{:?}", e); + $crate::Error::TransactionFailed + })?; + Ok(res) + } + Err(e) => { + let _ = t.rollback().await; + Err(e) + } } } } @@ -69,7 +71,7 @@ macro_rules! db_async_handler { fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result { use actix::WrapFuture; let pool = self.pool.clone(); - Box::pin(async { $inner_async(msg, pool).await }.into_actor(self)) + Box::pin(async { msg.$inner_async(pool).await }.into_actor(self)) } } @@ -170,7 +172,7 @@ pub type Result = std::result::Result; #[derive(Clone)] pub struct Database { pool: PgPool, - config: SharedAppConfig, + _config: SharedAppConfig, } pub type SharedDatabase = actix::Addr; @@ -182,7 +184,10 @@ impl Database { tracing::error!("Failed to connect to database. {e:?}"); std::process::exit(1); }); - Self { pool, config } + Self { + pool, + _config: config, + } } pub fn pool(&self) -> &PgPool { diff --git a/api/Cargo.toml b/api/Cargo.toml index 45942e1..df2b82f 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -4,7 +4,6 @@ version = "0.1.0" edition = "2021" [dependencies] -account_manager = { path = "../actors/account_manager" } actix = { version = "0.13", features = [] } actix-broker = { version = "0.4", features = [] } actix-cors = { version = "0.6", features = [] } @@ -20,6 +19,7 @@ actix-web-opentelemetry = { version = "0.12", features = [] } async-trait = { version = "0.1", features = [] } bytes = { version = "1.1.0" } cart_manager = { path = "../actors/cart_manager" } +channels = { path = "../shared/channels" } chrono = { version = "0.4", features = ["serde"] } config = { path = "../shared/config" } database_manager = { path = "../actors/database_manager" } @@ -54,3 +54,4 @@ tracing = { version = "0.1.34" } tracing-subscriber = { version = "0.3.11" } uuid = { version = "1.2.1", features = ["serde"] } validator = { version = "0.14", features = [] } +tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } diff --git a/api/src/main.rs b/api/src/main.rs index b2978e2..803f048 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -59,8 +59,7 @@ async fn server(opts: ServerOpts) -> Result<()> { .await .expect("Failed to initialize file system storage"); let cart_manager = cart_manager::CartManager::new(db.clone()).start(); - let account_manager = - account_manager::AccountManager::new(app_config.clone(), db.clone()).start(); + let account_manager = channels::account::rpc::create_client(app_config.clone()).await; let addr = { let l = app_config.lock(); let w = l.web(); diff --git a/api/src/routes/mod.rs b/api/src/routes/mod.rs index 3d960ab..7ae33f5 100644 --- a/api/src/routes/mod.rs +++ b/api/src/routes/mod.rs @@ -40,7 +40,6 @@ pub enum Error { CriticalFailure, Public(public::Error), Admin(admin::Error), - Account(account_manager::Error), Cart(cart_manager::Error), Database(database_manager::Error), Email(email_manager::Error), @@ -79,7 +78,6 @@ impl Display for Error { }) .unwrap_or_default(), Error::CriticalFailure => String::from("Something went wrong"), - Error::Account(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Cart(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Database(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Email(_e) => serde_json::to_string(&self).unwrap_or_default(), @@ -102,7 +100,6 @@ impl ResponseError for Error { } Error::Admin(_) => StatusCode::BAD_REQUEST, Error::Public(_) => StatusCode::BAD_REQUEST, - Error::Account(_) => StatusCode::BAD_REQUEST, Error::Cart(_) => StatusCode::BAD_REQUEST, Error::Database(_) => StatusCode::BAD_REQUEST, Error::Email(_) => StatusCode::BAD_REQUEST, diff --git a/api/src/routes/public/api_v1/restricted.rs b/api/src/routes/public/api_v1/restricted.rs index c9ea0fb..86ef6cd 100644 --- a/api/src/routes/public/api_v1/restricted.rs +++ b/api/src/routes/public/api_v1/restricted.rs @@ -1,4 +1,3 @@ -use account_manager::query_account; use actix::Addr; use actix_web::web::{scope, Data, Json, ServiceConfig}; use actix_web::{delete, get, post, put, HttpRequest, HttpResponse}; @@ -210,7 +209,7 @@ async fn delete_cart_item( #[get("/me")] pub(crate) async fn me( - account: Data>, + account: Data, tm: Data>, credentials: BearerAuth, ) -> routes::Result> { @@ -218,14 +217,14 @@ pub(crate) async fn me( .require_user(tm.into_inner()) .await? .account_id(); - let account_manager::MeResult { account, addresses } = query_account!( - account, - account_manager::Me { account_id }, - PublicError::DatabaseConnection.into(), - PublicError::DatabaseConnection.into() - ); - Ok(Json((account, addresses).into())) + match account.me(tarpc::context::current(), account_id).await { + Ok(me) => Ok(Json((me.account.unwrap(), me.addresses.unwrap()).into())), + Err(e) => { + tracing::error!("{}", e); + Err(routes::Error::CriticalFailure) + } + } } #[post("/order")] diff --git a/shared/bus/Cargo.toml b/shared/channels/Cargo.toml similarity index 61% rename from shared/bus/Cargo.toml rename to shared/channels/Cargo.toml index 121f194..b5b21ee 100644 --- a/shared/bus/Cargo.toml +++ b/shared/channels/Cargo.toml @@ -1,13 +1,15 @@ [package] -name = "bus" +name = "channels" version = "0.1.0" edition = "2021" [dependencies] -serde = { version = "*", features = ['derive'] } bincode = { version = "*" } -model = { path = "../model" } bytes = { version = "1.2.1" } +config = { path = "../config" } +model = { path = "../model" } +rumqttc = { version = "0.17.0" } +serde = { version = "*", features = ['derive'] } +tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } thiserror = { version = "1.0.37" } tracing = { version = "0.1.37" } -rumqttc = { version = "0.17.0" } diff --git a/shared/bus/src/lib.rs b/shared/channels/src/lib.rs similarity index 65% rename from shared/bus/src/lib.rs rename to shared/channels/src/lib.rs index eb8d489..965e69c 100644 --- a/shared/bus/src/lib.rs +++ b/shared/channels/src/lib.rs @@ -1,5 +1,6 @@ #![feature(structural_match)] +#[derive(Clone)] pub struct AsyncClient(pub rumqttc::AsyncClient); impl AsyncClient { @@ -31,12 +32,16 @@ impl AsyncClient { pub mod account { use model::{Email, Login, Password, Role}; - #[derive(Debug, thiserror::Error)] + #[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)] pub enum Error { #[error("mqtt payload has invalid create account data")] InvalidCreateAccount, #[error("mqtt payload has invalid account failure data")] InvalidAccountFailure, + #[error("Account does not exists")] + Account, + #[error("Account does have any addresses")] + Addresses, } pub static CLIENT_NAME: &str = "account-manager"; @@ -115,4 +120,52 @@ pub mod account { }) } } + + #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] + pub struct MeResult { + pub account: Option, + pub addresses: Option>, + pub error: Option, + } + + #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] + pub struct RegisterResult { + pub account: Option, + pub error: Option, + } + + pub mod rpc { + use config::SharedAppConfig; + + #[tarpc::service] + pub trait Accounts { + /// Returns a greeting for name. + async fn me(account_id: model::AccountId) -> crate::account::MeResult; + + /// Creates new user account. + async fn register_account( + details: crate::account::CreateAccount, + ) -> crate::account::RegisterResult; + } + + pub async fn create_client(config: SharedAppConfig) -> AccountsClient { + use tarpc::client; + use tarpc::tokio_serde::formats::Json; + + let addr = { + let l = config.lock(); + (l.account_manager().bind.clone(), l.account_manager().port) + }; + + let transport = tarpc::serde_transport::tcp::connect(addr, Json::default); + + let client = AccountsClient::new( + client::Config::default(), + transport.await.expect("Failed to connect to server"), + ) + .spawn(); + + client + } + } } diff --git a/shared/config/Cargo.toml b/shared/config/Cargo.toml index 996bcc8..52a85d9 100644 --- a/shared/config/Cargo.toml +++ b/shared/config/Cargo.toml @@ -4,18 +4,18 @@ version = "0.1.0" edition = "2021" [dependencies] -serde = { version = "1.0", features = ["derive"] } -serde_json = { version = "1.0", features = [] } -toml = { version = "0.5", features = [] } + +actix-web = { version = "4.0", features = [] } parking_lot = { version = "0.12", features = [] } password-hash = { version = "0.4", features = ["alloc"] } pay_u = { version = '0.1', features = ["single-client"] } - -actix-web = { version = "4.0", features = [] } - -tracing = { version = "0.1.34" } +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0", features = [] } thiserror = { version = "1.0" } +toml = { version = "0.5", features = [] } + +tracing = { version = "0.1.34" } diff --git a/shared/config/src/lib.rs b/shared/config/src/lib.rs index 4363a1f..4a42f2c 100644 --- a/shared/config/src/lib.rs +++ b/shared/config/src/lib.rs @@ -425,17 +425,24 @@ impl FilesConfig { #[derive(Debug, Serialize, Deserialize)] pub struct AccountManagerConfig { pub port: u16, + pub bind: String, } impl Default for AccountManagerConfig { fn default() -> Self { - Self { port: 19329 } + Self { + port: 19329, + bind: "0.0.0.0".into(), + } } } impl Example for AccountManagerConfig { fn example() -> Self { - Self { port: 19329 } + Self { + port: 19329, + bind: "0.0.0.0".into(), + } } }