diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 1d9b91c9ce..5a8a23e81f 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1881,7 +1881,7 @@ impl Tenant { &self, cancel: &CancellationToken, ctx: &RequestContext, - ) -> anyhow::Result<(), timeline::CompactionError> { + ) -> Result<(), timeline::CompactionError> { // Don't start doing work during shutdown, or when broken, we do not need those in the logs if !self.is_active() { return Ok(()); diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 9b6225501f..a61fd9890c 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -619,8 +619,8 @@ impl LayerMap { } /// Return all L0 delta layers - pub fn get_level0_deltas(&self) -> Result>> { - Ok(self.l0_delta_layers.to_vec()) + pub fn get_level0_deltas(&self) -> Vec> { + self.l0_delta_layers.to_vec() } /// debugging function to print out the contents of the layer map diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 1b5f861c90..05a9bd1ef4 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -237,7 +237,7 @@ use utils::id::{TenantId, TimelineId}; use self::index::IndexPart; use super::storage_layer::{Layer, LayerFileName, ResidentLayer}; -use super::upload_queue::SetDeletedFlagProgress; +use super::upload_queue::{self, SetDeletedFlagProgress}; use super::Generation; pub(crate) use download::{is_temp_download_file, list_remote_timelines}; @@ -621,7 +621,9 @@ impl RemoteTimelineClient { /// /// Like schedule_index_upload_for_metadata_update(), this merely adds /// the upload to the upload queue and returns quickly. - pub fn schedule_index_upload_for_file_changes(self: &Arc) -> anyhow::Result<()> { + pub(crate) fn schedule_index_upload_for_file_changes( + self: &Arc, + ) -> Result<(), upload_queue::NotInitialized> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; @@ -666,7 +668,7 @@ impl RemoteTimelineClient { pub(crate) fn schedule_layer_file_upload( self: &Arc, layer: ResidentLayer, - ) -> anyhow::Result<()> { + ) -> Result<(), upload_queue::NotInitialized> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; @@ -875,7 +877,7 @@ impl RemoteTimelineClient { self: &Arc, compacted_from: &[Layer], compacted_to: &[ResidentLayer], - ) -> anyhow::Result<()> { + ) -> Result<(), upload_queue::NotInitialized> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 12af866810..59c86b7f35 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -290,7 +290,7 @@ impl Layer { } /// Downloads if necessary and creates a guard, which will keep this layer from being evicted. - pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result { + pub(crate) async fn download_and_keep_resident(&self) -> Result { let downloaded = self.0.get_or_maybe_download(true, None).await?; Ok(ResidentLayer { @@ -1174,7 +1174,7 @@ pub(crate) enum EvictionError { /// Error internal to the [`LayerInner::get_or_maybe_download`] #[derive(Debug, thiserror::Error)] -enum DownloadError { +pub(crate) enum DownloadError { #[error("timeline has already shutdown")] TimelineShutdown, #[error("no remote storage configured")] @@ -1197,6 +1197,15 @@ enum DownloadError { PostStatFailed(#[source] std::io::Error), } +impl DownloadError { + pub fn is_cancelled(&self) -> bool { + match self { + Self::TimelineShutdown | Self::DownloadCancelled => true, + _ => false, + } + } +} + #[derive(Debug, PartialEq)] pub(crate) enum NeedsDownload { NotFound, diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 5f39c46a84..024caf570e 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -232,27 +232,7 @@ fn log_compaction_error( let decision = match e { ShuttingDown => None, _ if task_cancelled => Some(LooksLike::Info), - Other(e) => { - let root_cause = e.root_cause(); - - let is_stopping = { - let upload_queue = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_stopping()); - - let timeline = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_stopping()); - - upload_queue || timeline - }; - - if is_stopping { - Some(LooksLike::Info) - } else { - Some(LooksLike::Error) - } - } + Other(e) => Some(LooksLike::Error), }; match decision { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 598e6e0385..cc579f98c8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -103,11 +103,14 @@ use self::layer_manager::LayerManager; use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; -use super::config::TenantConf; -use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart}; use super::remote_timeline_client::RemoteTimelineClient; use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline}; +use super::{config::TenantConf, upload_queue::NotInitialized}; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; +use super::{ + remote_timeline_client::index::{IndexLayerMetadata, IndexPart}, + storage_layer::layer, +}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) enum FlushLoopState { @@ -844,8 +847,7 @@ impl Timeline { // "enough". let layers = self .create_image_layers(&partitioning, lsn, false, &image_ctx) - .await - .map_err(anyhow::Error::from)?; + .await?; if let Some(remote_client) = &self.remote_client { for layer in layers { remote_client.schedule_layer_file_upload(layer)?; @@ -3212,7 +3214,46 @@ pub(crate) enum CompactionError { ShuttingDown, /// Compaction cannot be done right now; page reconstruction and so on. #[error(transparent)] - Other(#[from] anyhow::Error), + Other(anyhow::Error), +} + +impl CompactionError { + fn other(err: E) -> Self + where + E: std::error::Error + Send + Sync + 'static, + { + CompactionError::Other(anyhow::Error::new(err)) + } +} + +impl From for CompactionError { + fn from(value: PageReconstructError) -> Self { + if value.is_stopping() { + CompactionError::ShuttingDown + } else { + CompactionError::other(value) + } + } +} + +impl From for CompactionError { + fn from(value: NotInitialized) -> Self { + if value.is_stopping() { + CompactionError::ShuttingDown + } else { + CompactionError::other(value) + } + } +} + +impl From for CompactionError { + fn from(value: layer::DownloadError) -> Self { + if value.is_cancelled() { + CompactionError::ShuttingDown + } else { + CompactionError::other(value) + } + } } #[serde_as] @@ -3345,7 +3386,7 @@ impl Timeline { stats.read_lock_held_spawn_blocking_startup_micros = stats.read_lock_acquisition_micros.till_now(); // set by caller let layers = guard.layer_map(); - let level0_deltas = layers.get_level0_deltas()?; + let level0_deltas = layers.get_level0_deltas(); let mut level0_deltas = level0_deltas .into_iter() .map(|x| guard.get_from_desc(&x)) @@ -3392,7 +3433,8 @@ impl Timeline { delta .download_and_keep_resident() .await - .context("download layer for failpoint")?, + .context("download layer for failpoint") + .map_err(CompactionError::Other)?, ); } tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint @@ -3476,7 +3518,7 @@ impl Timeline { let mut all_keys = Vec::new(); for l in deltas_to_compact.iter() { - all_keys.extend(l.load_keys(ctx).await?); + all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?); } // FIXME: should spawn_blocking the rest of this function @@ -3496,7 +3538,10 @@ impl Timeline { // has not so much sense, because largest holes will corresponds field1/field2 changes. // But we are mostly interested to eliminate holes which cause generation of excessive image layers. // That is why it is better to measure size of hole as number of covering image layers. - let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len(); + let coverage_size = layers + .image_coverage(&key_range, last_record_lsn) + .map_err(CompactionError::Other)? + .len(); if coverage_size >= min_hole_coverage_size { heap.push(Hole { key_range, @@ -3595,7 +3640,7 @@ impl Timeline { key, lsn, ref val, .. } in all_values_iter { - let value = val.load(ctx).await?; + let value = val.load(ctx).await.map_err(CompactionError::Other)?; let same_key = prev_key.map_or(false, |prev_key| prev_key == key); // We need to check key boundaries once we reach next key or end of layer with the same key if !same_key || lsn == dup_end_lsn { @@ -3652,7 +3697,8 @@ impl Timeline { .take() .unwrap() .finish(prev_key.unwrap().next(), self) - .await?, + .await + .map_err(CompactionError::Other)?, ); writer = None; @@ -3682,7 +3728,8 @@ impl Timeline { lsn_range.clone() }, ) - .await?, + .await + .map_err(CompactionError::Other)?, ); } @@ -3693,7 +3740,12 @@ impl Timeline { }); if !self.shard_identity.is_key_disposable(&key) { - writer.as_mut().unwrap().put_value(key, lsn, value).await?; + writer + .as_mut() + .unwrap() + .put_value(key, lsn, value) + .await + .map_err(CompactionError::Other)?; } else { debug!( "Dropping key {} during compaction (it belongs on shard {:?})", @@ -3709,7 +3761,12 @@ impl Timeline { prev_key = Some(key); } if let Some(writer) = writer { - new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?); + new_layers.push( + writer + .finish(prev_key.unwrap().next(), self) + .await + .map_err(CompactionError::Other)?, + ); } // Sync layers @@ -3738,7 +3795,8 @@ impl Timeline { // minimize latency. par_fsync::par_fsync_async(&layer_paths) .await - .context("fsync all new layers")?; + .context("fsync all new layers") + .map_err(CompactionError::Other)?; let timeline_dir = self .conf @@ -3746,7 +3804,8 @@ impl Timeline { par_fsync::par_fsync_async(&[timeline_dir]) .await - .context("fsync of timeline dir")?; + .context("fsync of timeline dir") + .map_err(CompactionError::Other)?; } stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now(); diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 0b61bc0a10..aa094346d6 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -234,7 +234,9 @@ impl UploadQueue { Ok(self.initialized_mut().expect("we just set it")) } - pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> { + pub(crate) fn initialized_mut( + &mut self, + ) -> Result<&mut UploadQueueInitialized, NotInitialized> { use UploadQueue::*; match self { Uninitialized => Err(NotInitialized::Uninitialized.into()),