Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-15 01:12:56 +00:00)
pageserver/client_grpc: don't set stripe size for unsharded tenants (#12639)
## Problem

We've had bugs where the compute would keep using the stale default stripe size of an unsharded tenant after the tenant was split with a new stripe size.

## Summary of changes

Never specify a stripe size for unsharded tenants, to guard against misuse. Only specify it once the tenant is sharded and the stripe size can no longer change.

Also opportunistically changes `GetPageSplitter` to return `anyhow::Result`, since we'll be using this in other code paths as well (specifically during server-side shard splits).
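To illustrate the invariant this enforces, here is a minimal, self-contained sketch of the validation that `ShardSpec::new` now performs. The `ShardCount` and `StripeSize` types and the `validate_stripe_size` helper below are simplified stand-ins for illustration only, not the pageserver's actual API:

```rust
// Hypothetical stand-in for the pageserver's ShardCount: 0 means unsharded.
#[derive(Clone, Copy, PartialEq, Debug)]
struct ShardCount(u8);

impl ShardCount {
    fn is_unsharded(&self) -> bool {
        self.0 == 0
    }
}

// Hypothetical stand-in for ShardStripeSize.
type StripeSize = u32;

/// Validates the (shard count, stripe size) pair: sharded tenants must carry a
/// stripe size, unsharded tenants must not carry one at all.
fn validate_stripe_size(count: ShardCount, stripe_size: Option<StripeSize>) -> Result<(), String> {
    if stripe_size.is_none() && !count.is_unsharded() {
        return Err("stripe size must be given for sharded tenants".into());
    }
    if stripe_size.is_some() && count.is_unsharded() {
        return Err("stripe size can't be given for unsharded tenants".into());
    }
    Ok(())
}

fn main() {
    // Unsharded: no stripe size at all, so a stale default can never leak into routing.
    assert!(validate_stripe_size(ShardCount(0), None).is_ok());
    // Sharded: the stripe size is fixed at split time and must always be provided.
    assert!(validate_stripe_size(ShardCount(8), Some(2048)).is_ok());
    // Misuse is rejected instead of silently falling back to a default.
    assert!(validate_stripe_size(ShardCount(0), Some(2048)).is_err());
    assert!(validate_stripe_size(ShardCount(8), None).is_err());
}
```

With this shape, an unsharded tenant simply has no stripe size that could go stale; the compute only learns a stripe size once the tenant is sharded, at which point it can no longer change.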
@@ -8,7 +8,6 @@ use anyhow::anyhow;
 use arc_swap::ArcSwap;
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt as _, StreamExt as _};
-use pageserver_api::shard::DEFAULT_STRIPE_SIZE;
 use tonic::codec::CompressionEncoding;
 use tracing::{debug, instrument};
 use utils::logging::warn_slow;
@@ -141,8 +140,8 @@ impl PageserverClient
         if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
             return Err(anyhow!(
                 "can't change stripe size from {} to {}",
-                old.stripe_size,
-                shard_spec.stripe_size
+                old.stripe_size.expect("always Some when sharded"),
+                shard_spec.stripe_size.expect("always Some when sharded")
             ));
         }
 
@@ -232,13 +231,15 @@ impl PageserverClient {
         // Fast path: request is for a single shard.
         if let Some(shard_id) =
             GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
+                .map_err(|err| tonic::Status::internal(err.to_string()))?
         {
             return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
         }
 
         // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
         // reassemble the responses.
-        let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);
+        let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)
+            .map_err(|err| tonic::Status::internal(err.to_string()))?;
 
         let mut shard_requests = FuturesUnordered::new();
         for (shard_id, shard_req) in splitter.drain_requests() {
@@ -248,10 +249,14 @@ impl PageserverClient {
         }
 
         while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
-            splitter.add_response(shard_id, shard_response)?;
+            splitter
+                .add_response(shard_id, shard_response)
+                .map_err(|err| tonic::Status::internal(err.to_string()))?;
         }
 
-        splitter.get_response()
+        splitter
+            .get_response()
+            .map_err(|err| tonic::Status::internal(err.to_string()))
     }
 
     /// Fetches pages on the given shard. Does not retry internally.
@@ -379,12 +384,14 @@ pub struct ShardSpec {
     /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
     count: ShardCount,
     /// The stripe size for these shards.
-    stripe_size: ShardStripeSize,
+    ///
+    /// INVARIANT: None for unsharded tenants, Some for sharded.
+    stripe_size: Option<ShardStripeSize>,
 }
 
 impl ShardSpec {
     /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
-    /// The stripe size may be omitted for unsharded tenants.
+    /// The stripe size must be Some for sharded tenants, or None for unsharded tenants.
     pub fn new(
         urls: HashMap<ShardIndex, String>,
         stripe_size: Option<ShardStripeSize>,
@@ -397,11 +404,13 @@ impl ShardSpec {
             n => ShardCount::new(n as u8),
         };
 
-        // Determine the stripe size. It doesn't matter for unsharded tenants.
+        // Validate the stripe size.
         if stripe_size.is_none() && !count.is_unsharded() {
             return Err(anyhow!("stripe size must be given for sharded tenants"));
         }
-        let stripe_size = stripe_size.unwrap_or(DEFAULT_STRIPE_SIZE);
+        if stripe_size.is_some() && count.is_unsharded() {
+            return Err(anyhow!("stripe size can't be given for unsharded tenants"));
+        }
 
         // Validate the shard spec.
         for (shard_id, url) in &urls {
@@ -441,8 +450,10 @@ struct Shards {
     ///
    /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
     count: ShardCount,
-    /// The stripe size. Only used for sharded tenants.
-    stripe_size: ShardStripeSize,
+    /// The stripe size.
+    ///
+    /// INVARIANT: None for unsharded tenants, Some for sharded.
+    stripe_size: Option<ShardStripeSize>,
 }
 
 impl Shards {

@@ -1,11 +1,12 @@
 use std::collections::HashMap;
 
+use anyhow::anyhow;
 use bytes::Bytes;
 
 use pageserver_api::key::rel_block_to_key;
 use pageserver_api::shard::key_to_shard_number;
 use pageserver_page_api as page_api;
-use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};
+use utils::shard::{ShardCount, ShardIndex, ShardStripeSize};
 
 /// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
 /// TODO: add tests for this.
@@ -25,43 +26,54 @@ impl GetPageSplitter {
     pub fn for_single_shard(
         req: &page_api::GetPageRequest,
         count: ShardCount,
-        stripe_size: ShardStripeSize,
-    ) -> Option<ShardIndex> {
+        stripe_size: Option<ShardStripeSize>,
+    ) -> anyhow::Result<Option<ShardIndex>> {
         // Fast path: unsharded tenant.
         if count.is_unsharded() {
-            return Some(ShardIndex::unsharded());
+            return Ok(Some(ShardIndex::unsharded()));
         }
 
-        // Find the first page's shard, for comparison. If there are no pages, just return the first
-        // shard (caller likely checked already, otherwise the server will reject it).
+        let Some(stripe_size) = stripe_size else {
+            return Err(anyhow!("stripe size must be given for sharded tenants"));
+        };
+
+        // Find the first page's shard, for comparison.
         let Some(&first_page) = req.block_numbers.first() else {
-            return Some(ShardIndex::new(ShardNumber(0), count));
+            return Err(anyhow!("no block numbers in request"));
         };
         let key = rel_block_to_key(req.rel, first_page);
         let shard_number = key_to_shard_number(count, stripe_size, &key);
 
-        req.block_numbers
+        Ok(req
+            .block_numbers
             .iter()
             .skip(1) // computed above
             .all(|&blkno| {
                 let key = rel_block_to_key(req.rel, blkno);
                 key_to_shard_number(count, stripe_size, &key) == shard_number
             })
-            .then_some(ShardIndex::new(shard_number, count))
+            .then_some(ShardIndex::new(shard_number, count)))
     }
 
     /// Splits the given request.
     pub fn split(
         req: page_api::GetPageRequest,
         count: ShardCount,
-        stripe_size: ShardStripeSize,
-    ) -> Self {
+        stripe_size: Option<ShardStripeSize>,
+    ) -> anyhow::Result<Self> {
         // The caller should make sure we don't split requests unnecessarily.
         debug_assert!(
-            Self::for_single_shard(&req, count, stripe_size).is_none(),
+            Self::for_single_shard(&req, count, stripe_size)?.is_none(),
             "unnecessary request split"
         );
 
+        if count.is_unsharded() {
+            return Err(anyhow!("unsharded tenant, no point in splitting request"));
+        }
+        let Some(stripe_size) = stripe_size else {
+            return Err(anyhow!("stripe size must be given for sharded tenants"));
+        };
+
         // Split the requests by shard index.
         let mut requests = HashMap::with_capacity(2); // common case
         let mut block_shards = Vec::with_capacity(req.block_numbers.len());
@@ -103,11 +115,11 @@ impl GetPageSplitter {
                 .collect(),
         };
 
-        Self {
+        Ok(Self {
             requests,
             response,
             block_shards,
-        }
+        })
     }
 
     /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
@@ -124,21 +136,30 @@ impl GetPageSplitter {
         &mut self,
         shard_id: ShardIndex,
         response: page_api::GetPageResponse,
-    ) -> tonic::Result<()> {
+    ) -> anyhow::Result<()> {
         // The caller should already have converted status codes into tonic::Status.
         if response.status_code != page_api::GetPageStatusCode::Ok {
-            return Err(tonic::Status::internal(format!(
+            return Err(anyhow!(
                 "unexpected non-OK response for shard {shard_id}: {} {}",
                 response.status_code,
                 response.reason.unwrap_or_default()
-            )));
+            ));
         }
 
         if response.request_id != self.response.request_id {
-            return Err(tonic::Status::internal(format!(
+            return Err(anyhow!(
                 "response ID mismatch for shard {shard_id}: expected {}, got {}",
-                self.response.request_id, response.request_id
-            )));
+                self.response.request_id,
+                response.request_id
+            ));
+        }
+
+        if response.request_id != self.response.request_id {
+            return Err(anyhow!(
+                "response ID mismatch for shard {shard_id}: expected {}, got {}",
+                self.response.request_id,
+                response.request_id
+            ));
         }
 
         // Place the shard response pages into the assembled response, in request order.
@@ -150,27 +171,26 @@ impl GetPageSplitter {
             }
 
             let Some(slot) = self.response.pages.get_mut(i) else {
-                return Err(tonic::Status::internal(format!(
-                    "no block_shards slot {i} for shard {shard_id}"
-                )));
+                return Err(anyhow!("no block_shards slot {i} for shard {shard_id}"));
            };
             let Some(page) = pages.next() else {
-                return Err(tonic::Status::internal(format!(
+                return Err(anyhow!(
                     "missing page {} in shard {shard_id} response",
                     slot.block_number
-                )));
+                ));
             };
             if page.block_number != slot.block_number {
-                return Err(tonic::Status::internal(format!(
+                return Err(anyhow!(
                     "shard {shard_id} returned wrong page at index {i}, expected {} got {}",
-                    slot.block_number, page.block_number
-                )));
+                    slot.block_number,
+                    page.block_number
+                ));
             }
             if !slot.image.is_empty() {
-                return Err(tonic::Status::internal(format!(
+                return Err(anyhow!(
                     "shard {shard_id} returned duplicate page {} at index {i}",
                     slot.block_number
-                )));
+                ));
             }
 
             *slot = page;
@@ -178,10 +198,10 @@ impl GetPageSplitter {
 
         // Make sure we've consumed all pages from the shard response.
         if let Some(extra_page) = pages.next() {
-            return Err(tonic::Status::internal(format!(
+            return Err(anyhow!(
                 "shard {shard_id} returned extra page: {}",
                 extra_page.block_number
-            )));
+            ));
        }
 
         Ok(())
@@ -189,18 +209,18 @@ impl GetPageSplitter {
 
     /// Fetches the final, assembled response.
-    #[allow(clippy::result_large_err)]
-    pub fn get_response(self) -> tonic::Result<page_api::GetPageResponse> {
+    pub fn get_response(self) -> anyhow::Result<page_api::GetPageResponse> {
         // Check that the response is complete.
         for (i, page) in self.response.pages.iter().enumerate() {
             if page.image.is_empty() {
-                return Err(tonic::Status::internal(format!(
+                return Err(anyhow!(
                     "missing page {} for shard {}",
                     page.block_number,
                     self.block_shards
                         .get(i)
                         .map(|s| s.to_string())
                         .unwrap_or_else(|| "?".to_string())
-                )));
+                ));
             }
         }
 