From 933bb88694cb990e66e436c0be5dde7d29528d1b Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 7 Aug 2024 19:57:33 +0800 Subject: [PATCH] Revert "refactor(page_service): Timeline gate guard holding + cancellation + shutdown (#8339)" This reverts commit 4e3b70e3081165ebd5ca1f93e90cb172bcf6a16e. --- pageserver/src/bin/pageserver.rs | 43 +- pageserver/src/http/routes.rs | 5 - pageserver/src/lib.rs | 10 +- pageserver/src/page_service.rs | 766 +++++++++--------- pageserver/src/tenant.rs | 2 - pageserver/src/tenant/mgr.rs | 6 +- pageserver/src/tenant/timeline.rs | 20 - pageserver/src/tenant/timeline/handle.rs | 967 ----------------------- 8 files changed, 432 insertions(+), 1387 deletions(-) delete mode 100644 pageserver/src/tenant/timeline/handle.rs diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 5ebd6511ac..2d00f311fb 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -17,9 +17,11 @@ use pageserver::config::PageserverIdentity; use pageserver::control_plane_client::ControlPlaneClient; use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task}; use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING}; -use pageserver::task_mgr::{COMPUTE_REQUEST_RUNTIME, WALRECEIVER_RUNTIME}; +use pageserver::task_mgr::WALRECEIVER_RUNTIME; use pageserver::tenant::{secondary, TenantSharedResources}; -use pageserver::{CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener}; +use pageserver::{ + CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, LibpqEndpointListener, +}; use remote_storage::GenericRemoteStorage; use tokio::signal::unix::SignalKind; use tokio::time::Instant; @@ -29,9 +31,11 @@ use tracing::*; use metrics::set_build_info_metric; use pageserver::{ config::PageServerConf, + context::{DownloadBehavior, RequestContext}, deletion_queue::DeletionQueue, http, page_cache, page_service, task_mgr, - task_mgr::{BACKGROUND_RUNTIME, MGMT_REQUEST_RUNTIME}, + task_mgr::TaskKind, + task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME}, tenant::mgr, virtual_file, }; @@ -590,13 +594,30 @@ fn start_pageserver( // Spawn a task to listen for libpq connections. It will spawn further tasks // for each connection. We created the listener earlier already. - let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, { - let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it - pageserver_listener - .set_nonblocking(true) - .context("set listener to nonblocking")?; - tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")? - }); + let libpq_listener = { + let cancel = CancellationToken::new(); + let libpq_ctx = RequestContext::todo_child( + TaskKind::LibpqEndpointListener, + // listener task shouldn't need to download anything. (We will + // create a separate sub-contexts for each connection, with their + // own download behavior. This context is used only to listen and + // accept connections.) 
+ DownloadBehavior::Error, + ); + + let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( + "libpq listener", + page_service::libpq_listener_main( + tenant_manager.clone(), + pg_auth, + pageserver_listener, + conf.pg_auth_type, + libpq_ctx, + cancel.clone(), + ), + )); + LibpqEndpointListener(CancellableTask { task, cancel }) + }; let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard()); @@ -624,7 +645,7 @@ fn start_pageserver( shutdown_pageserver.take(); pageserver::shutdown_pageserver( http_endpoint_listener, - page_service, + libpq_listener, consumption_metrics_tasks, disk_usage_eviction_task, &tenant_manager, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index fdab780bfb..0bd22bb72d 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -296,11 +296,6 @@ impl From for ApiError { GetActiveTenantError::WaitForActiveTimeout { .. } => { ApiError::ResourceUnavailable(format!("{}", e).into()) } - GetActiveTenantError::SwitchedTenant => { - // in our HTTP handlers, this error doesn't happen - // TODO: separate error types - ApiError::ResourceUnavailable("switched tenant".into()) - } } } } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 5aee13cfc6..7251cd42e3 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -32,6 +32,7 @@ pub mod walingest; pub mod walrecord; pub mod walredo; +use crate::task_mgr::TaskKind; use camino::Utf8Path; use deletion_queue::DeletionQueue; use tenant::{ @@ -64,6 +65,7 @@ pub struct CancellableTask { pub cancel: CancellationToken, } pub struct HttpEndpointListener(pub CancellableTask); +pub struct LibpqEndpointListener(pub CancellableTask); pub struct ConsumptionMetricsTasks(pub CancellableTask); pub struct DiskUsageEvictionTask(pub CancellableTask); impl CancellableTask { @@ -77,7 +79,7 @@ impl CancellableTask { #[allow(clippy::too_many_arguments)] pub async fn shutdown_pageserver( http_listener: HttpEndpointListener, - page_service: page_service::Listener, + libpq_listener: LibpqEndpointListener, consumption_metrics_worker: ConsumptionMetricsTasks, disk_usage_eviction_task: Option, tenant_manager: &TenantManager, @@ -162,8 +164,8 @@ pub async fn shutdown_pageserver( // Shut down the libpq endpoint task. This prevents new connections from // being accepted. 
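// Standalone sketch, not part of this patch: the cancel-then-join pattern that a
// `CancellableTask` (a CancellationToken paired with a JoinHandle) is built around,
// which is what the `shutdown LibpqEndpointListener` step below relies on. The real
// `CancellableTask::shutdown` lives in pageserver/src/lib.rs and its body is elided
// by the hunk context above; the signature and body here are assumptions.
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

struct CancellableTaskSketch {
    task: JoinHandle<()>,
    cancel: CancellationToken,
}

impl CancellableTaskSketch {
    async fn shutdown(self) {
        // Ask the task to stop, then wait for it to actually exit before
        // proceeding with the rest of the shutdown sequence.
        self.cancel.cancel();
        let _ = self.task.await;
    }
}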
- let remaining_connections = timed( - page_service.stop_accepting(), + timed( + libpq_listener.0.shutdown(), "shutdown LibpqEndpointListener", Duration::from_secs(1), ) @@ -181,7 +183,7 @@ pub async fn shutdown_pageserver( // Shut down any page service tasks: any in-progress work for particular timelines or tenants // should already have been canclled via mgr::shutdown_all_tenants timed( - remaining_connections.shutdown(), + task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None), "shutdown PageRequestHandlers", Duration::from_secs(1), ) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 5344b83e0d..6353f713e0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -4,8 +4,9 @@ use anyhow::Context; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; -use futures::FutureExt; -use once_cell::sync::OnceCell; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use pageserver_api::key::Key; use pageserver_api::models::TenantState; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, @@ -14,23 +15,28 @@ use pageserver_api::models::{ PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse, PagestreamProtocolVersion, }; +use pageserver_api::shard::ShardIndex; +use pageserver_api::shard::ShardNumber; use pageserver_api::shard::TenantShardId; use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError}; use pq_proto::framed::ConnectionError; use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::borrow::Cow; +use std::collections::HashMap; use std::io; +use std::net::TcpListener; use std::str; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; use std::time::SystemTime; -use std::time::{Duration, Instant}; use tokio::io::AsyncWriteExt; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::*; +use utils::sync::gate::GateGuard; use utils::{ auth::{Claims, Scope, SwappableJwtAuth}, id::{TenantId, TimelineId}, @@ -41,130 +47,61 @@ use utils::{ use crate::auth::check_permission; use crate::basebackup; use crate::basebackup::BasebackupError; -use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::metrics; use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS}; use crate::pgdatadir_mapping::Version; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id; +use crate::task_mgr; use crate::task_mgr::TaskKind; -use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME}; +use crate::tenant::mgr::GetActiveTenantError; +use crate::tenant::mgr::GetTenantError; +use crate::tenant::mgr::ShardResolveResult; use crate::tenant::mgr::ShardSelector; use crate::tenant::mgr::TenantManager; -use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult}; -use crate::tenant::timeline::{self, WaitLsnError}; +use crate::tenant::timeline::WaitLsnError; use crate::tenant::GetTimelineError; use crate::tenant::PageReconstructError; +use crate::tenant::Tenant; use crate::tenant::Timeline; use pageserver_api::key::rel_block_to_key; use pageserver_api::reltag::SlruKind; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; -/// How long 
we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::Tenant`] which -/// is not yet in state [`TenantState::Active`]. -/// -/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`]. +// How long we may wait for a [`TenantSlot::InProgress`]` and/or a [`Tenant`] which +// is not yet in state [`TenantState::Active`]. const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000); /////////////////////////////////////////////////////////////////////////////// -pub struct Listener { - cancel: CancellationToken, - /// Cancel the listener task through `listen_cancel` to shut down the listener - /// and get a handle on the existing connections. - task: JoinHandle, -} - -pub struct Connections { - cancel: CancellationToken, - tasks: tokio::task::JoinSet, -} - -pub fn spawn( - conf: &'static PageServerConf, - tenant_manager: Arc, - pg_auth: Option>, - tcp_listener: tokio::net::TcpListener, -) -> Listener { - let cancel = CancellationToken::new(); - let libpq_ctx = RequestContext::todo_child( - TaskKind::LibpqEndpointListener, - // listener task shouldn't need to download anything. (We will - // create a separate sub-contexts for each connection, with their - // own download behavior. This context is used only to listen and - // accept connections.) - DownloadBehavior::Error, - ); - let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( - "libpq listener", - libpq_listener_main( - tenant_manager, - pg_auth, - tcp_listener, - conf.pg_auth_type, - libpq_ctx, - cancel.clone(), - ) - .map(anyhow::Ok), - )); - - Listener { cancel, task } -} - -impl Listener { - pub async fn stop_accepting(self) -> Connections { - self.cancel.cancel(); - self.task - .await - .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error") - } -} -impl Connections { - pub async fn shutdown(self) { - let Self { cancel, mut tasks } = self; - cancel.cancel(); - while let Some(res) = tasks.join_next().await { - // the logging done here mimics what was formerly done by task_mgr - match res { - Ok(Ok(())) => {} - Ok(Err(e)) => error!("error in page_service connection task: {:?}", e), - Err(e) => error!("page_service connection task panicked: {:?}", e), - } - } - } -} - /// /// Main loop of the page service. /// /// Listens for connections, and launches a new handler task for each. /// -/// Returns Ok(()) upon cancellation via `cancel`, returning the set of -/// open connections. -/// pub async fn libpq_listener_main( tenant_manager: Arc, auth: Option>, - listener: tokio::net::TcpListener, + listener: TcpListener, auth_type: AuthType, listener_ctx: RequestContext, - listener_cancel: CancellationToken, -) -> Connections { - let connections_cancel = CancellationToken::new(); - let mut connection_handler_tasks = tokio::task::JoinSet::default(); + cancel: CancellationToken, +) -> anyhow::Result<()> { + listener.set_nonblocking(true)?; + let tokio_listener = tokio::net::TcpListener::from_std(listener)?; // Wait for a new connection to arrive, or for server shutdown. while let Some(res) = tokio::select! { biased; - _ = listener_cancel.cancelled() => { + _ = cancel.cancelled() => { // We were requested to shut down. None } - res = listener.accept() => { + res = tokio_listener.accept() => { Some(res) } } { @@ -173,16 +110,28 @@ pub async fn libpq_listener_main( // Connection established. Spawn a new task to handle it. 
debug!("accepted connection from {}", peer_addr); let local_auth = auth.clone(); + let connection_ctx = listener_ctx .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download); - connection_handler_tasks.spawn(page_service_conn_main( - tenant_manager.clone(), - local_auth, - socket, - auth_type, - connection_ctx, - connections_cancel.child_token(), - )); + + // PageRequestHandler tasks are not associated with any particular + // timeline in the task manager. In practice most connections will + // only deal with a particular timeline, but we don't know which one + // yet. + task_mgr::spawn( + &tokio::runtime::Handle::current(), + TaskKind::PageRequestHandler, + None, + None, + "serving compute connection task", + page_service_conn_main( + tenant_manager.clone(), + local_auth, + socket, + auth_type, + connection_ctx, + ), + ); } Err(err) => { // accept() failed. Log the error, and loop back to retry on next connection. @@ -191,16 +140,11 @@ pub async fn libpq_listener_main( } } - debug!("page_service listener loop terminated"); + debug!("page_service loop terminated"); - Connections { - cancel: connections_cancel, - tasks: connection_handler_tasks, - } + Ok(()) } -type ConnectionHandlerResult = anyhow::Result<()>; - #[instrument(skip_all, fields(peer_addr))] async fn page_service_conn_main( tenant_manager: Arc, @@ -208,8 +152,7 @@ async fn page_service_conn_main( socket: tokio::net::TcpStream, auth_type: AuthType, connection_ctx: RequestContext, - cancel: CancellationToken, -) -> ConnectionHandlerResult { +) -> anyhow::Result<()> { let _guard = LIVE_CONNECTIONS .with_label_values(&["page_service"]) .guard(); @@ -257,11 +200,13 @@ async fn page_service_conn_main( // and create a child per-query context when it invokes process_query. // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler // and create the per-query context in process_query ourselves. - let mut conn_handler = - PageServerHandler::new(tenant_manager, auth, connection_ctx, cancel.clone()); + let mut conn_handler = PageServerHandler::new(tenant_manager, auth, connection_ctx); let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; - match pgbackend.run(&mut conn_handler, &cancel).await { + match pgbackend + .run(&mut conn_handler, &task_mgr::shutdown_token()) + .await + { Ok(()) => { // we've been requested to shut down Ok(()) @@ -278,154 +223,32 @@ async fn page_service_conn_main( } } +/// While a handler holds a reference to a Timeline, it also holds a the +/// timeline's Gate open. +struct HandlerTimeline { + timeline: Arc, + _guard: GateGuard, +} + struct PageServerHandler { auth: Option>, claims: Option, + tenant_manager: Arc, + /// The context created for the lifetime of the connection /// services by this PageServerHandler. /// For each query received over the connection, /// `process_query` creates a child context from this one. connection_ctx: RequestContext, - cancel: CancellationToken, - - timeline_handles: TimelineHandles, -} - -struct TimelineHandles { - wrapper: TenantManagerWrapper, + /// See [`Self::cache_timeline`] for usage. + /// /// Note on size: the typical size of this map is 1. The largest size we expect /// to see is the number of shards divided by the number of pageservers (typically < 2), /// or the ratio used when splitting shards (i.e. how many children created from one) /// parent shard, where a "large" number might be ~8. 
- handles: timeline::handle::Cache, -} - -impl TimelineHandles { - fn new(tenant_manager: Arc) -> Self { - Self { - wrapper: TenantManagerWrapper { - tenant_manager, - tenant_id: OnceCell::new(), - }, - handles: Default::default(), - } - } - async fn get( - &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, - shard_selector: ShardSelector, - ) -> Result, GetActiveTimelineError> { - if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id { - return Err(GetActiveTimelineError::Tenant( - GetActiveTenantError::SwitchedTenant, - )); - } - self.handles - .get(timeline_id, shard_selector, &self.wrapper) - .await - .map_err(|e| match e { - timeline::handle::GetError::TenantManager(e) => e, - timeline::handle::GetError::TimelineGateClosed => { - trace!("timeline gate closed"); - GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) - } - timeline::handle::GetError::PerTimelineStateShutDown => { - trace!("per-timeline state shut down"); - GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) - } - }) - } -} - -pub(crate) struct TenantManagerWrapper { - tenant_manager: Arc, - // We do not support switching tenant_id on a connection at this point. - // We can can add support for this later if needed without changing - // the protocol. - tenant_id: once_cell::sync::OnceCell, -} - -#[derive(Debug)] -pub(crate) struct TenantManagerTypes; - -impl timeline::handle::Types for TenantManagerTypes { - type TenantManagerError = GetActiveTimelineError; - type TenantManager = TenantManagerWrapper; - type Timeline = Arc; -} - -impl timeline::handle::ArcTimeline for Arc { - fn gate(&self) -> &utils::sync::gate::Gate { - &self.gate - } - - fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId { - Timeline::shard_timeline_id(self) - } - - fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState { - &self.handles - } - - fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity { - Timeline::get_shard_identity(self) - } -} - -impl timeline::handle::TenantManager for TenantManagerWrapper { - async fn resolve( - &self, - timeline_id: TimelineId, - shard_selector: ShardSelector, - ) -> Result, GetActiveTimelineError> { - let tenant_id = self.tenant_id.get().expect("we set this in get()"); - let timeout = ACTIVE_TENANT_TIMEOUT; - let wait_start = Instant::now(); - let deadline = wait_start + timeout; - let tenant_shard = loop { - let resolved = self - .tenant_manager - .resolve_attached_shard(tenant_id, shard_selector); - match resolved { - ShardResolveResult::Found(tenant_shard) => break tenant_shard, - ShardResolveResult::NotFound => { - return Err(GetActiveTimelineError::Tenant( - GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)), - )); - } - ShardResolveResult::InProgress(barrier) => { - // We can't authoritatively answer right now: wait for InProgress state - // to end, then try again - tokio::select! 
{ - _ = barrier.wait() => { - // The barrier completed: proceed around the loop to try looking up again - }, - _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => { - return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout { - latest_state: None, - wait_time: timeout, - })); - } - } - } - }; - }; - - tracing::debug!("Waiting for tenant to enter active state..."); - tenant_shard - .wait_to_become_active(deadline.duration_since(Instant::now())) - .await - .map_err(GetActiveTimelineError::Tenant)?; - - let timeline = tenant_shard - .get_timeline(timeline_id, true) - .map_err(GetActiveTimelineError::Timeline)?; - set_tracing_field_shard_id(&timeline); - Ok(timeline) - } + shard_timelines: HashMap, } #[derive(thiserror::Error, Debug)] @@ -469,11 +292,7 @@ impl From for PageStreamError { impl From for PageStreamError { fn from(value: GetActiveTimelineError) -> Self { match value { - GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) - | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive( - TenantState::Stopping { .. }, - )) - | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown, + GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown, GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()), GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()), } @@ -505,17 +324,64 @@ impl PageServerHandler { tenant_manager: Arc, auth: Option>, connection_ctx: RequestContext, - cancel: CancellationToken, ) -> Self { PageServerHandler { + tenant_manager, auth, claims: None, connection_ctx, - timeline_handles: TimelineHandles::new(tenant_manager), - cancel, + shard_timelines: HashMap::new(), } } + /// Future that completes when we need to shut down the connection. + /// + /// We currently need to shut down when any of the following happens: + /// 1. any of the timelines we hold GateGuards for in `shard_timelines` is cancelled + /// 2. task_mgr requests shutdown of the connection + /// + /// NB on (1): the connection's lifecycle is not actually tied to any of the + /// `shard_timelines`s' lifecycles. But it's _necessary_ in the current + /// implementation to be responsive to timeline cancellation because + /// the connection holds their `GateGuards` open (sored in `shard_timelines`). + /// We currently do the easy thing and terminate the connection if any of the + /// shard_timelines gets cancelled. But really, we cuold spend more effort + /// and simply remove the cancelled timeline from the `shard_timelines`, thereby + /// dropping the guard. + /// + /// NB: keep in sync with [`Self::is_connection_cancelled`] + async fn await_connection_cancelled(&self) { + // A short wait before we expend the cycles to walk our timeline map. This avoids incurring + // that cost every time we check for cancellation. + tokio::time::sleep(Duration::from_millis(10)).await; + + // This function is never called concurrently with code that adds timelines to shard_timelines, + // which is enforced by the borrow checker (the future returned by this function carries the + // immutable &self). So it's fine to evaluate shard_timelines after the sleep, we don't risk + // missing any inserts to the map. 
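// Standalone sketch, not part of this patch: the "race several cancellation
// sources and resolve on the first one" pattern used in the code that follows.
// Simplified: here every source is a CancellationToken, while the code below
// also mixes in task_mgr::shutdown_watcher() via futures::future::Either.
use futures::stream::{FuturesUnordered, StreamExt};
use tokio_util::sync::CancellationToken;

async fn await_first_cancelled(tokens: &[CancellationToken]) {
    // All `cancelled()` futures have Output = (), so they can share one set.
    let mut sources: FuturesUnordered<_> = tokens.iter().map(|t| t.cancelled()).collect();
    // `next()` completes as soon as any source fires; it yields `None` only
    // when `tokens` is empty, in which case we simply return.
    sources.next().await;
}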
+ + let mut cancellation_sources = Vec::with_capacity(1 + self.shard_timelines.len()); + use futures::future::Either; + cancellation_sources.push(Either::Left(task_mgr::shutdown_watcher())); + cancellation_sources.extend( + self.shard_timelines + .values() + .map(|ht| Either::Right(ht.timeline.cancel.cancelled())), + ); + FuturesUnordered::from_iter(cancellation_sources) + .next() + .await; + } + + /// Checking variant of [`Self::await_connection_cancelled`]. + fn is_connection_cancelled(&self) -> bool { + task_mgr::is_shutdown_requested() + || self + .shard_timelines + .values() + .any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping()) + } + /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect /// cancellation if there aren't any timelines in the cache. @@ -534,21 +400,15 @@ impl PageServerHandler { flush_r = pgb.flush() => { Ok(flush_r?) }, + _ = self.await_connection_cancelled() => { + Err(QueryError::Shutdown) + } _ = cancel.cancelled() => { Err(QueryError::Shutdown) } ) } - /// Pagestream sub-protocol handler. - /// - /// It is a simple request-response protocol inside a COPYBOTH session. - /// - /// # Coding Discipline - /// - /// Coding discipline within this function: all interaction with the `pgb` connection - /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`]. - /// This is so that we can shutdown page_service quickly. #[instrument(skip_all)] async fn handle_pagerequests( &mut self, @@ -563,27 +423,27 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); + let tenant = self + .get_active_tenant_with_timeout(tenant_id, ShardSelector::First, ACTIVE_TENANT_TIMEOUT) + .await?; + // switch client to COPYBOTH pgb.write_message_noflush(&BeMessage::CopyBothResponse)?; - tokio::select! { - biased; - _ = self.cancel.cancelled() => { - return Err(QueryError::Shutdown) - } - res = pgb.flush() => { - res?; - } - } + self.flush_cancellable(pgb, &tenant.cancel).await?; loop { - // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData) let msg = tokio::select! { biased; - _ = self.cancel.cancelled() => { + + _ = self.await_connection_cancelled() => { + // We were requested to shut down. + info!("shutdown request received in page handler"); return Err(QueryError::Shutdown) } + msg = pgb.read_message() => { msg } }; + let copy_data_bytes = match msg? { Some(FeMessage::CopyData(bytes)) => bytes, Some(FeMessage::Terminate) => break, @@ -598,12 +458,13 @@ impl PageServerHandler { trace!("query: {copy_data_bytes:?}"); fail::fail_point!("ps::handle-pagerequest-message"); - // parse request let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?; - // invoke handler function - let (handler_result, span) = match neon_fe_msg { + // TODO: We could create a new per-request context here, with unique ID. + // Currently we use the same per-timeline context for all requests + + let (response, span) = match neon_fe_msg { PagestreamFeMessage::Exists(req) => { fail::fail_point!("ps::handle-pagerequest-message::exists"); let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); @@ -657,26 +518,31 @@ impl PageServerHandler { } }; - // Map handler result to protocol behavior. - // Some handler errors cause exit from pagestream protocol. 
- // Other handler errors are sent back as an error message and we stay in pagestream protocol. - let response_msg = match handler_result { - Err(e) => match &e { - PageStreamError::Shutdown => { - // If we fail to fulfil a request during shutdown, which may be _because_ of - // shutdown, then do not send the error to the client. Instead just drop the - // connection. - span.in_scope(|| info!("dropping connection due to shutdown")); - return Err(QueryError::Shutdown); - } - PageStreamError::Reconnect(reason) => { - span.in_scope(|| info!("handler requested reconnect: {reason}")); - return Err(QueryError::Reconnect); - } - PageStreamError::Read(_) - | PageStreamError::LsnTimeout(_) - | PageStreamError::NotFound(_) - | PageStreamError::BadRequest(_) => { + match response { + Err(PageStreamError::Shutdown) => { + // If we fail to fulfil a request during shutdown, which may be _because_ of + // shutdown, then do not send the error to the client. Instead just drop the + // connection. + span.in_scope(|| info!("dropping connection due to shutdown")); + return Err(QueryError::Shutdown); + } + Err(PageStreamError::Reconnect(reason)) => { + span.in_scope(|| info!("handler requested reconnect: {reason}")); + return Err(QueryError::Reconnect); + } + Err(e) if self.is_connection_cancelled() => { + // This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean + // shutdown error, this may be buried inside a PageReconstructError::Other for example. + // + // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet, + // because wait_lsn etc will drop out + // is_stopping(): [`Timeline::flush_and_shutdown`] has entered + // is_canceled(): [`Timeline::shutdown`]` has entered + span.in_scope(|| info!("dropped error response during shutdown: {e:#}")); + return Err(QueryError::Shutdown); + } + r => { + let response_msg = r.unwrap_or_else(|e| { // print the all details to the log with {:#}, but for the client the // error message is enough. Do not log if shutting down, as the anyhow::Error // here includes cancellation which is not an error. @@ -687,22 +553,10 @@ impl PageServerHandler { PagestreamBeMessage::Error(PagestreamErrorResponse { message: e.to_string(), }) - } - }, - Ok(response_msg) => response_msg, - }; + }); - // marshal & transmit response message - pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; - tokio::select! { - biased; - _ = self.cancel.cancelled() => { - // We were requested to shut down. 
- info!("shutdown request received in page handler"); - return Err(QueryError::Shutdown) - } - res = pgb.flush() => { - res?; + pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; + self.flush_cancellable(pgb, &tenant.cancel).await?; } } } @@ -790,7 +644,7 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id, %lsn))] async fn handle_make_lsn_lease( - &mut self, + &self, pgb: &mut PostgresBackend, tenant_shard_id: TenantShardId, timeline_id: TimelineId, @@ -800,16 +654,10 @@ impl PageServerHandler { where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { + let shard_selector = ShardSelector::Known(tenant_shard_id.to_index()); let timeline = self - .timeline_handles - .get( - tenant_shard_id.tenant_id, - timeline_id, - ShardSelector::Known(tenant_shard_id.to_index()), - ) + .get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector) .await?; - set_tracing_field_shard_id(&timeline); - let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?; let valid_until = lease .valid_until @@ -835,17 +683,14 @@ impl PageServerHandler { req: &PagestreamExistsRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; + let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let _timer = timeline .query_metrics .start_timer(metrics::SmgrQueryType::GetRelExists, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - &timeline, + timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -870,10 +715,7 @@ impl PageServerHandler { req: &PagestreamNblocksRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; + let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let _timer = timeline .query_metrics @@ -881,7 +723,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - &timeline, + timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -906,10 +748,7 @@ impl PageServerHandler { req: &PagestreamDbSizeRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; + let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let _timer = timeline .query_metrics @@ -917,7 +756,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - &timeline, + timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -935,6 +774,122 @@ impl PageServerHandler { })) } + /// For most getpage requests, we will already have a Timeline to serve the request: this function + /// looks up such a Timeline synchronously and without touching any global state. 
+ fn get_cached_timeline_for_page( + &mut self, + req: &PagestreamGetPageRequest, + ) -> Result<&Arc, Key> { + let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() { + // Fastest path: single sharded case + if first_idx.shard_count.count() == 1 { + return Ok(&first_timeline.timeline); + } + + let key = rel_block_to_key(req.rel, req.blkno); + let shard_num = first_timeline + .timeline + .get_shard_identity() + .get_shard_number(&key); + + // Fast path: matched the first timeline in our local handler map. This case is common if + // only one shard per tenant is attached to this pageserver. + if first_timeline.timeline.get_shard_identity().number == shard_num { + return Ok(&first_timeline.timeline); + } + + let shard_index = ShardIndex { + shard_number: shard_num, + shard_count: first_timeline.timeline.get_shard_identity().count, + }; + + // Fast-ish path: timeline is in the connection handler's local cache + if let Some(found) = self.shard_timelines.get(&shard_index) { + return Ok(&found.timeline); + } + + key + } else { + rel_block_to_key(req.rel, req.blkno) + }; + + Err(key) + } + + /// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable + /// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`] + /// again. + /// + /// Note that all the Timelines in this cache are for the same timeline_id: they're differ + /// in which shard they belong to. When we serve a getpage@lsn request, we choose a shard + /// based on key. + /// + /// The typical size of this cache is 1, as we generally create shards to distribute work + /// across pageservers, so don't tend to have multiple shards for the same tenant on the + /// same pageserver. + fn cache_timeline( + &mut self, + timeline: Arc, + ) -> Result<&Arc, GetActiveTimelineError> { + let gate_guard = timeline + .gate + .enter() + .map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?; + + let shard_index = timeline.tenant_shard_id.to_index(); + let entry = self + .shard_timelines + .entry(shard_index) + .or_insert(HandlerTimeline { + timeline, + _guard: gate_guard, + }); + + Ok(&entry.timeline) + } + + /// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with + /// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver. If no such + /// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node). + async fn load_timeline_for_page( + &mut self, + tenant_id: TenantId, + timeline_id: TimelineId, + key: Key, + ) -> anyhow::Result<&Arc, GetActiveTimelineError> { + // Slow path: we must call out to the TenantManager to find the timeline for this Key + let timeline = self + .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key)) + .await?; + + self.cache_timeline(timeline) + } + + async fn get_timeline_shard_zero( + &mut self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> anyhow::Result<&Arc, GetActiveTimelineError> { + // This is a borrow-checker workaround: we can't return from inside of the `if let Some` because + // that would be an immutable-borrow-self return, whereas later in the function we will use a mutable + // ref to salf. So instead, we first build a bool, and then return while not borrowing self. 
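// Standalone sketch, not part of this patch, with hypothetical types: a minimal,
// self-contained version of the borrow-checker workaround described in the comment
// above. The probe is reduced to a bool first so the immutable borrow of `self`
// ends before the branch that needs a fresh borrow begins.
use std::collections::HashMap;

struct ZeroCacheSketch {
    entries: HashMap<u32, String>,
}

impl ZeroCacheSketch {
    fn get_or_insert_zero(&mut self) -> &String {
        // Probe under a short-lived immutable borrow and keep only a bool.
        let have_cached = self.entries.contains_key(&0);
        if have_cached {
            // Re-borrow immutably just for the return value.
            self.entries.get(&0).unwrap()
        } else {
            // Only the mutable borrow is live on this path; insert, then
            // return an immutable reference to the freshly inserted value.
            self.entries.insert(0, "zero".to_string());
            self.entries.get(&0).unwrap()
        }
    }
}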
+ let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() { + idx.shard_number == ShardNumber(0) + } else { + false + }; + + if have_cached { + let entry = self.shard_timelines.iter().next().unwrap(); + Ok(&entry.1.timeline) + } else { + let timeline = self + .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero) + .await?; + Ok(self.cache_timeline(timeline)?) + } + } + #[instrument(skip_all, fields(shard_id))] async fn handle_get_page_at_lsn_request( &mut self, @@ -943,30 +898,33 @@ impl PageServerHandler { req: &PagestreamGetPageRequest, ctx: &RequestContext, ) -> Result { - let timeline = match self - .timeline_handles - .get( - tenant_id, - timeline_id, - ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)), - ) - .await - { - Ok(tl) => tl, - Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { - // We already know this tenant exists in general, because we resolved it at - // start of connection. Getting a NotFound here indicates that the shard containing - // the requested page is not present on this node: the client's knowledge of shard->pageserver - // mapping is out of date. - // - // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via - // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration - // and talk to a different pageserver. - return Err(PageStreamError::Reconnect( - "getpage@lsn request routed to wrong shard".into(), - )); + let timeline = match self.get_cached_timeline_for_page(req) { + Ok(tl) => { + set_tracing_field_shard_id(tl); + tl + } + Err(key) => { + match self + .load_timeline_for_page(tenant_id, timeline_id, key) + .await + { + Ok(t) => t, + Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { + // We already know this tenant exists in general, because we resolved it at + // start of connection. Getting a NotFound here indicates that the shard containing + // the requested page is not present on this node: the client's knowledge of shard->pageserver + // mapping is out of date. + // + // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via + // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration + // and talk to a different pageserver. + return Err(PageStreamError::Reconnect( + "getpage@lsn request routed to wrong shard".into(), + )); + } + Err(e) => return Err(e.into()), + } } - Err(e) => return Err(e.into()), }; let _timer = timeline @@ -975,7 +933,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - &timeline, + timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -1000,10 +958,7 @@ impl PageServerHandler { req: &PagestreamGetSlruSegmentRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; + let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let _timer = timeline .query_metrics @@ -1011,7 +966,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - &timeline, + timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -1032,15 +987,6 @@ impl PageServerHandler { /// Full basebackups should only be used for debugging purposes. 
/// Originally, it was introduced to enable breaking storage format changes, /// but that is not applicable anymore. - /// - /// # Coding Discipline - /// - /// Coding discipline within this function: all interaction with the `pgb` connection - /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`]. - /// This is so that we can shutdown page_service quickly. - /// - /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive - /// to connection cancellation. #[allow(clippy::too_many_arguments)] #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))] async fn handle_basebackup_request( @@ -1066,11 +1012,10 @@ impl PageServerHandler { let started = std::time::Instant::now(); + // check that the timeline exists let timeline = self - .timeline_handles - .get(tenant_id, timeline_id, ShardSelector::Zero) + .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero) .await?; - let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); if let Some(lsn) = lsn { // Backup was requested at a particular LSN. Wait for it to arrive. @@ -1092,7 +1037,7 @@ impl PageServerHandler { // switch client to COPYOUT pgb.write_message_noflush(&BeMessage::CopyOutResponse) .map_err(QueryError::Disconnected)?; - self.flush_cancellable(pgb, &self.cancel).await?; + self.flush_cancellable(pgb, &timeline.cancel).await?; // Send a tarball of the latest layer on the timeline. Compress if not // fullbackup. TODO Compress in that case too (tests need to be updated) @@ -1183,6 +1128,77 @@ impl PageServerHandler { .expect("claims presence already checked"); check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0)) } + + /// Shorthand for getting a reference to a Timeline of an Active tenant. + async fn get_active_tenant_timeline( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + selector: ShardSelector, + ) -> Result, GetActiveTimelineError> { + let tenant = self + .get_active_tenant_with_timeout(tenant_id, selector, ACTIVE_TENANT_TIMEOUT) + .await + .map_err(GetActiveTimelineError::Tenant)?; + let timeline = tenant.get_timeline(timeline_id, true)?; + set_tracing_field_shard_id(&timeline); + Ok(timeline) + } + + /// Get a shard's [`Tenant`] in its active state, if present. If we don't find the shard and some + /// slots for this tenant are `InProgress` then we will wait. + /// If we find the [`Tenant`] and it's not yet in state [`TenantState::Active`], we will wait. + /// + /// `timeout` is used as a total timeout for the whole wait operation. + async fn get_active_tenant_with_timeout( + &self, + tenant_id: TenantId, + shard_selector: ShardSelector, + timeout: Duration, + ) -> Result, GetActiveTenantError> { + let wait_start = Instant::now(); + let deadline = wait_start + timeout; + + // Resolve TenantId to TenantShardId. This is usually a quick one-shot thing, the loop is + // for handling the rare case that the slot we're accessing is InProgress. + let tenant_shard = loop { + let resolved = self + .tenant_manager + .resolve_attached_shard(&tenant_id, shard_selector); + match resolved { + ShardResolveResult::Found(tenant_shard) => break tenant_shard, + ShardResolveResult::NotFound => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound( + tenant_id, + ))); + } + ShardResolveResult::InProgress(barrier) => { + // We can't authoritatively answer right now: wait for InProgress state + // to end, then try again + tokio::select! 
{ + _ = self.await_connection_cancelled() => { + return Err(GetActiveTenantError::Cancelled) + }, + _ = barrier.wait() => { + // The barrier completed: proceed around the loop to try looking up again + }, + _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => { + return Err(GetActiveTenantError::WaitForActiveTimeout { + latest_state: None, + wait_time: timeout, + }); + } + } + } + }; + }; + + tracing::debug!("Waiting for tenant to enter active state..."); + tenant_shard + .wait_to_become_active(deadline.duration_since(Instant::now())) + .await?; + Ok(tenant_shard) + } } #[async_trait::async_trait] @@ -1489,7 +1505,7 @@ impl From for QueryError { } #[derive(Debug, thiserror::Error)] -pub(crate) enum GetActiveTimelineError { +enum GetActiveTimelineError { #[error(transparent)] Tenant(GetActiveTenantError), #[error(transparent)] diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 989ed0d4eb..563b4de8b0 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -447,8 +447,6 @@ impl WalRedoManager { #[derive(Debug, thiserror::Error, PartialEq, Eq)] pub enum GetTimelineError { - #[error("Timeline is shutting down")] - ShuttingDown, #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")] NotActive { tenant_id: TenantShardId, diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index b5568d37b5..0c5557d0a1 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -116,6 +116,8 @@ pub(crate) enum ShardSelector { /// Only return the 0th shard, if it is present. If a non-0th shard is present, /// ignore it. Zero, + /// Pick the first shard we find for the TenantId + First, /// Pick the shard that holds this key Page(Key), /// The shard ID is known: pick the given shard @@ -2092,6 +2094,7 @@ impl TenantManager { }; match selector { + ShardSelector::First => return ShardResolveResult::Found(tenant.clone()), ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => { return ShardResolveResult::Found(tenant.clone()) } @@ -2173,9 +2176,6 @@ pub(crate) enum GetActiveTenantError { /// never happen. #[error("Tenant is broken: {0}")] Broken(String), - - #[error("reconnect to switch tenant id")] - SwitchedTenant, } #[derive(Debug, thiserror::Error)] diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a05e4e0712..f6f8664a8a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3,7 +3,6 @@ pub(crate) mod compaction; pub mod delete; pub(crate) mod detach_ancestor; mod eviction_task; -pub(crate) mod handle; mod init; pub mod layer_manager; pub(crate) mod logical_size; @@ -18,7 +17,6 @@ use camino::Utf8Path; use chrono::{DateTime, Utc}; use enumset::EnumSet; use fail::fail_point; -use handle::ShardTimelineId; use once_cell::sync::Lazy; use pageserver_api::{ key::{ @@ -427,8 +425,6 @@ pub struct Timeline { pub(crate) extra_test_dense_keyspace: ArcSwap, pub(crate) l0_flush_global_state: L0FlushGlobalState, - - pub(crate) handles: handle::PerTimelineState, } pub struct WalReceiverInfo { @@ -1656,9 +1652,6 @@ impl Timeline { tracing::debug!("Cancelling CancellationToken"); self.cancel.cancel(); - // Ensure Prevent new page service requests from starting. - self.handles.shutdown(); - // Transition the remote_client into a state where it's only useful for timeline deletion. // (The deletion use case is why we can't just hook up remote_client to Self::cancel).) 
self.remote_client.stop(); @@ -2184,8 +2177,6 @@ impl Timeline { extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())), l0_flush_global_state: resources.l0_flush_global_state, - - handles: Default::default(), }; result.repartition_threshold = result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE; @@ -3245,17 +3236,6 @@ impl Timeline { &self.shard_identity } - #[inline(always)] - pub(crate) fn shard_timeline_id(&self) -> ShardTimelineId { - ShardTimelineId { - shard_index: ShardIndex { - shard_number: self.shard_identity.number, - shard_count: self.shard_identity.count, - }, - timeline_id: self.timeline_id, - } - } - /// /// Get a handle to the latest layer for appending. /// diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs deleted file mode 100644 index e82559b8b3..0000000000 --- a/pageserver/src/tenant/timeline/handle.rs +++ /dev/null @@ -1,967 +0,0 @@ -//! An efficient way to keep the timeline gate open without preventing -//! timeline shutdown for longer than a single call to a timeline method. -//! -//! # Motivation -//! -//! On a single page service connection, we're typically serving a single TenantTimelineId. -//! -//! Without sharding, there is a single Timeline object to which we dispatch -//! all requests. For example, a getpage request gets dispatched to the -//! Timeline::get method of the Timeline object that represents the -//! (tenant,timeline) of that connection. -//! -//! With sharding, for each request that comes in on the connection, -//! we first have to perform shard routing based on the requested key (=~ page number). -//! The result of shard routing is a Timeline object. -//! We then dispatch the request to that Timeline object. -//! -//! Regardless of whether the tenant is sharded or not, we want to ensure that -//! we hold the Timeline gate open while we're invoking the method on the -//! Timeline object. -//! -//! However, we want to avoid the overhead of entering the gate for every -//! method invocation. -//! -//! Further, for shard routing, we want to avoid calling the tenant manager to -//! resolve the shard for every request. Instead, we want to cache the -//! routing result so we can bypass the tenant manager for all subsequent requests -//! that get routed to that shard. -//! -//! Regardless of how we accomplish the above, it should not -//! prevent the Timeline from shutting down promptly. -//! -//! # Design -//! -//! There are three user-facing data structures: -//! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime. -//! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime. -//! - `Handle`: a smart pointer that holds the Timeline gate open and derefs to `&Timeline`. -//! Lifetime: for a single request dispatch on the Timeline (i.e., one getpage request) -//! -//! The `Handle` is just a wrapper around an `Arc`. -//! -//! There is one long-lived `Arc`, which is stored in the `PerTimelineState`. -//! The `Cache` stores a `Weak` for each cached Timeline. -//! -//! To dispatch a request, the page service connection calls `Cache::get`. -//! -//! A cache miss means we consult the tenant manager for shard routing, -//! resulting in an `Arc`. We enter its gate _once_ and construct an -//! `Arc`. We store a `Weak` in the cache -//! and the `Arc` in the `PerTimelineState`. -//! -//! For subsequent requests, `Cache::get` will perform a "fast path" shard routing -//! and find the `Weak` in the cache. 
-//! We upgrade the `Weak` to an `Arc` and wrap it in the user-facing `Handle` type. -//! -//! The request handler dispatches the request to the right `>::$request_method`. -//! It then drops the `Handle`, which drops the `Arc`. -//! -//! # Memory Management / How The Reference Cycle Is Broken -//! -//! The attentive reader may have noticed the strong reference cycle -//! from `Arc` to `PerTimelineState` to `Arc`. -//! -//! This cycle is intentional: while it exists, the `Cache` can upgrade its -//! `Weak` to an `Arc` in a single atomic operation. -//! -//! The cycle is broken by either -//! - `PerTimelineState::shutdown` or -//! - dropping the `Cache`. -//! -//! Concurrently existing `Handle`s will extend the existence of the cycle. -//! However, since `Handle`s are short-lived and new `Handle`s are not -//! handed out after either `PerTimelineState::shutdown` or `Cache` drop, -//! that extension of the cycle is bounded. -//! -//! # Fast Path for Shard Routing -//! -//! The `Cache` has a fast path for shard routing to avoid calling into -//! the tenant manager for every request. -//! -//! The `Cache` maintains a hash map of `ShardTimelineId` to `Weak`. -//! -//! The current implementation uses the first entry in the hash map -//! to determine the `ShardParameters` and derive the correct -//! `ShardIndex` for the requested key. -//! -//! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`. -//! -//! If the lookup is successful and the `Weak` can be upgraded, -//! it's a hit. -//! -//! ## Cache invalidation -//! -//! The insight is that cache invalidation is sufficient and most efficiently done lazily. -//! The only reasons why an entry in the cache can become stale are: -//! 1. The `PerTimelineState` / Timeline is shutting down e.g. because the shard is -//! being detached, timeline or shard deleted, or pageserver is shutting down. -//! 2. We're doing a shard split and new traffic should be routed to the child shards. -//! -//! Regarding (1), we will eventually fail to upgrade the `Weak` once the -//! timeline has shut down, and when that happens, we remove the entry from the cache. -//! -//! Regarding (2), the insight is that it is toally fine to keep dispatching requests -//! to the parent shard during a shard split. Eventually, the shard split task will -//! shut down the parent => case (1). - -use std::collections::hash_map; -use std::collections::HashMap; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::sync::Mutex; -use std::sync::Weak; - -use pageserver_api::shard::ShardIdentity; -use tracing::instrument; -use tracing::trace; -use utils::id::TimelineId; -use utils::shard::ShardIndex; -use utils::shard::ShardNumber; - -use crate::tenant::mgr::ShardSelector; - -/// The requirement for Debug is so that #[derive(Debug)] works in some places. -pub(crate) trait Types: Sized + std::fmt::Debug { - type TenantManagerError: Sized + std::fmt::Debug; - type TenantManager: TenantManager + Sized; - type Timeline: ArcTimeline + Sized; -} - -/// Uniquely identifies a [`Cache`] instance over the lifetime of the process. -/// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`]. -/// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer. 
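// Standalone sketch, not part of the deleted module, using simplified and
// hypothetical names: an illustration of the ownership scheme the module
// comment above describes. The timeline-side state owns the only long-lived
// strong Arc; the per-connection cache keeps a Weak and upgrades it per
// request, so dropping the strong side on shutdown lazily invalidates every
// cached entry. (In the real code the miss path additionally consults the
// tenant manager and enters the timeline gate before populating both sides.)
use std::sync::{Arc, Mutex, Weak};

struct HandleInnerSketch {
    // In the real code this holds the Arc<Timeline> and the open GateGuard.
}

#[derive(Default)]
struct PerTimelineStateSketch {
    strong: Mutex<Option<Arc<HandleInnerSketch>>>,
}

#[derive(Default)]
struct ConnCacheSketch {
    weak: Option<Weak<HandleInnerSketch>>,
}

impl ConnCacheSketch {
    fn get(&mut self, state: &PerTimelineStateSketch) -> Option<Arc<HandleInnerSketch>> {
        // Fast path: a single atomic upgrade of the cached Weak.
        if let Some(strong) = self.weak.as_ref().and_then(Weak::upgrade) {
            return Some(strong);
        }
        // Miss (or stale entry): take a fresh strong reference from the
        // timeline-side state (None once it has shut down) and remember it
        // as a Weak for subsequent requests.
        let strong = state.strong.lock().unwrap().clone()?;
        self.weak = Some(Arc::downgrade(&strong));
        Some(strong)
    }
}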
-#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -struct CacheId(u64); - -impl CacheId { - fn next() -> Self { - static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1); - let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if id == 0 { - panic!("CacheId::new() returned 0, overflow"); - } - Self(id) - } -} - -/// See module-level comment. -pub(crate) struct Cache { - id: CacheId, - map: Map, -} - -type Map = HashMap>>; - -impl Default for Cache { - fn default() -> Self { - Self { - id: CacheId::next(), - map: Default::default(), - } - } -} - -#[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)] -pub(crate) struct ShardTimelineId { - pub(crate) shard_index: ShardIndex, - pub(crate) timeline_id: TimelineId, -} - -/// See module-level comment. -pub(crate) struct Handle(Arc>); -struct HandleInner { - shut_down: AtomicBool, - timeline: T::Timeline, - // The timeline's gate held open. - _gate_guard: utils::sync::gate::GateGuard, -} - -/// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`. -/// -/// See module-level comment for details. -pub struct PerTimelineState { - // None = shutting down - handles: Mutex>>>>, -} - -impl Default for PerTimelineState { - fn default() -> Self { - Self { - handles: Mutex::new(Some(Default::default())), - } - } -} - -/// Abstract view of [`crate::tenant::mgr`], for testability. -pub(crate) trait TenantManager { - /// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`]. - /// Errors are returned as [`GetError::TenantManager`]. - async fn resolve( - &self, - timeline_id: TimelineId, - shard_selector: ShardSelector, - ) -> Result; -} - -/// Abstract view of an [`Arc`], for testability. -pub(crate) trait ArcTimeline: Clone { - fn gate(&self) -> &utils::sync::gate::Gate; - fn shard_timeline_id(&self) -> ShardTimelineId; - fn get_shard_identity(&self) -> &ShardIdentity; - fn per_timeline_state(&self) -> &PerTimelineState; -} - -/// Errors returned by [`Cache::get`]. -#[derive(Debug)] -pub(crate) enum GetError { - TenantManager(T::TenantManagerError), - TimelineGateClosed, - PerTimelineStateShutDown, -} - -/// Internal type used in [`Cache::get`]. -enum RoutingResult { - FastPath(Handle), - SlowPath(ShardTimelineId), - NeedConsultTenantManager, -} - -impl Cache { - /// See module-level comment for details. - /// - /// Does NOT check for the shutdown state of [`Types::Timeline`]. - /// Instead, the methods of [`Types::Timeline`] that are invoked through - /// the [`Handle`] are responsible for checking these conditions - /// and if so, return an error that causes the page service to - /// close the connection. - #[instrument(level = "trace", skip_all)] - pub(crate) async fn get( - &mut self, - timeline_id: TimelineId, - shard_selector: ShardSelector, - tenant_manager: &T::TenantManager, - ) -> Result, GetError> { - // terminates because each iteration removes an element from the map - loop { - let handle = self - .get_impl(timeline_id, shard_selector, tenant_manager) - .await?; - if handle.0.shut_down.load(Ordering::Relaxed) { - let removed = self - .map - .remove(&handle.0.timeline.shard_timeline_id()) - .expect("invariant of get_impl is that the returned handle is in the map"); - assert!( - Weak::ptr_eq(&removed, &Arc::downgrade(&handle.0)), - "shard_timeline_id() incorrect?" 
- ); - } else { - return Ok(handle); - } - } - } - - #[instrument(level = "trace", skip_all)] - async fn get_impl( - &mut self, - timeline_id: TimelineId, - shard_selector: ShardSelector, - tenant_manager: &T::TenantManager, - ) -> Result, GetError> { - let miss: ShardSelector = { - let routing_state = self.shard_routing(timeline_id, shard_selector); - match routing_state { - RoutingResult::FastPath(handle) => return Ok(handle), - RoutingResult::SlowPath(key) => match self.map.get(&key) { - Some(cached) => match cached.upgrade() { - Some(upgraded) => return Ok(Handle(upgraded)), - None => { - trace!("handle cache stale"); - self.map.remove(&key).unwrap(); - ShardSelector::Known(key.shard_index) - } - }, - None => ShardSelector::Known(key.shard_index), - }, - RoutingResult::NeedConsultTenantManager => shard_selector, - } - }; - self.get_miss(timeline_id, miss, tenant_manager).await - } - - #[inline(always)] - fn shard_routing( - &mut self, - timeline_id: TimelineId, - shard_selector: ShardSelector, - ) -> RoutingResult { - loop { - // terminates because when every iteration we remove an element from the map - let Some((first_key, first_handle)) = self.map.iter().next() else { - return RoutingResult::NeedConsultTenantManager; - }; - let Some(first_handle) = first_handle.upgrade() else { - // TODO: dedup with get() - trace!("handle cache stale"); - let first_key_owned = *first_key; - self.map.remove(&first_key_owned).unwrap(); - continue; - }; - - let first_handle_shard_identity = first_handle.timeline.get_shard_identity(); - let make_shard_index = |shard_num: ShardNumber| ShardIndex { - shard_number: shard_num, - shard_count: first_handle_shard_identity.count, - }; - - let need_idx = match shard_selector { - ShardSelector::Page(key) => { - make_shard_index(first_handle_shard_identity.get_shard_number(&key)) - } - ShardSelector::Zero => make_shard_index(ShardNumber(0)), - ShardSelector::Known(shard_idx) => shard_idx, - }; - let need_shard_timeline_id = ShardTimelineId { - shard_index: need_idx, - timeline_id, - }; - let first_handle_shard_timeline_id = ShardTimelineId { - shard_index: first_handle_shard_identity.shard_index(), - timeline_id: first_handle.timeline.shard_timeline_id().timeline_id, - }; - - if need_shard_timeline_id == first_handle_shard_timeline_id { - return RoutingResult::FastPath(Handle(first_handle)); - } else { - return RoutingResult::SlowPath(need_shard_timeline_id); - } - } - } - - #[instrument(level = "trace", skip_all)] - #[inline(always)] - async fn get_miss( - &mut self, - timeline_id: TimelineId, - shard_selector: ShardSelector, - tenant_manager: &T::TenantManager, - ) -> Result, GetError> { - match tenant_manager.resolve(timeline_id, shard_selector).await { - Ok(timeline) => { - let key = timeline.shard_timeline_id(); - match &shard_selector { - ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)), - ShardSelector::Page(_) => (), // gotta trust tenant_manager - ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index), - } - - let gate_guard = match timeline.gate().enter() { - Ok(guard) => guard, - Err(_) => { - return Err(GetError::TimelineGateClosed); - } - }; - trace!("creating new HandleInner"); - let handle = Arc::new( - // TODO: global metric that keeps track of the number of live HandlerTimeline instances - // so we can identify reference cycle bugs. 
-                    HandleInner {
-                        shut_down: AtomicBool::new(false),
-                        _gate_guard: gate_guard,
-                        timeline: timeline.clone(),
-                    },
-                );
-                let handle = {
-                    let mut lock_guard = timeline
-                        .per_timeline_state()
-                        .handles
-                        .lock()
-                        .expect("mutex poisoned");
-                    match &mut *lock_guard {
-                        Some(per_timeline_state) => {
-                            let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle));
-                            assert!(replaced.is_none(), "some earlier code left a stale handle");
-                            match self.map.entry(key) {
-                                hash_map::Entry::Occupied(_o) => {
-                                    // This cannot happen because
-                                    // 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and
-                                    // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
-                                    //    while we were waiting for the tenant manager.
-                                    unreachable!()
-                                }
-                                hash_map::Entry::Vacant(v) => {
-                                    v.insert(Arc::downgrade(&handle));
-                                    handle
-                                }
-                            }
-                        }
-                        None => {
-                            return Err(GetError::PerTimelineStateShutDown);
-                        }
-                    }
-                };
-                Ok(Handle(handle))
-            }
-            Err(e) => Err(GetError::TenantManager(e)),
-        }
-    }
-}
-
-impl<T: Types> PerTimelineState<T> {
-    /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
-    /// to the [`Types::Timeline`] that embeds this per-timeline state.
-    /// Even if [`TenantManager::resolve`] would still resolve to it.
-    ///
-    /// Already-alive [`Handle`]s will remain open, usable, and keep the [`ArcTimeline`] alive.
-    /// That's ok because they're short-lived. See module-level comment for details.
-    #[instrument(level = "trace", skip_all)]
-    pub(super) fn shutdown(&self) {
-        let handles = self
-            .handles
-            .lock()
-            .expect("mutex poisoned")
-            // NB: this .take() sets locked to None.
-            // That's what makes future `Cache::get` misses fail.
-            // Cache hits are taken care of below.
-            .take();
-        let Some(handles) = handles else {
-            trace!("already shut down");
-            return;
-        };
-        for handle in handles.values() {
-            // Make hits fail.
-            handle.shut_down.store(true, Ordering::Relaxed);
-        }
-        drop(handles);
-    }
-}
-
-impl<T: Types> std::ops::Deref for Handle<T> {
-    type Target = T::Timeline;
-    fn deref(&self) -> &Self::Target {
-        &self.0.timeline
-    }
-}
-
-#[cfg(test)]
-impl<T: Types> Drop for HandleInner<T> {
-    fn drop(&mut self) {
-        trace!("HandleInner dropped");
-    }
-}
-
-// When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
-impl<T: Types> Drop for Cache<T> {
-    fn drop(&mut self) {
-        for (_, weak) in self.map.drain() {
-            if let Some(strong) = weak.upgrade() {
-                // handle is still being kept alive in PerTimelineState
-                let timeline = strong.timeline.per_timeline_state();
-                let mut handles = timeline.handles.lock().expect("mutex poisoned");
-                if let Some(handles) = &mut *handles {
-                    let Some(removed) = handles.remove(&self.id) else {
-                        // There could have been a shutdown in between us upgrading the weak and locking the mutex.
-                        continue;
-                    };
-                    assert!(Arc::ptr_eq(&removed, &strong));
-                }
-            }
-        }
-    }
-}
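For orientation, here is a hypothetical call-site sketch (not part of this patch) of how a per-connection task would use the Cache being deleted above. It reuses the test stubs defined further down in this file (StubManager, TestTypes, getpage); the handle_requests function and its request loop are invented purely for illustration:

    // Sketch only: one Cache per connection, one get() per request.
    async fn handle_requests(
        mgr: &StubManager,
        requests: Vec<(TimelineId, Key)>,
    ) -> Result<(), GetError<TestTypes>> {
        // Dropping this Cache at the end of the connection prunes its entries from
        // PerTimelineState, breaking the weak/strong reference cycle (see Drop above).
        let mut cache = Cache::<TestTypes>::default();
        for (timeline_id, key) in requests {
            // Cache hit on repeated requests; miss goes through TenantManager::resolve.
            let handle = cache.get(timeline_id, ShardSelector::Page(key), mgr).await?;
            // Handle derefs to the timeline type; a shut-down timeline surfaces as an
            // error here and would cause the page service to close the connection.
            handle.getpage();
        }
        Ok(())
    }
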
-
-#[cfg(test)]
-mod tests {
-    use pageserver_api::{
-        key::{rel_block_to_key, Key, DBDIR_KEY},
-        models::ShardParameters,
-        reltag::RelTag,
-        shard::ShardStripeSize,
-    };
-    use utils::shard::ShardCount;
-
-    use super::*;
-
-    const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
-
-    #[derive(Debug)]
-    struct TestTypes;
-    impl Types for TestTypes {
-        type TenantManagerError = anyhow::Error;
-        type TenantManager = StubManager;
-        type Timeline = Arc<StubTimeline>;
-    }
-
-    struct StubManager {
-        shards: Vec<Arc<StubTimeline>>,
-    }
-
-    struct StubTimeline {
-        gate: utils::sync::gate::Gate,
-        id: TimelineId,
-        shard: ShardIdentity,
-        per_timeline_state: PerTimelineState<TestTypes>,
-        myself: Weak<StubTimeline>,
-    }
-
-    impl StubTimeline {
-        fn getpage(&self) {
-            // do nothing
-        }
-    }
-
-    impl ArcTimeline<TestTypes> for Arc<StubTimeline> {
-        fn gate(&self) -> &utils::sync::gate::Gate {
-            &self.gate
-        }
-
-        fn shard_timeline_id(&self) -> ShardTimelineId {
-            ShardTimelineId {
-                shard_index: self.shard.shard_index(),
-                timeline_id: self.id,
-            }
-        }
-
-        fn get_shard_identity(&self) -> &ShardIdentity {
-            &self.shard
-        }
-
-        fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
-            &self.per_timeline_state
-        }
-    }
-
-    impl TenantManager<TestTypes> for StubManager {
-        async fn resolve(
-            &self,
-            timeline_id: TimelineId,
-            shard_selector: ShardSelector,
-        ) -> anyhow::Result<Arc<StubTimeline>> {
-            for timeline in &self.shards {
-                if timeline.id == timeline_id {
-                    match &shard_selector {
-                        ShardSelector::Zero if timeline.shard.is_shard_zero() => {
-                            return Ok(Arc::clone(timeline));
-                        }
-                        ShardSelector::Zero => continue,
-                        ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
-                            return Ok(Arc::clone(timeline));
-                        }
-                        ShardSelector::Page(_) => continue,
-                        ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
-                            return Ok(Arc::clone(timeline));
-                        }
-                        ShardSelector::Known(_) => continue,
-                    }
-                }
-            }
-            anyhow::bail!("not found")
-        }
-    }
-
-    #[tokio::test(start_paused = true)]
-    async fn test_timeline_shutdown() {
-        crate::tenant::harness::setup_logging();
-
-        let timeline_id = TimelineId::generate();
-        let shard0 = Arc::new_cyclic(|myself| StubTimeline {
-            gate: Default::default(),
-            id: timeline_id,
-            shard: ShardIdentity::unsharded(),
-            per_timeline_state: PerTimelineState::default(),
-            myself: myself.clone(),
-        });
-        let mgr = StubManager {
-            shards: vec![shard0.clone()],
-        };
-        let key = DBDIR_KEY;
-
-        let mut cache = Cache::<TestTypes>::default();
-
-        //
-        // fill the cache
-        //
-        assert_eq!(
-            (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
-            (2, 1),
-            "strong: shard0, mgr; weak: myself"
-        );
-
-        let handle: Handle<_> = cache
-            .get(timeline_id, ShardSelector::Page(key), &mgr)
-            .await
-            .expect("we have the timeline");
-        let handle_inner_weak = Arc::downgrade(&handle.0);
-        assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
-        assert_eq!(
-            (
-                Weak::strong_count(&handle_inner_weak),
-                Weak::weak_count(&handle_inner_weak)
-            ),
-            (2, 2),
-            "strong: handle, per_timeline_state; weak: handle_inner_weak, cache"
-        );
-        assert_eq!(cache.map.len(), 1);
-
-        assert_eq!(
-            (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
-            (3, 1),
-            "strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
-        );
-        drop(handle);
-        assert_eq!(
-            (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
-            (3, 1),
-            "strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
-        );
-
-        //
-        // demonstrate that Handle holds up gate closure
-        // but shutdown prevents new handles from being handed out
-        //
-
-        tokio::select! {
-            _ = shard0.gate.close() => {
-                panic!("cache and per-timeline handler state keep cache open");
-            }
-            _ = tokio::time::sleep(FOREVER) => {
-                // NB: first poll of close() makes it enter closing state
-            }
-        }
-
-        let handle = cache
-            .get(timeline_id, ShardSelector::Page(key), &mgr)
-            .await
-            .expect("we have the timeline");
-        assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
-
-        // SHUTDOWN
-        shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
-
-        assert_eq!(
-            1,
-            Weak::strong_count(&handle_inner_weak),
-            "through local var handle"
-        );
-        assert_eq!(
-            cache.map.len(),
-            1,
-            "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
-        );
-        assert_eq!(
-            (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
-            (3, 1),
-            "strong: handleinner(via handle), shard0, mgr; weak: myself"
-        );
-
-        // this handle is perfectly usable
-        handle.getpage();
-
-        cache
-            .get(timeline_id, ShardSelector::Page(key), &mgr)
-            .await
-            .err()
-            .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
-        assert_eq!(
-            cache.map.len(),
-            0,
-            "first access after shutdown cleans up the Weak's from the cache"
-        );
-
-        tokio::select! {
-            _ = shard0.gate.close() => {
-                panic!("handle is keeping gate open");
-            }
-            _ = tokio::time::sleep(FOREVER) => { }
-        }
-
-        drop(handle);
-        assert_eq!(
-            0,
-            Weak::strong_count(&handle_inner_weak),
-            "the HandleInner destructor already ran"
-        );
-        assert_eq!(
-            (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
-            (2, 1),
-            "strong: shard0, mgr; weak: myself"
-        );
-
-        // closing gate succeeds after dropping handle
-        tokio::select! {
-            _ = shard0.gate.close() => { }
-            _ = tokio::time::sleep(FOREVER) => {
-                panic!("handle is dropped, no other gate holders exist")
-            }
-        }
-
-        // map gets cleaned on next lookup
-        cache
-            .get(timeline_id, ShardSelector::Page(key), &mgr)
-            .await
-            .err()
-            .expect("documented behavior: can't get new handle after shutdown");
-        assert_eq!(cache.map.len(), 0);
-
-        // ensure all refs to shard0 are gone and we're not leaking anything
-        let myself = Weak::clone(&shard0.myself);
-        drop(shard0);
-        drop(mgr);
-        assert_eq!(Weak::strong_count(&myself), 0);
-    }
-
-    #[tokio::test]
-    async fn test_multiple_timelines_and_deletion() {
-        crate::tenant::harness::setup_logging();
-
-        let timeline_a = TimelineId::generate();
-        let timeline_b = TimelineId::generate();
-        assert_ne!(timeline_a, timeline_b);
-        let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
-            gate: Default::default(),
-            id: timeline_a,
-            shard: ShardIdentity::unsharded(),
-            per_timeline_state: PerTimelineState::default(),
-            myself: myself.clone(),
-        });
-        let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
-            gate: Default::default(),
-            id: timeline_b,
-            shard: ShardIdentity::unsharded(),
-            per_timeline_state: PerTimelineState::default(),
-            myself: myself.clone(),
-        });
-        let mut mgr = StubManager {
-            shards: vec![timeline_a.clone(), timeline_b.clone()],
-        };
-        let key = DBDIR_KEY;
-
-        let mut cache = Cache::<TestTypes>::default();
-
-        cache
-            .get(timeline_a.id, ShardSelector::Page(key), &mgr)
-            .await
-            .expect("we have it");
-        cache
-            .get(timeline_b.id, ShardSelector::Page(key), &mgr)
-            .await
-            .expect("we have it");
-        assert_eq!(cache.map.len(), 2);
-
-        // delete timeline A
-        timeline_a.per_timeline_state.shutdown();
-        mgr.shards.retain(|t| t.id != timeline_a.id);
-        assert!(
-            mgr.resolve(timeline_a.id, ShardSelector::Page(key))
-                .await
-                .is_err(),
-            "broken StubManager implementation"
-        );
-
-        assert_eq!(
-            cache.map.len(),
-            2,
-            "cache still has a Weak handle to Timeline A"
-        );
-        cache
-            .get(timeline_a.id, ShardSelector::Page(key), &mgr)
-            .await
-            .err()
-            .expect("documented behavior: can't get new handle after shutdown");
-        assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
-
-        cache
-            .get(timeline_b.id, ShardSelector::Page(key), &mgr)
-            .await
-            .expect("we still have it");
-    }
-
-    fn make_relation_key_for_shard(shard: ShardNumber, params: &ShardParameters) -> Key {
-        rel_block_to_key(
-            RelTag {
-                spcnode: 1663,
-                dbnode: 208101,
-                relnode: 2620,
-                forknum: 0,
-            },
-            shard.0 as u32 * params.stripe_size.0,
-        )
-    }
-
-    #[tokio::test(start_paused = true)]
-    async fn test_shard_split() {
-        crate::tenant::harness::setup_logging();
-        let timeline_id = TimelineId::generate();
-        let parent = Arc::new_cyclic(|myself| StubTimeline {
-            gate: Default::default(),
-            id: timeline_id,
-            shard: ShardIdentity::unsharded(),
-            per_timeline_state: PerTimelineState::default(),
-            myself: myself.clone(),
-        });
-        let child_params = ShardParameters {
-            count: ShardCount(2),
-            stripe_size: ShardStripeSize::default(),
-        };
-        let child0 = Arc::new_cyclic(|myself| StubTimeline {
-            gate: Default::default(),
-            id: timeline_id,
-            shard: ShardIdentity::from_params(ShardNumber(0), &child_params),
-            per_timeline_state: PerTimelineState::default(),
-            myself: myself.clone(),
-        });
-        let child1 = Arc::new_cyclic(|myself| StubTimeline {
-            gate: Default::default(),
-            id: timeline_id,
-            shard: ShardIdentity::from_params(ShardNumber(1), &child_params),
-            per_timeline_state: PerTimelineState::default(),
-            myself: myself.clone(),
-        });
-        let child_shards_by_shard_number = [child0.clone(), child1.clone()];
-
-        let mut cache = Cache::<TestTypes>::default();
-
-        // fill the cache with the parent
-        for i in 0..2 {
-            let handle = cache
-                .get(
-                    timeline_id,
-                    ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
-                    &StubManager {
-                        shards: vec![parent.clone()],
-                    },
-                )
-                .await
-                .expect("we have it");
-            assert!(
-                Weak::ptr_eq(&handle.myself, &parent.myself),
-                "mgr returns parent first"
-            );
-            drop(handle);
-        }
-
-        //
-        // SHARD SPLIT: tenant manager changes, but the cache isn't informed
-        //
-
-        // while we haven't shut down the parent, the cache will return the cached parent, even
-        // if the tenant manager returns the child
-        for i in 0..2 {
-            let handle = cache
-                .get(
-                    timeline_id,
-                    ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
-                    &StubManager {
-                        shards: vec![], // doesn't matter what's in here, the cache is fully loaded
-                    },
-                )
-                .await
-                .expect("we have it");
-            assert!(
-                Weak::ptr_eq(&handle.myself, &parent.myself),
-                "mgr returns parent"
-            );
-            drop(handle);
-        }
-
-        let parent_handle = cache
-            .get(
-                timeline_id,
-                ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), &child_params)),
-                &StubManager {
-                    shards: vec![parent.clone()],
-                },
-            )
-            .await
-            .expect("we have it");
-        assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
-
-        // invalidate the cache
-        parent.per_timeline_state.shutdown();
-
-        // the cache will now return the child, even though the parent handle still exists
-        for i in 0..2 {
-            let handle = cache
-                .get(
-                    timeline_id,
-                    ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
-                    &StubManager {
-                        shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
-                    },
-                )
-                .await
-                .expect("we have it");
-            assert!(
-                Weak::ptr_eq(
-                    &handle.myself,
-                    &child_shards_by_shard_number[i as usize].myself
-                ),
-                "mgr returns child"
-            );
-            drop(handle);
-        }
-
-        // all the while the parent handle kept the parent gate open
-        tokio::select! {
-            _ = parent_handle.gate.close() => {
-                panic!("parent handle is keeping gate open");
-            }
-            _ = tokio::time::sleep(FOREVER) => { }
-        }
-        drop(parent_handle);
-        tokio::select! {
-            _ = parent.gate.close() => { }
-            _ = tokio::time::sleep(FOREVER) => {
-                panic!("parent handle is dropped, no other gate holders exist")
-            }
-        }
-    }
-
-    #[tokio::test(start_paused = true)]
-    async fn test_connection_handler_exit() {
-        crate::tenant::harness::setup_logging();
-        let timeline_id = TimelineId::generate();
-        let shard0 = Arc::new_cyclic(|myself| StubTimeline {
-            gate: Default::default(),
-            id: timeline_id,
-            shard: ShardIdentity::unsharded(),
-            per_timeline_state: PerTimelineState::default(),
-            myself: myself.clone(),
-        });
-        let mgr = StubManager {
-            shards: vec![shard0.clone()],
-        };
-        let key = DBDIR_KEY;
-
-        // Simulate 10 connections that are opened, used, and closed
-        let mut used_handles = vec![];
-        for _ in 0..10 {
-            let mut cache = Cache::<TestTypes>::default();
-            let handle = {
-                let handle = cache
-                    .get(timeline_id, ShardSelector::Page(key), &mgr)
-                    .await
-                    .expect("we have the timeline");
-                assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
-                handle
-            };
-            handle.getpage();
-            used_handles.push(Arc::downgrade(&handle.0));
-        }
-
-        // No handles exist, thus gates are closed and don't require shutdown
-        assert!(used_handles
-            .iter()
-            .all(|weak| Weak::strong_count(weak) == 0));
-
-        // ... thus the gate should close immediately, even without shutdown
-        tokio::select! {
-            _ = shard0.gate.close() => { }
-            _ = tokio::time::sleep(FOREVER) => {
-                panic!("handle is dropped, no other gate holders exist")
-            }
-        }
-    }
-}
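The core mechanism the reverted module relies on, a map of Weak references that is pruned lazily on access plus an AtomicBool flag that invalidates still-upgradable entries after shutdown, can be reduced to the following std-only sketch. MiniCache and Inner are invented names for illustration and are not part of the pageserver codebase:

    // Sketch only: weak-handle cache with lazy pruning and a shutdown flag.
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Weak};

    struct Inner {
        shut_down: AtomicBool,
        value: String, // stands in for the cached per-timeline state
    }

    #[derive(Default)]
    struct MiniCache {
        map: HashMap<u64, Weak<Inner>>,
    }

    impl MiniCache {
        fn insert(&mut self, key: u64, strong: &Arc<Inner>) {
            // The cache only ever holds a Weak, so it cannot keep Inner alive by itself.
            self.map.insert(key, Arc::downgrade(strong));
        }

        fn get(&mut self, key: u64) -> Option<Arc<Inner>> {
            let upgraded = self.map.get(&key).and_then(Weak::upgrade);
            match upgraded {
                Some(strong) if !strong.shut_down.load(Ordering::Relaxed) => Some(strong),
                _ => {
                    // Stale (all strong refs gone) or shut down: prune lazily; the caller re-resolves.
                    self.map.remove(&key);
                    None
                }
            }
        }
    }

    fn main() {
        let inner = Arc::new(Inner {
            shut_down: AtomicBool::new(false),
            value: "timeline state".to_string(),
        });
        let mut cache = MiniCache::default();
        cache.insert(1, &inner);
        assert!(cache.get(1).is_some()); // hit

        // "Shutdown": cache hits start failing, but existing strong refs stay usable.
        inner.shut_down.store(true, Ordering::Relaxed);
        assert!(cache.get(1).is_none());
        assert_eq!(inner.value, "timeline state");
    }

As in PerTimelineState::shutdown() above, flipping the flag does not invalidate handles a caller already holds; it only makes future cache hits miss so the connection re-resolves through the tenant manager.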