diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs
index c6d5224706..df6d45dde1 100644
--- a/libs/remote_storage/src/azure_blob.rs
+++ b/libs/remote_storage/src/azure_blob.rs
@@ -191,6 +191,7 @@ impl RemoteStorage for AzureBlobStorage {
         &self,
         prefix: Option<&RemotePath>,
         mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
     ) -> anyhow::Result<Listing, DownloadError> {
         // get the passed prefix or if it is not set use prefix_in_bucket value
         let list_prefix = prefix
@@ -223,6 +224,8 @@ impl RemoteStorage for AzureBlobStorage {
         let mut response = builder.into_stream();
 
         let mut res = Listing::default();
+        // NonZeroU32 doesn't support subtraction, so count down with a plain u32
+        let mut max_keys = max_keys.map(|mk| mk.get());
         while let Some(l) = response.next().await {
             let entry = l.map_err(to_download_error)?;
             let prefix_iter = entry
@@ -235,7 +238,18 @@ impl RemoteStorage for AzureBlobStorage {
                 .blobs
                 .blobs()
                 .map(|k| self.name_to_relative_path(&k.name));
-            res.keys.extend(blob_iter);
+
+            for key in blob_iter {
+                res.keys.push(key);
+                if let Some(mut mk) = max_keys {
+                    assert!(mk > 0);
+                    mk -= 1;
+                    if mk == 0 {
+                        return Ok(res); // limit reached
+                    }
+                    max_keys = Some(mk);
+                }
+            }
         }
         Ok(res)
     }
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index b6648931ac..5a0b74e406 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -16,7 +16,12 @@ mod simulate_failures;
 mod support;
 
 use std::{
-    collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc, time::SystemTime,
+    collections::HashMap,
+    fmt::Debug,
+    num::{NonZeroU32, NonZeroUsize},
+    pin::Pin,
+    sync::Arc,
+    time::SystemTime,
 };
 
 use anyhow::{bail, Context};
@@ -155,7 +160,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
         prefix: Option<&RemotePath>,
     ) -> Result<Vec<RemotePath>, DownloadError> {
         let result = self
-            .list(prefix, ListingMode::WithDelimiter)
+            .list(prefix, ListingMode::WithDelimiter, None)
             .await?
             .prefixes;
         Ok(result)
     }
@@ -171,11 +176,17 @@
     /// whereas,
     /// list_prefixes("foo/bar/") = ["cat", "dog"]
     /// See `test_real_s3.rs` for more details.
+    ///
+    /// max_keys limits the maximum number of keys returned; None means unlimited.
     async fn list_files(
         &self,
         prefix: Option<&RemotePath>,
+        max_keys: Option<NonZeroU32>,
     ) -> Result<Vec<RemotePath>, DownloadError> {
-        let result = self.list(prefix, ListingMode::NoDelimiter).await?.keys;
+        let result = self
+            .list(prefix, ListingMode::NoDelimiter, max_keys)
+            .await?
+            .keys;
         Ok(result)
     }

@@ -183,6 +194,7 @@
         &self,
         prefix: Option<&RemotePath>,
         _mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
     ) -> Result<Listing, DownloadError>;

     /// Streams the local file contents into remote into the remote storage entry.
@@ -341,27 +353,31 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
         &self,
         prefix: Option<&RemotePath>,
         mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
     ) -> anyhow::Result<Listing, DownloadError> {
         match self {
-            Self::LocalFs(s) => s.list(prefix, mode).await,
-            Self::AwsS3(s) => s.list(prefix, mode).await,
-            Self::AzureBlob(s) => s.list(prefix, mode).await,
-            Self::Unreliable(s) => s.list(prefix, mode).await,
+            Self::LocalFs(s) => s.list(prefix, mode, max_keys).await,
+            Self::AwsS3(s) => s.list(prefix, mode, max_keys).await,
+            Self::AzureBlob(s) => s.list(prefix, mode, max_keys).await,
+            Self::Unreliable(s) => s.list(prefix, mode, max_keys).await,
         }
     }

     // A function for listing all the files in a "directory"
     // Example:
     // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
+    //
+    // max_keys limits the maximum number of keys returned; None means unlimited.
     pub async fn list_files(
         &self,
         folder: Option<&RemotePath>,
+        max_keys: Option<NonZeroU32>,
     ) -> Result<Vec<RemotePath>, DownloadError> {
         match self {
-            Self::LocalFs(s) => s.list_files(folder).await,
-            Self::AwsS3(s) => s.list_files(folder).await,
-            Self::AzureBlob(s) => s.list_files(folder).await,
-            Self::Unreliable(s) => s.list_files(folder).await,
+            Self::LocalFs(s) => s.list_files(folder, max_keys).await,
+            Self::AwsS3(s) => s.list_files(folder, max_keys).await,
+            Self::AzureBlob(s) => s.list_files(folder, max_keys).await,
+            Self::Unreliable(s) => s.list_files(folder, max_keys).await,
         }
     }
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index 3ebea76181..f53ba9db07 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -4,7 +4,9 @@
 //! This storage used in tests, but can also be used in cases when a certain persistent
 //! volume is mounted to the local FS.
 
-use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin, time::SystemTime};
+use std::{
+    borrow::Cow, future::Future, io::ErrorKind, num::NonZeroU32, pin::Pin, time::SystemTime,
+};
 
 use anyhow::{bail, ensure, Context};
 use bytes::Bytes;
@@ -162,6 +164,7 @@ impl RemoteStorage for LocalFs {
         &self,
         prefix: Option<&RemotePath>,
         mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
     ) -> Result<Listing, DownloadError> {
         let mut result = Listing::default();

@@ -178,6 +181,9 @@ impl RemoteStorage for LocalFs {
                     !path.is_dir()
                 })
                 .collect();
+            if let Some(max_keys) = max_keys {
+                result.keys.truncate(max_keys.get() as usize);
+            }
             return Ok(result);
         }

@@ -790,12 +796,12 @@ mod fs_tests {
         let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?;
         let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?;

-        let listing = storage.list(None, ListingMode::NoDelimiter).await?;
+        let listing = storage.list(None, ListingMode::NoDelimiter, None).await?;
         assert!(listing.prefixes.is_empty());
         assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());

         // Delimiter: should only go one deep
-        let listing = storage.list(None, ListingMode::WithDelimiter).await?;
+        let listing = storage.list(None, ListingMode::WithDelimiter, None).await?;

         assert_eq!(
             listing.prefixes,
@@ -808,6 +814,7 @@ mod fs_tests {
             .list(
                 Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
                 ListingMode::WithDelimiter,
+                None,
             )
             .await?;
         assert_eq!(
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index 2b33a6ffd1..dee5750cac 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -7,6 +7,7 @@
 use std::{
     borrow::Cow,
     collections::HashMap,
+    num::NonZeroU32,
     pin::Pin,
     sync::Arc,
     task::{Context, Poll},
@@ -408,8 +409,11 @@ impl RemoteStorage for S3Bucket {
         &self,
         prefix: Option<&RemotePath>,
         mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
     ) -> Result<Listing, DownloadError> {
         let kind = RequestKind::List;
+        // the S3 SDK wants an i32 here
+        let mut max_keys = max_keys.map(|mk| mk.get() as i32);
         let mut result = Listing::default();

         // get the passed prefix or if it is not set use prefix_in_bucket value
@@ -433,13 +437,20 @@ impl RemoteStorage for S3Bucket {
             let _guard = self.permit(kind).await;
             let started_at = start_measuring_requests(kind);

+            // Take the min of the two Options, returning Some if one is Some and the other is
+            // None (a plain min doesn't work because None is smaller than any Some).
+            let request_max_keys = self
+                .max_keys_per_list_response
+                .into_iter()
+                .chain(max_keys.into_iter())
+                .min();
             let mut request = self
                 .client
                 .list_objects_v2()
                 .bucket(self.bucket_name.clone())
                 .set_prefix(list_prefix.clone())
                 .set_continuation_token(continuation_token)
-                .set_max_keys(self.max_keys_per_list_response);
+                .set_max_keys(request_max_keys);

             if let ListingMode::WithDelimiter = mode {
                 request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
@@ -469,6 +480,14 @@ impl RemoteStorage for S3Bucket {
                 let object_path = object.key().expect("response does not contain a key");
                 let remote_path = self.s3_object_to_relative_path(object_path);
                 result.keys.push(remote_path);
+                if let Some(mut mk) = max_keys {
+                    assert!(mk > 0);
+                    mk -= 1;
+                    if mk == 0 {
+                        return Ok(result); // limit reached
+                    }
+                    max_keys = Some(mk);
+                }
             }

             result.prefixes.extend(
diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs
index 14bdb5ed4d..3dfa16b64e 100644
--- a/libs/remote_storage/src/simulate_failures.rs
+++ b/libs/remote_storage/src/simulate_failures.rs
@@ -4,6 +4,7 @@
 use bytes::Bytes;
 use futures::stream::Stream;
 use std::collections::HashMap;
+use std::num::NonZeroU32;
 use std::sync::Mutex;
 use std::time::SystemTime;
 use std::{collections::hash_map::Entry, sync::Arc};
@@ -113,20 +114,22 @@ impl RemoteStorage for UnreliableWrapper {
     async fn list_files(
         &self,
         folder: Option<&RemotePath>,
+        max_keys: Option<NonZeroU32>,
     ) -> Result<Vec<RemotePath>, DownloadError> {
         self.attempt(RemoteOp::ListPrefixes(folder.cloned()))
             .map_err(DownloadError::Other)?;
-        self.inner.list_files(folder).await
+        self.inner.list_files(folder, max_keys).await
     }

     async fn list(
         &self,
         prefix: Option<&RemotePath>,
         mode: ListingMode,
+        max_keys: Option<NonZeroU32>,
     ) -> Result<Listing, DownloadError> {
         self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
             .map_err(DownloadError::Other)?;
-        self.inner.list(prefix, mode).await
+        self.inner.list(prefix, mode, max_keys).await
     }

     async fn upload(
diff --git a/libs/remote_storage/tests/common/tests.rs b/libs/remote_storage/tests/common/tests.rs
index abccc24c97..6d062f3898 100644
--- a/libs/remote_storage/tests/common/tests.rs
+++ b/libs/remote_storage/tests/common/tests.rs
@@ -1,8 +1,8 @@
 use anyhow::Context;
 use camino::Utf8Path;
 use remote_storage::RemotePath;
-use std::collections::HashSet;
 use std::sync::Arc;
+use std::{collections::HashSet, num::NonZeroU32};
 use test_context::test_context;
 use tracing::debug;

@@ -103,7 +103,7 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a
     let base_prefix =
         RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
     let root_files = test_client
-        .list_files(None)
+        .list_files(None, None)
         .await
         .context("client list root files failure")?
         .into_iter()
@@ -113,8 +113,17 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a
         ctx.remote_blobs.clone(),
         "remote storage list_files on root mismatches with the uploads."
     );
+
+    // Test that the max_keys limit works. In total there are about 21 files (see
+    // the upload_simple_remote_data call in test_real_s3.rs).
+    let limited_root_files = test_client
+        .list_files(None, Some(NonZeroU32::new(2).unwrap()))
+        .await
+        .context("client list root files failure")?;
+    assert_eq!(limited_root_files.len(), 2);
+
     let nested_remote_files = test_client
-        .list_files(Some(&base_prefix))
+        .list_files(Some(&base_prefix), None)
         .await
         .context("client list nested files failure")?
         .into_iter()
diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs
index fc52dabc36..3dc8347c83 100644
--- a/libs/remote_storage/tests/test_real_s3.rs
+++ b/libs/remote_storage/tests/test_real_s3.rs
@@ -70,7 +70,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
 }
 
 async fn list_files(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<HashSet<RemotePath>> {
-    Ok(retry(|| client.list_files(None))
+    Ok(retry(|| client.list_files(None, None))
         .await
         .context("list root files failure")?
         .into_iter()
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 0c7dd68c3f..e17dea01a8 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -1151,7 +1151,7 @@ impl RemoteTimelineClient {
         let remaining = download_retry(
             || async {
                 self.storage_impl
-                    .list_files(Some(&timeline_storage_path))
+                    .list_files(Some(&timeline_storage_path), None)
                     .await
             },
             "list remaining files",
diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs
index 33287fc8f4..e755cd08f3 100644
--- a/pageserver/src/tenant/remote_timeline_client/download.rs
+++ b/pageserver/src/tenant/remote_timeline_client/download.rs
@@ -220,7 +220,7 @@ pub async fn list_remote_timelines(
         || {
             download_cancellable(
                 &cancel,
-                storage.list(Some(&remote_path), ListingMode::WithDelimiter),
+                storage.list(Some(&remote_path), ListingMode::WithDelimiter, None),
             )
         },
         &format!("list timelines for {tenant_shard_id}"),
@@ -373,7 +373,7 @@ pub(super) async fn download_index_part(
     let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());

     let indices = download_retry(
-        || async { storage.list_files(Some(&index_prefix)).await },
+        || async { storage.list_files(Some(&index_prefix), None).await },
         "list index_part files",
         cancel,
     )
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index df99244770..dbdc742d26 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -10,6 +10,7 @@ use utils::id::NodeId;
 
 use std::cmp::min;
 use std::collections::{HashMap, HashSet};
+use std::num::NonZeroU32;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
@@ -546,6 +547,10 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
     let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string());
     let remote_path = RemotePath::new(&ttid_path)?;
 
+    // see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
+    // const Option::unwrap is not stable yet, otherwise this would be a const.
+    let batch_size: NonZeroU32 = NonZeroU32::new(1000).unwrap();
+
     // A backoff::retry is used here for two reasons:
     // - To provide a backoff rather than busy-polling the API on errors
     // - To absorb transient 429/503 conditions without hitting our error
@@ -557,8 +562,26 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
     let token = CancellationToken::new(); // not really used
     backoff::retry(
         || async {
-            let files = storage.list_files(Some(&remote_path)).await?;
-            storage.delete_objects(&files).await
+            // Do the list-delete in batches of batch_size to make progress even if there are a lot of files.
+            // Alternatively we could make list_files return an iterator, but that is more complicated and
+            // I'm not sure deleting while iterating is expected to work with S3.
+            loop {
+                let files = storage
+                    .list_files(Some(&remote_path), Some(batch_size))
+                    .await?;
+                if files.is_empty() {
+                    return Ok(()); // done
+                }
+                // (at least) S3 results are sorted, so we can log the min/max keys:
+                // "List results are always returned in UTF-8 binary order."
+                info!(
+                    "deleting batch of {} WAL segments [{}-{}]",
+                    files.len(),
+                    files.first().unwrap().object_name().unwrap_or(""),
+                    files.last().unwrap().object_name().unwrap_or("")
+                );
+                storage.delete_objects(&files).await?;
+            }
         },
         |_| false,
         3,
@@ -594,7 +617,7 @@ pub async fn copy_s3_segments(

     let remote_path = RemotePath::new(&relative_dst_path)?;

-    let files = storage.list_files(Some(&remote_path)).await?;
+    let files = storage.list_files(Some(&remote_path), None).await?;
     let uploaded_segments = &files
         .iter()
         .filter_map(|file| file.object_name().map(ToOwned::to_owned))
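Reviewer note, not part of the patch: the S3 `list` change above combines the bucket-level `max_keys_per_list_response` with the caller's `max_keys` by chaining the two Options and taking the min, because a plain `Option::min` treats `None` as the smallest value, whereas here `None` means "no limit". Below is a minimal standalone sketch of that trick; the `min_limit` helper name is hypothetical and only for illustration.

// Sketch of the "min of two Options where None means unlimited" trick.
fn min_limit(a: Option<i32>, b: Option<i32>) -> Option<i32> {
    // Chaining yields only the Some values; min over them is None
    // only when both inputs are None, i.e. both sides are unlimited.
    a.into_iter().chain(b).min()
}

fn main() {
    assert_eq!(min_limit(None, None), None); // both unlimited
    assert_eq!(min_limit(Some(1000), None), Some(1000)); // bucket default only
    assert_eq!(min_limit(None, Some(2)), Some(2)); // caller limit only
    assert_eq!(min_limit(Some(1000), Some(2)), Some(2)); // stricter limit wins
    // By contrast, a plain Option::min would drop the configured limit:
    assert_eq!(Some(1000).min(None), None);
}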