From 88d1127bf4a261104851690b4b9140e7f07b83dc Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Thu, 3 Jul 2025 21:12:26 +0200
Subject: [PATCH] Tweak GetPageSplitter

---
 pageserver/client_grpc/src/client.rs | 10 ++++----
 pageserver/client_grpc/src/split.rs  | 36 ++++++++++++++++------------
 2 files changed, 26 insertions(+), 20 deletions(-)

diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs
index bc15c956aa..d026751a77 100644
--- a/pageserver/client_grpc/src/client.rs
+++ b/pageserver/client_grpc/src/client.rs
@@ -103,11 +103,11 @@ impl PageserverClient {
             return self.get_page_for_shard(shard_id, req).await;
         }

-        // Slow path: request spans multiple shards. Split it, dispatch per-shard requests in
-        // parallel, and reassemble the responses.
+        // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
+        // reassemble the responses.
         //
-        // TODO: when we add shard map updates, we need to detect that case and re-split the
-        // request on errors.
+        // TODO: when we support shard map updates, we need to detect when it changes and re-split
+        // the request on errors.
         let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);

         let mut shard_requests: FuturesUnordered<_> = splitter
@@ -123,7 +123,7 @@ impl PageserverClient {
             splitter.add_response(shard_id, shard_response)?;
         }

-        splitter.reassemble()
+        splitter.assemble_response()
     }

     /// Fetches pages that belong to the given shard.
diff --git a/pageserver/client_grpc/src/split.rs b/pageserver/client_grpc/src/split.rs
index 0a58f57d5b..5bbcaab393 100644
--- a/pageserver/client_grpc/src/split.rs
+++ b/pageserver/client_grpc/src/split.rs
@@ -7,23 +7,23 @@ use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
 use pageserver_page_api as page_api;
 use utils::shard::{ShardCount, ShardIndex};

-/// Splits GetPageRequests across shard boundaries and reassembles the responses.
+/// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
 /// TODO: add tests for this.
 pub struct GetPageSplitter {
     /// The original request ID. Used for all shard requests.
     request_id: page_api::RequestID,
-    /// Requests by shard index.
+    /// Split requests by shard index.
     requests: HashMap<ShardIndex, page_api::GetPageRequest>,
-    /// Maps the page offset in the input request (index) to the shard index. This is used to
-    /// reassemble the responses in the same order as the original request.
+    /// Maps the offset in `GetPageRequest::block_numbers` to the owning shard. Used to assemble
+    /// the response pages in the same order as the original request.
     block_shards: Vec<ShardIndex>,
-    /// Page responses by shard index. Will be reassembled into a single response.
+    /// Page responses by shard index. Will be assembled into a single response.
     responses: HashMap<ShardIndex, Vec<Bytes>>,
 }

 impl GetPageSplitter {
-    /// Checks if the given request belongs to a single shard, and returns the shard ID. This is the
-    /// common case, so we do a full scan in order to avoid unnecessary allocations and overhead.
+    /// Checks if the given request only touches a single shard, and returns the shard ID. This is
+    /// the common case, so we check first in order to avoid unnecessary allocations and overhead.
     /// The caller must ensure that the request has at least one block number, or this will panic.
     pub fn is_single_shard(
         req: &page_api::GetPageRequest,
@@ -57,7 +57,7 @@ impl GetPageSplitter {
     ) -> Self {
         // The caller should make sure we don't split requests unnecessarily.
         debug_assert!(
-            Self::is_single_shard(&req, count, stripe_size).is_some(),
+            Self::is_single_shard(&req, count, stripe_size).is_none(),
             "unnecessary request split"
         );

@@ -97,7 +97,7 @@ impl GetPageSplitter {
         self.requests.drain()
     }

-    /// Adds a response for the given shard index.
+    /// Adds a response from the given shard.
     #[allow(clippy::result_large_err)]
     pub fn add_response(
         &mut self,
@@ -107,7 +107,7 @@ impl GetPageSplitter {
         // The caller should already have converted status codes into tonic::Status.
         assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);

-        // Ensure the response is for the same request ID.
+        // Make sure the response matches the request ID.
         if response.request_id != self.request_id {
             return Err(tonic::Status::internal(format!(
                 "response ID {} does not match request ID {}",
@@ -117,14 +117,20 @@ impl GetPageSplitter {

         // Add the response data to the map.
         let old = self.responses.insert(shard_id, response.page_images);
-        assert!(old.is_none(), "duplicate response for shard {shard_id}");
+
+        if old.is_some() {
+            return Err(tonic::Status::internal(format!(
+                "duplicate response for shard {shard_id}",
+            )));
+        }

         Ok(())
     }

-    /// Reassembles the shard responses into a single response.
+    /// Assembles the shard responses into a single response. Responses must be present for all
+    /// relevant shards, and the total number of pages must match the original request.
     #[allow(clippy::result_large_err)]
-    pub fn reassemble(self) -> tonic::Result<page_api::GetPageResponse> {
+    pub fn assemble_response(self) -> tonic::Result<page_api::GetPageResponse> {
         let mut response = page_api::GetPageResponse {
             request_id: self.request_id,
             status_code: page_api::GetPageStatusCode::Ok,
@@ -132,7 +138,7 @@ impl GetPageSplitter {
             page_images: Vec::with_capacity(self.block_shards.len()),
         };

-        // Convert the shard responses into iterators we can conveniently pull from.
+        // Set up per-shard page iterators we can pull from.
         let mut shard_responses = HashMap::with_capacity(self.responses.len());
         for (shard_id, responses) in self.responses {
             shard_responses.insert(shard_id, responses.into_iter());
@@ -152,7 +158,7 @@ impl GetPageSplitter {
             response.page_images.push(page);
         }

-        // Make sure we didn't get any additional pages.
+        // Make sure there are no additional pages.
         for (shard_id, mut pages) in shard_responses {
             if pages.next().is_some() {
                 return Err(tonic::Status::internal(format!(
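
For reviewers, this is roughly what the multi-shard path in client.rs reads like
with the patch applied, reconstructed from the hunks above. Only the context and
added lines in the diff are verbatim; the method signature, the drain_requests()
name (the accessor whose tail appears in the @@ -97 hunk), and the error plumbing
inside the dispatch closure are not visible in the hunks and are assumptions.

    use futures::stream::{FuturesUnordered, StreamExt};

    impl PageserverClient {
        /// Sketch only: the signature, `drain_requests()`, and the closure's
        /// error handling are assumed; the rest mirrors the diff context.
        pub async fn get_page(
            &self,
            req: page_api::GetPageRequest,
        ) -> tonic::Result<page_api::GetPageResponse> {
            // Fast path: all requested blocks live on a single shard.
            if let Some(shard_id) =
                GetPageSplitter::is_single_shard(&req, self.shards.count, self.shards.stripe_size)
            {
                return self.get_page_for_shard(shard_id, req).await;
            }

            // Request spans multiple shards: split, dispatch, assemble.
            let mut splitter =
                GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);

            let mut shard_requests: FuturesUnordered<_> = splitter
                .drain_requests() // assumed name for the method draining `requests`
                .map(|(shard_id, shard_req)| async move {
                    let resp = self.get_page_for_shard(shard_id, shard_req).await?;
                    Ok::<_, tonic::Status>((shard_id, resp))
                })
                .collect();

            while let Some(result) = shard_requests.next().await {
                let (shard_id, shard_response) = result?;
                splitter.add_response(shard_id, shard_response)?;
            }

            splitter.assemble_response()
        }
    }

FuturesUnordered keeps the per-shard requests concurrent while yielding
completions in arrival order; block_shards is what lets assemble_response()
restore the pages to the original request order regardless of which shard
answers first.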