Add agents

This commit is contained in:
Adrian Woźniak 2023-12-28 21:16:25 +01:00
parent b4b851b20f
commit 765c8753dd
9 changed files with 78 additions and 357 deletions

83
Cargo.lock generated
View File

@ -232,6 +232,17 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "agent"
version = "0.1.0"
dependencies = [
"database",
"events",
"futures 0.3.30",
"tokio",
"web",
]
[[package]]
name = "ahash"
version = "0.7.7"
@ -929,7 +940,7 @@ version = "0.1.0"
dependencies = [
"bincode",
"bytes 1.5.0",
"futures 0.3.29",
"futures 0.3.30",
"futures-util",
"rumqttc",
"serde",
@ -1002,9 +1013,9 @@ checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678"
[[package]]
name = "futures"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
@ -1017,9 +1028,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
@ -1027,15 +1038,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
@ -1056,15 +1067,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
@ -1073,21 +1084,21 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-util"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures 0.1.31",
"futures-channel",
@ -1301,6 +1312,7 @@ dependencies = [
name = "identity-agent"
version = "0.1.0"
dependencies = [
"async-trait",
"database",
"events",
"tokio",
@ -1976,7 +1988,7 @@ name = "router"
version = "0.1.0"
dependencies = [
"chrono",
"futures 0.3.29",
"futures 0.3.30",
"futures-util",
"serde",
"serde_toml",
@ -2166,7 +2178,7 @@ dependencies = [
"async-trait",
"bigdecimal",
"chrono",
"futures 0.3.29",
"futures 0.3.30",
"log",
"ouroboros",
"rust_decimal",
@ -2224,7 +2236,7 @@ dependencies = [
"async-trait",
"clap",
"dotenvy",
"futures 0.3.29",
"futures 0.3.30",
"sea-orm",
"sea-orm-cli",
"sea-schema",
@ -2285,7 +2297,7 @@ version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cd9561232bd1b82ea748b581f15909d11de0db6563ddcf28c5d908aee8282f1"
dependencies = [
"futures 0.3.29",
"futures 0.3.30",
"sea-query",
"sea-schema-derive",
]
@ -2400,18 +2412,10 @@ name = "sessions-agent"
version = "0.1.0"
dependencies = [
"actix",
"actix-web",
"chrono",
"async-trait",
"database",
"events",
"futures 0.3.29",
"futures-util",
"serde",
"serde_toml",
"thiserror",
"tokio",
"toml",
"tracing",
"uuid",
]
[[package]]
@ -2450,7 +2454,7 @@ name = "shared-config"
version = "0.1.0"
dependencies = [
"chrono",
"futures 0.3.29",
"futures 0.3.30",
"futures-util",
"serde",
"serde_toml",
@ -2944,6 +2948,7 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
]
@ -3247,6 +3252,16 @@ version = "0.2.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f"
[[package]]
name = "web"
version = "0.1.0"
dependencies = [
"actix",
"actix-web",
"futures 0.3.30",
"tokio",
]
[[package]]
name = "webpki-roots"
version = "0.25.3"

View File

@ -3,13 +3,17 @@ resolver = "2"
members = [
"crates/database",
"crates/events",
"crates/agent",
"crates/web",
"crates/router",
"crates/sessions-agent",
"crates/shared-config",
"crates/zt",
"bins/identity-agent"
"bins/sessions-agent",
"bins/identity-agent",
]
[workspace.dependencies]
database = { path = "./crates/database" }
events = { path = "./crates/events" }
web = { path = "./crates/web" }
agent = { path = "./crates/agent" }

View File

@ -7,4 +7,5 @@ publish = false
[dependencies]
tokio = { version = "1.27.0", features = ["full"] }
database = { workspace = true }
async-trait = "0.1.75"
events = { workspace = true }

View File

@ -1 +1,10 @@
fn main() {}
mod migration;
fn main() {
let event_bus = events::Bus::new(
"identity-agent",
std::env::var("EVENT_BUS_BIND").ok().as_deref(),
std::env::var("EVENT_BUS_PORT").ok().and_then(|s| s.parse().ok()),
move |topic, event| async {
});
}

View File

@ -45,14 +45,14 @@ pub struct Bus {
impl Bus {
pub fn new<Handler, Fut, S: Into<String>>(
event_handler: Handler,
device_id: S,
bind: Option<&str>,
port: Option<u16>,
event_handler: Handler,
) -> Self
where
Fut: std::future::Future + Send + 'static,
Handler: Fn(String, AppEvent) -> Fut + Send + 'static,
Handler: Fn(String, AppEvent) -> Fut + Send + Sync + 'static,
{
let (client, event_loop) = create_conn(device_id, bind, port);
tokio::spawn(run(event_handler, event_loop, client.clone()));
@ -83,7 +83,7 @@ pub async fn run<Handler, Fut>(
client: AsyncClient,
) where
Fut: std::future::Future + Send + 'static,
Handler: Fn(String, AppEvent) -> Fut + Send + 'static,
Handler: Fn(String, AppEvent) -> Fut + Send + Sync + 'static,
{
loop {
// previously published messages should be republished after reconnection.

View File

@ -1,19 +0,0 @@
[package]
name = "sessions-agent"
version = "0.1.0"
edition = "2021"
[dependencies]
chrono = { version = "0.4.31", default-features = false, features = ["serde", "clock", "pure-rust-locales"] }
futures = { version = "0.3.29", default-features = false, features = ["async-await", "futures-executor", "thread-pool", "compat", "io-compat", "std"] }
futures-util = { version = "0.3.29", default-features = false, features = ["async-await", "futures-channel", "futures-io", "futures-macro"] }
serde = { version = "1.0.193", features = ["serde_derive"] }
serde_toml = { version = "0.0.1", default-features = false }
thiserror = "1.0.51"
tokio = { version = "1.35.1", default-features = false, features = ["full"] }
toml = { version = "0.8.8", default-features = false, features = ["parse", "indexmap", "preserve_order"] }
tracing = "0.1.40"
uuid = { version = "1.6.1", default-features = false, features = ["v4", "serde"] }
events = { path = "../events" }
actix = "0.13.1"
actix-web = "4.4.1"

View File

@ -1,2 +0,0 @@
#[actix::main]
async fn main() {}

View File

@ -1,294 +0,0 @@
use std::collections::HashSet;
use std::hash::{DefaultHasher, Hasher};
use std::num::TryFromIntError;
use std::path::PathBuf;
use std::sync::Arc;
use chrono::NaiveDateTime;
use database::chrono::Utc;
use database::sea_orm::ActiveValue::*;
use database::{chrono, sessions, uuid};
use jsonwebtoken::*;
use serde::{Deserialize, Serialize};
use tracing::*;
pub static KEYS_PATH: &str = "./config";
#[derive(Debug, Serialize, Deserialize)]
pub struct Claims {
#[serde(rename = "sum")]
pub hash_sum: i64,
#[serde(rename = "sub")]
pub subject: i32,
#[serde(rename = "aud")]
pub audience: String,
pub role: String,
#[serde(rename = "iss")]
pub issuer: String,
#[serde(rename = "nbt")]
pub not_before_time: NaiveDateTime,
#[serde(rename = "exp")]
pub exp: NaiveDateTime,
#[serde(rename = "iat")]
pub issued_at: NaiveDateTime,
#[serde(rename = "jti")]
pub jwt_unique_identifier: uuid::Uuid,
}
impl Claims {
pub async fn new(
subject: i32,
audience: String,
role: String,
issuer: Option<String>,
not_before_time: chrono::NaiveDateTime,
issued_at: chrono::NaiveDateTime,
expiration_duration: Option<chrono::Duration>,
) -> Result<Self, std::num::TryFromIntError> {
let issuer = issuer.unwrap_or_else(|| "ergokeyboard".to_owned());
let mut claims = Claims {
subject,
audience,
role,
issuer,
hash_sum: 0,
not_before_time,
exp: issued_at + expiration_duration.unwrap_or(chrono::Duration::days(365)),
issued_at,
jwt_unique_identifier: uuid::Uuid::new_v4(),
};
let hash_sum = generate_hash_sum(&claims).await as i64;
claims.hash_sum = hash_sum;
Ok(claims)
}
}
macro_rules! cmp_both {
($l: expr, $r: expr, $($field: ident),+) => {
$(
$l.$field == $r.$field &&
)+ true
}
}
impl PartialEq<sessions::Model> for Claims {
fn eq(&self, s: &sessions::Model) -> bool {
cmp_both!(
self,
s,
hash_sum,
subject,
audience,
role,
issuer,
not_before_time,
issued_at,
jwt_unique_identifier
) && self.exp == s.expiration_time
}
}
#[derive(Debug, thiserror::Error)]
pub enum KeysError {
#[error("Decode key failed on file system error: {0}")]
DecodeKeyIo(std::io::Error),
#[error("Decode key failed on file system error: {0}")]
EncodeKeyIo(std::io::Error),
#[error("Decode key failed to parse ed25519 key: {0}")]
DecodeKeyParsing(jsonwebtoken::errors::Error),
#[error("Encode key failed to parse ed25519 key: {0}")]
EncodeKeyParsing(jsonwebtoken::errors::Error),
}
pub struct JwtKeysInner {
decode: DecodingKey,
encode: EncodingKey,
}
const DECODE_KEY_NAME: &str = "public.pem";
const ENCODE_KEY_NAME: &str = "private.pem";
#[derive(Clone, derive_more::Deref)]
pub struct JwtKeys(Arc<JwtKeysInner>);
impl JwtKeys {
pub fn load(config_path: PathBuf) -> Result<Self, KeysError> {
Ok(Self(Arc::new(JwtKeysInner {
decode: DecodingKey::from_ed_pem(
&std::fs::read(config_path.join(DECODE_KEY_NAME))
.map_err(KeysError::DecodeKeyIo)?,
)
.map_err(KeysError::DecodeKeyParsing)?,
encode: EncodingKey::from_ed_pem(
&std::fs::read(config_path.join(ENCODE_KEY_NAME))
.map_err(KeysError::EncodeKeyIo)?,
)
.map_err(KeysError::EncodeKeyParsing)?,
})))
}
}
pub async fn generate_token(
subject: i32,
audience: String,
role: String,
issuer: Option<String>,
not_before_time: chrono::NaiveDateTime,
issued_at: chrono::NaiveDateTime,
expiration_duration: Option<chrono::Duration>,
) -> Result<database::sessions::ActiveModel, TryFromIntError> {
let claims = Claims::new(
subject,
audience,
role,
issuer.clone(),
not_before_time,
issued_at,
expiration_duration,
)
.await?;
Ok(database::sessions::ActiveModel {
hash_sum: Set(claims.hash_sum),
subject: Set(claims.subject),
audience: Set(claims.audience),
role: Set(claims.role),
issuer: Set(claims.issuer),
not_before_time: Set(claims.not_before_time),
expiration_time: Set(claims.exp),
issued_at: Set(claims.issued_at),
jwt_unique_identifier: Set(claims.jwt_unique_identifier),
..Default::default()
})
}
pub async fn generate_hash_sum(claims: &Claims) -> i64 {
let Claims {
subject,
hash_sum: _,
audience,
role,
issuer,
not_before_time,
exp: expiration_time,
issued_at,
jwt_unique_identifier,
} = claims;
let mut hasher = DefaultHasher::default();
hasher.write_i32(*subject);
hasher.write(audience.as_bytes());
hasher.write(role.as_bytes());
hasher.write(issuer.as_bytes());
hasher.write_i64(not_before_time.timestamp_nanos_opt().expect("invalid NBT"));
hasher.write_i64(expiration_time.timestamp_nanos_opt().expect("invalid EXP"));
hasher.write_i64(issued_at.timestamp_nanos_opt().expect("invalid IAT"));
hasher.write(jwt_unique_identifier.as_bytes());
hasher.finish() as i64
}
#[derive(Debug, thiserror::Error)]
pub enum ValidationError {
#[error("Given JWT text is not valid")]
InvalidString,
#[error("Can't load accounts from database")]
FetchAccounts,
#[error("Can't load sessions from database")]
FetchSessions,
#[error("Account for given ID does not exists")]
NoAccount,
#[error("Given token does not exists")]
UnknownToken,
#[error("Given token expired")]
Expired,
}
pub async fn validate(
db_client: database::DatabaseConnection,
keys: JwtKeys,
token: &str,
) -> Result<database::accounts::Model, ValidationError> {
use database::*;
let mut validation = jsonwebtoken::Validation::new(Algorithm::EdDSA);
validation.validate_exp = false;
validation.required_spec_claims = HashSet::new();
validation.set_audience(&["Web"]);
validation.set_issuer(&["ergokeyboard"]);
tracing::info!("decoding token: {token:?}");
let token = match jsonwebtoken::decode::<Claims>(
&token,
&keys.decode,
&validation,
) {
Err(e) => {
warn!("Failed to decode token: {e}");
return Err(ValidationError::InvalidString);
}
Ok(token) => token.claims,
};
tracing::trace!("claims are: {token:?}");
let hash = generate_hash_sum(&token).await as i64;
let Ok(mut rows) = Sessions::find()
.filter(entities::sessions::Column::HashSum.eq(hash))
.all(&db_client)
.await
else {
return Err(ValidationError::FetchSessions);
};
let Some(found_idx) = rows.iter().position(|row| token == *row).clone() else {
return Err(ValidationError::UnknownToken);
};
let found = rows.remove(found_idx);
if found.expiration_time < Utc::now().naive_utc() {
return Err(ValidationError::Expired);
}
match Accounts::find()
.filter(database::accounts::Column::Id.eq(found.subject))
.one(&db_client)
.await
{
Err(e) => {
error!("Failed to load account for {found:?}: {e}");
Err(ValidationError::FetchAccounts)
}
Ok(None) => Err(ValidationError::NoAccount),
Ok(Some(account)) => Ok(account),
}
}
pub async fn create_jwt_string(
keys: JwtKeys,
claims: &Claims,
) -> Result<String, jsonwebtoken::errors::Error> {
jsonwebtoken::encode(
&jsonwebtoken::Header::new(Algorithm::EdDSA),
claims,
&keys.encode,
)
}
#[cfg(test)]
mod tests {
use database::chrono::Utc;
use super::*;
use std::path::Path;
#[tokio::test]
async fn create_string() {
let keys = JwtKeys::load(Path::new("./config").to_owned()).unwrap();
let claims = Claims::new(
234,
"jaosidf".into(),
"User".into(),
None,
Utc::now().naive_utc(),
Utc::now().naive_utc(),
None,
)
.await
.unwrap();
let _text = create_jwt_string(keys, &claims).await.unwrap();
}
}

View File

@ -53,6 +53,9 @@ impl RunCmd for NewAgentOpts {
cargo_file
.write(b"database = { workspace = true }\n")
.expect("Failed to create agent Cargo.toml file");
cargo_file
.write(b"async-trait = \"0.1.75\"\n")
.expect("Failed to create agent Cargo.toml file");
}
if self.with_event_bus {
cargo_file
@ -71,12 +74,15 @@ impl RunCmd for NewAgentOpts {
.expect("Failed to create main.rs for agent");
if self.with_database {
main_file
.write(b"mod migration;\n")
.write(b"mod migration;\n\n")
.expect("Failed to create agent main.rs");
}
main_file
.write(b"fn main() {\n")
.expect("Failed to create agent main.rs");
main_file
.write(format!(" let event_bus = events::Bus::new(\n {agent_name:?},\n std::env::var(\"EVENT_BUS_BIND\").ok().as_deref(),\n std::env::var(\"EVENT_BUS_PORT\").ok().and_then(|s| s.parse().ok()),\n move |topic, event| async {{\n }});\n").as_bytes())
.expect("Failed to create agent main.rs");
main_file
.write(b"}\n")
.expect("Failed to create agent main.rs");
@ -94,7 +100,7 @@ impl RunCmd for NewAgentOpts {
migration_file
.write(
r#"pub use sea_orm_migration::prelude::*;
r#"pub use database::sea_orm_migration::prelude::*;
pub struct Migrator;
@ -125,6 +131,7 @@ impl RunCmd for NewCmd {
}
}
#[derive(Debug, Options)]
pub struct NewOpts {
help: bool,