Mirror of https://github.com/neondatabase/neon.git
## Problem

- The scrubber's `tidy` command requires the presence of a control plane.
- The scrubber has no tests at all.

## Summary of changes

- Add reusable async streams for reading metadata from a bucket.
- Add a `scan-metadata` command that reads from those streams and calls the existing `checks.rs` code to validate metadata, then returns a summary struct for the bucket. The command returns a nonzero status if errors are found.
- Add an `enable_scrub_on_exit()` function to `NeonEnvBuilder` so that tests using remote storage can request that the scrubber run after they finish (see the sketch below).
- Enable remote storage and scrub_on_exit in `test_pageserver_restart` and `test_pageserver_chaos`.

This is a "toe in the water" for the overall space of validating the scrubber. Later, we should:

- Enable scrubbing at the end of tests using remote storage by default.
- Make the success condition stricter than "no errors": tests should declare which tenants and timelines they expect to see in the bucket (or sniff these from the functions tests use to create them), and we should require that the scrubber reports on those particular tenants/timelines.

The `tidy` command is untouched in this PR, but it should be refactored later to use a similar async streaming interface instead of the current batch-reading approach (the streams are faster with large buckets), and to also be covered by some tests.

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
Co-authored-by: Conrad Ludgate <conrad@neon.tech>
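For context, here is a minimal sketch of how a test might opt in to scrubbing. Only `enable_scrub_on_exit()` is introduced by this change; the import paths, the `enable_remote_storage` call and its parameters, and the test body are assumptions modelled on the fixture style of the test suite and may not match the real API exactly.

```python
# Hypothetical sketch -- names other than enable_scrub_on_exit() are assumptions,
# not taken from this change.
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind


def test_pageserver_restart_with_scrub(neon_env_builder: NeonEnvBuilder):
    # Put layer files and index_part.json in (mock) S3 so the scrubber has
    # something to scan.
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=RemoteStorageKind.MOCK_S3,
        test_name="test_pageserver_restart_with_scrub",
    )
    # Run `scan-metadata` against the bucket during teardown and fail the
    # test if the scrubber reports any errors.
    neon_env_builder.enable_scrub_on_exit()

    env = neon_env_builder.init_start()
    env.pageserver.stop()
    env.pageserver.start()
```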
Python · 173 lines · 5.6 KiB
import enum
import json
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Union

from fixtures.log_helper import log
from fixtures.types import TenantId, TimelineId

TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"


class MockS3Server:
    """
    Starts a mock S3 server for testing on the given port; errors if the server fails to start or exits prematurely.
    Relies on `poetry` and the `moto` server being installed, since that is how the tests are run.

    Also provides methods to derive the connection properties from, and a method to kill the underlying server.
    """

    def __init__(
        self,
        port: int,
    ):
        self.port = port

        # XXX: do not use `shell=True` or add `exec ` to the command here.
        # We use `self.subprocess.kill()` to shut down the server, which would not "just" work
        # on Linux if the process were started via a shell.
        self.subprocess = subprocess.Popen(["poetry", "run", "moto_server", "s3", f"-p{port}"])
        error = None
        try:
            return_code = self.subprocess.poll()
            if return_code is not None:
                error = f"expected mock s3 server to run but it exited with code {return_code}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
        except Exception as e:
            error = f"expected mock s3 server to start but it failed with exception: {e}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
        if error is not None:
            log.error(error)
            self.kill()
            raise RuntimeError("failed to start s3 mock server")

    def endpoint(self) -> str:
        return f"http://127.0.0.1:{self.port}"

    def region(self) -> str:
        return "us-east-1"

    def access_key(self) -> str:
        return "test"

    def secret_key(self) -> str:
        return "test"

    def kill(self):
        self.subprocess.kill()


@enum.unique
class RemoteStorageKind(str, enum.Enum):
    LOCAL_FS = "local_fs"
    MOCK_S3 = "mock_s3"
    REAL_S3 = "real_s3"
    # Pass to tests that are generic over remote storage
    # to ensure they pass both with and without remote storage configured.
    NOOP = "noop"


def available_remote_storages() -> List[RemoteStorageKind]:
    remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3]
    if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
        remote_storages.append(RemoteStorageKind.REAL_S3)
        log.info("Enabling real s3 storage for tests")
    else:
        log.info("Using mock implementations to test remote storage")
    return remote_storages


def available_s3_storages() -> List[RemoteStorageKind]:
    remote_storages = [RemoteStorageKind.MOCK_S3]
    if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
        remote_storages.append(RemoteStorageKind.REAL_S3)
        log.info("Enabling real s3 storage for tests")
    else:
        log.info("Using mock implementations to test remote storage")
    return remote_storages


def s3_storage() -> RemoteStorageKind:
    """
    For tests that require a remote storage impl that exposes an S3
    endpoint, but don't want to parametrize over multiple storage types.

    Use real S3 if available, else use MockS3.
    """
    if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
        return RemoteStorageKind.REAL_S3
    else:
        return RemoteStorageKind.MOCK_S3


@dataclass
class LocalFsStorage:
    root: Path

    def tenant_path(self, tenant_id: TenantId) -> Path:
        return self.root / "tenants" / str(tenant_id)

    def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
        return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)

    def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
        return self.timeline_path(tenant_id, timeline_id) / TIMELINE_INDEX_PART_FILE_NAME

    def index_content(self, tenant_id: TenantId, timeline_id: TimelineId):
        with self.index_path(tenant_id, timeline_id).open("r") as f:
            return json.load(f)


@dataclass
class S3Storage:
    bucket_name: str
    bucket_region: str
    access_key: str
    secret_key: str
    endpoint: Optional[str] = None
    prefix_in_bucket: Optional[str] = ""

    def access_env_vars(self) -> Dict[str, str]:
        return {
            "AWS_ACCESS_KEY_ID": self.access_key,
            "AWS_SECRET_ACCESS_KEY": self.secret_key,
        }

    def to_string(self) -> str:
        return json.dumps(
            {
                "bucket": self.bucket_name,
                "region": self.bucket_region,
                "endpoint": self.endpoint,
                "prefix": self.prefix_in_bucket,
            }
        )


RemoteStorage = Union[LocalFsStorage, S3Storage]


# serialize as a TOML inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
    if isinstance(remote_storage, LocalFsStorage):
        remote_storage_config = f"local_path='{remote_storage.root}'"
    elif isinstance(remote_storage, S3Storage):
        remote_storage_config = (
            f"bucket_name='{remote_storage.bucket_name}',"
            f"bucket_region='{remote_storage.bucket_region}'"
        )

        if remote_storage.prefix_in_bucket is not None:
            remote_storage_config += f",prefix_in_bucket='{remote_storage.prefix_in_bucket}'"

        if remote_storage.endpoint is not None:
            remote_storage_config += f",endpoint='{remote_storage.endpoint}'"
    else:
        raise Exception("invalid remote storage type")

    return f"{{{remote_storage_config}}}"


class RemoteStorageUsers(enum.Flag):
    PAGESERVER = enum.auto()
    SAFEKEEPER = enum.auto()
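As a usage note, the inline-table strings produced by `remote_storage_to_toml_inline_table` above look like the following. The bucket name, prefix, and endpoint are made-up illustrative values; the classes and function are the ones defined in this module.

```python
from pathlib import Path

# LocalFsStorage serializes to a single local_path key.
local = LocalFsStorage(root=Path("/tmp/neon/remote_storage"))
print(remote_storage_to_toml_inline_table(local))
# {local_path='/tmp/neon/remote_storage'}

# S3Storage serializes bucket_name/bucket_region, plus prefix_in_bucket and
# endpoint when they are set. Values here are made up for illustration.
s3 = S3Storage(
    bucket_name="my-test-bucket",
    bucket_region="us-east-1",
    access_key="test",
    secret_key="test",
    endpoint="http://127.0.0.1:5000",
    prefix_in_bucket="test_pageserver_restart",
)
print(remote_storage_to_toml_inline_table(s3))
# {bucket_name='my-test-bucket',bucket_region='us-east-1',prefix_in_bucket='test_pageserver_restart',endpoint='http://127.0.0.1:5000'}
```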