diff --git a/Cargo.lock b/Cargo.lock index ca263c1..958f70d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1052,7 +1052,7 @@ dependencies = [ "thiserror", "tokio 1.28.2", "tracing", - "uuid 1.3.3", + "uuid 1.3.4", "whatlang", ] @@ -1855,7 +1855,7 @@ dependencies = [ "thiserror", "tokio 1.28.2", "toml 0.7.4", - "uuid 1.3.3", + "uuid 1.3.4", ] [[package]] @@ -1901,7 +1901,7 @@ dependencies = [ "http", "rand 0.8.5", "unidecode", - "uuid 1.3.3", + "uuid 1.3.4", ] [[package]] @@ -1964,19 +1964,21 @@ dependencies = [ "serde", "tokio 1.28.2", "tracing", - "uuid 1.3.3", + "uuid 1.3.4", ] [[package]] name = "file-storage-s3" version = "0.1.0" dependencies = [ + "async-trait", "file-storage-adapter", "futures 0.3.28", "rust-s3", "serde", "tokio 1.28.2", "tracing", + "uuid 1.3.4", ] [[package]] @@ -3390,7 +3392,7 @@ dependencies = [ "sqlx-core", "thiserror", "tracing", - "uuid 1.3.3", + "uuid 1.3.4", "validator", ] @@ -3629,7 +3631,7 @@ dependencies = [ "thiserror", "tokio 1.28.2", "tracing", - "uuid 1.3.3", + "uuid 1.3.4", ] [[package]] @@ -3774,7 +3776,7 @@ dependencies = [ "toml 0.7.4", "tracing", "traitcast", - "uuid 1.3.3", + "uuid 1.3.4", ] [[package]] @@ -3834,7 +3836,7 @@ dependencies = [ "thiserror", "tokio 1.28.2", "tracing", - "uuid 1.3.3", + "uuid 1.3.4", "wapc", "wapc-codec", "wapc-pool", @@ -5016,7 +5018,7 @@ dependencies = [ "thiserror", "tokio 1.28.2", "tracing", - "uuid 1.3.3", + "uuid 1.3.4", "whatlang", ] @@ -5417,7 +5419,7 @@ dependencies = [ "time 0.3.20", "tokio-stream", "url", - "uuid 1.3.3", + "uuid 1.3.4", "webpki-roots", "whoami", ] @@ -5488,7 +5490,7 @@ dependencies = [ "thiserror", "tokio 1.28.2", "tracing", - "uuid 1.3.3", + "uuid 1.3.4", ] [[package]] @@ -5828,7 +5830,7 @@ dependencies = [ "thiserror", "tokio 1.28.2", "tracing", - "uuid 1.3.3", + "uuid 1.3.4", ] [[package]] @@ -6279,9 +6281,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.3.3" +version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2" +checksum = "0fa2982af2eec27de306107c027578ff7f423d65f7250e40ce0fea8f45248b81" dependencies = [ "atomic", "getrandom 0.2.9", diff --git a/crates/file-storage-adapter/src/lib.rs b/crates/file-storage-adapter/src/lib.rs index 67f66e9..a45f125 100644 --- a/crates/file-storage-adapter/src/lib.rs +++ b/crates/file-storage-adapter/src/lib.rs @@ -12,8 +12,16 @@ pub enum Error { CreateRootDir(String, std::io::Error), #[error("Failed to create file {0:?}: {1:?}")] OutputFile(PathBuf, std::io::Error), + #[error("Failed to create file on external drive")] + SaveExternal, + #[error("Failed to delete file from external drive")] + DeleteExternal, #[error("Failed to delete file {0}: {1:?}")] DeleteFile(Url, std::io::Error), + #[error("Invalid external storage config")] + ExternalDriveConfig, + #[error("Connection to external storage failed")] + ExternalConnection, } pub type SResult = Result; diff --git a/crates/file-storage-s3-plugin/Cargo.toml b/crates/file-storage-s3-plugin/Cargo.toml index dadce52..bdbd782 100644 --- a/crates/file-storage-s3-plugin/Cargo.toml +++ b/crates/file-storage-s3-plugin/Cargo.toml @@ -10,7 +10,9 @@ path = "src/lib.rs" [dependencies] file-storage-adapter = { workspace = true } futures = { version = "0.3.28" } -rust-s3 = { version = "0.33.0", features = [] } +rust-s3 = { version = "0.33.0", features = ['with-tokio'] } serde = { version = "1.0.163", features = ['derive'] } tokio = { version = "1.28.2" } tracing = { version = "0.1.37" } +async-trait = { version = "0.1.68" } +uuid = { version = "1.3.4", features = ['v4'] } diff --git a/crates/file-storage-s3-plugin/src/lib.rs b/crates/file-storage-s3-plugin/src/lib.rs index 7d12d9a..e51c972 100644 --- a/crates/file-storage-s3-plugin/src/lib.rs +++ b/crates/file-storage-s3-plugin/src/lib.rs @@ -1,14 +1,118 @@ -pub fn add(left: usize, right: usize) -> usize { - left + right +use std::path::{Path, PathBuf}; +use std::str::FromStr; + +use async_trait::async_trait; +use file_storage_adapter::{Error, PluginConfig, SResult, Url}; +use futures::{AsyncRead, TryFutureExt}; +use s3::creds::Credentials; +use s3::{Bucket, Region}; +use serde::{Deserialize, Serialize}; +use tracing::error; + +#[derive(Serialize, Deserialize, Debug, Clone)] +struct S3FileStorageConfig { + /// Bucket subdirectory in which files will be stored + bucket_prefix: Option, + bucket_name: String, + region: String, + access_key: Option, + secret_key: Option, + security_token: Option, + session_token: Option, + profile: Option, } -#[cfg(test)] -mod tests { - use super::*; +pub struct S3FileStorage { + pub bucket: Bucket, + pub bucket_prefix: Option, +} - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); +#[async_trait] +impl file_storage_adapter::FileStorage for S3FileStorage { + fn name() -> &'static str + where + Self: Sized, + { + "s3-file-storage" + } + + async fn new(config: PluginConfig) -> SResult + where + Self: Sized, + { + let config: S3FileStorageConfig = config.config().map_err(|e| { + error!("Invalid S3 config: {e}"); + Error::ConfigFormat + })?; + let bucket = Bucket::new( + &config.bucket_name, + Region::from_str(&config.region).expect("Infallible"), + Credentials::new( + config.access_key.as_deref(), + config.secret_key.as_deref(), + config.security_token.as_deref(), + config.session_token.as_deref(), + config.profile.as_deref(), + ) + .map_err(|e| { + error!("Invalid s3 credentials: {e}"); + Error::ExternalDriveConfig + })?, + ) + .map_err(|e| { + error!("Failed to connect to s3 bucket: {e}"); + Error::ExternalConnection + })? + .with_path_style(); + Ok(Self { + bucket, + bucket_prefix: config.bucket_prefix, + }) + } + + async fn store(&mut self, mut file: Box) -> SResult { + let name = uuid::Uuid::new_v4().hyphenated().to_string(); + let mut path = PathBuf::new(); + if let Some(prefix) = self.bucket_prefix.as_ref() { + path = path.join(prefix); + }; + path = path.join(name); + + let status_code: u16 = self + .bucket + .put_object_stream(file.as_mut(), &path) + .map_err(|e| { + error!("Failed to upload file to s3: {e}"); + Error::Save + }) + .await?; + + if status_code >= 300 { + Err(Error::SaveExternal) + } else { + Ok(Url { + base_url: format!( + "{}{}", + self.bucket.region().scheme(), + self.bucket.region().host() + ), + file_name: path + .as_os_str() + .to_str() + .expect("Invalid s3 path") + .to_string(), + }) + } + } + + async fn erase(&mut self, file_name: Url) -> SResult { + self.bucket + .delete_object(file_name.to_string()) + .await + .map_err(|e| { + error!("Unable to delete file from external drive: {e}"); + Error::DeleteExternal + }) + .map(|r| r.bytes().len()) } }