From 8ee5588a6fa710764f86a8b53017d769a6dfdd16 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 30 Oct 2024 16:37:51 -0400 Subject: [PATCH] refactor(pageserver): make image layer creation atomic Signed-off-by: Alex Chi Z --- .../storage_layer/batch_split_writer.rs | 2 + .../src/tenant/storage_layer/image_layer.rs | 9 ++ pageserver/src/tenant/timeline.rs | 100 ++++++++++++------ pageserver/src/tenant/timeline/compaction.rs | 5 +- 4 files changed, 81 insertions(+), 35 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs index 8a397ceb7a..97a5431b33 100644 --- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs +++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs @@ -120,6 +120,8 @@ impl BatchLayerWriter { writer.finish(layer_key.key_range.end, ctx).await } LayerWriterWrapper::Image(writer) => { + assert_eq!(writer.key_range().start, layer_key.key_range.start); + assert_eq!(writer.lsn(), layer_key.lsn_range.start); writer .finish_with_end_key(layer_key.key_range.end, ctx) .await diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 834d1931d0..bd9d3ab4f4 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -885,6 +885,7 @@ impl ImageLayerWriterInner { } let final_key_range = if let Some(end_key) = end_key { + assert!(end_key <= self.key_range.end); self.key_range.start..end_key } else { self.key_range.clone() @@ -1033,6 +1034,14 @@ impl ImageLayerWriter { ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { self.inner.take().unwrap().finish(ctx, Some(end_key)).await } + + pub(crate) fn key_range(&self) -> Range<Key> { + self.inner.as_ref().unwrap().key_range.clone() + } + + pub(crate) fn lsn(&self) -> Lsn { + self.inner.as_ref().unwrap().lsn + } } impl Drop for ImageLayerWriter { diff --git 
a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 12919866a3..5c10360435 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -20,6 +20,7 @@ use chrono::{DateTime, Utc}; use enumset::EnumSet; use fail::fail_point; use handle::ShardTimelineId; +use itertools::Itertools; use offload::OffloadError; use once_cell::sync::Lazy; use pageserver_api::{ @@ -65,13 +66,14 @@ use std::{ }; use std::{pin::pin, sync::OnceLock}; +use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS; use crate::{ aux_file::AuxFileSizeEstimator, tenant::{ config::AttachmentMode, layer_map::{LayerMap, SearchResult}, metadata::TimelineMetadata, - storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc}, + storage_layer::inmemory_layer::IndexEntry, }, walingest::WalLagCooldown, walredo, @@ -104,7 +106,6 @@ use crate::{ virtual_file::{MaybeFatalIo, VirtualFile}, }; use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind}; -use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey}; use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL; use crate::config::PageServerConf; @@ -142,7 +143,10 @@ use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::{ config::TenantConf, - storage_layer::{inmemory_layer, LayerVisibilityHint}, + storage_layer::{ + batch_split_writer::{BatchLayerWriter, BatchWriterResult}, + inmemory_layer, LayerVisibilityHint, + }, upload_queue::NotInitialized, MaybeOffloaded, }; @@ -856,7 +860,7 @@ pub(crate) enum ShutdownMode { } struct ImageLayerCreationOutcome { - image: Option<ResidentLayer>, + image: Option<ImageLayerWriter>, next_start_key: Key, } @@ -4053,11 +4057,14 @@ impl Timeline { if wrote_keys { // Normal path: we have written some data into the new image layer for this // partition, so flush it to disk. 
- let (desc, path) = image_layer_writer.finish(ctx).await?; - let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?; - info!("created image layer for rel {}", image_layer.local_path()); + info!( + "flushed image layer for rel key_start={} key_end={} lsn={}", + image_layer_writer.key_range().start, + image_layer_writer.key_range().end, + image_layer_writer.lsn() + ); Ok(ImageLayerCreationOutcome { - image: Some(image_layer), + image: Some(image_layer_writer), next_start_key: img_range.end, }) } else { @@ -4143,14 +4150,14 @@ impl Timeline { if wrote_any_image { // Normal path: we have written some data into the new image layer for this // partition, so flush it to disk. - let (desc, path) = image_layer_writer.finish(ctx).await?; - let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?; info!( - "created image layer for metadata {}", - image_layer.local_path() + "flushed image layer for metadata key_start={} key_end={} lsn={}", + image_layer_writer.key_range().start, + image_layer_writer.key_range().end, + image_layer_writer.lsn() ); Ok(ImageLayerCreationOutcome { - image: Some(image_layer), + image: Some(image_layer_writer), next_start_key: img_range.end, }) } else { @@ -4227,7 +4234,6 @@ impl Timeline { ctx: &RequestContext, ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> { let timer = self.metrics.create_images_time_histo.start_timer(); - let mut image_layers = Vec::new(); // We need to avoid holes between generated image layers. 
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one @@ -4242,6 +4248,8 @@ impl Timeline { let check_for_image_layers = self.should_check_if_image_layers_required(lsn); + let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?; + for partition in partitioning.parts.iter() { if self.cancel.is_cancelled() { return Err(CreateImageLayersError::Cancelled); @@ -4272,24 +4280,6 @@ impl Timeline { continue; } } - if let ImageLayerCreationMode::Force = mode { - // When forced to create image layers, we might try and create them where they already - // exist. This mode is only used in tests/debug. - let layers = self.layers.read().await; - if layers.contains_key(&PersistentLayerKey { - key_range: img_range.clone(), - lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn), - is_delta: false, - }) { - tracing::info!( - "Skipping image layer at {lsn} {}..{}, already exists", - img_range.start, - img_range.end - ); - start = img_range.end; - continue; - } - } let image_layer_writer = ImageLayerWriter::new( self.conf, @@ -4323,7 +4313,11 @@ impl Timeline { .await?; start = next_start_key; - image_layers.extend(image); + if let Some(image) = image { + let key_range = image.key_range(); + let lsn = image.lsn(); + batch_image_writer.add_unfinished_image_writer(image, key_range, lsn); + } } else { let ImageLayerCreationOutcome { image, @@ -4340,10 +4334,48 @@ impl Timeline { ) .await?; start = next_start_key; - image_layers.extend(image); + if let Some(image) = image { + let key_range = image.key_range(); + let lsn = image.lsn(); + batch_image_writer.add_unfinished_image_writer(image, key_range, lsn); + } } } + let image_layers = batch_image_writer + .finish_with_discard_fn(self, ctx, |key| { + // TODO: remove this clone when Rust Edition 2024 is available, closure should capture this + // lifetime. 
+ let key = key.clone(); + async move { + // When forced to create image layers, we might try and create them where they already + // exist. The force mode is only used in tests/debug. + let layers = self.layers.read().await; + if layers.contains_key(&key) { + tracing::info!( + "Skipping image layer at {} {}..{}, already exists", + key.lsn_range.start, + key.key_range.start, + key.key_range.end + ); + true + } else { + false + } + } + }) + .await?; + let image_layers = image_layers + .into_iter() + .filter_map(|x| { + if let BatchWriterResult::Produced(x) = x { + Some(x) + } else { + None + } + }) + .collect_vec(); + let mut guard = self.layers.write().await; // FIXME: we could add the images to be uploaded *before* returning from here, but right diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 01c2803881..871db82d21 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -2461,7 +2461,10 @@ impl TimelineAdaptor { ) .await?; - if let Some(image_layer) = image { + if let Some(image_layer_writer) = image { + let (desc, path) = image_layer_writer.finish(ctx).await?; + let image_layer = + Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?; self.new_images.push(image_layer); }