mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 06:52:55 +00:00
Use serde for RemoteStorageConfig parsing (#8126)
Adds a `Deserialize` impl to `RemoteStorageConfig`. We thus achieve the same as #7743 but with less repetitive code, by deriving `Deserialize` impls on `S3Config`, `AzureConfig`, and `RemoteStorageConfig`. The disadvantage is less useful error messages. The git history of this PR contains a state where we go via an intermediate representation, leveraging the `serde_json` crate, without it ever being actual json though. Also, the PR adds deserialization tests. Alternative to #7743 .
This commit is contained in:
5
Cargo.lock
generated
5
Cargo.lock
generated
@@ -1014,6 +1014,9 @@ name = "camino"
|
||||
version = "1.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "camino-tempfile"
|
||||
@@ -4647,6 +4650,7 @@ dependencies = [
|
||||
"futures-util",
|
||||
"http-types",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"hyper 0.14.26",
|
||||
"itertools",
|
||||
"metrics",
|
||||
@@ -7367,6 +7371,7 @@ dependencies = [
|
||||
"base64 0.21.1",
|
||||
"base64ct",
|
||||
"bytes",
|
||||
"camino",
|
||||
"cc",
|
||||
"chrono",
|
||||
"clap",
|
||||
|
||||
@@ -14,8 +14,9 @@ aws-config.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
aws-credential-types.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
camino = { workspace = true, features = ["serde1"] }
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
hyper = { workspace = true, features = ["stream"] }
|
||||
futures.workspace = true
|
||||
rand.workspace = true
|
||||
|
||||
@@ -36,7 +36,6 @@ use futures::stream::Stream;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use toml_edit::Item;
|
||||
use tracing::info;
|
||||
|
||||
pub use self::{
|
||||
@@ -451,7 +450,7 @@ impl GenericRemoteStorage {
|
||||
pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
|
||||
let timeout = storage_config.timeout;
|
||||
Ok(match &storage_config.storage {
|
||||
RemoteStorageKind::LocalFs(path) => {
|
||||
RemoteStorageKind::LocalFs { local_path: path } => {
|
||||
info!("Using fs root '{path}' as a remote storage");
|
||||
Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
|
||||
}
|
||||
@@ -527,21 +526,28 @@ impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
|
||||
}
|
||||
|
||||
/// External backup storage configuration, enough for creating a client for that storage.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
|
||||
pub struct RemoteStorageConfig {
|
||||
/// The storage connection configuration.
|
||||
#[serde(flatten)]
|
||||
pub storage: RemoteStorageKind,
|
||||
/// A common timeout enforced for all requests after concurrency limiter permit has been
|
||||
/// acquired.
|
||||
#[serde(with = "humantime_serde", default = "default_timeout")]
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
fn default_timeout() -> Duration {
|
||||
RemoteStorageConfig::DEFAULT_TIMEOUT
|
||||
}
|
||||
|
||||
/// A kind of a remote storage to connect to, with its connection configuration.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum RemoteStorageKind {
|
||||
/// Storage based on local file system.
|
||||
/// Specify a root folder to place all stored files into.
|
||||
LocalFs(Utf8PathBuf),
|
||||
LocalFs { local_path: Utf8PathBuf },
|
||||
/// AWS S3 based storage, storing all files in the S3 bucket
|
||||
/// specified by the config
|
||||
AwsS3(S3Config),
|
||||
@@ -551,7 +557,7 @@ pub enum RemoteStorageKind {
|
||||
}
|
||||
|
||||
/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
#[derive(Clone, PartialEq, Eq, serde::Deserialize)]
|
||||
pub struct S3Config {
|
||||
/// Name of the bucket to connect to.
|
||||
pub bucket_name: String,
|
||||
@@ -568,11 +574,24 @@ pub struct S3Config {
|
||||
pub endpoint: Option<String>,
|
||||
/// AWS S3 has various limits on its API calls, we need not to exceed those.
|
||||
/// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
|
||||
#[serde(default = "default_remote_storage_s3_concurrency_limit")]
|
||||
pub concurrency_limit: NonZeroUsize,
|
||||
#[serde(default = "default_max_keys_per_list_response")]
|
||||
pub max_keys_per_list_response: Option<i32>,
|
||||
#[serde(deserialize_with = "deserialize_storage_class", default)]
|
||||
pub upload_storage_class: Option<StorageClass>,
|
||||
}
|
||||
|
||||
fn default_remote_storage_s3_concurrency_limit() -> NonZeroUsize {
|
||||
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
|
||||
.try_into()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn default_max_keys_per_list_response() -> Option<i32> {
|
||||
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
|
||||
}
|
||||
|
||||
impl Debug for S3Config {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("S3Config")
|
||||
@@ -589,7 +608,7 @@ impl Debug for S3Config {
|
||||
}
|
||||
|
||||
/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct AzureConfig {
|
||||
/// Name of the container to connect to.
|
||||
pub container_name: String,
|
||||
@@ -601,10 +620,16 @@ pub struct AzureConfig {
|
||||
pub prefix_in_container: Option<String>,
|
||||
/// Azure has various limits on its API calls, we need not to exceed those.
|
||||
/// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
|
||||
#[serde(default = "default_remote_storage_azure_concurrency_limit")]
|
||||
pub concurrency_limit: NonZeroUsize,
|
||||
#[serde(default = "default_max_keys_per_list_response")]
|
||||
pub max_keys_per_list_response: Option<i32>,
|
||||
}
|
||||
|
||||
fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
|
||||
NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
|
||||
}
|
||||
|
||||
impl Debug for AzureConfig {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("AzureConfig")
|
||||
@@ -621,167 +646,47 @@ impl Debug for AzureConfig {
|
||||
}
|
||||
}
|
||||
|
||||
fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>(
|
||||
deserializer: D,
|
||||
) -> Result<Option<StorageClass>, D::Error> {
|
||||
Option::<String>::deserialize(deserializer).and_then(|s| {
|
||||
if let Some(s) = s {
|
||||
use serde::de::Error;
|
||||
let storage_class = StorageClass::from_str(&s).expect("infallible");
|
||||
#[allow(deprecated)]
|
||||
if matches!(storage_class, StorageClass::Unknown(_)) {
|
||||
return Err(D::Error::custom(format!(
|
||||
"Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}",
|
||||
StorageClass::values()
|
||||
)));
|
||||
}
|
||||
Ok(Some(storage_class))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
impl RemoteStorageConfig {
|
||||
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
|
||||
|
||||
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
|
||||
let local_path = toml.get("local_path");
|
||||
let bucket_name = toml.get("bucket_name");
|
||||
let bucket_region = toml.get("bucket_region");
|
||||
let container_name = toml.get("container_name");
|
||||
let container_region = toml.get("container_region");
|
||||
|
||||
let use_azure = container_name.is_some() && container_region.is_some();
|
||||
|
||||
let default_concurrency_limit = if use_azure {
|
||||
DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
|
||||
} else {
|
||||
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
|
||||
let document: toml_edit::Document = match toml {
|
||||
toml_edit::Item::Table(toml) => toml.clone().into(),
|
||||
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
|
||||
toml.clone().into_table().into()
|
||||
}
|
||||
_ => bail!("toml not a table or inline table"),
|
||||
};
|
||||
let concurrency_limit = NonZeroUsize::new(
|
||||
parse_optional_integer("concurrency_limit", toml)?.unwrap_or(default_concurrency_limit),
|
||||
)
|
||||
.context("Failed to parse 'concurrency_limit' as a positive integer")?;
|
||||
|
||||
let max_keys_per_list_response =
|
||||
parse_optional_integer::<i32, _>("max_keys_per_list_response", toml)
|
||||
.context("Failed to parse 'max_keys_per_list_response' as a positive integer")?
|
||||
.or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);
|
||||
|
||||
let endpoint = toml
|
||||
.get("endpoint")
|
||||
.map(|endpoint| parse_toml_string("endpoint", endpoint))
|
||||
.transpose()?;
|
||||
|
||||
let timeout = toml
|
||||
.get("timeout")
|
||||
.map(|timeout| {
|
||||
timeout
|
||||
.as_str()
|
||||
.ok_or_else(|| anyhow::Error::msg("timeout was not a string"))
|
||||
})
|
||||
.transpose()
|
||||
.and_then(|timeout| {
|
||||
timeout
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.map_err(anyhow::Error::new)
|
||||
})
|
||||
.context("parse timeout")?
|
||||
.unwrap_or(Self::DEFAULT_TIMEOUT);
|
||||
|
||||
if timeout < Duration::from_secs(1) {
|
||||
bail!("timeout was specified as {timeout:?} which is too low");
|
||||
if document.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let storage = match (
|
||||
local_path,
|
||||
bucket_name,
|
||||
bucket_region,
|
||||
container_name,
|
||||
container_region,
|
||||
) {
|
||||
// no 'local_path' nor 'bucket_name' options are provided, consider this remote storage disabled
|
||||
(None, None, None, None, None) => return Ok(None),
|
||||
(_, Some(_), None, ..) => {
|
||||
bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
|
||||
}
|
||||
(_, None, Some(_), ..) => {
|
||||
bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
|
||||
}
|
||||
(None, Some(bucket_name), Some(bucket_region), ..) => {
|
||||
RemoteStorageKind::AwsS3(S3Config {
|
||||
bucket_name: parse_toml_string("bucket_name", bucket_name)?,
|
||||
bucket_region: parse_toml_string("bucket_region", bucket_region)?,
|
||||
prefix_in_bucket: toml
|
||||
.get("prefix_in_bucket")
|
||||
.map(|prefix_in_bucket| {
|
||||
parse_toml_string("prefix_in_bucket", prefix_in_bucket)
|
||||
})
|
||||
.transpose()?,
|
||||
endpoint,
|
||||
concurrency_limit,
|
||||
max_keys_per_list_response,
|
||||
upload_storage_class: toml
|
||||
.get("upload_storage_class")
|
||||
.map(|prefix_in_bucket| -> anyhow::Result<_> {
|
||||
let s = parse_toml_string("upload_storage_class", prefix_in_bucket)?;
|
||||
let storage_class = StorageClass::from_str(&s).expect("infallible");
|
||||
#[allow(deprecated)]
|
||||
if matches!(storage_class, StorageClass::Unknown(_)) {
|
||||
bail!("Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}", StorageClass::values());
|
||||
}
|
||||
Ok(storage_class)
|
||||
})
|
||||
.transpose()?,
|
||||
})
|
||||
}
|
||||
(_, _, _, Some(_), None) => {
|
||||
bail!("'container_name' option is mandatory if 'container_region' is given ")
|
||||
}
|
||||
(_, _, _, None, Some(_)) => {
|
||||
bail!("'container_name' option is mandatory if 'container_region' is given ")
|
||||
}
|
||||
(None, None, None, Some(container_name), Some(container_region)) => {
|
||||
RemoteStorageKind::AzureContainer(AzureConfig {
|
||||
container_name: parse_toml_string("container_name", container_name)?,
|
||||
storage_account: toml
|
||||
.get("storage_account")
|
||||
.map(|storage_account| {
|
||||
parse_toml_string("storage_account", storage_account)
|
||||
})
|
||||
.transpose()?,
|
||||
container_region: parse_toml_string("container_region", container_region)?,
|
||||
prefix_in_container: toml
|
||||
.get("prefix_in_container")
|
||||
.map(|prefix_in_container| {
|
||||
parse_toml_string("prefix_in_container", prefix_in_container)
|
||||
})
|
||||
.transpose()?,
|
||||
concurrency_limit,
|
||||
max_keys_per_list_response,
|
||||
})
|
||||
}
|
||||
(Some(local_path), None, None, None, None) => RemoteStorageKind::LocalFs(
|
||||
Utf8PathBuf::from(parse_toml_string("local_path", local_path)?),
|
||||
),
|
||||
(Some(_), Some(_), ..) => {
|
||||
bail!("'local_path' and 'bucket_name' are mutually exclusive")
|
||||
}
|
||||
(Some(_), _, _, Some(_), Some(_)) => {
|
||||
bail!("local_path and 'container_name' are mutually exclusive")
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Some(RemoteStorageConfig { storage, timeout }))
|
||||
Ok(Some(toml_edit::de::from_document(document)?))
|
||||
}
|
||||
}
|
||||
|
||||
// Helper functions to parse a toml Item
|
||||
fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
|
||||
where
|
||||
I: TryFrom<i64, Error = E>,
|
||||
E: std::error::Error + Send + Sync + 'static,
|
||||
{
|
||||
let toml_integer = match item.get(name) {
|
||||
Some(item) => item
|
||||
.as_integer()
|
||||
.with_context(|| format!("configure option {name} is not an integer"))?,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
I::try_from(toml_integer)
|
||||
.map(Some)
|
||||
.with_context(|| format!("configure option {name} is too large"))
|
||||
}
|
||||
|
||||
fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
|
||||
let s = item
|
||||
.as_str()
|
||||
.with_context(|| format!("configure option {name} is not a string"))?;
|
||||
Ok(s.to_string())
|
||||
}
|
||||
|
||||
struct ConcurrencyLimiter {
|
||||
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
|
||||
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
|
||||
@@ -828,6 +733,11 @@ impl ConcurrencyLimiter {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn parse(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
|
||||
let toml = input.parse::<toml_edit::Document>().unwrap();
|
||||
RemoteStorageConfig::from_toml(toml.as_item())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_object_name() {
|
||||
let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
|
||||
@@ -855,18 +765,71 @@ mod tests {
|
||||
let input = "local_path = '.'
|
||||
timeout = '5s'";
|
||||
|
||||
let toml = input.parse::<toml_edit::Document>().unwrap();
|
||||
|
||||
let config = RemoteStorageConfig::from_toml(toml.as_item())
|
||||
.unwrap()
|
||||
.expect("it exists");
|
||||
let config = parse(input).unwrap().expect("it exists");
|
||||
|
||||
assert_eq!(
|
||||
config,
|
||||
RemoteStorageConfig {
|
||||
storage: RemoteStorageKind::LocalFs(Utf8PathBuf::from(".")),
|
||||
storage: RemoteStorageKind::LocalFs {
|
||||
local_path: Utf8PathBuf::from(".")
|
||||
},
|
||||
timeout: Duration::from_secs(5)
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_s3_parsing() {
|
||||
let toml = "\
|
||||
bucket_name = 'foo-bar'
|
||||
bucket_region = 'eu-central-1'
|
||||
upload_storage_class = 'INTELLIGENT_TIERING'
|
||||
timeout = '7s'
|
||||
";
|
||||
|
||||
let config = parse(toml).unwrap().expect("it exists");
|
||||
|
||||
assert_eq!(
|
||||
config,
|
||||
RemoteStorageConfig {
|
||||
storage: RemoteStorageKind::AwsS3(S3Config {
|
||||
bucket_name: "foo-bar".into(),
|
||||
bucket_region: "eu-central-1".into(),
|
||||
prefix_in_bucket: None,
|
||||
endpoint: None,
|
||||
concurrency_limit: default_remote_storage_s3_concurrency_limit(),
|
||||
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
|
||||
upload_storage_class: Some(StorageClass::IntelligentTiering),
|
||||
}),
|
||||
timeout: Duration::from_secs(7)
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_azure_parsing() {
|
||||
let toml = "\
|
||||
container_name = 'foo-bar'
|
||||
container_region = 'westeurope'
|
||||
upload_storage_class = 'INTELLIGENT_TIERING'
|
||||
timeout = '7s'
|
||||
";
|
||||
|
||||
let config = parse(toml).unwrap().expect("it exists");
|
||||
|
||||
assert_eq!(
|
||||
config,
|
||||
RemoteStorageConfig {
|
||||
storage: RemoteStorageKind::AzureContainer(AzureConfig {
|
||||
container_name: "foo-bar".into(),
|
||||
storage_account: None,
|
||||
container_region: "westeurope".into(),
|
||||
prefix_in_container: None,
|
||||
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
|
||||
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
|
||||
}),
|
||||
timeout: Duration::from_secs(7)
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1463,7 +1463,7 @@ broker_endpoint = '{broker_endpoint}'
|
||||
assert_eq!(
|
||||
parsed_remote_storage_config,
|
||||
RemoteStorageConfig {
|
||||
storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
|
||||
storage: RemoteStorageKind::LocalFs { local_path: local_storage_path.clone() },
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
},
|
||||
"Remote storage config should correctly parse the local FS config and fill other storage defaults"
|
||||
|
||||
@@ -850,7 +850,9 @@ mod test {
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
|
||||
let storage_config = RemoteStorageConfig {
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
storage: RemoteStorageKind::LocalFs {
|
||||
local_path: remote_fs_dir.clone(),
|
||||
},
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
};
|
||||
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
|
||||
|
||||
@@ -3906,7 +3906,9 @@ pub(crate) mod harness {
|
||||
let remote_fs_dir = conf.workdir.join("localfs");
|
||||
std::fs::create_dir_all(&remote_fs_dir).unwrap();
|
||||
let config = RemoteStorageConfig {
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
storage: RemoteStorageKind::LocalFs {
|
||||
local_path: remote_fs_dir.clone(),
|
||||
},
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
};
|
||||
let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
|
||||
|
||||
@@ -543,7 +543,9 @@ mod tests {
|
||||
rx: impl Stream<Item = RequestData>,
|
||||
) -> Vec<(u64, usize, i64)> {
|
||||
let remote_storage_config = RemoteStorageConfig {
|
||||
storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()),
|
||||
storage: RemoteStorageKind::LocalFs {
|
||||
local_path: tmpdir.to_path_buf(),
|
||||
},
|
||||
timeout: std::time::Duration::from_secs(120),
|
||||
};
|
||||
let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();
|
||||
|
||||
@@ -25,6 +25,7 @@ axum = { version = "0.6", features = ["ws"] }
|
||||
base64 = { version = "0.21", features = ["alloc"] }
|
||||
base64ct = { version = "1", default-features = false, features = ["std"] }
|
||||
bytes = { version = "1", features = ["serde"] }
|
||||
camino = { version = "1", default-features = false, features = ["serde1"] }
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
|
||||
clap = { version = "4", features = ["derive", "string"] }
|
||||
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
|
||||
|
||||
Reference in New Issue
Block a user