refactor(pageserver): refactor split writers using batch layer writer (#9493)

part of https://github.com/neondatabase/neon/issues/9114,
https://github.com/neondatabase/neon/issues/8836,
https://github.com/neondatabase/neon/issues/8362

The split layer writer code can be used in a more general way: the
caller puts unfinished writers into the batch layer writer and let batch
layer writer to ensure the atomicity of the layer produces.

## Summary of changes

* Add batch layer writer, which atomically finishes the layers.
`BatchLayerWriter::finish` is simply a copy-paste from previous split
layer writers.
* Refactor split writers to use the batch layer writer.
* The current split writer tests cover all code path of batch layer
writer.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2024-10-24 10:49:54 -04:00
committed by GitHub
parent b8a311131e
commit fb0406e9d2
4 changed files with 160 additions and 140 deletions

View File

@@ -1,5 +1,6 @@
//! Common traits and structs for layers
pub mod batch_split_writer;
pub mod delta_layer;
pub mod filter_iterator;
pub mod image_layer;
@@ -8,7 +9,6 @@ pub(crate) mod layer;
mod layer_desc;
mod layer_name;
pub mod merge_iterator;
pub mod split_writer;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;

View File

@@ -12,41 +12,154 @@ use super::{
DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
};
pub(crate) enum SplitWriterResult {
pub(crate) enum BatchWriterResult {
Produced(ResidentLayer),
Discarded(PersistentLayerKey),
}
#[cfg(test)]
impl SplitWriterResult {
impl BatchWriterResult {
fn into_resident_layer(self) -> ResidentLayer {
match self {
SplitWriterResult::Produced(layer) => layer,
SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
BatchWriterResult::Produced(layer) => layer,
BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
}
}
fn into_discarded_layer(self) -> PersistentLayerKey {
match self {
SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
SplitWriterResult::Discarded(layer) => layer,
BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
BatchWriterResult::Discarded(layer) => layer,
}
}
}
enum LayerWriterWrapper {
Image(ImageLayerWriter),
Delta(DeltaLayerWriter),
}
/// An layer writer that takes unfinished layers and finish them atomically.
#[must_use]
pub struct BatchLayerWriter {
generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
conf: &'static PageServerConf,
}
impl BatchLayerWriter {
pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
Ok(Self {
generated_layer_writers: Vec::new(),
conf,
})
}
pub fn add_unfinished_image_writer(
&mut self,
writer: ImageLayerWriter,
key_range: Range<Key>,
lsn: Lsn,
) {
self.generated_layer_writers.push((
LayerWriterWrapper::Image(writer),
PersistentLayerKey {
key_range,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
is_delta: false,
},
));
}
pub fn add_unfinished_delta_writer(
&mut self,
writer: DeltaLayerWriter,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
) {
self.generated_layer_writers.push((
LayerWriterWrapper::Delta(writer),
PersistentLayerKey {
key_range,
lsn_range,
is_delta: true,
},
));
}
pub(crate) async fn finish_with_discard_fn<D, F>(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
let Self {
generated_layer_writers,
..
} = self;
let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
for produced_layer in generated_layers {
if let BatchWriterResult::Produced(resident_layer) = produced_layer {
let layer: Layer = resident_layer.into();
layer.delete_on_drop();
}
}
};
// BEGIN: catch every error and do the recovery in the below section
let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
for (inner, layer_key) in generated_layer_writers {
if discard_fn(&layer_key).await {
generated_layers.push(BatchWriterResult::Discarded(layer_key));
} else {
let res = match inner {
LayerWriterWrapper::Delta(writer) => {
writer.finish(layer_key.key_range.end, ctx).await
}
LayerWriterWrapper::Image(writer) => {
writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
}
};
let layer = match res {
Ok((desc, path)) => {
match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(e);
}
}
}
Err(e) => {
// Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
// so we don't need to remove the layer we just failed to create by ourselves.
clean_up_layers(generated_layers);
return Err(e);
}
};
generated_layers.push(BatchWriterResult::Produced(layer));
}
}
// END: catch every error and do the recovery in the above section
Ok(generated_layers)
}
}
/// An image writer that takes images and produces multiple image layers.
///
/// The interface does not guarantee atomicity (i.e., if the image layer generation
/// fails, there might be leftover files to be cleaned up)
#[must_use]
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
target_layer_size: u64,
generated_layer_writers: Vec<(ImageLayerWriter, PersistentLayerKey)>,
lsn: Lsn,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn: Lsn,
batches: BatchLayerWriter,
start_key: Key,
}
@@ -71,10 +184,10 @@ impl SplitImageLayerWriter {
ctx,
)
.await?,
generated_layer_writers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
batches: BatchLayerWriter::new(conf).await?,
lsn,
start_key,
})
@@ -102,16 +215,13 @@ impl SplitImageLayerWriter {
ctx,
)
.await?;
let layer_key = PersistentLayerKey {
key_range: self.start_key..key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
};
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.batches.add_unfinished_image_writer(
prev_image_writer,
self.start_key..key,
self.lsn,
);
self.start_key = key;
self.generated_layer_writers
.push((prev_image_writer, layer_key));
}
self.inner.put_image(key, img, ctx).await
}
@@ -122,64 +232,18 @@ impl SplitImageLayerWriter {
ctx: &RequestContext,
end_key: Key,
discard_fn: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
let Self {
mut generated_layer_writers,
inner,
..
mut batches, inner, ..
} = self;
if inner.num_keys() != 0 {
let layer_key = PersistentLayerKey {
key_range: self.start_key..end_key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
};
generated_layer_writers.push((inner, layer_key));
batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
}
let clean_up_layers = |generated_layers: Vec<SplitWriterResult>| {
for produced_layer in generated_layers {
if let SplitWriterResult::Produced(image_layer) = produced_layer {
let layer: Layer = image_layer.into();
layer.delete_on_drop();
}
}
};
// BEGIN: catch every error and do the recovery in the below section
let mut generated_layers = Vec::new();
for (inner, layer_key) in generated_layer_writers {
if discard_fn(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
let layer = match inner
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
{
Ok((desc, path)) => {
match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(e);
}
}
}
Err(e) => {
// ImageLayerWriter::finish will clean up the temporary layer if anything goes wrong,
// so we don't need to remove the layer we just failed to create by ourselves.
clean_up_layers(generated_layers);
return Err(e);
}
};
generated_layers.push(SplitWriterResult::Produced(layer));
}
}
// END: catch every error and do the recovery in the above section
Ok(generated_layers)
batches.finish_with_discard_fn(tline, ctx, discard_fn).await
}
#[cfg(test)]
@@ -188,7 +252,7 @@ impl SplitImageLayerWriter {
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<SplitWriterResult>> {
) -> anyhow::Result<Vec<BatchWriterResult>> {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
.await
}
@@ -196,9 +260,6 @@ impl SplitImageLayerWriter {
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
///
/// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
/// there might be leftover files to be cleaned up).
///
/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
@@ -206,12 +267,12 @@ impl SplitImageLayerWriter {
pub struct SplitDeltaLayerWriter {
inner: Option<(Key, DeltaLayerWriter)>,
target_layer_size: u64,
generated_layer_writers: Vec<(DeltaLayerWriter, PersistentLayerKey)>,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
last_key_written: Key,
batches: BatchLayerWriter,
}
impl SplitDeltaLayerWriter {
@@ -225,12 +286,12 @@ impl SplitDeltaLayerWriter {
Ok(Self {
target_layer_size,
inner: None,
generated_layer_writers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn_range,
last_key_written: Key::MIN,
batches: BatchLayerWriter::new(conf).await?,
})
}
@@ -279,13 +340,11 @@ impl SplitDeltaLayerWriter {
.await?;
let (start_key, prev_delta_writer) =
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
let layer_key = PersistentLayerKey {
key_range: start_key..key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
self.generated_layer_writers
.push((prev_delta_writer, layer_key));
self.batches.add_unfinished_delta_writer(
prev_delta_writer,
start_key..key,
self.lsn_range.clone(),
);
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
anyhow::bail!(
@@ -305,64 +364,25 @@ impl SplitDeltaLayerWriter {
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
let Self {
mut generated_layer_writers,
inner,
..
mut batches, inner, ..
} = self;
if let Some((start_key, writer)) = inner {
if writer.num_keys() != 0 {
let end_key = self.last_key_written.next();
let layer_key = PersistentLayerKey {
key_range: start_key..end_key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
generated_layer_writers.push((writer, layer_key));
batches.add_unfinished_delta_writer(
writer,
start_key..end_key,
self.lsn_range.clone(),
);
}
}
let clean_up_layers = |generated_layers: Vec<SplitWriterResult>| {
for produced_layer in generated_layers {
if let SplitWriterResult::Produced(delta_layer) = produced_layer {
let layer: Layer = delta_layer.into();
layer.delete_on_drop();
}
}
};
// BEGIN: catch every error and do the recovery in the below section
let mut generated_layers = Vec::new();
for (inner, layer_key) in generated_layer_writers {
if discard_fn(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
let layer = match inner.finish(layer_key.key_range.end, ctx).await {
Ok((desc, path)) => {
match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(e);
}
}
}
Err(e) => {
// DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
// so we don't need to remove the layer we just failed to create by ourselves.
clean_up_layers(generated_layers);
return Err(e);
}
};
generated_layers.push(SplitWriterResult::Produced(layer));
}
}
// END: catch every error and do the recovery in the above section
Ok(generated_layers)
batches.finish_with_discard_fn(tline, ctx, discard_fn).await
}
#[cfg(test)]
@@ -370,7 +390,7 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<SplitWriterResult>> {
) -> anyhow::Result<Vec<BatchWriterResult>> {
self.finish_with_discard_fn(tline, ctx, |_| async { false })
.await
}

View File

@@ -1009,7 +1009,7 @@ impl ImageLayerWriter {
self.inner.take().unwrap().finish(ctx, None).await
}
/// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
/// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
pub(super) async fn finish_with_end_key(
mut self,
end_key: Key,

View File

@@ -32,11 +32,11 @@ use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
};
use crate::tenant::storage_layer::filter_iterator::FilterIterator;
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::split_writer::{
SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
};
use crate::tenant::storage_layer::{
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
};
@@ -2038,11 +2038,11 @@ impl Timeline {
let produced_image_layers_len = produced_image_layers.len();
for action in produced_delta_layers {
match action {
SplitWriterResult::Produced(layer) => {
BatchWriterResult::Produced(layer) => {
stat.produce_delta_layer(layer.layer_desc().file_size());
compact_to.push(layer);
}
SplitWriterResult::Discarded(l) => {
BatchWriterResult::Discarded(l) => {
keep_layers.insert(l);
stat.discard_delta_layer();
}
@@ -2050,11 +2050,11 @@ impl Timeline {
}
for action in produced_image_layers {
match action {
SplitWriterResult::Produced(layer) => {
BatchWriterResult::Produced(layer) => {
stat.produce_image_layer(layer.layer_desc().file_size());
compact_to.push(layer);
}
SplitWriterResult::Discarded(l) => {
BatchWriterResult::Discarded(l) => {
keep_layers.insert(l);
stat.discard_image_layer();
}