Compare commits

...

1 Commits

Author SHA1 Message Date
Alex Chi Z
a6898f8deb feat(pageserver): support multiple key ranges for image initial flush path
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-05-23 11:51:14 -04:00
2 changed files with 27 additions and 37 deletions

View File

@@ -3882,7 +3882,7 @@ impl Timeline {
} }
} }
let (layers_to_upload, delta_layer_to_add) = if create_image_layer { let (layers_to_upload, delta_layers_to_add) = if create_image_layer {
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
// require downloading anything during initial import. // require downloading anything during initial import.
let ((rel_partition, metadata_partition), _lsn) = self let ((rel_partition, metadata_partition), _lsn) = self
@@ -3899,26 +3899,23 @@ impl Timeline {
} }
// For metadata, always create delta layers. // For metadata, always create delta layers.
let delta_layer = if !metadata_partition.parts.is_empty() { let delta_layers = if !metadata_partition.parts.is_empty() {
assert_eq!( // In the current implementation, the metadata partition will only have one part, and the part will only have
metadata_partition.parts.len(), // one single key range. This might change in the future.
1, let mut delta_layers_created = Vec::new();
"currently sparse keyspace should only contain a single aux file keyspace" for ks in &metadata_partition.parts {
); for range in &ks.0.ranges {
let metadata_keyspace = &metadata_partition.parts[0]; let layer = self
assert_eq!( .create_delta_layer(&frozen_layer, Some(range.clone()), ctx)
metadata_keyspace.0.ranges.len(), .await?;
1, if let Some(layer) = layer {
"aux file keyspace should be a single range" delta_layers_created.push(layer);
); }
self.create_delta_layer( }
&frozen_layer, }
Some(metadata_keyspace.0.ranges[0].clone()), delta_layers_created
ctx,
)
.await?
} else { } else {
None Vec::new()
}; };
// For image layers, we add them immediately into the layer map. // For image layers, we add them immediately into the layer map.
@@ -3933,12 +3930,8 @@ impl Timeline {
.await?, .await?,
); );
if let Some(delta_layer) = delta_layer { layers_to_upload.extend(delta_layers.iter().cloned());
layers_to_upload.push(delta_layer.clone()); (layers_to_upload, delta_layers)
(layers_to_upload, Some(delta_layer))
} else {
(layers_to_upload, None)
}
} else { } else {
// Normal case, write out a L0 delta layer file. // Normal case, write out a L0 delta layer file.
// `create_delta_layer` will not modify the layer map. // `create_delta_layer` will not modify the layer map.
@@ -3946,12 +3939,7 @@ impl Timeline {
let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else { let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else {
panic!("delta layer cannot be empty if no filter is applied"); panic!("delta layer cannot be empty if no filter is applied");
}; };
( (vec![layer.clone()], vec![layer])
// FIXME: even though we have a single image and single delta layer assumption
// we push them to vec
vec![layer.clone()],
Some(layer),
)
}; };
pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable"); pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
@@ -3972,7 +3960,7 @@ impl Timeline {
return Err(FlushLayerError::Cancelled); return Err(FlushLayerError::Cancelled);
} }
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics); guard.finish_flush_l0_layer(&delta_layers_to_add, &frozen_layer, &self.metrics);
if self.set_disk_consistent_lsn(disk_consistent_lsn) { if self.set_disk_consistent_lsn(disk_consistent_lsn) {
// Schedule remote uploads that will reflect our new disk_consistent_lsn // Schedule remote uploads that will reflect our new disk_consistent_lsn

View File

@@ -166,7 +166,7 @@ impl LayerManager {
/// Flush a frozen layer and add the written delta layer to the layer map. /// Flush a frozen layer and add the written delta layer to the layer map.
pub(crate) fn finish_flush_l0_layer( pub(crate) fn finish_flush_l0_layer(
&mut self, &mut self,
delta_layer: Option<&ResidentLayer>, delta_layers: &[ResidentLayer],
frozen_layer_for_check: &Arc<InMemoryLayer>, frozen_layer_for_check: &Arc<InMemoryLayer>,
metrics: &TimelineMetrics, metrics: &TimelineMetrics,
) { ) {
@@ -181,10 +181,12 @@ impl LayerManager {
// layer to disk at the same time, that would not work. // layer to disk at the same time, that would not work.
assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check)); assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
if let Some(l) = delta_layer { if !delta_layers.is_empty() {
let mut updates = self.layer_map.batch_update(); let mut updates = self.layer_map.batch_update();
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr); for l in delta_layers {
metrics.record_new_file_metrics(l.layer_desc().file_size); Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
metrics.record_new_file_metrics(l.layer_desc().file_size);
}
updates.flush(); updates.flush();
} }
} }