From 5263b39e2c3937c3de604fa02b93a45d5a31e9b7 Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Thu, 3 Aug 2023 20:34:05 +0300
Subject: [PATCH] fix: shutdown logging again (#4886)

During deploys of 2023-08-03 we logged too much on shutdown. Fix the
logging by timing each top-level shutdown step and warning when a step
takes longer than a rough threshold, based on how long each step should
reasonably take. Also remove all shutdown logging from background
tasks, since there is already "shutdown is taking a long time" logging.

Co-authored-by: John Spray
---
 pageserver/src/lib.rs                        | 108 +++++++++++++++++-
 pageserver/src/tenant/mgr.rs                 |  94 ++++++++-------
 pageserver/src/tenant/tasks.rs               |  19 ---
 .../src/tenant/timeline/eviction_task.rs     |   3 -
 test_runner/fixtures/neon_fixtures.py        |   1 +
 5 files changed, 154 insertions(+), 71 deletions(-)

diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index f43651e931..1b1d0acaee 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -47,24 +47,50 @@ pub use crate::metrics::preinitialize_metrics;

 #[tracing::instrument]
 pub async fn shutdown_pageserver(exit_code: i32) {
+    use std::time::Duration;
     // Shut down the libpq endpoint task. This prevents new connections from
     // being accepted.
-    task_mgr::shutdown_tasks(Some(TaskKind::LibpqEndpointListener), None, None).await;
+    timed(
+        task_mgr::shutdown_tasks(Some(TaskKind::LibpqEndpointListener), None, None),
+        "shutdown LibpqEndpointListener",
+        Duration::from_secs(1),
+    )
+    .await;

     // Shut down any page service tasks.
-    task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None).await;
+    timed(
+        task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
+        "shutdown PageRequestHandlers",
+        Duration::from_secs(1),
+    )
+    .await;

     // Shut down all the tenants. This flushes everything to disk and kills
     // the checkpoint and GC tasks.
-    tenant::mgr::shutdown_all_tenants().await;
+    timed(
+        tenant::mgr::shutdown_all_tenants(),
+        "shutdown all tenants",
+        Duration::from_secs(5),
+    )
+    .await;

     // Shut down the HTTP endpoint last, so that you can still check the server's
     // status while it's shutting down.
     // FIXME: We should probably stop accepting commands like attach/detach earlier.
-    task_mgr::shutdown_tasks(Some(TaskKind::HttpEndpointListener), None, None).await;
+    timed(
+        task_mgr::shutdown_tasks(Some(TaskKind::HttpEndpointListener), None, None),
+        "shutdown http",
+        Duration::from_secs(1),
+    )
+    .await;

     // There should be nothing left, but let's be sure
-    task_mgr::shutdown_tasks(None, None, None).await;
+    timed(
+        task_mgr::shutdown_tasks(None, None, None),
+        "shutdown leftovers",
+        Duration::from_secs(1),
+    )
+    .await;
     info!("Shut down successfully completed");
     std::process::exit(exit_code);
 }
@@ -172,6 +198,45 @@ pub struct InitializationOrder {
     pub background_jobs_can_start: utils::completion::Barrier,
 }

+/// Time the future with a warning when it exceeds a threshold.
+async fn timed<Fut: std::future::Future>(
+    fut: Fut,
+    name: &str,
+    warn_at: std::time::Duration,
+) -> <Fut as std::future::Future>::Output {
+    let started = std::time::Instant::now();
+
+    let mut fut = std::pin::pin!(fut);
+
+    match tokio::time::timeout(warn_at, &mut fut).await {
+        Ok(ret) => {
+            tracing::info!(
+                task = name,
+                elapsed_ms = started.elapsed().as_millis(),
+                "completed"
+            );
+            ret
+        }
+        Err(_) => {
+            tracing::info!(
+                task = name,
+                elapsed_ms = started.elapsed().as_millis(),
+                "still waiting, taking longer than expected..."
+            );
+
+            let ret = fut.await;
+
+            tracing::warn!(
+                task = name,
+                elapsed_ms = started.elapsed().as_millis(),
+                "completed, took longer than expected"
+            );
+
+            ret
+        }
+    }
+}
+
 #[cfg(test)]
 mod backoff_defaults_tests {
     use super::*;
@@ -202,3 +267,36 @@ mod backoff_defaults_tests {
         );
     }
 }
+
+#[cfg(test)]
+mod timed_tests {
+    use super::timed;
+    use std::time::Duration;
+
+    #[tokio::test]
+    async fn timed_completes_when_inner_future_completes() {
+        // A future that completes on time should have its result returned
+        let r1 = timed(
+            async move {
+                tokio::time::sleep(Duration::from_millis(10)).await;
+                123
+            },
+            "test 1",
+            Duration::from_millis(50),
+        )
+        .await;
+        assert_eq!(r1, 123);
+
+        // A future that completes too slowly should also have its result returned
+        let r1 = timed(
+            async move {
+                tokio::time::sleep(Duration::from_millis(50)).await;
+                456
+            },
+            "test 1",
+            Duration::from_millis(10),
+        )
+        .await;
+        assert_eq!(r1, 456);
+    }
+}
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 25c5e3f2e0..2635953e6d 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -266,71 +266,77 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
         }
     };

+    let started_at = std::time::Instant::now();
     let mut join_set = JoinSet::new();
     for (tenant_id, tenant) in tenants_to_shut_down {
         join_set.spawn(
             async move {
-                // ordering shouldn't matter for this, either we store true right away or never
-                let ordering = std::sync::atomic::Ordering::Relaxed;
-                let joined_other = std::sync::atomic::AtomicBool::new(false);
+                let freeze_and_flush = true;

-                let mut shutdown = std::pin::pin!(async {
-                    let freeze_and_flush = true;
-
-                    let res = {
-                        let (_guard, shutdown_progress) = completion::channel();
-                        tenant.shutdown(shutdown_progress, freeze_and_flush).await
-                    };
-
-                    if let Err(other_progress) = res {
-                        // join the another shutdown in progress
-                        joined_other.store(true, ordering);
-                        other_progress.wait().await;
-                    }
-                });
-
-                // in practice we might not have a lot time to go, since systemd is going to
-                // SIGKILL us at 10s, but we can try. delete tenant might take a while, so put out
-                // a warning.
-                let warning = std::time::Duration::from_secs(5);
-                let mut warning = std::pin::pin!(tokio::time::sleep(warning));
-
-                tokio::select! {
-                    _ = &mut shutdown => {},
-                    _ = &mut warning => {
-                        let joined_other = joined_other.load(ordering);
-                        warn!(%joined_other, "waiting for the shutdown to complete");
-                        shutdown.await;
-                    }
+                let res = {
+                    let (_guard, shutdown_progress) = completion::channel();
+                    tenant.shutdown(shutdown_progress, freeze_and_flush).await
                 };
+                if let Err(other_progress) = res {
+                    // join the another shutdown in progress
+                    other_progress.wait().await;
+                }
+
+                // we cannot afford per tenant logging here, because if s3 is degraded, we are
+                // going to log too many lines
+                debug!("tenant successfully stopped");
             }
             .instrument(info_span!("shutdown", %tenant_id)),
         );
     }

+    let total = join_set.len();
     let mut panicked = 0;
+    let mut buffering = true;
+    const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
+    let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));

-    while let Some(res) = join_set.join_next().await {
-        match res {
-            Ok(()) => {}
-            Err(join_error) if join_error.is_cancelled() => {
-                unreachable!("we are not cancelling any of the futures");
-            }
-            Err(join_error) if join_error.is_panic() => {
-                // cannot really do anything, as this panic is likely a bug
-                panicked += 1;
-            }
-            Err(join_error) => {
-                warn!("unknown kind of JoinError: {join_error}");
+    while !join_set.is_empty() {
+        tokio::select! {
+            Some(joined) = join_set.join_next() => {
+                match joined {
+                    Ok(()) => {}
+                    Err(join_error) if join_error.is_cancelled() => {
+                        unreachable!("we are not cancelling any of the futures");
+                    }
+                    Err(join_error) if join_error.is_panic() => {
+                        // cannot really do anything, as this panic is likely a bug
+                        panicked += 1;
+                    }
+                    Err(join_error) => {
+                        warn!("unknown kind of JoinError: {join_error}");
+                    }
+                }
+                if !buffering {
+                    // buffer so that every 500ms since the first update (or starting) we'll log
+                    // how far away we are; this is because we will get SIGKILL'd at 10s, and we
+                    // are not able to log *then*.
+                    buffering = true;
+                    buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
+                }
+            },
+            _ = &mut buffered, if buffering => {
+                buffering = false;
+                info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
             }
         }
     }

     if panicked > 0 {
-        warn!(panicked, "observed panicks while shutting down tenants");
+        warn!(
+            panicked,
+            total, "observed panicks while shutting down tenants"
+        );
     }
+
+    // caller will log how long we took
 }

 pub async fn create_tenant(
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index 6400eb7cbe..c067a84471 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -73,17 +73,13 @@ pub fn start_background_loops(
 ///
 async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     let wait_duration = Duration::from_secs(2);
-    info!("starting");
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     async {
         let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
         let mut first = true;
         loop {
-            trace!("waking up");
-
             tokio::select! {
                 _ = cancel.cancelled() => {
-                    info!("received cancellation request");
                     return;
                 },
                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
@@ -103,11 +99,6 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
                 }
             }

-            if cancel.is_cancelled() {
-                info!("received cancellation request");
-                break;
-            }
-
             let started_at = Instant::now();

             let sleep_duration = if period == Duration::ZERO {
@@ -131,15 +122,12 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
                 .await
                 .is_ok()
             {
-                info!("received cancellation request during idling");
                 break;
             }
         }
     }
     .await;
     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
-
-    trace!("compaction loop stopped.");
 }

 ///
@@ -147,7 +135,6 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
 ///
 async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     let wait_duration = Duration::from_secs(2);
-    info!("starting");
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     async {
         // GC might require downloading, to find the cutoff LSN that corresponds to the
@@ -156,11 +143,8 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
             RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
         let mut first = true;
         loop {
-            trace!("waking up");
-
             tokio::select! {
                 _ = cancel.cancelled() => {
-                    info!("received cancellation request");
                     return;
                 },
                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
@@ -205,14 +189,12 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
                 .await
                 .is_ok()
             {
-                info!("received cancellation request during idling");
                 break;
             }
         }
     }
     .await;
     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
-    trace!("GC loop stopped.");
 }

 async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
@@ -237,7 +219,6 @@ async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
                     }
                 }
                 Err(_sender_dropped_error) => {
-                    info!("Tenant dropped the state updates sender, quitting waiting for tenant and the task loop");
                     return ControlFlow::Break(());
                 }
             }
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs
index 354f971e11..3e407dda57 100644
--- a/pageserver/src/tenant/timeline/eviction_task.rs
+++ b/pageserver/src/tenant/timeline/eviction_task.rs
@@ -78,9 +78,6 @@ impl Timeline {

     #[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
     async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
-        scopeguard::defer! {
-            info!("eviction task finishing");
-        }
         use crate::tenant::tasks::random_init_delay;
         {
             let policy = self.get_eviction_policy();
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index ac237cafca..f5d4ca3f20 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1529,6 +1529,7 @@ class NeonPageserver(PgProtocol):
         ".*Compaction failed, retrying in .*: queue is in state Stopped.*",
         # Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
         ".*Error processing HTTP request: NotFound: Timeline .* was not found",
+        ".*took more than expected to complete.*",
     ]

     def start(