remote_storage: add list_streaming API call (#8466)

This adds the ability to list many prefixes in a streaming fashion to
both the `RemoteStorage` trait as well as `GenericRemoteStorage`.

* The `list` function of the `RemoteStorage` trait is implemented by
default in terms of `list_streaming`.
* For the production users (S3, Azure), `list_streaming` is implemented
and the default `list` implementation is used.
* For `LocalFs`, we keep the `list` implementation and make
`list_streaming` call it.

The `list_streaming` function is implemented for both S3 and Azure.

A TODO for later is retries, which the scrubber currently has while the
`list_streaming` implementations lack them.

part of #8457 and #7547
This commit is contained in:
Arpad Müller
2024-07-24 02:09:01 +02:00
committed by GitHub
parent 18cf5cfefd
commit 2c0d311a54
7 changed files with 185 additions and 117 deletions

1
Cargo.lock generated
View File

@@ -4648,6 +4648,7 @@ name = "remote_storage"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"async-trait",
"aws-config",
"aws-credential-types",

View File

@@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
async-trait.workspace = true
async-stream.workspace = true
once_cell.workspace = true
aws-smithy-async.workspace = true
aws-smithy-types.workspace = true

View File

@@ -267,30 +267,30 @@ fn to_download_error(error: azure_core::Error) -> DownloadError {
}
impl RemoteStorage for AzureBlobStorage {
async fn list(
fn list_streaming(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> anyhow::Result<Listing, DownloadError> {
let _permit = self.permit(RequestKind::List, cancel).await?;
) -> impl Stream<Item = Result<Listing, DownloadError>> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
.map(|p| self.relative_path_to_name(p))
.or_else(|| self.prefix_in_container.clone())
.map(|mut p| {
// required to end with a separator
// otherwise request will return only the entry of a prefix
if matches!(mode, ListingMode::WithDelimiter)
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
{
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
}
p
});
let op = async {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
.map(|p| self.relative_path_to_name(p))
.or_else(|| self.prefix_in_container.clone())
.map(|mut p| {
// required to end with a separator
// otherwise request will return only the entry of a prefix
if matches!(mode, ListingMode::WithDelimiter)
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
{
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
}
p
});
async_stream::stream! {
let _permit = self.permit(RequestKind::List, cancel).await?;
let mut builder = self.client.list_blobs();
@@ -316,10 +316,12 @@ impl RemoteStorage for AzureBlobStorage {
let mut response = std::pin::pin!(response);
let mut res = Listing::default();
let mut max_keys = max_keys.map(|mk| mk.get());
while let Some(entry) = response.next().await {
'outer: while let Some(entry) = tokio::select! {
op = response.next() => Ok(op),
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}? {
let mut res = Listing::default();
let entry = entry?;
let prefix_iter = entry
.blobs
@@ -339,19 +341,14 @@ impl RemoteStorage for AzureBlobStorage {
assert!(mk > 0);
mk -= 1;
if mk == 0 {
return Ok(res); // limit reached
yield Ok(res); // limit reached
break 'outer;
}
max_keys = Some(mk);
}
}
yield Ok(res);
}
Ok(res)
};
tokio::select! {
res = op => res,
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}
}

View File

@@ -26,7 +26,7 @@ use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use bytes::Bytes;
use futures::stream::Stream;
use futures::{stream::Stream, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
@@ -160,13 +160,15 @@ pub struct Listing {
/// providing basic CRUD operations for storage files.
#[allow(async_fn_in_trait)]
pub trait RemoteStorage: Send + Sync + 'static {
/// List objects in remote storage, with semantics matching AWS S3's ListObjectsV2.
/// (see `<https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>`)
/// List objects in remote storage, with semantics matching AWS S3's [`ListObjectsV2`].
///
/// The stream is guaranteed to return at least one element, even in the case of errors
/// (in that case it's an `Err()`), or an empty `Listing`.
///
/// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not
/// from the absolute root of the bucket.
///
/// `mode` configures whether to use a delimiter. Without a delimiter all keys
/// `mode` configures whether to use a delimiter. Without a delimiter, all keys
/// within the prefix are listed in the `keys` of the result. With a delimiter, any "directories" at the top level of
/// the prefix are returned in the `prefixes` of the result, and keys in the top level of the prefix are
/// returned in `keys` ().
@@ -175,13 +177,31 @@ pub trait RemoteStorage: Send + Sync + 'static {
/// will iteratively call listobjects until it runs out of keys. Note that this is not safe to use on
/// unlimted size buckets, as the full list of objects is allocated into a monolithic data structure.
///
/// [`ListObjectsV2`]: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
fn list_streaming(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>>;
async fn list(
&self,
prefix: Option<&RemotePath>,
_mode: ListingMode,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<Listing, DownloadError>;
) -> Result<Listing, DownloadError> {
let mut stream = std::pin::pin!(self.list_streaming(prefix, mode, max_keys, cancel));
let mut combined = stream.next().await.expect("At least one item required")?;
while let Some(list) = stream.next().await {
let list = list?;
combined.keys.extend_from_slice(&list.keys);
combined.prefixes.extend_from_slice(&list.prefixes);
}
Ok(combined)
}
/// Streams the local file contents into remote into the remote storage entry.
///
@@ -288,8 +308,8 @@ impl Debug for Download {
/// Every storage, currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
#[derive(Clone)]
// Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
#[derive(Clone)]
pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
LocalFs(LocalFs),
AwsS3(Arc<S3Bucket>),
@@ -298,13 +318,14 @@ pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
}
impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
// See [`RemoteStorage::list`].
pub async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> anyhow::Result<Listing, DownloadError> {
) -> Result<Listing, DownloadError> {
match self {
Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
@@ -313,6 +334,23 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
// See [`RemoteStorage::list_streaming`].
pub fn list_streaming<'a>(
&'a self,
prefix: Option<&'a RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &'a CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a {
match self {
Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>>>>,
Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
}
}
/// See [`RemoteStorage::upload`]
pub async fn upload(
&self,

View File

@@ -331,6 +331,17 @@ impl LocalFs {
}
impl RemoteStorage for LocalFs {
fn list_streaming(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
let listing = self.list(prefix, mode, max_keys, cancel);
futures::stream::once(listing)
}
async fn list(
&self,
prefix: Option<&RemotePath>,

View File

@@ -467,17 +467,16 @@ impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
}
impl RemoteStorage for S3Bucket {
async fn list(
fn list_streaming(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<Listing, DownloadError> {
) -> impl Stream<Item = Result<Listing, DownloadError>> {
let kind = RequestKind::List;
// s3 sdk wants i32
let mut max_keys = max_keys.map(|mk| mk.get() as i32);
let mut result = Listing::default();
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
@@ -489,89 +488,92 @@ impl RemoteStorage for S3Bucket {
})
});
let _permit = self.permit(kind, cancel).await?;
async_stream::stream! {
let _permit = self.permit(kind, cancel).await?;
let mut continuation_token = None;
let mut continuation_token = None;
'outer: loop {
let started_at = start_measuring_requests(kind);
loop {
let started_at = start_measuring_requests(kind);
// min of two Options, returning Some if one is value and another is
// None (None is smaller than anything, so plain min doesn't work).
let request_max_keys = self
.max_keys_per_list_response
.into_iter()
.chain(max_keys.into_iter())
.min();
let mut request = self
.client
.list_objects_v2()
.bucket(self.bucket_name.clone())
.set_prefix(list_prefix.clone())
.set_continuation_token(continuation_token)
.set_max_keys(request_max_keys);
// min of two Options, returning Some if one is value and another is
// None (None is smaller than anything, so plain min doesn't work).
let request_max_keys = self
.max_keys_per_list_response
.into_iter()
.chain(max_keys.into_iter())
.min();
let mut request = self
.client
.list_objects_v2()
.bucket(self.bucket_name.clone())
.set_prefix(list_prefix.clone())
.set_continuation_token(continuation_token)
.set_max_keys(request_max_keys);
if let ListingMode::WithDelimiter = mode {
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
let request = request.send();
let response = tokio::select! {
res = request => res,
_ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
_ = cancel.cancelled() => return Err(DownloadError::Cancelled),
};
let response = response
.context("Failed to list S3 prefixes")
.map_err(DownloadError::Other);
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &response, started_at);
let response = response?;
let keys = response.contents();
let empty = Vec::new();
let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
for object in keys {
let object_path = object.key().expect("response does not contain a key");
let remote_path = self.s3_object_to_relative_path(object_path);
result.keys.push(remote_path);
if let Some(mut mk) = max_keys {
assert!(mk > 0);
mk -= 1;
if mk == 0 {
return Ok(result); // limit reached
}
max_keys = Some(mk);
if let ListingMode::WithDelimiter = mode {
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
let request = request.send();
let response = tokio::select! {
res = request => Ok(res),
_ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}?;
let response = response
.context("Failed to list S3 prefixes")
.map_err(DownloadError::Other);
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &response, started_at);
let response = response?;
let keys = response.contents();
let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
let mut result = Listing::default();
for object in keys {
let object_path = object.key().expect("response does not contain a key");
let remote_path = self.s3_object_to_relative_path(object_path);
result.keys.push(remote_path);
if let Some(mut mk) = max_keys {
assert!(mk > 0);
mk -= 1;
if mk == 0 {
// limit reached
yield Ok(result);
break 'outer;
}
max_keys = Some(mk);
}
}
// S3 gives us prefixes like "foo/", we return them like "foo"
result.prefixes.extend(prefixes.iter().filter_map(|o| {
Some(
self.s3_object_to_relative_path(
o.prefix()?
.trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
),
)
}));
yield Ok(result);
continuation_token = match response.next_continuation_token {
Some(new_token) => Some(new_token),
None => break,
};
}
// S3 gives us prefixes like "foo/", we return them like "foo"
result.prefixes.extend(prefixes.iter().filter_map(|o| {
Some(
self.s3_object_to_relative_path(
o.prefix()?
.trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
),
)
}));
continuation_token = match response.next_continuation_token {
Some(new_token) => Some(new_token),
None => break,
};
}
Ok(result)
}
async fn upload(

View File

@@ -3,6 +3,7 @@
//! testing purposes.
use bytes::Bytes;
use futures::stream::Stream;
use futures::StreamExt;
use std::collections::HashMap;
use std::num::NonZeroU32;
use std::sync::Mutex;
@@ -107,6 +108,23 @@ impl UnreliableWrapper {
type VoidStorage = crate::LocalFs;
impl RemoteStorage for UnreliableWrapper {
fn list_streaming(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
async_stream::stream! {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;
let mut stream = self.inner
.list_streaming(prefix, mode, max_keys, cancel);
while let Some(item) = stream.next().await {
yield item;
}
}
}
async fn list(
&self,
prefix: Option<&RemotePath>,