Add s3 file storage
This commit is contained in:
parent
45c624679f
commit
132fb52358
30
Cargo.lock
generated
30
Cargo.lock
generated
@ -1052,7 +1052,7 @@ dependencies = [
|
|||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio 1.28.2",
|
"tokio 1.28.2",
|
||||||
"tracing",
|
"tracing",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
"whatlang",
|
"whatlang",
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -1855,7 +1855,7 @@ dependencies = [
|
|||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio 1.28.2",
|
"tokio 1.28.2",
|
||||||
"toml 0.7.4",
|
"toml 0.7.4",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -1901,7 +1901,7 @@ dependencies = [
|
|||||||
"http",
|
"http",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"unidecode",
|
"unidecode",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -1964,19 +1964,21 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
"tokio 1.28.2",
|
"tokio 1.28.2",
|
||||||
"tracing",
|
"tracing",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "file-storage-s3"
|
name = "file-storage-s3"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
"file-storage-adapter",
|
"file-storage-adapter",
|
||||||
"futures 0.3.28",
|
"futures 0.3.28",
|
||||||
"rust-s3",
|
"rust-s3",
|
||||||
"serde",
|
"serde",
|
||||||
"tokio 1.28.2",
|
"tokio 1.28.2",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
"uuid 1.3.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -3390,7 +3392,7 @@ dependencies = [
|
|||||||
"sqlx-core",
|
"sqlx-core",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tracing",
|
"tracing",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
"validator",
|
"validator",
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -3629,7 +3631,7 @@ dependencies = [
|
|||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio 1.28.2",
|
"tokio 1.28.2",
|
||||||
"tracing",
|
"tracing",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -3774,7 +3776,7 @@ dependencies = [
|
|||||||
"toml 0.7.4",
|
"toml 0.7.4",
|
||||||
"tracing",
|
"tracing",
|
||||||
"traitcast",
|
"traitcast",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -3834,7 +3836,7 @@ dependencies = [
|
|||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio 1.28.2",
|
"tokio 1.28.2",
|
||||||
"tracing",
|
"tracing",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
"wapc",
|
"wapc",
|
||||||
"wapc-codec",
|
"wapc-codec",
|
||||||
"wapc-pool",
|
"wapc-pool",
|
||||||
@ -5016,7 +5018,7 @@ dependencies = [
|
|||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio 1.28.2",
|
"tokio 1.28.2",
|
||||||
"tracing",
|
"tracing",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
"whatlang",
|
"whatlang",
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -5417,7 +5419,7 @@ dependencies = [
|
|||||||
"time 0.3.20",
|
"time 0.3.20",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"url",
|
"url",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
"webpki-roots",
|
"webpki-roots",
|
||||||
"whoami",
|
"whoami",
|
||||||
]
|
]
|
||||||
@ -5488,7 +5490,7 @@ dependencies = [
|
|||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio 1.28.2",
|
"tokio 1.28.2",
|
||||||
"tracing",
|
"tracing",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -5828,7 +5830,7 @@ dependencies = [
|
|||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio 1.28.2",
|
"tokio 1.28.2",
|
||||||
"tracing",
|
"tracing",
|
||||||
"uuid 1.3.3",
|
"uuid 1.3.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -6279,9 +6281,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "uuid"
|
name = "uuid"
|
||||||
version = "1.3.3"
|
version = "1.3.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2"
|
checksum = "0fa2982af2eec27de306107c027578ff7f423d65f7250e40ce0fea8f45248b81"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"atomic",
|
"atomic",
|
||||||
"getrandom 0.2.9",
|
"getrandom 0.2.9",
|
||||||
|
@ -12,8 +12,16 @@ pub enum Error {
|
|||||||
CreateRootDir(String, std::io::Error),
|
CreateRootDir(String, std::io::Error),
|
||||||
#[error("Failed to create file {0:?}: {1:?}")]
|
#[error("Failed to create file {0:?}: {1:?}")]
|
||||||
OutputFile(PathBuf, std::io::Error),
|
OutputFile(PathBuf, std::io::Error),
|
||||||
|
#[error("Failed to create file on external drive")]
|
||||||
|
SaveExternal,
|
||||||
|
#[error("Failed to delete file from external drive")]
|
||||||
|
DeleteExternal,
|
||||||
#[error("Failed to delete file {0}: {1:?}")]
|
#[error("Failed to delete file {0}: {1:?}")]
|
||||||
DeleteFile(Url, std::io::Error),
|
DeleteFile(Url, std::io::Error),
|
||||||
|
#[error("Invalid external storage config")]
|
||||||
|
ExternalDriveConfig,
|
||||||
|
#[error("Connection to external storage failed")]
|
||||||
|
ExternalConnection,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type SResult<T> = Result<T, Error>;
|
pub type SResult<T> = Result<T, Error>;
|
||||||
|
@ -10,7 +10,9 @@ path = "src/lib.rs"
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
file-storage-adapter = { workspace = true }
|
file-storage-adapter = { workspace = true }
|
||||||
futures = { version = "0.3.28" }
|
futures = { version = "0.3.28" }
|
||||||
rust-s3 = { version = "0.33.0", features = [] }
|
rust-s3 = { version = "0.33.0", features = ['with-tokio'] }
|
||||||
serde = { version = "1.0.163", features = ['derive'] }
|
serde = { version = "1.0.163", features = ['derive'] }
|
||||||
tokio = { version = "1.28.2" }
|
tokio = { version = "1.28.2" }
|
||||||
tracing = { version = "0.1.37" }
|
tracing = { version = "0.1.37" }
|
||||||
|
async-trait = { version = "0.1.68" }
|
||||||
|
uuid = { version = "1.3.4", features = ['v4'] }
|
||||||
|
@ -1,14 +1,118 @@
|
|||||||
pub fn add(left: usize, right: usize) -> usize {
|
use std::path::{Path, PathBuf};
|
||||||
left + right
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use file_storage_adapter::{Error, PluginConfig, SResult, Url};
|
||||||
|
use futures::{AsyncRead, TryFutureExt};
|
||||||
|
use s3::creds::Credentials;
|
||||||
|
use s3::{Bucket, Region};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tracing::error;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
struct S3FileStorageConfig {
|
||||||
|
/// Bucket subdirectory in which files will be stored
|
||||||
|
bucket_prefix: Option<String>,
|
||||||
|
bucket_name: String,
|
||||||
|
region: String,
|
||||||
|
access_key: Option<String>,
|
||||||
|
secret_key: Option<String>,
|
||||||
|
security_token: Option<String>,
|
||||||
|
session_token: Option<String>,
|
||||||
|
profile: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
pub struct S3FileStorage {
|
||||||
mod tests {
|
pub bucket: Bucket,
|
||||||
use super::*;
|
pub bucket_prefix: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[async_trait]
|
||||||
fn it_works() {
|
impl file_storage_adapter::FileStorage for S3FileStorage {
|
||||||
let result = add(2, 2);
|
fn name() -> &'static str
|
||||||
assert_eq!(result, 4);
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
"s3-file-storage"
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn new(config: PluginConfig) -> SResult<Self>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
let config: S3FileStorageConfig = config.config().map_err(|e| {
|
||||||
|
error!("Invalid S3 config: {e}");
|
||||||
|
Error::ConfigFormat
|
||||||
|
})?;
|
||||||
|
let bucket = Bucket::new(
|
||||||
|
&config.bucket_name,
|
||||||
|
Region::from_str(&config.region).expect("Infallible"),
|
||||||
|
Credentials::new(
|
||||||
|
config.access_key.as_deref(),
|
||||||
|
config.secret_key.as_deref(),
|
||||||
|
config.security_token.as_deref(),
|
||||||
|
config.session_token.as_deref(),
|
||||||
|
config.profile.as_deref(),
|
||||||
|
)
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("Invalid s3 credentials: {e}");
|
||||||
|
Error::ExternalDriveConfig
|
||||||
|
})?,
|
||||||
|
)
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("Failed to connect to s3 bucket: {e}");
|
||||||
|
Error::ExternalConnection
|
||||||
|
})?
|
||||||
|
.with_path_style();
|
||||||
|
Ok(Self {
|
||||||
|
bucket,
|
||||||
|
bucket_prefix: config.bucket_prefix,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn store(&mut self, mut file: Box<dyn AsyncRead + Unpin + Send>) -> SResult<Url> {
|
||||||
|
let name = uuid::Uuid::new_v4().hyphenated().to_string();
|
||||||
|
let mut path = PathBuf::new();
|
||||||
|
if let Some(prefix) = self.bucket_prefix.as_ref() {
|
||||||
|
path = path.join(prefix);
|
||||||
|
};
|
||||||
|
path = path.join(name);
|
||||||
|
|
||||||
|
let status_code: u16 = self
|
||||||
|
.bucket
|
||||||
|
.put_object_stream(file.as_mut(), &path)
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("Failed to upload file to s3: {e}");
|
||||||
|
Error::Save
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if status_code >= 300 {
|
||||||
|
Err(Error::SaveExternal)
|
||||||
|
} else {
|
||||||
|
Ok(Url {
|
||||||
|
base_url: format!(
|
||||||
|
"{}{}",
|
||||||
|
self.bucket.region().scheme(),
|
||||||
|
self.bucket.region().host()
|
||||||
|
),
|
||||||
|
file_name: path
|
||||||
|
.as_os_str()
|
||||||
|
.to_str()
|
||||||
|
.expect("Invalid s3 path")
|
||||||
|
.to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn erase(&mut self, file_name: Url) -> SResult<usize> {
|
||||||
|
self.bucket
|
||||||
|
.delete_object(file_name.to_string())
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("Unable to delete file from external drive: {e}");
|
||||||
|
Error::DeleteExternal
|
||||||
|
})
|
||||||
|
.map(|r| r.bytes().len())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user