mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
fix(pageserver): give L0 compaction priorities over image layer creation (#8443)
close https://github.com/neondatabase/neon/issues/8435 ## Summary of changes If L0 compaction did not include all L0 layers, skip image generation. There are multiple possible solutions to the original issue, i.e., an alternative is to wrap the partial L0 compaction in a loop until it compacts all L0 layers. However, considering that we should weight all tenants equally, the current solution can ensure everyone gets a chance to run compaction, and those who write too much won't get a chance to create image layers. This creates a natural backpressure feedback that they get a slower read due to no image layers are created, slowing down their writes, and eventually compaction could keep up with their writes + generate image layers. Consider deployment, we should add an alert on "skipping image layer generation", so that we won't run into the case that image layers are not generated => incidents again. --------- Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -1616,21 +1616,23 @@ impl Tenant {
|
||||
/// This function is periodically called by compactor task.
|
||||
/// Also it can be explicitly requested per timeline through page server
|
||||
/// api's 'compact' command.
|
||||
///
|
||||
/// Returns whether we have pending compaction task.
|
||||
async fn compaction_iteration(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), timeline::CompactionError> {
|
||||
) -> Result<bool, timeline::CompactionError> {
|
||||
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
|
||||
if !self.is_active() {
|
||||
return Ok(());
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
{
|
||||
let conf = self.tenant_conf.load();
|
||||
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
|
||||
info!("Skipping compaction in location state {:?}", conf.location);
|
||||
return Ok(());
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1657,11 +1659,13 @@ impl Tenant {
|
||||
// Before doing any I/O work, check our circuit breaker
|
||||
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
|
||||
info!("Skipping compaction due to previous failures");
|
||||
return Ok(());
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let mut has_pending_task = false;
|
||||
|
||||
for (timeline_id, timeline) in &timelines_to_compact {
|
||||
timeline
|
||||
has_pending_task |= timeline
|
||||
.compact(cancel, EnumSet::empty(), ctx)
|
||||
.instrument(info_span!("compact_timeline", %timeline_id))
|
||||
.await
|
||||
@@ -1681,7 +1685,7 @@ impl Tenant {
|
||||
.unwrap()
|
||||
.success(&CIRCUIT_BREAKERS_UNBROKEN);
|
||||
|
||||
Ok(())
|
||||
Ok(has_pending_task)
|
||||
}
|
||||
|
||||
// Call through to all timelines to freeze ephemeral layers if needed. Usually
|
||||
|
||||
@@ -210,24 +210,28 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
Duration::from_secs(10)
|
||||
} else {
|
||||
// Run compaction
|
||||
if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
|
||||
let wait_duration = backoff::exponential_backoff_duration_seconds(
|
||||
error_run_count + 1,
|
||||
1.0,
|
||||
MAX_BACKOFF_SECS,
|
||||
);
|
||||
error_run_count += 1;
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
log_compaction_error(
|
||||
&e,
|
||||
error_run_count,
|
||||
&wait_duration,
|
||||
cancel.is_cancelled(),
|
||||
);
|
||||
wait_duration
|
||||
} else {
|
||||
error_run_count = 0;
|
||||
period
|
||||
match tenant.compaction_iteration(&cancel, &ctx).await {
|
||||
Err(e) => {
|
||||
let wait_duration = backoff::exponential_backoff_duration_seconds(
|
||||
error_run_count + 1,
|
||||
1.0,
|
||||
MAX_BACKOFF_SECS,
|
||||
);
|
||||
error_run_count += 1;
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
log_compaction_error(
|
||||
&e,
|
||||
error_run_count,
|
||||
&wait_duration,
|
||||
cancel.is_cancelled(),
|
||||
);
|
||||
wait_duration
|
||||
}
|
||||
Ok(has_pending_task) => {
|
||||
error_run_count = 0;
|
||||
// schedule the next compaction immediately in case there is a pending compaction task
|
||||
if has_pending_task { Duration::from_secs(0) } else { period }
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -1769,13 +1769,14 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers.
|
||||
/// Outermost timeline compaction operation; downloads needed layers. Returns whether we have pending
|
||||
/// compaction tasks.
|
||||
pub(crate) async fn compact(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), CompactionError> {
|
||||
) -> Result<bool, CompactionError> {
|
||||
// most likely the cancellation token is from background task, but in tests it could be the
|
||||
// request task as well.
|
||||
|
||||
@@ -1795,8 +1796,8 @@ impl Timeline {
|
||||
// compaction task goes over it's period (20s) which is quite often in production.
|
||||
let (_guard, _permit) = tokio::select! {
|
||||
tuple = prepare => { tuple },
|
||||
_ = self.cancel.cancelled() => return Ok(()),
|
||||
_ = cancel.cancelled() => return Ok(()),
|
||||
_ = self.cancel.cancelled() => return Ok(false),
|
||||
_ = cancel.cancelled() => return Ok(false),
|
||||
};
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
@@ -1804,11 +1805,14 @@ impl Timeline {
|
||||
// Last record Lsn could be zero in case the timeline was just created
|
||||
if !last_record_lsn.is_valid() {
|
||||
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
|
||||
return Ok(());
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
match self.get_compaction_algorithm_settings().kind {
|
||||
CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await,
|
||||
CompactionAlgorithm::Tiered => {
|
||||
self.compact_tiered(cancel, ctx).await?;
|
||||
Ok(false)
|
||||
}
|
||||
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,17 +102,19 @@ impl KeyHistoryRetention {
|
||||
|
||||
impl Timeline {
|
||||
/// TODO: cancellation
|
||||
///
|
||||
/// Returns whether the compaction has pending tasks.
|
||||
pub(crate) async fn compact_legacy(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), CompactionError> {
|
||||
) -> Result<bool, CompactionError> {
|
||||
if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
|
||||
return self
|
||||
.compact_with_gc(cancel, ctx)
|
||||
self.compact_with_gc(cancel, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other);
|
||||
.map_err(CompactionError::Other)?;
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// High level strategy for compaction / image creation:
|
||||
@@ -160,7 +162,7 @@ impl Timeline {
|
||||
// Define partitioning schema if needed
|
||||
|
||||
// FIXME: the match should only cover repartitioning, not the next steps
|
||||
let partition_count = match self
|
||||
let (partition_count, has_pending_tasks) = match self
|
||||
.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
@@ -177,30 +179,35 @@ impl Timeline {
|
||||
|
||||
// 2. Compact
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
self.compact_level0(target_file_size, ctx).await?;
|
||||
let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
|
||||
timer.stop_and_record();
|
||||
|
||||
// 3. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
.parts
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
let image_layers = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
if flags.contains(CompactFlags::ForceImageLayerCreation) {
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
&image_ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
partitioning.parts.len()
|
||||
// 3. Create new image layers for partitions that have been modified
|
||||
// "enough". Skip image layer creation if L0 compaction cannot keep up.
|
||||
if fully_compacted {
|
||||
let image_layers = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
if flags.contains(CompactFlags::ForceImageLayerCreation) {
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
&image_ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
} else {
|
||||
info!("skipping image layer generation due to L0 compaction did not include all layers.");
|
||||
}
|
||||
(partitioning.parts.len(), !fully_compacted)
|
||||
}
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
@@ -212,7 +219,7 @@ impl Timeline {
|
||||
if !self.cancel.is_cancelled() {
|
||||
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
}
|
||||
1
|
||||
(1, false)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -225,7 +232,7 @@ impl Timeline {
|
||||
self.compact_shard_ancestors(rewrite_max, ctx).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(has_pending_tasks)
|
||||
}
|
||||
|
||||
/// Check for layers that are elegible to be rewritten:
|
||||
@@ -432,15 +439,16 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
|
||||
/// as Level 1 files.
|
||||
/// as Level 1 files. Returns whether the L0 layers are fully compacted.
|
||||
async fn compact_level0(
|
||||
self: &Arc<Self>,
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), CompactionError> {
|
||||
) -> Result<bool, CompactionError> {
|
||||
let CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact,
|
||||
fully_compacted,
|
||||
} = {
|
||||
let phase1_span = info_span!("compact_level0_phase1");
|
||||
let ctx = ctx.attached_child();
|
||||
@@ -463,12 +471,12 @@ impl Timeline {
|
||||
|
||||
if new_layers.is_empty() && deltas_to_compact.is_empty() {
|
||||
// nothing to do
|
||||
return Ok(());
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
|
||||
.await?;
|
||||
Ok(())
|
||||
Ok(fully_compacted)
|
||||
}
|
||||
|
||||
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
|
||||
@@ -535,6 +543,8 @@ impl Timeline {
|
||||
) as u64
|
||||
* std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
|
||||
|
||||
let mut fully_compacted = true;
|
||||
|
||||
deltas_to_compact.push(
|
||||
first_level0_delta
|
||||
.download_and_keep_resident()
|
||||
@@ -562,6 +572,7 @@ impl Timeline {
|
||||
"L0 compaction picker hit max delta layer size limit: {}",
|
||||
delta_size_limit
|
||||
);
|
||||
fully_compacted = false;
|
||||
|
||||
// Proceed with compaction, but only a subset of L0s
|
||||
break;
|
||||
@@ -923,6 +934,7 @@ impl Timeline {
|
||||
.into_iter()
|
||||
.map(|x| x.drop_eviction_guard())
|
||||
.collect::<Vec<_>>(),
|
||||
fully_compacted,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -931,6 +943,9 @@ impl Timeline {
|
||||
struct CompactLevel0Phase1Result {
|
||||
new_layers: Vec<ResidentLayer>,
|
||||
deltas_to_compact: Vec<Layer>,
|
||||
// Whether we have included all L0 layers, or selected only part of them due to the
|
||||
// L0 compaction size limit.
|
||||
fully_compacted: bool,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
||||
Reference in New Issue
Block a user