diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 6103f4c885..bfc7e2312f 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -8,6 +8,7 @@ use anyhow::{anyhow, Context}; use clap::{Arg, ArgAction, Command}; use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp}; +use pageserver::control_plane_client::{ControlPlaneClient, ControlPlaneGenerationsApi}; use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task}; use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING}; use pageserver::task_mgr::WALRECEIVER_RUNTIME; @@ -20,6 +21,7 @@ use metrics::set_build_info_metric; use pageserver::{ config::{defaults::*, PageServerConf}, context::{DownloadBehavior, RequestContext}, + deletion_queue::DeletionQueue, http, page_cache, page_service, task_mgr, task_mgr::TaskKind, task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME}, @@ -346,9 +348,54 @@ fn start_pageserver( } }; + // Top-level cancellation token for the process + let shutdown_pageserver = tokio_util::sync::CancellationToken::new(); + // Set up remote storage client let remote_storage = create_remote_storage_client(conf)?; + // Set up control plane client + let control_plane_client = match ControlPlaneClient::new(conf, &shutdown_pageserver) { + Some(c) => { + let inner: Arc = Arc::new(c); + Some(inner) + } + None => None, + }; + + // Set up deletion queue + let (deletion_queue, deletion_frontend, deletion_backend, deletion_executor) = + DeletionQueue::new( + remote_storage.clone(), + control_plane_client, + conf, + shutdown_pageserver.clone(), + ); + if let Some(mut deletion_frontend) = deletion_frontend { + BACKGROUND_RUNTIME.spawn(async move { + deletion_frontend + .background() + .instrument(info_span!(parent:None, "deletion frontend")) + .await + }); + } + if let Some(mut deletion_backend) = deletion_backend { + BACKGROUND_RUNTIME.spawn(async move { + deletion_backend + .background() + .instrument(info_span!(parent: None, "deletion backend")) + .await + }); + } + if let Some(mut deletion_executor) = deletion_executor { + BACKGROUND_RUNTIME.spawn(async move { + deletion_executor + .background() + .instrument(info_span!(parent: None, "deletion executor")) + .await + }); + } + // Up to this point no significant I/O has been done: this should have been fast. Record // duration prior to starting I/O intensive phase of startup. startup_checkpoint("initial", "Starting loading tenants"); @@ -379,8 +426,7 @@ fn start_pageserver( }; // Scan the local 'tenants/' directory and start loading the tenants - let shutdown_pageserver = tokio_util::sync::CancellationToken::new(); - + let deletion_queue_client = deletion_queue.new_client(); BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( conf, TenantSharedResources { @@ -481,7 +527,7 @@ fn start_pageserver( http::routes::State::new( conf, http_auth.clone(), - remote_storage, + remote_storage.clone(), broker_client.clone(), disk_usage_eviction_state, ) @@ -607,7 +653,11 @@ fn start_pageserver( // Right now that tree doesn't reach very far, and `task_mgr` is used instead. // The plan is to change that over time. shutdown_pageserver.take(); - BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0)); + let bg_remote_storage = remote_storage.clone(); + BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver( + bg_remote_storage.map(|_| deletion_queue.new_client()), + 0, + )); unreachable!() } }) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 3049ad6a4e..a304a41728 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -3,7 +3,8 @@ pub mod basebackup; pub mod config; pub mod consumption_metrics; pub mod context; -mod control_plane_client; +pub mod control_plane_client; +pub mod deletion_queue; pub mod disk_usage_eviction_task; pub mod http; pub mod import_datadir; @@ -26,7 +27,8 @@ pub mod failpoint_support; use std::path::Path; -use crate::task_mgr::TaskKind; +use crate::{deletion_queue::DeletionQueueError, task_mgr::TaskKind}; +use deletion_queue::DeletionQueueClient; use tracing::info; /// Current storage format version @@ -49,7 +51,10 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]); pub use crate::metrics::preinitialize_metrics; #[tracing::instrument] -pub async fn shutdown_pageserver(exit_code: i32) { +pub async fn shutdown_pageserver( + deletion_queue_client: Option, + exit_code: i32, +) { use std::time::Duration; // Shut down the libpq endpoint task. This prevents new connections from // being accepted. @@ -77,6 +82,31 @@ pub async fn shutdown_pageserver(exit_code: i32) { ) .await; + // Best effort to persist any outstanding deletions, to avoid leaking objects + if let Some(deletion_queue_client) = deletion_queue_client { + match tokio::time::timeout(Duration::from_secs(5), deletion_queue_client.flush()).await { + Ok(flush_r) => { + match flush_r { + Ok(()) => { + info!("Deletion queue flushed successfully on shutdown") + } + Err(e) => { + match e { + DeletionQueueError::ShuttingDown => { + // This is not harmful for correctness, but is unexpected: the deletion + // queue's workers should stay alive as long as there are any client handles instantiated. + tracing::warn!("Deletion queue stopped prematurely"); + } + } + } + } + } + Err(e) => { + tracing::warn!("Timed out flushing deletion queue on shutdown ({e})") + } + } + } + // Shut down the HTTP endpoint last, so that you can still check the server's // status while it's shutting down. // FIXME: We should probably stop accepting commands like attach/detach earlier. diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 3c7a1115df..f363949b75 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -455,7 +455,7 @@ async fn task_finish( } if shutdown_process { - shutdown_pageserver(1).await; + shutdown_pageserver(None, 1).await; } } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 0cf5c36b87..f809a1da0f 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -20,7 +20,8 @@ use utils::crashsafe; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; -use crate::control_plane_client::ControlPlaneClient; +use crate::control_plane_client::{ControlPlaneClient, ControlPlaneGenerationsApi}; +use crate::deletion_queue::DeletionQueueClient; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::TenantConfOpt; use crate::tenant::delete::DeleteTenantFlow; @@ -116,7 +117,15 @@ pub async fn init_tenant_mgr( // If we are configured to use the control plane API, then it is the source of truth for what tenants to load. let tenant_generations = if let Some(client) = ControlPlaneClient::new(conf, &cancel) { - Some(client.re_attach().await?) + let result = client.re_attach().await?; + + // Tip off the deletion queue about latest attached generations before starting any Tenants + resources + .deletion_queue_client + .recover(result.clone()) + .await?; + + Some(result) } else { info!("Control plane API not configured, tenant generations are disabled"); None