From 4b0204ede532956d2f08b83ffa01822711b027e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 16 Jan 2024 13:07:20 +0100 Subject: [PATCH] Add copy operation tests and implement them for azure blobs (#6362) This implements the `copy` operation for azure blobs, added to S3 by #6091, and adds tests both to s3 and azure ensuring that the copy operation works. --- libs/remote_storage/src/azure_blob.rs | 53 ++++++++++++++++++-- libs/remote_storage/tests/test_real_azure.rs | 38 ++++++++++++++ libs/remote_storage/tests/test_real_s3.rs | 38 ++++++++++++++ 3 files changed, 124 insertions(+), 5 deletions(-) diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 18cf5d97ba..7895a21f66 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -5,7 +5,9 @@ use std::collections::HashMap; use std::env; use std::num::NonZeroU32; use std::pin::Pin; +use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use super::REMOTE_STORAGE_PREFIX_SEPARATOR; use anyhow::Result; @@ -13,12 +15,14 @@ use azure_core::request_options::{MaxResults, Metadata, Range}; use azure_core::RetryOptions; use azure_identity::DefaultAzureCredential; use azure_storage::StorageCredentials; +use azure_storage_blobs::blob::CopyStatus; use azure_storage_blobs::prelude::ClientBuilder; use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerClient}; use bytes::Bytes; use futures::stream::Stream; use futures_util::StreamExt; -use http_types::StatusCode; +use http_types::{StatusCode, Url}; +use tokio::time::Instant; use tracing::debug; use crate::s3_bucket::RequestKind; @@ -323,10 +327,49 @@ impl RemoteStorage for AzureBlobStorage { Ok(()) } - async fn copy(&self, _from: &RemotePath, _to: &RemotePath) -> anyhow::Result<()> { - Err(anyhow::anyhow!( - "copy for azure blob storage is not implemented" - )) + async fn copy(&self, from: &RemotePath, to: &RemotePath) -> 
anyhow::Result<()> {
+        let _permit = self.permit(RequestKind::Copy).await;
+        let blob_client = self.client.blob_client(self.relative_path_to_name(to));
+
+        let source_url = format!(
+            "{}/{}",
+            self.client.url()?,
+            self.relative_path_to_name(from)
+        );
+        let builder = blob_client.copy(Url::from_str(&source_url)?);
+
+        let result = builder.into_future().await?;
+
+        let mut copy_status = result.copy_status;
+        let start_time = Instant::now();
+        const MAX_WAIT_TIME: Duration = Duration::from_secs(60);
+        loop {
+            match copy_status {
+                CopyStatus::Aborted => {
+                    anyhow::bail!("Received abort for copy from {from} to {to}.");
+                }
+                CopyStatus::Failed => {
+                    anyhow::bail!("Received failure response for copy from {from} to {to}.");
+                }
+                CopyStatus::Success => return Ok(()),
+                CopyStatus::Pending => (),
+            }
+            // The copy is still pending: wait a second, then poll the status again.
+            // TODO estimate time based on copy_progress and adjust time based on that
+            tokio::time::sleep(Duration::from_millis(1000)).await;
+            let properties = blob_client.get_properties().into_future().await?;
+            let Some(status) = properties.blob.properties.copy_status else {
+                tracing::warn!("copy_status for copy is None, from={from}, to={to}");
+                return Ok(());
+            };
+            if start_time.elapsed() > MAX_WAIT_TIME {
+                anyhow::bail!("Copy from {from} to {to} took longer than limit MAX_WAIT_TIME={}s. copy_progress={:?}.",
+                    MAX_WAIT_TIME.as_secs_f32(),
+                    properties.blob.properties.copy_progress,
+                );
+            }
+            copy_status = status;
+        }
     }
 }
diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs
index 0387dc30e7..b359949174 100644
--- a/libs/remote_storage/tests/test_real_azure.rs
+++ b/libs/remote_storage/tests/test_real_azure.rs
@@ -263,6 +263,44 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
     Ok(())
 }
 
+#[test_context(MaybeEnabledAzure)]
+#[tokio::test]
+async fn azure_copy_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
+    let MaybeEnabledAzure::Enabled(ctx) = ctx else {
+        return Ok(());
+    };
+
+    let path = RemotePath::new(Utf8Path::new(
+        format!("{}/file_to_copy", ctx.base_prefix).as_str(),
+    ))
+    .with_context(|| "RemotePath conversion")?;
+    let path_dest = RemotePath::new(Utf8Path::new(
+        format!("{}/file_dest", ctx.base_prefix).as_str(),
+    ))
+    .with_context(|| "RemotePath conversion")?;
+
+    let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
+
+    let (data, len) = wrap_stream(orig.clone());
+
+    ctx.client.upload(data, len, &path, None).await?;
+
+    // Copy the uploaded blob to the destination path
+    ctx.client.copy_object(&path, &path_dest).await?;
+
+    let dl = ctx.client.download(&path_dest).await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig);
+
+    debug!("Cleanup: deleting file at path {path:?}");
+    ctx.client
+        .delete_objects(&[path.clone(), path_dest.clone()])
+        .await
+        .with_context(|| format!("{path:?} removal"))?;
+
+    Ok(())
+}
+
 struct EnabledAzure {
     client: Arc,
     base_prefix: &'static str,
diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs
index 8f46b2abd6..661716b48f 100644
--- a/libs/remote_storage/tests/test_real_s3.rs
+++ b/libs/remote_storage/tests/test_real_s3.rs
@@ -259,6 +259,44 @@ async fn s3_upload_download_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()
     Ok(())
 }
 
+#[test_context(MaybeEnabledS3)]
+#[tokio::test]
+async fn s3_copy_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
+    let MaybeEnabledS3::Enabled(ctx) = ctx else {
+        return Ok(());
+    };
+
+    let path = RemotePath::new(Utf8Path::new(
+        format!("{}/file_to_copy", ctx.base_prefix).as_str(),
+    ))
+    .with_context(|| "RemotePath conversion")?;
+    let path_dest = RemotePath::new(Utf8Path::new(
+        format!("{}/file_dest", ctx.base_prefix).as_str(),
+    ))
+    .with_context(|| "RemotePath conversion")?;
+
+    let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
+
+    let (data, len) = wrap_stream(orig.clone());
+
+    ctx.client.upload(data, len, &path, None).await?;
+
+    // Copy the uploaded blob to the destination path
+    ctx.client.copy_object(&path, &path_dest).await?;
+
+    let dl = ctx.client.download(&path_dest).await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig);
+
+    debug!("Cleanup: deleting file at path {path:?}");
+    ctx.client
+        .delete_objects(&[path.clone(), path_dest.clone()])
+        .await
+        .with_context(|| format!("{path:?} removal"))?;
+
+    Ok(())
+}
+
 struct EnabledS3 {
     client: Arc,
     base_prefix: &'static str,