pageserver: unit test delta layer index traversal

This commit is contained in:
Vlad Lazar
2024-03-04 12:53:53 +00:00
parent 0870dafc32
commit 58f00b83c1

View File

@@ -849,10 +849,33 @@ impl DeltaLayerInner {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let reads = self
.plan_reads(keyspace, lsn_range, reconstruct_state, ctx)
.await
.map_err(GetVectoredError::Other)?;
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
block_reader,
);
let planner = VectoredReadPlanner::new(
self.max_vectored_read_bytes
.expect("Layer is loaded with max vectored bytes config")
.0
.into(),
);
let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
let reads = Self::plan_reads(
keyspace,
lsn_range,
data_end_offset,
index_reader,
planner,
reconstruct_state,
ctx,
)
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state)
.await;
@@ -860,27 +883,19 @@ impl DeltaLayerInner {
Ok(())
}
async fn plan_reads(
&self,
// This is public only for testing purposes.
pub(crate) async fn plan_reads<Reader>(
keyspace: KeySpace,
lsn_range: Range<Lsn>,
data_end_offset: u64,
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
mut planner: VectoredReadPlanner,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> {
let mut planner = VectoredReadPlanner::new(
self.max_vectored_read_bytes
.expect("Layer is loaded with max vectored bytes config")
.0
.into(),
);
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
block_reader,
);
) -> anyhow::Result<Vec<VectoredRead>>
where
Reader: BlockReader,
{
let btree_request_context = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build();
@@ -889,7 +904,7 @@ impl DeltaLayerInner {
let mut range_end_handled = false;
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
let index_stream = tree_reader.get_stream_from(&start_key.0, &btree_request_context);
let index_stream = index_reader.get_stream_from(&start_key.0, &btree_request_context);
pin_mut!(index_stream);
while let Some(index_entry) = index_stream.next().await {
@@ -928,9 +943,8 @@ impl DeltaLayerInner {
}
if !range_end_handled {
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
tracing::info!("Handling range end fallback at {}", payload_end);
planner.handle_range_end(payload_end);
tracing::info!("Handling range end fallback at {}", data_end_offset);
planner.handle_range_end(data_end_offset);
}
}
@@ -1196,3 +1210,131 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
self.size
}
}
#[cfg(test)]
mod test {
use std::collections::BTreeMap;
use super::*;
use crate::{
context::DownloadBehavior, task_mgr::TaskKind, tenant::disk_btree::tests::TestDisk,
};
#[derive(Debug, PartialEq, Eq)]
struct BlobSpec {
key: Key,
lsn: Lsn,
at: u64,
}
fn validate(
keyspace: KeySpace,
lsn_range: Range<Lsn>,
vectored_reads: Vec<VectoredRead>,
index_entries: BTreeMap<Key, Vec<Lsn>>,
) {
let mut planned_blobs = Vec::new();
for read in vectored_reads {
for (at, meta) in read.blobs_at.as_slice() {
planned_blobs.push(BlobSpec {
key: meta.key,
lsn: meta.lsn,
at: *at,
});
}
}
let mut expected_blobs = Vec::new();
let mut disk_offset = 0;
for (key, lsns) in index_entries {
for lsn in lsns {
let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
let lsn_included = lsn_range.contains(&lsn);
if key_included && lsn_included {
expected_blobs.push(BlobSpec {
key,
lsn,
at: disk_offset,
});
}
disk_offset += 1;
}
}
assert_eq!(planned_blobs, expected_blobs);
}
/// Construct an index for a fictional delta layer and and then
/// traverse in order to plan vectored reads for a query. Finally,
/// verify that the traversal fed the right index key and value
/// pairs into the planner.
#[tokio::test]
async fn test_delta_layer_index_traversal() {
let base_key = Key {
field1: 0,
field2: 1663,
field3: 12972,
field4: 16396,
field5: 0,
field6: 246080,
};
// Populate the index with some entries
let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
(base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
(base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
(base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
(base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
]);
let mut disk = TestDisk::default();
let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
let mut disk_offset = 0;
for (key, lsns) in &entries {
for lsn in lsns {
let index_key = DeltaKey::from_key_lsn(key, *lsn);
let blob_ref = BlobRef::new(disk_offset, false);
writer
.append(&index_key.0, blob_ref.0)
.expect("In memory disk append should never fail");
disk_offset += 1;
}
}
// Prepare all the arguments for the call into `plan_reads` below
let (root_offset, _writer) = writer
.finish()
.expect("In memory disk finish should never fail");
let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
let planner = VectoredReadPlanner::new(100);
let mut reconstruct_state = ValuesReconstructState::new();
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let keyspace = KeySpace {
ranges: vec![
base_key..base_key.add(3),
base_key.add(3)..base_key.add(100),
],
};
let lsn_range = Lsn(2)..Lsn(40);
// Plan and validate
let vectored_reads = DeltaLayerInner::plan_reads(
keyspace.clone(),
lsn_range.clone(),
disk_offset,
reader,
planner,
&mut reconstruct_state,
&ctx,
)
.await
.expect("Read planning should not fail");
validate(keyspace, lsn_range, vectored_reads, entries);
}
}