From 0d5e450656a8fb6cb6b6f10f72143d72f18563fb Mon Sep 17 00:00:00 2001 From: eraden Date: Mon, 14 Nov 2022 21:11:45 +0100 Subject: [PATCH] Emit product created and updated events --- crates/channels/src/stocks/mod.rs | 2 + crates/channels/src/stocks/product.rs | 58 +++++++++++++++++- crates/stock_manager/src/actions/product.rs | 66 +++++++++++++++++---- 3 files changed, 113 insertions(+), 13 deletions(-) diff --git a/crates/channels/src/stocks/mod.rs b/crates/channels/src/stocks/mod.rs index 134b5a6..213d85b 100644 --- a/crates/channels/src/stocks/mod.rs +++ b/crates/channels/src/stocks/mod.rs @@ -30,6 +30,8 @@ pub enum Error { VariantPhotos(Vec), #[error("Failed to create product")] CreateProduct, + #[error("Failed to update product {0:?}")] + UpdateProduct(ProductId), #[error("Failed to create variant of product {0:?}")] CreateProductVariant(ProductId), #[error("Failed to create stock of variant {0:?}")] diff --git a/crates/channels/src/stocks/product.rs b/crates/channels/src/stocks/product.rs index 4c66960..03383b2 100644 --- a/crates/channels/src/stocks/product.rs +++ b/crates/channels/src/stocks/product.rs @@ -1,3 +1,10 @@ +use bytes::Bytes; +use model::v2::DetailedProduct; +use rumqttc::QoS; +use serde::de::DeserializeOwned; + +use crate::{AsyncClient, DeserializePayload}; + pub mod create_product { use model::v2::*; @@ -37,17 +44,22 @@ pub mod create_product { pub mod update_product { use model::v2::*; + use crate::stocks::Error; + #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Input { + pub id: ProductId, pub name: ProductName, pub category: Option, pub deliver_days_flag: Days, } #[derive(Debug, serde::Serialize, serde::Deserialize)] - pub struct Output { - pub product: model::Product, + pub struct Details { + pub product: Product, } + + pub type Output = Result; } pub mod delete_product { @@ -67,3 +79,45 @@ pub mod delete_product { pub type Output = Result; } + +#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)] +pub enum Topic { + ProductCreated, + ProductUpdated, +} + +impl Topic { + pub fn to_str(&self) -> &str { + match self { + Topic::ProductCreated => "product/created", + Topic::ProductUpdated => "product/updated", + } + } +} + +impl Into for Topic { + fn into(self) -> String { + self.to_str().into() + } +} + +impl DeserializePayload for Topic { + fn deserialize_payload(self, bytes: Bytes) -> Option { + match self { + Topic::ProductCreated => bincode::deserialize(bytes.as_ref()).ok(), + Topic::ProductUpdated => bincode::deserialize(bytes.as_ref()).ok(), + } + } +} + +impl AsyncClient { + pub async fn emit_product_created(&self, product: &DetailedProduct) { + self.publish_or_log(Topic::ProductCreated, QoS::AtLeastOnce, true, product) + .await + } + + pub async fn emit_product_updated(&self, product: &model::v2::Product) { + self.publish_or_log(Topic::ProductUpdated, QoS::AtLeastOnce, true, product) + .await + } +} diff --git a/crates/stock_manager/src/actions/product.rs b/crates/stock_manager/src/actions/product.rs index 515dd11..0a1e84d 100644 --- a/crates/stock_manager/src/actions/product.rs +++ b/crates/stock_manager/src/actions/product.rs @@ -10,23 +10,25 @@ use crate::db::Database; pub async fn create_product( input: create_product::Input, db: Database, - _mqtt: AsyncClient, + mqtt: AsyncClient, _config: SharedAppConfig, ) -> create_product::Output { let mut t = begin_t!(db, Error::InternalServerError); - match inner_create_product(input, &mut t, Some(_mqtt), Some(_config)).await { + match inner_create_product(input, &mut t, Some(_config)).await { Ok(res) => { if let Err(e) = t.commit().await { tracing::error!("{}", e); - return Err(Error::InternalServerError); + Err(Error::InternalServerError) + } else { + mqtt.emit_product_created(&res.product).await; + Ok(res) } - return Ok(res); } Err(e) => { tracing::error!("{}", e); t.rollback().await.ok(); - return Err(e); + Err(e) } } } @@ -34,7 +36,6 @@ pub async fn create_product( async fn inner_create_product( input: create_product::Input, t: &mut PgT<'_>, - _mqtt: Option, _config: Option, ) -> create_product::Output { use create_product::*; @@ -103,12 +104,56 @@ async fn inner_create_product( } pub async fn update_product( - _input: update_product::Input, - _db: Database, - _mqtt: AsyncClient, + input: update_product::Input, + db: Database, + mqtt: AsyncClient, _config: SharedAppConfig, ) -> update_product::Output { - todo!() + let mut t = begin_t!(db, Error::InternalServerError); + + let res = inner_update_product(input, &mut t, Some(_config)).await; + + match res { + Ok(res) => { + t.commit().await.map_err(|e| { + tracing::error!("{}", e); + Error::InternalServerError + })?; + + mqtt.emit_product_updated(&res.product).await; + + Ok(res) + } + Err(e) => { + tracing::warn!("{}", e); + t.rollback().await.map_err(|e| { + tracing::error!("{}", e); + Error::InternalServerError + })?; + Err(e) + } + } +} + +async fn inner_update_product( + input: update_product::Input, + t: &mut PgT<'_>, + _config: Option, +) -> update_product::Output { + let dbm = crate::db::UpdateProduct { + id: input.id, + name: input.name, + category: input.category, + deliver_days_flag: input.deliver_days_flag, + }; + + dbm.run(t) + .await + .map_err(|e| { + tracing::warn!("{}", e); + Error::UpdateProduct(input.id) + }) + .map(|product| update_product::Details { product }) } pub async fn delete_product( @@ -153,7 +198,6 @@ mod tests { }, &mut t, None, - None, ) .await;