diff --git a/Cargo.lock b/Cargo.lock index d8aba9ba68..edb739f017 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,11 +37,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "amplify_num" -version = "0.4.1" -source = "git+https://github.com/rust-amplify/rust-amplify.git?tag=v4.0.0-beta.1#3ad006cf2804e1862ec7725a7684a493f3023523" - [[package]] name = "android_system_properties" version = "0.1.5" @@ -66,6 +61,15 @@ dependencies = [ "backtrace", ] +[[package]] +name = "archery" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a8da9bc4c4053ee067669762bcaeea6e241841295a2b6c948312dad6ef4cc02" +dependencies = [ + "static_assertions", +] + [[package]] name = "asn1-rs" version = "0.5.1" @@ -137,15 +141,6 @@ dependencies = [ "syn", ] -[[package]] -name = "atomic-polyfill" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ff7eb3f316534d83a8a2c3d1674ace8a5a71198eba31e2e2b597833f699b28" -dependencies = [ - "critical-section", -] - [[package]] name = "atty" version = "0.2.14" @@ -629,9 +624,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.11.1" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" +checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" [[package]] name = "byteorder" @@ -750,13 +745,13 @@ dependencies = [ [[package]] name = "clap" -version = "4.0.32" +version = "4.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7db700bc935f9e43e88d00b0850dae18a63773cfbec6d8e070fccf7fef89a39" +checksum = "4ec7a4128863c188deefe750ac1d1dfe66c236909f845af04beed823638dc1b2" dependencies = [ "bitflags", "clap_derive", - "clap_lex 0.3.0", + "clap_lex 0.3.1", "is-terminal", "once_cell", "strsim", @@ -765,9 +760,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.0.21" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0177313f9f02afc995627906bbd8967e2be069f5261954222dac78290c2b9014" +checksum = "684a277d672e91966334af371f1a7b5833f9aa00b07c84e92fbce95e00208ce8" dependencies = [ "heck", "proc-macro-error", @@ -787,9 +782,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8" +checksum = "783fe232adfca04f90f56201b26d79682d4cd2625e0bc7290b95123afe558ade" dependencies = [ "os_str_bytes", ] @@ -832,7 +827,7 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", - "clap 4.0.32", + "clap 4.1.1", "futures", "hyper", "notify", @@ -887,7 +882,7 @@ name = "control_plane" version = "0.1.0" dependencies = [ "anyhow", - "clap 4.0.32", + "clap 4.1.1", "comfy-table", "git-version", "nix", @@ -988,12 +983,6 @@ dependencies = [ "itertools", ] -[[package]] -name = "critical-section" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" - [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -1030,12 +1019,11 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.11" +version = "0.8.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc" +checksum = 
"4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" dependencies = [ "cfg-if", - "once_cell", ] [[package]] @@ -1506,15 +1494,6 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" -[[package]] -name = "hash32" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67" -dependencies = [ - "byteorder", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -1530,19 +1509,6 @@ dependencies = [ "ahash", ] -[[package]] -name = "heapless" -version = "0.7.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db04bc24a18b9ea980628ecf00e6c0264f3c1426dac36c00cb49b6fbad8b0743" -dependencies = [ - "atomic-polyfill", - "hash32", - "rustc_version", - "spin 0.9.4", - "stable_deref_trait", -] - [[package]] name = "heck" version = "0.4.0" @@ -1804,9 +1770,9 @@ dependencies = [ [[package]] name = "io-lifetimes" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c" +checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e" dependencies = [ "libc", "windows-sys", @@ -1916,12 +1882,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "libm" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" - [[package]] name = "link-cplusplus" version = "1.0.8" @@ -2067,9 +2027,9 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" [[package]] name = "nix" -version = "0.26.1" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46a58d1d356c6597d08cde02c2f09d785b09e28711837b1ed667dc652c08a694" +checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" dependencies = [ "bitflags", "cfg-if", @@ -2081,9 +2041,9 @@ dependencies = [ [[package]] name = "nom" -version = "7.1.2" +version = "7.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5507769c4919c998e69e49c839d9dc6e693ede4cc4290d6ad8b41d4f09c548c" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" dependencies = [ "memchr", "minimal-lexical", @@ -2154,7 +2114,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" dependencies = [ "autocfg", - "libm", ] [[package]] @@ -2230,14 +2189,13 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" name = "pageserver" version = "0.1.0" dependencies = [ - "amplify_num", "anyhow", "async-stream", "async-trait", "byteorder", "bytes", "chrono", - "clap 4.0.32", + "clap 4.1.1", "close_fds", "const_format", "consumption_metrics", @@ -2269,7 +2227,7 @@ dependencies = [ "regex", "remote_storage", "reqwest", - "rstar", + "rpds", "scopeguard", "serde", "serde_json", @@ -2581,9 +2539,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" +checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2" 
dependencies = [ "unicode-ident", ] @@ -2683,7 +2641,7 @@ dependencies = [ "bstr", "bytes", "chrono", - "clap 4.0.32", + "clap 4.1.1", "consumption_metrics", "futures", "git-version", @@ -2742,14 +2700,13 @@ dependencies = [ [[package]] name = "rand" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", "rand_core", - "rand_hc", ] [[package]] @@ -2771,15 +2728,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "rand_hc" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" -dependencies = [ - "rand_core", -] - [[package]] name = "rayon" version = "1.6.1" @@ -2930,7 +2878,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin 0.5.2", + "spin", "untrusted", "web-sys", "winapi", @@ -2950,14 +2898,12 @@ dependencies = [ ] [[package]] -name = "rstar" -version = "0.9.3" +name = "rpds" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b40f1bfe5acdab44bc63e6699c28b74f75ec43afb59f3eda01e145aff86a25fa" +checksum = "66262ea963eff99163e6b741fbc3417a52cc13074728c1047e9911789df9b000" dependencies = [ - "heapless", - "num-traits", - "smallvec", + "archery", ] [[package]] @@ -3018,9 +2964,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.36.6" +version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4feacf7db682c6c329c4ede12649cd36ecab0f3be5b7d74e6a20304725db4549" +checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03" dependencies = [ "bitflags", "errno", @@ -3093,7 +3039,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", - "clap 4.0.32", + "clap 4.1.1", "const_format", "crc32c", "fs2", @@ -3479,21 +3425,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "spin" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09" -dependencies = [ - "lock_api", -] - -[[package]] -name = "stable_deref_trait" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" - [[package]] name = "static_assertions" version = "1.1.0" @@ -3507,7 +3438,7 @@ dependencies = [ "anyhow", "async-stream", "bytes", - "clap 4.0.32", + "clap 4.1.1", "const_format", "futures", "futures-core", @@ -3639,9 +3570,9 @@ dependencies = [ [[package]] name = "termcolor" -version = "1.1.3" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" dependencies = [ "winapi-util", ] @@ -3749,9 +3680,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.24.1" +version = "1.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d9f76183f91ecfb55e1d7d5602bd1d979e38a3a522fe900241cf195624d67ae" +checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb" dependencies = [ "autocfg", 
"bytes", @@ -4183,9 +4114,9 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] name = "ureq" -version = "2.6.1" +version = "2.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "733b5ad78377302af52c0dbcb2623d78fe50e4b3bf215948ff29e9ee031d8566" +checksum = "338b31dd1314f68f3aabf3ed57ab922df95ffcd902476ca7ba3c4ce7b908c46d" dependencies = [ "base64 0.13.1", "log", @@ -4287,7 +4218,7 @@ name = "wal_craft" version = "0.1.0" dependencies = [ "anyhow", - "clap 4.0.32", + "clap 4.1.1", "env_logger", "log", "once_cell", @@ -4534,7 +4465,7 @@ dependencies = [ "anyhow", "bytes", "chrono", - "clap 4.0.32", + "clap 4.1.1", "crossbeam-utils", "either", "fail", diff --git a/Cargo.toml b/Cargo.toml index 74cc16d690..57f4b1d981 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,7 +69,7 @@ rand = "0.8" regex = "1.4" reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] } routerify = "3" -rstar = "0.9.3" +rpds = "0.12.0" rustls = "0.20" rustls-pemfile = "1" rustls-split = "0.3" @@ -107,9 +107,6 @@ x509-parser = "0.14" env_logger = "0.10" log = "0.4" -## TODO switch when the new release is made -amplify_num = { git = "https://github.com/rust-amplify/rust-amplify.git", tag = "v4.0.0-beta.1" } - ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index cb9e4478bf..66c25e8576 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -11,7 +11,6 @@ default = [] testing = ["fail/failpoints"] [dependencies] -amplify_num.workspace = true anyhow.workspace = true async-stream.workspace = true async-trait.workspace = true @@ -41,7 +40,6 @@ postgres-protocol.workspace = true postgres-types.workspace = true rand.workspace = true regex.workspace = true -rstar.workspace = true scopeguard.workspace = true serde.workspace = true serde_json = { workspace = true, features = ["raw_value"] } @@ -68,6 +66,7 @@ tenant_size_model.workspace = true utils.workspace = true workspace_hack.workspace = true reqwest.workspace = true +rpds.workspace = true [dev-dependencies] criterion.workspace = true diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs index 6a01fdfc6f..e18c00da96 100644 --- a/pageserver/benches/bench_layer_map.rs +++ b/pageserver/benches/bench_layer_map.rs @@ -1,13 +1,12 @@ -use anyhow::Result; +use pageserver::keyspace::{KeyPartitioning, KeySpace}; use pageserver::repository::Key; use pageserver::tenant::layer_map::LayerMap; -use pageserver::tenant::storage_layer::{DeltaFileName, ImageFileName, ValueReconstructState}; -use pageserver::tenant::storage_layer::{Layer, ValueReconstructResult}; +use pageserver::tenant::storage_layer::Layer; +use pageserver::tenant::storage_layer::{DeltaFileName, ImageFileName, LayerDescriptor}; use rand::prelude::{SeedableRng, SliceRandom, StdRng}; use std::cmp::{max, min}; use std::fs::File; use std::io::{BufRead, BufReader}; -use std::ops::Range; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; @@ -17,102 +16,35 @@ use utils::lsn::Lsn; use criterion::{criterion_group, criterion_main, Criterion}; -struct DummyDelta { - key_range: Range, - lsn_range: Range, -} - -impl Layer for DummyDelta { - fn 
get_key_range(&self) -> Range { - self.key_range.clone() - } - - fn get_lsn_range(&self) -> Range { - self.lsn_range.clone() - } - fn get_value_reconstruct_data( - &self, - _key: Key, - _lsn_range: Range, - _reconstruct_data: &mut ValueReconstructState, - ) -> Result { - panic!() - } - - fn is_incremental(&self) -> bool { - true - } - - fn dump(&self, _verbose: bool) -> Result<()> { - unimplemented!() - } - - fn short_id(&self) -> String { - unimplemented!() - } -} - -struct DummyImage { - key_range: Range, - lsn: Lsn, -} - -impl Layer for DummyImage { - fn get_key_range(&self) -> Range { - self.key_range.clone() - } - - fn get_lsn_range(&self) -> Range { - // End-bound is exclusive - self.lsn..(self.lsn + 1) - } - - fn get_value_reconstruct_data( - &self, - _key: Key, - _lsn_range: Range, - _reconstruct_data: &mut ValueReconstructState, - ) -> Result { - panic!() - } - - fn is_incremental(&self) -> bool { - false - } - - fn dump(&self, _verbose: bool) -> Result<()> { - unimplemented!() - } - - fn short_id(&self) -> String { - unimplemented!() - } -} - -fn build_layer_map(filename_dump: PathBuf) -> LayerMap { - let mut layer_map = LayerMap::::default(); +fn build_layer_map(filename_dump: PathBuf) -> LayerMap { + let mut layer_map = LayerMap::::default(); let mut min_lsn = Lsn(u64::MAX); let mut max_lsn = Lsn(0); let filenames = BufReader::new(File::open(filename_dump).unwrap()).lines(); + let mut updates = layer_map.batch_update(); for fname in filenames { let fname = &fname.unwrap(); if let Some(imgfilename) = ImageFileName::parse_str(fname) { - let layer = DummyImage { - key_range: imgfilename.key_range, - lsn: imgfilename.lsn, + let layer = LayerDescriptor { + key: imgfilename.key_range, + lsn: imgfilename.lsn..(imgfilename.lsn + 1), + is_incremental: false, + short_id: fname.to_string(), }; - layer_map.insert_historic(Arc::new(layer)); + updates.insert_historic(Arc::new(layer)); min_lsn = min(min_lsn, imgfilename.lsn); max_lsn = max(max_lsn, imgfilename.lsn); } else if let Some(deltafilename) = DeltaFileName::parse_str(fname) { - let layer = DummyDelta { - key_range: deltafilename.key_range, - lsn_range: deltafilename.lsn_range.clone(), + let layer = LayerDescriptor { + key: deltafilename.key_range.clone(), + lsn: deltafilename.lsn_range.clone(), + is_incremental: true, + short_id: fname.to_string(), }; - layer_map.insert_historic(Arc::new(layer)); + updates.insert_historic(Arc::new(layer)); min_lsn = min(min_lsn, deltafilename.lsn_range.start); max_lsn = max(max_lsn, deltafilename.lsn_range.end); } else { @@ -122,11 +54,12 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap { println!("min: {min_lsn}, max: {max_lsn}"); + updates.flush(); layer_map } /// Construct a layer map query pattern for benchmarks -fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> { +fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> { // For each image layer we query one of the pages contained, at LSN right // before the image layer was created. This gives us a somewhat uniform // coverage of both the lsn and key space because image layers have @@ -150,6 +83,41 @@ fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> { .collect() } +// Construct a partitioning for testing get_difficulty map when we +// don't have an exact result of `collect_keyspace` to work with. 
+fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning { + let mut parts = Vec::new(); + + // We add a partition boundary at the start of each image layer, + // no matter what lsn range it covers. This is just the easiest + // thing to do. A better thing to do would be to get a real + // partitioning from some database. Even better, remove the need + // for key partitions by deciding where to create image layers + // directly based on a coverage-based difficulty map. + let mut keys: Vec<_> = layer_map + .iter_historic_layers() + .filter_map(|l| { + if l.is_incremental() { + None + } else { + let kr = l.get_key_range(); + Some(kr.start.next()) + } + }) + .collect(); + keys.sort(); + + let mut current_key = Key::from_hex("000000000000000000000000000000000000").unwrap(); + for key in keys { + parts.push(KeySpace { + ranges: vec![current_key..key], + }); + current_key = key; + } + + KeyPartitioning { parts } +} + // Benchmark using metadata extracted from our performance test environment, from // a project where we have run pgbench many times. The pgbench database was initialized // between each test run. @@ -183,24 +151,68 @@ fn bench_from_captest_env(c: &mut Criterion) { // Benchmark using metadata extracted from a real project that was taking // too long processing layer map queries. fn bench_from_real_project(c: &mut Criterion) { - // TODO consider compressing this file + // Init layer map + let now = Instant::now(); let layer_map = build_layer_map(PathBuf::from("benches/odd-brook-layernames.txt")); + println!("Finished layer map init in {:?}", now.elapsed()); + + // Choose uniformly distributed queries let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map); - // Test with uniform query pattern - c.bench_function("real_map_uniform_queries", |b| { + // Choose inputs for get_difficulty_map + let latest_lsn = layer_map + .iter_historic_layers() + .map(|l| l.get_lsn_range().end) + .max() + .unwrap(); + let partitioning = uniform_key_partitioning(&layer_map, latest_lsn); + + // Check correctness of get_difficulty_map + // TODO put this in a dedicated test outside of this mod + { + println!("running correctness check"); + + let now = Instant::now(); + let result_bruteforce = layer_map.get_difficulty_map_bruteforce(latest_lsn, &partitioning); + assert!(result_bruteforce.len() == partitioning.parts.len()); + println!("Finished bruteforce in {:?}", now.elapsed()); + + let now = Instant::now(); + let result_fast = layer_map.get_difficulty_map(latest_lsn, &partitioning, None); + assert!(result_fast.len() == partitioning.parts.len()); + println!("Finished fast in {:?}", now.elapsed()); + + // Assert results are equal. Manually iterate for easier debugging. + let zip = std::iter::zip( + &partitioning.parts, + std::iter::zip(result_bruteforce, result_fast), + ); + for (_part, (bruteforce, fast)) in zip { + assert_eq!(bruteforce, fast); + } + + println!("No issues found"); + } + + // Define and name the benchmark function + let mut group = c.benchmark_group("real_map"); + group.bench_function("uniform_queries", |b| { b.iter(|| { for q in queries.clone().into_iter() { layer_map.search(q.0, q.1); } }); }); + group.bench_function("get_difficulty_map", |b| { + b.iter(|| { + layer_map.get_difficulty_map(latest_lsn, &partitioning, Some(3)); + }); + }); + group.finish(); } // Benchmark using synthetic data. Arrange image layers on stacked diagonal lines. fn bench_sequential(c: &mut Criterion) { - let mut layer_map: LayerMap = LayerMap::default(); - // Init layer map.
Create 100_000 layers arranged in 1000 diagonal lines. // // TODO This code is pretty slow and runs even if we're only running other @@ -208,39 +220,39 @@ fn bench_sequential(c: &mut Criterion) { // Putting it inside the `bench_function` closure is not a solution // because then it runs multiple times during warmup. let now = Instant::now(); + let mut layer_map = LayerMap::default(); + let mut updates = layer_map.batch_update(); for i in 0..100_000 { - // TODO try inserting a super-wide layer in between every 10 to reflect - // what often happens with L1 layers that include non-rel changes. - // Maybe do that as a separate test. let i32 = (i as u32) % 100; let zero = Key::from_hex("000000000000000000000000000000000000").unwrap(); - let layer = DummyImage { - key_range: zero.add(10 * i32)..zero.add(10 * i32 + 1), - lsn: Lsn(10 * i), + let layer = LayerDescriptor { + key: zero.add(10 * i32)..zero.add(10 * i32 + 1), + lsn: Lsn(i)..Lsn(i + 1), + is_incremental: false, + short_id: format!("Layer {}", i), }; - layer_map.insert_historic(Arc::new(layer)); + updates.insert_historic(Arc::new(layer)); } - - // Manually measure runtime without criterion because criterion - // has a minimum sample size of 10 and I don't want to run it 10 times. - println!("Finished init in {:?}", now.elapsed()); + updates.flush(); + println!("Finished layer map init in {:?}", now.elapsed()); // Choose 100 uniformly random queries let rng = &mut StdRng::seed_from_u64(1); let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map) - .choose_multiple(rng, 1) + .choose_multiple(rng, 100) .copied() .collect(); // Define and name the benchmark function - c.bench_function("sequential_uniform_queries", |b| { - // Run the search queries + let mut group = c.benchmark_group("sequential"); + group.bench_function("uniform_queries", |b| { b.iter(|| { for q in queries.clone().into_iter() { layer_map.search(q.0, q.1); } }); }); + group.finish(); } criterion_group!(group_1, bench_from_captest_env); diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 586fd20886..092503b7c5 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -37,6 +37,17 @@ impl Key { | self.field6 as i128 } + pub fn from_i128(x: i128) -> Self { + Key { + field1: ((x >> 120) & 0xf) as u8, + field2: ((x >> 104) & 0xFFFF) as u32, + field3: (x >> 72) as u32, + field4: (x >> 40) as u32, + field5: (x >> 32) as u8, + field6: x as u32, + } + } + pub fn next(&self) -> Key { self.add(1) } diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 01c5359e88..ed1a32c8fd 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -9,24 +9,57 @@ //! are frozen, and it is split up into new image and delta layers and the //! corresponding files are written to disk. //! +//! Design overview: +//! +//! The `search` method of the layer map is on the read critical path, so we've +//! built an efficient data structure for fast reads, stored in `LayerMap::historic`. +//! Other read methods are less critical but still impact performance of background tasks. +//! +//! This data structure relies on a persistent/immutable binary search tree. See the +//! following lecture for an introduction https://www.youtube.com/watch?v=WqCWghETNDc&t=581s +//! Summary: A persistent/immutable BST (and persistent data structures in general) allows +//! you to modify the tree in such a way that each modification creates a new "version" +//! of the tree. 
When you modify it, you get a new version, but all previous versions are +//! still accessible too. So if someone is still holding a reference to an older version, +//! they continue to see the tree as it was then. The persistent BST stores all the +//! different versions in an efficient way. +//! +//! Our persistent BST maintains a map of which layer file "covers" each key. It has only +//! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property +//! to handle the LSN dimension. +//! +//! To build the layer map, we insert each layer to the persistent BST in LSN.start order, +//! starting from the oldest one. After each insertion, we grab a reference to that "version" +//! of the tree, and store it in another tree, a BtreeMap keyed by the LSN. See +//! `historic_layer_coverage.rs`. +//! +//! To search for a particular key-LSN pair, you first look up the right "version" in the +//! BTreeMap. Then you search that version of the BST with the key. +//! +//! The persistent BST keeps all the versions, but there is no way to change the old versions +//! afterwards. We can add layers as long as they have larger LSNs than any previous layer in +//! the map, but if we need to remove a layer, or insert anything with an older LSN, we need +//! to throw away most of the persistent BST and build a new one, starting from the oldest +//! LSN. See `LayerMap::flush_updates()`. +//! +mod historic_layer_coverage; +mod layer_coverage; + +use crate::keyspace::KeyPartitioning; use crate::metrics::NUM_ONDISK_LAYERS; use crate::repository::Key; -use crate::tenant::storage_layer::{range_eq, range_overlaps}; -use amplify_num::i256; +use crate::tenant::storage_layer::InMemoryLayer; +use crate::tenant::storage_layer::Layer; use anyhow::Result; -use num_traits::identities::{One, Zero}; -use num_traits::{Bounded, Num, Signed}; -use rstar::{RTree, RTreeObject, AABB}; -use std::cmp::Ordering; use std::collections::VecDeque; use std::ops::Range; -use std::ops::{Add, Div, Mul, Neg, Rem, Sub}; use std::sync::Arc; -use tracing::*; use utils::lsn::Lsn; -use super::storage_layer::{InMemoryLayer, Layer}; +use historic_layer_coverage::BufferedHistoricLayerCoverage; + +use super::storage_layer::range_eq; /// /// LayerMap tracks what layers exist on a timeline. @@ -51,8 +84,8 @@ pub struct LayerMap { /// pub frozen_layers: VecDeque>, - /// All the historic layers are kept here - historic_layers: RTree>, + /// Index of the historic layers optimized for search + historic: BufferedHistoricLayerCoverage>, /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient. /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree. @@ -65,177 +98,64 @@ impl Default for LayerMap { open_layer: None, next_open_layer_at: None, frozen_layers: VecDeque::default(), - historic_layers: RTree::default(), l0_delta_layers: Vec::default(), + historic: BufferedHistoricLayerCoverage::default(), } } } -struct LayerRTreeObject { - layer: Arc, - - envelope: AABB<[IntKey; 2]>, +/// The primary update API for the layer map. +/// +/// Batching historic layer insertions and removals is good for +/// performance and this struct helps us do that correctly. +#[must_use] +pub struct BatchedUpdates<'a, L: ?Sized + Layer> { + // While we hold this exclusive reference to the layer map the type checker + // will prevent us from accidentally reading any unflushed updates. + layer_map: &'a mut LayerMap, } -// Representation of Key as numeric type. 
-// We can not use native implementation of i128, because rstar::RTree -// doesn't handle properly integer overflow during area calculation: sum(Xi*Yi). -// Overflow will cause panic in debug mode and incorrect area calculation in release mode, -// which leads to non-optimally balanced R-Tree (but doesn't fit correctness of R-Tree work). -// By using i256 as the type, even though all the actual values would fit in i128, we can be -// sure that multiplication doesn't overflow. -// - -#[derive(Clone, PartialEq, Eq, PartialOrd, Debug)] -struct IntKey(i256); - -impl Copy for IntKey {} - -impl IntKey { - fn from(i: i128) -> Self { - IntKey(i256::from(i)) - } -} - -impl Bounded for IntKey { - fn min_value() -> Self { - IntKey(i256::MIN) - } - fn max_value() -> Self { - IntKey(i256::MAX) - } -} - -impl Signed for IntKey { - fn is_positive(&self) -> bool { - self.0 > i256::ZERO - } - fn is_negative(&self) -> bool { - self.0 < i256::ZERO - } - fn signum(&self) -> Self { - match self.0.cmp(&i256::ZERO) { - Ordering::Greater => IntKey(i256::ONE), - Ordering::Less => IntKey(-i256::ONE), - Ordering::Equal => IntKey(i256::ZERO), - } - } - fn abs(&self) -> Self { - IntKey(self.0.abs()) - } - fn abs_sub(&self, other: &Self) -> Self { - if self.0 <= other.0 { - IntKey(i256::ZERO) - } else { - IntKey(self.0 - other.0) - } - } -} - -impl Neg for IntKey { - type Output = Self; - fn neg(self) -> Self::Output { - IntKey(-self.0) - } -} - -impl Rem for IntKey { - type Output = Self; - fn rem(self, rhs: Self) -> Self::Output { - IntKey(self.0 % rhs.0) - } -} - -impl Div for IntKey { - type Output = Self; - fn div(self, rhs: Self) -> Self::Output { - IntKey(self.0 / rhs.0) - } -} - -impl Add for IntKey { - type Output = Self; - fn add(self, rhs: Self) -> Self::Output { - IntKey(self.0 + rhs.0) - } -} - -impl Sub for IntKey { - type Output = Self; - fn sub(self, rhs: Self) -> Self::Output { - IntKey(self.0 - rhs.0) - } -} - -impl Mul for IntKey { - type Output = Self; - fn mul(self, rhs: Self) -> Self::Output { - IntKey(self.0 * rhs.0) - } -} - -impl One for IntKey { - fn one() -> Self { - IntKey(i256::ONE) - } -} - -impl Zero for IntKey { - fn zero() -> Self { - IntKey(i256::ZERO) - } - fn is_zero(&self) -> bool { - self.0 == i256::ZERO - } -} - -impl Num for IntKey { - type FromStrRadixErr = ::FromStrRadixErr; - fn from_str_radix(str: &str, radix: u32) -> Result { - Ok(IntKey(i256::from(i128::from_str_radix(str, radix)?))) - } -} - -impl PartialEq for LayerRTreeObject { - fn eq(&self, other: &Self) -> bool { - // FIXME: ptr_eq might fail to return true for 'dyn' - // references. Clippy complains about this. In practice it - // seems to work, the assertion below would be triggered - // otherwise but this ought to be fixed. - #[allow(clippy::vtable_address_comparisons)] - Arc::ptr_eq(&self.layer, &other.layer) - } -} - -impl RTreeObject for LayerRTreeObject -where - L: ?Sized, -{ - type Envelope = AABB<[IntKey; 2]>; - fn envelope(&self) -> Self::Envelope { - self.envelope - } -} - -impl LayerRTreeObject +/// Provide ability to batch more updates while hiding the read +/// API so we don't accidentally read without flushing. +impl BatchedUpdates<'_, L> where L: ?Sized + Layer, { - fn new(layer: Arc) -> Self { - let key_range = layer.get_key_range(); - let lsn_range = layer.get_lsn_range(); + /// + /// Insert an on-disk layer. 
+ /// + pub fn insert_historic(&mut self, layer: Arc) { + self.layer_map.insert_historic_noflush(layer) + } - let envelope = AABB::from_corners( - [ - IntKey::from(key_range.start.to_i128()), - IntKey::from(lsn_range.start.0 as i128), - ], - [ - IntKey::from(key_range.end.to_i128() - 1), - IntKey::from(lsn_range.end.0 as i128 - 1), - ], // AABB::upper is inclusive, while `key_range.end` and `lsn_range.end` are exclusive - ); - LayerRTreeObject { layer, envelope } + /// + /// Remove an on-disk layer from the map. + /// + /// This should be called when the corresponding file on disk has been deleted. + /// + pub fn remove_historic(&mut self, layer: Arc) { + self.layer_map.remove_historic_noflush(layer) + } + + // We will flush on drop anyway, but this method makes it + // more explicit that there is some work being done. + /// Apply all updates + pub fn flush(self) { + // Flush happens on drop + } +} + +// Ideally the flush() method should be called explicitly for more +// controlled execution. But if we forget we'd rather flush on drop +// than panic later or read without flushing. +// +// TODO maybe warn if flush hasn't explicitly been called +impl Drop for BatchedUpdates<'_, L> +where + L: ?Sized + Layer, +{ + fn drop(&mut self) { + self.layer_map.flush_updates(); } } @@ -281,125 +201,91 @@ where /// 'open' and 'frozen' layers! /// pub fn search(&self, key: Key, end_lsn: Lsn) -> Option> { - // Find the latest image layer that covers the given key - let mut latest_img: Option> = None; - let mut latest_img_lsn: Option = None; - let envelope = AABB::from_corners( - [IntKey::from(key.to_i128()), IntKey::from(0i128)], - [ - IntKey::from(key.to_i128()), - IntKey::from(end_lsn.0 as i128 - 1), - ], - ); - for e in self - .historic_layers - .locate_in_envelope_intersecting(&envelope) - { - let l = &e.layer; - if l.is_incremental() { - continue; - } - assert!(l.get_key_range().contains(&key)); - let img_lsn = l.get_lsn_range().start; - assert!(img_lsn < end_lsn); - if Lsn(img_lsn.0 + 1) == end_lsn { - // found exact match - return Some(SearchResult { - layer: Arc::clone(l), - lsn_floor: img_lsn, - }); - } - if img_lsn > latest_img_lsn.unwrap_or(Lsn(0)) { - latest_img = Some(Arc::clone(l)); - latest_img_lsn = Some(img_lsn); - } - } + let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?; + let latest_delta = version.delta_coverage.query(key.to_i128()); + let latest_image = version.image_coverage.query(key.to_i128()); - // Search the delta layers - let mut latest_delta: Option> = None; - for e in self - .historic_layers - .locate_in_envelope_intersecting(&envelope) - { - let l = &e.layer; - if !l.is_incremental() { - continue; + match (latest_delta, latest_image) { + (None, None) => None, + (None, Some(image)) => { + let lsn_floor = image.get_lsn_range().start; + Some(SearchResult { + layer: image, + lsn_floor, + }) } - assert!(l.get_key_range().contains(&key)); - if l.get_lsn_range().start >= end_lsn { - info!( - "Candidate delta layer {}..{} is too new for lsn {}", - l.get_lsn_range().start, - l.get_lsn_range().end, - end_lsn - ); + (Some(delta), None) => { + let lsn_floor = delta.get_lsn_range().start; + Some(SearchResult { + layer: delta, + lsn_floor, + }) } - assert!(l.get_lsn_range().start < end_lsn); - if l.get_lsn_range().end >= end_lsn { - // this layer contains the requested point in the key/lsn space. 
- // No need to search any further - trace!( - "found layer {} for request on {key} at {end_lsn}", - l.short_id(), - ); - latest_delta.replace(Arc::clone(l)); - break; - } - if l.get_lsn_range().end > latest_img_lsn.unwrap_or(Lsn(0)) { - // this layer's end LSN is smaller than the requested point. If there's - // nothing newer, this is what we need to return. Remember this. - if let Some(old_candidate) = &latest_delta { - if l.get_lsn_range().end > old_candidate.get_lsn_range().end { - latest_delta.replace(Arc::clone(l)); - } + (Some(delta), Some(image)) => { + let img_lsn = image.get_lsn_range().start; + let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end; + let image_exact_match = img_lsn + 1 == end_lsn; + if image_is_newer || image_exact_match { + Some(SearchResult { + layer: image, + lsn_floor: img_lsn, + }) } else { - latest_delta.replace(Arc::clone(l)); + let lsn_floor = + std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1); + Some(SearchResult { + layer: delta, + lsn_floor, + }) } } } - if let Some(l) = latest_delta { - trace!( - "found (old) layer {} for request on {key} at {end_lsn}", - l.short_id(), - ); - let lsn_floor = std::cmp::max( - Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1), - l.get_lsn_range().start, - ); - Some(SearchResult { - lsn_floor, - layer: l, - }) - } else if let Some(l) = latest_img { - trace!("found img layer and no deltas for request on {key} at {end_lsn}"); - Some(SearchResult { - lsn_floor: latest_img_lsn.unwrap(), - layer: l, - }) - } else { - trace!("no layer found for request on {key} at {end_lsn}"); - None - } + } + + /// Start a batch of updates, applied on drop + pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> { + BatchedUpdates { layer_map: self } } /// /// Insert an on-disk layer /// - pub fn insert_historic(&mut self, layer: Arc) { - if layer.get_key_range() == (Key::MIN..Key::MAX) { - self.l0_delta_layers.push(layer.clone()); + /// Helper function for BatchedUpdates::insert_historic + /// + pub(self) fn insert_historic_noflush(&mut self, layer: Arc) { + let kr = layer.get_key_range(); + let lr = layer.get_lsn_range(); + self.historic.insert( + historic_layer_coverage::LayerKey { + key: kr.start.to_i128()..kr.end.to_i128(), + lsn: lr.start.0..lr.end.0, + is_image: !layer.is_incremental(), + }, + Arc::clone(&layer), + ); + + if Self::is_l0(&layer) { + self.l0_delta_layers.push(layer); } - self.historic_layers.insert(LayerRTreeObject::new(layer)); + NUM_ONDISK_LAYERS.inc(); } /// /// Remove an on-disk layer from the map. /// - /// This should be called when the corresponding file on disk has been deleted. + /// Helper function for BatchedUpdates::remove_historic /// - pub fn remove_historic(&mut self, layer: Arc) { - if layer.get_key_range() == (Key::MIN..Key::MAX) { + pub fn remove_historic_noflush(&mut self, layer: Arc) { + let kr = layer.get_key_range(); + let lr = layer.get_lsn_range(); + self.historic.remove(historic_layer_coverage::LayerKey { + key: kr.start.to_i128()..kr.end.to_i128(), + lsn: lr.start.0..lr.end.0, + is_image: !layer.is_incremental(), + }); + + if Self::is_l0(&layer) { let len_before = self.l0_delta_layers.len(); // FIXME: ptr_eq might fail to return true for 'dyn' @@ -411,98 +297,57 @@ where .retain(|other| !Arc::ptr_eq(other, &layer)); assert_eq!(self.l0_delta_layers.len(), len_before - 1); } - assert!(self - .historic_layers - .remove(&LayerRTreeObject::new(layer)) - .is_some()); + NUM_ONDISK_LAYERS.dec(); } + /// Helper function for BatchedUpdates::drop. 
+ pub(self) fn flush_updates(&mut self) { + self.historic.rebuild(); + } + /// Is there a newer image layer for given key- and LSN-range? Or a set /// of image layers within the specified lsn range that cover the entire /// specified key range? /// /// This is used for garbage collection, to determine if an old layer can /// be deleted. - pub fn image_layer_exists( - &self, - key_range: &Range, - lsn_range: &Range, - ) -> Result { - let mut range_remain = key_range.clone(); + pub fn image_layer_exists(&self, key: &Range, lsn: &Range) -> Result { + if key.is_empty() { + // Vacuously true. There's a newer image for all 0 of the keys in the range. + return Ok(true); + } - loop { - let mut made_progress = false; - let envelope = AABB::from_corners( - [ - IntKey::from(range_remain.start.to_i128()), - IntKey::from(lsn_range.start.0 as i128), - ], - [ - IntKey::from(range_remain.end.to_i128() - 1), - IntKey::from(lsn_range.end.0 as i128 - 1), - ], - ); - for e in self - .historic_layers - .locate_in_envelope_intersecting(&envelope) - { - let l = &e.layer; - if l.is_incremental() { - continue; - } - let img_lsn = l.get_lsn_range().start; - if l.get_key_range().contains(&range_remain.start) && lsn_range.contains(&img_lsn) { - made_progress = true; - let img_key_end = l.get_key_range().end; + let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) { + Some(v) => v, + None => return Ok(false), + }; - if img_key_end >= range_remain.end { - return Ok(true); - } - range_remain.start = img_key_end; - } - } + let start = key.start.to_i128(); + let end = key.end.to_i128(); - if !made_progress { + let layer_covers = |layer: Option>| match layer { + Some(layer) => layer.get_lsn_range().start >= lsn.start, + None => false, + }; + + // Check the start is covered + if !layer_covers(version.image_coverage.query(start)) { + return Ok(false); + } + + // Check after all changes of coverage + for (_, change_val) in version.image_coverage.range(start..end) { + if !layer_covers(change_val) { return Ok(false); } } + + Ok(true) } pub fn iter_historic_layers(&self) -> impl '_ + Iterator> { - self.historic_layers.iter().map(|e| e.layer.clone()) - } - - /// Find the last image layer that covers 'key', ignoring any image layers - /// newer than 'lsn'.
- fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option> { - let mut candidate_lsn = Lsn(0); - let mut candidate = None; - let envelope = AABB::from_corners( - [IntKey::from(key.to_i128()), IntKey::from(0)], - [IntKey::from(key.to_i128()), IntKey::from(lsn.0 as i128)], - ); - for e in self - .historic_layers - .locate_in_envelope_intersecting(&envelope) - { - let l = &e.layer; - if l.is_incremental() { - continue; - } - - assert!(l.get_key_range().contains(&key)); - let this_lsn = l.get_lsn_range().start; - assert!(this_lsn <= lsn); - if this_lsn < candidate_lsn { - // our previous candidate was better - continue; - } - candidate_lsn = this_lsn; - candidate = Some(Arc::clone(l)); - } - - candidate + self.historic.iter() } /// @@ -518,94 +363,288 @@ where key_range: &Range, lsn: Lsn, ) -> Result, Option>)>> { - let mut points = vec![key_range.start]; - let envelope = AABB::from_corners( - [IntKey::from(key_range.start.to_i128()), IntKey::from(0)], - [ - IntKey::from(key_range.end.to_i128()), - IntKey::from(lsn.0 as i128), - ], - ); - for e in self - .historic_layers - .locate_in_envelope_intersecting(&envelope) - { - let l = &e.layer; - assert!(l.get_lsn_range().start <= lsn); - let range = l.get_key_range(); - if key_range.contains(&range.start) { - points.push(l.get_key_range().start); - } - if key_range.contains(&range.end) { - points.push(l.get_key_range().end); - } + let version = match self.historic.get().unwrap().get_version(lsn.0) { + Some(v) => v, + None => return Ok(vec![]), + }; + + let start = key_range.start.to_i128(); + let end = key_range.end.to_i128(); + + // Initialize loop variables + let mut coverage: Vec<(Range, Option>)> = vec![]; + let mut current_key = start; + let mut current_val = version.image_coverage.query(start); + + // Loop through the change events and push intervals + for (change_key, change_val) in version.image_coverage.range(start..end) { + let kr = Key::from_i128(current_key)..Key::from_i128(change_key); + coverage.push((kr, current_val.take())); + current_key = change_key; + current_val = change_val.clone(); } - points.push(key_range.end); - points.sort(); - points.dedup(); + // Add the final interval + let kr = Key::from_i128(current_key)..Key::from_i128(end); + coverage.push((kr, current_val.take())); - // Ok, we now have a list of "interesting" points in the key space - - // For each range between the points, find the latest image - let mut start = *points.first().unwrap(); - let mut ranges = Vec::new(); - for end in points[1..].iter() { - let img = self.find_latest_image(start, lsn); - - ranges.push((start..*end, img)); - - start = *end; - } - Ok(ranges) + Ok(coverage) } - /// Count the height of the tallest stack of deltas in this 2d region. + pub fn is_l0(layer: &L) -> bool { + range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX)) + } + + /// This function determines which layers are counted in `count_deltas`: + /// layers that should count towards deciding whether or not to reimage + /// a certain partition range. + /// + /// There are two kinds of layers we currently consider reimage-worthy: + /// + /// Case 1: Non-L0 layers are currently reimage-worthy by default. + /// TODO Some of these layers are very sparse and cover the entire key + /// range. Replacing 256MB of data (or less!) with terabytes of + /// images doesn't seem wise. 
We need a better heuristic, possibly + /// based on some of these factors: + /// a) whether this layer has any wal in this partition range + /// b) the size of the layer + /// c) the number of images needed to cover it + /// d) the estimated time until we'll have to reimage over it for GC + /// + /// Case 2: Since L0 layers by definition cover the entire key space, we consider + /// them reimage-worthy only when the entire key space can be covered by very few + /// images (currently 1). + /// TODO The optimal number should probably be slightly higher than 1, but to + /// implement that we need to plumb a lot more context into this function + /// than just the current partition_range. + pub fn is_reimage_worthy(layer: &L, partition_range: &Range) -> bool { + // Case 1 + if !Self::is_l0(layer) { + return true; + } + + // Case 2 + if range_eq(partition_range, &(Key::MIN..Key::MAX)) { + return true; + } + + false + } + + /// Count the height of the tallest stack of reimage-worthy deltas + /// in this 2d region. + /// + /// If `limit` is provided we don't try to count above that number. /// /// This number is used to compute the largest number of deltas that /// we'll need to visit for any page reconstruction in this region. /// We use this heuristic to decide whether to create an image layer. - /// - /// TODO currently we just return the total number of deltas in the - /// region, no matter if they're stacked on top of each other - /// or next to each other. - pub fn count_deltas(&self, key_range: &Range, lsn_range: &Range) -> Result { - let mut result = 0; - if lsn_range.start >= lsn_range.end { + pub fn count_deltas( + &self, + key: &Range, + lsn: &Range, + limit: Option, + ) -> Result { + // We get the delta coverage of the region, and for each part of the coverage + // we recurse right underneath the delta. The recursion depth is limited by + // the largest result this function could return, which is in practice between + // 3 and 10 (since we usually try to create an image when the number gets larger). + + if lsn.is_empty() || key.is_empty() || limit == Some(0) { return Ok(0); } - let envelope = AABB::from_corners( - [ - IntKey::from(key_range.start.to_i128()), - IntKey::from(lsn_range.start.0 as i128), - ], - [ - IntKey::from(key_range.end.to_i128() - 1), - IntKey::from(lsn_range.end.0 as i128 - 1), - ], - ); - for e in self - .historic_layers - .locate_in_envelope_intersecting(&envelope) - { - let l = &e.layer; - if !l.is_incremental() { - continue; - } - assert!(range_overlaps(&l.get_lsn_range(), lsn_range)); - assert!(range_overlaps(&l.get_key_range(), key_range)); - // We ignore level0 delta layers. 
Unless the whole keyspace fits - // into one partition - if !range_eq(key_range, &(Key::MIN..Key::MAX)) - && range_eq(&l.get_key_range(), &(Key::MIN..Key::MAX)) - { - continue; + let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) { + Some(v) => v, + None => return Ok(0), + }; + + let start = key.start.to_i128(); + let end = key.end.to_i128(); + + // Initialize loop variables + let mut max_stacked_deltas = 0; + let mut current_key = start; + let mut current_val = version.delta_coverage.query(start); + + // Loop through the delta coverage and recurse on each part + for (change_key, change_val) in version.delta_coverage.range(start..end) { + // If there's a relevant delta in this part, add 1 and recurse down + if let Some(val) = current_val { + if val.get_lsn_range().end > lsn.start { + let kr = Key::from_i128(current_key)..Key::from_i128(change_key); + let lr = lsn.start..val.get_lsn_range().start; + if !kr.is_empty() { + let base_count = Self::is_reimage_worthy(&val, key) as usize; + let new_limit = limit.map(|l| l - base_count); + let max_stacked_deltas_underneath = + self.count_deltas(&kr, &lr, new_limit)?; + max_stacked_deltas = std::cmp::max( + max_stacked_deltas, + base_count + max_stacked_deltas_underneath, + ); + } + } } - result += 1; + current_key = change_key; + current_val = change_val.clone(); } - Ok(result) + + // Consider the last part + if let Some(val) = current_val { + if val.get_lsn_range().end > lsn.start { + let kr = Key::from_i128(current_key)..Key::from_i128(end); + let lr = lsn.start..val.get_lsn_range().start; + + if !kr.is_empty() { + let base_count = Self::is_reimage_worthy(&val, key) as usize; + let new_limit = limit.map(|l| l - base_count); + let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; + max_stacked_deltas = std::cmp::max( + max_stacked_deltas, + base_count + max_stacked_deltas_underneath, + ); + } + } + } + + Ok(max_stacked_deltas) + } + + /// Count how many reimage-worthy layers we need to visit for given key-lsn pair. + /// + /// The `partition_range` argument is used as context for the reimage-worthiness decision. + /// + /// Used as a helper for correctness checks only. Performance not critical. + pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range) -> usize { + match self.search(key, lsn) { + Some(search_result) => { + if search_result.layer.is_incremental() { + (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize) + + self.get_difficulty(search_result.lsn_floor, key, partition_range) + } else { + 0 + } + } + None => 0, + } + } + + /// Used for correctness checking. Results are expected to be identical to + /// self.get_difficulty_map. Assumes self.search is correct. + pub fn get_difficulty_map_bruteforce( + &self, + lsn: Lsn, + partitioning: &KeyPartitioning, + ) -> Vec { + // Looking at the difficulty as a function of key, it could only increase + // when a delta layer starts or an image layer ends. 
Therefore it's sufficient + // to check the difficulties at: + // - the key.start for each non-empty part range + // - the key.start for each delta + // - the key.end for each image + let keys_iter: Box> = { + let mut keys: Vec = self + .iter_historic_layers() + .map(|layer| { + if layer.is_incremental() { + layer.get_key_range().start + } else { + layer.get_key_range().end + } + }) + .collect(); + keys.sort(); + Box::new(keys.into_iter()) + }; + let mut keys_iter = keys_iter.peekable(); + + // Iter the partition and keys together and query all the necessary + // keys, computing the max difficulty for each part. + partitioning + .parts + .iter() + .map(|part| { + let mut difficulty = 0; + // Partition ranges are assumed to be sorted and disjoint + // TODO assert it + for range in &part.ranges { + if !range.is_empty() { + difficulty = + std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range)); + } + while let Some(key) = keys_iter.peek() { + if key >= &range.end { + break; + } + let key = keys_iter.next().unwrap(); + if key < range.start { + continue; + } + difficulty = + std::cmp::max(difficulty, self.get_difficulty(lsn, key, range)); + } + } + difficulty + }) + .collect() + } + + /// For each part of a keyspace partitioning, return the maximum number of layers + /// that would be needed for page reconstruction in that part at the given LSN. + /// + /// If `limit` is provided we don't try to count above that number. + /// + /// This method is used to decide where to create new image layers. Computing the + /// result for the entire partitioning at once allows this function to be more + /// efficient, and further optimization is possible by using iterators instead, + /// to allow early return. + /// + /// TODO actually use this method instead of count_deltas. Currently we only use + /// it for benchmarks. + pub fn get_difficulty_map( + &self, + lsn: Lsn, + partitioning: &KeyPartitioning, + limit: Option, + ) -> Vec { + // TODO This is a naive implementation. Perf improvements to do: + // 1. Instead of calling self.image_coverage and self.count_deltas, + // iterate the image and delta coverage only once. + partitioning + .parts + .iter() + .map(|part| { + let mut difficulty = 0; + for range in &part.ranges { + if limit == Some(difficulty) { + break; + } + for (img_range, last_img) in self + .image_coverage(range, lsn) + .expect("why would this err?") + { + if limit == Some(difficulty) { + break; + } + let img_lsn = if let Some(last_img) = last_img { + last_img.get_lsn_range().end + } else { + Lsn(0) + }; + + if img_lsn < lsn { + let num_deltas = self + .count_deltas(&img_range, &(img_lsn..lsn), limit) + .expect("why would this err lol?"); + difficulty = std::cmp::max(difficulty, num_deltas); + } + } + } + difficulty + }) + .collect() } /// Return all L0 delta layers @@ -629,8 +668,8 @@ where } println!("historic_layers:"); - for e in self.historic_layers.iter() { - e.layer.dump(verbose)?; + for layer in self.iter_historic_layers() { + layer.dump(verbose)?; } println!("End dump LayerMap"); Ok(()) diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs new file mode 100644 index 0000000000..46821aef15 --- /dev/null +++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs @@ -0,0 +1,583 @@ +use std::collections::BTreeMap; +use std::ops::Range; + +use tracing::info; + +use super::layer_coverage::LayerCoverageTuple; + +/// Layers in this module are identified and indexed by this data. 
+/// +/// This is a helper struct to enable sorting layers by lsn.start. +/// +/// These three values are enough to uniquely identify a layer, since +/// a layer is obligated to contain all contents within range, so two +/// deltas (or images) with the same range have identical content. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct LayerKey { + // TODO I use i128 and u64 because it was easy for prototyping, + // testing, and benchmarking. If we can use the Lsn and Key + // types without overhead that would be preferable. + pub key: Range, + pub lsn: Range, + pub is_image: bool, +} + +impl PartialOrd for LayerKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for LayerKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // NOTE we really care about comparing by lsn.start first + self.lsn + .start + .cmp(&other.lsn.start) + .then(self.lsn.end.cmp(&other.lsn.end)) + .then(self.key.start.cmp(&other.key.start)) + .then(self.key.end.cmp(&other.key.end)) + .then(self.is_image.cmp(&other.is_image)) + } +} + +/// Efficiently queryable layer coverage for each LSN. +/// +/// Allows answering layer map queries very efficiently, +/// but doesn't allow retroactive insertion, which is +/// sometimes necessary. See BufferedHistoricLayerCoverage. +pub struct HistoricLayerCoverage { + /// The latest state + head: LayerCoverageTuple, + + /// All previous states + historic: BTreeMap>, +} + +impl Default for HistoricLayerCoverage { + fn default() -> Self { + Self::new() + } +} + +impl HistoricLayerCoverage { + pub fn new() -> Self { + Self { + head: LayerCoverageTuple::default(), + historic: BTreeMap::default(), + } + } + + /// Add a layer + /// + /// Panics if new layer has older lsn.start than an existing layer. + /// See BufferedHistoricLayerCoverage for a more general insertion method. + pub fn insert(&mut self, layer_key: LayerKey, value: Value) { + // It's only a persistent map, not a retroactive one + if let Some(last_entry) = self.historic.iter().next_back() { + let last_lsn = last_entry.0; + if layer_key.lsn.start < *last_lsn { + panic!("unexpected retroactive insert"); + } + } + + // Insert into data structure + if layer_key.is_image { + self.head + .image_coverage + .insert(layer_key.key, layer_key.lsn.clone(), value); + } else { + self.head + .delta_coverage + .insert(layer_key.key, layer_key.lsn.clone(), value); + } + + // Remember history. Clone is O(1) + self.historic.insert(layer_key.lsn.start, self.head.clone()); + } + + /// Query at a particular LSN, inclusive + pub fn get_version(&self, lsn: u64) -> Option<&LayerCoverageTuple> { + match self.historic.range(..=lsn).next_back() { + Some((_, v)) => Some(v), + None => None, + } + } + + /// Remove all entries after a certain LSN (inclusive) + pub fn trim(&mut self, begin: &u64) { + self.historic.split_off(begin); + self.head = self + .historic + .iter() + .rev() + .next() + .map(|(_, v)| v.clone()) + .unwrap_or_default(); + } +} + +/// This is the most basic test that demonstrates intended usage. +/// All layers in this test have height 1. 
+#[test] +fn test_persistent_simple() { + let mut map = HistoricLayerCoverage::::new(); + map.insert( + LayerKey { + key: 0..5, + lsn: 100..101, + is_image: true, + }, + "Layer 1".to_string(), + ); + map.insert( + LayerKey { + key: 3..9, + lsn: 110..111, + is_image: true, + }, + "Layer 2".to_string(), + ); + map.insert( + LayerKey { + key: 5..6, + lsn: 120..121, + is_image: true, + }, + "Layer 3".to_string(), + ); + + // After Layer 1 insertion + let version = map.get_version(105).unwrap(); + assert_eq!(version.image_coverage.query(1), Some("Layer 1".to_string())); + assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string())); + + // After Layer 2 insertion + let version = map.get_version(115).unwrap(); + assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string())); + assert_eq!(version.image_coverage.query(8), Some("Layer 2".to_string())); + assert_eq!(version.image_coverage.query(11), None); + + // After Layer 3 insertion + let version = map.get_version(125).unwrap(); + assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string())); + assert_eq!(version.image_coverage.query(5), Some("Layer 3".to_string())); + assert_eq!(version.image_coverage.query(7), Some("Layer 2".to_string())); +} + +/// Cover simple off-by-one edge cases +#[test] +fn test_off_by_one() { + let mut map = HistoricLayerCoverage::::new(); + map.insert( + LayerKey { + key: 3..5, + lsn: 100..110, + is_image: true, + }, + "Layer 1".to_string(), + ); + + // Check different LSNs + let version = map.get_version(99); + assert!(version.is_none()); + let version = map.get_version(100).unwrap(); + assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string())); + let version = map.get_version(110).unwrap(); + assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string())); + + // Check different keys + let version = map.get_version(105).unwrap(); + assert_eq!(version.image_coverage.query(2), None); + assert_eq!(version.image_coverage.query(3), Some("Layer 1".to_string())); + assert_eq!(version.image_coverage.query(4), Some("Layer 1".to_string())); + assert_eq!(version.image_coverage.query(5), None); +} + +/// Cover edge cases where layers begin or end on the same key +#[test] +fn test_key_collision() { + let mut map = HistoricLayerCoverage::::new(); + + map.insert( + LayerKey { + key: 3..5, + lsn: 100..110, + is_image: true, + }, + "Layer 10".to_string(), + ); + map.insert( + LayerKey { + key: 5..8, + lsn: 100..110, + is_image: true, + }, + "Layer 11".to_string(), + ); + map.insert( + LayerKey { + key: 3..4, + lsn: 200..210, + is_image: true, + }, + "Layer 20".to_string(), + ); + + // Check after layer 11 + let version = map.get_version(105).unwrap(); + assert_eq!(version.image_coverage.query(2), None); + assert_eq!( + version.image_coverage.query(3), + Some("Layer 10".to_string()) + ); + assert_eq!( + version.image_coverage.query(5), + Some("Layer 11".to_string()) + ); + assert_eq!( + version.image_coverage.query(7), + Some("Layer 11".to_string()) + ); + assert_eq!(version.image_coverage.query(8), None); + + // Check after layer 20 + let version = map.get_version(205).unwrap(); + assert_eq!(version.image_coverage.query(2), None); + assert_eq!( + version.image_coverage.query(3), + Some("Layer 20".to_string()) + ); + assert_eq!( + version.image_coverage.query(5), + Some("Layer 11".to_string()) + ); + assert_eq!( + version.image_coverage.query(7), + Some("Layer 11".to_string()) + ); + assert_eq!(version.image_coverage.query(8), None); +} + +/// Test when rectangles have nontrivial 
height and possibly overlap +#[test] +fn test_persistent_overlapping() { + let mut map = HistoricLayerCoverage::<String>::new(); + + // Add 3 key-disjoint layers with varying LSN ranges + map.insert( + LayerKey { + key: 1..2, + lsn: 100..200, + is_image: true, + }, + "Layer 1".to_string(), + ); + map.insert( + LayerKey { + key: 4..5, + lsn: 110..200, + is_image: true, + }, + "Layer 2".to_string(), + ); + map.insert( + LayerKey { + key: 7..8, + lsn: 120..300, + is_image: true, + }, + "Layer 3".to_string(), + ); + + // Add wide and short layer + map.insert( + LayerKey { + key: 0..9, + lsn: 130..199, + is_image: true, + }, + "Layer 4".to_string(), + ); + + // Add wide layer taller than some + map.insert( + LayerKey { + key: 0..9, + lsn: 140..201, + is_image: true, + }, + "Layer 5".to_string(), + ); + + // Add wide layer taller than all + map.insert( + LayerKey { + key: 0..9, + lsn: 150..301, + is_image: true, + }, + "Layer 6".to_string(), + ); + + // After layer 4 insertion + let version = map.get_version(135).unwrap(); + assert_eq!(version.image_coverage.query(0), Some("Layer 4".to_string())); + assert_eq!(version.image_coverage.query(1), Some("Layer 1".to_string())); + assert_eq!(version.image_coverage.query(2), Some("Layer 4".to_string())); + assert_eq!(version.image_coverage.query(4), Some("Layer 2".to_string())); + assert_eq!(version.image_coverage.query(5), Some("Layer 4".to_string())); + assert_eq!(version.image_coverage.query(7), Some("Layer 3".to_string())); + assert_eq!(version.image_coverage.query(8), Some("Layer 4".to_string())); + + // After layer 5 insertion + let version = map.get_version(145).unwrap(); + assert_eq!(version.image_coverage.query(0), Some("Layer 5".to_string())); + assert_eq!(version.image_coverage.query(1), Some("Layer 5".to_string())); + assert_eq!(version.image_coverage.query(2), Some("Layer 5".to_string())); + assert_eq!(version.image_coverage.query(4), Some("Layer 5".to_string())); + assert_eq!(version.image_coverage.query(5), Some("Layer 5".to_string())); + assert_eq!(version.image_coverage.query(7), Some("Layer 3".to_string())); + assert_eq!(version.image_coverage.query(8), Some("Layer 5".to_string())); + + // After layer 6 insertion + let version = map.get_version(155).unwrap(); + assert_eq!(version.image_coverage.query(0), Some("Layer 6".to_string())); + assert_eq!(version.image_coverage.query(1), Some("Layer 6".to_string())); + assert_eq!(version.image_coverage.query(2), Some("Layer 6".to_string())); + assert_eq!(version.image_coverage.query(4), Some("Layer 6".to_string())); + assert_eq!(version.image_coverage.query(5), Some("Layer 6".to_string())); + assert_eq!(version.image_coverage.query(7), Some("Layer 6".to_string())); + assert_eq!(version.image_coverage.query(8), Some("Layer 6".to_string())); +} + +/// Wrapper for HistoricLayerCoverage that allows us to hack around the lack +/// of support for retroactive insertion by rebuilding the map since the +/// change. +/// +/// Why is this needed? We most often insert new layers with newer LSNs, +/// but during compaction we create layers with non-latest LSN, and during +/// GC we delete historic layers. +/// +/// Even though rebuilding is an expensive O(N log N) solution to the problem, +/// it's not critical since we do something equally expensive just to decide +/// whether or not to create new image layers. +/// TODO It's not expensive but it's not great to hold a layer map write lock +/// for that long.
+/// +/// If this becomes an actual bottleneck, one solution would be to build a +/// segment tree that holds PersistentLayerMaps. Though this would mean that +/// we take an additional O(log N) performance hit for queries, and queries are +/// probably the more performance-critical path. +/// +/// See this for more on persistent and retroactive techniques: +/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s +pub struct BufferedHistoricLayerCoverage<Value> { + /// A persistent layer map that we rebuild when we need to retroactively update + historic_coverage: HistoricLayerCoverage<Value>, + + /// We buffer insertion into the PersistentLayerMap to decrease the number of rebuilds. + buffer: BTreeMap<LayerKey, Option<Value>>, + + /// All current layers. This is not used for search. Only to make rebuilds easier. + layers: BTreeMap<LayerKey, Value>, +} + +impl<Value: std::fmt::Debug> std::fmt::Debug for BufferedHistoricLayerCoverage<Value> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BufferedHistoricLayerCoverage") + .field("buffer", &self.buffer) + .field("layers", &self.layers) + .finish() + } +} + +impl<Value: Clone> Default for BufferedHistoricLayerCoverage<Value> { + fn default() -> Self { + Self::new() + } +} + +impl<Value: Clone> BufferedHistoricLayerCoverage<Value> { + pub fn new() -> Self { + Self { + historic_coverage: HistoricLayerCoverage::<Value>::new(), + buffer: BTreeMap::new(), + layers: BTreeMap::new(), + } + } + + pub fn insert(&mut self, layer_key: LayerKey, value: Value) { + self.buffer.insert(layer_key, Some(value)); + } + + pub fn remove(&mut self, layer_key: LayerKey) { + self.buffer.insert(layer_key, None); + } + + pub fn rebuild(&mut self) { + // Find the first LSN that needs to be rebuilt + let rebuild_since: u64 = match self.buffer.iter().next() { + Some((LayerKey { lsn, .. }, _)) => lsn.start, + None => return, // No need to rebuild if buffer is empty + }; + + // Apply buffered updates to self.layers + let num_updates = self.buffer.len(); + self.buffer.retain(|layer_key, layer| { + match layer { + Some(l) => { + self.layers.insert(layer_key.clone(), l.clone()); + } + None => { + self.layers.remove(layer_key); + } + }; + false + }); + + // Rebuild + let mut num_inserted = 0; + self.historic_coverage.trim(&rebuild_since); + for (layer_key, layer) in self.layers.range( + LayerKey { + lsn: rebuild_since..0, + key: 0..0, + is_image: false, + }.., + ) { + self.historic_coverage + .insert(layer_key.clone(), layer.clone()); + num_inserted += 1; + } + + // TODO maybe only warn if ratio is at least 10 + info!( + "Rebuilt layer map. Did {} insertions to process a batch of {} updates.", + num_inserted, num_updates, + ) + } + + /// Iterate all the layers + pub fn iter(&self) -> impl '_ + Iterator<Item = Value> { + // NOTE we can actually perform this without rebuilding, + // but it's not necessary for now. + if !self.buffer.is_empty() { + panic!("rebuild pls") + } + + self.layers.values().cloned() + } + + /// Return a reference to a queryable map, assuming all updates + /// have already been processed using self.rebuild() + pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage<Value>> { + // NOTE we error here instead of implicitly rebuilding because + // rebuilding is somewhat expensive. + // TODO maybe implicitly rebuild and log/sentry an error?
+ if !self.buffer.is_empty() { + anyhow::bail!("rebuild required") + } + + Ok(&self.historic_coverage) + } +} + +#[test] +fn test_retroactive_regression_1() { + let mut map = BufferedHistoricLayerCoverage::new(); + + map.insert( + LayerKey { + key: 0..21267647932558653966460912964485513215, + lsn: 23761336..23761457, + is_image: false, + }, + "sdfsdfs".to_string(), + ); + + map.rebuild(); + + let version = map.get().unwrap().get_version(23761457).unwrap(); + assert_eq!( + version.delta_coverage.query(100), + Some("sdfsdfs".to_string()) + ); +} + +#[test] +fn test_retroactive_simple() { + let mut map = BufferedHistoricLayerCoverage::new(); + + // Append some images in increasing LSN order + map.insert( + LayerKey { + key: 0..5, + lsn: 100..101, + is_image: true, + }, + "Image 1".to_string(), + ); + map.insert( + LayerKey { + key: 3..9, + lsn: 110..111, + is_image: true, + }, + "Image 2".to_string(), + ); + map.insert( + LayerKey { + key: 4..6, + lsn: 120..121, + is_image: true, + }, + "Image 3".to_string(), + ); + map.insert( + LayerKey { + key: 8..9, + lsn: 120..121, + is_image: true, + }, + "Image 4".to_string(), + ); + + // Add a delta layer out of order + map.insert( + LayerKey { + key: 2..5, + lsn: 105..106, + is_image: true, + }, + "Delta 1".to_string(), + ); + + // Rebuild so we can start querying + map.rebuild(); + + // Query key 4 + let version = map.get().unwrap().get_version(90); + assert!(version.is_none()); + let version = map.get().unwrap().get_version(102).unwrap(); + assert_eq!(version.image_coverage.query(4), Some("Image 1".to_string())); + let version = map.get().unwrap().get_version(107).unwrap(); + assert_eq!(version.image_coverage.query(4), Some("Delta 1".to_string())); + let version = map.get().unwrap().get_version(115).unwrap(); + assert_eq!(version.image_coverage.query(4), Some("Image 2".to_string())); + let version = map.get().unwrap().get_version(125).unwrap(); + assert_eq!(version.image_coverage.query(4), Some("Image 3".to_string())); + + // Remove Image 3 + map.remove(LayerKey { + key: 4..6, + lsn: 120..121, + is_image: true, + }); + map.rebuild(); + + // Check deletion worked + let version = map.get().unwrap().get_version(125).unwrap(); + assert_eq!(version.image_coverage.query(4), Some("Image 2".to_string())); + assert_eq!(version.image_coverage.query(8), Some("Image 4".to_string())); +} diff --git a/pageserver/src/tenant/layer_map/layer_coverage.rs b/pageserver/src/tenant/layer_map/layer_coverage.rs new file mode 100644 index 0000000000..4e3b4516dc --- /dev/null +++ b/pageserver/src/tenant/layer_map/layer_coverage.rs @@ -0,0 +1,154 @@ +use std::ops::Range; + +// TODO the `im` crate has 20x more downloads and also has +// persistent/immutable BTree. It also runs a bit faster but +// results are not the same on some tests. +use rpds::RedBlackTreeMapSync; + +/// Data structure that can efficiently: +/// - find the latest layer by lsn.end at a given key +/// - iterate the latest layers in a key range +/// - insert layers in non-decreasing lsn.start order +/// +/// The struct is parameterized over Value for easier +/// testing, but in practice it's some sort of layer. +pub struct LayerCoverage<Value> { + /// For every change in coverage (as we sweep the key space) + /// we store (lsn.end, value). + /// + /// We use an immutable/persistent tree so that we can keep historic + /// versions of this coverage without cloning the whole thing and + /// incurring quadratic memory cost. See HistoricLayerCoverage.
+ /// + /// We use the Sync version of the map because we want Self to + /// be Sync. Using nonsync might be faster, if we can work with + /// that. + nodes: RedBlackTreeMapSync<i128, Option<(u64, Value)>>, +} + +impl<Value: Clone> Default for LayerCoverage<Value> { + fn default() -> Self { + Self::new() + } +} + +impl<Value: Clone> LayerCoverage<Value> { + pub fn new() -> Self { + Self { + nodes: RedBlackTreeMapSync::default(), + } + } + + /// Helper function to subdivide the key range without changing any values + /// + /// Complexity: O(log N) + fn add_node(&mut self, key: i128) { + let value = match self.nodes.range(..=key).last() { + Some((_, Some(v))) => Some(v.clone()), + Some((_, None)) => None, + None => None, + }; + self.nodes.insert_mut(key, value); + } + + /// Insert a layer. + /// + /// Complexity: worst case O(N), in practice O(log N). See NOTE in implementation. + pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value) { + // Add nodes at endpoints + // + // NOTE The order of lines is important. We add nodes at the start + // and end of the key range **before updating any nodes** in order + // to pin down the current coverage outside of the relevant key range. + // Only the coverage inside the layer's key range should change. + self.add_node(key.start); + self.add_node(key.end); + + // Raise the height where necessary + // + // NOTE This loop is worst case O(N), but amortized O(log N) in the special + // case when rectangles have no height. In practice I don't think we'll see + // the kind of layer intersections needed to trigger O(N) behavior. The worst + // case is N/2 horizontal layers overlapped with N/2 vertical layers in a + // grid pattern. + let mut to_update = Vec::new(); + let mut to_remove = Vec::new(); + let mut prev_covered = false; + for (k, node) in self.nodes.range(key.clone()) { + let needs_cover = match node { + None => true, + Some((h, _)) => h < &lsn.end, + }; + if needs_cover { + match prev_covered { + true => to_remove.push(*k), + false => to_update.push(*k), + } + } + prev_covered = needs_cover; + } + if !prev_covered { + to_remove.push(key.end); + } + for k in to_update { + self.nodes.insert_mut(k, Some((lsn.end, value.clone()))); + } + for k in to_remove { + self.nodes.remove_mut(&k); + } + } + + /// Get the latest (by lsn.end) layer at a given key + /// + /// Complexity: O(log N) + pub fn query(&self, key: i128) -> Option<Value> { + self.nodes + .range(..=key) + .rev() + .next()? + .1 + .as_ref() + .map(|(_, v)| v.clone()) + } + + /// Iterate the changes in layer coverage in a given range. You will likely + /// want to start with self.query(key.start), and then follow up with self.range + /// + /// Complexity: O(log N + result_size) + pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<Value>)> { + self.nodes + .range(key) + .map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone()))) + } + + /// O(1) clone + pub fn clone(&self) -> Self { + Self { + nodes: self.nodes.clone(), + } + } +} + +/// Image and delta coverage at a specific LSN.
+pub struct LayerCoverageTuple<Value> { + pub image_coverage: LayerCoverage<Value>, + pub delta_coverage: LayerCoverage<Value>, +} + +impl<Value: Clone> Default for LayerCoverageTuple<Value> { + fn default() -> Self { + Self { + image_coverage: LayerCoverage::default(), + delta_coverage: LayerCoverage::default(), + } + } +} + +impl<Value: Clone> LayerCoverageTuple<Value> { + pub fn clone(&self) -> Self { + Self { + image_coverage: self.image_coverage.clone(), + delta_coverage: self.delta_coverage.clone(), + } + } +} diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 6aee8ce23c..2149fc7eb7 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -196,3 +196,50 @@ pub fn downcast_remote_layer( None } } + +impl std::fmt::Debug for dyn Layer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Layer") + .field("short_id", &self.short_id()) + .finish() + } +} + +/// Holds metadata about a layer without any content. Used mostly for testing. +pub struct LayerDescriptor { + pub key: Range<Key>, + pub lsn: Range<Lsn>, + pub is_incremental: bool, + pub short_id: String, +} + +impl Layer for LayerDescriptor { + fn get_key_range(&self) -> Range<Key> { + self.key.clone() + } + + fn get_lsn_range(&self) -> Range<Lsn> { + self.lsn.clone() + } + + fn is_incremental(&self) -> bool { + self.is_incremental + } + + fn get_value_reconstruct_data( + &self, + _key: Key, + _lsn_range: Range<Lsn>, + _reconstruct_data: &mut ValueReconstructState, + ) -> Result { + todo!("This method shouldn't be part of the Layer trait") + } + + fn short_id(&self) -> String { + self.short_id.clone() + } + + fn dump(&self, _verbose: bool) -> Result<()> { + todo!() + } +} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d59858f582..5b84df74d4 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -970,6 +970,7 @@ impl Timeline { /// pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { let mut layers = self.layers.write().unwrap(); + let mut updates = layers.batch_update(); let mut num_layers = 0; let timer = self.metrics.load_layer_map_histo.start_timer(); @@ -1010,7 +1011,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - layers.insert_historic(Arc::new(layer)); + updates.insert_historic(Arc::new(layer)); num_layers += 1; } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { // Create a DeltaLayer struct for each delta file. @@ -1041,7 +1042,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - layers.insert_historic(Arc::new(layer)); + updates.insert_historic(Arc::new(layer)); num_layers += 1; } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { // ignore these @@ -1067,6 +1068,7 @@ impl Timeline { } } + updates.flush(); layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1); info!( @@ -1091,6 +1093,11 @@ impl Timeline { // Are we missing some files that are present in remote storage? // Create RemoteLayer instances for them. let mut local_only_layers = local_layers; + + // We're holding a layer map lock for a while but this + // method is only called during init so it's fine.
+ let mut layer_map = self.layers.write().unwrap(); + let mut updates = layer_map.batch_update(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1129,7 +1136,7 @@ impl Timeline { anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); } else { self.metrics.resident_physical_size_gauge.sub(local_size); - self.layers.write().unwrap().remove_historic(local_layer); + updates.remove_historic(local_layer); // fall-through to adding the remote layer } } else { @@ -1171,7 +1178,7 @@ impl Timeline { ); let remote_layer = Arc::new(remote_layer); - self.layers.write().unwrap().insert_historic(remote_layer); + updates.insert_historic(remote_layer); } LayerFileName::Delta(deltafilename) => { // Create a RemoteLayer for the delta file. @@ -1194,13 +1201,14 @@ impl Timeline { &remote_layer_metadata, ); let remote_layer = Arc::new(remote_layer); - self.layers.write().unwrap().insert_historic(remote_layer); + updates.insert_historic(remote_layer); } #[cfg(test)] LayerFileName::Test(_) => unreachable!(), } } + updates.flush(); Ok(local_only_layers) } @@ -2099,10 +2107,11 @@ impl Timeline { ])?; // Add it to the layer map - { - let mut layers = self.layers.write().unwrap(); - layers.insert_historic(Arc::new(new_delta)); - } + self.layers + .write() + .unwrap() + .batch_update() + .insert_historic(Arc::new(new_delta)); // update the timeline's physical size let sz = new_delta_path.metadata()?.len(); @@ -2166,13 +2175,15 @@ impl Timeline { // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed // after we read last_record_lsn, which is passed here in the 'lsn' argument. if img_lsn < lsn { - let num_deltas = layers.count_deltas(&img_range, &(img_lsn..lsn))?; + let threshold = self.get_image_creation_threshold(); + let num_deltas = + layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?; debug!( "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}", img_range.start, img_range.end, num_deltas, img_lsn, lsn ); - if num_deltas >= self.get_image_creation_threshold() { + if num_deltas >= threshold { return Ok(true); } } @@ -2267,6 +2278,7 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); let mut layers = self.layers.write().unwrap(); + let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); for l in image_layers { let path = l.filename(); @@ -2280,8 +2292,9 @@ impl Timeline { self.metrics .resident_physical_size_gauge .add(metadata.len()); - layers.insert_historic(Arc::new(l)); + updates.insert_historic(Arc::new(l)); } + updates.flush(); drop(layers); timer.stop_and_record(); @@ -2577,6 +2590,7 @@ impl Timeline { } let mut layers = self.layers.write().unwrap(); + let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); for l in new_layers { let new_delta_path = l.path(); @@ -2597,7 +2611,7 @@ impl Timeline { new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); let x: Arc = Arc::new(l); - layers.insert_historic(x); + updates.insert_historic(x); } // Now that we have reshuffled the data to set of new delta layers, we can @@ -2611,8 +2625,9 @@ impl Timeline { } layer_names_to_delete.push(l.filename()); l.delete()?; - layers.remove_historic(l); + updates.remove_historic(l); } + updates.flush(); drop(layers); // Also schedule the deletions in remote storage @@ -2812,6 +2827,7 @@ impl 
Timeline { // 3. it doesn't need to be retained for 'retain_lsns'; // 4. newer on-disk image layers cover the layer's whole key range // + // TODO holding a write lock is too aggressive and avoidable let mut layers = self.layers.write().unwrap(); 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -2843,6 +2859,8 @@ impl Timeline { // might be referenced by child branches forever. // We can track this in child timeline GC and delete parent layers when // they are no longer needed. This might be complicated with long inheritance chains. + // + // TODO Vec is not a great choice for `retain_lsns` for retain_lsn in &retain_lsns { // start_lsn is inclusive if &l.get_lsn_range().start <= retain_lsn { @@ -2896,6 +2914,7 @@ impl Timeline { layers_to_remove.push(Arc::clone(&l)); } + let mut updates = layers.batch_update(); if !layers_to_remove.is_empty() { // Persist the new GC cutoff value in the metadata file, before // we actually remove anything. @@ -2913,7 +2932,13 @@ impl Timeline { } layer_names_to_delete.push(doomed_layer.filename()); doomed_layer.delete()?; // FIXME: schedule succeeded deletions before returning? - layers.remove_historic(doomed_layer); + + // TODO Removing from the bottom of the layer map is expensive. + // Maybe instead discard all layer map historic versions that + // won't be needed for page reconstruction for this timeline, + // and mark what we can't delete yet as deleted from the layer + // map index without actually rebuilding the index. + updates.remove_historic(doomed_layer); result.layers_removed += 1; } @@ -2925,6 +2950,7 @@ impl Timeline { remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?; } } + updates.flush(); info!( "GC completed removing {} layers, cutoff {}", @@ -3081,11 +3107,13 @@ impl Timeline { // Delta- or ImageLayer in the layer map. let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size); let mut layers = self_clone.layers.write().unwrap(); + let mut updates = layers.batch_update(); { + let l: Arc = remote_layer.clone(); - layers.remove_historic(l); + updates.remove_historic(l); } - layers.insert_historic(new_layer); + updates.insert_historic(new_layer); + updates.flush(); drop(layers); // Now that we've inserted the download into the layer map, diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index f4b71ae9b7..acc38cfb9d 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -31,7 +31,7 @@ memchr = { version = "2" } nom = { version = "7" } num-bigint = { version = "0.4" } num-integer = { version = "0.1", features = ["i128"] } -num-traits = { version = "0.2", features = ["i128", "libm"] } +num-traits = { version = "0.2", features = ["i128"] } prost = { version = "0.11" } rand = { version = "0.8", features = ["small_rng"] } regex = { version = "1" }
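
For reviewers unfamiliar with the key-space sweep that LayerCoverage implements, the sketch below shows the same query/insert idea on a plain std::collections::BTreeMap, without the lsn.end comparison and without persistent versioning. It is an illustration only; SimpleCoverage and everything else in it are hypothetical names and not part of this patch.

use std::collections::BTreeMap;
use std::ops::Range;

// Coverage of the key space by string-valued layers: for every key where
// coverage changes we store the value covering keys from that point on
// (None = not covered). LayerCoverage stores (lsn.end, value) instead and
// only overwrites nodes whose stored lsn.end is lower.
struct SimpleCoverage {
    nodes: BTreeMap<i128, Option<String>>,
}

impl SimpleCoverage {
    fn new() -> Self {
        Self { nodes: BTreeMap::new() }
    }

    // Value covering `key`: the value at the closest change point <= key.
    fn query(&self, key: i128) -> Option<String> {
        self.nodes.range(..=key).next_back()?.1.clone()
    }

    // Overwrite coverage on `key_range`, preserving whatever covered the
    // keys at and after key_range.end.
    fn insert(&mut self, key_range: Range<i128>, value: String) {
        let after = self.query(key_range.end);
        let inner: Vec<i128> = self.nodes.range(key_range.clone()).map(|(k, _)| *k).collect();
        for k in inner {
            self.nodes.remove(&k);
        }
        self.nodes.insert(key_range.start, Some(value));
        self.nodes.insert(key_range.end, after);
    }
}

fn main() {
    let mut c = SimpleCoverage::new();
    c.insert(0..5, "Layer 1".to_string());
    c.insert(3..9, "Layer 2".to_string());
    assert_eq!(c.query(1), Some("Layer 1".to_string()));
    assert_eq!(c.query(4), Some("Layer 2".to_string()));
    assert_eq!(c.query(10), None);
}

The real LayerCoverage keeps the (lsn.end, value) pair at each change point so that an insert only overwrites nodes with a lower lsn.end, and it uses a persistent rpds tree so that HistoricLayerCoverage can keep a snapshot of every version with an O(1) clone.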