diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 05f8d476f9..2042a18e96 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1776,7 +1776,10 @@ impl Timeline { base_img_from_ancestor: Option<(Key, Lsn, Bytes)>, ) -> anyhow::Result { // Pre-checks for the invariants - if cfg!(debug_assertions) { + + let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing"); + + if debug_mode { for (log_key, _, _) in full_history { assert_eq!(log_key, &key, "mismatched key"); } @@ -1922,15 +1925,19 @@ impl Timeline { output } + let mut key_exists = false; for (i, split_for_lsn) in split_history.into_iter().enumerate() { // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly. records_since_last_image += split_for_lsn.len(); - let generate_image = if i == 0 && !has_ancestor { + // Whether to produce an image into the final layer files + let produce_image = if i == 0 && !has_ancestor { // We always generate images for the first batch (below horizon / lowest retain_lsn) true } else if i == batch_cnt - 1 { // Do not generate images for the last batch (above horizon) false + } else if records_since_last_image == 0 { + false } else if records_since_last_image >= delta_threshold_cnt { // Generate images when there are too many records true @@ -1945,29 +1952,45 @@ impl Timeline { break; } } - if let Some((_, _, val)) = replay_history.first() { - if !val.will_init() { - return Err(anyhow::anyhow!("invalid history, no base image")).with_context( - || { - generate_debug_trace( - Some(&replay_history), - full_history, - retain_lsn_below_horizon, - horizon, - ) - }, - ); - } + if replay_history.is_empty() && !key_exists { + // The key does not exist at earlier LSN, we can skip this iteration. + retention.push(Vec::new()); + continue; + } else { + key_exists = true; } - if generate_image && records_since_last_image > 0 { + let Some((_, _, val)) = replay_history.first() else { + unreachable!("replay history should not be empty once it exists") + }; + if !val.will_init() { + return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| { + generate_debug_trace( + Some(&replay_history), + full_history, + retain_lsn_below_horizon, + horizon, + ) + }); + } + // Whether to reconstruct the image. In debug mode, we will generate an image + // at every retain_lsn to ensure data is not corrupted, but we won't put the + // image into the final layer. + let generate_image = produce_image || debug_mode; + if produce_image { records_since_last_image = 0; - let replay_history_for_debug = if cfg!(debug_assertions) { + } + let img_and_lsn = if generate_image { + let replay_history_for_debug = if debug_mode { Some(replay_history.clone()) } else { None }; let replay_history_for_debug_ref = replay_history_for_debug.as_deref(); - let history = std::mem::take(&mut replay_history); + let history = if produce_image { + std::mem::take(&mut replay_history) + } else { + replay_history.clone() + }; let mut img = None; let mut records = Vec::with_capacity(history.len()); if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() { @@ -2004,8 +2027,20 @@ impl Timeline { } records.reverse(); let state = ValueReconstructState { img, records }; - let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range + // last batch does not generate image so i is always in range, unless we force generate + // an image during testing + let request_lsn = if i >= lsn_split_points.len() { + Lsn::MAX + } else { + lsn_split_points[i] + }; let img = self.reconstruct_value(key, request_lsn, state).await?; + Some((request_lsn, img)) + } else { + None + }; + if produce_image { + let (request_lsn, img) = img_and_lsn.unwrap(); replay_history.push((key, request_lsn, Value::Image(img.clone()))); retention.push(vec![(request_lsn, Value::Image(img))]); } else { @@ -2273,6 +2308,8 @@ impl Timeline { let compact_key_range = job.compact_key_range; let compact_lsn_range = job.compact_lsn_range; + let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing"); + info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end); scopeguard::defer! { @@ -2398,7 +2435,7 @@ impl Timeline { .first() .copied() .unwrap_or(job_desc.gc_cutoff); - if cfg!(debug_assertions) { + if debug_mode { assert_eq!( res, job_desc diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py index 1b8c9fef44..eea0ec2b95 100644 --- a/test_runner/fixtures/workload.py +++ b/test_runner/fixtures/workload.py @@ -53,6 +53,22 @@ class Workload: self._endpoint: Endpoint | None = None self._endpoint_opts = endpoint_opts or {} + def branch( + self, + timeline_id: TimelineId, + branch_name: str | None = None, + endpoint_opts: dict[str, Any] | None = None, + ) -> Workload: + """ + Checkpoint the current status of the workload in case of branching + """ + branch_workload = Workload( + self.env, self.tenant_id, timeline_id, branch_name, endpoint_opts + ) + branch_workload.expect_rows = self.expect_rows + branch_workload.churn_cursor = self.churn_cursor + return branch_workload + def reconfigure(self) -> None: """ Request the endpoint to reconfigure based on location reported by storage controller diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index fe0422088a..d0a2349ccf 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -112,7 +112,11 @@ page_cache_size=10 @skip_in_debug_build("only run with release build") -def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): +@pytest.mark.parametrize( + "with_branches", + ["with_branches", "no_branches"], +) +def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_branches: str): SMOKE_CONF = { # Run both gc and gc-compaction. "gc_period": "5s", @@ -143,12 +147,17 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): log.info("Writing initial data ...") workload.write_rows(row_count, env.pageserver.id) + child_workloads: list[Workload] = [] + for i in range(1, churn_rounds + 1): if i % 10 == 0: log.info(f"Running churn round {i}/{churn_rounds} ...") - - if (i - 1) % 10 == 0: - # Run gc-compaction every 10 rounds to ensure the test doesn't take too long time. + if i % 10 == 5 and with_branches == "with_branches": + branch_name = f"child-{i}" + branch_timeline_id = env.create_branch(branch_name) + child_workloads.append(workload.branch(branch_timeline_id, branch_name)) + if (i - 1) % 10 == 0 or (i - 1) % 10 == 1: + # Run gc-compaction twice every 10 rounds to ensure the test doesn't take too long time. ps_http.timeline_compact( tenant_id, timeline_id, @@ -179,6 +188,9 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): log.info("Validating at workload end ...") workload.validate(env.pageserver.id) + for child_workload in child_workloads: + log.info(f"Validating at branch {child_workload.branch_name}") + child_workload.validate(env.pageserver.id) # Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction. ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)