refactor(pageserver): better k-merge implementation for tiered compaction
Signed-off-by: Alex Chi Z <chi@neon.tech>
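
In short: the Stream-based k-merge (`merge_delta_keys` returning `MergeDeltaKeys`, with a hand-rolled `poll_next` over lazily loaded layers) is replaced by a plain `DeltaMergeIterator` that callers drive with `is_end()` and `next().await`, backed by a min-heap of per-layer `LayerIterator` cursors ordered by (key, lsn).

For orientation only, below is a minimal, synchronous sketch of the same idea outside the pageserver code base. `Layer`, `Entry`, `Cursor`, and `MergeIterator` are illustrative stand-ins, not types from this patch, and the async `load_keys` call is reduced to an in-memory step.

use std::cmp::Ordering;
use std::collections::{binary_heap, BinaryHeap, VecDeque};

// Illustrative stand-ins (not types from the patch): a "layer" is a pre-sorted
// batch of (key, lsn) pairs, and a Cursor is the per-layer iterator kept in the heap.
type Entry = (u64, u64); // (key, lsn)

struct Layer {
    entries: Vec<Entry>, // sorted by (key, lsn)
}

// Analogous to `LayerIterator`: starts Unloaded (only the first key/lsn is known)
// and materializes its entries the first time it is actually consumed.
enum Cursor<'a> {
    Unloaded(&'a Layer),
    Loaded(VecDeque<Entry>),
}

impl<'a> Cursor<'a> {
    fn key_lsn(&self) -> Entry {
        match self {
            Cursor::Unloaded(l) => l.entries[0],
            Cursor::Loaded(q) => *q.front().unwrap(),
        }
    }
    fn next(&mut self) -> Entry {
        if let Cursor::Unloaded(l) = *self {
            // In the real code this is the async `load_keys` call.
            *self = Cursor::Loaded(l.entries.iter().copied().collect());
        }
        match self {
            Cursor::Loaded(q) => q.pop_front().expect("already reached the end"),
            Cursor::Unloaded(_) => unreachable!(),
        }
    }
    fn is_end(&self) -> bool {
        matches!(self, Cursor::Loaded(q) if q.is_empty())
    }
}

impl PartialEq for Cursor<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.key_lsn() == other.key_lsn()
    }
}
impl Eq for Cursor<'_> {}
impl PartialOrd for Cursor<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Cursor<'_> {
    fn cmp(&self, other: &Self) -> Ordering {
        // reverse comparison so the max-heap behaves as a min-heap on (key, lsn)
        other.key_lsn().cmp(&self.key_lsn())
    }
}

// Analogous to `DeltaMergeIterator`: a heap of cursors, smallest (key, lsn) on top.
struct MergeIterator<'a> {
    heap: BinaryHeap<Cursor<'a>>,
}

impl<'a> MergeIterator<'a> {
    fn new(layers: &'a [Layer]) -> Self {
        Self { heap: layers.iter().map(Cursor::Unloaded).collect() }
    }
    fn is_end(&self) -> bool {
        self.heap.is_empty()
    }
    fn next(&mut self) -> Entry {
        let mut top = self.heap.peek_mut().expect("already reached the end");
        let entry = top.next();
        if top.is_end() {
            // Remove exhausted cursors; dropping `PeekMut` re-sifts the rest.
            binary_heap::PeekMut::pop(top);
        }
        entry
    }
}

fn main() {
    let layers = [
        Layer { entries: vec![(1, 10), (4, 2)] },
        Layer { entries: vec![(2, 5), (4, 1)] },
    ];
    let mut it = MergeIterator::new(&layers);
    while !it.is_end() {
        println!("{:?}", it.next()); // (1, 10) (2, 5) (4, 1) (4, 2)
    }
}

The real iterator additionally asserts that the lazily loaded keys start at the (key, lsn) the heap ordering was based on, and it pops a cursor whose load failed so the heap never compares a half-broken entry again (see the `Err(e)` arm in `DeltaMergeIterator::next` in the diff below).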
@@ -6,14 +6,10 @@ use futures::future::BoxFuture;
 use futures::{Stream, StreamExt};
 use itertools::Itertools;
 use pageserver_api::shard::ShardIdentity;
-use pin_project_lite::pin_project;
-use std::collections::BinaryHeap;
 use std::collections::VecDeque;
+use std::collections::{binary_heap, BinaryHeap};
 use std::fmt::Display;
-use std::future::Future;
-use std::ops::{DerefMut, Range};
-use std::pin::Pin;
-use std::task::{ready, Poll};
+use std::ops::Range;
 use utils::lsn::Lsn;
 
 pub const PAGE_SZ: u64 = 8192;
@@ -85,33 +81,6 @@ pub fn intersect_keyspace<K: Ord + Clone + Copy>(
     ranges
 }
 
-/// Create a stream that iterates through all DeltaEntrys among all input
-/// layers, in key-lsn order.
-///
-/// This is public because the create_delta() implementation likely wants to use this too
-/// TODO: move to a more shared place
-pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
-    layers: &'a [E::DeltaLayer],
-    ctx: &'a E::RequestContext,
-) -> MergeDeltaKeys<'a, E> {
-    // Use a binary heap to merge the layers. Each input layer is initially
-    // represented by a LazyLoadLayer::Unloaded element, which uses the start of
-    // the layer's key range as the key. The first time a layer reaches the top
-    // of the heap, all the keys of the layer are loaded into a sorted vector.
-    //
-    // This helps to keep the memory usage reasonable: we only need to hold in
-    // memory the DeltaEntrys of the layers that overlap with the "current" key.
-    let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
-    for l in layers {
-        heap.push(LazyLoadLayer::Unloaded(l));
-    }
-    MergeDeltaKeys {
-        heap,
-        ctx,
-        load_future: None,
-    }
-}
-
 pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
     layers: &'a [E::DeltaLayer],
     ctx: &'a E::RequestContext,
@@ -129,104 +98,139 @@ pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
     Ok(stream)
 }
 
-enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
-    Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
-    Unloaded(&'a E::DeltaLayer),
+/// Wrapper type to make `dl.load_keys` compile.
+type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
+
+pub enum LayerIterator<'a, E: CompactionJobExecutor> {
+    Loaded(
+        VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>,
+        &'a E::RequestContext,
+    ),
+    Unloaded(&'a E::DeltaLayer, &'a E::RequestContext),
 }
-impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
-    fn min_key(&self) -> E::Key {
+
+impl<'a, E: CompactionJobExecutor + 'a> LayerIterator<'a, E> {
+    pub fn new(delta_layer: &'a E::DeltaLayer, ctx: &'a E::RequestContext) -> Self {
+        Self::Unloaded(delta_layer, ctx)
+    }
+
+    pub fn key_lsn(&self) -> (E::Key, Lsn) {
         match self {
-            Self::Loaded(entries) => entries.front().unwrap().key(),
-            Self::Unloaded(dl) => dl.key_range().start,
+            Self::Unloaded(dl, _) => (dl.key_range().start, dl.lsn_range().start),
+            Self::Loaded(entries, _) => entries.front().map(|x| (x.key(), x.lsn())).unwrap(),
         }
     }
-    fn min_lsn(&self) -> Lsn {
+
+    async fn load(&mut self) -> anyhow::Result<()> {
         match self {
-            Self::Loaded(entries) => entries.front().unwrap().lsn(),
-            Self::Unloaded(dl) => dl.lsn_range().start,
+            Self::Unloaded(dl, ctx) => {
+                let unloaded_key_lsn = (dl.key_range().start, dl.lsn_range().start);
+                let fut: LoadFuture<
+                    'a,
+                    <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>,
+                > = Box::pin(dl.load_keys(ctx));
+                let keys = VecDeque::from(fut.await?);
+                assert_eq!(
+                    keys.front().as_ref().map(|x| (x.key(), x.lsn())).unwrap(),
+                    unloaded_key_lsn,
+                    "unmatched start key_lsn"
+                );
+                *self = Self::Loaded(keys, ctx);
+                Ok(())
+            }
+            Self::Loaded(_, _) => Ok(()),
         }
     }
+
+    pub async fn entry(
+        &mut self,
+    ) -> anyhow::Result<&<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
+        self.load().await?;
+        let Self::Loaded(x, _) = self else {
+            unreachable!()
+        };
+        Ok(x.front().unwrap())
+    }
+
+    pub async fn next(
+        &mut self,
+    ) -> anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
+        self.load().await?; // requires Box::pin to make it compile
+        let Self::Loaded(x, _) = self else {
+            unreachable!()
+        };
+        Ok(x.pop_front().expect("already reached the end"))
+    }
+
+    pub fn is_end(&self) -> bool {
+        match self {
+            Self::Unloaded(_, _) => false,
+            Self::Loaded(x, _) => x.is_empty(),
+        }
+    }
 }
-impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
+
+impl<'a, E: CompactionJobExecutor + 'a> PartialOrd for LayerIterator<'a, E> {
     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
         Some(self.cmp(other))
     }
 }
-impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
+
+impl<'a, E: CompactionJobExecutor + 'a> Ord for LayerIterator<'a, E> {
     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        // reverse order so that we get a min-heap
-        (other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
+        // reverse comparison to get a min-heap
+        other.key_lsn().cmp(&self.key_lsn())
    }
 }
-impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
+
+impl<'a, E: CompactionJobExecutor + 'a> PartialEq for LayerIterator<'a, E> {
     fn eq(&self, other: &Self) -> bool {
         self.cmp(other) == std::cmp::Ordering::Equal
     }
 }
-impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}
-
-type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
+impl<'a, E: CompactionJobExecutor + 'a> Eq for LayerIterator<'a, E> {}
 
-// Stream returned by `merge_delta_keys`
-pin_project! {
-    #[allow(clippy::type_complexity)]
-    pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
-        heap: BinaryHeap<LazyLoadLayer<'a, E>>,
-
-        #[pin]
-        load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,
-
-        ctx: &'a E::RequestContext,
-    }
+pub struct DeltaMergeIterator<'a, E: CompactionJobExecutor> {
+    heap: BinaryHeap<LayerIterator<'a, E>>,
 }
 
-impl<'a, E> Stream for MergeDeltaKeys<'a, E>
-where
-    E: CompactionJobExecutor + 'a,
-{
-    type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;
+impl<'a, E: CompactionJobExecutor + 'a> DeltaMergeIterator<'a, E> {
+    pub fn new(delta_layers: &'a [E::DeltaLayer], ctx: &'a E::RequestContext) -> Self {
+        let mut heap = BinaryHeap::new();
+        for dl in delta_layers {
+            heap.push(LayerIterator::new(dl, ctx));
+        }
+        Self { heap }
+    }
 
-    fn poll_next(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
-        let mut this = self.project();
-        loop {
-            if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
-                // We are waiting for loading the keys to finish
-                match ready!(load_future.as_mut().poll(cx)) {
-                    Ok(entries) => {
-                        this.load_future.set(None);
-                        *this.heap.peek_mut().unwrap() =
-                            LazyLoadLayer::Loaded(VecDeque::from(entries));
-                    }
-                    Err(e) => {
-                        return Poll::Ready(Some(Err(e)));
-                    }
+    pub fn is_end(&self) -> bool {
+        self.heap.is_empty()
+    }
+
+    /// The next key-lsn entry that will be returned by `next`.
+    pub fn key_lsn(&self) -> (E::Key, Lsn) {
+        self.heap.peek().expect("already reached the end").key_lsn()
+    }
+
+    /// Move to the next entry and return the current entry.
+    pub async fn next(
+        &mut self,
+    ) -> anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
+        let Some(mut top) = self.heap.peek_mut() else {
+            panic!("already reached the end")
+        };
+        match top.next().await {
+            Ok(entry) => {
+                if top.is_end() {
+                    binary_heap::PeekMut::pop(top);
                 }
+                Ok(entry)
             }
-
-            // If the topmost layer in the heap hasn't been loaded yet, start
-            // loading it. Otherwise return the next entry from it and update
-            // the layer's position in the heap (this decreaseKey operation is
-            // performed implicitly when `top` is dropped).
-            if let Some(mut top) = this.heap.peek_mut() {
-                match top.deref_mut() {
-                    LazyLoadLayer::Unloaded(ref mut l) => {
-                        let fut = l.load_keys(this.ctx);
-                        this.load_future.set(Some(Box::pin(fut)));
-                        continue;
-                    }
-                    LazyLoadLayer::Loaded(ref mut entries) => {
-                        let result = entries.pop_front().unwrap();
-                        if entries.is_empty() {
-                            std::collections::binary_heap::PeekMut::pop(top);
-                        }
-                        return Poll::Ready(Some(Ok(result)));
-                    }
-                }
-            } else {
-                return Poll::Ready(None);
+            Err(e) => {
+                // pop the item if there is an error, otherwise it might cause further panic when binary heap compares it after `PeekMut` gets dropped.
+                binary_heap::PeekMut::pop(top);
+                Err(e)
             }
         }
     }
@@ -92,7 +92,9 @@ pub trait CompactionJobExecutor {
     ) -> impl Future<Output = anyhow::Result<()>> + Send;
 }
 
-pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display {
+pub trait CompactionKey:
+    std::cmp::Ord + Clone + Copy + std::fmt::Display + std::fmt::Debug
+{
     const MIN: Self;
     const MAX: Self;
 
@@ -2,7 +2,6 @@ mod draw;
 
 use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};
 
-use futures::StreamExt;
 use pageserver_api::shard::ShardIdentity;
 use rand::Rng;
 use tracing::info;
@@ -15,7 +14,8 @@ use std::sync::Arc;
 use std::sync::Mutex;
 
 use crate::helpers::PAGE_SZ;
-use crate::helpers::{merge_delta_keys, overlaps_with};
+use crate::helpers::overlaps_with;
+use crate::helpers::DeltaMergeIterator;
 
 use crate::interface;
 use crate::interface::CompactionLayer;
@@ -545,12 +545,11 @@ impl interface::CompactionJobExecutor for MockTimeline {
         input_layers: &[Arc<MockDeltaLayer>],
         ctx: &MockRequestContext,
     ) -> anyhow::Result<()> {
-        let mut key_value_stream =
-            std::pin::pin!(merge_delta_keys::<MockTimeline>(input_layers, ctx));
+        let mut key_value_stream = DeltaMergeIterator::<MockTimeline>::new(input_layers, ctx);
         let mut records: Vec<MockRecord> = Vec::new();
         let mut total_len = 2;
-        while let Some(delta_entry) = key_value_stream.next().await {
-            let delta_entry: MockRecord = delta_entry?;
+        while !key_value_stream.is_end() {
+            let delta_entry: MockRecord = key_value_stream.next().await?;
             if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) {
                 total_len += delta_entry.len;
                 records.push(delta_entry);
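
A closing note on the `BinaryHeap` mechanics both the old and the new code rely on: mutating the top element through `peek_mut()` re-establishes heap order when the `PeekMut` guard is dropped (the implicit decrease-key the old `poll_next` comment mentions), while `binary_heap::PeekMut::pop` removes the element outright, which is why the new `next()` eagerly pops cursors that are exhausted or whose load failed. A standalone illustration with plain integers, not code from the patch:

use std::collections::{binary_heap, BinaryHeap};

fn main() {
    let mut heap = BinaryHeap::from([3, 9, 5]);

    // Mutate the current maximum through PeekMut; when the guard is dropped,
    // the heap sifts the changed value back into place (implicit decrease-key).
    if let Some(mut top) = heap.peek_mut() {
        *top = 1;
    }
    assert_eq!(heap.peek(), Some(&5));

    // PeekMut::pop removes the top element instead of re-sifting it. The patch
    // uses this to drop a cursor that is exhausted (or whose load failed) so the
    // heap never compares it again.
    if let Some(top) = heap.peek_mut() {
        let popped = binary_heap::PeekMut::pop(top);
        assert_eq!(popped, 5);
    }
    assert_eq!(heap.into_sorted_vec(), vec![1, 3]);
}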