diff --git a/Cargo.lock b/Cargo.lock
index 17aacd8ee7..a19a97a40d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3085,6 +3085,7 @@ dependencies = [
  "serde",
  "serde_json",
  "tempfile",
+ "test-context",
  "tokio",
  "tokio-util",
  "toml_edit",
@@ -3888,6 +3889,27 @@ dependencies = [
  "winapi-util",
 ]
 
+[[package]]
+name = "test-context"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "055831a02a4f5aa28fede67f2902014273eb8c21b958ac5ebbd59b71ef30dbc3"
+dependencies = [
+ "async-trait",
+ "futures",
+ "test-context-macros",
+]
+
+[[package]]
+name = "test-context-macros"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8901a55b0a7a06ebc4a674dcca925170da8e613fa3b163a1df804ed10afb154d"
+dependencies = [
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "textwrap"
 version = "0.16.0"
diff --git a/Cargo.toml b/Cargo.toml
index e27a50a1cb..09cc150606 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -97,6 +97,7 @@ strum_macros = "0.24"
 svg_fmt = "0.4.1"
 sync_wrapper = "0.1.2"
 tar = "0.4"
+test-context = "0.1"
 thiserror = "1.0"
 tls-listener = { version = "0.6", features = ["rustls", "hyper-h1"] }
 tokio = { version = "1.17", features = ["macros"] }
diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml
index 15812e8439..da15823b69 100644
--- a/libs/remote_storage/Cargo.toml
+++ b/libs/remote_storage/Cargo.toml
@@ -26,3 +26,4 @@ workspace_hack.workspace = true
 
 [dev-dependencies]
 tempfile.workspace = true
+test-context.workspace = true
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 901f849801..1d50a777f4 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -39,6 +39,9 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
 /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
 /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
 pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
+/// No limits on the client side, which currently means 1000 for AWS S3.
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax
+pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
 
 const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
 
@@ -64,6 +67,10 @@ impl RemotePath {
     pub fn object_name(&self) -> Option<&str> {
         self.0.file_name().and_then(|os_str| os_str.to_str())
     }
+
+    pub fn join(&self, segment: &Path) -> Self {
+        Self(self.0.join(segment))
+    }
 }
 
 /// Storage (potentially remote) API to manage its state.
@@ -266,6 +273,7 @@ pub struct S3Config {
     /// AWS S3 has various limits on its API calls, we need not to exceed those.
     /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
     pub concurrency_limit: NonZeroUsize,
+    pub max_keys_per_list_response: Option<i32>,
 }
 
 impl Debug for S3Config {
@@ -275,6 +283,10 @@ impl Debug for S3Config {
             .field("bucket_region", &self.bucket_region)
             .field("prefix_in_bucket", &self.prefix_in_bucket)
             .field("concurrency_limit", &self.concurrency_limit)
+            .field(
+                "max_keys_per_list_response",
+                &self.max_keys_per_list_response,
+            )
             .finish()
     }
 }
@@ -303,6 +315,11 @@ impl RemoteStorageConfig {
         )
         .context("Failed to parse 'concurrency_limit' as a positive integer")?;
 
+        let max_keys_per_list_response =
+            parse_optional_integer::<i32, _>("max_keys_per_list_response", toml)
+                .context("Failed to parse 'max_keys_per_list_response' as a positive integer")?
+                .or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);
+
         let storage = match (local_path, bucket_name, bucket_region) {
             // no 'local_path' nor 'bucket_name' options are provided, consider this remote storage disabled
             (None, None, None) => return Ok(None),
@@ -324,6 +341,7 @@ impl RemoteStorageConfig {
                     .map(|endpoint| parse_toml_string("endpoint", endpoint))
                     .transpose()?,
                 concurrency_limit,
+                max_keys_per_list_response,
             }),
             (Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
                 parse_toml_string("local_path", local_path)?,
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index a476ff32e0..d4eb7d9244 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -102,6 +102,7 @@ pub struct S3Bucket {
     client: Client,
     bucket_name: String,
     prefix_in_bucket: Option<String>,
+    max_keys_per_list_response: Option<i32>,
     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
     // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
     // The helps to ensure we don't exceed the thresholds.
@@ -164,6 +165,7 @@ impl S3Bucket {
         Ok(Self {
             client,
             bucket_name: aws_config.bucket_name.clone(),
+            max_keys_per_list_response: aws_config.max_keys_per_list_response,
             prefix_in_bucket,
             concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
         })
@@ -293,6 +295,7 @@ impl RemoteStorage for S3Bucket {
                 .set_prefix(self.prefix_in_bucket.clone())
                 .delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string())
                 .set_continuation_token(continuation_token)
+                .set_max_keys(self.max_keys_per_list_response)
                 .send()
                 .await
                 .map_err(|e| {
@@ -355,6 +358,7 @@ impl RemoteStorage for S3Bucket {
                 .set_prefix(list_prefix.clone())
                 .set_continuation_token(continuation_token)
                 .delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string())
+                .set_max_keys(self.max_keys_per_list_response)
                 .send()
                 .await
                 .map_err(|e| {
diff --git a/libs/remote_storage/tests/pagination_tests.rs b/libs/remote_storage/tests/pagination_tests.rs
new file mode 100644
index 0000000000..eb52409c44
--- /dev/null
+++ b/libs/remote_storage/tests/pagination_tests.rs
@@ -0,0 +1,275 @@
+use std::collections::HashSet;
+use std::env;
+use std::num::{NonZeroU32, NonZeroUsize};
+use std::ops::ControlFlow;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::time::UNIX_EPOCH;
+
+use anyhow::Context;
+use remote_storage::{
+    GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
+};
+use test_context::{test_context, AsyncTestContext};
+use tokio::task::JoinSet;
+use tracing::{debug, error, info};
+
+const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
+
+/// Tests that the S3 client can list all prefixes, even if the response comes paginated and requires multiple S3 queries.
+/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 credential env vars to be specified.
+/// See the client creation in [`create_s3_client`] for details on the required env vars.
+/// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test as ignored at runtime with the
+/// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
+///
+/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_s3_data`]
+/// where
+/// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid interference between multiple test runs
+/// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client is able to list nested prefixes inside the bucket
+///
+/// Then, the test verifies that the client returns the correct prefixes when queried:
+/// * with no prefix, it lists everything after its `${random_prefix_part}/`, which should be the `${base_prefix_str}` value only
+/// * with the `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
+///
+/// With real S3 enabled and the `#[cfg(test)]` Rust configuration used, the S3 client test adds a `max-keys` param to limit the response keys.
+/// This way, we are able to test the pagination implicitly, by ensuring all results are returned from the remote storage, and avoid uploading too many blobs to S3,
+/// since the current default AWS S3 pagination limit is 1000.
+/// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax)
+///
+/// Lastly, the test attempts to clean up and remove all uploaded S3 files.
+/// If any errors appear during the cleanup, they get logged, but the test does not fail or stop until the cleanup is finished.
+#[test_context(MaybeEnabledS3)]
+#[tokio::test]
+async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
+    let ctx = match ctx {
+        MaybeEnabledS3::Enabled(ctx) => ctx,
+        MaybeEnabledS3::Disabled => return Ok(()),
+        MaybeEnabledS3::UploadsFailed(e, _) => anyhow::bail!("S3 init failed: {e:?}"),
+    };
+
+    let test_client = Arc::clone(&ctx.client_with_excessive_pagination);
+    let expected_remote_prefixes = ctx.remote_prefixes.clone();
+
+    let base_prefix =
+        RemotePath::new(Path::new(ctx.base_prefix_str)).context("common_prefix construction")?;
+    let root_remote_prefixes = test_client
+        .list_prefixes(None)
+        .await
+        .context("client list root prefixes failure")?
+        .into_iter()
+        .collect::<HashSet<_>>();
+    assert_eq!(
+        root_remote_prefixes,
+        HashSet::from([base_prefix.clone()]),
+        "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
+    );
+
+    let nested_remote_prefixes = test_client
+        .list_prefixes(Some(&base_prefix))
+        .await
+        .context("client list nested prefixes failure")?
+        .into_iter()
+        .collect::<HashSet<_>>();
+    let remote_only_prefixes = nested_remote_prefixes
+        .difference(&expected_remote_prefixes)
+        .collect::<HashSet<_>>();
+    let missing_uploaded_prefixes = expected_remote_prefixes
+        .difference(&nested_remote_prefixes)
+        .collect::<HashSet<_>>();
+    assert_eq!(
+        remote_only_prefixes.len() + missing_uploaded_prefixes.len(),
+        0,
+        "remote storage nested prefixes list mismatches with the uploads.
+    Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
+    );
+
+    Ok(())
+}
+
+enum MaybeEnabledS3 {
+    Enabled(S3WithTestBlobs),
+    Disabled,
+    UploadsFailed(anyhow::Error, S3WithTestBlobs),
+}
+
+struct S3WithTestBlobs {
+    client_with_excessive_pagination: Arc<GenericRemoteStorage>,
+    base_prefix_str: &'static str,
+    remote_prefixes: HashSet<RemotePath>,
+    remote_blobs: HashSet<RemotePath>,
+}
+
+#[async_trait::async_trait]
+impl AsyncTestContext for MaybeEnabledS3 {
+    async fn setup() -> Self {
+        utils::logging::init(utils::logging::LogFormat::Test).expect("logging init failed");
+        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
+            info!(
+                "`{}` env variable is not set, skipping the test",
+                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
+            );
+            return Self::Disabled;
+        }
+
+        let max_keys_in_list_response = 10;
+        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
+
+        let client_with_excessive_pagination = create_s3_client(max_keys_in_list_response)
+            .context("S3 client creation")
+            .expect("S3 client creation failed");
+
+        let base_prefix_str = "test/";
+        match upload_s3_data(
+            &client_with_excessive_pagination,
+            base_prefix_str,
+            upload_tasks_count,
+        )
+        .await
+        {
+            ControlFlow::Continue(uploads) => {
+                info!("Remote objects created successfully");
+                Self::Enabled(S3WithTestBlobs {
+                    client_with_excessive_pagination,
+                    base_prefix_str,
+                    remote_prefixes: uploads.prefixes,
+                    remote_blobs: uploads.blobs,
+                })
+            }
+            ControlFlow::Break(uploads) => Self::UploadsFailed(
+                anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
+                S3WithTestBlobs {
+                    client_with_excessive_pagination,
+                    base_prefix_str,
+                    remote_prefixes: uploads.prefixes,
+                    remote_blobs: uploads.blobs,
+                },
+            ),
+        }
+    }
+
+    async fn teardown(self) {
+        match self {
+            Self::Disabled => {}
+            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
+                cleanup(&ctx.client_with_excessive_pagination, ctx.remote_blobs).await;
+            }
+        }
+    }
+}
+
+fn create_s3_client(max_keys_per_list_response: i32) -> anyhow::Result<Arc<GenericRemoteStorage>> {
+    let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
+        .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
+    let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
+        .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
+    let random_prefix_part = std::time::SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .context("random s3 test prefix part calculation")?
+        .as_millis();
+    let remote_storage_config = RemoteStorageConfig {
+        max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
+        max_sync_errors: NonZeroU32::new(5).unwrap(),
+        storage: RemoteStorageKind::AwsS3(S3Config {
+            bucket_name: remote_storage_s3_bucket,
+            bucket_region: remote_storage_s3_region,
+            prefix_in_bucket: Some(format!("pagination_should_work_test_{random_prefix_part}/")),
+            endpoint: None,
+            concurrency_limit: NonZeroUsize::new(100).unwrap(),
+            max_keys_per_list_response: Some(max_keys_per_list_response),
+        }),
+    };
+    Ok(Arc::new(
+        GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
+    ))
+}
+
+struct Uploads {
+    prefixes: HashSet<RemotePath>,
+    blobs: HashSet<RemotePath>,
+}
+
+async fn upload_s3_data(
+    client: &Arc<GenericRemoteStorage>,
+    base_prefix_str: &'static str,
+    upload_tasks_count: usize,
+) -> ControlFlow<Uploads, Uploads> {
+    info!("Creating {upload_tasks_count} S3 files");
+    let mut upload_tasks = JoinSet::new();
+    for i in 1..upload_tasks_count + 1 {
+        let task_client = Arc::clone(client);
+        upload_tasks.spawn(async move {
+            let prefix = PathBuf::from(format!("{base_prefix_str}/sub_prefix_{i}/"));
+            let blob_prefix = RemotePath::new(&prefix)
+                .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
+            let blob_path = blob_prefix.join(Path::new(&format!("blob_{i}")));
+            debug!("Creating remote item {i} at path {blob_path:?}");
+
+            let data = format!("remote blob data {i}").into_bytes();
+            let data_len = data.len();
+            task_client
+                .upload(
+                    Box::new(std::io::Cursor::new(data)),
+                    data_len,
+                    &blob_path,
+                    None,
+                )
+                .await?;
+
+            Ok::<_, anyhow::Error>((blob_prefix, blob_path))
+        });
+    }
+
+    let mut upload_tasks_failed = false;
+    let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
+    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
+    while let Some(task_run_result) = upload_tasks.join_next().await {
+        match task_run_result
+            .context("task join failed")
+            .and_then(|task_result| task_result.context("upload task failed"))
+        {
+            Ok((upload_prefix, upload_path)) => {
+                uploaded_prefixes.insert(upload_prefix);
+                uploaded_blobs.insert(upload_path);
+            }
+            Err(e) => {
+                error!("Upload task failed: {e:?}");
+                upload_tasks_failed = true;
+            }
+        }
+    }
+
+    let uploads = Uploads {
+        prefixes: uploaded_prefixes,
+        blobs: uploaded_blobs,
+    };
+    if upload_tasks_failed {
+        ControlFlow::Break(uploads)
+    } else {
+        ControlFlow::Continue(uploads)
+    }
+}
+
+async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<RemotePath>) {
+    info!(
+        "Removing {} objects from the remote storage during cleanup",
+        objects_to_delete.len()
+    );
+    let mut delete_tasks = JoinSet::new();
+    for object_to_delete in objects_to_delete {
+        let task_client = Arc::clone(client);
+        delete_tasks.spawn(async move {
+            debug!("Deleting remote item at path {object_to_delete:?}");
+            task_client
+                .delete(&object_to_delete)
+                .await
+                .with_context(|| format!("{object_to_delete:?} removal"))
+        });
+    }
+
+    while let Some(task_run_result) = delete_tasks.join_next().await {
+        match task_run_result {
+            Ok(task_result) => match task_result {
+                Ok(()) => {}
+                Err(e) => error!("Delete task failed: {e:?}"),
+            },
+            Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
+        }
+    }
+}
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index 58a6056385..7293e69f69 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -1238,6 +1238,7 @@ broker_endpoint = '{broker_endpoint}'
                     prefix_in_bucket: Some(prefix_in_bucket.clone()),
                     endpoint: Some(endpoint.clone()),
                     concurrency_limit: s3_concurrency_limit,
+                    max_keys_per_list_response: None,
                 }),
             },
             "Remote storage config should correctly parse the S3 config"
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index b9709d9b83..f6600e8974 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -6,7 +6,7 @@ import shutil
 import threading
 import time
 from pathlib import Path
-from typing import Dict, List, Set, Tuple
+from typing import Dict, List, Tuple
 
 import pytest
 from fixtures.log_helper import log
@@ -717,50 +717,6 @@ def test_empty_branch_remote_storage_upload_on_restart(
     ), f"New branch should have been reuploaded on pageserver restart to the remote storage path '{new_branch_on_remote_storage}'"
 
 
-# Test creates >1000 timelines and upload them to the remote storage.
-# AWS S3 does not return more than 1000 items and starts paginating, ensure that pageserver paginates correctly.
-@pytest.mark.skip("Too slow to run, requires too much disk space to run")
-@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.MOCK_S3])
-def test_thousands_of_branches(
-    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
-):
-    neon_env_builder.enable_remote_storage(
-        remote_storage_kind=remote_storage_kind,
-        test_name="test_compaction_downloads_on_demand_without_image_creation",
-    )
-
-    env = neon_env_builder.init_start()
-    client = env.pageserver.http_client()
-    expected_timelines: Set[TimelineId] = set([])
-    tenant_id = env.initial_tenant
-    pg = env.postgres.create_start("main", tenant_id=tenant_id)
-
-    max_timelines = 1500
-    for i in range(0, max_timelines):
-        new_timeline_id = TimelineId.generate()
-        log.info(f"Creating timeline {new_timeline_id}, {i + 1} out of {max_timelines}")
-        expected_timelines.add(new_timeline_id)
-
-        client.timeline_create(tenant_id, new_timeline_id=new_timeline_id)
-        client.timeline_checkpoint(tenant_id, new_timeline_id)
-        wait_for_last_flush_lsn(env, pg, tenant_id, new_timeline_id)
-        with pg.cursor() as cur:
-            current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
-        wait_for_upload(client, tenant_id, new_timeline_id, current_lsn)
-
-    client.tenant_detach(tenant_id=tenant_id)
-    client.tenant_attach(tenant_id=tenant_id)
-
-    timelines_after_reattach = set(
-        [timeline["timeline_id"] for timeline in client.timeline_list(tenant_id=tenant_id)]
-    )
-
-    assert (
-        expected_timelines == timelines_after_reattach
-    ), f"Timelines after reattach do not match the ones created initially. \
-        Missing timelines: {expected_timelines - timelines_after_reattach}, extra timelines: {timelines_after_reattach - expected_timelines}"
-
-
 def wait_upload_queue_empty(
     client: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
 ):
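
Configuration note: the new `max_keys_per_list_response` option is read by `RemoteStorageConfig::from_toml` from the same TOML table as the other S3 settings ('bucket_name', 'bucket_region', 'prefix_in_bucket', 'endpoint', 'concurrency_limit'). Below is a minimal sketch of what setting it could look like in a pageserver config; the `[remote_storage]` table name and all values are illustrative assumptions, not taken from this patch:

    [remote_storage]                   # assumed table name for the remote storage settings
    bucket_name = "some-bucket"        # assumed example value
    bucket_region = "eu-central-1"     # assumed example value
    prefix_in_bucket = "pageserver/"   # assumed example value
    concurrency_limit = 100            # assumed example value
    # Optional: when omitted, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE (None) applies, i.e. no
    # client-side limit, which currently means 1000 keys per AWS S3 ListObjectsV2 response.
    max_keys_per_list_response = 500   # assumed example value

When the option is left unset, behavior is unchanged: the S3 client passes `None` to `set_max_keys`, so the AWS default page size applies.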