diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index aca22c6b3e..2aa05a9d30 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; use std::collections::HashMap; use std::env; +use std::fmt::Display; use std::io; use std::num::NonZeroU32; use std::pin::Pin; @@ -29,6 +30,7 @@ use http_types::{StatusCode, Url}; use scopeguard::ScopeGuard; use tokio_util::sync::CancellationToken; use tracing::debug; +use utils::backoff; use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind}; use crate::{ @@ -451,26 +453,58 @@ impl RemoteStorage for AzureBlobStorage { // TODO batch requests are not supported by the SDK // https://github.com/Azure/azure-sdk-for-rust/issues/1068 for path in paths { - let blob_client = self.client.blob_client(self.relative_path_to_name(path)); - - let request = blob_client.delete().into_future(); - - let res = tokio::time::timeout(self.timeout, request).await; - - match res { - Ok(Ok(_response)) => continue, - Ok(Err(e)) => { - if let Some(http_err) = e.as_http_error() { - if http_err.status() == StatusCode::NotFound { - continue; - } - } - return Err(e.into()); - } - Err(_elapsed) => return Err(TimeoutOrCancel::Timeout.into()), + #[derive(Debug)] + enum AzureOrTimeout { + AzureError(azure_core::Error), + Timeout, + Cancel, } - } + impl Display for AzureOrTimeout { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:?}") + } + } + let warn_threshold = 3; + let max_retries = 5; + backoff::retry( + || async { + let blob_client = self.client.blob_client(self.relative_path_to_name(path)); + let request = blob_client.delete().into_future(); + + let res = tokio::time::timeout(self.timeout, request).await; + + match res { + Ok(Ok(_v)) => Ok(()), + Ok(Err(azure_err)) => { + if let Some(http_err) = azure_err.as_http_error() { + if http_err.status() == StatusCode::NotFound { + return Ok(()); + } + } + Err(AzureOrTimeout::AzureError(azure_err)) + } + Err(_elapsed) => Err(AzureOrTimeout::Timeout), + } + }, + |err| match err { + AzureOrTimeout::AzureError(_) | AzureOrTimeout::Timeout => false, + AzureOrTimeout::Cancel => true, + }, + warn_threshold, + max_retries, + "deleting remote object", + cancel, + ) + .await + .ok_or_else(|| AzureOrTimeout::Cancel) + .and_then(|x| x) + .map_err(|e| match e { + AzureOrTimeout::AzureError(err) => anyhow::Error::from(err), + AzureOrTimeout::Timeout => TimeoutOrCancel::Timeout.into(), + AzureOrTimeout::Cancel => TimeoutOrCancel::Cancel.into(), + })?; + } Ok(()) };