diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index b8ee57bf9f..dad37ebe74 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -230,16 +230,14 @@ impl PageserverClient { ) -> tonic::Result { // Fast path: request is for a single shard. if let Some(shard_id) = - GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size) - .map_err(|err| tonic::Status::internal(err.to_string()))? + GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)? { return Self::get_page_with_shard(req, shards.get(shard_id)?).await; } // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and // reassemble the responses. - let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size) - .map_err(|err| tonic::Status::internal(err.to_string()))?; + let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)?; let mut shard_requests = FuturesUnordered::new(); for (shard_id, shard_req) in splitter.drain_requests() { @@ -249,14 +247,10 @@ impl PageserverClient { } while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? { - splitter - .add_response(shard_id, shard_response) - .map_err(|err| tonic::Status::internal(err.to_string()))?; + splitter.add_response(shard_id, shard_response)?; } - splitter - .get_response() - .map_err(|err| tonic::Status::internal(err.to_string())) + Ok(splitter.collect_response()?) } /// Fetches pages on the given shard. Does not retry internally. diff --git a/pageserver/page_api/src/lib.rs b/pageserver/page_api/src/lib.rs index b44df6337f..b9be6b8b91 100644 --- a/pageserver/page_api/src/lib.rs +++ b/pageserver/page_api/src/lib.rs @@ -24,4 +24,4 @@ mod split; pub use client::Client; pub use model::*; -pub use split::GetPageSplitter; +pub use split::{GetPageSplitter, SplitError}; diff --git a/pageserver/page_api/src/split.rs b/pageserver/page_api/src/split.rs index 5ecc90a166..27c1c995e0 100644 --- a/pageserver/page_api/src/split.rs +++ b/pageserver/page_api/src/split.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use anyhow::anyhow; use bytes::Bytes; use crate::model::*; @@ -27,19 +26,19 @@ impl GetPageSplitter { req: &GetPageRequest, count: ShardCount, stripe_size: Option, - ) -> anyhow::Result> { + ) -> Result, SplitError> { // Fast path: unsharded tenant. if count.is_unsharded() { return Ok(Some(ShardIndex::unsharded())); } let Some(stripe_size) = stripe_size else { - return Err(anyhow!("stripe size must be given for sharded tenants")); + return Err("stripe size must be given for sharded tenants".into()); }; // Find the first page's shard, for comparison. let Some(&first_page) = req.block_numbers.first() else { - return Err(anyhow!("no block numbers in request")); + return Err("no block numbers in request".into()); }; let key = rel_block_to_key(req.rel, first_page); let shard_number = key_to_shard_number(count, stripe_size, &key); @@ -60,7 +59,7 @@ impl GetPageSplitter { req: GetPageRequest, count: ShardCount, stripe_size: Option, - ) -> anyhow::Result { + ) -> Result { // The caller should make sure we don't split requests unnecessarily. debug_assert!( Self::for_single_shard(&req, count, stripe_size)?.is_none(), @@ -68,10 +67,10 @@ impl GetPageSplitter { ); if count.is_unsharded() { - return Err(anyhow!("unsharded tenant, no point in splitting request")); + return Err("unsharded tenant, no point in splitting request".into()); } let Some(stripe_size) = stripe_size else { - return Err(anyhow!("stripe size must be given for sharded tenants")); + return Err("stripe size must be given for sharded tenants".into()); }; // Split the requests by shard index. @@ -129,35 +128,32 @@ impl GetPageSplitter { /// Adds a response from the given shard. The response must match the request ID and have an OK /// status code. A response must not already exist for the given shard ID. - #[allow(clippy::result_large_err)] pub fn add_response( &mut self, shard_id: ShardIndex, response: GetPageResponse, - ) -> anyhow::Result<()> { + ) -> Result<(), SplitError> { // The caller should already have converted status codes into tonic::Status. if response.status_code != GetPageStatusCode::Ok { - return Err(anyhow!( + return Err(SplitError(format!( "unexpected non-OK response for shard {shard_id}: {} {}", response.status_code, response.reason.unwrap_or_default() - )); + ))); } if response.request_id != self.response.request_id { - return Err(anyhow!( + return Err(SplitError(format!( "response ID mismatch for shard {shard_id}: expected {}, got {}", - self.response.request_id, - response.request_id - )); + self.response.request_id, response.request_id + ))); } if response.request_id != self.response.request_id { - return Err(anyhow!( + return Err(SplitError(format!( "response ID mismatch for shard {shard_id}: expected {}, got {}", - self.response.request_id, - response.request_id - )); + self.response.request_id, response.request_id + ))); } // Place the shard response pages into the assembled response, in request order. @@ -169,26 +165,27 @@ impl GetPageSplitter { } let Some(slot) = self.response.pages.get_mut(i) else { - return Err(anyhow!("no block_shards slot {i} for shard {shard_id}")); + return Err(SplitError(format!( + "no block_shards slot {i} for shard {shard_id}" + ))); }; let Some(page) = pages.next() else { - return Err(anyhow!( + return Err(SplitError(format!( "missing page {} in shard {shard_id} response", slot.block_number - )); + ))); }; if page.block_number != slot.block_number { - return Err(anyhow!( + return Err(SplitError(format!( "shard {shard_id} returned wrong page at index {i}, expected {} got {}", - slot.block_number, - page.block_number - )); + slot.block_number, page.block_number + ))); } if !slot.image.is_empty() { - return Err(anyhow!( + return Err(SplitError(format!( "shard {shard_id} returned duplicate page {} at index {i}", slot.block_number - )); + ))); } *slot = page; @@ -196,32 +193,54 @@ impl GetPageSplitter { // Make sure we've consumed all pages from the shard response. if let Some(extra_page) = pages.next() { - return Err(anyhow!( + return Err(SplitError(format!( "shard {shard_id} returned extra page: {}", extra_page.block_number - )); + ))); } Ok(()) } - /// Fetches the final, assembled response. - #[allow(clippy::result_large_err)] - pub fn get_response(self) -> anyhow::Result { + /// Collects the final, assembled response. + pub fn collect_response(self) -> Result { // Check that the response is complete. for (i, page) in self.response.pages.iter().enumerate() { if page.image.is_empty() { - return Err(anyhow!( + return Err(SplitError(format!( "missing page {} for shard {}", page.block_number, self.block_shards .get(i) .map(|s| s.to_string()) .unwrap_or_else(|| "?".to_string()) - )); + ))); } } Ok(self.response) } } + +/// A GetPageSplitter error. +#[derive(Debug, thiserror::Error)] +#[error("{0}")] +pub struct SplitError(String); + +impl From<&str> for SplitError { + fn from(err: &str) -> Self { + SplitError(err.to_string()) + } +} + +impl From for SplitError { + fn from(err: String) -> Self { + SplitError(err) + } +} + +impl From for tonic::Status { + fn from(err: SplitError) -> Self { + tonic::Status::internal(err.0) + } +} diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f16046657a..116e289e99 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -3685,8 +3685,7 @@ impl GrpcPageServiceHandler { // Fast path: the request fits in a single shard. if let Some(shard_index) = - GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size)) - .map_err(|err| tonic::Status::internal(err.to_string()))? + GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size))? { // We got the shard ID from the first page, so these must be equal. assert_eq!(shard_index.shard_number, shard_id.number); @@ -3697,8 +3696,7 @@ impl GrpcPageServiceHandler { // The request spans multiple shards; split it and dispatch parallel requests. All pages // were originally in the parent shard, and during a split all children are local, so we // expect to find local shards for all pages. - let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size)) - .map_err(|err| tonic::Status::internal(err.to_string()))?; + let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size))?; let mut shard_requests = FuturesUnordered::new(); for (shard_index, shard_req) in splitter.drain_requests() { @@ -3717,14 +3715,10 @@ impl GrpcPageServiceHandler { } while let Some((shard_index, shard_response)) = shard_requests.next().await.transpose()? { - splitter - .add_response(shard_index, shard_response) - .map_err(|err| tonic::Status::internal(err.to_string()))?; + splitter.add_response(shard_index, shard_response)?; } - splitter - .get_response() - .map_err(|err| tonic::Status::internal(err.to_string())) + Ok(splitter.collect_response()?) } }