pageserver: remove legacy read path (#8601)

## Problem

We have been maintaining two read paths (legacy and vectored) for a
while now. The legacy read-path was only used for cross validation in some tests.

## Summary of changes
* Tweak all tests that were using the legacy read path to use the
vectored read path instead
* Remove the read path dispatching based on the pageserver configs
* Remove the legacy read path code

We will be able to remove the single blob io code in
`pageserver/src/tenant/blob_io.rs` when https://github.com/neondatabase/neon/issues/7386 is complete.

Closes https://github.com/neondatabase/neon/issues/8005
This commit is contained in:
Vlad Lazar
2024-08-06 10:14:01 +01:00
committed by GitHub
parent 138f008bab
commit 44fedfd6c3
8 changed files with 121 additions and 850 deletions

View File

@@ -4122,7 +4122,7 @@ pub(crate) mod harness {
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use super::*;
use crate::keyspace::KeySpaceAccum;
@@ -4797,7 +4797,7 @@ mod tests {
lsn: Lsn,
repeat: usize,
key_count: usize,
) -> anyhow::Result<()> {
) -> anyhow::Result<HashMap<Key, BTreeSet<Lsn>>> {
let compact = true;
bulk_insert_maybe_compact_gc(tenant, timeline, ctx, lsn, repeat, key_count, compact).await
}
@@ -4810,7 +4810,9 @@ mod tests {
repeat: usize,
key_count: usize,
compact: bool,
) -> anyhow::Result<()> {
) -> anyhow::Result<HashMap<Key, BTreeSet<Lsn>>> {
let mut inserted: HashMap<Key, BTreeSet<Lsn>> = Default::default();
let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let mut blknum = 0;
@@ -4831,6 +4833,7 @@ mod tests {
ctx,
)
.await?;
inserted.entry(test_key).or_default().insert(lsn);
writer.finish_write(lsn);
drop(writer);
@@ -4855,7 +4858,7 @@ mod tests {
assert_eq!(res.layers_removed, 0, "this never removes anything");
}
Ok(())
Ok(inserted)
}
//
@@ -4902,7 +4905,7 @@ mod tests {
.await?;
let lsn = Lsn(0x10);
bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
let inserted = bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
let guard = tline.layers.read().await;
guard.layer_map().dump(true, &ctx).await?;
@@ -4963,9 +4966,39 @@ mod tests {
&ctx,
)
.await;
tline
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
.await;
let mut expected_lsns: HashMap<Key, Lsn> = Default::default();
let mut expect_missing = false;
let mut key = read.start().unwrap();
while key != read.end().unwrap() {
if let Some(lsns) = inserted.get(&key) {
let expected_lsn = lsns.iter().rfind(|lsn| **lsn <= reads_lsn);
match expected_lsn {
Some(lsn) => {
expected_lsns.insert(key, *lsn);
}
None => {
expect_missing = true;
break;
}
}
} else {
expect_missing = true;
break;
}
key = key.next();
}
if expect_missing {
assert!(matches!(vectored_res, Err(GetVectoredError::MissingKey(_))));
} else {
for (key, image) in vectored_res? {
let expected_lsn = expected_lsns.get(&key).expect("determined above");
let expected_image = test_img(&format!("{} at {}", key.field6, expected_lsn));
assert_eq!(image?, expected_image);
}
}
}
Ok(())
@@ -5015,10 +5048,6 @@ mod tests {
)
.await;
child_timeline
.validate_get_vectored_impl(&vectored_res, aux_keyspace, read_lsn, &ctx)
.await;
let images = vectored_res?;
assert!(images.is_empty());
Ok(())

View File

@@ -435,21 +435,6 @@ impl ReadableLayer {
}
}
/// Return value from [`Layer::get_value_reconstruct_data`]
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {
/// Got all the data needed to reconstruct the requested page
Complete,
/// This layer didn't contain all the required data, the caller should look up
/// the predecessor layer at the returned LSN and collect more data from there.
Continue,
/// This layer didn't contain data needed to reconstruct the page version at
/// the returned LSN. This is usually considered an error, but might be OK
/// in some circumstances.
Missing,
}
/// Layers contain a hint indicating whether they are likely to be used for reads. This is a hint rather
/// than an authoritative value, so that we do not have to update it synchronously when changing the visibility
/// of layers (for example when creating a branch that makes some previously covered layers visible). It should

View File

@@ -36,7 +36,7 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::storage_layer::Layer;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
@@ -826,95 +826,6 @@ impl DeltaLayerInner {
})
}
pub(super) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let mut need_image = true;
// Scan the page versions backwards, starting from `lsn`.
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
&block_reader,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
tree_reader
.visit(
&search_key.0,
VisitDirection::Backwards,
|key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
},
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(),
)
.await?;
let ctx = &RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerValue)
.build();
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let cursor = block_reader.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor
.read_blob_into_buf(pos, &mut buf, ctx)
.await
.with_context(|| {
format!("Failed to read blob from virtual file {}", self.file.path)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
self.file.path
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
//

View File

@@ -32,9 +32,7 @@ use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::{
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::storage_layer::LayerAccessStats;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
@@ -429,46 +427,6 @@ impl ImageLayerInner {
})
}
pub(super) async fn get_value_reconstruct_data(
&self,
key: Key,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader
.get(
&keybuf,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build(),
)
.await?
{
let blob = block_reader
.block_cursor()
.read_blob(
offset,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build(),
)
.await
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
Ok(ValueReconstructResult::Missing)
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
pub(super) async fn get_values_reconstruct_data(

View File

@@ -10,11 +10,10 @@ use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value};
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::ValueReconstructResult;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::{l0_flush, page_cache, walrecord};
use anyhow::{anyhow, ensure, Result};
use anyhow::{anyhow, Result};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
@@ -33,10 +32,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::{RwLock, RwLockWriteGuard};
use super::{
DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
ValuesReconstructState,
};
use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValuesReconstructState};
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
@@ -55,9 +51,6 @@ pub struct InMemoryLayer {
/// Writes are only allowed when this is `None`.
pub(crate) end_lsn: OnceLock<Lsn>,
/// Used for traversal path. Cached representation of the in-memory layer before frozen.
local_path_str: Arc<str>,
/// Used for traversal path. Cached representation of the in-memory layer after frozen.
frozen_local_path_str: OnceLock<Arc<str>>,
@@ -248,12 +241,6 @@ impl InMemoryLayer {
self.start_lsn..self.end_lsn_or_max()
}
pub(crate) fn local_path_str(&self) -> &Arc<str> {
self.frozen_local_path_str
.get()
.unwrap_or(&self.local_path_str)
}
/// debugging function to print out the contents of the layer
///
/// this is likely completly unused
@@ -303,60 +290,6 @@ impl InMemoryLayer {
Ok(())
}
/// Look up given value in the layer.
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
let inner = self.inner.read().await;
let reader = inner.file.block_cursor();
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos, &ctx).await?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok(ValueReconstructResult::Complete);
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
}
// release lock on 'inner'
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
//
@@ -458,11 +391,6 @@ impl InMemoryLayer {
Ok(InMemoryLayer {
file_id: key,
local_path_str: {
let mut buf = String::new();
inmem_layer_log_display(&mut buf, timeline_id, start_lsn, Lsn::MAX).unwrap();
buf.into()
},
frozen_local_path_str: OnceLock::new(),
conf,
timeline_id,

View File

@@ -24,8 +24,7 @@ use super::delta_layer::{self, DeltaEntry};
use super::image_layer::{self};
use super::{
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
LayerVisibilityHint, PersistentLayerDesc, ValueReconstructResult, ValueReconstructState,
ValuesReconstructState,
LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
};
use utils::generation::Generation;
@@ -301,42 +300,6 @@ impl Layer {
self.0.delete_on_drop();
}
/// Return data needed to reconstruct given page at LSN.
///
/// It is up to the caller to collect more data from the previous layer and
/// perform WAL redo, if necessary.
///
/// # Cancellation-Safety
///
/// This method is cancellation-safe.
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
use anyhow::ensure;
let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?;
self.0.access_stats.record_access(ctx);
if self.layer_desc().is_delta {
ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
ensure!(self.layer_desc().key_range.contains(&key));
} else {
ensure!(self.layer_desc().key_range.contains(&key));
ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
}
layer
.get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
.instrument(tracing::debug_span!("get_value_reconstruct_data", layer=%self))
.await
.with_context(|| format!("get_value_reconstruct_data for layer {self}"))
}
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
@@ -441,10 +404,6 @@ impl Layer {
&self.0.path
}
pub(crate) fn debug_str(&self) -> &Arc<str> {
&self.0.debug_str
}
pub(crate) fn metadata(&self) -> LayerFileMetadata {
self.0.metadata()
}
@@ -519,7 +478,7 @@ impl Layer {
///
/// However when we want something evicted, we cannot evict it right away as there might be current
/// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet
/// read with [`Layer::get_value_reconstruct_data`].
/// read with [`Layer::get_values_reconstruct_data`].
///
/// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
#[derive(Debug)]
@@ -600,9 +559,6 @@ struct LayerInner {
/// Full path to the file; unclear if this should exist anymore.
path: Utf8PathBuf,
/// String representation of the layer, used for traversal id.
debug_str: Arc<str>,
desc: PersistentLayerDesc,
/// Timeline access is needed for remote timeline client and metrics.
@@ -836,9 +792,6 @@ impl LayerInner {
LayerInner {
conf,
debug_str: {
format!("timelines/{}/{}", timeline.timeline_id, desc.layer_name()).into()
},
path: local_path,
desc,
timeline: Arc::downgrade(timeline),
@@ -1759,28 +1712,6 @@ impl DownloadedLayer {
.map_err(|e| anyhow::anyhow!("layer load failed earlier: {e}"))
}
async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
use LayerKind::*;
match self.get(owner, ctx).await? {
Delta(d) => {
d.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
.await
}
Image(i) => {
i.get_value_reconstruct_data(key, reconstruct_data, ctx)
.await
}
}
}
async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,

View File

@@ -50,13 +50,26 @@ async fn smoke_test() {
// all layers created at pageserver are like `layer`, initialized with strong
// Arc<DownloadedLayer>.
let controlfile_keyspace = KeySpace {
ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()],
};
let img_before = {
let mut data = ValueReconstructState::default();
let mut data = ValuesReconstructState::default();
layer
.get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
.get_values_reconstruct_data(
controlfile_keyspace.clone(),
Lsn(0x10)..Lsn(0x11),
&mut data,
&ctx,
)
.await
.unwrap();
data.img
data.keys
.remove(&CONTROLFILE_KEY)
.expect("must be present")
.expect("should not error")
.img
.take()
.expect("tenant harness writes the control file")
};
@@ -74,13 +87,24 @@ async fn smoke_test() {
// on accesses when the layer is evicted, it will automatically be downloaded.
let img_after = {
let mut data = ValueReconstructState::default();
let mut data = ValuesReconstructState::default();
layer
.get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
.get_values_reconstruct_data(
controlfile_keyspace.clone(),
Lsn(0x10)..Lsn(0x11),
&mut data,
&ctx,
)
.instrument(download_span.clone())
.await
.unwrap();
data.img.take().unwrap()
data.keys
.remove(&CONTROLFILE_KEY)
.expect("must be present")
.expect("should not error")
.img
.take()
.expect("tenant harness writes the control file")
};
assert_eq!(img_before, img_after);
@@ -830,7 +854,7 @@ async fn eviction_cancellation_on_drop() {
fn layer_size() {
assert_eq!(size_of::<LayerAccessStats>(), 8);
assert_eq!(size_of::<PersistentLayerDesc>(), 104);
assert_eq!(size_of::<LayerInner>(), 312);
assert_eq!(size_of::<LayerInner>(), 296);
// it also has the utf8 path
}

View File

@@ -22,8 +22,8 @@ use handle::ShardTimelineId;
use once_cell::sync::Lazy;
use pageserver_api::{
key::{
AUX_FILES_KEY, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE,
KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
NON_INHERITED_SPARSE_RANGE,
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
@@ -59,10 +59,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::atomic::AtomicU64,
};
use std::{
cmp::{max, min},
ops::ControlFlow,
};
use std::{cmp::min, ops::ControlFlow};
use std::{
collections::btree_map::Entry,
ops::{Deref, Range},
@@ -87,8 +84,8 @@ use crate::{
disk_usage_eviction_task::finite_f32,
tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
LayerAccessStatsReset, LayerName, ResidentLayer, ValueReconstructResult,
ValueReconstructState, ValuesReconstructState,
LayerAccessStatsReset, LayerName, ResidentLayer, ValueReconstructState,
ValuesReconstructState,
},
};
use crate::{
@@ -543,7 +540,6 @@ pub struct MissingKeyError {
cont_lsn: Lsn,
request_lsn: Lsn,
ancestor_lsn: Option<Lsn>,
traversal_path: Vec<TraversalPathItem>,
backtrace: Option<std::backtrace::Backtrace>,
}
@@ -564,18 +560,6 @@ impl std::fmt::Display for MissingKeyError {
write!(f, ", ancestor {}", ancestor_lsn)?;
}
if !self.traversal_path.is_empty() {
writeln!(f)?;
}
for (r, c, l) in &self.traversal_path {
writeln!(
f,
"layer traversal: result {:?}, cont_lsn {}, layer: {}",
r, c, l,
)?;
}
if let Some(ref backtrace) = self.backtrace {
write!(f, "\n{}", backtrace)?;
}
@@ -918,119 +902,44 @@ impl Timeline {
self.timeline_get_throttle.throttle(ctx, 1).await;
match self.conf.get_impl {
GetImpl::Legacy => {
let reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: None,
};
let keyspace = KeySpace {
ranges: vec![key..key.next()],
};
self.get_impl(key, lsn, reconstruct_state, ctx).await
}
GetImpl::Vectored => {
let keyspace = KeySpace {
ranges: vec![key..key.next()],
};
// Initialise the reconstruct state for the key with the cache
// entry returned above.
let mut reconstruct_state = ValuesReconstructState::new();
// Initialise the reconstruct state for the key with the cache
// entry returned above.
let mut reconstruct_state = ValuesReconstructState::new();
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
}
let key_value = vectored_res?.pop_first();
match key_value {
Some((got_key, value)) => {
if got_key != key {
error!(
"Expected {}, but singular vectored get returned {}",
key, got_key
);
Err(PageReconstructError::Other(anyhow!(
"Singular vectored get returned wrong key"
)))
} else {
value
}
}
None => Err(PageReconstructError::MissingKey(MissingKeyError {
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn: Lsn(0),
request_lsn: lsn,
ancestor_lsn: None,
traversal_path: Vec::new(),
backtrace: None,
})),
let key_value = vectored_res?.pop_first();
match key_value {
Some((got_key, value)) => {
if got_key != key {
error!(
"Expected {}, but singular vectored get returned {}",
key, got_key
);
Err(PageReconstructError::Other(anyhow!(
"Singular vectored get returned wrong key"
)))
} else {
value
}
}
None => Err(PageReconstructError::MissingKey(MissingKeyError {
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn: Lsn(0),
request_lsn: lsn,
ancestor_lsn: None,
backtrace: None,
})),
}
}
/// Not subject to [`Self::timeline_get_throttle`].
async fn get_impl(
&self,
key: Key,
lsn: Lsn,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
// XXX: structured stats collection for layer eviction here.
trace!(
"get page request for {}@{} from task kind {:?}",
key,
lsn,
ctx.task_kind()
);
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(GetKind::Singular)
.start_timer();
let path = self
.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
.await?;
timer.stop_and_record();
let start = Instant::now();
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
let elapsed = start.elapsed();
crate::metrics::RECONSTRUCT_TIME
.for_get_kind(GetKind::Singular)
.observe(elapsed.as_secs_f64());
if cfg!(feature = "testing")
&& res.is_err()
&& !matches!(res, Err(PageReconstructError::Cancelled))
{
// it can only be walredo issue
use std::fmt::Write;
let mut msg = String::new();
path.into_iter().for_each(|(res, cont_lsn, layer)| {
writeln!(
msg,
"- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
layer,
)
.expect("string grows")
});
// this is to rule out or provide evidence that we could in some cases read a duplicate
// walrecord
tracing::info!("walredo failed, path:\n{msg}");
}
res
}
pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
pub(crate) const VEC_GET_LAYERS_VISITED_WARN_THRESH: f64 = 512.0;
@@ -1080,28 +989,14 @@ impl Timeline {
.throttle(ctx, key_count as usize)
.await;
let res = match self.conf.get_vectored_impl {
GetVectoredImpl::Sequential => {
self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
}
GetVectoredImpl::Vectored => {
let vectored_res = self
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::new(),
ctx,
)
.await;
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
}
vectored_res
}
};
let res = self
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::new(),
ctx,
)
.await;
if let Some((metric, start)) = start {
let elapsed = start.elapsed();
@@ -1190,65 +1085,6 @@ impl Timeline {
vectored_res
}
/// Not subject to [`Self::timeline_get_throttle`].
pub(super) async fn get_vectored_sequential_impl(
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut values = BTreeMap::new();
for range in keyspace.ranges {
let mut key = range.start;
while key != range.end {
let block = self
.get_impl(key, lsn, ValueReconstructState::default(), ctx)
.await;
use PageReconstructError::*;
match block {
Err(Cancelled) => return Err(GetVectoredError::Cancelled),
Err(MissingKey(_))
if NON_INHERITED_RANGE.contains(&key)
|| NON_INHERITED_SPARSE_RANGE.contains(&key) =>
{
// Ignore missing key error for aux key range. TODO: currently, we assume non_inherited_range == aux_key_range.
// When we add more types of keys into the page server, we should revisit this part of code and throw errors
// accordingly.
key = key.next();
}
Err(MissingKey(err)) => {
return Err(GetVectoredError::MissingKey(err));
}
Err(Other(err))
if err
.to_string()
.contains("downloading evicted layer file failed") =>
{
return Err(GetVectoredError::Other(err))
}
Err(Other(err))
if err
.chain()
.any(|cause| cause.to_string().contains("layer loading failed")) =>
{
// The intent here is to achieve error parity with the vectored read path.
// When vectored read fails to load a layer it fails the whole read, hence
// we mimic this behaviour here to keep the validation happy.
return Err(GetVectoredError::Other(err));
}
_ => {
values.insert(key, block);
key = key.next();
}
}
}
}
Ok(values)
}
pub(super) async fn get_vectored_impl(
&self,
keyspace: KeySpace,
@@ -1319,113 +1155,6 @@ impl Timeline {
Ok(results)
}
/// Not subject to [`Self::timeline_get_throttle`].
pub(super) async fn validate_get_vectored_impl(
&self,
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) {
if keyspace.overlaps(&Key::metadata_key_range()) {
// skip validation for metadata key range
return;
}
let sequential_res = self
.get_vectored_sequential_impl(keyspace.clone(), lsn, ctx)
.await;
fn errors_match(lhs: &GetVectoredError, rhs: &GetVectoredError) -> bool {
use GetVectoredError::*;
match (lhs, rhs) {
(Oversized(l), Oversized(r)) => l == r,
(InvalidLsn(l), InvalidLsn(r)) => l == r,
(MissingKey(l), MissingKey(r)) => l.key == r.key,
(GetReadyAncestorError(_), GetReadyAncestorError(_)) => true,
(Other(_), Other(_)) => true,
_ => false,
}
}
match (&sequential_res, vectored_res) {
(Err(GetVectoredError::Cancelled), _) => {},
(_, Err(GetVectoredError::Cancelled)) => {},
(Err(seq_err), Ok(_)) => {
panic!(concat!("Sequential get failed with {}, but vectored get did not",
" - keyspace={:?} lsn={}"),
seq_err, keyspace, lsn) },
(Ok(_), Err(GetVectoredError::GetReadyAncestorError(GetReadyAncestorError::AncestorLsnTimeout(_)))) => {
// Sequential get runs after vectored get, so it is possible for the later
// to time out while waiting for its ancestor's Lsn to become ready and for the
// former to succeed (it essentially has a doubled wait time).
},
(Ok(_), Err(vec_err)) => {
panic!(concat!("Vectored get failed with {}, but sequential get did not",
" - keyspace={:?} lsn={}"),
vec_err, keyspace, lsn) },
(Err(seq_err), Err(vec_err)) => {
assert!(errors_match(seq_err, vec_err),
"Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
(Ok(seq_values), Ok(vec_values)) => {
seq_values.iter().zip(vec_values.iter()).for_each(|((seq_key, seq_res), (vec_key, vec_res))| {
assert_eq!(seq_key, vec_key);
match (seq_res, vec_res) {
(Ok(seq_blob), Ok(vec_blob)) => {
Self::validate_key_equivalence(seq_key, &keyspace, lsn, seq_blob, vec_blob);
},
(Err(err), Ok(_)) => {
panic!(
concat!("Sequential get failed with {} for key {}, but vectored get did not",
" - keyspace={:?} lsn={}"),
err, seq_key, keyspace, lsn) },
(Ok(_), Err(err)) => {
panic!(
concat!("Vectored get failed with {} for key {}, but sequential get did not",
" - keyspace={:?} lsn={}"),
err, seq_key, keyspace, lsn) },
(Err(_), Err(_)) => {}
}
})
}
}
}
fn validate_key_equivalence(
key: &Key,
keyspace: &KeySpace,
lsn: Lsn,
seq: &Bytes,
vec: &Bytes,
) {
if *key == AUX_FILES_KEY {
// The value reconstruct of AUX_FILES_KEY from records is not deterministic
// since it uses a hash map under the hood. Hence, deserialise both results
// before comparing.
let seq_aux_dir_res = AuxFilesDirectory::des(seq);
let vec_aux_dir_res = AuxFilesDirectory::des(vec);
match (&seq_aux_dir_res, &vec_aux_dir_res) {
(Ok(seq_aux_dir), Ok(vec_aux_dir)) => {
assert_eq!(
seq_aux_dir, vec_aux_dir,
"Mismatch for key {} - keyspace={:?} lsn={}",
key, keyspace, lsn
);
}
(Err(_), Err(_)) => {}
_ => {
panic!("Mismatch for {key}: {seq_aux_dir_res:?} != {vec_aux_dir_res:?}");
}
}
} else {
// All other keys should reconstruct deterministically, so we simply compare the blobs.
assert_eq!(
seq, vec,
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
);
}
}
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
pub(crate) fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
@@ -3215,228 +2944,7 @@ impl Timeline {
}
}
type TraversalId = Arc<str>;
trait TraversalLayerExt {
fn traversal_id(&self) -> TraversalId;
}
impl TraversalLayerExt for Layer {
fn traversal_id(&self) -> TraversalId {
Arc::clone(self.debug_str())
}
}
impl TraversalLayerExt for Arc<InMemoryLayer> {
fn traversal_id(&self) -> TraversalId {
Arc::clone(self.local_path_str())
}
}
impl Timeline {
///
/// Get a handle to a Layer for reading.
///
/// The returned Layer might be from an ancestor timeline, if the
/// segment hasn't been updated on this timeline yet.
///
/// This function takes the current timeline's locked LayerMap as an argument,
/// so callers can avoid potential race conditions.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
async fn get_reconstruct_data(
&self,
key: Key,
request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
let mut read_count = scopeguard::guard(0, |cnt| {
crate::metrics::READ_NUM_LAYERS_VISITED.observe(cnt as f64)
});
// For debugging purposes, collect the path of layers that we traversed
// through. It's included in the error message if we fail to find the key.
let mut traversal_path = Vec::<TraversalPathItem>::new();
let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
*cached_lsn
} else {
Lsn(0)
};
// 'prev_lsn' tracks the last LSN that we were at in our search. It's used
// to check that each iteration make some progress, to break infinite
// looping if something goes wrong.
let mut prev_lsn = None;
let mut result = ValueReconstructResult::Continue;
let mut cont_lsn = Lsn(request_lsn.0 + 1);
'outer: loop {
if self.cancel.is_cancelled() {
return Err(PageReconstructError::Cancelled);
}
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return Ok(traversal_path),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
return Ok(traversal_path);
}
if let Some(prev) = prev_lsn {
if prev <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
// getting stuck in the loop.
return Err(PageReconstructError::MissingKey(MissingKeyError {
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn: Lsn(cont_lsn.0 - 1),
request_lsn,
ancestor_lsn: Some(timeline.ancestor_lsn),
traversal_path,
backtrace: None,
}));
}
}
prev_lsn = Some(cont_lsn);
}
ValueReconstructResult::Missing => {
return Err(PageReconstructError::MissingKey(MissingKeyError {
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn,
request_lsn,
ancestor_lsn: None,
traversal_path,
backtrace: if cfg!(test) {
Some(std::backtrace::Backtrace::force_capture())
} else {
None
},
}));
}
}
// Recurse into ancestor if needed
if let Some(ancestor_timeline) = timeline.ancestor_timeline.as_ref() {
if key.is_inherited_key() && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
trace!(
"going into ancestor {}, cont_lsn is {}",
timeline.ancestor_lsn,
cont_lsn
);
timeline_owned = timeline
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
.await?;
timeline = &*timeline_owned;
prev_lsn = None;
continue 'outer;
}
}
let guard = timeline.layers.read().await;
let layers = guard.layer_map();
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
if let Some(open_layer) = &layers.open_layer {
let start_lsn = open_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.layer_name().display());
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
let open_layer = open_layer.clone();
drop(guard);
result = match open_layer
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
)
.await
{
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
*read_count += 1;
traversal_path.push((result, cont_lsn, open_layer.traversal_id()));
continue 'outer;
}
}
for frozen_layer in layers.frozen_layers.iter().rev() {
let start_lsn = frozen_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.layer_name().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
let frozen_layer = frozen_layer.clone();
drop(guard);
result = match frozen_layer
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
)
.await
{
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
*read_count += 1;
traversal_path.push((result, cont_lsn, frozen_layer.traversal_id()));
continue 'outer;
}
}
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
let layer = guard.get_from_desc(&layer);
drop(guard);
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer
.get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
.await
{
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
*read_count += 1;
traversal_path.push((result, cont_lsn, layer.traversal_id()));
continue 'outer;
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
result = ValueReconstructResult::Continue;
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
continue 'outer;
} else {
// Nothing found
result = ValueReconstructResult::Missing;
continue 'outer;
}
}
}
#[allow(clippy::doc_lazy_continuation)]
/// Get the data needed to reconstruct all keys in the provided keyspace
///
@@ -3530,7 +3038,6 @@ impl Timeline {
cont_lsn,
request_lsn,
ancestor_lsn: Some(timeline.ancestor_lsn),
traversal_path: vec![],
backtrace: None,
}));
}
@@ -5895,8 +5402,6 @@ impl Timeline {
}
}
type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);
/// Tracking writes ingestion does to a particular in-memory layer.
///
/// Cleared upon freezing a layer.