From eb69682c67ecd5ca050ea40f5b322bad5c4632a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20Wo=C5=BAniak?= Date: Sat, 5 Nov 2022 01:08:45 +0100 Subject: [PATCH] Rewrite email sender --- Cargo.lock | 38 ++-- config/{rumqqtd.conf => rumqttd.conf} | 8 + crates/account_manager/src/mqtt.rs | 23 +-- crates/account_manager/src/rpc.rs | 56 ++---- crates/api/Cargo.toml | 2 - crates/api/src/main.rs | 34 +++- crates/api/src/routes/mod.rs | 3 - crates/cart_manager/src/rpc.rs | 50 +---- crates/channels/Cargo.toml | 2 + crates/channels/src/accounts.rs | 46 +++-- crates/channels/src/carts.rs | 5 +- crates/channels/src/emails.rs | 85 ++++++++ crates/channels/src/lib.rs | 24 ++- crates/channels/src/mqtt.rs | 22 ++ crates/channels/src/rpc.rs | 40 ++++ crates/config/src/lib.rs | 40 +++- crates/email_manager/Cargo.toml | 16 +- ...reset-password.html => reset-password.hbs} | 13 +- .../assets/{welcome.html => welcome.hbs} | 10 +- crates/email_manager/src/actions.rs | 101 ++++++++++ crates/email_manager/src/lib.rs | 189 ------------------ crates/email_manager/src/main.rs | 94 +++++++++ crates/email_manager/src/mqtt.rs | 96 +++++++++ 23 files changed, 641 insertions(+), 356 deletions(-) rename config/{rumqqtd.conf => rumqttd.conf} (82%) create mode 100644 crates/channels/src/emails.rs create mode 100644 crates/channels/src/mqtt.rs create mode 100644 crates/channels/src/rpc.rs rename crates/email_manager/assets/{reset-password.html => reset-password.hbs} (66%) rename crates/email_manager/assets/{welcome.html => welcome.hbs} (75%) create mode 100644 crates/email_manager/src/actions.rs delete mode 100644 crates/email_manager/src/lib.rs create mode 100644 crates/email_manager/src/main.rs create mode 100644 crates/email_manager/src/mqtt.rs diff --git a/Cargo.lock b/Cargo.lock index d64cbfd..09f7846 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -672,14 +672,12 @@ dependencies = [ "actix-web-opentelemetry", "async-trait", "bytes", - "cart_manager", "channels", "chrono", "config", "database_manager", "derive_more", "dotenv", - "email_manager", "fs_manager", "futures 0.3.25", "futures-util", @@ -884,11 +882,13 @@ dependencies = [ "bincode", "bytes", "config", + "futures 0.3.25", "model", "rumqttc", "serde", "tarpc", "thiserror", + "tokio", "tracing", ] @@ -1370,17 +1370,25 @@ version = "0.1.0" dependencies = [ "actix 0.13.0", "actix-rt", + "channels", "chrono", "config", + "dotenv", + "handlebars", "model", + "opentelemetry 0.17.0", + "opentelemetry-jaeger", "pretty_env_logger", "rumqttc", "sendgrid", "serde", "serde_json", + "tarpc", "thiserror", - "tinytemplate", + "tokio", "tracing", + "tracing-opentelemetry", + "tracing-subscriber", "uuid 0.8.2", ] @@ -1912,6 +1920,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "handlebars" +version = "4.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "433e4ab33f1213cdc25b5fa45c76881240cfe79284cf2b395e8b9e312a30a2fd" +dependencies = [ + "log", + "pest", + "pest_derive", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -4280,16 +4302,6 @@ dependencies = [ "displaydoc", ] -[[package]] -name = "tinytemplate" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" -dependencies = [ - "serde", - "serde_json", -] - [[package]] name = "tinyvec" version = "1.6.0" diff --git a/config/rumqqtd.conf b/config/rumqttd.conf similarity index 82% rename from config/rumqqtd.conf rename to config/rumqttd.conf index 773f8cf..f15778e 100644 --- a/config/rumqqtd.conf +++ b/config/rumqttd.conf @@ -23,5 +23,13 @@ next_connection_delay_ms = 1 max_inflight_count = 200 max_inflight_size = 1024 +[replicator] +connection_timeout_ms = 100 +max_client_id_len = 256 +throttle_delay_ms = 0 +max_payload_size = 2048 +max_inflight_count = 500 +max_inflight_size = 1024 + [console] listen = "0.0.0.0:3030" diff --git a/crates/account_manager/src/mqtt.rs b/crates/account_manager/src/mqtt.rs index 079e484..c577a31 100644 --- a/crates/account_manager/src/mqtt.rs +++ b/crates/account_manager/src/mqtt.rs @@ -1,29 +1,12 @@ -use std::time::Duration; - -use channels::accounts::Topic; use config::SharedAppConfig; -use rumqttc::{Event, Incoming, QoS}; +use rumqttc::{Event, Incoming}; use crate::db::Database; pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient { - let mut mqtt_options = { - let l = config.lock(); - let bind = &l.account_manager().mqtt_bind; - let port = l.account_manager().mqtt_port; - tracing::info!("Starting account mqtt at {}:{}", bind, port); + let (client, mut event_loop) = + channels::mqtt::create_client(channels::accounts::CLIENT_NAME, config); - rumqttc::MqttOptions::new(channels::accounts::CLIENT_NAME, bind, port) - }; - mqtt_options.set_keep_alive(Duration::from_secs(5)); - - let (client, mut event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10); - client - .subscribe(Topic::CreateAccount, QoS::AtLeastOnce) - .await - .unwrap(); - - let client = channels::AsyncClient(client); let spawn_client = client.clone(); tokio::spawn(async move { let _client = spawn_client.clone(); diff --git a/crates/account_manager/src/rpc.rs b/crates/account_manager/src/rpc.rs index 869c2a8..5d0e355 100644 --- a/crates/account_manager/src/rpc.rs +++ b/crates/account_manager/src/rpc.rs @@ -1,16 +1,8 @@ -use std::net::{IpAddr, Ipv4Addr}; - use channels::accounts::rpc::Accounts; use channels::accounts::{me, register}; use channels::AsyncClient; use config::SharedAppConfig; -use futures::future::{self}; -use futures::stream::StreamExt; -use rumqttc::QoS; use tarpc::context; -use tarpc::server::incoming::Incoming; -use tarpc::server::{self, Channel}; -use tarpc::tokio_serde::formats::Bincode; use crate::actions; use crate::db::Database; @@ -50,15 +42,13 @@ impl Accounts for AccountsServer { _: context::Context, input: register::Input, ) -> register::Output { - use channels::accounts::{Error, Topic}; + use channels::accounts::Error; let res = actions::create_account(input, &self.db, self.config).await; tracing::info!("REGISTER result: {:?}", res); match res { Ok(account) => { - self.mqtt_client - .publish_or_log(Topic::AccountCreated, QoS::AtLeastOnce, true, &account) - .await; + self.mqtt_client.emit_account_created(&account).await; register::Output { account: Some(account), error: None, @@ -73,37 +63,15 @@ impl Accounts for AccountsServer { } pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) { - let port = { config.lock().account_manager().port }; + let port = { config.lock().account_manager().rpc_port }; - let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port); - - let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Bincode::default) - .await - .unwrap(); - tracing::info!("Starting account rpc at {}", listener.local_addr()); - listener.config_mut().max_frame_length(usize::MAX); - listener - // Ignore accept errors. - .filter_map(|r| future::ready(r.ok())) - .map(server::BaseChannel::with_defaults) - // Limit channels to 8 per IP. - .max_channels_per_key(8, |t| t.transport().peer_addr().unwrap().ip()) - .max_concurrent_requests_per_channel(20) - // serve is generated by the service attribute. It takes as input any type implementing - // the generated World trait. - .map(|channel| { - channel.execute( - AccountsServer { - db: db.clone(), - config: config.clone(), - mqtt_client: mqtt_client.clone(), - } - .serve(), - ) - }) - // Max 10 channels. - .buffer_unordered(10) - .for_each(|_| async {}) - .await; - tracing::info!("RPC channel closed"); + channels::rpc::start("accounts", port, || { + AccountsServer { + db: db.clone(), + config: config.clone(), + mqtt_client: mqtt_client.clone(), + } + .serve() + }) + .await; } diff --git a/crates/api/Cargo.toml b/crates/api/Cargo.toml index 7bbe3fc..f3787ad 100644 --- a/crates/api/Cargo.toml +++ b/crates/api/Cargo.toml @@ -18,14 +18,12 @@ actix-web-httpauth = { version = "0.6", features = [] } actix-web-opentelemetry = { version = "0.12", features = [] } async-trait = { version = "0.1", features = [] } bytes = { version = "1.1.0" } -cart_manager = { path = "../cart_manager" } channels = { path = "../channels" } chrono = { version = "0.4", features = ["serde"] } config = { path = "../config" } database_manager = { path = "../database_manager" } derive_more = { version = "0.99", features = [] } dotenv = { version = "0.15", features = [] } -email_manager = { path = "../email_manager" } fs_manager = { path = "../fs_manager" } futures = { version = "0.3", features = [] } futures-util = { version = "0.3", features = [] } diff --git a/crates/api/src/main.rs b/crates/api/src/main.rs index 343e42e..781436e 100644 --- a/crates/api/src/main.rs +++ b/crates/api/src/main.rs @@ -10,13 +10,13 @@ use actix_web::middleware::Logger; use actix_web::web::Data; use actix_web::{App, HttpServer}; use config::UpdateConfig; -use email_manager::TestMail; use jemallocator::Jemalloc; -use model::{Email, Encrypt, Login, PassHash, Password, Role}; +use model::{AccountState, Email, Encrypt, Login, PassHash, Password, Role}; use opts::{ Command, CreateAccountCmd, CreateAccountOpts, GenerateHashOpts, Opts, ServerOpts, TestMailerOpts, }; +use rumqttc::Outgoing; use validator::{validate_email, validate_length}; use crate::opts::ReIndexOpts; @@ -169,16 +169,28 @@ async fn test_mailer(opts: TestMailerOpts) -> Result<()> { let config = config::default_load(&opts); opts.update_config(&mut *config.lock()); - let manager = email_manager::EmailManager::build(config) - .expect("Invalid email manager config") - .start(); - manager - .send(TestMail { - receiver: opts.receiver.expect("e-mail address is required"), + let (client, mut event_loop) = channels::mqtt::create_client("bazzar", config); + client + .emit_test(&model::Account { + id: 0.into(), + email: opts.receiver.unwrap(), + login: Login::new("test email"), + role: Role::Admin, + customer_id: Default::default(), + state: AccountState::Active, }) - .await - .expect("Failed to execute actor") - .expect("Failed to send email"); + .await; + + loop { + let msg = event_loop.poll().await.unwrap(); + tracing::info!("{:?}", msg); + + if let rumqttc::Event::Outgoing(Outgoing::PubAck(_)) = msg { + client.0.disconnect().await.unwrap(); + break; + } + } + println!("Success!"); Ok(()) } diff --git a/crates/api/src/routes/mod.rs b/crates/api/src/routes/mod.rs index 586668d..a8065b5 100644 --- a/crates/api/src/routes/mod.rs +++ b/crates/api/src/routes/mod.rs @@ -41,7 +41,6 @@ pub enum Error { Public(public::Error), Admin(admin::Error), Database(database_manager::Error), - Email(email_manager::Error), Fs(fs_manager::Error), Order(order_manager::Error), Pay(payment_manager::Error), @@ -78,7 +77,6 @@ impl Display for Error { .unwrap_or_default(), Error::CriticalFailure => String::from("Something went wrong"), Error::Database(_e) => serde_json::to_string(&self).unwrap_or_default(), - Error::Email(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Fs(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Order(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Pay(_e) => serde_json::to_string(&self).unwrap_or_default(), @@ -99,7 +97,6 @@ impl ResponseError for Error { Error::Admin(_) => StatusCode::BAD_REQUEST, Error::Public(_) => StatusCode::BAD_REQUEST, Error::Database(_) => StatusCode::BAD_REQUEST, - Error::Email(_) => StatusCode::BAD_REQUEST, Error::Fs(_) => StatusCode::BAD_REQUEST, Error::Order(_) => StatusCode::BAD_REQUEST, Error::Pay(_) => StatusCode::BAD_REQUEST, diff --git a/crates/cart_manager/src/rpc.rs b/crates/cart_manager/src/rpc.rs index 60c670f..1f00abb 100644 --- a/crates/cart_manager/src/rpc.rs +++ b/crates/cart_manager/src/rpc.rs @@ -1,14 +1,8 @@ -use std::net::{IpAddr, Ipv4Addr}; - use channels::carts::modify_item::{Input, Output}; use channels::carts::rpc::Carts; use channels::AsyncClient; use config::SharedAppConfig; -use futures::{future, StreamExt}; -use tarpc::server::incoming::Incoming; -use tarpc::server::Channel; -use tarpc::tokio_serde::formats::Bincode; -use tarpc::{context, server}; +use tarpc::context; use crate::db::Database; @@ -51,37 +45,15 @@ impl Carts for CartsServer { } pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) { - let port = { config.lock().cart_manager().port }; + let port = { config.lock().cart_manager().rpc_port }; - let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port); - - let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Bincode::default) - .await - .unwrap(); - tracing::info!("Starting account rpc at {}", listener.local_addr()); - listener.config_mut().max_frame_length(usize::MAX); - listener - // Ignore accept errors. - .filter_map(|r| future::ready(r.ok())) - .map(server::BaseChannel::with_defaults) - // Limit channels to 8 per IP. - .max_channels_per_key(8, |t| t.transport().peer_addr().unwrap().ip()) - .max_concurrent_requests_per_channel(20) - // serve is generated by the service attribute. It takes as input any type implementing - // the generated World trait. - .map(|channel| { - channel.execute( - CartsServer { - db: db.clone(), - _config: config.clone(), - _mqtt_client: mqtt_client.clone(), - } - .serve(), - ) - }) - // Max 10 channels. - .buffer_unordered(10) - .for_each(|_| async {}) - .await; - tracing::info!("RPC channel closed"); + channels::rpc::start("carts", port, || { + CartsServer { + db: db.clone(), + _config: config.clone(), + _mqtt_client: mqtt_client.clone(), + } + .serve() + }) + .await; } diff --git a/crates/channels/Cargo.toml b/crates/channels/Cargo.toml index b5b21ee..6974433 100644 --- a/crates/channels/Cargo.toml +++ b/crates/channels/Cargo.toml @@ -7,9 +7,11 @@ edition = "2021" bincode = { version = "*" } bytes = { version = "1.2.1" } config = { path = "../config" } +futures = { version = "0.3.25" } model = { path = "../model" } rumqttc = { version = "0.17.0" } serde = { version = "*", features = ['derive'] } tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } thiserror = { version = "1.0.37" } +tokio = { version = "1.21.2", features = ['full'] } tracing = { version = "0.1.37" } diff --git a/crates/channels/src/accounts.rs b/crates/channels/src/accounts.rs index cabe01a..184f897 100644 --- a/crates/channels/src/accounts.rs +++ b/crates/channels/src/accounts.rs @@ -1,3 +1,9 @@ +use bytes::Bytes; +use rumqttc::QoS; +use serde::de::DeserializeOwned; + +use crate::{AsyncClient, DeserializePayload}; + #[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)] pub enum Error { #[error("mqtt payload has invalid create account data")] @@ -14,9 +20,23 @@ pub static CLIENT_NAME: &str = "account-manager"; #[derive(Copy, Clone, Debug, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)] pub enum Topic { - CreateAccount, AccountCreated, - SignUpFailure, +} + +impl DeserializePayload for Topic { + fn deserialize_payload(self, bytes: Bytes) -> Option { + match self { + Topic::AccountCreated => bincode::deserialize(bytes.as_ref()).ok(), + } + } +} + +impl Topic { + pub fn to_str(self) -> &'static str { + match self { + Topic::AccountCreated => "account/created", + } + } } impl Into for Topic { @@ -37,16 +57,6 @@ impl PartialEq for Topic { } } -impl Topic { - pub fn to_str(self) -> &'static str { - match self { - Topic::CreateAccount => "account/create", - Topic::AccountCreated => "account/created", - Topic::SignUpFailure => "account/failure", - } - } -} - pub mod register { use model::{Email, Login, Password, Role}; @@ -92,6 +102,13 @@ pub mod me { } } +impl AsyncClient { + pub async fn emit_account_created(&self, account: &model::FullAccount) { + self.publish_or_log(Topic::AccountCreated, QoS::AtLeastOnce, true, account) + .await + } +} + pub mod rpc { use config::SharedAppConfig; @@ -112,7 +129,10 @@ pub mod rpc { let addr = { let l = config.lock(); - (l.account_manager().bind.clone(), l.account_manager().port) + ( + l.account_manager().bind.clone(), + l.account_manager().rpc_port, + ) }; let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default); diff --git a/crates/channels/src/carts.rs b/crates/channels/src/carts.rs index 1d20cab..3566cd6 100644 --- a/crates/channels/src/carts.rs +++ b/crates/channels/src/carts.rs @@ -188,7 +188,10 @@ pub mod rpc { let addr = { let l = config.lock(); - (l.account_manager().bind.clone(), l.account_manager().port) + ( + l.account_manager().bind.clone(), + l.account_manager().rpc_port, + ) }; let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default); diff --git a/crates/channels/src/emails.rs b/crates/channels/src/emails.rs new file mode 100644 index 0000000..e141a51 --- /dev/null +++ b/crates/channels/src/emails.rs @@ -0,0 +1,85 @@ +use bytes::Bytes; +use rumqttc::QoS; +use serde::de::DeserializeOwned; + +use crate::{AsyncClient, DeserializePayload}; + +impl AsyncClient { + pub async fn emit_reset_password(&self, account: &reset_password::Input) { + self.publish_or_log(Topic::ResetPassword, QoS::AtLeastOnce, true, account) + .await; + } + + pub async fn emit_test(&self, account: &model::Account) { + self.publish_or_log(Topic::Test, QoS::AtLeastOnce, false, account) + .await; + } +} + +#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)] +pub enum Error {} + +pub static CLIENT_NAME: &str = "email-sender"; + +#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)] +pub enum Topic { + ResetPassword, + Test, +} + +impl Topic { + pub fn to_str(self) -> &'static str { + match self { + Topic::ResetPassword => "emails/reset-password", + Topic::Test => "emails/test", + } + } +} + +impl DeserializePayload for Topic { + fn deserialize_payload(self, bytes: Bytes) -> Option { + bincode::deserialize(bytes.as_ref()).ok() + } +} + +impl Into for Topic { + fn into(self) -> String { + String::from(self.to_str()) + } +} + +impl<'s> PartialEq<&'s str> for Topic { + fn eq(&self, other: &&'s str) -> bool { + self.to_str() == *other + } +} + +impl PartialEq for Topic { + fn eq(&self, other: &String) -> bool { + self.to_str() == other.as_str() + } +} + +pub mod test_mail { + #[derive(Debug)] + pub struct Input { + pub receiver: model::Email, + } +} + +pub mod reset_password { + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input { + pub login: model::Login, + pub email: model::Email, + pub reset_token: model::ResetToken, + } +} + +pub mod welcome { + #[derive(Debug)] + pub struct Input { + pub login: model::Login, + pub email: model::Email, + } +} diff --git a/crates/channels/src/lib.rs b/crates/channels/src/lib.rs index d6df606..6256dc2 100644 --- a/crates/channels/src/lib.rs +++ b/crates/channels/src/lib.rs @@ -2,24 +2,38 @@ pub mod accounts; pub mod carts; +pub mod emails; +pub mod mqtt; +pub mod rpc; + +pub trait DeserializePayload { + fn deserialize_payload(self, bytes: bytes::Bytes) -> Option; +} #[derive(Clone)] pub struct AsyncClient(pub rumqttc::AsyncClient); impl AsyncClient { - pub async fn publish, T: serde::Serialize>( + pub(crate) async fn publish, T: serde::Serialize>( &self, topic: Topic, qos: rumqttc::QoS, retain: bool, t: T, ) -> Result<(), rumqttc::ClientError> { - let v = bincode::serialize(&t).unwrap_or_default(); - let bytes = bytes::Bytes::copy_from_slice(&v); - self.0.publish_bytes(topic, qos, retain, bytes).await + match bincode::serialize(&t) { + Ok(v) => { + let bytes = bytes::Bytes::from(v); + self.0.publish_bytes(topic, qos, retain, bytes).await + } + Err(e) => { + tracing::error!("{}", e); + Ok(()) + } + } } - pub async fn publish_or_log, T: serde::Serialize>( + pub(crate) async fn publish_or_log, T: serde::Serialize>( &self, topic: Topic, qos: rumqttc::QoS, diff --git a/crates/channels/src/mqtt.rs b/crates/channels/src/mqtt.rs new file mode 100644 index 0000000..77208ea --- /dev/null +++ b/crates/channels/src/mqtt.rs @@ -0,0 +1,22 @@ +use std::time::Duration; + +use config::SharedAppConfig; +use rumqttc::EventLoop; + +use crate::AsyncClient; + +pub fn create_client(name: &str, config: SharedAppConfig) -> (AsyncClient, EventLoop) { + let mut mqtt_options = { + let l = config.lock(); + let bind = &l.account_manager().mqtt_bind; + let port = l.account_manager().mqtt_port; + tracing::info!("Starting account mqtt at {}:{}", bind, port); + + rumqttc::MqttOptions::new(name, bind, port) + }; + mqtt_options.set_keep_alive(Duration::from_secs(5)); + + let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10); + + (AsyncClient(client), event_loop) +} diff --git a/crates/channels/src/rpc.rs b/crates/channels/src/rpc.rs new file mode 100644 index 0000000..ac75ca6 --- /dev/null +++ b/crates/channels/src/rpc.rs @@ -0,0 +1,40 @@ +use std::net::{IpAddr, Ipv4Addr}; + +use futures::StreamExt; +use tarpc::server; +use tarpc::server::incoming::Incoming; +use tarpc::server::{Channel, Serve}; +use tarpc::tokio_serde::formats::Bincode; + +pub async fn start(name: &str, port: u16, build: Build) +where + Server: Serve + Send + 'static + Clone, + Build: Fn() -> Server, + >::Fut: Send, + >::Resp: serde::Serialize + Send + 'static, + Req: Send + 'static, + Req: for<'l> serde::Deserialize<'l>, +{ + let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port); + + let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Bincode::default) + .await + .unwrap(); + tracing::info!("Starting {} rpc at {}", name, listener.local_addr()); + listener.config_mut().max_frame_length(usize::MAX); + listener + // Ignore accept errors. + .filter_map(|r| futures::future::ready(r.ok())) + .map(server::BaseChannel::with_defaults) + // Limit channels to 8 per IP. + .max_channels_per_key(8, |t| t.transport().peer_addr().unwrap().ip()) + .max_concurrent_requests_per_channel(20) + // serve is generated by the service attribute. It takes as input any type implementing + // the generated World trait. + .map(|channel| channel.execute(build())) + // Max 10 channels. + .buffer_unordered(10) + .for_each(|_| async {}) + .await; + tracing::info!("RPC channel closed"); +} diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 55f9235..2e9db13 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -409,7 +409,7 @@ impl FilesConfig { #[derive(Debug, Serialize, Deserialize)] pub struct AccountManagerConfig { - pub port: u16, + pub rpc_port: u16, pub bind: String, pub mqtt_port: u16, pub mqtt_bind: String, @@ -419,7 +419,7 @@ pub struct AccountManagerConfig { impl Default for AccountManagerConfig { fn default() -> Self { Self { - port: 19329, + rpc_port: 19329, bind: "0.0.0.0".into(), mqtt_port: 1883, mqtt_bind: "0.0.0.0".into(), @@ -432,7 +432,7 @@ impl Example for AccountManagerConfig {} #[derive(Debug, Serialize, Deserialize)] pub struct CartManagerConfig { - pub port: u16, + pub rpc_port: u16, pub bind: String, pub mqtt_port: u16, pub mqtt_bind: String, @@ -442,7 +442,7 @@ pub struct CartManagerConfig { impl Default for CartManagerConfig { fn default() -> Self { Self { - port: 19330, + rpc_port: 19330, bind: "0.0.0.0".into(), mqtt_port: 1884, mqtt_bind: "0.0.0.0".into(), @@ -453,6 +453,28 @@ impl Default for CartManagerConfig { impl Example for CartManagerConfig {} +#[derive(Debug, Serialize, Deserialize)] +pub struct EmailSenderConfig { + pub rpc_port: u16, + pub bind: String, + pub mqtt_port: u16, + pub mqtt_bind: String, + pub database_url: String, +} + +impl Default for EmailSenderConfig { + fn default() -> Self { + Self { + rpc_port: 19331, + bind: "0.0.0.0".into(), + mqtt_port: 1885, + mqtt_bind: "0.0.0.0".into(), + database_url: "postgres://postgres@localhost/bazzar_emails".into(), + } + } +} +impl Example for EmailSenderConfig {} + #[derive(Serialize, Deserialize)] pub struct AppConfig { #[serde(default)] @@ -471,6 +493,8 @@ pub struct AppConfig { account_manager: AccountManagerConfig, #[serde(default)] cart_manager: CartManagerConfig, + #[serde(default)] + email_sender: EmailSenderConfig, #[serde(skip)] config_path: String, } @@ -485,7 +509,8 @@ impl Example for AppConfig { search: SearchConfig::example(), files: FilesConfig::example(), account_manager: AccountManagerConfig::example(), - cart_manager: Default::default(), + cart_manager: CartManagerConfig::example(), + email_sender: EmailSenderConfig::example(), config_path: "".to_string(), } } @@ -539,6 +564,10 @@ impl AppConfig { pub fn cart_manager(&self) -> &CartManagerConfig { &self.cart_manager } + + pub fn email_sender(&self) -> &EmailSenderConfig { + &self.email_sender + } } impl Default for AppConfig { @@ -552,6 +581,7 @@ impl Default for AppConfig { files: FilesConfig::default(), account_manager: AccountManagerConfig::default(), cart_manager: Default::default(), + email_sender: Default::default(), config_path: "".to_string(), } } diff --git a/crates/email_manager/Cargo.toml b/crates/email_manager/Cargo.toml index 100886e..0790697 100644 --- a/crates/email_manager/Cargo.toml +++ b/crates/email_manager/Cargo.toml @@ -3,18 +3,30 @@ name = "email_manager" version = "0.1.0" edition = "2021" +[[bin]] +name = "email-sender" +path = "./src/main.rs" + [dependencies] actix = { version = "0.13", features = [] } actix-rt = { version = "2.7", features = [] } +channels = { path = "../channels" } chrono = { version = "0.4", features = ["serde"] } config = { path = "../config" } model = { path = "../model" } +opentelemetry = { version = "0.17.0" } +opentelemetry-jaeger = { version = "0.17.0" } pretty_env_logger = { version = "0.4", features = [] } rumqttc = { version = "*" } sendgrid = { version = "0.17", features = ["async"] } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = [] } +tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } thiserror = { version = "1.0.31" } -tinytemplate = { version = "1.2.1" } -tracing = { version = "0.1.34" } +tokio = { version = "1.21.2", features = ['full'] } +tracing = { version = "0.1.37" } +tracing-opentelemetry = { version = "0.17.4" } +tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } uuid = { version = "0.8", features = ["serde"] } +dotenv = { version = "0.15.0" } +handlebars = { version = "*", features = [] } \ No newline at end of file diff --git a/crates/email_manager/assets/reset-password.html b/crates/email_manager/assets/reset-password.hbs similarity index 66% rename from crates/email_manager/assets/reset-password.html rename to crates/email_manager/assets/reset-password.hbs index c8cf040..8bd8579 100644 --- a/crates/email_manager/assets/reset-password.html +++ b/crates/email_manager/assets/reset-password.hbs @@ -5,7 +5,7 @@ Reset password @@ -16,17 +16,22 @@

Resetting your password is easy.

-

Just press the button below and follow the instructions. We’ll have you up and running in no time.

+

+ Just press the button below and follow the instructions. + We’ll have you up and running in no time. +

Reset Password

-

If you did not make this request then please ignore this email.

+

+ If you did not make this request then please ignore this email. +

diff --git a/crates/email_manager/assets/welcome.html b/crates/email_manager/assets/welcome.hbs similarity index 75% rename from crates/email_manager/assets/welcome.html rename to crates/email_manager/assets/welcome.hbs index 6e6cc4f..b15b3e1 100644 --- a/crates/email_manager/assets/welcome.html +++ b/crates/email_manager/assets/welcome.hbs @@ -2,21 +2,21 @@ - Welcome to { service_name } + Welcome to {{ service_name }}
-

Hi { login }

+

Hi {{ login }}

- Welcome to {service_name} – we’re excited to have you on board and we’d love to say thank you on behalf - of our whole company for chosing us. + Welcome to {service_name} – we’re excited to have you on board, and we’d love to say thank you on behalf + of our whole company for choosing us.

Take care,

{signature}

diff --git a/crates/email_manager/src/actions.rs b/crates/email_manager/src/actions.rs new file mode 100644 index 0000000..d0ea483 --- /dev/null +++ b/crates/email_manager/src/actions.rs @@ -0,0 +1,101 @@ +use channels::emails::{reset_password, test_mail, welcome}; + +use super::{Error, Result}; +use crate::SharedContext; + +static STYLE: &str = include_str!("../assets/style.css"); + +pub async fn test_mail(msg: test_mail::Input, ctx: SharedContext) -> Result<()> { + welcome( + welcome::Input { + login: model::Login::new("Test User"), + email: msg.receiver, + }, + ctx, + ) + .await +} + +#[derive(Debug, serde::Serialize)] +struct ResetPasswordContext { + url: String, + style: &'static str, +} + +pub async fn reset_password(msg: reset_password::Input, ctx: SharedContext) -> Result<()> { + let host = { ctx.config.lock().web().host() }; + let context = ResetPasswordContext { + url: format!( + "{host}/reset-password/{reset_token}", + reset_token = msg.reset_token + ), + style: STYLE, + }; + let html = ctx + .template + .render("reset-password", &context) + .map_err(|e| { + tracing::error!("{e:?}"); + Error::ResetPassTemplate + })?; + + let smtp_from = ctx.config.lock().mail().smtp_from(); + let status = ctx + .send_grid + .send( + sendgrid::Mail::new() + .add_to((msg.email.as_str(), msg.login.as_str()).into()) + .add_from(&smtp_from) + .add_subject("Reset Password") + .add_html(html.as_str()) + .build(), + ) + .await; + + tracing::debug!("{:?}", status); + + Ok(()) +} + +#[derive(Debug, serde::Serialize)] +struct WelcomeContext { + login: model::Login, + service_name: String, + signature: String, + style: &'static str, +} + +pub async fn welcome(msg: welcome::Input, ctx: SharedContext) -> Result<()> { + let (signature, service_name) = { + let l = ctx.config.lock(); + let w = l.web(); + (w.signature(), w.service_name()) + }; + let context = WelcomeContext { + login: msg.login.clone(), + service_name, + signature, + style: STYLE, + }; + let html = ctx.template.render("welcome", &context).map_err(|e| { + tracing::error!("{e:?}"); + Error::ResetPassTemplate + })?; + + let smtp_from = ctx.config.lock().mail().smtp_from().clone(); + let status = ctx + .send_grid + .send( + sendgrid::Mail::new() + .add_to((msg.email.as_str(), msg.login.as_str()).into()) + .add_from(&smtp_from) + .add_subject("Welcome") + .add_html(html.as_str()) + .build(), + ) + .await; + + tracing::debug!("{:?}", status); + + Ok(()) +} diff --git a/crates/email_manager/src/lib.rs b/crates/email_manager/src/lib.rs deleted file mode 100644 index 704dd8a..0000000 --- a/crates/email_manager/src/lib.rs +++ /dev/null @@ -1,189 +0,0 @@ -use std::sync::Arc; - -use config::SharedAppConfig; -use serde::Serialize; - -#[macro_export] -macro_rules! mail_async_handler { - ($msg: ty, $async: ident, $res: ty) => { - impl actix::Handler<$msg> for EmailManager { - type Result = actix::ResponseActFuture>; - - fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result { - use actix::WrapFuture; - let inner = self.0.clone(); - Box::pin(async { $async(msg, inner).await }.into_actor(self)) - } - } - }; -} - -static STYLE: &str = include_str!("../assets/style.css"); - -#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize, thiserror::Error)] -#[serde(rename_all = "kebab-case", tag = "email")] -pub enum Error { - #[error("Failed to render reset password template")] - ResetPassTemplate, -} - -pub type Result = std::result::Result; - -pub struct SendState { - pub success: bool, -} - -pub struct EmailManager(Arc); - -pub(crate) struct Inner { - config: SharedAppConfig, - send_grid: sendgrid::SGClient, - template: Arc>, -} - -impl actix::Actor for EmailManager { - type Context = actix::Context; -} - -impl EmailManager { - pub fn build(config: SharedAppConfig) -> Result { - let template = { - use tinytemplate::*; - let mut t = TinyTemplate::new(); - t.add_template( - "reset-password", - include_str!("../assets/reset-password.html"), - ) - .expect("Failed to load e-mail template reset-password"); - t.add_template("welcome", include_str!("../assets/welcome.html")) - .expect("Failed to load e-mail template welcome"); - t - }; - - Ok(Self(Arc::new(Inner { - config: config.clone(), - send_grid: sendgrid::SGClient::new(config.lock().mail().sendgrid_secret()), - template: Arc::new(template), - }))) - } -} - -#[derive(actix::Message)] -#[rtype(result = "Result<()>")] -pub struct TestMail { - pub receiver: model::Email, -} - -mail_async_handler!(TestMail, test_mail, ()); - -pub(crate) async fn test_mail(msg: TestMail, inner: Arc) -> Result<()> { - welcome( - Welcome { - login: model::Login::new("Test User"), - email: msg.receiver, - }, - inner, - ) - .await -} - -#[derive(actix::Message, Debug)] -#[rtype(result = "Result<()>")] -pub struct ResetPassword { - pub login: model::Login, - pub email: model::Email, - pub reset_token: model::ResetToken, -} - -#[derive(Serialize)] -struct ResetPasswordContext { - url: String, - style: &'static str, -} - -mail_async_handler!(ResetPassword, reset_password, ()); - -pub(crate) async fn reset_password(msg: ResetPassword, inner: Arc) -> Result<()> { - let host = { inner.config.lock().web().host() }; - let context = ResetPasswordContext { - url: format!( - "{host}/reset-password/{reset_token}", - reset_token = msg.reset_token - ), - style: STYLE, - }; - let html = inner - .template - .render("reset-password", &context) - .map_err(|e| { - tracing::error!("{e:?}"); - Error::ResetPassTemplate - })?; - - let status = inner - .send_grid - .send( - sendgrid::Mail::new() - .add_to((msg.email.as_str(), msg.login.as_str()).into()) - .add_from(&inner.config.lock().mail().smtp_from()) - .add_subject("Reset Password") - .add_html(html.as_str()) - .build(), - ) - .await; - - tracing::debug!("{:?}", status); - - Ok(()) -} - -#[derive(actix::Message)] -#[rtype(result = "Result<()>")] -pub struct Welcome { - pub login: model::Login, - pub email: model::Email, -} - -#[derive(Serialize)] -struct WelcomeContext { - login: model::Login, - service_name: String, - signature: String, - style: &'static str, -} - -mail_async_handler!(Welcome, welcome, ()); - -pub(crate) async fn welcome(msg: Welcome, inner: Arc) -> Result<()> { - let (signature, service_name) = { - let l = inner.config.lock(); - let w = l.web(); - (w.signature(), w.service_name()) - }; - let context = WelcomeContext { - login: msg.login.clone(), - service_name, - signature, - style: STYLE, - }; - let html = inner.template.render("welcome", &context).map_err(|e| { - tracing::error!("{e:?}"); - Error::ResetPassTemplate - })?; - - let status = inner - .send_grid - .send( - sendgrid::Mail::new() - .add_to((msg.email.as_str(), msg.login.as_str()).into()) - .add_from(&inner.config.lock().mail().smtp_from()) - .add_subject("Welcome") - .add_html(html.as_str()) - .build(), - ) - .await; - - tracing::debug!("{:?}", status); - - Ok(()) -} diff --git a/crates/email_manager/src/main.rs b/crates/email_manager/src/main.rs new file mode 100644 index 0000000..c3aba74 --- /dev/null +++ b/crates/email_manager/src/main.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; + +use config::{SharedAppConfig, UpdateConfig}; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +// use crate::db::Database; + +pub mod actions; +// pub mod db; +pub mod mqtt; +// pub mod rpc; + +#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize, thiserror::Error)] +pub enum Error { + #[error("Failed to render reset password template")] + ResetPassTemplate, +} + +pub type Result = std::result::Result; + +pub struct Opts {} + +impl UpdateConfig for Opts {} + +pub struct Context { + config: SharedAppConfig, + send_grid: sendgrid::SGClient, + template: handlebars::Handlebars<'static>, + // template: tinytemplate::TinyTemplate<'static>, +} + +impl Context { + pub fn build(config: SharedAppConfig) -> SharedContext { + let template = { + let mut t = handlebars::Handlebars::new(); + t.register_template_string/*add_template*/( + "reset-password", + include_str!("../assets/reset-password.hbs"), + ) + .expect("Failed to load e-mail template reset-password"); + t.register_template_string("welcome", include_str!("../assets/welcome.hbs")) + .expect("Failed to load e-mail template welcome"); + t + }; + + Arc::new(Self { + config: config.clone(), + send_grid: sendgrid::SGClient::new(config.lock().mail().sendgrid_secret()), + template, + }) + } +} + +pub type SharedContext = Arc; + +#[actix::main] +async fn main() { + dotenv::dotenv().ok(); + init_tracing("email-sender"); + + let opts = Opts {}; + + let config = config::default_load(&opts); + + let context = Context::build(config.clone()); + + let _mqtt_client = mqtt::start(config.clone(), context.clone()).await; + // rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await; + loop { + std::thread::park(); + } +} + +pub fn init_tracing(_service_name: &str) { + std::env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12"); + + let tracer = { + use opentelemetry::sdk::export::trace::stdout::new_pipeline; + use opentelemetry::sdk::trace::Config; + new_pipeline() + .with_trace_config(Config::default()) + .with_pretty_print(true) + .install_simple() + }; + + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::from_default_env()) + .with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)) + .with(tracing_opentelemetry::layer().with_tracer(tracer)) + .try_init() + .unwrap(); +} diff --git a/crates/email_manager/src/mqtt.rs b/crates/email_manager/src/mqtt.rs new file mode 100644 index 0000000..1cfa86d --- /dev/null +++ b/crates/email_manager/src/mqtt.rs @@ -0,0 +1,96 @@ +use channels::emails::{reset_password, test_mail}; +use channels::{accounts, emails, DeserializePayload}; +use config::SharedAppConfig; +use rumqttc::{Event, Incoming, Publish, QoS}; + +use crate::{actions, SharedContext}; + +pub async fn start(config: SharedAppConfig, ctx: SharedContext) -> channels::AsyncClient { + use channels::accounts::Topic as AccountTopic; + use channels::emails::Topic as EmailTopic; + + let (client, mut event_loop) = + channels::mqtt::create_client(emails::CLIENT_NAME, config.clone()); + + client + .0 + .subscribe(AccountTopic::AccountCreated, QoS::AtLeastOnce) + .await + .unwrap(); + client + .0 + .subscribe(EmailTopic::ResetPassword, QoS::AtLeastOnce) + .await + .unwrap(); + client + .0 + .subscribe(EmailTopic::Test, QoS::AtLeastOnce) + .await + .unwrap(); + + let spawn_client = client.clone(); + let ctx = ctx.clone(); + + tokio::spawn(async move { + let _client = spawn_client.clone(); + let ctx = ctx.clone(); + loop { + let notification = event_loop.poll().await; + + match notification { + Ok(Event::Incoming(Incoming::Publish(publish))) => { + tracing::info!("Received publish {:?}", publish.topic); + match publish.topic.as_str() { + t if AccountTopic::AccountCreated == t => { + on_created(publish, ctx.clone()).await + } + t if EmailTopic::ResetPassword == t => on_reset(publish, ctx.clone()).await, + t if EmailTopic::Test == t => on_test(publish, ctx.clone()).await, + _ => {} + } + } + _ => {} + } + } + }); + client +} + +async fn on_created(publish: Publish, ctx: SharedContext) { + if let Some(account) = + accounts::Topic::AccountCreated.deserialize_payload::(publish.payload) + { + if let Err(e) = actions::welcome( + emails::welcome::Input { + login: account.login, + email: account.email, + }, + ctx, + ) + .await + { + tracing::error!("{}", e); + } + } +} + +async fn on_reset(publish: Publish, ctx: SharedContext) { + if let Some(msg) = + emails::Topic::ResetPassword.deserialize_payload::(publish.payload) + { + if let Err(e) = actions::reset_password(msg, ctx).await { + tracing::error!("{}", e); + } + } +} + +async fn on_test(publish: Publish, ctx: SharedContext) { + if let Some(msg) = emails::Topic::Test.deserialize_payload::(publish.payload) { + let msg = test_mail::Input { + receiver: msg.email, + }; + if let Err(e) = actions::test_mail(msg, ctx).await { + tracing::error!("{}", e); + } + } +}