Merge branch 'main' into communicator-rewrite

This commit is contained in:
Erik Grinaker
2025-07-29 22:05:16 +02:00
22 changed files with 759 additions and 233 deletions

View File

@@ -230,16 +230,14 @@ impl PageserverClient {
) -> tonic::Result<page_api::GetPageResponse> {
// Fast path: request is for a single shard.
if let Some(shard_id) =
GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
.map_err(|err| tonic::Status::internal(err.to_string()))?
GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)?
{
return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
}
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
// reassemble the responses.
let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)?;
let mut shard_requests = FuturesUnordered::new();
for (shard_id, shard_req) in splitter.drain_requests() {
@@ -249,14 +247,10 @@ impl PageserverClient {
}
while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
splitter
.add_response(shard_id, shard_response)
.map_err(|err| tonic::Status::internal(err.to_string()))?;
splitter.add_response(shard_id, shard_response)?;
}
splitter
.get_response()
.map_err(|err| tonic::Status::internal(err.to_string()))
Ok(splitter.collect_response()?)
}
/// Fetches pages on the given shard. Does not retry internally.

View File

@@ -24,4 +24,4 @@ mod split;
pub use client::Client;
pub use model::*;
pub use split::GetPageSplitter;
pub use split::{GetPageSplitter, SplitError};

View File

@@ -1,6 +1,5 @@
use std::collections::HashMap;
use anyhow::anyhow;
use bytes::Bytes;
use crate::model::*;
@@ -27,19 +26,19 @@ impl GetPageSplitter {
req: &GetPageRequest,
count: ShardCount,
stripe_size: Option<ShardStripeSize>,
) -> anyhow::Result<Option<ShardIndex>> {
) -> Result<Option<ShardIndex>, SplitError> {
// Fast path: unsharded tenant.
if count.is_unsharded() {
return Ok(Some(ShardIndex::unsharded()));
}
let Some(stripe_size) = stripe_size else {
return Err(anyhow!("stripe size must be given for sharded tenants"));
return Err("stripe size must be given for sharded tenants".into());
};
// Find the first page's shard, for comparison.
let Some(&first_page) = req.block_numbers.first() else {
return Err(anyhow!("no block numbers in request"));
return Err("no block numbers in request".into());
};
let key = rel_block_to_key(req.rel, first_page);
let shard_number = key_to_shard_number(count, stripe_size, &key);
@@ -60,7 +59,7 @@ impl GetPageSplitter {
req: GetPageRequest,
count: ShardCount,
stripe_size: Option<ShardStripeSize>,
) -> anyhow::Result<Self> {
) -> Result<Self, SplitError> {
// The caller should make sure we don't split requests unnecessarily.
debug_assert!(
Self::for_single_shard(&req, count, stripe_size)?.is_none(),
@@ -68,10 +67,10 @@ impl GetPageSplitter {
);
if count.is_unsharded() {
return Err(anyhow!("unsharded tenant, no point in splitting request"));
return Err("unsharded tenant, no point in splitting request".into());
}
let Some(stripe_size) = stripe_size else {
return Err(anyhow!("stripe size must be given for sharded tenants"));
return Err("stripe size must be given for sharded tenants".into());
};
// Split the requests by shard index.
@@ -129,35 +128,32 @@ impl GetPageSplitter {
/// Adds a response from the given shard. The response must match the request ID and have an OK
/// status code. A response must not already exist for the given shard ID.
#[allow(clippy::result_large_err)]
pub fn add_response(
&mut self,
shard_id: ShardIndex,
response: GetPageResponse,
) -> anyhow::Result<()> {
) -> Result<(), SplitError> {
// The caller should already have converted status codes into tonic::Status.
if response.status_code != GetPageStatusCode::Ok {
return Err(anyhow!(
return Err(SplitError(format!(
"unexpected non-OK response for shard {shard_id}: {} {}",
response.status_code,
response.reason.unwrap_or_default()
));
)));
}
if response.request_id != self.response.request_id {
return Err(anyhow!(
return Err(SplitError(format!(
"response ID mismatch for shard {shard_id}: expected {}, got {}",
self.response.request_id,
response.request_id
));
self.response.request_id, response.request_id
)));
}
if response.request_id != self.response.request_id {
return Err(anyhow!(
return Err(SplitError(format!(
"response ID mismatch for shard {shard_id}: expected {}, got {}",
self.response.request_id,
response.request_id
));
self.response.request_id, response.request_id
)));
}
// Place the shard response pages into the assembled response, in request order.
@@ -169,26 +165,27 @@ impl GetPageSplitter {
}
let Some(slot) = self.response.pages.get_mut(i) else {
return Err(anyhow!("no block_shards slot {i} for shard {shard_id}"));
return Err(SplitError(format!(
"no block_shards slot {i} for shard {shard_id}"
)));
};
let Some(page) = pages.next() else {
return Err(anyhow!(
return Err(SplitError(format!(
"missing page {} in shard {shard_id} response",
slot.block_number
));
)));
};
if page.block_number != slot.block_number {
return Err(anyhow!(
return Err(SplitError(format!(
"shard {shard_id} returned wrong page at index {i}, expected {} got {}",
slot.block_number,
page.block_number
));
slot.block_number, page.block_number
)));
}
if !slot.image.is_empty() {
return Err(anyhow!(
return Err(SplitError(format!(
"shard {shard_id} returned duplicate page {} at index {i}",
slot.block_number
));
)));
}
*slot = page;
@@ -196,32 +193,54 @@ impl GetPageSplitter {
// Make sure we've consumed all pages from the shard response.
if let Some(extra_page) = pages.next() {
return Err(anyhow!(
return Err(SplitError(format!(
"shard {shard_id} returned extra page: {}",
extra_page.block_number
));
)));
}
Ok(())
}
/// Fetches the final, assembled response.
#[allow(clippy::result_large_err)]
pub fn get_response(self) -> anyhow::Result<GetPageResponse> {
/// Collects the final, assembled response.
pub fn collect_response(self) -> Result<GetPageResponse, SplitError> {
// Check that the response is complete.
for (i, page) in self.response.pages.iter().enumerate() {
if page.image.is_empty() {
return Err(anyhow!(
return Err(SplitError(format!(
"missing page {} for shard {}",
page.block_number,
self.block_shards
.get(i)
.map(|s| s.to_string())
.unwrap_or_else(|| "?".to_string())
));
)));
}
}
Ok(self.response)
}
}
/// A GetPageSplitter error.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct SplitError(String);
impl From<&str> for SplitError {
fn from(err: &str) -> Self {
SplitError(err.to_string())
}
}
impl From<String> for SplitError {
fn from(err: String) -> Self {
SplitError(err)
}
}
impl From<SplitError> for tonic::Status {
fn from(err: SplitError) -> Self {
tonic::Status::internal(err.0)
}
}

View File

@@ -473,13 +473,6 @@ impl TimelineHandles {
fn tenant_id(&self) -> Option<TenantId> {
self.wrapper.tenant_id.get().copied()
}
/// Returns whether a child shard exists locally for the given shard.
fn has_child_shard(&self, tenant_id: TenantId, shard_index: ShardIndex) -> bool {
self.wrapper
.tenant_manager
.has_child_shard(tenant_id, shard_index)
}
}
pub(crate) struct TenantManagerWrapper {
@@ -3432,18 +3425,6 @@ impl GrpcPageServiceHandler {
Ok(CancellableTask { task, cancel })
}
/// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of
/// relations and their sizes, as well as SLRU segments and similar data.
#[allow(clippy::result_large_err)]
fn ensure_shard_zero(timeline: &Handle<TenantManagerTypes>) -> Result<(), tonic::Status> {
match timeline.get_shard_index().shard_number.0 {
0 => Ok(()),
shard => Err(tonic::Status::invalid_argument(format!(
"request must execute on shard zero (is shard {shard})",
))),
}
}
/// Generates a PagestreamRequest header from a ReadLsn and request ID.
fn make_hdr(
read_lsn: page_api::ReadLsn,
@@ -3465,56 +3446,55 @@ impl GrpcPageServiceHandler {
&self,
req: &tonic::Request<impl Any>,
) -> Result<Handle<TenantManagerTypes>, GetActiveTimelineError> {
let ttid = *extract::<TenantTimelineId>(req);
let TenantTimelineId {
tenant_id,
timeline_id,
} = *extract::<TenantTimelineId>(req);
let shard_index = *extract::<ShardIndex>(req);
let shard_selector = ShardSelector::Known(shard_index);
// TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to
// avoid the unnecessary overhead.
TimelineHandles::new(self.tenant_manager.clone())
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
.get(tenant_id, timeline_id, ShardSelector::Known(shard_index))
.await
}
/// Acquires a timeline handle for the given request, which must be for shard zero.
/// Acquires a timeline handle for the given request, which must be for shard zero. Most
/// metadata requests are only valid on shard zero.
///
/// NB: during an ongoing shard split, the compute will keep talking to the parent shard until
/// the split is committed, but the parent shard may have been removed in the meanwhile. In that
/// case, we reroute the request to the new child shard. See [`Self::maybe_split_get_page`].
///
/// TODO: revamp the split protocol to avoid this child routing.
async fn get_shard_zero_request_timeline(
async fn get_request_timeline_shard_zero(
&self,
req: &tonic::Request<impl Any>,
) -> Result<Handle<TenantManagerTypes>, tonic::Status> {
let ttid = *extract::<TenantTimelineId>(req);
let TenantTimelineId {
tenant_id,
timeline_id,
} = *extract::<TenantTimelineId>(req);
let shard_index = *extract::<ShardIndex>(req);
if shard_index.shard_number.0 != 0 {
return Err(tonic::Status::invalid_argument(format!(
"request must use shard zero (requested shard {shard_index})",
"request only valid on shard zero (requested shard {shard_index})",
)));
}
// TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to
// avoid the unnecessary overhead.
//
// TODO: this does internal retries, which will delay requests during shard splits (we won't
// look for the child until the parent's retries are exhausted). Don't do that.
let mut handles = TimelineHandles::new(self.tenant_manager.clone());
match handles
.get(
ttid.tenant_id,
ttid.timeline_id,
ShardSelector::Known(shard_index),
)
.get(tenant_id, timeline_id, ShardSelector::Known(shard_index))
.await
{
Ok(timeline) => Ok(timeline),
Err(err) => {
// We may be in the middle of a shard split. Try to find a child shard 0.
if let Ok(timeline) = handles
.get(ttid.tenant_id, ttid.timeline_id, ShardSelector::Zero)
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await
&& timeline.get_shard_index().shard_count > shard_index.shard_count
{
@@ -3661,12 +3641,13 @@ impl GrpcPageServiceHandler {
}
/// Processes a GetPage request when there is a potential shard split in progress. We have to
/// reroute the request any local child shards, and split batch requests that straddle multiple
/// child shards.
/// reroute the request to any local child shards, and split batch requests that straddle
/// multiple child shards.
///
/// Parent shards are split and removed incrementally, but the compute is only notified once the
/// entire split commits, which can take several minutes. In the meanwhile, the compute will be
/// sending requests to the parent shard.
/// Parent shards are split and removed incrementally (there may be many parent shards when
/// splitting an already-sharded tenant), but the compute is only notified once the overall
/// split commits, which can take several minutes. In the meanwhile, the compute will be sending
/// requests to the parent shards.
///
/// TODO: add test infrastructure to provoke this situation frequently and for long periods of
/// time, to properly exercise it.
@@ -3676,10 +3657,12 @@ impl GrpcPageServiceHandler {
/// * Notify the compute about each subsplit.
/// * Return an error that updates the compute's shard map.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
async fn maybe_split_get_page(
ctx: &RequestContext,
handles: &mut TimelineHandles,
ttid: TenantTimelineId,
tenant_id: TenantId,
timeline_id: TimelineId,
parent: ShardIndex,
req: page_api::GetPageRequest,
io_concurrency: IoConcurrency,
@@ -3690,8 +3673,8 @@ impl GrpcPageServiceHandler {
// the page must have a higher shard count.
let timeline = handles
.get(
ttid.tenant_id,
ttid.timeline_id,
tenant_id,
timeline_id,
ShardSelector::Page(rel_block_to_key(req.rel, req.block_numbers[0])),
)
.await?;
@@ -3703,8 +3686,7 @@ impl GrpcPageServiceHandler {
// Fast path: the request fits in a single shard.
if let Some(shard_index) =
GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size))
.map_err(|err| tonic::Status::internal(err.to_string()))?
GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size))?
{
// We got the shard ID from the first page, so these must be equal.
assert_eq!(shard_index.shard_number, shard_id.number);
@@ -3715,17 +3697,12 @@ impl GrpcPageServiceHandler {
// The request spans multiple shards; split it and dispatch parallel requests. All pages
// were originally in the parent shard, and during a split all children are local, so we
// expect to find local shards for all pages.
let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size))
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size))?;
let mut shard_requests = FuturesUnordered::new();
for (shard_index, shard_req) in splitter.drain_requests() {
let timeline = handles
.get(
ttid.tenant_id,
ttid.timeline_id,
ShardSelector::Known(shard_index),
)
.get(tenant_id, timeline_id, ShardSelector::Known(shard_index))
.await?;
let future = Self::get_page(
ctx,
@@ -3739,14 +3716,10 @@ impl GrpcPageServiceHandler {
}
while let Some((shard_index, shard_response)) = shard_requests.next().await.transpose()? {
splitter
.add_response(shard_index, shard_response)
.map_err(|err| tonic::Status::internal(err.to_string()))?;
splitter.add_response(shard_index, shard_response)?;
}
splitter
.get_response()
.map_err(|err| tonic::Status::internal(err.to_string()))
Ok(splitter.collect_response()?)
}
}
@@ -3775,7 +3748,7 @@ impl proto::PageService for GrpcPageServiceHandler {
// to be the sweet spot where throughput is saturated.
const CHUNK_SIZE: usize = 256 * 1024;
let timeline = self.get_shard_zero_request_timeline(&req).await?;
let timeline = self.get_request_timeline_shard_zero(&req).await?;
let ctx = self.ctx.with_scope_timeline(&timeline);
// Validate the request and decorate the span.
@@ -3894,7 +3867,7 @@ impl proto::PageService for GrpcPageServiceHandler {
req: tonic::Request<proto::GetDbSizeRequest>,
) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
let received_at = extract::<ReceivedAt>(&req).0;
let timeline = self.get_shard_zero_request_timeline(&req).await?;
let timeline = self.get_request_timeline_shard_zero(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request, decorate the span, and convert it to a Pagestream request.
@@ -3931,25 +3904,21 @@ impl proto::PageService for GrpcPageServiceHandler {
// reroute requests to the child shards below, but we also detect the common cases here
// where either the shard exists or no shards exist at all. If we have a child shard, we
// can't acquire a weak handle because we don't know which child shard to use yet.
//
// TODO: TimelineHandles.get() does internal retries, which will delay requests during shard
// splits. It shouldn't.
let ttid = *extract::<TenantTimelineId>(&req);
let TenantTimelineId {
tenant_id,
timeline_id,
} = *extract::<TenantTimelineId>(&req);
let shard_index = *extract::<ShardIndex>(&req);
let mut handles = TimelineHandles::new(self.tenant_manager.clone());
let timeline = match handles
.get(
ttid.tenant_id,
ttid.timeline_id,
ShardSelector::Known(shard_index),
)
.get(tenant_id, timeline_id, ShardSelector::Known(shard_index))
.await
{
// The timeline shard exists. Keep a weak handle to reuse for each request.
Ok(timeline) => Some(timeline.downgrade()),
// The shard doesn't exist, but a child shard does. We'll reroute requests later.
Err(_) if handles.has_child_shard(ttid.tenant_id, shard_index) => None,
Err(_) if self.tenant_manager.has_child_shard(tenant_id, shard_index) => None,
// Failed to fetch the timeline, and no child shard exists. Error out.
Err(err) => return Err(err.into()),
};
@@ -4005,7 +3974,8 @@ impl proto::PageService for GrpcPageServiceHandler {
Self::maybe_split_get_page(
&ctx,
&mut handles,
ttid,
tenant_id,
timeline_id,
shard_index,
req,
io_concurrency.clone(),
@@ -4040,7 +4010,7 @@ impl proto::PageService for GrpcPageServiceHandler {
req: tonic::Request<proto::GetRelSizeRequest>,
) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
let received_at = extract::<ReceivedAt>(&req).0;
let timeline = self.get_shard_zero_request_timeline(&req).await?;
let timeline = self.get_request_timeline_shard_zero(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request, decorate the span, and convert it to a Pagestream request.
@@ -4076,11 +4046,10 @@ impl proto::PageService for GrpcPageServiceHandler {
req: tonic::Request<proto::GetSlruSegmentRequest>,
) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
let received_at = extract::<ReceivedAt>(&req).0;
let timeline = self.get_shard_zero_request_timeline(&req).await?;
let timeline = self.get_request_timeline_shard_zero(&req).await?;
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request, decorate the span, and convert it to a Pagestream request.
Self::ensure_shard_zero(&timeline)?;
let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?;
span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn);

View File

@@ -826,9 +826,9 @@ impl TenantManager {
peek_slot.is_some()
}
/// Returns whether a local slot exists for a child shard of the given tenant and shard count.
/// Note that this just checks for a shard with a larger shard count, and it may not be a
/// direct child of the given shard.
/// Returns whether a local shard exists that's a child of the given tenant shard. Note that
/// this just checks for any shard with a larger shard count, and it may not be a direct child
/// of the given shard (their keyspace may not overlap).
pub(crate) fn has_child_shard(&self, tenant_id: TenantId, shard_index: ShardIndex) -> bool {
match &*self.tenants.read().unwrap() {
TenantsMap::Initializing => false,
@@ -1536,9 +1536,10 @@ impl TenantManager {
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
//
// TODO: keeping the parent as InProgress while spawning the children causes read
// unavailability, as we can't acquire a timeline handle for it. The parent should be
// available for reads until the children are ready -- potentially until *all* subsplits
// across all parent shards are complete and the compute has been notified. See:
// unavailability, as we can't acquire a new timeline handle for it (existing handles appear
// to still work though, even downgraded ones). The parent should be available for reads
// until the children are ready -- potentially until *all* subsplits across all parent
// shards are complete and the compute has been notified. See:
// <https://databricks.atlassian.net/browse/LKB-672>.
drop(tenant);
let mut parent_slot_guard =