diff --git a/Cargo.lock b/Cargo.lock index 0fe989e..c03f905 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1493,6 +1493,14 @@ dependencies = [ "libc", ] +[[package]] +name = "event-bus-adapter" +version = "0.1.0" + +[[package]] +name = "event-bus-messages" +version = "0.1.0" + [[package]] name = "event-listener" version = "2.5.3" @@ -2729,6 +2737,10 @@ dependencies = [ "local-waker", ] +[[package]] +name = "local-event-bus" +version = "0.1.0" + [[package]] name = "local-waker" version = "0.1.3" @@ -3768,6 +3780,10 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "redis-event-bus" +version = "0.1.0" + [[package]] name = "redox_syscall" version = "0.2.16" @@ -4726,6 +4742,7 @@ dependencies = [ "fulfillment_adapter", "payment_adapter", "serde", + "thiserror", "tokio 1.28.0", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 7af6a53..3a8fd54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,11 @@ members = [ # "vendor/pay_u", "crates/pay_u_adapter", "crates/stripe_adapter", + # EVENT BUS + "crates/event-bus-messages", + "crates/event-bus-adapter", + "crates/local-event-bus", + "crates/redis-event-bus", ] exclude = [ diff --git a/crates/event-bus-adapter/Cargo.toml b/crates/event-bus-adapter/Cargo.toml new file mode 100644 index 0000000..d0cf6ea --- /dev/null +++ b/crates/event-bus-adapter/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "event-bus-adapter" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { version = "1.0.162", features = ['derive'] } +bincode = { version = "1.3.3" } diff --git a/crates/event-bus-adapter/src/lib.rs b/crates/event-bus-adapter/src/lib.rs new file mode 100644 index 0000000..7d12d9a --- /dev/null +++ b/crates/event-bus-adapter/src/lib.rs @@ -0,0 +1,14 @@ +pub fn add(left: usize, right: usize) -> usize { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} diff --git a/crates/event-bus-messages/Cargo.toml b/crates/event-bus-messages/Cargo.toml new file mode 100644 index 0000000..13affe2 --- /dev/null +++ b/crates/event-bus-messages/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "event-bus-messages" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/crates/event-bus-messages/src/lib.rs b/crates/event-bus-messages/src/lib.rs new file mode 100644 index 0000000..7d12d9a --- /dev/null +++ b/crates/event-bus-messages/src/lib.rs @@ -0,0 +1,14 @@ +pub fn add(left: usize, right: usize) -> usize { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} diff --git a/crates/local-event-bus/Cargo.toml b/crates/local-event-bus/Cargo.toml new file mode 100644 index 0000000..514a7f1 --- /dev/null +++ b/crates/local-event-bus/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "local-event-bus" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/crates/local-event-bus/src/main.rs b/crates/local-event-bus/src/main.rs new file mode 100644 index 0000000..e7a11a9 --- /dev/null +++ b/crates/local-event-bus/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/crates/payment_adapter/src/lib.rs b/crates/payment_adapter/src/lib.rs index ad543b4..ec1ff74 100644 --- a/crates/payment_adapter/src/lib.rs +++ b/crates/payment_adapter/src/lib.rs @@ -239,6 +239,29 @@ pub fn session_mut_ref( ::downcast_mut(session) } -pub trait Plugin { +pub trait Plugin: Any { + fn name(&self) -> &'static str; + fn mount(&self, config: &mut actix_web::web::ServiceConfig); } + +pub struct Plugins { + plugins: HashMap<&'static str, Box>, +} + +pub fn find_plugin<'plugins, P: Plugin>( + plugins: &'plugins Plugins, + name: &str, +) -> Option<&'plugins P> { + let plugin = plugins.plugins.get(name)?; + ::downcast_ref(plugin) +} + +impl Plugins { + pub fn insert_plugin

(&mut self, plugin: P) + where + P: Plugin, + { + self.plugins.insert(plugin.name(), Box::new(plugin)); + } +} diff --git a/crates/redis-event-bus/Cargo.toml b/crates/redis-event-bus/Cargo.toml new file mode 100644 index 0000000..3998d77 --- /dev/null +++ b/crates/redis-event-bus/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "redis-event-bus" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/crates/redis-event-bus/src/main.rs b/crates/redis-event-bus/src/main.rs new file mode 100644 index 0000000..e7a11a9 --- /dev/null +++ b/crates/redis-event-bus/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/crates/stripe_adapter/Cargo.toml b/crates/stripe_adapter/Cargo.toml index 8d30f03..627ebf5 100644 --- a/crates/stripe_adapter/Cargo.toml +++ b/crates/stripe_adapter/Cargo.toml @@ -6,8 +6,23 @@ edition = "2021" [lib] crate-type = ['dylib'] -[build] -rustflags = ["-C", "prefer-dynamic", "-C", "rpath"] +#[lib] +#crate-type = ['dylib'] +#name = "stripe-common" +#path = "./src/lib.rs" +# +#[lib] +#crate-type = ['dylib'] +#name = "stripe-adapters" +#path = "./src/adapters.rs" +# +#[lib] +#crate-type = ['dylib'] +#name = "stripe-web" +#path = "./src/web.rs" + +#[build] +#rustflags = ["-C", "prefer-dynamic", "-C", "rpath"] [dependencies] payment_adapter = { path = "../payment_adapter" } @@ -19,3 +34,4 @@ serde = { version = "1.0.162", features = ['derive'] } derive_more = { version = "0.99.17" } async-stripe = { version = "0.21.0", features = ['tokio', 'async', 'runtime-tokio-hyper'] } actix-web = { version = "4.3.1" } +thiserror = { version = "1.0.40" } diff --git a/crates/stripe_adapter/src/lib.rs b/crates/stripe_adapter/src/lib.rs index 536b654..8fd3a96 100644 --- a/crates/stripe_adapter/src/lib.rs +++ b/crates/stripe_adapter/src/lib.rs @@ -1,9 +1,7 @@ #![crate_type = "rlib"] -use std::collections::HashMap; use std::str::FromStr; -use actix_web::HttpResponse; use fulfillment_adapter::*; use payment_adapter::*; use tracing::warn; @@ -11,6 +9,8 @@ use tracing::warn; mod przelewy_24; mod routes; +static PLUGIN_NAME: &'static str = "stripe-adapter"; + #[derive(Debug, serde::Deserialize, serde::Serialize)] pub enum CaptureMethod { Automatic, @@ -27,6 +27,7 @@ pub enum SetupFutureUsage { pub struct StripeConfig { pub api_key: String, pub client: String, + pub webhook_secret: String, } #[derive(Debug)] @@ -54,8 +55,11 @@ async fn payment_status( ::stripe::PaymentIntentStatus::RequiresPaymentMethod | ::stripe::PaymentIntentStatus::RequiresConfirmation | ::stripe::PaymentIntentStatus::Processing => PaymentSessionStatus::Pending, + ::stripe::PaymentIntentStatus::RequiresAction => PaymentSessionStatus::RequiresMore, + ::stripe::PaymentIntentStatus::Canceled => PaymentSessionStatus::Canceled, + ::stripe::PaymentIntentStatus::RequiresCapture | ::stripe::PaymentIntentStatus::Succeeded => PaymentSessionStatus::Authorized, }) @@ -91,9 +95,15 @@ impl PaymentSessionData for Intent { } } -pub struct StripePlugin {} +pub struct StripePlugin { + pub config: StripeConfig, +} impl Plugin for StripePlugin { + fn name(&self) -> &'static str { + PLUGIN_NAME + } + fn mount(&self, config: &mut actix_web::web::ServiceConfig) { config.service(routes::stripe_hooks); } diff --git a/crates/stripe_adapter/src/lib/adapters.rs b/crates/stripe_adapter/src/lib/adapters.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/stripe_adapter/src/lib/web.rs b/crates/stripe_adapter/src/lib/web.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/stripe_adapter/src/przelewy_24.rs b/crates/stripe_adapter/src/przelewy_24.rs index b024ee0..85b6acf 100644 --- a/crates/stripe_adapter/src/przelewy_24.rs +++ b/crates/stripe_adapter/src/przelewy_24.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::str::FromStr; use payment_adapter::{ diff --git a/crates/stripe_adapter/src/routes.rs b/crates/stripe_adapter/src/routes.rs index e0941fe..93cdd71 100644 --- a/crates/stripe_adapter/src/routes.rs +++ b/crates/stripe_adapter/src/routes.rs @@ -1,18 +1,57 @@ +use actix_web::web::Data; use actix_web::{web, HttpRequest, HttpResponse}; +use payment_adapter::{find_plugin, Plugins}; use stripe::{EventObject, EventType, Webhook, WebhookError}; +use tracing::warn; -#[actix_web::web::post("/stripe/hooks")] -pub async fn stripe_hooks(req: HttpRequest, payload: web::Bytes) -> HttpResponse { - handle_webhook(req, payload).unwrap(); - HttpResponse::Ok().finish() +use crate::{StripePlugin, PLUGIN_NAME}; + +#[derive(Debug, thiserror::Error)] +pub enum HookError { + #[error("{0}")] + Webhook(#[from] WebhookError), + #[error("Invalid stripe webhook payload")] + InvalidPayload, + #[error("Stripe plugin is inactive")] + NoStripePlugin, } -pub fn handle_webhook(req: HttpRequest, payload: web::Bytes) -> Result<(), WebhookError> { - let payload_str = std::str::from_utf8(payload.borrow()).unwrap(); +#[actix_web::post("/stripe/hooks")] +pub async fn stripe_hooks( + req: HttpRequest, + payload: web::Bytes, + plugins: Data, +) -> HttpResponse { + if let Err(e) = handle_webhook(req, payload, plugins) { + warn!("Failed to handle stripe hook: {e}"); + HttpResponse::Ok().finish() + } else { + HttpResponse::Ok().finish() + } +} - let stripe_signature = get_header_value(&req, "Stripe-Signature").unwrap_or_default(); +pub fn handle_webhook( + req: HttpRequest, + payload: web::Bytes, + plugins: Data, +) -> Result<(), HookError> { + use std::borrow::Borrow; - if let Ok(event) = Webhook::construct_event(payload_str, stripe_signature, "whsec_xxxxx") { + let stripe = + find_plugin::(&*plugins, PLUGIN_NAME).ok_or(HookError::NoStripePlugin)?; + + let payload_str = std::str::from_utf8(payload.borrow()).map_err(|e| { + warn!("Invalid stripe payload: {e}"); + HookError::InvalidPayload + })?; + + let stripe_signature = read_header_value(&req, "Stripe-Signature") + .or_else(|| read_header_value(&req, "stripe-signature")) + .unwrap_or_default(); + + if let Ok(event) = + Webhook::construct_event(payload_str, stripe_signature, &stripe.config.webhook_secret) + { match event.type_ { EventType::AccountUpdated => { if let EventObject::Account(account) = event.data.object { @@ -41,11 +80,11 @@ pub fn handle_webhook(req: HttpRequest, payload: web::Bytes) -> Result<(), Webho Ok(()) } -fn get_header_value<'b>(req: &'b HttpRequest, key: &'b str) -> Option<&'b str> { +fn read_header_value<'b>(req: &'b HttpRequest, key: &'b str) -> Option<&'b str> { req.headers().get(key)?.to_str().ok() } -fn handle_account_updated(account: stripe::Account) -> Result<(), WebhookError> { +fn handle_account_updated(account: stripe::Account) -> Result<(), HookError> { println!( "Received account updated webhook for account: {:?}", account.id @@ -53,7 +92,7 @@ fn handle_account_updated(account: stripe::Account) -> Result<(), WebhookError> Ok(()) } -fn handle_checkout_session(session: stripe::CheckoutSession) -> Result<(), WebhookError> { +fn handle_checkout_session(session: stripe::CheckoutSession) -> Result<(), HookError> { println!( "Received checkout session completed webhook with id: {:?}", session.id @@ -61,6 +100,6 @@ fn handle_checkout_session(session: stripe::CheckoutSession) -> Result<(), Webho Ok(()) } -fn handle_payment_intent(_intent: stripe::PaymentIntent) -> Result<(), WebhookError> { +fn handle_payment_intent(_intent: stripe::PaymentIntent) -> Result<(), HookError> { Ok(()) }