pageserver: add vectored get unit test

Author: Vlad Lazar
Date:   2024-02-02 14:35:23 +00:00
parent 80134af36a
commit c67dfad1e0


@@ -4153,6 +4153,7 @@ mod tests {
     use bytes::BytesMut;
     use hex_literal::hex;
     use once_cell::sync::Lazy;
+    use pageserver_api::keyspace::KeySpace;
     use rand::{thread_rng, Rng};
     use tokio_util::sync::CancellationToken;
@@ -4845,6 +4846,61 @@ mod tests {
         Ok(())
     }

+    async fn bulk_insert_compact_gc(
+        timeline: Arc<Timeline>,
+        ctx: &RequestContext,
+        mut lsn: Lsn,
+        repeat: usize,
+        key_count: usize,
+    ) -> anyhow::Result<()> {
+        let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
+        let mut blknum = 0;
+
+        // Enforce that the key range is monotonically increasing
+        let mut keyspace = KeySpaceAccum::new();
+
+        for _ in 0..repeat {
+            for _ in 0..key_count {
+                test_key.field6 = blknum;
+                let writer = timeline.writer().await;
+                writer
+                    .put(
+                        test_key,
+                        lsn,
+                        &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
+                        ctx,
+                    )
+                    .await?;
+                writer.finish_write(lsn);
+                drop(writer);
+
+                keyspace.add_key(test_key);
+
+                lsn = Lsn(lsn.0 + 0x10);
+                blknum += 1;
+            }
+
+            let cutoff = timeline.get_last_record_lsn();
+
+            timeline
+                .update_gc_info(
+                    Vec::new(),
+                    cutoff,
+                    Duration::ZERO,
+                    &CancellationToken::new(),
+                    ctx,
+                )
+                .await?;
+            timeline.freeze_and_flush().await?;
+            timeline
+                .compact(&CancellationToken::new(), EnumSet::empty(), ctx)
+                .await?;
+            timeline.gc().await?;
+        }
+
+        Ok(())
+    }
+
     //
     // Insert 10,000 key-value pairs with increasing keys, flush, compact, GC.
     // Repeat 50 times.
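With the write/flush/compact/GC cycle factored out above, callers reduce to a single call. A usage sketch under the same TenantHarness setup as the surrounding tests (the values mirror the test_bulk_insert call in the next hunk; the starting LSN of 0x10 matches the 0x10 stride the helper applies per write):

    // Usage sketch: 50 flush/compact/GC cycles of 10,000 keys each.
    let lsn = Lsn(0x10);
    bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;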
@@ -4857,49 +4913,98 @@ mod tests {
             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
             .await?;

-        let mut lsn = Lsn(0x10);
-
-        let mut keyspace = KeySpaceAccum::new();
-
-        let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
-        let mut blknum = 0;
-        for _ in 0..50 {
-            for _ in 0..10000 {
-                test_key.field6 = blknum;
-                let writer = tline.writer().await;
-                writer
-                    .put(
-                        test_key,
-                        lsn,
-                        &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
-                        &ctx,
-                    )
-                    .await?;
-                writer.finish_write(lsn);
-                drop(writer);
-
-                keyspace.add_key(test_key);
-
-                lsn = Lsn(lsn.0 + 0x10);
-                blknum += 1;
-            }
-
-            let cutoff = tline.get_last_record_lsn();
-
-            tline
-                .update_gc_info(
-                    Vec::new(),
-                    cutoff,
-                    Duration::ZERO,
-                    &CancellationToken::new(),
-                    &ctx,
-                )
-                .await?;
-            tline.freeze_and_flush().await?;
-            tline
-                .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
-                .await?;
-            tline.gc().await?;
-        }
+        let lsn = Lsn(0x10);
+        bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;

         Ok(())
     }

+    // Test the real implementation of vectored get against a simple sequential
+    // implementation.
+    //
+    // The test generates a keyspace by repeatedly flushing the in-memory layer
+    // and compacting. Projected to 2D, the key space looks like below: LSN grows
+    // upward on the Y axis and keys grow to the right on the X axis.
+    //
+    //  [Delta]
+    //          [Delta]
+    //                  [Delta]
+    //                          [Delta]
+    //  ------------ Image ---------------
+    //
+    // After layer generation we pick the ranges to query as follows:
+    // 1. The beginning of each delta layer
+    // 2. At the seam between two adjacent delta layers
+    //
+    // There's one major downside to this test: delta layers only contain images,
+    // so the search can stop at the first delta layer and doesn't traverse any deeper.
+    #[tokio::test]
+    async fn test_get_vectored() -> anyhow::Result<()> {
+        let harness = TenantHarness::create("test_get_vectored")?;
+        let (tenant, ctx) = harness.load().await;
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
+            .await?;
+
+        let lsn = Lsn(0x10);
+        bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;
+
+        let guard = tline.layers.read().await;
+        guard.layer_map().dump(true, &ctx).await?;
+
+        let mut reads = Vec::new();
+        let mut prev = None;
+        guard.layer_map().iter_historic_layers().for_each(|desc| {
+            if !desc.is_delta() {
+                prev = Some(desc.clone());
+                return;
+            }
+
+            // Query the beginning of this delta layer, up to the maximum
+            // number of keys a single vectored get may cover.
+            let start = desc.key_range.start;
+            let end = desc
+                .key_range
+                .start
+                .add(Timeline::MAX_GET_VECTORED_KEYS.try_into().unwrap());
+            reads.push(KeySpace {
+                ranges: vec![start..end],
+            });
+
+            if let Some(prev) = &prev {
+                if !prev.is_delta() {
+                    return;
+                }
+
+                // Query across the seam: the last 4 keys of the previous delta
+                // layer and the first 4 keys of this one.
+                let first_range = Key {
+                    field6: prev.key_range.end.field6 - 4,
+                    ..prev.key_range.end
+                }..prev.key_range.end;
+
+                let second_range = desc.key_range.start..Key {
+                    field6: desc.key_range.start.field6 + 4,
+                    ..desc.key_range.start
+                };
+
+                reads.push(KeySpace {
+                    ranges: vec![first_range, second_range],
+                });
+            };
+
+            prev = Some(desc.clone());
+        });
+
+        drop(guard);
+
+        // Pick a big LSN such that we query over all the changes.
+        // Technically, u64::MAX - 1 is the largest LSN supported by the read path,
+        // but there seems to be a bug on the non-vectored search path which surfaces
+        // in that case.
+        let reads_lsn = Lsn(u64::MAX - 1000);
+
+        for read in reads {
+            info!("Doing vectored read on {:?}", read);
+
+            let vectored_res = tline.get_vectored_impl(read.clone(), reads_lsn, &ctx).await;
+            tline
+                .validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
+                .await;
+        }
+
         Ok(())
     }
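
validate_get_vectored_impl itself is not part of this hunk. Per the test comment, it cross-checks the vectored result against the simple sequential read path. A minimal sketch of that kind of check, assuming the vectored result can be flattened into a per-key map of images and that Timeline::get is the sequential one-key-at-a-time path (the helper name, result shape, and key stepping below are illustrative assumptions, not the actual helper):

    use std::collections::BTreeMap;

    use bytes::Bytes;

    // Hypothetical cross-check: every key in the requested keyspace must yield
    // the same image through the vectored path as through the sequential path.
    async fn check_against_sequential(
        timeline: &Timeline,
        keyspace: KeySpace,
        lsn: Lsn,
        ctx: &RequestContext,
        vectored: &BTreeMap<Key, Bytes>,
    ) -> anyhow::Result<()> {
        for range in keyspace.ranges {
            let mut key = range.start;
            while key < range.end {
                // The sequential path acts as the oracle for the batched one.
                let expected = timeline.get(key, lsn, ctx).await?;
                assert_eq!(vectored.get(&key), Some(&expected));
                key = key.next();
            }
        }
        Ok(())
    }

Reading each key individually keeps the reference path trivially correct, which is what makes it a usable oracle for the batched implementation.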