mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
fix(pageserver): make image layer generation atomic (#10516)
## Problem close https://github.com/neondatabase/neon/issues/8362 ## Summary of changes Use `BatchLayerWriter` to ensure we clean up image layers after failed compaction. Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -33,6 +33,7 @@ use utils::sync::gate::GateGuard;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
|
||||
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
|
||||
pub use image_layer::{ImageLayer, ImageLayerWriter};
|
||||
pub use inmemory_layer::InMemoryLayer;
|
||||
|
||||
@@ -87,6 +87,23 @@ impl BatchLayerWriter {
|
||||
));
|
||||
}
|
||||
|
||||
pub(crate) async fn finish(
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<ResidentLayer>> {
|
||||
let res = self
|
||||
.finish_with_discard_fn(tline, ctx, |_| async { false })
|
||||
.await?;
|
||||
let mut output = Vec::new();
|
||||
for r in res {
|
||||
if let BatchWriterResult::Produced(layer) = r {
|
||||
output.push(layer);
|
||||
}
|
||||
}
|
||||
Ok(output)
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
|
||||
@@ -70,6 +70,7 @@ use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
use crate::tenant::storage_layer::ImageLayerName;
|
||||
use crate::{
|
||||
aux_file::AuxFileSizeEstimator,
|
||||
page_service::TenantManagerTypes,
|
||||
@@ -78,7 +79,7 @@ use crate::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::TimelineMetadata,
|
||||
storage_layer::{
|
||||
inmemory_layer::IndexEntry, IoConcurrency, PersistentLayerDesc,
|
||||
inmemory_layer::IndexEntry, BatchLayerWriter, IoConcurrency, PersistentLayerDesc,
|
||||
ValueReconstructSituation,
|
||||
},
|
||||
},
|
||||
@@ -933,7 +934,7 @@ pub(crate) enum ShutdownMode {
|
||||
}
|
||||
|
||||
struct ImageLayerCreationOutcome {
|
||||
image: Option<ResidentLayer>,
|
||||
unfinished_image_layer: Option<ImageLayerWriter>,
|
||||
next_start_key: Key,
|
||||
}
|
||||
|
||||
@@ -4405,11 +4406,15 @@ impl Timeline {
|
||||
if wrote_keys {
|
||||
// Normal path: we have written some data into the new image layer for this
|
||||
// partition, so flush it to disk.
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
info!("created image layer for rel {}", image_layer.local_path());
|
||||
info!(
|
||||
"produced image layer for rel {}",
|
||||
ImageLayerName {
|
||||
key_range: img_range.clone(),
|
||||
lsn
|
||||
},
|
||||
);
|
||||
Ok(ImageLayerCreationOutcome {
|
||||
image: Some(image_layer),
|
||||
unfinished_image_layer: Some(image_layer_writer),
|
||||
next_start_key: img_range.end,
|
||||
})
|
||||
} else {
|
||||
@@ -4419,7 +4424,7 @@ impl Timeline {
|
||||
// layer we write will cover the key range that we just scanned.
|
||||
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
|
||||
Ok(ImageLayerCreationOutcome {
|
||||
image: None,
|
||||
unfinished_image_layer: None,
|
||||
next_start_key: start,
|
||||
})
|
||||
}
|
||||
@@ -4468,7 +4473,7 @@ impl Timeline {
|
||||
|
||||
if !trigger_generation && mode == ImageLayerCreationMode::Try {
|
||||
return Ok(ImageLayerCreationOutcome {
|
||||
image: None,
|
||||
unfinished_image_layer: None,
|
||||
next_start_key: img_range.end,
|
||||
});
|
||||
}
|
||||
@@ -4494,14 +4499,15 @@ impl Timeline {
|
||||
if wrote_any_image {
|
||||
// Normal path: we have written some data into the new image layer for this
|
||||
// partition, so flush it to disk.
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
info!(
|
||||
"created image layer for metadata {}",
|
||||
image_layer.local_path()
|
||||
ImageLayerName {
|
||||
key_range: img_range.clone(),
|
||||
lsn
|
||||
}
|
||||
);
|
||||
Ok(ImageLayerCreationOutcome {
|
||||
image: Some(image_layer),
|
||||
unfinished_image_layer: Some(image_layer_writer),
|
||||
next_start_key: img_range.end,
|
||||
})
|
||||
} else {
|
||||
@@ -4511,7 +4517,7 @@ impl Timeline {
|
||||
// layer we write will cover the key range that we just scanned.
|
||||
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
|
||||
Ok(ImageLayerCreationOutcome {
|
||||
image: None,
|
||||
unfinished_image_layer: None,
|
||||
next_start_key: start,
|
||||
})
|
||||
}
|
||||
@@ -4578,7 +4584,6 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
let mut image_layers = Vec::new();
|
||||
|
||||
// We need to avoid holes between generated image layers.
|
||||
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
|
||||
@@ -4593,6 +4598,8 @@ impl Timeline {
|
||||
|
||||
let check_for_image_layers = self.should_check_if_image_layers_required(lsn);
|
||||
|
||||
let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;
|
||||
|
||||
for partition in partitioning.parts.iter() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
@@ -4665,45 +4672,45 @@ impl Timeline {
|
||||
.map_err(|_| CreateImageLayersError::Cancelled)?,
|
||||
);
|
||||
|
||||
if !compact_metadata {
|
||||
let ImageLayerCreationOutcome {
|
||||
image,
|
||||
next_start_key,
|
||||
} = self
|
||||
.create_image_layer_for_rel_blocks(
|
||||
partition,
|
||||
image_layer_writer,
|
||||
lsn,
|
||||
ctx,
|
||||
img_range,
|
||||
start,
|
||||
io_concurrency,
|
||||
)
|
||||
.await?;
|
||||
|
||||
start = next_start_key;
|
||||
image_layers.extend(image);
|
||||
let ImageLayerCreationOutcome {
|
||||
unfinished_image_layer,
|
||||
next_start_key,
|
||||
} = if !compact_metadata {
|
||||
self.create_image_layer_for_rel_blocks(
|
||||
partition,
|
||||
image_layer_writer,
|
||||
lsn,
|
||||
ctx,
|
||||
img_range.clone(),
|
||||
start,
|
||||
io_concurrency,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
let ImageLayerCreationOutcome {
|
||||
image,
|
||||
next_start_key,
|
||||
} = self
|
||||
.create_image_layer_for_metadata_keys(
|
||||
partition,
|
||||
image_layer_writer,
|
||||
lsn,
|
||||
ctx,
|
||||
img_range,
|
||||
mode,
|
||||
start,
|
||||
io_concurrency,
|
||||
)
|
||||
.await?;
|
||||
start = next_start_key;
|
||||
image_layers.extend(image);
|
||||
self.create_image_layer_for_metadata_keys(
|
||||
partition,
|
||||
image_layer_writer,
|
||||
lsn,
|
||||
ctx,
|
||||
img_range.clone(),
|
||||
mode,
|
||||
start,
|
||||
io_concurrency,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
start = next_start_key;
|
||||
if let Some(unfinished_image_layer) = unfinished_image_layer {
|
||||
batch_image_writer.add_unfinished_image_writer(
|
||||
unfinished_image_layer,
|
||||
img_range,
|
||||
lsn,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let image_layers = batch_image_writer.finish(self, ctx).await?;
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
// FIXME: we could add the images to be uploaded *before* returning from here, but right
|
||||
|
||||
@@ -3197,7 +3197,7 @@ impl TimelineAdaptor {
|
||||
// TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
|
||||
let start = Key::MIN;
|
||||
let ImageLayerCreationOutcome {
|
||||
image,
|
||||
unfinished_image_layer,
|
||||
next_start_key: _,
|
||||
} = self
|
||||
.timeline
|
||||
@@ -3212,7 +3212,10 @@ impl TimelineAdaptor {
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(image_layer) = image {
|
||||
if let Some(image_layer_writer) = unfinished_image_layer {
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let image_layer =
|
||||
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
|
||||
self.new_images.push(image_layer);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user