Implement layer map using immutable BST (#2998)

bojanserafimov
2023-01-20 16:10:12 -05:00
committed by GitHub
parent 36f048d6b0
commit a3d7ad2d52
11 changed files with 1471 additions and 670 deletions

Cargo.lock (generated)

@@ -37,11 +37,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "amplify_num"
version = "0.4.1"
source = "git+https://github.com/rust-amplify/rust-amplify.git?tag=v4.0.0-beta.1#3ad006cf2804e1862ec7725a7684a493f3023523"
[[package]]
name = "android_system_properties"
version = "0.1.5"
@@ -66,6 +61,15 @@ dependencies = [
"backtrace",
]
[[package]]
name = "archery"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a8da9bc4c4053ee067669762bcaeea6e241841295a2b6c948312dad6ef4cc02"
dependencies = [
"static_assertions",
]
[[package]]
name = "asn1-rs"
version = "0.5.1"
@@ -137,15 +141,6 @@ dependencies = [
"syn",
]
[[package]]
name = "atomic-polyfill"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3ff7eb3f316534d83a8a2c3d1674ace8a5a71198eba31e2e2b597833f699b28"
dependencies = [
"critical-section",
]
[[package]]
name = "atty"
version = "0.2.14"
@@ -629,9 +624,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.11.1"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535"
[[package]]
name = "byteorder"
@@ -750,13 +745,13 @@ dependencies = [
[[package]]
name = "clap"
version = "4.0.32"
version = "4.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7db700bc935f9e43e88d00b0850dae18a63773cfbec6d8e070fccf7fef89a39"
checksum = "4ec7a4128863c188deefe750ac1d1dfe66c236909f845af04beed823638dc1b2"
dependencies = [
"bitflags",
"clap_derive",
"clap_lex 0.3.0",
"clap_lex 0.3.1",
"is-terminal",
"once_cell",
"strsim",
@@ -765,9 +760,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.0.21"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0177313f9f02afc995627906bbd8967e2be069f5261954222dac78290c2b9014"
checksum = "684a277d672e91966334af371f1a7b5833f9aa00b07c84e92fbce95e00208ce8"
dependencies = [
"heck",
"proc-macro-error",
@@ -787,9 +782,9 @@ dependencies = [
[[package]]
name = "clap_lex"
version = "0.3.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8"
checksum = "783fe232adfca04f90f56201b26d79682d4cd2625e0bc7290b95123afe558ade"
dependencies = [
"os_str_bytes",
]
@@ -832,7 +827,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"clap 4.0.32",
"clap 4.1.1",
"futures",
"hyper",
"notify",
@@ -887,7 +882,7 @@ name = "control_plane"
version = "0.1.0"
dependencies = [
"anyhow",
"clap 4.0.32",
"clap 4.1.1",
"comfy-table",
"git-version",
"nix",
@@ -988,12 +983,6 @@ dependencies = [
"itertools",
]
[[package]]
name = "critical-section"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52"
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
@@ -1030,12 +1019,11 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
version = "0.8.11"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc"
checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]]
@@ -1506,15 +1494,6 @@ version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "hash32"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67"
dependencies = [
"byteorder",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@@ -1530,19 +1509,6 @@ dependencies = [
"ahash",
]
[[package]]
name = "heapless"
version = "0.7.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db04bc24a18b9ea980628ecf00e6c0264f3c1426dac36c00cb49b6fbad8b0743"
dependencies = [
"atomic-polyfill",
"hash32",
"rustc_version",
"spin 0.9.4",
"stable_deref_trait",
]
[[package]]
name = "heck"
version = "0.4.0"
@@ -1804,9 +1770,9 @@ dependencies = [
[[package]]
name = "io-lifetimes"
version = "1.0.3"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c"
checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e"
dependencies = [
"libc",
"windows-sys",
@@ -1916,12 +1882,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "libm"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
[[package]]
name = "link-cplusplus"
version = "1.0.8"
@@ -2067,9 +2027,9 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "nix"
version = "0.26.1"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46a58d1d356c6597d08cde02c2f09d785b09e28711837b1ed667dc652c08a694"
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
dependencies = [
"bitflags",
"cfg-if",
@@ -2081,9 +2041,9 @@ dependencies = [
[[package]]
name = "nom"
version = "7.1.2"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5507769c4919c998e69e49c839d9dc6e693ede4cc4290d6ad8b41d4f09c548c"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
@@ -2154,7 +2114,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
"libm",
]
[[package]]
@@ -2230,14 +2189,13 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
name = "pageserver"
version = "0.1.0"
dependencies = [
"amplify_num",
"anyhow",
"async-stream",
"async-trait",
"byteorder",
"bytes",
"chrono",
"clap 4.0.32",
"clap 4.1.1",
"close_fds",
"const_format",
"consumption_metrics",
@@ -2269,7 +2227,7 @@ dependencies = [
"regex",
"remote_storage",
"reqwest",
"rstar",
"rpds",
"scopeguard",
"serde",
"serde_json",
@@ -2581,9 +2539,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]]
name = "proc-macro2"
version = "1.0.49"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5"
checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2"
dependencies = [
"unicode-ident",
]
@@ -2683,7 +2641,7 @@ dependencies = [
"bstr",
"bytes",
"chrono",
"clap 4.0.32",
"clap 4.1.1",
"consumption_metrics",
"futures",
"git-version",
@@ -2742,14 +2700,13 @@ dependencies = [
[[package]]
name = "rand"
version = "0.8.4"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
]
[[package]]
@@ -2771,15 +2728,6 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7"
dependencies = [
"rand_core",
]
[[package]]
name = "rayon"
version = "1.6.1"
@@ -2930,7 +2878,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin 0.5.2",
"spin",
"untrusted",
"web-sys",
"winapi",
@@ -2950,14 +2898,12 @@ dependencies = [
]
[[package]]
name = "rstar"
version = "0.9.3"
name = "rpds"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b40f1bfe5acdab44bc63e6699c28b74f75ec43afb59f3eda01e145aff86a25fa"
checksum = "66262ea963eff99163e6b741fbc3417a52cc13074728c1047e9911789df9b000"
dependencies = [
"heapless",
"num-traits",
"smallvec",
"archery",
]
[[package]]
@@ -3018,9 +2964,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.36.6"
version = "0.36.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4feacf7db682c6c329c4ede12649cd36ecab0f3be5b7d74e6a20304725db4549"
checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03"
dependencies = [
"bitflags",
"errno",
@@ -3093,7 +3039,7 @@ dependencies = [
"async-trait",
"byteorder",
"bytes",
"clap 4.0.32",
"clap 4.1.1",
"const_format",
"crc32c",
"fs2",
@@ -3479,21 +3425,6 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09"
dependencies = [
"lock_api",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_assertions"
version = "1.1.0"
@@ -3507,7 +3438,7 @@ dependencies = [
"anyhow",
"async-stream",
"bytes",
"clap 4.0.32",
"clap 4.1.1",
"const_format",
"futures",
"futures-core",
@@ -3639,9 +3570,9 @@ dependencies = [
[[package]]
name = "termcolor"
version = "1.1.3"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6"
dependencies = [
"winapi-util",
]
@@ -3749,9 +3680,9 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.24.1"
version = "1.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d9f76183f91ecfb55e1d7d5602bd1d979e38a3a522fe900241cf195624d67ae"
checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb"
dependencies = [
"autocfg",
"bytes",
@@ -4183,9 +4114,9 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "ureq"
version = "2.6.1"
version = "2.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "733b5ad78377302af52c0dbcb2623d78fe50e4b3bf215948ff29e9ee031d8566"
checksum = "338b31dd1314f68f3aabf3ed57ab922df95ffcd902476ca7ba3c4ce7b908c46d"
dependencies = [
"base64 0.13.1",
"log",
@@ -4287,7 +4218,7 @@ name = "wal_craft"
version = "0.1.0"
dependencies = [
"anyhow",
"clap 4.0.32",
"clap 4.1.1",
"env_logger",
"log",
"once_cell",
@@ -4534,7 +4465,7 @@ dependencies = [
"anyhow",
"bytes",
"chrono",
"clap 4.0.32",
"clap 4.1.1",
"crossbeam-utils",
"either",
"fail",


@@ -69,7 +69,7 @@ rand = "0.8"
regex = "1.4"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
routerify = "3"
rstar = "0.9.3"
rpds = "0.12.0"
rustls = "0.20"
rustls-pemfile = "1"
rustls-split = "0.3"
@@ -107,9 +107,6 @@ x509-parser = "0.14"
env_logger = "0.10"
log = "0.4"
## TODO switch when the new release is made
amplify_num = { git = "https://github.com/rust-amplify/rust-amplify.git", tag = "v4.0.0-beta.1" }
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }


@@ -11,7 +11,6 @@ default = []
testing = ["fail/failpoints"]
[dependencies]
amplify_num.workspace = true
anyhow.workspace = true
async-stream.workspace = true
async-trait.workspace = true
@@ -41,7 +40,6 @@ postgres-protocol.workspace = true
postgres-types.workspace = true
rand.workspace = true
regex.workspace = true
rstar.workspace = true
scopeguard.workspace = true
serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
@@ -68,6 +66,7 @@ tenant_size_model.workspace = true
utils.workspace = true
workspace_hack.workspace = true
reqwest.workspace = true
rpds.workspace = true
[dev-dependencies]
criterion.workspace = true


@@ -1,13 +1,12 @@
use anyhow::Result;
use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::{DeltaFileName, ImageFileName, ValueReconstructState};
use pageserver::tenant::storage_layer::{Layer, ValueReconstructResult};
use pageserver::tenant::storage_layer::Layer;
use pageserver::tenant::storage_layer::{DeltaFileName, ImageFileName, LayerDescriptor};
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::ops::Range;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
@@ -17,102 +16,35 @@ use utils::lsn::Lsn;
use criterion::{criterion_group, criterion_main, Criterion};
struct DummyDelta {
key_range: Range<Key>,
lsn_range: Range<Lsn>,
}
impl Layer for DummyDelta {
fn get_key_range(&self) -> Range<Key> {
self.key_range.clone()
}
fn get_lsn_range(&self) -> Range<Lsn> {
self.lsn_range.clone()
}
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
) -> Result<ValueReconstructResult> {
panic!()
}
fn is_incremental(&self) -> bool {
true
}
fn dump(&self, _verbose: bool) -> Result<()> {
unimplemented!()
}
fn short_id(&self) -> String {
unimplemented!()
}
}
struct DummyImage {
key_range: Range<Key>,
lsn: Lsn,
}
impl Layer for DummyImage {
fn get_key_range(&self) -> Range<Key> {
self.key_range.clone()
}
fn get_lsn_range(&self) -> Range<Lsn> {
// End-bound is exclusive
self.lsn..(self.lsn + 1)
}
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
) -> Result<ValueReconstructResult> {
panic!()
}
fn is_incremental(&self) -> bool {
false
}
fn dump(&self, _verbose: bool) -> Result<()> {
unimplemented!()
}
fn short_id(&self) -> String {
unimplemented!()
}
}
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<dyn Layer> {
let mut layer_map = LayerMap::<dyn Layer>::default();
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
let mut layer_map = LayerMap::<LayerDescriptor>::default();
let mut min_lsn = Lsn(u64::MAX);
let mut max_lsn = Lsn(0);
let filenames = BufReader::new(File::open(filename_dump).unwrap()).lines();
let mut updates = layer_map.batch_update();
for fname in filenames {
let fname = &fname.unwrap();
if let Some(imgfilename) = ImageFileName::parse_str(fname) {
let layer = DummyImage {
key_range: imgfilename.key_range,
lsn: imgfilename.lsn,
let layer = LayerDescriptor {
key: imgfilename.key_range,
lsn: imgfilename.lsn..(imgfilename.lsn + 1),
is_incremental: false,
short_id: fname.to_string(),
};
layer_map.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer));
min_lsn = min(min_lsn, imgfilename.lsn);
max_lsn = max(max_lsn, imgfilename.lsn);
} else if let Some(deltafilename) = DeltaFileName::parse_str(fname) {
let layer = DummyDelta {
key_range: deltafilename.key_range,
lsn_range: deltafilename.lsn_range.clone(),
let layer = LayerDescriptor {
key: deltafilename.key_range.clone(),
lsn: deltafilename.lsn_range.clone(),
is_incremental: true,
short_id: fname.to_string(),
};
layer_map.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer));
min_lsn = min(min_lsn, deltafilename.lsn_range.start);
max_lsn = max(max_lsn, deltafilename.lsn_range.end);
} else {
@@ -122,11 +54,12 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<dyn Layer> {
println!("min: {min_lsn}, max: {max_lsn}");
updates.flush();
layer_map
}
/// Construct a layer map query pattern for benchmarks
fn uniform_query_pattern(layer_map: &LayerMap<dyn Layer>) -> Vec<(Key, Lsn)> {
fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn)> {
// For each image layer we query one of the pages contained, at LSN right
// before the image layer was created. This gives us a somewhat uniform
// coverage of both the lsn and key space because image layers have
@@ -150,6 +83,41 @@ fn uniform_query_pattern(layer_map: &LayerMap<dyn Layer>) -> Vec<(Key, Lsn)> {
.collect()
}
// Construct a partitioning for testing get_difficulty_map when we
// don't have an exact result of `collect_keyspace` to work with.
fn uniform_key_partitioning(layer_map: &LayerMap<LayerDescriptor>, _lsn: Lsn) -> KeyPartitioning {
let mut parts = Vec::new();
// We add a partition boundary at the start of each image layer,
// no matter what lsn range it covers. This is just the easiest
// thing to do. A better thing to do would be to get a real
// partitioning from some database. Even better, remove the need
// for key partitions by deciding where to create image layers
// directly based on a coverage-based difficulty map.
let mut keys: Vec<_> = layer_map
.iter_historic_layers()
.filter_map(|l| {
if l.is_incremental() {
None
} else {
let kr = l.get_key_range();
Some(kr.start.next())
}
})
.collect();
keys.sort();
let mut current_key = Key::from_hex("000000000000000000000000000000000000").unwrap();
for key in keys {
parts.push(KeySpace {
ranges: vec![current_key..key],
});
current_key = key;
}
KeyPartitioning { parts }
}
// Benchmark using metadata extracted from our performance test environment, from
// a project where we have run pgbench many times. The pgbench database was initialized
// between each test run.
@@ -183,24 +151,68 @@ fn bench_from_captest_env(c: &mut Criterion) {
// Benchmark using metadata extracted from a real project that was taking
// too long processing layer map queries.
fn bench_from_real_project(c: &mut Criterion) {
// TODO consider compressing this file
// Init layer map
let now = Instant::now();
let layer_map = build_layer_map(PathBuf::from("benches/odd-brook-layernames.txt"));
println!("Finished layer map init in {:?}", now.elapsed());
// Choose uniformly distributed queries
let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map);
// Test with uniform query pattern
c.bench_function("real_map_uniform_queries", |b| {
// Choose inputs for get_difficulty_map
let latest_lsn = layer_map
.iter_historic_layers()
.map(|l| l.get_lsn_range().end)
.max()
.unwrap();
let partitioning = uniform_key_partitioning(&layer_map, latest_lsn);
// Check correctness of get_difficulty_map
// TODO put this in a dedicated test outside of this mod
{
println!("running correctness check");
let now = Instant::now();
let result_bruteforce = layer_map.get_difficulty_map_bruteforce(latest_lsn, &partitioning);
assert!(result_bruteforce.len() == partitioning.parts.len());
println!("Finished bruteforce in {:?}", now.elapsed());
let now = Instant::now();
let result_fast = layer_map.get_difficulty_map(latest_lsn, &partitioning, None);
assert!(result_fast.len() == partitioning.parts.len());
println!("Finished fast in {:?}", now.elapsed());
// Assert results are equal. Manually iterate for easier debugging.
let zip = std::iter::zip(
&partitioning.parts,
std::iter::zip(result_bruteforce, result_fast),
);
for (_part, (bruteforce, fast)) in zip {
assert_eq!(bruteforce, fast);
}
println!("No issues found");
}
// Define and name the benchmark function
let mut group = c.benchmark_group("real_map");
group.bench_function("uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
layer_map.search(q.0, q.1);
}
});
});
group.bench_function("get_difficulty_map", |b| {
b.iter(|| {
layer_map.get_difficulty_map(latest_lsn, &partitioning, Some(3));
});
});
group.finish();
}
// Benchmark using synthetic data. Arrange image layers on stacked diagonal lines.
fn bench_sequential(c: &mut Criterion) {
let mut layer_map: LayerMap<dyn Layer> = LayerMap::default();
// Init layer map. Create 100_000 layers arranged in 1000 diagonal lines.
//
// TODO This code is pretty slow and runs even if we're only running other
@@ -208,39 +220,39 @@ fn bench_sequential(c: &mut Criterion) {
// Putting it inside the `bench_function` closure is not a solution
// because then it runs multiple times during warmup.
let now = Instant::now();
let mut layer_map = LayerMap::default();
let mut updates = layer_map.batch_update();
for i in 0..100_000 {
// TODO try inserting a super-wide layer in between every 10 to reflect
// what often happens with L1 layers that include non-rel changes.
// Maybe do that as a separate test.
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = DummyImage {
key_range: zero.add(10 * i32)..zero.add(10 * i32 + 1),
lsn: Lsn(10 * i),
let layer = LayerDescriptor {
key: zero.add(10 * i32)..zero.add(10 * i32 + 1),
lsn: Lsn(i)..Lsn(i + 1),
is_incremental: false,
short_id: format!("Layer {}", i),
};
layer_map.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer));
}
// Manually measure runtime without criterion because criterion
// has a minimum sample size of 10 and I don't want to run it 10 times.
println!("Finished init in {:?}", now.elapsed());
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());
// Choose 100 uniformly random queries
let rng = &mut StdRng::seed_from_u64(1);
let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map)
.choose_multiple(rng, 1)
.choose_multiple(rng, 100)
.copied()
.collect();
// Define and name the benchmark function
c.bench_function("sequential_uniform_queries", |b| {
// Run the search queries
let mut group = c.benchmark_group("sequential");
group.bench_function("uniform_queries", |b| {
b.iter(|| {
for q in queries.clone().into_iter() {
layer_map.search(q.0, q.1);
}
});
});
group.finish();
}
criterion_group!(group_1, bench_from_captest_env);


@@ -37,6 +37,17 @@ impl Key {
| self.field6 as i128
}
pub fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
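// A hedged sanity check for the new `from_i128` above: it should invert `to_i128`
// for keys whose fields fit the packed layout (e.g. field1 within the 4 bits kept
// by the `& 0xf` mask). Illustrative only, not part of this change:
//
// let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
// let key = zero.add(10);
// assert!(Key::from_i128(key.to_i128()) == key);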


@@ -9,24 +9,57 @@
//! are frozen, and it is split up into new image and delta layers and the
//! corresponding files are written to disk.
//!
//! Design overview:
//!
//! The `search` method of the layer map is on the read critical path, so we've
//! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
//! Other read methods are less critical but still impact performance of background tasks.
//!
//! This data structure relies on a persistent/immutable binary search tree. See the
//! following lecture for an introduction: https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
//! Summary: a persistent/immutable BST (and persistent data structures in general) lets
//! each modification create a new "version" of the tree while keeping all previous
//! versions accessible, so anyone still holding a reference to an older version
//! continues to see the tree as it was then. The persistent BST stores all these
//! versions efficiently.
//!
//! Our persistent BST maintains a map of which layer file "covers" each key. It has only
//! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
//! to handle the LSN dimension.
//!
//! To build the layer map, we insert each layer into the persistent BST in LSN.start order,
//! starting from the oldest one. After each insertion, we grab a reference to that "version"
//! of the tree and store it in another tree, a BTreeMap keyed by LSN. See
//! `historic_layer_coverage.rs`.
//!
//! To search for a particular key-LSN pair, you first look up the right "version" in the
//! BTreeMap. Then you search that version of the BST with the key.
//!
//! The persistent BST keeps all the versions, but there is no way to change the old versions
//! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
//! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
//! to throw away most of the persistent BST and build a new one, starting from the oldest
//! LSN. See `LayerMap::flush_updates()`.
//!
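// A minimal sketch of the two-level lookup described above, assuming the persistent
// `insert` of `rpds::RedBlackTreeMap` (the crate this commit adds as a dependency).
// The names below are illustrative and not items from this module:
//
// use rpds::RedBlackTreeMap;
// use std::collections::BTreeMap;
//
// fn sketch() {
//     // One persistent coverage "version" per layer insertion, keyed by lsn.start.
//     let mut versions: BTreeMap<u64, RedBlackTreeMap<i128, &str>> = BTreeMap::new();
//     let mut head = RedBlackTreeMap::new();
//     for (lsn_start, key_start, name) in [(100u64, 0i128, "image A"), (110, 3, "image B")] {
//         head = head.insert(key_start, name); // returns a new version; old versions stay valid
//         versions.insert(lsn_start, head.clone()); // cloning a persistent map is cheap
//     }
//     // Search for (key = 3, lsn = 105): pick the newest version at or below 105,
//     // then query that version by key.
//     let version = versions.range(..=105u64).next_back().map(|(_, v)| v).unwrap();
//     assert_eq!(version.get(&3), None); // "image B" only becomes visible at lsn 110
//     assert_eq!(version.get(&0), Some(&"image A"));
// }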
mod historic_layer_coverage;
mod layer_coverage;
use crate::keyspace::KeyPartitioning;
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::storage_layer::{range_eq, range_overlaps};
use amplify_num::i256;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Result;
use num_traits::identities::{One, Zero};
use num_traits::{Bounded, Num, Signed};
use rstar::{RTree, RTreeObject, AABB};
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::Range;
use std::ops::{Add, Div, Mul, Neg, Rem, Sub};
use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;
use super::storage_layer::{InMemoryLayer, Layer};
use historic_layer_coverage::BufferedHistoricLayerCoverage;
use super::storage_layer::range_eq;
///
/// LayerMap tracks what layers exist on a timeline.
@@ -51,8 +84,8 @@ pub struct LayerMap<L: ?Sized> {
///
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
/// All the historic layers are kept here
historic_layers: RTree<LayerRTreeObject<L>>,
/// Index of the historic layers optimized for search
historic: BufferedHistoricLayerCoverage<Arc<L>>,
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
@@ -65,177 +98,64 @@ impl<L: ?Sized> Default for LayerMap<L> {
open_layer: None,
next_open_layer_at: None,
frozen_layers: VecDeque::default(),
historic_layers: RTree::default(),
l0_delta_layers: Vec::default(),
historic: BufferedHistoricLayerCoverage::default(),
}
}
}
struct LayerRTreeObject<L: ?Sized> {
layer: Arc<L>,
envelope: AABB<[IntKey; 2]>,
/// The primary update API for the layer map.
///
/// Batching historic layer insertions and removals is good for
/// performance and this struct helps us do that correctly.
#[must_use]
pub struct BatchedUpdates<'a, L: ?Sized + Layer> {
// While we hold this exclusive reference to the layer map the type checker
// will prevent us from accidentally reading any unflushed updates.
layer_map: &'a mut LayerMap<L>,
}
// Representation of Key as numeric type.
// We cannot use the native i128 implementation, because rstar::RTree
// doesn't properly handle integer overflow during area calculation: sum(Xi*Yi).
// Overflow would cause a panic in debug mode and an incorrect area calculation in release mode,
// which leads to a non-optimally balanced R-Tree (but doesn't affect the correctness of the R-Tree).
// By using i256 as the type, even though all the actual values would fit in i128, we can be
// sure that multiplication doesn't overflow.
//
#[derive(Clone, PartialEq, Eq, PartialOrd, Debug)]
struct IntKey(i256);
impl Copy for IntKey {}
impl IntKey {
fn from(i: i128) -> Self {
IntKey(i256::from(i))
}
}
impl Bounded for IntKey {
fn min_value() -> Self {
IntKey(i256::MIN)
}
fn max_value() -> Self {
IntKey(i256::MAX)
}
}
impl Signed for IntKey {
fn is_positive(&self) -> bool {
self.0 > i256::ZERO
}
fn is_negative(&self) -> bool {
self.0 < i256::ZERO
}
fn signum(&self) -> Self {
match self.0.cmp(&i256::ZERO) {
Ordering::Greater => IntKey(i256::ONE),
Ordering::Less => IntKey(-i256::ONE),
Ordering::Equal => IntKey(i256::ZERO),
}
}
fn abs(&self) -> Self {
IntKey(self.0.abs())
}
fn abs_sub(&self, other: &Self) -> Self {
if self.0 <= other.0 {
IntKey(i256::ZERO)
} else {
IntKey(self.0 - other.0)
}
}
}
impl Neg for IntKey {
type Output = Self;
fn neg(self) -> Self::Output {
IntKey(-self.0)
}
}
impl Rem for IntKey {
type Output = Self;
fn rem(self, rhs: Self) -> Self::Output {
IntKey(self.0 % rhs.0)
}
}
impl Div for IntKey {
type Output = Self;
fn div(self, rhs: Self) -> Self::Output {
IntKey(self.0 / rhs.0)
}
}
impl Add for IntKey {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
IntKey(self.0 + rhs.0)
}
}
impl Sub for IntKey {
type Output = Self;
fn sub(self, rhs: Self) -> Self::Output {
IntKey(self.0 - rhs.0)
}
}
impl Mul for IntKey {
type Output = Self;
fn mul(self, rhs: Self) -> Self::Output {
IntKey(self.0 * rhs.0)
}
}
impl One for IntKey {
fn one() -> Self {
IntKey(i256::ONE)
}
}
impl Zero for IntKey {
fn zero() -> Self {
IntKey(i256::ZERO)
}
fn is_zero(&self) -> bool {
self.0 == i256::ZERO
}
}
impl Num for IntKey {
type FromStrRadixErr = <i128 as Num>::FromStrRadixErr;
fn from_str_radix(str: &str, radix: u32) -> Result<Self, Self::FromStrRadixErr> {
Ok(IntKey(i256::from(i128::from_str_radix(str, radix)?)))
}
}
impl<T: ?Sized> PartialEq for LayerRTreeObject<T> {
fn eq(&self, other: &Self) -> bool {
// FIXME: ptr_eq might fail to return true for 'dyn'
// references. Clippy complains about this. In practice it
// seems to work, the assertion below would be triggered
// otherwise but this ought to be fixed.
#[allow(clippy::vtable_address_comparisons)]
Arc::ptr_eq(&self.layer, &other.layer)
}
}
impl<L> RTreeObject for LayerRTreeObject<L>
where
L: ?Sized,
{
type Envelope = AABB<[IntKey; 2]>;
fn envelope(&self) -> Self::Envelope {
self.envelope
}
}
impl<L> LayerRTreeObject<L>
/// Provide ability to batch more updates while hiding the read
/// API so we don't accidentally read without flushing.
impl<L> BatchedUpdates<'_, L>
where
L: ?Sized + Layer,
{
fn new(layer: Arc<L>) -> Self {
let key_range = layer.get_key_range();
let lsn_range = layer.get_lsn_range();
///
/// Insert an on-disk layer.
///
pub fn insert_historic(&mut self, layer: Arc<L>) {
self.layer_map.insert_historic_noflush(layer)
}
let envelope = AABB::from_corners(
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
], // AABB::upper is inclusive, while `key_range.end` and `lsn_range.end` are exclusive
);
LayerRTreeObject { layer, envelope }
///
/// Remove an on-disk layer from the map.
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer: Arc<L>) {
self.layer_map.remove_historic_noflush(layer)
}
// We will flush on drop anyway, but this method makes it
// more explicit that there is some work being done.
/// Apply all updates
pub fn flush(self) {
// Flush happens on drop
}
}
// Ideally the flush() method should be called explicitly for more
// controlled execution. But if we forget we'd rather flush on drop
// than panic later or read without flushing.
//
// TODO maybe warn if flush hasn't explicitly been called
impl<L> Drop for BatchedUpdates<'_, L>
where
L: ?Sized + Layer,
{
fn drop(&mut self) {
self.layer_map.flush_updates();
}
}
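// A hedged usage sketch of the batching API above, mirroring how the layer map
// benchmarks in this commit drive it (`layer`, `key` and `lsn` are placeholders):
//
// let mut layer_map = LayerMap::<LayerDescriptor>::default();
// let mut updates = layer_map.batch_update();
// updates.insert_historic(Arc::new(layer)); // buffered; the search index is not rebuilt yet
// updates.flush();                          // rebuild once for the whole batch
// let _ = layer_map.search(key, lsn);       // reads see the flushed state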
@@ -281,125 +201,91 @@ where
/// 'open' and 'frozen' layers!
///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
// Find the latest image layer that covers the given key
let mut latest_img: Option<Arc<L>> = None;
let mut latest_img_lsn: Option<Lsn> = None;
let envelope = AABB::from_corners(
[IntKey::from(key.to_i128()), IntKey::from(0i128)],
[
IntKey::from(key.to_i128()),
IntKey::from(end_lsn.0 as i128 - 1),
],
);
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
if l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
let img_lsn = l.get_lsn_range().start;
assert!(img_lsn < end_lsn);
if Lsn(img_lsn.0 + 1) == end_lsn {
// found exact match
return Some(SearchResult {
layer: Arc::clone(l),
lsn_floor: img_lsn,
});
}
if img_lsn > latest_img_lsn.unwrap_or(Lsn(0)) {
latest_img = Some(Arc::clone(l));
latest_img_lsn = Some(img_lsn);
}
}
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128());
let latest_image = version.image_coverage.query(key.to_i128());
// Search the delta layers
let mut latest_delta: Option<Arc<L>> = None;
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
if !l.is_incremental() {
continue;
match (latest_delta, latest_image) {
(None, None) => None,
(None, Some(image)) => {
let lsn_floor = image.get_lsn_range().start;
Some(SearchResult {
layer: image,
lsn_floor,
})
}
assert!(l.get_key_range().contains(&key));
if l.get_lsn_range().start >= end_lsn {
info!(
"Candidate delta layer {}..{} is too new for lsn {}",
l.get_lsn_range().start,
l.get_lsn_range().end,
end_lsn
);
(Some(delta), None) => {
let lsn_floor = delta.get_lsn_range().start;
Some(SearchResult {
layer: delta,
lsn_floor,
})
}
assert!(l.get_lsn_range().start < end_lsn);
if l.get_lsn_range().end >= end_lsn {
// this layer contains the requested point in the key/lsn space.
// No need to search any further
trace!(
"found layer {} for request on {key} at {end_lsn}",
l.short_id(),
);
latest_delta.replace(Arc::clone(l));
break;
}
if l.get_lsn_range().end > latest_img_lsn.unwrap_or(Lsn(0)) {
// this layer's end LSN is smaller than the requested point. If there's
// nothing newer, this is what we need to return. Remember this.
if let Some(old_candidate) = &latest_delta {
if l.get_lsn_range().end > old_candidate.get_lsn_range().end {
latest_delta.replace(Arc::clone(l));
}
(Some(delta), Some(image)) => {
let img_lsn = image.get_lsn_range().start;
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
let image_exact_match = img_lsn + 1 == end_lsn;
if image_is_newer || image_exact_match {
Some(SearchResult {
layer: image,
lsn_floor: img_lsn,
})
} else {
latest_delta.replace(Arc::clone(l));
let lsn_floor =
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
Some(SearchResult {
layer: delta,
lsn_floor,
})
}
}
}
if let Some(l) = latest_delta {
trace!(
"found (old) layer {} for request on {key} at {end_lsn}",
l.short_id(),
);
let lsn_floor = std::cmp::max(
Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
l.get_lsn_range().start,
);
Some(SearchResult {
lsn_floor,
layer: l,
})
} else if let Some(l) = latest_img {
trace!("found img layer and no deltas for request on {key} at {end_lsn}");
Some(SearchResult {
lsn_floor: latest_img_lsn.unwrap(),
layer: l,
})
} else {
trace!("no layer found for request on {key} at {end_lsn}");
None
}
}
/// Start a batch of updates, applied on drop
pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> {
BatchedUpdates { layer_map: self }
}
///
/// Insert an on-disk layer
///
pub fn insert_historic(&mut self, layer: Arc<L>) {
if layer.get_key_range() == (Key::MIN..Key::MAX) {
self.l0_delta_layers.push(layer.clone());
/// Helper function for BatchedUpdates::insert_historic
///
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
self.historic.insert(
historic_layer_coverage::LayerKey {
key: kr.start.to_i128()..kr.end.to_i128(),
lsn: lr.start.0..lr.end.0,
is_image: !layer.is_incremental(),
},
Arc::clone(&layer),
);
if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer);
}
self.historic_layers.insert(LayerRTreeObject::new(layer));
NUM_ONDISK_LAYERS.inc();
}
///
/// Remove an on-disk layer from the map.
///
/// This should be called when the corresponding file on disk has been deleted.
/// Helper function for BatchedUpdates::remove_historic
///
pub fn remove_historic(&mut self, layer: Arc<L>) {
if layer.get_key_range() == (Key::MIN..Key::MAX) {
pub fn remove_historic_noflush(&mut self, layer: Arc<L>) {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
self.historic.remove(historic_layer_coverage::LayerKey {
key: kr.start.to_i128()..kr.end.to_i128(),
lsn: lr.start.0..lr.end.0,
is_image: !layer.is_incremental(),
});
if Self::is_l0(&layer) {
let len_before = self.l0_delta_layers.len();
// FIXME: ptr_eq might fail to return true for 'dyn'
@@ -411,98 +297,57 @@ where
.retain(|other| !Arc::ptr_eq(other, &layer));
assert_eq!(self.l0_delta_layers.len(), len_before - 1);
}
assert!(self
.historic_layers
.remove(&LayerRTreeObject::new(layer))
.is_some());
NUM_ONDISK_LAYERS.dec();
}
/// Helper function for BatchedUpdates::drop.
pub(self) fn flush_updates(&mut self) {
self.historic.rebuild();
}
/// Is there a newer image layer for given key- and LSN-range? Or a set
/// of image layers within the specified lsn range that cover the entire
/// specified key range?
///
/// This is used for garbage collection, to determine if an old layer can
/// be deleted.
pub fn image_layer_exists(
&self,
key_range: &Range<Key>,
lsn_range: &Range<Lsn>,
) -> Result<bool> {
let mut range_remain = key_range.clone();
pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> Result<bool> {
if key.is_empty() {
// Vacuously true. There's a newer image for all 0 of the keys in the range.
return Ok(true);
}
loop {
let mut made_progress = false;
let envelope = AABB::from_corners(
[
IntKey::from(range_remain.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(range_remain.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
if l.is_incremental() {
continue;
}
let img_lsn = l.get_lsn_range().start;
if l.get_key_range().contains(&range_remain.start) && lsn_range.contains(&img_lsn) {
made_progress = true;
let img_key_end = l.get_key_range().end;
let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
Some(v) => v,
None => return Ok(false),
};
if img_key_end >= range_remain.end {
return Ok(true);
}
range_remain.start = img_key_end;
}
}
let start = key.start.to_i128();
let end = key.end.to_i128();
if !made_progress {
let layer_covers = |layer: Option<Arc<L>>| match layer {
Some(layer) => layer.get_lsn_range().start >= lsn.start,
None => false,
};
// Check the start is covered
if !layer_covers(version.image_coverage.query(start)) {
return Ok(false);
}
// Check after all changes of coverage
for (_, change_val) in version.image_coverage.range(start..end) {
if !layer_covers(change_val) {
return Ok(false);
}
}
Ok(true)
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
self.historic_layers.iter().map(|e| e.layer.clone())
}
/// Find the last image layer that covers 'key', ignoring any image layers
/// newer than 'lsn'.
fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<L>> {
let mut candidate_lsn = Lsn(0);
let mut candidate = None;
let envelope = AABB::from_corners(
[IntKey::from(key.to_i128()), IntKey::from(0)],
[IntKey::from(key.to_i128()), IntKey::from(lsn.0 as i128)],
);
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
if l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
let this_lsn = l.get_lsn_range().start;
assert!(this_lsn <= lsn);
if this_lsn < candidate_lsn {
// our previous candidate was better
continue;
}
candidate_lsn = this_lsn;
candidate = Some(Arc::clone(l));
}
candidate
self.historic.iter()
}
///
@@ -518,94 +363,288 @@ where
key_range: &Range<Key>,
lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
let mut points = vec![key_range.start];
let envelope = AABB::from_corners(
[IntKey::from(key_range.start.to_i128()), IntKey::from(0)],
[
IntKey::from(key_range.end.to_i128()),
IntKey::from(lsn.0 as i128),
],
);
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
assert!(l.get_lsn_range().start <= lsn);
let range = l.get_key_range();
if key_range.contains(&range.start) {
points.push(l.get_key_range().start);
}
if key_range.contains(&range.end) {
points.push(l.get_key_range().end);
}
let version = match self.historic.get().unwrap().get_version(lsn.0) {
Some(v) => v,
None => return Ok(vec![]),
};
let start = key_range.start.to_i128();
let end = key_range.end.to_i128();
// Initialize loop variables
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
let mut current_key = start;
let mut current_val = version.image_coverage.query(start);
// Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push((kr, current_val.take()));
current_key = change_key;
current_val = change_val.clone();
}
points.push(key_range.end);
points.sort();
points.dedup();
// Add the final interval
let kr = Key::from_i128(current_key)..Key::from_i128(end);
coverage.push((kr, current_val.take()));
// Ok, we now have a list of "interesting" points in the key space
// For each range between the points, find the latest image
let mut start = *points.first().unwrap();
let mut ranges = Vec::new();
for end in points[1..].iter() {
let img = self.find_latest_image(start, lsn);
ranges.push((start..*end, img));
start = *end;
}
Ok(ranges)
Ok(coverage)
}
/// Count the height of the tallest stack of deltas in this 2d region.
pub fn is_l0(layer: &L) -> bool {
range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX))
}
/// This function determines which layers are counted in `count_deltas`:
/// layers that should count towards deciding whether or not to reimage
/// a certain partition range.
///
/// There are two kinds of layers we currently consider reimage-worthy:
///
/// Case 1: Non-L0 layers are currently reimage-worthy by default.
/// TODO Some of these layers are very sparse and cover the entire key
/// range. Replacing 256MB of data (or less!) with terabytes of
/// images doesn't seem wise. We need a better heuristic, possibly
/// based on some of these factors:
/// a) whether this layer has any wal in this partition range
/// b) the size of the layer
/// c) the number of images needed to cover it
/// d) the estimated time until we'll have to reimage over it for GC
///
/// Case 2: Since L0 layers by definition cover the entire key space, we consider
/// them reimage-worthy only when the entire key space can be covered by very few
/// images (currently 1).
/// TODO The optimal number should probably be slightly higher than 1, but to
/// implement that we need to plumb a lot more context into this function
/// than just the current partition_range.
pub fn is_reimage_worthy(layer: &L, partition_range: &Range<Key>) -> bool {
// Case 1
if !Self::is_l0(layer) {
return true;
}
// Case 2
if range_eq(partition_range, &(Key::MIN..Key::MAX)) {
return true;
}
false
}
/// Count the height of the tallest stack of reimage-worthy deltas
/// in this 2d region.
///
/// If `limit` is provided we don't try to count above that number.
///
/// This number is used to compute the largest number of deltas that
/// we'll need to visit for any page reconstruction in this region.
/// We use this heuristic to decide whether to create an image layer.
///
/// TODO currently we just return the total number of deltas in the
/// region, no matter if they're stacked on top of each other
/// or next to each other.
pub fn count_deltas(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>) -> Result<usize> {
let mut result = 0;
if lsn_range.start >= lsn_range.end {
pub fn count_deltas(
&self,
key: &Range<Key>,
lsn: &Range<Lsn>,
limit: Option<usize>,
) -> Result<usize> {
// We get the delta coverage of the region, and for each part of the coverage
// we recurse right underneath the delta. The recursion depth is limited by
// the largest result this function could return, which is in practice between
// 3 and 10 (since we usually try to create an image when the number gets larger).
if lsn.is_empty() || key.is_empty() || limit == Some(0) {
return Ok(0);
}
let envelope = AABB::from_corners(
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
if !l.is_incremental() {
continue;
}
assert!(range_overlaps(&l.get_lsn_range(), lsn_range));
assert!(range_overlaps(&l.get_key_range(), key_range));
// We ignore level0 delta layers. Unless the whole keyspace fits
// into one partition
if !range_eq(key_range, &(Key::MIN..Key::MAX))
&& range_eq(&l.get_key_range(), &(Key::MIN..Key::MAX))
{
continue;
let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
Some(v) => v,
None => return Ok(0),
};
let start = key.start.to_i128();
let end = key.end.to_i128();
// Initialize loop variables
let mut max_stacked_deltas = 0;
let mut current_key = start;
let mut current_val = version.delta_coverage.query(start);
// Loop through the delta coverage and recurse on each part
for (change_key, change_val) in version.delta_coverage.range(start..end) {
// If there's a relevant delta in this part, add 1 and recurse down
if let Some(val) = current_val {
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath =
self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas = std::cmp::max(
max_stacked_deltas,
base_count + max_stacked_deltas_underneath,
);
}
}
}
result += 1;
current_key = change_key;
current_val = change_val.clone();
}
Ok(result)
// Consider the last part
if let Some(val) = current_val {
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(end);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas = std::cmp::max(
max_stacked_deltas,
base_count + max_stacked_deltas_underneath,
);
}
}
}
Ok(max_stacked_deltas)
}
/// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
///
/// The `partition_range` argument is used as context for the reimage-worthiness decision.
///
/// Used as a helper for correctness checks only. Performance not critical.
pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range<Key>) -> usize {
match self.search(key, lsn) {
Some(search_result) => {
if search_result.layer.is_incremental() {
(Self::is_reimage_worthy(&search_result.layer, partition_range) as usize)
+ self.get_difficulty(search_result.lsn_floor, key, partition_range)
} else {
0
}
}
None => 0,
}
}
/// Used for correctness checking. Results are expected to be identical to
/// self.get_difficulty_map. Assumes self.search is correct.
pub fn get_difficulty_map_bruteforce(
&self,
lsn: Lsn,
partitioning: &KeyPartitioning,
) -> Vec<usize> {
// Looking at the difficulty as a function of key, it could only increase
// when a delta layer starts or an image layer ends. Therefore it's sufficient
// to check the difficulties at:
// - the key.start for each non-empty part range
// - the key.start for each delta
// - the key.end for each image
let keys_iter: Box<dyn Iterator<Item = Key>> = {
let mut keys: Vec<Key> = self
.iter_historic_layers()
.map(|layer| {
if layer.is_incremental() {
layer.get_key_range().start
} else {
layer.get_key_range().end
}
})
.collect();
keys.sort();
Box::new(keys.into_iter())
};
let mut keys_iter = keys_iter.peekable();
// Iter the partition and keys together and query all the necessary
// keys, computing the max difficulty for each part.
partitioning
.parts
.iter()
.map(|part| {
let mut difficulty = 0;
// Partition ranges are assumed to be sorted and disjoint
// TODO assert it
for range in &part.ranges {
if !range.is_empty() {
difficulty =
std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range));
}
while let Some(key) = keys_iter.peek() {
if key >= &range.end {
break;
}
let key = keys_iter.next().unwrap();
if key < range.start {
continue;
}
difficulty =
std::cmp::max(difficulty, self.get_difficulty(lsn, key, range));
}
}
difficulty
})
.collect()
}
/// For each part of a keyspace partitioning, return the maximum number of layers
/// that would be needed for page reconstruction in that part at the given LSN.
///
/// If `limit` is provided we don't try to count above that number.
///
/// This method is used to decide where to create new image layers. Computing the
/// result for the entire partitioning at once allows this function to be more
/// efficient, and further optimization is possible by using iterators instead,
/// to allow early return.
///
/// TODO actually use this method instead of count_deltas. Currently we only use
/// it for benchmarks.
pub fn get_difficulty_map(
&self,
lsn: Lsn,
partitioning: &KeyPartitioning,
limit: Option<usize>,
) -> Vec<usize> {
// TODO This is a naive implementation. Perf improvements to do:
// 1. Instead of calling self.image_coverage and self.count_deltas,
// iterate the image and delta coverage only once.
partitioning
.parts
.iter()
.map(|part| {
let mut difficulty = 0;
for range in &part.ranges {
if limit == Some(difficulty) {
break;
}
for (img_range, last_img) in self
.image_coverage(range, lsn)
.expect("why would this err?")
{
if limit == Some(difficulty) {
break;
}
let img_lsn = if let Some(last_img) = last_img {
last_img.get_lsn_range().end
} else {
Lsn(0)
};
if img_lsn < lsn {
let num_deltas = self
.count_deltas(&img_range, &(img_lsn..lsn), limit)
.expect("why would this err lol?");
difficulty = std::cmp::max(difficulty, num_deltas);
}
}
}
difficulty
})
.collect()
}
/// Return all L0 delta layers
@@ -629,8 +668,8 @@ where
}
println!("historic_layers:");
for e in self.historic_layers.iter() {
e.layer.dump(verbose)?;
for layer in self.iter_historic_layers() {
layer.dump(verbose)?;
}
println!("End dump LayerMap");
Ok(())


@@ -0,0 +1,583 @@
use std::collections::BTreeMap;
use std::ops::Range;
use tracing::info;
use super::layer_coverage::LayerCoverageTuple;
/// Layers in this module are identified and indexed by this data.
///
/// This is a helper struct to enable sorting layers by lsn.start.
///
/// These three values are enough to uniquely identify a layer, since
/// a layer is obligated to contain all contents within its range, so two
/// deltas (or images) with the same range have identical content.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct LayerKey {
// TODO I use i128 and u64 because it was easy for prototyping,
// testing, and benchmarking. If we can use the Lsn and Key
// types without overhead that would be preferable.
pub key: Range<i128>,
pub lsn: Range<u64>,
pub is_image: bool,
}
impl PartialOrd for LayerKey {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for LayerKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// NOTE we really care about comparing by lsn.start first
self.lsn
.start
.cmp(&other.lsn.start)
.then(self.lsn.end.cmp(&other.lsn.end))
.then(self.key.start.cmp(&other.key.start))
.then(self.key.end.cmp(&other.key.end))
.then(self.is_image.cmp(&other.is_image))
}
}
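// A small example of the ordering above: comparison is by lsn.start first, so a
// layer at lsn 100..101 sorts before one at lsn 110..120 even though its key
// range starts later (illustrative values):
//
// let a = LayerKey { key: 5..9, lsn: 100..101, is_image: true };
// let b = LayerKey { key: 0..4, lsn: 110..120, is_image: false };
// assert!(a < b);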
/// Efficiently queryable layer coverage for each LSN.
///
/// Allows answering layer map queries very efficiently,
/// but doesn't allow retroactive insertion, which is
/// sometimes necessary. See BufferedHistoricLayerCoverage.
pub struct HistoricLayerCoverage<Value> {
/// The latest state
head: LayerCoverageTuple<Value>,
/// All previous states
historic: BTreeMap<u64, LayerCoverageTuple<Value>>,
}
impl<T: Clone> Default for HistoricLayerCoverage<T> {
fn default() -> Self {
Self::new()
}
}
impl<Value: Clone> HistoricLayerCoverage<Value> {
pub fn new() -> Self {
Self {
head: LayerCoverageTuple::default(),
historic: BTreeMap::default(),
}
}
/// Add a layer
///
/// Panics if new layer has older lsn.start than an existing layer.
/// See BufferedHistoricLayerCoverage for a more general insertion method.
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
// It's only a persistent map, not a retroactive one
if let Some(last_entry) = self.historic.iter().next_back() {
let last_lsn = last_entry.0;
if layer_key.lsn.start < *last_lsn {
panic!("unexpected retroactive insert");
}
}
// Insert into data structure
if layer_key.is_image {
self.head
.image_coverage
.insert(layer_key.key, layer_key.lsn.clone(), value);
} else {
self.head
.delta_coverage
.insert(layer_key.key, layer_key.lsn.clone(), value);
}
// Remember history. Clone is O(1)
self.historic.insert(layer_key.lsn.start, self.head.clone());
}
/// Query at a particular LSN, inclusive
pub fn get_version(&self, lsn: u64) -> Option<&LayerCoverageTuple<Value>> {
match self.historic.range(..=lsn).next_back() {
Some((_, v)) => Some(v),
None => None,
}
}
/// Remove all entries after a certain LSN (inclusive)
pub fn trim(&mut self, begin: &u64) {
self.historic.split_off(begin);
self.head = self
.historic
.iter()
.rev()
.next()
.map(|(_, v)| v.clone())
.unwrap_or_default();
}
}
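// A hedged illustration of the restriction documented on `insert` above (not one of
// the tests below): once a layer with lsn.start = 200 is recorded, inserting one with
// an older lsn.start panics, which is what BufferedHistoricLayerCoverage works around
// by rebuilding.
//
// let mut map = HistoricLayerCoverage::<String>::new();
// map.insert(LayerKey { key: 0..5, lsn: 200..210, is_image: true }, "newer".to_string());
// // map.insert(LayerKey { key: 0..5, lsn: 100..110, is_image: true }, "older".to_string()); // would panic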
/// This is the most basic test that demonstrates intended usage.
/// All layers in this test have height 1.
#[test]
fn test_persistent_simple() {
let mut map = HistoricLayerCoverage::<String>::new();
map.insert(
LayerKey {
key: 0..5,
lsn: 100..101,
is_image: true,
},
"Layer 1".to_string(),
);
map.insert(
LayerKey {
key: 3..9,
lsn: 110..111,
is_image: true,
},
"Layer 2".to_string(),
);
map.insert(
LayerKey {
key: 5..6,
lsn: 120..121,
is_image: true,
},
"Layer 3".to_string(),
);
// After Layer 1 insertion
let version = map.get_version(105).unwrap();
assert_eq!(version.image_coverage.query(1), Some("Layer 1".to_string()));
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
// After Layer 2 insertion
let version = map.get_version(115).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string()));
assert_eq!(version.image_coverage.query(8), Some("Layer 2".to_string()));
assert_eq!(version.image_coverage.query(11), None);
// After Layer 3 insertion
let version = map.get_version(125).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string()));
assert_eq!(version.image_coverage.query(5), Some("Layer 3".to_string()));
assert_eq!(version.image_coverage.query(7), Some("Layer 2".to_string()));
}
/// Cover simple off-by-one edge cases
#[test]
fn test_off_by_one() {
let mut map = HistoricLayerCoverage::<String>::new();
map.insert(
LayerKey {
key: 3..5,
lsn: 100..110,
is_image: true,
},
"Layer 1".to_string(),
);
// Check different LSNs
let version = map.get_version(99);
assert!(version.is_none());
let version = map.get_version(100).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
let version = map.get_version(110).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
// Check different keys
let version = map.get_version(105).unwrap();
assert_eq!(version.image_coverage.query(2), None);
assert_eq!(version.image_coverage.query(3), Some("Layer 1".to_string()));
assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string()));
assert_eq!(version.image_coverage.query(5), None);
}
/// Cover edge cases where layers begin or end on the same key
#[test]
fn test_key_collision() {
let mut map = HistoricLayerCoverage::<String>::new();
map.insert(
LayerKey {
key: 3..5,
lsn: 100..110,
is_image: true,
},
"Layer 10".to_string(),
);
map.insert(
LayerKey {
key: 5..8,
lsn: 100..110,
is_image: true,
},
"Layer 11".to_string(),
);
map.insert(
LayerKey {
key: 3..4,
lsn: 200..210,
is_image: true,
},
"Layer 20".to_string(),
);
// Check after layer 11
let version = map.get_version(105).unwrap();
assert_eq!(version.image_coverage.query(2), None);
assert_eq!(
version.image_coverage.query(3),
Some("Layer 10".to_string())
);
assert_eq!(
version.image_coverage.query(5),
Some("Layer 11".to_string())
);
assert_eq!(
version.image_coverage.query(7),
Some("Layer 11".to_string())
);
assert_eq!(version.image_coverage.query(8), None);
// Check after layer 20
let version = map.get_version(205).unwrap();
assert_eq!(version.image_coverage.query(2), None);
assert_eq!(
version.image_coverage.query(3),
Some("Layer 20".to_string())
);
assert_eq!(
version.image_coverage.query(5),
Some("Layer 11".to_string())
);
assert_eq!(
version.image_coverage.query(7),
Some("Layer 11".to_string())
);
assert_eq!(version.image_coverage.query(8), None);
}
/// Test when rectangles have nontrivial height and possibly overlap
#[test]
fn test_persistent_overlapping() {
let mut map = HistoricLayerCoverage::<String>::new();
// Add 3 key-disjoint layers with varying LSN ranges
map.insert(
LayerKey {
key: 1..2,
lsn: 100..200,
is_image: true,
},
"Layer 1".to_string(),
);
map.insert(
LayerKey {
key: 4..5,
lsn: 110..200,
is_image: true,
},
"Layer 2".to_string(),
);
map.insert(
LayerKey {
key: 7..8,
lsn: 120..300,
is_image: true,
},
"Layer 3".to_string(),
);
// Add wide and short layer
map.insert(
LayerKey {
key: 0..9,
lsn: 130..199,
is_image: true,
},
"Layer 4".to_string(),
);
// Add wide layer taller than some
map.insert(
LayerKey {
key: 0..9,
lsn: 140..201,
is_image: true,
},
"Layer 5".to_string(),
);
// Add wide layer taller than all
map.insert(
LayerKey {
key: 0..9,
lsn: 150..301,
is_image: true,
},
"Layer 6".to_string(),
);
// After layer 4 insertion
let version = map.get_version(135).unwrap();
assert_eq!(version.image_coverage.query(0), Some("Layer 4".to_string()));
assert_eq!(version.image_coverage.query(1), Some("Layer 1".to_string()));
assert_eq!(version.image_coverage.query(2), Some("Layer 4".to_string()));
assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string()));
assert_eq!(version.image_coverage.query(5), Some("Layer 4".to_string()));
assert_eq!(version.image_coverage.query(7), Some("Layer 3".to_string()));
assert_eq!(version.image_coverage.query(8), Some("Layer 4".to_string()));
// After layer 5 insertion
let version = map.get_version(145).unwrap();
assert_eq!(version.image_coverage.query(0), Some("Layer 5".to_string()));
assert_eq!(version.image_coverage.query(1), Some("Layer 5".to_string()));
assert_eq!(version.image_coverage.query(2), Some("Layer 5".to_string()));
assert_eq!(version.image_coverage.query(4), Some("Layer 5".to_string()));
assert_eq!(version.image_coverage.query(5), Some("Layer 5".to_string()));
assert_eq!(version.image_coverage.query(7), Some("Layer 3".to_string()));
assert_eq!(version.image_coverage.query(8), Some("Layer 5".to_string()));
// After layer 6 insertion
let version = map.get_version(155).unwrap();
assert_eq!(version.image_coverage.query(0), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(1), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(2), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(4), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(5), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(7), Some("Layer 6".to_string()));
assert_eq!(version.image_coverage.query(8), Some("Layer 6".to_string()));
}
/// Wrapper for HistoricLayerCoverage that works around its lack of support
/// for retroactive insertion by rebuilding the map from the LSN of the
/// earliest buffered change.
///
/// Why is this needed? We most often insert new layers with newer LSNs,
/// but during compaction we create layers with non-latest LSN, and during
/// GC we delete historic layers.
///
/// Even though rebuilding is an expensive O(N log N) operation, it's not a
/// practical concern: we already do something equally expensive just to decide
/// whether or not to create new image layers.
/// TODO The cost itself is acceptable, but holding a layer map write lock
/// for that long is not great.
///
/// If this ever becomes an actual bottleneck, one solution would be to build a
/// segment tree that holds PersistentLayerMaps. That would add an extra
/// log(N) factor to queries, though, and queries are probably the more
/// performance-critical path.
///
/// See this for more on persistent and retroactive techniques:
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
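///
/// A rough usage sketch (illustrative only; `layer_key`, `other_layer_key`,
/// `value` and `lsn` are placeholders): writes are cheap and buffered, and
/// queries only become valid again after `rebuild()`:
///
/// ```ignore
/// let mut map = BufferedHistoricLayerCoverage::new();
/// map.insert(layer_key, value); // buffered, no rebuild yet
/// map.remove(other_layer_key);  // buffered as well
/// map.rebuild();                // replays layers at/after the oldest buffered lsn.start
/// let version = map.get()?.get_version(lsn);
/// ```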
pub struct BufferedHistoricLayerCoverage<Value> {
/// A persistent layer map that we rebuild when we need to apply updates retroactively
historic_coverage: HistoricLayerCoverage<Value>,
/// We buffer insertions and removals to reduce the number of rebuilds.
buffer: BTreeMap<LayerKey, Option<Value>>,
/// All current layers. Not used for search; only kept to make rebuilds easier.
layers: BTreeMap<LayerKey, Value>,
}
impl<T: std::fmt::Debug> std::fmt::Debug for BufferedHistoricLayerCoverage<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BufferedHistoricLayerCoverage")
.field("buffer", &self.buffer)
.field("layers", &self.layers)
.finish()
}
}
impl<T: Clone> Default for BufferedHistoricLayerCoverage<T> {
fn default() -> Self {
Self::new()
}
}
impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
pub fn new() -> Self {
Self {
historic_coverage: HistoricLayerCoverage::<Value>::new(),
buffer: BTreeMap::new(),
layers: BTreeMap::new(),
}
}
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
self.buffer.insert(layer_key, Some(value));
}
pub fn remove(&mut self, layer_key: LayerKey) {
self.buffer.insert(layer_key, None);
}
pub fn rebuild(&mut self) {
// Find the first LSN that needs to be rebuilt
let rebuild_since: u64 = match self.buffer.iter().next() {
Some((LayerKey { lsn, .. }, _)) => lsn.start,
None => return, // No need to rebuild if buffer is empty
};
// Apply buffered updates to self.layers
let num_updates = self.buffer.len();
self.buffer.retain(|layer_key, layer| {
match layer {
Some(l) => {
self.layers.insert(layer_key.clone(), l.clone());
}
None => {
self.layers.remove(layer_key);
}
};
false
});
// Rebuild
let mut num_inserted = 0;
self.historic_coverage.trim(&rebuild_since);
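// Replay every layer whose lsn.start is at or after rebuild_since. This range
// bound assumes LayerKey compares by lsn.start first; the remaining fields are
// set to minimal values only to form the lower bound of the range.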
for (layer_key, layer) in self.layers.range(
LayerKey {
lsn: rebuild_since..0,
key: 0..0,
is_image: false,
}..,
) {
self.historic_coverage
.insert(layer_key.clone(), layer.clone());
num_inserted += 1;
}
// TODO maybe only warn if ratio is at least 10
info!(
"Rebuilt layer map. Did {} insertions to process a batch of {} updates.",
num_inserted, num_updates,
)
}
/// Iterate all the layers
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
// NOTE we could iterate without rebuilding first,
// but that's not needed for now.
if !self.buffer.is_empty() {
panic!("rebuild() must be called before iterating")
}
self.layers.values().cloned()
}
/// Return a reference to a queryable map, assuming all updates
/// have already been processed using self.rebuild()
pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage<Value>> {
// NOTE we error here instead of implicitly rebuilding because
// rebuilding is somewhat expensive.
// TODO maybe implicitly rebuild and log/sentry an error?
if !self.buffer.is_empty() {
anyhow::bail!("rebuild required")
}
Ok(&self.historic_coverage)
}
}
#[test]
fn test_retroactive_regression_1() {
let mut map = BufferedHistoricLayerCoverage::new();
map.insert(
LayerKey {
key: 0..21267647932558653966460912964485513215,
lsn: 23761336..23761457,
is_image: false,
},
"sdfsdfs".to_string(),
);
map.rebuild();
let version = map.get().unwrap().get_version(23761457).unwrap();
assert_eq!(
version.delta_coverage.query(100),
Some("sdfsdfs".to_string())
);
}
#[test]
fn test_retroactive_simple() {
let mut map = BufferedHistoricLayerCoverage::new();
// Append some images in increasing LSN order
map.insert(
LayerKey {
key: 0..5,
lsn: 100..101,
is_image: true,
},
"Image 1".to_string(),
);
map.insert(
LayerKey {
key: 3..9,
lsn: 110..111,
is_image: true,
},
"Image 2".to_string(),
);
map.insert(
LayerKey {
key: 4..6,
lsn: 120..121,
is_image: true,
},
"Image 3".to_string(),
);
map.insert(
LayerKey {
key: 8..9,
lsn: 120..121,
is_image: true,
},
"Image 4".to_string(),
);
// Add a layer retroactively, at an older LSN than the previous inserts
map.insert(
LayerKey {
key: 2..5,
lsn: 105..106,
is_image: true,
},
"Delta 1".to_string(),
);
// Rebuild so we can start querying
map.rebuild();
// Query key 4
let version = map.get().unwrap().get_version(90);
assert!(version.is_none());
let version = map.get().unwrap().get_version(102).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Image 1".to_string()));
let version = map.get().unwrap().get_version(107).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Delta 1".to_string()));
let version = map.get().unwrap().get_version(115).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Image 2".to_string()));
let version = map.get().unwrap().get_version(125).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Image 3".to_string()));
// Remove Image 3
map.remove(LayerKey {
key: 4..6,
lsn: 120..121,
is_image: true,
});
map.rebuild();
// Check deletion worked
let version = map.get().unwrap().get_version(125).unwrap();
assert_eq!(version.image_coverage.query(4), Some("Image 2".to_string()));
assert_eq!(version.image_coverage.query(8), Some("Image 4".to_string()));
}

View File

@@ -0,0 +1,154 @@
use std::ops::Range;
// TODO the `im` crate has 20x more downloads and also provides a
// persistent/immutable BTree. It also runs a bit faster, but it produced
// different results on some of our tests.
use rpds::RedBlackTreeMapSync;
/// Data structure that can efficiently:
/// - find the latest layer by lsn.end at a given key
/// - iterate the latest layers in a key range
/// - insert layers in non-decreasing lsn.start order
///
/// The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
pub struct LayerCoverage<Value> {
/// For every change in coverage (as we sweep the key space)
/// we store (lsn.end, value).
///
/// We use an immutable/persistent tree so that we can keep historic
/// versions of this coverage without cloning the whole thing and
/// incurring quadratic memory cost. See HistoricLayerCoverage.
///
/// We use the Sync version of the map because we want Self to
/// be Sync. The non-Sync version might be faster, if we could
/// live with Self not being Sync.
nodes: RedBlackTreeMapSync<i128, Option<(u64, Value)>>,
}
impl<T: Clone> Default for LayerCoverage<T> {
fn default() -> Self {
Self::new()
}
}
impl<Value: Clone> LayerCoverage<Value> {
pub fn new() -> Self {
Self {
nodes: RedBlackTreeMapSync::default(),
}
}
/// Helper function to subdivide the key range without changing any values
///
/// Complexity: O(log N)
fn add_node(&mut self, key: i128) {
let value = match self.nodes.range(..=key).next_back() {
Some((_, Some(v))) => Some(v.clone()),
Some((_, None)) => None,
None => None,
};
self.nodes.insert_mut(key, value);
}
/// Insert a layer.
///
/// Complexity: worst case O(N), in practice O(log N). See NOTE in implementation.
pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value) {
// Add nodes at endpoints
//
// NOTE The order of lines is important. We add nodes at the start
// and end of the key range **before updating any nodes** in order
// to pin down the current coverage outside of the relevant key range.
// Only the coverage inside the layer's key range should change.
self.add_node(key.start);
self.add_node(key.end);
// Raise the height where necessary
//
// NOTE This loop is worst case O(N), but amortized O(log N) in the special
// case when rectangles have no height. In practice I don't think we'll see
// the kind of layer intersections needed to trigger O(N) behavior. The worst
// case is N/2 horizontal layers overlapped with N/2 vertical layers in a
// grid pattern.
let mut to_update = Vec::new();
let mut to_remove = Vec::new();
let mut prev_covered = false;
for (k, node) in self.nodes.range(key.clone()) {
let needs_cover = match node {
None => true,
Some((h, _)) => h < &lsn.end,
};
if needs_cover {
match prev_covered {
true => to_remove.push(*k),
false => to_update.push(*k),
}
}
prev_covered = needs_cover;
}
if !prev_covered {
to_remove.push(key.end);
}
for k in to_update {
self.nodes.insert_mut(k, Some((lsn.end, value.clone())));
}
for k in to_remove {
self.nodes.remove_mut(&k);
}
}
/// Get the latest (by lsn.end) layer at a given key
///
/// Complexity: O(log N)
pub fn query(&self, key: i128) -> Option<Value> {
self.nodes
.range(..=key)
.next_back()?
.1
.as_ref()
.map(|(_, v)| v.clone())
}
/// Iterate the changes in layer coverage in a given range. You will likely
/// want to start with self.query(key.start), and then follow up with self.range
/// to cover the rest of the key range.
///
/// Complexity: O(log N + result_size)
pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
self.nodes
.range(key)
.map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone())))
}
/// O(1) clone
pub fn clone(&self) -> Self {
Self {
nodes: self.nodes.clone(),
}
}
}
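/// Illustrative sketch (a hypothetical extra test, with `Value = String`) of
/// how coverage composes: a later layer (higher lsn.end) shadows earlier
/// coverage only inside its own key range.
#[test]
fn test_layer_coverage_sketch() {
    let mut coverage = LayerCoverage::new();
    coverage.insert(0..10, 0..100, "old".to_string());
    coverage.insert(3..6, 0..200, "new".to_string());
    // Point queries return the layer with the highest lsn.end at that key.
    assert_eq!(coverage.query(1), Some("old".to_string()));
    assert_eq!(coverage.query(4), Some("new".to_string()));
    assert_eq!(coverage.query(7), Some("old".to_string()));
    assert_eq!(coverage.query(10), None);
    // range() yields the change points; the expected values below follow from
    // the node layout produced by the two inserts above.
    let changes: Vec<_> = coverage.range(0..10).collect();
    assert_eq!(
        changes,
        vec![
            (0, Some("old".to_string())),
            (3, Some("new".to_string())),
            (6, Some("old".to_string())),
        ]
    );
}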
/// Image and delta coverage at a specific LSN.
pub struct LayerCoverageTuple<Value> {
pub image_coverage: LayerCoverage<Value>,
pub delta_coverage: LayerCoverage<Value>,
}
impl<T: Clone> Default for LayerCoverageTuple<T> {
fn default() -> Self {
Self {
image_coverage: LayerCoverage::default(),
delta_coverage: LayerCoverage::default(),
}
}
}
impl<Value: Clone> LayerCoverageTuple<Value> {
pub fn clone(&self) -> Self {
Self {
image_coverage: self.image_coverage.clone(),
delta_coverage: self.delta_coverage.clone(),
}
}
}

View File

@@ -196,3 +196,50 @@ pub fn downcast_remote_layer(
None
}
}
impl std::fmt::Debug for dyn Layer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Layer")
.field("short_id", &self.short_id())
.finish()
}
}
/// Holds metadata about a layer without any content. Used mostly for testing.
pub struct LayerDescriptor {
pub key: Range<Key>,
pub lsn: Range<Lsn>,
pub is_incremental: bool,
pub short_id: String,
}
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()
}
fn get_lsn_range(&self) -> Range<Lsn> {
self.lsn.clone()
}
fn is_incremental(&self) -> bool {
self.is_incremental
}
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn short_id(&self) -> String {
self.short_id.clone()
}
fn dump(&self, _verbose: bool) -> Result<()> {
todo!()
}
}
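/// Rough sketch (a hypothetical, unused helper) of building a LayerDescriptor
/// for a layer map test; it assumes `Key::from_i128` and the `Lsn(u64)` tuple
/// constructor are available here, and the short_id string is arbitrary.
#[cfg(test)]
#[allow(dead_code)]
fn example_layer_descriptor() -> LayerDescriptor {
    LayerDescriptor {
        key: Key::from_i128(0)..Key::from_i128(1000),
        lsn: Lsn(0x10)..Lsn(0x20),
        is_incremental: false,
        short_id: "example_layer_10-20".to_string(),
    }
}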

View File

@@ -970,6 +970,7 @@ impl Timeline {
///
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().unwrap();
let mut updates = layers.batch_update();
let mut num_layers = 0;
let timer = self.metrics.load_layer_map_histo.start_timer();
@@ -1010,7 +1011,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
layers.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1041,7 +1042,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
layers.insert_historic(Arc::new(layer));
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1067,6 +1068,7 @@ impl Timeline {
}
}
updates.flush();
layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1);
info!(
@@ -1091,6 +1093,11 @@ impl Timeline {
// Are we missing some files that are present in remote storage?
// Create RemoteLayer instances for them.
let mut local_only_layers = local_layers;
// We're holding a layer map lock for a while but this
// method is only called during init so it's fine.
let mut layer_map = self.layers.write().unwrap();
let mut updates = layer_map.batch_update();
for remote_layer_name in &index_part.timeline_layers {
let local_layer = local_only_layers.remove(remote_layer_name);
@@ -1129,7 +1136,7 @@ impl Timeline {
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.resident_physical_size_gauge.sub(local_size);
self.layers.write().unwrap().remove_historic(local_layer);
updates.remove_historic(local_layer);
// fall-through to adding the remote layer
}
} else {
@@ -1171,7 +1178,7 @@ impl Timeline {
);
let remote_layer = Arc::new(remote_layer);
self.layers.write().unwrap().insert_historic(remote_layer);
updates.insert_historic(remote_layer);
}
LayerFileName::Delta(deltafilename) => {
// Create a RemoteLayer for the delta file.
@@ -1194,13 +1201,14 @@ impl Timeline {
&remote_layer_metadata,
);
let remote_layer = Arc::new(remote_layer);
self.layers.write().unwrap().insert_historic(remote_layer);
updates.insert_historic(remote_layer);
}
#[cfg(test)]
LayerFileName::Test(_) => unreachable!(),
}
}
updates.flush();
Ok(local_only_layers)
}
@@ -2099,10 +2107,11 @@ impl Timeline {
])?;
// Add it to the layer map
{
let mut layers = self.layers.write().unwrap();
layers.insert_historic(Arc::new(new_delta));
}
self.layers
.write()
.unwrap()
.batch_update()
.insert_historic(Arc::new(new_delta));
// update the timeline's physical size
let sz = new_delta_path.metadata()?.len();
@@ -2166,13 +2175,15 @@ impl Timeline {
// are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
// after we read last_record_lsn, which is passed here in the 'lsn' argument.
if img_lsn < lsn {
let num_deltas = layers.count_deltas(&img_range, &(img_lsn..lsn))?;
let threshold = self.get_image_creation_threshold();
let num_deltas =
layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
debug!(
"key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
img_range.start, img_range.end, num_deltas, img_lsn, lsn
);
if num_deltas >= self.get_image_creation_threshold() {
if num_deltas >= threshold {
return Ok(true);
}
}
@@ -2267,6 +2278,7 @@ impl Timeline {
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
let mut layers = self.layers.write().unwrap();
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for l in image_layers {
let path = l.filename();
@@ -2280,8 +2292,9 @@ impl Timeline {
self.metrics
.resident_physical_size_gauge
.add(metadata.len());
layers.insert_historic(Arc::new(l));
updates.insert_historic(Arc::new(l));
}
updates.flush();
drop(layers);
timer.stop_and_record();
@@ -2577,6 +2590,7 @@ impl Timeline {
}
let mut layers = self.layers.write().unwrap();
let mut updates = layers.batch_update();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
let new_delta_path = l.path();
@@ -2597,7 +2611,7 @@ impl Timeline {
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
layers.insert_historic(x);
updates.insert_historic(x);
}
// Now that we have reshuffled the data to set of new delta layers, we can
@@ -2611,8 +2625,9 @@ impl Timeline {
}
layer_names_to_delete.push(l.filename());
l.delete()?;
layers.remove_historic(l);
updates.remove_historic(l);
}
updates.flush();
drop(layers);
// Also schedule the deletions in remote storage
@@ -2812,6 +2827,7 @@ impl Timeline {
// 3. it doesn't need to be retained for 'retain_lsns';
// 4. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too aggressive and avoidable
let mut layers = self.layers.write().unwrap();
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -2843,6 +2859,8 @@ impl Timeline {
// might be referenced by child branches forever.
// We can track this in child timeline GC and delete parent layers when
// they are no longer needed. This might be complicated with long inheritance chains.
//
// TODO Vec is not a great choice for `retain_lsns`
for retain_lsn in &retain_lsns {
// start_lsn is inclusive
if &l.get_lsn_range().start <= retain_lsn {
@@ -2896,6 +2914,7 @@ impl Timeline {
layers_to_remove.push(Arc::clone(&l));
}
let mut updates = layers.batch_update();
if !layers_to_remove.is_empty() {
// Persist the new GC cutoff value in the metadata file, before
// we actually remove anything.
@@ -2913,7 +2932,13 @@ impl Timeline {
}
layer_names_to_delete.push(doomed_layer.filename());
doomed_layer.delete()?; // FIXME: schedule succeeded deletions before returning?
layers.remove_historic(doomed_layer);
// TODO Removing from the bottom of the layer map is expensive.
// Maybe instead discard all layer map historic versions that
// won't be needed for page reconstruction for this timeline,
// and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index.
updates.remove_historic(doomed_layer);
result.layers_removed += 1;
}
@@ -2925,6 +2950,7 @@ impl Timeline {
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
}
}
updates.flush();
info!(
"GC completed removing {} layers, cutoff {}",
@@ -3081,11 +3107,13 @@ impl Timeline {
// Delta- or ImageLayer in the layer map.
let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size);
let mut layers = self_clone.layers.write().unwrap();
let mut updates = layers.batch_update();
{
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
layers.remove_historic(l);
updates.remove_historic(l);
}
layers.insert_historic(new_layer);
updates.insert_historic(new_layer);
updates.flush();
drop(layers);
// Now that we've inserted the download into the layer map,

View File

@@ -31,7 +31,7 @@ memchr = { version = "2" }
nom = { version = "7" }
num-bigint = { version = "0.4" }
num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
num-traits = { version = "0.2", features = ["i128"] }
prost = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }