## Problem

In our experience running the system so far, almost all of the "hung compute" situations are due to the compute (Postgres) pointing at the wrong pageservers. We currently rely mainly on the Prometheus exporter (PGExporter) running on PG to detect and report any downtime, but it can be unreliable: the read and write probes PGExporter runs do not always generate pageserver requests due to caching, even though a real user touching uncached pages might be experiencing downtime.

We are also about to start disk-wiping node pool rotation operations for our pageservers in prod clusters, and it is critical to have a convenient way to monitor the impact of these rotations so that we can respond quickly to any issues. These metrics should provide very clear signals to address this operational need.

## Summary of changes

Added a pair of metrics to detect issues in Postgres' PageStream protocol communications (e.g. get_page_at_lsn, get_base_backup, etc.) with pageservers:

* On the compute node (compute_ctl), export a counter metric that is incremented every time Postgres requests a configuration refresh. Today, Postgres only requests a configuration refresh when it cannot connect to a pageserver or when the pageserver rejects its request by disconnecting.
* On the pageserver, export a counter metric that is incremented every time it receives a PageStream request that cannot be handled, either because the tenant is not known or because the request was routed to the wrong shard (e.g. a secondary).

### How I plan to use the metrics

I plan to use the metrics added here to create alerts. An alert can fire, for example, if these counters have been increasing continuously for more than a certain period of time. During rollouts, misrouted requests may occasionally happen, but they should die down soon as reconfigurations make progress. We can start with something like raising the alert if the counters have been increasing continuously for over 5 minutes, as sketched below.

## How is this tested?

New integration tests in `test_runner/regress/test_hadron_ps_connectivity_metrics.py`

Co-authored-by: William Huang <william.huang@databricks.com>
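
As a sketch of the intended alert condition (illustrative only: the metric name and Prometheus location are assumptions, not part of this change), "increasing continuously for over 5 minutes" can be checked against the Prometheus range-query API like this:

```python
import time

import requests

PROM_URL = "http://prometheus:9090"  # assumed Prometheus base URL
# Hypothetical metric name; substitute the actual counter exported by compute_ctl.
METRIC = "compute_ctl_pageserver_config_refresh_requests_total"


def counter_kept_increasing(metric: str, minutes: int = 5, step_s: int = 30) -> bool:
    """True if the counter grew in every step-sized sub-window of the last
    `minutes` minutes -- the proposed alert condition."""
    end = time.time()
    start = end - minutes * 60
    resp = requests.get(
        f"{PROM_URL}/api/v1/query_range",
        params={
            # increase() tolerates counter resets, e.g. across compute restarts.
            "query": f"increase({metric}[{step_s}s])",
            "start": start,
            "end": end,
            "step": step_s,
        },
    )
    resp.raise_for_status()
    series = resp.json()["data"]["result"]
    # Fire only if every sampled sub-window saw at least one new refresh request.
    return bool(series) and all(float(v) > 0 for _, v in series[0]["values"])
```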
from __future__ import annotations

import json
import os
import re
import subprocess
import tempfile
import textwrap
from itertools import chain, product
from typing import TYPE_CHECKING, cast

import toml

from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.common_types import IndexPartDump

if TYPE_CHECKING:
    from pathlib import Path
    from typing import (
        Any,
    )

    from fixtures.endpoint.http import ComputeClaimsScope
    from fixtures.pg_version import PgVersion


# Used to be an ABC. abc.ABC was removed to appease the linter; the name was kept unchanged.
class AbstractNeonCli:
    """
    A typed wrapper around an arbitrary Neon CLI tool.
    Supports running arbitrary commands directly via the CLI.
    Do not use directly; use specific subclasses instead.
    """

    def __init__(self, extra_env: dict[str, str] | None, binpath: Path):
        self.extra_env = extra_env
        self.binpath = binpath

    COMMAND: str = cast("str", None)  # To be overwritten by the derived class.

    def raw_cli(
        self,
        arguments: list[str],
        extra_env_vars: dict[str, str] | None = None,
        check_return_code=True,
        timeout=None,
    ) -> subprocess.CompletedProcess[str]:
        """
        Run the command with the specified arguments.

        Arguments must be in list form, e.g. ['endpoint', 'create']

        Return both stdout and stderr, which can be accessed as

        >>> result = env.neon_cli.raw_cli(...)
        >>> assert result.stderr == ""
        >>> log.info(result.stdout)

        If `check_return_code` is set, a non-zero exit code is logged and raised.
        """

        assert isinstance(arguments, list)
        assert isinstance(self.COMMAND, str)

        command_path = str(self.binpath / self.COMMAND)

        args = [command_path] + arguments
        log.info('Running command "{}"'.format(" ".join(args)))

        env_vars = os.environ.copy()

        # extra env
        for extra_env_key, extra_env_value in (self.extra_env or {}).items():
            env_vars[extra_env_key] = extra_env_value
        for extra_env_key, extra_env_value in (extra_env_vars or {}).items():
            env_vars[extra_env_key] = extra_env_value

        # Pass through coverage settings
        var = "LLVM_PROFILE_FILE"
        val = os.environ.get(var)
        if val:
            env_vars[var] = val

        # Intercept CalledProcessError and print more info
        try:
            res = subprocess.run(
                args,
                env=env_vars,
                check=False,
                text=True,
                capture_output=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired as e:
            if e.stderr:
                stderr = e.stderr.decode(errors="replace")
            else:
                stderr = ""

            if e.stdout:
                stdout = e.stdout.decode(errors="replace")
            else:
                stdout = ""

            log.warning(f"CLI timeout: stderr={stderr}, stdout={stdout}")
            raise

        indent = " "
        if not res.returncode:
            stripped = res.stdout.strip()
            lines = stripped.splitlines()
            if len(lines) < 2:
                log.debug(f"Run {res.args} success: {stripped}")
            else:
                log.debug("Run %s success:\n%s", res.args, textwrap.indent(stripped, indent))
        elif check_return_code:
            # This way the command output is recorded and shown in the CI failure message.
            indent = indent * 2
            msg = textwrap.dedent(
                """\
            Run %s failed:
              stdout:
            %s
              stderr:
            %s
            """
            )
            msg = msg % (
                res.args,
                textwrap.indent(res.stdout.strip(), indent),
                textwrap.indent(res.stderr.strip(), indent),
            )
            log.info(msg)
            raise RuntimeError(msg) from subprocess.CalledProcessError(
                res.returncode, res.args, res.stdout, res.stderr
            )
        return res
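
# A minimal usage sketch (hypothetical tool name, for illustration only):
#
#   class MyToolCli(AbstractNeonCli):
#       COMMAND = "my_tool"
#
#   cli = MyToolCli(extra_env=None, binpath=Path("/usr/local/bin"))
#   res = cli.raw_cli(["--version"])  # raises RuntimeError on non-zero exit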


class NeonLocalCli(AbstractNeonCli):
    """A typed wrapper around the `neon_local` CLI tool.
    Supports main commands via typed methods and a way to run arbitrary commands directly via the CLI.

    Note: The methods in this class are supposed to be faithful wrappers of the underlying
    'neon_local' commands. If you're tempted to add any logic here, please consider putting it
    in the caller instead!

    There are a few exceptions where these wrapper methods intentionally differ from the
    underlying commands, however:

    - Many 'neon_local' commands take an optional 'tenant_id' argument and use the default from
      the config file if it's omitted. The corresponding wrappers require an explicit 'tenant_id'
      argument. The idea is that we don't want to rely on the config file's default in tests,
      because NeonEnv has its own 'initial_tenant'. They are currently always the same, but we
      want to rely on the NeonEnv's default instead of the config file default in tests.

    - Similarly, the --pg_version argument is always required in the wrappers, even when it's
      optional in the 'neon_local' command. The default in 'neon_local' is a specific
      hardcoded version, but in tests, we never want to accidentally rely on that; we
      always want to use the version from the test fixtures.

    - Wrappers for commands that create a new tenant or timeline ID require the new tenant
      or timeline ID to be passed by the caller, while the 'neon_local' commands will
      generate a random ID if it's not specified. This is because we don't want to have to
      parse the ID from the 'neon_local' output. Making it required ensures that the
      caller has to generate it.
    """

    COMMAND = "neon_local"

    def __init__(
        self,
        extra_env: dict[str, str] | None,
        binpath: Path,
        repo_dir: Path,
        pg_distrib_dir: Path,
    ):
        if extra_env is None:
            env_vars = {}
        else:
            env_vars = extra_env.copy()
        env_vars["NEON_REPO_DIR"] = str(repo_dir)
        env_vars["POSTGRES_DISTRIB_DIR"] = str(pg_distrib_dir)

        super().__init__(env_vars, binpath)

    def raw_cli(self, *args, **kwargs) -> subprocess.CompletedProcess[str]:
        return super().raw_cli(*args, **kwargs)

    def tenant_create(
        self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        pg_version: PgVersion,
        conf: dict[str, Any] | None = None,
        shard_count: int | None = None,
        shard_stripe_size: int | None = None,
        placement_policy: str | None = None,
        set_default: bool = False,
    ):
        """
        Creates a new tenant with the given ID and its initial timeline.
        """
        args = [
            "tenant",
            "create",
            "--tenant-id",
            str(tenant_id),
            "--timeline-id",
            str(timeline_id),
            "--pg-version",
            pg_version,
        ]
        if conf is not None:
            for key, value in conf.items():
                if isinstance(value, bool):
                    args.extend(
                        ["-c", f"{key}:{str(value).lower()}"]
                    )  # only accepts true/false, not True/False
                else:
                    args.extend(["-c", f"{key}:{value}"])

        if set_default:
            args.append("--set-default")

        if shard_count is not None:
            args.extend(["--shard-count", str(shard_count)])

        if shard_stripe_size is not None:
            args.extend(["--shard-stripe-size", str(shard_stripe_size)])

        if placement_policy is not None:
            args.extend(["--placement-policy", str(placement_policy)])

        res = self.raw_cli(args)
        res.check_returncode()
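
    # Illustrative call (IDs and conf keys are examples only). Note how boolean
    # conf values are lowercased to the true/false spelling 'neon_local' accepts:
    #
    #   cli.tenant_create(
    #       TenantId.generate(),
    #       TimelineId.generate(),
    #       pg_version,
    #       conf={"lazy_slru_download": True},  # passed as: -c lazy_slru_download:true
    #   )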

    def tenant_import(self, tenant_id: TenantId):
        args = ["tenant", "import", "--tenant-id", str(tenant_id)]
        res = self.raw_cli(args)
        res.check_returncode()

    def tenant_set_default(self, tenant_id: TenantId):
        """
        Update the default tenant for future operations that require a tenant_id.
        """
        res = self.raw_cli(["tenant", "set-default", "--tenant-id", str(tenant_id)])
        res.check_returncode()

    def tenant_config(self, tenant_id: TenantId, conf: dict[str, str]):
        """
        Update tenant config.
        """

        args = ["tenant", "config", "--tenant-id", str(tenant_id)]
        if conf is not None:
            args.extend(
                chain.from_iterable(
                    product(["-c"], (f"{key}:{value}" for key, value in conf.items()))
                )
            )

        res = self.raw_cli(args)
        res.check_returncode()

    def tenant_list(self) -> subprocess.CompletedProcess[str]:
        res = self.raw_cli(["tenant", "list"])
        res.check_returncode()
        return res

    def timeline_create(
        self,
        new_branch_name: str,
        tenant_id: TenantId,
        timeline_id: TimelineId | None,
        pg_version: PgVersion,
    ) -> TimelineId:
        if timeline_id is None:
            timeline_id = TimelineId.generate()

        cmd = [
            "timeline",
            "create",
            "--branch-name",
            new_branch_name,
            "--tenant-id",
            str(tenant_id),
            "--timeline-id",
            str(timeline_id),
            "--pg-version",
            pg_version,
        ]

        res = self.raw_cli(cmd)
        res.check_returncode()

        return timeline_id

    def timeline_branch(
        self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        new_branch_name,
        ancestor_branch_name: str | None = None,
        ancestor_start_lsn: Lsn | None = None,
    ):
        cmd = [
            "timeline",
            "branch",
            "--branch-name",
            new_branch_name,
            "--timeline-id",
            str(timeline_id),
            "--tenant-id",
            str(tenant_id),
        ]
        if ancestor_branch_name is not None:
            cmd.extend(["--ancestor-branch-name", ancestor_branch_name])
        if ancestor_start_lsn is not None:
            cmd.extend(["--ancestor-start-lsn", str(ancestor_start_lsn)])

        res = self.raw_cli(cmd)
        res.check_returncode()

    def timeline_import(
        self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        new_branch_name: str,
        base_lsn: Lsn,
        base_tarfile: Path,
        pg_version: PgVersion,
        end_lsn: Lsn | None = None,
        wal_tarfile: Path | None = None,
    ):
        cmd = [
            "timeline",
            "import",
            "--tenant-id",
            str(tenant_id),
            "--timeline-id",
            str(timeline_id),
            "--pg-version",
            pg_version,
            "--branch-name",
            new_branch_name,
            "--base-lsn",
            str(base_lsn),
            "--base-tarfile",
            str(base_tarfile),
        ]
        if end_lsn is not None:
            cmd.extend(["--end-lsn", str(end_lsn)])
        if wal_tarfile is not None:
            cmd.extend(["--wal-tarfile", str(wal_tarfile)])

        res = self.raw_cli(cmd)
        res.check_returncode()

    def timeline_list(self, tenant_id: TenantId) -> list[tuple[str, TimelineId]]:
        """
        Returns a list of (branch_name, timeline_id) tuples parsed from `neon timeline list` CLI output.
        """

        # main [b49f7954224a0ad25cc0013ea107b54b]
        # ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540]
        TIMELINE_DATA_EXTRACTOR: re.Pattern = re.compile(  # type: ignore[type-arg]
            r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE
        )
        res = self.raw_cli(["timeline", "list", "--tenant-id", str(tenant_id)])
        timelines_cli = sorted(
            map(
                lambda branch_and_id: (branch_and_id[0], TimelineId(branch_and_id[1])),
                TIMELINE_DATA_EXTRACTOR.findall(res.stdout),
            )
        )
        return timelines_cli

    def init(
        self,
        init_config: dict[str, Any],
        force: str | None = None,
    ) -> subprocess.CompletedProcess[str]:
        with tempfile.NamedTemporaryFile(mode="w+") as init_config_tmpfile:
            init_config_tmpfile.write(toml.dumps(init_config))
            init_config_tmpfile.flush()

            cmd = [
                "init",
                f"--config={init_config_tmpfile.name}",
            ]

            if force is not None:
                cmd.extend(["--force", force])

            res = self.raw_cli(cmd)
            res.check_returncode()
        return res
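
    # Illustrative `init_config` shape (keys shown are examples, not a full
    # schema); the dict is serialized to TOML and passed via --config:
    #
    #   cli.init(
    #       {
    #           "default_tenant_id": str(tenant_id),
    #           "pageservers": [{"id": 1, "listen_pg_addr": "127.0.0.1:64000"}],
    #       }
    #   )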

    def storage_controller_start(
        self,
        timeout_in_seconds: int | None = None,
        instance_id: int | None = None,
        base_port: int | None = None,
        handle_ps_local_disk_loss: bool | None = None,
    ):
        cmd = ["storage_controller", "start"]
        if timeout_in_seconds is not None:
            cmd.append(f"--start-timeout={timeout_in_seconds}s")
        if instance_id is not None:
            cmd.append(f"--instance-id={instance_id}")
        if base_port is not None:
            cmd.append(f"--base-port={base_port}")
        if handle_ps_local_disk_loss is not None:
            cmd.append(
                f"--handle-ps-local-disk-loss={'true' if handle_ps_local_disk_loss else 'false'}"
            )
        return self.raw_cli(cmd)

    def storage_controller_stop(self, immediate: bool, instance_id: int | None = None):
        cmd = ["storage_controller", "stop"]
        if immediate:
            cmd.extend(["-m", "immediate"])
        if instance_id is not None:
            cmd.append(f"--instance-id={instance_id}")
        return self.raw_cli(cmd)

    def endpoint_storage_start(self, timeout_in_seconds: int | None = None):
        cmd = ["endpoint-storage", "start"]
        if timeout_in_seconds is not None:
            cmd.append(f"--start-timeout={timeout_in_seconds}s")
        return self.raw_cli(cmd)

    def endpoint_storage_stop(self, immediate: bool):
        cmd = ["endpoint-storage", "stop"]
        if immediate:
            cmd.extend(["-m", "immediate"])
        return self.raw_cli(cmd)

    def pageserver_start(
        self,
        id: int,
        extra_env_vars: dict[str, str] | None = None,
        timeout_in_seconds: int | None = None,
    ) -> subprocess.CompletedProcess[str]:
        start_args = ["pageserver", "start", f"--id={id}"]
        if timeout_in_seconds is not None:
            start_args.append(f"--start-timeout={timeout_in_seconds}s")
        return self.raw_cli(start_args, extra_env_vars=extra_env_vars)

    def pageserver_stop(self, id: int, immediate=False) -> subprocess.CompletedProcess[str]:
        cmd = ["pageserver", "stop", f"--id={id}"]
        if immediate:
            cmd.extend(["-m", "immediate"])

        return self.raw_cli(cmd)

    def safekeeper_start(
        self,
        id: int,
        extra_opts: list[str] | None = None,
        extra_env_vars: dict[str, str] | None = None,
        timeout_in_seconds: int | None = None,
    ) -> subprocess.CompletedProcess[str]:
        if extra_opts is not None:
            extra_opts = [f"-e={opt}" for opt in extra_opts]
        else:
            extra_opts = []
        if timeout_in_seconds is not None:
            extra_opts.append(f"--start-timeout={timeout_in_seconds}s")
        return self.raw_cli(
            ["safekeeper", "start", str(id), *extra_opts], extra_env_vars=extra_env_vars
        )

    def safekeeper_stop(
        self, id: int | None = None, immediate=False
    ) -> subprocess.CompletedProcess[str]:
        args = ["safekeeper", "stop"]
        if id is not None:
            args.append(str(id))
        if immediate:
            args.extend(["-m", "immediate"])
        return self.raw_cli(args)

    def storage_broker_start(
        self, timeout_in_seconds: int | None = None
    ) -> subprocess.CompletedProcess[str]:
        cmd = ["storage_broker", "start"]
        if timeout_in_seconds is not None:
            cmd.append(f"--start-timeout={timeout_in_seconds}s")
        return self.raw_cli(cmd)

    def storage_broker_stop(self) -> subprocess.CompletedProcess[str]:
        cmd = ["storage_broker", "stop"]
        return self.raw_cli(cmd)

    def endpoint_create(
        self,
        branch_name: str,
        pg_port: int,
        external_http_port: int,
        internal_http_port: int,
        tenant_id: TenantId,
        pg_version: PgVersion,
        endpoint_id: str | None = None,
        grpc: bool | None = None,
        hot_standby: bool = False,
        lsn: Lsn | None = None,
        pageserver_id: int | None = None,
        allow_multiple=False,
        update_catalog: bool = False,
        privileged_role_name: str | None = None,
    ) -> subprocess.CompletedProcess[str]:
        args = [
            "endpoint",
            "create",
            "--tenant-id",
            str(tenant_id),
            "--branch-name",
            branch_name,
            "--pg-version",
            pg_version,
        ]
        if lsn is not None:
            args.extend(["--lsn", str(lsn)])
        if pg_port is not None:
            args.extend(["--pg-port", str(pg_port)])
        if external_http_port is not None:
            args.extend(["--external-http-port", str(external_http_port)])
        if internal_http_port is not None:
            args.extend(["--internal-http-port", str(internal_http_port)])
        if grpc:
            args.append("--grpc")
        if endpoint_id is not None:
            args.append(endpoint_id)
        if hot_standby:
            args.extend(["--hot-standby", "true"])
        if pageserver_id is not None:
            args.extend(["--pageserver-id", str(pageserver_id)])
        if allow_multiple:
            args.extend(["--allow-multiple"])
        if update_catalog:
            args.extend(["--update-catalog"])
        if privileged_role_name is not None:
            args.extend(["--privileged-role-name", privileged_role_name])

        res = self.raw_cli(args)
        res.check_returncode()
        return res

    def endpoint_generate_jwt(
        self, endpoint_id: str, scope: ComputeClaimsScope | None = None
    ) -> str:
        """
        Generate a JWT for making requests to the endpoint's external HTTP
        server.
        """
        args = ["endpoint", "generate-jwt", endpoint_id]
        if scope:
            args += ["--scope", str(scope)]

        cmd = self.raw_cli(args)
        cmd.check_returncode()

        return cmd.stdout

    def endpoint_start(
        self,
        endpoint_id: str,
        safekeepers_generation: int | None = None,
        safekeepers: list[int] | None = None,
        remote_ext_base_url: str | None = None,
        pageserver_id: int | None = None,
        allow_multiple: bool = False,
        create_test_user: bool = False,
        basebackup_request_tries: int | None = None,
        timeout: str | None = None,
        env: dict[str, str] | None = None,
        dev: bool = False,
        autoprewarm: bool = False,
        offload_lfc_interval_seconds: int | None = None,
    ) -> subprocess.CompletedProcess[str]:
        args = [
            "endpoint",
            "start",
        ]
        extra_env_vars = env or {}
        if basebackup_request_tries is not None:
            extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = str(
                basebackup_request_tries
            )
        if remote_ext_base_url is not None:
            args.extend(["--remote-ext-base-url", remote_ext_base_url])

        if safekeepers_generation is not None:
            args.extend(["--safekeepers-generation", str(safekeepers_generation)])
        if safekeepers is not None:
            args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
        if endpoint_id is not None:
            args.append(endpoint_id)
        if pageserver_id is not None:
            args.extend(["--pageserver-id", str(pageserver_id)])
        if allow_multiple:
            args.extend(["--allow-multiple"])
        if create_test_user:
            args.extend(["--create-test-user"])
        if timeout is not None:
            args.extend(["--start-timeout", str(timeout)])
        if autoprewarm:
            args.extend(["--autoprewarm"])
        if offload_lfc_interval_seconds is not None:
            args.extend(["--offload-lfc-interval-seconds", str(offload_lfc_interval_seconds)])
        if dev:
            args.extend(["--dev"])

        res = self.raw_cli(args, extra_env_vars)
        res.check_returncode()
        return res

    def endpoint_reconfigure(
        self,
        endpoint_id: str,
        tenant_id: TenantId | None = None,
        pageserver_id: int | None = None,
        safekeepers: list[int] | None = None,
        check_return_code=True,
        timeout_sec: float | None = None,
    ) -> subprocess.CompletedProcess[str]:
        args = ["endpoint", "reconfigure", endpoint_id]
        if tenant_id is not None:
            args.extend(["--tenant-id", str(tenant_id)])
        if pageserver_id is not None:
            args.extend(["--pageserver-id", str(pageserver_id)])
        if safekeepers is not None:
            args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
        return self.raw_cli(args, check_return_code=check_return_code, timeout=timeout_sec)

    def endpoint_refresh_configuration(
        self,
        endpoint_id: str,
    ) -> subprocess.CompletedProcess[str]:
        args = ["endpoint", "refresh-configuration", endpoint_id]
        res = self.raw_cli(args)
        res.check_returncode()
        return res
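
    # Illustrative tie-in to the connectivity metrics above: a test can force
    # compute_ctl to re-request its pageserver configuration and then scrape the
    # refresh counter (see test_runner/regress/test_hadron_ps_connectivity_metrics.py
    # for the real tests; the endpoint id below is an example):
    #
    #   env.neon_cli.endpoint_refresh_configuration("ep-main")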

    def endpoint_stop(
        self,
        endpoint_id: str,
        destroy=False,
        check_return_code=True,
        mode: str | None = None,
    ) -> tuple[Lsn | None, subprocess.CompletedProcess[str]]:
        args = [
            "endpoint",
            "stop",
        ]
        if destroy:
            args.append("--destroy")
        if mode is not None:
            args.append(f"--mode={mode}")
        if endpoint_id is not None:
            args.append(endpoint_id)

        proc = self.raw_cli(args, check_return_code=check_return_code)
        log.debug(f"endpoint stop stdout: {proc.stdout}")
        lsn_str = proc.stdout.split()[-1]
        lsn: Lsn | None = None if lsn_str == "null" else Lsn(lsn_str)
        return lsn, proc

    def endpoint_update_pageservers(
        self,
        endpoint_id: str,
        pageserver_id: int | None = None,
    ) -> subprocess.CompletedProcess[str]:
        args = [
            "endpoint",
            "update-pageservers",
            endpoint_id,
        ]
        if pageserver_id is not None:
            args.extend(["--pageserver-id", str(pageserver_id)])
        res = self.raw_cli(args)
        res.check_returncode()
        return res

    def mappings_map_branch(
        self, name: str, tenant_id: TenantId, timeline_id: TimelineId
    ) -> subprocess.CompletedProcess[str]:
        """
        Map tenant id and timeline id to a neon_local branch name. They do not have to exist.
        Usually needed when creating branches via PageserverHttpClient and not neon_local.

        After creating a name mapping, you can use EndpointFactory.create_start
        with this registered branch name.
        """
        args = [
            "mappings",
            "map",
            "--branch-name",
            name,
            "--tenant-id",
            str(tenant_id),
            "--timeline-id",
            str(timeline_id),
        ]

        return self.raw_cli(args, check_return_code=True)
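
    # Typical flow (ids and names are illustrative; `env.endpoints` is assumed
    # to be the EndpointFactory mentioned in the docstring): create a timeline
    # through the pageserver HTTP client, register a branch name for it, then
    # start an endpoint on that name:
    #
    #   cli.mappings_map_branch("my_branch", tenant_id, timeline_id)
    #   env.endpoints.create_start("my_branch")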

    def start(self, check_return_code=True) -> subprocess.CompletedProcess[str]:
        return self.raw_cli(["start"], check_return_code=check_return_code)

    def stop(self, check_return_code=True) -> subprocess.CompletedProcess[str]:
        return self.raw_cli(["stop"], check_return_code=check_return_code)


class WalCraft(AbstractNeonCli):
    """
    A typed wrapper around the `wal_craft` CLI tool.
    Supports main commands via typed methods and a way to run arbitrary commands directly via the CLI.
    """

    COMMAND = "wal_craft"

    def postgres_config(self) -> list[str]:
        res = self.raw_cli(["print-postgres-config"])
        res.check_returncode()
        return res.stdout.split("\n")

    def in_existing(self, type: str, connection: str) -> None:
        res = self.raw_cli(["in-existing", type, connection])
        res.check_returncode()


class Pagectl(AbstractNeonCli):
    """
    A typed wrapper around the `pagectl` utility CLI tool.
    """

    COMMAND = "pagectl"

    def dump_index_part(self, path: Path) -> IndexPartDump:
        res = self.raw_cli(["index-part", "dump", str(path)])
        res.check_returncode()
        parsed = json.loads(res.stdout)
        return IndexPartDump.from_json(parsed)