diff --git a/crates/channels/src/payments.rs b/crates/channels/src/payments.rs index 277d7f0..54226f7 100644 --- a/crates/channels/src/payments.rs +++ b/crates/channels/src/payments.rs @@ -9,6 +9,12 @@ pub static CLIENT_NAME: &str = "payments"; pub enum Error { #[error("Something went wrong")] InternalServerError, + #[error("Unknown payment adapter {0:?}")] + UnknownAdapter(String), + #[error("Payment failed")] + PaymentFailed, + #[error("Payment adapter returned invalid response data")] + MalformedCreatePaymentResult, } #[derive(Debug, Copy, Clone)] @@ -147,7 +153,7 @@ pub mod rpc { use tarpc::tokio_serde::formats::Bincode; let l = config.lock(); - let addr = l.orders_manager().rpc_addr(); + let addr = l.payment().rpc_addr(); let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default); @@ -169,6 +175,6 @@ pub mod mqtt { use crate::AsyncClient; pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) { - crate::mqtt::create_client(CLIENT_NAME, config.lock().payments_manager().mqtt_addr()) + crate::mqtt::create_client(CLIENT_NAME, config.lock().payment().mqtt_addr()) } } diff --git a/crates/payment_manager/src/main.rs b/crates/payment_manager/src/main.rs index 6b99a35..ad56688 100644 --- a/crates/payment_manager/src/main.rs +++ b/crates/payment_manager/src/main.rs @@ -2,7 +2,8 @@ use std::collections::HashMap; use std::fs::read_dir; use std::path::PathBuf; use std::str::FromStr; -use std::sync::{Arc, RwLock}; +use std::sync::mpsc::Sender; +use std::sync::{Arc, LockResult, MutexGuard, RwLock, RwLockReadGuard}; use config::{AppConfig, UpdateConfig}; use payment_adapter::{HttpMethod, HttpRequest, Item, Product}; @@ -18,7 +19,79 @@ mod mqtt; mod rpc; // pub mod t_pay_adapter; -pub type Modules = Arc>>; +#[derive(Clone)] +pub enum WasmMsg { + CreateOrder { + create_payment: payment_adapter::CreatePayment, + callback: std::sync::mpsc::Sender, + }, + UpdateStatus(Vec), +} + +#[derive(Clone)] +pub enum ResultMsg { + OrderCreated(payment_adapter::OrderCreated), +} + +pub struct HostChannel { + pub host: HostPool, + pub channel: std::sync::mpsc::Receiver, +} + +impl HostChannel { + pub fn start(self) { + tokio::spawn(async move { + let HostChannel { + host: pool, + channel, + } = self; + + loop { + let msg = match channel.recv() { + Ok(msg) => msg, + _ => continue, + }; + match msg { + WasmMsg::CreateOrder { + create_payment: msg, + callback, + } => { + let msg = match wapc_codec::messagepack::serialize(msg) { + Ok(r) => r, + _ => continue, + }; + let call_result = match pool.call("create_payment", msg).await { + Ok(r) => r, + _ => continue, + }; + let result: payment_adapter::OrderCreated = + match wapc_codec::messagepack::deserialize(&call_result) { + Ok(r) => r, + _ => continue, + }; + if let Err(e) = callback.send(ResultMsg::OrderCreated(result)) { + tracing::error!("{e:?}"); + } + } + WasmMsg::UpdateStatus(_) => {} + } + } + }); + } +} + +#[derive(Clone)] +pub struct Modules(Arc>>>); + +impl Modules { + pub fn new(h: HashMap>) -> Self { + Self(Arc::new(std::sync::Mutex::new(h))) + } + + pub fn lock(&self) -> LockResult>>> { + self.0.lock() + } +} #[derive(gumdrop::Options)] pub struct Opts { @@ -107,54 +180,63 @@ async fn main() { .await .unwrap(); - modules.insert(name, pool); + let (tx, rx) = std::sync::mpsc::channel(); + HostChannel { + host: pool, + channel: rx, + } + .start(); + + modules.insert(name, tx); } - for pool in modules.values() { - let msg = payment_adapter::CreatePayment { - buyer: payment_adapter::Buyer { - email: "hello@example.com".to_string(), - phone: "530698478".to_string(), - first_name: "Joe".to_string(), - last_name: "Doe".to_string(), - language: "pl".to_string(), - }, - customer_ip: "12.22.34.54".to_string(), - currency: "PLN".to_string(), - description: "Nesciunt fugit libero quis dolorum quo. Tempore aut nisi voluptatem. Odio et aspernatur est. Sint vel molestias sunt cumque quibusdam reprehenderit est.".to_string(), - cart_products: vec![Product { - id: 23, - name: "Socks".to_string(), - unit_price: 1542, - quantity_unit: "Unit".to_string(), - quantity: 2, - }], - items: vec![Item { - product_id: 23, - quantity: 2, - quantity_unit: "Unit".to_string(), - }], - order_ext_id: None, - notify_uri: "https://localhost:3030/notify_uri".to_string(), - continue_uri: "https://localhost:3030/continue_uri".to_string(), - }; - - tracing::info!("Start create_payment"); - let call_result = pool - .call( - "create_payment", - wapc_codec::messagepack::serialize(msg).unwrap(), - ) - .await - .unwrap(); - let result: payment_adapter::OrderCreated = - wapc_codec::messagepack::deserialize(&call_result).unwrap(); - - tracing::info!("create payment res {:?}", result) - } + // for pool in modules.values() { + // let msg = payment_adapter::CreatePayment { + // buyer: payment_adapter::Buyer { + // email: "hello@example.com".to_string(), + // phone: "530698478".to_string(), + // first_name: "Joe".to_string(), + // last_name: "Doe".to_string(), + // language: "pl".to_string(), + // }, + // customer_ip: "12.22.34.54".to_string(), + // currency: "PLN".to_string(), + // description: "Nesciunt fugit libero quis dolorum quo. + // Tempore aut nisi voluptatem. Odio et aspernatur est. Sint vel + // molestias sunt cumque quibusdam reprehenderit est.".to_string(), + // cart_products: vec![Product { + // id: 23, + // name: "Socks".to_string(), + // unit_price: 1542, + // quantity_unit: "Unit".to_string(), + // quantity: 2, + // }], + // items: vec![Item { + // product_id: 23, + // quantity: 2, + // quantity_unit: "Unit".to_string(), + // }], + // order_ext_id: None, + // notify_uri: "https://localhost:3030/notify_uri".to_string(), + // continue_uri: "https://localhost:3030/continue_uri".to_string(), + // }; + // + // tracing::info!("Start create_payment"); + // let call_result = pool + // .call( + // "create_payment", + // wapc_codec::messagepack::serialize(msg).unwrap(), + // ) + // .await + // .unwrap(); + // let result: payment_adapter::OrderCreated = + // wapc_codec::messagepack::deserialize(&call_result).unwrap(); + // + // tracing::info!("create payment res {:?}", result) + // } } - let modules = Arc::new(RwLock::new(modules)); + let modules = Modules::new(modules); let mqtt_client = mqtt::start(config.clone(), modules.clone()).await; rpc::start(config, mqtt_client, modules.clone()).await; } diff --git a/crates/payment_manager/src/mqtt.rs b/crates/payment_manager/src/mqtt.rs index 3be77fd..90de357 100644 --- a/crates/payment_manager/src/mqtt.rs +++ b/crates/payment_manager/src/mqtt.rs @@ -1,7 +1,6 @@ use config::SharedAppConfig; use rumqttc::{Event, Incoming}; -use crate::db::Database; use crate::Modules; pub async fn start(config: SharedAppConfig, _modules: Modules) -> channels::AsyncClient { diff --git a/crates/payment_manager/src/rpc.rs b/crates/payment_manager/src/rpc.rs index 2439d4c..1ed0b36 100644 --- a/crates/payment_manager/src/rpc.rs +++ b/crates/payment_manager/src/rpc.rs @@ -1,12 +1,14 @@ +use std::sync::mpsc::RecvError; + use channels::payments::rpc::Payments; -use channels::payments::start_payment::{Input, Output}; use channels::payments::{adapters, cancel, notification, start_payment}; use channels::AsyncClient; use config::SharedAppConfig; use tarpc::context; -use crate::Modules; +use crate::{Modules, ResultMsg, WasmMsg}; +#[derive(Clone)] pub struct PaymentsServer { pub config: SharedAppConfig, pub mqtt_client: AsyncClient, @@ -15,8 +17,50 @@ pub struct PaymentsServer { #[tarpc::server] impl Payments for PaymentsServer { - async fn start_payment(self, _: context::Context, input: Input) -> Output { - todo!() + async fn start_payment( + self, + _: context::Context, + input: start_payment::Input, + ) -> start_payment::Output { + use channels::payments::Error; + let start_payment::Input { + adapter_name, + create_payment, + } = input; + + // let (tx, rx) = std::sync::mpsc::channel(); + + let modules = self.modules.clone(); + let lock = modules.lock().map_err(|e| { + tracing::error!("{e}"); + Error::InternalServerError + })?; + let module = lock + .get(&adapter_name) + .ok_or_else(|| Error::UnknownAdapter(adapter_name))? + .clone(); + + let (tx, rx) = std::sync::mpsc::channel(); + module + .send(WasmMsg::CreateOrder { + create_payment, + callback: tx, + }) + .map_err(|e| { + tracing::error!("{e}"); + Error::PaymentFailed + })?; + let res = rx.recv().map_err(|e| { + tracing::error!("{e}"); + Error::MalformedCreatePaymentResult + })?; + + match res { + ResultMsg::OrderCreated(res) => Ok(start_payment::Details { + redirect_url: res.redirect_uri.unwrap_or_default(), + }), + _ => Err(Error::MalformedCreatePaymentResult), + } } async fn cancel(self, _: context::Context, input: cancel::Input) -> cancel::Output { @@ -39,11 +83,11 @@ impl Payments for PaymentsServer { pub async fn start(config: SharedAppConfig, mqtt_client: AsyncClient, modules: Modules) { let port = { config.lock().stocks_manager().rpc_port }; - channels::rpc::start("orders", port, || { + channels::rpc::start("payment", port, || { PaymentsServer { config: config.clone(), mqtt_client: mqtt_client.clone(), - modules, + modules: modules.clone(), } .serve() })