diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d416153d22..d4247734fc 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -58,8 +58,6 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id; use crate::task_mgr; use crate::task_mgr::TaskKind; -use crate::tenant::mgr; -use crate::tenant::mgr::get_active_tenant_with_timeout; use crate::tenant::mgr::GetActiveTenantError; use crate::tenant::mgr::ShardSelector; use crate::tenant::mgr::TenantManager; @@ -554,13 +552,15 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); - let tenant = mgr::get_active_tenant_with_timeout( - tenant_id, - ShardSelector::First, - ACTIVE_TENANT_TIMEOUT, - &task_mgr::shutdown_token(), - ) - .await?; + let tenant = self + .tenant_manager + .get_active_tenant_with_timeout( + tenant_id, + ShardSelector::First, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await?; // Make request tracer if needed let mut tracer = if tenant.get_trace_read_requests() { @@ -728,13 +728,15 @@ impl PageServerHandler { // Create empty timeline info!("creating new timeline"); - let tenant = get_active_tenant_with_timeout( - tenant_id, - ShardSelector::Zero, - ACTIVE_TENANT_TIMEOUT, - &task_mgr::shutdown_token(), - ) - .await?; + let tenant = self + .tenant_manager + .get_active_tenant_with_timeout( + tenant_id, + ShardSelector::Zero, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await?; let timeline = tenant .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) .await?; @@ -1372,14 +1374,16 @@ impl PageServerHandler { timeline_id: TimelineId, selector: ShardSelector, ) -> Result<Arc<Timeline>, GetActiveTimelineError> { - let tenant = get_active_tenant_with_timeout( - tenant_id, - selector, - ACTIVE_TENANT_TIMEOUT, - &task_mgr::shutdown_token(), - ) - .await - 
.map_err(GetActiveTimelineError::Tenant)?; + let tenant = self + .tenant_manager + .get_active_tenant_with_timeout( + tenant_id, + selector, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await + .map_err(GetActiveTimelineError::Tenant)?; let timeline = tenant.get_timeline(timeline_id, true)?; set_tracing_field_shard_id(&timeline); Ok(timeline) @@ -1773,13 +1777,15 @@ where self.check_permission(Some(tenant_id))?; - let tenant = get_active_tenant_with_timeout( - tenant_id, - ShardSelector::Zero, - ACTIVE_TENANT_TIMEOUT, - &task_mgr::shutdown_token(), - ) - .await?; + let tenant = self + .tenant_manager + .get_active_tenant_with_timeout( + tenant_id, + ShardSelector::Zero, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"checkpoint_distance"), RowDescriptor::int8_col(b"checkpoint_timeout"), diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 39d8eded41..dff920a634 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -2132,6 +2132,68 @@ impl TenantManager { Ok(reparented) } + + /// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`] + /// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`], + /// then wait for up to `timeout` (minus however long we waited for the slot). + pub(crate) async fn get_active_tenant_with_timeout( + &self, + tenant_id: TenantId, + shard_selector: ShardSelector, + timeout: Duration, + cancel: &CancellationToken, + ) -> Result<Arc<Tenant>, GetActiveTenantError> { + let wait_start = Instant::now(); + let deadline = wait_start + timeout; + + // Resolve TenantId to TenantShardId. This is usually a quick one-shot thing, the loop is + // for handling the rare case that the slot we're accessing is InProgress. 
+ let tenant_shard = loop { + let resolved = { + let locked = TENANTS.read().unwrap(); + locked.resolve_attached_shard(&tenant_id, shard_selector) + }; + match resolved { + ShardResolveResult::Found(tenant_shard) => break tenant_shard, + ShardResolveResult::NotFound => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound( + tenant_id, + ))); + } + ShardResolveResult::InProgress(barrier) => { + // We can't authoritatively answer right now: wait for InProgress state + // to end, then try again + match timeout_cancellable( + deadline.duration_since(Instant::now()), + cancel, + barrier.wait(), + ) + .await + { + Ok(_) => { + // The barrier completed: proceed around the loop to try looking up again + continue; + } + Err(TimeoutCancellableError::Timeout) => { + return Err(GetActiveTenantError::WaitForActiveTimeout { + latest_state: None, + wait_time: timeout, + }); + } + Err(TimeoutCancellableError::Cancelled) => { + return Err(GetActiveTenantError::Cancelled); + } + } + } + }; + }; + + tracing::debug!("Waiting for tenant to enter active state..."); + tenant_shard + .wait_to_become_active(deadline.duration_since(Instant::now())) + .await?; + Ok(tenant_shard) + } } #[derive(Debug, thiserror::Error)] @@ -2180,67 +2242,6 @@ pub(crate) enum GetActiveTenantError { Broken(String), } -/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`] -/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`], -/// then wait for up to `timeout` (minus however long we waited for the slot). -pub(crate) async fn get_active_tenant_with_timeout( - tenant_id: TenantId, - shard_selector: ShardSelector, - timeout: Duration, - cancel: &CancellationToken, -) -> Result<Arc<Tenant>, GetActiveTenantError> { - let wait_start = Instant::now(); - let deadline = wait_start + timeout; - - // Resolve TenantId to TenantShardId. 
This is usually a quick one-shot thing, the loop is - // for handling the rare case that the slot we're accessing is InProgress. - let tenant_shard = loop { - let resolved = { - let locked = TENANTS.read().unwrap(); - locked.resolve_attached_shard(&tenant_id, shard_selector) - }; - match resolved { - ShardResolveResult::Found(tenant_shard) => break tenant_shard, - ShardResolveResult::NotFound => { - return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound( - tenant_id, - ))); - } - ShardResolveResult::InProgress(barrier) => { - // We can't authoritatively answer right now: wait for InProgress state - // to end, then try again - match timeout_cancellable( - deadline.duration_since(Instant::now()), - cancel, - barrier.wait(), - ) - .await - { - Ok(_) => { - // The barrier completed: proceed around the loop to try looking up again - continue; - } - Err(TimeoutCancellableError::Timeout) => { - return Err(GetActiveTenantError::WaitForActiveTimeout { - latest_state: None, - wait_time: timeout, - }); - } - Err(TimeoutCancellableError::Cancelled) => { - return Err(GetActiveTenantError::Cancelled); - } - } - } - }; - }; - - tracing::debug!("Waiting for tenant to enter active state..."); - tenant_shard - .wait_to_become_active(deadline.duration_since(Instant::now())) - .await?; - Ok(tenant_shard) -} - #[derive(Debug, thiserror::Error)] pub(crate) enum DeleteTimelineError { #[error("Tenant {0}")]