diff --git a/.env b/.env index f188f83..f0a8719 100644 --- a/.env +++ b/.env @@ -3,6 +3,7 @@ DATABASE_URL=postgres://postgres@localhost/bazzar ACCOUNT_DATABASE_URL=postgres://postgres@localhost/bazzar_accounts CART_DATABASE_URL=postgres://postgres@localhost/bazzar_carts STOCK_DATABASE_URL=postgres://postgres@localhost/bazzar_stocks +ORDER_DATABASE_URL=postgres://postgres@localhost/bazzar_orders PASS_SALT=18CHwV7eGFAea16z+qMKZg RUST_LOG=debug diff --git a/Cargo.lock b/Cargo.lock index c59a5bd..cd8142e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -18,7 +18,6 @@ dependencies = [ "model", "opentelemetry 0.17.0", "opentelemetry-jaeger", - "pretty_env_logger", "rumqttc", "serde", "sqlx", @@ -2444,6 +2443,32 @@ dependencies = [ "thiserror", ] +[[package]] +name = "order_manager" +version = "0.1.0" +dependencies = [ + "channels", + "chrono", + "config", + "db-utils", + "fake", + "model", + "opentelemetry 0.17.0", + "opentelemetry-jaeger", + "rumqttc", + "serde", + "sqlx", + "sqlx-core", + "tarpc", + "testx", + "thiserror", + "tokio", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", + "uuid 1.2.1", +] + [[package]] name = "ordered-float" version = "1.1.1" diff --git a/Cargo.toml b/Cargo.toml index a541842..1a269b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ "crates/cart_manager", # "crates/database_manager", "crates/email_manager", -# "crates/order_manager", + "crates/order_manager", # "crates/payment_manager", "crates/search_manager", "crates/stock_manager", diff --git a/crates/account_manager/Cargo.toml b/crates/account_manager/Cargo.toml index 6a7c490..8dd7e59 100644 --- a/crates/account_manager/Cargo.toml +++ b/crates/account_manager/Cargo.toml @@ -19,7 +19,6 @@ json = { version = "0.12.4" } model = { path = "../model", features = ['db'] } opentelemetry = { version = "0.17.0" } opentelemetry-jaeger = { version = "0.17.0" } -pretty_env_logger = { version = "0.4", features = [] } rumqttc = { version = "*" } serde = { version = "1.0.137", features = ["derive"] } sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] } diff --git a/crates/account_manager/migrations/202204131841_init.sql b/crates/account_manager/migrations/202204131841_init.sql index 1dd5311..8e54c83 100644 --- a/crates/account_manager/migrations/202204131841_init.sql +++ b/crates/account_manager/migrations/202204131841_init.sql @@ -4,19 +4,20 @@ CREATE TYPE "AccountState" AS ENUM ( 'active', 'suspended', 'banned' -); + ); CREATE TYPE "Role" AS ENUM ( 'admin', 'user' -); + ); -CREATE TABLE public.accounts ( - id serial NOT NULL, - email character varying NOT NULL, - login character varying NOT NULL, - pass_hash character varying NOT NULL, - role "Role" DEFAULT 'user'::"Role" NOT NULL, - customer_id uuid DEFAULT gen_random_uuid() NOT NULL, - state "AccountState" DEFAULT 'active'::"AccountState" NOT NULL +CREATE TABLE public.accounts +( + id serial NOT NULL, + email character varying NOT NULL, + login character varying NOT NULL, + pass_hash character varying NOT NULL, + role "Role" DEFAULT 'user'::"Role" NOT NULL, + customer_id uuid DEFAULT gen_random_uuid() NOT NULL, + state "AccountState" DEFAULT 'active'::"AccountState" NOT NULL ); diff --git a/crates/account_manager/migrations/202204131842_addresses.sql b/crates/account_manager/migrations/202204131842_addresses.sql index c4cabac..ea595c9 100644 --- a/crates/account_manager/migrations/202204131842_addresses.sql +++ b/crates/account_manager/migrations/202204131842_addresses.sql @@ -1,12 +1,13 @@ -CREATE TABLE public.account_addresses ( - id serial NOT NULL, - name text NOT NULL, - email text NOT NULL, - street text NOT NULL, - city text NOT NULL, - country text NOT NULL, - zip text NOT NULL, +CREATE TABLE public.account_addresses +( + id serial NOT NULL, + name text NOT NULL, + email text NOT NULL, + street text NOT NULL, + city text NOT NULL, + country text NOT NULL, + zip text NOT NULL, account_id integer, - is_default boolean DEFAULT false NOT NULL, - phone text DEFAULT ''::text NOT NULL + is_default boolean DEFAULT false NOT NULL, + phone text DEFAULT ''::text NOT NULL ); diff --git a/crates/cart_manager/migrations/202204131841_init.sql b/crates/cart_manager/migrations/202204131841_init.sql index 1820be8..20c0575 100644 --- a/crates/cart_manager/migrations/202204131841_init.sql +++ b/crates/cart_manager/migrations/202204131841_init.sql @@ -1,34 +1,36 @@ CREATE TYPE "PaymentMethod" AS ENUM ( 'pay_u', 'payment_on_the_spot' -); + ); CREATE TYPE "ShoppingCartState" AS ENUM ( 'active', 'closed' -); + ); CREATE TYPE "QuantityUnit" AS ENUM ( 'g', 'dkg', 'kg', 'piece' -); + ); -CREATE TABLE shopping_carts ( - id serial NOT NULL, - buyer_id integer NOT NULL, - payment_method "PaymentMethod" DEFAULT 'payment_on_the_spot'::"PaymentMethod" NOT NULL, - state "ShoppingCartState" DEFAULT 'active'::"ShoppingCartState" NOT NULL, +CREATE TABLE shopping_carts +( + id serial NOT NULL, + buyer_id integer NOT NULL, + payment_method "PaymentMethod" DEFAULT 'payment_on_the_spot'::"PaymentMethod" NOT NULL, + state "ShoppingCartState" DEFAULT 'active'::"ShoppingCartState" NOT NULL, checkout_notes text ); -CREATE TABLE shopping_cart_items ( - id serial NOT NULL, - product_variant_id integer NOT NULL, - product_id integer NOT NULL, - shopping_cart_id integer, - quantity integer DEFAULT 0 NOT NULL, - quantity_unit "QuantityUnit" NOT NULL, +CREATE TABLE shopping_cart_items +( + id serial NOT NULL, + product_variant_id integer NOT NULL, + product_id integer NOT NULL, + shopping_cart_id integer, + quantity integer DEFAULT 0 NOT NULL, + quantity_unit "QuantityUnit" NOT NULL, CONSTRAINT positive_quantity CHECK ((quantity >= 0)) ); diff --git a/crates/channels/Cargo.toml b/crates/channels/Cargo.toml index 06a480a..b232ed8 100644 --- a/crates/channels/Cargo.toml +++ b/crates/channels/Cargo.toml @@ -9,7 +9,8 @@ carts = [] emails = [] search = [] stocks = [] -default = ['accounts', 'carts', 'emails', 'search', 'stocks'] +orders = [] +default = ['accounts', 'carts', 'emails', 'search', 'stocks', 'orders'] [dependencies] bincode = { version = "*" } @@ -19,9 +20,9 @@ futures = { version = "0.3.25" } model = { path = "../model" } rumqttc = { version = "0.17.0" } serde = { version = "*", features = ['derive'] } +strum = { version = "0.24.1", features = ['strum_macros', 'default', 'derive'] } tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } thiserror = { version = "1.0.37" } tokio = { version = "1.21.2", features = ['full'] } tracing = { version = "0.1.37" } whatlang = { version = "0.16.2" } -strum = { version = "0.24.1", features = ['strum_macros', 'default', 'derive'] } diff --git a/crates/channels/src/lib.rs b/crates/channels/src/lib.rs index 1484c98..3b31e3b 100644 --- a/crates/channels/src/lib.rs +++ b/crates/channels/src/lib.rs @@ -9,6 +9,8 @@ pub mod carts; #[cfg(feature = "emails")] pub mod emails; pub mod mqtt; +#[cfg(feature = "orders")] +pub mod orders; pub mod rpc; #[cfg(feature = "search")] pub mod search; diff --git a/crates/channels/src/orders.rs b/crates/channels/src/orders.rs new file mode 100644 index 0000000..6b877c6 --- /dev/null +++ b/crates/channels/src/orders.rs @@ -0,0 +1,128 @@ +use model::Order; +use rumqttc::QoS; + +use crate::AsyncClient; + +pub static CLIENT_NAME: &str = "orders"; + +#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)] +pub enum Error { + #[error("Something went wrong")] + InternalServerError, + #[error("Failed to create order")] + CreateOrder, + #[error("Failed to create order item")] + CreateOrderItem, +} + +#[derive(Debug, Copy, Clone)] +pub enum Topic { + OrderCreated, +} + +impl Topic { + pub fn to_str(self) -> &'static str { + match self { + Topic::OrderCreated => "order/created", + } + } +} + +impl Into for Topic { + fn into(self) -> String { + self.to_str().into() + } +} + +impl<'s> PartialEq<&'s str> for Topic { + fn eq(&self, other: &&'s str) -> bool { + self.to_str() == *other + } +} + +impl PartialEq for Topic { + fn eq(&self, other: &String) -> bool { + self.to_str() == other.as_str() + } +} + +impl AsyncClient { + pub async fn emit_order_created(&self, order: &Order) { + self.publish_or_log(Topic::OrderCreated, QoS::AtLeastOnce, true, order) + .await + } +} + +pub mod create_order { + use model::{ + AccountId, Order, OrderAddressId, OrderItem, ProductId, Quantity, QuantityUnit, + ShoppingCartId, + }; + + pub use super::Error; + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct OrderItemInput { + pub product_id: ProductId, + pub quantity: Quantity, + pub quantity_unit: QuantityUnit, + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input { + pub buyer_id: AccountId, + pub items: Vec, + pub shopping_cart_id: Option, + pub checkout_notes: Option, + pub delivery_address_id: OrderAddressId, + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Details { + pub order: Order, + pub order_items: Vec, + } + + pub type Output = Result; +} + +pub mod rpc { + use config::SharedAppConfig; + + use crate::orders::create_order; + + #[tarpc::service] + pub trait Orders { + async fn create_order(input: create_order::Input) -> create_order::Output; + } + + pub async fn create_client(config: SharedAppConfig) -> OrdersClient { + use tarpc::client; + use tarpc::tokio_serde::formats::Bincode; + + let l = config.lock(); + let addr = l.orders_manager().rpc_addr(); + + let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default); + + let client = OrdersClient::new( + client::Config::default(), + transport.await.expect("Failed to connect to server"), + ) + .spawn(); + + client + } +} + +pub mod mqtt { + use config::SharedAppConfig; + use rumqttc::EventLoop; + + use crate::orders::CLIENT_NAME; + use crate::AsyncClient; + + pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) { + crate::mqtt::create_client(CLIENT_NAME, config.lock().stocks_manager().mqtt_addr()) + } +} diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index d9bc254..56e54b4 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -551,6 +551,39 @@ impl StocksConfig { } } +#[derive(Serialize, Deserialize)] +pub struct OrderConfig { + pub rpc_port: u16, + pub rpc_bind: String, + pub mqtt_port: u16, + pub mqtt_bind: String, + pub database_url: String, +} + +impl Example for OrderConfig {} + +impl Default for OrderConfig { + fn default() -> Self { + Self { + rpc_port: 19334, + rpc_bind: "0.0.0.0".into(), + mqtt_port: 1887, + mqtt_bind: "0.0.0.0".into(), + database_url: "postgres://postgres@localhost/bazzar_orders".into(), + } + } +} + +impl OrderConfig { + pub fn rpc_addr(&self) -> (&str, u16) { + (&self.rpc_bind, self.rpc_port) + } + + pub fn mqtt_addr(&self) -> (&str, u16) { + (&self.mqtt_bind, self.mqtt_port) + } +} + #[derive(Serialize, Deserialize)] pub struct AppConfig { #[serde(default)] @@ -573,6 +606,8 @@ pub struct AppConfig { email_sender: EmailSenderConfig, #[serde(default)] stocks: StocksConfig, + #[serde(default)] + order_manager: OrderConfig, #[serde(skip)] config_path: String, } @@ -590,6 +625,7 @@ impl Example for AppConfig { cart_manager: CartManagerConfig::example(), email_sender: EmailSenderConfig::example(), stocks: StocksConfig::example(), + order_manager: OrderConfig::example(), config_path: "".to_string(), } } @@ -651,6 +687,10 @@ impl AppConfig { pub fn stocks_manager(&self) -> &StocksConfig { &self.stocks } + + pub fn orders_manager(&self) -> &OrderConfig { + &self.order_manager + } } impl Default for AppConfig { @@ -666,6 +706,7 @@ impl Default for AppConfig { cart_manager: CartManagerConfig::default(), email_sender: EmailSenderConfig::default(), stocks: StocksConfig::default(), + order_manager: OrderConfig::default(), config_path: "".to_string(), } } diff --git a/crates/database_manager/Cargo.toml b/crates/database_manager/Cargo.toml index f85dd67..d85f72c 100644 --- a/crates/database_manager/Cargo.toml +++ b/crates/database_manager/Cargo.toml @@ -26,5 +26,5 @@ tracing = { version = "0.1.34" } uuid = { version = "1.2.1", features = ["serde"] } [dev-dependencies] -testx = { path = "../testx" } fake = { version = "2.5.0" } +testx = { path = "../testx" } diff --git a/crates/db-seed/Cargo.toml b/crates/db-seed/Cargo.toml index 5cb17a0..f1badfd 100644 --- a/crates/db-seed/Cargo.toml +++ b/crates/db-seed/Cargo.toml @@ -4,7 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] +account-manager = { path = '../account_manager' } bytes = { version = "1.1.0" } +channels = { path = '../channels' } config = { path = "../config" } dotenv = { version = "0.15", features = [] } fakeit = { version = "1.1.1", features = [] } @@ -13,10 +15,8 @@ human-panic = { version = "1.0.3" } model = { path = "../model", version = "0.1", features = ["db", "dummy"] } password-hash = { version = "0.4", features = ["alloc"] } rand = { version = "0.8.5" } +stock-manager = { path = "../stock_manager" } thiserror = { version = "1.0.31" } tokio = { version = "1.18.1", features = ["full"] } tracing = { version = "0.1.34" } tracing-subscriber = { version = "0.3.11" } -account-manager = { path = '../account_manager' } -stock-manager = { path = "../stock_manager" } -channels = { path = '../channels' } diff --git a/crates/db-utils/src/lib.rs b/crates/db-utils/src/lib.rs index cd2223a..c7dc15a 100644 --- a/crates/db-utils/src/lib.rs +++ b/crates/db-utils/src/lib.rs @@ -1,5 +1,31 @@ use sqlx::Arguments; +#[macro_export] +macro_rules! begin_t { + ($db: ident, $err: expr) => { + match $db.pool().begin().await { + Err(e) => { + tracing::warn!("{}", e); + return Err($err); + } + Ok(t) => t, + } + }; +} + +#[macro_export] +macro_rules! dbm_run { + ($dbm: ident, $t: expr, $err: expr) => { + match $dbm.run($t).await { + Ok(res) => res, + Err(e) => { + tracing::warn!("{}", e); + return Err($err); + } + } + }; +} + pub type PgT<'l> = sqlx::Transaction<'l, sqlx::Postgres>; pub struct Padding { diff --git a/crates/model/src/lib.rs b/crates/model/src/lib.rs index 02b233b..6604f55 100644 --- a/crates/model/src/lib.rs +++ b/crates/model/src/lib.rs @@ -62,6 +62,18 @@ impl CategoryMapper { } } +#[cfg_attr(feature = "db", derive(sqlx::Type))] +#[cfg_attr(feature = "db", sqlx(rename_all = "snake_case"))] +#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Default, Display, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum OrderItemState { + #[default] + #[display(fmt = "Poprawny")] + Valid, + #[display(fmt = "Niedostępny")] + OutOfStock, +} + #[cfg_attr(feature = "db", derive(sqlx::Type))] #[cfg_attr(feature = "db", sqlx(rename_all = "snake_case"))] #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Display, Deserialize, Serialize)] @@ -1016,6 +1028,7 @@ pub struct OrderItem { pub order_id: OrderId, pub quantity: Quantity, pub quantity_unit: QuantityUnit, + pub state: OrderItemState, } #[cfg_attr(feature = "db", derive(sqlx::Type))] diff --git a/crates/order_manager/Cargo.toml b/crates/order_manager/Cargo.toml index 3b3ce8d..94a5e13 100644 --- a/crates/order_manager/Cargo.toml +++ b/crates/order_manager/Cargo.toml @@ -3,18 +3,28 @@ name = "order_manager" version = "0.1.0" edition = "2021" +[[bin]] +name = "order-manager" +path = "./src/main.rs" + [dependencies] -actix = { version = "0.13", features = [] } -actix-rt = { version = "2.7", features = [] } +channels = { path = "../channels" } chrono = { version = "0.4", features = ["serde"] } config = { path = "../config" } -database_manager = { path = "../database_manager" } +db-utils = { path = "../db-utils" } model = { path = "../model", features = ["db"] } -pretty_env_logger = { version = "0.4", features = [] } +opentelemetry = { version = "0.17.0" } +opentelemetry-jaeger = { version = "0.17.0" } rumqttc = { version = "*" } serde = { version = "1.0.137", features = ["derive"] } +sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] } +sqlx-core = { version = "0.6.2", features = [] } +tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } thiserror = { version = "1.0.31" } -tracing = { version = "0.1.34" } +tokio = { version = "1.21.2", features = ['full'] } +tracing = { version = "0.1.6" } +tracing-opentelemetry = { version = "0.17.4" } +tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } uuid = { version = "1.2.1", features = ["serde"] } [dev-dependencies] diff --git a/crates/order_manager/migrations/202204131841_init.sql b/crates/order_manager/migrations/202204131841_init.sql new file mode 100644 index 0000000..d267b59 --- /dev/null +++ b/crates/order_manager/migrations/202204131841_init.sql @@ -0,0 +1,56 @@ +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + +CREATE TYPE "OrderStatus" AS ENUM ( + 'confirmed', + 'cancelled', + 'delivered', + 'payed', + 'require_refund', + 'refunded' + ); + +CREATE TYPE "QuantityUnit" AS ENUM ( + 'g', + 'dkg', + 'kg', + 'piece' + ); + +CREATE TYPE "OrderItemState" AS ENUM ( + 'valid', + 'out_of_stock' + ); + +CREATE TABLE orders +( + id serial NOT NULL PRIMARY KEY, + buyer_id integer, + status "OrderStatus" DEFAULT 'confirmed'::"OrderStatus" NOT NULL, + order_ext_id uuid DEFAULT uuid_generate_v4() NOT NULL, + service_order_id text, + checkout_notes text, + address_id integer +); + +CREATE TABLE order_addresses +( + id serial NOT NULL PRIMARY KEY, + name text NOT NULL, + email text NOT NULL, + street text NOT NULL, + city text NOT NULL, + country text NOT NULL, + zip text NOT NULL, + phone text NOT NULL +); + +CREATE TABLE order_items +( + id serial NOT NULL PRIMARY KEY, + product_id integer NOT NULL, + order_id integer references orders (id) ON DELETE CASCADE NOT NULL, + quantity integer DEFAULT 0 NOT NULL, + quantity_unit "QuantityUnit" NOT NULL, + state "OrderItemState" DEFAULT 'valid'::"OrderItemState" NOT NULL, + CONSTRAINT positive_quantity CHECK ((quantity >= 0)) +); diff --git a/crates/order_manager/src/lib.rs b/crates/order_manager/src/_lib.rs similarity index 100% rename from crates/order_manager/src/lib.rs rename to crates/order_manager/src/_lib.rs diff --git a/crates/order_manager/src/actions/mod.rs b/crates/order_manager/src/actions/mod.rs new file mode 100644 index 0000000..fac5524 --- /dev/null +++ b/crates/order_manager/src/actions/mod.rs @@ -0,0 +1,3 @@ +pub mod order; +pub mod order_address; +pub mod order_item; diff --git a/crates/order_manager/src/actions/order.rs b/crates/order_manager/src/actions/order.rs new file mode 100644 index 0000000..6578a52 --- /dev/null +++ b/crates/order_manager/src/actions/order.rs @@ -0,0 +1,82 @@ +use channels::orders::{create_order, Error}; +use channels::AsyncClient; +use config::SharedAppConfig; +use db_utils::{begin_t, dbm_run, PgT}; +use model::OrderStatus; + +use crate::db::{CreateOrder, CreateOrderItem, Database}; + +pub async fn create_order( + input: create_order::Input, + db: Database, + mqtt: AsyncClient, + _config: SharedAppConfig, +) -> create_order::Output { + let mut t = begin_t!(db, Error::InternalServerError); + + match inner_create_order(input, &mut t).await { + Ok(order) => { + if let Err(e) = t.commit().await { + tracing::error!("{}", e); + Err(Error::InternalServerError) + } else { + mqtt.emit_order_created(&order.order); + Ok(order) + } + } + Err(e) => { + tracing::error!("{}", e); + t.rollback().await.ok(); + Err(Error::CreateOrder) + } + } +} + +pub async fn inner_create_order( + input: create_order::Input, + t: &mut PgT<'_>, +) -> create_order::Output { + let dbm = CreateOrder { + buyer_id: input.buyer_id, + shopping_cart_id: input.shopping_cart_id, + checkout_notes: input.checkout_notes, + delivery_address_id: input.delivery_address_id, + }; + + let order = dbm_run!(dbm, &mut *t, Error::CreateOrder); + + let mut order_items = Vec::with_capacity(input.items.len()); + for item in input.items { + let dbm = CreateOrderItem { + order_id: order.id, + product_id: item.product_id, + quantity: item.quantity, + quantity_unit: item.quantity_unit, + }; + + let item = dbm_run!(dbm, &mut *t, Error::CreateOrderItem); + order_items.push(item); + } + + Ok(create_order::Details { order, order_items }) +} + +pub fn change(current: OrderStatus, next: OrderStatus) -> Option { + match (current, next) { + // paying + (OrderStatus::Confirmed, OrderStatus::Payed) => Some(OrderStatus::Payed), + + // delivering + (OrderStatus::Confirmed | OrderStatus::Payed, OrderStatus::Delivered) => { + Some(OrderStatus::Delivered) + } + + // cancelling + (OrderStatus::Confirmed, OrderStatus::Cancelled) => Some(OrderStatus::Cancelled), + (OrderStatus::Payed, OrderStatus::Cancelled) => Some(OrderStatus::RequireRefund), + (OrderStatus::Payed, OrderStatus::RequireRefund) => Some(OrderStatus::RequireRefund), + (OrderStatus::RequireRefund, OrderStatus::Refunded) => Some(OrderStatus::Refunded), + + _ => None, + } +} diff --git a/crates/order_manager/src/actions/order_address.rs b/crates/order_manager/src/actions/order_address.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/order_manager/src/actions/order_item.rs b/crates/order_manager/src/actions/order_item.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/order_manager/src/context.rs b/crates/order_manager/src/context.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/order_manager/src/db/mod.rs b/crates/order_manager/src/db/mod.rs new file mode 100644 index 0000000..4e74bb9 --- /dev/null +++ b/crates/order_manager/src/db/mod.rs @@ -0,0 +1,45 @@ +pub mod order; +pub mod order_address; +pub mod order_item; + +use config::SharedAppConfig; +pub use order::*; +pub use order_address::*; +pub use order_item::*; +use sqlx_core::pool::Pool; +use sqlx_core::postgres::Postgres; + +#[derive(Clone)] +pub struct Database { + pub pool: sqlx::PgPool, + _config: SharedAppConfig, +} + +impl Database { + pub async fn build(config: SharedAppConfig) -> Self { + let url = config.lock().orders_manager().database_url.clone(); + let pool = sqlx::PgPool::connect(&url).await.unwrap_or_else(|e| { + tracing::error!("Failed to connect to database. {e:?}"); + std::process::exit(1); + }); + Self { + pool, + _config: config, + } + } + + pub fn pool(&self) -> Pool { + self.pool.clone() + } +} + +#[derive(Debug)] +pub struct CreateOrderAddress { + pub name: model::Name, + pub email: model::Email, + pub phone: model::Phone, + pub street: model::Street, + pub city: model::City, + pub country: model::Country, + pub zip: model::Zip, +} diff --git a/crates/order_manager/src/db/order.rs b/crates/order_manager/src/db/order.rs new file mode 100644 index 0000000..da66582 --- /dev/null +++ b/crates/order_manager/src/db/order.rs @@ -0,0 +1,58 @@ +use model::{AccountId, Order, OrderAddressId, OrderStatus, ShoppingCartId}; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Can't create order item")] + CantCreate, +} + +pub type Result = std::result::Result; + +#[derive(Debug)] +pub struct CreateOrder { + pub buyer_id: AccountId, + pub shopping_cart_id: Option, + pub checkout_notes: Option, + pub delivery_address_id: OrderAddressId, +} + +impl CreateOrder { + pub async fn run(self, t: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result { + sqlx::query_as( + r#" +INSERT INTO orders (buyer_id, status, checkout_notes, address_id) +VALUES ($1, $2, $3, $4) +RETURNING id, buyer_id, status, order_ext_id, service_order_id, checkout_notes, address_id + "#, + ) + .bind(self.buyer_id) + .bind(OrderStatus::Confirmed) + .bind(self.checkout_notes.as_deref()) + .bind(self.delivery_address_id) + .fetch_one(&mut *t) + .await + .map_err(|e| { + tracing::error!("{e:?}"); + dbg!(e); + Error::CantCreate + }) + + // if let Some(shopping_cart_id) = msg.shopping_cart_id { + // if let Err(e) = shopping_cart_set_state( + // ShoppingCartSetState { + // id: shopping_cart_id, + // state: ShoppingCartState::Closed, + // checkout_notes: msg.checkout_notes, + // }, + // t, + // ) + // .await + // { + // dbg!(e); + // tracing::error!("{e:?}"); + // + // return Err(super::Error::AccountOrder(Error::CantCreate)); + // }; + // } + } +} diff --git a/crates/order_manager/src/db/order_address.rs b/crates/order_manager/src/db/order_address.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/order_manager/src/db/order_item.rs b/crates/order_manager/src/db/order_item.rs new file mode 100644 index 0000000..e887683 --- /dev/null +++ b/crates/order_manager/src/db/order_item.rs @@ -0,0 +1,40 @@ +use model::{OrderId, OrderItem, ProductId, Quantity, QuantityUnit}; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Can't create order item")] + CantCreate, +} + +pub type Result = std::result::Result; + +#[derive(Debug)] +pub struct CreateOrderItem { + pub order_id: OrderId, + pub product_id: ProductId, + pub quantity: Quantity, + pub quantity_unit: QuantityUnit, +} + +impl CreateOrderItem { + pub async fn run(self, t: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result { + sqlx::query_as( + r#" +INSERT INTO order_items (product_id, order_id, quantity, quantity_unit) +VALUES ($1, $2, $3, $4) +RETURNING id, product_id, order_id, quantity, quantity_unit + "#, + ) + .bind(self.product_id) + .bind(self.order_id) + .bind(self.quantity) + .bind(self.quantity_unit) + .fetch_one(t) + .await + .map_err(|e| { + tracing::error!("{e:?}"); + dbg!(e); + Error::CantCreate + }) + } +} diff --git a/crates/order_manager/src/main.rs b/crates/order_manager/src/main.rs new file mode 100644 index 0000000..31801c4 --- /dev/null +++ b/crates/order_manager/src/main.rs @@ -0,0 +1,22 @@ +use config::UpdateConfig; + +mod actions; +mod db; +mod mqtt; +mod rpc; + +pub struct Opts {} + +impl UpdateConfig for Opts {} + +#[tokio::main] +async fn main() { + let opts = Opts {}; + + let config = config::default_load(&opts); + + let db = db::Database::build(config.clone()).await; + + let mqtt_client = mqtt::start(config.clone(), db.clone()).await; + rpc::start(config, db, mqtt_client).await; +} diff --git a/crates/order_manager/src/mqtt.rs b/crates/order_manager/src/mqtt.rs new file mode 100644 index 0000000..e278627 --- /dev/null +++ b/crates/order_manager/src/mqtt.rs @@ -0,0 +1,30 @@ +use config::SharedAppConfig; +use rumqttc::{Event, Incoming}; + +use crate::db::Database; + +pub async fn start(config: SharedAppConfig, _db: Database) -> channels::AsyncClient { + let (client, mut event_loop) = channels::orders::mqtt::create_client(config); + + let spawn_client = client.clone(); + tokio::spawn(async move { + let _client = spawn_client.clone(); + loop { + let notification = event_loop.poll().await; + + match notification { + Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() { + _ => {} + }, + Ok(Event::Incoming(_incoming)) => {} + Ok(Event::Outgoing(_outgoing)) => {} + Err(e) => { + tracing::warn!("{}", e); + } + } + } + // tracing::info!("Mqtt channel closed"); + }); + + client +} diff --git a/crates/order_manager/src/rpc.rs b/crates/order_manager/src/rpc.rs new file mode 100644 index 0000000..c4db962 --- /dev/null +++ b/crates/order_manager/src/rpc.rs @@ -0,0 +1,36 @@ +use channels::orders::create_order::{Input, Output}; +use channels::orders::rpc::Orders; +use channels::AsyncClient; +use config::SharedAppConfig; +use tarpc::context; + +use crate::actions; +use crate::db::Database; + +#[derive(Clone)] +pub struct OrdersServer { + pub db: Database, + pub mqtt_client: AsyncClient, + pub config: SharedAppConfig, +} + +#[tarpc::server] +impl Orders for OrdersServer { + async fn create_order(self, _: context::Context, input: Input) -> Output { + actions::order::create_order(input, self.db, self.mqtt_client, self.config).await + } +} + +pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) { + let port = { config.lock().stocks_manager().rpc_port }; + + channels::rpc::start("orders", port, || { + OrdersServer { + db: db.clone(), + config: config.clone(), + mqtt_client: mqtt_client.clone(), + } + .serve() + }) + .await; +} diff --git a/crates/stock_manager/Cargo.toml b/crates/stock_manager/Cargo.toml index 64b7d2a..5a6b8c8 100644 --- a/crates/stock_manager/Cargo.toml +++ b/crates/stock_manager/Cargo.toml @@ -32,6 +32,6 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } uuid = { version = "1.2.1", features = ['v4'] } [dev-dependencies] -testx = { path = "../testx" } -insta = { version = "1.21.0" } fakeit = { version = "1.1.1" } +insta = { version = "1.21.0" } +testx = { path = "../testx" } diff --git a/crates/stock_manager/migrations/202204131841_init.sql b/crates/stock_manager/migrations/202204131841_init.sql index 8cd7425..1169d92 100644 --- a/crates/stock_manager/migrations/202204131841_init.sql +++ b/crates/stock_manager/migrations/202204131841_init.sql @@ -5,51 +5,57 @@ CREATE TYPE "QuantityUnit" AS ENUM ( 'dkg', 'kg', 'piece' -); + ); -CREATE TABLE photos ( - id serial NOT NULL PRIMARY KEY, - local_path character varying NOT NULL, - file_name character varying NOT NULL, +CREATE TABLE photos +( + id serial NOT NULL PRIMARY KEY, + local_path character varying NOT NULL, + file_name character varying NOT NULL, unique_name text DEFAULT (gen_random_uuid())::text NOT NULL ); -CREATE TABLE products ( - id serial NOT NULL PRIMARY KEY, - "name" character varying NOT NULL, - category character varying, +CREATE TABLE products +( + id serial NOT NULL PRIMARY KEY, + "name" character varying NOT NULL, + category character varying, deliver_days_flag integer DEFAULT 127 NOT NULL ); -CREATE TABLE product_variants ( - id serial NOT NULL PRIMARY KEY, - product_id integer REFERENCES products (id) ON DELETE CASCADE NOT NULL, - "name" character varying NOT NULL, - short_description character varying NOT NULL, - long_description character varying NOT NULL, - price integer NOT NULL, - quantity_unit "QuantityUnit" NOT NULL, +CREATE TABLE product_variants +( + id serial NOT NULL PRIMARY KEY, + product_id integer REFERENCES products (id) ON DELETE CASCADE NOT NULL, + "name" character varying NOT NULL, + short_description character varying NOT NULL, + long_description character varying NOT NULL, + price integer NOT NULL, + quantity_unit "QuantityUnit" NOT NULL, CONSTRAINT non_negative CHECK ((price >= 0)) ); -CREATE TABLE stocks ( - id serial NOT NULL PRIMARY KEY, - product_variant_id integer REFERENCES product_variants(id) ON DELETE CASCADE NOT NULL, - quantity integer DEFAULT 0 NOT NULL, - quantity_unit "QuantityUnit" NOT NULL, +CREATE TABLE stocks +( + id serial NOT NULL PRIMARY KEY, + product_variant_id integer REFERENCES product_variants (id) ON DELETE CASCADE NOT NULL, + quantity integer DEFAULT 0 NOT NULL, + quantity_unit "QuantityUnit" NOT NULL, CONSTRAINT positive_quantity CHECK ((quantity >= 0)) ); -CREATE TABLE product_photos ( - id serial NOT NULL PRIMARY KEY, - product_variant_id integer REFERENCES product_variants(id) ON DELETE CASCADE NOT NULL, - photo_id integer REFERENCES photos(id) NOT NULL +CREATE TABLE product_photos +( + id serial NOT NULL PRIMARY KEY, + product_variant_id integer REFERENCES product_variants (id) ON DELETE CASCADE NOT NULL, + photo_id integer REFERENCES photos (id) NOT NULL ); -CREATE TABLE categories ( - id serial NOT NULL PRIMARY KEY, +CREATE TABLE categories +( + id serial NOT NULL PRIMARY KEY, parent_id int references categories (id) ON DELETE CASCADE, - "name" varchar not null, - "key" varchar not null, - "svg" varchar not null + "name" varchar not null, + "key" varchar not null, + "svg" varchar not null ); diff --git a/crates/stock_manager/src/rpc.rs b/crates/stock_manager/src/rpc.rs index 848707e..080610d 100644 --- a/crates/stock_manager/src/rpc.rs +++ b/crates/stock_manager/src/rpc.rs @@ -1,175 +1,168 @@ +use channels::stocks::create_category::{Input, Output}; use channels::stocks::rpc::Stocks; +use channels::stocks::*; use channels::AsyncClient; use config::SharedAppConfig; +use tarpc::context; +use crate::actions; use crate::db::Database; -use crate::rpc::rpc::StocksServer; -pub mod rpc { - use channels::stocks::create_category::{Input, Output}; - use channels::stocks::rpc::Stocks; - use channels::stocks::*; - use config::SharedAppConfig; - use tarpc::context; +#[derive(Clone)] +pub struct StocksServer { + pub db: Database, + pub mqtt_client: AsyncClient, + pub config: SharedAppConfig, +} - use crate::actions; - - #[derive(Clone)] - pub struct StocksServer { - pub db: crate::db::Database, - pub mqtt_client: channels::AsyncClient, - pub config: SharedAppConfig, +#[tarpc::server] +impl Stocks for StocksServer { + async fn create_product( + self, + _: context::Context, + input: create_product::Input, + ) -> create_product::Output { + actions::create_product(input, self.db, self.mqtt_client, self.config).await } - #[tarpc::server] - impl Stocks for StocksServer { - async fn create_product( - self, - _: context::Context, - input: create_product::Input, - ) -> create_product::Output { - actions::create_product(input, self.db, self.mqtt_client, self.config).await - } + async fn update_product( + self, + _: context::Context, + input: update_product::Input, + ) -> update_product::Output { + actions::update_product(input, self.db, self.mqtt_client, self.config).await + } - async fn update_product( - self, - _: context::Context, - input: update_product::Input, - ) -> update_product::Output { - actions::update_product(input, self.db, self.mqtt_client, self.config).await - } + async fn delete_product( + self, + _: context::Context, + input: delete_product::Input, + ) -> delete_product::Output { + actions::delete_product(input, self.db, self.mqtt_client, self.config).await + } - async fn delete_product( - self, - _: context::Context, - input: delete_product::Input, - ) -> delete_product::Output { - actions::delete_product(input, self.db, self.mqtt_client, self.config).await - } + async fn detailed_product( + self, + _: context::Context, + input: detailed_product::Input, + ) -> detailed_product::Output { + actions::detailed_product(input, self.db, self.mqtt_client, self.config).await + } - async fn detailed_product( - self, - _: context::Context, - input: detailed_product::Input, - ) -> detailed_product::Output { - actions::detailed_product(input, self.db, self.mqtt_client, self.config).await - } + async fn detailed_products( + self, + _: context::Context, + input: detailed_products::Input, + ) -> detailed_products::Output { + actions::detailed_products(input, self.db, self.mqtt_client, self.config).await + } - async fn detailed_products( - self, - _: context::Context, - input: detailed_products::Input, - ) -> detailed_products::Output { - actions::detailed_products(input, self.db, self.mqtt_client, self.config).await - } + async fn shopping_cart_products( + self, + _: context::Context, + input: find_products::Input, + ) -> find_products::Output { + actions::find_products(input, self.db, self.mqtt_client, self.config).await + } - async fn shopping_cart_products( - self, - _: context::Context, - input: find_products::Input, - ) -> find_products::Output { - actions::find_products(input, self.db, self.mqtt_client, self.config).await - } + async fn create_product_variant( + self, + _: context::Context, + input: create_product_variant::Input, + ) -> create_product_variant::Output { + actions::create_product_variant(input, self.db, self.mqtt_client, self.config).await + } - async fn create_product_variant( - self, - _: context::Context, - input: create_product_variant::Input, - ) -> create_product_variant::Output { - actions::create_product_variant(input, self.db, self.mqtt_client, self.config).await - } + async fn update_product_variant( + self, + _: context::Context, + input: update_product_variant::Input, + ) -> update_product_variant::Output { + actions::update_product_variant(input, self.db, self.mqtt_client, self.config).await + } - async fn update_product_variant( - self, - _: context::Context, - input: update_product_variant::Input, - ) -> update_product_variant::Output { - actions::update_product_variant(input, self.db, self.mqtt_client, self.config).await - } + async fn delete_product_variant( + self, + _: context::Context, + input: delete_product_variant::Input, + ) -> delete_product_variant::Output { + actions::delete_product_variant(input, self.db, self.mqtt_client, self.config).await + } - async fn delete_product_variant( - self, - _: context::Context, - input: delete_product_variant::Input, - ) -> delete_product_variant::Output { - actions::delete_product_variant(input, self.db, self.mqtt_client, self.config).await - } + async fn shopping_cart_product_variants( + self, + _: context::Context, + input: find_product_variants::Input, + ) -> find_product_variants::Output { + actions::find_product_variants(input, self.db, self.mqtt_client, self.config).await + } - async fn shopping_cart_product_variants( - self, - _: context::Context, - input: find_product_variants::Input, - ) -> find_product_variants::Output { - actions::find_product_variants(input, self.db, self.mqtt_client, self.config).await - } + async fn all_product_photo( + self, + _: context::Context, + input: all_product_photo::Input, + ) -> all_product_photo::Output { + actions::all_product_photo(input, self.db, self.mqtt_client, self.config).await + } - async fn all_product_photo( - self, - _: context::Context, - input: all_product_photo::Input, - ) -> all_product_photo::Output { - actions::all_product_photo(input, self.db, self.mqtt_client, self.config).await - } + async fn add_product_photo( + self, + _: context::Context, + input: add_product_photo::Input, + ) -> add_product_photo::Output { + actions::add_product_photo(input, self.db, self.mqtt_client, self.config).await + } - async fn add_product_photo( - self, - _: context::Context, - input: add_product_photo::Input, - ) -> add_product_photo::Output { - actions::add_product_photo(input, self.db, self.mqtt_client, self.config).await - } + async fn delete_product_photo( + self, + _: context::Context, + input: delete_product_photo::Input, + ) -> delete_product_photo::Output { + actions::delete_product_photo(input, self.db, self.mqtt_client, self.config).await + } - async fn delete_product_photo( - self, - _: context::Context, - input: delete_product_photo::Input, - ) -> delete_product_photo::Output { - actions::delete_product_photo(input, self.db, self.mqtt_client, self.config).await - } + async fn create_product_stock( + self, + _: context::Context, + input: create_product_stock::Input, + ) -> create_product_stock::Output { + actions::create_product_stock(input, self.db, self.mqtt_client, self.config).await + } - async fn create_product_stock( - self, - _: context::Context, - input: create_product_stock::Input, - ) -> create_product_stock::Output { - actions::create_product_stock(input, self.db, self.mqtt_client, self.config).await - } + async fn update_product_stock( + self, + _: context::Context, + input: update_product_stock::Input, + ) -> update_product_stock::Output { + actions::update_product_stock(input, self.db, self.mqtt_client, self.config).await + } - async fn update_product_stock( - self, - _: context::Context, - input: update_product_stock::Input, - ) -> update_product_stock::Output { - actions::update_product_stock(input, self.db, self.mqtt_client, self.config).await - } + async fn create_category(self, _: context::Context, input: Input) -> Output { + actions::create_category(input, self.db, self.mqtt_client, self.config).await + } - async fn create_category(self, _: context::Context, input: Input) -> Output { - actions::create_category(input, self.db, self.mqtt_client, self.config).await - } + async fn delete_category( + self, + _: context::Context, + input: delete_category::Input, + ) -> delete_category::Output { + actions::delete_category(input, self.db, self.mqtt_client, self.config).await + } - async fn delete_category( - self, - _: context::Context, - input: delete_category::Input, - ) -> delete_category::Output { - actions::delete_category(input, self.db, self.mqtt_client, self.config).await - } + async fn update_category( + self, + _: context::Context, + input: update_category::Input, + ) -> update_category::Output { + actions::update_category(input, self.db, self.mqtt_client, self.config).await + } - async fn update_category( - self, - _: context::Context, - input: update_category::Input, - ) -> update_category::Output { - actions::update_category(input, self.db, self.mqtt_client, self.config).await - } - - async fn all_categories( - self, - _: context::Context, - input: all_categories::Input, - ) -> all_categories::Output { - actions::all_categories(input, self.db, self.mqtt_client, self.config).await - } + async fn all_categories( + self, + _: context::Context, + input: all_categories::Input, + ) -> all_categories::Output { + actions::all_categories(input, self.db, self.mqtt_client, self.config).await } } diff --git a/scripts/migrate.sh b/scripts/migrate.sh index 5e137f7..9eb3592 100755 --- a/scripts/migrate.sh +++ b/scripts/migrate.sh @@ -7,6 +7,7 @@ then psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_accounts" || echo 0 psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_carts" || echo 0 psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_stocks" || echo 0 + psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_orders" || echo 0 fi psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_accounts" || echo 0 @@ -17,3 +18,6 @@ sqlx migrate run -D "${CART_DATABASE_URL}" --source ./crates/cart_manager/migrat psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_stocks" || echo 0 sqlx migrate run -D "${STOCK_DATABASE_URL}" --source ./crates/stock_manager/migrations + +psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_orders" || echo 0 +sqlx migrate run -D "${ORDER_DATABASE_URL}" --source ./crates/order_manager/migrations