diff --git a/crates/account_manager/src/bin/account-client.rs b/crates/account_manager/src/bin/account-client.rs index 58ff77f..a774a54 100644 --- a/crates/account_manager/src/bin/account-client.rs +++ b/crates/account_manager/src/bin/account-client.rs @@ -1,9 +1,7 @@ -use std::net::SocketAddr; use std::time::Duration; use config::UpdateConfig; -use tarpc::tokio_serde::formats::Json; -use tarpc::{client, context}; +use tarpc::context; use tokio::time::sleep; #[derive(gumdrop::Options)] @@ -17,24 +15,23 @@ impl UpdateConfig for Flags {} #[tokio::main] async fn main() -> std::io::Result<()> { + use channels::accounts::me::Input; + let opts: Flags = gumdrop::Options::parse_args_default_or_exit(); let config = config::default_load(&opts); let client = channels::accounts::rpc::create_client(config).await; - let r = client.me(context::current(), 1.into()).await; + let r = client + .me( + context::current(), + Input { + account_id: 1.into(), + }, + ) + .await; println!("{:?}", r); - let hello = async move { - tokio::join! { - client.me(context::current(), 1.into()), - client.me(context::current(), 2.into()), - } - } - .await; - - eprintln!("{:?}", hello); - // Let the background span processor finish. sleep(Duration::from_micros(1)).await; opentelemetry::global::shutdown_tracer_provider(); diff --git a/crates/api/src/main.rs b/crates/api/src/main.rs index 803f048..343e42e 100644 --- a/crates/api/src/main.rs +++ b/crates/api/src/main.rs @@ -14,7 +14,7 @@ use email_manager::TestMail; use jemallocator::Jemalloc; use model::{Email, Encrypt, Login, PassHash, Password, Role}; use opts::{ - Command, CreateAccountCmd, CreateAccountOpts, GenerateHashOpts, MigrateOpts, Opts, ServerOpts, + Command, CreateAccountCmd, CreateAccountOpts, GenerateHashOpts, Opts, ServerOpts, TestMailerOpts, }; use validator::{validate_email, validate_length}; @@ -58,8 +58,8 @@ async fn server(opts: ServerOpts) -> Result<()> { let fs_manager = fs_manager::FsManager::build(app_config.clone()) .await .expect("Failed to initialize file system storage"); - let cart_manager = cart_manager::CartManager::new(db.clone()).start(); - let account_manager = channels::account::rpc::create_client(app_config.clone()).await; + let cart_manager = channels::carts::rpc::create_client(app_config.clone()).await; + let account_manager = channels::accounts::rpc::create_client(app_config.clone()).await; let addr = { let l = app_config.lock(); let w = l.web(); @@ -105,22 +105,6 @@ async fn server(opts: ServerOpts) -> Result<()> { .map_err(Error::Boot) } -async fn migrate(opts: MigrateOpts) -> Result<()> { - use sqlx::migrate::MigrateError; - - let config = config::default_load(&opts); - let db = database_manager::Database::build(config).await; - let res: std::result::Result<(), MigrateError> = - sqlx::migrate!("../migrations").run(db.pool()).await; - match res { - Ok(()) => Ok(()), - Err(e) => { - eprintln!("{e}"); - std::process::exit(1); - } - } -} - async fn generate_hash(_opts: GenerateHashOpts) -> Result<()> { model::print_hash(); Ok(()) @@ -241,7 +225,6 @@ async fn main() -> Result<()> { let opts: Opts = gumdrop::Options::parse_args_default_or_exit(); match opts.cmd.unwrap_or_default() { - Command::Migrate(opts) => migrate(opts).await, Command::Server(opts) => server(opts).await, Command::GenerateHash(opts) => generate_hash(opts).await, Command::CreateAccount(opts) => create_account(opts).await, diff --git a/crates/api/src/opts.rs b/crates/api/src/opts.rs index 3808466..580b7b4 100644 --- a/crates/api/src/opts.rs +++ b/crates/api/src/opts.rs @@ -34,8 +34,6 @@ impl UpdateConfig for Opts { pub enum Command { #[options(help = "Run server")] Server(ServerOpts), - #[options(help = "Migrate database")] - Migrate(MigrateOpts), #[options(help = "Generate new salt for passwords")] GenerateHash(GenerateHashOpts), #[options(help = "Create new account")] @@ -54,9 +52,6 @@ impl UpdateConfig for Command { Command::Server(opts) => { opts.update_config(config); } - Command::Migrate(opts) => { - opts.update_config(config); - } Command::GenerateHash(opts) => { opts.update_config(config); } diff --git a/crates/api/src/routes/mod.rs b/crates/api/src/routes/mod.rs index 7ae33f5..586668d 100644 --- a/crates/api/src/routes/mod.rs +++ b/crates/api/src/routes/mod.rs @@ -40,7 +40,6 @@ pub enum Error { CriticalFailure, Public(public::Error), Admin(admin::Error), - Cart(cart_manager::Error), Database(database_manager::Error), Email(email_manager::Error), Fs(fs_manager::Error), @@ -78,7 +77,6 @@ impl Display for Error { }) .unwrap_or_default(), Error::CriticalFailure => String::from("Something went wrong"), - Error::Cart(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Database(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Email(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Fs(_e) => serde_json::to_string(&self).unwrap_or_default(), @@ -100,7 +98,6 @@ impl ResponseError for Error { } Error::Admin(_) => StatusCode::BAD_REQUEST, Error::Public(_) => StatusCode::BAD_REQUEST, - Error::Cart(_) => StatusCode::BAD_REQUEST, Error::Database(_) => StatusCode::BAD_REQUEST, Error::Email(_) => StatusCode::BAD_REQUEST, Error::Fs(_) => StatusCode::BAD_REQUEST, diff --git a/crates/api/src/routes/public/api_v1/restricted.rs b/crates/api/src/routes/public/api_v1/restricted.rs index 86ef6cd..5b0603c 100644 --- a/crates/api/src/routes/public/api_v1/restricted.rs +++ b/crates/api/src/routes/public/api_v1/restricted.rs @@ -2,7 +2,6 @@ use actix::Addr; use actix_web::web::{scope, Data, Json, ServiceConfig}; use actix_web::{delete, get, post, put, HttpRequest, HttpResponse}; use actix_web_httpauth::extractors::bearer::BearerAuth; -use cart_manager::{query_cart, CartManager}; use database_manager::{query_db, Database}; use model::api; use order_manager::{query_order, OrderManager}; @@ -96,24 +95,32 @@ async fn shopping_cart( #[put("/shopping-cart-item")] async fn update_cart_item( - cart: Data>, + cart: Data, tm: Data>, credentials: BearerAuth, Json(payload): Json, ) -> Result> { + use channels::carts::modify_item::Input; let token = credentials.require_user(tm.into_inner()).await?; - let item: Option = query_cart!( - cart, - cart_manager::ModifyItem { - buyer_id: token.account_id(), - product_id: payload.product_id, - quantity: payload.quantity, - quantity_unit: payload.quantity_unit, - }, - routes::Error::Public(super::Error::ModifyItem.into()), - routes::Error::Public(PublicError::DatabaseConnection) - ); + let item = match cart + .modify_item( + tarpc::context::current(), + Input { + buyer_id: token.account_id(), + product_id: payload.product_id, + quantity: payload.quantity, + quantity_unit: payload.quantity_unit, + }, + ) + .await + { + Ok(res) => res.item, + Err(e) => { + tracing::error!("{}", e); + return Err(routes::Error::CriticalFailure); + } + }; match item { Some(item) => Ok(Json(api::UpdateItemOutput { @@ -125,40 +132,63 @@ async fn update_cart_item( #[put("/shopping-cart")] async fn update_cart( - cart: Data>, + cart: Data, tm: Data>, credentials: BearerAuth, Json(payload): Json, ) -> Result> { let token = credentials.require_user(tm.into_inner()).await?; - let items = payload - .items - .into_iter() - .map( - |model::api::UpdateItemInput { - product_id, - quantity, - quantity_unit, - }| cart_manager::ModifyItem { - buyer_id: token.account_id(), - product_id, - quantity, - quantity_unit, - }, - ) - .collect(); + let items = { + use channels::carts::modify_item::Input; + payload + .items + .into_iter() + .map( + |model::api::UpdateItemInput { + product_id, + quantity, + quantity_unit, + }| Input { + buyer_id: token.account_id(), + product_id, + quantity, + quantity_unit, + }, + ) + .collect() + }; - let res: cart_manager::ModifyCartResult = query_cart!( - cart, - cart_manager::ModifyCart { - buyer_id: token.account_id(), - items, - checkout_notes: payload.notes, - payment_method: payload.payment_method, - }, - routes::Error::Public(super::Error::ModifyItem.into()), - routes::Error::Public(PublicError::DatabaseConnection) - ); + use channels::carts::modify_cart::{Input, Output}; + let res = { + match cart + .modify_cart( + tarpc::context::current(), + Input { + buyer_id: token.account_id(), + items, + checkout_notes: payload.notes, + payment_method: payload.payment_method, + }, + ) + .await + { + Ok(Output { + cart: Some(cart), .. + }) => cart, + Ok(Output { error: Some(e), .. }) => { + tracing::error!("{}", e); + return Err(routes::Error::Public(super::Error::ModifyItem.into())); + } + Ok(out) => { + tracing::error!("invalid output {:?}", out); + return Err(routes::Error::Public(super::Error::ModifyItem.into())); + } + Err(e) => { + tracing::error!("{}", e); + return Err(routes::Error::Public(super::Error::ModifyItem.into())); + } + } + }; Ok(Json(api::UpdateCartOutput { cart_id: res.cart_id, @@ -170,46 +200,68 @@ async fn update_cart( #[delete("/shopping-cart-item")] async fn delete_cart_item( - db: Data>, - cart: Data>, + cart: Data, tm: Data>, credentials: BearerAuth, Json(payload): Json, ) -> Result { let token = credentials.require_user(tm.into_inner()).await?; - let sc: model::ShoppingCart = query_db!( - db, - database_manager::EnsureActiveShoppingCart { - buyer_id: token.account_id(), - }, - routes::Error::Public(super::Error::RemoveItem.into()), - routes::Error::Public(PublicError::DatabaseConnection) - ); - - match cart - .into_inner() - .send(cart_manager::RemoveProduct { - shopping_cart_id: sc.id, - shopping_cart_item_id: payload.shopping_cart_item_id, - }) - .await - { - Ok(Ok(_)) => Ok(HttpResponse::Ok().json(api::DeleteItemOutput { success: true })), - Ok(Err(e)) => { - tracing::error!("{e}"); - Ok(HttpResponse::BadRequest().json(api::DeleteItemOutput { success: false })) + let sc = { + use channels::carts::active_shopping_cart::{Input, Output}; + match cart + .active_shopping_cart( + tarpc::context::current(), + Input { + buyer_id: token.account_id(), + }, + ) + .await + { + Ok(Output { + cart: Some(cart), .. + }) => cart, + Ok(Output { error: Some(e), .. }) => { + tracing::error!("{}", e); + return Err(routes::Error::Public(super::Error::ModifyItem.into())); + } + Ok(out) => { + tracing::error!("invalid output {:?}", out); + return Err(routes::Error::Public(super::Error::ModifyItem.into())); + } + Err(e) => { + tracing::error!("{e}"); + return Ok( + HttpResponse::BadRequest().json(api::DeleteItemOutput { success: false }) + ); + } } - Err(e) => { - tracing::error!("{e:?}"); - Err(routes::Error::Public(PublicError::DatabaseConnection)) + }; + + { + use channels::carts::remove_product::Input; + match cart + .remove_product( + tarpc::context::current(), + Input { + shopping_cart_id: sc.id, + shopping_cart_item_id: payload.shopping_cart_item_id, + }, + ) + .await + { + Ok(_) => Ok(HttpResponse::Ok().json(api::DeleteItemOutput { success: true })), + Err(e) => { + tracing::error!("{e:?}"); + Err(routes::Error::Public(PublicError::DatabaseConnection)) + } } } } #[get("/me")] pub(crate) async fn me( - account: Data, + account: Data, tm: Data>, credentials: BearerAuth, ) -> routes::Result> { @@ -218,7 +270,13 @@ pub(crate) async fn me( .await? .account_id(); - match account.me(tarpc::context::current(), account_id).await { + match account + .me( + tarpc::context::current(), + channels::accounts::me::Input { account_id }, + ) + .await + { Ok(me) => Ok(Json((me.account.unwrap(), me.addresses.unwrap()).into())), Err(e) => { tracing::error!("{}", e); diff --git a/crates/cart_manager/src/actions.rs b/crates/cart_manager/src/actions.rs index c073e7b..9acfb33 100644 --- a/crates/cart_manager/src/actions.rs +++ b/crates/cart_manager/src/actions.rs @@ -248,3 +248,28 @@ pub async fn modify_cart( }) ) } + +pub async fn active_shopping_cart( + input: carts::active_shopping_cart::Input, + db: Database, +) -> carts::active_shopping_cart::Output { + use carts::active_shopping_cart::Output; + + tracing::debug!("{:?}", input); + + let mut t = begin_t!(db); + + let dbm = crate::db::EnsureActiveShoppingCart { + buyer_id: input.buyer_id, + }; + let cart = match dbm.run(&mut t).await { + Ok(cart) => cart, + Err(e) => { + tracing::error!("{}", e); + t.rollback().await.ok(); + return Output::error(Error::NoActiveCart); + } + }; + + end_t!(t, Output::cart(cart)) +} diff --git a/crates/cart_manager/src/rpc.rs b/crates/cart_manager/src/rpc.rs index 4688dfe..60c670f 100644 --- a/crates/cart_manager/src/rpc.rs +++ b/crates/cart_manager/src/rpc.rs @@ -33,13 +33,21 @@ impl Carts for CartsServer { crate::actions::modify_cart(input, self.db).await } - async fn remove_cart( + async fn remove_product( self, _: context::Context, input: channels::carts::remove_product::Input, ) -> channels::carts::remove_product::Output { crate::actions::remove_product(input, self.db).await } + + async fn active_shopping_cart( + self, + _: context::Context, + input: channels::carts::active_shopping_cart::Input, + ) -> channels::carts::active_shopping_cart::Output { + crate::actions::active_shopping_cart(input, self.db).await + } } pub async fn start(config: SharedAppConfig, db: Database, mqtt_client: AsyncClient) { diff --git a/crates/channels/src/accounts.rs b/crates/channels/src/accounts.rs index c392793..cabe01a 100644 --- a/crates/channels/src/accounts.rs +++ b/crates/channels/src/accounts.rs @@ -78,27 +78,6 @@ pub mod register { } } -// #[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)] -// pub enum AccountFailure { -// #[error("Failed to hash password")] -// FailedToHashPassword, -// #[error("Failed to save account")] -// SaveAccount, -// #[error("Internal server error")] -// InternalServerError, -// } -// -// impl TryFrom for AccountFailure { -// type Error = Error; -// -// fn try_from(value: bytes::Bytes) -> Result { -// bincode::deserialize(value.as_ref()).map_err(|e| { -// tracing::error!("{}", e); -// Error::InvalidAccountFailure -// }) -// } -// } - pub mod me { #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Input { diff --git a/crates/channels/src/carts.rs b/crates/channels/src/carts.rs index 7be0483..1d20cab 100644 --- a/crates/channels/src/carts.rs +++ b/crates/channels/src/carts.rs @@ -130,8 +130,41 @@ pub mod modify_cart { } } +pub mod active_shopping_cart { + use super::Error; + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input { + pub buyer_id: model::AccountId, + } + + #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] + pub struct Output { + pub error: Option, + pub cart: Option, + } + + impl Output { + pub fn error(error: Error) -> Self { + Self { + error: Some(error), + ..Default::default() + } + } + + pub fn cart(cart: model::ShoppingCart) -> Self { + Self { + cart: Some(cart), + ..Default::default() + } + } + } +} + pub mod rpc { - use super::{modify_cart, modify_item, remove_product}; + use config::SharedAppConfig; + + use super::{active_shopping_cart, modify_cart, modify_item, remove_product}; #[tarpc::service] pub trait Carts { @@ -141,7 +174,31 @@ pub mod rpc { /// Change entire shopping cart content. async fn modify_cart(input: modify_cart::Input) -> modify_cart::Output; - /// Remove entire shopping cart. - async fn remove_cart(input: remove_product::Input) -> remove_product::Output; + /// Remove product from shopping cart. + async fn remove_product(input: remove_product::Input) -> remove_product::Output; + + async fn active_shopping_cart( + input: active_shopping_cart::Input, + ) -> active_shopping_cart::Output; + } + + pub async fn create_client(config: SharedAppConfig) -> CartsClient { + use tarpc::client; + use tarpc::tokio_serde::formats::Bincode; + + let addr = { + let l = config.lock(); + (l.account_manager().bind.clone(), l.account_manager().port) + }; + + let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default); + + let client = CartsClient::new( + client::Config::default(), + transport.await.expect("Failed to connect to server"), + ) + .spawn(); + + client } } diff --git a/crates/search_manager/src/lib.rs b/crates/search_manager/src/lib.rs index 81f42c3..0da862e 100644 --- a/crates/search_manager/src/lib.rs +++ b/crates/search_manager/src/lib.rs @@ -5,7 +5,11 @@ use sonic_channel::{Dest, ObjDest, PushRequest, QueryRequest, SonicChannel}; #[macro_export] macro_rules! search_async_handler { - ($msg: ty, $async: ident, $res: ty) => { + ($msg: ty, async fn call($($argv: ident : $arg_t: ty,)*) -> Result> $body: block) => { + impl $msg { + async fn call ( $($argv : $arg_t,)* ) -> Result> $body + } + impl actix::Handler<$msg> for SearchManager { type Result = actix::ResponseActFuture>>; @@ -14,13 +18,13 @@ macro_rules! search_async_handler { match self.channels.clone() { Some(channels) => { let config = self.config.clone(); - Box::pin(async { $async(msg, channels, config).await }.into_actor(self)) + Box::pin(async { <$msg>::call(msg, channels, config).await }.into_actor(self)) } None => Box::pin(async { Ok(None) }.into_actor(self)), } } } - }; + } } #[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] @@ -91,28 +95,31 @@ pub struct Search { pub lang: String, } -search_async_handler!(Search, search, Vec); +pub type StringList = Vec; -pub(crate) async fn search( - msg: Search, - channels: Channels, - _config: SharedAppConfig, -) -> Result>> { - if let Ok(l) = channels.search.lock() { - match l.query(QueryRequest::new( - Dest::col_buc(msg.collection, msg.lang), - &msg.query, - )) { - Ok(res) => Ok(Some(res)), - Err(e) => { - tracing::error!("{e:?}"); - Err(Error::QueryFailed) +search_async_handler!( + Search, + async fn call( + msg: Search, + channels: Channels, + _config: SharedAppConfig, + ) -> Result> { + if let Ok(l) = channels.search.lock() { + match l.query(QueryRequest::new( + Dest::col_buc(msg.collection, msg.lang), + &msg.query, + )) { + Ok(res) => Ok(Some(res)), + Err(e) => { + tracing::error!("{e:?}"); + Err(Error::QueryFailed) + } } + } else { + Ok(None) } - } else { - Ok(None) } -} +); #[derive(actix::Message)] #[rtype(result = "Result>")] @@ -123,25 +130,26 @@ pub struct CreateIndex { pub lang: String, } -search_async_handler!(CreateIndex, create_index, ()); - -pub(crate) async fn create_index( - msg: CreateIndex, - channels: Channels, - _config: SharedAppConfig, -) -> Result> { - if let Ok(l) = channels.ingest.lock() { - match l.push(PushRequest::new( - ObjDest::new(Dest::col_buc(msg.collection, msg.lang), &msg.key), - &msg.value, - )) { - Ok(_) => Ok(Some(())), - Err(e) => { - tracing::error!("{e:?}"); - Err(Error::CantCreate) +search_async_handler!( + CreateIndex, + async fn call( + msg: CreateIndex, + channels: Channels, + _config: SharedAppConfig, + ) -> Result> { + if let Ok(l) = channels.ingest.lock() { + match l.push(PushRequest::new( + ObjDest::new(Dest::col_buc(msg.collection, msg.lang), &msg.key), + &msg.value, + )) { + Ok(_) => Ok(Some(())), + Err(e) => { + tracing::error!("{e:?}"); + Err(Error::CantCreate) + } } + } else { + Ok(Some(())) } - } else { - Ok(Some(())) } -} +);