From 6ffdcfe6a4428cd72473d17ad3310bf2e656c742 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Tue, 16 Jan 2024 18:45:19 +0100
Subject: [PATCH] remote_storage: unify azure and S3 tests (#6364)

The remote_storage crate contains two copies of each test, one for azure
and one for S3. The repetition is not necessary and makes the tests more
prone to drift, so we remove it by moving the tests into a shared module.

The module has a different name depending on where it is included, so
that each test still has "s3" or "azure" in its full path, allowing you
to run just the S3 tests or just the azure tests.

An earlier PR already removed some of the duplication: #6176

Fixes #6146.
---
 libs/remote_storage/tests/common/tests.rs    | 288 ++++++++++++++++++
 libs/remote_storage/tests/test_real_azure.rs | 302 +------------------
 libs/remote_storage/tests/test_real_s3.rs    | 298 +-----------------
 3 files changed, 312 insertions(+), 576 deletions(-)
 create mode 100644 libs/remote_storage/tests/common/tests.rs

diff --git a/libs/remote_storage/tests/common/tests.rs b/libs/remote_storage/tests/common/tests.rs
new file mode 100644
index 0000000000..abccc24c97
--- /dev/null
+++ b/libs/remote_storage/tests/common/tests.rs
@@ -0,0 +1,288 @@
+use anyhow::Context;
+use camino::Utf8Path;
+use remote_storage::RemotePath;
+use std::collections::HashSet;
+use std::sync::Arc;
+use test_context::test_context;
+use tracing::debug;
+
+use crate::common::{download_to_vec, upload_stream, wrap_stream};
+
+use super::{
+    MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
+};
+
+/// Tests that S3 client can list all prefixes, even if the response comes paginated and requires multiple S3 queries.
+/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
+/// See the client creation in [`create_s3_client`] for details on the required env vars.
+/// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test as ignored at runtime with the
+/// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
+///
+/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
+/// where
+/// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid interference between test runs
+/// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client is able to list nested prefixes inside the bucket
+///
+/// Then, it verifies that the client returns the correct prefixes when queried:
+/// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be the `${base_prefix_str}` value only
+/// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
+///
+/// With the real S3 enabled and `#[cfg(test)]` Rust configuration used, the S3 client test adds a `max-keys` param to limit the response keys.
+/// This way, we are able to test the pagination implicitly, by ensuring all results are returned from the remote storage, while avoiding uploading too many blobs to S3,
+/// since the current default AWS S3 pagination limit is 1000.
+/// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax)
+///
+/// Lastly, the test attempts to clean up and remove all uploaded S3 files.
+/// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until the clean up is finished.
+#[test_context(MaybeEnabledStorageWithTestBlobs)]
+#[tokio::test]
+async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> anyhow::Result<()> {
+    let ctx = match ctx {
+        MaybeEnabledStorageWithTestBlobs::Enabled(ctx) => ctx,
+        MaybeEnabledStorageWithTestBlobs::Disabled => return Ok(()),
+        MaybeEnabledStorageWithTestBlobs::UploadsFailed(e, _) => {
+            anyhow::bail!("S3 init failed: {e:?}")
+        }
+    };
+
+    let test_client = Arc::clone(&ctx.enabled.client);
+    let expected_remote_prefixes = ctx.remote_prefixes.clone();
+
+    let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
+        .context("common_prefix construction")?;
+    let root_remote_prefixes = test_client
+        .list_prefixes(None)
+        .await
+        .context("client list root prefixes failure")?
+        .into_iter()
+        .collect::<HashSet<_>>();
+    assert_eq!(
+        root_remote_prefixes, HashSet::from([base_prefix.clone()]),
+        "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
+    );
+
+    let nested_remote_prefixes = test_client
+        .list_prefixes(Some(&base_prefix))
+        .await
+        .context("client list nested prefixes failure")?
+        .into_iter()
+        .collect::<HashSet<_>>();
+    let remote_only_prefixes = nested_remote_prefixes
+        .difference(&expected_remote_prefixes)
+        .collect::<HashSet<_>>();
+    let missing_uploaded_prefixes = expected_remote_prefixes
+        .difference(&nested_remote_prefixes)
+        .collect::<HashSet<_>>();
+    assert_eq!(
+        remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
+        "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
+    );
+
+    Ok(())
+}
+
+/// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
+/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. The test will skip the real code and pass if the env vars are not set.
+/// See `pagination_should_work` for more information.
+///
+/// First, it creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`].
+/// Then it performs the following queries:
+/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
+/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
+#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
+#[tokio::test]
+async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> {
+    let ctx = match ctx {
+        MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
+        MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
+        MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
+            anyhow::bail!("S3 init failed: {e:?}")
+        }
+    };
+    let test_client = Arc::clone(&ctx.enabled.client);
+    let base_prefix =
+        RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
+    let root_files = test_client
+        .list_files(None)
+        .await
+        .context("client list root files failure")?
+        .into_iter()
+        .collect::<HashSet<_>>();
+    assert_eq!(
+        root_files,
+        ctx.remote_blobs.clone(),
+        "remote storage list_files on root mismatches with the uploads."
+    );
+    let nested_remote_files = test_client
+        .list_files(Some(&base_prefix))
+        .await
+        .context("client list nested files failure")?
+        .into_iter()
+        .collect::<HashSet<_>>();
+    let trim_remote_blobs: HashSet<_> = ctx
+        .remote_blobs
+        .iter()
+        .map(|x| x.get_path())
+        .filter(|x| x.starts_with("folder1"))
+        .map(|x| RemotePath::new(x).expect("must be valid path"))
+        .collect();
+    assert_eq!(
+        nested_remote_files, trim_remote_blobs,
+        "remote storage list_files on subdirectory mismatches with the uploads."
+    );
+    Ok(())
+}
+
+#[test_context(MaybeEnabledStorage)]
+#[tokio::test]
+async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
+    let ctx = match ctx {
+        MaybeEnabledStorage::Enabled(ctx) => ctx,
+        MaybeEnabledStorage::Disabled => return Ok(()),
+    };
+
+    let path = RemotePath::new(Utf8Path::new(
+        format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
+    ))
+    .with_context(|| "RemotePath conversion")?;
+
+    ctx.client.delete(&path).await.expect("should succeed");
+
+    Ok(())
+}
+
+#[test_context(MaybeEnabledStorage)]
+#[tokio::test]
+async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
+    let ctx = match ctx {
+        MaybeEnabledStorage::Enabled(ctx) => ctx,
+        MaybeEnabledStorage::Disabled => return Ok(()),
+    };
+
+    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
+        .with_context(|| "RemotePath conversion")?;
+
+    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
+        .with_context(|| "RemotePath conversion")?;
+
+    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
+        .with_context(|| "RemotePath conversion")?;
+
+    let (data, len) = upload_stream("remote blob data1".as_bytes().into());
+    ctx.client.upload(data, len, &path1, None).await?;
+
+    let (data, len) = upload_stream("remote blob data2".as_bytes().into());
+    ctx.client.upload(data, len, &path2, None).await?;
+
+    let (data, len) = upload_stream("remote blob data3".as_bytes().into());
+    ctx.client.upload(data, len, &path3, None).await?;
+
+    ctx.client.delete_objects(&[path1, path2]).await?;
+
+    let prefixes = ctx.client.list_prefixes(None).await?;
+
+    assert_eq!(prefixes.len(), 1);
+
+    ctx.client.delete_objects(&[path3]).await?;
+
+    Ok(())
+}
+
+#[test_context(MaybeEnabledStorage)]
+#[tokio::test]
+async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
+    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
+        return Ok(());
+    };
+
+    let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
+        .with_context(|| "RemotePath conversion")?;
+
+    let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());
+
+    let (data, len) = wrap_stream(orig.clone());
+
+    ctx.client.upload(data, len, &path, None).await?;
+
+    // Normal download request
+    let dl = ctx.client.download(&path).await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig);
+
+    // Full range (end specified)
+    let dl = ctx
+        .client
+        .download_byte_range(&path, 0, Some(len as u64))
+        .await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig);
+
+    // partial range (end specified)
+    let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig[4..10]);
+
+    // partial range (end beyond real end)
+    let dl = ctx
+        .client
+        .download_byte_range(&path, 8, Some(len as u64 * 100))
+        .await?;
+    let buf = download_to_vec(dl).await?;
+    assert_eq!(&buf, &orig[8..]);
+
+    // Partial range (end unspecified)
+    let dl = ctx.client.download_byte_range(&path, 4,
None).await?; + let buf = download_to_vec(dl).await?; + assert_eq!(&buf, &orig[4..]); + + // Full range (end unspecified) + let dl = ctx.client.download_byte_range(&path, 0, None).await?; + let buf = download_to_vec(dl).await?; + assert_eq!(&buf, &orig); + + debug!("Cleanup: deleting file at path {path:?}"); + ctx.client + .delete(&path) + .await + .with_context(|| format!("{path:?} removal"))?; + + Ok(()) +} + +#[test_context(MaybeEnabledStorage)] +#[tokio::test] +async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> { + let MaybeEnabledStorage::Enabled(ctx) = ctx else { + return Ok(()); + }; + + let path = RemotePath::new(Utf8Path::new( + format!("{}/file_to_copy", ctx.base_prefix).as_str(), + )) + .with_context(|| "RemotePath conversion")?; + let path_dest = RemotePath::new(Utf8Path::new( + format!("{}/file_dest", ctx.base_prefix).as_str(), + )) + .with_context(|| "RemotePath conversion")?; + + let orig = bytes::Bytes::from_static("remote blob data content".as_bytes()); + + let (data, len) = wrap_stream(orig.clone()); + + ctx.client.upload(data, len, &path, None).await?; + + // Normal download request + ctx.client.copy_object(&path, &path_dest).await?; + + let dl = ctx.client.download(&path_dest).await?; + let buf = download_to_vec(dl).await?; + assert_eq!(&buf, &orig); + + debug!("Cleanup: deleting file at path {path:?}"); + ctx.client + .delete_objects(&[path.clone(), path_dest.clone()]) + .await + .with_context(|| format!("{path:?} removal"))?; + + Ok(()) +} diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs index b359949174..6f9a1ec6f7 100644 --- a/libs/remote_storage/tests/test_real_azure.rs +++ b/libs/remote_storage/tests/test_real_azure.rs @@ -6,301 +6,23 @@ use std::sync::Arc; use std::time::UNIX_EPOCH; use anyhow::Context; -use camino::Utf8Path; use remote_storage::{ AzureConfig, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, }; -use test_context::{test_context, AsyncTestContext}; -use tracing::{debug, info}; +use test_context::AsyncTestContext; +use tracing::info; mod common; -use common::{ - cleanup, download_to_vec, ensure_logging_ready, upload_remote_data, upload_simple_remote_data, - upload_stream, wrap_stream, -}; +#[path = "common/tests.rs"] +mod tests_azure; + +use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data}; const ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_AZURE_REMOTE_STORAGE"; const BASE_PREFIX: &str = "test"; -/// Tests that the Azure client can list all prefixes, even if the response comes paginated and requires multiple HTTP queries. -/// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and related Azure cred env vars specified. -/// See the client creation in [`create_azure_client`] for details on the required env vars. -/// If real Azure tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the -/// deafult test framework, see https://github.com/rust-lang/rust/issues/68007 for details. 
-/// -/// First, the test creates a set of Azure blobs with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`] -/// where -/// * `random_prefix_part` is set for the entire Azure client during the Azure client creation in [`create_azure_client`], to avoid multiple test runs interference -/// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket -/// -/// Then, verifies that the client does return correct prefixes when queried: -/// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be `${base_prefix_str}` value only -/// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}` -/// -/// With the real Azure enabled and `#[cfg(test)]` Rust configuration used, the Azure client test adds a `max-keys` param to limit the response keys. -/// This way, we are able to test the pagination implicitly, by ensuring all results are returned from the remote storage and avoid uploading too many blobs to Azure. -/// -/// Lastly, the test attempts to clean up and remove all uploaded Azure files. -/// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until clean up is finished. -#[test_context(MaybeEnabledAzureWithTestBlobs)] -#[tokio::test] -async fn azure_pagination_should_work( - ctx: &mut MaybeEnabledAzureWithTestBlobs, -) -> anyhow::Result<()> { - let ctx = match ctx { - MaybeEnabledAzureWithTestBlobs::Enabled(ctx) => ctx, - MaybeEnabledAzureWithTestBlobs::Disabled => return Ok(()), - MaybeEnabledAzureWithTestBlobs::UploadsFailed(e, _) => { - anyhow::bail!("Azure init failed: {e:?}") - } - }; - - let test_client = Arc::clone(&ctx.enabled.client); - let expected_remote_prefixes = ctx.remote_prefixes.clone(); - - let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix)) - .context("common_prefix construction")?; - let root_remote_prefixes = test_client - .list_prefixes(None) - .await - .context("client list root prefixes failure")? - .into_iter() - .collect::>(); - assert_eq!( - root_remote_prefixes, HashSet::from([base_prefix.clone()]), - "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}" - ); - - let nested_remote_prefixes = test_client - .list_prefixes(Some(&base_prefix)) - .await - .context("client list nested prefixes failure")? - .into_iter() - .collect::>(); - let remote_only_prefixes = nested_remote_prefixes - .difference(&expected_remote_prefixes) - .collect::>(); - let missing_uploaded_prefixes = expected_remote_prefixes - .difference(&nested_remote_prefixes) - .collect::>(); - assert_eq!( - remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0, - "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}", - ); - - Ok(()) -} - -/// Tests that Azure client can list all files in a folder, even if the response comes paginated and requirees multiple Azure queries. -/// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and related Azure cred env vars specified. Test will skip real code and pass if env vars not set. -/// See `Azure_pagination_should_work` for more information. 
-/// -/// First, create a set of Azure objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`] -/// Then performs the following queries: -/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt` -/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt` -#[test_context(MaybeEnabledAzureWithSimpleTestBlobs)] -#[tokio::test] -async fn azure_list_files_works( - ctx: &mut MaybeEnabledAzureWithSimpleTestBlobs, -) -> anyhow::Result<()> { - let ctx = match ctx { - MaybeEnabledAzureWithSimpleTestBlobs::Enabled(ctx) => ctx, - MaybeEnabledAzureWithSimpleTestBlobs::Disabled => return Ok(()), - MaybeEnabledAzureWithSimpleTestBlobs::UploadsFailed(e, _) => { - anyhow::bail!("Azure init failed: {e:?}") - } - }; - let test_client = Arc::clone(&ctx.enabled.client); - let base_prefix = - RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?; - let root_files = test_client - .list_files(None) - .await - .context("client list root files failure")? - .into_iter() - .collect::>(); - assert_eq!( - root_files, - ctx.remote_blobs.clone(), - "remote storage list_files on root mismatches with the uploads." - ); - let nested_remote_files = test_client - .list_files(Some(&base_prefix)) - .await - .context("client list nested files failure")? - .into_iter() - .collect::>(); - let trim_remote_blobs: HashSet<_> = ctx - .remote_blobs - .iter() - .map(|x| x.get_path()) - .filter(|x| x.starts_with("folder1")) - .map(|x| RemotePath::new(x).expect("must be valid path")) - .collect(); - assert_eq!( - nested_remote_files, trim_remote_blobs, - "remote storage list_files on subdirrectory mismatches with the uploads." - ); - Ok(()) -} - -#[test_context(MaybeEnabledAzure)] -#[tokio::test] -async fn azure_delete_non_exising_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> { - let ctx = match ctx { - MaybeEnabledAzure::Enabled(ctx) => ctx, - MaybeEnabledAzure::Disabled => return Ok(()), - }; - - let path = RemotePath::new(Utf8Path::new( - format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(), - )) - .with_context(|| "RemotePath conversion")?; - - ctx.client.delete(&path).await.expect("should succeed"); - - Ok(()) -} - -#[test_context(MaybeEnabledAzure)] -#[tokio::test] -async fn azure_delete_objects_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> { - let ctx = match ctx { - MaybeEnabledAzure::Enabled(ctx) => ctx, - MaybeEnabledAzure::Disabled => return Ok(()), - }; - - let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str())) - .with_context(|| "RemotePath conversion")?; - - let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str())) - .with_context(|| "RemotePath conversion")?; - - let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str())) - .with_context(|| "RemotePath conversion")?; - - let (data, len) = upload_stream("remote blob data1".as_bytes().into()); - ctx.client.upload(data, len, &path1, None).await?; - - let (data, len) = upload_stream("remote blob data2".as_bytes().into()); - ctx.client.upload(data, len, &path2, None).await?; - - let (data, len) = upload_stream("remote blob data3".as_bytes().into()); - ctx.client.upload(data, len, &path3, None).await?; - - ctx.client.delete_objects(&[path1, path2]).await?; - - let prefixes = ctx.client.list_prefixes(None).await?; - - assert_eq!(prefixes.len(), 1); - - ctx.client.delete_objects(&[path3]).await?; - - Ok(()) -} 
- -#[test_context(MaybeEnabledAzure)] -#[tokio::test] -async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> { - let MaybeEnabledAzure::Enabled(ctx) = ctx else { - return Ok(()); - }; - - let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str())) - .with_context(|| "RemotePath conversion")?; - - let orig = bytes::Bytes::from_static("remote blob data here".as_bytes()); - - let (data, len) = wrap_stream(orig.clone()); - - ctx.client.upload(data, len, &path, None).await?; - - // Normal download request - let dl = ctx.client.download(&path).await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig); - - // Full range (end specified) - let dl = ctx - .client - .download_byte_range(&path, 0, Some(len as u64)) - .await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig); - - // partial range (end specified) - let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig[4..10]); - - // partial range (end beyond real end) - let dl = ctx - .client - .download_byte_range(&path, 8, Some(len as u64 * 100)) - .await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig[8..]); - - // Partial range (end unspecified) - let dl = ctx.client.download_byte_range(&path, 4, None).await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig[4..]); - - // Full range (end unspecified) - let dl = ctx.client.download_byte_range(&path, 0, None).await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig); - - debug!("Cleanup: deleting file at path {path:?}"); - ctx.client - .delete(&path) - .await - .with_context(|| format!("{path:?} removal"))?; - - Ok(()) -} - -#[test_context(MaybeEnabledAzure)] -#[tokio::test] -async fn azure_copy_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> { - let MaybeEnabledAzure::Enabled(ctx) = ctx else { - return Ok(()); - }; - - let path = RemotePath::new(Utf8Path::new( - format!("{}/file_to_copy", ctx.base_prefix).as_str(), - )) - .with_context(|| "RemotePath conversion")?; - let path_dest = RemotePath::new(Utf8Path::new( - format!("{}/file_dest", ctx.base_prefix).as_str(), - )) - .with_context(|| "RemotePath conversion")?; - - let orig = bytes::Bytes::from_static("remote blob data content".as_bytes()); - - let (data, len) = wrap_stream(orig.clone()); - - ctx.client.upload(data, len, &path, None).await?; - - // Normal download request - ctx.client.copy_object(&path, &path_dest).await?; - - let dl = ctx.client.download(&path_dest).await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig); - - debug!("Cleanup: deleting file at path {path:?}"); - ctx.client - .delete_objects(&[path.clone(), path_dest.clone()]) - .await - .with_context(|| format!("{path:?} removal"))?; - - Ok(()) -} - struct EnabledAzure { client: Arc, base_prefix: &'static str, @@ -319,13 +41,13 @@ impl EnabledAzure { } } -enum MaybeEnabledAzure { +enum MaybeEnabledStorage { Enabled(EnabledAzure), Disabled, } #[async_trait::async_trait] -impl AsyncTestContext for MaybeEnabledAzure { +impl AsyncTestContext for MaybeEnabledStorage { async fn setup() -> Self { ensure_logging_ready(); @@ -341,7 +63,7 @@ impl AsyncTestContext for MaybeEnabledAzure { } } -enum MaybeEnabledAzureWithTestBlobs { +enum MaybeEnabledStorageWithTestBlobs { Enabled(AzureWithTestBlobs), Disabled, UploadsFailed(anyhow::Error, AzureWithTestBlobs), @@ -354,7 +76,7 @@ struct AzureWithTestBlobs { } #[async_trait::async_trait] -impl 
AsyncTestContext for MaybeEnabledAzureWithTestBlobs { +impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs { async fn setup() -> Self { ensure_logging_ready(); if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() { @@ -405,7 +127,7 @@ impl AsyncTestContext for MaybeEnabledAzureWithTestBlobs { // However, they are not idential. The list_prefixes function is concerned with listing prefixes, // whereas the list_files function is concerned with listing files. // See `RemoteStorage::list_files` documentation for more details -enum MaybeEnabledAzureWithSimpleTestBlobs { +enum MaybeEnabledStorageWithSimpleTestBlobs { Enabled(AzureWithSimpleTestBlobs), Disabled, UploadsFailed(anyhow::Error, AzureWithSimpleTestBlobs), @@ -416,7 +138,7 @@ struct AzureWithSimpleTestBlobs { } #[async_trait::async_trait] -impl AsyncTestContext for MaybeEnabledAzureWithSimpleTestBlobs { +impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs { async fn setup() -> Self { ensure_logging_ready(); if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() { diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 661716b48f..4a999d115e 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -6,297 +6,23 @@ use std::sync::Arc; use std::time::UNIX_EPOCH; use anyhow::Context; -use camino::Utf8Path; use remote_storage::{ GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config, }; -use test_context::{test_context, AsyncTestContext}; -use tracing::{debug, info}; +use test_context::AsyncTestContext; +use tracing::info; mod common; -use common::{ - cleanup, download_to_vec, ensure_logging_ready, upload_remote_data, upload_simple_remote_data, - upload_stream, wrap_stream, -}; +#[path = "common/tests.rs"] +mod tests_s3; + +use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data}; const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE"; const BASE_PREFIX: &str = "test"; -/// Tests that S3 client can list all prefixes, even if the response come paginated and requires multiple S3 queries. -/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. -/// See the client creation in [`create_s3_client`] for details on the required env vars. -/// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the -/// deafult test framework, see https://github.com/rust-lang/rust/issues/68007 for details. 
-/// -/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`] -/// where -/// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid multiple test runs interference -/// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket -/// -/// Then, verifies that the client does return correct prefixes when queried: -/// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be `${base_prefix_str}` value only -/// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}` -/// -/// With the real S3 enabled and `#[cfg(test)]` Rust configuration used, the S3 client test adds a `max-keys` param to limit the response keys. -/// This way, we are able to test the pagination implicitly, by ensuring all results are returned from the remote storage and avoid uploading too many blobs to S3, -/// since current default AWS S3 pagination limit is 1000. -/// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax) -/// -/// Lastly, the test attempts to clean up and remove all uploaded S3 files. -/// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until clean up is finished. -#[test_context(MaybeEnabledS3WithTestBlobs)] -#[tokio::test] -async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> anyhow::Result<()> { - let ctx = match ctx { - MaybeEnabledS3WithTestBlobs::Enabled(ctx) => ctx, - MaybeEnabledS3WithTestBlobs::Disabled => return Ok(()), - MaybeEnabledS3WithTestBlobs::UploadsFailed(e, _) => anyhow::bail!("S3 init failed: {e:?}"), - }; - - let test_client = Arc::clone(&ctx.enabled.client); - let expected_remote_prefixes = ctx.remote_prefixes.clone(); - - let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix)) - .context("common_prefix construction")?; - let root_remote_prefixes = test_client - .list_prefixes(None) - .await - .context("client list root prefixes failure")? - .into_iter() - .collect::>(); - assert_eq!( - root_remote_prefixes, HashSet::from([base_prefix.clone()]), - "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}" - ); - - let nested_remote_prefixes = test_client - .list_prefixes(Some(&base_prefix)) - .await - .context("client list nested prefixes failure")? - .into_iter() - .collect::>(); - let remote_only_prefixes = nested_remote_prefixes - .difference(&expected_remote_prefixes) - .collect::>(); - let missing_uploaded_prefixes = expected_remote_prefixes - .difference(&nested_remote_prefixes) - .collect::>(); - assert_eq!( - remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0, - "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}", - ); - - Ok(()) -} - -/// Tests that S3 client can list all files in a folder, even if the response comes paginated and requirees multiple S3 queries. -/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. Test will skip real code and pass if env vars not set. -/// See `s3_pagination_should_work` for more information. 
-/// -/// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`] -/// Then performs the following queries: -/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt` -/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt` -#[test_context(MaybeEnabledS3WithSimpleTestBlobs)] -#[tokio::test] -async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> anyhow::Result<()> { - let ctx = match ctx { - MaybeEnabledS3WithSimpleTestBlobs::Enabled(ctx) => ctx, - MaybeEnabledS3WithSimpleTestBlobs::Disabled => return Ok(()), - MaybeEnabledS3WithSimpleTestBlobs::UploadsFailed(e, _) => { - anyhow::bail!("S3 init failed: {e:?}") - } - }; - let test_client = Arc::clone(&ctx.enabled.client); - let base_prefix = - RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?; - let root_files = test_client - .list_files(None) - .await - .context("client list root files failure")? - .into_iter() - .collect::>(); - assert_eq!( - root_files, - ctx.remote_blobs.clone(), - "remote storage list_files on root mismatches with the uploads." - ); - let nested_remote_files = test_client - .list_files(Some(&base_prefix)) - .await - .context("client list nested files failure")? - .into_iter() - .collect::>(); - let trim_remote_blobs: HashSet<_> = ctx - .remote_blobs - .iter() - .map(|x| x.get_path()) - .filter(|x| x.starts_with("folder1")) - .map(|x| RemotePath::new(x).expect("must be valid path")) - .collect(); - assert_eq!( - nested_remote_files, trim_remote_blobs, - "remote storage list_files on subdirrectory mismatches with the uploads." - ); - Ok(()) -} - -#[test_context(MaybeEnabledS3)] -#[tokio::test] -async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> { - let ctx = match ctx { - MaybeEnabledS3::Enabled(ctx) => ctx, - MaybeEnabledS3::Disabled => return Ok(()), - }; - - let path = RemotePath::new(Utf8Path::new( - format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(), - )) - .with_context(|| "RemotePath conversion")?; - - ctx.client.delete(&path).await.expect("should succeed"); - - Ok(()) -} - -#[test_context(MaybeEnabledS3)] -#[tokio::test] -async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> { - let ctx = match ctx { - MaybeEnabledS3::Enabled(ctx) => ctx, - MaybeEnabledS3::Disabled => return Ok(()), - }; - - let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str())) - .with_context(|| "RemotePath conversion")?; - - let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str())) - .with_context(|| "RemotePath conversion")?; - - let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str())) - .with_context(|| "RemotePath conversion")?; - - let (data, len) = upload_stream("remote blob data1".as_bytes().into()); - ctx.client.upload(data, len, &path1, None).await?; - - let (data, len) = upload_stream("remote blob data2".as_bytes().into()); - ctx.client.upload(data, len, &path2, None).await?; - - let (data, len) = upload_stream("remote blob data3".as_bytes().into()); - ctx.client.upload(data, len, &path3, None).await?; - - ctx.client.delete_objects(&[path1, path2]).await?; - - let prefixes = ctx.client.list_prefixes(None).await?; - - assert_eq!(prefixes.len(), 1); - - ctx.client.delete_objects(&[path3]).await?; - - Ok(()) -} - -#[test_context(MaybeEnabledS3)] -#[tokio::test] -async fn 
s3_upload_download_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> { - let MaybeEnabledS3::Enabled(ctx) = ctx else { - return Ok(()); - }; - - let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str())) - .with_context(|| "RemotePath conversion")?; - - let orig = bytes::Bytes::from_static("remote blob data here".as_bytes()); - - let (data, len) = wrap_stream(orig.clone()); - - ctx.client.upload(data, len, &path, None).await?; - - // Normal download request - let dl = ctx.client.download(&path).await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig); - - // Full range (end specified) - let dl = ctx - .client - .download_byte_range(&path, 0, Some(len as u64)) - .await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig); - - // partial range (end specified) - let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig[4..10]); - - // partial range (end beyond real end) - let dl = ctx - .client - .download_byte_range(&path, 8, Some(len as u64 * 100)) - .await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig[8..]); - - // Partial range (end unspecified) - let dl = ctx.client.download_byte_range(&path, 4, None).await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig[4..]); - - // Full range (end unspecified) - let dl = ctx.client.download_byte_range(&path, 0, None).await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig); - - debug!("Cleanup: deleting file at path {path:?}"); - ctx.client - .delete(&path) - .await - .with_context(|| format!("{path:?} removal"))?; - - Ok(()) -} - -#[test_context(MaybeEnabledS3)] -#[tokio::test] -async fn s3_copy_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> { - let MaybeEnabledS3::Enabled(ctx) = ctx else { - return Ok(()); - }; - - let path = RemotePath::new(Utf8Path::new( - format!("{}/file_to_copy", ctx.base_prefix).as_str(), - )) - .with_context(|| "RemotePath conversion")?; - let path_dest = RemotePath::new(Utf8Path::new( - format!("{}/file_dest", ctx.base_prefix).as_str(), - )) - .with_context(|| "RemotePath conversion")?; - - let orig = bytes::Bytes::from_static("remote blob data content".as_bytes()); - - let (data, len) = wrap_stream(orig.clone()); - - ctx.client.upload(data, len, &path, None).await?; - - // Normal download request - ctx.client.copy_object(&path, &path_dest).await?; - - let dl = ctx.client.download(&path_dest).await?; - let buf = download_to_vec(dl).await?; - assert_eq!(&buf, &orig); - - debug!("Cleanup: deleting file at path {path:?}"); - ctx.client - .delete_objects(&[path.clone(), path_dest.clone()]) - .await - .with_context(|| format!("{path:?} removal"))?; - - Ok(()) -} - struct EnabledS3 { client: Arc, base_prefix: &'static str, @@ -315,13 +41,13 @@ impl EnabledS3 { } } -enum MaybeEnabledS3 { +enum MaybeEnabledStorage { Enabled(EnabledS3), Disabled, } #[async_trait::async_trait] -impl AsyncTestContext for MaybeEnabledS3 { +impl AsyncTestContext for MaybeEnabledStorage { async fn setup() -> Self { ensure_logging_ready(); @@ -337,7 +63,7 @@ impl AsyncTestContext for MaybeEnabledS3 { } } -enum MaybeEnabledS3WithTestBlobs { +enum MaybeEnabledStorageWithTestBlobs { Enabled(S3WithTestBlobs), Disabled, UploadsFailed(anyhow::Error, S3WithTestBlobs), @@ -350,7 +76,7 @@ struct S3WithTestBlobs { } #[async_trait::async_trait] -impl AsyncTestContext for MaybeEnabledS3WithTestBlobs { +impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs { async 
fn setup() -> Self { ensure_logging_ready(); if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() { @@ -401,7 +127,7 @@ impl AsyncTestContext for MaybeEnabledS3WithTestBlobs { // However, they are not idential. The list_prefixes function is concerned with listing prefixes, // whereas the list_files function is concerned with listing files. // See `RemoteStorage::list_files` documentation for more details -enum MaybeEnabledS3WithSimpleTestBlobs { +enum MaybeEnabledStorageWithSimpleTestBlobs { Enabled(S3WithSimpleTestBlobs), Disabled, UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs), @@ -412,7 +138,7 @@ struct S3WithSimpleTestBlobs { } #[async_trait::async_trait] -impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs { +impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs { async fn setup() -> Self { ensure_logging_ready(); if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
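
As a quick reference, the module-inclusion trick described in the commit message boils down to the sketch below. It is illustrative rather than a verbatim excerpt from the tree, and the cargo invocations assume the package is named `remote_storage` and are examples of how the resulting test paths can be filtered.

// libs/remote_storage/tests/test_real_s3.rs (excerpt)
mod common;                 // shared helpers: cleanup, upload_remote_data, ...

#[path = "common/tests.rs"]
mod tests_s3;               // shared tests compile as test_real_s3::tests_s3::*

// libs/remote_storage/tests/test_real_azure.rs (excerpt)
mod common;

#[path = "common/tests.rs"]
mod tests_azure;            // same source file, but compiles as test_real_azure::tests_azure::*

// common/tests.rs refers to `super::MaybeEnabledStorage` and friends, so every file
// that includes it must define types with exactly those names, as both files above do.

// Because "s3"/"azure" stay in the test paths, per-backend filtering still works, e.g.:
//   cargo test -p remote_storage --test test_real_s3    # only the S3 suite
//   cargo test -p remote_storage azure                   # name filter matches tests_azure::*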