pageserver/client_grpc: add shard map updates (#12480)

## Problem

The communicator gRPC client must support changing the shard map on
splits.

Touches #11735.
Requires #12476.

## Summary of changes

* Wrap the shard set in an `ArcSwap` to allow swapping it out.
* Add a new `ShardSpec` parameter struct to pass validated shard info to
the client.
* Add `update_shards()` to change the shard set. In-flight requests are
allowed to complete using the old shards (see the usage sketch after this list).
* Restructure `get_page` to use a stable view of the shard map, and
retry errors at the top (pre-split) level to pick up shard map changes.
* Also mark `tonic::Status::Internal` as non-retryable, so that we can
use it for client-side invariant checks without continually retrying them.
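
For orientation, here is a minimal usage sketch of the new API. It is not part of this diff: the helper name, the shard URL and stripe-size inputs, and the import path for the shard types are assumptions; only `ShardSpec::new()` and `PageserverClient::update_shards()` come from this change.

```rust
use std::collections::HashMap;

use pageserver_client_grpc::{PageserverClient, ShardSpec};
// NB: the import path for the shard types is an assumption in this sketch; they are the
// same `ShardIndex`/`ShardStripeSize` types used by the client's shard map.
use utils::shard::{ShardIndex, ShardStripeSize};

/// Hypothetical helper: apply a post-split shard map to an existing client.
///
/// `urls` must contain every shard index with its gRPC URL, and `stripe_size` must be
/// given for sharded tenants. `ShardSpec::new()` validates the spec up front (all shards
/// present, gRPC scheme, consistent shard count), and `update_shards()` swaps it in while
/// in-flight requests complete against the old shard set.
fn apply_shard_map(
    client: &PageserverClient,
    urls: HashMap<ShardIndex, String>,
    stripe_size: Option<ShardStripeSize>,
) -> anyhow::Result<()> {
    let spec = ShardSpec::new(urls, stripe_size)?;
    client.update_shards(spec)
}
```

`update_shards()` itself rejects shard-count reductions and stripe-size changes for already-sharded tenants, so a caller only needs to hand it the new spec.
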
Commit 2fc77c836b (parent 2c6b327be6), authored by Erik Grinaker on 2025-07-10 17:46:39 +02:00 and committed by GitHub.
6 changed files with 199 additions and 98 deletions.

Cargo.lock (generated)

```diff
@@ -4500,6 +4500,7 @@ name = "pageserver_client_grpc"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "arc-swap",
  "bytes",
  "compute_api",
  "futures",
```

```diff
@@ -9,6 +9,7 @@ testing = ["pageserver_api/testing"]
 
 [dependencies]
 anyhow.workspace = true
+arc-swap.workspace = true
 bytes.workspace = true
 compute_api.workspace = true
 futures.workspace = true
```

```diff
@@ -3,6 +3,7 @@ use std::num::NonZero;
 use std::sync::Arc;
 
 use anyhow::anyhow;
+use arc_swap::ArcSwap;
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt as _, StreamExt as _};
 use tracing::instrument;
@@ -55,28 +56,74 @@ const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
 /// TODO: this client does not support base backups or LSN leases, as these are only used by
 /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
 pub struct PageserverClient {
-    // TODO: support swapping out the shard map, e.g. via an ArcSwap.
-    shards: Shards,
+    /// The tenant ID.
+    tenant_id: TenantId,
+    /// The timeline ID.
+    timeline_id: TimelineId,
+    /// The JWT auth token for this tenant, if any.
+    auth_token: Option<String>,
+    /// The shards for this tenant.
+    shards: ArcSwap<Shards>,
+    /// The retry configuration.
     retry: Retry,
 }
 
 impl PageserverClient {
     /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
-    /// in the shard map, which must be complete and must use gRPC URLs.
+    /// in the shard spec, which must be complete and must use gRPC URLs.
     pub fn new(
         tenant_id: TenantId,
         timeline_id: TimelineId,
-        shard_map: HashMap<ShardIndex, String>,
-        stripe_size: ShardStripeSize,
+        shard_spec: ShardSpec,
         auth_token: Option<String>,
     ) -> anyhow::Result<Self> {
-        let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?;
+        let shards = Shards::new(tenant_id, timeline_id, shard_spec, auth_token.clone())?;
         Ok(Self {
-            shards,
+            tenant_id,
+            timeline_id,
+            auth_token,
+            shards: ArcSwap::new(Arc::new(shards)),
             retry: Retry,
         })
     }
 
+    /// Updates the shards from the given shard spec. In-flight requests will complete using the
+    /// existing shards, but may retry with the new shards if they fail.
+    ///
+    /// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
+    /// properly spun down and dropped afterwards.
+    pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
+        // Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
+        // with concurrent updates, but that involves creating a new `Shards` on every attempt,
+        // which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere
+        // in the stack, and if they're violated then we already have problems elsewhere, so a
+        // best-effort but possibly-racy check is okay here.
+        let old = self.shards.load_full();
+        if shard_spec.count < old.count {
+            return Err(anyhow!(
+                "can't reduce shard count from {} to {}",
+                old.count,
+                shard_spec.count
+            ));
+        }
+        if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
+            return Err(anyhow!(
+                "can't change stripe size from {} to {}",
+                old.stripe_size,
+                shard_spec.stripe_size
+            ));
+        }
+
+        let shards = Shards::new(
+            self.tenant_id,
+            self.timeline_id,
+            shard_spec,
+            self.auth_token.clone(),
+        )?;
+        self.shards.store(Arc::new(shards));
+        Ok(())
+    }
+
     /// Returns whether a relation exists.
     #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
     pub async fn check_rel_exists(
@@ -86,7 +133,7 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.check_rel_exists(req).await
             })
             .await
@@ -101,7 +148,7 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_db_size(req).await
             })
             .await
@@ -129,28 +176,42 @@ impl PageserverClient {
             return Err(tonic::Status::invalid_argument("no block number"));
         }
 
+        // The shards may change while we're fetching pages. We execute the request using a stable
+        // view of the shards (especially important for requests that span shards), but retry the
+        // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
+        // retries and re-splits in some cases where requests span shards, but these are expected to
+        // be rare.
+        //
+        // TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
+        // once we figure out how to handle these.
+        self.retry
+            .with(async || Self::get_page_with_shards(req.clone(), &self.shards.load_full()).await)
+            .await
+    }
+
+    /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
+    /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
+    async fn get_page_with_shards(
+        req: page_api::GetPageRequest,
+        shards: &Shards,
+    ) -> tonic::Result<page_api::GetPageResponse> {
         // Fast path: request is for a single shard.
         if let Some(shard_id) =
-            GetPageSplitter::is_single_shard(&req, self.shards.count, self.shards.stripe_size)
+            GetPageSplitter::is_single_shard(&req, shards.count, shards.stripe_size)
         {
-            return self.get_page_for_shard(shard_id, req).await;
+            return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
         }
 
         // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
         // reassemble the responses.
-        //
-        // TODO: when we support shard map updates, we need to detect when it changes and re-split
-        // the request on errors.
-        let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);
+        let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);
 
-        let mut shard_requests: FuturesUnordered<_> = splitter
-            .drain_requests()
-            .map(|(shard_id, shard_req)| {
-                // NB: each request will retry internally.
-                self.get_page_for_shard(shard_id, shard_req)
-                    .map(move |result| result.map(|resp| (shard_id, resp)))
-            })
-            .collect();
+        let mut shard_requests = FuturesUnordered::new();
+        for (shard_id, shard_req) in splitter.drain_requests() {
+            let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
+                .map(move |result| result.map(|resp| (shard_id, resp)));
+            shard_requests.push(future);
+        }
 
         while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
             splitter.add_response(shard_id, shard_response)?;
@@ -159,41 +220,28 @@ impl PageserverClient {
         splitter.assemble_response()
     }
 
-    /// Fetches pages that belong to the given shard.
-    #[instrument(skip_all, fields(shard = %shard_id))]
-    async fn get_page_for_shard(
-        &self,
-        shard_id: ShardIndex,
+    /// Fetches pages on the given shard. Does not retry internally.
+    async fn get_page_with_shard(
         req: page_api::GetPageRequest,
+        shard: &Shard,
     ) -> tonic::Result<page_api::GetPageResponse> {
-        let resp = self
-            .retry
-            .with(async || {
-                let stream = self
-                    .shards
-                    .get(shard_id)?
-                    .stream(req.request_class.is_bulk())
-                    .await;
-                let resp = stream.send(req.clone()).await?;
+        let expected = req.block_numbers.len();
+        let stream = shard.stream(req.request_class.is_bulk()).await;
+        let resp = stream.send(req).await?;
 
         // Convert per-request errors into a tonic::Status.
         if resp.status_code != page_api::GetPageStatusCode::Ok {
             return Err(tonic::Status::new(
                 resp.status_code.into(),
                 resp.reason.unwrap_or_else(|| String::from("unknown error")),
             ));
         }
 
-                Ok(resp)
-            })
-            .await?;
-
-        // Make sure we got the right number of pages.
-        // NB: check outside of the retry loop, since we don't want to retry this.
-        let (expected, actual) = (req.block_numbers.len(), resp.page_images.len());
+        // Check that we received the expected number of pages.
+        let actual = resp.page_images.len();
         if expected != actual {
             return Err(tonic::Status::internal(format!(
-                "expected {expected} pages for shard {shard_id}, got {actual}",
+                "expected {expected} pages, got {actual}",
             )));
         }
@@ -209,7 +257,7 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_rel_size(req).await
             })
             .await
@@ -224,48 +272,51 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // SLRU segments are only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_slru_segment(req).await
             })
             .await
     }
 }
 
-/// Tracks the tenant's shards.
-struct Shards {
+/// Shard specification for a PageserverClient.
+pub struct ShardSpec {
+    /// Maps shard indices to gRPC URLs.
+    ///
+    /// INVARIANT: every shard 0..count is present, and shard 0 is always present.
+    /// INVARIANT: every URL is valid and uses grpc:// scheme.
+    urls: HashMap<ShardIndex, String>,
     /// The shard count.
     ///
    /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
     count: ShardCount,
-    /// The stripe size. Only used for sharded tenants.
+    /// The stripe size for these shards.
     stripe_size: ShardStripeSize,
-    /// Shards by shard index.
-    ///
-    /// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`.
-    ///
-    /// INVARIANT: every shard 0..count is present.
-    /// INVARIANT: shard 0 is always present.
-    map: HashMap<ShardIndex, Shard>,
 }
 
-impl Shards {
-    /// Creates a new set of shards based on a shard map.
-    fn new(
-        tenant_id: TenantId,
-        timeline_id: TimelineId,
-        shard_map: HashMap<ShardIndex, String>,
-        stripe_size: ShardStripeSize,
-        auth_token: Option<String>,
+impl ShardSpec {
+    /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
+    /// The stripe size may be omitted for unsharded tenants.
+    pub fn new(
+        urls: HashMap<ShardIndex, String>,
+        stripe_size: Option<ShardStripeSize>,
     ) -> anyhow::Result<Self> {
-        let count = match shard_map.len() {
+        // Compute the shard count.
+        let count = match urls.len() {
             0 => return Err(anyhow!("no shards provided")),
             1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
             n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
             n => ShardCount::new(n as u8),
         };
 
-        let mut map = HashMap::new();
-        for (shard_id, url) in shard_map {
+        // Determine the stripe size. It doesn't matter for unsharded tenants.
+        if stripe_size.is_none() && !count.is_unsharded() {
+            return Err(anyhow!("stripe size must be given for sharded tenants"));
+        }
+        let stripe_size = stripe_size.unwrap_or_default();
+
+        // Validate the shard spec.
+        for (shard_id, url) in &urls {
             // The shard index must match the computed shard count, even for unsharded tenants.
             if shard_id.shard_count != count {
                 return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
@@ -276,21 +327,64 @@ impl Shards {
             }
 
             // The above conditions guarantee that we have all shards 0..count: len() matches count,
             // shard number < count, and numbers are unique (via hashmap).
-            let shard = Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?;
-            map.insert(shard_id, shard);
+
+            // Validate the URL.
+            if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
+                return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
+            }
         }
 
         Ok(Self {
+            urls,
             count,
             stripe_size,
-            map,
+        })
+    }
+}
+
+/// Tracks the tenant's shards.
+struct Shards {
+    /// Shards by shard index.
+    ///
+    /// INVARIANT: every shard 0..count is present.
+    /// INVARIANT: shard 0 is always present.
+    by_index: HashMap<ShardIndex, Shard>,
+    /// The shard count.
+    ///
+    /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
+    count: ShardCount,
+    /// The stripe size. Only used for sharded tenants.
+    stripe_size: ShardStripeSize,
+}
+
+impl Shards {
+    /// Creates a new set of shards based on a shard spec.
+    fn new(
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        shard_spec: ShardSpec,
+        auth_token: Option<String>,
+    ) -> anyhow::Result<Self> {
+        // NB: the shard spec has already been validated when constructed.
+        let mut shards = HashMap::with_capacity(shard_spec.urls.len());
+        for (shard_id, url) in shard_spec.urls {
+            shards.insert(
+                shard_id,
+                Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?,
+            );
+        }
+
+        Ok(Self {
+            by_index: shards,
+            count: shard_spec.count,
+            stripe_size: shard_spec.stripe_size,
         })
     }
 
     /// Looks up the given shard.
     #[allow(clippy::result_large_err)] // TODO: check perf impact
     fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
-        self.map
+        self.by_index
             .get(&shard_id)
             .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
     }
@@ -329,11 +423,6 @@ impl Shard {
         shard_id: ShardIndex,
         auth_token: Option<String>,
     ) -> anyhow::Result<Self> {
-        // Sanity-check that the URL uses gRPC.
-        if PageserverProtocol::from_connstring(&url)? != PageserverProtocol::Grpc {
-            return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
-        }
-
         // Common channel pool for unary and stream requests. Bounded by client/stream pools.
         let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
```

```diff
@@ -3,4 +3,4 @@ mod pool;
 mod retry;
 mod split;
 
-pub use client::PageserverClient;
+pub use client::{PageserverClient, ShardSpec};
```

```diff
@@ -131,7 +131,6 @@ impl Retry {
             tonic::Code::Aborted => true,
             tonic::Code::Cancelled => true,
             tonic::Code::DeadlineExceeded => true, // maybe transient slowness
-            tonic::Code::Internal => true,         // maybe transient failure?
             tonic::Code::ResourceExhausted => true,
             tonic::Code::Unavailable => true,
@@ -139,6 +138,10 @@ impl Retry {
             tonic::Code::AlreadyExists => false,
             tonic::Code::DataLoss => false,
             tonic::Code::FailedPrecondition => false,
+            // NB: don't retry Internal. It is intended for serious errors such as invariant
+            // violations, and is also used for client-side invariant checks that would otherwise
+            // result in retry loops.
+            tonic::Code::Internal => false,
             tonic::Code::InvalidArgument => false,
             tonic::Code::NotFound => false,
             tonic::Code::OutOfRange => false,
```

```diff
@@ -97,7 +97,8 @@ impl GetPageSplitter {
         self.requests.drain()
     }
 
-    /// Adds a response from the given shard.
+    /// Adds a response from the given shard. The response must match the request ID and have an OK
+    /// status code. A response must not already exist for the given shard ID.
     #[allow(clippy::result_large_err)]
     pub fn add_response(
         &mut self,
@@ -105,24 +106,30 @@
         response: page_api::GetPageResponse,
     ) -> tonic::Result<()> {
         // The caller should already have converted status codes into tonic::Status.
-        assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);
+        if response.status_code != page_api::GetPageStatusCode::Ok {
+            return Err(tonic::Status::internal(format!(
+                "unexpected non-OK response for shard {shard_id}: {:?}",
+                response.status_code
+            )));
+        }
 
-        // Make sure the response matches the request ID.
+        // The stream pool ensures the response matches the request ID.
         if response.request_id != self.request_id {
             return Err(tonic::Status::internal(format!(
-                "response ID {} does not match request ID {}",
-                response.request_id, self.request_id
+                "response ID mismatch for shard {shard_id}: expected {}, got {}",
+                self.request_id, response.request_id
             )));
         }
 
+        // We only dispatch one request per shard.
+        if self.responses.contains_key(&shard_id) {
+            return Err(tonic::Status::internal(format!(
+                "duplicate response for shard {shard_id}"
+            )));
+        }
+
         // Add the response data to the map.
-        let old = self.responses.insert(shard_id, response.page_images);
-        if old.is_some() {
-            return Err(tonic::Status::internal(format!(
-                "duplicate response for shard {shard_id}",
-            )));
-        }
+        self.responses.insert(shard_id, response.page_images);
 
         Ok(())
     }
```