feat(pageserver): support auto split layers based on size (#8574)

part of https://github.com/neondatabase/neon/issues/8002

## Summary of changes

Add a `SplitImageWriter` that automatically splits image layer based on
estimated target image layer size. This does not consider compression
and we might need a better metrics.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
This commit is contained in:
Alex Chi Z.
2024-08-05 13:55:36 +08:00
committed by Arpad Müller
parent 8624aabc98
commit fde8aa103e
3 changed files with 303 additions and 2 deletions

View File

@@ -8,6 +8,9 @@ mod layer_desc;
mod layer_name;
pub mod merge_iterator;
#[cfg(test)]
pub mod split_writer;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::walrecord::NeonWalRecord;

View File

@@ -742,8 +742,14 @@ struct ImageLayerWriterInner {
// where we have chosen their compressed form
uncompressed_bytes_chosen: u64,
// Number of keys in the layer.
num_keys: usize,
blob_writer: BlobWriter<false>,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
#[cfg_attr(not(feature = "testing"), allow(dead_code))]
last_written_key: Key,
}
impl ImageLayerWriterInner {
@@ -800,6 +806,8 @@ impl ImageLayerWriterInner {
uncompressed_bytes: 0,
uncompressed_bytes_eligible: 0,
uncompressed_bytes_chosen: 0,
num_keys: 0,
last_written_key: Key::MIN,
};
Ok(writer)
@@ -820,6 +828,7 @@ impl ImageLayerWriterInner {
let compression = self.conf.image_compression;
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
self.num_keys += 1;
let (_img, res) = self
.blob_writer
.write_blob_maybe_compressed(img, ctx, compression)
@@ -839,6 +848,11 @@ impl ImageLayerWriterInner {
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;
#[cfg(feature = "testing")]
{
self.last_written_key = key;
}
Ok(())
}
@@ -849,6 +863,7 @@ impl ImageLayerWriterInner {
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<ResidentLayer> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -899,11 +914,23 @@ impl ImageLayerWriterInner {
let desc = PersistentLayerDesc::new_img(
self.tenant_shard_id,
self.timeline_id,
self.key_range.clone(),
if let Some(end_key) = end_key {
self.key_range.start..end_key
} else {
self.key_range.clone()
},
self.lsn,
metadata.len(),
);
#[cfg(feature = "testing")]
if let Some(end_key) = end_key {
assert!(
self.last_written_key < end_key,
"written key violates end_key range"
);
}
// Note: Because we open the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
// set inner.file here. The first read will have to re-open it.
@@ -980,6 +1007,18 @@ impl ImageLayerWriter {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
#[cfg(test)]
/// Estimated size of the image layer.
pub(crate) fn estimated_size(&self) -> u64 {
let inner = self.inner.as_ref().unwrap();
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
}
#[cfg(test)]
pub(crate) fn num_keys(&self) -> usize {
self.inner.as_ref().unwrap().num_keys
}
///
/// Finish writing the image layer.
///
@@ -988,7 +1027,22 @@ impl ImageLayerWriter {
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<super::ResidentLayer> {
self.inner.take().unwrap().finish(timeline, ctx).await
self.inner.take().unwrap().finish(timeline, ctx, None).await
}
#[cfg(test)]
/// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
pub(super) async fn finish_with_end_key(
mut self,
timeline: &Arc<Timeline>,
end_key: Key,
ctx: &RequestContext,
) -> anyhow::Result<super::ResidentLayer> {
self.inner
.take()
.unwrap()
.finish(timeline, ctx, Some(end_key))
.await
}
}

View File

@@ -0,0 +1,244 @@
use std::sync::Arc;
use bytes::Bytes;
use pageserver_api::key::{Key, KEY_SIZE};
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline};
use super::{ImageLayerWriter, ResidentLayer};
/// An image writer that takes images and produces multiple image layers. The interface does not
/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
/// to be cleaned up)
#[must_use]
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
target_layer_size: u64,
generated_layers: Vec<ResidentLayer>,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn: Lsn,
}
impl SplitImageLayerWriter {
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_key: Key,
lsn: Lsn,
target_layer_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
inner: ImageLayerWriter::new(
conf,
timeline_id,
tenant_shard_id,
&(start_key..Key::MAX),
lsn,
ctx,
)
.await?,
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn,
})
}
pub async fn put_image(
&mut self,
key: Key,
img: Bytes,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
if self.inner.num_keys() >= 1
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
{
let next_image_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&(key..Key::MAX),
self.lsn,
ctx,
)
.await?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.generated_layers.push(
prev_image_writer
.finish_with_end_key(tline, key, ctx)
.await?,
);
}
self.inner.put_image(key, img, ctx).await
}
pub(crate) async fn finish(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<ResidentLayer>> {
let Self {
mut generated_layers,
inner,
..
} = self;
generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?);
Ok(generated_layers)
}
}
#[cfg(test)]
mod tests {
use crate::{
tenant::{
harness::{TenantHarness, TIMELINE_ID},
storage_layer::AsLayerDesc,
},
DEFAULT_PG_VERSION,
};
use super::*;
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
fn get_img(id: u32) -> Bytes {
format!("{id:064}").into()
}
fn get_large_img() -> Bytes {
vec![0; 8192].into()
}
#[tokio::test]
async fn write_one_image() {
let harness = TenantHarness::create("split_writer_write_one_image")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
let mut writer = SplitImageLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
writer
.put_image(get_key(0), get_img(0), &tline, &ctx)
.await
.unwrap();
let layers = writer.finish(&tline, &ctx, get_key(10)).await.unwrap();
assert_eq!(layers.len(), 1);
}
#[tokio::test]
async fn write_split() {
let harness = TenantHarness::create("split_writer_write_split")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
let mut writer = SplitImageLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
const N: usize = 2000;
for i in 0..N {
let i = i as u32;
writer
.put_image(get_key(i), get_large_img(), &tline, &ctx)
.await
.unwrap();
}
let layers = writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
assert_eq!(layers.len(), N / 512 + 1);
for idx in 0..layers.len() {
assert_ne!(layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(layers[idx].layer_desc().key_range.end, Key::MAX);
if idx > 0 {
assert_eq!(
layers[idx - 1].layer_desc().key_range.end,
layers[idx].layer_desc().key_range.start
);
}
}
}
#[tokio::test]
async fn write_large_img() {
let harness = TenantHarness::create("split_writer_write_large_img")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
let mut writer = SplitImageLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18),
4 * 1024,
&ctx,
)
.await
.unwrap();
writer
.put_image(get_key(0), get_img(0), &tline, &ctx)
.await
.unwrap();
writer
.put_image(get_key(1), get_large_img(), &tline, &ctx)
.await
.unwrap();
let layers = writer.finish(&tline, &ctx, get_key(10)).await.unwrap();
assert_eq!(layers.len(), 2);
}
}