From 9b6af2bcaddad49f59809c69717c23ced725dfa9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Mon, 18 Nov 2024 22:01:48 +0100
Subject: [PATCH] Add the ability to configure GenericRemoteStorage for the scrubber (#9652)

Earlier work (#7547) made the scrubber internally generic, but it could
still only be configured to use S3 storage. This is the final piece needed
to make the scrubber configurable via GenericRemoteStorage (for the most
part: snapshotting still requires S3). I.e., you can now set an env var
like:

```
REMOTE_STORAGE_CONFIG='remote_storage = { bucket_name = "neon-dev-safekeeper-us-east-2d", bucket_region = "us-east-2" }'
```

and the scrubber will read it instead. A brief usage sketch of the new
parsing helper is appended after the diff.
---
 Cargo.lock                                    |   1 -
 libs/remote_storage/src/config.rs             |  21 ++-
 pageserver/ctl/Cargo.toml                     |   1 -
 pageserver/ctl/src/main.rs                    |   6 +-
 storage_scrubber/src/find_large_objects.rs    |   4 +-
 storage_scrubber/src/garbage.rs               |   4 +-
 storage_scrubber/src/lib.rs                   | 129 +++++++++++-------
 storage_scrubber/src/main.rs                  |  18 +--
 .../src/scan_safekeeper_metadata.rs           |   5 +-
 storage_scrubber/src/tenant_snapshot.rs       |  14 +-
 10 files changed, 116 insertions(+), 87 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index da8cefb219..c7af140f7d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3578,7 +3578,6 @@ dependencies = [
  "thiserror",
  "tokio",
  "tokio-util",
- "toml_edit",
  "utils",
  "workspace_hack",
 ]
diff --git a/libs/remote_storage/src/config.rs b/libs/remote_storage/src/config.rs
index d0e92411da..e99ae4f747 100644
--- a/libs/remote_storage/src/config.rs
+++ b/libs/remote_storage/src/config.rs
@@ -26,6 +26,16 @@ pub struct RemoteStorageConfig {
     pub timeout: Duration,
 }
 
+impl RemoteStorageKind {
+    pub fn bucket_name(&self) -> Option<&str> {
+        match self {
+            RemoteStorageKind::LocalFs { .. } => None,
+            RemoteStorageKind::AwsS3(config) => Some(&config.bucket_name),
+            RemoteStorageKind::AzureContainer(config) => Some(&config.container_name),
+        }
+    }
+}
+
 fn default_timeout() -> Duration {
     RemoteStorageConfig::DEFAULT_TIMEOUT
 }
@@ -178,6 +188,14 @@ impl RemoteStorageConfig {
     pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Self> {
         Ok(utils::toml_edit_ext::deserialize_item(toml)?)
     }
+
+    pub fn from_toml_str(input: &str) -> anyhow::Result<Self> {
+        let toml_document = toml_edit::DocumentMut::from_str(input)?;
+        if let Some(item) = toml_document.get("remote_storage") {
+            return Self::from_toml(item);
+        }
+        Self::from_toml(toml_document.as_item())
+    }
 }
 
 #[cfg(test)]
@@ -185,8 +203,7 @@ mod tests {
     use super::*;
 
     fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
-        let toml = input.parse::<toml_edit::DocumentMut>().unwrap();
-        RemoteStorageConfig::from_toml(toml.as_item())
+        RemoteStorageConfig::from_toml_str(input)
     }
 
     #[test]
diff --git a/pageserver/ctl/Cargo.toml b/pageserver/ctl/Cargo.toml
index a753f806a0..39ca47568c 100644
--- a/pageserver/ctl/Cargo.toml
+++ b/pageserver/ctl/Cargo.toml
@@ -18,7 +18,6 @@ postgres_ffi.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
 tokio-util.workspace = true
-toml_edit.workspace = true
 utils.workspace = true
 svg_fmt.workspace = true
 workspace_hack.workspace = true
diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs
index 92e766d2fb..a0aac89dc8 100644
--- a/pageserver/ctl/src/main.rs
+++ b/pageserver/ctl/src/main.rs
@@ -174,11 +174,7 @@ async fn main() -> anyhow::Result<()> {
                 println!("specified prefix '{}' failed validation", cmd.prefix);
                 return Ok(());
             };
-            let toml_document = toml_edit::DocumentMut::from_str(&cmd.config_toml_str)?;
-            let toml_item = toml_document
-                .get("remote_storage")
-                .expect("need remote_storage");
-            let config = RemoteStorageConfig::from_toml(toml_item)?;
+            let config = RemoteStorageConfig::from_toml_str(&cmd.config_toml_str)?;
             let storage = remote_storage::GenericRemoteStorage::from_config(&config).await;
             let cancel = CancellationToken::new();
             storage
diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs
index 88e36af560..95d3af1453 100644
--- a/storage_scrubber/src/find_large_objects.rs
+++ b/storage_scrubber/src/find_large_objects.rs
@@ -106,9 +106,9 @@ pub async fn find_large_objects(
         }
     }
 
-    let bucket_name = target.bucket_name();
+    let desc_str = target.desc_str();
     tracing::info!(
-        "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
+        "Scan of {desc_str} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
         objects.len()
     );
     Ok(LargeObjectListing { objects })
diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs
index 863dbf960d..91668a42a7 100644
--- a/storage_scrubber/src/garbage.rs
+++ b/storage_scrubber/src/garbage.rs
@@ -177,7 +177,7 @@ async fn find_garbage_inner(
     }));
 
     // Enumerate Tenants in S3, and check if each one exists in Console
-    tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
+    tracing::info!("Finding all tenants in {}...", bucket_config.desc_str());
     let tenants = stream_tenants(&remote_client, &target);
     let tenants_checked = tenants.map_ok(|t| {
         let api_client = cloud_admin_api_client.clone();
@@ -524,7 +524,7 @@ pub async fn purge_garbage(
         init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
 
     assert_eq!(
-        &garbage_list.bucket_config.bucket,
+        garbage_list.bucket_config.bucket_name().unwrap(),
         remote_client.bucket_name().unwrap()
     );
 
diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs
index de0857cb5f..1fe4fc58cd 100644
--- a/storage_scrubber/src/lib.rs
+++ b/storage_scrubber/src/lib.rs
@@ -29,8 +29,7 @@ use pageserver::tenant::TENANTS_SEGMENT_NAME;
 use pageserver_api::shard::TenantShardId;
 use remote_storage::{
     DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
-    RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
-    DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
+    RemoteStorageKind, S3Config,
 };
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
@@ -48,7 +47,7 @@ const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
 
 #[derive(Debug, Clone)]
 pub struct S3Target {
-    pub bucket_name: String,
+    pub desc_str: String,
     /// This `prefix_in_bucket` is only equal to the PS/SK config of the same
     /// name for the RootTarget: other instances of S3Target will have prefix_in_bucket
     /// with extra parts.
@@ -172,7 +171,7 @@
         };
 
         S3Target {
-            bucket_name: root.bucket_name.clone(),
+            desc_str: root.desc_str.clone(),
             prefix_in_bucket: format!(
                 "{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
                 root.prefix_in_bucket
@@ -209,10 +208,10 @@
         }
     }
 
-    pub fn bucket_name(&self) -> &str {
+    pub fn desc_str(&self) -> &str {
         match self {
-            Self::Pageserver(root) => &root.bucket_name,
-            Self::Safekeeper(root) => &root.bucket_name,
+            Self::Pageserver(root) => &root.desc_str,
+            Self::Safekeeper(root) => &root.desc_str,
         }
     }
 
@@ -230,24 +229,61 @@ pub fn remote_timeline_path_id(id: &TenantShardTimelineId) -> RemotePath {
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
-pub struct BucketConfig {
-    pub region: String,
-    pub bucket: String,
-    pub prefix_in_bucket: Option<String>,
-}
+pub struct BucketConfig(RemoteStorageConfig);
 
 impl BucketConfig {
     pub fn from_env() -> anyhow::Result<Self> {
-        let region = env::var("REGION").context("'REGION' param retrieval")?;
-        let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
-        let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
-
-        Ok(Self {
-            region,
-            bucket,
-            prefix_in_bucket,
-        })
+        if let Ok(legacy) = Self::from_env_legacy() {
+            return Ok(legacy);
+        }
+        let config_toml =
+            env::var("REMOTE_STORAGE_CONFIG").context("'REMOTE_STORAGE_CONFIG' retrieval")?;
+        let remote_config = RemoteStorageConfig::from_toml_str(&config_toml)?;
+        Ok(BucketConfig(remote_config))
     }
+
+    fn from_env_legacy() -> anyhow::Result<Self> {
+        let bucket_region = env::var("REGION").context("'REGION' param retrieval")?;
+        let bucket_name = env::var("BUCKET").context("'BUCKET' param retrieval")?;
+        let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
+        let endpoint = env::var("AWS_ENDPOINT_URL").ok();
+        // Create a json object which we then deserialize so that we don't
+        // have to repeat all of the S3Config fields.
+        let s3_config_json = serde_json::json!({
+            "bucket_name": bucket_name,
+            "bucket_region": bucket_region,
+            "prefix_in_bucket": prefix_in_bucket,
+            "endpoint": endpoint,
+        });
+        let config: RemoteStorageConfig = serde_json::from_value(s3_config_json)?;
+        Ok(BucketConfig(config))
+    }
+    pub fn desc_str(&self) -> String {
+        match &self.0.storage {
+            RemoteStorageKind::LocalFs { local_path } => {
+                format!("local path {local_path}")
+            }
+            RemoteStorageKind::AwsS3(config) => format!(
+                "bucket {}, region {}",
+                config.bucket_name, config.bucket_region
+            ),
+            RemoteStorageKind::AzureContainer(config) => format!(
+                "bucket {}, storage account {:?}, region {}",
+                config.container_name, config.storage_account, config.container_region
+            ),
+        }
+    }
+    pub fn bucket_name(&self) -> Option<&str> {
+        self.0.storage.bucket_name()
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct BucketConfigLegacy {
+    pub region: String,
+    pub bucket: String,
+    pub prefix_in_bucket: Option<String>,
 }
 
 pub struct ControllerClientConfig {
@@ -337,13 +373,9 @@ fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
     }
 }
 
-fn make_root_target(
-    bucket_name: String,
-    prefix_in_bucket: String,
-    node_kind: NodeKind,
-) -> RootTarget {
+fn make_root_target(desc_str: String, prefix_in_bucket: String, node_kind: NodeKind) -> RootTarget {
     let s3_target = S3Target {
-        bucket_name,
+        desc_str,
         prefix_in_bucket,
         delimiter: "/".to_string(),
     };
@@ -354,15 +386,15 @@
 }
 
 async fn init_remote_s3(
-    bucket_config: BucketConfig,
+    bucket_config: S3Config,
     node_kind: NodeKind,
 ) -> anyhow::Result<(Arc<Client>, RootTarget)> {
-    let bucket_region = Region::new(bucket_config.region);
+    let bucket_region = Region::new(bucket_config.bucket_region);
     let s3_client = Arc::new(init_s3_client(bucket_region).await);
 
     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
     let s3_root = make_root_target(
-        bucket_config.bucket,
+        bucket_config.bucket_name,
         bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
         node_kind,
     );
@@ -371,33 +403,28 @@
 }
 
 async fn init_remote(
-    bucket_config: BucketConfig,
+    mut storage_config: BucketConfig,
     node_kind: NodeKind,
 ) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
-    let endpoint = env::var("AWS_ENDPOINT_URL").ok();
+    let desc_str = storage_config.desc_str();
+
     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
-    let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
-    let storage = S3Config {
-        bucket_name: bucket_config.bucket.clone(),
-        bucket_region: bucket_config.region,
-        prefix_in_bucket,
-        endpoint,
-        concurrency_limit: DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
-            .try_into()
-            .unwrap(),
-        max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
-        upload_storage_class: None,
-    };
-    let storage_config = RemoteStorageConfig {
-        storage: RemoteStorageKind::AwsS3(storage),
-        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
-    };
+
+    match &mut storage_config.0.storage {
+        RemoteStorageKind::AwsS3(ref mut config) => {
+            config.prefix_in_bucket.get_or_insert(default_prefix);
+        }
+        RemoteStorageKind::AzureContainer(ref mut config) => {
+            config.prefix_in_container.get_or_insert(default_prefix);
+        }
+        RemoteStorageKind::LocalFs { .. } => (),
+    }
 
     // We already pass the prefix to the remote client above
     let prefix_in_root_target = String::new();
-    let root_target = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
+    let root_target = make_root_target(desc_str, prefix_in_root_target, node_kind);
 
-    let client = GenericRemoteStorage::from_config(&storage_config).await?;
+    let client = GenericRemoteStorage::from_config(&storage_config.0).await?;
     Ok((client, root_target))
 }
@@ -469,7 +496,7 @@ async fn list_objects_with_retries(
         }
         warn!(
             "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
-            s3_target.bucket_name,
+            remote_client.bucket_name().unwrap_or_default(),
             s3_target.prefix_in_bucket,
             s3_target.delimiter,
             DisplayErrorContext(e),
diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs
index ee816534c6..0ffb570984 100644
--- a/storage_scrubber/src/main.rs
+++ b/storage_scrubber/src/main.rs
@@ -140,7 +140,7 @@ async fn main() -> anyhow::Result<()> {
         "{}_{}_{}_{}.log",
         std::env::args().next().unwrap(),
         command_log_name,
-        bucket_config.bucket,
+        bucket_config.bucket_name().unwrap_or("nobucket"),
         chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
     ));
 
@@ -191,13 +191,7 @@ async fn main() -> anyhow::Result<()> {
             // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
             // scrubber they were likely expecting to scan something, and if we see no timelines
             // at all then it's likely due to some configuration issues like a bad prefix
-            bail!(
-                "No timelines found in bucket {} prefix {}",
-                bucket_config.bucket,
-                bucket_config
-                    .prefix_in_bucket
-                    .unwrap_or("".to_string())
-            );
+            bail!("No timelines found in {}", bucket_config.desc_str());
         }
         Ok(())
     } else {
@@ -396,13 +390,7 @@ pub async fn scan_pageserver_metadata_cmd(
         // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
         // scrubber they were likely expecting to scan something, and if we see no timelines
         // at all then it's likely due to some configuration issues like a bad prefix
-        tracing::error!(
-            "No timelines found in bucket {} prefix {}",
-            bucket_config.bucket,
-            bucket_config
-                .prefix_in_bucket
-                .unwrap_or("".to_string())
-        );
+        tracing::error!("No timelines found in {}", bucket_config.desc_str());
         if exit_code {
             std::process::exit(1);
         }
diff --git a/storage_scrubber/src/scan_safekeeper_metadata.rs b/storage_scrubber/src/scan_safekeeper_metadata.rs
index 403b4590a8..0a4d4266a0 100644
--- a/storage_scrubber/src/scan_safekeeper_metadata.rs
+++ b/storage_scrubber/src/scan_safekeeper_metadata.rs
@@ -84,10 +84,7 @@ pub async fn scan_safekeeper_metadata(
     bucket_config: BucketConfig,
     db_or_list: DatabaseOrList,
 ) -> anyhow::Result<MetadataSummary> {
-    info!(
-        "checking bucket {}, region {}",
-        bucket_config.bucket, bucket_config.region
-    );
+    info!("checking {}", bucket_config.desc_str());
     let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
 
     let console_config = ConsoleConfig::from_env()?;
diff --git a/storage_scrubber/src/tenant_snapshot.rs b/storage_scrubber/src/tenant_snapshot.rs
index bb4079b5f4..39e0b5c9b4 100644
--- a/storage_scrubber/src/tenant_snapshot.rs
+++ b/storage_scrubber/src/tenant_snapshot.rs
@@ -16,7 +16,7 @@ use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
 use pageserver::tenant::storage_layer::LayerName;
 use pageserver::tenant::IndexPart;
 use pageserver_api::shard::TenantShardId;
-use remote_storage::GenericRemoteStorage;
+use remote_storage::{GenericRemoteStorage, S3Config};
 use utils::generation::Generation;
 use utils::id::TenantId;
 
@@ -24,6 +24,7 @@ pub struct SnapshotDownloader {
     s3_client: Arc<Client>,
     s3_root: RootTarget,
     bucket_config: BucketConfig,
+    bucket_config_s3: S3Config,
     tenant_id: TenantId,
     output_path: Utf8PathBuf,
     concurrency: usize,
@@ -36,12 +37,17 @@ impl SnapshotDownloader {
         output_path: Utf8PathBuf,
         concurrency: usize,
     ) -> anyhow::Result<Self> {
+        let bucket_config_s3 = match &bucket_config.0.storage {
+            remote_storage::RemoteStorageKind::AwsS3(config) => config.clone(),
+            _ => panic!("only S3 configuration is supported for snapshot downloading"),
+        };
         let (s3_client, s3_root) =
-            init_remote_s3(bucket_config.clone(), NodeKind::Pageserver).await?;
+            init_remote_s3(bucket_config_s3.clone(), NodeKind::Pageserver).await?;
         Ok(Self {
             s3_client,
             s3_root,
             bucket_config,
+            bucket_config_s3,
             tenant_id,
             output_path,
             concurrency,
@@ -87,7 +93,7 @@ impl SnapshotDownloader {
         let versions = self
             .s3_client
             .list_object_versions()
-            .bucket(self.bucket_config.bucket.clone())
+            .bucket(self.bucket_config_s3.bucket_name.clone())
             .prefix(&remote_layer_path)
            .send()
            .await?;
@@ -96,7 +102,7 @@
         };
         download_object_to_file_s3(
             &self.s3_client,
-            &self.bucket_config.bucket,
+            &self.bucket_config_s3.bucket_name,
            &remote_layer_path,
            version.version_id.as_deref(),
            &local_path,
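The following is a minimal, illustrative sketch (not part of the patch) of how the
new `RemoteStorageConfig::from_toml_str` entry point and the `RemoteStorageKind::bucket_name`
helper introduced above behave. It assumes the workspace `remote_storage` and `anyhow` crates
as dependencies; the bucket name and region are placeholders.

```
// Sketch only: parse the same TOML that the scrubber reads from
// REMOTE_STORAGE_CONFIG. `from_toml_str` accepts both the wrapped
// `remote_storage = { ... }` form and a bare top-level table.
use remote_storage::RemoteStorageConfig;

fn main() -> anyhow::Result<()> {
    let wrapped =
        r#"remote_storage = { bucket_name = "some-bucket", bucket_region = "us-east-2" }"#;
    let bare = r#"bucket_name = "some-bucket"
bucket_region = "us-east-2""#;

    let a = RemoteStorageConfig::from_toml_str(wrapped)?;
    let b = RemoteStorageConfig::from_toml_str(bare)?;

    // The new RemoteStorageKind::bucket_name() helper returns Some(..) for
    // S3 and Azure configurations and None for local_fs storage.
    assert_eq!(a.storage.bucket_name(), Some("some-bucket"));
    assert_eq!(b.storage.bucket_name(), Some("some-bucket"));
    Ok(())
}
```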