From 2c0d311a54927dabea9ae4f97559a0d878f36d9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 24 Jul 2024 02:09:01 +0200 Subject: [PATCH] remote_storage: add list_streaming API call (#8466) This adds the ability to list many prefixes in a streaming fashion to both the `RemoteStorage` trait as well as `GenericRemoteStorage`. * The `list` function of the `RemoteStorage` trait is implemented by default in terms of `list_streaming`. * For the production users (S3, Azure), `list_streaming` is implemented and the default `list` implementation is used. * For `LocalFs`, we keep the `list` implementation and make `list_streaming` call it. The `list_streaming` function is implemented for both S3 and Azure. A TODO for later is retries, which the scrubber currently has while the `list_streaming` implementations lack them. part of #8457 and #7547 --- Cargo.lock | 1 + libs/remote_storage/Cargo.toml | 1 + libs/remote_storage/src/azure_blob.rs | 55 +++---- libs/remote_storage/src/lib.rs | 54 ++++++- libs/remote_storage/src/local_fs.rs | 11 ++ libs/remote_storage/src/s3_bucket.rs | 162 ++++++++++--------- libs/remote_storage/src/simulate_failures.rs | 18 +++ 7 files changed, 185 insertions(+), 117 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b03bd57631..df9efbf7cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4648,6 +4648,7 @@ name = "remote_storage" version = "0.1.0" dependencies = [ "anyhow", + "async-stream", "async-trait", "aws-config", "aws-credential-types", diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 23d82b90bd..414bce1b26 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [dependencies] anyhow.workspace = true async-trait.workspace = true +async-stream.workspace = true once_cell.workspace = true aws-smithy-async.workspace = true aws-smithy-types.workspace = true diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index d0146238da..266a1f6584 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -267,30 +267,30 @@ fn to_download_error(error: azure_core::Error) -> DownloadError { } impl RemoteStorage for AzureBlobStorage { - async fn list( + fn list_streaming( &self, prefix: Option<&RemotePath>, mode: ListingMode, max_keys: Option, cancel: &CancellationToken, - ) -> anyhow::Result { - let _permit = self.permit(RequestKind::List, cancel).await?; + ) -> impl Stream> { + // get the passed prefix or if it is not set use prefix_in_bucket value + let list_prefix = prefix + .map(|p| self.relative_path_to_name(p)) + .or_else(|| self.prefix_in_container.clone()) + .map(|mut p| { + // required to end with a separator + // otherwise request will return only the entry of a prefix + if matches!(mode, ListingMode::WithDelimiter) + && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) + { + p.push(REMOTE_STORAGE_PREFIX_SEPARATOR); + } + p + }); - let op = async { - // get the passed prefix or if it is not set use prefix_in_bucket value - let list_prefix = prefix - .map(|p| self.relative_path_to_name(p)) - .or_else(|| self.prefix_in_container.clone()) - .map(|mut p| { - // required to end with a separator - // otherwise request will return only the entry of a prefix - if matches!(mode, ListingMode::WithDelimiter) - && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) - { - p.push(REMOTE_STORAGE_PREFIX_SEPARATOR); - } - p - }); + async_stream::stream! { + let _permit = self.permit(RequestKind::List, cancel).await?; let mut builder = self.client.list_blobs(); @@ -316,10 +316,12 @@ impl RemoteStorage for AzureBlobStorage { let mut response = std::pin::pin!(response); - let mut res = Listing::default(); - let mut max_keys = max_keys.map(|mk| mk.get()); - while let Some(entry) = response.next().await { + 'outer: while let Some(entry) = tokio::select! { + op = response.next() => Ok(op), + _ = cancel.cancelled() => Err(DownloadError::Cancelled), + }? { + let mut res = Listing::default(); let entry = entry?; let prefix_iter = entry .blobs @@ -339,19 +341,14 @@ impl RemoteStorage for AzureBlobStorage { assert!(mk > 0); mk -= 1; if mk == 0 { - return Ok(res); // limit reached + yield Ok(res); // limit reached + break 'outer; } max_keys = Some(mk); } } + yield Ok(res); } - - Ok(res) - }; - - tokio::select! { - res = op => res, - _ = cancel.cancelled() => Err(DownloadError::Cancelled), } } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 3ee7d15a76..201e2fb178 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -26,7 +26,7 @@ use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; use bytes::Bytes; -use futures::stream::Stream; +use futures::{stream::Stream, StreamExt}; use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; @@ -160,13 +160,15 @@ pub struct Listing { /// providing basic CRUD operations for storage files. #[allow(async_fn_in_trait)] pub trait RemoteStorage: Send + Sync + 'static { - /// List objects in remote storage, with semantics matching AWS S3's ListObjectsV2. - /// (see ``) + /// List objects in remote storage, with semantics matching AWS S3's [`ListObjectsV2`]. + /// + /// The stream is guaranteed to return at least one element, even in the case of errors + /// (in that case it's an `Err()`), or an empty `Listing`. /// /// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not /// from the absolute root of the bucket. /// - /// `mode` configures whether to use a delimiter. Without a delimiter all keys + /// `mode` configures whether to use a delimiter. Without a delimiter, all keys /// within the prefix are listed in the `keys` of the result. With a delimiter, any "directories" at the top level of /// the prefix are returned in the `prefixes` of the result, and keys in the top level of the prefix are /// returned in `keys` (). @@ -175,13 +177,31 @@ pub trait RemoteStorage: Send + Sync + 'static { /// will iteratively call listobjects until it runs out of keys. Note that this is not safe to use on /// unlimted size buckets, as the full list of objects is allocated into a monolithic data structure. /// + /// [`ListObjectsV2`]: + fn list_streaming( + &self, + prefix: Option<&RemotePath>, + mode: ListingMode, + max_keys: Option, + cancel: &CancellationToken, + ) -> impl Stream>; + async fn list( &self, prefix: Option<&RemotePath>, - _mode: ListingMode, + mode: ListingMode, max_keys: Option, cancel: &CancellationToken, - ) -> Result; + ) -> Result { + let mut stream = std::pin::pin!(self.list_streaming(prefix, mode, max_keys, cancel)); + let mut combined = stream.next().await.expect("At least one item required")?; + while let Some(list) = stream.next().await { + let list = list?; + combined.keys.extend_from_slice(&list.keys); + combined.prefixes.extend_from_slice(&list.prefixes); + } + Ok(combined) + } /// Streams the local file contents into remote into the remote storage entry. /// @@ -288,8 +308,8 @@ impl Debug for Download { /// Every storage, currently supported. /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics. -#[derive(Clone)] // Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925 +#[derive(Clone)] pub enum GenericRemoteStorage> { LocalFs(LocalFs), AwsS3(Arc), @@ -298,13 +318,14 @@ pub enum GenericRemoteStorage> { } impl GenericRemoteStorage> { + // See [`RemoteStorage::list`]. pub async fn list( &self, prefix: Option<&RemotePath>, mode: ListingMode, max_keys: Option, cancel: &CancellationToken, - ) -> anyhow::Result { + ) -> Result { match self { Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await, Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await, @@ -313,6 +334,23 @@ impl GenericRemoteStorage> { } } + // See [`RemoteStorage::list_streaming`]. + pub fn list_streaming<'a>( + &'a self, + prefix: Option<&'a RemotePath>, + mode: ListingMode, + max_keys: Option, + cancel: &'a CancellationToken, + ) -> impl Stream> + 'a { + match self { + Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)) + as Pin>>>, + Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)), + Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)), + Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)), + } + } + /// See [`RemoteStorage::upload`] pub async fn upload( &self, diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 1f7bcfc982..a4857b0bba 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -331,6 +331,17 @@ impl LocalFs { } impl RemoteStorage for LocalFs { + fn list_streaming( + &self, + prefix: Option<&RemotePath>, + mode: ListingMode, + max_keys: Option, + cancel: &CancellationToken, + ) -> impl Stream> { + let listing = self.list(prefix, mode, max_keys, cancel); + futures::stream::once(listing) + } + async fn list( &self, prefix: Option<&RemotePath>, diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 056646a01e..39106a4e53 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -467,17 +467,16 @@ impl>> Stream for TimedDownload { } impl RemoteStorage for S3Bucket { - async fn list( + fn list_streaming( &self, prefix: Option<&RemotePath>, mode: ListingMode, max_keys: Option, cancel: &CancellationToken, - ) -> Result { + ) -> impl Stream> { let kind = RequestKind::List; // s3 sdk wants i32 let mut max_keys = max_keys.map(|mk| mk.get() as i32); - let mut result = Listing::default(); // get the passed prefix or if it is not set use prefix_in_bucket value let list_prefix = prefix @@ -489,89 +488,92 @@ impl RemoteStorage for S3Bucket { }) }); - let _permit = self.permit(kind, cancel).await?; + async_stream::stream! { + let _permit = self.permit(kind, cancel).await?; - let mut continuation_token = None; + let mut continuation_token = None; + 'outer: loop { + let started_at = start_measuring_requests(kind); - loop { - let started_at = start_measuring_requests(kind); + // min of two Options, returning Some if one is value and another is + // None (None is smaller than anything, so plain min doesn't work). + let request_max_keys = self + .max_keys_per_list_response + .into_iter() + .chain(max_keys.into_iter()) + .min(); + let mut request = self + .client + .list_objects_v2() + .bucket(self.bucket_name.clone()) + .set_prefix(list_prefix.clone()) + .set_continuation_token(continuation_token) + .set_max_keys(request_max_keys); - // min of two Options, returning Some if one is value and another is - // None (None is smaller than anything, so plain min doesn't work). - let request_max_keys = self - .max_keys_per_list_response - .into_iter() - .chain(max_keys.into_iter()) - .min(); - let mut request = self - .client - .list_objects_v2() - .bucket(self.bucket_name.clone()) - .set_prefix(list_prefix.clone()) - .set_continuation_token(continuation_token) - .set_max_keys(request_max_keys); - - if let ListingMode::WithDelimiter = mode { - request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()); - } - - let request = request.send(); - - let response = tokio::select! { - res = request => res, - _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout), - _ = cancel.cancelled() => return Err(DownloadError::Cancelled), - }; - - let response = response - .context("Failed to list S3 prefixes") - .map_err(DownloadError::Other); - - let started_at = ScopeGuard::into_inner(started_at); - - crate::metrics::BUCKET_METRICS - .req_seconds - .observe_elapsed(kind, &response, started_at); - - let response = response?; - - let keys = response.contents(); - let empty = Vec::new(); - let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty); - - tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len()); - - for object in keys { - let object_path = object.key().expect("response does not contain a key"); - let remote_path = self.s3_object_to_relative_path(object_path); - result.keys.push(remote_path); - if let Some(mut mk) = max_keys { - assert!(mk > 0); - mk -= 1; - if mk == 0 { - return Ok(result); // limit reached - } - max_keys = Some(mk); + if let ListingMode::WithDelimiter = mode { + request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()); } + + let request = request.send(); + + let response = tokio::select! { + res = request => Ok(res), + _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout), + _ = cancel.cancelled() => Err(DownloadError::Cancelled), + }?; + + let response = response + .context("Failed to list S3 prefixes") + .map_err(DownloadError::Other); + + let started_at = ScopeGuard::into_inner(started_at); + + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &response, started_at); + + let response = response?; + + let keys = response.contents(); + let prefixes = response.common_prefixes.as_deref().unwrap_or_default(); + + tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len()); + let mut result = Listing::default(); + + for object in keys { + let object_path = object.key().expect("response does not contain a key"); + let remote_path = self.s3_object_to_relative_path(object_path); + result.keys.push(remote_path); + if let Some(mut mk) = max_keys { + assert!(mk > 0); + mk -= 1; + if mk == 0 { + // limit reached + yield Ok(result); + break 'outer; + } + max_keys = Some(mk); + } + } + + // S3 gives us prefixes like "foo/", we return them like "foo" + result.prefixes.extend(prefixes.iter().filter_map(|o| { + Some( + self.s3_object_to_relative_path( + o.prefix()? + .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR), + ), + ) + })); + + yield Ok(result); + + continuation_token = match response.next_continuation_token { + Some(new_token) => Some(new_token), + None => break, + }; } - - // S3 gives us prefixes like "foo/", we return them like "foo" - result.prefixes.extend(prefixes.iter().filter_map(|o| { - Some( - self.s3_object_to_relative_path( - o.prefix()? - .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR), - ), - ) - })); - - continuation_token = match response.next_continuation_token { - Some(new_token) => Some(new_token), - None => break, - }; } - - Ok(result) } async fn upload( diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index c467a2d196..67e5be2955 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -3,6 +3,7 @@ //! testing purposes. use bytes::Bytes; use futures::stream::Stream; +use futures::StreamExt; use std::collections::HashMap; use std::num::NonZeroU32; use std::sync::Mutex; @@ -107,6 +108,23 @@ impl UnreliableWrapper { type VoidStorage = crate::LocalFs; impl RemoteStorage for UnreliableWrapper { + fn list_streaming( + &self, + prefix: Option<&RemotePath>, + mode: ListingMode, + max_keys: Option, + cancel: &CancellationToken, + ) -> impl Stream> { + async_stream::stream! { + self.attempt(RemoteOp::ListPrefixes(prefix.cloned())) + .map_err(DownloadError::Other)?; + let mut stream = self.inner + .list_streaming(prefix, mode, max_keys, cancel); + while let Some(item) = stream.next().await { + yield item; + } + } + } async fn list( &self, prefix: Option<&RemotePath>,