From 75747cdbffeb0b6d2a2a311584368de68cd9aadc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Sat, 22 Jun 2024 19:57:09 +0200
Subject: [PATCH] Use serde for RemoteStorageConfig parsing (#8126)

Adds a `Deserialize` impl to `RemoteStorageConfig`. We thus achieve the
same as #7743, but with less repetitive code, by deriving `Deserialize`
impls on `S3Config`, `AzureConfig`, and `RemoteStorageConfig`. The
disadvantage is less useful error messages.

The git history of this PR contains a state where we went through an
intermediate representation, leveraging the `serde_json` crate,
although the configuration never materialized as actual JSON.

The PR also adds deserialization tests.

Alternative to #7743.
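
For illustration, here is a sketch of how the new parsing path is used
(hypothetical usage: the values are made up, but the API calls mirror
the tests added below):

    let input = "\
    bucket_name = 'some-bucket'
    bucket_region = 'eu-central-1'
    timeout = '7s'
    ";
    let toml = input.parse::<toml_edit::Document>().unwrap();
    let config = RemoteStorageConfig::from_toml(toml.as_item())
        .unwrap()
        .expect("config is present");
    assert!(matches!(config.storage, RemoteStorageKind::AwsS3(_)));
    assert_eq!(config.timeout, Duration::from_secs(7));

The `#[serde(untagged)]` attribute on `RemoteStorageKind` is what picks
the variant: serde tries `LocalFs`, `AwsS3`, and `AzureContainer` in
declaration order, and the first variant whose mandatory fields are all
present wins.
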
---
 Cargo.lock                       |   5 +
 libs/remote_storage/Cargo.toml   |   3 +-
 libs/remote_storage/src/lib.rs   | 289 ++++++++++++++-----------------
 pageserver/src/config.rs         |   2 +-
 pageserver/src/deletion_queue.rs |   4 +-
 pageserver/src/tenant.rs         |   4 +-
 proxy/src/context/parquet.rs     |   4 +-
 workspace_hack/Cargo.toml        |   1 +
 8 files changed, 144 insertions(+), 168 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cf8a0b3286..77bf012402 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1014,6 +1014,9 @@ name = "camino"
 version = "1.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c"
+dependencies = [
+ "serde",
+]
 
 [[package]]
 name = "camino-tempfile"
@@ -4647,6 +4650,7 @@ dependencies = [
  "futures-util",
  "http-types",
  "humantime",
+ "humantime-serde",
  "hyper 0.14.26",
  "itertools",
  "metrics",
@@ -7367,6 +7371,7 @@ dependencies = [
  "base64 0.21.1",
  "base64ct",
  "bytes",
+ "camino",
  "cc",
  "chrono",
  "clap",
diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml
index 78da01c9a0..23d82b90bd 100644
--- a/libs/remote_storage/Cargo.toml
+++ b/libs/remote_storage/Cargo.toml
@@ -14,8 +14,9 @@ aws-config.workspace = true
 aws-sdk-s3.workspace = true
 aws-credential-types.workspace = true
 bytes.workspace = true
-camino.workspace = true
+camino = { workspace = true, features = ["serde1"] }
 humantime.workspace = true
+humantime-serde.workspace = true
 hyper = { workspace = true, features = ["stream"] }
 futures.workspace = true
 rand.workspace = true
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 72748e156c..e39ac581c7 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -36,7 +36,6 @@ use futures::stream::Stream;
 use serde::{Deserialize, Serialize};
 use tokio::sync::Semaphore;
 use tokio_util::sync::CancellationToken;
-use toml_edit::Item;
 use tracing::info;
 
 pub use self::{
@@ -451,7 +450,7 @@ impl GenericRemoteStorage {
     pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
         let timeout = storage_config.timeout;
         Ok(match &storage_config.storage {
-            RemoteStorageKind::LocalFs(path) => {
+            RemoteStorageKind::LocalFs { local_path: path } => {
                 info!("Using fs root '{path}' as a remote storage");
                 Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
             }
@@ -527,21 +526,28 @@ impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
 }
 
 /// External backup storage configuration, enough for creating a client for that storage.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
 pub struct RemoteStorageConfig {
     /// The storage connection configuration.
+    #[serde(flatten)]
     pub storage: RemoteStorageKind,
     /// A common timeout enforced for all requests after concurrency limiter permit has been
     /// acquired.
+    #[serde(with = "humantime_serde", default = "default_timeout")]
     pub timeout: Duration,
 }
 
+fn default_timeout() -> Duration {
+    RemoteStorageConfig::DEFAULT_TIMEOUT
+}
+
 /// A kind of a remote storage to connect to, with its connection configuration.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
+#[serde(untagged)]
 pub enum RemoteStorageKind {
     /// Storage based on local file system.
     /// Specify a root folder to place all stored files into.
-    LocalFs(Utf8PathBuf),
+    LocalFs { local_path: Utf8PathBuf },
     /// AWS S3 based storage, storing all files in the S3 bucket
     /// specified by the config
     AwsS3(S3Config),
@@ -551,7 +557,7 @@ pub enum RemoteStorageKind {
 }
 
 /// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
-#[derive(Clone, PartialEq, Eq)]
+#[derive(Clone, PartialEq, Eq, serde::Deserialize)]
 pub struct S3Config {
     /// Name of the bucket to connect to.
     pub bucket_name: String,
@@ -568,11 +574,24 @@ pub struct S3Config {
     pub endpoint: Option<String>,
     /// AWS S3 has various limits on its API calls, we need not to exceed those.
     /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
+    #[serde(default = "default_remote_storage_s3_concurrency_limit")]
     pub concurrency_limit: NonZeroUsize,
+    #[serde(default = "default_max_keys_per_list_response")]
     pub max_keys_per_list_response: Option<i32>,
+    #[serde(deserialize_with = "deserialize_storage_class", default)]
     pub upload_storage_class: Option<StorageClass>,
 }
 
+fn default_remote_storage_s3_concurrency_limit() -> NonZeroUsize {
+    DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
+        .try_into()
+        .unwrap()
+}
+
+fn default_max_keys_per_list_response() -> Option<i32> {
+    DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
+}
+
 impl Debug for S3Config {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("S3Config")
@@ -589,7 +608,7 @@ impl Debug for S3Config {
 }
 
 /// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
-#[derive(Clone, PartialEq, Eq)]
+#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 pub struct AzureConfig {
     /// Name of the container to connect to.
     pub container_name: String,
@@ -601,10 +620,16 @@ pub struct AzureConfig {
     pub prefix_in_container: Option<String>,
     /// Azure has various limits on its API calls, we need not to exceed those.
     /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
+    #[serde(default = "default_remote_storage_azure_concurrency_limit")]
     pub concurrency_limit: NonZeroUsize,
+    #[serde(default = "default_max_keys_per_list_response")]
     pub max_keys_per_list_response: Option<i32>,
 }
 
+fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
+    NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
+}
+
 impl Debug for AzureConfig {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("AzureConfig")
@@ -621,167 +646,47 @@
     }
 }
 
+fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>(
+    deserializer: D,
+) -> Result<Option<StorageClass>, D::Error> {
+    Option::<String>::deserialize(deserializer).and_then(|s| {
+        if let Some(s) = s {
+            use serde::de::Error;
+            let storage_class = StorageClass::from_str(&s).expect("infallible");
+            #[allow(deprecated)]
+            if matches!(storage_class, StorageClass::Unknown(_)) {
+                return Err(D::Error::custom(format!(
+                    "Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}",
+                    StorageClass::values()
+                )));
+            }
+            Ok(Some(storage_class))
+        } else {
+            Ok(None)
+        }
+    })
+}
+
 impl RemoteStorageConfig {
     pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
 
     pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<Self>> {
-        let local_path = toml.get("local_path");
-        let bucket_name = toml.get("bucket_name");
-        let bucket_region = toml.get("bucket_region");
-        let container_name = toml.get("container_name");
-        let container_region = toml.get("container_region");
-
-        let use_azure = container_name.is_some() && container_region.is_some();
-
-        let default_concurrency_limit = if use_azure {
-            DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
-        } else {
-            DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
+        let document: toml_edit::Document = match toml {
+            toml_edit::Item::Table(toml) => toml.clone().into(),
+            toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
+                toml.clone().into_table().into()
+            }
+            _ => bail!("toml not a table or inline table"),
         };
 
-        let concurrency_limit = NonZeroUsize::new(
-            parse_optional_integer("concurrency_limit", toml)?.unwrap_or(default_concurrency_limit),
-        )
-        .context("Failed to parse 'concurrency_limit' as a positive integer")?;
-
-        let max_keys_per_list_response =
-            parse_optional_integer::<i32, _>("max_keys_per_list_response", toml)
-                .context("Failed to parse 'max_keys_per_list_response' as a positive integer")?
-                .or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);
-
-        let endpoint = toml
-            .get("endpoint")
-            .map(|endpoint| parse_toml_string("endpoint", endpoint))
-            .transpose()?;
-
-        let timeout = toml
-            .get("timeout")
-            .map(|timeout| {
-                timeout
-                    .as_str()
-                    .ok_or_else(|| anyhow::Error::msg("timeout was not a string"))
-            })
-            .transpose()
-            .and_then(|timeout| {
-                timeout
-                    .map(humantime::parse_duration)
-                    .transpose()
-                    .map_err(anyhow::Error::new)
-            })
-            .context("parse timeout")?
-            .unwrap_or(Self::DEFAULT_TIMEOUT);
-
-        if timeout < Duration::from_secs(1) {
-            bail!("timeout was specified as {timeout:?} which is too low");
+        if document.is_empty() {
+            return Ok(None);
         }
 
-        let storage = match (
-            local_path,
-            bucket_name,
-            bucket_region,
-            container_name,
-            container_region,
-        ) {
-            // no 'local_path' nor 'bucket_name' options are provided, consider this remote storage disabled
-            (None, None, None, None, None) => return Ok(None),
-            (_, Some(_), None, ..) => {
-                bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
-            }
-            (_, None, Some(_), ..) => {
-                bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
-            }
-            (None, Some(bucket_name), Some(bucket_region), ..) => {
-                RemoteStorageKind::AwsS3(S3Config {
-                    bucket_name: parse_toml_string("bucket_name", bucket_name)?,
-                    bucket_region: parse_toml_string("bucket_region", bucket_region)?,
-                    prefix_in_bucket: toml
-                        .get("prefix_in_bucket")
-                        .map(|prefix_in_bucket| {
-                            parse_toml_string("prefix_in_bucket", prefix_in_bucket)
-                        })
-                        .transpose()?,
-                    endpoint,
-                    concurrency_limit,
-                    max_keys_per_list_response,
-                    upload_storage_class: toml
-                        .get("upload_storage_class")
-                        .map(|prefix_in_bucket| -> anyhow::Result<_> {
-                            let s = parse_toml_string("upload_storage_class", prefix_in_bucket)?;
-                            let storage_class = StorageClass::from_str(&s).expect("infallible");
-                            #[allow(deprecated)]
-                            if matches!(storage_class, StorageClass::Unknown(_)) {
-                                bail!("Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}", StorageClass::values());
-                            }
-                            Ok(storage_class)
-                        })
-                        .transpose()?,
-                })
-            }
-            (_, _, _, Some(_), None) => {
-                bail!("'container_name' option is mandatory if 'container_region' is given ")
-            }
-            (_, _, _, None, Some(_)) => {
-                bail!("'container_name' option is mandatory if 'container_region' is given ")
-            }
-            (None, None, None, Some(container_name), Some(container_region)) => {
-                RemoteStorageKind::AzureContainer(AzureConfig {
-                    container_name: parse_toml_string("container_name", container_name)?,
-                    storage_account: toml
-                        .get("storage_account")
-                        .map(|storage_account| {
-                            parse_toml_string("storage_account", storage_account)
-                        })
-                        .transpose()?,
-                    container_region: parse_toml_string("container_region", container_region)?,
-                    prefix_in_container: toml
-                        .get("prefix_in_container")
-                        .map(|prefix_in_container| {
-                            parse_toml_string("prefix_in_container", prefix_in_container)
-                        })
-                        .transpose()?,
-                    concurrency_limit,
-                    max_keys_per_list_response,
-                })
-            }
-            (Some(local_path), None, None, None, None) => RemoteStorageKind::LocalFs(
-                Utf8PathBuf::from(parse_toml_string("local_path", local_path)?),
-            ),
-            (Some(_), Some(_), ..) => {
-                bail!("'local_path' and 'bucket_name' are mutually exclusive")
-            }
-            (Some(_), _, _, Some(_), Some(_)) => {
-                bail!("local_path and 'container_name' are mutually exclusive")
-            }
-        };
-
-        Ok(Some(RemoteStorageConfig { storage, timeout }))
+        Ok(Some(toml_edit::de::from_document(document)?))
     }
 }
 
-// Helper functions to parse a toml Item
-fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
-where
-    I: TryFrom<i64, Error = E>,
-    E: std::error::Error + Send + Sync + 'static,
-{
-    let toml_integer = match item.get(name) {
-        Some(item) => item
-            .as_integer()
-            .with_context(|| format!("configure option {name} is not an integer"))?,
-        None => return Ok(None),
-    };
-
-    I::try_from(toml_integer)
-        .map(Some)
-        .with_context(|| format!("configure option {name} is too large"))
-}
-
-fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
-    let s = item
-        .as_str()
-        .with_context(|| format!("configure option {name} is not a string"))?;
-    Ok(s.to_string())
-}
-
 struct ConcurrencyLimiter {
     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
     // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
@@ -828,6 +733,11 @@ impl ConcurrencyLimiter {
 mod tests {
     use super::*;
 
+    fn parse(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
+        let toml = input.parse::<toml_edit::Document>().unwrap();
+        RemoteStorageConfig::from_toml(toml.as_item())
+    }
+
     #[test]
     fn test_object_name() {
         let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
@@ -855,18 +765,71 @@ mod tests {
         let input = "local_path = '.'
 timeout = '5s'";
 
-        let toml = input.parse::<toml_edit::Document>().unwrap();
-
-        let config = RemoteStorageConfig::from_toml(toml.as_item())
-            .unwrap()
-            .expect("it exists");
+        let config = parse(input).unwrap().expect("it exists");
 
         assert_eq!(
             config,
             RemoteStorageConfig {
-                storage: RemoteStorageKind::LocalFs(Utf8PathBuf::from(".")),
+                storage: RemoteStorageKind::LocalFs {
+                    local_path: Utf8PathBuf::from(".")
+                },
                 timeout: Duration::from_secs(5)
             }
        );
     }
+
+    #[test]
+    fn test_s3_parsing() {
+        let toml = "\
+    bucket_name = 'foo-bar'
+    bucket_region = 'eu-central-1'
+    upload_storage_class = 'INTELLIGENT_TIERING'
+    timeout = '7s'
+    ";
+
+        let config = parse(toml).unwrap().expect("it exists");
+
+        assert_eq!(
+            config,
+            RemoteStorageConfig {
+                storage: RemoteStorageKind::AwsS3(S3Config {
+                    bucket_name: "foo-bar".into(),
+                    bucket_region: "eu-central-1".into(),
+                    prefix_in_bucket: None,
+                    endpoint: None,
+                    concurrency_limit: default_remote_storage_s3_concurrency_limit(),
+                    max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
+                    upload_storage_class: Some(StorageClass::IntelligentTiering),
+                }),
+                timeout: Duration::from_secs(7)
+            }
+        );
+    }
+
+    #[test]
+    fn test_azure_parsing() {
+        let toml = "\
+    container_name = 'foo-bar'
+    container_region = 'westeurope'
+    upload_storage_class = 'INTELLIGENT_TIERING'
+    timeout = '7s'
+    ";
+
+        let config = parse(toml).unwrap().expect("it exists");
+
+        assert_eq!(
+            config,
+            RemoteStorageConfig {
+                storage: RemoteStorageKind::AzureContainer(AzureConfig {
+                    container_name: "foo-bar".into(),
+                    storage_account: None,
+                    container_region: "westeurope".into(),
+                    prefix_in_container: None,
+                    concurrency_limit: default_remote_storage_azure_concurrency_limit(),
+                    max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
+                }),
+                timeout: Duration::from_secs(7)
+            }
+        );
+    }
 }
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index badea48b98..feb1363843 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -1463,7 +1463,7 @@ broker_endpoint = '{broker_endpoint}'
         assert_eq!(
             parsed_remote_storage_config,
             RemoteStorageConfig {
-                storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
+                storage: RemoteStorageKind::LocalFs { local_path: local_storage_path.clone() },
                 timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
             },
             "Remote storage config should correctly parse the local FS config and fill other storage defaults"
diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs
index 3960fc1b99..e779729f8d 100644
--- a/pageserver/src/deletion_queue.rs
+++ b/pageserver/src/deletion_queue.rs
@@ -850,7 +850,9 @@ mod test {
         std::fs::create_dir_all(remote_fs_dir)?;
         let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
         let storage_config = RemoteStorageConfig {
-            storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
+            storage: RemoteStorageKind::LocalFs {
+                local_path: remote_fs_dir.clone(),
+            },
             timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
         };
         let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index ca5765c99b..ace95af10a 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -3906,7 +3906,9 @@ pub(crate) mod harness {
         let remote_fs_dir = conf.workdir.join("localfs");
         std::fs::create_dir_all(&remote_fs_dir).unwrap();
         let config = RemoteStorageConfig {
-            storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
+            storage: RemoteStorageKind::LocalFs {
+                local_path: remote_fs_dir.clone(),
+            },
             timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
         };
         let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs
index 1355b7e1d8..e72bf199e3 100644
--- a/proxy/src/context/parquet.rs
+++ b/proxy/src/context/parquet.rs
@@ -543,7 +543,9 @@ mod tests {
         rx: impl Stream<Item = RequestData>,
     ) -> Vec<(u64, usize, i64)> {
         let remote_storage_config = RemoteStorageConfig {
-            storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()),
+            storage: RemoteStorageKind::LocalFs {
+                local_path: tmpdir.to_path_buf(),
+            },
             timeout: std::time::Duration::from_secs(120),
         };
         let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();
diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml
index df16c71789..139a5647c5 100644
--- a/workspace_hack/Cargo.toml
+++ b/workspace_hack/Cargo.toml
@@ -25,6 +25,7 @@ axum = { version = "0.6", features = ["ws"] }
 base64 = { version = "0.21", features = ["alloc"] }
 base64ct = { version = "1", default-features = false, features = ["std"] }
 bytes = { version = "1", features = ["serde"] }
+camino = { version = "1", default-features = false, features = ["serde1"] }
 chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
 clap = { version = "4", features = ["derive", "string"] }
 clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }