pageserver: instantiate deletion queue

This commit is contained in:
John Spray
2023-09-04 13:53:11 +01:00
parent 60241567ce
commit eb464d5322
4 changed files with 99 additions and 10 deletions

View File

@@ -8,6 +8,7 @@ use anyhow::{anyhow, Context};
use clap::{Arg, ArgAction, Command};
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::control_plane_client::{ControlPlaneClient, ControlPlaneGenerationsApi};
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
@@ -20,6 +21,7 @@ use metrics::set_build_info_metric;
use pageserver::{
config::{defaults::*, PageServerConf},
context::{DownloadBehavior, RequestContext},
deletion_queue::DeletionQueue,
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
@@ -346,9 +348,54 @@ fn start_pageserver(
}
};
// Top-level cancellation token for the process
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
// Set up control plane client
let control_plane_client = match ControlPlaneClient::new(conf, &shutdown_pageserver) {
Some(c) => {
let inner: Arc<dyn ControlPlaneGenerationsApi + Send + Sync> = Arc::new(c);
Some(inner)
}
None => None,
};
// Set up deletion queue
let (deletion_queue, deletion_frontend, deletion_backend, deletion_executor) =
DeletionQueue::new(
remote_storage.clone(),
control_plane_client,
conf,
shutdown_pageserver.clone(),
);
if let Some(mut deletion_frontend) = deletion_frontend {
BACKGROUND_RUNTIME.spawn(async move {
deletion_frontend
.background()
.instrument(info_span!(parent:None, "deletion frontend"))
.await
});
}
if let Some(mut deletion_backend) = deletion_backend {
BACKGROUND_RUNTIME.spawn(async move {
deletion_backend
.background()
.instrument(info_span!(parent: None, "deletion backend"))
.await
});
}
if let Some(mut deletion_executor) = deletion_executor {
BACKGROUND_RUNTIME.spawn(async move {
deletion_executor
.background()
.instrument(info_span!(parent: None, "deletion executor"))
.await
});
}
// Up to this point no significant I/O has been done: this should have been fast. Record
// duration prior to starting I/O intensive phase of startup.
startup_checkpoint("initial", "Starting loading tenants");
@@ -379,8 +426,7 @@ fn start_pageserver(
};
// Scan the local 'tenants/' directory and start loading the tenants
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
let deletion_queue_client = deletion_queue.new_client();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
@@ -481,7 +527,7 @@ fn start_pageserver(
http::routes::State::new(
conf,
http_auth.clone(),
remote_storage,
remote_storage.clone(),
broker_client.clone(),
disk_usage_eviction_state,
)
@@ -607,7 +653,11 @@ fn start_pageserver(
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0));
let bg_remote_storage = remote_storage.clone();
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
bg_remote_storage.map(|_| deletion_queue.new_client()),
0,
));
unreachable!()
}
})

View File

@@ -3,7 +3,8 @@ pub mod basebackup;
pub mod config;
pub mod consumption_metrics;
pub mod context;
mod control_plane_client;
pub mod control_plane_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
@@ -26,7 +27,8 @@ pub mod failpoint_support;
use std::path::Path;
use crate::task_mgr::TaskKind;
use crate::{deletion_queue::DeletionQueueError, task_mgr::TaskKind};
use deletion_queue::DeletionQueueClient;
use tracing::info;
/// Current storage format version
@@ -49,7 +51,10 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument]
pub async fn shutdown_pageserver(exit_code: i32) {
pub async fn shutdown_pageserver(
deletion_queue_client: Option<DeletionQueueClient>,
exit_code: i32,
) {
use std::time::Duration;
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.
@@ -77,6 +82,31 @@ pub async fn shutdown_pageserver(exit_code: i32) {
)
.await;
// Best effort to persist any outstanding deletions, to avoid leaking objects
if let Some(deletion_queue_client) = deletion_queue_client {
match tokio::time::timeout(Duration::from_secs(5), deletion_queue_client.flush()).await {
Ok(flush_r) => {
match flush_r {
Ok(()) => {
info!("Deletion queue flushed successfully on shutdown")
}
Err(e) => {
match e {
DeletionQueueError::ShuttingDown => {
// This is not harmful for correctness, but is unexpected: the deletion
// queue's workers should stay alive as long as there are any client handles instantiated.
tracing::warn!("Deletion queue stopped prematurely");
}
}
}
}
}
Err(e) => {
tracing::warn!("Timed out flushing deletion queue on shutdown ({e})")
}
}
}
// Shut down the HTTP endpoint last, so that you can still check the server's
// status while it's shutting down.
// FIXME: We should probably stop accepting commands like attach/detach earlier.

View File

@@ -455,7 +455,7 @@ async fn task_finish(
}
if shutdown_process {
shutdown_pageserver(1).await;
shutdown_pageserver(None, 1).await;
}
}

View File

@@ -20,7 +20,8 @@ use utils::crashsafe;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::control_plane_client::ControlPlaneClient;
use crate::control_plane_client::{ControlPlaneClient, ControlPlaneGenerationsApi};
use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::delete::DeleteTenantFlow;
@@ -116,7 +117,15 @@ pub async fn init_tenant_mgr(
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
let tenant_generations = if let Some(client) = ControlPlaneClient::new(conf, &cancel) {
Some(client.re_attach().await?)
let result = client.re_attach().await?;
// Tip off the deletion queue about latest attached generations before starting any Tenants
resources
.deletion_queue_client
.recover(result.clone())
.await?;
Some(result)
} else {
info!("Control plane API not configured, tenant generations are disabled");
None