From fe06c1e63e65778f009576521cb2da21683c494d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20Wo=C5=BAniak?= Date: Tue, 5 May 2020 16:09:26 +0200 Subject: [PATCH] Update user avatar --- .gitignore | 1 + README.md | 52 ++++++++ jirs-client/src/ws/mod.rs | 12 ++ jirs-client/webpack.config.js | 2 +- jirs-data/src/lib.rs | 3 + jirs-server/Cargo.toml | 32 ++++- jirs-server/src/db/authorize_user.rs | 20 +-- jirs-server/src/db/comments.rs | 33 +++-- jirs-server/src/db/issue_assignees.rs | 7 +- jirs-server/src/db/issues.rs | 5 +- jirs-server/src/db/projects.rs | 23 ++-- jirs-server/src/db/tokens.rs | 6 +- jirs-server/src/db/users.rs | 116 ++++++++++------- jirs-server/src/main.rs | 30 +++-- jirs-server/src/web/avatar.rs | 176 +++++++++++++++++++++++--- jirs-server/src/web/mod.rs | 101 +++++++++++++-- jirs-server/src/ws/mod.rs | 1 + 17 files changed, 487 insertions(+), 133 deletions(-) diff --git a/.gitignore b/.gitignore index 5c85882a..5eb81521 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ db.toml db.test.toml pkg jirs-client/pkg +tmp diff --git a/README.md b/README.md index ea3a96ef..c2e7dcd3 100644 --- a/README.md +++ b/README.md @@ -14,20 +14,68 @@ https://git.sr.ht/~tsumanu/jirs ### Config files +#### WEB + ```toml # web.toml concurrency = 2 port = "5000" bind = "0.0.0.0" ssl = false +tmp_dir = "./tmp" + +[s3] +access_key_id = "" +secret_access_key = "" +bucket = "" +region_name = "eu-central-1" + +[filesystem] +store_path = "" +client_path = "/img" ``` +##### Upload local storage + +If default feature `"local-storage"` is on your uploaded files will be stored on your machine. +This requires additional configuration. + +```toml +[filesystem] +store_path = "/var/jirs/uploads" +client_path = "/img" +``` + +* `store_path` is your local machine path. Files will be saved there. This can be relative to `CWD` path or absolute path. +* `client_path` is web path + +Both must be set and non-empty + +##### Upload to AWS S3 + +If default feature `"aws-s3"` is on your uploaded files will be send to AWS S3 service. +This requires additional configuration. + +```toml +[s3] +access_key_id = "" +secret_access_key = "" +bucket = "" +region_name = "eu-central-1" +``` + +#### Database + ```toml # db.toml concurrency = 2 database_url = "postgres://postgres@localhost:5432/jirs" ``` +#### Mail Service + +You can send e-mail only via service which will handle this. This application was build using sendgrid. + ```toml # mail.toml concurrency = 2 @@ -52,6 +100,9 @@ NODE_ENV=development DEBUG=true ``` +Client and Server bind/port must be provided. Client will be build using those variables and will send requests only using this address. +`DATABASE_URL` is required only to setup database. Runtime will use `db.toml`. + ### Backend Requirements: @@ -60,6 +111,7 @@ Requirements: ```bash cargo install diesel_cli --no-default-features --features postgres +export DATABASE_URL=postgres://postgres@localhost/jirs diesel setup diesel migration run diff --git a/jirs-client/src/ws/mod.rs b/jirs-client/src/ws/mod.rs index 313ad1a3..3301256b 100644 --- a/jirs-client/src/ws/mod.rs +++ b/jirs-client/src/ws/mod.rs @@ -67,6 +67,18 @@ pub fn update(msg: &Msg, model: &mut Model, orders: &mut impl Orders) { } } } + Msg::WsMsg(WsMsg::AvatarUrlChanged(user_id, avatar_url)) => { + for user in model.users.iter_mut() { + if user.id == *user_id { + user.avatar_url = Some(avatar_url.clone()); + } + } + if let Some(me) = model.user.as_mut() { + if me.id == *user_id { + me.avatar_url = Some(avatar_url.clone()); + } + } + } _ => (), }; orders.render(); diff --git a/jirs-client/webpack.config.js b/jirs-client/webpack.config.js index f329d88a..8e302aeb 100644 --- a/jirs-client/webpack.config.js +++ b/jirs-client/webpack.config.js @@ -16,7 +16,7 @@ if (process.env.NODE_ENV === "production") { }); execSync("rm -Rf ./dist"); execSync("mkdir -p ./dist"); - execSync('./target/debug/jirs-css -O ./jirs-client/dist/styles.css', { + execSync('./target/release/jirs-css -O ./jirs-client/dist/styles.css', { cwd: jirDir, }); console.log("CSS combined"); diff --git a/jirs-data/src/lib.rs b/jirs-data/src/lib.rs index 626be7b3..6ed371d1 100644 --- a/jirs-data/src/lib.rs +++ b/jirs-data/src/lib.rs @@ -767,4 +767,7 @@ pub enum WsMsg { UpdateComment(UpdateCommentPayload), CommentDeleteRequest(CommentId), CommentDeleted(CommentId), + + // users + AvatarUrlChanged(UserId, String), } diff --git a/jirs-server/Cargo.toml b/jirs-server/Cargo.toml index 0e2022d9..03ec62eb 100644 --- a/jirs-server/Cargo.toml +++ b/jirs-server/Cargo.toml @@ -12,6 +12,19 @@ license = "MPL-2.0" name = "jirs_server" path = "./src/main.rs" +[features] +aws-s3 = [ + "rusoto_s3", + "rusoto_core" +] +local-storage = [ + "actix-files" +] +default = [ + "aws-s3", + "local-storage", +] + [dependencies] serde = { version = "*", features = ["derive"] } actix = { version = "*" } @@ -21,7 +34,6 @@ actix-service = { version = "*" } actix-rt = "1" actix-web-actors = "*" actix-multipart = { version = "*" } -actix-files = { version = "0.2.1" } dotenv = { version = "*" } byteorder = "1.0" @@ -50,10 +62,6 @@ futures = { version = "*" } lettre = { version = "*" } lettre_email = { version = "*" } -# Amazon S3 -rusoto_s3 = "0.43.0" -rusoto_core = "0.43.0" - [dependencies.diesel] version = "1.4.4" features = [ "unstable", "postgres", "numeric", "extras", "uuidv07" ] @@ -61,3 +69,17 @@ features = [ "unstable", "postgres", "numeric", "extras", "uuidv07" ] [dependencies.jirs-data] path = "../jirs-data" features = [ "backend" ] + +# Amazon S3 +[dependencies.rusoto_s3] +optional = true +version = "0.43.0" + +[dependencies.rusoto_core] +optional = true +version = "0.43.0" + +# Local storage +[dependencies.actix-files] +optional = true +version = "0.2.1" diff --git a/jirs-server/src/db/authorize_user.rs b/jirs-server/src/db/authorize_user.rs index a045197d..be963ef6 100644 --- a/jirs-server/src/db/authorize_user.rs +++ b/jirs-server/src/db/authorize_user.rs @@ -1,4 +1,5 @@ use actix::{Handler, Message}; +use diesel::pg::Pg; use diesel::prelude::*; use serde::{Deserialize, Serialize}; @@ -27,17 +28,19 @@ impl Handler for DbExecutor { .pool .get() .map_err(|_| ServiceErrors::DatabaseConnectionLost)?; + let token = tokens .filter(access_token.eq(msg.access_token)) .first::(conn) .map_err(|_e| { ServiceErrors::RecordNotFound(format!("token for {}", msg.access_token)) })?; - let user = users - .filter(id.eq(token.user_id)) + + let user_query = users.filter(id.eq(token.user_id)); + debug!("{}", diesel::debug_query::(&user_query)); + user_query .first::(conn) - .map_err(|_e| ServiceErrors::RecordNotFound(format!("user {}", token.user_id)))?; - Ok(user) + .map_err(|_e| ServiceErrors::RecordNotFound(format!("user {}", token.user_id))) } } @@ -55,10 +58,11 @@ impl SyncQuery for AuthorizeUser { .filter(access_token.eq(self.access_token)) .first::(&conn) .map_err(|_| crate::errors::ServiceErrors::Unauthorized)?; - let user = users - .filter(id.eq(token.user_id)) + + let user_query = users.filter(id.eq(token.user_id)); + debug!("{}", diesel::debug_query::(&user_query)); + user_query .first::(&conn) - .map_err(|_| crate::errors::ServiceErrors::Unauthorized)?; - Ok(user) + .map_err(|_| crate::errors::ServiceErrors::Unauthorized) } } diff --git a/jirs-server/src/db/comments.rs b/jirs-server/src/db/comments.rs index c74d3993..e21bbccd 100644 --- a/jirs-server/src/db/comments.rs +++ b/jirs-server/src/db/comments.rs @@ -1,4 +1,5 @@ use actix::{Handler, Message}; +use diesel::pg::Pg; use diesel::prelude::*; use serde::{Deserialize, Serialize}; @@ -26,12 +27,12 @@ impl Handler for DbExecutor { .pool .get() .map_err(|_| ServiceErrors::DatabaseConnectionLost)?; - let rows: Vec = comments - .distinct_on(id) - .filter(issue_id.eq(msg.issue_id)) + + let comments_query = comments.distinct_on(id).filter(issue_id.eq(msg.issue_id)); + debug!("{}", diesel::debug_query::(&comments_query)); + comments_query .load(conn) - .map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?; - Ok(rows) + .map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string())) } } @@ -63,11 +64,12 @@ impl Handler for DbExecutor { .pool .get() .map_err(|_| ServiceErrors::DatabaseConnectionLost)?; - let row: Comment = diesel::insert_into(comments) - .values(form) + + let comment_query = diesel::insert_into(comments).values(form); + debug!("{}", diesel::debug_query::(&comment_query)); + comment_query .get_result::(conn) - .map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?; - Ok(row) + .map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string())) } } @@ -98,7 +100,7 @@ impl Handler for DbExecutor { .find(msg.comment_id), ) .set(body.eq(msg.body)); - info!("{}", diesel::debug_query::(&query)); + info!("{}", diesel::debug_query::(&query)); let row: Comment = query .get_result::(conn) .map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?; @@ -126,13 +128,16 @@ impl Handler for DbExecutor { .pool .get() .map_err(|_| ServiceErrors::DatabaseConnectionLost)?; - diesel::delete( + + let comment_query = diesel::delete( comments .filter(user_id.eq(msg.user_id)) .find(msg.comment_id), - ) - .execute(conn) - .map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?; + ); + debug!("{}", diesel::debug_query::(&comment_query)); + comment_query + .execute(conn) + .map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?; Ok(()) } } diff --git a/jirs-server/src/db/issue_assignees.rs b/jirs-server/src/db/issue_assignees.rs index 3ad5bc31..f358a5f8 100644 --- a/jirs-server/src/db/issue_assignees.rs +++ b/jirs-server/src/db/issue_assignees.rs @@ -1,4 +1,5 @@ use actix::{Handler, Message}; +use diesel::pg::Pg; use diesel::prelude::*; use serde::{Deserialize, Serialize}; @@ -26,9 +27,11 @@ impl Handler for DbExecutor { .pool .get() .map_err(|_| ServiceErrors::DatabaseConnectionLost)?; - issue_assignees + let issue_assignees_query = issue_assignees .distinct_on(id) - .filter(issue_id.eq(msg.issue_id)) + .filter(issue_id.eq(msg.issue_id)); + debug!("{}", diesel::debug_query::(&issue_assignees_query)); + issue_assignees_query .load::(conn) .map_err(|_| ServiceErrors::RecordNotFound("issue users".to_string())) } diff --git a/jirs-server/src/db/issues.rs b/jirs-server/src/db/issues.rs index ea7a1fd7..7f48d9a9 100644 --- a/jirs-server/src/db/issues.rs +++ b/jirs-server/src/db/issues.rs @@ -162,11 +162,10 @@ impl Handler for DbExecutor { })?; } - let row = issues + issues .find(msg.issue_id) .first::(conn) - .map_err(|_| ServiceErrors::DatabaseConnectionLost)?; - Ok(row) + .map_err(|_| ServiceErrors::DatabaseConnectionLost) } } diff --git a/jirs-server/src/db/projects.rs b/jirs-server/src/db/projects.rs index 164da9ea..1078ceef 100644 --- a/jirs-server/src/db/projects.rs +++ b/jirs-server/src/db/projects.rs @@ -1,4 +1,5 @@ use actix::{Handler, Message}; +use diesel::pg::Pg; use diesel::prelude::*; use serde::{Deserialize, Serialize}; @@ -63,19 +64,21 @@ impl Handler for DbExecutor { .get() .map_err(|_| ServiceErrors::DatabaseConnectionLost)?; - diesel::update(projects.find(msg.project_id)) - .set(( - msg.name.map(|v| name.eq(v)), - msg.url.map(|v| url.eq(v)), - msg.description.map(|v| description.eq(v)), - msg.category.map(|v| category.eq(v)), - msg.time_tracking.map(|v| time_tracking.eq(v)), - )) + let update_query = diesel::update(projects.find(msg.project_id)).set(( + msg.name.map(|v| name.eq(v)), + msg.url.map(|v| url.eq(v)), + msg.description.map(|v| description.eq(v)), + msg.category.map(|v| category.eq(v)), + msg.time_tracking.map(|v| time_tracking.eq(v)), + )); + debug!("{}", diesel::debug_query::(&update_query)); + update_query .execute(conn) .map_err(|e| ServiceErrors::DatabaseQueryFailed(format!("{}", e)))?; - projects - .filter(id.eq(msg.project_id)) + let project_query = projects.find(msg.project_id); + debug!("{}", diesel::debug_query::(&project_query)); + project_query .first::(conn) .map_err(|_| ServiceErrors::RecordNotFound("Project".to_string())) } diff --git a/jirs-server/src/db/tokens.rs b/jirs-server/src/db/tokens.rs index dcecaf4d..70ae2723 100644 --- a/jirs-server/src/db/tokens.rs +++ b/jirs-server/src/db/tokens.rs @@ -72,10 +72,10 @@ impl Handler for DbExecutor { refresh_token, bind_token, }; - let row: Token = diesel::insert_into(tokens) + + diesel::insert_into(tokens) .values(form) .get_result(conn) - .map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?; - Ok(row) + .map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string())) } } diff --git a/jirs-server/src/db/users.rs b/jirs-server/src/db/users.rs index 134b941c..29dba16c 100644 --- a/jirs-server/src/db/users.rs +++ b/jirs-server/src/db/users.rs @@ -3,7 +3,7 @@ use diesel::pg::Pg; use diesel::prelude::*; use serde::{Deserialize, Serialize}; -use jirs_data::{IssueAssignee, Project, User, UserId}; +use jirs_data::{Project, User, UserId}; use crate::db::{DbExecutor, DbPooledConn}; use crate::errors::ServiceErrors; @@ -29,15 +29,15 @@ impl Handler for DbExecutor { .pool .get() .map_err(|_| ServiceErrors::DatabaseConnectionLost)?; - let row: User = users + + let users_query = users .distinct_on(id) .filter(email.eq(msg.email.as_str())) - .filter(name.eq(msg.name.as_str())) + .filter(name.eq(msg.name.as_str())); + debug!("{}", diesel::debug_query::(&users_query)); + users_query .first(conn) - .map_err(|_| { - ServiceErrors::RecordNotFound(format!("user {} {}", msg.name, msg.email)) - })?; - Ok(row) + .map_err(|_| ServiceErrors::RecordNotFound(format!("user {} {}", msg.name, msg.email))) } } @@ -60,12 +60,12 @@ impl Handler for DbExecutor { .pool .get() .map_err(|_| ServiceErrors::DatabaseConnectionLost)?; - let rows: Vec = users - .distinct_on(id) - .filter(project_id.eq(msg.project_id)) + + let users_query = users.distinct_on(id).filter(project_id.eq(msg.project_id)); + debug!("{}", diesel::debug_query::(&users_query)); + users_query .load(conn) - .map_err(|_| ServiceErrors::RecordNotFound("project users".to_string()))?; - Ok(rows) + .map_err(|_| ServiceErrors::RecordNotFound("project users".to_string())) } } @@ -89,17 +89,16 @@ impl Handler for DbExecutor { .pool .get() .map_err(|_| ServiceErrors::DatabaseConnectionLost)?; - let rows: Vec<(User, IssueAssignee)> = users + + let users_query = users .distinct_on(id) .inner_join(issue_assignees.on(user_id.eq(id))) .filter(issue_id.eq(msg.issue_id)) + .select(users::all_columns()); + debug!("{}", diesel::debug_query::(&users_query)); + users_query .load(conn) - .map_err(|_| ServiceErrors::RecordNotFound("issue users".to_string()))?; - let mut vec: Vec = vec![]; - for row in rows { - vec.push(row.0); - } - Ok(vec) + .map_err(|_| ServiceErrors::RecordNotFound("issue users".to_string())) } } @@ -131,21 +130,17 @@ impl Handler for DbExecutor { return Err(ServiceErrors::RegisterCollision); } - let project: Project = match projects.first(conn) { - Ok(project) => project, - _ => { - let form = CreateProjectForm { - name: "initial".to_string(), - url: "".to_string(), - description: "".to_string(), - category: Default::default(), - }; - diesel::insert_into(projects) - .values(form) - .get_result(conn) - .map_err(|_| ServiceErrors::RegisterCollision)? - } + let form = CreateProjectForm { + name: "initial".to_string(), + url: "".to_string(), + description: "".to_string(), + category: Default::default(), }; + let insert_query = diesel::insert_into(projects).values(form); + debug!("{}", diesel::debug_query::(&insert_query)); + let project: Project = insert_query + .get_result(conn) + .map_err(|_| ServiceErrors::RegisterCollision)?; let form = UserForm { name: msg.name, @@ -154,7 +149,9 @@ impl Handler for DbExecutor { project_id: project.id, }; - match diesel::insert_into(users).values(form).execute(conn) { + let insert_user_query = diesel::insert_into(users).values(form); + debug!("{}", diesel::debug_query::(&insert_user_query)); + match insert_user_query.execute(conn) { Ok(_) => (), _ => return Err(ServiceErrors::RegisterCollision), }; @@ -188,12 +185,10 @@ impl Handler for DbExecutor { .inner_join(invitations.on(i_email.eq(u_email))) .filter(invited_by_id.eq(msg.user_id)) .select(users::all_columns()); - debug!("{}", diesel::debug_query::(&query).to_string()); - - let res: Vec = query + debug!("{}", diesel::debug_query::(&query)); + query .load(conn) - .map_err(|e| ServiceErrors::DatabaseQueryFailed(format!("{}", e)))?; - Ok(res) + .map_err(|e| ServiceErrors::DatabaseQueryFailed(format!("{}", e))) } } @@ -205,15 +200,46 @@ fn count_matching_users(name: &str, email: &str, conn: &DbPooledConn) -> i64 { .or_filter(dsl::email.ne(email).and(dsl::name.eq(name))) .or_filter(dsl::email.eq(email).and(dsl::name.eq(name))) .count(); - - info!( - "{}", - diesel::debug_query::(&query).to_string() - ); - + info!("{}", diesel::debug_query::(&query)); query.get_result::(conn).unwrap_or(1) } +#[derive(Serialize, Deserialize, Debug)] +pub struct UpdateAvatarUrl { + pub user_id: UserId, + pub avatar_url: Option, +} + +impl Message for UpdateAvatarUrl { + type Result = Result; +} + +impl Handler for DbExecutor { + type Result = Result; + + fn handle(&mut self, msg: UpdateAvatarUrl, _ctx: &mut Self::Context) -> Self::Result { + use crate::schema::users::dsl::{avatar_url, id, users}; + + let conn = &self + .pool + .get() + .map_err(|_| ServiceErrors::DatabaseConnectionLost)?; + let update_query = diesel::update(users) + .set(avatar_url.eq(msg.avatar_url)) + .filter(id.eq(msg.user_id)); + debug!("{}", diesel::debug_query::(&update_query)); + update_query + .execute(conn) + .map_err(|e| ServiceErrors::DatabaseQueryFailed(format!("{}", e)))?; + + let user_query = users.find(msg.user_id); + debug!("{}", diesel::debug_query::(&user_query)); + user_query + .first(conn) + .map_err(|e| ServiceErrors::DatabaseQueryFailed(format!("{}", e))) + } +} + #[cfg(test)] mod tests { use crate::db::build_pool; diff --git a/jirs-server/src/main.rs b/jirs-server/src/main.rs index 63af1af8..6ae9440d 100644 --- a/jirs-server/src/main.rs +++ b/jirs-server/src/main.rs @@ -7,6 +7,7 @@ extern crate log; use actix::Actor; use actix_cors::Cors; +#[cfg(feature = "local-storage")] use actix_files as fs; use actix_web::{App, HttpServer}; @@ -29,6 +30,13 @@ async fn main() -> Result<(), String> { let web_config = web::Configuration::read(); + std::fs::create_dir_all(web_config.tmp_dir.as_str()).map_err(|e| e.to_string())?; + #[cfg(feature = "local-storage")] + if !web_config.filesystem.is_empty() { + let filesystem = &web_config.filesystem; + std::fs::create_dir_all(filesystem.store_path.as_str()).map_err(|e| e.to_string())?; + } + let db_addr = actix::SyncArbiter::start( crate::db::Configuration::read().concurrency, crate::db::DbExecutor::default, @@ -41,8 +49,7 @@ async fn main() -> Result<(), String> { let ws_server = WsServer::default().start(); HttpServer::new(move || { - let web_config = web::Configuration::read(); - let mut app = App::new() + let app = App::new() .wrap(actix_web::middleware::Logger::default()) .wrap(Cors::default()) .data(ws_server.clone()) @@ -51,12 +58,19 @@ async fn main() -> Result<(), String> { .data(crate::db::build_pool()) .service(crate::ws::index) .service(actix_web::web::scope("/avatar").service(crate::web::avatar::upload)); - if let Some(file_system) = web_config.filesystem.as_ref() { - app = app.service(fs::Files::new( - file_system.client_path.as_str(), - file_system.store_path.as_str(), - )); - } + + #[cfg(feature = "local-storage")] + let web_config = web::Configuration::read(); + #[cfg(feature = "local-storage")] + let app = if !web_config.filesystem.is_empty() { + let filesystem = &web_config.filesystem; + app.service(fs::Files::new( + filesystem.client_path.as_str(), + filesystem.store_path.as_str(), + )) + } else { + app + }; app }) .workers(web_config.concurrency) diff --git a/jirs-server/src/web/avatar.rs b/jirs-server/src/web/avatar.rs index a2a989ed..385edb6f 100644 --- a/jirs-server/src/web/avatar.rs +++ b/jirs-server/src/web/avatar.rs @@ -1,19 +1,37 @@ +#[cfg(feature = "aws-s3")] +use std::fs::File; +#[cfg(feature = "aws-s3")] +use std::io::Read; use std::io::Write; use actix::Addr; use actix_multipart::{Field, Multipart}; use actix_web::http::header::ContentDisposition; use actix_web::web::Data; -use actix_web::{get, post, web, Error, HttpResponse, Responder}; +use actix_web::{post, web, Error, HttpResponse}; use futures::{StreamExt, TryStreamExt}; +#[cfg(feature = "aws-s3")] +use rusoto_s3::{PutObjectRequest, S3Client, S3}; +use jirs_data::{User, UserId, WsMsg}; + +use crate::db::authorize_user::AuthorizeUser; +use crate::db::users::UpdateAvatarUrl; use crate::db::DbExecutor; +#[cfg(feature = "aws-s3")] +use crate::web::AmazonS3Storage; +use crate::ws::InnerMsg::BroadcastToChannel; +use crate::ws::WsServer; #[post("/")] pub async fn upload( mut payload: Multipart, db: Data>, + ws: Data>, ) -> Result { + let mut user_id: Option = None; + let mut avatar_url: Option = None; + while let Ok(Some(field)) = payload.try_next().await { let disposition: ContentDisposition = match field.content_disposition() { Some(d) => d, @@ -22,47 +40,163 @@ pub async fn upload( if !disposition.is_form_data() { return Ok(HttpResponse::BadRequest().finish()); } - let _name = disposition.get_name().as_ref().cloned().unwrap_or_default(); match disposition.get_name() { - Some("token") => handle_token(field, disposition, db.clone()).await?, - Some("avatar") => handle_image(field, disposition, db.clone()).await?, + Some("token") => { + user_id = Some(handle_token(field, db.clone()).await?); + } + Some("avatar") => { + let id = user_id.ok_or_else(|| HttpResponse::Unauthorized().finish())?; + avatar_url = Some(handle_image(id, field, disposition, db.clone()).await?); + } _ => continue, }; } - Ok(HttpResponse::Ok().json("")) + match (user_id, avatar_url) { + (Some(user_id), Some(avatar_url)) => { + let user = update_user_avatar(user_id, avatar_url.clone(), db).await?; + ws.send(BroadcastToChannel( + user.project_id, + WsMsg::AvatarUrlChanged(user.id, avatar_url), + )) + .await + .map_err(|_| HttpResponse::UnprocessableEntity().finish())?; + Ok(HttpResponse::NoContent().finish()) + } + _ => Ok(HttpResponse::UnprocessableEntity().finish()), + } } -async fn handle_token( - mut field: Field, - _disposition: ContentDisposition, - _db: Data>, -) -> Result<(), Error> { - Ok(()) +async fn update_user_avatar( + user_id: UserId, + new_url: String, + db: Data>, +) -> Result { + match db + .send(UpdateAvatarUrl { + user_id, + avatar_url: Some(new_url), + }) + .await + { + Ok(Ok(user)) => Ok(user), + + Ok(Err(e)) => { + error!("{:?}", e); + Err(HttpResponse::Unauthorized().finish().into()) + } + Err(e) => { + error!("{:?}", e); + Err(HttpResponse::Unauthorized().finish().into()) + } + } +} + +async fn handle_token(mut field: Field, db: Data>) -> Result { + let mut f: Vec = vec![]; + while let Some(chunk) = field.next().await { + let data = chunk.unwrap(); + f = web::block(move || f.write_all(&data).map(|_| f)).await?; + } + let access_token = String::from_utf8(f) + .unwrap_or_default() + .parse::() + .map_err(|_| HttpResponse::Unauthorized().finish())?; + match db.send(AuthorizeUser { access_token }).await { + Ok(Ok(user)) => Ok(user.id), + + Ok(Err(e)) => { + error!("{:?}", e); + Err(HttpResponse::Unauthorized().finish().into()) + } + Err(e) => { + error!("{:?}", e); + Err(HttpResponse::Unauthorized().finish().into()) + } + } } async fn handle_image( + user_id: UserId, mut field: Field, disposition: ContentDisposition, _db: Data>, -) -> Result<(), Error> { +) -> Result { let web_config = crate::web::Configuration::read(); + let mut new_link = None; let filename = disposition.get_filename().unwrap(); - let filepath = format!("./tmp/{}", filename); - // File::create is blocking operation, use threadpool - let mut f = web::block(|| std::fs::File::create(filepath)) + let tmp_file_path = format!("{}/{}-{}", web_config.tmp_dir, user_id, filename); + let mut f = web::block(move || std::fs::File::create(tmp_file_path)) .await .unwrap(); - // Field in turn is stream of *Bytes* object + + // Write temp file while let Some(chunk) = field.next().await { let data = chunk.unwrap(); - // filesystem operations are blocking, we have to use thread pool f = web::block(move || f.write_all(&data).map(|_| f)).await?; } - Ok(()) + + // Write public visible file + #[cfg(feature = "local-storage")] + if !web_config.filesystem.is_empty() { + let filesystem = &web_config.filesystem; + std::fs::copy( + format!("{}/{}-{}", web_config.tmp_dir, user_id, filename), + format!("{}/{}-{}", filesystem.store_path, user_id, filename), + ) + .map_err(|_| HttpResponse::InsufficientStorage().finish())?; + + new_link = Some(format!( + "{proto}://{bind}{port}{client_path}/{user_id}-{filename}", + proto = if web_config.ssl { "https" } else { "http" }, + bind = web_config.bind, + port = match web_config.port.as_str() { + "80" | "443" => "".to_string(), + p @ _ => format!(":{}", p), + }, + client_path = filesystem.client_path, + user_id = user_id, + filename = filename + )); + } + + // Upload to AWS S3 + #[cfg(feature = "aws-s3")] + if !web_config.s3.is_empty() { + let s3 = &web_config.s3; + s3.set_variables(); + let key = format!("{}-{}", user_id, filename); + let mut tmp_file = File::open(format!("{}/{}-{}", web_config.tmp_dir, user_id, filename)) + .map_err(|_| HttpResponse::InternalServerError())?; + let mut buffer: Vec = vec![]; + tmp_file + .read_to_end(&mut buffer) + .map_err(|_| HttpResponse::InternalServerError())?; + + let client = S3Client::new(s3.region()); + let put_object = PutObjectRequest { + bucket: s3.bucket.clone(), + key: key.clone(), + body: Some(buffer.into()), + ..Default::default() + }; + let _id = client + .put_object(put_object) + .await + .map_err(|_| HttpResponse::InternalServerError())?; + new_link = Some(aws_s3_url(key.as_str(), s3)); + } + std::fs::remove_file(format!("{}/{}-{}", web_config.tmp_dir, user_id, filename).as_str()) + .unwrap_or_default(); + Ok(new_link.unwrap_or_default()) } -#[get("/{id}")] -async fn download(_id: web::Path) -> impl Responder { - HttpResponse::Ok().json("") +#[cfg(feature = "aws-s3")] +fn aws_s3_url(key: &str, config: &AmazonS3Storage) -> String { + format!( + "https://{bucket}.s3.{region}.amazonaws.com/{key}", + bucket = config.bucket, + region = config.region_name, + key = key + ) } diff --git a/jirs-server/src/web/mod.rs b/jirs-server/src/web/mod.rs index 1f9dcd2b..604b2a8c 100644 --- a/jirs-server/src/web/mod.rs +++ b/jirs-server/src/web/mod.rs @@ -3,6 +3,8 @@ use std::fs::*; use actix::Addr; use actix_web::web::Data; use actix_web::{HttpRequest, HttpResponse}; +#[cfg(feature = "aws-s3")] +use rusoto_core::Region; use serde::{Deserialize, Serialize}; use jirs_data::User; @@ -40,24 +42,44 @@ pub enum Protocol { Https, } +#[cfg(feature = "local-storage")] #[derive(Serialize, Deserialize)] pub struct FileSystemStorage { pub store_path: String, pub client_path: String, } +#[cfg(feature = "local-storage")] +impl FileSystemStorage { + pub fn is_empty(&self) -> bool { + self.store_path.is_empty() + } +} + +#[cfg(feature = "aws-s3")] #[derive(Serialize, Deserialize)] pub struct AmazonS3Storage { pub access_key_id: String, pub secret_access_key: String, pub bucket: String, - pub region: String, + pub region_name: String, } -#[derive(Serialize, Deserialize)] -pub enum Storage { - FileSystem, - AmazonS3, +#[cfg(feature = "aws-s3")] +impl AmazonS3Storage { + pub fn is_empty(&self) -> bool { + self.access_key_id.is_empty() || self.secret_access_key.is_empty() || self.bucket.is_empty() + } + + pub fn region(&self) -> Region { + self.region_name.parse::().unwrap_or_default() + } + + pub fn set_variables(&self) { + std::env::set_var("AWS_ACCESS_KEY_ID", self.access_key_id.as_str()); + std::env::set_var("AWS_SECRET_ACCESS_KEY", self.secret_access_key.as_str()); + std::env::set_var("AWS_S3_BUCKET_NAME", self.region_name.as_str()); + } } #[derive(Serialize, Deserialize)] @@ -66,24 +88,77 @@ pub struct Configuration { pub port: String, pub bind: String, pub ssl: bool, - pub storage: Storage, - pub filesystem: Option, - pub s3: Option, + pub tmp_dir: String, + #[cfg(feature = "aws-s3")] + pub s3: AmazonS3Storage, + #[cfg(feature = "local-storage")] + pub filesystem: FileSystemStorage, } impl Default for Configuration { + #[cfg(all(feature = "local-storage", feature = "aws-s3"))] fn default() -> Self { Self { concurrency: 2, port: "5000".to_string(), bind: "0.0.0.0".to_string(), ssl: false, - storage: Storage::FileSystem, - filesystem: Some(FileSystemStorage { - store_path: "./tmp".to_string(), + + tmp_dir: "./tmp".to_string(), + filesystem: FileSystemStorage { + store_path: "".to_string(), client_path: "/img".to_string(), - }), - s3: None, + }, + s3: AmazonS3Storage { + access_key_id: "".to_string(), + secret_access_key: "".to_string(), + bucket: "".to_string(), + region_name: Region::default().name().to_string(), + }, + } + } + #[cfg(all(feature = "local-storage", not(feature = "aws-s3")))] + fn default() -> Self { + Self { + concurrency: 2, + port: "5000".to_string(), + bind: "0.0.0.0".to_string(), + ssl: false, + + tmp_dir: "./tmp".to_string(), + filesystem: FileSystemStorage { + store_path: "./img".to_string(), + client_path: "/img".to_string(), + }, + } + } + + #[cfg(all(feature = "aws-s3", not(feature = "local-storage")))] + fn default() -> Self { + Self { + concurrency: 2, + port: "5000".to_string(), + bind: "0.0.0.0".to_string(), + ssl: false, + + tmp_dir: "./tmp".to_string(), + s3: AmazonS3Storage { + access_key_id: "".to_string(), + secret_access_key: "".to_string(), + bucket: "".to_string(), + region_name: Region::default().name().to_string(), + }, + } + } + + #[cfg(all(not(feature = "aws-s3"), not(feature = "local-storage")))] + fn default() -> Self { + Self { + concurrency: 2, + port: "5000".to_string(), + bind: "0.0.0.0".to_string(), + ssl: false, + tmp_dir: "./tmp".to_string(), } } } diff --git a/jirs-server/src/ws/mod.rs b/jirs-server/src/ws/mod.rs index 83cf58f1..8ad494dd 100644 --- a/jirs-server/src/ws/mod.rs +++ b/jirs-server/src/ws/mod.rs @@ -271,6 +271,7 @@ impl Handler for WsServer { Some(s) => s, _ => return debug!(" channel not found, aborting..."), }; + let _s = set.len(); for r in set { let recipient = match self.sessions.get(r) { Some(r) => r,