pageserver: integrate k-merge with bottom-most compaction (#8415)

Use the k-merge iterator in the compaction process to reduce memory
footprint.

part of https://github.com/neondatabase/neon/issues/8002

## Summary of changes

* refactor the bottom-most compaction code to use k-merge iterator
* add Send bound on some structs as it is used across the await points

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
This commit is contained in:
Alex Chi Z.
2024-07-18 12:16:44 -04:00
committed by GitHub
parent d263b1804e
commit a4434cf1c0
9 changed files with 62 additions and 71 deletions

View File

@@ -6810,7 +6810,7 @@ mod tests {
vec![
// Image layer at GC horizon
PersistentLayerKey {
key_range: Key::MIN..get_key(10),
key_range: Key::MIN..Key::MAX,
lsn_range: Lsn(0x30)..Lsn(0x31),
is_delta: false
},

View File

@@ -262,7 +262,7 @@ where
pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
where
R: 'a,
R: 'a + Send,
{
DiskBtreeIterator {
stream: Box::pin(self.into_stream(start_key, ctx)),
@@ -521,7 +521,7 @@ where
pub struct DiskBtreeIterator<'a> {
#[allow(clippy::type_complexity)]
stream: std::pin::Pin<
Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a>,
Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a + Send>,
>,
}

View File

@@ -6,8 +6,6 @@ pub(crate) mod inmemory_layer;
pub(crate) mod layer;
mod layer_desc;
mod layer_name;
#[cfg(test)]
pub mod merge_iterator;
use crate::context::{AccessStatsBehavior, RequestContext};

View File

@@ -33,11 +33,14 @@ use crate::page_cache::{self, FileId, PAGE_SZ};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
@@ -53,6 +56,7 @@ use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
@@ -747,12 +751,10 @@ impl DeltaLayer {
}
impl DeltaLayerInner {
#[cfg(test)]
pub(crate) fn key_range(&self) -> &Range<Key> {
&self.layer_key_range
}
#[cfg(test)]
pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
&self.layer_lsn_range
}
@@ -1512,7 +1514,6 @@ impl DeltaLayerInner {
offset
}
#[cfg(test)]
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
@@ -1523,7 +1524,7 @@ impl DeltaLayerInner {
index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
key_values_batch: std::collections::VecDeque::new(),
is_end: false,
planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner::new(
planner: StreamingVectoredReadPlanner::new(
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
),
@@ -1595,17 +1596,15 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
}
}
#[cfg(test)]
pub struct DeltaLayerIterator<'a> {
delta_layer: &'a DeltaLayerInner,
ctx: &'a RequestContext,
planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner,
index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>,
key_values_batch: std::collections::VecDeque<(Key, Lsn, Value)>,
planner: StreamingVectoredReadPlanner,
index_iter: DiskBtreeIterator<'a>,
key_values_batch: VecDeque<(Key, Lsn, Value)>,
is_end: bool,
}
#[cfg(test)]
impl<'a> DeltaLayerIterator<'a> {
/// Retrieve a batch of key-value pairs into the iterator buffer.
async fn next_batch(&mut self) -> anyhow::Result<()> {

View File

@@ -29,13 +29,16 @@ use crate::page_cache::{self, FileId, PAGE_SZ};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::{
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
@@ -50,6 +53,7 @@ use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
@@ -369,12 +373,10 @@ impl ImageLayer {
}
impl ImageLayerInner {
#[cfg(test)]
pub(crate) fn key_range(&self) -> &Range<Key> {
&self.key_range
}
#[cfg(test)]
pub(crate) fn lsn(&self) -> Lsn {
self.lsn
}
@@ -699,7 +701,6 @@ impl ImageLayerInner {
}
}
#[cfg(test)]
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
@@ -708,9 +709,9 @@ impl ImageLayerInner {
image_layer: self,
ctx,
index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
key_values_batch: std::collections::VecDeque::new(),
key_values_batch: VecDeque::new(),
is_end: false,
planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner::new(
planner: StreamingVectoredReadPlanner::new(
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
),
@@ -974,17 +975,15 @@ impl Drop for ImageLayerWriter {
}
}
#[cfg(test)]
pub struct ImageLayerIterator<'a> {
image_layer: &'a ImageLayerInner,
ctx: &'a RequestContext,
planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner,
index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>,
key_values_batch: std::collections::VecDeque<(Key, Lsn, Value)>,
planner: StreamingVectoredReadPlanner,
index_iter: DiskBtreeIterator<'a>,
key_values_batch: VecDeque<(Key, Lsn, Value)>,
is_end: bool,
}
#[cfg(test)]
impl<'a> ImageLayerIterator<'a> {
/// Retrieve a batch of key-value pairs into the iterator buffer.
async fn next_batch(&mut self) -> anyhow::Result<()> {

View File

@@ -385,6 +385,7 @@ impl Layer {
}
/// Get all key/values in the layer. Should be replaced with an iterator-based API in the future.
#[allow(dead_code)]
pub(crate) async fn load_key_values(
&self,
ctx: &RequestContext,
@@ -1918,7 +1919,7 @@ impl ResidentLayer {
self.owner.metadata()
}
#[cfg(test)]
/// Cast the layer to a delta, return an error if it is an image layer.
pub(crate) async fn get_as_delta(
&self,
ctx: &RequestContext,
@@ -1930,7 +1931,7 @@ impl ResidentLayer {
}
}
#[cfg(test)]
/// Cast the layer to an image, return an error if it is a delta layer.
pub(crate) async fn get_as_image(
&self,
ctx: &RequestContext,

View File

@@ -547,5 +547,9 @@ mod tests {
&ctx,
);
assert_merge_iter_equal(&mut merge_iter, &expect).await;
is_send(merge_iter);
}
fn is_send(_: impl Send) {}
}

View File

@@ -27,6 +27,7 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome};
@@ -1039,10 +1040,12 @@ impl Timeline {
);
// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
// Also, collect the layer information to decide when to split the new delta layers.
let mut all_key_values = Vec::new();
let mut downloaded_layers = Vec::new();
let mut delta_split_points = BTreeSet::new();
for layer in &layer_selection {
all_key_values.extend(layer.load_key_values(ctx).await?);
let resident_layer = layer.download_and_keep_resident().await?;
downloaded_layers.push(resident_layer);
let desc = layer.layer_desc();
if desc.is_delta() {
// TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon)
@@ -1052,44 +1055,28 @@ impl Timeline {
delta_split_points.insert(key_range.end);
}
}
// Key small to large, LSN low to high, if the same LSN has both image and delta due to the merge of delta layers and
// image layers, make image appear before than delta.
struct ValueWrapper<'a>(&'a crate::repository::Value);
impl Ord for ValueWrapper<'_> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
use crate::repository::Value;
use std::cmp::Ordering;
match (self.0, other.0) {
(Value::Image(_), Value::WalRecord(_)) => Ordering::Less,
(Value::WalRecord(_), Value::Image(_)) => Ordering::Greater,
_ => Ordering::Equal,
}
let mut delta_layers = Vec::new();
let mut image_layers = Vec::new();
for resident_layer in &downloaded_layers {
if resident_layer.layer_desc().is_delta() {
let layer = resident_layer.get_as_delta(ctx).await?;
delta_layers.push(layer);
} else {
let layer = resident_layer.get_as_image(ctx).await?;
image_layers.push(layer);
}
}
impl PartialOrd for ValueWrapper<'_> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ValueWrapper<'_> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == std::cmp::Ordering::Equal
}
}
impl Eq for ValueWrapper<'_> {}
all_key_values.sort_by(|(k1, l1, v1), (k2, l2, v2)| {
(k1, l1, ValueWrapper(v1)).cmp(&(k2, l2, ValueWrapper(v2)))
});
let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
// Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
// Data of the same key.
let mut accumulated_values = Vec::new();
let mut last_key = all_key_values.first().unwrap().0; // TODO: assert all_key_values not empty
let mut last_key: Option<Key> = None;
/// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon.
async fn flush_accumulated_states(
tline: &Arc<Timeline>,
key: Key,
accumulated_values: &[&(Key, Lsn, crate::repository::Value)],
accumulated_values: &[(Key, Lsn, crate::repository::Value)],
horizon: Lsn,
) -> anyhow::Result<(Vec<(Key, Lsn, crate::repository::Value)>, bytes::Bytes)> {
let mut base_image = None;
@@ -1190,7 +1177,7 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
&(all_key_values.first().unwrap().0..all_key_values.last().unwrap().0.next()),
&(Key::MIN..Key::MAX), // covers the full key range
gc_cutoff,
ctx,
)
@@ -1200,20 +1187,24 @@ impl Timeline {
let delta_split_points = delta_split_points.into_iter().collect_vec();
let mut current_delta_split_point = 0;
let mut delta_layers = Vec::new();
for item @ (key, _, _) in &all_key_values {
if &last_key == key {
accumulated_values.push(item);
while let Some((key, lsn, val)) = merge_iter.next().await? {
if last_key.is_none() || last_key.as_ref() == Some(&key) {
if last_key.is_none() {
last_key = Some(key);
}
accumulated_values.push((key, lsn, val));
} else {
let last_key = last_key.as_mut().unwrap();
let (deltas, image) =
flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff)
flush_accumulated_states(self, *last_key, &accumulated_values, gc_cutoff)
.await?;
// Put the image into the image layer. Currently we have a single big layer for the compaction.
image_layer_writer.put_image(last_key, image, ctx).await?;
image_layer_writer.put_image(*last_key, image, ctx).await?;
delta_values.extend(deltas);
delta_layers.extend(
flush_deltas(
&mut delta_values,
last_key,
*last_key,
&delta_split_points,
&mut current_delta_split_point,
self,
@@ -1223,11 +1214,12 @@ impl Timeline {
.await?,
);
accumulated_values.clear();
accumulated_values.push(item);
last_key = *key;
*last_key = key;
accumulated_values.push((key, lsn, val));
}
}
let last_key = last_key.expect("no keys produced during compaction");
// TODO: move this part to the loop body
let (deltas, image) =
flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;

View File

@@ -396,7 +396,6 @@ impl<'a> VectoredBlobReader<'a> {
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
/// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and
/// max_cnt constraints.
#[cfg(test)]
pub struct StreamingVectoredReadPlanner {
read_builder: Option<VectoredReadBuilder>,
// Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
@@ -410,7 +409,6 @@ pub struct StreamingVectoredReadPlanner {
cnt: usize,
}
#[cfg(test)]
impl StreamingVectoredReadPlanner {
pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
assert!(max_cnt > 0);