237 lines
6.4 KiB
Rust
237 lines
6.4 KiB
Rust
#![feature(io_error_more)]
|
|
|
|
use std::ffi::OsStr;
|
|
use std::io::Write;
|
|
|
|
use config::SharedAppConfig;
|
|
use model::{FileName, LocalPath, UniqueName};
|
|
|
|
/// Generates an `actix::Handler` implementation for `FsManager`.
///
/// Arguments, in order:
/// * the message type to handle,
/// * the name of an async function called as `function(msg, config)`,
/// * the success type; the handler resolves to `Result<success type>`.
///
/// The generated handler clones the actor's `config`, hands it together with
/// the message to the async function and returns the resulting future wrapped
/// as an actor future.
#[macro_export]
macro_rules! fs_async_handler {
    ($message: ty, $handler: ident, $output: ty) => {
        impl actix::Handler<$message> for FsManager {
            type Result = actix::ResponseActFuture<Self, Result<$output>>;

            fn handle(&mut self, msg: $message, _ctx: &mut Self::Context) -> Self::Result {
                use actix::WrapFuture;

                // The future must own everything it touches, so it gets its
                // own clone of the configuration instead of borrowing `self`.
                let config = self.config.clone();
                let fut = async move { $handler(msg, config).await };
                Box::pin(fut.into_actor(self))
            }
        }
    };
}
|
|
|
|
/// Sends a message through `.send(..).await` and unwraps the nested result.
///
/// The awaited value is matched as `Ok(Ok(value))` (success), `Ok(Err(e))`
/// (the handler reported an error) or `Err(e)` (the `send` call itself
/// failed). Both failure cases are logged with `tracing::error!`.
///
/// Supported forms:
/// * `query_fs!(fs, msg, default expr)` - evaluates to `expr` on any failure.
/// * `query_fs!(fs, msg, panic)` - panics on any failure.
/// * `query_fs!(fs, msg, err)` - on any failure executes `return Err(err)` in
///   the enclosing function.
/// * `query_fs!(fs, msg, handler_err, send_err)` - like the previous form but
///   with a separate error for each of the two failure cases.
#[macro_export]
macro_rules! query_fs {
    // Fallback value instead of an early return.
    ($addr: expr, $request: expr, default $fallback: expr) => {
        match $addr.send($request).await {
            Ok(Ok(value)) => value,
            Ok(Err(e)) => {
                tracing::error!("{e}");
                $fallback
            }
            Err(e) => {
                tracing::error!("{e:?}");
                $fallback
            }
        }
    };
    // Must stay ahead of the generic single-error arm below: `panic` would
    // otherwise be parsed as an ordinary expression.
    ($addr: expr, $request: expr, panic) => {
        match $addr.send($request).await {
            Ok(Ok(value)) => value,
            Ok(Err(e)) => {
                tracing::error!("{e}");
                panic!("{:?}", e);
            }
            Err(e) => {
                tracing::error!("{e:?}");
                panic!("{:?}", e);
            }
        }
    };

    // One error shared by both failure cases.
    ($addr: expr, $request: expr, $failure: expr) => {
        $crate::query_fs!($addr, $request, $failure, $failure)
    };

    // Distinct errors for a handler failure and for a failed `send`.
    ($addr: expr, $request: expr, $handler_failure: expr, $send_failure: expr) => {
        match $addr.send($request).await {
            Ok(Ok(value)) => value,
            Ok(Err(e)) => {
                tracing::error!("{e}");
                return Err($handler_failure);
            }
            Err(e) => {
                tracing::error!("{e:?}");
                return Err($send_failure);
            }
        }
    };
}
|
|
|
|
#[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)]
|
|
#[serde(rename_all = "kebab-case", tag = "fs")]
|
|
pub enum Error {
|
|
#[error("Can't access file system. Please check privileges")]
|
|
StorageUnavailable,
|
|
#[error("Can't write to file. Please check privileges.")]
|
|
CantWrite,
|
|
#[error("Can't write to file. There's no more space on disk")]
|
|
NoSpace,
|
|
#[error("Can't remove file. Please check privileges.")]
|
|
CantRemove,
|
|
#[error("Can't write to file. Invalid path, no filename")]
|
|
InvalidPath,
|
|
}
|
|
|
|
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
|
|
|
|
#[derive(Clone)]
|
|
pub struct FsManager {
|
|
config: SharedAppConfig,
|
|
}
|
|
|
|
/// `FsManager` runs as a plain actix actor with the default context type.
impl actix::Actor for FsManager {
    type Context = actix::Context<Self>;
}
|
|
|
|
impl FsManager {
|
|
pub async fn build(config: SharedAppConfig) -> Result<Self> {
|
|
Self::validate_fs(config.clone()).await?;
|
|
Ok(Self { config })
|
|
}
|
|
|
|
async fn validate_fs(config: SharedAppConfig) -> Result<()> {
|
|
let l = config.lock();
|
|
let output_path = l.files().local_path();
|
|
|
|
tokio::fs::create_dir_all(&output_path).await.ok();
|
|
|
|
let test_file = std::path::PathBuf::new()
|
|
.join(output_path)
|
|
.join(format!("{}", uuid::Uuid::new_v4()));
|
|
tokio::fs::remove_file(test_file.clone()).await.ok();
|
|
tokio::fs::write(test_file.clone(), b"1")
|
|
.await
|
|
.map_err(|_| Error::StorageUnavailable)?;
|
|
tokio::fs::remove_file(test_file.clone())
|
|
.await
|
|
.map_err(|_| Error::StorageUnavailable)
|
|
}
|
|
}
|
|
|
|
#[derive(actix::Message)]
|
|
#[rtype(result = "Result<()>")]
|
|
pub struct RemoveFile {
|
|
pub file_name: String,
|
|
}
|
|
|
|
// Implements `actix::Handler<RemoveFile>` for `FsManager` by delegating to `remove_file`.
fs_async_handler!(RemoveFile, remove_file, ());
|
|
|
|
pub(crate) async fn remove_file(msg: RemoveFile, config: SharedAppConfig) -> Result<()> {
|
|
let output_path = {
|
|
let l = config.lock();
|
|
l.files().local_path()
|
|
};
|
|
match tokio::fs::remove_file(
|
|
std::path::PathBuf::new()
|
|
.join(output_path)
|
|
.join(msg.file_name),
|
|
)
|
|
.await
|
|
{
|
|
Ok(_) => Ok(()),
|
|
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
|
|
Err(e) => {
|
|
tracing::error!("{:?}", e);
|
|
Err(Error::CantRemove)
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct WriteResult {
|
|
/// Unique file name created with UUID and original file extension
|
|
pub unique_name: UniqueName,
|
|
/// Path to file in local storage
|
|
pub local_path: LocalPath,
|
|
pub file_name: FileName,
|
|
}
|
|
|
|
#[derive(actix::Message)]
|
|
#[rtype(result = "Result<WriteResult>")]
|
|
pub struct WriteFile {
|
|
pub file_name: String,
|
|
pub stream: tokio::sync::mpsc::UnboundedReceiver<actix_web::web::Bytes>,
|
|
}
|
|
|
|
// Implements `actix::Handler<WriteFile>` for `FsManager` by delegating to `write_file`.
fs_async_handler!(WriteFile, write_file, WriteResult);
|
|
|
|
pub(crate) async fn write_file(msg: WriteFile, config: SharedAppConfig) -> Result<WriteResult> {
|
|
let WriteFile {
|
|
file_name,
|
|
mut stream,
|
|
} = msg;
|
|
|
|
tracing::debug!("Writing file {:?}", file_name);
|
|
|
|
let p = std::path::Path::new(&file_name);
|
|
let ext = p
|
|
.file_name()
|
|
.and_then(OsStr::to_str)
|
|
.map(std::path::Path::new)
|
|
.and_then(std::path::Path::extension)
|
|
.and_then(OsStr::to_str)
|
|
.map(String::from);
|
|
|
|
let unique_name = format!(
|
|
"{}{}",
|
|
uuid::Uuid::new_v4(),
|
|
ext.map(|s| format!(".{s}")).unwrap_or_default()
|
|
);
|
|
|
|
let output_path = {
|
|
let l = config.lock();
|
|
l.files().local_path()
|
|
};
|
|
|
|
let path = std::path::PathBuf::new()
|
|
.join(&output_path)
|
|
.join(&unique_name);
|
|
tracing::debug!(
|
|
"File {:?} will be written as {:?} at {:?}",
|
|
file_name,
|
|
unique_name,
|
|
path
|
|
);
|
|
let mut file = match std::fs::File::create(&path) {
|
|
Ok(f) => f,
|
|
Err(e) => {
|
|
tracing::error!("{:?}", e);
|
|
return Err(Error::CantWrite);
|
|
}
|
|
};
|
|
|
|
let mut counter = 0;
|
|
while let Some(b) = stream.recv().await {
|
|
counter += 1;
|
|
if counter % 100_000 == 0 {
|
|
tracing::debug!("Wrote {} for {:?}", counter, file_name);
|
|
}
|
|
match file.write(&b) {
|
|
Ok(_) => {}
|
|
Err(e) if e.kind() == std::io::ErrorKind::StorageFull => return Err(Error::NoSpace),
|
|
Err(e) => {
|
|
tracing::error!("{:?}", e);
|
|
return Err(Error::CantWrite);
|
|
}
|
|
}
|
|
}
|
|
tracing::debug!("File {:?} successfully written", unique_name);
|
|
|
|
Ok(WriteResult {
|
|
file_name: FileName::new(file_name),
|
|
unique_name: UniqueName::new(unique_name),
|
|
local_path: LocalPath::new(path.to_str().unwrap_or_default()),
|
|
})
|
|
}
|