mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 01:20:38 +00:00
pageserver: fix deletion queue flush on shutdown
This commit is contained in:
@@ -365,12 +365,7 @@ fn start_pageserver(
|
||||
|
||||
// Set up deletion queue
|
||||
let (deletion_queue, deletion_frontend, deletion_backend, deletion_executor) =
|
||||
DeletionQueue::new(
|
||||
remote_storage.clone(),
|
||||
control_plane_client,
|
||||
conf,
|
||||
shutdown_pageserver.clone(),
|
||||
);
|
||||
DeletionQueue::new(remote_storage.clone(), control_plane_client, conf);
|
||||
if let Some(mut deletion_frontend) = deletion_frontend {
|
||||
BACKGROUND_RUNTIME.spawn(async move {
|
||||
deletion_frontend
|
||||
@@ -656,8 +651,9 @@ fn start_pageserver(
|
||||
// The plan is to change that over time.
|
||||
shutdown_pageserver.take();
|
||||
let bg_remote_storage = remote_storage.clone();
|
||||
let bg_deletion_queue = deletion_queue.clone();
|
||||
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
|
||||
bg_remote_storage.map(|_| deletion_queue.new_client()),
|
||||
bg_remote_storage.map(|_| bg_deletion_queue),
|
||||
0,
|
||||
));
|
||||
unreachable!()
|
||||
|
||||
@@ -5,6 +5,7 @@ mod frontend;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::control_plane_client::ControlPlaneGenerationsApi;
|
||||
use crate::metrics::DELETION_QUEUE_SUBMITTED;
|
||||
@@ -71,6 +72,9 @@ use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
|
||||
#[derive(Clone)]
|
||||
pub struct DeletionQueue {
|
||||
client: DeletionQueueClient,
|
||||
|
||||
// Parent cancellation token for the tokens passed into background workers
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -570,7 +574,6 @@ impl DeletionQueue {
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
control_plane_client: Option<Arc<dyn ControlPlaneGenerationsApi + Send + Sync>>,
|
||||
conf: &'static PageServerConf,
|
||||
cancel: CancellationToken,
|
||||
) -> (
|
||||
Self,
|
||||
Option<FrontendQueueWorker>,
|
||||
@@ -589,6 +592,11 @@ impl DeletionQueue {
|
||||
|
||||
let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
|
||||
|
||||
// The deletion queue has an independent cancellation token to
|
||||
// the general pageserver shutdown token, because it stays alive a bit
|
||||
// longer to flush after Tenants have all been torn down.
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let remote_storage = match remote_storage {
|
||||
None => {
|
||||
return (
|
||||
@@ -598,6 +606,7 @@ impl DeletionQueue {
|
||||
executor_tx,
|
||||
lsn_table: lsn_table.clone(),
|
||||
},
|
||||
cancel,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
@@ -614,6 +623,7 @@ impl DeletionQueue {
|
||||
executor_tx: executor_tx.clone(),
|
||||
lsn_table: lsn_table.clone(),
|
||||
},
|
||||
cancel: cancel.clone(),
|
||||
},
|
||||
Some(FrontendQueueWorker::new(
|
||||
conf,
|
||||
@@ -636,6 +646,32 @@ impl DeletionQueue {
|
||||
)),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn shutdown(&mut self, timeout: Duration) {
|
||||
self.cancel.cancel();
|
||||
|
||||
match tokio::time::timeout(timeout, self.client.flush()).await {
|
||||
Ok(flush_r) => {
|
||||
match flush_r {
|
||||
Ok(()) => {
|
||||
tracing::info!("Deletion queue flushed successfully on shutdown")
|
||||
}
|
||||
Err(e) => {
|
||||
match e {
|
||||
DeletionQueueError::ShuttingDown => {
|
||||
// This is not harmful for correctness, but is unexpected: the deletion
|
||||
// queue's workers should stay alive as long as there are any client handles instantiated.
|
||||
tracing::warn!("Deletion queue stopped prematurely");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("Timed out flushing deletion queue on shutdown ({e})")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -692,7 +728,6 @@ mod test {
|
||||
Some(self.storage.clone()),
|
||||
Some(self.mock_control_plane.clone()),
|
||||
self.harness.conf,
|
||||
CancellationToken::new(),
|
||||
);
|
||||
|
||||
self.deletion_queue = deletion_queue;
|
||||
@@ -815,7 +850,6 @@ mod test {
|
||||
Some(storage.clone()),
|
||||
Some(mock_control_plane.clone()),
|
||||
harness.conf,
|
||||
CancellationToken::new(),
|
||||
);
|
||||
|
||||
let mut fe_worker = fe_worker.unwrap();
|
||||
|
||||
@@ -27,8 +27,8 @@ pub mod failpoint_support;
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use crate::{deletion_queue::DeletionQueueError, task_mgr::TaskKind};
|
||||
use deletion_queue::DeletionQueueClient;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use deletion_queue::DeletionQueue;
|
||||
use tracing::info;
|
||||
|
||||
/// Current storage format version
|
||||
@@ -50,11 +50,8 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
|
||||
|
||||
pub use crate::metrics::preinitialize_metrics;
|
||||
|
||||
#[tracing::instrument]
|
||||
pub async fn shutdown_pageserver(
|
||||
deletion_queue_client: Option<DeletionQueueClient>,
|
||||
exit_code: i32,
|
||||
) {
|
||||
#[tracing::instrument(skip(deletion_queue))]
|
||||
pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) {
|
||||
use std::time::Duration;
|
||||
// Shut down the libpq endpoint task. This prevents new connections from
|
||||
// being accepted.
|
||||
@@ -83,28 +80,8 @@ pub async fn shutdown_pageserver(
|
||||
.await;
|
||||
|
||||
// Best effort to persist any outstanding deletions, to avoid leaking objects
|
||||
if let Some(deletion_queue_client) = deletion_queue_client {
|
||||
match tokio::time::timeout(Duration::from_secs(5), deletion_queue_client.flush()).await {
|
||||
Ok(flush_r) => {
|
||||
match flush_r {
|
||||
Ok(()) => {
|
||||
info!("Deletion queue flushed successfully on shutdown")
|
||||
}
|
||||
Err(e) => {
|
||||
match e {
|
||||
DeletionQueueError::ShuttingDown => {
|
||||
// This is not harmful for correctness, but is unexpected: the deletion
|
||||
// queue's workers should stay alive as long as there are any client handles instantiated.
|
||||
tracing::warn!("Deletion queue stopped prematurely");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("Timed out flushing deletion queue on shutdown ({e})")
|
||||
}
|
||||
}
|
||||
if let Some(mut deletion_queue) = deletion_queue {
|
||||
deletion_queue.shutdown(Duration::from_secs(5)).await;
|
||||
}
|
||||
|
||||
// Shut down the HTTP endpoint last, so that you can still check the server's
|
||||
|
||||
Reference in New Issue
Block a user