diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index c24d037dde..c1fe67c87c 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -33,6 +33,7 @@ use utils::sync::gate::GateGuard; use utils::lsn::Lsn; +pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter}; pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef}; pub use image_layer::{ImageLayer, ImageLayerWriter}; pub use inmemory_layer::InMemoryLayer; diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs index 8a397ceb7a..22d8b81bcc 100644 --- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs +++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs @@ -87,6 +87,23 @@ impl BatchLayerWriter { )); } + pub(crate) async fn finish( + self, + tline: &Arc, + ctx: &RequestContext, + ) -> anyhow::Result> { + let res = self + .finish_with_discard_fn(tline, ctx, |_| async { false }) + .await?; + let mut output = Vec::new(); + for r in res { + if let BatchWriterResult::Produced(layer) = r { + output.push(layer); + } + } + Ok(output) + } + pub(crate) async fn finish_with_discard_fn( self, tline: &Arc, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 076220df51..2033ebcdeb 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -70,6 +70,7 @@ use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; use crate::l0_flush::{self, L0FlushGlobalState}; +use crate::tenant::storage_layer::ImageLayerName; use crate::{ aux_file::AuxFileSizeEstimator, page_service::TenantManagerTypes, @@ -78,7 +79,7 @@ use crate::{ layer_map::{LayerMap, SearchResult}, metadata::TimelineMetadata, storage_layer::{ - inmemory_layer::IndexEntry, IoConcurrency, PersistentLayerDesc, + inmemory_layer::IndexEntry, BatchLayerWriter, IoConcurrency, PersistentLayerDesc, ValueReconstructSituation, }, }, @@ -933,7 +934,7 @@ pub(crate) enum ShutdownMode { } struct ImageLayerCreationOutcome { - image: Option, + unfinished_image_layer: Option, next_start_key: Key, } @@ -4405,11 +4406,15 @@ impl Timeline { if wrote_keys { // Normal path: we have written some data into the new image layer for this // partition, so flush it to disk. - let (desc, path) = image_layer_writer.finish(ctx).await?; - let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?; - info!("created image layer for rel {}", image_layer.local_path()); + info!( + "produced image layer for rel {}", + ImageLayerName { + key_range: img_range.clone(), + lsn + }, + ); Ok(ImageLayerCreationOutcome { - image: Some(image_layer), + unfinished_image_layer: Some(image_layer_writer), next_start_key: img_range.end, }) } else { @@ -4419,7 +4424,7 @@ impl Timeline { // layer we write will cover the key range that we just scanned. tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); Ok(ImageLayerCreationOutcome { - image: None, + unfinished_image_layer: None, next_start_key: start, }) } @@ -4468,7 +4473,7 @@ impl Timeline { if !trigger_generation && mode == ImageLayerCreationMode::Try { return Ok(ImageLayerCreationOutcome { - image: None, + unfinished_image_layer: None, next_start_key: img_range.end, }); } @@ -4494,14 +4499,15 @@ impl Timeline { if wrote_any_image { // Normal path: we have written some data into the new image layer for this // partition, so flush it to disk. - let (desc, path) = image_layer_writer.finish(ctx).await?; - let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?; info!( "created image layer for metadata {}", - image_layer.local_path() + ImageLayerName { + key_range: img_range.clone(), + lsn + } ); Ok(ImageLayerCreationOutcome { - image: Some(image_layer), + unfinished_image_layer: Some(image_layer_writer), next_start_key: img_range.end, }) } else { @@ -4511,7 +4517,7 @@ impl Timeline { // layer we write will cover the key range that we just scanned. tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); Ok(ImageLayerCreationOutcome { - image: None, + unfinished_image_layer: None, next_start_key: start, }) } @@ -4578,7 +4584,6 @@ impl Timeline { ctx: &RequestContext, ) -> Result, CreateImageLayersError> { let timer = self.metrics.create_images_time_histo.start_timer(); - let mut image_layers = Vec::new(); // We need to avoid holes between generated image layers. // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one @@ -4593,6 +4598,8 @@ impl Timeline { let check_for_image_layers = self.should_check_if_image_layers_required(lsn); + let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?; + for partition in partitioning.parts.iter() { if self.cancel.is_cancelled() { return Err(CreateImageLayersError::Cancelled); @@ -4665,45 +4672,45 @@ impl Timeline { .map_err(|_| CreateImageLayersError::Cancelled)?, ); - if !compact_metadata { - let ImageLayerCreationOutcome { - image, - next_start_key, - } = self - .create_image_layer_for_rel_blocks( - partition, - image_layer_writer, - lsn, - ctx, - img_range, - start, - io_concurrency, - ) - .await?; - - start = next_start_key; - image_layers.extend(image); + let ImageLayerCreationOutcome { + unfinished_image_layer, + next_start_key, + } = if !compact_metadata { + self.create_image_layer_for_rel_blocks( + partition, + image_layer_writer, + lsn, + ctx, + img_range.clone(), + start, + io_concurrency, + ) + .await? } else { - let ImageLayerCreationOutcome { - image, - next_start_key, - } = self - .create_image_layer_for_metadata_keys( - partition, - image_layer_writer, - lsn, - ctx, - img_range, - mode, - start, - io_concurrency, - ) - .await?; - start = next_start_key; - image_layers.extend(image); + self.create_image_layer_for_metadata_keys( + partition, + image_layer_writer, + lsn, + ctx, + img_range.clone(), + mode, + start, + io_concurrency, + ) + .await? + }; + start = next_start_key; + if let Some(unfinished_image_layer) = unfinished_image_layer { + batch_image_writer.add_unfinished_image_writer( + unfinished_image_layer, + img_range, + lsn, + ); } } + let image_layers = batch_image_writer.finish(self, ctx).await?; + let mut guard = self.layers.write().await; // FIXME: we could add the images to be uploaded *before* returning from here, but right diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 28c3381318..ad19738bc2 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -3197,7 +3197,7 @@ impl TimelineAdaptor { // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly let start = Key::MIN; let ImageLayerCreationOutcome { - image, + unfinished_image_layer, next_start_key: _, } = self .timeline @@ -3212,7 +3212,10 @@ impl TimelineAdaptor { ) .await?; - if let Some(image_layer) = image { + if let Some(image_layer_writer) = unfinished_image_layer { + let (desc, path) = image_layer_writer.finish(ctx).await?; + let image_layer = + Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?; self.new_images.push(image_layer); }