Start orders

This commit is contained in:
Adrian Woźniak 2022-11-29 15:18:31 +01:00
parent 6eb60f4223
commit 91d9dff415
34 changed files with 853 additions and 228 deletions

1
.env
View File

@ -3,6 +3,7 @@ DATABASE_URL=postgres://postgres@localhost/bazzar
ACCOUNT_DATABASE_URL=postgres://postgres@localhost/bazzar_accounts ACCOUNT_DATABASE_URL=postgres://postgres@localhost/bazzar_accounts
CART_DATABASE_URL=postgres://postgres@localhost/bazzar_carts CART_DATABASE_URL=postgres://postgres@localhost/bazzar_carts
STOCK_DATABASE_URL=postgres://postgres@localhost/bazzar_stocks STOCK_DATABASE_URL=postgres://postgres@localhost/bazzar_stocks
ORDER_DATABASE_URL=postgres://postgres@localhost/bazzar_orders
PASS_SALT=18CHwV7eGFAea16z+qMKZg PASS_SALT=18CHwV7eGFAea16z+qMKZg
RUST_LOG=debug RUST_LOG=debug

27
Cargo.lock generated
View File

@ -18,7 +18,6 @@ dependencies = [
"model", "model",
"opentelemetry 0.17.0", "opentelemetry 0.17.0",
"opentelemetry-jaeger", "opentelemetry-jaeger",
"pretty_env_logger",
"rumqttc", "rumqttc",
"serde", "serde",
"sqlx", "sqlx",
@ -2444,6 +2443,32 @@ dependencies = [
"thiserror", "thiserror",
] ]
[[package]]
name = "order_manager"
version = "0.1.0"
dependencies = [
"channels",
"chrono",
"config",
"db-utils",
"fake",
"model",
"opentelemetry 0.17.0",
"opentelemetry-jaeger",
"rumqttc",
"serde",
"sqlx",
"sqlx-core",
"tarpc",
"testx",
"thiserror",
"tokio",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"uuid 1.2.1",
]
[[package]] [[package]]
name = "ordered-float" name = "ordered-float"
version = "1.1.1" version = "1.1.1"

View File

@ -11,7 +11,7 @@ members = [
"crates/cart_manager", "crates/cart_manager",
# "crates/database_manager", # "crates/database_manager",
"crates/email_manager", "crates/email_manager",
# "crates/order_manager", "crates/order_manager",
# "crates/payment_manager", # "crates/payment_manager",
"crates/search_manager", "crates/search_manager",
"crates/stock_manager", "crates/stock_manager",

View File

@ -19,7 +19,6 @@ json = { version = "0.12.4" }
model = { path = "../model", features = ['db'] } model = { path = "../model", features = ['db'] }
opentelemetry = { version = "0.17.0" } opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" } opentelemetry-jaeger = { version = "0.17.0" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" } rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] } serde = { version = "1.0.137", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] } sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }

View File

@ -4,14 +4,15 @@ CREATE TYPE "AccountState" AS ENUM (
'active', 'active',
'suspended', 'suspended',
'banned' 'banned'
); );
CREATE TYPE "Role" AS ENUM ( CREATE TYPE "Role" AS ENUM (
'admin', 'admin',
'user' 'user'
); );
CREATE TABLE public.accounts ( CREATE TABLE public.accounts
(
id serial NOT NULL, id serial NOT NULL,
email character varying NOT NULL, email character varying NOT NULL,
login character varying NOT NULL, login character varying NOT NULL,

View File

@ -1,4 +1,5 @@
CREATE TABLE public.account_addresses ( CREATE TABLE public.account_addresses
(
id serial NOT NULL, id serial NOT NULL,
name text NOT NULL, name text NOT NULL,
email text NOT NULL, email text NOT NULL,

View File

@ -1,21 +1,22 @@
CREATE TYPE "PaymentMethod" AS ENUM ( CREATE TYPE "PaymentMethod" AS ENUM (
'pay_u', 'pay_u',
'payment_on_the_spot' 'payment_on_the_spot'
); );
CREATE TYPE "ShoppingCartState" AS ENUM ( CREATE TYPE "ShoppingCartState" AS ENUM (
'active', 'active',
'closed' 'closed'
); );
CREATE TYPE "QuantityUnit" AS ENUM ( CREATE TYPE "QuantityUnit" AS ENUM (
'g', 'g',
'dkg', 'dkg',
'kg', 'kg',
'piece' 'piece'
); );
CREATE TABLE shopping_carts ( CREATE TABLE shopping_carts
(
id serial NOT NULL, id serial NOT NULL,
buyer_id integer NOT NULL, buyer_id integer NOT NULL,
payment_method "PaymentMethod" DEFAULT 'payment_on_the_spot'::"PaymentMethod" NOT NULL, payment_method "PaymentMethod" DEFAULT 'payment_on_the_spot'::"PaymentMethod" NOT NULL,
@ -23,7 +24,8 @@ CREATE TABLE shopping_carts (
checkout_notes text checkout_notes text
); );
CREATE TABLE shopping_cart_items ( CREATE TABLE shopping_cart_items
(
id serial NOT NULL, id serial NOT NULL,
product_variant_id integer NOT NULL, product_variant_id integer NOT NULL,
product_id integer NOT NULL, product_id integer NOT NULL,

View File

@ -9,7 +9,8 @@ carts = []
emails = [] emails = []
search = [] search = []
stocks = [] stocks = []
default = ['accounts', 'carts', 'emails', 'search', 'stocks'] orders = []
default = ['accounts', 'carts', 'emails', 'search', 'stocks', 'orders']
[dependencies] [dependencies]
bincode = { version = "*" } bincode = { version = "*" }
@ -19,9 +20,9 @@ futures = { version = "0.3.25" }
model = { path = "../model" } model = { path = "../model" }
rumqttc = { version = "0.17.0" } rumqttc = { version = "0.17.0" }
serde = { version = "*", features = ['derive'] } serde = { version = "*", features = ['derive'] }
strum = { version = "0.24.1", features = ['strum_macros', 'default', 'derive'] }
tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.37" } thiserror = { version = "1.0.37" }
tokio = { version = "1.21.2", features = ['full'] } tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.37" } tracing = { version = "0.1.37" }
whatlang = { version = "0.16.2" } whatlang = { version = "0.16.2" }
strum = { version = "0.24.1", features = ['strum_macros', 'default', 'derive'] }

View File

@ -9,6 +9,8 @@ pub mod carts;
#[cfg(feature = "emails")] #[cfg(feature = "emails")]
pub mod emails; pub mod emails;
pub mod mqtt; pub mod mqtt;
#[cfg(feature = "orders")]
pub mod orders;
pub mod rpc; pub mod rpc;
#[cfg(feature = "search")] #[cfg(feature = "search")]
pub mod search; pub mod search;

View File

@ -0,0 +1,128 @@
use model::Order;
use rumqttc::QoS;
use crate::AsyncClient;
pub static CLIENT_NAME: &str = "orders";
#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)]
pub enum Error {
#[error("Something went wrong")]
InternalServerError,
#[error("Failed to create order")]
CreateOrder,
#[error("Failed to create order item")]
CreateOrderItem,
}
#[derive(Debug, Copy, Clone)]
pub enum Topic {
OrderCreated,
}
impl Topic {
pub fn to_str(self) -> &'static str {
match self {
Topic::OrderCreated => "order/created",
}
}
}
impl Into<String> for Topic {
fn into(self) -> String {
self.to_str().into()
}
}
impl<'s> PartialEq<&'s str> for Topic {
fn eq(&self, other: &&'s str) -> bool {
self.to_str() == *other
}
}
impl PartialEq<String> for Topic {
fn eq(&self, other: &String) -> bool {
self.to_str() == other.as_str()
}
}
impl AsyncClient {
pub async fn emit_order_created(&self, order: &Order) {
self.publish_or_log(Topic::OrderCreated, QoS::AtLeastOnce, true, order)
.await
}
}
pub mod create_order {
use model::{
AccountId, Order, OrderAddressId, OrderItem, ProductId, Quantity, QuantityUnit,
ShoppingCartId,
};
pub use super::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct OrderItemInput {
pub product_id: ProductId,
pub quantity: Quantity,
pub quantity_unit: QuantityUnit,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub buyer_id: AccountId,
pub items: Vec<OrderItemInput>,
pub shopping_cart_id: Option<ShoppingCartId>,
pub checkout_notes: Option<String>,
pub delivery_address_id: OrderAddressId,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Details {
pub order: Order,
pub order_items: Vec<OrderItem>,
}
pub type Output = Result<Details, Error>;
}
pub mod rpc {
use config::SharedAppConfig;
use crate::orders::create_order;
#[tarpc::service]
pub trait Orders {
async fn create_order(input: create_order::Input) -> create_order::Output;
}
pub async fn create_client(config: SharedAppConfig) -> OrdersClient {
use tarpc::client;
use tarpc::tokio_serde::formats::Bincode;
let l = config.lock();
let addr = l.orders_manager().rpc_addr();
let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default);
let client = OrdersClient::new(
client::Config::default(),
transport.await.expect("Failed to connect to server"),
)
.spawn();
client
}
}
pub mod mqtt {
use config::SharedAppConfig;
use rumqttc::EventLoop;
use crate::orders::CLIENT_NAME;
use crate::AsyncClient;
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {
crate::mqtt::create_client(CLIENT_NAME, config.lock().stocks_manager().mqtt_addr())
}
}

View File

@ -551,6 +551,39 @@ impl StocksConfig {
} }
} }
#[derive(Serialize, Deserialize)]
pub struct OrderConfig {
pub rpc_port: u16,
pub rpc_bind: String,
pub mqtt_port: u16,
pub mqtt_bind: String,
pub database_url: String,
}
impl Example for OrderConfig {}
impl Default for OrderConfig {
fn default() -> Self {
Self {
rpc_port: 19334,
rpc_bind: "0.0.0.0".into(),
mqtt_port: 1887,
mqtt_bind: "0.0.0.0".into(),
database_url: "postgres://postgres@localhost/bazzar_orders".into(),
}
}
}
impl OrderConfig {
pub fn rpc_addr(&self) -> (&str, u16) {
(&self.rpc_bind, self.rpc_port)
}
pub fn mqtt_addr(&self) -> (&str, u16) {
(&self.mqtt_bind, self.mqtt_port)
}
}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct AppConfig { pub struct AppConfig {
#[serde(default)] #[serde(default)]
@ -573,6 +606,8 @@ pub struct AppConfig {
email_sender: EmailSenderConfig, email_sender: EmailSenderConfig,
#[serde(default)] #[serde(default)]
stocks: StocksConfig, stocks: StocksConfig,
#[serde(default)]
order_manager: OrderConfig,
#[serde(skip)] #[serde(skip)]
config_path: String, config_path: String,
} }
@ -590,6 +625,7 @@ impl Example for AppConfig {
cart_manager: CartManagerConfig::example(), cart_manager: CartManagerConfig::example(),
email_sender: EmailSenderConfig::example(), email_sender: EmailSenderConfig::example(),
stocks: StocksConfig::example(), stocks: StocksConfig::example(),
order_manager: OrderConfig::example(),
config_path: "".to_string(), config_path: "".to_string(),
} }
} }
@ -651,6 +687,10 @@ impl AppConfig {
pub fn stocks_manager(&self) -> &StocksConfig { pub fn stocks_manager(&self) -> &StocksConfig {
&self.stocks &self.stocks
} }
pub fn orders_manager(&self) -> &OrderConfig {
&self.order_manager
}
} }
impl Default for AppConfig { impl Default for AppConfig {
@ -666,6 +706,7 @@ impl Default for AppConfig {
cart_manager: CartManagerConfig::default(), cart_manager: CartManagerConfig::default(),
email_sender: EmailSenderConfig::default(), email_sender: EmailSenderConfig::default(),
stocks: StocksConfig::default(), stocks: StocksConfig::default(),
order_manager: OrderConfig::default(),
config_path: "".to_string(), config_path: "".to_string(),
} }
} }

View File

@ -26,5 +26,5 @@ tracing = { version = "0.1.34" }
uuid = { version = "1.2.1", features = ["serde"] } uuid = { version = "1.2.1", features = ["serde"] }
[dev-dependencies] [dev-dependencies]
testx = { path = "../testx" }
fake = { version = "2.5.0" } fake = { version = "2.5.0" }
testx = { path = "../testx" }

View File

@ -4,7 +4,9 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
account-manager = { path = '../account_manager' }
bytes = { version = "1.1.0" } bytes = { version = "1.1.0" }
channels = { path = '../channels' }
config = { path = "../config" } config = { path = "../config" }
dotenv = { version = "0.15", features = [] } dotenv = { version = "0.15", features = [] }
fakeit = { version = "1.1.1", features = [] } fakeit = { version = "1.1.1", features = [] }
@ -13,10 +15,8 @@ human-panic = { version = "1.0.3" }
model = { path = "../model", version = "0.1", features = ["db", "dummy"] } model = { path = "../model", version = "0.1", features = ["db", "dummy"] }
password-hash = { version = "0.4", features = ["alloc"] } password-hash = { version = "0.4", features = ["alloc"] }
rand = { version = "0.8.5" } rand = { version = "0.8.5" }
stock-manager = { path = "../stock_manager" }
thiserror = { version = "1.0.31" } thiserror = { version = "1.0.31" }
tokio = { version = "1.18.1", features = ["full"] } tokio = { version = "1.18.1", features = ["full"] }
tracing = { version = "0.1.34" } tracing = { version = "0.1.34" }
tracing-subscriber = { version = "0.3.11" } tracing-subscriber = { version = "0.3.11" }
account-manager = { path = '../account_manager' }
stock-manager = { path = "../stock_manager" }
channels = { path = '../channels' }

View File

@ -1,5 +1,31 @@
use sqlx::Arguments; use sqlx::Arguments;
#[macro_export]
macro_rules! begin_t {
($db: ident, $err: expr) => {
match $db.pool().begin().await {
Err(e) => {
tracing::warn!("{}", e);
return Err($err);
}
Ok(t) => t,
}
};
}
#[macro_export]
macro_rules! dbm_run {
($dbm: ident, $t: expr, $err: expr) => {
match $dbm.run($t).await {
Ok(res) => res,
Err(e) => {
tracing::warn!("{}", e);
return Err($err);
}
}
};
}
pub type PgT<'l> = sqlx::Transaction<'l, sqlx::Postgres>; pub type PgT<'l> = sqlx::Transaction<'l, sqlx::Postgres>;
pub struct Padding { pub struct Padding {

View File

@ -62,6 +62,18 @@ impl CategoryMapper {
} }
} }
#[cfg_attr(feature = "db", derive(sqlx::Type))]
#[cfg_attr(feature = "db", sqlx(rename_all = "snake_case"))]
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Default, Display, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum OrderItemState {
#[default]
#[display(fmt = "Poprawny")]
Valid,
#[display(fmt = "Niedostępny")]
OutOfStock,
}
#[cfg_attr(feature = "db", derive(sqlx::Type))] #[cfg_attr(feature = "db", derive(sqlx::Type))]
#[cfg_attr(feature = "db", sqlx(rename_all = "snake_case"))] #[cfg_attr(feature = "db", sqlx(rename_all = "snake_case"))]
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Display, Deserialize, Serialize)] #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Display, Deserialize, Serialize)]
@ -1016,6 +1028,7 @@ pub struct OrderItem {
pub order_id: OrderId, pub order_id: OrderId,
pub quantity: Quantity, pub quantity: Quantity,
pub quantity_unit: QuantityUnit, pub quantity_unit: QuantityUnit,
pub state: OrderItemState,
} }
#[cfg_attr(feature = "db", derive(sqlx::Type))] #[cfg_attr(feature = "db", derive(sqlx::Type))]

View File

@ -3,18 +3,28 @@ name = "order_manager"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
[[bin]]
name = "order-manager"
path = "./src/main.rs"
[dependencies] [dependencies]
actix = { version = "0.13", features = [] } channels = { path = "../channels" }
actix-rt = { version = "2.7", features = [] }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
config = { path = "../config" } config = { path = "../config" }
database_manager = { path = "../database_manager" } db-utils = { path = "../db-utils" }
model = { path = "../model", features = ["db"] } model = { path = "../model", features = ["db"] }
pretty_env_logger = { version = "0.4", features = [] } opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
rumqttc = { version = "*" } rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] } serde = { version = "1.0.137", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" } thiserror = { version = "1.0.31" }
tracing = { version = "0.1.34" } tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.6" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "1.2.1", features = ["serde"] } uuid = { version = "1.2.1", features = ["serde"] }
[dev-dependencies] [dev-dependencies]

View File

@ -0,0 +1,56 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE "OrderStatus" AS ENUM (
'confirmed',
'cancelled',
'delivered',
'payed',
'require_refund',
'refunded'
);
CREATE TYPE "QuantityUnit" AS ENUM (
'g',
'dkg',
'kg',
'piece'
);
CREATE TYPE "OrderItemState" AS ENUM (
'valid',
'out_of_stock'
);
CREATE TABLE orders
(
id serial NOT NULL PRIMARY KEY,
buyer_id integer,
status "OrderStatus" DEFAULT 'confirmed'::"OrderStatus" NOT NULL,
order_ext_id uuid DEFAULT uuid_generate_v4() NOT NULL,
service_order_id text,
checkout_notes text,
address_id integer
);
CREATE TABLE order_addresses
(
id serial NOT NULL PRIMARY KEY,
name text NOT NULL,
email text NOT NULL,
street text NOT NULL,
city text NOT NULL,
country text NOT NULL,
zip text NOT NULL,
phone text NOT NULL
);
CREATE TABLE order_items
(
id serial NOT NULL PRIMARY KEY,
product_id integer NOT NULL,
order_id integer references orders (id) ON DELETE CASCADE NOT NULL,
quantity integer DEFAULT 0 NOT NULL,
quantity_unit "QuantityUnit" NOT NULL,
state "OrderItemState" DEFAULT 'valid'::"OrderItemState" NOT NULL,
CONSTRAINT positive_quantity CHECK ((quantity >= 0))
);

View File

@ -0,0 +1,3 @@
pub mod order;
pub mod order_address;
pub mod order_item;

View File

@ -0,0 +1,82 @@
use channels::orders::{create_order, Error};
use channels::AsyncClient;
use config::SharedAppConfig;
use db_utils::{begin_t, dbm_run, PgT};
use model::OrderStatus;
use crate::db::{CreateOrder, CreateOrderItem, Database};
pub async fn create_order(
input: create_order::Input,
db: Database,
mqtt: AsyncClient,
_config: SharedAppConfig,
) -> create_order::Output {
let mut t = begin_t!(db, Error::InternalServerError);
match inner_create_order(input, &mut t).await {
Ok(order) => {
if let Err(e) = t.commit().await {
tracing::error!("{}", e);
Err(Error::InternalServerError)
} else {
mqtt.emit_order_created(&order.order);
Ok(order)
}
}
Err(e) => {
tracing::error!("{}", e);
t.rollback().await.ok();
Err(Error::CreateOrder)
}
}
}
pub async fn inner_create_order(
input: create_order::Input,
t: &mut PgT<'_>,
) -> create_order::Output {
let dbm = CreateOrder {
buyer_id: input.buyer_id,
shopping_cart_id: input.shopping_cart_id,
checkout_notes: input.checkout_notes,
delivery_address_id: input.delivery_address_id,
};
let order = dbm_run!(dbm, &mut *t, Error::CreateOrder);
let mut order_items = Vec::with_capacity(input.items.len());
for item in input.items {
let dbm = CreateOrderItem {
order_id: order.id,
product_id: item.product_id,
quantity: item.quantity,
quantity_unit: item.quantity_unit,
};
let item = dbm_run!(dbm, &mut *t, Error::CreateOrderItem);
order_items.push(item);
}
Ok(create_order::Details { order, order_items })
}
pub fn change(current: OrderStatus, next: OrderStatus) -> Option<OrderStatus> {
match (current, next) {
// paying
(OrderStatus::Confirmed, OrderStatus::Payed) => Some(OrderStatus::Payed),
// delivering
(OrderStatus::Confirmed | OrderStatus::Payed, OrderStatus::Delivered) => {
Some(OrderStatus::Delivered)
}
// cancelling
(OrderStatus::Confirmed, OrderStatus::Cancelled) => Some(OrderStatus::Cancelled),
(OrderStatus::Payed, OrderStatus::Cancelled) => Some(OrderStatus::RequireRefund),
(OrderStatus::Payed, OrderStatus::RequireRefund) => Some(OrderStatus::RequireRefund),
(OrderStatus::RequireRefund, OrderStatus::Refunded) => Some(OrderStatus::Refunded),
_ => None,
}
}

View File

View File

@ -0,0 +1,45 @@
pub mod order;
pub mod order_address;
pub mod order_item;
use config::SharedAppConfig;
pub use order::*;
pub use order_address::*;
pub use order_item::*;
use sqlx_core::pool::Pool;
use sqlx_core::postgres::Postgres;
#[derive(Clone)]
pub struct Database {
pub pool: sqlx::PgPool,
_config: SharedAppConfig,
}
impl Database {
pub async fn build(config: SharedAppConfig) -> Self {
let url = config.lock().orders_manager().database_url.clone();
let pool = sqlx::PgPool::connect(&url).await.unwrap_or_else(|e| {
tracing::error!("Failed to connect to database. {e:?}");
std::process::exit(1);
});
Self {
pool,
_config: config,
}
}
pub fn pool(&self) -> Pool<Postgres> {
self.pool.clone()
}
}
#[derive(Debug)]
pub struct CreateOrderAddress {
pub name: model::Name,
pub email: model::Email,
pub phone: model::Phone,
pub street: model::Street,
pub city: model::City,
pub country: model::Country,
pub zip: model::Zip,
}

View File

@ -0,0 +1,58 @@
use model::{AccountId, Order, OrderAddressId, OrderStatus, ShoppingCartId};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Can't create order item")]
CantCreate,
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)]
pub struct CreateOrder {
pub buyer_id: AccountId,
pub shopping_cart_id: Option<ShoppingCartId>,
pub checkout_notes: Option<String>,
pub delivery_address_id: OrderAddressId,
}
impl CreateOrder {
pub async fn run(self, t: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<Order> {
sqlx::query_as(
r#"
INSERT INTO orders (buyer_id, status, checkout_notes, address_id)
VALUES ($1, $2, $3, $4)
RETURNING id, buyer_id, status, order_ext_id, service_order_id, checkout_notes, address_id
"#,
)
.bind(self.buyer_id)
.bind(OrderStatus::Confirmed)
.bind(self.checkout_notes.as_deref())
.bind(self.delivery_address_id)
.fetch_one(&mut *t)
.await
.map_err(|e| {
tracing::error!("{e:?}");
dbg!(e);
Error::CantCreate
})
// if let Some(shopping_cart_id) = msg.shopping_cart_id {
// if let Err(e) = shopping_cart_set_state(
// ShoppingCartSetState {
// id: shopping_cart_id,
// state: ShoppingCartState::Closed,
// checkout_notes: msg.checkout_notes,
// },
// t,
// )
// .await
// {
// dbg!(e);
// tracing::error!("{e:?}");
//
// return Err(super::Error::AccountOrder(Error::CantCreate));
// };
// }
}
}

View File

@ -0,0 +1,40 @@
use model::{OrderId, OrderItem, ProductId, Quantity, QuantityUnit};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Can't create order item")]
CantCreate,
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)]
pub struct CreateOrderItem {
pub order_id: OrderId,
pub product_id: ProductId,
pub quantity: Quantity,
pub quantity_unit: QuantityUnit,
}
impl CreateOrderItem {
pub async fn run(self, t: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<OrderItem> {
sqlx::query_as(
r#"
INSERT INTO order_items (product_id, order_id, quantity, quantity_unit)
VALUES ($1, $2, $3, $4)
RETURNING id, product_id, order_id, quantity, quantity_unit
"#,
)
.bind(self.product_id)
.bind(self.order_id)
.bind(self.quantity)
.bind(self.quantity_unit)
.fetch_one(t)
.await
.map_err(|e| {
tracing::error!("{e:?}");
dbg!(e);
Error::CantCreate
})
}
}

View File

@ -0,0 +1,22 @@
use config::UpdateConfig;
mod actions;
mod db;
mod mqtt;
mod rpc;
pub struct Opts {}
impl UpdateConfig for Opts {}
#[tokio::main]
async fn main() {
let opts = Opts {};
let config = config::default_load(&opts);
let db = db::Database::build(config.clone()).await;
let mqtt_client = mqtt::start(config.clone(), db.clone()).await;
rpc::start(config, db, mqtt_client).await;
}

View File

@ -0,0 +1,30 @@
use config::SharedAppConfig;
use rumqttc::{Event, Incoming};
use crate::db::Database;
pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient {
let (client, mut event_loop) = channels::orders::mqtt::create_client(config);
let spawn_client = client.clone();
tokio::spawn(async move {
let _client = spawn_client.clone();
loop {
let notification = event_loop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() {
_ => {}
},
Ok(Event::Incoming(_incoming)) => {}
Ok(Event::Outgoing(_outgoing)) => {}
Err(e) => {
tracing::warn!("{}", e);
}
}
}
// tracing::info!("Mqtt channel closed");
});
client
}

View File

@ -0,0 +1,36 @@
use channels::orders::create_order::{Input, Output};
use channels::orders::rpc::Orders;
use channels::AsyncClient;
use config::SharedAppConfig;
use tarpc::context;
use crate::actions;
use crate::db::Database;
#[derive(Clone)]
pub struct OrdersServer {
pub db: Database,
pub mqtt_client: AsyncClient,
pub config: SharedAppConfig,
}
#[tarpc::server]
impl Orders for OrdersServer {
async fn create_order(self, _: context::Context, input: Input) -> Output {
actions::order::create_order(input, self.db, self.mqtt_client, self.config).await
}
}
pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) {
let port = { config.lock().stocks_manager().rpc_port };
channels::rpc::start("orders", port, || {
OrdersServer {
db: db.clone(),
config: config.clone(),
mqtt_client: mqtt_client.clone(),
}
.serve()
})
.await;
}

View File

@ -32,6 +32,6 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "1.2.1", features = ['v4'] } uuid = { version = "1.2.1", features = ['v4'] }
[dev-dependencies] [dev-dependencies]
testx = { path = "../testx" }
insta = { version = "1.21.0" }
fakeit = { version = "1.1.1" } fakeit = { version = "1.1.1" }
insta = { version = "1.21.0" }
testx = { path = "../testx" }

View File

@ -5,23 +5,26 @@ CREATE TYPE "QuantityUnit" AS ENUM (
'dkg', 'dkg',
'kg', 'kg',
'piece' 'piece'
); );
CREATE TABLE photos ( CREATE TABLE photos
(
id serial NOT NULL PRIMARY KEY, id serial NOT NULL PRIMARY KEY,
local_path character varying NOT NULL, local_path character varying NOT NULL,
file_name character varying NOT NULL, file_name character varying NOT NULL,
unique_name text DEFAULT (gen_random_uuid())::text NOT NULL unique_name text DEFAULT (gen_random_uuid())::text NOT NULL
); );
CREATE TABLE products ( CREATE TABLE products
(
id serial NOT NULL PRIMARY KEY, id serial NOT NULL PRIMARY KEY,
"name" character varying NOT NULL, "name" character varying NOT NULL,
category character varying, category character varying,
deliver_days_flag integer DEFAULT 127 NOT NULL deliver_days_flag integer DEFAULT 127 NOT NULL
); );
CREATE TABLE product_variants ( CREATE TABLE product_variants
(
id serial NOT NULL PRIMARY KEY, id serial NOT NULL PRIMARY KEY,
product_id integer REFERENCES products (id) ON DELETE CASCADE NOT NULL, product_id integer REFERENCES products (id) ON DELETE CASCADE NOT NULL,
"name" character varying NOT NULL, "name" character varying NOT NULL,
@ -32,21 +35,24 @@ CREATE TABLE product_variants (
CONSTRAINT non_negative CHECK ((price >= 0)) CONSTRAINT non_negative CHECK ((price >= 0))
); );
CREATE TABLE stocks ( CREATE TABLE stocks
(
id serial NOT NULL PRIMARY KEY, id serial NOT NULL PRIMARY KEY,
product_variant_id integer REFERENCES product_variants(id) ON DELETE CASCADE NOT NULL, product_variant_id integer REFERENCES product_variants (id) ON DELETE CASCADE NOT NULL,
quantity integer DEFAULT 0 NOT NULL, quantity integer DEFAULT 0 NOT NULL,
quantity_unit "QuantityUnit" NOT NULL, quantity_unit "QuantityUnit" NOT NULL,
CONSTRAINT positive_quantity CHECK ((quantity >= 0)) CONSTRAINT positive_quantity CHECK ((quantity >= 0))
); );
CREATE TABLE product_photos ( CREATE TABLE product_photos
(
id serial NOT NULL PRIMARY KEY, id serial NOT NULL PRIMARY KEY,
product_variant_id integer REFERENCES product_variants(id) ON DELETE CASCADE NOT NULL, product_variant_id integer REFERENCES product_variants (id) ON DELETE CASCADE NOT NULL,
photo_id integer REFERENCES photos(id) NOT NULL photo_id integer REFERENCES photos (id) NOT NULL
); );
CREATE TABLE categories ( CREATE TABLE categories
(
id serial NOT NULL PRIMARY KEY, id serial NOT NULL PRIMARY KEY,
parent_id int references categories (id) ON DELETE CASCADE, parent_id int references categories (id) ON DELETE CASCADE,
"name" varchar not null, "name" varchar not null,

View File

@ -1,28 +1,22 @@
use channels::stocks::create_category::{Input, Output};
use channels::stocks::rpc::Stocks; use channels::stocks::rpc::Stocks;
use channels::stocks::*;
use channels::AsyncClient; use channels::AsyncClient;
use config::SharedAppConfig; use config::SharedAppConfig;
use tarpc::context;
use crate::actions;
use crate::db::Database; use crate::db::Database;
use crate::rpc::rpc::StocksServer;
pub mod rpc { #[derive(Clone)]
use channels::stocks::create_category::{Input, Output}; pub struct StocksServer {
use channels::stocks::rpc::Stocks; pub db: Database,
use channels::stocks::*; pub mqtt_client: AsyncClient,
use config::SharedAppConfig;
use tarpc::context;
use crate::actions;
#[derive(Clone)]
pub struct StocksServer {
pub db: crate::db::Database,
pub mqtt_client: channels::AsyncClient,
pub config: SharedAppConfig, pub config: SharedAppConfig,
} }
#[tarpc::server] #[tarpc::server]
impl Stocks for StocksServer { impl Stocks for StocksServer {
async fn create_product( async fn create_product(
self, self,
_: context::Context, _: context::Context,
@ -170,7 +164,6 @@ pub mod rpc {
) -> all_categories::Output { ) -> all_categories::Output {
actions::all_categories(input, self.db, self.mqtt_client, self.config).await actions::all_categories(input, self.db, self.mqtt_client, self.config).await
} }
}
} }
pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) { pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) {

View File

@ -7,6 +7,7 @@ then
psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_accounts" || echo 0 psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_accounts" || echo 0
psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_carts" || echo 0 psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_carts" || echo 0
psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_stocks" || echo 0 psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_stocks" || echo 0
psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_orders" || echo 0
fi fi
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_accounts" || echo 0 psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_accounts" || echo 0
@ -17,3 +18,6 @@ sqlx migrate run -D "${CART_DATABASE_URL}" --source ./crates/cart_manager/migrat
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_stocks" || echo 0 psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_stocks" || echo 0
sqlx migrate run -D "${STOCK_DATABASE_URL}" --source ./crates/stock_manager/migrations sqlx migrate run -D "${STOCK_DATABASE_URL}" --source ./crates/stock_manager/migrations
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_orders" || echo 0
sqlx migrate run -D "${ORDER_DATABASE_URL}" --source ./crates/order_manager/migrations