From a6898f8deb04ce0033ab4e0d07da0f96627ef234 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 23 May 2024 11:51:14 -0400 Subject: [PATCH] feat(pageserver): support multiple key ranges for image initial flush path Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline.rs | 54 ++++++++----------- .../src/tenant/timeline/layer_manager.rs | 10 ++-- 2 files changed, 27 insertions(+), 37 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 262e1896ce..3c3a21f942 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3882,7 +3882,7 @@ impl Timeline { } } - let (layers_to_upload, delta_layer_to_add) = if create_image_layer { + let (layers_to_upload, delta_layers_to_add) = if create_image_layer { // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not // require downloading anything during initial import. let ((rel_partition, metadata_partition), _lsn) = self @@ -3899,26 +3899,23 @@ impl Timeline { } // For metadata, always create delta layers. - let delta_layer = if !metadata_partition.parts.is_empty() { - assert_eq!( - metadata_partition.parts.len(), - 1, - "currently sparse keyspace should only contain a single aux file keyspace" - ); - let metadata_keyspace = &metadata_partition.parts[0]; - assert_eq!( - metadata_keyspace.0.ranges.len(), - 1, - "aux file keyspace should be a single range" - ); - self.create_delta_layer( - &frozen_layer, - Some(metadata_keyspace.0.ranges[0].clone()), - ctx, - ) - .await? + let delta_layers = if !metadata_partition.parts.is_empty() { + // In the current implementation, the metadata partition will only have one part, and the part will only have + // one single key range. This might change in the future. + let mut delta_layers_created = Vec::new(); + for ks in &metadata_partition.parts { + for range in &ks.0.ranges { + let layer = self + .create_delta_layer(&frozen_layer, Some(range.clone()), ctx) + .await?; + if let Some(layer) = layer { + delta_layers_created.push(layer); + } + } + } + delta_layers_created } else { - None + Vec::new() }; // For image layers, we add them immediately into the layer map. @@ -3933,12 +3930,8 @@ impl Timeline { .await?, ); - if let Some(delta_layer) = delta_layer { - layers_to_upload.push(delta_layer.clone()); - (layers_to_upload, Some(delta_layer)) - } else { - (layers_to_upload, None) - } + layers_to_upload.extend(delta_layers.iter().cloned()); + (layers_to_upload, delta_layers) } else { // Normal case, write out a L0 delta layer file. // `create_delta_layer` will not modify the layer map. @@ -3946,12 +3939,7 @@ impl Timeline { let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else { panic!("delta layer cannot be empty if no filter is applied"); }; - ( - // FIXME: even though we have a single image and single delta layer assumption - // we push them to vec - vec![layer.clone()], - Some(layer), - ) + (vec![layer.clone()], vec![layer]) }; pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable"); @@ -3972,7 +3960,7 @@ impl Timeline { return Err(FlushLayerError::Cancelled); } - guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics); + guard.finish_flush_l0_layer(&delta_layers_to_add, &frozen_layer, &self.metrics); if self.set_disk_consistent_lsn(disk_consistent_lsn) { // Schedule remote uploads that will reflect our new disk_consistent_lsn diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 248420e632..06b199c0de 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -166,7 +166,7 @@ impl LayerManager { /// Flush a frozen layer and add the written delta layer to the layer map. pub(crate) fn finish_flush_l0_layer( &mut self, - delta_layer: Option<&ResidentLayer>, + delta_layers: &[ResidentLayer], frozen_layer_for_check: &Arc, metrics: &TimelineMetrics, ) { @@ -181,10 +181,12 @@ impl LayerManager { // layer to disk at the same time, that would not work. assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check)); - if let Some(l) = delta_layer { + if !delta_layers.is_empty() { let mut updates = self.layer_map.batch_update(); - Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr); - metrics.record_new_file_metrics(l.layer_desc().file_size); + for l in delta_layers { + Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr); + metrics.record_new_file_metrics(l.layer_desc().file_size); + } updates.flush(); } }