mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
refactor(pageserver): make image layer creation atomic
Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -120,6 +120,8 @@ impl BatchLayerWriter {
|
||||
writer.finish(layer_key.key_range.end, ctx).await
|
||||
}
|
||||
LayerWriterWrapper::Image(writer) => {
|
||||
assert_eq!(writer.key_range().start, layer_key.key_range.start);
|
||||
assert_eq!(writer.lsn(), layer_key.lsn_range.start);
|
||||
writer
|
||||
.finish_with_end_key(layer_key.key_range.end, ctx)
|
||||
.await
|
||||
|
||||
@@ -885,6 +885,7 @@ impl ImageLayerWriterInner {
|
||||
}
|
||||
|
||||
let final_key_range = if let Some(end_key) = end_key {
|
||||
assert!(end_key <= self.key_range.end);
|
||||
self.key_range.start..end_key
|
||||
} else {
|
||||
self.key_range.clone()
|
||||
@@ -1033,6 +1034,14 @@ impl ImageLayerWriter {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
self.inner.take().unwrap().finish(ctx, Some(end_key)).await
|
||||
}
|
||||
|
||||
pub(crate) fn key_range(&self) -> Range<Key> {
|
||||
self.inner.as_ref().unwrap().key_range.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn lsn(&self) -> Lsn {
|
||||
self.inner.as_ref().unwrap().lsn
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ImageLayerWriter {
|
||||
|
||||
@@ -20,6 +20,7 @@ use chrono::{DateTime, Utc};
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use handle::ShardTimelineId;
|
||||
use itertools::Itertools;
|
||||
use offload::OffloadError;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
@@ -65,13 +66,14 @@ use std::{
|
||||
};
|
||||
use std::{pin::pin, sync::OnceLock};
|
||||
|
||||
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
|
||||
use crate::{
|
||||
aux_file::AuxFileSizeEstimator,
|
||||
tenant::{
|
||||
config::AttachmentMode,
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::TimelineMetadata,
|
||||
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
|
||||
storage_layer::inmemory_layer::IndexEntry,
|
||||
},
|
||||
walingest::WalLagCooldown,
|
||||
walredo,
|
||||
@@ -104,7 +106,6 @@ use crate::{
|
||||
virtual_file::{MaybeFatalIo, VirtualFile},
|
||||
};
|
||||
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
|
||||
use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey};
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
@@ -142,7 +143,10 @@ use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::{
|
||||
config::TenantConf,
|
||||
storage_layer::{inmemory_layer, LayerVisibilityHint},
|
||||
storage_layer::{
|
||||
batch_split_writer::{BatchLayerWriter, BatchWriterResult},
|
||||
inmemory_layer, LayerVisibilityHint,
|
||||
},
|
||||
upload_queue::NotInitialized,
|
||||
MaybeOffloaded,
|
||||
};
|
||||
@@ -856,7 +860,7 @@ pub(crate) enum ShutdownMode {
|
||||
}
|
||||
|
||||
struct ImageLayerCreationOutcome {
|
||||
image: Option<ResidentLayer>,
|
||||
image: Option<ImageLayerWriter>,
|
||||
next_start_key: Key,
|
||||
}
|
||||
|
||||
@@ -4053,11 +4057,14 @@ impl Timeline {
|
||||
if wrote_keys {
|
||||
// Normal path: we have written some data into the new image layer for this
|
||||
// partition, so flush it to disk.
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
info!("created image layer for rel {}", image_layer.local_path());
|
||||
info!(
|
||||
"flushed image layer for rel key_start={} key_end={} lsn={}",
|
||||
image_layer_writer.key_range().start,
|
||||
image_layer_writer.key_range().end,
|
||||
image_layer_writer.lsn()
|
||||
);
|
||||
Ok(ImageLayerCreationOutcome {
|
||||
image: Some(image_layer),
|
||||
image: Some(image_layer_writer),
|
||||
next_start_key: img_range.end,
|
||||
})
|
||||
} else {
|
||||
@@ -4143,14 +4150,14 @@ impl Timeline {
|
||||
if wrote_any_image {
|
||||
// Normal path: we have written some data into the new image layer for this
|
||||
// partition, so flush it to disk.
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
info!(
|
||||
"created image layer for metadata {}",
|
||||
image_layer.local_path()
|
||||
"flushed image layer for metadata key_start={} key_end={} lsn={}",
|
||||
image_layer_writer.key_range().start,
|
||||
image_layer_writer.key_range().end,
|
||||
image_layer_writer.lsn()
|
||||
);
|
||||
Ok(ImageLayerCreationOutcome {
|
||||
image: Some(image_layer),
|
||||
image: Some(image_layer_writer),
|
||||
next_start_key: img_range.end,
|
||||
})
|
||||
} else {
|
||||
@@ -4227,7 +4234,6 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
let mut image_layers = Vec::new();
|
||||
|
||||
// We need to avoid holes between generated image layers.
|
||||
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
|
||||
@@ -4242,6 +4248,8 @@ impl Timeline {
|
||||
|
||||
let check_for_image_layers = self.should_check_if_image_layers_required(lsn);
|
||||
|
||||
let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;
|
||||
|
||||
for partition in partitioning.parts.iter() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
@@ -4272,24 +4280,6 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if let ImageLayerCreationMode::Force = mode {
|
||||
// When forced to create image layers, we might try and create them where they already
|
||||
// exist. This mode is only used in tests/debug.
|
||||
let layers = self.layers.read().await;
|
||||
if layers.contains_key(&PersistentLayerKey {
|
||||
key_range: img_range.clone(),
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
|
||||
is_delta: false,
|
||||
}) {
|
||||
tracing::info!(
|
||||
"Skipping image layer at {lsn} {}..{}, already exists",
|
||||
img_range.start,
|
||||
img_range.end
|
||||
);
|
||||
start = img_range.end;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
@@ -4323,7 +4313,11 @@ impl Timeline {
|
||||
.await?;
|
||||
|
||||
start = next_start_key;
|
||||
image_layers.extend(image);
|
||||
if let Some(image) = image {
|
||||
let key_range = image.key_range();
|
||||
let lsn = image.lsn();
|
||||
batch_image_writer.add_unfinished_image_writer(image, key_range, lsn);
|
||||
}
|
||||
} else {
|
||||
let ImageLayerCreationOutcome {
|
||||
image,
|
||||
@@ -4340,10 +4334,48 @@ impl Timeline {
|
||||
)
|
||||
.await?;
|
||||
start = next_start_key;
|
||||
image_layers.extend(image);
|
||||
if let Some(image) = image {
|
||||
let key_range = image.key_range();
|
||||
let lsn = image.lsn();
|
||||
batch_image_writer.add_unfinished_image_writer(image, key_range, lsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let image_layers = batch_image_writer
|
||||
.finish_with_discard_fn(self, ctx, |key| {
|
||||
// TODO: remove this clone when Rust Edition 2024 is available, closure should capture this
|
||||
// lifetime.
|
||||
let key = key.clone();
|
||||
async move {
|
||||
// When forced to create image layers, we might try and create them where they already
|
||||
// exist. The force mode is only used in tests/debug.
|
||||
let layers = self.layers.read().await;
|
||||
if layers.contains_key(&key) {
|
||||
tracing::info!(
|
||||
"Skipping image layer at {} {}..{}, already exists",
|
||||
key.lsn_range.start,
|
||||
key.key_range.start,
|
||||
key.key_range.end
|
||||
);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
let image_layers = image_layers
|
||||
.into_iter()
|
||||
.filter_map(|x| {
|
||||
if let BatchWriterResult::Produced(x) = x {
|
||||
Some(x)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
// FIXME: we could add the images to be uploaded *before* returning from here, but right
|
||||
|
||||
@@ -2461,7 +2461,10 @@ impl TimelineAdaptor {
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(image_layer) = image {
|
||||
if let Some(image_layer_writer) = image {
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let image_layer =
|
||||
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
|
||||
self.new_images.push(image_layer);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user