Compare commits

...

57 Commits

Author SHA1 Message Date
Bojan Serafimov
6e478a4fc0 Fix 2023-01-13 00:52:10 -05:00
Bojan Serafimov
e796521bb7 compare 2023-01-13 00:50:20 -05:00
Bojan Serafimov
76055c7bbc clippy 2023-01-12 20:16:33 -05:00
Bojan Serafimov
fe0851f2c4 clippy 2023-01-12 20:07:08 -05:00
Bojan Serafimov
21c85b1969 cargo hakari generate 2023-01-12 19:47:33 -05:00
Bojan Serafimov
5dc99f1b04 Fmt 2023-01-12 19:46:51 -05:00
Bojan Serafimov
2fcbb46338 Remove false assertion 2023-01-12 18:52:15 -05:00
Bojan Serafimov
87c3e55449 Add early return to get_difficulty_map for 10x speedup 2023-01-12 13:23:59 -05:00
Bojan Serafimov
9886741828 Compare against bruteforce, fix bugs 2023-01-11 18:04:44 -05:00
Bojan Serafimov
5557fc6062 Add todo 2023-01-11 14:02:55 -05:00
Bojan Serafimov
dbb5d0800d Fix lsn bound 2023-01-11 13:59:26 -05:00
Bojan Serafimov
e392d25828 Improve image_layer_exists docstring 2023-01-11 12:54:13 -05:00
Bojan Serafimov
2951be5386 Improve search docstring 2023-01-11 12:52:15 -05:00
Bojan Serafimov
7c6af6d729 Comments 2023-01-10 21:43:32 -05:00
Bojan Serafimov
cb5b9375d2 Organize modules, rename structs 2023-01-10 21:03:26 -05:00
Bojan Serafimov
d3d17f2c7c Simplify 2023-01-10 17:30:40 -05:00
Bojan Serafimov
c3f5e00ad1 Comments 2023-01-10 15:32:18 -05:00
Bojan Serafimov
d0095d4457 Comments 2023-01-10 15:02:06 -05:00
Bojan Serafimov
7d057e1038 Simplify 2023-01-10 11:52:01 -05:00
Bojan Serafimov
f22437086e Rename file 2023-01-10 11:09:02 -05:00
Bojan Serafimov
7c6909c31f Merge branch 'main' into immutable_bst_layer_map 2023-01-10 11:02:45 -05:00
Bojan Serafimov
a2642966f2 Add arbitrary key partitioning for benchmark 2023-01-05 20:21:19 -05:00
Bojan Serafimov
3d6bc126ed Partially implement get_difficulty_map bench 2023-01-05 03:23:10 -05:00
Bojan Serafimov
fb6569c880 Implement get_difficulty_map 2023-01-05 02:50:48 -05:00
Bojan Serafimov
115549261c Add get_difficulty_map method 2023-01-05 01:58:18 -05:00
Bojan Serafimov
01e09fc56c Cleanup bench_layer_map 2023-01-05 01:35:36 -05:00
Bojan Serafimov
8618f1d225 Remove DummyImage and DummyDelta 2023-01-03 23:54:14 -05:00
Bojan Serafimov
6f374d24be Merge branch 'main' into immutable_bst_layer_map 2023-01-03 18:02:20 -05:00
Bojan Serafimov
f0a23b474c Fix a count_deltas bug 2022-12-29 14:30:04 -05:00
Bojan Serafimov
6bfa99e0c4 Fix infinite recursion 2022-12-28 21:30:35 -05:00
Bojan Serafimov
bfeaee90ca implement image_layer_exists (not tested) 2022-12-28 19:22:10 -05:00
Bojan Serafimov
d835d8c853 Implement count_deltas (untested, probably buggy) 2022-12-28 17:51:06 -05:00
Bojan Serafimov
3b40d7b61a Big refactor (WIP) 2022-12-28 14:12:41 -05:00
Bojan Serafimov
2737e33565 Implement image_coverage (untested) 2022-12-27 15:10:56 -05:00
Bojan Serafimov
5fb9619ab0 Rebuild only once on load 2022-12-27 13:39:23 -05:00
Bojan Serafimov
de548acefc Fix bug with negative keys 2022-12-13 22:46:13 -05:00
Bojan Serafimov
db72f432e5 Remove 2/3 clones on search path 2022-12-13 13:42:53 -05:00
Bojan Serafimov
5a6284f6f8 Remove Arc<Vec<_>> 2022-12-13 13:16:13 -05:00
Bojan Serafimov
c11c19d568 Check for exact match 2022-12-12 20:25:24 -05:00
Bojan Serafimov
d9a239475c Fix off-by-one 2022-12-12 16:31:22 -05:00
Bojan Serafimov
a4311bd961 Implement deletion 2022-12-12 16:26:36 -05:00
Bojan Serafimov
e0f23242be Add todo 2022-12-12 14:00:04 -05:00
Bojan Serafimov
0de1ec14e0 Add error context 2022-12-12 13:55:56 -05:00
Bojan Serafimov
0053ccac13 Add live correctness comparison 2022-12-12 13:48:54 -05:00
Bojan Serafimov
b6686d63b0 Add layers.rebuild_index() calls 2022-12-12 13:26:52 -05:00
Bojan Serafimov
edf8a08dcc Implement lsn_floor logic (untested) 2022-12-12 13:09:54 -05:00
Bojan Serafimov
b51d766a44 Add tests 2022-12-12 10:57:03 -05:00
Bojan Serafimov
c9b1655885 Remove sorting hacks 2022-12-12 10:18:01 -05:00
Bojan Serafimov
206ddec636 Add fn rebuild_index() 2022-12-09 23:51:16 -05:00
Bojan Serafimov
cb44698c47 Test and implement lsn.end queries 2022-12-09 17:47:22 -05:00
Bojan Serafimov
af325a30db Update api to use Range 2022-12-09 15:53:03 -05:00
Bojan Serafimov
be499156e5 Add note 2022-12-08 11:28:41 -05:00
Bojan Serafimov
4a11f6e2de WIP sketch for considering lsn_end 2022-12-08 11:17:46 -05:00
Bojan Serafimov
d9190aae87 WIP rebuild API 2022-12-07 18:51:08 -05:00
Bojan Serafimov
6bce11e810 Hacky first attempt to integrate 2022-12-06 14:14:27 -05:00
Bojan Serafimov
39003aa9f3 Simplify 2022-12-02 21:26:13 -05:00
Bojan Serafimov
a8f0d27c92 Benchmark immutable bst layer map 2022-12-02 21:22:09 -05:00
11 changed files with 1405 additions and 739 deletions

Cargo.lock (generated, 419 changed lines): diff suppressed because it is too large.

pageserver/Cargo.toml

@@ -67,7 +67,9 @@ storage_broker = { version = "0.1", path = "../storage_broker" }
tenant_size_model = { path = "../libs/tenant_size_model" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
rpds = "0.12.0"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
im = "15.1.0"
[dev-dependencies]
criterion = "0.4"

pageserver/benches/bench_layer_map.rs

@@ -1,13 +1,12 @@
use anyhow::Result;
use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::{DeltaFileName, ImageFileName, ValueReconstructState};
use pageserver::tenant::storage_layer::{Layer, ValueReconstructResult};
use pageserver::tenant::storage_layer::Layer;
use pageserver::tenant::storage_layer::{DeltaFileName, ImageFileName, LayerDescriptor};
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::ops::Range;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
@@ -17,80 +16,8 @@ use utils::lsn::Lsn;
use criterion::{criterion_group, criterion_main, Criterion};
struct DummyDelta {
key_range: Range<Key>,
lsn_range: Range<Lsn>,
}
impl Layer for DummyDelta {
fn get_key_range(&self) -> Range<Key> {
self.key_range.clone()
}
fn get_lsn_range(&self) -> Range<Lsn> {
self.lsn_range.clone()
}
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
) -> Result<ValueReconstructResult> {
panic!()
}
fn is_incremental(&self) -> bool {
true
}
fn dump(&self, _verbose: bool) -> Result<()> {
unimplemented!()
}
fn short_id(&self) -> String {
unimplemented!()
}
}
struct DummyImage {
key_range: Range<Key>,
lsn: Lsn,
}
impl Layer for DummyImage {
fn get_key_range(&self) -> Range<Key> {
self.key_range.clone()
}
fn get_lsn_range(&self) -> Range<Lsn> {
// End-bound is exclusive
self.lsn..(self.lsn + 1)
}
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
) -> Result<ValueReconstructResult> {
panic!()
}
fn is_incremental(&self) -> bool {
false
}
fn dump(&self, _verbose: bool) -> Result<()> {
unimplemented!()
}
fn short_id(&self) -> String {
unimplemented!()
}
}
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<dyn Layer> {
let mut layer_map = LayerMap::<dyn Layer>::default();
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
let mut layer_map = LayerMap::<LayerDescriptor>::default();
let mut min_lsn = Lsn(u64::MAX);
let mut max_lsn = Lsn(0);
@@ -100,17 +27,21 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<dyn Layer> {
for fname in filenames {
let fname = &fname.unwrap();
if let Some(imgfilename) = ImageFileName::parse_str(fname) {
let layer = DummyImage {
key_range: imgfilename.key_range,
lsn: imgfilename.lsn,
let layer = LayerDescriptor {
key: imgfilename.key_range,
lsn: imgfilename.lsn..(imgfilename.lsn + 1),
is_incremental: false,
short_id: fname.to_string(),
};
layer_map.insert_historic(Arc::new(layer));
min_lsn = min(min_lsn, imgfilename.lsn);
max_lsn = max(max_lsn, imgfilename.lsn);
} else if let Some(deltafilename) = DeltaFileName::parse_str(fname) {
let layer = DummyDelta {
key_range: deltafilename.key_range,
lsn_range: deltafilename.lsn_range.clone(),
let layer = LayerDescriptor {
key: deltafilename.key_range.clone(),
lsn: deltafilename.lsn_range.clone(),
is_incremental: true,
short_id: fname.to_string(),
};
layer_map.insert_historic(Arc::new(layer));
min_lsn = min(min_lsn, deltafilename.lsn_range.start);
@@ -119,6 +50,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<dyn Layer> {
panic!("unexpected filename {fname}");
}
}
layer_map.rebuild_index();
println!("min: {min_lsn}, max: {max_lsn}");
@@ -126,7 +58,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<dyn Layer> {
}
/// Construct a layer map query pattern for benchmarks
fn uniform_query_pattern(layer_map: &LayerMap<dyn Layer>) -> Vec<(Key, Lsn)> {
fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn)> {
// For each image layer we query one of the pages contained, at LSN right
// before the image layer was created. This gives us a somewhat uniform
// coverage of both the lsn and key space because image layers have
@@ -150,6 +82,41 @@ fn uniform_query_pattern(layer_map: &LayerMap<dyn Layer>) -> Vec<(Key, Lsn)> {
.collect()
}
// Construct a partitioning for testing get_difficulty_map when we
// don't have an exact result of `collect_keyspace` to work with.
fn uniform_key_partitioning(layer_map: &LayerMap<LayerDescriptor>, _lsn: Lsn) -> KeyPartitioning {
let mut parts = Vec::new();
// We add a partition boundary at the start of each image layer,
// no matter what lsn range it covers. This is just the easiest
// thing to do. A better thing to do would be to get a real
// partitioning from some database. Even better, remove the need
// for key partitions by deciding where to create image layers
// directly based on a coverage-based difficulty map.
let mut keys: Vec<_> = layer_map
.iter_historic_layers()
.filter_map(|l| {
if l.is_incremental() {
None
} else {
let kr = l.get_key_range();
Some(kr.start.next())
}
})
.collect();
keys.sort();
let mut current_key = Key::from_hex("000000000000000000000000000000000000").unwrap();
for key in keys {
parts.push(KeySpace {
ranges: vec![current_key..key],
});
current_key = key;
}
KeyPartitioning { parts }
}
// Benchmark using metadata extracted from our performance test environment, from
// a project where we have run pgbench many times. The pgbench database was initialized
// between each test run.
@@ -183,24 +150,71 @@ fn bench_from_captest_env(c: &mut Criterion) {
// Benchmark using metadata extracted from a real project that was taking
// too long processing layer map queries.
fn bench_from_real_project(c: &mut Criterion) {
// TODO consider compressing this file
// Init layer map
let now = Instant::now();
let layer_map = build_layer_map(PathBuf::from("benches/odd-brook-layernames.txt"));
println!("Finished layer map init in {:?}", now.elapsed());
// Choose uniformly distributed queries
let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map);
// Test with uniform query pattern
c.bench_function("real_map_uniform_queries", |b| {
// Choose inputs for get_difficulty_map
let latest_lsn = layer_map
.iter_historic_layers()
.map(|l| l.get_lsn_range().end)
.max()
.unwrap();
let partitioning = uniform_key_partitioning(&layer_map, latest_lsn);
// Check correctness of get_difficulty_map
// TODO put this in a dedicated test outside of this mod
{
println!("running correctness check");
let now = Instant::now();
let result_bruteforce = layer_map.get_difficulty_map_bruteforce(latest_lsn, &partitioning);
assert!(result_bruteforce.len() == partitioning.parts.len());
println!("Finished bruteforce in {:?}", now.elapsed());
let now = Instant::now();
let result_fast = layer_map.get_difficulty_map(latest_lsn, &partitioning, None);
assert!(result_fast.len() == partitioning.parts.len());
println!("Finished fast in {:?}", now.elapsed());
// Assert results are equal. Manually iterate for easier debugging.
//
// TODO The difficulty numbers are in the 50-80 range. That's pretty bad.
// We should be monitoring it in prod.
let zip = std::iter::zip(
&partitioning.parts,
std::iter::zip(result_bruteforce, result_fast),
);
for (_part, (bruteforce, fast)) in zip {
assert_eq!(bruteforce, fast);
}
println!("No issues found");
}
// Define and name the benchmark function
let mut group = c.benchmark_group("real_map");
group.bench_function("uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
layer_map.search(q.0, q.1);
}
});
});
group.bench_function("get_difficulty_map", |b| {
b.iter(|| {
layer_map.get_difficulty_map(latest_lsn, &partitioning, Some(3));
});
});
group.finish();
}
// Benchmark using synthetic data. Arrange image layers on stacked diagonal lines.
fn bench_sequential(c: &mut Criterion) {
let mut layer_map: LayerMap<dyn Layer> = LayerMap::default();
// Init layer map. Create 100_000 layers arranged in 1000 diagonal lines.
//
// TODO This code is pretty slow and runs even if we're only running other
@@ -208,39 +222,38 @@ fn bench_sequential(c: &mut Criterion) {
// Putting it inside the `bench_function` closure is not a solution
// because then it runs multiple times during warmup.
let now = Instant::now();
let mut layer_map = LayerMap::default();
for i in 0..100_000 {
// TODO try inserting a super-wide layer in between every 10 to reflect
// what often happens with L1 layers that include non-rel changes.
// Maybe do that as a separate test.
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = DummyImage {
key_range: zero.add(10 * i32)..zero.add(10 * i32 + 1),
lsn: Lsn(10 * i),
let layer = LayerDescriptor {
key: zero.add(10 * i32)..zero.add(10 * i32 + 1),
lsn: Lsn(i)..Lsn(i + 1),
is_incremental: false,
short_id: format!("Layer {}", i),
};
layer_map.insert_historic(Arc::new(layer));
}
// Manually measure runtime without criterion because criterion
// has a minimum sample size of 10 and I don't want to run it 10 times.
println!("Finished init in {:?}", now.elapsed());
layer_map.rebuild_index();
println!("Finished layer map init in {:?}", now.elapsed());
// Choose 100 uniformly random queries
let rng = &mut StdRng::seed_from_u64(1);
let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map)
.choose_multiple(rng, 1)
.choose_multiple(rng, 100)
.copied()
.collect();
// Define and name the benchmark function
c.bench_function("sequential_uniform_queries", |b| {
// Run the search queries
let mut group = c.benchmark_group("sequential");
group.bench_function("uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
layer_map.search(q.0, q.1);
}
});
});
group.finish();
}
criterion_group!(group_1, bench_from_captest_env);

pageserver/src/repository.rs

@@ -37,6 +37,17 @@ impl Key {
| self.field6 as i128
}
pub fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}

pageserver/src/tenant/layer_map.rs

@@ -10,23 +10,21 @@
//! corresponding files are written to disk.
//!
mod historic_layer_coverage;
mod layer_coverage;
use crate::keyspace::KeyPartitioning;
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::storage_layer::{range_eq, range_overlaps};
use amplify_num::i256;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Result;
use num_traits::identities::{One, Zero};
use num_traits::{Bounded, Num, Signed};
use rstar::{RTree, RTreeObject, AABB};
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::Range;
use std::ops::{Add, Div, Mul, Neg, Rem, Sub};
use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;
use super::storage_layer::{InMemoryLayer, Layer};
use historic_layer_coverage::BufferedHistoricLayerCoverage;
///
/// LayerMap tracks what layers exist on a timeline.
@@ -51,194 +49,26 @@ pub struct LayerMap<L: ?Sized> {
///
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
/// All the historic layers are kept here
historic_layers: RTree<LayerRTreeObject<L>>,
/// Index of the historic layers optimized for search
historic: BufferedHistoricLayerCoverage<Arc<L>>,
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<L>>,
}
impl<L: ?Sized> Default for LayerMap<L> {
impl<L: ?Sized + PartialEq> Default for LayerMap<L> {
fn default() -> Self {
Self {
open_layer: None,
next_open_layer_at: None,
frozen_layers: VecDeque::default(),
historic_layers: RTree::default(),
l0_delta_layers: Vec::default(),
historic: BufferedHistoricLayerCoverage::default(),
}
}
}
struct LayerRTreeObject<L: ?Sized> {
layer: Arc<L>,
envelope: AABB<[IntKey; 2]>,
}
// Representation of Key as a numeric type.
// We cannot use the native i128 implementation, because rstar::RTree
// doesn't properly handle integer overflow during area calculation: sum(Xi*Yi).
// Overflow would cause a panic in debug mode and an incorrect area calculation in release mode,
// which leads to a non-optimally balanced R-Tree (but doesn't affect the correctness of the R-Tree).
// By using i256 as the type, even though all the actual values would fit in i128, we can be
// sure that multiplication doesn't overflow.
//
#[derive(Clone, PartialEq, Eq, PartialOrd, Debug)]
struct IntKey(i256);
impl Copy for IntKey {}
impl IntKey {
fn from(i: i128) -> Self {
IntKey(i256::from(i))
}
}
impl Bounded for IntKey {
fn min_value() -> Self {
IntKey(i256::MIN)
}
fn max_value() -> Self {
IntKey(i256::MAX)
}
}
impl Signed for IntKey {
fn is_positive(&self) -> bool {
self.0 > i256::ZERO
}
fn is_negative(&self) -> bool {
self.0 < i256::ZERO
}
fn signum(&self) -> Self {
match self.0.cmp(&i256::ZERO) {
Ordering::Greater => IntKey(i256::ONE),
Ordering::Less => IntKey(-i256::ONE),
Ordering::Equal => IntKey(i256::ZERO),
}
}
fn abs(&self) -> Self {
IntKey(self.0.abs())
}
fn abs_sub(&self, other: &Self) -> Self {
if self.0 <= other.0 {
IntKey(i256::ZERO)
} else {
IntKey(self.0 - other.0)
}
}
}
impl Neg for IntKey {
type Output = Self;
fn neg(self) -> Self::Output {
IntKey(-self.0)
}
}
impl Rem for IntKey {
type Output = Self;
fn rem(self, rhs: Self) -> Self::Output {
IntKey(self.0 % rhs.0)
}
}
impl Div for IntKey {
type Output = Self;
fn div(self, rhs: Self) -> Self::Output {
IntKey(self.0 / rhs.0)
}
}
impl Add for IntKey {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
IntKey(self.0 + rhs.0)
}
}
impl Sub for IntKey {
type Output = Self;
fn sub(self, rhs: Self) -> Self::Output {
IntKey(self.0 - rhs.0)
}
}
impl Mul for IntKey {
type Output = Self;
fn mul(self, rhs: Self) -> Self::Output {
IntKey(self.0 * rhs.0)
}
}
impl One for IntKey {
fn one() -> Self {
IntKey(i256::ONE)
}
}
impl Zero for IntKey {
fn zero() -> Self {
IntKey(i256::ZERO)
}
fn is_zero(&self) -> bool {
self.0 == i256::ZERO
}
}
impl Num for IntKey {
type FromStrRadixErr = <i128 as Num>::FromStrRadixErr;
fn from_str_radix(str: &str, radix: u32) -> Result<Self, Self::FromStrRadixErr> {
Ok(IntKey(i256::from(i128::from_str_radix(str, radix)?)))
}
}
impl<T: ?Sized> PartialEq for LayerRTreeObject<T> {
fn eq(&self, other: &Self) -> bool {
// FIXME: ptr_eq might fail to return true for 'dyn'
// references. Clippy complains about this. In practice it
// seems to work (the assertion below would be triggered
// otherwise), but this ought to be fixed.
#[allow(clippy::vtable_address_comparisons)]
Arc::ptr_eq(&self.layer, &other.layer)
}
}
impl<L> RTreeObject for LayerRTreeObject<L>
where
L: ?Sized,
{
type Envelope = AABB<[IntKey; 2]>;
fn envelope(&self) -> Self::Envelope {
self.envelope
}
}
impl<L> LayerRTreeObject<L>
where
L: ?Sized + Layer,
{
fn new(layer: Arc<L>) -> Self {
let key_range = layer.get_key_range();
let lsn_range = layer.get_lsn_range();
let envelope = AABB::from_corners(
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
], // AABB::upper is inclusive, while `key_range.end` and `lsn_range.end` are exclusive
);
LayerRTreeObject { layer, envelope }
}
}
/// Return value of LayerMap::search
pub struct SearchResult<L: ?Sized> {
pub layer: Arc<L>,
@@ -247,141 +77,120 @@ pub struct SearchResult<L: ?Sized> {
impl<L> LayerMap<L>
where
L: ?Sized + Layer,
L: ?Sized + Layer + PartialEq,
{
///
/// Find the latest layer that covers the given 'key', with lsn <
/// 'end_lsn'.
/// Find the latest layer (by lsn.end) that covers the given
/// 'key', with lsn.start < 'end_lsn'.
///
/// Returns the layer, if any, and an 'lsn_floor' value that
/// indicates which portion of the layer the caller should
/// check. 'lsn_floor' is normally the start-LSN of the layer, but
/// can be greater if there is an overlapping layer that might
/// contain the version, even if it's missing from the returned
/// layer.
/// The caller of this function is the page reconstruction
/// algorithm looking for the next relevant delta layer, or
/// the terminal image layer. The caller will pass the lsn_floor
/// value as end_lsn in the next call to search.
///
/// If there's an image layer exactly below the given end_lsn,
/// search should return that layer regardless if there are
/// overlapping deltas.
///
/// If the latest layer is a delta and there is an overlapping
/// image with it below, the lsn_floor returned should be right
/// above that image so we don't skip it in the search. Otherwise
/// the lsn_floor returned should be the bottom of the delta layer
/// because we should make as much progress down the lsn axis
/// as possible. It's fine if this way we skip some overlapping
/// deltas, because the delta we returned would contain the same
/// wal content.
///
/// TODO: This API is convoluted and inefficient. If the caller
/// makes N search calls, we'll end up finding the same latest
/// image layer N times. We should either cache the latest image
/// layer result, or simplify the api to `get_latest_image` and
/// `get_latest_delta`, and only call `get_latest_image` once.
///
/// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers!
///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
// Find the latest image layer that covers the given key
let mut latest_img: Option<Arc<L>> = None;
let mut latest_img_lsn: Option<Lsn> = None;
let envelope = AABB::from_corners(
[IntKey::from(key.to_i128()), IntKey::from(0i128)],
[
IntKey::from(key.to_i128()),
IntKey::from(end_lsn.0 as i128 - 1),
],
);
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
if l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
let img_lsn = l.get_lsn_range().start;
assert!(img_lsn < end_lsn);
if Lsn(img_lsn.0 + 1) == end_lsn {
// found exact match
return Some(SearchResult {
layer: Arc::clone(l),
lsn_floor: img_lsn,
});
}
if img_lsn > latest_img_lsn.unwrap_or(Lsn(0)) {
latest_img = Some(Arc::clone(l));
latest_img_lsn = Some(img_lsn);
}
}
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128());
let latest_image = version.image_coverage.query(key.to_i128());
// Search the delta layers
let mut latest_delta: Option<Arc<L>> = None;
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
if !l.is_incremental() {
continue;
match (latest_delta, latest_image) {
(None, None) => None,
(None, Some(image)) => {
let lsn_floor = image.get_lsn_range().start;
Some(SearchResult {
layer: image,
lsn_floor,
})
}
assert!(l.get_key_range().contains(&key));
if l.get_lsn_range().start >= end_lsn {
info!(
"Candidate delta layer {}..{} is too new for lsn {}",
l.get_lsn_range().start,
l.get_lsn_range().end,
end_lsn
);
(Some(delta), None) => {
let lsn_floor = delta.get_lsn_range().start;
Some(SearchResult {
layer: delta,
lsn_floor,
})
}
assert!(l.get_lsn_range().start < end_lsn);
if l.get_lsn_range().end >= end_lsn {
// this layer contains the requested point in the key/lsn space.
// No need to search any further
trace!(
"found layer {} for request on {key} at {end_lsn}",
l.short_id(),
);
latest_delta.replace(Arc::clone(l));
break;
}
if l.get_lsn_range().end > latest_img_lsn.unwrap_or(Lsn(0)) {
// this layer's end LSN is smaller than the requested point. If there's
// nothing newer, this is what we need to return. Remember this.
if let Some(old_candidate) = &latest_delta {
if l.get_lsn_range().end > old_candidate.get_lsn_range().end {
latest_delta.replace(Arc::clone(l));
}
(Some(delta), Some(image)) => {
let img_lsn = image.get_lsn_range().start;
let image_is_newer = image.get_lsn_range().end > delta.get_lsn_range().end;
let image_exact_match = Lsn(img_lsn.0 + 1) == end_lsn;
if image_is_newer || image_exact_match {
Some(SearchResult {
layer: image,
lsn_floor: img_lsn,
})
} else {
latest_delta.replace(Arc::clone(l));
let lsn_floor =
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
Some(SearchResult {
layer: delta,
lsn_floor,
})
}
}
}
if let Some(l) = latest_delta {
trace!(
"found (old) layer {} for request on {key} at {end_lsn}",
l.short_id(),
);
let lsn_floor = std::cmp::max(
Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
l.get_lsn_range().start,
);
Some(SearchResult {
lsn_floor,
layer: l,
})
} else if let Some(l) = latest_img {
trace!("found img layer and no deltas for request on {key} at {end_lsn}");
Some(SearchResult {
lsn_floor: latest_img_lsn.unwrap(),
layer: l,
})
} else {
trace!("no layer found for request on {key} at {end_lsn}");
None
}
}
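// Illustration: a minimal, hypothetical sketch of the caller loop that the
// contract above implies. `visit` stands in for the real record-collection
// logic; this is not part of the actual call sites.
fn walk_layers<L: ?Sized + Layer + PartialEq>(
    map: &LayerMap<L>,
    key: Key,
    mut end_lsn: Lsn,
    mut visit: impl FnMut(&Arc<L>),
) {
    while let Some(SearchResult { layer, lsn_floor }) = map.search(key, end_lsn) {
        visit(&layer);
        if !layer.is_incremental() {
            // Terminal image layer: reconstruction can stop here.
            break;
        }
        // Pass lsn_floor back as end_lsn to continue the search below this layer.
        end_lsn = lsn_floor;
    }
}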
///
/// Insert an on-disk layer
///
pub fn insert_historic(&mut self, layer: Arc<L>) {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
self.historic.insert(
kr.start.to_i128()..kr.end.to_i128(),
lr.start.0..lr.end.0,
Arc::clone(&layer),
!layer.is_incremental(),
);
if layer.get_key_range() == (Key::MIN..Key::MAX) {
self.l0_delta_layers.push(layer.clone());
self.l0_delta_layers.push(layer);
}
self.historic_layers.insert(LayerRTreeObject::new(layer));
NUM_ONDISK_LAYERS.inc();
}
/// Must be called after a batch of insert_historic calls, before querying
pub fn rebuild_index(&mut self) {
self.historic.rebuild();
}
///
/// Remove an on-disk layer from the map.
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer: Arc<L>) {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
self.historic.remove(
kr.start.to_i128()..kr.end.to_i128(),
lr.start.0..lr.end.0,
!layer.is_incremental(),
);
if layer.get_key_range() == (Key::MIN..Key::MAX) {
let len_before = self.l0_delta_layers.len();
@@ -394,96 +203,51 @@ where
.retain(|other| !Arc::ptr_eq(other, &layer));
assert_eq!(self.l0_delta_layers.len(), len_before - 1);
}
assert!(self
.historic_layers
.remove(&LayerRTreeObject::new(layer))
.is_some());
NUM_ONDISK_LAYERS.dec();
}
/// Is there a newer image layer for given key- and LSN-range?
/// Is there a newer image layer for given key- and LSN-range? Or a set
/// of image layers within the specified lsn range that cover the entire
/// specified key range?
///
/// This is used for garbage collection, to determine if an old layer can
/// be deleted.
pub fn image_layer_exists(
&self,
key_range: &Range<Key>,
lsn_range: &Range<Lsn>,
) -> Result<bool> {
let mut range_remain = key_range.clone();
pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> Result<bool> {
if key.is_empty() {
return Ok(true);
}
loop {
let mut made_progress = false;
let envelope = AABB::from_corners(
[
IntKey::from(range_remain.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(range_remain.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
if l.is_incremental() {
continue;
}
let img_lsn = l.get_lsn_range().start;
if l.get_key_range().contains(&range_remain.start) && lsn_range.contains(&img_lsn) {
made_progress = true;
let img_key_end = l.get_key_range().end;
let version = match self.historic.get().unwrap().get_version(lsn.end.0) {
Some(v) => v,
None => return Ok(false),
};
if img_key_end >= range_remain.end {
return Ok(true);
}
range_remain.start = img_key_end;
}
}
let start = key.start.to_i128();
let end = key.end.to_i128();
if !made_progress {
let layer_covers = |layer: Option<Arc<L>>| match layer {
Some(layer) => layer.get_lsn_range().start >= lsn.start,
None => false,
};
// Check the start is covered
if !layer_covers(version.image_coverage.query(start)) {
return Ok(false);
}
// Check after all changes of coverage
for (_, change_val) in version.image_coverage.range(start..end) {
if !layer_covers(change_val) {
return Ok(false);
}
}
Ok(true)
}
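// Illustration: a hypothetical example of the coverage check above, using the
// LayerDescriptor test type from storage_layer (keys and LSNs are arbitrary).
// Two adjacent image layers at LSN 110 jointly cover keys [0, 20):
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let mut map: LayerMap<LayerDescriptor> = LayerMap::default();
for (k0, k1, id) in [(0u32, 10u32, "img-a"), (10, 20, "img-b")] {
    map.insert_historic(Arc::new(LayerDescriptor {
        key: zero.add(k0)..zero.add(k1),
        lsn: Lsn(110)..Lsn(111),
        is_incremental: false,
        short_id: id.to_string(),
    }));
}
map.rebuild_index();
// Keys [2, 15) are fully covered by images within LSN range 100..200 ...
assert!(map
    .image_layer_exists(&(zero.add(2)..zero.add(15)), &(Lsn(100)..Lsn(200)))
    .unwrap());
// ... but no image at or above LSN 120 covers them.
assert!(!map
    .image_layer_exists(&(zero.add(2)..zero.add(15)), &(Lsn(120)..Lsn(200)))
    .unwrap());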
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
self.historic_layers.iter().map(|e| e.layer.clone())
}
/// Find the last image layer that covers 'key', ignoring any image layers
/// newer than 'lsn'.
fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<L>> {
let mut candidate_lsn = Lsn(0);
let mut candidate = None;
let envelope = AABB::from_corners(
[IntKey::from(key.to_i128()), IntKey::from(0)],
[IntKey::from(key.to_i128()), IntKey::from(lsn.0 as i128)],
);
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
if l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
let this_lsn = l.get_lsn_range().start;
assert!(this_lsn <= lsn);
if this_lsn < candidate_lsn {
// our previous candidate was better
continue;
}
candidate_lsn = this_lsn;
candidate = Some(Arc::clone(l));
}
candidate
self.historic.iter()
}
///
@@ -499,87 +263,239 @@ where
key_range: &Range<Key>,
lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
let mut points = vec![key_range.start];
let envelope = AABB::from_corners(
[IntKey::from(key_range.start.to_i128()), IntKey::from(0)],
[
IntKey::from(key_range.end.to_i128()),
IntKey::from(lsn.0 as i128),
],
);
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
assert!(l.get_lsn_range().start <= lsn);
let range = l.get_key_range();
if key_range.contains(&range.start) {
points.push(l.get_key_range().start);
}
if key_range.contains(&range.end) {
points.push(l.get_key_range().end);
}
// TODO I'm 80% sure the lsn bound is inclusive. Double-check that
// and document it. Do the same for image_layer_exists and count_deltas
let version = match self.historic.get().unwrap().get_version(lsn.0) {
Some(v) => v,
None => return Ok(vec![]),
};
let start = key_range.start.to_i128();
let end = key_range.end.to_i128();
// Initialize loop variables
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
let mut current_key = start;
let mut current_val = version.image_coverage.query(start);
// Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push((kr, current_val.take()));
current_key = change_key;
current_val = change_val.clone();
}
points.push(key_range.end);
points.sort();
points.dedup();
// Add the final interval
let kr = Key::from_i128(current_key)..Key::from_i128(end);
coverage.push((kr, current_val.take()));
// Ok, we now have a list of "interesting" points in the key space
// For each range between the points, find the latest image
let mut start = *points.first().unwrap();
let mut ranges = Vec::new();
for end in points[1..].iter() {
let img = self.find_latest_image(start, lsn);
ranges.push((start..*end, img));
start = *end;
}
Ok(ranges)
Ok(coverage)
}
/// Count how many L1 delta layers there are that overlap with the
/// given key and LSN range.
pub fn count_deltas(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>) -> Result<usize> {
let mut result = 0;
if lsn_range.start >= lsn_range.end {
/// Count the height of the tallest stack of deltas in this 2d region.
///
/// If `limit` is provided we don't try to count above that number.
///
/// This number is used to compute the largest number of deltas that
/// we'll need to visit for any page reconstruction in this region.
/// We use this heuristic to decide whether to create an image layer.
pub fn count_deltas(
&self,
key: &Range<Key>,
lsn: &Range<Lsn>,
limit: Option<usize>,
) -> Result<usize> {
// We get the delta coverage of the region, and for each part of the coverage
// we recurse right underneath the delta. The recursion depth is limited by
// the largest result this function could return, which is in practice between
// 3 and 10 (since we usually try to create an image when the number gets larger).
if lsn.is_empty() || key.is_empty() || limit == Some(0) {
return Ok(0);
}
let envelope = AABB::from_corners(
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
if !l.is_incremental() {
continue;
}
assert!(range_overlaps(&l.get_lsn_range(), lsn_range));
assert!(range_overlaps(&l.get_key_range(), key_range));
// We ignore level0 delta layers. Unless the whole keyspace fits
// into one partition
if !range_eq(key_range, &(Key::MIN..Key::MAX))
&& range_eq(&l.get_key_range(), &(Key::MIN..Key::MAX))
{
continue;
let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
Some(v) => v,
None => return Ok(0),
};
let start = key.start.to_i128();
let end = key.end.to_i128();
// Initialize loop variables
let mut max_stacked_deltas = 0;
let mut current_key = start;
let mut current_val = version.delta_coverage.query(start);
// Loop through the delta coverage and recurse on each part
for (change_key, change_val) in version.delta_coverage.range(start..end) {
// If there's a relevant delta in this part, add 1 and recurse down
if let Some(val) = current_val {
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let new_limit = limit.map(|l| l - 1);
let max_stacked_deltas_underneath =
self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas =
std::cmp::max(max_stacked_deltas, 1 + max_stacked_deltas_underneath);
}
}
}
result += 1;
current_key = change_key;
current_val = change_val.clone();
}
Ok(result)
// Consider the last part
if let Some(val) = current_val {
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(end);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let new_limit = limit.map(|l| l - 1);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas =
std::cmp::max(max_stacked_deltas, 1 + max_stacked_deltas_underneath);
}
}
}
Ok(max_stacked_deltas)
}
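// Illustration: a hypothetical worked example of the stacking semantics above,
// again sketched with the LayerDescriptor test type (keys and LSNs are arbitrary).
// Two full-width deltas stacked at LSN 100..110 and 110..120, plus one delta
// covering only the upper half of the key range at LSN 120..130:
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let mut map: LayerMap<LayerDescriptor> = LayerMap::default();
let deltas = [
    (0u32, 10u32, 100u64, 110u64),
    (0, 10, 110, 120),
    (5, 10, 120, 130),
];
for (i, (k0, k1, l0, l1)) in deltas.into_iter().enumerate() {
    map.insert_historic(Arc::new(LayerDescriptor {
        key: zero.add(k0)..zero.add(k1),
        lsn: Lsn(l0)..Lsn(l1),
        is_incremental: true,
        short_id: format!("delta-{i}"),
    }));
}
map.rebuild_index();
// Keys [5, 10) sit under a stack of three deltas, keys [0, 5) under only two.
assert_eq!(
    map.count_deltas(&(zero.add(0)..zero.add(10)), &(Lsn(0)..Lsn(200)), None)
        .unwrap(),
    3
);
assert_eq!(
    map.count_deltas(&(zero.add(0)..zero.add(5)), &(Lsn(0)..Lsn(200)), None)
        .unwrap(),
    2
);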
/// Count how many layers we need to visit for given key-lsn pair.
///
/// Used as a helper for correctness checks only. Performance not critical.
pub fn get_difficulty(&self, lsn: Lsn, key: Key) -> usize {
match self.search(key, lsn) {
Some(search_result) => {
if search_result.layer.is_incremental() {
1 + self.get_difficulty(search_result.lsn_floor, key)
} else {
0
}
}
None => 0,
}
}
/// Used for correctness checking. Results are expected to be identical to
/// self.get_difficulty_map. Assumes self.search is correct.
pub fn get_difficulty_map_bruteforce(
&self,
lsn: Lsn,
partitioning: &KeyPartitioning,
) -> Vec<usize> {
// Looking at the difficulty as a function of key, it could only increase
// when a delta layer starts or an image layer ends. Therefore it's sufficient
// to check the difficulties at:
// - the key.start for each non-empty part range
// - the key.start for each delta
// - the key.end for each image
let keys_iter: Box<dyn Iterator<Item = Key>> = {
let mut keys: Vec<Key> = self
.iter_historic_layers()
.map(|layer| {
if layer.is_incremental() {
layer.get_key_range().start
} else {
layer.get_key_range().end
}
})
.collect();
keys.sort();
Box::new(keys.into_iter())
};
let mut keys_iter = keys_iter.peekable();
// Iter the partition and keys together and query all the necessary
// keys, computing the max difficulty for each part.
partitioning
.parts
.iter()
.map(|part| {
let mut difficulty = 0;
// Partition ranges are assumed to be sorted and disjoint
// TODO assert it
for range in &part.ranges {
if !range.is_empty() {
difficulty =
std::cmp::max(difficulty, self.get_difficulty(lsn, range.start));
}
while let Some(key) = keys_iter.peek() {
if key >= &range.end {
break;
}
let key = keys_iter.next().unwrap();
if key < range.start {
continue;
}
difficulty = std::cmp::max(difficulty, self.get_difficulty(lsn, key));
}
}
difficulty
})
.collect()
}
/// For each part of a keyspace partitioning, return the maximum number of layers
/// that would be needed for page reconstruction in that part at the given LSN.
///
/// If `limit` is provided we don't try to count above that number.
///
/// This method is used to decide where to create new image layers. Computing the
/// result for the entire partitioning at once allows this function to be more
/// efficient, and further optimization is possible by using iterators instead,
/// to allow early return.
///
/// TODO actually use this method instead of count_deltas. Currently we only use
/// it for benchmarks.
pub fn get_difficulty_map(
&self,
lsn: Lsn,
partitioning: &KeyPartitioning,
limit: Option<usize>,
) -> Vec<usize> {
// TODO This is a naive implementation. Perf improvements to do:
// 1. Instead of calling self.image_coverage and self.count_deltas,
// iterate the image and delta coverage only once.
partitioning
.parts
.iter()
.map(|part| {
let mut difficulty = 0;
for range in &part.ranges {
if limit == Some(difficulty) {
break;
}
for (img_range, last_img) in self
.image_coverage(range, lsn)
.expect("why would this err?")
{
if limit == Some(difficulty) {
break;
}
let img_lsn = if let Some(last_img) = last_img {
last_img.get_lsn_range().end
} else {
Lsn(0)
};
if img_lsn < lsn {
let num_deltas = self
.count_deltas(&img_range, &(img_lsn..lsn), limit)
.expect("why would this err lol?");
difficulty = std::cmp::max(difficulty, num_deltas);
}
}
}
difficulty
})
.collect()
}
/// Return all L0 delta layers
@@ -603,8 +519,8 @@ where
}
println!("historic_layers:");
for e in self.historic_layers.iter() {
e.layer.dump(verbose)?;
for layer in self.iter_historic_layers() {
layer.dump(verbose)?;
}
println!("End dump LayerMap");
Ok(())

pageserver/src/tenant/layer_map/historic_layer_coverage.rs (new file)

@@ -0,0 +1,418 @@
use std::collections::BTreeMap;
use std::ops::Range;
use super::layer_coverage::LayerCoverageTuple;
/// Efficiently queryable layer coverage for each LSN.
///
/// Allows answering layer map queries very efficiently,
/// but doesn't allow retroactive insertion, which is
/// sometimes necessary. See BufferedHistoricLayerCoverage.
pub struct HistoricLayerCoverage<Value> {
/// The latest state
head: LayerCoverageTuple<Value>,
/// All previous states
historic: BTreeMap<u64, LayerCoverageTuple<Value>>,
}
impl<T: Clone + PartialEq> Default for HistoricLayerCoverage<T> {
fn default() -> Self {
Self::new()
}
}
impl<Value: Clone + PartialEq> HistoricLayerCoverage<Value> {
pub fn new() -> Self {
Self {
head: LayerCoverageTuple::default(),
historic: BTreeMap::default(),
}
}
/// Add a layer
///
/// Panics if new layer has older lsn.start than an existing layer.
/// See BufferedHistoricLayerCoverage for a more general insertion method.
pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value, is_image: bool) {
// It's only a persistent map, not a retroactive one
if let Some(last_entry) = self.historic.iter().rev().next() {
let last_lsn = last_entry.0;
if lsn.start == *last_lsn {
// TODO there are edge cases to take care of
}
if lsn.start < *last_lsn {
panic!("unexpected retroactive insert");
}
}
// Insert into data structure
if is_image {
self.head.image_coverage.insert(key, lsn.clone(), value);
} else {
self.head.delta_coverage.insert(key, lsn.clone(), value);
}
// Remember history. Clone is O(1)
self.historic.insert(lsn.start, self.head.clone());
}
/// Query at a particular LSN, inclusive
pub fn get_version(&self, lsn: u64) -> Option<&LayerCoverageTuple<Value>> {
match self.historic.range(..=lsn).rev().next() {
Some((_, v)) => Some(v),
None => None,
}
}
/// Remove all entries after a certain LSN
pub fn trim(&mut self, begin: &u64) {
self.historic.split_off(begin);
self.head = self
.historic
.iter()
.rev()
.next()
.map(|(_, v)| v.clone())
.unwrap_or_default();
}
}
/// This is the most basic test that demonstrates intended usage.
/// All layers in this test have height 1.
#[test]
fn test_persistent_simple() {
let mut map = HistoricLayerCoverage::<String>::new();
map.insert(0..5, 100..101, "Layer 1".to_string(), true);
map.insert(3..9, 110..111, "Layer 2".to_string(), true);
map.insert(5..6, 120..121, "Layer 3".to_string(), true);
// After Layer 1 insertion
let version = map.get_version(105).unwrap();
assert_eq!(version.image_coverage.query(1), Some("Layer 1".to_string()));
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
// After Layer 2 insertion
let version = map.get_version(115).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string()));
assert_eq!(version.image_coverage.query(8), Some("Layer 2".to_string()));
assert_eq!(version.image_coverage.query(11), None);
// After Layer 3 insertion
let version = map.get_version(125).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string()));
assert_eq!(version.image_coverage.query(5), Some("Layer 3".to_string()));
assert_eq!(version.image_coverage.query(7), Some("Layer 2".to_string()));
}
/// Cover simple off-by-one edge cases
#[test]
fn test_off_by_one() {
let mut map = HistoricLayerCoverage::<String>::new();
map.insert(3..5, 100..110, "Layer 1".to_string(), true);
// Check different LSNs
let version = map.get_version(99);
assert!(version.is_none());
let version = map.get_version(100).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
let version = map.get_version(110).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
// Check different keys
let version = map.get_version(105).unwrap();
assert_eq!(version.image_coverage.query(2), None);
assert_eq!(version.image_coverage.query(3), Some("Layer 1".to_string()));
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
assert_eq!(version.image_coverage.query(5), None);
}
/// Cover edge cases where layers begin or end on the same key
#[test]
fn test_key_collision() {
let mut map = HistoricLayerCoverage::<String>::new();
map.insert(3..5, 100..110, "Layer 10".to_string(), true);
map.insert(5..8, 100..110, "Layer 11".to_string(), true);
map.insert(3..4, 200..210, "Layer 20".to_string(), true);
// Check after layer 11
let version = map.get_version(105).unwrap();
assert_eq!(version.image_coverage.query(2), None);
assert_eq!(
version.image_coverage.query(3),
Some("Layer 10".to_string())
);
assert_eq!(
version.image_coverage.query(5),
Some("Layer 11".to_string())
);
assert_eq!(
version.image_coverage.query(7),
Some("Layer 11".to_string())
);
assert_eq!(version.image_coverage.query(8), None);
// Check after layer 20
let version = map.get_version(205).unwrap();
assert_eq!(version.image_coverage.query(2), None);
assert_eq!(
version.image_coverage.query(3),
Some("Layer 20".to_string())
);
assert_eq!(
version.image_coverage.query(5),
Some("Layer 11".to_string())
);
assert_eq!(
version.image_coverage.query(7),
Some("Layer 11".to_string())
);
assert_eq!(version.image_coverage.query(8), None);
}
/// Test when rectangles have nontrivial height and possibly overlap
#[test]
fn test_persistent_overlapping() {
let mut map = HistoricLayerCoverage::<String>::new();
// Add 3 key-disjoint layers with varying LSN ranges
map.insert(1..2, 100..200, "Layer 1".to_string(), true);
map.insert(4..5, 110..200, "Layer 2".to_string(), true);
map.insert(7..8, 120..300, "Layer 3".to_string(), true);
// Add wide and short layer
map.insert(0..9, 130..199, "Layer 4".to_string(), true);
// Add wide layer taller than some
map.insert(0..9, 140..201, "Layer 5".to_string(), true);
// Add wide layer taller than all
map.insert(0..9, 150..301, "Layer 6".to_string(), true);
// After layer 4 insertion
let version = map.get_version(135).unwrap();
assert_eq!(version.image_coverage.query(0), Some("Layer 4".to_string()));
assert_eq!(version.image_coverage.query(1), Some("Layer 1".to_string()));
assert_eq!(version.image_coverage.query(2), Some("Layer 4".to_string()));
assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string()));
assert_eq!(version.image_coverage.query(5), Some("Layer 4".to_string()));
assert_eq!(version.image_coverage.query(7), Some("Layer 3".to_string()));
assert_eq!(version.image_coverage.query(8), Some("Layer 4".to_string()));
// After layer 5 insertion
let version = map.get_version(145).unwrap();
assert_eq!(version.image_coverage.query(0), Some("Layer 5".to_string()));
assert_eq!(version.image_coverage.query(1), Some("Layer 5".to_string()));
assert_eq!(version.image_coverage.query(2), Some("Layer 5".to_string()));
assert_eq!(version.image_coverage.query(4), Some("Layer 5".to_string()));
assert_eq!(version.image_coverage.query(5), Some("Layer 5".to_string()));
assert_eq!(version.image_coverage.query(7), Some("Layer 3".to_string()));
assert_eq!(version.image_coverage.query(8), Some("Layer 5".to_string()));
// After layer 6 insertion
let version = map.get_version(155).unwrap();
assert_eq!(version.image_coverage.query(0), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(1), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(2), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(4), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(5), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(7), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(8), Some("Layer 6".to_string()));
}
/// Wrapper for HistoricLayerCoverage that allows us to hack around the lack
/// of support for retroactive insertion by rebuilding the map since the
/// change.
///
/// Why is this needed? We most often insert new layers with newer LSNs,
/// but during compaction we create layers with non-latest LSN, and during
/// GC we delete historic layers.
///
/// Even though rebuilding is an expensive (N log N) solution to the problem,
/// it's not critical since we do something equally expensive just to decide
/// whether or not to create new image layers.
///
/// If this becomes an actual bottleneck, one solution would be to build a
/// segment tree that holds PersistentLayerMaps. That would, however, add an
/// extra log(N) factor to queries, and queries are probably still the more
/// performance-critical path.
///
/// See this for more on persistent and retroactive techniques:
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
pub struct BufferedHistoricLayerCoverage<Value> {
/// A persistent layer map that we rebuild when we need to retroactively update
historic_coverage: HistoricLayerCoverage<Value>,
/// We buffer insertion into the PersistentLayerMap to decrease the number of rebuilds.
///
/// We implicitly assume that layers are identified by their key and lsn range.
/// A value of None means we want to delete this item.
buffer: BTreeMap<(u64, u64, i128, i128, bool), Option<Value>>,
/// All current layers. This is not used for search. Only to make rebuilds easier.
///
/// We implicitly assume that layers are identified by their key and lsn range.
layers: BTreeMap<(u64, u64, i128, i128, bool), Value>,
}
impl<T: std::fmt::Debug> std::fmt::Debug for BufferedHistoricLayerCoverage<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RetroactiveLayerMap")
.field("buffer", &self.buffer)
.field("layers", &self.layers)
.finish()
}
}
impl<T: Clone + PartialEq> Default for BufferedHistoricLayerCoverage<T> {
fn default() -> Self {
Self::new()
}
}
impl<Value: Clone + PartialEq> BufferedHistoricLayerCoverage<Value> {
pub fn new() -> Self {
Self {
historic_coverage: HistoricLayerCoverage::<Value>::new(),
buffer: BTreeMap::new(),
layers: BTreeMap::new(),
}
}
pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value, is_image: bool) {
self.buffer.insert(
(lsn.start, lsn.end, key.start, key.end, is_image),
Some(value),
);
}
pub fn remove(&mut self, key: Range<i128>, lsn: Range<u64>, is_image: bool) {
self.buffer
.insert((lsn.start, lsn.end, key.start, key.end, is_image), None);
}
pub fn rebuild(&mut self) {
// Find the first LSN that needs to be rebuilt
let rebuild_since: u64 = match self.buffer.iter().next() {
Some(((lsn_start, _, _, _, _), _)) => *lsn_start,
None => return, // No need to rebuild if buffer is empty
};
// Apply buffered updates to self.layers
self.buffer.retain(|rect, layer| {
match layer {
Some(l) => {
let existing = self.layers.insert(*rect, l.clone());
if existing.is_some() {
// TODO this happened once. Investigate.
// panic!("can't overwrite layer");
}
}
None => {
let existing = self.layers.remove(rect);
if existing.is_none() {
panic!("invalid layer deletion");
}
}
};
false
});
// Rebuild
self.historic_coverage.trim(&rebuild_since);
for ((lsn_start, lsn_end, key_start, key_end, is_image), layer) in
self.layers.range((rebuild_since, 0, 0, 0, false)..)
{
self.historic_coverage.insert(
*key_start..*key_end,
*lsn_start..*lsn_end,
layer.clone(),
*is_image,
);
}
}
/// Iterate all the layers
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
// NOTE we can actually perform this without rebuilding,
// but it's not necessary for now.
if !self.buffer.is_empty() {
panic!("rebuild pls")
}
self.layers.values().cloned()
}
/// Return a reference to a queryable map, assuming all updates
/// have already been processed using self.rebuild()
pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage<Value>> {
// NOTE we error here instead of implicitly rebuilding because
// rebuilding is somewhat expensive.
// TODO maybe implicitly rebuild and log/sentry an error?
if !self.buffer.is_empty() {
anyhow::bail!("rebuild required")
}
Ok(&self.historic_coverage)
}
}
#[test]
fn test_retroactive_regression_1() {
let mut map = BufferedHistoricLayerCoverage::new();
map.insert(
0..21267647932558653966460912964485513215,
23761336..23761457,
"sdfsdfs".to_string(),
false,
);
map.rebuild();
let version = map.get().unwrap().get_version(23761457).unwrap();
assert_eq!(
version.delta_coverage.query(100),
Some("sdfsdfs".to_string())
);
}
#[test]
fn test_retroactive_simple() {
let mut map = BufferedHistoricLayerCoverage::new();
// Append some images in increasing LSN order
map.insert(0..5, 100..101, "Image 1".to_string(), true);
map.insert(3..9, 110..111, "Image 2".to_string(), true);
map.insert(4..6, 120..121, "Image 3".to_string(), true);
map.insert(8..9, 120..121, "Image 4".to_string(), true);
// Add a delta layer out of order
map.insert(2..5, 105..106, "Delta 1".to_string(), true);
// Rebuild so we can start querying
map.rebuild();
// Query key 4
let version = map.get().unwrap().get_version(90);
assert!(version.is_none());
let version = map.get().unwrap().get_version(102).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Image 1".to_string()));
let version = map.get().unwrap().get_version(107).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Delta 1".to_string()));
let version = map.get().unwrap().get_version(115).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Image 2".to_string()));
let version = map.get().unwrap().get_version(125).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Image 3".to_string()));
// Remove Image 3
map.remove(4..6, 120..121, true);
map.rebuild();
// Check deletion worked
let version = map.get().unwrap().get_version(125).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Image 2".to_string()));
assert_eq!(version.image_coverage.query(8), Some("Image 4".to_string()));
}

pageserver/src/tenant/layer_map/layer_coverage.rs (new file)

@@ -0,0 +1,200 @@
use std::ops::Range;
// TODO the `im` crate has 20x more downloads and also has
// persistent/immutable BTree. It also runs a bit faster but
// results are not the same on some tests.
use rpds::RedBlackTreeMapSync;
use im::OrdMap;
/// Data structure that can efficiently:
/// - find the latest layer by lsn.end at a given key
/// - iterate the latest layers in a key range
/// - insert layers in non-decreasing lsn.start order
///
/// The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
pub struct LayerCoverage<Value> {
/// For every change in coverage (as we sweep the key space)
/// we store (lsn.end, value).
///
/// We use an immutable/persistent tree so that we can keep historic
/// versions of this coverage without cloning the whole thing and
/// incurring quadratic memory cost. See HistoricLayerCoverage.
///
/// We use the Sync version of the map because we want Self to
/// be Sync. Using nonsync might be faster, if we can work with
/// that.
nodes: RedBlackTreeMapSync<i128, Option<(u64, Value)>>,
im: OrdMap<i128, Option<(u64, Value)>>,
}
impl<T: Clone + PartialEq> Default for LayerCoverage<T> {
fn default() -> Self {
Self::new()
}
}
impl<Value: Clone + PartialEq> LayerCoverage<Value> {
pub fn new() -> Self {
Self {
nodes: RedBlackTreeMapSync::default(),
im: OrdMap::default(),
}
}
/// Helper function to subdivide the key range without changing any values
///
/// Complexity: O(log N)
fn add_node(&mut self, key: i128) {
let value = match self.nodes.range(..=key).last() {
Some((_, Some(v))) => Some(v.clone()),
Some((_, None)) => None,
None => None,
};
self.nodes.insert_mut(key, value);
let im_value = match self.im.range(..=key).last() {
Some((_, Some(v))) => Some(v.clone()),
Some((_, None)) => None,
None => None,
};
self.im.insert(key, im_value);
}
/// Insert a layer.
///
/// Complexity: worst case O(N), in practice O(log N). See note in the implementation.
pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value) {
// NOTE The order of the following lines is important!!
// Add nodes at endpoints
self.add_node(key.start);
self.add_node(key.end);
// Raise the height where necessary
//
// NOTE This loop is worst case O(N), but amortized O(log N) in the special
// case when rectangles have no height. In practice I don't think we'll see
// the kind of layer intersections needed to trigger O(N) behavior. The worst
// case is N/2 horizontal layers overlapped with N/2 vertical layers in a
// grid pattern.
let mut to_update = Vec::new();
let mut to_remove = Vec::new();
let mut prev_covered = false;
for (k, node) in self.nodes.range(key.clone()) {
let needs_cover = match node {
None => true,
Some((h, _)) => h < &lsn.end,
};
if needs_cover {
match prev_covered {
true => to_remove.push(*k),
false => to_update.push(*k),
}
}
prev_covered = needs_cover;
}
if !prev_covered {
to_remove.push(key.end);
}
for k in to_update {
self.nodes.insert_mut(k, Some((lsn.end, value.clone())));
}
for k in to_remove {
self.nodes.remove_mut(&k);
}
let mut to_update = Vec::new();
let mut to_remove = Vec::new();
let mut prev_covered = false;
for (k, node) in self.im.range(key.clone()) {
let needs_cover = match node {
None => true,
Some((h, _)) => h < &lsn.end,
};
if needs_cover {
match prev_covered {
true => to_remove.push(*k),
false => to_update.push(*k),
}
}
prev_covered = needs_cover;
}
if !prev_covered {
to_remove.push(key.end);
}
for k in to_update {
self.im.insert(k, Some((lsn.end, value.clone())));
}
for k in to_remove {
self.im.remove(&k);
}
}
/// Get the latest (by lsn.end) layer at a given key
///
/// Complexity: O(log N)
pub fn query(&self, key: i128) -> Option<Value> {
let res = self.nodes
.range(..=key)
.rev()
.next()?
.1
.as_ref()
.map(|(_, v)| v.clone());
let im_res = self.im
.range(..=key)
.rev()
.next()?
.1
.as_ref()
.map(|(_, v)| v.clone());
if res != im_res {
panic!("aha");
}
im_res
}
/// Iterate the changes in layer coverage in a given range. You will likely
/// want to start with self.query(key.start), and then follow up with self.range
///
/// Complexity: O(log N + result_size)
pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
self.nodes
.range(key)
.map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone())))
}
/// O(1) clone
pub fn clone(&self) -> Self {
Self {
nodes: self.nodes.clone(),
im: self.im.clone(),
}
}
}
/// Image and delta coverage at a specific LSN.
pub struct LayerCoverageTuple<Value> {
pub image_coverage: LayerCoverage<Value>,
pub delta_coverage: LayerCoverage<Value>,
}
impl<T: Clone + PartialEq> Default for LayerCoverageTuple<T> {
fn default() -> Self {
Self {
image_coverage: LayerCoverage::default(),
delta_coverage: LayerCoverage::default(),
}
}
}
impl<Value: Clone + PartialEq> LayerCoverageTuple<Value> {
pub fn clone(&self) -> Self {
Self {
image_coverage: self.image_coverage.clone(),
delta_coverage: self.delta_coverage.clone(),
}
}
}
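// Illustration: a minimal, hypothetical sketch of how the change-point map
// evolves when two layers overlap, using integer keys and string values as in
// the tests in historic_layer_coverage.rs.
let mut cov: LayerCoverage<String> = LayerCoverage::new();
cov.insert(0..10, 100..110, "A".to_string());
// Change points are now: 0 -> Some((110, "A")), 10 -> None
cov.insert(5..15, 100..120, "B".to_string());
// "B" is taller where the two overlap, so it takes over starting at key 5:
// 0 -> Some((110, "A")), 5 -> Some((120, "B")), 15 -> None
assert_eq!(cov.query(3), Some("A".to_string()));
assert_eq!(cov.query(7), Some("B".to_string()));
assert_eq!(cov.query(15), None);
// range() yields the change points that fall inside a key range.
assert_eq!(cov.range(0..20).count(), 3);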

pageserver/src/tenant/storage_layer.rs

@@ -187,6 +187,12 @@ pub trait PersistentLayer: Layer {
fn file_size(&self) -> Option<u64>;
}
impl PartialEq for dyn PersistentLayer {
fn eq(&self, other: &Self) -> bool {
self.filename().eq(&other.filename())
}
}
pub fn downcast_remote_layer(
layer: &Arc<dyn PersistentLayer>,
) -> Option<std::sync::Arc<RemoteLayer>> {
@@ -196,3 +202,56 @@ pub fn downcast_remote_layer(
None
}
}
impl std::fmt::Debug for dyn Layer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Layer")
.field("short_id", &self.short_id())
.finish()
}
}
/// Holds metadata about a layer without any content. Used mostly for testing.
pub struct LayerDescriptor {
pub key: Range<Key>,
pub lsn: Range<Lsn>,
pub is_incremental: bool,
pub short_id: String,
}
impl PartialEq for LayerDescriptor {
fn eq(&self, other: &Self) -> bool {
self.short_id == other.short_id
}
}
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()
}
fn get_lsn_range(&self) -> Range<Lsn> {
self.lsn.clone()
}
fn is_incremental(&self) -> bool {
self.is_incremental
}
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn short_id(&self) -> String {
self.short_id.clone()
}
fn dump(&self, _verbose: bool) -> Result<()> {
todo!()
}
}

pageserver/src/tenant/timeline.rs

@@ -1148,6 +1148,7 @@ impl Timeline {
}
}
layers.rebuild_index();
layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1);
info!(
@@ -1210,7 +1211,11 @@ impl Timeline {
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.resident_physical_size_gauge.sub(local_size);
self.layers.write().unwrap().remove_historic(local_layer);
{
let mut layers = self.layers.write().unwrap();
layers.remove_historic(local_layer);
layers.rebuild_index();
}
// fall-through to adding the remote layer
}
} else {
@@ -1252,7 +1257,11 @@ impl Timeline {
);
let remote_layer = Arc::new(remote_layer);
self.layers.write().unwrap().insert_historic(remote_layer);
{
let mut layers = self.layers.write().unwrap();
layers.insert_historic(remote_layer);
layers.rebuild_index();
}
}
LayerFileName::Delta(deltafilename) => {
// Create a RemoteLayer for the delta file.
@@ -1275,7 +1284,11 @@ impl Timeline {
&remote_layer_metadata,
);
let remote_layer = Arc::new(remote_layer);
self.layers.write().unwrap().insert_historic(remote_layer);
{
let mut layers = self.layers.write().unwrap();
layers.insert_historic(remote_layer);
layers.rebuild_index();
}
}
#[cfg(test)]
LayerFileName::Test(_) => unreachable!(),
@@ -2185,6 +2198,7 @@ impl Timeline {
{
let mut layers = self.layers.write().unwrap();
layers.insert_historic(Arc::new(new_delta));
layers.rebuild_index();
}
// update the timeline's physical size
@@ -2249,13 +2263,15 @@ impl Timeline {
// are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
// after we read last_record_lsn, which is passed here in the 'lsn' argument.
if img_lsn < lsn {
let num_deltas = layers.count_deltas(&img_range, &(img_lsn..lsn))?;
let threshold = self.get_image_creation_threshold();
let num_deltas =
layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
debug!(
"key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
img_range.start, img_range.end, num_deltas, img_lsn, lsn
);
if num_deltas >= self.get_image_creation_threshold() {
if num_deltas >= threshold {
return Ok(true);
}
}
@@ -2360,6 +2376,7 @@ impl Timeline {
.add(metadata.len());
layers.insert_historic(Arc::new(l));
}
layers.rebuild_index();
drop(layers);
timer.stop_and_record();
@@ -2691,6 +2708,7 @@ impl Timeline {
l.delete()?;
layers.remove_historic(l);
}
layers.rebuild_index();
drop(layers);
// Also schedule the deletions in remote storage
@@ -3004,6 +3022,7 @@ impl Timeline {
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
}
}
layers.rebuild_index();
info!(
"GC completed removing {} layers, cutoff {}",
@@ -3163,6 +3182,7 @@ impl Timeline {
layers.remove_historic(l);
}
layers.insert_historic(new_layer);
layers.rebuild_index();
drop(layers);
// Now that we've inserted the download into the layer map,

test_runner/performance/test_perf_pgbench.py

@@ -172,8 +172,10 @@ def get_scales_matrix(default: int = 10) -> List[int]:
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
# run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
# run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
# The following 3 tests run on an existing database as it was set up by previous tests,

workspace_hack/Cargo.toml

@@ -25,6 +25,7 @@ futures-task = { version = "0.3", default-features = false, features = ["alloc",
futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10", features = ["use_alloc", "use_std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
memchr = { version = "2", features = ["std"] }
@@ -53,6 +54,7 @@ bytes = { version = "1", features = ["serde", "std"] }
either = { version = "1", features = ["use_std"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10", features = ["use_alloc", "use_std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
memchr = { version = "2", features = ["std"] }