diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 393f89819a..7049fbdb96 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -218,7 +218,7 @@ impl PageserverClient { ) -> tonic::Result { // Fast path: request is for a single shard. if let Some(shard_id) = - GetPageSplitter::is_single_shard(&req, shards.count, shards.stripe_size) + GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size) { return Self::get_page_with_shard(req, shards.get(shard_id)?).await; } @@ -238,7 +238,7 @@ impl PageserverClient { splitter.add_response(shard_id, shard_response)?; } - splitter.assemble_response() + splitter.get_response() } /// Fetches pages on the given shard. Does not retry internally. @@ -246,9 +246,8 @@ impl PageserverClient { req: page_api::GetPageRequest, shard: &Shard, ) -> tonic::Result { - let expected = req.block_numbers.len(); let stream = shard.stream(req.request_class.is_bulk()).await; - let resp = stream.send(req).await?; + let resp = stream.send(req.clone()).await?; // Convert per-request errors into a tonic::Status. if resp.status_code != page_api::GetPageStatusCode::Ok { @@ -258,11 +257,27 @@ impl PageserverClient { )); } - // Check that we received the expected number of pages. - let actual = resp.page_images.len(); - if expected != actual { + // Check that we received the expected pages. + if req.rel != resp.rel { return Err(tonic::Status::internal(format!( - "expected {expected} pages, got {actual}", + "shard {} returned wrong relation, expected {} got {}", + shard.id, req.rel, resp.rel + ))); + } + if !req + .block_numbers + .iter() + .copied() + .eq(resp.pages.iter().map(|p| p.block_number)) + { + return Err(tonic::Status::internal(format!( + "shard {} returned wrong pages, expected {:?} got {:?}", + shard.id, + req.block_numbers, + resp.pages + .iter() + .map(|page| page.block_number) + .collect::>() ))); } @@ -435,6 +450,8 @@ impl Shards { /// * Bulk client pool: unbounded. /// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH. struct Shard { + /// The shard ID. + id: ShardIndex, /// Unary gRPC client pool. client_pool: Arc, /// GetPage stream pool. @@ -500,6 +517,7 @@ impl Shard { ); Ok(Self { + id: shard_id, client_pool, stream_pool, bulk_stream_pool, diff --git a/pageserver/client_grpc/src/split.rs b/pageserver/client_grpc/src/split.rs index 57c9299b96..b7539b900c 100644 --- a/pageserver/client_grpc/src/split.rs +++ b/pageserver/client_grpc/src/split.rs @@ -5,27 +5,24 @@ use bytes::Bytes; use pageserver_api::key::rel_block_to_key; use pageserver_api::shard::{ShardStripeSize, key_to_shard_number}; use pageserver_page_api as page_api; -use utils::shard::{ShardCount, ShardIndex}; +use utils::shard::{ShardCount, ShardIndex, ShardNumber}; /// Splits GetPageRequests that straddle shard boundaries and assembles the responses. /// TODO: add tests for this. pub struct GetPageSplitter { - /// The original request ID. Used for all shard requests. - request_id: page_api::RequestID, /// Split requests by shard index. requests: HashMap, - /// Maps the offset in `GetPageRequest::block_numbers` to the owning shard. Used to assemble - /// the response pages in the same order as the original request. + /// The response being assembled. Preallocated with empty pages, to be filled in. + response: page_api::GetPageResponse, + /// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used + /// to assemble the response pages in the same order as the original request. block_shards: Vec, - /// Page responses by shard index. Will be assembled into a single response. - responses: HashMap>, } impl GetPageSplitter { /// Checks if the given request only touches a single shard, and returns the shard ID. This is /// the common case, so we check first in order to avoid unnecessary allocations and overhead. - /// The caller must ensure that the request has at least one block number, or this will panic. - pub fn is_single_shard( + pub fn for_single_shard( req: &page_api::GetPageRequest, count: ShardCount, stripe_size: ShardStripeSize, @@ -35,8 +32,12 @@ impl GetPageSplitter { return Some(ShardIndex::unsharded()); } - // Find the base shard index for the first page, and compare with the rest. - let key = rel_block_to_key(req.rel, *req.block_numbers.first().expect("no pages")); + // Find the first page's shard, for comparison. If there are no pages, just return the first + // shard (caller likely checked already, otherwise the server will reject it). + let Some(&first_page) = req.block_numbers.first() else { + return Some(ShardIndex::new(ShardNumber(0), count)); + }; + let key = rel_block_to_key(req.rel, first_page); let shard_number = key_to_shard_number(count, stripe_size, &key); req.block_numbers @@ -57,19 +58,19 @@ impl GetPageSplitter { ) -> Self { // The caller should make sure we don't split requests unnecessarily. debug_assert!( - Self::is_single_shard(&req, count, stripe_size).is_none(), + Self::for_single_shard(&req, count, stripe_size).is_none(), "unnecessary request split" ); // Split the requests by shard index. let mut requests = HashMap::with_capacity(2); // common case let mut block_shards = Vec::with_capacity(req.block_numbers.len()); - for blkno in req.block_numbers { + for &blkno in &req.block_numbers { let key = rel_block_to_key(req.rel, blkno); let shard_number = key_to_shard_number(count, stripe_size, &key); let shard_id = ShardIndex::new(shard_number, count); - let shard_req = requests + requests .entry(shard_id) .or_insert_with(|| page_api::GetPageRequest { request_id: req.request_id, @@ -77,20 +78,39 @@ impl GetPageSplitter { rel: req.rel, read_lsn: req.read_lsn, block_numbers: Vec::new(), - }); - shard_req.block_numbers.push(blkno); + }) + .block_numbers + .push(blkno); block_shards.push(shard_id); } - Self { + // Construct a response to be populated by shard responses. Preallocate empty page slots + // with the expected block numbers. + let response = page_api::GetPageResponse { request_id: req.request_id, - responses: HashMap::with_capacity(requests.len()), + status_code: page_api::GetPageStatusCode::Ok, + reason: None, + rel: req.rel, + pages: req + .block_numbers + .into_iter() + .map(|block_number| { + page_api::Page { + block_number, + image: Bytes::new(), // empty page slot to be filled in + } + }) + .collect(), + }; + + Self { requests, + response, block_shards, } } - /// Drains the per-shard requests, moving them out of the hashmap to avoid extra allocations. + /// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations. pub fn drain_requests( &mut self, ) -> impl Iterator { @@ -108,72 +128,82 @@ impl GetPageSplitter { // The caller should already have converted status codes into tonic::Status. if response.status_code != page_api::GetPageStatusCode::Ok { return Err(tonic::Status::internal(format!( - "unexpected non-OK response for shard {shard_id}: {:?}", - response.status_code + "unexpected non-OK response for shard {shard_id}: {} {}", + response.status_code, + response.reason.unwrap_or_default() ))); } - // The stream pool ensures the response matches the request ID. - if response.request_id != self.request_id { + if response.request_id != self.response.request_id { return Err(tonic::Status::internal(format!( "response ID mismatch for shard {shard_id}: expected {}, got {}", - self.request_id, response.request_id + self.response.request_id, response.request_id ))); } - // We only dispatch one request per shard. - if self.responses.contains_key(&shard_id) { + // Place the shard response pages into the assembled response, in request order. + let mut pages = response.pages.into_iter(); + + for (i, &s) in self.block_shards.iter().enumerate() { + if shard_id != s { + continue; + } + + let Some(slot) = self.response.pages.get_mut(i) else { + return Err(tonic::Status::internal(format!( + "no block_shards slot {i} for shard {shard_id}" + ))); + }; + let Some(page) = pages.next() else { + return Err(tonic::Status::internal(format!( + "missing page {} in shard {shard_id} response", + slot.block_number + ))); + }; + if page.block_number != slot.block_number { + return Err(tonic::Status::internal(format!( + "shard {shard_id} returned wrong page at index {i}, expected {} got {}", + slot.block_number, page.block_number + ))); + } + if !slot.image.is_empty() { + return Err(tonic::Status::internal(format!( + "shard {shard_id} returned duplicate page {} at index {i}", + slot.block_number + ))); + } + + *slot = page; + } + + // Make sure we've consumed all pages from the shard response. + if let Some(extra_page) = pages.next() { return Err(tonic::Status::internal(format!( - "duplicate response for shard {shard_id}" + "shard {shard_id} returned extra page: {}", + extra_page.block_number ))); } - // Add the response data to the map. - self.responses.insert(shard_id, response.page_images); - Ok(()) } - /// Assembles the shard responses into a single response. Responses must be present for all - /// relevant shards, and the total number of pages must match the original request. + /// Fetches the final, assembled response. #[allow(clippy::result_large_err)] - pub fn assemble_response(self) -> tonic::Result { - let mut response = page_api::GetPageResponse { - request_id: self.request_id, - status_code: page_api::GetPageStatusCode::Ok, - reason: None, - page_images: Vec::with_capacity(self.block_shards.len()), - }; - - // Set up per-shard page iterators we can pull from. - let mut shard_responses = HashMap::with_capacity(self.responses.len()); - for (shard_id, responses) in self.responses { - shard_responses.insert(shard_id, responses.into_iter()); - } - - // Reassemble the responses in the same order as the original request. - for shard_id in &self.block_shards { - let page = shard_responses - .get_mut(shard_id) - .ok_or_else(|| { - tonic::Status::internal(format!("missing response for shard {shard_id}")) - })? - .next() - .ok_or_else(|| { - tonic::Status::internal(format!("missing page from shard {shard_id}")) - })?; - response.page_images.push(page); - } - - // Make sure there are no additional pages. - for (shard_id, mut pages) in shard_responses { - if pages.next().is_some() { + pub fn get_response(self) -> tonic::Result { + // Check that the response is complete. + for (i, page) in self.response.pages.iter().enumerate() { + if page.image.is_empty() { return Err(tonic::Status::internal(format!( - "extra pages returned from shard {shard_id}" + "missing page {} for shard {}", + page.block_number, + self.block_shards + .get(i) + .map(|s| s.to_string()) + .unwrap_or_else(|| "?".to_string()) ))); } } - Ok(response) + Ok(self.response) } } diff --git a/pageserver/page_api/proto/page_service.proto b/pageserver/page_api/proto/page_service.proto index b1f266d910..d113a04a42 100644 --- a/pageserver/page_api/proto/page_service.proto +++ b/pageserver/page_api/proto/page_service.proto @@ -208,12 +208,25 @@ enum GetPageClass { message GetPageResponse { // The original request's ID. RequestID request_id = 1; - // The response status code. + // The response status code. If not OK, the rel and page fields will be empty. GetPageStatusCode status_code = 2; // A string describing the status, if any. string reason = 3; - // The 8KB page images, in the same order as the request. Empty if status_code != OK. - repeated bytes page_image = 4; + // The relation that the pages belong to. + RelTag rel = 4; + // The page(s), in the same order as the request. + repeated Page page = 5; +} + +// A page. +// +// TODO: it would be slightly more efficient (but less convenient) to have separate arrays of block +// numbers and images, but given the 8KB page size it's probably negligible. Benchmark it anyway. +message Page { + // The page number. + uint32 block_number = 1; + // The materialized page image, as an 8KB byte vector. + bytes image = 2; } // A GetPageResponse status code. diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs index 6523d00d3d..f70d0e7b28 100644 --- a/pageserver/page_api/src/client.rs +++ b/pageserver/page_api/src/client.rs @@ -1,4 +1,5 @@ use anyhow::Context as _; +use futures::future::ready; use futures::{Stream, StreamExt as _, TryStreamExt as _}; use tokio::io::AsyncRead; use tokio_util::io::StreamReader; @@ -110,7 +111,7 @@ impl Client { ) -> tonic::Result> + Send + 'static> { let reqs = reqs.map(proto::GetPageRequest::from); let resps = self.inner.get_pages(reqs).await?.into_inner(); - Ok(resps.map_ok(GetPageResponse::from)) + Ok(resps.and_then(|resp| ready(GetPageResponse::try_from(resp).map_err(|err| err.into())))) } /// Returns the size of a relation, as # of blocks. diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 4db8237ad8..a9dd154285 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -502,22 +502,30 @@ impl From for i32 { pub struct GetPageResponse { /// The original request's ID. pub request_id: RequestID, - /// The response status code. + /// The response status code. If not OK, the `rel` and `pages` fields will be empty. pub status_code: GetPageStatusCode, /// A string describing the status, if any. pub reason: Option, - /// The 8KB page images, in the same order as the request. Empty if status != OK. - pub page_images: Vec, + /// The relation that the pages belong to. + pub rel: RelTag, + // The page(s), in the same order as the request. + pub pages: Vec, } -impl From for GetPageResponse { - fn from(pb: proto::GetPageResponse) -> Self { - Self { - request_id: pb.request_id.unwrap_or_default().into(), +impl TryFrom for GetPageResponse { + type Error = ProtocolError; + + fn try_from(pb: proto::GetPageResponse) -> Result { + Ok(Self { + request_id: pb + .request_id + .ok_or(ProtocolError::Missing("request_id"))? + .into(), status_code: pb.status_code.into(), reason: Some(pb.reason).filter(|r| !r.is_empty()), - page_images: pb.page_image, - } + rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?, + pages: pb.page.into_iter().map(Page::from).collect(), + }) } } @@ -527,7 +535,8 @@ impl From for proto::GetPageResponse { request_id: Some(response.request_id.into()), status_code: response.status_code.into(), reason: response.reason.unwrap_or_default(), - page_image: response.page_images, + rel: Some(response.rel.into()), + page: response.pages.into_iter().map(proto::Page::from).collect(), } } } @@ -560,11 +569,39 @@ impl GetPageResponse { request_id, status_code, reason: Some(status.message().to_string()), - page_images: Vec::new(), + rel: RelTag::default(), + pages: Vec::new(), }) } } +// A page. +#[derive(Clone, Debug)] +pub struct Page { + /// The page number. + pub block_number: u32, + /// The materialized page image, as an 8KB byte vector. + pub image: Bytes, +} + +impl From for Page { + fn from(pb: proto::Page) -> Self { + Self { + block_number: pb.block_number, + image: pb.image, + } + } +} + +impl From for proto::Page { + fn from(page: Page) -> Self { + Self { + block_number: page.block_number, + image: page.image, + } + } +} + /// A GetPage response status code. /// /// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index b5c191e29a..30b30d36f6 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -694,7 +694,10 @@ impl Client for GrpcClient { "unexpected status code: {}", resp.status_code, ); - Ok((resp.request_id.id, resp.page_images)) + Ok(( + resp.request_id.id, + resp.pages.into_iter().map(|p| p.image).collect(), + )) } } @@ -761,6 +764,9 @@ impl Client for RichGrpcClient { async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec)> { let resp = self.requests.next().await.unwrap()?; - Ok((resp.request_id.id, resp.page_images)) + Ok(( + resp.request_id.id, + resp.pages.into_iter().map(|p| p.image).collect(), + )) } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b2f6cd465d..1fc7e4eac7 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -3483,12 +3483,16 @@ impl GrpcPageServiceHandler { request_id: req.request_id, status_code: page_api::GetPageStatusCode::Ok, reason: None, - page_images: Vec::with_capacity(results.len()), + rel: req.rel, + pages: Vec::with_capacity(results.len()), }; for result in results { match result { - Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.page_images.push(r.page), + Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.pages.push(page_api::Page { + block_number: r.req.blkno, + image: r.page, + }), Ok((resp, _, _)) => { return Err(tonic::Status::internal(format!( "unexpected response: {resp:?}"