Restructure get_page retries

Erik Grinaker
2025-07-06 18:35:47 +02:00
parent 4b06b547c1
commit 341c5f53d8
2 changed files with 57 additions and 98 deletions
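For context, the shape of the change: the hand-rolled outer loop in `get_page()` that compared shard-map pointers with `Arc::ptr_eq` is replaced by running the whole pre-split request under the retry policy, with every attempt reloading the shard map. A minimal self-contained sketch of that shape, using stand-in `Shards`, `Client`, and `retry` types rather than the real `PageserverClient` and `Retry`:

// Not part of the diff: a simplified sketch of the new retry structure.
use std::sync::Arc;
use arc_swap::ArcSwap;

struct Shards; // stand-in for the real shard map

struct Client {
    shards: ArcSwap<Shards>,
}

impl Client {
    // Stand-in for `Retry::with()`: run the closure until it succeeds or the
    // attempt budget is exhausted.
    async fn retry<T, E, F, Fut>(&self, mut attempt: F) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        let mut last_err = None;
        for _ in 0..3 {
            match attempt().await {
                Ok(v) => return Ok(v),
                Err(e) => last_err = Some(e),
            }
        }
        Err(last_err.unwrap())
    }

    async fn get_page(&self) -> Result<(), String> {
        // The whole pre-split request runs under the retry policy. Each
        // attempt takes a fresh, stable snapshot of the shard map, so a
        // concurrent shard map change is picked up on the next attempt.
        self.retry(move || async move {
            let shards: Arc<Shards> = self.shards.load_full();
            Self::get_page_with_shards(&shards).await
        })
        .await
    }

    // Dispatches per-shard requests against a stable shard snapshot; no
    // internal retries.
    async fn get_page_with_shards(_shards: &Shards) -> Result<(), String> {
        Ok(())
    }
}
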

View File

@@ -6,7 +6,7 @@ use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tracing::{instrument, warn};
use tracing::instrument;
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
@@ -112,7 +112,7 @@ impl PageserverClient {
self.retry
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load().get_zero().client().await?;
let mut client = self.shards.load_full().get_zero().client().await?;
client.check_rel_exists(req).await
})
.await
@@ -127,7 +127,7 @@ impl PageserverClient {
self.retry
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load().get_zero().client().await?;
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_db_size(req).await
})
.await
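
The repeated `load()` → `load_full()` changes swap arc_swap's short-lived guard for an owned `Arc` clone, presumably so the snapshot can be held across the `.await` on the pooled client without keeping a guard alive. A small standalone illustration of the API difference (a plain `u64` payload stands in for the shard map):

// Not part of the diff: illustrates arc_swap's `load()` vs `load_full()`.
use std::sync::Arc;
use arc_swap::ArcSwap;

fn main() {
    let shared = ArcSwap::from_pointee(42u64);

    // `load()` returns a cheap temporary guard that borrows the current
    // value; it is meant to be short-lived.
    let guard = shared.load();
    println!("via guard: {}", *guard);
    drop(guard);

    // `load_full()` clones the inner Arc, giving an owned handle that can be
    // stored or held across `.await` points without keeping a guard alive.
    let snapshot: Arc<u64> = shared.load_full();

    // Writers can swap in a new value; the owned snapshot is unaffected.
    shared.store(Arc::new(43));
    println!("snapshot: {snapshot}, current: {}", *shared.load());
}
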
@@ -155,54 +155,30 @@ impl PageserverClient {
return Err(tonic::Status::invalid_argument("no block number"));
}
// The shard map may change while we're fetching pages. We execute the request with a stable
// view of the current shards, but if it fails and the shard map was changed concurrently,
// we retry with the new shard map. We have to do this in an outer retry loop because the
// shard map change may require us to resplit the request along different shard boundaries.
// The shard map may change while we're fetching pages. We execute the request using a
// stable view of the shards (especially important for requests that span shards), but retry
// the top-level (pre-split) request to pick up shard map changes. This can lead to
// unnecessary retries and re-splits in some cases where requests span shards, but these are
// expected to be rare.
//
// TODO: do we need similar retry logic for other requests? Consider moving this into Retry
// somehow.
//
// TODO: we clone the request a bunch of places because of retries. We should pass a
// reference instead and clone at the leaves, but it requires some lifetime juggling.
loop {
let shards = self.shards.load_full();
match Self::get_page_with_shards(req.clone(), self.shards.load_full(), self.retry).await
{
Ok(resp) => return Ok(resp),
Err(status) => {
// If the shard map didn't change, just return the error.
if Arc::ptr_eq(&shards, &self.shards.load()) {
return Err(status);
}
// Otherwise, retry the request with the new shard map.
//
// TODO: we retry all errors here. Moved shards will typically return NotFound
// which is not normally retried. Consider only retrying NotFound here. This
// also needs to be coordinated with the server-side shard split logic.
warn!(
"shard map changed, retrying GetPage error {}: {}",
status.code(),
status.message()
);
}
}
}
// TODO: the gRPC server and client don't yet properly support shard splits. Revisit this
// once we figure out how to handle these.
self.retry
.with(async || Self::get_page_with_shards(req.clone(), &self.shards.load_full()).await)
.await
}
/// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
/// any concurrent shard map updates.
/// concurrent shard map updates. Does not retry internally, but is retried by `get_page()`.
async fn get_page_with_shards(
req: page_api::GetPageRequest,
shards: Arc<Shards>,
retry: Retry,
shards: &Shards,
) -> tonic::Result<page_api::GetPageResponse> {
// Fast path: request is for a single shard.
if let Some(shard_id) =
GetPageSplitter::is_single_shard(&req, shards.count, shards.stripe_size)
{
return Self::get_page_with_shard(req, shards.get(shard_id)?, retry).await;
return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
}
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
@@ -211,50 +187,40 @@ impl PageserverClient {
let mut shard_requests = FuturesUnordered::new();
for (shard_id, shard_req) in splitter.drain_requests() {
// NB: each request will retry internally.
let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?, retry)
let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
.map(move |result| result.map(|resp| (shard_id, resp)));
shard_requests.push(future);
}
while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
splitter.add_response(shard_id, shard_response)?;
splitter.add_response(shard_id, shard_response);
}
splitter.assemble_response()
}
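
The multi-shard path above splits the request, pushes one future per shard onto a `FuturesUnordered`, tags each result with its shard ID, and drains the results in completion order, bailing out on the first error. A standalone sketch of that fan-out pattern (hypothetical `fetch_from_shard` and `u32` shard IDs in place of the real request types):

// Not part of the diff: a generic sketch of the fan-out used for
// multi-shard GetPage requests.
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};

// Placeholder for the per-shard request; the real code sends the split
// request on a pooled stream.
async fn fetch_from_shard(shard_id: u32) -> Result<String, String> {
    Ok(format!("pages from shard {shard_id}"))
}

async fn fan_out(shard_ids: Vec<u32>) -> Result<Vec<(u32, String)>, String> {
    // Dispatch one future per shard; FuturesUnordered polls them
    // concurrently and yields results in completion order.
    let mut requests = FuturesUnordered::new();
    for shard_id in shard_ids {
        // Tag each response with its shard ID so it can be slotted back
        // into the right place when the response is assembled.
        let future = fetch_from_shard(shard_id)
            .map(move |result| result.map(|resp| (shard_id, resp)));
        requests.push(future);
    }

    let mut responses = Vec::new();
    // `next()` yields Option<Result<..>>; `transpose()?` turns the first
    // shard error into an early return, like the `?` in the diff above.
    while let Some((shard_id, resp)) = requests.next().await.transpose()? {
        responses.push((shard_id, resp));
    }
    Ok(responses)
}
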
/// Fetches pages on the given shard.
#[instrument(skip_all, fields(shard = %shard.id))]
/// Fetches pages on the given shard. Does not retry internally.
async fn get_page_with_shard(
req: page_api::GetPageRequest,
shard: &Shard,
retry: Retry,
) -> tonic::Result<page_api::GetPageResponse> {
let resp = retry
.with(async || {
let stream = shard.stream(req.request_class.is_bulk()).await;
let resp = stream.send(req.clone()).await?;
let expected = req.block_numbers.len();
let stream = shard.stream(req.request_class.is_bulk()).await;
let resp = stream.send(req).await?;
// Convert per-request errors into a tonic::Status.
if resp.status_code != page_api::GetPageStatusCode::Ok {
return Err(tonic::Status::new(
resp.status_code.into(),
resp.reason.unwrap_or_else(|| String::from("unknown error")),
));
}
// Convert per-request errors into a tonic::Status.
if resp.status_code != page_api::GetPageStatusCode::Ok {
return Err(tonic::Status::new(
resp.status_code.into(),
resp.reason.unwrap_or_else(|| String::from("unknown error")),
));
}
Ok(resp)
})
.await?;
// Make sure we got the right number of pages.
// NB: check outside of the retry loop, since we don't want to retry this.
let (expected, actual) = (req.block_numbers.len(), resp.page_images.len());
// Check that we received the expected number of pages.
let actual = resp.page_images.len();
if expected != actual {
return Err(tonic::Status::internal(format!(
"expected {expected} pages for shard {}, got {actual}",
shard.id,
return Err(tonic::Status::data_loss(format!(
"expected {expected} pages, got {actual}",
)));
}
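
After the restructure, `get_page_with_shard()` no longer retries internally: it converts a non-OK status carried in the response body into a `tonic::Status`, and reports a short page count as `data_loss`, a code the retry layer is not expected to retry (per the NB comments in the splitter below). A standalone sketch of that validation step, using a hypothetical `PageResponse` type; only `tonic::Status` is real:

// Not part of the diff: a simplified version of the per-shard response
// validation. `PageResponse` is a stand-in for page_api::GetPageResponse.
struct PageResponse {
    status_code: tonic::Code,
    reason: Option<String>,
    page_images: Vec<Vec<u8>>,
}

fn validate(expected: usize, resp: PageResponse) -> tonic::Result<PageResponse> {
    // Convert a per-request error carried in the response body into a
    // tonic::Status so it flows through the same error path as transport
    // errors.
    if resp.status_code != tonic::Code::Ok {
        return Err(tonic::Status::new(
            resp.status_code,
            resp.reason.unwrap_or_else(|| String::from("unknown error")),
        ));
    }

    // A short page count is a data-integrity problem, not a transient
    // failure, so use `data_loss` rather than `internal`.
    let actual = resp.page_images.len();
    if expected != actual {
        return Err(tonic::Status::data_loss(format!(
            "expected {expected} pages, got {actual}"
        )));
    }
    Ok(resp)
}
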
@@ -270,7 +236,7 @@ impl PageserverClient {
self.retry
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load().get_zero().client().await?;
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_rel_size(req).await
})
.await
@@ -285,7 +251,7 @@ impl PageserverClient {
self.retry
.with(async || {
// SLRU segments are only available on shard 0.
let mut client = self.shards.load().get_zero().client().await?;
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_slru_segment(req).await
})
.await
@@ -419,8 +385,6 @@ impl Shards {
/// * Bulk client pool: unbounded.
/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
struct Shard {
/// The shard ID.
id: ShardIndex,
/// Unary gRPC client pool.
client_pool: Arc<ClientPool>,
/// GetPage stream pool.
@@ -482,7 +446,6 @@ impl Shard {
);
Ok(Self {
id: shard_id,
client_pool,
stream_pool,
bulk_stream_pool,

View File

@@ -97,40 +97,36 @@ impl GetPageSplitter {
self.requests.drain()
}
/// Adds a response from the given shard.
#[allow(clippy::result_large_err)]
pub fn add_response(
&mut self,
shard_id: ShardIndex,
response: page_api::GetPageResponse,
) -> tonic::Result<()> {
// The caller should already have converted status codes into tonic::Status.
assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);
/// Adds a response from the given shard. The response must match the request ID and have an OK
/// status code. A response must not already exist for the given shard ID.
pub fn add_response(&mut self, shard_id: ShardIndex, response: page_api::GetPageResponse) {
// NB: this is called below a `Retry::with()`, so unrecoverable errors should not use a
// retryable status code (e.g. `Internal`).
// Make sure the response matches the request ID.
if response.request_id != self.request_id {
return Err(tonic::Status::internal(format!(
"response ID {} does not match request ID {}",
response.request_id, self.request_id
)));
}
// The caller should already have converted status codes into tonic::Status.
assert_eq!(
response.status_code,
page_api::GetPageStatusCode::Ok,
"non-OK response"
);
// The stream pool ensures the response matches the request ID.
assert_eq!(response.request_id, self.request_id, "response ID mismatch");
// Add the response data to the map.
let old = self.responses.insert(shard_id, response.page_images);
if old.is_some() {
return Err(tonic::Status::internal(format!(
"duplicate response for shard {shard_id}",
)));
}
Ok(())
// We only dispatch one request per shard.
assert!(old.is_none(), "duplicate response for shard {shard_id}");
}
/// Assembles the shard responses into a single response. Responses must be present for all
/// relevant shards, and the total number of pages must match the original request.
#[allow(clippy::result_large_err)]
pub fn assemble_response(self) -> tonic::Result<page_api::GetPageResponse> {
// NB: this is called below a `Retry::with()`, so unrecoverable errors should not use a
// retryable status code (e.g. `Internal`).
let mut response = page_api::GetPageResponse {
request_id: self.request_id,
status_code: page_api::GetPageStatusCode::Ok,
@@ -149,11 +145,11 @@ impl GetPageSplitter {
let page = shard_responses
.get_mut(shard_id)
.ok_or_else(|| {
tonic::Status::internal(format!("missing response for shard {shard_id}"))
tonic::Status::data_loss(format!("missing response for shard {shard_id}"))
})?
.next()
.ok_or_else(|| {
tonic::Status::internal(format!("missing page from shard {shard_id}"))
tonic::Status::data_loss(format!("missing page from shard {shard_id}"))
})?;
response.page_images.push(page);
}
@@ -161,7 +157,7 @@ impl GetPageSplitter {
// Make sure there are no additional pages.
for (shard_id, mut pages) in shard_responses {
if pages.next().is_some() {
return Err(tonic::Status::internal(format!(
return Err(tonic::Status::out_of_range(format!(
"extra pages returned from shard {shard_id}"
)));
}
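
Overall, the splitter now asserts on conditions that would indicate caller bugs (non-OK status, mismatched request ID, duplicate shard response) and keeps gRPC errors only for assembly failures, using codes that are not retried (`data_loss`, `out_of_range`). A standalone sketch of the assembly step those errors guard, simplified to `u32` shard IDs and raw page byte vectors:

// Not part of the diff: a simplified version of the response assembly.
use std::collections::HashMap;

fn assemble(
    // Shard that owns each block, in the original request order.
    block_owners: &[u32],
    // Per-shard responses, each an ordered list of page images.
    shard_responses: HashMap<u32, Vec<Vec<u8>>>,
) -> tonic::Result<Vec<Vec<u8>>> {
    // Turn each shard's pages into an iterator so blocks can be pulled back
    // out in the order the original request listed them.
    let mut shard_responses: HashMap<u32, _> = shard_responses
        .into_iter()
        .map(|(shard_id, pages)| (shard_id, pages.into_iter()))
        .collect();

    let mut assembled = Vec::with_capacity(block_owners.len());
    for shard_id in block_owners {
        let page = shard_responses
            .get_mut(shard_id)
            .ok_or_else(|| {
                tonic::Status::data_loss(format!("missing response for shard {shard_id}"))
            })?
            .next()
            .ok_or_else(|| {
                tonic::Status::data_loss(format!("missing page from shard {shard_id}"))
            })?;
        assembled.push(page);
    }

    // Any leftover pages mean a shard returned more than was asked of it.
    for (shard_id, mut pages) in shard_responses {
        if pages.next().is_some() {
            return Err(tonic::Status::out_of_range(format!(
                "extra pages returned from shard {shard_id}"
            )));
        }
    }
    Ok(assembled)
}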