Emit product created and updated events

This commit is contained in:
eraden 2022-11-14 21:11:45 +01:00
parent c74e61f1f6
commit 0d5e450656
3 changed files with 113 additions and 13 deletions

View File

@ -30,6 +30,8 @@ pub enum Error {
VariantPhotos(Vec<ProductVariantId>), VariantPhotos(Vec<ProductVariantId>),
#[error("Failed to create product")] #[error("Failed to create product")]
CreateProduct, CreateProduct,
#[error("Failed to update product {0:?}")]
UpdateProduct(ProductId),
#[error("Failed to create variant of product {0:?}")] #[error("Failed to create variant of product {0:?}")]
CreateProductVariant(ProductId), CreateProductVariant(ProductId),
#[error("Failed to create stock of variant {0:?}")] #[error("Failed to create stock of variant {0:?}")]

View File

@ -1,3 +1,10 @@
use bytes::Bytes;
use model::v2::DetailedProduct;
use rumqttc::QoS;
use serde::de::DeserializeOwned;
use crate::{AsyncClient, DeserializePayload};
pub mod create_product { pub mod create_product {
use model::v2::*; use model::v2::*;
@ -37,17 +44,22 @@ pub mod create_product {
pub mod update_product { pub mod update_product {
use model::v2::*; use model::v2::*;
use crate::stocks::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)] #[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input { pub struct Input {
pub id: ProductId,
pub name: ProductName, pub name: ProductName,
pub category: Option<ProductCategory>, pub category: Option<ProductCategory>,
pub deliver_days_flag: Days, pub deliver_days_flag: Days,
} }
#[derive(Debug, serde::Serialize, serde::Deserialize)] #[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Output { pub struct Details {
pub product: model::Product, pub product: Product,
} }
pub type Output = Result<Details, Error>;
} }
pub mod delete_product { pub mod delete_product {
@ -67,3 +79,45 @@ pub mod delete_product {
pub type Output = Result<Details, Error>; pub type Output = Result<Details, Error>;
} }
#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Topic {
ProductCreated,
ProductUpdated,
}
impl Topic {
pub fn to_str(&self) -> &str {
match self {
Topic::ProductCreated => "product/created",
Topic::ProductUpdated => "product/updated",
}
}
}
impl Into<String> for Topic {
fn into(self) -> String {
self.to_str().into()
}
}
impl DeserializePayload for Topic {
fn deserialize_payload<T: DeserializeOwned>(self, bytes: Bytes) -> Option<T> {
match self {
Topic::ProductCreated => bincode::deserialize(bytes.as_ref()).ok(),
Topic::ProductUpdated => bincode::deserialize(bytes.as_ref()).ok(),
}
}
}
impl AsyncClient {
pub async fn emit_product_created(&self, product: &DetailedProduct) {
self.publish_or_log(Topic::ProductCreated, QoS::AtLeastOnce, true, product)
.await
}
pub async fn emit_product_updated(&self, product: &model::v2::Product) {
self.publish_or_log(Topic::ProductUpdated, QoS::AtLeastOnce, true, product)
.await
}
}

View File

@ -10,23 +10,25 @@ use crate::db::Database;
pub async fn create_product( pub async fn create_product(
input: create_product::Input, input: create_product::Input,
db: Database, db: Database,
_mqtt: AsyncClient, mqtt: AsyncClient,
_config: SharedAppConfig, _config: SharedAppConfig,
) -> create_product::Output { ) -> create_product::Output {
let mut t = begin_t!(db, Error::InternalServerError); let mut t = begin_t!(db, Error::InternalServerError);
match inner_create_product(input, &mut t, Some(_mqtt), Some(_config)).await { match inner_create_product(input, &mut t, Some(_config)).await {
Ok(res) => { Ok(res) => {
if let Err(e) = t.commit().await { if let Err(e) = t.commit().await {
tracing::error!("{}", e); tracing::error!("{}", e);
return Err(Error::InternalServerError); Err(Error::InternalServerError)
} else {
mqtt.emit_product_created(&res.product).await;
Ok(res)
} }
return Ok(res);
} }
Err(e) => { Err(e) => {
tracing::error!("{}", e); tracing::error!("{}", e);
t.rollback().await.ok(); t.rollback().await.ok();
return Err(e); Err(e)
} }
} }
} }
@ -34,7 +36,6 @@ pub async fn create_product(
async fn inner_create_product( async fn inner_create_product(
input: create_product::Input, input: create_product::Input,
t: &mut PgT<'_>, t: &mut PgT<'_>,
_mqtt: Option<AsyncClient>,
_config: Option<SharedAppConfig>, _config: Option<SharedAppConfig>,
) -> create_product::Output { ) -> create_product::Output {
use create_product::*; use create_product::*;
@ -103,12 +104,56 @@ async fn inner_create_product(
} }
pub async fn update_product( pub async fn update_product(
_input: update_product::Input, input: update_product::Input,
_db: Database, db: Database,
_mqtt: AsyncClient, mqtt: AsyncClient,
_config: SharedAppConfig, _config: SharedAppConfig,
) -> update_product::Output { ) -> update_product::Output {
todo!() let mut t = begin_t!(db, Error::InternalServerError);
let res = inner_update_product(input, &mut t, Some(_config)).await;
match res {
Ok(res) => {
t.commit().await.map_err(|e| {
tracing::error!("{}", e);
Error::InternalServerError
})?;
mqtt.emit_product_updated(&res.product).await;
Ok(res)
}
Err(e) => {
tracing::warn!("{}", e);
t.rollback().await.map_err(|e| {
tracing::error!("{}", e);
Error::InternalServerError
})?;
Err(e)
}
}
}
async fn inner_update_product(
input: update_product::Input,
t: &mut PgT<'_>,
_config: Option<SharedAppConfig>,
) -> update_product::Output {
let dbm = crate::db::UpdateProduct {
id: input.id,
name: input.name,
category: input.category,
deliver_days_flag: input.deliver_days_flag,
};
dbm.run(t)
.await
.map_err(|e| {
tracing::warn!("{}", e);
Error::UpdateProduct(input.id)
})
.map(|product| update_product::Details { product })
} }
pub async fn delete_product( pub async fn delete_product(
@ -153,7 +198,6 @@ mod tests {
}, },
&mut t, &mut t,
None, None,
None,
) )
.await; .await;