diff --git a/Cargo.lock b/Cargo.lock
index 85080f8473..1d68b8f862 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4500,6 +4500,7 @@ name = "pageserver_client_grpc"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "arc-swap",
  "bytes",
  "compute_api",
  "futures",
diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml
index ca224900ac..e2741ad839 100644
--- a/pageserver/client_grpc/Cargo.toml
+++ b/pageserver/client_grpc/Cargo.toml
@@ -9,6 +9,7 @@ testing = ["pageserver_api/testing"]
 
 [dependencies]
 anyhow.workspace = true
+arc-swap.workspace = true
 bytes.workspace = true
 compute_api.workspace = true
 futures.workspace = true
diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs
index 63852868c3..ee09c1f13c 100644
--- a/pageserver/client_grpc/src/client.rs
+++ b/pageserver/client_grpc/src/client.rs
@@ -3,6 +3,7 @@ use std::num::NonZero;
 use std::sync::Arc;
 
 use anyhow::anyhow;
+use arc_swap::ArcSwap;
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt as _, StreamExt as _};
 use tracing::instrument;
@@ -55,28 +56,74 @@ const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
 /// TODO: this client does not support base backups or LSN leases, as these are only used by
 /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
 pub struct PageserverClient {
-    // TODO: support swapping out the shard map, e.g. via an ArcSwap.
-    shards: Shards,
+    /// The tenant ID.
+    tenant_id: TenantId,
+    /// The timeline ID.
+    timeline_id: TimelineId,
+    /// The JWT auth token for this tenant, if any.
+    auth_token: Option<String>,
+    /// The shards for this tenant.
+    shards: ArcSwap<Shards>,
+    /// The retry configuration.
     retry: Retry,
 }
 
 impl PageserverClient {
     /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
-    /// in the shard map, which must be complete and must use gRPC URLs.
+    /// in the shard spec, which must be complete and must use gRPC URLs.
     pub fn new(
         tenant_id: TenantId,
         timeline_id: TimelineId,
-        shard_map: HashMap<ShardIndex, String>,
-        stripe_size: ShardStripeSize,
+        shard_spec: ShardSpec,
         auth_token: Option<String>,
     ) -> anyhow::Result<Self> {
-        let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?;
+        let shards = Shards::new(tenant_id, timeline_id, shard_spec, auth_token.clone())?;
         Ok(Self {
-            shards,
+            tenant_id,
+            timeline_id,
+            auth_token,
+            shards: ArcSwap::new(Arc::new(shards)),
             retry: Retry,
         })
     }
 
+    /// Updates the shards from the given shard spec. In-flight requests will complete using the
+    /// existing shards, but may retry with the new shards if they fail.
+    ///
+    /// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
+    /// properly spun down and dropped afterwards.
+    pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
+        // Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
+        // with concurrent updates, but that involves creating a new `Shards` on every attempt,
+        // which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere
+        // in the stack, and if they're violated then we already have problems elsewhere, so a
+        // best-effort but possibly-racy check is okay here.
+        let old = self.shards.load_full();
+        if shard_spec.count < old.count {
+            return Err(anyhow!(
+                "can't reduce shard count from {} to {}",
+                old.count,
+                shard_spec.count
+            ));
+        }
+        if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
+            return Err(anyhow!(
+                "can't change stripe size from {} to {}",
+                old.stripe_size,
+                shard_spec.stripe_size
+            ));
+        }
+
+        let shards = Shards::new(
+            self.tenant_id,
+            self.timeline_id,
+            shard_spec,
+            self.auth_token.clone(),
+        )?;
+        self.shards.store(Arc::new(shards));
+        Ok(())
+    }
+
     /// Returns whether a relation exists.
     #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
     pub async fn check_rel_exists(
@@ -86,7 +133,7 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.check_rel_exists(req).await
             })
             .await
@@ -101,7 +148,7 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_db_size(req).await
             })
             .await
@@ -129,28 +176,42 @@ impl PageserverClient {
             return Err(tonic::Status::invalid_argument("no block number"));
         }
 
+        // The shards may change while we're fetching pages. We execute the request using a stable
+        // view of the shards (especially important for requests that span shards), but retry the
+        // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
+        // retries and re-splits in some cases where requests span shards, but these are expected to
+        // be rare.
+        //
+        // TODO: the gRPC server and client don't yet properly support shard splits. Revisit this
+        // once we figure out how to handle these.
+        self.retry
+            .with(async || Self::get_page_with_shards(req.clone(), &self.shards.load_full()).await)
+            .await
+    }
+
+    /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
+    /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
+    async fn get_page_with_shards(
+        req: page_api::GetPageRequest,
+        shards: &Shards,
+    ) -> tonic::Result<page_api::GetPageResponse> {
         // Fast path: request is for a single shard.
         if let Some(shard_id) =
-            GetPageSplitter::is_single_shard(&req, self.shards.count, self.shards.stripe_size)
+            GetPageSplitter::is_single_shard(&req, shards.count, shards.stripe_size)
         {
-            return self.get_page_for_shard(shard_id, req).await;
+            return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
         }
 
         // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
         // reassemble the responses.
-        //
-        // TODO: when we support shard map updates, we need to detect when it changes and re-split
-        // the request on errors.
-        let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);
+        let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);
 
-        let mut shard_requests: FuturesUnordered<_> = splitter
-            .drain_requests()
-            .map(|(shard_id, shard_req)| {
-                // NB: each request will retry internally.
-                self.get_page_for_shard(shard_id, shard_req)
-                    .map(move |result| result.map(|resp| (shard_id, resp)))
-            })
-            .collect();
+        let mut shard_requests = FuturesUnordered::new();
+        for (shard_id, shard_req) in splitter.drain_requests() {
+            let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
+                .map(move |result| result.map(|resp| (shard_id, resp)));
+            shard_requests.push(future);
+        }
 
         while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
             splitter.add_response(shard_id, shard_response)?;
@@ -159,41 +220,28 @@ impl PageserverClient {
         splitter.assemble_response()
     }
 
-    /// Fetches pages that belong to the given shard.
-    #[instrument(skip_all, fields(shard = %shard_id))]
-    async fn get_page_for_shard(
-        &self,
-        shard_id: ShardIndex,
+    /// Fetches pages on the given shard. Does not retry internally.
+    async fn get_page_with_shard(
         req: page_api::GetPageRequest,
+        shard: &Shard,
     ) -> tonic::Result<page_api::GetPageResponse> {
-        let resp = self
-            .retry
-            .with(async || {
-                let stream = self
-                    .shards
-                    .get(shard_id)?
-                    .stream(req.request_class.is_bulk())
-                    .await;
-                let resp = stream.send(req.clone()).await?;
+        let expected = req.block_numbers.len();
+        let stream = shard.stream(req.request_class.is_bulk()).await;
+        let resp = stream.send(req).await?;
 
-                // Convert per-request errors into a tonic::Status.
-                if resp.status_code != page_api::GetPageStatusCode::Ok {
-                    return Err(tonic::Status::new(
-                        resp.status_code.into(),
-                        resp.reason.unwrap_or_else(|| String::from("unknown error")),
-                    ));
-                }
+        // Convert per-request errors into a tonic::Status.
+        if resp.status_code != page_api::GetPageStatusCode::Ok {
+            return Err(tonic::Status::new(
+                resp.status_code.into(),
+                resp.reason.unwrap_or_else(|| String::from("unknown error")),
+            ));
+        }
 
-                Ok(resp)
-            })
-            .await?;
-
-        // Make sure we got the right number of pages.
-        // NB: check outside of the retry loop, since we don't want to retry this.
-        let (expected, actual) = (req.block_numbers.len(), resp.page_images.len());
+        // Check that we received the expected number of pages.
+        let actual = resp.page_images.len();
         if expected != actual {
             return Err(tonic::Status::internal(format!(
-                "expected {expected} pages for shard {shard_id}, got {actual}",
+                "expected {expected} pages, got {actual}",
             )));
         }
 
@@ -209,7 +257,7 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // Relation metadata is only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_rel_size(req).await
             })
             .await
@@ -224,48 +272,51 @@ impl PageserverClient {
         self.retry
             .with(async || {
                 // SLRU segments are only available on shard 0.
-                let mut client = self.shards.get_zero().client().await?;
+                let mut client = self.shards.load_full().get_zero().client().await?;
                 client.get_slru_segment(req).await
             })
             .await
     }
 }
 
-/// Tracks the tenant's shards.
-struct Shards {
+/// Shard specification for a PageserverClient.
+pub struct ShardSpec {
+    /// Maps shard indices to gRPC URLs.
+    ///
+    /// INVARIANT: every shard 0..count is present, and shard 0 is always present.
+    /// INVARIANT: every URL is valid and uses grpc:// scheme.
+    urls: HashMap<ShardIndex, String>,
     /// The shard count.
     ///
     /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
     count: ShardCount,
-    /// The stripe size. Only used for sharded tenants.
+    /// The stripe size for these shards.
     stripe_size: ShardStripeSize,
-    /// Shards by shard index.
-    ///
-    /// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`.
-    ///
-    /// INVARIANT: every shard 0..count is present.
-    /// INVARIANT: shard 0 is always present.
-    map: HashMap<ShardIndex, Shard>,
 }
 
-impl Shards {
-    /// Creates a new set of shards based on a shard map.
-    fn new(
-        tenant_id: TenantId,
-        timeline_id: TimelineId,
-        shard_map: HashMap<ShardIndex, String>,
-        stripe_size: ShardStripeSize,
-        auth_token: Option<String>,
+impl ShardSpec {
+    /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
+    /// The stripe size may be omitted for unsharded tenants.
+    pub fn new(
+        urls: HashMap<ShardIndex, String>,
+        stripe_size: Option<ShardStripeSize>,
     ) -> anyhow::Result<Self> {
-        let count = match shard_map.len() {
+        // Compute the shard count.
+        let count = match urls.len() {
             0 => return Err(anyhow!("no shards provided")),
             1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
             n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
             n => ShardCount::new(n as u8),
         };
 
-        let mut map = HashMap::new();
-        for (shard_id, url) in shard_map {
+        // Determine the stripe size. It doesn't matter for unsharded tenants.
+        if stripe_size.is_none() && !count.is_unsharded() {
+            return Err(anyhow!("stripe size must be given for sharded tenants"));
+        }
+        let stripe_size = stripe_size.unwrap_or_default();
+
+        // Validate the shard spec.
+        for (shard_id, url) in &urls {
             // The shard index must match the computed shard count, even for unsharded tenants.
             if shard_id.shard_count != count {
                 return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
@@ -276,21 +327,64 @@ impl Shards {
             }
             // The above conditions guarantee that we have all shards 0..count: len() matches count,
             // shard number < count, and numbers are unique (via hashmap).
-            let shard = Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?;
-            map.insert(shard_id, shard);
+
+            // Validate the URL.
+            if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
+                return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
+            }
         }
 
         Ok(Self {
+            urls,
             count,
             stripe_size,
-            map,
+        })
+    }
+}
+
+/// Tracks the tenant's shards.
+struct Shards {
+    /// Shards by shard index.
+    ///
+    /// INVARIANT: every shard 0..count is present.
+    /// INVARIANT: shard 0 is always present.
+    by_index: HashMap<ShardIndex, Shard>,
+    /// The shard count.
+    ///
+    /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
+    count: ShardCount,
+    /// The stripe size. Only used for sharded tenants.
+    stripe_size: ShardStripeSize,
+}
+
+impl Shards {
+    /// Creates a new set of shards based on a shard spec.
+    fn new(
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        shard_spec: ShardSpec,
+        auth_token: Option<String>,
+    ) -> anyhow::Result<Self> {
+        // NB: the shard spec has already been validated when constructed.
+        let mut shards = HashMap::with_capacity(shard_spec.urls.len());
+        for (shard_id, url) in shard_spec.urls {
+            shards.insert(
+                shard_id,
+                Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?,
+            );
+        }
+
+        Ok(Self {
+            by_index: shards,
+            count: shard_spec.count,
+            stripe_size: shard_spec.stripe_size,
         })
     }
 
     /// Looks up the given shard.
     #[allow(clippy::result_large_err)] // TODO: check perf impact
     fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
-        self.map
+        self.by_index
             .get(&shard_id)
             .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
     }
@@ -329,11 +423,6 @@ impl Shard {
         shard_id: ShardIndex,
         auth_token: Option<String>,
     ) -> anyhow::Result<Self> {
-        // Sanity-check that the URL uses gRPC.
-        if PageserverProtocol::from_connstring(&url)? != PageserverProtocol::Grpc {
-            return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
-        }
-
         // Common channel pool for unary and stream requests. Bounded by client/stream pools.
         let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
 
diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs
index 3fc7178be2..14fb3fbd5a 100644
--- a/pageserver/client_grpc/src/lib.rs
+++ b/pageserver/client_grpc/src/lib.rs
@@ -3,4 +3,4 @@ mod pool;
 mod retry;
 mod split;
 
-pub use client::PageserverClient;
+pub use client::{PageserverClient, ShardSpec};
diff --git a/pageserver/client_grpc/src/retry.rs b/pageserver/client_grpc/src/retry.rs
index b0473204d7..a4d4b19870 100644
--- a/pageserver/client_grpc/src/retry.rs
+++ b/pageserver/client_grpc/src/retry.rs
@@ -131,7 +131,6 @@ impl Retry {
             tonic::Code::Aborted => true,
             tonic::Code::Cancelled => true,
             tonic::Code::DeadlineExceeded => true, // maybe transient slowness
-            tonic::Code::Internal => true, // maybe transient failure?
             tonic::Code::ResourceExhausted => true,
             tonic::Code::Unavailable => true,
 
@@ -139,6 +138,10 @@ impl Retry {
             tonic::Code::AlreadyExists => false,
             tonic::Code::DataLoss => false,
             tonic::Code::FailedPrecondition => false,
+            // NB: don't retry Internal. It is intended for serious errors such as invariant
+            // violations, and is also used for client-side invariant checks that would otherwise
+            // result in retry loops.
+            tonic::Code::Internal => false,
             tonic::Code::InvalidArgument => false,
             tonic::Code::NotFound => false,
             tonic::Code::OutOfRange => false,
diff --git a/pageserver/client_grpc/src/split.rs b/pageserver/client_grpc/src/split.rs
index 5bbcaab393..57c9299b96 100644
--- a/pageserver/client_grpc/src/split.rs
+++ b/pageserver/client_grpc/src/split.rs
@@ -97,7 +97,8 @@ impl GetPageSplitter {
         self.requests.drain()
     }
 
-    /// Adds a response from the given shard.
+    /// Adds a response from the given shard. The response must match the request ID and have an OK
+    /// status code. A response must not already exist for the given shard ID.
     #[allow(clippy::result_large_err)]
     pub fn add_response(
         &mut self,
@@ -105,24 +106,30 @@ impl GetPageSplitter {
         response: page_api::GetPageResponse,
     ) -> tonic::Result<()> {
         // The caller should already have converted status codes into tonic::Status.
-        assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);
+        if response.status_code != page_api::GetPageStatusCode::Ok {
+            return Err(tonic::Status::internal(format!(
+                "unexpected non-OK response for shard {shard_id}: {:?}",
+                response.status_code
+            )));
+        }
 
-        // Make sure the response matches the request ID.
+        // The stream pool ensures the response matches the request ID.
         if response.request_id != self.request_id {
             return Err(tonic::Status::internal(format!(
-                "response ID {} does not match request ID {}",
-                response.request_id, self.request_id
+                "response ID mismatch for shard {shard_id}: expected {}, got {}",
+                self.request_id, response.request_id
+            )));
+        }
+
+        // We only dispatch one request per shard.
+        if self.responses.contains_key(&shard_id) {
+            return Err(tonic::Status::internal(format!(
+                "duplicate response for shard {shard_id}"
             )));
         }
 
         // Add the response data to the map.
-        let old = self.responses.insert(shard_id, response.page_images);
-
-        if old.is_some() {
-            return Err(tonic::Status::internal(format!(
-                "duplicate response for shard {shard_id}",
-            )));
-        }
+        self.responses.insert(shard_id, response.page_images);
 
         Ok(())
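
Usage note (not part of the patch): the sketch below shows how a caller might drive the new ShardSpec / update_shards API introduced above. It is illustrative only; the tenant/timeline IDs, pageserver URLs, stripe size, and the exact import paths for the shard types are assumptions, not taken from this diff.

    // Hypothetical usage sketch; import paths and helper constructors are assumed.
    use std::collections::HashMap;

    use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};
    use pageserver_client_grpc::{PageserverClient, ShardSpec};
    use utils::id::{TenantId, TimelineId};

    fn example() -> anyhow::Result<()> {
        // Start out unsharded: a single shard 0 with count 0; no stripe size is required.
        let mut urls = HashMap::new();
        urls.insert(ShardIndex::unsharded(), "grpc://pageserver-0:51051".to_string());
        let client = PageserverClient::new(
            TenantId::generate(),
            TimelineId::generate(),
            ShardSpec::new(urls, None)?,
            None, // no JWT auth token
        )?;

        // Later, after a split to two shards, swap in a new shard spec. The shard count can't
        // shrink and the stripe size can't change once sharded; in-flight requests finish on
        // the old shards and only retry against the new ones if they fail.
        let mut urls = HashMap::new();
        for number in 0..2u8 {
            urls.insert(
                ShardIndex::new(ShardNumber(number), ShardCount::new(2)),
                format!("grpc://pageserver-{number}:51051"),
            );
        }
        client.update_shards(ShardSpec::new(urls, Some(ShardStripeSize::default()))?)?;

        Ok(())
    }

Because the shard set sits behind an ArcSwap, update_shards swaps the whole Shards value atomically: requests that already called load_full() keep the old set (and its pools) alive until they complete, while new requests pick up the new spec.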