From c51db1db61c7c1513cb6fc69563e80c4110abd9d Mon Sep 17 00:00:00 2001
From: Arpad Müller
Date: Tue, 10 Dec 2024 12:29:38 +0100
Subject: [PATCH] Replace MAX_KEYS_PER_DELETE constant with function (#10061)

Azure has a per-request limit of 256 items for bulk deletion, compared
to the limit of 1000 on AWS, so we need to support multiple values.
Because `GenericRemoteStorage` only settles on a concrete backend at
runtime, this can't be an associated constant; it has to be a function.

The PR replaces the `MAX_KEYS_PER_DELETE` constant with a function of
the same name, implemented both on the `RemoteStorage` trait and on
`GenericRemoteStorage`. The value serves as a hint for how many objects
to pass to the `delete_objects` function.

Reading:

* https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch
* https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html

Part of #7931
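As an illustration of why runtime dispatch forces a function, here is a
standalone toy sketch; the `Backend` trait and `batch_size` helper below
are illustrative only and not code from this repository:

    // Toy sketch: a per-backend limit needs runtime dispatch, which a
    // method provides and a plain associated constant does not.
    trait Backend {
        fn max_keys_per_delete(&self) -> usize;
    }

    struct S3;
    struct Azure;

    impl Backend for S3 {
        // S3 DeleteObjects accepts up to 1000 keys per request.
        fn max_keys_per_delete(&self) -> usize {
            1000
        }
    }

    impl Backend for Azure {
        // Azure blob batch accepts up to 256 subrequests per batch.
        fn max_keys_per_delete(&self) -> usize {
            256
        }
    }

    // The caller reads the limit from whichever backend it holds; with
    // an associated constant there would be nothing to dispatch on.
    fn batch_size(backend: &dyn Backend) -> usize {
        backend.max_keys_per_delete()
    }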
---
 libs/remote_storage/src/azure_blob.rs        |  4 +++
 libs/remote_storage/src/lib.rs               | 27 +++++++++++++++++++-
 libs/remote_storage/src/local_fs.rs          |  4 +++
 libs/remote_storage/src/s3_bucket.rs         |  8 ++++--
 libs/remote_storage/src/simulate_failures.rs |  4 +++
 pageserver/src/deletion_queue/deleter.rs     | 10 ++++----
 storage_scrubber/src/garbage.rs              | 11 ++++----
 7 files changed, 54 insertions(+), 14 deletions(-)

diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs
index a1d7569140..32c51bc2ad 100644
--- a/libs/remote_storage/src/azure_blob.rs
+++ b/libs/remote_storage/src/azure_blob.rs
@@ -624,6 +624,10 @@ impl RemoteStorage for AzureBlobStorage {
         res
     }
 
+    fn max_keys_per_delete(&self) -> usize {
+        super::MAX_KEYS_PER_DELETE_AZURE
+    }
+
     async fn copy(
         &self,
         from: &RemotePath,
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 0ece29d99e..2a3468f986 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -70,7 +70,14 @@ pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
 pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
 
 /// As defined in S3 docs
-pub const MAX_KEYS_PER_DELETE: usize = 1000;
+///
+/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
+pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
+
+/// As defined in Azure docs
+///
+/// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
+pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
 
 const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
 
@@ -340,6 +347,14 @@ pub trait RemoteStorage: Send + Sync + 'static {
         cancel: &CancellationToken,
     ) -> anyhow::Result<()>;
 
+    /// Returns the maximum number of keys that a call to [`Self::delete_objects`] can delete without chunking
+    ///
+    /// The value returned is only an optimization hint; one can pass a larger number of objects to
+    /// `delete_objects` as well.
+    ///
+    /// The value is guaranteed to be >= 1.
+    fn max_keys_per_delete(&self) -> usize;
+
     /// Deletes all objects matching the given prefix.
     ///
     /// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
@@ -533,6 +548,16 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
         }
     }
 
+    /// See [`RemoteStorage::max_keys_per_delete`]
+    pub fn max_keys_per_delete(&self) -> usize {
+        match self {
+            Self::LocalFs(s) => s.max_keys_per_delete(),
+            Self::AwsS3(s) => s.max_keys_per_delete(),
+            Self::AzureBlob(s) => s.max_keys_per_delete(),
+            Self::Unreliable(s) => s.max_keys_per_delete(),
+        }
+    }
+
     /// See [`RemoteStorage::delete_prefix`]
     pub async fn delete_prefix(
         &self,
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index ee2fc9d6e2..1a2d421c66 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -573,6 +573,10 @@ impl RemoteStorage for LocalFs {
         Ok(())
     }
 
+    fn max_keys_per_delete(&self) -> usize {
+        super::MAX_KEYS_PER_DELETE_S3
+    }
+
     async fn copy(
         &self,
         from: &RemotePath,
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index cde32df402..2891f92d07 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -48,7 +48,7 @@ use crate::{
     metrics::{start_counting_cancelled_wait, start_measuring_requests},
     support::PermitCarrying,
     ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
-    RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
+    RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE_S3,
     REMOTE_STORAGE_PREFIX_SEPARATOR,
 };
 
@@ -355,7 +355,7 @@ impl S3Bucket {
         let kind = RequestKind::Delete;
         let mut cancel = std::pin::pin!(cancel.cancelled());
 
-        for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
+        for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
             let started_at = start_measuring_requests(kind);
 
             let req = self
@@ -832,6 +832,10 @@ impl RemoteStorage for S3Bucket {
         self.delete_oids(&permit, &delete_objects, cancel).await
     }
 
+    fn max_keys_per_delete(&self) -> usize {
+        MAX_KEYS_PER_DELETE_S3
+    }
+
     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
         let paths = std::array::from_ref(path);
         self.delete_objects(paths, cancel).await
diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs
index 10db53971c..51833c1fe6 100644
--- a/libs/remote_storage/src/simulate_failures.rs
+++ b/libs/remote_storage/src/simulate_failures.rs
@@ -203,6 +203,10 @@ impl RemoteStorage for UnreliableWrapper {
         Ok(())
     }
 
+    fn max_keys_per_delete(&self) -> usize {
+        self.inner.max_keys_per_delete()
+    }
+
     async fn copy(
         &self,
         from: &RemotePath,
diff --git a/pageserver/src/deletion_queue/deleter.rs b/pageserver/src/deletion_queue/deleter.rs
index 3d02387c98..ef1dfbac19 100644
--- a/pageserver/src/deletion_queue/deleter.rs
+++ b/pageserver/src/deletion_queue/deleter.rs
@@ -9,7 +9,6 @@
 use remote_storage::GenericRemoteStorage;
 use remote_storage::RemotePath;
 use remote_storage::TimeoutOrCancel;
-use remote_storage::MAX_KEYS_PER_DELETE;
 use std::time::Duration;
 use tokio_util::sync::CancellationToken;
 use tracing::info;
@@ -131,7 +130,8 @@ impl Deleter {
     }
 
     pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
-        self.accumulator.reserve(MAX_KEYS_PER_DELETE);
+        let max_keys_per_delete = self.remote_storage.max_keys_per_delete();
+        self.accumulator.reserve(max_keys_per_delete);
 
         loop {
             if self.cancel.is_cancelled() {
@@ -156,14 +156,14 @@
 
             match msg {
                 DeleterMessage::Delete(mut list) => {
-                    while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
-                        if self.accumulator.len() == MAX_KEYS_PER_DELETE {
+                    while !list.is_empty() || self.accumulator.len() == max_keys_per_delete {
+                        if self.accumulator.len() == max_keys_per_delete {
                             self.flush().await?;
                             // If we have received this number of keys, proceed with attempting to execute
                             assert_eq!(self.accumulator.len(), 0);
                         }
 
-                        let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
+                        let available_slots = max_keys_per_delete - self.accumulator.len();
                         let take_count = std::cmp::min(available_slots, list.len());
                         for path in list.drain(list.len() - take_count..) {
                             self.accumulator.push(path);
diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs
index b026efbc3b..a4e5107e3d 100644
--- a/storage_scrubber/src/garbage.rs
+++ b/storage_scrubber/src/garbage.rs
@@ -459,12 +459,10 @@ pub async fn get_timeline_objects(
     Ok(list.keys)
 }
 
-const MAX_KEYS_PER_DELETE: usize = 1000;
-
 /// Drain a buffer of keys into DeleteObjects requests
 ///
 /// If `drain` is true, drains keys completely; otherwise stops when <
-/// MAX_KEYS_PER_DELETE keys are left.
+/// `max_keys_per_delete` keys are left.
 /// `num_deleted` returns number of deleted keys.
 async fn do_delete(
     remote_client: &GenericRemoteStorage,
@@ -474,9 +472,10 @@ async fn do_delete(
     progress_tracker: &mut DeletionProgressTracker,
 ) -> anyhow::Result<()> {
     let cancel = CancellationToken::new();
-    while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
+    let max_keys_per_delete = remote_client.max_keys_per_delete();
+    while (!keys.is_empty() && drain) || (keys.len() >= max_keys_per_delete) {
         let request_keys =
-            keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
+            keys.split_off(keys.len() - (std::cmp::min(max_keys_per_delete, keys.len())));
 
         let request_keys: Vec<RemotePath> = request_keys.into_iter().map(|o| o.key).collect();
 
@@ -617,7 +616,7 @@ pub async fn purge_garbage(
         }
 
         objects_to_delete.append(&mut object_list);
-        if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
+        if objects_to_delete.len() >= remote_client.max_keys_per_delete() {
             do_delete(
                 &remote_client,
                 &mut objects_to_delete,
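
For context, a sketch of how a caller can consume the new hint; the
`delete_in_batches` helper below is hypothetical and not part of this
patch, and `delete_objects` also accepts slices larger than the hint:

    use remote_storage::{GenericRemoteStorage, RemotePath};
    use tokio_util::sync::CancellationToken;

    // Hypothetical helper: delete `paths` in backend-sized batches
    // instead of hardcoding the old MAX_KEYS_PER_DELETE constant.
    async fn delete_in_batches(
        storage: &GenericRemoteStorage,
        paths: &[RemotePath],
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        // 1000 for S3 (and LocalFs), 256 for Azure; guaranteed >= 1,
        // so `chunks` cannot panic on a zero chunk size.
        let batch = storage.max_keys_per_delete();
        for chunk in paths.chunks(batch) {
            // Larger slices also work; the hint just keeps each call
            // within a single backend bulk-delete request.
            storage.delete_objects(chunk, cancel).await?;
        }
        Ok(())
    }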