Add check for duplicates of generated image layers (#3869)

## Describe your changes

## Issue ticket number and link

#3673

## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
Konstantin Knizhnik
2023-04-13 10:19:34 +03:00
committed by GitHub
parent 5d0ecadf7c
commit 732acc54c1
5 changed files with 45 additions and 20 deletions

View File

@@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
min_lsn = min(min_lsn, lsn_range.start);
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer)).unwrap();
}
println!("min: {min_lsn}, max: {max_lsn}");
@@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) {
is_incremental: false,
short_id: format!("Layer {}", i),
};
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer)).unwrap();
}
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -267,7 +267,10 @@ impl UninitializedTimeline<'_> {
.await
.context("Failed to flush after basebackup import")?;
self.initialize(ctx)
// Initialize without loading the layer map. We started with an empty layer map, and already
// updated it for the layers that we created during the import.
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
self.initialize_with_lock(ctx, &mut timelines, false, true)
}
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
@@ -2308,6 +2311,8 @@ impl Tenant {
)
})?;
// Initialize the timeline without loading the layer map, because we already updated the layer
// map above, when we imported the datadir.
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?

View File

@@ -52,7 +52,7 @@ use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Result;
use anyhow::{bail, Result};
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
@@ -126,7 +126,7 @@ where
///
/// Insert an on-disk layer.
///
pub fn insert_historic(&mut self, layer: Arc<L>) {
pub fn insert_historic(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
self.layer_map.insert_historic_noflush(layer)
}
@@ -274,17 +274,22 @@ where
///
/// Helper function for BatchedUpdates::insert_historic
///
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
self.historic.insert(
historic_layer_coverage::LayerKey::from(&*layer),
Arc::clone(&layer),
);
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
let key = historic_layer_coverage::LayerKey::from(&*layer);
if self.historic.contains(&key) {
bail!(
"Attempt to insert duplicate layer {} in layer map",
layer.short_id()
);
}
self.historic.insert(key, Arc::clone(&layer));
if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer);
}
NUM_ONDISK_LAYERS.inc();
Ok(())
}
///
@@ -838,7 +843,7 @@ mod tests {
let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update().insert_historic(remote.clone());
map.batch_update().insert_historic(remote.clone()).unwrap();
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
let replaced = map

View File

@@ -417,6 +417,14 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
}
}
pub fn contains(&self, layer_key: &LayerKey) -> bool {
match self.buffer.get(layer_key) {
Some(None) => false, // layer remove was buffered
Some(_) => true, // layer insert was buffered
None => self.layers.contains_key(layer_key), // no buffered ops for this layer
}
}
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
self.buffer.insert(layer_key, Some(value));
}

View File

@@ -1446,7 +1446,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer))?;
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1478,7 +1478,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer))?;
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1552,7 +1552,7 @@ impl Timeline {
// remote index file?
// If so, rename_to_backup those files & replace their local layer with
// a RemoteLayer in the layer map so that we re-download them on-demand.
if let Some(local_layer) = local_layer {
if let Some(local_layer) = &local_layer {
let local_layer_path = local_layer
.local_path()
.expect("caller must ensure that local_layers only contains local layers");
@@ -1577,7 +1577,6 @@ impl Timeline {
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.resident_physical_size_gauge.sub(local_size);
updates.remove_historic(local_layer);
// fall-through to adding the remote layer
}
} else {
@@ -1613,7 +1612,11 @@ impl Timeline {
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer);
if let Some(local_layer) = &local_layer {
updates.replace_historic(local_layer, remote_layer)?;
} else {
updates.insert_historic(remote_layer)?;
}
}
LayerFileName::Delta(deltafilename) => {
// Create a RemoteLayer for the delta file.
@@ -1637,7 +1640,11 @@ impl Timeline {
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer);
if let Some(local_layer) = &local_layer {
updates.replace_historic(local_layer, remote_layer)?;
} else {
updates.insert_historic(remote_layer)?;
}
}
}
}
@@ -2684,7 +2691,7 @@ impl Timeline {
.write()
.unwrap()
.batch_update()
.insert_historic(Arc::new(new_delta));
.insert_historic(Arc::new(new_delta))?;
// update the timeline's physical size
let sz = new_delta_path.metadata()?.len();
@@ -2889,7 +2896,7 @@ impl Timeline {
self.metrics
.resident_physical_size_gauge
.add(metadata.len());
updates.insert_historic(Arc::new(l));
updates.insert_historic(Arc::new(l))?;
}
updates.flush();
drop(layers);
@@ -3322,7 +3329,7 @@ impl Timeline {
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
updates.insert_historic(x);
updates.insert_historic(x)?;
}
// Now that we have reshuffled the data to set of new delta layers, we can