mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 04:52:55 +00:00
The remote_storage crate contains two copies of each test, one for azure and one for S3. The repetition is not necessary and makes the tests more prone to drift, so we remove it by moving the tests into a shared module. The module has a different name depending on where it is included, so that each test still has "s3" or "azure" in its full path, allowing you to just test the S3 test or just the azure tests. Earlier PR that removed some duplication already: #6176 Fixes #6146.
220 lines
7.0 KiB
Rust
220 lines
7.0 KiB
Rust
use std::collections::HashSet;
|
|
use std::env;
|
|
use std::num::NonZeroUsize;
|
|
use std::ops::ControlFlow;
|
|
use std::sync::Arc;
|
|
use std::time::UNIX_EPOCH;
|
|
|
|
use anyhow::Context;
|
|
use remote_storage::{
|
|
GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
|
|
};
|
|
use test_context::AsyncTestContext;
|
|
use tracing::info;
|
|
|
|
mod common;
|
|
|
|
#[path = "common/tests.rs"]
|
|
mod tests_s3;
|
|
|
|
use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
|
|
|
|
const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
|
|
|
|
const BASE_PREFIX: &str = "test";
|
|
|
|
struct EnabledS3 {
|
|
client: Arc<GenericRemoteStorage>,
|
|
base_prefix: &'static str,
|
|
}
|
|
|
|
impl EnabledS3 {
|
|
async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
|
|
let client = create_s3_client(max_keys_in_list_response)
|
|
.context("S3 client creation")
|
|
.expect("S3 client creation failed");
|
|
|
|
EnabledS3 {
|
|
client,
|
|
base_prefix: BASE_PREFIX,
|
|
}
|
|
}
|
|
}
|
|
|
|
enum MaybeEnabledStorage {
|
|
Enabled(EnabledS3),
|
|
Disabled,
|
|
}
|
|
|
|
#[async_trait::async_trait]
|
|
impl AsyncTestContext for MaybeEnabledStorage {
|
|
async fn setup() -> Self {
|
|
ensure_logging_ready();
|
|
|
|
if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
|
|
info!(
|
|
"`{}` env variable is not set, skipping the test",
|
|
ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
|
|
);
|
|
return Self::Disabled;
|
|
}
|
|
|
|
Self::Enabled(EnabledS3::setup(None).await)
|
|
}
|
|
}
|
|
|
|
enum MaybeEnabledStorageWithTestBlobs {
|
|
Enabled(S3WithTestBlobs),
|
|
Disabled,
|
|
UploadsFailed(anyhow::Error, S3WithTestBlobs),
|
|
}
|
|
|
|
struct S3WithTestBlobs {
|
|
enabled: EnabledS3,
|
|
remote_prefixes: HashSet<RemotePath>,
|
|
remote_blobs: HashSet<RemotePath>,
|
|
}
|
|
|
|
#[async_trait::async_trait]
|
|
impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
|
|
async fn setup() -> Self {
|
|
ensure_logging_ready();
|
|
if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
|
|
info!(
|
|
"`{}` env variable is not set, skipping the test",
|
|
ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
|
|
);
|
|
return Self::Disabled;
|
|
}
|
|
|
|
let max_keys_in_list_response = 10;
|
|
let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
|
|
|
|
let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
|
|
|
|
match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
|
|
ControlFlow::Continue(uploads) => {
|
|
info!("Remote objects created successfully");
|
|
|
|
Self::Enabled(S3WithTestBlobs {
|
|
enabled,
|
|
remote_prefixes: uploads.prefixes,
|
|
remote_blobs: uploads.blobs,
|
|
})
|
|
}
|
|
ControlFlow::Break(uploads) => Self::UploadsFailed(
|
|
anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
|
|
S3WithTestBlobs {
|
|
enabled,
|
|
remote_prefixes: uploads.prefixes,
|
|
remote_blobs: uploads.blobs,
|
|
},
|
|
),
|
|
}
|
|
}
|
|
|
|
async fn teardown(self) {
|
|
match self {
|
|
Self::Disabled => {}
|
|
Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
|
|
cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// NOTE: the setups for the list_prefixes test and the list_files test are very similar
|
|
// However, they are not idential. The list_prefixes function is concerned with listing prefixes,
|
|
// whereas the list_files function is concerned with listing files.
|
|
// See `RemoteStorage::list_files` documentation for more details
|
|
enum MaybeEnabledStorageWithSimpleTestBlobs {
|
|
Enabled(S3WithSimpleTestBlobs),
|
|
Disabled,
|
|
UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
|
|
}
|
|
struct S3WithSimpleTestBlobs {
|
|
enabled: EnabledS3,
|
|
remote_blobs: HashSet<RemotePath>,
|
|
}
|
|
|
|
#[async_trait::async_trait]
|
|
impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
|
|
async fn setup() -> Self {
|
|
ensure_logging_ready();
|
|
if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
|
|
info!(
|
|
"`{}` env variable is not set, skipping the test",
|
|
ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
|
|
);
|
|
return Self::Disabled;
|
|
}
|
|
|
|
let max_keys_in_list_response = 10;
|
|
let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
|
|
|
|
let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
|
|
|
|
match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
|
|
ControlFlow::Continue(uploads) => {
|
|
info!("Remote objects created successfully");
|
|
|
|
Self::Enabled(S3WithSimpleTestBlobs {
|
|
enabled,
|
|
remote_blobs: uploads,
|
|
})
|
|
}
|
|
ControlFlow::Break(uploads) => Self::UploadsFailed(
|
|
anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
|
|
S3WithSimpleTestBlobs {
|
|
enabled,
|
|
remote_blobs: uploads,
|
|
},
|
|
),
|
|
}
|
|
}
|
|
|
|
async fn teardown(self) {
|
|
match self {
|
|
Self::Disabled => {}
|
|
Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
|
|
cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn create_s3_client(
|
|
max_keys_per_list_response: Option<i32>,
|
|
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
|
|
use rand::Rng;
|
|
|
|
let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
|
|
.context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
|
|
let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
|
|
.context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
|
|
|
|
// due to how time works, we've had test runners use the same nanos as bucket prefixes.
|
|
// millis is just a debugging aid for easier finding the prefix later.
|
|
let millis = std::time::SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.context("random s3 test prefix part calculation")?
|
|
.as_millis();
|
|
|
|
// because nanos can be the same for two threads so can millis, add randomness
|
|
let random = rand::thread_rng().gen::<u32>();
|
|
|
|
let remote_storage_config = RemoteStorageConfig {
|
|
storage: RemoteStorageKind::AwsS3(S3Config {
|
|
bucket_name: remote_storage_s3_bucket,
|
|
bucket_region: remote_storage_s3_region,
|
|
prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
|
|
endpoint: None,
|
|
concurrency_limit: NonZeroUsize::new(100).unwrap(),
|
|
max_keys_per_list_response,
|
|
}),
|
|
};
|
|
Ok(Arc::new(
|
|
GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
|
|
))
|
|
}
|