From 5f71aa09d313f829875012752f65ae99e67fc3fe Mon Sep 17 00:00:00 2001
From: Dmitry Rodionov
Date: Thu, 21 Jul 2022 20:59:07 +0300
Subject: [PATCH] support running tests against real s3 implementation without
 mocking

---
 Cargo.lock                                    |  47 ++-
 control_plane/src/lib.rs                      |   6 +-
 libs/remote_storage/src/lib.rs                |   3 +
 libs/remote_storage/src/local_fs.rs           |   2 +-
 libs/remote_storage/src/s3_bucket.rs          |  40 +--
 neon_local/src/main.rs                        |   4 +-
 pageserver/src/storage_sync/download.rs       |  10 +
 .../batch_others/test_ancestor_branch.py      |   3 +-
 .../batch_others/test_remote_storage.py       |  20 +-
 .../test_tenants_with_remote_storage.py       |  23 +-
 test_runner/batch_others/test_wal_acceptor.py |  35 ++-
 test_runner/fixtures/neon_fixtures.py         | 278 ++++++++++++++----
 12 files changed, 315 insertions(+), 156 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5031ae02e3..4a78b2e504 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -154,9 +154,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
 
 [[package]]
 name = "axum"
-version = "0.5.12"
+version = "0.5.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d16705af05732b7d3258ec0f7b73c03a658a28925e050d8852d5b568ee8bcf4e"
+checksum = "6b9496f0c1d1afb7a2af4338bbe1d969cddfead41d87a9fb3aaa6d0bbc7af648"
 dependencies = [
  "async-trait",
  "axum-core",
@@ -317,15 +317,6 @@ dependencies = [
  "serde",
 ]
 
-[[package]]
-name = "cast"
-version = "0.2.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a"
-dependencies = [
- "rustc_version",
-]
-
 [[package]]
 name = "cast"
 version = "0.3.0"
@@ -579,7 +570,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b01d6de93b2b6c65e17c634a26653a29d107b3c98c607c765bf38d041531cd8f"
 dependencies = [
  "atty",
- "cast 0.3.0",
+ "cast",
  "clap 2.34.0",
  "criterion-plot",
  "csv",
@@ -600,11 +591,11 @@ dependencies = [
 
 [[package]]
 name = "criterion-plot"
-version = "0.4.4"
+version = "0.4.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57"
+checksum = "2673cc8207403546f45f5fd319a974b1e6983ad1a3ee7e6041650013be041876"
 dependencies = [
- "cast 0.2.7",
+ "cast",
  "itertools",
 ]
 
@@ -680,9 +671,9 @@ dependencies = [
 
 [[package]]
 name = "crypto-common"
-version = "0.1.5"
+version = "0.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2ccfd8c0ee4cce11e45b3fd6f9d5e69e0cc62912aa6a0cb1bf4617b0eba5a12f"
+checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
 dependencies = [
  "generic-array",
  "typenum",
@@ -1116,9 +1107,9 @@ dependencies = [
 
 [[package]]
 name = "gimli"
-version = "0.26.1"
+version = "0.26.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
+checksum = "22030e2c5a68ec659fde1e949a745124b48e6fa8b045b7ed5bd1fe4ccc5c4e5d"
 
 [[package]]
 name = "git-version"
@@ -1184,9 +1175,9 @@ dependencies = [
 
 [[package]]
 name = "hashbrown"
-version = "0.12.2"
+version = "0.12.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "607c8a29735385251a339424dd462993c0fed8fa09d378f259377df08c126022"
+checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
 
 [[package]]
 name = "heck"
@@ -1388,7 +1379,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
"10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg", - "hashbrown 0.12.2", + "hashbrown 0.12.3", ] [[package]] @@ -1851,9 +1842,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.1.0" +version = "6.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa" +checksum = "648001efe5d5c0102d8cea768e348da85d90af8ba91f0bea908f157951493cd4" [[package]] name = "pageserver" @@ -2735,9 +2726,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0a5f7c728f5d284929a1cccb5bc19884422bfe6ef4d6c409da2c41838983fcf" +checksum = "24c8ad4f0c00e1eb5bc7614d236a7f1300e3dbd76b68cac8e06fb00b015ad8d8" [[package]] name = "ryu" @@ -3617,9 +3608,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" [[package]] name = "unicode-ident" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" +checksum = "15c61ba63f9235225a22310255a29b806b907c9b8c964bcbd0a2c70f3f2deea7" [[package]] name = "unicode-normalization" diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index 4dfca588ad..17232ccf45 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -51,7 +51,11 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command { } fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command { - for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] { + for env_key in [ + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "AWS_SESSION_TOKEN", + ] { if let Ok(value) = std::env::var(env_key) { cmd = cmd.env(env_key, value); } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index dec79e4580..07f8cb08aa 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -66,6 +66,9 @@ pub trait RemoteStorage: Send + Sync { async fn list(&self) -> anyhow::Result>; /// Lists all top level subdirectories for a given prefix + /// Note: here we assume that if the prefix is passed it was obtained via remote_object_id + /// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS) + /// so this method doesnt need to. 
     async fn list_prefixes(
         &self,
         prefix: Option<Self::RemoteObjectId>,
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index df1581fb51..07b04084b9 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -116,7 +116,7 @@ impl RemoteStorage for LocalFs {
         prefix: Option<Self::RemoteObjectId>,
     ) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
         let path = match prefix {
-            Some(prefix) => Cow::Owned(self.storage_root.join(prefix)),
+            Some(prefix) => Cow::Owned(prefix),
             None => Cow::Borrowed(&self.storage_root),
         };
         get_all_files(path.as_ref(), false).await
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index ff52f033d1..1b241fe4ed 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -171,17 +171,25 @@ impl S3Bucket {
         let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
         let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
+        // the session token is used when authorizing through SSO,
+        // which is typically the case when testing locally on a developer machine
+        let session_token = std::env::var("AWS_SESSION_TOKEN").ok();
 
         let client = if access_key_id.is_none() && secret_access_key.is_none() {
             debug!("Using IAM-based AWS access");
             S3Client::new_with(request_dispatcher, InstanceMetadataProvider::new(), region)
         } else {
-            debug!("Using credentials-based AWS access");
+            debug!(
+                "Using credentials-based AWS access. Session token is set: {}",
+                session_token.is_some()
+            );
             S3Client::new_with(
                 request_dispatcher,
-                StaticProvider::new_minimal(
+                StaticProvider::new(
                     access_key_id.unwrap_or_default(),
                     secret_access_key.unwrap_or_default(),
+                    session_token,
+                    None,
                 ),
                 region,
             )
@@ -304,32 +312,24 @@ impl RemoteStorage for S3Bucket {
         Ok(document_keys)
     }
 
+    /// See the doc for `RemoteStorage::list_prefixes`.
+    /// Note: it won't include empty "directories".
     async fn list_prefixes(
         &self,
         prefix: Option<Self::RemoteObjectId>,
    ) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
-        let list_prefix = match prefix {
-            Some(prefix) => {
-                let mut prefix_in_bucket = self.prefix_in_bucket.clone().unwrap_or_default();
-                // if there is no trailing / in default prefix and
-                // supplied prefix does not start with "/" insert it
-                if !(prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR)
-                    || prefix.0.starts_with(S3_PREFIX_SEPARATOR))
-                {
-                    prefix_in_bucket.push(S3_PREFIX_SEPARATOR);
-                }
-
-                prefix_in_bucket.push_str(&prefix.0);
+        // use the passed prefix or, if it is not set, fall back to the prefix_in_bucket value
+        let list_prefix = prefix
+            .map(|p| p.0)
+            .or_else(|| self.prefix_in_bucket.clone())
+            .map(|mut p| {
                 // required to end with a separator
                 // otherwise request will return only the entry of a prefix
-                if !prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR) {
-                    prefix_in_bucket.push(S3_PREFIX_SEPARATOR);
+                if !p.ends_with(S3_PREFIX_SEPARATOR) {
+                    p.push(S3_PREFIX_SEPARATOR);
                 }
-                Some(prefix_in_bucket)
-            }
-            None => self.prefix_in_bucket.clone(),
-        };
+                p
+            });
 
         let mut document_keys = Vec::new();
 
diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs
index e6f5c6125d..24b40b72d6 100644
--- a/neon_local/src/main.rs
+++ b/neon_local/src/main.rs
@@ -884,7 +884,7 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
     match sub_match.subcommand() {
         Some(("start", start_match)) => {
             if let Err(e) = pageserver.start(&pageserver_config_overrides(start_match)) {
-                eprintln!("pageserver start failed: {}", e);
+                eprintln!("pageserver start failed: {e}");
                 exit(1);
             }
         }
@@ -906,7 +906,7 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
             }
 
             if let Err(e) = pageserver.start(&pageserver_config_overrides(restart_match)) {
-                eprintln!("pageserver start failed: {}", e);
+                eprintln!("pageserver start failed: {e}");
                 exit(1);
             }
         }
diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs
index a91eaaa7ca..441d5e563e 100644
--- a/pageserver/src/storage_sync/download.rs
+++ b/pageserver/src/storage_sync/download.rs
@@ -130,6 +130,7 @@ where
                 tenant_path.display()
             )
         })?;
+
     let timelines = storage
         .list_prefixes(Some(tenant_storage_path))
         .await
@@ -140,6 +141,13 @@ where
             )
         })?;
 
+    if timelines.is_empty() {
+        anyhow::bail!(
+            "no timelines found on the remote storage for tenant {}",
+            tenant_id
+        )
+    }
+
     let mut sync_ids = HashSet::new();
 
     for timeline_remote_storage_key in timelines {
@@ -194,6 +202,8 @@ where
         })
         .map_err(DownloadError::BadInput)?;
 
+    warn!("part_storage_path {:?}", part_storage_path);
+
     let mut index_part_download = storage.download(&part_storage_path).await?;
 
     let mut index_part_bytes = Vec::new();
diff --git a/test_runner/batch_others/test_ancestor_branch.py b/test_runner/batch_others/test_ancestor_branch.py
index d8ba0a1b06..c4d36da043 100644
--- a/test_runner/batch_others/test_ancestor_branch.py
+++ b/test_runner/batch_others/test_ancestor_branch.py
@@ -1,6 +1,5 @@
-import pytest
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException
+from fixtures.neon_fixtures import NeonEnvBuilder
 from fixtures.utils import query_scalar
 
 
diff --git a/test_runner/batch_others/test_remote_storage.py b/test_runner/batch_others/test_remote_storage.py
index 6a8497a559..72963ffe21 100644
--- a/test_runner/batch_others/test_remote_storage.py
+++ b/test_runner/batch_others/test_remote_storage.py
@@ -2,11 +2,10 @@
 # env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
 
 import shutil, os
-from contextlib import closing
 from pathlib import Path
 import time
 from uuid import UUID
-from fixtures.neon_fixtures import NeonEnvBuilder, assert_timeline_local, wait_until, wait_for_last_record_lsn, wait_for_upload
+from fixtures.neon_fixtures import NeonEnvBuilder, RemoteStorageKind, assert_timeline_local, available_remote_storages, wait_until, wait_for_last_record_lsn, wait_for_upload
 from fixtures.log_helper import log
 from fixtures.utils import lsn_from_hex, query_scalar
 import pytest
@@ -29,18 +28,19 @@ import pytest
 # * queries the specific data, ensuring that it matches the one stored before
 #
 # The tests are done for all types of remote storage pageserver supports.
-@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
-def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, storage_type: str):
+@pytest.mark.parametrize('remote_storage_kind', available_remote_storages())
+def test_remote_storage_backup_and_restore(
+    neon_env_builder: NeonEnvBuilder,
+    remote_storage_kind: RemoteStorageKind,
+):
     # Use this test to check more realistic SK ids: some etcd key parsing bugs were related,
     # and this test needs SK to write data to pageserver, so it will be visible
     neon_env_builder.safekeepers_id_start = 12
 
-    if storage_type == 'local_fs':
-        neon_env_builder.enable_local_fs_remote_storage()
-    elif storage_type == 'mock_s3':
-        neon_env_builder.enable_s3_mock_remote_storage('test_remote_storage_backup_and_restore')
-    else:
-        raise RuntimeError(f'Unknown storage type: {storage_type}')
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name='test_remote_storage_backup_and_restore',
+    )
 
     data_id = 1
     data_secret = 'very secret secret'
diff --git a/test_runner/batch_others/test_tenants_with_remote_storage.py b/test_runner/batch_others/test_tenants_with_remote_storage.py
index 8ddb4d1b92..636616a45b 100644
--- a/test_runner/batch_others/test_tenants_with_remote_storage.py
+++ b/test_runner/batch_others/test_tenants_with_remote_storage.py
@@ -13,7 +13,7 @@ from uuid import UUID
 
 import pytest
 
-from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv, Postgres, wait_for_last_record_lsn, wait_for_upload
+from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv, Postgres, RemoteStorageKind, available_remote_storages, wait_for_last_record_lsn, wait_for_upload
 from fixtures.utils import lsn_from_hex
 
 
@@ -38,7 +38,7 @@ async def tenant_workload(env: NeonEnv, pg: Postgres):
 
 async def all_tenants_workload(env: NeonEnv, tenants_pgs):
     workers = []
-    for tenant, pg in tenants_pgs:
+    for _, pg in tenants_pgs:
         worker = tenant_workload(env, pg)
         workers.append(asyncio.create_task(worker))
 
@@ -46,23 +46,18 @@ async def all_tenants_workload(env: NeonEnv, tenants_pgs):
     await asyncio.gather(*workers)
 
 
-@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
-def test_tenants_many(neon_env_builder: NeonEnvBuilder, storage_type: str):
-
-    if storage_type == 'local_fs':
-        neon_env_builder.enable_local_fs_remote_storage()
-    elif storage_type == 'mock_s3':
-        neon_env_builder.enable_s3_mock_remote_storage('test_remote_storage_backup_and_restore')
-    else:
-        raise RuntimeError(f'Unknown storage type: {storage_type}')
-
-    neon_env_builder.enable_local_fs_remote_storage()
+@pytest.mark.parametrize('remote_storage_kind', available_remote_storages())
+def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name='test_tenants_many',
+    )
 
     env = neon_env_builder.init_start()
 
     tenants_pgs: List[Tuple[UUID, Postgres]] = []
 
-    for i in range(1, 5):
+    for _ in range(1, 5):
         # Use a tiny checkpoint distance, to create a lot of layers quickly
         tenant, _ = env.neon_cli.create_tenant(
             conf={
diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py
index da861bb9f3..6544681bb0 100644
--- a/test_runner/batch_others/test_wal_acceptor.py
+++ b/test_runner/batch_others/test_wal_acceptor.py
@@ -12,9 +12,8 @@ import uuid
 
 from contextlib import closing
 from dataclasses import dataclass, field
-from multiprocessing import Process, Value
 from pathlib import Path
-from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload
+from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageKind, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, available_remote_storages, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload
 from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex, query_scalar
 from fixtures.log_helper import log
 from typing import List, Optional, Any
@@ -377,15 +376,15 @@ def wait_wal_trim(tenant_id, timeline_id, sk, target_size):
         time.sleep(0.5)
 
 
-@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
-def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
+@pytest.mark.parametrize('remote_storage_kind', available_remote_storages())
+def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
     neon_env_builder.num_safekeepers = 3
-    if storage_type == 'local_fs':
-        neon_env_builder.enable_local_fs_remote_storage()
-    elif storage_type == 'mock_s3':
-        neon_env_builder.enable_s3_mock_remote_storage('test_safekeepers_wal_backup')
-    else:
-        raise RuntimeError(f'Unknown storage type: {storage_type}')
+
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name='test_safekeepers_wal_backup',
+    )
+
     neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER
 
     env = neon_env_builder.init_start()
@@ -425,15 +424,15 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
     wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], '0/5000000')
 
 
-@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
-def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
+@pytest.mark.parametrize('remote_storage_kind', available_remote_storages())
+def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
     neon_env_builder.num_safekeepers = 3
-    if storage_type == 'local_fs':
-        neon_env_builder.enable_local_fs_remote_storage()
-    elif storage_type == 'mock_s3':
-        neon_env_builder.enable_s3_mock_remote_storage('test_s3_wal_replay')
-    else:
-        raise RuntimeError(f'Unknown storage type: {storage_type}')
+
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name='test_s3_wal_replay',
+    )
+
     neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER
 
     env = neon_env_builder.init_start()
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 6783ab710b..87a598b387 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -3,6 +3,7 @@ from __future__ import annotations
 from dataclasses import field
 from contextlib import contextmanager
 from enum import Flag, auto
+import enum
 import textwrap
 from cached_property import cached_property
 import abc
@@ -262,6 +263,11 @@ def default_broker(request: Any, port_distributor: PortDistributor):
         broker.stop()
 
 
+@pytest.fixture(scope='session')
+def run_id():
+    yield uuid.uuid4()
+
+
 @pytest.fixture(scope='session')
 def mock_s3_server(port_distributor: PortDistributor):
     mock_s3_server = MockS3Server(port_distributor.get_port())
@@ -438,26 +444,43 @@ class MockS3Server:
     def secret_key(self) -> str:
         return 'test'
 
-    def access_env_vars(self) -> Dict[Any, Any]:
-        return {
-            'AWS_ACCESS_KEY_ID': self.access_key(),
-            'AWS_SECRET_ACCESS_KEY': self.secret_key(),
-        }
-
     def kill(self):
         self.subprocess.kill()
 
 
+@enum.unique
+class RemoteStorageKind(enum.Enum):
+    LOCAL_FS = "local_fs"
+    MOCK_S3 = "mock_s3"
+    REAL_S3 = "real_s3"
+
+
+def available_remote_storages() -> List[RemoteStorageKind]:
+    remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3]
+    if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
+        remote_storages.append(RemoteStorageKind.REAL_S3)
+    return remote_storages
+
+
 @dataclass
 class LocalFsStorage:
-    local_path: Path
+    root: Path
 
 
 @dataclass
 class S3Storage:
     bucket_name: str
     bucket_region: str
-    endpoint: Optional[str]
+    access_key: str
+    secret_key: str
+    endpoint: Optional[str] = None
+    prefix_in_bucket: Optional[str] = None
+
+    def access_env_vars(self) -> Dict[str, str]:
+        return {
+            'AWS_ACCESS_KEY_ID': self.access_key,
+            'AWS_SECRET_ACCESS_KEY': self.secret_key,
+        }
 
 
 RemoteStorage = Union[LocalFsStorage, S3Storage]
@@ -466,16 +489,20 @@ RemoteStorage = Union[LocalFsStorage, S3Storage]
 
 # serialize as toml inline table
 def remote_storage_to_toml_inline_table(remote_storage):
     if isinstance(remote_storage, LocalFsStorage):
-        res = f"local_path='{remote_storage.local_path}'"
+        remote_storage_config = f"local_path='{remote_storage.root}'"
     elif isinstance(remote_storage, S3Storage):
-        res = f"bucket_name='{remote_storage.bucket_name}', bucket_region='{remote_storage.bucket_region}'"
+        remote_storage_config = f"bucket_name='{remote_storage.bucket_name}',\
+            bucket_region='{remote_storage.bucket_region}'"
+
+        if remote_storage.prefix_in_bucket is not None:
+            remote_storage_config += f",prefix_in_bucket='{remote_storage.prefix_in_bucket}'"
+
         if remote_storage.endpoint is not None:
-            res += f", endpoint='{remote_storage.endpoint}'"
-        else:
-            raise Exception(f'Unknown storage configuration {remote_storage}')
+            remote_storage_config += f",endpoint='{remote_storage.endpoint}'"
     else:
         raise Exception("invalid remote storage type")
-    return f"{{{res}}}"
+
+    return f"{{{remote_storage_config}}}"
 
 class RemoteStorageUsers(Flag):
@@ -493,28 +520,31 @@ class NeonEnvBuilder:
     cleaned up after the test has finished.
""" def __init__( - self, - repo_dir: Path, - port_distributor: PortDistributor, - broker: Etcd, - mock_s3_server: MockS3Server, - remote_storage: Optional[RemoteStorage] = None, - remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER, - pageserver_config_override: Optional[str] = None, - num_safekeepers: int = 1, - # Use non-standard SK ids to check for various parsing bugs - safekeepers_id_start: int = 0, - # fsync is disabled by default to make the tests go faster - safekeepers_enable_fsync: bool = False, - auth_enabled: bool = False, - rust_log_override: Optional[str] = None, - default_branch_name=DEFAULT_BRANCH_NAME): + self, + repo_dir: Path, + port_distributor: PortDistributor, + broker: Etcd, + run_id: uuid.UUID, + mock_s3_server: MockS3Server, + remote_storage: Optional[RemoteStorage] = None, + remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER, + pageserver_config_override: Optional[str] = None, + num_safekeepers: int = 1, + # Use non-standard SK ids to check for various parsing bugs + safekeepers_id_start: int = 0, + # fsync is disabled by default to make the tests go faster + safekeepers_enable_fsync: bool = False, + auth_enabled: bool = False, + rust_log_override: Optional[str] = None, + default_branch_name=DEFAULT_BRANCH_NAME, + ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override self.port_distributor = port_distributor self.remote_storage = remote_storage self.remote_storage_users = remote_storage_users self.broker = broker + self.run_id = run_id self.mock_s3_server = mock_s3_server self.pageserver_config_override = pageserver_config_override self.num_safekeepers = num_safekeepers @@ -523,6 +553,8 @@ class NeonEnvBuilder: self.auth_enabled = auth_enabled self.default_branch_name = default_branch_name self.env: Optional[NeonEnv] = None + self.remote_storage_prefix: Optional[str] = None + self.keep_remote_storage_contents: bool = True def init(self) -> NeonEnv: # Cannot create more than one environment from one builder @@ -538,41 +570,142 @@ class NeonEnvBuilder: self.start() return env - """ - Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path. - Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`. - """ + def enable_remote_storage( + self, + remote_storage_kind: RemoteStorageKind, + test_name: str, + force_enable: bool = True, + ): + if remote_storage_kind == RemoteStorageKind.LOCAL_FS: + self.enable_local_fs_remote_storage(force_enable=force_enable) + elif remote_storage_kind == RemoteStorageKind.MOCK_S3: + self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable) + elif remote_storage_kind == RemoteStorageKind.REAL_S3: + self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable) + else: + raise RuntimeError(f'Unknown storage type: {remote_storage_kind}') def enable_local_fs_remote_storage(self, force_enable=True): + """ + Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path. + Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`. + """ assert force_enable or self.remote_storage is None, "remote storage is enabled already" self.remote_storage = LocalFsStorage(Path(self.repo_dir / 'local_fs_remote_storage')) - """ - Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already. - Starts up the mock server, if that does not run yet. 
-    Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
-    """
-
-    def enable_s3_mock_remote_storage(self, bucket_name: str, force_enable=True):
+    def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable=True):
+        """
+        Sets up the pageserver to use the S3 mock server and creates the bucket if it's not present already.
+        Starts up the mock server if it is not running yet.
+        Errors if the pageserver already has some remote storage configured, unless `force_enable` is set to `True`.
+        """
         assert force_enable or self.remote_storage is None, "remote storage is enabled already"
         mock_endpoint = self.mock_s3_server.endpoint()
         mock_region = self.mock_s3_server.region()
-        boto3.client(
+
+        self.remote_storage_client = boto3.client(
             's3',
             endpoint_url=mock_endpoint,
             region_name=mock_region,
             aws_access_key_id=self.mock_s3_server.access_key(),
             aws_secret_access_key=self.mock_s3_server.secret_key(),
-        ).create_bucket(Bucket=bucket_name)
+        )
+        self.remote_storage_client.create_bucket(Bucket=bucket_name)
+
+        self.remote_storage = S3Storage(
+            bucket_name=bucket_name,
+            endpoint=mock_endpoint,
+            bucket_region=mock_region,
+            access_key=self.mock_s3_server.access_key(),
+            secret_key=self.mock_s3_server.secret_key(),
+        )
+
+    def enable_real_s3_remote_storage(self, test_name: str, force_enable=True):
+        """
+        Sets up the configuration to use a real S3 endpoint, without the mock server.
+        """
+        assert force_enable or self.remote_storage is None, "remote storage is enabled already"
+
+        access_key = os.getenv("AWS_ACCESS_KEY_ID")
+        assert access_key, "no aws access key provided"
+        secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
+        assert secret_key, "no aws secret access key provided"
+
+        # a session token is needed for local runs with SSO auth
+        session_token = os.getenv("AWS_SESSION_TOKEN")
+
+        bucket_name = os.getenv("REMOTE_STORAGE_S3_BUCKET")
+        assert bucket_name, "no remote storage bucket name provided"
+        region = os.getenv("REMOTE_STORAGE_S3_REGION")
+        assert region, "no remote storage region provided"
+
+        # do not leave data in the real s3 bucket after the test
+        self.keep_remote_storage_contents = False
+
+        # construct a prefix inside the bucket for this particular test case and test run
+        self.remote_storage_prefix = f'{self.run_id}/{test_name}'
+
+        self.remote_storage_client = boto3.client(
+            's3',
+            region_name=region,
+            aws_access_key_id=access_key,
+            aws_secret_access_key=secret_key,
+            aws_session_token=session_token,
+        )
         self.remote_storage = S3Storage(bucket_name=bucket_name,
-                                        endpoint=mock_endpoint,
-                                        bucket_region=mock_region)
+                                        bucket_region=region,
+                                        access_key=access_key,
+                                        secret_key=secret_key,
+                                        prefix_in_bucket=self.remote_storage_prefix)
+
+    def cleanup_remote_storage(self):
+        # here we check for real remote storage, not the local one;
+        # local cleanup is not needed after a test because in CI the whole environment is destroyed anyway
+        if self.remote_storage_prefix is None:
+            log.info("no remote storage was set up, skipping cleanup")
+            return
+
+        if self.keep_remote_storage_contents:
+            log.info("keep_remote_storage_contents is set, skipping remote storage cleanup")
+            return
+
+        log.info("removing data from test s3 bucket %s by prefix %s",
+                 self.remote_storage.bucket_name,
+                 self.remote_storage_prefix)
+        paginator = self.remote_storage_client.get_paginator('list_objects_v2')
+        pages = paginator.paginate(
+            Bucket=self.remote_storage.bucket_name,
+            Prefix=self.remote_storage_prefix,
+        )
+
+        objects_to_delete = {'Objects': []}
+        cnt = 0
+        for item in pages.search('Contents'):
+            # oddly, when nothing is found the search returns [None]
+            if item is None:
+                break
+
+            objects_to_delete['Objects'].append({'Key': item['Key']})
+            cnt += 1
+
+            # flush once the aws limit of 1000 keys per delete request is reached
+            if len(objects_to_delete['Objects']) >= 1000:
+                self.remote_storage_client.delete_objects(
+                    Bucket=self.remote_storage.bucket_name,
+                    Delete=objects_to_delete,
+                )
+                objects_to_delete = {'Objects': []}
+
+        # flush the rest
+        if len(objects_to_delete['Objects']):
+            self.remote_storage_client.delete_objects(Bucket=self.remote_storage.bucket_name,
+                                                      Delete=objects_to_delete)
+
+        log.info("deleted %s objects from remote storage", cnt)
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
-        # Stop all the nodes.
         if self.env:
             log.info('Cleaning up all storage and compute nodes')
@@ -581,6 +714,8 @@ class NeonEnvBuilder:
                 sk.stop(immediate=True)
             self.env.pageserver.stop(immediate=True)
 
+        self.cleanup_remote_storage()
+
 
 class NeonEnv:
     """
@@ -713,10 +848,13 @@ class NeonEnv:
 
 
 @pytest.fixture(scope=shareable_scope)
-def _shared_simple_env(request: Any,
-                       port_distributor: PortDistributor,
-                       mock_s3_server: MockS3Server,
-                       default_broker: Etcd) -> Iterator[NeonEnv]:
+def _shared_simple_env(
+    request: Any,
+    port_distributor: PortDistributor,
+    mock_s3_server: MockS3Server,
+    default_broker: Etcd,
+    run_id: uuid.UUID,
+) -> Iterator[NeonEnv]:
     """
     # Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
     is set, this is shared by all tests using `neon_simple_env`.
@@ -730,8 +868,13 @@ def _shared_simple_env(request: Any,
         repo_dir = os.path.join(str(top_output_dir), "shared_repo")
         shutil.rmtree(repo_dir, ignore_errors=True)
 
-    with NeonEnvBuilder(Path(repo_dir), port_distributor, default_broker,
-                        mock_s3_server) as builder:
+    with NeonEnvBuilder(
+            repo_dir=Path(repo_dir),
+            port_distributor=port_distributor,
+            broker=default_broker,
+            mock_s3_server=mock_s3_server,
+            run_id=run_id,
+    ) as builder:
         env = builder.init_start()
 
         # For convenience in tests, create a branch from the freshly-initialized cluster.
@@ -756,10 +899,13 @@ def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]:
 
 
 @pytest.fixture(scope='function')
-def neon_env_builder(test_output_dir,
-                     port_distributor: PortDistributor,
-                     mock_s3_server: MockS3Server,
-                     default_broker: Etcd) -> Iterator[NeonEnvBuilder]:
+def neon_env_builder(
+    test_output_dir,
+    port_distributor: PortDistributor,
+    mock_s3_server: MockS3Server,
+    default_broker: Etcd,
+    run_id: uuid.UUID,
+) -> Iterator[NeonEnvBuilder]:
     """
     Fixture to create a Neon environment for test.
@@ -777,8 +923,13 @@ def neon_env_builder(test_output_dir,
     repo_dir = os.path.join(test_output_dir, "repo")
 
     # Return the builder to the caller
-    with NeonEnvBuilder(Path(repo_dir), port_distributor, default_broker,
-                        mock_s3_server) as builder:
+    with NeonEnvBuilder(
+            repo_dir=Path(repo_dir),
+            port_distributor=port_distributor,
+            mock_s3_server=mock_s3_server,
+            broker=default_broker,
+            run_id=run_id,
+    ) as builder:
         yield builder
 
 
@@ -1183,7 +1334,10 @@ class NeonCli(AbstractNeonCli):
             remote_storage_users=self.env.remote_storage_users,
             pageserver_config_override=self.env.pageserver.config_override)
 
-        s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None
+        s3_env_vars = None
+        if self.env.remote_storage is not None and isinstance(self.env.remote_storage, S3Storage):
+            s3_env_vars = self.env.remote_storage.access_env_vars()
+
         return self.raw_cli(start_args, extra_env_vars=s3_env_vars)
 
     def pageserver_stop(self, immediate=False) -> 'subprocess.CompletedProcess[str]':
@@ -1195,7 +1349,10 @@ class NeonCli(AbstractNeonCli):
         return self.raw_cli(cmd)
 
     def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]':
-        s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None
+        s3_env_vars = None
+        if self.env.remote_storage is not None and isinstance(self.env.remote_storage, S3Storage):
+            s3_env_vars = self.env.remote_storage.access_env_vars()
+
         return self.raw_cli(['safekeeper', 'start', str(id)], extra_env_vars=s3_env_vars)
 
     def safekeeper_stop(self,
@@ -1337,7 +1494,7 @@ class NeonPageserver(PgProtocol):
         return self
 
     def __exit__(self, exc_type, exc, tb):
-        self.stop(True)
+        self.stop(immediate=True)
 
     def http_client(self, auth_token: Optional[str] = None) -> NeonPageserverHttpClient:
         return NeonPageserverHttpClient(
@@ -1354,6 +1511,7 @@ def append_pageserver_param_overrides(
 ):
     if bool(remote_storage_users & RemoteStorageUsers.PAGESERVER) and remote_storage is not None:
         remote_storage_toml_table = remote_storage_to_toml_inline_table(remote_storage)
+
         params_to_update.append(
             f'--pageserver-config-override=remote_storage={remote_storage_toml_table}')
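
With the fixtures above, a new test opts into every configured storage backend through a single parametrization. Below is a minimal sketch of the pattern the migrated tests share; `test_example_with_remote_storage` is a hypothetical name, not part of this patch:

```python
import pytest
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    RemoteStorageKind,
    available_remote_storages,
)


# Parametrized over local_fs and mock_s3, plus real_s3 when
# ENABLE_REAL_S3_REMOTE_STORAGE is set in the environment.
@pytest.mark.parametrize('remote_storage_kind', available_remote_storages())
def test_example_with_remote_storage(
    neon_env_builder: NeonEnvBuilder,
    remote_storage_kind: RemoteStorageKind,
):
    # test_name doubles as the mock-S3 bucket name and as part of the
    # per-run prefix inside the real S3 bucket
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=remote_storage_kind,
        test_name='test_example_with_remote_storage',
    )
    env = neon_env_builder.init_start()
    # ... exercise env.pageserver / env.safekeepers as in the existing tests ...
```

Since `available_remote_storages()` only appends `RemoteStorageKind.REAL_S3` when `ENABLE_REAL_S3_REMOTE_STORAGE` is set, default local and CI runs keep using only the local fs and the mock server.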
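Running the real-S3 variants locally requires the environment that `enable_real_s3_remote_storage` asserts on. A sketch of a driver under those assumptions follows; the bucket, region, and pytest target are placeholders, and the AWS credentials are expected to come from the ambient environment:

```python
import os
import subprocess

env = dict(os.environ)
env.update({
    # makes available_remote_storages() include RemoteStorageKind.REAL_S3
    'ENABLE_REAL_S3_REMOTE_STORAGE': '1',
    # read by enable_real_s3_remote_storage; placeholder values
    'REMOTE_STORAGE_S3_BUCKET': 'my-neon-test-bucket',
    'REMOTE_STORAGE_S3_REGION': 'eu-central-1',
})
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (plus AWS_SESSION_TOKEN when
# authorizing through SSO) must already be set, or the fixture's assertions
# will fail the test.
subprocess.run(
    ['pytest', 'test_runner/batch_others/test_remote_storage.py'],
    env=env,
    check=True,
)
```

Each test writes under the `{run_id}/{test_name}` prefix, and `cleanup_remote_storage()` deletes that prefix on teardown, so separate runs against the same bucket do not collide.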
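The storage choice reaches the pageserver as an inline TOML table via `--pageserver-config-override`, assembled by `remote_storage_to_toml_inline_table` and `append_pageserver_param_overrides`. A small sketch of the resulting override for a real-S3 configuration, with all field values being placeholders:

```python
from fixtures.neon_fixtures import S3Storage, remote_storage_to_toml_inline_table

storage = S3Storage(
    bucket_name='my-neon-test-bucket',           # placeholder
    bucket_region='eu-central-1',                # placeholder
    access_key='AKIA...',                        # placeholder; not serialized into the override
    secret_key='secret',                         # placeholder; passed via env vars instead
    prefix_in_bucket='<run-uuid>/test_example',  # placeholder for f'{run_id}/{test_name}'
)

# Prints, modulo whitespace from the f-string line continuation:
# --pageserver-config-override=remote_storage={bucket_name='my-neon-test-bucket', bucket_region='eu-central-1',prefix_in_bucket='<run-uuid>/test_example'}
print(f'--pageserver-config-override=remote_storage={remote_storage_to_toml_inline_table(storage)}')
```

The credentials themselves never appear on the command line: `S3Storage.access_env_vars()` injects them as `AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY` when starting the pageserver and safekeepers, and `fill_aws_secrets_vars` in `control_plane` now forwards `AWS_SESSION_TOKEN` as well.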