mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 23:12:54 +00:00
pageserver: fix vectored read path delta layer index traversal (#7001)
## Problem Last weeks enablement of vectored get generated a number of panics. From them, I diagnosed two issues in the delta layer index traversal logic 1. The `key >= range.start && lsn >= lsn_range.start` was too aggressive. Lsns are not monotonically increasing in the delta layer index (keys are though), so we cannot assert on them. 2. Lsns greater or equal to `lsn_range.end` were not skipped. This caused the query to consider records newer than the request Lsn. ## Summary of changes * Fix the issues mentioned above inline * Refactor the layer traversal logic to make it unit testable * Add unit test which reproduces the failure modes listed above.
This commit is contained in:
@@ -18,10 +18,19 @@
|
||||
//! - An Iterator interface would be more convenient for the callers than the
|
||||
//! 'visit' function
|
||||
//!
|
||||
use async_stream::try_stream;
|
||||
use byteorder::{ReadBytesExt, BE};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use either::Either;
|
||||
use std::{cmp::Ordering, io, result};
|
||||
use futures::Stream;
|
||||
use hex;
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
io,
|
||||
iter::Rev,
|
||||
ops::{Range, RangeInclusive},
|
||||
result,
|
||||
};
|
||||
use thiserror::Error;
|
||||
use tracing::error;
|
||||
|
||||
@@ -250,6 +259,90 @@ where
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Return a stream which yields all key, value pairs from the index
|
||||
/// starting from the first key greater or equal to `start_key`.
|
||||
///
|
||||
/// Note that this is a copy of [`Self::visit`].
|
||||
/// TODO: Once the sequential read path is removed this will become
|
||||
/// the only index traversal method.
|
||||
pub fn get_stream_from<'a>(
|
||||
&'a self,
|
||||
start_key: &'a [u8; L],
|
||||
ctx: &'a RequestContext,
|
||||
) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
|
||||
try_stream! {
|
||||
let mut stack = Vec::new();
|
||||
stack.push((self.root_blk, None));
|
||||
let block_cursor = self.reader.block_cursor();
|
||||
while let Some((node_blknum, opt_iter)) = stack.pop() {
|
||||
// Locate the node.
|
||||
let node_buf = block_cursor
|
||||
.read_blk(self.start_blk + node_blknum, ctx)
|
||||
.await?;
|
||||
|
||||
let node = OnDiskNode::deparse(node_buf.as_ref())?;
|
||||
let prefix_len = node.prefix_len as usize;
|
||||
let suffix_len = node.suffix_len as usize;
|
||||
|
||||
assert!(node.num_children > 0);
|
||||
|
||||
let mut keybuf = Vec::new();
|
||||
keybuf.extend(node.prefix);
|
||||
keybuf.resize(prefix_len + suffix_len, 0);
|
||||
|
||||
let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
|
||||
iter
|
||||
} else {
|
||||
// Locate the first match
|
||||
let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
|
||||
Ok(idx) => idx,
|
||||
Err(idx) => {
|
||||
if node.level == 0 {
|
||||
// Imagine that the node contains the following keys:
|
||||
//
|
||||
// 1
|
||||
// 3 <-- idx
|
||||
// 5
|
||||
//
|
||||
// If the search key is '2' and there is exact match,
|
||||
// the binary search would return the index of key
|
||||
// '3'. That's cool, '3' is the first key to return.
|
||||
idx
|
||||
} else {
|
||||
// This is an internal page, so each key represents a lower
|
||||
// bound for what's in the child page. If there is no exact
|
||||
// match, we have to return the *previous* entry.
|
||||
//
|
||||
// 1 <-- return this
|
||||
// 3 <-- idx
|
||||
// 5
|
||||
idx.saturating_sub(1)
|
||||
}
|
||||
}
|
||||
};
|
||||
Either::Left(idx..node.num_children.into())
|
||||
};
|
||||
|
||||
// idx points to the first match now. Keep going from there
|
||||
while let Some(idx) = iter.next() {
|
||||
let key_off = idx * suffix_len;
|
||||
let suffix = &node.keys[key_off..key_off + suffix_len];
|
||||
keybuf[prefix_len..].copy_from_slice(suffix);
|
||||
let value = node.value(idx);
|
||||
#[allow(clippy::collapsible_if)]
|
||||
if node.level == 0 {
|
||||
// leaf
|
||||
yield (keybuf.clone(), value.to_u64());
|
||||
} else {
|
||||
stack.push((node_blknum, Some(iter)));
|
||||
stack.push((value.to_blknum(), None));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
|
||||
/// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
|
||||
|
||||
@@ -46,6 +46,7 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -847,10 +848,33 @@ impl DeltaLayerInner {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let reads = self
|
||||
.plan_reads(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
self.index_root_blk,
|
||||
block_reader,
|
||||
);
|
||||
|
||||
let planner = VectoredReadPlanner::new(
|
||||
self.max_vectored_read_bytes
|
||||
.expect("Layer is loaded with max vectored bytes config")
|
||||
.0
|
||||
.into(),
|
||||
);
|
||||
|
||||
let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
let reads = Self::plan_reads(
|
||||
keyspace,
|
||||
lsn_range,
|
||||
data_end_offset,
|
||||
index_reader,
|
||||
planner,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
self.do_reads_and_update_state(reads, reconstruct_state)
|
||||
.await;
|
||||
@@ -858,73 +882,64 @@ impl DeltaLayerInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn plan_reads(
|
||||
&self,
|
||||
async fn plan_reads<Reader>(
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
data_end_offset: u64,
|
||||
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
|
||||
mut planner: VectoredReadPlanner,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<VectoredRead>> {
|
||||
let mut planner = VectoredReadPlanner::new(
|
||||
self.max_vectored_read_bytes
|
||||
.expect("Layer is loaded with max vectored bytes config")
|
||||
.0
|
||||
.into(),
|
||||
);
|
||||
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
self.index_root_blk,
|
||||
block_reader,
|
||||
);
|
||||
) -> anyhow::Result<Vec<VectoredRead>>
|
||||
where
|
||||
Reader: BlockReader,
|
||||
{
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
|
||||
tree_reader
|
||||
.visit(
|
||||
&start_key.0,
|
||||
VisitDirection::Forwards,
|
||||
|raw_key, value| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
|
||||
let mut index_stream = std::pin::pin!(index_stream);
|
||||
|
||||
assert!(key >= range.start && lsn >= lsn_range.start);
|
||||
while let Some(index_entry) = index_stream.next().await {
|
||||
let (raw_key, value) = index_entry?;
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
|
||||
let cached_lsn = reconstruct_state.get_cached_lsn(&key);
|
||||
let flag = {
|
||||
if cached_lsn >= Some(lsn) {
|
||||
BlobFlag::Ignore
|
||||
} else if blob_ref.will_init() {
|
||||
BlobFlag::Replaces
|
||||
} else {
|
||||
BlobFlag::None
|
||||
}
|
||||
};
|
||||
// Lsns are not monotonically increasing across keys, so we don't assert on them.
|
||||
assert!(key >= range.start);
|
||||
|
||||
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
|
||||
planner.handle_range_end(blob_ref.pos());
|
||||
range_end_handled = true;
|
||||
false
|
||||
} else {
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag);
|
||||
true
|
||||
}
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
let outside_lsn_range = !lsn_range.contains(&lsn);
|
||||
let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
|
||||
|
||||
let flag = {
|
||||
if outside_lsn_range || below_cached_lsn {
|
||||
BlobFlag::Ignore
|
||||
} else if blob_ref.will_init() {
|
||||
BlobFlag::ReplaceAll
|
||||
} else {
|
||||
// Usual path: add blob to the read
|
||||
BlobFlag::None
|
||||
}
|
||||
};
|
||||
|
||||
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
|
||||
planner.handle_range_end(blob_ref.pos());
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag);
|
||||
}
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
tracing::info!("Handling range end fallback at {}", payload_end);
|
||||
planner.handle_range_end(payload_end);
|
||||
tracing::info!("Handling range end fallback at {}", data_end_offset);
|
||||
planner.handle_range_end(data_end_offset);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1190,3 +1205,131 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
|
||||
self.size
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
context::DownloadBehavior, task_mgr::TaskKind, tenant::disk_btree::tests::TestDisk,
|
||||
};
|
||||
|
||||
/// Construct an index for a fictional delta layer and and then
|
||||
/// traverse in order to plan vectored reads for a query. Finally,
|
||||
/// verify that the traversal fed the right index key and value
|
||||
/// pairs into the planner.
|
||||
#[tokio::test]
|
||||
async fn test_delta_layer_index_traversal() {
|
||||
let base_key = Key {
|
||||
field1: 0,
|
||||
field2: 1663,
|
||||
field3: 12972,
|
||||
field4: 16396,
|
||||
field5: 0,
|
||||
field6: 246080,
|
||||
};
|
||||
|
||||
// Populate the index with some entries
|
||||
let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
|
||||
(base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
|
||||
(base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
|
||||
(base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
|
||||
(base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
|
||||
]);
|
||||
|
||||
let mut disk = TestDisk::default();
|
||||
let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
|
||||
|
||||
let mut disk_offset = 0;
|
||||
for (key, lsns) in &entries {
|
||||
for lsn in lsns {
|
||||
let index_key = DeltaKey::from_key_lsn(key, *lsn);
|
||||
let blob_ref = BlobRef::new(disk_offset, false);
|
||||
writer
|
||||
.append(&index_key.0, blob_ref.0)
|
||||
.expect("In memory disk append should never fail");
|
||||
|
||||
disk_offset += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Prepare all the arguments for the call into `plan_reads` below
|
||||
let (root_offset, _writer) = writer
|
||||
.finish()
|
||||
.expect("In memory disk finish should never fail");
|
||||
let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
|
||||
let planner = VectoredReadPlanner::new(100);
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
|
||||
let keyspace = KeySpace {
|
||||
ranges: vec![
|
||||
base_key..base_key.add(3),
|
||||
base_key.add(3)..base_key.add(100),
|
||||
],
|
||||
};
|
||||
let lsn_range = Lsn(2)..Lsn(40);
|
||||
|
||||
// Plan and validate
|
||||
let vectored_reads = DeltaLayerInner::plan_reads(
|
||||
keyspace.clone(),
|
||||
lsn_range.clone(),
|
||||
disk_offset,
|
||||
reader,
|
||||
planner,
|
||||
&mut reconstruct_state,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.expect("Read planning should not fail");
|
||||
|
||||
validate(keyspace, lsn_range, vectored_reads, entries);
|
||||
}
|
||||
|
||||
fn validate(
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
vectored_reads: Vec<VectoredRead>,
|
||||
index_entries: BTreeMap<Key, Vec<Lsn>>,
|
||||
) {
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
struct BlobSpec {
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
at: u64,
|
||||
}
|
||||
|
||||
let mut planned_blobs = Vec::new();
|
||||
for read in vectored_reads {
|
||||
for (at, meta) in read.blobs_at.as_slice() {
|
||||
planned_blobs.push(BlobSpec {
|
||||
key: meta.key,
|
||||
lsn: meta.lsn,
|
||||
at: *at,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut expected_blobs = Vec::new();
|
||||
let mut disk_offset = 0;
|
||||
for (key, lsns) in index_entries {
|
||||
for lsn in lsns {
|
||||
let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
|
||||
let lsn_included = lsn_range.contains(&lsn);
|
||||
|
||||
if key_included && lsn_included {
|
||||
expected_blobs.push(BlobSpec {
|
||||
key,
|
||||
lsn,
|
||||
at: disk_offset,
|
||||
});
|
||||
}
|
||||
|
||||
disk_offset += 1;
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(planned_blobs, expected_blobs);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@ use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -54,6 +55,7 @@ use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::*;
|
||||
|
||||
use utils::{
|
||||
@@ -488,35 +490,33 @@ impl ImageLayerInner {
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
|
||||
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
|
||||
.build();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
range.start.write_to_byte_slice(&mut search_key);
|
||||
|
||||
tree_reader
|
||||
.visit(
|
||||
&search_key,
|
||||
VisitDirection::Forwards,
|
||||
|raw_key, offset| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
assert!(key >= range.start);
|
||||
let index_stream = tree_reader.get_stream_from(&search_key, &ctx);
|
||||
let mut index_stream = std::pin::pin!(index_stream);
|
||||
|
||||
if key >= range.end {
|
||||
planner.handle_range_end(offset);
|
||||
range_end_handled = true;
|
||||
false
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, BlobFlag::None);
|
||||
true
|
||||
}
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
|
||||
while let Some(index_entry) = index_stream.next().await {
|
||||
let (raw_key, offset) = index_entry?;
|
||||
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
assert!(key >= range.start);
|
||||
|
||||
if key >= range.end {
|
||||
planner.handle_range_end(offset);
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, BlobFlag::None);
|
||||
}
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
@@ -128,7 +128,7 @@ impl VectoredReadBuilder {
|
||||
pub enum BlobFlag {
|
||||
None,
|
||||
Ignore,
|
||||
Replaces,
|
||||
ReplaceAll,
|
||||
}
|
||||
|
||||
/// Planner for vectored blob reads.
|
||||
@@ -170,7 +170,7 @@ impl VectoredReadPlanner {
|
||||
/// incorrect data to the user.
|
||||
///
|
||||
/// The `flag` argument has two interesting values:
|
||||
/// * [`BlobFlag::Replaces`]: The blob for this key should replace all existing blobs.
|
||||
/// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
|
||||
/// This is used for WAL records that `will_init`.
|
||||
/// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
|
||||
/// if the blob is cached.
|
||||
@@ -204,7 +204,7 @@ impl VectoredReadPlanner {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset));
|
||||
}
|
||||
BlobFlag::Replaces => {
|
||||
BlobFlag::ReplaceAll => {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.clear();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset));
|
||||
@@ -411,10 +411,10 @@ mod tests {
|
||||
let blob_descriptions = vec![
|
||||
(first_key, lsn, 0, BlobFlag::None), // First in read 1
|
||||
(first_key, lsn, 1024, BlobFlag::None), // Last in read 1
|
||||
(second_key, lsn, 2 * 1024, BlobFlag::Replaces),
|
||||
(second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
|
||||
(second_key, lsn, 3 * 1024, BlobFlag::None),
|
||||
(second_key, lsn, 4 * 1024, BlobFlag::Replaces), // First in read 2
|
||||
(second_key, lsn, 5 * 1024, BlobFlag::None), // Last in read 2
|
||||
(second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
|
||||
(second_key, lsn, 5 * 1024, BlobFlag::None), // Last in read 2
|
||||
];
|
||||
|
||||
let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
|
||||
|
||||
Reference in New Issue
Block a user