mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 15:02:56 +00:00
Tweak GetPageSplitter
This commit is contained in:
@@ -103,11 +103,11 @@ impl PageserverClient {
|
||||
return self.get_page_for_shard(shard_id, req).await;
|
||||
}
|
||||
|
||||
// Slow path: request spans multiple shards. Split it, dispatch per-shard requests in
|
||||
// parallel, and reassemble the responses.
|
||||
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
|
||||
// reassemble the responses.
|
||||
//
|
||||
// TODO: when we add shard map updates, we need to detect that case and re-split the
|
||||
// request on errors.
|
||||
// TODO: when we support shard map updates, we need to detect when it changes and re-split
|
||||
// the request on errors.
|
||||
let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);
|
||||
|
||||
let mut shard_requests: FuturesUnordered<_> = splitter
|
||||
@@ -123,7 +123,7 @@ impl PageserverClient {
|
||||
splitter.add_response(shard_id, shard_response)?;
|
||||
}
|
||||
|
||||
splitter.reassemble()
|
||||
splitter.assemble_response()
|
||||
}
|
||||
|
||||
/// Fetches pages that belong to the given shard.
|
||||
|
||||
@@ -7,23 +7,23 @@ use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
|
||||
use pageserver_page_api as page_api;
|
||||
use utils::shard::{ShardCount, ShardIndex};
|
||||
|
||||
/// Splits GetPageRequests across shard boundaries and reassembles the responses.
|
||||
/// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
|
||||
/// TODO: add tests for this.
|
||||
pub struct GetPageSplitter {
|
||||
/// The original request ID. Used for all shard requests.
|
||||
request_id: page_api::RequestID,
|
||||
/// Requests by shard index.
|
||||
/// Split requests by shard index.
|
||||
requests: HashMap<ShardIndex, page_api::GetPageRequest>,
|
||||
/// Maps the page offset in the input request (index) to the shard index. This is used to
|
||||
/// reassemble the responses in the same order as the original request.
|
||||
/// Maps the offset in `GetPageRequest::block_numbers` to the owning shard. Used to assemble
|
||||
/// the response pages in the same order as the original request.
|
||||
block_shards: Vec<ShardIndex>,
|
||||
/// Page responses by shard index. Will be reassembled into a single response.
|
||||
/// Page responses by shard index. Will be assembled into a single response.
|
||||
responses: HashMap<ShardIndex, Vec<Bytes>>,
|
||||
}
|
||||
|
||||
impl GetPageSplitter {
|
||||
/// Checks if the given request belongs to a single shard, and returns the shard ID. This is the
|
||||
/// common case, so we do a full scan in order to avoid unnecessary allocations and overhead.
|
||||
/// Checks if the given request only touches a single shard, and returns the shard ID. This is
|
||||
/// the common case, so we check first in order to avoid unnecessary allocations and overhead.
|
||||
/// The caller must ensure that the request has at least one block number, or this will panic.
|
||||
pub fn is_single_shard(
|
||||
req: &page_api::GetPageRequest,
|
||||
@@ -57,7 +57,7 @@ impl GetPageSplitter {
|
||||
) -> Self {
|
||||
// The caller should make sure we don't split requests unnecessarily.
|
||||
debug_assert!(
|
||||
Self::is_single_shard(&req, count, stripe_size).is_some(),
|
||||
Self::is_single_shard(&req, count, stripe_size).is_none(),
|
||||
"unnecessary request split"
|
||||
);
|
||||
|
||||
@@ -97,7 +97,7 @@ impl GetPageSplitter {
|
||||
self.requests.drain()
|
||||
}
|
||||
|
||||
/// Adds a response for the given shard index.
|
||||
/// Adds a response from the given shard.
|
||||
#[allow(clippy::result_large_err)]
|
||||
pub fn add_response(
|
||||
&mut self,
|
||||
@@ -107,7 +107,7 @@ impl GetPageSplitter {
|
||||
// The caller should already have converted status codes into tonic::Status.
|
||||
assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);
|
||||
|
||||
// Ensure the response is for the same request ID.
|
||||
// Make sure the response matches the request ID.
|
||||
if response.request_id != self.request_id {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"response ID {} does not match request ID {}",
|
||||
@@ -117,14 +117,20 @@ impl GetPageSplitter {
|
||||
|
||||
// Add the response data to the map.
|
||||
let old = self.responses.insert(shard_id, response.page_images);
|
||||
assert!(old.is_none(), "duplicate response for shard {shard_id}");
|
||||
|
||||
if old.is_some() {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"duplicate response for shard {shard_id}",
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reassembles the shard responses into a single response.
|
||||
/// Assembles the shard responses into a single response. Responses must be present for all
|
||||
/// relevant shards, and the total number of pages must match the original request.
|
||||
#[allow(clippy::result_large_err)]
|
||||
pub fn reassemble(self) -> tonic::Result<page_api::GetPageResponse> {
|
||||
pub fn assemble_response(self) -> tonic::Result<page_api::GetPageResponse> {
|
||||
let mut response = page_api::GetPageResponse {
|
||||
request_id: self.request_id,
|
||||
status_code: page_api::GetPageStatusCode::Ok,
|
||||
@@ -132,7 +138,7 @@ impl GetPageSplitter {
|
||||
page_images: Vec::with_capacity(self.block_shards.len()),
|
||||
};
|
||||
|
||||
// Convert the shard responses into iterators we can conveniently pull from.
|
||||
// Set up per-shard page iterators we can pull from.
|
||||
let mut shard_responses = HashMap::with_capacity(self.responses.len());
|
||||
for (shard_id, responses) in self.responses {
|
||||
shard_responses.insert(shard_id, responses.into_iter());
|
||||
@@ -152,7 +158,7 @@ impl GetPageSplitter {
|
||||
response.page_images.push(page);
|
||||
}
|
||||
|
||||
// Make sure we didn't get any additional pages.
|
||||
// Make sure there are no additional pages.
|
||||
for (shard_id, mut pages) in shard_responses {
|
||||
if pages.next().is_some() {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
|
||||
Reference in New Issue
Block a user