Add mqtt driven create account

This commit is contained in:
eraden 2022-11-01 22:09:58 +01:00
parent 2778ce78b9
commit 1fe3a1cae1
19 changed files with 524 additions and 184 deletions

236
Cargo.lock generated
View File

@ -8,15 +8,21 @@ version = "0.1.0"
dependencies = [
"actix 0.13.0",
"actix-rt",
"bincode",
"bus",
"bytes",
"config",
"database_manager",
"dotenv",
"fibers_rpc",
"model",
"pretty_env_logger",
"rumqttc",
"serde",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
@ -674,13 +680,13 @@ dependencies = [
"human-panic",
"include_dir",
"jemallocator",
"messagebus",
"model",
"oauth2",
"order_manager",
"parking_lot 0.12.1",
"payment_manager",
"pretty_env_logger",
"rumqttc",
"search_manager",
"serde",
"serde_json",
@ -708,6 +714,15 @@ dependencies = [
"num-traits",
]
[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
"serde",
]
[[package]]
name = "bit-vec"
version = "0.6.3"
@ -778,8 +793,13 @@ checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
name = "bus"
version = "0.1.0"
dependencies = [
"lifeline",
"tokio",
"bincode",
"bytes",
"model",
"rumqttc",
"serde",
"thiserror",
"tracing",
]
[[package]]
@ -825,6 +845,7 @@ dependencies = [
"database_manager",
"model",
"pretty_env_logger",
"rumqttc",
"serde",
"thiserror",
"tracing",
@ -1072,16 +1093,6 @@ dependencies = [
"typenum",
]
[[package]]
name = "ctor"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "ctr"
version = "0.9.2"
@ -1170,16 +1181,6 @@ dependencies = [
"syn",
]
[[package]]
name = "dashmap"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"cfg-if 1.0.0",
"num_cpus",
]
[[package]]
name = "data-encoding"
version = "2.3.2"
@ -1200,6 +1201,7 @@ dependencies = [
"model",
"pretty_env_logger",
"rand",
"rumqttc",
"serde",
"sqlx",
"sqlx-core",
@ -1341,6 +1343,7 @@ dependencies = [
"config",
"model",
"pretty_env_logger",
"rumqttc",
"sendgrid",
"serde",
"serde_json",
@ -1378,15 +1381,6 @@ dependencies = [
"termcolor",
]
[[package]]
name = "erased-serde"
version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54558e0ba96fbe24280072642eceb9d7d442e32c7ec0ea9e7ecd7b4ea2cf4e11"
dependencies = [
"serde",
]
[[package]]
name = "event-listener"
version = "2.5.3"
@ -1518,6 +1512,19 @@ dependencies = [
"thiserror",
]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.4",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -1567,6 +1574,7 @@ dependencies = [
"fibers_rpc",
"model",
"pretty_env_logger",
"rumqttc",
"serde",
"thiserror",
"tokio",
@ -2285,6 +2293,7 @@ dependencies = [
"fluent",
"model",
"pretty_env_logger",
"rumqttc",
"thiserror",
"tracing",
"unic-langid",
@ -2332,23 +2341,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "lifeline"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b49b836e2676e5f6d926c09dc1a13b52c3315b459554df4514d76756bfa72a0e"
dependencies = [
"anyhow",
"async-trait",
"futures-util",
"log",
"pin-project 0.4.30",
"postage",
"regex",
"thiserror",
"tokio",
]
[[package]]
name = "link-cplusplus"
version = "1.0.7"
@ -2435,39 +2427,6 @@ dependencies = [
"autocfg",
]
[[package]]
name = "messagebus"
version = "0.9.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81124c365f95dc69d0a6e572220dc57ca2d0148fd130bfb2b54a6aa1506102d5"
dependencies = [
"async-trait",
"ctor",
"dashmap",
"erased-serde",
"futures 0.3.25",
"log",
"messagebus_derive",
"parking_lot 0.11.2",
"serde",
"serde_derive",
"sharded-slab",
"smallvec",
"thiserror",
"tokio",
]
[[package]]
name = "messagebus_derive"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f364a3b76588b4997b63e99e1bf73e2cf37e2546e97fbae4cb2a268a74e57e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "mime"
version = "0.3.16"
@ -2562,6 +2521,15 @@ dependencies = [
"validator 0.15.0",
]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
[[package]]
name = "native-tls"
version = "0.2.10"
@ -2796,7 +2764,7 @@ dependencies = [
"js-sys",
"lazy_static",
"percent-encoding",
"pin-project 1.0.12",
"pin-project",
"rand",
"thiserror",
"tokio",
@ -2823,6 +2791,7 @@ dependencies = [
"database_manager",
"model",
"pretty_env_logger",
"rumqttc",
"serde",
"thiserror",
"tracing",
@ -2947,6 +2916,7 @@ dependencies = [
"parking_lot 0.12.1",
"pay_u",
"pretty_env_logger",
"rumqttc",
"serde",
"testx",
"thiserror",
@ -3043,33 +3013,13 @@ dependencies = [
"uncased",
]
[[package]]
name = "pin-project"
version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ef0f924a5ee7ea9cbcea77529dba45f8a9ba9f622419fe3386ca581a3ae9d5a"
dependencies = [
"pin-project-internal 0.4.30",
]
[[package]]
name = "pin-project"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
dependencies = [
"pin-project-internal 1.0.12",
]
[[package]]
name = "pin-project-internal"
version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "851c8d0ce9bebe43790dedfc86614c23494ac9f423dd618d3a61fc693eafe61e"
dependencies = [
"proc-macro2",
"quote",
"syn",
"pin-project-internal",
]
[[package]]
@ -3119,21 +3069,6 @@ dependencies = [
"universal-hash",
]
[[package]]
name = "postage"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a63d25391d04a097954b76aba742b6b5b74f213dfe3dbaeeb36e8ddc1c657f0b"
dependencies = [
"atomic",
"crossbeam-queue",
"log",
"pin-project 1.0.12",
"pollster",
"static_assertions",
"thiserror",
]
[[package]]
name = "ppv-lite86"
version = "0.2.16"
@ -3341,7 +3276,7 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls-pemfile",
"rustls-pemfile 1.0.1",
"serde",
"serde_json",
"serde_urlencoded",
@ -3366,12 +3301,31 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi 0.3.9",
]
[[package]]
name = "rumqttc"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "499b7ab08ffa5a722958b6ce1b7c0270bea30909f589d12c5ec3a051afe423fc"
dependencies = [
"bytes",
"flume",
"futures 0.3.25",
"http",
"log",
"pollster",
"rustls-native-certs",
"rustls-pemfile 0.3.0",
"thiserror",
"tokio",
"tokio-rustls",
]
[[package]]
name = "rust_decimal"
version = "1.26.1"
@ -3435,6 +3389,27 @@ dependencies = [
"webpki",
]
[[package]]
name = "rustls-native-certs"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
dependencies = [
"openssl-probe",
"rustls-pemfile 1.0.1",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pemfile"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360"
dependencies = [
"base64",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.1"
@ -3513,6 +3488,7 @@ dependencies = [
"model",
"parking_lot 0.12.1",
"pretty_env_logger",
"rumqttc",
"serde",
"sonic-channel",
"thiserror",
@ -3782,6 +3758,15 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09"
dependencies = [
"lock_api",
]
[[package]]
name = "splay_tree"
version = "0.2.10"
@ -3855,7 +3840,7 @@ dependencies = [
"rand",
"rust_decimal",
"rustls",
"rustls-pemfile",
"rustls-pemfile 1.0.1",
"serde",
"serde_json",
"sha1",
@ -3904,12 +3889,6 @@ dependencies = [
"tokio-rustls",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stringprep"
version = "0.1.2"
@ -4125,6 +4104,7 @@ dependencies = [
"password-hash",
"pretty_env_logger",
"rand_core",
"rumqttc",
"serde",
"sha2",
"testx",

View File

@ -3,15 +3,25 @@ name = "account_manager"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "account-manager"
path = "./src/main.rs"
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
bincode = { version = "1.3.3" }
bus = { path = "../../shared/bus" }
bytes = { version = "1.2.1" }
config = { path = "../../shared/config" }
database_manager = { path = "../database_manager" }
dotenv = { version = "0.15.0" }
fibers_rpc = { version = "0.3.4", features = [] }
model = { path = "../../shared/model" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.6" }
tracing-subscriber = { version = "0.3.16" }

View File

@ -0,0 +1,145 @@
#![feature(structural_match)]
use std::time::Duration;
use bus::account::{AccountFailure, CreateAccount, Topic};
use config::{SharedAppConfig, UpdateConfig};
use model::{Encrypt, FullAccount};
use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS};
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)]
pub enum Error {
#[error("Unable to send or receive msg from database")]
DbCritical,
#[error("Failed to load account data")]
Account,
#[error("Failed to load account addresses")]
Addresses,
#[error("Unable to save record")]
Saving,
#[error("Unable to hash password")]
Hashing,
#[error("{0}")]
Db(#[from] database_manager::Error),
}
pub struct Opts {}
impl UpdateConfig for Opts {}
#[actix::main]
async fn main() {
dotenv::dotenv().ok();
tracing_subscriber::fmt::init();
let opts = Opts {};
let config = config::default_load(&opts);
let db = database_manager::Database::build(config.clone()).await;
let mut mqttoptions = MqttOptions::new(bus::account::CLIENT_NAME, "0.0.0.0", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (client, mut event_loop) = AsyncClient::new(mqttoptions, 10);
client
.subscribe(Topic::CreateAccount, QoS::AtLeastOnce)
.await
.unwrap();
let client = bus::AsyncClient(client);
loop {
let notification = event_loop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() {
topic if Topic::CreateAccount == topic => {
if let Ok(msg) = CreateAccount::try_from(publish.payload) {
match create_account(msg, &db, config.clone()).await {
Ok(account) => {
client
.publish_or_log(
Topic::AccountCreated,
QoS::AtLeastOnce,
true,
model::Account::from(account),
)
.await;
}
Err(e) => {
tracing::error!("{}", e);
let m = match e {
Error::Hashing => Some(AccountFailure::FailedToHashPassword),
Error::Saving => Some(AccountFailure::SaveAccount),
Error::DbCritical => Some(AccountFailure::InternalServerError),
_ => None,
};
if let Some(m) = m {
client.publish_or_log(
Topic::SignUpFailure,
QoS::AtLeastOnce,
true,
m,
);
}
}
}
}
}
_ => {}
},
Ok(Event::Incoming(_incoming)) => {}
Ok(Event::Outgoing(_outgoing)) => {}
Err(e) => {
tracing::error!("{}", e);
}
}
}
}
pub(crate) async fn create_account(
msg: CreateAccount,
db: &database_manager::Database,
config: SharedAppConfig,
) -> Result<FullAccount> {
let hash = {
match msg.password.encrypt(&config.lock().web().pass_salt()) {
Ok(hash) => hash,
Err(e) => {
tracing::error!("{e:?}");
return Err(Error::Hashing);
}
}
};
let mut t = db.pool().begin().await.map_err(|e| {
tracing::error!("{}", e);
Error::DbCritical
})?;
let account: FullAccount = match database_manager::create_account(
database_manager::CreateAccount {
email: msg.email,
login: msg.login,
pass_hash: model::PassHash::new(hash),
role: msg.role,
},
&mut t,
)
.await
{
Ok(r) => r,
Err(e) => {
tracing::error!("{}", e);
t.rollback().await.ok();
return Err(Error::Saving);
}
};
t.commit().await.map_err(|e| {
tracing::error!("{}", e);
Error::DbCritical
})?;
Ok(account)
}

View File

@ -12,6 +12,7 @@ config = { path = "../../shared/config" }
database_manager = { path = "../database_manager" }
model = { path = "../../shared/model" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] }
thiserror = { version = "1.0.31" }
tracing = { version = "0.1.34" }

View File

@ -17,6 +17,7 @@ itertools = { version = "0.10.3" }
model = { path = "../../shared/model" }
pretty_env_logger = { version = "0.4", features = [] }
rand = { version = "0.8.5", optional = true }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }

View File

@ -65,7 +65,7 @@ db_async_handler!(
inner_create_account
);
pub(crate) async fn create_account(
pub async fn create_account(
msg: CreateAccount,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<FullAccount> {

View File

@ -10,6 +10,7 @@ chrono = { version = "0.4", features = ["serde"] }
config = { path = "../../shared/config" }
model = { path = "../../shared/model" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
sendgrid = { version = "0.17", features = ["async"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = [] }

View File

@ -13,8 +13,10 @@ config = { path = "../../shared/config" }
fibers_rpc = { version = "0.3.4", features = [] }
model = { path = "../../shared/model" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.18.1", features = ["full"] }
tracing = { version = "0.1.34" }
uuid = { version = "1.2.1", features = ["serde"] }

View File

@ -10,6 +10,7 @@ config = { path = "../../shared/config" }
fluent = { version = "0.16.0" }
model = { path = "../../shared/model" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
thiserror = { version = "1.0.31" }
tracing = { version = "0.1.34" }
unic-langid = { version = "0.9.0" }

View File

@ -11,6 +11,7 @@ config = { path = "../../shared/config" }
database_manager = { path = "../database_manager" }
model = { path = "../../shared/model" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] }
thiserror = { version = "1.0.31" }
tracing = { version = "0.1.34" }

View File

@ -14,6 +14,7 @@ model = { path = "../../shared/model" }
parking_lot = { version = "0.12", features = [] }
pay_u = { version = '0.1', features = ["single-client"] }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
thiserror = { version = "1.0.31" }
tracing = { version = "0.1.34" }

View File

@ -12,6 +12,7 @@ derive_more = { version = "0.99", features = [] }
model = { path = "../../shared/model" }
parking_lot = { version = "0.12", features = [] }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
sonic-channel = { version = "1.1.0", features = ["ingest"] }
thiserror = { version = "1.0.31" }

View File

@ -20,6 +20,7 @@ parking_lot = { version = "0.12", features = [] }
password-hash = { version = "0.4", features = ["alloc"] }
pretty_env_logger = { version = "0.4", features = [] }
rand_core = { version = "0.6", features = ["std"] }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
sha2 = { version = "0.10", features = [] }
thiserror = { version = "1.0.31" }

View File

@ -4,55 +4,53 @@ version = "0.1.0"
edition = "2021"
[dependencies]
model = { path = "../shared/model", version = "0.1", features = ["db"] }
config = { path = "../shared/config" }
database_manager = { path = "../actors/database_manager" }
cart_manager = { path = "../actors/cart_manager" }
email_manager = { path = "../actors/email_manager" }
order_manager = { path = "../actors/order_manager" }
payment_manager = { path = "../actors/payment_manager" }
search_manager = { path = "../actors/search_manager" }
token_manager = { path = "../actors/token_manager" }
fs_manager = { path = "../actors/fs_manager" }
account_manager = { path = "../actors/account_manager" }
human-panic = { version = "1.0.3" }
bytes = { version = "1.1.0" }
actix = { version = "0.13", features = [] }
actix-broker = { version = "0.4", features = [] }
actix-cors = { version = "0.6", features = [] }
actix-files = { version = "0.6", features = [] }
actix-identity = { version = "0.4", features = [] }
actix-multipart = { version = "0.4", features = [] }
actix-redis = { version = "0.11", features = [] }
actix-rt = { version = "2.7", features = [] }
actix-session = { version = "0.6", features = ["actix-redis", "redis-actor-session"] }
actix-web = { version = "4.0", features = [] }
actix-web-httpauth = { version = "0.6", features = [] }
actix-cors = { version = "0.6", features = [] }
actix-broker = { version = "0.4", features = [] }
actix-identity = { version = "0.4", features = [] }
actix-web-opentelemetry = { version = "0.12", features = [] }
actix-session = { version = "0.6", features = ["actix-redis", "redis-actor-session"] }
actix-redis = { version = "0.11", features = [] }
actix-files = { version = "0.6", features = [] }
actix-multipart = { version = "0.4", features = [] }
gumdrop = { version = "0.8", features = [] }
tera = { version = "1.15", features = [] }
uuid = { version = "1.2.1", features = ["serde"] }
async-trait = { version = "0.1", features = [] }
bytes = { version = "1.1.0" }
cart_manager = { path = "../actors/cart_manager" }
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = [] }
toml = { version = "0.5", features = [] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
thiserror = { version = "1.0", features = [] }
validator = { version = "0.14", features = [] }
tracing = { version = "0.1.34" }
tracing-subscriber = { version = "0.3.11" }
pretty_env_logger = { version = "0.4", features = [] }
dotenv = { version = "0.15", features = [] }
config = { path = "../shared/config" }
database_manager = { path = "../actors/database_manager" }
derive_more = { version = "0.99", features = [] }
parking_lot = { version = "0.12", features = [] }
tokio = { version = "1.17", features = ["full"] }
dotenv = { version = "0.15", features = [] }
email_manager = { path = "../actors/email_manager" }
fs_manager = { path = "../actors/fs_manager" }
futures = { version = "0.3", features = [] }
futures-util = { version = "0.3", features = [] }
oauth2 = { version = "4.1", features = [] }
async-trait = { version = "0.1", features = [] }
jemallocator = { version = "0.3", features = [] }
gumdrop = { version = "0.8", features = [] }
human-panic = { version = "1.0.3" }
include_dir = { version = "0.7.2", features = [] }
# For rewrite into bus-based app
messagebus = { version = "0.9.13" }
jemallocator = { version = "0.3", features = [] }
model = { path = "../shared/model", version = "0.1", features = ["db"] }
oauth2 = { version = "4.1", features = [] }
order_manager = { path = "../actors/order_manager" }
parking_lot = { version = "0.12", features = [] }
payment_manager = { path = "../actors/payment_manager" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
search_manager = { path = "../actors/search_manager" }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = [] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
tera = { version = "1.15", features = [] }
thiserror = { version = "1.0", features = [] }
token_manager = { path = "../actors/token_manager" }
tokio = { version = "1.17", features = ["full"] }
toml = { version = "0.5", features = [] }
tracing = { version = "0.1.34" }
tracing-subscriber = { version = "0.3.11" }
uuid = { version = "1.2.1", features = ["serde"] }
validator = { version = "0.14", features = [] }

27
config/rumqqtd.conf Normal file
View File

@ -0,0 +1,27 @@
# Broker id. Used to identify local node of the replication mesh
id = 0
# A commitlog read will pull full segment. Make sure that a segment isn't
# too big as async tcp writes readiness of one connection might affect tail
# latencies of other connection. Not a problem with preempting runtimes
[router]
id = 0
dir = "/tmp/rumqttd"
max_segment_size = 10240
max_segment_count = 10
max_connections = 10001
# Configuration of server and connections that it accepts
[servers.1]
listen = "0.0.0.0:1883"
next_connection_delay_ms = 1
[servers.1.connections]
connection_timeout_ms = 5000
max_client_id_len = 256
throttle_delay_ms = 0
max_payload_size = 5120
max_inflight_count = 200
max_inflight_size = 1024
[console]
listen = "0.0.0.0:3030"

62
config/rumqttd.toml Normal file
View File

@ -0,0 +1,62 @@
id = 0
# A commitlog read will pull full segment. Make sure that a segment isn't
# too big as async tcp writes readiness of one connection might affect tail
# latencies of other connection. Not a problem with preempting runtimes
[router]
id = 0
instant_ack = true
max_segment_size = 104857600
max_segment_count = 10
max_read_len = 10240
max_connections = 10001
dir = "/tmp/rumqttd"
# Configuration of server and connections that it accepts
[v4.1]
name = "v4-1"
listen = "0.0.0.0:1883"
next_connection_delay_ms = 1
[v4.1.connections]
connection_timeout_ms = 60000
max_client_id_len = 256
throttle_delay_ms = 0
max_payload_size = 20480
max_inflight_count = 500
max_inflight_size = 1024
dynamic_filters = true
# Example configuration for a TLS enabled server
# [v4.2]
# name = "v4-2"
# listen = "0.0.0.0:8883"
# next_connection_delay_ms = 10
# # tls config for rustls
# [v4.2.tls]
# certpath = "./localhost.cert.pem"
# keypath = "./localhost.key.pem"
# capath = "./ca.cert.pem"
# # settings for all the connections on this server
# [v4.2.connections]
# connection_timeout_ms = 60000
# throttle_delay_ms = 0
# max_payload_size = 20480
# max_inflight_count = 100
# max_inflight_size = 1024
[v5.1]
name = "v5-1"
listen = "0.0.0.0:1884"
next_connection_delay_ms = 1
[v5.1.connections]
connection_timeout_ms = 60000
max_client_id_len = 256
throttle_delay_ms = 0
max_payload_size = 20480
max_inflight_count = 500
max_inflight_size = 1024
[ws]
[console]
listen = "0.0.0.0:3030"

View File

@ -4,5 +4,10 @@ version = "0.1.0"
edition = "2021"
[dependencies]
lifeline = { version = "0.6.1" }
tokio = { version = "1.18.2", features = ["full"] }
serde = { version = "*", features = ['derive'] }
bincode = { version = "*" }
model = { path = "../model" }
bytes = { version = "1.2.1" }
thiserror = { version = "1.0.37" }
tracing = { version = "0.1.37" }
rumqttc = { version = "0.17.0" }

View File

@ -1,13 +1,118 @@
pub mod messages;
pub use lifeline;
use lifeline::lifeline_bus;
pub use lifeline::Message;
pub use tokio::sync::mpsc;
#![feature(structural_match)]
pub use crate::messages::*;
pub struct AsyncClient(pub rumqttc::AsyncClient);
lifeline_bus!(pub struct MainBus);
impl AsyncClient {
pub async fn publish<Topic: Into<String>, T: serde::Serialize>(
&self,
topic: Topic,
qos: rumqttc::QoS,
retain: bool,
t: T,
) -> Result<(), rumqttc::ClientError> {
let v = bincode::serialize(&t).unwrap_or_default();
let bytes = bytes::Bytes::copy_from_slice(&v);
self.0.publish_bytes(topic, qos, retain, bytes).await
}
// impl Message<MainBus> for MainSend {
// type Channel = mpsc::Sender<Self>;
// }
pub async fn publish_or_log<Topic: Into<String>, T: serde::Serialize>(
&self,
topic: Topic,
qos: rumqttc::QoS,
retain: bool,
t: T,
) {
if let Err(e) = self.publish(topic, qos, retain, t).await {
tracing::error!("{}", e);
}
}
}
pub mod account {
use model::{Email, Login, Password, Role};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("mqtt payload has invalid create account data")]
InvalidCreateAccount,
#[error("mqtt payload has invalid account failure data")]
InvalidAccountFailure,
}
pub static CLIENT_NAME: &str = "account-manager";
#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Topic {
CreateAccount,
AccountCreated,
SignUpFailure,
}
impl Into<String> for Topic {
fn into(self) -> String {
String::from(self.to_str())
}
}
impl<'s> PartialEq<&'s str> for Topic {
fn eq(&self, other: &&'s str) -> bool {
self.to_str() == *other
}
}
impl PartialEq<String> for Topic {
fn eq(&self, other: &String) -> bool {
self.to_str() == other.as_str()
}
}
impl Topic {
pub fn to_str(self) -> &'static str {
match self {
Topic::CreateAccount => "account/create",
Topic::AccountCreated => "account/created",
Topic::SignUpFailure => "account/failure",
}
}
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct CreateAccount {
pub email: Email,
pub login: Login,
pub password: Password,
pub role: Role,
}
impl TryFrom<bytes::Bytes> for CreateAccount {
type Error = Error;
fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> {
bincode::deserialize(value.as_ref()).map_err(|e| {
tracing::error!("{}", e);
Error::InvalidCreateAccount
})
}
}
#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)]
pub enum AccountFailure {
#[error("Failed to hash password")]
FailedToHashPassword,
#[error("Failed to save account")]
SaveAccount,
#[error("Internal server error")]
InternalServerError,
}
impl TryFrom<bytes::Bytes> for AccountFailure {
type Error = Error;
fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> {
bincode::deserialize(value.as_ref()).map_err(|e| {
tracing::error!("{}", e);
Error::InvalidAccountFailure
})
}
}
}

View File

@ -1,2 +0,0 @@
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Main {}