From 1066bca5e3bd0aa446f385f700e0be0cdf55d3ab Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Mon, 17 Jul 2023 10:26:29 -0400 Subject: [PATCH] compaction: allow duplicated layers and skip in replacement (#4696) ## Problem Compactions might generate files of exactly the same name as before compaction due to our naming of layer files. This could have already caused some mess in the system, and is known to cause some issues like https://github.com/neondatabase/neon/issues/4088. Therefore, we now consider duplicated layers in the post-compaction process to avoid violating the layer map duplicate checks. related previous works: close https://github.com/neondatabase/neon/pull/4094 error reported in: https://github.com/neondatabase/neon/issues/4690, https://github.com/neondatabase/neon/issues/4088 ## Summary of changes If a file already exists in the layer map before the compaction, do not modify the layer map and do not delete the file. The file on disk at that time should be the new one overwritten by the compaction process. This PR also adds a test case with a fail point that produces exactly the same set of files. This bypassing behavior is safe because the produced layer files have the same content / are the same representation of the original file. An alternative might be directly removing the duplicate check in the layer map, but I feel it would be good if we can prevent that in the first place. --------- Signed-off-by: Alex Chi Z Co-authored-by: Konstantin Knizhnik Co-authored-by: Heikki Linnakangas Co-authored-by: Joonas Koivunen --- pageserver/src/tenant/storage_layer.rs | 4 ++ .../src/tenant/storage_layer/delta_layer.rs | 5 ++ pageserver/src/tenant/timeline.rs | 70 ++++++++++++++++--- .../src/tenant/timeline/layer_manager.rs | 8 +++ test_runner/regress/test_duplicate_layers.py | 36 ++++++++++ 5 files changed, 113 insertions(+), 10 deletions(-) create mode 100644 test_runner/regress/test_duplicate_layers.py diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index e1ebe92c61..c6d1a0052a 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -442,6 +442,10 @@ pub trait PersistentLayer: Layer + AsLayerDesc { None } + fn downcast_delta_layer(self: Arc) -> Option> { + None + } + fn is_remote_layer(&self) -> bool { false } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 2eab2106a6..83a22f9f13 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -51,6 +51,7 @@ use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; +use std::sync::Arc; use tracing::*; use utils::{ @@ -414,6 +415,10 @@ impl AsLayerDesc for DeltaLayer { } impl PersistentLayer for DeltaLayer { + fn downcast_delta_layer(self: Arc) -> Option> { + Some(self) + } + fn local_path(&self) -> Option { Some(self.path()) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ded65c732a..58144d9050 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -24,7 +24,7 @@ use tracing::*; use utils::id::TenantTimelineId; use std::cmp::{max, min, Ordering}; -use std::collections::{BinaryHeap, HashMap}; +use std::collections::{BinaryHeap, HashMap, HashSet}; use std::fs; use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; @@ -3140,7 +3140,7 @@ impl Timeline { #[derive(Default)] struct CompactLevel0Phase1Result { - new_layers: Vec, + new_layers: Vec>, deltas_to_compact: Vec>, } @@ -3318,6 +3318,37 @@ impl Timeline { return Ok(CompactLevel0Phase1Result::default()); } + // This failpoint is used together with `test_duplicate_layers` integration test. + // It returns the compaction result exactly the same layers as input to compaction. + // We want to ensure that this will not cause any problem when updating the layer map + // after the compaction is finished. + // + // Currently, there are two rare edge cases that will cause duplicated layers being + // inserted. + // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which + // is compacted to 5, but the page server is shut down, next time we start page server we will get a layer + // map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this + // point again, it is likely that we will get a file 6 which has the same content and the key range as 5, + // and this causes an overwrite. This is acceptable because the content is the same, and we should do a + // layer replace instead of the normal remove / upload process. + // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file + // size length. Compaction will likely create the same set of n files afterwards. + // + // This failpoint is a superset of both of the cases. + fail_point!("compact-level0-phase1-return-same", |_| { + println!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint + Ok(CompactLevel0Phase1Result { + new_layers: level0_deltas + .iter() + .map(|x| x.clone().downcast_delta_layer().unwrap()) + .collect(), + deltas_to_compact: level0_deltas + .iter() + .map(|x| x.layer_desc().clone().into()) + .collect(), + }) + }); + // Gather the files to compact in this iteration. // // Start with the oldest Level 0 delta file, and collect any other @@ -3576,7 +3607,9 @@ impl Timeline { || contains_hole { // ... if so, flush previous layer and prepare to write new one - new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?); + new_layers.push(Arc::new( + writer.take().unwrap().finish(prev_key.unwrap().next())?, + )); writer = None; if contains_hole { @@ -3614,7 +3647,7 @@ impl Timeline { prev_key = Some(key); } if let Some(writer) = writer { - new_layers.push(writer.finish(prev_key.unwrap().next())?); + new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?)); } // Sync layers @@ -3723,6 +3756,11 @@ impl Timeline { let mut guard = self.layers.write().await; let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); + // In some rare cases, we may generate a file with exactly the same key range / LSN as before the compaction. + // We should move to numbering the layer files instead of naming them using key range / LSN some day. But for + // now, we just skip the file to avoid unintentional modification to files on the disk and in the layer map. + let mut duplicated_layers = HashSet::new(); + let mut insert_layers = Vec::new(); let mut remove_layers = Vec::new(); @@ -3749,21 +3787,33 @@ impl Timeline { .add(metadata.len()); new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); - let x: Arc = Arc::new(l); - x.access_stats().record_residence_event( + l.access_stats().record_residence_event( &guard, LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - insert_layers.push(x); + let l = l as Arc; + if guard.contains(&l) { + duplicated_layers.insert(l.layer_desc().key()); + } else { + if LayerMap::is_l0(l.layer_desc()) { + return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction."))); + } + insert_layers.push(l); + } } // Now that we have reshuffled the data to set of new delta layers, we can // delete the old ones let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); - for l in deltas_to_compact { - layer_names_to_delete.push(l.filename()); - remove_layers.push(guard.get_from_desc(&l)); + for ldesc in deltas_to_compact { + if duplicated_layers.contains(&ldesc.key()) { + // skip duplicated layers, they will not be removed; we have already overwritten them + // with new layers in the compaction phase 1. + continue; + } + layer_names_to_delete.push(ldesc.filename()); + remove_layers.push(guard.get_from_desc(&ldesc)); } guard.finish_compact_l0( diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 979155f8d2..77f5f38314 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -295,6 +295,10 @@ impl LayerManager { Ok(()) } + + pub(crate) fn contains(&self, layer: &Arc) -> bool { + self.layer_fmgr.contains(layer) + } } pub struct LayerFileManager( @@ -319,6 +323,10 @@ impl LayerFileManager { } } + pub(crate) fn contains(&self, layer: &Arc) -> bool { + self.0.contains_key(&layer.layer_desc().key()) + } + pub(crate) fn new() -> Self { Self(HashMap::new()) } diff --git a/test_runner/regress/test_duplicate_layers.py b/test_runner/regress/test_duplicate_layers.py new file mode 100644 index 0000000000..c1832a2063 --- /dev/null +++ b/test_runner/regress/test_duplicate_layers.py @@ -0,0 +1,36 @@ +import time + +import pytest +from fixtures.neon_fixtures import NeonEnvBuilder, PgBin + + +# Test duplicate layer detection +# +# This test sets fail point at the end of first compaction phase: +# after flushing new L1 layers but before deletion of L0 layers +# it should cause generation of duplicate L1 layer by compaction after restart. +@pytest.mark.timeout(600) +def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): + env = neon_env_builder.init_start() + pageserver_http = env.pageserver.http_client() + + # Use aggressive compaction and checkpoint settings + tenant_id, _ = env.neon_cli.create_tenant( + conf={ + "checkpoint_distance": f"{1024 ** 2}", + "compaction_target_size": f"{1024 ** 2}", + "compaction_period": "5 s", + "compaction_threshold": "3", + } + ) + + pageserver_http.configure_failpoints(("compact-level0-phase1-return-same", "return")) + + endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) + connstr = endpoint.connstr(options="-csynchronous_commit=off") + pg_bin.run_capture(["pgbench", "-i", "-s1", connstr]) + + time.sleep(10) # let compaction to be performed + assert env.pageserver.log_contains("compact-level0-phase1-return-same") + + pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])