diff --git a/Cargo.lock b/Cargo.lock
index c49a2daba7..caed814d5f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4499,6 +4499,7 @@ name = "pageserver_client_grpc"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "bytes",
  "compute_api",
  "futures",
  "pageserver_api",
diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml
index 0a8bcad2ef..84e27abb84 100644
--- a/pageserver/client_grpc/Cargo.toml
+++ b/pageserver/client_grpc/Cargo.toml
@@ -6,6 +6,7 @@ license.workspace = true
 
 [dependencies]
 anyhow.workspace = true
+bytes.workspace = true
 compute_api.workspace = true
 futures.workspace = true
 pageserver_api.workspace = true
diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs
index 5bccdeede3..c21ce2e47d 100644
--- a/pageserver/client_grpc/src/client.rs
+++ b/pageserver/client_grpc/src/client.rs
@@ -2,13 +2,15 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use anyhow::anyhow;
+use futures::stream::FuturesUnordered;
+use futures::{FutureExt as _, StreamExt as _};
 use tracing::instrument;
 
 use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
 use crate::retry::Retry;
+use crate::split::GetPageSplitter;
 use compute_api::spec::PageserverProtocol;
-use pageserver_api::key::{Key, rel_block_to_key};
-use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
+use pageserver_api::shard::ShardStripeSize;
 use pageserver_page_api as page_api;
 use utils::id::{TenantId, TimelineId};
 use utils::shard::{ShardCount, ShardIndex, ShardNumber};
@@ -78,10 +80,11 @@ impl PageserverClient {
             .await
     }
 
-    /// Fetches a page. The `request_id` must be unique across all in-flight requests.
+    /// Fetches pages. The `request_id` must be unique across all in-flight requests. Automatically
+    /// splits requests that straddle shard boundaries, and assembles the responses.
     ///
-    /// Unlike the `page_api::Client`, this client automatically converts `status_code` into
-    /// `tonic::Status` errors. All responses will have `GetPageStatusCode::Ok`.
+    /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
+    /// errors. All responses will have `GetPageStatusCode::Ok`.
     #[instrument(skip_all, fields(
         req_id = %req.request_id,
         rel = %req.rel,
@@ -93,22 +96,55 @@ impl PageserverClient {
         &self,
         req: page_api::GetPageRequest,
     ) -> tonic::Result<page_api::GetPageResponse> {
-        // TODO: this needs to split batch requests across shards and reassemble responses into a
-        // single response. It must also re-split the batch in case the shard map changes. For now,
-        // just use the first page.
-        let key = rel_block_to_key(
-            req.rel,
-            req.block_numbers
-                .first()
-                .copied()
-                .ok_or_else(|| tonic::Status::invalid_argument("no block numbers provided"))?,
-        );
+        // Make sure we have at least one page.
+        if req.block_numbers.is_empty() {
+            return Err(tonic::Status::invalid_argument("no block number"));
+        }
 
-        self.retry
+        // Fast path: request is for a single shard.
+        if let Some(shard_id) =
+            GetPageSplitter::is_single_shard(&req, self.shards.count, self.shards.stripe_size)
+        {
+            return self.get_page_for_shard(shard_id, req).await;
+        }
+
+        // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
+        // reassemble the responses.
+        //
+        // TODO: when we support shard map updates, we need to detect when it changes and re-split
+        // the request on errors.
+        let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);
+
+        let mut shard_requests: FuturesUnordered<_> = splitter
+            .drain_requests()
+            .map(|(shard_id, shard_req)| {
+                // NB: each request will retry internally.
+                self.get_page_for_shard(shard_id, shard_req)
+                    .map(move |result| result.map(|resp| (shard_id, resp)))
+            })
+            .collect();
+
+        while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
+            splitter.add_response(shard_id, shard_response)?;
+        }
+
+        splitter.assemble_response()
+    }
+
+    /// Fetches pages that belong to the given shard.
+    #[instrument(skip_all, fields(shard = %shard_id))]
+    async fn get_page_for_shard(
+        &self,
+        shard_id: ShardIndex,
+        req: page_api::GetPageRequest,
+    ) -> tonic::Result<page_api::GetPageResponse> {
+        let resp = self
+            .retry
             .with(async || {
-                let stream = self.shards.get_for_key(key).stream().await;
+                let stream = self.shards.get(shard_id)?.stream().await;
                 let resp = stream.send(req.clone()).await?;
+                // Convert per-request errors into a tonic::Status.
                 if resp.status_code != page_api::GetPageStatusCode::Ok {
                     return Err(tonic::Status::new(
                         resp.status_code.into(),
@@ -118,7 +154,18 @@ impl PageserverClient {
                 Ok(resp)
             })
-            .await
+            .await?;
+
+        // Make sure we got the right number of pages.
+        // NB: check outside of the retry loop, since we don't want to retry this.
+        let (expected, actual) = (req.block_numbers.len(), resp.page_images.len());
+        if expected != actual {
+            return Err(tonic::Status::internal(format!(
+                "expected {expected} pages for shard {shard_id}, got {actual}",
+            )));
+        }
+
+        Ok(resp)
     }
 
     /// Returns the size of a relation, as # of blocks.
@@ -216,13 +263,6 @@ impl Shards {
             .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
     }
 
-    /// Looks up the shard that owns the given key.
-    fn get_for_key(&self, key: Key) -> &Shard {
-        let shard_number = key_to_shard_number(self.count, self.stripe_size, &key);
-        self.get(ShardIndex::new(shard_number, self.count))
-            .expect("must exist")
-    }
-
     /// Returns shard 0.
     fn get_zero(&self) -> &Shard {
         self.get(ShardIndex::new(ShardNumber(0), self.count))
diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs
index 2a59f9868c..3fc7178be2 100644
--- a/pageserver/client_grpc/src/lib.rs
+++ b/pageserver/client_grpc/src/lib.rs
@@ -1,5 +1,6 @@
 mod client;
 mod pool;
 mod retry;
+mod split;
 
 pub use client::PageserverClient;
diff --git a/pageserver/client_grpc/src/split.rs b/pageserver/client_grpc/src/split.rs
new file mode 100644
index 0000000000..5bbcaab393
--- /dev/null
+++ b/pageserver/client_grpc/src/split.rs
@@ -0,0 +1,172 @@
+use std::collections::HashMap;
+
+use bytes::Bytes;
+
+use pageserver_api::key::rel_block_to_key;
+use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
+use pageserver_page_api as page_api;
+use utils::shard::{ShardCount, ShardIndex};
+
+/// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
+/// TODO: add tests for this.
+pub struct GetPageSplitter {
+    /// The original request ID. Used for all shard requests.
+    request_id: page_api::RequestID,
+    /// Split requests by shard index.
+    requests: HashMap<ShardIndex, page_api::GetPageRequest>,
+    /// Maps the offset in `GetPageRequest::block_numbers` to the owning shard. Used to assemble
+    /// the response pages in the same order as the original request.
+    block_shards: Vec<ShardIndex>,
+    /// Page responses by shard index. Will be assembled into a single response.
+    responses: HashMap<ShardIndex, Vec<Bytes>>,
+}
+
+impl GetPageSplitter {
+    /// Checks if the given request only touches a single shard, and returns the shard ID. This is
+    /// the common case, so we check first in order to avoid unnecessary allocations and overhead.
+    /// The caller must ensure that the request has at least one block number, or this will panic.
+    pub fn is_single_shard(
+        req: &page_api::GetPageRequest,
+        count: ShardCount,
+        stripe_size: ShardStripeSize,
+    ) -> Option<ShardIndex> {
+        // Fast path: unsharded tenant.
+        if count.is_unsharded() {
+            return Some(ShardIndex::unsharded());
+        }
+
+        // Find the base shard index for the first page, and compare with the rest.
+        let key = rel_block_to_key(req.rel, *req.block_numbers.first().expect("no pages"));
+        let shard_number = key_to_shard_number(count, stripe_size, &key);
+
+        req.block_numbers
+            .iter()
+            .skip(1) // computed above
+            .all(|&blkno| {
+                let key = rel_block_to_key(req.rel, blkno);
+                key_to_shard_number(count, stripe_size, &key) == shard_number
+            })
+            .then_some(ShardIndex::new(shard_number, count))
+    }
+
+    /// Splits the given request.
+    pub fn split(
+        req: page_api::GetPageRequest,
+        count: ShardCount,
+        stripe_size: ShardStripeSize,
+    ) -> Self {
+        // The caller should make sure we don't split requests unnecessarily.
+        debug_assert!(
+            Self::is_single_shard(&req, count, stripe_size).is_none(),
+            "unnecessary request split"
+        );
+
+        // Split the requests by shard index.
+        let mut requests = HashMap::with_capacity(2); // common case
+        let mut block_shards = Vec::with_capacity(req.block_numbers.len());
+        for blkno in req.block_numbers {
+            let key = rel_block_to_key(req.rel, blkno);
+            let shard_number = key_to_shard_number(count, stripe_size, &key);
+            let shard_id = ShardIndex::new(shard_number, count);
+
+            let shard_req = requests
+                .entry(shard_id)
+                .or_insert_with(|| page_api::GetPageRequest {
+                    request_id: req.request_id,
+                    request_class: req.request_class,
+                    rel: req.rel,
+                    read_lsn: req.read_lsn,
+                    block_numbers: Vec::new(),
+                });
+            shard_req.block_numbers.push(blkno);
+            block_shards.push(shard_id);
+        }
+
+        Self {
+            request_id: req.request_id,
+            responses: HashMap::with_capacity(requests.len()),
+            requests,
+            block_shards,
+        }
+    }
+
+    /// Drains the per-shard requests, moving them out of the hashmap to avoid extra allocations.
+    pub fn drain_requests(
+        &mut self,
+    ) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> {
+        self.requests.drain()
+    }
+
+    /// Adds a response from the given shard.
+    #[allow(clippy::result_large_err)]
+    pub fn add_response(
+        &mut self,
+        shard_id: ShardIndex,
+        response: page_api::GetPageResponse,
+    ) -> tonic::Result<()> {
+        // The caller should already have converted status codes into tonic::Status.
+        assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);
+
+        // Make sure the response matches the request ID.
+        if response.request_id != self.request_id {
+            return Err(tonic::Status::internal(format!(
+                "response ID {} does not match request ID {}",
+                response.request_id, self.request_id
+            )));
+        }
+
+        // Add the response data to the map.
+        let old = self.responses.insert(shard_id, response.page_images);
+
+        if old.is_some() {
+            return Err(tonic::Status::internal(format!(
+                "duplicate response for shard {shard_id}",
+            )));
+        }
+
+        Ok(())
+    }
+
+    /// Assembles the shard responses into a single response. Responses must be present for all
+    /// relevant shards, and the total number of pages must match the original request.
+    #[allow(clippy::result_large_err)]
+    pub fn assemble_response(self) -> tonic::Result<page_api::GetPageResponse> {
+        let mut response = page_api::GetPageResponse {
+            request_id: self.request_id,
+            status_code: page_api::GetPageStatusCode::Ok,
+            reason: None,
+            page_images: Vec::with_capacity(self.block_shards.len()),
+        };
+
+        // Set up per-shard page iterators we can pull from.
+        let mut shard_responses = HashMap::with_capacity(self.responses.len());
+        for (shard_id, responses) in self.responses {
+            shard_responses.insert(shard_id, responses.into_iter());
+        }
+
+        // Reassemble the responses in the same order as the original request.
+        for shard_id in &self.block_shards {
+            let page = shard_responses
+                .get_mut(shard_id)
+                .ok_or_else(|| {
+                    tonic::Status::internal(format!("missing response for shard {shard_id}"))
+                })?
+                .next()
+                .ok_or_else(|| {
+                    tonic::Status::internal(format!("missing page from shard {shard_id}"))
+                })?;
+            response.page_images.push(page);
+        }
+
+        // Make sure there are no additional pages.
+        for (shard_id, mut pages) in shard_responses {
+            if pages.next().is_some() {
+                return Err(tonic::Status::internal(format!(
+                    "extra pages returned from shard {shard_id}"
+                )));
+            }
+        }
+
+        Ok(response)
+    }
+}
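The new `split.rs` carries a `TODO: add tests for this.` As a reviewer aid, here is a minimal, self-contained sketch of the split-and-reassemble invariant that `GetPageSplitter` is built around: pages must come back in the original request order, regardless of which shard served them. It deliberately uses plain integers and a hypothetical `owner_of` striping function instead of the real `page_api` and `key_to_shard_number` types, so it is illustrative only, not a drop-in test:

```rust
// Standalone sketch of the split/reassemble invariant. `owner_of` is a stand-in for
// key_to_shard_number, and a "page" is just the block number echoed back.
use std::collections::HashMap;

/// Hypothetical striping function: blocks are striped across shards in stripe_size runs.
fn owner_of(blkno: u32, shard_count: u32, stripe_size: u32) -> u32 {
    (blkno / stripe_size) % shard_count
}

fn main() {
    let (shard_count, stripe_size) = (2, 8);
    let blocks = vec![0u32, 9, 3, 17, 8];

    // Split: group blocks by owning shard, remembering each block's owner in request order.
    let mut requests: HashMap<u32, Vec<u32>> = HashMap::new();
    let mut block_shards = Vec::with_capacity(blocks.len());
    for &blkno in &blocks {
        let shard = owner_of(blkno, shard_count, stripe_size);
        requests.entry(shard).or_default().push(blkno);
        block_shards.push(shard);
    }

    // Pretend each shard returned one page per requested block, in per-shard request order.
    let mut responses: HashMap<u32, std::vec::IntoIter<u32>> = requests
        .into_iter()
        .map(|(shard, blknos)| (shard, blknos.into_iter()))
        .collect();

    // Reassemble: pull the next page from each block's owning shard, restoring request order.
    let assembled: Vec<u32> = block_shards
        .iter()
        .map(|shard| responses.get_mut(shard).unwrap().next().unwrap())
        .collect();

    assert_eq!(assembled, blocks);
    println!("reassembled in original order: {assembled:?}");
}
```

A real unit test would exercise `GetPageSplitter::split`, `add_response`, and `assemble_response` directly with concrete `page_api::GetPageRequest`/`GetPageResponse` values; that is left to the TODO above.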