pageserver: only retry WaitForActiveTimeout during shard resolution (#12772)

## Problem

In https://github.com/neondatabase/neon/pull/12467, timeouts and retries
were added to `Cache::get` tenant shard resolution to paper over an
issue with read unavailability during shard splits. However, this
retries _all_ errors, including irrecoverable errors like `NotFound`.

This causes problems with gRPC child shard routing in #12702, which
targets specific shards with `ShardSelector::Known` and relies on prompt
`NotFound` errors to reroute requests to child shards. These retries
introduce a ~1s delay (10 retries × 100 ms backoff) for all reads during
child routing.

The broader problem of read unavailability during shard splits is left
as future work, see https://databricks.atlassian.net/browse/LKB-672.

Touches #12702.
Touches [LKB-191](https://databricks.atlassian.net/browse/LKB-191).

## Summary of changes

* Change `TenantManager` to always return a concrete
`GetActiveTimelineError`.
* Only retry `WaitForActiveTimeout` errors.
* Lots of code unindentation due to the simplified error handling.

Out of caution, we do not gate the retries on `ShardSelector`, since
this can trigger other races. Improvements here are left as future work.
This commit is contained in:
Erik Grinaker
2025-07-29 14:33:02 +02:00
committed by GitHub
parent e2411818ef
commit 61f267d8f9
3 changed files with 82 additions and 90 deletions

View File

@@ -466,13 +466,6 @@ impl TimelineHandles {
self.handles self.handles
.get(timeline_id, shard_selector, &self.wrapper) .get(timeline_id, shard_selector, &self.wrapper)
.await .await
.map_err(|e| match e {
timeline::handle::GetError::TenantManager(e) => e,
timeline::handle::GetError::PerTimelineStateShutDown => {
trace!("per-timeline state shut down");
GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
}
})
} }
fn tenant_id(&self) -> Option<TenantId> { fn tenant_id(&self) -> Option<TenantId> {
@@ -488,11 +481,9 @@ pub(crate) struct TenantManagerWrapper {
tenant_id: once_cell::sync::OnceCell<TenantId>, tenant_id: once_cell::sync::OnceCell<TenantId>,
} }
#[derive(Debug)]
pub(crate) struct TenantManagerTypes; pub(crate) struct TenantManagerTypes;
impl timeline::handle::Types for TenantManagerTypes { impl timeline::handle::Types for TenantManagerTypes {
type TenantManagerError = GetActiveTimelineError;
type TenantManager = TenantManagerWrapper; type TenantManager = TenantManagerWrapper;
type Timeline = TenantManagerCacheItem; type Timeline = TenantManagerCacheItem;
} }

View File

@@ -1522,6 +1522,12 @@ impl TenantManager {
self.resources.deletion_queue_client.flush_advisory(); self.resources.deletion_queue_client.flush_advisory();
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
//
// TODO: keeping the parent as InProgress while spawning the children causes read
// unavailability, as we can't acquire a timeline handle for it. The parent should be
// available for reads until the children are ready -- potentially until *all* subsplits
// across all parent shards are complete and the compute has been notified. See:
// <https://databricks.atlassian.net/browse/LKB-672>.
drop(tenant); drop(tenant);
let mut parent_slot_guard = let mut parent_slot_guard =
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?; self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;

View File

@@ -224,11 +224,11 @@ use tracing::{instrument, trace};
use utils::id::TimelineId; use utils::id::TimelineId;
use utils::shard::{ShardIndex, ShardNumber}; use utils::shard::{ShardIndex, ShardNumber};
use crate::tenant::mgr::ShardSelector; use crate::page_service::GetActiveTimelineError;
use crate::tenant::GetTimelineError;
use crate::tenant::mgr::{GetActiveTenantError, ShardSelector};
/// The requirement for Debug is so that #[derive(Debug)] works in some places. pub(crate) trait Types: Sized {
pub(crate) trait Types: Sized + std::fmt::Debug {
type TenantManagerError: Sized + std::fmt::Debug;
type TenantManager: TenantManager<Self> + Sized; type TenantManager: TenantManager<Self> + Sized;
type Timeline: Timeline<Self> + Sized; type Timeline: Timeline<Self> + Sized;
} }
@@ -307,12 +307,11 @@ impl<T: Types> Default for PerTimelineState<T> {
/// Abstract view of [`crate::tenant::mgr`], for testability. /// Abstract view of [`crate::tenant::mgr`], for testability.
pub(crate) trait TenantManager<T: Types> { pub(crate) trait TenantManager<T: Types> {
/// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`]. /// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`].
/// Errors are returned as [`GetError::TenantManager`].
async fn resolve( async fn resolve(
&self, &self,
timeline_id: TimelineId, timeline_id: TimelineId,
shard_selector: ShardSelector, shard_selector: ShardSelector,
) -> Result<T::Timeline, T::TenantManagerError>; ) -> Result<T::Timeline, GetActiveTimelineError>;
} }
/// Abstract view of an [`Arc<Timeline>`], for testability. /// Abstract view of an [`Arc<Timeline>`], for testability.
@@ -322,13 +321,6 @@ pub(crate) trait Timeline<T: Types> {
fn per_timeline_state(&self) -> &PerTimelineState<T>; fn per_timeline_state(&self) -> &PerTimelineState<T>;
} }
/// Errors returned by [`Cache::get`].
#[derive(Debug)]
pub(crate) enum GetError<T: Types> {
TenantManager(T::TenantManagerError),
PerTimelineStateShutDown,
}
/// Internal type used in [`Cache::get`]. /// Internal type used in [`Cache::get`].
enum RoutingResult<T: Types> { enum RoutingResult<T: Types> {
FastPath(Handle<T>), FastPath(Handle<T>),
@@ -345,7 +337,7 @@ impl<T: Types> Cache<T> {
timeline_id: TimelineId, timeline_id: TimelineId,
shard_selector: ShardSelector, shard_selector: ShardSelector,
tenant_manager: &T::TenantManager, tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> { ) -> Result<Handle<T>, GetActiveTimelineError> {
const GET_MAX_RETRIES: usize = 10; const GET_MAX_RETRIES: usize = 10;
const RETRY_BACKOFF: Duration = Duration::from_millis(100); const RETRY_BACKOFF: Duration = Duration::from_millis(100);
let mut attempt = 0; let mut attempt = 0;
@@ -356,7 +348,11 @@ impl<T: Types> Cache<T> {
.await .await
{ {
Ok(handle) => return Ok(handle), Ok(handle) => return Ok(handle),
Err(e) => { Err(
e @ GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
..
}),
) => {
// Retry on tenant manager error to handle tenant split more gracefully // Retry on tenant manager error to handle tenant split more gracefully
if attempt < GET_MAX_RETRIES { if attempt < GET_MAX_RETRIES {
tokio::time::sleep(RETRY_BACKOFF).await; tokio::time::sleep(RETRY_BACKOFF).await;
@@ -370,6 +366,7 @@ impl<T: Types> Cache<T> {
return Err(e); return Err(e);
} }
} }
Err(err) => return Err(err),
} }
} }
} }
@@ -388,7 +385,7 @@ impl<T: Types> Cache<T> {
timeline_id: TimelineId, timeline_id: TimelineId,
shard_selector: ShardSelector, shard_selector: ShardSelector,
tenant_manager: &T::TenantManager, tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> { ) -> Result<Handle<T>, GetActiveTimelineError> {
// terminates because when every iteration we remove an element from the map // terminates because when every iteration we remove an element from the map
let miss: ShardSelector = loop { let miss: ShardSelector = loop {
let routing_state = self.shard_routing(timeline_id, shard_selector); let routing_state = self.shard_routing(timeline_id, shard_selector);
@@ -468,9 +465,8 @@ impl<T: Types> Cache<T> {
timeline_id: TimelineId, timeline_id: TimelineId,
shard_selector: ShardSelector, shard_selector: ShardSelector,
tenant_manager: &T::TenantManager, tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> { ) -> Result<Handle<T>, GetActiveTimelineError> {
match tenant_manager.resolve(timeline_id, shard_selector).await { let timeline = tenant_manager.resolve(timeline_id, shard_selector).await?;
Ok(timeline) => {
let key = timeline.shard_timeline_id(); let key = timeline.shard_timeline_id();
match &shard_selector { match &shard_selector {
ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)), ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
@@ -480,8 +476,7 @@ impl<T: Types> Cache<T> {
trace!("creating new HandleInner"); trace!("creating new HandleInner");
let timeline = Arc::new(timeline); let timeline = Arc::new(timeline);
let handle_inner_arc = let handle_inner_arc = Arc::new(Mutex::new(HandleInner::Open(Arc::clone(&timeline))));
Arc::new(Mutex::new(HandleInner::Open(Arc::clone(&timeline))));
let handle_weak = WeakHandle { let handle_weak = WeakHandle {
inner: Arc::downgrade(&handle_inner_arc), inner: Arc::downgrade(&handle_inner_arc),
}; };
@@ -489,16 +484,17 @@ impl<T: Types> Cache<T> {
.upgrade() .upgrade()
.ok() .ok()
.expect("we just created it and it's not linked anywhere yet"); .expect("we just created it and it's not linked anywhere yet");
{
let mut lock_guard = timeline let mut lock_guard = timeline
.per_timeline_state() .per_timeline_state()
.handles .handles
.lock() .lock()
.expect("mutex poisoned"); .expect("mutex poisoned");
match &mut *lock_guard { let Some(per_timeline_state) = &mut *lock_guard else {
Some(per_timeline_state) => { return Err(GetActiveTimelineError::Timeline(
let replaced = GetTimelineError::ShuttingDown,
per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc)); ));
};
let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc));
assert!(replaced.is_none(), "some earlier code left a stale handle"); assert!(replaced.is_none(), "some earlier code left a stale handle");
match self.map.entry(key) { match self.map.entry(key) {
hash_map::Entry::Occupied(_o) => { hash_map::Entry::Occupied(_o) => {
@@ -512,17 +508,8 @@ impl<T: Types> Cache<T> {
v.insert(handle_weak); v.insert(handle_weak);
} }
} }
}
None => {
return Err(GetError::PerTimelineStateShutDown);
}
}
}
Ok(handle) Ok(handle)
} }
Err(e) => Err(GetError::TenantManager(e)),
}
}
} }
pub(crate) enum HandleUpgradeError { pub(crate) enum HandleUpgradeError {
@@ -655,7 +642,8 @@ mod tests {
use pageserver_api::models::ShardParameters; use pageserver_api::models::ShardParameters;
use pageserver_api::reltag::RelTag; use pageserver_api::reltag::RelTag;
use pageserver_api::shard::DEFAULT_STRIPE_SIZE; use pageserver_api::shard::DEFAULT_STRIPE_SIZE;
use utils::shard::ShardCount; use utils::id::TenantId;
use utils::shard::{ShardCount, TenantShardId};
use utils::sync::gate::GateGuard; use utils::sync::gate::GateGuard;
use super::*; use super::*;
@@ -665,7 +653,6 @@ mod tests {
#[derive(Debug)] #[derive(Debug)]
struct TestTypes; struct TestTypes;
impl Types for TestTypes { impl Types for TestTypes {
type TenantManagerError = anyhow::Error;
type TenantManager = StubManager; type TenantManager = StubManager;
type Timeline = Entered; type Timeline = Entered;
} }
@@ -716,40 +703,48 @@ mod tests {
&self, &self,
timeline_id: TimelineId, timeline_id: TimelineId,
shard_selector: ShardSelector, shard_selector: ShardSelector,
) -> anyhow::Result<Entered> { ) -> Result<Entered, GetActiveTimelineError> {
fn enter_gate(
timeline: &StubTimeline,
) -> Result<Arc<GateGuard>, GetActiveTimelineError> {
Ok(Arc::new(timeline.gate.enter().map_err(|_| {
GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
})?))
}
for timeline in &self.shards { for timeline in &self.shards {
if timeline.id == timeline_id { if timeline.id == timeline_id {
let enter_gate = || {
let gate_guard = timeline.gate.enter()?;
let gate_guard = Arc::new(gate_guard);
anyhow::Ok(gate_guard)
};
match &shard_selector { match &shard_selector {
ShardSelector::Zero if timeline.shard.is_shard_zero() => { ShardSelector::Zero if timeline.shard.is_shard_zero() => {
return Ok(Entered { return Ok(Entered {
timeline: Arc::clone(timeline), timeline: Arc::clone(timeline),
gate_guard: enter_gate()?, gate_guard: enter_gate(timeline)?,
}); });
} }
ShardSelector::Zero => continue, ShardSelector::Zero => continue,
ShardSelector::Page(key) if timeline.shard.is_key_local(key) => { ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
return Ok(Entered { return Ok(Entered {
timeline: Arc::clone(timeline), timeline: Arc::clone(timeline),
gate_guard: enter_gate()?, gate_guard: enter_gate(timeline)?,
}); });
} }
ShardSelector::Page(_) => continue, ShardSelector::Page(_) => continue,
ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => { ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
return Ok(Entered { return Ok(Entered {
timeline: Arc::clone(timeline), timeline: Arc::clone(timeline),
gate_guard: enter_gate()?, gate_guard: enter_gate(timeline)?,
}); });
} }
ShardSelector::Known(_) => continue, ShardSelector::Known(_) => continue,
} }
} }
} }
anyhow::bail!("not found") Err(GetActiveTimelineError::Timeline(
GetTimelineError::NotFound {
tenant_id: TenantShardId::unsharded(TenantId::from([0; 16])),
timeline_id,
},
))
} }
} }