From fbb979d5e34d1a2aed6578faba72d3b6cad60366 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Tue, 19 Dec 2023 11:29:50 +0100
Subject: [PATCH] remote_storage: move shared utilities for S3 and Azure into common module (#6176)

The PR does two things:

* move the util functions present in the remote_storage Azure and S3 test
  files into a shared module, deduplicating them.
* add a `s3_upload_download_works` test as a copy of the Azure test.

The goal is mainly to fight duplication and make the code a little more
generic (e.g. removing mentions of S3 and Azure from function names).
This is a first step towards #6146.
---
 libs/remote_storage/tests/common/mod.rs      | 200 +++++++++++++++
 libs/remote_storage/tests/test_real_azure.rs | 219 ++--------------
 libs/remote_storage/tests/test_real_s3.rs    | 253 +++++--------------
 3 files changed, 288 insertions(+), 384 deletions(-)
 create mode 100644 libs/remote_storage/tests/common/mod.rs

diff --git a/libs/remote_storage/tests/common/mod.rs b/libs/remote_storage/tests/common/mod.rs
new file mode 100644
index 0000000000..bca117ed1a
--- /dev/null
+++ b/libs/remote_storage/tests/common/mod.rs
@@ -0,0 +1,200 @@
+use std::collections::HashSet;
+use std::ops::ControlFlow;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use anyhow::Context;
+use bytes::Bytes;
+use camino::Utf8Path;
+use futures::stream::Stream;
+use once_cell::sync::OnceCell;
+use remote_storage::{Download, GenericRemoteStorage, RemotePath};
+use tokio::task::JoinSet;
+use tracing::{debug, error, info};
+
+static LOGGING_DONE: OnceCell<()> = OnceCell::new();
+
+pub(crate) fn upload_stream(
+    content: std::borrow::Cow<'static, [u8]>,
+) -> (
+    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
+    usize,
+) {
+    use std::borrow::Cow;
+
+    let content = match content {
+        Cow::Borrowed(x) => Bytes::from_static(x),
+        Cow::Owned(vec) => Bytes::from(vec),
+    };
+    wrap_stream(content)
+}
+
+pub(crate) fn wrap_stream(
+    content: bytes::Bytes,
+) -> (
+    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
+    usize,
+) {
+    let len = content.len();
+    let content = futures::future::ready(Ok(content));
+
+    (futures::stream::once(content), len)
+}
+
+pub(crate) async fn download_to_vec(dl: Download) -> anyhow::Result<Vec<u8>> {
+    let mut buf = Vec::new();
+    tokio::io::copy_buf(
+        &mut tokio_util::io::StreamReader::new(dl.download_stream),
+        &mut buf,
+    )
+    .await?;
+    Ok(buf)
+}
+
+// Uploads files `folder{j}/blob{i}.txt`. See test description for more details.
+pub(crate) async fn upload_simple_remote_data(
+    client: &Arc<GenericRemoteStorage>,
+    upload_tasks_count: usize,
+) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
+    info!("Creating {upload_tasks_count} remote files");
+    let mut upload_tasks = JoinSet::new();
+    for i in 1..upload_tasks_count + 1 {
+        let task_client = Arc::clone(client);
+        upload_tasks.spawn(async move {
+            let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
+            let blob_path = RemotePath::new(
+                Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
+            )
+            .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
+            debug!("Creating remote item {i} at path {blob_path:?}");
+
+            let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
+            task_client.upload(data, len, &blob_path, None).await?;
+
+            Ok::<_, anyhow::Error>(blob_path)
+        });
+    }
+
+    let mut upload_tasks_failed = false;
+    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
+    while let Some(task_run_result) = upload_tasks.join_next().await {
+        match task_run_result
+            .context("task join failed")
+            .and_then(|task_result| task_result.context("upload task failed"))
+        {
+            Ok(upload_path) => {
+                uploaded_blobs.insert(upload_path);
+            }
+            Err(e) => {
+                error!("Upload task failed: {e:?}");
+                upload_tasks_failed = true;
+            }
+        }
+    }
+
+    if upload_tasks_failed {
+        ControlFlow::Break(uploaded_blobs)
+    } else {
+        ControlFlow::Continue(uploaded_blobs)
+    }
+}
+
+pub(crate) async fn cleanup(
+    client: &Arc<GenericRemoteStorage>,
+    objects_to_delete: HashSet<RemotePath>,
+) {
+    info!(
+        "Removing {} objects from the remote storage during cleanup",
+        objects_to_delete.len()
+    );
+    let mut delete_tasks = JoinSet::new();
+    for object_to_delete in objects_to_delete {
+        let task_client = Arc::clone(client);
+        delete_tasks.spawn(async move {
+            debug!("Deleting remote item at path {object_to_delete:?}");
+            task_client
+                .delete(&object_to_delete)
+                .await
+                .with_context(|| format!("{object_to_delete:?} removal"))
+        });
+    }
+
+    while let Some(task_run_result) = delete_tasks.join_next().await {
+        match task_run_result {
+            Ok(task_result) => match task_result {
+                Ok(()) => {}
+                Err(e) => error!("Delete task failed: {e:?}"),
+            },
+            Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
+        }
+    }
+}
+pub(crate) struct Uploads {
+    pub(crate) prefixes: HashSet<RemotePath>,
+    pub(crate) blobs: HashSet<RemotePath>,
+}
+
+pub(crate) async fn upload_remote_data(
+    client: &Arc<GenericRemoteStorage>,
+    base_prefix_str: &'static str,
+    upload_tasks_count: usize,
+) -> ControlFlow<Uploads, Uploads> {
+    info!("Creating {upload_tasks_count} remote files");
+    let mut upload_tasks = JoinSet::new();
+    for i in 1..upload_tasks_count + 1 {
+        let task_client = Arc::clone(client);
+        upload_tasks.spawn(async move {
+            let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
+            let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
+                .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
+            let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
+            debug!("Creating remote item {i} at path {blob_path:?}");
+
+            let (data, data_len) =
+                upload_stream(format!("remote blob data {i}").into_bytes().into());
+            task_client.upload(data, data_len, &blob_path, None).await?;
+
+            Ok::<_, anyhow::Error>((blob_prefix, blob_path))
+        });
+    }
+
+    let mut upload_tasks_failed = false;
+    let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
+    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
+    while let Some(task_run_result) = upload_tasks.join_next().await {
+        match task_run_result
.context("task join failed") + .and_then(|task_result| task_result.context("upload task failed")) + { + Ok((upload_prefix, upload_path)) => { + uploaded_prefixes.insert(upload_prefix); + uploaded_blobs.insert(upload_path); + } + Err(e) => { + error!("Upload task failed: {e:?}"); + upload_tasks_failed = true; + } + } + } + + let uploads = Uploads { + prefixes: uploaded_prefixes, + blobs: uploaded_blobs, + }; + if upload_tasks_failed { + ControlFlow::Break(uploads) + } else { + ControlFlow::Continue(uploads) + } +} + +pub(crate) fn ensure_logging_ready() { + LOGGING_DONE.get_or_init(|| { + utils::logging::init( + utils::logging::LogFormat::Test, + utils::logging::TracingErrorLayerEnablement::Disabled, + utils::logging::Output::Stdout, + ) + .expect("logging init failed"); + }); +} diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs index 7327803198..0387dc30e7 100644 --- a/libs/remote_storage/tests/test_real_azure.rs +++ b/libs/remote_storage/tests/test_real_azure.rs @@ -2,23 +2,23 @@ use std::collections::HashSet; use std::env; use std::num::NonZeroUsize; use std::ops::ControlFlow; -use std::path::PathBuf; use std::sync::Arc; use std::time::UNIX_EPOCH; use anyhow::Context; -use bytes::Bytes; use camino::Utf8Path; -use futures::stream::Stream; -use once_cell::sync::OnceCell; use remote_storage::{ - AzureConfig, Download, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, + AzureConfig, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, }; use test_context::{test_context, AsyncTestContext}; -use tokio::task::JoinSet; -use tracing::{debug, error, info}; +use tracing::{debug, info}; -static LOGGING_DONE: OnceCell<()> = OnceCell::new(); +mod common; + +use common::{ + cleanup, download_to_vec, ensure_logging_ready, upload_remote_data, upload_simple_remote_data, + upload_stream, wrap_stream, +}; const ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_AZURE_REMOTE_STORAGE"; @@ -30,7 +30,7 @@ const BASE_PREFIX: &str = "test"; /// If real Azure tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the /// deafult test framework, see https://github.com/rust-lang/rust/issues/68007 for details. /// -/// First, the test creates a set of Azure blobs with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_azure_data`] +/// First, the test creates a set of Azure blobs with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`] /// where /// * `random_prefix_part` is set for the entire Azure client during the Azure client creation in [`create_azure_client`], to avoid multiple test runs interference /// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket @@ -97,7 +97,7 @@ async fn azure_pagination_should_work( /// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and related Azure cred env vars specified. Test will skip real code and pass if env vars not set. /// See `Azure_pagination_should_work` for more information. /// -/// First, create a set of Azure objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_azure_data`] +/// First, create a set of Azure objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`] /// Then performs the following queries: /// 1. 
 /// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
 /// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
@@ -218,18 +218,9 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
 
     ctx.client.upload(data, len, &path, None).await?;
 
-    async fn download_and_compare(dl: Download) -> anyhow::Result<Vec<u8>> {
-        let mut buf = Vec::new();
-        tokio::io::copy_buf(
-            &mut tokio_util::io::StreamReader::new(dl.download_stream),
-            &mut buf,
-        )
-        .await?;
-        Ok(buf)
-    }
     // Normal download request
     let dl = ctx.client.download(&path).await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
     assert_eq!(&buf, &orig);
 
     // Full range (end specified)
@@ -237,12 +228,12 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
         .client
         .download_byte_range(&path, 0, Some(len as u64))
         .await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
     assert_eq!(&buf, &orig);
 
     // partial range (end specified)
     let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
     assert_eq!(&buf, &orig[4..10]);
 
     // partial range (end beyond real end)
@@ -250,17 +241,17 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
         .client
         .download_byte_range(&path, 8, Some(len as u64 * 100))
         .await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
     assert_eq!(&buf, &orig[8..]);
 
     // Partial range (end unspecified)
     let dl = ctx.client.download_byte_range(&path, 4, None).await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..]);
 
     // Full range (end unspecified)
     let dl = ctx.client.download_byte_range(&path, 0, None).await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
     assert_eq!(&buf, &orig);
 
     debug!("Cleanup: deleting file at path {path:?}");
@@ -272,17 +263,6 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
     Ok(())
 }
 
-fn ensure_logging_ready() {
-    LOGGING_DONE.get_or_init(|| {
-        utils::logging::init(
-            utils::logging::LogFormat::Test,
-            utils::logging::TracingErrorLayerEnablement::Disabled,
-            utils::logging::Output::Stdout,
-        )
-        .expect("logging init failed");
-    });
-}
-
 struct EnabledAzure {
     client: Arc<GenericRemoteStorage>,
     base_prefix: &'static str,
@@ -352,7 +332,7 @@ impl AsyncTestContext for MaybeEnabledAzureWithTestBlobs {
 
         let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
 
-        match upload_azure_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
+        match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
             ControlFlow::Continue(uploads) => {
                 info!("Remote objects created successfully");
 
@@ -414,7 +394,7 @@ impl AsyncTestContext for MaybeEnabledAzureWithSimpleTestBlobs {
 
         let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
 
-        match upload_simple_azure_data(&enabled.client, upload_tasks_count).await {
+        match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
             ControlFlow::Continue(uploads) => {
                 info!("Remote objects created successfully");
 
@@ -478,166 +458,3 @@ fn create_azure_client(
         GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
     ))
 }
-
-struct Uploads {
-    prefixes: HashSet<RemotePath>,
-    blobs: HashSet<RemotePath>,
-}
-
-async fn upload_azure_data(
-    client: &Arc<GenericRemoteStorage>,
-    base_prefix_str: &'static str,
-    upload_tasks_count: usize,
-) -> ControlFlow<Uploads, Uploads> {
-    info!("Creating {upload_tasks_count} Azure files");
-    let mut upload_tasks = JoinSet::new();
-    for i in 1..upload_tasks_count + 1 {
-        let task_client = Arc::clone(client);
-        upload_tasks.spawn(async move {
-            let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
-            let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
-                .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
-            let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
-            debug!("Creating remote item {i} at path {blob_path:?}");
-
-            let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
-            task_client.upload(data, len, &blob_path, None).await?;
-
-            Ok::<_, anyhow::Error>((blob_prefix, blob_path))
-        });
-    }
-
-    let mut upload_tasks_failed = false;
-    let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
-    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
-    while let Some(task_run_result) = upload_tasks.join_next().await {
-        match task_run_result
-            .context("task join failed")
-            .and_then(|task_result| task_result.context("upload task failed"))
-        {
-            Ok((upload_prefix, upload_path)) => {
-                uploaded_prefixes.insert(upload_prefix);
-                uploaded_blobs.insert(upload_path);
-            }
-            Err(e) => {
-                error!("Upload task failed: {e:?}");
-                upload_tasks_failed = true;
-            }
-        }
-    }
-
-    let uploads = Uploads {
-        prefixes: uploaded_prefixes,
-        blobs: uploaded_blobs,
-    };
-    if upload_tasks_failed {
-        ControlFlow::Break(uploads)
-    } else {
-        ControlFlow::Continue(uploads)
-    }
-}
-
-async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<RemotePath>) {
-    info!(
-        "Removing {} objects from the remote storage during cleanup",
-        objects_to_delete.len()
-    );
-    let mut delete_tasks = JoinSet::new();
-    for object_to_delete in objects_to_delete {
-        let task_client = Arc::clone(client);
-        delete_tasks.spawn(async move {
-            debug!("Deleting remote item at path {object_to_delete:?}");
-            task_client
-                .delete(&object_to_delete)
-                .await
-                .with_context(|| format!("{object_to_delete:?} removal"))
-        });
-    }
-
-    while let Some(task_run_result) = delete_tasks.join_next().await {
-        match task_run_result {
-            Ok(task_result) => match task_result {
-                Ok(()) => {}
-                Err(e) => error!("Delete task failed: {e:?}"),
-            },
-            Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
-        }
-    }
-}
-
-// Uploads files `folder{j}/blob{i}.txt`. See test description for more details.
-async fn upload_simple_azure_data(
-    client: &Arc<GenericRemoteStorage>,
-    upload_tasks_count: usize,
-) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
-    info!("Creating {upload_tasks_count} Azure files");
-    let mut upload_tasks = JoinSet::new();
-    for i in 1..upload_tasks_count + 1 {
-        let task_client = Arc::clone(client);
-        upload_tasks.spawn(async move {
-            let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
-            let blob_path = RemotePath::new(
-                Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
-            )
-            .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
-            debug!("Creating remote item {i} at path {blob_path:?}");
-
-            let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
-            task_client.upload(data, len, &blob_path, None).await?;
-
-            Ok::<_, anyhow::Error>(blob_path)
-        });
-    }
-
-    let mut upload_tasks_failed = false;
-    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
-    while let Some(task_run_result) = upload_tasks.join_next().await {
-        match task_run_result
-            .context("task join failed")
-            .and_then(|task_result| task_result.context("upload task failed"))
-        {
-            Ok(upload_path) => {
-                uploaded_blobs.insert(upload_path);
-            }
-            Err(e) => {
-                error!("Upload task failed: {e:?}");
-                upload_tasks_failed = true;
-            }
-        }
-    }
-
-    if upload_tasks_failed {
-        ControlFlow::Break(uploaded_blobs)
-    } else {
-        ControlFlow::Continue(uploaded_blobs)
-    }
-}
-
-// FIXME: copypasted from test_real_s3, can't remember how to share a module which is not compiled
-// to binary
-fn upload_stream(
-    content: std::borrow::Cow<'static, [u8]>,
-) -> (
-    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
-    usize,
-) {
-    use std::borrow::Cow;
-
-    let content = match content {
-        Cow::Borrowed(x) => Bytes::from_static(x),
-        Cow::Owned(vec) => Bytes::from(vec),
-    };
-    wrap_stream(content)
-}
-
-fn wrap_stream(
-    content: bytes::Bytes,
-) -> (
-    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
-    usize,
-) {
-    let len = content.len();
-    let content = futures::future::ready(Ok(content));
-
-    (futures::stream::once(content), len)
-}
diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs
index ecd834e61c..8f46b2abd6 100644
--- a/libs/remote_storage/tests/test_real_s3.rs
+++ b/libs/remote_storage/tests/test_real_s3.rs
@@ -2,23 +2,23 @@ use std::collections::HashSet;
 use std::env;
 use std::num::NonZeroUsize;
 use std::ops::ControlFlow;
-use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::UNIX_EPOCH;
 
 use anyhow::Context;
-use bytes::Bytes;
 use camino::Utf8Path;
-use futures::stream::Stream;
-use once_cell::sync::OnceCell;
 use remote_storage::{
     GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
 };
 use test_context::{test_context, AsyncTestContext};
-use tokio::task::JoinSet;
-use tracing::{debug, error, info};
+use tracing::{debug, info};
 
-static LOGGING_DONE: OnceCell<()> = OnceCell::new();
+mod common;
+
+use common::{
+    cleanup, download_to_vec, ensure_logging_ready, upload_remote_data, upload_simple_remote_data,
+    upload_stream, wrap_stream,
+};
 
 const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
 
@@ -30,7 +30,7 @@ const BASE_PREFIX: &str = "test";
 /// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the
 /// deafult test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
 ///
-/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_s3_data`]
+/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
 /// where
 /// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid multiple test runs interference
 /// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket
@@ -95,7 +95,7 @@ async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> any
 /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. Test will skip real code and pass if env vars not set.
 /// See `s3_pagination_should_work` for more information.
 ///
-/// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_s3_data`]
+/// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`]
 /// Then performs the following queries:
 /// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
 /// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
@@ -198,15 +198,65 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
     Ok(())
 }
 
-fn ensure_logging_ready() {
-    LOGGING_DONE.get_or_init(|| {
-        utils::logging::init(
-            utils::logging::LogFormat::Test,
-            utils::logging::TracingErrorLayerEnablement::Disabled,
-            utils::logging::Output::Stdout,
-        )
-        .expect("logging init failed");
-    });
+#[test_context(MaybeEnabledS3)]
+#[tokio::test]
+async fn s3_upload_download_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
+    let MaybeEnabledS3::Enabled(ctx) = ctx else {
+        return Ok(());
+    };
+
+    let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
+        .with_context(|| "RemotePath conversion")?;
+
+    let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());
+
+    let (data, len) = wrap_stream(orig.clone());
+
+    ctx.client.upload(data, len, &path, None).await?;
+
+    // Normal download request
+    let dl = ctx.client.download(&path).await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig);
+
+    // Full range (end specified)
+    let dl = ctx
+        .client
+        .download_byte_range(&path, 0, Some(len as u64))
+        .await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig);
+
+    // partial range (end specified)
+    let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig[4..10]);
+
+    // partial range (end beyond real end)
+    let dl = ctx
+        .client
+        .download_byte_range(&path, 8, Some(len as u64 * 100))
+        .await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig[8..]);
+
+    // Partial range (end unspecified)
+    let dl = ctx.client.download_byte_range(&path, 4, None).await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig[4..]);
+
+    // Full range (end unspecified)
+    let dl = ctx.client.download_byte_range(&path, 0, None).await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig);
+
+    debug!("Cleanup: deleting file at path {path:?}");
+    ctx.client
+        .delete(&path)
+        .await
+        .with_context(|| format!("{path:?} removal"))?;
+
+    Ok(())
 }
 
 struct EnabledS3 {
@@ -278,7 +328,7 @@ impl AsyncTestContext for MaybeEnabledS3WithTestBlobs {
 
         let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
 
-        match upload_s3_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
+        match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
             ControlFlow::Continue(uploads) => {
                 info!("Remote objects created successfully");
 
@@ -340,7 +390,7 @@ impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs {
 
         let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
 
-        match upload_simple_s3_data(&enabled.client, upload_tasks_count).await {
+        match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
             ControlFlow::Continue(uploads) => {
                 info!("Remote objects created successfully");
 
@@ -403,166 +453,3 @@ fn create_s3_client(
         GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
     ))
 }
-
-struct Uploads {
-    prefixes: HashSet<RemotePath>,
-    blobs: HashSet<RemotePath>,
-}
-
-async fn upload_s3_data(
-    client: &Arc<GenericRemoteStorage>,
-    base_prefix_str: &'static str,
-    upload_tasks_count: usize,
-) -> ControlFlow<Uploads, Uploads> {
-    info!("Creating {upload_tasks_count} S3 files");
-    let mut upload_tasks = JoinSet::new();
-    for i in 1..upload_tasks_count + 1 {
-        let task_client = Arc::clone(client);
-        upload_tasks.spawn(async move {
-            let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
-            let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
-                .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
-            let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
-            debug!("Creating remote item {i} at path {blob_path:?}");
-
-            let (data, data_len) =
-                upload_stream(format!("remote blob data {i}").into_bytes().into());
-            task_client.upload(data, data_len, &blob_path, None).await?;
-
-            Ok::<_, anyhow::Error>((blob_prefix, blob_path))
-        });
-    }
-
-    let mut upload_tasks_failed = false;
-    let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
-    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
-    while let Some(task_run_result) = upload_tasks.join_next().await {
-        match task_run_result
-            .context("task join failed")
-            .and_then(|task_result| task_result.context("upload task failed"))
-        {
-            Ok((upload_prefix, upload_path)) => {
-                uploaded_prefixes.insert(upload_prefix);
-                uploaded_blobs.insert(upload_path);
-            }
-            Err(e) => {
-                error!("Upload task failed: {e:?}");
-                upload_tasks_failed = true;
-            }
-        }
-    }
-
-    let uploads = Uploads {
-        prefixes: uploaded_prefixes,
-        blobs: uploaded_blobs,
-    };
-    if upload_tasks_failed {
-        ControlFlow::Break(uploads)
-    } else {
-        ControlFlow::Continue(uploads)
-    }
-}
-
-async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<RemotePath>) {
-    info!(
-        "Removing {} objects from the remote storage during cleanup",
-        objects_to_delete.len()
-    );
-    let mut delete_tasks = JoinSet::new();
-    for object_to_delete in objects_to_delete {
-        let task_client = Arc::clone(client);
-        delete_tasks.spawn(async move {
-            debug!("Deleting remote item at path {object_to_delete:?}");
-            task_client
-                .delete(&object_to_delete)
-                .await
-                .with_context(|| format!("{object_to_delete:?} removal"))
-        });
-    }
-
-    while let Some(task_run_result) = delete_tasks.join_next().await {
-        match task_run_result {
-            Ok(task_result) => match task_result {
-                Ok(()) => {}
-                Err(e) => error!("Delete task failed: {e:?}"),
-            },
-            Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
-        }
-    }
-}
-
-// Uploads files `folder{j}/blob{i}.txt`. See test description for more details.
-async fn upload_simple_s3_data(
-    client: &Arc<GenericRemoteStorage>,
-    upload_tasks_count: usize,
-) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
-    info!("Creating {upload_tasks_count} S3 files");
-    let mut upload_tasks = JoinSet::new();
-    for i in 1..upload_tasks_count + 1 {
-        let task_client = Arc::clone(client);
-        upload_tasks.spawn(async move {
-            let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
-            let blob_path = RemotePath::new(
-                Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
-            )
-            .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
-            debug!("Creating remote item {i} at path {blob_path:?}");
-
-            let (data, data_len) =
-                upload_stream(format!("remote blob data {i}").into_bytes().into());
-            task_client.upload(data, data_len, &blob_path, None).await?;
-
-            Ok::<_, anyhow::Error>(blob_path)
-        });
-    }
-
-    let mut upload_tasks_failed = false;
-    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
-    while let Some(task_run_result) = upload_tasks.join_next().await {
-        match task_run_result
-            .context("task join failed")
-            .and_then(|task_result| task_result.context("upload task failed"))
-        {
-            Ok(upload_path) => {
-                uploaded_blobs.insert(upload_path);
-            }
-            Err(e) => {
-                error!("Upload task failed: {e:?}");
-                upload_tasks_failed = true;
-            }
-        }
-    }
-
-    if upload_tasks_failed {
-        ControlFlow::Break(uploaded_blobs)
-    } else {
-        ControlFlow::Continue(uploaded_blobs)
-    }
-}
-
-fn upload_stream(
-    content: std::borrow::Cow<'static, [u8]>,
-) -> (
-    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
-    usize,
-) {
-    use std::borrow::Cow;
-
-    let content = match content {
-        Cow::Borrowed(x) => Bytes::from_static(x),
-        Cow::Owned(vec) => Bytes::from(vec),
-    };
-    wrap_stream(content)
-}
-
-fn wrap_stream(
-    content: bytes::Bytes,
-) -> (
-    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
-    usize,
-) {
-    let len = content.len();
-    let content = futures::future::ready(Ok(content));
-
-    (futures::stream::once(content), len)
-}
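
Editor's sketch (not part of the patch): to illustrate the kind of backend-agnostic reuse this refactor is aiming at, the snippet below builds a small round-trip check purely from the helpers the patch introduces in `tests/common/mod.rs` (`wrap_stream`, `download_to_vec`) and the `GenericRemoteStorage` methods already exercised in these tests (`upload`, `download`, `delete`). The helper name `roundtrip_small_blob` and its exact signature are hypothetical; a real test would still obtain the client through a per-backend setup such as the existing `create_s3_client` or `create_azure_client`.

// Hypothetical sketch, assuming the shared module from this patch is in scope.
mod common;

use std::sync::Arc;

use anyhow::Context;
use camino::Utf8Path;
use remote_storage::{GenericRemoteStorage, RemotePath};

use common::{download_to_vec, wrap_stream};

/// Uploads a small blob, downloads it back, compares the bytes, and deletes it.
/// Works against any backend because it only goes through `GenericRemoteStorage`.
async fn roundtrip_small_blob(
    client: &Arc<GenericRemoteStorage>,
    base_prefix: &str,
) -> anyhow::Result<()> {
    let path = RemotePath::new(Utf8Path::new(&format!("{base_prefix}/roundtrip")))
        .context("RemotePath conversion")?;

    // Wrap a static byte blob into the (stream, len) pair expected by `upload`.
    let orig = bytes::Bytes::from_static(b"roundtrip blob data");
    let (stream, len) = wrap_stream(orig.clone());
    client.upload(stream, len, &path, None).await?;

    // Read the blob back and check it round-trips unchanged.
    let dl = client.download(&path).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    // Clean up so repeated test runs do not interfere with each other.
    client
        .delete(&path)
        .await
        .with_context(|| format!("{path:?} removal"))
}

Design note: the sharing works because the helpers live in `tests/common/mod.rs` rather than `tests/common.rs`; Cargo only builds top-level files under `tests/` as integration-test binaries, so a subdirectory module can be pulled in with `mod common;` from both `test_real_s3.rs` and `test_real_azure.rs`, which is exactly what the removed "FIXME: copypasted from test_real_s3" comment was asking for.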