mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-04 04:30:38 +00:00
pagseserver: add range layer map lookup
Use a two pointer algorithm to collect all the layers that intersect a given range. The trailing pointer tracks the start of the current range (current location in the key space) and the forward pointer tracks the next coverage change. The result of the collection is an map from layer to range which is ordered by LSN. The ordering will come in handy to the rest of the read path for figuring out where to continue from.
This commit is contained in:
@@ -51,7 +51,10 @@ use crate::keyspace::KeyPartitioning;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::storage_layer::InMemoryLayer;
|
||||
use anyhow::Result;
|
||||
use std::collections::VecDeque;
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::iter::Peekable;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -144,11 +147,221 @@ impl Drop for BatchedUpdates<'_> {
|
||||
}
|
||||
|
||||
/// Return value of LayerMap::search
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
pub struct SearchResult {
|
||||
pub layer: Arc<PersistentLayerDesc>,
|
||||
pub lsn_floor: Lsn,
|
||||
}
|
||||
|
||||
pub struct OrderedSearchResult(SearchResult);
|
||||
|
||||
impl Ord for OrderedSearchResult {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.0.lsn_floor.cmp(&other.0.lsn_floor)
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for OrderedSearchResult {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for OrderedSearchResult {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.0.lsn_floor == other.0.lsn_floor
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for OrderedSearchResult {}
|
||||
|
||||
pub struct RangeSearchResult {
|
||||
pub found: BTreeMap<OrderedSearchResult, KeySpaceAccum>,
|
||||
pub not_found: KeySpaceAccum,
|
||||
}
|
||||
|
||||
impl RangeSearchResult {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
found: BTreeMap::new(),
|
||||
not_found: KeySpaceAccum::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Collector for results of range search queries on the LayerMap.
|
||||
/// It should be provided with two iterators for the delta and image coverage
|
||||
/// that contain all the changes for layers which intersect the range.
|
||||
struct RangeSearchCollector<Iter>
|
||||
where
|
||||
Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
|
||||
{
|
||||
delta_coverage: Peekable<Iter>,
|
||||
image_coverage: Peekable<Iter>,
|
||||
key_range: Range<Key>,
|
||||
end_lsn: Lsn,
|
||||
|
||||
current_delta: Option<Arc<PersistentLayerDesc>>,
|
||||
current_image: Option<Arc<PersistentLayerDesc>>,
|
||||
|
||||
result: RangeSearchResult,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum NextLayerType {
|
||||
Delta(i128),
|
||||
Image(i128),
|
||||
Both(i128),
|
||||
}
|
||||
|
||||
impl NextLayerType {
|
||||
fn next_change_at_key(&self) -> Key {
|
||||
match self {
|
||||
NextLayerType::Delta(at) => Key::from_i128(*at),
|
||||
NextLayerType::Image(at) => Key::from_i128(*at),
|
||||
NextLayerType::Both(at) => Key::from_i128(*at),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Iter> RangeSearchCollector<Iter>
|
||||
where
|
||||
Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
|
||||
{
|
||||
fn new(
|
||||
key_range: Range<Key>,
|
||||
end_lsn: Lsn,
|
||||
delta_coverage: Iter,
|
||||
image_coverage: Iter,
|
||||
) -> Self {
|
||||
Self {
|
||||
delta_coverage: delta_coverage.peekable(),
|
||||
image_coverage: image_coverage.peekable(),
|
||||
key_range: key_range.clone(),
|
||||
end_lsn,
|
||||
current_delta: None,
|
||||
current_image: None,
|
||||
result: RangeSearchResult::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the collector. Collection is implemented via a two pointer algorithm.
|
||||
/// One pointer tracks the start of the current range and the other tracks
|
||||
/// the beggining of the next range which will overlap with the next change
|
||||
/// in coverage across both image and delta.
|
||||
fn collect(mut self) -> RangeSearchResult {
|
||||
let next_layer_type = self.choose_next_layer_type();
|
||||
let mut current_range_start = match next_layer_type {
|
||||
None => {
|
||||
// No changes for the range
|
||||
self.pad_range(self.key_range.clone());
|
||||
return self.result;
|
||||
}
|
||||
Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
|
||||
// Changes only after the end of the range
|
||||
self.pad_range(self.key_range.clone());
|
||||
return self.result;
|
||||
}
|
||||
Some(layer_type) => {
|
||||
// Changes for the range exist. Record anything before the first
|
||||
// coverage change as not found.
|
||||
let coverage_start = layer_type.next_change_at_key();
|
||||
let range_before = self.key_range.start..coverage_start;
|
||||
self.pad_range(range_before);
|
||||
|
||||
self.advance(&layer_type);
|
||||
coverage_start
|
||||
}
|
||||
};
|
||||
|
||||
while current_range_start < self.key_range.end {
|
||||
let next_layer_type = self.choose_next_layer_type();
|
||||
match next_layer_type {
|
||||
Some(t) => {
|
||||
let current_range_end = t.next_change_at_key();
|
||||
self.add_range(current_range_start..current_range_end);
|
||||
current_range_start = current_range_end;
|
||||
|
||||
self.advance(&t);
|
||||
}
|
||||
None => {
|
||||
self.add_range(current_range_start..self.key_range.end);
|
||||
current_range_start = self.key_range.end;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.result
|
||||
}
|
||||
|
||||
/// Mark a range as not found (i.e. no layers intersect it)
|
||||
fn pad_range(&mut self, key_range: Range<Key>) {
|
||||
if !key_range.is_empty() {
|
||||
self.result.not_found.add_range(key_range);
|
||||
}
|
||||
}
|
||||
|
||||
/// Select the appropiate layer for the given range and update
|
||||
/// the collector.
|
||||
fn add_range(&mut self, covered_range: Range<Key>) {
|
||||
let selected = LayerMap::select_layer(
|
||||
self.current_delta.clone(),
|
||||
self.current_image.clone(),
|
||||
self.end_lsn,
|
||||
);
|
||||
|
||||
match selected {
|
||||
Some(search_result) => self
|
||||
.result
|
||||
.found
|
||||
.entry(OrderedSearchResult(search_result))
|
||||
.or_default()
|
||||
.add_range(covered_range),
|
||||
None => self.pad_range(covered_range),
|
||||
}
|
||||
}
|
||||
|
||||
/// Move to the next coverage change.
|
||||
fn advance(&mut self, layer_type: &NextLayerType) {
|
||||
match layer_type {
|
||||
NextLayerType::Delta(_) => {
|
||||
let (_, layer) = self.delta_coverage.next().unwrap();
|
||||
self.current_delta = layer;
|
||||
}
|
||||
NextLayerType::Image(_) => {
|
||||
let (_, layer) = self.image_coverage.next().unwrap();
|
||||
self.current_image = layer;
|
||||
}
|
||||
NextLayerType::Both(_) => {
|
||||
let (_, image_layer) = self.image_coverage.next().unwrap();
|
||||
let (_, delta_layer) = self.delta_coverage.next().unwrap();
|
||||
|
||||
self.current_image = image_layer;
|
||||
self.current_delta = delta_layer;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Pick the next coverage change: the one at the lesser key or both if they're alligned.
|
||||
fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
|
||||
let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
|
||||
let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
|
||||
|
||||
match (next_delta_at, next_image_at) {
|
||||
(None, None) => None,
|
||||
(Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
|
||||
(None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
|
||||
(Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
|
||||
Some(NextLayerType::Image(*next_image_at))
|
||||
}
|
||||
(Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
|
||||
Some(NextLayerType::Delta(*next_delta_at))
|
||||
}
|
||||
(Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerMap {
|
||||
///
|
||||
/// Find the latest layer (by lsn.end) that covers the given
|
||||
@@ -186,7 +399,18 @@ impl LayerMap {
|
||||
let latest_delta = version.delta_coverage.query(key.to_i128());
|
||||
let latest_image = version.image_coverage.query(key.to_i128());
|
||||
|
||||
match (latest_delta, latest_image) {
|
||||
Self::select_layer(latest_delta, latest_image, end_lsn)
|
||||
}
|
||||
|
||||
fn select_layer(
|
||||
delta_layer: Option<Arc<PersistentLayerDesc>>,
|
||||
image_layer: Option<Arc<PersistentLayerDesc>>,
|
||||
end_lsn: Lsn,
|
||||
) -> Option<SearchResult> {
|
||||
assert!(delta_layer.as_ref().map_or(true, |l| l.is_delta()));
|
||||
assert!(image_layer.as_ref().map_or(true, |l| !l.is_delta()));
|
||||
|
||||
match (delta_layer, image_layer) {
|
||||
(None, None) => None,
|
||||
(None, Some(image)) => {
|
||||
let lsn_floor = image.get_lsn_range().start;
|
||||
@@ -223,6 +447,17 @@ impl LayerMap {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> Option<RangeSearchResult> {
|
||||
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
|
||||
|
||||
let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
|
||||
let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
|
||||
let image_changes = version.image_coverage.range_overlaps(&raw_range);
|
||||
|
||||
let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes);
|
||||
Some(collector.collect())
|
||||
}
|
||||
|
||||
/// Start a batch of updates, applied on drop
|
||||
pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
|
||||
BatchedUpdates { layer_map: self }
|
||||
@@ -631,3 +866,126 @@ impl LayerMap {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct LayerDesc {
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
is_delta: bool,
|
||||
}
|
||||
|
||||
fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
|
||||
let mut layer_map = LayerMap::default();
|
||||
|
||||
for layer in layers {
|
||||
layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
|
||||
layer.key_range,
|
||||
layer.lsn_range,
|
||||
layer.is_delta,
|
||||
));
|
||||
}
|
||||
|
||||
layer_map.flush_updates();
|
||||
layer_map
|
||||
}
|
||||
|
||||
fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
|
||||
assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
|
||||
let lhs: Vec<_> = lhs
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
|
||||
.collect();
|
||||
let rhs: Vec<_> = rhs
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
|
||||
.collect();
|
||||
|
||||
assert_eq!(lhs, rhs);
|
||||
}
|
||||
|
||||
fn brute_force_range_search(
|
||||
layer_map: &LayerMap,
|
||||
key_range: Range<Key>,
|
||||
end_lsn: Lsn,
|
||||
) -> RangeSearchResult {
|
||||
let mut range_search_result = RangeSearchResult::new();
|
||||
|
||||
let mut key = key_range.start;
|
||||
while key != key_range.end {
|
||||
let res = layer_map.search(key, end_lsn);
|
||||
match res {
|
||||
Some(res) => {
|
||||
range_search_result
|
||||
.found
|
||||
.entry(OrderedSearchResult(res))
|
||||
.or_default()
|
||||
.add_key(key);
|
||||
}
|
||||
None => {
|
||||
range_search_result.not_found.add_key(key);
|
||||
}
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
|
||||
range_search_result
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ranged_search_on_empty_layer_map() {
|
||||
let layer_map = LayerMap::default();
|
||||
let range = Key::from_i128(100)..Key::from_i128(200);
|
||||
|
||||
let res = layer_map.range_search(range, Lsn(100));
|
||||
assert!(res.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ranged_search() {
|
||||
let layers = vec![
|
||||
LayerDesc {
|
||||
key_range: Key::from_i128(15)..Key::from_i128(50),
|
||||
lsn_range: Lsn(0)..Lsn(5),
|
||||
is_delta: false,
|
||||
},
|
||||
LayerDesc {
|
||||
key_range: Key::from_i128(10)..Key::from_i128(20),
|
||||
lsn_range: Lsn(10)..Lsn(20),
|
||||
is_delta: true,
|
||||
},
|
||||
LayerDesc {
|
||||
key_range: Key::from_i128(15)..Key::from_i128(25),
|
||||
lsn_range: Lsn(20)..Lsn(30),
|
||||
is_delta: true,
|
||||
},
|
||||
LayerDesc {
|
||||
key_range: Key::from_i128(35)..Key::from_i128(40),
|
||||
lsn_range: Lsn(25)..Lsn(35),
|
||||
is_delta: true,
|
||||
},
|
||||
LayerDesc {
|
||||
key_range: Key::from_i128(35)..Key::from_i128(40),
|
||||
lsn_range: Lsn(35)..Lsn(40),
|
||||
is_delta: false,
|
||||
},
|
||||
];
|
||||
|
||||
let layer_map = create_layer_map(layers.clone());
|
||||
for start in 0..60 {
|
||||
for end in (start + 1)..60 {
|
||||
let range = Key::from_i128(start)..Key::from_i128(end);
|
||||
let result = layer_map.range_search(range.clone(), Lsn(100)).unwrap();
|
||||
let expected = brute_force_range_search(&layer_map, range, Lsn(100));
|
||||
|
||||
assert_range_search_result_eq(result, expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,10 +134,7 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
pub fn range_overlaps(
|
||||
&self,
|
||||
key_range: &Range<i128>,
|
||||
) -> itertools::Either<
|
||||
impl Iterator<Item = (i128, Option<Value>)> + '_,
|
||||
impl Iterator<Item = (i128, Option<Value>)> + '_,
|
||||
>
|
||||
) -> impl Iterator<Item = (i128, Option<Value>)> + '_
|
||||
where
|
||||
Value: Eq,
|
||||
{
|
||||
|
||||
@@ -55,13 +55,13 @@ impl PersistentLayerDesc {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn new_test(key_range: Range<Key>) -> Self {
|
||||
pub fn new_test(key_range: Range<Key>, lsn_range: Range<Lsn>, is_delta: bool) -> Self {
|
||||
Self {
|
||||
tenant_shard_id: TenantShardId::unsharded(TenantId::generate()),
|
||||
timeline_id: TimelineId::generate(),
|
||||
key_range,
|
||||
lsn_range: Lsn(0)..Lsn(1),
|
||||
is_delta: false,
|
||||
lsn_range,
|
||||
is_delta,
|
||||
file_size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user