From 190c62821f80785326f7a25324ee8fab1887d1b5 Mon Sep 17 00:00:00 2001 From: eraden Date: Sat, 5 Nov 2022 10:57:07 +0100 Subject: [PATCH] Fix receiving msg --- .env | 2 +- crates/account_manager/src/mqtt.rs | 3 +- crates/api/src/main.rs | 37 +++++----- crates/cart_manager/src/mqtt.rs | 13 +--- crates/channels/src/accounts.rs | 12 ++++ crates/channels/src/carts.rs | 12 ++++ crates/channels/src/emails.rs | 12 ++++ crates/channels/src/mqtt.rs | 2 +- crates/email_manager/src/main.rs | 7 +- crates/email_manager/src/mqtt.rs | 112 +++++++++++++++++++---------- scripts/migrate.sh | 4 +- 11 files changed, 134 insertions(+), 82 deletions(-) diff --git a/.env b/.env index eaafd74..9357723 100644 --- a/.env +++ b/.env @@ -4,7 +4,7 @@ ACCOUNT_DATABASE_URL=postgres://postgres@localhost/bazzar_accounts CART_DATABASE_URL=postgres://postgres@localhost/bazzar_carts PASS_SALT=18CHwV7eGFAea16z+qMKZg -RUST_LOG=debug +RUST_LOG=info SESSION_SECRET="NEPJs#8jjn8SK8GC7QEC^*P844UgsyEbQB8mRWXkT%3mPrwewZoc25MMby9H#R*w2KzaQgMkk#Pif$kxrLy*N5L!Ch%jxbWoa%gb" JWT_SECRET="42^iFq&ZnQbUf!hwGWXd&CpyY6QQyJmkPU%esFCvne5&Ejcb3nJ4&GyHZp!MArZLf^9*5c6!!VgM$iZ8T%d#&bWTi&xbZk2S@4RN" SIGNATURE=David diff --git a/crates/account_manager/src/mqtt.rs b/crates/account_manager/src/mqtt.rs index c577a31..25541dc 100644 --- a/crates/account_manager/src/mqtt.rs +++ b/crates/account_manager/src/mqtt.rs @@ -4,8 +4,7 @@ use rumqttc::{Event, Incoming}; use crate::db::Database; pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient { - let (client, mut event_loop) = - channels::mqtt::create_client(channels::accounts::CLIENT_NAME, config); + let (client, mut event_loop) = channels::accounts::mqtt::create_client(config); let spawn_client = client.clone(); tokio::spawn(async move { diff --git a/crates/api/src/main.rs b/crates/api/src/main.rs index 781436e..a7c9cf2 100644 --- a/crates/api/src/main.rs +++ b/crates/api/src/main.rs @@ -11,12 +11,12 @@ use actix_web::web::Data; use actix_web::{App, HttpServer}; use config::UpdateConfig; use jemallocator::Jemalloc; -use model::{AccountState, Email, Encrypt, Login, PassHash, Password, Role}; +use model::{AccountState, Email, Login, PassHash, Password, Role}; use opts::{ Command, CreateAccountCmd, CreateAccountOpts, GenerateHashOpts, Opts, ServerOpts, TestMailerOpts, }; -use rumqttc::Outgoing; +use rumqttc::Incoming; use validator::{validate_email, validate_length}; use crate::opts::ReIndexOpts; @@ -122,9 +122,7 @@ async fn create_account(opts: CreateAccountOpts) -> Result<()> { panic!("Login must have at least 4 characters and no more than 100"); } let config = config::default_load(&opts); - let db = database_manager::Database::build(config.clone()) - .await - .start(); + let pass = match opts.pass_file { Some(path) => std::fs::read_to_string(path).map_err(Error::PassFile)?, None => { @@ -149,19 +147,20 @@ async fn create_account(opts: CreateAccountOpts) -> Result<()> { if pass.trim().is_empty() { panic!("Password cannot be empty!"); } - let hash = Password::from(pass) - .encrypt(&config.lock().web().pass_salt()) - .unwrap(); - db.send(database_manager::CreateAccount { - email: Email::from_str(&opts.email).unwrap(), - login: Login::new(opts.login), - pass_hash: PassHash::from(hash), - role, - }) - .await - .unwrap() - .unwrap(); + let channel = channels::accounts::rpc::create_client(config.clone()).await; + channel + .register_account( + tarpc::context::current(), + channels::accounts::register::Input { + email: Email::from_str(&opts.email).unwrap(), + login: Login::new(opts.login), + password: Password::new(pass), + role, + }, + ) + .await + .unwrap(); Ok(()) } @@ -169,7 +168,7 @@ async fn test_mailer(opts: TestMailerOpts) -> Result<()> { let config = config::default_load(&opts); opts.update_config(&mut *config.lock()); - let (client, mut event_loop) = channels::mqtt::create_client("bazzar", config); + let (client, mut event_loop) = channels::emails::mqtt::create_client(config); client .emit_test(&model::Account { id: 0.into(), @@ -185,7 +184,7 @@ async fn test_mailer(opts: TestMailerOpts) -> Result<()> { let msg = event_loop.poll().await.unwrap(); tracing::info!("{:?}", msg); - if let rumqttc::Event::Outgoing(Outgoing::PubAck(_)) = msg { + if let rumqttc::Event::Incoming(Incoming::PubAck(_)) = msg { client.0.disconnect().await.unwrap(); break; } diff --git a/crates/cart_manager/src/mqtt.rs b/crates/cart_manager/src/mqtt.rs index 01a1ee6..7db5e35 100644 --- a/crates/cart_manager/src/mqtt.rs +++ b/crates/cart_manager/src/mqtt.rs @@ -6,19 +6,8 @@ use rumqttc::{Event, Incoming}; use crate::Database; pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient { - let mut mqtt_options = { - let l = config.lock(); - let bind = &l.account_manager().mqtt_bind; - let port = l.account_manager().mqtt_port; - tracing::info!("Starting account mqtt at {}:{}", bind, port); + let (client, mut event_loop) = channels::carts::mqtt::create_client(config.clone()); - rumqttc::MqttOptions::new(channels::accounts::CLIENT_NAME, bind, port) - }; - mqtt_options.set_keep_alive(Duration::from_secs(5)); - - let (client, mut event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10); - - let client = channels::AsyncClient(client); let spawn_client = client.clone(); tokio::spawn(async move { let _client = spawn_client.clone(); diff --git a/crates/channels/src/accounts.rs b/crates/channels/src/accounts.rs index 184f897..462adb2 100644 --- a/crates/channels/src/accounts.rs +++ b/crates/channels/src/accounts.rs @@ -146,3 +146,15 @@ pub mod rpc { client } } + +pub mod mqtt { + use config::SharedAppConfig; + use rumqttc::EventLoop; + + use crate::accounts::CLIENT_NAME; + use crate::AsyncClient; + + pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) { + crate::mqtt::create_client(CLIENT_NAME, config) + } +} diff --git a/crates/channels/src/carts.rs b/crates/channels/src/carts.rs index 3566cd6..e15ed17 100644 --- a/crates/channels/src/carts.rs +++ b/crates/channels/src/carts.rs @@ -205,3 +205,15 @@ pub mod rpc { client } } + +pub mod mqtt { + use config::SharedAppConfig; + use rumqttc::EventLoop; + + use crate::carts::CLIENT_NAME; + use crate::AsyncClient; + + pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) { + crate::mqtt::create_client(CLIENT_NAME, config) + } +} diff --git a/crates/channels/src/emails.rs b/crates/channels/src/emails.rs index e141a51..3c34129 100644 --- a/crates/channels/src/emails.rs +++ b/crates/channels/src/emails.rs @@ -83,3 +83,15 @@ pub mod welcome { pub email: model::Email, } } + +pub mod mqtt { + use config::SharedAppConfig; + use rumqttc::EventLoop; + + use crate::emails::CLIENT_NAME; + use crate::AsyncClient; + + pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) { + crate::mqtt::create_client(CLIENT_NAME, config) + } +} diff --git a/crates/channels/src/mqtt.rs b/crates/channels/src/mqtt.rs index 77208ea..b11e275 100644 --- a/crates/channels/src/mqtt.rs +++ b/crates/channels/src/mqtt.rs @@ -5,7 +5,7 @@ use rumqttc::EventLoop; use crate::AsyncClient; -pub fn create_client(name: &str, config: SharedAppConfig) -> (AsyncClient, EventLoop) { +pub(crate) fn create_client(name: &str, config: SharedAppConfig) -> (AsyncClient, EventLoop) { let mut mqtt_options = { let l = config.lock(); let bind = &l.account_manager().mqtt_bind; diff --git a/crates/email_manager/src/main.rs b/crates/email_manager/src/main.rs index c3aba74..a356141 100644 --- a/crates/email_manager/src/main.rs +++ b/crates/email_manager/src/main.rs @@ -60,17 +60,12 @@ async fn main() { dotenv::dotenv().ok(); init_tracing("email-sender"); - let opts = Opts {}; - - let config = config::default_load(&opts); + let config = config::default_load(&Opts {}); let context = Context::build(config.clone()); let _mqtt_client = mqtt::start(config.clone(), context.clone()).await; // rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await; - loop { - std::thread::park(); - } } pub fn init_tracing(_service_name: &str) { diff --git a/crates/email_manager/src/mqtt.rs b/crates/email_manager/src/mqtt.rs index 1cfa86d..f8f7540 100644 --- a/crates/email_manager/src/mqtt.rs +++ b/crates/email_manager/src/mqtt.rs @@ -5,55 +5,89 @@ use rumqttc::{Event, Incoming, Publish, QoS}; use crate::{actions, SharedContext}; -pub async fn start(config: SharedAppConfig, ctx: SharedContext) -> channels::AsyncClient { +pub async fn start(config: SharedAppConfig, ctx: SharedContext) { use channels::accounts::Topic as AccountTopic; use channels::emails::Topic as EmailTopic; - let (client, mut event_loop) = - channels::mqtt::create_client(emails::CLIENT_NAME, config.clone()); - - client - .0 - .subscribe(AccountTopic::AccountCreated, QoS::AtLeastOnce) - .await - .unwrap(); - client - .0 - .subscribe(EmailTopic::ResetPassword, QoS::AtLeastOnce) - .await - .unwrap(); - client - .0 - .subscribe(EmailTopic::Test, QoS::AtLeastOnce) - .await - .unwrap(); - - let spawn_client = client.clone(); - let ctx = ctx.clone(); - - tokio::spawn(async move { - let _client = spawn_client.clone(); + let account_fut = { let ctx = ctx.clone(); - loop { - let notification = event_loop.poll().await; + let config = config.clone(); + async move { + let (client, mut event_loop) = accounts::mqtt::create_client(config.clone()); + client + .0 + .subscribe(AccountTopic::AccountCreated, QoS::AtLeastOnce) + .await + .unwrap(); - match notification { - Ok(Event::Incoming(Incoming::Publish(publish))) => { - tracing::info!("Received publish {:?}", publish.topic); - match publish.topic.as_str() { - t if AccountTopic::AccountCreated == t => { - on_created(publish, ctx.clone()).await + let ctx = ctx.clone(); + loop { + let notification = event_loop.poll().await; + + match notification { + Ok(Event::Incoming(Incoming::Publish(publish))) => { + tracing::info!("Received publish {:?}", publish.topic); + match publish.topic.as_str() { + t if AccountTopic::AccountCreated == t => { + on_created(publish, ctx.clone()).await + } + t if EmailTopic::ResetPassword == t => { + on_reset(publish, ctx.clone()).await + } + t if EmailTopic::Test == t => on_test(publish, ctx.clone()).await, + _ => {} } - t if EmailTopic::ResetPassword == t => on_reset(publish, ctx.clone()).await, - t if EmailTopic::Test == t => on_test(publish, ctx.clone()).await, - _ => {} } + _ => {} } - _ => {} } } - }); - client + }; + + let emails_fut = { + let ctx = ctx.clone(); + let config = config.clone(); + async move { + let (client, mut event_loop) = emails::mqtt::create_client(config.clone()); + client + .0 + .subscribe(EmailTopic::ResetPassword, QoS::AtLeastOnce) + .await + .unwrap(); + client + .0 + .subscribe(EmailTopic::Test, QoS::AtLeastOnce) + .await + .unwrap(); + + let ctx = ctx.clone(); + loop { + let notification = event_loop.poll().await; + + match notification { + Ok(Event::Incoming(Incoming::Publish(publish))) => { + tracing::info!("Received publish {:?}", publish.topic); + match publish.topic.as_str() { + t if EmailTopic::ResetPassword == t => { + on_reset(publish, ctx.clone()).await + } + t if EmailTopic::Test == t => on_test(publish, ctx.clone()).await, + _ => {} + } + } + Ok(Event::Incoming(inc)) => { + eprintln!("{:?}", inc); + } + Ok(Event::Outgoing(out)) => { + eprintln!("{:?}", out); + } + _ => {} + } + } + } + }; + + tokio::join!(account_fut, emails_fut); } async fn on_created(publish: Publish, ctx: SharedContext) { diff --git a/scripts/migrate.sh b/scripts/migrate.sh index b99104c..0543c9e 100755 --- a/scripts/migrate.sh +++ b/scripts/migrate.sh @@ -2,8 +2,8 @@ source .env -psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_accounts" || 0 +psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_accounts" || echo 0 sqlx migrate run -D "${ACCOUNT_DATABASE_URL}" --source ./crates/account_manager/migrations -psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_carts" || 0 +psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_carts" || echo 0 sqlx migrate run -D "${CART_DATABASE_URL}" --source ./crates/cart_manager/migrations