Fix tiered compaction k-merge bug and use in-memory alternative (#7661)

This PR does two things:

First, it fixes a bug with tiered compaction's k-merge implementation.
It ignored the lsn of a key during ordering, so multiple updates of the
same key could be read in arbitrary order, say from different layers.
For example there is layers `[(a, 2),(b, 3)]` and `[(a, 1),(c, 2)]` in
the heap, they might return `(a,2)` and `(a,1)`.

Ultimately, this change wasn't enough to fix the ordering issues in
#7296, in other words there is likely still bugs in the k-merge. So as
the second thing, we switch away from the k-merge to an in-memory based
one, similar to #4839, but leave the code around to be improved and
maybe switched to later on.

Part of #7296
This commit is contained in:
Arpad Müller
2024-05-09 16:01:16 +02:00
committed by GitHub
parent 107f535294
commit 41fb838799
2 changed files with 34 additions and 5 deletions

View File

@@ -24,7 +24,9 @@ use tracing::{debug, info};
use std::collections::{HashSet, VecDeque};
use std::ops::Range;
use crate::helpers::{accum_key_values, keyspace_total_size, merge_delta_keys, overlaps_with};
use crate::helpers::{
accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with,
};
use crate::interface::*;
use utils::lsn::Lsn;
@@ -535,7 +537,10 @@ where
}
}
// Open stream
let key_value_stream = std::pin::pin!(merge_delta_keys::<E>(deltas.as_slice(), ctx));
let key_value_stream =
std::pin::pin!(merge_delta_keys_buffered::<E>(deltas.as_slice(), ctx)
.await?
.map(Result::<_, anyhow::Error>::Ok));
let mut new_jobs = Vec::new();
// Slide a window through the keyspace

View File

@@ -14,6 +14,7 @@ use std::future::Future;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::task::{ready, Poll};
use utils::lsn::Lsn;
pub fn keyspace_total_size<K>(
keyspace: &CompactionKeySpace<K>,
@@ -109,17 +110,40 @@ pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
}
}
pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
layers: &'a [E::DeltaLayer],
ctx: &'a E::RequestContext,
) -> anyhow::Result<impl Stream<Item = <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>
{
let mut keys = Vec::new();
for l in layers {
// Boxing and casting to LoadFuture is required to obtain the right Sync bound.
// If we do l.load_keys(ctx).await? directly, there is a compilation error.
let load_future: LoadFuture<'a, _> = Box::pin(l.load_keys(ctx));
keys.extend(load_future.await?.into_iter());
}
keys.sort_by_key(|k| (k.key(), k.lsn()));
let stream = futures::stream::iter(keys.into_iter());
Ok(stream)
}
enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
Unloaded(&'a E::DeltaLayer),
}
impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
fn key(&self) -> E::Key {
fn min_key(&self) -> E::Key {
match self {
Self::Loaded(entries) => entries.front().unwrap().key(),
Self::Unloaded(dl) => dl.key_range().start,
}
}
fn min_lsn(&self) -> Lsn {
match self {
Self::Loaded(entries) => entries.front().unwrap().lsn(),
Self::Unloaded(dl) => dl.lsn_range().start,
}
}
}
impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
@@ -129,12 +153,12 @@ impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// reverse order so that we get a min-heap
other.key().cmp(&self.key())
(other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
}
}
impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
fn eq(&self, other: &Self) -> bool {
self.key().eq(&other.key())
self.cmp(other) == std::cmp::Ordering::Equal
}
}
impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}