diff --git a/Cargo.lock b/Cargo.lock
index da8cefb219..c7af140f7d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3578,7 +3578,6 @@ dependencies = [
  "thiserror",
  "tokio",
  "tokio-util",
- "toml_edit",
  "utils",
  "workspace_hack",
 ]
diff --git a/libs/remote_storage/src/config.rs b/libs/remote_storage/src/config.rs
index d0e92411da..e99ae4f747 100644
--- a/libs/remote_storage/src/config.rs
+++ b/libs/remote_storage/src/config.rs
@@ -26,6 +26,16 @@ pub struct RemoteStorageConfig {
     pub timeout: Duration,
 }
 
+impl RemoteStorageKind {
+    pub fn bucket_name(&self) -> Option<&str> {
+        match self {
+            RemoteStorageKind::LocalFs { .. } => None,
+            RemoteStorageKind::AwsS3(config) => Some(&config.bucket_name),
+            RemoteStorageKind::AzureContainer(config) => Some(&config.container_name),
+        }
+    }
+}
+
 fn default_timeout() -> Duration {
     RemoteStorageConfig::DEFAULT_TIMEOUT
 }
@@ -178,6 +188,14 @@ impl RemoteStorageConfig {
     pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Self> {
         Ok(utils::toml_edit_ext::deserialize_item(toml)?)
     }
+
+    pub fn from_toml_str(input: &str) -> anyhow::Result<Self> {
+        let toml_document = toml_edit::DocumentMut::from_str(input)?;
+        if let Some(item) = toml_document.get("remote_storage") {
+            return Self::from_toml(item);
+        }
+        Self::from_toml(toml_document.as_item())
+    }
 }
 
 #[cfg(test)]
@@ -185,8 +203,7 @@ mod tests {
     use super::*;
 
     fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
-        let toml = input.parse::<toml_edit::DocumentMut>().unwrap();
-        RemoteStorageConfig::from_toml(toml.as_item())
+        RemoteStorageConfig::from_toml_str(input)
     }
 
     #[test]
diff --git a/pageserver/ctl/Cargo.toml b/pageserver/ctl/Cargo.toml
index a753f806a0..39ca47568c 100644
--- a/pageserver/ctl/Cargo.toml
+++ b/pageserver/ctl/Cargo.toml
@@ -18,7 +18,6 @@ postgres_ffi.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
 tokio-util.workspace = true
-toml_edit.workspace = true
 utils.workspace = true
 svg_fmt.workspace = true
 workspace_hack.workspace = true
diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs
index 92e766d2fb..a0aac89dc8 100644
--- a/pageserver/ctl/src/main.rs
+++ b/pageserver/ctl/src/main.rs
@@ -174,11 +174,7 @@ async fn main() -> anyhow::Result<()> {
                 println!("specified prefix '{}' failed validation", cmd.prefix);
                 return Ok(());
             };
-            let toml_document = toml_edit::DocumentMut::from_str(&cmd.config_toml_str)?;
-            let toml_item = toml_document
-                .get("remote_storage")
-                .expect("need remote_storage");
-            let config = RemoteStorageConfig::from_toml(toml_item)?;
+            let config = RemoteStorageConfig::from_toml_str(&cmd.config_toml_str)?;
             let storage = remote_storage::GenericRemoteStorage::from_config(&config).await;
             let cancel = CancellationToken::new();
             storage
diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs
index 88e36af560..95d3af1453 100644
--- a/storage_scrubber/src/find_large_objects.rs
+++ b/storage_scrubber/src/find_large_objects.rs
@@ -106,9 +106,9 @@ pub async fn find_large_objects(
         }
     }
 
-    let bucket_name = target.bucket_name();
+    let desc_str = target.desc_str();
     tracing::info!(
-        "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
+        "Scan of {desc_str} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
         objects.len()
     );
     Ok(LargeObjectListing { objects })
diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs
index 863dbf960d..91668a42a7 100644
--- a/storage_scrubber/src/garbage.rs
+++ b/storage_scrubber/src/garbage.rs
@@ -177,7 +177,7 @@ async fn find_garbage_inner(
     }));
 
     // Enumerate Tenants in S3, and check if each one exists in Console
-    tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
+    tracing::info!("Finding all tenants in {}...", bucket_config.desc_str());
     let tenants = stream_tenants(&remote_client, &target);
     let tenants_checked = tenants.map_ok(|t| {
         let api_client = cloud_admin_api_client.clone();
@@ -524,7 +524,7 @@ pub async fn purge_garbage(
         init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
 
     assert_eq!(
-        &garbage_list.bucket_config.bucket,
+        garbage_list.bucket_config.bucket_name().unwrap(),
         remote_client.bucket_name().unwrap()
     );
 
diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs
index de0857cb5f..1fe4fc58cd 100644
--- a/storage_scrubber/src/lib.rs
+++ b/storage_scrubber/src/lib.rs
@@ -29,8 +29,7 @@ use pageserver::tenant::TENANTS_SEGMENT_NAME;
 use pageserver_api::shard::TenantShardId;
 use remote_storage::{
     DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
-    RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
-    DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
+    RemoteStorageKind, S3Config,
 };
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
@@ -48,7 +47,7 @@ const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
 
 #[derive(Debug, Clone)]
 pub struct S3Target {
-    pub bucket_name: String,
+    pub desc_str: String,
     /// This `prefix_in_bucket` is only equal to the PS/SK config of the same
     /// name for the RootTarget: other instances of S3Target will have prefix_in_bucket
     /// with extra parts.
@@ -172,7 +171,7 @@ impl RootTarget {
         };
 
         S3Target {
-            bucket_name: root.bucket_name.clone(),
+            desc_str: root.desc_str.clone(),
             prefix_in_bucket: format!(
                 "{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
                 root.prefix_in_bucket
@@ -209,10 +208,10 @@ impl RootTarget {
         }
     }
 
-    pub fn bucket_name(&self) -> &str {
+    pub fn desc_str(&self) -> &str {
         match self {
-            Self::Pageserver(root) => &root.bucket_name,
-            Self::Safekeeper(root) => &root.bucket_name,
+            Self::Pageserver(root) => &root.desc_str,
+            Self::Safekeeper(root) => &root.desc_str,
         }
     }
 
@@ -230,24 +229,61 @@ pub fn remote_timeline_path_id(id: &TenantShardTimelineId) -> RemotePath {
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
-pub struct BucketConfig {
-    pub region: String,
-    pub bucket: String,
-    pub prefix_in_bucket: Option<String>,
-}
+pub struct BucketConfig(RemoteStorageConfig);
 
 impl BucketConfig {
     pub fn from_env() -> anyhow::Result<Self> {
-        let region = env::var("REGION").context("'REGION' param retrieval")?;
-        let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
-        let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
-
-        Ok(Self {
-            region,
-            bucket,
-            prefix_in_bucket,
-        })
+        if let Ok(legacy) = Self::from_env_legacy() {
+            return Ok(legacy);
+        }
+        let config_toml =
+            env::var("REMOTE_STORAGE_CONFIG").context("'REMOTE_STORAGE_CONFIG' retrieval")?;
+        let remote_config = RemoteStorageConfig::from_toml_str(&config_toml)?;
+        Ok(BucketConfig(remote_config))
     }
+
+    fn from_env_legacy() -> anyhow::Result<Self> {
+        let bucket_region = env::var("REGION").context("'REGION' param retrieval")?;
+        let bucket_name = env::var("BUCKET").context("'BUCKET' param retrieval")?;
+        let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
+        let endpoint = env::var("AWS_ENDPOINT_URL").ok();
+        // Create a json object which we then deserialize so that we don't
+        // have to repeat all of the S3Config fields.
+        let s3_config_json = serde_json::json!({
+            "bucket_name": bucket_name,
+            "bucket_region": bucket_region,
+            "prefix_in_bucket": prefix_in_bucket,
+            "endpoint": endpoint,
+        });
+        let config: RemoteStorageConfig = serde_json::from_value(s3_config_json)?;
+        Ok(BucketConfig(config))
+    }
+    pub fn desc_str(&self) -> String {
+        match &self.0.storage {
+            RemoteStorageKind::LocalFs { local_path } => {
+                format!("local path {local_path}")
+            }
+            RemoteStorageKind::AwsS3(config) => format!(
+                "bucket {}, region {}",
+                config.bucket_name, config.bucket_region
+            ),
+            RemoteStorageKind::AzureContainer(config) => format!(
+                "bucket {}, storage account {:?}, region {}",
+                config.container_name, config.storage_account, config.container_region
+            ),
+        }
+    }
+    pub fn bucket_name(&self) -> Option<&str> {
+        self.0.storage.bucket_name()
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct BucketConfigLegacy {
+    pub region: String,
+    pub bucket: String,
+    pub prefix_in_bucket: Option<String>,
 }
 
 pub struct ControllerClientConfig {
@@ -337,13 +373,9 @@ fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
     }
 }
 
-fn make_root_target(
-    bucket_name: String,
-    prefix_in_bucket: String,
-    node_kind: NodeKind,
-) -> RootTarget {
+fn make_root_target(desc_str: String, prefix_in_bucket: String, node_kind: NodeKind) -> RootTarget {
     let s3_target = S3Target {
-        bucket_name,
+        desc_str,
         prefix_in_bucket,
         delimiter: "/".to_string(),
     };
@@ -354,15 +386,15 @@ async fn init_remote_s3(
-    bucket_config: BucketConfig,
+    bucket_config: S3Config,
     node_kind: NodeKind,
 ) -> anyhow::Result<(Arc<Client>, RootTarget)> {
-    let bucket_region = Region::new(bucket_config.region);
+    let bucket_region = Region::new(bucket_config.bucket_region);
     let s3_client = Arc::new(init_s3_client(bucket_region).await);
     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
 
     let s3_root = make_root_target(
-        bucket_config.bucket,
+        bucket_config.bucket_name,
         bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
         node_kind,
     );
@@ -371,33 +403,28 @@ async fn init_remote(
-    bucket_config: BucketConfig,
+    mut storage_config: BucketConfig,
     node_kind: NodeKind,
 ) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
-    let endpoint = env::var("AWS_ENDPOINT_URL").ok();
+    let desc_str = storage_config.desc_str();
+
     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
-    let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
-    let storage = S3Config {
-        bucket_name: bucket_config.bucket.clone(),
-        bucket_region: bucket_config.region,
-        prefix_in_bucket,
-        endpoint,
-        concurrency_limit: DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
-            .try_into()
-            .unwrap(),
-        max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
-        upload_storage_class: None,
-    };
-    let storage_config = RemoteStorageConfig {
-        storage: RemoteStorageKind::AwsS3(storage),
-        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
-    };
+
+    match &mut storage_config.0.storage {
+        RemoteStorageKind::AwsS3(ref mut config) => {
+            config.prefix_in_bucket.get_or_insert(default_prefix);
+        }
+        RemoteStorageKind::AzureContainer(ref mut config) => {
+            config.prefix_in_container.get_or_insert(default_prefix);
+        }
+        RemoteStorageKind::LocalFs { .. } => (),
+    }
 
     // We already pass the prefix to the remote client above
     let prefix_in_root_target = String::new();
-    let root_target = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
+    let root_target = make_root_target(desc_str, prefix_in_root_target, node_kind);
 
-    let client = GenericRemoteStorage::from_config(&storage_config).await?;
+    let client = GenericRemoteStorage::from_config(&storage_config.0).await?;
     Ok((client, root_target))
 }
@@ -469,7 +496,7 @@ async fn list_objects_with_retries(
         }
         warn!(
             "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
-            s3_target.bucket_name,
+            remote_client.bucket_name().unwrap_or_default(),
             s3_target.prefix_in_bucket,
             s3_target.delimiter,
             DisplayErrorContext(e),
diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs
index ee816534c6..0ffb570984 100644
--- a/storage_scrubber/src/main.rs
+++ b/storage_scrubber/src/main.rs
@@ -140,7 +140,7 @@ async fn main() -> anyhow::Result<()> {
         "{}_{}_{}_{}.log",
         std::env::args().next().unwrap(),
         command_log_name,
-        bucket_config.bucket,
+        bucket_config.bucket_name().unwrap_or("nobucket"),
         chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
     ));
 
@@ -191,13 +191,7 @@ async fn main() -> anyhow::Result<()> {
            // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
            // scrubber they were likely expecting to scan something, and if we see no timelines
            // at all then it's likely due to some configuration issues like a bad prefix
-            bail!(
-                "No timelines found in bucket {} prefix {}",
-                bucket_config.bucket,
-                bucket_config
-                    .prefix_in_bucket
-                    .unwrap_or("".to_string())
-            );
+            bail!("No timelines found in {}", bucket_config.desc_str());
         }
         Ok(())
     } else {
@@ -396,13 +390,7 @@ pub async fn scan_pageserver_metadata_cmd(
        // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
        // scrubber they were likely expecting to scan something, and if we see no timelines
        // at all then it's likely due to some configuration issues like a bad prefix
-        tracing::error!(
-            "No timelines found in bucket {} prefix {}",
-            bucket_config.bucket,
-            bucket_config
-                .prefix_in_bucket
-                .unwrap_or("".to_string())
-        );
+        tracing::error!("No timelines found in {}", bucket_config.desc_str());
         if exit_code {
             std::process::exit(1);
         }
diff --git a/storage_scrubber/src/scan_safekeeper_metadata.rs b/storage_scrubber/src/scan_safekeeper_metadata.rs
index 403b4590a8..0a4d4266a0 100644
--- a/storage_scrubber/src/scan_safekeeper_metadata.rs
+++ b/storage_scrubber/src/scan_safekeeper_metadata.rs
@@ -84,10 +84,7 @@ pub async fn scan_safekeeper_metadata(
     bucket_config: BucketConfig,
     db_or_list: DatabaseOrList,
 ) -> anyhow::Result<MetadataSummary> {
-    info!(
-        "checking bucket {}, region {}",
-        bucket_config.bucket, bucket_config.region
-    );
+    info!("checking {}", bucket_config.desc_str());
 
     let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
     let console_config = ConsoleConfig::from_env()?;
diff --git a/storage_scrubber/src/tenant_snapshot.rs b/storage_scrubber/src/tenant_snapshot.rs
index bb4079b5f4..39e0b5c9b4 100644
--- a/storage_scrubber/src/tenant_snapshot.rs
+++ b/storage_scrubber/src/tenant_snapshot.rs
@@ -16,7 +16,7 @@ use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
 use pageserver::tenant::storage_layer::LayerName;
 use pageserver::tenant::IndexPart;
 use pageserver_api::shard::TenantShardId;
-use remote_storage::GenericRemoteStorage;
+use remote_storage::{GenericRemoteStorage, S3Config};
 use utils::generation::Generation;
 use utils::id::TenantId;
 
@@ -24,6 +24,7 @@ pub struct SnapshotDownloader {
     s3_client: Arc<Client>,
     s3_root: RootTarget,
     bucket_config: BucketConfig,
+    bucket_config_s3: S3Config,
     tenant_id: TenantId,
     output_path: Utf8PathBuf,
     concurrency: usize,
@@ -36,12 +37,17 @@ impl SnapshotDownloader {
         output_path: Utf8PathBuf,
         concurrency: usize,
     ) -> anyhow::Result<Self> {
+        let bucket_config_s3 = match &bucket_config.0.storage {
+            remote_storage::RemoteStorageKind::AwsS3(config) => config.clone(),
+            _ => panic!("only S3 configuration is supported for snapshot downloading"),
+        };
         let (s3_client, s3_root) =
-            init_remote_s3(bucket_config.clone(), NodeKind::Pageserver).await?;
+            init_remote_s3(bucket_config_s3.clone(), NodeKind::Pageserver).await?;
         Ok(Self {
             s3_client,
             s3_root,
             bucket_config,
+            bucket_config_s3,
             tenant_id,
             output_path,
             concurrency,
@@ -87,7 +93,7 @@ impl SnapshotDownloader {
             let versions = self
                 .s3_client
                 .list_object_versions()
-                .bucket(self.bucket_config.bucket.clone())
+                .bucket(self.bucket_config_s3.bucket_name.clone())
                 .prefix(&remote_layer_path)
                 .send()
                 .await?;
@@ -96,7 +102,7 @@ impl SnapshotDownloader {
             };
             download_object_to_file_s3(
                 &self.s3_client,
-                &self.bucket_config.bucket,
+                &self.bucket_config_s3.bucket_name,
                 &remote_layer_path,
                 version.version_id.as_deref(),
                 &local_path,