mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-24 14:49:59 +00:00
Add check for duplicates of generated image layers (#3869)
## Describe your changes ## Issue ticket number and link #3673 ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. --------- Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
committed by
GitHub
parent
5d0ecadf7c
commit
732acc54c1
@@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
|
||||
min_lsn = min(min_lsn, lsn_range.start);
|
||||
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
|
||||
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(layer)).unwrap();
|
||||
}
|
||||
|
||||
println!("min: {min_lsn}, max: {max_lsn}");
|
||||
@@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) {
|
||||
is_incremental: false,
|
||||
short_id: format!("Layer {}", i),
|
||||
};
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(layer)).unwrap();
|
||||
}
|
||||
updates.flush();
|
||||
println!("Finished layer map init in {:?}", now.elapsed());
|
||||
|
||||
@@ -267,7 +267,10 @@ impl UninitializedTimeline<'_> {
|
||||
.await
|
||||
.context("Failed to flush after basebackup import")?;
|
||||
|
||||
self.initialize(ctx)
|
||||
// Initialize without loading the layer map. We started with an empty layer map, and already
|
||||
// updated it for the layers that we created during the import.
|
||||
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
|
||||
self.initialize_with_lock(ctx, &mut timelines, false, true)
|
||||
}
|
||||
|
||||
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
|
||||
@@ -2308,6 +2311,8 @@ impl Tenant {
|
||||
)
|
||||
})?;
|
||||
|
||||
// Initialize the timeline without loading the layer map, because we already updated the layer
|
||||
// map above, when we imported the datadir.
|
||||
let timeline = {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
|
||||
|
||||
@@ -52,7 +52,7 @@ use crate::metrics::NUM_ONDISK_LAYERS;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::storage_layer::InMemoryLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use anyhow::Result;
|
||||
use anyhow::{bail, Result};
|
||||
use std::collections::VecDeque;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
@@ -126,7 +126,7 @@ where
|
||||
///
|
||||
/// Insert an on-disk layer.
|
||||
///
|
||||
pub fn insert_historic(&mut self, layer: Arc<L>) {
|
||||
pub fn insert_historic(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
|
||||
self.layer_map.insert_historic_noflush(layer)
|
||||
}
|
||||
|
||||
@@ -274,17 +274,22 @@ where
|
||||
///
|
||||
/// Helper function for BatchedUpdates::insert_historic
|
||||
///
|
||||
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
|
||||
self.historic.insert(
|
||||
historic_layer_coverage::LayerKey::from(&*layer),
|
||||
Arc::clone(&layer),
|
||||
);
|
||||
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
|
||||
let key = historic_layer_coverage::LayerKey::from(&*layer);
|
||||
if self.historic.contains(&key) {
|
||||
bail!(
|
||||
"Attempt to insert duplicate layer {} in layer map",
|
||||
layer.short_id()
|
||||
);
|
||||
}
|
||||
self.historic.insert(key, Arc::clone(&layer));
|
||||
|
||||
if Self::is_l0(&layer) {
|
||||
self.l0_delta_layers.push(layer);
|
||||
}
|
||||
|
||||
NUM_ONDISK_LAYERS.inc();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
@@ -838,7 +843,7 @@ mod tests {
|
||||
|
||||
let expected_in_counts = (1, usize::from(expected_l0));
|
||||
|
||||
map.batch_update().insert_historic(remote.clone());
|
||||
map.batch_update().insert_historic(remote.clone()).unwrap();
|
||||
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
|
||||
|
||||
let replaced = map
|
||||
|
||||
@@ -417,6 +417,14 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn contains(&self, layer_key: &LayerKey) -> bool {
|
||||
match self.buffer.get(layer_key) {
|
||||
Some(None) => false, // layer remove was buffered
|
||||
Some(_) => true, // layer insert was buffered
|
||||
None => self.layers.contains_key(layer_key), // no buffered ops for this layer
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
|
||||
self.buffer.insert(layer_key, Some(value));
|
||||
}
|
||||
|
||||
@@ -1446,7 +1446,7 @@ impl Timeline {
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(layer))?;
|
||||
num_layers += 1;
|
||||
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
|
||||
// Create a DeltaLayer struct for each delta file.
|
||||
@@ -1478,7 +1478,7 @@ impl Timeline {
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(layer))?;
|
||||
num_layers += 1;
|
||||
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
|
||||
// ignore these
|
||||
@@ -1552,7 +1552,7 @@ impl Timeline {
|
||||
// remote index file?
|
||||
// If so, rename_to_backup those files & replace their local layer with
|
||||
// a RemoteLayer in the layer map so that we re-download them on-demand.
|
||||
if let Some(local_layer) = local_layer {
|
||||
if let Some(local_layer) = &local_layer {
|
||||
let local_layer_path = local_layer
|
||||
.local_path()
|
||||
.expect("caller must ensure that local_layers only contains local layers");
|
||||
@@ -1577,7 +1577,6 @@ impl Timeline {
|
||||
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
|
||||
} else {
|
||||
self.metrics.resident_physical_size_gauge.sub(local_size);
|
||||
updates.remove_historic(local_layer);
|
||||
// fall-through to adding the remote layer
|
||||
}
|
||||
} else {
|
||||
@@ -1613,7 +1612,11 @@ impl Timeline {
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
|
||||
updates.insert_historic(remote_layer);
|
||||
if let Some(local_layer) = &local_layer {
|
||||
updates.replace_historic(local_layer, remote_layer)?;
|
||||
} else {
|
||||
updates.insert_historic(remote_layer)?;
|
||||
}
|
||||
}
|
||||
LayerFileName::Delta(deltafilename) => {
|
||||
// Create a RemoteLayer for the delta file.
|
||||
@@ -1637,7 +1640,11 @@ impl Timeline {
|
||||
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
updates.insert_historic(remote_layer);
|
||||
if let Some(local_layer) = &local_layer {
|
||||
updates.replace_historic(local_layer, remote_layer)?;
|
||||
} else {
|
||||
updates.insert_historic(remote_layer)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2684,7 +2691,7 @@ impl Timeline {
|
||||
.write()
|
||||
.unwrap()
|
||||
.batch_update()
|
||||
.insert_historic(Arc::new(new_delta));
|
||||
.insert_historic(Arc::new(new_delta))?;
|
||||
|
||||
// update the timeline's physical size
|
||||
let sz = new_delta_path.metadata()?.len();
|
||||
@@ -2889,7 +2896,7 @@ impl Timeline {
|
||||
self.metrics
|
||||
.resident_physical_size_gauge
|
||||
.add(metadata.len());
|
||||
updates.insert_historic(Arc::new(l));
|
||||
updates.insert_historic(Arc::new(l))?;
|
||||
}
|
||||
updates.flush();
|
||||
drop(layers);
|
||||
@@ -3322,7 +3329,7 @@ impl Timeline {
|
||||
|
||||
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
|
||||
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
|
||||
updates.insert_historic(x);
|
||||
updates.insert_historic(x)?;
|
||||
}
|
||||
|
||||
// Now that we have reshuffled the data to set of new delta layers, we can
|
||||
|
||||
Reference in New Issue
Block a user