From 52c586f6782062d55bb4bf36ed8b3532d40dc812 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 3 Jul 2025 11:44:27 +0200 Subject: [PATCH] Restructure shard management --- libs/utils/src/shard.rs | 6 + pageserver/client_grpc/src/client.rs | 235 ++++++++++++++++----------- 2 files changed, 146 insertions(+), 95 deletions(-) diff --git a/libs/utils/src/shard.rs b/libs/utils/src/shard.rs index f2b81373e2..633a57c97f 100644 --- a/libs/utils/src/shard.rs +++ b/libs/utils/src/shard.rs @@ -49,6 +49,12 @@ pub struct TenantShardId { pub shard_count: ShardCount, } +impl std::fmt::Display for ShardCount { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + impl ShardCount { pub const MAX: Self = Self(u8::MAX); pub const MIN: Self = Self(0); diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index abf3fe6b13..3f7084fe43 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -1,12 +1,12 @@ use std::collections::HashMap; use std::sync::Arc; -use anyhow::ensure; +use anyhow::{anyhow, ensure}; use pageserver_page_api as page_api; use tokio_util::sync::CancellationToken; use utils::backoff; use utils::id::{TenantId, TimelineId}; -use utils::shard::ShardIndex; +use utils::shard::{ShardCount, ShardIndex, ShardNumber}; use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}; @@ -18,14 +18,12 @@ use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool} /// * Concurrent use by many callers. /// * Internal handling of GetPage bidirectional streams. /// * Automatic retries. +/// * Observability. /// /// TODO: this client does not support base backups or LSN leases, as these are only used by -/// compute_ctl. Consider adding this. -/// -/// TODO: use a proper error type. +/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards. pub struct PageserverClient { - /// Resource pools per shard. - pools: HashMap, + shards: Shards, } impl PageserverClient { @@ -36,21 +34,8 @@ impl PageserverClient { shard_map: HashMap, auth_token: Option, ) -> anyhow::Result { - // TODO: support multiple shards. - ensure!(shard_map.len() == 1, "multiple shard not supported"); - ensure!( - shard_map.keys().next() == Some(&ShardIndex::unsharded()), - "only unsharded tenant supported" - ); - - let mut pools = HashMap::new(); - for (shard_id, url) in shard_map { - let shard_pools = - ShardPools::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?; - pools.insert(shard_id, shard_pools); - } - - Ok(Self { pools }) + let shards = Shards::new(tenant_id, timeline_id, shard_map, auth_token)?; + Ok(Self { shards }) } /// Returns whether a relation exists. @@ -58,11 +43,9 @@ impl PageserverClient { &self, req: page_api::CheckRelExistsRequest, ) -> tonic::Result { - // Relation metadata is only available on shard 0. - let shard_id = self.shard_zero(); - self.with_retries("check_rel_exists", async || { - let mut client = self.get_shard_client(shard_id).await?; + // Relation metadata is only available on shard 0. + let mut client = self.shards.get_zero().client().await?; client.check_rel_exists(req).await }) .await @@ -73,11 +56,9 @@ impl PageserverClient { &self, req: page_api::GetDbSizeRequest, ) -> tonic::Result { - // Relation metadata is only available on shard 0. - let shard_id = self.shard_zero(); - self.with_retries("get_db_size", async || { - let mut client = self.get_shard_client(shard_id).await?; + // Relation metadata is only available on shard 0. + let mut client = self.shards.get_zero().client().await?; client.get_db_size(req).await }) .await @@ -95,7 +76,7 @@ impl PageserverClient { let shard_id = ShardIndex::unsharded(); self.with_retries("get_page", async || { - let stream = self.get_shard_stream(shard_id).await?; + let stream = self.shards.get(shard_id)?.stream().await; let resp = stream.send(req.clone()).await?; if resp.status_code != page_api::GetPageStatusCode::Ok { @@ -115,11 +96,9 @@ impl PageserverClient { &self, req: page_api::GetRelSizeRequest, ) -> tonic::Result { - // Relation metadata is only available on shard 0. - let shard_id = self.shard_zero(); - self.with_retries("get_rel_size", async || { - let mut client = self.get_shard_client(shard_id).await?; + // Relation metadata is only available on shard 0. + let mut client = self.shards.get_zero().client().await?; client.get_rel_size(req).await }) .await @@ -130,45 +109,14 @@ impl PageserverClient { &self, req: page_api::GetSlruSegmentRequest, ) -> tonic::Result { - // SLRU segments are only available on shard 0. - let shard_id = self.shard_zero(); - self.with_retries("get_slru_segment", async || { - let mut client = self.get_shard_client(shard_id).await?; + // SLRU segments are only available on shard 0. + let mut client = self.shards.get_zero().client().await?; client.get_slru_segment(req).await }) .await } - /// Returns a pooled `page_api::Client` for the given shard. - async fn get_shard_client(&self, shard_id: ShardIndex) -> tonic::Result { - self.pools - .get(&shard_id) - .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))? - .clients - .get() - .await - .map_err(|err| tonic::Status::internal(format!("failed to acquire client: {err}"))) - } - - /// Returns a pooled stream for the given shard. - #[allow(clippy::result_large_err)] // TODO: revisit - async fn get_shard_stream(&self, shard_id: ShardIndex) -> tonic::Result { - Ok(self - .pools - .get(&shard_id) - .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))? - .streams - .get() - .await) - } - - /// Returns the shard index for shard 0. - fn shard_zero(&self) -> ShardIndex { - // TODO: support multiple shards. - ShardIndex::unsharded() - } - /// Runs the given closure with exponential backoff retries. async fn with_retries(&self, name: &str, f: F) -> tonic::Result where @@ -176,16 +124,15 @@ impl PageserverClient { O: Future>, { /// TODO: tune retry parameters (retry forever?). - /// TODO: add timeouts. + /// TODO: add timeouts? const WARN_THRESHOLD: u32 = 1; const MAX_RETRIES: u32 = 10; - // TODO: cancellation. - let cancel = CancellationToken::new(); fn is_permanent(err: &tonic::Status) -> bool { match err.code() { // Not really an error, but whatever. Don't retry. tonic::Code::Ok => true, + // These codes are transient, so retry them. tonic::Code::Aborted => false, tonic::Code::Cancelled => false, @@ -193,7 +140,8 @@ impl PageserverClient { tonic::Code::Internal => false, // maybe transient failure tonic::Code::ResourceExhausted => false, tonic::Code::Unavailable => false, - tonic::Code::Unknown => false, // may as well retry + tonic::Code::Unknown => false, // may as well retry? + // The following codes will like continue to fail, so don't retry. tonic::Code::AlreadyExists => true, tonic::Code::DataLoss => true, @@ -207,26 +155,102 @@ impl PageserverClient { } } + // TODO: consider custom logic and logging here, using the caller's span for name. + // TODO: cancellation? Could just drop the future. + let cancel = CancellationToken::new(); backoff::retry(f, is_permanent, WARN_THRESHOLD, MAX_RETRIES, name, &cancel) .await .expect("never cancelled (for now)") } } -/// Resource pools for a single shard. -/// -/// TODO: consider separate pools for normal and bulk traffic, with different settings. -struct ShardPools { - /// Manages unary gRPC clients for this shard. - clients: Arc, - /// Manages gRPC GetPage streams for this shard. Uses a dedicated client pool, but shares the - /// channel pool with unary clients. - streams: Arc, +/// Tracks the tenant's shards. +struct Shards { + /// The shard count. + /// + /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention. + count: ShardCount, + /// Shards by shard index. + /// + /// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`. + /// + /// INVARIANT: every shard 0..count is present. + /// INVARIANT: shard 0 is always present. + map: HashMap, } -impl ShardPools { - /// Creates a new set of resource pools for the given shard. - pub fn new( +impl Shards { + /// Creates a new set of shards based on a shard map. + fn new( + tenant_id: TenantId, + timeline_id: TimelineId, + shard_map: HashMap, + auth_token: Option, + ) -> anyhow::Result { + // TODO: support multiple shards. + ensure!(shard_map.len() == 1, "multiple shards not supported"); + ensure!( + shard_map.keys().next() == Some(&ShardIndex::unsharded()), + "only unsharded tenant supported" + ); + + let count = match shard_map.len() { + 0 => return Err(anyhow!("no shards provided")), + 1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()` + n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")), + n => ShardCount::new(n as u8), + }; + + let mut map = HashMap::new(); + for (shard_id, url) in shard_map { + // The shard index must match the computed shard count, even for unsharded tenants. + if shard_id.shard_count != count { + return Err(anyhow!("invalid shard index {shard_id}, expected {count}")); + } + // The shard index' number and count must be consistent. + if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 { + return Err(anyhow!("invalid shard index {shard_id}")); + } + // The above conditions guarantee that we have all shards 0..count: len() matches count, + // shard number < count, and numbers are unique (via hashmap). + let shard = Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?; + map.insert(shard_id, shard); + } + + Ok(Self { count, map }) + } + + /// Looks up the given shard. + #[allow(clippy::result_large_err)] // TODO: check perf impact + fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> { + self.map + .get(&shard_id) + .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}"))) + } + + /// Returns shard 0. + fn get_zero(&self) -> &Shard { + self.get(ShardIndex::new(ShardNumber(0), self.count)) + .expect("always present") + } +} + +/// A single shard. +/// +/// TODO: consider separate pools for normal and bulk traffic, with different settings. +struct Shard { + /// Dedicated channel pool for this shard. Used by all clients/streams in this shard. + _channel_pool: Arc, + /// Unary gRPC client pool for this shard. Uses the shared channel pool. + client_pool: Arc, + /// GetPage stream pool for this shard. Uses a dedicated client pool, but shares the channel + /// pool with unary clients. + stream_pool: Arc, +} + +impl Shard { + /// Creates a new shard. It has its own dedicated resource pools. + fn new( url: String, tenant_id: TenantId, timeline_id: TimelineId, @@ -234,25 +258,46 @@ impl ShardPools { auth_token: Option, ) -> anyhow::Result { // Use a common channel pool for all clients, to multiplex unary and stream requests across - // the same TCP connections. The channel pool is unbounded (client pools are bounded). - let channels = ChannelPool::new(url)?; + // the same TCP connections. The channel pool is unbounded (but client pools are bounded). + let channel_pool = ChannelPool::new(url)?; // Dedicated client pool for unary requests. - let clients = ClientPool::new( - channels.clone(), + let client_pool = ClientPool::new( + channel_pool.clone(), tenant_id, timeline_id, shard_id, auth_token.clone(), ); - // Dedicated client pool for streams. If this shared a client pool with unary requests, - // long-lived streams could fill up the client pool and starve out unary requests. It - // shares the same underlying channel pool with unary clients though. - let stream_clients = - ClientPool::new(channels, tenant_id, timeline_id, shard_id, auth_token); - let streams = StreamPool::new(stream_clients); + // Stream pool with dedicated client pool. If this shared a client pool with unary requests, + // long-lived streams could fill up the client pool and starve out unary requests. It shares + // the same underlying channel pool with unary clients though, which is unbounded. + let stream_pool = StreamPool::new(ClientPool::new( + channel_pool.clone(), + tenant_id, + timeline_id, + shard_id, + auth_token, + )); - Ok(Self { clients, streams }) + Ok(Self { + _channel_pool: channel_pool, + client_pool, + stream_pool, + }) + } + + /// Returns a pooled client for this shard. + async fn client(&self) -> tonic::Result { + self.client_pool + .get() + .await + .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}"))) + } + + /// Returns a pooled stream for this shard. + async fn stream(&self) -> StreamGuard { + self.stream_pool.get().await } }