use actix::{Actor, Context}; use config::SharedAppConfig; use sqlx::PgPool; use sqlx_core::arguments::Arguments; pub use crate::account_addresses::*; pub use crate::accounts::*; pub use crate::order_addresses::*; pub use crate::order_items::*; pub use crate::orders::*; pub use crate::photos::*; pub use crate::product_photos::*; pub use crate::products::*; pub use crate::shopping_cart_items::*; pub use crate::shopping_carts::*; pub use crate::stocks::*; pub use crate::tokens::*; pub mod account_addresses; pub mod accounts; pub mod order_addresses; pub mod order_items; pub mod orders; pub mod photos; pub mod product_photos; pub mod products; pub mod shopping_cart_items; pub mod shopping_carts; pub mod stocks; pub mod tokens; #[macro_export] macro_rules! db_async_handler { ($msg: ty, $async: ident, $res: ty) => { impl actix::Handler<$msg> for crate::Database { type Result = actix::ResponseActFuture>; fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result { use actix::WrapFuture; let pool = self.pool.clone(); Box::pin(async { $async(msg, pool).await }.into_actor(self)) } } }; ($msg: ty, $async: ident, $res: ty, $inner_async: ident) => { async fn $inner_async(msg: $msg, pool: sqlx::PgPool) -> Result<$res> { let mut t = pool.begin().await.map_err(|e| { log::error!("{:?}", e); $crate::Error::TransactionFailed })?; match $async(msg, &mut t).await { Ok(res) => { t.commit().await.map_err(|e| { log::error!("{:?}", e); $crate::Error::TransactionFailed })?; Ok(res) } Err(e) => { let _ = t.rollback().await; Err(e) } } } impl actix::Handler<$msg> for crate::Database { type Result = actix::ResponseActFuture>; fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result { use actix::WrapFuture; let pool = self.pool.clone(); Box::pin(async { $inner_async(msg, pool).await }.into_actor(self)) } } }; } #[macro_export] macro_rules! query_db { ($db: expr, $msg: expr, default $fail: expr) => { match $db.send($msg).await { Ok(Ok(r)) => r, Ok(Err(e)) => { log::error!("{e}"); $fail } Err(e) => { log::error!("{e:?}"); $fail } } }; ($db: expr, $msg: expr, $fail: expr) => { $crate::query_db!($db, $msg, $fail, $fail) }; ($db: expr, $msg: expr, $db_fail: expr, $act_fail: expr) => { match $db.send($msg).await { Ok(Ok(r)) => r, Ok(Err(e)) => { log::error!("{e}"); return Err($db_fail); } Err(e) => { log::error!("{e:?}"); return Err($act_fail); } } }; ($db: expr, $msg: expr, passthrough $db_fail: expr, $act_fail: expr) => { match $db.send($msg).await { Ok(Ok(r)) => r, Ok(Err(e)) => { log::error!("{e}"); return Err($db_fail(e.into())); } Err(e) => { log::error!("{e:?}"); return Err($act_fail); } } }; } #[derive(Debug, Copy, Clone, PartialEq, serde::Serialize, thiserror::Error)] #[serde(rename_all = "kebab-case")] pub enum Error { #[error("{0}")] Account(#[from] accounts::Error), #[error("{0}")] AccountOrder(#[from] orders::Error), #[error("{0}")] Product(#[from] products::Error), #[error("{0}")] Stock(#[from] stocks::Error), #[error("{0}")] OrderItem(#[from] order_items::Error), #[error("{0}")] ShoppingCart(#[from] shopping_carts::Error), #[error("{0}")] ShoppingCartItem(#[from] shopping_cart_items::Error), #[error("{0}")] Token(#[from] tokens::Error), #[error("{0}")] Photo(#[from] photos::Error), #[error("{0}")] ProductPhoto(#[from] product_photos::Error), #[error("{0}")] AccountAddress(#[from] account_addresses::Error), #[error("{0}")] OrderAddress(#[from] order_addresses::Error), #[error("Failed to start or finish transaction")] TransactionFailed, } pub type Result = std::result::Result; pub struct Database { pool: PgPool, } pub type SharedDatabase = actix::Addr; impl Clone for Database { fn clone(&self) -> Self { Self { pool: self.pool.clone(), } } } impl Database { pub async fn build(config: SharedAppConfig) -> Self { let url = config.lock().database().url(); let pool = PgPool::connect(&url).await.unwrap_or_else(|e| { log::error!("Failed to connect to database. {e:?}"); std::process::exit(1); }); Database { pool } } pub fn pool(&self) -> &PgPool { &self.pool } } impl Actor for Database { type Context = Context; } /// Multi-query load for large amount of records to read /// /// Examples /// /// ``` /// # use database_manager::photos::Error; /// async fn load(pool: sqlx::PgPool) { /// use database_manager::MultiLoad; /// let mut t = pool.begin().await.unwrap(); /// let mut multi = MultiLoad::new( /// &mut t, /// "SELECT id, name FROM products WHERE ", /// " id = " /// ); /// let products: Vec = multi.load(4, vec![1, 2, 3, 4].into_iter(), |_| Error::All.into()) /// .await.unwrap(); /// t.commit().await.unwrap(); /// } /// ``` pub struct MultiLoad<'transaction, 'transaction2, 'header, 'condition, T> { pool: &'transaction mut sqlx::Transaction<'transaction2, sqlx::Postgres>, header: &'header str, condition: &'condition str, __phantom: std::marker::PhantomData, } impl<'transaction, 'transaction2, 'header, 'condition, T> MultiLoad<'transaction, 'transaction2, 'header, 'condition, T> where T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin, { pub fn new( pool: &'transaction mut sqlx::Transaction<'transaction2, sqlx::Postgres>, header: &'header str, condition: &'condition str, ) -> Self { Self { pool, header, condition, __phantom: Default::default(), } } pub async fn load<'query, Error, Ids>( &mut self, len: usize, items: Ids, on_error: Error, ) -> std::result::Result, crate::Error> where Ids: Iterator, Error: Fn(sqlx::Error) -> crate::Error, { let mut res = Vec::new(); for ids in items.fold( Vec::>::with_capacity(len), |mut v, id| { if matches!(v.last().map(|v| v.len()), Some(20) | None) { v.push(Vec::with_capacity(20)); } v.last_mut().unwrap().push(id); v }, ) { let query: String = self.header.into(); let query = ids.iter().enumerate().fold(query, |mut q, (idx, _id)| { if idx != 0 { q.push_str(" OR"); } q.push_str(&format!(" {} ${}", self.condition, idx + 1)); q }); let q = sqlx::query_as_with( query.as_str(), ids.into_iter() .fold(sqlx::postgres::PgArguments::default(), |mut args, id| { args.add(id); args }), ); let records: Vec = match q.fetch_all(&mut *self.pool).await { Ok(rec) => rec, Err(e) => return Err(on_error(e)), }; res.extend(records); } Ok(res) } }