diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs
index 567c47a9bd..46900b3a9c 100644
--- a/pageserver/client_grpc/src/client.rs
+++ b/pageserver/client_grpc/src/client.rs
@@ -6,7 +6,7 @@ use anyhow::anyhow;
 use arc_swap::ArcSwap;
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt as _, StreamExt as _};
-use tracing::{instrument, warn};
+use tracing::instrument;
 
 use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
 use crate::retry::Retry;
@@ -112,7 +112,7 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.load().get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.check_rel_exists(req).await
             })
             .await
@@ -127,7 +127,7 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.load().get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_db_size(req).await
             })
             .await
@@ -155,54 +155,30 @@ impl PageserverClient {
             return Err(tonic::Status::invalid_argument("no block number"));
         }
 
-        // The shard map may change while we're fetching pages. We execute the request with a stable
-        // view of the current shards, but if it fails and the shard map was changed concurrently,
-        // we retry with the new shard map. We have to do this in an outer retry loop because the
-        // shard map change may require us to resplit the request along different shard boundaries.
+        // The shard map may change while we're fetching pages. We execute the request using a
+        // stable view of the shards (especially important for requests that span shards), but retry
+        // the top-level (pre-split) request to pick up shard map changes. This can lead to
+        // unnecessary retries and re-splits in some cases where requests span shards, but these are
+        // expected to be rare.
         //
-        // TODO: do we need similary retry logic for other requests? Consider moving this into Retry
-        // somehow.
-        //
-        // TODO: we clone the request a bunch of places because of retries. We should pass a
-        // reference instead and clone at the leaves, but it requires some lifetime juggling.
-        loop {
-            let shards = self.shards.load_full();
-            match Self::get_page_with_shards(req.clone(), self.shards.load_full(), self.retry).await
-            {
-                Ok(resp) => return Ok(resp),
-                Err(status) => {
-                    // If the shard map didn't change, just return the error.
-                    if Arc::ptr_eq(&shards, &self.shards.load()) {
-                        return Err(status);
-                    }
-
-                    // Otherwise, retry the request with the new shard map.
-                    //
-                    // TODO: we retry all errors here. Moved shards will typically return NotFound
-                    // which is not normally retried. Consider only retrying NotFound here. This
-                    // also needs to be coordinated with the server-side shard split logic.
-                    warn!(
-                        "shard map changed, retrying GetPage error {}: {}",
-                        status.code(),
-                        status.message()
-                    );
-                }
-            }
-        }
+        // TODO: the gRPC server and client don't yet properly support shard splits. Revisit this
+        // once we figure out how to handle these.
+        self.retry
+            .with(async || Self::get_page_with_shards(req.clone(), &self.shards.load_full()).await)
+            .await
     }
 
     /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
-    /// any concurrent shard map updates.
+    /// concurrent shard map updates. Does not retry internally, but is retried by `get_page()`.
     async fn get_page_with_shards(
         req: page_api::GetPageRequest,
-        shards: Arc<Shards>,
-        retry: Retry,
+        shards: &Shards,
     ) -> tonic::Result<page_api::GetPageResponse> {
         // Fast path: request is for a single shard.
         if let Some(shard_id) =
             GetPageSplitter::is_single_shard(&req, shards.count, shards.stripe_size)
         {
-            return Self::get_page_with_shard(req, shards.get(shard_id)?, retry).await;
+            return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
         }
 
         // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
@@ -211,50 +187,40 @@ impl PageserverClient {
 
         let mut shard_requests = FuturesUnordered::new();
         for (shard_id, shard_req) in splitter.drain_requests() {
-            // NB: each request will retry internally.
-            let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?, retry)
+            let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
                 .map(move |result| result.map(|resp| (shard_id, resp)));
             shard_requests.push(future);
         }
 
         while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
-            splitter.add_response(shard_id, shard_response)?;
+            splitter.add_response(shard_id, shard_response);
         }
 
         splitter.assemble_response()
     }
 
-    /// Fetches pages on the given shard.
-    #[instrument(skip_all, fields(shard = %shard.id))]
+    /// Fetches pages on the given shard. Does not retry internally.
     async fn get_page_with_shard(
         req: page_api::GetPageRequest,
         shard: &Shard,
-        retry: Retry,
     ) -> tonic::Result<page_api::GetPageResponse> {
-        let resp = retry
-            .with(async || {
-                let stream = shard.stream(req.request_class.is_bulk()).await;
-                let resp = stream.send(req.clone()).await?;
+        let expected = req.block_numbers.len();
+        let stream = shard.stream(req.request_class.is_bulk()).await;
+        let resp = stream.send(req).await?;
 
-                // Convert per-request errors into a tonic::Status.
-                if resp.status_code != page_api::GetPageStatusCode::Ok {
-                    return Err(tonic::Status::new(
-                        resp.status_code.into(),
-                        resp.reason.unwrap_or_else(|| String::from("unknown error")),
-                    ));
-                }
+        // Convert per-request errors into a tonic::Status.
+        if resp.status_code != page_api::GetPageStatusCode::Ok {
+            return Err(tonic::Status::new(
+                resp.status_code.into(),
+                resp.reason.unwrap_or_else(|| String::from("unknown error")),
+            ));
+        }
 
-                Ok(resp)
-            })
-            .await?;
-
-        // Make sure we got the right number of pages.
-        // NB: check outside of the retry loop, since we don't want to retry this.
-        let (expected, actual) = (req.block_numbers.len(), resp.page_images.len());
+        // Check that we received the expected number of pages.
+        let actual = resp.page_images.len();
         if expected != actual {
-            return Err(tonic::Status::internal(format!(
-                "expected {expected} pages for shard {}, got {actual}",
-                shard.id,
+            return Err(tonic::Status::data_loss(format!(
+                "expected {expected} pages, got {actual}",
             )));
         }
 
@@ -270,7 +236,7 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.load().get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_rel_size(req).await
             })
             .await
@@ -285,7 +251,7 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // SLRU segments are only available on shard 0.
-                let mut client = self.shards.load().get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_slru_segment(req).await
             })
             .await
@@ -419,8 +385,6 @@ impl Shards {
 /// * Bulk client pool: unbounded.
 /// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
 struct Shard {
-    /// The shard ID.
-    id: ShardIndex,
     /// Unary gRPC client pool.
     client_pool: Arc<ClientPool>,
     /// GetPage stream pool.
@@ -482,7 +446,6 @@ impl Shard {
         );
 
         Ok(Self {
-            id: shard_id,
             client_pool,
             stream_pool,
             bulk_stream_pool,
diff --git a/pageserver/client_grpc/src/split.rs b/pageserver/client_grpc/src/split.rs
index 5bbcaab393..894aaa992c 100644
--- a/pageserver/client_grpc/src/split.rs
+++ b/pageserver/client_grpc/src/split.rs
@@ -97,40 +97,36 @@ impl GetPageSplitter {
         self.requests.drain()
     }
 
-    /// Adds a response from the given shard.
-    #[allow(clippy::result_large_err)]
-    pub fn add_response(
-        &mut self,
-        shard_id: ShardIndex,
-        response: page_api::GetPageResponse,
-    ) -> tonic::Result<()> {
-        // The caller should already have converted status codes into tonic::Status.
-        assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);
+    /// Adds a response from the given shard. The response must match the request ID and have an OK
+    /// status code. A response must not already exist for the given shard ID.
+    pub fn add_response(&mut self, shard_id: ShardIndex, response: page_api::GetPageResponse) {
+        // NB: this is called below a `Retry::with()`, so unrecoverable errors should not use a
+        // retryable status code (e.g. `Internal`).
 
-        // Make sure the response matches the request ID.
-        if response.request_id != self.request_id {
-            return Err(tonic::Status::internal(format!(
-                "response ID {} does not match request ID {}",
-                response.request_id, self.request_id
-            )));
-        }
+        // The caller should already have converted status codes into tonic::Status.
+        assert_eq!(
+            response.status_code,
+            page_api::GetPageStatusCode::Ok,
+            "non-OK response"
+        );
+
+        // The stream pool ensures the response matches the request ID.
+        assert_eq!(response.request_id, self.request_id, "response ID mismatch");
 
         // Add the response data to the map.
         let old = self.responses.insert(shard_id, response.page_images);
-        if old.is_some() {
-            return Err(tonic::Status::internal(format!(
-                "duplicate response for shard {shard_id}",
-            )));
-        }
-
-        Ok(())
+        // We only dispatch one request per shard.
+        assert!(old.is_none(), "duplicate response for shard {shard_id}");
     }
 
     /// Assembles the shard responses into a single response. Responses must be present for all
     /// relevant shards, and the total number of pages must match the original request.
     #[allow(clippy::result_large_err)]
     pub fn assemble_response(self) -> tonic::Result<page_api::GetPageResponse> {
+        // NB: this is called below a `Retry::with()`, so unrecoverable errors should not use a
+        // retryable status code (e.g. `Internal`).
+
         let mut response = page_api::GetPageResponse {
             request_id: self.request_id,
             status_code: page_api::GetPageStatusCode::Ok,
@@ -149,11 +145,11 @@ impl GetPageSplitter {
             let page = shard_responses
                 .get_mut(shard_id)
                 .ok_or_else(|| {
-                    tonic::Status::internal(format!("missing response for shard {shard_id}"))
+                    tonic::Status::data_loss(format!("missing response for shard {shard_id}"))
                 })?
                 .next()
                 .ok_or_else(|| {
-                    tonic::Status::internal(format!("missing page from shard {shard_id}"))
+                    tonic::Status::data_loss(format!("missing page from shard {shard_id}"))
                 })?;
             response.page_images.push(page);
         }
@@ -161,7 +157,7 @@ impl GetPageSplitter {
         // Make sure there are no additional pages.
         for (shard_id, mut pages) in shard_responses {
             if pages.next().is_some() {
-                return Err(tonic::Status::internal(format!(
+                return Err(tonic::Status::out_of_range(format!(
                     "extra pages returned from shard {shard_id}"
                 )));
             }
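
The core change in `client.rs` replaces the hand-rolled shard-map retry loop with a plain `Retry::with()` wrapper that reloads the shard map on every attempt via `ArcSwap::load_full()`. Below is a minimal sketch of that pattern, assuming the `arc_swap` and `tokio` crates; `Retry`, `Shards`, and the function bodies are simplified stand-ins for illustration, not the pageserver's actual types:

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;

/// Simplified stand-in for the client's shard map (hypothetical field).
struct Shards {
    count: usize,
}

/// Simplified stand-in for the client's retry helper: retries a closure a
/// fixed number of times, returning the last error.
struct Retry {
    attempts: usize,
}

impl Retry {
    async fn with<T, F, Fut>(&self, mut f: F) -> Result<T, String>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, String>>,
    {
        let mut last_err = String::from("no attempts made");
        for _ in 0..self.attempts {
            match f().await {
                Ok(v) => return Ok(v),
                Err(e) => last_err = e,
            }
        }
        Err(last_err)
    }
}

/// Each attempt reloads the shard map, but holds an owned, stable view for
/// its whole duration, even if another task swaps the map concurrently via
/// `shards.store(...)`.
async fn get_page(shards: &ArcSwap<Shards>, retry: &Retry) -> Result<String, String> {
    retry
        .with(|| {
            // `load_full` returns an owned Arc, not a borrowed guard.
            let shards: Arc<Shards> = shards.load_full();
            async move { get_page_with_shards(&shards).await }
        })
        .await
}

/// No internal retries: errors bubble up to the outer `Retry::with()`, which
/// re-splits the request against whatever shard map is current by then.
async fn get_page_with_shards(shards: &Shards) -> Result<String, String> {
    Ok(format!("fetched pages across {} shard(s)", shards.count))
}

#[tokio::main]
async fn main() {
    let shards = ArcSwap::from_pointee(Shards { count: 4 });
    let retry = Retry { attempts: 3 };
    println!("{:?}", get_page(&shards, &retry).await);
}
```

The design point is that `load_full()` hands each attempt an owned `Arc<Shards>`, so a concurrent swap never changes the view mid-attempt; the next attempt simply re-splits against the new map, which is why the per-shard helpers no longer need their own retry logic.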
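
The repeated "NB: this is called below a `Retry::with()`" comments, and the switch from `Internal` to `DataLoss`/`OutOfRange`, hinge on which gRPC status codes the retry layer treats as transient. A hypothetical classification sketch using `tonic`; the exact code set here is an assumption for illustration, not the pageserver's actual retry policy:

```rust
use tonic::{Code, Status};

/// Hypothetical retry predicate: transient failures keep retryable codes,
/// while unrecoverable conditions use codes the retry layer will not retry.
fn is_retryable(status: &Status) -> bool {
    match status.code() {
        // Transient: worth another attempt, possibly against a new shard map.
        Code::Internal | Code::Unavailable | Code::ResourceExhausted => true,
        // Permanent: a missing or extra page will not fix itself on retry.
        Code::DataLoss | Code::OutOfRange | Code::InvalidArgument => false,
        _ => false,
    }
}

fn main() {
    let lost = Status::data_loss("expected 8 pages, got 7");
    assert!(!is_retryable(&lost));
    assert!(is_retryable(&Status::unavailable("connection reset")));
}
```

Under a policy like this, a page-count mismatch surfaced as `data_loss` fails fast instead of burning retry attempts, while the assertions in `add_response` cover invariants that indicate a client bug rather than a server error.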