mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
deletion queue: more consistent use of backoff::retry
This commit is contained in:
@@ -40,6 +40,14 @@ const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_millis(100);
|
||||
// From the S3 spec
|
||||
const MAX_KEYS_PER_DELETE: usize = 1000;
|
||||
|
||||
// Arbitrary thresholds for retries: we do not depend on success
|
||||
// within OP_RETRIES, as workers will just go around their consume loop:
|
||||
// the purpose of the backoff::retries with these constants are to
|
||||
// retry _sooner_ than we would if going around the whole loop.
|
||||
pub(crate) const FAILED_REMOTE_OP_WARN_THRESHOLD: u32 = 3;
|
||||
|
||||
pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
|
||||
|
||||
// TODO: adminstrative "panic button" config property to disable all deletions
|
||||
// TODO: configurable for how long to wait before executing deletions
|
||||
|
||||
@@ -519,6 +527,26 @@ pub struct FrontendQueueWorker {
|
||||
}
|
||||
|
||||
impl FrontendQueueWorker {
|
||||
async fn upload_pending_list(&mut self) -> anyhow::Result<()> {
|
||||
let key = &self.conf.remote_deletion_list_path(self.pending.sequence);
|
||||
|
||||
backoff::retry(
|
||||
|| {
|
||||
let bytes =
|
||||
serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list");
|
||||
let size = bytes.len();
|
||||
let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes));
|
||||
self.remote_storage.upload(source, size, key, None)
|
||||
},
|
||||
|_| false,
|
||||
FAILED_REMOTE_OP_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
"upload deletion list",
|
||||
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Try to flush `list` to persistent storage
|
||||
///
|
||||
/// This does not return errors, because on failure to flush we do not lose
|
||||
@@ -533,16 +561,11 @@ impl FrontendQueueWorker {
|
||||
return;
|
||||
}
|
||||
|
||||
let key = &self.conf.remote_deletion_list_path(self.pending.sequence);
|
||||
|
||||
let bytes = serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list");
|
||||
let size = bytes.len();
|
||||
let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes));
|
||||
|
||||
match self.remote_storage.upload(source, size, key, None).await {
|
||||
match self.upload_pending_list().await {
|
||||
Ok(_) => {
|
||||
info!(
|
||||
"Stored deletion list {key} ({0}..{1})",
|
||||
sequence = self.pending.sequence,
|
||||
"Stored deletion list ({0}..{1})",
|
||||
self.pending
|
||||
.objects
|
||||
.first()
|
||||
@@ -585,7 +608,7 @@ impl FrontendQueueWorker {
|
||||
let header_bytes = match backoff::retry(
|
||||
|| self.remote_storage.download_all(&header_path),
|
||||
|e| matches!(e, DownloadError::NotFound),
|
||||
3,
|
||||
FAILED_REMOTE_OP_WARN_THRESHOLD,
|
||||
u32::MAX,
|
||||
"Reading deletion queue header",
|
||||
backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
|
||||
@@ -624,7 +647,7 @@ impl FrontendQueueWorker {
|
||||
let lists = backoff::retry(
|
||||
|| async { self.remote_storage.list_prefixes(Some(&prefix)).await },
|
||||
|_| false,
|
||||
3,
|
||||
FAILED_REMOTE_OP_WARN_THRESHOLD,
|
||||
u32::MAX, // There's no point giving up, since once we do that the deletion queue is stuck
|
||||
"Recovering deletion lists",
|
||||
backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
|
||||
@@ -687,7 +710,7 @@ impl FrontendQueueWorker {
|
||||
let lists_body = backoff::retry(
|
||||
|| self.remote_storage.download_all(&list_path),
|
||||
|_| false,
|
||||
3,
|
||||
FAILED_REMOTE_OP_WARN_THRESHOLD,
|
||||
u32::MAX,
|
||||
"Reading a deletion list",
|
||||
backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
|
||||
|
||||
Reference in New Issue
Block a user