diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 90c0e28bc4..cfdb32f755 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -41,6 +41,7 @@ use tokio::sync::watch; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::*; +use upload_queue::NotInitialized; use utils::backoff; use utils::circuit_breaker::CircuitBreaker; use utils::completion; @@ -601,6 +602,15 @@ impl From for GcError { } } +impl From for GcError { + fn from(value: NotInitialized) -> Self { + match value { + NotInitialized::Uninitialized => GcError::Remote(value.into()), + NotInitialized::Stopped | NotInitialized::ShuttingDown => GcError::TimelineCancelled, + } + } +} + impl From for GcError { fn from(_: timeline::layer_manager::Shutdown) -> Self { GcError::TimelineCancelled diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 1344fe4192..8a76d7532f 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -985,7 +985,10 @@ impl RemoteTimelineClient { /// /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`] /// is invoked on them. - pub(crate) fn schedule_gc_update(self: &Arc, gc_layers: &[Layer]) -> anyhow::Result<()> { + pub(crate) fn schedule_gc_update( + self: &Arc, + gc_layers: &[Layer], + ) -> Result<(), NotInitialized> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 16ba0fda94..f9d3fdf186 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -369,9 +369,6 @@ impl ImageLayerInner { self.lsn } - /// Returns nested result following Result, Critical>: - /// - inner has the success or transient failure - /// - outer has the permanent failure pub(super) async fn load( path: &Utf8Path, lsn: Lsn, diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 83450d24bb..0175f32268 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1848,8 +1848,8 @@ impl ResidentLayer { /// Read all they keys in this layer which match the ShardIdentity, and write them all to /// the provided writer. Return the number of keys written. #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))] - pub(crate) async fn filter<'a>( - &'a self, + pub(crate) async fn filter( + &self, shard_identity: &ShardIdentity, writer: &mut ImageLayerWriter, ctx: &RequestContext, diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index b4706ea59d..713845e9ac 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -211,6 +211,11 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { } else { // Run compaction match tenant.compaction_iteration(&cancel, &ctx).await { + Ok(has_pending_task) => { + error_run_count = 0; + // schedule the next compaction immediately in case there is a pending compaction task + if has_pending_task { Duration::ZERO } else { period } + } Err(e) => { let wait_duration = backoff::exponential_backoff_duration_seconds( error_run_count + 1, @@ -227,11 +232,6 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { ); wait_duration } - Ok(has_pending_task) => { - error_run_count = 0; - // schedule the next compaction immediately in case there is a pending compaction task - if has_pending_task { Duration::from_secs(0) } else { period } - } } }; @@ -265,7 +265,8 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { count_throttled, sum_throttled_usecs, allowed_rps=%format_args!("{allowed_rps:.0}"), - "shard was throttled in the last n_seconds") + "shard was throttled in the last n_seconds" + ); }); // Sleep diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f810df5a56..b003834adf 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4421,22 +4421,24 @@ impl From for CompactionError { } } -impl CompactionError { - /// We cannot do compaction because we could not download a layer that is input to the compaction. - pub(crate) fn input_layer_download_failed( - e: super::storage_layer::layer::DownloadError, - ) -> Self { +impl From for CompactionError { + fn from(e: super::storage_layer::layer::DownloadError) -> Self { match e { - super::storage_layer::layer::DownloadError::TimelineShutdown | - /* TODO DownloadCancelled correct here? */ - super::storage_layer::layer::DownloadError::DownloadCancelled => CompactionError::ShuttingDown, - super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads | - super::storage_layer::layer::DownloadError::DownloadRequired | - super::storage_layer::layer::DownloadError::NotFile(_) | - super::storage_layer::layer::DownloadError::DownloadFailed | - super::storage_layer::layer::DownloadError::PreStatFailed(_)=>CompactionError::Other(anyhow::anyhow!(e)), + super::storage_layer::layer::DownloadError::TimelineShutdown + | super::storage_layer::layer::DownloadError::DownloadCancelled => { + CompactionError::ShuttingDown + } + super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads + | super::storage_layer::layer::DownloadError::DownloadRequired + | super::storage_layer::layer::DownloadError::NotFile(_) + | super::storage_layer::layer::DownloadError::DownloadFailed + | super::storage_layer::layer::DownloadError::PreStatFailed(_) => { + CompactionError::Other(anyhow::anyhow!(e)) + } #[cfg(test)] - super::storage_layer::layer::DownloadError::Failpoint(_) => CompactionError::Other(anyhow::anyhow!(e)), + super::storage_layer::layer::DownloadError::Failpoint(_) => { + CompactionError::Other(anyhow::anyhow!(e)) + } } } } @@ -4990,15 +4992,7 @@ impl Timeline { result.layers_removed = gc_layers.len() as u64; - self.remote_client - .schedule_gc_update(&gc_layers) - .map_err(|e| { - if self.cancel.is_cancelled() { - GcError::TimelineCancelled - } else { - GcError::Remote(e) - } - })?; + self.remote_client.schedule_gc_update(&gc_layers)?; guard.open_mut()?.finish_gc_timeline(&gc_layers); diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 87ec46c0b5..8390cb839c 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -489,10 +489,7 @@ impl Timeline { // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are: // - GC, which at worst witnesses us "undelete" a layer that they just deleted. // - ingestion, which only inserts layers, therefore cannot collide with us. - let resident = layer - .download_and_keep_resident() - .await - .map_err(CompactionError::input_layer_download_failed)?; + let resident = layer.download_and_keep_resident().await?; let keys_written = resident .filter(&self.shard_identity, &mut image_layer_writer, ctx) @@ -693,23 +690,14 @@ impl Timeline { let mut fully_compacted = true; - deltas_to_compact.push( - first_level0_delta - .download_and_keep_resident() - .await - .map_err(CompactionError::input_layer_download_failed)?, - ); + deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?); for l in level0_deltas_iter { let lsn_range = &l.layer_desc().lsn_range; if lsn_range.start != prev_lsn_end { break; } - deltas_to_compact.push( - l.download_and_keep_resident() - .await - .map_err(CompactionError::input_layer_download_failed)?, - ); + deltas_to_compact.push(l.download_and_keep_resident().await?); deltas_to_compact_bytes += l.metadata().file_size; prev_lsn_end = lsn_range.end; @@ -1137,6 +1125,10 @@ impl Timeline { if !self.shard_identity.is_key_disposable(&key) { if writer.is_none() { + if self.cancel.is_cancelled() { + // to be somewhat responsive to cancellation, check for each new layer + return Err(CompactionError::ShuttingDown); + } // Create writer if not initiaized yet writer = Some( DeltaLayerWriter::new( diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index a66900522a..b5c577af72 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -335,6 +335,9 @@ pub(super) async fn handle_walreceiver_connection( filtered_records += 1; } + // FIXME: this cannot be made pausable_failpoint without fixing the + // failpoint library; in tests, the added amount of debugging will cause us + // to timeout the tests. fail_point!("walreceiver-after-ingest"); last_rec_lsn = lsn; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 4374e74a41..561e8bce04 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -963,7 +963,7 @@ class NeonEnvBuilder: if self.env: log.info("Cleaning up all storage and compute nodes") self.env.stop( - immediate=True, + immediate=False, # if the test threw an exception, don't check for errors # as a failing assertion would cause the cleanup below to fail ps_assert_metric_no_errors=(exc_type is None), diff --git a/test_runner/regress/test_ancestor_branch.py b/test_runner/regress/test_ancestor_branch.py index 7e40081aa2..f83b44a7ad 100644 --- a/test_runner/regress/test_ancestor_branch.py +++ b/test_runner/regress/test_ancestor_branch.py @@ -20,7 +20,9 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): } ) - pageserver_http.configure_failpoints(("flush-frozen-pausable", "sleep(10000)")) + failpoint = "flush-frozen-pausable" + + pageserver_http.configure_failpoints((failpoint, "sleep(10000)")) endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant) branch0_cur = endpoint_branch0.connect().cursor() @@ -96,3 +98,5 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): assert query_scalar(branch1_cur, "SELECT count(*) FROM foo") == 200000 assert query_scalar(branch2_cur, "SELECT count(*) FROM foo") == 300000 + + pageserver_http.configure_failpoints((failpoint, "off")) diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 1f220eec9e..642b9e449b 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -1137,3 +1137,10 @@ def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_met delete_lazy_activating(lazy_tenant, env.pageserver, expect_attaching=True) else: raise RuntimeError(activation_method) + + client.configure_failpoints( + [ + ("timeline-calculate-logical-size-pause", "off"), + ("walreceiver-after-ingest", "off"), + ] + )