From ee1fba8729b8ec2aa7f05623aff68de2af431063 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 12 Sep 2023 09:18:18 +0100 Subject: [PATCH] pageserver: fix deletion queue flush on shutdown --- pageserver/src/bin/pageserver.rs | 10 +++----- pageserver/src/deletion_queue.rs | 40 +++++++++++++++++++++++++++++--- pageserver/src/lib.rs | 35 +++++----------------------- 3 files changed, 46 insertions(+), 39 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 9816da67bd..94fd339391 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -365,12 +365,7 @@ fn start_pageserver( // Set up deletion queue let (deletion_queue, deletion_frontend, deletion_backend, deletion_executor) = - DeletionQueue::new( - remote_storage.clone(), - control_plane_client, - conf, - shutdown_pageserver.clone(), - ); + DeletionQueue::new(remote_storage.clone(), control_plane_client, conf); if let Some(mut deletion_frontend) = deletion_frontend { BACKGROUND_RUNTIME.spawn(async move { deletion_frontend @@ -656,8 +651,9 @@ fn start_pageserver( // The plan is to change that over time. shutdown_pageserver.take(); let bg_remote_storage = remote_storage.clone(); + let bg_deletion_queue = deletion_queue.clone(); BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver( - bg_remote_storage.map(|_| deletion_queue.new_client()), + bg_remote_storage.map(|_| bg_deletion_queue), 0, )); unreachable!() diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 3bdd5eba18..234e984240 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -5,6 +5,7 @@ mod frontend; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; use crate::control_plane_client::ControlPlaneGenerationsApi; use crate::metrics::DELETION_QUEUE_SUBMITTED; @@ -71,6 +72,9 @@ use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName}; #[derive(Clone)] pub struct DeletionQueue { client: DeletionQueueClient, + + // Parent cancellation token for the tokens passed into background workers + cancel: CancellationToken, } #[derive(Debug)] @@ -570,7 +574,6 @@ impl DeletionQueue { remote_storage: Option, control_plane_client: Option>, conf: &'static PageServerConf, - cancel: CancellationToken, ) -> ( Self, Option, @@ -589,6 +592,11 @@ impl DeletionQueue { let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())); + // The deletion queue has an independent cancellation token to + // the general pageserver shutdown token, because it stays alive a bit + // longer to flush after Tenants have all been torn down. + let cancel = CancellationToken::new(); + let remote_storage = match remote_storage { None => { return ( @@ -598,6 +606,7 @@ impl DeletionQueue { executor_tx, lsn_table: lsn_table.clone(), }, + cancel, }, None, None, @@ -614,6 +623,7 @@ impl DeletionQueue { executor_tx: executor_tx.clone(), lsn_table: lsn_table.clone(), }, + cancel: cancel.clone(), }, Some(FrontendQueueWorker::new( conf, @@ -636,6 +646,32 @@ impl DeletionQueue { )), ) } + + pub async fn shutdown(&mut self, timeout: Duration) { + self.cancel.cancel(); + + match tokio::time::timeout(timeout, self.client.flush()).await { + Ok(flush_r) => { + match flush_r { + Ok(()) => { + tracing::info!("Deletion queue flushed successfully on shutdown") + } + Err(e) => { + match e { + DeletionQueueError::ShuttingDown => { + // This is not harmful for correctness, but is unexpected: the deletion + // queue's workers should stay alive as long as there are any client handles instantiated. + tracing::warn!("Deletion queue stopped prematurely"); + } + } + } + } + } + Err(e) => { + tracing::warn!("Timed out flushing deletion queue on shutdown ({e})") + } + } + } } #[cfg(test)] @@ -692,7 +728,6 @@ mod test { Some(self.storage.clone()), Some(self.mock_control_plane.clone()), self.harness.conf, - CancellationToken::new(), ); self.deletion_queue = deletion_queue; @@ -815,7 +850,6 @@ mod test { Some(storage.clone()), Some(mock_control_plane.clone()), harness.conf, - CancellationToken::new(), ); let mut fe_worker = fe_worker.unwrap(); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index a304a41728..c4b71ee3d6 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -27,8 +27,8 @@ pub mod failpoint_support; use std::path::Path; -use crate::{deletion_queue::DeletionQueueError, task_mgr::TaskKind}; -use deletion_queue::DeletionQueueClient; +use crate::task_mgr::TaskKind; +use deletion_queue::DeletionQueue; use tracing::info; /// Current storage format version @@ -50,11 +50,8 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]); pub use crate::metrics::preinitialize_metrics; -#[tracing::instrument] -pub async fn shutdown_pageserver( - deletion_queue_client: Option, - exit_code: i32, -) { +#[tracing::instrument(skip(deletion_queue))] +pub async fn shutdown_pageserver(deletion_queue: Option, exit_code: i32) { use std::time::Duration; // Shut down the libpq endpoint task. This prevents new connections from // being accepted. @@ -83,28 +80,8 @@ pub async fn shutdown_pageserver( .await; // Best effort to persist any outstanding deletions, to avoid leaking objects - if let Some(deletion_queue_client) = deletion_queue_client { - match tokio::time::timeout(Duration::from_secs(5), deletion_queue_client.flush()).await { - Ok(flush_r) => { - match flush_r { - Ok(()) => { - info!("Deletion queue flushed successfully on shutdown") - } - Err(e) => { - match e { - DeletionQueueError::ShuttingDown => { - // This is not harmful for correctness, but is unexpected: the deletion - // queue's workers should stay alive as long as there are any client handles instantiated. - tracing::warn!("Deletion queue stopped prematurely"); - } - } - } - } - } - Err(e) => { - tracing::warn!("Timed out flushing deletion queue on shutdown ({e})") - } - } + if let Some(mut deletion_queue) = deletion_queue { + deletion_queue.shutdown(Duration::from_secs(5)).await; } // Shut down the HTTP endpoint last, so that you can still check the server's