From add51e13727649952466630658fbee0f48346e21 Mon Sep 17 00:00:00 2001 From: Shany Pozin Date: Fri, 9 Jun 2023 13:23:12 +0300 Subject: [PATCH] Add delete_objects to storage api (#4449) ## Summary of changes Add missing delete_objects API to support bulk deletes --- libs/remote_storage/src/lib.rs | 10 +++++ libs/remote_storage/src/local_fs.rs | 7 ++++ libs/remote_storage/src/s3_bucket.rs | 41 ++++++++++++++++++++ libs/remote_storage/src/simulate_failures.rs | 7 ++++ libs/remote_storage/tests/test_real_s3.rs | 31 +++++++++++++++ 5 files changed, 96 insertions(+) diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index e0cc3ca543..ac1f8a357e 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -111,6 +111,8 @@ pub trait RemoteStorage: Send + Sync + 'static { ) -> Result; async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>; + + async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>; } pub struct Download { @@ -223,6 +225,14 @@ impl GenericRemoteStorage { Self::Unreliable(s) => s.delete(path).await, } } + + pub async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + match self { + Self::LocalFs(s) => s.delete_objects(paths).await, + Self::AwsS3(s) => s.delete_objects(paths).await, + Self::Unreliable(s) => s.delete_objects(paths).await, + } + } } impl GenericRemoteStorage { diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index c73e647845..59304c2481 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -320,6 +320,13 @@ impl RemoteStorage for LocalFs { .await .map_err(|e| anyhow::anyhow!(e))?) } + + async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + for path in paths { + self.delete(path).await? + } + Ok(()) + } } fn storage_metadata_path(original_path: &Path) -> PathBuf { diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 0be8c72fe0..38e1bf00f8 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -17,6 +17,7 @@ use aws_sdk_s3::{ error::SdkError, operation::get_object::GetObjectError, primitives::ByteStream, + types::{Delete, ObjectIdentifier}, Client, }; use aws_smithy_http::body::SdkBody; @@ -81,12 +82,24 @@ pub(super) mod metrics { .inc(); } + pub fn inc_delete_objects(count: u64) { + S3_REQUESTS_COUNT + .with_label_values(&["delete_object"]) + .inc_by(count); + } + pub fn inc_delete_object_fail() { S3_REQUESTS_FAIL_COUNT .with_label_values(&["delete_object"]) .inc(); } + pub fn inc_delete_objects_fail(count: u64) { + S3_REQUESTS_FAIL_COUNT + .with_label_values(&["delete_object"]) + .inc_by(count); + } + pub fn inc_list_objects() { S3_REQUESTS_COUNT.with_label_values(&["list_objects"]).inc(); } @@ -396,6 +409,34 @@ impl RemoteStorage for S3Bucket { }) .await } + async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 delete")?; + + let mut delete_objects = Vec::with_capacity(paths.len()); + for path in paths { + let obj_id = ObjectIdentifier::builder() + .set_key(Some(self.relative_path_to_s3_object(path))) + .build(); + delete_objects.push(obj_id); + } + + metrics::inc_delete_objects(paths.len() as u64); + self.client + .delete_objects() + .bucket(self.bucket_name.clone()) + .delete(Delete::builder().set_objects(Some(delete_objects)).build()) + .send() + .await + .map_err(|e| { + metrics::inc_delete_objects_fail(paths.len() as u64); + e + })?; + Ok(()) + } async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { let _guard = self diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index cb40859831..2f341bb29d 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -119,4 +119,11 @@ impl RemoteStorage for UnreliableWrapper { self.attempt(RemoteOp::Delete(path.clone()))?; self.inner.delete(path).await } + + async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + for path in paths { + self.delete(path).await? + } + Ok(()) + } } diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 48ed8f686c..5f52b0754c 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -107,6 +107,37 @@ async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result Ok(()) } +#[test_context(MaybeEnabledS3)] +#[tokio::test] +async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> { + let ctx = match ctx { + MaybeEnabledS3::Enabled(ctx) => ctx, + MaybeEnabledS3::Disabled => return Ok(()), + }; + + let path1 = RemotePath::new(&PathBuf::from(format!("{}/path1", ctx.base_prefix,))) + .with_context(|| "RemotePath conversion")?; + + let path2 = RemotePath::new(&PathBuf::from(format!("{}/path2", ctx.base_prefix,))) + .with_context(|| "RemotePath conversion")?; + + let data1 = "remote blob data1".as_bytes(); + let data1_len = data1.len(); + let data2 = "remote blob data2".as_bytes(); + let data2_len = data2.len(); + ctx.client + .upload(std::io::Cursor::new(data1), data1_len, &path1, None) + .await?; + + ctx.client + .upload(std::io::Cursor::new(data2), data2_len, &path2, None) + .await?; + + ctx.client.delete_objects(&[path1, path2]).await?; + + Ok(()) +} + fn ensure_logging_ready() { LOGGING_DONE.get_or_init(|| { utils::logging::init(