diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 38e1bf00f8..dafb6dcb45 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -34,6 +34,8 @@ use crate::{ Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR, }; +const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000; + pub(super) mod metrics { use metrics::{register_int_counter_vec, IntCounterVec}; use once_cell::sync::Lazy; @@ -424,17 +426,33 @@ impl RemoteStorage for S3Bucket { delete_objects.push(obj_id); } - metrics::inc_delete_objects(paths.len() as u64); - self.client - .delete_objects() - .bucket(self.bucket_name.clone()) - .delete(Delete::builder().set_objects(Some(delete_objects)).build()) - .send() - .await - .map_err(|e| { - metrics::inc_delete_objects_fail(paths.len() as u64); - e - })?; + for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) { + metrics::inc_delete_objects(chunk.len() as u64); + + let resp = self + .client + .delete_objects() + .bucket(self.bucket_name.clone()) + .delete(Delete::builder().set_objects(Some(chunk.to_vec())).build()) + .send() + .await; + + match resp { + Ok(resp) => { + if let Some(errors) = resp.errors { + metrics::inc_delete_objects_fail(errors.len() as u64); + return Err(anyhow::format_err!( + "Failed to delete {} objects", + errors.len() + )); + } + } + Err(e) => { + metrics::inc_delete_objects_fail(chunk.len() as u64); + return Err(e.into()); + } + } + } Ok(()) } diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 2f341bb29d..741c18bf6f 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -24,6 +24,7 @@ enum RemoteOp { Upload(RemotePath), Download(RemotePath), Delete(RemotePath), + DeleteObjects(Vec), } impl UnreliableWrapper { @@ -121,8 +122,18 @@ impl RemoteStorage for UnreliableWrapper { } async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?; + let mut error_counter = 0; for path in paths { - self.delete(path).await? + if (self.delete(path).await).is_err() { + error_counter += 1; + } + } + if error_counter > 0 { + return Err(anyhow::anyhow!( + "failed to delete {} objects", + error_counter + )); } Ok(()) }