WASM modules usage

This commit is contained in:
Adrian Woźniak 2022-12-18 21:31:39 +01:00
parent 28dc3024f7
commit dc8fe6dc25
4 changed files with 186 additions and 55 deletions

View File

@ -9,6 +9,12 @@ pub static CLIENT_NAME: &str = "payments";
pub enum Error { pub enum Error {
#[error("Something went wrong")] #[error("Something went wrong")]
InternalServerError, InternalServerError,
#[error("Unknown payment adapter {0:?}")]
UnknownAdapter(String),
#[error("Payment failed")]
PaymentFailed,
#[error("Payment adapter returned invalid response data")]
MalformedCreatePaymentResult,
} }
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
@ -147,7 +153,7 @@ pub mod rpc {
use tarpc::tokio_serde::formats::Bincode; use tarpc::tokio_serde::formats::Bincode;
let l = config.lock(); let l = config.lock();
let addr = l.orders_manager().rpc_addr(); let addr = l.payment().rpc_addr();
let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default); let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default);
@ -169,6 +175,6 @@ pub mod mqtt {
use crate::AsyncClient; use crate::AsyncClient;
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) { pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {
crate::mqtt::create_client(CLIENT_NAME, config.lock().payments_manager().mqtt_addr()) crate::mqtt::create_client(CLIENT_NAME, config.lock().payment().mqtt_addr())
} }
} }

View File

@ -2,7 +2,8 @@ use std::collections::HashMap;
use std::fs::read_dir; use std::fs::read_dir;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, RwLock}; use std::sync::mpsc::Sender;
use std::sync::{Arc, LockResult, MutexGuard, RwLock, RwLockReadGuard};
use config::{AppConfig, UpdateConfig}; use config::{AppConfig, UpdateConfig};
use payment_adapter::{HttpMethod, HttpRequest, Item, Product}; use payment_adapter::{HttpMethod, HttpRequest, Item, Product};
@ -18,7 +19,79 @@ mod mqtt;
mod rpc; mod rpc;
// pub mod t_pay_adapter; // pub mod t_pay_adapter;
pub type Modules = Arc<RwLock<HashMap<String, HostPool>>>; #[derive(Clone)]
pub enum WasmMsg {
CreateOrder {
create_payment: payment_adapter::CreatePayment,
callback: std::sync::mpsc::Sender<ResultMsg>,
},
UpdateStatus(Vec<u8>),
}
#[derive(Clone)]
pub enum ResultMsg {
OrderCreated(payment_adapter::OrderCreated),
}
pub struct HostChannel {
pub host: HostPool,
pub channel: std::sync::mpsc::Receiver<WasmMsg>,
}
impl HostChannel {
pub fn start(self) {
tokio::spawn(async move {
let HostChannel {
host: pool,
channel,
} = self;
loop {
let msg = match channel.recv() {
Ok(msg) => msg,
_ => continue,
};
match msg {
WasmMsg::CreateOrder {
create_payment: msg,
callback,
} => {
let msg = match wapc_codec::messagepack::serialize(msg) {
Ok(r) => r,
_ => continue,
};
let call_result = match pool.call("create_payment", msg).await {
Ok(r) => r,
_ => continue,
};
let result: payment_adapter::OrderCreated =
match wapc_codec::messagepack::deserialize(&call_result) {
Ok(r) => r,
_ => continue,
};
if let Err(e) = callback.send(ResultMsg::OrderCreated(result)) {
tracing::error!("{e:?}");
}
}
WasmMsg::UpdateStatus(_) => {}
}
}
});
}
}
#[derive(Clone)]
pub struct Modules(Arc<std::sync::Mutex<HashMap<String, std::sync::mpsc::Sender<WasmMsg>>>>);
impl Modules {
pub fn new(h: HashMap<String, std::sync::mpsc::Sender<WasmMsg>>) -> Self {
Self(Arc::new(std::sync::Mutex::new(h)))
}
pub fn lock(&self) -> LockResult<MutexGuard<'_, HashMap<String, Sender<WasmMsg>>>> {
self.0.lock()
}
}
#[derive(gumdrop::Options)] #[derive(gumdrop::Options)]
pub struct Opts { pub struct Opts {
@ -107,54 +180,63 @@ async fn main() {
.await .await
.unwrap(); .unwrap();
modules.insert(name, pool); let (tx, rx) = std::sync::mpsc::channel();
HostChannel {
host: pool,
channel: rx,
}
.start();
modules.insert(name, tx);
} }
for pool in modules.values() { // for pool in modules.values() {
let msg = payment_adapter::CreatePayment { // let msg = payment_adapter::CreatePayment {
buyer: payment_adapter::Buyer { // buyer: payment_adapter::Buyer {
email: "hello@example.com".to_string(), // email: "hello@example.com".to_string(),
phone: "530698478".to_string(), // phone: "530698478".to_string(),
first_name: "Joe".to_string(), // first_name: "Joe".to_string(),
last_name: "Doe".to_string(), // last_name: "Doe".to_string(),
language: "pl".to_string(), // language: "pl".to_string(),
}, // },
customer_ip: "12.22.34.54".to_string(), // customer_ip: "12.22.34.54".to_string(),
currency: "PLN".to_string(), // currency: "PLN".to_string(),
description: "Nesciunt fugit libero quis dolorum quo. Tempore aut nisi voluptatem. Odio et aspernatur est. Sint vel molestias sunt cumque quibusdam reprehenderit est.".to_string(), // description: "Nesciunt fugit libero quis dolorum quo.
cart_products: vec![Product { // Tempore aut nisi voluptatem. Odio et aspernatur est. Sint vel
id: 23, // molestias sunt cumque quibusdam reprehenderit est.".to_string(),
name: "Socks".to_string(), // cart_products: vec![Product {
unit_price: 1542, // id: 23,
quantity_unit: "Unit".to_string(), // name: "Socks".to_string(),
quantity: 2, // unit_price: 1542,
}], // quantity_unit: "Unit".to_string(),
items: vec![Item { // quantity: 2,
product_id: 23, // }],
quantity: 2, // items: vec![Item {
quantity_unit: "Unit".to_string(), // product_id: 23,
}], // quantity: 2,
order_ext_id: None, // quantity_unit: "Unit".to_string(),
notify_uri: "https://localhost:3030/notify_uri".to_string(), // }],
continue_uri: "https://localhost:3030/continue_uri".to_string(), // order_ext_id: None,
}; // notify_uri: "https://localhost:3030/notify_uri".to_string(),
// continue_uri: "https://localhost:3030/continue_uri".to_string(),
tracing::info!("Start create_payment"); // };
let call_result = pool //
.call( // tracing::info!("Start create_payment");
"create_payment", // let call_result = pool
wapc_codec::messagepack::serialize(msg).unwrap(), // .call(
) // "create_payment",
.await // wapc_codec::messagepack::serialize(msg).unwrap(),
.unwrap(); // )
let result: payment_adapter::OrderCreated = // .await
wapc_codec::messagepack::deserialize(&call_result).unwrap(); // .unwrap();
// let result: payment_adapter::OrderCreated =
tracing::info!("create payment res {:?}", result) // wapc_codec::messagepack::deserialize(&call_result).unwrap();
} //
// tracing::info!("create payment res {:?}", result)
// }
} }
let modules = Arc::new(RwLock::new(modules)); let modules = Modules::new(modules);
let mqtt_client = mqtt::start(config.clone(), modules.clone()).await; let mqtt_client = mqtt::start(config.clone(), modules.clone()).await;
rpc::start(config, mqtt_client, modules.clone()).await; rpc::start(config, mqtt_client, modules.clone()).await;
} }

View File

@ -1,7 +1,6 @@
use config::SharedAppConfig; use config::SharedAppConfig;
use rumqttc::{Event, Incoming}; use rumqttc::{Event, Incoming};
use crate::db::Database;
use crate::Modules; use crate::Modules;
pub async fn start(config: SharedAppConfig, _modules: Modules) -> channels::AsyncClient { pub async fn start(config: SharedAppConfig, _modules: Modules) -> channels::AsyncClient {

View File

@ -1,12 +1,14 @@
use std::sync::mpsc::RecvError;
use channels::payments::rpc::Payments; use channels::payments::rpc::Payments;
use channels::payments::start_payment::{Input, Output};
use channels::payments::{adapters, cancel, notification, start_payment}; use channels::payments::{adapters, cancel, notification, start_payment};
use channels::AsyncClient; use channels::AsyncClient;
use config::SharedAppConfig; use config::SharedAppConfig;
use tarpc::context; use tarpc::context;
use crate::Modules; use crate::{Modules, ResultMsg, WasmMsg};
#[derive(Clone)]
pub struct PaymentsServer { pub struct PaymentsServer {
pub config: SharedAppConfig, pub config: SharedAppConfig,
pub mqtt_client: AsyncClient, pub mqtt_client: AsyncClient,
@ -15,8 +17,50 @@ pub struct PaymentsServer {
#[tarpc::server] #[tarpc::server]
impl Payments for PaymentsServer { impl Payments for PaymentsServer {
async fn start_payment(self, _: context::Context, input: Input) -> Output { async fn start_payment(
todo!() self,
_: context::Context,
input: start_payment::Input,
) -> start_payment::Output {
use channels::payments::Error;
let start_payment::Input {
adapter_name,
create_payment,
} = input;
// let (tx, rx) = std::sync::mpsc::channel();
let modules = self.modules.clone();
let lock = modules.lock().map_err(|e| {
tracing::error!("{e}");
Error::InternalServerError
})?;
let module = lock
.get(&adapter_name)
.ok_or_else(|| Error::UnknownAdapter(adapter_name))?
.clone();
let (tx, rx) = std::sync::mpsc::channel();
module
.send(WasmMsg::CreateOrder {
create_payment,
callback: tx,
})
.map_err(|e| {
tracing::error!("{e}");
Error::PaymentFailed
})?;
let res = rx.recv().map_err(|e| {
tracing::error!("{e}");
Error::MalformedCreatePaymentResult
})?;
match res {
ResultMsg::OrderCreated(res) => Ok(start_payment::Details {
redirect_url: res.redirect_uri.unwrap_or_default(),
}),
_ => Err(Error::MalformedCreatePaymentResult),
}
} }
async fn cancel(self, _: context::Context, input: cancel::Input) -> cancel::Output { async fn cancel(self, _: context::Context, input: cancel::Input) -> cancel::Output {
@ -39,11 +83,11 @@ impl Payments for PaymentsServer {
pub async fn start(config: SharedAppConfig, mqtt_client: AsyncClient, modules: Modules) { pub async fn start(config: SharedAppConfig, mqtt_client: AsyncClient, modules: Modules) {
let port = { config.lock().stocks_manager().rpc_port }; let port = { config.lock().stocks_manager().rpc_port };
channels::rpc::start("orders", port, || { channels::rpc::start("payment", port, || {
PaymentsServer { PaymentsServer {
config: config.clone(), config: config.clone(),
mqtt_client: mqtt_client.clone(), mqtt_client: mqtt_client.clone(),
modules, modules: modules.clone(),
} }
.serve() .serve()
}) })