From 733df891c0bc3f2337778dfc9e4a6e26dd6b8769 Mon Sep 17 00:00:00 2001 From: eraden Date: Sat, 5 Nov 2022 12:31:18 +0100 Subject: [PATCH] extracting search manager --- Cargo.lock | 8 +++ crates/channels/src/lib.rs | 1 + crates/channels/src/search.rs | 44 ++++++++++++ crates/email_manager/Cargo.toml | 4 +- crates/search_manager/Cargo.toml | 14 +++- crates/search_manager/src/main.rs | 111 ++++++++++++++++++++++++++++++ 6 files changed, 179 insertions(+), 3 deletions(-) create mode 100644 crates/channels/src/search.rs create mode 100644 crates/search_manager/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 52966c2..686dbb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3662,17 +3662,25 @@ version = "0.1.0" dependencies = [ "actix 0.13.0", "actix-rt", + "channels", "chrono", "config", "derive_more", + "futures 0.3.25", "model", + "opentelemetry 0.17.0", + "opentelemetry-jaeger", "parking_lot 0.12.1", "pretty_env_logger", "rumqttc", "serde", "sonic-channel", + "tarpc", "thiserror", + "tokio", "tracing", + "tracing-opentelemetry", + "tracing-subscriber", "uuid 1.2.1", ] diff --git a/crates/channels/src/lib.rs b/crates/channels/src/lib.rs index 6256dc2..f4cbf12 100644 --- a/crates/channels/src/lib.rs +++ b/crates/channels/src/lib.rs @@ -5,6 +5,7 @@ pub mod carts; pub mod emails; pub mod mqtt; pub mod rpc; +pub mod search; pub trait DeserializePayload { fn deserialize_payload(self, bytes: bytes::Bytes) -> Option; diff --git a/crates/channels/src/search.rs b/crates/channels/src/search.rs new file mode 100644 index 0000000..c86be2e --- /dev/null +++ b/crates/channels/src/search.rs @@ -0,0 +1,44 @@ +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Can't create index")] + CantCreate, + #[error("Failed to find records in bucket")] + QueryFailed, +} + +pub type Result = std::result::Result; + +pub mod search { + use crate::search::Error; + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input { + pub query: String, + pub collection: String, + pub lang: String, + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Output { + pub found: Option>, + pub error: Error, + } +} + +pub mod create_index { + use crate::search::Error; + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Input { + pub key: String, + pub value: String, + pub collection: String, + pub lang: String, + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub struct Output { + pub found: Option<()>, + pub error: Error, + } +} diff --git a/crates/email_manager/Cargo.toml b/crates/email_manager/Cargo.toml index 3a66659..0f1442e 100644 --- a/crates/email_manager/Cargo.toml +++ b/crates/email_manager/Cargo.toml @@ -13,6 +13,8 @@ actix-rt = { version = "2.7", features = [] } channels = { path = "../channels" } chrono = { version = "0.4", features = ["serde"] } config = { path = "../config" } +dotenv = { version = "0.15.0" } +handlebars = { version = "*", features = [] } model = { path = "../model" } opentelemetry = { version = "0.17.0" } opentelemetry-jaeger = { version = "0.17.0" } @@ -28,5 +30,3 @@ tracing = { version = "0.1.37" } tracing-opentelemetry = { version = "0.17.4" } tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } uuid = { version = "0.8", features = ["serde"] } -dotenv = { version = "0.15.0" } -handlebars = { version = "*", features = [] } diff --git a/crates/search_manager/Cargo.toml b/crates/search_manager/Cargo.toml index 0cde023..0d5ddf8 100644 --- a/crates/search_manager/Cargo.toml +++ b/crates/search_manager/Cargo.toml @@ -3,18 +3,30 @@ name = "search_manager" version = "0.1.0" edition = "2021" +[[bin]] +name = "search-manager" +path = "./src/main.rs" + [dependencies] actix = { version = "0.13", features = [] } actix-rt = { version = "2.7", features = [] } +channels = { path = "../channels" } chrono = { version = "0.4", features = ["serde"] } config = { path = "../config" } derive_more = { version = "0.99", features = [] } +futures = { version = "0.3.25" } model = { path = "../model" } +opentelemetry = { version = "0.17.0" } +opentelemetry-jaeger = { version = "0.17.0" } parking_lot = { version = "0.12", features = [] } pretty_env_logger = { version = "0.4", features = [] } rumqttc = { version = "*" } serde = { version = "1.0", features = ["derive"] } sonic-channel = { version = "1.1.0", features = ["ingest"] } +tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] } thiserror = { version = "1.0.31" } -tracing = { version = "0.1.34" } +tokio = { version = "1.21.2", features = ['full'] } +tracing = { version = "0.1.6" } +tracing-opentelemetry = { version = "0.17.4" } +tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } uuid = { version = "1.2.1", features = ["serde"] } diff --git a/crates/search_manager/src/main.rs b/crates/search_manager/src/main.rs new file mode 100644 index 0000000..6ad6dd5 --- /dev/null +++ b/crates/search_manager/src/main.rs @@ -0,0 +1,111 @@ +#![feature(structural_match)] + +use std::env; +use std::sync::{Arc, RwLock}; + +use config::{SharedAppConfig, UpdateConfig}; +use search_manager::Channels; +use sonic_channel::SonicChannel; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +// pub mod actions; +// pub mod db; +// pub mod mqtt; +// pub mod rpc; + +pub type Result = std::result::Result; + +#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] +pub enum Error { + #[error("Unable to send or receive msg from database")] + DbCritical, + #[error("Failed to load account data")] + Account, + #[error("Failed to load account addresses")] + Addresses, + #[error("Unable to save record")] + Saving, + #[error("Unable to hash password")] + Hashing, +} + +pub struct Opts {} + +impl UpdateConfig for Opts {} + +mod rpc {} + +#[derive(Clone)] +pub struct Context { + search: Arc>, + ingest: Arc>, +} +impl Context { + pub fn new(config: SharedAppConfig) -> Option { + let enabled = config.lock().search().search_active(); + + if enabled { + let search = { + let l = config.lock(); + Arc::new(RwLock::new( + sonic_channel::SearchChannel::start( + l.search().sonic_search_addr(), + l.search().sonic_search_pass(), + ) + .unwrap_or_else(|e| panic!("Failed to connect to sonic search channel. {}", e)), + )) + }; + let ingest = { + let l = config.lock(); + Arc::new(RwLock::new( + sonic_channel::IngestChannel::start( + l.search().sonic_ingest_addr(), + l.search().sonic_ingest_pass(), + ) + .unwrap_or_else(|e| panic!("Failed to connect to sonic ingest channel. {}", e)), + )) + }; + Some(Self { search, ingest }) + } else { + None + } + } +} + +#[actix::main] +async fn main() { + dotenv::dotenv().ok(); + init_tracing("search-manager"); + + let opts = Opts {}; + + let config = config::default_load(&opts); + + let db = db::Database::build(config.clone()).await; + + // let mqtt_client = mqtt::start(config.clone(), db.clone()).await; + // rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await; +} + +pub fn init_tracing(_service_name: &str) { + env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12"); + + let tracer = { + use opentelemetry::sdk::export::trace::stdout::new_pipeline; + use opentelemetry::sdk::trace::Config; + + new_pipeline() + .with_trace_config(Config::default()) + .with_pretty_print(true) + .install_simple() + }; + + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::from_default_env()) + .with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)) + .with(tracing_opentelemetry::layer().with_tracer(tracer)) + .try_init() + .unwrap(); +}