|
|
|
|
@@ -6,14 +6,10 @@ use futures::future::BoxFuture;
|
|
|
|
|
use futures::future::BoxFuture;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use pageserver_api::shard::ShardIdentity;
use pin_project_lite::pin_project;
use std::collections::{binary_heap, BinaryHeap, VecDeque};
use std::fmt::Display;
use std::future::Future;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::task::{ready, Poll};
use utils::lsn::Lsn;
|
|
|
|
|
|
|
|
|
|
/// Size of one page, in bytes. Presumably matches PostgreSQL's 8 KiB block
/// size — NOTE(review): confirm against the pageserver's page definition.
pub const PAGE_SZ: u64 = 8192;
|
|
|
|
|
@@ -85,33 +81,6 @@ pub fn intersect_keyspace<K: Ord + Clone + Copy>(
|
|
|
|
|
ranges
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Create a stream that iterates through all DeltaEntrys among all input
|
|
|
|
|
/// layers, in key-lsn order.
|
|
|
|
|
///
|
|
|
|
|
/// This is public because the create_delta() implementation likely wants to use this too
|
|
|
|
|
/// TODO: move to a more shared place
|
|
|
|
|
pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
|
|
|
|
|
layers: &'a [E::DeltaLayer],
|
|
|
|
|
ctx: &'a E::RequestContext,
|
|
|
|
|
) -> MergeDeltaKeys<'a, E> {
|
|
|
|
|
// Use a binary heap to merge the layers. Each input layer is initially
|
|
|
|
|
// represented by a LazyLoadLayer::Unloaded element, which uses the start of
|
|
|
|
|
// the layer's key range as the key. The first time a layer reaches the top
|
|
|
|
|
// of the heap, all the keys of the layer are loaded into a sorted vector.
|
|
|
|
|
//
|
|
|
|
|
// This helps to keep the memory usage reasonable: we only need to hold in
|
|
|
|
|
// memory the DeltaEntrys of the layers that overlap with the "current" key.
|
|
|
|
|
let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
|
|
|
|
|
for l in layers {
|
|
|
|
|
heap.push(LazyLoadLayer::Unloaded(l));
|
|
|
|
|
}
|
|
|
|
|
MergeDeltaKeys {
|
|
|
|
|
heap,
|
|
|
|
|
ctx,
|
|
|
|
|
load_future: None,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
|
|
|
|
|
layers: &'a [E::DeltaLayer],
|
|
|
|
|
ctx: &'a E::RequestContext,
|
|
|
|
|
@@ -129,104 +98,139 @@ pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
|
|
|
|
|
Ok(stream)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
|
|
|
|
|
Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
|
|
|
|
|
Unloaded(&'a E::DeltaLayer),
|
|
|
|
|
/// Wrapper type to make `dl.load_keys`` compile.
|
|
|
|
|
type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
|
|
|
|
|
|
|
|
|
|
/// Iterator over the entries of a single delta layer.
///
/// The layer's keys are loaded lazily: the iterator starts in the `Unloaded`
/// state and switches to `Loaded` the first time its entries are needed.
pub enum LayerIterator<'a, E: CompactionJobExecutor> {
    /// Keys have been loaded; remaining entries are consumed from the front.
    Loaded(
        VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>,
        &'a E::RequestContext,
    ),
    /// Keys not loaded yet; holds the layer plus the context needed to load them.
    Unloaded(&'a E::DeltaLayer, &'a E::RequestContext),
}
|
|
|
|
|
impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
|
|
|
|
|
fn min_key(&self) -> E::Key {
|
|
|
|
|
|
|
|
|
|
impl<'a, E: CompactionJobExecutor + 'a> LayerIterator<'a, E> {
|
|
|
|
|
pub fn new(delta_layer: &'a E::DeltaLayer, ctx: &'a E::RequestContext) -> Self {
|
|
|
|
|
Self::Unloaded(delta_layer, ctx)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn key_lsn(&self) -> (E::Key, Lsn) {
|
|
|
|
|
match self {
|
|
|
|
|
Self::Loaded(entries) => entries.front().unwrap().key(),
|
|
|
|
|
Self::Unloaded(dl) => dl.key_range().start,
|
|
|
|
|
Self::Unloaded(dl, _) => (dl.key_range().start, dl.lsn_range().start),
|
|
|
|
|
Self::Loaded(entries, _) => entries.front().map(|x| (x.key(), x.lsn())).unwrap(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
fn min_lsn(&self) -> Lsn {
|
|
|
|
|
|
|
|
|
|
async fn load(&mut self) -> anyhow::Result<()> {
|
|
|
|
|
match self {
|
|
|
|
|
Self::Loaded(entries) => entries.front().unwrap().lsn(),
|
|
|
|
|
Self::Unloaded(dl) => dl.lsn_range().start,
|
|
|
|
|
Self::Unloaded(dl, ctx) => {
|
|
|
|
|
let unloaded_key_lsn = (dl.key_range().start, dl.lsn_range().start);
|
|
|
|
|
let fut: LoadFuture<
|
|
|
|
|
'a,
|
|
|
|
|
<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>,
|
|
|
|
|
> = Box::pin(dl.load_keys(ctx));
|
|
|
|
|
let keys = VecDeque::from(fut.await?);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
keys.front().as_ref().map(|x| (x.key(), x.lsn())).unwrap(),
|
|
|
|
|
unloaded_key_lsn,
|
|
|
|
|
"unmatched start key_lsn"
|
|
|
|
|
);
|
|
|
|
|
*self = Self::Loaded(keys, ctx);
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
Self::Loaded(_, _) => Ok(()),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn entry(
|
|
|
|
|
&mut self,
|
|
|
|
|
) -> anyhow::Result<&<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
|
|
|
|
|
self.load().await?;
|
|
|
|
|
let Self::Loaded(x, _) = self else {
|
|
|
|
|
unreachable!()
|
|
|
|
|
};
|
|
|
|
|
Ok(x.front().unwrap())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn next(
|
|
|
|
|
&mut self,
|
|
|
|
|
) -> anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
|
|
|
|
|
self.load().await?; // requires Box::pin to make it compile
|
|
|
|
|
let Self::Loaded(x, _) = self else {
|
|
|
|
|
unreachable!()
|
|
|
|
|
};
|
|
|
|
|
Ok(x.pop_front().expect("already reached the end"))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn is_end(&self) -> bool {
|
|
|
|
|
match self {
|
|
|
|
|
Self::Unloaded(_, _) => false,
|
|
|
|
|
Self::Loaded(x, _) => x.is_empty(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
|
|
|
|
|
|
|
|
|
|
impl<'a, E: CompactionJobExecutor + 'a> PartialOrd for LayerIterator<'a, E> {
|
|
|
|
|
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
|
|
|
|
Some(self.cmp(other))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
|
|
|
|
|
|
|
|
|
|
impl<'a, E: CompactionJobExecutor + 'a> Ord for LayerIterator<'a, E> {
|
|
|
|
|
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
|
|
|
|
// reverse order so that we get a min-heap
|
|
|
|
|
(other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
|
|
|
|
|
// reverse comparison to get a min-heap
|
|
|
|
|
other.key_lsn().cmp(&self.key_lsn())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
|
|
|
|
|
|
|
|
|
|
impl<'a, E: CompactionJobExecutor + 'a> PartialEq for LayerIterator<'a, E> {
|
|
|
|
|
fn eq(&self, other: &Self) -> bool {
|
|
|
|
|
self.cmp(other) == std::cmp::Ordering::Equal
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}
|
|
|
|
|
|
|
|
|
|
type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
|
|
|
|
|
impl<'a, E: CompactionJobExecutor + 'a> Eq for LayerIterator<'a, E> {}
|
|
|
|
|
|
|
|
|
|
// Stream returned by `merge_delta_keys`
|
|
|
|
|
pin_project! {
|
|
|
|
|
#[allow(clippy::type_complexity)]
|
|
|
|
|
pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
|
|
|
|
|
heap: BinaryHeap<LazyLoadLayer<'a, E>>,
|
|
|
|
|
|
|
|
|
|
#[pin]
|
|
|
|
|
load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,
|
|
|
|
|
|
|
|
|
|
ctx: &'a E::RequestContext,
|
|
|
|
|
}
|
|
|
|
|
pub struct DeltaMergeIterator<'a, E: CompactionJobExecutor> {
|
|
|
|
|
heap: BinaryHeap<LayerIterator<'a, E>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a, E> Stream for MergeDeltaKeys<'a, E>
|
|
|
|
|
where
|
|
|
|
|
E: CompactionJobExecutor + 'a,
|
|
|
|
|
{
|
|
|
|
|
type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;
|
|
|
|
|
impl<'a, E: CompactionJobExecutor + 'a> DeltaMergeIterator<'a, E> {
|
|
|
|
|
pub fn new(delta_layers: &'a [E::DeltaLayer], ctx: &'a E::RequestContext) -> Self {
|
|
|
|
|
let mut heap = BinaryHeap::new();
|
|
|
|
|
for dl in delta_layers {
|
|
|
|
|
heap.push(LayerIterator::new(dl, ctx));
|
|
|
|
|
}
|
|
|
|
|
Self { heap }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn poll_next(
|
|
|
|
|
self: Pin<&mut Self>,
|
|
|
|
|
cx: &mut std::task::Context<'_>,
|
|
|
|
|
) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
|
|
|
|
|
let mut this = self.project();
|
|
|
|
|
loop {
|
|
|
|
|
if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
|
|
|
|
|
// We are waiting for loading the keys to finish
|
|
|
|
|
match ready!(load_future.as_mut().poll(cx)) {
|
|
|
|
|
Ok(entries) => {
|
|
|
|
|
this.load_future.set(None);
|
|
|
|
|
*this.heap.peek_mut().unwrap() =
|
|
|
|
|
LazyLoadLayer::Loaded(VecDeque::from(entries));
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
return Poll::Ready(Some(Err(e)));
|
|
|
|
|
}
|
|
|
|
|
pub fn is_end(&self) -> bool {
|
|
|
|
|
self.heap.is_empty()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// The next key-lsn entry that will be returned by `next`.
|
|
|
|
|
pub fn key_lsn(&self) -> (E::Key, Lsn) {
|
|
|
|
|
self.heap.peek().expect("already reached the end").key_lsn()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Move to the next entry and return the current entry.
|
|
|
|
|
pub async fn next(
|
|
|
|
|
&mut self,
|
|
|
|
|
) -> anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
|
|
|
|
|
let Some(mut top) = self.heap.peek_mut() else {
|
|
|
|
|
panic!("already reached the end")
|
|
|
|
|
};
|
|
|
|
|
match top.next().await {
|
|
|
|
|
Ok(entry) => {
|
|
|
|
|
if top.is_end() {
|
|
|
|
|
binary_heap::PeekMut::pop(top);
|
|
|
|
|
}
|
|
|
|
|
Ok(entry)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If the topmost layer in the heap hasn't been loaded yet, start
|
|
|
|
|
// loading it. Otherwise return the next entry from it and update
|
|
|
|
|
// the layer's position in the heap (this decreaseKey operation is
|
|
|
|
|
// performed implicitly when `top` is dropped).
|
|
|
|
|
if let Some(mut top) = this.heap.peek_mut() {
|
|
|
|
|
match top.deref_mut() {
|
|
|
|
|
LazyLoadLayer::Unloaded(ref mut l) => {
|
|
|
|
|
let fut = l.load_keys(this.ctx);
|
|
|
|
|
this.load_future.set(Some(Box::pin(fut)));
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
LazyLoadLayer::Loaded(ref mut entries) => {
|
|
|
|
|
let result = entries.pop_front().unwrap();
|
|
|
|
|
if entries.is_empty() {
|
|
|
|
|
std::collections::binary_heap::PeekMut::pop(top);
|
|
|
|
|
}
|
|
|
|
|
return Poll::Ready(Some(Ok(result)));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
return Poll::Ready(None);
|
|
|
|
|
Err(e) => {
|
|
|
|
|
// pop the item if there is an error, otherwise it might cause further panic when binary heap compares it after `PeekMut` gets dropped.
|
|
|
|
|
binary_heap::PeekMut::pop(top);
|
|
|
|
|
Err(e)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|