diff --git a/Cargo.lock b/Cargo.lock index 10a62f8..d04de33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,6 +232,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "agent" +version = "0.1.0" +dependencies = [ + "database", + "events", + "futures 0.3.30", + "tokio", + "web", +] + [[package]] name = "ahash" version = "0.7.7" @@ -929,7 +940,7 @@ version = "0.1.0" dependencies = [ "bincode", "bytes 1.5.0", - "futures 0.3.29", + "futures 0.3.30", "futures-util", "rumqttc", "serde", @@ -1002,9 +1013,9 @@ checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" [[package]] name = "futures" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -1017,9 +1028,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -1027,15 +1038,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -1056,15 +1067,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-macro" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", @@ -1073,21 +1084,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures 0.1.31", "futures-channel", @@ -1301,6 +1312,7 @@ dependencies = [ name = "identity-agent" version = "0.1.0" dependencies = [ + "async-trait", "database", "events", "tokio", @@ -1976,7 +1988,7 @@ name = "router" version = "0.1.0" dependencies = [ "chrono", - "futures 0.3.29", + "futures 0.3.30", "futures-util", "serde", "serde_toml", @@ -2166,7 +2178,7 @@ dependencies = [ "async-trait", "bigdecimal", "chrono", - "futures 0.3.29", + "futures 0.3.30", "log", "ouroboros", "rust_decimal", @@ -2224,7 +2236,7 @@ dependencies = [ "async-trait", "clap", "dotenvy", - "futures 0.3.29", + "futures 0.3.30", "sea-orm", "sea-orm-cli", "sea-schema", @@ -2285,7 +2297,7 @@ version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cd9561232bd1b82ea748b581f15909d11de0db6563ddcf28c5d908aee8282f1" dependencies = [ - "futures 0.3.29", + "futures 0.3.30", "sea-query", "sea-schema-derive", ] @@ -2400,18 +2412,10 @@ name = "sessions-agent" version = "0.1.0" dependencies = [ "actix", - "actix-web", - "chrono", + "async-trait", + "database", "events", - "futures 0.3.29", - "futures-util", - "serde", - "serde_toml", - "thiserror", "tokio", - "toml", - "tracing", - "uuid", ] [[package]] @@ -2450,7 +2454,7 @@ name = "shared-config" version = "0.1.0" dependencies = [ "chrono", - "futures 0.3.29", + "futures 0.3.30", "futures-util", "serde", "serde_toml", @@ -2944,6 +2948,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] @@ -3247,6 +3252,16 @@ version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" +[[package]] +name = "web" +version = "0.1.0" +dependencies = [ + "actix", + "actix-web", + "futures 0.3.30", + "tokio", +] + [[package]] name = "webpki-roots" version = "0.25.3" diff --git a/Cargo.toml b/Cargo.toml index 2126e82..eff6171 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,13 +3,17 @@ resolver = "2" members = [ "crates/database", "crates/events", + "crates/agent", + "crates/web", "crates/router", - "crates/sessions-agent", "crates/shared-config", "crates/zt", - "bins/identity-agent" + "bins/sessions-agent", + "bins/identity-agent", ] [workspace.dependencies] database = { path = "./crates/database" } events = { path = "./crates/events" } +web = { path = "./crates/web" } +agent = { path = "./crates/agent" } diff --git a/bins/identity-agent/Cargo.toml b/bins/identity-agent/Cargo.toml index 3a079a9..ff518a2 100644 --- a/bins/identity-agent/Cargo.toml +++ b/bins/identity-agent/Cargo.toml @@ -7,4 +7,5 @@ publish = false [dependencies] tokio = { version = "1.27.0", features = ["full"] } database = { workspace = true } +async-trait = "0.1.75" events = { workspace = true } diff --git a/bins/identity-agent/src/main.rs b/bins/identity-agent/src/main.rs index f328e4d..6da8f22 100644 --- a/bins/identity-agent/src/main.rs +++ b/bins/identity-agent/src/main.rs @@ -1 +1,10 @@ -fn main() {} +mod migration; + +fn main() { + let event_bus = events::Bus::new( + "identity-agent", + std::env::var("EVENT_BUS_BIND").ok().as_deref(), + std::env::var("EVENT_BUS_PORT").ok().and_then(|s| s.parse().ok()), + move |topic, event| async { + }); +} diff --git a/crates/events/src/lib.rs b/crates/events/src/lib.rs index de629c2..8c26341 100644 --- a/crates/events/src/lib.rs +++ b/crates/events/src/lib.rs @@ -45,14 +45,14 @@ pub struct Bus { impl Bus { pub fn new>( - event_handler: Handler, device_id: S, bind: Option<&str>, port: Option, + event_handler: Handler, ) -> Self where Fut: std::future::Future + Send + 'static, - Handler: Fn(String, AppEvent) -> Fut + Send + 'static, + Handler: Fn(String, AppEvent) -> Fut + Send + Sync + 'static, { let (client, event_loop) = create_conn(device_id, bind, port); tokio::spawn(run(event_handler, event_loop, client.clone())); @@ -83,7 +83,7 @@ pub async fn run( client: AsyncClient, ) where Fut: std::future::Future + Send + 'static, - Handler: Fn(String, AppEvent) -> Fut + Send + 'static, + Handler: Fn(String, AppEvent) -> Fut + Send + Sync + 'static, { loop { // previously published messages should be republished after reconnection. diff --git a/crates/sessions-agent/Cargo.toml b/crates/sessions-agent/Cargo.toml deleted file mode 100644 index 38b5db7..0000000 --- a/crates/sessions-agent/Cargo.toml +++ /dev/null @@ -1,19 +0,0 @@ -[package] -name = "sessions-agent" -version = "0.1.0" -edition = "2021" - -[dependencies] -chrono = { version = "0.4.31", default-features = false, features = ["serde", "clock", "pure-rust-locales"] } -futures = { version = "0.3.29", default-features = false, features = ["async-await", "futures-executor", "thread-pool", "compat", "io-compat", "std"] } -futures-util = { version = "0.3.29", default-features = false, features = ["async-await", "futures-channel", "futures-io", "futures-macro"] } -serde = { version = "1.0.193", features = ["serde_derive"] } -serde_toml = { version = "0.0.1", default-features = false } -thiserror = "1.0.51" -tokio = { version = "1.35.1", default-features = false, features = ["full"] } -toml = { version = "0.8.8", default-features = false, features = ["parse", "indexmap", "preserve_order"] } -tracing = "0.1.40" -uuid = { version = "1.6.1", default-features = false, features = ["v4", "serde"] } -events = { path = "../events" } -actix = "0.13.1" -actix-web = "4.4.1" diff --git a/crates/sessions-agent/src/main.rs b/crates/sessions-agent/src/main.rs deleted file mode 100644 index 5d34394..0000000 --- a/crates/sessions-agent/src/main.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[actix::main] -async fn main() {} diff --git a/crates/sessions-agent/src/utils.rs b/crates/sessions-agent/src/utils.rs deleted file mode 100644 index b7beb2a..0000000 --- a/crates/sessions-agent/src/utils.rs +++ /dev/null @@ -1,294 +0,0 @@ -use std::collections::HashSet; -use std::hash::{DefaultHasher, Hasher}; -use std::num::TryFromIntError; -use std::path::PathBuf; -use std::sync::Arc; - -use chrono::NaiveDateTime; -use database::chrono::Utc; -use database::sea_orm::ActiveValue::*; -use database::{chrono, sessions, uuid}; -use jsonwebtoken::*; -use serde::{Deserialize, Serialize}; -use tracing::*; - -pub static KEYS_PATH: &str = "./config"; - -#[derive(Debug, Serialize, Deserialize)] -pub struct Claims { - #[serde(rename = "sum")] - pub hash_sum: i64, - #[serde(rename = "sub")] - pub subject: i32, - #[serde(rename = "aud")] - pub audience: String, - pub role: String, - #[serde(rename = "iss")] - pub issuer: String, - #[serde(rename = "nbt")] - pub not_before_time: NaiveDateTime, - #[serde(rename = "exp")] - pub exp: NaiveDateTime, - #[serde(rename = "iat")] - pub issued_at: NaiveDateTime, - #[serde(rename = "jti")] - pub jwt_unique_identifier: uuid::Uuid, -} - -impl Claims { - pub async fn new( - subject: i32, - audience: String, - role: String, - issuer: Option, - not_before_time: chrono::NaiveDateTime, - issued_at: chrono::NaiveDateTime, - expiration_duration: Option, - ) -> Result { - let issuer = issuer.unwrap_or_else(|| "ergokeyboard".to_owned()); - let mut claims = Claims { - subject, - audience, - role, - issuer, - hash_sum: 0, - not_before_time, - exp: issued_at + expiration_duration.unwrap_or(chrono::Duration::days(365)), - issued_at, - jwt_unique_identifier: uuid::Uuid::new_v4(), - }; - let hash_sum = generate_hash_sum(&claims).await as i64; - claims.hash_sum = hash_sum; - Ok(claims) - } -} - -macro_rules! cmp_both { - ($l: expr, $r: expr, $($field: ident),+) => { - $( - $l.$field == $r.$field && - )+ true - } -} -impl PartialEq for Claims { - fn eq(&self, s: &sessions::Model) -> bool { - cmp_both!( - self, - s, - hash_sum, - subject, - audience, - role, - issuer, - not_before_time, - issued_at, - jwt_unique_identifier - ) && self.exp == s.expiration_time - } -} - -#[derive(Debug, thiserror::Error)] -pub enum KeysError { - #[error("Decode key failed on file system error: {0}")] - DecodeKeyIo(std::io::Error), - #[error("Decode key failed on file system error: {0}")] - EncodeKeyIo(std::io::Error), - #[error("Decode key failed to parse ed25519 key: {0}")] - DecodeKeyParsing(jsonwebtoken::errors::Error), - #[error("Encode key failed to parse ed25519 key: {0}")] - EncodeKeyParsing(jsonwebtoken::errors::Error), -} - -pub struct JwtKeysInner { - decode: DecodingKey, - encode: EncodingKey, -} - -const DECODE_KEY_NAME: &str = "public.pem"; -const ENCODE_KEY_NAME: &str = "private.pem"; - -#[derive(Clone, derive_more::Deref)] -pub struct JwtKeys(Arc); - -impl JwtKeys { - pub fn load(config_path: PathBuf) -> Result { - Ok(Self(Arc::new(JwtKeysInner { - decode: DecodingKey::from_ed_pem( - &std::fs::read(config_path.join(DECODE_KEY_NAME)) - .map_err(KeysError::DecodeKeyIo)?, - ) - .map_err(KeysError::DecodeKeyParsing)?, - encode: EncodingKey::from_ed_pem( - &std::fs::read(config_path.join(ENCODE_KEY_NAME)) - .map_err(KeysError::EncodeKeyIo)?, - ) - .map_err(KeysError::EncodeKeyParsing)?, - }))) - } -} - -pub async fn generate_token( - subject: i32, - audience: String, - role: String, - issuer: Option, - not_before_time: chrono::NaiveDateTime, - issued_at: chrono::NaiveDateTime, - expiration_duration: Option, -) -> Result { - let claims = Claims::new( - subject, - audience, - role, - issuer.clone(), - not_before_time, - issued_at, - expiration_duration, - ) - .await?; - Ok(database::sessions::ActiveModel { - hash_sum: Set(claims.hash_sum), - subject: Set(claims.subject), - audience: Set(claims.audience), - role: Set(claims.role), - issuer: Set(claims.issuer), - not_before_time: Set(claims.not_before_time), - expiration_time: Set(claims.exp), - issued_at: Set(claims.issued_at), - jwt_unique_identifier: Set(claims.jwt_unique_identifier), - ..Default::default() - }) -} - -pub async fn generate_hash_sum(claims: &Claims) -> i64 { - let Claims { - subject, - hash_sum: _, - audience, - role, - issuer, - not_before_time, - exp: expiration_time, - issued_at, - jwt_unique_identifier, - } = claims; - let mut hasher = DefaultHasher::default(); - hasher.write_i32(*subject); - hasher.write(audience.as_bytes()); - hasher.write(role.as_bytes()); - hasher.write(issuer.as_bytes()); - hasher.write_i64(not_before_time.timestamp_nanos_opt().expect("invalid NBT")); - hasher.write_i64(expiration_time.timestamp_nanos_opt().expect("invalid EXP")); - hasher.write_i64(issued_at.timestamp_nanos_opt().expect("invalid IAT")); - hasher.write(jwt_unique_identifier.as_bytes()); - hasher.finish() as i64 -} - -#[derive(Debug, thiserror::Error)] -pub enum ValidationError { - #[error("Given JWT text is not valid")] - InvalidString, - #[error("Can't load accounts from database")] - FetchAccounts, - #[error("Can't load sessions from database")] - FetchSessions, - #[error("Account for given ID does not exists")] - NoAccount, - #[error("Given token does not exists")] - UnknownToken, - #[error("Given token expired")] - Expired, -} - -pub async fn validate( - db_client: database::DatabaseConnection, - keys: JwtKeys, - token: &str, -) -> Result { - use database::*; - - let mut validation = jsonwebtoken::Validation::new(Algorithm::EdDSA); - validation.validate_exp = false; - validation.required_spec_claims = HashSet::new(); - validation.set_audience(&["Web"]); - validation.set_issuer(&["ergokeyboard"]); - - tracing::info!("decoding token: {token:?}"); - let token = match jsonwebtoken::decode::( - &token, - &keys.decode, - &validation, - ) { - Err(e) => { - warn!("Failed to decode token: {e}"); - return Err(ValidationError::InvalidString); - } - Ok(token) => token.claims, - }; - tracing::trace!("claims are: {token:?}"); - let hash = generate_hash_sum(&token).await as i64; - let Ok(mut rows) = Sessions::find() - .filter(entities::sessions::Column::HashSum.eq(hash)) - .all(&db_client) - .await - else { - return Err(ValidationError::FetchSessions); - }; - - let Some(found_idx) = rows.iter().position(|row| token == *row).clone() else { - return Err(ValidationError::UnknownToken); - }; - let found = rows.remove(found_idx); - if found.expiration_time < Utc::now().naive_utc() { - return Err(ValidationError::Expired); - } - - match Accounts::find() - .filter(database::accounts::Column::Id.eq(found.subject)) - .one(&db_client) - .await - { - Err(e) => { - error!("Failed to load account for {found:?}: {e}"); - Err(ValidationError::FetchAccounts) - } - Ok(None) => Err(ValidationError::NoAccount), - Ok(Some(account)) => Ok(account), - } -} - -pub async fn create_jwt_string( - keys: JwtKeys, - claims: &Claims, -) -> Result { - jsonwebtoken::encode( - &jsonwebtoken::Header::new(Algorithm::EdDSA), - claims, - &keys.encode, - ) -} - -#[cfg(test)] -mod tests { - use database::chrono::Utc; - - use super::*; - use std::path::Path; - - #[tokio::test] - async fn create_string() { - let keys = JwtKeys::load(Path::new("./config").to_owned()).unwrap(); - let claims = Claims::new( - 234, - "jaosidf".into(), - "User".into(), - None, - Utc::now().naive_utc(), - Utc::now().naive_utc(), - None, - ) - .await - .unwrap(); - let _text = create_jwt_string(keys, &claims).await.unwrap(); - } -} diff --git a/crates/zt/src/new_cmd.rs b/crates/zt/src/new_cmd.rs index a94458a..714f792 100644 --- a/crates/zt/src/new_cmd.rs +++ b/crates/zt/src/new_cmd.rs @@ -53,6 +53,9 @@ impl RunCmd for NewAgentOpts { cargo_file .write(b"database = { workspace = true }\n") .expect("Failed to create agent Cargo.toml file"); + cargo_file + .write(b"async-trait = \"0.1.75\"\n") + .expect("Failed to create agent Cargo.toml file"); } if self.with_event_bus { cargo_file @@ -71,12 +74,15 @@ impl RunCmd for NewAgentOpts { .expect("Failed to create main.rs for agent"); if self.with_database { main_file - .write(b"mod migration;\n") + .write(b"mod migration;\n\n") .expect("Failed to create agent main.rs"); } main_file .write(b"fn main() {\n") .expect("Failed to create agent main.rs"); + main_file + .write(format!(" let event_bus = events::Bus::new(\n {agent_name:?},\n std::env::var(\"EVENT_BUS_BIND\").ok().as_deref(),\n std::env::var(\"EVENT_BUS_PORT\").ok().and_then(|s| s.parse().ok()),\n move |topic, event| async {{\n }});\n").as_bytes()) + .expect("Failed to create agent main.rs"); main_file .write(b"}\n") .expect("Failed to create agent main.rs"); @@ -94,7 +100,7 @@ impl RunCmd for NewAgentOpts { migration_file .write( - r#"pub use sea_orm_migration::prelude::*; + r#"pub use database::sea_orm_migration::prelude::*; pub struct Migrator; @@ -125,6 +131,7 @@ impl RunCmd for NewCmd { } } + #[derive(Debug, Options)] pub struct NewOpts { help: bool,