diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index a04195e12b..ba5b2608bd 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -647,17 +647,20 @@ fn start_pageserver(
             None,
             "libpq endpoint listener",
             true,
-            async move {
-                page_service::libpq_listener_main(
-                    conf,
-                    broker_client,
-                    pg_auth,
-                    pageserver_listener,
-                    conf.pg_auth_type,
-                    libpq_ctx,
-                    task_mgr::shutdown_token(),
-                )
-                .await
+            {
+                let tenant_manager = tenant_manager.clone();
+                async move {
+                    page_service::libpq_listener_main(
+                        tenant_manager,
+                        broker_client,
+                        pg_auth,
+                        pageserver_listener,
+                        conf.pg_auth_type,
+                        libpq_ctx,
+                        task_mgr::shutdown_token(),
+                    )
+                    .await
+                }
             },
         );
     }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index f6b251283c..35aba044b2 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -32,6 +32,7 @@ use std::str;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
+use std::time::Instant;
 use tokio::io::AsyncWriteExt;
 use tokio::io::{AsyncRead, AsyncWrite};
 use tokio_util::io::StreamReader;
@@ -49,7 +50,6 @@ use utils::{
 use crate::auth::check_permission;
 use crate::basebackup;
 use crate::basebackup::BasebackupError;
-use crate::config::PageServerConf;
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::import_datadir::import_wal_from_tar;
 use crate::metrics;
@@ -59,13 +59,15 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
 use crate::task_mgr;
 use crate::task_mgr::TaskKind;
-use crate::tenant::mgr;
-use crate::tenant::mgr::get_active_tenant_with_timeout;
 use crate::tenant::mgr::GetActiveTenantError;
+use crate::tenant::mgr::GetTenantError;
+use crate::tenant::mgr::ShardResolveResult;
 use crate::tenant::mgr::ShardSelector;
+use crate::tenant::mgr::TenantManager;
 use crate::tenant::timeline::WaitLsnError;
 use crate::tenant::GetTimelineError;
 use crate::tenant::PageReconstructError;
+use crate::tenant::Tenant;
 use crate::tenant::Timeline;
 use crate::trace::Tracer;
 use pageserver_api::key::rel_block_to_key;
@@ -135,7 +137,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()
 /// Listens for connections, and launches a new handler task for each.
 ///
 pub async fn libpq_listener_main(
-    conf: &'static PageServerConf,
+    tenant_manager: Arc<TenantManager>,
     broker_client: storage_broker::BrokerClientChannel,
     auth: Option<Arc<SwappableJwtAuth>>,
     listener: TcpListener,
@@ -180,7 +182,7 @@ pub async fn libpq_listener_main(
             "serving compute connection task",
             false,
             page_service_conn_main(
-                conf,
+                tenant_manager.clone(),
                 broker_client.clone(),
                 local_auth,
                 socket,
@@ -203,7 +205,7 @@ pub async fn libpq_listener_main(
 
 #[instrument(skip_all, fields(peer_addr))]
 async fn page_service_conn_main(
-    conf: &'static PageServerConf,
+    tenant_manager: Arc<TenantManager>,
     broker_client: storage_broker::BrokerClientChannel,
     auth: Option<Arc<SwappableJwtAuth>>,
     socket: tokio::net::TcpStream,
@@ -260,7 +262,8 @@ async fn page_service_conn_main(
     // and create a child per-query context when it invokes process_query.
     // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
     // and create the per-query context in process_query ourselves.
-    let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
+    let mut conn_handler =
+        PageServerHandler::new(tenant_manager, broker_client, auth, connection_ctx);
     let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
 
     match pgbackend
@@ -291,11 +294,12 @@ struct HandlerTimeline {
 }
 
 struct PageServerHandler {
-    _conf: &'static PageServerConf,
     broker_client: storage_broker::BrokerClientChannel,
     auth: Option<Arc<SwappableJwtAuth>>,
     claims: Option<Claims>,
 
+    tenant_manager: Arc<TenantManager>,
+
     /// The context created for the lifetime of the connection
     /// services by this PageServerHandler.
     /// For each query received over the connection,
@@ -381,13 +385,13 @@ impl From for QueryError {
 
 impl PageServerHandler {
     pub fn new(
-        conf: &'static PageServerConf,
+        tenant_manager: Arc<TenantManager>,
         broker_client: storage_broker::BrokerClientChannel,
         auth: Option<Arc<SwappableJwtAuth>>,
         connection_ctx: RequestContext,
     ) -> Self {
         PageServerHandler {
-            _conf: conf,
+            tenant_manager,
             broker_client,
             auth,
             claims: None,
@@ -552,13 +556,9 @@ impl PageServerHandler {
     {
         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
 
-        let tenant = mgr::get_active_tenant_with_timeout(
-            tenant_id,
-            ShardSelector::First,
-            ACTIVE_TENANT_TIMEOUT,
-            &task_mgr::shutdown_token(),
-        )
-        .await?;
+        let tenant = self
+            .get_active_tenant_with_timeout(tenant_id, ShardSelector::First, ACTIVE_TENANT_TIMEOUT)
+            .await?;
 
         // Make request tracer if needed
         let mut tracer = if tenant.get_trace_read_requests() {
@@ -726,13 +726,9 @@ impl PageServerHandler {
 
         // Create empty timeline
         info!("creating new timeline");
-        let tenant = get_active_tenant_with_timeout(
-            tenant_id,
-            ShardSelector::Zero,
-            ACTIVE_TENANT_TIMEOUT,
-            &task_mgr::shutdown_token(),
-        )
-        .await?;
+        let tenant = self
+            .get_active_tenant_with_timeout(tenant_id, ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT)
+            .await?;
         let timeline = tenant
             .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
             .await?;
@@ -1370,18 +1366,69 @@ impl PageServerHandler {
         timeline_id: TimelineId,
         selector: ShardSelector,
     ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
-        let tenant = get_active_tenant_with_timeout(
-            tenant_id,
-            selector,
-            ACTIVE_TENANT_TIMEOUT,
-            &task_mgr::shutdown_token(),
-        )
-        .await
-        .map_err(GetActiveTimelineError::Tenant)?;
+        let tenant = self
+            .get_active_tenant_with_timeout(tenant_id, selector, ACTIVE_TENANT_TIMEOUT)
+            .await
+            .map_err(GetActiveTimelineError::Tenant)?;
         let timeline = tenant.get_timeline(timeline_id, true)?;
         set_tracing_field_shard_id(&timeline);
         Ok(timeline)
     }
+
+    /// Get a shard's [`Tenant`] in its active state, if present. If we don't find the shard and some
+    /// slots for this tenant are `InProgress` then we will wait.
+    /// If we find the [`Tenant`] and it's not yet in state [`TenantState::Active`], we will wait.
+    ///
+    /// `timeout` is used as a total timeout for the whole wait operation.
+    async fn get_active_tenant_with_timeout(
+        &self,
+        tenant_id: TenantId,
+        shard_selector: ShardSelector,
+        timeout: Duration,
+    ) -> Result<Arc<Tenant>, GetActiveTenantError> {
+        let wait_start = Instant::now();
+        let deadline = wait_start + timeout;
+
+        // Resolve TenantId to TenantShardId. This is usually a quick one-shot thing, the loop is
+        // for handling the rare case that the slot we're accessing is InProgress.
+        let tenant_shard = loop {
+            let resolved = self
+                .tenant_manager
+                .resolve_attached_shard(&tenant_id, shard_selector);
+            match resolved {
+                ShardResolveResult::Found(tenant_shard) => break tenant_shard,
+                ShardResolveResult::NotFound => {
+                    return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
+                        tenant_id,
+                    )));
+                }
+                ShardResolveResult::InProgress(barrier) => {
+                    // We can't authoritatively answer right now: wait for InProgress state
+                    // to end, then try again
+                    tokio::select! {
+                        _ = self.await_connection_cancelled() => {
+                            return Err(GetActiveTenantError::Cancelled)
+                        },
+                        _ = barrier.wait() => {
+                            // The barrier completed: proceed around the loop to try looking up again
+                        },
+                        _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
+                            return Err(GetActiveTenantError::WaitForActiveTimeout {
+                                latest_state: None,
+                                wait_time: timeout,
+                            });
+                        }
+                    }
+                }
+            };
+        };
+
+        tracing::debug!("Waiting for tenant to enter active state...");
+        tenant_shard
+            .wait_to_become_active(deadline.duration_since(Instant::now()))
+            .await?;
+        Ok(tenant_shard)
+    }
 }
 
 #[async_trait::async_trait]
@@ -1771,13 +1818,13 @@ where
 
         self.check_permission(Some(tenant_id))?;
 
-        let tenant = get_active_tenant_with_timeout(
-            tenant_id,
-            ShardSelector::Zero,
-            ACTIVE_TENANT_TIMEOUT,
-            &task_mgr::shutdown_token(),
-        )
-        .await?;
+        let tenant = self
+            .get_active_tenant_with_timeout(
+                tenant_id,
+                ShardSelector::Zero,
+                ACTIVE_TENANT_TIMEOUT,
+            )
+            .await?;
         pgb.write_message_noflush(&BeMessage::RowDescription(&[
             RowDescriptor::int8_col(b"checkpoint_distance"),
             RowDescriptor::int8_col(b"checkpoint_timeout"),
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 5abda7b64e..1d8e2cf6d3 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -16,10 +16,9 @@ use std::cmp::Ordering;
 use std::collections::{BTreeMap, HashMap};
 use std::ops::Deref;
 use std::sync::Arc;
-use std::time::{Duration, Instant};
+use std::time::Duration;
 use sysinfo::SystemExt;
 use tokio::fs;
-use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
 
 use anyhow::Context;
 use once_cell::sync::Lazy;
@@ -119,6 +118,7 @@ pub(crate) enum TenantsMapRemoveResult {
 
 /// When resolving a TenantId to a shard, we may be looking for the 0th
 /// shard, or we might be looking for whichever shard holds a particular page.
+#[derive(Copy, Clone)]
 pub(crate) enum ShardSelector {
     /// Only return the 0th shard, if it is present. If a non-0th shard is present,
     /// ignore it.
@@ -169,6 +169,14 @@ impl TenantStartupMode {
     }
 }
 
+/// Result type for looking up a TenantId to a specific shard
+pub(crate) enum ShardResolveResult {
+    NotFound,
+    Found(Arc<Tenant>),
+    // Wait for this barrier, then query again
+    InProgress(utils::completion::Barrier),
+}
+
 impl TenantsMap {
     /// Convenience function for typical usage, where we want to get a `Tenant` object, for
     /// working with attached tenants. If the TenantId is in the map but in Secondary state,
@@ -182,51 +190,6 @@ impl TenantsMap {
         }
     }
 
-    /// A page service client sends a TenantId, and to look up the correct Tenant we must
-    /// resolve this to a fully qualified TenantShardId.
-    fn resolve_attached_shard(
-        &self,
-        tenant_id: &TenantId,
-        selector: ShardSelector,
-    ) -> Option<TenantShardId> {
-        let mut want_shard = None;
-        match self {
-            TenantsMap::Initializing => None,
-            TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
-                for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
-                    // Ignore all slots that don't contain an attached tenant
-                    let tenant = match &slot.1 {
-                        TenantSlot::Attached(t) => t,
-                        _ => continue,
-                    };
-
-                    match selector {
-                        ShardSelector::First => return Some(*slot.0),
-                        ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
-                            return Some(*slot.0)
-                        }
-                        ShardSelector::Page(key) => {
-                            // First slot we see for this tenant, calculate the expected shard number
-                            // for the key: we will use this for checking if this and subsequent
-                            // slots contain the key, rather than recalculating the hash each time.
-                            if want_shard.is_none() {
-                                want_shard = Some(tenant.shard_identity.get_shard_number(&key));
-                            }
-
-                            if Some(tenant.shard_identity.number) == want_shard {
-                                return Some(*slot.0);
-                            }
-                        }
-                        _ => continue,
-                    }
-                }
-
-                // Fall through: we didn't find an acceptable shard
-                None
-            }
-        }
-    }
-
     /// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
     ///
     /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
@@ -2053,6 +2016,72 @@ impl TenantManager {
 
         Ok(reparented)
     }
+
+    /// A page service client sends a TenantId, and to look up the correct Tenant we must
+    /// resolve this to a fully qualified TenantShardId.
+    ///
+    /// During shard splits: we shall see parent shards in InProgress state and skip them, and
+    /// instead match on child shards which should appear in Attached state. Very early in a shard
+    /// split, or in other cases where a shard is InProgress, we will return our own InProgress result
+    /// to instruct the caller to wait for that to finish before querying again.
+    pub(crate) fn resolve_attached_shard(
+        &self,
+        tenant_id: &TenantId,
+        selector: ShardSelector,
+    ) -> ShardResolveResult {
+        let tenants = self.tenants.read().unwrap();
+        let mut want_shard = None;
+        let mut any_in_progress = None;
+
+        match &*tenants {
+            TenantsMap::Initializing => ShardResolveResult::NotFound,
+            TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
+                for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
+                    // Ignore all slots that don't contain an attached tenant
+                    let tenant = match &slot.1 {
+                        TenantSlot::Attached(t) => t,
+                        TenantSlot::InProgress(barrier) => {
+                            // We might still find a usable shard, but in case we don't, remember that
+                            // we saw at least one InProgress slot, so that we can distinguish this case
+                            // from a simple NotFound in our return value.
+                            any_in_progress = Some(barrier.clone());
+                            continue;
+                        }
+                        _ => continue,
+                    };
+
+                    match selector {
+                        ShardSelector::First => return ShardResolveResult::Found(tenant.clone()),
+                        ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
+                            return ShardResolveResult::Found(tenant.clone())
+                        }
+                        ShardSelector::Page(key) => {
+                            // First slot we see for this tenant, calculate the expected shard number
+                            // for the key: we will use this for checking if this and subsequent
+                            // slots contain the key, rather than recalculating the hash each time.
+                            if want_shard.is_none() {
+                                want_shard = Some(tenant.shard_identity.get_shard_number(&key));
+                            }
+
+                            if Some(tenant.shard_identity.number) == want_shard {
+                                return ShardResolveResult::Found(tenant.clone());
+                            }
+                        }
+                        _ => continue,
+                    }
+                }
+
+                // Fall through: we didn't find a slot that was in Attached state & matched our selector. If
+                // we found one or more InProgress slot, indicate to caller that they should retry later. Otherwise
+                // this requested shard simply isn't found.
+                if let Some(barrier) = any_in_progress {
+                    ShardResolveResult::InProgress(barrier)
+                } else {
+                    ShardResolveResult::NotFound
+                }
+            }
+        }
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -2101,105 +2130,6 @@ pub(crate) enum GetActiveTenantError {
     Broken(String),
 }
 
-/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
-/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
-/// then wait for up to `timeout` (minus however long we waited for the slot).
-pub(crate) async fn get_active_tenant_with_timeout(
-    tenant_id: TenantId,
-    shard_selector: ShardSelector,
-    timeout: Duration,
-    cancel: &CancellationToken,
-) -> Result<Arc<Tenant>, GetActiveTenantError> {
-    enum WaitFor {
-        Barrier(utils::completion::Barrier),
-        Tenant(Arc<Tenant>),
-    }
-
-    let wait_start = Instant::now();
-    let deadline = wait_start + timeout;
-
-    let (wait_for, tenant_shard_id) = {
-        let locked = TENANTS.read().unwrap();
-
-        // Resolve TenantId to TenantShardId
-        let tenant_shard_id = locked
-            .resolve_attached_shard(&tenant_id, shard_selector)
-            .ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
-                tenant_id,
-            )))?;
-
-        let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
-            .map_err(GetTenantError::MapState)?;
-        match peek_slot {
-            Some(TenantSlot::Attached(tenant)) => {
-                match tenant.current_state() {
-                    TenantState::Active => {
-                        // Fast path: we don't need to do any async waiting.
-                        return Ok(tenant.clone());
-                    }
-                    _ => {
-                        tenant.activate_now();
-                        (WaitFor::Tenant(tenant.clone()), tenant_shard_id)
-                    }
-                }
-            }
-            Some(TenantSlot::Secondary(_)) => {
-                return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
-                    tenant_shard_id,
-                )))
-            }
-            Some(TenantSlot::InProgress(barrier)) => {
-                (WaitFor::Barrier(barrier.clone()), tenant_shard_id)
-            }
-            None => {
-                return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
-                    tenant_id,
-                )))
-            }
-        }
-    };
-
-    let tenant = match wait_for {
-        WaitFor::Barrier(barrier) => {
-            tracing::debug!("Waiting for tenant InProgress state to pass...");
-            timeout_cancellable(
-                deadline.duration_since(Instant::now()),
-                cancel,
-                barrier.wait(),
-            )
-            .await
-            .map_err(|e| match e {
-                TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout {
-                    latest_state: None,
-                    wait_time: wait_start.elapsed(),
-                },
-                TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
-            })?;
-            {
-                let locked = TENANTS.read().unwrap();
-                let peek_slot =
-                    tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
-                        .map_err(GetTenantError::MapState)?;
-                match peek_slot {
-                    Some(TenantSlot::Attached(tenant)) => tenant.clone(),
-                    _ => {
-                        return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
-                            tenant_shard_id,
-                        )))
-                    }
-                }
-            }
-        }
-        WaitFor::Tenant(tenant) => tenant,
-    };
-
-    tracing::debug!("Waiting for tenant to enter active state...");
-    tenant
-        .wait_to_become_active(deadline.duration_since(Instant::now()))
-        .await?;
-    Ok(tenant)
-}
-
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum DeleteTimelineError {
     #[error("Tenant {0}")]
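
Reviewer note (not part of the patch): the new `PageServerHandler::get_active_tenant_with_timeout` combines `TenantManager::resolve_attached_shard` with a retry loop bounded by a single deadline, which then also bounds the later `wait_to_become_active` call. The sketch below restates that resolve/wait/retry control flow in isolation, assuming tokio; the `Resolve` enum, watch-channel barrier, and `resolve_with_timeout` helper are hypothetical stand-ins for illustration, not pageserver APIs.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for ShardResolveResult: a lookup either succeeds, fails,
/// or reports that a slot is mid-transition and hands back something to wait on.
enum Resolve<T> {
    Found(T),
    NotFound,
    InProgress(tokio::sync::watch::Receiver<bool>),
}

/// Retry a synchronous lookup until it resolves, waiting out InProgress results,
/// but never beyond one overall deadline (mirroring the patch's total-timeout rule).
async fn resolve_with_timeout<T, F>(mut lookup: F, timeout: Duration) -> Result<T, &'static str>
where
    F: FnMut() -> Resolve<T>,
{
    let deadline = Instant::now() + timeout;
    loop {
        match lookup() {
            Resolve::Found(t) => return Ok(t),
            Resolve::NotFound => return Err("not found"),
            Resolve::InProgress(mut barrier) => {
                // Wait for the in-progress slot to settle, but never past the deadline.
                let remaining = deadline.saturating_duration_since(Instant::now());
                if tokio::time::timeout(remaining, barrier.changed()).await.is_err() {
                    return Err("timed out waiting for InProgress slot");
                }
                // Barrier released (or sender dropped): go around the loop and retry the lookup.
            }
        }
    }
}
```

In the patch itself, the wait additionally races against connection cancellation via `tokio::select!`, and whatever remains of the deadline is then reused for `Tenant::wait_to_become_active`.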