diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs
index 5828a400a0..d67c0f123b 100644
--- a/libs/utils/src/logging.rs
+++ b/libs/utils/src/logging.rs
@@ -1,4 +1,5 @@
 use std::future::Future;
+use std::pin::Pin;
 use std::str::FromStr;
 use std::time::Duration;
 
@@ -7,7 +8,7 @@ use metrics::{IntCounter, IntCounterVec};
 use once_cell::sync::Lazy;
 use strum_macros::{EnumString, VariantNames};
 use tokio::time::Instant;
-use tracing::info;
+use tracing::{info, warn};
 
 /// Logs a critical error, similarly to `tracing::error!`. This will:
 ///
@@ -377,10 +378,11 @@ impl std::fmt::Debug for SecretString {
 ///
 /// TODO: consider upgrading this to a warning, but currently it fires too often.
 #[inline]
-pub async fn log_slow<F, O>(name: &str, threshold: Duration, f: std::pin::Pin<&mut F>) -> O
-where
-    F: Future<Output = O>,
-{
+pub async fn log_slow<O>(
+    name: &str,
+    threshold: Duration,
+    f: Pin<&mut impl Future<Output = O>>,
+) -> O {
     monitor_slow_future(
         threshold,
         threshold, // period = threshold
@@ -394,16 +396,42 @@ where
             if !is_slow {
                 return;
             }
+            let elapsed = elapsed_total.as_secs_f64();
             if ready {
-                info!(
-                    "slow {name} completed after {:.3}s",
-                    elapsed_total.as_secs_f64()
-                );
+                info!("slow {name} completed after {elapsed:.3}s");
             } else {
-                info!(
-                    "slow {name} still running after {:.3}s",
-                    elapsed_total.as_secs_f64()
-                );
+                info!("slow {name} still running after {elapsed:.3}s");
+            }
+        },
+    )
+    .await
+}
+
+/// Logs a periodic warning if a future is slow to complete.
+#[inline]
+pub async fn warn_slow<O>(
+    name: &str,
+    threshold: Duration,
+    f: Pin<&mut impl Future<Output = O>>,
+) -> O {
+    monitor_slow_future(
+        threshold,
+        threshold, // period = threshold
+        f,
+        |MonitorSlowFutureCallback {
+             ready,
+             is_slow,
+             elapsed_total,
+             elapsed_since_last_callback: _,
+         }| {
+            if !is_slow {
+                return;
+            }
+            let elapsed = elapsed_total.as_secs_f64();
+            if ready {
+                warn!("slow {name} completed after {elapsed:.3}s");
+            } else {
+                warn!("slow {name} still running after {elapsed:.3}s");
             }
         },
     )
     .await
 }
@@ -416,7 +444,7 @@ where
 pub async fn monitor_slow_future<F, O>(
     threshold: Duration,
     period: Duration,
-    mut fut: std::pin::Pin<&mut F>,
+    mut fut: Pin<&mut F>,
     mut cb: impl FnMut(MonitorSlowFutureCallback),
 ) -> O
 where
diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs
index 7049fbdb96..7732585f7c 100644
--- a/pageserver/client_grpc/src/client.rs
+++ b/pageserver/client_grpc/src/client.rs
@@ -1,13 +1,16 @@
 use std::collections::HashMap;
 use std::num::NonZero;
+use std::pin::pin;
 use std::sync::Arc;
+use std::time::{Duration, Instant};
 
 use anyhow::anyhow;
 use arc_swap::ArcSwap;
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt as _, StreamExt as _};
 use tonic::codec::CompressionEncoding;
-use tracing::instrument;
+use tracing::{debug, instrument};
+use utils::logging::warn_slow;
 
 use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
 use crate::retry::Retry;
@@ -44,6 +47,23 @@ const MAX_BULK_STREAMS: NonZero<usize> = NonZero::new(16).unwrap();
 /// get a larger queue depth.
 const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
 
+/// The overall request call timeout, including retries and pool acquisition.
+/// TODO: should we retry forever? Should the caller decide?
+const CALL_TIMEOUT: Duration = Duration::from_secs(60);
+
+/// The per-request (retry attempt) timeout, including any lazy connection establishment.
+const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
+
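+// A rough illustration of how these interact: if every attempt runs into REQUEST_TIMEOUT, a call
+// can make at most about six 10-second attempts (less backoff time) before the 60-second
+// CALL_TIMEOUT expires and the call fails with DeadlineExceeded.
+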
+/// The initial request retry backoff duration. The first retry does not back off.
+/// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support.
+const BASE_BACKOFF: Duration = Duration::from_millis(5);
+
+/// The maximum request retry backoff duration.
+const MAX_BACKOFF: Duration = Duration::from_secs(5);
+
+/// The threshold and interval for warning about slow operations.
+const SLOW_THRESHOLD: Duration = Duration::from_secs(3);
+
 /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
 /// basic `page_api::Client` gRPC client, and supports:
 ///
@@ -67,8 +87,6 @@ pub struct PageserverClient {
     compression: Option<CompressionEncoding>,
     /// The shards for this tenant.
     shards: ArcSwap<Shards>,
-    /// The retry configuration.
-    retry: Retry,
 }
 
 impl PageserverClient {
@@ -94,7 +112,6 @@ impl PageserverClient {
             auth_token,
             compression,
             shards: ArcSwap::new(Arc::new(shards)),
-            retry: Retry,
         })
     }
 
@@ -142,13 +159,15 @@ impl PageserverClient {
         &self,
         req: page_api::CheckRelExistsRequest,
     ) -> tonic::Result<page_api::CheckRelExistsResponse> {
-        self.retry
-            .with(async |_| {
-                // Relation metadata is only available on shard 0.
-                let mut client = self.shards.load_full().get_zero().client().await?;
-                client.check_rel_exists(req).await
-            })
-            .await
+        debug!("sending request: {req:?}");
+        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
+            // Relation metadata is only available on shard 0.
+            let mut client = self.shards.load_full().get_zero().client().await?;
+            Self::with_timeout(REQUEST_TIMEOUT, client.check_rel_exists(req)).await
+        })
+        .await?;
+        debug!("received response: {resp:?}");
+        Ok(resp)
     }
 
     /// Returns the total size of a database, as # of bytes.
@@ -157,13 +176,15 @@ impl PageserverClient {
         &self,
         req: page_api::GetDbSizeRequest,
     ) -> tonic::Result<page_api::GetDbSizeResponse> {
-        self.retry
-            .with(async |_| {
-                // Relation metadata is only available on shard 0.
-                let mut client = self.shards.load_full().get_zero().client().await?;
-                client.get_db_size(req).await
-            })
-            .await
+        debug!("sending request: {req:?}");
+        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
+            // Relation metadata is only available on shard 0.
+            let mut client = self.shards.load_full().get_zero().client().await?;
+            Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await
+        })
+        .await?;
+        debug!("received response: {resp:?}");
+        Ok(resp)
     }
 
     /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
@@ -193,6 +214,8 @@ impl PageserverClient {
             return Err(tonic::Status::invalid_argument("request attempt must be 0"));
         }
 
+        debug!("sending request: {req:?}");
+
         // The shards may change while we're fetching pages. We execute the request using a stable
         // view of the shards (especially important for requests that span shards), but retry the
         // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
         //
         // TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
         // once we figure out how to handle these.
-        self.retry
-            .with(async |attempt| {
-                let mut req = req.clone();
-                req.request_id.attempt = attempt as u32;
-                Self::get_page_with_shards(req, &self.shards.load_full()).await
-            })
-            .await
+        let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| {
+            let mut req = req.clone();
+            req.request_id.attempt = attempt as u32;
+            let shards = self.shards.load_full();
+            Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await
+        })
+        .await?;
+
+        debug!("received response: {resp:?}");
+        Ok(resp)
     }
 
     /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
@@ -290,13 +316,15 @@ impl PageserverClient {
         &self,
         req: page_api::GetRelSizeRequest,
     ) -> tonic::Result<page_api::GetRelSizeResponse> {
-        self.retry
-            .with(async |_| {
-                // Relation metadata is only available on shard 0.
-                let mut client = self.shards.load_full().get_zero().client().await?;
-                client.get_rel_size(req).await
-            })
-            .await
+        debug!("sending request: {req:?}");
+        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
+            // Relation metadata is only available on shard 0.
+            let mut client = self.shards.load_full().get_zero().client().await?;
+            Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await
+        })
+        .await?;
+        debug!("received response: {resp:?}");
+        Ok(resp)
     }
 
     /// Fetches an SLRU segment.
@@ -305,13 +333,45 @@ impl PageserverClient {
         &self,
         req: page_api::GetSlruSegmentRequest,
     ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
-        self.retry
-            .with(async |_| {
-                // SLRU segments are only available on shard 0.
-                let mut client = self.shards.load_full().get_zero().client().await?;
-                client.get_slru_segment(req).await
-            })
-            .await
+        debug!("sending request: {req:?}");
+        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
+            // SLRU segments are only available on shard 0.
+            let mut client = self.shards.load_full().get_zero().client().await?;
+            Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await
+        })
+        .await?;
+        debug!("received response: {resp:?}");
+        Ok(resp)
+    }
+
+    /// Runs the given async closure with retries up to the given timeout. Only certain gRPC status
+    /// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout.
+    async fn with_retries<T, F, O>(timeout: Duration, f: F) -> tonic::Result<T>
+    where
+        F: FnMut(usize) -> O, // pass attempt number, starting at 0
+        O: Future<Output = tonic::Result<T>>,
+    {
+        Retry {
+            timeout: Some(timeout),
+            base_backoff: BASE_BACKOFF,
+            max_backoff: MAX_BACKOFF,
+        }
+        .with(f)
+        .await
+    }
+
+    /// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout.
+    async fn with_timeout<T>(
+        timeout: Duration,
+        f: impl Future<Output = tonic::Result<T>>,
+    ) -> tonic::Result<T> {
+        let started = Instant::now();
+        tokio::time::timeout(timeout, f).await.map_err(|_| {
+            tonic::Status::deadline_exceeded(format!(
+                "request timed out after {:.3}s",
+                started.elapsed().as_secs_f64()
+            ))
+        })?
+    }
 }
@@ -525,19 +585,25 @@ impl Shard {
     }
 
     /// Returns a pooled client for this shard.
+    #[instrument(skip_all)]
     async fn client(&self) -> tonic::Result<ClientGuard> {
-        self.client_pool
-            .get()
-            .await
-            .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
+        warn_slow(
+            "client pool acquisition",
+            SLOW_THRESHOLD,
+            pin!(self.client_pool.get()),
+        )
+        .await
+        .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
     }
 
     /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream
     /// pool (e.g. for prefetches).
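+    ///
+    /// Warns if pool acquisition is slower than `SLOW_THRESHOLD`.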
+    #[instrument(skip_all, fields(bulk))]
     async fn stream(&self, bulk: bool) -> StreamGuard {
-        match bulk {
-            false => self.stream_pool.get().await,
-            true => self.bulk_stream_pool.get().await,
-        }
+        let pool = match bulk {
+            false => &self.stream_pool,
+            true => &self.bulk_stream_pool,
+        };
+        warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await
     }
 }
diff --git a/pageserver/client_grpc/src/retry.rs b/pageserver/client_grpc/src/retry.rs
index a1e0b8636f..8a138711e8 100644
--- a/pageserver/client_grpc/src/retry.rs
+++ b/pageserver/client_grpc/src/retry.rs
@@ -1,5 +1,6 @@
 use std::time::Duration;
 
+use futures::future::pending;
 use tokio::time::Instant;
 use tracing::{error, info, warn};
 
@@ -8,60 +9,54 @@ use utils::backoff::exponential_backoff_duration;
 /// A retry handler for Pageserver gRPC requests.
 ///
 /// This is used instead of backoff::retry for better control and observability.
-pub struct Retry;
+pub struct Retry {
+    /// Timeout across all retry attempts. If None, retries forever.
+    pub timeout: Option<Duration>,
+    /// The initial backoff duration. The first retry does not use a backoff.
+    pub base_backoff: Duration,
+    /// The maximum backoff duration.
+    pub max_backoff: Duration,
+}
 
 impl Retry {
-    /// The per-request timeout.
-    // TODO: tune these, and/or make them configurable. Should we retry forever?
-    const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
-    /// The total timeout across all attempts
-    const TOTAL_TIMEOUT: Duration = Duration::from_secs(60);
-    /// The initial backoff duration.
-    const BASE_BACKOFF: Duration = Duration::from_millis(10);
-    /// The maximum backoff duration.
-    const MAX_BACKOFF: Duration = Duration::from_secs(10);
-    /// If true, log successful requests. For debugging.
-    const LOG_SUCCESS: bool = false;
-
-    /// Runs the given async closure with timeouts and retries (exponential backoff), passing the
-    /// attempt number starting at 0. Logs errors, using the current tracing span for context.
+    /// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors,
+    /// using the current tracing span for context.
     ///
-    /// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default
-    /// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`].
+    /// Only certain gRPC status codes are retried, see [`Self::should_retry`].
     pub async fn with<T, F, O>(&self, mut f: F) -> tonic::Result<T>
     where
-        F: FnMut(usize) -> O, // takes attempt number, starting at 0
+        F: FnMut(usize) -> O, // pass attempt number, starting at 0
         O: Future<Output = tonic::Result<T>>,
     {
         let started = Instant::now();
-        let deadline = started + Self::TOTAL_TIMEOUT;
+        let deadline = self.timeout.map(|timeout| started + timeout);
         let mut last_error = None;
         let mut retries = 0;
         loop {
-            // Set up a future to wait for the backoff (if any) and run the request with a timeout.
+            // Set up a future to wait for the backoff, if any, and run the closure.
            let backoff_and_try = async {
                 // NB: sleep() always sleeps 1ms, even when given a 0 argument. See:
                 // https://github.com/tokio-rs/tokio/issues/6866
-                if let Some(backoff) = Self::backoff_duration(retries) {
+                if let Some(backoff) = self.backoff_duration(retries) {
                     tokio::time::sleep(backoff).await;
                 }
-                let request_started = Instant::now();
-                tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries))
-                    .await
-                    .map_err(|_| {
-                        tonic::Status::deadline_exceeded(format!(
-                            "request timed out after {:.3}s",
-                            request_started.elapsed().as_secs_f64()
-                        ))
-                    })?
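+                // NB: the closure is responsible for any per-attempt timeout; the gRPC client
+                // wraps each attempt in `with_timeout`.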
+                f(retries).await
             };
 
-            // Wait for the backoff and request, or bail out if the total timeout is exceeded.
+            // Set up a future for the timeout, if any.
+            let timeout = async {
+                match deadline {
+                    Some(deadline) => tokio::time::sleep_until(deadline).await,
+                    None => pending().await,
+                }
+            };
+
+            // Wait for the backoff and request, or bail out if the timeout is exceeded.
             let result = tokio::select! {
                 result = backoff_and_try => result,
-                _ = tokio::time::sleep_until(deadline) => {
+                _ = timeout => {
                     let last_error = last_error.unwrap_or_else(|| {
                         tonic::Status::deadline_exceeded(format!(
                             "request timed out after {:.3}s",
                             started.elapsed().as_secs_f64()
@@ -79,7 +74,7 @@ impl Retry {
             match result {
                 // Success, return the result.
                 Ok(result) => {
-                    if retries > 0 || Self::LOG_SUCCESS {
+                    if retries > 0 {
                         info!(
                             "request succeeded after {retries} retries in {:.3}s",
                             started.elapsed().as_secs_f64(),
@@ -112,12 +107,13 @@ impl Retry {
         }
     }
 
-    /// Returns the backoff duration for the given retry attempt, or None for no backoff.
-    fn backoff_duration(retry: usize) -> Option<Duration> {
+    /// Returns the backoff duration for the given retry attempt, or None for no backoff. The first
+    /// attempt and the first retry never back off, so this returns `None` when `retries` is 0 or 1.
+    fn backoff_duration(&self, retries: usize) -> Option<Duration> {
         let backoff = exponential_backoff_duration(
-            retry as u32,
-            Self::BASE_BACKOFF.as_secs_f64(),
-            Self::MAX_BACKOFF.as_secs_f64(),
+            (retries as u32).saturating_sub(1), // first retry does not back off
+            self.base_backoff.as_secs_f64(),
+            self.max_backoff.as_secs_f64(),
         );
         (!backoff.is_zero()).then_some(backoff)
     }
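+
+    // Illustrative sketch of how callers are expected to drive `Retry` (this mirrors
+    // `PageserverClient::with_retries`; the request closure is a stand-in):
+    //
+    //     let retry = Retry {
+    //         timeout: Some(Duration::from_secs(60)),
+    //         base_backoff: Duration::from_millis(5),
+    //         max_backoff: Duration::from_secs(5),
+    //     };
+    //     let resp = retry.with(async |attempt| send_request(attempt).await).await?;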