diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs
index 8d06ccd565..4428e325cd 100644
--- a/pageserver/src/tenant/layer_map.rs
+++ b/pageserver/src/tenant/layer_map.rs
@@ -285,6 +285,13 @@ where
         }
     }
 
+    /// Check whether the map already contains a historic layer with exactly
+    /// this key range, LSN range and kind (delta or image).
+    pub fn contains(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>, is_image: bool) -> bool {
+        let key = historic_layer_coverage::LayerKey::from_ranges(key_range, lsn_range, is_image);
+        self.historic.contains(&key)
+    }
+
     ///
     /// Remove an on-disk layer from the map.
     ///
diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs
index b63c361314..a834253cca 100644
--- a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs
+++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs
@@ -53,6 +53,22 @@ impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerKey
     }
 }
 
+impl LayerKey {
+    /// Construct a `LayerKey` directly from a key range and an LSN range,
+    /// without needing a `Layer` object.
+    pub fn from_ranges(
+        kr: &Range<Key>,
+        lr: &Range<Lsn>,
+        is_image: bool,
+    ) -> Self {
+        LayerKey {
+            key: kr.start.to_i128()..kr.end.to_i128(),
+            lsn: lr.start.0..lr.end.0,
+            is_image,
+        }
+    }
+}
+
 /// Efficiently queryable layer coverage for each LSN.
 ///
 /// Allows answering layer map queries very efficiently,
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index ba3ab6dd4c..88710b99ef 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -867,6 +867,17 @@ pub struct DeltaLayerWriter {
 }
 
 impl DeltaLayerWriter {
+    /// First key of the layer being written.
+    /// Panics if the writer has already been finished.
+    pub fn key_start(&self) -> Key {
+        self.inner.as_ref().unwrap().key_start.clone()
+    }
+    /// LSN range of the layer being written.
+    /// Panics if the writer has already been finished.
+    pub fn lsn_range(&self) -> Range<Lsn> {
+        self.inner.as_ref().unwrap().lsn_range.clone()
+    }
+
     ///
     /// Start building a new delta layer.
/// diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 2543764eca..9838640097 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3375,7 +3375,31 @@ impl Timeline { || contains_hole { // ... if so, flush previous layer and prepare to write new one - new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?); + + let end_key = prev_key.unwrap().next(); + + let w = writer.take().unwrap(); + + // If an identical L1 layer already exists, no need to create a new one. + // + // This can happen if compaction is interrupted after it has already + // created some or all of the L1 layers, but has not deleted the L0 layers + // yet, so that on next compaction, we do the same work again. + // + // NOTE: this is racy, if there can be any other task that concurrently + // creates L1 layers. Currently, there can be only one compaction task + // running at any time, so this is fine. + if self.layers.read().unwrap().contains( + &(w.key_start()..end_key), + &w.lsn_range(), + false, // not an image layer + ) { + drop(w); + } else { + let new_layer = w.finish(end_key)?; + + new_layers.push(new_layer); + } writer = None; if contains_hole { @@ -3432,6 +3456,10 @@ impl Timeline { drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed + fail_point!("compact-level0-phase1-finish", |_| { + Err(anyhow::anyhow!("failpoint compact-level0-phase1-finish").into()) + }); + Ok(CompactLevel0Phase1Result { new_layers, deltas_to_compact, diff --git a/test_runner/regress/test_duplicate_layers.py b/test_runner/regress/test_duplicate_layers.py new file mode 100644 index 0000000000..77d79cfcc0 --- /dev/null +++ b/test_runner/regress/test_duplicate_layers.py @@ -0,0 +1,42 @@ +import time + +import pytest +from fixtures.neon_fixtures import NeonEnvBuilder, PgBin + + +# Test duplicate layer detection +# +# This test sets fail point at the end of first compaction phase: +# after flushing new L1 layers 
+# but before deletion of L0 layers.
+# This should cause compaction after restart to generate a duplicate L1 layer.
+@pytest.mark.timeout(600)
+def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
+    env = neon_env_builder.init_start()
+
+    # These warnings are expected, when the pageserver is restarted abruptly
+    env.pageserver.allowed_errors.append(".*found future image layer.*")
+    env.pageserver.allowed_errors.append(".*found future delta layer.*")
+    env.pageserver.allowed_errors.append(".*duplicate layer.*")
+
+    pageserver_http = env.pageserver.http_client()
+
+    # Use aggressive compaction and checkpoint settings
+    tenant_id, _ = env.neon_cli.create_tenant(
+        conf={
+            "checkpoint_distance": f"{1024 ** 2}",
+            "compaction_target_size": f"{1024 ** 2}",
+            "compaction_period": "1 s",
+            "compaction_threshold": "3",
+        }
+    )
+    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
+    connstr = endpoint.connstr(options="-csynchronous_commit=off")
+    pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
+
+    pageserver_http.configure_failpoints(("compact-level0-phase1-finish", "exit"))
+
+    # The "exit" failpoint kills the pageserver process mid-compaction, so
+    # the pgbench run is expected to fail when it loses the backend.
+    with pytest.raises(Exception):
+        pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])
+    env.pageserver.stop()
+    env.pageserver.start()
+    time.sleep(10)  # let compaction be performed