Files
neon/pageserver/client_grpc/src/client.rs
Erik Grinaker 3915995530 pageserver/client_grpc: add rich Pageserver gRPC client (#12462)
## Problem

For the communicator, we need a rich Pageserver gRPC client.

Touches #11735.
Requires #12434.

## Summary of changes

This patch adds an initial rich Pageserver gRPC client. It supports:

* Sharded tenants across multiple Pageservers.
* Pooling of connections, clients, and streams for efficient resource
use.
* Concurrent use by many callers.
* Internal handling of GetPage bidirectional streams, with pipelining
and error handling.
* Automatic retries.
* Observability.

The client is still under development. In particular, it needs GetPage
batch splitting, shard map updates, and performance optimization. This
will be addressed in follow-up PRs.
2025-07-09 11:42:46 +00:00

304 lines
11 KiB
Rust

use std::collections::HashMap;
use std::sync::Arc;
use anyhow::anyhow;
use tracing::instrument;
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
use compute_api::spec::PageserverProtocol;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
///
/// * Sharded tenants across multiple Pageservers.
/// * Pooling of connections, clients, and streams for efficient resource use.
/// * Concurrent use by many callers.
/// * Internal handling of GetPage bidirectional streams, with pipelining and error handling.
/// * Automatic retries.
/// * Observability.
///
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
pub struct PageserverClient {
// TODO: support swapping out the shard map, e.g. via an ArcSwap.
shards: Shards,
retry: Retry,
}
impl PageserverClient {
/// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
/// in the shard map, which must be complete and must use gRPC URLs.
pub fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_map: HashMap<ShardIndex, String>,
stripe_size: ShardStripeSize,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?;
Ok(Self {
shards,
retry: Retry,
})
}
/// Returns whether a relation exists.
#[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
pub async fn check_rel_exists(
&self,
req: page_api::CheckRelExistsRequest,
) -> tonic::Result<page_api::CheckRelExistsResponse> {
self.retry
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.check_rel_exists(req).await
})
.await
}
/// Returns the total size of a database, as # of bytes.
#[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
pub async fn get_db_size(
&self,
req: page_api::GetDbSizeRequest,
) -> tonic::Result<page_api::GetDbSizeResponse> {
self.retry
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.get_db_size(req).await
})
.await
}
/// Fetches a page. The `request_id` must be unique across all in-flight requests.
///
/// Unlike the `page_api::Client`, this client automatically converts `status_code` into
/// `tonic::Status` errors. All responses will have `GetPageStatusCode::Ok`.
#[instrument(skip_all, fields(
req_id = %req.request_id,
rel = %req.rel,
blkno = %req.block_numbers[0],
blks = %req.block_numbers.len(),
lsn = %req.read_lsn,
))]
pub async fn get_page(
&self,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
// TODO: this needs to split batch requests across shards and reassemble responses into a
// single response. It must also re-split the batch in case the shard map changes. For now,
// just use the first page.
let key = rel_block_to_key(
req.rel,
req.block_numbers
.first()
.copied()
.ok_or_else(|| tonic::Status::invalid_argument("no block numbers provided"))?,
);
self.retry
.with(async || {
let stream = self.shards.get_for_key(key).stream().await;
let resp = stream.send(req.clone()).await?;
if resp.status_code != page_api::GetPageStatusCode::Ok {
return Err(tonic::Status::new(
resp.status_code.into(),
resp.reason.unwrap_or_else(|| String::from("unknown error")),
));
}
Ok(resp)
})
.await
}
/// Returns the size of a relation, as # of blocks.
#[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
pub async fn get_rel_size(
&self,
req: page_api::GetRelSizeRequest,
) -> tonic::Result<page_api::GetRelSizeResponse> {
self.retry
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.get_rel_size(req).await
})
.await
}
/// Fetches an SLRU segment.
#[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
pub async fn get_slru_segment(
&self,
req: page_api::GetSlruSegmentRequest,
) -> tonic::Result<page_api::GetSlruSegmentResponse> {
self.retry
.with(async || {
// SLRU segments are only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.get_slru_segment(req).await
})
.await
}
}
/// Tracks the tenant's shards.
struct Shards {
/// The shard count.
///
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount,
/// The stripe size. Only used for sharded tenants.
stripe_size: ShardStripeSize,
/// Shards by shard index.
///
/// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`.
///
/// INVARIANT: every shard 0..count is present.
/// INVARIANT: shard 0 is always present.
map: HashMap<ShardIndex, Shard>,
}
impl Shards {
/// Creates a new set of shards based on a shard map.
fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_map: HashMap<ShardIndex, String>,
stripe_size: ShardStripeSize,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
let count = match shard_map.len() {
0 => return Err(anyhow!("no shards provided")),
1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
n => ShardCount::new(n as u8),
};
let mut map = HashMap::new();
for (shard_id, url) in shard_map {
// The shard index must match the computed shard count, even for unsharded tenants.
if shard_id.shard_count != count {
return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
}
// The shard index' number and count must be consistent.
if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
return Err(anyhow!("invalid shard index {shard_id}"));
}
// The above conditions guarantee that we have all shards 0..count: len() matches count,
// shard number < count, and numbers are unique (via hashmap).
let shard = Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?;
map.insert(shard_id, shard);
}
Ok(Self {
count,
stripe_size,
map,
})
}
/// Looks up the given shard.
#[allow(clippy::result_large_err)] // TODO: check perf impact
fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
self.map
.get(&shard_id)
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
}
/// Looks up the shard that owns the given key.
fn get_for_key(&self, key: Key) -> &Shard {
let shard_number = key_to_shard_number(self.count, self.stripe_size, &key);
self.get(ShardIndex::new(shard_number, self.count))
.expect("must exist")
}
/// Returns shard 0.
fn get_zero(&self) -> &Shard {
self.get(ShardIndex::new(ShardNumber(0), self.count))
.expect("always present")
}
}
/// A single shard.
///
/// TODO: consider separate pools for normal and bulk traffic, with different settings.
struct Shard {
/// Dedicated channel pool for this shard. Shared by all clients/streams in this shard.
_channel_pool: Arc<ChannelPool>,
/// Unary gRPC client pool for this shard. Uses the shared channel pool.
client_pool: Arc<ClientPool>,
/// GetPage stream pool for this shard. Uses a dedicated client pool, but shares the channel
/// pool with unary clients.
stream_pool: Arc<StreamPool>,
}
impl Shard {
/// Creates a new shard. It has its own dedicated resource pools.
fn new(
url: String,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
// Sanity-check that the URL uses gRPC.
if PageserverProtocol::from_connstring(&url)? != PageserverProtocol::Grpc {
return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
}
// Use a common channel pool for all clients, to multiplex unary and stream requests across
// the same TCP connections. The channel pool is unbounded (but client pools are bounded).
let channel_pool = ChannelPool::new(url)?;
// Dedicated client pool for unary requests.
let client_pool = ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
);
// Stream pool with dedicated client pool. If this shared a client pool with unary requests,
// long-lived streams could fill up the client pool and starve out unary requests. It shares
// the same underlying channel pool with unary clients though, which is unbounded.
let stream_pool = StreamPool::new(ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token,
));
Ok(Self {
_channel_pool: channel_pool,
client_pool,
stream_pool,
})
}
/// Returns a pooled client for this shard.
async fn client(&self) -> tonic::Result<ClientGuard> {
self.client_pool
.get()
.await
.map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
}
/// Returns a pooled stream for this shard.
async fn stream(&self) -> StreamGuard {
self.stream_pool.get().await
}
}