diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 4d5dc2d8a9..28c3381318 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -436,12 +436,14 @@ impl KeyHistoryRetention {
         if dry_run {
             return true;
         }
-        let guard = tline.layers.read().await;
-        if !guard.contains_key(key) {
-            return false;
+        let layer_generation;
+        {
+            let guard = tline.layers.read().await;
+            if !guard.contains_key(key) {
+                return false;
+            }
+            layer_generation = guard.get_from_key(key).metadata().generation;
         }
-        let layer_generation = guard.get_from_key(key).metadata().generation;
-        drop(guard);
         if layer_generation == tline.generation {
             info!(
                 key=%key,
@@ -2138,6 +2140,11 @@ impl Timeline {
             self.get_gc_compaction_watermark()
         };

+        if compact_below_lsn == Lsn::INVALID {
+            tracing::warn!("no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction");
+            return Ok(vec![]);
+        }
+
         // Split compaction job to about 4GB each
         const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
         let sub_compaction_max_job_size_mb =
@@ -2338,6 +2345,11 @@ impl Timeline {
         // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
         // the real cutoff.
         let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
+            if real_gc_cutoff == Lsn::INVALID {
+                // If the gc_cutoff is not generated yet, we should not compact anything.
+                tracing::warn!("no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction");
+                return Ok(());
+            }
             real_gc_cutoff
         } else {
             compact_lsn_range.end
@@ -2869,7 +2881,7 @@ impl Timeline {
             "produced {} delta layers and {} image layers, {} layers are kept",
             produced_delta_layers_len,
             produced_image_layers_len,
-            layer_selection.len()
+            keep_layers.len()
         );

         // Step 3: Place back to the layer map.
@@ -2915,8 +2927,28 @@ impl Timeline {
         // be batched into `schedule_compaction_update`.
         let disk_consistent_lsn = self.disk_consistent_lsn.load();
         self.schedule_uploads(disk_consistent_lsn, None)?;
+        // If a layer gets rewritten throughout gc-compaction, we need to keep that layer only in `compact_to` instead
+        // of `compact_from`.
+        let compact_from = {
+            let mut compact_from = Vec::new();
+            let mut compact_to_set = HashMap::new();
+            for layer in &compact_to {
+                compact_to_set.insert(layer.layer_desc().key(), layer);
+            }
+            for layer in &layer_selection {
+                if let Some(to) = compact_to_set.get(&layer.layer_desc().key()) {
+                    tracing::info!(
+                        "skipping delete {} because found same layer key at different generation {}",
+                        layer, to
+                    );
+                } else {
+                    compact_from.push(layer.clone());
+                }
+            }
+            compact_from
+        };
         self.remote_client
-            .schedule_compaction_update(&layer_selection, &compact_to)?;
+            .schedule_compaction_update(&compact_from, &compact_to)?;

         drop(gc_lock);

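The `compact_from` block above only schedules deletion for layers whose key is not re-emitted in `compact_to`; a rewritten layer (same key, new generation) has to survive the update. A minimal standalone sketch of that key-based split, with a hypothetical `Layer` record standing in for the real layer descriptors:

    from collections import namedtuple

    # Hypothetical stand-in for a persistent layer key + metadata.
    Layer = namedtuple("Layer", ["key", "generation"])

    def split_compact_from(layer_selection, compact_to):
        # Keys re-produced by the compaction; same-key layers are rewrites, not deletions.
        rewritten_keys = {layer.key for layer in compact_to}
        compact_from = []
        for layer in layer_selection:
            if layer.key in rewritten_keys:
                print(f"skip delete {layer.key}: same key present in compact_to (rewrite)")
            else:
                compact_from.append(layer)
        return compact_from

    old = [Layer("A", 1), Layer("B", 1)]
    new = [Layer("A", 2)]  # "A" was rewritten at a newer generation
    assert [l.key for l in split_compact_from(old, new)] == ["B"]
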
diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs
index 3888e7f86a..f1cef7778c 100644
--- a/pageserver/src/tenant/timeline/layer_manager.rs
+++ b/pageserver/src/tenant/timeline/layer_manager.rs
@@ -337,16 +337,45 @@ impl OpenLayerManager {
         compact_to: &[ResidentLayer],
         metrics: &TimelineMetrics,
     ) {
-        // We can simply reuse compact l0 logic. Use a different function name to indicate a different type of layer map modification.
-        self.finish_compact_l0(compact_from, compact_to, metrics)
+        // gc-compaction could contain layer rewrites. We need to delete the old layers and insert the new ones.
+
+        // Match the old layers with the new layers
+        let mut add_layers = HashMap::new();
+        let mut rewrite_layers = HashMap::new();
+        let mut drop_layers = HashMap::new();
+        for layer in compact_from {
+            drop_layers.insert(layer.layer_desc().key(), layer.clone());
+        }
+        for layer in compact_to {
+            if let Some(old_layer) = drop_layers.remove(&layer.layer_desc().key()) {
+                rewrite_layers.insert(layer.layer_desc().key(), (old_layer.clone(), layer.clone()));
+            } else {
+                add_layers.insert(layer.layer_desc().key(), layer.clone());
+            }
+        }
+        let add_layers = add_layers.values().cloned().collect::<Vec<_>>();
+        let drop_layers = drop_layers.values().cloned().collect::<Vec<_>>();
+        let rewrite_layers = rewrite_layers.values().cloned().collect::<Vec<_>>();
+
+        self.rewrite_layers_inner(&rewrite_layers, &drop_layers, &add_layers, metrics);
     }

     /// Called post-compaction when some previous generation image layers were trimmed.
-    pub(crate) fn rewrite_layers(
+    pub fn rewrite_layers(
         &mut self,
         rewrite_layers: &[(Layer, ResidentLayer)],
         drop_layers: &[Layer],
         metrics: &TimelineMetrics,
+    ) {
+        self.rewrite_layers_inner(rewrite_layers, drop_layers, &[], metrics);
+    }
+
+    fn rewrite_layers_inner(
+        &mut self,
+        rewrite_layers: &[(Layer, ResidentLayer)],
+        drop_layers: &[Layer],
+        add_layers: &[ResidentLayer],
+        metrics: &TimelineMetrics,
     ) {
         let mut updates = self.layer_map.batch_update();
         for (old_layer, new_layer) in rewrite_layers {
@@ -382,6 +411,10 @@ impl OpenLayerManager {
         for l in drop_layers {
             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
         }
+        for l in add_layers {
+            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
+            metrics.record_new_file_metrics(l.layer_desc().file_size);
+        }

         updates.flush();
     }
diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py
index fde26e1533..2edfc884ad 100644
--- a/test_runner/regress/test_compaction.py
+++ b/test_runner/regress/test_compaction.py
@@ -1,6 +1,8 @@
 from __future__ import annotations

 import json
+import math
+import random
 import time
 from enum import StrEnum

@@ -128,11 +130,6 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_b
     }

     env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
-    env.pageserver.allowed_errors.append(
-        r".*failed to acquire partition lock during gc-compaction.*"
-    )
-    env.pageserver.allowed_errors.append(r".*repartition() called concurrently.*")
-
     tenant_id = env.initial_tenant
     timeline_id = env.initial_timeline

@@ -147,6 +144,10 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_b
     log.info("Writing initial data ...")
     workload.write_rows(row_count, env.pageserver.id)

+    ps_http.timeline_gc(
+        tenant_id, timeline_id, None
+    )  # Force refresh gc info to have gc_cutoff generated
+
     child_workloads: list[Workload] = []

     for i in range(1, churn_rounds + 1):
@@ -198,6 +199,230 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_b
     ps_http.timeline_gc(tenant_id, timeline_id, None)


+@pytest.mark.parametrize(
+    "compaction_mode",
+    ["before_restart", "after_restart"],
+)
+def test_pageserver_gc_compaction_idempotent(
+    neon_env_builder: NeonEnvBuilder, compaction_mode: str
+):
+    """
+    Do gc-compaction twice without writing any new data and see if anything breaks.
+    We run this test in two modes:
+    - before_restart: run two gc-compactions before pageserver restart
+    - after_restart: run one gc-compaction before and one after pageserver restart
+    """
+    SMOKE_CONF = {
+        # Run both gc and gc-compaction.
+        "gc_period": "5s",
+        "compaction_period": "5s",
+        # No PiTR interval and small GC horizon
+        "pitr_interval": "0s",
+        "gc_horizon": 1024,
+        "lsn_lease_length": "0s",
+    }
+
+    env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+
+    # Only in testing mode: the warning is expected because we rewrite a layer file of different generations.
+    # We could potentially patch the sanity-check code to not emit the warning in the future.
+    env.pageserver.allowed_errors.append(".*was unlinked but was not dangling.*")
+
+    row_count = 10000
+
+    ps_http = env.pageserver.http_client()
+
+    workload = Workload(env, tenant_id, timeline_id)
+    workload.init(env.pageserver.id)
+
+    workload.write_rows(row_count, env.pageserver.id)
+
+    child_workloads: list[Workload] = []
+
+    def compaction_finished():
+        queue_depth = len(ps_http.timeline_compact_info(tenant_id, timeline_id))
+        assert queue_depth == 0
+
+    workload.churn_rows(row_count, env.pageserver.id)
+    env.create_branch("child_branch")  # so that we have a retain_lsn
+    workload.churn_rows(row_count, env.pageserver.id)
+    # compact 3 times if mode is before_restart
+    n_compactions = 3 if compaction_mode == "before_restart" else 1
+    for _ in range(n_compactions):
+        # Force refresh gc info to have gc_cutoff generated
+        ps_http.timeline_gc(tenant_id, timeline_id, None)
+        ps_http.timeline_compact(
+            tenant_id,
+            timeline_id,
+            enhanced_gc_bottom_most_compaction=True,
+            body={
+                "scheduled": True,
+                "sub_compaction": True,
+                "compact_key_range": {
+                    "start": "000000000000000000000000000000000000",
+                    "end": "030000000000000000000000000000000000",
+                },
+                "sub_compaction_max_job_size_mb": 16,
+            },
+        )
+        wait_until(compaction_finished, timeout=60)
+    if compaction_mode == "after_restart":
+        env.pageserver.restart(True)
+        ps_http.timeline_gc(
+            tenant_id, timeline_id, None
+        )  # Force refresh gc info to have gc_cutoff generated
+        for _ in range(3):
+            ps_http.timeline_compact(
+                tenant_id,
+                timeline_id,
+                enhanced_gc_bottom_most_compaction=True,
+                body={
+                    "scheduled": True,
+                    "sub_compaction": True,
+                    "compact_key_range": {
+                        "start": "000000000000000000000000000000000000",
+                        "end": "030000000000000000000000000000000000",
+                    },
+                    "sub_compaction_max_job_size_mb": 16,
+                },
+            )
+            wait_until(compaction_finished, timeout=60)
+
+    # ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
+    env.pageserver.assert_log_contains(
+        "scheduled_compact_timeline.*picked .* layers for compaction"
+    )
+
+    # ensure we hit the duplicated layer key warning at least once: we did two compactions consecutively,
+    # and the second one should have hit the duplicated layer key warning.
+    if compaction_mode == "before_restart":
+        env.pageserver.assert_log_contains("duplicated layer key in the same generation")
+    else:
+        env.pageserver.assert_log_contains("same layer key at different generation")
+
+    log.info("Validating at workload end ...")
+    workload.validate(env.pageserver.id)
+    for child_workload in child_workloads:
+        log.info(f"Validating at branch {child_workload.branch_name}")
+        child_workload.validate(env.pageserver.id)
+
+    # Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
+    ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
+    ps_http.timeline_gc(tenant_id, timeline_id, None)
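Both new tests drive the scheduled compaction queue and block on `wait_until(compaction_finished, timeout=60)`. Roughly, that helper just retries the assertion until it passes or the timeout expires; a simplified sketch of the polling loop (the real helper in the test framework may differ in retry interval and error handling):

    import time

    def wait_until(condition, timeout: float = 60.0, interval: float = 1.0):
        # Re-run `condition` until it stops raising AssertionError, or give up after `timeout` seconds.
        deadline = time.monotonic() + timeout
        while True:
            try:
                return condition()
            except AssertionError:
                if time.monotonic() >= deadline:
                    raise
                time.sleep(interval)
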
+
+
+@skip_in_debug_build("only run with release build")
+def test_pageserver_gc_compaction_interrupt(neon_env_builder: NeonEnvBuilder):
+    """
+    Force interrupt a gc-compaction and see if anything breaks.
+    """
+    SMOKE_CONF = {
+        # Run both gc and gc-compaction.
+        "gc_period": "5s",
+        "compaction_period": "5s",
+        # No PiTR interval and small GC horizon
+        "pitr_interval": "0s",
+        "gc_horizon": "1024",
+        "lsn_lease_length": "0s",
+    }
+
+    env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+
+    # Only in testing mode: the warning is expected because we rewrite a layer file of different generations.
+    # We could potentially patch the sanity-check code to not emit the warning in the future.
+    env.pageserver.allowed_errors.append(".*was unlinked but was not dangling.*")
+
+    row_count = 10000
+    churn_rounds = 20
+
+    ps_http = env.pageserver.http_client()
+
+    workload = Workload(env, tenant_id, timeline_id)
+    workload.init(env.pageserver.id)
+
+    log.info("Writing initial data ...")
+    workload.write_rows(row_count, env.pageserver.id)
+
+    def compaction_finished():
+        queue_depth = len(ps_http.timeline_compact_info(tenant_id, timeline_id))
+        assert queue_depth == 0
+
+    expected_compaction_time_seconds = 5.0
+    ps_http.timeline_gc(
+        tenant_id, timeline_id, None
+    )  # Force refresh gc info to have gc_cutoff generated
+    for i in range(1, churn_rounds + 1):
+        log.info(f"Running churn round {i}/{churn_rounds} ...")
+        workload.churn_rows(row_count, env.pageserver.id)
+        ps_http.timeline_compact(
+            tenant_id,
+            timeline_id,
+            enhanced_gc_bottom_most_compaction=True,
+            body={
+                "scheduled": True,
+                "sub_compaction": True,
+                "compact_key_range": {
+                    "start": "000000000000000000000000000000000000",
+                    "end": "030000000000000000000000000000000000",
+                },
+                "sub_compaction_max_job_size_mb": 16,
+            },
+        )
+        # sleep random seconds between 0 and max(compaction_time); if the result is 0, wait until the compaction is complete
+        # This would hopefully trigger the restart at different periods of the compaction:
+        # - while we are doing the compaction
+        # - while we finished the compaction but not yet uploaded the metadata
+        # - after we uploaded the metadata
+        time_to_sleep = random.randint(0, max(5, math.ceil(expected_compaction_time_seconds)))
+        if time_to_sleep == 0 or i == 1:
+            start = time.time()
+            wait_until(compaction_finished, timeout=60)
+            end = time.time()
+            expected_compaction_time_seconds = end - start
+            log.info(
+                f"expected_compaction_time_seconds updated to {expected_compaction_time_seconds} seconds"
+            )
+        else:
+            time.sleep(time_to_sleep)
+            env.pageserver.restart(True)
+            ps_http.timeline_gc(
+                tenant_id, timeline_id, None
+            )  # Force refresh gc info to have gc_cutoff generated
+            ps_http.timeline_compact(
+                tenant_id,
+                timeline_id,
+                enhanced_gc_bottom_most_compaction=True,
+                body={
+                    "scheduled": True,
+                    "sub_compaction": True,
+                    "compact_key_range": {
+                        "start": "000000000000000000000000000000000000",
+                        "end": "030000000000000000000000000000000000",
+                    },
+                    "sub_compaction_max_job_size_mb": 16,
+                },
+            )
+        workload.validate(env.pageserver.id)
+
+    wait_until(compaction_finished, timeout=60)
+
+    # ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
+    env.pageserver.assert_log_contains(
+        "scheduled_compact_timeline.*picked .* layers for compaction"
compaction" + ) + + log.info("Validating at workload end ...") + workload.validate(env.pageserver.id) + + # Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction. + ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True) + ps_http.timeline_gc(tenant_id, timeline_id, None) + + # Stripe sizes in number of pages. TINY_STRIPES = 16 LARGE_STRIPES = 32768 @@ -238,7 +463,9 @@ def test_sharding_compaction( "pitr_interval": "0s", # disable background compaction and GC. We invoke it manually when we want it to happen. "gc_period": "0s", + "gc_horizon": f"{128 * 1024}", "compaction_period": "0s", + "lsn_lease_length": "0s", # create image layers eagerly: we want to exercise image layer creation in this test. "image_creation_threshold": "1", "image_layer_creation_check_threshold": 0, @@ -313,6 +540,8 @@ def test_sharding_compaction( for shard in env.storage_controller.locate(tenant_id): pageserver = env.get_pageserver(shard["node_id"]) tenant_shard_id = shard["shard_id"] + # Force refresh gc info to have gc_cutoff generated + pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None) pageserver.http_client().timeline_compact( tenant_shard_id, timeline_id,