Restructure shard management

This commit is contained in:
Erik Grinaker
2025-07-03 11:44:27 +02:00
parent de97b73d6e
commit 52c586f678
2 changed files with 146 additions and 95 deletions

View File

@@ -49,6 +49,12 @@ pub struct TenantShardId {
pub shard_count: ShardCount,
}
impl std::fmt::Display for ShardCount {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
pub const MIN: Self = Self(0);

View File

@@ -1,12 +1,12 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::ensure;
use anyhow::{anyhow, ensure};
use pageserver_page_api as page_api;
use tokio_util::sync::CancellationToken;
use utils::backoff;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
@@ -18,14 +18,12 @@ use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}
/// * Concurrent use by many callers.
/// * Internal handling of GetPage bidirectional streams.
/// * Automatic retries.
/// * Observability.
///
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding this.
///
/// TODO: use a proper error type.
/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
pub struct PageserverClient {
/// Resource pools per shard.
pools: HashMap<ShardIndex, ShardPools>,
shards: Shards,
}
impl PageserverClient {
@@ -36,21 +34,8 @@ impl PageserverClient {
shard_map: HashMap<ShardIndex, String>,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
// TODO: support multiple shards.
ensure!(shard_map.len() == 1, "multiple shard not supported");
ensure!(
shard_map.keys().next() == Some(&ShardIndex::unsharded()),
"only unsharded tenant supported"
);
let mut pools = HashMap::new();
for (shard_id, url) in shard_map {
let shard_pools =
ShardPools::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?;
pools.insert(shard_id, shard_pools);
}
Ok(Self { pools })
let shards = Shards::new(tenant_id, timeline_id, shard_map, auth_token)?;
Ok(Self { shards })
}
/// Returns whether a relation exists.
@@ -58,11 +43,9 @@ impl PageserverClient {
&self,
req: page_api::CheckRelExistsRequest,
) -> tonic::Result<page_api::CheckRelExistsResponse> {
// Relation metadata is only available on shard 0.
let shard_id = self.shard_zero();
self.with_retries("check_rel_exists", async || {
let mut client = self.get_shard_client(shard_id).await?;
// Relation metadata is only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.check_rel_exists(req).await
})
.await
@@ -73,11 +56,9 @@ impl PageserverClient {
&self,
req: page_api::GetDbSizeRequest,
) -> tonic::Result<page_api::GetDbSizeResponse> {
// Relation metadata is only available on shard 0.
let shard_id = self.shard_zero();
self.with_retries("get_db_size", async || {
let mut client = self.get_shard_client(shard_id).await?;
// Relation metadata is only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.get_db_size(req).await
})
.await
@@ -95,7 +76,7 @@ impl PageserverClient {
let shard_id = ShardIndex::unsharded();
self.with_retries("get_page", async || {
let stream = self.get_shard_stream(shard_id).await?;
let stream = self.shards.get(shard_id)?.stream().await;
let resp = stream.send(req.clone()).await?;
if resp.status_code != page_api::GetPageStatusCode::Ok {
@@ -115,11 +96,9 @@ impl PageserverClient {
&self,
req: page_api::GetRelSizeRequest,
) -> tonic::Result<page_api::GetRelSizeResponse> {
// Relation metadata is only available on shard 0.
let shard_id = self.shard_zero();
self.with_retries("get_rel_size", async || {
let mut client = self.get_shard_client(shard_id).await?;
// Relation metadata is only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.get_rel_size(req).await
})
.await
@@ -130,45 +109,14 @@ impl PageserverClient {
&self,
req: page_api::GetSlruSegmentRequest,
) -> tonic::Result<page_api::GetSlruSegmentResponse> {
// SLRU segments are only available on shard 0.
let shard_id = self.shard_zero();
self.with_retries("get_slru_segment", async || {
let mut client = self.get_shard_client(shard_id).await?;
// SLRU segments are only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.get_slru_segment(req).await
})
.await
}
/// Returns a pooled `page_api::Client` for the given shard.
async fn get_shard_client(&self, shard_id: ShardIndex) -> tonic::Result<ClientGuard> {
self.pools
.get(&shard_id)
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))?
.clients
.get()
.await
.map_err(|err| tonic::Status::internal(format!("failed to acquire client: {err}")))
}
/// Returns a pooled stream for the given shard.
#[allow(clippy::result_large_err)] // TODO: revisit
async fn get_shard_stream(&self, shard_id: ShardIndex) -> tonic::Result<StreamGuard> {
Ok(self
.pools
.get(&shard_id)
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))?
.streams
.get()
.await)
}
/// Returns the shard index for shard 0.
fn shard_zero(&self) -> ShardIndex {
// TODO: support multiple shards.
ShardIndex::unsharded()
}
/// Runs the given closure with exponential backoff retries.
async fn with_retries<T, F, O>(&self, name: &str, f: F) -> tonic::Result<T>
where
@@ -176,16 +124,15 @@ impl PageserverClient {
O: Future<Output = tonic::Result<T>>,
{
/// TODO: tune retry parameters (retry forever?).
/// TODO: add timeouts.
/// TODO: add timeouts?
const WARN_THRESHOLD: u32 = 1;
const MAX_RETRIES: u32 = 10;
// TODO: cancellation.
let cancel = CancellationToken::new();
fn is_permanent(err: &tonic::Status) -> bool {
match err.code() {
// Not really an error, but whatever. Don't retry.
tonic::Code::Ok => true,
// These codes are transient, so retry them.
tonic::Code::Aborted => false,
tonic::Code::Cancelled => false,
@@ -193,7 +140,8 @@ impl PageserverClient {
tonic::Code::Internal => false, // maybe transient failure
tonic::Code::ResourceExhausted => false,
tonic::Code::Unavailable => false,
tonic::Code::Unknown => false, // may as well retry
tonic::Code::Unknown => false, // may as well retry?
// The following codes will like continue to fail, so don't retry.
tonic::Code::AlreadyExists => true,
tonic::Code::DataLoss => true,
@@ -207,26 +155,102 @@ impl PageserverClient {
}
}
// TODO: consider custom logic and logging here, using the caller's span for name.
// TODO: cancellation? Could just drop the future.
let cancel = CancellationToken::new();
backoff::retry(f, is_permanent, WARN_THRESHOLD, MAX_RETRIES, name, &cancel)
.await
.expect("never cancelled (for now)")
}
}
/// Resource pools for a single shard.
///
/// TODO: consider separate pools for normal and bulk traffic, with different settings.
struct ShardPools {
/// Manages unary gRPC clients for this shard.
clients: Arc<ClientPool>,
/// Manages gRPC GetPage streams for this shard. Uses a dedicated client pool, but shares the
/// channel pool with unary clients.
streams: Arc<StreamPool>,
/// Tracks the tenant's shards.
struct Shards {
/// The shard count.
///
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount,
/// Shards by shard index.
///
/// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`.
///
/// INVARIANT: every shard 0..count is present.
/// INVARIANT: shard 0 is always present.
map: HashMap<ShardIndex, Shard>,
}
impl ShardPools {
/// Creates a new set of resource pools for the given shard.
pub fn new(
impl Shards {
/// Creates a new set of shards based on a shard map.
fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_map: HashMap<ShardIndex, String>,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
// TODO: support multiple shards.
ensure!(shard_map.len() == 1, "multiple shards not supported");
ensure!(
shard_map.keys().next() == Some(&ShardIndex::unsharded()),
"only unsharded tenant supported"
);
let count = match shard_map.len() {
0 => return Err(anyhow!("no shards provided")),
1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
n => ShardCount::new(n as u8),
};
let mut map = HashMap::new();
for (shard_id, url) in shard_map {
// The shard index must match the computed shard count, even for unsharded tenants.
if shard_id.shard_count != count {
return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
}
// The shard index' number and count must be consistent.
if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
return Err(anyhow!("invalid shard index {shard_id}"));
}
// The above conditions guarantee that we have all shards 0..count: len() matches count,
// shard number < count, and numbers are unique (via hashmap).
let shard = Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?;
map.insert(shard_id, shard);
}
Ok(Self { count, map })
}
/// Looks up the given shard.
#[allow(clippy::result_large_err)] // TODO: check perf impact
fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
self.map
.get(&shard_id)
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
}
/// Returns shard 0.
fn get_zero(&self) -> &Shard {
self.get(ShardIndex::new(ShardNumber(0), self.count))
.expect("always present")
}
}
/// A single shard.
///
/// TODO: consider separate pools for normal and bulk traffic, with different settings.
struct Shard {
/// Dedicated channel pool for this shard. Used by all clients/streams in this shard.
_channel_pool: Arc<ChannelPool>,
/// Unary gRPC client pool for this shard. Uses the shared channel pool.
client_pool: Arc<ClientPool>,
/// GetPage stream pool for this shard. Uses a dedicated client pool, but shares the channel
/// pool with unary clients.
stream_pool: Arc<StreamPool>,
}
impl Shard {
/// Creates a new shard. It has its own dedicated resource pools.
fn new(
url: String,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -234,25 +258,46 @@ impl ShardPools {
auth_token: Option<String>,
) -> anyhow::Result<Self> {
// Use a common channel pool for all clients, to multiplex unary and stream requests across
// the same TCP connections. The channel pool is unbounded (client pools are bounded).
let channels = ChannelPool::new(url)?;
// the same TCP connections. The channel pool is unbounded (but client pools are bounded).
let channel_pool = ChannelPool::new(url)?;
// Dedicated client pool for unary requests.
let clients = ClientPool::new(
channels.clone(),
let client_pool = ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
);
// Dedicated client pool for streams. If this shared a client pool with unary requests,
// long-lived streams could fill up the client pool and starve out unary requests. It
// shares the same underlying channel pool with unary clients though.
let stream_clients =
ClientPool::new(channels, tenant_id, timeline_id, shard_id, auth_token);
let streams = StreamPool::new(stream_clients);
// Stream pool with dedicated client pool. If this shared a client pool with unary requests,
// long-lived streams could fill up the client pool and starve out unary requests. It shares
// the same underlying channel pool with unary clients though, which is unbounded.
let stream_pool = StreamPool::new(ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token,
));
Ok(Self { clients, streams })
Ok(Self {
_channel_pool: channel_pool,
client_pool,
stream_pool,
})
}
/// Returns a pooled client for this shard.
async fn client(&self) -> tonic::Result<ClientGuard> {
self.client_pool
.get()
.await
.map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
}
/// Returns a pooled stream for this shard.
async fn stream(&self) -> StreamGuard {
self.stream_pool.get().await
}
}