diff --git a/Cargo.lock b/Cargo.lock index 686dbb9..afc6237 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -692,7 +692,6 @@ dependencies = [ "payment_manager", "pretty_env_logger", "rumqttc", - "search_manager", "serde", "serde_json", "sqlx", @@ -3666,6 +3665,7 @@ dependencies = [ "chrono", "config", "derive_more", + "dotenv", "futures 0.3.25", "model", "opentelemetry 0.17.0", diff --git a/crates/api/Cargo.toml b/crates/api/Cargo.toml index f3787ad..60b1ca9 100644 --- a/crates/api/Cargo.toml +++ b/crates/api/Cargo.toml @@ -18,7 +18,7 @@ actix-web-httpauth = { version = "0.6", features = [] } actix-web-opentelemetry = { version = "0.12", features = [] } async-trait = { version = "0.1", features = [] } bytes = { version = "1.1.0" } -channels = { path = "../channels" } +channels = { path = "../channels", features = ['accounts', 'carts', 'emails', 'search'] } chrono = { version = "0.4", features = ["serde"] } config = { path = "../config" } database_manager = { path = "../database_manager" } @@ -38,7 +38,6 @@ parking_lot = { version = "0.12", features = [] } payment_manager = { path = "../payment_manager" } pretty_env_logger = { version = "0.4", features = [] } rumqttc = { version = "*" } -search_manager = { path = "../search_manager" } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = [] } sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] } diff --git a/crates/api/src/main.rs b/crates/api/src/main.rs index a7c9cf2..3af2acd 100644 --- a/crates/api/src/main.rs +++ b/crates/api/src/main.rs @@ -54,7 +54,7 @@ async fn server(opts: ServerOpts) -> Result<()> { let payment_manager = payment_manager::PaymentManager::build(app_config.clone(), db.clone()) .await .start(); - let search_manager = search_manager::SearchManager::new(app_config.clone()).start(); + let search_manager = channels::search::rpc::create_client(app_config.clone()).await; let fs_manager = fs_manager::FsManager::build(app_config.clone()) .await .expect("Failed to initialize file system storage"); @@ -200,7 +200,7 @@ async fn reindex(opts: ReIndexOpts) -> Result<()> { let db = database_manager::Database::build(config.clone()) .await .start(); - let search = search_manager::SearchManager::new(config).start(); + let search = channels::search::rpc::create_client(config.clone()).await; let products: Vec = db .send(database_manager::AllProducts) .await @@ -208,19 +208,21 @@ async fn reindex(opts: ReIndexOpts) -> Result<()> { .unwrap(); for product in products { search - .send(search_manager::CreateIndex { - key: product.id.to_string(), - value: vec![ - product.long_description.into_inner(), - product.short_description.into_inner(), - product.name.into_inner(), - ] - .join(" "), - collection: "products".into(), - lang: opts.lang.clone(), - }) + .create_index( + tarpc::context::current(), + channels::search::create_index::Input { + key: product.id.to_string(), + value: vec![ + product.long_description.into_inner(), + product.short_description.into_inner(), + product.name.into_inner(), + ] + .join(" "), + collection: "products".into(), + lang: opts.lang.clone(), + }, + ) .await - .unwrap() .unwrap(); } println!("Success!"); diff --git a/crates/api/src/routes/admin/api_v1/products.rs b/crates/api/src/routes/admin/api_v1/products.rs index a18f7a8..ebdc906 100644 --- a/crates/api/src/routes/admin/api_v1/products.rs +++ b/crates/api/src/routes/admin/api_v1/products.rs @@ -8,7 +8,6 @@ use model::{ api, Days, Price, ProductCategory, ProductId, ProductLongDesc, ProductName, ProductShortDesc, Quantity, QuantityUnit, }; -use search_manager::SearchManager; use serde::Deserialize; use token_manager::TokenManager; @@ -94,7 +93,7 @@ async fn create_product( credentials: BearerAuth, tm: Data>, db: Data>, - search: Data>, + search: Data, Json(payload): Json, ) -> routes::Result { credentials.require_admin(tm.into_inner()).await?; @@ -111,17 +110,25 @@ async fn create_product( } ); - search.do_send(search_manager::CreateIndex { - key: product.id.to_string(), - value: vec![ - product.long_description.to_string(), - product.short_description.to_string(), - product.name.to_string(), - ] - .join(" "), - collection: "products".into(), - lang: payload.lang, - }); + if let Err(e) = search + .create_index( + tarpc::context::current(), + channels::search::create_index::Input { + key: product.id.to_string(), + value: vec![ + product.long_description.to_string(), + product.short_description.to_string(), + product.name.to_string(), + ] + .join(" "), + collection: "products".into(), + lang: payload.lang, + }, + ) + .await + { + tracing::error!("{}", e); + } let _ = admin_send_db!( db, diff --git a/crates/api/src/routes/mod.rs b/crates/api/src/routes/mod.rs index a8065b5..e5bd608 100644 --- a/crates/api/src/routes/mod.rs +++ b/crates/api/src/routes/mod.rs @@ -44,7 +44,6 @@ pub enum Error { Fs(fs_manager::Error), Order(order_manager::Error), Pay(payment_manager::Error), - Search(search_manager::Error), Token(token_manager::Error), } @@ -80,7 +79,6 @@ impl Display for Error { Error::Fs(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Order(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Pay(_e) => serde_json::to_string(&self).unwrap_or_default(), - Error::Search(_e) => serde_json::to_string(&self).unwrap_or_default(), Error::Token(_e) => serde_json::to_string(&self).unwrap_or_default(), }; f.write_str(&msg) @@ -100,7 +98,6 @@ impl ResponseError for Error { Error::Fs(_) => StatusCode::BAD_REQUEST, Error::Order(_) => StatusCode::BAD_REQUEST, Error::Pay(_) => StatusCode::BAD_REQUEST, - Error::Search(_) => StatusCode::BAD_REQUEST, Error::Token(_) => StatusCode::BAD_REQUEST, } } diff --git a/crates/api/src/routes/public/api_v1/unrestricted.rs b/crates/api/src/routes/public/api_v1/unrestricted.rs index 2a28a9e..42d1781 100644 --- a/crates/api/src/routes/public/api_v1/unrestricted.rs +++ b/crates/api/src/routes/public/api_v1/unrestricted.rs @@ -5,7 +5,6 @@ use config::SharedAppConfig; use database_manager::{query_db, Database}; use model::Encrypt; use payment_manager::{PaymentManager, PaymentNotification}; -use search_manager::SearchManager; use token_manager::TokenManager; use crate::public_send_db; @@ -16,19 +15,24 @@ use crate::routes::{self}; async fn search( db: Data>, _config: Data, - search: Data>, + search: Data, query: Query, ) -> routes::Result>> { let q = query.into_inner(); let product_ids: Vec = match search - .send(search_manager::Search { - query: q.q, - collection: "products".into(), - lang: q.lang, - }) + .search( + tarpc::context::current(), + channels::search::search::Input { + query: q.q, + collection: "products".into(), + lang: q.lang, + }, + ) .await { - Ok(Ok(Some(res))) => res + Ok(channels::search::search::Output { + found: Some(res), .. + }) => res .into_iter() .filter_map(|s| { s.parse::() @@ -36,11 +40,7 @@ async fn search( .map(model::ProductId::from) }) .collect(), - Ok(Ok(None)) => return Ok(Json(vec![])), - Ok(Err(e)) => { - tracing::error!("{e}"); - return Ok(Json(vec![])); - } + Ok(_) => return Ok(Json(vec![])), Err(e) => { tracing::error!("{e:?}"); return Ok(Json(vec![])); diff --git a/crates/cart_manager/src/mqtt.rs b/crates/cart_manager/src/mqtt.rs index 7db5e35..75fda38 100644 --- a/crates/cart_manager/src/mqtt.rs +++ b/crates/cart_manager/src/mqtt.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use config::SharedAppConfig; use rumqttc::{Event, Incoming}; diff --git a/crates/channels/Cargo.toml b/crates/channels/Cargo.toml index 6974433..c6bb467 100644 --- a/crates/channels/Cargo.toml +++ b/crates/channels/Cargo.toml @@ -3,6 +3,13 @@ name = "channels" version = "0.1.0" edition = "2021" +[features] +accounts = [] +carts = [] +emails = [] +search = [] +default = ['accounts', 'carts', 'emails', 'search'] + [dependencies] bincode = { version = "*" } bytes = { version = "1.2.1" } diff --git a/crates/channels/src/accounts.rs b/crates/channels/src/accounts.rs index 462adb2..fabc0be 100644 --- a/crates/channels/src/accounts.rs +++ b/crates/channels/src/accounts.rs @@ -130,7 +130,7 @@ pub mod rpc { let addr = { let l = config.lock(); ( - l.account_manager().bind.clone(), + l.account_manager().rpc_bind.clone(), l.account_manager().rpc_port, ) }; @@ -155,6 +155,6 @@ pub mod mqtt { use crate::AsyncClient; pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) { - crate::mqtt::create_client(CLIENT_NAME, config) + crate::mqtt::create_client(CLIENT_NAME, config.lock().account_manager().mqtt_addr()) } } diff --git a/crates/channels/src/carts.rs b/crates/channels/src/carts.rs index e15ed17..114c6df 100644 --- a/crates/channels/src/carts.rs +++ b/crates/channels/src/carts.rs @@ -189,7 +189,7 @@ pub mod rpc { let addr = { let l = config.lock(); ( - l.account_manager().bind.clone(), + l.account_manager().rpc_bind.clone(), l.account_manager().rpc_port, ) }; @@ -214,6 +214,6 @@ pub mod mqtt { use crate::AsyncClient; pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) { - crate::mqtt::create_client(CLIENT_NAME, config) + crate::mqtt::create_client(CLIENT_NAME, config.lock().cart_manager().mqtt_addr()) } } diff --git a/crates/channels/src/emails.rs b/crates/channels/src/emails.rs index 3c34129..7b1db9c 100644 --- a/crates/channels/src/emails.rs +++ b/crates/channels/src/emails.rs @@ -92,6 +92,6 @@ pub mod mqtt { use crate::AsyncClient; pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) { - crate::mqtt::create_client(CLIENT_NAME, config) + crate::mqtt::create_client(CLIENT_NAME, config.lock().email_sender().mqtt_addr()) } } diff --git a/crates/channels/src/lib.rs b/crates/channels/src/lib.rs index f4cbf12..8bd225c 100644 --- a/crates/channels/src/lib.rs +++ b/crates/channels/src/lib.rs @@ -1,10 +1,14 @@ #![feature(structural_match)] +#[cfg(feature = "accounts")] pub mod accounts; +#[cfg(feature = "carts")] pub mod carts; +#[cfg(feature = "emails")] pub mod emails; pub mod mqtt; pub mod rpc; +#[cfg(feature = "search")] pub mod search; pub trait DeserializePayload { diff --git a/crates/channels/src/mqtt.rs b/crates/channels/src/mqtt.rs index b11e275..1ddcba2 100644 --- a/crates/channels/src/mqtt.rs +++ b/crates/channels/src/mqtt.rs @@ -1,19 +1,12 @@ use std::time::Duration; -use config::SharedAppConfig; use rumqttc::EventLoop; use crate::AsyncClient; -pub(crate) fn create_client(name: &str, config: SharedAppConfig) -> (AsyncClient, EventLoop) { - let mut mqtt_options = { - let l = config.lock(); - let bind = &l.account_manager().mqtt_bind; - let port = l.account_manager().mqtt_port; - tracing::info!("Starting account mqtt at {}:{}", bind, port); - - rumqttc::MqttOptions::new(name, bind, port) - }; +pub(crate) fn create_client(name: &str, (bind, port): (&str, u16)) -> (AsyncClient, EventLoop) { + tracing::info!("Starting account mqtt at {}:{}", bind, port); + let mut mqtt_options = rumqttc::MqttOptions::new(name, bind, port); mqtt_options.set_keep_alive(Duration::from_secs(5)); let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10); diff --git a/crates/channels/src/search.rs b/crates/channels/src/search.rs index c86be2e..347ad7f 100644 --- a/crates/channels/src/search.rs +++ b/crates/channels/src/search.rs @@ -1,4 +1,4 @@ -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)] pub enum Error { #[error("Can't create index")] CantCreate, @@ -18,10 +18,26 @@ pub mod search { pub lang: String, } - #[derive(Debug, serde::Serialize, serde::Deserialize)] + #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] pub struct Output { pub found: Option>, - pub error: Error, + pub error: Option, + } + + impl Output { + pub fn found(found: Vec) -> Self { + Self { + found: Some(found), + ..Default::default() + } + } + + pub fn error(error: Error) -> Self { + Self { + error: Some(error), + ..Default::default() + } + } } } @@ -36,9 +52,57 @@ pub mod create_index { pub lang: String, } - #[derive(Debug, serde::Serialize, serde::Deserialize)] + #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] pub struct Output { pub found: Option<()>, - pub error: Error, + pub error: Option, + } + + impl Output { + pub fn ok() -> Self { + Self { + found: Some(()), + ..Default::default() + } + } + + pub fn error(error: Error) -> Self { + Self { + error: Some(error), + ..Default::default() + } + } + } +} + +pub mod rpc { + use config::SharedAppConfig; + + use crate::search::{create_index, search}; + + #[tarpc::service] + pub trait Search { + /// Search all matching indices. + async fn search(input: search::Input) -> search::Output; + + /// Create new search index. + async fn create_index(input: create_index::Input) -> create_index::Output; + } + + pub async fn create_client(config: SharedAppConfig) -> SearchClient { + use tarpc::client; + use tarpc::tokio_serde::formats::Bincode; + + let l = config.lock(); + let transport = + tarpc::serde_transport::tcp::connect(l.search().rpc_addr(), Bincode::default); + + let client = SearchClient::new( + client::Config::default(), + transport.await.expect("Failed to connect to search server"), + ) + .spawn(); + + client } } diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 2e9db13..d958d69 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -313,6 +313,8 @@ pub struct SearchConfig { sonic_search_pass: Option, sonic_ingest_addr: Option, sonic_ingest_pass: Option, + pub rpc_port: u16, + pub rpc_bind: String, #[serde(default)] search_active: bool, } @@ -326,12 +328,18 @@ impl Default for SearchConfig { sonic_search_pass: Some("SecretPassword".into()), sonic_ingest_addr: Some("0.0.0.0:1491".into()), sonic_ingest_pass: Some("SecretPassword".into()), + rpc_port: 19332, + rpc_bind: "0.0.0.0".into(), search_active: true, } } } impl SearchConfig { + pub fn rpc_addr(&self) -> (&str, u16) { + (&self.rpc_bind, self.rpc_port) + } + pub fn sonic_search_addr(&self) -> String { self.sonic_search_addr .as_ref() @@ -410,7 +418,7 @@ impl FilesConfig { #[derive(Debug, Serialize, Deserialize)] pub struct AccountManagerConfig { pub rpc_port: u16, - pub bind: String, + pub rpc_bind: String, pub mqtt_port: u16, pub mqtt_bind: String, pub database_url: String, @@ -420,7 +428,7 @@ impl Default for AccountManagerConfig { fn default() -> Self { Self { rpc_port: 19329, - bind: "0.0.0.0".into(), + rpc_bind: "0.0.0.0".into(), mqtt_port: 1883, mqtt_bind: "0.0.0.0".into(), database_url: "postgres://postgres@localhost/bazzar_accounts".into(), @@ -430,10 +438,20 @@ impl Default for AccountManagerConfig { impl Example for AccountManagerConfig {} +impl AccountManagerConfig { + pub fn rpc_addr(&self) -> (&str, u16) { + (&self.rpc_bind, self.rpc_port) + } + + pub fn mqtt_addr(&self) -> (&str, u16) { + (&self.mqtt_bind, self.mqtt_port) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct CartManagerConfig { pub rpc_port: u16, - pub bind: String, + pub rpc_bind: String, pub mqtt_port: u16, pub mqtt_bind: String, pub database_url: String, @@ -443,7 +461,7 @@ impl Default for CartManagerConfig { fn default() -> Self { Self { rpc_port: 19330, - bind: "0.0.0.0".into(), + rpc_bind: "0.0.0.0".into(), mqtt_port: 1884, mqtt_bind: "0.0.0.0".into(), database_url: "postgres://postgres@localhost/bazzar_carts".into(), @@ -453,10 +471,19 @@ impl Default for CartManagerConfig { impl Example for CartManagerConfig {} +impl CartManagerConfig { + pub fn rpc_addr(&self) -> (&str, u16) { + (&self.rpc_bind, self.rpc_port) + } + pub fn mqtt_addr(&self) -> (&str, u16) { + (&self.mqtt_bind, self.mqtt_port) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct EmailSenderConfig { pub rpc_port: u16, - pub bind: String, + pub rpc_bind: String, pub mqtt_port: u16, pub mqtt_bind: String, pub database_url: String, @@ -466,7 +493,7 @@ impl Default for EmailSenderConfig { fn default() -> Self { Self { rpc_port: 19331, - bind: "0.0.0.0".into(), + rpc_bind: "0.0.0.0".into(), mqtt_port: 1885, mqtt_bind: "0.0.0.0".into(), database_url: "postgres://postgres@localhost/bazzar_emails".into(), @@ -475,6 +502,16 @@ impl Default for EmailSenderConfig { } impl Example for EmailSenderConfig {} +impl EmailSenderConfig { + pub fn rpc_addr(&self) -> (&str, u16) { + (&self.rpc_bind, self.rpc_port) + } + + pub fn mqtt_addr(&self) -> (&str, u16) { + (&self.mqtt_bind, self.mqtt_port) + } +} + #[derive(Serialize, Deserialize)] pub struct AppConfig { #[serde(default)] diff --git a/crates/search_manager/Cargo.toml b/crates/search_manager/Cargo.toml index 0d5ddf8..28c90af 100644 --- a/crates/search_manager/Cargo.toml +++ b/crates/search_manager/Cargo.toml @@ -14,6 +14,7 @@ channels = { path = "../channels" } chrono = { version = "0.4", features = ["serde"] } config = { path = "../config" } derive_more = { version = "0.99", features = [] } +dotenv = { version = "0.15.0" } futures = { version = "0.3.25" } model = { path = "../model" } opentelemetry = { version = "0.17.0" } diff --git a/crates/search_manager/src/actions.rs b/crates/search_manager/src/actions.rs new file mode 100644 index 0000000..e14ee9d --- /dev/null +++ b/crates/search_manager/src/actions.rs @@ -0,0 +1,43 @@ +use channels::search::{create_index, search, Error}; +use config::SharedAppConfig; +use sonic_channel::{Dest, ObjDest, PushRequest, QueryRequest}; + +use crate::context::Context; + +pub async fn search(msg: search::Input, ctx: Context, _config: SharedAppConfig) -> search::Output { + if let Ok(l) = ctx.search.lock() { + match l.query(QueryRequest::new( + Dest::col_buc(msg.collection, msg.lang), + &msg.query, + )) { + Ok(res) => search::Output::found(res), + Err(e) => { + tracing::error!("{e:?}"); + search::Output::error(Error::QueryFailed) + } + } + } else { + search::Output::found(vec![]) + } +} + +pub async fn create_index( + msg: create_index::Input, + ctx: Context, + _config: SharedAppConfig, +) -> create_index::Output { + if let Ok(l) = ctx.ingest.lock() { + match l.push(PushRequest::new( + ObjDest::new(Dest::col_buc(msg.collection, msg.lang), &msg.key), + &msg.value, + )) { + Ok(_) => create_index::Output::ok(), + Err(e) => { + tracing::error!("{e:?}"); + create_index::Output::error(Error::CantCreate) + } + } + } else { + create_index::Output::ok() + } +} diff --git a/crates/search_manager/src/context.rs b/crates/search_manager/src/context.rs new file mode 100644 index 0000000..580eb1e --- /dev/null +++ b/crates/search_manager/src/context.rs @@ -0,0 +1,40 @@ +use std::sync::{Arc, Mutex}; + +use config::SharedAppConfig; +use sonic_channel::SonicChannel; + +#[derive(Clone)] +pub struct Context { + pub search: Arc>, + pub ingest: Arc>, +} + +impl Context { + pub fn new(config: SharedAppConfig) -> Option { + let enabled = config.lock().search().search_active(); + if enabled { + let search = { + let l = config.lock(); + sonic_channel::SearchChannel::start( + l.search().sonic_search_addr(), + l.search().sonic_search_pass(), + ) + .unwrap_or_else(|e| panic!("Failed to connect to sonic search channel. {}", e)) + }; + let ingest = { + let l = config.lock(); + sonic_channel::IngestChannel::start( + l.search().sonic_ingest_addr(), + l.search().sonic_ingest_pass(), + ) + .unwrap_or_else(|e| panic!("Failed to connect to sonic ingest channel. {}", e)) + }; + Some(Self { + search: Arc::new(Mutex::new(search)), + ingest: Arc::new(Mutex::new(ingest)), + }) + } else { + None + } + } +} diff --git a/crates/search_manager/src/lib.rs b/crates/search_manager/src/lib.rs deleted file mode 100644 index 0da862e..0000000 --- a/crates/search_manager/src/lib.rs +++ /dev/null @@ -1,155 +0,0 @@ -use std::sync::{Arc, Mutex}; - -use config::SharedAppConfig; -use sonic_channel::{Dest, ObjDest, PushRequest, QueryRequest, SonicChannel}; - -#[macro_export] -macro_rules! search_async_handler { - ($msg: ty, async fn call($($argv: ident : $arg_t: ty,)*) -> Result> $body: block) => { - impl $msg { - async fn call ( $($argv : $arg_t,)* ) -> Result> $body - } - - impl actix::Handler<$msg> for SearchManager { - type Result = actix::ResponseActFuture>>; - - fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result { - use actix::WrapFuture; - match self.channels.clone() { - Some(channels) => { - let config = self.config.clone(); - Box::pin(async { <$msg>::call(msg, channels, config).await }.into_actor(self)) - } - None => Box::pin(async { Ok(None) }.into_actor(self)), - } - } - } - } -} - -#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] -#[serde(rename_all = "kebab-case", tag = "search")] -pub enum Error { - #[error("Can't create index")] - CantCreate, - #[error("Failed to find records in bucket")] - QueryFailed, -} - -pub type Result = std::result::Result; - -#[derive(Clone)] -pub struct Channels { - search: Arc>, - ingest: Arc>, -} - -#[derive(Clone)] -pub struct SearchManager { - channels: Option, - config: SharedAppConfig, -} - -impl SearchManager { - pub fn new(config: SharedAppConfig) -> Self { - let enabled = config.lock().search().search_active(); - - let channels = if enabled { - let search = { - let l = config.lock(); - Arc::new(Mutex::new( - sonic_channel::SearchChannel::start( - l.search().sonic_search_addr(), - l.search().sonic_search_pass(), - ) - .unwrap_or_else(|e| panic!("Failed to connect to sonic search channel. {}", e)), - )) - }; - let ingest = { - let l = config.lock(); - Arc::new(Mutex::new( - sonic_channel::IngestChannel::start( - l.search().sonic_ingest_addr(), - l.search().sonic_ingest_pass(), - ) - .unwrap_or_else(|e| panic!("Failed to connect to sonic ingest channel. {}", e)), - )) - }; - Some(Channels { search, ingest }) - } else { - None - }; - Self { channels, config } - } -} - -impl actix::Actor for SearchManager { - type Context = actix::Context; -} - -#[derive(actix::Message)] -#[rtype(result = "Result>>")] -pub struct Search { - pub query: String, - pub collection: String, - pub lang: String, -} - -pub type StringList = Vec; - -search_async_handler!( - Search, - async fn call( - msg: Search, - channels: Channels, - _config: SharedAppConfig, - ) -> Result> { - if let Ok(l) = channels.search.lock() { - match l.query(QueryRequest::new( - Dest::col_buc(msg.collection, msg.lang), - &msg.query, - )) { - Ok(res) => Ok(Some(res)), - Err(e) => { - tracing::error!("{e:?}"); - Err(Error::QueryFailed) - } - } - } else { - Ok(None) - } - } -); - -#[derive(actix::Message)] -#[rtype(result = "Result>")] -pub struct CreateIndex { - pub key: String, - pub value: String, - pub collection: String, - pub lang: String, -} - -search_async_handler!( - CreateIndex, - async fn call( - msg: CreateIndex, - channels: Channels, - _config: SharedAppConfig, - ) -> Result> { - if let Ok(l) = channels.ingest.lock() { - match l.push(PushRequest::new( - ObjDest::new(Dest::col_buc(msg.collection, msg.lang), &msg.key), - &msg.value, - )) { - Ok(_) => Ok(Some(())), - Err(e) => { - tracing::error!("{e:?}"); - Err(Error::CantCreate) - } - } - } else { - Ok(Some(())) - } - } -); diff --git a/crates/search_manager/src/main.rs b/crates/search_manager/src/main.rs index 6ad6dd5..534c195 100644 --- a/crates/search_manager/src/main.rs +++ b/crates/search_manager/src/main.rs @@ -1,19 +1,16 @@ #![feature(structural_match)] use std::env; -use std::sync::{Arc, RwLock}; -use config::{SharedAppConfig, UpdateConfig}; -use search_manager::Channels; -use sonic_channel::SonicChannel; +use config::UpdateConfig; +pub use context::*; use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -// pub mod actions; -// pub mod db; -// pub mod mqtt; -// pub mod rpc; +pub mod actions; +pub mod context; +pub mod rpc; pub type Result = std::result::Result; @@ -35,45 +32,6 @@ pub struct Opts {} impl UpdateConfig for Opts {} -mod rpc {} - -#[derive(Clone)] -pub struct Context { - search: Arc>, - ingest: Arc>, -} -impl Context { - pub fn new(config: SharedAppConfig) -> Option { - let enabled = config.lock().search().search_active(); - - if enabled { - let search = { - let l = config.lock(); - Arc::new(RwLock::new( - sonic_channel::SearchChannel::start( - l.search().sonic_search_addr(), - l.search().sonic_search_pass(), - ) - .unwrap_or_else(|e| panic!("Failed to connect to sonic search channel. {}", e)), - )) - }; - let ingest = { - let l = config.lock(); - Arc::new(RwLock::new( - sonic_channel::IngestChannel::start( - l.search().sonic_ingest_addr(), - l.search().sonic_ingest_pass(), - ) - .unwrap_or_else(|e| panic!("Failed to connect to sonic ingest channel. {}", e)), - )) - }; - Some(Self { search, ingest }) - } else { - None - } - } -} - #[actix::main] async fn main() { dotenv::dotenv().ok(); @@ -82,11 +40,12 @@ async fn main() { let opts = Opts {}; let config = config::default_load(&opts); + let ctx = Context::new(config.clone()).unwrap_or_else(|| { + tracing::info!("Search is disabled"); + std::process::exit(0); + }); - let db = db::Database::build(config.clone()).await; - - // let mqtt_client = mqtt::start(config.clone(), db.clone()).await; - // rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await; + rpc::start(config.clone(), ctx.clone()).await; } pub fn init_tracing(_service_name: &str) { diff --git a/crates/search_manager/src/rpc.rs b/crates/search_manager/src/rpc.rs new file mode 100644 index 0000000..e1423b6 --- /dev/null +++ b/crates/search_manager/src/rpc.rs @@ -0,0 +1,40 @@ +use channels::search::rpc::Search; +use channels::search::{create_index, search}; +use config::SharedAppConfig; +use tarpc::context; + +use crate::context::Context; + +#[derive(Clone)] +pub struct SearchServer { + ctx: Context, + config: SharedAppConfig, +} + +#[tarpc::server] +impl Search for SearchServer { + async fn search(self, _: context::Context, input: search::Input) -> search::Output { + crate::actions::search(input, self.ctx, self.config).await + } + + async fn create_index( + self, + _: context::Context, + input: create_index::Input, + ) -> create_index::Output { + crate::actions::create_index(input, self.ctx, self.config).await + } +} + +pub async fn start(config: SharedAppConfig, ctx: Context) { + let port = { config.lock().search().rpc_port }; + + channels::rpc::start("search", port, || { + SearchServer { + ctx: ctx.clone(), + config: config.clone(), + } + .serve() + }) + .await; +}