diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index c31d401e84..bb52e586d1 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -51,7 +51,10 @@ use crate::keyspace::KeyPartitioning; use crate::repository::Key; use crate::tenant::storage_layer::InMemoryLayer; use anyhow::Result; -use std::collections::VecDeque; +use pageserver_api::keyspace::KeySpaceAccum; +use std::cmp::Ordering; +use std::collections::{BTreeMap, VecDeque}; +use std::iter::Peekable; use std::ops::Range; use std::sync::Arc; use utils::lsn::Lsn; @@ -144,11 +147,221 @@ impl Drop for BatchedUpdates<'_> { } /// Return value of LayerMap::search +#[derive(Eq, PartialEq, Debug)] pub struct SearchResult { pub layer: Arc, pub lsn_floor: Lsn, } +pub struct OrderedSearchResult(SearchResult); + +impl Ord for OrderedSearchResult { + fn cmp(&self, other: &Self) -> Ordering { + self.0.lsn_floor.cmp(&other.0.lsn_floor) + } +} + +impl PartialOrd for OrderedSearchResult { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for OrderedSearchResult { + fn eq(&self, other: &Self) -> bool { + self.0.lsn_floor == other.0.lsn_floor + } +} + +impl Eq for OrderedSearchResult {} + +pub struct RangeSearchResult { + pub found: BTreeMap, + pub not_found: KeySpaceAccum, +} + +impl RangeSearchResult { + fn new() -> Self { + Self { + found: BTreeMap::new(), + not_found: KeySpaceAccum::new(), + } + } +} + +/// Collector for results of range search queries on the LayerMap. +/// It should be provided with two iterators for the delta and image coverage +/// that contain all the changes for layers which intersect the range. +struct RangeSearchCollector +where + Iter: Iterator>)>, +{ + delta_coverage: Peekable, + image_coverage: Peekable, + key_range: Range, + end_lsn: Lsn, + + current_delta: Option>, + current_image: Option>, + + result: RangeSearchResult, +} + +#[derive(Debug)] +enum NextLayerType { + Delta(i128), + Image(i128), + Both(i128), +} + +impl NextLayerType { + fn next_change_at_key(&self) -> Key { + match self { + NextLayerType::Delta(at) => Key::from_i128(*at), + NextLayerType::Image(at) => Key::from_i128(*at), + NextLayerType::Both(at) => Key::from_i128(*at), + } + } +} + +impl RangeSearchCollector +where + Iter: Iterator>)>, +{ + fn new( + key_range: Range, + end_lsn: Lsn, + delta_coverage: Iter, + image_coverage: Iter, + ) -> Self { + Self { + delta_coverage: delta_coverage.peekable(), + image_coverage: image_coverage.peekable(), + key_range, + end_lsn, + current_delta: None, + current_image: None, + result: RangeSearchResult::new(), + } + } + + /// Run the collector. Collection is implemented via a two pointer algorithm. + /// One pointer tracks the start of the current range and the other tracks + /// the beginning of the next range which will overlap with the next change + /// in coverage across both image and delta. + fn collect(mut self) -> RangeSearchResult { + let next_layer_type = self.choose_next_layer_type(); + let mut current_range_start = match next_layer_type { + None => { + // No changes for the range + self.pad_range(self.key_range.clone()); + return self.result; + } + Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => { + // Changes only after the end of the range + self.pad_range(self.key_range.clone()); + return self.result; + } + Some(layer_type) => { + // Changes for the range exist. Record anything before the first + // coverage change as not found. + let coverage_start = layer_type.next_change_at_key(); + let range_before = self.key_range.start..coverage_start; + self.pad_range(range_before); + + self.advance(&layer_type); + coverage_start + } + }; + + while current_range_start < self.key_range.end { + let next_layer_type = self.choose_next_layer_type(); + match next_layer_type { + Some(t) => { + let current_range_end = t.next_change_at_key(); + self.add_range(current_range_start..current_range_end); + current_range_start = current_range_end; + + self.advance(&t); + } + None => { + self.add_range(current_range_start..self.key_range.end); + current_range_start = self.key_range.end; + } + } + } + + self.result + } + + /// Mark a range as not found (i.e. no layers intersect it) + fn pad_range(&mut self, key_range: Range) { + if !key_range.is_empty() { + self.result.not_found.add_range(key_range); + } + } + + /// Select the appropiate layer for the given range and update + /// the collector. + fn add_range(&mut self, covered_range: Range) { + let selected = LayerMap::select_layer( + self.current_delta.clone(), + self.current_image.clone(), + self.end_lsn, + ); + + match selected { + Some(search_result) => self + .result + .found + .entry(OrderedSearchResult(search_result)) + .or_default() + .add_range(covered_range), + None => self.pad_range(covered_range), + } + } + + /// Move to the next coverage change. + fn advance(&mut self, layer_type: &NextLayerType) { + match layer_type { + NextLayerType::Delta(_) => { + let (_, layer) = self.delta_coverage.next().unwrap(); + self.current_delta = layer; + } + NextLayerType::Image(_) => { + let (_, layer) = self.image_coverage.next().unwrap(); + self.current_image = layer; + } + NextLayerType::Both(_) => { + let (_, image_layer) = self.image_coverage.next().unwrap(); + let (_, delta_layer) = self.delta_coverage.next().unwrap(); + + self.current_image = image_layer; + self.current_delta = delta_layer; + } + } + } + + /// Pick the next coverage change: the one at the lesser key or both if they're alligned. + fn choose_next_layer_type(&mut self) -> Option { + let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key); + let next_image_at = self.image_coverage.peek().map(|(key, _)| key); + + match (next_delta_at, next_image_at) { + (None, None) => None, + (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)), + (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)), + (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => { + Some(NextLayerType::Image(*next_image_at)) + } + (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => { + Some(NextLayerType::Delta(*next_delta_at)) + } + (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)), + } + } +} + impl LayerMap { /// /// Find the latest layer (by lsn.end) that covers the given @@ -186,7 +399,18 @@ impl LayerMap { let latest_delta = version.delta_coverage.query(key.to_i128()); let latest_image = version.image_coverage.query(key.to_i128()); - match (latest_delta, latest_image) { + Self::select_layer(latest_delta, latest_image, end_lsn) + } + + fn select_layer( + delta_layer: Option>, + image_layer: Option>, + end_lsn: Lsn, + ) -> Option { + assert!(delta_layer.as_ref().map_or(true, |l| l.is_delta())); + assert!(image_layer.as_ref().map_or(true, |l| !l.is_delta())); + + match (delta_layer, image_layer) { (None, None) => None, (None, Some(image)) => { let lsn_floor = image.get_lsn_range().start; @@ -223,6 +447,17 @@ impl LayerMap { } } + pub fn range_search(&self, key_range: Range, end_lsn: Lsn) -> Option { + let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?; + + let raw_range = key_range.start.to_i128()..key_range.end.to_i128(); + let delta_changes = version.delta_coverage.range_overlaps(&raw_range); + let image_changes = version.image_coverage.range_overlaps(&raw_range); + + let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes); + Some(collector.collect()) + } + /// Start a batch of updates, applied on drop pub fn batch_update(&mut self) -> BatchedUpdates<'_> { BatchedUpdates { layer_map: self } @@ -631,3 +866,126 @@ impl LayerMap { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Clone)] + struct LayerDesc { + key_range: Range, + lsn_range: Range, + is_delta: bool, + } + + fn create_layer_map(layers: Vec) -> LayerMap { + let mut layer_map = LayerMap::default(); + + for layer in layers { + layer_map.insert_historic_noflush(PersistentLayerDesc::new_test( + layer.key_range, + layer.lsn_range, + layer.is_delta, + )); + } + + layer_map.flush_updates(); + layer_map + } + + fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) { + assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace()); + let lhs: Vec<_> = lhs + .found + .into_iter() + .map(|(search_result, accum)| (search_result.0, accum.to_keyspace())) + .collect(); + let rhs: Vec<_> = rhs + .found + .into_iter() + .map(|(search_result, accum)| (search_result.0, accum.to_keyspace())) + .collect(); + + assert_eq!(lhs, rhs); + } + + fn brute_force_range_search( + layer_map: &LayerMap, + key_range: Range, + end_lsn: Lsn, + ) -> RangeSearchResult { + let mut range_search_result = RangeSearchResult::new(); + + let mut key = key_range.start; + while key != key_range.end { + let res = layer_map.search(key, end_lsn); + match res { + Some(res) => { + range_search_result + .found + .entry(OrderedSearchResult(res)) + .or_default() + .add_key(key); + } + None => { + range_search_result.not_found.add_key(key); + } + } + + key = key.next(); + } + + range_search_result + } + + #[test] + fn ranged_search_on_empty_layer_map() { + let layer_map = LayerMap::default(); + let range = Key::from_i128(100)..Key::from_i128(200); + + let res = layer_map.range_search(range, Lsn(100)); + assert!(res.is_none()); + } + + #[test] + fn ranged_search() { + let layers = vec![ + LayerDesc { + key_range: Key::from_i128(15)..Key::from_i128(50), + lsn_range: Lsn(0)..Lsn(5), + is_delta: false, + }, + LayerDesc { + key_range: Key::from_i128(10)..Key::from_i128(20), + lsn_range: Lsn(5)..Lsn(20), + is_delta: true, + }, + LayerDesc { + key_range: Key::from_i128(15)..Key::from_i128(25), + lsn_range: Lsn(20)..Lsn(30), + is_delta: true, + }, + LayerDesc { + key_range: Key::from_i128(35)..Key::from_i128(40), + lsn_range: Lsn(25)..Lsn(35), + is_delta: true, + }, + LayerDesc { + key_range: Key::from_i128(35)..Key::from_i128(40), + lsn_range: Lsn(35)..Lsn(40), + is_delta: false, + }, + ]; + + let layer_map = create_layer_map(layers.clone()); + for start in 0..60 { + for end in (start + 1)..60 { + let range = Key::from_i128(start)..Key::from_i128(end); + let result = layer_map.range_search(range.clone(), Lsn(100)).unwrap(); + let expected = brute_force_range_search(&layer_map, range, Lsn(100)); + + assert_range_search_result_eq(result, expected); + } + } + } +} diff --git a/pageserver/src/tenant/layer_map/layer_coverage.rs b/pageserver/src/tenant/layer_map/layer_coverage.rs index 1d9101d3d1..cf0085c071 100644 --- a/pageserver/src/tenant/layer_map/layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/layer_coverage.rs @@ -129,6 +129,42 @@ impl LayerCoverage { .map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone()))) } + /// Returns an iterator which includes all coverage changes for layers that intersect + /// with the provided range. + pub fn range_overlaps( + &self, + key_range: &Range, + ) -> impl Iterator)> + '_ + where + Value: Eq, + { + let first_change = self.query(key_range.start); + match first_change { + Some(change) => { + // If the start of the range is covered, we have to deal with two cases: + // 1. Start of the range is aligned with the start of a layer. + // In this case the return of `self.range` will contain the layer which aligns with the start of the key range. + // We advance said iterator to avoid duplicating the first change. + // 2. Start of the range is not aligned with the start of a layer. + let range = key_range.start..key_range.end; + let mut range_coverage = self.range(range).peekable(); + if range_coverage + .peek() + .is_some_and(|c| c.1.as_ref() == Some(&change)) + { + range_coverage.next(); + } + itertools::Either::Left( + std::iter::once((key_range.start, Some(change))).chain(range_coverage), + ) + } + None => { + let range = key_range.start..key_range.end; + let coverage = self.range(range); + itertools::Either::Right(coverage) + } + } + } /// O(1) clone pub fn clone(&self) -> Self { Self { diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs index bf24407fc5..fa78e9fdb2 100644 --- a/pageserver/src/tenant/storage_layer/layer_desc.rs +++ b/pageserver/src/tenant/storage_layer/layer_desc.rs @@ -55,13 +55,13 @@ impl PersistentLayerDesc { } #[cfg(test)] - pub fn new_test(key_range: Range) -> Self { + pub fn new_test(key_range: Range, lsn_range: Range, is_delta: bool) -> Self { Self { tenant_shard_id: TenantShardId::unsharded(TenantId::generate()), timeline_id: TimelineId::generate(), key_range, - lsn_range: Lsn(0)..Lsn(1), - is_delta: false, + lsn_range, + is_delta, file_size: 0, } }