pageserver/client_grpc: improve retry logic (#12579)

## Problem

gRPC client retries currently include pool acquisition under the
per-attempt timeout. If pool acquisition is slow (e.g. because the pool is
full), this causes spurious timeout warnings, and the timed-out caller
loses its place in the pool queue.

Touches #11735.

## Summary of changes

Makes several improvements to retries and related logic:

* Don't include pool acquisition time under request timeouts.
* Move attempt timeouts out of `Retry` and into the closure.
* Make `Retry` configurable, move constants into main module.
* Don't back off on the first retry, and reduce the initial/max backoffs to
5ms and 5s respectively.
* Add `with_retries` and `with_timeout` helpers (see the sketch after this list).
* Add slow logging for pool acquisition, and a `warn_slow` counterpart
to `log_slow`.
* Add debug logging for requests and responses at the client boundary.
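
To make the new shape concrete, here is a minimal, self-contained sketch of the
structure described above: an overall call deadline, exponential backoff that
skips the first retry, and a per-attempt timeout that covers only the request
itself, never pool acquisition. The `with_retries`/`with_timeout` names mirror
the helpers in the diff below, but `acquire_client`, `send_request`, and the
`String` error type are invented placeholders, not the actual `page_api` client.

```rust
use std::future::Future;
use std::time::Duration;
use tokio::time::{Instant, sleep, timeout};

/// Sketch: retry an async closure under an overall deadline, with exponential
/// backoff between attempts. The closure decides what the per-attempt timeout
/// covers, so pool acquisition can stay outside it.
async fn with_retries<T, F, Fut>(call_timeout: Duration, mut f: F) -> Result<T, String>
where
    F: FnMut(usize) -> Fut, // pass attempt number, starting at 0
    Fut: Future<Output = Result<T, String>>,
{
    const BASE_BACKOFF: Duration = Duration::from_millis(5);
    const MAX_BACKOFF: Duration = Duration::from_secs(5);

    let deadline = Instant::now() + call_timeout;
    let mut retries: usize = 0;
    loop {
        // The first attempt and the first retry don't back off.
        if retries > 1 {
            let exp = (retries - 1).min(16) as u32;
            let backoff = BASE_BACKOFF.saturating_mul(1 << exp).min(MAX_BACKOFF);
            sleep(backoff).await;
        }
        match f(retries).await {
            Ok(v) => return Ok(v),
            // Simplified deadline handling; the real code also aborts a
            // still-running attempt once the deadline expires.
            Err(err) if Instant::now() >= deadline => return Err(err),
            Err(_) => retries += 1,
        }
    }
}

/// Sketch of the per-attempt timeout helper.
async fn with_timeout<T>(
    timeout_dur: Duration,
    fut: impl Future<Output = Result<T, String>>,
) -> Result<T, String> {
    timeout(timeout_dur, fut)
        .await
        .map_err(|_| "request timed out".to_string())?
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let page = with_retries(Duration::from_secs(60), |attempt| async move {
        // Pool acquisition happens outside the per-attempt timeout, so time
        // spent queueing for a client no longer counts against it.
        let _client = acquire_client().await?;
        with_timeout(Duration::from_secs(10), send_request(attempt)).await
    })
    .await?;
    println!("{page}");
    Ok(())
}

// Stand-ins for pool acquisition and the gRPC request.
async fn acquire_client() -> Result<(), String> {
    Ok(())
}
async fn send_request(attempt: usize) -> Result<String, String> {
    Ok(format!("page data (attempt {attempt})"))
}
```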
Commit d14d8271b8 (parent fecb707b19)
Author: Erik Grinaker
Date: 2025-07-14 12:43:10 +02:00 (committed by GitHub)
3 changed files with 189 additions and 99 deletions

Changed file 1 of 3:

@@ -1,4 +1,5 @@
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::time::Duration;
@@ -7,7 +8,7 @@ use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
use tokio::time::Instant;
use tracing::info;
use tracing::{info, warn};
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
@@ -377,10 +378,11 @@ impl std::fmt::Debug for SecretString {
///
/// TODO: consider upgrading this to a warning, but currently it fires too often.
#[inline]
pub async fn log_slow<F, O>(name: &str, threshold: Duration, f: std::pin::Pin<&mut F>) -> O
where
F: Future<Output = O>,
{
pub async fn log_slow<O>(
name: &str,
threshold: Duration,
f: Pin<&mut impl Future<Output = O>>,
) -> O {
monitor_slow_future(
threshold,
threshold, // period = threshold
@@ -394,16 +396,42 @@ where
if !is_slow {
return;
}
let elapsed = elapsed_total.as_secs_f64();
if ready {
info!(
"slow {name} completed after {:.3}s",
elapsed_total.as_secs_f64()
);
info!("slow {name} completed after {elapsed:.3}s");
} else {
info!(
"slow {name} still running after {:.3}s",
elapsed_total.as_secs_f64()
);
info!("slow {name} still running after {elapsed:.3}s");
}
},
)
.await
}
/// Logs a periodic warning if a future is slow to complete.
#[inline]
pub async fn warn_slow<O>(
name: &str,
threshold: Duration,
f: Pin<&mut impl Future<Output = O>>,
) -> O {
monitor_slow_future(
threshold,
threshold, // period = threshold
f,
|MonitorSlowFutureCallback {
ready,
is_slow,
elapsed_total,
elapsed_since_last_callback: _,
}| {
if !is_slow {
return;
}
let elapsed = elapsed_total.as_secs_f64();
if ready {
warn!("slow {name} completed after {elapsed:.3}s");
} else {
warn!("slow {name} still running after {elapsed:.3}s");
}
},
)
@@ -416,7 +444,7 @@ where
pub async fn monitor_slow_future<F, O>(
threshold: Duration,
period: Duration,
mut fut: std::pin::Pin<&mut F>,
mut fut: Pin<&mut F>,
mut cb: impl FnMut(MonitorSlowFutureCallback),
) -> O
where

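The first file above adds the `warn_slow` counterpart to `log_slow`. The real
helper delegates to `monitor_slow_future` and logs via `tracing::warn!`; the
following is only a rough, self-contained approximation of its observable
behavior, with `eprintln!` and a manual timeout loop standing in for the real
machinery.

```rust
use std::future::Future;
use std::pin::{Pin, pin};
use std::time::Duration;
use tokio::time::Instant;

/// Rough stand-in for `warn_slow`: warn every `threshold` while the future is
/// still running, and once more when a slow future finally completes.
async fn warn_slow_sketch<O>(
    name: &str,
    threshold: Duration,
    mut fut: Pin<&mut impl Future<Output = O>>,
) -> O {
    let started = Instant::now();
    loop {
        // Poll the future for up to `threshold`; only the reborrow is dropped
        // on timeout, so the future keeps its progress across iterations.
        match tokio::time::timeout(threshold, fut.as_mut()).await {
            Ok(output) => {
                if started.elapsed() >= threshold {
                    eprintln!(
                        "warning: slow {name} completed after {:.3}s",
                        started.elapsed().as_secs_f64()
                    );
                }
                return output;
            }
            Err(_) => eprintln!(
                "warning: slow {name} still running after {:.3}s",
                started.elapsed().as_secs_f64()
            ),
        }
    }
}

#[tokio::main]
async fn main() {
    // A 7-second "pool acquisition" warns at ~3s and ~6s, then on completion.
    let fut = pin!(tokio::time::sleep(Duration::from_secs(7)));
    warn_slow_sketch("pool acquisition", Duration::from_secs(3), fut).await;
}
```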
Changed file 2 of 3:

@@ -1,13 +1,16 @@
use std::collections::HashMap;
use std::num::NonZero;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tonic::codec::CompressionEncoding;
use tracing::instrument;
use tracing::{debug, instrument};
use utils::logging::warn_slow;
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
@@ -44,6 +47,23 @@ const MAX_BULK_STREAMS: NonZero<usize> = NonZero::new(16).unwrap();
/// get a larger queue depth.
const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
/// The overall request call timeout, including retries and pool acquisition.
/// TODO: should we retry forever? Should the caller decide?
const CALL_TIMEOUT: Duration = Duration::from_secs(60);
/// The per-request (retry attempt) timeout, including any lazy connection establishment.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// The initial request retry backoff duration. The first retry does not back off.
/// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support.
const BASE_BACKOFF: Duration = Duration::from_millis(5);
/// The maximum request retry backoff duration.
const MAX_BACKOFF: Duration = Duration::from_secs(5);
/// Threshold and interval for warning about slow operations.
const SLOW_THRESHOLD: Duration = Duration::from_secs(3);
/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
///
@@ -67,8 +87,6 @@ pub struct PageserverClient {
compression: Option<CompressionEncoding>,
/// The shards for this tenant.
shards: ArcSwap<Shards>,
/// The retry configuration.
retry: Retry,
}
impl PageserverClient {
@@ -94,7 +112,6 @@ impl PageserverClient {
auth_token,
compression,
shards: ArcSwap::new(Arc::new(shards)),
retry: Retry,
})
}
@@ -142,13 +159,15 @@ impl PageserverClient {
&self,
req: page_api::CheckRelExistsRequest,
) -> tonic::Result<page_api::CheckRelExistsResponse> {
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.check_rel_exists(req).await
})
.await
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.check_rel_exists(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Returns the total size of a database, as # of bytes.
@@ -157,13 +176,15 @@ impl PageserverClient {
&self,
req: page_api::GetDbSizeRequest,
) -> tonic::Result<page_api::GetDbSizeResponse> {
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_db_size(req).await
})
.await
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
@@ -193,6 +214,8 @@ impl PageserverClient {
return Err(tonic::Status::invalid_argument("request attempt must be 0"));
}
debug!("sending request: {req:?}");
// The shards may change while we're fetching pages. We execute the request using a stable
// view of the shards (especially important for requests that span shards), but retry the
// top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
@@ -201,13 +224,16 @@ impl PageserverClient {
//
// TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
// once we figure out how to handle these.
self.retry
.with(async |attempt| {
let mut req = req.clone();
req.request_id.attempt = attempt as u32;
Self::get_page_with_shards(req, &self.shards.load_full()).await
})
.await
let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| {
let mut req = req.clone();
req.request_id.attempt = attempt as u32;
let shards = self.shards.load_full();
Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
@@ -290,13 +316,15 @@ impl PageserverClient {
&self,
req: page_api::GetRelSizeRequest,
) -> tonic::Result<page_api::GetRelSizeResponse> {
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_rel_size(req).await
})
.await
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Fetches an SLRU segment.
@@ -305,13 +333,45 @@ impl PageserverClient {
&self,
req: page_api::GetSlruSegmentRequest,
) -> tonic::Result<page_api::GetSlruSegmentResponse> {
self.retry
.with(async |_| {
// SLRU segments are only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_slru_segment(req).await
})
.await
debug!("sending request: {req:?}");
let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
// SLRU segments are only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await
})
.await?;
debug!("received response: {resp:?}");
Ok(resp)
}
/// Runs the given async closure with retries up to the given timeout. Only certain gRPC status
/// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout.
async fn with_retries<T, F, O>(timeout: Duration, f: F) -> tonic::Result<T>
where
F: FnMut(usize) -> O, // pass attempt number, starting at 0
O: Future<Output = tonic::Result<T>>,
{
Retry {
timeout: Some(timeout),
base_backoff: BASE_BACKOFF,
max_backoff: MAX_BACKOFF,
}
.with(f)
.await
}
/// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout.
async fn with_timeout<T>(
timeout: Duration,
f: impl Future<Output = tonic::Result<T>>,
) -> tonic::Result<T> {
let started = Instant::now();
tokio::time::timeout(timeout, f).await.map_err(|_| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
started.elapsed().as_secs_f64()
))
})?
}
}
@@ -525,19 +585,25 @@ impl Shard {
}
/// Returns a pooled client for this shard.
#[instrument(skip_all)]
async fn client(&self) -> tonic::Result<ClientGuard> {
self.client_pool
.get()
.await
.map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
warn_slow(
"client pool acquisition",
SLOW_THRESHOLD,
pin!(self.client_pool.get()),
)
.await
.map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
}
/// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream
/// pool (e.g. for prefetches).
#[instrument(skip_all, fields(bulk))]
async fn stream(&self, bulk: bool) -> StreamGuard {
match bulk {
false => self.stream_pool.get().await,
true => self.bulk_stream_pool.get().await,
}
let pool = match bulk {
false => &self.stream_pool,
true => &self.bulk_stream_pool,
};
warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await
}
}

Changed file 3 of 3:

@@ -1,5 +1,6 @@
use std::time::Duration;
use futures::future::pending;
use tokio::time::Instant;
use tracing::{error, info, warn};
@@ -8,60 +9,54 @@ use utils::backoff::exponential_backoff_duration;
/// A retry handler for Pageserver gRPC requests.
///
/// This is used instead of backoff::retry for better control and observability.
pub struct Retry;
pub struct Retry {
/// Timeout across all retry attempts. If None, retries forever.
pub timeout: Option<Duration>,
/// The initial backoff duration. The first retry does not use a backoff.
pub base_backoff: Duration,
/// The maximum backoff duration.
pub max_backoff: Duration,
}
impl Retry {
/// The per-request timeout.
// TODO: tune these, and/or make them configurable. Should we retry forever?
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// The total timeout across all attempts
const TOTAL_TIMEOUT: Duration = Duration::from_secs(60);
/// The initial backoff duration.
const BASE_BACKOFF: Duration = Duration::from_millis(10);
/// The maximum backoff duration.
const MAX_BACKOFF: Duration = Duration::from_secs(10);
/// If true, log successful requests. For debugging.
const LOG_SUCCESS: bool = false;
/// Runs the given async closure with timeouts and retries (exponential backoff), passing the
/// attempt number starting at 0. Logs errors, using the current tracing span for context.
/// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors,
/// using the current tracing span for context.
///
/// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default
/// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`].
/// Only certain gRPC status codes are retried, see [`Self::should_retry`].
pub async fn with<T, F, O>(&self, mut f: F) -> tonic::Result<T>
where
F: FnMut(usize) -> O, // takes attempt number, starting at 0
F: FnMut(usize) -> O, // pass attempt number, starting at 0
O: Future<Output = tonic::Result<T>>,
{
let started = Instant::now();
let deadline = started + Self::TOTAL_TIMEOUT;
let deadline = self.timeout.map(|timeout| started + timeout);
let mut last_error = None;
let mut retries = 0;
loop {
// Set up a future to wait for the backoff (if any) and run the request with a timeout.
// Set up a future to wait for the backoff, if any, and run the closure.
let backoff_and_try = async {
// NB: sleep() always sleeps 1ms, even when given a 0 argument. See:
// https://github.com/tokio-rs/tokio/issues/6866
if let Some(backoff) = Self::backoff_duration(retries) {
if let Some(backoff) = self.backoff_duration(retries) {
tokio::time::sleep(backoff).await;
}
let request_started = Instant::now();
tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries))
.await
.map_err(|_| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
request_started.elapsed().as_secs_f64()
))
})?
f(retries).await
};
// Wait for the backoff and request, or bail out if the total timeout is exceeded.
// Set up a future for the timeout, if any.
let timeout = async {
match deadline {
Some(deadline) => tokio::time::sleep_until(deadline).await,
None => pending().await,
}
};
// Wait for the backoff and request, or bail out if the timeout is exceeded.
let result = tokio::select! {
result = backoff_and_try => result,
_ = tokio::time::sleep_until(deadline) => {
_ = timeout => {
let last_error = last_error.unwrap_or_else(|| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
@@ -79,7 +74,7 @@ impl Retry {
match result {
// Success, return the result.
Ok(result) => {
if retries > 0 || Self::LOG_SUCCESS {
if retries > 0 {
info!(
"request succeeded after {retries} retries in {:.3}s",
started.elapsed().as_secs_f64(),
@@ -112,12 +107,13 @@ impl Retry {
}
}
/// Returns the backoff duration for the given retry attempt, or None for no backoff.
fn backoff_duration(retry: usize) -> Option<Duration> {
/// Returns the backoff duration for the given retry attempt, or None for no backoff. The first
/// attempt and the first retry never back off, so this returns None for retries 0 and 1.
fn backoff_duration(&self, retries: usize) -> Option<Duration> {
let backoff = exponential_backoff_duration(
retry as u32,
Self::BASE_BACKOFF.as_secs_f64(),
Self::MAX_BACKOFF.as_secs_f64(),
(retries as u32).saturating_sub(1), // first retry does not back off
self.base_backoff.as_secs_f64(),
self.max_backoff.as_secs_f64(),
);
(!backoff.is_zero()).then_some(backoff)
}