diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 1c0d43d479..ae0a94295c 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -24,6 +24,7 @@ use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerCl use bytes::Bytes; use futures::future::Either; use futures::stream::Stream; +use futures::FutureExt; use futures_util::StreamExt; use futures_util::TryStreamExt; use http_types::{StatusCode, Url}; @@ -31,6 +32,7 @@ use scopeguard::ScopeGuard; use tokio_util::sync::CancellationToken; use tracing::debug; use utils::backoff; +use utils::backoff::exponential_backoff_duration_seconds; use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind}; use crate::{ @@ -302,40 +304,59 @@ impl RemoteStorage for AzureBlobStorage { let mut next_marker = None; + let mut timeout_try_cnt = 1; + 'outer: loop { let mut builder = builder.clone(); if let Some(marker) = next_marker.clone() { builder = builder.marker(marker); } - let response = builder.into_stream(); - let response = response.into_stream().map_err(to_download_error); - let response = tokio_stream::StreamExt::timeout(response, self.timeout); - let response = response.map(|res| match res { - Ok(res) => res, - Err(_elapsed) => Err(DownloadError::Timeout), + // Azure Blob Rust SDK does not expose the list blob API directly. Users have to use + // their pageable iterator wrapper that returns all keys as a stream. We want to have + // full control of paging, and therefore we only take the first item from the stream. + let mut response_stream = builder.into_stream(); + let response = response_stream.next(); + // Timeout mechanism: Azure client will sometimes stuck on a request, but retrying that request + // would immediately succeed. Therefore, we use exponential backoff timeout to retry the request. + // (Usually, exponential backoff is used to determine the sleep time between two retries.) We + // start with 10.0 second timeout, and double the timeout for each failure, up to 5 failures. + // timeout = min(5 * (1.0+1.0)^n, self.timeout). + let this_timeout = (5.0 * exponential_backoff_duration_seconds(timeout_try_cnt, 1.0, self.timeout.as_secs_f64())).min(self.timeout.as_secs_f64()); + let response = tokio::time::timeout(Duration::from_secs_f64(this_timeout), response); + let response = response.map(|res| { + match res { + Ok(Some(Ok(res))) => Ok(Some(res)), + Ok(Some(Err(e))) => Err(to_download_error(e)), + Ok(None) => Ok(None), + Err(_elasped) => Err(DownloadError::Timeout), + } }); - - let mut response = std::pin::pin!(response); - let mut max_keys = max_keys.map(|mk| mk.get()); let next_item = tokio::select! { - op = response.next() => Ok(op), + op = response => op, _ = cancel.cancelled() => Err(DownloadError::Cancelled), - }?; + }; + + if let Err(DownloadError::Timeout) = &next_item { + timeout_try_cnt += 1; + if timeout_try_cnt <= 5 { + continue; + } + } + + let next_item = next_item?; + + if timeout_try_cnt >= 2 { + tracing::warn!("Azure Blob Storage list timed out and succeeded after {} tries", timeout_try_cnt); + } + timeout_try_cnt = 1; + let Some(entry) = next_item else { // The list is complete, so yield it. break; }; let mut res = Listing::default(); - let entry = match entry { - Ok(entry) => entry, - Err(e) => { - // The error is potentially retryable, so we must rewind the loop after yielding. - yield Err(e); - continue; - } - }; next_marker = entry.continuation(); let prefix_iter = entry .blobs @@ -351,7 +372,7 @@ impl RemoteStorage for AzureBlobStorage { last_modified: k.properties.last_modified.into(), size: k.properties.content_length, } - ); + ); for key in blob_iter { res.keys.push(key);