From b254dce8a1564af3eed5fea02c2511c9b40d435f Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 1 Jul 2025 10:00:27 -0700 Subject: [PATCH] feat(pageserver): report compaction progress (#12401) ## Problem close https://github.com/neondatabase/neon/issues/11528 ## Summary of changes Gives us better observability of compaction progress. - Image creation: num of partition processed / total partition - Gc-compaction: index of the in the queue / total items for a full compaction - Shard ancestor compaction: layers to rewrite / total layers Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline.rs | 18 +++++++++++--- pageserver/src/tenant/timeline/compaction.rs | 26 ++++++++++++++------ 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 08bc6d4a59..aca44718fa 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5308,6 +5308,7 @@ impl Timeline { ctx: &RequestContext, img_range: Range, io_concurrency: IoConcurrency, + progress: Option<(usize, usize)>, ) -> Result { let mut wrote_keys = false; @@ -5384,11 +5385,15 @@ impl Timeline { } } + let progress_report = progress + .map(|(idx, total)| format!("({idx}/{total}) ")) + .unwrap_or_default(); if wrote_keys { // Normal path: we have written some data into the new image layer for this // partition, so flush it to disk. info!( - "produced image layer for rel {}", + "{} produced image layer for rel {}", + progress_report, ImageLayerName { key_range: img_range.clone(), lsn @@ -5398,7 +5403,12 @@ impl Timeline { unfinished_image_layer: image_layer_writer, }) } else { - tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); + tracing::debug!( + "{} no data in range {}-{}", + progress_report, + img_range.start, + img_range.end + ); Ok(ImageLayerCreationOutcome::Empty) } } @@ -5633,7 +5643,8 @@ impl Timeline { } } - for partition in partition_parts.iter() { + let total = partition_parts.len(); + for (idx, partition) in partition_parts.iter().enumerate() { if self.cancel.is_cancelled() { return Err(CreateImageLayersError::Cancelled); } @@ -5718,6 +5729,7 @@ impl Timeline { ctx, img_range.clone(), io_concurrency, + Some((idx, total)), ) .await? } else { diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 13a4f82607..43573c28a2 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -101,7 +101,11 @@ pub enum GcCompactionQueueItem { /// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN) auto: bool, }, - SubCompactionJob(CompactOptions), + SubCompactionJob { + i: usize, + total: usize, + options: CompactOptions, + }, Notify(GcCompactionJobId, Option), } @@ -163,7 +167,7 @@ impl GcCompactionQueueItem { running, job_id: id.0, }), - GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse { + GcCompactionQueueItem::SubCompactionJob { options, .. } => Some(CompactInfoResponse { compact_key_range: options.compact_key_range, compact_lsn_range: options.compact_lsn_range, sub_compaction: options.sub_compaction, @@ -489,7 +493,7 @@ impl GcCompactionQueue { .map(|job| job.compact_lsn_range.end) .max() .unwrap(); - for job in jobs { + for (i, job) in jobs.into_iter().enumerate() { // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions` // until we do further refactors to allow directly call `compact_with_gc`. let mut flags: EnumSet = EnumSet::default(); @@ -507,7 +511,11 @@ impl GcCompactionQueue { compact_lsn_range: Some(job.compact_lsn_range.into()), sub_compaction_max_job_size_mb: None, }; - pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options)); + pending_tasks.push(GcCompactionQueueItem::SubCompactionJob { + options, + i, + total: jobs_len, + }); } if !auto { @@ -651,7 +659,7 @@ impl GcCompactionQueue { } } } - GcCompactionQueueItem::SubCompactionJob(options) => { + GcCompactionQueueItem::SubCompactionJob { options, i, total } => { // TODO: error handling, clear the queue if any task fails? let _gc_guard = match gc_block.start().await { Ok(guard) => guard, @@ -663,6 +671,7 @@ impl GcCompactionQueue { ))); } }; + info!("running gc-compaction subcompaction job {}/{}", i, total); let res = timeline.compact_with_options(cancel, options, ctx).await; let compaction_result = match res { Ok(res) => res, @@ -1591,13 +1600,15 @@ impl Timeline { let started = Instant::now(); let mut replace_image_layers = Vec::new(); + let total = layers_to_rewrite.len(); - for layer in layers_to_rewrite { + for (i, layer) in layers_to_rewrite.into_iter().enumerate() { if self.cancel.is_cancelled() { return Err(CompactionError::ShuttingDown); } - info!(layer=%layer, "rewriting layer after shard split"); + info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total); + let mut image_layer_writer = ImageLayerWriter::new( self.conf, self.timeline_id, @@ -4343,6 +4354,7 @@ impl TimelineAdaptor { ctx, key_range.clone(), IoConcurrency::sequential(), + None, ) .await?;