diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 794e696769..2c9e298f79 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -144,6 +144,7 @@ impl RemotePath {
 ///
 /// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
 /// NoDelimiter mode will only populate `keys`.
+#[derive(Copy, Clone)]
 pub enum ListingMode {
     WithDelimiter,
     NoDelimiter,
diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs
index 73479c3658..d6a73bf366 100644
--- a/storage_scrubber/src/garbage.rs
+++ b/storage_scrubber/src/garbage.rs
@@ -19,8 +19,8 @@ use utils::id::TenantId;
 use crate::{
     cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
-    init_remote, init_remote_generic, list_objects_with_retries,
-    metadata_stream::{stream_tenant_timelines, stream_tenants},
+    init_remote_generic, list_objects_with_retries_generic,
+    metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic},
     BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
 };
@@ -153,7 +153,7 @@ async fn find_garbage_inner(
     node_kind: NodeKind,
 ) -> anyhow::Result<GarbageList> {
     // Construct clients for S3 and for Console API
-    let (s3_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
+    let (remote_client, target) = init_remote_generic(bucket_config.clone(), node_kind).await?;
     let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
 
     // Build a set of console-known tenants, for quickly eliminating known-active tenants without having
@@ -179,7 +179,7 @@ async fn find_garbage_inner(
 
     // Enumerate Tenants in S3, and check if each one exists in Console
     tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
-    let tenants = stream_tenants(&s3_client, &target);
+    let tenants = stream_tenants_generic(&remote_client, &target);
     let tenants_checked = tenants.map_ok(|t| {
         let api_client = cloud_admin_api_client.clone();
         let console_cache = console_cache.clone();
@@ -237,25 +237,26 @@ async fn find_garbage_inner(
             // Special case: If it's missing in console, check for known bugs that would enable us to conclusively
             // identify it as purge-able anyway
             if console_result.is_none() {
-                let timelines = stream_tenant_timelines(&s3_client, &target, tenant_shard_id)
-                    .await?
-                    .collect::<Vec<_>>()
-                    .await;
+                let timelines =
+                    stream_tenant_timelines_generic(&remote_client, &target, tenant_shard_id)
+                        .await?
+                        .collect::<Vec<_>>()
+                        .await;
                 if timelines.is_empty() {
                     // No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
-                    let tenant_objects = list_objects_with_retries(
-                        &s3_client,
+                    let tenant_objects = list_objects_with_retries_generic(
+                        &remote_client,
+                        ListingMode::WithDelimiter,
                         &target.tenant_root(&tenant_shard_id),
-                        None,
                     )
                     .await?;
-                    let object = tenant_objects.contents.as_ref().unwrap().first().unwrap();
-                    if object.key.as_ref().unwrap().ends_with("heatmap-v1.json") {
+                    let object = tenant_objects.keys.first().unwrap();
+                    if object.key.get_path().as_str().ends_with("heatmap-v1.json") {
                         tracing::info!("Tenant {tenant_shard_id}: is missing in console and is only a heatmap (known historic deletion bug)");
                         garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
                         continue;
                     } else {
-                        tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key.as_ref().unwrap());
+                        tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key);
                     }
                 } else {
                     // A console-unknown tenant with timelines: check if these timelines only contain initdb.tar.zst, from the initial
@@ -264,24 +265,18 @@ async fn find_garbage_inner(
                     for timeline_r in timelines {
                         let timeline = timeline_r?;
-                        let timeline_objects = list_objects_with_retries(
-                            &s3_client,
+                        let timeline_objects = list_objects_with_retries_generic(
+                            &remote_client,
+                            ListingMode::WithDelimiter,
                             &target.timeline_root(&timeline),
-                            None,
                         )
                         .await?;
-                        if timeline_objects
-                            .common_prefixes
-                            .as_ref()
-                            .map(|v| v.len())
-                            .unwrap_or(0)
-                            > 0
-                        {
+                        if !timeline_objects.prefixes.is_empty() {
                             // Sub-paths? Unexpected
                             any_non_initdb = true;
                         } else {
-                            let object = timeline_objects.contents.as_ref().unwrap().first().unwrap();
-                            if object.key.as_ref().unwrap().ends_with("initdb.tar.zst") {
+                            let object = timeline_objects.keys.first().unwrap();
+                            if object.key.get_path().as_str().ends_with("initdb.tar.zst") {
                                 tracing::info!("Timeline {timeline} contains only initdb.tar.zst");
                             } else {
                                 any_non_initdb = true;
@@ -336,7 +331,8 @@ async fn find_garbage_inner(
     // Construct a stream of all timelines within active tenants
     let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
-    let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, *t));
+    let timelines =
+        active_tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, *t));
     let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
     let timelines = timelines.try_flatten();
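
The `#[derive(Copy, Clone)]` on `ListingMode` lets callers pass the mode by value into helpers that reuse it on every retry iteration. The substance of the `garbage.rs` change is the move from the raw S3 `ListObjectsV2Output` (where `contents` and `common_prefixes` are both `Option<Vec<_>>`) to `remote_storage`'s `Listing` (plain `Vec`s), which is why the `as_ref().unwrap()` chains disappear. A minimal sketch of that difference, using simplified stand-in types rather than the real `aws_sdk_s3`/`remote_storage` definitions (in particular, the real key type is a `RemotePath`, not a `String`):

```rust
// Stand-in types sketching the two listing shapes; the real definitions
// carry more fields (last_modified, size, ...) and use RemotePath keys.
struct S3Object {
    key: Option<String>,
}

struct ListObjectsV2Output {
    contents: Option<Vec<S3Object>>,
    common_prefixes: Option<Vec<String>>,
}

struct ListingObject {
    key: String,
}

struct Listing {
    keys: Vec<ListingObject>,
    prefixes: Vec<String>,
}

// Before: both fields tunnel through Option, so every call site unwraps twice.
fn first_key_sdk(out: &ListObjectsV2Output) -> Option<&str> {
    out.contents.as_ref()?.first()?.key.as_deref()
}

// After: an empty listing is just an empty Vec, so a single Option remains.
fn first_key_generic(listing: &Listing) -> Option<&str> {
    listing.keys.first().map(|o| o.key.as_str())
}

fn main() {
    let listing = Listing {
        keys: vec![ListingObject {
            key: "tenant/heatmap-v1.json".to_owned(),
        }],
        prefixes: Vec::new(),
    };
    assert_eq!(first_key_generic(&listing), Some("tenant/heatmap-v1.json"));

    // With the SDK shape, "no contents field" and "empty contents" are distinct.
    let sdk_output = ListObjectsV2Output {
        contents: None,
        common_prefixes: None,
    };
    assert_eq!(first_key_sdk(&sdk_output), None);
}
```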
diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs
index e0f154def3..152319b731 100644
--- a/storage_scrubber/src/lib.rs
+++ b/storage_scrubber/src/lib.rs
@@ -427,6 +427,7 @@ async fn list_objects_with_retries(
     Err(anyhow!("unreachable unless MAX_RETRIES==0"))
 }
 
+/// Lists possibly large amounts of keys in a streaming fashion.
 fn stream_objects_with_retries<'a>(
     storage_client: &'a GenericRemoteStorage,
     listing_mode: ListingMode,
@@ -465,6 +466,45 @@ fn stream_objects_with_retries<'a>(
     }
 }
 
+/// Lists a bounded number of prefixes or keys. For larger numbers of keys/prefixes,
+/// use [`stream_objects_with_retries`] instead.
+async fn list_objects_with_retries_generic(
+    remote_client: &GenericRemoteStorage,
+    listing_mode: ListingMode,
+    s3_target: &S3Target,
+) -> anyhow::Result<Listing> {
+    let cancel = CancellationToken::new();
+    let prefix_str = &s3_target
+        .prefix_in_bucket
+        .strip_prefix("/")
+        .unwrap_or(&s3_target.prefix_in_bucket);
+    let prefix = RemotePath::from_string(prefix_str)?;
+    for trial in 0..MAX_RETRIES {
+        match remote_client
+            .list(Some(&prefix), listing_mode, None, &cancel)
+            .await
+        {
+            Ok(response) => return Ok(response),
+            Err(e) => {
+                if trial == MAX_RETRIES - 1 {
+                    return Err(e)
+                        .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
+                }
+                error!(
+                    "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
+                    s3_target.bucket_name,
+                    s3_target.prefix_in_bucket,
+                    s3_target.delimiter,
+                    DisplayErrorContext(e),
+                );
+                let backoff_time = 1 << trial.min(5);
+                tokio::time::sleep(Duration::from_secs(backoff_time)).await;
+            }
+        }
+    }
+    panic!("MAX_RETRIES is not allowed to be 0");
+}
+
 async fn download_object_with_retries(
     s3_client: &Client,
     bucket_name: &str,
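
The new `list_objects_with_retries_generic` follows the usual bounded-retry shape: return on the first success, attach context to the error on the final attempt, and otherwise sleep with an exponential backoff whose exponent is capped (`1 << trial.min(5)`, i.e. at most 32 seconds). A self-contained sketch of the same pattern, with a hypothetical fallible `list_once` standing in for `GenericRemoteStorage::list` and assuming `tokio` and `anyhow` as dependencies:

```rust
use std::time::Duration;

const MAX_RETRIES: u32 = 8;

/// Hypothetical operation standing in for `GenericRemoteStorage::list`;
/// simulates transient failures on the first two attempts.
async fn list_once(attempt: u32) -> anyhow::Result<Vec<String>> {
    if attempt < 2 {
        anyhow::bail!("transient listing error");
    }
    Ok(vec!["timelines/".to_owned()])
}

async fn list_with_retries() -> anyhow::Result<Vec<String>> {
    for trial in 0..MAX_RETRIES {
        match list_once(trial).await {
            Ok(listing) => return Ok(listing),
            Err(e) if trial == MAX_RETRIES - 1 => {
                // Last attempt: surface the error with context instead of sleeping.
                return Err(e.context(format!("failed to list objects {MAX_RETRIES} times")));
            }
            Err(_) => {
                // Exponential backoff: 1, 2, 4, 8, 16, then capped at 32 seconds.
                let backoff_secs = 1u64 << trial.min(5);
                tokio::time::sleep(Duration::from_secs(backoff_secs)).await;
            }
        }
    }
    unreachable!("MAX_RETRIES is not allowed to be 0");
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let listing = list_with_retries().await?;
    println!("listed {} entries", listing.len());
    Ok(())
}
```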
diff --git a/storage_scrubber/src/metadata_stream.rs b/storage_scrubber/src/metadata_stream.rs
index 91dba3c992..c702c0c312 100644
--- a/storage_scrubber/src/metadata_stream.rs
+++ b/storage_scrubber/src/metadata_stream.rs
@@ -189,6 +189,63 @@ pub async fn stream_tenant_timelines<'a>(
     })
 }
 
+/// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
+/// using a listing. The listing is done before the stream is built, so that this
+/// function can be used to generate concurrency on a stream using `buffer_unordered`.
+pub async fn stream_tenant_timelines_generic<'a>(
+    remote_client: &'a GenericRemoteStorage,
+    target: &'a RootTarget,
+    tenant: TenantShardId,
+) -> anyhow::Result<impl Stream<Item = anyhow::Result<TenantShardTimelineId>> + 'a> {
+    let mut timeline_ids: Vec<anyhow::Result<TimelineId>> = Vec::new();
+    let timelines_target = target.timelines_root(&tenant);
+
+    let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
+        remote_client,
+        ListingMode::WithDelimiter,
+        &timelines_target
+    ));
+    loop {
+        tracing::debug!("Listing in {tenant}");
+        let fetch_response = match objects_stream.next().await {
+            None => break,
+            Some(Err(e)) => {
+                timeline_ids.push(Err(e));
+                break;
+            }
+            Some(Ok(r)) => r,
+        };
+
+        let new_entry_ids = fetch_response
+            .prefixes
+            .iter()
+            .filter_map(|prefix| -> Option<&str> {
+                prefix
+                    .get_path()
+                    .as_str()
+                    .strip_prefix(&timelines_target.prefix_in_bucket)?
+                    .strip_suffix('/')
+            })
+            .map(|entry_id_str| {
+                entry_id_str
+                    .parse::<TimelineId>()
+                    .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
+            });
+
+        for i in new_entry_ids {
+            timeline_ids.push(i);
+        }
+    }
+
+    tracing::debug!("Yielding for {}", tenant);
+    Ok(stream! {
+        for i in timeline_ids {
+            let id = i?;
+            yield Ok(TenantShardTimelineId::new(tenant, id));
+        }
+    })
+}
+
 pub(crate) fn stream_listing<'a>(
     s3_client: &'a Client,
     target: &'a S3Target,
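
The `filter_map`/`parse` pipeline in `stream_tenant_timelines_generic` is the part that turns delimiter-listing prefixes back into typed ids: strip the timelines-root prefix, strip the trailing slash, and parse what remains. A self-contained sketch of that pipeline, using hypothetical bucket paths and a simplified stand-in for `utils::id::TimelineId` (the real type is a 16-byte id with its own `FromStr`):

```rust
use anyhow::Context;

/// Simplified stand-in for `utils::id::TimelineId`: 16 bytes, hex-encoded.
#[derive(Debug)]
struct TimelineId([u8; 16]);

impl std::str::FromStr for TimelineId {
    type Err = anyhow::Error;
    fn from_str(s: &str) -> anyhow::Result<Self> {
        anyhow::ensure!(s.len() == 32, "expected 32 hex chars, got {}", s.len());
        let mut bytes = [0u8; 16];
        for (i, chunk) in s.as_bytes().chunks(2).enumerate() {
            bytes[i] = u8::from_str_radix(std::str::from_utf8(chunk)?, 16)?;
        }
        Ok(TimelineId(bytes))
    }
}

fn main() -> anyhow::Result<()> {
    // What a WithDelimiter listing of a timelines root might return (hypothetical paths).
    let prefix_in_bucket = "pageserver/v1/tenants/1234/timelines/";
    let listing_prefixes = [
        "pageserver/v1/tenants/1234/timelines/4c6f72656d20697073756d20646f6c6f/",
        "pageserver/v1/tenants/1234/timelines/73697420616d65742c20636f6e736563/",
    ];

    // Same shape as the diff's pipeline: strip the root prefix and the
    // trailing slash, then parse the remaining component as a timeline id.
    let timeline_ids = listing_prefixes
        .iter()
        .filter_map(|prefix| prefix.strip_prefix(prefix_in_bucket)?.strip_suffix('/'))
        .map(|entry_id_str| {
            entry_id_str
                .parse::<TimelineId>()
                .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    println!("parsed {} timeline ids", timeline_ids.len());
    Ok(())
}
```

Note the design choice mentioned in the doc comment: the listing is drained into a `Vec` before the stream is constructed, so the returned stream does no I/O of its own and can be safely fanned out with `try_buffer_unordered`.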