bazzar/actors/database_manager/src/lib.rs

352 lines
9.9 KiB
Rust
Raw Normal View History

2022-04-18 22:07:52 +02:00
use actix::{Actor, Context};
2022-05-06 11:47:18 +02:00
use config::SharedAppConfig;
2022-04-18 22:07:52 +02:00
use sqlx::PgPool;
2022-05-12 20:33:16 +02:00
use sqlx_core::arguments::Arguments;
2022-05-06 16:02:38 +02:00
pub use crate::account_addresses::*;
2022-05-06 16:02:38 +02:00
pub use crate::accounts::*;
pub use crate::order_addresses::*;
2022-05-06 16:02:38 +02:00
pub use crate::order_items::*;
pub use crate::orders::*;
2022-05-06 16:02:38 +02:00
pub use crate::photos::*;
pub use crate::product_photos::*;
pub use crate::products::*;
pub use crate::shopping_cart_items::*;
pub use crate::shopping_carts::*;
pub use crate::stocks::*;
pub use crate::tokens::*;
2022-04-14 21:40:26 +02:00
pub mod account_addresses;
2022-04-16 18:57:37 +02:00
pub mod accounts;
pub mod order_addresses;
2022-04-16 18:57:37 +02:00
pub mod order_items;
pub mod orders;
2022-05-06 16:02:38 +02:00
pub mod photos;
pub mod product_photos;
2022-04-16 18:57:37 +02:00
pub mod products;
pub mod shopping_cart_items;
pub mod shopping_carts;
pub mod stocks;
2022-04-18 22:07:52 +02:00
pub mod tokens;
2022-04-14 21:40:26 +02:00
2022-04-20 14:30:59 +02:00
/// Implements `actix::Handler<$msg>` for `$crate::Database` on top of an async function.
///
/// Two forms are accepted:
///
/// * `db_async_handler!(Msg, func, Res)` - `func(msg, pool)` is called with the
///   message and a clone of the actor's `PgPool`; it must resolve to
///   `Result<Res>`.
/// * `db_async_handler!(Msg, func, Res, inner)` - `func(msg, &mut transaction)`
///   works on an open transaction. The macro additionally defines
///   `async fn inner(msg, pool)`, which begins a transaction, runs `func`,
///   commits on `Ok` and rolls back on `Err`. The actor handler delegates to
///   `inner`, and `$crate::Queue<Msg>` is implemented for `$crate::Transaction`
///   so the same `func` can run inside a caller-managed transaction.
///
/// A failure to begin or commit the transaction is logged and reported as
/// `$crate::Error::TransactionFailed`.
///
/// The unqualified `Result` in the expansion is resolved at the invocation
/// site, and the expansion reads the private `pool` / `t` fields.
#[macro_export]
macro_rules! db_async_handler {
    ($msg: ty, $async: ident, $res: ty) => {
        impl actix::Handler<$msg> for $crate::Database {
            type Result = actix::ResponseActFuture<Self, Result<$res>>;

            fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result {
                use actix::WrapFuture;
                let pool = self.pool.clone();
                Box::pin(async { $async(msg, pool).await }.into_actor(self))
            }
        }
    };
    ($msg: ty, $async: ident, $res: ty, $inner_async: ident) => {
        async fn $inner_async(msg: $msg, pool: sqlx::PgPool) -> Result<$res> {
            let mut t = pool.begin().await.map_err(|e| {
                tracing::error!("{:?}", e);
                $crate::Error::TransactionFailed
            })?;
            match $async(msg, &mut t).await {
                Ok(res) => {
                    t.commit().await.map_err(|e| {
                        tracing::error!("{:?}", e);
                        $crate::Error::TransactionFailed
                    })?;
                    Ok(res)
                }
                Err(e) => {
                    // Rollback stays best effort: the caller always receives the
                    // query error, but a failed rollback is no longer silent.
                    if let Err(rollback_err) = t.rollback().await {
                        tracing::error!("{:?}", rollback_err);
                    }
                    Err(e)
                }
            }
        }

        impl actix::Handler<$msg> for $crate::Database {
            type Result = actix::ResponseActFuture<Self, Result<$res>>;

            fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result {
                use actix::WrapFuture;
                let pool = self.pool.clone();
                Box::pin(async { $inner_async(msg, pool).await }.into_actor(self))
            }
        }

        #[async_trait::async_trait]
        impl $crate::Queue<$msg> for $crate::Transaction {
            type Result = $res;

            async fn handle(&mut self, msg: $msg) -> Result<Self::Result> {
                let t = &mut self.t;
                $async(msg, t).await
            }
        }
    };
}
2022-05-04 07:26:29 +02:00
/// Sends `$msg` to the actor address `$db`, awaits the answer and unwraps it.
///
/// `$db.send($msg).await` yields a nested result: the outer layer is the
/// delivery of the message, the inner layer is the handler's own result.
/// Both failures are logged with `tracing::error!` (the handler error with
/// `Display`, the delivery error with `Debug`).
///
/// Forms:
///
/// * `query_db!(db, msg, default fallback)` - evaluates to the handler value,
///   or to `fallback` on either failure; never returns from the caller.
/// * `query_db!(db, msg, fail)` - shorthand for `query_db!(db, msg, fail, fail)`.
/// * `query_db!(db, msg, db_fail, act_fail)` - on a handler error executes
///   `return Err(db_fail)`, on a delivery error `return Err(act_fail)`.
/// * `query_db!(db, msg, passthrough db_fail, act_fail)` - like the previous
///   form, but the handler error is forwarded: `return Err(db_fail(e.into()))`.
#[macro_export]
macro_rules! query_db {
    ($db: expr, $msg: expr, default $fail: expr) => {
        match $db.send($msg).await {
            Err(e) => {
                tracing::error!("{e:?}");
                $fail
            }
            Ok(Err(e)) => {
                tracing::error!("{e}");
                $fail
            }
            Ok(Ok(found)) => found,
        }
    };
    ($db: expr, $msg: expr, $fail: expr) => {
        $crate::query_db!($db, $msg, $fail, $fail)
    };
    ($db: expr, $msg: expr, $db_fail: expr, $act_fail: expr) => {
        match $db.send($msg).await {
            Err(e) => {
                tracing::error!("{e:?}");
                return Err($act_fail);
            }
            Ok(Err(e)) => {
                tracing::error!("{e}");
                return Err($db_fail);
            }
            Ok(Ok(found)) => found,
        }
    };
    ($db: expr, $msg: expr, passthrough $db_fail: expr, $act_fail: expr) => {
        match $db.send($msg).await {
            Err(e) => {
                tracing::error!("{e:?}");
                return Err($act_fail);
            }
            Ok(Err(e)) => {
                tracing::error!("{e}");
                return Err($db_fail(e.into()));
            }
            Ok(Ok(found)) => found,
        }
    };
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, thiserror::Error)]
2022-05-22 14:19:11 +02:00
#[serde(rename_all = "kebab-case")]
2022-04-14 21:40:26 +02:00
pub enum Error {
2022-04-18 22:07:52 +02:00
#[error("{0}")]
Account(#[from] accounts::Error),
2022-04-14 21:40:26 +02:00
#[error("{0}")]
AccountOrder(#[from] orders::Error),
2022-04-14 21:40:26 +02:00
#[error("{0}")]
2022-04-18 22:07:52 +02:00
Product(#[from] products::Error),
#[error("{0}")]
2022-04-18 22:07:52 +02:00
Stock(#[from] stocks::Error),
#[error("{0}")]
2022-04-18 22:07:52 +02:00
OrderItem(#[from] order_items::Error),
#[error("{0}")]
2022-04-18 22:07:52 +02:00
ShoppingCart(#[from] shopping_carts::Error),
2022-04-16 09:30:11 +02:00
#[error("{0}")]
2022-04-18 22:07:52 +02:00
ShoppingCartItem(#[from] shopping_cart_items::Error),
2022-04-16 12:48:38 +02:00
#[error("{0}")]
2022-04-18 22:07:52 +02:00
Token(#[from] tokens::Error),
2022-05-06 16:02:38 +02:00
#[error("{0}")]
Photo(#[from] photos::Error),
#[error("{0}")]
ProductPhoto(#[from] product_photos::Error),
2022-05-19 16:13:27 +02:00
#[error("{0}")]
AccountAddress(#[from] account_addresses::Error),
#[error("{0}")]
OrderAddress(#[from] order_addresses::Error),
2022-05-22 14:19:11 +02:00
#[error("Failed to start or finish transaction")]
TransactionFailed,
2022-04-14 21:40:26 +02:00
}
pub type Result<T> = std::result::Result<T, Error>;
2022-11-03 07:02:33 +01:00
#[derive(Clone)]
2022-04-14 21:40:26 +02:00
pub struct Database {
pool: PgPool,
config: SharedAppConfig,
2022-04-14 21:40:26 +02:00
}
2022-04-20 14:30:59 +02:00
pub type SharedDatabase = actix::Addr<Database>;
2022-04-14 21:40:26 +02:00
impl Database {
2022-05-22 14:19:11 +02:00
pub async fn build(config: SharedAppConfig) -> Self {
let url = config.lock().database().url();
2022-05-22 14:19:11 +02:00
let pool = PgPool::connect(&url).await.unwrap_or_else(|e| {
tracing::error!("Failed to connect to database. {e:?}");
2022-05-22 14:19:11 +02:00
std::process::exit(1);
});
Self { pool, config }
2022-04-14 21:40:26 +02:00
}
pub fn pool(&self) -> &PgPool {
&self.pool
}
pub async fn begin(&self) -> sqlx::Result<Transaction> {
Ok(Transaction {
t: self.pool.begin().await?,
})
}
2022-04-14 21:40:26 +02:00
}
/// Makes [`Database`] an actix actor so it can receive the messages for which
/// `Handler` implementations exist (see `db_async_handler!`).
impl Actor for Database {
    // Plain actix `Context`; no custom execution context is used.
    type Context = Context<Self>;
}
2022-05-12 20:33:16 +02:00
/// Caller-managed database transaction, created by `Database::begin` and
/// finished by consuming it with `commit` or `rollback`.
pub struct Transaction {
    // Underlying sqlx Postgres transaction.
    t: sqlx::Transaction<'static, sqlx::Postgres>,
}
impl Transaction {
    /// Commits the transaction, consuming it.
    ///
    /// # Errors
    ///
    /// Logs the underlying `sqlx` error and returns [`Error::TransactionFailed`]
    /// when the commit fails.
    pub async fn commit(self) -> Result<()> {
        self.t.commit().await.map_err(|e| {
            tracing::error!("{e:?}");
            Error::TransactionFailed
        })
    }

    /// Rolls the transaction back, consuming it.
    ///
    /// # Errors
    ///
    /// Logs the underlying `sqlx` error and returns [`Error::TransactionFailed`]
    /// when the rollback fails.
    pub async fn rollback(self) -> Result<()> {
        self.t.rollback().await.map_err(|e| {
            tracing::error!("{e:?}");
            Error::TransactionFailed
        })
    }
}
/// Handles message `Msg` on the implementor instead of going through the actor.
///
/// The four-argument form of `db_async_handler!` implements this trait for
/// [`Transaction`], so a message can be executed inside a caller-managed
/// transaction.
#[async_trait::async_trait]
pub trait Queue<Msg> {
    /// Value produced on success.
    type Result;
    /// Processes `msg`, returning the crate-wide error on failure.
    async fn handle(&mut self, msg: Msg) -> Result<Self::Result>;
}
2022-05-12 20:33:16 +02:00
/// Multi-query load for large amount of records to read
///
/// Examples
///
/// ```
/// # use database_manager::photos::Error;
/// async fn load(pool: sqlx::PgPool) {
2022-05-12 20:33:16 +02:00
/// use database_manager::MultiLoad;
/// let mut t = pool.begin().await.unwrap();
2022-05-12 20:33:16 +02:00
/// let mut multi = MultiLoad::new(
/// &mut t,
/// "SELECT id, name FROM products WHERE ",
/// " id = "
/// )
/// // order by id
/// .with_sorting("id ASC")
/// // 100 rows per db query
/// .with_size(100);
/// let products: Vec<model::Product> = multi.load(4, vec![1, 2, 3, 4].into_iter(), |_| Error::All.into())
/// .await.unwrap();
2022-05-12 20:33:16 +02:00
/// t.commit().await.unwrap();
/// }
/// ```
pub struct MultiLoad<'transaction, 'transaction2, 'header, 'condition, T> {
pool: &'transaction mut sqlx::Transaction<'transaction2, sqlx::Postgres>,
header: &'header str,
condition: &'condition str,
2022-06-06 15:09:13 +02:00
sort: Option<String>,
size: usize,
2022-05-12 20:33:16 +02:00
__phantom: std::marker::PhantomData<T>,
}
impl<'transaction, 'transaction2, 'header, 'condition, T>
MultiLoad<'transaction, 'transaction2, 'header, 'condition, T>
where
T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
{
pub fn new(
pool: &'transaction mut sqlx::Transaction<'transaction2, sqlx::Postgres>,
header: &'header str,
condition: &'condition str,
) -> Self {
Self {
pool,
header,
condition,
2022-06-06 15:09:13 +02:00
sort: None,
size: 20,
2022-05-12 20:33:16 +02:00
__phantom: Default::default(),
}
}
2022-06-06 15:09:13 +02:00
pub fn with_sorting<S: Into<String>>(mut self, order: S) -> Self {
self.sort = Some(order.into());
self
}
pub fn with_size(mut self, size: usize) -> Self {
self.size = size;
self
}
2022-05-12 20:33:16 +02:00
pub async fn load<'query, Error, Ids>(
&mut self,
len: usize,
items: Ids,
on_error: Error,
) -> std::result::Result<Vec<T>, crate::Error>
where
Ids: Iterator<Item = model::RecordId>,
Error: Fn(sqlx::Error) -> crate::Error,
{
let mut res = Vec::new();
let size = self.size;
2022-05-12 20:33:16 +02:00
for ids in items.fold(
Vec::<Vec<model::RecordId>>::with_capacity(len),
|mut v, id| {
let last_len = v.last().map(|v| v.len());
if last_len == Some(size) || last_len == None {
v.push(Vec::with_capacity(size));
2022-05-12 20:33:16 +02:00
}
v.last_mut().unwrap().push(id);
v
},
) {
let query: String = self.header.into();
2022-06-06 15:09:13 +02:00
let mut query = ids.iter().enumerate().fold(query, |mut q, (idx, _id)| {
2022-05-12 20:33:16 +02:00
if idx != 0 {
q.push_str(" OR");
}
q.push_str(&format!(" {} ${}", self.condition, idx + 1));
q
});
2022-06-06 15:09:13 +02:00
if let Some(s) = self.sort.as_deref() {
query.push_str("\nORDER BY ");
query.push_str(s);
query.push(' ');
2022-06-06 15:09:13 +02:00
}
2022-05-12 20:33:16 +02:00
let q = sqlx::query_as_with(
query.as_str(),
ids.into_iter()
.fold(sqlx::postgres::PgArguments::default(), |mut args, id| {
args.add(id);
args
}),
);
let records: Vec<T> = match q.fetch_all(&mut *self.pool).await {
Ok(rec) => rec,
Err(e) => return Err(on_error(e)),
};
res.extend(records);
}
Ok(res)
}
}