diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs
index 266a1f6584..acd95a5255 100644
--- a/libs/remote_storage/src/azure_blob.rs
+++ b/libs/remote_storage/src/azure_blob.rs
@@ -15,7 +15,7 @@ use std::time::SystemTime;
 use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
 use anyhow::Result;
 use azure_core::request_options::{MaxResults, Metadata, Range};
-use azure_core::RetryOptions;
+use azure_core::{Continuable, RetryOptions};
 use azure_identity::DefaultAzureCredential;
 use azure_storage::StorageCredentials;
 use azure_storage_blobs::blob::CopyStatus;
@@ -306,23 +306,43 @@ impl RemoteStorage for AzureBlobStorage {
                 builder = builder.max_results(MaxResults::new(limit));
             }
 
-            let response = builder.into_stream();
-            let response = response.into_stream().map_err(to_download_error);
-            let response = tokio_stream::StreamExt::timeout(response, self.timeout);
-            let response = response.map(|res| match res {
-                Ok(res) => res,
-                Err(_elapsed) => Err(DownloadError::Timeout),
-            });
+            let mut next_marker = None;
 
-            let mut response = std::pin::pin!(response);
+            'outer: loop {
+                let mut builder = builder.clone();
+                if let Some(marker) = next_marker.clone() {
+                    builder = builder.marker(marker);
+                }
+                let response = builder.into_stream();
+                let response = response.into_stream().map_err(to_download_error);
+                let response = tokio_stream::StreamExt::timeout(response, self.timeout);
+                let response = response.map(|res| match res {
+                    Ok(res) => res,
+                    Err(_elapsed) => Err(DownloadError::Timeout),
+                });
+
+                let mut response = std::pin::pin!(response);
+
+                let mut max_keys = max_keys.map(|mk| mk.get());
+                let next_item = tokio::select! {
+                    op = response.next() => Ok(op),
+                    _ = cancel.cancelled() => Err(DownloadError::Cancelled),
+                }?;
+                let Some(entry) = next_item else {
+                    // The response stream is exhausted; the listing is complete.
+                    break;
+                };
 
-            let mut max_keys = max_keys.map(|mk| mk.get());
-            'outer: while let Some(entry) = tokio::select! {
-                op = response.next() => Ok(op),
-                _ = cancel.cancelled() => Err(DownloadError::Cancelled),
-            }? {
                 let mut res = Listing::default();
-                let entry = entry?;
+                let entry = match entry {
+                    Ok(entry) => entry,
+                    Err(e) => {
+                        // The error is potentially retryable, so we must rewind the loop after yielding.
+                        yield Err(e);
+                        continue;
+                    }
+                };
+                next_marker = entry.continuation();
                 let prefix_iter = entry
                     .blobs
                     .prefixes()
@@ -348,6 +368,11 @@ impl RemoteStorage for AzureBlobStorage {
                     }
                 }
                 yield Ok(res);
+
+                // We are done here
+                if next_marker.is_none() {
+                    break;
+                }
             }
         }
     }
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 201e2fb178..0fed86f4b8 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -165,6 +165,9 @@ pub trait RemoteStorage: Send + Sync + 'static {
     /// The stream is guaranteed to return at least one element, even in the case of errors
     /// (in that case it's an `Err()`), or an empty `Listing`.
     ///
+    /// The stream does not terminate after yielding an error, as long as [`is_permanent`] returns false for that error.
+    /// The `next` call can then be retried, and a later attempt may succeed.
+    ///
     /// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not
     /// from the absolute root of the bucket.
    ///
@@ -178,6 +181,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
     /// unlimted size buckets, as the full list of objects is allocated into a monolithic data structure.
     ///
     /// [`ListObjectsV2`]: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
+    /// [`is_permanent`]: DownloadError::is_permanent
     fn list_streaming(
         &self,
         prefix: Option<&RemotePath>,
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index 39106a4e53..90ed48e06c 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -507,7 +507,7 @@ impl RemoteStorage for S3Bucket {
                     .list_objects_v2()
                     .bucket(self.bucket_name.clone())
                     .set_prefix(list_prefix.clone())
-                    .set_continuation_token(continuation_token)
+                    .set_continuation_token(continuation_token.clone())
                     .set_max_keys(request_max_keys);
 
                 if let ListingMode::WithDelimiter = mode {
@@ -532,7 +532,14 @@ impl RemoteStorage for S3Bucket {
                     .req_seconds
                     .observe_elapsed(kind, &response, started_at);
 
-                let response = response?;
+                let response = match response {
+                    Ok(response) => response,
+                    Err(e) => {
+                        // The error is potentially retryable, so we must rewind the loop after yielding.
+                        yield Err(e);
+                        continue 'outer;
+                    },
+                };
 
                 let keys = response.contents();
                 let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
diff --git a/libs/remote_storage/tests/common/mod.rs b/libs/remote_storage/tests/common/mod.rs
index da9dc08d8d..daab05d91a 100644
--- a/libs/remote_storage/tests/common/mod.rs
+++ b/libs/remote_storage/tests/common/mod.rs
@@ -152,7 +152,7 @@ pub(crate) async fn upload_remote_data(
     let mut upload_tasks = JoinSet::new();
     let cancel = CancellationToken::new();
 
-    for i in 1..upload_tasks_count + 1 {
+    for i in 1..=upload_tasks_count {
         let task_client = Arc::clone(client);
         let cancel = cancel.clone();
 
diff --git a/libs/remote_storage/tests/common/tests.rs b/libs/remote_storage/tests/common/tests.rs
index 673151c8ef..38c316397a 100644
--- a/libs/remote_storage/tests/common/tests.rs
+++ b/libs/remote_storage/tests/common/tests.rs
@@ -1,5 +1,6 @@
 use anyhow::Context;
 use camino::Utf8Path;
+use futures::StreamExt;
 use remote_storage::ListingMode;
 use remote_storage::RemotePath;
 use std::sync::Arc;
@@ -29,10 +30,10 @@ use super::{
 /// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be `${base_prefix_str}` value only
 /// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
 ///
-/// With the real S3 enabled and `#[cfg(test)]` Rust configuration used, the S3 client test adds a `max-keys` param to limit the response keys.
-/// This way, we are able to test the pagination implicitly, by ensuring all results are returned from the remote storage and avoid uploading too many blobs to S3,
-/// since current default AWS S3 pagination limit is 1000.
-/// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax)
+/// In `MaybeEnabledStorageWithTestBlobs::setup`, we set the `max_keys_in_list_response` param to limit the number of keys in a single response.
+/// This way we are able to test pagination by ensuring all results are returned from the remote storage, while avoiding uploading too many blobs to S3,
+/// as the current default AWS S3 pagination limit is 1000.
+/// (see <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>).
 ///
 /// Lastly, the test attempts to clean up and remove all uploaded S3 files.
 /// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until clean up is finished.
@@ -87,6 +88,41 @@ async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> a
         "remote storage nested prefixes list mismatches with the uploads. 
 Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
     );
 
+    // list_streaming
+
+    let prefix_with_slash = base_prefix.add_trailing_slash();
+    let mut nested_remote_prefixes_st = test_client.list_streaming(
+        Some(&prefix_with_slash),
+        ListingMode::WithDelimiter,
+        None,
+        &cancel,
+    );
+    let mut nested_remote_prefixes_combined = HashSet::new();
+    let mut segments = 0;
+    let mut segment_max_size = 0;
+    while let Some(st) = nested_remote_prefixes_st.next().await {
+        let st = st?;
+        segment_max_size = segment_max_size.max(st.prefixes.len());
+        nested_remote_prefixes_combined.extend(st.prefixes.into_iter());
+        segments += 1;
+    }
+    assert!(segments > 1, "less than 2 segments: {segments}");
+    assert!(
+        segment_max_size * 2 <= nested_remote_prefixes_combined.len(),
+        "double of segment_max_size={segment_max_size} is larger than the total number of remote prefixes {}",
+        nested_remote_prefixes_combined.len()
+    );
+    let remote_only_prefixes = nested_remote_prefixes_combined
+        .difference(&expected_remote_prefixes)
+        .collect::<HashSet<_>>();
+    let missing_uploaded_prefixes = expected_remote_prefixes
+        .difference(&nested_remote_prefixes_combined)
+        .collect::<HashSet<_>>();
+    assert_eq!(
+        remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
+        "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
+    );
+
     Ok(())
 }
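
Usage note (not part of the patch): the new trait documentation says the listing stream keeps going after a transient error, as long as `DownloadError::is_permanent` returns false for it, so callers may simply poll `next` again. A minimal sketch of such a caller follows; the `list_all_retrying` helper, its generic bound, and its bare `continue` retry policy are illustrative assumptions, not APIs introduced by this change.

use futures::StreamExt;
use remote_storage::{DownloadError, Listing, ListingMode, RemotePath, RemoteStorage};
use tokio_util::sync::CancellationToken;

// Drain `list_streaming` into a vector of pages, retrying transient errors.
// Hypothetical helper for illustration; a real caller would add a backoff and
// a retry limit instead of the bare `continue`.
async fn list_all_retrying<R: RemoteStorage>(
    storage: &R,
    prefix: Option<&RemotePath>,
    cancel: &CancellationToken,
) -> Result<Vec<Listing>, DownloadError> {
    let stream = storage.list_streaming(prefix, ListingMode::WithDelimiter, None, cancel);
    let mut stream = std::pin::pin!(stream);

    let mut pages = Vec::new();
    while let Some(page) = stream.next().await {
        match page {
            Ok(listing) => pages.push(listing),
            // Permanent errors (e.g. cancellation) end the listing attempt.
            Err(e) if e.is_permanent() => return Err(e),
            // Transient errors leave the stream usable: poll it again.
            Err(_) => continue,
        }
    }
    Ok(pages)
}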