Add check for duplicates of generated image layers (#3869)

## Describe your changes
This PR makes `insert_historic` detect and reject duplicate layer insertions instead of silently overwriting the existing entry. `LayerMap::insert_historic` (and the underlying `insert_historic_noflush`) now returns `anyhow::Result<()>` and bails when the layer key is already present; callers propagate the error with `?`, or `unwrap()` in tests and benchmarks. To make the check work with batched updates, `BufferedHistoricLayerCoverage` gains a `contains` method that consults buffered inserts and removes before falling back to the flushed state. Timeline loading now replaces a local layer with its remote counterpart via `replace_historic` instead of removing and re-inserting it, and basebackup import initializes the timeline without re-loading the layer map it has already populated.
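
As context for reviewers, here is a minimal, self-contained sketch of the new contract (hypothetical `DemoLayerMap` type, not the real pageserver code; assumes the `anyhow` crate): a second insert of the same key now fails loudly.

```rust
use anyhow::{bail, Result};
use std::collections::BTreeMap;

/// Hypothetical stand-in for the real LayerMap; models only the new contract.
struct DemoLayerMap {
    historic: BTreeMap<String, u64>,
}

impl DemoLayerMap {
    /// Like the patched insert_historic: error on duplicates instead of overwriting.
    fn insert_historic(&mut self, key: &str, value: u64) -> Result<()> {
        if self.historic.contains_key(key) {
            bail!("Attempt to insert duplicate layer {key} in layer map");
        }
        self.historic.insert(key.to_owned(), value);
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut map = DemoLayerMap { historic: BTreeMap::new() };
    map.insert_historic("delta_000-100", 1)?;
    // The same key again is now an error, not a silent overwrite:
    assert!(map.insert_historic("delta_000-100", 2).is_err());
    Ok(())
}
```
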
## Issue ticket number and link

#3673

## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? If so, did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires a public announcement, mark it with the
/release-notes label and add a few sentences about it in this section.

---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
Author: Konstantin Knizhnik
Date: 2023-04-13 10:19:34 +03:00 (committed by GitHub)
Commit: 732acc54c1 (parent: 5d0ecadf7c)
5 changed files with 45 additions and 20 deletions

@@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
         min_lsn = min(min_lsn, lsn_range.start);
         max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
-        updates.insert_historic(Arc::new(layer));
+        updates.insert_historic(Arc::new(layer)).unwrap();
     }
     println!("min: {min_lsn}, max: {max_lsn}");
@@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) {
             is_incremental: false,
             short_id: format!("Layer {}", i),
         };
-        updates.insert_historic(Arc::new(layer));
+        updates.insert_historic(Arc::new(layer)).unwrap();
     }
     updates.flush();
     println!("Finished layer map init in {:?}", now.elapsed());

@@ -267,7 +267,10 @@ impl UninitializedTimeline<'_> {
             .await
             .context("Failed to flush after basebackup import")?;
-        self.initialize(ctx)
+        // Initialize without loading the layer map. We started with an empty layer map, and already
+        // updated it for the layers that we created during the import.
+        let mut timelines = self.owning_tenant.timelines.lock().unwrap();
+        self.initialize_with_lock(ctx, &mut timelines, false, true)
     }

     fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
@@ -2308,6 +2311,8 @@ impl Tenant {
             )
         })?;

+        // Initialize the timeline without loading the layer map, because we already updated the layer
+        // map above, when we imported the datadir.
         let timeline = {
             let mut timelines = self.timelines.lock().unwrap();
             raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?

@@ -52,7 +52,7 @@ use crate::metrics::NUM_ONDISK_LAYERS;
 use crate::repository::Key;
 use crate::tenant::storage_layer::InMemoryLayer;
 use crate::tenant::storage_layer::Layer;
-use anyhow::Result;
+use anyhow::{bail, Result};
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
@@ -126,7 +126,7 @@ where
     ///
     /// Insert an on-disk layer.
     ///
-    pub fn insert_historic(&mut self, layer: Arc<L>) {
+    pub fn insert_historic(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
         self.layer_map.insert_historic_noflush(layer)
     }
@@ -274,17 +274,22 @@ where
     ///
     /// Helper function for BatchedUpdates::insert_historic
     ///
-    pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
-        self.historic.insert(
-            historic_layer_coverage::LayerKey::from(&*layer),
-            Arc::clone(&layer),
-        );
+    pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
+        let key = historic_layer_coverage::LayerKey::from(&*layer);
+        if self.historic.contains(&key) {
+            bail!(
+                "Attempt to insert duplicate layer {} in layer map",
+                layer.short_id()
+            );
+        }
+        self.historic.insert(key, Arc::clone(&layer));
         if Self::is_l0(&layer) {
             self.l0_delta_layers.push(layer);
         }
         NUM_ONDISK_LAYERS.inc();
+        Ok(())
     }

     ///
@@ -838,7 +843,7 @@ mod tests {
         let expected_in_counts = (1, usize::from(expected_l0));

-        map.batch_update().insert_historic(remote.clone());
+        map.batch_update().insert_historic(remote.clone()).unwrap();
         assert_eq!(count_layer_in(&map, &remote), expected_in_counts);

         let replaced = map

@@ -417,6 +417,14 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
         }
     }

+    pub fn contains(&self, layer_key: &LayerKey) -> bool {
+        match self.buffer.get(layer_key) {
+            Some(None) => false, // layer remove was buffered
+            Some(_) => true,     // layer insert was buffered
+            None => self.layers.contains_key(layer_key), // no buffered ops for this layer
+        }
+    }
+
     pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
         self.buffer.insert(layer_key, Some(value));
     }

@@ -1446,7 +1446,7 @@ impl Timeline {
                     trace!("found layer {}", layer.path().display());
                     total_physical_size += file_size;
-                    updates.insert_historic(Arc::new(layer));
+                    updates.insert_historic(Arc::new(layer))?;
                     num_layers += 1;
                 } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
                     // Create a DeltaLayer struct for each delta file.
@@ -1478,7 +1478,7 @@ impl Timeline {
                     trace!("found layer {}", layer.path().display());
                     total_physical_size += file_size;
-                    updates.insert_historic(Arc::new(layer));
+                    updates.insert_historic(Arc::new(layer))?;
                     num_layers += 1;
                 } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
                     // ignore these
@@ -1552,7 +1552,7 @@ impl Timeline {
             // remote index file?
             // If so, rename_to_backup those files & replace their local layer with
             // a RemoteLayer in the layer map so that we re-download them on-demand.
-            if let Some(local_layer) = local_layer {
+            if let Some(local_layer) = &local_layer {
                 let local_layer_path = local_layer
                     .local_path()
                     .expect("caller must ensure that local_layers only contains local layers");
@@ -1577,7 +1577,6 @@ impl Timeline {
                     anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
                 } else {
                     self.metrics.resident_physical_size_gauge.sub(local_size);
-                    updates.remove_historic(local_layer);
                     // fall-through to adding the remote layer
                 }
             } else {
@@ -1613,7 +1612,11 @@ impl Timeline {
                     );
                     let remote_layer = Arc::new(remote_layer);
-                    updates.insert_historic(remote_layer);
+                    if let Some(local_layer) = &local_layer {
+                        updates.replace_historic(local_layer, remote_layer)?;
+                    } else {
+                        updates.insert_historic(remote_layer)?;
+                    }
                 }
                 LayerFileName::Delta(deltafilename) => {
                     // Create a RemoteLayer for the delta file.
@@ -1637,7 +1640,11 @@ impl Timeline {
                         LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
                     );
                     let remote_layer = Arc::new(remote_layer);
-                    updates.insert_historic(remote_layer);
+                    if let Some(local_layer) = &local_layer {
+                        updates.replace_historic(local_layer, remote_layer)?;
+                    } else {
+                        updates.insert_historic(remote_layer)?;
+                    }
                 }
             }
         }
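
A condensed sketch of the branch the two hunks above add (hypothetical stub types; the real code works with `Arc<dyn PersistentLayer>` and `RemoteLayer`, and this sketch assumes the `anyhow` crate): when a local layer file also appears in the remote index, the entry is atomically replaced; otherwise the remote layer is inserted fresh, so neither path can trip the new duplicate check.

```rust
use anyhow::Result;

// Hypothetical stand-ins for the real pageserver types; bodies are stubs.
struct Layer;
struct Updates;

impl Updates {
    fn replace_historic(&mut self, _old: &Layer, _new: Layer) -> Result<()> {
        Ok(())
    }
    fn insert_historic(&mut self, _new: Layer) -> Result<()> {
        Ok(())
    }
}

/// Mirrors the branch added during timeline load: replace when a local layer
/// exists for this file name, insert the remote layer otherwise.
fn add_remote_layer(updates: &mut Updates, local: Option<&Layer>, remote: Layer) -> Result<()> {
    if let Some(local_layer) = local {
        updates.replace_historic(local_layer, remote)
    } else {
        updates.insert_historic(remote)
    }
}

fn main() -> Result<()> {
    let mut updates = Updates;
    add_remote_layer(&mut updates, None, Layer)?;         // no local file: fresh insert
    add_remote_layer(&mut updates, Some(&Layer), Layer)?; // local file: atomic replace
    Ok(())
}
```
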
@@ -2684,7 +2691,7 @@ impl Timeline {
             .write()
             .unwrap()
             .batch_update()
-            .insert_historic(Arc::new(new_delta));
+            .insert_historic(Arc::new(new_delta))?;

         // update the timeline's physical size
         let sz = new_delta_path.metadata()?.len();
@@ -2889,7 +2896,7 @@ impl Timeline {
         self.metrics
             .resident_physical_size_gauge
             .add(metadata.len());
-        updates.insert_historic(Arc::new(l));
+        updates.insert_historic(Arc::new(l))?;
     }
     updates.flush();
     drop(layers);
@@ -3322,7 +3329,7 @@ impl Timeline {
         new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));

         let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
-        updates.insert_historic(x);
+        updates.insert_historic(x)?;

         // Now that we have reshuffled the data to set of new delta layers, we can