feat(pageserver): validate data integrity during gc-compaction (#10131)

## Problem

part of https://github.com/neondatabase/neon/issues/9114
part of investigation of
https://github.com/neondatabase/neon/issues/10049

## Summary of changes

* If `cfg!(test) or cfg!(feature = testing)`, then we will always try
generating an image to ensure the history is replayable, but not put the
image layer into the final layer results, therefore discovering wrong
key history before we hit a read error.
* I suspect it's easier to trigger some races if gc-compaction is
continuously run on a timeline, so I increased the frequency to twice
per 10 churns.
* Also, create branches in gc-compaction smoke tests to get more test
coverage.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Arpad Müller <arpad@neon.tech>
This commit is contained in:
Alex Chi Z.
2025-01-15 17:04:06 -05:00
committed by GitHub
parent 55a68b28a2
commit a753349cb0
3 changed files with 89 additions and 24 deletions

View File

@@ -1776,7 +1776,10 @@ impl Timeline {
base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
) -> anyhow::Result<KeyHistoryRetention> {
// Pre-checks for the invariants
if cfg!(debug_assertions) {
let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
if debug_mode {
for (log_key, _, _) in full_history {
assert_eq!(log_key, &key, "mismatched key");
}
@@ -1922,15 +1925,19 @@ impl Timeline {
output
}
let mut key_exists = false;
for (i, split_for_lsn) in split_history.into_iter().enumerate() {
// TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
records_since_last_image += split_for_lsn.len();
let generate_image = if i == 0 && !has_ancestor {
// Whether to produce an image into the final layer files
let produce_image = if i == 0 && !has_ancestor {
// We always generate images for the first batch (below horizon / lowest retain_lsn)
true
} else if i == batch_cnt - 1 {
// Do not generate images for the last batch (above horizon)
false
} else if records_since_last_image == 0 {
false
} else if records_since_last_image >= delta_threshold_cnt {
// Generate images when there are too many records
true
@@ -1945,29 +1952,45 @@ impl Timeline {
break;
}
}
if let Some((_, _, val)) = replay_history.first() {
if !val.will_init() {
return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
|| {
generate_debug_trace(
Some(&replay_history),
full_history,
retain_lsn_below_horizon,
horizon,
)
},
);
}
if replay_history.is_empty() && !key_exists {
// The key does not exist at earlier LSN, we can skip this iteration.
retention.push(Vec::new());
continue;
} else {
key_exists = true;
}
if generate_image && records_since_last_image > 0 {
let Some((_, _, val)) = replay_history.first() else {
unreachable!("replay history should not be empty once it exists")
};
if !val.will_init() {
return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
generate_debug_trace(
Some(&replay_history),
full_history,
retain_lsn_below_horizon,
horizon,
)
});
}
// Whether to reconstruct the image. In debug mode, we will generate an image
// at every retain_lsn to ensure data is not corrupted, but we won't put the
// image into the final layer.
let generate_image = produce_image || debug_mode;
if produce_image {
records_since_last_image = 0;
let replay_history_for_debug = if cfg!(debug_assertions) {
}
let img_and_lsn = if generate_image {
let replay_history_for_debug = if debug_mode {
Some(replay_history.clone())
} else {
None
};
let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
let history = std::mem::take(&mut replay_history);
let history = if produce_image {
std::mem::take(&mut replay_history)
} else {
replay_history.clone()
};
let mut img = None;
let mut records = Vec::with_capacity(history.len());
if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
@@ -2004,8 +2027,20 @@ impl Timeline {
}
records.reverse();
let state = ValueReconstructState { img, records };
let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
// last batch does not generate image so i is always in range, unless we force generate
// an image during testing
let request_lsn = if i >= lsn_split_points.len() {
Lsn::MAX
} else {
lsn_split_points[i]
};
let img = self.reconstruct_value(key, request_lsn, state).await?;
Some((request_lsn, img))
} else {
None
};
if produce_image {
let (request_lsn, img) = img_and_lsn.unwrap();
replay_history.push((key, request_lsn, Value::Image(img.clone())));
retention.push(vec![(request_lsn, Value::Image(img))]);
} else {
@@ -2273,6 +2308,8 @@ impl Timeline {
let compact_key_range = job.compact_key_range;
let compact_lsn_range = job.compact_lsn_range;
let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end);
scopeguard::defer! {
@@ -2398,7 +2435,7 @@ impl Timeline {
.first()
.copied()
.unwrap_or(job_desc.gc_cutoff);
if cfg!(debug_assertions) {
if debug_mode {
assert_eq!(
res,
job_desc

View File

@@ -53,6 +53,22 @@ class Workload:
self._endpoint: Endpoint | None = None
self._endpoint_opts = endpoint_opts or {}
def branch(
self,
timeline_id: TimelineId,
branch_name: str | None = None,
endpoint_opts: dict[str, Any] | None = None,
) -> Workload:
"""
Checkpoint the current status of the workload in case of branching
"""
branch_workload = Workload(
self.env, self.tenant_id, timeline_id, branch_name, endpoint_opts
)
branch_workload.expect_rows = self.expect_rows
branch_workload.churn_cursor = self.churn_cursor
return branch_workload
def reconfigure(self) -> None:
"""
Request the endpoint to reconfigure based on location reported by storage controller

View File

@@ -112,7 +112,11 @@ page_cache_size=10
@skip_in_debug_build("only run with release build")
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize(
"with_branches",
["with_branches", "no_branches"],
)
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_branches: str):
SMOKE_CONF = {
# Run both gc and gc-compaction.
"gc_period": "5s",
@@ -143,12 +147,17 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)
child_workloads: list[Workload] = []
for i in range(1, churn_rounds + 1):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")
if (i - 1) % 10 == 0:
# Run gc-compaction every 10 rounds to ensure the test doesn't take too long time.
if i % 10 == 5 and with_branches == "with_branches":
branch_name = f"child-{i}"
branch_timeline_id = env.create_branch(branch_name)
child_workloads.append(workload.branch(branch_timeline_id, branch_name))
if (i - 1) % 10 == 0 or (i - 1) % 10 == 1:
# Run gc-compaction twice every 10 rounds to ensure the test doesn't take too long time.
ps_http.timeline_compact(
tenant_id,
timeline_id,
@@ -179,6 +188,9 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
for child_workload in child_workloads:
log.info(f"Validating at branch {child_workload.branch_name}")
child_workload.validate(env.pageserver.id)
# Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)