diff --git a/s3_scrubber/src/cloud_admin_api.rs b/s3_scrubber/src/cloud_admin_api.rs
index 45cac23690..66ca2f7180 100644
--- a/s3_scrubber/src/cloud_admin_api.rs
+++ b/s3_scrubber/src/cloud_admin_api.rs
@@ -137,7 +137,7 @@ pub struct ProjectData {
     pub region_id: String,
     pub platform_id: String,
     pub user_id: String,
-    pub pageserver_id: u64,
+    pub pageserver_id: Option<u64>,
     #[serde(deserialize_with = "from_nullable_id")]
     pub tenant: TenantId,
     pub safekeepers: Vec<SafekeeperData>,
diff --git a/s3_scrubber/src/garbage.rs b/s3_scrubber/src/garbage.rs
index 7a08dffc66..de3b16b49b 100644
--- a/s3_scrubber/src/garbage.rs
+++ b/s3_scrubber/src/garbage.rs
@@ -60,6 +60,7 @@ pub struct GarbageList {
     /// see garbage, we saw some active tenants too. This protects against classes of bugs
     /// in the scrubber that might otherwise generate a "deleted all" result.
     active_tenant_count: usize,
+    active_timeline_count: usize,
 }
 
 impl GarbageList {
@@ -67,6 +68,7 @@ impl GarbageList {
         Self {
             items: Vec::new(),
             active_tenant_count: 0,
+            active_timeline_count: 0,
             node_kind,
             bucket_config,
         }
@@ -221,6 +223,7 @@ async fn find_garbage_inner(
         } else {
             tracing::debug!("Tenant {tenant_shard_id} is active");
             active_tenants.push(tenant_shard_id);
+            garbage.active_tenant_count = active_tenants.len();
         }
 
         counter += 1;
@@ -271,15 +274,29 @@ async fn find_garbage_inner(
         std::pin::pin!(timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));
 
     // Update the GarbageList with any timelines which appear not to exist.
+    let mut active_timelines: Vec<TenantShardTimelineId> = vec![];
     while let Some(result) = timelines_checked.next().await {
         let (ttid, console_result) = result?;
         if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) {
             tracing::debug!("Timeline {ttid} is garbage");
         } else {
             tracing::debug!("Timeline {ttid} is active");
+            active_timelines.push(ttid);
+            garbage.active_timeline_count = active_timelines.len();
         }
     }
 
+    let num_garbage_timelines = garbage
+        .items
+        .iter()
+        .filter(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
+        .count();
+    tracing::info!(
+        "Found {}/{} garbage timelines in active tenants",
+        num_garbage_timelines,
+        active_timelines.len(),
+    );
+
     Ok(garbage)
 }
 
@@ -344,16 +361,22 @@ pub async fn get_timeline_objects(
 const MAX_KEYS_PER_DELETE: usize = 1000;
 
 /// Drain a buffer of keys into DeleteObjects requests
+///
+/// If `drain` is true, drains keys completely; otherwise stops when fewer
+/// than MAX_KEYS_PER_DELETE keys are left.
+/// `progress_tracker` is updated with the number of keys deleted.
 async fn do_delete(
     s3_client: &Arc<Client>,
     bucket_name: &str,
     keys: &mut Vec<ObjectIdentifier>,
     dry_run: bool,
     drain: bool,
+    progress_tracker: &mut DeletionProgressTracker,
 ) -> anyhow::Result<()> {
     while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
         let request_keys =
             keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
+        let num_deleted = request_keys.len();
         if dry_run {
             tracing::info!("Dry-run deletion of objects: ");
             for k in request_keys {
@@ -368,12 +391,30 @@ async fn do_delete(
                 .send()
                 .await
                 .context("DeleteObjects request")?;
+            progress_tracker.register(num_deleted);
         }
     }
 
     Ok(())
 }
 
+/// Simple tracker reporting progress every 10k deleted keys.
+#[derive(Default)]
+struct DeletionProgressTracker {
+    num_deleted: usize,
+    last_reported_num_deleted: usize,
+}
+
+impl DeletionProgressTracker {
+    fn register(&mut self, n: usize) {
+        self.num_deleted += n;
+        if self.num_deleted - self.last_reported_num_deleted > 10000 {
+            tracing::info!("progress: deleted {} keys", self.num_deleted);
+            self.last_reported_num_deleted = self.num_deleted;
+        }
+    }
+}
+
 pub async fn purge_garbage(
     input_path: String,
     mode: PurgeMode,
@@ -394,6 +435,14 @@ pub async fn purge_garbage(
     if garbage_list.active_tenant_count == 0 {
         anyhow::bail!("Refusing to purge a garbage list that reports 0 active tenants");
     }
+    if garbage_list
+        .items
+        .iter()
+        .any(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
+        && garbage_list.active_timeline_count == 0
+    {
+        anyhow::bail!("Refusing to purge a garbage list containing garbage timelines that reports 0 active timelines");
+    }
 
     let filtered_items = garbage_list
         .items
@@ -429,6 +478,7 @@ pub async fn purge_garbage(
         std::pin::pin!(get_objects_results.try_buffer_unordered(S3_CONCURRENCY));
 
     let mut objects_to_delete = Vec::new();
+    let mut progress_tracker = DeletionProgressTracker::default();
     while let Some(result) = get_objects_results.next().await {
         let mut object_list = result?;
         objects_to_delete.append(&mut object_list);
@@ -439,6 +489,7 @@ pub async fn purge_garbage(
                 &mut objects_to_delete,
                 dry_run,
                 false,
+                &mut progress_tracker,
             )
             .await?;
         }
@@ -450,10 +501,11 @@ pub async fn purge_garbage(
         &mut objects_to_delete,
         dry_run,
         true,
+        &mut progress_tracker,
     )
     .await?;
 
-    tracing::info!("Fell through");
+    tracing::info!("{} keys deleted in total", progress_tracker.num_deleted);
 
     Ok(())
 }
diff --git a/s3_scrubber/src/lib.rs b/s3_scrubber/src/lib.rs
index 43be258150..78ad9d0da7 100644
--- a/s3_scrubber/src/lib.rs
+++ b/s3_scrubber/src/lib.rs
@@ -343,7 +343,7 @@ fn init_remote(
             }),
             NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
                 bucket_name: bucket_config.bucket,
-                prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or("wal".to_string()),
+                prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or("wal/".to_string()),
                 delimiter,
             }),
         };
diff --git a/s3_scrubber/src/metadata_stream.rs b/s3_scrubber/src/metadata_stream.rs
index b192e0be2e..c05874f556 100644
--- a/s3_scrubber/src/metadata_stream.rs
+++ b/s3_scrubber/src/metadata_stream.rs
@@ -114,7 +114,7 @@ pub async fn stream_tenant_timelines<'a>(
     let timelines_target = target.timelines_root(&tenant);
 
     loop {
-        tracing::info!("Listing in {}", tenant);
+        tracing::debug!("Listing in {}", tenant);
         let fetch_response =
             list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
                 .await;
@@ -151,7 +151,7 @@ pub async fn stream_tenant_timelines<'a>(
         }
     }
 
-    tracing::info!("Yielding for {}", tenant);
+    tracing::debug!("Yielding for {}", tenant);
     Ok(stream! {
         for i in timeline_ids {
             let id = i?;
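
For illustration only (not part of the patch): a minimal standalone sketch of the throttling behavior the new DeletionProgressTracker adds, with println! standing in for tracing::info! so it runs with no dependencies. The batch size of 1000 mirrors MAX_KEYS_PER_DELETE; the 30-batch loop is an arbitrary choice for the demo.

// Sketch only: mirrors the tracker added in garbage.rs, with println! in
// place of tracing::info! so it compiles as a plain binary.
#[derive(Default)]
struct DeletionProgressTracker {
    num_deleted: usize,
    last_reported_num_deleted: usize,
}

impl DeletionProgressTracker {
    fn register(&mut self, n: usize) {
        self.num_deleted += n;
        // Report only once 10k+ keys have been deleted since the last report,
        // so a multi-million-key purge logs a bounded number of progress lines.
        if self.num_deleted - self.last_reported_num_deleted > 10000 {
            println!("progress: deleted {} keys", self.num_deleted);
            self.last_reported_num_deleted = self.num_deleted;
        }
    }
}

fn main() {
    let mut tracker = DeletionProgressTracker::default();
    // Simulate 30 DeleteObjects batches of MAX_KEYS_PER_DELETE (1000) keys each;
    // the tracker prints roughly once per ten batches rather than once per batch.
    for _ in 0..30 {
        tracker.register(1000);
    }
    println!("{} keys deleted in total", tracker.num_deleted);
}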