Rewrite email sender

This commit is contained in:
Adrian Woźniak 2022-11-05 01:08:45 +01:00
parent 7d507602c3
commit eb69682c67
No known key found for this signature in database
GPG Key ID: 0012845A89C7352B
23 changed files with 641 additions and 356 deletions

38
Cargo.lock generated
View File

@ -672,14 +672,12 @@ dependencies = [
"actix-web-opentelemetry", "actix-web-opentelemetry",
"async-trait", "async-trait",
"bytes", "bytes",
"cart_manager",
"channels", "channels",
"chrono", "chrono",
"config", "config",
"database_manager", "database_manager",
"derive_more", "derive_more",
"dotenv", "dotenv",
"email_manager",
"fs_manager", "fs_manager",
"futures 0.3.25", "futures 0.3.25",
"futures-util", "futures-util",
@ -884,11 +882,13 @@ dependencies = [
"bincode", "bincode",
"bytes", "bytes",
"config", "config",
"futures 0.3.25",
"model", "model",
"rumqttc", "rumqttc",
"serde", "serde",
"tarpc", "tarpc",
"thiserror", "thiserror",
"tokio",
"tracing", "tracing",
] ]
@ -1370,17 +1370,25 @@ version = "0.1.0"
dependencies = [ dependencies = [
"actix 0.13.0", "actix 0.13.0",
"actix-rt", "actix-rt",
"channels",
"chrono", "chrono",
"config", "config",
"dotenv",
"handlebars",
"model", "model",
"opentelemetry 0.17.0",
"opentelemetry-jaeger",
"pretty_env_logger", "pretty_env_logger",
"rumqttc", "rumqttc",
"sendgrid", "sendgrid",
"serde", "serde",
"serde_json", "serde_json",
"tarpc",
"thiserror", "thiserror",
"tinytemplate", "tokio",
"tracing", "tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"uuid 0.8.2", "uuid 0.8.2",
] ]
@ -1912,6 +1920,20 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "handlebars"
version = "4.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "433e4ab33f1213cdc25b5fa45c76881240cfe79284cf2b395e8b9e312a30a2fd"
dependencies = [
"log",
"pest",
"pest_derive",
"serde",
"serde_json",
"thiserror",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.12.3" version = "0.12.3"
@ -4280,16 +4302,6 @@ dependencies = [
"displaydoc", "displaydoc",
] ]
[[package]]
name = "tinytemplate"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc"
dependencies = [
"serde",
"serde_json",
]
[[package]] [[package]]
name = "tinyvec" name = "tinyvec"
version = "1.6.0" version = "1.6.0"

View File

@ -23,5 +23,13 @@ next_connection_delay_ms = 1
max_inflight_count = 200 max_inflight_count = 200
max_inflight_size = 1024 max_inflight_size = 1024
[replicator]
connection_timeout_ms = 100
max_client_id_len = 256
throttle_delay_ms = 0
max_payload_size = 2048
max_inflight_count = 500
max_inflight_size = 1024
[console] [console]
listen = "0.0.0.0:3030" listen = "0.0.0.0:3030"

View File

@ -1,29 +1,12 @@
use std::time::Duration;
use channels::accounts::Topic;
use config::SharedAppConfig; use config::SharedAppConfig;
use rumqttc::{Event, Incoming, QoS}; use rumqttc::{Event, Incoming};
use crate::db::Database; use crate::db::Database;
pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient { pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient {
let mut mqtt_options = { let (client, mut event_loop) =
let l = config.lock(); channels::mqtt::create_client(channels::accounts::CLIENT_NAME, config);
let bind = &l.account_manager().mqtt_bind;
let port = l.account_manager().mqtt_port;
tracing::info!("Starting account mqtt at {}:{}", bind, port);
rumqttc::MqttOptions::new(channels::accounts::CLIENT_NAME, bind, port)
};
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (client, mut event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10);
client
.subscribe(Topic::CreateAccount, QoS::AtLeastOnce)
.await
.unwrap();
let client = channels::AsyncClient(client);
let spawn_client = client.clone(); let spawn_client = client.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _client = spawn_client.clone(); let _client = spawn_client.clone();

View File

@ -1,16 +1,8 @@
use std::net::{IpAddr, Ipv4Addr};
use channels::accounts::rpc::Accounts; use channels::accounts::rpc::Accounts;
use channels::accounts::{me, register}; use channels::accounts::{me, register};
use channels::AsyncClient; use channels::AsyncClient;
use config::SharedAppConfig; use config::SharedAppConfig;
use futures::future::{self};
use futures::stream::StreamExt;
use rumqttc::QoS;
use tarpc::context; use tarpc::context;
use tarpc::server::incoming::Incoming;
use tarpc::server::{self, Channel};
use tarpc::tokio_serde::formats::Bincode;
use crate::actions; use crate::actions;
use crate::db::Database; use crate::db::Database;
@ -50,15 +42,13 @@ impl Accounts for AccountsServer {
_: context::Context, _: context::Context,
input: register::Input, input: register::Input,
) -> register::Output { ) -> register::Output {
use channels::accounts::{Error, Topic}; use channels::accounts::Error;
let res = actions::create_account(input, &self.db, self.config).await; let res = actions::create_account(input, &self.db, self.config).await;
tracing::info!("REGISTER result: {:?}", res); tracing::info!("REGISTER result: {:?}", res);
match res { match res {
Ok(account) => { Ok(account) => {
self.mqtt_client self.mqtt_client.emit_account_created(&account).await;
.publish_or_log(Topic::AccountCreated, QoS::AtLeastOnce, true, &account)
.await;
register::Output { register::Output {
account: Some(account), account: Some(account),
error: None, error: None,
@ -73,37 +63,15 @@ impl Accounts for AccountsServer {
} }
pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) { pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) {
let port = { config.lock().account_manager().port }; let port = { config.lock().account_manager().rpc_port };
let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port); channels::rpc::start("accounts", port, || {
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Bincode::default)
.await
.unwrap();
tracing::info!("Starting account rpc at {}", listener.local_addr());
listener.config_mut().max_frame_length(usize::MAX);
listener
// Ignore accept errors.
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
// Limit channels to 8 per IP.
.max_channels_per_key(8, |t| t.transport().peer_addr().unwrap().ip())
.max_concurrent_requests_per_channel(20)
// serve is generated by the service attribute. It takes as input any type implementing
// the generated World trait.
.map(|channel| {
channel.execute(
AccountsServer { AccountsServer {
db: db.clone(), db: db.clone(),
config: config.clone(), config: config.clone(),
mqtt_client: mqtt_client.clone(), mqtt_client: mqtt_client.clone(),
} }
.serve(), .serve()
)
}) })
// Max 10 channels.
.buffer_unordered(10)
.for_each(|_| async {})
.await; .await;
tracing::info!("RPC channel closed");
} }

View File

@ -18,14 +18,12 @@ actix-web-httpauth = { version = "0.6", features = [] }
actix-web-opentelemetry = { version = "0.12", features = [] } actix-web-opentelemetry = { version = "0.12", features = [] }
async-trait = { version = "0.1", features = [] } async-trait = { version = "0.1", features = [] }
bytes = { version = "1.1.0" } bytes = { version = "1.1.0" }
cart_manager = { path = "../cart_manager" }
channels = { path = "../channels" } channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
config = { path = "../config" } config = { path = "../config" }
database_manager = { path = "../database_manager" } database_manager = { path = "../database_manager" }
derive_more = { version = "0.99", features = [] } derive_more = { version = "0.99", features = [] }
dotenv = { version = "0.15", features = [] } dotenv = { version = "0.15", features = [] }
email_manager = { path = "../email_manager" }
fs_manager = { path = "../fs_manager" } fs_manager = { path = "../fs_manager" }
futures = { version = "0.3", features = [] } futures = { version = "0.3", features = [] }
futures-util = { version = "0.3", features = [] } futures-util = { version = "0.3", features = [] }

View File

@ -10,13 +10,13 @@ use actix_web::middleware::Logger;
use actix_web::web::Data; use actix_web::web::Data;
use actix_web::{App, HttpServer}; use actix_web::{App, HttpServer};
use config::UpdateConfig; use config::UpdateConfig;
use email_manager::TestMail;
use jemallocator::Jemalloc; use jemallocator::Jemalloc;
use model::{Email, Encrypt, Login, PassHash, Password, Role}; use model::{AccountState, Email, Encrypt, Login, PassHash, Password, Role};
use opts::{ use opts::{
Command, CreateAccountCmd, CreateAccountOpts, GenerateHashOpts, Opts, ServerOpts, Command, CreateAccountCmd, CreateAccountOpts, GenerateHashOpts, Opts, ServerOpts,
TestMailerOpts, TestMailerOpts,
}; };
use rumqttc::Outgoing;
use validator::{validate_email, validate_length}; use validator::{validate_email, validate_length};
use crate::opts::ReIndexOpts; use crate::opts::ReIndexOpts;
@ -169,16 +169,28 @@ async fn test_mailer(opts: TestMailerOpts) -> Result<()> {
let config = config::default_load(&opts); let config = config::default_load(&opts);
opts.update_config(&mut *config.lock()); opts.update_config(&mut *config.lock());
let manager = email_manager::EmailManager::build(config) let (client, mut event_loop) = channels::mqtt::create_client("bazzar", config);
.expect("Invalid email manager config") client
.start(); .emit_test(&model::Account {
manager id: 0.into(),
.send(TestMail { email: opts.receiver.unwrap(),
receiver: opts.receiver.expect("e-mail address is required"), login: Login::new("test email"),
role: Role::Admin,
customer_id: Default::default(),
state: AccountState::Active,
}) })
.await .await;
.expect("Failed to execute actor")
.expect("Failed to send email"); loop {
let msg = event_loop.poll().await.unwrap();
tracing::info!("{:?}", msg);
if let rumqttc::Event::Outgoing(Outgoing::PubAck(_)) = msg {
client.0.disconnect().await.unwrap();
break;
}
}
println!("Success!"); println!("Success!");
Ok(()) Ok(())
} }

View File

@ -41,7 +41,6 @@ pub enum Error {
Public(public::Error), Public(public::Error),
Admin(admin::Error), Admin(admin::Error),
Database(database_manager::Error), Database(database_manager::Error),
Email(email_manager::Error),
Fs(fs_manager::Error), Fs(fs_manager::Error),
Order(order_manager::Error), Order(order_manager::Error),
Pay(payment_manager::Error), Pay(payment_manager::Error),
@ -78,7 +77,6 @@ impl Display for Error {
.unwrap_or_default(), .unwrap_or_default(),
Error::CriticalFailure => String::from("Something went wrong"), Error::CriticalFailure => String::from("Something went wrong"),
Error::Database(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Database(_e) => serde_json::to_string(&self).unwrap_or_default(),
Error::Email(_e) => serde_json::to_string(&self).unwrap_or_default(),
Error::Fs(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Fs(_e) => serde_json::to_string(&self).unwrap_or_default(),
Error::Order(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Order(_e) => serde_json::to_string(&self).unwrap_or_default(),
Error::Pay(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Pay(_e) => serde_json::to_string(&self).unwrap_or_default(),
@ -99,7 +97,6 @@ impl ResponseError for Error {
Error::Admin(_) => StatusCode::BAD_REQUEST, Error::Admin(_) => StatusCode::BAD_REQUEST,
Error::Public(_) => StatusCode::BAD_REQUEST, Error::Public(_) => StatusCode::BAD_REQUEST,
Error::Database(_) => StatusCode::BAD_REQUEST, Error::Database(_) => StatusCode::BAD_REQUEST,
Error::Email(_) => StatusCode::BAD_REQUEST,
Error::Fs(_) => StatusCode::BAD_REQUEST, Error::Fs(_) => StatusCode::BAD_REQUEST,
Error::Order(_) => StatusCode::BAD_REQUEST, Error::Order(_) => StatusCode::BAD_REQUEST,
Error::Pay(_) => StatusCode::BAD_REQUEST, Error::Pay(_) => StatusCode::BAD_REQUEST,

View File

@ -1,14 +1,8 @@
use std::net::{IpAddr, Ipv4Addr};
use channels::carts::modify_item::{Input, Output}; use channels::carts::modify_item::{Input, Output};
use channels::carts::rpc::Carts; use channels::carts::rpc::Carts;
use channels::AsyncClient; use channels::AsyncClient;
use config::SharedAppConfig; use config::SharedAppConfig;
use futures::{future, StreamExt}; use tarpc::context;
use tarpc::server::incoming::Incoming;
use tarpc::server::Channel;
use tarpc::tokio_serde::formats::Bincode;
use tarpc::{context, server};
use crate::db::Database; use crate::db::Database;
@ -51,37 +45,15 @@ impl Carts for CartsServer {
} }
pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) { pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) {
let port = { config.lock().cart_manager().port }; let port = { config.lock().cart_manager().rpc_port };
let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port); channels::rpc::start("carts", port, || {
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Bincode::default)
.await
.unwrap();
tracing::info!("Starting account rpc at {}", listener.local_addr());
listener.config_mut().max_frame_length(usize::MAX);
listener
// Ignore accept errors.
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
// Limit channels to 8 per IP.
.max_channels_per_key(8, |t| t.transport().peer_addr().unwrap().ip())
.max_concurrent_requests_per_channel(20)
// serve is generated by the service attribute. It takes as input any type implementing
// the generated World trait.
.map(|channel| {
channel.execute(
CartsServer { CartsServer {
db: db.clone(), db: db.clone(),
_config: config.clone(), _config: config.clone(),
_mqtt_client: mqtt_client.clone(), _mqtt_client: mqtt_client.clone(),
} }
.serve(), .serve()
)
}) })
// Max 10 channels.
.buffer_unordered(10)
.for_each(|_| async {})
.await; .await;
tracing::info!("RPC channel closed");
} }

View File

@ -7,9 +7,11 @@ edition = "2021"
bincode = { version = "*" } bincode = { version = "*" }
bytes = { version = "1.2.1" } bytes = { version = "1.2.1" }
config = { path = "../config" } config = { path = "../config" }
futures = { version = "0.3.25" }
model = { path = "../model" } model = { path = "../model" }
rumqttc = { version = "0.17.0" } rumqttc = { version = "0.17.0" }
serde = { version = "*", features = ['derive'] } serde = { version = "*", features = ['derive'] }
tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.37" } thiserror = { version = "1.0.37" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.37" } tracing = { version = "0.1.37" }

View File

@ -1,3 +1,9 @@
use bytes::Bytes;
use rumqttc::QoS;
use serde::de::DeserializeOwned;
use crate::{AsyncClient, DeserializePayload};
#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)] #[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)]
pub enum Error { pub enum Error {
#[error("mqtt payload has invalid create account data")] #[error("mqtt payload has invalid create account data")]
@ -14,9 +20,23 @@ pub static CLIENT_NAME: &str = "account-manager";
#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)] #[derive(Copy, Clone, Debug, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Topic { pub enum Topic {
CreateAccount,
AccountCreated, AccountCreated,
SignUpFailure, }
impl DeserializePayload for Topic {
fn deserialize_payload<T: DeserializeOwned>(self, bytes: Bytes) -> Option<T> {
match self {
Topic::AccountCreated => bincode::deserialize(bytes.as_ref()).ok(),
}
}
}
impl Topic {
pub fn to_str(self) -> &'static str {
match self {
Topic::AccountCreated => "account/created",
}
}
} }
impl Into<String> for Topic { impl Into<String> for Topic {
@ -37,16 +57,6 @@ impl PartialEq<String> for Topic {
} }
} }
impl Topic {
pub fn to_str(self) -> &'static str {
match self {
Topic::CreateAccount => "account/create",
Topic::AccountCreated => "account/created",
Topic::SignUpFailure => "account/failure",
}
}
}
pub mod register { pub mod register {
use model::{Email, Login, Password, Role}; use model::{Email, Login, Password, Role};
@ -92,6 +102,13 @@ pub mod me {
} }
} }
impl AsyncClient {
pub async fn emit_account_created(&self, account: &model::FullAccount) {
self.publish_or_log(Topic::AccountCreated, QoS::AtLeastOnce, true, account)
.await
}
}
pub mod rpc { pub mod rpc {
use config::SharedAppConfig; use config::SharedAppConfig;
@ -112,7 +129,10 @@ pub mod rpc {
let addr = { let addr = {
let l = config.lock(); let l = config.lock();
(l.account_manager().bind.clone(), l.account_manager().port) (
l.account_manager().bind.clone(),
l.account_manager().rpc_port,
)
}; };
let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default); let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default);

View File

@ -188,7 +188,10 @@ pub mod rpc {
let addr = { let addr = {
let l = config.lock(); let l = config.lock();
(l.account_manager().bind.clone(), l.account_manager().port) (
l.account_manager().bind.clone(),
l.account_manager().rpc_port,
)
}; };
let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default); let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default);

View File

@ -0,0 +1,85 @@
use bytes::Bytes;
use rumqttc::QoS;
use serde::de::DeserializeOwned;
use crate::{AsyncClient, DeserializePayload};
impl AsyncClient {
pub async fn emit_reset_password(&self, account: &reset_password::Input) {
self.publish_or_log(Topic::ResetPassword, QoS::AtLeastOnce, true, account)
.await;
}
pub async fn emit_test(&self, account: &model::Account) {
self.publish_or_log(Topic::Test, QoS::AtLeastOnce, false, account)
.await;
}
}
#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)]
pub enum Error {}
pub static CLIENT_NAME: &str = "email-sender";
#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Topic {
ResetPassword,
Test,
}
impl Topic {
pub fn to_str(self) -> &'static str {
match self {
Topic::ResetPassword => "emails/reset-password",
Topic::Test => "emails/test",
}
}
}
impl DeserializePayload for Topic {
fn deserialize_payload<T: DeserializeOwned>(self, bytes: Bytes) -> Option<T> {
bincode::deserialize(bytes.as_ref()).ok()
}
}
impl Into<String> for Topic {
fn into(self) -> String {
String::from(self.to_str())
}
}
impl<'s> PartialEq<&'s str> for Topic {
fn eq(&self, other: &&'s str) -> bool {
self.to_str() == *other
}
}
impl PartialEq<String> for Topic {
fn eq(&self, other: &String) -> bool {
self.to_str() == other.as_str()
}
}
pub mod test_mail {
#[derive(Debug)]
pub struct Input {
pub receiver: model::Email,
}
}
pub mod reset_password {
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub login: model::Login,
pub email: model::Email,
pub reset_token: model::ResetToken,
}
}
pub mod welcome {
#[derive(Debug)]
pub struct Input {
pub login: model::Login,
pub email: model::Email,
}
}

View File

@ -2,24 +2,38 @@
pub mod accounts; pub mod accounts;
pub mod carts; pub mod carts;
pub mod emails;
pub mod mqtt;
pub mod rpc;
pub trait DeserializePayload {
fn deserialize_payload<T: serde::de::DeserializeOwned>(self, bytes: bytes::Bytes) -> Option<T>;
}
#[derive(Clone)] #[derive(Clone)]
pub struct AsyncClient(pub rumqttc::AsyncClient); pub struct AsyncClient(pub rumqttc::AsyncClient);
impl AsyncClient { impl AsyncClient {
pub async fn publish<Topic: Into<String>, T: serde::Serialize>( pub(crate) async fn publish<Topic: Into<String>, T: serde::Serialize>(
&self, &self,
topic: Topic, topic: Topic,
qos: rumqttc::QoS, qos: rumqttc::QoS,
retain: bool, retain: bool,
t: T, t: T,
) -> Result<(), rumqttc::ClientError> { ) -> Result<(), rumqttc::ClientError> {
let v = bincode::serialize(&t).unwrap_or_default(); match bincode::serialize(&t) {
let bytes = bytes::Bytes::copy_from_slice(&v); Ok(v) => {
let bytes = bytes::Bytes::from(v);
self.0.publish_bytes(topic, qos, retain, bytes).await self.0.publish_bytes(topic, qos, retain, bytes).await
} }
Err(e) => {
tracing::error!("{}", e);
Ok(())
}
}
}
pub async fn publish_or_log<Topic: Into<String>, T: serde::Serialize>( pub(crate) async fn publish_or_log<Topic: Into<String>, T: serde::Serialize>(
&self, &self,
topic: Topic, topic: Topic,
qos: rumqttc::QoS, qos: rumqttc::QoS,

View File

@ -0,0 +1,22 @@
use std::time::Duration;
use config::SharedAppConfig;
use rumqttc::EventLoop;
use crate::AsyncClient;
pub fn create_client(name: &str, config: SharedAppConfig) -> (AsyncClient, EventLoop) {
let mut mqtt_options = {
let l = config.lock();
let bind = &l.account_manager().mqtt_bind;
let port = l.account_manager().mqtt_port;
tracing::info!("Starting account mqtt at {}:{}", bind, port);
rumqttc::MqttOptions::new(name, bind, port)
};
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10);
(AsyncClient(client), event_loop)
}

View File

@ -0,0 +1,40 @@
use std::net::{IpAddr, Ipv4Addr};
use futures::StreamExt;
use tarpc::server;
use tarpc::server::incoming::Incoming;
use tarpc::server::{Channel, Serve};
use tarpc::tokio_serde::formats::Bincode;
pub async fn start<Server, Req, Build>(name: &str, port: u16, build: Build)
where
Server: Serve<Req> + Send + 'static + Clone,
Build: Fn() -> Server,
<Server as Serve<Req>>::Fut: Send,
<Server as Serve<Req>>::Resp: serde::Serialize + Send + 'static,
Req: Send + 'static,
Req: for<'l> serde::Deserialize<'l>,
{
let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Bincode::default)
.await
.unwrap();
tracing::info!("Starting {} rpc at {}", name, listener.local_addr());
listener.config_mut().max_frame_length(usize::MAX);
listener
// Ignore accept errors.
.filter_map(|r| futures::future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
// Limit channels to 8 per IP.
.max_channels_per_key(8, |t| t.transport().peer_addr().unwrap().ip())
.max_concurrent_requests_per_channel(20)
// serve is generated by the service attribute. It takes as input any type implementing
// the generated World trait.
.map(|channel| channel.execute(build()))
// Max 10 channels.
.buffer_unordered(10)
.for_each(|_| async {})
.await;
tracing::info!("RPC channel closed");
}

View File

@ -409,7 +409,7 @@ impl FilesConfig {
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct AccountManagerConfig { pub struct AccountManagerConfig {
pub port: u16, pub rpc_port: u16,
pub bind: String, pub bind: String,
pub mqtt_port: u16, pub mqtt_port: u16,
pub mqtt_bind: String, pub mqtt_bind: String,
@ -419,7 +419,7 @@ pub struct AccountManagerConfig {
impl Default for AccountManagerConfig { impl Default for AccountManagerConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
port: 19329, rpc_port: 19329,
bind: "0.0.0.0".into(), bind: "0.0.0.0".into(),
mqtt_port: 1883, mqtt_port: 1883,
mqtt_bind: "0.0.0.0".into(), mqtt_bind: "0.0.0.0".into(),
@ -432,7 +432,7 @@ impl Example for AccountManagerConfig {}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct CartManagerConfig { pub struct CartManagerConfig {
pub port: u16, pub rpc_port: u16,
pub bind: String, pub bind: String,
pub mqtt_port: u16, pub mqtt_port: u16,
pub mqtt_bind: String, pub mqtt_bind: String,
@ -442,7 +442,7 @@ pub struct CartManagerConfig {
impl Default for CartManagerConfig { impl Default for CartManagerConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
port: 19330, rpc_port: 19330,
bind: "0.0.0.0".into(), bind: "0.0.0.0".into(),
mqtt_port: 1884, mqtt_port: 1884,
mqtt_bind: "0.0.0.0".into(), mqtt_bind: "0.0.0.0".into(),
@ -453,6 +453,28 @@ impl Default for CartManagerConfig {
impl Example for CartManagerConfig {} impl Example for CartManagerConfig {}
#[derive(Debug, Serialize, Deserialize)]
pub struct EmailSenderConfig {
pub rpc_port: u16,
pub bind: String,
pub mqtt_port: u16,
pub mqtt_bind: String,
pub database_url: String,
}
impl Default for EmailSenderConfig {
fn default() -> Self {
Self {
rpc_port: 19331,
bind: "0.0.0.0".into(),
mqtt_port: 1885,
mqtt_bind: "0.0.0.0".into(),
database_url: "postgres://postgres@localhost/bazzar_emails".into(),
}
}
}
impl Example for EmailSenderConfig {}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct AppConfig { pub struct AppConfig {
#[serde(default)] #[serde(default)]
@ -471,6 +493,8 @@ pub struct AppConfig {
account_manager: AccountManagerConfig, account_manager: AccountManagerConfig,
#[serde(default)] #[serde(default)]
cart_manager: CartManagerConfig, cart_manager: CartManagerConfig,
#[serde(default)]
email_sender: EmailSenderConfig,
#[serde(skip)] #[serde(skip)]
config_path: String, config_path: String,
} }
@ -485,7 +509,8 @@ impl Example for AppConfig {
search: SearchConfig::example(), search: SearchConfig::example(),
files: FilesConfig::example(), files: FilesConfig::example(),
account_manager: AccountManagerConfig::example(), account_manager: AccountManagerConfig::example(),
cart_manager: Default::default(), cart_manager: CartManagerConfig::example(),
email_sender: EmailSenderConfig::example(),
config_path: "".to_string(), config_path: "".to_string(),
} }
} }
@ -539,6 +564,10 @@ impl AppConfig {
pub fn cart_manager(&self) -> &CartManagerConfig { pub fn cart_manager(&self) -> &CartManagerConfig {
&self.cart_manager &self.cart_manager
} }
pub fn email_sender(&self) -> &EmailSenderConfig {
&self.email_sender
}
} }
impl Default for AppConfig { impl Default for AppConfig {
@ -552,6 +581,7 @@ impl Default for AppConfig {
files: FilesConfig::default(), files: FilesConfig::default(),
account_manager: AccountManagerConfig::default(), account_manager: AccountManagerConfig::default(),
cart_manager: Default::default(), cart_manager: Default::default(),
email_sender: Default::default(),
config_path: "".to_string(), config_path: "".to_string(),
} }
} }

View File

@ -3,18 +3,30 @@ name = "email_manager"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
[[bin]]
name = "email-sender"
path = "./src/main.rs"
[dependencies] [dependencies]
actix = { version = "0.13", features = [] } actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] } actix-rt = { version = "2.7", features = [] }
channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
config = { path = "../config" } config = { path = "../config" }
model = { path = "../model" } model = { path = "../model" }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
pretty_env_logger = { version = "0.4", features = [] } pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" } rumqttc = { version = "*" }
sendgrid = { version = "0.17", features = ["async"] } sendgrid = { version = "0.17", features = ["async"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = [] } serde_json = { version = "1.0", features = [] }
tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" } thiserror = { version = "1.0.31" }
tinytemplate = { version = "1.2.1" } tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.34" } tracing = { version = "0.1.37" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "0.8", features = ["serde"] } uuid = { version = "0.8", features = ["serde"] }
dotenv = { version = "0.15.0" }
handlebars = { version = "*", features = [] }

View File

@ -5,7 +5,7 @@
<title>Reset password</title> <title>Reset password</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<style> <style>
{style} {{ style }}
</style> </style>
</head> </head>
<body> <body>
@ -16,17 +16,22 @@
<p class="mb-2 leading-normal text-sky-900">Resetting your password is easy.</p> <p class="mb-2 leading-normal text-sky-900">Resetting your password is easy.</p>
<p class="mb-2 leading-normal text-sky-900">Just press the button below and follow the instructions. Well have you up and running in no time.</p> <p class="mb-2 leading-normal text-sky-900">
Just press the button below and follow the instructions.
Well have you up and running in no time.
</p>
<p> <p>
<a <a
class="px-4 py-2 inline-block text-white bg-blue-600 border border-transparent rounded-r hover:bg-blue-700" class="px-4 py-2 inline-block text-white bg-blue-600 border border-transparent rounded-r hover:bg-blue-700"
href="{ url }" href="{{ url }}"
> >
Reset Password Reset Password
</a> </a>
</p> </p>
<p class="mb-2 leading-normal text-sky-900">If you did not make this request then please ignore this email.</p> <p class="mb-2 leading-normal text-sky-900">
If you did not make this request then please ignore this email.
</p>
</div> </div>
</div> </div>
</section> </section>

View File

@ -2,21 +2,21 @@
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8">
<title>Welcome to { service_name }</title> <title>Welcome to {{ service_name }}</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<style> <style>
{style} {{ style }}
</style> </style>
</head> </head>
<body> <body>
<section class="debug-screens"> <section class="debug-screens">
<div class="container p-8 mx-auto mt-12 bg-white"> <div class="container p-8 mx-auto mt-12 bg-white">
<div class="rounded-lg"> <div class="rounded-lg">
<h1 class="mb-3 text-xl font-semibold tracking-tight text-sky-600">Hi { login }</h1> <h1 class="mb-3 text-xl font-semibold tracking-tight text-sky-600">Hi {{ login }}</h1>
<p class="mb-2 leading-normal text-sky-900"> <p class="mb-2 leading-normal text-sky-900">
Welcome to {service_name} were excited to have you on board and wed love to say thank you on behalf Welcome to {service_name} were excited to have you on board, and wed love to say thank you on behalf
of our whole company for chosing us. of our whole company for choosing us.
</p> </p>
<p class="mb-2 leading-normal text-sky-900">Take care,</p> <p class="mb-2 leading-normal text-sky-900">Take care,</p>
<p class="mb-2 leading-normal text-sky-900">{signature}</p> <p class="mb-2 leading-normal text-sky-900">{signature}</p>

View File

@ -0,0 +1,101 @@
use channels::emails::{reset_password, test_mail, welcome};
use super::{Error, Result};
use crate::SharedContext;
static STYLE: &str = include_str!("../assets/style.css");
pub async fn test_mail(msg: test_mail::Input, ctx: SharedContext) -> Result<()> {
welcome(
welcome::Input {
login: model::Login::new("Test User"),
email: msg.receiver,
},
ctx,
)
.await
}
#[derive(Debug, serde::Serialize)]
struct ResetPasswordContext {
url: String,
style: &'static str,
}
pub async fn reset_password(msg: reset_password::Input, ctx: SharedContext) -> Result<()> {
let host = { ctx.config.lock().web().host() };
let context = ResetPasswordContext {
url: format!(
"{host}/reset-password/{reset_token}",
reset_token = msg.reset_token
),
style: STYLE,
};
let html = ctx
.template
.render("reset-password", &context)
.map_err(|e| {
tracing::error!("{e:?}");
Error::ResetPassTemplate
})?;
let smtp_from = ctx.config.lock().mail().smtp_from();
let status = ctx
.send_grid
.send(
sendgrid::Mail::new()
.add_to((msg.email.as_str(), msg.login.as_str()).into())
.add_from(&smtp_from)
.add_subject("Reset Password")
.add_html(html.as_str())
.build(),
)
.await;
tracing::debug!("{:?}", status);
Ok(())
}
#[derive(Debug, serde::Serialize)]
struct WelcomeContext {
login: model::Login,
service_name: String,
signature: String,
style: &'static str,
}
pub async fn welcome(msg: welcome::Input, ctx: SharedContext) -> Result<()> {
let (signature, service_name) = {
let l = ctx.config.lock();
let w = l.web();
(w.signature(), w.service_name())
};
let context = WelcomeContext {
login: msg.login.clone(),
service_name,
signature,
style: STYLE,
};
let html = ctx.template.render("welcome", &context).map_err(|e| {
tracing::error!("{e:?}");
Error::ResetPassTemplate
})?;
let smtp_from = ctx.config.lock().mail().smtp_from().clone();
let status = ctx
.send_grid
.send(
sendgrid::Mail::new()
.add_to((msg.email.as_str(), msg.login.as_str()).into())
.add_from(&smtp_from)
.add_subject("Welcome")
.add_html(html.as_str())
.build(),
)
.await;
tracing::debug!("{:?}", status);
Ok(())
}

View File

@ -1,189 +0,0 @@
use std::sync::Arc;
use config::SharedAppConfig;
use serde::Serialize;
#[macro_export]
macro_rules! mail_async_handler {
($msg: ty, $async: ident, $res: ty) => {
impl actix::Handler<$msg> for EmailManager {
type Result = actix::ResponseActFuture<Self, Result<$res>>;
fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result {
use actix::WrapFuture;
let inner = self.0.clone();
Box::pin(async { $async(msg, inner).await }.into_actor(self))
}
}
};
}
static STYLE: &str = include_str!("../assets/style.css");
#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize, thiserror::Error)]
#[serde(rename_all = "kebab-case", tag = "email")]
pub enum Error {
#[error("Failed to render reset password template")]
ResetPassTemplate,
}
pub type Result<T> = std::result::Result<T, Error>;
pub struct SendState {
pub success: bool,
}
pub struct EmailManager(Arc<Inner>);
pub(crate) struct Inner {
config: SharedAppConfig,
send_grid: sendgrid::SGClient,
template: Arc<tinytemplate::TinyTemplate<'static>>,
}
impl actix::Actor for EmailManager {
type Context = actix::Context<Self>;
}
impl EmailManager {
pub fn build(config: SharedAppConfig) -> Result<Self> {
let template = {
use tinytemplate::*;
let mut t = TinyTemplate::new();
t.add_template(
"reset-password",
include_str!("../assets/reset-password.html"),
)
.expect("Failed to load e-mail template reset-password");
t.add_template("welcome", include_str!("../assets/welcome.html"))
.expect("Failed to load e-mail template welcome");
t
};
Ok(Self(Arc::new(Inner {
config: config.clone(),
send_grid: sendgrid::SGClient::new(config.lock().mail().sendgrid_secret()),
template: Arc::new(template),
})))
}
}
#[derive(actix::Message)]
#[rtype(result = "Result<()>")]
pub struct TestMail {
pub receiver: model::Email,
}
mail_async_handler!(TestMail, test_mail, ());
pub(crate) async fn test_mail(msg: TestMail, inner: Arc<Inner>) -> Result<()> {
welcome(
Welcome {
login: model::Login::new("Test User"),
email: msg.receiver,
},
inner,
)
.await
}
#[derive(actix::Message, Debug)]
#[rtype(result = "Result<()>")]
pub struct ResetPassword {
pub login: model::Login,
pub email: model::Email,
pub reset_token: model::ResetToken,
}
#[derive(Serialize)]
struct ResetPasswordContext {
url: String,
style: &'static str,
}
mail_async_handler!(ResetPassword, reset_password, ());
pub(crate) async fn reset_password(msg: ResetPassword, inner: Arc<Inner>) -> Result<()> {
let host = { inner.config.lock().web().host() };
let context = ResetPasswordContext {
url: format!(
"{host}/reset-password/{reset_token}",
reset_token = msg.reset_token
),
style: STYLE,
};
let html = inner
.template
.render("reset-password", &context)
.map_err(|e| {
tracing::error!("{e:?}");
Error::ResetPassTemplate
})?;
let status = inner
.send_grid
.send(
sendgrid::Mail::new()
.add_to((msg.email.as_str(), msg.login.as_str()).into())
.add_from(&inner.config.lock().mail().smtp_from())
.add_subject("Reset Password")
.add_html(html.as_str())
.build(),
)
.await;
tracing::debug!("{:?}", status);
Ok(())
}
#[derive(actix::Message)]
#[rtype(result = "Result<()>")]
pub struct Welcome {
pub login: model::Login,
pub email: model::Email,
}
#[derive(Serialize)]
struct WelcomeContext {
login: model::Login,
service_name: String,
signature: String,
style: &'static str,
}
mail_async_handler!(Welcome, welcome, ());
pub(crate) async fn welcome(msg: Welcome, inner: Arc<Inner>) -> Result<()> {
let (signature, service_name) = {
let l = inner.config.lock();
let w = l.web();
(w.signature(), w.service_name())
};
let context = WelcomeContext {
login: msg.login.clone(),
service_name,
signature,
style: STYLE,
};
let html = inner.template.render("welcome", &context).map_err(|e| {
tracing::error!("{e:?}");
Error::ResetPassTemplate
})?;
let status = inner
.send_grid
.send(
sendgrid::Mail::new()
.add_to((msg.email.as_str(), msg.login.as_str()).into())
.add_from(&inner.config.lock().mail().smtp_from())
.add_subject("Welcome")
.add_html(html.as_str())
.build(),
)
.await;
tracing::debug!("{:?}", status);
Ok(())
}

View File

@ -0,0 +1,94 @@
use std::sync::Arc;
use config::{SharedAppConfig, UpdateConfig};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
// use crate::db::Database;
pub mod actions;
// pub mod db;
pub mod mqtt;
// pub mod rpc;
#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize, thiserror::Error)]
pub enum Error {
#[error("Failed to render reset password template")]
ResetPassTemplate,
}
pub type Result<T> = std::result::Result<T, Error>;
pub struct Opts {}
impl UpdateConfig for Opts {}
pub struct Context {
config: SharedAppConfig,
send_grid: sendgrid::SGClient,
template: handlebars::Handlebars<'static>,
// template: tinytemplate::TinyTemplate<'static>,
}
impl Context {
pub fn build(config: SharedAppConfig) -> SharedContext {
let template = {
let mut t = handlebars::Handlebars::new();
t.register_template_string/*add_template*/(
"reset-password",
include_str!("../assets/reset-password.hbs"),
)
.expect("Failed to load e-mail template reset-password");
t.register_template_string("welcome", include_str!("../assets/welcome.hbs"))
.expect("Failed to load e-mail template welcome");
t
};
Arc::new(Self {
config: config.clone(),
send_grid: sendgrid::SGClient::new(config.lock().mail().sendgrid_secret()),
template,
})
}
}
pub type SharedContext = Arc<Context>;
#[actix::main]
async fn main() {
dotenv::dotenv().ok();
init_tracing("email-sender");
let opts = Opts {};
let config = config::default_load(&opts);
let context = Context::build(config.clone());
let _mqtt_client = mqtt::start(config.clone(), context.clone()).await;
// rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await;
loop {
std::thread::park();
}
}
pub fn init_tracing(_service_name: &str) {
std::env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");
let tracer = {
use opentelemetry::sdk::export::trace::stdout::new_pipeline;
use opentelemetry::sdk::trace::Config;
new_pipeline()
.with_trace_config(Config::default())
.with_pretty_print(true)
.install_simple()
};
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE))
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()
.unwrap();
}

View File

@ -0,0 +1,96 @@
use channels::emails::{reset_password, test_mail};
use channels::{accounts, emails, DeserializePayload};
use config::SharedAppConfig;
use rumqttc::{Event, Incoming, Publish, QoS};
use crate::{actions, SharedContext};
pub async fn start(config: SharedAppConfig, ctx: SharedContext) -> channels::AsyncClient {
use channels::accounts::Topic as AccountTopic;
use channels::emails::Topic as EmailTopic;
let (client, mut event_loop) =
channels::mqtt::create_client(emails::CLIENT_NAME, config.clone());
client
.0
.subscribe(AccountTopic::AccountCreated, QoS::AtLeastOnce)
.await
.unwrap();
client
.0
.subscribe(EmailTopic::ResetPassword, QoS::AtLeastOnce)
.await
.unwrap();
client
.0
.subscribe(EmailTopic::Test, QoS::AtLeastOnce)
.await
.unwrap();
let spawn_client = client.clone();
let ctx = ctx.clone();
tokio::spawn(async move {
let _client = spawn_client.clone();
let ctx = ctx.clone();
loop {
let notification = event_loop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => {
tracing::info!("Received publish {:?}", publish.topic);
match publish.topic.as_str() {
t if AccountTopic::AccountCreated == t => {
on_created(publish, ctx.clone()).await
}
t if EmailTopic::ResetPassword == t => on_reset(publish, ctx.clone()).await,
t if EmailTopic::Test == t => on_test(publish, ctx.clone()).await,
_ => {}
}
}
_ => {}
}
}
});
client
}
async fn on_created(publish: Publish, ctx: SharedContext) {
if let Some(account) =
accounts::Topic::AccountCreated.deserialize_payload::<model::FullAccount>(publish.payload)
{
if let Err(e) = actions::welcome(
emails::welcome::Input {
login: account.login,
email: account.email,
},
ctx,
)
.await
{
tracing::error!("{}", e);
}
}
}
async fn on_reset(publish: Publish, ctx: SharedContext) {
if let Some(msg) =
emails::Topic::ResetPassword.deserialize_payload::<reset_password::Input>(publish.payload)
{
if let Err(e) = actions::reset_password(msg, ctx).await {
tracing::error!("{}", e);
}
}
}
async fn on_test(publish: Publish, ctx: SharedContext) {
if let Some(msg) = emails::Topic::Test.deserialize_payload::<model::Account>(publish.payload) {
let msg = test_mail::Input {
receiver: msg.email,
};
if let Err(e) = actions::test_mail(msg, ctx).await {
tracing::error!("{}", e);
}
}
}