Remove WAL segments from S3 in batches.

Do list-delete operations in batches instead of doing a full list first, to ensure
deletion makes progress even if there are a lot of files to remove.

To this end, add a max_keys limit to remote storage list_files.
Author: Arseny Sher
Date: 2024-02-09 16:41:43 +03:00
Committed by: Arseny Sher
Parent: 96d89cde51
Commit: 1bb9abebf2
10 changed files with 119 additions and 28 deletions
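
For orientation, here is a minimal sketch of the batched list-delete pattern the commit message describes, written against the extended remote_storage API added below. The helper name delete_prefix_in_batches is hypothetical; the actual WAL-segment deletion code in the final hunk additionally wraps the loop in backoff::retry and logs each batch.

use std::num::NonZeroU32;

use remote_storage::{GenericRemoteStorage, RemotePath};

// Hypothetical helper: delete everything under `prefix` in bounded batches.
// Each iteration lists at most `batch_size` keys and deletes them before
// listing again, so progress is made even when there are many files.
async fn delete_prefix_in_batches(
    storage: &GenericRemoteStorage,
    prefix: &RemotePath,
    batch_size: NonZeroU32,
) -> anyhow::Result<()> {
    loop {
        let files = storage.list_files(Some(prefix), Some(batch_size)).await?;
        if files.is_empty() {
            return Ok(()); // nothing left under the prefix
        }
        storage.delete_objects(&files).await?;
    }
}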

View File

@@ -191,6 +191,7 @@ impl RemoteStorage for AzureBlobStorage {
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
) -> anyhow::Result<Listing, DownloadError> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
@@ -223,6 +224,8 @@ impl RemoteStorage for AzureBlobStorage {
let mut response = builder.into_stream();
let mut res = Listing::default();
// NonZeroU32 doesn't support subtraction apparently
let mut max_keys = max_keys.map(|mk| mk.get());
while let Some(l) = response.next().await {
let entry = l.map_err(to_download_error)?;
let prefix_iter = entry
@@ -235,7 +238,18 @@ impl RemoteStorage for AzureBlobStorage {
.blobs
.blobs()
.map(|k| self.name_to_relative_path(&k.name));
res.keys.extend(blob_iter);
for key in blob_iter {
res.keys.push(key);
if let Some(mut mk) = max_keys {
assert!(mk > 0);
mk -= 1;
if mk == 0 {
return Ok(res); // limit reached
}
max_keys = Some(mk);
}
}
}
Ok(res)
}

View File

@@ -16,7 +16,12 @@ mod simulate_failures;
mod support;
use std::{
collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc, time::SystemTime,
collections::HashMap,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
pin::Pin,
sync::Arc,
time::SystemTime,
};
use anyhow::{bail, Context};
@@ -155,7 +160,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
let result = self
.list(prefix, ListingMode::WithDelimiter)
.list(prefix, ListingMode::WithDelimiter, None)
.await?
.prefixes;
Ok(result)
@@ -171,11 +176,17 @@ pub trait RemoteStorage: Send + Sync + 'static {
/// whereas,
/// list_prefixes("foo/bar/") = ["cat", "dog"]
/// See `test_real_s3.rs` for more details.
///
/// max_keys limits the maximum number of keys returned; None means unlimited.
async fn list_files(
&self,
prefix: Option<&RemotePath>,
max_keys: Option<NonZeroU32>,
) -> Result<Vec<RemotePath>, DownloadError> {
let result = self.list(prefix, ListingMode::NoDelimiter).await?.keys;
let result = self
.list(prefix, ListingMode::NoDelimiter, max_keys)
.await?
.keys;
Ok(result)
}
@@ -183,6 +194,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
&self,
prefix: Option<&RemotePath>,
_mode: ListingMode,
max_keys: Option<NonZeroU32>,
) -> Result<Listing, DownloadError>;
/// Streams the local file contents into remote into the remote storage entry.
@@ -341,27 +353,31 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
) -> anyhow::Result<Listing, DownloadError> {
match self {
Self::LocalFs(s) => s.list(prefix, mode).await,
Self::AwsS3(s) => s.list(prefix, mode).await,
Self::AzureBlob(s) => s.list(prefix, mode).await,
Self::Unreliable(s) => s.list(prefix, mode).await,
Self::LocalFs(s) => s.list(prefix, mode, max_keys).await,
Self::AwsS3(s) => s.list(prefix, mode, max_keys).await,
Self::AzureBlob(s) => s.list(prefix, mode, max_keys).await,
Self::Unreliable(s) => s.list(prefix, mode, max_keys).await,
}
}
// A function for listing all the files in a "directory"
// Example:
// list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
//
// max_keys limits the maximum number of keys returned; None means unlimited.
pub async fn list_files(
&self,
folder: Option<&RemotePath>,
max_keys: Option<NonZeroU32>,
) -> Result<Vec<RemotePath>, DownloadError> {
match self {
Self::LocalFs(s) => s.list_files(folder).await,
Self::AwsS3(s) => s.list_files(folder).await,
Self::AzureBlob(s) => s.list_files(folder).await,
Self::Unreliable(s) => s.list_files(folder).await,
Self::LocalFs(s) => s.list_files(folder, max_keys).await,
Self::AwsS3(s) => s.list_files(folder, max_keys).await,
Self::AzureBlob(s) => s.list_files(folder, max_keys).await,
Self::Unreliable(s) => s.list_files(folder, max_keys).await,
}
}
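
As a hedged usage sketch of the extended list_files documented above (the helper name peek_some_keys and the limit of 100 are illustrative; passing None instead of Some(..) keeps the previous unlimited behaviour):

use std::num::NonZeroU32;

use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};

// Hypothetical caller: fetch at most 100 keys under `prefix`.
async fn peek_some_keys(
    storage: &GenericRemoteStorage,
    prefix: &RemotePath,
) -> Result<Vec<RemotePath>, DownloadError> {
    storage
        .list_files(Some(prefix), Some(NonZeroU32::new(100).unwrap()))
        .await
}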

View File

@@ -4,7 +4,9 @@
//! This storage used in tests, but can also be used in cases when a certain persistent
//! volume is mounted to the local FS.
use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin, time::SystemTime};
use std::{
borrow::Cow, future::Future, io::ErrorKind, num::NonZeroU32, pin::Pin, time::SystemTime,
};
use anyhow::{bail, ensure, Context};
use bytes::Bytes;
@@ -162,6 +164,7 @@ impl RemoteStorage for LocalFs {
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
) -> Result<Listing, DownloadError> {
let mut result = Listing::default();
@@ -178,6 +181,9 @@ impl RemoteStorage for LocalFs {
!path.is_dir()
})
.collect();
if let Some(max_keys) = max_keys {
result.keys.truncate(max_keys.get() as usize);
}
return Ok(result);
}
@@ -790,12 +796,12 @@ mod fs_tests {
let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?;
let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?;
let listing = storage.list(None, ListingMode::NoDelimiter).await?;
let listing = storage.list(None, ListingMode::NoDelimiter, None).await?;
assert!(listing.prefixes.is_empty());
assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());
// Delimiter: should only go one deep
let listing = storage.list(None, ListingMode::WithDelimiter).await?;
let listing = storage.list(None, ListingMode::WithDelimiter, None).await?;
assert_eq!(
listing.prefixes,
@@ -808,6 +814,7 @@ mod fs_tests {
.list(
Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
ListingMode::WithDelimiter,
None,
)
.await?;
assert_eq!(

View File

@@ -7,6 +7,7 @@
use std::{
borrow::Cow,
collections::HashMap,
num::NonZeroU32,
pin::Pin,
sync::Arc,
task::{Context, Poll},
@@ -408,8 +409,11 @@ impl RemoteStorage for S3Bucket {
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
) -> Result<Listing, DownloadError> {
let kind = RequestKind::List;
// s3 sdk wants i32
let mut max_keys = max_keys.map(|mk| mk.get() as i32);
let mut result = Listing::default();
// get the passed prefix or if it is not set use prefix_in_bucket value
@@ -433,13 +437,20 @@ impl RemoteStorage for S3Bucket {
let _guard = self.permit(kind).await;
let started_at = start_measuring_requests(kind);
// min of two Options, returning Some if one is a value and the other is
// None (None is smaller than anything, so a plain min doesn't work).
let request_max_keys = self
.max_keys_per_list_response
.into_iter()
.chain(max_keys.into_iter())
.min();
let mut request = self
.client
.list_objects_v2()
.bucket(self.bucket_name.clone())
.set_prefix(list_prefix.clone())
.set_continuation_token(continuation_token)
.set_max_keys(self.max_keys_per_list_response);
.set_max_keys(request_max_keys);
if let ListingMode::WithDelimiter = mode {
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
@@ -469,6 +480,14 @@ impl RemoteStorage for S3Bucket {
let object_path = object.key().expect("response does not contain a key");
let remote_path = self.s3_object_to_relative_path(object_path);
result.keys.push(remote_path);
if let Some(mut mk) = max_keys {
assert!(mk > 0);
mk -= 1;
if mk == 0 {
return Ok(result); // limit reached
}
max_keys = Some(mk);
}
}
result.prefixes.extend(

View File

@@ -4,6 +4,7 @@
use bytes::Bytes;
use futures::stream::Stream;
use std::collections::HashMap;
use std::num::NonZeroU32;
use std::sync::Mutex;
use std::time::SystemTime;
use std::{collections::hash_map::Entry, sync::Arc};
@@ -113,20 +114,22 @@ impl RemoteStorage for UnreliableWrapper {
async fn list_files(
&self,
folder: Option<&RemotePath>,
max_keys: Option<NonZeroU32>,
) -> Result<Vec<RemotePath>, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(folder.cloned()))
.map_err(DownloadError::Other)?;
self.inner.list_files(folder).await
self.inner.list_files(folder, max_keys).await
}
async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
) -> Result<Listing, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;
self.inner.list(prefix, mode).await
self.inner.list(prefix, mode, max_keys).await
}
async fn upload(

View File

@@ -1,8 +1,8 @@
use anyhow::Context;
use camino::Utf8Path;
use remote_storage::RemotePath;
use std::collections::HashSet;
use std::sync::Arc;
use std::{collections::HashSet, num::NonZeroU32};
use test_context::test_context;
use tracing::debug;
@@ -103,7 +103,7 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a
let base_prefix =
RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
let root_files = test_client
.list_files(None)
.list_files(None, None)
.await
.context("client list root files failure")?
.into_iter()
@@ -113,8 +113,17 @@ async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> a
ctx.remote_blobs.clone(),
"remote storage list_files on root mismatches with the uploads."
);
// Test that max_keys limit works. In total there are about 21 files (see
// upload_simple_remote_data call in test_real_s3.rs).
let limited_root_files = test_client
.list_files(None, Some(NonZeroU32::new(2).unwrap()))
.await
.context("client list root files failure")?;
assert_eq!(limited_root_files.len(), 2);
let nested_remote_files = test_client
.list_files(Some(&base_prefix))
.list_files(Some(&base_prefix), None)
.await
.context("client list nested files failure")?
.into_iter()

View File

@@ -70,7 +70,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
}
async fn list_files(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<HashSet<RemotePath>> {
Ok(retry(|| client.list_files(None))
Ok(retry(|| client.list_files(None, None))
.await
.context("list root files failure")?
.into_iter()

View File

@@ -1151,7 +1151,7 @@ impl RemoteTimelineClient {
let remaining = download_retry(
|| async {
self.storage_impl
.list_files(Some(&timeline_storage_path))
.list_files(Some(&timeline_storage_path), None)
.await
},
"list remaining files",

View File

@@ -220,7 +220,7 @@ pub async fn list_remote_timelines(
|| {
download_cancellable(
&cancel,
storage.list(Some(&remote_path), ListingMode::WithDelimiter),
storage.list(Some(&remote_path), ListingMode::WithDelimiter, None),
)
},
&format!("list timelines for {tenant_shard_id}"),
@@ -373,7 +373,7 @@ pub(super) async fn download_index_part(
let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
let indices = download_retry(
|| async { storage.list_files(Some(&index_prefix)).await },
|| async { storage.list_files(Some(&index_prefix), None).await },
"list index_part files",
cancel,
)

View File

@@ -10,6 +10,7 @@ use utils::id::NodeId;
use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroU32;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -546,6 +547,10 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string());
let remote_path = RemotePath::new(&ttid_path)?;
// see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
// const Option unwrap is not stable yet, otherwise this would be a const.
let batch_size: NonZeroU32 = NonZeroU32::new(1000).unwrap();
// A backoff::retry is used here for two reasons:
// - To provide a backoff rather than busy-polling the API on errors
// - To absorb transient 429/503 conditions without hitting our error
@@ -557,8 +562,26 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
let token = CancellationToken::new(); // not really used
backoff::retry(
|| async {
let files = storage.list_files(Some(&remote_path)).await?;
storage.delete_objects(&files).await
// Do list-delete in batch_size batches to make progress even if there are a lot of files.
// Alternatively we could make list_files return an iterator, but it is more complicated and
// I'm not sure deleting while iterating is expected to work in S3.
loop {
let files = storage
.list_files(Some(&remote_path), Some(batch_size))
.await?;
if files.is_empty() {
return Ok(()); // done
}
// (at least) S3 results are sorted, so we can log min/max:
// "List results are always returned in UTF-8 binary order."
info!(
"deleting batch of {} WAL segments [{}-{}]",
files.len(),
files.first().unwrap().object_name().unwrap_or(""),
files.last().unwrap().object_name().unwrap_or("")
);
storage.delete_objects(&files).await?;
}
},
|_| false,
3,
@@ -594,7 +617,7 @@ pub async fn copy_s3_segments(
let remote_path = RemotePath::new(&relative_dst_path)?;
let files = storage.list_files(Some(&remote_path)).await?;
let files = storage.list_files(Some(&remote_path), None).await?;
let uploaded_segments = &files
.iter()
.filter_map(|file| file.object_name().map(ToOwned::to_owned))