#![feature(drain_filter)] use std::io::Write; use std::str::FromStr; use actix::Actor; use actix_session::storage::RedisActorSessionStore; use actix_session::SessionMiddleware; use actix_web::middleware::Logger; use actix_web::web::Data; use actix_web::{App, HttpServer}; use channels::Lang; use config::UpdateConfig; use jemallocator::Jemalloc; use model::{AccountState, Email, Login, PassHash, Password, Role}; use opts::{ Command, CreateAccountCmd, CreateAccountOpts, GenerateHashOpts, Opts, ServerOpts, TestMailerOpts, }; use rumqttc::Incoming; use validator::{validate_email, validate_length}; use crate::opts::ReIndexOpts; mod opts; pub mod routes; #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Failed to boot. {0:?}")] Boot(std::io::Error), #[error("Unable to read password file. {0:?}")] PassFile(std::io::Error), #[error("Unable to read password from STDIN. {0:?}")] ReadPass(std::io::Error), } pub type Result = std::result::Result; async fn server(opts: ServerOpts) -> Result<()> { let redis_connection_string = "127.0.0.1:6379"; let app_config = config::default_load(&opts); let db = database_manager::Database::build(app_config.clone()) .await .start(); let token_manager = token_manager::TokenManager::new(app_config.clone(), db.clone()).start(); let order_manager = order_manager::OrderManager::new(app_config.clone(), db.clone()).start(); let payment_manager = payment_manager::PaymentManager::build(app_config.clone(), db.clone()) .await .start(); let search_manager = channels::search::rpc::create_client(app_config.clone()).await; let fs_manager = fs_manager::FsManager::build(app_config.clone()) .await .expect("Failed to initialize file system storage"); let cart_manager = channels::carts::rpc::create_client(app_config.clone()).await; let account_manager = channels::accounts::rpc::create_client(app_config.clone()).await; let addr = { let l = app_config.lock(); let w = l.web(); (w.bind().unwrap_or(opts.bind), w.port().unwrap_or(opts.port)) }; println!("Listen at {:?}", addr); HttpServer::new(move || { let config = app_config.clone(); App::new() .wrap(Logger::default()) .wrap(SessionMiddleware::new( RedisActorSessionStore::new(redis_connection_string), { let l = config.lock(); l.web().session_secret() }, )) .wrap(actix_web::middleware::Compress::default()) .wrap(actix_web::middleware::NormalizePath::trim()) .app_data(Data::new(config)) .app_data(Data::new(db.clone())) .app_data(Data::new(token_manager.clone())) .app_data(Data::new(order_manager.clone())) .app_data(Data::new(payment_manager.clone())) .app_data(Data::new(search_manager.clone())) .app_data(Data::new(fs_manager.clone())) .app_data(Data::new(cart_manager.clone())) .app_data(Data::new(account_manager.clone())) .configure(routes::configure) .service({ let l = app_config.lock(); actix_files::Files::new(&l.files().public_path(), l.files().local_path()) .use_etag(true) }) .default_service(actix_web::web::to(actix_web::HttpResponse::Ok)) }) .bind(addr) .map_err(Error::Boot)? .run() .await .map_err(Error::Boot) } async fn generate_hash(_opts: GenerateHashOpts) -> Result<()> { model::print_hash(); Ok(()) } async fn create_account(opts: CreateAccountOpts) -> Result<()> { let (role, opts) = match opts.cmd.expect("Account type is mandatory") { CreateAccountCmd::Admin(opts) => (Role::Admin, opts), CreateAccountCmd::User(opts) => (Role::User, opts), }; if !validate_email(&opts.email) { panic!("Invalid email address"); } if !validate_length(&opts.login, Some(4), Some(100), None) { panic!("Login must have at least 4 characters and no more than 100"); } let config = config::default_load(&opts); let pass = match opts.pass_file { Some(path) => std::fs::read_to_string(path).map_err(Error::PassFile)?, None => { let mut s = String::with_capacity(100); { let mut std_out = std::io::stdout(); let std_in = std::io::stdin(); std_out .write_all(b"PASS > ") .expect("Failed to write to stdout"); std_out.flush().expect("Failed to write to stdout"); std_in.read_line(&mut s).map_err(Error::ReadPass)?; } if let Some(pos) = s.chars().position(|c| c == '\n') { s.remove(pos); } s } }; if pass.trim().is_empty() { panic!("Password cannot be empty!"); } let channel = channels::accounts::rpc::create_client(config.clone()).await; channel .register_account( tarpc::context::current(), channels::accounts::register::Input { email: Email::from_str(&opts.email).unwrap(), login: Login::new(opts.login), password: Password::new(pass), role, }, ) .await .unwrap() .unwrap(); Ok(()) } async fn test_mailer(opts: TestMailerOpts) -> Result<()> { let config = config::default_load(&opts); opts.update_config(&mut *config.lock()); let (client, mut event_loop) = channels::emails::mqtt::create_client(config); client .emit_test(&model::Account { id: 0.into(), email: opts.receiver.unwrap(), login: Login::new("test email"), role: Role::Admin, customer_id: Default::default(), state: AccountState::Active, }) .await; loop { let msg = event_loop.poll().await.unwrap(); tracing::info!("{:?}", msg); if let rumqttc::Event::Incoming(Incoming::PubAck(_)) = msg { client.0.disconnect().await.unwrap(); break; } } println!("Success!"); Ok(()) } async fn reindex(opts: ReIndexOpts) -> Result<()> { let config = config::default_load(&opts); opts.update_config(&mut *config.lock()); let lang: Lang = opts.lang.clone().parse().unwrap(); let db = database_manager::Database::build(config.clone()) .await .start(); let search = channels::search::rpc::create_client(config.clone()).await; let products: Vec = db .send(database_manager::AllProducts) .await .unwrap() .unwrap(); tracing::info!("{:?}", products); for product in products { if let Ok(Err(e)) = search .create_index( tarpc::context::current(), channels::search::create_index::Input::new( product.id.to_string(), vec![ product.long_description.into_inner(), product.short_description.into_inner(), product.name.into_inner(), ] .join(" "), "products", "default", lang, ), ) .await { tracing::error!("{}", e); return Ok(()); } } tracing::info!("Success!"); Ok(()) } #[actix_web::main] async fn main() -> Result<()> { human_panic::setup_panic!(); dotenv::dotenv().ok(); tracing_subscriber::fmt::init(); let opts: Opts = gumdrop::Options::parse_args_default_or_exit(); match opts.cmd.unwrap_or_default() { Command::Server(opts) => server(opts).await, Command::GenerateHash(opts) => generate_hash(opts).await, Command::CreateAccount(opts) => create_account(opts).await, Command::TestMailer(opts) => test_mailer(opts).await, Command::ConfigInfo(_) => { config::config_info().await.unwrap(); Ok(()) } Command::ReIndex(opts) => reindex(opts).await, } }