diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs index e11af49449..e1444778b8 100644 --- a/pageserver/benches/bench_layer_map.rs +++ b/pageserver/benches/bench_layer_map.rs @@ -7,7 +7,6 @@ use std::time::Instant; use criterion::measurement::WallTime; use criterion::{BenchmarkGroup, Criterion, black_box, criterion_group, criterion_main}; -use pageserver::keyspace::{KeyPartitioning, KeySpace}; use pageserver::tenant::layer_map::LayerMap; use pageserver::tenant::storage_layer::{LayerName, PersistentLayerDesc}; use pageserver_api::key::Key; @@ -72,41 +71,6 @@ fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> { .collect() } -// Construct a partitioning for testing get_difficulty map when we -// don't have an exact result of `collect_keyspace` to work with. -fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning { - let mut parts = Vec::new(); - - // We add a partition boundary at the start of each image layer, - // no matter what lsn range it covers. This is just the easiest - // thing to do. A better thing to do would be to get a real - // partitioning from some database. Even better, remove the need - // for key partitions by deciding where to create image layers - // directly based on a coverage-based difficulty map. - let mut keys: Vec<_> = layer_map - .iter_historic_layers() - .filter_map(|l| { - if l.is_incremental() { - None - } else { - let kr = l.get_key_range(); - Some(kr.start.next()) - } - }) - .collect(); - keys.sort(); - - let mut current_key = Key::from_hex("000000000000000000000000000000000000").unwrap(); - for key in keys { - parts.push(KeySpace { - ranges: vec![current_key..key], - }); - current_key = key; - } - - KeyPartitioning { parts } -} - // Benchmark using metadata extracted from our performance test environment, from // a project where we have run pgbench many timmes. The pgbench database was initialized // between each test run. @@ -148,41 +112,6 @@ fn bench_from_real_project(c: &mut Criterion) { // Choose uniformly distributed queries let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map); - // Choose inputs for get_difficulty_map - let latest_lsn = layer_map - .iter_historic_layers() - .map(|l| l.get_lsn_range().end) - .max() - .unwrap(); - let partitioning = uniform_key_partitioning(&layer_map, latest_lsn); - - // Check correctness of get_difficulty_map - // TODO put this in a dedicated test outside of this mod - { - println!("running correctness check"); - - let now = Instant::now(); - let result_bruteforce = layer_map.get_difficulty_map_bruteforce(latest_lsn, &partitioning); - assert!(result_bruteforce.len() == partitioning.parts.len()); - println!("Finished bruteforce in {:?}", now.elapsed()); - - let now = Instant::now(); - let result_fast = layer_map.get_difficulty_map(latest_lsn, &partitioning, None); - assert!(result_fast.len() == partitioning.parts.len()); - println!("Finished fast in {:?}", now.elapsed()); - - // Assert results are equal. Manually iterate for easier debugging. - let zip = std::iter::zip( - &partitioning.parts, - std::iter::zip(result_bruteforce, result_fast), - ); - for (_part, (bruteforce, fast)) in zip { - assert_eq!(bruteforce, fast); - } - - println!("No issues found"); - } - // Define and name the benchmark function let mut group = c.benchmark_group("real_map"); group.bench_function("uniform_queries", |b| { @@ -192,11 +121,6 @@ fn bench_from_real_project(c: &mut Criterion) { } }); }); - group.bench_function("get_difficulty_map", |b| { - b.iter(|| { - layer_map.get_difficulty_map(latest_lsn, &partitioning, Some(3)); - }); - }); group.finish(); } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 11d656eb25..776e523c2e 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2501,6 +2501,7 @@ impl Tenant { initdb_lsn: Lsn, pg_version: u32, ctx: &RequestContext, + in_memory_layer_desc: Vec, delta_layer_desc: Vec, image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>, end_lsn: Lsn, @@ -2522,6 +2523,11 @@ impl Tenant { .force_create_image_layer(lsn, images, Some(initdb_lsn), ctx) .await?; } + for in_memory in in_memory_layer_desc { + tline + .force_create_in_memory_layer(in_memory, Some(initdb_lsn), ctx) + .await?; + } let layer_names = tline .layers .read() @@ -5913,6 +5919,8 @@ mod tests { #[cfg(feature = "testing")] use timeline::GcInfo; #[cfg(feature = "testing")] + use timeline::InMemoryLayerTestDesc; + #[cfg(feature = "testing")] use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn}; use timeline::{CompactOptions, DeltaLayerTestDesc}; use utils::id::TenantId; @@ -7925,6 +7933,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers Vec::new(), // delta layers vec![(Lsn(0x20), vec![(base_key, test_img("data key 1"))])], // image layers Lsn(0x20), // it's fine to not advance LSN to 0x30 while using 0x30 to get below because `get_vectored_impl` does not wait for LSN @@ -8012,6 +8021,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers Vec::new(), // delta layers vec![( Lsn(0x20), @@ -8227,6 +8237,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers // delta layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range( @@ -8307,6 +8318,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers // delta layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range( @@ -8380,6 +8392,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers // delta layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range( @@ -8512,6 +8525,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1), DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2), @@ -8705,6 +8719,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![DeltaLayerTestDesc::new_with_inferred_key_range( Lsn(0x10)..Lsn(0x40), delta1, @@ -8761,6 +8776,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers Vec::new(), image_layers, end_lsn, @@ -8967,6 +8983,7 @@ mod tests { Lsn(0x08), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range( Lsn(0x08)..Lsn(0x10), @@ -8985,7 +9002,7 @@ mod tests { delta3, ), ], // delta layers - vec![], // image layers + vec![], // image layers Lsn(0x50), ) .await? @@ -8996,6 +9013,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range( Lsn(0x10)..Lsn(0x48), @@ -9546,6 +9564,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta1), DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta2), @@ -9793,6 +9812,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + Vec::new(), // in-memory layers vec![ // delta1 and delta 2 only contain a single key but multiple updates DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x30), delta1), @@ -10028,6 +10048,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + vec![], // in-memory layers vec![], // delta layers vec![(Lsn(0x18), img_layer)], // image layers Lsn(0x18), @@ -10274,6 +10295,7 @@ mod tests { baseline_image_layer_lsn, DEFAULT_PG_VERSION, &ctx, + vec![], // in-memory layers vec![DeltaLayerTestDesc::new_with_inferred_key_range( delta_layer_start_lsn..delta_layer_end_lsn, delta_layer_spec, @@ -10305,6 +10327,158 @@ mod tests { Ok(()) } + #[cfg(feature = "testing")] + #[tokio::test] + async fn test_vectored_read_with_image_layer_inside_inmem() -> anyhow::Result<()> { + let harness = + TenantHarness::create("test_vectored_read_with_image_layer_inside_inmem").await?; + let (tenant, ctx) = harness.load().await; + + let will_init_keys = [2, 6]; + fn get_key(id: u32) -> Key { + let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + let mut expected_key_values = HashMap::new(); + + let baseline_image_layer_lsn = Lsn(0x10); + let mut baseline_img_layer = Vec::new(); + for i in 0..5 { + let key = get_key(i); + let value = format!("value {i}@{baseline_image_layer_lsn}"); + + let removed = expected_key_values.insert(key, value.clone()); + assert!(removed.is_none()); + + baseline_img_layer.push((key, Bytes::from(value))); + } + + let nested_image_layer_lsn = Lsn(0x50); + let mut nested_img_layer = Vec::new(); + for i in 5..10 { + let key = get_key(i); + let value = format!("value {i}@{nested_image_layer_lsn}"); + + let removed = expected_key_values.insert(key, value.clone()); + assert!(removed.is_none()); + + nested_img_layer.push((key, Bytes::from(value))); + } + + let frozen_layer = { + let lsn_range = Lsn(0x40)..Lsn(0x60); + let mut data = Vec::new(); + for i in 0..10 { + let key = get_key(i); + let key_in_nested = nested_img_layer + .iter() + .any(|(key_with_img, _)| *key_with_img == key); + let lsn = { + if key_in_nested { + Lsn(nested_image_layer_lsn.0 + 5) + } else { + lsn_range.start + } + }; + + let will_init = will_init_keys.contains(&i); + if will_init { + data.push((key, lsn, Value::WalRecord(NeonWalRecord::wal_init("")))); + + expected_key_values.insert(key, "".to_string()); + } else { + let delta = format!("@{lsn}"); + data.push(( + key, + lsn, + Value::WalRecord(NeonWalRecord::wal_append(&delta)), + )); + + expected_key_values + .get_mut(&key) + .expect("An image exists for each key") + .push_str(delta.as_str()); + } + } + + InMemoryLayerTestDesc { + lsn_range, + is_open: false, + data, + } + }; + + let (open_layer, last_record_lsn) = { + let start_lsn = Lsn(0x70); + let mut data = Vec::new(); + let mut end_lsn = Lsn(0); + for i in 0..10 { + let key = get_key(i); + let lsn = Lsn(start_lsn.0 + i as u64); + let delta = format!("@{lsn}"); + data.push(( + key, + lsn, + Value::WalRecord(NeonWalRecord::wal_append(&delta)), + )); + + expected_key_values + .get_mut(&key) + .expect("An image exists for each key") + .push_str(delta.as_str()); + + end_lsn = std::cmp::max(end_lsn, lsn); + } + + ( + InMemoryLayerTestDesc { + lsn_range: start_lsn..Lsn::MAX, + is_open: true, + data, + }, + end_lsn, + ) + }; + + assert!( + nested_image_layer_lsn > frozen_layer.lsn_range.start + && nested_image_layer_lsn < frozen_layer.lsn_range.end + ); + + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + baseline_image_layer_lsn, + DEFAULT_PG_VERSION, + &ctx, + vec![open_layer, frozen_layer], // in-memory layers + Vec::new(), // delta layers + vec![ + (baseline_image_layer_lsn, baseline_img_layer), + (nested_image_layer_lsn, nested_img_layer), + ], // image layers + last_record_lsn, + ) + .await?; + + let keyspace = KeySpace::single(get_key(0)..get_key(10)); + let results = tline + .get_vectored(keyspace, last_record_lsn, IoConcurrency::sequential(), &ctx) + .await + .expect("No vectored errors"); + for (key, res) in results { + let value = res.expect("No key errors"); + let expected_value = expected_key_values.remove(&key).expect("No unknown keys"); + assert_eq!(value, Bytes::from(expected_value.clone())); + + tracing::info!("key={key} value={expected_value}"); + } + + Ok(()) + } + fn sort_layer_key(k1: &PersistentLayerKey, k2: &PersistentLayerKey) -> std::cmp::Ordering { ( k1.is_delta, @@ -10420,6 +10594,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + vec![], // in-memory layers vec![ DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1), DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2), @@ -10804,6 +10979,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + vec![], // in-memory layers vec![ // delta1/2/4 only contain a single key but multiple updates DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x28), delta1), @@ -11055,6 +11231,7 @@ mod tests { Lsn(0x10), DEFAULT_PG_VERSION, &ctx, + vec![], // in-memory layers vec![ // delta1/2/4 only contain a single key but multiple updates DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x28), delta1), diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 59f5a6bd90..2b04e53f10 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -62,8 +62,7 @@ use utils::lsn::Lsn; use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc}; use crate::context::RequestContext; -use crate::keyspace::KeyPartitioning; -use crate::tenant::storage_layer::InMemoryLayer; +use crate::tenant::storage_layer::{InMemoryLayer, ReadableLayerWeak}; /// /// LayerMap tracks what layers exist on a timeline. @@ -167,7 +166,7 @@ impl Drop for BatchedUpdates<'_> { /// Return value of LayerMap::search #[derive(Eq, PartialEq, Debug, Hash)] pub struct SearchResult { - pub layer: Arc, + pub layer: ReadableLayerWeak, pub lsn_floor: Lsn, } @@ -175,19 +174,37 @@ pub struct SearchResult { /// /// Contains a mapping from a layer description to a keyspace /// accumulator that contains all the keys which intersect the layer -/// from the original search space. Keys that were not found are accumulated -/// in a separate key space accumulator. +/// from the original search space. #[derive(Debug)] pub struct RangeSearchResult { pub found: HashMap, - pub not_found: KeySpaceAccum, } impl RangeSearchResult { fn new() -> Self { Self { found: HashMap::new(), - not_found: KeySpaceAccum::new(), + } + } + + fn map_to_in_memory_layer( + in_memory_layer: Option, + range: Range, + ) -> RangeSearchResult { + match in_memory_layer { + Some(inmem) => { + let search_result = SearchResult { + lsn_floor: inmem.get_lsn_range().start, + layer: ReadableLayerWeak::InMemoryLayer(inmem), + }; + + let mut accum = KeySpaceAccum::new(); + accum.add_range(range); + RangeSearchResult { + found: HashMap::from([(search_result, accum)]), + } + } + None => RangeSearchResult::new(), } } } @@ -199,6 +216,7 @@ struct RangeSearchCollector where Iter: Iterator>)>, { + in_memory_layer: Option, delta_coverage: Peekable, image_coverage: Peekable, key_range: Range, @@ -234,10 +252,12 @@ where fn new( key_range: Range, end_lsn: Lsn, + in_memory_layer: Option, delta_coverage: Iter, image_coverage: Iter, ) -> Self { Self { + in_memory_layer, delta_coverage: delta_coverage.peekable(), image_coverage: image_coverage.peekable(), key_range, @@ -266,8 +286,7 @@ where return self.result; } Some(layer_type) => { - // Changes for the range exist. Record anything before the first - // coverage change as not found. + // Changes for the range exist. let coverage_start = layer_type.next_change_at_key(); let range_before = self.key_range.start..coverage_start; self.pad_range(range_before); @@ -297,10 +316,22 @@ where self.result } - /// Mark a range as not found (i.e. no layers intersect it) + /// Map a range which does not intersect any persistent layers to + /// the in-memory layer candidate. fn pad_range(&mut self, key_range: Range) { if !key_range.is_empty() { - self.result.not_found.add_range(key_range); + if let Some(ref inmem) = self.in_memory_layer { + let search_result = SearchResult { + layer: ReadableLayerWeak::InMemoryLayer(inmem.clone()), + lsn_floor: inmem.get_lsn_range().start, + }; + + self.result + .found + .entry(search_result) + .or_default() + .add_range(key_range); + } } } @@ -310,6 +341,7 @@ where let selected = LayerMap::select_layer( self.current_delta.clone(), self.current_image.clone(), + self.in_memory_layer.clone(), self.end_lsn, ); @@ -365,6 +397,24 @@ where } } +#[derive(Debug, PartialEq, Eq, Clone, Hash)] +pub struct InMemoryLayerDesc { + handle: InMemoryLayerHandle, + lsn_range: Range, +} + +impl InMemoryLayerDesc { + pub(crate) fn get_lsn_range(&self) -> Range { + self.lsn_range.clone() + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Hash)] +enum InMemoryLayerHandle { + Open, + Frozen(usize), +} + impl LayerMap { /// /// Find the latest layer (by lsn.end) that covers the given @@ -394,69 +444,161 @@ impl LayerMap { /// layer result, or simplify the api to `get_latest_image` and /// `get_latest_delta`, and only call `get_latest_image` once. /// - /// NOTE: This only searches the 'historic' layers, *not* the - /// 'open' and 'frozen' layers! - /// pub fn search(&self, key: Key, end_lsn: Lsn) -> Option { - let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?; + let in_memory_layer = self.search_in_memory_layer(end_lsn); + + let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) { + Some(version) => version, + None => { + return in_memory_layer.map(|desc| SearchResult { + lsn_floor: desc.get_lsn_range().start, + layer: ReadableLayerWeak::InMemoryLayer(desc), + }); + } + }; + let latest_delta = version.delta_coverage.query(key.to_i128()); let latest_image = version.image_coverage.query(key.to_i128()); - Self::select_layer(latest_delta, latest_image, end_lsn) + Self::select_layer(latest_delta, latest_image, in_memory_layer, end_lsn) } + /// Select a layer from three potential candidates (in-memory, delta and image layer). + /// The candidates represent the first layer of each type which intersect a key range. + /// + /// Layer types have an in implicit priority (image > delta > in-memory). For instance, + /// if we have the option of reading an LSN range from both an image and a delta, we + /// should read from the image. fn select_layer( delta_layer: Option>, image_layer: Option>, + in_memory_layer: Option, end_lsn: Lsn, ) -> Option { assert!(delta_layer.as_ref().is_none_or(|l| l.is_delta())); assert!(image_layer.as_ref().is_none_or(|l| !l.is_delta())); - match (delta_layer, image_layer) { - (None, None) => None, - (None, Some(image)) => { + match (delta_layer, image_layer, in_memory_layer) { + (None, None, None) => None, + (None, Some(image), None) => { let lsn_floor = image.get_lsn_range().start; Some(SearchResult { - layer: image, + layer: ReadableLayerWeak::PersistentLayer(image), lsn_floor, }) } - (Some(delta), None) => { + (Some(delta), None, None) => { let lsn_floor = delta.get_lsn_range().start; Some(SearchResult { - layer: delta, + layer: ReadableLayerWeak::PersistentLayer(delta), lsn_floor, }) } - (Some(delta), Some(image)) => { + (Some(delta), Some(image), None) => { let img_lsn = image.get_lsn_range().start; let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end; let image_exact_match = img_lsn + 1 == end_lsn; if image_is_newer || image_exact_match { Some(SearchResult { - layer: image, + layer: ReadableLayerWeak::PersistentLayer(image), + lsn_floor: img_lsn, + }) + } else { + // If the delta overlaps with the image in the LSN dimension, do a partial + // up to the image layer. + let lsn_floor = + std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1); + Some(SearchResult { + layer: ReadableLayerWeak::PersistentLayer(delta), + lsn_floor, + }) + } + } + (None, None, Some(inmem)) => { + let lsn_floor = inmem.get_lsn_range().start; + Some(SearchResult { + layer: ReadableLayerWeak::InMemoryLayer(inmem), + lsn_floor, + }) + } + (None, Some(image), Some(inmem)) => { + // If the in-memory layer overlaps with the image in the LSN dimension, do a partial + // up to the image layer. + let img_lsn = image.get_lsn_range().start; + let image_is_newer = image.get_lsn_range().end >= inmem.get_lsn_range().end; + let image_exact_match = img_lsn + 1 == end_lsn; + if image_is_newer || image_exact_match { + Some(SearchResult { + layer: ReadableLayerWeak::PersistentLayer(image), lsn_floor: img_lsn, }) } else { let lsn_floor = - std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1); + std::cmp::max(inmem.get_lsn_range().start, image.get_lsn_range().start + 1); Some(SearchResult { - layer: delta, + layer: ReadableLayerWeak::InMemoryLayer(inmem), lsn_floor, }) } } + (Some(delta), None, Some(inmem)) => { + // Overlaps between delta and in-memory layers are not a valid + // state, but we handle them here for completeness. + let delta_end = delta.get_lsn_range().end; + let delta_is_newer = delta_end >= inmem.get_lsn_range().end; + let delta_exact_match = delta_end == end_lsn; + if delta_is_newer || delta_exact_match { + Some(SearchResult { + lsn_floor: delta.get_lsn_range().start, + layer: ReadableLayerWeak::PersistentLayer(delta), + }) + } else { + // If the in-memory layer overlaps with the delta in the LSN dimension, do a partial + // up to the delta layer. + let lsn_floor = + std::cmp::max(inmem.get_lsn_range().start, delta.get_lsn_range().end); + Some(SearchResult { + layer: ReadableLayerWeak::InMemoryLayer(inmem), + lsn_floor, + }) + } + } + (Some(delta), Some(image), Some(inmem)) => { + // Determine the preferred persistent layer without taking the in-memory layer + // into consideration. + let persistent_res = + Self::select_layer(Some(delta.clone()), Some(image.clone()), None, end_lsn) + .unwrap(); + let persistent_l = match persistent_res.layer { + ReadableLayerWeak::PersistentLayer(l) => l, + ReadableLayerWeak::InMemoryLayer(_) => unreachable!(), + }; + + // Now handle the in-memory layer overlaps. + let inmem_res = if persistent_l.is_delta() { + Self::select_layer(Some(persistent_l), None, Some(inmem.clone()), end_lsn) + .unwrap() + } else { + Self::select_layer(None, Some(persistent_l), Some(inmem.clone()), end_lsn) + .unwrap() + }; + + Some(SearchResult { + layer: inmem_res.layer, + // Use the more restrictive LSN floor + lsn_floor: std::cmp::max(persistent_res.lsn_floor, inmem_res.lsn_floor), + }) + } } } pub fn range_search(&self, key_range: Range, end_lsn: Lsn) -> RangeSearchResult { + let in_memory_layer = self.search_in_memory_layer(end_lsn); + let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) { Some(version) => version, None => { - let mut result = RangeSearchResult::new(); - result.not_found.add_range(key_range); - return result; + return RangeSearchResult::map_to_in_memory_layer(in_memory_layer, key_range); } }; @@ -464,7 +606,13 @@ impl LayerMap { let delta_changes = version.delta_coverage.range_overlaps(&raw_range); let image_changes = version.image_coverage.range_overlaps(&raw_range); - let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes); + let collector = RangeSearchCollector::new( + key_range, + end_lsn, + in_memory_layer, + delta_changes, + image_changes, + ); collector.collect() } @@ -571,17 +719,36 @@ impl LayerMap { } /// Get a ref counted pointer for the first in memory layer that matches the provided predicate. - pub fn find_in_memory_layer(&self, mut pred: Pred) -> Option> - where - Pred: FnMut(&Arc) -> bool, - { + pub(crate) fn search_in_memory_layer(&self, below: Lsn) -> Option { + let is_below = |l: &Arc| { + let start_lsn = l.get_lsn_range().start; + below > start_lsn + }; + if let Some(open) = &self.open_layer { - if pred(open) { - return Some(open.clone()); + if is_below(open) { + return Some(InMemoryLayerDesc { + handle: InMemoryLayerHandle::Open, + lsn_range: open.get_lsn_range(), + }); } } - self.frozen_layers.iter().rfind(|l| pred(l)).cloned() + self.frozen_layers + .iter() + .enumerate() + .rfind(|(_idx, l)| is_below(l)) + .map(|(idx, l)| InMemoryLayerDesc { + handle: InMemoryLayerHandle::Frozen(idx), + lsn_range: l.get_lsn_range(), + }) + } + + pub(crate) fn in_memory_layer(&self, desc: &InMemoryLayerDesc) -> Arc { + match desc.handle { + InMemoryLayerHandle::Open => self.open_layer.as_ref().unwrap().clone(), + InMemoryLayerHandle::Frozen(idx) => self.frozen_layers[idx].clone(), + } } /// @@ -737,136 +904,6 @@ impl LayerMap { max_stacked_deltas } - /// Count how many reimage-worthy layers we need to visit for given key-lsn pair. - /// - /// The `partition_range` argument is used as context for the reimage-worthiness decision. - /// - /// Used as a helper for correctness checks only. Performance not critical. - pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range) -> usize { - match self.search(key, lsn) { - Some(search_result) => { - if search_result.layer.is_incremental() { - (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize) - + self.get_difficulty(search_result.lsn_floor, key, partition_range) - } else { - 0 - } - } - None => 0, - } - } - - /// Used for correctness checking. Results are expected to be identical to - /// self.get_difficulty_map. Assumes self.search is correct. - pub fn get_difficulty_map_bruteforce( - &self, - lsn: Lsn, - partitioning: &KeyPartitioning, - ) -> Vec { - // Looking at the difficulty as a function of key, it could only increase - // when a delta layer starts or an image layer ends. Therefore it's sufficient - // to check the difficulties at: - // - the key.start for each non-empty part range - // - the key.start for each delta - // - the key.end for each image - let keys_iter: Box> = { - let mut keys: Vec = self - .iter_historic_layers() - .map(|layer| { - if layer.is_incremental() { - layer.get_key_range().start - } else { - layer.get_key_range().end - } - }) - .collect(); - keys.sort(); - Box::new(keys.into_iter()) - }; - let mut keys_iter = keys_iter.peekable(); - - // Iter the partition and keys together and query all the necessary - // keys, computing the max difficulty for each part. - partitioning - .parts - .iter() - .map(|part| { - let mut difficulty = 0; - // Partition ranges are assumed to be sorted and disjoint - // TODO assert it - for range in &part.ranges { - if !range.is_empty() { - difficulty = - std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range)); - } - while let Some(key) = keys_iter.peek() { - if key >= &range.end { - break; - } - let key = keys_iter.next().unwrap(); - if key < range.start { - continue; - } - difficulty = - std::cmp::max(difficulty, self.get_difficulty(lsn, key, range)); - } - } - difficulty - }) - .collect() - } - - /// For each part of a keyspace partitioning, return the maximum number of layers - /// that would be needed for page reconstruction in that part at the given LSN. - /// - /// If `limit` is provided we don't try to count above that number. - /// - /// This method is used to decide where to create new image layers. Computing the - /// result for the entire partitioning at once allows this function to be more - /// efficient, and further optimization is possible by using iterators instead, - /// to allow early return. - /// - /// TODO actually use this method instead of count_deltas. Currently we only use - /// it for benchmarks. - pub fn get_difficulty_map( - &self, - lsn: Lsn, - partitioning: &KeyPartitioning, - limit: Option, - ) -> Vec { - // TODO This is a naive implementation. Perf improvements to do: - // 1. Instead of calling self.image_coverage and self.count_deltas, - // iterate the image and delta coverage only once. - partitioning - .parts - .iter() - .map(|part| { - let mut difficulty = 0; - for range in &part.ranges { - if limit == Some(difficulty) { - break; - } - for (img_range, last_img) in self.image_coverage(range, lsn) { - if limit == Some(difficulty) { - break; - } - let img_lsn = if let Some(last_img) = last_img { - last_img.get_lsn_range().end - } else { - Lsn(0) - }; - - if img_lsn < lsn { - let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit); - difficulty = std::cmp::max(difficulty, num_deltas); - } - } - } - difficulty - }) - .collect() - } - /// Return all L0 delta layers pub fn level0_deltas(&self) -> &Vec> { &self.l0_delta_layers @@ -1069,6 +1106,10 @@ mod tests { use std::collections::HashMap; use std::path::PathBuf; + use crate::{ + DEFAULT_PG_VERSION, + tenant::{harness::TenantHarness, storage_layer::LayerName}, + }; use pageserver_api::key::DBDIR_KEY; use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; use utils::id::{TenantId, TimelineId}; @@ -1076,7 +1117,6 @@ mod tests { use super::*; use crate::tenant::IndexPart; - use crate::tenant::storage_layer::LayerName; #[derive(Clone)] struct LayerDesc { @@ -1101,7 +1141,6 @@ mod tests { } fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) { - assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace()); let lhs: HashMap = lhs .found .into_iter() @@ -1127,17 +1166,12 @@ mod tests { let mut key = key_range.start; while key != key_range.end { let res = layer_map.search(key, end_lsn); - match res { - Some(res) => { - range_search_result - .found - .entry(res) - .or_default() - .add_key(key); - } - None => { - range_search_result.not_found.add_key(key); - } + if let Some(res) = res { + range_search_result + .found + .entry(res) + .or_default() + .add_key(key); } key = key.next(); @@ -1152,20 +1186,49 @@ mod tests { let range = Key::from_i128(100)..Key::from_i128(200); let res = layer_map.range_search(range.clone(), Lsn(100)); - assert_eq!( - res.not_found.to_keyspace(), - KeySpace { - ranges: vec![range] - } - ); + assert_range_search_result_eq(res, RangeSearchResult::new()); } - #[test] - fn ranged_search() { + #[tokio::test] + async fn ranged_search() { + let harness = TenantHarness::create("ranged_search").await.unwrap(); + let (tenant, ctx) = harness.load().await; + let timeline_id = TimelineId::generate(); + // Create the timeline such that the in-memory layers can be written + // to the timeline directory. + tenant + .create_test_timeline(timeline_id, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + let gate = utils::sync::gate::Gate::default(); + let add_in_memory_layer = async |layer_map: &mut LayerMap, lsn_range: Range| { + let layer = InMemoryLayer::create( + harness.conf, + timeline_id, + harness.tenant_shard_id, + lsn_range.start, + &gate, + &ctx, + ) + .await + .unwrap(); + + layer.freeze(lsn_range.end).await; + + layer_map.frozen_layers.push_back(Arc::new(layer)); + }; + + let in_memory_layer_configurations = [ + vec![], + // Overlaps with the top-most image + vec![Lsn(35)..Lsn(50)], + ]; + let layers = vec![ LayerDesc { key_range: Key::from_i128(15)..Key::from_i128(50), - lsn_range: Lsn(0)..Lsn(5), + lsn_range: Lsn(5)..Lsn(6), is_delta: false, }, LayerDesc { @@ -1185,19 +1248,27 @@ mod tests { }, LayerDesc { key_range: Key::from_i128(35)..Key::from_i128(40), - lsn_range: Lsn(35)..Lsn(40), + lsn_range: Lsn(40)..Lsn(41), is_delta: false, }, ]; - let layer_map = create_layer_map(layers.clone()); - for start in 0..60 { - for end in (start + 1)..60 { - let range = Key::from_i128(start)..Key::from_i128(end); - let result = layer_map.range_search(range.clone(), Lsn(100)); - let expected = brute_force_range_search(&layer_map, range, Lsn(100)); + let mut layer_map = create_layer_map(layers.clone()); + for in_memory_layers in in_memory_layer_configurations { + for in_mem_layer_range in in_memory_layers { + add_in_memory_layer(&mut layer_map, in_mem_layer_range).await; + } - assert_range_search_result_eq(result, expected); + for start in 0..60 { + for end in (start + 1)..60 { + let range = Key::from_i128(start)..Key::from_i128(end); + let result = layer_map.range_search(range.clone(), Lsn(100)); + let expected = brute_force_range_search(&layer_map, range, Lsn(100)); + + eprintln!("{start}..{end}: {result:?}"); + + assert_range_search_result_eq(result, expected); + } } } } @@ -1490,12 +1561,348 @@ mod tests { // Sanity: the layer that holds latest data for the DBDIR key should always be visible // (just using this key as a key that will always exist for any layermap fixture) - let dbdir_layer = layer_map - .search(DBDIR_KEY, index.metadata.disk_consistent_lsn()) - .unwrap(); + let dbdir_layer = { + let readable_layer = layer_map + .search(DBDIR_KEY, index.metadata.disk_consistent_lsn()) + .unwrap(); + + match readable_layer.layer { + ReadableLayerWeak::PersistentLayer(desc) => desc, + ReadableLayerWeak::InMemoryLayer(_) => unreachable!(""), + } + }; assert!(matches!( - layer_visibilities.get(&dbdir_layer.layer).unwrap(), + layer_visibilities.get(&dbdir_layer).unwrap(), LayerVisibilityHint::Visible )); } } + +#[cfg(test)] +mod select_layer_tests { + use super::*; + + fn create_persistent_layer( + start_lsn: u64, + end_lsn: u64, + is_delta: bool, + ) -> Arc { + if !is_delta { + assert_eq!(end_lsn, start_lsn + 1); + } + + Arc::new(PersistentLayerDesc::new_test( + Key::MIN..Key::MAX, + Lsn(start_lsn)..Lsn(end_lsn), + is_delta, + )) + } + + fn create_inmem_layer(start_lsn: u64, end_lsn: u64) -> InMemoryLayerDesc { + InMemoryLayerDesc { + handle: InMemoryLayerHandle::Open, + lsn_range: Lsn(start_lsn)..Lsn(end_lsn), + } + } + + #[test] + fn test_select_layer_empty() { + assert!(LayerMap::select_layer(None, None, None, Lsn(100)).is_none()); + } + + #[test] + fn test_select_layer_only_delta() { + let delta = create_persistent_layer(10, 20, true); + let result = LayerMap::select_layer(Some(delta.clone()), None, None, Lsn(100)).unwrap(); + + assert_eq!(result.lsn_floor, Lsn(10)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + } + + #[test] + fn test_select_layer_only_image() { + let image = create_persistent_layer(10, 11, false); + let result = LayerMap::select_layer(None, Some(image.clone()), None, Lsn(100)).unwrap(); + + assert_eq!(result.lsn_floor, Lsn(10)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + } + + #[test] + fn test_select_layer_only_inmem() { + let inmem = create_inmem_layer(10, 20); + let result = LayerMap::select_layer(None, None, Some(inmem.clone()), Lsn(100)).unwrap(); + + assert_eq!(result.lsn_floor, Lsn(10)); + assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem)); + } + + #[test] + fn test_select_layer_image_inside_delta() { + let delta = create_persistent_layer(10, 20, true); + let image = create_persistent_layer(15, 16, false); + + let result = + LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(100)) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(16)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + None, + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(15)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + } + + #[test] + fn test_select_layer_newer_image() { + let delta = create_persistent_layer(10, 20, true); + let image = create_persistent_layer(25, 26, false); + + let result = + LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(30)) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(25)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + + let result = + LayerMap::select_layer(Some(delta.clone()), None, None, result.lsn_floor).unwrap(); + + assert_eq!(result.lsn_floor, Lsn(10)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + } + + #[test] + fn test_select_layer_delta_with_older_image() { + let delta = create_persistent_layer(15, 25, true); + let image = create_persistent_layer(10, 11, false); + + let result = + LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(30)) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(15)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + + let result = + LayerMap::select_layer(None, Some(image.clone()), None, result.lsn_floor).unwrap(); + + assert_eq!(result.lsn_floor, Lsn(10)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + } + + #[test] + fn test_select_layer_image_inside_inmem() { + let image = create_persistent_layer(15, 16, false); + let inmem = create_inmem_layer(10, 25); + + let result = + LayerMap::select_layer(None, Some(image.clone()), Some(inmem.clone()), Lsn(30)) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(16)); + assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem)); + + let result = LayerMap::select_layer( + None, + Some(image.clone()), + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(15)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + + let result = + LayerMap::select_layer(None, None, Some(inmem.clone()), result.lsn_floor).unwrap(); + assert_eq!(result.lsn_floor, Lsn(10)); + assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem)); + } + + #[test] + fn test_select_layer_delta_inside_inmem() { + let delta_top = create_persistent_layer(15, 20, true); + let delta_bottom = create_persistent_layer(10, 15, true); + let inmem = create_inmem_layer(15, 25); + + let result = + LayerMap::select_layer(Some(delta_top.clone()), None, Some(inmem.clone()), Lsn(30)) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(20)); + assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem)); + + let result = LayerMap::select_layer( + Some(delta_top.clone()), + None, + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(15)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta_top)) + ); + + let result = LayerMap::select_layer( + Some(delta_bottom.clone()), + None, + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + assert_eq!(result.lsn_floor, Lsn(10)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta_bottom)) + ); + } + + #[test] + fn test_select_layer_all_overlap_1() { + let inmem = create_inmem_layer(10, 30); + let delta = create_persistent_layer(15, 25, true); + let image = create_persistent_layer(20, 21, false); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + Lsn(50), + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(25)); + assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem)); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(21)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(20)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + } + + #[test] + fn test_select_layer_all_overlap_2() { + let inmem = create_inmem_layer(20, 30); + let delta = create_persistent_layer(10, 40, true); + let image = create_persistent_layer(25, 26, false); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + Lsn(50), + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(26)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(25)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + } + + #[test] + fn test_select_layer_all_overlap_3() { + let inmem = create_inmem_layer(30, 40); + let delta = create_persistent_layer(10, 30, true); + let image = create_persistent_layer(20, 21, false); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + Some(inmem.clone()), + Lsn(50), + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(30)); + assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem)); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + None, + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(21)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta)) + ); + + let result = LayerMap::select_layer( + Some(delta.clone()), + Some(image.clone()), + None, + result.lsn_floor, + ) + .unwrap(); + + assert_eq!(result.lsn_floor, Lsn(20)); + assert!( + matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image)) + ); + } +} diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs index f8bec48886..b3dc8e56a3 100644 --- a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs @@ -63,6 +63,8 @@ pub struct HistoricLayerCoverage { /// The latest state head: LayerCoverageTuple, + /// TODO: this could be an ordered vec using binary search. + /// We push into this map everytime we add a layer, so might see some benefit /// All previous states historic: BTreeMap>, } @@ -419,6 +421,10 @@ pub struct BufferedHistoricLayerCoverage { buffer: BTreeMap>, /// All current layers. This is not used for search. Only to make rebuilds easier. + // TODO: This map is never cleared. Rebuilds could use the post-trim last entry of + // [`Self::historic_coverage`] instead of doubling memory usage. + // [`Self::len`]: can require rebuild and serve from latest historic + // [`Self::iter`]: already requires rebuild => can serve from latest historic layers: BTreeMap, } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 7f313f46a2..ece163b24a 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -40,6 +40,7 @@ use utils::sync::gate::GateGuard; use self::inmemory_layer::InMemoryLayerFileId; use super::PageReconstructError; +use super::layer_map::InMemoryLayerDesc; use super::timeline::{GetVectoredError, ReadPath}; use crate::config::PageServerConf; use crate::context::{AccessStatsBehavior, RequestContext}; @@ -721,6 +722,12 @@ struct LayerToVisitId { lsn_floor: Lsn, } +#[derive(Debug, PartialEq, Eq, Hash)] +pub enum ReadableLayerWeak { + PersistentLayer(Arc), + InMemoryLayer(InMemoryLayerDesc), +} + /// Layer wrapper for the read path. Note that it is valid /// to use these layers even after external operations have /// been performed on them (compaction, freeze, etc.). @@ -873,7 +880,7 @@ impl ReadableLayer { } ReadableLayer::InMemoryLayer(layer) => { layer - .get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx) + .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx) .await } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index ffdfe1dc27..46135b5330 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -416,7 +416,7 @@ impl InMemoryLayer { pub(crate) async fn get_values_reconstruct_data( self: &Arc, keyspace: KeySpace, - end_lsn: Lsn, + lsn_range: Range, reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result<(), GetVectoredError> { @@ -433,8 +433,6 @@ impl InMemoryLayer { let mut reads: HashMap> = HashMap::new(); let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default(); - let lsn_range = self.start_lsn..end_lsn; - for range in keyspace.ranges.iter() { for (key, vec_map) in inner .index diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index d43dfefdbc..a7f3c6b8c5 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -49,6 +49,7 @@ async fn smoke_test() { Lsn(0x10), 14, &ctx, + Default::default(), // in-memory layers Default::default(), image_layers, Lsn(0x100), diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 851f84f603..17dbcee74e 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3914,39 +3914,22 @@ impl Timeline { let guard = timeline.layers.read().await; let layers = guard.layer_map()?; - let in_memory_layer = layers.find_in_memory_layer(|l| { - let start_lsn = l.get_lsn_range().start; - cont_lsn > start_lsn - }); + for range in unmapped_keyspace.ranges.iter() { + let results = layers.range_search(range.clone(), cont_lsn); - match in_memory_layer { - Some(l) => { - let lsn_range = l.get_lsn_range().start..cont_lsn; - fringe.update( - ReadableLayer::InMemoryLayer(l), - unmapped_keyspace.clone(), - lsn_range, - ); - } - None => { - for range in unmapped_keyspace.ranges.iter() { - let results = layers.range_search(range.clone(), cont_lsn); - - results - .found - .into_iter() - .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| { - ( - ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)), - keyspace_accum.to_keyspace(), - lsn_floor..cont_lsn, - ) - }) - .for_each(|(layer, keyspace, lsn_range)| { - fringe.update(layer, keyspace, lsn_range) - }); - } - } + results + .found + .into_iter() + .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| { + ( + guard.upgrade(layer), + keyspace_accum.to_keyspace(), + lsn_floor..cont_lsn, + ) + }) + .for_each(|(layer, keyspace, lsn_range)| { + fringe.update(layer, keyspace, lsn_range) + }); } // It's safe to drop the layer map lock after planning the next round of reads. @@ -5555,6 +5538,14 @@ pub struct DeltaLayerTestDesc { pub data: Vec<(Key, Lsn, Value)>, } +#[cfg(test)] +#[derive(Clone)] +pub struct InMemoryLayerTestDesc { + pub lsn_range: Range, + pub data: Vec<(Key, Lsn, Value)>, + pub is_open: bool, +} + #[cfg(test)] impl DeltaLayerTestDesc { pub fn new(lsn_range: Range, key_range: Range, data: Vec<(Key, Lsn, Value)>) -> Self { @@ -6567,6 +6558,92 @@ impl Timeline { Ok(()) } + /// Force create an in-memory layer and place them into the layer map. + #[cfg(test)] + pub(super) async fn force_create_in_memory_layer( + self: &Arc, + mut in_memory: InMemoryLayerTestDesc, + check_start_lsn: Option, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + use utils::bin_ser::BeSer; + + // Validate LSNs + if let Some(check_start_lsn) = check_start_lsn { + assert!(in_memory.lsn_range.start >= check_start_lsn); + } + + let last_record_lsn = self.get_last_record_lsn(); + let layer_end_lsn = if in_memory.is_open { + in_memory + .data + .iter() + .map(|(_key, lsn, _value)| lsn) + .max() + .cloned() + } else { + Some(in_memory.lsn_range.end) + }; + + if let Some(end) = layer_end_lsn { + assert!( + end <= last_record_lsn, + "advance last record lsn before inserting a layer, end_lsn={}, last_record_lsn={}", + end, + last_record_lsn, + ); + } + + in_memory.data.iter().for_each(|(_key, lsn, _value)| { + assert!(*lsn >= in_memory.lsn_range.start); + assert!(*lsn < in_memory.lsn_range.end); + }); + + // Build the batch + in_memory + .data + .sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb))); + + let data = in_memory + .data + .into_iter() + .map(|(key, lsn, value)| { + let value_size = value.serialized_size().unwrap() as usize; + (key.to_compact(), lsn, value_size, value) + }) + .collect::>(); + + let batch = SerializedValueBatch::from_values(data); + + // Create the in-memory layer and write the batch into it + let layer = InMemoryLayer::create( + self.conf, + self.timeline_id, + self.tenant_shard_id, + in_memory.lsn_range.start, + &self.gate, + ctx, + ) + .await + .unwrap(); + + layer.put_batch(batch, ctx).await.unwrap(); + if !in_memory.is_open { + layer.freeze(in_memory.lsn_range.end).await; + } + + info!("force created in-memory layer {:?}", in_memory.lsn_range); + + // Link the layer to the layer map + { + let mut guard = self.layers.write().await; + let layer_map = guard.open_mut().unwrap(); + layer_map.force_insert_in_memory_layer(Arc::new(layer)); + } + + Ok(()) + } + /// Return all keys at the LSN in the image layers #[cfg(test)] pub(crate) async fn inspect_image_layers( @@ -6999,6 +7076,7 @@ mod tests { Lsn(0x10), 14, &ctx, + Vec::new(), // in-memory layers delta_layers, image_layers, Lsn(0x100), @@ -7132,6 +7210,7 @@ mod tests { Lsn(0x10), 14, &ctx, + Vec::new(), // in-memory layers delta_layers, image_layers, Lsn(0x100), diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index e552ea83de..1b489028dc 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -8,14 +8,14 @@ use tracing::trace; use utils::id::TimelineId; use utils::lsn::{AtomicLsn, Lsn}; -use super::TimelineWriterState; +use super::{ReadableLayer, TimelineWriterState}; use crate::config::PageServerConf; use crate::context::RequestContext; use crate::metrics::TimelineMetrics; use crate::tenant::layer_map::{BatchedUpdates, LayerMap}; use crate::tenant::storage_layer::{ AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc, - PersistentLayerKey, ResidentLayer, + PersistentLayerKey, ReadableLayerWeak, ResidentLayer, }; /// Provides semantic APIs to manipulate the layer map. @@ -37,6 +37,21 @@ impl Default for LayerManager { } impl LayerManager { + pub(crate) fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer { + match weak { + ReadableLayerWeak::PersistentLayer(desc) => { + ReadableLayer::PersistentLayer(self.get_from_desc(&desc)) + } + ReadableLayerWeak::InMemoryLayer(desc) => { + let inmem = self + .layer_map() + .expect("no concurrent shutdown") + .in_memory_layer(&desc); + ReadableLayer::InMemoryLayer(inmem) + } + } + } + pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer { // The assumption for the `expect()` is that all code maintains the following invariant: // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor. @@ -470,6 +485,25 @@ impl OpenLayerManager { mapping.remove(layer); layer.delete_on_drop(); } + + #[cfg(test)] + pub(crate) fn force_insert_in_memory_layer(&mut self, layer: Arc) { + use pageserver_api::models::InMemoryLayerInfo; + + match layer.info() { + InMemoryLayerInfo::Open { .. } => { + assert!(self.layer_map.open_layer.is_none()); + self.layer_map.open_layer = Some(layer); + } + InMemoryLayerInfo::Frozen { lsn_start, .. } => { + if let Some(last) = self.layer_map.frozen_layers.back() { + assert!(last.get_lsn_range().end <= lsn_start); + } + + self.layer_map.frozen_layers.push_back(layer); + } + } + } } pub(crate) struct LayerFileManager(HashMap);