diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index ef564ce710..87f293b4fe 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -14,7 +14,6 @@ use async_compression::tokio::write::GzipEncoder; use bytes::Buf; use bytes::Bytes; use futures::Stream; -use pageserver_api::models::TenantState; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, @@ -55,16 +54,18 @@ use crate::metrics; use crate::metrics::LIVE_CONNECTIONS_COUNT; use crate::task_mgr; use crate::task_mgr::TaskKind; -use crate::tenant; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::mgr; -use crate::tenant::mgr::GetTenantError; -use crate::tenant::{Tenant, Timeline}; +use crate::tenant::mgr::get_active_tenant_with_timeout; +use crate::tenant::mgr::GetActiveTenantError; +use crate::tenant::Timeline; use crate::trace::Tracer; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; +const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_secs(10); + /// Read the end of a tar archive. /// /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each. @@ -389,7 +390,7 @@ impl PageServerHandler { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Make request tracer if needed - let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; + let tenant = mgr::get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT).await?; let mut tracer = if tenant.get_trace_read_requests() { let connection_id = ConnectionId::generate(); let path = tenant @@ -525,7 +526,7 @@ impl PageServerHandler { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Create empty timeline info!("creating new timeline"); - let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; + let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT).await?; let timeline = tenant .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) .await?; @@ -584,7 +585,7 @@ impl PageServerHandler { debug_assert_current_span_has_tenant_and_timeline_id(); task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); - let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; + let timeline = get_active_tenant_timeline(tenant_id, timeline_id).await?; let last_record_lsn = timeline.get_last_record_lsn(); if last_record_lsn != start_lsn { return Err(QueryError::Other( @@ -792,7 +793,7 @@ impl PageServerHandler { let started = std::time::Instant::now(); // check that the timeline exists - let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; + let timeline = get_active_tenant_timeline(tenant_id, timeline_id).await?; let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); if let Some(lsn) = lsn { // Backup was requested at a particular LSN. Wait for it to arrive. @@ -1048,7 +1049,7 @@ where .record("timeline_id", field::display(timeline_id)); self.check_permission(Some(tenant_id))?; - let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; + let timeline = get_active_tenant_timeline(tenant_id, timeline_id).await?; let end_of_timeline = timeline.get_last_record_rlsn(); @@ -1232,7 +1233,7 @@ where self.check_permission(Some(tenant_id))?; - let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; + let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT).await?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"checkpoint_distance"), RowDescriptor::int8_col(b"checkpoint_timeout"), @@ -1278,21 +1279,6 @@ where } } -#[derive(thiserror::Error, Debug)] -enum GetActiveTenantError { - #[error( - "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}" - )] - WaitForActiveTimeout { - latest_state: TenantState, - wait_time: Duration, - }, - #[error(transparent)] - NotFound(GetTenantError), - #[error(transparent)] - WaitTenantActive(tenant::WaitToBecomeActiveError), -} - impl From for QueryError { fn from(e: GetActiveTenantError) -> Self { match e { @@ -1305,47 +1291,6 @@ impl From for QueryError { } } -/// Get active tenant. -/// -/// If the tenant is Loading, waits for it to become Active, for up to 30 s. That -/// ensures that queries don't fail immediately after pageserver startup, because -/// all tenants are still loading. -async fn get_active_tenant_with_timeout( - tenant_id: TenantId, - _ctx: &RequestContext, /* require get a context to support cancellation in the future */ -) -> Result, GetActiveTenantError> { - let tenant = match mgr::get_tenant(tenant_id, false) { - Ok(tenant) => tenant, - Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)), - Err(GetTenantError::NotActive(_)) => { - unreachable!("we're calling get_tenant with active_only=false") - } - Err(GetTenantError::Broken(_)) => { - unreachable!("we're calling get_tenant with active_only=false") - } - Err(GetTenantError::MapState(_)) => { - unreachable!("TenantManager is initialized before page service starts") - } - }; - let wait_time = Duration::from_secs(30); - match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await { - Ok(Ok(())) => Ok(tenant), - // no .context(), the error message is good enough and some tests depend on it - Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)), - Err(_) => { - let latest_state = tenant.current_state(); - if latest_state == TenantState::Active { - Ok(tenant) - } else { - Err(GetActiveTenantError::WaitForActiveTimeout { - latest_state, - wait_time, - }) - } - } - } -} - #[derive(Debug, thiserror::Error)] enum GetActiveTimelineError { #[error(transparent)] @@ -1367,9 +1312,8 @@ impl From for QueryError { async fn get_active_tenant_timeline( tenant_id: TenantId, timeline_id: TimelineId, - ctx: &RequestContext, ) -> Result, GetActiveTimelineError> { - let tenant = get_active_tenant_with_timeout(tenant_id, ctx) + let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT) .await .map_err(GetActiveTimelineError::Tenant)?; let timeline = tenant diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 914098a1cf..08cd389f45 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -7,6 +7,7 @@ use std::borrow::Cow; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; +use std::time::{Duration, Instant}; use tokio::fs; use anyhow::Context; @@ -939,6 +940,104 @@ pub(crate) fn get_tenant( } } +#[derive(thiserror::Error, Debug)] +pub(crate) enum GetActiveTenantError { + #[error( + "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}" + )] + WaitForActiveTimeout { + latest_state: TenantState, + wait_time: Duration, + }, + #[error(transparent)] + NotFound(#[from] GetTenantError), + #[error(transparent)] + WaitTenantActive(crate::tenant::WaitToBecomeActiveError), +} + +/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`] +/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`], +/// then wait for up to `timeout` (minus however long we waited for the slot). +pub(crate) async fn get_active_tenant_with_timeout( + tenant_id: TenantId, + mut timeout: Duration, +) -> Result, GetActiveTenantError> { + enum WaitFor { + Barrier(utils::completion::Barrier), + Tenant(Arc), + } + + let wait_for = { + let locked = TENANTS.read().unwrap(); + let peek_slot = + tenant_map_peek_slot(&locked, &tenant_id, true).map_err(GetTenantError::MapState)?; + match peek_slot { + Some(TenantSlot::Attached(tenant)) => { + match tenant.current_state() { + TenantState::Active => { + // Fast path: we don't need to do any async waiting. + return Ok(tenant.clone()); + } + _ => WaitFor::Tenant(tenant.clone()), + } + } + Some(TenantSlot::Secondary) => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( + tenant_id, + ))) + } + Some(TenantSlot::InProgress(barrier)) => WaitFor::Barrier(barrier.clone()), + None => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound( + tenant_id, + ))) + } + } + }; + + tracing::debug!("Waiting for tenant InProgress state to pass..."); + let tenant = match wait_for { + WaitFor::Barrier(barrier) => { + let wait_start = Instant::now(); + barrier.wait().await; + let wait_duration = Instant::now().duration_since(wait_start); + timeout -= wait_duration; + { + let locked = TENANTS.read().unwrap(); + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, true) + .map_err(GetTenantError::MapState)?; + match peek_slot { + Some(TenantSlot::Attached(tenant)) => tenant.clone(), + _ => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( + tenant_id, + ))) + } + } + } + } + WaitFor::Tenant(tenant) => tenant, + }; + + tracing::debug!("Waiting for tenant to enter active state..."); + match tokio::time::timeout(timeout, tenant.wait_to_become_active()).await { + Ok(Ok(())) => Ok(tenant), + // no .context(), the error message is good enough and some tests depend on it + Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)), + Err(_) => { + let latest_state = tenant.current_state(); + if latest_state == TenantState::Active { + Ok(tenant) + } else { + Err(GetActiveTenantError::WaitForActiveTimeout { + latest_state, + wait_time: timeout, + }) + } + } + } +} + pub(crate) async fn delete_tenant( conf: &'static PageServerConf, remote_storage: Option,