From 8298bc903c0148db187074374b85f6ae5c0f9347 Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Mon, 3 Mar 2025 17:52:59 +0000
Subject: [PATCH] pageserver: handle in-memory layer overlaps with persistent
 layers (#11000)

## Problem

Image layers may be nested inside in-memory layers, as diagnosed
[here](https://github.com/neondatabase/neon/issues/10720#issuecomment-2649419252).
The read path doesn't support this and may skip over the image layer,
resulting in a failure to reconstruct the page.

## Summary of changes

We already support nesting of image layers inside delta layers. The logic lives
in `LayerMap::select_layer`. The main goal of this PR is to propagate the
candidate in-memory layer down to that point and update the selection logic.
The important changes are:

1. Support partial reads of the in-memory layer. Previously, we could only
   specify the end LSN of the read; we need to control the start LSN too.
2. `LayerMap::range_search` considers in-memory layers too. Previously, the
   search for in-memory layers was done explicitly in
   `Timeline::get_reconstruct_data_timeline`. Note that `LayerMap::range_search`
   now returns a weak readable layer which the `LayerManager` can upgrade. This
   indirection allows the layer selection logic to be unit tested.
3. Update `LayerMap::select_layer` to consider the candidate in-memory layer
   too. A simplified sketch of the resulting selection rule follows this list.
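For reference, the selection rule can be restated as a small, self-contained
sketch (illustrative only: plain `u64` LSNs and an enum stand in for the real
layer types; the assertions mirror the `test_select_layer_all_overlap_1` unit
test added in this PR):

```rust
use std::ops::Range;

type Lsn = u64;

#[derive(Debug, PartialEq)]
enum Pick {
    Image,
    Delta,
    InMem,
}

/// Pick the layer to read next from the candidate LSN ranges below `end_lsn`,
/// returning the LSN floor where the next round of the search resumes.
/// Image layers cover a single LSN, so `start + 1 == end` for them.
fn select(
    delta: Option<Range<Lsn>>,
    image: Option<Range<Lsn>>,
    inmem: Option<Range<Lsn>>,
    end_lsn: Lsn,
) -> Option<(Pick, Lsn)> {
    match (delta, image, inmem) {
        (None, None, None) => None,
        (Some(d), None, None) => Some((Pick::Delta, d.start)),
        (None, Some(img), None) => Some((Pick::Image, img.start)),
        (None, None, Some(m)) => Some((Pick::InMem, m.start)),
        // Image nested inside a delta: read the image if it is newer or an
        // exact match; otherwise do a partial read of the delta down to just
        // above the image.
        (Some(d), Some(img), None) => {
            if img.end >= d.end || img.start + 1 == end_lsn {
                Some((Pick::Image, img.start))
            } else {
                Some((Pick::Delta, d.start.max(img.start + 1)))
            }
        }
        // Same rule for an image nested inside an in-memory layer.
        (None, Some(img), Some(m)) => {
            if img.end >= m.end || img.start + 1 == end_lsn {
                Some((Pick::Image, img.start))
            } else {
                Some((Pick::InMem, m.start.max(img.start + 1)))
            }
        }
        // Delta/in-memory overlaps are not a valid state, but are handled for
        // completeness: the newer layer wins, and an overlapping in-memory
        // read stops at the delta end.
        (Some(d), None, Some(m)) => {
            if d.end >= m.end || d.end == end_lsn {
                Some((Pick::Delta, d.start))
            } else {
                Some((Pick::InMem, m.start.max(d.end)))
            }
        }
        // All three: settle image vs. delta first, then fold in the in-memory
        // candidate, keeping the more restrictive floor.
        (Some(d), Some(img), Some(m)) => {
            let (winner, persistent_floor) =
                select(Some(d.clone()), Some(img.clone()), None, end_lsn)?;
            let (pick, floor) = match winner {
                Pick::Delta => select(Some(d), None, Some(m), end_lsn)?,
                _ => select(None, Some(img), Some(m), end_lsn)?,
            };
            Some((pick, floor.max(persistent_floor)))
        }
    }
}

fn main() {
    // An image at LSN 20 inside a delta 15..25, inside an in-memory layer
    // 10..30: each read descends to the floor returned by the previous one.
    assert_eq!(select(Some(15..25), Some(20..21), Some(10..30), 50), Some((Pick::InMem, 25)));
    assert_eq!(select(Some(15..25), Some(20..21), Some(10..30), 25), Some((Pick::Delta, 21)));
    assert_eq!(select(Some(15..25), Some(20..21), Some(10..30), 21), Some((Pick::Image, 20)));
    println!("selection rule walkthrough OK");
}
```

Each returned floor bounds the next read, which is the partial-read mechanism
from point 1 applied uniformly to all three layer kinds.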
Loosely related drive-bys:

1. Remove the "keys not found" tracking in the ranged search. This wasn't used
   anywhere and it just complicated things.
2. Remove the difficulty map code from the layer map. Again, not used anywhere.

Closes https://github.com/neondatabase/neon/issues/9185
Closes https://github.com/neondatabase/neon/issues/10720
---
 pageserver/benches/bench_layer_map.rs        |  76 --
 pageserver/src/tenant.rs                     | 179 +++-
 pageserver/src/tenant/layer_map.rs           | 809 +++++++++++++-----
 .../layer_map/historic_layer_coverage.rs     |   6 +
 pageserver/src/tenant/storage_layer.rs       |   9 +-
 .../tenant/storage_layer/inmemory_layer.rs   |   4 +-
 .../src/tenant/storage_layer/layer/tests.rs  |   1 +
 pageserver/src/tenant/timeline.rs            | 143 +++-
 .../src/tenant/timeline/layer_manager.rs     |  38 +-
 9 files changed, 949 insertions(+), 316 deletions(-)

diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs
index e11af49449..e1444778b8 100644
--- a/pageserver/benches/bench_layer_map.rs
+++ b/pageserver/benches/bench_layer_map.rs
@@ -7,7 +7,6 @@ use std::time::Instant;
 
 use criterion::measurement::WallTime;
 use criterion::{BenchmarkGroup, Criterion, black_box, criterion_group, criterion_main};
-use pageserver::keyspace::{KeyPartitioning, KeySpace};
 use pageserver::tenant::layer_map::LayerMap;
 use pageserver::tenant::storage_layer::{LayerName, PersistentLayerDesc};
 use pageserver_api::key::Key;
@@ -72,41 +71,6 @@ fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
         .collect()
 }
 
-// Construct a partitioning for testing get_difficulty map when we
-// don't have an exact result of `collect_keyspace` to work with.
-fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning {
-    let mut parts = Vec::new();
-
-    // We add a partition boundary at the start of each image layer,
-    // no matter what lsn range it covers. This is just the easiest
-    // thing to do. A better thing to do would be to get a real
-    // partitioning from some database. Even better, remove the need
-    // for key partitions by deciding where to create image layers
-    // directly based on a coverage-based difficulty map.
-    let mut keys: Vec<_> = layer_map
-        .iter_historic_layers()
-        .filter_map(|l| {
-            if l.is_incremental() {
-                None
-            } else {
-                let kr = l.get_key_range();
-                Some(kr.start.next())
-            }
-        })
-        .collect();
-    keys.sort();
-
-    let mut current_key = Key::from_hex("000000000000000000000000000000000000").unwrap();
-    for key in keys {
-        parts.push(KeySpace {
-            ranges: vec![current_key..key],
-        });
-        current_key = key;
-    }
-
-    KeyPartitioning { parts }
-}
-
 // Benchmark using metadata extracted from our performance test environment, from
 // a project where we have run pgbench many times. The pgbench database was initialized
 // between each test run.
@@ -148,41 +112,6 @@ fn bench_from_real_project(c: &mut Criterion) {
     // Choose uniformly distributed queries
     let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map);
 
-    // Choose inputs for get_difficulty_map
-    let latest_lsn = layer_map
-        .iter_historic_layers()
-        .map(|l| l.get_lsn_range().end)
-        .max()
-        .unwrap();
-    let partitioning = uniform_key_partitioning(&layer_map, latest_lsn);
-
-    // Check correctness of get_difficulty_map
-    // TODO put this in a dedicated test outside of this mod
-    {
-        println!("running correctness check");
-
-        let now = Instant::now();
-        let result_bruteforce = layer_map.get_difficulty_map_bruteforce(latest_lsn, &partitioning);
-        assert!(result_bruteforce.len() == partitioning.parts.len());
-        println!("Finished bruteforce in {:?}", now.elapsed());
-
-        let now = Instant::now();
-        let result_fast = layer_map.get_difficulty_map(latest_lsn, &partitioning, None);
-        assert!(result_fast.len() == partitioning.parts.len());
-        println!("Finished fast in {:?}", now.elapsed());
-
-        // Assert results are equal. Manually iterate for easier debugging.
-        let zip = std::iter::zip(
-            &partitioning.parts,
-            std::iter::zip(result_bruteforce, result_fast),
-        );
-        for (_part, (bruteforce, fast)) in zip {
-            assert_eq!(bruteforce, fast);
-        }
-
-        println!("No issues found");
-    }
-
     // Define and name the benchmark function
     let mut group = c.benchmark_group("real_map");
     group.bench_function("uniform_queries", |b| {
@@ -192,11 +121,6 @@ fn bench_from_real_project(c: &mut Criterion) {
             }
         });
     });
-    group.bench_function("get_difficulty_map", |b| {
-        b.iter(|| {
-            layer_map.get_difficulty_map(latest_lsn, &partitioning, Some(3));
-        });
-    });
     group.finish();
 }
 
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 11d656eb25..776e523c2e 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -2501,6 +2501,7 @@ impl Tenant {
         initdb_lsn: Lsn,
         pg_version: u32,
         ctx: &RequestContext,
+        in_memory_layer_desc: Vec<InMemoryLayerTestDesc>,
         delta_layer_desc: Vec<DeltaLayerTestDesc>,
         image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
         end_lsn: Lsn,
@@ -2522,6 +2523,11 @@ impl Tenant {
                 .force_create_image_layer(lsn, images, Some(initdb_lsn), ctx)
                 .await?;
         }
+        for in_memory in in_memory_layer_desc {
+            tline
+                .force_create_in_memory_layer(in_memory, Some(initdb_lsn), ctx)
+                .await?;
+        }
         let layer_names = tline
             .layers
             .read()
@@ -5913,6 +5919,8 @@ mod tests {
     #[cfg(feature = "testing")]
     use timeline::GcInfo;
     #[cfg(feature = "testing")]
+    use timeline::InMemoryLayerTestDesc;
+    #[cfg(feature = "testing")]
     use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
     use timeline::{CompactOptions, DeltaLayerTestDesc};
     use utils::id::TenantId;
@@ -7925,6 +7933,7 @@ mod tests {
             Lsn(0x10),
             DEFAULT_PG_VERSION,
             &ctx,
+            Vec::new(), // in-memory layers
             Vec::new(), // delta layers
             vec![(Lsn(0x20), vec![(base_key, test_img("data key 1"))])], // image layers
Lsn(0x20), // it's fine to not advance LSN to 0x30 while using 0x30 to get below because `get_vectored_impl` does not wait for LSN @@ -8012,6 +8021,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers Vec::new(), // delta layers vec![( Lsn(0x20), @@ -8227,6 +8237,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers // delta layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range( @@ -8307,6 +8318,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers // delta layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range( @@ -8380,6 +8392,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers // delta layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range( @@ -8512,6 +8525,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1), DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2), @@ -8705,6 +8719,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![DeltaLayerTestDesc::new_with_inferred_key_range( Lsn(0x10)..Lsn(0x40), delta1, @@ -8761,6 +8776,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers Vec::new(), image_layers, end_lsn, @@ -8967,6 +8983,7 @@ mod tests { Lsn(0x08), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range( Lsn(0x08)..Lsn(0x10), @@ -8985,7 +9002,7 @@ mod tests { delta3, ), ], // delta layers - vec![], // image layers + vec![], // image layers Lsn(0x50), ) .await? @@ -8996,6 +9013,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range( Lsn(0x10)..Lsn(0x48), @@ -9546,6 +9564,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta1), DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta2), @@ -9793,6 +9812,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![ // delta1 and delta 2 only contain a single key but multiple updates DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x30), delta1), @@ -10028,6 +10048,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + vec![], // in-memory layers vec![], // delta layers vec![(Lsn(0x18), img_layer)], // image layers Lsn(0x18), @@ -10274,6 +10295,7 @@ mod tests { baseline_image_layer_lsn, DEFAULT_PG_VERSION, &ctx, + vec![], // in-memory layers vec![DeltaLayerTestDesc::new_with_inferred_key_range( delta_layer_start_lsn..delta_layer_end_lsn, delta_layer_spec, @@ -10305,6 +10327,158 @@ mod tests { Ok(()) } + #[cfg(feature = "testing")] + #[tokio::test] + async fn test_vectored_read_with_image_layer_inside_inmem() -> anyhow::Result<()> { + let harness = + TenantHarness::create("test_vectored_read_with_image_layer_inside_inmem").await?; + let (tenant, ctx) = harness.load().await; + + let will_init_keys = [2, 6]; + fn get_key(id: u32) -> Key { + let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + let mut expected_key_values = HashMap::new(); + + let baseline_image_layer_lsn = Lsn(0x10); + let mut baseline_img_layer = Vec::new(); + for i in 0..5 { + let key = get_key(i); + let value = 
format!("value {i}@{baseline_image_layer_lsn}"); + + let removed = expected_key_values.insert(key, value.clone()); + assert!(removed.is_none()); + + baseline_img_layer.push((key, Bytes::from(value))); + } + + let nested_image_layer_lsn = Lsn(0x50); + let mut nested_img_layer = Vec::new(); + for i in 5..10 { + let key = get_key(i); + let value = format!("value {i}@{nested_image_layer_lsn}"); + + let removed = expected_key_values.insert(key, value.clone()); + assert!(removed.is_none()); + + nested_img_layer.push((key, Bytes::from(value))); + } + + let frozen_layer = { + let lsn_range = Lsn(0x40)..Lsn(0x60); + let mut data = Vec::new(); + for i in 0..10 { + let key = get_key(i); + let key_in_nested = nested_img_layer + .iter() + .any(|(key_with_img, _)| *key_with_img == key); + let lsn = { + if key_in_nested { + Lsn(nested_image_layer_lsn.0 + 5) + } else { + lsn_range.start + } + }; + + let will_init = will_init_keys.contains(&i); + if will_init { + data.push((key, lsn, Value::WalRecord(NeonWalRecord::wal_init("")))); + + expected_key_values.insert(key, "".to_string()); + } else { + let delta = format!("@{lsn}"); + data.push(( + key, + lsn, + Value::WalRecord(NeonWalRecord::wal_append(&delta)), + )); + + expected_key_values + .get_mut(&key) + .expect("An image exists for each key") + .push_str(delta.as_str()); + } + } + + InMemoryLayerTestDesc { + lsn_range, + is_open: false, + data, + } + }; + + let (open_layer, last_record_lsn) = { + let start_lsn = Lsn(0x70); + let mut data = Vec::new(); + let mut end_lsn = Lsn(0); + for i in 0..10 { + let key = get_key(i); + let lsn = Lsn(start_lsn.0 + i as u64); + let delta = format!("@{lsn}"); + data.push(( + key, + lsn, + Value::WalRecord(NeonWalRecord::wal_append(&delta)), + )); + + expected_key_values + .get_mut(&key) + .expect("An image exists for each key") + .push_str(delta.as_str()); + + end_lsn = std::cmp::max(end_lsn, lsn); + } + + ( + InMemoryLayerTestDesc { + lsn_range: start_lsn..Lsn::MAX, + is_open: true, + data, + }, + end_lsn, + ) + }; + + assert!( + nested_image_layer_lsn > frozen_layer.lsn_range.start + && nested_image_layer_lsn < frozen_layer.lsn_range.end + ); + + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + baseline_image_layer_lsn, + DEFAULT_PG_VERSION, + &ctx, + vec![open_layer, frozen_layer], // in-memory layers + Vec::new(), // delta layers + vec![ + (baseline_image_layer_lsn, baseline_img_layer), + (nested_image_layer_lsn, nested_img_layer), + ], // image layers + last_record_lsn, + ) + .await?; + + let keyspace = KeySpace::single(get_key(0)..get_key(10)); + let results = tline + .get_vectored(keyspace, last_record_lsn, IoConcurrency::sequential(), &ctx) + .await + .expect("No vectored errors"); + for (key, res) in results { + let value = res.expect("No key errors"); + let expected_value = expected_key_values.remove(&key).expect("No unknown keys"); + assert_eq!(value, Bytes::from(expected_value.clone())); + + tracing::info!("key={key} value={expected_value}"); + } + + Ok(()) + } + fn sort_layer_key(k1: &PersistentLayerKey, k2: &PersistentLayerKey) -> std::cmp::Ordering { ( k1.is_delta, @@ -10420,6 +10594,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + vec![], // in-memory layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1), DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2), @@ -10804,6 +10979,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + vec![], // in-memory layers vec![ // delta1/2/4 only contain a single key but 
                // multiple updates
                DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x28), delta1),
@@ -11055,6 +11231,7 @@ mod tests {
             Lsn(0x10),
             DEFAULT_PG_VERSION,
             &ctx,
+            vec![], // in-memory layers
             vec![
                 // delta1/2/4 only contain a single key but multiple updates
                 DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x28), delta1),
diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs
index 59f5a6bd90..2b04e53f10 100644
--- a/pageserver/src/tenant/layer_map.rs
+++ b/pageserver/src/tenant/layer_map.rs
@@ -62,8 +62,7 @@ use utils::lsn::Lsn;
 
 use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc};
 use crate::context::RequestContext;
-use crate::keyspace::KeyPartitioning;
-use crate::tenant::storage_layer::InMemoryLayer;
+use crate::tenant::storage_layer::{InMemoryLayer, ReadableLayerWeak};
 
 ///
 /// LayerMap tracks what layers exist on a timeline.
@@ -167,7 +166,7 @@ impl Drop for BatchedUpdates<'_> {
 /// Return value of LayerMap::search
 #[derive(Eq, PartialEq, Debug, Hash)]
 pub struct SearchResult {
-    pub layer: Arc<PersistentLayerDesc>,
+    pub layer: ReadableLayerWeak,
     pub lsn_floor: Lsn,
 }
 
@@ -175,19 +174,37 @@ pub struct SearchResult {
 ///
 /// Contains a mapping from a layer description to a keyspace
 /// accumulator that contains all the keys which intersect the layer
-/// from the original search space. Keys that were not found are accumulated
-/// in a separate key space accumulator.
+/// from the original search space.
 #[derive(Debug)]
 pub struct RangeSearchResult {
     pub found: HashMap<SearchResult, KeySpaceAccum>,
-    pub not_found: KeySpaceAccum,
 }
 
 impl RangeSearchResult {
     fn new() -> Self {
         Self {
             found: HashMap::new(),
-            not_found: KeySpaceAccum::new(),
         }
     }
+
+    fn map_to_in_memory_layer(
+        in_memory_layer: Option<InMemoryLayerDesc>,
+        range: Range<Key>,
+    ) -> RangeSearchResult {
+        match in_memory_layer {
+            Some(inmem) => {
+                let search_result = SearchResult {
+                    lsn_floor: inmem.get_lsn_range().start,
+                    layer: ReadableLayerWeak::InMemoryLayer(inmem),
+                };
+
+                let mut accum = KeySpaceAccum::new();
+                accum.add_range(range);
+                RangeSearchResult {
+                    found: HashMap::from([(search_result, accum)]),
+                }
+            }
+            None => RangeSearchResult::new(),
+        }
+    }
 }
 
@@ -199,6 +216,7 @@ struct RangeSearchCollector<Iter>
 where
     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
 {
+    in_memory_layer: Option<InMemoryLayerDesc>,
     delta_coverage: Peekable<Iter>,
     image_coverage: Peekable<Iter>,
     key_range: Range<Key>,
@@ -234,10 +252,12 @@ where
     fn new(
         key_range: Range<Key>,
         end_lsn: Lsn,
+        in_memory_layer: Option<InMemoryLayerDesc>,
         delta_coverage: Iter,
         image_coverage: Iter,
     ) -> Self {
         Self {
+            in_memory_layer,
             delta_coverage: delta_coverage.peekable(),
             image_coverage: image_coverage.peekable(),
             key_range,
@@ -266,8 +286,7 @@ where
                 return self.result;
             }
             Some(layer_type) => {
-                // Changes for the range exist. Record anything before the first
-                // coverage change as not found.
+                // Changes for the range exist.
                 let coverage_start = layer_type.next_change_at_key();
                 let range_before = self.key_range.start..coverage_start;
                 self.pad_range(range_before);
@@ -297,10 +316,22 @@ where
         self.result
     }
 
-    /// Mark a range as not found (i.e. no layers intersect it)
+    /// Map a range which does not intersect any persistent layers to
+    /// the in-memory layer candidate.
     fn pad_range(&mut self, key_range: Range<Key>) {
         if !key_range.is_empty() {
-            self.result.not_found.add_range(key_range);
+            if let Some(ref inmem) = self.in_memory_layer {
+                let search_result = SearchResult {
+                    layer: ReadableLayerWeak::InMemoryLayer(inmem.clone()),
+                    lsn_floor: inmem.get_lsn_range().start,
+                };
+
+                self.result
+                    .found
+                    .entry(search_result)
+                    .or_default()
+                    .add_range(key_range);
+            }
         }
     }
 
@@ -310,6 +341,7 @@ where
         let selected = LayerMap::select_layer(
             self.current_delta.clone(),
             self.current_image.clone(),
+            self.in_memory_layer.clone(),
             self.end_lsn,
         );
 
@@ -365,6 +397,24 @@ where
     }
 }
 
+#[derive(Debug, PartialEq, Eq, Clone, Hash)]
+pub struct InMemoryLayerDesc {
+    handle: InMemoryLayerHandle,
+    lsn_range: Range<Lsn>,
+}
+
+impl InMemoryLayerDesc {
+    pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
+        self.lsn_range.clone()
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Hash)]
+enum InMemoryLayerHandle {
+    Open,
+    Frozen(usize),
+}
+
 impl LayerMap {
     ///
     /// Find the latest layer (by lsn.end) that covers the given
@@ -394,69 +444,161 @@ impl LayerMap {
     /// layer result, or simplify the api to `get_latest_image` and
     /// `get_latest_delta`, and only call `get_latest_image` once.
     ///
-    /// NOTE: This only searches the 'historic' layers, *not* the
-    /// 'open' and 'frozen' layers!
-    ///
     pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
-        let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
+        let in_memory_layer = self.search_in_memory_layer(end_lsn);
+
+        let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
+            Some(version) => version,
+            None => {
+                return in_memory_layer.map(|desc| SearchResult {
+                    lsn_floor: desc.get_lsn_range().start,
+                    layer: ReadableLayerWeak::InMemoryLayer(desc),
+                });
+            }
+        };
+
         let latest_delta = version.delta_coverage.query(key.to_i128());
         let latest_image = version.image_coverage.query(key.to_i128());
 
-        Self::select_layer(latest_delta, latest_image, end_lsn)
+        Self::select_layer(latest_delta, latest_image, in_memory_layer, end_lsn)
     }
 
+    /// Select a layer from three potential candidates (in-memory, delta and image layer).
+    /// The candidates represent the first layer of each type which intersects a key range.
+    ///
+    /// Layer types have an implicit priority (image > delta > in-memory). For instance,
+    /// if we have the option of reading an LSN range from both an image and a delta, we
+    /// should read from the image.
     fn select_layer(
         delta_layer: Option<Arc<PersistentLayerDesc>>,
         image_layer: Option<Arc<PersistentLayerDesc>>,
+        in_memory_layer: Option<InMemoryLayerDesc>,
         end_lsn: Lsn,
     ) -> Option<SearchResult> {
         assert!(delta_layer.as_ref().is_none_or(|l| l.is_delta()));
         assert!(image_layer.as_ref().is_none_or(|l| !l.is_delta()));
 
-        match (delta_layer, image_layer) {
-            (None, None) => None,
-            (None, Some(image)) => {
+        match (delta_layer, image_layer, in_memory_layer) {
+            (None, None, None) => None,
+            (None, Some(image), None) => {
                 let lsn_floor = image.get_lsn_range().start;
                 Some(SearchResult {
-                    layer: image,
+                    layer: ReadableLayerWeak::PersistentLayer(image),
                     lsn_floor,
                 })
             }
-            (Some(delta), None) => {
+            (Some(delta), None, None) => {
                 let lsn_floor = delta.get_lsn_range().start;
                 Some(SearchResult {
-                    layer: delta,
+                    layer: ReadableLayerWeak::PersistentLayer(delta),
                     lsn_floor,
                 })
             }
-            (Some(delta), Some(image)) => {
+            (Some(delta), Some(image), None) => {
                 let img_lsn = image.get_lsn_range().start;
                 let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
                 let image_exact_match = img_lsn + 1 == end_lsn;
                 if image_is_newer || image_exact_match {
                     Some(SearchResult {
-                        layer: image,
+                        layer: ReadableLayerWeak::PersistentLayer(image),
                         lsn_floor: img_lsn,
                     })
                 } else {
+                    // If the delta overlaps with the image in the LSN dimension, do a partial
+                    // read up to the image layer.
                     let lsn_floor =
                         std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
                     Some(SearchResult {
-                        layer: delta,
+                        layer: ReadableLayerWeak::PersistentLayer(delta),
                         lsn_floor,
                     })
                 }
             }
+            (None, None, Some(inmem)) => {
+                let lsn_floor = inmem.get_lsn_range().start;
+                Some(SearchResult {
+                    layer: ReadableLayerWeak::InMemoryLayer(inmem),
+                    lsn_floor,
+                })
+            }
+            (None, Some(image), Some(inmem)) => {
+                // If the in-memory layer overlaps with the image in the LSN dimension, do a
+                // partial read up to the image layer.
+                let img_lsn = image.get_lsn_range().start;
+                let image_is_newer = image.get_lsn_range().end >= inmem.get_lsn_range().end;
+                let image_exact_match = img_lsn + 1 == end_lsn;
+                if image_is_newer || image_exact_match {
+                    Some(SearchResult {
+                        layer: ReadableLayerWeak::PersistentLayer(image),
+                        lsn_floor: img_lsn,
+                    })
+                } else {
+                    let lsn_floor =
+                        std::cmp::max(inmem.get_lsn_range().start, image.get_lsn_range().start + 1);
+                    Some(SearchResult {
+                        layer: ReadableLayerWeak::InMemoryLayer(inmem),
+                        lsn_floor,
+                    })
+                }
+            }
+            (Some(delta), None, Some(inmem)) => {
+                // Overlaps between delta and in-memory layers are not a valid
+                // state, but we handle them here for completeness.
+                let delta_end = delta.get_lsn_range().end;
+                let delta_is_newer = delta_end >= inmem.get_lsn_range().end;
+                let delta_exact_match = delta_end == end_lsn;
+                if delta_is_newer || delta_exact_match {
+                    Some(SearchResult {
+                        lsn_floor: delta.get_lsn_range().start,
+                        layer: ReadableLayerWeak::PersistentLayer(delta),
+                    })
+                } else {
+                    // If the in-memory layer overlaps with the delta in the LSN dimension,
+                    // do a partial read up to the delta layer.
+                    let lsn_floor =
+                        std::cmp::max(inmem.get_lsn_range().start, delta.get_lsn_range().end);
+                    Some(SearchResult {
+                        layer: ReadableLayerWeak::InMemoryLayer(inmem),
+                        lsn_floor,
+                    })
+                }
+            }
+            (Some(delta), Some(image), Some(inmem)) => {
+                // Determine the preferred persistent layer without taking the in-memory layer
+                // into consideration.
+                let persistent_res =
+                    Self::select_layer(Some(delta.clone()), Some(image.clone()), None, end_lsn)
+                        .unwrap();
+                let persistent_l = match persistent_res.layer {
+                    ReadableLayerWeak::PersistentLayer(l) => l,
+                    ReadableLayerWeak::InMemoryLayer(_) => unreachable!(),
+                };
+
+                // Now handle the in-memory layer overlaps.
+                let inmem_res = if persistent_l.is_delta() {
+                    Self::select_layer(Some(persistent_l), None, Some(inmem.clone()), end_lsn)
+                        .unwrap()
+                } else {
+                    Self::select_layer(None, Some(persistent_l), Some(inmem.clone()), end_lsn)
+                        .unwrap()
+                };
+
+                Some(SearchResult {
+                    layer: inmem_res.layer,
+                    // Use the more restrictive LSN floor
+                    lsn_floor: std::cmp::max(persistent_res.lsn_floor, inmem_res.lsn_floor),
+                })
+            }
         }
     }
 
     pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> RangeSearchResult {
+        let in_memory_layer = self.search_in_memory_layer(end_lsn);
+
         let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
             Some(version) => version,
             None => {
-                let mut result = RangeSearchResult::new();
-                result.not_found.add_range(key_range);
-                return result;
+                return RangeSearchResult::map_to_in_memory_layer(in_memory_layer, key_range);
             }
         };
 
@@ -464,7 +606,13 @@ impl LayerMap {
         let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
         let image_changes = version.image_coverage.range_overlaps(&raw_range);
 
-        let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes);
+        let collector = RangeSearchCollector::new(
+            key_range,
+            end_lsn,
+            in_memory_layer,
+            delta_changes,
+            image_changes,
+        );
         collector.collect()
     }
 
@@ -571,17 +719,36 @@ impl LayerMap {
     }
 
     /// Get a ref counted pointer for the first in memory layer that matches the provided predicate.
-    pub fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<Arc<InMemoryLayer>>
-    where
-        Pred: FnMut(&Arc<InMemoryLayer>) -> bool,
-    {
+    pub(crate) fn search_in_memory_layer(&self, below: Lsn) -> Option<InMemoryLayerDesc> {
+        let is_below = |l: &Arc<InMemoryLayer>| {
+            let start_lsn = l.get_lsn_range().start;
+            below > start_lsn
+        };
+
         if let Some(open) = &self.open_layer {
-            if pred(open) {
-                return Some(open.clone());
+            if is_below(open) {
+                return Some(InMemoryLayerDesc {
+                    handle: InMemoryLayerHandle::Open,
+                    lsn_range: open.get_lsn_range(),
+                });
             }
         }
 
-        self.frozen_layers.iter().rfind(|l| pred(l)).cloned()
+        self.frozen_layers
+            .iter()
+            .enumerate()
+            .rfind(|(_idx, l)| is_below(l))
+            .map(|(idx, l)| InMemoryLayerDesc {
+                handle: InMemoryLayerHandle::Frozen(idx),
+                lsn_range: l.get_lsn_range(),
+            })
+    }
+
+    pub(crate) fn in_memory_layer(&self, desc: &InMemoryLayerDesc) -> Arc<InMemoryLayer> {
+        match desc.handle {
+            InMemoryLayerHandle::Open => self.open_layer.as_ref().unwrap().clone(),
+            InMemoryLayerHandle::Frozen(idx) => self.frozen_layers[idx].clone(),
+        }
     }
 
     ///
@@ -737,136 +904,6 @@ impl LayerMap {
         max_stacked_deltas
     }
 
-    /// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
-    ///
-    /// The `partition_range` argument is used as context for the reimage-worthiness decision.
-    ///
-    /// Used as a helper for correctness checks only. Performance not critical.
-    pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range<Key>) -> usize {
-        match self.search(key, lsn) {
-            Some(search_result) => {
-                if search_result.layer.is_incremental() {
-                    (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize)
-                        + self.get_difficulty(search_result.lsn_floor, key, partition_range)
-                } else {
-                    0
-                }
-            }
-            None => 0,
-        }
-    }
-
-    /// Used for correctness checking. Results are expected to be identical to
-    /// self.get_difficulty_map. Assumes self.search is correct.
-    pub fn get_difficulty_map_bruteforce(
-        &self,
-        lsn: Lsn,
-        partitioning: &KeyPartitioning,
-    ) -> Vec<usize> {
-        // Looking at the difficulty as a function of key, it could only increase
-        // when a delta layer starts or an image layer ends. Therefore it's sufficient
-        // to check the difficulties at:
-        // - the key.start for each non-empty part range
-        // - the key.start for each delta
-        // - the key.end for each image
-        let keys_iter: Box<dyn Iterator<Item = Key>> = {
-            let mut keys: Vec<Key> = self
-                .iter_historic_layers()
-                .map(|layer| {
-                    if layer.is_incremental() {
-                        layer.get_key_range().start
-                    } else {
-                        layer.get_key_range().end
-                    }
-                })
-                .collect();
-            keys.sort();
-            Box::new(keys.into_iter())
-        };
-        let mut keys_iter = keys_iter.peekable();
-
-        // Iter the partition and keys together and query all the necessary
-        // keys, computing the max difficulty for each part.
-        partitioning
-            .parts
-            .iter()
-            .map(|part| {
-                let mut difficulty = 0;
-                // Partition ranges are assumed to be sorted and disjoint
-                // TODO assert it
-                for range in &part.ranges {
-                    if !range.is_empty() {
-                        difficulty =
-                            std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range));
-                    }
-                    while let Some(key) = keys_iter.peek() {
-                        if key >= &range.end {
-                            break;
-                        }
-                        let key = keys_iter.next().unwrap();
-                        if key < range.start {
-                            continue;
-                        }
-                        difficulty =
-                            std::cmp::max(difficulty, self.get_difficulty(lsn, key, range));
-                    }
-                }
-                difficulty
-            })
-            .collect()
-    }
-
-    /// For each part of a keyspace partitioning, return the maximum number of layers
-    /// that would be needed for page reconstruction in that part at the given LSN.
-    ///
-    /// If `limit` is provided we don't try to count above that number.
-    ///
-    /// This method is used to decide where to create new image layers. Computing the
-    /// result for the entire partitioning at once allows this function to be more
-    /// efficient, and further optimization is possible by using iterators instead,
-    /// to allow early return.
-    ///
-    /// TODO actually use this method instead of count_deltas. Currently we only use
-    /// it for benchmarks.
-    pub fn get_difficulty_map(
-        &self,
-        lsn: Lsn,
-        partitioning: &KeyPartitioning,
-        limit: Option<usize>,
-    ) -> Vec<usize> {
-        // TODO This is a naive implementation. Perf improvements to do:
-        // 1. Instead of calling self.image_coverage and self.count_deltas,
-        //    iterate the image and delta coverage only once.
-        partitioning
-            .parts
-            .iter()
-            .map(|part| {
-                let mut difficulty = 0;
-                for range in &part.ranges {
-                    if limit == Some(difficulty) {
-                        break;
-                    }
-                    for (img_range, last_img) in self.image_coverage(range, lsn) {
-                        if limit == Some(difficulty) {
-                            break;
-                        }
-                        let img_lsn = if let Some(last_img) = last_img {
-                            last_img.get_lsn_range().end
-                        } else {
-                            Lsn(0)
-                        };
-
-                        if img_lsn < lsn {
-                            let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit);
-                            difficulty = std::cmp::max(difficulty, num_deltas);
-                        }
-                    }
-                }
-                difficulty
-            })
-            .collect()
-    }
-
     /// Return all L0 delta layers
     pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
         &self.l0_delta_layers
@@ -1069,6 +1106,10 @@ mod tests {
     use std::collections::HashMap;
     use std::path::PathBuf;
 
+    use crate::{
+        DEFAULT_PG_VERSION,
+        tenant::{harness::TenantHarness, storage_layer::LayerName},
+    };
     use pageserver_api::key::DBDIR_KEY;
     use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
     use utils::id::{TenantId, TimelineId};
@@ -1076,7 +1117,6 @@ mod tests {
 
     use super::*;
     use crate::tenant::IndexPart;
-    use crate::tenant::storage_layer::LayerName;
 
     #[derive(Clone)]
     struct LayerDesc {
@@ -1101,7 +1141,6 @@ mod tests {
     }
 
     fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
-        assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
         let lhs: HashMap<SearchResult, KeySpace> = lhs
             .found
             .into_iter()
@@ -1127,17 +1166,12 @@ mod tests {
         let mut key = key_range.start;
         while key != key_range.end {
             let res = layer_map.search(key, end_lsn);
-            match res {
-                Some(res) => {
-                    range_search_result
-                        .found
-                        .entry(res)
-                        .or_default()
-                        .add_key(key);
-                }
-                None => {
-                    range_search_result.not_found.add_key(key);
-                }
+            if let Some(res) = res {
+                range_search_result
+                    .found
+                    .entry(res)
+                    .or_default()
+                    .add_key(key);
             }
 
             key = key.next();
@@ -1152,20 +1186,49 @@ mod tests {
         let range = Key::from_i128(100)..Key::from_i128(200);
 
         let res = layer_map.range_search(range.clone(), Lsn(100));
-        assert_eq!(
-            res.not_found.to_keyspace(),
-            KeySpace {
-                ranges: vec![range]
-            }
-        );
+        assert_range_search_result_eq(res, RangeSearchResult::new());
     }
 
-    #[test]
-    fn ranged_search() {
+    #[tokio::test]
+    async fn ranged_search() {
+        let harness = TenantHarness::create("ranged_search").await.unwrap();
+        let (tenant, ctx) = harness.load().await;
+        let timeline_id = TimelineId::generate();
+
+        // Create the timeline such that the in-memory layers can be written
+        // to the timeline directory.
+        tenant
+            .create_test_timeline(timeline_id, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await
+            .unwrap();
+
+        let gate = utils::sync::gate::Gate::default();
+        let add_in_memory_layer = async |layer_map: &mut LayerMap, lsn_range: Range<Lsn>| {
+            let layer = InMemoryLayer::create(
+                harness.conf,
+                timeline_id,
+                harness.tenant_shard_id,
+                lsn_range.start,
+                &gate,
+                &ctx,
+            )
+            .await
+            .unwrap();
+
+            layer.freeze(lsn_range.end).await;
+
+            layer_map.frozen_layers.push_back(Arc::new(layer));
+        };
+
+        let in_memory_layer_configurations = [
+            vec![],
+            // Overlaps with the top-most image
+            vec![Lsn(35)..Lsn(50)],
+        ];
+
         let layers = vec![
             LayerDesc {
                 key_range: Key::from_i128(15)..Key::from_i128(50),
-                lsn_range: Lsn(0)..Lsn(5),
+                lsn_range: Lsn(5)..Lsn(6),
                 is_delta: false,
             },
             LayerDesc {
@@ -1185,19 +1248,27 @@ mod tests {
             },
             LayerDesc {
                 key_range: Key::from_i128(35)..Key::from_i128(40),
-                lsn_range: Lsn(35)..Lsn(40),
+                lsn_range: Lsn(40)..Lsn(41),
                 is_delta: false,
             },
         ];
 
-        let layer_map = create_layer_map(layers.clone());
-        for start in 0..60 {
-            for end in (start + 1)..60 {
-                let range = Key::from_i128(start)..Key::from_i128(end);
-                let result = layer_map.range_search(range.clone(), Lsn(100));
-                let expected = brute_force_range_search(&layer_map, range, Lsn(100));
+        let mut layer_map = create_layer_map(layers.clone());
+        for in_memory_layers in in_memory_layer_configurations {
+            for in_mem_layer_range in in_memory_layers {
+                add_in_memory_layer(&mut layer_map, in_mem_layer_range).await;
+            }
 
-                assert_range_search_result_eq(result, expected);
+            for start in 0..60 {
+                for end in (start + 1)..60 {
+                    let range = Key::from_i128(start)..Key::from_i128(end);
+                    let result = layer_map.range_search(range.clone(), Lsn(100));
+                    let expected = brute_force_range_search(&layer_map, range, Lsn(100));
+
+                    eprintln!("{start}..{end}: {result:?}");
+
+                    assert_range_search_result_eq(result, expected);
+                }
             }
         }
     }
@@ -1490,12 +1561,348 @@ mod tests {
 
         // Sanity: the layer that holds latest data for the DBDIR key should always be visible
        // (just using this key as a key that will always exist for any layermap fixture)
-        let dbdir_layer = layer_map
-            .search(DBDIR_KEY, index.metadata.disk_consistent_lsn())
-            .unwrap();
+        let dbdir_layer = {
+            let readable_layer = layer_map
+                .search(DBDIR_KEY, index.metadata.disk_consistent_lsn())
+                .unwrap();
+
+            match readable_layer.layer {
+                ReadableLayerWeak::PersistentLayer(desc) => desc,
+                ReadableLayerWeak::InMemoryLayer(_) => unreachable!(""),
+            }
+        };
 
         assert!(matches!(
-            layer_visibilities.get(&dbdir_layer.layer).unwrap(),
+            layer_visibilities.get(&dbdir_layer).unwrap(),
             LayerVisibilityHint::Visible
         ));
     }
 }
+
+#[cfg(test)]
+mod select_layer_tests {
+    use super::*;
+
+    fn create_persistent_layer(
+        start_lsn: u64,
+        end_lsn: u64,
+        is_delta: bool,
+    ) -> Arc<PersistentLayerDesc> {
+        if !is_delta {
+            assert_eq!(end_lsn, start_lsn + 1);
+        }
+
+        Arc::new(PersistentLayerDesc::new_test(
+            Key::MIN..Key::MAX,
+            Lsn(start_lsn)..Lsn(end_lsn),
+            is_delta,
+        ))
+    }
+
+    fn create_inmem_layer(start_lsn: u64, end_lsn: u64) -> InMemoryLayerDesc {
+        InMemoryLayerDesc {
+            handle: InMemoryLayerHandle::Open,
+            lsn_range: Lsn(start_lsn)..Lsn(end_lsn),
+        }
+    }
+
+    #[test]
+    fn test_select_layer_empty() {
+        assert!(LayerMap::select_layer(None, None, None, Lsn(100)).is_none());
+    }
+
+    #[test]
+    fn test_select_layer_only_delta() {
+        let delta = create_persistent_layer(10, 20, true);
+        let result = LayerMap::select_layer(Some(delta.clone()), None, None, Lsn(100)).unwrap();
+
+        assert_eq!(result.lsn_floor, Lsn(10));
+        assert!(
+ matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + } + + #[test] + fn test_select_layer_only_image() { + let image = create_persistent_layer(10, 11, false); + let result = LayerMap::select_layer(None, Some(image.clone()), None, Lsn(100)).unwrap(); + + assert_eq!(result.lsn_floor, Lsn(10)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + } + + #[test] + fn test_select_layer_only_inmem() { + let inmem = create_inmem_layer(10, 20); + let result = LayerMap::select_layer(None, None, Some(inmem.clone()), Lsn(100)).unwrap(); + + assert_eq!(result.lsn_floor, Lsn(10)); + assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem)); + } + + #[test] + fn test_select_layer_image_inside_delta() { + let delta = create_persistent_layer(10, 20, true); + let image = create_persistent_layer(15, 16, false); + + let result = + LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(100)) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(16)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + None, + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(15)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + } + + #[test] + fn test_select_layer_newer_image() { + let delta = create_persistent_layer(10, 20, true); + let image = create_persistent_layer(25, 26, false); + + let result = + LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(30)) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(25)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + + let result = + LayerMap::select_layer(Some(delta.clone()), None, None, result.lsn_floor).unwrap(); + + assert_eq!(result.lsn_floor, Lsn(10)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + } + + #[test] + fn test_select_layer_delta_with_older_image() { + let delta = create_persistent_layer(15, 25, true); + let image = create_persistent_layer(10, 11, false); + + let result = + LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(30)) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(15)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + + let result = + LayerMap::select_layer(None, Some(image.clone()), None, result.lsn_floor).unwrap(); + + assert_eq!(result.lsn_floor, Lsn(10)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + } + + #[test] + fn test_select_layer_image_inside_inmem() { + let image = create_persistent_layer(15, 16, false); + let inmem = create_inmem_layer(10, 25); + + let result = + LayerMap::select_layer(None, Some(image.clone()), Some(inmem.clone()), Lsn(30)) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(16)); + assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem)); + + let result = LayerMap::select_layer( + None, + Some(image.clone()), + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(15)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + + let result = + 
LayerMap::select_layer(None, None, Some(inmem.clone()), result.lsn_floor).unwrap(); + assert_eq!(result.lsn_floor, Lsn(10)); + assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem)); + } + + #[test] + fn test_select_layer_delta_inside_inmem() { + let delta_top = create_persistent_layer(15, 20, true); + let delta_bottom = create_persistent_layer(10, 15, true); + let inmem = create_inmem_layer(15, 25); + + let result = + LayerMap::select_layer(Some(delta_top.clone()), None, Some(inmem.clone()), Lsn(30)) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(20)); + assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem)); + + let result = LayerMap::select_layer( + Some(delta_top.clone()), + None, + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(15)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta_top)) + ); + + let result = LayerMap::select_layer( + Some(delta_bottom.clone()), + None, + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + assert_eq!(result.lsn_floor, Lsn(10)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta_bottom)) + ); + } + + #[test] + fn test_select_layer_all_overlap_1() { + let inmem = create_inmem_layer(10, 30); + let delta = create_persistent_layer(15, 25, true); + let image = create_persistent_layer(20, 21, false); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + Lsn(50), + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(25)); + assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem)); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(21)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(20)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + } + + #[test] + fn test_select_layer_all_overlap_2() { + let inmem = create_inmem_layer(20, 30); + let delta = create_persistent_layer(10, 40, true); + let image = create_persistent_layer(25, 26, false); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + Lsn(50), + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(26)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(25)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + } + + #[test] + fn test_select_layer_all_overlap_3() { + let inmem = create_inmem_layer(30, 40); + let delta = create_persistent_layer(10, 30, true); + let image = create_persistent_layer(20, 21, false); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + Lsn(50), + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(30)); + assert!(matches!(result.layer, 
ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
+
+        let result = LayerMap::select_layer(
+            Some(delta.clone()),
+            Some(image.clone()),
+            None,
+            result.lsn_floor,
+        )
+        .unwrap();
+
+        assert_eq!(result.lsn_floor, Lsn(21));
+        assert!(
+            matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
+        );
+
+        let result = LayerMap::select_layer(
+            Some(delta.clone()),
+            Some(image.clone()),
+            None,
+            result.lsn_floor,
+        )
+        .unwrap();
+
+        assert_eq!(result.lsn_floor, Lsn(20));
+        assert!(
+            matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
+        );
+    }
+}
diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs
index f8bec48886..b3dc8e56a3 100644
--- a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs
+++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs
@@ -63,6 +63,8 @@ pub struct HistoricLayerCoverage<Value> {
     /// The latest state
     head: LayerCoverageTuple<Value>,
 
+    /// TODO: this could be an ordered vec using binary search.
+    /// We push into this map every time we add a layer, so we might see some benefit.
     /// All previous states
     historic: BTreeMap<u64, LayerCoverageTuple<Value>>,
 }
@@ -419,6 +421,10 @@ pub struct BufferedHistoricLayerCoverage<Value> {
     buffer: BTreeMap<LayerKey, Option<Value>>,
 
     /// All current layers. This is not used for search. Only to make rebuilds easier.
+    // TODO: This map is never cleared. Rebuilds could use the post-trim last entry of
+    // [`Self::historic_coverage`] instead of doubling memory usage.
+    // [`Self::len`]: can require rebuild and serve from latest historic
+    // [`Self::iter`]: already requires rebuild => can serve from latest historic
     layers: BTreeMap<LayerKey, Value>,
 }
 
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index 7f313f46a2..ece163b24a 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -40,6 +40,7 @@ use utils::sync::gate::GateGuard;
 
 use self::inmemory_layer::InMemoryLayerFileId;
 use super::PageReconstructError;
+use super::layer_map::InMemoryLayerDesc;
 use super::timeline::{GetVectoredError, ReadPath};
 use crate::config::PageServerConf;
 use crate::context::{AccessStatsBehavior, RequestContext};
@@ -721,6 +722,12 @@ struct LayerToVisitId {
     lsn_floor: Lsn,
 }
 
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub enum ReadableLayerWeak {
+    PersistentLayer(Arc<PersistentLayerDesc>),
+    InMemoryLayer(InMemoryLayerDesc),
+}
+
 /// Layer wrapper for the read path. Note that it is valid
 /// to use these layers even after external operations have
 /// been performed on them (compaction, freeze, etc.).
@@ -873,7 +880,7 @@ impl ReadableLayer {
             }
             ReadableLayer::InMemoryLayer(layer) => {
                 layer
-                    .get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
+                    .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
                     .await
             }
         }
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index ffdfe1dc27..46135b5330 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -416,7 +416,7 @@ impl InMemoryLayer {
     pub(crate) async fn get_values_reconstruct_data(
         self: &Arc<InMemoryLayer>,
         keyspace: KeySpace,
-        end_lsn: Lsn,
+        lsn_range: Range<Lsn>,
         reconstruct_state: &mut ValuesReconstructState,
         ctx: &RequestContext,
     ) -> Result<(), GetVectoredError> {
@@ -433,8 +433,6 @@ impl InMemoryLayer {
         let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
         let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
 
-        let lsn_range = self.start_lsn..end_lsn;
-
         for range in keyspace.ranges.iter() {
             for (key, vec_map) in inner
                 .index
diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs
index d43dfefdbc..a7f3c6b8c5 100644
--- a/pageserver/src/tenant/storage_layer/layer/tests.rs
+++ b/pageserver/src/tenant/storage_layer/layer/tests.rs
@@ -49,6 +49,7 @@ async fn smoke_test() {
             Lsn(0x10),
             14,
             &ctx,
+            Default::default(), // in-memory layers
             Default::default(),
             image_layers,
             Lsn(0x100),
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 851f84f603..17dbcee74e 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -3914,39 +3914,22 @@ impl Timeline {
             let guard = timeline.layers.read().await;
             let layers = guard.layer_map()?;
 
-            let in_memory_layer = layers.find_in_memory_layer(|l| {
-                let start_lsn = l.get_lsn_range().start;
-                cont_lsn > start_lsn
-            });
+            for range in unmapped_keyspace.ranges.iter() {
+                let results = layers.range_search(range.clone(), cont_lsn);
 
-            match in_memory_layer {
-                Some(l) => {
-                    let lsn_range = l.get_lsn_range().start..cont_lsn;
-                    fringe.update(
-                        ReadableLayer::InMemoryLayer(l),
-                        unmapped_keyspace.clone(),
-                        lsn_range,
-                    );
-                }
-                None => {
-                    for range in unmapped_keyspace.ranges.iter() {
-                        let results = layers.range_search(range.clone(), cont_lsn);
-
-                        results
-                            .found
-                            .into_iter()
-                            .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
-                                (
-                                    ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
-                                    keyspace_accum.to_keyspace(),
-                                    lsn_floor..cont_lsn,
-                                )
-                            })
-                            .for_each(|(layer, keyspace, lsn_range)| {
-                                fringe.update(layer, keyspace, lsn_range)
-                            });
-                    }
-                }
-            }
+                results
+                    .found
+                    .into_iter()
+                    .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
+                        (
+                            guard.upgrade(layer),
+                            keyspace_accum.to_keyspace(),
+                            lsn_floor..cont_lsn,
+                        )
+                    })
+                    .for_each(|(layer, keyspace, lsn_range)| {
+                        fringe.update(layer, keyspace, lsn_range)
+                    });
+            }
 
             // It's safe to drop the layer map lock after planning the next round of reads.
@@ -5555,6 +5538,14 @@ pub struct DeltaLayerTestDesc {
     pub data: Vec<(Key, Lsn, Value)>,
 }
 
+#[cfg(test)]
+#[derive(Clone)]
+pub struct InMemoryLayerTestDesc {
+    pub lsn_range: Range<Lsn>,
+    pub data: Vec<(Key, Lsn, Value)>,
+    pub is_open: bool,
+}
+
 #[cfg(test)]
 impl DeltaLayerTestDesc {
     pub fn new(lsn_range: Range<Lsn>, key_range: Range<Key>, data: Vec<(Key, Lsn, Value)>) -> Self {
@@ -6567,6 +6558,92 @@ impl Timeline {
         Ok(())
     }
 
+    /// Force create an in-memory layer and place it into the layer map.
+    #[cfg(test)]
+    pub(super) async fn force_create_in_memory_layer(
+        self: &Arc<Timeline>,
+        mut in_memory: InMemoryLayerTestDesc,
+        check_start_lsn: Option<Lsn>,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        use utils::bin_ser::BeSer;
+
+        // Validate LSNs
+        if let Some(check_start_lsn) = check_start_lsn {
+            assert!(in_memory.lsn_range.start >= check_start_lsn);
+        }
+
+        let last_record_lsn = self.get_last_record_lsn();
+        let layer_end_lsn = if in_memory.is_open {
+            in_memory
+                .data
+                .iter()
+                .map(|(_key, lsn, _value)| lsn)
+                .max()
+                .cloned()
+        } else {
+            Some(in_memory.lsn_range.end)
+        };
+
+        if let Some(end) = layer_end_lsn {
+            assert!(
+                end <= last_record_lsn,
+                "advance last record lsn before inserting a layer, end_lsn={}, last_record_lsn={}",
+                end,
+                last_record_lsn,
+            );
+        }
+
+        in_memory.data.iter().for_each(|(_key, lsn, _value)| {
+            assert!(*lsn >= in_memory.lsn_range.start);
+            assert!(*lsn < in_memory.lsn_range.end);
+        });
+
+        // Build the batch
+        in_memory
+            .data
+            .sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb)));
+
+        let data = in_memory
+            .data
+            .into_iter()
+            .map(|(key, lsn, value)| {
+                let value_size = value.serialized_size().unwrap() as usize;
+                (key.to_compact(), lsn, value_size, value)
+            })
+            .collect::<Vec<_>>();
+
+        let batch = SerializedValueBatch::from_values(data);
+
+        // Create the in-memory layer and write the batch into it
+        let layer = InMemoryLayer::create(
+            self.conf,
+            self.timeline_id,
+            self.tenant_shard_id,
+            in_memory.lsn_range.start,
+            &self.gate,
+            ctx,
+        )
+        .await
+        .unwrap();
+
+        layer.put_batch(batch, ctx).await.unwrap();
+        if !in_memory.is_open {
+            layer.freeze(in_memory.lsn_range.end).await;
+        }
+
+        info!("force created in-memory layer {:?}", in_memory.lsn_range);
+
+        // Link the layer to the layer map
+        {
+            let mut guard = self.layers.write().await;
+            let layer_map = guard.open_mut().unwrap();
+            layer_map.force_insert_in_memory_layer(Arc::new(layer));
+        }
+
+        Ok(())
+    }
+
     /// Return all keys at the LSN in the image layers
     #[cfg(test)]
     pub(crate) async fn inspect_image_layers(
@@ -6999,6 +7076,7 @@ mod tests {
             Lsn(0x10),
             14,
             &ctx,
+            Vec::new(), // in-memory layers
             delta_layers,
             image_layers,
             Lsn(0x100),
@@ -7132,6 +7210,7 @@ mod tests {
             Lsn(0x10),
             14,
             &ctx,
+            Vec::new(), // in-memory layers
             delta_layers,
             image_layers,
             Lsn(0x100),
diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs
index e552ea83de..1b489028dc 100644
--- a/pageserver/src/tenant/timeline/layer_manager.rs
+++ b/pageserver/src/tenant/timeline/layer_manager.rs
@@ -8,14 +8,14 @@ use tracing::trace;
 use utils::id::TimelineId;
 use utils::lsn::{AtomicLsn, Lsn};
 
-use super::TimelineWriterState;
+use super::{ReadableLayer, TimelineWriterState};
 use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::metrics::TimelineMetrics;
 use crate::tenant::layer_map::{BatchedUpdates, LayerMap};
 use crate::tenant::storage_layer::{
     AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc,
-    PersistentLayerKey, ResidentLayer,
+    PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
 };
 
 /// Provides semantic APIs to manipulate the layer map.
@@ -37,6 +37,21 @@ impl Default for LayerManager {
 }
 
 impl LayerManager {
+    pub(crate) fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer {
+        match weak {
+            ReadableLayerWeak::PersistentLayer(desc) => {
+                ReadableLayer::PersistentLayer(self.get_from_desc(&desc))
+            }
+            ReadableLayerWeak::InMemoryLayer(desc) => {
+                let inmem = self
+                    .layer_map()
+                    .expect("no concurrent shutdown")
+                    .in_memory_layer(&desc);
+                ReadableLayer::InMemoryLayer(inmem)
+            }
+        }
+    }
+
     pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
         // The assumption for the `expect()` is that all code maintains the following invariant:
         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
@@ -470,6 +485,25 @@ impl OpenLayerManager {
         mapping.remove(layer);
         layer.delete_on_drop();
     }
+
+    #[cfg(test)]
+    pub(crate) fn force_insert_in_memory_layer(&mut self, layer: Arc<InMemoryLayer>) {
+        use pageserver_api::models::InMemoryLayerInfo;
+
+        match layer.info() {
+            InMemoryLayerInfo::Open { .. } => {
+                assert!(self.layer_map.open_layer.is_none());
+                self.layer_map.open_layer = Some(layer);
+            }
+            InMemoryLayerInfo::Frozen { lsn_start, .. } => {
+                if let Some(last) = self.layer_map.frozen_layers.back() {
+                    assert!(last.get_lsn_range().end <= lsn_start);
+                }
+
+                self.layer_map.frozen_layers.push_back(layer);
+            }
+        }
+    }
 }
 
 pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);