diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index d63c240365..7935aeb5e9 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1650,7 +1650,9 @@ async fn timeline_compact_handler( .await .map_err(|e| ApiError::InternalServerError(e.into()))?; if wait_until_uploaded { - timeline.remote_client.wait_completion().await.map_err(ApiError::InternalServerError)?; + timeline.remote_client.wait_completion().await + // XXX map to correct ApiError for the cases where it's due to shutdown + .context("wait completion").map_err(ApiError::InternalServerError)?; } json_response(StatusCode::OK, ()) } @@ -1709,7 +1711,9 @@ async fn timeline_checkpoint_handler( } if wait_until_uploaded { - timeline.remote_client.wait_completion().await.map_err(ApiError::InternalServerError)?; + timeline.remote_client.wait_completion().await + // XXX map to correct ApiError for the cases where it's due to shutdown + .context("wait completion").map_err(ApiError::InternalServerError)?; } json_response(StatusCode::OK, ()) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f83c7021e3..f359326cc0 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1620,7 +1620,7 @@ impl Tenant { &self, cancel: &CancellationToken, ctx: &RequestContext, - ) -> anyhow::Result<(), timeline::CompactionError> { + ) -> Result<(), timeline::CompactionError> { // Don't start doing work during shutdown, or when broken, we do not need those in the logs if !self.is_active() { return Ok(()); @@ -1665,12 +1665,14 @@ impl Tenant { .compact(cancel, EnumSet::empty(), ctx) .instrument(info_span!("compact_timeline", %timeline_id)) .await - .map_err(|e| { - self.compaction_circuit_breaker - .lock() - .unwrap() - .fail(&CIRCUIT_BREAKERS_BROKEN, &e); - e + .inspect_err(|e| match e { + timeline::CompactionError::ShuttingDown => (), + timeline::CompactionError::Other(e) => { + self.compaction_circuit_breaker + .lock() + .unwrap() + .fail(&CIRCUIT_BREAKERS_BROKEN, e); + } })?; } @@ -4568,7 +4570,7 @@ mod tests { let layer_map = tline.layers.read().await; let level0_deltas = layer_map .layer_map() - .get_level0_deltas()? + .get_level0_deltas() .into_iter() .map(|desc| layer_map.get_from_desc(&desc)) .collect::>(); @@ -5787,7 +5789,7 @@ mod tests { .read() .await .layer_map() - .get_level0_deltas()? + .get_level0_deltas() .len(); tline.compact(&cancel, EnumSet::empty(), &ctx).await?; @@ -5797,7 +5799,7 @@ mod tests { .read() .await .layer_map() - .get_level0_deltas()? + .get_level0_deltas() .len(); assert!(after_num_l0_delta_files < before_num_l0_delta_files, "after_num_l0_delta_files={after_num_l0_delta_files}, before_num_l0_delta_files={before_num_l0_delta_files}"); diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 72167d02ab..6f150a2d5c 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -845,8 +845,8 @@ impl LayerMap { } /// Return all L0 delta layers - pub fn get_level0_deltas(&self) -> Result>> { - Ok(self.l0_delta_layers.to_vec()) + pub fn get_level0_deltas(&self) -> Vec> { + self.l0_delta_layers.to_vec() } /// debugging function to print out the contents of the layer map diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index c75d1eaa5e..8b26f122cf 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -287,6 +287,14 @@ pub enum PersistIndexPartWithDeletedFlagError { Other(#[from] anyhow::Error), } +#[derive(Debug, thiserror::Error)] +pub enum WaitCompletionError { + #[error(transparent)] + NotInitialized(NotInitialized), + #[error("wait_completion aborted because upload queue was stopped")] + UploadQueueShutDownOrStopped, +} + /// A client for accessing a timeline's data in remote storage. /// /// This takes care of managing the number of connections, and balancing them @@ -630,7 +638,7 @@ impl RemoteTimelineClient { /// /// Like schedule_index_upload_for_metadata_update(), this merely adds /// the upload to the upload queue and returns quickly. - pub fn schedule_index_upload_for_file_changes(self: &Arc) -> anyhow::Result<()> { + pub fn schedule_index_upload_for_file_changes(self: &Arc) -> Result<(), NotInitialized> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; @@ -645,7 +653,7 @@ impl RemoteTimelineClient { fn schedule_index_upload( self: &Arc, upload_queue: &mut UploadQueueInitialized, - ) -> anyhow::Result<()> { + ) -> Result<(), NotInitialized> { let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn(); // fix up the duplicated field upload_queue.dirty.disk_consistent_lsn = disk_consistent_lsn; @@ -653,7 +661,7 @@ impl RemoteTimelineClient { // make sure it serializes before doing it in perform_upload_task so that it doesn't // look like a retryable error let void = std::io::sink(); - serde_json::to_writer(void, &upload_queue.dirty).context("serialize index_part.json")?; + serde_json::to_writer(void, &upload_queue.dirty).expect("serialize index_part.json"); let index_part = &upload_queue.dirty; @@ -699,7 +707,9 @@ impl RemoteTimelineClient { self.schedule_barrier0(upload_queue) }; - Self::wait_completion0(receiver).await + Self::wait_completion0(receiver) + .await + .context("wait completion") } /// Schedules uploading a new version of `index_part.json` with the given layers added, @@ -732,7 +742,9 @@ impl RemoteTimelineClient { barrier }; - Self::wait_completion0(barrier).await + Self::wait_completion0(barrier) + .await + .context("wait completion") } /// Launch an upload operation in the background; the file is added to be included in next @@ -740,7 +752,7 @@ impl RemoteTimelineClient { pub(crate) fn schedule_layer_file_upload( self: &Arc, layer: ResidentLayer, - ) -> anyhow::Result<()> { + ) -> Result<(), NotInitialized> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; @@ -826,7 +838,7 @@ impl RemoteTimelineClient { self: &Arc, upload_queue: &mut UploadQueueInitialized, names: I, - ) -> anyhow::Result> + ) -> Result, NotInitialized> where I: IntoIterator, { @@ -952,7 +964,7 @@ impl RemoteTimelineClient { self: &Arc, compacted_from: &[Layer], compacted_to: &[ResidentLayer], - ) -> anyhow::Result<()> { + ) -> Result<(), NotInitialized> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; @@ -969,10 +981,12 @@ impl RemoteTimelineClient { } /// Wait for all previously scheduled uploads/deletions to complete - pub(crate) async fn wait_completion(self: &Arc) -> anyhow::Result<()> { + pub(crate) async fn wait_completion(self: &Arc) -> Result<(), WaitCompletionError> { let receiver = { let mut guard = self.upload_queue.lock().unwrap(); - let upload_queue = guard.initialized_mut()?; + let upload_queue = guard + .initialized_mut() + .map_err(WaitCompletionError::NotInitialized)?; self.schedule_barrier0(upload_queue) }; @@ -981,9 +995,9 @@ impl RemoteTimelineClient { async fn wait_completion0( mut receiver: tokio::sync::watch::Receiver<()>, - ) -> anyhow::Result<()> { + ) -> Result<(), WaitCompletionError> { if receiver.changed().await.is_err() { - anyhow::bail!("wait_completion aborted because upload queue was stopped"); + return Err(WaitCompletionError::UploadQueueShutDownOrStopped); } Ok(()) diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 1db3e7c675..619c4d044d 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -17,7 +17,7 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::repository::Key; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::task_mgr::TaskKind; -use crate::tenant::timeline::GetVectoredError; +use crate::tenant::timeline::{CompactionError, GetVectoredError}; use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline}; use super::delta_layer::{self, DeltaEntry}; @@ -426,7 +426,7 @@ impl Layer { } /// Downloads if necessary and creates a guard, which will keep this layer from being evicted. - pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result { + pub(crate) async fn download_and_keep_resident(&self) -> Result { let downloaded = self.0.get_or_maybe_download(true, None).await?; Ok(ResidentLayer { @@ -1862,12 +1862,24 @@ impl ResidentLayer { shard_identity: &ShardIdentity, writer: &mut ImageLayerWriter, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> Result { use LayerKind::*; - match self.downloaded.get(&self.owner.0, ctx).await? { - Delta(_) => anyhow::bail!(format!("cannot filter() on a delta layer {self}")), - Image(i) => i.filter(shard_identity, writer, ctx).await, + match self + .downloaded + .get(&self.owner.0, ctx) + .await + .map_err(CompactionError::Other)? + { + Delta(_) => { + return Err(CompactionError::Other(anyhow::anyhow!(format!( + "cannot filter() on a delta layer {self}" + )))); + } + Image(i) => i + .filter(shard_identity, writer, ctx) + .await + .map_err(CompactionError::Other), } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 178b707aa7..8829040c70 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4786,7 +4786,7 @@ pub(crate) enum CompactionError { ShuttingDown, /// Compaction cannot be done right now; page reconstruction and so on. #[error(transparent)] - Other(#[from] anyhow::Error), + Other(anyhow::Error), } impl From for CompactionError { @@ -4801,6 +4801,38 @@ impl From for CompactionError { } } +impl From for CompactionError { + fn from(value: super::upload_queue::NotInitialized) -> Self { + match value { + super::upload_queue::NotInitialized::Uninitialized + | super::upload_queue::NotInitialized::Stopped => { + CompactionError::Other(anyhow::anyhow!(value)) + } + super::upload_queue::NotInitialized::ShuttingDown => CompactionError::ShuttingDown, + } + } +} + +impl CompactionError { + /// We cannot do compaction because we could not download a layer that is input to the compaction. + pub(crate) fn input_layer_download_failed( + e: super::storage_layer::layer::DownloadError, + ) -> Self { + match e { + super::storage_layer::layer::DownloadError::TimelineShutdown | + /* TODO DownloadCancelled correct here? */ + super::storage_layer::layer::DownloadError::DownloadCancelled => CompactionError::ShuttingDown, + super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads | + super::storage_layer::layer::DownloadError::DownloadRequired | + super::storage_layer::layer::DownloadError::NotFile(_) | + super::storage_layer::layer::DownloadError::DownloadFailed | + super::storage_layer::layer::DownloadError::PreStatFailed(_)=>CompactionError::Other(anyhow::anyhow!(e)), + #[cfg(test)] + super::storage_layer::layer::DownloadError::Failpoint(_) => CompactionError::Other(anyhow::anyhow!(e)), + } + } +} + #[serde_as] #[derive(serde::Serialize)] struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration); @@ -4874,7 +4906,7 @@ impl Timeline { new_deltas: &[ResidentLayer], new_images: &[ResidentLayer], layers_to_remove: &[Layer], - ) -> anyhow::Result<()> { + ) -> Result<(), CompactionError> { let mut guard = self.layers.write().await; let mut duplicated_layers = HashSet::new(); @@ -4892,7 +4924,7 @@ impl Timeline { // because we have not implemented L0 => L0 compaction. duplicated_layers.insert(l.layer_desc().key()); } else if LayerMap::is_l0(&l.layer_desc().key_range) { - bail!("compaction generates a L0 layer file as output, which will cause infinite compaction."); + return Err(CompactionError::Other(anyhow::anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction."))); } else { insert_layers.push(l.clone()); } @@ -4924,7 +4956,7 @@ impl Timeline { self: &Arc, mut replace_layers: Vec<(Layer, ResidentLayer)>, mut drop_layers: Vec, - ) -> anyhow::Result<()> { + ) -> Result<(), super::upload_queue::NotInitialized> { let mut guard = self.layers.write().await; // Trim our lists in case our caller (compaction) raced with someone else (GC) removing layers: we want @@ -4946,7 +4978,7 @@ impl Timeline { fn upload_new_image_layers( self: &Arc, new_images: impl IntoIterator, - ) -> anyhow::Result<()> { + ) -> Result<(), super::upload_queue::NotInitialized> { for layer in new_images { self.remote_client.schedule_layer_file_upload(layer)?; } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index d0a74e3924..487ff6cd80 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -27,6 +27,7 @@ use utils::id::TimelineId; use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}; use crate::page_cache; use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD}; +use crate::tenant::remote_timeline_client::WaitCompletionError; use crate::tenant::storage_layer::merge_iterator::MergeIterator; use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc, ValueReconstructState}; use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter}; @@ -108,7 +109,10 @@ impl Timeline { ctx: &RequestContext, ) -> Result<(), CompactionError> { if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) { - return self.compact_with_gc(cancel, ctx).await; + return self + .compact_with_gc(cancel, ctx) + .await + .map_err(CompactionError::Other); } // High level strategy for compaction / image creation: @@ -236,7 +240,7 @@ impl Timeline { self: &Arc, rewrite_max: usize, ctx: &RequestContext, - ) -> anyhow::Result<()> { + ) -> Result<(), CompactionError> { let mut drop_layers = Vec::new(); let mut layers_to_rewrite: Vec = Vec::new(); @@ -357,7 +361,8 @@ impl Timeline { layer.layer_desc().image_layer_lsn(), ctx, ) - .await?; + .await + .map_err(CompactionError::Other)?; // Safety of layer rewrites: // - We are writing to a different local file path than we are reading from, so the old Layer @@ -372,14 +377,20 @@ impl Timeline { // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are: // - GC, which at worst witnesses us "undelete" a layer that they just deleted. // - ingestion, which only inserts layers, therefore cannot collide with us. - let resident = layer.download_and_keep_resident().await?; + let resident = layer + .download_and_keep_resident() + .await + .map_err(CompactionError::input_layer_download_failed)?; let keys_written = resident .filter(&self.shard_identity, &mut image_layer_writer, ctx) .await?; if keys_written > 0 { - let new_layer = image_layer_writer.finish(self, ctx).await?; + let new_layer = image_layer_writer + .finish(self, ctx) + .await + .map_err(CompactionError::Other)?; tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes", layer.metadata().file_size, new_layer.metadata().file_size); @@ -407,7 +418,13 @@ impl Timeline { // necessary for correctness, but it simplifies testing, and avoids proceeding with another // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O // load. - self.remote_client.wait_completion().await?; + match self.remote_client.wait_completion().await { + Ok(()) => (), + Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)), + Err(WaitCompletionError::UploadQueueShutDownOrStopped) => { + return Err(CompactionError::ShuttingDown) + } + } fail::fail_point!("compact-shard-ancestors-persistent"); @@ -465,7 +482,7 @@ impl Timeline { stats.read_lock_held_spawn_blocking_startup_micros = stats.read_lock_acquisition_micros.till_now(); // set by caller let layers = guard.layer_map(); - let level0_deltas = layers.get_level0_deltas()?; + let level0_deltas = layers.get_level0_deltas(); let mut level0_deltas = level0_deltas .into_iter() .map(|x| guard.get_from_desc(&x)) @@ -518,14 +535,23 @@ impl Timeline { ) as u64 * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE); - deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?); + deltas_to_compact.push( + first_level0_delta + .download_and_keep_resident() + .await + .map_err(CompactionError::input_layer_download_failed)?, + ); for l in level0_deltas_iter { let lsn_range = &l.layer_desc().lsn_range; if lsn_range.start != prev_lsn_end { break; } - deltas_to_compact.push(l.download_and_keep_resident().await?); + deltas_to_compact.push( + l.download_and_keep_resident() + .await + .map_err(CompactionError::input_layer_download_failed)?, + ); deltas_to_compact_bytes += l.metadata().file_size; prev_lsn_end = lsn_range.end; @@ -584,7 +610,7 @@ impl Timeline { let mut all_keys = Vec::new(); for l in deltas_to_compact.iter() { - all_keys.extend(l.load_keys(ctx).await?); + all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?); } // FIXME: should spawn_blocking the rest of this function @@ -706,7 +732,7 @@ impl Timeline { key, lsn, ref val, .. } in all_values_iter { - let value = val.load(ctx).await?; + let value = val.load(ctx).await.map_err(CompactionError::Other)?; let same_key = prev_key.map_or(false, |prev_key| prev_key == key); // We need to check key boundaries once we reach next key or end of layer with the same key if !same_key || lsn == dup_end_lsn { @@ -763,7 +789,8 @@ impl Timeline { .take() .unwrap() .finish(prev_key.unwrap().next(), self, ctx) - .await?, + .await + .map_err(CompactionError::Other)?, ); writer = None; @@ -801,7 +828,8 @@ impl Timeline { }, ctx, ) - .await?, + .await + .map_err(CompactionError::Other)?, ); } @@ -809,7 +837,8 @@ impl Timeline { .as_mut() .unwrap() .put_value(key, lsn, value, ctx) - .await?; + .await + .map_err(CompactionError::Other)?; } else { debug!( "Dropping key {} during compaction (it belongs on shard {:?})", @@ -825,7 +854,12 @@ impl Timeline { prev_key = Some(key); } if let Some(writer) = writer { - new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?); + new_layers.push( + writer + .finish(prev_key.unwrap().next(), self, ctx) + .await + .map_err(CompactionError::Other)?, + ); } // Sync layers @@ -1007,7 +1041,7 @@ impl Timeline { let guard = self.layers.read().await; let layers = guard.layer_map(); - let l0_deltas = layers.get_level0_deltas()?; + let l0_deltas = layers.get_level0_deltas(); drop(guard); // As an optimization, if we find that there are too few L0 layers, @@ -1037,7 +1071,9 @@ impl Timeline { fanout, ctx, ) - .await?; + .await + // TODO: compact_tiered needs to return CompactionError + .map_err(CompactionError::Other)?; adaptor.flush_updates().await?; Ok(()) @@ -1235,7 +1271,7 @@ impl Timeline { self: &Arc, _cancel: &CancellationToken, ctx: &RequestContext, - ) -> Result<(), CompactionError> { + ) -> anyhow::Result<()> { use std::collections::BTreeSet; info!("running enhanced gc bottom-most compaction"); @@ -1504,7 +1540,7 @@ impl TimelineAdaptor { } } - pub async fn flush_updates(&mut self) -> anyhow::Result<()> { + pub async fn flush_updates(&mut self) -> Result<(), CompactionError> { let layers_to_delete = { let guard = self.timeline.layers.read().await; self.layers_to_delete diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 49ce3db3e6..ee5f8cd52a 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -26,7 +26,7 @@ pub(crate) enum Error { #[error("flushing failed")] FlushAncestor(#[source] FlushLayerError), #[error("layer download failed")] - RewrittenDeltaDownloadFailed(#[source] anyhow::Error), + RewrittenDeltaDownloadFailed(#[source] crate::tenant::storage_layer::layer::DownloadError), #[error("copying LSN prefix locally failed")] CopyDeltaPrefix(#[source] anyhow::Error), #[error("upload rewritten layer")] diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index f7440ecdae..592f41cb21 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -130,7 +130,7 @@ pub(super) enum UploadQueueStopped { } #[derive(thiserror::Error, Debug)] -pub(crate) enum NotInitialized { +pub enum NotInitialized { #[error("queue is in state Uninitialized")] Uninitialized, #[error("queue is in state Stopped")]