diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index e1ebe92c61..c6d1a0052a 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -442,6 +442,10 @@ pub trait PersistentLayer: Layer + AsLayerDesc { None } + fn downcast_delta_layer(self: Arc) -> Option> { + None + } + fn is_remote_layer(&self) -> bool { false } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 2eab2106a6..83a22f9f13 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -51,6 +51,7 @@ use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; +use std::sync::Arc; use tracing::*; use utils::{ @@ -414,6 +415,10 @@ impl AsLayerDesc for DeltaLayer { } impl PersistentLayer for DeltaLayer { + fn downcast_delta_layer(self: Arc) -> Option> { + Some(self) + } + fn local_path(&self) -> Option { Some(self.path()) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ded65c732a..58144d9050 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -24,7 +24,7 @@ use tracing::*; use utils::id::TenantTimelineId; use std::cmp::{max, min, Ordering}; -use std::collections::{BinaryHeap, HashMap}; +use std::collections::{BinaryHeap, HashMap, HashSet}; use std::fs; use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; @@ -3140,7 +3140,7 @@ impl Timeline { #[derive(Default)] struct CompactLevel0Phase1Result { - new_layers: Vec, + new_layers: Vec>, deltas_to_compact: Vec>, } @@ -3318,6 +3318,37 @@ impl Timeline { return Ok(CompactLevel0Phase1Result::default()); } + // This failpoint is used together with `test_duplicate_layers` integration test. + // It returns the compaction result exactly the same layers as input to compaction. 
+ // We want to ensure that this will not cause any problem when updating the layer map + // after the compaction is finished. + // + // Currently, there are two rare edge cases that will cause duplicated layers to be + // inserted. + // 1. The compaction job is interrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which + // is compacted to 5, but the page server is shut down, next time we start page server we will get a layer + // map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compaction at this + // point again, it is likely that we will get a file 6 which has the same content and the key range as 5, + // and this causes an overwrite. This is acceptable because the content is the same, and we should do a + // layer replace instead of the normal remove / upload process. + // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping, and of the target file + // size. Compaction will likely create the same set of n files afterwards. + // + // This failpoint is a superset of both of the cases. + fail_point!("compact-level0-phase1-return-same", |_| { + println!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint + Ok(CompactLevel0Phase1Result { + new_layers: level0_deltas + .iter() + .map(|x| x.clone().downcast_delta_layer().unwrap()) + .collect(), + deltas_to_compact: level0_deltas + .iter() + .map(|x| x.layer_desc().clone().into()) + .collect(), + }) + }); + // Gather the files to compact in this iteration. // // Start with the oldest Level 0 delta file, and collect any other @@ -3576,7 +3607,9 @@ impl Timeline { || contains_hole { // ... 
if so, flush previous layer and prepare to write new one - new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?); + new_layers.push(Arc::new( + writer.take().unwrap().finish(prev_key.unwrap().next())?, + )); writer = None; if contains_hole { @@ -3614,7 +3647,7 @@ impl Timeline { prev_key = Some(key); } if let Some(writer) = writer { - new_layers.push(writer.finish(prev_key.unwrap().next())?); + new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?)); } // Sync layers @@ -3723,6 +3756,11 @@ impl Timeline { let mut guard = self.layers.write().await; let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); + // In some rare cases, we may generate a file with exactly the same key range / LSN as before the compaction. + // We should move to numbering the layer files instead of naming them using key range / LSN some day. But for + // now, we just skip the file to avoid unintentional modification to files on the disk and in the layer map. + let mut duplicated_layers = HashSet::new(); + let mut insert_layers = Vec::new(); let mut remove_layers = Vec::new(); @@ -3749,21 +3787,33 @@ impl Timeline { .add(metadata.len()); new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); - let x: Arc = Arc::new(l); - x.access_stats().record_residence_event( + l.access_stats().record_residence_event( &guard, LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - insert_layers.push(x); + let l = l as Arc; + if guard.contains(&l) { + duplicated_layers.insert(l.layer_desc().key()); + } else { + if LayerMap::is_l0(l.layer_desc()) { + return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction."))); + } + insert_layers.push(l); + } } // Now that we have reshuffled the data to set of new delta layers, we can // delete the old ones let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); - for l in deltas_to_compact { - 
layer_names_to_delete.push(l.filename()); - remove_layers.push(guard.get_from_desc(&l)); + for ldesc in deltas_to_compact { + if duplicated_layers.contains(&ldesc.key()) { + // skip duplicated layers, they will not be removed; we have already overwritten them + // with new layers in the compaction phase 1. + continue; + } + layer_names_to_delete.push(ldesc.filename()); + remove_layers.push(guard.get_from_desc(&ldesc)); } guard.finish_compact_l0( diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 979155f8d2..77f5f38314 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -295,6 +295,10 @@ impl LayerManager { Ok(()) } + + pub(crate) fn contains(&self, layer: &Arc) -> bool { + self.layer_fmgr.contains(layer) + } } pub struct LayerFileManager( @@ -319,6 +323,10 @@ impl LayerFileManager { } } + pub(crate) fn contains(&self, layer: &Arc) -> bool { + self.0.contains_key(&layer.layer_desc().key()) + } + pub(crate) fn new() -> Self { Self(HashMap::new()) } diff --git a/test_runner/regress/test_duplicate_layers.py b/test_runner/regress/test_duplicate_layers.py new file mode 100644 index 0000000000..c1832a2063 --- /dev/null +++ b/test_runner/regress/test_duplicate_layers.py @@ -0,0 +1,36 @@ +import time + +import pytest +from fixtures.neon_fixtures import NeonEnvBuilder, PgBin + + +# Test duplicate layer detection +# +# This test enables the `compact-level0-phase1-return-same` failpoint, which makes +# L0 compaction phase 1 return its input layers unchanged as its output, so the +# layer map update has to cope with new layers that duplicate existing ones. 
+@pytest.mark.timeout(600) +def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): + env = neon_env_builder.init_start() + pageserver_http = env.pageserver.http_client() + + # Use aggressive compaction and checkpoint settings + tenant_id, _ = env.neon_cli.create_tenant( + conf={ + "checkpoint_distance": f"{1024 ** 2}", + "compaction_target_size": f"{1024 ** 2}", + "compaction_period": "5 s", + "compaction_threshold": "3", + } + ) + + pageserver_http.configure_failpoints(("compact-level0-phase1-return-same", "return")) + + endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) + connstr = endpoint.connstr(options="-csynchronous_commit=off") + pg_bin.run_capture(["pgbench", "-i", "-s1", connstr]) + + time.sleep(10) # give compaction time to run and hit the failpoint + assert env.pageserver.log_contains("compact-level0-phase1-return-same") + + pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])