Mirror of https://github.com/neondatabase/neon.git

Compare commits (13 commits): `split-prox` ... `duplicate-`
| SHA1 |
|---|
| 3295bd7fd0 |
| 55599d52d7 |
| 52e5bc7992 |
| 281bd9a48d |
| 04610d8cb2 |
| c1adba4657 |
| 73314d6032 |
| 1ab62a93c4 |
| 255e2da4cf |
| 531002a7bf |
| b844473ba4 |
| 2462b25728 |
| 215eb06b11 |
```diff
@@ -30,13 +30,19 @@ impl PartialOrd for LayerKey {
 
 impl Ord for LayerKey {
     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        // NOTE we really care about comparing by lsn.start first
+        // NOTE we really care about comparing by lsn.start first, because
+        // the persistent data structure must be constructed in that order.
+        // NOTE we also care about comparing by (-lsn.end), then key.start, and
+        // then (-key.end) next to make sure that if one layer contains
+        // another, the outer one is inserted first and the inner one
+        // is detected as redundant. So if we ever find an L1 layer with
+        // many L0s in it, the L0s are redundant, not the L1.
         self.lsn
             .start
             .cmp(&other.lsn.start)
-            .then(self.lsn.end.cmp(&other.lsn.end))
+            .then(self.lsn.end.cmp(&other.lsn.end).reverse())
             .then(self.key.start.cmp(&other.key.start))
-            .then(self.key.end.cmp(&other.key.end))
+            .then(self.key.end.cmp(&other.key.end).reverse())
             .then(self.is_image.cmp(&other.is_image))
     }
 }
```
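A minimal standalone sketch of the ordering property the new comments describe (not part of the patch; this `LayerKey` is a simplified copy with only the fields visible in this diff). Reversing the `lsn.end` and `key.end` comparisons makes an outer layer sort before any layer it contains:

```rust
use std::cmp::Ordering;
use std::ops::Range;

// Simplified stand-in for the LayerKey in this patch, for illustration only.
#[derive(PartialEq, Eq, Clone, Debug)]
struct LayerKey {
    key: Range<i128>,
    lsn: Range<u64>,
    is_image: bool,
}

impl Ord for LayerKey {
    fn cmp(&self, other: &Self) -> Ordering {
        // Same chain as in the patch above.
        self.lsn
            .start
            .cmp(&other.lsn.start)
            .then(self.lsn.end.cmp(&other.lsn.end).reverse())
            .then(self.key.start.cmp(&other.key.start))
            .then(self.key.end.cmp(&other.key.end).reverse())
            .then(self.is_image.cmp(&other.is_image))
    }
}

impl PartialOrd for LayerKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    // An "outer" layer and an "inner" layer it fully contains, sharing lsn.start.
    let outer = LayerKey { key: 0..100, lsn: 10..30, is_image: false };
    let inner = LayerKey { key: 20..40, lsn: 10..20, is_image: false };
    // The outer layer sorts first, so during a rebuild it is inserted before
    // the inner one, and the inner one is the candidate for being flagged
    // as redundant.
    assert!(outer < inner);
}
```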
```diff
@@ -80,11 +86,14 @@ impl<Value: Clone> HistoricLayerCoverage<Value> {
         }
     }
 
-    /// Add a layer
+    /// Add a layer, returning whether the insertion had any effect.
+    ///
+    /// Returns false after inserting a layer whose Key-Lsn space is already
+    /// covered by a union of previously inserted layers.
     ///
     /// Panics if new layer has older lsn.start than an existing layer.
     /// See BufferedHistoricLayerCoverage for a more general insertion method.
-    pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
+    pub fn insert(&mut self, layer_key: LayerKey, value: Value) -> bool {
        // It's only a persistent map, not a retroactive one
        if let Some(last_entry) = self.historic.iter().next_back() {
            let last_lsn = last_entry.0;
```
```diff
@@ -100,10 +109,12 @@ impl<Value: Clone> HistoricLayerCoverage<Value> {
             &mut self.head.delta_coverage
         };
 
-        target.insert(layer_key.key, layer_key.lsn.clone(), value);
+        let changed = target.insert(layer_key.key, layer_key.lsn.clone(), value);
 
         // Remember history. Clone is O(1)
         self.historic.insert(layer_key.lsn.start, self.head.clone());
+
+        changed
     }
 
     /// Query at a particular LSN, inclusive
```
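The new return value can be read as "did this layer change the visible coverage at all". A short sketch of the contract, assuming the `LayerKey` and `HistoricLayerCoverage` types from this patch are in scope; the ranges and values are invented for illustration:

```rust
// Hedged sketch; could live next to the existing unit tests in this file.
#[test]
fn historic_insert_reports_no_effect_for_covered_layer() {
    let mut coverage = HistoricLayerCoverage::<String>::new();
    let outer = LayerKey {
        key: 0..100,
        lsn: 10..20,
        is_image: false,
    };
    let inner = LayerKey {
        key: 10..20,
        lsn: 12..15,
        is_image: false,
    };

    // The first layer changes the coverage, so insert reports an effect.
    assert!(coverage.insert(outer, "outer".to_string()));
    // The second layer sits entirely inside the first in both Key and Lsn
    // space, so inserting it changes nothing and the call returns false.
    assert!(!coverage.insert(inner, "inner".to_string()));
}
```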
```diff
@@ -420,6 +431,18 @@ pub struct BufferedHistoricLayerCoverage<Value> {
 
     /// All current layers. This is not used for search. Only to make rebuilds easier.
     layers: BTreeMap<LayerKey, Value>,
+
+    /// Redundant layers are ones that are completely covered by a union of other layers.
+    /// If two layers are identical only one of them will be marked as redundant, such that
+    /// it is always safe to remove all redundant layers without seeing any difference in
+    /// results. If a layer is contained in another, the inner one will be marked as
+    /// redundant. This makes sure that if we have an L1 layer and a set of L0 layers
+    /// that completely cover it, the L0s are marked redundant, not the L1.
+    ///
+    /// Redundant layers can show up if the pageserver dies during compaction, after
+    /// creating some L1 layers but before deleting the L0 layers. In this case we'd rather
+    /// notice the redundant L0 layers than recreate the L1 layer, or do something worse.
+    redundant_layers: BTreeMap<LayerKey, Value>,
 }
 
 impl<T: std::fmt::Debug> std::fmt::Debug for BufferedHistoricLayerCoverage<T> {
```
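A sketch of the L1/L0 case the new doc comment describes, using only the API added in this patch (`insert`, `rebuild`, `is_redundant`); the concrete key and LSN ranges are made up for illustration:

```rust
// Hedged sketch; reuses only methods introduced by this patch.
#[test]
fn l0s_inside_an_l1_are_the_redundant_ones() {
    let mut map = BufferedHistoricLayerCoverage::new();

    // One L1-like layer, plus three L0-like layers whose union covers the
    // exact same Key-Lsn rectangle.
    let l1 = LayerKey {
        key: 0..30,
        lsn: 0..9,
        is_image: false,
    };
    let l0s: Vec<LayerKey> = (0u64..3)
        .map(|i| LayerKey {
            key: 0..30,
            lsn: (i * 3)..(i * 3 + 3),
            is_image: false,
        })
        .collect();

    map.insert(l1.clone(), "L1".to_string());
    for (i, l0) in l0s.iter().enumerate() {
        map.insert(l0.clone(), format!("L0-{i}"));
    }
    map.rebuild();

    // The wider layer wins; only the contained L0s are flagged.
    assert!(!map.is_redundant(&l1));
    assert!(l0s.iter().all(|l0| map.is_redundant(l0)));
}
```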
```diff
@@ -443,6 +466,7 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
             historic_coverage: HistoricLayerCoverage::<Value>::new(),
             buffer: BTreeMap::new(),
             layers: BTreeMap::new(),
+            redundant_layers: BTreeMap::new(),
         }
     }
 
```
```diff
@@ -523,6 +547,7 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
             }
             None => {
                 self.layers.remove(layer_key);
+                self.redundant_layers.remove(layer_key);
             }
         };
         false
```
```diff
@@ -533,14 +558,20 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
         self.historic_coverage.trim(&rebuild_since);
         for (layer_key, layer) in self.layers.range(
             LayerKey {
-                lsn: rebuild_since..0,
-                key: 0..0,
+                lsn: rebuild_since..u64::MAX,
+                key: 0..i128::MAX,
                 is_image: false,
             }..,
         ) {
-            self.historic_coverage
+            let changed = self
+                .historic_coverage
                 .insert(layer_key.clone(), layer.clone());
             num_inserted += 1;
+
+            if !changed {
+                self.redundant_layers
+                    .insert(layer_key.clone(), layer.clone());
+            }
         }
 
         // TODO maybe only warn if ratio is at least 10
```
```diff
@@ -561,6 +592,16 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
         self.layers.values().cloned()
     }
 
+    /// Returns whether the layer with the given key was already inserted as redundant
+    #[allow(dead_code)] // This function should be used on startup to clean up.
+    pub fn is_redundant(&self, key: &LayerKey) -> bool {
+        if !self.buffer.is_empty() {
+            panic!("rebuild pls")
+        }
+
+        self.redundant_layers.contains_key(key)
+    }
+
     /// Return a reference to a queryable map, assuming all updates
     /// have already been processed using self.rebuild()
     pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage<Value>> {
```
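A hypothetical sketch of the startup cleanup the `#[allow(dead_code)]` comment alludes to; nothing like this exists in the patch. It assumes `known_layer_keys` comes from scanning the layer files on disk and that the map has already been populated with them via `insert`:

```rust
// Hypothetical helper, not part of this patch.
fn redundant_layer_keys<Value: Clone>(
    map: &mut BufferedHistoricLayerCoverage<Value>,
    known_layer_keys: &[LayerKey],
) -> Vec<LayerKey> {
    // Flush pending updates first, otherwise is_redundant() panics.
    map.rebuild();
    known_layer_keys
        .iter()
        .filter(|key| map.is_redundant(key))
        .cloned()
        .collect()
}
```

The returned keys could then be deleted without changing any read results, per the doc comment on `redundant_layers`.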
```diff
@@ -700,6 +741,32 @@ fn test_retroactive_simple() {
     }
 }
 
+#[test]
+fn test_redundant_layers() {
+    for i in 0..10 {
+        for j in 0..10 {
+            let key1 = LayerKey {
+                key: i..(i + 2),
+                lsn: j..(j + 2),
+                is_image: false,
+            };
+            let key2 = LayerKey {
+                key: 3..6,
+                lsn: 3..6,
+                is_image: false,
+            };
+            let is_redundant = i >= 3 && i + 2 <= 6 && j >= 3 && j + 2 <= 6;
+
+            let mut map = BufferedHistoricLayerCoverage::new();
+            map.insert(key1.clone(), "Delta 1".to_string());
+            map.insert(key2.clone(), "Delta 2".to_string());
+            map.rebuild();
+            assert_eq!(map.is_redundant(&key1), is_redundant);
+            assert!(!map.is_redundant(&key2));
+        }
+    }
+}
+
 #[test]
 fn test_retroactive_replacement() {
     let mut map = BufferedHistoricLayerCoverage::new();
```
```diff
@@ -61,10 +61,13 @@ impl<Value: Clone> LayerCoverage<Value> {
         self.nodes.insert_mut(key, value);
     }
 
-    /// Insert a layer.
+    /// Insert a layer, returning whether the insertion had any effect.
+    ///
+    /// Returns false after inserting a layer whose Key-Lsn space is already
+    /// covered by a union of previously inserted layers.
     ///
     /// Complexity: worst case O(N), in practice O(log N). See NOTE in implementation.
-    pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value) {
+    pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value) -> bool {
         // Add nodes at endpoints
         //
         // NOTE The order of lines is important. We add nodes at the start
```
```diff
@@ -99,12 +102,14 @@ impl<Value: Clone> LayerCoverage<Value> {
         }
         // TODO check if the nodes inserted at key.start and key.end are safe
         // to remove. It's fine to keep them but they could be redundant.
-        for k in to_update {
-            self.nodes.insert_mut(k, Some((lsn.end, value.clone())));
+        for k in &to_update {
+            self.nodes.insert_mut(*k, Some((lsn.end, value.clone())));
         }
-        for k in to_remove {
-            self.nodes.remove_mut(&k);
+        for k in &to_remove {
+            self.nodes.remove_mut(k);
         }
+
+        !to_update.is_empty()
     }
 
     /// Get the latest (by lsn.end) layer at a given key
```
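A deliberately simplified, self-contained model of the `!to_update.is_empty()` signal: instead of a persistent map with nodes at range endpoints, this toy tracks the covering `lsn.end` per individual key, which is enough to show when an insertion has no effect. The names and structure are illustrative only, not the real `LayerCoverage`:

```rust
use std::collections::BTreeMap;
use std::ops::Range;

// Toy, dense stand-in for LayerCoverage: per integer key, remember the
// lsn.end of the topmost layer seen so far.
struct ToyCoverage {
    top: BTreeMap<i128, u64>,
}

impl ToyCoverage {
    fn new() -> Self {
        Self { top: BTreeMap::new() }
    }

    // Returns false when the new layer is completely shadowed by layers
    // inserted earlier, i.e. the insertion changed nothing.
    fn insert(&mut self, key: Range<i128>, lsn: Range<u64>) -> bool {
        let mut changed = false;
        for k in key {
            let covered_to = self.top.get(&k).copied();
            if covered_to.map_or(true, |end| end < lsn.end) {
                self.top.insert(k, lsn.end);
                changed = true;
            }
        }
        changed
    }
}

fn main() {
    let mut cov = ToyCoverage::new();
    // A wide layer takes effect everywhere it touches.
    assert!(cov.insert(3..6, 3..6));
    // A layer fully inside it (lower lsn.end, keys already covered) changes
    // nothing; that is the "redundant" signal propagated upwards.
    assert!(!cov.insert(4..6, 4..5));
    // A layer reaching a higher lsn.end does have an effect.
    assert!(cov.insert(4..6, 4..7));
    println!("ok");
}
```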
```diff
@@ -3517,6 +3517,10 @@ impl Timeline {
 
         drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
 
+        fail_point!("compact-level0-phase1-finish", |_| {
+            Err(anyhow::anyhow!("failpoint compact-level0-phase1-finish").into())
+        });
+
         Ok(CompactLevel0Phase1Result {
             new_layers,
             deltas_to_compact,
```
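Assuming `fail_point!` here is the `fail` crate macro (as used elsewhere in the pageserver), the new failpoint could be exercised from a test roughly as below; only the failpoint plumbing is shown and the compaction call itself is elided. Failing at this point roughly simulates the crash-during-compaction scenario described in the `redundant_layers` doc comment: phase 1 has produced its result, but no L0 layers have been deleted yet.

```rust
// Hedged sketch of toggling the failpoint from a test.
fail::cfg("compact-level0-phase1-finish", "return").unwrap();

// ... run level-0 compaction here and assert that it returns an error ...

// Deactivate so other tests are unaffected.
fail::remove("compact-level0-phase1-finish");
```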