diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py new file mode 100644 index 0000000000..13d6fd746b --- /dev/null +++ b/test_runner/fixtures/neon_cli.py @@ -0,0 +1,640 @@ +from __future__ import annotations + +import abc +import json +import os +import re +import subprocess +import tempfile +import textwrap +from itertools import chain, product +from pathlib import Path +from typing import ( + Any, + Dict, + List, + Optional, + Tuple, + TypeVar, + cast, +) + +import toml + +from fixtures.common_types import Lsn, TenantId, TimelineId +from fixtures.log_helper import log +from fixtures.pageserver.common_types import IndexPartDump +from fixtures.pg_version import PgVersion +from fixtures.utils import AuxFileStore + +T = TypeVar("T") + + +class AbstractNeonCli(abc.ABC): + """ + A typed wrapper around an arbitrary Neon CLI tool. + Supports a way to run arbitrary command directly via CLI. + Do not use directly, use specific subclasses instead. + """ + + def __init__(self, extra_env: Optional[Dict[str, str]], binpath: Path): + self.extra_env = extra_env + self.binpath = binpath + + COMMAND: str = cast(str, None) # To be overwritten by the derived class. + + def raw_cli( + self, + arguments: List[str], + extra_env_vars: Optional[Dict[str, str]] = None, + check_return_code=True, + timeout=None, + ) -> "subprocess.CompletedProcess[str]": + """ + Run the command with the specified arguments. + + Arguments must be in list form, e.g. ['endpoint', 'create'] + + Return both stdout and stderr, which can be accessed as + + >>> result = env.neon_cli.raw_cli(...) + >>> assert result.stderr == "" + >>> log.info(result.stdout) + + If `check_return_code`, on non-zero exit code logs failure and raises. 
+ """ + + assert isinstance(arguments, list) + assert isinstance(self.COMMAND, str) + + command_path = str(self.binpath / self.COMMAND) + + args = [command_path] + arguments + log.info('Running command "{}"'.format(" ".join(args))) + + env_vars = os.environ.copy() + + # extra env + for extra_env_key, extra_env_value in (self.extra_env or {}).items(): + env_vars[extra_env_key] = extra_env_value + for extra_env_key, extra_env_value in (extra_env_vars or {}).items(): + env_vars[extra_env_key] = extra_env_value + + # Pass through coverage settings + var = "LLVM_PROFILE_FILE" + val = os.environ.get(var) + if val: + env_vars[var] = val + + # Intercept CalledProcessError and print more info + try: + res = subprocess.run( + args, + env=env_vars, + check=False, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=timeout, + ) + except subprocess.TimeoutExpired as e: + if e.stderr: + stderr = e.stderr.decode(errors="replace") + else: + stderr = "" + + if e.stdout: + stdout = e.stdout.decode(errors="replace") + else: + stdout = "" + + log.warn(f"CLI timeout: stderr={stderr}, stdout={stdout}") + raise + + indent = " " + if not res.returncode: + stripped = res.stdout.strip() + lines = stripped.splitlines() + if len(lines) < 2: + log.debug(f"Run {res.args} success: {stripped}") + else: + log.debug("Run %s success:\n%s" % (res.args, textwrap.indent(stripped, indent))) + elif check_return_code: + # this way command output will be in recorded and shown in CI in failure message + indent = indent * 2 + msg = textwrap.dedent( + """\ + Run %s failed: + stdout: + %s + stderr: + %s + """ + ) + msg = msg % ( + res.args, + textwrap.indent(res.stdout.strip(), indent), + textwrap.indent(res.stderr.strip(), indent), + ) + log.info(msg) + raise RuntimeError(msg) from subprocess.CalledProcessError( + res.returncode, res.args, res.stdout, res.stderr + ) + return res + + +class NeonLocalCli(AbstractNeonCli): + """ + A typed wrapper around the `neon_local` CLI 
tool. + Supports main commands via typed methods and a way to run arbitrary command directly via CLI. + """ + + COMMAND = "neon_local" + + def __init__( + self, + extra_env: Optional[Dict[str, str]], + binpath: Path, + repo_dir: Path, + pg_distrib_dir: Path, + ): + if extra_env is None: + env_vars = {} + else: + env_vars = extra_env.copy() + env_vars["NEON_REPO_DIR"] = str(repo_dir) + env_vars["POSTGRES_DISTRIB_DIR"] = str(pg_distrib_dir) + + super().__init__(env_vars, binpath) + + def raw_cli(self, *args, **kwargs) -> subprocess.CompletedProcess[str]: + return super().raw_cli(*args, **kwargs) + + def create_tenant( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + pg_version: PgVersion, + conf: Optional[Dict[str, Any]] = None, + shard_count: Optional[int] = None, + shard_stripe_size: Optional[int] = None, + placement_policy: Optional[str] = None, + set_default: bool = False, + aux_file_policy: Optional[AuxFileStore] = None, + ): + """ + Creates a new tenant, returns its id and its initial timeline's id. 
+ """ + args = [ + "tenant", + "create", + "--tenant-id", + str(tenant_id), + "--timeline-id", + str(timeline_id), + "--pg-version", + pg_version, + ] + if conf is not None: + args.extend( + chain.from_iterable( + product(["-c"], (f"{key}:{value}" for key, value in conf.items())) + ) + ) + + if aux_file_policy is AuxFileStore.V2: + args.extend(["-c", "switch_aux_file_policy:v2"]) + elif aux_file_policy is AuxFileStore.V1: + args.extend(["-c", "switch_aux_file_policy:v1"]) + elif aux_file_policy is AuxFileStore.CrossValidation: + args.extend(["-c", "switch_aux_file_policy:cross-validation"]) + + if set_default: + args.append("--set-default") + + if shard_count is not None: + args.extend(["--shard-count", str(shard_count)]) + + if shard_stripe_size is not None: + args.extend(["--shard-stripe-size", str(shard_stripe_size)]) + + if placement_policy is not None: + args.extend(["--placement-policy", str(placement_policy)]) + + res = self.raw_cli(args) + res.check_returncode() + + def import_tenant(self, tenant_id: TenantId): + args = ["tenant", "import", "--tenant-id", str(tenant_id)] + res = self.raw_cli(args) + res.check_returncode() + + def set_default(self, tenant_id: TenantId): + """ + Update default tenant for future operations that require tenant_id. + """ + res = self.raw_cli(["tenant", "set-default", "--tenant-id", str(tenant_id)]) + res.check_returncode() + + def config_tenant(self, tenant_id: TenantId, conf: Dict[str, str]): + """ + Update tenant config. 
+ """ + + args = ["tenant", "config", "--tenant-id", str(tenant_id)] + if conf is not None: + args.extend( + chain.from_iterable( + product(["-c"], (f"{key}:{value}" for key, value in conf.items())) + ) + ) + + res = self.raw_cli(args) + res.check_returncode() + + def list_tenants(self) -> "subprocess.CompletedProcess[str]": + res = self.raw_cli(["tenant", "list"]) + res.check_returncode() + return res + + def create_timeline( + self, + new_branch_name: str, + tenant_id: TenantId, + timeline_id: TimelineId, + pg_version: PgVersion, + ) -> TimelineId: + if timeline_id is None: + timeline_id = TimelineId.generate() + + cmd = [ + "timeline", + "create", + "--branch-name", + new_branch_name, + "--tenant-id", + str(tenant_id), + "--timeline-id", + str(timeline_id), + "--pg-version", + pg_version, + ] + + res = self.raw_cli(cmd) + res.check_returncode() + + return timeline_id + + def create_branch( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + new_branch_name, + ancestor_branch_name: Optional[str] = None, + ancestor_start_lsn: Optional[Lsn] = None, + ): + cmd = [ + "timeline", + "branch", + "--branch-name", + new_branch_name, + "--timeline-id", + str(timeline_id), + "--tenant-id", + str(tenant_id), + ] + if ancestor_branch_name is not None: + cmd.extend(["--ancestor-branch-name", ancestor_branch_name]) + if ancestor_start_lsn is not None: + cmd.extend(["--ancestor-start-lsn", str(ancestor_start_lsn)]) + + res = self.raw_cli(cmd) + res.check_returncode() + + def timeline_import( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + new_branch_name: str, + base_lsn: Lsn, + base_tarfile: Path, + pg_version: PgVersion, + end_lsn: Optional[Lsn] = None, + wal_tarfile: Optional[Path] = None, + ): + cmd = [ + "timeline", + "import", + "--tenant-id", + str(tenant_id), + "--timeline-id", + str(timeline_id), + "--pg-version", + pg_version, + "--branch-name", + new_branch_name, + "--base-lsn", + str(base_lsn), + "--base-tarfile", + str(base_tarfile), + ] + if 
end_lsn is not None: + cmd.extend(["--end-lsn", str(end_lsn)]) + if wal_tarfile is not None: + cmd.extend(["--wal-tarfile", str(wal_tarfile)]) + + res = self.raw_cli(cmd) + res.check_returncode() + + def list_timelines(self, tenant_id: TenantId) -> List[Tuple[str, TimelineId]]: + """ + Returns a list of (branch_name, timeline_id) tuples out of parsed `neon timeline list` CLI output. + """ + + # main [b49f7954224a0ad25cc0013ea107b54b] + # ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540] + TIMELINE_DATA_EXTRACTOR: re.Pattern = re.compile( # type: ignore[type-arg] + r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE + ) + res = self.raw_cli(["timeline", "list", "--tenant-id", str(tenant_id)]) + timelines_cli = sorted( + map( + lambda branch_and_id: (branch_and_id[0], TimelineId(branch_and_id[1])), + TIMELINE_DATA_EXTRACTOR.findall(res.stdout), + ) + ) + return timelines_cli + + def init( + self, + init_config: Dict[str, Any], + force: Optional[str] = None, + ) -> "subprocess.CompletedProcess[str]": + with tempfile.NamedTemporaryFile(mode="w+") as init_config_tmpfile: + init_config_tmpfile.write(toml.dumps(init_config)) + init_config_tmpfile.flush() + + cmd = [ + "init", + f"--config={init_config_tmpfile.name}", + ] + + if force is not None: + cmd.extend(["--force", force]) + + res = self.raw_cli(cmd) + res.check_returncode() + return res + + def storage_controller_start( + self, + timeout_in_seconds: Optional[int] = None, + instance_id: Optional[int] = None, + base_port: Optional[int] = None, + ): + cmd = ["storage_controller", "start"] + if timeout_in_seconds is not None: + cmd.append(f"--start-timeout={timeout_in_seconds}s") + if instance_id is not None: + cmd.append(f"--instance-id={instance_id}") + if base_port is not None: + cmd.append(f"--base-port={base_port}") + return self.raw_cli(cmd) + + def storage_controller_stop(self, immediate: bool, instance_id: Optional[int] = None): + cmd = ["storage_controller", "stop"] + if immediate: + 
cmd.extend(["-m", "immediate"]) + if instance_id is not None: + cmd.append(f"--instance-id={instance_id}") + return self.raw_cli(cmd) + + def pageserver_start( + self, + id: int, + extra_env_vars: Optional[Dict[str, str]] = None, + timeout_in_seconds: Optional[int] = None, + ) -> "subprocess.CompletedProcess[str]": + start_args = ["pageserver", "start", f"--id={id}"] + if timeout_in_seconds is not None: + start_args.append(f"--start-timeout={timeout_in_seconds}s") + return self.raw_cli(start_args, extra_env_vars=extra_env_vars) + + def pageserver_stop(self, id: int, immediate=False) -> "subprocess.CompletedProcess[str]": + cmd = ["pageserver", "stop", f"--id={id}"] + if immediate: + cmd.extend(["-m", "immediate"]) + + log.info(f"Stopping pageserver with {cmd}") + return self.raw_cli(cmd) + + def safekeeper_start( + self, + id: int, + extra_opts: Optional[List[str]] = None, + extra_env_vars: Optional[Dict[str, str]] = None, + timeout_in_seconds: Optional[int] = None, + ) -> "subprocess.CompletedProcess[str]": + if extra_opts is not None: + extra_opts = [f"-e={opt}" for opt in extra_opts] + else: + extra_opts = [] + if timeout_in_seconds is not None: + extra_opts.append(f"--start-timeout={timeout_in_seconds}s") + return self.raw_cli( + ["safekeeper", "start", str(id), *extra_opts], extra_env_vars=extra_env_vars + ) + + def safekeeper_stop( + self, id: Optional[int] = None, immediate=False + ) -> "subprocess.CompletedProcess[str]": + args = ["safekeeper", "stop"] + if id is not None: + args.append(str(id)) + if immediate: + args.extend(["-m", "immediate"]) + return self.raw_cli(args) + + def broker_start( + self, timeout_in_seconds: Optional[int] = None + ) -> "subprocess.CompletedProcess[str]": + cmd = ["storage_broker", "start"] + if timeout_in_seconds is not None: + cmd.append(f"--start-timeout={timeout_in_seconds}s") + return self.raw_cli(cmd) + + def broker_stop(self) -> "subprocess.CompletedProcess[str]": + cmd = ["storage_broker", "stop"] + return 
self.raw_cli(cmd) + + def endpoint_create( + self, + branch_name: str, + pg_port: int, + http_port: int, + tenant_id: TenantId, + pg_version: PgVersion, + endpoint_id: Optional[str] = None, + hot_standby: bool = False, + lsn: Optional[Lsn] = None, + pageserver_id: Optional[int] = None, + allow_multiple=False, + ) -> "subprocess.CompletedProcess[str]": + args = [ + "endpoint", + "create", + "--tenant-id", + str(tenant_id), + "--branch-name", + branch_name, + "--pg-version", + pg_version, + ] + if lsn is not None: + args.extend(["--lsn", str(lsn)]) + if pg_port is not None: + args.extend(["--pg-port", str(pg_port)]) + if http_port is not None: + args.extend(["--http-port", str(http_port)]) + if endpoint_id is not None: + args.append(endpoint_id) + if hot_standby: + args.extend(["--hot-standby", "true"]) + if pageserver_id is not None: + args.extend(["--pageserver-id", str(pageserver_id)]) + if allow_multiple: + args.extend(["--allow-multiple"]) + + res = self.raw_cli(args) + res.check_returncode() + return res + + def endpoint_start( + self, + endpoint_id: str, + safekeepers: Optional[List[int]] = None, + remote_ext_config: Optional[str] = None, + pageserver_id: Optional[int] = None, + allow_multiple=False, + basebackup_request_tries: Optional[int] = None, + ) -> "subprocess.CompletedProcess[str]": + args = [ + "endpoint", + "start", + ] + extra_env_vars = {} + if basebackup_request_tries is not None: + extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries) + if remote_ext_config is not None: + args.extend(["--remote-ext-config", remote_ext_config]) + + if safekeepers is not None: + args.extend(["--safekeepers", (",".join(map(str, safekeepers)))]) + if endpoint_id is not None: + args.append(endpoint_id) + if pageserver_id is not None: + args.extend(["--pageserver-id", str(pageserver_id)]) + if allow_multiple: + args.extend(["--allow-multiple"]) + + res = self.raw_cli(args, extra_env_vars) + res.check_returncode() + return res + + def 
endpoint_reconfigure( + self, + endpoint_id: str, + tenant_id: Optional[TenantId] = None, + pageserver_id: Optional[int] = None, + safekeepers: Optional[List[int]] = None, + check_return_code=True, + ) -> "subprocess.CompletedProcess[str]": + args = ["endpoint", "reconfigure", endpoint_id] + if tenant_id is not None: + args.extend(["--tenant-id", str(tenant_id)]) + if pageserver_id is not None: + args.extend(["--pageserver-id", str(pageserver_id)]) + if safekeepers is not None: + args.extend(["--safekeepers", (",".join(map(str, safekeepers)))]) + return self.raw_cli(args, check_return_code=check_return_code) + + def endpoint_stop( + self, + endpoint_id: str, + destroy=False, + check_return_code=True, + mode: Optional[str] = None, + ) -> "subprocess.CompletedProcess[str]": + args = [ + "endpoint", + "stop", + ] + if destroy: + args.append("--destroy") + if mode is not None: + args.append(f"--mode={mode}") + if endpoint_id is not None: + args.append(endpoint_id) + + return self.raw_cli(args, check_return_code=check_return_code) + + def map_branch( + self, name: str, tenant_id: TenantId, timeline_id: TimelineId + ) -> "subprocess.CompletedProcess[str]": + """ + Map tenant id and timeline id to a neon_local branch name. They do not have to exist. + Usually needed when creating branches via PageserverHttpClient and not neon_local. + + After creating a name mapping, you can use EndpointFactory.create_start + with this registered branch name. 
+ """ + args = [ + "mappings", + "map", + "--branch-name", + name, + "--tenant-id", + str(tenant_id), + "--timeline-id", + str(timeline_id), + ] + + return self.raw_cli(args, check_return_code=True) + + def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]": + return self.raw_cli(["start"], check_return_code=check_return_code) + + def stop(self, check_return_code=True) -> "subprocess.CompletedProcess[str]": + return self.raw_cli(["stop"], check_return_code=check_return_code) + + +class WalCraft(AbstractNeonCli): + """ + A typed wrapper around the `wal_craft` CLI tool. + Supports main commands via typed methods and a way to run arbitrary command directly via CLI. + """ + + COMMAND = "wal_craft" + + def postgres_config(self) -> List[str]: + res = self.raw_cli(["print-postgres-config"]) + res.check_returncode() + return res.stdout.split("\n") + + def in_existing(self, type: str, connection: str) -> None: + res = self.raw_cli(["in-existing", type, connection]) + res.check_returncode() + + +class Pagectl(AbstractNeonCli): + """ + A typed wrapper around the `pagectl` utility CLI tool. 
+ """ + + COMMAND = "pagectl" + + def dump_index_part(self, path: Path) -> IndexPartDump: + res = self.raw_cli(["index-part", "dump", str(path)]) + res.check_returncode() + parsed = json.loads(res.stdout) + return IndexPartDump.from_json(parsed) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 419208cadf..c2c1b6c100 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -9,8 +9,6 @@ import os import re import shutil import subprocess -import tempfile -import textwrap import threading import time import uuid @@ -21,7 +19,6 @@ from datetime import datetime from enum import Enum from fcntl import LOCK_EX, LOCK_UN, flock from functools import cached_property -from itertools import chain, product from pathlib import Path from types import TracebackType from typing import ( @@ -64,11 +61,12 @@ from fixtures.common_types import Lsn, NodeId, TenantId, TenantShardId, Timeline from fixtures.endpoint.http import EndpointHttpClient from fixtures.log_helper import log from fixtures.metrics import Metrics, MetricsGetter, parse_metrics +from fixtures.neon_cli import NeonLocalCli, Pagectl from fixtures.pageserver.allowed_errors import ( DEFAULT_PAGESERVER_ALLOWED_ERRORS, DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS, ) -from fixtures.pageserver.common_types import IndexPartDump, LayerName, parse_layer_file_name +from fixtures.pageserver.common_types import LayerName, parse_layer_file_name from fixtures.pageserver.http import PageserverHttpClient from fixtures.pageserver.utils import ( wait_for_last_record_lsn, @@ -952,7 +950,7 @@ class NeonEnv: initial_tenant - tenant ID of the initial tenant created in the repository - neon_cli - can be used to run the 'neon' CLI tool + neon_cli - can be used to run the 'neon_local' CLI tool create_tenant() - initializes a new tenant and an initial empty timeline on it, returns the tenant and timeline id @@ -972,8 +970,6 @@ class NeonEnv: self.rust_log_override 
= config.rust_log_override self.port_distributor = config.port_distributor self.s3_mock_server = config.mock_s3_server - self.neon_cli = NeonCli(env=self) - self.pagectl = Pagectl(env=self) self.endpoints = EndpointFactory(self) self.safekeepers: List[Safekeeper] = [] self.pageservers: List[NeonPageserver] = [] @@ -993,6 +989,21 @@ class NeonEnv: self.initial_tenant = config.initial_tenant self.initial_timeline = config.initial_timeline + neon_local_env_vars = {} + if self.rust_log_override is not None: + neon_local_env_vars["RUST_LOG"] = self.rust_log_override + self.neon_cli = NeonLocalCli( + extra_env=neon_local_env_vars, + binpath=self.neon_local_binpath, + repo_dir=self.repo_dir, + pg_distrib_dir=self.pg_distrib_dir, + ) + + pagectl_env_vars = {} + if self.rust_log_override is not None: + pagectl_env_vars["RUST_LOG"] = self.rust_log_override + self.pagectl = Pagectl(extra_env=pagectl_env_vars, binpath=self.neon_binpath) + # The URL for the pageserver to use as its control_plane_api config if config.storage_controller_port_override is not None: log.info( @@ -1499,592 +1510,6 @@ class PageserverPort: http: int -class AbstractNeonCli(abc.ABC): - """ - A typed wrapper around an arbitrary Neon CLI tool. - Supports a way to run arbitrary command directly via CLI. - Do not use directly, use specific subclasses instead. - """ - - def __init__(self, env: NeonEnv): - self.env = env - - COMMAND: str = cast(str, None) # To be overwritten by the derived class. - - def raw_cli( - self, - arguments: List[str], - extra_env_vars: Optional[Dict[str, str]] = None, - check_return_code=True, - timeout=None, - local_binpath=False, - ) -> "subprocess.CompletedProcess[str]": - """ - Run the command with the specified arguments. - - Arguments must be in list form, e.g. ['pg', 'create'] - - Return both stdout and stderr, which can be accessed as - - >>> result = env.neon_cli.raw_cli(...) 
- >>> assert result.stderr == "" - >>> log.info(result.stdout) - - If `check_return_code`, on non-zero exit code logs failure and raises. - - If `local_binpath` is true, then we are invoking a test utility - """ - - assert isinstance(arguments, list) - assert isinstance(self.COMMAND, str) - - if local_binpath: - # Test utility - bin_neon = str(self.env.neon_local_binpath / self.COMMAND) - else: - # Normal binary - bin_neon = str(self.env.neon_binpath / self.COMMAND) - - args = [bin_neon] + arguments - log.info('Running command "{}"'.format(" ".join(args))) - - env_vars = os.environ.copy() - env_vars["NEON_REPO_DIR"] = str(self.env.repo_dir) - env_vars["POSTGRES_DISTRIB_DIR"] = str(self.env.pg_distrib_dir) - if self.env.rust_log_override is not None: - env_vars["RUST_LOG"] = self.env.rust_log_override - for extra_env_key, extra_env_value in (extra_env_vars or {}).items(): - env_vars[extra_env_key] = extra_env_value - - # Pass coverage settings - var = "LLVM_PROFILE_FILE" - val = os.environ.get(var) - if val: - env_vars[var] = val - - # Intercept CalledProcessError and print more info - try: - res = subprocess.run( - args, - env=env_vars, - check=False, - universal_newlines=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - timeout=timeout, - ) - except subprocess.TimeoutExpired as e: - if e.stderr: - stderr = e.stderr.decode(errors="replace") - else: - stderr = "" - - if e.stdout: - stdout = e.stdout.decode(errors="replace") - else: - stdout = "" - - log.warn(f"CLI timeout: stderr={stderr}, stdout={stdout}") - raise - - indent = " " - if not res.returncode: - stripped = res.stdout.strip() - lines = stripped.splitlines() - if len(lines) < 2: - log.debug(f"Run {res.args} success: {stripped}") - else: - log.debug("Run %s success:\n%s" % (res.args, textwrap.indent(stripped, indent))) - elif check_return_code: - # this way command output will be in recorded and shown in CI in failure message - indent = indent * 2 - msg = textwrap.dedent( - """\ - Run %s failed: 
- stdout: - %s - stderr: - %s - """ - ) - msg = msg % ( - res.args, - textwrap.indent(res.stdout.strip(), indent), - textwrap.indent(res.stderr.strip(), indent), - ) - log.info(msg) - raise RuntimeError(msg) from subprocess.CalledProcessError( - res.returncode, res.args, res.stdout, res.stderr - ) - return res - - -class NeonCli(AbstractNeonCli): - """ - A typed wrapper around the `neon` CLI tool. - Supports main commands via typed methods and a way to run arbitrary command directly via CLI. - """ - - COMMAND = "neon_local" - - def raw_cli(self, *args, **kwargs) -> subprocess.CompletedProcess[str]: - kwargs["local_binpath"] = True - return super().raw_cli(*args, **kwargs) - - def create_tenant( - self, - tenant_id: TenantId, - timeline_id: TimelineId, - pg_version: PgVersion, - conf: Optional[Dict[str, Any]] = None, - shard_count: Optional[int] = None, - shard_stripe_size: Optional[int] = None, - placement_policy: Optional[str] = None, - set_default: bool = False, - aux_file_policy: Optional[AuxFileStore] = None, - ): - """ - Creates a new tenant, returns its id and its initial timeline's id. 
- """ - args = [ - "tenant", - "create", - "--tenant-id", - str(tenant_id), - "--timeline-id", - str(timeline_id), - "--pg-version", - pg_version, - ] - if conf is not None: - args.extend( - chain.from_iterable( - product(["-c"], (f"{key}:{value}" for key, value in conf.items())) - ) - ) - - if aux_file_policy is AuxFileStore.V2: - args.extend(["-c", "switch_aux_file_policy:v2"]) - elif aux_file_policy is AuxFileStore.V1: - args.extend(["-c", "switch_aux_file_policy:v1"]) - elif aux_file_policy is AuxFileStore.CrossValidation: - args.extend(["-c", "switch_aux_file_policy:cross-validation"]) - - if set_default: - args.append("--set-default") - - if shard_count is not None: - args.extend(["--shard-count", str(shard_count)]) - - if shard_stripe_size is not None: - args.extend(["--shard-stripe-size", str(shard_stripe_size)]) - - if placement_policy is not None: - args.extend(["--placement-policy", str(placement_policy)]) - - res = self.raw_cli(args) - res.check_returncode() - - def import_tenant(self, tenant_id: TenantId): - args = ["tenant", "import", "--tenant-id", str(tenant_id)] - res = self.raw_cli(args) - res.check_returncode() - - def set_default(self, tenant_id: TenantId): - """ - Update default tenant for future operations that require tenant_id. - """ - res = self.raw_cli(["tenant", "set-default", "--tenant-id", str(tenant_id)]) - res.check_returncode() - - def config_tenant(self, tenant_id: TenantId, conf: Dict[str, str]): - """ - Update tenant config. 
- """ - - args = ["tenant", "config", "--tenant-id", str(tenant_id)] - if conf is not None: - args.extend( - chain.from_iterable( - product(["-c"], (f"{key}:{value}" for key, value in conf.items())) - ) - ) - - res = self.raw_cli(args) - res.check_returncode() - - def list_tenants(self) -> "subprocess.CompletedProcess[str]": - res = self.raw_cli(["tenant", "list"]) - res.check_returncode() - return res - - def create_timeline( - self, - new_branch_name: str, - tenant_id: TenantId, - timeline_id: TimelineId, - pg_version: PgVersion, - ) -> TimelineId: - if timeline_id is None: - timeline_id = TimelineId.generate() - - cmd = [ - "timeline", - "create", - "--branch-name", - new_branch_name, - "--tenant-id", - str(tenant_id), - "--timeline-id", - str(timeline_id), - "--pg-version", - pg_version, - ] - - res = self.raw_cli(cmd) - res.check_returncode() - - return timeline_id - - def create_branch( - self, - tenant_id: TenantId, - timeline_id: TimelineId, - new_branch_name: str = DEFAULT_BRANCH_NAME, - ancestor_branch_name: Optional[str] = None, - ancestor_start_lsn: Optional[Lsn] = None, - ): - cmd = [ - "timeline", - "branch", - "--branch-name", - new_branch_name, - "--timeline-id", - str(timeline_id), - "--tenant-id", - str(tenant_id), - ] - if ancestor_branch_name is not None: - cmd.extend(["--ancestor-branch-name", ancestor_branch_name]) - if ancestor_start_lsn is not None: - cmd.extend(["--ancestor-start-lsn", str(ancestor_start_lsn)]) - - res = self.raw_cli(cmd) - res.check_returncode() - - def list_timelines(self, tenant_id: Optional[TenantId] = None) -> List[Tuple[str, TimelineId]]: - """ - Returns a list of (branch_name, timeline_id) tuples out of parsed `neon timeline list` CLI output. 
- """ - - # main [b49f7954224a0ad25cc0013ea107b54b] - # ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540] - TIMELINE_DATA_EXTRACTOR: re.Pattern = re.compile( # type: ignore[type-arg] - r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE - ) - res = self.raw_cli( - ["timeline", "list", "--tenant-id", str(tenant_id or self.env.initial_tenant)] - ) - timelines_cli = sorted( - map( - lambda branch_and_id: (branch_and_id[0], TimelineId(branch_and_id[1])), - TIMELINE_DATA_EXTRACTOR.findall(res.stdout), - ) - ) - return timelines_cli - - def init( - self, - init_config: Dict[str, Any], - force: Optional[str] = None, - ) -> "subprocess.CompletedProcess[str]": - with tempfile.NamedTemporaryFile(mode="w+") as init_config_tmpfile: - init_config_tmpfile.write(toml.dumps(init_config)) - init_config_tmpfile.flush() - - cmd = [ - "init", - f"--config={init_config_tmpfile.name}", - ] - - if force is not None: - cmd.extend(["--force", force]) - - res = self.raw_cli(cmd) - res.check_returncode() - return res - - def storage_controller_start( - self, - timeout_in_seconds: Optional[int] = None, - instance_id: Optional[int] = None, - base_port: Optional[int] = None, - ): - cmd = ["storage_controller", "start"] - if timeout_in_seconds is not None: - cmd.append(f"--start-timeout={timeout_in_seconds}s") - if instance_id is not None: - cmd.append(f"--instance-id={instance_id}") - if base_port is not None: - cmd.append(f"--base-port={base_port}") - return self.raw_cli(cmd) - - def storage_controller_stop(self, immediate: bool, instance_id: Optional[int] = None): - cmd = ["storage_controller", "stop"] - if immediate: - cmd.extend(["-m", "immediate"]) - if instance_id is not None: - cmd.append(f"--instance-id={instance_id}") - return self.raw_cli(cmd) - - def pageserver_start( - self, - id: int, - extra_env_vars: Optional[Dict[str, str]] = None, - timeout_in_seconds: Optional[int] = None, - ) -> "subprocess.CompletedProcess[str]": - start_args = ["pageserver", "start", 
f"--id={id}"] - if timeout_in_seconds is not None: - start_args.append(f"--start-timeout={timeout_in_seconds}s") - storage = self.env.pageserver_remote_storage - - if isinstance(storage, S3Storage): - s3_env_vars = storage.access_env_vars() - extra_env_vars = (extra_env_vars or {}) | s3_env_vars - - return self.raw_cli(start_args, extra_env_vars=extra_env_vars) - - def pageserver_stop(self, id: int, immediate=False) -> "subprocess.CompletedProcess[str]": - cmd = ["pageserver", "stop", f"--id={id}"] - if immediate: - cmd.extend(["-m", "immediate"]) - - log.info(f"Stopping pageserver with {cmd}") - return self.raw_cli(cmd) - - def safekeeper_start( - self, - id: int, - extra_opts: Optional[List[str]] = None, - timeout_in_seconds: Optional[int] = None, - ) -> "subprocess.CompletedProcess[str]": - s3_env_vars = None - if isinstance(self.env.safekeepers_remote_storage, S3Storage): - s3_env_vars = self.env.safekeepers_remote_storage.access_env_vars() - - if extra_opts is not None: - extra_opts = [f"-e={opt}" for opt in extra_opts] - else: - extra_opts = [] - if timeout_in_seconds is not None: - extra_opts.append(f"--start-timeout={timeout_in_seconds}s") - return self.raw_cli( - ["safekeeper", "start", str(id), *extra_opts], extra_env_vars=s3_env_vars - ) - - def safekeeper_stop( - self, id: Optional[int] = None, immediate=False - ) -> "subprocess.CompletedProcess[str]": - args = ["safekeeper", "stop"] - if id is not None: - args.append(str(id)) - if immediate: - args.extend(["-m", "immediate"]) - return self.raw_cli(args) - - def broker_start( - self, timeout_in_seconds: Optional[int] = None - ) -> "subprocess.CompletedProcess[str]": - cmd = ["storage_broker", "start"] - if timeout_in_seconds is not None: - cmd.append(f"--start-timeout={timeout_in_seconds}s") - return self.raw_cli(cmd) - - def broker_stop(self) -> "subprocess.CompletedProcess[str]": - cmd = ["storage_broker", "stop"] - return self.raw_cli(cmd) - - def endpoint_create( - self, - branch_name: str, - 
pg_port: int, - http_port: int, - tenant_id: TenantId, - pg_version: PgVersion, - endpoint_id: Optional[str] = None, - hot_standby: bool = False, - lsn: Optional[Lsn] = None, - pageserver_id: Optional[int] = None, - allow_multiple=False, - ) -> "subprocess.CompletedProcess[str]": - args = [ - "endpoint", - "create", - "--tenant-id", - str(tenant_id), - "--branch-name", - branch_name, - "--pg-version", - pg_version, - ] - if lsn is not None: - args.extend(["--lsn", str(lsn)]) - if pg_port is not None: - args.extend(["--pg-port", str(pg_port)]) - if http_port is not None: - args.extend(["--http-port", str(http_port)]) - if endpoint_id is not None: - args.append(endpoint_id) - if hot_standby: - args.extend(["--hot-standby", "true"]) - if pageserver_id is not None: - args.extend(["--pageserver-id", str(pageserver_id)]) - if allow_multiple: - args.extend(["--allow-multiple"]) - - res = self.raw_cli(args) - res.check_returncode() - return res - - def endpoint_start( - self, - endpoint_id: str, - safekeepers: Optional[List[int]] = None, - remote_ext_config: Optional[str] = None, - pageserver_id: Optional[int] = None, - allow_multiple=False, - basebackup_request_tries: Optional[int] = None, - ) -> "subprocess.CompletedProcess[str]": - args = [ - "endpoint", - "start", - ] - extra_env_vars = {} - if basebackup_request_tries is not None: - extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries) - if remote_ext_config is not None: - args.extend(["--remote-ext-config", remote_ext_config]) - - if safekeepers is not None: - args.extend(["--safekeepers", (",".join(map(str, safekeepers)))]) - if endpoint_id is not None: - args.append(endpoint_id) - if pageserver_id is not None: - args.extend(["--pageserver-id", str(pageserver_id)]) - if allow_multiple: - args.extend(["--allow-multiple"]) - - res = self.raw_cli(args, extra_env_vars) - res.check_returncode() - return res - - def endpoint_reconfigure( - self, - endpoint_id: str, - tenant_id: 
Optional[TenantId] = None, - pageserver_id: Optional[int] = None, - safekeepers: Optional[List[int]] = None, - check_return_code=True, - ) -> "subprocess.CompletedProcess[str]": - args = ["endpoint", "reconfigure", endpoint_id] - if tenant_id is not None: - args.extend(["--tenant-id", str(tenant_id)]) - if pageserver_id is not None: - args.extend(["--pageserver-id", str(pageserver_id)]) - if safekeepers is not None: - args.extend(["--safekeepers", (",".join(map(str, safekeepers)))]) - return self.raw_cli(args, check_return_code=check_return_code) - - def endpoint_stop( - self, - endpoint_id: str, - destroy=False, - check_return_code=True, - mode: Optional[str] = None, - ) -> "subprocess.CompletedProcess[str]": - args = [ - "endpoint", - "stop", - ] - if destroy: - args.append("--destroy") - if mode is not None: - args.append(f"--mode={mode}") - if endpoint_id is not None: - args.append(endpoint_id) - - return self.raw_cli(args, check_return_code=check_return_code) - - def map_branch( - self, name: str, tenant_id: TenantId, timeline_id: TimelineId - ) -> "subprocess.CompletedProcess[str]": - """ - Map tenant id and timeline id to a neon_local branch name. They do not have to exist. - Usually needed when creating branches via PageserverHttpClient and not neon_local. - - After creating a name mapping, you can use EndpointFactory.create_start - with this registered branch name. - """ - args = [ - "mappings", - "map", - "--branch-name", - name, - "--tenant-id", - str(tenant_id), - "--timeline-id", - str(timeline_id), - ] - - return self.raw_cli(args, check_return_code=True) - - def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]": - return self.raw_cli(["start"], check_return_code=check_return_code) - - def stop(self, check_return_code=True) -> "subprocess.CompletedProcess[str]": - return self.raw_cli(["stop"], check_return_code=check_return_code) - - -class WalCraft(AbstractNeonCli): - """ - A typed wrapper around the `wal_craft` CLI tool. 
- Supports main commands via typed methods and a way to run arbitrary command directly via CLI. - """ - - COMMAND = "wal_craft" - - def postgres_config(self) -> List[str]: - res = self.raw_cli(["print-postgres-config"]) - res.check_returncode() - return res.stdout.split("\n") - - def in_existing(self, type: str, connection: str) -> None: - res = self.raw_cli(["in-existing", type, connection]) - res.check_returncode() - - -class ComputeCtl(AbstractNeonCli): - """ - A typed wrapper around the `compute_ctl` CLI tool. - """ - - COMMAND = "compute_ctl" - - -class Pagectl(AbstractNeonCli): - """ - A typed wrapper around the `pagectl` utility CLI tool. - """ - - COMMAND = "pagectl" - - def dump_index_part(self, path: Path) -> IndexPartDump: - res = self.raw_cli(["index-part", "dump", str(path)]) - res.check_returncode() - parsed = json.loads(res.stdout) - return IndexPartDump.from_json(parsed) - - class LogUtils: """ A mixin class which provides utilities for inspecting the logs of a service. @@ -3002,6 +2427,10 @@ class NeonPageserver(PgProtocol, LogUtils): """ assert self.running is False + storage = self.env.pageserver_remote_storage + if isinstance(storage, S3Storage): + s3_env_vars = storage.access_env_vars() + extra_env_vars = (extra_env_vars or {}) | s3_env_vars self.env.neon_cli.pageserver_start( self.id, extra_env_vars=extra_env_vars, timeout_in_seconds=timeout_in_seconds ) @@ -4465,8 +3894,16 @@ class Safekeeper(LogUtils): extra_opts = self.extra_opts assert self.running is False + + s3_env_vars = None + if isinstance(self.env.safekeepers_remote_storage, S3Storage): + s3_env_vars = self.env.safekeepers_remote_storage.access_env_vars() + self.env.neon_cli.safekeeper_start( - self.id, extra_opts=extra_opts, timeout_in_seconds=timeout_in_seconds + self.id, + extra_opts=extra_opts, + timeout_in_seconds=timeout_in_seconds, + extra_env_vars=s3_env_vars, ) self.running = True # wait for wal acceptor start by checking its status @@ -5376,9 +4813,9 @@ def 
import_timeline_from_vanilla_postgres( """ # Take backup of the existing PostgreSQL server with pg_basebackup - basebackup_dir = os.path.join(test_output_dir, "basebackup") - base_tar = os.path.join(basebackup_dir, "base.tar") - wal_tar = os.path.join(basebackup_dir, "pg_wal.tar") + basebackup_dir = test_output_dir / "basebackup" + base_tar = basebackup_dir / "base.tar" + wal_tar = basebackup_dir / "pg_wal.tar" os.mkdir(basebackup_dir) pg_bin.run( [ @@ -5388,40 +4825,28 @@ def import_timeline_from_vanilla_postgres( "-d", vanilla_pg_connstr, "-D", - basebackup_dir, + str(basebackup_dir), ] ) # Extract start_lsn and end_lsn form the backup manifest file with open(os.path.join(basebackup_dir, "backup_manifest")) as f: manifest = json.load(f) - start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"] - end_lsn = manifest["WAL-Ranges"][0]["End-LSN"] + start_lsn = Lsn(manifest["WAL-Ranges"][0]["Start-LSN"]) + end_lsn = Lsn(manifest["WAL-Ranges"][0]["End-LSN"]) # Import the backup tarballs into the pageserver - env.neon_cli.raw_cli( - [ - "timeline", - "import", - "--tenant-id", - str(tenant_id), - "--timeline-id", - str(timeline_id), - "--branch-name", - branch_name, - "--base-lsn", - start_lsn, - "--base-tarfile", - base_tar, - "--end-lsn", - end_lsn, - "--wal-tarfile", - wal_tar, - "--pg-version", - env.pg_version, - ] + env.neon_cli.timeline_import( + tenant_id=tenant_id, + timeline_id=timeline_id, + new_branch_name=branch_name, + base_lsn=start_lsn, + base_tarfile=base_tar, + end_lsn=end_lsn, + wal_tarfile=wal_tar, + pg_version=env.pg_version, ) - wait_for_last_record_lsn(env.pageserver.http_client(), tenant_id, timeline_id, Lsn(end_lsn)) + wait_for_last_record_lsn(env.pageserver.http_client(), tenant_id, timeline_id, end_lsn) def last_flush_lsn_upload( diff --git a/test_runner/fixtures/pageserver/remote_storage.py b/test_runner/fixtures/pageserver/remote_storage.py index 0c3612716a..bc54fc4c8d 100644 --- a/test_runner/fixtures/pageserver/remote_storage.py +++ 
b/test_runner/fixtures/pageserver/remote_storage.py @@ -7,7 +7,7 @@ from pathlib import Path from typing import Any, List, Tuple from fixtures.common_types import TenantId, TimelineId -from fixtures.neon_fixtures import NeonEnv, Pagectl +from fixtures.neon_fixtures import NeonEnv from fixtures.pageserver.common_types import ( InvalidFileName, parse_layer_file_name, @@ -35,7 +35,7 @@ def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: Te for file in tl.iterdir(): shutil.copy2(file, dst_tl_dir) if "__" in file.name: - Pagectl(env).raw_cli( + env.pagectl.raw_cli( [ "layer", "rewrite-summary", diff --git a/test_runner/regress/test_crafted_wal_end.py b/test_runner/regress/test_crafted_wal_end.py index aeefa66bbc..71369ab131 100644 --- a/test_runner/regress/test_crafted_wal_end.py +++ b/test_runner/regress/test_crafted_wal_end.py @@ -1,6 +1,7 @@ import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnvBuilder, WalCraft +from fixtures.neon_cli import WalCraft +from fixtures.neon_fixtures import NeonEnvBuilder # Restart nodes with WAL end having specially crafted shape, like last record # crossing segment boundary, to test decoding issues. 
@@ -27,7 +28,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str): ) endpoint = env.endpoints.create("test_crafted_wal_end") - wal_craft = WalCraft(env) + wal_craft = WalCraft(extra_env=None, binpath=env.neon_binpath) endpoint.config(wal_craft.postgres_config()) endpoint.start() res = endpoint.safe_psql_many( diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index 19501c9f73..87b44e4e3e 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -98,27 +98,15 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build ) def import_tar(base, wal): - env.neon_cli.raw_cli( - [ - "timeline", - "import", - "--tenant-id", - str(tenant), - "--timeline-id", - str(timeline), - "--branch-name", - branch_name, - "--base-lsn", - start_lsn, - "--base-tarfile", - base, - "--end-lsn", - end_lsn, - "--wal-tarfile", - wal, - "--pg-version", - env.pg_version, - ] + env.neon_cli.timeline_import( + tenant_id=tenant, + timeline_id=timeline, + new_branch_name=branch_name, + base_tarfile=base, + base_lsn=start_lsn, + wal_tarfile=wal, + end_lsn=end_lsn, + pg_version=env.pg_version, ) # Importing empty file fails @@ -268,23 +256,13 @@ def _import( branch_name = "import_from_pageserver" client = env.pageserver.http_client() env.pageserver.tenant_create(tenant) - env.neon_cli.raw_cli( - [ - "timeline", - "import", - "--tenant-id", - str(tenant), - "--timeline-id", - str(timeline), - "--branch-name", - branch_name, - "--base-lsn", - str(lsn), - "--base-tarfile", - str(tar_output_file), - "--pg-version", - env.pg_version, - ] + env.neon_cli.timeline_import( + tenant_id=tenant, + timeline_id=timeline, + new_branch_name=branch_name, + base_lsn=lsn, + base_tarfile=tar_output_file, + pg_version=env.pg_version, ) # Wait for data to land in s3 diff --git a/test_runner/regress/test_neon_cli.py b/test_runner/regress/test_neon_cli.py index f73b7b49dc..f692b8cf07 100644 --- 
a/test_runner/regress/test_neon_cli.py +++ b/test_runner/regress/test_neon_cli.py @@ -54,7 +54,10 @@ def test_cli_timeline_list(neon_simple_env: NeonEnv): helper_compare_timeline_list(pageserver_http_client, env, env.initial_tenant) # Check that all new branches are visible via CLI - timelines_cli = [timeline_id for (_, timeline_id) in env.neon_cli.list_timelines()] + timelines_cli = [ + timeline_id + for (_, timeline_id) in env.neon_cli.list_timelines(tenant_id=env.initial_tenant) + ] assert main_timeline_id in timelines_cli assert nested_timeline_id in timelines_cli