Tiered compaction: add order asserts after delta key k-merge (#7648)

Adds ordering asserts to the output of the delta key iterator
`MergeDeltaKeys` that implements a k-merge.

Part of #7296 : the asserts added by this PR get hit in the reproducers
of #7296 as well, but they are earlier in the pipeline.
This commit is contained in:
Arpad Müller
2024-05-08 13:22:27 +02:00
committed by GitHub
parent e3a2631df9
commit b6d547cf92

View File

@@ -9,6 +9,7 @@ use pageserver_api::shard::ShardIdentity;
use pin_project_lite::pin_project;
use std::collections::BinaryHeap;
use std::collections::VecDeque;
use std::fmt::Display;
use std::future::Future;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
@@ -214,7 +215,7 @@ pub struct KeySize<K> {
pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream<Item = Result<KeySize<K>, E>>
where
K: Eq,
K: Eq + PartialOrd + Display + Copy,
I: Stream<Item = Result<D, E>>,
D: CompactionDeltaEntry<'a, K>,
{
@@ -229,12 +230,15 @@ where
num_values: 1,
size: first.size(),
};
let mut last_key = accum.key;
while let Some(this) = input.next().await {
let this = this?;
if this.key() == accum.key {
accum.size += this.size();
accum.num_values += 1;
} else {
assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
last_key = accum.key;
yield accum;
accum = KeySize {
key: this.key(),
@@ -243,6 +247,7 @@ where
};
}
}
assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
yield accum;
}
}