From 6c28472aceaaaf3966444e15182b6afb45a5294f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20Wo=C5=BAniak?= Date: Sun, 6 Nov 2022 19:50:51 +0100 Subject: [PATCH] Start working on stocks, finish search (suggestions added) --- .cargo/config.toml | 10 +- .env | 5 +- Cargo.lock | 32 +++- Cargo.toml | 1 + crates/account_manager/Cargo.toml | 2 - crates/account_manager/src/db/mod.rs | 6 +- crates/account_manager/src/main.rs | 2 +- crates/api/Cargo.toml | 1 + crates/api/src/main.rs | 31 ++-- .../api/src/routes/admin/api_v1/products.rs | 40 +++-- crates/api/src/routes/mod.rs | 3 + .../src/routes/public/api_v1/unrestricted.rs | 13 +- crates/channels/Cargo.toml | 4 +- crates/channels/src/accounts.rs | 9 +- crates/channels/src/lib.rs | 4 + crates/channels/src/search.rs | 140 +++++++++++++++++- crates/channels/src/stocks.rs | 103 +++++++++++++ crates/config/Cargo.toml | 1 + crates/config/src/lib.rs | 65 ++++++-- crates/model/src/lib.rs | 31 ++++ crates/search_manager/Cargo.toml | 1 + crates/search_manager/src/actions.rs | 134 +++++++++++++++-- crates/search_manager/src/rpc.rs | 9 +- crates/stock_manager/Cargo.toml | 30 ++++ .../migrations/202204131841_init.sql | 46 ++++++ crates/stock_manager/src/actions.rs | 0 crates/stock_manager/src/context.rs | 0 crates/stock_manager/src/db/mod.rs | 28 ++++ crates/stock_manager/src/db/photos.rs | 0 crates/stock_manager/src/db/product_photos.rs | 0 crates/stock_manager/src/db/products.rs | 0 crates/stock_manager/src/db/stocks.rs | 0 crates/stock_manager/src/main.rs | 8 + crates/stock_manager/src/mqtt.rs | 0 crates/stock_manager/src/rpc.rs | 0 scripts/migrate.sh | 3 + 36 files changed, 692 insertions(+), 70 deletions(-) create mode 100644 crates/channels/src/stocks.rs create mode 100644 crates/stock_manager/Cargo.toml create mode 100644 crates/stock_manager/migrations/202204131841_init.sql create mode 100644 crates/stock_manager/src/actions.rs create mode 100644 crates/stock_manager/src/context.rs create mode 100644 crates/stock_manager/src/db/mod.rs create mode 100644 crates/stock_manager/src/db/photos.rs create mode 100644 crates/stock_manager/src/db/product_photos.rs create mode 100644 crates/stock_manager/src/db/products.rs create mode 100644 crates/stock_manager/src/db/stocks.rs create mode 100644 crates/stock_manager/src/main.rs create mode 100644 crates/stock_manager/src/mqtt.rs create mode 100644 crates/stock_manager/src/rpc.rs diff --git a/.cargo/config.toml b/.cargo/config.toml index 2519d1b..3e26d32 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,5 +1,5 @@ -#[target.x86_64-unknown-linux-gnu] -#linker = "clang" -#rustflags = [ -# "-C", "link-arg=-fuse-ld=mold", -#] +[target.x86_64-unknown-linux-gnu] +linker = "clang" +rustflags = [ + "-C", "link-arg=-fuse-ld=mold", +] diff --git a/.env b/.env index 3f2886d..f188f83 100644 --- a/.env +++ b/.env @@ -2,6 +2,7 @@ DATABASE_NAME=bazzar DATABASE_URL=postgres://postgres@localhost/bazzar ACCOUNT_DATABASE_URL=postgres://postgres@localhost/bazzar_accounts CART_DATABASE_URL=postgres://postgres@localhost/bazzar_carts +STOCK_DATABASE_URL=postgres://postgres@localhost/bazzar_stocks PASS_SALT=18CHwV7eGFAea16z+qMKZg RUST_LOG=debug @@ -30,8 +31,8 @@ WEB_HOST=0.0.0.0 FILES_PUBLIC_PATH=/files FILES_LOCAL_PATH=./tmp -SONIC_SEARCH_ADDR=0.0.0.0:1491 +SONIC_SEARCH_ADDR=[::1]:1491 SONIC_SEARCH_PASS=SecretPassword -SONIC_INGEST_ADDR=0.0.0.0:1491 +SONIC_INGEST_ADDR=[::1]:1491 SONIC_INGEST_PASS=SecretPassword SEARCH_ACTIVE=true diff --git a/Cargo.lock b/Cargo.lock index afc6237..6961253 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6,8 +6,6 @@ version = 3 name = "account_manager" version = "0.1.0" dependencies = [ - "actix 0.13.0", - "actix-rt", "bincode", "bytes", "channels", @@ -684,6 +682,7 @@ dependencies = [ "gumdrop", "human-panic", "include_dir", + "itertools", "jemallocator", "model", "oauth2", @@ -889,6 +888,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "whatlang", ] [[package]] @@ -954,6 +954,7 @@ name = "config" version = "0.1.0" dependencies = [ "actix-web", + "cookie", "parking_lot 0.12.1", "password-hash", "pay_u", @@ -3682,6 +3683,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber", "uuid 1.2.1", + "whatlang", ] [[package]] @@ -4083,6 +4085,32 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stock_manager" +version = "0.1.0" +dependencies = [ + "channels", + "chrono", + "config", + "derive_more", + "dotenv", + "futures 0.3.25", + "model", + "opentelemetry 0.17.0", + "opentelemetry-jaeger", + "pretty_env_logger", + "rumqttc", + "serde", + "sqlx", + "sqlx-core", + "tarpc", + "thiserror", + "tokio", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", +] + [[package]] name = "stringprep" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 0bf2ccf..fb835f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "crates/order_manager", "crates/payment_manager", "crates/search_manager", + "crates/stock_manager", "crates/token_manager", "crates/fs_manager", "crates/lang_provider", diff --git a/crates/account_manager/Cargo.toml b/crates/account_manager/Cargo.toml index 10e1913..e4c83a8 100644 --- a/crates/account_manager/Cargo.toml +++ b/crates/account_manager/Cargo.toml @@ -8,8 +8,6 @@ name = "account-manager" path = "src/main.rs" [dependencies] -actix = { version = "0.13", features = [] } -actix-rt = { version = "2.7", features = [] } bincode = { version = "1.3.3" } bytes = { version = "1.2.1" } channels = { path = "../channels" } diff --git a/crates/account_manager/src/db/mod.rs b/crates/account_manager/src/db/mod.rs index bed853b..741b8fc 100644 --- a/crates/account_manager/src/db/mod.rs +++ b/crates/account_manager/src/db/mod.rs @@ -4,6 +4,8 @@ pub mod addresses; pub use accounts::*; pub use addresses::*; use config::SharedAppConfig; +use sqlx_core::pool::Pool; +use sqlx_core::postgres::Postgres; #[derive(Clone)] pub struct Database { @@ -24,5 +26,7 @@ impl Database { } } - pub fn pool(&self) {} + pub fn pool(&self) -> Pool { + self.pool.clone() + } } diff --git a/crates/account_manager/src/main.rs b/crates/account_manager/src/main.rs index 60aac30..2b15f1b 100644 --- a/crates/account_manager/src/main.rs +++ b/crates/account_manager/src/main.rs @@ -32,7 +32,7 @@ pub struct Opts {} impl UpdateConfig for Opts {} -#[actix::main] +#[tokio::main] async fn main() { dotenv::dotenv().ok(); init_tracing("account-manager"); diff --git a/crates/api/Cargo.toml b/crates/api/Cargo.toml index 60b1ca9..dbac3e8 100644 --- a/crates/api/Cargo.toml +++ b/crates/api/Cargo.toml @@ -30,6 +30,7 @@ futures-util = { version = "0.3", features = [] } gumdrop = { version = "0.8", features = [] } human-panic = { version = "1.0.3" } include_dir = { version = "0.7.2", features = [] } +itertools = { version = "0.10.5" } jemallocator = { version = "0.3", features = [] } model = { path = "../model", version = "0.1", features = ["db"] } oauth2 = { version = "4.1", features = [] } diff --git a/crates/api/src/main.rs b/crates/api/src/main.rs index 3af2acd..2f255cd 100644 --- a/crates/api/src/main.rs +++ b/crates/api/src/main.rs @@ -9,6 +9,7 @@ use actix_session::SessionMiddleware; use actix_web::middleware::Logger; use actix_web::web::Data; use actix_web::{App, HttpServer}; +use channels::Lang; use config::UpdateConfig; use jemallocator::Jemalloc; use model::{AccountState, Email, Login, PassHash, Password, Role}; @@ -197,6 +198,9 @@ async fn test_mailer(opts: TestMailerOpts) -> Result<()> { async fn reindex(opts: ReIndexOpts) -> Result<()> { let config = config::default_load(&opts); opts.update_config(&mut *config.lock()); + + let lang: Lang = opts.lang.clone().parse().unwrap(); + let db = database_manager::Database::build(config.clone()) .await .start(); @@ -206,26 +210,35 @@ async fn reindex(opts: ReIndexOpts) -> Result<()> { .await .unwrap() .unwrap(); + + tracing::info!("{:?}", products); + for product in products { - search + if let Ok(res) = search .create_index( tarpc::context::current(), - channels::search::create_index::Input { - key: product.id.to_string(), - value: vec![ + channels::search::create_index::Input::new( + product.id.to_string(), + vec![ product.long_description.into_inner(), product.short_description.into_inner(), product.name.into_inner(), ] .join(" "), - collection: "products".into(), - lang: opts.lang.clone(), - }, + "products", + "default", + lang, + ), ) .await - .unwrap(); + { + if let Some(error) = res.error { + tracing::error!("{}", error); + return Ok(()); + } + } } - println!("Success!"); + tracing::info!("Success!"); Ok(()) } diff --git a/crates/api/src/routes/admin/api_v1/products.rs b/crates/api/src/routes/admin/api_v1/products.rs index ebdc906..32f5af2 100644 --- a/crates/api/src/routes/admin/api_v1/products.rs +++ b/crates/api/src/routes/admin/api_v1/products.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use actix::Addr; use actix_web::web::{Data, Json, ServiceConfig}; use actix_web::{delete, get, patch, post, HttpResponse}; @@ -110,20 +112,36 @@ async fn create_product( } ); + let long_description = product.long_description.as_str(); + let short_description = product.short_description.as_str(); + let name = product.name.as_str(); + + let n = long_description + .split_ascii_whitespace() + .chain(short_description.split_ascii_whitespace()) + .chain(name.split_ascii_whitespace()) + .count(); + + use itertools::Itertools; if let Err(e) = search .create_index( tarpc::context::current(), - channels::search::create_index::Input { - key: product.id.to_string(), - value: vec![ - product.long_description.to_string(), - product.short_description.to_string(), - product.name.to_string(), - ] - .join(" "), - collection: "products".into(), - lang: payload.lang, - }, + channels::search::create_index::Input::new( + product.id.to_string(), + long_description + .split_ascii_whitespace() + .chain(short_description.split_ascii_whitespace()) + .chain(name.split_ascii_whitespace()) + .fold(HashSet::with_capacity(n), |mut h, word| { + h.insert(word); + h + }) + .iter() + .join(" "), + "default", + "products", + payload.lang.parse().unwrap_or_else(|_| channels::Lang::Pol), + ), ) .await { diff --git a/crates/api/src/routes/mod.rs b/crates/api/src/routes/mod.rs index e5bd608..ced2b5b 100644 --- a/crates/api/src/routes/mod.rs +++ b/crates/api/src/routes/mod.rs @@ -38,6 +38,7 @@ pub enum Error { #[from(ignore)] Unauthorized, CriticalFailure, + UnknownLanguage, Public(public::Error), Admin(admin::Error), Database(database_manager::Error), @@ -80,6 +81,7 @@ impl Display for Error { Error::Order(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Pay(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Token(_e) => serde_json::to_string(&self).unwrap_or_default(), + Error::UnknownLanguage => serde_json::to_string(&self).unwrap_or_default(), }; f.write_str(&msg) } @@ -99,6 +101,7 @@ impl ResponseError for Error { Error::Order(_) => StatusCode::BAD_REQUEST, Error::Pay(_) => StatusCode::BAD_REQUEST, Error::Token(_) => StatusCode::BAD_REQUEST, + Error::UnknownLanguage => StatusCode::BAD_REQUEST, } } } diff --git a/crates/api/src/routes/public/api_v1/unrestricted.rs b/crates/api/src/routes/public/api_v1/unrestricted.rs index 42d1781..a1c3a82 100644 --- a/crates/api/src/routes/public/api_v1/unrestricted.rs +++ b/crates/api/src/routes/public/api_v1/unrestricted.rs @@ -19,14 +19,17 @@ async fn search( query: Query, ) -> routes::Result>> { let q = query.into_inner(); + let lang = match q.lang.parse() { + Ok(lang) => lang, + Err(e) => { + tracing::warn!("{}", e); + return Err(routes::Error::UnknownLanguage); + } + }; let product_ids: Vec = match search .search( tarpc::context::current(), - channels::search::search::Input { - query: q.q, - collection: "products".into(), - lang: q.lang, - }, + channels::search::search::Input::new(q.q, "products", "default", lang), ) .await { diff --git a/crates/channels/Cargo.toml b/crates/channels/Cargo.toml index c6bb467..ba12861 100644 --- a/crates/channels/Cargo.toml +++ b/crates/channels/Cargo.toml @@ -8,7 +8,8 @@ accounts = [] carts = [] emails = [] search = [] -default = ['accounts', 'carts', 'emails', 'search'] +stocks = [] +default = ['accounts', 'carts', 'emails', 'search', 'stocks'] [dependencies] bincode = { version = "*" } @@ -22,3 +23,4 @@ tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", " thiserror = { version = "1.0.37" } tokio = { version = "1.21.2", features = ['full'] } tracing = { version = "0.1.37" } +whatlang = { version = "0.16.2" } diff --git a/crates/channels/src/accounts.rs b/crates/channels/src/accounts.rs index fabc0be..5ef43d4 100644 --- a/crates/channels/src/accounts.rs +++ b/crates/channels/src/accounts.rs @@ -127,13 +127,8 @@ pub mod rpc { use tarpc::client; use tarpc::tokio_serde::formats::Bincode; - let addr = { - let l = config.lock(); - ( - l.account_manager().rpc_bind.clone(), - l.account_manager().rpc_port, - ) - }; + let l = config.lock(); + let addr = l.account_manager().rpc_addr(); let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default); diff --git a/crates/channels/src/lib.rs b/crates/channels/src/lib.rs index 8bd225c..1484c98 100644 --- a/crates/channels/src/lib.rs +++ b/crates/channels/src/lib.rs @@ -1,5 +1,7 @@ #![feature(structural_match)] +pub use whatlang::Lang; + #[cfg(feature = "accounts")] pub mod accounts; #[cfg(feature = "carts")] @@ -10,6 +12,8 @@ pub mod mqtt; pub mod rpc; #[cfg(feature = "search")] pub mod search; +#[cfg(feature = "stocks")] +pub mod stocks; pub trait DeserializePayload { fn deserialize_payload(self, bytes: bytes::Bytes) -> Option; diff --git a/crates/channels/src/search.rs b/crates/channels/src/search.rs index 347ad7f..2650ce6 100644 --- a/crates/channels/src/search.rs +++ b/crates/channels/src/search.rs @@ -8,14 +8,80 @@ pub enum Error { pub type Result = std::result::Result; +#[allow(clippy::module_inception)] pub mod search { + use crate::search::create_index::Lang; use crate::search::Error; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Input { pub query: String, + pub bucket: String, pub collection: String, - pub lang: String, + pub lang: Lang, + } + + impl Input { + pub fn new, B: Into, C: Into>( + query: Q, + collection: C, + bucket: B, + lang: whatlang::Lang, + ) -> Self { + Self { + query: query.into(), + bucket: bucket.into(), + collection: collection.into(), + lang: Lang(lang), + } + } + } + + #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] + pub struct Output { + pub found: Option>, + pub error: Option, + } + + impl Output { + pub fn found(found: Vec) -> Self { + Self { + found: Some(found), + ..Default::default() + } + } + + pub fn error(error: Error) -> Self { + Self { + error: Some(error), + ..Default::default() + } + } + } +} + +pub mod suggest { + use crate::search::Error; + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input { + pub query: String, + pub bucket: String, + pub collection: String, + } + + impl Input { + pub fn new, B: Into, C: Into>( + query: Q, + collection: C, + bucket: B, + ) -> Self { + Self { + query: query.into(), + bucket: bucket.into(), + collection: collection.into(), + } + } } #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] @@ -42,14 +108,79 @@ pub mod search { } pub mod create_index { + use std::fmt::Formatter; + + use serde::de::Visitor; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + use crate::search::Error; + #[derive(Debug)] + pub struct Lang(pub whatlang::Lang); + + impl Serialize for Lang { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(self.0.code()) + } + } + + impl<'de> Deserialize<'de> for Lang { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct SV; + + impl Visitor<'_> for SV { + type Value = whatlang::Lang; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("must be valid whatlang::Lang") + } + + fn visit_string(self, v: String) -> Result + where + E: serde::de::Error, + { + match v.parse() { + Ok(l) => Ok(l), + Err(e) => Err(E::custom(format!("{}", e))), + } + } + } + + Ok(Self(deserializer.deserialize_string(SV)?)) + } + } + #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Input { pub key: String, pub value: String, + pub bucket: String, pub collection: String, - pub lang: String, + pub lang: Lang, + } + + impl Input { + pub fn new, V: Into, B: Into, C: Into>( + key: K, + value: V, + collection: C, + bucket: B, + lang: whatlang::Lang, + ) -> Self { + Self { + key: key.into(), + value: value.into(), + bucket: bucket.into(), + collection: collection.into(), + lang: Lang(lang), + } + } } #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] @@ -78,13 +209,16 @@ pub mod create_index { pub mod rpc { use config::SharedAppConfig; - use crate::search::{create_index, search}; + use crate::search::{create_index, search, suggest}; #[tarpc::service] pub trait Search { /// Search all matching indices. async fn search(input: search::Input) -> search::Output; + /// Suggest all matching indices. + async fn suggest(input: suggest::Input) -> suggest::Output; + /// Create new search index. async fn create_index(input: create_index::Input) -> create_index::Output; } diff --git a/crates/channels/src/stocks.rs b/crates/channels/src/stocks.rs new file mode 100644 index 0000000..45f5024 --- /dev/null +++ b/crates/channels/src/stocks.rs @@ -0,0 +1,103 @@ +pub static CLIENT_NAME: &str = "stocks"; + +pub mod create_product { + use model::{ + Days, Price, ProductCategory, ProductLongDesc, ProductName, ProductShortDesc, Quantity, + QuantityUnit, + }; + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct ProductInput { + pub name: ProductName, + pub short_description: ProductShortDesc, + pub long_description: ProductLongDesc, + pub category: Option, + pub price: Price, + pub deliver_days_flag: Days, + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct StockInput { + pub quantity: Quantity, + pub quantity_unit: QuantityUnit, + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input { + pub product: ProductInput, + pub stock: StockInput, + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Output { + pub product: model::Product, + pub stocks: Vec, + pub photos: Vec, + } +} + +pub mod detailed_product { + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input { + pub product_id: model::ProductId, + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Output { + pub product: model::Product, + pub stocks: Vec, + pub photos: Vec, + } +} + +pub mod detailed_products { + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input {} + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Output { + pub products: Vec, + } +} + +pub mod rpc { + use config::SharedAppConfig; + + use crate::accounts::register; + + #[tarpc::service] + pub trait Stocks { + /// List of products with stock size and photos + async fn detailed_products(input: register::Input) -> register::Output; + } + + pub async fn create_client(config: SharedAppConfig) -> StocksClient { + use tarpc::client; + use tarpc::tokio_serde::formats::Bincode; + + let l = config.lock(); + let addr = l.stocks_manager().rpc_addr(); + + let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default); + + let client = StocksClient::new( + client::Config::default(), + transport.await.expect("Failed to connect to server"), + ) + .spawn(); + + client + } +} + +pub mod mqtt { + use config::SharedAppConfig; + use rumqttc::EventLoop; + + use crate::stocks::CLIENT_NAME; + use crate::AsyncClient; + + pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) { + crate::mqtt::create_client(CLIENT_NAME, config.lock().stocks_manager().mqtt_addr()) + } +} diff --git a/crates/config/Cargo.toml b/crates/config/Cargo.toml index a45e650..dc0d52b 100644 --- a/crates/config/Cargo.toml +++ b/crates/config/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] actix-web = { version = "4.0", features = [] } +cookie = { version = "0.16.1", features = ["signed"] } parking_lot = { version = "0.12", features = [] } password-hash = { version = "0.4", features = ["alloc"] } pay_u = { version = '0.1', features = ["single-client"] } diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index d958d69..3311651 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -23,7 +23,7 @@ trait Example: Sized + Default { pub struct SharedAppConfig(Arc>); impl SharedAppConfig { - fn new(app_config: AppConfig) -> Self { + pub fn new(app_config: AppConfig) -> Self { Self(Arc::new(Mutex::new(app_config))) } } @@ -176,6 +176,7 @@ impl WebConfig { pub fn session_secret(&self) -> actix_web::cookie::Key { use actix_web::cookie::Key; + self.session_secret .as_ref() .map(|s| Key::from(s.as_bytes())) @@ -324,9 +325,9 @@ impl Example for SearchConfig {} impl Default for SearchConfig { fn default() -> Self { Self { - sonic_search_addr: Some("0.0.0.0:1491".into()), + sonic_search_addr: Some("[::1]:1491".into()), sonic_search_pass: Some("SecretPassword".into()), - sonic_ingest_addr: Some("0.0.0.0:1491".into()), + sonic_ingest_addr: Some("[::1]:1491".into()), sonic_ingest_pass: Some("SecretPassword".into()), rpc_port: 19332, rpc_bind: "0.0.0.0".into(), @@ -347,6 +348,7 @@ impl SearchConfig { .or_else(|| std::env::var("SONIC_SEARCH_ADDR").ok()) .expect("Search sonic_search_addr nor SONIC_SEARCH_ADDR env variable was provided") } + pub fn sonic_search_pass(&self) -> String { self.sonic_search_pass .as_ref() @@ -354,6 +356,7 @@ impl SearchConfig { .or_else(|| std::env::var("SONIC_SEARCH_PASS").ok()) .expect("Search sonic_search_pass nor SONIC_SEARCH_PASS env variable was provided") } + pub fn sonic_ingest_addr(&self) -> String { self.sonic_ingest_addr .as_ref() @@ -361,6 +364,7 @@ impl SearchConfig { .or_else(|| std::env::var("SONIC_INGEST_ADDR").ok()) .expect("Search sonic_ingest_addr nor SONIC_INGEST_ADDR env variable was provided") } + pub fn sonic_ingest_pass(&self) -> String { self.sonic_ingest_pass .as_ref() @@ -368,6 +372,7 @@ impl SearchConfig { .or_else(|| std::env::var("SONIC_INGEST_PASS").ok()) .expect("Search sonic_ingest_pass nor SONIC_INGEST_PASS env variable was provided") } + pub fn search_active(&self) -> bool { self.search_active || std::env::var("SEARCH_ACTIVE") @@ -500,6 +505,7 @@ impl Default for EmailSenderConfig { } } } + impl Example for EmailSenderConfig {} impl EmailSenderConfig { @@ -512,6 +518,39 @@ impl EmailSenderConfig { } } +#[derive(Serialize, Deserialize)] +pub struct StocksConfig { + pub rpc_port: u16, + pub rpc_bind: String, + pub mqtt_port: u16, + pub mqtt_bind: String, + pub database_url: String, +} + +impl Example for StocksConfig {} + +impl Default for StocksConfig { + fn default() -> Self { + Self { + rpc_port: 19333, + rpc_bind: "0.0.0.0".into(), + mqtt_port: 1886, + mqtt_bind: "0.0.0.0".into(), + database_url: "postgres://postgres@localhost/bazzar_stocks".into(), + } + } +} + +impl StocksConfig { + pub fn rpc_addr(&self) -> (&str, u16) { + (&self.rpc_bind, self.rpc_port) + } + + pub fn mqtt_addr(&self) -> (&str, u16) { + (&self.mqtt_bind, self.mqtt_port) + } +} + #[derive(Serialize, Deserialize)] pub struct AppConfig { #[serde(default)] @@ -532,6 +571,8 @@ pub struct AppConfig { cart_manager: CartManagerConfig, #[serde(default)] email_sender: EmailSenderConfig, + #[serde(default)] + stocks: StocksConfig, #[serde(skip)] config_path: String, } @@ -548,6 +589,7 @@ impl Example for AppConfig { account_manager: AccountManagerConfig::example(), cart_manager: CartManagerConfig::example(), email_sender: EmailSenderConfig::example(), + stocks: StocksConfig::example(), config_path: "".to_string(), } } @@ -605,20 +647,25 @@ impl AppConfig { pub fn email_sender(&self) -> &EmailSenderConfig { &self.email_sender } + + pub fn stocks_manager(&self) -> &StocksConfig { + &self.stocks + } } impl Default for AppConfig { fn default() -> Self { Self { - payment: Default::default(), + payment: PaymentConfig::default(), web: WebConfig::default(), - mail: Default::default(), + mail: MailConfig::default(), database: DatabaseConfig::default(), - search: Default::default(), + search: SearchConfig::default(), files: FilesConfig::default(), account_manager: AccountManagerConfig::default(), - cart_manager: Default::default(), - email_sender: Default::default(), + cart_manager: CartManagerConfig::default(), + email_sender: EmailSenderConfig::default(), + stocks: StocksConfig::default(), config_path: "".to_string(), } } @@ -628,7 +675,7 @@ pub fn default_load(opts: &impl UpdateConfig) -> SharedAppConfig { load("./bazzar.toml", opts) } -fn load(config_path: &str, opts: &impl UpdateConfig) -> SharedAppConfig { +pub fn load(config_path: &str, opts: &impl UpdateConfig) -> SharedAppConfig { match std::fs::read_to_string(config_path) { Ok(c) => { let mut c = toml::from_str(&c).unwrap(); diff --git a/crates/model/src/lib.rs b/crates/model/src/lib.rs index ab160bd..7da0e69 100644 --- a/crates/model/src/lib.rs +++ b/crates/model/src/lib.rs @@ -833,6 +833,10 @@ impl ProductName { pub fn into_inner(self) -> String { self.0 } + + pub fn as_sr(&self) -> &str { + &self.0 + } } #[cfg_attr(feature = "db", derive(sqlx::Type))] @@ -849,6 +853,10 @@ impl ProductShortDesc { pub fn into_inner(self) -> String { self.0 } + + pub fn as_sr(&self) -> &str { + &self.0 + } } #[cfg_attr(feature = "db", derive(sqlx::Type))] @@ -861,6 +869,10 @@ impl ProductLongDesc { pub fn into_inner(self) -> String { self.0 } + + pub fn as_sr(&self) -> &str { + &self.0 + } } impl ProductLongDesc { @@ -880,6 +892,25 @@ impl ProductCategory { pub fn new>(s: S) -> Self { Self(s.into()) } + + pub fn as_sr(&self) -> &str { + &self.0 + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct DetailedProduct { + pub id: ProductId, + pub name: ProductName, + pub short_description: ProductShortDesc, + pub long_description: ProductLongDesc, + pub category: Option, + pub price: Price, + pub deliver_days_flag: Days, + + pub stocks: Vec, + + pub photos: Vec, } #[cfg_attr(feature = "dummy", derive(fake::Dummy))] diff --git a/crates/search_manager/Cargo.toml b/crates/search_manager/Cargo.toml index 28c90af..298bc05 100644 --- a/crates/search_manager/Cargo.toml +++ b/crates/search_manager/Cargo.toml @@ -31,3 +31,4 @@ tracing = { version = "0.1.6" } tracing-opentelemetry = { version = "0.17.4" } tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } uuid = { version = "1.2.1", features = ["serde"] } +whatlang = { version = "0.16.2" } diff --git a/crates/search_manager/src/actions.rs b/crates/search_manager/src/actions.rs index e14ee9d..5d87422 100644 --- a/crates/search_manager/src/actions.rs +++ b/crates/search_manager/src/actions.rs @@ -1,15 +1,19 @@ -use channels::search::{create_index, search, Error}; +use channels::search::{create_index, search, suggest, Error}; use config::SharedAppConfig; -use sonic_channel::{Dest, ObjDest, PushRequest, QueryRequest}; +use sonic_channel::{Dest, ObjDest, PushRequest, QueryRequest, SuggestRequest}; use crate::context::Context; pub async fn search(msg: search::Input, ctx: Context, _config: SharedAppConfig) -> search::Output { if let Ok(l) = ctx.search.lock() { - match l.query(QueryRequest::new( - Dest::col_buc(msg.collection, msg.lang), - &msg.query, - )) { + let search::Input { + query, + bucket, + collection, + lang, + } = msg; + let query = QueryRequest::new(Dest::col_buc(collection, bucket), query).lang(lang.0); + match l.query(query) { Ok(res) => search::Output::found(res), Err(e) => { tracing::error!("{e:?}"); @@ -21,19 +25,46 @@ pub async fn search(msg: search::Input, ctx: Context, _config: SharedAppConfig) } } +pub async fn suggest( + msg: suggest::Input, + ctx: Context, + _config: SharedAppConfig, +) -> suggest::Output { + if let Ok(l) = ctx.search.lock() { + let suggest::Input { + query, + bucket, + collection, + } = msg; + let query = SuggestRequest::new(Dest::col_buc(collection, bucket), query).limit(10); + match l.suggest(query) { + Ok(res) => suggest::Output::found(res), + Err(e) => { + tracing::error!("{e:?}"); + suggest::Output::error(Error::QueryFailed) + } + } + } else { + suggest::Output::found(vec![]) + } +} + pub async fn create_index( msg: create_index::Input, ctx: Context, _config: SharedAppConfig, ) -> create_index::Output { if let Ok(l) = ctx.ingest.lock() { - match l.push(PushRequest::new( - ObjDest::new(Dest::col_buc(msg.collection, msg.lang), &msg.key), - &msg.value, - )) { + match l.push( + PushRequest::new( + ObjDest::new(Dest::col_buc(msg.collection, msg.bucket), &msg.key), + &msg.value, + ) + .lang(msg.lang.0), + ) { Ok(_) => create_index::Output::ok(), Err(e) => { - tracing::error!("{e:?}"); + tracing::error!("push {e:?}"); create_index::Output::error(Error::CantCreate) } } @@ -41,3 +72,84 @@ pub async fn create_index( create_index::Output::ok() } } + +#[cfg(test)] +mod tests { + use channels::Lang; + use config::UpdateConfig; + + use crate::actions::{create_index, search, suggest}; + + struct Opts; + + impl UpdateConfig for Opts {} + + #[actix::test] + async fn check_lookup() { + let config = config::load("../../bazzar.toml", &Opts); + let ctx = crate::Context::new(config.clone()).unwrap(); + + create_index( + create_index::Input::new("1", "zielony pomidor", "check_lookup", "default", Lang::Pol), + ctx.clone(), + config.clone(), + ) + .await; + create_index( + create_index::Input::new( + "2", + "pomarańczowa marchewka", + "check_lookup", + "default", + Lang::Pol, + ), + ctx.clone(), + config.clone(), + ) + .await; + create_index( + create_index::Input::new( + "3", + "czerwony pomidor", + "check_lookup", + "default", + Lang::Pol, + ), + ctx.clone(), + config.clone(), + ) + .await; + create_index( + create_index::Input::new("4", "zółta kukurydza", "check_lookup", "default", Lang::Pol), + ctx.clone(), + config.clone(), + ) + .await; + + let search_res: search::Output = search( + search::Input::new("pomidor", "check_lookup", "default", Lang::Pol), + ctx.clone(), + config.clone(), + ) + .await; + + let suggest_res: suggest::Output = suggest( + suggest::Input::new("pom", "check_lookup", "default"), + ctx.clone(), + config.clone(), + ) + .await; + + { + let mut res = search_res.found.unwrap(); + res.sort(); + assert_eq!(res, vec!["1".to_string(), "3".to_string()]); + } + + { + let mut res = suggest_res.found.unwrap(); + res.sort(); + assert_eq!(res, vec!["pomarańczowa".to_string(), "pomidor".to_string()]); + } + } +} diff --git a/crates/search_manager/src/rpc.rs b/crates/search_manager/src/rpc.rs index e1423b6..8fed417 100644 --- a/crates/search_manager/src/rpc.rs +++ b/crates/search_manager/src/rpc.rs @@ -1,5 +1,5 @@ use channels::search::rpc::Search; -use channels::search::{create_index, search}; +use channels::search::{create_index, search, suggest}; use config::SharedAppConfig; use tarpc::context; @@ -14,14 +14,21 @@ pub struct SearchServer { #[tarpc::server] impl Search for SearchServer { async fn search(self, _: context::Context, input: search::Input) -> search::Output { + tracing::info!("Received {:?}", input); crate::actions::search(input, self.ctx, self.config).await } + async fn suggest(self, _: context::Context, input: suggest::Input) -> suggest::Output { + tracing::info!("Received {:?}", input); + crate::actions::suggest(input, self.ctx, self.config).await + } + async fn create_index( self, _: context::Context, input: create_index::Input, ) -> create_index::Output { + tracing::info!("Received {:?}", input); crate::actions::create_index(input, self.ctx, self.config).await } } diff --git a/crates/stock_manager/Cargo.toml b/crates/stock_manager/Cargo.toml new file mode 100644 index 0000000..ab6b169 --- /dev/null +++ b/crates/stock_manager/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "stock_manager" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "stock-manager" +path = "./src/main.rs" + +[dependencies] +channels = { path = "../channels" } +chrono = { version = "0.4", features = ["serde"] } +config = { path = "../config" } +derive_more = { version = "0.99", features = [] } +dotenv = { version = "0.15.0" } +futures = { version = "0.3.25" } +model = { path = "../model" } +opentelemetry = { version = "0.17.0" } +opentelemetry-jaeger = { version = "0.17.0" } +pretty_env_logger = { version = "0.4", features = [] } +rumqttc = { version = "*" } +serde = { version = "1.0", features = ["derive"] } +sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] } +sqlx-core = { version = "0.6.2", features = [] } +tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } +thiserror = { version = "1.0.31" } +tokio = { version = "1.21.2", features = ['full'] } +tracing = { version = "0.1.6" } +tracing-opentelemetry = { version = "0.17.4" } +tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } diff --git a/crates/stock_manager/migrations/202204131841_init.sql b/crates/stock_manager/migrations/202204131841_init.sql new file mode 100644 index 0000000..0e06e07 --- /dev/null +++ b/crates/stock_manager/migrations/202204131841_init.sql @@ -0,0 +1,46 @@ +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + +CREATE TYPE "QuantityUnit" AS ENUM ( + 'g', + 'dkg', + 'kg', + 'piece' +); + +CREATE TABLE photos ( + id integer NOT NULL PRIMARY KEY, + local_path character varying NOT NULL, + file_name character varying NOT NULL, + unique_name text DEFAULT (gen_random_uuid())::text NOT NULL +); + +CREATE TABLE products ( + id integer NOT NULL PRIMARY KEY, + name character varying NOT NULL, + category character varying, + deliver_days_flag integer DEFAULT 127 NOT NULL +); + +CREATE TABLE product_variants ( + id integer NOT NULL PRIMARY KEY, + product_id integer REFERENCES products (id) NOT NULL, + name character varying NOT NULL, + short_description character varying NOT NULL, + long_description character varying NOT NULL, + price integer NOT NULL, + CONSTRAINT non_negative CHECK ((price >= 0)) +); + +CREATE TABLE stocks ( + id integer NOT NULL PRIMARY KEY, + product_variant_id integer REFERENCES product_variants(id) NOT NULL, + quantity integer DEFAULT 0 NOT NULL, + quantity_unit "QuantityUnit" NOT NULL, + CONSTRAINT positive_quantity CHECK ((quantity >= 0)) +); + +CREATE TABLE product_photos ( + id integer NOT NULL PRIMARY KEY, + product_variant_id integer REFERENCES product_variants(id) NOT NULL, + photo_id integer REFERENCES photos(id) NOT NULL +); diff --git a/crates/stock_manager/src/actions.rs b/crates/stock_manager/src/actions.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/stock_manager/src/context.rs b/crates/stock_manager/src/context.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/stock_manager/src/db/mod.rs b/crates/stock_manager/src/db/mod.rs new file mode 100644 index 0000000..344faa0 --- /dev/null +++ b/crates/stock_manager/src/db/mod.rs @@ -0,0 +1,28 @@ +use config::SharedAppConfig; + +mod photos; +mod product_photos; +mod products; +mod stocks; + +#[derive(Clone)] +pub struct Database { + pub pool: sqlx::PgPool, + _config: SharedAppConfig, +} + +impl Database { + pub async fn build(config: SharedAppConfig) -> Self { + let url = config.lock().stocks_manager().database_url.clone(); + let pool = sqlx::PgPool::connect(&url).await.unwrap_or_else(|e| { + tracing::error!("Failed to connect to database. {e:?}"); + std::process::exit(1); + }); + Self { + pool, + _config: config, + } + } + + pub fn pool(&self) {} +} diff --git a/crates/stock_manager/src/db/photos.rs b/crates/stock_manager/src/db/photos.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/stock_manager/src/db/product_photos.rs b/crates/stock_manager/src/db/product_photos.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/stock_manager/src/db/products.rs b/crates/stock_manager/src/db/products.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/stock_manager/src/db/stocks.rs b/crates/stock_manager/src/db/stocks.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/stock_manager/src/main.rs b/crates/stock_manager/src/main.rs new file mode 100644 index 0000000..4480adb --- /dev/null +++ b/crates/stock_manager/src/main.rs @@ -0,0 +1,8 @@ +mod actions; +mod context; +mod db; +mod mqtt; +mod rpc; + +#[tokio::main] +async fn main() {} diff --git a/crates/stock_manager/src/mqtt.rs b/crates/stock_manager/src/mqtt.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/stock_manager/src/rpc.rs b/crates/stock_manager/src/rpc.rs new file mode 100644 index 0000000..e69de29 diff --git a/scripts/migrate.sh b/scripts/migrate.sh index 0543c9e..63630b7 100755 --- a/scripts/migrate.sh +++ b/scripts/migrate.sh @@ -7,3 +7,6 @@ sqlx migrate run -D "${ACCOUNT_DATABASE_URL}" --source ./crates/account_manager/ psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_carts" || echo 0 sqlx migrate run -D "${CART_DATABASE_URL}" --source ./crates/cart_manager/migrations + +psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_stocks" || echo 0 +sqlx migrate run -D "${STOCK_DATABASE_URL}" --source ./crates/stock_manager/migrations