Change account manager into microservice

This commit is contained in:
eraden 2022-11-04 10:13:44 +01:00
parent c994f00076
commit 8e037fe1e7
19 changed files with 683 additions and 449 deletions

221
Cargo.lock generated
View File

@ -9,14 +9,17 @@ dependencies = [
"actix 0.13.0",
"actix-rt",
"bincode",
"bus",
"bytes",
"channels",
"config",
"database_manager",
"dotenv",
"futures 0.3.25",
"gumdrop",
"json",
"model",
"opentelemetry 0.17.0",
"opentelemetry-jaeger",
"pretty_env_logger",
"rumqttc",
"serde",
@ -24,6 +27,7 @@ dependencies = [
"thiserror",
"tokio",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
]
@ -419,8 +423,8 @@ dependencies = [
"actix-http",
"actix-web",
"futures-util",
"opentelemetry",
"opentelemetry-semantic-conventions",
"opentelemetry 0.17.0",
"opentelemetry-semantic-conventions 0.9.0",
"serde",
]
@ -653,7 +657,6 @@ checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf"
name = "bazzar"
version = "0.1.0"
dependencies = [
"account_manager",
"actix 0.13.0",
"actix-broker",
"actix-cors",
@ -669,6 +672,7 @@ dependencies = [
"async-trait",
"bytes",
"cart_manager",
"channels",
"chrono",
"config",
"database_manager",
@ -694,6 +698,7 @@ dependencies = [
"serde_json",
"sqlx",
"sqlx-core",
"tarpc",
"tera",
"thiserror",
"token_manager",
@ -791,19 +796,6 @@ version = "3.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
[[package]]
name = "bus"
version = "0.1.0"
dependencies = [
"bincode",
"bytes",
"model",
"rumqttc",
"serde",
"thiserror",
"tracing",
]
[[package]]
name = "bytecodec"
version = "0.4.15"
@ -841,7 +833,7 @@ version = "0.1.0"
dependencies = [
"actix 0.13.0",
"actix-rt",
"bus",
"channels",
"chrono",
"config",
"database_manager",
@ -875,6 +867,21 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "channels"
version = "0.1.0"
dependencies = [
"bincode",
"bytes",
"config",
"model",
"rumqttc",
"serde",
"tarpc",
"thiserror",
"tracing",
]
[[package]]
name = "chrono"
version = "0.4.22"
@ -1329,6 +1336,18 @@ dependencies = [
"syn",
]
[[package]]
name = "educe"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0188e3c3ba8df5753894d54461f0e39bc91741dc5b22e1c46999ec2c71f4e4"
dependencies = [
"enum-ordinalize",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "either"
version = "1.8.0"
@ -1370,6 +1389,20 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "enum-ordinalize"
version = "3.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a62bb1df8b45ecb7ffa78dca1c17a438fb193eb083db0b1b494d2a61bcb5096a"
dependencies = [
"num-bigint",
"num-traits",
"proc-macro2",
"quote",
"rustc_version 0.4.0",
"syn",
]
[[package]]
name = "env_logger"
version = "0.7.1"
@ -2172,6 +2205,12 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "integer-encoding"
version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
[[package]]
name = "intl-memoizer"
version = "0.5.1"
@ -2411,6 +2450,15 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
[[package]]
name = "matches"
version = "0.1.9"
@ -2785,13 +2833,81 @@ dependencies = [
"tokio-stream",
]
[[package]]
name = "opentelemetry"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e"
dependencies = [
"opentelemetry_api",
"opentelemetry_sdk",
]
[[package]]
name = "opentelemetry-jaeger"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e785d273968748578931e4dc3b4f5ec86b26e09d9e0d66b55adda7fce742f7a"
dependencies = [
"async-trait",
"futures 0.3.25",
"futures-executor",
"once_cell",
"opentelemetry 0.18.0",
"opentelemetry-semantic-conventions 0.10.0",
"thiserror",
"thrift",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd"
dependencies = [
"opentelemetry",
"opentelemetry 0.17.0",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b02e0230abb0ab6636d18e2ba8fa02903ea63772281340ccac18e0af3ec9eeb"
dependencies = [
"opentelemetry 0.18.0",
]
[[package]]
name = "opentelemetry_api"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22"
dependencies = [
"futures-channel",
"futures-util",
"indexmap",
"js-sys",
"once_cell",
"pin-project-lite",
"thiserror",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113"
dependencies = [
"async-trait",
"crossbeam-channel",
"futures-channel",
"futures-executor",
"futures-util",
"once_cell",
"opentelemetry_api",
"percent-encoding",
"rand",
"thiserror",
]
[[package]]
@ -2812,6 +2928,15 @@ dependencies = [
"uuid 1.2.1",
]
[[package]]
name = "ordered-float"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7"
dependencies = [
"num-traits",
]
[[package]]
name = "os_type"
version = "2.6.0"
@ -3248,6 +3373,15 @@ dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.27"
@ -3952,13 +4086,15 @@ dependencies = [
"fnv",
"futures 0.3.25",
"humantime 2.1.0",
"opentelemetry",
"opentelemetry 0.17.0",
"pin-project",
"rand",
"serde",
"static_assertions",
"tarpc-plugins",
"thiserror",
"tokio",
"tokio-serde",
"tokio-util 0.7.4",
"tracing",
"tracing-opentelemetry",
@ -4063,6 +4199,28 @@ dependencies = [
"once_cell",
]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]]
name = "thrift"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09678c4cdbb4eed72e18b7c2af1329c69825ed16fcbac62d083fc3e2b0590ff0"
dependencies = [
"byteorder",
"integer-encoding",
"log",
"ordered-float",
"threadpool",
]
[[package]]
name = "time"
version = "0.1.44"
@ -4219,6 +4377,22 @@ dependencies = [
"webpki",
]
[[package]]
name = "tokio-serde"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "911a61637386b789af998ee23f50aa30d5fd7edcec8d6d3dedae5e5815205466"
dependencies = [
"bincode",
"bytes",
"educe",
"futures-core",
"futures-sink",
"pin-project",
"serde",
"serde_json",
]
[[package]]
name = "tokio-stream"
version = "0.1.11"
@ -4326,9 +4500,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbbe89715c1dbbb790059e2565353978564924ee85017b5fff365c872ff6721f"
dependencies = [
"once_cell",
"opentelemetry",
"opentelemetry 0.17.0",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
]
@ -4338,10 +4513,14 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]

View File

@ -2,7 +2,7 @@
members = [
# shared
"shared/model",
"shared/bus",
"shared/channels",
"shared/config",
"shared/testx",
# actors

View File

@ -11,19 +11,23 @@ path = "./src/main.rs"
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
bincode = { version = "1.3.3" }
bus = { path = "../../shared/bus" }
bytes = { version = "1.2.1" }
channels = { path = "../../shared/channels" }
config = { path = "../../shared/config" }
database_manager = { path = "../database_manager" }
dotenv = { version = "0.15.0" }
futures = { version = "0.3.25" }
gumdrop = { version = "0.8.1" }
json = { version = "0.12.4" }
model = { path = "../../shared/model" }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] }
tarpc = { version = "0.30.0", features = ["tokio1"] }
tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.6" }
tracing-subscriber = { version = "0.3.16" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }

View File

@ -0,0 +1,82 @@
use channels::account::{CreateAccount, MeResult};
use config::SharedAppConfig;
use database_manager::Database;
use model::{Encrypt, FullAccount};
use crate::{Error, Result};
#[allow(unused)]
pub async fn me(account_id: model::AccountId, db: Database) -> MeResult {
use channels::account::Error;
let msg = database_manager::FindAccount { account_id };
let account: model::FullAccount = match msg.inner_find_account(db.pool().clone()).await {
Ok(account) => account,
Err(e) => {
tracing::error!("{}", e);
return MeResult {
error: Some(Error::Account),
..Default::default()
};
}
};
let msg = database_manager::AccountAddresses { account_id };
let addresses = match msg.inner_account_addresses(db.pool().clone()).await {
Ok(v) => v,
Err(e) => {
tracing::error!("{}", e);
return MeResult {
error: Some(Error::Addresses),
..Default::default()
};
}
};
MeResult {
account: Some(account),
addresses: Some(addresses),
..Default::default()
}
}
pub async fn create_account(
msg: CreateAccount,
db: &Database,
config: SharedAppConfig,
) -> Result<FullAccount> {
let hash = msg
.password
.encrypt(&config.lock().web().pass_salt())
.map_err(|e| {
tracing::error!("{e:?}");
Error::Hashing
})?;
let mut t = db.pool().begin().await.map_err(|e| {
tracing::error!("{}", e);
Error::DbCritical
})?;
let account: FullAccount = match database_manager::create_account(
database_manager::CreateAccount {
email: msg.email,
login: msg.login,
pass_hash: model::PassHash::new(hash),
role: msg.role,
},
&mut t,
)
.await
{
Ok(r) => r,
Err(e) => {
tracing::error!("{}", e);
t.rollback().await.ok();
return Err(Error::Saving);
}
};
t.commit().await.map_err(|e| {
tracing::error!("{}", e);
Error::DbCritical
})?;
Ok(account)
}

View File

@ -0,0 +1,43 @@
use std::net::SocketAddr;
use std::time::Duration;
use config::UpdateConfig;
use tarpc::tokio_serde::formats::Json;
use tarpc::{client, context};
use tokio::time::sleep;
#[derive(gumdrop::Options)]
struct Flags {
help: bool,
/// Sets the name to say hello to.
name: String,
}
impl UpdateConfig for Flags {}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let opts: Flags = gumdrop::Options::parse_args_default_or_exit();
let config = config::default_load(&opts);
let client = channels::account::rpc::create_client(config).await;
let r = client.me(context::current(), 1.into()).await;
println!("{:?}", r);
let hello = async move {
tokio::join! {
client.me(context::current(), 1.into()),
client.me(context::current(), 2.into()),
}
}
.await;
eprintln!("{:?}", hello);
// Let the background span processor finish.
sleep(Duration::from_micros(1)).await;
opentelemetry::global::shutdown_tracer_provider();
Ok(())
}

View File

@ -1,164 +0,0 @@
use actix::Addr;
use config::SharedAppConfig;
use database_manager::query_db;
use model::{Email, Encrypt, FullAccount, Login, Password, Role};
#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)]
#[serde(rename_all = "kebab-case", tag = "account")]
pub enum Error {
#[error("Unable to send or receive msg from database")]
DbCritical,
#[error("Failed to load account data")]
Account,
#[error("Failed to load account addresses")]
Addresses,
#[error("Unable to save record")]
Saving,
#[error("Unable to hash password")]
Hashing,
#[error("{0}")]
Db(#[from] database_manager::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
#[macro_export]
macro_rules! account_async_handler {
($msg: ty, $async: ident, $res: ty) => {
impl actix::Handler<$msg> for AccountManager {
type Result = actix::ResponseActFuture<Self, Result<$res>>;
fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result {
use actix::WrapFuture;
let db = self.db.clone();
let config = self.config.clone();
Box::pin(async { $async(msg, db, config).await }.into_actor(self))
}
}
};
}
#[macro_export]
macro_rules! query_account {
($cart: expr, $msg: expr, default $fail: expr) => {
match $cart.send($msg).await {
Ok(Ok(r)) => r,
Ok(Err(e)) => {
tracing::error!("{e}");
$fail
}
Err(e) => {
tracing::error!("{e:?}");
$fail
}
}
};
($cart: expr, $msg: expr, $fail: expr) => {
$crate::query_cart!($cart, $msg, $fail, $fail)
};
($cart: expr, $msg: expr, $db_fail: expr, $act_fail: expr) => {
match $cart.send($msg).await {
Ok(Ok(r)) => r,
Ok(Err(e)) => {
tracing::error!("{e}");
return Err($db_fail);
}
Err(e) => {
tracing::error!("{e:?}");
return Err($act_fail);
}
}
};
}
pub struct AccountManager {
db: Addr<database_manager::Database>,
config: SharedAppConfig,
}
impl AccountManager {
pub fn new(config: SharedAppConfig, db: Addr<database_manager::Database>) -> Self {
Self { config, db }
}
}
impl actix::Actor for AccountManager {
type Context = actix::Context<Self>;
}
pub struct MeResult {
pub account: FullAccount,
pub addresses: Vec<model::AccountAddress>,
}
#[derive(actix::Message, Debug)]
#[rtype(result = "Result<MeResult>")]
pub struct Me {
pub account_id: model::AccountId,
}
account_async_handler!(Me, me, MeResult);
pub(crate) async fn me(
msg: Me,
db: Addr<database_manager::Database>,
_config: SharedAppConfig,
) -> Result<MeResult> {
let account: FullAccount = query_db!(
db,
database_manager::FindAccount {
account_id: msg.account_id
},
Error::Account
);
let addresses = query_db!(
db,
database_manager::AccountAddresses {
account_id: msg.account_id
},
Error::Addresses
);
Ok(MeResult { account, addresses })
}
#[derive(actix::Message)]
#[rtype(result = "Result<FullAccount>")]
pub struct CreateAccount {
pub email: Email,
pub login: Login,
pub password: Password,
pub role: Role,
}
account_async_handler!(CreateAccount, create_account, FullAccount);
pub(crate) async fn create_account(
msg: CreateAccount,
db: Addr<database_manager::Database>,
config: SharedAppConfig,
) -> Result<FullAccount> {
let hash = {
match msg.password.encrypt(&config.lock().web().pass_salt()) {
Ok(hash) => hash,
Err(e) => {
tracing::error!("{e:?}");
return Err(Error::Hashing);
}
}
};
let account: FullAccount = query_db!(
db,
database_manager::CreateAccount {
email: msg.email,
login: msg.login,
pass_hash: model::PassHash::new(hash),
role: msg.role,
},
Error::DbCritical,
Error::Saving
);
Ok(account)
}

View File

@ -1,12 +1,16 @@
#![feature(structural_match)]
use std::time::Duration;
use std::env;
use bus::account::{AccountFailure, CreateAccount, Topic};
use config::{SharedAppConfig, UpdateConfig};
use config::UpdateConfig;
use database_manager::Database;
use model::{Encrypt, FullAccount};
use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
pub mod actions;
pub mod mqtt;
pub mod rpc;
pub type Result<T> = std::result::Result<T, Error>;
@ -33,7 +37,7 @@ impl UpdateConfig for Opts {}
#[actix::main]
async fn main() {
dotenv::dotenv().ok();
tracing_subscriber::fmt::init();
init_tracing("account-manager");
let opts = Opts {};
@ -41,213 +45,26 @@ async fn main() {
let db = Database::build(config.clone()).await;
mqtt::start(config, &db).await;
let mqtt_client = mqtt::start(config.clone(), db.clone()).await;
rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await;
}
mod grpc {
use config::SharedAppConfig;
use database_manager::Database;
use futures::future::{self, Ready};
use futures::prelude::*;
use futures::stream::StreamExt;
use json::JsonValue;
use tarpc::server::incoming::Incoming;
use tarpc::server::{self, Channel};
use tarpc::{client, context};
pub fn init_tracing(_service_name: &str) {
env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");
#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)]
#[serde(rename_all = "kebab-case", tag = "account")]
pub enum Error {
#[error("Unable to send or receive msg from database")]
DbCritical,
#[error("Failed to load account data")]
Account,
#[error("Failed to load account addresses")]
Addresses,
#[error("Unable to save record")]
Saving,
#[error("Unable to hash password")]
Hashing,
#[error("{0}")]
Db(#[from] database_manager::Error),
}
let tracer = {
use opentelemetry::sdk::export::trace::stdout::new_pipeline;
use opentelemetry::sdk::trace::Config;
new_pipeline()
.with_trace_config(Config::default())
.with_pretty_print(true)
.install_simple()
};
pub type Result<T> = std::result::Result<T, Error>;
pub struct MeResult {
pub account: model::FullAccount,
pub addresses: Vec<model::AccountAddress>,
}
#[tarpc::service]
trait Accounts {
/// Returns a greeting for name.
async fn me(account_id: model::AccountId) -> String;
}
#[derive(Clone)]
struct AccountsServer {
db: Database,
}
impl Accounts for AccountsServer {
// Each defined rpc generates two items in the trait, a fn that serves the RPC,
// and an associated type representing the future output by the fn.
type AccountsFut = Ready<String>;
fn me(self, _: context::Context, account_id: model::AccountId) -> Self::AccountsFut {
future::ready(format!("Hello, {name}!"))
}
}
async fn me(
account_id: model::AccountId,
db: Database,
_config: SharedAppConfig,
) -> Result<MeResult> {
let account: model::FullAccount = query_db!(
db,
database_manager::FindAccount {
account_id: msg.account_id
},
Error::Account
);
let addresses = query_db!(
db,
database_manager::AccountAddresses {
account_id: msg.account_id
},
Error::Addresses
);
Ok(MeResult { account, addresses })
}
async fn start(config: SharedAppConfig) {
let port = { config.lock().account_manager().port };
}
}
mod mqtt {
use std::time::Duration;
use account_manager::CreateAccount;
use bus::account::{AccountFailure, Topic};
use config::SharedAppConfig;
use database_manager::Database;
use model::{Encrypt, FullAccount};
use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS};
use crate::{Error, Result};
pub async fn start(config: SharedAppConfig, db: &Database) {
let mut mqtt_options = MqttOptions::new(bus::account::CLIENT_NAME, "0.0.0.0", 1883);
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (client, mut event_loop) = AsyncClient::new(mqtt_options, 10);
client
.subscribe(Topic::CreateAccount, QoS::AtLeastOnce)
.await
.unwrap();
let client = bus::AsyncClient(client);
loop {
let notification = event_loop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() {
topic if Topic::CreateAccount == topic => {
if let Ok(msg) = CreateAccount::try_from(publish.payload) {
match create_account(msg, &db, config.clone()).await {
Ok(account) => {
client
.publish_or_log(
Topic::AccountCreated,
QoS::AtLeastOnce,
true,
model::Account::from(account),
)
.await;
}
Err(e) => {
tracing::error!("{}", e);
let m = match e {
Error::Hashing => {
Some(AccountFailure::FailedToHashPassword)
}
Error::Saving => Some(AccountFailure::SaveAccount),
Error::DbCritical => {
Some(AccountFailure::InternalServerError)
}
_ => None,
};
if let Some(m) = m {
client
.publish_or_log(
Topic::SignUpFailure,
QoS::AtLeastOnce,
true,
m,
)
.await;
}
}
}
}
}
_ => {}
},
Ok(Event::Incoming(_incoming)) => {}
Ok(Event::Outgoing(_outgoing)) => {}
Err(e) => {
tracing::error!("{}", e);
}
}
}
}
pub(crate) async fn create_account(
msg: CreateAccount,
db: &database_manager::Database,
config: SharedAppConfig,
) -> Result<FullAccount> {
let hash = {
match msg.password.encrypt(&config.lock().web().pass_salt()) {
Ok(hash) => hash,
Err(e) => {
tracing::error!("{e:?}");
return Err(Error::Hashing);
}
}
};
let mut t = db.pool().begin().await.map_err(|e| {
tracing::error!("{}", e);
Error::DbCritical
})?;
let account: FullAccount = match database_manager::create_account(
database_manager::CreateAccount {
email: msg.email,
login: msg.login,
pass_hash: model::PassHash::new(hash),
role: msg.role,
},
&mut t,
)
.await
{
Ok(r) => r,
Err(e) => {
tracing::error!("{}", e);
t.rollback().await.ok();
return Err(Error::Saving);
}
};
t.commit().await.map_err(|e| {
tracing::error!("{}", e);
Error::DbCritical
})?;
Ok(account)
}
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE))
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()
.unwrap();
}

View File

@ -0,0 +1,99 @@
use std::time::Duration;
use channels::account::{AccountFailure, CreateAccount, Topic};
use config::SharedAppConfig;
use database_manager::Database;
use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS};
use crate::{actions, Error};
pub async fn start(config: SharedAppConfig, db: Database) -> channels::AsyncClient {
tracing::info!("Starting account mqtt at 0.0.0.0:1883");
let mut mqtt_options = MqttOptions::new(channels::account::CLIENT_NAME, "0.0.0.0", 1883);
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (client, mut event_loop) = AsyncClient::new(mqtt_options, 10);
client
.subscribe(Topic::CreateAccount, QoS::AtLeastOnce)
.await
.unwrap();
let client = channels::AsyncClient(client);
let spawn_client = client.clone();
tokio::spawn(async move {
let client = spawn_client.clone();
loop {
let notification = event_loop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() {
topic if Topic::CreateAccount == topic => {
if let Ok(channels::account::CreateAccount {
email,
login,
password,
role,
}) = channels::account::CreateAccount::try_from(publish.payload)
{
match actions::create_account(
CreateAccount {
email,
login,
password,
role,
},
&db,
config.clone(),
)
.await
{
Ok(account) => {
client
.publish_or_log(
Topic::AccountCreated,
QoS::AtLeastOnce,
true,
model::Account::from(account),
)
.await;
}
Err(e) => {
tracing::error!("{}", e);
let m = match e {
Error::Hashing => {
Some(AccountFailure::FailedToHashPassword)
}
Error::Saving => Some(AccountFailure::SaveAccount),
Error::DbCritical => {
Some(AccountFailure::InternalServerError)
}
_ => None,
};
if let Some(m) = m {
client
.publish_or_log(
Topic::SignUpFailure,
QoS::AtLeastOnce,
true,
m,
)
.await;
}
}
}
}
}
_ => {}
},
Ok(Event::Incoming(_incoming)) => {}
Ok(Event::Outgoing(_outgoing)) => {}
Err(e) => {
tracing::error!("{}", e);
}
}
}
});
client
// tracing::info!("Mqtt channel closed");
}

View File

@ -0,0 +1,111 @@
use std::net::{IpAddr, Ipv4Addr};
use channels::account::{CreateAccount, MeResult, RegisterResult};
use channels::AsyncClient;
use config::SharedAppConfig;
use database_manager::Database;
use futures::future::{self};
use futures::stream::StreamExt;
use rumqttc::QoS;
use tarpc::context;
use tarpc::server::incoming::Incoming;
use tarpc::server::{self, Channel};
use tarpc::tokio_serde::formats::Json;
use crate::actions;
#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)]
#[serde(rename_all = "kebab-case", tag = "account")]
pub enum Error {
#[error("Unable to send or receive msg from database")]
DbCritical,
#[error("Failed to load account data")]
Account,
#[error("Failed to load account addresses")]
Addresses,
#[error("Unable to save record")]
Saving,
#[error("Unable to hash password")]
Hashing,
#[error("{0}")]
Db(#[from] database_manager::Error),
}
#[derive(Clone)]
struct AccountsServer {
db: Database,
config: SharedAppConfig,
mqtt_client: AsyncClient,
}
#[tarpc::server]
impl channels::account::rpc::Accounts for AccountsServer {
async fn me(self, _: context::Context, account_id: model::AccountId) -> MeResult {
let res = actions::me(account_id, self.db).await;
tracing::info!("ME result: {:?}", res);
res
}
async fn register_account(self, _: context::Context, details: CreateAccount) -> RegisterResult {
let res = actions::create_account(details, &self.db, self.config).await;
tracing::info!("REGISTER result: {:?}", res);
match res {
Ok(account) => {
self.mqtt_client
.publish_or_log(
channels::account::Topic::AccountCreated,
QoS::AtLeastOnce,
true,
&account,
)
.await;
RegisterResult {
account: Some(account),
error: None,
}
}
Err(_e) => RegisterResult {
account: None,
error: Some(channels::account::Error::Account),
},
}
}
}
pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) {
use channels::account::rpc::Accounts;
let port = { config.lock().account_manager().port };
let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default)
.await
.unwrap();
tracing::info!("Starting account rpc at {}", listener.local_addr());
listener.config_mut().max_frame_length(usize::MAX);
listener
// Ignore accept errors.
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
// Limit channels to 8 per IP.
.max_channels_per_key(8, |t| t.transport().peer_addr().unwrap().ip())
.max_concurrent_requests_per_channel(20)
// serve is generated by the service attribute. It takes as input any type implementing
// the generated World trait.
.map(|channel| {
channel.execute(
AccountsServer {
db: db.clone(),
config: config.clone(),
mqtt_client: mqtt_client.clone(),
}
.serve(),
)
})
// Max 10 channels.
.buffer_unordered(10)
.for_each(|_| async {})
.await;
tracing::info!("RPC channel closed");
}

View File

@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
bus = { path = "../../shared/bus" }
channels = { path = "../../shared/channels" }
chrono = { version = "0.4", features = ["serde"] }
config = { path = "../../shared/config" }
database_manager = { path = "../database_manager" }

View File

@ -43,22 +43,24 @@ macro_rules! db_async_handler {
}
};
($msg: ty, $async: ident, $res: ty, $inner_async: ident) => {
async fn $inner_async(msg: $msg, pool: sqlx::PgPool) -> Result<$res> {
let mut t = pool.begin().await.map_err(|e| {
tracing::error!("{:?}", e);
$crate::Error::TransactionFailed
})?;
match $async(msg, &mut t).await {
Ok(res) => {
t.commit().await.map_err(|e| {
tracing::error!("{:?}", e);
$crate::Error::TransactionFailed
})?;
Ok(res)
}
Err(e) => {
let _ = t.rollback().await;
Err(e)
impl $msg {
pub async fn $inner_async(self, pool: sqlx::PgPool) -> Result<$res> {
let mut t = pool.begin().await.map_err(|e| {
tracing::error!("{:?}", e);
$crate::Error::TransactionFailed
})?;
match $async(self, &mut t).await {
Ok(res) => {
t.commit().await.map_err(|e| {
tracing::error!("{:?}", e);
$crate::Error::TransactionFailed
})?;
Ok(res)
}
Err(e) => {
let _ = t.rollback().await;
Err(e)
}
}
}
}
@ -69,7 +71,7 @@ macro_rules! db_async_handler {
fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result {
use actix::WrapFuture;
let pool = self.pool.clone();
Box::pin(async { $inner_async(msg, pool).await }.into_actor(self))
Box::pin(async { msg.$inner_async(pool).await }.into_actor(self))
}
}
@ -170,7 +172,7 @@ pub type Result<T> = std::result::Result<T, Error>;
#[derive(Clone)]
pub struct Database {
pool: PgPool,
config: SharedAppConfig,
_config: SharedAppConfig,
}
pub type SharedDatabase = actix::Addr<Database>;
@ -182,7 +184,10 @@ impl Database {
tracing::error!("Failed to connect to database. {e:?}");
std::process::exit(1);
});
Self { pool, config }
Self {
pool,
_config: config,
}
}
pub fn pool(&self) -> &PgPool {

View File

@ -4,7 +4,6 @@ version = "0.1.0"
edition = "2021"
[dependencies]
account_manager = { path = "../actors/account_manager" }
actix = { version = "0.13", features = [] }
actix-broker = { version = "0.4", features = [] }
actix-cors = { version = "0.6", features = [] }
@ -20,6 +19,7 @@ actix-web-opentelemetry = { version = "0.12", features = [] }
async-trait = { version = "0.1", features = [] }
bytes = { version = "1.1.0" }
cart_manager = { path = "../actors/cart_manager" }
channels = { path = "../shared/channels" }
chrono = { version = "0.4", features = ["serde"] }
config = { path = "../shared/config" }
database_manager = { path = "../actors/database_manager" }
@ -54,3 +54,4 @@ tracing = { version = "0.1.34" }
tracing-subscriber = { version = "0.3.11" }
uuid = { version = "1.2.1", features = ["serde"] }
validator = { version = "0.14", features = [] }
tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }

View File

@ -59,8 +59,7 @@ async fn server(opts: ServerOpts) -> Result<()> {
.await
.expect("Failed to initialize file system storage");
let cart_manager = cart_manager::CartManager::new(db.clone()).start();
let account_manager =
account_manager::AccountManager::new(app_config.clone(), db.clone()).start();
let account_manager = channels::account::rpc::create_client(app_config.clone()).await;
let addr = {
let l = app_config.lock();
let w = l.web();

View File

@ -40,7 +40,6 @@ pub enum Error {
CriticalFailure,
Public(public::Error),
Admin(admin::Error),
Account(account_manager::Error),
Cart(cart_manager::Error),
Database(database_manager::Error),
Email(email_manager::Error),
@ -79,7 +78,6 @@ impl Display for Error {
})
.unwrap_or_default(),
Error::CriticalFailure => String::from("Something went wrong"),
Error::Account(_e) => serde_json::to_string(&self).unwrap_or_default(),
Error::Cart(_e) => serde_json::to_string(&self).unwrap_or_default(),
Error::Database(_e) => serde_json::to_string(&self).unwrap_or_default(),
Error::Email(_e) => serde_json::to_string(&self).unwrap_or_default(),
@ -102,7 +100,6 @@ impl ResponseError for Error {
}
Error::Admin(_) => StatusCode::BAD_REQUEST,
Error::Public(_) => StatusCode::BAD_REQUEST,
Error::Account(_) => StatusCode::BAD_REQUEST,
Error::Cart(_) => StatusCode::BAD_REQUEST,
Error::Database(_) => StatusCode::BAD_REQUEST,
Error::Email(_) => StatusCode::BAD_REQUEST,

View File

@ -1,4 +1,3 @@
use account_manager::query_account;
use actix::Addr;
use actix_web::web::{scope, Data, Json, ServiceConfig};
use actix_web::{delete, get, post, put, HttpRequest, HttpResponse};
@ -210,7 +209,7 @@ async fn delete_cart_item(
#[get("/me")]
pub(crate) async fn me(
account: Data<Addr<account_manager::AccountManager>>,
account: Data<channels::account::rpc::AccountsClient>,
tm: Data<Addr<TokenManager>>,
credentials: BearerAuth,
) -> routes::Result<Json<model::api::Account>> {
@ -218,14 +217,14 @@ pub(crate) async fn me(
.require_user(tm.into_inner())
.await?
.account_id();
let account_manager::MeResult { account, addresses } = query_account!(
account,
account_manager::Me { account_id },
PublicError::DatabaseConnection.into(),
PublicError::DatabaseConnection.into()
);
Ok(Json((account, addresses).into()))
match account.me(tarpc::context::current(), account_id).await {
Ok(me) => Ok(Json((me.account.unwrap(), me.addresses.unwrap()).into())),
Err(e) => {
tracing::error!("{}", e);
Err(routes::Error::CriticalFailure)
}
}
}
#[post("/order")]

View File

@ -1,13 +1,15 @@
[package]
name = "bus"
name = "channels"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "*", features = ['derive'] }
bincode = { version = "*" }
model = { path = "../model" }
bytes = { version = "1.2.1" }
config = { path = "../config" }
model = { path = "../model" }
rumqttc = { version = "0.17.0" }
serde = { version = "*", features = ['derive'] }
tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.37" }
tracing = { version = "0.1.37" }
rumqttc = { version = "0.17.0" }

View File

@ -1,5 +1,6 @@
#![feature(structural_match)]
#[derive(Clone)]
pub struct AsyncClient(pub rumqttc::AsyncClient);
impl AsyncClient {
@ -31,12 +32,16 @@ impl AsyncClient {
pub mod account {
use model::{Email, Login, Password, Role};
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)]
pub enum Error {
#[error("mqtt payload has invalid create account data")]
InvalidCreateAccount,
#[error("mqtt payload has invalid account failure data")]
InvalidAccountFailure,
#[error("Account does not exists")]
Account,
#[error("Account does have any addresses")]
Addresses,
}
pub static CLIENT_NAME: &str = "account-manager";
@ -115,4 +120,52 @@ pub mod account {
})
}
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct MeResult {
pub account: Option<model::FullAccount>,
pub addresses: Option<Vec<model::AccountAddress>>,
pub error: Option<Error>,
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct RegisterResult {
pub account: Option<model::FullAccount>,
pub error: Option<Error>,
}
pub mod rpc {
use config::SharedAppConfig;
#[tarpc::service]
pub trait Accounts {
/// Returns a greeting for name.
async fn me(account_id: model::AccountId) -> crate::account::MeResult;
/// Creates new user account.
async fn register_account(
details: crate::account::CreateAccount,
) -> crate::account::RegisterResult;
}
pub async fn create_client(config: SharedAppConfig) -> AccountsClient {
use tarpc::client;
use tarpc::tokio_serde::formats::Json;
let addr = {
let l = config.lock();
(l.account_manager().bind.clone(), l.account_manager().port)
};
let transport = tarpc::serde_transport::tcp::connect(addr, Json::default);
let client = AccountsClient::new(
client::Config::default(),
transport.await.expect("Failed to connect to server"),
)
.spawn();
client
}
}
}

View File

@ -4,18 +4,18 @@ version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = [] }
toml = { version = "0.5", features = [] }
actix-web = { version = "4.0", features = [] }
parking_lot = { version = "0.12", features = [] }
password-hash = { version = "0.4", features = ["alloc"] }
pay_u = { version = '0.1', features = ["single-client"] }
actix-web = { version = "4.0", features = [] }
tracing = { version = "0.1.34" }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = [] }
thiserror = { version = "1.0" }
toml = { version = "0.5", features = [] }
tracing = { version = "0.1.34" }

View File

@ -425,17 +425,24 @@ impl FilesConfig {
#[derive(Debug, Serialize, Deserialize)]
pub struct AccountManagerConfig {
pub port: u16,
pub bind: String,
}
impl Default for AccountManagerConfig {
fn default() -> Self {
Self { port: 19329 }
Self {
port: 19329,
bind: "0.0.0.0".into(),
}
}
}
impl Example for AccountManagerConfig {
fn example() -> Self {
Self { port: 19329 }
Self {
port: 19329,
bind: "0.0.0.0".into(),
}
}
}