Change cart manager into microservice

This commit is contained in:
eraden 2022-11-04 18:40:14 +01:00
parent 8e037fe1e7
commit f5ecbfb338
156 changed files with 8335 additions and 738 deletions

3
.env
View File

@ -1,4 +1,7 @@
DATABASE_NAME=bazzar
DATABASE_URL=postgres://postgres@localhost/bazzar DATABASE_URL=postgres://postgres@localhost/bazzar
ACCOUNT_DATABASE_URL=postgres://postgres@localhost/bazzar_accounts
CART_DATABASE_URL=postgres://postgres@localhost/bazzar_carts
PASS_SALT=18CHwV7eGFAea16z+qMKZg PASS_SALT=18CHwV7eGFAea16z+qMKZg
RUST_LOG=debug RUST_LOG=debug

14
Cargo.lock generated
View File

@ -12,7 +12,6 @@ dependencies = [
"bytes", "bytes",
"channels", "channels",
"config", "config",
"database_manager",
"dotenv", "dotenv",
"futures 0.3.25", "futures 0.3.25",
"gumdrop", "gumdrop",
@ -23,6 +22,8 @@ dependencies = [
"pretty_env_logger", "pretty_env_logger",
"rumqttc", "rumqttc",
"serde", "serde",
"sqlx",
"sqlx-core",
"tarpc", "tarpc",
"thiserror", "thiserror",
"tokio", "tokio",
@ -836,13 +837,22 @@ dependencies = [
"channels", "channels",
"chrono", "chrono",
"config", "config",
"database_manager", "dotenv",
"futures 0.3.25",
"model", "model",
"opentelemetry 0.17.0",
"opentelemetry-jaeger",
"pretty_env_logger", "pretty_env_logger",
"rumqttc", "rumqttc",
"serde", "serde",
"sqlx",
"sqlx-core",
"tarpc",
"thiserror", "thiserror",
"tokio",
"tracing", "tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"uuid 0.8.2", "uuid 0.8.2",
] ]

View File

@ -1,25 +1,25 @@
[workspace] [workspace]
members = [ members = [
# shared # shared
"shared/model", "crates/model",
"shared/channels", "crates/channels",
"shared/config", "crates/config",
"shared/testx", "crates/testx",
# actors # actors
"actors/account_manager", "crates/account_manager",
"actors/cart_manager", "crates/cart_manager",
"actors/database_manager", "crates/database_manager",
"actors/email_manager", "crates/email_manager",
"actors/order_manager", "crates/order_manager",
"actors/payment_manager", "crates/payment_manager",
"actors/search_manager", "crates/search_manager",
"actors/token_manager", "crates/token_manager",
"actors/fs_manager", "crates/fs_manager",
"actors/lang_provider", "crates/lang_provider",
# artifacts # artifacts
"db-seed", "crates/db-seed",
"api", "crates/api",
"web", "crates/web",
# vendor # vendor
"vendor/t_pay", "vendor/t_pay",
] ]

View File

@ -1,99 +0,0 @@
use std::time::Duration;
use channels::account::{AccountFailure, CreateAccount, Topic};
use config::SharedAppConfig;
use database_manager::Database;
use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS};
use crate::{actions, Error};
pub async fn start(config: SharedAppConfig, db: Database) -> channels::AsyncClient {
tracing::info!("Starting account mqtt at 0.0.0.0:1883");
let mut mqtt_options = MqttOptions::new(channels::account::CLIENT_NAME, "0.0.0.0", 1883);
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (client, mut event_loop) = AsyncClient::new(mqtt_options, 10);
client
.subscribe(Topic::CreateAccount, QoS::AtLeastOnce)
.await
.unwrap();
let client = channels::AsyncClient(client);
let spawn_client = client.clone();
tokio::spawn(async move {
let client = spawn_client.clone();
loop {
let notification = event_loop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() {
topic if Topic::CreateAccount == topic => {
if let Ok(channels::account::CreateAccount {
email,
login,
password,
role,
}) = channels::account::CreateAccount::try_from(publish.payload)
{
match actions::create_account(
CreateAccount {
email,
login,
password,
role,
},
&db,
config.clone(),
)
.await
{
Ok(account) => {
client
.publish_or_log(
Topic::AccountCreated,
QoS::AtLeastOnce,
true,
model::Account::from(account),
)
.await;
}
Err(e) => {
tracing::error!("{}", e);
let m = match e {
Error::Hashing => {
Some(AccountFailure::FailedToHashPassword)
}
Error::Saving => Some(AccountFailure::SaveAccount),
Error::DbCritical => {
Some(AccountFailure::InternalServerError)
}
_ => None,
};
if let Some(m) = m {
client
.publish_or_log(
Topic::SignUpFailure,
QoS::AtLeastOnce,
true,
m,
)
.await;
}
}
}
}
}
_ => {}
},
Ok(Event::Incoming(_incoming)) => {}
Ok(Event::Outgoing(_outgoing)) => {}
Err(e) => {
tracing::error!("{}", e);
}
}
}
});
client
// tracing::info!("Mqtt channel closed");
}

View File

@ -1,19 +0,0 @@
[package]
name = "cart_manager"
version = "0.1.0"
edition = "2021"
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
channels = { path = "../../shared/channels" }
chrono = { version = "0.4", features = ["serde"] }
config = { path = "../../shared/config" }
database_manager = { path = "../database_manager" }
model = { path = "../../shared/model" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] }
thiserror = { version = "1.0.31" }
tracing = { version = "0.1.34" }
uuid = { version = "0.8", features = ["serde"] }

View File

@ -1,286 +0,0 @@
#![feature(drain_filter)]
use std::collections::HashSet;
use database_manager::{query_db, Database};
use model::{PaymentMethod, ShoppingCartId};
#[macro_export]
macro_rules! cart_async_handler {
($msg: ty, $async: ident, $res: ty) => {
impl actix::Handler<$msg> for CartManager {
type Result = actix::ResponseActFuture<Self, Result<$res>>;
fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result {
use actix::WrapFuture;
let db = self.db.clone();
Box::pin(async { $async(msg, db).await }.into_actor(self))
}
}
};
}
#[macro_export]
macro_rules! query_cart {
($cart: expr, $msg: expr, default $fail: expr) => {
match $cart.send($msg).await {
Ok(Ok(r)) => r,
Ok(Err(e)) => {
tracing::error!("{e}");
$fail
}
Err(e) => {
tracing::error!("{e:?}");
$fail
}
}
};
($cart: expr, $msg: expr, $fail: expr) => {
$crate::query_cart!($cart, $msg, $fail, $fail)
};
($cart: expr, $msg: expr, $db_fail: expr, $act_fail: expr) => {
match $cart.send($msg).await {
Ok(Ok(r)) => r,
Ok(Err(e)) => {
tracing::error!("{e}");
return Err($db_fail);
}
Err(e) => {
tracing::error!("{e:?}");
return Err($act_fail);
}
}
};
}
#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)]
#[serde(rename_all = "kebab-case", tag = "cart")]
pub enum Error {
#[error("System can't ensure shopping cart existence")]
ShoppingCartFailed,
#[error("Shopping cart is not available for unknown reason")]
CartNotAvailable,
#[error("Failed to modify item to cart")]
CantModifyItem,
#[error("Failed to modify cart")]
CantModifyCart,
#[error("{0}")]
Db(#[from] database_manager::Error),
#[error("Unable to update cart item")]
UpdateFailed,
}
pub type Result<T> = std::result::Result<T, Error>;
pub struct CartManager {
db: actix::Addr<Database>,
}
impl actix::Actor for CartManager {
type Context = actix::Context<Self>;
}
impl CartManager {
pub fn new(db: actix::Addr<Database>) -> Self {
Self { db }
}
}
#[derive(actix::Message, Debug)]
#[rtype(result = "Result<Option<model::ShoppingCartItem>>")]
pub struct ModifyItem {
pub buyer_id: model::AccountId,
pub product_id: model::ProductId,
pub quantity: model::Quantity,
pub quantity_unit: model::QuantityUnit,
}
cart_async_handler!(ModifyItem, modify_item, Option<model::ShoppingCartItem>);
async fn modify_item(
msg: ModifyItem,
db: actix::Addr<Database>,
) -> Result<Option<model::ShoppingCartItem>> {
let _cart = query_db!(
db,
database_manager::EnsureActiveShoppingCart {
buyer_id: msg.buyer_id,
},
Error::ShoppingCartFailed
);
let mut carts: Vec<model::ShoppingCart> = query_db!(
db,
database_manager::AccountShoppingCarts {
account_id: msg.buyer_id,
state: Some(model::ShoppingCartState::Active),
},
passthrough Error::Db,
Error::CartNotAvailable
);
let cart = if carts.is_empty() {
return Err(Error::CartNotAvailable);
} else {
carts.remove(0)
};
let item: Option<model::ShoppingCartItem> = query_db!(
db,
database_manager::ActiveCartItemByProduct {
product_id: msg.product_id
},
Error::CantModifyItem
);
match item {
Some(item) if **item.quantity == 0 => Ok(query_db!(
db,
database_manager::DeleteShoppingCartItem { id: item.id },
passthrough Error::Db,
Error::CantModifyItem
)),
Some(item) => Ok(Some(query_db!(
db,
database_manager::UpdateShoppingCartItem {
id: item.id,
product_id: msg.product_id,
shopping_cart_id: cart.id,
quantity: msg.quantity,
quantity_unit: msg.quantity_unit,
},
passthrough Error::Db,
Error::CantModifyItem
))),
None => Ok(Some(query_db!(
db,
database_manager::CreateShoppingCartItem {
product_id: msg.product_id,
shopping_cart_id: cart.id,
quantity: msg.quantity,
quantity_unit: msg.quantity_unit,
},
passthrough Error::Db,
Error::CantModifyItem
))),
}
}
#[derive(actix::Message)]
#[rtype(result = "Result<Option<model::ShoppingCartItem>>")]
pub struct RemoveProduct {
pub shopping_cart_id: model::ShoppingCartId,
pub shopping_cart_item_id: model::ShoppingCartItemId,
}
cart_async_handler!(
RemoveProduct,
remove_product,
Option<model::ShoppingCartItem>
);
pub(crate) async fn remove_product(
msg: RemoveProduct,
db: actix::Addr<Database>,
) -> Result<Option<model::ShoppingCartItem>> {
Ok(query_db!(
db,
database_manager::RemoveCartItem {
shopping_cart_id: msg.shopping_cart_id,
shopping_cart_item_id: Some(msg.shopping_cart_item_id),
product_id: None,
},
Error::UpdateFailed
))
}
pub struct ModifyCartResult {
pub cart_id: ShoppingCartId,
pub items: Vec<model::ShoppingCartItem>,
pub checkout_notes: String,
pub payment_method: model::PaymentMethod,
}
#[derive(actix::Message, Debug)]
#[rtype(result = "Result<ModifyCartResult>")]
pub struct ModifyCart {
pub buyer_id: model::AccountId,
pub items: Vec<ModifyItem>,
pub checkout_notes: String,
pub payment_method: Option<PaymentMethod>,
}
cart_async_handler!(ModifyCart, modify_cart, ModifyCartResult);
async fn modify_cart(msg: ModifyCart, db: actix::Addr<Database>) -> Result<ModifyCartResult> {
tracing::debug!("{:?}", msg);
let cart: model::ShoppingCart = query_db!(
db,
database_manager::EnsureActiveShoppingCart {
buyer_id: msg.buyer_id,
},
Error::ShoppingCartFailed
);
let cart: model::ShoppingCart = query_db!(
db,
database_manager::UpdateShoppingCart {
id: cart.id,
buyer_id: msg.buyer_id,
payment_method: msg.payment_method.unwrap_or(cart.payment_method),
state: model::ShoppingCartState::Active,
checkout_notes: if msg.checkout_notes.is_empty() {
None
} else {
Some(msg.checkout_notes)
}
},
passthrough Error::Db,
Error::CartNotAvailable
);
let existing =
msg.items
.iter()
.fold(HashSet::with_capacity(msg.items.len()), |mut agg, item| {
agg.insert(item.product_id);
agg
});
let items: Vec<model::ShoppingCartItem> = query_db!(
db,
database_manager::CartItems {
shopping_cart_id: cart.id
},
Error::CantModifyCart
);
for item in items
.into_iter()
.filter(|item| !existing.contains(&item.product_id))
{
query_db!(
db,
database_manager::RemoveCartItem {
shopping_cart_id: cart.id,
shopping_cart_item_id: Some(item.id),
product_id: None,
},
Error::CantModifyCart
);
}
let mut out = Vec::with_capacity(msg.items.len());
for item in msg.items {
if let Some(item) = modify_item(item, db.clone()).await? {
out.push(item);
}
}
Ok(ModifyCartResult {
cart_id: cart.id,
items: out,
checkout_notes: cart.checkout_notes.unwrap_or_default(),
payment_method: cart.payment_method,
})
}

View File

@ -5,26 +5,27 @@ edition = "2021"
[[bin]] [[bin]]
name = "account-manager" name = "account-manager"
path = "./src/main.rs" path = "src/main.rs"
[dependencies] [dependencies]
actix = { version = "0.13", features = [] } actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] } actix-rt = { version = "2.7", features = [] }
bincode = { version = "1.3.3" } bincode = { version = "1.3.3" }
bytes = { version = "1.2.1" } bytes = { version = "1.2.1" }
channels = { path = "../../shared/channels" } channels = { path = "../channels" }
config = { path = "../../shared/config" } config = { path = "../config" }
database_manager = { path = "../database_manager" }
dotenv = { version = "0.15.0" } dotenv = { version = "0.15.0" }
futures = { version = "0.3.25" } futures = { version = "0.3.25" }
gumdrop = { version = "0.8.1" } gumdrop = { version = "0.8.1" }
json = { version = "0.12.4" } json = { version = "0.12.4" }
model = { path = "../../shared/model" } model = { path = "../model" }
opentelemetry = { version = "0.17.0" } opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" } opentelemetry-jaeger = { version = "0.17.0" }
pretty_env_logger = { version = "0.4", features = [] } pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" } rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] } serde = { version = "1.0.137", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" } thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] } tokio = { version = "1.21.2", features = ['full'] }

View File

@ -0,0 +1,22 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp" WITH SCHEMA public;
CREATE TYPE "AccountState" AS ENUM (
'active',
'suspended',
'banned'
);
CREATE TYPE "Role" AS ENUM (
'admin',
'user'
);
CREATE TABLE public.accounts (
id integer NOT NULL,
email character varying NOT NULL,
login character varying NOT NULL,
pass_hash character varying NOT NULL,
role "Role" DEFAULT 'user'::"Role" NOT NULL,
customer_id uuid DEFAULT gen_random_uuid() NOT NULL,
state "AccountState" DEFAULT 'active'::"AccountState" NOT NULL
);

View File

@ -0,0 +1,12 @@
CREATE TABLE public.account_addresses (
id integer NOT NULL,
name text NOT NULL,
email text NOT NULL,
street text NOT NULL,
city text NOT NULL,
country text NOT NULL,
zip text NOT NULL,
account_id integer,
is_default boolean DEFAULT false NOT NULL,
phone text DEFAULT ''::text NOT NULL
);

View File

@ -1,37 +1,55 @@
use channels::account::{CreateAccount, MeResult}; use channels::accounts::{me, register};
use config::SharedAppConfig; use config::SharedAppConfig;
use database_manager::Database;
use model::{Encrypt, FullAccount}; use model::{Encrypt, FullAccount};
use crate::db::{AccountAddresses, Database, FindAccount};
use crate::{Error, Result}; use crate::{Error, Result};
#[allow(unused)] #[allow(unused)]
pub async fn me(account_id: model::AccountId, db: Database) -> MeResult { pub async fn me(account_id: model::AccountId, db: Database) -> me::Output {
use channels::account::Error; use channels::accounts::Error;
let msg = database_manager::FindAccount { account_id }; let mut t = match db.pool.begin().await {
let account: model::FullAccount = match msg.inner_find_account(db.pool().clone()).await { Ok(t) => t,
Err(e) => {
tracing::error!("{}", e);
return me::Output {
account: None,
addresses: None,
error: Some(Error::Account),
};
}
};
let res = FindAccount { account_id }.run(&mut t).await;
let account: model::FullAccount = match res {
Ok(account) => account, Ok(account) => account,
Err(e) => { Err(e) => {
tracing::error!("{}", e); tracing::error!("{}", e);
return MeResult { t.rollback().await.ok();
return me::Output {
error: Some(Error::Account), error: Some(Error::Account),
..Default::default() ..Default::default()
}; };
} }
}; };
let msg = database_manager::AccountAddresses { account_id }; let res = AccountAddresses { account_id }.run(&mut t).await;
let addresses = match msg.inner_account_addresses(db.pool().clone()).await { let addresses = match res {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
tracing::error!("{}", e); tracing::error!("{}", e);
return MeResult { t.rollback().await.ok();
return me::Output {
error: Some(Error::Addresses), error: Some(Error::Addresses),
..Default::default() ..Default::default()
}; };
} }
}; };
MeResult { t.commit().await.ok();
me::Output {
account: Some(account), account: Some(account),
addresses: Some(addresses), addresses: Some(addresses),
..Default::default() ..Default::default()
@ -39,7 +57,7 @@ pub async fn me(account_id: model::AccountId, db: Database) -> MeResult {
} }
pub async fn create_account( pub async fn create_account(
msg: CreateAccount, msg: register::Input,
db: &Database, db: &Database,
config: SharedAppConfig, config: SharedAppConfig,
) -> Result<FullAccount> { ) -> Result<FullAccount> {
@ -51,21 +69,20 @@ pub async fn create_account(
Error::Hashing Error::Hashing
})?; })?;
let mut t = db.pool().begin().await.map_err(|e| { let mut t = db.pool.begin().await.map_err(|e| {
tracing::error!("{}", e); tracing::error!("{}", e);
Error::DbCritical Error::DbCritical
})?; })?;
let account: FullAccount = match database_manager::create_account( let res = crate::db::CreateAccount {
database_manager::CreateAccount { email: msg.email,
email: msg.email, login: msg.login,
login: msg.login, pass_hash: model::PassHash::new(hash),
pass_hash: model::PassHash::new(hash), role: msg.role,
role: msg.role, }
}, .run(&mut t)
&mut t, .await;
)
.await let account: FullAccount = match res {
{
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
tracing::error!("{}", e); tracing::error!("{}", e);

View File

@ -20,7 +20,7 @@ async fn main() -> std::io::Result<()> {
let opts: Flags = gumdrop::Options::parse_args_default_or_exit(); let opts: Flags = gumdrop::Options::parse_args_default_or_exit();
let config = config::default_load(&opts); let config = config::default_load(&opts);
let client = channels::account::rpc::create_client(config).await; let client = channels::accounts::rpc::create_client(config).await;
let r = client.me(context::current(), 1.into()).await; let r = client.me(context::current(), 1.into()).await;
println!("{:?}", r); println!("{:?}", r);

View File

@ -0,0 +1,428 @@
use model::{AccountId, AccountState, Email, FullAccount, Login, PassHash, Role};
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
pub enum Error {
#[error("Can't create account")]
CantCreate,
#[error("Can't find account does to lack of identity")]
NoIdentity,
#[error("Account does not exists")]
NotExists,
#[error("Failed to load all accounts")]
All,
#[error("Can't update account")]
CantUpdate,
}
#[derive(Debug)]
pub struct AllAccounts;
impl AllAccounts {
pub async fn run(
_msg: AllAccounts,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<FullAccount>> {
sqlx::query_as(
r#"
SELECT id, email, login, pass_hash, role, customer_id, state
FROM accounts
"#,
)
.fetch_all(pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::All
})
}
}
#[derive(Debug)]
pub struct CreateAccount {
pub email: Email,
pub login: Login,
pub pass_hash: PassHash,
pub role: Role,
}
impl CreateAccount {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<FullAccount> {
sqlx::query_as(
r#"
INSERT INTO accounts (login, email, role, pass_hash)
VALUES ($1, $2, $3, $4)
RETURNING id, email, login, pass_hash, role, customer_id, state
"#,
)
.bind(self.login)
.bind(self.email)
.bind(self.role)
.bind(self.pass_hash)
.fetch_one(pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::CantCreate
})
}
}
#[derive(Debug)]
pub struct UpdateAccount {
pub id: AccountId,
pub email: Email,
pub login: Login,
pub pass_hash: Option<PassHash>,
pub role: Role,
pub state: AccountState,
}
impl UpdateAccount {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<FullAccount> {
match self.pass_hash {
Some(hash) => sqlx::query_as(
r#"
UPDATE accounts
SET login = $2, email = $3, role = $4, pass_hash = $5, state = $6
WHERE id = $1
RETURNING id, email, login, pass_hash, role, customer_id, state
"#,
)
.bind(self.id)
.bind(self.login)
.bind(self.email)
.bind(self.role)
.bind(hash)
.bind(self.state),
None => sqlx::query_as(
r#"
UPDATE accounts
SET login = $2, email = $3, role = $4, state = $5
WHERE id = $1
RETURNING id, email, login, pass_hash, role, customer_id, state
"#,
)
.bind(self.id)
.bind(self.login)
.bind(self.email)
.bind(self.role)
.bind(self.state),
}
.fetch_one(pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::CantUpdate
})
}
}
#[derive(Debug)]
pub struct FindAccount {
pub account_id: AccountId,
}
impl FindAccount {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<FullAccount> {
sqlx::query_as(
r#"
SELECT id, email, login, pass_hash, role, customer_id, state
FROM accounts
WHERE id = $1
"#,
)
.bind(self.account_id)
.fetch_one(pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::NotExists
})
}
}
#[derive(Debug)]
pub struct AccountByIdentity {
pub login: Option<Login>,
pub email: Option<Email>,
}
impl AccountByIdentity {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<FullAccount> {
match (self.login, self.email) {
(Some(login), None) => sqlx::query_as(
r#"
SELECT id, email, login, pass_hash, role, customer_id, state
FROM accounts
WHERE login = $1
"#,
)
.bind(login),
(None, Some(email)) => sqlx::query_as(
r#"
SELECT id, email, login, pass_hash, role, customer_id, state
FROM accounts
WHERE email = $1
"#,
)
.bind(email),
(Some(login), Some(email)) => sqlx::query_as(
r#"
SELECT id, email, login, pass_hash, role, customer_id, state
FROM accounts
WHERE login = $1 AND email = $2
"#,
)
.bind(login)
.bind(email),
_ => return Err(Error::NoIdentity),
}
.fetch_one(pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::CantCreate
})
}
}
#[cfg(test)]
mod tests {
use config::UpdateConfig;
use fake::Fake;
use model::*;
use super::*;
pub struct NoOpts;
impl UpdateConfig for NoOpts {}
async fn test_create_account(
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
login: Option<String>,
email: Option<String>,
hash: Option<String>,
) -> FullAccount {
use fake::faker::internet::en;
let login: String = login.unwrap_or_else(|| en::Username().fake());
let email: String = email.unwrap_or_else(|| en::FreeEmail().fake());
let hash: String = hash.unwrap_or_else(|| en::Password(10..20).fake());
CreateAccount {
email: Email::new(email),
login: Login::new(login),
pass_hash: PassHash::new(hash),
role: Role::Admin,
}
.run(t)
.await
.unwrap()
}
#[actix::test]
async fn create_account() {
testx::db_t_ref!(t);
let login: String = fake::faker::internet::en::Username().fake();
let email: String = fake::faker::internet::en::FreeEmail().fake();
let hash: String = fake::faker::internet::en::Password(10..20).fake();
let account: FullAccount = CreateAccount {
email: Email::new(&email),
login: Login::new(&login),
pass_hash: PassHash::new(&hash),
role: Role::Admin,
}
.run(&mut t)
.await
.unwrap();
let expected = FullAccount {
login: Login::new(login),
email: Email::new(email),
pass_hash: PassHash::new(&hash),
role: Role::Admin,
customer_id: account.customer_id,
id: account.id,
state: AccountState::Active,
};
t.rollback().await.unwrap();
assert_eq!(account, expected);
}
#[actix::test]
async fn all_accounts() {
testx::db_t_ref!(t);
test_create_account(&mut t, None, None, None).await;
test_create_account(&mut t, None, None, None).await;
test_create_account(&mut t, None, None, None).await;
let v: Vec<FullAccount> = AllAccounts.run(&mut t).await.unwrap();
testx::db_rollback!(t);
assert!(v.len() >= 3);
}
#[actix::test]
async fn update_account_without_pass() {
testx::db_t_ref!(t);
let original_login: String = fake::faker::internet::en::Username().fake();
let original_email: String = fake::faker::internet::en::FreeEmail().fake();
let original_hash: String = fake::faker::internet::en::Password(10..20).fake();
let original_account = test_create_account(
&mut t,
Some(original_login.clone()),
Some(original_email.clone()),
Some(original_hash.clone()),
)
.await;
let updated_login: String = fake::faker::internet::en::Username().fake();
let updated_email: String = fake::faker::internet::en::FreeEmail().fake();
let updated_account: FullAccount = UpdateAccount {
id: original_account.id,
email: Email::new(updated_email.clone()),
login: Login::new(updated_login.clone()),
pass_hash: None,
role: Role::Admin,
state: AccountState::Active,
}
.run(&mut t)
.await
.unwrap();
let expected = FullAccount {
id: original_account.id,
email: Email::new(updated_email),
login: Login::new(updated_login),
pass_hash: PassHash::new(original_hash),
role: Role::Admin,
customer_id: original_account.customer_id,
state: AccountState::Active,
};
testx::db_rollback!(t);
assert_ne!(original_account, expected);
assert_eq!(updated_account, expected);
}
#[actix::test]
async fn update_account_with_pass() {
testx::db_t_ref!(t);
let original_login: String = fake::faker::internet::en::Username().fake();
let original_email: String = fake::faker::internet::en::FreeEmail().fake();
let original_hash: String = fake::faker::internet::en::Password(10..20).fake();
let original_account = test_create_account(
&mut t,
Some(original_login.clone()),
Some(original_email.clone()),
Some(original_hash.clone()),
)
.await;
let updated_login: String = fake::faker::internet::en::Username().fake();
let updated_email: String = fake::faker::internet::en::FreeEmail().fake();
let updated_hash: String = fake::faker::internet::en::Password(10..20).fake();
let updated_account: FullAccount = UpdateAccount {
id: original_account.id,
email: Email::new(updated_email.clone()),
login: Login::new(updated_login.clone()),
pass_hash: Some(PassHash::new(updated_hash.clone())),
role: Role::Admin,
state: AccountState::Active,
}
.run(&mut t)
.await
.unwrap();
let expected = FullAccount {
id: original_account.id,
email: Email::new(updated_email),
login: Login::new(updated_login),
pass_hash: PassHash::new(updated_hash),
role: Role::Admin,
customer_id: original_account.customer_id,
state: AccountState::Active,
};
testx::db_rollback!(t);
assert_ne!(original_account, expected);
assert_eq!(updated_account, expected);
}
#[actix::test]
async fn find() {
testx::db_t_ref!(t);
let account = test_create_account(&mut t, None, None, None).await;
let res: FullAccount = FindAccount {
account_id: account.id,
}
.run(&mut t)
.await
.unwrap();
testx::db_rollback!(t);
assert_eq!(account, res);
}
#[actix::test]
async fn find_identity_email() {
testx::db_t_ref!(t);
let account = test_create_account(&mut t, None, None, None).await;
let res: FullAccount = AccountByIdentity {
email: Some(account.email.clone()),
login: None,
}
.run(&mut t)
.await
.unwrap();
testx::db_rollback!(t);
assert_eq!(account, res);
}
#[actix::test]
async fn find_identity_login() {
testx::db_t_ref!(t);
let account = test_create_account(&mut t, None, None, None).await;
let res: FullAccount = AccountByIdentity {
login: Some(account.login.clone()),
email: None,
}
.run(&mut t)
.await
.unwrap();
testx::db_rollback!(t);
assert_eq!(account, res);
}
}

View File

@ -0,0 +1,312 @@
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, thiserror::Error)]
pub enum Error {
#[error("Can't load account addresses")]
AccountAddresses,
#[error("Failed to save account address")]
CreateAccountAddress,
}
#[derive(Debug)]
pub struct AccountAddresses {
pub account_id: model::AccountId,
}
impl AccountAddresses {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<model::AccountAddress>> {
sqlx::query_as(
r#"
SELECT id, name, email, phone, street, city, country, zip, account_id, is_default
FROM account_addresses
WHERE account_id = $1
"#,
)
.bind(self.account_id)
.fetch_all(pool)
.await
.map_err(|_| Error::AccountAddresses.into())
}
}
#[derive(Debug)]
pub struct FindAccountAddress {
pub account_id: model::AccountId,
pub address_id: model::AddressId,
}
impl FindAccountAddress {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<model::AccountAddress> {
sqlx::query_as(
r#"
SELECT id, name, email, phone, street, city, country, zip, account_id, is_default
FROM account_addresses
WHERE account_id = $1 AND id = $2
"#,
)
.bind(self.account_id)
.bind(self.address_id)
.fetch_one(pool)
.await
.map_err(|_| Error::AccountAddresses.into())
}
}
#[derive(Debug)]
pub struct DefaultAccountAddress {
pub account_id: model::AccountId,
}
impl DefaultAccountAddress {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<model::AccountAddress> {
sqlx::query_as(
r#"
SELECT id, name, email, phone, street, city, country, zip, account_id, is_default
FROM account_addresses
WHERE account_id = $1 AND is_default
"#,
)
.bind(self.account_id)
.fetch_one(pool)
.await
.map_err(|_| Error::AccountAddresses.into())
}
}
#[derive(Debug)]
pub struct CreateAccountAddress {
pub name: model::Name,
pub email: model::Email,
pub phone: model::Phone,
pub street: model::Street,
pub city: model::City,
pub country: model::Country,
pub zip: model::Zip,
pub account_id: Option<model::AccountId>,
pub is_default: bool,
}
impl CreateAccountAddress {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<model::AccountAddress> {
if self.is_default && self.account_id.is_some() {
if let Err(e) = sqlx::query(
r#"
UPDATE account_addresses
SET is_default = FALSE
WHERE account_id = $1
"#,
)
.bind(self.account_id)
.fetch_all(&mut *pool)
.await
{
tracing::error!("{e}");
dbg!(e);
}
}
sqlx::query_as(
r#"
INSERT INTO account_addresses ( name, email, phone, street, city, country, zip, account_id, is_default)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id, name, email, phone, street, city, country, zip, account_id, is_default
"#,
)
.bind(self.name)
.bind(self.email)
.bind(self.phone)
.bind(self.street)
.bind(self.city)
.bind(self.country)
.bind(self.zip)
.bind(self.account_id)
.bind(self.is_default)
.fetch_one(pool)
.await
.map_err(|e| {
tracing::error!("{e}");
dbg!(e);
Error::CreateAccountAddress.into()
})
}
}
#[derive(Debug)]
pub struct UpdateAccountAddress {
pub id: model::AddressId,
pub name: model::Name,
pub email: model::Email,
pub phone: model::Phone,
pub street: model::Street,
pub city: model::City,
pub country: model::Country,
pub zip: model::Zip,
pub account_id: model::AccountId,
pub is_default: bool,
}
impl UpdateAccountAddress {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<model::AccountAddress> {
sqlx::query_as(
r#"
UPDATE account_addresses
SET name = $2, email = $3, street = $4, city = $5, country = $6, zip = $7, account_id = $8, is_default = $9, phone = $10
WHERE id = $1
RETURNING id, name, email, phone, street, city, country, zip, account_id, is_default
"#,
)
.bind(self.id)
.bind(self.name)
.bind(self.email)
.bind(self.street)
.bind(self.city)
.bind(self.country)
.bind(self.zip)
.bind(self.account_id)
.bind(self.is_default)
.bind(self.phone)
.fetch_one(pool)
.await
.map_err(|_| Error::CreateAccountAddress.into())
}
}
#[cfg(test)]
mod test {
use config::*;
use fake::Fake;
use model::*;
use super::super::accounts::CreateAccount;
use super::*;
pub struct NoOpts;
impl UpdateConfig for NoOpts {}
async fn test_create_account(pool: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> FullAccount {
let login: String = fake::faker::internet::en::Username().fake();
let email: String = fake::faker::internet::en::FreeEmail().fake();
let hash: String = fake::faker::internet::en::Password(10..20).fake();
CreateAccount {
email: Email::new(email),
login: Login::new(login),
pass_hash: PassHash::new(hash),
role: Role::Admin,
}
.run(pool)
.await
.unwrap()
}
#[actix::test]
async fn full_check() {
testx::db_t_ref!(t);
// account
let account = test_create_account(&mut t).await;
// address
let mut address: AccountAddress = {
let name: String = fake::faker::name::en::Name().fake();
let email: String = fake::faker::internet::en::FreeEmail().fake();
let phone: String = fake::faker::phone_number::en::PhoneNumber().fake();
let street: String = fake::faker::address::en::StreetName().fake();
let city: String = fake::faker::address::en::CityName().fake();
let country: String = fake::faker::address::en::CountryName().fake();
let zip: String = fake::faker::address::en::ZipCode().fake();
let account_id = Some(account.id);
let is_default: bool = true;
let address = CreateAccountAddress {
name: Name::new(name.clone()),
email: Email::new(email.clone()),
phone: Phone::new(phone.clone()),
street: Street::new(street.clone()),
city: City::new(city.clone()),
country: Country::new(country.clone()),
zip: Zip::new(zip.clone()),
account_id,
is_default,
}
.run(&mut t)
.await
.unwrap();
assert_eq!(
address,
AccountAddress {
id: address.id,
name: Name::new(name.clone()),
email: Email::new(email.clone()),
phone: Phone::new(phone.clone()),
street: Street::new(street.clone()),
city: City::new(city.clone()),
country: Country::new(country.clone()),
zip: Zip::new(zip.clone()),
account_id: account.id,
is_default,
}
);
address
};
let found = super::find_account_address(
FindAccountAddress {
account_id: account.id,
address_id: address.id,
},
&mut t,
)
.await
.unwrap();
assert_eq!(found, address);
let changed = UpdateAccountAddress {
id: address.id,
name: address.name.clone(),
email: address.email.clone(),
phone: address.phone.clone(),
street: address.street.clone(),
city: address.city.clone(),
country: address.country.clone(),
zip: address.zip.clone(),
account_id: address.account_id,
is_default: true,
}
.run(&mut t)
.await
.unwrap();
address.is_default = true;
assert_eq!(changed, address);
let default_address = DefaultAccountAddress {
account_id: account.id,
}
.run(&mut t)
.await
.unwrap();
testx::db_rollback!(t);
assert_eq!(default_address, address);
}
}

View File

@ -0,0 +1,28 @@
pub mod accounts;
pub mod addresses;
pub use accounts::*;
pub use addresses::*;
use config::SharedAppConfig;
#[derive(Clone)]
pub struct Database {
pub pool: sqlx::PgPool,
_config: SharedAppConfig,
}
impl Database {
pub async fn build(config: SharedAppConfig) -> Self {
let url = config.lock().account_manager().database_url.clone();
let pool = sqlx::PgPool::connect(&url).await.unwrap_or_else(|e| {
tracing::error!("Failed to connect to database. {e:?}");
std::process::exit(1);
});
Self {
pool,
_config: config,
}
}
pub fn pool(&self) {}
}

View File

@ -3,12 +3,12 @@
use std::env; use std::env;
use config::UpdateConfig; use config::UpdateConfig;
use database_manager::Database;
use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::util::SubscriberInitExt;
pub mod actions; pub mod actions;
pub mod db;
pub mod mqtt; pub mod mqtt;
pub mod rpc; pub mod rpc;
@ -26,8 +26,6 @@ pub enum Error {
Saving, Saving,
#[error("Unable to hash password")] #[error("Unable to hash password")]
Hashing, Hashing,
#[error("{0}")]
Db(#[from] database_manager::Error),
} }
pub struct Opts {} pub struct Opts {}
@ -43,7 +41,7 @@ async fn main() {
let config = config::default_load(&opts); let config = config::default_load(&opts);
let db = Database::build(config.clone()).await; let db = db::Database::build(config.clone()).await;
let mqtt_client = mqtt::start(config.clone(), db.clone()).await; let mqtt_client = mqtt::start(config.clone(), db.clone()).await;
rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await; rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await;
@ -55,6 +53,7 @@ pub fn init_tracing(_service_name: &str) {
let tracer = { let tracer = {
use opentelemetry::sdk::export::trace::stdout::new_pipeline; use opentelemetry::sdk::export::trace::stdout::new_pipeline;
use opentelemetry::sdk::trace::Config; use opentelemetry::sdk::trace::Config;
new_pipeline() new_pipeline()
.with_trace_config(Config::default()) .with_trace_config(Config::default())
.with_pretty_print(true) .with_pretty_print(true)

View File

@ -0,0 +1,48 @@
use std::time::Duration;
use channels::accounts::Topic;
use config::SharedAppConfig;
use rumqttc::{Event, Incoming, QoS};
use crate::db::Database;
pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient {
let mut mqtt_options = {
let l = config.lock();
let bind = &l.account_manager().mqtt_bind;
let port = l.account_manager().mqtt_port;
tracing::info!("Starting account mqtt at {}:{}", bind, port);
rumqttc::MqttOptions::new(channels::accounts::CLIENT_NAME, bind, port)
};
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (client, mut event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10);
client
.subscribe(Topic::CreateAccount, QoS::AtLeastOnce)
.await
.unwrap();
let client = channels::AsyncClient(client);
let spawn_client = client.clone();
tokio::spawn(async move {
let _client = spawn_client.clone();
loop {
let notification = event_loop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() {
_ => {}
},
Ok(Event::Incoming(_incoming)) => {}
Ok(Event::Outgoing(_outgoing)) => {}
Err(e) => {
tracing::error!("{}", e);
}
}
}
// tracing::info!("Mqtt channel closed");
});
client
}

View File

@ -1,18 +1,19 @@
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
use channels::account::{CreateAccount, MeResult, RegisterResult}; use channels::accounts::rpc::Accounts;
use channels::accounts::{me, register};
use channels::AsyncClient; use channels::AsyncClient;
use config::SharedAppConfig; use config::SharedAppConfig;
use database_manager::Database;
use futures::future::{self}; use futures::future::{self};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use rumqttc::QoS; use rumqttc::QoS;
use tarpc::context; use tarpc::context;
use tarpc::server::incoming::Incoming; use tarpc::server::incoming::Incoming;
use tarpc::server::{self, Channel}; use tarpc::server::{self, Channel};
use tarpc::tokio_serde::formats::Json; use tarpc::tokio_serde::formats::Bincode;
use crate::actions; use crate::actions;
use crate::db::Database;
#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] #[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)]
#[serde(rename_all = "kebab-case", tag = "account")] #[serde(rename_all = "kebab-case", tag = "account")]
@ -27,8 +28,6 @@ pub enum Error {
Saving, Saving,
#[error("Unable to hash password")] #[error("Unable to hash password")]
Hashing, Hashing,
#[error("{0}")]
Db(#[from] database_manager::Error),
} }
#[derive(Clone)] #[derive(Clone)]
@ -39,47 +38,46 @@ struct AccountsServer {
} }
#[tarpc::server] #[tarpc::server]
impl channels::account::rpc::Accounts for AccountsServer { impl Accounts for AccountsServer {
async fn me(self, _: context::Context, account_id: model::AccountId) -> MeResult { async fn me(self, _: context::Context, input: me::Input) -> me::Output {
let res = actions::me(account_id, self.db).await; let res = actions::me(input.account_id, self.db).await;
tracing::info!("ME result: {:?}", res); tracing::info!("ME result: {:?}", res);
res res
} }
async fn register_account(self, _: context::Context, details: CreateAccount) -> RegisterResult { async fn register_account(
let res = actions::create_account(details, &self.db, self.config).await; self,
_: context::Context,
input: register::Input,
) -> register::Output {
use channels::accounts::{Error, Topic};
let res = actions::create_account(input, &self.db, self.config).await;
tracing::info!("REGISTER result: {:?}", res); tracing::info!("REGISTER result: {:?}", res);
match res { match res {
Ok(account) => { Ok(account) => {
self.mqtt_client self.mqtt_client
.publish_or_log( .publish_or_log(Topic::AccountCreated, QoS::AtLeastOnce, true, &account)
channels::account::Topic::AccountCreated,
QoS::AtLeastOnce,
true,
&account,
)
.await; .await;
RegisterResult { register::Output {
account: Some(account), account: Some(account),
error: None, error: None,
} }
} }
Err(_e) => RegisterResult { Err(_e) => register::Output {
account: None, account: None,
error: Some(channels::account::Error::Account), error: Some(Error::Account),
}, },
} }
} }
} }
pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) { pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) {
use channels::account::rpc::Accounts;
let port = { config.lock().account_manager().port }; let port = { config.lock().account_manager().port };
let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port); let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default) let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Bincode::default)
.await .await
.unwrap(); .unwrap();
tracing::info!("Starting account rpc at {}", listener.local_addr()); tracing::info!("Starting account rpc at {}", listener.local_addr());

View File

@ -18,40 +18,40 @@ actix-web-httpauth = { version = "0.6", features = [] }
actix-web-opentelemetry = { version = "0.12", features = [] } actix-web-opentelemetry = { version = "0.12", features = [] }
async-trait = { version = "0.1", features = [] } async-trait = { version = "0.1", features = [] }
bytes = { version = "1.1.0" } bytes = { version = "1.1.0" }
cart_manager = { path = "../actors/cart_manager" } cart_manager = { path = "../cart_manager" }
channels = { path = "../shared/channels" } channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
config = { path = "../shared/config" } config = { path = "../config" }
database_manager = { path = "../actors/database_manager" } database_manager = { path = "../database_manager" }
derive_more = { version = "0.99", features = [] } derive_more = { version = "0.99", features = [] }
dotenv = { version = "0.15", features = [] } dotenv = { version = "0.15", features = [] }
email_manager = { path = "../actors/email_manager" } email_manager = { path = "../email_manager" }
fs_manager = { path = "../actors/fs_manager" } fs_manager = { path = "../fs_manager" }
futures = { version = "0.3", features = [] } futures = { version = "0.3", features = [] }
futures-util = { version = "0.3", features = [] } futures-util = { version = "0.3", features = [] }
gumdrop = { version = "0.8", features = [] } gumdrop = { version = "0.8", features = [] }
human-panic = { version = "1.0.3" } human-panic = { version = "1.0.3" }
include_dir = { version = "0.7.2", features = [] } include_dir = { version = "0.7.2", features = [] }
jemallocator = { version = "0.3", features = [] } jemallocator = { version = "0.3", features = [] }
model = { path = "../shared/model", version = "0.1", features = ["db"] } model = { path = "../model", version = "0.1", features = ["db"] }
oauth2 = { version = "4.1", features = [] } oauth2 = { version = "4.1", features = [] }
order_manager = { path = "../actors/order_manager" } order_manager = { path = "../order_manager" }
parking_lot = { version = "0.12", features = [] } parking_lot = { version = "0.12", features = [] }
payment_manager = { path = "../actors/payment_manager" } payment_manager = { path = "../payment_manager" }
pretty_env_logger = { version = "0.4", features = [] } pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" } rumqttc = { version = "*" }
search_manager = { path = "../actors/search_manager" } search_manager = { path = "../search_manager" }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = [] } serde_json = { version = "1.0", features = [] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] } sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] } sqlx-core = { version = "0.6.2", features = [] }
tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
tera = { version = "1.15", features = [] } tera = { version = "1.15", features = [] }
thiserror = { version = "1.0", features = [] } thiserror = { version = "1.0", features = [] }
token_manager = { path = "../actors/token_manager" } token_manager = { path = "../token_manager" }
tokio = { version = "1.17", features = ["full"] } tokio = { version = "1.17", features = ["full"] }
toml = { version = "0.5", features = [] } toml = { version = "0.5", features = [] }
tracing = { version = "0.1.34" } tracing = { version = "0.1.34" }
tracing-subscriber = { version = "0.3.11" } tracing-subscriber = { version = "0.3.11" }
uuid = { version = "1.2.1", features = ["serde"] } uuid = { version = "1.2.1", features = ["serde"] }
validator = { version = "0.14", features = [] } validator = { version = "0.14", features = [] }
tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }

View File

Before

Width:  |  Height:  |  Size: 1.6 KiB

After

Width:  |  Height:  |  Size: 1.6 KiB

View File

Before

Width:  |  Height:  |  Size: 1.6 KiB

After

Width:  |  Height:  |  Size: 1.6 KiB

View File

Before

Width:  |  Height:  |  Size: 1.2 KiB

After

Width:  |  Height:  |  Size: 1.2 KiB

View File

Before

Width:  |  Height:  |  Size: 3.3 KiB

After

Width:  |  Height:  |  Size: 3.3 KiB

View File

Before

Width:  |  Height:  |  Size: 1.7 KiB

After

Width:  |  Height:  |  Size: 1.7 KiB

View File

Before

Width:  |  Height:  |  Size: 464 B

After

Width:  |  Height:  |  Size: 464 B

View File

Before

Width:  |  Height:  |  Size: 3.0 KiB

After

Width:  |  Height:  |  Size: 3.0 KiB

View File

Before

Width:  |  Height:  |  Size: 1015 B

After

Width:  |  Height:  |  Size: 1015 B

View File

Before

Width:  |  Height:  |  Size: 2.1 KiB

After

Width:  |  Height:  |  Size: 2.1 KiB

View File

Before

Width:  |  Height:  |  Size: 2.1 KiB

After

Width:  |  Height:  |  Size: 2.1 KiB

View File

Before

Width:  |  Height:  |  Size: 2.2 KiB

After

Width:  |  Height:  |  Size: 2.2 KiB

View File

@ -0,0 +1,32 @@
[package]
name = "cart_manager"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "cart-manager"
path = "src/main.rs"
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] }
config = { path = "../config" }
dotenv = { version = "0.15.0" }
futures = { version = "0.3.25" }
model = { path = "../model" }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.37" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "0.8", features = ["serde"] }

View File

@ -0,0 +1,33 @@
CREATE TYPE "PaymentMethod" AS ENUM (
'pay_u',
'payment_on_the_spot'
);
CREATE TYPE "ShoppingCartState" AS ENUM (
'active',
'closed'
);
CREATE TYPE "QuantityUnit" AS ENUM (
'g',
'dkg',
'kg',
'piece'
);
CREATE TABLE shopping_carts (
id integer NOT NULL,
buyer_id integer NOT NULL,
payment_method "PaymentMethod" DEFAULT 'payment_on_the_spot'::"PaymentMethod" NOT NULL,
state "ShoppingCartState" DEFAULT 'active'::"ShoppingCartState" NOT NULL,
checkout_notes text
);
CREATE TABLE shopping_cart_items (
id integer NOT NULL,
product_id integer NOT NULL,
shopping_cart_id integer,
quantity integer DEFAULT 0 NOT NULL,
quantity_unit "QuantityUnit" NOT NULL,
CONSTRAINT positive_quantity CHECK ((quantity >= 0))
);

View File

@ -0,0 +1,250 @@
use std::collections::HashSet;
use channels::carts::modify_cart::CartDetails;
use channels::carts::{self, Error};
use crate::db::*;
macro_rules! begin_t {
($db: ident) => {
match $db.pool.begin().await {
Ok(t) => t,
Err(e) => {
tracing::error!("{}", e);
return Output::error(Error::InternalServerError);
}
}
};
}
macro_rules! end_t {
($t: ident, $res: expr) => {
if let Err(e) = $t.commit().await {
tracing::error!("{}", e);
Output::error(Error::InternalServerError)
} else {
$res
}
};
}
pub async fn modify_item(
msg: carts::modify_item::Input,
db: Database,
) -> carts::modify_item::Output {
use channels::carts::modify_item::Output;
let mut t = begin_t!(db);
let dbm = EnsureActiveShoppingCart {
buyer_id: msg.buyer_id,
};
if let Err(e) = dbm.run(&mut t).await {
tracing::error!("{}", e);
t.rollback().await.ok();
return Output::error(Error::InternalServerError);
}
let dbm = AccountShoppingCarts {
account_id: msg.buyer_id,
state: Some(model::ShoppingCartState::Active),
};
let mut carts: Vec<model::ShoppingCart> = match dbm.run(db.pool()).await {
Ok(carts) => carts,
Err(e) => {
tracing::error!("{}", e);
return Output::error(Error::NoCarts);
}
};
let cart = if carts.is_empty() {
return Output::error(Error::NoCarts);
} else {
carts.remove(0)
};
let dbm = ActiveCartItemByProduct {
product_id: msg.product_id,
};
let item: Option<model::ShoppingCartItem> = match dbm.run(&mut t).await {
Ok(res) => res,
Err(e) => {
tracing::error!("{}", e);
return Output::error(Error::NoActiveCart);
}
};
let res = match item {
Some(item) if **item.quantity == 0 => {
let dbm = DeleteShoppingCartItem { id: item.id };
match dbm.run(&mut t).await {
Ok(Some(res)) => Output::item(res),
Ok(None) => Output::default(),
Err(e) => {
tracing::error!("{}", e);
t.rollback().await.ok();
return Output::error(Error::DeleteItem(item.id));
}
}
}
Some(item) => {
let dbm = UpdateShoppingCartItem {
id: item.id,
product_id: msg.product_id,
shopping_cart_id: cart.id,
quantity: msg.quantity,
quantity_unit: msg.quantity_unit,
};
match dbm.run(&mut t).await {
Ok(res) => Output::item(res),
Err(e) => {
tracing::error!("{}", e);
t.rollback().await.ok();
return Output::error(Error::ModifyItem(item.id));
}
}
}
None => {
let dbm = CreateShoppingCartItem {
product_id: msg.product_id,
shopping_cart_id: cart.id,
quantity: msg.quantity,
quantity_unit: msg.quantity_unit,
};
match dbm.run(&mut t).await {
Ok(res) => Output::item(res),
Err(e) => {
tracing::error!("{}", e);
t.rollback().await.ok();
return Output::error(Error::CreateItem);
}
}
}
};
end_t!(t, res)
}
pub async fn remove_product(
msg: carts::remove_product::Input,
db: Database,
) -> carts::remove_product::Output {
use carts::remove_product::Output;
let dbm = RemoveCartItem {
shopping_cart_id: msg.shopping_cart_id,
shopping_cart_item_id: Some(msg.shopping_cart_item_id),
product_id: None,
};
let mut t = begin_t!(db);
let res = match dbm.run(&mut t).await {
Ok(Some(res)) => Output::item(res),
Ok(None) => Output::default(),
Err(e) => {
tracing::error!("{}", e);
Output::error(Error::DeleteItem(msg.shopping_cart_item_id))
}
};
end_t!(t, res)
}
pub async fn modify_cart(
msg: carts::modify_cart::Input,
db: Database,
) -> carts::modify_cart::Output {
use carts::modify_cart::Output;
tracing::debug!("{:?}", msg);
let mut t = begin_t!(db);
let dbm = EnsureActiveShoppingCart {
buyer_id: msg.buyer_id,
};
let cart = match dbm.run(&mut t).await {
Ok(res) => res,
Err(e) => {
tracing::error!("{}", e);
return Output::error(Error::InternalServerError);
}
};
let dbm = UpdateShoppingCart {
id: cart.id,
buyer_id: msg.buyer_id,
payment_method: msg.payment_method.unwrap_or(cart.payment_method),
state: model::ShoppingCartState::Active,
checkout_notes: if msg.checkout_notes.is_empty() {
None
} else {
Some(msg.checkout_notes)
},
};
let cart: model::ShoppingCart = match dbm.run(&mut t).await {
Ok(res) => res,
Err(e) => {
tracing::error!("{}", e);
return Output::error(Error::ModifyCart(cart.id));
}
};
let existing =
msg.items
.iter()
.fold(HashSet::with_capacity(msg.items.len()), |mut agg, item| {
agg.insert(item.product_id);
agg
});
let dbm = CartItems {
shopping_cart_id: cart.id,
};
let items: Vec<model::ShoppingCartItem> = match dbm.run(&mut t).await {
Ok(v) => v,
Err(e) => {
tracing::error!("{}", e);
return Output::error(Error::LoadItems(cart.id));
}
};
for item in items
.into_iter()
.filter(|item| !existing.contains(&item.product_id))
{
let dbm = RemoveCartItem {
shopping_cart_id: cart.id,
shopping_cart_item_id: Some(item.id),
product_id: None,
};
match dbm.run(&mut t).await {
Ok(_) => {}
Err(e) => {
tracing::error!("{}", e);
return Output::error(Error::DeleteItem(item.id));
}
};
}
let mut out = Vec::with_capacity(msg.items.len());
for item in msg.items {
let res = modify_item(item, db.clone()).await;
if let carts::modify_item::Output {
error: Some(error), ..
} = &res
{
return Output::error(error.clone());
}
if let Some(item) = res.item {
out.push(item);
}
}
end_t!(
t,
Output::cart(CartDetails {
cart_id: cart.id,
items: out,
checkout_notes: cart.checkout_notes.unwrap_or_default(),
payment_method: cart.payment_method,
})
)
}

View File

@ -0,0 +1,32 @@
pub mod shopping_cart_items;
pub mod shopping_carts;
use config::SharedAppConfig;
pub use shopping_cart_items::*;
pub use shopping_carts::*;
use sqlx_core::pool::Pool;
use sqlx_core::postgres::Postgres;
#[derive(Clone)]
pub struct Database {
pub pool: sqlx::PgPool,
_config: SharedAppConfig,
}
impl Database {
pub async fn build(config: SharedAppConfig) -> Self {
let url = config.lock().cart_manager().database_url.clone();
let pool = sqlx::PgPool::connect(&url).await.unwrap_or_else(|e| {
tracing::error!("Failed to connect to database. {e:?}");
std::process::exit(1);
});
Self {
pool,
_config: config,
}
}
pub fn pool(&self) -> Pool<Postgres> {
self.pool.clone()
}
}

View File

@ -0,0 +1,631 @@
use model::*;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, thiserror::Error)]
pub enum Error {
#[error("Can't create shopping cart item")]
CantCreate,
#[error("Can't update shopping cart item")]
CantUpdate(ShoppingCartItemId),
#[error("Shopping cart does not exists")]
NotExists,
#[error("Failed to load all shopping cart items")]
All,
#[error("Failed to load account shopping cart items")]
AccountCarts,
#[error("Failed to load items for shopping cart {0}")]
CartItems(ShoppingCartId),
#[error("Can't find shopping cart item doe to lack of identity")]
NoIdentity,
#[error("Failed to update shopping cart item with id {shopping_cart_item_id:?} and/or product id {product_id:?}")]
Update {
shopping_cart_item_id: Option<ShoppingCartItemId>,
product_id: Option<ProductId>,
},
}
#[derive(Debug)]
pub struct AllShoppingCartItems;
impl AllShoppingCartItems {
pub async fn run(
self,
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<ShoppingCartItem>> {
sqlx::query_as(
r#"
SELECT shopping_cart_items.id,
shopping_cart_items.product_id,
shopping_cart_items.shopping_cart_id,
shopping_cart_items.quantity,
shopping_cart_items.quantity_unit
FROM shopping_cart_items
INNER JOIN shopping_carts
ON shopping_cart_items.shopping_cart_id = shopping_carts.id
ORDER BY shopping_cart_items.id ASC
"#,
)
.fetch_all(t)
.await
.map_err(|e| {
tracing::error!("{e:?}");
dbg!(e);
Error::All
})
}
}
#[derive(Debug)]
pub struct AccountShoppingCartItems {
pub account_id: AccountId,
pub shopping_cart_id: Option<ShoppingCartId>,
}
impl AccountShoppingCartItems {
pub async fn run(
self,
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<ShoppingCartItem>> {
let msg = self;
match msg.shopping_cart_id {
Some(shopping_cart_id) => sqlx::query_as(
r#"
SELECT shopping_cart_items.id as id,
shopping_cart_items.product_id as product_id,
shopping_cart_items.shopping_cart_id as shopping_cart_id,
shopping_cart_items.quantity as quantity,
shopping_cart_items.quantity_unit as quantity_unit
FROM shopping_cart_items
LEFT JOIN shopping_carts
ON shopping_carts.id = shopping_cart_id
WHERE shopping_carts.buyer_id = $1 AND shopping_carts.id = $2
"#,
)
.bind(msg.account_id)
.bind(shopping_cart_id),
None => sqlx::query_as(
r#"
SELECT shopping_cart_items.id as id,
shopping_cart_items.product_id as product_id,
shopping_cart_items.shopping_cart_id as shopping_cart_id,
shopping_cart_items.quantity as quantity,
shopping_cart_items.quantity_unit as quantity_unit
FROM shopping_cart_items
LEFT JOIN shopping_carts
ON shopping_carts.id = shopping_cart_id
WHERE shopping_carts.buyer_id = $1
"#,
)
.bind(msg.account_id),
}
.fetch_all(t)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::AccountCarts
})
}
}
#[derive(Debug)]
pub struct CreateShoppingCartItem {
pub product_id: ProductId,
pub shopping_cart_id: ShoppingCartId,
pub quantity: Quantity,
pub quantity_unit: QuantityUnit,
}
impl CreateShoppingCartItem {
pub async fn run(
self,
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCartItem> {
let msg = self;
sqlx::query_as(
r#"
INSERT INTO shopping_cart_items (product_id, shopping_cart_id, quantity, quantity_unit)
VALUES ($1, $2, $3, $4)
RETURNING id, product_id, shopping_cart_id, quantity, quantity_unit
"#,
)
.bind(msg.product_id)
.bind(msg.shopping_cart_id)
.bind(msg.quantity)
.bind(msg.quantity_unit)
.fetch_one(t)
.await
.map_err(|e| {
tracing::error!("{e:?}");
dbg!(&e);
Error::CantCreate
})
}
}
#[derive(Debug)]
pub struct UpdateShoppingCartItem {
pub id: ShoppingCartItemId,
pub product_id: ProductId,
pub shopping_cart_id: ShoppingCartId,
pub quantity: Quantity,
pub quantity_unit: QuantityUnit,
}
impl UpdateShoppingCartItem {
pub async fn run(
self,
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCartItem> {
let msg = self;
sqlx::query_as(
r#"
UPDATE shopping_cart_items
SET product_id = $2, shopping_cart_id = $3, quantity = $4, quantity_unit = $5
WHERE id = $1
RETURNING id, product_id, shopping_cart_id, quantity, quantity_unit
"#,
)
.bind(msg.id)
.bind(msg.product_id)
.bind(msg.shopping_cart_id)
.bind(msg.quantity)
.bind(msg.quantity_unit)
.fetch_one(t)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::CantUpdate(msg.id)
})
}
}
#[derive(Debug)]
pub struct DeleteShoppingCartItem {
pub id: ShoppingCartItemId,
}
impl DeleteShoppingCartItem {
pub async fn run(
self,
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Option<ShoppingCartItem>> {
let msg = self;
sqlx::query_as(
r#"
DELETE FROM shopping_cart_items
WHERE id = $1
RETURNING id, product_id, shopping_cart_id, quantity, quantity_unit
"#,
)
.bind(msg.id)
.fetch_optional(t)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::CantUpdate(msg.id)
})
}
}
#[derive(Debug)]
pub struct FindShoppingCartItem {
pub id: ShoppingCartItemId,
}
impl FindShoppingCartItem {
pub async fn run(
self,
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCartItem> {
let msg = self;
sqlx::query_as(
r#"
SELECT id, product_id, shopping_cart_id, quantity, quantity_unit
FROM shopping_cart_items
WHERE id = $1
"#,
)
.bind(msg.id)
.fetch_one(t)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::NotExists
})
}
}
#[derive(Debug)]
pub struct ActiveCartItemByProduct {
pub product_id: ProductId,
}
impl ActiveCartItemByProduct {
pub async fn run(
self,
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Option<ShoppingCartItem>> {
let msg = self;
sqlx::query_as(
r#"
SELECT shopping_cart_items.id,
shopping_cart_items.product_id,
shopping_cart_items.shopping_cart_id,
shopping_cart_items.quantity,
shopping_cart_items.quantity_unit
FROM shopping_cart_items
INNER JOIN shopping_carts
ON shopping_cart_items.shopping_cart_id = shopping_carts.id
WHERE product_id = $1
AND shopping_carts.state = $2
ORDER BY shopping_cart_items.id ASC
"#,
)
.bind(msg.product_id)
.bind(model::ShoppingCartState::Active)
.fetch_optional(t)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::NotExists
})
}
}
#[derive(Debug)]
pub struct CartItems {
pub shopping_cart_id: ShoppingCartId,
}
impl CartItems {
pub async fn run(
self,
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<ShoppingCartItem>> {
let msg = self;
let shopping_cart_id = msg.shopping_cart_id;
sqlx::query_as(
r#"
SELECT id,
product_id,
shopping_cart_id,
quantity,
quantity_unit
FROM shopping_cart_items
WHERE shopping_cart_id = $1
ORDER BY shopping_cart_items.id ASC
"#,
)
.bind(msg.shopping_cart_id)
.fetch_all(t)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::CartItems(shopping_cart_id)
})
}
}
#[derive(Debug)]
pub struct RemoveCartItem {
pub shopping_cart_id: ShoppingCartId,
pub shopping_cart_item_id: Option<ShoppingCartItemId>,
pub product_id: Option<ProductId>,
}
impl RemoveCartItem {
pub async fn run(
self,
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Option<ShoppingCartItem>> {
let msg = self;
match (msg.shopping_cart_item_id, msg.product_id) {
(Some(shopping_cart_item_id), None) => sqlx::query_as(
r#"
DELETE FROM shopping_cart_items
WHERE shopping_cart_id = $1 AND id = $2
RETURNING id, product_id, shopping_cart_id, quantity, quantity_unit
"#,
)
.bind(msg.shopping_cart_id)
.bind(shopping_cart_item_id),
(Some(shopping_cart_item_id), Some(product_id)) => sqlx::query_as(
r#"
DELETE FROM shopping_cart_items
WHERE shopping_cart_id = $1 AND id = $2 AND product_id = $3
RETURNING id, product_id, shopping_cart_id, quantity, quantity_unit
"#,
)
.bind(msg.shopping_cart_id)
.bind(shopping_cart_item_id)
.bind(product_id),
(None, Some(product_id)) => sqlx::query_as(
r#"
DELETE FROM shopping_cart_items
WHERE shopping_cart_id = $1 AND product_id = $2
RETURNING id, product_id, shopping_cart_id, quantity, quantity_unit
"#,
)
.bind(msg.shopping_cart_id)
.bind(product_id),
_ => return Err(Error::NoIdentity),
}
.fetch_optional(t)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::Update {
shopping_cart_item_id: msg.shopping_cart_item_id,
product_id: msg.product_id,
}
})
}
}
#[cfg(test)]
mod tests {
use config::UpdateConfig;
use fake::Fake;
use model::*;
use uuid::Uuid;
pub struct NoOpts;
impl UpdateConfig for NoOpts {}
use super::*;
async fn test_product(t: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Product {
CreateProduct {
name: ProductName::new(format!("{}", Uuid::new_v4())),
short_description: ProductShortDesc::new(format!("{}", Uuid::new_v4())),
long_description: ProductLongDesc::new(format!("{}", Uuid::new_v4())),
category: None,
price: Price::from_u32(4687),
deliver_days_flag: Days(vec![Day::Friday, Day::Sunday]),
}
.run(t)
.await
.unwrap()
}
async fn test_account(
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
login: Option<String>,
email: Option<String>,
hash: Option<String>,
) -> FullAccount {
use fake::faker::internet::en;
let login: String = login.unwrap_or_else(|| en::Username().fake());
let email: String = email.unwrap_or_else(|| en::FreeEmail().fake());
let hash: String = hash.unwrap_or_else(|| en::Password(10..20).fake());
CreateAccount {
email: Email::new(email),
login: Login::new(login),
pass_hash: PassHash::new(hash),
role: Role::Admin,
}
.run(t)
.await
.unwrap()
}
async fn test_shopping_cart(
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
buyer_id: Option<AccountId>,
state: ShoppingCartState,
) -> ShoppingCart {
let buyer_id = match buyer_id {
Some(id) => id,
_ => test_account(&mut *t, None, None, None).await.id,
};
sqlx::query(
r#"
UPDATE shopping_carts
SET state = 'closed'
WHERE buyer_id = $1
"#,
)
.bind(buyer_id)
.execute(&mut *t)
.await
.unwrap();
let cart = CreateShoppingCart {
buyer_id,
payment_method: PaymentMethod::PaymentOnTheSpot,
}
.run(&mut *t)
.await
.unwrap();
UpdateShoppingCart {
id: cart.id,
buyer_id: cart.buyer_id,
payment_method: cart.payment_method,
state,
checkout_notes: None,
}
.run(&mut *t)
.await
.unwrap()
}
async fn test_shopping_cart_item(
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
shopping_cart_id: Option<ShoppingCartId>,
product_id: Option<ProductId>,
) -> ShoppingCartItem {
let shopping_cart_id = match shopping_cart_id {
Some(id) => id,
_ => {
test_shopping_cart(&mut *t, None, ShoppingCartState::Closed)
.await
.id
}
};
let product_id = match product_id {
Some(id) => id,
_ => test_product(&mut *t).await.id,
};
CreateShoppingCartItem {
product_id,
shopping_cart_id,
quantity: Quantity::from_u32(496879),
quantity_unit: QuantityUnit::Gram,
}
.run(t)
.await
.unwrap()
}
#[actix::test]
async fn create() {
testx::db_t_ref!(t);
test_shopping_cart_item(&mut t, None, None).await;
testx::db_rollback!(t);
}
#[actix::test]
async fn all() {
testx::db_t_ref!(t);
let account_id = test_account(&mut t, None, None, None).await.id;
let mut items = Vec::with_capacity(9);
let cart1 = test_shopping_cart(&mut t, Some(account_id), ShoppingCartState::Closed).await;
items.push(test_shopping_cart_item(&mut t, Some(cart1.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart1.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart1.id), None).await);
let cart2 = test_shopping_cart(&mut t, Some(account_id), ShoppingCartState::Active).await;
items.push(test_shopping_cart_item(&mut t, Some(cart2.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart2.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart2.id), None).await);
let cart3 = test_shopping_cart(&mut t, Some(account_id), ShoppingCartState::Closed).await;
items.push(test_shopping_cart_item(&mut t, Some(cart3.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart3.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart3.id), None).await);
let all = all_shopping_cart_items(AllShoppingCartItems, &mut t)
.await
.unwrap();
testx::db_rollback!(t);
assert_eq!(all, items)
}
#[actix::test]
async fn account_cart_with_cart_id() {
testx::db_t_ref!(t);
let account_id = test_account(&mut t, None, None, None).await.id;
let mut items = Vec::with_capacity(9);
let cart1 = test_shopping_cart(&mut t, Some(account_id), ShoppingCartState::Closed).await;
test_shopping_cart_item(&mut t, Some(cart1.id), None).await;
test_shopping_cart_item(&mut t, Some(cart1.id), None).await;
test_shopping_cart_item(&mut t, Some(cart1.id), None).await;
let cart2 = test_shopping_cart(&mut t, Some(account_id), ShoppingCartState::Active).await;
items.push(test_shopping_cart_item(&mut t, Some(cart2.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart2.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart2.id), None).await);
let cart3 = test_shopping_cart(&mut t, Some(account_id), ShoppingCartState::Closed).await;
test_shopping_cart_item(&mut t, Some(cart3.id), None).await;
test_shopping_cart_item(&mut t, Some(cart3.id), None).await;
test_shopping_cart_item(&mut t, Some(cart3.id), None).await;
let all = account_shopping_cart_items(
AccountShoppingCartItems {
account_id,
shopping_cart_id: Some(cart2.id),
},
&mut t,
)
.await
.unwrap();
testx::db_rollback!(t);
assert_eq!(all, items)
}
#[actix::test]
async fn account_cart_without_cart_id() {
testx::db_t_ref!(t);
let account_id = test_account(&mut t, None, None, None).await.id;
let mut items = Vec::with_capacity(9);
let cart1 = test_shopping_cart(&mut t, Some(account_id), ShoppingCartState::Closed).await;
items.push(test_shopping_cart_item(&mut t, Some(cart1.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart1.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart1.id), None).await);
let cart2 = test_shopping_cart(&mut t, Some(account_id), ShoppingCartState::Active).await;
items.push(test_shopping_cart_item(&mut t, Some(cart2.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart2.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart2.id), None).await);
let cart3 = test_shopping_cart(&mut t, Some(account_id), ShoppingCartState::Closed).await;
items.push(test_shopping_cart_item(&mut t, Some(cart3.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart3.id), None).await);
items.push(test_shopping_cart_item(&mut t, Some(cart3.id), None).await);
let all = account_shopping_cart_items(
AccountShoppingCartItems {
account_id,
shopping_cart_id: None,
},
&mut t,
)
.await
.unwrap();
testx::db_rollback!(t);
assert_eq!(all, items)
}
#[actix::test]
async fn update() {
testx::db_t_ref!(t);
let account_id = test_account(&mut t, None, None, None).await.id;
let cart1 = test_shopping_cart(&mut t, Some(account_id), ShoppingCartState::Closed).await;
let item = test_shopping_cart_item(&mut t, Some(cart1.id), None).await;
let updated = update_shopping_cart_item(
UpdateShoppingCartItem {
id: item.id,
product_id: item.product_id,
shopping_cart_id: item.shopping_cart_id,
quantity: Quantity::from_u32(987979879),
quantity_unit: QuantityUnit::Kilogram,
},
&mut t,
)
.await
.unwrap();
assert_ne!(item, updated);
assert_eq!(
updated,
ShoppingCartItem {
id: item.id,
product_id: item.product_id,
shopping_cart_id: item.shopping_cart_id,
quantity: Quantity::from_u32(987979879),
quantity_unit: QuantityUnit::Kilogram,
}
);
}
}

View File

@ -0,0 +1,423 @@
use model::*;
use sqlx::PgPool;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, thiserror::Error)]
pub enum Error {
#[error("Can't create shopping cart")]
CantCreate,
#[error("Can't update shopping cart {0}")]
CantUpdate(ShoppingCartId),
#[error("Shopping cart does not exists")]
NotExists,
#[error("Failed to load all shopping carts")]
All,
#[error("Failed to load account shopping carts")]
AccountCarts,
}
#[derive(Debug)]
pub struct AllShoppingCarts;
impl AllShoppingCarts {
pub async fn run(self, pool: PgPool) -> Result<Vec<ShoppingCart>> {
sqlx::query_as(
r#"
SELECT id, buyer_id, payment_method, state, checkout_notes
FROM shopping_carts
"#,
)
.fetch_all(&pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::All
})
}
}
#[derive(Debug)]
pub struct AccountShoppingCarts {
pub account_id: AccountId,
pub state: Option<ShoppingCartState>,
}
impl AccountShoppingCarts {
pub async fn run(self, pool: PgPool) -> Result<Vec<ShoppingCart>> {
let msg = self;
if let Some(state) = msg.state {
sqlx::query_as(
r#"
SELECT id, buyer_id, payment_method, state, checkout_notes
FROM shopping_carts
WHERE buyer_id = $1 AND state = $2
"#,
)
.bind(msg.account_id)
.bind(state)
} else {
sqlx::query_as(
r#"
SELECT id, buyer_id, payment_method, state, checkout_notes
FROM shopping_carts
WHERE buyer_id = $1
"#,
)
.bind(msg.account_id)
}
.fetch_all(&pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::AccountCarts
})
}
}
#[derive(Debug)]
pub struct CreateShoppingCart {
pub buyer_id: AccountId,
pub payment_method: PaymentMethod,
}
impl CreateShoppingCart {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCart> {
let msg = self;
sqlx::query_as(
r#"
INSERT INTO shopping_carts (buyer_id, payment_method)
VALUES ($1, $2)
RETURNING id, buyer_id, payment_method, state, checkout_notes
"#,
)
.bind(msg.buyer_id)
.bind(msg.payment_method)
.fetch_one(pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
dbg!(e);
Error::CantCreate
})
}
}
#[derive(Debug)]
pub struct UpdateShoppingCart {
pub id: ShoppingCartId,
pub buyer_id: AccountId,
pub payment_method: PaymentMethod,
pub state: ShoppingCartState,
pub checkout_notes: Option<String>,
}
impl UpdateShoppingCart {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCart> {
let msg = self;
sqlx::query_as(
r#"
UPDATE shopping_carts
SET buyer_id = $2, payment_method = $3, state = $4, checkout_notes = $5
WHERE id = $1
RETURNING id, buyer_id, payment_method, state, checkout_notes
"#,
)
.bind(msg.id)
.bind(msg.buyer_id)
.bind(msg.payment_method)
.bind(msg.state)
.bind(msg.checkout_notes)
.fetch_one(pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::CantUpdate(msg.id)
})
}
}
#[derive(Debug)]
pub struct ShoppingCartSetState {
pub id: ShoppingCartId,
pub state: ShoppingCartState,
pub checkout_notes: Option<String>,
}
impl ShoppingCartSetState {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCart> {
let msg = self;
sqlx::query_as(
r#"
UPDATE shopping_carts
SET state = $2, checkout_notes = $3
WHERE id = $1
RETURNING id, buyer_id, payment_method, state, checkout_notes
"#,
)
.bind(msg.id)
.bind(msg.state)
.bind(msg.checkout_notes)
.fetch_one(pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::CantUpdate(msg.id)
})
}
}
#[derive(Debug)]
pub struct FindShoppingCart {
pub id: ShoppingCartId,
}
impl FindShoppingCart {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCart> {
let msg = self;
sqlx::query_as(
r#"
SELECT id, buyer_id, payment_method, state, checkout_notes
FROM shopping_carts
WHERE id = $1
"#,
)
.bind(msg.id)
.fetch_one(pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::NotExists
})
}
}
#[derive(Debug)]
pub struct EnsureActiveShoppingCart {
pub buyer_id: AccountId,
}
impl EnsureActiveShoppingCart {
pub async fn run(
self,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCart> {
let msg = self;
if let Ok(Some(cart)) = sqlx::query_as(
r#"
INSERT INTO shopping_carts (buyer_id, state)
VALUES ($1, 'active')
ON CONFLICT
DO NOTHING
RETURNING id, buyer_id, payment_method, state, checkout_notes
"#,
)
.bind(msg.buyer_id)
.fetch_optional(&mut *pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::NotExists
}) {
return Ok(cart);
};
sqlx::query_as(
r#"
SELECT id, buyer_id, payment_method, state, checkout_notes
FROM shopping_carts
WHERE buyer_id = $1 AND state = 'active'
"#,
)
.bind(msg.buyer_id)
.fetch_one(pool)
.await
.map_err(|e| {
tracing::error!("{e:?}");
Error::NotExists
})
}
}
#[cfg(test)]
mod tests {
use config::UpdateConfig;
use fake::Fake;
use model::*;
pub struct NoOpts;
impl UpdateConfig for NoOpts {}
use super::*;
async fn test_account(
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
login: Option<String>,
email: Option<String>,
hash: Option<String>,
) -> FullAccount {
use fake::faker::internet::en;
let login: String = login.unwrap_or_else(|| en::Username().fake());
let email: String = email.unwrap_or_else(|| en::FreeEmail().fake());
let hash: String = hash.unwrap_or_else(|| en::Password(10..20).fake());
CreateAccount {
email: Email::new(email),
login: Login::new(login),
pass_hash: PassHash::new(hash),
role: Role::Admin,
}
.run(t)
.await
.unwrap()
}
async fn test_shopping_cart(
t: &mut sqlx::Transaction<'_, sqlx::Postgres>,
buyer_id: Option<AccountId>,
) -> ShoppingCart {
let buyer_id = match buyer_id {
Some(id) => id,
_ => test_account(&mut *t, None, None, None).await.id,
};
super::create_shopping_cart(
CreateShoppingCart {
buyer_id,
payment_method: PaymentMethod::PaymentOnTheSpot,
},
t,
)
.await
.unwrap()
}
#[actix::test]
async fn create_shopping_cart() {
testx::db_t_ref!(t);
let account = test_account(&mut t, None, None, None).await;
let cart = super::create_shopping_cart(
CreateShoppingCart {
buyer_id: account.id,
payment_method: PaymentMethod::PaymentOnTheSpot,
},
&mut t,
)
.await;
testx::db_rollback!(t);
assert!(cart.is_ok());
}
#[actix::test]
async fn update_shopping_cart() {
testx::db_t_ref!(t);
let account = test_account(&mut t, None, None, None).await;
let original = test_shopping_cart(&mut t, Some(account.id)).await;
let cart = super::update_shopping_cart(
UpdateShoppingCart {
id: original.id,
buyer_id: account.id,
payment_method: PaymentMethod::PayU,
state: ShoppingCartState::Closed,
checkout_notes: Some("Foo bar".into()),
},
&mut t,
)
.await
.unwrap();
testx::db_rollback!(t);
assert_ne!(cart, original);
assert_eq!(
cart,
ShoppingCart {
id: original.id,
buyer_id: account.id,
payment_method: PaymentMethod::PayU,
state: ShoppingCartState::Closed,
checkout_notes: Some("Foo bar".into())
}
);
}
#[actix::test]
async fn without_cart_ensure_shopping_cart() {
testx::db_t_ref!(t);
let account = test_account(&mut t, None, None, None).await;
let cart = super::ensure_active_shopping_cart(
EnsureActiveShoppingCart {
buyer_id: account.id,
},
&mut t,
)
.await
.unwrap();
let id = cart.id;
testx::db_rollback!(t);
assert_eq!(
cart,
model::ShoppingCart {
id,
buyer_id: account.id,
payment_method: Default::default(),
state: ShoppingCartState::Active,
checkout_notes: None
}
);
}
#[actix::test]
async fn with_inactive_cart_ensure_shopping_cart() {
testx::db_t_ref!(t);
let account = test_account(&mut t, None, None, None).await;
let original = test_shopping_cart(&mut t, Some(account.id)).await;
let _ = super::update_shopping_cart(
UpdateShoppingCart {
id: original.id,
buyer_id: account.id,
payment_method: Default::default(),
state: ShoppingCartState::Closed,
checkout_notes: None,
},
&mut t,
)
.await
.unwrap();
let cart = super::ensure_active_shopping_cart(
EnsureActiveShoppingCart {
buyer_id: account.id,
},
&mut t,
)
.await
.unwrap();
testx::db_rollback!(t);
assert_ne!(original, cart);
}
}

View File

@ -0,0 +1,50 @@
use config::UpdateConfig;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use crate::db::Database;
pub mod actions;
pub mod db;
pub mod mqtt;
pub mod rpc;
pub struct Opts {}
impl UpdateConfig for Opts {}
#[actix::main]
async fn main() {
dotenv::dotenv().ok();
init_tracing("account-manager");
let opts = Opts {};
let config = config::default_load(&opts);
let db = Database::build(config.clone()).await;
let mqtt_client = mqtt::start(config.clone(), db.clone()).await;
rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await;
}
pub fn init_tracing(_service_name: &str) {
std::env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");
let tracer = {
use opentelemetry::sdk::export::trace::stdout::new_pipeline;
use opentelemetry::sdk::trace::Config;
new_pipeline()
.with_trace_config(Config::default())
.with_pretty_print(true)
.install_simple()
};
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE))
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()
.unwrap();
}

View File

@ -0,0 +1,37 @@
use std::time::Duration;
use config::SharedAppConfig;
use rumqttc::{Event, Incoming};
use crate::Database;
pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient {
let mut mqtt_options = {
let l = config.lock();
let bind = &l.account_manager().mqtt_bind;
let port = l.account_manager().mqtt_port;
tracing::info!("Starting account mqtt at {}:{}", bind, port);
rumqttc::MqttOptions::new(channels::accounts::CLIENT_NAME, bind, port)
};
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (client, mut event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10);
let client = channels::AsyncClient(client);
let spawn_client = client.clone();
tokio::spawn(async move {
let _client = spawn_client.clone();
loop {
let notification = event_loop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() {
_ => {}
},
_ => {}
}
}
});
client
}

View File

@ -0,0 +1,79 @@
use std::net::{IpAddr, Ipv4Addr};
use channels::carts::modify_item::{Input, Output};
use channels::carts::rpc::Carts;
use channels::AsyncClient;
use config::SharedAppConfig;
use futures::{future, StreamExt};
use tarpc::server::incoming::Incoming;
use tarpc::server::Channel;
use tarpc::tokio_serde::formats::Bincode;
use tarpc::{context, server};
use crate::db::Database;
#[derive(Clone)]
pub struct CartsServer {
db: Database,
_config: SharedAppConfig,
_mqtt_client: AsyncClient,
}
#[tarpc::server]
impl Carts for CartsServer {
async fn modify_item(self, _: context::Context, input: Input) -> Output {
crate::actions::modify_item(input, self.db).await
}
async fn modify_cart(
self,
_: context::Context,
input: channels::carts::modify_cart::Input,
) -> channels::carts::modify_cart::Output {
crate::actions::modify_cart(input, self.db).await
}
async fn remove_cart(
self,
_: context::Context,
input: channels::carts::remove_product::Input,
) -> channels::carts::remove_product::Output {
crate::actions::remove_product(input, self.db).await
}
}
pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) {
let port = { config.lock().cart_manager().port };
let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Bincode::default)
.await
.unwrap();
tracing::info!("Starting account rpc at {}", listener.local_addr());
listener.config_mut().max_frame_length(usize::MAX);
listener
// Ignore accept errors.
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
// Limit channels to 8 per IP.
.max_channels_per_key(8, |t| t.transport().peer_addr().unwrap().ip())
.max_concurrent_requests_per_channel(20)
// serve is generated by the service attribute. It takes as input any type implementing
// the generated World trait.
.map(|channel| {
channel.execute(
CartsServer {
db: db.clone(),
_config: config.clone(),
_mqtt_client: mqtt_client.clone(),
}
.serve(),
)
})
// Max 10 channels.
.buffer_unordered(10)
.for_each(|_| async {})
.await;
tracing::info!("RPC channel closed");
}

View File

@ -0,0 +1,149 @@
#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)]
pub enum Error {
#[error("mqtt payload has invalid create account data")]
InvalidCreateAccount,
#[error("mqtt payload has invalid account failure data")]
InvalidAccountFailure,
#[error("Account does not exists")]
Account,
#[error("Account does have any addresses")]
Addresses,
}
pub static CLIENT_NAME: &str = "account-manager";
#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Topic {
CreateAccount,
AccountCreated,
SignUpFailure,
}
impl Into<String> for Topic {
fn into(self) -> String {
String::from(self.to_str())
}
}
impl<'s> PartialEq<&'s str> for Topic {
fn eq(&self, other: &&'s str) -> bool {
self.to_str() == *other
}
}
impl PartialEq<String> for Topic {
fn eq(&self, other: &String) -> bool {
self.to_str() == other.as_str()
}
}
impl Topic {
pub fn to_str(self) -> &'static str {
match self {
Topic::CreateAccount => "account/create",
Topic::AccountCreated => "account/created",
Topic::SignUpFailure => "account/failure",
}
}
}
pub mod register {
use model::{Email, Login, Password, Role};
use crate::accounts::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub email: Email,
pub login: Login,
pub password: Password,
pub role: Role,
}
impl TryFrom<bytes::Bytes> for Input {
type Error = Error;
fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> {
bincode::deserialize(value.as_ref()).map_err(|e| {
tracing::error!("{}", e);
Error::InvalidCreateAccount
})
}
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct Output {
pub account: Option<model::FullAccount>,
pub error: Option<Error>,
}
}
// #[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)]
// pub enum AccountFailure {
// #[error("Failed to hash password")]
// FailedToHashPassword,
// #[error("Failed to save account")]
// SaveAccount,
// #[error("Internal server error")]
// InternalServerError,
// }
//
// impl TryFrom<bytes::Bytes> for AccountFailure {
// type Error = Error;
//
// fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> {
// bincode::deserialize(value.as_ref()).map_err(|e| {
// tracing::error!("{}", e);
// Error::InvalidAccountFailure
// })
// }
// }
pub mod me {
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub account_id: model::AccountId,
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct Output {
pub account: Option<model::FullAccount>,
pub addresses: Option<Vec<model::AccountAddress>>,
pub error: Option<super::Error>,
}
}
pub mod rpc {
use config::SharedAppConfig;
use crate::accounts::{me, register};
#[tarpc::service]
pub trait Accounts {
/// Returns a greeting for name.
async fn me(input: me::Input) -> me::Output;
/// Creates new user account.
async fn register_account(input: register::Input) -> register::Output;
}
pub async fn create_client(config: SharedAppConfig) -> AccountsClient {
use tarpc::client;
use tarpc::tokio_serde::formats::Bincode;
let addr = {
let l = config.lock();
(l.account_manager().bind.clone(), l.account_manager().port)
};
let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default);
let client = AccountsClient::new(
client::Config::default(),
transport.await.expect("Failed to connect to server"),
)
.spawn();
client
}
}

View File

@ -0,0 +1,147 @@
pub static CLIENT_NAME: &str = "cart-manager";
pub enum Topic {}
#[derive(Debug, Clone, thiserror::Error, serde::Serialize, serde::Deserialize)]
pub enum Error {
#[error("Internal server error")]
InternalServerError,
#[error("Failed to load account shopping carts")]
NoCarts,
#[error("Account does not have active shopping cart")]
NoActiveCart,
#[error("Failed to delete item {0:?}")]
DeleteItem(model::ShoppingCartItemId),
#[error("Failed to modify item {0:?}")]
ModifyItem(model::ShoppingCartItemId),
#[error("Failed to create item")]
CreateItem,
#[error("Failed to modify cart {0:?}")]
ModifyCart(model::ShoppingCartId),
#[error("Failed to load cart {0:?} items")]
LoadItems(model::ShoppingCartId),
}
pub mod remove_product {
use super::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub shopping_cart_id: model::ShoppingCartId,
pub shopping_cart_item_id: model::ShoppingCartItemId,
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct Output {
pub item: Option<model::ShoppingCartItem>,
pub error: Option<Error>,
}
impl Output {
pub fn error(error: Error) -> Self {
Self {
error: Some(error),
..Default::default()
}
}
pub fn item(item: model::ShoppingCartItem) -> Self {
Self {
item: Some(item),
..Default::default()
}
}
}
}
pub mod modify_item {
use super::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub buyer_id: model::AccountId,
pub product_id: model::ProductId,
pub quantity: model::Quantity,
pub quantity_unit: model::QuantityUnit,
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct Output {
pub item: Option<model::ShoppingCartItem>,
pub error: Option<Error>,
}
impl Output {
pub fn error(error: Error) -> Self {
Self {
error: Some(error),
..Default::default()
}
}
pub fn item(item: model::ShoppingCartItem) -> Self {
Self {
item: Some(item),
..Default::default()
}
}
}
}
pub mod modify_cart {
use super::{modify_item, Error};
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub buyer_id: model::AccountId,
pub items: Vec<modify_item::Input>,
pub checkout_notes: String,
pub payment_method: Option<model::PaymentMethod>,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct CartDetails {
pub cart_id: model::ShoppingCartId,
pub items: Vec<model::ShoppingCartItem>,
pub checkout_notes: String,
pub payment_method: model::PaymentMethod,
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct Output {
pub cart: Option<CartDetails>,
pub error: Option<Error>,
}
impl Output {
pub fn error(error: Error) -> Self {
Self {
error: Some(error),
..Default::default()
}
}
pub fn cart(cart: CartDetails) -> Self {
Self {
cart: Some(cart),
..Default::default()
}
}
}
}
pub mod rpc {
use super::{modify_cart, modify_item, remove_product};
#[tarpc::service]
pub trait Carts {
/// Change shopping cart item.
async fn modify_item(input: modify_item::Input) -> modify_item::Output;
/// Change entire shopping cart content.
async fn modify_cart(input: modify_cart::Input) -> modify_cart::Output;
/// Remove entire shopping cart.
async fn remove_cart(input: remove_product::Input) -> remove_product::Output;
}
}

View File

@ -0,0 +1,33 @@
#![feature(structural_match)]
pub mod accounts;
pub mod carts;
#[derive(Clone)]
pub struct AsyncClient(pub rumqttc::AsyncClient);
impl AsyncClient {
pub async fn publish<Topic: Into<String>, T: serde::Serialize>(
&self,
topic: Topic,
qos: rumqttc::QoS,
retain: bool,
t: T,
) -> Result<(), rumqttc::ClientError> {
let v = bincode::serialize(&t).unwrap_or_default();
let bytes = bytes::Bytes::copy_from_slice(&v);
self.0.publish_bytes(topic, qos, retain, bytes).await
}
pub async fn publish_or_log<Topic: Into<String>, T: serde::Serialize>(
&self,
topic: Topic,
qos: rumqttc::QoS,
retain: bool,
t: T,
) {
if let Err(e) = self.publish(topic, qos, retain, t).await {
tracing::error!("{}", e);
}
}
}

View File

@ -4,18 +4,12 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
actix-web = { version = "4.0", features = [] } actix-web = { version = "4.0", features = [] }
parking_lot = { version = "0.12", features = [] } parking_lot = { version = "0.12", features = [] }
password-hash = { version = "0.4", features = ["alloc"] } password-hash = { version = "0.4", features = ["alloc"] }
pay_u = { version = '0.1', features = ["single-client"] } pay_u = { version = '0.1', features = ["single-client"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = [] } serde_json = { version = "1.0", features = [] }
thiserror = { version = "1.0" } thiserror = { version = "1.0" }
toml = { version = "0.5", features = [] } toml = { version = "0.5", features = [] }
tracing = { version = "0.1.34" } tracing = { version = "0.1.34" }

View File

@ -13,8 +13,10 @@ pub trait UpdateConfig {
fn update_config(&self, _config: &mut AppConfig) {} fn update_config(&self, _config: &mut AppConfig) {}
} }
trait Example: Sized { trait Example: Sized + Default {
fn example() -> Self; fn example() -> Self {
Self::default()
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -315,17 +317,7 @@ pub struct SearchConfig {
search_active: bool, search_active: bool,
} }
impl Example for SearchConfig { impl Example for SearchConfig {}
fn example() -> Self {
Self {
sonic_search_addr: None,
sonic_search_pass: None,
sonic_ingest_addr: None,
sonic_ingest_pass: None,
search_active: true,
}
}
}
impl Default for SearchConfig { impl Default for SearchConfig {
fn default() -> Self { fn default() -> Self {
@ -386,14 +378,7 @@ pub struct FilesConfig {
local_path: Option<String>, local_path: Option<String>,
} }
impl Example for FilesConfig { impl Example for FilesConfig {}
fn example() -> Self {
Self {
public_path: Some("/uploads".into()),
local_path: Some("/var/local/bazzar".into()),
}
}
}
impl Default for FilesConfig { impl Default for FilesConfig {
fn default() -> Self { fn default() -> Self {
@ -426,6 +411,9 @@ impl FilesConfig {
pub struct AccountManagerConfig { pub struct AccountManagerConfig {
pub port: u16, pub port: u16,
pub bind: String, pub bind: String,
pub mqtt_port: u16,
pub mqtt_bind: String,
pub database_url: String,
} }
impl Default for AccountManagerConfig { impl Default for AccountManagerConfig {
@ -433,19 +421,38 @@ impl Default for AccountManagerConfig {
Self { Self {
port: 19329, port: 19329,
bind: "0.0.0.0".into(), bind: "0.0.0.0".into(),
mqtt_port: 1883,
mqtt_bind: "0.0.0.0".into(),
database_url: "postgres://postgres@localhost/bazzar_accounts".into(),
} }
} }
} }
impl Example for AccountManagerConfig { impl Example for AccountManagerConfig {}
fn example() -> Self {
#[derive(Debug, Serialize, Deserialize)]
pub struct CartManagerConfig {
pub port: u16,
pub bind: String,
pub mqtt_port: u16,
pub mqtt_bind: String,
pub database_url: String,
}
impl Default for CartManagerConfig {
fn default() -> Self {
Self { Self {
port: 19329, port: 19330,
bind: "0.0.0.0".into(), bind: "0.0.0.0".into(),
mqtt_port: 1884,
mqtt_bind: "0.0.0.0".into(),
database_url: "postgres://postgres@localhost/bazzar_carts".into(),
} }
} }
} }
impl Example for CartManagerConfig {}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct AppConfig { pub struct AppConfig {
#[serde(default)] #[serde(default)]
@ -462,6 +469,8 @@ pub struct AppConfig {
files: FilesConfig, files: FilesConfig,
#[serde(default)] #[serde(default)]
account_manager: AccountManagerConfig, account_manager: AccountManagerConfig,
#[serde(default)]
cart_manager: CartManagerConfig,
#[serde(skip)] #[serde(skip)]
config_path: String, config_path: String,
} }
@ -476,6 +485,7 @@ impl Example for AppConfig {
search: SearchConfig::example(), search: SearchConfig::example(),
files: FilesConfig::example(), files: FilesConfig::example(),
account_manager: AccountManagerConfig::example(), account_manager: AccountManagerConfig::example(),
cart_manager: Default::default(),
config_path: "".to_string(), config_path: "".to_string(),
} }
} }
@ -525,6 +535,10 @@ impl AppConfig {
pub fn account_manager(&self) -> &AccountManagerConfig { pub fn account_manager(&self) -> &AccountManagerConfig {
&self.account_manager &self.account_manager
} }
pub fn cart_manager(&self) -> &CartManagerConfig {
&self.cart_manager
}
} }
impl Default for AppConfig { impl Default for AppConfig {
@ -537,6 +551,7 @@ impl Default for AppConfig {
search: Default::default(), search: Default::default(),
files: FilesConfig::default(), files: FilesConfig::default(),
account_manager: AccountManagerConfig::default(), account_manager: AccountManagerConfig::default(),
cart_manager: Default::default(),
config_path: "".to_string(), config_path: "".to_string(),
} }
} }

View File

@ -11,10 +11,10 @@ actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] } actix-rt = { version = "2.7", features = [] }
async-trait = { version = "0.1.56" } async-trait = { version = "0.1.56" }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
config = { path = "../../shared/config" } config = { path = "../config" }
fake = { version = "2.4.3", features = ["derive", "chrono", "http", "uuid"], optional = true } fake = { version = "2.4.3", features = ["derive", "chrono", "http", "uuid"], optional = true }
itertools = { version = "0.10.3" } itertools = { version = "0.10.3" }
model = { path = "../../shared/model" } model = { path = "../model" }
pretty_env_logger = { version = "0.4", features = [] } pretty_env_logger = { version = "0.4", features = [] }
rand = { version = "0.8.5", optional = true } rand = { version = "0.8.5", optional = true }
rumqttc = { version = "*" } rumqttc = { version = "*" }
@ -26,4 +26,4 @@ tracing = { version = "0.1.34" }
uuid = { version = "1.2.1", features = ["serde"] } uuid = { version = "1.2.1", features = ["serde"] }
[dev-dependencies] [dev-dependencies]
testx = { path = "../../shared/testx" } testx = { path = "../testx" }

View File

@ -156,7 +156,7 @@ pub struct FindAccount {
db_async_handler!(FindAccount, find_account, FullAccount, inner_find_account); db_async_handler!(FindAccount, find_account, FullAccount, inner_find_account);
pub(crate) async fn find_account( pub async fn find_account(
msg: FindAccount, msg: FindAccount,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<FullAccount> { ) -> Result<FullAccount> {

View File

@ -101,7 +101,7 @@ db_async_handler!(
inner_create_shopping_cart inner_create_shopping_cart
); );
pub(crate) async fn create_shopping_cart( pub async fn create_shopping_cart(
msg: CreateShoppingCart, msg: CreateShoppingCart,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCart> { ) -> Result<ShoppingCart> {
@ -140,7 +140,7 @@ db_async_handler!(
inner_update_shopping_cart inner_update_shopping_cart
); );
pub(crate) async fn update_shopping_cart( pub async fn update_shopping_cart(
msg: UpdateShoppingCart, msg: UpdateShoppingCart,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCart> { ) -> Result<ShoppingCart> {
@ -180,7 +180,7 @@ db_async_handler!(
inner_shopping_cart_set_state inner_shopping_cart_set_state
); );
pub(crate) async fn shopping_cart_set_state( pub async fn shopping_cart_set_state(
msg: ShoppingCartSetState, msg: ShoppingCartSetState,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCart> { ) -> Result<ShoppingCart> {
@ -216,7 +216,7 @@ db_async_handler!(
inner_find_shopping_cart inner_find_shopping_cart
); );
pub(crate) async fn find_shopping_cart( pub async fn find_shopping_cart(
msg: FindShoppingCart, msg: FindShoppingCart,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCart> { ) -> Result<ShoppingCart> {
@ -249,7 +249,7 @@ db_async_handler!(
inner_ensure_active_shopping_cart inner_ensure_active_shopping_cart
); );
pub(crate) async fn ensure_active_shopping_cart( pub async fn ensure_active_shopping_cart(
msg: EnsureActiveShoppingCart, msg: EnsureActiveShoppingCart,
pool: &mut sqlx::Transaction<'_, sqlx::Postgres>, pool: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<ShoppingCart> { ) -> Result<ShoppingCart> {

View File

@ -4,29 +4,20 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
model = { path = "../shared/model", version = "0.1", features = ["db", "dummy"] }
config = { path = "../shared/config" }
database_manager = { path = "../actors/database_manager", features = ["dummy"] }
fs_manager = { path = "../actors/fs_manager", features = [] }
bytes = { version = "1.1.0" }
actix = { version = "0.13", features = [] } actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] } actix-rt = { version = "2.7", features = [] }
actix-web = { version = "4.0", features = [] } actix-web = { version = "4.0", features = [] }
bytes = { version = "1.1.0" }
tokio = { version = "1.18.1", features = ["full"] } config = { path = "../config" }
database_manager = { path = "../database_manager", features = ["dummy"] }
fake = { version = "2.4.3", features = ["derive", "chrono", "http"] }
rand = { version = "0.8.5" }
dotenv = { version = "0.15", features = [] } dotenv = { version = "0.15", features = [] }
fake = { version = "2.4.3", features = ["derive", "chrono", "http"] }
fs_manager = { path = "../fs_manager", features = [] }
human-panic = { version = "1.0.3" }
model = { path = "../model", version = "0.1", features = ["db", "dummy"] }
password-hash = { version = "0.4", features = ["alloc"] }
rand = { version = "0.8.5" }
thiserror = { version = "1.0.31" }
tokio = { version = "1.18.1", features = ["full"] }
tracing = { version = "0.1.34" } tracing = { version = "0.1.34" }
tracing-subscriber = { version = "0.3.11" } tracing-subscriber = { version = "0.3.11" }
password-hash = { version = "0.4", features = ["alloc"] }
thiserror = { version = "1.0.31" }
human-panic = { version = "1.0.3" }

View File

@ -7,8 +7,8 @@ edition = "2021"
actix = { version = "0.13", features = [] } actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] } actix-rt = { version = "2.7", features = [] }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
config = { path = "../../shared/config" } config = { path = "../config" }
model = { path = "../../shared/model" } model = { path = "../model" }
pretty_env_logger = { version = "0.4", features = [] } pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" } rumqttc = { version = "*" }
sendgrid = { version = "0.17", features = ["async"] } sendgrid = { version = "0.17", features = ["async"] }

View File

@ -9,9 +9,9 @@ actix-rt = { version = "2.7", features = [] }
actix-web = { version = "4.0.1" } actix-web = { version = "4.0.1" }
bytes = { version = "1.1.0" } bytes = { version = "1.1.0" }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
config = { path = "../../shared/config" } config = { path = "../config" }
fibers_rpc = { version = "0.3.4", features = [] } fibers_rpc = { version = "0.3.4", features = [] }
model = { path = "../../shared/model" } model = { path = "../model" }
pretty_env_logger = { version = "0.4", features = [] } pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" } rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View File

@ -6,9 +6,9 @@ edition = "2021"
[dependencies] [dependencies]
actix = { version = "0.13", features = [] } actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] } actix-rt = { version = "2.7", features = [] }
config = { path = "../../shared/config" } config = { path = "../config" }
fluent = { version = "0.16.0" } fluent = { version = "0.16.0" }
model = { path = "../../shared/model" } model = { path = "../model" }
pretty_env_logger = { version = "0.4", features = [] } pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" } rumqttc = { version = "*" }
thiserror = { version = "1.0.31" } thiserror = { version = "1.0.31" }

View File

@ -7,9 +7,9 @@ edition = "2021"
actix = { version = "0.13", features = [] } actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] } actix-rt = { version = "2.7", features = [] }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
config = { path = "../../shared/config" } config = { path = "../config" }
database_manager = { path = "../database_manager" } database_manager = { path = "../database_manager" }
model = { path = "../../shared/model" } model = { path = "../model" }
pretty_env_logger = { version = "0.4", features = [] } pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" } rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] } serde = { version = "1.0.137", features = ["derive"] }

Some files were not shown because too many files have changed in this diff Show More