Remove old architecture

This commit is contained in:
Adrian Woźniak 2022-11-17 16:56:08 +01:00
parent 703338ad2b
commit 5e097274bc
No known key found for this signature in database
GPG Key ID: 0012845A89C7352B
29 changed files with 256 additions and 1081 deletions

989
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -9,18 +9,18 @@ members = [
# actors
"crates/account_manager",
"crates/cart_manager",
"crates/database_manager",
# "crates/database_manager",
"crates/email_manager",
"crates/order_manager",
"crates/payment_manager",
# "crates/order_manager",
# "crates/payment_manager",
"crates/search_manager",
"crates/stock_manager",
"crates/token_manager",
"crates/fs_manager",
"crates/lang_provider",
# artifacts
"crates/db-seed",
"crates/api",
# "crates/db-seed",
# "crates/api",
"crates/web",
# vendor
"vendor/t_pay",

View File

@ -1,43 +1,73 @@
use channels::accounts::{me, register};
use channels::accounts::{all, find_by_identity, me, register};
use config::SharedAppConfig;
use model::{Encrypt, FullAccount};
use model::{Encrypt, FullAccount, Ranged};
use crate::db::{AccountAddresses, Database, FindAccount};
use crate::{Error, Result};
#[allow(unused)]
pub async fn me(account_id: model::AccountId, db: Database) -> me::Output {
use channels::accounts::Error;
let mut t = match db.pool.begin().await {
Ok(t) => t,
Err(e) => {
tracing::error!("{}", e);
return Err(Error::Account);
}
};
let res = FindAccount { account_id }.run(&mut t).await;
let account: model::FullAccount = match res {
Ok(account) => account,
Err(e) => {
tracing::error!("{}", e);
t.rollback().await.ok();
return Err(Error::Account);
}
};
let res = AccountAddresses { account_id }.run(&mut t).await;
let addresses = match res {
macro_rules! ok_or_rollback {
($res: expr, $t: expr, $err: expr) => {
match $res {
Ok(v) => v,
Err(e) => {
tracing::error!("{}", e);
t.rollback().await.ok();
$t.rollback().await.ok();
return Err(Error::Addresses);
return Err($err);
}
}
};
t.commit().await.ok();
}
macro_rules! begin_t {
($db: expr, $err: expr) => {
match $db.pool.begin().await {
Ok(t) => t,
Err(e) => {
tracing::error!("{}", e);
return Err($err);
}
}
};
}
pub async fn all(input: all::Input, db: Database) -> all::Output {
use channels::accounts::Error;
let mut t = begin_t!(db, Error::DbCritical);
let dbm = crate::db::AllAccounts {
limit: input.limit.into_raw(),
offset: input.offset.into_raw(),
};
let res = dbm.run(&mut t).await;
let accounts = ok_or_rollback!(res, t, Error::All);
t.commit().await.map_err(|e| {
tracing::error!("{}", e);
Error::DbCritical
})?;
Ok(all::Details { accounts })
}
pub async fn me(account_id: model::AccountId, db: Database) -> me::Output {
use channels::accounts::Error;
let mut t = begin_t!(db, Error::Account);
let res = FindAccount { account_id }.run(&mut t).await;
let account: FullAccount = ok_or_rollback!(res, t, Error::Account);
let res = AccountAddresses { account_id }.run(&mut t).await;
let addresses = ok_or_rollback!(res, t, Error::Addresses);
t.commit().await.map_err(|e| {
tracing::error!("{}", e);
Error::DbCritical
})?;
Ok(me::Details { account, addresses })
}
@ -55,10 +85,8 @@ pub async fn create_account(
Error::Hashing
})?;
let mut t = db.pool.begin().await.map_err(|e| {
tracing::error!("{}", e);
Error::DbCritical
})?;
let mut t = begin_t!(db, Error::DbCritical);
let res = crate::db::CreateAccount {
email: msg.email,
login: msg.login,
@ -68,14 +96,8 @@ pub async fn create_account(
.run(&mut t)
.await;
let account: FullAccount = match res {
Ok(r) => r,
Err(e) => {
tracing::error!("{}", e);
t.rollback().await.ok();
return Err(Error::Saving);
}
};
let account: FullAccount = ok_or_rollback!(res, t, Error::Saving);
t.commit().await.map_err(|e| {
tracing::error!("{}", e);
Error::DbCritical
@ -83,3 +105,22 @@ pub async fn create_account(
Ok(account)
}
pub async fn find_by_identity(
input: find_by_identity::Input,
db: Database,
) -> find_by_identity::Output {
use channels::accounts::Error;
let mut t = begin_t!(db, Error::DbCritical);
let dbm = crate::db::AccountByIdentity {
login: input.login,
email: input.email,
};
let res = dbm.run(&mut t).await;
let account: FullAccount = ok_or_rollback!(res, t, Error::InvalidIdentity);
Ok(find_by_identity::Details { account })
}

View File

@ -1,5 +1,5 @@
use channels::accounts::rpc::Accounts;
use channels::accounts::{me, register};
use channels::accounts::{all, find_by_identity, me, register};
use channels::AsyncClient;
use config::SharedAppConfig;
use tarpc::context;
@ -54,6 +54,22 @@ impl Accounts for AccountsServer {
Err(_e) => Err(Error::Account),
}
}
async fn all(self, _: context::Context, input: all::Input) -> all::Output {
let res = actions::all(input, self.db).await;
tracing::info!("ME result: {:?}", res);
res
}
async fn find_by_identity(
self,
_: context::Context,
input: find_by_identity::Input,
) -> find_by_identity::Output {
let res = actions::find_by_identity(input, self.db).await;
tracing::info!("ME result: {:?}", res);
res
}
}
pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) {

View File

@ -21,7 +21,7 @@ bytes = { version = "1.1.0" }
channels = { path = "../channels", features = ['accounts', 'carts', 'emails', 'search'] }
chrono = { version = "0.4", features = ["serde"] }
config = { path = "../config" }
database_manager = { path = "../database_manager" }
#database_manager = { path = "../database_manager" }
derive_more = { version = "0.99", features = [] }
dotenv = { version = "0.15", features = [] }
fs_manager = { path = "../fs_manager" }

View File

@ -36,8 +36,6 @@ pub enum Error {
PassFile(std::io::Error),
#[error("Unable to read password from STDIN. {0:?}")]
ReadPass(std::io::Error),
#[error("{0}")]
Database(#[from] database_manager::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@ -3,8 +3,9 @@ use actix_web::web::{Data, Json, ServiceConfig};
use actix_web::{get, patch, post, HttpResponse};
use actix_web_httpauth::extractors::bearer::BearerAuth;
use config::SharedAppConfig;
use database_manager::Database;
use model::{AccountAddress, AccountId, AccountState, Encrypt, PasswordConfirmation};
use model::{
AccountAddress, AccountId, AccountState, Encrypt, Limit, Offset, PasswordConfirmation,
};
use token_manager::TokenManager;
use crate::routes::admin::Error;
@ -15,11 +16,23 @@ use crate::{admin_send_db, routes, Email, Login, PassHash, Password, Role};
pub async fn accounts(
credentials: BearerAuth,
tm: Data<Addr<TokenManager>>,
db: Data<Addr<Database>>,
am: Data<AccountsClient>,
) -> routes::Result<HttpResponse> {
use channels::accounts::rpc::Accounts;
credentials.require_admin(tm.into_inner()).await?;
let accounts = admin_send_db!(db, database_manager::AllAccounts);
let res = am
.all(
tarpc::context::current(),
channels::accounts::all::Input {
limit: Limit::default(),
offset: Offset::from_u32(0),
},
)
.await;
let accounts = res.unwrap_or_default();
Ok(HttpResponse::Ok().json(accounts))
}
@ -38,6 +51,7 @@ pub struct UpdateAccountInput {
pub async fn update_account(
credentials: BearerAuth,
tm: Data<Addr<TokenManager>>,
am: Data<AccountsClient>,
db: Data<Addr<Database>>,
Json(payload): Json<UpdateAccountInput>,
config: Data<SharedAppConfig>,
@ -87,6 +101,7 @@ pub async fn create_account(
credentials: BearerAuth,
tm: Data<Addr<TokenManager>>,
db: Data<Addr<Database>>,
am: Data<AccountsClient>,
Json(payload): Json<model::api::admin::RegisterInput>,
config: Data<SharedAppConfig>,
) -> routes::Result<Json<model::api::admin::RegisterResponse>> {

View File

@ -8,7 +8,6 @@ use actix::Addr;
use actix_web::web::{scope, Data, Json, ServiceConfig};
use actix_web::{delete, post};
use actix_web_httpauth::extractors::bearer::BearerAuth;
use database_manager::{query_db, Database};
use model::Encrypt;
use token_manager::TokenManager;

View File

@ -2,7 +2,6 @@ use actix::Addr;
use actix_web::get;
use actix_web::web::{Data, Json, ServiceConfig};
use actix_web_httpauth::extractors::bearer::BearerAuth;
use database_manager::Database;
use model::api::Orders;
use token_manager::TokenManager;

View File

@ -5,7 +5,6 @@ use actix_web::web::{Data, Json, ServiceConfig};
use actix_web::{delete, get, patch, post, HttpResponse};
use actix_web_httpauth::extractors::bearer::BearerAuth;
use config::SharedAppConfig;
use database_manager::Database;
use model::{
api, Days, Price, ProductCategory, ProductId, ProductLongDesc, ProductName, ProductShortDesc,
Quantity, QuantityUnit,

View File

@ -2,7 +2,6 @@ use actix::Addr;
use actix_web::web::{Data, Json, ServiceConfig};
use actix_web::{delete, get, patch, post, HttpResponse};
use actix_web_httpauth::extractors::bearer::BearerAuth;
use database_manager::Database;
use model::{ProductId, Quantity, QuantityUnit, StockId};
use serde::Deserialize;
use token_manager::TokenManager;

View File

@ -3,7 +3,6 @@ use actix_multipart::Multipart;
use actix_web::web::{Data, ServiceConfig};
use actix_web::{post, HttpResponse};
use actix_web_httpauth::extractors::bearer::BearerAuth;
use database_manager::{query_db, Database};
use fs_manager::FsManager;
use futures_util::StreamExt;
use token_manager::TokenManager;

View File

@ -2,7 +2,6 @@ use actix::Addr;
use actix_web::web::{scope, Data, Json, ServiceConfig};
use actix_web::{delete, get, post, put, HttpRequest, HttpResponse};
use actix_web_httpauth::extractors::bearer::BearerAuth;
use database_manager::{query_db, Database};
use model::api;
use order_manager::{query_order, OrderManager};
use payment_manager::{query_pay, PaymentManager};

View File

@ -1,8 +1,9 @@
use actix::Addr;
use actix_web::web::{Data, Json, Path, Query, ServiceConfig};
use actix_web::{get, post, HttpResponse};
use channels::accounts::rpc::{Accounts, AccountsClient};
use channels::stocks::rpc::{Stocks, StocksClient};
use config::SharedAppConfig;
use database_manager::{query_db, Database};
use model::Encrypt;
use payment_manager::{PaymentManager, PaymentNotification};
use token_manager::TokenManager;
@ -13,7 +14,7 @@ use crate::routes::{self};
#[get("/search")]
async fn search(
db: Data<Addr<Database>>,
am: Data<AccountsClient>,
_config: Data<SharedAppConfig>,
search: Data<channels::search::rpc::SearchClient>,
query: Query<model::api::SearchRequest>,
@ -56,7 +57,7 @@ async fn search(
#[get("/products")]
async fn products(
db: Data<Addr<Database>>,
am: Data<AccountsClient>,
config: Data<SharedAppConfig>,
) -> routes::Result<Json<model::api::Products>> {
let db = db.into_inner();
@ -85,7 +86,7 @@ async fn products(
#[get("/product/{id}")]
async fn product(
path: Path<model::RecordId>,
db: Data<Addr<Database>>,
am: Data<AccountsClient>,
config: Data<SharedAppConfig>,
) -> routes::Result<Json<model::api::Product>> {
let product_id: model::ProductId = path.into_inner().into();
@ -123,14 +124,14 @@ async fn product(
}
#[get("/stocks")]
async fn stocks(db: Data<Addr<Database>>) -> routes::Result<Json<Vec<model::Stock>>> {
async fn stocks(sc: Data<StocksClient>) -> routes::Result<Json<Vec<model::Stock>>> {
let stocks = public_send_db!(owned, db.into_inner(), database_manager::AllStocks);
Ok(Json(stocks))
}
#[post("/register")]
pub async fn create_account(
db: Data<Addr<Database>>,
am: Data<AccountsClient>,
Json(payload): Json<model::api::CreateAccountInput>,
config: Data<SharedAppConfig>,
tm: Data<Addr<TokenManager>>,
@ -180,7 +181,7 @@ pub async fn create_account(
#[post("/sign-in")]
async fn sign_in(
Json(payload): Json<model::api::SignInInput>,
db: Data<Addr<Database>>,
am: Data<AccountsClient>,
tm: Data<Addr<TokenManager>>,
) -> routes::Result<Json<model::api::SessionOutput>> {
let db = db.into_inner();

View File

@ -6,14 +6,20 @@ use crate::{AsyncClient, DeserializePayload};
#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)]
pub enum Error {
#[error("Unable to connect to database")]
DbCritical,
#[error("mqtt payload has invalid create account data")]
InvalidCreateAccount,
#[error("mqtt payload has invalid account failure data")]
InvalidAccountFailure,
#[error("All accounts can't be loaded")]
All,
#[error("Account does not exists")]
Account,
#[error("Account does have any addresses")]
Addresses,
#[error("No account for identity found")]
InvalidIdentity,
}
pub static CLIENT_NAME: &str = "account-manager";
@ -106,6 +112,44 @@ pub mod me {
pub type Output = Result<Details, Error>;
}
pub mod all {
use model::{Limit, Offset};
use crate::accounts::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub limit: Limit,
pub offset: Offset,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Details {
pub accounts: Vec<model::FullAccount>,
}
pub type Output = Result<Details, Error>;
}
pub mod find_by_identity {
use model::{Email, Login};
use crate::accounts::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub login: Option<Login>,
pub email: Option<Email>,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Details {
pub account: model::FullAccount,
}
pub type Output = Result<Details, Error>;
}
impl AsyncClient {
pub async fn emit_account_created(&self, account: &model::FullAccount) {
self.publish_or_log(Topic::AccountCreated, QoS::AtLeastOnce, true, account)
@ -116,7 +160,7 @@ impl AsyncClient {
pub mod rpc {
use config::SharedAppConfig;
use crate::accounts::{me, register};
use crate::accounts::{all, find_by_identity, me, register};
#[tarpc::service]
pub trait Accounts {
@ -125,6 +169,12 @@ pub mod rpc {
/// Creates new user account.
async fn register_account(input: register::Input) -> register::Output;
/// Load full data about all accounts
async fn all(input: all::Input) -> all::Output;
/// Find account for email and/or login
async fn find_by_identity(input: find_by_identity::Input) -> find_by_identity::Output;
}
pub async fn create_client(config: SharedAppConfig) -> AccountsClient {

View File

@ -48,7 +48,6 @@ FROM accounts
})
}
#[cfg_attr(feature = "dummy", derive(fake::Dummy))]
#[derive(actix::Message, Debug)]
#[rtype(result = "Result<FullAccount>")]
pub struct CreateAccount {
@ -88,7 +87,6 @@ RETURNING id, email, login, pass_hash, role, customer_id, state
})
}
#[cfg_attr(feature = "dummy", derive(fake::Dummy))]
#[derive(actix::Message)]
#[rtype(result = "Result<FullAccount>")]
pub struct UpdateAccount {

View File

@ -3,17 +3,17 @@ use config::SharedAppConfig;
use sqlx::PgPool;
use sqlx_core::arguments::Arguments;
pub use crate::account_addresses::*;
pub use crate::accounts::*;
// pub use crate::account_addresses::*;
// pub use crate::accounts::*;
pub use crate::order_addresses::*;
pub use crate::order_items::*;
pub use crate::orders::*;
pub use crate::photos::*;
pub use crate::product_photos::*;
pub use crate::products::*;
// pub use crate::photos::*;
// pub use crate::product_photos::*;
// pub use crate::products::*;
pub use crate::shopping_cart_items::*;
pub use crate::shopping_carts::*;
pub use crate::stocks::*;
// pub use crate::stocks::*;
pub use crate::tokens::*;
pub mod account_addresses;

View File

@ -49,7 +49,6 @@ ORDER BY id DESC
})
}
#[cfg_attr(feature = "dummy", derive(fake::Dummy))]
#[derive(actix::Message)]
#[rtype(result = "Result<OrderItem>")]
pub struct CreateOrderItem {

View File

@ -92,7 +92,6 @@ WHERE id = $1
})
}
#[cfg_attr(feature = "dummy", derive(fake::Dummy))]
#[derive(Message, Debug)]
#[rtype(result = "Result<model::Product>")]
pub struct CreateProduct {

View File

@ -4,14 +4,10 @@ version = "0.1.0"
edition = "2021"
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
actix-web = { version = "4.0", features = [] }
bytes = { version = "1.1.0" }
config = { path = "../config" }
database_manager = { path = "../database_manager", features = ["dummy"] }
dotenv = { version = "0.15", features = [] }
fake = { version = "2.4.3", features = ["derive", "chrono", "http"] }
fakeit = { version = "1.1.1", features = [] }
fs_manager = { path = "../fs_manager", features = [] }
human-panic = { version = "1.0.3" }
model = { path = "../model", version = "0.1", features = ["db", "dummy"] }
@ -21,3 +17,6 @@ thiserror = { version = "1.0.31" }
tokio = { version = "1.18.1", features = ["full"] }
tracing = { version = "0.1.34" }
tracing-subscriber = { version = "0.3.11" }
account-manager = { path = '../account_manager' }
stock-manager = { path = "../stock_manager" }
channels = { path = '../channels' }

View File

@ -1,12 +1,9 @@
use actix::Addr;
use config::SharedAppConfig;
use database_manager::{query_db, Database};
use fake::{Fake, Faker};
use crate::{Result, SharedState};
pub(crate) async fn create_accounts(
db: Addr<Database>,
ac: AccountsClient,
seed: SharedState,
_config: SharedAppConfig,
) -> Result<()> {
@ -19,7 +16,7 @@ pub(crate) async fn create_accounts(
let mut accounts = Vec::with_capacity(10);
for _ in 0..10 {
let msg: database_manager::CreateAccount = Faker.fake::<database_manager::CreateAccount>();
let msg: database_manager::CreateAccount = CreateA;
match db.send(msg).await {
Ok(Ok(account)) => accounts.push(account),

View File

@ -38,7 +38,7 @@ pub(crate) struct DbSeed {
pub(crate) type SharedState = Arc<Mutex<DbSeed>>;
#[actix_web::main]
#[tokio::main]
async fn main() {
human_panic::setup_panic!();

View File

@ -1,7 +1,5 @@
use chrono::NaiveDateTime;
use derive_more::Deref;
#[cfg(feature = "dummy")]
use fake::Fake;
use serde::{Deserialize, Serialize};
use crate::*;

View File

@ -10,10 +10,6 @@ use std::ops::{BitOr, Range};
use std::str::FromStr;
use derive_more::{Deref, DerefMut, Display, From};
#[cfg(feature = "dummy")]
use fake::Fake;
#[cfg(feature = "dummy")]
use rand::Rng;
use serde::de::{Error, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
@ -314,22 +310,17 @@ pub trait Ranged: Sized + From<u32> + Copy {
#[cfg_attr(feature = "db", derive(sqlx::Type))]
#[cfg_attr(feature = "db", sqlx(transparent))]
#[derive(
Default,
Debug,
Copy,
Clone,
Hash,
PartialOrd,
PartialEq,
Eq,
Serialize,
Deserialize,
Deref,
From,
Debug, Copy, Clone, Hash, PartialOrd, PartialEq, Eq, Serialize, Deserialize, Deref, From,
)]
#[serde(transparent)]
pub struct Limit(NonNegative);
impl Default for Limit {
fn default() -> Self {
200.into()
}
}
impl From<u32> for Limit {
fn from(value: u32) -> Self {
Self::from_u32(value)
@ -965,8 +956,6 @@ impl ProductCategory {
pub mod v2 {
use derive_more::{Deref, Display, From};
#[cfg(feature = "dummy")]
use fake::Fake;
use serde::{Deserialize, Serialize};
pub use crate::{

View File

@ -23,7 +23,7 @@ CREATE TABLE products (
CREATE TABLE product_variants (
id serial NOT NULL PRIMARY KEY,
product_id integer REFERENCES products (id) NOT NULL,
product_id integer REFERENCES products (id) ON DELETE CASCADE NOT NULL,
"name" character varying NOT NULL,
short_description character varying NOT NULL,
long_description character varying NOT NULL,
@ -33,7 +33,7 @@ CREATE TABLE product_variants (
CREATE TABLE stocks (
id serial NOT NULL PRIMARY KEY,
product_variant_id integer REFERENCES product_variants(id) NOT NULL,
product_variant_id integer REFERENCES product_variants(id) ON DELETE CASCADE NOT NULL,
quantity integer DEFAULT 0 NOT NULL,
quantity_unit "QuantityUnit" NOT NULL,
CONSTRAINT positive_quantity CHECK ((quantity >= 0))
@ -41,6 +41,6 @@ CREATE TABLE stocks (
CREATE TABLE product_photos (
id serial NOT NULL PRIMARY KEY,
product_variant_id integer REFERENCES product_variants(id) NOT NULL,
product_variant_id integer REFERENCES product_variants(id) ON DELETE CASCADE NOT NULL,
photo_id integer REFERENCES photos(id) NOT NULL
);

View File

@ -283,4 +283,24 @@ mod tests {
assert_eq!(new_product.id, product.id);
assert_eq!(new_product.name, new_name);
}
#[tokio::test]
async fn delete_product() {
testx::db_t_ref!(t);
let product = test_product_with_variant(&mut t).await;
let res = inner_delete_product(
delete_product::Input {
product_id: product.id,
},
&mut t,
)
.await;
testx::db_rollback!(t);
let (id, _new_product) = res.unwrap();
assert_eq!(id, product.id);
}
}

View File

@ -110,6 +110,12 @@ impl DeleteProductVariant {
r#"
DELETE FROM product_variants
WHERE id = $1
RETURNING id,
product_id,
name,
short_description,
long_description,
price
"#,
)
.bind(self.product_variant_id)

View File

@ -185,6 +185,7 @@ RETURNING id,
.await
.map_err(|e| {
tracing::error!("{e:?}");
eprintln!("{e:?}");
Error::Delete(self.product_id)
})
}

View File

@ -10,6 +10,8 @@ pub enum Error {
Update(StockId),
#[error("Unable to delete stock {0:?}")]
Delete(StockId),
#[error("Unable to delete all stock for variant {0:?}")]
DeleteAllProductStocks(ProductId),
#[error("Unable find stock for product")]
ProductVariantStock,
#[error("Stock {0:?} does not exists")]
@ -138,7 +140,10 @@ pub struct DeleteStock {
}
impl DeleteStock {
async fn run(self, pool: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<Option<Stock>> {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Option<Stock>> {
sqlx::query_as(
r#"
DELETE FROM stocks