Migrate search
This commit is contained in:
parent
733df891c0
commit
95b20dffd7
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -692,7 +692,6 @@ dependencies = [
|
||||
"payment_manager",
|
||||
"pretty_env_logger",
|
||||
"rumqttc",
|
||||
"search_manager",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
@ -3666,6 +3665,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"config",
|
||||
"derive_more",
|
||||
"dotenv",
|
||||
"futures 0.3.25",
|
||||
"model",
|
||||
"opentelemetry 0.17.0",
|
||||
|
@ -18,7 +18,7 @@ actix-web-httpauth = { version = "0.6", features = [] }
|
||||
actix-web-opentelemetry = { version = "0.12", features = [] }
|
||||
async-trait = { version = "0.1", features = [] }
|
||||
bytes = { version = "1.1.0" }
|
||||
channels = { path = "../channels" }
|
||||
channels = { path = "../channels", features = ['accounts', 'carts', 'emails', 'search'] }
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
config = { path = "../config" }
|
||||
database_manager = { path = "../database_manager" }
|
||||
@ -38,7 +38,6 @@ parking_lot = { version = "0.12", features = [] }
|
||||
payment_manager = { path = "../payment_manager" }
|
||||
pretty_env_logger = { version = "0.4", features = [] }
|
||||
rumqttc = { version = "*" }
|
||||
search_manager = { path = "../search_manager" }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = { version = "1.0", features = [] }
|
||||
sqlx = { version = "0.6.2", features = ["migrate", "runtime-actix-rustls", "all-types", "postgres"] }
|
||||
|
@ -54,7 +54,7 @@ async fn server(opts: ServerOpts) -> Result<()> {
|
||||
let payment_manager = payment_manager::PaymentManager::build(app_config.clone(), db.clone())
|
||||
.await
|
||||
.start();
|
||||
let search_manager = search_manager::SearchManager::new(app_config.clone()).start();
|
||||
let search_manager = channels::search::rpc::create_client(app_config.clone()).await;
|
||||
let fs_manager = fs_manager::FsManager::build(app_config.clone())
|
||||
.await
|
||||
.expect("Failed to initialize file system storage");
|
||||
@ -200,7 +200,7 @@ async fn reindex(opts: ReIndexOpts) -> Result<()> {
|
||||
let db = database_manager::Database::build(config.clone())
|
||||
.await
|
||||
.start();
|
||||
let search = search_manager::SearchManager::new(config).start();
|
||||
let search = channels::search::rpc::create_client(config.clone()).await;
|
||||
let products: Vec<model::Product> = db
|
||||
.send(database_manager::AllProducts)
|
||||
.await
|
||||
@ -208,7 +208,9 @@ async fn reindex(opts: ReIndexOpts) -> Result<()> {
|
||||
.unwrap();
|
||||
for product in products {
|
||||
search
|
||||
.send(search_manager::CreateIndex {
|
||||
.create_index(
|
||||
tarpc::context::current(),
|
||||
channels::search::create_index::Input {
|
||||
key: product.id.to_string(),
|
||||
value: vec![
|
||||
product.long_description.into_inner(),
|
||||
@ -218,9 +220,9 @@ async fn reindex(opts: ReIndexOpts) -> Result<()> {
|
||||
.join(" "),
|
||||
collection: "products".into(),
|
||||
lang: opts.lang.clone(),
|
||||
})
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
}
|
||||
println!("Success!");
|
||||
|
@ -8,7 +8,6 @@ use model::{
|
||||
api, Days, Price, ProductCategory, ProductId, ProductLongDesc, ProductName, ProductShortDesc,
|
||||
Quantity, QuantityUnit,
|
||||
};
|
||||
use search_manager::SearchManager;
|
||||
use serde::Deserialize;
|
||||
use token_manager::TokenManager;
|
||||
|
||||
@ -94,7 +93,7 @@ async fn create_product(
|
||||
credentials: BearerAuth,
|
||||
tm: Data<Addr<TokenManager>>,
|
||||
db: Data<Addr<Database>>,
|
||||
search: Data<Addr<SearchManager>>,
|
||||
search: Data<channels::search::rpc::SearchClient>,
|
||||
Json(payload): Json<CreateProduct>,
|
||||
) -> routes::Result<HttpResponse> {
|
||||
credentials.require_admin(tm.into_inner()).await?;
|
||||
@ -111,7 +110,10 @@ async fn create_product(
|
||||
}
|
||||
);
|
||||
|
||||
search.do_send(search_manager::CreateIndex {
|
||||
if let Err(e) = search
|
||||
.create_index(
|
||||
tarpc::context::current(),
|
||||
channels::search::create_index::Input {
|
||||
key: product.id.to_string(),
|
||||
value: vec![
|
||||
product.long_description.to_string(),
|
||||
@ -121,7 +123,12 @@ async fn create_product(
|
||||
.join(" "),
|
||||
collection: "products".into(),
|
||||
lang: payload.lang,
|
||||
});
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!("{}", e);
|
||||
}
|
||||
|
||||
let _ = admin_send_db!(
|
||||
db,
|
||||
|
@ -44,7 +44,6 @@ pub enum Error {
|
||||
Fs(fs_manager::Error),
|
||||
Order(order_manager::Error),
|
||||
Pay(payment_manager::Error),
|
||||
Search(search_manager::Error),
|
||||
Token(token_manager::Error),
|
||||
}
|
||||
|
||||
@ -80,7 +79,6 @@ impl Display for Error {
|
||||
Error::Fs(_e) => serde_json::to_string(&self).unwrap_or_default(),
|
||||
Error::Order(_e) => serde_json::to_string(&self).unwrap_or_default(),
|
||||
Error::Pay(_e) => serde_json::to_string(&self).unwrap_or_default(),
|
||||
Error::Search(_e) => serde_json::to_string(&self).unwrap_or_default(),
|
||||
Error::Token(_e) => serde_json::to_string(&self).unwrap_or_default(),
|
||||
};
|
||||
f.write_str(&msg)
|
||||
@ -100,7 +98,6 @@ impl ResponseError for Error {
|
||||
Error::Fs(_) => StatusCode::BAD_REQUEST,
|
||||
Error::Order(_) => StatusCode::BAD_REQUEST,
|
||||
Error::Pay(_) => StatusCode::BAD_REQUEST,
|
||||
Error::Search(_) => StatusCode::BAD_REQUEST,
|
||||
Error::Token(_) => StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ use config::SharedAppConfig;
|
||||
use database_manager::{query_db, Database};
|
||||
use model::Encrypt;
|
||||
use payment_manager::{PaymentManager, PaymentNotification};
|
||||
use search_manager::SearchManager;
|
||||
use token_manager::TokenManager;
|
||||
|
||||
use crate::public_send_db;
|
||||
@ -16,19 +15,24 @@ use crate::routes::{self};
|
||||
async fn search(
|
||||
db: Data<Addr<Database>>,
|
||||
_config: Data<SharedAppConfig>,
|
||||
search: Data<Addr<SearchManager>>,
|
||||
search: Data<channels::search::rpc::SearchClient>,
|
||||
query: Query<model::api::SearchRequest>,
|
||||
) -> routes::Result<Json<Vec<model::Product>>> {
|
||||
let q = query.into_inner();
|
||||
let product_ids: Vec<model::ProductId> = match search
|
||||
.send(search_manager::Search {
|
||||
.search(
|
||||
tarpc::context::current(),
|
||||
channels::search::search::Input {
|
||||
query: q.q,
|
||||
collection: "products".into(),
|
||||
lang: q.lang,
|
||||
})
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(Some(res))) => res
|
||||
Ok(channels::search::search::Output {
|
||||
found: Some(res), ..
|
||||
}) => res
|
||||
.into_iter()
|
||||
.filter_map(|s| {
|
||||
s.parse::<model::RecordId>()
|
||||
@ -36,11 +40,7 @@ async fn search(
|
||||
.map(model::ProductId::from)
|
||||
})
|
||||
.collect(),
|
||||
Ok(Ok(None)) => return Ok(Json(vec![])),
|
||||
Ok(Err(e)) => {
|
||||
tracing::error!("{e}");
|
||||
return Ok(Json(vec![]));
|
||||
}
|
||||
Ok(_) => return Ok(Json(vec![])),
|
||||
Err(e) => {
|
||||
tracing::error!("{e:?}");
|
||||
return Ok(Json(vec![]));
|
||||
|
@ -1,5 +1,3 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use config::SharedAppConfig;
|
||||
use rumqttc::{Event, Incoming};
|
||||
|
||||
|
@ -3,6 +3,13 @@ name = "channels"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[features]
|
||||
accounts = []
|
||||
carts = []
|
||||
emails = []
|
||||
search = []
|
||||
default = ['accounts', 'carts', 'emails', 'search']
|
||||
|
||||
[dependencies]
|
||||
bincode = { version = "*" }
|
||||
bytes = { version = "1.2.1" }
|
||||
|
@ -130,7 +130,7 @@ pub mod rpc {
|
||||
let addr = {
|
||||
let l = config.lock();
|
||||
(
|
||||
l.account_manager().bind.clone(),
|
||||
l.account_manager().rpc_bind.clone(),
|
||||
l.account_manager().rpc_port,
|
||||
)
|
||||
};
|
||||
@ -155,6 +155,6 @@ pub mod mqtt {
|
||||
use crate::AsyncClient;
|
||||
|
||||
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {
|
||||
crate::mqtt::create_client(CLIENT_NAME, config)
|
||||
crate::mqtt::create_client(CLIENT_NAME, config.lock().account_manager().mqtt_addr())
|
||||
}
|
||||
}
|
||||
|
@ -189,7 +189,7 @@ pub mod rpc {
|
||||
let addr = {
|
||||
let l = config.lock();
|
||||
(
|
||||
l.account_manager().bind.clone(),
|
||||
l.account_manager().rpc_bind.clone(),
|
||||
l.account_manager().rpc_port,
|
||||
)
|
||||
};
|
||||
@ -214,6 +214,6 @@ pub mod mqtt {
|
||||
use crate::AsyncClient;
|
||||
|
||||
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {
|
||||
crate::mqtt::create_client(CLIENT_NAME, config)
|
||||
crate::mqtt::create_client(CLIENT_NAME, config.lock().cart_manager().mqtt_addr())
|
||||
}
|
||||
}
|
||||
|
@ -92,6 +92,6 @@ pub mod mqtt {
|
||||
use crate::AsyncClient;
|
||||
|
||||
pub fn create_client(config: SharedAppConfig) -> (AsyncClient, EventLoop) {
|
||||
crate::mqtt::create_client(CLIENT_NAME, config)
|
||||
crate::mqtt::create_client(CLIENT_NAME, config.lock().email_sender().mqtt_addr())
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,14 @@
|
||||
#![feature(structural_match)]
|
||||
|
||||
#[cfg(feature = "accounts")]
|
||||
pub mod accounts;
|
||||
#[cfg(feature = "carts")]
|
||||
pub mod carts;
|
||||
#[cfg(feature = "emails")]
|
||||
pub mod emails;
|
||||
pub mod mqtt;
|
||||
pub mod rpc;
|
||||
#[cfg(feature = "search")]
|
||||
pub mod search;
|
||||
|
||||
pub trait DeserializePayload {
|
||||
|
@ -1,19 +1,12 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use config::SharedAppConfig;
|
||||
use rumqttc::EventLoop;
|
||||
|
||||
use crate::AsyncClient;
|
||||
|
||||
pub(crate) fn create_client(name: &str, config: SharedAppConfig) -> (AsyncClient, EventLoop) {
|
||||
let mut mqtt_options = {
|
||||
let l = config.lock();
|
||||
let bind = &l.account_manager().mqtt_bind;
|
||||
let port = l.account_manager().mqtt_port;
|
||||
pub(crate) fn create_client(name: &str, (bind, port): (&str, u16)) -> (AsyncClient, EventLoop) {
|
||||
tracing::info!("Starting account mqtt at {}:{}", bind, port);
|
||||
|
||||
rumqttc::MqttOptions::new(name, bind, port)
|
||||
};
|
||||
let mut mqtt_options = rumqttc::MqttOptions::new(name, bind, port);
|
||||
mqtt_options.set_keep_alive(Duration::from_secs(5));
|
||||
|
||||
let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10);
|
||||
|
@ -1,4 +1,4 @@
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)]
|
||||
pub enum Error {
|
||||
#[error("Can't create index")]
|
||||
CantCreate,
|
||||
@ -18,10 +18,26 @@ pub mod search {
|
||||
pub lang: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Output {
|
||||
pub found: Option<Vec<String>>,
|
||||
pub error: Error,
|
||||
pub error: Option<Error>,
|
||||
}
|
||||
|
||||
impl Output {
|
||||
pub fn found(found: Vec<String>) -> Self {
|
||||
Self {
|
||||
found: Some(found),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn error(error: Error) -> Self {
|
||||
Self {
|
||||
error: Some(error),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -36,9 +52,57 @@ pub mod create_index {
|
||||
pub lang: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Output {
|
||||
pub found: Option<()>,
|
||||
pub error: Error,
|
||||
pub error: Option<Error>,
|
||||
}
|
||||
|
||||
impl Output {
|
||||
pub fn ok() -> Self {
|
||||
Self {
|
||||
found: Some(()),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn error(error: Error) -> Self {
|
||||
Self {
|
||||
error: Some(error),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod rpc {
|
||||
use config::SharedAppConfig;
|
||||
|
||||
use crate::search::{create_index, search};
|
||||
|
||||
#[tarpc::service]
|
||||
pub trait Search {
|
||||
/// Search all matching indices.
|
||||
async fn search(input: search::Input) -> search::Output;
|
||||
|
||||
/// Create new search index.
|
||||
async fn create_index(input: create_index::Input) -> create_index::Output;
|
||||
}
|
||||
|
||||
pub async fn create_client(config: SharedAppConfig) -> SearchClient {
|
||||
use tarpc::client;
|
||||
use tarpc::tokio_serde::formats::Bincode;
|
||||
|
||||
let l = config.lock();
|
||||
let transport =
|
||||
tarpc::serde_transport::tcp::connect(l.search().rpc_addr(), Bincode::default);
|
||||
|
||||
let client = SearchClient::new(
|
||||
client::Config::default(),
|
||||
transport.await.expect("Failed to connect to search server"),
|
||||
)
|
||||
.spawn();
|
||||
|
||||
client
|
||||
}
|
||||
}
|
||||
|
@ -313,6 +313,8 @@ pub struct SearchConfig {
|
||||
sonic_search_pass: Option<String>,
|
||||
sonic_ingest_addr: Option<String>,
|
||||
sonic_ingest_pass: Option<String>,
|
||||
pub rpc_port: u16,
|
||||
pub rpc_bind: String,
|
||||
#[serde(default)]
|
||||
search_active: bool,
|
||||
}
|
||||
@ -326,12 +328,18 @@ impl Default for SearchConfig {
|
||||
sonic_search_pass: Some("SecretPassword".into()),
|
||||
sonic_ingest_addr: Some("0.0.0.0:1491".into()),
|
||||
sonic_ingest_pass: Some("SecretPassword".into()),
|
||||
rpc_port: 19332,
|
||||
rpc_bind: "0.0.0.0".into(),
|
||||
search_active: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SearchConfig {
|
||||
pub fn rpc_addr(&self) -> (&str, u16) {
|
||||
(&self.rpc_bind, self.rpc_port)
|
||||
}
|
||||
|
||||
pub fn sonic_search_addr(&self) -> String {
|
||||
self.sonic_search_addr
|
||||
.as_ref()
|
||||
@ -410,7 +418,7 @@ impl FilesConfig {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AccountManagerConfig {
|
||||
pub rpc_port: u16,
|
||||
pub bind: String,
|
||||
pub rpc_bind: String,
|
||||
pub mqtt_port: u16,
|
||||
pub mqtt_bind: String,
|
||||
pub database_url: String,
|
||||
@ -420,7 +428,7 @@ impl Default for AccountManagerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
rpc_port: 19329,
|
||||
bind: "0.0.0.0".into(),
|
||||
rpc_bind: "0.0.0.0".into(),
|
||||
mqtt_port: 1883,
|
||||
mqtt_bind: "0.0.0.0".into(),
|
||||
database_url: "postgres://postgres@localhost/bazzar_accounts".into(),
|
||||
@ -430,10 +438,20 @@ impl Default for AccountManagerConfig {
|
||||
|
||||
impl Example for AccountManagerConfig {}
|
||||
|
||||
impl AccountManagerConfig {
|
||||
pub fn rpc_addr(&self) -> (&str, u16) {
|
||||
(&self.rpc_bind, self.rpc_port)
|
||||
}
|
||||
|
||||
pub fn mqtt_addr(&self) -> (&str, u16) {
|
||||
(&self.mqtt_bind, self.mqtt_port)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct CartManagerConfig {
|
||||
pub rpc_port: u16,
|
||||
pub bind: String,
|
||||
pub rpc_bind: String,
|
||||
pub mqtt_port: u16,
|
||||
pub mqtt_bind: String,
|
||||
pub database_url: String,
|
||||
@ -443,7 +461,7 @@ impl Default for CartManagerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
rpc_port: 19330,
|
||||
bind: "0.0.0.0".into(),
|
||||
rpc_bind: "0.0.0.0".into(),
|
||||
mqtt_port: 1884,
|
||||
mqtt_bind: "0.0.0.0".into(),
|
||||
database_url: "postgres://postgres@localhost/bazzar_carts".into(),
|
||||
@ -453,10 +471,19 @@ impl Default for CartManagerConfig {
|
||||
|
||||
impl Example for CartManagerConfig {}
|
||||
|
||||
impl CartManagerConfig {
|
||||
pub fn rpc_addr(&self) -> (&str, u16) {
|
||||
(&self.rpc_bind, self.rpc_port)
|
||||
}
|
||||
pub fn mqtt_addr(&self) -> (&str, u16) {
|
||||
(&self.mqtt_bind, self.mqtt_port)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct EmailSenderConfig {
|
||||
pub rpc_port: u16,
|
||||
pub bind: String,
|
||||
pub rpc_bind: String,
|
||||
pub mqtt_port: u16,
|
||||
pub mqtt_bind: String,
|
||||
pub database_url: String,
|
||||
@ -466,7 +493,7 @@ impl Default for EmailSenderConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
rpc_port: 19331,
|
||||
bind: "0.0.0.0".into(),
|
||||
rpc_bind: "0.0.0.0".into(),
|
||||
mqtt_port: 1885,
|
||||
mqtt_bind: "0.0.0.0".into(),
|
||||
database_url: "postgres://postgres@localhost/bazzar_emails".into(),
|
||||
@ -475,6 +502,16 @@ impl Default for EmailSenderConfig {
|
||||
}
|
||||
impl Example for EmailSenderConfig {}
|
||||
|
||||
impl EmailSenderConfig {
|
||||
pub fn rpc_addr(&self) -> (&str, u16) {
|
||||
(&self.rpc_bind, self.rpc_port)
|
||||
}
|
||||
|
||||
pub fn mqtt_addr(&self) -> (&str, u16) {
|
||||
(&self.mqtt_bind, self.mqtt_port)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct AppConfig {
|
||||
#[serde(default)]
|
||||
|
@ -14,6 +14,7 @@ channels = { path = "../channels" }
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
config = { path = "../config" }
|
||||
derive_more = { version = "0.99", features = [] }
|
||||
dotenv = { version = "0.15.0" }
|
||||
futures = { version = "0.3.25" }
|
||||
model = { path = "../model" }
|
||||
opentelemetry = { version = "0.17.0" }
|
||||
|
43
crates/search_manager/src/actions.rs
Normal file
43
crates/search_manager/src/actions.rs
Normal file
@ -0,0 +1,43 @@
|
||||
use channels::search::{create_index, search, Error};
|
||||
use config::SharedAppConfig;
|
||||
use sonic_channel::{Dest, ObjDest, PushRequest, QueryRequest};
|
||||
|
||||
use crate::context::Context;
|
||||
|
||||
pub async fn search(msg: search::Input, ctx: Context, _config: SharedAppConfig) -> search::Output {
|
||||
if let Ok(l) = ctx.search.lock() {
|
||||
match l.query(QueryRequest::new(
|
||||
Dest::col_buc(msg.collection, msg.lang),
|
||||
&msg.query,
|
||||
)) {
|
||||
Ok(res) => search::Output::found(res),
|
||||
Err(e) => {
|
||||
tracing::error!("{e:?}");
|
||||
search::Output::error(Error::QueryFailed)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
search::Output::found(vec![])
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_index(
|
||||
msg: create_index::Input,
|
||||
ctx: Context,
|
||||
_config: SharedAppConfig,
|
||||
) -> create_index::Output {
|
||||
if let Ok(l) = ctx.ingest.lock() {
|
||||
match l.push(PushRequest::new(
|
||||
ObjDest::new(Dest::col_buc(msg.collection, msg.lang), &msg.key),
|
||||
&msg.value,
|
||||
)) {
|
||||
Ok(_) => create_index::Output::ok(),
|
||||
Err(e) => {
|
||||
tracing::error!("{e:?}");
|
||||
create_index::Output::error(Error::CantCreate)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
create_index::Output::ok()
|
||||
}
|
||||
}
|
40
crates/search_manager/src/context.rs
Normal file
40
crates/search_manager/src/context.rs
Normal file
@ -0,0 +1,40 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use config::SharedAppConfig;
|
||||
use sonic_channel::SonicChannel;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Context {
|
||||
pub search: Arc<Mutex<sonic_channel::SearchChannel>>,
|
||||
pub ingest: Arc<Mutex<sonic_channel::IngestChannel>>,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
pub fn new(config: SharedAppConfig) -> Option<Self> {
|
||||
let enabled = config.lock().search().search_active();
|
||||
if enabled {
|
||||
let search = {
|
||||
let l = config.lock();
|
||||
sonic_channel::SearchChannel::start(
|
||||
l.search().sonic_search_addr(),
|
||||
l.search().sonic_search_pass(),
|
||||
)
|
||||
.unwrap_or_else(|e| panic!("Failed to connect to sonic search channel. {}", e))
|
||||
};
|
||||
let ingest = {
|
||||
let l = config.lock();
|
||||
sonic_channel::IngestChannel::start(
|
||||
l.search().sonic_ingest_addr(),
|
||||
l.search().sonic_ingest_pass(),
|
||||
)
|
||||
.unwrap_or_else(|e| panic!("Failed to connect to sonic ingest channel. {}", e))
|
||||
};
|
||||
Some(Self {
|
||||
search: Arc::new(Mutex::new(search)),
|
||||
ingest: Arc::new(Mutex::new(ingest)),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
@ -1,155 +0,0 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use config::SharedAppConfig;
|
||||
use sonic_channel::{Dest, ObjDest, PushRequest, QueryRequest, SonicChannel};
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! search_async_handler {
|
||||
($msg: ty, async fn call($($argv: ident : $arg_t: ty,)*) -> Result<Option< $res: ty >> $body: block) => {
|
||||
impl $msg {
|
||||
async fn call ( $($argv : $arg_t,)* ) -> Result<Option< $res >> $body
|
||||
}
|
||||
|
||||
impl actix::Handler<$msg> for SearchManager {
|
||||
type Result = actix::ResponseActFuture<Self, Result<Option<$res>>>;
|
||||
|
||||
fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result {
|
||||
use actix::WrapFuture;
|
||||
match self.channels.clone() {
|
||||
Some(channels) => {
|
||||
let config = self.config.clone();
|
||||
Box::pin(async { <$msg>::call(msg, channels, config).await }.into_actor(self))
|
||||
}
|
||||
None => Box::pin(async { Ok(None) }.into_actor(self)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)]
|
||||
#[serde(rename_all = "kebab-case", tag = "search")]
|
||||
pub enum Error {
|
||||
#[error("Can't create index")]
|
||||
CantCreate,
|
||||
#[error("Failed to find records in bucket")]
|
||||
QueryFailed,
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Channels {
|
||||
search: Arc<Mutex<sonic_channel::SearchChannel>>,
|
||||
ingest: Arc<Mutex<sonic_channel::IngestChannel>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SearchManager {
|
||||
channels: Option<Channels>,
|
||||
config: SharedAppConfig,
|
||||
}
|
||||
|
||||
impl SearchManager {
|
||||
pub fn new(config: SharedAppConfig) -> Self {
|
||||
let enabled = config.lock().search().search_active();
|
||||
|
||||
let channels = if enabled {
|
||||
let search = {
|
||||
let l = config.lock();
|
||||
Arc::new(Mutex::new(
|
||||
sonic_channel::SearchChannel::start(
|
||||
l.search().sonic_search_addr(),
|
||||
l.search().sonic_search_pass(),
|
||||
)
|
||||
.unwrap_or_else(|e| panic!("Failed to connect to sonic search channel. {}", e)),
|
||||
))
|
||||
};
|
||||
let ingest = {
|
||||
let l = config.lock();
|
||||
Arc::new(Mutex::new(
|
||||
sonic_channel::IngestChannel::start(
|
||||
l.search().sonic_ingest_addr(),
|
||||
l.search().sonic_ingest_pass(),
|
||||
)
|
||||
.unwrap_or_else(|e| panic!("Failed to connect to sonic ingest channel. {}", e)),
|
||||
))
|
||||
};
|
||||
Some(Channels { search, ingest })
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Self { channels, config }
|
||||
}
|
||||
}
|
||||
|
||||
impl actix::Actor for SearchManager {
|
||||
type Context = actix::Context<Self>;
|
||||
}
|
||||
|
||||
#[derive(actix::Message)]
|
||||
#[rtype(result = "Result<Option<Vec<String>>>")]
|
||||
pub struct Search {
|
||||
pub query: String,
|
||||
pub collection: String,
|
||||
pub lang: String,
|
||||
}
|
||||
|
||||
pub type StringList = Vec<String>;
|
||||
|
||||
search_async_handler!(
|
||||
Search,
|
||||
async fn call(
|
||||
msg: Search,
|
||||
channels: Channels,
|
||||
_config: SharedAppConfig,
|
||||
) -> Result<Option<StringList>> {
|
||||
if let Ok(l) = channels.search.lock() {
|
||||
match l.query(QueryRequest::new(
|
||||
Dest::col_buc(msg.collection, msg.lang),
|
||||
&msg.query,
|
||||
)) {
|
||||
Ok(res) => Ok(Some(res)),
|
||||
Err(e) => {
|
||||
tracing::error!("{e:?}");
|
||||
Err(Error::QueryFailed)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
#[derive(actix::Message)]
|
||||
#[rtype(result = "Result<Option<()>>")]
|
||||
pub struct CreateIndex {
|
||||
pub key: String,
|
||||
pub value: String,
|
||||
pub collection: String,
|
||||
pub lang: String,
|
||||
}
|
||||
|
||||
search_async_handler!(
|
||||
CreateIndex,
|
||||
async fn call(
|
||||
msg: CreateIndex,
|
||||
channels: Channels,
|
||||
_config: SharedAppConfig,
|
||||
) -> Result<Option<()>> {
|
||||
if let Ok(l) = channels.ingest.lock() {
|
||||
match l.push(PushRequest::new(
|
||||
ObjDest::new(Dest::col_buc(msg.collection, msg.lang), &msg.key),
|
||||
&msg.value,
|
||||
)) {
|
||||
Ok(_) => Ok(Some(())),
|
||||
Err(e) => {
|
||||
tracing::error!("{e:?}");
|
||||
Err(Error::CantCreate)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Ok(Some(()))
|
||||
}
|
||||
}
|
||||
);
|
@ -1,19 +1,16 @@
|
||||
#![feature(structural_match)]
|
||||
|
||||
use std::env;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use config::{SharedAppConfig, UpdateConfig};
|
||||
use search_manager::Channels;
|
||||
use sonic_channel::SonicChannel;
|
||||
use config::UpdateConfig;
|
||||
pub use context::*;
|
||||
use tracing_subscriber::fmt::format::FmtSpan;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
// pub mod actions;
|
||||
// pub mod db;
|
||||
// pub mod mqtt;
|
||||
// pub mod rpc;
|
||||
pub mod actions;
|
||||
pub mod context;
|
||||
pub mod rpc;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
@ -35,45 +32,6 @@ pub struct Opts {}
|
||||
|
||||
impl UpdateConfig for Opts {}
|
||||
|
||||
mod rpc {}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Context {
|
||||
search: Arc<RwLock<sonic_channel::SearchChannel>>,
|
||||
ingest: Arc<RwLock<sonic_channel::IngestChannel>>,
|
||||
}
|
||||
impl Context {
|
||||
pub fn new(config: SharedAppConfig) -> Option<Self> {
|
||||
let enabled = config.lock().search().search_active();
|
||||
|
||||
if enabled {
|
||||
let search = {
|
||||
let l = config.lock();
|
||||
Arc::new(RwLock::new(
|
||||
sonic_channel::SearchChannel::start(
|
||||
l.search().sonic_search_addr(),
|
||||
l.search().sonic_search_pass(),
|
||||
)
|
||||
.unwrap_or_else(|e| panic!("Failed to connect to sonic search channel. {}", e)),
|
||||
))
|
||||
};
|
||||
let ingest = {
|
||||
let l = config.lock();
|
||||
Arc::new(RwLock::new(
|
||||
sonic_channel::IngestChannel::start(
|
||||
l.search().sonic_ingest_addr(),
|
||||
l.search().sonic_ingest_pass(),
|
||||
)
|
||||
.unwrap_or_else(|e| panic!("Failed to connect to sonic ingest channel. {}", e)),
|
||||
))
|
||||
};
|
||||
Some(Self { search, ingest })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[actix::main]
|
||||
async fn main() {
|
||||
dotenv::dotenv().ok();
|
||||
@ -82,11 +40,12 @@ async fn main() {
|
||||
let opts = Opts {};
|
||||
|
||||
let config = config::default_load(&opts);
|
||||
let ctx = Context::new(config.clone()).unwrap_or_else(|| {
|
||||
tracing::info!("Search is disabled");
|
||||
std::process::exit(0);
|
||||
});
|
||||
|
||||
let db = db::Database::build(config.clone()).await;
|
||||
|
||||
// let mqtt_client = mqtt::start(config.clone(), db.clone()).await;
|
||||
// rpc::start(config.clone(), db.clone(), mqtt_client.clone()).await;
|
||||
rpc::start(config.clone(), ctx.clone()).await;
|
||||
}
|
||||
|
||||
pub fn init_tracing(_service_name: &str) {
|
||||
|
40
crates/search_manager/src/rpc.rs
Normal file
40
crates/search_manager/src/rpc.rs
Normal file
@ -0,0 +1,40 @@
|
||||
use channels::search::rpc::Search;
|
||||
use channels::search::{create_index, search};
|
||||
use config::SharedAppConfig;
|
||||
use tarpc::context;
|
||||
|
||||
use crate::context::Context;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SearchServer {
|
||||
ctx: Context,
|
||||
config: SharedAppConfig,
|
||||
}
|
||||
|
||||
#[tarpc::server]
|
||||
impl Search for SearchServer {
|
||||
async fn search(self, _: context::Context, input: search::Input) -> search::Output {
|
||||
crate::actions::search(input, self.ctx, self.config).await
|
||||
}
|
||||
|
||||
async fn create_index(
|
||||
self,
|
||||
_: context::Context,
|
||||
input: create_index::Input,
|
||||
) -> create_index::Output {
|
||||
crate::actions::create_index(input, self.ctx, self.config).await
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(config: SharedAppConfig, ctx: Context) {
|
||||
let port = { config.lock().search().rpc_port };
|
||||
|
||||
channels::rpc::start("search", port, || {
|
||||
SearchServer {
|
||||
ctx: ctx.clone(),
|
||||
config: config.clone(),
|
||||
}
|
||||
.serve()
|
||||
})
|
||||
.await;
|
||||
}
|
Loading…
Reference in New Issue
Block a user