From f901cd459ba823e6dbf217e8d227d632c4e64813 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 14 May 2024 22:07:14 +0100 Subject: [PATCH] pageserver: refine tenant_id->shard lookup --- pageserver/src/tenant/mgr.rs | 149 ++++++++++++++++------------------- 1 file changed, 69 insertions(+), 80 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 7a3e36bf02..fe00c8976d 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -119,6 +119,7 @@ pub(crate) enum TenantsMapRemoveResult { /// When resolving a TenantId to a shard, we may be looking for the 0th /// shard, or we might be looking for whichever shard holds a particular page. +#[derive(Copy, Clone)] pub(crate) enum ShardSelector { /// Only return the 0th shard, if it is present. If a non-0th shard is present, /// ignore it. @@ -169,6 +170,14 @@ impl TenantStartupMode { } } +/// Result type for looking up a TenantId to a specific shard +enum ShardResolveResult { + NotFound, + Found(Arc<Tenant>), + // Wait for this barrier, then query again + InProgress(utils::completion::Barrier), +} + impl TenantsMap { /// Convenience function for typical usage, where we want to get a `Tenant` object, for /// working with attached tenants. If the TenantId is in the map but in Secondary state, @@ -184,26 +193,40 @@ impl TenantsMap { /// A page service client sends a TenantId, and to look up the correct Tenant we must /// resolve this to a fully qualified TenantShardId. + /// + /// During shard splits: we shall see parent shards in InProgress state and skip them, and + /// instead match on child shards which should appear in Attached state. Very early in a shard + /// split, or in other cases where a shard is InProgress, we will return our own InProgress result + /// to instruct the caller to wait for that to finish before querying again. 
fn resolve_attached_shard( &self, tenant_id: &TenantId, selector: ShardSelector, - ) -> Option<TenantShardId> { + ) -> ShardResolveResult { let mut want_shard = None; + let mut any_in_progress = None; + match self { - TenantsMap::Initializing => None, + TenantsMap::Initializing => ShardResolveResult::NotFound, TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { for slot in m.range(TenantShardId::tenant_range(*tenant_id)) { // Ignore all slots that don't contain an attached tenant let tenant = match &slot.1 { TenantSlot::Attached(t) => t, + TenantSlot::InProgress(barrier) => { + // We might still find a usable shard, but in case we don't, remember that + // we saw at least one InProgress slot, so that we can distinguish this case + // from a simple NotFound in our return value. + any_in_progress = Some(barrier.clone()); + continue; + } _ => continue, }; match selector { - ShardSelector::First => return Some(*slot.0), + ShardSelector::First => return ShardResolveResult::Found(tenant.clone()), ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => { - return Some(*slot.0) + return ShardResolveResult::Found(tenant.clone()) } ShardSelector::Page(key) => { // First slot we see for this tenant, calculate the expected shard number @@ -214,7 +237,7 @@ impl TenantsMap { } if Some(tenant.shard_identity.number) == want_shard { - return Some(*slot.0); + return ShardResolveResult::Found(tenant.clone()); } } _ => continue, @@ -222,7 +245,11 @@ impl TenantsMap { } // Fall through: we didn't find an acceptable shard - None + if let Some(barrier) = any_in_progress { + ShardResolveResult::InProgress(barrier) + } else { + ShardResolveResult::NotFound + } } } } @@ -2160,94 +2187,56 @@ pub(crate) async fn get_active_tenant_with_timeout( timeout: Duration, cancel: &CancellationToken, ) -> Result<Arc<Tenant>, GetActiveTenantError> { - enum WaitFor { - Barrier(utils::completion::Barrier), - Tenant(Arc<Tenant>), - } - let wait_start = Instant::now(); let deadline = wait_start + timeout; - let (wait_for, 
tenant_shard_id) = { - let locked = TENANTS.read().unwrap(); - - // Resolve TenantId to TenantShardId - let tenant_shard_id = locked - .resolve_attached_shard(&tenant_id, shard_selector) - .ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound( - tenant_id, - )))?; - - let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) - .map_err(GetTenantError::MapState)?; - match peek_slot { - Some(TenantSlot::Attached(tenant)) => { - match tenant.current_state() { - TenantState::Active => { - // Fast path: we don't need to do any async waiting. - return Ok(tenant.clone()); - } - _ => { - tenant.activate_now(); - (WaitFor::Tenant(tenant.clone()), tenant_shard_id) - } - } - } - Some(TenantSlot::Secondary(_)) => { - return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( - tenant_shard_id, - ))) - } - Some(TenantSlot::InProgress(barrier)) => { - (WaitFor::Barrier(barrier.clone()), tenant_shard_id) - } - None => { + // Resolve TenantId to TenantShardId. This is usually a quick one-shot thing, the loop is + // for handling the rare case that the slot we're accessing is InProgress. 
+ let tenant_shard = loop { + let resolved = { + let locked = TENANTS.read().unwrap(); + locked.resolve_attached_shard(&tenant_id, shard_selector) + }; + match resolved { + ShardResolveResult::Found(tsid) => break tsid, + ShardResolveResult::NotFound => { return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound( tenant_id, - ))) + ))); } - } - }; - - let tenant = match wait_for { - WaitFor::Barrier(barrier) => { - tracing::debug!("Waiting for tenant InProgress state to pass..."); - timeout_cancellable( - deadline.duration_since(Instant::now()), - cancel, - barrier.wait(), - ) - .await - .map_err(|e| match e { - TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout { - latest_state: None, - wait_time: wait_start.elapsed(), - }, - TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled, - })?; - { - let locked = TENANTS.read().unwrap(); - let peek_slot = - tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) - .map_err(GetTenantError::MapState)?; - match peek_slot { - Some(TenantSlot::Attached(tenant)) => tenant.clone(), - _ => { - return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( - tenant_shard_id, - ))) + ShardResolveResult::InProgress(barrier) => { + // We can't authoritatively answer right now: wait for InProgress state + // to end, then try again + match timeout_cancellable( + deadline.duration_since(Instant::now()), + cancel, + barrier.wait(), + ) + .await + { + Ok(_) => { + // The barrier completed: proceed around the loop to try looking up again + continue; + } + Err(TimeoutCancellableError::Timeout) => { + return Err(GetActiveTenantError::WaitForActiveTimeout { + latest_state: None, + wait_time: timeout, + }); + } + Err(TimeoutCancellableError::Cancelled) => { + return Err(GetActiveTenantError::Cancelled); } } } - } - WaitFor::Tenant(tenant) => tenant, + }; }; tracing::debug!("Waiting for tenant to enter active state..."); - tenant + tenant_shard 
.wait_to_become_active(deadline.duration_since(Instant::now())) .await?; - Ok(tenant) + Ok(tenant_shard) } #[derive(Debug, thiserror::Error)]