mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
pageserver: wait for InProgress slots in page service
This commit is contained in:
@@ -14,7 +14,6 @@ use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
use bytes::Bytes;
|
||||
use futures::Stream;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
|
||||
@@ -55,16 +54,18 @@ use crate::metrics;
|
||||
use crate::metrics::LIVE_CONNECTIONS_COUNT;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant;
|
||||
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::GetTenantError;
|
||||
use crate::tenant::{Tenant, Timeline};
|
||||
use crate::tenant::mgr::get_active_tenant_with_timeout;
|
||||
use crate::tenant::mgr::GetActiveTenantError;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::trace::Tracer;
|
||||
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Read the end of a tar archive.
|
||||
///
|
||||
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
|
||||
@@ -389,7 +390,7 @@ impl PageServerHandler {
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
|
||||
// Make request tracer if needed
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
let tenant = mgr::get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT).await?;
|
||||
let mut tracer = if tenant.get_trace_read_requests() {
|
||||
let connection_id = ConnectionId::generate();
|
||||
let path = tenant
|
||||
@@ -525,7 +526,7 @@ impl PageServerHandler {
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT).await?;
|
||||
let timeline = tenant
|
||||
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
|
||||
.await?;
|
||||
@@ -584,7 +585,7 @@ impl PageServerHandler {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id).await?;
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
if last_record_lsn != start_lsn {
|
||||
return Err(QueryError::Other(
|
||||
@@ -792,7 +793,7 @@ impl PageServerHandler {
|
||||
let started = std::time::Instant::now();
|
||||
|
||||
// check that the timeline exists
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id).await?;
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
if let Some(lsn) = lsn {
|
||||
// Backup was requested at a particular LSN. Wait for it to arrive.
|
||||
@@ -1048,7 +1049,7 @@ where
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id).await?;
|
||||
|
||||
let end_of_timeline = timeline.get_last_record_rlsn();
|
||||
|
||||
@@ -1232,7 +1233,7 @@ where
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT).await?;
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"checkpoint_distance"),
|
||||
RowDescriptor::int8_col(b"checkpoint_timeout"),
|
||||
@@ -1278,21 +1279,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum GetActiveTenantError {
|
||||
#[error(
|
||||
"Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
|
||||
)]
|
||||
WaitForActiveTimeout {
|
||||
latest_state: TenantState,
|
||||
wait_time: Duration,
|
||||
},
|
||||
#[error(transparent)]
|
||||
NotFound(GetTenantError),
|
||||
#[error(transparent)]
|
||||
WaitTenantActive(tenant::WaitToBecomeActiveError),
|
||||
}
|
||||
|
||||
impl From<GetActiveTenantError> for QueryError {
|
||||
fn from(e: GetActiveTenantError) -> Self {
|
||||
match e {
|
||||
@@ -1305,47 +1291,6 @@ impl From<GetActiveTenantError> for QueryError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get active tenant.
|
||||
///
|
||||
/// If the tenant is Loading, waits for it to become Active, for up to 30 s. That
|
||||
/// ensures that queries don't fail immediately after pageserver startup, because
|
||||
/// all tenants are still loading.
|
||||
async fn get_active_tenant_with_timeout(
|
||||
tenant_id: TenantId,
|
||||
_ctx: &RequestContext, /* require get a context to support cancellation in the future */
|
||||
) -> Result<Arc<Tenant>, GetActiveTenantError> {
|
||||
let tenant = match mgr::get_tenant(tenant_id, false) {
|
||||
Ok(tenant) => tenant,
|
||||
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
|
||||
Err(GetTenantError::NotActive(_)) => {
|
||||
unreachable!("we're calling get_tenant with active_only=false")
|
||||
}
|
||||
Err(GetTenantError::Broken(_)) => {
|
||||
unreachable!("we're calling get_tenant with active_only=false")
|
||||
}
|
||||
Err(GetTenantError::MapState(_)) => {
|
||||
unreachable!("TenantManager is initialized before page service starts")
|
||||
}
|
||||
};
|
||||
let wait_time = Duration::from_secs(30);
|
||||
match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
|
||||
Ok(Ok(())) => Ok(tenant),
|
||||
// no .context(), the error message is good enough and some tests depend on it
|
||||
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
|
||||
Err(_) => {
|
||||
let latest_state = tenant.current_state();
|
||||
if latest_state == TenantState::Active {
|
||||
Ok(tenant)
|
||||
} else {
|
||||
Err(GetActiveTenantError::WaitForActiveTimeout {
|
||||
latest_state,
|
||||
wait_time,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum GetActiveTimelineError {
|
||||
#[error(transparent)]
|
||||
@@ -1367,9 +1312,8 @@ impl From<GetActiveTimelineError> for QueryError {
|
||||
async fn get_active_tenant_timeline(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT)
|
||||
.await
|
||||
.map_err(GetActiveTimelineError::Tenant)?;
|
||||
let timeline = tenant
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::fs;
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -939,6 +940,104 @@ pub(crate) fn get_tenant(
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum GetActiveTenantError {
|
||||
#[error(
|
||||
"Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
|
||||
)]
|
||||
WaitForActiveTimeout {
|
||||
latest_state: TenantState,
|
||||
wait_time: Duration,
|
||||
},
|
||||
#[error(transparent)]
|
||||
NotFound(#[from] GetTenantError),
|
||||
#[error(transparent)]
|
||||
WaitTenantActive(crate::tenant::WaitToBecomeActiveError),
|
||||
}
|
||||
|
||||
/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
|
||||
/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
|
||||
/// then wait for up to `timeout` (minus however long we waited for the slot).
|
||||
pub(crate) async fn get_active_tenant_with_timeout(
|
||||
tenant_id: TenantId,
|
||||
mut timeout: Duration,
|
||||
) -> Result<Arc<Tenant>, GetActiveTenantError> {
|
||||
enum WaitFor {
|
||||
Barrier(utils::completion::Barrier),
|
||||
Tenant(Arc<Tenant>),
|
||||
}
|
||||
|
||||
let wait_for = {
|
||||
let locked = TENANTS.read().unwrap();
|
||||
let peek_slot =
|
||||
tenant_map_peek_slot(&locked, &tenant_id, true).map_err(GetTenantError::MapState)?;
|
||||
match peek_slot {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
match tenant.current_state() {
|
||||
TenantState::Active => {
|
||||
// Fast path: we don't need to do any async waiting.
|
||||
return Ok(tenant.clone());
|
||||
}
|
||||
_ => WaitFor::Tenant(tenant.clone()),
|
||||
}
|
||||
}
|
||||
Some(TenantSlot::Secondary) => {
|
||||
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
|
||||
tenant_id,
|
||||
)))
|
||||
}
|
||||
Some(TenantSlot::InProgress(barrier)) => WaitFor::Barrier(barrier.clone()),
|
||||
None => {
|
||||
return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
|
||||
tenant_id,
|
||||
)))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!("Waiting for tenant InProgress state to pass...");
|
||||
let tenant = match wait_for {
|
||||
WaitFor::Barrier(barrier) => {
|
||||
let wait_start = Instant::now();
|
||||
barrier.wait().await;
|
||||
let wait_duration = Instant::now().duration_since(wait_start);
|
||||
timeout -= wait_duration;
|
||||
{
|
||||
let locked = TENANTS.read().unwrap();
|
||||
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, true)
|
||||
.map_err(GetTenantError::MapState)?;
|
||||
match peek_slot {
|
||||
Some(TenantSlot::Attached(tenant)) => tenant.clone(),
|
||||
_ => {
|
||||
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
|
||||
tenant_id,
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
WaitFor::Tenant(tenant) => tenant,
|
||||
};
|
||||
|
||||
tracing::debug!("Waiting for tenant to enter active state...");
|
||||
match tokio::time::timeout(timeout, tenant.wait_to_become_active()).await {
|
||||
Ok(Ok(())) => Ok(tenant),
|
||||
// no .context(), the error message is good enough and some tests depend on it
|
||||
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
|
||||
Err(_) => {
|
||||
let latest_state = tenant.current_state();
|
||||
if latest_state == TenantState::Active {
|
||||
Ok(tenant)
|
||||
} else {
|
||||
Err(GetActiveTenantError::WaitForActiveTimeout {
|
||||
latest_state,
|
||||
wait_time: timeout,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
|
||||
Reference in New Issue
Block a user