#![feature(io_error_more)] use std::ffi::OsStr; use std::io::Write; use config::SharedAppConfig; use model::FileName; #[macro_export] macro_rules! fs_async_handler { ($msg: ty, $async: ident, $res: ty) => { impl actix::Handler<$msg> for FsManager { type Result = actix::ResponseActFuture>; fn handle(&mut self, msg: $msg, _ctx: &mut Self::Context) -> Self::Result { use actix::WrapFuture; let config = self.config.clone(); Box::pin(async { $async(msg, config).await }.into_actor(self)) } } }; } #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Can't access file system. Please check privileges")] StorageUnavailable, #[error("Can't write to file. Please check privileges. {0:?}")] CantWrite(std::io::Error), #[error("Can't write to file. There's no more space on disk")] NoSpace, #[error("Can't remove file. Please check privileges. {0:?}")] CantRemove(std::io::Error), #[error("Can't write to file. Invalid path, no filename")] InvalidPath, } pub type Result = std::result::Result; pub struct FsManager { config: SharedAppConfig, } impl actix::Actor for FsManager { type Context = actix::Context; } impl FsManager { pub async fn build(config: SharedAppConfig) -> Result { Self::validate_fs(config.clone()).await?; Ok(Self { config }) } async fn validate_fs(config: SharedAppConfig) -> Result<()> { let l = config.lock(); let output_path = l.files().local_path(); let test_file = std::path::PathBuf::new() .join(output_path) .join(format!("{}", uuid::Uuid::new_v4())); tokio::fs::remove_file(test_file.clone()).await.ok(); tokio::fs::write(test_file.clone(), b"1") .await .map_err(|_| Error::StorageUnavailable)?; tokio::fs::remove_file(test_file.clone()) .await .map_err(|_| Error::StorageUnavailable) } } #[derive(actix::Message)] #[rtype(result = "Result<()>")] pub struct RemoveFile { pub file_name: String, } fs_async_handler!(RemoveFile, remove_file, ()); pub(crate) async fn remove_file(msg: RemoveFile, config: SharedAppConfig) -> Result<()> { let output_path = { let l = config.lock(); l.files().local_path() }; match tokio::fs::remove_file( std::path::PathBuf::new() .join(output_path) .join(msg.file_name), ) .await { Ok(_) => Ok(()), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), Err(e) => Err(Error::CantRemove(e)), } } #[derive(actix::Message)] #[rtype(result = "Result")] pub struct WriteFile { pub file_name: String, pub stream: tokio::sync::mpsc::UnboundedReceiver, } fs_async_handler!(WriteFile, write_file, FileName); pub(crate) async fn write_file(msg: WriteFile, config: SharedAppConfig) -> Result { let WriteFile { file_name, mut stream, } = msg; let p = std::path::Path::new(&file_name); let ext = p .file_name() .and_then(OsStr::to_str) .map(std::path::Path::new) .and_then(std::path::Path::extension) .and_then(OsStr::to_str) .map(String::from); let file_name = format!( "{}{}", uuid::Uuid::new_v4(), ext.map(|s| format!(".{s}")).unwrap_or_default() ); let output_path = { let l = config.lock(); l.files().local_path() }; let path = std::path::PathBuf::new().join(output_path).join(&file_name); let mut file = match std::fs::File::create(path) { Ok(f) => f, Err(e) => return Err(Error::CantWrite(e)), }; while let Some(b) = stream.recv().await { match file.write(&[b]) { Ok(_) => {} Err(e) if e.kind() == std::io::ErrorKind::StorageFull => return Err(Error::NoSpace), Err(e) => return Err(Error::CantWrite(e)), } } Ok(FileName::from(file_name)) }