diff --git a/crates/idp/Cargo.toml b/crates/idp/Cargo.toml new file mode 100644 index 0000000..1822532 --- /dev/null +++ b/crates/idp/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "idp" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "idp" +path = "src/main.rs" + +[dependencies] +bincode = { version = "1" } +bytes = { version = "1" } +channels = { path = "../channels" } +config = { path = "../config" } +dotenv = { version = "0" } +futures = { version = "0" } +gumdrop = { version = "0" } +json = { version = "0" } +model = { path = "../model", features = ['db'] } +rauthy-client = { version = "0.4.0", features = ["actix-web", "qrcode"] } +rumqttc = { version = "*" } +serde = { version = "1", features = ["derive"] } +sqlx = { version = "0", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] } +sqlx-core = { version = "0", features = [] } +tarpc = { version = "0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } +thiserror = { version = "1" } +tokio = { version = "1", features = ['full'] } +tracing = { version = "0" } +uuid = { workspace = true, features = ["v4"] } + +[dev-dependencies] +fake = { version = "2" } +testx = { path = "../testx" } diff --git a/crates/idp/migrations/202204131841_init.sql b/crates/idp/migrations/202204131841_init.sql new file mode 100644 index 0000000..8e54c83 --- /dev/null +++ b/crates/idp/migrations/202204131841_init.sql @@ -0,0 +1,23 @@ +CREATE EXTENSION IF NOT EXISTS "uuid-ossp" WITH SCHEMA public; + +CREATE TYPE "AccountState" AS ENUM ( + 'active', + 'suspended', + 'banned' + ); + +CREATE TYPE "Role" AS ENUM ( + 'admin', + 'user' + ); + +CREATE TABLE public.accounts +( + id serial NOT NULL, + email character varying NOT NULL, + login character varying NOT NULL, + pass_hash character varying NOT NULL, + role "Role" DEFAULT 'user'::"Role" NOT NULL, + customer_id uuid DEFAULT gen_random_uuid() NOT NULL, + state "AccountState" DEFAULT 'active'::"AccountState" NOT NULL +); diff --git a/crates/idp/migrations/202204131842_addresses.sql b/crates/idp/migrations/202204131842_addresses.sql new file mode 100644 index 0000000..ea595c9 --- /dev/null +++ b/crates/idp/migrations/202204131842_addresses.sql @@ -0,0 +1,13 @@ +CREATE TABLE public.account_addresses +( + id serial NOT NULL, + name text NOT NULL, + email text NOT NULL, + street text NOT NULL, + city text NOT NULL, + country text NOT NULL, + zip text NOT NULL, + account_id integer, + is_default boolean DEFAULT false NOT NULL, + phone text DEFAULT ''::text NOT NULL +); diff --git a/crates/idp/src/actions.rs b/crates/idp/src/actions.rs new file mode 100644 index 0000000..61026e0 --- /dev/null +++ b/crates/idp/src/actions.rs @@ -0,0 +1,124 @@ +use channels::accounts::{all, find_by_identity, me, register}; +use config::SharedAppConfig; +use model::{Encrypt, FullAccount, Ranged}; + +use crate::db::{AccountAddresses, Database, FindAccount}; +use crate::{Error, Result}; + +macro_rules! ok_or_rollback { + ($res: expr, $t: expr, $err: expr) => { + match $res { + Ok(v) => v, + Err(e) => { + tracing::error!("{}", e); + $t.rollback().await.ok(); + + return Err($err); + } + } + }; +} + +macro_rules! begin_t { + ($db: expr, $err: expr) => { + match $db.pool.begin().await { + Ok(t) => t, + Err(e) => { + tracing::error!("{}", e); + return Err($err); + } + } + }; +} + +pub async fn all(input: all::Input, db: Database) -> all::Output { + use channels::accounts::Error; + + let mut t = begin_t!(db, Error::DbCritical); + + let dbm = crate::db::AllAccounts { + limit: input.limit.into_raw(), + offset: input.offset.into_raw(), + }; + + let res = dbm.run(&mut t).await; + + let accounts = ok_or_rollback!(res, t, Error::All); + + t.commit().await.map_err(|e| { + tracing::error!("{}", e); + Error::DbCritical + })?; + + Ok(all::Details { accounts }) +} + +pub async fn me(account_id: model::AccountId, db: Database) -> me::Output { + use channels::accounts::Error; + + let mut t = begin_t!(db, Error::Account); + + let res = FindAccount { account_id }.run(&mut t).await; + let account: FullAccount = ok_or_rollback!(res, t, Error::Account); + + let res = AccountAddresses { account_id }.run(&mut t).await; + let addresses = ok_or_rollback!(res, t, Error::Addresses); + + t.commit().await.map_err(|e| { + tracing::error!("{}", e); + Error::DbCritical + })?; + + Ok(me::Details { account, addresses }) +} + +pub async fn create_account( + msg: register::Input, + db: &Database, + config: SharedAppConfig, +) -> Result { + let salt = config.lock().web().pass_salt(); + let hash = msg.password.encrypt(&salt).map_err(|e| { + tracing::error!("{e:?}"); + Error::Hashing + })?; + + let mut t = begin_t!(db, Error::DbCritical); + + let res = crate::db::CreateAccount { + email: msg.email, + login: msg.login, + pass_hash: model::PassHash::new(hash), + role: msg.role, + } + .run(&mut t) + .await; + + let account: FullAccount = ok_or_rollback!(res, t, Error::Saving); + + t.commit().await.map_err(|e| { + tracing::error!("{}", e); + Error::DbCritical + })?; + + Ok(account) +} + +pub async fn find_by_identity( + input: find_by_identity::Input, + db: Database, +) -> find_by_identity::Output { + use channels::accounts::Error; + + let mut t = begin_t!(db, Error::DbCritical); + let dbm = crate::db::AccountByIdentity { + login: input.login, + email: input.email, + }; + + let res = dbm.run(&mut t).await; + + let account: FullAccount = ok_or_rollback!(res, t, Error::InvalidIdentity); + + Ok(find_by_identity::Details { account }) +} diff --git a/crates/idp/src/bin/account-client.rs b/crates/idp/src/bin/account-client.rs new file mode 100644 index 0000000..264421f --- /dev/null +++ b/crates/idp/src/bin/account-client.rs @@ -0,0 +1,39 @@ +use std::time::Duration; + +use config::UpdateConfig; +use tarpc::context; +use tokio::time::sleep; + +#[derive(gumdrop::Options)] +struct Flags { + help: bool, + /// Sets the name to say hello to. + name: String, +} + +impl UpdateConfig for Flags {} + +#[tokio::main] +async fn main() -> std::io::Result<()> { + use channels::accounts::me::Input; + + let opts: Flags = gumdrop::Options::parse_args_default_or_exit(); + + let config = config::default_load(&opts); + let client = channels::accounts::rpc::create_client(config).await; + + let r = client + .me( + context::current(), + Input { + account_id: 1.into(), + }, + ) + .await; + println!("{:?}", r); + + // Let the background span processor finish. + sleep(Duration::from_micros(1)).await; + + Ok(()) +} diff --git a/crates/idp/src/db/accounts.rs b/crates/idp/src/db/accounts.rs new file mode 100644 index 0000000..6b098b6 --- /dev/null +++ b/crates/idp/src/db/accounts.rs @@ -0,0 +1,441 @@ +use model::{AccountId, AccountState, Email, FullAccount, Login, PassHash, Role}; + +pub type Result = std::result::Result; + +#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)] +pub enum Error { + #[error("Can't create account")] + CantCreate, + #[error("Can't find account does to lack of identity")] + NoIdentity, + #[error("Account does not exists")] + NotExists, + #[error("Failed to load all accounts")] + All, + #[error("Can't update account")] + CantUpdate, +} + +#[derive(Debug)] +pub struct AllAccounts { + pub limit: i32, + pub offset: i32, +} + +impl AllAccounts { + pub async fn run( + self, + pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result> { + sqlx::query_as( + r#" +SELECT id, email, login, pass_hash, role, customer_id, state +FROM accounts +LIMIT $1 OFFSET $2 + "#, + ) + .bind(self.limit) + .bind(self.offset) + .fetch_all(pool) + .await + .map_err(|e| { + tracing::error!("{e:?}"); + Error::All + }) + } +} + +#[derive(Debug)] +pub struct CreateAccount { + pub email: Email, + pub login: Login, + pub pass_hash: PassHash, + pub role: Role, +} + +impl CreateAccount { + pub async fn run( + self, + pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result { + sqlx::query_as( + r#" +INSERT INTO accounts (login, email, role, pass_hash) +VALUES ($1, $2, $3, $4) +RETURNING id, email, login, pass_hash, role, customer_id, state + "#, + ) + .bind(self.login) + .bind(self.email) + .bind(self.role) + .bind(self.pass_hash) + .fetch_one(pool) + .await + .map_err(|e| { + tracing::error!("{e:?}"); + Error::CantCreate + }) + } +} + +#[derive(Debug)] +pub struct UpdateAccount { + pub id: AccountId, + pub email: Email, + pub login: Login, + pub pass_hash: Option, + pub role: Role, + pub state: AccountState, +} + +impl UpdateAccount { + pub async fn run( + self, + pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result { + match self.pass_hash { + Some(hash) => sqlx::query_as( + r#" +UPDATE accounts +SET login = $2, email = $3, role = $4, pass_hash = $5, state = $6 +WHERE id = $1 +RETURNING id, email, login, pass_hash, role, customer_id, state + "#, + ) + .bind(self.id) + .bind(self.login) + .bind(self.email) + .bind(self.role) + .bind(hash) + .bind(self.state), + None => sqlx::query_as( + r#" +UPDATE accounts +SET login = $2, email = $3, role = $4, state = $5 +WHERE id = $1 +RETURNING id, email, login, pass_hash, role, customer_id, state + "#, + ) + .bind(self.id) + .bind(self.login) + .bind(self.email) + .bind(self.role) + .bind(self.state), + } + .fetch_one(pool) + .await + .map_err(|e| { + tracing::error!("{e:?}"); + Error::CantUpdate + }) + } +} + +#[derive(Debug)] +pub struct FindAccount { + pub account_id: AccountId, +} + +impl FindAccount { + pub async fn run( + self, + pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result { + sqlx::query_as( + r#" +SELECT id, email, login, pass_hash, role, customer_id, state +FROM accounts +WHERE id = $1 + "#, + ) + .bind(self.account_id) + .fetch_one(pool) + .await + .map_err(|e| { + tracing::error!("{e:?}"); + Error::NotExists + }) + } +} + +#[derive(Debug)] +pub struct AccountByIdentity { + pub login: Option, + pub email: Option, +} + +impl AccountByIdentity { + pub async fn run( + self, + pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result { + match (self.login, self.email) { + (Some(login), None) => sqlx::query_as( + r#" +SELECT id, email, login, pass_hash, role, customer_id, state +FROM accounts +WHERE login = $1 + "#, + ) + .bind(login), + (None, Some(email)) => sqlx::query_as( + r#" +SELECT id, email, login, pass_hash, role, customer_id, state +FROM accounts +WHERE email = $1 + "#, + ) + .bind(email), + (Some(login), Some(email)) => sqlx::query_as( + r#" +SELECT id, email, login, pass_hash, role, customer_id, state +FROM accounts +WHERE login = $1 AND email = $2 + "#, + ) + .bind(login) + .bind(email), + _ => return Err(Error::NoIdentity), + } + .fetch_one(pool) + .await + .map_err(|e| { + tracing::error!("{e:?}"); + Error::CantCreate + }) + } +} + +#[cfg(test)] +mod tests { + use config::UpdateConfig; + use fake::Fake; + use model::*; + + use super::*; + use crate::db::Database; + + pub struct NoOpts; + + impl UpdateConfig for NoOpts {} + + async fn test_create_account( + t: &mut sqlx::Transaction<'_, sqlx::Postgres>, + login: Option, + email: Option, + hash: Option, + ) -> FullAccount { + use fake::faker::internet::en; + let login: String = login.unwrap_or_else(|| en::Username().fake()); + let email: String = email.unwrap_or_else(|| en::FreeEmail().fake()); + let hash: String = hash.unwrap_or_else(|| en::Password(10..20).fake()); + + CreateAccount { + email: Email::new(email), + login: Login::new(login), + pass_hash: PassHash::new(hash), + role: Role::Admin, + } + .run(t) + .await + .unwrap() + } + + #[tokio::test] + async fn create_account() { + testx::db_t_ref!(t); + + let login: String = fake::faker::internet::en::Username().fake(); + let email: String = fake::faker::internet::en::FreeEmail().fake(); + let hash: String = fake::faker::internet::en::Password(10..20).fake(); + + let account: FullAccount = CreateAccount { + email: Email::new(&email), + login: Login::new(&login), + pass_hash: PassHash::new(&hash), + role: Role::Admin, + } + .run(&mut t) + .await + .unwrap(); + + let expected = FullAccount { + login: Login::new(login), + email: Email::new(email), + pass_hash: PassHash::new(&hash), + role: Role::Admin, + customer_id: account.customer_id, + id: account.id, + state: AccountState::Active, + }; + + t.rollback().await.unwrap(); + assert_eq!(account, expected); + } + + #[tokio::test] + async fn all_accounts() { + testx::db_t_ref!(t); + + test_create_account(&mut t, None, None, None).await; + test_create_account(&mut t, None, None, None).await; + test_create_account(&mut t, None, None, None).await; + + let v: Vec = AllAccounts { + limit: 200, + offset: 0, + } + .run(&mut t) + .await + .unwrap(); + + testx::db_rollback!(t); + assert!(v.len() >= 3); + } + + #[tokio::test] + async fn update_account_without_pass() { + testx::db_t_ref!(t); + + let original_login: String = fake::faker::internet::en::Username().fake(); + let original_email: String = fake::faker::internet::en::FreeEmail().fake(); + let original_hash: String = fake::faker::internet::en::Password(10..20).fake(); + + let original_account = test_create_account( + &mut t, + Some(original_login.clone()), + Some(original_email.clone()), + Some(original_hash.clone()), + ) + .await; + + let updated_login: String = fake::faker::internet::en::Username().fake(); + let updated_email: String = fake::faker::internet::en::FreeEmail().fake(); + + let updated_account: FullAccount = UpdateAccount { + id: original_account.id, + email: Email::new(updated_email.clone()), + login: Login::new(updated_login.clone()), + pass_hash: None, + role: Role::Admin, + state: AccountState::Active, + } + .run(&mut t) + .await + .unwrap(); + + let expected = FullAccount { + id: original_account.id, + email: Email::new(updated_email), + login: Login::new(updated_login), + pass_hash: PassHash::new(original_hash), + role: Role::Admin, + customer_id: original_account.customer_id, + state: AccountState::Active, + }; + + testx::db_rollback!(t); + assert_ne!(original_account, expected); + assert_eq!(updated_account, expected); + } + + #[tokio::test] + async fn update_account_with_pass() { + testx::db_t_ref!(t); + + let original_login: String = fake::faker::internet::en::Username().fake(); + let original_email: String = fake::faker::internet::en::FreeEmail().fake(); + let original_hash: String = fake::faker::internet::en::Password(10..20).fake(); + + let original_account = test_create_account( + &mut t, + Some(original_login.clone()), + Some(original_email.clone()), + Some(original_hash.clone()), + ) + .await; + + let updated_login: String = fake::faker::internet::en::Username().fake(); + let updated_email: String = fake::faker::internet::en::FreeEmail().fake(); + let updated_hash: String = fake::faker::internet::en::Password(10..20).fake(); + + let updated_account: FullAccount = UpdateAccount { + id: original_account.id, + email: Email::new(updated_email.clone()), + login: Login::new(updated_login.clone()), + pass_hash: Some(PassHash::new(updated_hash.clone())), + role: Role::Admin, + state: AccountState::Active, + } + .run(&mut t) + .await + .unwrap(); + + let expected = FullAccount { + id: original_account.id, + email: Email::new(updated_email), + login: Login::new(updated_login), + pass_hash: PassHash::new(updated_hash), + role: Role::Admin, + customer_id: original_account.customer_id, + state: AccountState::Active, + }; + + testx::db_rollback!(t); + assert_ne!(original_account, expected); + assert_eq!(updated_account, expected); + } + + #[tokio::test] + async fn find() { + testx::db_t_ref!(t); + + let account = test_create_account(&mut t, None, None, None).await; + + let res: FullAccount = FindAccount { + account_id: account.id, + } + .run(&mut t) + .await + .unwrap(); + + testx::db_rollback!(t); + assert_eq!(account, res); + } + + #[tokio::test] + async fn find_identity_email() { + testx::db_t_ref!(t); + + let account = test_create_account(&mut t, None, None, None).await; + + let res: FullAccount = AccountByIdentity { + email: Some(account.email.clone()), + login: None, + } + .run(&mut t) + .await + .unwrap(); + + testx::db_rollback!(t); + assert_eq!(account, res); + } + + #[tokio::test] + async fn find_identity_login() { + testx::db_t_ref!(t); + + let account = test_create_account(&mut t, None, None, None).await; + + let res: FullAccount = AccountByIdentity { + login: Some(account.login.clone()), + email: None, + } + .run(&mut t) + .await + .unwrap(); + + testx::db_rollback!(t); + assert_eq!(account, res); + } +} diff --git a/crates/idp/src/db/addresses.rs b/crates/idp/src/db/addresses.rs new file mode 100644 index 0000000..7d8aab8 --- /dev/null +++ b/crates/idp/src/db/addresses.rs @@ -0,0 +1,311 @@ +pub type Result = std::result::Result; + +#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, thiserror::Error)] +pub enum Error { + #[error("Can't load account addresses")] + AccountAddresses, + #[error("Failed to save account address")] + CreateAccountAddress, +} + +#[derive(Debug)] +pub struct AccountAddresses { + pub account_id: model::AccountId, +} + +impl AccountAddresses { + pub async fn run( + self, + pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result> { + sqlx::query_as( + r#" +SELECT id, name, email, phone, street, city, country, zip, account_id, is_default +FROM account_addresses +WHERE account_id = $1 + "#, + ) + .bind(self.account_id) + .fetch_all(pool) + .await + .map_err(|_| Error::AccountAddresses.into()) + } +} + +#[derive(Debug)] +pub struct FindAccountAddress { + pub account_id: model::AccountId, + pub address_id: model::AddressId, +} + +impl FindAccountAddress { + pub async fn run( + self, + pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result { + sqlx::query_as( + r#" +SELECT id, name, email, phone, street, city, country, zip, account_id, is_default +FROM account_addresses +WHERE account_id = $1 AND id = $2 + "#, + ) + .bind(self.account_id) + .bind(self.address_id) + .fetch_one(pool) + .await + .map_err(|_| Error::AccountAddresses.into()) + } +} + +#[derive(Debug)] +pub struct DefaultAccountAddress { + pub account_id: model::AccountId, +} + +impl DefaultAccountAddress { + pub async fn run( + self, + pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result { + sqlx::query_as( + r#" +SELECT id, name, email, phone, street, city, country, zip, account_id, is_default +FROM account_addresses +WHERE account_id = $1 AND is_default + "#, + ) + .bind(self.account_id) + .fetch_one(pool) + .await + .map_err(|_| Error::AccountAddresses.into()) + } +} + +#[derive(Debug)] +pub struct CreateAccountAddress { + pub name: model::Name, + pub email: model::Email, + pub phone: model::Phone, + pub street: model::Street, + pub city: model::City, + pub country: model::Country, + pub zip: model::Zip, + pub account_id: Option, + pub is_default: bool, +} + +impl CreateAccountAddress { + pub async fn run( + self, + pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result { + if self.is_default && self.account_id.is_some() { + if let Err(e) = sqlx::query( + r#" +UPDATE account_addresses +SET is_default = FALSE +WHERE account_id = $1 + "#, + ) + .bind(self.account_id) + .fetch_all(&mut *pool) + .await + { + tracing::error!("{e}"); + dbg!(e); + } + } + + sqlx::query_as( + r#" +INSERT INTO account_addresses ( name, email, phone, street, city, country, zip, account_id, is_default) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) +RETURNING id, name, email, phone, street, city, country, zip, account_id, is_default + "#, + ) + .bind(self.name) + .bind(self.email) + .bind(self.phone) + .bind(self.street) + .bind(self.city) + .bind(self.country) + .bind(self.zip) + .bind(self.account_id) + .bind(self.is_default) + .fetch_one(pool) + .await + .map_err(|e| { + tracing::error!("{e}"); + dbg!(e); + Error::CreateAccountAddress.into() + }) + } +} + +#[derive(Debug)] +pub struct UpdateAccountAddress { + pub id: model::AddressId, + pub name: model::Name, + pub email: model::Email, + pub phone: model::Phone, + pub street: model::Street, + pub city: model::City, + pub country: model::Country, + pub zip: model::Zip, + pub account_id: model::AccountId, + pub is_default: bool, +} + +impl UpdateAccountAddress { + pub async fn run( + self, + pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result { + sqlx::query_as( + r#" +UPDATE account_addresses +SET name = $2, email = $3, street = $4, city = $5, country = $6, zip = $7, account_id = $8, is_default = $9, phone = $10 +WHERE id = $1 +RETURNING id, name, email, phone, street, city, country, zip, account_id, is_default + "#, + ) + .bind(self.id) + .bind(self.name) + .bind(self.email) + .bind(self.street) + .bind(self.city) + .bind(self.country) + .bind(self.zip) + .bind(self.account_id) + .bind(self.is_default) + .bind(self.phone) + .fetch_one(pool) + .await + .map_err(|_| Error::CreateAccountAddress.into()) + } +} + +#[cfg(test)] +mod test { + use config::*; + use fake::Fake; + use model::*; + + use super::super::accounts::CreateAccount; + use super::*; + use crate::db::Database; + + pub struct NoOpts; + + impl UpdateConfig for NoOpts {} + + async fn test_create_account(pool: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> FullAccount { + let login: String = fake::faker::internet::en::Username().fake(); + let email: String = fake::faker::internet::en::FreeEmail().fake(); + let hash: String = fake::faker::internet::en::Password(10..20).fake(); + + CreateAccount { + email: Email::new(email), + login: Login::new(login), + pass_hash: PassHash::new(hash), + role: Role::Admin, + } + .run(pool) + .await + .unwrap() + } + + #[tokio::test] + async fn full_check() { + testx::db_t_ref!(t); + + // account + let account = test_create_account(&mut t).await; + + // address + let mut address: AccountAddress = { + let name: String = fake::faker::name::en::Name().fake(); + let email: String = fake::faker::internet::en::FreeEmail().fake(); + let phone: String = fake::faker::phone_number::en::PhoneNumber().fake(); + let street: String = fake::faker::address::en::StreetName().fake(); + let city: String = fake::faker::address::en::CityName().fake(); + let country: String = fake::faker::address::en::CountryName().fake(); + let zip: String = fake::faker::address::en::ZipCode().fake(); + let account_id = Some(account.id); + let is_default: bool = true; + + let address = CreateAccountAddress { + name: Name::new(name.clone()), + email: Email::new(email.clone()), + phone: Phone::new(phone.clone()), + street: Street::new(street.clone()), + city: City::new(city.clone()), + country: Country::new(country.clone()), + zip: Zip::new(zip.clone()), + account_id, + is_default, + } + .run(&mut t) + .await + .unwrap(); + + assert_eq!( + address, + AccountAddress { + id: address.id, + name: Name::new(name.clone()), + email: Email::new(email.clone()), + phone: Phone::new(phone.clone()), + street: Street::new(street.clone()), + city: City::new(city.clone()), + country: Country::new(country.clone()), + zip: Zip::new(zip.clone()), + account_id: account.id, + is_default, + } + ); + address + }; + + let found = FindAccountAddress { + account_id: account.id, + address_id: address.id, + } + .run(&mut t) + .await + .unwrap(); + + assert_eq!(found, address); + + let changed = UpdateAccountAddress { + id: address.id, + name: address.name.clone(), + email: address.email.clone(), + phone: address.phone.clone(), + street: address.street.clone(), + city: address.city.clone(), + country: address.country.clone(), + zip: address.zip.clone(), + account_id: address.account_id, + is_default: true, + } + .run(&mut t) + .await + .unwrap(); + + address.is_default = true; + + assert_eq!(changed, address); + + let default_address = DefaultAccountAddress { + account_id: account.id, + } + .run(&mut t) + .await + .unwrap(); + + testx::db_rollback!(t); + assert_eq!(default_address, address); + } +} diff --git a/crates/idp/src/db/mod.rs b/crates/idp/src/db/mod.rs new file mode 100644 index 0000000..8c8fed4 --- /dev/null +++ b/crates/idp/src/db/mod.rs @@ -0,0 +1,31 @@ +pub mod accounts; +pub mod addresses; + +pub use accounts::*; +pub use addresses::*; +use config::SharedAppConfig; +use sqlx::{Pool, Postgres}; + +#[derive(Clone)] +pub struct Database { + pub pool: sqlx::PgPool, + _config: SharedAppConfig, +} + +impl Database { + pub async fn build(config: SharedAppConfig) -> Self { + let url = config.lock().idp().database_url.clone(); + let pool = sqlx::PgPool::connect(&url).await.unwrap_or_else(|e| { + tracing::error!("Failed to connect to database. {e:?}"); + std::process::exit(1); + }); + Self { + pool, + _config: config, + } + } + + pub fn pool(&self) -> Pool { + self.pool.clone() + } +} diff --git a/crates/idp/src/idp.rs b/crates/idp/src/idp.rs new file mode 100644 index 0000000..6c9441b --- /dev/null +++ b/crates/idp/src/idp.rs @@ -0,0 +1,163 @@ +use config::SharedAppConfig; + +pub async fn init(config: SharedAppConfig) { + let (secret, web) = { + let c = config.lock(); + (c.idp().secret(), c.web().host()) + }; + rauthy_client::init_with(None, RauthyHttpsOnly::No, DangerAcceptInvalidCerts::Yes).await?; + + let config = RauthyConfig { + // Sets the .is_admin field for the principal based on the `ClaimMapping`. + admin_claim: ClaimMapping::Or(vec![JwtClaim { + typ: JwtClaimTyp::Roles, + value: "admin".to_string(), + }]), + // Sets the .is_user field for the principal based on the `ClaimMapping`. + // Without this claim, a user would not have access to this app. This is + // used, because usually you never want to just have all your OIDC users to + // have access to a certain application. + user_claim: ClaimMapping::Or(vec![JwtClaim { + typ: JwtClaimTyp::Groups, + value: "user".to_string(), + }]), + // In almost all cases, this should just match the `client_id` + allowed_audiences: HashSet::from(["idp".to_string()]), + client_id: "idp".to_string(), + // If set to 'false', tokens with a non-verified email address will be rejected. + email_verified: !cfg!(debug_assertions), + // The issuer URL from your Rauthy deployment + iss: format!("{host}/auth/v1"), + // The scopes you want to request. The only mandatory which always needs to exist is + // `openid`, the rest is optional and depending on your needs. + scope: vec![ + "openid".to_string(), + "email".to_string(), + "profile".to_string(), + "groups".to_string(), + ], + // If set to None, the client will be treated as a public client and not provide any + // secret to the /token endpoint after the callback. Set a secret for confidential clients. + secret: secret.map(String::from), + // secret: Some("secretCopiedFromTheRauthyUiIfIsConfidentialClient".to_string(),), + }; + // The redirect_uri here must match the URI of this application, where we accept + // and handle the callback after a successful login. + OidcProvider::setup_from_config(config, format!("{host}/callback")).await?; +} + +pub async fn refresh_token(kanidm: &KanidmClient) -> Result<(), ClientError> { + Ok(()) +} + +pub async fn create_account_with_password( + kanidm: &KanidmClient, + login: &str, + display_name: &str, + email: &str, + password: &str, +) -> Result<(), ClientError> { + refresh_token(kanidm).await?; + let _person_created = kanidm + .idm_person_account_create(login, display_name) + .await + .ok(); + let accounts = accounts(kanidm).await?; + let uid = find_account(&accounts, FindBy::Name(login)).await?; + let id = uid.to_string(); + + kanidm + .idm_person_account_update(&id, None, None, None, Some(&[email.to_string()])) + .await?; + let (session_token, status) = kanidm.idm_account_credential_update_begin(&id).await?; + tracing::debug!( + "Begin update credentials ({can_commit}): {status:?}", + can_commit = status.can_commit + ); + + kanidm + .idm_account_credential_update_set_password(&session_token, password) + .await?; + + let status = kanidm + .idm_account_credential_update_status(&session_token) + .await?; + tracing::debug!( + "Set password ({can_commit}): {status:?}", + can_commit = status.can_commit + ); + + let status = kanidm + .idm_account_credential_update_init_totp(&session_token) + .await?; + tracing::debug!( + "Init TOTP ({can_commit}): {status:?}", + can_commit = status.can_commit + ); + + // let status = kanidm + // .idm_account_credential_update_check_totp(&session_token, totp_chal, + // label) .await?; + + tracing::debug!( + "TOTP check ({can_commit}): {status:?}", + can_commit = status.can_commit + ); + + kanidm + .idm_account_credential_update_commit(&session_token) + .await?; + let status = kanidm + .idm_account_credential_update_status(&session_token) + .await?; + tracing::debug!( + "Commit ({can_commit}): {status:?}", + can_commit = status.can_commit + ); + Ok(()) +} + +pub async fn accounts(kanidm: &KanidmClient) -> Result, ClientError> { + refresh_token(kanidm).await?; + + kanidm.idm_person_account_list().await +} + +#[derive(Debug)] +pub enum FindBy<'s> { + Email(&'s str), + Name(&'s str), +} + +impl<'s> FindBy<'s> { + fn key(&self) -> &'static str { + match self { + Self::Email(..) => "mail", + Self::Name(..) => "name", + } + } + fn as_str(&self) -> &'s str { + match self { + Self::Email(s) => s, + Self::Name(s) => s, + } + } +} + +pub async fn find_account(list: &[Entry], find_by: FindBy<'_>) -> Result { + list.iter() + .find_map(|entra| { + tracing::debug!("compare {find_by:?} with {entra:?}"); + let attrs = &entra.attrs; + attrs.get(find_by.key()).filter(|v| { + tracing::debug!("compare value {v:?} with {s}", s = find_by.as_str()); + v.iter().any(|s| s == find_by.as_str()) + })?; + let id = attrs.get("uuid").and_then(|v| v.first())?; + id.parse::().ok() + }) + .ok_or_else(|| { + tracing::info!("User not found"); + ClientError::Unauthorized + }) +} diff --git a/crates/idp/src/main.rs b/crates/idp/src/main.rs new file mode 100644 index 0000000..68454f8 --- /dev/null +++ b/crates/idp/src/main.rs @@ -0,0 +1,61 @@ +#![feature(structural_match)] + +use config::UpdateConfig; + +pub mod actions; +pub mod db; +pub mod idp; +pub mod mqtt; +pub mod rpc; + +pub type Result = std::result::Result; + +#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] +pub enum Error { + #[error("Unable to send or receive msg from database")] + DbCritical, + #[error("Failed to load account data")] + Account, + #[error("Failed to load account addresses")] + Addresses, + #[error("Unable to save record")] + Saving, + #[error("Unable to hash password")] + Hashing, +} + +pub struct Opts {} + +impl UpdateConfig for Opts {} + +#[tokio::main] +async fn main() { + dotenv::dotenv().ok(); + config::init_tracing("account-manager"); + + let opts = Opts {}; + + let config = config::default_load(&opts); + + let db = db::Database::build(config.clone()).await; + + let kanidm = kanidm_client::KanidmClientBuilder::new() + .address(config.lock().idp().idm_url().to_owned()) + .danger_accept_invalid_certs(cfg!(debug_assertions)) + .connect_timeout(2) + .build() + .unwrap(); + idp::accounts(&kanidm).await.unwrap(); + idp::create_account_with_password( + &kanidm, + "eraden", + "Adrian Woźniak", + "adrian.wozniak@ita-prog.pl", + "n59GmOOdcpVUJqJ1", + ) + .await + .unwrap(); + + let mqtt_client = mqtt::start(config.clone(), db.clone()).await; + rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await; +} diff --git a/crates/idp/src/mqtt.rs b/crates/idp/src/mqtt.rs new file mode 100644 index 0000000..25541dc --- /dev/null +++ b/crates/idp/src/mqtt.rs @@ -0,0 +1,30 @@ +use config::SharedAppConfig; +use rumqttc::{Event, Incoming}; + +use crate::db::Database; + +pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient { + let (client, mut event_loop) = channels::accounts::mqtt::create_client(config); + + let spawn_client = client.clone(); + tokio::spawn(async move { + let _client = spawn_client.clone(); + loop { + let notification = event_loop.poll().await; + + match notification { + Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() { + _ => {} + }, + Ok(Event::Incoming(_incoming)) => {} + Ok(Event::Outgoing(_outgoing)) => {} + Err(e) => { + tracing::error!("{}", e); + } + } + } + // tracing::info!("Mqtt channel closed"); + }); + + client +} diff --git a/crates/idp/src/rpc.rs b/crates/idp/src/rpc.rs new file mode 100644 index 0000000..1fb946d --- /dev/null +++ b/crates/idp/src/rpc.rs @@ -0,0 +1,86 @@ +use channels::accounts::rpc::Accounts; +use channels::accounts::{all, find_by_identity, me, register}; +use channels::AsyncClient; +use config::SharedAppConfig; +use tarpc::context; + +use crate::actions; +use crate::db::Database; + +#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] +#[serde(rename_all = "kebab-case", tag = "account")] +pub enum Error { + #[error("Unable to send or receive msg from database")] + DbCritical, + #[error("Failed to load account data")] + Account, + #[error("Failed to load account addresses")] + Addresses, + #[error("Unable to save record")] + Saving, + #[error("Unable to hash password")] + Hashing, +} + +#[derive(Clone)] +struct AccountsServer { + db: Database, + config: SharedAppConfig, + mqtt_client: AsyncClient, +} + +impl Accounts for AccountsServer { + async fn me(self, _: context::Context, input: me::Input) -> me::Output { + let res = actions::me(input.account_id, self.db).await; + tracing::info!("ME result: {:?}", res); + res + } + + async fn register_account( + self, + _: context::Context, + input: register::Input, + ) -> register::Output { + use channels::accounts::Error; + + let res = actions::create_account(input, &self.db, self.config).await; + tracing::info!("REGISTER result: {:?}", res); + match res { + Ok(account) => { + self.mqtt_client.emit_account_created(&account).await; + Ok(register::Details { account }) + } + Err(_e) => Err(Error::Account), + } + } + + async fn all(self, _: context::Context, input: all::Input) -> all::Output { + let res = actions::all(input, self.db).await; + tracing::info!("ME result: {:?}", res); + res + } + + async fn find_by_identity( + self, + _: context::Context, + input: find_by_identity::Input, + ) -> find_by_identity::Output { + let res = actions::find_by_identity(input, self.db).await; + tracing::info!("ME result: {:?}", res); + res + } +} + +pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) { + let port = { config.lock().idp().rpc_port }; + + channels::rpc::start("accounts", port, || { + AccountsServer { + db: db.clone(), + config: config.clone(), + mqtt_client: mqtt_client.clone(), + } + .serve() + }) + .await; +}