diff --git a/Cargo.lock b/Cargo.lock index 3c28268..3256bee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,11 +14,13 @@ dependencies = [ "config", "database_manager", "dotenv", - "fibers_rpc", + "futures 0.3.25", + "json", "model", "pretty_env_logger", "rumqttc", "serde", + "tarpc", "thiserror", "tokio", "tracing", @@ -1375,7 +1377,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" dependencies = [ "atty", - "humantime", + "humantime 1.3.0", "log", "regex", "termcolor", @@ -1997,6 +1999,12 @@ dependencies = [ "quick-error", ] +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.20" @@ -2258,6 +2266,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" + [[package]] name = "jwt" version = "0.16.0" @@ -3889,6 +3903,12 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stringprep" version = "0.1.2" @@ -3922,6 +3942,39 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tarpc" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd84a0fdd485d04b67be6009a04603489c8cb00ade830e4dd2e3660bef855b1" +dependencies = [ + "anyhow", + "fnv", + "futures 0.3.25", + "humantime 2.1.0", + "opentelemetry", + "pin-project", + "rand", + "static_assertions", + "tarpc-plugins", + "thiserror", + "tokio", + "tokio-util 0.7.4", + "tracing", + "tracing-opentelemetry", +] + +[[package]] +name = "tarpc-plugins" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ee42b4e559f17bce0385ebf511a7beb67d5cc33c12c96b7f4e9789919d9c10f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tasque" version = "0.1.5" @@ -4201,6 +4254,7 @@ dependencies = [ "futures-core", "futures-sink", "pin-project-lite", + "slab", "tokio", "tracing", ] @@ -4265,6 +4319,19 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.17.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbbe89715c1dbbb790059e2565353978564924ee85017b5fff365c872ff6721f" +dependencies = [ + "once_cell", + "opentelemetry", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "tracing-subscriber" version = "0.3.16" diff --git a/actors/account_manager/Cargo.toml b/actors/account_manager/Cargo.toml index 724db7e..cdb720f 100644 --- a/actors/account_manager/Cargo.toml +++ b/actors/account_manager/Cargo.toml @@ -16,11 +16,13 @@ bytes = { version = "1.2.1" } config = { path = "../../shared/config" } database_manager = { path = "../database_manager" } dotenv = { version = "0.15.0" } -fibers_rpc = { version = "0.3.4", features = [] } +futures = { version = "0.3.25" } +json = { version = "0.12.4" } model = { path = "../../shared/model" } pretty_env_logger = { version = "0.4", features = [] } rumqttc = { version = "*" } serde = { version = "1.0.137", features = ["derive"] } +tarpc = { version = "0.30.0", features = ["tokio1"] } thiserror = { version = "1.0.31" } tokio = { version = "1.21.2", features = ['full'] } tracing = { version = "0.1.6" } diff --git a/actors/account_manager/src/main.rs b/actors/account_manager/src/main.rs index 5449cf8..200dcf5 100644 --- a/actors/account_manager/src/main.rs +++ b/actors/account_manager/src/main.rs @@ -4,6 +4,7 @@ use std::time::Duration; use bus::account::{AccountFailure, CreateAccount, Topic}; use config::{SharedAppConfig, UpdateConfig}; +use database_manager::Database; use model::{Encrypt, FullAccount}; use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS}; @@ -38,108 +39,215 @@ async fn main() { let config = config::default_load(&opts); - let db = database_manager::Database::build(config.clone()).await; + let db = Database::build(config.clone()).await; - let mut mqttoptions = MqttOptions::new(bus::account::CLIENT_NAME, "0.0.0.0", 1883); - mqttoptions.set_keep_alive(Duration::from_secs(5)); + mqtt::start(config, &db).await; +} - let (client, mut event_loop) = AsyncClient::new(mqttoptions, 10); - client - .subscribe(Topic::CreateAccount, QoS::AtLeastOnce) - .await - .unwrap(); +mod grpc { + use config::SharedAppConfig; + use database_manager::Database; + use futures::future::{self, Ready}; + use futures::prelude::*; + use futures::stream::StreamExt; + use json::JsonValue; + use tarpc::server::incoming::Incoming; + use tarpc::server::{self, Channel}; + use tarpc::{client, context}; - let client = bus::AsyncClient(client); - loop { - let notification = event_loop.poll().await; + #[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] + #[serde(rename_all = "kebab-case", tag = "account")] + pub enum Error { + #[error("Unable to send or receive msg from database")] + DbCritical, + #[error("Failed to load account data")] + Account, + #[error("Failed to load account addresses")] + Addresses, + #[error("Unable to save record")] + Saving, + #[error("Unable to hash password")] + Hashing, + #[error("{0}")] + Db(#[from] database_manager::Error), + } - match notification { - Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() { - topic if Topic::CreateAccount == topic => { - if let Ok(msg) = CreateAccount::try_from(publish.payload) { - match create_account(msg, &db, config.clone()).await { - Ok(account) => { - client - .publish_or_log( - Topic::AccountCreated, - QoS::AtLeastOnce, - true, - model::Account::from(account), - ) - .await; - } - Err(e) => { - tracing::error!("{}", e); - let m = match e { - Error::Hashing => Some(AccountFailure::FailedToHashPassword), - Error::Saving => Some(AccountFailure::SaveAccount), - Error::DbCritical => Some(AccountFailure::InternalServerError), - _ => None, - }; - if let Some(m) = m { - client.publish_or_log( - Topic::SignUpFailure, - QoS::AtLeastOnce, - true, - m, - ); + pub type Result = std::result::Result; + + pub struct MeResult { + pub account: model::FullAccount, + pub addresses: Vec, + } + + #[tarpc::service] + trait Accounts { + /// Returns a greeting for name. + async fn me(account_id: model::AccountId) -> String; + } + + #[derive(Clone)] + struct AccountsServer { + db: Database, + } + + impl Accounts for AccountsServer { + // Each defined rpc generates two items in the trait, a fn that serves the RPC, + // and an associated type representing the future output by the fn. + + type AccountsFut = Ready; + + fn me(self, _: context::Context, account_id: model::AccountId) -> Self::AccountsFut { + future::ready(format!("Hello, {name}!")) + } + } + + async fn me( + account_id: model::AccountId, + db: Database, + _config: SharedAppConfig, + ) -> Result { + let account: model::FullAccount = query_db!( + db, + database_manager::FindAccount { + account_id: msg.account_id + }, + Error::Account + ); + let addresses = query_db!( + db, + database_manager::AccountAddresses { + account_id: msg.account_id + }, + Error::Addresses + ); + Ok(MeResult { account, addresses }) + } + + async fn start(config: SharedAppConfig) { + let port = { config.lock().account_manager().port }; + } +} + +mod mqtt { + use std::time::Duration; + + use account_manager::CreateAccount; + use bus::account::{AccountFailure, Topic}; + use config::SharedAppConfig; + use database_manager::Database; + use model::{Encrypt, FullAccount}; + use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS}; + + use crate::{Error, Result}; + + pub async fn start(config: SharedAppConfig, db: &Database) { + let mut mqtt_options = MqttOptions::new(bus::account::CLIENT_NAME, "0.0.0.0", 1883); + mqtt_options.set_keep_alive(Duration::from_secs(5)); + + let (client, mut event_loop) = AsyncClient::new(mqtt_options, 10); + client + .subscribe(Topic::CreateAccount, QoS::AtLeastOnce) + .await + .unwrap(); + + let client = bus::AsyncClient(client); + loop { + let notification = event_loop.poll().await; + + match notification { + Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() { + topic if Topic::CreateAccount == topic => { + if let Ok(msg) = CreateAccount::try_from(publish.payload) { + match create_account(msg, &db, config.clone()).await { + Ok(account) => { + client + .publish_or_log( + Topic::AccountCreated, + QoS::AtLeastOnce, + true, + model::Account::from(account), + ) + .await; + } + Err(e) => { + tracing::error!("{}", e); + let m = match e { + Error::Hashing => { + Some(AccountFailure::FailedToHashPassword) + } + Error::Saving => Some(AccountFailure::SaveAccount), + Error::DbCritical => { + Some(AccountFailure::InternalServerError) + } + _ => None, + }; + if let Some(m) = m { + client + .publish_or_log( + Topic::SignUpFailure, + QoS::AtLeastOnce, + true, + m, + ) + .await; + } } } } } + _ => {} + }, + Ok(Event::Incoming(_incoming)) => {} + Ok(Event::Outgoing(_outgoing)) => {} + Err(e) => { + tracing::error!("{}", e); } - _ => {} - }, - Ok(Event::Incoming(_incoming)) => {} - Ok(Event::Outgoing(_outgoing)) => {} - Err(e) => { - tracing::error!("{}", e); } } } -} -pub(crate) async fn create_account( - msg: CreateAccount, - db: &database_manager::Database, - config: SharedAppConfig, -) -> Result { - let hash = { - match msg.password.encrypt(&config.lock().web().pass_salt()) { - Ok(hash) => hash, - Err(e) => { - tracing::error!("{e:?}"); - return Err(Error::Hashing); + pub(crate) async fn create_account( + msg: CreateAccount, + db: &database_manager::Database, + config: SharedAppConfig, + ) -> Result { + let hash = { + match msg.password.encrypt(&config.lock().web().pass_salt()) { + Ok(hash) => hash, + Err(e) => { + tracing::error!("{e:?}"); + return Err(Error::Hashing); + } } - } - }; + }; - let mut t = db.pool().begin().await.map_err(|e| { - tracing::error!("{}", e); - Error::DbCritical - })?; - let account: FullAccount = match database_manager::create_account( - database_manager::CreateAccount { - email: msg.email, - login: msg.login, - pass_hash: model::PassHash::new(hash), - role: msg.role, - }, - &mut t, - ) - .await - { - Ok(r) => r, - Err(e) => { + let mut t = db.pool().begin().await.map_err(|e| { tracing::error!("{}", e); - t.rollback().await.ok(); - return Err(Error::Saving); - } - }; - t.commit().await.map_err(|e| { - tracing::error!("{}", e); - Error::DbCritical - })?; + Error::DbCritical + })?; + let account: FullAccount = match database_manager::create_account( + database_manager::CreateAccount { + email: msg.email, + login: msg.login, + pass_hash: model::PassHash::new(hash), + role: msg.role, + }, + &mut t, + ) + .await + { + Ok(r) => r, + Err(e) => { + tracing::error!("{}", e); + t.rollback().await.ok(); + return Err(Error::Saving); + } + }; + t.commit().await.map_err(|e| { + tracing::error!("{}", e); + Error::DbCritical + })?; - Ok(account) + Ok(account) + } } diff --git a/actors/database_manager/src/lib.rs b/actors/database_manager/src/lib.rs index e00ec17..00077a5 100644 --- a/actors/database_manager/src/lib.rs +++ b/actors/database_manager/src/lib.rs @@ -167,6 +167,7 @@ pub enum Error { pub type Result = std::result::Result; +#[derive(Clone)] pub struct Database { pool: PgPool, config: SharedAppConfig, @@ -174,15 +175,6 @@ pub struct Database { pub type SharedDatabase = actix::Addr; -impl Clone for Database { - fn clone(&self) -> Self { - Self { - pool: self.pool.clone(), - config: self.config.clone(), - } - } -} - impl Database { pub async fn build(config: SharedAppConfig) -> Self { let url = config.lock().database().url(); diff --git a/shared/config/src/lib.rs b/shared/config/src/lib.rs index 2c7a6cb..4363a1f 100644 --- a/shared/config/src/lib.rs +++ b/shared/config/src/lib.rs @@ -380,7 +380,7 @@ impl SearchConfig { } } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct FilesConfig { public_path: Option, local_path: Option, @@ -422,6 +422,23 @@ impl FilesConfig { } } +#[derive(Debug, Serialize, Deserialize)] +pub struct AccountManagerConfig { + pub port: u16, +} + +impl Default for AccountManagerConfig { + fn default() -> Self { + Self { port: 19329 } + } +} + +impl Example for AccountManagerConfig { + fn example() -> Self { + Self { port: 19329 } + } +} + #[derive(Serialize, Deserialize)] pub struct AppConfig { #[serde(default)] @@ -436,6 +453,8 @@ pub struct AppConfig { search: SearchConfig, #[serde(default)] files: FilesConfig, + #[serde(default)] + account_manager: AccountManagerConfig, #[serde(skip)] config_path: String, } @@ -449,6 +468,7 @@ impl Example for AppConfig { database: DatabaseConfig::example(), search: SearchConfig::example(), files: FilesConfig::example(), + account_manager: AccountManagerConfig::example(), config_path: "".to_string(), } } @@ -494,6 +514,10 @@ impl AppConfig { pub fn files(&self) -> &FilesConfig { &self.files } + + pub fn account_manager(&self) -> &AccountManagerConfig { + &self.account_manager + } } impl Default for AppConfig { @@ -505,6 +529,7 @@ impl Default for AppConfig { database: DatabaseConfig::default(), search: Default::default(), files: FilesConfig::default(), + account_manager: AccountManagerConfig::default(), config_path: "".to_string(), } }