fix(test): only test num of L1 layers in compaction smoke test (#9186)

close https://github.com/neondatabase/neon/issues/9160

For whatever reason, pg17's WAL pattern seems different from others,
which triggers some flaky behavior within the compaction smoke test.

## Summary of changes

* Run L0 compaction before proceeding with the read benchmark.
* So that we can ensure the num of L0 layers is 0 and test the
compaction behavior only with L1 layers.

We have a threshold for triggering L0 compaction. In some cases, the
test case did not produce enough L0 layers to do a L0 compaction,
therefore leaving the layer map with 3+ L0 layers above the L1 layers.
This increases the average read depth for the timeline.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2024-10-02 12:42:35 -04:00
committed by GitHub
parent 38a8dcab9f
commit 700885471f
7 changed files with 63 additions and 11 deletions

View File

@@ -1742,6 +1742,10 @@ async fn timeline_compact_handler(
let state = get_state(&request);
let mut flags = EnumSet::empty();
if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
flags |= CompactFlags::ForceL0Compaction;
}
if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
flags |= CompactFlags::ForceRepartition;
}
@@ -1788,6 +1792,9 @@ async fn timeline_checkpoint_handler(
let state = get_state(&request);
let mut flags = EnumSet::empty();
if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
flags |= CompactFlags::ForceL0Compaction;
}
if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
flags |= CompactFlags::ForceRepartition;
}

View File

@@ -737,6 +737,7 @@ pub enum GetLogicalSizePriority {
pub(crate) enum CompactFlags {
ForceRepartition,
ForceImageLayerCreation,
ForceL0Compaction,
EnhancedGcBottomMostCompaction,
DryRun,
}

View File

@@ -11,6 +11,7 @@ pub(crate) struct RangeAnalysis {
has_image: bool,
num_of_deltas_above_image: usize,
total_num_of_deltas: usize,
num_of_l0: usize,
}
impl Timeline {
@@ -20,8 +21,10 @@ impl Timeline {
let mut delta_ranges = Vec::new();
let mut image_ranges = Vec::new();
let num_of_l0;
let all_layer_files = {
let guard = self.layers.read().await;
num_of_l0 = guard.layer_map().unwrap().level0_deltas().len();
guard.all_persistent_layers()
};
let lsn = self.get_last_record_lsn();
@@ -82,6 +85,7 @@ impl Timeline {
has_image: image_layer.is_some(),
num_of_deltas_above_image: maybe_delta_layers.len(),
total_num_of_deltas: pitr_delta_layers.len(),
num_of_l0,
});
}

View File

@@ -353,7 +353,13 @@ impl Timeline {
// 2. Compact
let timer = self.metrics.compact_time_histo.start_timer();
let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
let fully_compacted = self
.compact_level0(
target_file_size,
flags.contains(CompactFlags::ForceL0Compaction),
ctx,
)
.await?;
timer.stop_and_record();
let mut partitioning = dense_partitioning;
@@ -658,6 +664,7 @@ impl Timeline {
async fn compact_level0(
self: &Arc<Self>,
target_file_size: u64,
force_compaction_ignore_threshold: bool,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
let CompactLevel0Phase1Result {
@@ -679,9 +686,15 @@ impl Timeline {
let now = tokio::time::Instant::now();
stats.read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
.instrument(phase1_span)
.await?
self.compact_level0_phase1(
phase1_layers_locked,
stats,
target_file_size,
force_compaction_ignore_threshold,
&ctx,
)
.instrument(phase1_span)
.await?
};
if new_layers.is_empty() && deltas_to_compact.is_empty() {
@@ -700,6 +713,7 @@ impl Timeline {
guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
force_compaction_ignore_threshold: bool,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
stats.read_lock_held_spawn_blocking_startup_micros =
@@ -711,11 +725,26 @@ impl Timeline {
// Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold();
if level0_deltas.is_empty() || level0_deltas.len() < threshold {
debug!(
level0_deltas = level0_deltas.len(),
threshold, "too few deltas to compact"
);
return Ok(CompactLevel0Phase1Result::default());
if force_compaction_ignore_threshold {
if !level0_deltas.is_empty() {
info!(
level0_deltas = level0_deltas.len(),
threshold, "too few deltas to compact, but forcing compaction"
);
} else {
info!(
level0_deltas = level0_deltas.len(),
threshold, "too few deltas to compact, cannot force compaction"
);
return Ok(CompactLevel0Phase1Result::default());
}
} else {
debug!(
level0_deltas = level0_deltas.len(),
threshold, "too few deltas to compact"
);
return Ok(CompactLevel0Phase1Result::default());
}
}
let mut level0_deltas = level0_deltas

View File

@@ -586,6 +586,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
timeline_id: TimelineId,
force_repartition=False,
force_image_layer_creation=False,
force_l0_compaction=False,
wait_until_uploaded=False,
enhanced_gc_bottom_most_compaction=False,
):
@@ -595,6 +596,8 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
query["force_repartition"] = "true"
if force_image_layer_creation:
query["force_image_layer_creation"] = "true"
if force_l0_compaction:
query["force_l0_compaction"] = "true"
if wait_until_uploaded:
query["wait_until_uploaded"] = "true"
if enhanced_gc_bottom_most_compaction:
@@ -701,6 +704,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
timeline_id: TimelineId,
force_repartition=False,
force_image_layer_creation=False,
force_l0_compaction=False,
wait_until_uploaded=False,
compact: Optional[bool] = None,
**kwargs,
@@ -711,6 +715,8 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
query["force_repartition"] = "true"
if force_image_layer_creation:
query["force_image_layer_creation"] = "true"
if force_l0_compaction:
query["force_l0_compaction"] = "true"
if wait_until_uploaded:
query["wait_until_uploaded"] = "true"

View File

@@ -175,7 +175,9 @@ class Workload:
if upload:
# Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
ps_http.timeline_checkpoint(
tenant_shard_id, self.timeline_id, wait_until_uploaded=True
tenant_shard_id,
self.timeline_id,
wait_until_uploaded=True,
)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:

View File

@@ -63,7 +63,10 @@ page_cache_size=10
log.info(f"Running churn round {i}/{churn_rounds} ...")
workload.churn_rows(row_count, env.pageserver.id)
ps_http.timeline_compact(tenant_id, timeline_id)
# Force L0 compaction to ensure the number of layers is within bounds; we don't want to count L0 layers
# in this benchmark. In other words, this smoke test ensures number of L1 layers are bound.
ps_http.timeline_compact(tenant_id, timeline_id, force_l0_compaction=True)
assert ps_http.perf_info(tenant_id, timeline_id)[0]["num_of_l0"] <= 1
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)