From 7c4d79f4db6c427ce3e18b72469e18b083ecf164 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 22 Aug 2023 12:48:31 +0100 Subject: [PATCH] deletion queue: cancellable retries --- libs/remote_storage/src/lib.rs | 3 +++ pageserver/src/bin/pageserver.rs | 6 +++++- pageserver/src/deletion_queue.rs | 26 ++++++++++++++++++++------ 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 334ca8a5b2..18822072ff 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -190,6 +190,8 @@ pub enum DownloadError { BadInput(anyhow::Error), /// The file was not found in the remote storage. NotFound, + /// The client was shut down + Shutdown, /// The file was found in the remote storage, but the download failed. Other(anyhow::Error), } @@ -201,6 +203,7 @@ impl std::fmt::Display for DownloadError { write!(f, "Failed to download a remote file due to user input: {e}") } DownloadError::NotFound => write!(f, "No file found for the remote object id given"), + DownloadError::Shutdown => write!(f, "Client shutting down"), DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"), } } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index af866087fc..43e8d47222 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -352,8 +352,9 @@ fn start_pageserver( let remote_storage = create_remote_storage_client(conf)?; // Set up deletion queue + let deletion_queue_cancel = tokio_util::sync::CancellationToken::new(); let (deletion_queue, deletion_frontend, deletion_backend) = - DeletionQueue::new(remote_storage.clone(), conf); + DeletionQueue::new(remote_storage.clone(), conf, deletion_queue_cancel.clone()); if let Some(mut deletion_frontend) = deletion_frontend { BACKGROUND_RUNTIME.spawn(async move { deletion_frontend @@ -655,6 +656,9 @@ fn start_pageserver( } }); + // Clean shutdown of deletion queue workers + deletion_queue_cancel.cancel(); + unreachable!() } }) diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 4aa19e901a..553e3f2eb0 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -8,6 +8,7 @@ use serde_with::serde_as; use thiserror::Error; use tokio; use tokio::time::Duration; +use tokio_util::sync::CancellationToken; use tracing::{self, debug, error, info, warn}; use utils::backoff; use utils::id::{TenantId, TimelineId}; @@ -517,6 +518,9 @@ pub struct FrontendQueueWorker { // this is lazy usually, but after a failed flush it is set to a smaller time // period to drive retries timeout: Duration, + + // Worker loop is torn down when this fires. + cancel: CancellationToken, } impl FrontendQueueWorker { @@ -592,6 +596,7 @@ impl FrontendQueueWorker { 3, u32::MAX, "Reading deletion queue header", + backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), ) .await { @@ -622,15 +627,15 @@ impl FrontendQueueWorker { }; }; - // TODO: this needs a CancellationToken or equivalent: usual worker teardown happens via the channel let prefix = RemotePath::new(&self.conf.remote_deletion_node_prefix()) .expect("Failed to compose path"); let lists = backoff::retry( || async { self.remote_storage.list_prefixes(Some(&prefix)).await }, - |_| false, // TODO impl is_permanent + |_| false, 3, u32::MAX, // There's no point giving up, since once we do that the deletion queue is stuck "Recovering deletion lists", + backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), ) .await?; @@ -693,6 +698,7 @@ impl FrontendQueueWorker { 3, u32::MAX, "Reading a deletion list", + backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), ) .await?; @@ -826,6 +832,7 @@ impl DeletionQueue { pub fn new( remote_storage: Option, conf: &'static PageServerConf, + cancel: CancellationToken, ) -> ( Self, Option, @@ -850,6 +857,7 @@ impl DeletionQueue { tx: backend_tx, timeout: FLUSH_DEFAULT_DEADLINE, pending_flushes: Vec::new(), + cancel, }), Some(BackendQueueWorker { remote_storage, @@ -896,8 +904,11 @@ mod test { impl TestSetup { /// Simulate a pageserver restart by destroying and recreating the deletion queue fn restart(&mut self) { - let (deletion_queue, fe_worker, be_worker) = - DeletionQueue::new(Some(self.storage.clone()), self.harness.conf); + let (deletion_queue, fe_worker, be_worker) = DeletionQueue::new( + Some(self.storage.clone()), + self.harness.conf, + CancellationToken::new(), + ); self.deletion_queue = deletion_queue; @@ -948,8 +959,11 @@ mod test { )); let entered_runtime = runtime.enter(); - let (deletion_queue, fe_worker, be_worker) = - DeletionQueue::new(Some(storage.clone()), harness.conf); + let (deletion_queue, fe_worker, be_worker) = DeletionQueue::new( + Some(storage.clone()), + harness.conf, + CancellationToken::new(), + ); let mut fe_worker = fe_worker.unwrap(); let mut be_worker = be_worker.unwrap();