Fix receiving msg

This commit is contained in:
eraden 2022-11-05 10:57:07 +01:00
parent eb69682c67
commit 190c62821f
11 changed files with 134 additions and 82 deletions

2
.env
View File

@ -4,7 +4,7 @@ ACCOUNT_DATABASE_URL=postgres://postgres@localhost/bazzar_accounts
CART_DATABASE_URL=postgres://postgres@localhost/bazzar_carts
PASS_SALT=18CHwV7eGFAea16z+qMKZg
RUST_LOG=debug
RUST_LOG=info
SESSION_SECRET="NEPJs#8jjn8SK8GC7QEC^*P844UgsyEbQB8mRWXkT%3mPrwewZoc25MMby9H#R*w2KzaQgMkk#Pif$kxrLy*N5L!Ch%jxbWoa%gb"
JWT_SECRET="42^iFq&ZnQbUf!hwGWXd&CpyY6QQyJmkPU%esFCvne5&Ejcb3nJ4&GyHZp!MArZLf^9*5c6!!VgM$iZ8T%d#&bWTi&xbZk2S@4RN"
SIGNATURE=David

View File

@ -4,8 +4,7 @@ use rumqttc::{Event, Incoming};
use crate::db::Database;
pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient {
let (client, mut event_loop) =
channels::mqtt::create_client(channels::accounts::CLIENT_NAME, config);
let (client, mut event_loop) = channels::accounts::mqtt::create_client(config);
let spawn_client = client.clone();
tokio::spawn(async move {

View File

@ -11,12 +11,12 @@ use actix_web::web::Data;
use actix_web::{App, HttpServer};
use config::UpdateConfig;
use jemallocator::Jemalloc;
use model::{AccountState, Email, Encrypt, Login, PassHash, Password, Role};
use model::{AccountState, Email, Login, PassHash, Password, Role};
use opts::{
Command, CreateAccountCmd, CreateAccountOpts, GenerateHashOpts, Opts, ServerOpts,
TestMailerOpts,
};
use rumqttc::Outgoing;
use rumqttc::Incoming;
use validator::{validate_email, validate_length};
use crate::opts::ReIndexOpts;
@ -122,9 +122,7 @@ async fn create_account(opts: CreateAccountOpts) -> Result<()> {
panic!("Login must have at least 4 characters and no more than 100");
}
let config = config::default_load(&opts);
let db = database_manager::Database::build(config.clone())
.await
.start();
let pass = match opts.pass_file {
Some(path) => std::fs::read_to_string(path).map_err(Error::PassFile)?,
None => {
@ -149,19 +147,20 @@ async fn create_account(opts: CreateAccountOpts) -> Result<()> {
if pass.trim().is_empty() {
panic!("Password cannot be empty!");
}
let hash = Password::from(pass)
.encrypt(&config.lock().web().pass_salt())
.unwrap();
db.send(database_manager::CreateAccount {
email: Email::from_str(&opts.email).unwrap(),
login: Login::new(opts.login),
pass_hash: PassHash::from(hash),
role,
})
.await
.unwrap()
.unwrap();
let channel = channels::accounts::rpc::create_client(config.clone()).await;
channel
.register_account(
tarpc::context::current(),
channels::accounts::register::Input {
email: Email::from_str(&opts.email).unwrap(),
login: Login::new(opts.login),
password: Password::new(pass),
role,
},
)
.await
.unwrap();
Ok(())
}
@ -169,7 +168,7 @@ async fn test_mailer(opts: TestMailerOpts) -> Result<()> {
let config = config::default_load(&opts);
opts.update_config(&mut *config.lock());
let (client, mut event_loop) = channels::mqtt::create_client("bazzar", config);
let (client, mut event_loop) = channels::emails::mqtt::create_client(config);
client
.emit_test(&model::Account {
id: 0.into(),
@ -185,7 +184,7 @@ async fn test_mailer(opts: TestMailerOpts) -> Result<()> {
let msg = event_loop.poll().await.unwrap();
tracing::info!("{:?}", msg);
if let rumqttc::Event::Outgoing(Outgoing::PubAck(_)) = msg {
if let rumqttc::Event::Incoming(Incoming::PubAck(_)) = msg {
client.0.disconnect().await.unwrap();
break;
}

View File

@ -6,19 +6,8 @@ use rumqttc::{Event, Incoming};
use crate::Database;
pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient {
let mut mqtt_options = {
let l = config.lock();
let bind = &l.account_manager().mqtt_bind;
let port = l.account_manager().mqtt_port;
tracing::info!("Starting account mqtt at {}:{}", bind, port);
let (client, mut event_loop) = channels::carts::mqtt::create_client(config.clone());
rumqttc::MqttOptions::new(channels::accounts::CLIENT_NAME, bind, port)
};
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (client, mut event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10);
let client = channels::AsyncClient(client);
let spawn_client = client.clone();
tokio::spawn(async move {
let _client = spawn_client.clone();

View File

@ -146,3 +146,15 @@ pub mod rpc {
client
}
}
pub mod mqtt {
use config::SharedAppConfig;
use rumqttc::EventLoop;
use crate::accounts::CLIENT_NAME;
use crate::AsyncClient;
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {
crate::mqtt::create_client(CLIENT_NAME, config)
}
}

View File

@ -205,3 +205,15 @@ pub mod rpc {
client
}
}
pub mod mqtt {
use config::SharedAppConfig;
use rumqttc::EventLoop;
use crate::carts::CLIENT_NAME;
use crate::AsyncClient;
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {
crate::mqtt::create_client(CLIENT_NAME, config)
}
}

View File

@ -83,3 +83,15 @@ pub mod welcome {
pub email: model::Email,
}
}
pub mod mqtt {
use config::SharedAppConfig;
use rumqttc::EventLoop;
use crate::emails::CLIENT_NAME;
use crate::AsyncClient;
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {
crate::mqtt::create_client(CLIENT_NAME, config)
}
}

View File

@ -5,7 +5,7 @@ use rumqttc::EventLoop;
use crate::AsyncClient;
pub fn create_client(name: &str, config: SharedAppConfig) -> (AsyncClient, EventLoop) {
pub(crate) fn create_client(name: &str, config: SharedAppConfig) -> (AsyncClient, EventLoop) {
let mut mqtt_options = {
let l = config.lock();
let bind = &l.account_manager().mqtt_bind;

View File

@ -60,17 +60,12 @@ async fn main() {
dotenv::dotenv().ok();
init_tracing("email-sender");
let opts = Opts {};
let config = config::default_load(&opts);
let config = config::default_load(&Opts {});
let context = Context::build(config.clone());
let _mqtt_client = mqtt::start(config.clone(), context.clone()).await;
// rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await;
loop {
std::thread::park();
}
}
pub fn init_tracing(_service_name: &str) {

View File

@ -5,55 +5,89 @@ use rumqttc::{Event, Incoming, Publish, QoS};
use crate::{actions, SharedContext};
pub async fn start(config: SharedAppConfig, ctx: SharedContext) -> channels::AsyncClient {
pub async fn start(config: SharedAppConfig, ctx: SharedContext) {
use channels::accounts::Topic as AccountTopic;
use channels::emails::Topic as EmailTopic;
let (client, mut event_loop) =
channels::mqtt::create_client(emails::CLIENT_NAME, config.clone());
client
.0
.subscribe(AccountTopic::AccountCreated, QoS::AtLeastOnce)
.await
.unwrap();
client
.0
.subscribe(EmailTopic::ResetPassword, QoS::AtLeastOnce)
.await
.unwrap();
client
.0
.subscribe(EmailTopic::Test, QoS::AtLeastOnce)
.await
.unwrap();
let spawn_client = client.clone();
let ctx = ctx.clone();
tokio::spawn(async move {
let _client = spawn_client.clone();
let account_fut = {
let ctx = ctx.clone();
loop {
let notification = event_loop.poll().await;
let config = config.clone();
async move {
let (client, mut event_loop) = accounts::mqtt::create_client(config.clone());
client
.0
.subscribe(AccountTopic::AccountCreated, QoS::AtLeastOnce)
.await
.unwrap();
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => {
tracing::info!("Received publish {:?}", publish.topic);
match publish.topic.as_str() {
t if AccountTopic::AccountCreated == t => {
on_created(publish, ctx.clone()).await
let ctx = ctx.clone();
loop {
let notification = event_loop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => {
tracing::info!("Received publish {:?}", publish.topic);
match publish.topic.as_str() {
t if AccountTopic::AccountCreated == t => {
on_created(publish, ctx.clone()).await
}
t if EmailTopic::ResetPassword == t => {
on_reset(publish, ctx.clone()).await
}
t if EmailTopic::Test == t => on_test(publish, ctx.clone()).await,
_ => {}
}
t if EmailTopic::ResetPassword == t => on_reset(publish, ctx.clone()).await,
t if EmailTopic::Test == t => on_test(publish, ctx.clone()).await,
_ => {}
}
_ => {}
}
_ => {}
}
}
});
client
};
let emails_fut = {
let ctx = ctx.clone();
let config = config.clone();
async move {
let (client, mut event_loop) = emails::mqtt::create_client(config.clone());
client
.0
.subscribe(EmailTopic::ResetPassword, QoS::AtLeastOnce)
.await
.unwrap();
client
.0
.subscribe(EmailTopic::Test, QoS::AtLeastOnce)
.await
.unwrap();
let ctx = ctx.clone();
loop {
let notification = event_loop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => {
tracing::info!("Received publish {:?}", publish.topic);
match publish.topic.as_str() {
t if EmailTopic::ResetPassword == t => {
on_reset(publish, ctx.clone()).await
}
t if EmailTopic::Test == t => on_test(publish, ctx.clone()).await,
_ => {}
}
}
Ok(Event::Incoming(inc)) => {
eprintln!("{:?}", inc);
}
Ok(Event::Outgoing(out)) => {
eprintln!("{:?}", out);
}
_ => {}
}
}
}
};
tokio::join!(account_fut, emails_fut);
}
async fn on_created(publish: Publish, ctx: SharedContext) {

View File

@ -2,8 +2,8 @@
source .env
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_accounts" || 0
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_accounts" || echo 0
sqlx migrate run -D "${ACCOUNT_DATABASE_URL}" --source ./crates/account_manager/migrations
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_carts" || 0
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_carts" || echo 0
sqlx migrate run -D "${CART_DATABASE_URL}" --source ./crates/cart_manager/migrations