Tokens manager

This commit is contained in:
Adrian Woźniak 2022-12-20 15:34:20 +01:00
parent dc8fe6dc25
commit ba00afab41
46 changed files with 1893 additions and 1802 deletions

1
.env
View File

@ -4,6 +4,7 @@ ACCOUNT_DATABASE_URL=postgres://postgres@localhost/bazzar_accounts
CART_DATABASE_URL=postgres://postgres@localhost/bazzar_carts
STOCK_DATABASE_URL=postgres://postgres@localhost/bazzar_stocks
ORDER_DATABASE_URL=postgres://postgres@localhost/bazzar_orders
TOKEN_DATABASE_URL=postgres://postgres@localhost/bazzar_tokens
PASS_SALT=18CHwV7eGFAea16z+qMKZg
RUST_LOG=debug

689
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -16,7 +16,7 @@ members = [
"crates/search_manager",
"crates/stock_manager",
"crates/token_manager",
"crates/fs_manager",
# "crates/fs_manager",
"crates/lang_provider",
"crates/payment_adapter",
# "crates/payment_adapter_pay_u",

View File

@ -8,28 +8,24 @@ name = "account-manager"
path = "src/main.rs"
[dependencies]
bincode = { version = "1.3.3" }
bytes = { version = "1.2.1" }
bincode = { version = "1" }
bytes = { version = "1" }
channels = { path = "../channels" }
config = { path = "../config" }
dotenv = { version = "0.15.0" }
futures = { version = "0.3.25" }
gumdrop = { version = "0.8.1" }
json = { version = "0.12.4" }
dotenv = { version = "0" }
futures = { version = "0" }
gumdrop = { version = "0" }
json = { version = "0" }
model = { path = "../model", features = ['db'] }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.6" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
serde = { version = "1", features = ["derive"] }
sqlx = { version = "0", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0", features = [] }
tarpc = { version = "0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1" }
tokio = { version = "1", features = ['full'] }
tracing = { version = "0" }
[dev-dependencies]
fake = { version = "2.5.0" }
fake = { version = "2" }
testx = { path = "../testx" }

View File

@ -34,7 +34,6 @@ async fn main() -> std::io::Result<()> {
// Let the background span processor finish.
sleep(Duration::from_micros(1)).await;
opentelemetry::global::shutdown_tracer_provider();
Ok(())
}

View File

@ -1,11 +1,6 @@
#![feature(structural_match)]
use std::env;
use config::UpdateConfig;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
pub mod actions;
pub mod db;
@ -35,7 +30,7 @@ impl UpdateConfig for Opts {}
#[tokio::main]
async fn main() {
dotenv::dotenv().ok();
init_tracing("account-manager");
config::init_tracing("account-manager");
let opts = Opts {};
@ -46,24 +41,3 @@ async fn main() {
let mqtt_client = mqtt::start(config.clone(), db.clone()).await;
rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await;
}
pub fn init_tracing(_service_name: &str) {
env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");
let tracer = {
use opentelemetry::sdk::export::trace::stdout::new_pipeline;
use opentelemetry::sdk::trace::Config;
new_pipeline()
.with_trace_config(Config::default())
.with_pretty_print(true)
.install_simple()
};
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE))
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()
.unwrap();
}

View File

@ -4,52 +4,50 @@ version = "0.1.0"
edition = "2021"
[dependencies]
actix = { version = "0.13", features = [] }
actix-broker = { version = "0.4", features = [] }
actix-cors = { version = "0.6", features = [] }
actix-files = { version = "0.6", features = [] }
actix-identity = { version = "0.4", features = [] }
actix-multipart = { version = "0.4", features = [] }
actix-redis = { version = "0.11", features = [] }
actix-rt = { version = "2.7", features = [] }
actix-session = { version = "0.6", features = ["actix-redis", "redis-actor-session"] }
actix-web = { version = "4.0", features = [] }
actix-web-httpauth = { version = "0.6", features = [] }
actix-web-opentelemetry = { version = "0.12", features = [] }
async-trait = { version = "0.1", features = [] }
bytes = { version = "1.1.0" }
actix = { version = "0", features = [] }
actix-broker = { version = "0", features = [] }
actix-cors = { version = "0", features = [] }
actix-files = { version = "0", features = [] }
actix-identity = { version = "0", features = [] }
actix-multipart = { version = "0", features = [] }
actix-redis = { version = "0", features = [] }
actix-rt = { version = "2", features = [] }
actix-session = { version = "0", features = ["actix-redis", "redis-actor-session"] }
actix-web = { version = "4", features = [] }
actix-web-httpauth = { version = "0", features = [] }
actix-web-opentelemetry = { version = "0", features = [] }
async-trait = { version = "0", features = [] }
bytes = { version = "1" }
channels = { path = "../channels", features = ['accounts', 'carts', 'emails', 'search'] }
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0", features = ["serde"] }
config = { path = "../config", features = ['full'] }
#database_manager = { path = "../database_manager" }
derive_more = { version = "0.99", features = [] }
dotenv = { version = "0.15", features = [] }
derive_more = { version = "0", features = [] }
dotenv = { version = "0", features = [] }
fs_manager = { path = "../fs_manager" }
futures = { version = "0.3", features = [] }
futures-util = { version = "0.3", features = [] }
gumdrop = { version = "0.8", features = [] }
human-panic = { version = "1.0.3" }
include_dir = { version = "0.7.2", features = [] }
itertools = { version = "0.10.5" }
jemallocator = { version = "0.3", features = [] }
futures = { version = "0", features = [] }
futures-util = { version = "0", features = [] }
gumdrop = { version = "0", features = [] }
human-panic = { version = "1" }
include_dir = { version = "0", features = [] }
itertools = { version = "0" }
jemallocator = { version = "0", features = [] }
model = { path = "../model", version = "0.1", features = ["db"] }
oauth2 = { version = "4.1", features = [] }
oauth2 = { version = "4", features = [] }
order_manager = { path = "../order_manager" }
parking_lot = { version = "0.12", features = [] }
parking_lot = { version = "0", features = [] }
payment_manager = { path = "../payment_manager" }
pretty_env_logger = { version = "0.4", features = [] }
pretty_env_logger = { version = "0", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = [] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
tera = { version = "1.15", features = [] }
thiserror = { version = "1.0", features = [] }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1", features = [] }
sqlx = { version = "0", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0", features = [] }
tarpc = { version = "0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
tera = { version = "1", features = [] }
thiserror = { version = "1", features = [] }
token_manager = { path = "../token_manager" }
tokio = { version = "1.17", features = ["full"] }
toml = { version = "0.5", features = [] }
tracing = { version = "0.1.34" }
tracing-subscriber = { version = "0.3.11" }
uuid = { version = "1.2.1", features = ["serde"] }
validator = { version = "0.14", features = [] }
tokio = { version = "1", features = ["full"] }
toml = { version = "0", features = [] }
uuid = { version = "1", features = ["serde"] }
validator = { version = "0", features = [] }

View File

@ -1,4 +1,4 @@
#![feature(drain_filter)]
+#![feature(drain_filter)]
use std::io::Write;
use std::str::FromStr;

View File

@ -9,26 +9,21 @@ path = "src/main.rs"
[dependencies]
channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0", features = ["serde"] }
config = { path = "../config" }
dotenv = { version = "0.15.0" }
futures = { version = "0.3.25" }
dotenv = { version = "0" }
futures = { version = "0" }
model = { path = "../model", features = ["db"] }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.37" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
serde = { version = "1", features = ["derive"] }
sqlx = { version = "0", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0", features = [] }
tarpc = { version = "0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1" }
tokio = { version = "1", features = ['full'] }
tracing = { version = "0" }
uuid = { version = "0", features = ["serde", "v4"] }
[dev-dependencies]
fake = { version = "2.5.0" }
fake = { version = "2" }
testx = { path = "../testx" }

View File

@ -1,7 +1,4 @@
use config::UpdateConfig;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use crate::db::Database;
@ -17,7 +14,7 @@ impl UpdateConfig for Opts {}
#[tokio::main]
async fn main() {
dotenv::dotenv().ok();
init_tracing("account-manager");
config::init_tracing("account-manager");
let opts = Opts {};
@ -28,23 +25,3 @@ async fn main() {
let mqtt_client = mqtt::start(config.clone(), db.clone()).await;
rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await;
}
pub fn init_tracing(_service_name: &str) {
std::env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");
let tracer = {
use opentelemetry::sdk::export::trace::stdout::new_pipeline;
use opentelemetry::sdk::trace::Config;
new_pipeline()
.with_trace_config(Config::default())
.with_pretty_print(true)
.install_simple()
};
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE))
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()
.unwrap();
}

View File

@ -11,20 +11,21 @@ search = []
stocks = []
orders = []
payments = ['payment_adapter']
default = ['accounts', 'carts', 'emails', 'search', 'stocks', 'orders', 'payments']
tokens = []
default = ['accounts', 'carts', 'emails', 'search', 'stocks', 'orders', 'payments', 'tokens']
[dependencies]
bincode = { version = "*" }
bytes = { version = "1.2.1" }
bytes = { version = "1" }
config = { path = "../config" }
futures = { version = "0.3.25" }
futures = { version = "0" }
model = { path = "../model" }
payment_adapter = { path = "../payment_adapter", optional = true }
rumqttc = { version = "*" }
serde = { version = "*", features = ['derive'] }
strum = { version = "0.24.1", features = ['strum_macros', 'default', 'derive'] }
tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.37" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.37" }
whatlang = { version = "0.16.2" }
payment_adapter = { path = "../payment_adapter", optional = true }
strum = { version = "0", features = ['strum_macros', 'default', 'derive'] }
tarpc = { version = "0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1" }
tokio = { version = "1", features = ['full'] }
tracing = { version = "0" }
whatlang = { version = "0" }

View File

@ -1,4 +1,4 @@
pub static CLIENT_NAME: &str = "cart-manager";
pub static CLIENT_NAME: &str = "carts";
pub enum Topic {}
@ -154,7 +154,7 @@ pub mod mqtt {
use config::SharedAppConfig;
use rumqttc::EventLoop;
use crate::carts::CLIENT_NAME;
use super::CLIENT_NAME;
use crate::AsyncClient;
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {

View File

@ -18,6 +18,8 @@ pub mod rpc;
pub mod search;
#[cfg(feature = "stocks")]
pub mod stocks;
#[cfg(feature = "tokens")]
pub mod tokens;
pub trait DeserializePayload {
fn deserialize_payload<T: serde::de::DeserializeOwned>(self, bytes: bytes::Bytes) -> Option<T>;

View File

@ -1,7 +1,4 @@
// use rumqttc::QoS;
pub use payment_adapter::*;
// use crate::AsyncClient;
pub static CLIENT_NAME: &str = "payments";
@ -13,8 +10,21 @@ pub enum Error {
UnknownAdapter(String),
#[error("Payment failed")]
PaymentFailed,
#[error("Payment adapter returned invalid response data")]
#[error("Cancel failed")]
CancelFailed,
#[error("Payment adapter returned invalid create order response data")]
MalformedCreatePaymentResult,
#[error("Payment adapter returned invalid cancel response data")]
MalformedCancelResult,
#[error("Update payment state failed")]
UpdateStateFailed,
#[error("Payment adapter returned invalid update state response data")]
MalformedUpdateStateResult,
//
#[error("Unable to send msg to adapter")]
SendToWasmFailed,
#[error("Unable to receive response from adapter")]
RecvFromWasmFailed,
}
#[derive(Debug, Copy, Clone)]
@ -91,7 +101,7 @@ pub mod cancel {
use super::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
pub enum RefundType {
/// Refund entire payment
Full,

View File

@ -0,0 +1,97 @@
pub static CLIENT_NAME: &str = "tokens";
pub enum Topic {}
#[derive(Debug, Clone, thiserror::Error, serde::Serialize, serde::Deserialize)]
pub enum Error {}
pub mod create_pair {
use super::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Details {}
pub type Output = Result<Details, Error>;
}
pub mod validate {
use super::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Details {}
pub type Output = Result<Details, Error>;
}
pub mod refresh {
use model::RefreshTokenString;
use super::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub refresh_token: RefreshTokenString,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Details {}
pub type Output = Result<Details, Error>;
}
pub mod rpc {
use config::SharedAppConfig;
use crate::tokens::{create_pair, refresh, validate};
#[tarpc::service]
pub trait Tokens {
/// Create new access token and refresh token without any validations
async fn create_pair(input: create_pair::Input) -> create_pair::Output;
/// Check if access token is valid
async fn validate(input: validate::Input) -> validate::Output;
/// Validate with refresh token and create new access token and refresh
/// token
async fn refresh(input: refresh::Input) -> refresh::Output;
}
pub async fn create_client(config: SharedAppConfig) -> TokensClient {
use tarpc::client;
use tarpc::tokio_serde::formats::Bincode;
let addr = {
let l = config.lock();
(l.tokens().rpc_bind.clone(), l.tokens().rpc_port)
};
let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default);
let client = TokensClient::new(
client::Config::default(),
transport.await.expect("Failed to connect to server"),
)
.spawn();
client
}
}
pub mod mqtt {
use config::SharedAppConfig;
use rumqttc::EventLoop;
use super::CLIENT_NAME;
use crate::AsyncClient;
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {
crate::mqtt::create_client(CLIENT_NAME, config.lock().tokens().mqtt_addr())
}
}

View File

@ -8,11 +8,13 @@ full = ['actix-web', 'cookie']
default = []
[dependencies]
actix-web = { version = "4.0", features = [], optional = true }
cookie = { version = "0.16.1", features = ["signed"], optional = true }
parking_lot = { version = "0.12", features = [] }
password-hash = { version = "0.4", features = ["alloc"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = [] }
thiserror = { version = "1.0" }
toml = { version = "0.5", features = [] }
actix-web = { version = "4", features = [], optional = true }
cookie = { version = "0", features = ["signed"], optional = true }
parking_lot = { version = "0", features = [] }
password-hash = { version = "0", features = ["alloc"] }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1", features = [] }
thiserror = { version = "1" }
toml = { version = "0", features = [] }
tracing-subscriber = { version = "0", features = ['env-filter'] }
tracing-timing = { version = "0", features = [] }

View File

@ -625,32 +625,67 @@ impl PaymentConfig {
}
}
#[derive(Serialize, Deserialize)]
pub struct TokensConfig {
pub rpc_port: u16,
pub rpc_bind: String,
pub mqtt_port: u16,
pub mqtt_bind: String,
pub database_url: String,
}
impl Default for TokensConfig {
fn default() -> Self {
Self {
rpc_port: 19336,
rpc_bind: "0.0.0.0".into(),
mqtt_port: 1889,
mqtt_bind: "0.0.0.0".into(),
database_url: "postgres://postgres@localhost/bazzar_tokens".into(),
}
}
}
impl Example for TokensConfig {}
impl TokensConfig {
pub fn rpc_addr(&self) -> (&str, u16) {
(&self.rpc_bind, self.rpc_port)
}
pub fn mqtt_addr(&self) -> (&str, u16) {
(&self.mqtt_bind, self.mqtt_port)
}
}
#[derive(Serialize, Deserialize)]
pub struct AppConfig {
#[serde(default)]
payment: PaymentConfig,
#[serde(default)]
web: WebConfig,
#[serde(default)]
mail: MailConfig,
#[serde(default)]
database: DatabaseConfig,
#[serde(default)]
search: SearchConfig,
#[serde(default)]
files: FilesConfig,
#[serde(default)]
account_manager: AccountManagerConfig,
#[serde(default)]
cart_manager: CartManagerConfig,
#[serde(skip)]
config_path: String,
#[serde(default)]
database: DatabaseConfig,
#[serde(default)]
email_sender: EmailSenderConfig,
#[serde(default)]
stocks: StocksConfig,
files: FilesConfig,
#[serde(default)]
mail: MailConfig,
#[serde(default)]
order_manager: OrderConfig,
#[serde(skip)]
config_path: String,
#[serde(default)]
payment: PaymentConfig,
#[serde(default)]
search: SearchConfig,
#[serde(default)]
stocks: StocksConfig,
#[serde(default)]
tokens: TokensConfig,
#[serde(default)]
web: WebConfig,
}
impl Example for AppConfig {
@ -668,51 +703,12 @@ impl Example for AppConfig {
stocks: StocksConfig::example(),
order_manager: OrderConfig::example(),
config_path: "".to_string(),
tokens: TokensConfig::example(),
}
}
}
impl AppConfig {
pub fn payment(&self) -> &PaymentConfig {
&self.payment
}
pub fn web(&self) -> &WebConfig {
&self.web
}
pub fn mail(&self) -> &MailConfig {
&self.mail
}
pub fn database(&self) -> &DatabaseConfig {
&self.database
}
pub fn payment_mut(&mut self) -> &mut PaymentConfig {
&mut self.payment
}
pub fn web_mut(&mut self) -> &mut WebConfig {
&mut self.web
}
pub fn mail_mut(&mut self) -> &mut MailConfig {
&mut self.mail
}
pub fn database_mut(&mut self) -> &mut DatabaseConfig {
&mut self.database
}
pub fn search(&self) -> &SearchConfig {
&self.search
}
pub fn files(&self) -> &FilesConfig {
&self.files
}
pub fn account_manager(&self) -> &AccountManagerConfig {
&self.account_manager
}
@ -721,16 +717,60 @@ impl AppConfig {
&self.cart_manager
}
pub fn database(&self) -> &DatabaseConfig {
&self.database
}
pub fn database_mut(&mut self) -> &mut DatabaseConfig {
&mut self.database
}
pub fn email_sender(&self) -> &EmailSenderConfig {
&self.email_sender
}
pub fn files(&self) -> &FilesConfig {
&self.files
}
pub fn mail(&self) -> &MailConfig {
&self.mail
}
pub fn mail_mut(&mut self) -> &mut MailConfig {
&mut self.mail
}
pub fn orders_manager(&self) -> &OrderConfig {
&self.order_manager
}
pub fn payment(&self) -> &PaymentConfig {
&self.payment
}
pub fn payment_mut(&mut self) -> &mut PaymentConfig {
&mut self.payment
}
pub fn search(&self) -> &SearchConfig {
&self.search
}
pub fn stocks_manager(&self) -> &StocksConfig {
&self.stocks
}
pub fn orders_manager(&self) -> &OrderConfig {
&self.order_manager
pub fn tokens(&self) -> &TokensConfig {
&self.tokens
}
pub fn web(&self) -> &WebConfig {
&self.web
}
pub fn web_mut(&mut self) -> &mut WebConfig {
&mut self.web
}
}
@ -749,6 +789,7 @@ impl Default for AppConfig {
stocks: StocksConfig::default(),
order_manager: OrderConfig::default(),
config_path: "".to_string(),
tokens: Default::default(),
}
}
}
@ -822,3 +863,15 @@ pub fn save(config_path: &str, config: &mut AppConfig) {
config.config_path = config_path.into();
std::fs::write(config_path, toml::to_string_pretty(&config).unwrap()).unwrap();
}
pub fn init_tracing(_service_name: &str) {
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE))
.try_init()
.unwrap();
}

View File

@ -7,24 +7,24 @@ edition = "2021"
dummy = ["fake", "rand"]
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
async-trait = { version = "0.1.56" }
chrono = { version = "0.4", features = ["serde"] }
actix = { version = "0", features = [] }
actix-rt = { version = "2", features = [] }
async-trait = { version = "0" }
chrono = { version = "0", features = ["serde"] }
config = { path = "../config" }
fake = { version = "2.4.3", features = ["derive", "chrono", "http", "uuid"], optional = true }
itertools = { version = "0.10.3" }
fake = { version = "2", features = ["derive", "chrono", "http", "uuid"], optional = true }
itertools = { version = "0" }
model = { path = "../model", features = ['db', 'dummy', 'rand'] }
pretty_env_logger = { version = "0.4", features = [] }
rand = { version = "0.8.5", optional = true }
pretty_env_logger = { version = "0", features = [] }
rand = { version = "0", optional = true }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
thiserror = { version = "1.0.31" }
tracing = { version = "0.1.34" }
uuid = { version = "1.2.1", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
sqlx = { version = "0", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0", features = [] }
thiserror = { version = "1" }
tracing = { version = "0" }
uuid = { version = "1", features = ["serde"] }
[dev-dependencies]
fake = { version = "2.5.0" }
fake = { version = "2" }
testx = { path = "../testx" }

View File

@ -5,18 +5,18 @@ edition = "2021"
[dependencies]
account-manager = { path = '../account_manager' }
bytes = { version = "1.1.0" }
bytes = { version = "1" }
channels = { path = '../channels' }
config = { path = "../config" }
dotenv = { version = "0.15", features = [] }
fakeit = { version = "1.1.1", features = [] }
dotenv = { version = "0", features = [] }
fakeit = { version = "1", features = [] }
fs_manager = { path = "../fs_manager", features = [] }
human-panic = { version = "1.0.3" }
human-panic = { version = "1" }
model = { path = "../model", version = "0.1", features = ["db", "dummy"] }
password-hash = { version = "0.4", features = ["alloc"] }
rand = { version = "0.8.5" }
password-hash = { version = "0", features = ["alloc"] }
rand = { version = "0" }
stock-manager = { path = "../stock_manager" }
thiserror = { version = "1.0.31" }
tokio = { version = "1.18.1", features = ["full"] }
tracing = { version = "0.1.34" }
tracing-subscriber = { version = "0.3.11" }
thiserror = { version = "1" }
tokio = { version = "1", features = ["full"] }
tracing = { version = "0" }
tracing-subscriber = { version = "0" }

View File

@ -5,5 +5,5 @@ edition = "2021"
[dependencies]
model = { path = "../model", features = ["db"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
sqlx = { version = "0", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0", features = [] }

View File

@ -8,29 +8,22 @@ name = "email-sender"
path = "./src/main.rs"
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0", features = ["serde"] }
config = { path = "../config" }
dotenv = { version = "0.15.0" }
dotenv = { version = "0" }
handlebars = { version = "*", features = [] }
model = { path = "../model" }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
sendgrid = { version = "0.18.1", features = ["async"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = [] }
tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.37" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "0.8", features = ["serde"] }
sendgrid = { version = "0", features = ["async"] }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1", features = [] }
tarpc = { version = "0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1" }
tokio = { version = "1", features = ['full'] }
tracing = { version = "0" }
uuid = { version = "0", features = ["serde"] }
[dev-dependencies]
fake = { version = "2.5.0" }
fake = { version = "2" }
testx = { path = "../testx" }

View File

@ -1,9 +1,6 @@
use std::sync::Arc;
use config::{SharedAppConfig, UpdateConfig};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
// use crate::db::Database;
@ -55,10 +52,10 @@ impl Context {
pub type SharedContext = Arc<Context>;
#[actix::main]
#[tokio::main]
async fn main() {
dotenv::dotenv().ok();
init_tracing("email-sender");
config::init_tracing("email-sender");
let config = config::default_load(&Opts {});
@ -67,23 +64,3 @@ async fn main() {
let _mqtt_client = mqtt::start(config.clone(), context.clone()).await;
// rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await;
}
pub fn init_tracing(_service_name: &str) {
std::env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");
let tracer = {
use opentelemetry::sdk::export::trace::stdout::new_pipeline;
use opentelemetry::sdk::trace::Config;
new_pipeline()
.with_trace_config(Config::default())
.with_pretty_print(true)
.install_simple()
};
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE))
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()
.unwrap();
}

View File

@ -4,19 +4,13 @@ version = "0.1.0"
edition = "2021"
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
actix-web = { version = "4.0.1" }
bytes = { version = "1.1.0" }
chrono = { version = "0.4", features = ["serde"] }
bytes = { version = "1" }
chrono = { version = "0", features = ["serde"] }
config = { path = "../config" }
fibers_rpc = { version = "0.3.4", features = [] }
model = { path = "../model" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.18.1", features = ["full"] }
tracing = { version = "0.1.34" }
uuid = { version = "1.2.1", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
thiserror = { version = "1" }
tokio = { version = "1", features = ["full"] }
tracing = { version = "0" }
uuid = { version = "1", features = ["serde"] }

View File

@ -4,13 +4,10 @@ version = "0.1.0"
edition = "2021"
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
config = { path = "../config" }
fluent = { version = "0.16.0" }
fluent = { version = "0" }
model = { path = "../model" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
thiserror = { version = "1.0.31" }
tracing = { version = "0.1.34" }
unic-langid = { version = "0.9.0" }
thiserror = { version = "1" }
tracing = { version = "0" }
unic-langid = { version = "0" }

View File

@ -17,17 +17,16 @@ payments = []
default = ['accounts', 'carts', 'emails', 'search', 'stocks', 'orders', 'payments']
[dependencies]
argon2 = { version = "0.4", features = ["parallel", "password-hash"] }
chrono = { version = "0.4", features = ["serde"] }
derive_more = { version = "0.99.17" }
argon2 = { version = "0", features = ["parallel", "password-hash"] }
chrono = { version = "0", features = ["serde"] }
derive_more = { version = "0" }
fake = { version = "2", features = ["derive", "chrono", "http", "uuid", "dummy"], optional = true }
password-hash = { version = "0.4", features = ["alloc"] }
rand = { version = "0.8.5", optional = true }
rand_core = { version = "0.6", features = ["std"] }
serde = { version = "1.0.137" }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"], optional = true }
sqlx-core = { version = "0.6.2", features = [], optional = true }
thiserror = { version = "1.0.31" }
uuid = { version = "1.2.1", features = ["serde"] }
validator = { version = "0.16.0" }
#tracing = { version = "0.1.34" }
password-hash = { version = "0", features = ["alloc"] }
rand = { version = "0", optional = true }
rand_core = { version = "0", features = ["std"] }
serde = { version = "1" }
sqlx = { version = "0", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"], optional = true }
sqlx-core = { version = "0", features = [], optional = true }
thiserror = { version = "1" }
uuid = { version = "1", features = ["serde"] }
validator = { version = "0" }

View File

@ -9,24 +9,20 @@ path = "./src/main.rs"
[dependencies]
channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0", features = ["serde"] }
config = { path = "../config" }
db-utils = { path = "../db-utils" }
model = { path = "../model", features = ["db"] }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.6" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "1.2.1", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
sqlx = { version = "0", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0", features = [] }
tarpc = { version = "0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1" }
tokio = { version = "1", features = ['full'] }
tracing = { version = "0" }
uuid = { version = "1", features = ["serde"] }
[dev-dependencies]
fake = { version = "2.5.0" }
fake = { version = "2" }
testx = { path = "../testx" }

View File

@ -13,6 +13,7 @@ impl UpdateConfig for Opts {}
#[tokio::main]
async fn main() {
let opts = Opts {};
config::init_tracing("orders");
let config = config::default_load(&opts);

View File

@ -5,9 +5,9 @@ edition = "2021"
[dependencies]
config = { path = "../config", default-features = false, features = [] }
futures = { version = "0" }
model = { path = "../model" }
serde = { version = "1.0.149", features = ['derive'] }
uuid = { version = "1.2.2", features = ['v4'] }
futures = { version = "0.3.25" }
tracing = { version = "0.1.37" }
thiserror = { version = "1.0.37" }
serde = { version = "1", features = ['derive'] }
thiserror = { version = "1" }
tracing = { version = "0" }
uuid = { version = "1", features = ['v4'] }

View File

@ -66,7 +66,6 @@ pub struct Item {
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[repr(C)]
pub struct CreatePayment {
pub buyer: Buyer,
pub customer_ip: String,
@ -80,24 +79,38 @@ pub struct CreatePayment {
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[repr(C)]
pub struct OrderCreated {
pub ext_order_id: Option<ExtOrderId>,
pub redirect_uri: Option<String>,
}
#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
pub enum RefundType {
/// Refund entire payment
Full,
/// Refund only part given in enum
Partial(Price),
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Cancel {
pub refund: RefundType,
pub provider_order_id: String,
pub description: String,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Cancelled {}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[repr(C)]
pub struct UpdateStatus {
pub payload: Vec<u8>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[repr(C)]
pub struct StatusUpdated {}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[repr(C)]
pub enum HttpMethod {
Get,
Post,

View File

@ -2,6 +2,12 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aho-corasick"
version = "0.7.20"
@ -38,6 +44,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "base64"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64ct"
version = "1.5.3"
@ -143,6 +155,8 @@ dependencies = [
"serde_json",
"thiserror",
"toml",
"tracing-subscriber",
"tracing-timing",
]
[[package]]
@ -157,6 +171,29 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "crc32fast"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c"
dependencies = [
"cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
@ -191,6 +228,16 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.14"
@ -278,12 +325,28 @@ dependencies = [
"subtle",
]
[[package]]
name = "doc-comment"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "either"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
[[package]]
name = "flate2"
version = "1.0.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841"
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "form_urlencoded"
version = "1.1.0"
@ -382,6 +445,15 @@ dependencies = [
"slab",
]
[[package]]
name = "fxhash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
dependencies = [
"byteorder",
]
[[package]]
name = "generic-array"
version = "0.14.6"
@ -403,6 +475,26 @@ dependencies = [
"wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hdrhistogram"
version = "7.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8"
dependencies = [
"base64",
"byteorder",
"crossbeam-channel",
"flate2",
"nom",
"num-traits",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
@ -457,6 +549,16 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "indexmap"
version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399"
dependencies = [
"autocfg",
"hashbrown",
]
[[package]]
name = "instant"
version = "0.1.12"
@ -521,6 +623,24 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "mach"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa"
dependencies = [
"libc",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
[[package]]
name = "matches"
version = "0.1.9"
@ -542,6 +662,21 @@ dependencies = [
"autocfg",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa"
dependencies = [
"adler",
]
[[package]]
name = "model"
version = "0.1.0"
@ -557,6 +692,26 @@ dependencies = [
"validator",
]
[[package]]
name = "nom"
version = "7.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@ -592,6 +747,12 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.11.2"
@ -712,6 +873,22 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "quanta"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8"
dependencies = [
"crossbeam-utils",
"libc",
"mach",
"once_cell",
"raw-cpuid",
"wasi 0.10.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quote"
version = "1.0.21"
@ -730,6 +907,15 @@ dependencies = [
"getrandom",
]
[[package]]
name = "raw-cpuid"
version = "10.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb"
dependencies = [
"bitflags",
]
[[package]]
name = "rayon"
version = "1.6.0"
@ -773,6 +959,15 @@ dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.28"
@ -865,6 +1060,15 @@ dependencies = [
"serde",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]]
name = "slab"
version = "0.4.7"
@ -926,6 +1130,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.1.45"
@ -991,6 +1204,53 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
name = "tracing-timing"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbe4966d7b6ae25201de6ff9fa822afb0c9e933809187d5b82ad846ec108771b"
dependencies = [
"crossbeam",
"doc-comment",
"fxhash",
"hdrhistogram",
"indexmap",
"quanta",
"slab",
"tracing-core",
"tracing-subscriber",
]
[[package]]
@ -1062,6 +1322,12 @@ dependencies = [
"url",
]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "version_check"
version = "0.9.4"
@ -1154,6 +1420,16 @@ version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f"
[[package]]
name = "web-sys"
version = "0.3.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@ -7,13 +7,13 @@ edition = "2021"
crate-type = ['cdylib']
[dependencies]
bincode = { version = "1" }
#tracing = { version = "0" }
chrono = { version = "0", features = ['alloc', 'wasmbind'] }
common_macros = { version = "0" }
payment_adapter = { path = "../payment_adapter" }
bincode = { version = "1.3.3" }
wapc-codec = { version = "1.0.0" }
wapc-guest = { version = "1.0.0" }
#tracing = { version = "0.1.37" }
chrono = { version = "0.4.23", features = ['alloc', 'wasmbind'] }
serde = { version = "1.0.149", features = ['derive'] }
thiserror = { version = "1.0.37" }
serde_json = { version = "1.0.89" }
common_macros = { version = "0.1.1" }
serde = { version = "1", features = ['derive'] }
serde_json = { version = "1" }
thiserror = { version = "1" }
wapc-codec = { version = "1" }
wapc-guest = { version = "1" }

View File

@ -8,36 +8,31 @@ name = "payment-manager"
path = "./src/main.rs"
[dependencies]
bincode = { version = "1" }
channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0", features = ["serde"] }
config = { path = "../config" }
db-utils = { path = "../db-utils" }
gumdrop = { version = "0.8.1", features = [] }
llvmenv = { version = "0.3.2" }
gumdrop = { version = "0", features = [] }
llvmenv = { version = "0" }
model = { path = "../model", features = ["db"] }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
payment_adapter = { path = "../payment_adapter" }
reqwest = { version = "0", features = ["default", "json", "blocking"] }
rumqttc = { version = "*" }
serde = { version = "1.0.137", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.6" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "1.2.1", features = ["serde", "v4"] }
bincode = { version = "1.3.3" }
wapc = { version = "1.0.0", features = [] }
wapc-codec = { version = "1.0.0", features = [] }
wapc-pool = { version = "1.0.0", features = [] }
wasmtime = { version = "3.0.1", features = ['parallel-compilation', 'async'] }
wasmtime-provider = { version = "1.3.2", features = ['wasmtime-wasi', 'wasi-common', 'wasi', 'cache'] }
reqwest = { version = "0.11.13", features = ["default", "json", "blocking"] }
#pay_u = { path = "../../vendor/pay_u" }
serde = { version = "1", features = ["derive"] }
sqlx = { version = "0", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0", features = [] }
tarpc = { version = "0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1" }
tokio = { version = "1", features = ['full'] }
tracing = { version = "0" }
uuid = { version = "1", features = ["serde", "v4"] }
wapc = { version = "1", features = [] }
wapc-codec = { version = "1", features = [] }
wapc-pool = { version = "1", features = [] }
wasmtime = { version = "3", features = ['parallel-compilation', 'async'] }
wasmtime-provider = { version = "1", features = ['wasmtime-wasi', 'wasi-common', 'wasi', 'cache'] }
[dev-dependencies]
fake = { version = "2.5.0" }
fake = { version = "2" }
testx = { path = "../testx" }

View File

@ -3,14 +3,16 @@ use std::fs::read_dir;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::mpsc::Sender;
use std::sync::{Arc, LockResult, MutexGuard, RwLock, RwLockReadGuard};
use std::sync::{Arc, LockResult, MutexGuard};
use config::{AppConfig, UpdateConfig};
use payment_adapter::{HttpMethod, HttpRequest, Item, Product};
use config::{AppConfig, SharedAppConfig, UpdateConfig};
use payment_adapter::{HttpMethod, HttpRequest};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use wapc::WasiParams;
use wapc_pool::{HostPool, HostPoolBuilder};
use crate::rpc::ModuleSender;
// mod actions;
// mod context;
// mod db;
@ -23,14 +25,23 @@ mod rpc;
pub enum WasmMsg {
CreateOrder {
create_payment: payment_adapter::CreatePayment,
callback: std::sync::mpsc::Sender<ResultMsg>,
tx: Sender<ResultMsg>,
},
UpdateStatus {
update_status: payment_adapter::UpdateStatus,
tx: Sender<ResultMsg>,
},
Cancel {
cancel: payment_adapter::Cancel,
tx: Sender<ResultMsg>,
},
UpdateStatus(Vec<u8>),
}
#[derive(Clone)]
pub enum ResultMsg {
OrderCreated(payment_adapter::OrderCreated),
StatusUpdated(payment_adapter::StatusUpdated),
Cancel(payment_adapter::Cancelled),
}
pub struct HostChannel {
@ -54,41 +65,84 @@ impl HostChannel {
match msg {
WasmMsg::CreateOrder {
create_payment: msg,
callback,
tx,
} => {
let msg = match wapc_codec::messagepack::serialize(msg) {
Ok(r) => r,
_ => continue,
};
let call_result = match pool.call("create_payment", msg).await {
Ok(r) => r,
_ => continue,
};
let result: payment_adapter::OrderCreated =
match wapc_codec::messagepack::deserialize(&call_result) {
Ok(r) => r,
_ => continue,
};
if let Err(e) = callback.send(ResultMsg::OrderCreated(result)) {
tracing::error!("{e:?}");
}
Self::send_and_rec(
"create_payment",
msg,
tx,
&pool,
ResultMsg::OrderCreated,
)
.await
.ok();
}
WasmMsg::UpdateStatus {
update_status: msg,
tx,
} => {
Self::send_and_rec(
"update_status",
msg,
tx,
&pool,
ResultMsg::StatusUpdated,
)
.await
.ok();
}
WasmMsg::Cancel { cancel: msg, tx } => {
Self::send_and_rec("cancel_order", msg, tx, &pool, ResultMsg::Cancel)
.await
.ok();
}
WasmMsg::UpdateStatus(_) => {}
}
}
});
}
async fn send_and_rec<R, T, F>(
name: &str,
msg: R,
tx: Sender<ResultMsg>,
pool: &HostPool,
f: F,
) -> Result<(), ()>
where
R: serde::Serialize,
T: serde::de::DeserializeOwned,
F: FnOnce(T) -> ResultMsg,
{
macro_rules! ok_or_bail {
($run: expr) => {
match $run {
Ok(r) => r,
_ => return Err(()),
}
};
}
let msg = ok_or_bail!(wapc_codec::messagepack::serialize(msg));
let call_result = ok_or_bail!(pool.call(name, msg).await);
let result: T = ok_or_bail!(wapc_codec::messagepack::deserialize(&call_result));
if let Err(e) = tx.send(f(result)) {
tracing::error!("{e:?}");
Err(())
} else {
Ok(())
}
}
}
#[derive(Clone)]
pub struct Modules(Arc<std::sync::Mutex<HashMap<String, std::sync::mpsc::Sender<WasmMsg>>>>);
pub struct Modules(Arc<std::sync::Mutex<HashMap<String, ModuleSender>>>);
impl Modules {
pub fn new(h: HashMap<String, std::sync::mpsc::Sender<WasmMsg>>) -> Self {
pub fn new(h: HashMap<String, ModuleSender>) -> Self {
Self(Arc::new(std::sync::Mutex::new(h)))
}
pub fn lock(&self) -> LockResult<MutexGuard<'_, HashMap<String, Sender<WasmMsg>>>> {
pub fn lock(&self) -> LockResult<MutexGuard<'_, HashMap<String, ModuleSender>>> {
self.0.lock()
}
}
@ -108,139 +162,145 @@ impl UpdateConfig for Opts {
#[tokio::main]
async fn main() {
std::env::set_var("RUST_LOG", "debug");
tracing_subscriber::fmt::init();
config::init_tracing("payments");
let opts: Opts = gumdrop::parse_args_default_or_exit();
let config = config::default_load(&opts);
let mut modules;
let files = scan_adapters_dir(&config);
let mut modules = HashMap::with_capacity(files.len());
{
let adapters_path = config.lock().payment().adapters_path.clone();
let adapters = adapters_path.unwrap_or_else(|| panic!("No payment adapters path provided"));
let dir = read_dir(&adapters).unwrap_or_else(|e| {
panic!(
"Failed to load payment adapters at path {:?}. {}",
adapters, e
)
});
let files = dir
.filter_map(|r| r.map(|r| r.path()).ok())
.filter(|file| file.extension().and_then(|s| s.to_str()) == Some("wasm"))
.collect::<Vec<_>>();
load_adapters(&config, &mut modules, files).await;
if files.is_empty() {
panic!("No payment adapters found in adapters directory");
}
modules = HashMap::with_capacity(files.len());
for file in files {
let module = std::fs::read(&file).unwrap();
let engine = wasmtime_provider::WasmtimeEngineProviderBuilder::new()
.module_bytes(&module)
.wasi_params(WasiParams::default())
.build()
.unwrap();
let pool = HostPoolBuilder::new()
.name("pool")
.factory(move || {
wapc::WapcHost::new(
Box::new(engine.clone()),
Some(Box::new(
move |_a, binding, _namespace, msg_name, payload| {
Ok(host_call(binding, msg_name, payload)?)
},
)),
)
.unwrap()
})
.max_threads(5)
.build();
let name = pool
.call("name", vec![])
.await
.unwrap_or_else(|e| panic!("Failed to load adapter {file:?} `name`. {e}"));
let name: String = wapc_codec::messagepack::deserialize(&name)
.unwrap_or_else(|e| panic!("Adapter `name` ({name:?}) is not valid string. {e}"));
let msg = config
.lock()
.payment()
.providers
.get(&name)
.cloned()
.unwrap_or_default();
tracing::info!("Start init");
pool.call("init", wapc_codec::messagepack::serialize(msg).unwrap())
.await
.unwrap();
let (tx, rx) = std::sync::mpsc::channel();
HostChannel {
host: pool,
channel: rx,
}
.start();
modules.insert(name, tx);
}
// for pool in modules.values() {
// let msg = payment_adapter::CreatePayment {
// buyer: payment_adapter::Buyer {
// email: "hello@example.com".to_string(),
// phone: "530698478".to_string(),
// first_name: "Joe".to_string(),
// last_name: "Doe".to_string(),
// language: "pl".to_string(),
// },
// customer_ip: "12.22.34.54".to_string(),
// currency: "PLN".to_string(),
// description: "Nesciunt fugit libero quis dolorum quo.
// Tempore aut nisi voluptatem. Odio et aspernatur est. Sint vel
// molestias sunt cumque quibusdam reprehenderit est.".to_string(),
// cart_products: vec![Product {
// id: 23,
// name: "Socks".to_string(),
// unit_price: 1542,
// quantity_unit: "Unit".to_string(),
// quantity: 2,
// }],
// items: vec![Item {
// product_id: 23,
// quantity: 2,
// quantity_unit: "Unit".to_string(),
// }],
// order_ext_id: None,
// notify_uri: "https://localhost:3030/notify_uri".to_string(),
// continue_uri: "https://localhost:3030/continue_uri".to_string(),
// };
//
// tracing::info!("Start create_payment");
// let call_result = pool
// .call(
// "create_payment",
// wapc_codec::messagepack::serialize(msg).unwrap(),
// )
// .await
// .unwrap();
// let result: payment_adapter::OrderCreated =
// wapc_codec::messagepack::deserialize(&call_result).unwrap();
//
// tracing::info!("create payment res {:?}", result)
// }
}
// for pool in modules.values() {
// let msg = payment_adapter::CreatePayment {
// buyer: payment_adapter::Buyer {
// email: "hello@example.com".to_string(),
// phone: "530698478".to_string(),
// first_name: "Joe".to_string(),
// last_name: "Doe".to_string(),
// language: "pl".to_string(),
// },
// customer_ip: "12.22.34.54".to_string(),
// currency: "PLN".to_string(),
// description: "Nesciunt fugit libero quis dolorum quo.
// Tempore aut nisi voluptatem. Odio et aspernatur est. Sint vel
// molestias sunt cumque quibusdam reprehenderit est.".to_string(),
// cart_products: vec![Product {
// id: 23,
// name: "Socks".to_string(),
// unit_price: 1542,
// quantity_unit: "Unit".to_string(),
// quantity: 2,
// }],
// items: vec![Item {
// product_id: 23,
// quantity: 2,
// quantity_unit: "Unit".to_string(),
// }],
// order_ext_id: None,
// notify_uri: "https://localhost:3030/notify_uri".to_string(),
// continue_uri: "https://localhost:3030/continue_uri".to_string(),
// };
//
// tracing::info!("Start create_payment");
// let call_result = pool
// .call(
// "create_payment",
// wapc_codec::messagepack::serialize(msg).unwrap(),
// )
// .await
// .unwrap();
// let result: payment_adapter::OrderCreated =
// wapc_codec::messagepack::deserialize(&call_result).unwrap();
//
// tracing::info!("create payment res {:?}", result)
// }
let modules = Modules::new(modules);
let mqtt_client = mqtt::start(config.clone(), modules.clone()).await;
rpc::start(config, mqtt_client, modules.clone()).await;
}
async fn load_adapters(
config: &SharedAppConfig,
modules: &mut HashMap<String, ModuleSender>,
files: Vec<PathBuf>,
) {
for file in files {
let module = std::fs::read(&file).unwrap();
let engine = wasmtime_provider::WasmtimeEngineProviderBuilder::new()
.module_bytes(&module)
.wasi_params(WasiParams::default())
.build()
.unwrap();
let pool = HostPoolBuilder::new()
.name("pool")
.factory(move || {
wapc::WapcHost::new(
Box::new(engine.clone()),
Some(Box::new(
move |_a, binding, _namespace, msg_name, payload| {
Ok(host_call(binding, msg_name, payload)?)
},
)),
)
.unwrap()
})
.max_threads(5)
.build();
let name = pool
.call("name", vec![])
.await
.unwrap_or_else(|e| panic!("Failed to load adapter {file:?} `name`. {e}"));
let name: String = wapc_codec::messagepack::deserialize(&name)
.unwrap_or_else(|e| panic!("Adapter `name` ({name:?}) is not valid string. {e}"));
let msg = config
.lock()
.payment()
.providers
.get(&name)
.cloned()
.unwrap_or_default();
tracing::info!("Start init");
pool.call("init", wapc_codec::messagepack::serialize(msg).unwrap())
.await
.unwrap();
let (tx, rx) = std::sync::mpsc::channel();
HostChannel {
host: pool,
channel: rx,
}
.start();
modules.insert(name, ModuleSender::new(tx));
}
}
fn scan_adapters_dir(config: &SharedAppConfig) -> Vec<PathBuf> {
let adapters_path = config.lock().payment().adapters_path.clone();
let adapters = adapters_path.unwrap_or_else(|| panic!("No payment adapters path provided"));
let dir = read_dir(&adapters).unwrap_or_else(|e| {
panic!(
"Failed to load payment adapters at path {:?}. {}",
adapters, e
)
});
let files = dir
.filter_map(|r| r.map(|r| r.path()).ok())
.filter(|file| file.extension().and_then(|s| s.to_str()) == Some("wasm"))
.collect::<Vec<_>>();
if files.is_empty() {
panic!("No payment adapters found in adapters directory");
}
files
}
// #[tracing::instrument]
fn host_call(binding: &str, name: &str, payload: &[u8]) -> Result<Vec<u8>, payment_adapter::Error> {
match name {

View File

@ -1,13 +1,43 @@
use std::sync::mpsc::RecvError;
use std::sync::mpsc::Sender;
use channels::payments::rpc::Payments;
use channels::payments::{adapters, cancel, notification, start_payment};
use channels::AsyncClient;
use channels::{payments, AsyncClient};
use config::SharedAppConfig;
use payment_adapter::Cancel;
use tarpc::context;
use crate::{Modules, ResultMsg, WasmMsg};
#[derive(Clone)]
pub struct ModuleSender {
tx: Sender<WasmMsg>,
}
impl ModuleSender {
pub fn new(tx: Sender<WasmMsg>) -> Self {
Self { tx }
}
pub fn send<F>(&self, f: F) -> Result<ResultMsg, payments::Error>
where
F: FnOnce(Sender<ResultMsg>) -> WasmMsg,
{
use payments::Error;
let (tx, rx) = std::sync::mpsc::channel();
self.tx.send(f(tx)).map_err(|e| {
tracing::error!("{e}");
Error::SendToWasmFailed
})?;
let res = rx.recv().map_err(|e| {
tracing::error!("{e}");
Error::RecvFromWasmFailed
})?;
Ok(res)
}
}
#[derive(Clone)]
pub struct PaymentsServer {
pub config: SharedAppConfig,
@ -15,6 +45,21 @@ pub struct PaymentsServer {
pub modules: Modules,
}
impl PaymentsServer {
fn adapter_sender(&self, adapter_name: String) -> Result<ModuleSender, payments::Error> {
use channels::payments::Error;
let modules = self.modules.clone();
let lock = modules.lock().map_err(|e| {
tracing::error!("{e}");
Error::InternalServerError
})?;
Ok(lock
.get(&adapter_name)
.ok_or_else(|| Error::UnknownAdapter(adapter_name))?
.clone())
}
}
#[tarpc::server]
impl Payments for PaymentsServer {
async fn start_payment(
@ -28,32 +73,8 @@ impl Payments for PaymentsServer {
create_payment,
} = input;
// let (tx, rx) = std::sync::mpsc::channel();
let modules = self.modules.clone();
let lock = modules.lock().map_err(|e| {
tracing::error!("{e}");
Error::InternalServerError
})?;
let module = lock
.get(&adapter_name)
.ok_or_else(|| Error::UnknownAdapter(adapter_name))?
.clone();
let (tx, rx) = std::sync::mpsc::channel();
module
.send(WasmMsg::CreateOrder {
create_payment,
callback: tx,
})
.map_err(|e| {
tracing::error!("{e}");
Error::PaymentFailed
})?;
let res = rx.recv().map_err(|e| {
tracing::error!("{e}");
Error::MalformedCreatePaymentResult
})?;
let module = self.adapter_sender(adapter_name)?;
let res = module.send(|tx| WasmMsg::CreateOrder { create_payment, tx })?;
match res {
ResultMsg::OrderCreated(res) => Ok(start_payment::Details {
@ -64,7 +85,31 @@ impl Payments for PaymentsServer {
}
async fn cancel(self, _: context::Context, input: cancel::Input) -> cancel::Output {
todo!()
use channels::payments::cancel::RefundType;
use channels::payments::Error;
let cancel::Input {
adapter_name,
refund,
provider_order_id,
description,
} = input;
let module = self.adapter_sender(adapter_name)?;
let res = module.send(|tx| WasmMsg::Cancel {
cancel: Cancel {
refund: match refund {
RefundType::Full => payment_adapter::RefundType::Full,
RefundType::Partial(v) => payment_adapter::RefundType::Partial(v),
},
provider_order_id,
description,
},
tx,
})?;
match res {
ResultMsg::Cancel(_res) => Ok(cancel::Details {}),
_ => Err(Error::MalformedCancelResult),
}
}
async fn notification(
@ -72,11 +117,29 @@ impl Payments for PaymentsServer {
_: context::Context,
input: notification::Input,
) -> notification::Output {
todo!()
use channels::payments::{notification, Error};
let notification::Input { adapter_name, body } = input;
let module = self.adapter_sender(adapter_name)?;
let res = module.send(|tx| WasmMsg::UpdateStatus {
update_status: payment_adapter::UpdateStatus { payload: body },
tx,
})?;
match res {
ResultMsg::StatusUpdated(_res) => Ok(notification::Details {}),
_ => Err(Error::MalformedUpdateStateResult),
}
}
async fn adapters(self, _: context::Context, input: adapters::Input) -> adapters::Output {
todo!()
async fn adapters(self, _: context::Context, _input: adapters::Input) -> adapters::Output {
Ok(adapters::Details {
names: self
.modules
.0
.lock()
.map(|r| r.keys().cloned().collect::<Vec<_>>())
.unwrap_or_default(),
})
}
}

View File

@ -8,31 +8,26 @@ name = "search-manager"
path = "./src/main.rs"
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
actix = { version = "0", features = [] }
actix-rt = { version = "2", features = [] }
channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0", features = ["serde"] }
config = { path = "../config" }
derive_more = { version = "0.99", features = [] }
dotenv = { version = "0.15.0" }
futures = { version = "0.3.25" }
derive_more = { version = "0", features = [] }
dotenv = { version = "0" }
futures = { version = "0" }
model = { path = "../model", features = ["db"] }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
parking_lot = { version = "0.12", features = [] }
pretty_env_logger = { version = "0.4", features = [] }
parking_lot = { version = "0", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
sonic-channel = { version = "1.1.0", features = ["ingest"] }
tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.6" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "1.2.1", features = ["serde"] }
whatlang = { version = "0.16.2" }
serde = { version = "1", features = ["derive"] }
sonic-channel = { version = "1", features = ["ingest"] }
tarpc = { version = "0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1" }
tokio = { version = "1", features = ['full'] }
tracing = { version = "0" }
uuid = { version = "1", features = ["serde"] }
whatlang = { version = "0" }
[dev-dependencies]
fake = { version = "2.5.0" }
fake = { version = "2" }
testx = { path = "../testx" }

View File

@ -4,9 +4,6 @@ use std::env;
use config::UpdateConfig;
pub use context::*;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
pub mod actions;
pub mod context;
@ -35,7 +32,7 @@ impl UpdateConfig for Opts {}
#[actix::main]
async fn main() {
dotenv::dotenv().ok();
init_tracing("search-manager");
config::init_tracing("search-manager");
let opts = Opts {};
@ -47,24 +44,3 @@ async fn main() {
rpc::start(config.clone(), ctx.clone()).await;
}
pub fn init_tracing(_service_name: &str) {
env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");
let tracer = {
use opentelemetry::sdk::export::trace::stdout::new_pipeline;
use opentelemetry::sdk::trace::Config;
new_pipeline()
.with_trace_config(Config::default())
.with_pretty_print(true)
.install_simple()
};
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE))
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()
.unwrap();
}

View File

@ -9,29 +9,24 @@ path = "./src/main.rs"
[dependencies]
channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0", features = ["serde"] }
config = { path = "../config" }
db-utils = { path = "../db-utils" }
derive_more = { version = "0.99", features = [] }
dotenv = { version = "0.15.0" }
futures = { version = "0.3.25" }
derive_more = { version = "0", features = [] }
dotenv = { version = "0" }
futures = { version = "0" }
model = { path = "../model", features = ["db"] }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
tarpc = { version = "0.31.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.6" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "1.2.1", features = ['v4'] }
serde = { version = "1", features = ["derive"] }
sqlx = { version = "0", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0", features = [] }
tarpc = { version = "0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1" }
tokio = { version = "1", features = ['full'] }
tracing = { version = "0" }
uuid = { version = "1", features = ['v4'] }
[dev-dependencies]
fakeit = { version = "1.1.1" }
insta = { version = "1.21.0" }
fakeit = { version = "1" }
insta = { version = "1" }
testx = { path = "../testx" }

View File

@ -14,6 +14,8 @@ impl UpdateConfig for Opts {}
async fn main() {
let opts = Opts {};
config::init_tracing("stocks");
let config = config::default_load(&opts);
let db = db::Database::build(config.clone()).await;

View File

@ -3,31 +3,36 @@ name = "token_manager"
version = "0.1.0"
edition = "2021"
[[bin]]
name = 'token-manager'
path = './src/main.rs'
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
argon2 = { version = "0.4", features = ["parallel", "password-hash"] }
chrono = { version = "0.4", features = ["serde"] }
argon2 = { version = "0", features = ["parallel", "password-hash"] }
channels = { path = "../channels" }
chrono = { version = "0", features = ["serde"] }
config = { path = "../config" }
database_manager = { path = "../database_manager" }
derive_more = { version = "0.99", features = [] }
futures = { version = "0.3", features = [] }
futures-util = { version = "0.3", features = [] }
hmac = { version = "0.12", features = [] }
jwt = { version = "0.16", features = [] }
model = { path = "../model" }
parking_lot = { version = "0.12", features = [] }
password-hash = { version = "0.4", features = ["alloc"] }
pretty_env_logger = { version = "0.4", features = [] }
rand_core = { version = "0.6", features = ["std"] }
db-utils = { path = "../db-utils" }
derive_more = { version = "0", features = [] }
dotenv = { version = "0" }
futures = { version = "0" }
futures-util = { version = "0", features = [] }
hmac = { version = "0", features = [] }
jwt = { version = "0", features = [] }
model = { path = "../model", features = ["db"] }
password-hash = { version = "0", features = ["alloc"] }
rand_core = { version = "0", features = ["std"] }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
sha2 = { version = "0.10", features = [] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.17", features = ["full"] }
tracing = { version = "0.1.34" }
uuid = { version = "1.2.1", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
sha2 = { version = "0", features = [] }
sqlx = { version = "0", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0", features = [] }
tarpc = { version = "0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1" }
tokio = { version = "1", features = ['full'] }
tracing = { version = "0" }
uuid = { version = "1", features = ['v4'] }
[dev-dependencies]
fake = { version = "2.5.0" }
fake = { version = "2" }
testx = { path = "../testx" }

View File

@ -0,0 +1,27 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE "Audience" AS ENUM (
'web',
'mobile',
'feed',
'admin_panel'
);
CREATE TYPE "Role" AS ENUM (
'admin',
'user'
);
CREATE TABLE tokens
(
id integer NOT NULL,
customer_id uuid NOT NULL,
role "Role" NOT NULL,
issuer character varying DEFAULT 'bazzar'::character varying NOT NULL,
subject integer NOT NULL,
audience "Audience" DEFAULT 'web'::"Audience" NOT NULL,
expiration_time timestamp without time zone DEFAULT (now() + '14 days'::interval) NOT NULL,
not_before_time timestamp without time zone DEFAULT (now() - '00:01:00'::interval) NOT NULL,
issued_at_time timestamp without time zone DEFAULT now() NOT NULL,
jwt_id uuid DEFAULT gen_random_uuid() NOT NULL
);

View File

View File

@ -0,0 +1,28 @@
use config::SharedAppConfig;
use sqlx_core::pool::Pool;
use sqlx_core::postgres::Postgres;
#[derive(Clone)]
pub struct Database {
pub pool: sqlx::PgPool,
_config: SharedAppConfig,
}
impl Database {
pub async fn build(config: SharedAppConfig) -> Self {
let url = config.lock().tokens().database_url.clone();
let pool = sqlx::PgPool::connect(&url).await.unwrap_or_else(|e| {
eprintln!("Failed to connect to database. {e:?}");
tracing::error!("Failed to connect to database. {e:?}");
std::process::exit(1);
});
Self {
pool,
_config: config,
}
}
pub fn pool(&self) -> Pool<Postgres> {
self.pool.clone()
}
}

View File

@ -1,641 +0,0 @@
//! Tokens management system.
//! It's responsible for creating and validating all tokens.
//!
//! Application flow goes like this:
//!
//! ```ascii
//! Client API TokenManager Database
//!
//! │ │ │ │
//! │ │ │ │
//! │ │ │ ┌───────────────►│
//! ├────────────────►├──────────────────►├──────►│ │
//! │ Sign In │ CreatePair │ └───────────────►│
//! │ │ │ Create │
//! │ │ │ * AccessToken │
//! │ │ │ * RefreshToken │
//! │ │ │ │
//! │ │ │ │
//!
//! │ │ │ │
//! ├────────────────►├──────────────────►├───────────────────────►│
//! │ Validate token │ ValidateToken │ Load token │
//! │ │ (string) │◄───────────────────────┤
//! │ │ │ │
//! │ │ Is Valid? │
//! │ │ │ │
//! │ │◄──────────────── YES │
//! │ │ AccessToken │ │
//! │ │ │ │
//! │ │ │ │
//! │ │◄──────────────── NO │
//! │ │ Error │ │
//!
//! │ │ │ │
//! │ │ │ │
//! │ │ │ ┌───────────────►│
//! ├────────────────►├──────────────────►├──────►│ │
//! │ Refresh token │ CreatePair │ └───────────────►│
//! │ │ │ Create │
//! │ │◄──────────────────┤ * AccessToken │
//! │ │ Access Token │ * RefreshToken │
//! │ │ Refresh Token │ │
//! │ │ │ │
//! ```
//!
//! If you need to operate on tokens from API or any other actor you should
//! always use this actor and never touch database directly.
//!
//! # Examples
//!
//! ```
//! use actix::{Actor, Addr};
//! use config::SharedAppConfig;
//! use database_manager::Database;
//! use token_manager::*;
//! use model::*;
//!
//! async fn tokens(db: Addr<Database>, config: SharedAppConfig) {
//! let manager = TokenManager::new(config, db);
//!
//! let manager_addr = manager.start();
//!
//! let AuthPair { access_token, access_token_string, refresh_token_string, .. } = manager_addr.send(CreatePair {
//! customer_id: uuid::Uuid::new_v4(),
//! account_id: AccountId::from(0),
//! role: Role::Admin
//! }).await.unwrap().unwrap();
//!
//! manager_addr.send(Validate { token: access_token_string }).await.unwrap().unwrap();
//! }
//! ```
use std::collections::BTreeMap;
use std::str::FromStr;
use actix::{Addr, Message};
use chrono::prelude::*;
use config::SharedAppConfig;
use database_manager::{query_db, Database};
use hmac::digest::KeyInit;
use hmac::Hmac;
use model::{AccessTokenString, AccountId, Audience, Role, Token};
use sha2::Sha256;
#[macro_export]
macro_rules! token_async_handler {
($msg: ty, $async: ident, $res: ty) => {
impl actix::Handler<$msg> for TokenManager {
type Result = actix::ResponseActFuture<Self, Result<$res>>;
fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result {
use actix::WrapFuture;
let db = self.db.clone();
let config = self.config.clone();
Box::pin(async { $async(msg, db, config).await }.into_actor(self))
}
}
};
}
#[macro_export]
macro_rules! query_tm {
($tm: expr, $msg: expr, default $fail: expr) => {
match $tm.send($msg).await {
Ok(Ok(r)) => r,
Ok(Err(e)) => {
tracing::error!("{e}");
$fail
}
Err(e) => {
tracing::error!("{e:?}");
$fail
}
}
};
(multi, $tm: expr, $fail: expr, $($msg: expr),*) => {{
use futures_util::TryFutureExt;
tokio::join!(
$(
$tm.send($msg).map_ok_or_else(
|e| {
tracing::error!("{e:?}");
Err($fail)
},
|res| match res {
Ok(rec) => Ok(rec),
Err(e) => {
tracing::error!("{e}");
Err($fail)
}
},
)
),*
)
}};
($tm: expr, $msg: expr, $fail: expr) => {
$crate::query_tm!($tm, $msg, $fail, $fail)
};
($tm: expr, $msg: expr, $db_fail: expr, $act_fail: expr) => {
match $tm.send($msg).await {
Ok(Ok(r)) => r,
Ok(Err(e)) => {
tracing::error!("{e}");
return Err($db_fail);
}
Err(e) => {
tracing::error!("{e:?}");
return Err($act_fail);
}
}
};
}
/*struct Jwt {
/// cti (customer id): Customer uuid identifier used by payment service
pub cti: uuid::Uuid,
/// arl (account role): account role
pub arl: Role,
/// iss (issuer): Issuer of the JWT
pub iss: String,
/// sub (subject): Subject of the JWT (the user)
pub sub: i32,
/// aud (audience): Recipient for which the JWT is intended
pub aud: Audience,
/// exp (expiration time): Time after which the JWT expires
pub exp: chrono::NaiveDateTime,
/// nbt (not before time): Time before which the JWT must not be accepted
/// for processing
pub nbt: chrono::NaiveDateTime,
/// iat (issued at time): Time at which the JWT was issued; can be used to
/// determine age of the JWT,
pub iat: chrono::NaiveDateTime,
/// jti (JWT ID): Unique identifier; can be used to prevent the JWT from
/// being replayed (allows a token to be used only once)
pub jti: uuid::Uuid,
}*/
#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)]
#[serde(rename_all = "kebab-case", tag = "token")]
pub enum Error {
#[error("Unable to save new token")]
Save,
#[error("Unable to save new token. Can't connect to database")]
SaveInternal,
#[error("Unable to validate token")]
Validate,
#[error("Unable to validate token. Can't connect to database")]
ValidateInternal,
#[error("Token does not exists or some fields are incorrect")]
Invalid,
}
pub type Result<T> = std::result::Result<T, Error>;
pub struct TokenManager {
db: Addr<Database>,
config: SharedAppConfig,
}
impl actix::Actor for TokenManager {
type Context = actix::Context<Self>;
}
impl TokenManager {
pub fn new(config: SharedAppConfig, db: Addr<Database>) -> Self {
Self { db, config }
}
}
/// Creates single token, it's mostly used by [CreatePair]
///
/// # Examples
///
/// ```
/// use actix::Addr;
/// use model::{AccountId, Role};
/// use token_manager::*;
/// async fn create_pair(token_manager: Addr<token_manager::TokenManager>) {
/// match token_manager.send(CreateToken { customer_id: uuid::Uuid::new_v4(), role: Role::Admin, subject: AccountId::from(1), audience: None, exp: None }).await {
/// Ok(Ok(pair)) => {}
/// Ok(Err(manager_error)) => {}
/// Err(actor_error) => {}
/// }
/// }
/// ```
#[derive(Message)]
#[rtype(result = "Result<(Token, AccessTokenString)>")]
pub struct CreateToken {
pub customer_id: uuid::Uuid,
pub role: Role,
pub subject: AccountId,
pub audience: Option<Audience>,
pub exp: Option<NaiveDateTime>,
}
token_async_handler!(CreateToken, create_token, (Token, AccessTokenString));
pub(crate) async fn create_token(
msg: CreateToken,
db: Addr<Database>,
config: SharedAppConfig,
) -> Result<(Token, AccessTokenString)> {
let CreateToken {
customer_id,
role,
subject,
audience,
exp,
} = msg;
let audience = audience.unwrap_or_default();
let token: Token = match exp {
None => query_db!(
db,
database_manager::CreateToken {
customer_id,
role,
subject,
audience,
},
Error::Save,
Error::SaveInternal
),
Some(exp) => query_db!(
db,
database_manager::CreateExtendedToken {
customer_id,
role,
subject,
audience,
expiration_time: exp
},
Error::Save,
Error::SaveInternal
),
};
let token_string = {
use jwt::SignWithKey;
let secret = config.lock().web().jwt_secret();
let key: Hmac<Sha256> = build_key(secret)?;
let mut claims = BTreeMap::new();
// cti (customer id): Customer uuid identifier used by payment service
claims.insert("cti", format!("{}", token.customer_id));
// arl (account role): account role
claims.insert("arl", String::from(token.role.as_str()));
// iss (issuer): Issuer of the JWT
claims.insert("iss", token.issuer.to_string());
// sub (subject): Subject of the JWT (the user)
claims.insert("sub", format!("{}", token.subject));
// aud (audience): Recipient for which the JWT is intended
claims.insert("aud", String::from(token.audience.as_str()));
// exp (expiration time): Time after which the JWT expires
claims.insert(
"exp",
format!(
"{}",
Utc.from_utc_datetime(&token.expiration_time).format("%+")
),
);
// nbt (not before time): Time before which the JWT must not be accepted
// for processing
claims.insert(
"nbt",
format!(
"{}",
Utc.from_utc_datetime(&token.not_before_time).format("%+")
),
);
// iat (issued at time): Time at which the JWT was issued; can be used
// to determine age of the JWT,
claims.insert(
"iat",
format!(
"{}",
Utc.from_utc_datetime(&token.issued_at_time).format("%+")
),
);
// jti (JWT ID): Unique identifier; can be used to prevent the JWT from
// being replayed (allows a token to be used only once)
claims.insert("jti", format!("{}", token.jwt_id));
let s = match claims.sign_with_key(&key) {
Ok(s) => s,
Err(e) => {
tracing::error!("{e:?}");
return Err(Error::SaveInternal);
}
};
AccessTokenString::new(s)
};
Ok((token, token_string))
}
pub struct AuthPair {
pub access_token: Token,
pub access_token_string: AccessTokenString,
pub _refresh_token: Token,
pub refresh_token_string: model::RefreshTokenString,
}
/// Creates access token and refresh token
///
/// # Examples
///
/// ```
/// use actix::Addr;
/// use model::{AccountId, Role};
/// use token_manager::CreatePair;
/// async fn create_pair(token_manager: Addr<token_manager::TokenManager>) {
/// match token_manager.send(CreatePair { customer_id: uuid::Uuid::new_v4(), account_id: AccountId::from(0), role: Role::Admin }).await {
/// Ok(Ok(pair)) => {}
/// Ok(Err(manager_error)) => {}
/// Err(actor_error) => {}
/// }
/// }
/// ```
#[derive(Message)]
#[rtype(result = "Result<AuthPair>")]
pub struct CreatePair {
pub customer_id: uuid::Uuid,
pub role: Role,
pub account_id: AccountId,
}
token_async_handler!(CreatePair, create_pair, AuthPair);
pub(crate) async fn create_pair(
msg: CreatePair,
db: Addr<Database>,
config: SharedAppConfig,
) -> Result<AuthPair> {
let (access_token, refresh_token) = tokio::join!(
create_token(
CreateToken {
customer_id: msg.customer_id,
role: msg.role,
subject: msg.account_id,
audience: Some(model::Audience::Web),
exp: None
},
db.clone(),
config.clone()
),
create_token(
CreateToken {
customer_id: msg.customer_id,
role: msg.role,
subject: msg.account_id,
audience: Some(model::Audience::Web),
exp: Some((chrono::Utc::now() + chrono::Duration::days(31)).naive_utc())
},
db.clone(),
config.clone()
)
);
let (access_token, access_token_string): (Token, AccessTokenString) = access_token?;
let (refresh_token, refresh_token_string): (Token, AccessTokenString) = refresh_token?;
Ok(AuthPair {
access_token,
access_token_string,
_refresh_token: refresh_token,
refresh_token_string: refresh_token_string.into(),
})
}
/// Checks if token is still valid
///
/// # Examples
///
/// ```
/// use actix::Addr;
/// use model::{AccessTokenString, AccountId, Role};
/// use token_manager::{CreatePair, Validate};
/// async fn create_pair(token_manager: Addr<token_manager::TokenManager>, token: AccessTokenString) {
/// match token_manager.send(Validate { token }).await {
/// Ok(Ok(pair)) => {}
/// Ok(Err(manager_error)) => {}
/// Err(actor_error) => {}
/// }
/// }
/// ```
#[derive(Message)]
#[rtype(result = "Result<Token>")]
pub struct Validate {
pub token: AccessTokenString,
}
token_async_handler!(Validate, validate, Token);
pub(crate) async fn validate(
msg: Validate,
db: Addr<Database>,
config: SharedAppConfig,
) -> Result<Token> {
use jwt::VerifyWithKey;
tracing::info!("Validating token {:?}", msg.token);
let secret = config.lock().web().jwt_secret();
let key: Hmac<Sha256> = build_key(secret)?;
let claims: BTreeMap<String, String> = match msg.token.verify_with_key(&key) {
Ok(claims) => claims,
_ => return Err(Error::Validate),
};
let jti = match claims.get("jti") {
Some(jti) => jti,
_ => return Err(Error::Validate),
};
let token: Token = query_db!(
db,
database_manager::TokenByJti {
jti: match uuid::Uuid::from_str(jti) {
Ok(uid) => uid,
_ => return Err(Error::Validate),
},
},
Error::Validate,
Error::ValidateInternal
);
if token.expiration_time < Utc::now().naive_utc() {
return Err(Error::Validate);
}
validate_pair(&claims, "cti", token.customer_id, validate_uuid)?;
validate_pair(&claims, "arl", token.role, eq)?;
validate_pair(&claims, "iss", &token.issuer, eq)?;
validate_pair(&claims, "sub", token.subject, validate_num)?;
validate_pair(&claims, "aud", token.audience, eq)?;
validate_pair(&claims, "exp", &token.expiration_time, validate_time)?;
validate_pair(&claims, "nbt", &token.not_before_time, validate_time)?;
validate_pair(&claims, "iat", &token.issued_at_time, validate_time)?;
tracing::info!("JWT token valid");
Ok(token)
}
fn build_key(secret: String) -> Result<Hmac<Sha256>> {
match Hmac::new_from_slice(secret.as_bytes()) {
Ok(key) => Ok(key),
Err(e) => {
tracing::error!("{e:?}");
dbg!(e);
Err(Error::ValidateInternal)
}
}
}
#[inline(always)]
fn validate_pair<F, V>(
claims: &BTreeMap<String, String>,
key: &str,
v: V,
cmp: F,
) -> std::result::Result<(), Error>
where
F: for<'s> FnOnce(V, &'s str) -> bool,
V: PartialEq,
{
claims
.get(key)
.map(|s| cmp(v, s.as_str()))
.unwrap_or_default()
.then_some(())
.ok_or(Error::Invalid)
}
#[inline(always)]
fn eq<V>(value: V, text: &str) -> bool
where
V: for<'s> PartialEq<&'s str>,
{
value == text
}
#[inline(always)]
fn validate_time(left: &NaiveDateTime, right: &str) -> bool {
chrono::DateTime::parse_from_str(right, "%+")
.map(|t| t.naive_utc() == *left)
.unwrap_or_default()
}
#[inline(always)]
fn validate_num(left: i32, right: &str) -> bool {
right.parse::<i32>().map(|n| left == n).unwrap_or_default()
}
#[inline(always)]
fn validate_uuid(left: uuid::Uuid, right: &str) -> bool {
uuid::Uuid::from_str(right)
.map(|u| u == left)
.unwrap_or_default()
}
#[cfg(test)]
mod tests {
use actix::Actor;
use config::UpdateConfig;
use database_manager::Database;
use model::*;
use super::*;
pub struct NoOpts;
impl UpdateConfig for NoOpts {}
#[actix::test]
async fn create_token() {
testx::db!(config, db);
let db = db.start();
let (token, _text) = super::create_token(
CreateToken {
customer_id: Default::default(),
role: Role::Admin,
subject: AccountId::from(1),
audience: None,
exp: None,
},
db.clone(),
config,
)
.await
.unwrap();
db.send(database_manager::DeleteToken { token_id: token.id })
.await
.ok();
}
#[actix::test]
async fn create_pair() {
testx::db!(config, db);
let db = db.start();
let AuthPair {
access_token,
access_token_string: _,
refresh_token_string: _,
_refresh_token,
} = super::create_pair(
CreatePair {
customer_id: Default::default(),
role: Role::Admin,
account_id: AccountId::from(0),
},
db.clone(),
config,
)
.await
.unwrap();
db.send(database_manager::DeleteToken {
token_id: access_token.id,
})
.await
.ok();
db.send(database_manager::DeleteToken {
token_id: _refresh_token.id,
})
.await
.ok();
}
#[actix::test]
async fn validate() {
testx::db!(config, db);
let db = db.start();
let (token, text) = super::create_token(
CreateToken {
customer_id: Default::default(),
role: Role::Admin,
subject: AccountId::from(1),
audience: None,
exp: None,
},
db.clone(),
config.clone(),
)
.await
.unwrap();
super::validate(Validate { token: text }, db.clone(), config.clone())
.await
.unwrap();
db.send(database_manager::DeleteToken { token_id: token.id })
.await
.ok();
}
}

View File

@ -0,0 +1,570 @@
//! Tokens management system.
//! It's responsible for creating and validating all tokens.
//!
//! Application flow goes like this:
//!
//! ```ascii
//! Client API TokenManager Database
//!
//! │ │ │ │
//! │ │ │ │
//! │ │ │ ┌───────────────►│
//! ├────────────────►├──────────────────►├──────►│ │
//! │ Sign In │ CreatePair │ └───────────────►│
//! │ │ │ Create │
//! │ │ │ * AccessToken │
//! │ │ │ * RefreshToken │
//! │ │ │ │
//! │ │ │ │
//!
//! │ │ │ │
//! ├────────────────►├──────────────────►├───────────────────────►│
//! │ Validate token │ ValidateToken │ Load token │
//! │ │ (string) │◄───────────────────────┤
//! │ │ │ │
//! │ │ Is Valid? │
//! │ │ │ │
//! │ │◄──────────────── YES │
//! │ │ AccessToken │ │
//! │ │ │ │
//! │ │ │ │
//! │ │◄──────────────── NO │
//! │ │ Error │ │
//!
//! │ │ │ │
//! │ │ │ │
//! │ │ │ ┌───────────────►│
//! ├────────────────►├──────────────────►├──────►│ │
//! │ Refresh token │ CreatePair │ └───────────────►│
//! │ │ │ Create │
//! │ │◄──────────────────┤ * AccessToken │
//! │ │ Access Token │ * RefreshToken │
//! │ │ Refresh Token │ │
//! │ │ │ │
//! ```
//!
//! If you need to operate on tokens from API or any other actor you should
//! always use this actor and never touch database directly.
//!
//! # Examples
//!
//! ```
//! use actix::{Actor, Addr};
//! use config::SharedAppConfig;
//! use database_manager::Database;
//! use token_manager::*;
//! use model::*;
//!
//! async fn tokens(db: Addr<Database>, config: SharedAppConfig) {
//! let manager = TokenManager::new(config, db);
//!
//! let manager_addr = manager.start();
//!
//! let AuthPair { access_token, access_token_string, refresh_token_string, .. } = manager_addr.send(CreatePair {
//! customer_id: uuid::Uuid::new_v4(),
//! account_id: AccountId::from(0),
//! role: Role::Admin
//! }).await.unwrap().unwrap();
//!
//! manager_addr.send(Validate { token: access_token_string }).await.unwrap().unwrap();
//! }
//! ```
mod db;
use std::collections::BTreeMap;
use std::str::FromStr;
use channels::payments::CreatePayment;
use chrono::prelude::*;
use config::SharedAppConfig;
use hmac::digest::KeyInit;
use hmac::Hmac;
use model::{AccessTokenString, AccountId, Audience, Role, Token};
use sha2::Sha256;
use crate::db::Database;
/*struct Jwt {
/// cti (customer id): Customer uuid identifier used by payment service
pub cti: uuid::Uuid,
/// arl (account role): account role
pub arl: Role,
/// iss (issuer): Issuer of the JWT
pub iss: String,
/// sub (subject): Subject of the JWT (the user)
pub sub: i32,
/// aud (audience): Recipient for which the JWT is intended
pub aud: Audience,
/// exp (expiration time): Time after which the JWT expires
pub exp: chrono::NaiveDateTime,
/// nbt (not before time): Time before which the JWT must not be accepted
/// for processing
pub nbt: chrono::NaiveDateTime,
/// iat (issued at time): Time at which the JWT was issued; can be used to
/// determine age of the JWT,
pub iat: chrono::NaiveDateTime,
/// jti (JWT ID): Unique identifier; can be used to prevent the JWT from
/// being replayed (allows a token to be used only once)
pub jti: uuid::Uuid,
}*/
#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)]
#[serde(rename_all = "kebab-case", tag = "token")]
pub enum Error {
#[error("Unable to save new token")]
Save,
#[error("Unable to save new token. Can't connect to database")]
SaveInternal,
#[error("Unable to validate token")]
Validate,
#[error("Unable to validate token. Can't connect to database")]
ValidateInternal,
#[error("Token does not exists or some fields are incorrect")]
Invalid,
}
pub type Result<T> = std::result::Result<T, Error>;
pub struct TokenManager {
db: Database,
config: SharedAppConfig,
}
impl TokenManager {
pub fn new(config: SharedAppConfig, db: Database) -> Self {
Self { db, config }
}
}
/// Creates single token, it's mostly used by [CreatePair]
///
/// # Examples
///
/// ```
/// use actix::Addr;
/// use model::{AccountId, Role};
/// use token_manager::*;
/// async fn create_pair(token_manager: Addr<token_manager::TokenManager>) {
/// match token_manager.send(CreateToken { customer_id: uuid::Uuid::new_v4(), role: Role::Admin, subject: AccountId::from(1), audience: None, exp: None }).await {
/// Ok(Ok(pair)) => {}
/// Ok(Err(manager_error)) => {}
/// Err(actor_error) => {}
/// }
/// }
/// ```
#[derive(Debug, Clone)]
pub struct CreateToken {
pub customer_id: uuid::Uuid,
pub role: Role,
pub subject: AccountId,
pub audience: Option<Audience>,
pub exp: Option<NaiveDateTime>,
}
impl CreateToken {
pub async fn run(
self,
db: Addr<Database>,
config: SharedAppConfig,
) -> Result<(Token, AccessTokenString)> {
let CreateToken {
customer_id,
role,
subject,
audience,
exp,
} = self;
let audience = audience.unwrap_or_default();
let token: Token = match exp {
None => query_db!(
db,
database_manager::CreateToken {
customer_id,
role,
subject,
audience,
},
Error::Save,
Error::SaveInternal
),
Some(exp) => query_db!(
db,
database_manager::CreateExtendedToken {
customer_id,
role,
subject,
audience,
expiration_time: exp
},
Error::Save,
Error::SaveInternal
),
};
let token_string = {
use jwt::SignWithKey;
let secret = config.lock().web().jwt_secret();
let key: Hmac<Sha256> = build_key(secret)?;
let mut claims = BTreeMap::new();
// cti (customer id): Customer uuid identifier used by payment service
claims.insert("cti", format!("{}", token.customer_id));
// arl (account role): account role
claims.insert("arl", String::from(token.role.as_str()));
// iss (issuer): Issuer of the JWT
claims.insert("iss", token.issuer.to_string());
// sub (subject): Subject of the JWT (the user)
claims.insert("sub", format!("{}", token.subject));
// aud (audience): Recipient for which the JWT is intended
claims.insert("aud", String::from(token.audience.as_str()));
// exp (expiration time): Time after which the JWT expires
claims.insert(
"exp",
format!(
"{}",
Utc.from_utc_datetime(&token.expiration_time).format("%+")
),
);
// nbt (not before time): Time before which the JWT must not be accepted
// for processing
claims.insert(
"nbt",
format!(
"{}",
Utc.from_utc_datetime(&token.not_before_time).format("%+")
),
);
// iat (issued at time): Time at which the JWT was issued; can be used
// to determine age of the JWT,
claims.insert(
"iat",
format!(
"{}",
Utc.from_utc_datetime(&token.issued_at_time).format("%+")
),
);
// jti (JWT ID): Unique identifier; can be used to prevent the JWT from
// being replayed (allows a token to be used only once)
claims.insert("jti", format!("{}", token.jwt_id));
let s = match claims.sign_with_key(&key) {
Ok(s) => s,
Err(e) => {
tracing::error!("{e:?}");
return Err(Error::SaveInternal);
}
};
AccessTokenString::new(s)
};
Ok((token, token_string))
}
}
#[derive(Debug, Clone)]
pub struct AuthPair {
pub access_token: Token,
pub access_token_string: AccessTokenString,
pub _refresh_token: Token,
pub refresh_token_string: model::RefreshTokenString,
}
/// Creates access token and refresh token
///
/// # Examples
///
/// ```
/// use actix::Addr;
/// use model::{AccountId, Role};
/// use token_manager::CreatePair;
/// async fn create_pair(token_manager: Addr<token_manager::TokenManager>) {
/// match token_manager.send(CreatePair { customer_id: uuid::Uuid::new_v4(), account_id: AccountId::from(0), role: Role::Admin }).await {
/// Ok(Ok(pair)) => {}
/// Ok(Err(manager_error)) => {}
/// Err(actor_error) => {}
/// }
/// }
/// ```
#[derive(Debug, Clone)]
pub struct CreatePair {
pub customer_id: uuid::Uuid,
pub role: Role,
pub account_id: AccountId,
}
impl CreatePair {
pub async fn run(self, db: Database, config: SharedAppConfig) -> Result<AuthPair> {
let (access_token, refresh_token) = tokio::join!(
create_token(
CreateToken {
customer_id: self.customer_id,
role: self.role,
subject: self.account_id,
audience: Some(model::Audience::Web),
exp: None
},
db.clone(),
config.clone()
),
create_token(
CreateToken {
customer_id: self.customer_id,
role: self.role,
subject: self.account_id,
audience: Some(model::Audience::Web),
exp: Some((chrono::Utc::now() + chrono::Duration::days(31)).naive_utc())
},
db.clone(),
config.clone()
)
);
let (access_token, access_token_string): (Token, AccessTokenString) = access_token?;
let (refresh_token, refresh_token_string): (Token, AccessTokenString) = refresh_token?;
Ok(AuthPair {
access_token,
access_token_string,
_refresh_token: refresh_token,
refresh_token_string: refresh_token_string.into(),
})
}
}
/// Checks if token is still valid
///
/// # Examples
///
/// ```
/// use actix::Addr;
/// use model::{AccessTokenString, AccountId, Role};
/// use token_manager::{CreatePair, Validate};
/// async fn create_pair(token_manager: Addr<token_manager::TokenManager>, token: AccessTokenString) {
/// match token_manager.send(Validate { token }).await {
/// Ok(Ok(pair)) => {}
/// Ok(Err(manager_error)) => {}
/// Err(actor_error) => {}
/// }
/// }
/// ```
#[derive(Debug, Clone)]
pub struct Validate {
pub token: AccessTokenString,
}
impl Validate {
pub async fn run(self, db: Database, config: SharedAppConfig) -> Result<Token> {
use jwt::VerifyWithKey;
tracing::info!("Validating token {:?}", self.token);
let secret = config.lock().web().jwt_secret();
let key: Hmac<Sha256> = build_key(secret)?;
let claims: BTreeMap<String, String> = match self.token.verify_with_key(&key) {
Ok(claims) => claims,
_ => return Err(Error::Validate),
};
let jti = match claims.get("jti") {
Some(jti) => jti,
_ => return Err(Error::Validate),
};
let token: Token = query_db!(
db,
database_manager::TokenByJti {
jti: match uuid::Uuid::from_str(jti) {
Ok(uid) => uid,
_ => return Err(Error::Validate),
},
},
Error::Validate,
Error::ValidateInternal
);
if token.expiration_time < Utc::now().naive_utc() {
return Err(Error::Validate);
}
validate_pair(&claims, "cti", token.customer_id, validate_uuid)?;
validate_pair(&claims, "arl", token.role, eq)?;
validate_pair(&claims, "iss", &token.issuer, eq)?;
validate_pair(&claims, "sub", token.subject, validate_num)?;
validate_pair(&claims, "aud", token.audience, eq)?;
validate_pair(&claims, "exp", &token.expiration_time, validate_time)?;
validate_pair(&claims, "nbt", &token.not_before_time, validate_time)?;
validate_pair(&claims, "iat", &token.issued_at_time, validate_time)?;
tracing::info!("JWT token valid");
Ok(token)
}
}
pub struct Refresh {}
impl Refresh {
pub async fn run(self, db: Database, config: SharedAppConfig) -> Result<AuthPair> {
todo!()
}
}
fn build_key(secret: String) -> Result<Hmac<Sha256>> {
match Hmac::new_from_slice(secret.as_bytes()) {
Ok(key) => Ok(key),
Err(e) => {
tracing::error!("{e:?}");
dbg!(e);
Err(Error::ValidateInternal)
}
}
}
#[inline(always)]
fn validate_pair<F, V>(
claims: &BTreeMap<String, String>,
key: &str,
v: V,
cmp: F,
) -> std::result::Result<(), Error>
where
F: for<'s> FnOnce(V, &'s str) -> bool,
V: PartialEq,
{
claims
.get(key)
.map(|s| cmp(v, s.as_str()))
.unwrap_or_default()
.then_some(())
.ok_or(Error::Invalid)
}
#[inline(always)]
fn eq<V>(value: V, text: &str) -> bool
where
V: for<'s> PartialEq<&'s str>,
{
value == text
}
#[inline(always)]
fn validate_time(left: &NaiveDateTime, right: &str) -> bool {
chrono::DateTime::parse_from_str(right, "%+")
.map(|t| t.naive_utc() == *left)
.unwrap_or_default()
}
#[inline(always)]
fn validate_num(left: i32, right: &str) -> bool {
right.parse::<i32>().map(|n| left == n).unwrap_or_default()
}
#[inline(always)]
fn validate_uuid(left: uuid::Uuid, right: &str) -> bool {
uuid::Uuid::from_str(right)
.map(|u| u == left)
.unwrap_or_default()
}
// #[cfg(test)]
// mod tests {
// use actix::Actor;
// use config::UpdateConfig;
// use database_manager::Database;
// use model::*;
//
// use super::*;
//
// pub struct NoOpts;
//
// impl UpdateConfig for NoOpts {}
//
// #[actix::test]
// async fn create_token() {
// testx::db!(config, db);
// let db = db.start();
//
// let (token, _text) = super::create_token(
// CreateToken {
// customer_id: Default::default(),
// role: Role::Admin,
// subject: AccountId::from(1),
// audience: None,
// exp: None,
// },
// db.clone(),
// config,
// )
// .await
// .unwrap();
//
// db.send(database_manager::DeleteToken { token_id: token.id })
// .await
// .ok();
// }
//
// #[actix::test]
// async fn create_pair() {
// testx::db!(config, db);
// let db = db.start();
//
// let AuthPair {
// access_token,
// access_token_string: _,
// refresh_token_string: _,
// _refresh_token,
// } = super::create_pair(
// CreatePair {
// customer_id: Default::default(),
// role: Role::Admin,
// account_id: AccountId::from(0),
// },
// db.clone(),
// config,
// )
// .await
// .unwrap();
//
// db.send(database_manager::DeleteToken {
// token_id: access_token.id,
// })
// .await
// .ok();
//
// db.send(database_manager::DeleteToken {
// token_id: _refresh_token.id,
// })
// .await
// .ok();
// }
//
// #[actix::test]
// async fn validate() {
// testx::db!(config, db);
// let db = db.start();
//
// let (token, text) = super::create_token(
// CreateToken {
// customer_id: Default::default(),
// role: Role::Admin,
// subject: AccountId::from(1),
// audience: None,
// exp: None,
// },
// db.clone(),
// config.clone(),
// )
// .await
// .unwrap();
//
// super::validate(Validate { token: text }, db.clone(), config.clone())
// .await
// .unwrap();
//
// db.send(database_manager::DeleteToken { token_id: token.id })
// .await
// .ok();
// }
// }
#[tokio::main]
async fn main() {}

View File

@ -10,14 +10,14 @@ crate-type = ["cdylib"]
chrono = { version = "*", features = ["wasm-bindgen", "wasmbind"] }
gloo-timers = { version = "*", features = ["futures"] }
indexmap = { version = "1", default-features = false, features = ["serde-1", "std"] }
js-sys = { version = "0.3.57", features = [] }
js-sys = { version = "0", features = [] }
model = { path = "../model", features = ["dummy"] }
rusty-money = { version = "0.4.1", features = ["iso"] }
seed = { version = "0.9.1", features = [] }
serde = { version = "1.0.137", features = ["derive"] }
serde-wasm-bindgen = { version = "0.4.2" }
serde_json = { version = "1.0.81" }
thiserror = { version = "1.0.31" }
uuid = { version = "1.0.0", features = ["v4"] }
wasm-bindgen = { version = "0.2.80", features = ["default"] }
web-sys = { version = "0.3.57", features = ["Navigator"] }
rusty-money = { version = "0", features = ["iso"] }
seed = { version = "0", features = [] }
serde = { version = "1", features = ["derive"] }
serde-wasm-bindgen = { version = "0" }
serde_json = { version = "1" }
thiserror = { version = "1" }
uuid = { version = "1", features = ["v4"] }
wasm-bindgen = { version = "0", features = ["default"] }
web-sys = { version = "0", features = ["Navigator"] }

View File

@ -8,6 +8,7 @@ then
psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_carts" || echo 0
psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_stocks" || echo 0
psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_orders" || echo 0
psql postgres postgres -c "DROP DATABASE ${DATABASE_NAME}_tokens" || echo 0
fi
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_accounts" || echo 0
@ -21,3 +22,6 @@ sqlx migrate run -D "${STOCK_DATABASE_URL}" --source ./crates/stock_manager/migr
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_orders" || echo 0
sqlx migrate run -D "${ORDER_DATABASE_URL}" --source ./crates/order_manager/migrations
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_tokens" || echo 0
sqlx migrate run -D "${TOKEN_DATABASE_URL}" --source ./crates/token_manager/migrations