diff --git a/Cargo.lock b/Cargo.lock index 692d626..0d97b49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -729,6 +729,7 @@ dependencies = [ "config", "futures 0.3.25", "model", + "payment_adapter", "rumqttc", "serde", "strum", diff --git a/crates/channels/Cargo.toml b/crates/channels/Cargo.toml index be7c2ef..d92c2b7 100644 --- a/crates/channels/Cargo.toml +++ b/crates/channels/Cargo.toml @@ -10,7 +10,7 @@ emails = [] search = [] stocks = [] orders = [] -payments = [] +payments = ['payment_adapter'] default = ['accounts', 'carts', 'emails', 'search', 'stocks', 'orders', 'payments'] [dependencies] @@ -27,3 +27,4 @@ thiserror = { version = "1.0.37" } tokio = { version = "1.21.2", features = ['full'] } tracing = { version = "0.1.37" } whatlang = { version = "0.16.2" } +payment_adapter = { path = "../payment_adapter", optional = true } diff --git a/crates/channels/src/payments.rs b/crates/channels/src/payments.rs index 32a8e79..46384d6 100644 --- a/crates/channels/src/payments.rs +++ b/crates/channels/src/payments.rs @@ -1,5 +1,6 @@ // use rumqttc::QoS; +pub use payment_adapter::*; // use crate::AsyncClient; pub static CLIENT_NAME: &str = "payments"; @@ -12,13 +13,19 @@ pub enum Error { #[derive(Debug, Copy, Clone)] pub enum Topic { - Succeed, + Pending, + WaitingForConfirmation, + Completed, + Canceled, } impl Topic { pub fn to_str(self) -> &'static str { match self { - Topic::Succeed => "payments/succeed", + Topic::Completed => "payments/completed", + Topic::Canceled => "payments/canceled", + Topic::Pending => "payments/pending", + Topic::WaitingForConfirmation => "payments/waiting_for_confirmation", } } } @@ -45,7 +52,27 @@ pub mod start_payment { use super::Error; #[derive(Debug, serde::Serialize, serde::Deserialize)] - pub struct Input {} + pub struct Input { + pub adapter_name: String, + pub create_payment: payment_adapter::CreatePayment, + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Details { + pub redirect_url: String, + } + + pub type Output = Result; +} + +pub mod notification { + use super::Error; + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input { + pub adapter_name: String, + pub status: String, + } #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Details {} @@ -53,14 +80,66 @@ pub mod start_payment { pub type Output = Result; } +pub mod cancel { + use model::Price; + + use super::Error; + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub enum RefundType { + /// Refund entire payment + Full, + /// Refund only part given in enum + Partial(Price), + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input { + pub adapter_name: String, + pub refund: RefundType, + pub provider_order_id: String, + pub description: String, + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Details {} + + pub type Output = Result; +} + +/// List of available payment adapters +pub mod adapters { + use super::Error; + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input {} + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Details { + pub names: Vec, + } + + pub type Output = Result; +} + pub mod rpc { use config::SharedAppConfig; - use crate::payments::start_payment; + use crate::payments::{adapters, cancel, notification, start_payment}; #[tarpc::service] pub trait Payments { + /// User manually started payment procedure async fn start_payment(input: start_payment::Input) -> start_payment::Output; + + /// User manually cancelled order + async fn cancel(input: cancel::Input) -> cancel::Output; + + /// Received update status notification from payment service + async fn notification(input: notification::Input) -> notification::Output; + + /// List of available adapters + async fn adapters(input: adapters::Input) -> adapters::Output; } pub async fn create_client(config: SharedAppConfig) -> PaymentsClient { @@ -90,6 +169,6 @@ pub mod mqtt { use crate::AsyncClient; pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) { - crate::mqtt::create_client(CLIENT_NAME, config.lock().stocks_manager().mqtt_addr()) + crate::mqtt::create_client(CLIENT_NAME, config.lock().payments_manager().mqtt_addr()) } } diff --git a/crates/payment_adapter_pay_u/src/model.rs b/crates/payment_adapter_pay_u/src/model.rs index 6064ee6..623dd36 100644 --- a/crates/payment_adapter_pay_u/src/model.rs +++ b/crates/payment_adapter_pay_u/src/model.rs @@ -145,7 +145,7 @@ pub struct Refund { pub struct Status { /// One of /// * `PENDING`: Payment is currently being processed. - /// * `WAITING_FOR_CONFIRMATION`: PayU is currently waiting for the merchant + /// * `WaitingForConfirmation`: PayU is currently waiting for the merchant /// system to receive (capture) the payment. This status is set if /// auto-receive is disabled on the merchant system. /// * `COMPLETED`: Payment has been accepted. PayU will pay out the funds diff --git a/crates/payment_adapter_pay_u/src/notify.rs b/crates/payment_adapter_pay_u/src/notify.rs index a41664b..2792c29 100644 --- a/crates/payment_adapter_pay_u/src/notify.rs +++ b/crates/payment_adapter_pay_u/src/notify.rs @@ -20,7 +20,7 @@ //! Verification of notifications signature for more information. //! //! Notifications are sent for orders in the following statuses: PENDING, -//! WAITING_FOR_CONFIRMATION, COMPLETED, CANCELED. +//! WaitingForConfirmation, COMPLETED, CANCELED. //! //! Note: if you filter IP addresses, remember to allow IPs used by PayU to send //! the notifications. These are: @@ -78,7 +78,7 @@ pub struct StatusUpdate { pub local_receipt_date_time: Option, /// Array of objects related to transaction identification. In case of /// statuses: - /// * `"WAITING_FOR_CONFIRMATION"` and `"COMPLETED"` - Contains one element + /// * `"WaitingForConfirmation"` and `"COMPLETED"` - Contains one element /// with two parameters: name and value, /// * `"PENDING"` - may contain object with aforementioned parameters or it /// can be empty. diff --git a/crates/payment_manager/src/main.rs b/crates/payment_manager/src/main.rs index b729727..e99b9db 100644 --- a/crates/payment_manager/src/main.rs +++ b/crates/payment_manager/src/main.rs @@ -1,21 +1,25 @@ +use std::collections::HashMap; use std::fs::read_dir; use std::path::PathBuf; use std::str::FromStr; +use std::sync::{Arc, RwLock}; use config::{AppConfig, UpdateConfig}; use payment_adapter::{HttpMethod, HttpRequest, Item, Product}; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use wapc::WasiParams; -use wapc_pool::HostPoolBuilder; +use wapc_pool::{HostPool, HostPoolBuilder}; // mod actions; // mod context; // mod db; -// mod mqtt; +mod mqtt; // pub mod pay_u_adapter; -// mod rpc; +mod rpc; // pub mod t_pay_adapter; +pub type Modules = Arc>>; + #[derive(gumdrop::Options)] pub struct Opts { pub adapters_path: Option, @@ -38,7 +42,7 @@ async fn main() { let opts: Opts = gumdrop::parse_args_default_or_exit(); let config = config::default_load(&opts); - let _payment_config = config.lock().payment().clone(); + let mut modules; { let adapters_path = config.lock().payment().adapters_path.clone(); @@ -49,12 +53,18 @@ async fn main() { adapters, e ) }); - for file in dir.filter_map(|r| r.map(|r| r.path()).ok()) { - tracing::info!("{:?}", file); - if file.extension().and_then(|s| s.to_str()) != Some("wasm") { - continue; - } + let files = dir + .filter_map(|r| r.map(|r| r.path()).ok()) + .filter(|file| file.extension().and_then(|s| s.to_str()) == Some("wasm")) + .collect::>(); + if files.is_empty() { + panic!("No payment adapters found in adapters directory"); + } + + modules = HashMap::with_capacity(files.len()); + + for file in files { let module = std::fs::read(&file).unwrap(); let engine = wasmtime_provider::WasmtimeEngineProviderBuilder::new() .module_bytes(&module) @@ -77,22 +87,31 @@ async fn main() { }) .max_threads(5) .build(); + let name = pool + .call("name", vec![]) + .await + .unwrap_or_else(|e| panic!("Failed to load adapter {file:?} `name`. {e}")); + let name: String = wapc_codec::messagepack::deserialize(&name) + .unwrap_or_else(|e| panic!("Adapter `name` ({name:?}) is not valid string. {e}")); - { - let msg = config - .lock() - .payment() - .providers - .get("pay_u") - .cloned() - .unwrap_or_default(); + let msg = config + .lock() + .payment() + .providers + .get(&name) + .cloned() + .unwrap_or_default(); - tracing::info!("Start init"); - pool.call("init", wapc_codec::messagepack::serialize(msg).unwrap()) - .await - .unwrap(); + tracing::info!("Start init"); + pool.call("init", wapc_codec::messagepack::serialize(msg).unwrap()) + .await + .unwrap(); - let msg = payment_adapter::CreatePayment { + modules.insert(name, pool); + } + + for pool in modules.values() { + let msg = payment_adapter::CreatePayment { buyer: payment_adapter::Buyer { email: "hello@example.com".to_string(), phone: "530698478".to_string(), @@ -120,26 +139,26 @@ async fn main() { continue_uri: "https://localhost:3030/continue_uri".to_string(), }; - tracing::info!("Start create_payment"); - let call_result = pool - .call( - "create_payment", - wapc_codec::messagepack::serialize(msg).unwrap(), - ) - .await - .unwrap(); - let result: payment_adapter::OrderCreated = - wapc_codec::messagepack::deserialize(&call_result).unwrap(); + tracing::info!("Start create_payment"); + let call_result = pool + .call( + "create_payment", + wapc_codec::messagepack::serialize(msg).unwrap(), + ) + .await + .unwrap(); + let result: payment_adapter::OrderCreated = + wapc_codec::messagepack::deserialize(&call_result).unwrap(); - tracing::info!("create payment res {:?}", result) - } + tracing::info!("create payment res {:?}", result) } } // let db = db::Database::build(config.clone()).await; - // let mqtt_client = mqtt::start(config.clone(), db.clone()).await; - // rpc::start(config, db, mqtt_client).await; + let modules = Arc::new(RwLock::new(modules)); + let mqtt_client = mqtt::start(config.clone(), modules.clone()).await; + rpc::start(config, mqtt_client, modules.clone()).await; } // #[tracing::instrument] diff --git a/crates/payment_manager/src/mqtt.rs b/crates/payment_manager/src/mqtt.rs index e69de29..3be77fd 100644 --- a/crates/payment_manager/src/mqtt.rs +++ b/crates/payment_manager/src/mqtt.rs @@ -0,0 +1,31 @@ +use config::SharedAppConfig; +use rumqttc::{Event, Incoming}; + +use crate::db::Database; +use crate::Modules; + +pub async fn start(config: SharedAppConfig, _modules: Modules) -> channels::AsyncClient { + let (client, mut event_loop) = channels::payments::mqtt::create_client(config); + + let spawn_client = client.clone(); + tokio::spawn(async move { + let _client = spawn_client.clone(); + loop { + let notification = event_loop.poll().await; + + match notification { + Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() { + _ => {} + }, + Ok(Event::Incoming(_incoming)) => {} + Ok(Event::Outgoing(_outgoing)) => {} + Err(e) => { + tracing::warn!("{}", e); + } + } + } + // tracing::info!("Mqtt channel closed"); + }); + + client +} diff --git a/crates/payment_manager/src/rpc.rs b/crates/payment_manager/src/rpc.rs index e69de29..2439d4c 100644 --- a/crates/payment_manager/src/rpc.rs +++ b/crates/payment_manager/src/rpc.rs @@ -0,0 +1,51 @@ +use channels::payments::rpc::Payments; +use channels::payments::start_payment::{Input, Output}; +use channels::payments::{adapters, cancel, notification, start_payment}; +use channels::AsyncClient; +use config::SharedAppConfig; +use tarpc::context; + +use crate::Modules; + +pub struct PaymentsServer { + pub config: SharedAppConfig, + pub mqtt_client: AsyncClient, + pub modules: Modules, +} + +#[tarpc::server] +impl Payments for PaymentsServer { + async fn start_payment(self, _: context::Context, input: Input) -> Output { + todo!() + } + + async fn cancel(self, _: context::Context, input: cancel::Input) -> cancel::Output { + todo!() + } + + async fn notification( + self, + _: context::Context, + input: notification::Input, + ) -> notification::Output { + todo!() + } + + async fn adapters(self, _: context::Context, input: adapters::Input) -> adapters::Output { + todo!() + } +} + +pub async fn start(config: SharedAppConfig, mqtt_client: AsyncClient, modules: Modules) { + let port = { config.lock().stocks_manager().rpc_port }; + + channels::rpc::start("orders", port, || { + PaymentsServer { + config: config.clone(), + mqtt_client: mqtt_client.clone(), + modules, + } + .serve() + }) + .await; +} diff --git a/vendor/pay_u/src/lib.rs b/vendor/pay_u/src/lib.rs index 5c35b7b..107331a 100644 --- a/vendor/pay_u/src/lib.rs +++ b/vendor/pay_u/src/lib.rs @@ -699,7 +699,7 @@ pub struct PayMethod { pub struct Status { /// One of /// * `PENDING`: Payment is currently being processed. - /// * `WAITING_FOR_CONFIRMATION`: PayU is currently waiting for the merchant + /// * `WaitingForConfirmation`: PayU is currently waiting for the merchant /// system to receive (capture) the payment. This status is set if /// auto-receive is disabled on the merchant system. /// * `COMPLETED`: Payment has been accepted. PayU will pay out the funds diff --git a/vendor/pay_u/src/notify.rs b/vendor/pay_u/src/notify.rs index cc118aa..ddc703b 100644 --- a/vendor/pay_u/src/notify.rs +++ b/vendor/pay_u/src/notify.rs @@ -20,7 +20,7 @@ //! Verification of notifications signature for more information. //! //! Notifications are sent for orders in the following statuses: PENDING, -//! WAITING_FOR_CONFIRMATION, COMPLETED, CANCELED. +//! WaitingForConfirmation, COMPLETED, CANCELED. //! //! Note: if you filter IP addresses, remember to allow IPs used by PayU to send //! the notifications. These are: @@ -77,7 +77,7 @@ pub struct StatusUpdate { pub local_receipt_date_time: Option, /// Array of objects related to transaction identification. In case of /// statuses: - /// * `"WAITING_FOR_CONFIRMATION"` and `"COMPLETED"` - Contains one element + /// * `"WaitingForConfirmation"` and `"COMPLETED"` - Contains one element /// with two parameters: name and value, /// * `"PENDING"` - may contain object with aforementioned parameters or it /// can be empty.