298 lines
8.5 KiB
Rust
298 lines
8.5 KiB
Rust
use actix::{Actor, Context};
|
|
use config::SharedAppConfig;
|
|
use sqlx::PgPool;
|
|
use sqlx_core::arguments::Arguments;
|
|
|
|
pub use crate::account_addresses::*;
|
|
pub use crate::accounts::*;
|
|
pub use crate::order_addresses::*;
|
|
pub use crate::order_items::*;
|
|
pub use crate::orders::*;
|
|
pub use crate::photos::*;
|
|
pub use crate::product_photos::*;
|
|
pub use crate::products::*;
|
|
pub use crate::shopping_cart_items::*;
|
|
pub use crate::shopping_carts::*;
|
|
pub use crate::stocks::*;
|
|
pub use crate::tokens::*;
|
|
|
|
pub mod account_addresses;
|
|
pub mod accounts;
|
|
pub mod order_addresses;
|
|
pub mod order_items;
|
|
pub mod orders;
|
|
pub mod photos;
|
|
pub mod product_photos;
|
|
pub mod products;
|
|
pub mod shopping_cart_items;
|
|
pub mod shopping_carts;
|
|
pub mod stocks;
|
|
pub mod tokens;
|
|
|
|
#[macro_export]
|
|
macro_rules! db_async_handler {
|
|
($msg: ty, $async: ident, $res: ty) => {
|
|
impl actix::Handler<$msg> for crate::Database {
|
|
type Result = actix::ResponseActFuture<Self, Result<$res>>;
|
|
|
|
fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result {
|
|
use actix::WrapFuture;
|
|
let pool = self.pool.clone();
|
|
Box::pin(async { $async(msg, pool).await }.into_actor(self))
|
|
}
|
|
}
|
|
};
|
|
($msg: ty, $async: ident, $res: ty, $inner_async: ident) => {
|
|
async fn $inner_async(msg: $msg, pool: sqlx::PgPool) -> Result<$res> {
|
|
let mut t = pool.begin().await.map_err(|e| {
|
|
tracing::error!("{:?}", e);
|
|
$crate::Error::TransactionFailed
|
|
})?;
|
|
match $async(msg, &mut t).await {
|
|
Ok(res) => {
|
|
t.commit().await.map_err(|e| {
|
|
tracing::error!("{:?}", e);
|
|
$crate::Error::TransactionFailed
|
|
})?;
|
|
Ok(res)
|
|
}
|
|
Err(e) => {
|
|
let _ = t.rollback().await;
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
|
|
impl actix::Handler<$msg> for crate::Database {
|
|
type Result = actix::ResponseActFuture<Self, Result<$res>>;
|
|
|
|
fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result {
|
|
use actix::WrapFuture;
|
|
let pool = self.pool.clone();
|
|
Box::pin(async { $inner_async(msg, pool).await }.into_actor(self))
|
|
}
|
|
}
|
|
};
|
|
}
|
|
|
|
#[macro_export]
|
|
macro_rules! query_db {
|
|
($db: expr, $msg: expr, default $fail: expr) => {
|
|
match $db.send($msg).await {
|
|
Ok(Ok(r)) => r,
|
|
Ok(Err(e)) => {
|
|
tracing::error!("{e}");
|
|
$fail
|
|
}
|
|
Err(e) => {
|
|
tracing::error!("{e:?}");
|
|
$fail
|
|
}
|
|
}
|
|
};
|
|
|
|
($db: expr, $msg: expr, $fail: expr) => {
|
|
$crate::query_db!($db, $msg, $fail, $fail)
|
|
};
|
|
|
|
($db: expr, $msg: expr, $db_fail: expr, $act_fail: expr) => {
|
|
match $db.send($msg).await {
|
|
Ok(Ok(r)) => r,
|
|
Ok(Err(e)) => {
|
|
tracing::error!("{e}");
|
|
return Err($db_fail);
|
|
}
|
|
Err(e) => {
|
|
tracing::error!("{e:?}");
|
|
return Err($act_fail);
|
|
}
|
|
}
|
|
};
|
|
|
|
($db: expr, $msg: expr, passthrough $db_fail: expr, $act_fail: expr) => {
|
|
match $db.send($msg).await {
|
|
Ok(Ok(r)) => r,
|
|
Ok(Err(e)) => {
|
|
tracing::error!("{e}");
|
|
return Err($db_fail(e.into()));
|
|
}
|
|
Err(e) => {
|
|
tracing::error!("{e:?}");
|
|
return Err($act_fail);
|
|
}
|
|
}
|
|
};
|
|
}
|
|
|
|
#[derive(Debug, Copy, Clone, PartialEq, serde::Serialize, thiserror::Error)]
|
|
#[serde(rename_all = "kebab-case")]
|
|
pub enum Error {
|
|
#[error("{0}")]
|
|
Account(#[from] accounts::Error),
|
|
#[error("{0}")]
|
|
AccountOrder(#[from] orders::Error),
|
|
#[error("{0}")]
|
|
Product(#[from] products::Error),
|
|
#[error("{0}")]
|
|
Stock(#[from] stocks::Error),
|
|
#[error("{0}")]
|
|
OrderItem(#[from] order_items::Error),
|
|
#[error("{0}")]
|
|
ShoppingCart(#[from] shopping_carts::Error),
|
|
#[error("{0}")]
|
|
ShoppingCartItem(#[from] shopping_cart_items::Error),
|
|
#[error("{0}")]
|
|
Token(#[from] tokens::Error),
|
|
#[error("{0}")]
|
|
Photo(#[from] photos::Error),
|
|
#[error("{0}")]
|
|
ProductPhoto(#[from] product_photos::Error),
|
|
#[error("{0}")]
|
|
AccountAddress(#[from] account_addresses::Error),
|
|
#[error("{0}")]
|
|
OrderAddress(#[from] order_addresses::Error),
|
|
#[error("Failed to start or finish transaction")]
|
|
TransactionFailed,
|
|
}
|
|
|
|
pub type Result<T> = std::result::Result<T, Error>;
|
|
|
|
pub struct Database {
|
|
pool: PgPool,
|
|
}
|
|
|
|
pub type SharedDatabase = actix::Addr<Database>;
|
|
|
|
impl Clone for Database {
|
|
fn clone(&self) -> Self {
|
|
Self {
|
|
pool: self.pool.clone(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Database {
|
|
pub async fn build(config: SharedAppConfig) -> Self {
|
|
let url = config.lock().database().url();
|
|
let pool = PgPool::connect(&url).await.unwrap_or_else(|e| {
|
|
tracing::error!("Failed to connect to database. {e:?}");
|
|
std::process::exit(1);
|
|
});
|
|
Database { pool }
|
|
}
|
|
|
|
pub fn pool(&self) -> &PgPool {
|
|
&self.pool
|
|
}
|
|
}
|
|
|
|
impl Actor for Database {
|
|
type Context = Context<Self>;
|
|
}
|
|
|
|
/// Multi-query load for large amount of records to read
|
|
///
|
|
/// Examples
|
|
///
|
|
/// ```
|
|
/// # use database_manager::photos::Error;
|
|
/// async fn load(pool: sqlx::PgPool) {
|
|
/// use database_manager::MultiLoad;
|
|
/// let mut t = pool.begin().await.unwrap();
|
|
/// let mut multi = MultiLoad::new(
|
|
/// &mut t,
|
|
/// "SELECT id, name FROM products WHERE ",
|
|
/// " id = "
|
|
/// );
|
|
/// let products: Vec<model::Product> = multi.load(4, vec![1, 2, 3, 4].into_iter(), |_| Error::All.into())
|
|
/// .await.unwrap();
|
|
/// t.commit().await.unwrap();
|
|
/// }
|
|
/// ```
|
|
pub struct MultiLoad<'transaction, 'transaction2, 'header, 'condition, T> {
|
|
pool: &'transaction mut sqlx::Transaction<'transaction2, sqlx::Postgres>,
|
|
header: &'header str,
|
|
condition: &'condition str,
|
|
sort: Option<String>,
|
|
__phantom: std::marker::PhantomData<T>,
|
|
}
|
|
|
|
impl<'transaction, 'transaction2, 'header, 'condition, T>
|
|
MultiLoad<'transaction, 'transaction2, 'header, 'condition, T>
|
|
where
|
|
T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
|
|
{
|
|
pub fn new(
|
|
pool: &'transaction mut sqlx::Transaction<'transaction2, sqlx::Postgres>,
|
|
header: &'header str,
|
|
condition: &'condition str,
|
|
) -> Self {
|
|
Self {
|
|
pool,
|
|
header,
|
|
condition,
|
|
sort: None,
|
|
__phantom: Default::default(),
|
|
}
|
|
}
|
|
|
|
pub fn with_sorting<S: Into<String>>(mut self, order: S) -> Self {
|
|
self.sort = Some(order.into());
|
|
self
|
|
}
|
|
|
|
pub async fn load<'query, Error, Ids>(
|
|
&mut self,
|
|
len: usize,
|
|
items: Ids,
|
|
on_error: Error,
|
|
) -> std::result::Result<Vec<T>, crate::Error>
|
|
where
|
|
Ids: Iterator<Item = model::RecordId>,
|
|
Error: Fn(sqlx::Error) -> crate::Error,
|
|
{
|
|
let mut res = Vec::new();
|
|
|
|
for ids in items.fold(
|
|
Vec::<Vec<model::RecordId>>::with_capacity(len),
|
|
|mut v, id| {
|
|
if matches!(v.last().map(|v| v.len()), Some(20) | None) {
|
|
v.push(Vec::with_capacity(20));
|
|
}
|
|
v.last_mut().unwrap().push(id);
|
|
v
|
|
},
|
|
) {
|
|
let query: String = self.header.into();
|
|
let mut query = ids.iter().enumerate().fold(query, |mut q, (idx, _id)| {
|
|
if idx != 0 {
|
|
q.push_str(" OR");
|
|
}
|
|
q.push_str(&format!(" {} ${}", self.condition, idx + 1));
|
|
q
|
|
});
|
|
if let Some(s) = self.sort.as_deref() {
|
|
query.push_str("\nORDER BY ");
|
|
query.push_str(s);
|
|
query.push_str(" ");
|
|
}
|
|
let q = sqlx::query_as_with(
|
|
query.as_str(),
|
|
ids.into_iter()
|
|
.fold(sqlx::postgres::PgArguments::default(), |mut args, id| {
|
|
args.add(id);
|
|
args
|
|
}),
|
|
);
|
|
|
|
let records: Vec<T> = match q.fetch_all(&mut *self.pool).await {
|
|
Ok(rec) => rec,
|
|
Err(e) => return Err(on_error(e)),
|
|
};
|
|
res.extend(records);
|
|
}
|
|
|
|
Ok(res)
|
|
}
|
|
}
|