test(pageserver): more k-merge tests on duplicated keys (#8404)

Existing tenants and some selection of layers might produce duplicated
keys. Add tests to ensure the k-merge iterator handles it correctly. We
also enforced ordering of the k-merge iterator to put images before
deltas.

part of https://github.com/neondatabase/neon/issues/8002

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
This commit is contained in:
Alex Chi Z.
2024-07-17 11:22:38 -04:00
committed by GitHub
parent f2b8e390e7
commit 839a5724a4
2 changed files with 163 additions and 16 deletions

View File

@@ -1672,6 +1672,7 @@ pub(crate) mod test {
use rand::RngCore;
use super::*;
use crate::repository::Value;
use crate::tenant::harness::TIMELINE_ID;
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::Tenant;
@@ -1681,6 +1682,7 @@ pub(crate) mod test {
tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
DEFAULT_PG_VERSION,
};
use bytes::Bytes;
/// Construct an index for a fictional delta layer and and then
/// traverse in order to plan vectored reads for a query. Finally,
@@ -2249,6 +2251,15 @@ pub(crate) mod test {
(k1, l1).cmp(&(k2, l2))
}
pub(crate) fn sort_delta_value(
(k1, l1, v1): &(Key, Lsn, Value),
(k2, l2, v2): &(Key, Lsn, Value),
) -> std::cmp::Ordering {
let order_1 = if v1.is_image() { 0 } else { 1 };
let order_2 = if v2.is_image() { 0 } else { 1 };
(k1, l1, order_1).cmp(&(k2, l2, order_2))
}
pub(crate) async fn produce_delta_layer(
tenant: &Tenant,
tline: &Arc<Timeline>,
@@ -2257,7 +2268,7 @@ pub(crate) mod test {
) -> anyhow::Result<ResidentLayer> {
deltas.sort_by(sort_delta);
let (key_start, _, _) = deltas.first().unwrap();
let (key_max, _, _) = deltas.first().unwrap();
let (key_max, _, _) = deltas.last().unwrap();
let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
let lsn_end = Lsn(lsn_max.0 + 1);
@@ -2302,9 +2313,6 @@ pub(crate) mod test {
#[tokio::test]
async fn delta_layer_iterator() {
use crate::repository::Value;
use bytes::Bytes;
let harness = TenantHarness::create("delta_layer_iterator").unwrap();
let (tenant, ctx) = harness.load().await;

View File

@@ -96,15 +96,22 @@ impl<'a> std::cmp::PartialOrd for IteratorWrapper<'a> {
impl<'a> std::cmp::Ord for IteratorWrapper<'a> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
use std::cmp::Ordering;
let a = self.peek_next_key_lsn();
let b = other.peek_next_key_lsn();
let a = self.peek_next_key_lsn_value();
let b = other.peek_next_key_lsn_value();
match (a, b) {
(Some((k1, l1)), Some((k2, l2))) => {
let loaded_1 = if self.is_loaded() { 1 } else { 0 };
let loaded_2 = if other.is_loaded() { 1 } else { 0 };
(Some((k1, l1, v1)), Some((k2, l2, v2))) => {
fn map_value_to_num(val: &Option<&Value>) -> usize {
match val {
None => 0,
Some(Value::Image(_)) => 1,
Some(Value::WalRecord(_)) => 2,
}
}
let order_1 = map_value_to_num(&v1);
let order_2 = map_value_to_num(&v2);
// When key_lsn are the same, the unloaded iter will always appear before the loaded one.
// And note that we do a reverse at the end of the comparison, so it works with the max heap.
(k1, l1, loaded_1).cmp(&(k2, l2, loaded_2))
(k1, l1, order_1).cmp(&(k2, l2, order_2))
}
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
@@ -137,13 +144,16 @@ impl<'a> IteratorWrapper<'a> {
}
}
fn peek_next_key_lsn(&self) -> Option<(&Key, Lsn)> {
fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
match self {
Self::Loaded { iter } => iter.peek().as_ref().map(|(key, lsn, _)| (key, *lsn)),
Self::Loaded { iter } => iter
.peek()
.as_ref()
.map(|(key, lsn, val)| (key, *lsn, Some(val))),
Self::NotLoaded {
first_key_lower_bound: (key, lsn),
..
} => Some((key, *lsn)),
} => Some((key, *lsn, None)),
}
}
@@ -191,6 +201,13 @@ impl<'a> IteratorWrapper<'a> {
}
}
/// A merge iterator over delta/image layer iterators. When duplicated records are
/// found, the iterator will not perform any deduplication, and the caller should handle
/// these situation. By saying duplicated records, there are many possibilities:
/// * Two same delta at the same LSN.
/// * Two same image at the same LSN.
/// * Delta/image at the same LSN where the image has already applied the delta.
/// The iterator will always put the image before the delta.
pub struct MergeIterator<'a> {
heap: BinaryHeap<IteratorWrapper<'a>>,
}
@@ -245,8 +262,9 @@ mod tests {
use crate::{
tenant::{
harness::{TenantHarness, TIMELINE_ID},
storage_layer::delta_layer::test::{produce_delta_layer, sort_delta},
storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value},
},
walrecord::NeonWalRecord,
DEFAULT_PG_VERSION,
};
@@ -407,6 +425,127 @@ mod tests {
// TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
}
// TODO: image layer merge, delta+image mixed merge
// TODO: is it possible to have duplicated delta at same LSN now? we might need to test that
#[tokio::test]
async fn delta_image_mixed_merge() {
use crate::repository::Value;
use bytes::Bytes;
let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge").unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
// In this test case, we want to test if the iterator still works correctly with multiple copies
// of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
// Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
// An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
// could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
// one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
// correctly process these situations and return everything as-is, and the upper layer of the system
// will handle duplicated LSNs.
let test_deltas1 = vec![
(
get_key(0),
Lsn(0x10),
Value::WalRecord(NeonWalRecord::wal_init()),
),
(
get_key(0),
Lsn(0x18),
Value::WalRecord(NeonWalRecord::wal_append("a")),
),
(
get_key(5),
Lsn(0x10),
Value::WalRecord(NeonWalRecord::wal_init()),
),
(
get_key(5),
Lsn(0x18),
Value::WalRecord(NeonWalRecord::wal_append("b")),
),
];
let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
.await
.unwrap();
let mut test_deltas2 = test_deltas1.clone();
test_deltas2.push((
get_key(10),
Lsn(0x20),
Value::Image(Bytes::copy_from_slice(b"test")),
));
let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
.await
.unwrap();
let test_deltas3 = vec![
(
get_key(0),
Lsn(0x10),
Value::Image(Bytes::copy_from_slice(b"")),
),
(
get_key(5),
Lsn(0x18),
Value::Image(Bytes::copy_from_slice(b"b")),
),
(
get_key(15),
Lsn(0x20),
Value::Image(Bytes::copy_from_slice(b"test")),
),
];
let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
.await
.unwrap();
let mut test_deltas4 = test_deltas3.clone();
test_deltas4.push((
get_key(20),
Lsn(0x20),
Value::Image(Bytes::copy_from_slice(b"test")),
));
let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
.await
.unwrap();
let mut expect = Vec::new();
expect.extend(test_deltas1);
expect.extend(test_deltas2);
expect.extend(test_deltas3);
expect.extend(test_deltas4);
expect.sort_by(sort_delta_value);
// Test with different layer order for MergeIterator::create to ensure the order
// is stable.
let mut merge_iter = MergeIterator::create(
&[
resident_layer_4.get_as_delta(&ctx).await.unwrap(),
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
resident_layer_3.get_as_delta(&ctx).await.unwrap(),
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
],
&[],
&ctx,
);
assert_merge_iter_equal(&mut merge_iter, &expect).await;
let mut merge_iter = MergeIterator::create(
&[
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
resident_layer_4.get_as_delta(&ctx).await.unwrap(),
resident_layer_3.get_as_delta(&ctx).await.unwrap(),
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
],
&[],
&ctx,
);
assert_merge_iter_equal(&mut merge_iter, &expect).await;
}
}