A rather temporary solution before the proper one: https://github.com/neondatabase/neon/issues/5006. That requires more plumbing, so for now let's not attach deleted tenants, and implement resume later. Additionally, fix `assert_prefix_empty`: it had a buggy prefix calculation, but since we only ever asserted the absence of objects, it appeared to work. Once I started asserting the presence of objects too, it failed. Added more "presence" asserts in other places to be confident that the fix works. Resolves [#5016](https://github.com/neondatabase/neon/issues/5016)
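The fix described above hinges on checking both directions of the prefix assertion. A minimal sketch of such a presence/absence pair, assuming a boto3 S3 client (the name `assert_prefix_not_empty` and the signatures here are illustrative, not the fixtures' actual API):

```python
def assert_prefix_empty(s3_client, bucket: str, prefix: str):
    # With a buggy prefix that matches nothing, this passes vacuously,
    # which is how the original bug went unnoticed.
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    assert response["KeyCount"] == 0, f"expected no objects under '{prefix}', found {response['KeyCount']}"


def assert_prefix_not_empty(s3_client, bucket: str, prefix: str):
    # The "presence" direction: this fails when the prefix matches nothing,
    # exposing a wrong prefix calculation.
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    assert response["KeyCount"] > 0, f"expected objects under '{prefix}', found none"
```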
import enum
import json
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Union

from fixtures.log_helper import log
from fixtures.types import TenantId, TimelineId

TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"


class MockS3Server:
    """
    Starts a mock S3 server for testing on the given port; errors if the server fails to start or exits prematurely.
    Relies on `poetry` and the `moto` server being installed, since that is how the tests are run.

    Also provides a set of methods for deriving connection properties, and a method to kill the underlying server.
    """

    def __init__(
        self,
        port: int,
    ):
        self.port = port

        # XXX: do not use `shell=True` and do not prepend `exec ` to the command here.
        # We use `self.subprocess.kill()` to shut down the server, which would not "just" work on Linux
        # if the process were started through a shell.
        self.subprocess = subprocess.Popen(["poetry", "run", "moto_server", "s3", f"-p{port}"])
        error = None
        try:
            # Poll immediately after spawning: a non-None return code means
            # the server process already exited instead of serving.
            return_code = self.subprocess.poll()
            if return_code is not None:
                error = f"expected mock s3 server to run but it exited with code {return_code}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
        except Exception as e:
            error = f"expected mock s3 server to start but it failed with exception: {e}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
        if error is not None:
            log.error(error)
            self.kill()
            raise RuntimeError("failed to start s3 mock server")

    def endpoint(self) -> str:
        return f"http://127.0.0.1:{self.port}"

    def region(self) -> str:
        return "us-east-1"

    def access_key(self) -> str:
        return "test"

    def secret_key(self) -> str:
        return "test"

    def kill(self):
        self.subprocess.kill()
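
# Example usage (a sketch): point a boto3 client at the mock server. `boto3`
# is assumed to be importable in the test environment; this module does not
# import it itself.
#
#   server = MockS3Server(port=5000)
#   s3 = boto3.client(
#       "s3",
#       endpoint_url=server.endpoint(),
#       region_name=server.region(),
#       aws_access_key_id=server.access_key(),
#       aws_secret_access_key=server.secret_key(),
#   )
#   s3.create_bucket(Bucket="test-bucket")
#   server.kill()

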
@enum.unique
class RemoteStorageKind(str, enum.Enum):
    LOCAL_FS = "local_fs"
    MOCK_S3 = "mock_s3"
    REAL_S3 = "real_s3"
    # Pass to tests that are generic over remote storage
    # to ensure the tests pass with or without remote storage
    NOOP = "noop"


def available_remote_storages() -> List[RemoteStorageKind]:
    remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3]
    if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
        remote_storages.append(RemoteStorageKind.REAL_S3)
        log.info("Enabling real s3 storage for tests")
    else:
        log.info("Using mock implementations to test remote storage")
    return remote_storages


def available_s3_storages() -> List[RemoteStorageKind]:
    remote_storages = [RemoteStorageKind.MOCK_S3]
    if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
        remote_storages.append(RemoteStorageKind.REAL_S3)
        log.info("Enabling real s3 storage for tests")
    else:
        log.info("Using mock implementations to test remote storage")
    return remote_storages
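

# Example usage (a sketch): these helpers are meant to feed pytest
# parametrization; `pytest` and the fixture name below are assumptions,
# not defined in this module.
#
#   @pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
#   def test_remote_storage(neon_env_builder, remote_storage_kind: RemoteStorageKind):
#       ...

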
@dataclass
class LocalFsStorage:
    root: Path

    def tenant_path(self, tenant_id: TenantId) -> Path:
        return self.root / "tenants" / str(tenant_id)

    def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
        return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)

    def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
        return self.timeline_path(tenant_id, timeline_id) / TIMELINE_INDEX_PART_FILE_NAME

    def index_content(self, tenant_id: TenantId, timeline_id: TimelineId):
        with self.index_path(tenant_id, timeline_id).open("r") as f:
            return json.load(f)
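

# For illustration, with root=/tmp/remote_storage the methods above resolve to
# paths of the form:
#   /tmp/remote_storage/tenants/<tenant_id>/timelines/<timeline_id>/index_part.json

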
@dataclass
class S3Storage:
    bucket_name: str
    bucket_region: str
    access_key: str
    secret_key: str
    endpoint: Optional[str] = None
    prefix_in_bucket: Optional[str] = ""

    def access_env_vars(self) -> Dict[str, str]:
        return {
            "AWS_ACCESS_KEY_ID": self.access_key,
            "AWS_SECRET_ACCESS_KEY": self.secret_key,
        }

    def to_string(self) -> str:
        return json.dumps(
            {
                "bucket": self.bucket_name,
                "region": self.bucket_region,
                "endpoint": self.endpoint,
                "prefix": self.prefix_in_bucket,
            }
        )
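

# Example `to_string()` output for illustrative field values (`json.dumps`
# renders a None endpoint as null):
#   {"bucket": "my-bucket", "region": "us-east-1", "endpoint": null, "prefix": ""}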


RemoteStorage = Union[LocalFsStorage, S3Storage]


# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
    if isinstance(remote_storage, LocalFsStorage):
        remote_storage_config = f"local_path='{remote_storage.root}'"
    elif isinstance(remote_storage, S3Storage):
        remote_storage_config = (
            f"bucket_name='{remote_storage.bucket_name}',"
            f"bucket_region='{remote_storage.bucket_region}'"
        )

        if remote_storage.prefix_in_bucket is not None:
            remote_storage_config += f",prefix_in_bucket='{remote_storage.prefix_in_bucket}'"

        if remote_storage.endpoint is not None:
            remote_storage_config += f",endpoint='{remote_storage.endpoint}'"
    else:
        raise Exception("invalid remote storage type")

    return f"{{{remote_storage_config}}}"
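

# For example (illustrative values), a LocalFsStorage serializes to
#   {local_path='/tmp/remote_storage'}
# and an S3Storage with the default empty prefix and no endpoint to
#   {bucket_name='my-bucket',bucket_region='us-east-1',prefix_in_bucket=''}

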
class RemoteStorageUsers(enum.Flag):
    PAGESERVER = enum.auto()
    SAFEKEEPER = enum.auto()
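

# Example usage (a sketch): `enum.Flag` members combine with bitwise OR and
# support membership tests, so callers can record that both services use
# remote storage.
#
#   users = RemoteStorageUsers.PAGESERVER | RemoteStorageUsers.SAFEKEEPER
#   assert RemoteStorageUsers.PAGESERVER in users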