mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
pageserver: add range layer map search implementation (#6469)
## Problem There's no efficient way of querying the layer map for a range. ## Summary of changes Introduce a range query for the layer map (`LayerMap::range_search`). There's two broad steps to it: 1. Find all coverage changes for layers that intersect the queried range (see `LayerCoverage::range_overlaps`). The slightly tricky part is dealing with the start of the range. We can either be aligned with a layer or not and we need to treat these cases differently. 2. Iterate over the coverage changes and collect the result. For this we use a two pointer approach: the trailing pointer tracks the start of the current range (current location in the key space) and the forward pointer tracks the next coverage change. Plugging the range search into the read path is deferred to a future PR. ## Performance I adapted the layer map benchmarks on a local branch. Range searches are between 2x and 2.5x slower than point searches. That's in line with what I expected since we query thelayer map twice. Since `Timeline::get` will proxy to `Timeline::get_vectored` we can special case the one element layer map range search at that point.
This commit is contained in:
@@ -51,7 +51,10 @@ use crate::keyspace::KeyPartitioning;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::storage_layer::InMemoryLayer;
|
||||
use anyhow::Result;
|
||||
use std::collections::VecDeque;
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::iter::Peekable;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -144,11 +147,221 @@ impl Drop for BatchedUpdates<'_> {
|
||||
}
|
||||
|
||||
/// Return value of LayerMap::search
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
pub struct SearchResult {
|
||||
pub layer: Arc<PersistentLayerDesc>,
|
||||
pub lsn_floor: Lsn,
|
||||
}
|
||||
|
||||
pub struct OrderedSearchResult(SearchResult);
|
||||
|
||||
impl Ord for OrderedSearchResult {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.0.lsn_floor.cmp(&other.0.lsn_floor)
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for OrderedSearchResult {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for OrderedSearchResult {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.0.lsn_floor == other.0.lsn_floor
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for OrderedSearchResult {}
|
||||
|
||||
pub struct RangeSearchResult {
|
||||
pub found: BTreeMap<OrderedSearchResult, KeySpaceAccum>,
|
||||
pub not_found: KeySpaceAccum,
|
||||
}
|
||||
|
||||
impl RangeSearchResult {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
found: BTreeMap::new(),
|
||||
not_found: KeySpaceAccum::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Collector for results of range search queries on the LayerMap.
|
||||
/// It should be provided with two iterators for the delta and image coverage
|
||||
/// that contain all the changes for layers which intersect the range.
|
||||
struct RangeSearchCollector<Iter>
|
||||
where
|
||||
Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
|
||||
{
|
||||
delta_coverage: Peekable<Iter>,
|
||||
image_coverage: Peekable<Iter>,
|
||||
key_range: Range<Key>,
|
||||
end_lsn: Lsn,
|
||||
|
||||
current_delta: Option<Arc<PersistentLayerDesc>>,
|
||||
current_image: Option<Arc<PersistentLayerDesc>>,
|
||||
|
||||
result: RangeSearchResult,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum NextLayerType {
|
||||
Delta(i128),
|
||||
Image(i128),
|
||||
Both(i128),
|
||||
}
|
||||
|
||||
impl NextLayerType {
|
||||
fn next_change_at_key(&self) -> Key {
|
||||
match self {
|
||||
NextLayerType::Delta(at) => Key::from_i128(*at),
|
||||
NextLayerType::Image(at) => Key::from_i128(*at),
|
||||
NextLayerType::Both(at) => Key::from_i128(*at),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Iter> RangeSearchCollector<Iter>
|
||||
where
|
||||
Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
|
||||
{
|
||||
fn new(
|
||||
key_range: Range<Key>,
|
||||
end_lsn: Lsn,
|
||||
delta_coverage: Iter,
|
||||
image_coverage: Iter,
|
||||
) -> Self {
|
||||
Self {
|
||||
delta_coverage: delta_coverage.peekable(),
|
||||
image_coverage: image_coverage.peekable(),
|
||||
key_range,
|
||||
end_lsn,
|
||||
current_delta: None,
|
||||
current_image: None,
|
||||
result: RangeSearchResult::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the collector. Collection is implemented via a two pointer algorithm.
|
||||
/// One pointer tracks the start of the current range and the other tracks
|
||||
/// the beginning of the next range which will overlap with the next change
|
||||
/// in coverage across both image and delta.
|
||||
fn collect(mut self) -> RangeSearchResult {
|
||||
let next_layer_type = self.choose_next_layer_type();
|
||||
let mut current_range_start = match next_layer_type {
|
||||
None => {
|
||||
// No changes for the range
|
||||
self.pad_range(self.key_range.clone());
|
||||
return self.result;
|
||||
}
|
||||
Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
|
||||
// Changes only after the end of the range
|
||||
self.pad_range(self.key_range.clone());
|
||||
return self.result;
|
||||
}
|
||||
Some(layer_type) => {
|
||||
// Changes for the range exist. Record anything before the first
|
||||
// coverage change as not found.
|
||||
let coverage_start = layer_type.next_change_at_key();
|
||||
let range_before = self.key_range.start..coverage_start;
|
||||
self.pad_range(range_before);
|
||||
|
||||
self.advance(&layer_type);
|
||||
coverage_start
|
||||
}
|
||||
};
|
||||
|
||||
while current_range_start < self.key_range.end {
|
||||
let next_layer_type = self.choose_next_layer_type();
|
||||
match next_layer_type {
|
||||
Some(t) => {
|
||||
let current_range_end = t.next_change_at_key();
|
||||
self.add_range(current_range_start..current_range_end);
|
||||
current_range_start = current_range_end;
|
||||
|
||||
self.advance(&t);
|
||||
}
|
||||
None => {
|
||||
self.add_range(current_range_start..self.key_range.end);
|
||||
current_range_start = self.key_range.end;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.result
|
||||
}
|
||||
|
||||
/// Mark a range as not found (i.e. no layers intersect it)
|
||||
fn pad_range(&mut self, key_range: Range<Key>) {
|
||||
if !key_range.is_empty() {
|
||||
self.result.not_found.add_range(key_range);
|
||||
}
|
||||
}
|
||||
|
||||
/// Select the appropiate layer for the given range and update
|
||||
/// the collector.
|
||||
fn add_range(&mut self, covered_range: Range<Key>) {
|
||||
let selected = LayerMap::select_layer(
|
||||
self.current_delta.clone(),
|
||||
self.current_image.clone(),
|
||||
self.end_lsn,
|
||||
);
|
||||
|
||||
match selected {
|
||||
Some(search_result) => self
|
||||
.result
|
||||
.found
|
||||
.entry(OrderedSearchResult(search_result))
|
||||
.or_default()
|
||||
.add_range(covered_range),
|
||||
None => self.pad_range(covered_range),
|
||||
}
|
||||
}
|
||||
|
||||
/// Move to the next coverage change.
|
||||
fn advance(&mut self, layer_type: &NextLayerType) {
|
||||
match layer_type {
|
||||
NextLayerType::Delta(_) => {
|
||||
let (_, layer) = self.delta_coverage.next().unwrap();
|
||||
self.current_delta = layer;
|
||||
}
|
||||
NextLayerType::Image(_) => {
|
||||
let (_, layer) = self.image_coverage.next().unwrap();
|
||||
self.current_image = layer;
|
||||
}
|
||||
NextLayerType::Both(_) => {
|
||||
let (_, image_layer) = self.image_coverage.next().unwrap();
|
||||
let (_, delta_layer) = self.delta_coverage.next().unwrap();
|
||||
|
||||
self.current_image = image_layer;
|
||||
self.current_delta = delta_layer;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Pick the next coverage change: the one at the lesser key or both if they're alligned.
|
||||
fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
|
||||
let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
|
||||
let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
|
||||
|
||||
match (next_delta_at, next_image_at) {
|
||||
(None, None) => None,
|
||||
(Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
|
||||
(None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
|
||||
(Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
|
||||
Some(NextLayerType::Image(*next_image_at))
|
||||
}
|
||||
(Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
|
||||
Some(NextLayerType::Delta(*next_delta_at))
|
||||
}
|
||||
(Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerMap {
|
||||
///
|
||||
/// Find the latest layer (by lsn.end) that covers the given
|
||||
@@ -186,7 +399,18 @@ impl LayerMap {
|
||||
let latest_delta = version.delta_coverage.query(key.to_i128());
|
||||
let latest_image = version.image_coverage.query(key.to_i128());
|
||||
|
||||
match (latest_delta, latest_image) {
|
||||
Self::select_layer(latest_delta, latest_image, end_lsn)
|
||||
}
|
||||
|
||||
fn select_layer(
|
||||
delta_layer: Option<Arc<PersistentLayerDesc>>,
|
||||
image_layer: Option<Arc<PersistentLayerDesc>>,
|
||||
end_lsn: Lsn,
|
||||
) -> Option<SearchResult> {
|
||||
assert!(delta_layer.as_ref().map_or(true, |l| l.is_delta()));
|
||||
assert!(image_layer.as_ref().map_or(true, |l| !l.is_delta()));
|
||||
|
||||
match (delta_layer, image_layer) {
|
||||
(None, None) => None,
|
||||
(None, Some(image)) => {
|
||||
let lsn_floor = image.get_lsn_range().start;
|
||||
@@ -223,6 +447,17 @@ impl LayerMap {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> Option<RangeSearchResult> {
|
||||
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
|
||||
|
||||
let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
|
||||
let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
|
||||
let image_changes = version.image_coverage.range_overlaps(&raw_range);
|
||||
|
||||
let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes);
|
||||
Some(collector.collect())
|
||||
}
|
||||
|
||||
/// Start a batch of updates, applied on drop
|
||||
pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
|
||||
BatchedUpdates { layer_map: self }
|
||||
@@ -631,3 +866,126 @@ impl LayerMap {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct LayerDesc {
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
is_delta: bool,
|
||||
}
|
||||
|
||||
fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
|
||||
let mut layer_map = LayerMap::default();
|
||||
|
||||
for layer in layers {
|
||||
layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
|
||||
layer.key_range,
|
||||
layer.lsn_range,
|
||||
layer.is_delta,
|
||||
));
|
||||
}
|
||||
|
||||
layer_map.flush_updates();
|
||||
layer_map
|
||||
}
|
||||
|
||||
fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
|
||||
assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
|
||||
let lhs: Vec<_> = lhs
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
|
||||
.collect();
|
||||
let rhs: Vec<_> = rhs
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
|
||||
.collect();
|
||||
|
||||
assert_eq!(lhs, rhs);
|
||||
}
|
||||
|
||||
fn brute_force_range_search(
|
||||
layer_map: &LayerMap,
|
||||
key_range: Range<Key>,
|
||||
end_lsn: Lsn,
|
||||
) -> RangeSearchResult {
|
||||
let mut range_search_result = RangeSearchResult::new();
|
||||
|
||||
let mut key = key_range.start;
|
||||
while key != key_range.end {
|
||||
let res = layer_map.search(key, end_lsn);
|
||||
match res {
|
||||
Some(res) => {
|
||||
range_search_result
|
||||
.found
|
||||
.entry(OrderedSearchResult(res))
|
||||
.or_default()
|
||||
.add_key(key);
|
||||
}
|
||||
None => {
|
||||
range_search_result.not_found.add_key(key);
|
||||
}
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
|
||||
range_search_result
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ranged_search_on_empty_layer_map() {
|
||||
let layer_map = LayerMap::default();
|
||||
let range = Key::from_i128(100)..Key::from_i128(200);
|
||||
|
||||
let res = layer_map.range_search(range, Lsn(100));
|
||||
assert!(res.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ranged_search() {
|
||||
let layers = vec![
|
||||
LayerDesc {
|
||||
key_range: Key::from_i128(15)..Key::from_i128(50),
|
||||
lsn_range: Lsn(0)..Lsn(5),
|
||||
is_delta: false,
|
||||
},
|
||||
LayerDesc {
|
||||
key_range: Key::from_i128(10)..Key::from_i128(20),
|
||||
lsn_range: Lsn(5)..Lsn(20),
|
||||
is_delta: true,
|
||||
},
|
||||
LayerDesc {
|
||||
key_range: Key::from_i128(15)..Key::from_i128(25),
|
||||
lsn_range: Lsn(20)..Lsn(30),
|
||||
is_delta: true,
|
||||
},
|
||||
LayerDesc {
|
||||
key_range: Key::from_i128(35)..Key::from_i128(40),
|
||||
lsn_range: Lsn(25)..Lsn(35),
|
||||
is_delta: true,
|
||||
},
|
||||
LayerDesc {
|
||||
key_range: Key::from_i128(35)..Key::from_i128(40),
|
||||
lsn_range: Lsn(35)..Lsn(40),
|
||||
is_delta: false,
|
||||
},
|
||||
];
|
||||
|
||||
let layer_map = create_layer_map(layers.clone());
|
||||
for start in 0..60 {
|
||||
for end in (start + 1)..60 {
|
||||
let range = Key::from_i128(start)..Key::from_i128(end);
|
||||
let result = layer_map.range_search(range.clone(), Lsn(100)).unwrap();
|
||||
let expected = brute_force_range_search(&layer_map, range, Lsn(100));
|
||||
|
||||
assert_range_search_result_eq(result, expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,6 +129,42 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
.map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone())))
|
||||
}
|
||||
|
||||
/// Returns an iterator which includes all coverage changes for layers that intersect
|
||||
/// with the provided range.
|
||||
pub fn range_overlaps(
|
||||
&self,
|
||||
key_range: &Range<i128>,
|
||||
) -> impl Iterator<Item = (i128, Option<Value>)> + '_
|
||||
where
|
||||
Value: Eq,
|
||||
{
|
||||
let first_change = self.query(key_range.start);
|
||||
match first_change {
|
||||
Some(change) => {
|
||||
// If the start of the range is covered, we have to deal with two cases:
|
||||
// 1. Start of the range is aligned with the start of a layer.
|
||||
// In this case the return of `self.range` will contain the layer which aligns with the start of the key range.
|
||||
// We advance said iterator to avoid duplicating the first change.
|
||||
// 2. Start of the range is not aligned with the start of a layer.
|
||||
let range = key_range.start..key_range.end;
|
||||
let mut range_coverage = self.range(range).peekable();
|
||||
if range_coverage
|
||||
.peek()
|
||||
.is_some_and(|c| c.1.as_ref() == Some(&change))
|
||||
{
|
||||
range_coverage.next();
|
||||
}
|
||||
itertools::Either::Left(
|
||||
std::iter::once((key_range.start, Some(change))).chain(range_coverage),
|
||||
)
|
||||
}
|
||||
None => {
|
||||
let range = key_range.start..key_range.end;
|
||||
let coverage = self.range(range);
|
||||
itertools::Either::Right(coverage)
|
||||
}
|
||||
}
|
||||
}
|
||||
/// O(1) clone
|
||||
pub fn clone(&self) -> Self {
|
||||
Self {
|
||||
|
||||
@@ -55,13 +55,13 @@ impl PersistentLayerDesc {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn new_test(key_range: Range<Key>) -> Self {
|
||||
pub fn new_test(key_range: Range<Key>, lsn_range: Range<Lsn>, is_delta: bool) -> Self {
|
||||
Self {
|
||||
tenant_shard_id: TenantShardId::unsharded(TenantId::generate()),
|
||||
timeline_id: TimelineId::generate(),
|
||||
key_range,
|
||||
lsn_range: Lsn(0)..Lsn(1),
|
||||
is_delta: false,
|
||||
lsn_range,
|
||||
is_delta,
|
||||
file_size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user