extracting search manager

This commit is contained in:
eraden 2022-11-05 12:31:18 +01:00
parent e2c34d68aa
commit 733df891c0
6 changed files with 179 additions and 3 deletions

8
Cargo.lock generated
View File

@ -3662,17 +3662,25 @@ version = "0.1.0"
dependencies = [
"actix 0.13.0",
"actix-rt",
"channels",
"chrono",
"config",
"derive_more",
"futures 0.3.25",
"model",
"opentelemetry 0.17.0",
"opentelemetry-jaeger",
"parking_lot 0.12.1",
"pretty_env_logger",
"rumqttc",
"serde",
"sonic-channel",
"tarpc",
"thiserror",
"tokio",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"uuid 1.2.1",
]

View File

@ -5,6 +5,7 @@ pub mod carts;
pub mod emails;
pub mod mqtt;
pub mod rpc;
pub mod search;
pub trait DeserializePayload {
fn deserialize_payload<T: serde::de::DeserializeOwned>(self, bytes: bytes::Bytes) -> Option<T>;

View File

@ -0,0 +1,44 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Can't create index")]
CantCreate,
#[error("Failed to find records in bucket")]
QueryFailed,
}
pub type Result<T> = std::result::Result<T, Error>;
pub mod search {
use crate::search::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub query: String,
pub collection: String,
pub lang: String,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Output {
pub found: Option<Vec<String>>,
pub error: Error,
}
}
pub mod create_index {
use crate::search::Error;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Input {
pub key: String,
pub value: String,
pub collection: String,
pub lang: String,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Output {
pub found: Option<()>,
pub error: Error,
}
}

View File

@ -13,6 +13,8 @@ actix-rt = { version = "2.7", features = [] }
channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] }
config = { path = "../config" }
dotenv = { version = "0.15.0" }
handlebars = { version = "*", features = [] }
model = { path = "../model" }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
@ -28,5 +30,3 @@ tracing = { version = "0.1.37" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "0.8", features = ["serde"] }
dotenv = { version = "0.15.0" }
handlebars = { version = "*", features = [] }

View File

@ -3,18 +3,30 @@ name = "search_manager"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "search-manager"
path = "./src/main.rs"
[dependencies]
actix = { version = "0.13", features = [] }
actix-rt = { version = "2.7", features = [] }
channels = { path = "../channels" }
chrono = { version = "0.4", features = ["serde"] }
config = { path = "../config" }
derive_more = { version = "0.99", features = [] }
futures = { version = "0.3.25" }
model = { path = "../model" }
opentelemetry = { version = "0.17.0" }
opentelemetry-jaeger = { version = "0.17.0" }
parking_lot = { version = "0.12", features = [] }
pretty_env_logger = { version = "0.4", features = [] }
rumqttc = { version = "*" }
serde = { version = "1.0", features = ["derive"] }
sonic-channel = { version = "1.1.0", features = ["ingest"] }
tarpc = { version = "0.30.0", features = ["tokio1", "serde-transport-bincode", "serde-transport", "serde", "serde-transport-json", "tcp"] }
thiserror = { version = "1.0.31" }
tracing = { version = "0.1.34" }
tokio = { version = "1.21.2", features = ['full'] }
tracing = { version = "0.1.6" }
tracing-opentelemetry = { version = "0.17.4" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "1.2.1", features = ["serde"] }

View File

@ -0,0 +1,111 @@
#![feature(structural_match)]
use std::env;
use std::sync::{Arc, RwLock};
use config::{SharedAppConfig, UpdateConfig};
use search_manager::Channels;
use sonic_channel::SonicChannel;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
// pub mod actions;
// pub mod db;
// pub mod mqtt;
// pub mod rpc;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)]
pub enum Error {
#[error("Unable to send or receive msg from database")]
DbCritical,
#[error("Failed to load account data")]
Account,
#[error("Failed to load account addresses")]
Addresses,
#[error("Unable to save record")]
Saving,
#[error("Unable to hash password")]
Hashing,
}
pub struct Opts {}
impl UpdateConfig for Opts {}
mod rpc {}
#[derive(Clone)]
pub struct Context {
search: Arc<RwLock<sonic_channel::SearchChannel>>,
ingest: Arc<RwLock<sonic_channel::IngestChannel>>,
}
impl Context {
pub fn new(config: SharedAppConfig) -> Option<Self> {
let enabled = config.lock().search().search_active();
if enabled {
let search = {
let l = config.lock();
Arc::new(RwLock::new(
sonic_channel::SearchChannel::start(
l.search().sonic_search_addr(),
l.search().sonic_search_pass(),
)
.unwrap_or_else(|e| panic!("Failed to connect to sonic search channel. {}", e)),
))
};
let ingest = {
let l = config.lock();
Arc::new(RwLock::new(
sonic_channel::IngestChannel::start(
l.search().sonic_ingest_addr(),
l.search().sonic_ingest_pass(),
)
.unwrap_or_else(|e| panic!("Failed to connect to sonic ingest channel. {}", e)),
))
};
Some(Self { search, ingest })
} else {
None
}
}
}
#[actix::main]
async fn main() {
dotenv::dotenv().ok();
init_tracing("search-manager");
let opts = Opts {};
let config = config::default_load(&opts);
let db = db::Database::build(config.clone()).await;
// let mqtt_client = mqtt::start(config.clone(), db.clone()).await;
// rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await;
}
pub fn init_tracing(_service_name: &str) {
env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");
let tracer = {
use opentelemetry::sdk::export::trace::stdout::new_pipeline;
use opentelemetry::sdk::trace::Config;
new_pipeline()
.with_trace_config(Config::default())
.with_pretty_print(true)
.install_simple()
};
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE))
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()
.unwrap();
}