mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
feat(pageserver): support multiple key ranges for image initial flush path
Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -3882,7 +3882,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let (layers_to_upload, delta_layer_to_add) = if create_image_layer {
|
||||
let (layers_to_upload, delta_layers_to_add) = if create_image_layer {
|
||||
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
|
||||
// require downloading anything during initial import.
|
||||
let ((rel_partition, metadata_partition), _lsn) = self
|
||||
@@ -3899,26 +3899,23 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// For metadata, always create delta layers.
|
||||
let delta_layer = if !metadata_partition.parts.is_empty() {
|
||||
assert_eq!(
|
||||
metadata_partition.parts.len(),
|
||||
1,
|
||||
"currently sparse keyspace should only contain a single aux file keyspace"
|
||||
);
|
||||
let metadata_keyspace = &metadata_partition.parts[0];
|
||||
assert_eq!(
|
||||
metadata_keyspace.0.ranges.len(),
|
||||
1,
|
||||
"aux file keyspace should be a single range"
|
||||
);
|
||||
self.create_delta_layer(
|
||||
&frozen_layer,
|
||||
Some(metadata_keyspace.0.ranges[0].clone()),
|
||||
ctx,
|
||||
)
|
||||
.await?
|
||||
let delta_layers = if !metadata_partition.parts.is_empty() {
|
||||
// In the current implementation, the metadata partition will only have one part, and the part will only have
|
||||
// one single key range. This might change in the future.
|
||||
let mut delta_layers_created = Vec::new();
|
||||
for ks in &metadata_partition.parts {
|
||||
for range in &ks.0.ranges {
|
||||
let layer = self
|
||||
.create_delta_layer(&frozen_layer, Some(range.clone()), ctx)
|
||||
.await?;
|
||||
if let Some(layer) = layer {
|
||||
delta_layers_created.push(layer);
|
||||
}
|
||||
}
|
||||
}
|
||||
delta_layers_created
|
||||
} else {
|
||||
None
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
// For image layers, we add them immediately into the layer map.
|
||||
@@ -3933,12 +3930,8 @@ impl Timeline {
|
||||
.await?,
|
||||
);
|
||||
|
||||
if let Some(delta_layer) = delta_layer {
|
||||
layers_to_upload.push(delta_layer.clone());
|
||||
(layers_to_upload, Some(delta_layer))
|
||||
} else {
|
||||
(layers_to_upload, None)
|
||||
}
|
||||
layers_to_upload.extend(delta_layers.iter().cloned());
|
||||
(layers_to_upload, delta_layers)
|
||||
} else {
|
||||
// Normal case, write out a L0 delta layer file.
|
||||
// `create_delta_layer` will not modify the layer map.
|
||||
@@ -3946,12 +3939,7 @@ impl Timeline {
|
||||
let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else {
|
||||
panic!("delta layer cannot be empty if no filter is applied");
|
||||
};
|
||||
(
|
||||
// FIXME: even though we have a single image and single delta layer assumption
|
||||
// we push them to vec
|
||||
vec![layer.clone()],
|
||||
Some(layer),
|
||||
)
|
||||
(vec![layer.clone()], vec![layer])
|
||||
};
|
||||
|
||||
pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
|
||||
@@ -3972,7 +3960,7 @@ impl Timeline {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
|
||||
guard.finish_flush_l0_layer(&delta_layers_to_add, &frozen_layer, &self.metrics);
|
||||
|
||||
if self.set_disk_consistent_lsn(disk_consistent_lsn) {
|
||||
// Schedule remote uploads that will reflect our new disk_consistent_lsn
|
||||
|
||||
@@ -166,7 +166,7 @@ impl LayerManager {
|
||||
/// Flush a frozen layer and add the written delta layer to the layer map.
|
||||
pub(crate) fn finish_flush_l0_layer(
|
||||
&mut self,
|
||||
delta_layer: Option<&ResidentLayer>,
|
||||
delta_layers: &[ResidentLayer],
|
||||
frozen_layer_for_check: &Arc<InMemoryLayer>,
|
||||
metrics: &TimelineMetrics,
|
||||
) {
|
||||
@@ -181,10 +181,12 @@ impl LayerManager {
|
||||
// layer to disk at the same time, that would not work.
|
||||
assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
|
||||
|
||||
if let Some(l) = delta_layer {
|
||||
if !delta_layers.is_empty() {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
metrics.record_new_file_metrics(l.layer_desc().file_size);
|
||||
for l in delta_layers {
|
||||
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
metrics.record_new_file_metrics(l.layer_desc().file_size);
|
||||
}
|
||||
updates.flush();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user