# Add check for duplicates of generated image layers (#3869)
## Describe your changes

## Issue ticket number and link

#3673

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? If so, did you add the relevant metrics to the dashboard?
- [ ] If this PR requires a public announcement, mark it with the /release-notes label and add several sentences in this section.

---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
Committed by: GitHub
Parent: 5d0ecadf7c
Commit: 732acc54c1
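In short: `LayerMap::insert_historic` and its helper become fallible, and inserting a layer whose `LayerKey` is already present now returns an error instead of silently overwriting the existing entry. A minimal, self-contained sketch of that contract (the `SimpleLayerMap` type here is hypothetical; the real implementation is shown in the hunks below):

```rust
use std::collections::HashMap;

use anyhow::bail;

// Hypothetical, simplified stand-in for the real layer map. The point is
// the new contract: inserting the same key twice is an error, not a replace.
struct SimpleLayerMap {
    historic: HashMap<String, String>, // layer key -> layer short id
}

impl SimpleLayerMap {
    fn insert_historic(&mut self, key: String, short_id: String) -> anyhow::Result<()> {
        if self.historic.contains_key(&key) {
            bail!("Attempt to insert duplicate layer {short_id} in layer map");
        }
        self.historic.insert(key, short_id);
        Ok(())
    }
}
```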
@@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
         min_lsn = min(min_lsn, lsn_range.start);
         max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
 
-        updates.insert_historic(Arc::new(layer));
+        updates.insert_historic(Arc::new(layer)).unwrap();
     }
 
     println!("min: {min_lsn}, max: {max_lsn}");
@@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) {
             is_incremental: false,
             short_id: format!("Layer {}", i),
         };
-        updates.insert_historic(Arc::new(layer));
+        updates.insert_historic(Arc::new(layer)).unwrap();
     }
     updates.flush();
     println!("Finished layer map init in {:?}", now.elapsed());
@@ -267,7 +267,10 @@ impl UninitializedTimeline<'_> {
             .await
             .context("Failed to flush after basebackup import")?;
 
-        self.initialize(ctx)
+        // Initialize without loading the layer map. We started with an empty layer map, and already
+        // updated it for the layers that we created during the import.
+        let mut timelines = self.owning_tenant.timelines.lock().unwrap();
+        self.initialize_with_lock(ctx, &mut timelines, false, true)
     }
 
     fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
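Both import code paths (this hunk and the next) stop calling plain `initialize` and instead take the tenant's `timelines` lock themselves before calling `initialize_with_lock(ctx, &mut timelines, false, true)`; per the new comments, this skips reloading the layer map because it was already populated during the import (the parameter names behind the two flags are not visible in this diff). A generic sketch of the initialize-under-lock pattern, with hypothetical stand-in types:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins; the real code uses Tenant/Timeline types.
struct Timeline;

fn initialize_under_lock(
    timelines: &Mutex<HashMap<u64, Arc<Timeline>>>,
    id: u64,
) -> Arc<Timeline> {
    // Hold the registry lock across initialization so no concurrent caller
    // can observe or register a half-initialized timeline for the same id.
    let mut guard = timelines.lock().unwrap();
    let timeline = Arc::new(Timeline);
    guard.insert(id, Arc::clone(&timeline));
    timeline
}
```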
@@ -2308,6 +2311,8 @@ impl Tenant {
             )
         })?;
 
+        // Initialize the timeline without loading the layer map, because we already updated the layer
+        // map above, when we imported the datadir.
         let timeline = {
             let mut timelines = self.timelines.lock().unwrap();
             raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
@@ -52,7 +52,7 @@ use crate::metrics::NUM_ONDISK_LAYERS;
 use crate::repository::Key;
 use crate::tenant::storage_layer::InMemoryLayer;
 use crate::tenant::storage_layer::Layer;
-use anyhow::Result;
+use anyhow::{bail, Result};
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
@@ -126,7 +126,7 @@ where
     ///
     /// Insert an on-disk layer.
     ///
-    pub fn insert_historic(&mut self, layer: Arc<L>) {
+    pub fn insert_historic(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
         self.layer_map.insert_historic_noflush(layer)
     }
 
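With `insert_historic` returning `anyhow::Result<()>`, every caller has to choose an error strategy: the benchmark hunks earlier and the test hunk later `.unwrap()`, treating a duplicate as a broken fixture, while the `Timeline` code further down propagates with `?`. A minimal illustration of the two styles (the `Updates` type is a hypothetical stand-in for the batched-update handle):

```rust
use anyhow::Result;

// Hypothetical stand-in shaped like the batched-update handle in the diff.
struct Updates;

impl Updates {
    fn insert_historic(&mut self, _layer_id: u32) -> Result<()> {
        Ok(())
    }
}

// Production-style caller: a duplicate is a real runtime error, bubble it up.
fn load_layers(updates: &mut Updates) -> Result<()> {
    updates.insert_historic(1)?;
    Ok(())
}

// Benchmark/test-style caller: fixtures are trusted, panic on a bug.
fn bench_setup(updates: &mut Updates) {
    updates.insert_historic(2).unwrap();
}
```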
@@ -274,17 +274,22 @@ where
     ///
    /// Helper function for BatchedUpdates::insert_historic
     ///
-    pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
-        self.historic.insert(
-            historic_layer_coverage::LayerKey::from(&*layer),
-            Arc::clone(&layer),
-        );
+    pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
+        let key = historic_layer_coverage::LayerKey::from(&*layer);
+        if self.historic.contains(&key) {
+            bail!(
+                "Attempt to insert duplicate layer {} in layer map",
+                layer.short_id()
+            );
+        }
+        self.historic.insert(key, Arc::clone(&layer));
 
         if Self::is_l0(&layer) {
             self.l0_delta_layers.push(layer);
         }
 
         NUM_ONDISK_LAYERS.inc();
+        Ok(())
     }
 
     ///
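The check itself is straightforward: derive the `LayerKey`, refuse the insert if the map already contains it, and reach `NUM_ONDISK_LAYERS.inc()` only on success. A self-contained demonstration of the observable behavior (hypothetical string keys; not a test from the repository):

```rust
use std::collections::HashSet;

use anyhow::bail;

fn insert_once(seen: &mut HashSet<String>, key: &str) -> anyhow::Result<()> {
    if seen.contains(key) {
        bail!("Attempt to insert duplicate layer {key} in layer map");
    }
    seen.insert(key.to_owned());
    Ok(())
}

fn main() {
    let mut seen = HashSet::new();
    insert_once(&mut seen, "image-000040").unwrap();
    // A second insert of the same key must fail rather than overwrite.
    let err = insert_once(&mut seen, "image-000040").unwrap_err();
    assert!(err.to_string().contains("duplicate layer"));
}
```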
@@ -838,7 +843,7 @@ mod tests {
 
         let expected_in_counts = (1, usize::from(expected_l0));
 
-        map.batch_update().insert_historic(remote.clone());
+        map.batch_update().insert_historic(remote.clone()).unwrap();
         assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
 
         let replaced = map
@@ -417,6 +417,14 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
         }
     }
 
+    pub fn contains(&self, layer_key: &LayerKey) -> bool {
+        match self.buffer.get(layer_key) {
+            Some(None) => false, // layer remove was buffered
+            Some(_) => true,     // layer insert was buffered
+            None => self.layers.contains_key(layer_key), // no buffered ops for this layer
+        }
+    }
+
     pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
         self.buffer.insert(layer_key, Some(value));
     }
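`contains` consults the pending write buffer before the committed map: a buffered remove hides a key, a buffered insert reveals it, and otherwise the flushed `layers` map decides. This matters because the duplicate check in `insert_historic_noflush` runs before `flush`, so it has to see not-yet-flushed operations. A simplified, runnable model of the same three-way lookup (hypothetical stand-in types, same logic as the new method):

```rust
use std::collections::HashMap;

// Simplified model of BufferedHistoricLayerCoverage: `buffer` holds pending
// operations (Some = buffered insert, None = buffered remove) and `layers`
// holds the flushed state.
struct Buffered {
    buffer: HashMap<String, Option<String>>,
    layers: HashMap<String, String>,
}

impl Buffered {
    fn contains(&self, key: &str) -> bool {
        match self.buffer.get(key) {
            Some(None) => false,                   // a remove was buffered
            Some(_) => true,                       // an insert was buffered
            None => self.layers.contains_key(key), // no buffered ops: ask flushed state
        }
    }
}

fn main() {
    let mut b = Buffered { buffer: HashMap::new(), layers: HashMap::new() };
    b.layers.insert("a".into(), "layer-a".into());
    b.buffer.insert("a".into(), None); // buffered remove hides the flushed entry
    assert!(!b.contains("a"));
    b.buffer.insert("b".into(), Some("layer-b".into())); // buffered insert is visible
    assert!(b.contains("b"));
}
```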
@@ -1446,7 +1446,7 @@ impl Timeline {
 
                     trace!("found layer {}", layer.path().display());
                     total_physical_size += file_size;
-                    updates.insert_historic(Arc::new(layer));
+                    updates.insert_historic(Arc::new(layer))?;
                     num_layers += 1;
                 } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
                     // Create a DeltaLayer struct for each delta file.
@@ -1478,7 +1478,7 @@ impl Timeline {
 
                     trace!("found layer {}", layer.path().display());
                     total_physical_size += file_size;
-                    updates.insert_historic(Arc::new(layer));
+                    updates.insert_historic(Arc::new(layer))?;
                     num_layers += 1;
                 } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
                     // ignore these
@@ -1552,7 +1552,7 @@ impl Timeline {
             // remote index file?
             // If so, rename_to_backup those files & replace their local layer with
             // a RemoteLayer in the layer map so that we re-download them on-demand.
-            if let Some(local_layer) = local_layer {
+            if let Some(local_layer) = &local_layer {
                 let local_layer_path = local_layer
                     .local_path()
                     .expect("caller must ensure that local_layers only contains local layers");
@@ -1577,7 +1577,6 @@ impl Timeline {
                     anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
                 } else {
                     self.metrics.resident_physical_size_gauge.sub(local_size);
-                    updates.remove_historic(local_layer);
                     // fall-through to adding the remote layer
                 }
             } else {
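The bare `remove_historic(local_layer)` call is dropped here: the local layer is no longer removed up front and then re-inserted, but swapped for its remote counterpart with `replace_historic` in the two hunks that follow; see the sketch after them.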
@@ -1613,7 +1612,11 @@ impl Timeline {
                     );
                     let remote_layer = Arc::new(remote_layer);
 
-                    updates.insert_historic(remote_layer);
+                    if let Some(local_layer) = &local_layer {
+                        updates.replace_historic(local_layer, remote_layer)?;
+                    } else {
+                        updates.insert_historic(remote_layer)?;
+                    }
                 }
                 LayerFileName::Delta(deltafilename) => {
                     // Create a RemoteLayer for the delta file.
@@ -1637,7 +1640,11 @@ impl Timeline {
                         LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
                     );
                     let remote_layer = Arc::new(remote_layer);
-                    updates.insert_historic(remote_layer);
+                    if let Some(local_layer) = &local_layer {
+                        updates.replace_historic(local_layer, remote_layer)?;
+                    } else {
+                        updates.insert_historic(remote_layer)?;
+                    }
                 }
             }
         }
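Both remote-layer registration paths now distinguish two cases: if a local layer already exists under the same key it is replaced in place, otherwise the remote layer is inserted fresh, which keeps the new duplicate check satisfied. A sketch of that decision (hypothetical `Updates` and `Layer` types, method shapes mirroring the calls in the diff):

```rust
use anyhow::Result;

// Hypothetical stand-ins shaped after the calls visible in the diff.
struct Layer;
struct Updates;

impl Updates {
    fn insert_historic(&mut self, _layer: Layer) -> Result<()> {
        Ok(())
    }
    fn replace_historic(&mut self, _old: &Layer, _new: Layer) -> Result<()> {
        Ok(())
    }
}

fn register_remote(
    updates: &mut Updates,
    local_layer: Option<&Layer>,
    remote_layer: Layer,
) -> Result<()> {
    match local_layer {
        // A local file exists under the same key: swap it out rather than
        // inserting a second entry, which would now be rejected.
        Some(local) => updates.replace_historic(local, remote_layer),
        // No local copy: a plain insert cannot collide.
        None => updates.insert_historic(remote_layer),
    }
}
```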
@@ -2684,7 +2691,7 @@ impl Timeline {
             .write()
             .unwrap()
             .batch_update()
-            .insert_historic(Arc::new(new_delta));
+            .insert_historic(Arc::new(new_delta))?;
 
         // update the timeline's physical size
         let sz = new_delta_path.metadata()?.len();
@@ -2889,7 +2896,7 @@ impl Timeline {
                 self.metrics
                     .resident_physical_size_gauge
                     .add(metadata.len());
-                updates.insert_historic(Arc::new(l));
+                updates.insert_historic(Arc::new(l))?;
             }
             updates.flush();
             drop(layers);
@@ -3322,7 +3329,7 @@ impl Timeline {
 
             new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
             let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
-            updates.insert_historic(x);
+            updates.insert_historic(x)?;
         }
 
         // Now that we have reshuffled the data to set of new delta layers, we can