fix(pageserver): handle dup layers during gc-compaction (#10430)

## Problem

If gc-compaction decides to rewrite an image layer, it currently causes
index_part to lose its reference to that layer. In detail:

* Assume there's only one image layer of key 0000...AAAA at LSN 0x100
and generation 0xA in the system.
* gc-compaction kicks in at gc-horizon 0x100 and produces
0000...AAAA at LSN 0x100 and generation 0xB.
* It submits a compaction result update to the index part that unlinks
0000-AAAA-100-A and adds 0000-AAAA-100-B.

On the remote storage / local disk side, this is fine -- the old file is
unlinked correctly and the new file is uploaded. However,
`index_part.json` itself doesn't record generations, so the buggy
procedure is as follows:

1. upload the new file
2. update the index part to remove the old file and add the new file
3. remove the new file

Therefore, the correct update process for gc-compaction should be as
follows:

* When modifying the layer map, delete the old layer and upload the new
one.
* When updating the index, upload the new layer without deleting the old
one, as sketched below.
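
A minimal sketch of the index-side rule (a hypothetical `LayerKey` type
standing in for the pageserver's persistent layer key, not the actual
API): any old layer whose key reappears in the compaction output is kept
out of the deletion list that gets scheduled together with the index
update.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the persistent layer key; generations are
// deliberately not part of it, mirroring the index_part keying above.
#[derive(Clone, PartialEq, Eq, Hash)]
struct LayerKey(String);

/// Layers to unlink in the index update: everything the compaction
/// consumed, except keys that were re-produced (rewritten) by it.
fn layers_to_unlink(compact_from: &[LayerKey], compact_to: &[LayerKey]) -> Vec<LayerKey> {
    let rewritten: HashSet<&LayerKey> = compact_to.iter().collect();
    compact_from
        .iter()
        .filter(|old| !rewritten.contains(*old))
        .cloned()
        .collect()
}
```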

## Summary of changes

* Modify `finish_gc_compaction` to correctly order insertions and
deletions (see the sketch below).
* Update the way gc-compaction uploads the layer files.
* Add new tests.
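
The layer-map side of `finish_gc_compaction` can be pictured with the
following simplified sketch (hypothetical types, not the real
`OpenLayerManager` signatures): input layers are matched against output
layers by key, and a key that appears on both sides becomes an in-place
rewrite rather than a delete plus an insert.

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the real layer types.
#[derive(Clone, PartialEq, Eq, Hash)]
struct LayerKey(String);

#[derive(Clone)]
struct Layer {
    key: LayerKey,
}

// The three kinds of layer-map modifications applied in one batch.
struct LayerMapUpdate {
    rewrite_layers: Vec<(Layer, Layer)>, // (old, new) with the same key
    drop_layers: Vec<Layer>,             // only in the input: delete
    add_layers: Vec<Layer>,              // only in the output: insert
}

fn classify(compact_from: &[Layer], compact_to: &[Layer]) -> LayerMapUpdate {
    // Index the consumed layers by key so produced layers can be matched.
    let mut drop_layers: HashMap<LayerKey, Layer> = compact_from
        .iter()
        .map(|l| (l.key.clone(), l.clone()))
        .collect();
    let mut rewrite_layers = Vec::new();
    let mut add_layers = Vec::new();
    for new in compact_to {
        match drop_layers.remove(&new.key) {
            // Same key produced again (a new generation of the same
            // layer): rewrite it in place instead of delete + insert.
            Some(old) => rewrite_layers.push((old, new.clone())),
            None => add_layers.push(new.clone()),
        }
    }
    LayerMapUpdate {
        rewrite_layers,
        drop_layers: drop_layers.into_values().collect(),
        add_layers,
    }
}
```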

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Alex Chi Z., 2025-01-23 16:54:44 -05:00, committed by GitHub
commit 8d47a60de2 (parent 6166482589)
3 changed files with 309 additions and 15 deletions


@@ -436,12 +436,14 @@ impl KeyHistoryRetention {
if dry_run {
return true;
}
let guard = tline.layers.read().await;
if !guard.contains_key(key) {
return false;
let layer_generation;
{
let guard = tline.layers.read().await;
if !guard.contains_key(key) {
return false;
}
layer_generation = guard.get_from_key(key).metadata().generation;
}
let layer_generation = guard.get_from_key(key).metadata().generation;
drop(guard);
if layer_generation == tline.generation {
info!(
key=%key,
@@ -2138,6 +2140,11 @@ impl Timeline {
self.get_gc_compaction_watermark()
};
if compact_below_lsn == Lsn::INVALID {
tracing::warn!("no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction");
return Ok(vec![]);
}
// Split compaction job to about 4GB each
const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
let sub_compaction_max_job_size_mb =
@@ -2338,6 +2345,11 @@ impl Timeline {
// each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
// the real cutoff.
let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
if real_gc_cutoff == Lsn::INVALID {
// If the gc_cutoff is not generated yet, we should not compact anything.
tracing::warn!("no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction");
return Ok(());
}
real_gc_cutoff
} else {
compact_lsn_range.end
@@ -2869,7 +2881,7 @@ impl Timeline {
"produced {} delta layers and {} image layers, {} layers are kept",
produced_delta_layers_len,
produced_image_layers_len,
layer_selection.len()
keep_layers.len()
);
// Step 3: Place back to the layer map.
@@ -2915,8 +2927,28 @@ impl Timeline {
// be batched into `schedule_compaction_update`.
let disk_consistent_lsn = self.disk_consistent_lsn.load();
self.schedule_uploads(disk_consistent_lsn, None)?;
// If a layer gets rewritten throughout gc-compaction, we need to keep that layer only in `compact_to` instead
// of `compact_from`.
let compact_from = {
let mut compact_from = Vec::new();
let mut compact_to_set = HashMap::new();
for layer in &compact_to {
compact_to_set.insert(layer.layer_desc().key(), layer);
}
for layer in &layer_selection {
if let Some(to) = compact_to_set.get(&layer.layer_desc().key()) {
tracing::info!(
"skipping delete {} because found same layer key at different generation {}",
layer, to
);
} else {
compact_from.push(layer.clone());
}
}
compact_from
};
self.remote_client
.schedule_compaction_update(&layer_selection, &compact_to)?;
.schedule_compaction_update(&compact_from, &compact_to)?;
drop(gc_lock);


@@ -337,16 +337,45 @@ impl OpenLayerManager {
compact_to: &[ResidentLayer],
metrics: &TimelineMetrics,
) {
// We can simply reuse compact l0 logic. Use a different function name to indicate a different type of layer map modification.
self.finish_compact_l0(compact_from, compact_to, metrics)
// gc-compaction could contain layer rewrites. We need to delete the old layers and insert the new ones.
// Match the old layers with the new layers
let mut add_layers = HashMap::new();
let mut rewrite_layers = HashMap::new();
let mut drop_layers = HashMap::new();
for layer in compact_from {
drop_layers.insert(layer.layer_desc().key(), layer.clone());
}
for layer in compact_to {
if let Some(old_layer) = drop_layers.remove(&layer.layer_desc().key()) {
rewrite_layers.insert(layer.layer_desc().key(), (old_layer.clone(), layer.clone()));
} else {
add_layers.insert(layer.layer_desc().key(), layer.clone());
}
}
let add_layers = add_layers.values().cloned().collect::<Vec<_>>();
let drop_layers = drop_layers.values().cloned().collect::<Vec<_>>();
let rewrite_layers = rewrite_layers.values().cloned().collect::<Vec<_>>();
self.rewrite_layers_inner(&rewrite_layers, &drop_layers, &add_layers, metrics);
}
/// Called post-compaction when some previous generation image layers were trimmed.
pub(crate) fn rewrite_layers(
pub fn rewrite_layers(
&mut self,
rewrite_layers: &[(Layer, ResidentLayer)],
drop_layers: &[Layer],
metrics: &TimelineMetrics,
) {
self.rewrite_layers_inner(rewrite_layers, drop_layers, &[], metrics);
}
fn rewrite_layers_inner(
&mut self,
rewrite_layers: &[(Layer, ResidentLayer)],
drop_layers: &[Layer],
add_layers: &[ResidentLayer],
metrics: &TimelineMetrics,
) {
let mut updates = self.layer_map.batch_update();
for (old_layer, new_layer) in rewrite_layers {
@@ -382,6 +411,10 @@ impl OpenLayerManager {
for l in drop_layers {
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
}
for l in add_layers {
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
metrics.record_new_file_metrics(l.layer_desc().file_size);
}
updates.flush();
}


@@ -1,6 +1,8 @@
from __future__ import annotations
import json
import math
import random
import time
from enum import StrEnum
@@ -128,11 +130,6 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_b
}
env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
env.pageserver.allowed_errors.append(
r".*failed to acquire partition lock during gc-compaction.*"
)
env.pageserver.allowed_errors.append(r".*repartition() called concurrently.*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -147,6 +144,10 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_b
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)
ps_http.timeline_gc(
tenant_id, timeline_id, None
) # Force refresh gc info to have gc_cutoff generated
child_workloads: list[Workload] = []
for i in range(1, churn_rounds + 1):
@@ -198,6 +199,230 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_b
ps_http.timeline_gc(tenant_id, timeline_id, None)
@pytest.mark.parametrize(
"compaction_mode",
["before_restart", "after_restart"],
)
def test_pageserver_gc_compaction_idempotent(
neon_env_builder: NeonEnvBuilder, compaction_mode: str
):
"""
Do gc-compaction twice without writing any new data and see if anything breaks.
We run this test in two modes:
- before_restart: run two gc-compactions before pageserver restart
- after_restart: run one gc-compaction before and one after pageserver restart
"""
SMOKE_CONF = {
# Run both gc and gc-compaction.
"gc_period": "5s",
"compaction_period": "5s",
# No PiTR interval and small GC horizon
"pitr_interval": "0s",
"gc_horizon": 1024,
"lsn_lease_length": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Only in testing mode: the warning is expected because we rewrite a layer file of different generations.
# We could potentially patch the sanity-check code to not emit the warning in the future.
env.pageserver.allowed_errors.append(".*was unlinked but was not dangling.*")
row_count = 10000
ps_http = env.pageserver.http_client()
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageserver.id)
workload.write_rows(row_count, env.pageserver.id)
child_workloads: list[Workload] = []
def compaction_finished():
queue_depth = len(ps_http.timeline_compact_info(tenant_id, timeline_id))
assert queue_depth == 0
workload.churn_rows(row_count, env.pageserver.id)
env.create_branch("child_branch") # so that we have a retain_lsn
workload.churn_rows(row_count, env.pageserver.id)
# compact 3 times if mode is before_restart
n_compactions = 3 if compaction_mode == "before_restart" else 1
for _ in range(n_compactions):
# Force refresh gc info to have gc_cutoff generated
ps_http.timeline_gc(tenant_id, timeline_id, None)
ps_http.timeline_compact(
tenant_id,
timeline_id,
enhanced_gc_bottom_most_compaction=True,
body={
"scheduled": True,
"sub_compaction": True,
"compact_key_range": {
"start": "000000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
},
"sub_compaction_max_job_size_mb": 16,
},
)
wait_until(compaction_finished, timeout=60)
if compaction_mode == "after_restart":
env.pageserver.restart(True)
ps_http.timeline_gc(
tenant_id, timeline_id, None
) # Force refresh gc info to have gc_cutoff generated
for _ in range(3):
ps_http.timeline_compact(
tenant_id,
timeline_id,
enhanced_gc_bottom_most_compaction=True,
body={
"scheduled": True,
"sub_compaction": True,
"compact_key_range": {
"start": "000000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
},
"sub_compaction_max_job_size_mb": 16,
},
)
wait_until(compaction_finished, timeout=60)
# ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
env.pageserver.assert_log_contains(
"scheduled_compact_timeline.*picked .* layers for compaction"
)
# ensure we hit the duplicated layer key warning at least once: we did two compactions consecutively,
# and the second one should have hit the duplicated layer key warning.
if compaction_mode == "before_restart":
env.pageserver.assert_log_contains("duplicated layer key in the same generation")
else:
env.pageserver.assert_log_contains("same layer key at different generation")
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
for child_workload in child_workloads:
log.info(f"Validating at branch {child_workload.branch_name}")
child_workload.validate(env.pageserver.id)
# Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_gc(tenant_id, timeline_id, None)
@skip_in_debug_build("only run with release build")
def test_pageserver_gc_compaction_interrupt(neon_env_builder: NeonEnvBuilder):
"""
Force interrupt a gc-compaction and see if anything breaks.
"""
SMOKE_CONF = {
# Run both gc and gc-compaction.
"gc_period": "5s",
"compaction_period": "5s",
# No PiTR interval and small GC horizon
"pitr_interval": "0s",
"gc_horizon": "1024",
"lsn_lease_length": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Only in testing mode: the warning is expected because we rewrite a layer file of different generations.
# We could potentially patch the sanity-check code to not emit the warning in the future.
env.pageserver.allowed_errors.append(".*was unlinked but was not dangling.*")
row_count = 10000
churn_rounds = 20
ps_http = env.pageserver.http_client()
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageserver.id)
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)
def compaction_finished():
queue_depth = len(ps_http.timeline_compact_info(tenant_id, timeline_id))
assert queue_depth == 0
expected_compaction_time_seconds = 5.0
ps_http.timeline_gc(
tenant_id, timeline_id, None
) # Force refresh gc info to have gc_cutoff generated
for i in range(1, churn_rounds + 1):
log.info(f"Running churn round {i}/{churn_rounds} ...")
workload.churn_rows(row_count, env.pageserver.id)
ps_http.timeline_compact(
tenant_id,
timeline_id,
enhanced_gc_bottom_most_compaction=True,
body={
"scheduled": True,
"sub_compaction": True,
"compact_key_range": {
"start": "000000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
},
"sub_compaction_max_job_size_mb": 16,
},
)
# sleep random seconds between 0 and max(compaction_time); if the result is 0, wait until the compaction is complete
# This would hopefully trigger the restart at different periods of the compaction:
# - while we are doing the compaction
# - while we finished the compaction but not yet uploaded the metadata
# - after we uploaded the metadata
time_to_sleep = random.randint(0, max(5, math.ceil(expected_compaction_time_seconds)))
if time_to_sleep == 0 or i == 1:
start = time.time()
wait_until(compaction_finished, timeout=60)
end = time.time()
expected_compaction_time_seconds = end - start
log.info(
f"expected_compaction_time_seconds updated to {expected_compaction_time_seconds} seconds"
)
else:
time.sleep(time_to_sleep)
env.pageserver.restart(True)
ps_http.timeline_gc(
tenant_id, timeline_id, None
) # Force refresh gc info to have gc_cutoff generated
ps_http.timeline_compact(
tenant_id,
timeline_id,
enhanced_gc_bottom_most_compaction=True,
body={
"scheduled": True,
"sub_compaction": True,
"compact_key_range": {
"start": "000000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
},
"sub_compaction_max_job_size_mb": 16,
},
)
workload.validate(env.pageserver.id)
wait_until(compaction_finished, timeout=60)
# ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
env.pageserver.assert_log_contains(
"scheduled_compact_timeline.*picked .* layers for compaction"
)
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
# Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_gc(tenant_id, timeline_id, None)
# Stripe sizes in number of pages.
TINY_STRIPES = 16
LARGE_STRIPES = 32768
@@ -238,7 +463,9 @@ def test_sharding_compaction(
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"gc_horizon": f"{128 * 1024}",
"compaction_period": "0s",
"lsn_lease_length": "0s",
# create image layers eagerly: we want to exercise image layer creation in this test.
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": 0,
@@ -313,6 +540,8 @@ def test_sharding_compaction(
for shard in env.storage_controller.locate(tenant_id):
pageserver = env.get_pageserver(shard["node_id"])
tenant_shard_id = shard["shard_id"]
# Force refresh gc info to have gc_cutoff generated
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
pageserver.http_client().timeline_compact(
tenant_shard_id,
timeline_id,