diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index f9becf53ff..e50fc2a266 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -384,6 +384,9 @@ struct DeltaLayerWriterInner { tree: DiskBtreeBuilder, blob_writer: BlobWriter, + + // Number of key-lsns in the layer. + num_keys: usize, } impl DeltaLayerWriterInner { @@ -425,6 +428,7 @@ impl DeltaLayerWriterInner { lsn_range, tree: tree_builder, blob_writer, + num_keys: 0, }) } @@ -475,6 +479,9 @@ impl DeltaLayerWriterInner { let delta_key = DeltaKey::from_key_lsn(&key, lsn); let res = self.tree.append(&delta_key.0, blob_ref.0); + + self.num_keys += 1; + (val, res.map_err(|e| anyhow::anyhow!(e))) } @@ -686,6 +693,17 @@ impl DeltaLayerWriter { .finish(key_end, timeline, ctx) .await } + + #[cfg(test)] + pub(crate) fn num_keys(&self) -> usize { + self.inner.as_ref().unwrap().num_keys + } + + #[cfg(test)] + pub(crate) fn estimated_size(&self) -> u64 { + let inner = self.inner.as_ref().unwrap(); + inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64 + } } impl Drop for DeltaLayerWriter { diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index a4091a890c..a966775f9e 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -1,12 +1,12 @@ -use std::sync::Arc; +use std::{ops::Range, sync::Arc}; use bytes::Bytes; use pageserver_api::key::{Key, KEY_SIZE}; use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId}; -use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline}; +use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline}; -use super::{ImageLayerWriter, ResidentLayer}; +use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer}; /// An image writer that takes images and produces multiple image layers. The interface does not /// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files @@ -98,6 +98,107 @@ impl SplitImageLayerWriter { generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?); Ok(generated_layers) } + + /// When split writer fails, the caller should call this function and handle partially generated layers. + #[allow(dead_code)] + pub(crate) async fn take(self) -> anyhow::Result<(Vec, ImageLayerWriter)> { + Ok((self.generated_layers, self.inner)) + } +} + +/// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not +/// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files +/// to be cleaned up). +#[must_use] +pub struct SplitDeltaLayerWriter { + inner: DeltaLayerWriter, + target_layer_size: u64, + generated_layers: Vec, + conf: &'static PageServerConf, + timeline_id: TimelineId, + tenant_shard_id: TenantShardId, + lsn_range: Range, +} + +impl SplitDeltaLayerWriter { + pub async fn new( + conf: &'static PageServerConf, + timeline_id: TimelineId, + tenant_shard_id: TenantShardId, + start_key: Key, + lsn_range: Range, + target_layer_size: u64, + ctx: &RequestContext, + ) -> anyhow::Result { + Ok(Self { + target_layer_size, + inner: DeltaLayerWriter::new( + conf, + timeline_id, + tenant_shard_id, + start_key, + lsn_range.clone(), + ctx, + ) + .await?, + generated_layers: Vec::new(), + conf, + timeline_id, + tenant_shard_id, + lsn_range, + }) + } + + pub async fn put_value( + &mut self, + key: Key, + lsn: Lsn, + val: Value, + tline: &Arc, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate + // number, and therefore the final layer size could be a little bit larger or smaller than the target. + let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */; + if self.inner.num_keys() >= 1 + && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size + { + let next_delta_writer = DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + key, + self.lsn_range.clone(), + ctx, + ) + .await?; + let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer); + self.generated_layers + .push(prev_delta_writer.finish(key, tline, ctx).await?); + } + self.inner.put_value(key, lsn, val, ctx).await + } + + pub(crate) async fn finish( + self, + tline: &Arc, + ctx: &RequestContext, + end_key: Key, + ) -> anyhow::Result> { + let Self { + mut generated_layers, + inner, + .. + } = self; + generated_layers.push(inner.finish(end_key, tline, ctx).await?); + Ok(generated_layers) + } + + /// When split writer fails, the caller should call this function and handle partially generated layers. + #[allow(dead_code)] + pub(crate) async fn take(self) -> anyhow::Result<(Vec, DeltaLayerWriter)> { + Ok((self.generated_layers, self.inner)) + } } #[cfg(test)] @@ -138,7 +239,7 @@ mod tests { .await .unwrap(); - let mut writer = SplitImageLayerWriter::new( + let mut image_writer = SplitImageLayerWriter::new( tenant.conf, tline.timeline_id, tenant.tenant_shard_id, @@ -150,11 +251,42 @@ mod tests { .await .unwrap(); - writer + let mut delta_writer = SplitDeltaLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18)..Lsn(0x20), + 4 * 1024 * 1024, + &ctx, + ) + .await + .unwrap(); + + image_writer .put_image(get_key(0), get_img(0), &tline, &ctx) .await .unwrap(); - let layers = writer.finish(&tline, &ctx, get_key(10)).await.unwrap(); + let layers = image_writer + .finish(&tline, &ctx, get_key(10)) + .await + .unwrap(); + assert_eq!(layers.len(), 1); + + delta_writer + .put_value( + get_key(0), + Lsn(0x18), + Value::Image(get_img(0)), + &tline, + &ctx, + ) + .await + .unwrap(); + let layers = delta_writer + .finish(&tline, &ctx, get_key(10)) + .await + .unwrap(); assert_eq!(layers.len(), 1); } @@ -170,7 +302,7 @@ mod tests { .await .unwrap(); - let mut writer = SplitImageLayerWriter::new( + let mut image_writer = SplitImageLayerWriter::new( tenant.conf, tline.timeline_id, tenant.tenant_shard_id, @@ -181,26 +313,58 @@ mod tests { ) .await .unwrap(); + let mut delta_writer = SplitDeltaLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18)..Lsn(0x20), + 4 * 1024 * 1024, + &ctx, + ) + .await + .unwrap(); const N: usize = 2000; for i in 0..N { let i = i as u32; - writer + image_writer .put_image(get_key(i), get_large_img(), &tline, &ctx) .await .unwrap(); + delta_writer + .put_value( + get_key(i), + Lsn(0x20), + Value::Image(get_large_img()), + &tline, + &ctx, + ) + .await + .unwrap(); } - let layers = writer + let image_layers = image_writer .finish(&tline, &ctx, get_key(N as u32)) .await .unwrap(); - assert_eq!(layers.len(), N / 512 + 1); - for idx in 0..layers.len() { - assert_ne!(layers[idx].layer_desc().key_range.start, Key::MIN); - assert_ne!(layers[idx].layer_desc().key_range.end, Key::MAX); + let delta_layers = delta_writer + .finish(&tline, &ctx, get_key(N as u32)) + .await + .unwrap(); + assert_eq!(image_layers.len(), N / 512 + 1); + assert_eq!(delta_layers.len(), N / 512 + 1); + for idx in 0..image_layers.len() { + assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN); + assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX); + assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN); + assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX); if idx > 0 { assert_eq!( - layers[idx - 1].layer_desc().key_range.end, - layers[idx].layer_desc().key_range.start + image_layers[idx - 1].layer_desc().key_range.end, + image_layers[idx].layer_desc().key_range.start + ); + assert_eq!( + delta_layers[idx - 1].layer_desc().key_range.end, + delta_layers[idx].layer_desc().key_range.start ); } } @@ -218,7 +382,7 @@ mod tests { .await .unwrap(); - let mut writer = SplitImageLayerWriter::new( + let mut image_writer = SplitImageLayerWriter::new( tenant.conf, tline.timeline_id, tenant.tenant_shard_id, @@ -230,15 +394,56 @@ mod tests { .await .unwrap(); - writer + let mut delta_writer = SplitDeltaLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18)..Lsn(0x20), + 4 * 1024, + &ctx, + ) + .await + .unwrap(); + + image_writer .put_image(get_key(0), get_img(0), &tline, &ctx) .await .unwrap(); - writer + image_writer .put_image(get_key(1), get_large_img(), &tline, &ctx) .await .unwrap(); - let layers = writer.finish(&tline, &ctx, get_key(10)).await.unwrap(); + let layers = image_writer + .finish(&tline, &ctx, get_key(10)) + .await + .unwrap(); + assert_eq!(layers.len(), 2); + + delta_writer + .put_value( + get_key(0), + Lsn(0x18), + Value::Image(get_img(0)), + &tline, + &ctx, + ) + .await + .unwrap(); + delta_writer + .put_value( + get_key(1), + Lsn(0x1A), + Value::Image(get_large_img()), + &tline, + &ctx, + ) + .await + .unwrap(); + let layers = delta_writer + .finish(&tline, &ctx, get_key(10)) + .await + .unwrap(); assert_eq!(layers.len(), 2); } }