#![feature(io_error_more)] use std::ffi::OsStr; use std::io::Write; use config::SharedAppConfig; use model::{FileName, LocalPath, UniqueName}; #[macro_export] macro_rules! fs_async_handler { ($msg: ty, $async: ident, $res: ty) => { impl actix::Handler<$msg> for FsManager { type Result = actix::ResponseActFuture>; fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result { use actix::WrapFuture; let config = self.config.clone(); Box::pin(async { $async(msg, config).await }.into_actor(self)) } } }; } #[macro_export] macro_rules! query_fs { ($fs: expr, $msg: expr, default $fail: expr) => { match $fs.send($msg).await { Ok(Ok(r)) => r, Ok(Err(e)) => { log::error!("{e}"); $fail } Err(e) => { log::error!("{e:?}"); $fail } } }; ($fs: expr, $msg: expr, panic) => { match $fs.send($msg).await { Ok(Ok(r)) => r, Ok(Err(e)) => { log::error!("{e}"); panic!("{:?}", e); } Err(e) => { log::error!("{e:?}"); panic!("{:?}", e); } } }; ($fs: expr, $msg: expr, $fail: expr) => { $crate::query_fs!($fs, $msg, $fail, $fail) }; ($fs: expr, $msg: expr, $db_fail: expr, $act_fail: expr) => { match $fs.send($msg).await { Ok(Ok(r)) => r, Ok(Err(e)) => { log::error!("{e}"); return Err($db_fail); } Err(e) => { log::error!("{e:?}"); return Err($act_fail); } } }; } #[derive(Debug, Copy, Clone, serde::Serialize, thiserror::Error)] #[serde(rename_all = "kebab-case", tag = "fs")] pub enum Error { #[error("Can't access file system. Please check privileges")] StorageUnavailable, #[error("Can't write to file. Please check privileges.")] CantWrite, #[error("Can't write to file. There's no more space on disk")] NoSpace, #[error("Can't remove file. Please check privileges.")] CantRemove, #[error("Can't write to file. Invalid path, no filename")] InvalidPath, } pub type Result = std::result::Result; #[derive(Clone)] pub struct FsManager { config: SharedAppConfig, } impl actix::Actor for FsManager { type Context = actix::Context; } impl FsManager { pub async fn build(config: SharedAppConfig) -> Result { Self::validate_fs(config.clone()).await?; Ok(Self { config }) } async fn validate_fs(config: SharedAppConfig) -> Result<()> { let l = config.lock(); let output_path = l.files().local_path(); tokio::fs::create_dir_all(&output_path).await.ok(); let test_file = std::path::PathBuf::new() .join(output_path) .join(format!("{}", uuid::Uuid::new_v4())); tokio::fs::remove_file(test_file.clone()).await.ok(); tokio::fs::write(test_file.clone(), b"1") .await .map_err(|_| Error::StorageUnavailable)?; tokio::fs::remove_file(test_file.clone()) .await .map_err(|_| Error::StorageUnavailable) } } #[derive(actix::Message)] #[rtype(result = "Result<()>")] pub struct RemoveFile { pub file_name: String, } fs_async_handler!(RemoveFile, remove_file, ()); pub(crate) async fn remove_file(msg: RemoveFile, config: SharedAppConfig) -> Result<()> { let output_path = { let l = config.lock(); l.files().local_path() }; match tokio::fs::remove_file( std::path::PathBuf::new() .join(output_path) .join(msg.file_name), ) .await { Ok(_) => Ok(()), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), Err(e) => { log::error!("{:?}", e); Err(Error::CantRemove) } } } pub struct WriteResult { /// Unique file name created with UUID and original file extension pub unique_name: UniqueName, /// Path to file in local storage pub local_path: LocalPath, pub file_name: FileName, } #[derive(actix::Message)] #[rtype(result = "Result")] pub struct WriteFile { pub file_name: String, pub stream: tokio::sync::mpsc::UnboundedReceiver, } fs_async_handler!(WriteFile, write_file, WriteResult); pub(crate) async fn write_file(msg: WriteFile, config: SharedAppConfig) -> Result { let WriteFile { file_name, mut stream, } = msg; log::debug!("Writing file {:?}", file_name); let p = std::path::Path::new(&file_name); let ext = p .file_name() .and_then(OsStr::to_str) .map(std::path::Path::new) .and_then(std::path::Path::extension) .and_then(OsStr::to_str) .map(String::from); let unique_name = format!( "{}{}", uuid::Uuid::new_v4(), ext.map(|s| format!(".{s}")).unwrap_or_default() ); let output_path = { let l = config.lock(); l.files().local_path() }; let path = std::path::PathBuf::new() .join(&output_path) .join(&unique_name); log::debug!( "File {:?} will be written as {:?} at {:?}", file_name, unique_name, path ); let mut file = match std::fs::File::create(&path) { Ok(f) => f, Err(e) => { log::error!("{:?}", e); return Err(Error::CantWrite); } }; let mut counter = 0; while let Some(b) = stream.recv().await { counter += 1; if counter % 100_000 == 0 { log::debug!("Wrote {} for {:?}", counter, file_name); } match file.write(&b) { Ok(_) => {} Err(e) if e.kind() == std::io::ErrorKind::StorageFull => return Err(Error::NoSpace), Err(e) => { log::error!("{:?}", e); return Err(Error::CantWrite); } } } log::debug!("File {:?} successfully written", unique_name); Ok(WriteResult { file_name: FileName::new(file_name), unique_name: UniqueName::new(unique_name), local_path: LocalPath::new(path.to_str().unwrap_or_default()), }) }