support running tests against a real S3 implementation without mocking

Author: Dmitry Rodionov
Date: 2022-07-21 20:59:07 +03:00
Committed by: Dmitry Rodionov
Parent: b4f2c5b514
Commit: 5f71aa09d3
12 changed files with 315 additions and 156 deletions

Cargo.lock (generated, 47 lines changed)
View File

@@ -154,9 +154,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.12"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16705af05732b7d3258ec0f7b73c03a658a28925e050d8852d5b568ee8bcf4e"
checksum = "6b9496f0c1d1afb7a2af4338bbe1d969cddfead41d87a9fb3aaa6d0bbc7af648"
dependencies = [
"async-trait",
"axum-core",
@@ -317,15 +317,6 @@ dependencies = [
"serde",
]
[[package]]
name = "cast"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a"
dependencies = [
"rustc_version",
]
[[package]]
name = "cast"
version = "0.3.0"
@@ -579,7 +570,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b01d6de93b2b6c65e17c634a26653a29d107b3c98c607c765bf38d041531cd8f"
dependencies = [
"atty",
"cast 0.3.0",
"cast",
"clap 2.34.0",
"criterion-plot",
"csv",
@@ -600,11 +591,11 @@ dependencies = [
[[package]]
name = "criterion-plot"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57"
checksum = "2673cc8207403546f45f5fd319a974b1e6983ad1a3ee7e6041650013be041876"
dependencies = [
"cast 0.2.7",
"cast",
"itertools",
]
@@ -680,9 +671,9 @@ dependencies = [
[[package]]
name = "crypto-common"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ccfd8c0ee4cce11e45b3fd6f9d5e69e0cc62912aa6a0cb1bf4617b0eba5a12f"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
@@ -1116,9 +1107,9 @@ dependencies = [
[[package]]
name = "gimli"
version = "0.26.1"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
checksum = "22030e2c5a68ec659fde1e949a745124b48e6fa8b045b7ed5bd1fe4ccc5c4e5d"
[[package]]
name = "git-version"
@@ -1184,9 +1175,9 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.12.2"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "607c8a29735385251a339424dd462993c0fed8fa09d378f259377df08c126022"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "heck"
@@ -1388,7 +1379,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
dependencies = [
"autocfg",
"hashbrown 0.12.2",
"hashbrown 0.12.3",
]
[[package]]
@@ -1851,9 +1842,9 @@ dependencies = [
[[package]]
name = "os_str_bytes"
version = "6.1.0"
version = "6.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa"
checksum = "648001efe5d5c0102d8cea768e348da85d90af8ba91f0bea908f157951493cd4"
[[package]]
name = "pageserver"
@@ -2735,9 +2726,9 @@ dependencies = [
[[package]]
name = "rustversion"
version = "1.0.7"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0a5f7c728f5d284929a1cccb5bc19884422bfe6ef4d6c409da2c41838983fcf"
checksum = "24c8ad4f0c00e1eb5bc7614d236a7f1300e3dbd76b68cac8e06fb00b015ad8d8"
[[package]]
name = "ryu"
@@ -3617,9 +3608,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]]
name = "unicode-ident"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c"
checksum = "15c61ba63f9235225a22310255a29b806b907c9b8c964bcbd0a2c70f3f2deea7"
[[package]]
name = "unicode-normalization"

View File

@@ -51,7 +51,11 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
}
fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] {
for env_key in [
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"AWS_SESSION_TOKEN",
] {
if let Ok(value) = std::env::var(env_key) {
cmd = cmd.env(env_key, value);
}

View File

@@ -66,6 +66,9 @@ pub trait RemoteStorage: Send + Sync {
async fn list(&self) -> anyhow::Result<Vec<Self::RemoteObjectId>>;
/// Lists all top level subdirectories for a given prefix
/// Note: here we assume that if a prefix is passed, it was obtained via remote_object_id,
/// which already takes any global prefix into account (prefix_in_bucket for S3 or storage_root for LocalFs),
/// so this method doesn't need to.
async fn list_prefixes(
&self,
prefix: Option<Self::RemoteObjectId>,

View File

@@ -116,7 +116,7 @@ impl RemoteStorage for LocalFs {
prefix: Option<Self::RemoteObjectId>,
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
let path = match prefix {
Some(prefix) => Cow::Owned(self.storage_root.join(prefix)),
Some(prefix) => Cow::Owned(prefix),
None => Cow::Borrowed(&self.storage_root),
};
get_all_files(path.as_ref(), false).await

View File

@@ -171,17 +171,25 @@ impl S3Bucket {
let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
// the session token is used when authorizing through SSO,
// which is typically the case when testing locally on a developer machine
let session_token = std::env::var("AWS_SESSION_TOKEN").ok();
let client = if access_key_id.is_none() && secret_access_key.is_none() {
debug!("Using IAM-based AWS access");
S3Client::new_with(request_dispatcher, InstanceMetadataProvider::new(), region)
} else {
debug!("Using credentials-based AWS access");
debug!(
"Using credentials-based AWS access. Session token is set: {}",
session_token.is_some()
);
S3Client::new_with(
request_dispatcher,
StaticProvider::new_minimal(
StaticProvider::new(
access_key_id.unwrap_or_default(),
secret_access_key.unwrap_or_default(),
session_token,
None,
),
region,
)
@@ -304,32 +312,24 @@ impl RemoteStorage for S3Bucket {
Ok(document_keys)
}
/// See the doc for `RemoteStorage::list_prefixes`
/// Note: it won't include empty "directories"
async fn list_prefixes(
&self,
prefix: Option<Self::RemoteObjectId>,
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
let list_prefix = match prefix {
Some(prefix) => {
let mut prefix_in_bucket = self.prefix_in_bucket.clone().unwrap_or_default();
// if there is no trailing / in default prefix and
// supplied prefix does not start with "/" insert it
if !(prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR)
|| prefix.0.starts_with(S3_PREFIX_SEPARATOR))
{
prefix_in_bucket.push(S3_PREFIX_SEPARATOR);
}
prefix_in_bucket.push_str(&prefix.0);
// use the passed prefix, or fall back to the prefix_in_bucket value if it is not set
let list_prefix = prefix
.map(|p| p.0)
.or_else(|| self.prefix_in_bucket.clone())
.map(|mut p| {
// the prefix must end with a separator,
// otherwise the request will return only the prefix entry itself
if !prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR) {
prefix_in_bucket.push(S3_PREFIX_SEPARATOR);
if !p.ends_with(S3_PREFIX_SEPARATOR) {
p.push(S3_PREFIX_SEPARATOR);
}
Some(prefix_in_bucket)
}
None => self.prefix_in_bucket.clone(),
};
p
});
let mut document_keys = Vec::new();
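The fallback-and-separator rule above is easier to see outside of the diff. A minimal Python illustration (not pageserver code; names and values are illustrative) of the same string logic:

S3_PREFIX_SEPARATOR = "/"

def effective_list_prefix(prefix, prefix_in_bucket):
    # use the caller's prefix if given, otherwise the bucket-level prefix
    p = prefix if prefix is not None else prefix_in_bucket
    # ensure a trailing separator so S3 lists the children of the prefix,
    # not just the prefix entry itself
    if p is not None and not p.endswith(S3_PREFIX_SEPARATOR):
        p += S3_PREFIX_SEPARATOR
    return p

assert effective_list_prefix("run-1/tenants", None) == "run-1/tenants/"
assert effective_list_prefix(None, "run-1") == "run-1/"
assert effective_list_prefix(None, None) is None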

View File

@@ -884,7 +884,7 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
match sub_match.subcommand() {
Some(("start", start_match)) => {
if let Err(e) = pageserver.start(&pageserver_config_overrides(start_match)) {
eprintln!("pageserver start failed: {}", e);
eprintln!("pageserver start failed: {e}");
exit(1);
}
}
@@ -906,7 +906,7 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
if let Err(e) = pageserver.start(&pageserver_config_overrides(restart_match)) {
eprintln!("pageserver start failed: {}", e);
eprintln!("pageserver start failed: {e}");
exit(1);
}
}

View File

@@ -130,6 +130,7 @@ where
tenant_path.display()
)
})?;
let timelines = storage
.list_prefixes(Some(tenant_storage_path))
.await
@@ -140,6 +141,13 @@ where
)
})?;
if timelines.is_empty() {
anyhow::bail!(
"no timelines found on the remote storage for tenant {}",
tenant_id
)
}
let mut sync_ids = HashSet::new();
for timeline_remote_storage_key in timelines {
@@ -194,6 +202,8 @@ where
})
.map_err(DownloadError::BadInput)?;
warn!("part_storage_path {:?}", part_storage_path);
let mut index_part_download = storage.download(&part_storage_path).await?;
let mut index_part_bytes = Vec::new();

View File

@@ -1,6 +1,5 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import query_scalar

View File

@@ -2,11 +2,10 @@
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
import shutil, os
from contextlib import closing
from pathlib import Path
import time
from uuid import UUID
from fixtures.neon_fixtures import NeonEnvBuilder, assert_timeline_local, wait_until, wait_for_last_record_lsn, wait_for_upload
from fixtures.neon_fixtures import NeonEnvBuilder, RemoteStorageKind, assert_timeline_local, available_remote_storages, wait_until, wait_for_last_record_lsn, wait_for_upload
from fixtures.log_helper import log
from fixtures.utils import lsn_from_hex, query_scalar
import pytest
@@ -29,18 +28,19 @@ import pytest
# * queries the specific data, ensuring that it matches the one stored before
#
# The tests are done for all types of remote storage pageserver supports.
@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, storage_type: str):
@pytest.mark.parametrize('remote_storage_kind', available_remote_storages())
def test_remote_storage_backup_and_restore(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
# Use this test to check more realistic SK ids: some etcd key parsing bugs were related to them,
# and this test needs the SKs to write data to the pageserver, so such bugs would be visible
neon_env_builder.safekeepers_id_start = 12
if storage_type == 'local_fs':
neon_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
neon_env_builder.enable_s3_mock_remote_storage('test_remote_storage_backup_and_restore')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name='test_remote_storage_backup_and_restore',
)
data_id = 1
data_secret = 'very secret secret'

View File

@@ -13,7 +13,7 @@ from uuid import UUID
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv, Postgres, wait_for_last_record_lsn, wait_for_upload
from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv, Postgres, RemoteStorageKind, available_remote_storages, wait_for_last_record_lsn, wait_for_upload
from fixtures.utils import lsn_from_hex
@@ -38,7 +38,7 @@ async def tenant_workload(env: NeonEnv, pg: Postgres):
async def all_tenants_workload(env: NeonEnv, tenants_pgs):
workers = []
for tenant, pg in tenants_pgs:
for _, pg in tenants_pgs:
worker = tenant_workload(env, pg)
workers.append(asyncio.create_task(worker))
@@ -46,23 +46,18 @@ async def all_tenants_workload(env: NeonEnv, tenants_pgs):
await asyncio.gather(*workers)
@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
def test_tenants_many(neon_env_builder: NeonEnvBuilder, storage_type: str):
if storage_type == 'local_fs':
neon_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
neon_env_builder.enable_s3_mock_remote_storage('test_remote_storage_backup_and_restore')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
neon_env_builder.enable_local_fs_remote_storage()
@pytest.mark.parametrize('remote_storage_kind', available_remote_storages())
def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name='test_tenants_many',
)
env = neon_env_builder.init_start()
tenants_pgs: List[Tuple[UUID, Postgres]] = []
for i in range(1, 5):
for _ in range(1, 5):
# Use a tiny checkpoint distance, to create a lot of layers quickly
tenant, _ = env.neon_cli.create_tenant(
conf={

View File

@@ -12,9 +12,8 @@ import uuid
from contextlib import closing
from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload
from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageKind, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, available_remote_storages, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload
from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex, query_scalar
from fixtures.log_helper import log
from typing import List, Optional, Any
@@ -377,15 +376,15 @@ def wait_wal_trim(tenant_id, timeline_id, sk, target_size):
time.sleep(0.5)
@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
@pytest.mark.parametrize('remote_storage_kind', available_remote_storages())
def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
neon_env_builder.num_safekeepers = 3
if storage_type == 'local_fs':
neon_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
neon_env_builder.enable_s3_mock_remote_storage('test_safekeepers_wal_backup')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name='test_safekeepers_wal_backup',
)
neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER
env = neon_env_builder.init_start()
@@ -425,15 +424,15 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], '0/5000000')
@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
@pytest.mark.parametrize('remote_storage_kind', available_remote_storages())
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
neon_env_builder.num_safekeepers = 3
if storage_type == 'local_fs':
neon_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
neon_env_builder.enable_s3_mock_remote_storage('test_s3_wal_replay')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name='test_s3_wal_replay',
)
neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER
env = neon_env_builder.init_start()

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from dataclasses import field
from contextlib import contextmanager
from enum import Flag, auto
import enum
import textwrap
from cached_property import cached_property
import abc
@@ -262,6 +263,11 @@ def default_broker(request: Any, port_distributor: PortDistributor):
broker.stop()
@pytest.fixture(scope='session')
def run_id():
yield uuid.uuid4()
@pytest.fixture(scope='session')
def mock_s3_server(port_distributor: PortDistributor):
mock_s3_server = MockS3Server(port_distributor.get_port())
@@ -438,26 +444,43 @@ class MockS3Server:
def secret_key(self) -> str:
return 'test'
def access_env_vars(self) -> Dict[Any, Any]:
return {
'AWS_ACCESS_KEY_ID': self.access_key(),
'AWS_SECRET_ACCESS_KEY': self.secret_key(),
}
def kill(self):
self.subprocess.kill()
@enum.unique
class RemoteStorageKind(enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
def available_remote_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
remote_storages.append(RemoteStorageKind.REAL_S3)
return remote_storages
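A condensed sketch of how the parametrized tests in this commit consume this helper (test body elided; `test_something` is a placeholder name):

@pytest.mark.parametrize('remote_storage_kind', available_remote_storages())
def test_something(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=remote_storage_kind,
        test_name='test_something',
    )
    env = neon_env_builder.init_start()
    ...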
@dataclass
class LocalFsStorage:
local_path: Path
root: Path
@dataclass
class S3Storage:
bucket_name: str
bucket_region: str
endpoint: Optional[str]
access_key: str
secret_key: str
endpoint: Optional[str] = None
prefix_in_bucket: Optional[str] = None
def access_env_vars(self) -> Dict[str, str]:
return {
'AWS_ACCESS_KEY_ID': self.access_key,
'AWS_SECRET_ACCESS_KEY': self.secret_key,
}
RemoteStorage = Union[LocalFsStorage, S3Storage]
@@ -466,16 +489,20 @@ RemoteStorage = Union[LocalFsStorage, S3Storage]
# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage):
if isinstance(remote_storage, LocalFsStorage):
res = f"local_path='{remote_storage.local_path}'"
remote_storage_config = f"local_path='{remote_storage.root}'"
elif isinstance(remote_storage, S3Storage):
res = f"bucket_name='{remote_storage.bucket_name}', bucket_region='{remote_storage.bucket_region}'"
remote_storage_config = f"bucket_name='{remote_storage.bucket_name}',\
bucket_region='{remote_storage.bucket_region}'"
if remote_storage.prefix_in_bucket is not None:
remote_storage_config += f",prefix_in_bucket='{remote_storage.prefix_in_bucket}'"
if remote_storage.endpoint is not None:
res += f", endpoint='{remote_storage.endpoint}'"
else:
raise Exception(f'Unknown storage configuration {remote_storage}')
remote_storage_config += f",endpoint='{remote_storage.endpoint}'"
else:
raise Exception("invalid remote storage type")
return f"{{{res}}}"
return f"{{{remote_storage_config}}}"
class RemoteStorageUsers(Flag):
@@ -493,28 +520,31 @@ class NeonEnvBuilder:
cleaned up after the test has finished.
"""
def __init__(
self,
repo_dir: Path,
port_distributor: PortDistributor,
broker: Etcd,
mock_s3_server: MockS3Server,
remote_storage: Optional[RemoteStorage] = None,
remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 1,
# Use non-standard SK ids to check for various parsing bugs
safekeepers_id_start: int = 0,
# fsync is disabled by default to make the tests go faster
safekeepers_enable_fsync: bool = False,
auth_enabled: bool = False,
rust_log_override: Optional[str] = None,
default_branch_name=DEFAULT_BRANCH_NAME):
self,
repo_dir: Path,
port_distributor: PortDistributor,
broker: Etcd,
run_id: uuid.UUID,
mock_s3_server: MockS3Server,
remote_storage: Optional[RemoteStorage] = None,
remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 1,
# Use non-standard SK ids to check for various parsing bugs
safekeepers_id_start: int = 0,
# fsync is disabled by default to make the tests go faster
safekeepers_enable_fsync: bool = False,
auth_enabled: bool = False,
rust_log_override: Optional[str] = None,
default_branch_name=DEFAULT_BRANCH_NAME,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
self.remote_storage = remote_storage
self.remote_storage_users = remote_storage_users
self.broker = broker
self.run_id = run_id
self.mock_s3_server = mock_s3_server
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
@@ -523,6 +553,8 @@ class NeonEnvBuilder:
self.auth_enabled = auth_enabled
self.default_branch_name = default_branch_name
self.env: Optional[NeonEnv] = None
self.remote_storage_prefix: Optional[str] = None
self.keep_remote_storage_contents: bool = True
def init(self) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -538,41 +570,142 @@ class NeonEnvBuilder:
self.start()
return env
"""
Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
"""
def enable_remote_storage(
self,
remote_storage_kind: RemoteStorageKind,
test_name: str,
force_enable: bool = True,
):
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.REAL_S3:
self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable)
else:
raise RuntimeError(f'Unknown storage type: {remote_storage_kind}')
def enable_local_fs_remote_storage(self, force_enable=True):
"""
Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
Errors if the pageserver already has some remote storage configuration, unless `force_enable` is set to `True`.
"""
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
self.remote_storage = LocalFsStorage(Path(self.repo_dir / 'local_fs_remote_storage'))
"""
Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already.
Starts up the mock server, if that does not run yet.
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
"""
def enable_s3_mock_remote_storage(self, bucket_name: str, force_enable=True):
def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable=True):
"""
Sets up the pageserver to use the S3 mock server and creates the bucket if it's not present already.
Starts the mock server if it is not running yet.
Errors if the pageserver already has some remote storage configuration, unless `force_enable` is set to `True`.
"""
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
mock_endpoint = self.mock_s3_server.endpoint()
mock_region = self.mock_s3_server.region()
boto3.client(
self.remote_storage_client = boto3.client(
's3',
endpoint_url=mock_endpoint,
region_name=mock_region,
aws_access_key_id=self.mock_s3_server.access_key(),
aws_secret_access_key=self.mock_s3_server.secret_key(),
).create_bucket(Bucket=bucket_name)
self.remote_storage = S3Storage(
bucket_name=bucket_name,
endpoint=mock_endpoint,
bucket_region=mock_region,
access_key=self.mock_s3_server.access_key(),
secret_key=self.mock_s3_server.secret_key(),
)
def enable_real_s3_remote_storage(self, test_name: str, force_enable=True):
"""
Sets up the configuration to use a real S3 endpoint without the mock server.
"""
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
access_key = os.getenv("AWS_ACCESS_KEY_ID")
assert access_key, "no aws access key provided"
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
assert secret_key, "no aws secret access key provided"
# the session token is needed for local runs with SSO auth
session_token = os.getenv("AWS_SESSION_TOKEN")
bucket_name = os.getenv("REMOTE_STORAGE_S3_BUCKET")
assert bucket_name, "no remote storage bucket name provided"
region = os.getenv("REMOTE_STORAGE_S3_REGION")
assert region, "no remote storage region provided"
# do not leave data in real s3
self.keep_remote_storage_contents = False
# construct a prefix inside the bucket for the particular test case and test run
self.remote_storage_prefix = f'{self.run_id}/{test_name}'
self.remote_storage_client = boto3.client(
's3',
region_name=region,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
aws_session_token=session_token,
)
self.remote_storage = S3Storage(bucket_name=bucket_name,
endpoint=mock_endpoint,
bucket_region=mock_region)
bucket_region=region,
access_key=access_key,
secret_key=secret_key,
prefix_in_bucket=self.remote_storage_prefix)
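Real-S3 runs are therefore opt-in via the environment. A minimal sketch of the contract this method assumes (placeholder values):

import os

os.environ["ENABLE_REAL_S3_REMOTE_STORAGE"] = "1"   # makes available_remote_storages() include REAL_S3
os.environ["REMOTE_STORAGE_S3_BUCKET"] = "my-test-bucket"
os.environ["REMOTE_STORAGE_S3_REGION"] = "eu-west-1"
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are required;
# AWS_SESSION_TOKEN is additionally needed for SSO-issued credentials.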
def cleanup_remote_storage(self):
# here we check for real remote storage, not the local one;
# local cleanup is not needed after the test because in CI the whole environment is destroyed anyway
if self.remote_storage_prefix is None:
log.info("no remote storage was set up, skipping cleanup")
return
if self.keep_remote_storage_contents:
log.info("keep_remote_storage_contents skipping remote storage cleanup")
return
log.info("removing data from test s3 bucket %s by prefix %s",
self.remote_storage.bucket_name,
self.remote_storage_prefix)
paginator = self.remote_storage_client.get_paginator('list_objects_v2')
pages = paginator.paginate(
Bucket=self.remote_storage.bucket_name,
Prefix=self.remote_storage_prefix,
)
objects_to_delete = {'Objects': []}
cnt = 0
for item in pages.search('Contents'):
# weirdly, the search yields [None] when nothing is found
if item is None:
break
objects_to_delete['Objects'].append({'Key': item['Key']})
# flush once the AWS limit of 1000 keys per delete request is reached
if len(objects_to_delete['Objects']) >= 1000:
self.remote_storage_client.delete_objects(
Bucket=self.remote_storage.bucket_name,
Delete=objects_to_delete,
)
objects_to_delete = dict(Objects=[])
cnt += 1
# flush the remaining objects
if len(objects_to_delete['Objects']):
self.remote_storage_client.delete_objects(Bucket=self.remote_storage.bucket_name,
Delete=objects_to_delete)
log.info("deleted %s objects from remote storage", cnt)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
# Stop all the nodes.
if self.env:
log.info('Cleaning up all storage and compute nodes')
@@ -581,6 +714,8 @@ class NeonEnvBuilder:
sk.stop(immediate=True)
self.env.pageserver.stop(immediate=True)
self.cleanup_remote_storage()
class NeonEnv:
"""
@@ -713,10 +848,13 @@ class NeonEnv:
@pytest.fixture(scope=shareable_scope)
def _shared_simple_env(request: Any,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: Etcd) -> Iterator[NeonEnv]:
def _shared_simple_env(
request: Any,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: Etcd,
run_id: uuid.UUID,
) -> Iterator[NeonEnv]:
"""
Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
is set, this is shared by all tests using `neon_simple_env`.
@@ -730,8 +868,13 @@ def _shared_simple_env(request: Any,
repo_dir = os.path.join(str(top_output_dir), "shared_repo")
shutil.rmtree(repo_dir, ignore_errors=True)
with NeonEnvBuilder(Path(repo_dir), port_distributor, default_broker,
mock_s3_server) as builder:
with NeonEnvBuilder(
repo_dir=Path(repo_dir),
port_distributor=port_distributor,
broker=default_broker,
mock_s3_server=mock_s3_server,
run_id=run_id,
) as builder:
env = builder.init_start()
# For convenience in tests, create a branch from the freshly-initialized cluster.
@@ -756,10 +899,13 @@ def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]:
@pytest.fixture(scope='function')
def neon_env_builder(test_output_dir,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: Etcd) -> Iterator[NeonEnvBuilder]:
def neon_env_builder(
test_output_dir,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: Etcd,
run_id: uuid.UUID,
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -777,8 +923,13 @@ def neon_env_builder(test_output_dir,
repo_dir = os.path.join(test_output_dir, "repo")
# Return the builder to the caller
with NeonEnvBuilder(Path(repo_dir), port_distributor, default_broker,
mock_s3_server) as builder:
with NeonEnvBuilder(
repo_dir=Path(repo_dir),
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
broker=default_broker,
run_id=run_id,
) as builder:
yield builder
@@ -1183,7 +1334,10 @@ class NeonCli(AbstractNeonCli):
remote_storage_users=self.env.remote_storage_users,
pageserver_config_override=self.env.pageserver.config_override)
s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None
s3_env_vars = None
if self.env.remote_storage is not None and isinstance(self.env.remote_storage, S3Storage):
s3_env_vars = self.env.remote_storage.access_env_vars()
return self.raw_cli(start_args, extra_env_vars=s3_env_vars)
def pageserver_stop(self, immediate=False) -> 'subprocess.CompletedProcess[str]':
@@ -1195,7 +1349,10 @@ class NeonCli(AbstractNeonCli):
return self.raw_cli(cmd)
def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]':
s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None
s3_env_vars = None
if self.env.remote_storage is not None and isinstance(self.env.remote_storage, S3Storage):
s3_env_vars = self.env.remote_storage.access_env_vars()
return self.raw_cli(['safekeeper', 'start', str(id)], extra_env_vars=s3_env_vars)
def safekeeper_stop(self,
@@ -1337,7 +1494,7 @@ class NeonPageserver(PgProtocol):
return self
def __exit__(self, exc_type, exc, tb):
self.stop(True)
self.stop(immediate=True)
def http_client(self, auth_token: Optional[str] = None) -> NeonPageserverHttpClient:
return NeonPageserverHttpClient(
@@ -1354,6 +1511,7 @@ def append_pageserver_param_overrides(
):
if bool(remote_storage_users & RemoteStorageUsers.PAGESERVER) and remote_storage is not None:
remote_storage_toml_table = remote_storage_to_toml_inline_table(remote_storage)
params_to_update.append(
f'--pageserver-config-override=remote_storage={remote_storage_toml_table}')