diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index efa5cc4f99..21d45dfad3 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -40,6 +40,14 @@ const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_millis(100); // From the S3 spec const MAX_KEYS_PER_DELETE: usize = 1000; +// Arbitrary thresholds for retries: we do not depend on success +// within OP_RETRIES, as workers will just go around their consume loop: +// the purpose of the backoff::retries with these constants are to +// retry _sooner_ than we would if going around the whole loop. +pub(crate) const FAILED_REMOTE_OP_WARN_THRESHOLD: u32 = 3; + +pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10; + // TODO: adminstrative "panic button" config property to disable all deletions // TODO: configurable for how long to wait before executing deletions @@ -519,6 +527,26 @@ pub struct FrontendQueueWorker { } impl FrontendQueueWorker { + async fn upload_pending_list(&mut self) -> anyhow::Result<()> { + let key = &self.conf.remote_deletion_list_path(self.pending.sequence); + + backoff::retry( + || { + let bytes = + serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list"); + let size = bytes.len(); + let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); + self.remote_storage.upload(source, size, key, None) + }, + |_| false, + FAILED_REMOTE_OP_WARN_THRESHOLD, + FAILED_REMOTE_OP_RETRIES, + "upload deletion list", + backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")), + ) + .await + } + /// Try to flush `list` to persistent storage /// /// This does not return errors, because on failure to flush we do not lose @@ -533,16 +561,11 @@ impl FrontendQueueWorker { return; } - let key = &self.conf.remote_deletion_list_path(self.pending.sequence); - - let bytes = serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list"); - let size = bytes.len(); - let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); - - match self.remote_storage.upload(source, size, key, None).await { + match self.upload_pending_list().await { Ok(_) => { info!( - "Stored deletion list {key} ({0}..{1})", + sequence = self.pending.sequence, + "Stored deletion list ({0}..{1})", self.pending .objects .first() @@ -585,7 +608,7 @@ impl FrontendQueueWorker { let header_bytes = match backoff::retry( || self.remote_storage.download_all(&header_path), |e| matches!(e, DownloadError::NotFound), - 3, + FAILED_REMOTE_OP_WARN_THRESHOLD, u32::MAX, "Reading deletion queue header", backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), @@ -624,7 +647,7 @@ impl FrontendQueueWorker { let lists = backoff::retry( || async { self.remote_storage.list_prefixes(Some(&prefix)).await }, |_| false, - 3, + FAILED_REMOTE_OP_WARN_THRESHOLD, u32::MAX, // There's no point giving up, since once we do that the deletion queue is stuck "Recovering deletion lists", backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), @@ -687,7 +710,7 @@ impl FrontendQueueWorker { let lists_body = backoff::retry( || self.remote_storage.download_all(&list_path), |_| false, - 3, + FAILED_REMOTE_OP_WARN_THRESHOLD, u32::MAX, "Reading a deletion list", backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),