fix(pageserver): make image split layer writer finish atomic (#8841)

Part of https://github.com/neondatabase/neon/issues/8836

## Summary of changes

This pull request makes the image layer split writer atomic when
finishing the layers. All the produced layers either finish at the same
time, or discard at the same time. Note that this does not prevent
atomicity when crash, but anyways, it will be cleaned up on pageserver
restart.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
Alex Chi Z.
2024-10-21 10:59:48 -04:00
committed by GitHub
parent 2dcac94194
commit aca81f5fa4
4 changed files with 149 additions and 112 deletions

View File

@@ -515,8 +515,8 @@ impl DeltaLayerWriterInner {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(key_end, ctx).await;
if result.is_err() {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}

View File

@@ -827,6 +827,25 @@ impl ImageLayerWriterInner {
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(ctx, end_key).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}
}
result
}
///
/// Finish writing the image layer.
///
async fn finish0(
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;

View File

@@ -42,7 +42,7 @@ impl SplitWriterResult {
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
target_layer_size: u64,
generated_layers: Vec<SplitWriterResult>,
generated_layer_writers: Vec<(ImageLayerWriter, PersistentLayerKey)>,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
@@ -71,7 +71,7 @@ impl SplitImageLayerWriter {
ctx,
)
.await?,
generated_layers: Vec::new(),
generated_layer_writers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
@@ -80,18 +80,12 @@ impl SplitImageLayerWriter {
})
}
pub async fn put_image_with_discard_fn<D, F>(
pub async fn put_image(
&mut self,
key: Key,
img: Bytes,
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard: D,
) -> anyhow::Result<()>
where
D: FnOnce(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
) -> anyhow::Result<()> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
@@ -108,72 +102,83 @@ impl SplitImageLayerWriter {
ctx,
)
.await?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
let layer_key = PersistentLayerKey {
key_range: self.start_key..key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
};
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.start_key = key;
if discard(&layer_key).await {
drop(prev_image_writer);
self.generated_layers
.push(SplitWriterResult::Discarded(layer_key));
} else {
let (desc, path) = prev_image_writer.finish_with_end_key(key, ctx).await?;
let layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
self.generated_layers
.push(SplitWriterResult::Produced(layer));
}
self.generated_layer_writers
.push((prev_image_writer, layer_key));
}
self.inner.put_image(key, img, ctx).await
}
#[cfg(test)]
pub async fn put_image(
&mut self,
key: Key,
img: Bytes,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false })
.await
}
pub(crate) async fn finish_with_discard_fn<D, F>(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
discard: D,
discard_fn: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
where
D: FnOnce(&PersistentLayerKey) -> F,
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
let Self {
mut generated_layers,
mut generated_layer_writers,
inner,
..
} = self;
if inner.num_keys() == 0 {
return Ok(generated_layers);
if inner.num_keys() != 0 {
let layer_key = PersistentLayerKey {
key_range: self.start_key..end_key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
};
generated_layer_writers.push((inner, layer_key));
}
let layer_key = PersistentLayerKey {
key_range: self.start_key..end_key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
let clean_up_layers = |generated_layers: Vec<SplitWriterResult>| {
for produced_layer in generated_layers {
if let SplitWriterResult::Produced(image_layer) = produced_layer {
let layer: Layer = image_layer.into();
layer.delete_on_drop();
}
}
};
if discard(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
let (desc, path) = inner.finish_with_end_key(end_key, ctx).await?;
let layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
generated_layers.push(SplitWriterResult::Produced(layer));
// BEGIN: catch every error and do the recovery in the below section
let mut generated_layers = Vec::new();
for (inner, layer_key) in generated_layer_writers {
if discard_fn(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
let layer = match inner
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
{
Ok((desc, path)) => {
match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(e);
}
}
}
Err(e) => {
// ImageLayerWriter::finish will clean up the temporary layer if anything goes wrong,
// so we don't need to remove it by ourselves.
clean_up_layers(generated_layers);
return Err(e);
}
};
generated_layers.push(SplitWriterResult::Produced(layer));
}
}
// END: catch every error and do the recovery in the above section
Ok(generated_layers)
}
@@ -187,11 +192,6 @@ impl SplitImageLayerWriter {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
.await
}
/// This function will be deprecated with #8841.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
Ok((self.generated_layers, self.inner))
}
}
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
@@ -296,8 +296,16 @@ impl SplitDeltaLayerWriter {
self.generated_layers
.push(SplitWriterResult::Discarded(layer_key));
} else {
// `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary
// files for `finish_creating`.
let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
return Err(e);
}
};
self.generated_layers
.push(SplitWriterResult::Produced(delta_layer));
}
@@ -357,8 +365,16 @@ impl SplitDeltaLayerWriter {
if discard(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
// `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary
// files for `finish_creating`.
let (desc, path) = inner.finish(end_key, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
return Err(e);
}
};
generated_layers.push(SplitWriterResult::Produced(delta_layer));
}
Ok(generated_layers)
@@ -447,7 +463,7 @@ mod tests {
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &tline, &ctx)
.put_image(get_key(0), get_img(0), &ctx)
.await
.unwrap();
let layers = image_writer
@@ -486,14 +502,18 @@ mod tests {
#[tokio::test]
async fn write_split() {
// Test the split writer with retaining all the layers we have produced (discard=false)
write_split_helper("split_writer_write_split", false).await;
}
#[tokio::test]
async fn write_split_discard() {
write_split_helper("split_writer_write_split_discard", false).await;
// Test the split writer with discarding all the layers we have produced (discard=true)
write_split_helper("split_writer_write_split_discard", true).await;
}
/// Test the image+delta writer by writing a large number of images and deltas. If discard is
/// set to true, all layers will be discarded.
async fn write_split_helper(harness_name: &'static str, discard: bool) {
let harness = TenantHarness::create(harness_name).await.unwrap();
let (tenant, ctx) = harness.load().await;
@@ -527,9 +547,7 @@ mod tests {
for i in 0..N {
let i = i as u32;
image_writer
.put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async {
discard
})
.put_image(get_key(i), get_large_img(), &ctx)
.await
.unwrap();
delta_writer
@@ -545,51 +563,54 @@ mod tests {
.unwrap();
}
let image_layers = image_writer
.finish(&tline, &ctx, get_key(N as u32))
.finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
.await
.unwrap();
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
if discard {
for layer in image_layers {
layer.into_discarded_layer();
}
for layer in delta_layers {
layer.into_discarded_layer();
}
} else {
let image_layers = image_layers
.into_iter()
.map(|x| x.into_resident_layer())
.collect_vec();
let delta_layers = delta_layers
.into_iter()
.map(|x| x.into_resident_layer())
.collect_vec();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
assert_eq!(
delta_layers.first().unwrap().layer_desc().key_range.start,
get_key(0)
);
assert_eq!(
delta_layers.last().unwrap().layer_desc().key_range.end,
get_key(N as u32)
);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
if idx > 0 {
assert_eq!(
image_layers[idx - 1].layer_desc().key_range.end,
image_layers[idx].layer_desc().key_range.start
);
assert_eq!(
delta_layers[idx - 1].layer_desc().key_range.end,
delta_layers[idx].layer_desc().key_range.start
);
let delta_layers = delta_writer
.finish_with_discard_fn(&tline, &ctx, |_| async { discard })
.await
.unwrap();
let image_layers = image_layers
.into_iter()
.map(|x| {
if discard {
x.into_discarded_layer()
} else {
x.into_resident_layer().layer_desc().key()
}
})
.collect_vec();
let delta_layers = delta_layers
.into_iter()
.map(|x| {
if discard {
x.into_discarded_layer()
} else {
x.into_resident_layer().layer_desc().key()
}
})
.collect_vec();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
assert_eq!(
delta_layers.last().unwrap().key_range.end,
get_key(N as u32)
);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].key_range.start, Key::MIN);
assert_ne!(image_layers[idx].key_range.end, Key::MAX);
assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
if idx > 0 {
assert_eq!(
image_layers[idx - 1].key_range.end,
image_layers[idx].key_range.start
);
assert_eq!(
delta_layers[idx - 1].key_range.end,
delta_layers[idx].key_range.start
);
}
}
}
@@ -629,11 +650,11 @@ mod tests {
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &tline, &ctx)
.put_image(get_key(0), get_img(0), &ctx)
.await
.unwrap();
image_writer
.put_image(get_key(1), get_large_img(), &tline, &ctx)
.put_image(get_key(1), get_large_img(), &ctx)
.await
.unwrap();
let layers = image_writer

View File

@@ -141,9 +141,7 @@ impl KeyHistoryRetention {
};
stat.produce_image_key(img);
if let Some(image_writer) = image_writer.as_mut() {
image_writer
.put_image_with_discard_fn(key, img.clone(), tline, ctx, discard)
.await?;
image_writer.put_image(key, img.clone(), ctx).await?;
} else {
delta_writer
.put_value_with_discard_fn(
@@ -2041,8 +2039,7 @@ impl Timeline {
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
.await?
} else {
let (layers, _) = writer.take()?;
assert!(layers.is_empty(), "image layers produced in dry run mode?");
drop(writer);
Vec::new()
}
} else {