mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
feat(pageserver): report compaction progress (#12401)
## Problem close https://github.com/neondatabase/neon/issues/11528 ## Summary of changes Gives us better observability of compaction progress. - Image creation: num of partition processed / total partition - Gc-compaction: index of the in the queue / total items for a full compaction - Shard ancestor compaction: layers to rewrite / total layers Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -5308,6 +5308,7 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
img_range: Range<Key>,
|
||||
io_concurrency: IoConcurrency,
|
||||
progress: Option<(usize, usize)>,
|
||||
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
|
||||
let mut wrote_keys = false;
|
||||
|
||||
@@ -5384,11 +5385,15 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let progress_report = progress
|
||||
.map(|(idx, total)| format!("({idx}/{total}) "))
|
||||
.unwrap_or_default();
|
||||
if wrote_keys {
|
||||
// Normal path: we have written some data into the new image layer for this
|
||||
// partition, so flush it to disk.
|
||||
info!(
|
||||
"produced image layer for rel {}",
|
||||
"{} produced image layer for rel {}",
|
||||
progress_report,
|
||||
ImageLayerName {
|
||||
key_range: img_range.clone(),
|
||||
lsn
|
||||
@@ -5398,7 +5403,12 @@ impl Timeline {
|
||||
unfinished_image_layer: image_layer_writer,
|
||||
})
|
||||
} else {
|
||||
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
|
||||
tracing::debug!(
|
||||
"{} no data in range {}-{}",
|
||||
progress_report,
|
||||
img_range.start,
|
||||
img_range.end
|
||||
);
|
||||
Ok(ImageLayerCreationOutcome::Empty)
|
||||
}
|
||||
}
|
||||
@@ -5633,7 +5643,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
for partition in partition_parts.iter() {
|
||||
let total = partition_parts.len();
|
||||
for (idx, partition) in partition_parts.iter().enumerate() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
@@ -5718,6 +5729,7 @@ impl Timeline {
|
||||
ctx,
|
||||
img_range.clone(),
|
||||
io_concurrency,
|
||||
Some((idx, total)),
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
|
||||
@@ -101,7 +101,11 @@ pub enum GcCompactionQueueItem {
|
||||
/// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
|
||||
auto: bool,
|
||||
},
|
||||
SubCompactionJob(CompactOptions),
|
||||
SubCompactionJob {
|
||||
i: usize,
|
||||
total: usize,
|
||||
options: CompactOptions,
|
||||
},
|
||||
Notify(GcCompactionJobId, Option<Lsn>),
|
||||
}
|
||||
|
||||
@@ -163,7 +167,7 @@ impl GcCompactionQueueItem {
|
||||
running,
|
||||
job_id: id.0,
|
||||
}),
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
|
||||
GcCompactionQueueItem::SubCompactionJob { options, .. } => Some(CompactInfoResponse {
|
||||
compact_key_range: options.compact_key_range,
|
||||
compact_lsn_range: options.compact_lsn_range,
|
||||
sub_compaction: options.sub_compaction,
|
||||
@@ -489,7 +493,7 @@ impl GcCompactionQueue {
|
||||
.map(|job| job.compact_lsn_range.end)
|
||||
.max()
|
||||
.unwrap();
|
||||
for job in jobs {
|
||||
for (i, job) in jobs.into_iter().enumerate() {
|
||||
// Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
|
||||
// until we do further refactors to allow directly call `compact_with_gc`.
|
||||
let mut flags: EnumSet<CompactFlags> = EnumSet::default();
|
||||
@@ -507,7 +511,11 @@ impl GcCompactionQueue {
|
||||
compact_lsn_range: Some(job.compact_lsn_range.into()),
|
||||
sub_compaction_max_job_size_mb: None,
|
||||
};
|
||||
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
|
||||
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob {
|
||||
options,
|
||||
i,
|
||||
total: jobs_len,
|
||||
});
|
||||
}
|
||||
|
||||
if !auto {
|
||||
@@ -651,7 +659,7 @@ impl GcCompactionQueue {
|
||||
}
|
||||
}
|
||||
}
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => {
|
||||
GcCompactionQueueItem::SubCompactionJob { options, i, total } => {
|
||||
// TODO: error handling, clear the queue if any task fails?
|
||||
let _gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
@@ -663,6 +671,7 @@ impl GcCompactionQueue {
|
||||
)));
|
||||
}
|
||||
};
|
||||
info!("running gc-compaction subcompaction job {}/{}", i, total);
|
||||
let res = timeline.compact_with_options(cancel, options, ctx).await;
|
||||
let compaction_result = match res {
|
||||
Ok(res) => res,
|
||||
@@ -1591,13 +1600,15 @@ impl Timeline {
|
||||
let started = Instant::now();
|
||||
|
||||
let mut replace_image_layers = Vec::new();
|
||||
let total = layers_to_rewrite.len();
|
||||
|
||||
for layer in layers_to_rewrite {
|
||||
for (i, layer) in layers_to_rewrite.into_iter().enumerate() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
|
||||
info!(layer=%layer, "rewriting layer after shard split");
|
||||
info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total);
|
||||
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
@@ -4343,6 +4354,7 @@ impl TimelineAdaptor {
|
||||
ctx,
|
||||
key_range.clone(),
|
||||
IoConcurrency::sequential(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user