From a40eee31a76ffbdaf331757eef8e03d2b3abe15a Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 14 May 2024 12:17:05 -0400 Subject: [PATCH] refactor(pageserver): better k-merge implementation for tiered compaction Signed-off-by: Alex Chi Z --- pageserver/compaction/src/helpers.rs | 214 +++++++++++++------------ pageserver/compaction/src/interface.rs | 4 +- pageserver/compaction/src/simulator.rs | 11 +- 3 files changed, 117 insertions(+), 112 deletions(-) diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs index 8ed1d16082..1eeafb0b1c 100644 --- a/pageserver/compaction/src/helpers.rs +++ b/pageserver/compaction/src/helpers.rs @@ -6,14 +6,10 @@ use futures::future::BoxFuture; use futures::{Stream, StreamExt}; use itertools::Itertools; use pageserver_api::shard::ShardIdentity; -use pin_project_lite::pin_project; -use std::collections::BinaryHeap; use std::collections::VecDeque; +use std::collections::{binary_heap, BinaryHeap}; use std::fmt::Display; -use std::future::Future; -use std::ops::{DerefMut, Range}; -use std::pin::Pin; -use std::task::{ready, Poll}; +use std::ops::Range; use utils::lsn::Lsn; pub const PAGE_SZ: u64 = 8192; @@ -85,33 +81,6 @@ pub fn intersect_keyspace( ranges } -/// Create a stream that iterates through all DeltaEntrys among all input -/// layers, in key-lsn order. -/// -/// This is public because the create_delta() implementation likely wants to use this too -/// TODO: move to a more shared place -pub fn merge_delta_keys<'a, E: CompactionJobExecutor>( - layers: &'a [E::DeltaLayer], - ctx: &'a E::RequestContext, -) -> MergeDeltaKeys<'a, E> { - // Use a binary heap to merge the layers. Each input layer is initially - // represented by a LazyLoadLayer::Unloaded element, which uses the start of - // the layer's key range as the key. The first time a layer reaches the top - // of the heap, all the keys of the layer are loaded into a sorted vector. - // - // This helps to keep the memory usage reasonable: we only need to hold in - // memory the DeltaEntrys of the layers that overlap with the "current" key. - let mut heap: BinaryHeap> = BinaryHeap::new(); - for l in layers { - heap.push(LazyLoadLayer::Unloaded(l)); - } - MergeDeltaKeys { - heap, - ctx, - load_future: None, - } -} - pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>( layers: &'a [E::DeltaLayer], ctx: &'a E::RequestContext, @@ -129,104 +98,139 @@ pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>( Ok(stream) } -enum LazyLoadLayer<'a, E: CompactionJobExecutor> { - Loaded(VecDeque<>::DeltaEntry<'a>>), - Unloaded(&'a E::DeltaLayer), +/// Wrapper type to make `dl.load_keys`` compile. +type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result>>; + +pub enum LayerIterator<'a, E: CompactionJobExecutor> { + Loaded( + VecDeque<>::DeltaEntry<'a>>, + &'a E::RequestContext, + ), + Unloaded(&'a E::DeltaLayer, &'a E::RequestContext), } -impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> { - fn min_key(&self) -> E::Key { + +impl<'a, E: CompactionJobExecutor + 'a> LayerIterator<'a, E> { + pub fn new(delta_layer: &'a E::DeltaLayer, ctx: &'a E::RequestContext) -> Self { + Self::Unloaded(delta_layer, ctx) + } + + pub fn key_lsn(&self) -> (E::Key, Lsn) { match self { - Self::Loaded(entries) => entries.front().unwrap().key(), - Self::Unloaded(dl) => dl.key_range().start, + Self::Unloaded(dl, _) => (dl.key_range().start, dl.lsn_range().start), + Self::Loaded(entries, _) => entries.front().map(|x| (x.key(), x.lsn())).unwrap(), } } - fn min_lsn(&self) -> Lsn { + + async fn load(&mut self) -> anyhow::Result<()> { match self { - Self::Loaded(entries) => entries.front().unwrap().lsn(), - Self::Unloaded(dl) => dl.lsn_range().start, + Self::Unloaded(dl, ctx) => { + let unloaded_key_lsn = (dl.key_range().start, dl.lsn_range().start); + let fut: LoadFuture< + 'a, + >::DeltaEntry<'a>, + > = Box::pin(dl.load_keys(ctx)); + let keys = VecDeque::from(fut.await?); + assert_eq!( + keys.front().as_ref().map(|x| (x.key(), x.lsn())).unwrap(), + unloaded_key_lsn, + "unmatched start key_lsn" + ); + *self = Self::Loaded(keys, ctx); + Ok(()) + } + Self::Loaded(_, _) => Ok(()), + } + } + + pub async fn entry( + &mut self, + ) -> anyhow::Result<&>::DeltaEntry<'a>> { + self.load().await?; + let Self::Loaded(x, _) = self else { + unreachable!() + }; + Ok(x.front().unwrap()) + } + + pub async fn next( + &mut self, + ) -> anyhow::Result<>::DeltaEntry<'a>> { + self.load().await?; // requires Box::pin to make it compile + let Self::Loaded(x, _) = self else { + unreachable!() + }; + Ok(x.pop_front().expect("already reached the end")) + } + + pub fn is_end(&self) -> bool { + match self { + Self::Unloaded(_, _) => false, + Self::Loaded(x, _) => x.is_empty(), } } } -impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> { + +impl<'a, E: CompactionJobExecutor + 'a> PartialOrd for LayerIterator<'a, E> { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> { + +impl<'a, E: CompactionJobExecutor + 'a> Ord for LayerIterator<'a, E> { fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // reverse order so that we get a min-heap - (other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn())) + // reverse comparison to get a min-heap + other.key_lsn().cmp(&self.key_lsn()) } } -impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> { + +impl<'a, E: CompactionJobExecutor + 'a> PartialEq for LayerIterator<'a, E> { fn eq(&self, other: &Self) -> bool { self.cmp(other) == std::cmp::Ordering::Equal } } -impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {} -type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result>>; +impl<'a, E: CompactionJobExecutor + 'a> Eq for LayerIterator<'a, E> {} -// Stream returned by `merge_delta_keys` -pin_project! { -#[allow(clippy::type_complexity)] -pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> { - heap: BinaryHeap>, - - #[pin] - load_future: Option>::DeltaEntry<'a>>>, - - ctx: &'a E::RequestContext, -} +pub struct DeltaMergeIterator<'a, E: CompactionJobExecutor> { + heap: BinaryHeap>, } -impl<'a, E> Stream for MergeDeltaKeys<'a, E> -where - E: CompactionJobExecutor + 'a, -{ - type Item = anyhow::Result<>::DeltaEntry<'a>>; +impl<'a, E: CompactionJobExecutor + 'a> DeltaMergeIterator<'a, E> { + pub fn new(delta_layers: &'a [E::DeltaLayer], ctx: &'a E::RequestContext) -> Self { + let mut heap = BinaryHeap::new(); + for dl in delta_layers { + heap.push(LayerIterator::new(dl, ctx)); + } + Self { heap } + } - fn poll_next( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll::Item>> { - let mut this = self.project(); - loop { - if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() { - // We are waiting for loading the keys to finish - match ready!(load_future.as_mut().poll(cx)) { - Ok(entries) => { - this.load_future.set(None); - *this.heap.peek_mut().unwrap() = - LazyLoadLayer::Loaded(VecDeque::from(entries)); - } - Err(e) => { - return Poll::Ready(Some(Err(e))); - } + pub fn is_end(&self) -> bool { + self.heap.is_empty() + } + + /// The next key-lsn entry that will be returned by `next`. + pub fn key_lsn(&self) -> (E::Key, Lsn) { + self.heap.peek().expect("already reached the end").key_lsn() + } + + /// Move to the next entry and return the current entry. + pub async fn next( + &mut self, + ) -> anyhow::Result<>::DeltaEntry<'a>> { + let Some(mut top) = self.heap.peek_mut() else { + panic!("already reached the end") + }; + match top.next().await { + Ok(entry) => { + if top.is_end() { + binary_heap::PeekMut::pop(top); } + Ok(entry) } - - // If the topmost layer in the heap hasn't been loaded yet, start - // loading it. Otherwise return the next entry from it and update - // the layer's position in the heap (this decreaseKey operation is - // performed implicitly when `top` is dropped). - if let Some(mut top) = this.heap.peek_mut() { - match top.deref_mut() { - LazyLoadLayer::Unloaded(ref mut l) => { - let fut = l.load_keys(this.ctx); - this.load_future.set(Some(Box::pin(fut))); - continue; - } - LazyLoadLayer::Loaded(ref mut entries) => { - let result = entries.pop_front().unwrap(); - if entries.is_empty() { - std::collections::binary_heap::PeekMut::pop(top); - } - return Poll::Ready(Some(Ok(result))); - } - } - } else { - return Poll::Ready(None); + Err(e) => { + // pop the item if there is an error, otherwise it might cause further panic when binary heap compares it after `PeekMut` gets dropped. + binary_heap::PeekMut::pop(top); + Err(e) } } } diff --git a/pageserver/compaction/src/interface.rs b/pageserver/compaction/src/interface.rs index 35519b5d0a..07ddc4210c 100644 --- a/pageserver/compaction/src/interface.rs +++ b/pageserver/compaction/src/interface.rs @@ -92,7 +92,9 @@ pub trait CompactionJobExecutor { ) -> impl Future> + Send; } -pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display { +pub trait CompactionKey: + std::cmp::Ord + Clone + Copy + std::fmt::Display + std::fmt::Debug +{ const MIN: Self; const MAX: Self; diff --git a/pageserver/compaction/src/simulator.rs b/pageserver/compaction/src/simulator.rs index a7c8bd5c1f..847e1f03e6 100644 --- a/pageserver/compaction/src/simulator.rs +++ b/pageserver/compaction/src/simulator.rs @@ -2,7 +2,6 @@ mod draw; use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp}; -use futures::StreamExt; use pageserver_api::shard::ShardIdentity; use rand::Rng; use tracing::info; @@ -15,7 +14,8 @@ use std::sync::Arc; use std::sync::Mutex; use crate::helpers::PAGE_SZ; -use crate::helpers::{merge_delta_keys, overlaps_with}; +use crate::helpers::overlaps_with; +use crate::helpers::DeltaMergeIterator; use crate::interface; use crate::interface::CompactionLayer; @@ -545,12 +545,11 @@ impl interface::CompactionJobExecutor for MockTimeline { input_layers: &[Arc], ctx: &MockRequestContext, ) -> anyhow::Result<()> { - let mut key_value_stream = - std::pin::pin!(merge_delta_keys::(input_layers, ctx)); + let mut key_value_stream = DeltaMergeIterator::::new(input_layers, ctx); let mut records: Vec = Vec::new(); let mut total_len = 2; - while let Some(delta_entry) = key_value_stream.next().await { - let delta_entry: MockRecord = delta_entry?; + while !key_value_stream.is_end() { + let delta_entry: MockRecord = key_value_stream.next().await?; if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) { total_len += delta_entry.len; records.push(delta_entry);