mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
Replace MAX_KEYS_PER_DELETE constant with function (#10061)
Azure has a different per-request limit of 256 items for bulk deletion compared to the number of 1000 on AWS. Therefore, we need to support multiple values. Due to `GenericRemoteStorage`, we can't add an associated constant, but it has to be a function. The PR replaces the `MAX_KEYS_PER_DELETE` constant with a function of the same name, implemented on both the `RemoteStorage` trait as well as on `GenericRemoteStorage`. The value serves as hint of how many objects to pass to the `delete_objects` function. Reading: * https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch * https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html Part of #7931
This commit is contained in:
@@ -624,6 +624,10 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
res
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
super::MAX_KEYS_PER_DELETE_AZURE
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
|
||||
@@ -70,7 +70,14 @@ pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
|
||||
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
|
||||
|
||||
/// As defined in S3 docs
|
||||
pub const MAX_KEYS_PER_DELETE: usize = 1000;
|
||||
///
|
||||
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
|
||||
pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
|
||||
|
||||
/// As defined in Azure docs
|
||||
///
|
||||
/// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
|
||||
pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
|
||||
|
||||
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
|
||||
|
||||
@@ -340,6 +347,14 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()>;
|
||||
|
||||
/// Returns the maximum number of keys that a call to [`Self::delete_objects`] can delete without chunking
|
||||
///
|
||||
/// The value returned is only an optimization hint, One can pass larger number of objects to
|
||||
/// `delete_objects` as well.
|
||||
///
|
||||
/// The value is guaranteed to be >= 1.
|
||||
fn max_keys_per_delete(&self) -> usize;
|
||||
|
||||
/// Deletes all objects matching the given prefix.
|
||||
///
|
||||
/// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
|
||||
@@ -533,6 +548,16 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
|
||||
/// [`RemoteStorage::max_keys_per_delete`]
|
||||
pub fn max_keys_per_delete(&self) -> usize {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.max_keys_per_delete(),
|
||||
Self::AwsS3(s) => s.max_keys_per_delete(),
|
||||
Self::AzureBlob(s) => s.max_keys_per_delete(),
|
||||
Self::Unreliable(s) => s.max_keys_per_delete(),
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteStorage::delete_prefix`]
|
||||
pub async fn delete_prefix(
|
||||
&self,
|
||||
|
||||
@@ -573,6 +573,10 @@ impl RemoteStorage for LocalFs {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
super::MAX_KEYS_PER_DELETE_S3
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
|
||||
@@ -48,7 +48,7 @@ use crate::{
|
||||
metrics::{start_counting_cancelled_wait, start_measuring_requests},
|
||||
support::PermitCarrying,
|
||||
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
|
||||
RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
|
||||
RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE_S3,
|
||||
REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
@@ -355,7 +355,7 @@ impl S3Bucket {
|
||||
let kind = RequestKind::Delete;
|
||||
let mut cancel = std::pin::pin!(cancel.cancelled());
|
||||
|
||||
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
|
||||
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
let req = self
|
||||
@@ -832,6 +832,10 @@ impl RemoteStorage for S3Bucket {
|
||||
self.delete_oids(&permit, &delete_objects, cancel).await
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
MAX_KEYS_PER_DELETE_S3
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
let paths = std::array::from_ref(path);
|
||||
self.delete_objects(paths, cancel).await
|
||||
|
||||
@@ -203,6 +203,10 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
self.inner.max_keys_per_delete()
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::RemotePath;
|
||||
use remote_storage::TimeoutOrCancel;
|
||||
use remote_storage::MAX_KEYS_PER_DELETE;
|
||||
use std::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
@@ -131,7 +130,8 @@ impl Deleter {
|
||||
}
|
||||
|
||||
pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
|
||||
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
|
||||
let max_keys_per_delete = self.remote_storage.max_keys_per_delete();
|
||||
self.accumulator.reserve(max_keys_per_delete);
|
||||
|
||||
loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
@@ -156,14 +156,14 @@ impl Deleter {
|
||||
|
||||
match msg {
|
||||
DeleterMessage::Delete(mut list) => {
|
||||
while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
while !list.is_empty() || self.accumulator.len() == max_keys_per_delete {
|
||||
if self.accumulator.len() == max_keys_per_delete {
|
||||
self.flush().await?;
|
||||
// If we have received this number of keys, proceed with attempting to execute
|
||||
assert_eq!(self.accumulator.len(), 0);
|
||||
}
|
||||
|
||||
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
|
||||
let available_slots = max_keys_per_delete - self.accumulator.len();
|
||||
let take_count = std::cmp::min(available_slots, list.len());
|
||||
for path in list.drain(list.len() - take_count..) {
|
||||
self.accumulator.push(path);
|
||||
|
||||
@@ -459,12 +459,10 @@ pub async fn get_timeline_objects(
|
||||
Ok(list.keys)
|
||||
}
|
||||
|
||||
const MAX_KEYS_PER_DELETE: usize = 1000;
|
||||
|
||||
/// Drain a buffer of keys into DeleteObjects requests
|
||||
///
|
||||
/// If `drain` is true, drains keys completely; otherwise stops when <
|
||||
/// MAX_KEYS_PER_DELETE keys are left.
|
||||
/// `max_keys_per_delete`` keys are left.
|
||||
/// `num_deleted` returns number of deleted keys.
|
||||
async fn do_delete(
|
||||
remote_client: &GenericRemoteStorage,
|
||||
@@ -474,9 +472,10 @@ async fn do_delete(
|
||||
progress_tracker: &mut DeletionProgressTracker,
|
||||
) -> anyhow::Result<()> {
|
||||
let cancel = CancellationToken::new();
|
||||
while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
|
||||
let max_keys_per_delete = remote_client.max_keys_per_delete();
|
||||
while (!keys.is_empty() && drain) || (keys.len() >= max_keys_per_delete) {
|
||||
let request_keys =
|
||||
keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
|
||||
keys.split_off(keys.len() - (std::cmp::min(max_keys_per_delete, keys.len())));
|
||||
|
||||
let request_keys: Vec<RemotePath> = request_keys.into_iter().map(|o| o.key).collect();
|
||||
|
||||
@@ -617,7 +616,7 @@ pub async fn purge_garbage(
|
||||
}
|
||||
|
||||
objects_to_delete.append(&mut object_list);
|
||||
if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
|
||||
if objects_to_delete.len() >= remote_client.max_keys_per_delete() {
|
||||
do_delete(
|
||||
&remote_client,
|
||||
&mut objects_to_delete,
|
||||
|
||||
Reference in New Issue
Block a user