From 5a48365fb92124d6367ae8953dc9d97e76fd3f06 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 21 Jul 2025 14:28:39 +0200 Subject: [PATCH] pageserver/client_grpc: don't set stripe size for unsharded tenants (#12639) ## Problem We've had bugs where the compute would use the stale default stripe size from an unsharded tenant after the tenant split with a new stripe size. ## Summary of changes Never specify a stripe size for unsharded tenants, to guard against misuse. Only specify it once tenants are sharded and the stripe size can't change. Also opportunistically changes `GetPageSplitter` to return `anyhow::Result`, since we'll be using this in other code paths as well (specifically during server-side shard splits). --- pageserver/client_grpc/src/client.rs | 35 +++++++---- pageserver/client_grpc/src/split.rs | 90 +++++++++++++++++----------- 2 files changed, 78 insertions(+), 47 deletions(-) diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 42bc3c40ac..e6a90fb582 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -8,7 +8,6 @@ use anyhow::anyhow; use arc_swap::ArcSwap; use futures::stream::FuturesUnordered; use futures::{FutureExt as _, StreamExt as _}; -use pageserver_api::shard::DEFAULT_STRIPE_SIZE; use tonic::codec::CompressionEncoding; use tracing::{debug, instrument}; use utils::logging::warn_slow; @@ -141,8 +140,8 @@ impl PageserverClient { if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size { return Err(anyhow!( "can't change stripe size from {} to {}", - old.stripe_size, - shard_spec.stripe_size + old.stripe_size.expect("always Some when sharded"), + shard_spec.stripe_size.expect("always Some when sharded") )); } @@ -232,13 +231,15 @@ impl PageserverClient { // Fast path: request is for a single shard. if let Some(shard_id) = GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size) + .map_err(|err| tonic::Status::internal(err.to_string()))? { return Self::get_page_with_shard(req, shards.get(shard_id)?).await; } // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and // reassemble the responses. - let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size); + let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size) + .map_err(|err| tonic::Status::internal(err.to_string()))?; let mut shard_requests = FuturesUnordered::new(); for (shard_id, shard_req) in splitter.drain_requests() { @@ -248,10 +249,14 @@ impl PageserverClient { } while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? { - splitter.add_response(shard_id, shard_response)?; + splitter + .add_response(shard_id, shard_response) + .map_err(|err| tonic::Status::internal(err.to_string()))?; } - splitter.get_response() + splitter + .get_response() + .map_err(|err| tonic::Status::internal(err.to_string())) } /// Fetches pages on the given shard. Does not retry internally. @@ -379,12 +384,14 @@ pub struct ShardSpec { /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention. count: ShardCount, /// The stripe size for these shards. - stripe_size: ShardStripeSize, + /// + /// INVARIANT: None for unsharded tenants, Some for sharded. + stripe_size: Option, } impl ShardSpec { /// Creates a new shard spec with the given URLs and stripe size. All shards must be given. - /// The stripe size may be omitted for unsharded tenants. + /// The stripe size must be Some for sharded tenants, or None for unsharded tenants. pub fn new( urls: HashMap, stripe_size: Option, @@ -397,11 +404,13 @@ impl ShardSpec { n => ShardCount::new(n as u8), }; - // Determine the stripe size. It doesn't matter for unsharded tenants. + // Validate the stripe size. if stripe_size.is_none() && !count.is_unsharded() { return Err(anyhow!("stripe size must be given for sharded tenants")); } - let stripe_size = stripe_size.unwrap_or(DEFAULT_STRIPE_SIZE); + if stripe_size.is_some() && count.is_unsharded() { + return Err(anyhow!("stripe size can't be given for unsharded tenants")); + } // Validate the shard spec. for (shard_id, url) in &urls { @@ -441,8 +450,10 @@ struct Shards { /// /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention. count: ShardCount, - /// The stripe size. Only used for sharded tenants. - stripe_size: ShardStripeSize, + /// The stripe size. + /// + /// INVARIANT: None for unsharded tenants, Some for sharded. + stripe_size: Option, } impl Shards { diff --git a/pageserver/client_grpc/src/split.rs b/pageserver/client_grpc/src/split.rs index ca8965b8dd..8631638686 100644 --- a/pageserver/client_grpc/src/split.rs +++ b/pageserver/client_grpc/src/split.rs @@ -1,11 +1,12 @@ use std::collections::HashMap; +use anyhow::anyhow; use bytes::Bytes; use pageserver_api::key::rel_block_to_key; use pageserver_api::shard::key_to_shard_number; use pageserver_page_api as page_api; -use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize}; +use utils::shard::{ShardCount, ShardIndex, ShardStripeSize}; /// Splits GetPageRequests that straddle shard boundaries and assembles the responses. /// TODO: add tests for this. @@ -25,43 +26,54 @@ impl GetPageSplitter { pub fn for_single_shard( req: &page_api::GetPageRequest, count: ShardCount, - stripe_size: ShardStripeSize, - ) -> Option { + stripe_size: Option, + ) -> anyhow::Result> { // Fast path: unsharded tenant. if count.is_unsharded() { - return Some(ShardIndex::unsharded()); + return Ok(Some(ShardIndex::unsharded())); } - // Find the first page's shard, for comparison. If there are no pages, just return the first - // shard (caller likely checked already, otherwise the server will reject it). + let Some(stripe_size) = stripe_size else { + return Err(anyhow!("stripe size must be given for sharded tenants")); + }; + + // Find the first page's shard, for comparison. let Some(&first_page) = req.block_numbers.first() else { - return Some(ShardIndex::new(ShardNumber(0), count)); + return Err(anyhow!("no block numbers in request")); }; let key = rel_block_to_key(req.rel, first_page); let shard_number = key_to_shard_number(count, stripe_size, &key); - req.block_numbers + Ok(req + .block_numbers .iter() .skip(1) // computed above .all(|&blkno| { let key = rel_block_to_key(req.rel, blkno); key_to_shard_number(count, stripe_size, &key) == shard_number }) - .then_some(ShardIndex::new(shard_number, count)) + .then_some(ShardIndex::new(shard_number, count))) } /// Splits the given request. pub fn split( req: page_api::GetPageRequest, count: ShardCount, - stripe_size: ShardStripeSize, - ) -> Self { + stripe_size: Option, + ) -> anyhow::Result { // The caller should make sure we don't split requests unnecessarily. debug_assert!( - Self::for_single_shard(&req, count, stripe_size).is_none(), + Self::for_single_shard(&req, count, stripe_size)?.is_none(), "unnecessary request split" ); + if count.is_unsharded() { + return Err(anyhow!("unsharded tenant, no point in splitting request")); + } + let Some(stripe_size) = stripe_size else { + return Err(anyhow!("stripe size must be given for sharded tenants")); + }; + // Split the requests by shard index. let mut requests = HashMap::with_capacity(2); // common case let mut block_shards = Vec::with_capacity(req.block_numbers.len()); @@ -103,11 +115,11 @@ impl GetPageSplitter { .collect(), }; - Self { + Ok(Self { requests, response, block_shards, - } + }) } /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations. @@ -124,21 +136,30 @@ impl GetPageSplitter { &mut self, shard_id: ShardIndex, response: page_api::GetPageResponse, - ) -> tonic::Result<()> { + ) -> anyhow::Result<()> { // The caller should already have converted status codes into tonic::Status. if response.status_code != page_api::GetPageStatusCode::Ok { - return Err(tonic::Status::internal(format!( + return Err(anyhow!( "unexpected non-OK response for shard {shard_id}: {} {}", response.status_code, response.reason.unwrap_or_default() - ))); + )); } if response.request_id != self.response.request_id { - return Err(tonic::Status::internal(format!( + return Err(anyhow!( "response ID mismatch for shard {shard_id}: expected {}, got {}", - self.response.request_id, response.request_id - ))); + self.response.request_id, + response.request_id + )); + } + + if response.request_id != self.response.request_id { + return Err(anyhow!( + "response ID mismatch for shard {shard_id}: expected {}, got {}", + self.response.request_id, + response.request_id + )); } // Place the shard response pages into the assembled response, in request order. @@ -150,27 +171,26 @@ impl GetPageSplitter { } let Some(slot) = self.response.pages.get_mut(i) else { - return Err(tonic::Status::internal(format!( - "no block_shards slot {i} for shard {shard_id}" - ))); + return Err(anyhow!("no block_shards slot {i} for shard {shard_id}")); }; let Some(page) = pages.next() else { - return Err(tonic::Status::internal(format!( + return Err(anyhow!( "missing page {} in shard {shard_id} response", slot.block_number - ))); + )); }; if page.block_number != slot.block_number { - return Err(tonic::Status::internal(format!( + return Err(anyhow!( "shard {shard_id} returned wrong page at index {i}, expected {} got {}", - slot.block_number, page.block_number - ))); + slot.block_number, + page.block_number + )); } if !slot.image.is_empty() { - return Err(tonic::Status::internal(format!( + return Err(anyhow!( "shard {shard_id} returned duplicate page {} at index {i}", slot.block_number - ))); + )); } *slot = page; @@ -178,10 +198,10 @@ impl GetPageSplitter { // Make sure we've consumed all pages from the shard response. if let Some(extra_page) = pages.next() { - return Err(tonic::Status::internal(format!( + return Err(anyhow!( "shard {shard_id} returned extra page: {}", extra_page.block_number - ))); + )); } Ok(()) @@ -189,18 +209,18 @@ impl GetPageSplitter { /// Fetches the final, assembled response. #[allow(clippy::result_large_err)] - pub fn get_response(self) -> tonic::Result { + pub fn get_response(self) -> anyhow::Result { // Check that the response is complete. for (i, page) in self.response.pages.iter().enumerate() { if page.image.is_empty() { - return Err(tonic::Status::internal(format!( + return Err(anyhow!( "missing page {} for shard {}", page.block_number, self.block_shards .get(i) .map(|s| s.to_string()) .unwrap_or_else(|| "?".to_string()) - ))); + )); } }