diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index f98d16789c..1c0d43d479 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -97,10 +97,7 @@ impl AzureBlobStorage { pub fn relative_path_to_name(&self, path: &RemotePath) -> String { assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR); - let path_string = path - .get_path() - .as_str() - .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR); + let path_string = path.get_path().as_str(); match &self.prefix_in_container { Some(prefix) => { if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { @@ -277,19 +274,14 @@ impl RemoteStorage for AzureBlobStorage { cancel: &CancellationToken, ) -> impl Stream> { // get the passed prefix or if it is not set use prefix_in_bucket value - let list_prefix = prefix - .map(|p| self.relative_path_to_name(p)) - .or_else(|| self.prefix_in_container.clone()) - .map(|mut p| { - // required to end with a separator - // otherwise request will return only the entry of a prefix - if matches!(mode, ListingMode::WithDelimiter) - && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) - { - p.push(REMOTE_STORAGE_PREFIX_SEPARATOR); + let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| { + self.prefix_in_container.clone().map(|mut s| { + if !s.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { + s.push(REMOTE_STORAGE_PREFIX_SEPARATOR); } - p - }); + s + }) + }); async_stream::stream! { let _permit = self.permit(RequestKind::List, cancel).await?; diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs index 91668a42a7..b026efbc3b 100644 --- a/storage_scrubber/src/garbage.rs +++ b/storage_scrubber/src/garbage.rs @@ -21,7 +21,7 @@ use utils::{backoff, id::TenantId}; use crate::{ cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData}, init_remote, list_objects_with_retries, - metadata_stream::{stream_tenant_timelines, stream_tenants}, + metadata_stream::{stream_tenant_timelines, stream_tenants_maybe_prefix}, BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth, MAX_RETRIES, }; @@ -118,9 +118,17 @@ pub async fn find_garbage( console_config: ConsoleConfig, depth: TraversingDepth, node_kind: NodeKind, + tenant_id_prefix: Option, output_path: String, ) -> anyhow::Result<()> { - let garbage = find_garbage_inner(bucket_config, console_config, depth, node_kind).await?; + let garbage = find_garbage_inner( + bucket_config, + console_config, + depth, + node_kind, + tenant_id_prefix, + ) + .await?; let serialized = serde_json::to_vec_pretty(&garbage)?; tokio::fs::write(&output_path, &serialized).await?; @@ -152,6 +160,7 @@ async fn find_garbage_inner( console_config: ConsoleConfig, depth: TraversingDepth, node_kind: NodeKind, + tenant_id_prefix: Option, ) -> anyhow::Result { // Construct clients for S3 and for Console API let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?; @@ -178,7 +187,7 @@ async fn find_garbage_inner( // Enumerate Tenants in S3, and check if each one exists in Console tracing::info!("Finding all tenants in {}...", bucket_config.desc_str()); - let tenants = stream_tenants(&remote_client, &target); + let tenants = stream_tenants_maybe_prefix(&remote_client, &target, tenant_id_prefix); let tenants_checked = tenants.map_ok(|t| { let api_client = cloud_admin_api_client.clone(); let console_cache = console_cache.clone(); diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs index 0ffb570984..92979d609e 100644 --- a/storage_scrubber/src/main.rs +++ b/storage_scrubber/src/main.rs @@ -54,6 +54,8 @@ enum Command { node_kind: NodeKind, #[arg(short, long, default_value_t=TraversingDepth::Tenant)] depth: TraversingDepth, + #[arg(short, long, default_value=None)] + tenant_id_prefix: Option, #[arg(short, long, default_value_t = String::from("garbage.json"))] output_path: String, }, @@ -209,10 +211,19 @@ async fn main() -> anyhow::Result<()> { Command::FindGarbage { node_kind, depth, + tenant_id_prefix, output_path, } => { let console_config = ConsoleConfig::from_env()?; - find_garbage(bucket_config, console_config, depth, node_kind, output_path).await + find_garbage( + bucket_config, + console_config, + depth, + node_kind, + tenant_id_prefix, + output_path, + ) + .await } Command::PurgeGarbage { input_path, diff --git a/storage_scrubber/src/metadata_stream.rs b/storage_scrubber/src/metadata_stream.rs index efda7c213d..47447d681c 100644 --- a/storage_scrubber/src/metadata_stream.rs +++ b/storage_scrubber/src/metadata_stream.rs @@ -17,9 +17,20 @@ use utils::id::{TenantId, TimelineId}; pub fn stream_tenants<'a>( remote_client: &'a GenericRemoteStorage, target: &'a RootTarget, +) -> impl Stream> + 'a { + stream_tenants_maybe_prefix(remote_client, target, None) +} +/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes +pub fn stream_tenants_maybe_prefix<'a>( + remote_client: &'a GenericRemoteStorage, + target: &'a RootTarget, + tenant_id_prefix: Option, ) -> impl Stream> + 'a { try_stream! { - let tenants_target = target.tenants_root(); + let mut tenants_target = target.tenants_root(); + if let Some(tenant_id_prefix) = tenant_id_prefix { + tenants_target.prefix_in_bucket += &tenant_id_prefix; + } let mut tenants_stream = std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target)); while let Some(chunk) = tenants_stream.next().await {