Compare commits

...

22 Commits

Author SHA1 Message Date
Bojan Serafimov c50e13d9d5 Use im 2022-12-13 21:11:57 -05:00
Bojan Serafimov db72f432e5 Remove 2/3 clones on search path 2022-12-13 13:42:53 -05:00
Bojan Serafimov 5a6284f6f8 Remove Arc<Vec<_>> 2022-12-13 13:16:13 -05:00
Bojan Serafimov c11c19d568 Check for exact match 2022-12-12 20:25:24 -05:00
Bojan Serafimov d9a239475c Fix off-by-one 2022-12-12 16:31:22 -05:00
Bojan Serafimov a4311bd961 Implement deletion 2022-12-12 16:26:36 -05:00
Bojan Serafimov e0f23242be Add todo 2022-12-12 14:00:04 -05:00
Bojan Serafimov 0de1ec14e0 Add error context 2022-12-12 13:55:56 -05:00
Bojan Serafimov 0053ccac13 Add live correctness comparison 2022-12-12 13:48:54 -05:00
Bojan Serafimov b6686d63b0 Add layers.rebuild_index() calls 2022-12-12 13:26:52 -05:00
Bojan Serafimov edf8a08dcc Implement lsn_floor logic (untested) 2022-12-12 13:09:54 -05:00
Bojan Serafimov b51d766a44 Add tests 2022-12-12 10:57:03 -05:00
Bojan Serafimov c9b1655885 Remove sorting hacks 2022-12-12 10:18:01 -05:00
Bojan Serafimov 206ddec636 Add fn rebuild_index() 2022-12-09 23:51:16 -05:00
Bojan Serafimov cb44698c47 Test and implement lsn.end queries 2022-12-09 17:47:22 -05:00
Bojan Serafimov af325a30db Update api to use Range 2022-12-09 15:53:03 -05:00
Bojan Serafimov be499156e5 Add note 2022-12-08 11:28:41 -05:00
Bojan Serafimov 4a11f6e2de WIP sketch for considering lsn_end 2022-12-08 11:17:46 -05:00
Bojan Serafimov d9190aae87 WIP rebuild API 2022-12-07 18:51:08 -05:00
Bojan Serafimov 6bce11e810 Hacky first attempt to integrate 2022-12-06 14:14:27 -05:00
Bojan Serafimov 39003aa9f3 Simplify 2022-12-02 21:26:13 -05:00
Bojan Serafimov a8f0d27c92 Benchmark immutable bst layer map 2022-12-02 21:22:09 -05:00
7 changed files with 660 additions and 22 deletions

Cargo.lock (generated)

@@ -66,6 +66,15 @@ dependencies = [
"backtrace",
]
[[package]]
name = "archery"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a8da9bc4c4053ee067669762bcaeea6e241841295a2b6c948312dad6ef4cc02"
dependencies = [
"static_assertions",
]
[[package]]
name = "arrayvec"
version = "0.7.2"
@@ -626,6 +635,15 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitmaps"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "031043d04099746d8db04daf1fa424b2bc8bd69d92b25962dcde24da39ab64a2"
dependencies = [
"typenum",
]
[[package]]
name = "block-buffer"
version = "0.10.3"
@@ -1817,6 +1835,20 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "im"
version = "15.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0acd33ff0285af998aaf9b57342af478078f53492322fafc47450e09397e0e9"
dependencies = [
"bitmaps",
"rand_core",
"rand_xoshiro",
"sized-chunks",
"typenum",
"version_check",
]
[[package]]
name = "indexmap"
version = "1.9.1"
@@ -2328,6 +2360,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper",
"im",
"itertools",
"metrics",
"nix 0.25.0",
@@ -2345,6 +2378,7 @@ dependencies = [
"rand",
"regex",
"remote_storage",
"rpds",
"rstar",
"scopeguard",
"serde",
@@ -2953,6 +2987,15 @@ dependencies = [
"rand_core",
]
[[package]]
name = "rand_xoshiro"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa"
dependencies = [
"rand_core",
]
[[package]]
name = "rayon"
version = "1.5.3"
@@ -3154,6 +3197,15 @@ dependencies = [
"regex",
]
[[package]]
name = "rpds"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66262ea963eff99163e6b741fbc3417a52cc13074728c1047e9911789df9b000"
dependencies = [
"archery",
]
[[package]]
name = "rstar"
version = "0.9.3"
@@ -3578,6 +3630,16 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "sized-chunks"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d69225bde7a69b235da73377861095455d298f2b970996eec25ddbb42b3d1e"
dependencies = [
"bitmaps",
"typenum",
]
[[package]]
name = "slab"
version = "0.4.7"
@@ -3624,6 +3686,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "storage_broker"
version = "0.1.0"

pageserver/Cargo.toml

@@ -69,6 +69,8 @@ remote_storage = { path = "../libs/remote_storage" }
tenant_size_model = { path = "../libs/tenant_size_model" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
rpds = "0.12.0"
im = "15.1.0"
[dev-dependencies]
criterion = "0.4"
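Both rpds and im are persistent (immutable) ordered-map crates: cloning a map is O(1) and shares structure, which is what lets the new layer map keep one snapshot of its index per LSN without copying. A minimal sketch of that property with im (the crate the top commit switches to); the names here are illustrative, not from the diff:

    use im::OrdMap;

    let mut head: OrdMap<i128, &str> = OrdMap::new();
    head.insert(0, "Layer 1");
    let snapshot = head.clone(); // O(1): shares structure with `head`
    head.insert(0, "Layer 2");   // copy-on-write; `snapshot` is unaffected
    assert_eq!(snapshot.get(&0), Some(&"Layer 1"));
    assert_eq!(head.get(&0), Some(&"Layer 2"));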

pageserver/benches/bench_layer_map.rs

@@ -1,5 +1,7 @@
use anyhow::Result;
use num_traits::ToPrimitive;
use pageserver::repository::{Key, Value};
use pageserver::tenant::bst_layer_map::RetroactiveLayerMap;
use pageserver::tenant::filename::{DeltaFileName, ImageFileName};
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::Layer;
@@ -178,6 +180,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
panic!("unexpected filename {fname}");
}
}
layer_map.rebuild_index();
println!("min: {min_lsn}, max: {max_lsn}");
@@ -243,23 +246,52 @@ fn bench_from_captest_env(c: &mut Criterion) {
// too long processing layer map queries.
fn bench_from_real_project(c: &mut Criterion) {
// TODO consider compressing this file
// Init layer map
let now = Instant::now();
let layer_map = build_layer_map(PathBuf::from("benches/odd-brook-layernames.txt"));
println!("Finished layer map init in {:?}", now.elapsed());
// Init bst layer map with the same layers
let now = Instant::now();
let mut bstlm = RetroactiveLayerMap::new();
for layer in layer_map.iter_historic_layers() {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
bstlm.insert(
kr.start.to_i128()..kr.end.to_i128(),
lr.start.0..lr.end.0,
format!("Layer {}", lr.start.0),
);
}
bstlm.rebuild();
println!("Finished bst init in {:?}", now.elapsed());
// Choose uniformly distributed queries
let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map);
// Test with uniform query pattern
c.bench_function("real_map_uniform_queries", |b| {
// Define and name the benchmark function
let mut group = c.benchmark_group("real_map_uniform_queries");
group.bench_function("current_code", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
layer_map.search(q.0, q.1).unwrap();
}
});
});
group.bench_function("persistent_bst", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
bstlm.query(q.0.to_i128(), q.1 .0);
}
});
});
group.finish();
}
// Benchmark using synthetic data. Arrange image layers on stacked diagonal lines.
fn bench_sequential(c: &mut Criterion) {
let mut layer_map = LayerMap::default();
// Init layer map. Create 100_000 layers arranged in 1000 diagonal lines.
//
// TODO This code is pretty slow and runs even if we're only running other
@@ -267,39 +299,59 @@ fn bench_sequential(c: &mut Criterion) {
// Putting it inside the `bench_function` closure is not a solution
// because then it runs multiple times during warmup.
let now = Instant::now();
let mut layer_map = LayerMap::default();
for i in 0..100_000 {
// TODO try inserting a super-wide layer in between every 10 to reflect
// what often happens with L1 layers that include non-rel changes.
// Maybe do that as a separate test.
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = DummyImage {
key_range: zero.add(10 * i32)..zero.add(10 * i32 + 1),
lsn: Lsn(10 * i),
lsn: Lsn(i),
};
layer_map.insert_historic(Arc::new(layer));
}
layer_map.rebuild_index();
println!("Finished layer map init in {:?}", now.elapsed());
// Manually measure runtime without criterion because criterion
// has a minimum sample size of 10 and I don't want to run it 10 times.
println!("Finished init in {:?}", now.elapsed());
// Init bst layer map with the same layers
let now = Instant::now();
let mut bstlm = RetroactiveLayerMap::new();
for layer in layer_map.iter_historic_layers() {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
bstlm.insert(
kr.start.to_i128()..kr.end.to_i128(),
lr.start.0..lr.end.0,
format!("Layer {}", lr.start.0),
);
}
bstlm.rebuild();
println!("Finished bst init in {:?}", now.elapsed());
// Choose 100 uniformly random queries
let rng = &mut StdRng::seed_from_u64(1);
let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map)
.choose_multiple(rng, 1)
.choose_multiple(rng, 100)
.copied()
.collect();
// Define and name the benchmark function
c.bench_function("sequential_uniform_queries", |b| {
// Run the search queries
let mut group = c.benchmark_group("sequential_uniform_queries");
group.bench_function("current_code", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
layer_map.search(q.0, q.1).unwrap();
}
});
});
group.bench_function("persistent_bst", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
bstlm.query(q.0.to_i128(), q.1 .0);
}
});
});
group.finish();
}
criterion_group!(group_1, bench_from_captest_env);
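One detail in the benchmark bodies above: `queries.clone()` executes inside the timed closure, so the Vec clone is measured on every sample. Both arms pay the same cost, so the comparison stays fair, but a variant that keeps the clone out of the measurement is possible; a sketch, assuming Key and Lsn are Copy (as their by-value use above suggests):

    group.bench_function("current_code", |b| {
        b.iter(|| {
            // Iterate by reference so the Vec is not cloned per sample.
            for q in queries.iter() {
                layer_map.search(q.0, q.1).unwrap();
            }
        });
    });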

pageserver/src/tenant.rs

@@ -73,6 +73,7 @@ use utils::{
mod blob_io;
pub mod block_io;
pub mod bst_layer_map;
mod delta_layer;
mod disk_btree;
pub(crate) mod ephemeral_file;

pageserver/src/tenant/bst_layer_map.rs (new file)

@@ -0,0 +1,407 @@
use std::collections::BTreeMap;
use std::ops::Range;
// TODO drop rpds. So far `im` looks 30% faster.
use rpds::RedBlackTreeMapSync;
use im::OrdMap;
/// Layer map implemented using persistent/immutable binary search tree.
/// It supports historical queries, but no retroactive inserts. For that
/// see RetroactiveLayerMap.
///
/// Layer type is abstracted as Value to make unit testing easier.
pub struct PersistentLayerMap<Value> {
/// Maps each key boundary to the latest layer (if any) covering the
/// range from that key up to the next key in the map.
/// We use the Sync version of the map because we want Self to
/// be Sync.
///
/// TODO Separate Head into its own struct LatestLayerMap
/// TODO Merge historic with retroactive, into HistoricLayerMap
/// TODO Maintain a pair of heads, one for images, one for deltas.
/// This way we can query both of them with one BTreeMap query.
head: OrdMap<i128, Option<(u64, Value)>>,
/// All previous states of `self.head`
///
/// TODO: Sorted Vec + binary search could be slightly faster.
historic: BTreeMap<u64, OrdMap<i128, Option<(u64, Value)>>>,
}
impl<Value: std::fmt::Debug> std::fmt::Debug for PersistentLayerMap<Value> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let head_vec: Vec<_> = self.head.iter().collect();
write!(f, "PersistentLayerMap: head: {:?}", head_vec)
}
}
impl<T: Clone> Default for PersistentLayerMap<T> {
fn default() -> Self {
Self::new()
}
}
impl<Value: Clone> PersistentLayerMap<Value> {
pub fn new() -> Self {
Self {
head: OrdMap::default(),
historic: BTreeMap::default(),
}
}
/// Helper function to subdivide the key range without changing any values
fn add_node(self: &mut Self, key: i128) {
let value = match self.head.range(0..=key).last() {
Some((_, Some(v))) => Some(v.clone()),
Some((_, None)) => None,
None => None,
};
self.head.insert(key, value);
}
pub fn insert(self: &mut Self, key: Range<i128>, lsn: Range<u64>, value: Value) {
// It's only a persistent map, not a retroactive one
if let Some(last_entry) = self.historic.iter().rev().next() {
let last_lsn = last_entry.0;
if lsn.start == *last_lsn {
// TODO there are edge cases to take care of
}
if lsn.start < *last_lsn {
panic!("unexpected retroactive insert");
}
}
// NOTE The order of the following lines is important!!
// Add nodes at endpoints
self.add_node(key.start);
self.add_node(key.end);
// Raise the height where necessary
//
// NOTE This loop is worst case O(N), but amortized O(log N) in the special
// case when rectangles have no height. In practice I don't think we'll see
// the kind of layer intersections needed to trigger O(N) behavior. If we
// do it can be fixed using lazy propagation.
let mut to_update = Vec::new();
let mut to_remove = Vec::new();
let mut prev_covered = false;
for (k, node) in self.head.range(key.clone()) {
let needs_cover = match node {
None => true,
Some((h, _)) => h < &lsn.end,
};
if needs_cover {
match prev_covered {
true => to_remove.push(k.clone()),
false => to_update.push(k.clone()),
}
}
prev_covered = needs_cover;
}
if !prev_covered {
to_remove.push(key.end);
}
for k in to_update {
self.head
.insert(k.clone(), Some((lsn.end.clone(), value.clone())));
}
for k in to_remove {
self.head.remove(&k);
}
// Remember history. Clone is O(1)
self.historic.insert(lsn.start, self.head.clone());
}
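To make the staircase bookkeeping concrete, here is a worked trace of `head` through two inserts, following the code above. Each entry maps a key boundary to the covering layer's `(lsn.end, value)`, or `None` where nothing covers:

    // Start: head = {}
    // insert(2..5, 10..20, "A"):
    //   add_node(2), add_node(5)            -> {2: None, 5: None}
    //   key 2 is uncovered, so cover it     -> {2: Some((20, "A")), 5: None}
    // insert(4..8, 30..40, "B"):
    //   add_node(4) copies "A", add_node(8) -> {2: A, 4: A, 5: None, 8: None}
    //   key 4 re-covered; key 5 is now interior and removed
    //                                       -> {2: Some((20, "A")),
    //                                           4: Some((40, "B")), 8: None}
    // query(3, 35): snapshot at lsn 30, get_prev(3) -> key 2 -> Some("A")
    // query(6, 15): snapshot at lsn 10, get_prev(6) -> key 5 -> None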
pub fn query(self: &Self, key: i128, lsn: u64) -> Option<Value> {
let version = self.historic.range(0..=lsn).rev().next()?.1;
version
// NOTE The canonical way to do this in other crates is
// `.range(0..=key).rev().next()`, and `im` supports that
// API too, but it's 2x slower than `.get_prev(&key)`.
.get_prev(&key)?
.1
.as_ref()
.map(|(_, v)| v.clone())
}
pub fn trim(self: &mut Self, begin: &u64) {
self.historic.split_off(begin);
self.head = self
.historic
.iter()
.rev()
.next()
.map(|(_, v)| v.clone())
.unwrap_or_default();
}
}
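Note the split_off semantics in trim: `self.historic.split_off(begin)` returns (and discards) everything at or above `begin`, so the map keeps only strictly older snapshots, and `head` rewinds to the newest survivor. For example:

    // historic = {100: S100, 110: S110, 120: S120}
    // trim(&110) -> historic = {100: S100}, head = S100
    // trim(&100) -> historic = {},          head = OrdMap::default()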
/// Basic test for the immutable bst library, just to show usage.
#[test]
fn test_immutable_bst_dependency() {
let map = RedBlackTreeMapSync::<i32, i32>::default();
let mut v1 = map.clone();
let v2 = map.insert(1, 5);
// We can query current and past versions of key 1
assert_eq!(v1.get(&1), None);
assert_eq!(v2.get(&1), Some(&5));
// We can mutate old state, but it creates a branch.
// It doesn't retroactively change future versions.
v1.insert_mut(2, 6);
assert_eq!(v1.get(&2), Some(&6));
assert_eq!(v2.get(&2), None);
}
/// This is the most basic test that demonstrates intended usage.
/// All layers in this test have height 1.
#[test]
fn test_persistent_simple() {
let mut map = PersistentLayerMap::<String>::new();
map.insert(0..5, 100..101, "Layer 1".to_string());
map.insert(3..9, 110..111, "Layer 2".to_string());
map.insert(5..6, 120..121, "Layer 3".to_string());
// After Layer 1 insertion
assert_eq!(map.query(1, 105), Some("Layer 1".to_string()));
assert_eq!(map.query(4, 105), Some("Layer 1".to_string()));
// After Layer 2 insertion
assert_eq!(map.query(4, 115), Some("Layer 2".to_string()));
assert_eq!(map.query(8, 115), Some("Layer 2".to_string()));
assert_eq!(map.query(11, 115), None);
// After Layer 3 insertion
assert_eq!(map.query(4, 125), Some("Layer 2".to_string()));
assert_eq!(map.query(5, 125), Some("Layer 3".to_string()));
assert_eq!(map.query(7, 125), Some("Layer 2".to_string()));
}
/// Cover simple off-by-one edge cases
#[test]
fn test_off_by_one() {
let mut map = PersistentLayerMap::<String>::new();
map.insert(3..5, 100..110, "Layer 1".to_string());
// Check different LSNs
assert_eq!(map.query(4, 99), None);
assert_eq!(map.query(4, 100), Some("Layer 1".to_string()));
// Check different keys
assert_eq!(map.query(2, 105), None);
assert_eq!(map.query(3, 105), Some("Layer 1".to_string()));
assert_eq!(map.query(4, 105), Some("Layer 1".to_string()));
assert_eq!(map.query(5, 105), None);
}
/// Cover edge cases where layers begin or end on the same key
#[test]
fn test_key_collision() {
let mut map = PersistentLayerMap::<String>::new();
map.insert(3..5, 100..110, "Layer 10".to_string());
map.insert(5..8, 100..110, "Layer 11".to_string());
map.insert(3..4, 200..210, "Layer 20".to_string());
// Check after layer 11
assert_eq!(map.query(2, 105), None);
assert_eq!(map.query(3, 105), Some("Layer 10".to_string()));
assert_eq!(map.query(5, 105), Some("Layer 11".to_string()));
assert_eq!(map.query(7, 105), Some("Layer 11".to_string()));
assert_eq!(map.query(8, 105), None);
// Check after layer 20
assert_eq!(map.query(2, 205), None);
assert_eq!(map.query(3, 205), Some("Layer 20".to_string()));
assert_eq!(map.query(5, 205), Some("Layer 11".to_string()));
assert_eq!(map.query(7, 205), Some("Layer 11".to_string()));
assert_eq!(map.query(8, 205), None);
}
/// Test when rectangles have nontrivial height and possibly overlap
#[test]
fn test_persistent_overlapping() {
let mut map = PersistentLayerMap::<String>::new();
// Add 3 key-disjoint layers with varying LSN ranges
map.insert(1..2, 100..200, "Layer 1".to_string());
map.insert(4..5, 110..200, "Layer 2".to_string());
map.insert(7..8, 120..300, "Layer 3".to_string());
// Add wide and short layer
map.insert(0..9, 130..199, "Layer 4".to_string());
// Add wide layer taller than some
map.insert(0..9, 140..201, "Layer 5".to_string());
// Add wide layer taller than all
map.insert(0..9, 150..301, "Layer 6".to_string());
// After layer 4 insertion
assert_eq!(map.query(0, 135), Some("Layer 4".to_string()));
assert_eq!(map.query(1, 135), Some("Layer 1".to_string()));
assert_eq!(map.query(2, 135), Some("Layer 4".to_string()));
assert_eq!(map.query(4, 135), Some("Layer 2".to_string()));
assert_eq!(map.query(5, 135), Some("Layer 4".to_string()));
assert_eq!(map.query(7, 135), Some("Layer 3".to_string()));
assert_eq!(map.query(8, 135), Some("Layer 4".to_string()));
// After layer 5 insertion
assert_eq!(map.query(0, 145), Some("Layer 5".to_string()));
assert_eq!(map.query(1, 145), Some("Layer 5".to_string()));
assert_eq!(map.query(2, 145), Some("Layer 5".to_string()));
assert_eq!(map.query(4, 145), Some("Layer 5".to_string()));
assert_eq!(map.query(5, 145), Some("Layer 5".to_string()));
assert_eq!(map.query(7, 145), Some("Layer 3".to_string()));
assert_eq!(map.query(8, 145), Some("Layer 5".to_string()));
// After layer 6 insertion
assert_eq!(map.query(0, 155), Some("Layer 6".to_string()));
assert_eq!(map.query(1, 155), Some("Layer 6".to_string()));
assert_eq!(map.query(2, 155), Some("Layer 6".to_string()));
assert_eq!(map.query(4, 155), Some("Layer 6".to_string()));
assert_eq!(map.query(5, 155), Some("Layer 6".to_string()));
assert_eq!(map.query(7, 155), Some("Layer 6".to_string()));
assert_eq!(map.query(8, 155), Some("Layer 6".to_string()));
}
/// Layer map that supports:
/// - efficient historical queries
/// - efficient append only updates
/// - tombstones and similar methods for non-latest updates
/// - compaction/rebuilding to remove tombstones
///
/// See this for better retroactive techniques we can try
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
///
/// Layer type is abstracted as Value to make unit testing easier.
pub struct RetroactiveLayerMap<Value> {
/// A persistent layer map that we rebuild when we need to retroactively update it
map: PersistentLayerMap<Value>,
/// We buffer insertions into the PersistentLayerMap to decrease the number of rebuilds.
/// A value of None means we want to delete this item.
buffer: BTreeMap<(u64, u64, i128, i128), Option<Value>>,
/// All current layers. This is not used for search. Only to make rebuilds easier.
layers: BTreeMap<(u64, u64, i128, i128), Value>,
}
impl<Value: std::fmt::Debug> std::fmt::Debug for RetroactiveLayerMap<Value> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "RetroactiveLayerMap: head: {:?}", self.map)
}
}
impl<T: Clone> Default for RetroactiveLayerMap<T> {
fn default() -> Self {
Self::new()
}
}
impl<Value: Clone> RetroactiveLayerMap<Value> {
pub fn new() -> Self {
Self {
map: PersistentLayerMap::<Value>::new(),
buffer: BTreeMap::new(),
layers: BTreeMap::new(),
}
}
pub fn insert(self: &mut Self, key: Range<i128>, lsn: Range<u64>, value: Value) {
self.buffer.insert(
(lsn.start, lsn.end, key.start, key.end),
Some(value.clone()),
);
}
pub fn remove(self: &mut Self, key: Range<i128>, lsn: Range<u64>) {
self.buffer
.insert((lsn.start, lsn.end, key.start, key.end), None);
}
pub fn rebuild(self: &mut Self) {
// Find the first LSN that needs to be rebuilt
let rebuild_since: u64 = match self.buffer.iter().next() {
Some(((lsn_start, _, _, _), _)) => lsn_start.clone(),
None => return, // No need to rebuild if buffer is empty
};
// Apply buffered updates to self.layers
self.buffer.retain(|rect, layer| {
match layer {
Some(l) => {
let existing = self.layers.insert(rect.clone(), l.clone());
if existing.is_some() {
panic!("can't overwrite layer");
}
}
None => {
let existing = self.layers.remove(rect);
if existing.is_none() {
panic!("invalid layer deletion");
}
}
};
false
});
// Rebuild
self.map.trim(&rebuild_since);
for ((lsn_start, lsn_end, key_start, key_end), layer) in
self.layers.range((rebuild_since, 0, 0, 0)..)
{
self.map
.insert(*key_start..*key_end, *lsn_start..*lsn_end, layer.clone());
}
}
pub fn clear(self: &mut Self) {
self.map.trim(&0);
}
pub fn query(self: &Self, key: i128, lsn: u64) -> Option<Value> {
if !self.buffer.is_empty() {
panic!("rebuild pls")
}
self.map.query(key, lsn)
}
}
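rebuild leans on the lexicographic ordering of the `(lsn_start, lsn_end, key_start, key_end)` tuples: the first buffered entry has the minimal lsn_start, so everything from that LSN onward gets trimmed and replayed from `self.layers`. A trace using the layers from test_retroactive_simple below:

    // After the out-of-order insert of "Delta 1":
    //   buffer = {(105, 106, 2, 5): Some("Delta 1")}
    // rebuild():
    //   rebuild_since = 105; buffer drains into layers
    //   map.trim(&105) keeps only the lsn-100 snapshot ("Image 1")
    //   replay layers.range((105, 0, 0, 0)..):
    //     "Delta 1" (105..106), "Image 2" (110..111),
    //     "Image 3" (120..121), "Image 4" (120..121)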
#[test]
fn test_retroactive_simple() {
let mut map = RetroactiveLayerMap::new();
// Append some images in increasing LSN order
map.insert(0..5, 100..101, "Image 1".to_string());
map.insert(3..9, 110..111, "Image 2".to_string());
map.insert(4..6, 120..121, "Image 3".to_string());
map.insert(8..9, 120..121, "Image 4".to_string());
// Add a delta layer out of order
map.insert(2..5, 105..106, "Delta 1".to_string());
// Rebuild so we can start querying
map.rebuild();
// Query key 4
assert_eq!(map.query(4, 90), None);
assert_eq!(map.query(4, 102), Some("Image 1".to_string()));
assert_eq!(map.query(4, 107), Some("Delta 1".to_string()));
assert_eq!(map.query(4, 115), Some("Image 2".to_string()));
assert_eq!(map.query(4, 125), Some("Image 3".to_string()));
// Remove Image 3
map.remove(4..6, 120..121);
map.rebuild();
// Check deletion worked
assert_eq!(map.query(4, 125), Some("Image 2".to_string()));
assert_eq!(map.query(8, 125), Some("Image 4".to_string()));
}

pageserver/src/tenant/layer_map.rs

@@ -28,6 +28,8 @@ use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;
use super::bst_layer_map::RetroactiveLayerMap;
///
/// LayerMap tracks what layers exist on a timeline.
///
@@ -55,6 +57,11 @@ pub struct LayerMap {
/// All the historic layers are kept here
historic_layers: RTree<LayerRTreeObject>,
/// HACK I'm experimenting with a new index to replace the RTree. If this
/// works out I'll clean up the struct later.
index: RetroactiveLayerMap<Arc<dyn Layer>>,
images: RetroactiveLayerMap<Arc<dyn Layer>>,
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<dyn Layer>>,
@@ -241,6 +248,65 @@ impl LayerMap {
/// layer.
///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Result<Option<SearchResult>> {
let old = self.search_old(key, end_lsn)?;
let new = self.search_new(key, end_lsn)?;
match (&old, &new) {
(None, None) => {}
(None, Some(_)) => panic!("returned Some, expected None"),
(Some(_), None) => panic!("returned None, expected Some"),
(Some(old), Some(new)) => {
// TODO be more verbose and flexible
let context = format!("query: key {}, end_lsn: {}", key, end_lsn);
assert_eq!(old.layer.filename(), new.layer.filename(), "{}", context);
assert_eq!(old.lsn_floor, new.lsn_floor, "{}", context);
}
}
return Ok(new);
}
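This wrapper is a shadow-read migration: every query is answered by the new index while the old R-tree result is computed alongside and asserted equal, so any divergence shows up immediately in tests and benchmarks instead of as silent corruption later. The same pattern in isolation, as a hypothetical helper that is not part of the diff:

    // Run both implementations and assert agreement before trusting the new one.
    fn shadow_read<T: PartialEq + std::fmt::Debug>(
        old: impl FnOnce() -> T,
        new: impl FnOnce() -> T,
    ) -> T {
        let old_result = old();
        let new_result = new();
        assert_eq!(old_result, new_result, "old and new disagree");
        new_result
    }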
// HACK just testing correctness
fn search_new(&self, key: Key, end_lsn: Lsn) -> Result<Option<SearchResult>> {
// TODO I'm making two separate queries, which is 2x the cost, but that
// can be avoided in various ways. Caching latest_image queries is
// probably the simplest, but combining the two data structures
// might be better.
let latest_layer = self.index.query(key.to_i128(), end_lsn.0 - 1);
let latest_image = self.images.query(key.to_i128(), end_lsn.0 - 1);
// Check for exact match
let latest_image = if let Some(image) = latest_image {
let img_lsn = image.get_lsn_range().start;
if Lsn(img_lsn.0 + 1) == end_lsn {
return Ok(Some(SearchResult {
layer: image,
lsn_floor: img_lsn,
}));
}
// HACK just to give back ownership of latest_image to parent scope.
// There's definitely a cleaner way to do it.
Some(image)
} else {
None
};
return Ok(latest_layer.map(|layer| {
// Compute lsn_floor
let mut lsn_floor = layer.get_lsn_range().start;
if let Some(image) = latest_image {
if layer.is_incremental() {
lsn_floor = std::cmp::max(lsn_floor, image.get_lsn_range().start + 1)
}
}
SearchResult {
layer,
lsn_floor,
}
}));
}
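For intuition on the lsn_floor computation: reconstructing a page needs WAL only from just above the latest image of that page. If the winning delta layer spans LSNs 100..200 but an image of the key exists at LSN 150, replay starts at 151. A sketch with hypothetical numbers:

    // Hypothetical LSNs mirroring the rule above:
    let delta_lsn_start: u64 = 100; // delta layer covers lsn 100..200
    let image_lsn: u64 = 150;       // latest image below end_lsn
    let lsn_floor = std::cmp::max(delta_lsn_start, image_lsn + 1);
    assert_eq!(lsn_floor, 151);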
// HACK just testing correctness
fn search_old(&self, key: Key, end_lsn: Lsn) -> Result<Option<SearchResult>> {
// linear search
// Find the latest image layer that covers the given key
let mut latest_img: Option<Arc<dyn Layer>> = None;
@@ -345,19 +411,50 @@ impl LayerMap {
/// Insert an on-disk layer
///
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
self.index.insert(
kr.start.to_i128()..kr.end.to_i128(),
lr.start.0..lr.end.0,
Arc::clone(&layer),
);
if !layer.is_incremental() {
self.images.insert(
kr.start.to_i128()..kr.end.to_i128(),
lr.start.0..lr.end.0,
Arc::clone(&layer),
);
}
if layer.get_key_range() == (Key::MIN..Key::MAX) {
self.l0_delta_layers.push(layer.clone());
}
// TODO remove this so insert isn't slow. I need it for now for iter_historic()
self.historic_layers.insert(LayerRTreeObject::new(layer));
NUM_ONDISK_LAYERS.inc();
}
/// Must be called after a batch of insert_historic calls, before querying
pub fn rebuild_index(&mut self) {
self.index.rebuild();
self.images.rebuild();
}
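A minimal sketch of the calling pattern this contract implies; `new_layers`, `key` and `end_lsn` are placeholders, not names from the diff:

    let mut layers = LayerMap::default();
    for layer in new_layers {
        layers.insert_historic(layer);
    }
    layers.rebuild_index(); // one rebuild amortized over the whole batch
    let result = layers.search(key, end_lsn)?;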
///
/// Remove an on-disk layer from the map.
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer: Arc<dyn Layer>) {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
self.index
.remove(kr.start.to_i128()..kr.end.to_i128(), lr.start.0..lr.end.0);
if !layer.is_incremental() {
self.images
.remove(kr.start.to_i128()..kr.end.to_i128(), lr.start.0..lr.end.0);
}
if layer.get_key_range() == (Key::MIN..Key::MAX) {
let len_before = self.l0_delta_layers.len();
@@ -586,3 +683,5 @@ impl LayerMap {
Ok(())
}
}
// TODO add layer map tests

pageserver/src/tenant/timeline.rs

@@ -931,6 +931,7 @@ impl Timeline {
trace!("found layer {}", layer.filename().display());
total_physical_size += layer.path().metadata()?.len();
layers.insert_historic(Arc::new(layer));
layers.rebuild_index();
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -955,6 +956,7 @@ impl Timeline {
trace!("found layer {}", layer.filename().display());
total_physical_size += layer.path().metadata()?.len();
layers.insert_historic(Arc::new(layer));
layers.rebuild_index();
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1085,10 +1087,11 @@ impl Timeline {
let image_layer =
ImageLayer::new(self.conf, self.timeline_id, self.tenant_id, &imgfilename);
self.layers
.write()
.unwrap()
.insert_historic(Arc::new(image_layer));
{
let mut layers = self.layers.write().unwrap();
layers.insert_historic(Arc::new(image_layer));
layers.rebuild_index();
}
self.metrics.current_physical_size_gauge.add(sz);
} else if let Some(deltafilename) = DeltaFileName::parse_str(fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1115,10 +1118,11 @@ impl Timeline {
let delta_layer =
DeltaLayer::new(self.conf, self.timeline_id, self.tenant_id, &deltafilename);
self.layers
.write()
.unwrap()
.insert_historic(Arc::new(delta_layer));
{
let mut layers = self.layers.write().unwrap();
layers.insert_historic(Arc::new(delta_layer));
layers.rebuild_index();
}
self.metrics.current_physical_size_gauge.add(sz);
} else {
bail!("unexpected layer filename in remote storage: {}", fname);
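A small but deliberate change in the two hunks above: instead of chaining `.write().unwrap().insert_historic(...)`, an explicit block now holds the write guard so `insert_historic` and `rebuild_index` run under a single lock acquisition, and the guard drops at the closing brace before the physical-size gauge is touched. The same idiom in isolation, with hypothetical names:

    {
        // Guard lives only inside this block.
        let mut guard = shared_state.write().unwrap();
        guard.insert_one_thing();
        guard.rebuild(); // same lock acquisition: readers never see a half-update
    } // guard dropped here; the lock is released
    update_metrics(); // done outside the critical section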
@@ -1811,6 +1815,7 @@ impl Timeline {
{
let mut layers = self.layers.write().unwrap();
layers.insert_historic(Arc::new(new_delta));
layers.rebuild_index();
}
// update the timeline's physical size
@@ -1970,6 +1975,7 @@ impl Timeline {
self.metrics.current_physical_size_gauge.add(metadata.len());
layers.insert_historic(Arc::new(l));
}
layers.rebuild_index();
drop(layers);
timer.stop_and_record();
@@ -2276,6 +2282,7 @@ impl Timeline {
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
layers.insert_historic(Arc::new(l));
layers.rebuild_index();
}
// Now that we have reshuffled the data to set of new delta layers, we can
@@ -2291,6 +2298,7 @@ impl Timeline {
l.delete()?;
layers.remove_historic(l);
}
layers.rebuild_index();
drop(layers);
// Also schedule the deletions in remote storage
@@ -2587,6 +2595,7 @@ impl Timeline {
layers.remove_historic(doomed_layer);
result.layers_removed += 1;
}
layers.rebuild_index();
info!(
"GC completed removing {} layers, cutoff {}",