From f6cc5cbd0cb296f835762b49d064741b91eb6b9b Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Fri, 4 Jul 2025 20:20:09 +0200
Subject: [PATCH] Split out retry handler to separate module

---
 pageserver/client_grpc/src/client.rs | 187 ++++++---------------------
 pageserver/client_grpc/src/lib.rs    |   1 +
 pageserver/client_grpc/src/retry.rs  | 146 +++++++++++++++++++++
 3 files changed, 189 insertions(+), 145 deletions(-)
 create mode 100644 pageserver/client_grpc/src/retry.rs

diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs
index d026751a77..c21ce2e47d 100644
--- a/pageserver/client_grpc/src/client.rs
+++ b/pageserver/client_grpc/src/client.rs
@@ -3,16 +3,15 @@
 use std::sync::Arc;
 use anyhow::anyhow;
 use futures::stream::FuturesUnordered;
-use futures::{FutureExt as _, StreamExt};
-use tokio::time::Instant;
-use tracing::{error, info, instrument, warn};
+use futures::{FutureExt as _, StreamExt as _};
+use tracing::instrument;

 use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
+use crate::retry::Retry;
 use crate::split::GetPageSplitter;
 use compute_api::spec::PageserverProtocol;
 use pageserver_api::shard::ShardStripeSize;
 use pageserver_page_api as page_api;
-use utils::backoff::exponential_backoff_duration;
 use utils::id::{TenantId, TimelineId};
 use utils::shard::{ShardCount, ShardIndex, ShardNumber};

@@ -31,6 +30,7 @@
 pub struct PageserverClient {
     // TODO: support swapping out the shard map, e.g. via an ArcSwap.
     shards: Shards,
+    retry: Retry,
 }

 impl PageserverClient {
@@ -44,7 +44,10 @@ impl PageserverClient {
         auth_token: Option<String>,
     ) -> anyhow::Result<Self> {
         let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?;
-        Ok(Self { shards })
+        Ok(Self {
+            shards,
+            retry: Retry,
+        })
     }

     /// Returns whether a relation exists.
@@ -53,12 +56,13 @@ impl PageserverClient {
         &self,
         req: page_api::CheckRelExistsRequest,
     ) -> tonic::Result<page_api::CheckRelExistsResponse> {
-        self.with_retries(async || {
-            // Relation metadata is only available on shard 0.
-            let mut client = self.shards.get_zero().client().await?;
-            client.check_rel_exists(req).await
-        })
-        .await
+        self.retry
+            .with(async || {
+                // Relation metadata is only available on shard 0.
+                let mut client = self.shards.get_zero().client().await?;
+                client.check_rel_exists(req).await
+            })
+            .await
     }

     /// Returns the total size of a database, as # of bytes.
@@ -67,19 +71,20 @@ impl PageserverClient {
         &self,
         req: page_api::GetDbSizeRequest,
     ) -> tonic::Result<page_api::GetDbSizeResponse> {
-        self.with_retries(async || {
-            // Relation metadata is only available on shard 0.
-            let mut client = self.shards.get_zero().client().await?;
-            client.get_db_size(req).await
-        })
-        .await
+        self.retry
+            .with(async || {
+                // Relation metadata is only available on shard 0.
+                let mut client = self.shards.get_zero().client().await?;
+                client.get_db_size(req).await
+            })
+            .await
     }

-    /// Fetches pages. The `request_id` must be unique across all in-flight requests. Will
-    /// automatically split requests that span multiple shards, and reassemble the responses.
+    /// Fetches pages. The `request_id` must be unique across all in-flight requests. Automatically
+    /// splits requests that straddle shard boundaries, and assembles the responses.
     ///
-    /// Unlike the `page_api::Client`, this client automatically converts `status_code` into
-    /// `tonic::Status` errors. All responses will have `GetPageStatusCode::Ok`.
+    /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
+    /// errors. All responses will have `GetPageStatusCode::Ok`.
     #[instrument(skip_all, fields(
         req_id = %req.request_id,
         rel = %req.rel,
@@ -134,7 +139,8 @@ impl PageserverClient {
         req: page_api::GetPageRequest,
     ) -> tonic::Result<page_api::GetPageResponse> {
         let resp = self
-            .with_retries(async || {
+            .retry
+            .with(async || {
                 let stream = self.shards.get(shard_id)?.stream().await;
                 let resp = stream.send(req.clone()).await?;

@@ -168,12 +174,13 @@ impl PageserverClient {
         &self,
         req: page_api::GetRelSizeRequest,
     ) -> tonic::Result<page_api::GetRelSizeResponse> {
-        self.with_retries(async || {
-            // Relation metadata is only available on shard 0.
-            let mut client = self.shards.get_zero().client().await?;
-            client.get_rel_size(req).await
-        })
-        .await
+        self.retry
+            .with(async || {
+                // Relation metadata is only available on shard 0.
+                let mut client = self.shards.get_zero().client().await?;
+                client.get_rel_size(req).await
+            })
+            .await
     }

     /// Fetches an SLRU segment.
@@ -182,123 +189,13 @@ impl PageserverClient {
         &self,
         req: page_api::GetSlruSegmentRequest,
     ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
-        self.with_retries(async || {
-            // SLRU segments are only available on shard 0.
-            let mut client = self.shards.get_zero().client().await?;
-            client.get_slru_segment(req).await
-        })
-        .await
-    }
-
-    /// Runs the given closure with retries (exponential backoff). Logs errors.
-    async fn with_retries<T, F, O>(&self, mut f: F) -> tonic::Result<T>
-    where
-        F: FnMut() -> O,
-        O: Future<Output = tonic::Result<T>>,
-    {
-        // TODO: tune these, and/or make them configurable. Should we retry forever?
-        const REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
-        const TOTAL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
-        const BASE_BACKOFF: f64 = 0.1;
-        const MAX_BACKOFF: f64 = 10.0;
-        const LOG_SUCCESS: bool = false; // TODO: for debugging
-
-        fn should_retry(code: tonic::Code) -> bool {
-            match code {
-                tonic::Code::Ok => panic!("unexpected Ok status code"),
-                // These codes are transient, so retry them.
-                tonic::Code::Aborted => true,
-                tonic::Code::Cancelled => true,
-                tonic::Code::DeadlineExceeded => true, // maybe transient slowness
-                tonic::Code::Internal => true, // maybe transient failure?
-                tonic::Code::ResourceExhausted => true,
-                tonic::Code::Unavailable => true,
-                // The following codes will like continue to fail, so don't retry.
-                tonic::Code::AlreadyExists => false,
-                tonic::Code::DataLoss => false,
-                tonic::Code::FailedPrecondition => false,
-                tonic::Code::InvalidArgument => false,
-                tonic::Code::NotFound => false,
-                tonic::Code::OutOfRange => false,
-                tonic::Code::PermissionDenied => false,
-                tonic::Code::Unauthenticated => false,
-                tonic::Code::Unimplemented => false,
-                tonic::Code::Unknown => false,
-            }
-        }
-
-        let started = Instant::now();
-        let deadline = started + TOTAL_TIMEOUT;
-        let mut last_error = None;
-        let mut retries = 0;
-        loop {
-            // Set up a future to wait for the backoff (if any) and run the request with a timeout.
-            let backoff = exponential_backoff_duration(retries, BASE_BACKOFF, MAX_BACKOFF);
-            let backoff_and_try = async {
-                tokio::time::sleep(backoff).await;
-                let request_started = Instant::now();
-                tokio::time::timeout(REQUEST_TIMEOUT, f())
-                    .await
-                    .map_err(|_| {
-                        tonic::Status::deadline_exceeded(format!(
-                            "request timed out after {:.3}s",
-                            request_started.elapsed().as_secs_f64()
-                        ))
-                    })?
-            };
-
-            // Wait for the backoff and request, or bail out if the total timeout is exceeded.
-            let result = tokio::select! {
-                result = backoff_and_try => result,
-
-                _ = tokio::time::sleep_until(deadline) => {
-                    let last_error = last_error.unwrap_or_else(|| {
-                        tonic::Status::deadline_exceeded(format!(
-                            "request timed out after {:.3}s",
-                            started.elapsed().as_secs_f64()
-                        ))
-                    });
-                    error!(
-                        "giving up after {:.3}s and {retries} retries, last error {:?}: {}",
-                        started.elapsed().as_secs_f64(), last_error.code(), last_error.message(),
-                    );
-                    return Err(last_error);
-                }
-            };
-
-            match result {
-                Ok(result) => {
-                    if retries > 0 || LOG_SUCCESS {
-                        info!(
-                            "request succeeded after {retries} retries in {:.3}s",
-                            started.elapsed().as_secs_f64(),
-                        );
-                    }
-
-                    return Ok(result);
-                }
-
-                Err(status) => {
-                    let (code, message) = (status.code(), status.message());
-                    let should_retry = should_retry(code);
-                    let attempt = retries + 1;
-
-                    if !should_retry {
-                        // NB: include the attempt here too. This isn't necessarily the first
-                        // attempt, because the error may change between attempts.
-                        error!(
-                            "request failed with {code:?}: {message}, not retrying (attempt {attempt})"
-                        );
-                        return Err(status);
-                    }
-
-                    warn!("request failed with {code:?}: {message}, retrying (attempt {attempt})");
-
-                    retries += 1;
-                    last_error = Some(status);
-                }
-            }
-        }
+        self.retry
+            .with(async || {
+                // SLRU segments are only available on shard 0.
+                let mut client = self.shards.get_zero().client().await?;
+                client.get_slru_segment(req).await
+            })
+            .await
     }
 }

diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs
index 840e6eeb5a..3fc7178be2 100644
--- a/pageserver/client_grpc/src/lib.rs
+++ b/pageserver/client_grpc/src/lib.rs
@@ -1,5 +1,6 @@
 mod client;
 mod pool;
+mod retry;
 mod split;

 pub use client::PageserverClient;
diff --git a/pageserver/client_grpc/src/retry.rs b/pageserver/client_grpc/src/retry.rs
new file mode 100644
index 0000000000..c72522cbc0
--- /dev/null
+++ b/pageserver/client_grpc/src/retry.rs
@@ -0,0 +1,146 @@
+use std::time::Duration;
+
+use tokio::time::Instant;
+use tracing::{error, info, warn};
+
+use utils::backoff::exponential_backoff_duration;
+
+/// A retry handler for Pageserver gRPC requests.
+///
+/// This is used instead of backoff::retry for better control and observability.
+pub struct Retry;
+
+impl Retry {
+    /// The per-request timeout.
+    // TODO: tune these, and/or make them configurable. Should we retry forever?
+    const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
+    /// The total timeout across all attempts.
+    const TOTAL_TIMEOUT: Duration = Duration::from_secs(60);
+    /// The initial backoff duration.
+    const BASE_BACKOFF: Duration = Duration::from_millis(10);
+    /// The maximum backoff duration.
+    const MAX_BACKOFF: Duration = Duration::from_secs(10);
+    /// If true, log successful requests. For debugging.
+    const LOG_SUCCESS: bool = false;
+
+    /// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors,
+    /// using the current tracing span for context.
+    ///
+    /// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default
+    /// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`].
+    pub async fn with<T, F, O>(&self, mut f: F) -> tonic::Result<T>
+    where
+        F: FnMut() -> O,
+        O: Future<Output = tonic::Result<T>>,
+    {
+        let started = Instant::now();
+        let deadline = started + Self::TOTAL_TIMEOUT;
+        let mut last_error = None;
+        let mut retries = 0;
+        loop {
+            // Set up a future to wait for the backoff (if any) and run the request with a timeout.
+            let backoff_and_try = async {
+                tokio::time::sleep(Self::backoff_duration(retries)).await;
+
+                let request_started = Instant::now();
+                tokio::time::timeout(Self::REQUEST_TIMEOUT, f())
+                    .await
+                    .map_err(|_| {
+                        tonic::Status::deadline_exceeded(format!(
+                            "request timed out after {:.3}s",
+                            request_started.elapsed().as_secs_f64()
+                        ))
+                    })?
+            };
+
+            // Wait for the backoff and request, or bail out if the total timeout is exceeded.
+            let result = tokio::select! {
+                result = backoff_and_try => result,
+
+                _ = tokio::time::sleep_until(deadline) => {
+                    let last_error = last_error.unwrap_or_else(|| {
+                        tonic::Status::deadline_exceeded(format!(
+                            "request timed out after {:.3}s",
+                            started.elapsed().as_secs_f64()
+                        ))
+                    });
+                    error!(
+                        "giving up after {:.3}s and {retries} retries, last error {:?}: {}",
+                        started.elapsed().as_secs_f64(), last_error.code(), last_error.message(),
+                    );
+                    return Err(last_error);
+                }
+            };
+
+            match result {
+                // Success, return the result.
+                Ok(result) => {
+                    if retries > 0 || Self::LOG_SUCCESS {
+                        info!(
+                            "request succeeded after {retries} retries in {:.3}s",
+                            started.elapsed().as_secs_f64(),
+                        );
+                    }
+
+                    return Ok(result);
+                }
+
+                // Error, retry or bail out.
+                Err(status) => {
+                    let (code, message) = (status.code(), status.message());
+                    let attempt = retries + 1;
+
+                    if !Self::should_retry(code) {
+                        // NB: include the attempt here too. This isn't necessarily the first
+                        // attempt, because the error may change between attempts.
+                        error!(
+                            "request failed with {code:?}: {message}, not retrying (attempt {attempt})"
+                        );
+                        return Err(status);
+                    }
+
+                    warn!("request failed with {code:?}: {message}, retrying (attempt {attempt})");
+
+                    retries += 1;
+                    last_error = Some(status);
+                }
+            }
+        }
+    }
+
+    /// Returns the backoff duration for the given retry attempt.
+    fn backoff_duration(retry: usize) -> Duration {
+        exponential_backoff_duration(
+            retry as u32,
+            Self::BASE_BACKOFF.as_secs_f64(),
+            Self::MAX_BACKOFF.as_secs_f64(),
+        )
+    }
+
+    /// Returns true if the given status code should be retried.
+    fn should_retry(code: tonic::Code) -> bool {
+        match code {
+            tonic::Code::Ok => panic!("unexpected Ok status code"),
+
+            // These codes are transient, so retry them.
+            tonic::Code::Aborted => true,
+            tonic::Code::Cancelled => true,
+            tonic::Code::DeadlineExceeded => true, // maybe transient slowness
+            tonic::Code::Internal => true, // maybe transient failure?
+            tonic::Code::ResourceExhausted => true,
+            tonic::Code::Unavailable => true,
+
+            // The following codes will likely continue to fail, so don't retry.
+            tonic::Code::AlreadyExists => false,
+            tonic::Code::DataLoss => false,
+            tonic::Code::FailedPrecondition => false,
+            tonic::Code::InvalidArgument => false,
+            tonic::Code::NotFound => false,
+            tonic::Code::OutOfRange => false,
+            tonic::Code::PermissionDenied => false,
+            tonic::Code::Unauthenticated => false,
+            tonic::Code::Unimplemented => false,
+            tonic::Code::Unknown => false,
+        }
+    }
+}
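
For reference, a minimal call-site sketch (not part of the patch) of how the new Retry handler is driven. The flaky_request helper, the attempt counter, and the example function are hypothetical; only the Retry::with signature and the retry behavior for Unavailable come from the code above, and a plain closure returning a future stands in for the async || closures used in the patch.

use std::sync::atomic::{AtomicU32, Ordering};

use crate::retry::Retry;

/// Hypothetical transient operation: fails twice with `Unavailable`, then succeeds.
async fn flaky_request(attempts: &AtomicU32) -> tonic::Result<u32> {
    let n = attempts.fetch_add(1, Ordering::SeqCst);
    if n < 2 {
        return Err(tonic::Status::unavailable("pageserver restarting"));
    }
    Ok(n)
}

/// Drives the request through the retry handler, mirroring the client methods above.
async fn example() -> tonic::Result<u32> {
    let attempts = AtomicU32::new(0);
    // `Unavailable` is retryable per `Retry::should_retry`, so this resolves to Ok(2)
    // after two transient failures, provided the 60s total timeout is not exceeded.
    Retry.with(|| flaky_request(&attempts)).await
}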