mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 23:12:54 +00:00
feat(pageserver): support split delta layers (#8599)
part of https://github.com/neondatabase/neon/issues/8002 Similar to https://github.com/neondatabase/neon/pull/8574, we add auto-split support for delta layers. Tests are reused from image layer split writers. --------- Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -384,6 +384,9 @@ struct DeltaLayerWriterInner {
|
||||
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
|
||||
|
||||
blob_writer: BlobWriter<true>,
|
||||
|
||||
// Number of key-lsns in the layer.
|
||||
num_keys: usize,
|
||||
}
|
||||
|
||||
impl DeltaLayerWriterInner {
|
||||
@@ -425,6 +428,7 @@ impl DeltaLayerWriterInner {
|
||||
lsn_range,
|
||||
tree: tree_builder,
|
||||
blob_writer,
|
||||
num_keys: 0,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -475,6 +479,9 @@ impl DeltaLayerWriterInner {
|
||||
|
||||
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
|
||||
let res = self.tree.append(&delta_key.0, blob_ref.0);
|
||||
|
||||
self.num_keys += 1;
|
||||
|
||||
(val, res.map_err(|e| anyhow::anyhow!(e)))
|
||||
}
|
||||
|
||||
@@ -686,6 +693,17 @@ impl DeltaLayerWriter {
|
||||
.finish(key_end, timeline, ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn num_keys(&self) -> usize {
|
||||
self.inner.as_ref().unwrap().num_keys
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn estimated_size(&self) -> u64 {
|
||||
let inner = self.inner.as_ref().unwrap();
|
||||
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DeltaLayerWriter {
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
use std::sync::Arc;
|
||||
use std::{ops::Range, sync::Arc};
|
||||
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::{Key, KEY_SIZE};
|
||||
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
|
||||
|
||||
use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline};
|
||||
use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
|
||||
|
||||
use super::{ImageLayerWriter, ResidentLayer};
|
||||
use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
|
||||
|
||||
/// An image writer that takes images and produces multiple image layers. The interface does not
|
||||
/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
|
||||
@@ -98,6 +98,107 @@ impl SplitImageLayerWriter {
|
||||
generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?);
|
||||
Ok(generated_layers)
|
||||
}
|
||||
|
||||
/// When split writer fails, the caller should call this function and handle partially generated layers.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, ImageLayerWriter)> {
|
||||
Ok((self.generated_layers, self.inner))
|
||||
}
|
||||
}
|
||||
|
||||
/// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
|
||||
/// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
|
||||
/// to be cleaned up).
|
||||
#[must_use]
|
||||
pub struct SplitDeltaLayerWriter {
|
||||
inner: DeltaLayerWriter,
|
||||
target_layer_size: u64,
|
||||
generated_layers: Vec<ResidentLayer>,
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
lsn_range: Range<Lsn>,
|
||||
}
|
||||
|
||||
impl SplitDeltaLayerWriter {
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
start_key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
target_layer_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
target_layer_size,
|
||||
inner: DeltaLayerWriter::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
start_key,
|
||||
lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
generated_layers: Vec::new(),
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
lsn_range,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn put_value(
|
||||
&mut self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
|
||||
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
|
||||
let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
|
||||
if self.inner.num_keys() >= 1
|
||||
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
{
|
||||
let next_delta_writer = DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
|
||||
self.generated_layers
|
||||
.push(prev_delta_writer.finish(key, tline, ctx).await?);
|
||||
}
|
||||
self.inner.put_value(key, lsn, val, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish(
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> anyhow::Result<Vec<ResidentLayer>> {
|
||||
let Self {
|
||||
mut generated_layers,
|
||||
inner,
|
||||
..
|
||||
} = self;
|
||||
generated_layers.push(inner.finish(end_key, tline, ctx).await?);
|
||||
Ok(generated_layers)
|
||||
}
|
||||
|
||||
/// When split writer fails, the caller should call this function and handle partially generated layers.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, DeltaLayerWriter)> {
|
||||
Ok((self.generated_layers, self.inner))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -138,7 +239,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut writer = SplitImageLayerWriter::new(
|
||||
let mut image_writer = SplitImageLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
@@ -150,11 +251,42 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
writer
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
image_writer
|
||||
.put_image(get_key(0), get_img(0), &tline, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = writer.finish(&tline, &ctx, get_key(10)).await.unwrap();
|
||||
let layers = image_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(layers.len(), 1);
|
||||
|
||||
delta_writer
|
||||
.put_value(
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
Value::Image(get_img(0)),
|
||||
&tline,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(layers.len(), 1);
|
||||
}
|
||||
|
||||
@@ -170,7 +302,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut writer = SplitImageLayerWriter::new(
|
||||
let mut image_writer = SplitImageLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
@@ -181,26 +313,58 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
const N: usize = 2000;
|
||||
for i in 0..N {
|
||||
let i = i as u32;
|
||||
writer
|
||||
image_writer
|
||||
.put_image(get_key(i), get_large_img(), &tline, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
delta_writer
|
||||
.put_value(
|
||||
get_key(i),
|
||||
Lsn(0x20),
|
||||
Value::Image(get_large_img()),
|
||||
&tline,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let layers = writer
|
||||
let image_layers = image_writer
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(layers.len(), N / 512 + 1);
|
||||
for idx in 0..layers.len() {
|
||||
assert_ne!(layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
let delta_layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(image_layers.len(), N / 512 + 1);
|
||||
assert_eq!(delta_layers.len(), N / 512 + 1);
|
||||
for idx in 0..image_layers.len() {
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
if idx > 0 {
|
||||
assert_eq!(
|
||||
layers[idx - 1].layer_desc().key_range.end,
|
||||
layers[idx].layer_desc().key_range.start
|
||||
image_layers[idx - 1].layer_desc().key_range.end,
|
||||
image_layers[idx].layer_desc().key_range.start
|
||||
);
|
||||
assert_eq!(
|
||||
delta_layers[idx - 1].layer_desc().key_range.end,
|
||||
delta_layers[idx].layer_desc().key_range.start
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -218,7 +382,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut writer = SplitImageLayerWriter::new(
|
||||
let mut image_writer = SplitImageLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
@@ -230,15 +394,56 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
writer
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
image_writer
|
||||
.put_image(get_key(0), get_img(0), &tline, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
writer
|
||||
image_writer
|
||||
.put_image(get_key(1), get_large_img(), &tline, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = writer.finish(&tline, &ctx, get_key(10)).await.unwrap();
|
||||
let layers = image_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(layers.len(), 2);
|
||||
|
||||
delta_writer
|
||||
.put_value(
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
Value::Image(get_img(0)),
|
||||
&tline,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
delta_writer
|
||||
.put_value(
|
||||
get_key(1),
|
||||
Lsn(0x1A),
|
||||
Value::Image(get_large_img()),
|
||||
&tline,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(layers.len(), 2);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user