mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-05 06:20:37 +00:00
Big refactor (WIP)
This commit is contained in:
@@ -263,6 +263,7 @@ fn bench_from_real_project(c: &mut Criterion) {
|
||||
kr.start.to_i128()..kr.end.to_i128(),
|
||||
lr.start.0..lr.end.0,
|
||||
format!("Layer {}", lr.start.0),
|
||||
!layer.is_incremental(),
|
||||
);
|
||||
}
|
||||
bstlm.rebuild();
|
||||
@@ -323,6 +324,7 @@ fn bench_sequential(c: &mut Criterion) {
|
||||
kr.start.to_i128()..kr.end.to_i128(),
|
||||
lr.start.0..lr.end.0,
|
||||
format!("Layer {}", lr.start.0),
|
||||
!layer.is_incremental(),
|
||||
);
|
||||
}
|
||||
bstlm.rebuild();
|
||||
|
||||
@@ -74,12 +74,14 @@ use utils::{
|
||||
mod blob_io;
|
||||
pub mod block_io;
|
||||
pub mod bst_layer_map;
|
||||
pub mod coverage;
|
||||
mod delta_layer;
|
||||
mod disk_btree;
|
||||
pub(crate) mod ephemeral_file;
|
||||
pub mod filename;
|
||||
mod image_layer;
|
||||
mod inmemory_layer;
|
||||
pub mod latest_layer_map;
|
||||
pub mod layer_map;
|
||||
|
||||
pub mod metadata;
|
||||
|
||||
@@ -1,38 +1,14 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
// TODO the `im` crate has 20x more downloads and also has
|
||||
// persistent/immutable BTree. See if it's better.
|
||||
use rpds::RedBlackTreeMapSync;
|
||||
use super::latest_layer_map::LatestLayerMap;
|
||||
|
||||
/// Layer map implemented using persistent/immutable binary search tree.
|
||||
/// It supports historical queries, but no retroactive inserts. For that
|
||||
/// see RetroactiveLayerMap.
|
||||
///
|
||||
/// Layer type is abstracted as Value to make unit testing easier.
|
||||
pub struct PersistentLayerMap<Value> {
|
||||
/// Mapping key to the latest layer (if any) until the next key.
|
||||
/// We use the Sync version of the map because we want Self to
|
||||
/// be Sync.
|
||||
///
|
||||
/// TODO Separate Head into its own struct LatestLayerMap
|
||||
/// TODO Merge historic with retroactive, into HistoricLayerMap
|
||||
/// TODO Maintain a pair of heads, one for images, one for deltas.
|
||||
/// This way we can query both of them with one BTreeMap query.
|
||||
head: RedBlackTreeMapSync<i128, Option<(u64, Value)>>,
|
||||
/// The latest-only solution
|
||||
head: LatestLayerMap<Value>,
|
||||
|
||||
/// All previous states of `self.head`
|
||||
///
|
||||
/// TODO: Sorted Vec + binary search could be slightly faster.
|
||||
historic: BTreeMap<u64, RedBlackTreeMapSync<i128, Option<(u64, Value)>>>,
|
||||
}
|
||||
|
||||
impl<Value: std::fmt::Debug> std::fmt::Debug for PersistentLayerMap<Value> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let head_vec: Vec<_> = self.head.iter().collect();
|
||||
write!(f, "PersistentLayerMap: head: {:?}", head_vec)
|
||||
}
|
||||
/// All previous states
|
||||
historic: BTreeMap<u64, LatestLayerMap<Value>>,
|
||||
}
|
||||
|
||||
impl<T: Clone> Default for PersistentLayerMap<T> {
|
||||
@@ -44,22 +20,18 @@ impl<T: Clone> Default for PersistentLayerMap<T> {
|
||||
impl<Value: Clone> PersistentLayerMap<Value> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
head: RedBlackTreeMapSync::default(),
|
||||
head: LatestLayerMap::default(),
|
||||
historic: BTreeMap::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to subdivide the key range without changing any values
|
||||
fn add_node(self: &mut Self, key: i128) {
|
||||
let value = match self.head.range(..=key).last() {
|
||||
Some((_, Some(v))) => Some(v.clone()),
|
||||
Some((_, None)) => None,
|
||||
None => None,
|
||||
};
|
||||
self.head.insert_mut(key, value);
|
||||
}
|
||||
|
||||
pub fn insert(self: &mut Self, key: Range<i128>, lsn: Range<u64>, value: Value) {
|
||||
pub fn insert(
|
||||
self: &mut Self,
|
||||
key: Range<i128>,
|
||||
lsn: Range<u64>,
|
||||
value: Value,
|
||||
is_image: bool,
|
||||
) {
|
||||
// It's only a persistent map, not a retroactive one
|
||||
if let Some(last_entry) = self.historic.iter().rev().next() {
|
||||
let last_lsn = last_entry.0;
|
||||
@@ -71,65 +43,25 @@ impl<Value: Clone> PersistentLayerMap<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE The order of the following lines is important!!
|
||||
|
||||
// Add nodes at endpoints
|
||||
self.add_node(key.start);
|
||||
self.add_node(key.end);
|
||||
|
||||
// Raise the height where necessary
|
||||
//
|
||||
// NOTE This loop is worst case O(N), but amortized O(log N) in the special
|
||||
// case when rectangles have no height. In practice I don't think we'll see
|
||||
// the kind of layer intersections needed to trigger O(N) behavior. If we
|
||||
// do it can be fixed using lazy propagation.
|
||||
let mut to_update = Vec::new();
|
||||
let mut to_remove = Vec::new();
|
||||
let mut prev_covered = false;
|
||||
for (k, node) in self.head.range(key.clone()) {
|
||||
let needs_cover = match node {
|
||||
None => true,
|
||||
Some((h, _)) => h < &lsn.end,
|
||||
};
|
||||
if needs_cover {
|
||||
match prev_covered {
|
||||
true => to_remove.push(k.clone()),
|
||||
false => to_update.push(k.clone()),
|
||||
}
|
||||
}
|
||||
prev_covered = needs_cover;
|
||||
}
|
||||
if !prev_covered {
|
||||
to_remove.push(key.end);
|
||||
}
|
||||
for k in to_update {
|
||||
self.head
|
||||
.insert_mut(k.clone(), Some((lsn.end.clone(), value.clone())));
|
||||
}
|
||||
for k in to_remove {
|
||||
self.head.remove_mut(&k);
|
||||
}
|
||||
self.head.insert(key, lsn.clone(), value, is_image);
|
||||
|
||||
// Remember history. Clone is O(1)
|
||||
self.historic.insert(lsn.start, self.head.clone());
|
||||
}
|
||||
|
||||
pub fn query(self: &Self, key: i128, lsn: u64) -> Option<Value> {
|
||||
let version = self.historic.range(..=lsn).rev().next()?.1;
|
||||
version
|
||||
.range(..=key)
|
||||
.rev()
|
||||
.next()?
|
||||
.1
|
||||
.as_ref()
|
||||
.map(|(_, v)| v.clone())
|
||||
pub fn query(self: &Self, key: i128, lsn: u64) -> (Option<Value>, Option<Value>) {
|
||||
let version = match self.historic.range(..=lsn).rev().next() {
|
||||
Some((_, v)) => v,
|
||||
None => return (None, None),
|
||||
};
|
||||
version.query(key)
|
||||
}
|
||||
|
||||
pub fn get_coverage(
|
||||
self: &Self,
|
||||
lsn: u64,
|
||||
) -> Option<&RedBlackTreeMapSync<i128, Option<(u64, Value)>>> {
|
||||
Some(self.historic.range(..=lsn).rev().next()?.1)
|
||||
pub fn get_version(self: &Self, lsn: u64) -> Option<&LatestLayerMap<Value>> {
|
||||
match self.historic.range(..=lsn).rev().next() {
|
||||
Some((_, v)) => Some(v),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn trim(self: &mut Self, begin: &u64) {
|
||||
@@ -144,64 +76,46 @@ impl<Value: Clone> PersistentLayerMap<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Usage demo for the persistent red-black tree: versions are independent,
/// so changing one never shows up in another.
#[test]
fn test_immutable_bst_dependency() {
    let empty = RedBlackTreeMapSync::<i32, i32>::default();

    // `branch` keeps the empty state; `with_one` is a new version with key 1.
    let mut branch = empty.clone();
    let with_one = empty.insert(1, 5);

    // Both the older and the newer version can be queried for key 1.
    assert_eq!(branch.get(&1), None);
    assert_eq!(with_one.get(&1), Some(&5));

    // Mutating the older version forks it; the newer version does not
    // retroactively gain key 2.
    branch.insert_mut(2, 6);
    assert_eq!(branch.get(&2), Some(&6));
    assert_eq!(with_one.get(&2), None);
}
|
||||
|
||||
/// This is the most basic test that demonstrates intended usage.
|
||||
/// All layers in this test have height 1.
|
||||
#[test]
|
||||
fn test_persistent_simple() {
|
||||
let mut map = PersistentLayerMap::<String>::new();
|
||||
map.insert(0..5, 100..101, "Layer 1".to_string());
|
||||
map.insert(3..9, 110..111, "Layer 2".to_string());
|
||||
map.insert(5..6, 120..121, "Layer 3".to_string());
|
||||
map.insert(0..5, 100..101, "Layer 1".to_string(), true);
|
||||
map.insert(3..9, 110..111, "Layer 2".to_string(), true);
|
||||
map.insert(5..6, 120..121, "Layer 3".to_string(), true);
|
||||
|
||||
// After Layer 1 insertion
|
||||
assert_eq!(map.query(1, 105), Some("Layer 1".to_string()));
|
||||
assert_eq!(map.query(4, 105), Some("Layer 1".to_string()));
|
||||
assert_eq!(map.query(1, 105).1, Some("Layer 1".to_string()));
|
||||
assert_eq!(map.query(4, 105).1, Some("Layer 1".to_string()));
|
||||
|
||||
// After Layer 2 insertion
|
||||
assert_eq!(map.query(4, 115), Some("Layer 2".to_string()));
|
||||
assert_eq!(map.query(8, 115), Some("Layer 2".to_string()));
|
||||
assert_eq!(map.query(11, 115), None);
|
||||
assert_eq!(map.query(4, 115).1, Some("Layer 2".to_string()));
|
||||
assert_eq!(map.query(8, 115).1, Some("Layer 2".to_string()));
|
||||
assert_eq!(map.query(11, 115).1, None);
|
||||
|
||||
// After Layer 3 insertion
|
||||
assert_eq!(map.query(4, 125), Some("Layer 2".to_string()));
|
||||
assert_eq!(map.query(5, 125), Some("Layer 3".to_string()));
|
||||
assert_eq!(map.query(7, 125), Some("Layer 2".to_string()));
|
||||
assert_eq!(map.query(4, 125).1, Some("Layer 2".to_string()));
|
||||
assert_eq!(map.query(5, 125).1, Some("Layer 3".to_string()));
|
||||
assert_eq!(map.query(7, 125).1, Some("Layer 2".to_string()));
|
||||
}
|
||||
|
||||
/// Cover simple off-by-one edge cases
|
||||
#[test]
|
||||
fn test_off_by_one() {
|
||||
let mut map = PersistentLayerMap::<String>::new();
|
||||
map.insert(3..5, 100..110, "Layer 1".to_string());
|
||||
map.insert(3..5, 100..110, "Layer 1".to_string(), true);
|
||||
|
||||
// Check different LSNs
|
||||
assert_eq!(map.query(4, 99), None);
|
||||
assert_eq!(map.query(4, 100), Some("Layer 1".to_string()));
|
||||
assert_eq!(map.query(4, 99).1, None);
|
||||
assert_eq!(map.query(4, 100).1, Some("Layer 1".to_string()));
|
||||
assert_eq!(map.query(4, 110).1, Some("Layer 1".to_string()));
|
||||
|
||||
// Check different keys
|
||||
assert_eq!(map.query(2, 105), None);
|
||||
assert_eq!(map.query(3, 105), Some("Layer 1".to_string()));
|
||||
assert_eq!(map.query(4, 105), Some("Layer 1".to_string()));
|
||||
assert_eq!(map.query(5, 105), None);
|
||||
assert_eq!(map.query(2, 105).1, None);
|
||||
assert_eq!(map.query(3, 105).1, Some("Layer 1".to_string()));
|
||||
assert_eq!(map.query(4, 105).1, Some("Layer 1".to_string()));
|
||||
assert_eq!(map.query(5, 105).1, None);
|
||||
}
|
||||
|
||||
/// Cover edge cases where layers begin or end on the same key
|
||||
@@ -209,24 +123,24 @@ fn test_off_by_one() {
|
||||
fn test_key_collision() {
|
||||
let mut map = PersistentLayerMap::<String>::new();
|
||||
|
||||
map.insert(3..5, 100..110, "Layer 10".to_string());
|
||||
map.insert(5..8, 100..110, "Layer 11".to_string());
|
||||
map.insert(3..5, 100..110, "Layer 10".to_string(), true);
|
||||
map.insert(5..8, 100..110, "Layer 11".to_string(), true);
|
||||
|
||||
map.insert(3..4, 200..210, "Layer 20".to_string());
|
||||
map.insert(3..4, 200..210, "Layer 20".to_string(), true);
|
||||
|
||||
// Check after layer 11
|
||||
assert_eq!(map.query(2, 105), None);
|
||||
assert_eq!(map.query(3, 105), Some("Layer 10".to_string()));
|
||||
assert_eq!(map.query(5, 105), Some("Layer 11".to_string()));
|
||||
assert_eq!(map.query(7, 105), Some("Layer 11".to_string()));
|
||||
assert_eq!(map.query(8, 105), None);
|
||||
assert_eq!(map.query(2, 105).1, None);
|
||||
assert_eq!(map.query(3, 105).1, Some("Layer 10".to_string()));
|
||||
assert_eq!(map.query(5, 105).1, Some("Layer 11".to_string()));
|
||||
assert_eq!(map.query(7, 105).1, Some("Layer 11".to_string()));
|
||||
assert_eq!(map.query(8, 105).1, None);
|
||||
|
||||
// Check after layer 20
|
||||
assert_eq!(map.query(2, 205), None);
|
||||
assert_eq!(map.query(3, 205), Some("Layer 20".to_string()));
|
||||
assert_eq!(map.query(5, 205), Some("Layer 11".to_string()));
|
||||
assert_eq!(map.query(7, 205), Some("Layer 11".to_string()));
|
||||
assert_eq!(map.query(8, 205), None);
|
||||
assert_eq!(map.query(2, 205).1, None);
|
||||
assert_eq!(map.query(3, 205).1, Some("Layer 20".to_string()));
|
||||
assert_eq!(map.query(5, 205).1, Some("Layer 11".to_string()));
|
||||
assert_eq!(map.query(7, 205).1, Some("Layer 11".to_string()));
|
||||
assert_eq!(map.query(8, 205).1, None);
|
||||
}
|
||||
|
||||
/// Test when rectangles have nontrivial height and possibly overlap
|
||||
@@ -235,45 +149,45 @@ fn test_persistent_overlapping() {
|
||||
let mut map = PersistentLayerMap::<String>::new();
|
||||
|
||||
// Add 3 key-disjoint layers with varying LSN ranges
|
||||
map.insert(1..2, 100..200, "Layer 1".to_string());
|
||||
map.insert(4..5, 110..200, "Layer 2".to_string());
|
||||
map.insert(7..8, 120..300, "Layer 3".to_string());
|
||||
map.insert(1..2, 100..200, "Layer 1".to_string(), true);
|
||||
map.insert(4..5, 110..200, "Layer 2".to_string(), true);
|
||||
map.insert(7..8, 120..300, "Layer 3".to_string(), true);
|
||||
|
||||
// Add wide and short layer
|
||||
map.insert(0..9, 130..199, "Layer 4".to_string());
|
||||
map.insert(0..9, 130..199, "Layer 4".to_string(), true);
|
||||
|
||||
// Add wide layer taller than some
|
||||
map.insert(0..9, 140..201, "Layer 5".to_string());
|
||||
map.insert(0..9, 140..201, "Layer 5".to_string(), true);
|
||||
|
||||
// Add wide layer taller than all
|
||||
map.insert(0..9, 150..301, "Layer 6".to_string());
|
||||
map.insert(0..9, 150..301, "Layer 6".to_string(), true);
|
||||
|
||||
// After layer 4 insertion
|
||||
assert_eq!(map.query(0, 135), Some("Layer 4".to_string()));
|
||||
assert_eq!(map.query(1, 135), Some("Layer 1".to_string()));
|
||||
assert_eq!(map.query(2, 135), Some("Layer 4".to_string()));
|
||||
assert_eq!(map.query(4, 135), Some("Layer 2".to_string()));
|
||||
assert_eq!(map.query(5, 135), Some("Layer 4".to_string()));
|
||||
assert_eq!(map.query(7, 135), Some("Layer 3".to_string()));
|
||||
assert_eq!(map.query(8, 135), Some("Layer 4".to_string()));
|
||||
assert_eq!(map.query(0, 135).1, Some("Layer 4".to_string()));
|
||||
assert_eq!(map.query(1, 135).1, Some("Layer 1".to_string()));
|
||||
assert_eq!(map.query(2, 135).1, Some("Layer 4".to_string()));
|
||||
assert_eq!(map.query(4, 135).1, Some("Layer 2".to_string()));
|
||||
assert_eq!(map.query(5, 135).1, Some("Layer 4".to_string()));
|
||||
assert_eq!(map.query(7, 135).1, Some("Layer 3".to_string()));
|
||||
assert_eq!(map.query(8, 135).1, Some("Layer 4".to_string()));
|
||||
|
||||
// After layer 5 insertion
|
||||
assert_eq!(map.query(0, 145), Some("Layer 5".to_string()));
|
||||
assert_eq!(map.query(1, 145), Some("Layer 5".to_string()));
|
||||
assert_eq!(map.query(2, 145), Some("Layer 5".to_string()));
|
||||
assert_eq!(map.query(4, 145), Some("Layer 5".to_string()));
|
||||
assert_eq!(map.query(5, 145), Some("Layer 5".to_string()));
|
||||
assert_eq!(map.query(7, 145), Some("Layer 3".to_string()));
|
||||
assert_eq!(map.query(8, 145), Some("Layer 5".to_string()));
|
||||
assert_eq!(map.query(0, 145).1, Some("Layer 5".to_string()));
|
||||
assert_eq!(map.query(1, 145).1, Some("Layer 5".to_string()));
|
||||
assert_eq!(map.query(2, 145).1, Some("Layer 5".to_string()));
|
||||
assert_eq!(map.query(4, 145).1, Some("Layer 5".to_string()));
|
||||
assert_eq!(map.query(5, 145).1, Some("Layer 5".to_string()));
|
||||
assert_eq!(map.query(7, 145).1, Some("Layer 3".to_string()));
|
||||
assert_eq!(map.query(8, 145).1, Some("Layer 5".to_string()));
|
||||
|
||||
// After layer 6 insertion
|
||||
assert_eq!(map.query(0, 155), Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(1, 155), Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(2, 155), Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(4, 155), Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(5, 155), Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(7, 155), Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(8, 155), Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(0, 155).1, Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(1, 155).1, Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(2, 155).1, Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(4, 155).1, Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(5, 155).1, Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(7, 155).1, Some("Layer 6".to_string()));
|
||||
assert_eq!(map.query(8, 155).1, Some("Layer 6".to_string()));
|
||||
}
|
||||
|
||||
/// Layer map that supports:
|
||||
@@ -292,15 +206,18 @@ pub struct RetroactiveLayerMap<Value> {
|
||||
|
||||
/// We buffer insertion into the PersistentLayerMap to decrease the number of rebuilds.
|
||||
/// A value of None means we want to delete this item.
|
||||
buffer: BTreeMap<(u64, u64, i128, i128), Option<Value>>,
|
||||
buffer: BTreeMap<(u64, u64, i128, i128, bool), Option<Value>>,
|
||||
|
||||
/// All current layers. This is not used for search. Only to make rebuilds easier.
|
||||
layers: BTreeMap<(u64, u64, i128, i128), Value>,
|
||||
layers: BTreeMap<(u64, u64, i128, i128, bool), Value>,
|
||||
}
|
||||
|
||||
impl<Value: std::fmt::Debug> std::fmt::Debug for RetroactiveLayerMap<Value> {
|
||||
impl<T: std::fmt::Debug> std::fmt::Debug for RetroactiveLayerMap<T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "RetroactiveLayerMap: head: {:?}", self.map)
|
||||
f.debug_struct("RetroactiveLayerMap")
|
||||
.field("buffer", &self.buffer)
|
||||
.field("layers", &self.layers)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -319,22 +236,28 @@ impl<Value: Clone> RetroactiveLayerMap<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(self: &mut Self, key: Range<i128>, lsn: Range<u64>, value: Value) {
|
||||
pub fn insert(
|
||||
self: &mut Self,
|
||||
key: Range<i128>,
|
||||
lsn: Range<u64>,
|
||||
value: Value,
|
||||
is_image: bool,
|
||||
) {
|
||||
self.buffer.insert(
|
||||
(lsn.start, lsn.end, key.start, key.end),
|
||||
(lsn.start, lsn.end, key.start, key.end, is_image),
|
||||
Some(value.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn remove(self: &mut Self, key: Range<i128>, lsn: Range<u64>) {
|
||||
pub fn remove(self: &mut Self, key: Range<i128>, lsn: Range<u64>, is_image: bool) {
|
||||
self.buffer
|
||||
.insert((lsn.start, lsn.end, key.start, key.end), None);
|
||||
.insert((lsn.start, lsn.end, key.start, key.end, is_image), None);
|
||||
}
|
||||
|
||||
pub fn rebuild(self: &mut Self) {
|
||||
// Find the first LSN that needs to be rebuilt
|
||||
let rebuild_since: u64 = match self.buffer.iter().next() {
|
||||
Some(((lsn_start, _, _, _), _)) => lsn_start.clone(),
|
||||
Some(((lsn_start, _, _, _, _), _)) => lsn_start.clone(),
|
||||
None => return, // No need to rebuild if buffer is empty
|
||||
};
|
||||
|
||||
@@ -359,11 +282,15 @@ impl<Value: Clone> RetroactiveLayerMap<Value> {
|
||||
|
||||
// Rebuild
|
||||
self.map.trim(&rebuild_since);
|
||||
for ((lsn_start, lsn_end, key_start, key_end), layer) in
|
||||
self.layers.range((rebuild_since, 0, 0, 0)..)
|
||||
for ((lsn_start, lsn_end, key_start, key_end, is_image), layer) in
|
||||
self.layers.range((rebuild_since, 0, 0, 0, false)..)
|
||||
{
|
||||
self.map
|
||||
.insert(*key_start..*key_end, *lsn_start..*lsn_end, layer.clone());
|
||||
self.map.insert(
|
||||
*key_start..*key_end,
|
||||
*lsn_start..*lsn_end,
|
||||
layer.clone(),
|
||||
*is_image,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -371,7 +298,7 @@ impl<Value: Clone> RetroactiveLayerMap<Value> {
|
||||
self.map.trim(&0);
|
||||
}
|
||||
|
||||
pub fn query(self: &Self, key: i128, lsn: u64) -> Option<Value> {
|
||||
pub fn query(self: &Self, key: i128, lsn: u64) -> (Option<Value>, Option<Value>) {
|
||||
if !self.buffer.is_empty() {
|
||||
panic!("rebuild pls")
|
||||
}
|
||||
@@ -379,16 +306,37 @@ impl<Value: Clone> RetroactiveLayerMap<Value> {
|
||||
self.map.query(key, lsn)
|
||||
}
|
||||
|
||||
pub fn get_coverage(
|
||||
self: &Self,
|
||||
lsn: u64,
|
||||
) -> Option<&RedBlackTreeMapSync<i128, Option<(u64, Value)>>> {
|
||||
pub fn get_version(self: &Self, lsn: u64) -> Option<&LatestLayerMap<Value>> {
|
||||
if !self.buffer.is_empty() {
|
||||
panic!("rebuild pls")
|
||||
}
|
||||
|
||||
self.map.get_coverage(lsn)
|
||||
self.map.get_version(lsn)
|
||||
}
|
||||
|
||||
/// Yields a clone of every layer currently stored in `self.layers`, in the
/// map's key order.
///
/// # Panics
///
/// Panics with "rebuild pls" while `self.buffer` still holds entries.
pub fn iter(self: &Self) -> impl '_ + Iterator<Item = Value> {
    let has_pending = !self.buffer.is_empty();
    if has_pending {
        panic!("rebuild pls")
    }

    self.layers.values().cloned()
}
|
||||
}
|
||||
|
||||
/// Regression case: a single non-image layer spanning a very wide key range
/// must be returned in the first slot of the query result after a rebuild.
#[test]
fn test_retroactive_regression_1() {
    let mut layers = RetroactiveLayerMap::new();

    let key_range = 0..21267647932558653966460912964485513215;
    let lsn_range = 23761336..23761457;
    layers.insert(key_range, lsn_range, "sdfsdfs".to_string(), false);

    layers.rebuild();

    let (first, _second) = layers.query(100, 23761457);
    assert_eq!(first, Some("sdfsdfs".to_string()));
}
|
||||
|
||||
#[test]
|
||||
@@ -396,29 +344,29 @@ fn test_retroactive_simple() {
|
||||
let mut map = RetroactiveLayerMap::new();
|
||||
|
||||
// Append some images in increasing LSN order
|
||||
map.insert(0..5, 100..101, "Image 1".to_string());
|
||||
map.insert(3..9, 110..111, "Image 2".to_string());
|
||||
map.insert(4..6, 120..121, "Image 3".to_string());
|
||||
map.insert(8..9, 120..121, "Image 4".to_string());
|
||||
map.insert(0..5, 100..101, "Image 1".to_string(), true);
|
||||
map.insert(3..9, 110..111, "Image 2".to_string(), true);
|
||||
map.insert(4..6, 120..121, "Image 3".to_string(), true);
|
||||
map.insert(8..9, 120..121, "Image 4".to_string(), true);
|
||||
|
||||
// Add a delta layer out of order
|
||||
map.insert(2..5, 105..106, "Delta 1".to_string());
|
||||
map.insert(2..5, 105..106, "Delta 1".to_string(), true);
|
||||
|
||||
// Rebuild so we can start querying
|
||||
map.rebuild();
|
||||
|
||||
// Query key 4
|
||||
assert_eq!(map.query(4, 90), None);
|
||||
assert_eq!(map.query(4, 102), Some("Image 1".to_string()));
|
||||
assert_eq!(map.query(4, 107), Some("Delta 1".to_string()));
|
||||
assert_eq!(map.query(4, 115), Some("Image 2".to_string()));
|
||||
assert_eq!(map.query(4, 125), Some("Image 3".to_string()));
|
||||
assert_eq!(map.query(4, 90).1, None);
|
||||
assert_eq!(map.query(4, 102).1, Some("Image 1".to_string()));
|
||||
assert_eq!(map.query(4, 107).1, Some("Delta 1".to_string()));
|
||||
assert_eq!(map.query(4, 115).1, Some("Image 2".to_string()));
|
||||
assert_eq!(map.query(4, 125).1, Some("Image 3".to_string()));
|
||||
|
||||
// Remove Image 3
|
||||
map.remove(4..6, 120..121);
|
||||
map.remove(4..6, 120..121, true);
|
||||
map.rebuild();
|
||||
|
||||
// Check deletion worked
|
||||
assert_eq!(map.query(4, 125), Some("Image 2".to_string()));
|
||||
assert_eq!(map.query(8, 125), Some("Image 4".to_string()));
|
||||
assert_eq!(map.query(4, 125).1, Some("Image 2".to_string()));
|
||||
assert_eq!(map.query(8, 125).1, Some("Image 4".to_string()));
|
||||
}
|
||||
|
||||
104
pageserver/src/tenant/coverage.rs
Normal file
104
pageserver/src/tenant/coverage.rs
Normal file
@@ -0,0 +1,104 @@
|
||||
use std::ops::Range;
|
||||
|
||||
// TODO the `im` crate has 20x more downloads and also has
|
||||
// persistent/immutable BTree. It also runs a bit faster but
|
||||
// results are not the same on some tests.
|
||||
use rpds::RedBlackTreeMapSync;
|
||||
|
||||
/// Answers "which value covers this key?" for a set of inserted key ranges.
///
/// The key space is cut into segments by the nodes of `head`; a lookup for a
/// key reads the closest node at or below it.
pub struct Coverage<Value> {
|
||||
/// Mapping key to the latest layer (if any) until the next key.
|
||||
/// We use the Sync version of the map because we want Self to
|
||||
/// be Sync. Using nonsync might be faster, if we can work with
|
||||
/// that.
// Each node stores `None` (nothing covers the segment) or the pair
// `(lsn.end, value)` recorded by `insert` for the covering entry.
|
||||
head: RedBlackTreeMapSync<i128, Option<(u64, Value)>>,
|
||||
}
|
||||
|
||||
impl<T: Clone> Default for Coverage<T> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Value: Clone> Coverage<Value> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
head: RedBlackTreeMapSync::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to subdivide the key range without changing any values
|
||||
fn add_node(self: &mut Self, key: i128) {
|
||||
let value = match self.head.range(..=key).last() {
|
||||
Some((_, Some(v))) => Some(v.clone()),
|
||||
Some((_, None)) => None,
|
||||
None => None,
|
||||
};
|
||||
self.head.insert_mut(key, value);
|
||||
}
|
||||
|
||||
pub fn insert(self: &mut Self, key: Range<i128>, lsn: Range<u64>, value: Value) {
|
||||
// NOTE The order of the following lines is important!!
|
||||
|
||||
// Add nodes at endpoints
|
||||
self.add_node(key.start);
|
||||
self.add_node(key.end);
|
||||
|
||||
// Raise the height where necessary
|
||||
//
|
||||
// NOTE This loop is worst case O(N), but amortized O(log N) in the special
|
||||
// case when rectangles have no height. In practice I don't think we'll see
|
||||
// the kind of layer intersections needed to trigger O(N) behavior. If we
|
||||
// do it can be fixed using lazy propagation.
|
||||
let mut to_update = Vec::new();
|
||||
let mut to_remove = Vec::new();
|
||||
let mut prev_covered = false;
|
||||
for (k, node) in self.head.range(key.clone()) {
|
||||
let needs_cover = match node {
|
||||
None => true,
|
||||
Some((h, _)) => h < &lsn.end,
|
||||
};
|
||||
if needs_cover {
|
||||
match prev_covered {
|
||||
true => to_remove.push(k.clone()),
|
||||
false => to_update.push(k.clone()),
|
||||
}
|
||||
}
|
||||
prev_covered = needs_cover;
|
||||
}
|
||||
if !prev_covered {
|
||||
to_remove.push(key.end);
|
||||
}
|
||||
for k in to_update {
|
||||
self.head
|
||||
.insert_mut(k.clone(), Some((lsn.end.clone(), value.clone())));
|
||||
}
|
||||
for k in to_remove {
|
||||
self.head.remove_mut(&k);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn query(self: &Self, key: i128) -> Option<Value> {
|
||||
self.head
|
||||
.range(..=key)
|
||||
.rev()
|
||||
.next()?
|
||||
.1
|
||||
.as_ref()
|
||||
.map(|(_, v)| v.clone())
|
||||
}
|
||||
|
||||
pub fn range(
|
||||
self: &Self,
|
||||
key: Range<i128>,
|
||||
) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
|
||||
self.head
|
||||
.range(key)
|
||||
.map(|(k, v)| (k.clone(), v.as_ref().map(|x| x.1.clone())))
|
||||
}
|
||||
|
||||
pub fn clone(self: &Self) -> Self {
|
||||
Self {
|
||||
head: self.head.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
60
pageserver/src/tenant/latest_layer_map.rs
Normal file
60
pageserver/src/tenant/latest_layer_map.rs
Normal file
@@ -0,0 +1,60 @@
|
||||
use std::ops::Range;
|
||||
|
||||
use super::coverage::Coverage;
|
||||
|
||||
/// Pair of coverages, one per layer kind. `insert` routes a layer to one of
/// them based on its `is_image` flag and `query` consults both.
pub struct LatestLayerMap<Value> {
|
||||
// Receives the layers inserted with `is_image == true`.
image_coverage: Coverage<Value>,
|
||||
// Receives the layers inserted with `is_image == false`.
delta_coverage: Coverage<Value>,
|
||||
}
|
||||
|
||||
impl<T: Clone> Default for LatestLayerMap<T> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
image_coverage: Coverage::default(),
|
||||
delta_coverage: Coverage::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Value: Clone> LatestLayerMap<Value> {
|
||||
pub fn insert(
|
||||
self: &mut Self,
|
||||
key: Range<i128>,
|
||||
lsn: Range<u64>,
|
||||
value: Value,
|
||||
is_image: bool,
|
||||
) {
|
||||
if is_image {
|
||||
self.image_coverage.insert(key, lsn, value);
|
||||
} else {
|
||||
self.delta_coverage.insert(key.clone(), lsn.clone(), value);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn query(self: &Self, key: i128) -> (Option<Value>, Option<Value>) {
|
||||
let delta = self.delta_coverage.query(key);
|
||||
let image = self.image_coverage.query(key);
|
||||
(delta, image)
|
||||
}
|
||||
|
||||
pub fn image_coverage(
|
||||
self: &Self,
|
||||
key: Range<i128>,
|
||||
) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
|
||||
self.image_coverage.range(key)
|
||||
}
|
||||
|
||||
pub fn delta_coverage(
|
||||
self: &Self,
|
||||
key: Range<i128>,
|
||||
) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
|
||||
self.delta_coverage.range(key)
|
||||
}
|
||||
|
||||
pub fn clone(self: &Self) -> Self {
|
||||
Self {
|
||||
image_coverage: self.image_coverage.clone(),
|
||||
delta_coverage: self.delta_coverage.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -54,181 +54,14 @@ pub struct LayerMap {
|
||||
///
|
||||
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
|
||||
|
||||
/// All the historic layers are kept here
|
||||
historic_layers: RTree<LayerRTreeObject>,
|
||||
|
||||
/// HACK I'm experimenting with a new index to replace the RTree. If this
|
||||
/// works out I'll clean up the struct later.
|
||||
/// Index of the historic layers optimized for search
|
||||
index: RetroactiveLayerMap<Arc<dyn Layer>>,
|
||||
images: RetroactiveLayerMap<Arc<dyn Layer>>,
|
||||
|
||||
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
|
||||
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
|
||||
l0_delta_layers: Vec<Arc<dyn Layer>>,
|
||||
}
|
||||
|
||||
/// A layer paired with its bounding box, the element type stored in the
/// `RTree` of historic layers.
struct LayerRTreeObject {
|
||||
// The layer this entry stands for.
layer: Arc<dyn Layer>,
|
||||
|
||||
// Bounding box built in `new`: the first coordinate is the key and the
// second is the LSN; the upper corner is inclusive.
envelope: AABB<[IntKey; 2]>,
|
||||
}
|
||||
|
||||
// Representation of Key as numeric type.
|
||||
// We can not use native implementation of i128, because rstar::RTree
|
||||
// doesn't properly handle integer overflow during area calculation: sum(Xi*Yi).
|
||||
// Overflow will cause panic in debug mode and incorrect area calculation in release mode,
|
||||
// which leads to a non-optimally balanced R-Tree (but doesn't affect the correctness of the R-Tree).
|
||||
// By using i256 as the type, even though all the actual values would fit in i128, we can be
|
||||
// sure that multiplication doesn't overflow.
|
||||
//
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Debug)]
|
||||
struct IntKey(i256);
|
||||
|
||||
impl Copy for IntKey {}
|
||||
|
||||
impl IntKey {
|
||||
fn from(i: i128) -> Self {
|
||||
IntKey(i256::from(i))
|
||||
}
|
||||
}
|
||||
|
||||
impl Bounded for IntKey {
|
||||
fn min_value() -> Self {
|
||||
IntKey(i256::MIN)
|
||||
}
|
||||
fn max_value() -> Self {
|
||||
IntKey(i256::MAX)
|
||||
}
|
||||
}
|
||||
|
||||
impl Signed for IntKey {
|
||||
fn is_positive(&self) -> bool {
|
||||
self.0 > i256::ZERO
|
||||
}
|
||||
fn is_negative(&self) -> bool {
|
||||
self.0 < i256::ZERO
|
||||
}
|
||||
fn signum(&self) -> Self {
|
||||
match self.0.cmp(&i256::ZERO) {
|
||||
Ordering::Greater => IntKey(i256::ONE),
|
||||
Ordering::Less => IntKey(-i256::ONE),
|
||||
Ordering::Equal => IntKey(i256::ZERO),
|
||||
}
|
||||
}
|
||||
fn abs(&self) -> Self {
|
||||
IntKey(self.0.abs())
|
||||
}
|
||||
fn abs_sub(&self, other: &Self) -> Self {
|
||||
if self.0 <= other.0 {
|
||||
IntKey(i256::ZERO)
|
||||
} else {
|
||||
IntKey(self.0 - other.0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Neg for IntKey {
|
||||
type Output = Self;
|
||||
fn neg(self) -> Self::Output {
|
||||
IntKey(-self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Rem for IntKey {
|
||||
type Output = Self;
|
||||
fn rem(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 % rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Div for IntKey {
|
||||
type Output = Self;
|
||||
fn div(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 / rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Add for IntKey {
|
||||
type Output = Self;
|
||||
fn add(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 + rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Sub for IntKey {
|
||||
type Output = Self;
|
||||
fn sub(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 - rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Mul for IntKey {
|
||||
type Output = Self;
|
||||
fn mul(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 * rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl One for IntKey {
|
||||
fn one() -> Self {
|
||||
IntKey(i256::ONE)
|
||||
}
|
||||
}
|
||||
|
||||
impl Zero for IntKey {
|
||||
fn zero() -> Self {
|
||||
IntKey(i256::ZERO)
|
||||
}
|
||||
fn is_zero(&self) -> bool {
|
||||
self.0 == i256::ZERO
|
||||
}
|
||||
}
|
||||
|
||||
impl Num for IntKey {
|
||||
type FromStrRadixErr = <i128 as Num>::FromStrRadixErr;
|
||||
fn from_str_radix(str: &str, radix: u32) -> Result<Self, Self::FromStrRadixErr> {
|
||||
Ok(IntKey(i256::from(i128::from_str_radix(str, radix)?)))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for LayerRTreeObject {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
// FIXME: ptr_eq might fail to return true for 'dyn'
|
||||
// references. Clippy complains about this. In practice it
|
||||
// seems to work, the assertion below would be triggered
|
||||
// otherwise but this ought to be fixed.
|
||||
#[allow(clippy::vtable_address_comparisons)]
|
||||
Arc::ptr_eq(&self.layer, &other.layer)
|
||||
}
|
||||
}
|
||||
|
||||
impl RTreeObject for LayerRTreeObject {
|
||||
type Envelope = AABB<[IntKey; 2]>;
|
||||
fn envelope(&self) -> Self::Envelope {
|
||||
self.envelope
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerRTreeObject {
|
||||
fn new(layer: Arc<dyn Layer>) -> Self {
|
||||
let key_range = layer.get_key_range();
|
||||
let lsn_range = layer.get_lsn_range();
|
||||
|
||||
let envelope = AABB::from_corners(
|
||||
[
|
||||
IntKey::from(key_range.start.to_i128()),
|
||||
IntKey::from(lsn_range.start.0 as i128),
|
||||
],
|
||||
[
|
||||
IntKey::from(key_range.end.to_i128() - 1),
|
||||
IntKey::from(lsn_range.end.0 as i128 - 1),
|
||||
], // AABB::upper is inclusive, while `key_range.end` and `lsn_range.end` are exclusive
|
||||
);
|
||||
LayerRTreeObject { layer, envelope }
|
||||
}
|
||||
}
|
||||
|
||||
/// Return value of LayerMap::search
|
||||
pub struct SearchResult {
|
||||
pub layer: Arc<dyn Layer>,
|
||||
@@ -248,159 +81,40 @@ impl LayerMap {
|
||||
/// layer.
|
||||
///
|
||||
pub fn search(&self, key: Key, end_lsn: Lsn) -> Result<Option<SearchResult>> {
    // The answer comes straight from `search_new`.
    //
    // A cross-check against `search_old` used to live here and is currently
    // disabled. It panicked when one implementation returned `Some` and the
    // other `None`, and otherwise asserted that both results agreed on the
    // layer filename and on `lsn_floor` (with the query's key and end_lsn as
    // assertion context).
    self.search_new(key, end_lsn)
}
|
||||
|
||||
// HACK just testing correctness
|
||||
fn search_new(&self, key: Key, end_lsn: Lsn) -> Result<Option<SearchResult>> {
|
||||
// TODO I'm making two separate queries, which is 2x the cost, but that
|
||||
// can be avoided in various ways. Caching latest_image queries is
|
||||
// probably the simplest, but combining the two data structures
|
||||
// might be better.
|
||||
let latest_layer = self.index.query(key.to_i128(), end_lsn.0 - 1);
|
||||
let latest_image = self.images.query(key.to_i128(), end_lsn.0 - 1);
|
||||
|
||||
// Check for exact match
|
||||
let latest_image = if let Some(image) = latest_image {
|
||||
let img_lsn = image.get_lsn_range().start;
|
||||
if Lsn(img_lsn.0 + 1) == end_lsn {
|
||||
return Ok(Some(SearchResult {
|
||||
match self.index.query(key.to_i128(), end_lsn.0 - 1) {
|
||||
(None, None) => Ok(None),
|
||||
(None, Some(image)) => {
|
||||
let lsn_floor = image.get_lsn_range().start;
|
||||
Ok(Some(SearchResult {
|
||||
layer: image,
|
||||
lsn_floor: img_lsn,
|
||||
}));
|
||||
lsn_floor,
|
||||
}))
|
||||
}
|
||||
|
||||
// HACK just to give back ownership of latest_image to parent scope.
|
||||
// There's definitely a cleaner way to do it.
|
||||
Some(image)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
return Ok(latest_layer.map(|layer| {
|
||||
// Compute lsn_floor
|
||||
let mut lsn_floor = layer.get_lsn_range().start;
|
||||
if let Some(image) = latest_image {
|
||||
if layer.is_incremental() {
|
||||
lsn_floor = std::cmp::max(lsn_floor, image.get_lsn_range().start + 1)
|
||||
(Some(delta), None) => {
|
||||
let lsn_floor = delta.get_lsn_range().start;
|
||||
Ok(Some(SearchResult {
|
||||
layer: delta,
|
||||
lsn_floor,
|
||||
}))
|
||||
}
|
||||
(Some(delta), Some(image)) => {
|
||||
let img_lsn = image.get_lsn_range().start;
|
||||
let image_is_newer = image.get_lsn_range().end > delta.get_lsn_range().end;
|
||||
let image_exact_match = Lsn(img_lsn.0 + 1) == end_lsn;
|
||||
if image_is_newer || image_exact_match {
|
||||
Ok(Some(SearchResult {
|
||||
layer: image,
|
||||
lsn_floor: img_lsn,
|
||||
}))
|
||||
} else {
|
||||
let lsn_floor =
|
||||
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
|
||||
Ok(Some(SearchResult {
|
||||
layer: delta,
|
||||
lsn_floor,
|
||||
}))
|
||||
}
|
||||
}
|
||||
SearchResult { layer, lsn_floor }
|
||||
}));
|
||||
}
|
||||
|
||||
// HACK just testing correctness
|
||||
fn search_old(&self, key: Key, end_lsn: Lsn) -> Result<Option<SearchResult>> {
|
||||
// linear search
|
||||
// Find the latest image layer that covers the given key
|
||||
let mut latest_img: Option<Arc<dyn Layer>> = None;
|
||||
let mut latest_img_lsn: Option<Lsn> = None;
|
||||
let envelope = AABB::from_corners(
|
||||
[IntKey::from(key.to_i128()), IntKey::from(0i128)],
|
||||
[
|
||||
IntKey::from(key.to_i128()),
|
||||
IntKey::from(end_lsn.0 as i128 - 1),
|
||||
],
|
||||
);
|
||||
for e in self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
{
|
||||
let l = &e.layer;
|
||||
if l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
assert!(l.get_key_range().contains(&key));
|
||||
let img_lsn = l.get_lsn_range().start;
|
||||
assert!(img_lsn < end_lsn);
|
||||
if Lsn(img_lsn.0 + 1) == end_lsn {
|
||||
// found exact match
|
||||
return Ok(Some(SearchResult {
|
||||
layer: Arc::clone(l),
|
||||
lsn_floor: img_lsn,
|
||||
}));
|
||||
}
|
||||
if img_lsn > latest_img_lsn.unwrap_or(Lsn(0)) {
|
||||
latest_img = Some(Arc::clone(l));
|
||||
latest_img_lsn = Some(img_lsn);
|
||||
}
|
||||
}
|
||||
|
||||
// Search the delta layers
|
||||
let mut latest_delta: Option<Arc<dyn Layer>> = None;
|
||||
for e in self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
{
|
||||
let l = &e.layer;
|
||||
if !l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
assert!(l.get_key_range().contains(&key));
|
||||
if l.get_lsn_range().start >= end_lsn {
|
||||
info!(
|
||||
"Candidate delta layer {}..{} is too new for lsn {}",
|
||||
l.get_lsn_range().start,
|
||||
l.get_lsn_range().end,
|
||||
end_lsn
|
||||
);
|
||||
}
|
||||
assert!(l.get_lsn_range().start < end_lsn);
|
||||
if l.get_lsn_range().end >= end_lsn {
|
||||
// this layer contains the requested point in the key/lsn space.
|
||||
// No need to search any further
|
||||
trace!(
|
||||
"found layer {} for request on {key} at {end_lsn}",
|
||||
l.filename().display(),
|
||||
);
|
||||
latest_delta.replace(Arc::clone(l));
|
||||
break;
|
||||
}
|
||||
// this layer's end LSN is smaller than the requested point. If there's
|
||||
// nothing newer, this is what we need to return. Remember this.
|
||||
if let Some(old_candidate) = &latest_delta {
|
||||
if l.get_lsn_range().end > old_candidate.get_lsn_range().end {
|
||||
latest_delta.replace(Arc::clone(l));
|
||||
}
|
||||
} else {
|
||||
latest_delta.replace(Arc::clone(l));
|
||||
}
|
||||
}
|
||||
if let Some(l) = latest_delta {
|
||||
trace!(
|
||||
"found (old) layer {} for request on {key} at {end_lsn}",
|
||||
l.filename().display(),
|
||||
);
|
||||
let lsn_floor = std::cmp::max(
|
||||
Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
|
||||
l.get_lsn_range().start,
|
||||
);
|
||||
Ok(Some(SearchResult {
|
||||
lsn_floor,
|
||||
layer: l,
|
||||
}))
|
||||
} else if let Some(l) = latest_img {
|
||||
trace!("found img layer and no deltas for request on {key} at {end_lsn}");
|
||||
Ok(Some(SearchResult {
|
||||
lsn_floor: latest_img_lsn.unwrap(),
|
||||
layer: l,
|
||||
}))
|
||||
} else {
|
||||
trace!("no layer found for request on {key} at {end_lsn}");
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -414,27 +128,19 @@ impl LayerMap {
|
||||
kr.start.to_i128()..kr.end.to_i128(),
|
||||
lr.start.0..lr.end.0,
|
||||
Arc::clone(&layer),
|
||||
!layer.is_incremental(),
|
||||
);
|
||||
if !layer.is_incremental() {
|
||||
self.images.insert(
|
||||
kr.start.to_i128()..kr.end.to_i128(),
|
||||
lr.start.0..lr.end.0,
|
||||
Arc::clone(&layer),
|
||||
);
|
||||
}
|
||||
|
||||
if layer.get_key_range() == (Key::MIN..Key::MAX) {
|
||||
self.l0_delta_layers.push(layer.clone());
|
||||
}
|
||||
// TODO remove this so insert isn't slow. I need it for now for iter_historic()
|
||||
self.historic_layers.insert(LayerRTreeObject::new(layer));
|
||||
|
||||
NUM_ONDISK_LAYERS.inc();
|
||||
}
|
||||
|
||||
/// Must be called after a batch of insert_historic calls, before querying
|
||||
pub fn rebuild_index(&mut self) {
|
||||
self.index.rebuild();
|
||||
self.images.rebuild();
|
||||
}
|
||||
|
||||
///
|
||||
@@ -445,12 +151,11 @@ impl LayerMap {
|
||||
pub fn remove_historic(&mut self, layer: Arc<dyn Layer>) {
|
||||
let kr = layer.get_key_range();
|
||||
let lr = layer.get_lsn_range();
|
||||
self.index
|
||||
.remove(kr.start.to_i128()..kr.end.to_i128(), lr.start.0..lr.end.0);
|
||||
if !layer.is_incremental() {
|
||||
self.images
|
||||
.remove(kr.start.to_i128()..kr.end.to_i128(), lr.start.0..lr.end.0);
|
||||
}
|
||||
self.index.remove(
|
||||
kr.start.to_i128()..kr.end.to_i128(),
|
||||
lr.start.0..lr.end.0,
|
||||
!layer.is_incremental(),
|
||||
);
|
||||
|
||||
if layer.get_key_range() == (Key::MIN..Key::MAX) {
|
||||
let len_before = self.l0_delta_layers.len();
|
||||
@@ -464,10 +169,7 @@ impl LayerMap {
|
||||
.retain(|other| !Arc::ptr_eq(other, &layer));
|
||||
assert_eq!(self.l0_delta_layers.len(), len_before - 1);
|
||||
}
|
||||
assert!(self
|
||||
.historic_layers
|
||||
.remove(&LayerRTreeObject::new(layer))
|
||||
.is_some());
|
||||
|
||||
NUM_ONDISK_LAYERS.dec();
|
||||
}
|
||||
|
||||
@@ -480,87 +182,17 @@ impl LayerMap {
|
||||
key_range: &Range<Key>,
|
||||
lsn_range: &Range<Lsn>,
|
||||
) -> Result<bool> {
|
||||
// TODO implement using new index
|
||||
let mut range_remain = key_range.clone();
|
||||
|
||||
loop {
|
||||
let mut made_progress = false;
|
||||
let envelope = AABB::from_corners(
|
||||
[
|
||||
IntKey::from(range_remain.start.to_i128()),
|
||||
IntKey::from(lsn_range.start.0 as i128),
|
||||
],
|
||||
[
|
||||
IntKey::from(range_remain.end.to_i128() - 1),
|
||||
IntKey::from(lsn_range.end.0 as i128 - 1),
|
||||
],
|
||||
);
|
||||
for e in self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
{
|
||||
let l = &e.layer;
|
||||
if l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
let img_lsn = l.get_lsn_range().start;
|
||||
if l.get_key_range().contains(&range_remain.start) && lsn_range.contains(&img_lsn) {
|
||||
made_progress = true;
|
||||
let img_key_end = l.get_key_range().end;
|
||||
|
||||
if img_key_end >= range_remain.end {
|
||||
return Ok(true);
|
||||
}
|
||||
range_remain.start = img_key_end;
|
||||
}
|
||||
}
|
||||
|
||||
if !made_progress {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<dyn Layer>> {
|
||||
// TODO implement using new index
|
||||
self.historic_layers.iter().map(|e| e.layer.clone())
|
||||
self.index.iter()
|
||||
}
|
||||
|
||||
/// Find the last image layer that covers 'key', ignoring any image layers
|
||||
/// newer than 'lsn'.
|
||||
fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<dyn Layer>> {
|
||||
let use_new_method = true;
|
||||
if use_new_method {
|
||||
return self.images.query(key.to_i128(), lsn.0);
|
||||
}
|
||||
|
||||
let mut candidate_lsn = Lsn(0);
|
||||
let mut candidate = None;
|
||||
let envelope = AABB::from_corners(
|
||||
[IntKey::from(key.to_i128()), IntKey::from(0)],
|
||||
[IntKey::from(key.to_i128()), IntKey::from(lsn.0 as i128)],
|
||||
);
|
||||
for e in self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
{
|
||||
let l = &e.layer;
|
||||
if l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
|
||||
assert!(l.get_key_range().contains(&key));
|
||||
let this_lsn = l.get_lsn_range().start;
|
||||
assert!(this_lsn <= lsn);
|
||||
if this_lsn < candidate_lsn {
|
||||
// our previous candidate was better
|
||||
continue;
|
||||
}
|
||||
candidate_lsn = this_lsn;
|
||||
candidate = Some(Arc::clone(l));
|
||||
}
|
||||
|
||||
candidate
|
||||
return self.index.query(key.to_i128(), lsn.0).1;
|
||||
}
|
||||
|
||||
///
|
||||
@@ -576,122 +208,44 @@ impl LayerMap {
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
) -> Result<Vec<(Range<Key>, Option<Arc<dyn Layer>>)>> {
|
||||
let use_new_method = true;
|
||||
if use_new_method {
|
||||
let bounds = match self.images.get_coverage(lsn.0) {
|
||||
Some(x) => x,
|
||||
None => return Ok(vec![]),
|
||||
};
|
||||
let version = match self.index.get_version(lsn.0) {
|
||||
Some(v) => v,
|
||||
None => return Ok(vec![]),
|
||||
};
|
||||
|
||||
let start = key_range.start.to_i128();
|
||||
let end = key_range.end.to_i128();
|
||||
let start = key_range.start.to_i128();
|
||||
let end = key_range.end.to_i128();
|
||||
|
||||
// Initialize loop variables
|
||||
let mut coverage: Vec<(Range<Key>, Option<Arc<dyn Layer>>)> = vec![];
|
||||
let mut current_key = start.clone();
|
||||
let mut current_val = match bounds.range(..=start).rev().next() {
|
||||
Some((_, Some((_, v)))) => Some(v.clone()),
|
||||
Some((_, None)) => None,
|
||||
None => None,
|
||||
};
|
||||
// Initialize loop variables
|
||||
let mut coverage: Vec<(Range<Key>, Option<Arc<dyn Layer>>)> = vec![];
|
||||
let mut current_key = start.clone();
|
||||
let mut current_val = version.query(start).1;
|
||||
|
||||
// Loop through the change events and push intervals
|
||||
for (change_key, change_val) in bounds.range(start..end) {
|
||||
let kr = Key::from_i128(current_key)..Key::from_i128(*change_key);
|
||||
coverage.push((kr, current_val.take()));
|
||||
current_key = change_key.clone();
|
||||
current_val = change_val.as_ref().map(|(_, v)| v.clone());
|
||||
}
|
||||
|
||||
// Add the final interval
|
||||
let kr = Key::from_i128(current_key)..Key::from_i128(end);
|
||||
// Loop through the change events and push intervals
|
||||
for (change_key, change_val) in version.image_coverage(start..end) {
|
||||
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
|
||||
coverage.push((kr, current_val.take()));
|
||||
|
||||
return Ok(coverage);
|
||||
current_key = change_key.clone();
|
||||
current_val = change_val.clone();
|
||||
}
|
||||
|
||||
let mut points = vec![key_range.start];
|
||||
let envelope = AABB::from_corners(
|
||||
[IntKey::from(key_range.start.to_i128()), IntKey::from(0)],
|
||||
[
|
||||
IntKey::from(key_range.end.to_i128()),
|
||||
IntKey::from(lsn.0 as i128),
|
||||
],
|
||||
);
|
||||
for e in self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
{
|
||||
let l = &e.layer;
|
||||
assert!(l.get_lsn_range().start <= lsn);
|
||||
let range = l.get_key_range();
|
||||
if key_range.contains(&range.start) {
|
||||
points.push(l.get_key_range().start);
|
||||
}
|
||||
if key_range.contains(&range.end) {
|
||||
points.push(l.get_key_range().end);
|
||||
}
|
||||
}
|
||||
points.push(key_range.end);
|
||||
// Add the final interval
|
||||
let kr = Key::from_i128(current_key)..Key::from_i128(end);
|
||||
coverage.push((kr, current_val.take()));
|
||||
|
||||
points.sort();
|
||||
points.dedup();
|
||||
|
||||
// Ok, we now have a list of "interesting" points in the key space
|
||||
|
||||
// For each range between the points, find the latest image
|
||||
let mut start = *points.first().unwrap();
|
||||
let mut ranges = Vec::new();
|
||||
for end in points[1..].iter() {
|
||||
let img = self.find_latest_image(start, lsn);
|
||||
|
||||
ranges.push((start..*end, img));
|
||||
|
||||
start = *end;
|
||||
}
|
||||
Ok(ranges)
|
||||
return Ok(coverage);
|
||||
}
|
||||
|
||||
/// Count how many L1 delta layers there are that overlap with the
|
||||
/// given key and LSN range.
|
||||
pub fn count_deltas(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>) -> Result<usize> {
|
||||
// TODO implement using new index
|
||||
let mut result = 0;
|
||||
if lsn_range.start >= lsn_range.end {
|
||||
return Ok(0);
|
||||
}
|
||||
let envelope = AABB::from_corners(
|
||||
[
|
||||
IntKey::from(key_range.start.to_i128()),
|
||||
IntKey::from(lsn_range.start.0 as i128),
|
||||
],
|
||||
[
|
||||
IntKey::from(key_range.end.to_i128() - 1),
|
||||
IntKey::from(lsn_range.end.0 as i128 - 1),
|
||||
],
|
||||
);
|
||||
for e in self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
{
|
||||
let l = &e.layer;
|
||||
if !l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
assert!(range_overlaps(&l.get_lsn_range(), lsn_range));
|
||||
assert!(range_overlaps(&l.get_key_range(), key_range));
|
||||
|
||||
// We ignore level0 delta layers. Unless the whole keyspace fits
|
||||
// into one partition
|
||||
if !range_eq(key_range, &(Key::MIN..Key::MAX))
|
||||
&& range_eq(&l.get_key_range(), &(Key::MIN..Key::MAX))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
result += 1;
|
||||
}
|
||||
Ok(result)
|
||||
// TODO write recursive function:
|
||||
// 1. Get delta coverage
|
||||
// 2. Recurse on each part
|
||||
//
|
||||
// This will count some layers twice, but we're interested in the max number of
|
||||
// stacked deltas anyway (parallel deltas don't matter) so that's fine.
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Return all L0 delta layers
|
||||
|
||||
@@ -151,3 +151,11 @@ pub trait Layer: Send + Sync {
|
||||
/// Dump summary of the contents of the layer to stdout
|
||||
fn dump(&self, verbose: bool) -> Result<()>;
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for dyn Layer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Layer")
|
||||
.field("filename", &self.filename())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user