Move remote_storage config-related code into a dedicated module (#8132)

Moves `RemoteStorageConfig` and related structs and functions into a
dedicated module. Also implements `Serialize` for the config structs
(requested in #8126).

Follow-up to #8126.
Arpad Müller
2024-06-24 12:29:54 +02:00
committed by GitHub
parent 78d9059fc7
commit 5446e08891
4 changed files with 285 additions and 252 deletions
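
For orientation before the diff (not part of the commit): with the new `Serialize` derives, a config parsed from TOML can be dumped straight back out with any serde serializer. A minimal sketch, assuming the caller has `anyhow`, `toml_edit`, and `serde_json` available:

```rust
use remote_storage::RemoteStorageConfig;

fn main() -> anyhow::Result<()> {
    // Parse an inline TOML table, the same way the crate's unit tests do.
    let toml = "local_path = '.'\ntimeout = '5s'".parse::<toml_edit::Document>()?;
    let config = RemoteStorageConfig::from_toml(toml.as_item())?.expect("non-empty config");

    // New in this commit: the config structs derive `Serialize`, so any serde
    // serializer works. JSON is used here purely for illustration.
    println!("{}", serde_json::to_string_pretty(&config)?);
    Ok(())
}
```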


@@ -34,7 +34,7 @@ use utils::backoff;
use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
use crate::{
error::Cancelled, AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing,
config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError, Listing,
ListingMode, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
};


@@ -0,0 +1,277 @@
use std::{fmt::Debug, num::NonZeroUsize, str::FromStr, time::Duration};
use anyhow::bail;
use aws_sdk_s3::types::StorageClass;
use camino::Utf8PathBuf;
use serde::{Deserialize, Serialize};
use crate::{
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct RemoteStorageConfig {
/// The storage connection configuration.
#[serde(flatten)]
pub storage: RemoteStorageKind,
/// A common timeout enforced for all requests after concurrency limiter permit has been
/// acquired.
#[serde(
with = "humantime_serde",
default = "default_timeout",
skip_serializing_if = "is_default_timeout"
)]
pub timeout: Duration,
}
fn default_timeout() -> Duration {
RemoteStorageConfig::DEFAULT_TIMEOUT
}
fn is_default_timeout(d: &Duration) -> bool {
*d == RemoteStorageConfig::DEFAULT_TIMEOUT
}
/// A kind of remote storage to connect to, with its connection configuration.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(untagged)]
pub enum RemoteStorageKind {
/// Storage based on local file system.
/// Specify a root folder to place all stored files into.
LocalFs { local_path: Utf8PathBuf },
/// AWS S3 based storage, storing all files in the S3 bucket
/// specified by the config
AwsS3(S3Config),
/// Azure Blob based storage, storing all files in the container
/// specified by the config
AzureContainer(AzureConfig),
}
/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct S3Config {
/// Name of the bucket to connect to.
pub bucket_name: String,
/// The region where the bucket is located.
pub bucket_region: String,
/// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
pub prefix_in_bucket: Option<String>,
/// A base URL to send S3 requests to.
/// By default, the endpoint is derived from a region name, assuming it's
/// an AWS S3 region name, erroring on wrong region name.
/// Endpoint provides a way to support other S3 flavors and their regions.
///
/// Example: `http://127.0.0.1:5000`
pub endpoint: Option<String>,
/// AWS S3 has various limits on its API calls; we must not exceed them.
/// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
#[serde(default = "default_remote_storage_s3_concurrency_limit")]
pub concurrency_limit: NonZeroUsize,
#[serde(default = "default_max_keys_per_list_response")]
pub max_keys_per_list_response: Option<i32>,
#[serde(
deserialize_with = "deserialize_storage_class",
serialize_with = "serialize_storage_class",
default
)]
pub upload_storage_class: Option<StorageClass>,
}
fn default_remote_storage_s3_concurrency_limit() -> NonZeroUsize {
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
.try_into()
.unwrap()
}
fn default_max_keys_per_list_response() -> Option<i32> {
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
}
impl Debug for S3Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("S3Config")
.field("bucket_name", &self.bucket_name)
.field("bucket_region", &self.bucket_region)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AzureConfig {
/// Name of the container to connect to.
pub container_name: String,
/// Name of the storage account the container is inside of
pub storage_account: Option<String>,
/// The region where the container is located.
pub container_region: String,
/// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
pub prefix_in_container: Option<String>,
/// Azure has various limits on its API calls; we must not exceed them.
/// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
#[serde(default = "default_remote_storage_azure_concurrency_limit")]
pub concurrency_limit: NonZeroUsize,
#[serde(default = "default_max_keys_per_list_response")]
pub max_keys_per_list_response: Option<i32>,
}
fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
}
impl Debug for AzureConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AzureConfig")
.field("bucket_name", &self.container_name)
.field("storage_account", &self.storage_account)
.field("bucket_region", &self.container_region)
.field("prefix_in_container", &self.prefix_in_container)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>(
deserializer: D,
) -> Result<Option<StorageClass>, D::Error> {
Option::<String>::deserialize(deserializer).and_then(|s| {
if let Some(s) = s {
use serde::de::Error;
let storage_class = StorageClass::from_str(&s).expect("infallible");
#[allow(deprecated)]
if matches!(storage_class, StorageClass::Unknown(_)) {
return Err(D::Error::custom(format!(
"Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}",
StorageClass::values()
)));
}
Ok(Some(storage_class))
} else {
Ok(None)
}
})
}
fn serialize_storage_class<S: serde::Serializer>(
val: &Option<StorageClass>,
serializer: S,
) -> Result<S::Ok, S::Error> {
let val = val.as_ref().map(StorageClass::as_str);
Option::<&str>::serialize(&val, serializer)
}
impl RemoteStorageConfig {
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
let document: toml_edit::Document = match toml {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()
}
_ => bail!("toml not a table or inline table"),
};
if document.is_empty() {
return Ok(None);
}
Ok(Some(toml_edit::de::from_document(document)?))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn parse(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
let toml = input.parse::<toml_edit::Document>().unwrap();
RemoteStorageConfig::from_toml(toml.as_item())
}
#[test]
fn parse_localfs_config_with_timeout() {
let input = "local_path = '.'
timeout = '5s'";
let config = parse(input).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: Utf8PathBuf::from(".")
},
timeout: Duration::from_secs(5)
}
);
}
#[test]
fn test_s3_parsing() {
let toml = "\
bucket_name = 'foo-bar'
bucket_region = 'eu-central-1'
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
";
let config = parse(toml).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::AwsS3(S3Config {
bucket_name: "foo-bar".into(),
bucket_region: "eu-central-1".into(),
prefix_in_bucket: None,
endpoint: None,
concurrency_limit: default_remote_storage_s3_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
upload_storage_class: Some(StorageClass::IntelligentTiering),
}),
timeout: Duration::from_secs(7)
}
);
}
#[test]
fn test_azure_parsing() {
let toml = "\
container_name = 'foo-bar'
container_region = 'westeurope'
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
";
let config = parse(toml).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::AzureContainer(AzureConfig {
container_name: "foo-bar".into(),
storage_account: None,
container_region: "westeurope".into(),
prefix_in_container: None,
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
}),
timeout: Duration::from_secs(7)
}
);
}
}
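
Not part of the diff: a sketch of a further round-trip test that could sit in the module above, assuming `serde_json` were added as a dev-dependency purely for illustration. It shows the effect of the `skip_serializing_if`/`default` pair on `timeout`: the default 120s timeout is omitted on output and restored on input.

```rust
#[test]
fn default_timeout_roundtrip() {
    let config = RemoteStorageConfig {
        storage: RemoteStorageKind::LocalFs {
            local_path: Utf8PathBuf::from("."),
        },
        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
    };

    // `skip_serializing_if = "is_default_timeout"` drops the key entirely...
    let json = serde_json::to_string(&config).unwrap();
    assert!(!json.contains("timeout"));

    // ...and `default = "default_timeout"` puts the 120s default back on read.
    let roundtripped: RemoteStorageConfig = serde_json::from_str(&json).unwrap();
    assert_eq!(roundtripped, config);
}
```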


@@ -10,6 +10,7 @@
#![deny(clippy::undocumented_unsafe_blocks)]
mod azure_blob;
mod config;
mod error;
mod local_fs;
mod metrics;
@@ -18,17 +19,10 @@ mod simulate_failures;
mod support;
use std::{
collections::HashMap,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
pin::Pin,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
collections::HashMap, fmt::Debug, num::NonZeroU32, pin::Pin, sync::Arc, time::SystemTime,
};
use anyhow::{bail, Context};
use aws_sdk_s3::types::StorageClass;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use bytes::Bytes;
@@ -44,6 +38,8 @@ pub use self::{
};
use s3_bucket::RequestKind;
pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
/// Azure SDK's ETag type is a simple String wrapper: we use this internally instead of repeating it here.
pub use azure_core::Etag;
@@ -525,168 +521,6 @@ impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
}
}
/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
pub struct RemoteStorageConfig {
/// The storage connection configuration.
#[serde(flatten)]
pub storage: RemoteStorageKind,
/// A common timeout enforced for all requests after concurrency limiter permit has been
/// acquired.
#[serde(with = "humantime_serde", default = "default_timeout")]
pub timeout: Duration,
}
fn default_timeout() -> Duration {
RemoteStorageConfig::DEFAULT_TIMEOUT
}
/// A kind of remote storage to connect to, with its connection configuration.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(untagged)]
pub enum RemoteStorageKind {
/// Storage based on local file system.
/// Specify a root folder to place all stored files into.
LocalFs { local_path: Utf8PathBuf },
/// AWS S3 based storage, storing all files in the S3 bucket
/// specified by the config
AwsS3(S3Config),
/// Azure Blob based storage, storing all files in the container
/// specified by the config
AzureContainer(AzureConfig),
}
/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, serde::Deserialize)]
pub struct S3Config {
/// Name of the bucket to connect to.
pub bucket_name: String,
/// The region where the bucket is located.
pub bucket_region: String,
/// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
pub prefix_in_bucket: Option<String>,
/// A base URL to send S3 requests to.
/// By default, the endpoint is derived from a region name, assuming it's
/// an AWS S3 region name, erroring on wrong region name.
/// Endpoint provides a way to support other S3 flavors and their regions.
///
/// Example: `http://127.0.0.1:5000`
pub endpoint: Option<String>,
/// AWS S3 has various limits on its API calls; we must not exceed them.
/// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
#[serde(default = "default_remote_storage_s3_concurrency_limit")]
pub concurrency_limit: NonZeroUsize,
#[serde(default = "default_max_keys_per_list_response")]
pub max_keys_per_list_response: Option<i32>,
#[serde(deserialize_with = "deserialize_storage_class", default)]
pub upload_storage_class: Option<StorageClass>,
}
fn default_remote_storage_s3_concurrency_limit() -> NonZeroUsize {
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
.try_into()
.unwrap()
}
fn default_max_keys_per_list_response() -> Option<i32> {
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
}
impl Debug for S3Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("S3Config")
.field("bucket_name", &self.bucket_name)
.field("bucket_region", &self.bucket_region)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AzureConfig {
/// Name of the container to connect to.
pub container_name: String,
/// Name of the storage account the container is inside of
pub storage_account: Option<String>,
/// The region where the container is located.
pub container_region: String,
/// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
pub prefix_in_container: Option<String>,
/// Azure has various limits on its API calls; we must not exceed them.
/// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
#[serde(default = "default_remote_storage_azure_concurrency_limit")]
pub concurrency_limit: NonZeroUsize,
#[serde(default = "default_max_keys_per_list_response")]
pub max_keys_per_list_response: Option<i32>,
}
fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
}
impl Debug for AzureConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AzureConfig")
.field("bucket_name", &self.container_name)
.field("storage_account", &self.storage_account)
.field("bucket_region", &self.container_region)
.field("prefix_in_container", &self.prefix_in_container)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>(
deserializer: D,
) -> Result<Option<StorageClass>, D::Error> {
Option::<String>::deserialize(deserializer).and_then(|s| {
if let Some(s) = s {
use serde::de::Error;
let storage_class = StorageClass::from_str(&s).expect("infallible");
#[allow(deprecated)]
if matches!(storage_class, StorageClass::Unknown(_)) {
return Err(D::Error::custom(format!(
"Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}",
StorageClass::values()
)));
}
Ok(Some(storage_class))
} else {
Ok(None)
}
})
}
impl RemoteStorageConfig {
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
let document: toml_edit::Document = match toml {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()
}
_ => bail!("toml not a table or inline table"),
};
if document.is_empty() {
return Ok(None);
}
Ok(Some(toml_edit::de::from_document(document)?))
}
}
struct ConcurrencyLimiter {
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
@@ -733,11 +567,6 @@ impl ConcurrencyLimiter {
mod tests {
use super::*;
fn parse(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
let toml = input.parse::<toml_edit::Document>().unwrap();
RemoteStorageConfig::from_toml(toml.as_item())
}
#[test]
fn test_object_name() {
let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
@@ -759,77 +588,4 @@ mod tests {
let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
assert_eq!(err.to_string(), "Path \"/\" is not relative");
}
#[test]
fn parse_localfs_config_with_timeout() {
let input = "local_path = '.'
timeout = '5s'";
let config = parse(input).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: Utf8PathBuf::from(".")
},
timeout: Duration::from_secs(5)
}
);
}
#[test]
fn test_s3_parsing() {
let toml = "\
bucket_name = 'foo-bar'
bucket_region = 'eu-central-1'
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
";
let config = parse(toml).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::AwsS3(S3Config {
bucket_name: "foo-bar".into(),
bucket_region: "eu-central-1".into(),
prefix_in_bucket: None,
endpoint: None,
concurrency_limit: default_remote_storage_s3_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
upload_storage_class: Some(StorageClass::IntelligentTiering),
}),
timeout: Duration::from_secs(7)
}
);
}
#[test]
fn test_azure_parsing() {
let toml = "\
container_name = 'foo-bar'
container_region = 'westeurope'
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
";
let config = parse(toml).unwrap().expect("it exists");
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::AzureContainer(AzureConfig {
container_name: "foo-bar".into(),
storage_account: None,
container_region: "westeurope".into(),
prefix_in_container: None,
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
}),
timeout: Duration::from_secs(7)
}
);
}
}
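
One consequence worth noting (not part of the diff): because lib.rs now re-exports the moved types via `pub use crate::config::{...}`, the crate's public API is unchanged and only code inside `remote_storage` switches to the `crate::config::` paths. A minimal sketch of an external caller; the `describe` helper is hypothetical:

```rust
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};

// Imports from the crate root keep working exactly as before this commit.
fn describe(config: &RemoteStorageConfig) -> &'static str {
    match &config.storage {
        RemoteStorageKind::LocalFs { .. } => "local filesystem",
        RemoteStorageKind::AwsS3(_) => "AWS S3 bucket",
        RemoteStorageKind::AzureContainer(_) => "Azure Blob container",
    }
}
```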


@@ -46,12 +46,12 @@ use utils::backoff;
use super::StorageMetadata;
use crate::{
config::S3Config,
error::Cancelled,
metrics::{start_counting_cancelled_wait, start_measuring_requests},
support::PermitCarrying,
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
S3Config, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
REMOTE_STORAGE_PREFIX_SEPARATOR,
TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
use crate::metrics::AttemptOutcome;