Compare commits

...

9 Commits

Author SHA1 Message Date
Devin AI
8ec905239b Fix unused imports and pattern matching for CompactionError::Cancelled
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-07 09:07:04 +00:00
Devin AI
e32aceff16 Replace string comparison with pattern matching for error handling
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 19:28:51 +00:00
Devin AI
486d9f0c4d Address PR comments: restore maybe_fatal_err, use anyhow::Error::new, add TODO comments, fix string matching
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 19:17:57 +00:00
Devin AI
000503b38a Fix error handling in compaction.rs to use .into() for proper type conversion
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 18:53:08 +00:00
Devin AI
7a576d723c Extend error handling to put_image, put_value_*, and put_batch methods
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 16:29:57 +00:00
Devin AI
b419fa9e2f Fix InMemoryLayerError enum placement
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 16:04:52 +00:00
Devin AI
aa45bf998d Extend error handling to put_image, put_value_*, and put_batch methods
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 16:04:20 +00:00
Devin AI
f8dffb62cf Bubble up BlobWriterError to users of BlobWriter
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 14:56:28 +00:00
Devin AI
ee7479cb68 Replace anyhow::Result with typed BlobWriterError in BlobWriter error path
Co-Authored-By: christian@neon.tech <christian@neon.tech>
2025-05-06 09:57:13 +00:00
11 changed files with 357 additions and 124 deletions

View File

@@ -2469,6 +2469,7 @@ async fn timeline_checkpoint_handler(
.map_err(|e|
match e {
CompactionError::ShuttingDown => ApiError::ShuttingDown,
CompactionError::Cancelled => ApiError::ShuttingDown,
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::Other(e) => ApiError::InternalServerError(e),

View File

@@ -3198,6 +3198,7 @@ impl TenantShard {
match err {
err if err.is_cancel() => {}
CompactionError::ShuttingDown => (),
CompactionError::Cancelled => (),
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
// shouldn't block compaction.
CompactionError::Offload(_) => {}

View File

@@ -90,10 +90,18 @@ impl Header {
}
}
/// Error type for `BlobWriter` operations.
///
/// `Cancelled` is returned when the gate cannot be entered (shutdown in
/// progress) or when the underlying flush task reports cancellation;
/// `Other` wraps any other failure as an opaque error.
#[derive(Debug, thiserror::Error)]
pub enum BlobWriterError {
/// The writer was cancelled: gate entry failed or the flush task was cancelled.
#[error("cancelled")]
Cancelled,
/// Any other underlying error; `transparent` forwards Display/source to the inner error.
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(Debug, thiserror::Error)]
pub enum WriteBlobError {
#[error(transparent)]
Flush(FlushTaskError),
Flush(BlobWriterError),
#[error("blob too large ({len} bytes)")]
BlobTooLarge { len: usize },
#[error(transparent)]
@@ -238,14 +246,16 @@ where
cancel: CancellationToken,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> anyhow::Result<Self> {
) -> Result<Self, BlobWriterError> {
let gate_token = gate.enter().map_err(|_| BlobWriterError::Cancelled)?;
Ok(Self {
io_buf: Some(BytesMut::new()),
writer: BufferedWriter::new(
file,
start_offset,
|| IoBufferMut::with_capacity(Self::CAPACITY),
gate.enter()?,
gate_token,
cancel,
ctx,
flush_task_span,
@@ -265,13 +275,16 @@ where
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
) -> (FullSlice<Buf>, Result<(), BlobWriterError>) {
let res = self
.writer
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
// Can remove all the complexity around owned buffers upstack
.write_buffered_borrowed(&src_buf, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
})
.map(|len| {
self.offset += len as u64;
});
@@ -418,8 +431,10 @@ where
self,
mode: BufferedWriterShutdownMode,
ctx: &RequestContext,
) -> Result<W, FlushTaskError> {
let (_, file) = self.writer.shutdown(mode, ctx).await?;
) -> Result<W, BlobWriterError> {
let (_, file) = self.writer.shutdown(mode, ctx).await.map_err(|e| match e {
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
})?;
Ok(file)
}
}
@@ -467,8 +482,11 @@ pub(crate) mod tests {
.await?,
gate.enter()?,
);
let mut wtr =
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
let mut wtr = BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test"))
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr
@@ -490,7 +508,11 @@ pub(crate) mod tests {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
file.disarm_into_inner()
};
Ok((temp_dir, pathbuf, offsets))

View File

@@ -19,6 +19,14 @@ use crate::context::RequestContext;
use crate::tenant::Timeline;
use crate::tenant::storage_layer::Layer;
/// Error type for the batch/split layer writers in this module.
///
/// `Other` wraps failures bubbled up from the underlying delta/image layer
/// writers. NOTE(review): no construction site of `Cancelled` is visible in
/// this view — presumably it propagates writer cancellation; confirm against
/// the full module.
#[derive(Debug, thiserror::Error)]
pub enum BatchSplitWriterError {
/// The operation was cancelled (shutdown).
#[error("cancelled")]
Cancelled,
/// Any other underlying error; `transparent` forwards Display/source to the inner error.
#[error(transparent)]
Other(anyhow::Error),
}
pub(crate) enum BatchWriterResult {
Produced(ResidentLayer),
Discarded(PersistentLayerKey),
@@ -97,7 +105,7 @@ impl BatchLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<ResidentLayer>> {
) -> Result<Vec<ResidentLayer>, BatchSplitWriterError> {
let res = self
.finish_with_discard_fn(tline, ctx, |_| async { false })
.await?;
@@ -115,7 +123,7 @@ impl BatchLayerWriter {
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> anyhow::Result<Vec<BatchWriterResult>>
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -139,14 +147,14 @@ impl BatchLayerWriter {
generated_layers.push(BatchWriterResult::Discarded(layer_key));
} else {
let res = match inner {
LayerWriterWrapper::Delta(writer) => {
writer.finish(layer_key.key_range.end, ctx).await
}
LayerWriterWrapper::Image(writer) => {
writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
}
LayerWriterWrapper::Delta(writer) => writer
.finish(layer_key.key_range.end, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
LayerWriterWrapper::Image(writer) => writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
};
let layer = match res {
Ok((desc, path)) => {
@@ -155,7 +163,7 @@ impl BatchLayerWriter {
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(e);
return Err(BatchSplitWriterError::Other(e));
}
}
}
@@ -235,7 +243,7 @@ impl<'a> SplitImageLayerWriter<'a> {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), BatchSplitWriterError> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
@@ -253,7 +261,8 @@ impl<'a> SplitImageLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await?;
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.batches.add_unfinished_image_writer(
prev_image_writer,
@@ -262,7 +271,10 @@ impl<'a> SplitImageLayerWriter<'a> {
);
self.start_key = key;
}
self.inner.put_image(key, img, ctx).await
self.inner
.put_image(key, img, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
}
pub(crate) async fn finish_with_discard_fn<D, F>(
@@ -271,7 +283,7 @@ impl<'a> SplitImageLayerWriter<'a> {
ctx: &RequestContext,
end_key: Key,
discard_fn: D,
) -> anyhow::Result<Vec<BatchWriterResult>>
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -291,7 +303,7 @@ impl<'a> SplitImageLayerWriter<'a> {
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<BatchWriterResult>> {
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
.await
}
@@ -346,7 +358,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), BatchSplitWriterError> {
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
//
@@ -366,7 +378,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await?,
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();
@@ -386,7 +399,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await?;
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
let (start_key, prev_delta_writer) =
self.inner.replace((key, next_delta_writer)).unwrap();
self.batches.add_unfinished_delta_writer(
@@ -396,16 +410,19 @@ impl<'a> SplitDeltaLayerWriter<'a> {
);
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
anyhow::bail!(
return Err(BatchSplitWriterError::Other(anyhow::anyhow!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
inner.estimated_size()
);
)));
}
}
self.last_key_written = key;
let (_, inner) = self.inner.as_mut().unwrap();
inner.put_value(key, lsn, val, ctx).await
inner
.put_value(key, lsn, val, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
}
pub(crate) async fn finish_with_discard_fn<D, F>(
@@ -413,7 +430,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> anyhow::Result<Vec<BatchWriterResult>>
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -439,7 +456,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<BatchWriterResult>> {
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
self.finish_with_discard_fn(tline, ctx, |_| async { false })
.await
}

View File

@@ -35,7 +35,9 @@ use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use crate::tenant::blob_io::BlobWriterError;
use anyhow::{Context, Result, bail};
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -76,7 +78,7 @@ use crate::tenant::vectored_blob_io::{
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -448,7 +450,11 @@ impl DeltaLayerWriterInner {
cancel,
ctx,
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
)
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -477,15 +483,12 @@ impl DeltaLayerWriterInner {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), DeltaLayerWriterError> {
let val_ser =
Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let (_, res) = self
.put_value_bytes(
key,
lsn,
Value::ser(&val)?.slice_len(),
val.will_init(),
ctx,
)
.put_value_bytes(key, lsn, val_ser.slice_len(), val.will_init(), ctx)
.await;
res
}
@@ -497,25 +500,46 @@ impl DeltaLayerWriterInner {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, anyhow::Result<()>)
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
where
Buf: IoBuf + Send,
{
assert!(
self.lsn_range.start <= lsn,
"lsn_start={}, lsn={}",
self.lsn_range.start,
lsn
);
if self.lsn_range.start > lsn {
return (
val,
Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
"lsn_start={}, lsn={}",
self.lsn_range.start,
lsn
))),
);
}
// We don't want to use compression in delta layer creation
let compression = ImageCompressionAlgorithm::Disabled;
let (val, res) = self
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)
.await;
let off = match res {
Ok((off, _)) => off,
Err(e) => return (val, Err(anyhow::anyhow!(e))),
Err(e) => {
return (
val,
Err(match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
DeltaLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
DeltaLayerWriterError::Other(err)
}
},
other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
}),
);
}
};
let blob_ref = BlobRef::new(off, will_init);
@@ -525,7 +549,10 @@ impl DeltaLayerWriterInner {
self.num_keys += 1;
(val, res.map_err(|e| anyhow::anyhow!(e)))
(
val,
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))),
)
}
fn size(&self) -> u64 {
@@ -539,7 +566,7 @@ impl DeltaLayerWriterInner {
self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
let file = self
@@ -548,17 +575,24 @@ impl DeltaLayerWriterInner {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled,
BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err),
})?;
// Write out the index
let (index_root_blk, block_buf) = self.tree.finish()?;
let (index_root_blk, block_buf) = self
.tree
.finish()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
offset += PAGE_SZ as u64;
}
assert!(self.lsn_range.start < self.lsn_range.end);
@@ -575,24 +609,27 @@ impl DeltaLayerWriterInner {
};
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let buf = summary
.ser_into_page()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let metadata = file
.metadata()
.await
.context("get file metadata to determine size")?;
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
// 5GB limit for objects without multipart upload (which we don't want to use)
// Make it a little bit below to account for differing GB units
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path(),
metadata.len()
);
if metadata.len() > S3_UPLOAD_LIMIT {
return Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path(),
metadata.len()
)));
}
// Note: Because we opened the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
@@ -609,7 +646,7 @@ impl DeltaLayerWriterInner {
// fsync the file
file.sync_all()
.await
.maybe_fatal_err("delta_layer sync_all")?;
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
trace!("created delta layer {}", self.path);
@@ -694,7 +731,7 @@ impl DeltaLayerWriter {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), DeltaLayerWriterError> {
self.inner
.as_mut()
.unwrap()
@@ -709,7 +746,7 @@ impl DeltaLayerWriter {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, anyhow::Result<()>)
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
where
Buf: IoBuf + Send,
{
@@ -731,7 +768,7 @@ impl DeltaLayerWriter {
mut self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
self.inner.take().unwrap().finish(key_end, ctx).await
}
@@ -745,6 +782,14 @@ impl DeltaLayerWriter {
}
}
/// Error type for `DeltaLayerWriter` / `DeltaLayerWriterInner` operations.
///
/// `Cancelled` maps from `BlobWriterError::Cancelled` (flush task cancelled
/// during shutdown); `Other` wraps serialization, index, and I/O failures.
#[derive(Debug, thiserror::Error)]
pub enum DeltaLayerWriterError {
/// The underlying blob writer / flush task was cancelled.
#[error("cancelled")]
Cancelled,
/// Any other underlying error; `transparent` forwards Display/source to the inner error.
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -755,7 +800,7 @@ pub enum RewriteSummaryError {
impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
Self::Other(anyhow::Error::new(e))
}
}

View File

@@ -33,7 +33,9 @@ use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use crate::tenant::blob_io::BlobWriterError;
use anyhow::{Context, Result, bail};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
@@ -74,7 +76,7 @@ use crate::tenant::vectored_blob_io::{
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -340,6 +342,14 @@ impl ImageLayer {
}
}
/// Error type for `ImageLayerWriter` / `ImageLayerWriterInner` operations.
///
/// `Cancelled` maps from `BlobWriterError::Cancelled` (flush task cancelled
/// during shutdown); `Other` wraps key-range checks, index, and I/O failures.
#[derive(Debug, thiserror::Error)]
pub enum ImageLayerWriterError {
/// The underlying blob writer's flush task was cancelled.
#[error("flush task cancelled")]
Cancelled,
/// Any other underlying error; `transparent` forwards Display/source to the inner error.
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -814,7 +824,11 @@ impl ImageLayerWriterInner {
cancel,
ctx,
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
)
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -850,8 +864,13 @@ impl ImageLayerWriterInner {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
) -> Result<(), ImageLayerWriterError> {
if !self.key_range.contains(&key) {
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
"key not in range"
)));
}
let compression = self.conf.image_compression;
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
@@ -861,7 +880,18 @@ impl ImageLayerWriterInner {
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let (off, compression_info) = res?;
let (off, compression_info) = res.map_err(|e| match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
ImageLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
ImageLayerWriterError::Other(err)
}
},
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
})?;
if compression_info.compressed_size.is_some() {
// The image has been considered for compression at least
self.uncompressed_bytes_eligible += uncompressed_len;
@@ -873,7 +903,9 @@ impl ImageLayerWriterInner {
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;
self.tree
.append(&keybuf, off)
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
#[cfg(feature = "testing")]
{
@@ -893,8 +925,12 @@ impl ImageLayerWriterInner {
key: Key,
raw_with_header: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
) -> Result<(), ImageLayerWriterError> {
if !self.key_range.contains(&key) {
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
"key not in range"
)));
}
// NB: we don't update the (un)compressed metrics, since we can't determine them without
// decompressing the image. This seems okay.
@@ -904,11 +940,23 @@ impl ImageLayerWriterInner {
.blob_writer
.write_blob_raw(raw_with_header.slice_len(), ctx)
.await;
let offset = res?;
let offset = res.map_err(|e| match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
ImageLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
ImageLayerWriterError::Other(err)
}
},
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
})?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, offset)?;
self.tree
.append(&keybuf, offset)
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
#[cfg(feature = "testing")]
{
@@ -925,7 +973,7 @@ impl ImageLayerWriterInner {
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
// Calculate compression ratio
@@ -948,17 +996,24 @@ impl ImageLayerWriterInner {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
})?;
// Write out the index
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
let (index_root_blk, block_buf) = self.tree.finish()?;
let (index_root_blk, block_buf) = self
.tree
.finish()
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer?
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
offset += PAGE_SZ as u64;
}
@@ -981,14 +1036,18 @@ impl ImageLayerWriterInner {
};
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let buf = summary
.ser_into_page()
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
let metadata = file
.metadata()
.await
.context("get metadata to determine file size")?;
let metadata = file.metadata().await.map_err(|e| {
ImageLayerWriterError::Other(anyhow::anyhow!(
"get metadata to determine file size: {}",
e
))
})?;
let desc = PersistentLayerDesc::new_img(
self.tenant_shard_id,
@@ -1011,9 +1070,9 @@ impl ImageLayerWriterInner {
// set inner.file here. The first read will have to re-open it.
// fsync the file
file.sync_all()
.await
.maybe_fatal_err("image_layer sync_all")?;
file.sync_all().await.map_err(|e| {
ImageLayerWriterError::Other(anyhow::anyhow!("image_layer sync_all: {}", e))
})?;
trace!("created image layer {}", self.path);
@@ -1093,7 +1152,7 @@ impl ImageLayerWriter {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), ImageLayerWriterError> {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
@@ -1108,7 +1167,7 @@ impl ImageLayerWriter {
key: Key,
raw_with_header: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), ImageLayerWriterError> {
self.inner
.as_mut()
.unwrap()
@@ -1132,7 +1191,7 @@ impl ImageLayerWriter {
pub(crate) async fn finish(
mut self,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
self.inner.take().unwrap().finish(ctx, None).await
}
@@ -1141,7 +1200,7 @@ impl ImageLayerWriter {
mut self,
end_key: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
self.inner.take().unwrap().finish(ctx, Some(end_key)).await
}
}

View File

@@ -26,6 +26,7 @@ use utils::lsn::Lsn;
use utils::vec_map::VecMap;
use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
use super::delta_layer::DeltaLayerWriterError;
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64, u64_to_usize};
use crate::config::PageServerConf;
@@ -581,7 +582,17 @@ impl InMemoryLayer {
estimated_in_mem_size: AtomicU64::new(0),
})
}
}
/// Error type for `InMemoryLayer` write-path operations.
///
/// `Other` wraps failures from the ephemeral file write and index-entry
/// construction. NOTE(review): no construction site of `Cancelled` is visible
/// in this view — presumably it mirrors the flush-task cancellation of the
/// downstream layer writers; confirm against the full module.
#[derive(Debug, thiserror::Error)]
pub enum InMemoryLayerError {
/// The flush task was cancelled.
#[error("flush task cancelled")]
Cancelled,
/// Any other underlying error; `transparent` forwards Display/source to the inner error.
#[error(transparent)]
Other(anyhow::Error),
}
impl InMemoryLayer {
/// Write path.
///
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
@@ -591,7 +602,7 @@ impl InMemoryLayer {
&self,
serialized_batch: SerializedValueBatch,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), InMemoryLayerError> {
let mut inner = self.inner.write().await;
self.assert_writable();
@@ -605,7 +616,11 @@ impl InMemoryLayer {
} = serialized_batch;
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
inner
.file
.write_raw(&raw, ctx)
.await
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
let new_size = inner.file.len();
let expected_new_len = base_offset
@@ -637,7 +652,8 @@ impl InMemoryLayer {
batch_offset,
len,
will_init,
})?;
})
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
@@ -794,14 +810,25 @@ impl InMemoryLayer {
ctx,
)
.await;
res?;
res.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
DeltaLayerWriterError::Other(err) => err,
})?;
}
}
}
}
// MAX is used here because we identify L0 layers by full key range
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
let (desc, path) = delta_layer_writer
.finish(Key::MAX, ctx)
.await
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
DeltaLayerWriterError::Other(err) => err,
})?;
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
//

View File

@@ -300,6 +300,7 @@ pub(crate) fn log_compaction_error(
let level = match err {
e if e.is_cancel() => return,
ShuttingDown => return,
Cancelled => return,
Offload(_) => Level::ERROR,
AlreadyRunning(_) => Level::ERROR,
CollectKeySpaceError(_) => Level::ERROR,

View File

@@ -119,6 +119,8 @@ use crate::tenant::gc_result::GcResult;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerError;
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
use crate::tenant::storage_layer::{
AsLayerDesc, BatchLayerWriter, DeltaLayerWriter, EvictionError, ImageLayerName,
@@ -773,6 +775,21 @@ impl From<layer_manager::Shutdown> for CreateImageLayersError {
}
}
/// Converts a batch split writer failure into an image layer creation error,
/// preserving the cancelled-vs-other distinction so cancellation is not
/// misreported as an internal error.
impl From<crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError>
for CreateImageLayersError
{
fn from(err: crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError) -> Self {
use crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError as E;
match err {
E::Cancelled => Self::Cancelled,
E::Other(inner) => Self::Other(inner),
}
}
}
#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum FlushLayerError {
/// Timeline cancellation token was cancelled
@@ -2041,6 +2058,9 @@ impl Timeline {
Err(CompactionError::ShuttingDown) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::Cancelled) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::AlreadyRunning(_)) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
@@ -5232,7 +5252,17 @@ impl Timeline {
};
// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
image_layer_writer
.put_image(img_key, img, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => CreateImageLayersError::Other(
anyhow::anyhow!("flush task cancelled"),
),
ImageLayerWriterError::Other(err) => {
CreateImageLayersError::Other(err)
}
})?;
wrote_keys = true;
}
}
@@ -5329,7 +5359,15 @@ impl Timeline {
// TODO: split image layers to avoid too large layer files. Too large image files are not handled
// on the normal data path either.
image_layer_writer.put_image(k, v, ctx).await?;
image_layer_writer
.put_image(k, v, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
}
ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
})?;
}
if wrote_any_image {
@@ -5843,6 +5881,8 @@ pub(crate) enum CompactionError {
Other(anyhow::Error),
#[error("Compaction already running: {0}")]
AlreadyRunning(&'static str),
#[error("cancelled")]
Cancelled,
}
impl CompactionError {
@@ -5857,6 +5897,7 @@ impl CompactionError {
PageReconstructError::Cancelled
))
| Self::Offload(OffloadError::Cancelled)
| Self::Cancelled
)
}
@@ -6922,9 +6963,22 @@ impl Timeline {
)
.await?;
for (key, img) in images {
image_layer_writer.put_image(key, img, ctx).await?;
image_layer_writer
.put_image(key, img, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
ImageLayerWriterError::Other(err) => err,
})?;
}
let (desc, path) = image_layer_writer.finish(ctx).await?;
let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
ImageLayerWriterError::Other(err) => err,
})?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created image layer {}", image_layer.local_path());
{
@@ -7378,7 +7432,10 @@ impl TimelineWriter<'_> {
state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
}
res
res.map_err(|e| match e {
InMemoryLayerError::Cancelled => anyhow::anyhow!("flush task cancelled"),
InMemoryLayerError::Other(err) => err,
})
}
#[cfg(test)]

View File

@@ -1547,7 +1547,7 @@ impl Timeline {
ctx,
)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
// Safety of layer rewrites:
// - We are writing to a different local file path than we are reading from, so the old Layer
@@ -1572,7 +1572,7 @@ impl Timeline {
let (desc, path) = image_layer_writer
.finish(ctx)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
@@ -2140,7 +2140,7 @@ impl Timeline {
.unwrap()
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
@@ -2199,7 +2199,7 @@ impl Timeline {
.unwrap()
.put_value(key, lsn, value, ctx)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
} else {
let owner = self.shard_identity.get_shard_number(&key);
@@ -2217,7 +2217,7 @@ impl Timeline {
let (desc, path) = writer
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
new_layers.push(new_delta);
@@ -3682,8 +3682,7 @@ impl Timeline {
let (desc, path) = delta_writer_before
.finish(job_desc.compaction_key_range.start, ctx)
.await
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)
.context("failed to finish creating delta layer")
.map_err(CompactionError::Other)?;
@@ -3693,8 +3692,7 @@ impl Timeline {
let (desc, path) = delta_writer_after
.finish(key.key_range.end, ctx)
.await
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?;
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)
.context("failed to finish creating delta layer")
.map_err(CompactionError::Other)?;
@@ -3713,8 +3711,7 @@ impl Timeline {
writer
.finish_with_discard_fn(self, ctx, end_key, discard)
.await
.context("failed to finish image layer writer")
.map_err(CompactionError::Other)?
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
} else {
drop(writer);
Vec::new()
@@ -3727,8 +3724,7 @@ impl Timeline {
delta_layer_writer
.finish_with_discard_fn(self, ctx, discard)
.await
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
} else {
drop(delta_layer_writer);
Vec::new()
@@ -4253,7 +4249,10 @@ impl TimelineAdaptor {
unfinished_image_layer,
} = outcome
{
let (desc, path) = unfinished_image_layer.finish(ctx).await?;
let (desc, path) = unfinished_image_layer
.finish(ctx)
.await
.map_err(|e| CreateImageLayersError::Other(anyhow::anyhow!(e)))?;
let image_layer =
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
self.new_images.push(image_layer);

View File

@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::sync::Arc;
use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
use anyhow::Context;
use bytes::Bytes;
use http_utils::error::ApiError;
@@ -816,7 +817,10 @@ async fn copy_lsn_prefix(
let (desc, path) = writer
.finish(reused_highest_key, ctx)
.await
.map_err(Error::Prepare)?;
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => Error::ShuttingDown,
DeltaLayerWriterError::Other(err) => Error::Prepare(err),
})?;
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
.map_err(Error::Prepare)?;