Update user avatar

This commit is contained in:
Adrian Woźniak 2020-05-05 16:09:26 +02:00
parent 5b871a3332
commit fe06c1e63e
17 changed files with 487 additions and 133 deletions

1
.gitignore vendored
View File

@ -7,3 +7,4 @@ db.toml
db.test.toml
pkg
jirs-client/pkg
tmp

View File

@ -14,20 +14,68 @@ https://git.sr.ht/~tsumanu/jirs
### Config files
#### WEB
```toml
# web.toml
concurrency = 2
port = "5000"
bind = "0.0.0.0"
ssl = false
tmp_dir = "./tmp"
[s3]
access_key_id = ""
secret_access_key = ""
bucket = ""
region_name = "eu-central-1"
[filesystem]
store_path = ""
client_path = "/img"
```
##### Upload local storage
If default feature `"local-storage"` is on your uploaded files will be stored on your machine.
This requires additional configuration.
```toml
[filesystem]
store_path = "/var/jirs/uploads"
client_path = "/img"
```
* `store_path` is your local machine path. Files will be saved there. This can be relative to `CWD` path or absolute path.
* `client_path` is web path
Both must be set and non-empty
##### Upload to AWS S3
If default feature `"aws-s3"` is on your uploaded files will be send to AWS S3 service.
This requires additional configuration.
```toml
[s3]
access_key_id = ""
secret_access_key = ""
bucket = ""
region_name = "eu-central-1"
```
#### Database
```toml
# db.toml
concurrency = 2
database_url = "postgres://postgres@localhost:5432/jirs"
```
#### Mail Service
You can send e-mail only via service which will handle this. This application was build using sendgrid.
```toml
# mail.toml
concurrency = 2
@ -52,6 +100,9 @@ NODE_ENV=development
DEBUG=true
```
Client and Server bind/port must be provided. Client will be build using those variables and will send requests only using this address.
`DATABASE_URL` is required only to setup database. Runtime will use `db.toml`.
### Backend
Requirements:
@ -60,6 +111,7 @@ Requirements:
```bash
cargo install diesel_cli --no-default-features --features postgres
export DATABASE_URL=postgres://postgres@localhost/jirs
diesel setup
diesel migration run

View File

@ -67,6 +67,18 @@ pub fn update(msg: &Msg, model: &mut Model, orders: &mut impl Orders<Msg>) {
}
}
}
Msg::WsMsg(WsMsg::AvatarUrlChanged(user_id, avatar_url)) => {
for user in model.users.iter_mut() {
if user.id == *user_id {
user.avatar_url = Some(avatar_url.clone());
}
}
if let Some(me) = model.user.as_mut() {
if me.id == *user_id {
me.avatar_url = Some(avatar_url.clone());
}
}
}
_ => (),
};
orders.render();

View File

@ -16,7 +16,7 @@ if (process.env.NODE_ENV === "production") {
});
execSync("rm -Rf ./dist");
execSync("mkdir -p ./dist");
execSync('./target/debug/jirs-css -O ./jirs-client/dist/styles.css', {
execSync('./target/release/jirs-css -O ./jirs-client/dist/styles.css', {
cwd: jirDir,
});
console.log("CSS combined");

View File

@ -767,4 +767,7 @@ pub enum WsMsg {
UpdateComment(UpdateCommentPayload),
CommentDeleteRequest(CommentId),
CommentDeleted(CommentId),
// users
AvatarUrlChanged(UserId, String),
}

View File

@ -12,6 +12,19 @@ license = "MPL-2.0"
name = "jirs_server"
path = "./src/main.rs"
[features]
aws-s3 = [
"rusoto_s3",
"rusoto_core"
]
local-storage = [
"actix-files"
]
default = [
"aws-s3",
"local-storage",
]
[dependencies]
serde = { version = "*", features = ["derive"] }
actix = { version = "*" }
@ -21,7 +34,6 @@ actix-service = { version = "*" }
actix-rt = "1"
actix-web-actors = "*"
actix-multipart = { version = "*" }
actix-files = { version = "0.2.1" }
dotenv = { version = "*" }
byteorder = "1.0"
@ -50,10 +62,6 @@ futures = { version = "*" }
lettre = { version = "*" }
lettre_email = { version = "*" }
# Amazon S3
rusoto_s3 = "0.43.0"
rusoto_core = "0.43.0"
[dependencies.diesel]
version = "1.4.4"
features = [ "unstable", "postgres", "numeric", "extras", "uuidv07" ]
@ -61,3 +69,17 @@ features = [ "unstable", "postgres", "numeric", "extras", "uuidv07" ]
[dependencies.jirs-data]
path = "../jirs-data"
features = [ "backend" ]
# Amazon S3
[dependencies.rusoto_s3]
optional = true
version = "0.43.0"
[dependencies.rusoto_core]
optional = true
version = "0.43.0"
# Local storage
[dependencies.actix-files]
optional = true
version = "0.2.1"

View File

@ -1,4 +1,5 @@
use actix::{Handler, Message};
use diesel::pg::Pg;
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
@ -27,17 +28,19 @@ impl Handler<AuthorizeUser> for DbExecutor {
.pool
.get()
.map_err(|_| ServiceErrors::DatabaseConnectionLost)?;
let token = tokens
.filter(access_token.eq(msg.access_token))
.first::<Token>(conn)
.map_err(|_e| {
ServiceErrors::RecordNotFound(format!("token for {}", msg.access_token))
})?;
let user = users
.filter(id.eq(token.user_id))
let user_query = users.filter(id.eq(token.user_id));
debug!("{}", diesel::debug_query::<Pg, _>(&user_query));
user_query
.first::<User>(conn)
.map_err(|_e| ServiceErrors::RecordNotFound(format!("user {}", token.user_id)))?;
Ok(user)
.map_err(|_e| ServiceErrors::RecordNotFound(format!("user {}", token.user_id)))
}
}
@ -55,10 +58,11 @@ impl SyncQuery for AuthorizeUser {
.filter(access_token.eq(self.access_token))
.first::<Token>(&conn)
.map_err(|_| crate::errors::ServiceErrors::Unauthorized)?;
let user = users
.filter(id.eq(token.user_id))
let user_query = users.filter(id.eq(token.user_id));
debug!("{}", diesel::debug_query::<Pg, _>(&user_query));
user_query
.first::<User>(&conn)
.map_err(|_| crate::errors::ServiceErrors::Unauthorized)?;
Ok(user)
.map_err(|_| crate::errors::ServiceErrors::Unauthorized)
}
}

View File

@ -1,4 +1,5 @@
use actix::{Handler, Message};
use diesel::pg::Pg;
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
@ -26,12 +27,12 @@ impl Handler<LoadIssueComments> for DbExecutor {
.pool
.get()
.map_err(|_| ServiceErrors::DatabaseConnectionLost)?;
let rows: Vec<Comment> = comments
.distinct_on(id)
.filter(issue_id.eq(msg.issue_id))
let comments_query = comments.distinct_on(id).filter(issue_id.eq(msg.issue_id));
debug!("{}", diesel::debug_query::<Pg, _>(&comments_query));
comments_query
.load(conn)
.map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?;
Ok(rows)
.map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))
}
}
@ -63,11 +64,12 @@ impl Handler<CreateComment> for DbExecutor {
.pool
.get()
.map_err(|_| ServiceErrors::DatabaseConnectionLost)?;
let row: Comment = diesel::insert_into(comments)
.values(form)
let comment_query = diesel::insert_into(comments).values(form);
debug!("{}", diesel::debug_query::<Pg, _>(&comment_query));
comment_query
.get_result::<Comment>(conn)
.map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?;
Ok(row)
.map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))
}
}
@ -98,7 +100,7 @@ impl Handler<UpdateComment> for DbExecutor {
.find(msg.comment_id),
)
.set(body.eq(msg.body));
info!("{}", diesel::debug_query::<diesel::pg::Pg, _>(&query));
info!("{}", diesel::debug_query::<Pg, _>(&query));
let row: Comment = query
.get_result::<Comment>(conn)
.map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?;
@ -126,13 +128,16 @@ impl Handler<DeleteComment> for DbExecutor {
.pool
.get()
.map_err(|_| ServiceErrors::DatabaseConnectionLost)?;
diesel::delete(
let comment_query = diesel::delete(
comments
.filter(user_id.eq(msg.user_id))
.find(msg.comment_id),
)
.execute(conn)
.map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?;
);
debug!("{}", diesel::debug_query::<Pg, _>(&comment_query));
comment_query
.execute(conn)
.map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?;
Ok(())
}
}

View File

@ -1,4 +1,5 @@
use actix::{Handler, Message};
use diesel::pg::Pg;
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
@ -26,9 +27,11 @@ impl Handler<LoadAssignees> for DbExecutor {
.pool
.get()
.map_err(|_| ServiceErrors::DatabaseConnectionLost)?;
issue_assignees
let issue_assignees_query = issue_assignees
.distinct_on(id)
.filter(issue_id.eq(msg.issue_id))
.filter(issue_id.eq(msg.issue_id));
debug!("{}", diesel::debug_query::<Pg, _>(&issue_assignees_query));
issue_assignees_query
.load::<IssueAssignee>(conn)
.map_err(|_| ServiceErrors::RecordNotFound("issue users".to_string()))
}

View File

@ -162,11 +162,10 @@ impl Handler<UpdateIssue> for DbExecutor {
})?;
}
let row = issues
issues
.find(msg.issue_id)
.first::<Issue>(conn)
.map_err(|_| ServiceErrors::DatabaseConnectionLost)?;
Ok(row)
.map_err(|_| ServiceErrors::DatabaseConnectionLost)
}
}

View File

@ -1,4 +1,5 @@
use actix::{Handler, Message};
use diesel::pg::Pg;
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
@ -63,19 +64,21 @@ impl Handler<UpdateProject> for DbExecutor {
.get()
.map_err(|_| ServiceErrors::DatabaseConnectionLost)?;
diesel::update(projects.find(msg.project_id))
.set((
msg.name.map(|v| name.eq(v)),
msg.url.map(|v| url.eq(v)),
msg.description.map(|v| description.eq(v)),
msg.category.map(|v| category.eq(v)),
msg.time_tracking.map(|v| time_tracking.eq(v)),
))
let update_query = diesel::update(projects.find(msg.project_id)).set((
msg.name.map(|v| name.eq(v)),
msg.url.map(|v| url.eq(v)),
msg.description.map(|v| description.eq(v)),
msg.category.map(|v| category.eq(v)),
msg.time_tracking.map(|v| time_tracking.eq(v)),
));
debug!("{}", diesel::debug_query::<Pg, _>(&update_query));
update_query
.execute(conn)
.map_err(|e| ServiceErrors::DatabaseQueryFailed(format!("{}", e)))?;
projects
.filter(id.eq(msg.project_id))
let project_query = projects.find(msg.project_id);
debug!("{}", diesel::debug_query::<Pg, _>(&project_query));
project_query
.first::<Project>(conn)
.map_err(|_| ServiceErrors::RecordNotFound("Project".to_string()))
}

View File

@ -72,10 +72,10 @@ impl Handler<CreateBindToken> for DbExecutor {
refresh_token,
bind_token,
};
let row: Token = diesel::insert_into(tokens)
diesel::insert_into(tokens)
.values(form)
.get_result(conn)
.map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))?;
Ok(row)
.map_err(|_| ServiceErrors::RecordNotFound("issue comments".to_string()))
}
}

View File

@ -3,7 +3,7 @@ use diesel::pg::Pg;
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use jirs_data::{IssueAssignee, Project, User, UserId};
use jirs_data::{Project, User, UserId};
use crate::db::{DbExecutor, DbPooledConn};
use crate::errors::ServiceErrors;
@ -29,15 +29,15 @@ impl Handler<FindUser> for DbExecutor {
.pool
.get()
.map_err(|_| ServiceErrors::DatabaseConnectionLost)?;
let row: User = users
let users_query = users
.distinct_on(id)
.filter(email.eq(msg.email.as_str()))
.filter(name.eq(msg.name.as_str()))
.filter(name.eq(msg.name.as_str()));
debug!("{}", diesel::debug_query::<Pg, _>(&users_query));
users_query
.first(conn)
.map_err(|_| {
ServiceErrors::RecordNotFound(format!("user {} {}", msg.name, msg.email))
})?;
Ok(row)
.map_err(|_| ServiceErrors::RecordNotFound(format!("user {} {}", msg.name, msg.email)))
}
}
@ -60,12 +60,12 @@ impl Handler<LoadProjectUsers> for DbExecutor {
.pool
.get()
.map_err(|_| ServiceErrors::DatabaseConnectionLost)?;
let rows: Vec<User> = users
.distinct_on(id)
.filter(project_id.eq(msg.project_id))
let users_query = users.distinct_on(id).filter(project_id.eq(msg.project_id));
debug!("{}", diesel::debug_query::<Pg, _>(&users_query));
users_query
.load(conn)
.map_err(|_| ServiceErrors::RecordNotFound("project users".to_string()))?;
Ok(rows)
.map_err(|_| ServiceErrors::RecordNotFound("project users".to_string()))
}
}
@ -89,17 +89,16 @@ impl Handler<LoadIssueAssignees> for DbExecutor {
.pool
.get()
.map_err(|_| ServiceErrors::DatabaseConnectionLost)?;
let rows: Vec<(User, IssueAssignee)> = users
let users_query = users
.distinct_on(id)
.inner_join(issue_assignees.on(user_id.eq(id)))
.filter(issue_id.eq(msg.issue_id))
.select(users::all_columns());
debug!("{}", diesel::debug_query::<Pg, _>(&users_query));
users_query
.load(conn)
.map_err(|_| ServiceErrors::RecordNotFound("issue users".to_string()))?;
let mut vec: Vec<User> = vec![];
for row in rows {
vec.push(row.0);
}
Ok(vec)
.map_err(|_| ServiceErrors::RecordNotFound("issue users".to_string()))
}
}
@ -131,21 +130,17 @@ impl Handler<Register> for DbExecutor {
return Err(ServiceErrors::RegisterCollision);
}
let project: Project = match projects.first(conn) {
Ok(project) => project,
_ => {
let form = CreateProjectForm {
name: "initial".to_string(),
url: "".to_string(),
description: "".to_string(),
category: Default::default(),
};
diesel::insert_into(projects)
.values(form)
.get_result(conn)
.map_err(|_| ServiceErrors::RegisterCollision)?
}
let form = CreateProjectForm {
name: "initial".to_string(),
url: "".to_string(),
description: "".to_string(),
category: Default::default(),
};
let insert_query = diesel::insert_into(projects).values(form);
debug!("{}", diesel::debug_query::<Pg, _>(&insert_query));
let project: Project = insert_query
.get_result(conn)
.map_err(|_| ServiceErrors::RegisterCollision)?;
let form = UserForm {
name: msg.name,
@ -154,7 +149,9 @@ impl Handler<Register> for DbExecutor {
project_id: project.id,
};
match diesel::insert_into(users).values(form).execute(conn) {
let insert_user_query = diesel::insert_into(users).values(form);
debug!("{}", diesel::debug_query::<Pg, _>(&insert_user_query));
match insert_user_query.execute(conn) {
Ok(_) => (),
_ => return Err(ServiceErrors::RegisterCollision),
};
@ -188,12 +185,10 @@ impl Handler<LoadInvitedUsers> for DbExecutor {
.inner_join(invitations.on(i_email.eq(u_email)))
.filter(invited_by_id.eq(msg.user_id))
.select(users::all_columns());
debug!("{}", diesel::debug_query::<Pg, _>(&query).to_string());
let res: Vec<User> = query
debug!("{}", diesel::debug_query::<Pg, _>(&query));
query
.load(conn)
.map_err(|e| ServiceErrors::DatabaseQueryFailed(format!("{}", e)))?;
Ok(res)
.map_err(|e| ServiceErrors::DatabaseQueryFailed(format!("{}", e)))
}
}
@ -205,15 +200,46 @@ fn count_matching_users(name: &str, email: &str, conn: &DbPooledConn) -> i64 {
.or_filter(dsl::email.ne(email).and(dsl::name.eq(name)))
.or_filter(dsl::email.eq(email).and(dsl::name.eq(name)))
.count();
info!(
"{}",
diesel::debug_query::<diesel::pg::Pg, _>(&query).to_string()
);
info!("{}", diesel::debug_query::<diesel::pg::Pg, _>(&query));
query.get_result::<i64>(conn).unwrap_or(1)
}
#[derive(Serialize, Deserialize, Debug)]
pub struct UpdateAvatarUrl {
pub user_id: UserId,
pub avatar_url: Option<String>,
}
impl Message for UpdateAvatarUrl {
type Result = Result<User, ServiceErrors>;
}
impl Handler<UpdateAvatarUrl> for DbExecutor {
type Result = Result<User, ServiceErrors>;
fn handle(&mut self, msg: UpdateAvatarUrl, _ctx: &mut Self::Context) -> Self::Result {
use crate::schema::users::dsl::{avatar_url, id, users};
let conn = &self
.pool
.get()
.map_err(|_| ServiceErrors::DatabaseConnectionLost)?;
let update_query = diesel::update(users)
.set(avatar_url.eq(msg.avatar_url))
.filter(id.eq(msg.user_id));
debug!("{}", diesel::debug_query::<Pg, _>(&update_query));
update_query
.execute(conn)
.map_err(|e| ServiceErrors::DatabaseQueryFailed(format!("{}", e)))?;
let user_query = users.find(msg.user_id);
debug!("{}", diesel::debug_query::<Pg, _>(&user_query));
user_query
.first(conn)
.map_err(|e| ServiceErrors::DatabaseQueryFailed(format!("{}", e)))
}
}
#[cfg(test)]
mod tests {
use crate::db::build_pool;

View File

@ -7,6 +7,7 @@ extern crate log;
use actix::Actor;
use actix_cors::Cors;
#[cfg(feature = "local-storage")]
use actix_files as fs;
use actix_web::{App, HttpServer};
@ -29,6 +30,13 @@ async fn main() -> Result<(), String> {
let web_config = web::Configuration::read();
std::fs::create_dir_all(web_config.tmp_dir.as_str()).map_err(|e| e.to_string())?;
#[cfg(feature = "local-storage")]
if !web_config.filesystem.is_empty() {
let filesystem = &web_config.filesystem;
std::fs::create_dir_all(filesystem.store_path.as_str()).map_err(|e| e.to_string())?;
}
let db_addr = actix::SyncArbiter::start(
crate::db::Configuration::read().concurrency,
crate::db::DbExecutor::default,
@ -41,8 +49,7 @@ async fn main() -> Result<(), String> {
let ws_server = WsServer::default().start();
HttpServer::new(move || {
let web_config = web::Configuration::read();
let mut app = App::new()
let app = App::new()
.wrap(actix_web::middleware::Logger::default())
.wrap(Cors::default())
.data(ws_server.clone())
@ -51,12 +58,19 @@ async fn main() -> Result<(), String> {
.data(crate::db::build_pool())
.service(crate::ws::index)
.service(actix_web::web::scope("/avatar").service(crate::web::avatar::upload));
if let Some(file_system) = web_config.filesystem.as_ref() {
app = app.service(fs::Files::new(
file_system.client_path.as_str(),
file_system.store_path.as_str(),
));
}
#[cfg(feature = "local-storage")]
let web_config = web::Configuration::read();
#[cfg(feature = "local-storage")]
let app = if !web_config.filesystem.is_empty() {
let filesystem = &web_config.filesystem;
app.service(fs::Files::new(
filesystem.client_path.as_str(),
filesystem.store_path.as_str(),
))
} else {
app
};
app
})
.workers(web_config.concurrency)

View File

@ -1,19 +1,37 @@
#[cfg(feature = "aws-s3")]
use std::fs::File;
#[cfg(feature = "aws-s3")]
use std::io::Read;
use std::io::Write;
use actix::Addr;
use actix_multipart::{Field, Multipart};
use actix_web::http::header::ContentDisposition;
use actix_web::web::Data;
use actix_web::{get, post, web, Error, HttpResponse, Responder};
use actix_web::{post, web, Error, HttpResponse};
use futures::{StreamExt, TryStreamExt};
#[cfg(feature = "aws-s3")]
use rusoto_s3::{PutObjectRequest, S3Client, S3};
use jirs_data::{User, UserId, WsMsg};
use crate::db::authorize_user::AuthorizeUser;
use crate::db::users::UpdateAvatarUrl;
use crate::db::DbExecutor;
#[cfg(feature = "aws-s3")]
use crate::web::AmazonS3Storage;
use crate::ws::InnerMsg::BroadcastToChannel;
use crate::ws::WsServer;
#[post("/")]
pub async fn upload(
mut payload: Multipart,
db: Data<Addr<DbExecutor>>,
ws: Data<Addr<WsServer>>,
) -> Result<HttpResponse, Error> {
let mut user_id: Option<UserId> = None;
let mut avatar_url: Option<String> = None;
while let Ok(Some(field)) = payload.try_next().await {
let disposition: ContentDisposition = match field.content_disposition() {
Some(d) => d,
@ -22,47 +40,163 @@ pub async fn upload(
if !disposition.is_form_data() {
return Ok(HttpResponse::BadRequest().finish());
}
let _name = disposition.get_name().as_ref().cloned().unwrap_or_default();
match disposition.get_name() {
Some("token") => handle_token(field, disposition, db.clone()).await?,
Some("avatar") => handle_image(field, disposition, db.clone()).await?,
Some("token") => {
user_id = Some(handle_token(field, db.clone()).await?);
}
Some("avatar") => {
let id = user_id.ok_or_else(|| HttpResponse::Unauthorized().finish())?;
avatar_url = Some(handle_image(id, field, disposition, db.clone()).await?);
}
_ => continue,
};
}
Ok(HttpResponse::Ok().json(""))
match (user_id, avatar_url) {
(Some(user_id), Some(avatar_url)) => {
let user = update_user_avatar(user_id, avatar_url.clone(), db).await?;
ws.send(BroadcastToChannel(
user.project_id,
WsMsg::AvatarUrlChanged(user.id, avatar_url),
))
.await
.map_err(|_| HttpResponse::UnprocessableEntity().finish())?;
Ok(HttpResponse::NoContent().finish())
}
_ => Ok(HttpResponse::UnprocessableEntity().finish()),
}
}
async fn handle_token(
mut field: Field,
_disposition: ContentDisposition,
_db: Data<Addr<DbExecutor>>,
) -> Result<(), Error> {
Ok(())
async fn update_user_avatar(
user_id: UserId,
new_url: String,
db: Data<Addr<DbExecutor>>,
) -> Result<User, Error> {
match db
.send(UpdateAvatarUrl {
user_id,
avatar_url: Some(new_url),
})
.await
{
Ok(Ok(user)) => Ok(user),
Ok(Err(e)) => {
error!("{:?}", e);
Err(HttpResponse::Unauthorized().finish().into())
}
Err(e) => {
error!("{:?}", e);
Err(HttpResponse::Unauthorized().finish().into())
}
}
}
async fn handle_token(mut field: Field, db: Data<Addr<DbExecutor>>) -> Result<UserId, Error> {
let mut f: Vec<u8> = vec![];
while let Some(chunk) = field.next().await {
let data = chunk.unwrap();
f = web::block(move || f.write_all(&data).map(|_| f)).await?;
}
let access_token = String::from_utf8(f)
.unwrap_or_default()
.parse::<uuid::Uuid>()
.map_err(|_| HttpResponse::Unauthorized().finish())?;
match db.send(AuthorizeUser { access_token }).await {
Ok(Ok(user)) => Ok(user.id),
Ok(Err(e)) => {
error!("{:?}", e);
Err(HttpResponse::Unauthorized().finish().into())
}
Err(e) => {
error!("{:?}", e);
Err(HttpResponse::Unauthorized().finish().into())
}
}
}
async fn handle_image(
user_id: UserId,
mut field: Field,
disposition: ContentDisposition,
_db: Data<Addr<DbExecutor>>,
) -> Result<(), Error> {
) -> Result<String, Error> {
let web_config = crate::web::Configuration::read();
let mut new_link = None;
let filename = disposition.get_filename().unwrap();
let filepath = format!("./tmp/{}", filename);
// File::create is blocking operation, use threadpool
let mut f = web::block(|| std::fs::File::create(filepath))
let tmp_file_path = format!("{}/{}-{}", web_config.tmp_dir, user_id, filename);
let mut f = web::block(move || std::fs::File::create(tmp_file_path))
.await
.unwrap();
// Field in turn is stream of *Bytes* object
// Write temp file
while let Some(chunk) = field.next().await {
let data = chunk.unwrap();
// filesystem operations are blocking, we have to use thread pool
f = web::block(move || f.write_all(&data).map(|_| f)).await?;
}
Ok(())
// Write public visible file
#[cfg(feature = "local-storage")]
if !web_config.filesystem.is_empty() {
let filesystem = &web_config.filesystem;
std::fs::copy(
format!("{}/{}-{}", web_config.tmp_dir, user_id, filename),
format!("{}/{}-{}", filesystem.store_path, user_id, filename),
)
.map_err(|_| HttpResponse::InsufficientStorage().finish())?;
new_link = Some(format!(
"{proto}://{bind}{port}{client_path}/{user_id}-{filename}",
proto = if web_config.ssl { "https" } else { "http" },
bind = web_config.bind,
port = match web_config.port.as_str() {
"80" | "443" => "".to_string(),
p @ _ => format!(":{}", p),
},
client_path = filesystem.client_path,
user_id = user_id,
filename = filename
));
}
// Upload to AWS S3
#[cfg(feature = "aws-s3")]
if !web_config.s3.is_empty() {
let s3 = &web_config.s3;
s3.set_variables();
let key = format!("{}-{}", user_id, filename);
let mut tmp_file = File::open(format!("{}/{}-{}", web_config.tmp_dir, user_id, filename))
.map_err(|_| HttpResponse::InternalServerError())?;
let mut buffer: Vec<u8> = vec![];
tmp_file
.read_to_end(&mut buffer)
.map_err(|_| HttpResponse::InternalServerError())?;
let client = S3Client::new(s3.region());
let put_object = PutObjectRequest {
bucket: s3.bucket.clone(),
key: key.clone(),
body: Some(buffer.into()),
..Default::default()
};
let _id = client
.put_object(put_object)
.await
.map_err(|_| HttpResponse::InternalServerError())?;
new_link = Some(aws_s3_url(key.as_str(), s3));
}
std::fs::remove_file(format!("{}/{}-{}", web_config.tmp_dir, user_id, filename).as_str())
.unwrap_or_default();
Ok(new_link.unwrap_or_default())
}
#[get("/{id}")]
async fn download(_id: web::Path<i32>) -> impl Responder {
HttpResponse::Ok().json("")
#[cfg(feature = "aws-s3")]
fn aws_s3_url(key: &str, config: &AmazonS3Storage) -> String {
format!(
"https://{bucket}.s3.{region}.amazonaws.com/{key}",
bucket = config.bucket,
region = config.region_name,
key = key
)
}

View File

@ -3,6 +3,8 @@ use std::fs::*;
use actix::Addr;
use actix_web::web::Data;
use actix_web::{HttpRequest, HttpResponse};
#[cfg(feature = "aws-s3")]
use rusoto_core::Region;
use serde::{Deserialize, Serialize};
use jirs_data::User;
@ -40,24 +42,44 @@ pub enum Protocol {
Https,
}
#[cfg(feature = "local-storage")]
#[derive(Serialize, Deserialize)]
pub struct FileSystemStorage {
pub store_path: String,
pub client_path: String,
}
#[cfg(feature = "local-storage")]
impl FileSystemStorage {
pub fn is_empty(&self) -> bool {
self.store_path.is_empty()
}
}
#[cfg(feature = "aws-s3")]
#[derive(Serialize, Deserialize)]
pub struct AmazonS3Storage {
pub access_key_id: String,
pub secret_access_key: String,
pub bucket: String,
pub region: String,
pub region_name: String,
}
#[derive(Serialize, Deserialize)]
pub enum Storage {
FileSystem,
AmazonS3,
#[cfg(feature = "aws-s3")]
impl AmazonS3Storage {
pub fn is_empty(&self) -> bool {
self.access_key_id.is_empty() || self.secret_access_key.is_empty() || self.bucket.is_empty()
}
pub fn region(&self) -> Region {
self.region_name.parse::<Region>().unwrap_or_default()
}
pub fn set_variables(&self) {
std::env::set_var("AWS_ACCESS_KEY_ID", self.access_key_id.as_str());
std::env::set_var("AWS_SECRET_ACCESS_KEY", self.secret_access_key.as_str());
std::env::set_var("AWS_S3_BUCKET_NAME", self.region_name.as_str());
}
}
#[derive(Serialize, Deserialize)]
@ -66,24 +88,77 @@ pub struct Configuration {
pub port: String,
pub bind: String,
pub ssl: bool,
pub storage: Storage,
pub filesystem: Option<FileSystemStorage>,
pub s3: Option<AmazonS3Storage>,
pub tmp_dir: String,
#[cfg(feature = "aws-s3")]
pub s3: AmazonS3Storage,
#[cfg(feature = "local-storage")]
pub filesystem: FileSystemStorage,
}
impl Default for Configuration {
#[cfg(all(feature = "local-storage", feature = "aws-s3"))]
fn default() -> Self {
Self {
concurrency: 2,
port: "5000".to_string(),
bind: "0.0.0.0".to_string(),
ssl: false,
storage: Storage::FileSystem,
filesystem: Some(FileSystemStorage {
store_path: "./tmp".to_string(),
tmp_dir: "./tmp".to_string(),
filesystem: FileSystemStorage {
store_path: "".to_string(),
client_path: "/img".to_string(),
}),
s3: None,
},
s3: AmazonS3Storage {
access_key_id: "".to_string(),
secret_access_key: "".to_string(),
bucket: "".to_string(),
region_name: Region::default().name().to_string(),
},
}
}
#[cfg(all(feature = "local-storage", not(feature = "aws-s3")))]
fn default() -> Self {
Self {
concurrency: 2,
port: "5000".to_string(),
bind: "0.0.0.0".to_string(),
ssl: false,
tmp_dir: "./tmp".to_string(),
filesystem: FileSystemStorage {
store_path: "./img".to_string(),
client_path: "/img".to_string(),
},
}
}
#[cfg(all(feature = "aws-s3", not(feature = "local-storage")))]
fn default() -> Self {
Self {
concurrency: 2,
port: "5000".to_string(),
bind: "0.0.0.0".to_string(),
ssl: false,
tmp_dir: "./tmp".to_string(),
s3: AmazonS3Storage {
access_key_id: "".to_string(),
secret_access_key: "".to_string(),
bucket: "".to_string(),
region_name: Region::default().name().to_string(),
},
}
}
#[cfg(all(not(feature = "aws-s3"), not(feature = "local-storage")))]
fn default() -> Self {
Self {
concurrency: 2,
port: "5000".to_string(),
bind: "0.0.0.0".to_string(),
ssl: false,
tmp_dir: "./tmp".to_string(),
}
}
}

View File

@ -271,6 +271,7 @@ impl Handler<InnerMsg> for WsServer {
Some(s) => s,
_ => return debug!(" channel not found, aborting..."),
};
let _s = set.len();
for r in set {
let recipient = match self.sessions.get(r) {
Some(r) => r,