diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index a229b59560..4a63491e90 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -1,5 +1,6 @@ //! Common traits and structs for layers +pub mod batch_split_writer; pub mod delta_layer; pub mod filter_iterator; pub mod image_layer; @@ -8,7 +9,6 @@ pub(crate) mod layer; mod layer_desc; mod layer_name; pub mod merge_iterator; -pub mod split_writer; use crate::context::{AccessStatsBehavior, RequestContext}; use crate::repository::Value; diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs similarity index 80% rename from pageserver/src/tenant/storage_layer/split_writer.rs rename to pageserver/src/tenant/storage_layer/batch_split_writer.rs index 45ac0c6668..272e422c90 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs @@ -12,41 +12,154 @@ use super::{ DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer, }; -pub(crate) enum SplitWriterResult { +pub(crate) enum BatchWriterResult { Produced(ResidentLayer), Discarded(PersistentLayerKey), } #[cfg(test)] -impl SplitWriterResult { +impl BatchWriterResult { fn into_resident_layer(self) -> ResidentLayer { match self { - SplitWriterResult::Produced(layer) => layer, - SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"), + BatchWriterResult::Produced(layer) => layer, + BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"), } } fn into_discarded_layer(self) -> PersistentLayerKey { match self { - SplitWriterResult::Produced(_) => panic!("unexpected produced layer"), - SplitWriterResult::Discarded(layer) => layer, + BatchWriterResult::Produced(_) => panic!("unexpected produced layer"), + BatchWriterResult::Discarded(layer) => layer, } } } +enum LayerWriterWrapper { + Image(ImageLayerWriter), + Delta(DeltaLayerWriter), +} + +/// An layer writer that takes unfinished layers and finish them atomically. +#[must_use] +pub struct BatchLayerWriter { + generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>, + conf: &'static PageServerConf, +} + +impl BatchLayerWriter { + pub async fn new(conf: &'static PageServerConf) -> anyhow::Result { + Ok(Self { + generated_layer_writers: Vec::new(), + conf, + }) + } + + pub fn add_unfinished_image_writer( + &mut self, + writer: ImageLayerWriter, + key_range: Range, + lsn: Lsn, + ) { + self.generated_layer_writers.push(( + LayerWriterWrapper::Image(writer), + PersistentLayerKey { + key_range, + lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn), + is_delta: false, + }, + )); + } + + pub fn add_unfinished_delta_writer( + &mut self, + writer: DeltaLayerWriter, + key_range: Range, + lsn_range: Range, + ) { + self.generated_layer_writers.push(( + LayerWriterWrapper::Delta(writer), + PersistentLayerKey { + key_range, + lsn_range, + is_delta: true, + }, + )); + } + + pub(crate) async fn finish_with_discard_fn( + self, + tline: &Arc, + ctx: &RequestContext, + discard_fn: D, + ) -> anyhow::Result> + where + D: Fn(&PersistentLayerKey) -> F, + F: Future, + { + let Self { + generated_layer_writers, + .. + } = self; + let clean_up_layers = |generated_layers: Vec| { + for produced_layer in generated_layers { + if let BatchWriterResult::Produced(resident_layer) = produced_layer { + let layer: Layer = resident_layer.into(); + layer.delete_on_drop(); + } + } + }; + // BEGIN: catch every error and do the recovery in the below section + let mut generated_layers: Vec = Vec::new(); + for (inner, layer_key) in generated_layer_writers { + if discard_fn(&layer_key).await { + generated_layers.push(BatchWriterResult::Discarded(layer_key)); + } else { + let res = match inner { + LayerWriterWrapper::Delta(writer) => { + writer.finish(layer_key.key_range.end, ctx).await + } + LayerWriterWrapper::Image(writer) => { + writer + .finish_with_end_key(layer_key.key_range.end, ctx) + .await + } + }; + let layer = match res { + Ok((desc, path)) => { + match Layer::finish_creating(self.conf, tline, desc, &path) { + Ok(layer) => layer, + Err(e) => { + tokio::fs::remove_file(&path).await.ok(); + clean_up_layers(generated_layers); + return Err(e); + } + } + } + Err(e) => { + // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong, + // so we don't need to remove the layer we just failed to create by ourselves. + clean_up_layers(generated_layers); + return Err(e); + } + }; + generated_layers.push(BatchWriterResult::Produced(layer)); + } + } + // END: catch every error and do the recovery in the above section + Ok(generated_layers) + } +} + /// An image writer that takes images and produces multiple image layers. -/// -/// The interface does not guarantee atomicity (i.e., if the image layer generation -/// fails, there might be leftover files to be cleaned up) #[must_use] pub struct SplitImageLayerWriter { inner: ImageLayerWriter, target_layer_size: u64, - generated_layer_writers: Vec<(ImageLayerWriter, PersistentLayerKey)>, + lsn: Lsn, conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, - lsn: Lsn, + batches: BatchLayerWriter, start_key: Key, } @@ -71,10 +184,10 @@ impl SplitImageLayerWriter { ctx, ) .await?, - generated_layer_writers: Vec::new(), conf, timeline_id, tenant_shard_id, + batches: BatchLayerWriter::new(conf).await?, lsn, start_key, }) @@ -102,16 +215,13 @@ impl SplitImageLayerWriter { ctx, ) .await?; - let layer_key = PersistentLayerKey { - key_range: self.start_key..key, - lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), - is_delta: false, - }; let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer); + self.batches.add_unfinished_image_writer( + prev_image_writer, + self.start_key..key, + self.lsn, + ); self.start_key = key; - - self.generated_layer_writers - .push((prev_image_writer, layer_key)); } self.inner.put_image(key, img, ctx).await } @@ -122,64 +232,18 @@ impl SplitImageLayerWriter { ctx: &RequestContext, end_key: Key, discard_fn: D, - ) -> anyhow::Result> + ) -> anyhow::Result> where D: Fn(&PersistentLayerKey) -> F, F: Future, { let Self { - mut generated_layer_writers, - inner, - .. + mut batches, inner, .. } = self; if inner.num_keys() != 0 { - let layer_key = PersistentLayerKey { - key_range: self.start_key..end_key, - lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), - is_delta: false, - }; - generated_layer_writers.push((inner, layer_key)); + batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn); } - let clean_up_layers = |generated_layers: Vec| { - for produced_layer in generated_layers { - if let SplitWriterResult::Produced(image_layer) = produced_layer { - let layer: Layer = image_layer.into(); - layer.delete_on_drop(); - } - } - }; - // BEGIN: catch every error and do the recovery in the below section - let mut generated_layers = Vec::new(); - for (inner, layer_key) in generated_layer_writers { - if discard_fn(&layer_key).await { - generated_layers.push(SplitWriterResult::Discarded(layer_key)); - } else { - let layer = match inner - .finish_with_end_key(layer_key.key_range.end, ctx) - .await - { - Ok((desc, path)) => { - match Layer::finish_creating(self.conf, tline, desc, &path) { - Ok(layer) => layer, - Err(e) => { - tokio::fs::remove_file(&path).await.ok(); - clean_up_layers(generated_layers); - return Err(e); - } - } - } - Err(e) => { - // ImageLayerWriter::finish will clean up the temporary layer if anything goes wrong, - // so we don't need to remove the layer we just failed to create by ourselves. - clean_up_layers(generated_layers); - return Err(e); - } - }; - generated_layers.push(SplitWriterResult::Produced(layer)); - } - } - // END: catch every error and do the recovery in the above section - Ok(generated_layers) + batches.finish_with_discard_fn(tline, ctx, discard_fn).await } #[cfg(test)] @@ -188,7 +252,7 @@ impl SplitImageLayerWriter { tline: &Arc, ctx: &RequestContext, end_key: Key, - ) -> anyhow::Result> { + ) -> anyhow::Result> { self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false }) .await } @@ -196,9 +260,6 @@ impl SplitImageLayerWriter { /// A delta writer that takes key-lsn-values and produces multiple delta layers. /// -/// The interface does not guarantee atomicity (i.e., if the delta layer generation fails, -/// there might be leftover files to be cleaned up). -/// /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm /// will split them into multiple files based on size. @@ -206,12 +267,12 @@ impl SplitImageLayerWriter { pub struct SplitDeltaLayerWriter { inner: Option<(Key, DeltaLayerWriter)>, target_layer_size: u64, - generated_layer_writers: Vec<(DeltaLayerWriter, PersistentLayerKey)>, conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, lsn_range: Range, last_key_written: Key, + batches: BatchLayerWriter, } impl SplitDeltaLayerWriter { @@ -225,12 +286,12 @@ impl SplitDeltaLayerWriter { Ok(Self { target_layer_size, inner: None, - generated_layer_writers: Vec::new(), conf, timeline_id, tenant_shard_id, lsn_range, last_key_written: Key::MIN, + batches: BatchLayerWriter::new(conf).await?, }) } @@ -279,13 +340,11 @@ impl SplitDeltaLayerWriter { .await?; let (start_key, prev_delta_writer) = std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap(); - let layer_key = PersistentLayerKey { - key_range: start_key..key, - lsn_range: self.lsn_range.clone(), - is_delta: true, - }; - self.generated_layer_writers - .push((prev_delta_writer, layer_key)); + self.batches.add_unfinished_delta_writer( + prev_delta_writer, + start_key..key, + self.lsn_range.clone(), + ); } else if inner.estimated_size() >= S3_UPLOAD_LIMIT { // We have to produce a very large file b/c a key is updated too often. anyhow::bail!( @@ -305,64 +364,25 @@ impl SplitDeltaLayerWriter { tline: &Arc, ctx: &RequestContext, discard_fn: D, - ) -> anyhow::Result> + ) -> anyhow::Result> where D: Fn(&PersistentLayerKey) -> F, F: Future, { let Self { - mut generated_layer_writers, - inner, - .. + mut batches, inner, .. } = self; if let Some((start_key, writer)) = inner { if writer.num_keys() != 0 { let end_key = self.last_key_written.next(); - let layer_key = PersistentLayerKey { - key_range: start_key..end_key, - lsn_range: self.lsn_range.clone(), - is_delta: true, - }; - generated_layer_writers.push((writer, layer_key)); + batches.add_unfinished_delta_writer( + writer, + start_key..end_key, + self.lsn_range.clone(), + ); } } - let clean_up_layers = |generated_layers: Vec| { - for produced_layer in generated_layers { - if let SplitWriterResult::Produced(delta_layer) = produced_layer { - let layer: Layer = delta_layer.into(); - layer.delete_on_drop(); - } - } - }; - // BEGIN: catch every error and do the recovery in the below section - let mut generated_layers = Vec::new(); - for (inner, layer_key) in generated_layer_writers { - if discard_fn(&layer_key).await { - generated_layers.push(SplitWriterResult::Discarded(layer_key)); - } else { - let layer = match inner.finish(layer_key.key_range.end, ctx).await { - Ok((desc, path)) => { - match Layer::finish_creating(self.conf, tline, desc, &path) { - Ok(layer) => layer, - Err(e) => { - tokio::fs::remove_file(&path).await.ok(); - clean_up_layers(generated_layers); - return Err(e); - } - } - } - Err(e) => { - // DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong, - // so we don't need to remove the layer we just failed to create by ourselves. - clean_up_layers(generated_layers); - return Err(e); - } - }; - generated_layers.push(SplitWriterResult::Produced(layer)); - } - } - // END: catch every error and do the recovery in the above section - Ok(generated_layers) + batches.finish_with_discard_fn(tline, ctx, discard_fn).await } #[cfg(test)] @@ -370,7 +390,7 @@ impl SplitDeltaLayerWriter { self, tline: &Arc, ctx: &RequestContext, - ) -> anyhow::Result> { + ) -> anyhow::Result> { self.finish_with_discard_fn(tline, ctx, |_| async { false }) .await } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index fa058833d4..ff2be1780e 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -1009,7 +1009,7 @@ impl ImageLayerWriter { self.inner.take().unwrap().finish(ctx, None).await } - /// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive. + /// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive. pub(super) async fn finish_with_end_key( mut self, end_key: Key, diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 37d907ddcb..6aa5b30f07 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -32,11 +32,11 @@ use crate::page_cache; use crate::statvfs::Statvfs; use crate::tenant::checks::check_valid_layermap; use crate::tenant::remote_timeline_client::WaitCompletionError; +use crate::tenant::storage_layer::batch_split_writer::{ + BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter, +}; use crate::tenant::storage_layer::filter_iterator::FilterIterator; use crate::tenant::storage_layer::merge_iterator::MergeIterator; -use crate::tenant::storage_layer::split_writer::{ - SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult, -}; use crate::tenant::storage_layer::{ AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState, }; @@ -2038,11 +2038,11 @@ impl Timeline { let produced_image_layers_len = produced_image_layers.len(); for action in produced_delta_layers { match action { - SplitWriterResult::Produced(layer) => { + BatchWriterResult::Produced(layer) => { stat.produce_delta_layer(layer.layer_desc().file_size()); compact_to.push(layer); } - SplitWriterResult::Discarded(l) => { + BatchWriterResult::Discarded(l) => { keep_layers.insert(l); stat.discard_delta_layer(); } @@ -2050,11 +2050,11 @@ impl Timeline { } for action in produced_image_layers { match action { - SplitWriterResult::Produced(layer) => { + BatchWriterResult::Produced(layer) => { stat.produce_image_layer(layer.layer_desc().file_size()); compact_to.push(layer); } - SplitWriterResult::Discarded(l) => { + BatchWriterResult::Discarded(l) => { keep_layers.insert(l); stat.discard_image_layer(); }