diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index 5a13aace64..d6f4cd5e66 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -332,7 +332,11 @@ fn hash_combine(mut a: u32, mut b: u32) -> u32 {
 ///
 /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
 /// and will be handled at higher levels when shards are split.
-fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
+pub fn key_to_shard_number(
+    count: ShardCount,
+    stripe_size: ShardStripeSize,
+    key: &Key,
+) -> ShardNumber {
     // Fast path for un-sharded tenants or broadcast keys
     if count < ShardCount(2) || key_is_shard0(key) {
         return ShardNumber(0);
diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs
index ad2ee18761..546c9d41bd 100644
--- a/pageserver/client_grpc/src/client.rs
+++ b/pageserver/client_grpc/src/client.rs
@@ -1,8 +1,10 @@
 use std::collections::HashMap;
 use std::sync::Arc;

-use anyhow::{anyhow, ensure};
+use anyhow::anyhow;
 use compute_api::spec::PageserverProtocol;
+use pageserver_api::key::{Key, rel_block_to_key};
+use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
 use pageserver_page_api as page_api;
 use tokio::time::Instant;
 use tracing::{error, info, instrument, warn};
@@ -25,6 +27,7 @@ use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}
 /// TODO: this client does not support base backups or LSN leases, as these are only used by
 /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
 pub struct PageserverClient {
+    // TODO: support swapping out the shard map, e.g. via an ArcSwap.
     shards: Shards,
 }

@@ -35,9 +38,10 @@ impl PageserverClient {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         shard_map: HashMap<ShardIndex, String>,
+        stripe_size: ShardStripeSize,
         auth_token: Option<String>,
     ) -> anyhow::Result<Self> {
-        let shards = Shards::new(tenant_id, timeline_id, shard_map, auth_token)?;
+        let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?;
         Ok(Self { shards })
     }

@@ -84,11 +88,19 @@ impl PageserverClient {
         &self,
         req: page_api::GetPageRequest,
     ) -> tonic::Result<page_api::GetPageResponse> {
-        // TODO: support multiple shards.
-        let shard_id = ShardIndex::unsharded();
+        // TODO: this needs to split batch requests across shards and reassemble responses into a
+        // single response. It must also re-split the batch in case the shard map changes. For now,
+        // just use the first page.
+        let key = rel_block_to_key(
+            req.rel,
+            req.block_numbers
+                .first()
+                .copied()
+                .ok_or_else(|| tonic::Status::invalid_argument("no block numbers provided"))?,
+        );

         self.with_retries(async || {
-            let stream = self.shards.get(shard_id)?.stream().await;
+            let stream = self.shards.get_for_key(key).stream().await;
             let resp = stream.send(req.clone()).await?;

             if resp.status_code != page_api::GetPageStatusCode::Ok {
@@ -249,6 +261,8 @@ struct Shards {
     ///
     /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
     count: ShardCount,
+    /// The stripe size. Only used for sharded tenants.
+    stripe_size: ShardStripeSize,
     /// Shards by shard index.
     ///
     /// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`.
@@ -264,15 +278,9 @@ impl Shards {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         shard_map: HashMap<ShardIndex, String>,
+        stripe_size: ShardStripeSize,
         auth_token: Option<String>,
     ) -> anyhow::Result<Self> {
-        // TODO: support multiple shards.
-        ensure!(shard_map.len() == 1, "multiple shards not supported");
-        ensure!(
-            shard_map.keys().next() == Some(&ShardIndex::unsharded()),
-            "only unsharded tenant supported"
-        );
-
         let count = match shard_map.len() {
             0 => return Err(anyhow!("no shards provided")),
             1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
@@ -296,7 +304,11 @@ impl Shards {
             map.insert(shard_id, shard);
         }

-        Ok(Self { count, map })
+        Ok(Self {
+            count,
+            stripe_size,
+            map,
+        })
     }

     /// Looks up the given shard.
@@ -307,6 +319,13 @@ impl Shards {
             .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
     }

+    /// Looks up the shard that owns the given key.
+    fn get_for_key(&self, key: Key) -> &Shard {
+        let shard_number = key_to_shard_number(self.count, self.stripe_size, &key);
+        self.get(ShardIndex::new(shard_number, self.count))
+            .expect("must exist")
+    }
+
     /// Returns shard 0.
     fn get_zero(&self) -> &Shard {
         self.get(ShardIndex::new(ShardNumber(0), self.count))
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 1d824ac846..586f03b19c 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -3353,6 +3353,8 @@ impl GrpcPageServiceHandler {
     /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
     /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
     /// split them up in the client or server.
+    ///
+    /// TODO: verify that the given keys belong to this shard.
     #[instrument(skip_all, fields(req_id, rel, blkno, blks, req_lsn, mod_lsn))]
     async fn get_page(
         ctx: &RequestContext,
diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml
index d40c9a66a3..3aefea5688 100644
--- a/pgxn/neon/communicator/Cargo.toml
+++ b/pgxn/neon/communicator/Cargo.toml
@@ -30,6 +30,7 @@ metrics.workspace = true
 uring-common = { workspace = true, features = ["bytes"] }

 pageserver_client_grpc.workspace = true
+pageserver_api.workspace = true
 pageserver_page_api.workspace = true
 neon-shmem.workspace = true

diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs
index 8ea297e784..0c960249d4 100644
--- a/pgxn/neon/communicator/src/worker_process/main_loop.rs
+++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs
@@ -13,6 +13,7 @@ use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess};
 use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest};
 use crate::neon_request::{NeonIORequest, NeonIOResult};
 use crate::worker_process::in_progress_ios::{RequestInProgressKey, RequestInProgressTable};
+use pageserver_api::shard::ShardStripeSize;
 use pageserver_client_grpc::PageserverClient;
 use pageserver_page_api as page_api;

@@ -94,9 +95,11 @@ pub(super) async fn init(
         .integrated_cache_init_struct
         .worker_process_init(last_lsn, file_cache);

+    // TODO: plumb through the stripe size.
+    let stripe_size = ShardStripeSize::default();
     let tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID");
     let timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID");
-    let client = PageserverClient::new(tenant_id, timeline_id, shard_map, auth_token)
+    let client = PageserverClient::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)
         .expect("count not create client");

     let request_counters = IntCounterVec::new(
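
For context, the routing path this patch wires up (key construction via `rel_block_to_key`, then `key_to_shard_number`, then `ShardIndex::new`) can be sketched as a standalone helper. This is an illustrative sketch rather than code from the patch: it assumes `RelTag` is available from `pageserver_api::reltag`, that `ShardCount` and `ShardIndex` are re-exported from `pageserver_api::shard`, and the `route_block` helper name is made up here.

```rust
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::{ShardCount, ShardIndex, ShardStripeSize, key_to_shard_number};

/// Maps a (relation, block) pair to the shard index that owns it, mirroring the
/// client's new `Shards::get_for_key` path: block -> Key -> shard number -> shard index.
fn route_block(
    rel: RelTag,
    blkno: u32,
    count: ShardCount,
    stripe_size: ShardStripeSize,
) -> ShardIndex {
    // Same Key construction the gRPC client now uses for the first page of a batch.
    let key = rel_block_to_key(rel, blkno);
    // key_to_shard_number() is made pub by this patch so the client can route requests itself.
    let shard_number = key_to_shard_number(count, stripe_size, &key);
    // Unsharded tenants use ShardCount(0), so this resolves to ShardIndex::unsharded() for them.
    ShardIndex::new(shard_number, count)
}
```

The resulting `ShardIndex` is then used to pick a connection from the shard map, which is why `PageserverClient::new` now also needs the tenant's stripe size.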