From 732acc54c1fa744fc0c5c48158c7716371e70b89 Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Thu, 13 Apr 2023 10:19:34 +0300
Subject: [PATCH] Add check for duplicates of generated image layers (#3869)

## Describe your changes

Make the layer map detect duplicate layers: `insert_historic` now returns an
error when a layer with the same key is already present, instead of silently
overwriting the existing entry. Call sites propagate or unwrap the error, and
the code path that swaps a local layer for a `RemoteLayer` now uses
`replace_historic` rather than a remove-then-insert pair.

## Issue ticket number and link

#3673

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? If so, did you add the relevant metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section.

---------

Co-authored-by: Heikki Linnakangas
---
 pageserver/benches/bench_layer_map.rs        |  4 +--
 pageserver/src/tenant.rs                     |  7 +++++-
 pageserver/src/tenant/layer_map.rs           | 21 ++++++++++------
 .../layer_map/historic_layer_coverage.rs     |  8 ++++++
 pageserver/src/tenant/timeline.rs            | 25 ++++++++++++-------
 5 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs
index 5edfa84d8a..4882fc518f 100644
--- a/pageserver/benches/bench_layer_map.rs
+++ b/pageserver/benches/bench_layer_map.rs
@@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
         min_lsn = min(min_lsn, lsn_range.start);
         max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
 
-        updates.insert_historic(Arc::new(layer));
+        updates.insert_historic(Arc::new(layer)).unwrap();
     }
 
     println!("min: {min_lsn}, max: {max_lsn}");
@@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) {
             is_incremental: false,
             short_id: format!("Layer {}", i),
         };
-        updates.insert_historic(Arc::new(layer));
+        updates.insert_historic(Arc::new(layer)).unwrap();
     }
     updates.flush();
     println!("Finished layer map init in {:?}", now.elapsed());
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 03a4ff8c8e..7e88a12963 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -267,7 +267,10 @@ impl UninitializedTimeline<'_> {
             .await
             .context("Failed to flush after basebackup import")?;
 
-        self.initialize(ctx)
+        // Initialize without loading the layer map. We started with an empty layer map, and already
+        // updated it for the layers that we created during the import.
+        let mut timelines = self.owning_tenant.timelines.lock().unwrap();
+        self.initialize_with_lock(ctx, &mut timelines, false, true)
     }
 
     fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
@@ -2308,6 +2311,8 @@ impl Tenant {
             )
         })?;
 
+        // Initialize the timeline without loading the layer map, because we already updated the layer
+        // map above, when we imported the datadir.
         let timeline = {
             let mut timelines = self.timelines.lock().unwrap();
             raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs
index 4c659be9aa..02159ee291 100644
--- a/pageserver/src/tenant/layer_map.rs
+++ b/pageserver/src/tenant/layer_map.rs
@@ -52,7 +52,7 @@ use crate::metrics::NUM_ONDISK_LAYERS;
 use crate::repository::Key;
 use crate::tenant::storage_layer::InMemoryLayer;
 use crate::tenant::storage_layer::Layer;
-use anyhow::Result;
+use anyhow::{bail, Result};
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
@@ -126,7 +126,7 @@ where
     ///
     /// Insert an on-disk layer.
     ///
-    pub fn insert_historic(&mut self, layer: Arc<L>) {
+    pub fn insert_historic(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
         self.layer_map.insert_historic_noflush(layer)
     }
 
@@ -274,17 +274,22 @@ where
     ///
     /// Helper function for BatchedUpdates::insert_historic
     ///
-    pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
-        self.historic.insert(
-            historic_layer_coverage::LayerKey::from(&*layer),
-            Arc::clone(&layer),
-        );
+    pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
+        let key = historic_layer_coverage::LayerKey::from(&*layer);
+        if self.historic.contains(&key) {
+            bail!(
+                "Attempt to insert duplicate layer {} in layer map",
+                layer.short_id()
+            );
+        }
+        self.historic.insert(key, Arc::clone(&layer));
 
         if Self::is_l0(&layer) {
             self.l0_delta_layers.push(layer);
         }
 
         NUM_ONDISK_LAYERS.inc();
+        Ok(())
     }
 
     ///
@@ -838,7 +843,7 @@ mod tests {
 
         let expected_in_counts = (1, usize::from(expected_l0));
 
-        map.batch_update().insert_historic(remote.clone());
+        map.batch_update().insert_historic(remote.clone()).unwrap();
         assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
 
         let replaced = map
diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs
index b63c361314..1fdcd5e5a4 100644
--- a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs
+++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs
@@ -417,6 +417,14 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
         }
     }
 
+    pub fn contains(&self, layer_key: &LayerKey) -> bool {
+        match self.buffer.get(layer_key) {
+            Some(None) => false, // layer remove was buffered
+            Some(_) => true,     // layer insert was buffered
+            None => self.layers.contains_key(layer_key), // no buffered ops for this layer
+        }
+    }
+
     pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
         self.buffer.insert(layer_key, Some(value));
     }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 4b0d7a6994..29d8b544cc 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1446,7 +1446,7 @@ impl Timeline {
 
                 trace!("found layer {}", layer.path().display());
                 total_physical_size += file_size;
-                updates.insert_historic(Arc::new(layer));
+                updates.insert_historic(Arc::new(layer))?;
                 num_layers += 1;
             } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
                 // Create a DeltaLayer struct for each delta file.
@@ -1478,7 +1478,7 @@ impl Timeline {
 
                 trace!("found layer {}", layer.path().display());
                 total_physical_size += file_size;
-                updates.insert_historic(Arc::new(layer));
+                updates.insert_historic(Arc::new(layer))?;
                 num_layers += 1;
             } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
                 // ignore these
@@ -1552,7 +1552,7 @@ impl Timeline {
             // remote index file?
             // If so, rename_to_backup those files & replace their local layer with
             // a RemoteLayer in the layer map so that we re-download them on-demand.
-            if let Some(local_layer) = local_layer {
+            if let Some(local_layer) = &local_layer {
                 let local_layer_path = local_layer
                     .local_path()
                     .expect("caller must ensure that local_layers only contains local layers");
@@ -1577,7 +1577,6 @@ impl Timeline {
                     anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
                 } else {
                     self.metrics.resident_physical_size_gauge.sub(local_size);
-                    updates.remove_historic(local_layer);
                     // fall-through to adding the remote layer
                 }
             } else {
@@ -1613,7 +1612,11 @@ impl Timeline {
                         LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
                     );
                     let remote_layer = Arc::new(remote_layer);
-                    updates.insert_historic(remote_layer);
+                    if let Some(local_layer) = &local_layer {
+                        updates.replace_historic(local_layer, remote_layer)?;
+                    } else {
+                        updates.insert_historic(remote_layer)?;
+                    }
                 }
                 LayerFileName::Delta(deltafilename) => {
                     // Create a RemoteLayer for the delta file.
@@ -1637,7 +1640,11 @@ impl Timeline {
                         LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
                     );
                     let remote_layer = Arc::new(remote_layer);
-                    updates.insert_historic(remote_layer);
+                    if let Some(local_layer) = &local_layer {
+                        updates.replace_historic(local_layer, remote_layer)?;
+                    } else {
+                        updates.insert_historic(remote_layer)?;
+                    }
                 }
             }
         }
@@ -2684,7 +2691,7 @@ impl Timeline {
             .write()
             .unwrap()
             .batch_update()
-            .insert_historic(Arc::new(new_delta));
+            .insert_historic(Arc::new(new_delta))?;
 
         // update the timeline's physical size
         let sz = new_delta_path.metadata()?.len();
@@ -2889,7 +2896,7 @@ impl Timeline {
             self.metrics
                 .resident_physical_size_gauge
                 .add(metadata.len());
-            updates.insert_historic(Arc::new(l));
+            updates.insert_historic(Arc::new(l))?;
         }
         updates.flush();
         drop(layers);
@@ -3322,7 +3329,7 @@ impl Timeline {
             new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
 
             let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
-            updates.insert_historic(x);
+            updates.insert_historic(x)?;
         }
 
         // Now that we have reshuffled the data to set of new delta layers, we can
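
A note on the new `contains`: it must consult the pending-operations buffer before
the committed map, because a buffered `Some(None)` entry is an unflushed removal and
has to shadow any committed layer under the same key. Below is a minimal standalone
sketch of that lookup order, with plain `BTreeMap`s and `String` keys standing in
for the real coverage structures and `LayerKey`; it is illustrative, not pageserver
code:

use std::collections::BTreeMap;

/// Toy stand-in for BufferedHistoricLayerCoverage: `buffer` holds pending
/// operations (Some(v) = buffered insert, None = buffered remove) that get
/// applied to `layers` on rebuild.
struct Buffered<V> {
    buffer: BTreeMap<String, Option<V>>,
    layers: BTreeMap<String, V>,
}

impl<V> Buffered<V> {
    fn contains(&self, key: &str) -> bool {
        match self.buffer.get(key) {
            Some(None) => false,                   // a remove is buffered; it shadows `layers`
            Some(_) => true,                       // an insert is buffered
            None => self.layers.contains_key(key), // no pending op: ask the committed map
        }
    }
}

fn main() {
    let mut b = Buffered::<u64> {
        buffer: BTreeMap::new(),
        layers: BTreeMap::new(),
    };
    b.layers.insert("image-1".into(), 1);
    b.buffer.insert("image-1".into(), None); // buffered removal of a committed layer
    b.buffer.insert("image-2".into(), Some(2)); // buffered insert of a new layer
    assert!(!b.contains("image-1")); // the buffered remove wins
    assert!(b.contains("image-2"));
    assert!(!b.contains("image-3"));
}

Without the `Some(None)` arm, a layer whose removal is still buffered would be
reported as present, and a legitimate re-insert of that key would be rejected.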
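
The duplicate check itself changes `insert_historic` from a silent overwrite into a
hard error that callers must handle. A self-contained sketch of the pattern, assuming
only the `anyhow` crate (the toy map and layer ids here are hypothetical, not
pageserver types):

use anyhow::bail;
use std::collections::BTreeMap;

/// Toy layer map keyed by layer name; mirrors the duplicate check that
/// insert_historic_noflush now performs before inserting.
struct ToyLayerMap {
    historic: BTreeMap<String, u64>,
}

impl ToyLayerMap {
    fn insert_historic(&mut self, short_id: &str, size: u64) -> anyhow::Result<()> {
        if self.historic.contains_key(short_id) {
            // Previously this would silently overwrite the existing entry.
            bail!("Attempt to insert duplicate layer {short_id} in layer map");
        }
        self.historic.insert(short_id.to_string(), size);
        Ok(())
    }
}

fn main() -> anyhow::Result<()> {
    let mut map = ToyLayerMap { historic: BTreeMap::new() };
    map.insert_historic("image-layer-A", 8192)?;
    // Inserting the same key again now fails loudly.
    assert!(map.insert_historic("image-layer-A", 8192).is_err());
    Ok(())
}

In the patch, pageserver code paths propagate the failure with `?`, while the benches
and layer-map tests `unwrap()`, since a duplicate there indicates a bug in the test
setup itself.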