bazzar/actors/fs_manager/src/lib.rs
2022-05-09 16:17:27 +02:00

227 lines
6.1 KiB
Rust

#![feature(io_error_more)]
use std::ffi::OsStr;
use std::io::Write;
use config::SharedAppConfig;
use model::{FileName, LocalPath, UniqueName};
/// Generates an `actix::Handler` implementation for `FsManager`.
///
/// * `$msg` - message type being handled.
/// * `$handler` - name of an async function taking `(message, config)` and
///   returning `Result<$res>`.
/// * `$res` - success type wrapped in the `Result` visible at the call site.
///
/// The generated handler clones the actor's config and hands it, together with
/// the message, to `$handler`; the resulting future is wrapped into an actor
/// future and boxed.
#[macro_export]
macro_rules! fs_async_handler {
    ($msg: ty, $handler: ident, $res: ty) => {
        impl actix::Handler<$msg> for FsManager {
            type Result = actix::ResponseActFuture<Self, Result<$res>>;

            fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result {
                use actix::WrapFuture;

                let shared_config = self.config.clone();
                let task = async move { $handler(msg, shared_config).await };
                Box::pin(task.into_actor(self))
            }
        }
    };
}
/// Sends a message to the file-system actor and unwraps the nested result.
///
/// `$fs.send($msg).await` yields an outer result (actor mailbox delivery) that
/// wraps an inner result (the file-system operation itself). Every failure is
/// logged with `log::error!` before being handled. Supported forms:
///
/// * `query_fs!(fs, msg, default value)` - evaluates to `value` on any failure.
/// * `query_fs!(fs, msg, panic)` - panics on any failure.
/// * `query_fs!(fs, msg, err)` - returns `Err(err)` from the enclosing function
///   on any failure.
/// * `query_fs!(fs, msg, fs_err, actor_err)` - returns `Err(fs_err)` when the
///   operation failed and `Err(actor_err)` when the message could not be
///   delivered.
#[macro_export]
macro_rules! query_fs {
    ($fs: expr, $msg: expr, default $fail: expr) => {
        match $fs.send($msg).await {
            Ok(Ok(r)) => r,
            Ok(Err(e)) => {
                log::error!("{e}");
                $fail
            }
            Err(e) => {
                log::error!("{e:?}");
                $fail
            }
        }
    };
    ($fs: expr, $msg: expr, panic) => {
        match $fs.send($msg).await {
            Ok(Ok(r)) => r,
            Ok(Err(e)) => {
                log::error!("{e}");
                panic!("{:?}", e);
            }
            Err(e) => {
                log::error!("{e:?}");
                panic!("{:?}", e);
            }
        }
    };
    ($fs: expr, $msg: expr, $fail: expr) => {
        // Delegate to the four-argument arm of THIS macro; the previous
        // expansion referenced `query_db!`, which this crate does not define.
        $crate::query_fs!($fs, $msg, $fail, $fail)
    };
    ($fs: expr, $msg: expr, $fs_fail: expr, $act_fail: expr) => {
        match $fs.send($msg).await {
            Ok(Ok(r)) => r,
            Ok(Err(e)) => {
                log::error!("{e}");
                return Err($fs_fail);
            }
            Err(e) => {
                log::error!("{e:?}");
                return Err($act_fail);
            }
        }
    };
}
/// Errors reported by the file-system manager.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The storage directory could not be written to or cleaned up during the
/// start-up probe performed by `FsManager::validate_fs`.
#[error("Can't access file system. Please check privileges")]
StorageUnavailable,
/// Creating or writing the target file failed; carries the underlying I/O error.
#[error("Can't write to file. Please check privileges. {0:?}")]
CantWrite(std::io::Error),
/// A write failed with `std::io::ErrorKind::StorageFull`.
#[error("Can't write to file. There's no more space on disk")]
NoSpace,
/// Removing a file failed for a reason other than the file not existing;
/// carries the underlying I/O error.
#[error("Can't remove file. Please check privileges. {0:?}")]
CantRemove(std::io::Error),
/// The given path has no file name component.
/// NOTE(review): not constructed anywhere in the visible code - confirm usage.
#[error("Can't write to file. Invalid path, no filename")]
InvalidPath,
}
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
/// Actor that performs file-system operations inside the configured local
/// storage directory.
#[derive(Clone)]
pub struct FsManager {
// Shared application configuration; the storage directory is read from
// `config.lock().files().local_path()`.
config: SharedAppConfig,
}
/// Registers `FsManager` as an actix actor using the default context type.
impl actix::Actor for FsManager {
    type Context = actix::Context<Self>;
}
impl FsManager {
    /// Creates the manager after checking that the configured storage
    /// directory is usable.
    ///
    /// # Errors
    ///
    /// Returns [`Error::StorageUnavailable`] when the write/remove probe in
    /// the storage directory fails.
    pub async fn build(config: SharedAppConfig) -> Result<Self> {
        Self::validate_fs(config.clone()).await?;
        Ok(Self { config })
    }

    /// Probes the storage directory by writing and then removing a file with
    /// a random UUID name.
    ///
    /// # Errors
    ///
    /// Returns [`Error::StorageUnavailable`] when the probe file cannot be
    /// written or cannot be removed afterwards.
    async fn validate_fs(config: SharedAppConfig) -> Result<()> {
        // Read the path inside its own scope so the config lock guard is
        // released before the first `.await`, matching `remove_file` and
        // `write_file`.
        let output_path = {
            let l = config.lock();
            l.files().local_path()
        };

        // Best effort: the directory may already exist, and any real problem
        // surfaces in the write probe below.
        tokio::fs::create_dir_all(&output_path).await.ok();

        let test_file = std::path::PathBuf::new()
            .join(output_path)
            .join(uuid::Uuid::new_v4().to_string());

        // Best effort: clear a leftover file with the same name, if any.
        tokio::fs::remove_file(&test_file).await.ok();

        tokio::fs::write(&test_file, b"1")
            .await
            .map_err(|_| Error::StorageUnavailable)?;
        tokio::fs::remove_file(&test_file)
            .await
            .map_err(|_| Error::StorageUnavailable)
    }
}
/// Message asking the file-system actor to delete a file from local storage.
#[derive(actix::Message)]
#[rtype(result = "Result<()>")]
pub struct RemoveFile {
/// Name of the file to remove; it is joined onto the configured local
/// storage path.
pub file_name: String,
}
// Implements `actix::Handler<RemoveFile>` for `FsManager` by delegating to `remove_file`.
fs_async_handler!(RemoveFile, remove_file, ());
pub(crate) async fn remove_file(msg: RemoveFile, config: SharedAppConfig) -> Result<()> {
let output_path = {
let l = config.lock();
l.files().local_path()
};
match tokio::fs::remove_file(
std::path::PathBuf::new()
.join(output_path)
.join(msg.file_name),
)
.await
{
Ok(_) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(Error::CantRemove(e)),
}
}
/// Outcome of a successful `WriteFile` request.
pub struct WriteResult {
/// Unique file name created with UUID and original file extension
pub unique_name: UniqueName,
/// Path to file in local storage
pub local_path: LocalPath,
/// File name exactly as it was supplied in the `WriteFile` message
pub file_name: FileName,
}
/// Message asking the file-system actor to store streamed bytes as a new file.
#[derive(actix::Message)]
#[rtype(result = "Result<WriteResult>")]
pub struct WriteFile {
/// Original file name; only its extension is reused for the stored file.
pub file_name: String,
/// File content, received one byte at a time until the channel is closed.
pub stream: tokio::sync::mpsc::UnboundedReceiver<u8>,
}
// Implements `actix::Handler<WriteFile>` for `FsManager` by delegating to `write_file`.
fs_async_handler!(WriteFile, write_file, WriteResult);
pub(crate) async fn write_file(msg: WriteFile, config: SharedAppConfig) -> Result<WriteResult> {
let WriteFile {
file_name,
mut stream,
} = msg;
log::debug!("Writing file {:?}", file_name);
let p = std::path::Path::new(&file_name);
let ext = p
.file_name()
.and_then(OsStr::to_str)
.map(std::path::Path::new)
.and_then(std::path::Path::extension)
.and_then(OsStr::to_str)
.map(String::from);
let unique_name = format!(
"{}{}",
uuid::Uuid::new_v4(),
ext.map(|s| format!(".{s}")).unwrap_or_default()
);
let output_path = {
let l = config.lock();
l.files().local_path()
};
let path = std::path::PathBuf::new()
.join(&output_path)
.join(&unique_name);
log::debug!(
"File {:?} will be written as {:?} at {:?}",
file_name,
unique_name,
path
);
let mut file = match std::fs::File::create(&path) {
Ok(f) => f,
Err(e) => return Err(Error::CantWrite(e)),
};
let mut counter = 0;
while let Some(b) = stream.recv().await {
counter += 1;
if counter % 100_000 == 0 {
log::debug!("Wrote {} for {:?}", counter, file_name);
}
match file.write(&[b]) {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::StorageFull => return Err(Error::NoSpace),
Err(e) => return Err(Error::CantWrite(e)),
}
}
log::debug!("File {:?} successfully written", unique_name);
Ok(WriteResult {
file_name: FileName::new(file_name),
unique_name: UniqueName::new(unique_name),
local_path: LocalPath::from(path.to_str().unwrap_or_default().to_string()),
})
}