Add client shard routing

This commit is contained in:
Erik Grinaker
2025-07-03 14:42:35 +02:00
parent d4b4724921
commit 14214eb853
5 changed files with 44 additions and 15 deletions

View File

@@ -332,7 +332,11 @@ fn hash_combine(mut a: u32, mut b: u32) -> u32 {
///
/// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
/// and will be handled at higher levels when shards are split.
fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
pub fn key_to_shard_number(
count: ShardCount,
stripe_size: ShardStripeSize,
key: &Key,
) -> ShardNumber {
// Fast path for un-sharded tenants or broadcast keys
if count < ShardCount(2) || key_is_shard0(key) {
return ShardNumber(0);

View File

@@ -1,8 +1,10 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{anyhow, ensure};
use anyhow::anyhow;
use compute_api::spec::PageserverProtocol;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
use pageserver_page_api as page_api;
use tokio::time::Instant;
use tracing::{error, info, instrument, warn};
@@ -25,6 +27,7 @@ use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
pub struct PageserverClient {
/// The set of shards that requests are routed to. `get_page` selects the target shard for a
/// request via `Shards::get_for_key`.
// TODO: support swapping out the shard map, e.g. via an ArcSwap.
shards: Shards,
}
@@ -35,9 +38,10 @@ impl PageserverClient {
tenant_id: TenantId,
timeline_id: TimelineId,
shard_map: HashMap<ShardIndex, String>,
stripe_size: ShardStripeSize,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
let shards = Shards::new(tenant_id, timeline_id, shard_map, auth_token)?;
let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?;
Ok(Self { shards })
}
@@ -84,11 +88,19 @@ impl PageserverClient {
&self,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
// TODO: support multiple shards.
let shard_id = ShardIndex::unsharded();
// TODO: this needs to split batch requests across shards and reassemble responses into a
// single response. It must also re-split the batch in case the shard map changes. For now,
// the entire request is routed to the shard that owns the first requested block.
let key = rel_block_to_key(
req.rel,
req.block_numbers
.first()
.copied()
.ok_or_else(|| tonic::Status::invalid_argument("no block numbers provided"))?,
);
self.with_retries(async || {
let stream = self.shards.get(shard_id)?.stream().await;
let stream = self.shards.get_for_key(key).stream().await;
let resp = stream.send(req.clone()).await?;
if resp.status_code != page_api::GetPageStatusCode::Ok {
@@ -249,6 +261,8 @@ struct Shards {
///
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount,
/// The stripe size. Only used for sharded tenants.
stripe_size: ShardStripeSize,
/// Shards by shard index.
///
/// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`.
@@ -264,15 +278,9 @@ impl Shards {
tenant_id: TenantId,
timeline_id: TimelineId,
shard_map: HashMap<ShardIndex, String>,
stripe_size: ShardStripeSize,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
// TODO: support multiple shards.
ensure!(shard_map.len() == 1, "multiple shards not supported");
ensure!(
shard_map.keys().next() == Some(&ShardIndex::unsharded()),
"only unsharded tenant supported"
);
let count = match shard_map.len() {
0 => return Err(anyhow!("no shards provided")),
1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
@@ -296,7 +304,11 @@ impl Shards {
map.insert(shard_id, shard);
}
Ok(Self { count, map })
Ok(Self {
count,
stripe_size,
map,
})
}
/// Looks up the given shard.
@@ -307,6 +319,13 @@ impl Shards {
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
}
/// Returns the shard responsible for the given key.
///
/// The key is first mapped to a shard number using this shard set's count and stripe size; the
/// resulting shard index is then looked up in the shard map.
///
/// # Panics
///
/// Panics if the computed shard index is not present in the shard map.
fn get_for_key(&self, key: Key) -> &Shard {
    let owner = ShardIndex::new(
        key_to_shard_number(self.count, self.stripe_size, &key),
        self.count,
    );
    let lookup = self.get(owner);
    lookup.expect("must exist")
}
/// Returns shard 0.
fn get_zero(&self) -> &Shard {
self.get(ShardIndex::new(ShardNumber(0), self.count))

View File

@@ -3353,6 +3353,8 @@ impl GrpcPageServiceHandler {
/// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
/// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
/// split them up in the client or server.
///
/// TODO: verify that the given keys belong to this shard.
#[instrument(skip_all, fields(req_id, rel, blkno, blks, req_lsn, mod_lsn))]
async fn get_page(
ctx: &RequestContext,

View File

@@ -30,6 +30,7 @@ metrics.workspace = true
uring-common = { workspace = true, features = ["bytes"] }
pageserver_client_grpc.workspace = true
pageserver_api.workspace = true
pageserver_page_api.workspace = true
neon-shmem.workspace = true

View File

@@ -13,6 +13,7 @@ use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess};
use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest};
use crate::neon_request::{NeonIORequest, NeonIOResult};
use crate::worker_process::in_progress_ios::{RequestInProgressKey, RequestInProgressTable};
use pageserver_api::shard::ShardStripeSize;
use pageserver_client_grpc::PageserverClient;
use pageserver_page_api as page_api;
@@ -94,9 +95,11 @@ pub(super) async fn init(
.integrated_cache_init_struct
.worker_process_init(last_lsn, file_cache);
// TODO: plumb through the stripe size.
let stripe_size = ShardStripeSize::default();
let tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID");
let timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID");
let client = PageserverClient::new(tenant_id, timeline_id, shard_map, auth_token)
let client = PageserverClient::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)
.expect("count not create client");
let request_counters = IntCounterVec::new(