Address PR comments: restore maybe_fatal_err, use anyhow::Error::new, add TODO comments, fix string matching

Co-Authored-By: christian@neon.tech <christian@neon.tech>
This commit is contained in:
Devin AI
2025-05-06 19:17:57 +00:00
parent 000503b38a
commit 486d9f0c4d
5 changed files with 30 additions and 43 deletions

View File

@@ -92,7 +92,7 @@ impl Header {
#[derive(Debug, thiserror::Error)]
pub enum BlobWriterError {
#[error("flush task cancelled")]
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
@@ -247,7 +247,7 @@ where
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> Result<Self, BlobWriterError> {
let gate_token = gate.enter().map_err(|e| BlobWriterError::Other(e.into()))?;
let gate_token = gate.enter().map_err(|_| BlobWriterError::Cancelled)?;
Ok(Self {
io_buf: Some(BytesMut::new()),

View File

@@ -485,7 +485,7 @@ impl DeltaLayerWriterInner {
ctx: &RequestContext,
) -> Result<(), DeltaLayerWriterError> {
let val_ser =
Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let (_, res) = self
.put_value_bytes(key, lsn, val_ser.slice_len(), val.will_init(), ctx)
@@ -585,14 +585,14 @@ impl DeltaLayerWriterInner {
let (index_root_blk, block_buf) = self
.tree
.finish()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
offset += PAGE_SZ as u64;
}
assert!(self.lsn_range.start < self.lsn_range.end);
@@ -611,15 +611,12 @@ impl DeltaLayerWriterInner {
// Writes summary at the first block (offset 0).
let buf = summary
.ser_into_page()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let metadata = file.metadata().await.map_err(|e| {
DeltaLayerWriterError::Other(anyhow::anyhow!(
"get file metadata to determine size: {}",
e
))
DeltaLayerWriterError::Other(anyhow::Error::new(e))
})?;
// 5GB limit for objects without multipart upload (which we don't want to use)
@@ -632,6 +629,7 @@ impl DeltaLayerWriterInner {
metadata.len()
)));
}
// Note: Because we opened the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
@@ -646,9 +644,9 @@ impl DeltaLayerWriterInner {
);
// fsync the file
file.sync_all().await.map_err(|e| {
DeltaLayerWriterError::Other(anyhow::anyhow!("delta_layer sync_all: {}", e))
})?;
file.sync_all()
.await
.maybe_fatal_err("delta_layer sync_all")?;
trace!("created delta layer {}", self.path);
@@ -786,7 +784,7 @@ impl DeltaLayerWriter {
#[derive(Debug, thiserror::Error)]
pub enum DeltaLayerWriterError {
#[error("flush task cancelled")]
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
@@ -802,7 +800,7 @@ pub enum RewriteSummaryError {
impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
Self::Other(anyhow::Error::new(e))
}
}

View File

@@ -5863,6 +5863,8 @@ pub(crate) enum CompactionError {
Other(anyhow::Error),
#[error("Compaction already running: {0}")]
AlreadyRunning(&'static str),
#[error("cancelled")]
Cancelled,
}
impl CompactionError {
@@ -5877,6 +5879,7 @@ impl CompactionError {
PageReconstructError::Cancelled
))
| Self::Offload(OffloadError::Cancelled)
| Self::Cancelled
)
}

View File

@@ -1549,12 +1549,9 @@ impl Timeline {
ctx,
)
.await
.map_err(|e| {
if e.to_string().contains("flush task cancelled") {
CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
} else {
CompactionError::Other(e)
}
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => CompactionError::Cancelled,
ImageLayerWriterError::Other(err) => CompactionError::Other(err),
})?;
// Safety of layer rewrites:
@@ -2237,12 +2234,9 @@ impl Timeline {
let (desc, path) = writer
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(|e| {
if e.to_string().contains("flush task cancelled") {
CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
} else {
CompactionError::Other(e.into())
}
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => CompactionError::Cancelled,
DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
})?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
@@ -3747,12 +3741,9 @@ impl Timeline {
writer
.finish_with_discard_fn(self, ctx, end_key, discard)
.await
.map_err(|e| {
if e.to_string().contains("flush task cancelled") {
CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
} else {
CompactionError::Other(e.into())
}
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => CompactionError::Cancelled,
ImageLayerWriterError::Other(err) => CompactionError::Other(err),
})?
} else {
drop(writer);
@@ -3766,12 +3757,9 @@ impl Timeline {
delta_layer_writer
.finish_with_discard_fn(self, ctx, discard)
.await
.map_err(|e| {
if e.to_string().contains("flush task cancelled") {
CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
} else {
CompactionError::Other(e.into())
}
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => CompactionError::Cancelled,
DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
})?
} else {
drop(delta_layer_writer);

View File

@@ -818,9 +818,7 @@ async fn copy_lsn_prefix(
.finish(reused_highest_key, ctx)
.await
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => {
Error::Prepare(anyhow::anyhow!("flush task cancelled"))
}
DeltaLayerWriterError::Cancelled => Error::ShuttingDown,
DeltaLayerWriterError::Other(err) => Error::Prepare(err),
})?;
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)