From a8899e1e0f51b11882d70e2c7c7a176b9beaf173 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 20 Oct 2023 09:15:34 +0100 Subject: [PATCH] pageserver: apply timeout when waiting for tenant loads (#5601) ## Problem Loading tenants shouldn't hang. However, if it does, we shouldn't let one hung tenant prevent the entire process from starting background jobs. ## Summary of changes Generalize the timeout mechanism that we already applied to loading initial logical sizes: each phase in startup where we wait for a barrier is subject to a timeout, and startup will proceed if it doesn't complete within timeout. Startup metrics will still reflect the time when a phase actually completed, rather than when we skipped it. The code isn't the most beautiful, but that kind of reflects the awkwardness of await'ing on a future and then stashing it to await again later if we time out. I could imagine making this cleaner in future by waiting on a structure that doesn't self-destruct on wait() the way Barrier does, then make InitializationOrder into a structure that manages the series of waits etc. --- pageserver/src/bin/pageserver.rs | 165 ++++++++++++++++++++++--------- 1 file changed, 118 insertions(+), 47 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 76cb0e8ec6..798b9f258b 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -2,6 +2,7 @@ use std::env::{var, VarError}; use std::sync::Arc; +use std::time::Duration; use std::{env, ops::ControlFlow, str::FromStr}; use anyhow::{anyhow, Context}; @@ -200,6 +201,51 @@ fn initialize_config( }) } +struct WaitForPhaseResult { + timeout_remaining: Duration, + skipped: Option, +} + +/// During startup, we apply a timeout to our waits for readiness, to avoid +/// stalling the whole service if one Tenant experiences some problem. Each +/// phase may consume some of the timeout: this function returns the updated +/// timeout for use in the next call. +async fn wait_for_phase(phase: &str, mut fut: F, timeout: Duration) -> WaitForPhaseResult +where + F: std::future::Future + Unpin, +{ + let initial_t = Instant::now(); + let skipped = match tokio::time::timeout(timeout, &mut fut).await { + Ok(_) => None, + Err(_) => { + tracing::info!( + timeout_millis = timeout.as_millis(), + %phase, + "Startup phase timed out, proceeding anyway" + ); + Some(fut) + } + }; + + WaitForPhaseResult { + timeout_remaining: timeout + .checked_sub(Instant::now().duration_since(initial_t)) + .unwrap_or(Duration::ZERO), + skipped, + } +} + +fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) { + let elapsed = started_at.elapsed(); + let secs = elapsed.as_secs_f64(); + STARTUP_DURATION.with_label_values(&[phase]).set(secs); + + info!( + elapsed_ms = elapsed.as_millis(), + "{human_phase} ({secs:.3}s since start)" + ) +} + fn start_pageserver( launch_ts: &'static LaunchTimestamp, conf: &'static PageServerConf, @@ -207,16 +253,6 @@ fn start_pageserver( // Monotonic time for later calculating startup duration let started_startup_at = Instant::now(); - let startup_checkpoint = move |phase: &str, human_phase: &str| { - let elapsed = started_startup_at.elapsed(); - let secs = elapsed.as_secs_f64(); - STARTUP_DURATION.with_label_values(&[phase]).set(secs); - info!( - elapsed_ms = elapsed.as_millis(), - "{human_phase} ({secs:.3}s since start)" - ) - }; - // Print version and launch timestamp to the log, // and expose them as prometheus metrics. // A changed version string indicates changed software. @@ -341,7 +377,7 @@ fn start_pageserver( // Up to this point no significant I/O has been done: this should have been fast. Record // duration prior to starting I/O intensive phase of startup. - startup_checkpoint("initial", "Starting loading tenants"); + startup_checkpoint(started_startup_at, "initial", "Starting loading tenants"); STARTUP_IS_LOADING.set(1); // Startup staging or optimizing: @@ -388,58 +424,93 @@ fn start_pageserver( let shutdown_pageserver = shutdown_pageserver.clone(); let drive_init = async move { // NOTE: unlike many futures in pageserver, this one is cancellation-safe - let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial load completed")); + let guard = scopeguard::guard_on_success((), |_| { + tracing::info!("Cancelled before initial load completed") + }); - init_remote_done_rx.wait().await; - startup_checkpoint("initial_tenant_load_remote", "Remote part of initial load completed"); + let timeout = conf.background_task_maximum_delay; - init_done_rx.wait().await; - startup_checkpoint("initial_tenant_load", "Initial load completed"); - STARTUP_IS_LOADING.set(0); + let init_remote_done = std::pin::pin!(async { + init_remote_done_rx.wait().await; + startup_checkpoint( + started_startup_at, + "initial_tenant_load_remote", + "Remote part of initial load completed", + ); + }); + + let WaitForPhaseResult { + timeout_remaining: timeout, + skipped: init_remote_skipped, + } = wait_for_phase("initial_tenant_load_remote", init_remote_done, timeout).await; + + let init_load_done = std::pin::pin!(async { + init_done_rx.wait().await; + startup_checkpoint( + started_startup_at, + "initial_tenant_load", + "Initial load completed", + ); + STARTUP_IS_LOADING.set(0); + }); + + let WaitForPhaseResult { + timeout_remaining: timeout, + skipped: init_load_skipped, + } = wait_for_phase("initial_tenant_load", init_load_done, timeout).await; // initial logical sizes can now start, as they were waiting on init_done_rx. scopeguard::ScopeGuard::into_inner(guard); - let mut init_sizes_done = std::pin::pin!(init_logical_size_done_rx.wait()); + let guard = scopeguard::guard_on_success((), |_| { + tracing::info!("Cancelled before initial logical sizes completed") + }); - let timeout = conf.background_task_maximum_delay; + let logical_sizes_done = std::pin::pin!(async { + init_logical_size_done_rx.wait().await; + startup_checkpoint( + started_startup_at, + "initial_logical_sizes", + "Initial logical sizes completed", + ); + }); - let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed")); - - let init_sizes_done = match tokio::time::timeout(timeout, &mut init_sizes_done).await { - Ok(_) => { - startup_checkpoint("initial_logical_sizes", "Initial logical sizes completed"); - None - } - Err(_) => { - tracing::info!( - timeout_millis = timeout.as_millis(), - "Initial logical size timeout elapsed; starting background jobs" - ); - Some(init_sizes_done) - } - }; + let WaitForPhaseResult { + timeout_remaining: _, + skipped: logical_sizes_skipped, + } = wait_for_phase("initial_logical_sizes", logical_sizes_done, timeout).await; scopeguard::ScopeGuard::into_inner(guard); - // allow background jobs to start + // allow background jobs to start: we either completed prior stages, or they reached timeout + // and were skipped. It is important that we do not let them block background jobs indefinitely, + // because things like consumption metrics for billing are blocked by this barrier. drop(background_jobs_can_start); - startup_checkpoint("background_jobs_can_start", "Starting background jobs"); - - if let Some(init_sizes_done) = init_sizes_done { - // ending up here is not a bug; at the latest logical sizes will be queried by - // consumption metrics. - let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed")); - init_sizes_done.await; - - scopeguard::ScopeGuard::into_inner(guard); - - startup_checkpoint("initial_logical_sizes", "Initial logical sizes completed after timeout (background jobs already started)"); + startup_checkpoint( + started_startup_at, + "background_jobs_can_start", + "Starting background jobs", + ); + // We are done. If we skipped any phases due to timeout, run them to completion here so that + // they will eventually update their startup_checkpoint, and so that we do not declare the + // 'complete' stage until all the other stages are really done. + let guard = scopeguard::guard_on_success((), |_| { + tracing::info!("Cancelled before waiting for skipped phases done") + }); + if let Some(f) = init_remote_skipped { + f.await; } + if let Some(f) = init_load_skipped { + f.await; + } + if let Some(f) = logical_sizes_skipped { + f.await; + } + scopeguard::ScopeGuard::into_inner(guard); - startup_checkpoint("complete", "Startup complete"); + startup_checkpoint(started_startup_at, "complete", "Startup complete"); }; async move {