Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-17 10:22:56 +00:00)

Compare commits: release-pr...im_vs_rpds (57 commits)
| SHA1 |
|---|
| 6e478a4fc0 |
| e796521bb7 |
| 76055c7bbc |
| fe0851f2c4 |
| 21c85b1969 |
| 5dc99f1b04 |
| 2fcbb46338 |
| 87c3e55449 |
| 9886741828 |
| 5557fc6062 |
| dbb5d0800d |
| e392d25828 |
| 2951be5386 |
| 7c6af6d729 |
| cb5b9375d2 |
| d3d17f2c7c |
| c3f5e00ad1 |
| d0095d4457 |
| 7d057e1038 |
| f22437086e |
| 7c6909c31f |
| a2642966f2 |
| 3d6bc126ed |
| fb6569c880 |
| 115549261c |
| 01e09fc56c |
| 8618f1d225 |
| 6f374d24be |
| f0a23b474c |
| 6bfa99e0c4 |
| bfeaee90ca |
| d835d8c853 |
| 3b40d7b61a |
| 2737e33565 |
| 5fb9619ab0 |
| de548acefc |
| db72f432e5 |
| 5a6284f6f8 |
| c11c19d568 |
| d9a239475c |
| a4311bd961 |
| e0f23242be |
| 0de1ec14e0 |
| 0053ccac13 |
| b6686d63b0 |
| edf8a08dcc |
| b51d766a44 |
| c9b1655885 |
| 206ddec636 |
| cb44698c47 |
| af325a30db |
| be499156e5 |
| 4a11f6e2de |
| d9190aae87 |
| 6bce11e810 |
| 39003aa9f3 |
| a8f0d27c92 |
Cargo.lock (generated): file diff suppressed because it is too large (419 changed lines).
@@ -67,7 +67,9 @@ storage_broker = { version = "0.1", path = "../storage_broker" }
tenant_size_model = { path = "../libs/tenant_size_model" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
rpds = "0.12.0"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
im = "15.1.0"

[dev-dependencies]
criterion = "0.4"
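The two new dependencies match the branch name (`im_vs_rpds`): both crates provide persistent ordered maps whose snapshots share structure. Below is a minimal sketch, not taken from the patch, contrasting their API styles under the versions added above:

```rust
// Assumes only the two crates added above: im = "15.1.0", rpds = "0.12.0".
use im::OrdMap;
use rpds::RedBlackTreeMap;

fn main() {
    // `im` has a mutable-looking API with copy-on-write structural sharing,
    // so cloning a map to keep a snapshot is O(1).
    let mut a: OrdMap<u64, &str> = OrdMap::new();
    a.insert(1, "one");
    let snapshot = a.clone(); // cheap snapshot
    a.insert(2, "two");
    assert!(!snapshot.contains_key(&2) && a.contains_key(&2));

    // `rpds` is purely functional: insert returns a new map and
    // leaves the old one untouched.
    let b: RedBlackTreeMap<u64, &str> = RedBlackTreeMap::new().insert(1, "one");
    let b2 = b.insert(2, "two");
    assert!(!b.contains_key(&2) && b2.contains_key(&2));
}
```

Either style supports the O(1)-clone snapshot trick that the new layer map index relies on.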
@@ -1,13 +1,12 @@
use anyhow::Result;
use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::{DeltaFileName, ImageFileName, ValueReconstructState};
use pageserver::tenant::storage_layer::{Layer, ValueReconstructResult};
use pageserver::tenant::storage_layer::Layer;
use pageserver::tenant::storage_layer::{DeltaFileName, ImageFileName, LayerDescriptor};
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::ops::Range;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
@@ -17,80 +16,8 @@ use utils::lsn::Lsn;

use criterion::{criterion_group, criterion_main, Criterion};

struct DummyDelta {
    key_range: Range<Key>,
    lsn_range: Range<Lsn>,
}

impl Layer for DummyDelta {
    fn get_key_range(&self) -> Range<Key> {
        self.key_range.clone()
    }

    fn get_lsn_range(&self) -> Range<Lsn> {
        self.lsn_range.clone()
    }
    fn get_value_reconstruct_data(
        &self,
        _key: Key,
        _lsn_range: Range<Lsn>,
        _reconstruct_data: &mut ValueReconstructState,
    ) -> Result<ValueReconstructResult> {
        panic!()
    }

    fn is_incremental(&self) -> bool {
        true
    }

    fn dump(&self, _verbose: bool) -> Result<()> {
        unimplemented!()
    }

    fn short_id(&self) -> String {
        unimplemented!()
    }
}

struct DummyImage {
    key_range: Range<Key>,
    lsn: Lsn,
}

impl Layer for DummyImage {
    fn get_key_range(&self) -> Range<Key> {
        self.key_range.clone()
    }

    fn get_lsn_range(&self) -> Range<Lsn> {
        // End-bound is exclusive
        self.lsn..(self.lsn + 1)
    }

    fn get_value_reconstruct_data(
        &self,
        _key: Key,
        _lsn_range: Range<Lsn>,
        _reconstruct_data: &mut ValueReconstructState,
    ) -> Result<ValueReconstructResult> {
        panic!()
    }

    fn is_incremental(&self) -> bool {
        false
    }

    fn dump(&self, _verbose: bool) -> Result<()> {
        unimplemented!()
    }

    fn short_id(&self) -> String {
        unimplemented!()
    }
}

fn build_layer_map(filename_dump: PathBuf) -> LayerMap<dyn Layer> {
    let mut layer_map = LayerMap::<dyn Layer>::default();
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
    let mut layer_map = LayerMap::<LayerDescriptor>::default();

    let mut min_lsn = Lsn(u64::MAX);
    let mut max_lsn = Lsn(0);
@@ -100,17 +27,21 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<dyn Layer> {
    for fname in filenames {
        let fname = &fname.unwrap();
        if let Some(imgfilename) = ImageFileName::parse_str(fname) {
            let layer = DummyImage {
                key_range: imgfilename.key_range,
                lsn: imgfilename.lsn,
            let layer = LayerDescriptor {
                key: imgfilename.key_range,
                lsn: imgfilename.lsn..(imgfilename.lsn + 1),
                is_incremental: false,
                short_id: fname.to_string(),
            };
            layer_map.insert_historic(Arc::new(layer));
            min_lsn = min(min_lsn, imgfilename.lsn);
            max_lsn = max(max_lsn, imgfilename.lsn);
        } else if let Some(deltafilename) = DeltaFileName::parse_str(fname) {
            let layer = DummyDelta {
                key_range: deltafilename.key_range,
                lsn_range: deltafilename.lsn_range.clone(),
            let layer = LayerDescriptor {
                key: deltafilename.key_range.clone(),
                lsn: deltafilename.lsn_range.clone(),
                is_incremental: true,
                short_id: fname.to_string(),
            };
            layer_map.insert_historic(Arc::new(layer));
            min_lsn = min(min_lsn, deltafilename.lsn_range.start);
@@ -119,6 +50,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<dyn Layer> {
            panic!("unexpected filename {fname}");
        }
    }
    layer_map.rebuild_index();

    println!("min: {min_lsn}, max: {max_lsn}");

@@ -126,7 +58,7 @@
}

/// Construct a layer map query pattern for benchmarks
fn uniform_query_pattern(layer_map: &LayerMap<dyn Layer>) -> Vec<(Key, Lsn)> {
fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn)> {
    // For each image layer we query one of the pages contained, at LSN right
    // before the image layer was created. This gives us a somewhat uniform
    // coverage of both the lsn and key space because image layers have
@@ -150,6 +82,41 @@ fn uniform_query_pattern(layer_map: &LayerMap<dyn Layer>) -> Vec<(Key, Lsn)> {
        .collect()
}

// Construct a partitioning for testing get_difficulty map when we
// don't have an exact result of `collect_keyspace` to work with.
fn uniform_key_partitioning(layer_map: &LayerMap<LayerDescriptor>, _lsn: Lsn) -> KeyPartitioning {
    let mut parts = Vec::new();

    // We add a partition boundary at the start of each image layer,
    // no matter what lsn range it covers. This is just the easiest
    // thing to do. A better thing to do would be to get a real
    // partitioning from some database. Even better, remove the need
    // for key partitions by deciding where to create image layers
    // directly based on a coverage-based difficulty map.
    let mut keys: Vec<_> = layer_map
        .iter_historic_layers()
        .filter_map(|l| {
            if l.is_incremental() {
                None
            } else {
                let kr = l.get_key_range();
                Some(kr.start.next())
            }
        })
        .collect();
    keys.sort();

    let mut current_key = Key::from_hex("000000000000000000000000000000000000").unwrap();
    for key in keys {
        parts.push(KeySpace {
            ranges: vec![current_key..key],
        });
        current_key = key;
    }

    KeyPartitioning { parts }
}

// Benchmark using metadata extracted from our performance test environment, from
// a project where we have run pgbench many times. The pgbench database was initialized
// between each test run.
@@ -183,24 +150,71 @@ fn bench_from_captest_env(c: &mut Criterion) {
// Benchmark using metadata extracted from a real project that was taking
// too long processing layer map queries.
fn bench_from_real_project(c: &mut Criterion) {
    // TODO consider compressing this file
    // Init layer map
    let now = Instant::now();
    let layer_map = build_layer_map(PathBuf::from("benches/odd-brook-layernames.txt"));
    println!("Finished layer map init in {:?}", now.elapsed());

    // Choose uniformly distributed queries
    let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map);

    // Test with uniform query pattern
    c.bench_function("real_map_uniform_queries", |b| {
    // Choose inputs for get_difficulty_map
    let latest_lsn = layer_map
        .iter_historic_layers()
        .map(|l| l.get_lsn_range().end)
        .max()
        .unwrap();
    let partitioning = uniform_key_partitioning(&layer_map, latest_lsn);

    // Check correctness of get_difficulty_map
    // TODO put this in a dedicated test outside of this mod
    {
        println!("running correctness check");

        let now = Instant::now();
        let result_bruteforce = layer_map.get_difficulty_map_bruteforce(latest_lsn, &partitioning);
        assert!(result_bruteforce.len() == partitioning.parts.len());
        println!("Finished bruteforce in {:?}", now.elapsed());

        let now = Instant::now();
        let result_fast = layer_map.get_difficulty_map(latest_lsn, &partitioning, None);
        assert!(result_fast.len() == partitioning.parts.len());
        println!("Finished fast in {:?}", now.elapsed());

        // Assert results are equal. Manually iterate for easier debugging.
        //
        // TODO The difficulty numbers are in the 50-80 range. That's pretty bad.
        // We should be monitoring it in prod.
        let zip = std::iter::zip(
            &partitioning.parts,
            std::iter::zip(result_bruteforce, result_fast),
        );
        for (_part, (bruteforce, fast)) in zip {
            assert_eq!(bruteforce, fast);
        }

        println!("No issues found");
    }

    // Define and name the benchmark function
    let mut group = c.benchmark_group("real_map");
    group.bench_function("uniform_queries", |b| {
        b.iter(|| {
            for q in queries.clone().into_iter() {
                layer_map.search(q.0, q.1);
            }
        });
    });
    group.bench_function("get_difficulty_map", |b| {
        b.iter(|| {
            layer_map.get_difficulty_map(latest_lsn, &partitioning, Some(3));
        });
    });
    group.finish();
}

// Benchmark using synthetic data. Arrange image layers on stacked diagonal lines.
fn bench_sequential(c: &mut Criterion) {
    let mut layer_map: LayerMap<dyn Layer> = LayerMap::default();

    // Init layer map. Create 100_000 layers arranged in 1000 diagonal lines.
    //
    // TODO This code is pretty slow and runs even if we're only running other
@@ -208,39 +222,38 @@ fn bench_sequential(c: &mut Criterion) {
    // Putting it inside the `bench_function` closure is not a solution
    // because then it runs multiple times during warmup.
    let now = Instant::now();
    let mut layer_map = LayerMap::default();
    for i in 0..100_000 {
        // TODO try inserting a super-wide layer in between every 10 to reflect
        // what often happens with L1 layers that include non-rel changes.
        // Maybe do that as a separate test.
        let i32 = (i as u32) % 100;
        let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
        let layer = DummyImage {
            key_range: zero.add(10 * i32)..zero.add(10 * i32 + 1),
            lsn: Lsn(10 * i),
        let layer = LayerDescriptor {
            key: zero.add(10 * i32)..zero.add(10 * i32 + 1),
            lsn: Lsn(i)..Lsn(i + 1),
            is_incremental: false,
            short_id: format!("Layer {}", i),
        };
        layer_map.insert_historic(Arc::new(layer));
    }

    // Manually measure runtime without criterion because criterion
    // has a minimum sample size of 10 and I don't want to run it 10 times.
    println!("Finished init in {:?}", now.elapsed());
    layer_map.rebuild_index();
    println!("Finished layer map init in {:?}", now.elapsed());

    // Choose 100 uniformly random queries
    let rng = &mut StdRng::seed_from_u64(1);
    let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map)
        .choose_multiple(rng, 1)
        .choose_multiple(rng, 100)
        .copied()
        .collect();

    // Define and name the benchmark function
    c.bench_function("sequential_uniform_queries", |b| {
        // Run the search queries
    let mut group = c.benchmark_group("sequential");
    group.bench_function("uniform_queries", |b| {
        b.iter(|| {
            for q in queries.clone().into_iter() {
                layer_map.search(q.0, q.1);
            }
        });
    });
    group.finish();
}

criterion_group!(group_1, bench_from_captest_env);
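For reference, this is the overall shape of a criterion 0.4 harness like the one above, reduced to a self-contained sketch; the function and group names here are made up for illustration:

```rust
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn fib(n: u64) -> u64 {
    if n < 2 { n } else { fib(n - 1) + fib(n - 2) }
}

fn bench_example(c: &mut Criterion) {
    // A named group, like the "real_map" and "sequential" groups above.
    let mut group = c.benchmark_group("example");
    group.bench_function("fib_20", |b| b.iter(|| fib(black_box(20))));
    group.finish();
}

criterion_group!(benches, bench_example);
criterion_main!(benches);
```

Running `cargo bench` executes every group registered through `criterion_group!`/`criterion_main!`.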
@@ -37,6 +37,17 @@ impl Key {
            | self.field6 as i128
    }

    pub fn from_i128(x: i128) -> Self {
        Key {
            field1: ((x >> 120) & 0xf) as u8,
            field2: ((x >> 104) & 0xFFFF) as u32,
            field3: (x >> 72) as u32,
            field4: (x >> 40) as u32,
            field5: (x >> 32) as u8,
            field6: x as u32,
        }
    }

    pub fn next(&self) -> Key {
        self.add(1)
    }
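The bit layout implied by `from_i128` can be checked with a standalone round-trip sketch. The `to_i128` body here is reconstructed from the shifts above and the `| self.field6 as i128` line visible in the hunk, so treat it as an assumption rather than the crate's exact code:

```rust
// Hypothetical self-contained replica of the Key <-> i128 packing.
#[derive(Debug, PartialEq)]
struct Key {
    field1: u8,  // bits 120..124
    field2: u32, // bits 104..120 (16 bits used)
    field3: u32, // bits 72..104
    field4: u32, // bits 40..72
    field5: u8,  // bits 32..40
    field6: u32, // bits 0..32
}

impl Key {
    fn to_i128(&self) -> i128 {
        ((self.field1 as i128 & 0xf) << 120)
            | ((self.field2 as i128 & 0xFFFF) << 104)
            | ((self.field3 as i128) << 72)
            | ((self.field4 as i128) << 40)
            | ((self.field5 as i128) << 32)
            | self.field6 as i128
    }
    fn from_i128(x: i128) -> Self {
        Key {
            field1: ((x >> 120) & 0xf) as u8,
            field2: ((x >> 104) & 0xFFFF) as u32,
            field3: (x >> 72) as u32,
            field4: (x >> 40) as u32,
            field5: (x >> 32) as u8,
            field6: x as u32,
        }
    }
}

fn main() {
    let k = Key { field1: 1, field2: 0xBEEF, field3: 7, field4: 42, field5: 9, field6: 0xDEADBEEF };
    // from_i128 inverts to_i128 as long as each field fits its slot.
    assert_eq!(Key::from_i128(k.to_i128()), k);
}
```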
@@ -10,23 +10,21 @@
//! corresponding files are written to disk.
//!

mod historic_layer_coverage;
mod layer_coverage;

use crate::keyspace::KeyPartitioning;
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::storage_layer::{range_eq, range_overlaps};
use amplify_num::i256;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Result;
use num_traits::identities::{One, Zero};
use num_traits::{Bounded, Num, Signed};
use rstar::{RTree, RTreeObject, AABB};
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::Range;
use std::ops::{Add, Div, Mul, Neg, Rem, Sub};
use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;

use super::storage_layer::{InMemoryLayer, Layer};
use historic_layer_coverage::BufferedHistoricLayerCoverage;

///
/// LayerMap tracks what layers exist on a timeline.
@@ -51,194 +49,26 @@ pub struct LayerMap<L: ?Sized> {
    ///
    pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,

    /// All the historic layers are kept here
    historic_layers: RTree<LayerRTreeObject<L>>,
    /// Index of the historic layers optimized for search
    historic: BufferedHistoricLayerCoverage<Arc<L>>,

    /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
    /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
    l0_delta_layers: Vec<Arc<L>>,
}

impl<L: ?Sized> Default for LayerMap<L> {
impl<L: ?Sized + PartialEq> Default for LayerMap<L> {
    fn default() -> Self {
        Self {
            open_layer: None,
            next_open_layer_at: None,
            frozen_layers: VecDeque::default(),
            historic_layers: RTree::default(),
            l0_delta_layers: Vec::default(),
            historic: BufferedHistoricLayerCoverage::default(),
        }
    }
}

struct LayerRTreeObject<L: ?Sized> {
    layer: Arc<L>,

    envelope: AABB<[IntKey; 2]>,
}

// Representation of Key as a numeric type.
// We can not use the native implementation of i128, because rstar::RTree
// doesn't properly handle integer overflow during area calculation: sum(Xi*Yi).
// Overflow will cause a panic in debug mode and incorrect area calculation in release mode,
// which leads to a non-optimally balanced R-Tree (but doesn't affect the correctness of the R-Tree).
// By using i256 as the type, even though all the actual values would fit in i128, we can be
// sure that multiplication doesn't overflow.
//

#[derive(Clone, PartialEq, Eq, PartialOrd, Debug)]
struct IntKey(i256);

impl Copy for IntKey {}

impl IntKey {
    fn from(i: i128) -> Self {
        IntKey(i256::from(i))
    }
}

impl Bounded for IntKey {
    fn min_value() -> Self {
        IntKey(i256::MIN)
    }
    fn max_value() -> Self {
        IntKey(i256::MAX)
    }
}

impl Signed for IntKey {
    fn is_positive(&self) -> bool {
        self.0 > i256::ZERO
    }
    fn is_negative(&self) -> bool {
        self.0 < i256::ZERO
    }
    fn signum(&self) -> Self {
        match self.0.cmp(&i256::ZERO) {
            Ordering::Greater => IntKey(i256::ONE),
            Ordering::Less => IntKey(-i256::ONE),
            Ordering::Equal => IntKey(i256::ZERO),
        }
    }
    fn abs(&self) -> Self {
        IntKey(self.0.abs())
    }
    fn abs_sub(&self, other: &Self) -> Self {
        if self.0 <= other.0 {
            IntKey(i256::ZERO)
        } else {
            IntKey(self.0 - other.0)
        }
    }
}

impl Neg for IntKey {
    type Output = Self;
    fn neg(self) -> Self::Output {
        IntKey(-self.0)
    }
}

impl Rem for IntKey {
    type Output = Self;
    fn rem(self, rhs: Self) -> Self::Output {
        IntKey(self.0 % rhs.0)
    }
}

impl Div for IntKey {
    type Output = Self;
    fn div(self, rhs: Self) -> Self::Output {
        IntKey(self.0 / rhs.0)
    }
}

impl Add for IntKey {
    type Output = Self;
    fn add(self, rhs: Self) -> Self::Output {
        IntKey(self.0 + rhs.0)
    }
}

impl Sub for IntKey {
    type Output = Self;
    fn sub(self, rhs: Self) -> Self::Output {
        IntKey(self.0 - rhs.0)
    }
}

impl Mul for IntKey {
    type Output = Self;
    fn mul(self, rhs: Self) -> Self::Output {
        IntKey(self.0 * rhs.0)
    }
}

impl One for IntKey {
    fn one() -> Self {
        IntKey(i256::ONE)
    }
}

impl Zero for IntKey {
    fn zero() -> Self {
        IntKey(i256::ZERO)
    }
    fn is_zero(&self) -> bool {
        self.0 == i256::ZERO
    }
}

impl Num for IntKey {
    type FromStrRadixErr = <i128 as Num>::FromStrRadixErr;
    fn from_str_radix(str: &str, radix: u32) -> Result<Self, Self::FromStrRadixErr> {
        Ok(IntKey(i256::from(i128::from_str_radix(str, radix)?)))
    }
}
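The overflow concern explained in the comment above is easy to demonstrate; a tiny sketch, not part of the patch:

```rust
fn main() {
    // Key::to_i128 values use up to ~124 bits, so one area term Xi*Yi in the
    // R-Tree's sum(Xi*Yi) can need far more than 128 bits:
    let x: i128 = 1 << 100;
    assert_eq!(x.checked_mul(x), None); // i128 overflow: panics in debug, wraps in release
}
```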
impl<T: ?Sized> PartialEq for LayerRTreeObject<T> {
    fn eq(&self, other: &Self) -> bool {
        // FIXME: ptr_eq might fail to return true for 'dyn'
        // references. Clippy complains about this. In practice it
        // seems to work, the assertion below would be triggered
        // otherwise but this ought to be fixed.
        #[allow(clippy::vtable_address_comparisons)]
        Arc::ptr_eq(&self.layer, &other.layer)
    }
}

impl<L> RTreeObject for LayerRTreeObject<L>
where
    L: ?Sized,
{
    type Envelope = AABB<[IntKey; 2]>;
    fn envelope(&self) -> Self::Envelope {
        self.envelope
    }
}

impl<L> LayerRTreeObject<L>
where
    L: ?Sized + Layer,
{
    fn new(layer: Arc<L>) -> Self {
        let key_range = layer.get_key_range();
        let lsn_range = layer.get_lsn_range();

        let envelope = AABB::from_corners(
            [
                IntKey::from(key_range.start.to_i128()),
                IntKey::from(lsn_range.start.0 as i128),
            ],
            [
                IntKey::from(key_range.end.to_i128() - 1),
                IntKey::from(lsn_range.end.0 as i128 - 1),
            ], // AABB::upper is inclusive, while `key_range.end` and `lsn_range.end` are exclusive
        );
        LayerRTreeObject { layer, envelope }
    }
}

/// Return value of LayerMap::search
pub struct SearchResult<L: ?Sized> {
    pub layer: Arc<L>,
@@ -247,141 +77,120 @@ pub struct SearchResult<L: ?Sized> {

impl<L> LayerMap<L>
where
    L: ?Sized + Layer,
    L: ?Sized + Layer + PartialEq,
{
    ///
    /// Find the latest layer that covers the given 'key', with lsn <
    /// 'end_lsn'.
    /// Find the latest layer (by lsn.end) that covers the given
    /// 'key', with lsn.start < 'end_lsn'.
    ///
    /// Returns the layer, if any, and an 'lsn_floor' value that
    /// indicates which portion of the layer the caller should
    /// check. 'lsn_floor' is normally the start-LSN of the layer, but
    /// can be greater if there is an overlapping layer that might
    /// contain the version, even if it's missing from the returned
    /// layer.
    /// The caller of this function is the page reconstruction
    /// algorithm looking for the next relevant delta layer, or
    /// the terminal image layer. The caller will pass the lsn_floor
    /// value as end_lsn in the next call to search.
    ///
    /// If there's an image layer exactly below the given end_lsn,
    /// search should return that layer regardless of whether there are
    /// overlapping deltas.
    ///
    /// If the latest layer is a delta and there is an overlapping
    /// image below it, the lsn_floor returned should be right
    /// above that image so we don't skip it in the search. Otherwise
    /// the lsn_floor returned should be the bottom of the delta layer,
    /// because we should make as much progress down the lsn axis
    /// as possible. It's fine if this way we skip some overlapping
    /// deltas, because the delta we returned would contain the same
    /// wal content.
    ///
    /// TODO: This API is convoluted and inefficient. If the caller
    /// makes N search calls, we'll end up finding the same latest
    /// image layer N times. We should either cache the latest image
    /// layer result, or simplify the api to `get_latest_image` and
    /// `get_latest_delta`, and only call `get_latest_image` once.
    ///
    /// NOTE: This only searches the 'historic' layers, *not* the
    /// 'open' and 'frozen' layers!
    ///
    pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
        // Find the latest image layer that covers the given key
        let mut latest_img: Option<Arc<L>> = None;
        let mut latest_img_lsn: Option<Lsn> = None;
        let envelope = AABB::from_corners(
            [IntKey::from(key.to_i128()), IntKey::from(0i128)],
            [
                IntKey::from(key.to_i128()),
                IntKey::from(end_lsn.0 as i128 - 1),
            ],
        );
        for e in self
            .historic_layers
            .locate_in_envelope_intersecting(&envelope)
        {
            let l = &e.layer;
            if l.is_incremental() {
                continue;
            }
            assert!(l.get_key_range().contains(&key));
            let img_lsn = l.get_lsn_range().start;
            assert!(img_lsn < end_lsn);
            if Lsn(img_lsn.0 + 1) == end_lsn {
                // found exact match
                return Some(SearchResult {
                    layer: Arc::clone(l),
                    lsn_floor: img_lsn,
                });
            }
            if img_lsn > latest_img_lsn.unwrap_or(Lsn(0)) {
                latest_img = Some(Arc::clone(l));
                latest_img_lsn = Some(img_lsn);
            }
        }
        let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
        let latest_delta = version.delta_coverage.query(key.to_i128());
        let latest_image = version.image_coverage.query(key.to_i128());

        // Search the delta layers
        let mut latest_delta: Option<Arc<L>> = None;
        for e in self
            .historic_layers
            .locate_in_envelope_intersecting(&envelope)
        {
            let l = &e.layer;
            if !l.is_incremental() {
                continue;
        match (latest_delta, latest_image) {
            (None, None) => None,
            (None, Some(image)) => {
                let lsn_floor = image.get_lsn_range().start;
                Some(SearchResult {
                    layer: image,
                    lsn_floor,
                })
            }
            assert!(l.get_key_range().contains(&key));
            if l.get_lsn_range().start >= end_lsn {
                info!(
                    "Candidate delta layer {}..{} is too new for lsn {}",
                    l.get_lsn_range().start,
                    l.get_lsn_range().end,
                    end_lsn
                );
            (Some(delta), None) => {
                let lsn_floor = delta.get_lsn_range().start;
                Some(SearchResult {
                    layer: delta,
                    lsn_floor,
                })
            }
            assert!(l.get_lsn_range().start < end_lsn);
            if l.get_lsn_range().end >= end_lsn {
                // this layer contains the requested point in the key/lsn space.
                // No need to search any further
                trace!(
                    "found layer {} for request on {key} at {end_lsn}",
                    l.short_id(),
                );
                latest_delta.replace(Arc::clone(l));
                break;
            }
            if l.get_lsn_range().end > latest_img_lsn.unwrap_or(Lsn(0)) {
                // this layer's end LSN is smaller than the requested point. If there's
                // nothing newer, this is what we need to return. Remember this.
                if let Some(old_candidate) = &latest_delta {
                    if l.get_lsn_range().end > old_candidate.get_lsn_range().end {
                        latest_delta.replace(Arc::clone(l));
                    }
            (Some(delta), Some(image)) => {
                let img_lsn = image.get_lsn_range().start;
                let image_is_newer = image.get_lsn_range().end > delta.get_lsn_range().end;
                let image_exact_match = Lsn(img_lsn.0 + 1) == end_lsn;
                if image_is_newer || image_exact_match {
                    Some(SearchResult {
                        layer: image,
                        lsn_floor: img_lsn,
                    })
                } else {
                    latest_delta.replace(Arc::clone(l));
                    let lsn_floor =
                        std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
                    Some(SearchResult {
                        layer: delta,
                        lsn_floor,
                    })
                }
            }
        }
        if let Some(l) = latest_delta {
            trace!(
                "found (old) layer {} for request on {key} at {end_lsn}",
                l.short_id(),
            );
            let lsn_floor = std::cmp::max(
                Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
                l.get_lsn_range().start,
            );
            Some(SearchResult {
                lsn_floor,
                layer: l,
            })
        } else if let Some(l) = latest_img {
            trace!("found img layer and no deltas for request on {key} at {end_lsn}");
            Some(SearchResult {
                lsn_floor: latest_img_lsn.unwrap(),
                layer: l,
            })
        } else {
            trace!("no layer found for request on {key} at {end_lsn}");
            None
        }
    }

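To make the `lsn_floor` contract concrete, here is a hedged sketch, not from the patch, of the caller-side loop the doc comment describes, in the spirit of `get_difficulty` further down:

```rust
fn walk_reconstruct_path<L>(map: &LayerMap<L>, key: Key, mut end_lsn: Lsn)
where
    L: ?Sized + Layer + PartialEq,
{
    while let Some(found) = map.search(key, end_lsn) {
        // ...collect WAL records from a delta layer, or the page image
        // from an image layer, using `found.layer` here...
        if !found.layer.is_incremental() {
            break; // an image layer terminates the walk
        }
        // Continue strictly below the portion we just examined.
        end_lsn = found.lsn_floor;
    }
}
```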
    ///
    /// Insert an on-disk layer
    ///
    pub fn insert_historic(&mut self, layer: Arc<L>) {
        let kr = layer.get_key_range();
        let lr = layer.get_lsn_range();
        self.historic.insert(
            kr.start.to_i128()..kr.end.to_i128(),
            lr.start.0..lr.end.0,
            Arc::clone(&layer),
            !layer.is_incremental(),
        );

        if layer.get_key_range() == (Key::MIN..Key::MAX) {
            self.l0_delta_layers.push(layer.clone());
            self.l0_delta_layers.push(layer);
        }
        self.historic_layers.insert(LayerRTreeObject::new(layer));

        NUM_ONDISK_LAYERS.inc();
    }

    /// Must be called after a batch of insert_historic calls, before querying
    pub fn rebuild_index(&mut self) {
        self.historic.rebuild();
    }

    ///
    /// Remove an on-disk layer from the map.
    ///
    /// This should be called when the corresponding file on disk has been deleted.
    ///
    pub fn remove_historic(&mut self, layer: Arc<L>) {
        let kr = layer.get_key_range();
        let lr = layer.get_lsn_range();
        self.historic.remove(
            kr.start.to_i128()..kr.end.to_i128(),
            lr.start.0..lr.end.0,
            !layer.is_incremental(),
        );

        if layer.get_key_range() == (Key::MIN..Key::MAX) {
            let len_before = self.l0_delta_layers.len();

@@ -394,96 +203,51 @@ where
                .retain(|other| !Arc::ptr_eq(other, &layer));
            assert_eq!(self.l0_delta_layers.len(), len_before - 1);
        }
        assert!(self
            .historic_layers
            .remove(&LayerRTreeObject::new(layer))
            .is_some());

        NUM_ONDISK_LAYERS.dec();
    }

    /// Is there a newer image layer for given key- and LSN-range?
    /// Is there a newer image layer for given key- and LSN-range? Or a set
    /// of image layers within the specified lsn range that cover the entire
    /// specified key range?
    ///
    /// This is used for garbage collection, to determine if an old layer can
    /// be deleted.
    pub fn image_layer_exists(
        &self,
        key_range: &Range<Key>,
        lsn_range: &Range<Lsn>,
    ) -> Result<bool> {
        let mut range_remain = key_range.clone();
    pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> Result<bool> {
        if key.is_empty() {
            return Ok(true);
        }

        loop {
            let mut made_progress = false;
            let envelope = AABB::from_corners(
                [
                    IntKey::from(range_remain.start.to_i128()),
                    IntKey::from(lsn_range.start.0 as i128),
                ],
                [
                    IntKey::from(range_remain.end.to_i128() - 1),
                    IntKey::from(lsn_range.end.0 as i128 - 1),
                ],
            );
            for e in self
                .historic_layers
                .locate_in_envelope_intersecting(&envelope)
            {
                let l = &e.layer;
                if l.is_incremental() {
                    continue;
                }
                let img_lsn = l.get_lsn_range().start;
                if l.get_key_range().contains(&range_remain.start) && lsn_range.contains(&img_lsn) {
                    made_progress = true;
                    let img_key_end = l.get_key_range().end;
        let version = match self.historic.get().unwrap().get_version(lsn.end.0) {
            Some(v) => v,
            None => return Ok(false),
        };

                    if img_key_end >= range_remain.end {
                        return Ok(true);
                    }
                    range_remain.start = img_key_end;
                }
            }
        let start = key.start.to_i128();
        let end = key.end.to_i128();

            if !made_progress {
        let layer_covers = |layer: Option<Arc<L>>| match layer {
            Some(layer) => layer.get_lsn_range().start >= lsn.start,
            None => false,
        };

        // Check the start is covered
        if !layer_covers(version.image_coverage.query(start)) {
            return Ok(false);
        }

        // Check after all changes of coverage
        for (_, change_val) in version.image_coverage.range(start..end) {
            if !layer_covers(change_val) {
                return Ok(false);
            }
        }

        Ok(true)
    }

    pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
        self.historic_layers.iter().map(|e| e.layer.clone())
    }

    /// Find the last image layer that covers 'key', ignoring any image layers
    /// newer than 'lsn'.
    fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<L>> {
        let mut candidate_lsn = Lsn(0);
        let mut candidate = None;
        let envelope = AABB::from_corners(
            [IntKey::from(key.to_i128()), IntKey::from(0)],
            [IntKey::from(key.to_i128()), IntKey::from(lsn.0 as i128)],
        );
        for e in self
            .historic_layers
            .locate_in_envelope_intersecting(&envelope)
        {
            let l = &e.layer;
            if l.is_incremental() {
                continue;
            }

            assert!(l.get_key_range().contains(&key));
            let this_lsn = l.get_lsn_range().start;
            assert!(this_lsn <= lsn);
            if this_lsn < candidate_lsn {
                // our previous candidate was better
                continue;
            }
            candidate_lsn = this_lsn;
            candidate = Some(Arc::clone(l));
        }

        candidate
        self.historic.iter()
    }

    ///
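Both the rewritten `image_layer_exists` above and `image_coverage`/`count_deltas` below lean on the same primitive: a coverage can be queried at a point (`query`) and iterated as change events (`range`). The `layer_coverage` module itself is not included in this compare view; the following is a minimal stand-in model of that interface using a BTreeMap of change points. It is my assumption of the interface, not the patch's code:

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Toy stand-in: each entry marks the coverage value starting at that key.
struct Coverage<V: Clone> {
    changes: BTreeMap<i128, Option<V>>,
}

impl<V: Clone> Coverage<V> {
    /// Value covering `key` = value of the latest change point <= key.
    fn query(&self, key: i128) -> Option<V> {
        self.changes
            .range(..=key)
            .next_back()
            .and_then(|(_, v)| v.clone())
    }
    /// Iterate the change events inside a key range.
    fn range(&self, r: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<V>)> {
        self.changes.range(r).map(|(k, v)| (*k, v.clone()))
    }
}

fn main() {
    // One image layer covering keys 10..20:
    let mut changes = BTreeMap::new();
    changes.insert(10, Some("img".to_string()));
    changes.insert(20, None);
    let cov = Coverage { changes };
    assert_eq!(cov.query(15), Some("img".to_string()));
    assert_eq!(cov.query(25), None);
    assert_eq!(cov.range(0..30).count(), 2); // two change events
}
```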
@@ -499,87 +263,239 @@ where
        key_range: &Range<Key>,
        lsn: Lsn,
    ) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
        let mut points = vec![key_range.start];
        let envelope = AABB::from_corners(
            [IntKey::from(key_range.start.to_i128()), IntKey::from(0)],
            [
                IntKey::from(key_range.end.to_i128()),
                IntKey::from(lsn.0 as i128),
            ],
        );
        for e in self
            .historic_layers
            .locate_in_envelope_intersecting(&envelope)
        {
            let l = &e.layer;
            assert!(l.get_lsn_range().start <= lsn);
            let range = l.get_key_range();
            if key_range.contains(&range.start) {
                points.push(l.get_key_range().start);
            }
            if key_range.contains(&range.end) {
                points.push(l.get_key_range().end);
            }
        // TODO I'm 80% sure the lsn bound is inclusive. Double-check that
        // and document it. Do the same for image_layer_exists and count_deltas
        let version = match self.historic.get().unwrap().get_version(lsn.0) {
            Some(v) => v,
            None => return Ok(vec![]),
        };

        let start = key_range.start.to_i128();
        let end = key_range.end.to_i128();

        // Initialize loop variables
        let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
        let mut current_key = start;
        let mut current_val = version.image_coverage.query(start);

        // Loop through the change events and push intervals
        for (change_key, change_val) in version.image_coverage.range(start..end) {
            let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
            coverage.push((kr, current_val.take()));
            current_key = change_key;
            current_val = change_val.clone();
        }
        points.push(key_range.end);

        points.sort();
        points.dedup();
        // Add the final interval
        let kr = Key::from_i128(current_key)..Key::from_i128(end);
        coverage.push((kr, current_val.take()));

        // Ok, we now have a list of "interesting" points in the key space

        // For each range between the points, find the latest image
        let mut start = *points.first().unwrap();
        let mut ranges = Vec::new();
        for end in points[1..].iter() {
            let img = self.find_latest_image(start, lsn);

            ranges.push((start..*end, img));

            start = *end;
        }
        Ok(ranges)
        Ok(coverage)
    }

    /// Count how many L1 delta layers there are that overlap with the
    /// given key and LSN range.
    pub fn count_deltas(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>) -> Result<usize> {
        let mut result = 0;
        if lsn_range.start >= lsn_range.end {
    /// Count the height of the tallest stack of deltas in this 2d region.
    ///
    /// If `limit` is provided we don't try to count above that number.
    ///
    /// This number is used to compute the largest number of deltas that
    /// we'll need to visit for any page reconstruction in this region.
    /// We use this heuristic to decide whether to create an image layer.
    pub fn count_deltas(
        &self,
        key: &Range<Key>,
        lsn: &Range<Lsn>,
        limit: Option<usize>,
    ) -> Result<usize> {
        // We get the delta coverage of the region, and for each part of the coverage
        // we recurse right underneath the delta. The recursion depth is limited by
        // the largest result this function could return, which is in practice between
        // 3 and 10 (since we usually try to create an image when the number gets larger).

        if lsn.is_empty() || key.is_empty() || limit == Some(0) {
            return Ok(0);
        }
        let envelope = AABB::from_corners(
            [
                IntKey::from(key_range.start.to_i128()),
                IntKey::from(lsn_range.start.0 as i128),
            ],
            [
                IntKey::from(key_range.end.to_i128() - 1),
                IntKey::from(lsn_range.end.0 as i128 - 1),
            ],
        );
        for e in self
            .historic_layers
            .locate_in_envelope_intersecting(&envelope)
        {
            let l = &e.layer;
            if !l.is_incremental() {
                continue;
            }
            assert!(range_overlaps(&l.get_lsn_range(), lsn_range));
            assert!(range_overlaps(&l.get_key_range(), key_range));

            // We ignore level0 delta layers. Unless the whole keyspace fits
            // into one partition
            if !range_eq(key_range, &(Key::MIN..Key::MAX))
                && range_eq(&l.get_key_range(), &(Key::MIN..Key::MAX))
            {
                continue;
        let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
            Some(v) => v,
            None => return Ok(0),
        };

        let start = key.start.to_i128();
        let end = key.end.to_i128();

        // Initialize loop variables
        let mut max_stacked_deltas = 0;
        let mut current_key = start;
        let mut current_val = version.delta_coverage.query(start);

        // Loop through the delta coverage and recurse on each part
        for (change_key, change_val) in version.delta_coverage.range(start..end) {
            // If there's a relevant delta in this part, add 1 and recurse down
            if let Some(val) = current_val {
                if val.get_lsn_range().end > lsn.start {
                    let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
                    let lr = lsn.start..val.get_lsn_range().start;
                    if !kr.is_empty() {
                        let new_limit = limit.map(|l| l - 1);
                        let max_stacked_deltas_underneath =
                            self.count_deltas(&kr, &lr, new_limit)?;
                        max_stacked_deltas =
                            std::cmp::max(max_stacked_deltas, 1 + max_stacked_deltas_underneath);
                    }
                }
            }

            result += 1;
            current_key = change_key;
            current_val = change_val.clone();
        }
        Ok(result)

        // Consider the last part
        if let Some(val) = current_val {
            if val.get_lsn_range().end > lsn.start {
                let kr = Key::from_i128(current_key)..Key::from_i128(end);
                let lr = lsn.start..val.get_lsn_range().start;

                if !kr.is_empty() {
                    let new_limit = limit.map(|l| l - 1);
                    let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
                    max_stacked_deltas =
                        std::cmp::max(max_stacked_deltas, 1 + max_stacked_deltas_underneath);
                }
            }
        }

        Ok(max_stacked_deltas)
    }

    /// Count how many layers we need to visit for a given key-lsn pair.
    ///
    /// Used as a helper for correctness checks only. Performance is not critical.
    pub fn get_difficulty(&self, lsn: Lsn, key: Key) -> usize {
        match self.search(key, lsn) {
            Some(search_result) => {
                if search_result.layer.is_incremental() {
                    1 + self.get_difficulty(search_result.lsn_floor, key)
                } else {
                    0
                }
            }
            None => 0,
        }
    }

    /// Used for correctness checking. Results are expected to be identical to
    /// self.get_difficulty_map. Assumes self.search is correct.
    pub fn get_difficulty_map_bruteforce(
        &self,
        lsn: Lsn,
        partitioning: &KeyPartitioning,
    ) -> Vec<usize> {
        // Looking at the difficulty as a function of key, it can only increase
        // when a delta layer starts or an image layer ends. Therefore it's sufficient
        // to check the difficulties at:
        // - the key.start for each non-empty part range
        // - the key.start for each delta
        // - the key.end for each image
        let keys_iter: Box<dyn Iterator<Item = Key>> = {
            let mut keys: Vec<Key> = self
                .iter_historic_layers()
                .map(|layer| {
                    if layer.is_incremental() {
                        layer.get_key_range().start
                    } else {
                        layer.get_key_range().end
                    }
                })
                .collect();
            keys.sort();
            Box::new(keys.into_iter())
        };
        let mut keys_iter = keys_iter.peekable();

        // Iterate the partition and keys together and query all the necessary
        // keys, computing the max difficulty for each part.
        partitioning
            .parts
            .iter()
            .map(|part| {
                let mut difficulty = 0;
                // Partition ranges are assumed to be sorted and disjoint
                // TODO assert it
                for range in &part.ranges {
                    if !range.is_empty() {
                        difficulty =
                            std::cmp::max(difficulty, self.get_difficulty(lsn, range.start));
                    }
                    while let Some(key) = keys_iter.peek() {
                        if key >= &range.end {
                            break;
                        }
                        let key = keys_iter.next().unwrap();
                        if key < range.start {
                            continue;
                        }
                        difficulty = std::cmp::max(difficulty, self.get_difficulty(lsn, key));
                    }
                }
                difficulty
            })
            .collect()
    }

    /// For each part of a keyspace partitioning, return the maximum number of layers
    /// that would be needed for page reconstruction in that part at the given LSN.
    ///
    /// If `limit` is provided we don't try to count above that number.
    ///
    /// This method is used to decide where to create new image layers. Computing the
    /// result for the entire partitioning at once allows this function to be more
    /// efficient, and further optimization is possible by using iterators instead,
    /// to allow early return.
    ///
    /// TODO actually use this method instead of count_deltas. Currently we only use
    /// it for benchmarks.
    pub fn get_difficulty_map(
        &self,
        lsn: Lsn,
        partitioning: &KeyPartitioning,
        limit: Option<usize>,
    ) -> Vec<usize> {
        // TODO This is a naive implementation. Perf improvements to do:
        // 1. Instead of calling self.image_coverage and self.count_deltas,
        //    iterate the image and delta coverage only once.
        partitioning
            .parts
            .iter()
            .map(|part| {
                let mut difficulty = 0;
                for range in &part.ranges {
                    if limit == Some(difficulty) {
                        break;
                    }
                    for (img_range, last_img) in self
                        .image_coverage(range, lsn)
                        .expect("why would this err?")
                    {
                        if limit == Some(difficulty) {
                            break;
                        }
                        let img_lsn = if let Some(last_img) = last_img {
                            last_img.get_lsn_range().end
                        } else {
                            Lsn(0)
                        };

                        if img_lsn < lsn {
                            let num_deltas = self
                                .count_deltas(&img_range, &(img_lsn..lsn), limit)
                                .expect("why would this err lol?");
                            difficulty = std::cmp::max(difficulty, num_deltas);
                        }
                    }
                }
                difficulty
            })
            .collect()
    }

    /// Return all L0 delta layers
@@ -603,8 +519,8 @@ where
        }

        println!("historic_layers:");
        for e in self.historic_layers.iter() {
            e.layer.dump(verbose)?;
        for layer in self.iter_historic_layers() {
            layer.dump(verbose)?;
        }
        println!("End dump LayerMap");
        Ok(())
pageserver/src/tenant/layer_map/historic_layer_coverage.rs (new file, 418 lines)
@@ -0,0 +1,418 @@
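Before the file body, the core trick in isolation: a minimal sketch, assuming the `im` crate (one of the persistent-map candidates added in Cargo.toml), of keeping an O(1)-clone snapshot of the coverage at every insertion LSN and answering "coverage as of LSN x" with a BTreeMap range query. This is the same shape as `HistoricLayerCoverage` below, with a plain map standing in for `LayerCoverageTuple`:

```rust
use im::OrdMap;
use std::collections::BTreeMap;

fn main() {
    // `head` plays the role of the latest coverage; `historic` maps each
    // insertion LSN to a snapshot of the whole coverage at that point.
    let mut head: OrdMap<i128, String> = OrdMap::new();
    let mut historic: BTreeMap<u64, OrdMap<i128, String>> = BTreeMap::new();

    for lsn in [100u64, 110, 120] {
        head.insert(lsn as i128, format!("layer@{lsn}"));
        historic.insert(lsn, head.clone()); // O(1) snapshot, structure is shared
    }

    // "Coverage as of LSN 115" = the latest snapshot at or below 115.
    let version = historic.range(..=115u64).next_back().map(|(_, v)| v).unwrap();
    assert!(version.contains_key(&110) && !version.contains_key(&120));
}
```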
|
||||
use std::collections::BTreeMap;
|
||||
use std::ops::Range;
|
||||
|
||||
use super::layer_coverage::LayerCoverageTuple;
|
||||
|
||||
/// Efficiently queryable layer coverage for each LSN.
|
||||
///
|
||||
/// Allows answering layer map queries very efficiently,
|
||||
/// but doesn't allow retroactive insertion, which is
|
||||
/// sometimes necessary. See BufferedHistoricLayerCoverage.
|
||||
pub struct HistoricLayerCoverage<Value> {
|
||||
/// The latest state
|
||||
head: LayerCoverageTuple<Value>,
|
||||
|
||||
/// All previous states
|
||||
historic: BTreeMap<u64, LayerCoverageTuple<Value>>,
|
||||
}
|
||||
|
||||
impl<T: Clone + PartialEq> Default for HistoricLayerCoverage<T> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Value: Clone + PartialEq> HistoricLayerCoverage<Value> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
head: LayerCoverageTuple::default(),
|
||||
historic: BTreeMap::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a layer
|
||||
///
|
||||
/// Panics if new layer has older lsn.start than an existing layer.
|
||||
/// See BufferedHistoricLayerCoverage for a more general insertion method.
|
||||
pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value, is_image: bool) {
|
||||
// It's only a persistent map, not a retroactive one
|
||||
if let Some(last_entry) = self.historic.iter().rev().next() {
|
||||
let last_lsn = last_entry.0;
|
||||
if lsn.start == *last_lsn {
|
||||
// TODO there are edge cases to take care of
|
||||
}
|
||||
if lsn.start < *last_lsn {
|
||||
panic!("unexpected retroactive insert");
|
||||
}
|
||||
}
|
||||
|
||||
// Insert into data structure
|
||||
if is_image {
|
||||
self.head.image_coverage.insert(key, lsn.clone(), value);
|
||||
} else {
|
||||
self.head.delta_coverage.insert(key, lsn.clone(), value);
|
||||
}
|
||||
|
||||
// Remember history. Clone is O(1)
|
||||
self.historic.insert(lsn.start, self.head.clone());
|
||||
}
|
||||
|
||||
/// Query at a particular LSN, inclusive
|
||||
pub fn get_version(&self, lsn: u64) -> Option<&LayerCoverageTuple<Value>> {
|
||||
match self.historic.range(..=lsn).rev().next() {
|
||||
Some((_, v)) => Some(v),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove all entries after a certain LSN
|
||||
pub fn trim(&mut self, begin: &u64) {
|
||||
self.historic.split_off(begin);
|
||||
self.head = self
|
||||
.historic
|
||||
.iter()
|
||||
.rev()
|
||||
.next()
|
||||
.map(|(_, v)| v.clone())
|
||||
.unwrap_or_default();
|
||||
}
|
||||
}
|
||||
|
||||
/// This is the most basic test that demonstrates intended usage.
|
||||
/// All layers in this test have height 1.
|
||||
#[test]
|
||||
fn test_persistent_simple() {
|
||||
let mut map = HistoricLayerCoverage::<String>::new();
|
||||
map.insert(0..5, 100..101, "Layer 1".to_string(), true);
|
||||
map.insert(3..9, 110..111, "Layer 2".to_string(), true);
|
||||
map.insert(5..6, 120..121, "Layer 3".to_string(), true);
|
||||
|
||||
// After Layer 1 insertion
|
||||
let version = map.get_version(105).unwrap();
|
||||
assert_eq!(version.image_coverage.query(1), Some("Layer 1".to_string()));
|
||||
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
|
||||
|
||||
// After Layer 2 insertion
|
||||
let version = map.get_version(115).unwrap();
|
||||
assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string()));
|
||||
assert_eq!(version.image_coverage.query(8), Some("Layer 2".to_string()));
|
||||
assert_eq!(version.image_coverage.query(11), None);
|
||||
|
||||
// After Layer 3 insertion
|
||||
let version = map.get_version(125).unwrap();
|
||||
assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string()));
|
||||
assert_eq!(version.image_coverage.query(5), Some("Layer 3".to_string()));
|
||||
assert_eq!(version.image_coverage.query(7), Some("Layer 2".to_string()));
|
||||
}
|
||||
|
||||
/// Cover simple off-by-one edge cases
|
||||
#[test]
|
||||
fn test_off_by_one() {
|
||||
let mut map = HistoricLayerCoverage::<String>::new();
|
||||
map.insert(3..5, 100..110, "Layer 1".to_string(), true);
|
||||
|
||||
// Check different LSNs
|
||||
let version = map.get_version(99);
|
||||
assert!(version.is_none());
|
||||
let version = map.get_version(100).unwrap();
|
||||
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
|
||||
let version = map.get_version(110).unwrap();
|
||||
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
|
||||
|
||||
// Check different keys
|
||||
let version = map.get_version(105).unwrap();
|
||||
assert_eq!(version.image_coverage.query(2), None);
|
||||
assert_eq!(version.image_coverage.query(3), Some("Layer 1".to_string()));
|
||||
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
|
||||
assert_eq!(version.image_coverage.query(5), None);
|
||||
}
|
||||
|
||||
/// Cover edge cases where layers begin or end on the same key
|
||||
#[test]
|
||||
fn test_key_collision() {
|
||||
let mut map = HistoricLayerCoverage::<String>::new();
|
||||
|
||||
map.insert(3..5, 100..110, "Layer 10".to_string(), true);
|
||||
map.insert(5..8, 100..110, "Layer 11".to_string(), true);
|
||||
|
||||
map.insert(3..4, 200..210, "Layer 20".to_string(), true);
|
||||
|
||||
// Check after layer 11
|
||||
let version = map.get_version(105).unwrap();
|
||||
assert_eq!(version.image_coverage.query(2), None);
|
||||
assert_eq!(
|
||||
version.image_coverage.query(3),
|
||||
Some("Layer 10".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
version.image_coverage.query(5),
|
||||
Some("Layer 11".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
version.image_coverage.query(7),
|
||||
Some("Layer 11".to_string())
|
||||
);
|
||||
assert_eq!(version.image_coverage.query(8), None);
|
||||
|
||||
// Check after layer 20
|
||||
let version = map.get_version(205).unwrap();
|
||||
assert_eq!(version.image_coverage.query(2), None);
|
||||
assert_eq!(
|
||||
version.image_coverage.query(3),
|
||||
Some("Layer 20".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
version.image_coverage.query(5),
|
||||
Some("Layer 11".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
version.image_coverage.query(7),
|
||||
Some("Layer 11".to_string())
|
||||
);
|
||||
assert_eq!(version.image_coverage.query(8), None);
|
||||
}
|
||||
|
||||
/// Test when rectangles have nontrivial height and possibly overlap
|
||||
#[test]
|
||||
fn test_persistent_overlapping() {
|
||||
let mut map = HistoricLayerCoverage::<String>::new();
|
||||
|
||||
// Add 3 key-disjoint layers with varying LSN ranges
|
||||
map.insert(1..2, 100..200, "Layer 1".to_string(), true);
|
||||
map.insert(4..5, 110..200, "Layer 2".to_string(), true);
|
||||
map.insert(7..8, 120..300, "Layer 3".to_string(), true);
|
||||
|
||||
// Add wide and short layer
|
||||
map.insert(0..9, 130..199, "Layer 4".to_string(), true);
|
||||
|
||||
// Add wide layer taller than some
|
||||
map.insert(0..9, 140..201, "Layer 5".to_string(), true);
|
||||
|
||||
// Add wide layer taller than all
|
||||
map.insert(0..9, 150..301, "Layer 6".to_string(), true);
|
||||
|
||||
// After layer 4 insertion
|
||||
let version = map.get_version(135).unwrap();
|
||||
assert_eq!(version.image_coverage.query(0), Some("Layer 4".to_string()));
|
||||
assert_eq!(version.image_coverage.query(1), Some("Layer 1".to_string()));
|
||||
assert_eq!(version.image_coverage.query(2), Some("Layer 4".to_string()));
|
||||
assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string()));
|
||||
assert_eq!(version.image_coverage.query(5), Some("Layer 4".to_string()));
|
||||
assert_eq!(version.image_coverage.query(7), Some("Layer 3".to_string()));
|
||||
assert_eq!(version.image_coverage.query(8), Some("Layer 4".to_string()));
|
||||
|
||||
// After layer 5 insertion
|
||||
let version = map.get_version(145).unwrap();
|
||||
assert_eq!(version.image_coverage.query(0), Some("Layer 5".to_string()));
|
||||
assert_eq!(version.image_coverage.query(1), Some("Layer 5".to_string()));
|
||||
assert_eq!(version.image_coverage.query(2), Some("Layer 5".to_string()));
|
||||
assert_eq!(version.image_coverage.query(4), Some("Layer 5".to_string()));
|
||||
assert_eq!(version.image_coverage.query(5), Some("Layer 5".to_string()));
|
||||
assert_eq!(version.image_coverage.query(7), Some("Layer 3".to_string()));
|
||||
assert_eq!(version.image_coverage.query(8), Some("Layer 5".to_string()));
|
||||
|
||||
// After layer 6 insertion
|
||||
let version = map.get_version(155).unwrap();
|
||||
assert_eq!(version.image_coverage.query(0), Some("Layer 6".to_string()));
|
||||
assert_eq!(version.image_coverage.query(1), Some("Layer 6".to_string()));
|
||||
assert_eq!(version.image_coverage.query(2), Some("Layer 6".to_string()));
|
||||
assert_eq!(version.image_coverage.query(4), Some("Layer 6".to_string()));
|
||||
assert_eq!(version.image_coverage.query(5), Some("Layer 6".to_string()));
|
||||
assert_eq!(version.image_coverage.query(7), Some("Layer 6".to_string()));
|
||||
assert_eq!(version.image_coverage.query(8), Some("Layer 6".to_string()));
|
||||
}
|
||||
|
||||
/// Wrapper for HistoricLayerCoverage that allows us to hack around the lack
|
||||
/// of support for retroactive insertion by rebuilding the map since the
|
||||
/// change.
|
||||
///
|
||||
/// Why is this needed? We most often insert new layers with newer LSNs,
|
||||
/// but during compaction we create layers with non-latest LSN, and during
|
||||
/// GC we delete historic layers.
|
||||
///
|
||||
/// Even though rebuilding is an expensive (N log N) solution to the problem,
|
||||
/// it's not critical since we do something equally expensive just to decide
|
||||
/// whether or not to create new image layers.
|
||||
///
|
||||
/// If this becomes an actual bottleneck, one solution would be to build a
|
||||
/// segment tree that holds PersistentLayerMaps. Though this would mean that
|
||||
/// we take an additional log(N) performance hit for queries, which will probably
|
||||
/// still be more critical.
|
||||
///
|
||||
/// See this for more on persistent and retroactive techniques:
|
||||
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
|
||||
pub struct BufferedHistoricLayerCoverage<Value> {
|
||||
/// A persistent layer map that we rebuild when we need to retroactively update
|
||||
historic_coverage: HistoricLayerCoverage<Value>,
|
||||
|
||||
/// We buffer insertion into the PersistentLayerMap to decrease the number of rebuilds.
|
||||
///
|
||||
/// We implicitly assume that layers are identified by their key and lsn range.
|
||||
/// A value of None means we want to delete this item.
|
||||
buffer: BTreeMap<(u64, u64, i128, i128, bool), Option<Value>>,
|
||||
|
||||
/// All current layers. This is not used for search. Only to make rebuilds easier.
|
||||
///
|
||||
/// We implicitly assume that layers are identified by their key and lsn range.
|
||||
layers: BTreeMap<(u64, u64, i128, i128, bool), Value>,
|
||||
}
|
||||
|
||||
impl<T: std::fmt::Debug> std::fmt::Debug for BufferedHistoricLayerCoverage<T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RetroactiveLayerMap")
|
||||
.field("buffer", &self.buffer)
|
||||
.field("layers", &self.layers)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone + PartialEq> Default for BufferedHistoricLayerCoverage<T> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Value: Clone + PartialEq> BufferedHistoricLayerCoverage<Value> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
historic_coverage: HistoricLayerCoverage::<Value>::new(),
|
||||
buffer: BTreeMap::new(),
|
||||
layers: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value, is_image: bool) {
|
||||
self.buffer.insert(
|
||||
(lsn.start, lsn.end, key.start, key.end, is_image),
|
||||
Some(value),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, key: Range<i128>, lsn: Range<u64>, is_image: bool) {
|
||||
self.buffer
|
||||
.insert((lsn.start, lsn.end, key.start, key.end, is_image), None);
|
||||
}
|
||||
|
||||
pub fn rebuild(&mut self) {
|
||||
// Find the first LSN that needs to be rebuilt
|
||||
let rebuild_since: u64 = match self.buffer.iter().next() {
|
||||
Some(((lsn_start, _, _, _, _), _)) => *lsn_start,
|
||||
None => return, // No need to rebuild if buffer is empty
|
||||
};

        // Apply buffered updates to self.layers
        self.buffer.retain(|rect, layer| {
            match layer {
                Some(l) => {
                    let existing = self.layers.insert(*rect, l.clone());
                    if existing.is_some() {
                        // TODO this happened once. Investigate.
                        // panic!("can't overwrite layer");
                    }
                }
                None => {
                    let existing = self.layers.remove(rect);
                    if existing.is_none() {
                        panic!("invalid layer deletion");
                    }
                }
            };
            false
        });

        // Rebuild
        self.historic_coverage.trim(&rebuild_since);
        for ((lsn_start, lsn_end, key_start, key_end, is_image), layer) in
            self.layers.range((rebuild_since, 0, 0, 0, false)..)
        {
            self.historic_coverage.insert(
                *key_start..*key_end,
                *lsn_start..*lsn_end,
                layer.clone(),
                *is_image,
            );
        }
    }

    /// Iterate all the layers
    pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
        // NOTE we could actually do this without rebuilding,
        // but it's not necessary for now.
        if !self.buffer.is_empty() {
            panic!("rebuild required before iterating");
        }

        self.layers.values().cloned()
    }

    /// Return a reference to a queryable map, assuming all updates
    /// have already been processed using self.rebuild()
    pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage<Value>> {
        // NOTE we error here instead of implicitly rebuilding because
        // rebuilding is somewhat expensive.
        // TODO maybe implicitly rebuild and log/sentry an error?
        if !self.buffer.is_empty() {
            anyhow::bail!("rebuild required")
        }

        Ok(&self.historic_coverage)
    }
}

#[test]
fn test_retroactive_regression_1() {
    let mut map = BufferedHistoricLayerCoverage::new();

    map.insert(
        0..21267647932558653966460912964485513215,
        23761336..23761457,
        "sdfsdfs".to_string(),
        false,
    );

    map.rebuild();

    let version = map.get().unwrap().get_version(23761457).unwrap();
    assert_eq!(
        version.delta_coverage.query(100),
        Some("sdfsdfs".to_string())
    );
}

#[test]
fn test_retroactive_simple() {
    let mut map = BufferedHistoricLayerCoverage::new();

    // Append some images in increasing LSN order
    map.insert(0..5, 100..101, "Image 1".to_string(), true);
    map.insert(3..9, 110..111, "Image 2".to_string(), true);
    map.insert(4..6, 120..121, "Image 3".to_string(), true);
    map.insert(8..9, 120..121, "Image 4".to_string(), true);

    // Add a layer out of order (registered as an image here, so it shows
    // up in the image coverage queried below)
    map.insert(2..5, 105..106, "Delta 1".to_string(), true);

    // Rebuild so we can start querying
    map.rebuild();

    // Query key 4
    let version = map.get().unwrap().get_version(90);
    assert!(version.is_none());
    let version = map.get().unwrap().get_version(102).unwrap();
    assert_eq!(version.image_coverage.query(4), Some("Image 1".to_string()));
    let version = map.get().unwrap().get_version(107).unwrap();
    assert_eq!(version.image_coverage.query(4), Some("Delta 1".to_string()));
    let version = map.get().unwrap().get_version(115).unwrap();
    assert_eq!(version.image_coverage.query(4), Some("Image 2".to_string()));
    let version = map.get().unwrap().get_version(125).unwrap();
    assert_eq!(version.image_coverage.query(4), Some("Image 3".to_string()));

    // Remove Image 3
    map.remove(4..6, 120..121, true);
    map.rebuild();

    // Check deletion worked
    let version = map.get().unwrap().get_version(125).unwrap();
    assert_eq!(version.image_coverage.query(4), Some("Image 2".to_string()));
    assert_eq!(version.image_coverage.query(8), Some("Image 4".to_string()));
}

200 pageserver/src/tenant/layer_map/layer_coverage.rs Normal file
@@ -0,0 +1,200 @@
use std::ops::Range;

// TODO the `im` crate has 20x more downloads and also provides a
// persistent/immutable BTree. It also runs a bit faster, but its results
// differ from `rpds` on some tests.
use rpds::RedBlackTreeMapSync;
use im::OrdMap;

/// Data structure that can efficiently:
/// - find the latest layer by lsn.end at a given key
/// - iterate the latest layers in a key range
/// - insert layers in non-decreasing lsn.start order
///
/// The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
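///
/// A small sketch of the intended semantics (hypothetical values):
///
/// ```ignore
/// let mut coverage: LayerCoverage<String> = LayerCoverage::new();
/// coverage.insert(0..10, 0..5, "A".to_string());
/// coverage.insert(5..15, 0..8, "B".to_string()); // taller (higher lsn.end) wins
/// assert_eq!(coverage.query(2), Some("A".to_string()));
/// assert_eq!(coverage.query(7), Some("B".to_string()));
/// ```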
pub struct LayerCoverage<Value> {
    /// For every change in coverage (as we sweep the key space)
    /// we store (lsn.end, value).
    ///
    /// We use an immutable/persistent tree so that we can keep historic
    /// versions of this coverage without cloning the whole thing and
    /// incurring quadratic memory cost. See HistoricLayerCoverage.
    ///
    /// We use the Sync version of the map because we want Self to
    /// be Sync. Using the non-sync version might be faster, if we can
    /// work with that.
    nodes: RedBlackTreeMapSync<i128, Option<(u64, Value)>>,

    /// The same coverage mirrored in an `im::OrdMap`, so the two crates
    /// can be cross-checked against each other.
    im: OrdMap<i128, Option<(u64, Value)>>,
}

impl<T: Clone + PartialEq> Default for LayerCoverage<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<Value: Clone + PartialEq> LayerCoverage<Value> {
    pub fn new() -> Self {
        Self {
            nodes: RedBlackTreeMapSync::default(),
            im: OrdMap::default(),
        }
    }

    /// Helper function to subdivide the key range without changing any values
    ///
    /// Complexity: O(log N)
    fn add_node(&mut self, key: i128) {
        // Carry over the coverage in effect just before `key`, in both maps,
        // so that splitting a range preserves all query results.
        let value = match self.nodes.range(..=key).last() {
            Some((_, Some(v))) => Some(v.clone()),
            Some((_, None)) => None,
            None => None,
        };
        self.nodes.insert_mut(key, value);

        let im_value = match self.im.range(..=key).last() {
            Some((_, Some(v))) => Some(v.clone()),
            Some((_, None)) => None,
            None => None,
        };
        self.im.insert(key, im_value);
    }

    /// Insert a layer.
    ///
    /// Complexity: worst case O(N), in practice O(log N). See note in implementation.
    pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value) {
        // NOTE The order of the following lines is important!!

        // Add nodes at endpoints
        self.add_node(key.start);
        self.add_node(key.end);

        // Raise the height where necessary
        //
        // NOTE This loop is worst case O(N), but amortized O(log N) in the special
        // case when rectangles have no height. In practice I don't think we'll see
        // the kind of layer intersections needed to trigger O(N) behavior. The worst
        // case is N/2 horizontal layers overlapped with N/2 vertical layers in a
        // grid pattern.
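        //
        // A small worked example (hypothetical): with nodes {0: (15, A), 5: None}
        // and an insert of key 2..8 at some lsn ..20, add_node first gives
        // {0: (15, A), 2: (15, A), 5: None, 8: None}. The sweep then updates
        // node 2 to (20, new) and removes node 5, because the new, taller layer
        // covers it, leaving {0: (15, A), 2: (20, new), 8: None}.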
        let mut to_update = Vec::new();
        let mut to_remove = Vec::new();
        let mut prev_covered = false;
        for (k, node) in self.nodes.range(key.clone()) {
            let needs_cover = match node {
                None => true,
                Some((h, _)) => h < &lsn.end,
            };
            if needs_cover {
                match prev_covered {
                    true => to_remove.push(*k),
                    false => to_update.push(*k),
                }
            }
            prev_covered = needs_cover;
        }
        if !prev_covered {
            to_remove.push(key.end);
        }
        for k in to_update {
            self.nodes.insert_mut(k, Some((lsn.end, value.clone())));
        }
        for k in to_remove {
            self.nodes.remove_mut(&k);
        }

        // Mirror the same sweep in the `im` map so both stay in sync.
        let mut to_update = Vec::new();
        let mut to_remove = Vec::new();
        let mut prev_covered = false;
        for (k, node) in self.im.range(key.clone()) {
            let needs_cover = match node {
                None => true,
                Some((h, _)) => h < &lsn.end,
            };
            if needs_cover {
                match prev_covered {
                    true => to_remove.push(*k),
                    false => to_update.push(*k),
                }
            }
            prev_covered = needs_cover;
        }
        if !prev_covered {
            to_remove.push(key.end);
        }
        for k in to_update {
            self.im.insert(k, Some((lsn.end, value.clone())));
        }
        for k in to_remove {
            self.im.remove(&k);
        }
    }

    /// Get the latest (by lsn.end) layer at a given key
    ///
    /// Complexity: O(log N)
    pub fn query(&self, key: i128) -> Option<Value> {
        let res = self
            .nodes
            .range(..=key)
            .rev()
            .next()?
            .1
            .as_ref()
            .map(|(_, v)| v.clone());
        let im_res = self
            .im
            .range(..=key)
            .rev()
            .next()?
            .1
            .as_ref()
            .map(|(_, v)| v.clone());
        // Cross-check the `rpds` and `im` implementations against each other.
        if res != im_res {
            panic!("rpds and im coverage disagree at key {key}");
        }
        im_res
    }

    /// Iterate the changes in layer coverage in a given range. You will likely
    /// want to start with self.query(key.start), and then follow up with self.range
    ///
    /// Complexity: O(log N + result_size)
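    ///
    /// A usage sketch (hypothetical) reconstructing full coverage over 0..100:
    ///
    /// ```ignore
    /// let mut current = coverage.query(0);
    /// for (change_key, value) in coverage.range(0..100) {
    ///     // `current` was the coverage for keys up to `change_key`
    ///     current = value;
    /// }
    /// ```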
    pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
        self.nodes
            .range(key)
            .map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone())))
    }

    /// O(1) clone
    pub fn clone(&self) -> Self {
        Self {
            nodes: self.nodes.clone(),
            im: self.im.clone(),
        }
    }
}

/// Image and delta coverage at a specific LSN.
pub struct LayerCoverageTuple<Value> {
    pub image_coverage: LayerCoverage<Value>,
    pub delta_coverage: LayerCoverage<Value>,
}

impl<T: Clone + PartialEq> Default for LayerCoverageTuple<T> {
    fn default() -> Self {
        Self {
            image_coverage: LayerCoverage::default(),
            delta_coverage: LayerCoverage::default(),
        }
    }
}

impl<Value: Clone + PartialEq> LayerCoverageTuple<Value> {
    pub fn clone(&self) -> Self {
        Self {
            image_coverage: self.image_coverage.clone(),
            delta_coverage: self.delta_coverage.clone(),
        }
    }
}

@@ -187,6 +187,12 @@ pub trait PersistentLayer: Layer {
    fn file_size(&self) -> Option<u64>;
}
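
// Layers are compared by filename here, on the assumption that a layer's
// file name uniquely identifies it.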
impl PartialEq for dyn PersistentLayer {
    fn eq(&self, other: &Self) -> bool {
        self.filename().eq(&other.filename())
    }
}

pub fn downcast_remote_layer(
    layer: &Arc<dyn PersistentLayer>,
) -> Option<std::sync::Arc<RemoteLayer>> {
@@ -196,3 +202,56 @@ pub fn downcast_remote_layer(
        None
    }
}

impl std::fmt::Debug for dyn Layer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Layer")
            .field("short_id", &self.short_id())
            .finish()
    }
}

/// Holds metadata about a layer without any content. Used mostly for testing.
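///
/// A construction sketch for tests (hypothetical key/lsn values, assuming the
/// `Key::from_i128` and `Lsn` helpers used elsewhere in this codebase):
///
/// ```ignore
/// let layer = LayerDescriptor {
///     key: Key::from_i128(0)..Key::from_i128(0x1000),
///     lsn: Lsn(0x10)..Lsn(0x20),
///     is_incremental: true,
///     short_id: "test delta layer".to_string(),
/// };
/// ```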
pub struct LayerDescriptor {
    pub key: Range<Key>,
    pub lsn: Range<Lsn>,
    pub is_incremental: bool,
    pub short_id: String,
}

impl PartialEq for LayerDescriptor {
    fn eq(&self, other: &Self) -> bool {
        self.short_id == other.short_id
    }
}

impl Layer for LayerDescriptor {
    fn get_key_range(&self) -> Range<Key> {
        self.key.clone()
    }

    fn get_lsn_range(&self) -> Range<Lsn> {
        self.lsn.clone()
    }

    fn is_incremental(&self) -> bool {
        self.is_incremental
    }

    fn get_value_reconstruct_data(
        &self,
        _key: Key,
        _lsn_range: Range<Lsn>,
        _reconstruct_data: &mut ValueReconstructState,
    ) -> Result<ValueReconstructResult> {
        todo!("This method shouldn't be part of the Layer trait")
    }

    fn short_id(&self) -> String {
        self.short_id.clone()
    }

    fn dump(&self, _verbose: bool) -> Result<()> {
        todo!()
    }
}

@@ -1148,6 +1148,7 @@ impl Timeline {
            }
        }
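
        // The layer map buffers updates now; rebuild its search index before
        // the map is queried again.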
        layers.rebuild_index();
        layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1);

        info!(
@@ -1210,7 +1211,11 @@ impl Timeline {
                anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
            } else {
                self.metrics.resident_physical_size_gauge.sub(local_size);
                self.layers.write().unwrap().remove_historic(local_layer);
                {
                    let mut layers = self.layers.write().unwrap();
                    layers.remove_historic(local_layer);
                    layers.rebuild_index();
                }
                // fall-through to adding the remote layer
            }
        } else {
@@ -1252,7 +1257,11 @@ impl Timeline {
                    );
                    let remote_layer = Arc::new(remote_layer);

                    self.layers.write().unwrap().insert_historic(remote_layer);
                    {
                        let mut layers = self.layers.write().unwrap();
                        layers.insert_historic(remote_layer);
                        layers.rebuild_index();
                    }
                }
                LayerFileName::Delta(deltafilename) => {
                    // Create a RemoteLayer for the delta file.
@@ -1275,7 +1284,11 @@ impl Timeline {
                        &remote_layer_metadata,
                    );
                    let remote_layer = Arc::new(remote_layer);
                    self.layers.write().unwrap().insert_historic(remote_layer);
                    {
                        let mut layers = self.layers.write().unwrap();
                        layers.insert_historic(remote_layer);
                        layers.rebuild_index();
                    }
                }
                #[cfg(test)]
                LayerFileName::Test(_) => unreachable!(),
@@ -2185,6 +2198,7 @@ impl Timeline {
        {
            let mut layers = self.layers.write().unwrap();
            layers.insert_historic(Arc::new(new_delta));
            layers.rebuild_index();
        }

        // update the timeline's physical size
@@ -2249,13 +2263,15 @@ impl Timeline {
        // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
        // after we read last_record_lsn, which is passed here in the 'lsn' argument.
        if img_lsn < lsn {
            let num_deltas = layers.count_deltas(&img_range, &(img_lsn..lsn))?;
            let threshold = self.get_image_creation_threshold();
            let num_deltas =
                layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;

            debug!(
                "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
                img_range.start, img_range.end, num_deltas, img_lsn, lsn
            );
            if num_deltas >= self.get_image_creation_threshold() {
            if num_deltas >= threshold {
                return Ok(true);
            }
        }
@@ -2360,6 +2376,7 @@ impl Timeline {
                .add(metadata.len());
            layers.insert_historic(Arc::new(l));
        }
        layers.rebuild_index();
        drop(layers);
        timer.stop_and_record();

@@ -2691,6 +2708,7 @@ impl Timeline {
            l.delete()?;
            layers.remove_historic(l);
        }
        layers.rebuild_index();
        drop(layers);

        // Also schedule the deletions in remote storage
@@ -3004,6 +3022,7 @@ impl Timeline {
                remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
            }
        }
        layers.rebuild_index();

        info!(
            "GC completed removing {} layers, cutoff {}",
@@ -3163,6 +3182,7 @@ impl Timeline {
            layers.remove_historic(l);
        }
        layers.insert_historic(new_layer);
        layers.rebuild_index();
        drop(layers);

        // Now that we've inserted the download into the layer map,

@@ -172,8 +172,10 @@ def get_scales_matrix(default: int = 10) -> List[int]:
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
    # run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
    # run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)


# The following 3 tests run on an existing database as it was set up by previous tests,

@@ -25,6 +25,7 @@ futures-task = { version = "0.3", default-features = false, features = ["alloc",
futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10", features = ["use_alloc", "use_std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
memchr = { version = "2", features = ["std"] }
@@ -53,6 +54,7 @@ bytes = { version = "1", features = ["serde", "std"] }
either = { version = "1", features = ["use_std"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10", features = ["use_alloc", "use_std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
memchr = { version = "2", features = ["std"] }