Bubble up BlobWriterError to users of BlobWriter

Co-Authored-By: christian@neon.tech <christian@neon.tech>
This commit is contained in:
Devin AI
2025-05-06 14:56:28 +00:00
parent ee7479cb68
commit f8dffb62cf
5 changed files with 130 additions and 51 deletions

View File

@@ -18,6 +18,8 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::tenant::Timeline;
use crate::tenant::storage_layer::Layer;
use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
pub(crate) enum BatchWriterResult {
Produced(ResidentLayer),
@@ -139,14 +141,24 @@ impl BatchLayerWriter {
generated_layers.push(BatchWriterResult::Discarded(layer_key));
} else {
let res = match inner {
LayerWriterWrapper::Delta(writer) => {
writer.finish(layer_key.key_range.end, ctx).await
}
LayerWriterWrapper::Image(writer) => {
writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
}
LayerWriterWrapper::Delta(writer) => writer
.finish(layer_key.key_range.end, ctx)
.await
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
DeltaLayerWriterError::Other(err) => err,
}),
LayerWriterWrapper::Image(writer) => writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
ImageLayerWriterError::Other(err) => err,
}),
};
let layer = match res {
Ok((desc, path)) => {

View File

@@ -545,7 +545,7 @@ impl DeltaLayerWriterInner {
self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
let file = self
@@ -554,17 +554,24 @@ impl DeltaLayerWriterInner {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled,
BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err),
})?;
// Write out the index
let (index_root_blk, block_buf) = self.tree.finish()?;
let (index_root_blk, block_buf) = self
.tree
.finish()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
offset += PAGE_SZ as u64;
}
assert!(self.lsn_range.start < self.lsn_range.end);
@@ -581,24 +588,29 @@ impl DeltaLayerWriterInner {
};
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let buf = summary
.ser_into_page()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
let metadata = file
.metadata()
.await
.context("get file metadata to determine size")?;
let metadata = file.metadata().await.map_err(|e| {
DeltaLayerWriterError::Other(anyhow::anyhow!(
"get file metadata to determine size: {}",
e
))
})?;
// 5GB limit for objects without multipart upload (which we don't want to use)
// Make it a little bit below to account for differing GB units
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path(),
metadata.len()
);
if metadata.len() > S3_UPLOAD_LIMIT {
return Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path(),
metadata.len()
)));
}
// Note: Because we opened the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
@@ -613,9 +625,9 @@ impl DeltaLayerWriterInner {
);
// fsync the file
file.sync_all()
.await
.maybe_fatal_err("delta_layer sync_all")?;
file.sync_all().await.map_err(|e| {
DeltaLayerWriterError::Other(anyhow::anyhow!("delta_layer sync_all: {}", e))
})?;
trace!("created delta layer {}", self.path);
@@ -737,7 +749,7 @@ impl DeltaLayerWriter {
mut self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
self.inner.take().unwrap().finish(key_end, ctx).await
}
@@ -751,6 +763,14 @@ impl DeltaLayerWriter {
}
}
#[derive(Debug, thiserror::Error)]
pub enum DeltaLayerWriterError {
#[error("flush task cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]

View File

@@ -342,6 +342,14 @@ impl ImageLayer {
}
}
#[derive(Debug, thiserror::Error)]
pub enum ImageLayerWriterError {
#[error("flush task cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -931,7 +939,7 @@ impl ImageLayerWriterInner {
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
// Calculate compression ratio
@@ -954,17 +962,24 @@ impl ImageLayerWriterInner {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
})?;
// Write out the index
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
let (index_root_blk, block_buf) = self.tree.finish()?;
let (index_root_blk, block_buf) = self
.tree
.finish()
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer?
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
offset += PAGE_SZ as u64;
}
@@ -987,14 +1002,18 @@ impl ImageLayerWriterInner {
};
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let buf = summary
.ser_into_page()
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
let metadata = file
.metadata()
.await
.context("get metadata to determine file size")?;
let metadata = file.metadata().await.map_err(|e| {
ImageLayerWriterError::Other(anyhow::anyhow!(
"get metadata to determine file size: {}",
e
))
})?;
let desc = PersistentLayerDesc::new_img(
self.tenant_shard_id,
@@ -1017,9 +1036,9 @@ impl ImageLayerWriterInner {
// set inner.file here. The first read will have to re-open it.
// fsync the file
file.sync_all()
.await
.maybe_fatal_err("image_layer sync_all")?;
file.sync_all().await.map_err(|e| {
ImageLayerWriterError::Other(anyhow::anyhow!("image_layer sync_all: {}", e))
})?;
trace!("created image layer {}", self.path);
@@ -1138,7 +1157,7 @@ impl ImageLayerWriter {
pub(crate) async fn finish(
mut self,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
self.inner.take().unwrap().finish(ctx, None).await
}
@@ -1147,7 +1166,7 @@ impl ImageLayerWriter {
mut self,
end_key: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
self.inner.take().unwrap().finish(ctx, Some(end_key)).await
}
}

View File

@@ -15,6 +15,8 @@ use super::{
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
Timeline,
};
use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
use crate::tenant::timeline::DeltaEntry;
use crate::walredo::RedoAttemptType;
@@ -1569,10 +1571,12 @@ impl Timeline {
.await?;
if keys_written > 0 {
let (desc, path) = image_layer_writer
.finish(ctx)
.await
.map_err(CompactionError::Other)?;
let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
}
ImageLayerWriterError::Other(err) => CompactionError::Other(err),
})?;
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
@@ -2140,7 +2144,12 @@ impl Timeline {
.unwrap()
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => {
CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
}
DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
})?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
@@ -2217,7 +2226,12 @@ impl Timeline {
let (desc, path) = writer
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => {
CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
}
DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
})?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
new_layers.push(new_delta);
@@ -4253,7 +4267,15 @@ impl TimelineAdaptor {
unfinished_image_layer,
} = outcome
{
let (desc, path) = unfinished_image_layer.finish(ctx).await?;
let (desc, path) = unfinished_image_layer
.finish(ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
}
ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
})?;
let image_layer =
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
self.new_images.push(image_layer);

View File

@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::sync::Arc;
use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
use anyhow::Context;
use bytes::Bytes;
use http_utils::error::ApiError;
@@ -816,7 +817,12 @@ async fn copy_lsn_prefix(
let (desc, path) = writer
.finish(reused_highest_key, ctx)
.await
.map_err(Error::Prepare)?;
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => {
Error::Prepare(anyhow::anyhow!("flush task cancelled"))
}
DeltaLayerWriterError::Other(err) => Error::Prepare(err),
})?;
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
.map_err(Error::Prepare)?;