Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-16 01:42:55 +00:00
avoid traversing the anyhow Cause chain
Observation: there was only a thin layer of anyhow between the types whose cause chain log_compaction_error chased down and the conversion to CompactionError::Other. So, remove the implicit #[from] conversion generated by thiserror, and de-`anyhow`ify the places that used it, making each of them opt into `::Other` explicitly.
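In short: with #[from] gone, `?` no longer wraps arbitrary errors into CompactionError::Other behind our backs. Each concrete error type either gets a hand-written From impl that classifies shutdown-like causes up front, or the call site opts in with .map_err(CompactionError::Other). Below is a minimal, self-contained sketch of that pattern, not the real code: SourceError is a stand-in for PageReconstructError / NotInitialized / layer::DownloadError, and the ShuttingDown message here is a placeholder.

#[derive(Debug, thiserror::Error)]
#[error("stand-in source error")]
struct SourceError {
    stopping: bool,
}

impl SourceError {
    fn is_stopping(&self) -> bool {
        self.stopping
    }
}

#[derive(Debug, thiserror::Error)]
enum CompactionError {
    // the real variant's message lives in timeline.rs; this one is a placeholder
    #[error("shutting down")]
    ShuttingDown,
    // no #[from]: nothing converts into Other implicitly via `?`
    #[error(transparent)]
    Other(anyhow::Error),
}

impl From<SourceError> for CompactionError {
    fn from(value: SourceError) -> Self {
        if value.is_stopping() {
            // classified once, at conversion time, instead of by walking
            // anyhow's cause chain later in log_compaction_error
            CompactionError::ShuttingDown
        } else {
            CompactionError::Other(anyhow::Error::new(value))
        }
    }
}

fn compaction_step(step: Result<(), SourceError>) -> Result<(), CompactionError> {
    // `?` compiles only because of the explicit From impl above; any other
    // error type has to opt in with .map_err(CompactionError::Other)
    step?;
    Ok(())
}

This is why log_compaction_error in the diff below no longer downcasts anyhow's root cause: by the time a CompactionError::Other exists, shutdown-related causes have already been turned into ShuttingDown.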
@@ -1881,7 +1881,7 @@ impl Tenant {
         &self,
         cancel: &CancellationToken,
         ctx: &RequestContext,
-    ) -> anyhow::Result<(), timeline::CompactionError> {
+    ) -> Result<(), timeline::CompactionError> {
         // Don't start doing work during shutdown, or when broken, we do not need those in the logs
         if !self.is_active() {
             return Ok(());
@@ -619,8 +619,8 @@ impl LayerMap {
     }

     /// Return all L0 delta layers
-    pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
-        Ok(self.l0_delta_layers.to_vec())
+    pub fn get_level0_deltas(&self) -> Vec<Arc<PersistentLayerDesc>> {
+        self.l0_delta_layers.to_vec()
     }

     /// debugging function to print out the contents of the layer map
@@ -237,7 +237,7 @@ use utils::id::{TenantId, TimelineId};
 use self::index::IndexPart;

 use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
-use super::upload_queue::SetDeletedFlagProgress;
+use super::upload_queue::{self, SetDeletedFlagProgress};
 use super::Generation;

 pub(crate) use download::{is_temp_download_file, list_remote_timelines};
@@ -621,7 +621,9 @@ impl RemoteTimelineClient {
     ///
     /// Like schedule_index_upload_for_metadata_update(), this merely adds
     /// the upload to the upload queue and returns quickly.
-    pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
+    pub(crate) fn schedule_index_upload_for_file_changes(
+        self: &Arc<Self>,
+    ) -> Result<(), upload_queue::NotInitialized> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;

@@ -666,7 +668,7 @@ impl RemoteTimelineClient {
     pub(crate) fn schedule_layer_file_upload(
         self: &Arc<Self>,
         layer: ResidentLayer,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), upload_queue::NotInitialized> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;

@@ -875,7 +877,7 @@ impl RemoteTimelineClient {
         self: &Arc<Self>,
         compacted_from: &[Layer],
         compacted_to: &[ResidentLayer],
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), upload_queue::NotInitialized> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;

@@ -290,7 +290,7 @@ impl Layer {
     }

     /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
-    pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
+    pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
         let downloaded = self.0.get_or_maybe_download(true, None).await?;

         Ok(ResidentLayer {
@@ -1174,7 +1174,7 @@ pub(crate) enum EvictionError {

 /// Error internal to the [`LayerInner::get_or_maybe_download`]
 #[derive(Debug, thiserror::Error)]
-enum DownloadError {
+pub(crate) enum DownloadError {
     #[error("timeline has already shutdown")]
     TimelineShutdown,
     #[error("no remote storage configured")]
@@ -1197,6 +1197,15 @@ enum DownloadError {
     PostStatFailed(#[source] std::io::Error),
 }

+impl DownloadError {
+    pub fn is_cancelled(&self) -> bool {
+        match self {
+            Self::TimelineShutdown | Self::DownloadCancelled => true,
+            _ => false,
+        }
+    }
+}
+
 #[derive(Debug, PartialEq)]
 pub(crate) enum NeedsDownload {
     NotFound,
@@ -232,27 +232,7 @@ fn log_compaction_error(
     let decision = match e {
         ShuttingDown => None,
         _ if task_cancelled => Some(LooksLike::Info),
-        Other(e) => {
-            let root_cause = e.root_cause();
-
-            let is_stopping = {
-                let upload_queue = root_cause
-                    .downcast_ref::<NotInitialized>()
-                    .is_some_and(|e| e.is_stopping());
-
-                let timeline = root_cause
-                    .downcast_ref::<PageReconstructError>()
-                    .is_some_and(|e| e.is_stopping());
-
-                upload_queue || timeline
-            };
-
-            if is_stopping {
-                Some(LooksLike::Info)
-            } else {
-                Some(LooksLike::Error)
-            }
-        }
+        Other(e) => Some(LooksLike::Error),
     };

     match decision {
@@ -103,11 +103,14 @@ use self::layer_manager::LayerManager;
 use self::logical_size::LogicalSize;
 use self::walreceiver::{WalReceiver, WalReceiverConf};

-use super::config::TenantConf;
-use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
 use super::remote_timeline_client::RemoteTimelineClient;
 use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
+use super::{config::TenantConf, upload_queue::NotInitialized};
 use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
+use super::{
+    remote_timeline_client::index::{IndexLayerMetadata, IndexPart},
+    storage_layer::layer,
+};

 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
 pub(super) enum FlushLoopState {
@@ -844,8 +847,7 @@ impl Timeline {
                 // "enough".
                 let layers = self
                     .create_image_layers(&partitioning, lsn, false, &image_ctx)
-                    .await
-                    .map_err(anyhow::Error::from)?;
+                    .await?;
                 if let Some(remote_client) = &self.remote_client {
                     for layer in layers {
                         remote_client.schedule_layer_file_upload(layer)?;
@@ -3212,7 +3214,46 @@ pub(crate) enum CompactionError {
     ShuttingDown,
     /// Compaction cannot be done right now; page reconstruction and so on.
     #[error(transparent)]
-    Other(#[from] anyhow::Error),
+    Other(anyhow::Error),
 }

+impl CompactionError {
+    fn other<E>(err: E) -> Self
+    where
+        E: std::error::Error + Send + Sync + 'static,
+    {
+        CompactionError::Other(anyhow::Error::new(err))
+    }
+}
+
+impl From<PageReconstructError> for CompactionError {
+    fn from(value: PageReconstructError) -> Self {
+        if value.is_stopping() {
+            CompactionError::ShuttingDown
+        } else {
+            CompactionError::other(value)
+        }
+    }
+}
+
+impl From<NotInitialized> for CompactionError {
+    fn from(value: NotInitialized) -> Self {
+        if value.is_stopping() {
+            CompactionError::ShuttingDown
+        } else {
+            CompactionError::other(value)
+        }
+    }
+}
+
+impl From<layer::DownloadError> for CompactionError {
+    fn from(value: layer::DownloadError) -> Self {
+        if value.is_cancelled() {
+            CompactionError::ShuttingDown
+        } else {
+            CompactionError::other(value)
+        }
+    }
+}
+
 #[serde_as]
@@ -3345,7 +3386,7 @@ impl Timeline {
         stats.read_lock_held_spawn_blocking_startup_micros =
             stats.read_lock_acquisition_micros.till_now(); // set by caller
         let layers = guard.layer_map();
-        let level0_deltas = layers.get_level0_deltas()?;
+        let level0_deltas = layers.get_level0_deltas();
         let mut level0_deltas = level0_deltas
             .into_iter()
             .map(|x| guard.get_from_desc(&x))
@@ -3392,7 +3433,8 @@ impl Timeline {
                     delta
                         .download_and_keep_resident()
                         .await
-                        .context("download layer for failpoint")?,
+                        .context("download layer for failpoint")
+                        .map_err(CompactionError::Other)?,
                 );
             }
             tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
@@ -3476,7 +3518,7 @@ impl Timeline {
         let mut all_keys = Vec::new();

         for l in deltas_to_compact.iter() {
-            all_keys.extend(l.load_keys(ctx).await?);
+            all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
         }

         // FIXME: should spawn_blocking the rest of this function
@@ -3496,7 +3538,10 @@ impl Timeline {
                 // has not so much sense, because largest holes will corresponds field1/field2 changes.
                 // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
                 // That is why it is better to measure size of hole as number of covering image layers.
-                let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
+                let coverage_size = layers
+                    .image_coverage(&key_range, last_record_lsn)
+                    .map_err(CompactionError::Other)?
+                    .len();
                 if coverage_size >= min_hole_coverage_size {
                     heap.push(Hole {
                         key_range,
@@ -3595,7 +3640,7 @@ impl Timeline {
             key, lsn, ref val, ..
         } in all_values_iter
         {
-            let value = val.load(ctx).await?;
+            let value = val.load(ctx).await.map_err(CompactionError::Other)?;
             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
             // We need to check key boundaries once we reach next key or end of layer with the same key
             if !same_key || lsn == dup_end_lsn {
@@ -3652,7 +3697,8 @@ impl Timeline {
                             .take()
                             .unwrap()
                             .finish(prev_key.unwrap().next(), self)
-                            .await?,
+                            .await
+                            .map_err(CompactionError::Other)?,
                     );
                     writer = None;

@@ -3682,7 +3728,8 @@ impl Timeline {
                             lsn_range.clone()
                         },
                     )
-                    .await?,
+                    .await
+                    .map_err(CompactionError::Other)?,
                 );
             }

@@ -3693,7 +3740,12 @@ impl Timeline {
             });

             if !self.shard_identity.is_key_disposable(&key) {
-                writer.as_mut().unwrap().put_value(key, lsn, value).await?;
+                writer
+                    .as_mut()
+                    .unwrap()
+                    .put_value(key, lsn, value)
+                    .await
+                    .map_err(CompactionError::Other)?;
             } else {
                 debug!(
                     "Dropping key {} during compaction (it belongs on shard {:?})",
@@ -3709,7 +3761,12 @@ impl Timeline {
             prev_key = Some(key);
         }
         if let Some(writer) = writer {
-            new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
+            new_layers.push(
+                writer
+                    .finish(prev_key.unwrap().next(), self)
+                    .await
+                    .map_err(CompactionError::Other)?,
+            );
         }

         // Sync layers
@@ -3738,7 +3795,8 @@ impl Timeline {
             // minimize latency.
             par_fsync::par_fsync_async(&layer_paths)
                 .await
-                .context("fsync all new layers")?;
+                .context("fsync all new layers")
+                .map_err(CompactionError::Other)?;

             let timeline_dir = self
                 .conf
@@ -3746,7 +3804,8 @@ impl Timeline {

             par_fsync::par_fsync_async(&[timeline_dir])
                 .await
-                .context("fsync of timeline dir")?;
+                .context("fsync of timeline dir")
+                .map_err(CompactionError::Other)?;
         }

         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
@@ -234,7 +234,9 @@ impl UploadQueue {
         Ok(self.initialized_mut().expect("we just set it"))
     }

-    pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
+    pub(crate) fn initialized_mut(
+        &mut self,
+    ) -> Result<&mut UploadQueueInitialized, NotInitialized> {
         use UploadQueue::*;
         match self {
             Uninitialized => Err(NotInitialized::Uninitialized.into()),