mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
page_api: add SplitError for GetPageSplitter (#12709)
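Add a `SplitError` type for `GetPageSplitter`, with a `From<SplitError> for tonic::Status` implementation (which also provides `Into<tonic::Status>`). This avoids a bunch of `map_err` boilerplate to convert `GetPageSplitter` errors into `tonic::Status`, since callers can now propagate them with `?`. Also renames `GetPageSplitter::get_response` to `collect_response`. Requires #12702. Touches [LKB-191](https://databricks.atlassian.net/browse/LKB-191).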
This commit is contained in:
@@ -230,16 +230,14 @@ impl PageserverClient {
|
|||||||
) -> tonic::Result<page_api::GetPageResponse> {
|
) -> tonic::Result<page_api::GetPageResponse> {
|
||||||
// Fast path: request is for a single shard.
|
// Fast path: request is for a single shard.
|
||||||
if let Some(shard_id) =
|
if let Some(shard_id) =
|
||||||
GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
|
GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)?
|
||||||
.map_err(|err| tonic::Status::internal(err.to_string()))?
|
|
||||||
{
|
{
|
||||||
return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
|
return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
|
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
|
||||||
// reassemble the responses.
|
// reassemble the responses.
|
||||||
let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)
|
let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)?;
|
||||||
.map_err(|err| tonic::Status::internal(err.to_string()))?;
|
|
||||||
|
|
||||||
let mut shard_requests = FuturesUnordered::new();
|
let mut shard_requests = FuturesUnordered::new();
|
||||||
for (shard_id, shard_req) in splitter.drain_requests() {
|
for (shard_id, shard_req) in splitter.drain_requests() {
|
||||||
@@ -249,14 +247,10 @@ impl PageserverClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
|
while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
|
||||||
splitter
|
splitter.add_response(shard_id, shard_response)?;
|
||||||
.add_response(shard_id, shard_response)
|
|
||||||
.map_err(|err| tonic::Status::internal(err.to_string()))?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
splitter
|
Ok(splitter.collect_response()?)
|
||||||
.get_response()
|
|
||||||
.map_err(|err| tonic::Status::internal(err.to_string()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetches pages on the given shard. Does not retry internally.
|
/// Fetches pages on the given shard. Does not retry internally.
|
||||||
|
|||||||
@@ -24,4 +24,4 @@ mod split;
|
|||||||
|
|
||||||
pub use client::Client;
|
pub use client::Client;
|
||||||
pub use model::*;
|
pub use model::*;
|
||||||
pub use split::GetPageSplitter;
|
pub use split::{GetPageSplitter, SplitError};
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use anyhow::anyhow;
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
|
||||||
use crate::model::*;
|
use crate::model::*;
|
||||||
@@ -27,19 +26,19 @@ impl GetPageSplitter {
|
|||||||
req: &GetPageRequest,
|
req: &GetPageRequest,
|
||||||
count: ShardCount,
|
count: ShardCount,
|
||||||
stripe_size: Option<ShardStripeSize>,
|
stripe_size: Option<ShardStripeSize>,
|
||||||
) -> anyhow::Result<Option<ShardIndex>> {
|
) -> Result<Option<ShardIndex>, SplitError> {
|
||||||
// Fast path: unsharded tenant.
|
// Fast path: unsharded tenant.
|
||||||
if count.is_unsharded() {
|
if count.is_unsharded() {
|
||||||
return Ok(Some(ShardIndex::unsharded()));
|
return Ok(Some(ShardIndex::unsharded()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let Some(stripe_size) = stripe_size else {
|
let Some(stripe_size) = stripe_size else {
|
||||||
return Err(anyhow!("stripe size must be given for sharded tenants"));
|
return Err("stripe size must be given for sharded tenants".into());
|
||||||
};
|
};
|
||||||
|
|
||||||
// Find the first page's shard, for comparison.
|
// Find the first page's shard, for comparison.
|
||||||
let Some(&first_page) = req.block_numbers.first() else {
|
let Some(&first_page) = req.block_numbers.first() else {
|
||||||
return Err(anyhow!("no block numbers in request"));
|
return Err("no block numbers in request".into());
|
||||||
};
|
};
|
||||||
let key = rel_block_to_key(req.rel, first_page);
|
let key = rel_block_to_key(req.rel, first_page);
|
||||||
let shard_number = key_to_shard_number(count, stripe_size, &key);
|
let shard_number = key_to_shard_number(count, stripe_size, &key);
|
||||||
@@ -60,7 +59,7 @@ impl GetPageSplitter {
|
|||||||
req: GetPageRequest,
|
req: GetPageRequest,
|
||||||
count: ShardCount,
|
count: ShardCount,
|
||||||
stripe_size: Option<ShardStripeSize>,
|
stripe_size: Option<ShardStripeSize>,
|
||||||
) -> anyhow::Result<Self> {
|
) -> Result<Self, SplitError> {
|
||||||
// The caller should make sure we don't split requests unnecessarily.
|
// The caller should make sure we don't split requests unnecessarily.
|
||||||
debug_assert!(
|
debug_assert!(
|
||||||
Self::for_single_shard(&req, count, stripe_size)?.is_none(),
|
Self::for_single_shard(&req, count, stripe_size)?.is_none(),
|
||||||
@@ -68,10 +67,10 @@ impl GetPageSplitter {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if count.is_unsharded() {
|
if count.is_unsharded() {
|
||||||
return Err(anyhow!("unsharded tenant, no point in splitting request"));
|
return Err("unsharded tenant, no point in splitting request".into());
|
||||||
}
|
}
|
||||||
let Some(stripe_size) = stripe_size else {
|
let Some(stripe_size) = stripe_size else {
|
||||||
return Err(anyhow!("stripe size must be given for sharded tenants"));
|
return Err("stripe size must be given for sharded tenants".into());
|
||||||
};
|
};
|
||||||
|
|
||||||
// Split the requests by shard index.
|
// Split the requests by shard index.
|
||||||
@@ -129,35 +128,32 @@ impl GetPageSplitter {
|
|||||||
|
|
||||||
/// Adds a response from the given shard. The response must match the request ID and have an OK
|
/// Adds a response from the given shard. The response must match the request ID and have an OK
|
||||||
/// status code. A response must not already exist for the given shard ID.
|
/// status code. A response must not already exist for the given shard ID.
|
||||||
#[allow(clippy::result_large_err)]
|
|
||||||
pub fn add_response(
|
pub fn add_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
shard_id: ShardIndex,
|
shard_id: ShardIndex,
|
||||||
response: GetPageResponse,
|
response: GetPageResponse,
|
||||||
) -> anyhow::Result<()> {
|
) -> Result<(), SplitError> {
|
||||||
// The caller should already have converted status codes into tonic::Status.
|
// The caller should already have converted status codes into tonic::Status.
|
||||||
if response.status_code != GetPageStatusCode::Ok {
|
if response.status_code != GetPageStatusCode::Ok {
|
||||||
return Err(anyhow!(
|
return Err(SplitError(format!(
|
||||||
"unexpected non-OK response for shard {shard_id}: {} {}",
|
"unexpected non-OK response for shard {shard_id}: {} {}",
|
||||||
response.status_code,
|
response.status_code,
|
||||||
response.reason.unwrap_or_default()
|
response.reason.unwrap_or_default()
|
||||||
));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if response.request_id != self.response.request_id {
|
if response.request_id != self.response.request_id {
|
||||||
return Err(anyhow!(
|
return Err(SplitError(format!(
|
||||||
"response ID mismatch for shard {shard_id}: expected {}, got {}",
|
"response ID mismatch for shard {shard_id}: expected {}, got {}",
|
||||||
self.response.request_id,
|
self.response.request_id, response.request_id
|
||||||
response.request_id
|
)));
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if response.request_id != self.response.request_id {
|
if response.request_id != self.response.request_id {
|
||||||
return Err(anyhow!(
|
return Err(SplitError(format!(
|
||||||
"response ID mismatch for shard {shard_id}: expected {}, got {}",
|
"response ID mismatch for shard {shard_id}: expected {}, got {}",
|
||||||
self.response.request_id,
|
self.response.request_id, response.request_id
|
||||||
response.request_id
|
)));
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Place the shard response pages into the assembled response, in request order.
|
// Place the shard response pages into the assembled response, in request order.
|
||||||
@@ -169,26 +165,27 @@ impl GetPageSplitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let Some(slot) = self.response.pages.get_mut(i) else {
|
let Some(slot) = self.response.pages.get_mut(i) else {
|
||||||
return Err(anyhow!("no block_shards slot {i} for shard {shard_id}"));
|
return Err(SplitError(format!(
|
||||||
|
"no block_shards slot {i} for shard {shard_id}"
|
||||||
|
)));
|
||||||
};
|
};
|
||||||
let Some(page) = pages.next() else {
|
let Some(page) = pages.next() else {
|
||||||
return Err(anyhow!(
|
return Err(SplitError(format!(
|
||||||
"missing page {} in shard {shard_id} response",
|
"missing page {} in shard {shard_id} response",
|
||||||
slot.block_number
|
slot.block_number
|
||||||
));
|
)));
|
||||||
};
|
};
|
||||||
if page.block_number != slot.block_number {
|
if page.block_number != slot.block_number {
|
||||||
return Err(anyhow!(
|
return Err(SplitError(format!(
|
||||||
"shard {shard_id} returned wrong page at index {i}, expected {} got {}",
|
"shard {shard_id} returned wrong page at index {i}, expected {} got {}",
|
||||||
slot.block_number,
|
slot.block_number, page.block_number
|
||||||
page.block_number
|
)));
|
||||||
));
|
|
||||||
}
|
}
|
||||||
if !slot.image.is_empty() {
|
if !slot.image.is_empty() {
|
||||||
return Err(anyhow!(
|
return Err(SplitError(format!(
|
||||||
"shard {shard_id} returned duplicate page {} at index {i}",
|
"shard {shard_id} returned duplicate page {} at index {i}",
|
||||||
slot.block_number
|
slot.block_number
|
||||||
));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
*slot = page;
|
*slot = page;
|
||||||
@@ -196,32 +193,54 @@ impl GetPageSplitter {
|
|||||||
|
|
||||||
// Make sure we've consumed all pages from the shard response.
|
// Make sure we've consumed all pages from the shard response.
|
||||||
if let Some(extra_page) = pages.next() {
|
if let Some(extra_page) = pages.next() {
|
||||||
return Err(anyhow!(
|
return Err(SplitError(format!(
|
||||||
"shard {shard_id} returned extra page: {}",
|
"shard {shard_id} returned extra page: {}",
|
||||||
extra_page.block_number
|
extra_page.block_number
|
||||||
));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetches the final, assembled response.
|
/// Collects the final, assembled response.
|
||||||
#[allow(clippy::result_large_err)]
|
pub fn collect_response(self) -> Result<GetPageResponse, SplitError> {
|
||||||
pub fn get_response(self) -> anyhow::Result<GetPageResponse> {
|
|
||||||
// Check that the response is complete.
|
// Check that the response is complete.
|
||||||
for (i, page) in self.response.pages.iter().enumerate() {
|
for (i, page) in self.response.pages.iter().enumerate() {
|
||||||
if page.image.is_empty() {
|
if page.image.is_empty() {
|
||||||
return Err(anyhow!(
|
return Err(SplitError(format!(
|
||||||
"missing page {} for shard {}",
|
"missing page {} for shard {}",
|
||||||
page.block_number,
|
page.block_number,
|
||||||
self.block_shards
|
self.block_shards
|
||||||
.get(i)
|
.get(i)
|
||||||
.map(|s| s.to_string())
|
.map(|s| s.to_string())
|
||||||
.unwrap_or_else(|| "?".to_string())
|
.unwrap_or_else(|| "?".to_string())
|
||||||
));
|
)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(self.response)
|
Ok(self.response)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A GetPageSplitter error.
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
#[error("{0}")]
|
||||||
|
pub struct SplitError(String);
|
||||||
|
|
||||||
|
impl From<&str> for SplitError {
|
||||||
|
fn from(err: &str) -> Self {
|
||||||
|
SplitError(err.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<String> for SplitError {
|
||||||
|
fn from(err: String) -> Self {
|
||||||
|
SplitError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<SplitError> for tonic::Status {
|
||||||
|
fn from(err: SplitError) -> Self {
|
||||||
|
tonic::Status::internal(err.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -3685,8 +3685,7 @@ impl GrpcPageServiceHandler {
|
|||||||
|
|
||||||
// Fast path: the request fits in a single shard.
|
// Fast path: the request fits in a single shard.
|
||||||
if let Some(shard_index) =
|
if let Some(shard_index) =
|
||||||
GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size))
|
GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size))?
|
||||||
.map_err(|err| tonic::Status::internal(err.to_string()))?
|
|
||||||
{
|
{
|
||||||
// We got the shard ID from the first page, so these must be equal.
|
// We got the shard ID from the first page, so these must be equal.
|
||||||
assert_eq!(shard_index.shard_number, shard_id.number);
|
assert_eq!(shard_index.shard_number, shard_id.number);
|
||||||
@@ -3697,8 +3696,7 @@ impl GrpcPageServiceHandler {
|
|||||||
// The request spans multiple shards; split it and dispatch parallel requests. All pages
|
// The request spans multiple shards; split it and dispatch parallel requests. All pages
|
||||||
// were originally in the parent shard, and during a split all children are local, so we
|
// were originally in the parent shard, and during a split all children are local, so we
|
||||||
// expect to find local shards for all pages.
|
// expect to find local shards for all pages.
|
||||||
let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size))
|
let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size))?;
|
||||||
.map_err(|err| tonic::Status::internal(err.to_string()))?;
|
|
||||||
|
|
||||||
let mut shard_requests = FuturesUnordered::new();
|
let mut shard_requests = FuturesUnordered::new();
|
||||||
for (shard_index, shard_req) in splitter.drain_requests() {
|
for (shard_index, shard_req) in splitter.drain_requests() {
|
||||||
@@ -3717,14 +3715,10 @@ impl GrpcPageServiceHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
while let Some((shard_index, shard_response)) = shard_requests.next().await.transpose()? {
|
while let Some((shard_index, shard_response)) = shard_requests.next().await.transpose()? {
|
||||||
splitter
|
splitter.add_response(shard_index, shard_response)?;
|
||||||
.add_response(shard_index, shard_response)
|
|
||||||
.map_err(|err| tonic::Status::internal(err.to_string()))?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
splitter
|
Ok(splitter.collect_response()?)
|
||||||
.get_response()
|
|
||||||
.map_err(|err| tonic::Status::internal(err.to_string()))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user