More payments code

This commit is contained in:
Adrian Woźniak 2022-12-16 15:21:56 +01:00
parent 079212bed1
commit 53d70b38da
10 changed files with 230 additions and 48 deletions

1
Cargo.lock generated
View File

@ -729,6 +729,7 @@ dependencies = [
"config",
"futures 0.3.25",
"model",
"payment_adapter",
"rumqttc",
"serde",
"strum",

View File

@ -10,7 +10,7 @@ emails = []
search = []
stocks = []
orders = []
payments = []
payments = ['payment_adapter']
default = ['accounts', 'carts', 'emails', 'search', 'stocks', 'orders', 'payments']
[dependencies]
@ -27,3 +27,4 @@ thiserror = { version = "1.0.37" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.37" }
whatlang = { version = "0.16.2" }
payment_adapter = { path = "../payment_adapter", optional = true }

View File

@ -1,5 +1,6 @@
// use rumqttc::QoS;
pub use payment_adapter::*;
// use crate::AsyncClient;
pub static CLIENT_NAME: &str = "payments";
@ -12,13 +13,19 @@ pub enum Error {
#[derive(Debug, Copy, Clone)]
pub enum Topic {
Succeed,
Pending,
WaitingForConfirmation,
Completed,
Canceled,
}
impl Topic {
pub fn to_str(self) -> &'static str {
match self {
Topic::Succeed => "payments/succeed",
Topic::Completed => "payments/completed",
Topic::Canceled => "payments/canceled",
Topic::Pending => "payments/pending",
Topic::WaitingForConfirmation => "payments/waiting_for_confirmation",
}
}
}
@ -45,7 +52,27 @@ pub mod start_payment {
use super::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {}
pub struct Input {
pub adapter_name: String,
pub create_payment: payment_adapter::CreatePayment,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Details {
pub redirect_url: String,
}
pub type Output = Result<Details, Error>;
}
pub mod notification {
use super::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub adapter_name: String,
pub status: String,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Details {}
@ -53,14 +80,66 @@ pub mod start_payment {
pub type Output = Result<Details, Error>;
}
pub mod cancel {
use model::Price;
use super::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum RefundType {
/// Refund entire payment
Full,
/// Refund only part given in enum
Partial(Price),
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub adapter_name: String,
pub refund: RefundType,
pub provider_order_id: String,
pub description: String,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Details {}
pub type Output = Result<Details, Error>;
}
/// List of available payment adapters
pub mod adapters {
use super::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Details {
pub names: Vec<String>,
}
pub type Output = Result<Details, Error>;
}
pub mod rpc {
use config::SharedAppConfig;
use crate::payments::start_payment;
use crate::payments::{adapters, cancel, notification, start_payment};
#[tarpc::service]
pub trait Payments {
/// User manually started payment procedure
async fn start_payment(input: start_payment::Input) -> start_payment::Output;
/// User manually cancelled order
async fn cancel(input: cancel::Input) -> cancel::Output;
/// Received update status notification from payment service
async fn notification(input: notification::Input) -> notification::Output;
/// List of available adapters
async fn adapters(input: adapters::Input) -> adapters::Output;
}
pub async fn create_client(config: SharedAppConfig) -> PaymentsClient {
@ -90,6 +169,6 @@ pub mod mqtt {
use crate::AsyncClient;
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {
crate::mqtt::create_client(CLIENT_NAME, config.lock().stocks_manager().mqtt_addr())
crate::mqtt::create_client(CLIENT_NAME, config.lock().payments_manager().mqtt_addr())
}
}

View File

@ -145,7 +145,7 @@ pub struct Refund {
pub struct Status {
/// One of
/// * `PENDING`: Payment is currently being processed.
/// * `WAITING_FOR_CONFIRMATION`: PayU is currently waiting for the merchant
/// * `WaitingForConfirmation`: PayU is currently waiting for the merchant
/// system to receive (capture) the payment. This status is set if
/// auto-receive is disabled on the merchant system.
/// * `COMPLETED`: Payment has been accepted. PayU will pay out the funds

View File

@ -20,7 +20,7 @@
//! Verification of notifications signature for more information.
//!
//! Notifications are sent for orders in the following statuses: PENDING,
//! WAITING_FOR_CONFIRMATION, COMPLETED, CANCELED.
//! WaitingForConfirmation, COMPLETED, CANCELED.
//!
//! Note: if you filter IP addresses, remember to allow IPs used by PayU to send
//! the notifications. These are:
@ -78,7 +78,7 @@ pub struct StatusUpdate {
pub local_receipt_date_time: Option<String>,
/// Array of objects related to transaction identification. In case of
/// statuses:
/// * `"WAITING_FOR_CONFIRMATION"` and `"COMPLETED"` - Contains one element
/// * `"WaitingForConfirmation"` and `"COMPLETED"` - Contains one element
/// with two parameters: name and value,
/// * `"PENDING"` - may contain object with aforementioned parameters or it
/// can be empty.

View File

@ -1,21 +1,25 @@
use std::collections::HashMap;
use std::fs::read_dir;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use config::{AppConfig, UpdateConfig};
use payment_adapter::{HttpMethod, HttpRequest, Item, Product};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use wapc::WasiParams;
use wapc_pool::HostPoolBuilder;
use wapc_pool::{HostPool, HostPoolBuilder};
// mod actions;
// mod context;
// mod db;
// mod mqtt;
mod mqtt;
// pub mod pay_u_adapter;
// mod rpc;
mod rpc;
// pub mod t_pay_adapter;
pub type Modules = Arc<RwLock<HashMap<String, HostPool>>>;
#[derive(gumdrop::Options)]
pub struct Opts {
pub adapters_path: Option<PathBuf>,
@ -38,7 +42,7 @@ async fn main() {
let opts: Opts = gumdrop::parse_args_default_or_exit();
let config = config::default_load(&opts);
let _payment_config = config.lock().payment().clone();
let mut modules;
{
let adapters_path = config.lock().payment().adapters_path.clone();
@ -49,12 +53,18 @@ async fn main() {
adapters, e
)
});
for file in dir.filter_map(|r| r.map(|r| r.path()).ok()) {
tracing::info!("{:?}", file);
if file.extension().and_then(|s| s.to_str()) != Some("wasm") {
continue;
}
let files = dir
.filter_map(|r| r.map(|r| r.path()).ok())
.filter(|file| file.extension().and_then(|s| s.to_str()) == Some("wasm"))
.collect::<Vec<_>>();
if files.is_empty() {
panic!("No payment adapters found in adapters directory");
}
modules = HashMap::with_capacity(files.len());
for file in files {
let module = std::fs::read(&file).unwrap();
let engine = wasmtime_provider::WasmtimeEngineProviderBuilder::new()
.module_bytes(&module)
@ -77,22 +87,31 @@ async fn main() {
})
.max_threads(5)
.build();
let name = pool
.call("name", vec![])
.await
.unwrap_or_else(|e| panic!("Failed to load adapter {file:?} `name`. {e}"));
let name: String = wapc_codec::messagepack::deserialize(&name)
.unwrap_or_else(|e| panic!("Adapter `name` ({name:?}) is not valid string. {e}"));
{
let msg = config
.lock()
.payment()
.providers
.get("pay_u")
.cloned()
.unwrap_or_default();
let msg = config
.lock()
.payment()
.providers
.get(&name)
.cloned()
.unwrap_or_default();
tracing::info!("Start init");
pool.call("init", wapc_codec::messagepack::serialize(msg).unwrap())
.await
.unwrap();
tracing::info!("Start init");
pool.call("init", wapc_codec::messagepack::serialize(msg).unwrap())
.await
.unwrap();
let msg = payment_adapter::CreatePayment {
modules.insert(name, pool);
}
for pool in modules.values() {
let msg = payment_adapter::CreatePayment {
buyer: payment_adapter::Buyer {
email: "hello@example.com".to_string(),
phone: "530698478".to_string(),
@ -120,26 +139,26 @@ async fn main() {
continue_uri: "https://localhost:3030/continue_uri".to_string(),
};
tracing::info!("Start create_payment");
let call_result = pool
.call(
"create_payment",
wapc_codec::messagepack::serialize(msg).unwrap(),
)
.await
.unwrap();
let result: payment_adapter::OrderCreated =
wapc_codec::messagepack::deserialize(&call_result).unwrap();
tracing::info!("Start create_payment");
let call_result = pool
.call(
"create_payment",
wapc_codec::messagepack::serialize(msg).unwrap(),
)
.await
.unwrap();
let result: payment_adapter::OrderCreated =
wapc_codec::messagepack::deserialize(&call_result).unwrap();
tracing::info!("create payment res {:?}", result)
}
tracing::info!("create payment res {:?}", result)
}
}
// let db = db::Database::build(config.clone()).await;
// let mqtt_client = mqtt::start(config.clone(), db.clone()).await;
// rpc::start(config, db, mqtt_client).await;
let modules = Arc::new(RwLock::new(modules));
let mqtt_client = mqtt::start(config.clone(), modules.clone()).await;
rpc::start(config, mqtt_client, modules.clone()).await;
}
// #[tracing::instrument]

View File

@ -0,0 +1,31 @@
use config::SharedAppConfig;
use rumqttc::{Event, Incoming};
use crate::db::Database;
use crate::Modules;
pub async fn start(config: SharedAppConfig, _modules: Modules) -> channels::AsyncClient {
let (client, mut event_loop) = channels::payments::mqtt::create_client(config);
let spawn_client = client.clone();
tokio::spawn(async move {
let _client = spawn_client.clone();
loop {
let notification = event_loop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(publish))) => match publish.topic.as_str() {
_ => {}
},
Ok(Event::Incoming(_incoming)) => {}
Ok(Event::Outgoing(_outgoing)) => {}
Err(e) => {
tracing::warn!("{}", e);
}
}
}
// tracing::info!("Mqtt channel closed");
});
client
}

View File

@ -0,0 +1,51 @@
use channels::payments::rpc::Payments;
use channels::payments::start_payment::{Input, Output};
use channels::payments::{adapters, cancel, notification, start_payment};
use channels::AsyncClient;
use config::SharedAppConfig;
use tarpc::context;
use crate::Modules;
pub struct PaymentsServer {
pub config: SharedAppConfig,
pub mqtt_client: AsyncClient,
pub modules: Modules,
}
#[tarpc::server]
impl Payments for PaymentsServer {
async fn start_payment(self, _: context::Context, input: Input) -> Output {
todo!()
}
async fn cancel(self, _: context::Context, input: cancel::Input) -> cancel::Output {
todo!()
}
async fn notification(
self,
_: context::Context,
input: notification::Input,
) -> notification::Output {
todo!()
}
async fn adapters(self, _: context::Context, input: adapters::Input) -> adapters::Output {
todo!()
}
}
pub async fn start(config: SharedAppConfig, mqtt_client: AsyncClient, modules: Modules) {
let port = { config.lock().stocks_manager().rpc_port };
channels::rpc::start("orders", port, || {
PaymentsServer {
config: config.clone(),
mqtt_client: mqtt_client.clone(),
modules,
}
.serve()
})
.await;
}

View File

@ -699,7 +699,7 @@ pub struct PayMethod {
pub struct Status {
/// One of
/// * `PENDING`: Payment is currently being processed.
/// * `WAITING_FOR_CONFIRMATION`: PayU is currently waiting for the merchant
/// * `WaitingForConfirmation`: PayU is currently waiting for the merchant
/// system to receive (capture) the payment. This status is set if
/// auto-receive is disabled on the merchant system.
/// * `COMPLETED`: Payment has been accepted. PayU will pay out the funds

View File

@ -20,7 +20,7 @@
//! Verification of notifications signature for more information.
//!
//! Notifications are sent for orders in the following statuses: PENDING,
//! WAITING_FOR_CONFIRMATION, COMPLETED, CANCELED.
//! WaitingForConfirmation, COMPLETED, CANCELED.
//!
//! Note: if you filter IP addresses, remember to allow IPs used by PayU to send
//! the notifications. These are:
@ -77,7 +77,7 @@ pub struct StatusUpdate {
pub local_receipt_date_time: Option<String>,
/// Array of objects related to transaction identification. In case of
/// statuses:
/// * `"WAITING_FOR_CONFIRMATION"` and `"COMPLETED"` - Contains one element
/// * `"WaitingForConfirmation"` and `"COMPLETED"` - Contains one element
/// with two parameters: name and value,
/// * `"PENDING"` - may contain object with aforementioned parameters or it
/// can be empty.