Start working on stocks, finish search (suggestions added)

This commit is contained in:
Adrian Woźniak 2022-11-06 19:50:51 +01:00
parent 95b20dffd7
commit 6c28472ace
No known key found for this signature in database
GPG Key ID: 0012845A89C7352B
36 changed files with 692 additions and 70 deletions

View File

@ -1,5 +1,5 @@
#[target.x86_64-unknown-linux-gnu] [target.x86_64-unknown-linux-gnu]
#linker = "clang" linker = "clang"
#rustflags = [ rustflags = [
# "-C", "link-arg=-fuse-ld=mold", "-C", "link-arg=-fuse-ld=mold",
#] ]

5
.env
View File

@ -2,6 +2,7 @@ DATABASE_NAME=bazzar
DATABASE_URL=postgres://postgres@localhost/bazzar DATABASE_URL=postgres://postgres@localhost/bazzar
ACCOUNT_DATABASE_URL=postgres://postgres@localhost/bazzar_accounts ACCOUNT_DATABASE_URL=postgres://postgres@localhost/bazzar_accounts
CART_DATABASE_URL=postgres://postgres@localhost/bazzar_carts CART_DATABASE_URL=postgres://postgres@localhost/bazzar_carts
STOCK_DATABASE_URL=postgres://postgres@localhost/bazzar_stocks
PASS_SALT=18CHwV7eGFAea16z+qMKZg PASS_SALT=18CHwV7eGFAea16z+qMKZg
RUST_LOG=debug RUST_LOG=debug
@ -30,8 +31,8 @@ WEB_HOST=0.0.0.0
FILES_PUBLIC_PATH=/files FILES_PUBLIC_PATH=/files
FILES_LOCAL_PATH=./tmp FILES_LOCAL_PATH=./tmp
SONIC_SEARCH_ADDR=0.0.0.0:1491 SONIC_SEARCH_ADDR=[::1]:1491
SONIC_SEARCH_PASS=SecretPassword SONIC_SEARCH_PASS=SecretPassword
SONIC_INGEST_ADDR=0.0.0.0:1491 SONIC_INGEST_ADDR=[::1]:1491
SONIC_INGEST_PASS=SecretPassword SONIC_INGEST_PASS=SecretPassword
SEARCH_ACTIVE=true SEARCH_ACTIVE=true

32
Cargo.lock generated
View File

@ -6,8 +6,6 @@ version = 3
name = "account_manager" name = "account_manager"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"actix 0.13.0",
"actix-rt",
"bincode", "bincode",
"bytes", "bytes",
"channels", "channels",
@ -684,6 +682,7 @@ dependencies = [
"gumdrop", "gumdrop",
"human-panic", "human-panic",
"include_dir", "include_dir",
"itertools",
"jemallocator", "jemallocator",
"model", "model",
"oauth2", "oauth2",
@ -889,6 +888,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tracing", "tracing",
"whatlang",
] ]
[[package]] [[package]]
@ -954,6 +954,7 @@ name = "config"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"actix-web", "actix-web",
"cookie",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"password-hash", "password-hash",
"pay_u", "pay_u",
@ -3682,6 +3683,7 @@ dependencies = [
"tracing-opentelemetry", "tracing-opentelemetry",
"tracing-subscriber", "tracing-subscriber",
"uuid 1.2.1", "uuid 1.2.1",
"whatlang",
] ]
[[package]] [[package]]
@ -4083,6 +4085,32 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stock_manager"
version = "0.1.0"
dependencies = [
"channels",
"chrono",
"config",
"derive_more",
"dotenv",
"futures 0.3.25",
"model",
"opentelemetry 0.17.0",
"opentelemetry-jaeger",
"pretty_env_logger",
"rumqttc",
"serde",
"sqlx",
"sqlx-core",
"tarpc",
"thiserror",
"tokio",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
]
[[package]] [[package]]
name = "stringprep" name = "stringprep"
version = "0.1.2" version = "0.1.2"

View File

@ -13,6 +13,7 @@ members = [
"crates/order_manager", "crates/order_manager",
"crates/payment_manager", "crates/payment_manager",
"crates/search_manager", "crates/search_manager",
"crates/stock_manager",
"crates/token_manager", "crates/token_manager",
"crates/fs_manager", "crates/fs_manager",
"crates/lang_provider", "crates/lang_provider",

View File

@ -8,8 +8,6 @@ name = "account-manager"
path = "src/main.rs" path = "src/main.rs"
[dependencies] [dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
bincode = { version = "1.3.3" } bincode = { version = "1.3.3" }
bytes = { version = "1.2.1" } bytes = { version = "1.2.1" }
channels = { path = "../channels" } channels = { path = "../channels" }

View File

@ -4,6 +4,8 @@ pub mod addresses;
pub use accounts::*; pub use accounts::*;
pub use addresses::*; pub use addresses::*;
use config::SharedAppConfig; use config::SharedAppConfig;
use sqlx_core::pool::Pool;
use sqlx_core::postgres::Postgres;
#[derive(Clone)] #[derive(Clone)]
pub struct Database { pub struct Database {
@ -24,5 +26,7 @@ impl Database {
} }
} }
pub fn pool(&self) {} pub fn pool(&self) -> Pool<Postgres> {
self.pool.clone()
}
} }

View File

@ -32,7 +32,7 @@ pub struct Opts {}
impl UpdateConfig for Opts {} impl UpdateConfig for Opts {}
#[actix::main] #[tokio::main]
async fn main() { async fn main() {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
init_tracing("account-manager"); init_tracing("account-manager");

View File

@ -30,6 +30,7 @@ futures-util = { version = "0.3", features = [] }
gumdrop = { version = "0.8", features = [] } gumdrop = { version = "0.8", features = [] }
human-panic = { version = "1.0.3" } human-panic = { version = "1.0.3" }
include_dir = { version = "0.7.2", features = [] } include_dir = { version = "0.7.2", features = [] }
itertools = { version = "0.10.5" }
jemallocator = { version = "0.3", features = [] } jemallocator = { version = "0.3", features = [] }
model = { path = "../model", version = "0.1", features = ["db"] } model = { path = "../model", version = "0.1", features = ["db"] }
oauth2 = { version = "4.1", features = [] } oauth2 = { version = "4.1", features = [] }

View File

@ -9,6 +9,7 @@ use actix_session::SessionMiddleware;
use actix_web::middleware::Logger; use actix_web::middleware::Logger;
use actix_web::web::Data; use actix_web::web::Data;
use actix_web::{App, HttpServer}; use actix_web::{App, HttpServer};
use channels::Lang;
use config::UpdateConfig; use config::UpdateConfig;
use jemallocator::Jemalloc; use jemallocator::Jemalloc;
use model::{AccountState, Email, Login, PassHash, Password, Role}; use model::{AccountState, Email, Login, PassHash, Password, Role};
@ -197,6 +198,9 @@ async fn test_mailer(opts: TestMailerOpts) -> Result<()> {
async fn reindex(opts: ReIndexOpts) -> Result<()> { async fn reindex(opts: ReIndexOpts) -> Result<()> {
let config = config::default_load(&opts); let config = config::default_load(&opts);
opts.update_config(&mut *config.lock()); opts.update_config(&mut *config.lock());
let lang: Lang = opts.lang.clone().parse().unwrap();
let db = database_manager::Database::build(config.clone()) let db = database_manager::Database::build(config.clone())
.await .await
.start(); .start();
@ -206,26 +210,35 @@ async fn reindex(opts: ReIndexOpts) -> Result<()> {
.await .await
.unwrap() .unwrap()
.unwrap(); .unwrap();
tracing::info!("{:?}", products);
for product in products { for product in products {
search if let Ok(res) = search
.create_index( .create_index(
tarpc::context::current(), tarpc::context::current(),
channels::search::create_index::Input { channels::search::create_index::Input::new(
key: product.id.to_string(), product.id.to_string(),
value: vec![ vec![
product.long_description.into_inner(), product.long_description.into_inner(),
product.short_description.into_inner(), product.short_description.into_inner(),
product.name.into_inner(), product.name.into_inner(),
] ]
.join(" "), .join(" "),
collection: "products".into(), "products",
lang: opts.lang.clone(), "default",
}, lang,
),
) )
.await .await
.unwrap(); {
if let Some(error) = res.error {
tracing::error!("{}", error);
return Ok(());
}
}
} }
println!("Success!"); tracing::info!("Success!");
Ok(()) Ok(())
} }

View File

@ -1,3 +1,5 @@
use std::collections::HashSet;
use actix::Addr; use actix::Addr;
use actix_web::web::{Data, Json, ServiceConfig}; use actix_web::web::{Data, Json, ServiceConfig};
use actix_web::{delete, get, patch, post, HttpResponse}; use actix_web::{delete, get, patch, post, HttpResponse};
@ -110,20 +112,36 @@ async fn create_product(
} }
); );
let long_description = product.long_description.as_str();
let short_description = product.short_description.as_str();
let name = product.name.as_str();
let n = long_description
.split_ascii_whitespace()
.chain(short_description.split_ascii_whitespace())
.chain(name.split_ascii_whitespace())
.count();
use itertools::Itertools;
if let Err(e) = search if let Err(e) = search
.create_index( .create_index(
tarpc::context::current(), tarpc::context::current(),
channels::search::create_index::Input { channels::search::create_index::Input::new(
key: product.id.to_string(), product.id.to_string(),
value: vec![ long_description
product.long_description.to_string(), .split_ascii_whitespace()
product.short_description.to_string(), .chain(short_description.split_ascii_whitespace())
product.name.to_string(), .chain(name.split_ascii_whitespace())
] .fold(HashSet::with_capacity(n), |mut h, word| {
.join(" "), h.insert(word);
collection: "products".into(), h
lang: payload.lang, })
}, .iter()
.join(" "),
"default",
"products",
payload.lang.parse().unwrap_or_else(|_| channels::Lang::Pol),
),
) )
.await .await
{ {

View File

@ -38,6 +38,7 @@ pub enum Error {
#[from(ignore)] #[from(ignore)]
Unauthorized, Unauthorized,
CriticalFailure, CriticalFailure,
UnknownLanguage,
Public(public::Error), Public(public::Error),
Admin(admin::Error), Admin(admin::Error),
Database(database_manager::Error), Database(database_manager::Error),
@ -80,6 +81,7 @@ impl Display for Error {
Error::Order(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Order(_e) => serde_json::to_string(&self).unwrap_or_default(),
Error::Pay(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Pay(_e) => serde_json::to_string(&self).unwrap_or_default(),
Error::Token(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Token(_e) => serde_json::to_string(&self).unwrap_or_default(),
Error::UnknownLanguage => serde_json::to_string(&self).unwrap_or_default(),
}; };
f.write_str(&msg) f.write_str(&msg)
} }
@ -99,6 +101,7 @@ impl ResponseError for Error {
Error::Order(_) => StatusCode::BAD_REQUEST, Error::Order(_) => StatusCode::BAD_REQUEST,
Error::Pay(_) => StatusCode::BAD_REQUEST, Error::Pay(_) => StatusCode::BAD_REQUEST,
Error::Token(_) => StatusCode::BAD_REQUEST, Error::Token(_) => StatusCode::BAD_REQUEST,
Error::UnknownLanguage => StatusCode::BAD_REQUEST,
} }
} }
} }

View File

@ -19,14 +19,17 @@ async fn search(
query: Query<model::api::SearchRequest>, query: Query<model::api::SearchRequest>,
) -> routes::Result<Json<Vec<model::Product>>> { ) -> routes::Result<Json<Vec<model::Product>>> {
let q = query.into_inner(); let q = query.into_inner();
let lang = match q.lang.parse() {
Ok(lang) => lang,
Err(e) => {
tracing::warn!("{}", e);
return Err(routes::Error::UnknownLanguage);
}
};
let product_ids: Vec<model::ProductId> = match search let product_ids: Vec<model::ProductId> = match search
.search( .search(
tarpc::context::current(), tarpc::context::current(),
channels::search::search::Input { channels::search::search::Input::new(q.q, "products", "default", lang),
query: q.q,
collection: "products".into(),
lang: q.lang,
},
) )
.await .await
{ {

View File

@ -8,7 +8,8 @@ accounts = []
carts = [] carts = []
emails = [] emails = []
search = [] search = []
default = ['accounts', 'carts', 'emails', 'search'] stocks = []
default = ['accounts', 'carts', 'emails', 'search', 'stocks']
[dependencies] [dependencies]
bincode = { version = "*" } bincode = { version = "*" }
@ -22,3 +23,4 @@ tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "
thiserror = { version = "1.0.37" } thiserror = { version = "1.0.37" }
tokio = { version = "1.21.2", features = ['full'] } tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.37" } tracing = { version = "0.1.37" }
whatlang = { version = "0.16.2" }

View File

@ -127,13 +127,8 @@ pub mod rpc {
use tarpc::client; use tarpc::client;
use tarpc::tokio_serde::formats::Bincode; use tarpc::tokio_serde::formats::Bincode;
let addr = { let l = config.lock();
let l = config.lock(); let addr = l.account_manager().rpc_addr();
(
l.account_manager().rpc_bind.clone(),
l.account_manager().rpc_port,
)
};
let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default); let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default);

View File

@ -1,5 +1,7 @@
#![feature(structural_match)] #![feature(structural_match)]
pub use whatlang::Lang;
#[cfg(feature = "accounts")] #[cfg(feature = "accounts")]
pub mod accounts; pub mod accounts;
#[cfg(feature = "carts")] #[cfg(feature = "carts")]
@ -10,6 +12,8 @@ pub mod mqtt;
pub mod rpc; pub mod rpc;
#[cfg(feature = "search")] #[cfg(feature = "search")]
pub mod search; pub mod search;
#[cfg(feature = "stocks")]
pub mod stocks;
pub trait DeserializePayload { pub trait DeserializePayload {
fn deserialize_payload<T: serde::de::DeserializeOwned>(self, bytes: bytes::Bytes) -> Option<T>; fn deserialize_payload<T: serde::de::DeserializeOwned>(self, bytes: bytes::Bytes) -> Option<T>;

View File

@ -8,14 +8,80 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
#[allow(clippy::module_inception)]
pub mod search { pub mod search {
use crate::search::create_index::Lang;
use crate::search::Error; use crate::search::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)] #[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input { pub struct Input {
pub query: String, pub query: String,
pub bucket: String,
pub collection: String, pub collection: String,
pub lang: String, pub lang: Lang,
}
impl Input {
pub fn new<Q: Into<String>, B: Into<String>, C: Into<String>>(
query: Q,
collection: C,
bucket: B,
lang: whatlang::Lang,
) -> Self {
Self {
query: query.into(),
bucket: bucket.into(),
collection: collection.into(),
lang: Lang(lang),
}
}
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct Output {
pub found: Option<Vec<String>>,
pub error: Option<Error>,
}
impl Output {
pub fn found(found: Vec<String>) -> Self {
Self {
found: Some(found),
..Default::default()
}
}
pub fn error(error: Error) -> Self {
Self {
error: Some(error),
..Default::default()
}
}
}
}
pub mod suggest {
use crate::search::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub query: String,
pub bucket: String,
pub collection: String,
}
impl Input {
pub fn new<Q: Into<String>, B: Into<String>, C: Into<String>>(
query: Q,
collection: C,
bucket: B,
) -> Self {
Self {
query: query.into(),
bucket: bucket.into(),
collection: collection.into(),
}
}
} }
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)] #[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
@ -42,14 +108,79 @@ pub mod search {
} }
pub mod create_index { pub mod create_index {
use std::fmt::Formatter;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::search::Error; use crate::search::Error;
#[derive(Debug)]
pub struct Lang(pub whatlang::Lang);
impl Serialize for Lang {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.0.code())
}
}
impl<'de> Deserialize<'de> for Lang {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct SV;
impl Visitor<'_> for SV {
type Value = whatlang::Lang;
fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
formatter.write_str("must be valid whatlang::Lang")
}
fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
match v.parse() {
Ok(l) => Ok(l),
Err(e) => Err(E::custom(format!("{}", e))),
}
}
}
Ok(Self(deserializer.deserialize_string(SV)?))
}
}
#[derive(Debug, serde::Serialize, serde::Deserialize)] #[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input { pub struct Input {
pub key: String, pub key: String,
pub value: String, pub value: String,
pub bucket: String,
pub collection: String, pub collection: String,
pub lang: String, pub lang: Lang,
}
impl Input {
pub fn new<K: Into<String>, V: Into<String>, B: Into<String>, C: Into<String>>(
key: K,
value: V,
collection: C,
bucket: B,
lang: whatlang::Lang,
) -> Self {
Self {
key: key.into(),
value: value.into(),
bucket: bucket.into(),
collection: collection.into(),
lang: Lang(lang),
}
}
} }
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)] #[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
@ -78,13 +209,16 @@ pub mod create_index {
pub mod rpc { pub mod rpc {
use config::SharedAppConfig; use config::SharedAppConfig;
use crate::search::{create_index, search}; use crate::search::{create_index, search, suggest};
#[tarpc::service] #[tarpc::service]
pub trait Search { pub trait Search {
/// Search all matching indices. /// Search all matching indices.
async fn search(input: search::Input) -> search::Output; async fn search(input: search::Input) -> search::Output;
/// Suggest all matching indices.
async fn suggest(input: suggest::Input) -> suggest::Output;
/// Create new search index. /// Create new search index.
async fn create_index(input: create_index::Input) -> create_index::Output; async fn create_index(input: create_index::Input) -> create_index::Output;
} }

View File

@ -0,0 +1,103 @@
pub static CLIENT_NAME: &str = "stocks";
pub mod create_product {
use model::{
Days, Price, ProductCategory, ProductLongDesc, ProductName, ProductShortDesc, Quantity,
QuantityUnit,
};
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ProductInput {
pub name: ProductName,
pub short_description: ProductShortDesc,
pub long_description: ProductLongDesc,
pub category: Option<ProductCategory>,
pub price: Price,
pub deliver_days_flag: Days,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct StockInput {
pub quantity: Quantity,
pub quantity_unit: QuantityUnit,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub product: ProductInput,
pub stock: StockInput,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Output {
pub product: model::Product,
pub stocks: Vec<model::Stock>,
pub photos: Vec<model::Photo>,
}
}
pub mod detailed_product {
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub product_id: model::ProductId,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Output {
pub product: model::Product,
pub stocks: Vec<model::Stock>,
pub photos: Vec<model::Photo>,
}
}
pub mod detailed_products {
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Output {
pub products: Vec<model::DetailedProduct>,
}
}
pub mod rpc {
use config::SharedAppConfig;
use crate::accounts::register;
#[tarpc::service]
pub trait Stocks {
/// List of products with stock size and photos
async fn detailed_products(input: register::Input) -> register::Output;
}
pub async fn create_client(config: SharedAppConfig) -> StocksClient {
use tarpc::client;
use tarpc::tokio_serde::formats::Bincode;
let l = config.lock();
let addr = l.stocks_manager().rpc_addr();
let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default);
let client = StocksClient::new(
client::Config::default(),
transport.await.expect("Failed to connect to server"),
)
.spawn();
client
}
}
pub mod mqtt {
use config::SharedAppConfig;
use rumqttc::EventLoop;
use crate::stocks::CLIENT_NAME;
use crate::AsyncClient;
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {
crate::mqtt::create_client(CLIENT_NAME, config.lock().stocks_manager().mqtt_addr())
}
}

View File

@ -5,6 +5,7 @@ edition = "2021"
[dependencies] [dependencies]
actix-web = { version = "4.0", features = [] } actix-web = { version = "4.0", features = [] }
cookie = { version = "0.16.1", features = ["signed"] }
parking_lot = { version = "0.12", features = [] } parking_lot = { version = "0.12", features = [] }
password-hash = { version = "0.4", features = ["alloc"] } password-hash = { version = "0.4", features = ["alloc"] }
pay_u = { version = '0.1', features = ["single-client"] } pay_u = { version = '0.1', features = ["single-client"] }

View File

@ -23,7 +23,7 @@ trait Example: Sized + Default {
pub struct SharedAppConfig(Arc<Mutex<AppConfig>>); pub struct SharedAppConfig(Arc<Mutex<AppConfig>>);
impl SharedAppConfig { impl SharedAppConfig {
fn new(app_config: AppConfig) -> Self { pub fn new(app_config: AppConfig) -> Self {
Self(Arc::new(Mutex::new(app_config))) Self(Arc::new(Mutex::new(app_config)))
} }
} }
@ -176,6 +176,7 @@ impl WebConfig {
pub fn session_secret(&self) -> actix_web::cookie::Key { pub fn session_secret(&self) -> actix_web::cookie::Key {
use actix_web::cookie::Key; use actix_web::cookie::Key;
self.session_secret self.session_secret
.as_ref() .as_ref()
.map(|s| Key::from(s.as_bytes())) .map(|s| Key::from(s.as_bytes()))
@ -324,9 +325,9 @@ impl Example for SearchConfig {}
impl Default for SearchConfig { impl Default for SearchConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
sonic_search_addr: Some("0.0.0.0:1491".into()), sonic_search_addr: Some("[::1]:1491".into()),
sonic_search_pass: Some("SecretPassword".into()), sonic_search_pass: Some("SecretPassword".into()),
sonic_ingest_addr: Some("0.0.0.0:1491".into()), sonic_ingest_addr: Some("[::1]:1491".into()),
sonic_ingest_pass: Some("SecretPassword".into()), sonic_ingest_pass: Some("SecretPassword".into()),
rpc_port: 19332, rpc_port: 19332,
rpc_bind: "0.0.0.0".into(), rpc_bind: "0.0.0.0".into(),
@ -347,6 +348,7 @@ impl SearchConfig {
.or_else(|| std::env::var("SONIC_SEARCH_ADDR").ok()) .or_else(|| std::env::var("SONIC_SEARCH_ADDR").ok())
.expect("Search sonic_search_addr nor SONIC_SEARCH_ADDR env variable was provided") .expect("Search sonic_search_addr nor SONIC_SEARCH_ADDR env variable was provided")
} }
pub fn sonic_search_pass(&self) -> String { pub fn sonic_search_pass(&self) -> String {
self.sonic_search_pass self.sonic_search_pass
.as_ref() .as_ref()
@ -354,6 +356,7 @@ impl SearchConfig {
.or_else(|| std::env::var("SONIC_SEARCH_PASS").ok()) .or_else(|| std::env::var("SONIC_SEARCH_PASS").ok())
.expect("Search sonic_search_pass nor SONIC_SEARCH_PASS env variable was provided") .expect("Search sonic_search_pass nor SONIC_SEARCH_PASS env variable was provided")
} }
pub fn sonic_ingest_addr(&self) -> String { pub fn sonic_ingest_addr(&self) -> String {
self.sonic_ingest_addr self.sonic_ingest_addr
.as_ref() .as_ref()
@ -361,6 +364,7 @@ impl SearchConfig {
.or_else(|| std::env::var("SONIC_INGEST_ADDR").ok()) .or_else(|| std::env::var("SONIC_INGEST_ADDR").ok())
.expect("Search sonic_ingest_addr nor SONIC_INGEST_ADDR env variable was provided") .expect("Search sonic_ingest_addr nor SONIC_INGEST_ADDR env variable was provided")
} }
pub fn sonic_ingest_pass(&self) -> String { pub fn sonic_ingest_pass(&self) -> String {
self.sonic_ingest_pass self.sonic_ingest_pass
.as_ref() .as_ref()
@ -368,6 +372,7 @@ impl SearchConfig {
.or_else(|| std::env::var("SONIC_INGEST_PASS").ok()) .or_else(|| std::env::var("SONIC_INGEST_PASS").ok())
.expect("Search sonic_ingest_pass nor SONIC_INGEST_PASS env variable was provided") .expect("Search sonic_ingest_pass nor SONIC_INGEST_PASS env variable was provided")
} }
pub fn search_active(&self) -> bool { pub fn search_active(&self) -> bool {
self.search_active self.search_active
|| std::env::var("SEARCH_ACTIVE") || std::env::var("SEARCH_ACTIVE")
@ -500,6 +505,7 @@ impl Default for EmailSenderConfig {
} }
} }
} }
impl Example for EmailSenderConfig {} impl Example for EmailSenderConfig {}
impl EmailSenderConfig { impl EmailSenderConfig {
@ -512,6 +518,39 @@ impl EmailSenderConfig {
} }
} }
#[derive(Serialize, Deserialize)]
pub struct StocksConfig {
pub rpc_port: u16,
pub rpc_bind: String,
pub mqtt_port: u16,
pub mqtt_bind: String,
pub database_url: String,
}
impl Example for StocksConfig {}
impl Default for StocksConfig {
fn default() -> Self {
Self {
rpc_port: 19333,
rpc_bind: "0.0.0.0".into(),
mqtt_port: 1886,
mqtt_bind: "0.0.0.0".into(),
database_url: "postgres://postgres@localhost/bazzar_stocks".into(),
}
}
}
impl StocksConfig {
pub fn rpc_addr(&self) -> (&str, u16) {
(&self.rpc_bind, self.rpc_port)
}
pub fn mqtt_addr(&self) -> (&str, u16) {
(&self.mqtt_bind, self.mqtt_port)
}
}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct AppConfig { pub struct AppConfig {
#[serde(default)] #[serde(default)]
@ -532,6 +571,8 @@ pub struct AppConfig {
cart_manager: CartManagerConfig, cart_manager: CartManagerConfig,
#[serde(default)] #[serde(default)]
email_sender: EmailSenderConfig, email_sender: EmailSenderConfig,
#[serde(default)]
stocks: StocksConfig,
#[serde(skip)] #[serde(skip)]
config_path: String, config_path: String,
} }
@ -548,6 +589,7 @@ impl Example for AppConfig {
account_manager: AccountManagerConfig::example(), account_manager: AccountManagerConfig::example(),
cart_manager: CartManagerConfig::example(), cart_manager: CartManagerConfig::example(),
email_sender: EmailSenderConfig::example(), email_sender: EmailSenderConfig::example(),
stocks: StocksConfig::example(),
config_path: "".to_string(), config_path: "".to_string(),
} }
} }
@ -605,20 +647,25 @@ impl AppConfig {
pub fn email_sender(&self) -> &EmailSenderConfig { pub fn email_sender(&self) -> &EmailSenderConfig {
&self.email_sender &self.email_sender
} }
pub fn stocks_manager(&self) -> &StocksConfig {
&self.stocks
}
} }
impl Default for AppConfig { impl Default for AppConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
payment: Default::default(), payment: PaymentConfig::default(),
web: WebConfig::default(), web: WebConfig::default(),
mail: Default::default(), mail: MailConfig::default(),
database: DatabaseConfig::default(), database: DatabaseConfig::default(),
search: Default::default(), search: SearchConfig::default(),
files: FilesConfig::default(), files: FilesConfig::default(),
account_manager: AccountManagerConfig::default(), account_manager: AccountManagerConfig::default(),
cart_manager: Default::default(), cart_manager: CartManagerConfig::default(),
email_sender: Default::default(), email_sender: EmailSenderConfig::default(),
stocks: StocksConfig::default(),
config_path: "".to_string(), config_path: "".to_string(),
} }
} }
@ -628,7 +675,7 @@ pub fn default_load(opts: &impl UpdateConfig) -> SharedAppConfig {
load("./bazzar.toml", opts) load("./bazzar.toml", opts)
} }
fn load(config_path: &str, opts: &impl UpdateConfig) -> SharedAppConfig { pub fn load(config_path: &str, opts: &impl UpdateConfig) -> SharedAppConfig {
match std::fs::read_to_string(config_path) { match std::fs::read_to_string(config_path) {
Ok(c) => { Ok(c) => {
let mut c = toml::from_str(&c).unwrap(); let mut c = toml::from_str(&c).unwrap();

View File

@ -833,6 +833,10 @@ impl ProductName {
pub fn into_inner(self) -> String { pub fn into_inner(self) -> String {
self.0 self.0
} }
pub fn as_sr(&self) -> &str {
&self.0
}
} }
#[cfg_attr(feature = "db", derive(sqlx::Type))] #[cfg_attr(feature = "db", derive(sqlx::Type))]
@ -849,6 +853,10 @@ impl ProductShortDesc {
pub fn into_inner(self) -> String { pub fn into_inner(self) -> String {
self.0 self.0
} }
pub fn as_sr(&self) -> &str {
&self.0
}
} }
#[cfg_attr(feature = "db", derive(sqlx::Type))] #[cfg_attr(feature = "db", derive(sqlx::Type))]
@ -861,6 +869,10 @@ impl ProductLongDesc {
pub fn into_inner(self) -> String { pub fn into_inner(self) -> String {
self.0 self.0
} }
pub fn as_sr(&self) -> &str {
&self.0
}
} }
impl ProductLongDesc { impl ProductLongDesc {
@ -880,6 +892,25 @@ impl ProductCategory {
pub fn new<S: Into<String>>(s: S) -> Self { pub fn new<S: Into<String>>(s: S) -> Self {
Self(s.into()) Self(s.into())
} }
pub fn as_sr(&self) -> &str {
&self.0
}
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct DetailedProduct {
pub id: ProductId,
pub name: ProductName,
pub short_description: ProductShortDesc,
pub long_description: ProductLongDesc,
pub category: Option<ProductCategory>,
pub price: Price,
pub deliver_days_flag: Days,
pub stocks: Vec<Stock>,
pub photos: Vec<Photo>,
} }
#[cfg_attr(feature = "dummy", derive(fake::Dummy))] #[cfg_attr(feature = "dummy", derive(fake::Dummy))]

View File

@ -31,3 +31,4 @@ tracing = { version = "0.1.6" }
tracing-opentelemetry = { version = "0.17.4" } tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "1.2.1", features = ["serde"] } uuid = { version = "1.2.1", features = ["serde"] }
whatlang = { version = "0.16.2" }

View File

@ -1,15 +1,19 @@
use channels::search::{create_index, search, Error}; use channels::search::{create_index, search, suggest, Error};
use config::SharedAppConfig; use config::SharedAppConfig;
use sonic_channel::{Dest, ObjDest, PushRequest, QueryRequest}; use sonic_channel::{Dest, ObjDest, PushRequest, QueryRequest, SuggestRequest};
use crate::context::Context; use crate::context::Context;
pub async fn search(msg: search::Input, ctx: Context, _config: SharedAppConfig) -> search::Output { pub async fn search(msg: search::Input, ctx: Context, _config: SharedAppConfig) -> search::Output {
if let Ok(l) = ctx.search.lock() { if let Ok(l) = ctx.search.lock() {
match l.query(QueryRequest::new( let search::Input {
Dest::col_buc(msg.collection, msg.lang), query,
&msg.query, bucket,
)) { collection,
lang,
} = msg;
let query = QueryRequest::new(Dest::col_buc(collection, bucket), query).lang(lang.0);
match l.query(query) {
Ok(res) => search::Output::found(res), Ok(res) => search::Output::found(res),
Err(e) => { Err(e) => {
tracing::error!("{e:?}"); tracing::error!("{e:?}");
@ -21,19 +25,46 @@ pub async fn search(msg: search::Input, ctx: Context, _config: SharedAppConfig)
} }
} }
pub async fn suggest(
msg: suggest::Input,
ctx: Context,
_config: SharedAppConfig,
) -> suggest::Output {
if let Ok(l) = ctx.search.lock() {
let suggest::Input {
query,
bucket,
collection,
} = msg;
let query = SuggestRequest::new(Dest::col_buc(collection, bucket), query).limit(10);
match l.suggest(query) {
Ok(res) => suggest::Output::found(res),
Err(e) => {
tracing::error!("{e:?}");
suggest::Output::error(Error::QueryFailed)
}
}
} else {
suggest::Output::found(vec![])
}
}
pub async fn create_index( pub async fn create_index(
msg: create_index::Input, msg: create_index::Input,
ctx: Context, ctx: Context,
_config: SharedAppConfig, _config: SharedAppConfig,
) -> create_index::Output { ) -> create_index::Output {
if let Ok(l) = ctx.ingest.lock() { if let Ok(l) = ctx.ingest.lock() {
match l.push(PushRequest::new( match l.push(
ObjDest::new(Dest::col_buc(msg.collection, msg.lang), &msg.key), PushRequest::new(
&msg.value, ObjDest::new(Dest::col_buc(msg.collection, msg.bucket), &msg.key),
)) { &msg.value,
)
.lang(msg.lang.0),
) {
Ok(_) => create_index::Output::ok(), Ok(_) => create_index::Output::ok(),
Err(e) => { Err(e) => {
tracing::error!("{e:?}"); tracing::error!("push {e:?}");
create_index::Output::error(Error::CantCreate) create_index::Output::error(Error::CantCreate)
} }
} }
@ -41,3 +72,84 @@ pub async fn create_index(
create_index::Output::ok() create_index::Output::ok()
} }
} }
#[cfg(test)]
mod tests {
use channels::Lang;
use config::UpdateConfig;
use crate::actions::{create_index, search, suggest};
struct Opts;
impl UpdateConfig for Opts {}
#[actix::test]
async fn check_lookup() {
let config = config::load("../../bazzar.toml", &Opts);
let ctx = crate::Context::new(config.clone()).unwrap();
create_index(
create_index::Input::new("1", "zielony pomidor", "check_lookup", "default", Lang::Pol),
ctx.clone(),
config.clone(),
)
.await;
create_index(
create_index::Input::new(
"2",
"pomarańczowa marchewka",
"check_lookup",
"default",
Lang::Pol,
),
ctx.clone(),
config.clone(),
)
.await;
create_index(
create_index::Input::new(
"3",
"czerwony pomidor",
"check_lookup",
"default",
Lang::Pol,
),
ctx.clone(),
config.clone(),
)
.await;
create_index(
create_index::Input::new("4", "zółta kukurydza", "check_lookup", "default", Lang::Pol),
ctx.clone(),
config.clone(),
)
.await;
let search_res: search::Output = search(
search::Input::new("pomidor", "check_lookup", "default", Lang::Pol),
ctx.clone(),
config.clone(),
)
.await;
let suggest_res: suggest::Output = suggest(
suggest::Input::new("pom", "check_lookup", "default"),
ctx.clone(),
config.clone(),
)
.await;
{
let mut res = search_res.found.unwrap();
res.sort();
assert_eq!(res, vec!["1".to_string(), "3".to_string()]);
}
{
let mut res = suggest_res.found.unwrap();
res.sort();
assert_eq!(res, vec!["pomarańczowa".to_string(), "pomidor".to_string()]);
}
}
}

View File

@ -1,5 +1,5 @@
use channels::search::rpc::Search; use channels::search::rpc::Search;
use channels::search::{create_index, search}; use channels::search::{create_index, search, suggest};
use config::SharedAppConfig; use config::SharedAppConfig;
use tarpc::context; use tarpc::context;
@ -14,14 +14,21 @@ pub struct SearchServer {
#[tarpc::server] #[tarpc::server]
impl Search for SearchServer { impl Search for SearchServer {
async fn search(self, _: context::Context, input: search::Input) -> search::Output { async fn search(self, _: context::Context, input: search::Input) -> search::Output {
tracing::info!("Received {:?}", input);
crate::actions::search(input, self.ctx, self.config).await crate::actions::search(input, self.ctx, self.config).await
} }
async fn suggest(self, _: context::Context, input: suggest::Input) -> suggest::Output {
tracing::info!("Received {:?}", input);
crate::actions::suggest(input, self.ctx, self.config).await
}
async fn create_index( async fn create_index(
self, self,
_: context::Context, _: context::Context,
input: create_index::Input, input: create_index::Input,
) -> create_index::Output { ) -> create_index::Output {
tracing::info!("Received {:?}", input);
crate::actions::create_index(input, self.ctx, self.config).await crate::actions::create_index(input, self.ctx, self.config).await
} }
} }

View File

@ -0,0 +1,30 @@
[package]
name = "stock_manager"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "stock-manager"
path = "./src/main.rs"
[dependencies]
channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] }
config = { path = "../config" }
derive_more = { version = "0.99", features = [] }
dotenv = { version = "0.15.0" }
futures = { version = "0.3.25" }
model = { path = "../model" }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
sqlx-core = { version = "0.6.2", features = [] }
tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.6" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }

View File

@ -0,0 +1,46 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE "QuantityUnit" AS ENUM (
'g',
'dkg',
'kg',
'piece'
);
CREATE TABLE photos (
id integer NOT NULL PRIMARY KEY,
local_path character varying NOT NULL,
file_name character varying NOT NULL,
unique_name text DEFAULT (gen_random_uuid())::text NOT NULL
);
CREATE TABLE products (
id integer NOT NULL PRIMARY KEY,
name character varying NOT NULL,
category character varying,
deliver_days_flag integer DEFAULT 127 NOT NULL
);
CREATE TABLE product_variants (
id integer NOT NULL PRIMARY KEY,
product_id integer REFERENCES products (id) NOT NULL,
name character varying NOT NULL,
short_description character varying NOT NULL,
long_description character varying NOT NULL,
price integer NOT NULL,
CONSTRAINT non_negative CHECK ((price >= 0))
);
CREATE TABLE stocks (
id integer NOT NULL PRIMARY KEY,
product_variant_id integer REFERENCES product_variants(id) NOT NULL,
quantity integer DEFAULT 0 NOT NULL,
quantity_unit "QuantityUnit" NOT NULL,
CONSTRAINT positive_quantity CHECK ((quantity >= 0))
);
CREATE TABLE product_photos (
id integer NOT NULL PRIMARY KEY,
product_variant_id integer REFERENCES product_variants(id) NOT NULL,
photo_id integer REFERENCES photos(id) NOT NULL
);

View File

View File

View File

@ -0,0 +1,28 @@
use config::SharedAppConfig;
mod photos;
mod product_photos;
mod products;
mod stocks;
#[derive(Clone)]
pub struct Database {
pub pool: sqlx::PgPool,
_config: SharedAppConfig,
}
impl Database {
pub async fn build(config: SharedAppConfig) -> Self {
let url = config.lock().stocks_manager().database_url.clone();
let pool = sqlx::PgPool::connect(&url).await.unwrap_or_else(|e| {
tracing::error!("Failed to connect to database. {e:?}");
std::process::exit(1);
});
Self {
pool,
_config: config,
}
}
pub fn pool(&self) {}
}

View File

View File

View File

View File

@ -0,0 +1,8 @@
mod actions;
mod context;
mod db;
mod mqtt;
mod rpc;
#[tokio::main]
async fn main() {}

View File

View File

View File

@ -7,3 +7,6 @@ sqlx migrate run -D "${ACCOUNT_DATABASE_URL}" --source ./crates/account_manager/
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_carts" || echo 0 psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_carts" || echo 0
sqlx migrate run -D "${CART_DATABASE_URL}" --source ./crates/cart_manager/migrations sqlx migrate run -D "${CART_DATABASE_URL}" --source ./crates/cart_manager/migrations
psql postgres postgres -c "CREATE DATABASE ${DATABASE_NAME}_stocks" || echo 0
sqlx migrate run -D "${STOCK_DATABASE_URL}" --source ./crates/stock_manager/migrations