From 9dc9a9b2e950638b4fc018e1254879cbb430ba6a Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 12 Aug 2024 15:37:15 +0300 Subject: [PATCH] test: do graceful shutdown by default (#8655) It should give us all possible allowed_errors more consistently. While getting the workflows to pass on https://github.com/neondatabase/neon/pull/8632 it was noticed that allowed_errors are rarely hit (1/4). This made me realize that we always do an immediate stop by default. Doing a graceful shutdown would had made the draining more apparent and likely we would not have needed the #8632 hotfix. Downside of doing this is that we will see more timeouts if tests are randomly leaving pause failpoints which fail the shutdown. The net outcome should however be positive, we could even detect too slow shutdowns caused by a bug or deadlock. --- pageserver/src/tenant.rs | 10 +++++ .../src/tenant/remote_timeline_client.rs | 5 ++- .../src/tenant/storage_layer/image_layer.rs | 3 -- pageserver/src/tenant/storage_layer/layer.rs | 4 +- pageserver/src/tenant/tasks.rs | 13 +++--- pageserver/src/tenant/timeline.rs | 40 ++++++++----------- pageserver/src/tenant/timeline/compaction.rs | 22 ++++------ .../walreceiver/walreceiver_connection.rs | 3 ++ test_runner/fixtures/neon_fixtures.py | 2 +- test_runner/regress/test_ancestor_branch.py | 6 ++- test_runner/regress/test_timeline_size.py | 7 ++++ 11 files changed, 63 insertions(+), 52 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 90c0e28bc4..cfdb32f755 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -41,6 +41,7 @@ use tokio::sync::watch; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::*; +use upload_queue::NotInitialized; use utils::backoff; use utils::circuit_breaker::CircuitBreaker; use utils::completion; @@ -601,6 +602,15 @@ impl From for GcError { } } +impl From for GcError { + fn from(value: NotInitialized) -> Self { + match value { + NotInitialized::Uninitialized => GcError::Remote(value.into()), + NotInitialized::Stopped | NotInitialized::ShuttingDown => GcError::TimelineCancelled, + } + } +} + impl From for GcError { fn from(_: timeline::layer_manager::Shutdown) -> Self { GcError::TimelineCancelled diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 1344fe4192..8a76d7532f 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -985,7 +985,10 @@ impl RemoteTimelineClient { /// /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`] /// is invoked on them. - pub(crate) fn schedule_gc_update(self: &Arc, gc_layers: &[Layer]) -> anyhow::Result<()> { + pub(crate) fn schedule_gc_update( + self: &Arc, + gc_layers: &[Layer], + ) -> Result<(), NotInitialized> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 16ba0fda94..f9d3fdf186 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -369,9 +369,6 @@ impl ImageLayerInner { self.lsn } - /// Returns nested result following Result, Critical>: - /// - inner has the success or transient failure - /// - outer has the permanent failure pub(super) async fn load( path: &Utf8Path, lsn: Lsn, diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 83450d24bb..0175f32268 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1848,8 +1848,8 @@ impl ResidentLayer { /// Read all they keys in this layer which match the ShardIdentity, and write them all to /// the provided writer. Return the number of keys written. #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))] - pub(crate) async fn filter<'a>( - &'a self, + pub(crate) async fn filter( + &self, shard_identity: &ShardIdentity, writer: &mut ImageLayerWriter, ctx: &RequestContext, diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index b4706ea59d..713845e9ac 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -211,6 +211,11 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { } else { // Run compaction match tenant.compaction_iteration(&cancel, &ctx).await { + Ok(has_pending_task) => { + error_run_count = 0; + // schedule the next compaction immediately in case there is a pending compaction task + if has_pending_task { Duration::ZERO } else { period } + } Err(e) => { let wait_duration = backoff::exponential_backoff_duration_seconds( error_run_count + 1, @@ -227,11 +232,6 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { ); wait_duration } - Ok(has_pending_task) => { - error_run_count = 0; - // schedule the next compaction immediately in case there is a pending compaction task - if has_pending_task { Duration::from_secs(0) } else { period } - } } }; @@ -265,7 +265,8 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { count_throttled, sum_throttled_usecs, allowed_rps=%format_args!("{allowed_rps:.0}"), - "shard was throttled in the last n_seconds") + "shard was throttled in the last n_seconds" + ); }); // Sleep diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f810df5a56..b003834adf 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4421,22 +4421,24 @@ impl From for CompactionError { } } -impl CompactionError { - /// We cannot do compaction because we could not download a layer that is input to the compaction. - pub(crate) fn input_layer_download_failed( - e: super::storage_layer::layer::DownloadError, - ) -> Self { +impl From for CompactionError { + fn from(e: super::storage_layer::layer::DownloadError) -> Self { match e { - super::storage_layer::layer::DownloadError::TimelineShutdown | - /* TODO DownloadCancelled correct here? */ - super::storage_layer::layer::DownloadError::DownloadCancelled => CompactionError::ShuttingDown, - super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads | - super::storage_layer::layer::DownloadError::DownloadRequired | - super::storage_layer::layer::DownloadError::NotFile(_) | - super::storage_layer::layer::DownloadError::DownloadFailed | - super::storage_layer::layer::DownloadError::PreStatFailed(_)=>CompactionError::Other(anyhow::anyhow!(e)), + super::storage_layer::layer::DownloadError::TimelineShutdown + | super::storage_layer::layer::DownloadError::DownloadCancelled => { + CompactionError::ShuttingDown + } + super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads + | super::storage_layer::layer::DownloadError::DownloadRequired + | super::storage_layer::layer::DownloadError::NotFile(_) + | super::storage_layer::layer::DownloadError::DownloadFailed + | super::storage_layer::layer::DownloadError::PreStatFailed(_) => { + CompactionError::Other(anyhow::anyhow!(e)) + } #[cfg(test)] - super::storage_layer::layer::DownloadError::Failpoint(_) => CompactionError::Other(anyhow::anyhow!(e)), + super::storage_layer::layer::DownloadError::Failpoint(_) => { + CompactionError::Other(anyhow::anyhow!(e)) + } } } } @@ -4990,15 +4992,7 @@ impl Timeline { result.layers_removed = gc_layers.len() as u64; - self.remote_client - .schedule_gc_update(&gc_layers) - .map_err(|e| { - if self.cancel.is_cancelled() { - GcError::TimelineCancelled - } else { - GcError::Remote(e) - } - })?; + self.remote_client.schedule_gc_update(&gc_layers)?; guard.open_mut()?.finish_gc_timeline(&gc_layers); diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 87ec46c0b5..8390cb839c 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -489,10 +489,7 @@ impl Timeline { // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are: // - GC, which at worst witnesses us "undelete" a layer that they just deleted. // - ingestion, which only inserts layers, therefore cannot collide with us. - let resident = layer - .download_and_keep_resident() - .await - .map_err(CompactionError::input_layer_download_failed)?; + let resident = layer.download_and_keep_resident().await?; let keys_written = resident .filter(&self.shard_identity, &mut image_layer_writer, ctx) @@ -693,23 +690,14 @@ impl Timeline { let mut fully_compacted = true; - deltas_to_compact.push( - first_level0_delta - .download_and_keep_resident() - .await - .map_err(CompactionError::input_layer_download_failed)?, - ); + deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?); for l in level0_deltas_iter { let lsn_range = &l.layer_desc().lsn_range; if lsn_range.start != prev_lsn_end { break; } - deltas_to_compact.push( - l.download_and_keep_resident() - .await - .map_err(CompactionError::input_layer_download_failed)?, - ); + deltas_to_compact.push(l.download_and_keep_resident().await?); deltas_to_compact_bytes += l.metadata().file_size; prev_lsn_end = lsn_range.end; @@ -1137,6 +1125,10 @@ impl Timeline { if !self.shard_identity.is_key_disposable(&key) { if writer.is_none() { + if self.cancel.is_cancelled() { + // to be somewhat responsive to cancellation, check for each new layer + return Err(CompactionError::ShuttingDown); + } // Create writer if not initiaized yet writer = Some( DeltaLayerWriter::new( diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index a66900522a..b5c577af72 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -335,6 +335,9 @@ pub(super) async fn handle_walreceiver_connection( filtered_records += 1; } + // FIXME: this cannot be made pausable_failpoint without fixing the + // failpoint library; in tests, the added amount of debugging will cause us + // to timeout the tests. fail_point!("walreceiver-after-ingest"); last_rec_lsn = lsn; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 4374e74a41..561e8bce04 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -963,7 +963,7 @@ class NeonEnvBuilder: if self.env: log.info("Cleaning up all storage and compute nodes") self.env.stop( - immediate=True, + immediate=False, # if the test threw an exception, don't check for errors # as a failing assertion would cause the cleanup below to fail ps_assert_metric_no_errors=(exc_type is None), diff --git a/test_runner/regress/test_ancestor_branch.py b/test_runner/regress/test_ancestor_branch.py index 7e40081aa2..f83b44a7ad 100644 --- a/test_runner/regress/test_ancestor_branch.py +++ b/test_runner/regress/test_ancestor_branch.py @@ -20,7 +20,9 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): } ) - pageserver_http.configure_failpoints(("flush-frozen-pausable", "sleep(10000)")) + failpoint = "flush-frozen-pausable" + + pageserver_http.configure_failpoints((failpoint, "sleep(10000)")) endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant) branch0_cur = endpoint_branch0.connect().cursor() @@ -96,3 +98,5 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): assert query_scalar(branch1_cur, "SELECT count(*) FROM foo") == 200000 assert query_scalar(branch2_cur, "SELECT count(*) FROM foo") == 300000 + + pageserver_http.configure_failpoints((failpoint, "off")) diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 1f220eec9e..642b9e449b 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -1137,3 +1137,10 @@ def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_met delete_lazy_activating(lazy_tenant, env.pageserver, expect_attaching=True) else: raise RuntimeError(activation_method) + + client.configure_failpoints( + [ + ("timeline-calculate-logical-size-pause", "off"), + ("walreceiver-after-ingest", "off"), + ] + )