Extend error handling to put_image, put_value_*, and put_batch methods

Co-Authored-By: christian@neon.tech <christian@neon.tech>

Author: Devin AI
Date: 2025-05-06 16:29:57 +00:00
parent b419fa9e2f
commit 7a576d723c
6 changed files with 187 additions and 66 deletions
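The map_err arms throughout this diff assume two-variant error types on the layer writers: a dedicated Cancelled case for the background flush task being cancelled, and an Other case carrying an anyhow::Error. A minimal sketch of the shape those arms imply (the real definitions live in the writer modules and are not shown in this diff; the use of thiserror and the exact messages are assumptions):

// Sketch only: the variants are taken from the match arms below; the derive,
// the error messages, and the thiserror dependency are assumed for illustration.
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ImageLayerWriterError {
    /// The background flush task was cancelled (e.g. timeline shutdown).
    #[error("flush task cancelled")]
    Cancelled,
    /// Any other serialization or I/O failure.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

#[derive(Debug, Error)]
pub enum DeltaLayerWriterError {
    #[error("flush task cancelled")]
    Cancelled,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

#[derive(Debug, Error)]
pub enum InMemoryLayerError {
    #[error("flush task cancelled")]
    Cancelled,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}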


@@ -274,7 +274,13 @@ impl<'a> SplitImageLayerWriter<'a> {
             );
             self.start_key = key;
         }
-        self.inner.put_image(key, img, ctx).await
+        self.inner
+            .put_image(key, img, ctx)
+            .await
+            .map_err(|e| match e {
+                ImageLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
+                ImageLayerWriterError::Other(err) => err,
+            })
     }
 
     pub(crate) async fn finish_with_discard_fn<D, F>(
@@ -417,7 +423,13 @@ impl<'a> SplitDeltaLayerWriter<'a> {
         }
         self.last_key_written = key;
         let (_, inner) = self.inner.as_mut().unwrap();
-        inner.put_value(key, lsn, val, ctx).await
+        inner
+            .put_value(key, lsn, val, ctx)
+            .await
+            .map_err(|e| match e {
+                DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
+                DeltaLayerWriterError::Other(err) => err,
+            })
     }
 
     pub(crate) async fn finish_with_discard_fn<D, F>(
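Both split-writer methods above still return anyhow::Result, so the typed error is flattened back into a plain message at the call site. The commit writes this mapping out inline; a hypothetical helper showing the same conversion, reusing the ImageLayerWriterError sketch from the top (no such helper exists in the diff):

// Illustrative only — each call site inlines this mapping instead of sharing a helper.
fn image_writer_err_to_anyhow(e: ImageLayerWriterError) -> anyhow::Error {
    match e {
        // Cancellation is reduced to a message once the caller only has anyhow::Error.
        ImageLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
        // Real failures pass through unchanged.
        ImageLayerWriterError::Other(err) => err,
    }
}

DeltaLayerWriterError gets the same treatment wherever put_value is called from a function that still returns anyhow::Result.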


@@ -484,17 +484,11 @@ impl DeltaLayerWriterInner {
         val: Value,
         ctx: &RequestContext,
     ) -> Result<(), DeltaLayerWriterError> {
-        let val_ser = Value::ser(&val)
-            .map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
+        let val_ser =
+            Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
         let (_, res) = self
-            .put_value_bytes(
-                key,
-                lsn,
-                val_ser.slice_len(),
-                val.will_init(),
-                ctx,
-            )
+            .put_value_bytes(key, lsn, val_ser.slice_len(), val.will_init(), ctx)
             .await;
         res
     }
@@ -511,29 +505,41 @@ impl DeltaLayerWriterInner {
         Buf: IoBuf + Send,
     {
         if self.lsn_range.start > lsn {
-            return (val, Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
-                "lsn_start={}, lsn={}",
-                self.lsn_range.start,
-                lsn
-            ))));
+            return (
+                val,
+                Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
+                    "lsn_start={}, lsn={}",
+                    self.lsn_range.start,
+                    lsn
+                ))),
+            );
         }
         // We don't want to use compression in delta layer creation
         let compression = ImageCompressionAlgorithm::Disabled;
         let (val, res) = self
             .blob_writer
             .write_blob_maybe_compressed(val, ctx, compression)
             .await;
         let off = match res {
             Ok((off, _)) => off,
-            Err(e) => return (val, Err(match e {
-                crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
-                    crate::tenant::blob_io::BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled,
-                    crate::tenant::blob_io::BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err),
-                },
-                other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
-            })),
+            Err(e) => {
+                return (
+                    val,
+                    Err(match e {
+                        crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
+                            crate::tenant::blob_io::BlobWriterError::Cancelled => {
+                                DeltaLayerWriterError::Cancelled
+                            }
+                            crate::tenant::blob_io::BlobWriterError::Other(err) => {
+                                DeltaLayerWriterError::Other(err)
+                            }
+                        },
+                        other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
+                    }),
+                );
+            }
         };
         let blob_ref = BlobRef::new(off, will_init);
@@ -543,7 +549,10 @@ impl DeltaLayerWriterInner {
         self.num_keys += 1;
-        (val, res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))))
+        (
+            val,
+            res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))),
+        )
     }
 
     fn size(&self) -> u64 {
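put_value_bytes above unwraps a nested error from the blob writer: WriteBlobError::Flush wraps a BlobWriterError, and only the Flush/Cancelled combination becomes DeltaLayerWriterError::Cancelled. A self-contained sketch of that flattening using mock types (the real enums live in crate::tenant::blob_io and the delta layer module; the Io variant below is a hypothetical stand-in for whatever other variants WriteBlobError actually has):

// Mock types for illustration only; they mirror the variants visible in the hunks above.
#[derive(Debug)]
enum BlobWriterError {
    Cancelled,
    Other(anyhow::Error),
}

#[derive(Debug)]
enum WriteBlobError {
    Flush(BlobWriterError),
    Io(std::io::Error), // hypothetical placeholder for the diff's catch-all `other` arm
}

#[derive(Debug)]
enum DeltaLayerWriterError {
    Cancelled,
    Other(anyhow::Error),
}

fn flatten(e: WriteBlobError) -> DeltaLayerWriterError {
    match e {
        // Cancellation keeps its own variant so callers can still tell it apart.
        WriteBlobError::Flush(BlobWriterError::Cancelled) => DeltaLayerWriterError::Cancelled,
        // Genuine flush failures keep the original error value.
        WriteBlobError::Flush(BlobWriterError::Other(err)) => DeltaLayerWriterError::Other(err),
        // Anything else is folded into Other, losing the concrete type but keeping a message.
        other => DeltaLayerWriterError::Other(anyhow::anyhow!("{other:?}")),
    }
}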


@@ -866,9 +866,11 @@ impl ImageLayerWriterInner {
         ctx: &RequestContext,
     ) -> Result<(), ImageLayerWriterError> {
         if !self.key_range.contains(&key) {
-            return Err(ImageLayerWriterError::Other(anyhow::anyhow!("key not in range")));
+            return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
+                "key not in range"
+            )));
         }
 
         let compression = self.conf.image_compression;
         let uncompressed_len = img.len() as u64;
         self.uncompressed_bytes += uncompressed_len;
@@ -880,12 +882,16 @@ impl ImageLayerWriterInner {
         // TODO: re-use the buffer for `img` further upstack
         let (off, compression_info) = res.map_err(|e| match e {
             crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
-                crate::tenant::blob_io::BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
-                crate::tenant::blob_io::BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
+                crate::tenant::blob_io::BlobWriterError::Cancelled => {
+                    ImageLayerWriterError::Cancelled
+                }
+                crate::tenant::blob_io::BlobWriterError::Other(err) => {
+                    ImageLayerWriterError::Other(err)
+                }
             },
             other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
         })?;
 
         if compression_info.compressed_size.is_some() {
             // The image has been considered for compression at least
             self.uncompressed_bytes_eligible += uncompressed_len;
@@ -897,7 +903,8 @@ impl ImageLayerWriterInner {
         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         key.write_to_byte_slice(&mut keybuf);
-        self.tree.append(&keybuf, off)
+        self.tree
+            .append(&keybuf, off)
             .map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
 
         #[cfg(feature = "testing")]
@@ -920,7 +927,9 @@ impl ImageLayerWriterInner {
         ctx: &RequestContext,
     ) -> Result<(), ImageLayerWriterError> {
         if !self.key_range.contains(&key) {
-            return Err(ImageLayerWriterError::Other(anyhow::anyhow!("key not in range")));
+            return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
+                "key not in range"
+            )));
         }
 
         // NB: we don't update the (un)compressed metrics, since we can't determine them without
@@ -933,15 +942,20 @@ impl ImageLayerWriterInner {
             .await;
         let offset = res.map_err(|e| match e {
             crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
-                crate::tenant::blob_io::BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
-                crate::tenant::blob_io::BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
+                crate::tenant::blob_io::BlobWriterError::Cancelled => {
+                    ImageLayerWriterError::Cancelled
+                }
+                crate::tenant::blob_io::BlobWriterError::Other(err) => {
+                    ImageLayerWriterError::Other(err)
+                }
             },
             other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
         })?;
 
         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         key.write_to_byte_slice(&mut keybuf);
-        self.tree.append(&keybuf, offset)
+        self.tree
+            .append(&keybuf, offset)
             .map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
 
         #[cfg(feature = "testing")]


@@ -26,6 +26,7 @@ use utils::lsn::Lsn;
 use utils::vec_map::VecMap;
 use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
 
+use super::delta_layer::DeltaLayerWriterError;
 use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
 use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64, u64_to_usize};
 use crate::config::PageServerConf;
@@ -615,7 +616,10 @@ impl InMemoryLayer {
         } = serialized_batch;
 
         // Write the batch to the file
-        inner.file.write_raw(&raw, ctx).await
+        inner
+            .file
+            .write_raw(&raw, ctx)
+            .await
             .map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
 
         let new_size = inner.file.len();
@@ -648,7 +652,8 @@ impl InMemoryLayer {
                 batch_offset,
                 len,
                 will_init,
-            })?;
+            })
+            .map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
 
             let vec_map = inner.index.entry(key).or_default();
             let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
@@ -805,14 +810,25 @@ impl InMemoryLayer {
                                 ctx,
                             )
                             .await;
-                        res?;
+                        res.map_err(|e| match e {
+                            DeltaLayerWriterError::Cancelled => {
+                                anyhow::anyhow!("flush task cancelled")
+                            }
+                            DeltaLayerWriterError::Other(err) => err,
+                        })?;
                     }
                 }
             }
         }
 
         // MAX is used here because we identify L0 layers by full key range
-        let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
+        let (desc, path) = delta_layer_writer
+            .finish(Key::MAX, ctx)
+            .await
+            .map_err(|e| match e {
+                DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
+                DeltaLayerWriterError::Other(err) => err,
+            })?;
 
         // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
         //


@@ -119,6 +119,8 @@ use crate::tenant::gc_result::GcResult;
 use crate::tenant::layer_map::LayerMap;
 use crate::tenant::metadata::TimelineMetadata;
 use crate::tenant::storage_layer::delta_layer::DeltaEntry;
+use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
+use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerError;
 use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
 use crate::tenant::storage_layer::{
     AsLayerDesc, BatchLayerWriter, DeltaLayerWriter, EvictionError, ImageLayerName,
@@ -5232,7 +5234,17 @@ impl Timeline {
                         };
 
                         // Write all the keys we just read into our new image layer.
-                        image_layer_writer.put_image(img_key, img, ctx).await?;
+                        image_layer_writer
+                            .put_image(img_key, img, ctx)
+                            .await
+                            .map_err(|e| match e {
+                                ImageLayerWriterError::Cancelled => CreateImageLayersError::Other(
+                                    anyhow::anyhow!("flush task cancelled"),
+                                ),
+                                ImageLayerWriterError::Other(err) => {
+                                    CreateImageLayersError::Other(err)
+                                }
+                            })?;
                         wrote_keys = true;
                     }
                 }
@@ -5329,7 +5341,15 @@ impl Timeline {
                 // TODO: split image layers to avoid too large layer files. Too large image files are not handled
                 // on the normal data path either.
-                image_layer_writer.put_image(k, v, ctx).await?;
+                image_layer_writer
+                    .put_image(k, v, ctx)
+                    .await
+                    .map_err(|e| match e {
+                        ImageLayerWriterError::Cancelled => {
+                            CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
+                        }
+                        ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
+                    })?;
             }
 
             if wrote_any_image {
@@ -6922,9 +6942,22 @@ impl Timeline {
         )
         .await?;
         for (key, img) in images {
-            image_layer_writer.put_image(key, img, ctx).await?;
+            image_layer_writer
+                .put_image(key, img, ctx)
+                .await
+                .map_err(|e| match e {
+                    ImageLayerWriterError::Cancelled => {
+                        anyhow::anyhow!("flush task cancelled")
+                    }
+                    ImageLayerWriterError::Other(err) => err,
+                })?;
         }
-        let (desc, path) = image_layer_writer.finish(ctx).await?;
+        let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
+            ImageLayerWriterError::Cancelled => {
+                anyhow::anyhow!("flush task cancelled")
+            }
+            ImageLayerWriterError::Other(err) => err,
+        })?;
         let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
         info!("force created image layer {}", image_layer.local_path());
         {
@@ -7378,7 +7411,10 @@ impl TimelineWriter<'_> {
             state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
         }
 
-        res
+        res.map_err(|e| match e {
+            InMemoryLayerError::Cancelled => anyhow::anyhow!("flush task cancelled"),
+            InMemoryLayerError::Other(err) => err,
+        })
     }
 
     #[cfg(test)]
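The image-layer-creation paths above sit in functions that return CreateImageLayersError, so both writer outcomes are folded into its Other variant and only the message text still marks a cancellation. A sketch of that conversion, with CreateImageLayersError mocked down to the single variant the match arms imply and ImageLayerWriterError as in the sketch at the top:

// Mock of the real error type: only the Other(anyhow::Error) variant used above is modeled.
enum CreateImageLayersError {
    Other(anyhow::Error),
}

fn to_create_image_layers_error(e: ImageLayerWriterError) -> CreateImageLayersError {
    match e {
        // Cancellation loses its own variant here; code further up only sees the message.
        ImageLayerWriterError::Cancelled => {
            CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
        }
        ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
    }
}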


@@ -15,7 +15,6 @@ use super::{
     GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
     Timeline,
 };
-use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
 use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
 use crate::tenant::timeline::DeltaEntry;
@@ -1549,7 +1548,13 @@ impl Timeline {
                     ctx,
                 )
                 .await
-                .map_err(CompactionError::Other)?;
+                .map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
+                        CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
+                    }
+                })?;
 
             // Safety of layer rewrites:
             // - We are writing to a different local file path than we are reading from, so the old Layer
@@ -1571,11 +1576,12 @@ impl Timeline {
                 .await?;
 
             if keys_written > 0 {
-                let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
-                    ImageLayerWriterError::Cancelled => {
+                let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
                         CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
                     }
-                    ImageLayerWriterError::Other(err) => CompactionError::Other(err),
                 })?;
                 let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
                     .map_err(CompactionError::Other)?;
@@ -2144,11 +2150,12 @@ impl Timeline {
                 .unwrap()
                 .finish(prev_key.unwrap().next(), ctx)
                 .await
-                .map_err(|e| match e {
-                    DeltaLayerWriterError::Cancelled => {
+                .map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
                         CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
                     }
-                    DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
                 })?;
             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
                 .map_err(CompactionError::Other)?;
@@ -2208,7 +2215,13 @@ impl Timeline {
                     .unwrap()
                     .put_value(key, lsn, value, ctx)
                     .await
-                    .map_err(CompactionError::Other)?;
+                    .map_err(|e| {
+                        if e.to_string().contains("flush task cancelled") {
+                            CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                        } else {
+                            CompactionError::Other(anyhow::anyhow!(e))
+                        }
+                    })?;
             } else {
                 let owner = self.shard_identity.get_shard_number(&key);
@@ -2226,11 +2239,12 @@ impl Timeline {
                 let (desc, path) = writer
                     .finish(prev_key.unwrap().next(), ctx)
                     .await
-                    .map_err(|e| match e {
-                        DeltaLayerWriterError::Cancelled => {
+                    .map_err(|e| {
+                        if e.to_string().contains("flush task cancelled") {
                             CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                        } else {
+                            CompactionError::Other(anyhow::anyhow!(e))
                         }
-                        DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
                     })?;
                 let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
                     .map_err(CompactionError::Other)?;
@@ -3696,8 +3710,13 @@ impl Timeline {
             let (desc, path) = delta_writer_before
                 .finish(job_desc.compaction_key_range.start, ctx)
                 .await
-                .context("failed to finish delta layer writer")
-                .map_err(CompactionError::Other)?;
+                .map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
+                        CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
+                    }
+                })?;
             let layer = Layer::finish_creating(self.conf, self, desc, &path)
                 .context("failed to finish creating delta layer")
                 .map_err(CompactionError::Other)?;
@@ -3707,8 +3726,13 @@ impl Timeline {
             let (desc, path) = delta_writer_after
                 .finish(key.key_range.end, ctx)
                 .await
-                .context("failed to finish delta layer writer")
-                .map_err(CompactionError::Other)?;
+                .map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
+                        CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
+                    }
+                })?;
             let layer = Layer::finish_creating(self.conf, self, desc, &path)
                 .context("failed to finish creating delta layer")
                 .map_err(CompactionError::Other)?;
@@ -3727,8 +3751,13 @@ impl Timeline {
             writer
                 .finish_with_discard_fn(self, ctx, end_key, discard)
                 .await
-                .context("failed to finish image layer writer")
-                .map_err(CompactionError::Other)?
+                .map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
+                        CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
+                    }
+                })?
         } else {
             drop(writer);
             Vec::new()
@@ -3741,8 +3770,13 @@ impl Timeline {
             delta_layer_writer
                 .finish_with_discard_fn(self, ctx, discard)
                 .await
-                .context("failed to finish delta layer writer")
-                .map_err(CompactionError::Other)?
+                .map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
+                        CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
+                    }
+                })?
         } else {
             drop(delta_layer_writer);
             Vec::new()
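compaction.rs takes the opposite approach: it drops the DeltaLayerWriterError import and recognizes cancellation by message text rather than by matching variants. An illustrative helper equivalent to the checks inlined above, assuming the error arrives as anyhow::Error (as the replaced map_err(CompactionError::Other) calls suggest) and with CompactionError mocked down to the one variant these paths construct; note the check only holds as long as the "flush task cancelled" text stays exactly the same:

// Illustrative only — each call site above inlines this check rather than sharing a helper.
enum CompactionError {
    Other(anyhow::Error),
}

fn to_compaction_error(e: anyhow::Error) -> CompactionError {
    if e.to_string().contains("flush task cancelled") {
        // Preserve the cancellation message so callers further up can still spot it.
        CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
    } else {
        // Wrap everything else as an opaque compaction failure.
        CompactionError::Other(anyhow::anyhow!(e))
    }
}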