Compare commits

...

5 Commits

Author SHA1 Message Date
Vlad Lazar
10df5a650b wip 2024-02-01 16:26:43 +00:00
Vlad Lazar
d37b99c7b1 pageserver: lift ancestor timeline logic from read path
When the read path needs to follow a key into the ancestor timeline,
it needs to wait for said ancestor to become active and aware of
its branching LSN. The logic is lifted into a separate function with
its own new error type.

This is done because the vectored read path needs the same logic.
It's also the reason for the newly introduced error type.
2024-02-01 16:26:22 +00:00
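For illustration, a hedged sketch of the lifted helper's shape, reconstructed from the GetReadyAncestorError enum and the new call site in this diff; the exact body and signature are assumptions, not the authoritative implementation.

// Hypothetical reconstruction: types (RequestContext, TimelineState,
// GetReadyAncestorError, WaitLsnError) are those introduced or used in this diff.
async fn get_ready_ancestor_timeline(
    &self,
    ctx: &RequestContext,
) -> Result<Arc<Timeline>, GetReadyAncestorError> {
    let ancestor = self.get_ancestor_timeline()?;
    // Wait for the ancestor to become active...
    match ancestor.wait_to_become_active(ctx).await {
        Ok(()) => {}
        Err(TimelineState::Stopping) => {
            return Err(GetReadyAncestorError::AncestorStopping(ancestor.timeline_id));
        }
        Err(state) => {
            return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
                "Timeline {} will not become active. Current state: {:?}",
                ancestor.timeline_id,
                state
            )));
        }
    }
    // ...and to catch up to the branch point; WaitLsnError converts into
    // GetReadyAncestorError::AncestorLsnTimeout via #[from].
    ancestor.wait_lsn(self.ancestor_lsn, ctx).await?;
    Ok(ancestor)
}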
Vlad Lazar
4c6627f3d9 pageserver_api: remove overlaps from KeySpace
This commit adds a function to `KeySpace` which updates a key space
by removing all overlaps with a second key space. This can involve
splitting or removing existing ranges.

The implementation is not particularly efficient: O(M * N * log(N))
where N is the number of ranges in the current key space and M is the
number of ranges in the key space we are checking against. In practice,
this shouldn't matter much since, in the short term, the only caller
of this function will be the vectored read path and the number of key
spaces involved will be small. This follows from the upper bound placed
on the number of keys accepted by the vectored read path.

A couple of other small utility functions are added. They'll be used by the
vectored search path as well.
2024-01-31 10:15:40 +00:00
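For illustration, a minimal usage sketch mirroring the tests added in this commit: removing [2, 3) from [1, 4) splits the existing range in two.

let mut ks = KeySpace {
    ranges: vec![Key::from_i128(1)..Key::from_i128(4)],
};
let other = KeySpace {
    ranges: vec![Key::from_i128(2)..Key::from_i128(3)],
};
// Splits [1, 4) around the overlap, leaving [1, 2) and [3, 4).
ks.remove_overlapping_with(&other);
assert_eq!(
    ks.ranges,
    vec![
        Key::from_i128(1)..Key::from_i128(2),
        Key::from_i128(3)..Key::from_i128(4),
    ]
);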
Vlad Lazar
ae77e5355c pageserver: add range layer map lookup
Use a two pointer algorithm to collect all the layers that intersect
a given range. The trailing pointer tracks the start of the current
range (current location in the key space) and the forward pointer
tracks the next coverage change.

The result of the collection is a map from layer to range, ordered
by LSN. The ordering will come in handy for the rest of the read
path when figuring out where to continue from.
2024-01-25 12:28:32 +00:00
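For illustration, a hedged sketch of driving the new lookup, based on the types and tests in this diff (the layer_map value is an assumption):

// Collect every layer intersecting keys [100, 200) visible at end LSN 100.
// `range_search` returns None when the layer map has no version below that LSN.
let range = Key::from_i128(100)..Key::from_i128(200);
if let Some(result) = layer_map.range_search(range, Lsn(100)) {
    // `found` is a BTreeMap keyed by OrderedSearchResult, i.e. ordered by
    // lsn_floor, which tells the read path where to continue from.
    for (ordered, accum) in result.found {
        println!("layer {:?} covers {:?}", ordered.0, accum.to_keyspace());
    }
    // Keys not covered by any layer end up in `result.not_found`.
}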
Vlad Lazar
493760bb7a pageserver: add ranged layer coverage for all intersected layers 2024-01-25 10:58:30 +00:00
15 changed files with 1850 additions and 143 deletions

Cargo.lock generated
View File

@@ -3386,6 +3386,7 @@ dependencies = [
"enum-map",
"hex",
"humantime-serde",
"itertools",
"postgres_ffi",
"rand 0.8.5",
"serde",

View File

@@ -20,6 +20,7 @@ strum_macros.workspace = true
hex.workspace = true
thiserror.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
workspace_hack.workspace = true

View File

@@ -2,6 +2,7 @@ use postgres_ffi::BLCKSZ;
use std::ops::Range;
use crate::key::Key;
use itertools::Itertools;
///
/// Represents a set of Keys, in a compact form.
@@ -63,16 +64,93 @@ impl KeySpace {
KeyPartitioning { parts }
}
pub fn merge(&mut self, other: &KeySpace) {
let all_ranges = self
.ranges
.iter()
.merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
let mut accum = KeySpaceAccum::new();
let mut prev: Option<&Range<Key>> = None;
for range in all_ranges {
if let Some(prev) = prev {
let overlap =
std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
assert!(
!overlap,
"Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
prev, range
);
}
accum.add_range(range.clone());
prev = Some(range);
}
self.ranges = accum.to_keyspace().ranges;
}
/// Update the keyspace so that it doesn't contain any range
/// overlapping with `other`. This can involve splitting or
/// removing existing ranges.
pub fn remove_overlapping_with(&mut self, other: &KeySpace) {
for range in &other.ranges {
while let Some(overlap_at) = self.overlaps_at(range) {
let overlapped = self.ranges[overlap_at].clone();
if overlapped.start < range.start && overlapped.end <= range.end {
// Higher part of the range is completely overlapped.
self.ranges[overlap_at].end = range.start;
}
if overlapped.start >= range.start && overlapped.end > range.end {
// Lower part of the range is completely overlapped.
self.ranges[overlap_at].start = range.end;
}
if overlapped.start < range.start && overlapped.end > range.end {
// Middle part of the range is overlapped.
self.ranges[overlap_at].end = range.start;
self.ranges
.insert(overlap_at + 1, range.end..overlapped.end);
}
if overlapped.start >= range.start && overlapped.end <= range.end {
// Whole range is overlapped
self.ranges.remove(overlap_at);
}
}
}
}
pub fn start(&self) -> Option<Key> {
self.ranges.first().map(|range| range.start)
}
pub fn end(&self) -> Option<Key> {
self.ranges.last().map(|range| range.end)
}
#[allow(unused)]
pub fn total_size(&self) -> usize {
self.ranges
.iter()
.map(|range| key_range_size(range) as usize)
.sum()
}
fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
Ok(0) => None,
Err(0) => None,
Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
_ => None,
}
}
///
/// Check if the key space overlaps with the given range
///
pub fn overlaps(&self, range: &Range<Key>) -> bool {
match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
Ok(0) => false,
Err(0) => false,
Ok(index) => self.ranges[index - 1].end > range.start,
Err(index) => self.ranges[index - 1].end > range.start,
}
self.overlaps_at(range).is_some()
}
}
@@ -441,4 +519,104 @@ mod tests {
// xxxxxxxxxxx
assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
}
#[test]
fn test_remove_overlapping_with() {
// Test full overlaps
let mut key_space1 = KeySpace {
ranges: vec![
Key::from_i128(1)..Key::from_i128(4),
Key::from_i128(5)..Key::from_i128(8),
Key::from_i128(10)..Key::from_i128(12),
],
};
let key_space2 = KeySpace {
ranges: vec![
Key::from_i128(2)..Key::from_i128(3),
Key::from_i128(6)..Key::from_i128(7),
Key::from_i128(11)..Key::from_i128(13),
],
};
key_space1.remove_overlapping_with(&key_space2);
assert_eq!(
key_space1.ranges,
vec![
Key::from_i128(1)..Key::from_i128(2),
Key::from_i128(3)..Key::from_i128(4),
Key::from_i128(5)..Key::from_i128(6),
Key::from_i128(7)..Key::from_i128(8),
Key::from_i128(10)..Key::from_i128(11)
]
);
// Test partial overlaps
let mut key_space1 = KeySpace {
ranges: vec![
Key::from_i128(1)..Key::from_i128(5),
Key::from_i128(7)..Key::from_i128(10),
Key::from_i128(12)..Key::from_i128(15),
],
};
let key_space2 = KeySpace {
ranges: vec![
Key::from_i128(3)..Key::from_i128(6),
Key::from_i128(8)..Key::from_i128(11),
Key::from_i128(14)..Key::from_i128(17),
],
};
key_space1.remove_overlapping_with(&key_space2);
assert_eq!(
key_space1.ranges,
vec![
Key::from_i128(1)..Key::from_i128(3),
Key::from_i128(7)..Key::from_i128(8),
Key::from_i128(12)..Key::from_i128(14),
]
);
// Test no overlaps
let mut key_space1 = KeySpace {
ranges: vec![
Key::from_i128(1)..Key::from_i128(5),
Key::from_i128(7)..Key::from_i128(10),
Key::from_i128(12)..Key::from_i128(15),
],
};
let key_space2 = KeySpace {
ranges: vec![
Key::from_i128(6)..Key::from_i128(7),
Key::from_i128(11)..Key::from_i128(12),
Key::from_i128(15)..Key::from_i128(17),
],
};
key_space1.remove_overlapping_with(&key_space2);
assert_eq!(
key_space1.ranges,
vec![
Key::from_i128(1)..Key::from_i128(5),
Key::from_i128(7)..Key::from_i128(10),
Key::from_i128(12)..Key::from_i128(15),
]
);
// Test one range overlaps multiple
let mut key_space1 = KeySpace {
ranges: vec![
Key::from_i128(1)..Key::from_i128(5),
Key::from_i128(7)..Key::from_i128(10),
Key::from_i128(12)..Key::from_i128(15),
],
};
let key_space2 = KeySpace {
ranges: vec![Key::from_i128(4)..Key::from_i128(14)],
};
key_space1.remove_overlapping_with(&key_space2);
assert_eq!(
key_space1.ranges,
vec![
Key::from_i128(1)..Key::from_i128(4),
Key::from_i128(14)..Key::from_i128(15),
]
);
}
}

View File

@@ -262,10 +262,26 @@ where
let blocks = self
.timeline
.get_vectored(&part.ranges, self.lsn, self.ctx)
.await?;
.await;
let blocks = match blocks {
Ok(blocks) => blocks,
Err(err) => {
error!("get_vectored failed: {}", err);
return Err(anyhow!(err));
}
};
for (key, block) in blocks {
slru_builder.add_block(&key, block?).await?;
let block = match block {
Ok(block) => block,
Err(err) => {
error!("get_vectored failed on key {}: {}", key, err);
return Err(anyhow!(err));
}
};
slru_builder.add_block(&key, block).await?;
}
}

View File

@@ -4113,11 +4113,13 @@ mod tests {
use crate::keyspace::KeySpaceAccum;
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use crate::tenant::timeline::GetVectoredImpl;
use crate::DEFAULT_PG_VERSION;
use crate::METADATA_FILE_NAME;
use bytes::BytesMut;
use hex_literal::hex;
use once_cell::sync::Lazy;
use pageserver_api::keyspace::KeySpace;
use rand::{thread_rng, Rng};
use tokio_util::sync::CancellationToken;
@@ -4810,6 +4812,61 @@ mod tests {
Ok(())
}
async fn bulk_insert_compact_gc(
timeline: Arc<Timeline>,
ctx: &RequestContext,
mut lsn: Lsn,
repeat: usize,
key_count: usize,
) -> anyhow::Result<()> {
let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let mut blknum = 0;
// Enforce that the key range is monotonically increasing
let mut keyspace = KeySpaceAccum::new();
for _ in 0..repeat {
for _ in 0..key_count {
test_key.field6 = blknum;
let writer = timeline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
keyspace.add_key(test_key);
lsn = Lsn(lsn.0 + 0x10);
blknum += 1;
}
let cutoff = timeline.get_last_record_lsn();
timeline
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
ctx,
)
.await?;
timeline.freeze_and_flush().await?;
timeline
.compact(&CancellationToken::new(), EnumSet::empty(), ctx)
.await?;
timeline.gc().await?;
}
Ok(())
}
//
// Insert 10000 key-value pairs with increasing keys, flush, compact, GC.
// Repeat 50 times.
@@ -4822,49 +4879,126 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
let mut lsn = Lsn(0x10);
let lsn = Lsn(0x10);
bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;
let mut keyspace = KeySpaceAccum::new();
let guard = tline.layers.read().await;
guard.layer_map().dump(true, &ctx).await?;
let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let mut blknum = 0;
for _ in 0..50 {
for _ in 0..10000 {
test_key.field6 = blknum;
let writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
Ok(())
}
keyspace.add_key(test_key);
// Test the vectored get real implementation against a simple sequential implementation.
//
// The test generates a keyspace by repeatedly flushing the in-memory layer and compacting.
// Projected to 2D the key space looks like below. Lsn grows upwards on the Y axis and keys
// grow to the right on the X axis.
// [Delta]
// [Delta]
// [Delta]
// [Delta]
// ------------ Image ---------------
//
// After layer generation we pick the ranges to query as follows:
// 1. The beginning of each delta layer
// 2. At the seam between two adjacent delta layers
//
// There's one major downside to this test: delta layers only contain images,
// so the search can stop at the first delta layer and doesn't traverse any deeper.
#[tokio::test]
async fn test_get_vectored() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_bulk_insert")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
lsn = Lsn(lsn.0 + 0x10);
blknum += 1;
let lsn = Lsn(0x10);
bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;
let guard = tline.layers.read().await;
guard.layer_map().dump(true, &ctx).await?;
let mut reads = Vec::new();
let mut prev = None;
guard.layer_map().iter_historic_layers().for_each(|desc| {
if !desc.is_delta() {
prev = Some(desc.clone());
return;
}
let cutoff = tline.get_last_record_lsn();
let start = desc.key_range.start;
let end = desc
.key_range
.start
.add(Timeline::MAX_GET_VECTORED_KEYS.try_into().unwrap());
reads.push(KeySpace {
ranges: vec![start..end],
});
tline
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
tline.freeze_and_flush().await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
if let Some(prev) = &prev {
if !prev.is_delta() {
return;
}
let first_range = Key {
field6: prev.key_range.end.field6 - 4,
..prev.key_range.end
}..prev.key_range.end;
let second_range = desc.key_range.start..Key {
field6: desc.key_range.start.field6 + 4,
..desc.key_range.start
};
reads.push(KeySpace {
ranges: vec![first_range, second_range],
});
};
prev = Some(desc.clone());
});
drop(guard);
// Pick a big LSN such that we query over all the changes.
// Technically, u64::MAX - 1 is the largest LSN supported by the read path,
// but there seems to be a bug on the non-vectored search path which surfaces
// in that case.
let reads_lsn = Lsn(u64::MAX - 1000);
for read in reads {
info!("Doing vectored read on {:?}", read);
tline.set_get_vectored_impl(GetVectoredImpl::Sequential);
let sequential_res = tline
.get_vectored(&read.ranges, reads_lsn, &ctx)
.await?
.into_iter();
tline.set_get_vectored_impl(GetVectoredImpl::Vectored);
let vectored_res = tline
.get_vectored(&read.ranges, reads_lsn, &ctx)
.await?
.into_iter();
sequential_res.zip(vectored_res).for_each(
|((seq_key, seq_res), (vec_key, vec_res))| {
assert_eq!(seq_key, vec_key);
match (&seq_res, &vec_res) {
(Ok(seq_blob), Ok(vec_blob)) => assert_eq!(seq_blob, vec_blob),
_ => {
error!(
"Unexpected error for key={}: seq_res={:?} vec_res={:?}",
seq_key,
seq_res.map(|_| "Ok(...)"),
vec_res.map(|_| "Ok(...)")
);
panic!()
}
}
},
);
}
Ok(())

View File

@@ -51,7 +51,10 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use anyhow::Result;
use std::collections::VecDeque;
use pageserver_api::keyspace::KeySpaceAccum;
use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
use std::sync::Arc;
use utils::lsn::Lsn;
@@ -144,11 +147,238 @@ impl Drop for BatchedUpdates<'_> {
}
/// Return value of LayerMap::search
#[derive(Eq, PartialEq, Debug)]
pub struct SearchResult {
pub layer: Arc<PersistentLayerDesc>,
pub lsn_floor: Lsn,
}
#[derive(Debug)]
pub struct OrderedSearchResult(pub SearchResult);
impl Ord for OrderedSearchResult {
fn cmp(&self, other: &Self) -> Ordering {
self.0.lsn_floor.cmp(&other.0.lsn_floor)
}
}
impl PartialOrd for OrderedSearchResult {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for OrderedSearchResult {
fn eq(&self, other: &Self) -> bool {
self.0.lsn_floor == other.0.lsn_floor
}
}
impl Eq for OrderedSearchResult {}
#[derive(Debug)]
pub struct RangeSearchResult {
pub found: BTreeMap<OrderedSearchResult, KeySpaceAccum>,
pub not_found: KeySpaceAccum,
}
impl RangeSearchResult {
fn new() -> Self {
Self {
found: BTreeMap::new(),
not_found: KeySpaceAccum::new(),
}
}
}
/// Collector for results of range search queries on the LayerMap.
/// It should be provided with two iterators for the delta and image coverage
/// that contain all the changes for layers which intersect the range.
struct RangeSearchCollector<Iter>
where
Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
{
delta_coverage: Peekable<Iter>,
image_coverage: Peekable<Iter>,
key_range: Range<Key>,
end_lsn: Lsn,
current_delta: Option<Arc<PersistentLayerDesc>>,
current_image: Option<Arc<PersistentLayerDesc>>,
result: RangeSearchResult,
}
#[derive(Debug)]
enum NextLayerType {
Delta(i128),
Image(i128),
Both(i128),
}
impl NextLayerType {
fn next_change_at_key(&self) -> Key {
match self {
NextLayerType::Delta(at) => Key::from_i128(*at),
NextLayerType::Image(at) => Key::from_i128(*at),
NextLayerType::Both(at) => Key::from_i128(*at),
}
}
}
impl<Iter> RangeSearchCollector<Iter>
where
Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
{
fn new(
key_range: Range<Key>,
end_lsn: Lsn,
delta_coverage: Iter,
image_coverage: Iter,
) -> Self {
Self {
delta_coverage: delta_coverage.peekable(),
image_coverage: image_coverage.peekable(),
key_range: key_range.clone(),
end_lsn,
current_delta: None,
current_image: None,
result: RangeSearchResult::new(),
}
}
/// Run the collector. Collection is implemented via a two pointer algorithm.
/// One pointer tracks the start of the current range and the other tracks
/// the beginning of the next range which will overlap with the next change
/// in coverage across both image and delta.
fn collect(mut self) -> RangeSearchResult {
let next_layer_type = self.choose_next_layer_type();
let mut current_range_start = match next_layer_type {
None => {
// No changes for the range
self.pad_range(self.key_range.clone());
return self.result;
}
Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
// Changes only after the end of the range
self.pad_range(self.key_range.clone());
return self.result;
}
Some(layer_type) => {
// Changes for the range exist. Record anything before the first
// coverage change as not found.
let coverage_start = layer_type.next_change_at_key();
let range_before = self.key_range.start..coverage_start;
self.pad_range(range_before);
self.advance(&layer_type);
coverage_start
}
};
while current_range_start < self.key_range.end {
let next_layer_type = self.choose_next_layer_type();
match next_layer_type {
Some(t) => {
let current_range_end = t.next_change_at_key();
self.add_range(current_range_start..current_range_end);
current_range_start = current_range_end;
self.advance(&t);
}
None => {
self.add_range(current_range_start..self.key_range.end);
current_range_start = self.key_range.end;
}
}
}
self.result
}
/// Mark a range as not found (i.e. no layers intersect it)
fn pad_range(&mut self, key_range: Range<Key>) {
if !key_range.is_empty() {
self.result.not_found.add_range(key_range);
}
}
/// Select the appropriate layer for the given range and update
/// the collector.
fn add_range(&mut self, covered_range: Range<Key>) {
let selected = LayerMap::select_layer(
self.current_delta.clone(),
self.current_image.clone(),
self.end_lsn,
);
match selected {
Some(search_result) => self
.result
.found
.entry(OrderedSearchResult(search_result))
.or_default()
.add_range(covered_range),
None => self.pad_range(covered_range),
}
}
/// Move to the next coverage change.
fn advance(&mut self, layer_type: &NextLayerType) {
match layer_type {
NextLayerType::Delta(_) => {
let (_, layer) = self.delta_coverage.next().unwrap();
self.current_delta = layer;
}
NextLayerType::Image(_) => {
let (_, layer) = self.image_coverage.next().unwrap();
self.current_image = layer;
}
NextLayerType::Both(_) => {
let (_, image_layer) = self.image_coverage.next().unwrap();
let (_, delta_layer) = self.delta_coverage.next().unwrap();
self.current_image = image_layer;
self.current_delta = delta_layer;
}
}
}
/// Pick the next coverage change: the one at the lesser key or both if they're aligned.
fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
match (next_delta_at, next_image_at) {
(None, None) => None,
(Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
(None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
(Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
Some(NextLayerType::Image(*next_image_at))
}
(Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
Some(NextLayerType::Delta(*next_delta_at))
}
(Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
}
}
}
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub enum InMemoryLayerHandle {
Open { lsn_floor: Lsn },
Frozen { idx: usize, lsn_floor: Lsn },
}
impl InMemoryLayerHandle {
pub fn get_lsn_floor(&self) -> Lsn {
match self {
InMemoryLayerHandle::Open { lsn_floor } => *lsn_floor,
InMemoryLayerHandle::Frozen { lsn_floor, .. } => *lsn_floor,
}
}
}
impl LayerMap {
///
/// Find the latest layer (by lsn.end) that covers the given
@@ -186,7 +416,18 @@ impl LayerMap {
let latest_delta = version.delta_coverage.query(key.to_i128());
let latest_image = version.image_coverage.query(key.to_i128());
match (latest_delta, latest_image) {
Self::select_layer(latest_delta, latest_image, end_lsn)
}
fn select_layer(
delta_layer: Option<Arc<PersistentLayerDesc>>,
image_layer: Option<Arc<PersistentLayerDesc>>,
end_lsn: Lsn,
) -> Option<SearchResult> {
assert!(delta_layer.as_ref().map_or(true, |l| l.is_delta()));
assert!(image_layer.as_ref().map_or(true, |l| !l.is_delta()));
match (delta_layer, image_layer) {
(None, None) => None,
(None, Some(image)) => {
let lsn_floor = image.get_lsn_range().start;
@@ -223,6 +464,17 @@ impl LayerMap {
}
}
pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> Option<RangeSearchResult> {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
let image_changes = version.image_coverage.range_overlaps(&raw_range);
let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes);
Some(collector.collect())
}
/// Start a batch of updates, applied on drop
pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
BatchedUpdates { layer_map: self }
@@ -321,6 +573,47 @@ impl LayerMap {
self.historic.iter()
}
pub fn iter_in_memory_layers(&self) -> impl '_ + Iterator<Item = Arc<InMemoryLayer>> {
match &self.open_layer {
Some(layer) => itertools::Either::Left(
std::iter::once(layer.clone()).chain(self.frozen_layers.iter().cloned().rev()),
),
None => itertools::Either::Right(self.frozen_layers.iter().cloned().rev()),
}
}
// TODO:
// * define in memory handle
// * update layer fringe to use PersistentLayerDesc and InMemoryLayerHandle
pub fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<InMemoryLayerHandle>
where
Pred: FnMut(&Arc<InMemoryLayer>) -> bool,
{
if let Some(open) = &self.open_layer {
if pred(open) {
return Some(InMemoryLayerHandle::Open {
lsn_floor: open.get_lsn_range().start,
});
}
}
let pos = self.frozen_layers.iter().position(pred);
match pos {
Some(idx) => Some(InMemoryLayerHandle::Frozen {
idx,
lsn_floor: self.frozen_layers[idx].get_lsn_range().start,
}),
None => None,
}
}
pub fn get_in_memory_layer(&self, handle: &InMemoryLayerHandle) -> Option<Arc<InMemoryLayer>> {
match handle {
InMemoryLayerHandle::Open { .. } => self.open_layer.clone(),
InMemoryLayerHandle::Frozen { idx, .. } => self.frozen_layers.get(*idx).cloned(),
}
}
///
/// Divide the whole given range of keys into sub-ranges based on the latest
/// image layer that covers each range at the specified lsn (inclusive).
@@ -631,3 +924,163 @@ impl LayerMap {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Clone)]
struct LayerDesc {
key_range: Range<Key>,
lsn_range: Range<Lsn>,
is_delta: bool,
}
fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
let mut layer_map = LayerMap::default();
for layer in layers {
layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
layer.key_range,
layer.lsn_range,
layer.is_delta,
));
}
layer_map.flush_updates();
layer_map
}
fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
let lhs: Vec<_> = lhs
.found
.into_iter()
.map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
.collect();
let rhs: Vec<_> = rhs
.found
.into_iter()
.map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
.collect();
assert_eq!(lhs, rhs);
}
fn brute_force_range_search(
layer_map: &LayerMap,
key_range: Range<Key>,
end_lsn: Lsn,
) -> RangeSearchResult {
let mut range_search_result = RangeSearchResult::new();
let mut key = key_range.start;
while key != key_range.end {
let res = layer_map.search(key, end_lsn);
match res {
Some(res) => {
range_search_result
.found
.entry(OrderedSearchResult(res))
.or_default()
.add_key(key);
}
None => {
range_search_result.not_found.add_key(key);
}
}
key = key.next();
}
range_search_result
}
#[test]
fn ranged_search_on_empty_layer_map() {
let layer_map = LayerMap::default();
let range = Key::from_i128(100)..Key::from_i128(200);
let res = layer_map.range_search(range, Lsn(100));
assert!(res.is_none());
}
#[test]
fn ranged_search() {
let layers = vec![
LayerDesc {
key_range: Key::from_i128(15)..Key::from_i128(50),
lsn_range: Lsn(0)..Lsn(5),
is_delta: false,
},
LayerDesc {
key_range: Key::from_i128(10)..Key::from_i128(20),
lsn_range: Lsn(10)..Lsn(20),
is_delta: true,
},
LayerDesc {
key_range: Key::from_i128(15)..Key::from_i128(25),
lsn_range: Lsn(20)..Lsn(30),
is_delta: true,
},
LayerDesc {
key_range: Key::from_i128(35)..Key::from_i128(40),
lsn_range: Lsn(25)..Lsn(35),
is_delta: true,
},
LayerDesc {
key_range: Key::from_i128(35)..Key::from_i128(40),
lsn_range: Lsn(35)..Lsn(40),
is_delta: false,
},
];
let layer_map = create_layer_map(layers.clone());
for start in 0..60 {
for end in (start + 1)..60 {
let range = Key::from_i128(start)..Key::from_i128(end);
let result = layer_map.range_search(range.clone(), Lsn(100)).unwrap();
let expected = brute_force_range_search(&layer_map, range, Lsn(100));
assert_range_search_result_eq(result, expected);
}
}
}
#[test]
fn ranged_search_compacted() {
let layers = vec![
LayerDesc {
key_range: Key::from_i128(0)..Key::from_i128(100),
lsn_range: Lsn(9)..Lsn(10),
is_delta: false,
},
LayerDesc {
key_range: Key::from_i128(10)..Key::from_i128(20),
lsn_range: Lsn(10)..Lsn(20),
is_delta: true,
},
LayerDesc {
key_range: Key::from_i128(20)..Key::from_i128(30),
lsn_range: Lsn(20)..Lsn(30),
is_delta: true,
},
LayerDesc {
key_range: Key::from_i128(30)..Key::from_i128(40),
lsn_range: Lsn(30)..Lsn(40),
is_delta: true,
},
];
let layer_map = create_layer_map(layers.clone());
for start in 0..105 {
for end in (start + 1)..105 {
let range = Key::from_i128(start)..Key::from_i128(end);
let result = layer_map.range_search(range.clone(), Lsn(100)).unwrap();
let expected = brute_force_range_search(&layer_map, range, Lsn(100));
assert_range_search_result_eq(result, expected);
}
}
}
}

View File

@@ -129,6 +129,42 @@ impl<Value: Clone> LayerCoverage<Value> {
.map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone())))
}
/// Returns an iterator which includes all coverage changes for layers that intersect
/// with the provided range.
pub fn range_overlaps(
&self,
key_range: &Range<i128>,
) -> impl Iterator<Item = (i128, Option<Value>)> + '_
where
Value: Eq,
{
let first_change = self.query(key_range.start);
match first_change {
Some(change) => {
// If the start of the range is covered, we have to deal with two cases:
// 1. Start of the range is aligned with the start of a layer.
// In this case the return of `self.range` will contain the layer which aligns with the start of the key range.
// We advance said iterator to avoid duplicating the first change.
// 2. Start of the range is not aligned with the start of a layer.
// In this case the first change returned by `self.range` lies strictly after the start of the key range, so nothing needs to be skipped.
let range = key_range.start..key_range.end;
let mut range_coverage = self.range(range).peekable();
if range_coverage
.peek()
.is_some_and(|c| c.1 == Some(change.clone()))
{
range_coverage.next();
}
itertools::Either::Left(
std::iter::once((key_range.start, Some(change))).chain(range_coverage),
)
}
None => {
let range = key_range.start..key_range.end;
let coverage = self.range(range);
itertools::Either::Right(coverage)
}
}
}
/// O(1) clone
pub fn clone(&self) -> Self {
Self {

View File

@@ -8,17 +8,24 @@ pub(crate) mod layer;
mod layer_desc;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
use once_cell::sync::Lazy;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceAccum};
use pageserver_api::models::{
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::cell::{Ref, RefCell};
use std::cmp::{Ordering, Reverse};
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::ops::Range;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
use utils::history_buffer::HistoryBufferWithDropCounter;
@@ -34,6 +41,13 @@ pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
use self::layer::DownloadError;
use super::layer_map::{InMemoryLayerHandle, LayerMap};
use super::timeline::layer_manager::LayerManager;
use super::timeline::GetVectoredError;
use super::{PageReconstructError, Timeline};
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
T: PartialOrd<T>,
@@ -61,12 +75,219 @@ where
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
/// call, to collect more records.
///
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct ValueReconstructState {
pub records: Vec<(Lsn, NeonWalRecord)>,
pub img: Option<(Lsn, Bytes)>,
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ValueReconstructSituation {
Complete,
#[default]
Continue,
}
#[derive(Debug, Default, Clone)]
pub struct ValueReconstructStateTmp {
pub records: Vec<(Lsn, NeonWalRecord)>,
pub img: Option<(Lsn, Bytes)>,
cached_lsn: Option<Lsn>,
situation: ValueReconstructSituation,
}
impl From<ValueReconstructStateTmp> for ValueReconstructState {
fn from(mut state: ValueReconstructStateTmp) -> Self {
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
ValueReconstructState {
records: state.records,
img: state.img,
}
}
}
pub struct ValuesReconstructState {
pub keys: HashMap<Key, Result<ValueReconstructStateTmp, PageReconstructError>>,
pub total_keys_done: usize,
keys_done: KeySpaceAccum,
}
impl ValuesReconstructState {
pub fn new() -> Self {
Self {
keys: HashMap::new(),
total_keys_done: 0,
keys_done: KeySpaceAccum::new(),
}
}
pub fn on_key_error(&mut self, key: Key, err: PageReconstructError) {
let previous = self.keys.insert(key, Err(err));
if let Some(Ok(state)) = previous {
if state.situation == ValueReconstructSituation::Continue {
self.total_keys_done += 1;
self.keys_done.add_key(key);
}
}
}
pub fn update_key(&mut self, key: &Key, lsn: Lsn, value: Value) -> bool {
let state = self
.keys
.entry(*key)
.or_insert(Ok(ValueReconstructStateTmp::default()));
if let Ok(state) = state {
let key_done = match state.situation {
ValueReconstructSituation::Complete => true,
ValueReconstructSituation::Continue => match value {
Value::Image(img) => {
state.img = Some((lsn, img));
true
}
Value::WalRecord(rec) => {
let reached_cache = state.cached_lsn.map(|clsn| clsn + 1) == Some(lsn);
let will_init = rec.will_init();
state.records.push((lsn, rec));
will_init || reached_cache
}
},
};
if key_done && state.situation == ValureReconstructSituation::Continue {
state.situation = ValureReconstructSituation::Complete;
self.total_keys_done += 1;
self.keys_done.add_key(*key);
}
key_done
} else {
true
}
}
pub fn get_cached_lsn(&self, key: &Key) -> Option<Lsn> {
self.keys
.get(key)
.and_then(|k| k.as_ref().ok())
.and_then(|state| state.cached_lsn)
}
pub fn consume_done_keys(&mut self) -> KeySpace {
self.keys_done.consume_keyspace()
}
}
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub(crate) enum ReadableLayerDesc {
Persistent {
desc: PersistentLayerDesc,
lsn_floor: Lsn,
},
InMemory(InMemoryLayerHandle),
}
#[derive(Debug)]
struct ReadableLayerDescOrdered(ReadableLayerDesc);
#[derive(Debug)]
pub struct LayerFringe {
layers_by_lsn: BinaryHeap<Reverse<ReadableLayerDescOrdered>>,
layers: HashMap<ReadableLayerDesc, KeySpace>,
}
impl LayerFringe {
pub fn new() -> Self {
LayerFringe {
layers_by_lsn: BinaryHeap::new(),
layers: HashMap::new(),
}
}
pub fn next_layer(&mut self) -> Option<(ReadableLayerDesc, KeySpace)> {
let handle = match self.layers_by_lsn.pop() {
Some(h) => h,
None => return None,
};
let removed = self.layers.remove_entry(&handle.0 .0);
match removed {
Some((layer, keyspace)) => Some((layer, keyspace)),
None => panic!(),
}
}
pub fn update(&mut self, layer: ReadableLayerDesc, keyspace: KeySpace) {
let entry = self.layers.entry(layer.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().merge(&keyspace);
},
Entry::Vacant(entry) => {
self.layers_by_lsn
.push(Reverse(ReadableLayerDescOrdered(entry.key().clone())));
entry.insert(keyspace);
}
}
}
}
impl Ord for ReadableLayerDescOrdered {
fn cmp(&self, other: &Self) -> Ordering {
self.0.get_lsn_floor().cmp(&other.0.get_lsn_floor())
}
}
impl PartialOrd for ReadableLayerDescOrdered {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ReadableLayerDescOrdered {
fn eq(&self, other: &Self) -> bool {
self.0.get_lsn_floor() == other.0.get_lsn_floor()
}
}
impl Eq for ReadableLayerDescOrdered {}
impl ReadableLayerDesc {
pub(super) fn get_lsn_floor(&self) -> Lsn {
match self {
ReadableLayerDesc::Persistent { lsn_floor, .. } => *lsn_floor,
ReadableLayerDesc::InMemory(l) => l.get_lsn_floor(),
}
}
pub async fn get_values_reconstruct_data(
&self,
layer_manager: &LayerManager,
keyspace: KeySpace,
end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
match self {
ReadableLayerDesc::Persistent { desc, .. } => {
let layer = layer_manager.get_from_desc(desc);
layer
.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_state, ctx)
.await
}
ReadableLayerDesc::InMemory(desc) => {
let layer = layer_manager.layer_map().get_in_memory_layer(desc).unwrap();
layer
.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_state, ctx)
.await
}
}
}
}
/// Return value from [`Layer::get_value_reconstruct_data`]
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {

View File

@@ -35,16 +35,19 @@ use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use anyhow::{anyhow, bail, ensure, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::{KeySpace, KeySpaceAccum};
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
@@ -59,7 +62,9 @@ use utils::{
lsn::Lsn,
};
use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
use super::{
AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
};
///
/// Header stored in the beginning of the file
@@ -773,6 +778,7 @@ impl DeltaLayerInner {
let cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
info!("Reading sequential from delta layer: key={} lsn={} block={}", key, entry_lsn, pos);
cursor
.read_blob_into_buf(pos, &mut buf, ctx)
.await
@@ -812,6 +818,121 @@ impl DeltaLayerInner {
}
}
pub(super) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
// Scan the page versions backwards, starting from `lsn`.
let file = &self.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
file,
);
let mut offsets: BTreeMap<Key, VecDeque<(Lsn, u64)>> = BTreeMap::new();
for range in keyspace.ranges.iter() {
let mut ignore_key = None;
let last_key = Key::from_i128(range.end.to_i128() - 1);
let last_key = DeltaKey::from_key_lsn(&last_key, Lsn(end_lsn.0 - 1));
tree_reader
.visit(
&last_key.0,
VisitDirection::Backwards,
|raw_key, value| {
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
let entry_lsn = DeltaKey::extract_lsn_from_buf(raw_key);
if key < range.start {
return false;
}
if Some(key) == ignore_key {
return true;
}
if let Some(cached_lsn) = reconstruct_state.get_cached_lsn(&key) {
if entry_lsn <= cached_lsn {
return key != range.start;
}
}
let blob_ref = BlobRef(value);
let lsns_at = offsets
.entry(key)
.or_default();
lsns_at.push_front((entry_lsn, blob_ref.pos()));
if blob_ref.will_init() {
if key == range.start {
return false;
} else {
ignore_key = Some(key);
return true;
}
}
true
},
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(),
)
.await
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
}
let ctx = &RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerValue)
.build();
// TODO: coalesce reads
let cursor = file.block_cursor();
let mut buf = Vec::new();
for (key, lsns_at) in offsets {
for (lsn, block_offset) in lsns_at {
info!("Reading vectored from delta layer: key={} lsn={} block={}", key, lsn, block_offset);
let res = cursor
.read_blob_into_buf(block_offset, &mut buf, ctx)
.await;
if let Err(e) = res {
reconstruct_state.on_key_error(
key,
PageReconstructError::from(anyhow!(e).context(format!(
"Failed to read blob from virtual file {}",
file.file.path
))),
);
break;
}
let value = Value::des(&buf);
if let Err(e) = value {
reconstruct_state.on_key_error(
key,
PageReconstructError::from(anyhow!(e).context(format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path
))),
);
break;
}
reconstruct_state.update_key(&key, lsn, value.unwrap());
}
}
Ok(())
}
pub(super) async fn load_keys<'a>(
&'a self,
ctx: &RequestContext,

View File

@@ -26,20 +26,22 @@
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, KEY_SIZE};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::Timeline;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::VirtualFile;
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use pageserver_api::keyspace::{key_range_size, KeySpace};
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
@@ -59,7 +61,7 @@ use utils::{
};
use super::filename::ImageFileName;
use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer, ValuesReconstructState};
///
/// Header stored in the beginning of the file
@@ -444,6 +446,108 @@ impl ImageLayerInner {
Ok(ValueReconstructResult::Missing)
}
}
pub(super) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
// Same deal as for get_values_reconstruct_data in delta_layer.rs.
// It's simple here since we only have keys and no LSNs.
// For each range in the keyspace, visit the btree starting from the start key.
// Move forward while collecting block offsets until we hit the end key.
//
// Once done, issue the reads.
let file = &self.file;
let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
let mut blocks_for_ranges = Vec::new();
for range in keyspace.ranges.iter() {
let mut raw_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
range.start.write_to_byte_slice(&mut raw_key);
let maybe_start_offset = tree_reader
.get(
&raw_key,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build(),
)
.await
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
match maybe_start_offset {
Some(start_offset) => {
blocks_for_ranges.push((range, start_offset));
}
None => {
return Err(GetVectoredError::MissingKey(range.start));
}
}
// Assert that the on-disk key range has no gaps
// within the interval specified by 'range'.
let range_size = key_range_size(range);
let last_key = range.start.add(range_size - 1);
last_key.write_to_byte_slice(&mut raw_key);
let maybe_end_offset = tree_reader
.get(
&raw_key,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build(),
)
.await
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
assert!(maybe_end_offset.is_some());
assert_eq!(
maybe_end_offset.unwrap() - maybe_start_offset.unwrap(),
(range_size - 1) as u64
);
}
let ctx = &RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build();
// TODO: coalesce reads
let cursor = file.block_cursor();
let mut buf = Vec::new();
for (range, start_offset) in blocks_for_ranges {
let mut range_offset = 0;
let mut key = range.start;
while key < range.end {
let res = cursor
.read_blob_into_buf(start_offset + range_offset, &mut buf, ctx)
.await;
if let Err(e) = res {
reconstruct_state.on_key_error(
key,
PageReconstructError::from(anyhow!(e).context(format!(
"Failed to read blob from virtual file {}",
file.file.path
))),
);
buf.clear();
break;
}
let blob = Bytes::copy_from_slice(buf.as_slice());
reconstruct_state.update_key(&key, self.lsn, Value::Image(blob));
buf.clear();
range_offset += 1;
key = key.next();
}
}
Ok(())
}
}
/// A builder object for constructing a new image layer.

View File

@@ -9,13 +9,16 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::repository::{Key, Value};
use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::tenant::storage_layer::ValueReconstructResult;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::walrecord;
use anyhow::{ensure, Result};
use anyhow::{anyhow, ensure, Result};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use std::collections::HashMap;
use std::cmp::{Ordering, Reverse};
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::{Arc, OnceLock};
use tracing::*;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
@@ -25,7 +28,7 @@ use std::fmt::Write as _;
use std::ops::Range;
use tokio::sync::{RwLock, RwLockWriteGuard};
use super::{DeltaLayerWriter, ResidentLayer};
use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructState, ValuesReconstructState};
pub struct InMemoryLayer {
conf: &'static PageServerConf,
@@ -59,6 +62,7 @@ pub struct InMemoryLayerInner {
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The value is an offset into the
/// ephemeral file where the page version is stored.
// IDEA: would be great if this were ordered
index: HashMap<Key, VecMap<Lsn, u64>>,
/// The values are stored in a serialized format in this file.
@@ -67,6 +71,13 @@ pub struct InMemoryLayerInner {
file: EphemeralFile,
}
#[derive(Eq, PartialEq, Ord, PartialOrd)]
struct BlockRead {
key: Key,
lsn: Lsn,
block_offset: u64,
}
impl std::fmt::Debug for InMemoryLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayerInner").finish()
@@ -202,6 +213,85 @@ impl InMemoryLayer {
Ok(ValueReconstructResult::Complete)
}
}
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
let inner = self.inner.read().await;
let reader = inner.file.block_cursor();
let mut completed_keys = HashSet::new();
let mut min_heap = BinaryHeap::new();
for range in keyspace.ranges.iter() {
let mut key = range.start;
while key < range.end {
if let Some(vec_map) = inner.index.get(&key) {
let range = match reconstruct_state.get_cached_lsn(&key) {
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
None => self.start_lsn..end_lsn,
};
let slice = vec_map.slice_range(range);
for (entry_lsn, pos) in slice.iter().rev() {
min_heap.push(Reverse(BlockRead {
key,
lsn: *entry_lsn,
block_offset: *pos,
}));
}
}
key = key.next();
}
}
let keyspace_size = keyspace.total_size();
// IDEA: coalesce reads as long as they have sequential block numbers
while completed_keys.len() < keyspace_size && !min_heap.is_empty() {
let block_read = min_heap.pop().unwrap();
if completed_keys.contains(&block_read.0.key) {
continue;
}
let buf = reader.read_blob(block_read.0.block_offset, &ctx).await;
if let Err(e) = buf {
reconstruct_state
.on_key_error(block_read.0.key, PageReconstructError::from(anyhow!(e)));
completed_keys.insert(block_read.0.key);
continue;
}
let value = Value::des(&buf.unwrap());
if let Err(e) = value {
reconstruct_state
.on_key_error(block_read.0.key, PageReconstructError::from(anyhow!(e)));
completed_keys.insert(block_read.0.key);
continue;
}
let key_done = reconstruct_state.update_key(
dbg!(&block_read.0.key),
block_read.0.lsn,
value.unwrap(),
);
if key_done {
completed_keys.insert(block_read.0.key);
}
}
Ok(())
}
}
impl std::fmt::Display for InMemoryLayer {

View File

@@ -1,5 +1,6 @@
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
};
@@ -15,10 +16,11 @@ use utils::sync::heavier_once_cell;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
use super::delta_layer::{self, DeltaEntry};
use super::image_layer;
use super::{image_layer, ValuesReconstructState};
use super::{
AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerFileName, PersistentLayerDesc,
ValueReconstructResult, ValueReconstructState,
@@ -261,6 +263,40 @@ impl Layer {
.with_context(|| format!("get_value_reconstruct_data for layer {self}"))
}
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
end_lsn: Lsn,
reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
// use anyhow::ensure;
let layer = self
.0
.get_or_maybe_download(true, Some(ctx))
.await
.map_err(|err| GetVectoredError::Other(anyhow::anyhow!(err)))?;
self.0
.access_stats
.record_access(LayerAccessKind::GetValueReconstructData, ctx);
// if self.layer_desc().is_delta {
// ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
// ensure!(self.layer_desc().key_range.contains(&key));
// } else {
// ensure!(self.layer_desc().key_range.contains(&key));
// ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
// ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
// }
layer
.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, &self.0, ctx)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
.await
}
/// Download the layer if evicted.
///
/// Will not error when the layer is already downloaded.
@@ -1174,7 +1210,7 @@ pub(crate) enum EvictionError {
/// Error internal to the [`LayerInner::get_or_maybe_download`]
#[derive(Debug, thiserror::Error)]
enum DownloadError {
pub(crate) enum DownloadError {
#[error("timeline has already shutdown")]
TimelineShutdown,
#[error("no remote storage configured")]
@@ -1334,6 +1370,28 @@ impl DownloadedLayer {
}
}
async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
end_lsn: Lsn,
reconstruct_data: &mut ValuesReconstructState,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
use LayerKind::*;
match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
Delta(d) => {
d.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, ctx)
.await
}
Image(i) => {
i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
.await
}
}
}
async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
use LayerKind::*;
match self.get(owner, ctx).await? {

View File

@@ -15,7 +15,7 @@ use utils::id::TenantId;
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
/// a unified way to generate layer information like file name.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Hash)]
pub struct PersistentLayerDesc {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
@@ -55,13 +55,13 @@ impl PersistentLayerDesc {
}
#[cfg(test)]
pub fn new_test(key_range: Range<Key>) -> Self {
pub fn new_test(key_range: Range<Key>, lsn_range: Range<Lsn>, is_delta: bool) -> Self {
Self {
tenant_shard_id: TenantShardId::unsharded(TenantId::generate()),
timeline_id: TimelineId::generate(),
key_range,
lsn_range: Lsn(0)..Lsn(1),
is_delta: false,
lsn_range,
is_delta,
file_size: 0,
}
}

View File

@@ -60,7 +60,7 @@ use crate::{
tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
ValueReconstructState,
ValueReconstructState, ValuesReconstructState,
},
};
use crate::{
@@ -104,11 +104,14 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{
remote_timeline_client::index::{IndexLayerMetadata, IndexPart},
storage_layer::LayerFringe,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState {
@@ -159,6 +162,13 @@ pub struct TimelineResources {
pub deletion_queue_client: DeletionQueueClient,
}
#[derive(Debug, Default, Copy, Clone)]
pub(crate) enum GetVectoredImpl {
Sequential,
#[default]
Vectored,
}
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
@@ -346,6 +356,8 @@ pub struct Timeline {
///
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
gc_lock: tokio::sync::Mutex<()>,
get_vectored_impl: RwLock<GetVectoredImpl>,
}
pub struct WalReceiverInfo {
@@ -443,6 +455,30 @@ pub(crate) enum GetVectoredError {
#[error("Requested at invalid LSN: {0}")]
InvalidLsn(Lsn),
#[error("Requested key {0} not found")]
MissingKey(Key),
#[error(transparent)]
GetReadyAncestorError(GetReadyAncestorError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum GetReadyAncestorError {
#[error("ancestor timeline {0} is being stopped")]
AncestorStopping(TimelineId),
#[error("Ancestor LSN wait error: {0}")]
AncestorLsnTimeout(#[from] WaitLsnError),
#[error("Cancelled")]
Cancelled,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
#[derive(Clone, Copy)]
@@ -523,6 +559,21 @@ impl From<GetVectoredError> for CreateImageLayersError {
}
}
impl From<GetReadyAncestorError> for PageReconstructError {
fn from(e: GetReadyAncestorError) -> Self {
match e {
GetReadyAncestorError::AncestorStopping(tid) => {
PageReconstructError::AncestorStopping(tid)
}
GetReadyAncestorError::AncestorLsnTimeout(wait_err) => {
PageReconstructError::AncestorLsnTimeout(wait_err)
}
GetReadyAncestorError::Cancelled => PageReconstructError::Cancelled,
GetReadyAncestorError::Other(other) => PageReconstructError::Other(other),
}
}
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -612,6 +663,7 @@ impl Timeline {
.await?;
timer.stop_and_record();
// info!("Sequential reconstruct state for {key} {reconstruct_state:?}");
let start = Instant::now();
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
let elapsed = start.elapsed();
@@ -619,29 +671,27 @@ impl Timeline {
.for_result(&res)
.observe(elapsed.as_secs_f64());
if cfg!(feature = "testing") && res.is_err() {
// it can only be walredo issue
use std::fmt::Write;
use std::fmt::Write;
let mut msg = String::new();
let mut msg = String::new();
path.into_iter().for_each(|(res, cont_lsn, layer)| {
writeln!(
msg,
"- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
layer(),
)
.expect("string grows")
});
path.into_iter().for_each(|(res, cont_lsn, layer)| {
writeln!(
msg,
"- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
layer(),
)
.expect("string grows")
});
// this is to rule out or provide evidence that we could in some cases read a duplicate
// walrecord
tracing::info!("walredo failed, path:\n{msg}");
}
info!("Path taken: {msg}");
res
}
pub(crate) fn set_get_vectored_impl(&self, impl_type: GetVectoredImpl) {
*self.get_vectored_impl.write().unwrap() = impl_type;
}
pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
/// Look up multiple page versions at a given LSN
@@ -666,8 +716,66 @@ impl Timeline {
return Err(GetVectoredError::Oversized(key_count));
}
let mut values = BTreeMap::new();
for range in key_ranges {
let mut key = range.start;
while key != range.end {
assert!(!self.shard_identity.is_key_disposable(&key));
key = key.next();
}
}
// TODO: update api above to take KeySpace
let keyspace = KeySpace {
ranges: Vec::from(key_ranges),
};
let which = *self.get_vectored_impl.read().unwrap();
match which {
GetVectoredImpl::Sequential => {
self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
}
GetVectoredImpl::Vectored => {
let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await?;
let sequential_res = self
.get_vectored_sequential_impl(keyspace, lsn, ctx)
.await
.unwrap();
vectored_res.iter().zip(sequential_res.iter()).for_each(
|((seq_key, seq_res), (vec_key, vec_res))| {
assert_eq!(seq_key, vec_key, "{} != {}", seq_key, vec_key);
match (seq_res, vec_res) {
(Ok(seq_blob), Ok(vec_blob)) => {
assert_eq!(seq_blob, vec_blob, "Wrong image for key {}", seq_key)
}
_ => {
error!(
"Unexpected error for key={}: seq_res={:?} vec_res={:?}",
seq_key,
seq_res.as_ref().map(|_| "Ok(...)"),
vec_res.as_ref().map(|_| "Ok(...)")
);
panic!()
}
}
},
);
Ok(vectored_res)
}
}
}
async fn get_vectored_sequential_impl(
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
trace!("get_vectored_sequential_impl range={keyspace:?}");
let mut values = BTreeMap::new();
for range in keyspace.ranges {
let mut key = range.start;
while key != range.end {
assert!(!self.shard_identity.is_key_disposable(&key));
@@ -689,6 +797,46 @@ impl Timeline {
Ok(values)
}
async fn get_vectored_impl(
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
trace!("get_vectored_impl range={keyspace:?}");
for range in &keyspace.ranges {
let mut key = range.start;
while key != range.end {
assert!(!self.shard_identity.is_key_disposable(&key));
key = key.next();
}
}
let mut reconstruct_state = ValuesReconstructState::new();
self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
.await?;
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
for (key, res) in reconstruct_state.keys {
match res {
Err(err) => {
results.insert(key, Err(err));
}
Ok(state) => {
let state = ValueReconstructState::from(state);
// info!("Vectored reconstruct state for {key} {state:?}");
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
results.insert(key, reconstruct_res);
}
}
}
Ok(results)
}
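// Note: traversal-level failures (cancellation, a missing key, an invalid
// LSN) fail the whole batch via GetVectoredError, while per-key
// reconstruction failures are stored in the corresponding slot of the
// result map.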
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
pub fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
@@ -1483,6 +1631,7 @@ impl Timeline {
compaction_lock: tokio::sync::Mutex::default(),
gc_lock: tokio::sync::Mutex::default(),
get_vectored_impl: RwLock::new(GetVectoredImpl::default()),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -2329,7 +2478,14 @@ impl Timeline {
}
// The function should have updated 'state'
info!(
"CALLED for {} at {}: {:?} with {} records, cached {}",
key,
cont_lsn,
result,
reconstruct_state.records.len(),
cached_lsn
);
match result {
ValueReconstructResult::Complete => return Ok(traversal_path),
ValueReconstructResult::Continue => {
@@ -2376,60 +2532,8 @@ impl Timeline {
timeline.ancestor_lsn,
cont_lsn
);
timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
timeline = &*timeline_owned;
prev_lsn = Lsn(u64::MAX);
continue 'outer;
@@ -2443,7 +2547,7 @@ impl Timeline {
if let Some(open_layer) = &layers.open_layer {
let start_lsn = open_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
// info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
@@ -2476,6 +2580,7 @@ impl Timeline {
let start_lsn = frozen_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match frozen_layer
.get_value_reconstruct_data(
@@ -2539,6 +2644,135 @@ impl Timeline {
}
}
async fn get_vectored_reconstruct_data(
&self,
keyspace: KeySpace,
request_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
// Start from the current timeline.
let mut timeline_owned: Arc<Timeline>;
let mut timeline = self;
let mut unmapped_keyspace = keyspace.clone();
let mut cont_lsn = Lsn(request_lsn.0 + 1);
let mut fringe = LayerFringe::new();
let initial_keyspace_size = keyspace.total_size();
while reconstruct_state.total_keys_done < initial_keyspace_size {
if self.cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
// Loop invariant: each iteration either completes some keys or moves the
// search one layer (or one timeline) further back into history:
// 1. Check whether all keys are done and return if so
// 2. Shrink the unmapped key space by removing the keys finished in the
//    previous step, splitting ranges around the gaps they leave
// 3. Switch to the ancestor timeline if the search crossed the branch point
// 4. Find the next layer for each unmapped range; this can be an open,
//    frozen or on-disk layer
// 5. Pop the layer with the highest LSN off the fringe and collect the
//    reconstruct data it holds for its key space
// 6. Mark the ranges that mapped to this layer as unmapped again so the
//    search continues below it
// 7. Go back to 1
// A worked micro-trace of this loop is sketched below.
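// Worked micro-trace (illustrative; assumes one in-memory layer M and one
// on-disk delta layer D below it, both covering the requested range):
// iteration 1 finds M, pops it off the fringe, collects partial records and
// lowers cont_lsn to M's lsn_floor; iteration 2 falls through to
// range_search, maps the still-unmapped keys to D and completes them, at
// which point total_keys_done reaches the initial keyspace size and the
// loop exits.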
let keys_done_last_step = reconstruct_state.consume_done_keys();
trace!("keys_done_last_step={keys_done_last_step:?}");
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
if Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
trace!(
"going into ancestor {}, cont_lsn is {}",
timeline.ancestor_lsn,
cont_lsn
);
timeline_owned = timeline
.get_ready_ancestor_timeline(ctx)
.await
.map_err(GetVectoredError::GetReadyAncestorError)?;
timeline = &*timeline_owned;
}
let guard = timeline.layers.read().await;
let layers = guard.layer_map();
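// In-memory layers hold the newest WAL, so they are consulted before the
// on-disk layer map: if one overlaps cont_lsn it is read first.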
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
cont_lsn > start_lsn
});
match in_memory_layer {
Some(l) => {
fringe.update(ReadableLayerDesc::InMemory(l), unmapped_keyspace.clone());
}
None => {
for range in unmapped_keyspace.ranges.iter() {
let results = layers
.range_search(range.clone(), cont_lsn)
.ok_or_else(|| GetVectoredError::InvalidLsn(cont_lsn))?;
trace!(
"Range search for {:?} at {} returned {:?}",
range,
cont_lsn,
results
);
results
.found
.into_iter()
.map(|(res, accum)| {
(
ReadableLayerDesc::Persistent {
desc: (*res.0.layer).clone(),
lsn_floor: res.0.lsn_floor,
},
accum.to_keyspace(),
)
})
.for_each(|(layer, keyspace)| fringe.update(layer, keyspace));
let not_found = &results.not_found.to_keyspace();
if let Some(first_missing_key) = not_found.start() {
return Err(GetVectoredError::MissingKey(first_missing_key));
}
}
}
}
trace!("fringe={fringe:?}");
if let Some((layer_to_read, keyspace_to_read)) = fringe.next_layer() {
trace!(
"Handling layer {:?} for keyspace {:?}",
layer_to_read,
keyspace_to_read
);
layer_to_read
.get_values_reconstruct_data(
&guard,
keyspace_to_read.clone(),
cont_lsn,
reconstruct_state,
ctx,
)
.await?;
unmapped_keyspace = keyspace_to_read;
cont_lsn = layer_to_read.get_lsn_floor();
trace!(
"Handled layer {:?}. cont_lsn={}, unmapped_keyspace={:?}",
layer_to_read,
cont_lsn,
unmapped_keyspace
);
}
}
Ok(())
}
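// Sketch of the ordering contract the fringe relies on (hypothetical helper,
// not used by the read path): layers must be handed back newest-first so that
// cont_lsn only ever decreases while the traversal descends through history.
#[cfg(test)]
fn fringe_ordering_sketch() {
use std::collections::BinaryHeap;
// Max-heap keyed on the layer's end LSN: pop order is descending LSN.
let mut fringe = BinaryHeap::new();
fringe.push((30u64, "newer_delta_layer"));
fringe.push((10u64, "older_image_layer"));
assert_eq!(fringe.pop(), Some((30u64, "newer_delta_layer"))); // newest first
assert_eq!(fringe.pop(), Some((10u64, "older_image_layer")));
}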
/// # Cancel-safety
///
/// This method is cancellation-safe.
@@ -2559,6 +2793,66 @@ impl Timeline {
Some((lsn, img))
}
async fn get_ready_ancestor_timeline(
&self,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, GetReadyAncestorError> {
let ancestor = match self.get_ancestor_timeline() {
Ok(timeline) => timeline,
Err(e) => return Err(GetReadyAncestorError::from(e)),
};
// It's possible that the ancestor timeline isn't active yet, or
// is active but hasn't yet caught up to the branch point. Wait
// for it.
//
// This cannot happen while the pageserver is running normally,
// because you cannot create a branch from a point that isn't
// present in the pageserver yet. However, we don't wait for the
// branch point to be uploaded to cloud storage before creating
// a branch. I.e., the branch LSN need not be remote consistent
// for the branching operation to succeed.
//
// Hence, if we try to load a tenant in such a state where
// 1. the existence of the branch was persisted (in IndexPart and/or locally)
// 2. but the ancestor state is behind branch_lsn because it was not yet persisted
// then we will need to wait for the ancestor timeline to
// re-stream WAL up to branch_lsn before we access it.
//
// How can a tenant get in such a state?
// - ungraceful pageserver process exit
// - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
//
// NB: this could be avoided by requiring
// branch_lsn >= remote_consistent_lsn
// during branch creation.
match ancestor.wait_to_become_active(ctx).await {
Ok(()) => {}
Err(TimelineState::Stopping) => {
return Err(GetReadyAncestorError::AncestorStopping(
ancestor.timeline_id,
));
}
Err(state) => {
return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
"Timeline {} will not become active. Current state: {:?}",
ancestor.timeline_id,
&state,
)));
}
}
ancestor
.wait_lsn(self.ancestor_lsn, ctx)
.await
.map_err(|e| match e {
e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
})?;
Ok(ancestor)
}
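// Error mapping summary for callers: a stopping ancestor surfaces as
// AncestorStopping, a wait_lsn timeout as AncestorLsnTimeout, shutdown as
// Cancelled, and anything else as Other. Both read paths convert these into
// their own error types at the call site.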
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
format!(

View File

@@ -194,7 +194,7 @@ def post_migration_check(endpoint: Endpoint, sum_before_migration: int, old_loca
"major",
],
)
@pytest.mark.parametrize("with_load", ["with_load", "without_load"])
@pytest.mark.parametrize("with_load", ["with_load"])
def test_tenant_relocation(
neon_env_builder: NeonEnvBuilder,
port_distributor: PortDistributor,