diff --git a/test_runner/fixtures/benchmark_fixture.py b/test_runner/fixtures/benchmark_fixture.py index 655ffed90d..338cc47ea2 100644 --- a/test_runner/fixtures/benchmark_fixture.py +++ b/test_runner/fixtures/benchmark_fixture.py @@ -5,7 +5,6 @@ import json import os import re import timeit -import uuid import warnings from contextlib import contextmanager from datetime import datetime @@ -17,6 +16,7 @@ from typing import Iterator, Optional import pytest from _pytest.config import Config from _pytest.terminal import TerminalReporter +from fixtures.types import ZTenantId, ZTimelineId """ This file contains fixtures for micro-benchmarks. @@ -365,11 +365,11 @@ class NeonBenchmarker: assert matches return int(round(float(matches.group(1)))) - def get_timeline_size(self, repo_dir: Path, tenantid: uuid.UUID, timelineid: str): + def get_timeline_size(self, repo_dir: Path, tenantid: ZTenantId, timelineid: ZTimelineId): """ Calculate the on-disk size of a timeline """ - path = "{}/tenants/{}/timelines/{}".format(repo_dir, tenantid.hex, timelineid) + path = "{}/tenants/{}/timelines/{}".format(repo_dir, tenantid, timelineid) totalbytes = 0 for root, dirs, files in os.walk(path): diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index bbc35736bc..9ad9c0cd2f 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -29,20 +29,14 @@ import pytest import requests from cached_property import cached_property from fixtures.log_helper import log +from fixtures.types import Lsn, ZTenantId, ZTimelineId # Type-related stuff from psycopg2.extensions import connection as PgConnection from psycopg2.extensions import make_dsn, parse_dsn from typing_extensions import Literal -from .utils import ( - allure_attach_from_dir, - etcd_path, - get_self_dir, - lsn_from_hex, - lsn_to_hex, - subprocess_capture, -) +from .utils import allure_attach_from_dir, etcd_path, get_self_dir, subprocess_capture """ This file contains pytest fixtures. A fixture is a test resource that can be @@ -378,7 +372,7 @@ class AuthKeys: def generate_tenant_token(self, tenant_id): token = jwt.encode( - {"scope": "tenant", "tenant_id": tenant_id}, self.priv, algorithm="RS256" + {"scope": "tenant", "tenant_id": str(tenant_id)}, self.priv, algorithm="RS256" ) if isinstance(token, bytes): @@ -759,12 +753,12 @@ class NeonEnv: # generate initial tenant ID here instead of letting 'neon init' generate it, # so that we don't need to dig it out of the config file afterwards. 
- self.initial_tenant = uuid.uuid4() + self.initial_tenant = ZTenantId.generate() # Create a config file corresponding to the options toml = textwrap.dedent( f""" - default_tenant_id = '{self.initial_tenant.hex}' + default_tenant_id = '{self.initial_tenant}' """ ) @@ -846,9 +840,9 @@ class NeonEnv: """Get list of safekeeper endpoints suitable for safekeepers GUC""" return ",".join([f"localhost:{wa.port.pg}" for wa in self.safekeepers]) - def timeline_dir(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Path: + def timeline_dir(self, tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Path: """Get a timeline directory's path based on the repo directory of the test environment""" - return self.repo_dir / "tenants" / tenant_id.hex / "timelines" / timeline_id.hex + return self.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) @cached_property def auth_keys(self) -> AuthKeys: @@ -976,11 +970,11 @@ class NeonPageserverHttpClient(requests.Session): assert isinstance(res_json, list) return res_json - def tenant_create(self, new_tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID: + def tenant_create(self, new_tenant_id: Optional[ZTenantId] = None) -> ZTenantId: res = self.post( f"http://localhost:{self.port}/v1/tenant", json={ - "new_tenant_id": new_tenant_id.hex if new_tenant_id else None, + "new_tenant_id": str(new_tenant_id) if new_tenant_id else None, }, ) self.verbose_error(res) @@ -988,25 +982,25 @@ class NeonPageserverHttpClient(requests.Session): raise Exception(f"could not create tenant: already exists for id {new_tenant_id}") new_tenant_id = res.json() assert isinstance(new_tenant_id, str) - return uuid.UUID(new_tenant_id) + return ZTenantId(new_tenant_id) - def tenant_attach(self, tenant_id: uuid.UUID): - res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/attach") + def tenant_attach(self, tenant_id: ZTenantId): + res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach") self.verbose_error(res) - def tenant_detach(self, tenant_id: uuid.UUID): - res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/detach") + def tenant_detach(self, tenant_id: ZTenantId): + res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach") self.verbose_error(res) - def tenant_status(self, tenant_id: uuid.UUID) -> Dict[Any, Any]: - res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}") + def tenant_status(self, tenant_id: ZTenantId) -> Dict[Any, Any]: + res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") self.verbose_error(res) res_json = res.json() assert isinstance(res_json, dict) return res_json - def timeline_list(self, tenant_id: uuid.UUID) -> List[Dict[str, Any]]: - res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline") + def timeline_list(self, tenant_id: ZTenantId) -> List[Dict[str, Any]]: + res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline") self.verbose_error(res) res_json = res.json() assert isinstance(res_json, list) @@ -1014,17 +1008,17 @@ class NeonPageserverHttpClient(requests.Session): def timeline_create( self, - tenant_id: uuid.UUID, - new_timeline_id: Optional[uuid.UUID] = None, - ancestor_timeline_id: Optional[uuid.UUID] = None, - ancestor_start_lsn: Optional[str] = None, + tenant_id: ZTenantId, + new_timeline_id: Optional[ZTimelineId] = None, + ancestor_timeline_id: Optional[ZTimelineId] = None, + ancestor_start_lsn: Optional[Lsn] = None, ) -> Dict[Any, Any]: res = self.post( - 
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline", + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json={ - "new_timeline_id": new_timeline_id.hex if new_timeline_id else None, - "ancestor_start_lsn": ancestor_start_lsn, - "ancestor_timeline_id": ancestor_timeline_id.hex if ancestor_timeline_id else None, + "new_timeline_id": str(new_timeline_id) if new_timeline_id else None, + "ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None, + "ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None, }, ) self.verbose_error(res) @@ -1037,8 +1031,8 @@ class NeonPageserverHttpClient(requests.Session): def timeline_detail( self, - tenant_id: uuid.UUID, - timeline_id: uuid.UUID, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, include_non_incremental_logical_size: bool = False, include_non_incremental_physical_size: bool = False, ) -> Dict[Any, Any]: @@ -1049,7 +1043,7 @@ class NeonPageserverHttpClient(requests.Session): params["include-non-incremental-physical-size"] = "yes" res = self.get( - f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}", + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", params=params, ) self.verbose_error(res) @@ -1057,9 +1051,9 @@ class NeonPageserverHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json - def timeline_delete(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID): + def timeline_delete(self, tenant_id: ZTenantId, timeline_id: ZTimelineId): res = self.delete( - f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}" + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}" ) self.verbose_error(res) res_json = res.json() @@ -1179,38 +1173,52 @@ class NeonCli(AbstractNeonCli): def create_tenant( self, - tenant_id: Optional[uuid.UUID] = None, - timeline_id: Optional[uuid.UUID] = None, + tenant_id: Optional[ZTenantId] = None, + timeline_id: Optional[ZTimelineId] = None, conf: Optional[Dict[str, str]] = None, - ) -> Tuple[uuid.UUID, uuid.UUID]: + ) -> Tuple[ZTenantId, ZTimelineId]: """ Creates a new tenant, returns its id and its initial timeline's id. """ if tenant_id is None: - tenant_id = uuid.uuid4() + tenant_id = ZTenantId.generate() if timeline_id is None: - timeline_id = uuid.uuid4() + timeline_id = ZTimelineId.generate() if conf is None: res = self.raw_cli( - ["tenant", "create", "--tenant-id", tenant_id.hex, "--timeline-id", timeline_id.hex] + [ + "tenant", + "create", + "--tenant-id", + str(tenant_id), + "--timeline-id", + str(timeline_id), + ] ) else: res = self.raw_cli( - ["tenant", "create", "--tenant-id", tenant_id.hex, "--timeline-id", timeline_id.hex] + [ + "tenant", + "create", + "--tenant-id", + str(tenant_id), + "--timeline-id", + str(timeline_id), + ] + sum(list(map(lambda kv: (["-c", kv[0] + ":" + kv[1]]), conf.items())), []) ) res.check_returncode() return tenant_id, timeline_id - def config_tenant(self, tenant_id: uuid.UUID, conf: Dict[str, str]): + def config_tenant(self, tenant_id: ZTenantId, conf: Dict[str, str]): """ Update tenant config. 
""" if conf is None: - res = self.raw_cli(["tenant", "config", "--tenant-id", tenant_id.hex]) + res = self.raw_cli(["tenant", "config", "--tenant-id", str(tenant_id)]) else: res = self.raw_cli( - ["tenant", "config", "--tenant-id", tenant_id.hex] + ["tenant", "config", "--tenant-id", str(tenant_id)] + sum(list(map(lambda kv: (["-c", kv[0] + ":" + kv[1]]), conf.items())), []) ) res.check_returncode() @@ -1221,15 +1229,15 @@ class NeonCli(AbstractNeonCli): return res def create_timeline( - self, new_branch_name: str, tenant_id: Optional[uuid.UUID] = None - ) -> uuid.UUID: + self, new_branch_name: str, tenant_id: Optional[ZTenantId] = None + ) -> ZTimelineId: cmd = [ "timeline", "create", "--branch-name", new_branch_name, "--tenant-id", - (tenant_id or self.env.initial_tenant).hex, + str(tenant_id or self.env.initial_tenant), ] res = self.raw_cli(cmd) @@ -1241,16 +1249,16 @@ class NeonCli(AbstractNeonCli): if matches is not None: created_timeline_id = matches.group("timeline_id") - return uuid.UUID(created_timeline_id) + return ZTimelineId(str(created_timeline_id)) - def create_root_branch(self, branch_name: str, tenant_id: Optional[uuid.UUID] = None): + def create_root_branch(self, branch_name: str, tenant_id: Optional[ZTenantId] = None): cmd = [ "timeline", "create", "--branch-name", branch_name, "--tenant-id", - (tenant_id or self.env.initial_tenant).hex, + str(tenant_id or self.env.initial_tenant), ] res = self.raw_cli(cmd) @@ -1265,27 +1273,27 @@ class NeonCli(AbstractNeonCli): if created_timeline_id is None: raise Exception("could not find timeline id after `neon timeline create` invocation") else: - return uuid.UUID(created_timeline_id) + return ZTimelineId(created_timeline_id) def create_branch( self, new_branch_name: str = DEFAULT_BRANCH_NAME, ancestor_branch_name: Optional[str] = None, - tenant_id: Optional[uuid.UUID] = None, - ancestor_start_lsn: Optional[str] = None, - ) -> uuid.UUID: + tenant_id: Optional[ZTenantId] = None, + ancestor_start_lsn: Optional[Lsn] = None, + ) -> ZTimelineId: cmd = [ "timeline", "branch", "--branch-name", new_branch_name, "--tenant-id", - (tenant_id or self.env.initial_tenant).hex, + str(tenant_id or self.env.initial_tenant), ] if ancestor_branch_name is not None: cmd.extend(["--ancestor-branch-name", ancestor_branch_name]) if ancestor_start_lsn is not None: - cmd.extend(["--ancestor-start-lsn", ancestor_start_lsn]) + cmd.extend(["--ancestor-start-lsn", str(ancestor_start_lsn)]) res = self.raw_cli(cmd) res.check_returncode() @@ -1299,9 +1307,11 @@ class NeonCli(AbstractNeonCli): if created_timeline_id is None: raise Exception("could not find timeline id after `neon timeline create` invocation") else: - return uuid.UUID(created_timeline_id) + return ZTimelineId(str(created_timeline_id)) - def list_timelines(self, tenant_id: Optional[uuid.UUID] = None) -> List[Tuple[str, str]]: + def list_timelines( + self, tenant_id: Optional[ZTenantId] = None + ) -> List[Tuple[str, ZTimelineId]]: """ Returns a list of (branch_name, timeline_id) tuples out of parsed `neon timeline list` CLI output. 
""" @@ -1309,18 +1319,18 @@ class NeonCli(AbstractNeonCli): # (L) main [b49f7954224a0ad25cc0013ea107b54b] # (L) ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540] res = self.raw_cli( - ["timeline", "list", "--tenant-id", (tenant_id or self.env.initial_tenant).hex] + ["timeline", "list", "--tenant-id", str(tenant_id or self.env.initial_tenant)] ) timelines_cli = sorted( map( - lambda branch_and_id: (branch_and_id[0], branch_and_id[1]), + lambda branch_and_id: (branch_and_id[0], ZTimelineId(branch_and_id[1])), TIMELINE_DATA_EXTRACTOR.findall(res.stdout), ) ) return timelines_cli def init( - self, config_toml: str, initial_timeline_id: Optional[uuid.UUID] = None + self, config_toml: str, initial_timeline_id: Optional[ZTimelineId] = None ) -> "subprocess.CompletedProcess[str]": with tempfile.NamedTemporaryFile(mode="w+") as tmp: tmp.write(config_toml) @@ -1328,7 +1338,7 @@ class NeonCli(AbstractNeonCli): cmd = ["init", f"--config={tmp.name}"] if initial_timeline_id: - cmd.extend(["--timeline-id", initial_timeline_id.hex]) + cmd.extend(["--timeline-id", str(initial_timeline_id)]) append_pageserver_param_overrides( params_to_update=cmd, remote_storage=self.env.remote_storage, @@ -1399,20 +1409,20 @@ class NeonCli(AbstractNeonCli): self, branch_name: str, node_name: Optional[str] = None, - tenant_id: Optional[uuid.UUID] = None, - lsn: Optional[str] = None, + tenant_id: Optional[ZTenantId] = None, + lsn: Optional[Lsn] = None, port: Optional[int] = None, ) -> "subprocess.CompletedProcess[str]": args = [ "pg", "create", "--tenant-id", - (tenant_id or self.env.initial_tenant).hex, + str(tenant_id or self.env.initial_tenant), "--branch-name", branch_name, ] if lsn is not None: - args.extend(["--lsn", lsn]) + args.extend(["--lsn", str(lsn)]) if port is not None: args.extend(["--port", str(port)]) if node_name is not None: @@ -1425,15 +1435,15 @@ class NeonCli(AbstractNeonCli): def pg_start( self, node_name: str, - tenant_id: Optional[uuid.UUID] = None, - lsn: Optional[str] = None, + tenant_id: Optional[ZTenantId] = None, + lsn: Optional[Lsn] = None, port: Optional[int] = None, ) -> "subprocess.CompletedProcess[str]": args = [ "pg", "start", "--tenant-id", - (tenant_id or self.env.initial_tenant).hex, + str(tenant_id or self.env.initial_tenant), ] if lsn is not None: args.append(f"--lsn={lsn}") @@ -1449,7 +1459,7 @@ class NeonCli(AbstractNeonCli): def pg_stop( self, node_name: str, - tenant_id: Optional[uuid.UUID] = None, + tenant_id: Optional[ZTenantId] = None, destroy=False, check_return_code=True, ) -> "subprocess.CompletedProcess[str]": @@ -1457,7 +1467,7 @@ class NeonCli(AbstractNeonCli): "pg", "stop", "--tenant-id", - (tenant_id or self.env.initial_tenant).hex, + str(tenant_id or self.env.initial_tenant), ] if destroy: args.append("--destroy") @@ -1856,7 +1866,7 @@ class Postgres(PgProtocol): """An object representing a running postgres daemon.""" def __init__( - self, env: NeonEnv, tenant_id: uuid.UUID, port: int, check_stop_result: bool = True + self, env: NeonEnv, tenant_id: ZTenantId, port: int, check_stop_result: bool = True ): super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres") self.env = env @@ -1872,7 +1882,7 @@ class Postgres(PgProtocol): self, branch_name: str, node_name: Optional[str] = None, - lsn: Optional[str] = None, + lsn: Optional[Lsn] = None, config_lines: Optional[List[str]] = None, ) -> "Postgres": """ @@ -1887,7 +1897,7 @@ class Postgres(PgProtocol): self.env.neon_cli.pg_create( branch_name, node_name=self.node_name, 
tenant_id=self.tenant_id, lsn=lsn, port=self.port ) - path = Path("pgdatadirs") / "tenants" / self.tenant_id.hex / self.node_name + path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name self.pgdata_dir = os.path.join(self.env.repo_dir, path) if config_lines is None: @@ -1918,7 +1928,7 @@ class Postgres(PgProtocol): def pg_data_dir_path(self) -> str: """Path to data directory""" assert self.node_name - path = Path("pgdatadirs") / "tenants" / self.tenant_id.hex / self.node_name + path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name return os.path.join(self.env.repo_dir, path) def pg_xact_dir_path(self) -> str: @@ -2005,7 +2015,7 @@ class Postgres(PgProtocol): self, branch_name: str, node_name: Optional[str] = None, - lsn: Optional[str] = None, + lsn: Optional[Lsn] = None, config_lines: Optional[List[str]] = None, ) -> "Postgres": """ @@ -2046,8 +2056,8 @@ class PostgresFactory: self, branch_name: str, node_name: Optional[str] = None, - tenant_id: Optional[uuid.UUID] = None, - lsn: Optional[str] = None, + tenant_id: Optional[ZTenantId] = None, + lsn: Optional[Lsn] = None, config_lines: Optional[List[str]] = None, ) -> Postgres: @@ -2070,8 +2080,8 @@ class PostgresFactory: self, branch_name: str, node_name: Optional[str] = None, - tenant_id: Optional[uuid.UUID] = None, - lsn: Optional[str] = None, + tenant_id: Optional[ZTenantId] = None, + lsn: Optional[Lsn] = None, config_lines: Optional[List[str]] = None, ) -> Postgres: @@ -2146,7 +2156,7 @@ class Safekeeper: return self def append_logical_message( - self, tenant_id: uuid.UUID, timeline_id: uuid.UUID, request: Dict[str, Any] + self, tenant_id: ZTenantId, timeline_id: ZTimelineId, request: Dict[str, Any] ) -> Dict[str, Any]: """ Send JSON_CTRL query to append LogicalMessage to WAL and modify @@ -2156,7 +2166,7 @@ class Safekeeper: # "replication=0" hacks psycopg not to send additional queries # on startup, see https://github.com/psycopg/psycopg2/pull/482 - connstr = f"host=localhost port={self.port.pg} replication=0 options='-c ztimelineid={timeline_id.hex} ztenantid={tenant_id.hex}'" + connstr = f"host=localhost port={self.port.pg} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'" with closing(psycopg2.connect(connstr)) as conn: # server doesn't support transactions @@ -2181,18 +2191,18 @@ class Safekeeper: @dataclass class SafekeeperTimelineStatus: acceptor_epoch: int - flush_lsn: str - timeline_start_lsn: str - backup_lsn: str - remote_consistent_lsn: str + flush_lsn: Lsn + timeline_start_lsn: Lsn + backup_lsn: Lsn + remote_consistent_lsn: Lsn @dataclass class SafekeeperMetrics: # These are metrics from Prometheus which uses float64 internally. # As a consequence, values may differ from real original int64s. 
- flush_lsn_inexact: Dict[Tuple[str, str], int] = field(default_factory=dict) - commit_lsn_inexact: Dict[Tuple[str, str], int] = field(default_factory=dict) + flush_lsn_inexact: Dict[Tuple[ZTenantId, ZTimelineId], int] = field(default_factory=dict) + commit_lsn_inexact: Dict[Tuple[ZTenantId, ZTimelineId], int] = field(default_factory=dict) class SafekeeperHttpClient(requests.Session): @@ -2209,26 +2219,30 @@ class SafekeeperHttpClient(requests.Session): def check_status(self): self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() - def timeline_status(self, tenant_id: str, timeline_id: str) -> SafekeeperTimelineStatus: + def timeline_status( + self, tenant_id: ZTenantId, timeline_id: ZTimelineId + ) -> SafekeeperTimelineStatus: res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}") res.raise_for_status() resj = res.json() return SafekeeperTimelineStatus( acceptor_epoch=resj["acceptor_state"]["epoch"], - flush_lsn=resj["flush_lsn"], - timeline_start_lsn=resj["timeline_start_lsn"], - backup_lsn=resj["backup_lsn"], - remote_consistent_lsn=resj["remote_consistent_lsn"], + flush_lsn=Lsn(resj["flush_lsn"]), + timeline_start_lsn=Lsn(resj["timeline_start_lsn"]), + backup_lsn=Lsn(resj["backup_lsn"]), + remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]), ) - def record_safekeeper_info(self, tenant_id: str, timeline_id: str, body): + def record_safekeeper_info(self, tenant_id: ZTenantId, timeline_id: ZTimelineId, body): res = self.post( f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}", json=body, ) res.raise_for_status() - def timeline_delete_force(self, tenant_id: str, timeline_id: str) -> Dict[Any, Any]: + def timeline_delete_force( + self, tenant_id: ZTenantId, timeline_id: ZTimelineId + ) -> Dict[Any, Any]: res = self.delete( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}" ) @@ -2237,7 +2251,7 @@ class SafekeeperHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json - def tenant_delete_force(self, tenant_id: str) -> Dict[Any, Any]: + def tenant_delete_force(self, tenant_id: ZTenantId) -> Dict[Any, Any]: res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") res.raise_for_status() res_json = res.json() @@ -2258,13 +2272,17 @@ class SafekeeperHttpClient(requests.Session): all_metrics_text, re.MULTILINE, ): - metrics.flush_lsn_inexact[(match.group(1), match.group(2))] = int(match.group(3)) + metrics.flush_lsn_inexact[ + (ZTenantId(match.group(1)), ZTimelineId(match.group(2))) + ] = int(match.group(3)) for match in re.finditer( r'^safekeeper_commit_lsn{tenant_id="([0-9a-f]+)",timeline_id="([0-9a-f]+)"} (\S+)$', all_metrics_text, re.MULTILINE, ): - metrics.commit_lsn_inexact[(match.group(1), match.group(2))] = int(match.group(3)) + metrics.commit_lsn_inexact[ + (ZTenantId(match.group(1)), ZTimelineId(match.group(2))) + ] = int(match.group(3)) return metrics @@ -2437,7 +2455,7 @@ def list_files_to_compare(pgdata_dir: Path): # pg is the existing and running compute node, that we want to compare with a basebackup def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Postgres): # Get the timeline ID. 
We need it for the 'basebackup' command - timeline = pg.safe_psql("SHOW neon.timeline_id")[0][0] + timeline = ZTimelineId(pg.safe_psql("SHOW neon.timeline_id")[0][0]) # stop postgres to ensure that files won't change pg.stop() @@ -2453,7 +2471,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Post {psql_path} \ --no-psqlrc \ postgres://localhost:{env.pageserver.service_port.pg} \ - -c 'basebackup {pg.tenant_id.hex} {timeline}' \ + -c 'basebackup {pg.tenant_id} {timeline}' \ | tar -x -C {restored_dir_path} """ @@ -2521,7 +2539,7 @@ def wait_until(number_of_iterations: int, interval: float, func): def assert_timeline_local( - pageserver_http_client: NeonPageserverHttpClient, tenant: uuid.UUID, timeline: uuid.UUID + pageserver_http_client: NeonPageserverHttpClient, tenant: ZTenantId, timeline: ZTimelineId ): timeline_detail = pageserver_http_client.timeline_detail( tenant, @@ -2535,33 +2553,33 @@ def assert_timeline_local( def assert_no_in_progress_downloads_for_tenant( pageserver_http_client: NeonPageserverHttpClient, - tenant: uuid.UUID, + tenant: ZTenantId, ): tenant_status = pageserver_http_client.tenant_status(tenant) assert tenant_status["has_in_progress_downloads"] is False, tenant_status def remote_consistent_lsn( - pageserver_http_client: NeonPageserverHttpClient, tenant: uuid.UUID, timeline: uuid.UUID -) -> int: + pageserver_http_client: NeonPageserverHttpClient, tenant: ZTenantId, timeline: ZTimelineId +) -> Lsn: detail = pageserver_http_client.timeline_detail(tenant, timeline) if detail["remote"] is None: # No remote information at all. This happens right after creating # a timeline, before any part of it has been uploaded to remote # storage yet. - return 0 + return Lsn(0) else: lsn_str = detail["remote"]["remote_consistent_lsn"] assert isinstance(lsn_str, str) - return lsn_from_hex(lsn_str) + return Lsn(lsn_str) def wait_for_upload( pageserver_http_client: NeonPageserverHttpClient, - tenant: uuid.UUID, - timeline: uuid.UUID, - lsn: int, + tenant: ZTenantId, + timeline: ZTimelineId, + lsn: Lsn, ): """waits for local timeline upload up to specified lsn""" for i in range(20): @@ -2570,32 +2588,32 @@ def wait_for_upload( return log.info( "waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format( - lsn_to_hex(lsn), lsn_to_hex(current_lsn), i + 1 + lsn, current_lsn, i + 1 ) ) time.sleep(1) raise Exception( "timed out while waiting for remote_consistent_lsn to reach {}, was {}".format( - lsn_to_hex(lsn), lsn_to_hex(current_lsn) + lsn, current_lsn ) ) def last_record_lsn( - pageserver_http_client: NeonPageserverHttpClient, tenant: uuid.UUID, timeline: uuid.UUID -) -> int: + pageserver_http_client: NeonPageserverHttpClient, tenant: ZTenantId, timeline: ZTimelineId +) -> Lsn: detail = pageserver_http_client.timeline_detail(tenant, timeline) lsn_str = detail["local"]["last_record_lsn"] assert isinstance(lsn_str, str) - return lsn_from_hex(lsn_str) + return Lsn(lsn_str) def wait_for_last_record_lsn( pageserver_http_client: NeonPageserverHttpClient, - tenant: uuid.UUID, - timeline: uuid.UUID, - lsn: int, + tenant: ZTenantId, + timeline: ZTimelineId, + lsn: Lsn, ): """waits for pageserver to catch up to a certain lsn""" for i in range(10): @@ -2604,20 +2622,18 @@ def wait_for_last_record_lsn( return log.info( "waiting for last_record_lsn to reach {}, now {}, iteration {}".format( - lsn_to_hex(lsn), lsn_to_hex(current_lsn), i + 1 + lsn, current_lsn, i + 1 ) ) time.sleep(1) raise Exception( - "timed out while waiting for last_record_lsn to reach 
{}, was {}".format( - lsn_to_hex(lsn), lsn_to_hex(current_lsn) - ) + "timed out while waiting for last_record_lsn to reach {}, was {}".format(lsn, current_lsn) ) -def wait_for_last_flush_lsn(env: NeonEnv, pg: Postgres, tenant: uuid.UUID, timeline: uuid.UUID): +def wait_for_last_flush_lsn(env: NeonEnv, pg: Postgres, tenant: ZTenantId, timeline: ZTimelineId): """Wait for pageserver to catch up the latest flush LSN""" - last_flush_lsn = lsn_from_hex(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn) @@ -2626,8 +2642,8 @@ def fork_at_current_lsn( pg: Postgres, new_branch_name: str, ancestor_branch_name: str, - tenant_id: Optional[uuid.UUID] = None, -) -> uuid.UUID: + tenant_id: Optional[ZTenantId] = None, +) -> ZTimelineId: """ Create new branch at the last LSN of an existing branch. The "last LSN" is taken from the given Postgres instance. The pageserver will wait for all the diff --git a/test_runner/fixtures/types.py b/test_runner/fixtures/types.py new file mode 100644 index 0000000000..d5cb200080 --- /dev/null +++ b/test_runner/fixtures/types.py @@ -0,0 +1,89 @@ +import random +from functools import total_ordering +from typing import Union + + +@total_ordering +class Lsn: + """ + Datatype for an LSN. Internally it is a 64-bit integer, but the string + representation is like "1/123abcd". See also pg_lsn datatype in Postgres + """ + + def __init__(self, x: Union[int, str]): + if isinstance(x, int): + self.lsn_int = x + else: + """Convert lsn from hex notation to int.""" + l, r = x.split("/") + self.lsn_int = (int(l, 16) << 32) + int(r, 16) + # FIXME: error if it doesn't look like a valid LSN + + def __str__(self): + """Convert lsn from int to standard hex notation.""" + return "{:X}/{:X}".format(self.lsn_int >> 32, self.lsn_int & 0xFFFFFFFF) + + def __repr__(self): + return 'Lsn("{:X}/{:X}")'.format(self.lsn_int >> 32, self.lsn_int & 0xFFFFFFFF) + + def __int__(self): + return self.lsn_int + + def __lt__(self, other: "Lsn") -> bool: + return self.lsn_int < other.lsn_int + + def __eq__(self, other) -> bool: + if not isinstance(other, Lsn): + return NotImplemented + return self.lsn_int == other.lsn_int + + # Returns the difference between two Lsns, in bytes + def __sub__(self, other: "Lsn") -> int: + return self.lsn_int - other.lsn_int + + def __hash__(self): + return hash(self.lsn_int) + + +@total_ordering +class ZId: + """ + Datatype for a Neon tenant and timeline IDs. Internally it's a 16-byte array, and + the string representation is in hex. This corresponds to the ZId / ZTenantId / + ZTimelineIds in in the Rust code. 
+ """ + + def __init__(self, x: str): + self.id = bytearray.fromhex(x) + assert len(self.id) == 16 + + def __str__(self): + return self.id.hex() + + def __lt__(self, other) -> bool: + if not isinstance(other, type(self)): + return NotImplemented + return self.id < other.id + + def __eq__(self, other) -> bool: + if not isinstance(other, type(self)): + return NotImplemented + return self.id == other.id + + def __hash__(self): + return hash(str(self.id)) + + @classmethod + def generate(cls): + """Generate a random ID""" + return cls(random.randbytes(16).hex()) + + +class ZTenantId(ZId): + def __repr__(self): + return f'ZTenantId("{self.id.hex()}")' + + +class ZTimelineId(ZId): + def __repr__(self): + return f'ZTimelineId("{self.id.hex()}")' diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 88bf6d634d..726116e53c 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -61,17 +61,6 @@ def global_counter() -> int: return _global_counter -def lsn_to_hex(num: int) -> str: - """Convert lsn from int to standard hex notation.""" - return "{:X}/{:X}".format(num >> 32, num & 0xFFFFFFFF) - - -def lsn_from_hex(lsn_hex: str) -> int: - """Convert lsn from hex notation to int.""" - l, r = lsn_hex.split("/") - return (int(l, 16) << 32) + int(r, 16) - - def print_gc_result(row): log.info("GC duration {elapsed} ms".format_map(row)) log.info( diff --git a/test_runner/performance/test_wal_backpressure.py b/test_runner/performance/test_wal_backpressure.py index 03d5ba208a..47e2435052 100644 --- a/test_runner/performance/test_wal_backpressure.py +++ b/test_runner/performance/test_wal_backpressure.py @@ -9,7 +9,7 @@ from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker from fixtures.compare_fixtures import NeonCompare, PgCompare, VanillaCompare from fixtures.log_helper import log from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder, PgBin -from fixtures.utils import lsn_from_hex +from fixtures.types import Lsn from performance.test_perf_pgbench import get_durations_matrix, get_scales_matrix @@ -198,8 +198,8 @@ def record_lsn_write_lag(env: PgCompare, run_cond: Callable[[], bool], pool_inte return lsn_write_lags = [] - last_received_lsn = 0 - last_pg_flush_lsn = 0 + last_received_lsn = Lsn(0) + last_pg_flush_lsn = Lsn(0) with env.pg.connect().cursor() as cur: cur.execute("CREATE EXTENSION neon") @@ -218,11 +218,11 @@ def record_lsn_write_lag(env: PgCompare, run_cond: Callable[[], bool], pool_inte res = cur.fetchone() lsn_write_lags.append(res[0]) - curr_received_lsn = lsn_from_hex(res[3]) + curr_received_lsn = Lsn(res[3]) lsn_process_speed = (curr_received_lsn - last_received_lsn) / (1024**2) last_received_lsn = curr_received_lsn - curr_pg_flush_lsn = lsn_from_hex(res[2]) + curr_pg_flush_lsn = Lsn(res[2]) lsn_produce_speed = (curr_pg_flush_lsn - last_pg_flush_lsn) / (1024**2) last_pg_flush_lsn = curr_pg_flush_lsn diff --git a/test_runner/regress/test_ancestor_branch.py b/test_runner/regress/test_ancestor_branch.py index 96612a8aef..b8e81824b0 100644 --- a/test_runner/regress/test_ancestor_branch.py +++ b/test_runner/regress/test_ancestor_branch.py @@ -1,5 +1,6 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.types import ZTimelineId from fixtures.utils import query_scalar @@ -26,7 +27,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): pg_branch0 = env.postgres.create_start("main", tenant_id=tenant) branch0_cur = pg_branch0.connect().cursor() - branch0_timeline 
= query_scalar(branch0_cur, "SHOW neon.timeline_id") + branch0_timeline = ZTimelineId(query_scalar(branch0_cur, "SHOW neon.timeline_id")) log.info(f"b0 timeline {branch0_timeline}") # Create table, and insert 100k rows. @@ -50,7 +51,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): log.info("postgres is running on 'branch1' branch") branch1_cur = pg_branch1.connect().cursor() - branch1_timeline = query_scalar(branch1_cur, "SHOW neon.timeline_id") + branch1_timeline = ZTimelineId(query_scalar(branch1_cur, "SHOW neon.timeline_id")) log.info(f"b1 timeline {branch1_timeline}") branch1_lsn = query_scalar(branch1_cur, "SELECT pg_current_wal_insert_lsn()") @@ -73,7 +74,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): log.info("postgres is running on 'branch2' branch") branch2_cur = pg_branch2.connect().cursor() - branch2_timeline = query_scalar(branch2_cur, "SHOW neon.timeline_id") + branch2_timeline = ZTimelineId(query_scalar(branch2_cur, "SHOW neon.timeline_id")) log.info(f"b2 timeline {branch2_timeline}") branch2_lsn = query_scalar(branch2_cur, "SELECT pg_current_wal_insert_lsn()") @@ -91,7 +92,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): log.info(f"LSN after 300k rows: {lsn_300}") # Run compaction on branch1. - compact = f"compact {tenant.hex} {branch1_timeline} {lsn_200}" + compact = f"compact {tenant} {branch1_timeline} {lsn_200}" log.info(compact) env.pageserver.safe_psql(compact) diff --git a/test_runner/regress/test_auth.py b/test_runner/regress/test_auth.py index 16d6ae45c3..08e38e1461 100644 --- a/test_runner/regress/test_auth.py +++ b/test_runner/regress/test_auth.py @@ -1,8 +1,8 @@ from contextlib import closing -from uuid import uuid4 import pytest from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserverApiException +from fixtures.types import ZTenantId def test_pageserver_auth(neon_env_builder: NeonEnvBuilder): @@ -11,9 +11,9 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder): ps = env.pageserver - tenant_token = env.auth_keys.generate_tenant_token(env.initial_tenant.hex) + tenant_token = env.auth_keys.generate_tenant_token(env.initial_tenant) tenant_http_client = env.pageserver.http_client(tenant_token) - invalid_tenant_token = env.auth_keys.generate_tenant_token(uuid4().hex) + invalid_tenant_token = env.auth_keys.generate_tenant_token(ZTenantId.generate()) invalid_tenant_http_client = env.pageserver.http_client(invalid_tenant_token) management_token = env.auth_keys.generate_management_token() diff --git a/test_runner/regress/test_branch_and_gc.py b/test_runner/regress/test_branch_and_gc.py index deb041b5d1..c8c5929066 100644 --- a/test_runner/regress/test_branch_and_gc.py +++ b/test_runner/regress/test_branch_and_gc.py @@ -4,7 +4,8 @@ import time import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv -from fixtures.utils import lsn_from_hex, query_scalar +from fixtures.types import Lsn +from fixtures.utils import query_scalar # Test the GC implementation when running with branching. 
@@ -74,18 +75,16 @@ def test_branch_and_gc(neon_simple_env: NeonEnv): "CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')" ) main_cur.execute("INSERT INTO foo SELECT FROM generate_series(1, 100000)") - lsn1 = query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()") + lsn1 = Lsn(query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()")) log.info(f"LSN1: {lsn1}") main_cur.execute("INSERT INTO foo SELECT FROM generate_series(1, 100000)") - lsn2 = query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()") + lsn2 = Lsn(query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()")) log.info(f"LSN2: {lsn2}") # Set the GC horizon so that lsn1 is inside the horizon, which means # we can create a new branch starting from lsn1. - env.pageserver.safe_psql( - f"do_gc {tenant.hex} {timeline_main.hex} {lsn_from_hex(lsn2) - lsn_from_hex(lsn1) + 1024}" - ) + env.pageserver.safe_psql(f"do_gc {tenant} {timeline_main} {lsn2 - lsn1 + 1024}") env.neon_cli.create_branch( "test_branch", "test_main", tenant_id=tenant, ancestor_start_lsn=lsn1 @@ -143,7 +142,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv): "INSERT INTO t SELECT FROM generate_series(1, 100000)", ] ) - lsn = res[2][0][0] + lsn = Lsn(res[2][0][0]) # Use `failpoint=sleep` and `threading` to make the GC iteration triggers *before* the # branch creation task but the individual timeline GC iteration happens *after* @@ -151,7 +150,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv): env.pageserver.safe_psql("failpoints before-timeline-gc=sleep(2000)") def do_gc(): - env.pageserver.safe_psql(f"do_gc {tenant.hex} {b0.hex} 0") + env.pageserver.safe_psql(f"do_gc {tenant} {b0} 0") thread = threading.Thread(target=do_gc, daemon=True) thread.start() diff --git a/test_runner/regress/test_branch_behind.py b/test_runner/regress/test_branch_behind.py index 51946380d2..5bd6368bfc 100644 --- a/test_runner/regress/test_branch_behind.py +++ b/test_runner/regress/test_branch_behind.py @@ -2,6 +2,7 @@ import psycopg2.extras import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.types import Lsn, ZTimelineId from fixtures.utils import print_gc_result, query_scalar @@ -27,13 +28,13 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder): main_cur = pgmain.connect().cursor() - timeline = query_scalar(main_cur, "SHOW neon.timeline_id") + timeline = ZTimelineId(query_scalar(main_cur, "SHOW neon.timeline_id")) # Create table, and insert the first 100 rows main_cur.execute("CREATE TABLE foo (t text)") # keep some early lsn to test branch creation on out of date lsn - gced_lsn = query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()") + gced_lsn = Lsn(query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()")) main_cur.execute( """ @@ -42,7 +43,7 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder): FROM generate_series(1, 100) g """ ) - lsn_a = query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()") + lsn_a = Lsn(query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()")) log.info(f"LSN after 100 rows: {lsn_a}") # Insert some more rows. (This generates enough WAL to fill a few segments.) 
@@ -53,7 +54,7 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder): FROM generate_series(1, 200000) g """ ) - lsn_b = query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()") + lsn_b = Lsn(query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()")) log.info(f"LSN after 200100 rows: {lsn_b}") # Branch at the point where only 100 rows were inserted @@ -69,7 +70,7 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder): FROM generate_series(1, 200000) g """ ) - lsn_c = query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()") + lsn_c = Lsn(query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()")) log.info(f"LSN after 400100 rows: {lsn_c}") @@ -96,25 +97,25 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder): # branch at segment boundary env.neon_cli.create_branch( - "test_branch_segment_boundary", "test_branch_behind", ancestor_start_lsn="0/3000000" + "test_branch_segment_boundary", "test_branch_behind", ancestor_start_lsn=Lsn("0/3000000") ) pg = env.postgres.create_start("test_branch_segment_boundary") assert pg.safe_psql("SELECT 1")[0][0] == 1 # branch at pre-initdb lsn with pytest.raises(Exception, match="invalid branch start lsn"): - env.neon_cli.create_branch("test_branch_preinitdb", ancestor_start_lsn="0/42") + env.neon_cli.create_branch("test_branch_preinitdb", ancestor_start_lsn=Lsn("0/42")) # branch at pre-ancestor lsn with pytest.raises(Exception, match="less than timeline ancestor lsn"): env.neon_cli.create_branch( - "test_branch_preinitdb", "test_branch_behind", ancestor_start_lsn="0/42" + "test_branch_preinitdb", "test_branch_behind", ancestor_start_lsn=Lsn("0/42") ) # check that we cannot create branch based on garbage collected data with env.pageserver.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur: # call gc to advace latest_gc_cutoff_lsn - pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") row = pscur.fetchone() print_gc_result(row) diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index c4b23c24b8..bf44dfd949 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -5,7 +5,7 @@ from typing import List, Tuple import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres -from fixtures.utils import query_scalar +from fixtures.types import ZTenantId, ZTimelineId # Test restarting page server, while safekeeper and compute node keep @@ -15,19 +15,15 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 3 env = neon_env_builder.init_start() - tenant_timelines: List[Tuple[str, str, Postgres]] = [] + tenant_timelines: List[Tuple[ZTenantId, ZTimelineId, Postgres]] = [] for n in range(4): - tenant_id_uuid, timeline_id_uuid = env.neon_cli.create_tenant() - tenant_id = tenant_id_uuid.hex - timeline_id = timeline_id_uuid.hex + tenant_id, timeline_id = env.neon_cli.create_tenant() - pg = env.postgres.create_start("main", tenant_id=tenant_id_uuid) + pg = env.postgres.create_start("main", tenant_id=tenant_id) with pg.cursor() as cur: cur.execute("CREATE TABLE t(key int primary key, value text)") cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'") - - timeline_id = query_scalar(cur, "SHOW neon.timeline_id") pg.stop() tenant_timelines.append((tenant_id, timeline_id, pg)) @@ -109,5 +105,5 @@ def test_fix_broken_timelines_on_startup(neon_simple_env: NeonEnv): 
env.neon_cli.pageserver_start() # Check that tenant with "broken" timeline is not loaded. - with pytest.raises(Exception, match=f"Failed to get repo for tenant {tenant_id.hex}"): + with pytest.raises(Exception, match=f"Failed to get repo for tenant {tenant_id}"): env.neon_cli.list_timelines(tenant_id) diff --git a/test_runner/regress/test_fullbackup.py b/test_runner/regress/test_fullbackup.py index 8155f52060..af94865549 100644 --- a/test_runner/regress/test_fullbackup.py +++ b/test_runner/regress/test_fullbackup.py @@ -8,6 +8,7 @@ from fixtures.neon_fixtures import ( VanillaPostgres, pg_distrib_dir, ) +from fixtures.types import Lsn, ZTimelineId from fixtures.utils import query_scalar, subprocess_capture num_rows = 1000 @@ -26,7 +27,7 @@ def test_fullbackup( log.info("postgres is running on 'test_fullbackup' branch") with pgmain.cursor() as cur: - timeline = query_scalar(cur, "SHOW neon.timeline_id") + timeline = ZTimelineId(query_scalar(cur, "SHOW neon.timeline_id")) # data loading may take a while, so increase statement timeout cur.execute("SET statement_timeout='300s'") @@ -36,7 +37,7 @@ def test_fullbackup( ) cur.execute("CHECKPOINT") - lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()") + lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")) log.info(f"start_backup_lsn = {lsn}") # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq. @@ -46,7 +47,7 @@ def test_fullbackup( # Get and unpack fullbackup from pageserver restored_dir_path = env.repo_dir / "restored_datadir" os.mkdir(restored_dir_path, 0o750) - query = f"fullbackup {env.initial_tenant.hex} {timeline} {lsn}" + query = f"fullbackup {env.initial_tenant} {timeline} {lsn}" cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query] result_basepath = pg_bin.run_capture(cmd, env=psql_env) tar_output_file = result_basepath + ".stdout" diff --git a/test_runner/regress/test_gc_aggressive.py b/test_runner/regress/test_gc_aggressive.py index 90824f882a..67ce8871cd 100644 --- a/test_runner/regress/test_gc_aggressive.py +++ b/test_runner/regress/test_gc_aggressive.py @@ -3,6 +3,7 @@ import random from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres +from fixtures.types import ZTimelineId from fixtures.utils import query_scalar # Test configuration @@ -28,15 +29,15 @@ async def update_table(pg: Postgres): # Perform aggressive GC with 0 horizon -async def gc(env: NeonEnv, timeline: str): +async def gc(env: NeonEnv, timeline: ZTimelineId): psconn = await env.pageserver.connect_async() while updates_performed < updates_to_perform: - await psconn.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0") + await psconn.execute(f"do_gc {env.initial_tenant} {timeline} 0") # At the same time, run UPDATEs and GC -async def update_and_gc(env: NeonEnv, pg: Postgres, timeline: str): +async def update_and_gc(env: NeonEnv, pg: Postgres, timeline: ZTimelineId): workers = [] for worker_id in range(num_connections): workers.append(asyncio.create_task(update_table(pg))) @@ -61,7 +62,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder): log.info("postgres is running on test_gc_aggressive branch") with pg.cursor() as cur: - timeline = query_scalar(cur, "SHOW neon.timeline_id") + timeline = ZTimelineId(query_scalar(cur, "SHOW neon.timeline_id")) # Create table, and insert the first 100 rows cur.execute("CREATE TABLE foo (id int, counter int, t text)") diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index 
a2671727f7..fc9f41bda0 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -5,7 +5,6 @@ import shutil import tarfile from contextlib import closing from pathlib import Path -from uuid import UUID, uuid4 import pytest from fixtures.log_helper import log @@ -18,7 +17,8 @@ from fixtures.neon_fixtures import ( wait_for_last_record_lsn, wait_for_upload, ) -from fixtures.utils import lsn_from_hex, subprocess_capture +from fixtures.types import Lsn, ZTenantId, ZTimelineId +from fixtures.utils import subprocess_capture @pytest.mark.timeout(600) @@ -69,8 +69,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build end_lsn = manifest["WAL-Ranges"][0]["End-LSN"] node_name = "import_from_vanilla" - tenant = uuid4() - timeline = uuid4() + tenant = ZTenantId.generate() + timeline = ZTimelineId.generate() # Set up pageserver for import neon_env_builder.enable_local_fs_remote_storage() @@ -83,9 +83,9 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build "timeline", "import", "--tenant-id", - tenant.hex, + str(tenant), "--timeline-id", - timeline.hex, + str(timeline), "--node-name", node_name, "--base-lsn", @@ -112,8 +112,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build import_tar(base_tar, wal_tar) # Wait for data to land in s3 - wait_for_last_record_lsn(client, tenant, timeline, lsn_from_hex(end_lsn)) - wait_for_upload(client, tenant, timeline, lsn_from_hex(end_lsn)) + wait_for_last_record_lsn(client, tenant, timeline, Lsn(end_lsn)) + wait_for_upload(client, tenant, timeline, Lsn(end_lsn)) # Check it worked pg = env.postgres.create_start(node_name, tenant_id=tenant) @@ -173,7 +173,7 @@ def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: Ne assert cnt_seg_files > 0 -def _generate_data(num_rows: int, pg: Postgres) -> str: +def _generate_data(num_rows: int, pg: Postgres) -> Lsn: """Generate a table with `num_rows` rows. Returns: @@ -191,10 +191,12 @@ def _generate_data(num_rows: int, pg: Postgres) -> str: cur.execute("SELECT pg_current_wal_insert_lsn()") res = cur.fetchone() assert res is not None and isinstance(res[0], str) - return res[0] + return Lsn(res[0]) -def _import(expected_num_rows: int, lsn: str, env: NeonEnv, pg_bin: PgBin, timeline: UUID) -> str: +def _import( + expected_num_rows: int, lsn: Lsn, env: NeonEnv, pg_bin: PgBin, timeline: ZTimelineId +) -> str: """Test importing backup data to the pageserver. Args: @@ -210,7 +212,7 @@ def _import(expected_num_rows: int, lsn: str, env: NeonEnv, pg_bin: PgBin, timel psql_env = {"LD_LIBRARY_PATH": os.path.join(str(pg_distrib_dir), "lib")} # Get a fullbackup from pageserver - query = f"fullbackup { env.initial_tenant.hex} {timeline.hex} {lsn}" + query = f"fullbackup { env.initial_tenant} {timeline} {lsn}" cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query] result_basepath = pg_bin.run_capture(cmd, env=psql_env) tar_output_file = result_basepath + ".stdout" @@ -228,7 +230,7 @@ def _import(expected_num_rows: int, lsn: str, env: NeonEnv, pg_bin: PgBin, timel # Import using another tenantid, because we use the same pageserver. # TODO Create another pageserver to make test more realistic. 
- tenant = uuid4() + tenant = ZTenantId.generate() # Import to pageserver node_name = "import_from_pageserver" @@ -239,28 +241,28 @@ def _import(expected_num_rows: int, lsn: str, env: NeonEnv, pg_bin: PgBin, timel "timeline", "import", "--tenant-id", - tenant.hex, + str(tenant), "--timeline-id", - timeline.hex, + str(timeline), "--node-name", node_name, "--base-lsn", - lsn, + str(lsn), "--base-tarfile", os.path.join(tar_output_file), ] ) # Wait for data to land in s3 - wait_for_last_record_lsn(client, tenant, timeline, lsn_from_hex(lsn)) - wait_for_upload(client, tenant, timeline, lsn_from_hex(lsn)) + wait_for_last_record_lsn(client, tenant, timeline, lsn) + wait_for_upload(client, tenant, timeline, lsn) # Check it worked pg = env.postgres.create_start(node_name, tenant_id=tenant) assert pg.safe_psql("select count(*) from tbl") == [(expected_num_rows,)] # Take another fullbackup - query = f"fullbackup { tenant.hex} {timeline.hex} {lsn}" + query = f"fullbackup { tenant} {timeline} {lsn}" cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query] result_basepath = pg_bin.run_capture(cmd, env=psql_env) new_tar_output_file = result_basepath + ".stdout" @@ -272,6 +274,6 @@ def _import(expected_num_rows: int, lsn: str, env: NeonEnv, pg_bin: PgBin, timel # Check that gc works psconn = env.pageserver.connect() pscur = psconn.cursor() - pscur.execute(f"do_gc {tenant.hex} {timeline.hex} 0") + pscur.execute(f"do_gc {tenant} {timeline} 0") return tar_output_file diff --git a/test_runner/regress/test_lsn_mapping.py b/test_runner/regress/test_lsn_mapping.py index 0c1d3648f2..f6ca7000dd 100644 --- a/test_runner/regress/test_lsn_mapping.py +++ b/test_runner/regress/test_lsn_mapping.py @@ -41,7 +41,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder): probe_timestamp = tbl[-1][1] + timedelta(hours=1) result = query_scalar( ps_cur, - f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'", + f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'", ) assert result == "future" @@ -49,7 +49,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder): probe_timestamp = tbl[0][1] - timedelta(hours=10) result = query_scalar( ps_cur, - f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'", + f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'", ) assert result == "past" @@ -60,7 +60,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder): # Call get_lsn_by_timestamp to get the LSN lsn = query_scalar( ps_cur, - f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'", + f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'", ) # Launch a new read-only node at that LSN, and check that only the rows diff --git a/test_runner/regress/test_neon_cli.py b/test_runner/regress/test_neon_cli.py index 1acfa72127..b2342e5ee8 100644 --- a/test_runner/regress/test_neon_cli.py +++ b/test_runner/regress/test_neon_cli.py @@ -1,4 +1,3 @@ -import uuid from typing import cast import requests @@ -8,10 +7,11 @@ from fixtures.neon_fixtures import ( NeonEnvBuilder, NeonPageserverHttpClient, ) +from fixtures.types import ZTenantId, ZTimelineId def helper_compare_timeline_list( - pageserver_http_client: NeonPageserverHttpClient, env: NeonEnv, initial_tenant: uuid.UUID + pageserver_http_client: NeonPageserverHttpClient, env: NeonEnv, initial_tenant: 
ZTenantId ): """ Compare timelines list returned by CLI and directly via API. @@ -20,7 +20,7 @@ def helper_compare_timeline_list( timelines_api = sorted( map( - lambda t: cast(str, t["timeline_id"]), + lambda t: ZTimelineId(t["timeline_id"]), pageserver_http_client.timeline_list(initial_tenant), ) ) @@ -52,8 +52,8 @@ def test_cli_timeline_list(neon_simple_env: NeonEnv): # Check that all new branches are visible via CLI timelines_cli = [timeline_id for (_, timeline_id) in env.neon_cli.list_timelines()] - assert main_timeline_id.hex in timelines_cli - assert nested_timeline_id.hex in timelines_cli + assert main_timeline_id in timelines_cli + assert nested_timeline_id in timelines_cli def helper_compare_tenant_list(pageserver_http_client: NeonPageserverHttpClient, env: NeonEnv): @@ -85,11 +85,11 @@ def test_cli_tenant_list(neon_simple_env: NeonEnv): helper_compare_tenant_list(pageserver_http_client, env) res = env.neon_cli.list_tenants() - tenants = sorted(map(lambda t: t.split()[0], res.stdout.splitlines())) + tenants = sorted(map(lambda t: ZTenantId(t.split()[0]), res.stdout.splitlines())) - assert env.initial_tenant.hex in tenants - assert tenant1.hex in tenants - assert tenant2.hex in tenants + assert env.initial_tenant in tenants + assert tenant1 in tenants + assert tenant2 in tenants def test_cli_tenant_create(neon_simple_env: NeonEnv): diff --git a/test_runner/regress/test_old_request_lsn.py b/test_runner/regress/test_old_request_lsn.py index 257913ef3f..2b5e2edb5f 100644 --- a/test_runner/regress/test_old_request_lsn.py +++ b/test_runner/regress/test_old_request_lsn.py @@ -1,6 +1,7 @@ import psycopg2.extras from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.types import ZTimelineId from fixtures.utils import print_gc_result, query_scalar @@ -26,7 +27,7 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder): cur = pg_conn.cursor() # Get the timeline ID of our branch. We need it for the 'do_gc' command - timeline = query_scalar(cur, "SHOW neon.timeline_id") + timeline = ZTimelineId(query_scalar(cur, "SHOW neon.timeline_id")) psconn = env.pageserver.connect() pscur = psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) @@ -60,9 +61,9 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder): # Make a lot of updates on a single row, generating a lot of WAL. Trigger # garbage collections so that the page server will remove old page versions. 
for i in range(10): - pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0") - row = pscur.fetchone() - print_gc_result(row) + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") + gcrow = pscur.fetchone() + print_gc_result(gcrow) for j in range(100): cur.execute("UPDATE foo SET val = val + 1 WHERE id = 1;") diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py index 869f53ac0a..8ee38fcf4f 100644 --- a/test_runner/regress/test_pageserver_api.py +++ b/test_runner/regress/test_pageserver_api.py @@ -1,7 +1,6 @@ import pathlib import subprocess from typing import Optional -from uuid import UUID, uuid4 from fixtures.neon_fixtures import ( DEFAULT_BRANCH_NAME, @@ -12,7 +11,7 @@ from fixtures.neon_fixtures import ( pg_distrib_dir, wait_until, ) -from fixtures.utils import lsn_from_hex +from fixtures.types import Lsn, ZTenantId, ZTimelineId # test that we cannot override node id after init @@ -61,39 +60,39 @@ def test_pageserver_init_node_id(neon_simple_env: NeonEnv): assert "has node id already, it cannot be overridden" in bad_update.stderr -def check_client(client: NeonPageserverHttpClient, initial_tenant: UUID): +def check_client(client: NeonPageserverHttpClient, initial_tenant: ZTenantId): client.check_status() # check initial tenant is there - assert initial_tenant.hex in {t["id"] for t in client.tenant_list()} + assert initial_tenant in {ZTenantId(t["id"]) for t in client.tenant_list()} # create new tenant and check it is also there - tenant_id = uuid4() + tenant_id = ZTenantId.generate() client.tenant_create(tenant_id) - assert tenant_id.hex in {t["id"] for t in client.tenant_list()} + assert tenant_id in {ZTenantId(t["id"]) for t in client.tenant_list()} timelines = client.timeline_list(tenant_id) assert len(timelines) == 0, "initial tenant should not have any timelines" # create timeline - timeline_id = uuid4() + timeline_id = ZTimelineId.generate() client.timeline_create(tenant_id=tenant_id, new_timeline_id=timeline_id) timelines = client.timeline_list(tenant_id) assert len(timelines) > 0 # check it is there - assert timeline_id.hex in {b["timeline_id"] for b in client.timeline_list(tenant_id)} + assert timeline_id in {ZTimelineId(b["timeline_id"]) for b in client.timeline_list(tenant_id)} for timeline in timelines: - timeline_id_str = str(timeline["timeline_id"]) + timeline_id = ZTimelineId(timeline["timeline_id"]) timeline_details = client.timeline_detail( tenant_id=tenant_id, - timeline_id=UUID(timeline_id_str), + timeline_id=timeline_id, include_non_incremental_logical_size=True, ) - assert timeline_details["tenant_id"] == tenant_id.hex - assert timeline_details["timeline_id"] == timeline_id_str + assert ZTenantId(timeline_details["tenant_id"]) == tenant_id + assert ZTimelineId(timeline_details["timeline_id"]) == timeline_id local_timeline_details = timeline_details.get("local") assert local_timeline_details is not None @@ -122,10 +121,10 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv): def expect_updated_msg_lsn( client: NeonPageserverHttpClient, - tenant_id: UUID, - timeline_id: UUID, - prev_msg_lsn: Optional[int], -) -> int: + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + prev_msg_lsn: Optional[Lsn], +) -> Lsn: timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id) # a successful `timeline_details` response must contain the below fields @@ -138,7 +137,7 @@ def expect_updated_msg_lsn( local_timeline_details["last_received_msg_lsn"] is not None ), "the last 
received message's LSN is empty" - last_msg_lsn = lsn_from_hex(local_timeline_details["last_received_msg_lsn"]) + last_msg_lsn = Lsn(local_timeline_details["last_received_msg_lsn"]) assert ( prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn ), f"the last received message's LSN {last_msg_lsn} hasn't been updated \ diff --git a/test_runner/regress/test_pitr_gc.py b/test_runner/regress/test_pitr_gc.py index 1fc18ebbc4..329f4b7d24 100644 --- a/test_runner/regress/test_pitr_gc.py +++ b/test_runner/regress/test_pitr_gc.py @@ -3,6 +3,7 @@ from contextlib import closing import psycopg2.extras from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.types import ZTimelineId from fixtures.utils import print_gc_result, query_scalar @@ -24,7 +25,7 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder): main_pg_conn = pgmain.connect() main_cur = main_pg_conn.cursor() - timeline = query_scalar(main_cur, "SHOW neon.timeline_id") + timeline = ZTimelineId(query_scalar(main_cur, "SHOW neon.timeline_id")) # Create table main_cur.execute("CREATE TABLE foo (t text)") @@ -57,9 +58,9 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder): # run GC with closing(env.pageserver.connect()) as psconn: with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur: - pscur.execute(f"compact {env.initial_tenant.hex} {timeline}") + pscur.execute(f"compact {env.initial_tenant} {timeline}") # perform aggressive GC. Data still should be kept because of the PITR setting. - pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") row = pscur.fetchone() print_gc_result(row) diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py index 0bd78c62a3..fac9d97a42 100644 --- a/test_runner/regress/test_readonly_node.py +++ b/test_runner/regress/test_readonly_node.py @@ -1,6 +1,7 @@ import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv +from fixtures.types import Lsn from fixtures.utils import query_scalar @@ -84,7 +85,9 @@ def test_readonly_node(neon_simple_env: NeonEnv): # Check creating a node at segment boundary pg = env.postgres.create_start( - branch_name="test_readonly_node", node_name="test_branch_segment_boundary", lsn="0/3000000" + branch_name="test_readonly_node", + node_name="test_branch_segment_boundary", + lsn=Lsn("0/3000000"), ) cur = pg.connect().cursor() cur.execute("SELECT 1") @@ -94,5 +97,7 @@ def test_readonly_node(neon_simple_env: NeonEnv): with pytest.raises(Exception, match="invalid basebackup lsn"): # compute node startup with invalid LSN should fail env.postgres.create_start( - branch_name="test_readonly_node", node_name="test_readonly_node_preinitdb", lsn="0/42" + branch_name="test_readonly_node", + node_name="test_readonly_node_preinitdb", + lsn=Lsn("0/42"), ) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 0015c75670..04baef6ba0 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -5,7 +5,6 @@ import os import shutil import time from pathlib import Path -from uuid import UUID import pytest from fixtures.log_helper import log @@ -18,7 +17,8 @@ from fixtures.neon_fixtures import ( wait_for_upload, wait_until, ) -from fixtures.utils import lsn_from_hex, query_scalar +from fixtures.types import Lsn, ZTenantId, ZTimelineId +from fixtures.utils import query_scalar # @@ -61,8 +61,8 @@ def 
test_remote_storage_backup_and_restore( client = env.pageserver.http_client() - tenant_id = pg.safe_psql("show neon.tenant_id")[0][0] - timeline_id = pg.safe_psql("show neon.timeline_id")[0][0] + tenant_id = ZTenantId(pg.safe_psql("show neon.tenant_id")[0][0]) + timeline_id = ZTimelineId(pg.safe_psql("show neon.timeline_id")[0][0]) checkpoint_numbers = range(1, 3) @@ -74,17 +74,17 @@ def test_remote_storage_backup_and_restore( INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}'); """ ) - current_lsn = lsn_from_hex(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) # wait until pageserver receives that data - wait_for_last_record_lsn(client, UUID(tenant_id), UUID(timeline_id), current_lsn) + wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn) # run checkpoint manually to be sure that data landed in remote storage env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}") log.info(f"waiting for checkpoint {checkpoint_number} upload") # wait until pageserver successfully uploaded a checkpoint to remote storage - wait_for_upload(client, UUID(tenant_id), UUID(timeline_id), current_lsn) + wait_for_upload(client, tenant_id, timeline_id, current_lsn) log.info(f"upload of checkpoint {checkpoint_number} is done") ##### Stop the first pageserver instance, erase all its data @@ -101,16 +101,16 @@ def test_remote_storage_backup_and_restore( # Introduce failpoint in download env.pageserver.safe_psql("failpoints remote-storage-download-pre-rename=return") - client.tenant_attach(UUID(tenant_id)) + client.tenant_attach(tenant_id) # is there a better way to assert that failpoint triggered? time.sleep(10) # assert cannot attach timeline that is scheduled for download with pytest.raises(Exception, match="Conflict: Tenant download is already in progress"): - client.tenant_attach(UUID(tenant_id)) + client.tenant_attach(tenant_id) - detail = client.timeline_detail(UUID(tenant_id), UUID(timeline_id)) + detail = client.timeline_detail(tenant_id, timeline_id) log.info("Timeline detail with active failpoint: %s", detail) assert detail["local"] is None assert detail["remote"]["awaits_download"] @@ -119,20 +119,20 @@ def test_remote_storage_backup_and_restore( env.pageserver.stop() env.pageserver.start() - client.tenant_attach(UUID(tenant_id)) + client.tenant_attach(tenant_id) log.info("waiting for timeline redownload") wait_until( number_of_iterations=20, interval=1, - func=lambda: assert_timeline_local(client, UUID(tenant_id), UUID(timeline_id)), + func=lambda: assert_timeline_local(client, tenant_id, timeline_id), ) - detail = client.timeline_detail(UUID(tenant_id), UUID(timeline_id)) + detail = client.timeline_detail(tenant_id, timeline_id) assert detail["local"] is not None log.info("Timeline detail after attach completed: %s", detail) assert ( - lsn_from_hex(detail["local"]["last_record_lsn"]) >= current_lsn + Lsn(detail["local"]["last_record_lsn"]) >= current_lsn ), "current db Lsn should should not be less than the one stored on remote storage" assert not detail["remote"]["awaits_download"] diff --git a/test_runner/regress/test_tenant_conf.py b/test_runner/regress/test_tenant_conf.py index d496edd6dc..51a8101b11 100644 --- a/test_runner/regress/test_tenant_conf.py +++ b/test_runner/regress/test_tenant_conf.py @@ -32,8 +32,8 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}""" # it should match global configuration with 
closing(env.pageserver.connect()) as psconn: with psconn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as pscur: - log.info(f"show {env.initial_tenant.hex}") - pscur.execute(f"show {env.initial_tenant.hex}") + log.info(f"show {env.initial_tenant}") + pscur.execute(f"show {env.initial_tenant}") res = pscur.fetchone() assert all( i in res.items() @@ -52,7 +52,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}""" # check the configuration of the new tenant with closing(env.pageserver.connect()) as psconn: with psconn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as pscur: - pscur.execute(f"show {tenant.hex}") + pscur.execute(f"show {tenant}") res = pscur.fetchone() log.info(f"res: {res}") assert all( @@ -80,7 +80,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}""" with closing(env.pageserver.connect()) as psconn: with psconn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as pscur: - pscur.execute(f"show {tenant.hex}") + pscur.execute(f"show {tenant}") res = pscur.fetchone() log.info(f"after config res: {res}") assert all( @@ -103,7 +103,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}""" with closing(env.pageserver.connect()) as psconn: with psconn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as pscur: - pscur.execute(f"show {tenant.hex}") + pscur.execute(f"show {tenant}") res = pscur.fetchone() log.info(f"after restart res: {res}") assert all( diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index f1b30429bf..147e22b38f 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -1,17 +1,16 @@ -import uuid from threading import Thread -from uuid import uuid4 import psycopg2 import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException +from fixtures.types import ZTenantId, ZTimelineId -def do_gc_target(env: NeonEnv, tenant_id: uuid.UUID, timeline_id: uuid.UUID): +def do_gc_target(env: NeonEnv, tenant_id: ZTenantId, timeline_id: ZTimelineId): """Hack to unblock main, see https://github.com/neondatabase/neon/issues/2211""" try: - env.pageserver.safe_psql(f"do_gc {tenant_id.hex} {timeline_id.hex} 0") + env.pageserver.safe_psql(f"do_gc {tenant_id} {timeline_id} 0") except Exception as e: log.error("do_gc failed: %s", e) @@ -21,10 +20,10 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): pageserver_http = env.pageserver.http_client() # first check for non existing tenant - tenant_id = uuid4() + tenant_id = ZTenantId.generate() with pytest.raises( expected_exception=NeonPageserverApiException, - match=f"Tenant not found for id {tenant_id.hex}", + match=f"Tenant not found for id {tenant_id}", ): pageserver_http.tenant_detach(tenant_id) @@ -32,7 +31,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): tenant_id, timeline_id = env.neon_cli.create_tenant() # assert tenant exists on disk - assert (env.repo_dir / "tenants" / tenant_id.hex).exists() + assert (env.repo_dir / "tenants" / str(tenant_id)).exists() pg = env.postgres.create_start("main", tenant_id=tenant_id) # we rely upon autocommit after each statement @@ -47,7 +46,8 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): with pytest.raises( expected_exception=psycopg2.DatabaseError, match="gc target timeline does not exist" ): - env.pageserver.safe_psql(f"do_gc {tenant_id.hex} {uuid4().hex} 0") + 
bogus_timeline_id = ZTimelineId.generate() + env.pageserver.safe_psql(f"do_gc {tenant_id} {bogus_timeline_id} 0") # try to concurrently run gc and detach gc_thread = Thread(target=lambda: do_gc_target(env, tenant_id, timeline_id)) @@ -70,9 +70,9 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): gc_thread.join(timeout=10) # check that nothing is left on disk for deleted tenant - assert not (env.repo_dir / "tenants" / tenant_id.hex).exists() + assert not (env.repo_dir / "tenants" / str(tenant_id)).exists() with pytest.raises( - expected_exception=psycopg2.DatabaseError, match=f"Tenant {tenant_id.hex} not found" + expected_exception=psycopg2.DatabaseError, match=f"Tenant {tenant_id} not found" ): - env.pageserver.safe_psql(f"do_gc {tenant_id.hex} {timeline_id.hex} 0") + env.pageserver.safe_psql(f"do_gc {tenant_id} {timeline_id} 0") diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py index 19b0ec05a7..56563ebe87 100644 --- a/test_runner/regress/test_tenant_relocation.py +++ b/test_runner/regress/test_tenant_relocation.py @@ -5,7 +5,6 @@ import subprocess import threading from contextlib import closing, contextmanager from typing import Any, Dict, Optional, Tuple -from uuid import UUID import pytest from fixtures.log_helper import log @@ -25,7 +24,8 @@ from fixtures.neon_fixtures import ( wait_for_upload, wait_until, ) -from fixtures.utils import lsn_from_hex, lsn_to_hex, subprocess_capture +from fixtures.types import Lsn, ZTenantId, ZTimelineId +from fixtures.utils import query_scalar, subprocess_capture def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float): @@ -113,19 +113,21 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve def populate_branch( pg: Postgres, - tenant_id: UUID, + tenant_id: ZTenantId, ps_http: NeonPageserverHttpClient, create_table: bool, expected_sum: Optional[int], -) -> Tuple[UUID, int]: +) -> Tuple[ZTimelineId, Lsn]: # insert some data with pg_cur(pg) as cur: cur.execute("SHOW neon.timeline_id") - timeline_id = UUID(cur.fetchone()[0]) - log.info("timeline to relocate %s", timeline_id.hex) + timeline_id = ZTimelineId(cur.fetchone()[0]) + log.info("timeline to relocate %s", timeline_id) - cur.execute("SELECT pg_current_wal_flush_lsn()") - log.info("pg_current_wal_flush_lsn() %s", lsn_from_hex(cur.fetchone()[0])) + log.info( + "pg_current_wal_flush_lsn(): %s", + Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")), + ) log.info( "timeline detail %s", ps_http.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id), @@ -139,21 +141,20 @@ def populate_branch( if expected_sum is not None: cur.execute("SELECT sum(key) FROM t") assert cur.fetchone() == (expected_sum,) - cur.execute("SELECT pg_current_wal_flush_lsn()") + current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) - current_lsn = lsn_from_hex(cur.fetchone()[0]) return timeline_id, current_lsn def ensure_checkpoint( pageserver_cur, pageserver_http: NeonPageserverHttpClient, - tenant_id: UUID, - timeline_id: UUID, - current_lsn: int, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + current_lsn: Lsn, ): # run checkpoint manually to be sure that data landed in remote storage - pageserver_cur.execute(f"checkpoint {tenant_id.hex} {timeline_id.hex}") + pageserver_cur.execute(f"checkpoint {tenant_id} {timeline_id}") # wait until pageserver successfully uploaded a checkpoint to remote storage wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn) @@ -161,10 
+162,10 @@ def ensure_checkpoint( def check_timeline_attached( new_pageserver_http_client: NeonPageserverHttpClient, - tenant_id: UUID, - timeline_id: UUID, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, old_timeline_detail: Dict[str, Any], - old_current_lsn: int, + old_current_lsn: Lsn, ): # new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint new_timeline_detail = assert_timeline_local(new_pageserver_http_client, tenant_id, timeline_id) @@ -172,18 +173,22 @@ def check_timeline_attached( # when load is active these checks can break because lsns are not static # so let's check with some margin assert_abs_margin_ratio( - lsn_from_hex(new_timeline_detail["local"]["disk_consistent_lsn"]), - lsn_from_hex(old_timeline_detail["local"]["disk_consistent_lsn"]), + int(Lsn(new_timeline_detail["local"]["disk_consistent_lsn"])), + int(Lsn(old_timeline_detail["local"]["disk_consistent_lsn"])), 0.03, ) assert_abs_margin_ratio( - lsn_from_hex(new_timeline_detail["local"]["disk_consistent_lsn"]), old_current_lsn, 0.03 + int(Lsn(new_timeline_detail["local"]["disk_consistent_lsn"])), int(old_current_lsn), 0.03 ) def switch_pg_to_new_pageserver( - env: NeonEnv, pg: Postgres, new_pageserver_port: int, tenant_id: UUID, timeline_id: UUID + env: NeonEnv, + pg: Postgres, + new_pageserver_port: int, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, ) -> pathlib.Path: pg.stop() @@ -195,7 +200,7 @@ def switch_pg_to_new_pageserver( pg.start() timeline_to_detach_local_path = ( - env.repo_dir / "tenants" / tenant_id.hex / "timelines" / timeline_id.hex + env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) ) files_before_detach = os.listdir(timeline_to_detach_local_path) assert ( @@ -260,7 +265,7 @@ def test_tenant_relocation( pageserver_http = env.pageserver.http_client() tenant_id, initial_timeline_id = env.neon_cli.create_tenant( - UUID("74ee8b079a0e437eb0afea7d26a07209") + ZTenantId("74ee8b079a0e437eb0afea7d26a07209") ) log.info("tenant to relocate %s initial_timeline_id %s", tenant_id, initial_timeline_id) @@ -280,7 +285,7 @@ def test_tenant_relocation( env.neon_cli.create_branch( new_branch_name="test_tenant_relocation_second", ancestor_branch_name="test_tenant_relocation_main", - ancestor_start_lsn=lsn_to_hex(current_lsn_main), + ancestor_start_lsn=current_lsn_main, tenant_id=tenant_id, ) pg_second = env.postgres.create_start( @@ -365,7 +370,7 @@ def test_tenant_relocation( "python", os.path.join(base_dir, "scripts/export_import_between_pageservers.py"), "--tenant-id", - tenant_id.hex, + str(tenant_id), "--from-host", "localhost", "--from-http-port", diff --git a/test_runner/regress/test_tenant_tasks.py b/test_runner/regress/test_tenant_tasks.py index 8617bc8ea9..befa4616be 100644 --- a/test_runner/regress/test_tenant_tasks.py +++ b/test_runner/regress/test_tenant_tasks.py @@ -1,6 +1,5 @@ -from uuid import UUID - from fixtures.neon_fixtures import NeonEnvBuilder, wait_until +from fixtures.types import ZTenantId, ZTimelineId def get_only_element(l): # noqa: E741 @@ -23,7 +22,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder): def get_state(tenant): all_states = client.tenant_list() - matching = [t for t in all_states if t["id"] == tenant.hex] + matching = [t for t in all_states if ZTenantId(t["id"]) == tenant] return get_only_element(matching)["state"] def get_metric_value(name): @@ -35,8 +34,8 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder): value = line.lstrip(name).strip() return 
int(value) - def delete_all_timelines(tenant): - timelines = [UUID(t["timeline_id"]) for t in client.timeline_list(tenant)] + def delete_all_timelines(tenant: ZTenantId): + timelines = [ZTimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)] for t in timelines: client.timeline_delete(tenant, t) @@ -55,7 +54,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder): # Detach all tenants and wait for them to go idle # TODO they should be already idle since there are no active computes for tenant_info in client.tenant_list(): - tenant_id = UUID(tenant_info["id"]) + tenant_id = ZTenantId(tenant_info["id"]) delete_all_timelines(tenant_id) wait_until(10, 0.2, lambda: assert_idle(tenant_id)) diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 0e0cd44471..8bbf45205a 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -6,7 +6,7 @@ import pytest from fixtures.log_helper import log from fixtures.metrics import parse_metrics from fixtures.neon_fixtures import NeonEnvBuilder -from fixtures.utils import lsn_to_hex +from fixtures.types import Lsn @pytest.mark.parametrize("with_safekeepers", [False, True]) @@ -84,22 +84,24 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder): sk_metrics = all_metrics[1:] ttids = [ - {"tenant_id": tenant_1.hex, "timeline_id": timeline_1.hex}, - {"tenant_id": tenant_2.hex, "timeline_id": timeline_2.hex}, + {"tenant_id": str(tenant_1), "timeline_id": str(timeline_1)}, + {"tenant_id": str(tenant_2), "timeline_id": str(timeline_2)}, ] # Test metrics per timeline for tt in ttids: log.info(f"Checking metrics for {tt}") - ps_lsn = int(ps_metrics.query_one("pageserver_last_record_lsn", filter=tt).value) - sk_lsns = [int(sk.query_one("safekeeper_commit_lsn", filter=tt).value) for sk in sk_metrics] + ps_lsn = Lsn(int(ps_metrics.query_one("pageserver_last_record_lsn", filter=tt).value)) + sk_lsns = [ + Lsn(int(sk.query_one("safekeeper_commit_lsn", filter=tt).value)) for sk in sk_metrics + ] - log.info(f"ps_lsn: {lsn_to_hex(ps_lsn)}") - log.info(f"sk_lsns: {list(map(lsn_to_hex, sk_lsns))}") + log.info(f"ps_lsn: {ps_lsn}") + log.info(f"sk_lsns: {sk_lsns}") assert ps_lsn <= max(sk_lsns) - assert ps_lsn > 0 + assert ps_lsn > Lsn(0) # Test common metrics for metrics in all_metrics: diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py index 083150e12a..70b474c9a9 100644 --- a/test_runner/regress/test_tenants_with_remote_storage.py +++ b/test_runner/regress/test_tenants_with_remote_storage.py @@ -8,7 +8,6 @@ import asyncio from typing import List, Tuple -from uuid import UUID import pytest from fixtures.neon_fixtures import ( @@ -20,7 +19,7 @@ from fixtures.neon_fixtures import ( wait_for_last_record_lsn, wait_for_upload, ) -from fixtures.utils import lsn_from_hex +from fixtures.types import Lsn, ZTenantId, ZTimelineId async def tenant_workload(env: NeonEnv, pg: Postgres): @@ -28,9 +27,6 @@ async def tenant_workload(env: NeonEnv, pg: Postgres): pg_conn = await pg.connect_async() - await pg_conn.fetchval("show neon.tenant_id") - await pg_conn.fetchval("show neon.timeline_id") - await pg_conn.execute("CREATE TABLE t(key int primary key, value text)") for i in range(1, 100): await pg_conn.execute( @@ -62,7 +58,7 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem env = neon_env_builder.init_start() - tenants_pgs: List[Tuple[UUID, Postgres]] = [] + tenants_pgs: 
List[Tuple[ZTenantId, Postgres]] = [] for _ in range(1, 5): # Use a tiny checkpoint distance, to create a lot of layers quickly @@ -87,13 +83,13 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem res = pg.safe_psql_many( ["SHOW neon.tenant_id", "SHOW neon.timeline_id", "SELECT pg_current_wal_flush_lsn()"] ) - tenant_id = res[0][0][0] - timeline_id = res[1][0][0] - current_lsn = lsn_from_hex(res[2][0][0]) + tenant_id = ZTenantId(res[0][0][0]) + timeline_id = ZTimelineId(res[1][0][0]) + current_lsn = Lsn(res[2][0][0]) # wait until pageserver receives all the data - wait_for_last_record_lsn(pageserver_http, UUID(tenant_id), UUID(timeline_id), current_lsn) + wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn) # run final checkpoint manually to flush all the data to remote storage env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}") - wait_for_upload(pageserver_http, UUID(tenant_id), UUID(timeline_id), current_lsn) + wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn) diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index 7a55ffb769..a5dadc535b 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -1,7 +1,6 @@ -from uuid import uuid4 - import pytest from fixtures.neon_fixtures import NeonEnv, NeonPageserverApiException, wait_until +from fixtures.types import ZTenantId, ZTimelineId def test_timeline_delete(neon_simple_env: NeonEnv): @@ -11,15 +10,15 @@ def test_timeline_delete(neon_simple_env: NeonEnv): # first try to delete non existing timeline # for existing tenant: - invalid_timeline_id = uuid4() + invalid_timeline_id = ZTimelineId.generate() with pytest.raises(NeonPageserverApiException, match="timeline not found"): ps_http.timeline_delete(tenant_id=env.initial_tenant, timeline_id=invalid_timeline_id) # for non existing tenant: - invalid_tenant_id = uuid4() + invalid_tenant_id = ZTenantId.generate() with pytest.raises( NeonPageserverApiException, - match=f"Tenant {invalid_tenant_id.hex} not found in local tenant state", + match=f"Tenant {invalid_tenant_id} not found in local tenant state", ): ps_http.timeline_delete(tenant_id=invalid_tenant_id, timeline_id=invalid_timeline_id) @@ -37,7 +36,11 @@ def test_timeline_delete(neon_simple_env: NeonEnv): ): timeline_path = ( - env.repo_dir / "tenants" / env.initial_tenant.hex / "timelines" / parent_timeline_id.hex + env.repo_dir + / "tenants" + / str(env.initial_tenant) + / "timelines" + / str(parent_timeline_id) ) assert timeline_path.exists() @@ -46,7 +49,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): assert not timeline_path.exists() timeline_path = ( - env.repo_dir / "tenants" / env.initial_tenant.hex / "timelines" / leaf_timeline_id.hex + env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id) ) assert timeline_path.exists() diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index f6b665ec8c..aba8567541 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -3,7 +3,6 @@ import random import re import time from contextlib import closing -from uuid import UUID import psycopg2.errors import psycopg2.extras @@ -15,6 +14,7 @@ from fixtures.neon_fixtures import ( assert_timeline_local, wait_for_last_flush_lsn, ) +from fixtures.types import ZTenantId, ZTimelineId from fixtures.utils import get_timeline_dir_size @@ 
-34,8 +34,6 @@ def test_timeline_size(neon_simple_env: NeonEnv): with closing(pgmain.connect()) as conn: with conn.cursor() as cur: - cur.execute("SHOW neon.timeline_id") - cur.execute("CREATE TABLE foo (t text)") cur.execute( """ @@ -77,8 +75,6 @@ def test_timeline_size_createdropdb(neon_simple_env: NeonEnv): with closing(pgmain.connect()) as conn: with conn.cursor() as cur: - cur.execute("SHOW neon.timeline_id") - res = assert_timeline_local(client, env.initial_tenant, new_timeline_id) local_details = res["local"] assert ( @@ -254,7 +250,7 @@ def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv): ) wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id) - env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}") + env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}") assert_physical_size(env, env.initial_tenant, new_timeline_id) @@ -281,8 +277,8 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder ) wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id) - env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}") - env.pageserver.safe_psql(f"compact {env.initial_tenant.hex} {new_timeline_id.hex}") + env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}") + env.pageserver.safe_psql(f"compact {env.initial_tenant} {new_timeline_id}") assert_physical_size(env, env.initial_tenant, new_timeline_id) @@ -307,7 +303,7 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder): ) wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id) - env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}") + env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}") pg.safe_psql( """ @@ -318,9 +314,9 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder): ) wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id) - env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}") + env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}") - env.pageserver.safe_psql(f"do_gc {env.initial_tenant.hex} {new_timeline_id.hex} 0") + env.pageserver.safe_psql(f"do_gc {env.initial_tenant} {new_timeline_id} 0") assert_physical_size(env, env.initial_tenant, new_timeline_id) @@ -343,12 +339,12 @@ def test_timeline_size_metrics(neon_simple_env: NeonEnv): ) wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id) - env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}") + env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}") # get the metrics and parse the metric for the current timeline's physical size metrics = env.pageserver.http_client().get_metrics() matches = re.search( - f'^pageserver_current_physical_size{{tenant_id="{env.initial_tenant.hex}",timeline_id="{new_timeline_id.hex}"}} (\\S+)$', + f'^pageserver_current_physical_size{{tenant_id="{env.initial_tenant}",timeline_id="{new_timeline_id}"}} (\\S+)$', metrics, re.MULTILINE, ) @@ -361,7 +357,7 @@ def test_timeline_size_metrics(neon_simple_env: NeonEnv): # Check that the logical size metric is sane, and matches matches = re.search( - f'^pageserver_current_logical_size{{tenant_id="{env.initial_tenant.hex}",timeline_id="{new_timeline_id.hex}"}} (\\S+)$', + f'^pageserver_current_logical_size{{tenant_id="{env.initial_tenant}",timeline_id="{new_timeline_id}"}} 
(\\S+)$', metrics, re.MULTILINE, ) @@ -389,7 +385,7 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv): tenant, timeline = env.neon_cli.create_tenant() - def get_timeline_physical_size(timeline: UUID): + def get_timeline_physical_size(timeline: ZTimelineId): res = client.timeline_detail(tenant, timeline, include_non_incremental_physical_size=True) return res["local"]["current_physical_size_non_incremental"] @@ -408,7 +404,7 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv): ) wait_for_last_flush_lsn(env, pg, tenant, timeline) - env.pageserver.safe_psql(f"checkpoint {tenant.hex} {timeline.hex}") + env.pageserver.safe_psql(f"checkpoint {tenant} {timeline}") timeline_total_size += get_timeline_physical_size(timeline) @@ -418,7 +414,7 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv): assert tenant_physical_size == timeline_total_size -def assert_physical_size(env: NeonEnv, tenant_id: UUID, timeline_id: UUID): +def assert_physical_size(env: NeonEnv, tenant_id: ZTenantId, timeline_id: ZTimelineId): """Check the current physical size returned from timeline API matches the total physical size of the timeline on disk""" client = env.pageserver.http_client() diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 28daeb18ed..cd370e60c0 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -7,12 +7,10 @@ import subprocess import sys import threading import time -import uuid from contextlib import closing from dataclasses import dataclass, field from pathlib import Path from typing import Any, List, Optional -from uuid import uuid4 import pytest from fixtures.log_helper import log @@ -34,14 +32,19 @@ from fixtures.neon_fixtures import ( wait_for_last_record_lsn, wait_for_upload, ) -from fixtures.utils import get_dir_size, lsn_from_hex, lsn_to_hex, query_scalar +from fixtures.types import Lsn, ZTenantId, ZTimelineId +from fixtures.utils import get_dir_size, query_scalar def wait_lsn_force_checkpoint( - tenant_id: str, timeline_id: str, pg: Postgres, ps: NeonPageserver, pageserver_conn_options={} + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + pg: Postgres, + ps: NeonPageserver, + pageserver_conn_options={}, ): - lsn = lsn_from_hex(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) - log.info(f"pg_current_wal_flush_lsn is {lsn_to_hex(lsn)}, waiting for it on pageserver") + lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver") auth_token = None if "password" in pageserver_conn_options: @@ -50,8 +53,8 @@ def wait_lsn_force_checkpoint( # wait for the pageserver to catch up wait_for_last_record_lsn( ps.http_client(auth_token=auth_token), - uuid.UUID(hex=tenant_id), - uuid.UUID(hex=timeline_id), + tenant_id, + timeline_id, lsn, ) @@ -63,19 +66,19 @@ def wait_lsn_force_checkpoint( # ensure that remote_consistent_lsn is advanced wait_for_upload( ps.http_client(auth_token=auth_token), - uuid.UUID(hex=tenant_id), - uuid.UUID(hex=timeline_id), + tenant_id, + timeline_id, lsn, ) @dataclass class TimelineMetrics: - timeline_id: str - last_record_lsn: int + timeline_id: ZTimelineId + last_record_lsn: Lsn # One entry per each Safekeeper, order is the same - flush_lsns: List[int] = field(default_factory=list) - commit_lsns: List[int] = field(default_factory=list) + flush_lsns: List[Lsn] = field(default_factory=list) + commit_lsns: List[Lsn] = field(default_factory=list) # Run page server and 
multiple acceptors, and multiple compute nodes running @@ -123,7 +126,7 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder): timeline_metrics = [] for timeline_detail in timeline_details: - timeline_id: str = timeline_detail["timeline_id"] + timeline_id = ZTimelineId(timeline_detail["timeline_id"]) local_timeline_detail = timeline_detail.get("local") if local_timeline_detail is None: @@ -132,11 +135,11 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder): m = TimelineMetrics( timeline_id=timeline_id, - last_record_lsn=lsn_from_hex(local_timeline_detail["last_record_lsn"]), + last_record_lsn=Lsn(local_timeline_detail["last_record_lsn"]), ) for sk_m in sk_metrics: - m.flush_lsns.append(sk_m.flush_lsn_inexact[(tenant_id.hex, timeline_id)]) - m.commit_lsns.append(sk_m.commit_lsn_inexact[(tenant_id.hex, timeline_id)]) + m.flush_lsns.append(Lsn(sk_m.flush_lsn_inexact[(tenant_id, timeline_id)])) + m.commit_lsns.append(Lsn(sk_m.commit_lsn_inexact[(tenant_id, timeline_id)])) for flush_lsn, commit_lsn in zip(m.flush_lsns, m.commit_lsns): # Invariant. May be < when transaction is in progress. @@ -216,7 +219,7 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder): final_m = collect_metrics("after SELECT") # Assume that LSNs (a) behave similarly in all timelines; and (b) INSERT INTO alters LSN significantly. # Also assume that safekeepers will not be significantly out of sync in this test. - middle_lsn = (init_m[0].last_record_lsn + final_m[0].last_record_lsn) // 2 + middle_lsn = Lsn((int(init_m[0].last_record_lsn) + int(final_m[0].last_record_lsn)) // 2) assert max(init_m[0].flush_lsns) < middle_lsn < min(final_m[0].flush_lsns) assert max(init_m[0].commit_lsns) < middle_lsn < min(final_m[0].commit_lsns) assert max(init_m[1].flush_lsns) < middle_lsn < min(final_m[1].flush_lsns) @@ -270,8 +273,8 @@ def test_broker(neon_env_builder: NeonEnvBuilder): pg.safe_psql("CREATE TABLE t(key int primary key, value text)") # learn neon timeline from compute - tenant_id = pg.safe_psql("show neon.tenant_id")[0][0] - timeline_id = pg.safe_psql("show neon.timeline_id")[0][0] + tenant_id = ZTenantId(pg.safe_psql("show neon.tenant_id")[0][0]) + timeline_id = ZTimelineId(pg.safe_psql("show neon.timeline_id")[0][0]) # wait until remote_consistent_lsn gets advanced on all safekeepers clients = [sk.http_client() for sk in env.safekeepers] @@ -288,8 +291,7 @@ def test_broker(neon_env_builder: NeonEnvBuilder): while True: stat_after = [cli.timeline_status(tenant_id, timeline_id) for cli in clients] if all( - lsn_from_hex(s_after.remote_consistent_lsn) - > lsn_from_hex(s_before.remote_consistent_lsn) + s_after.remote_consistent_lsn > s_before.remote_consistent_lsn for s_after, s_before in zip(stat_after, stat_before) ): break @@ -323,8 +325,8 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): ] ) - tenant_id = pg.safe_psql("show neon.tenant_id")[0][0] - timeline_id = pg.safe_psql("show neon.timeline_id")[0][0] + tenant_id = ZTenantId(pg.safe_psql("show neon.tenant_id")[0][0]) + timeline_id = ZTimelineId(pg.safe_psql("show neon.timeline_id")[0][0]) # force checkpoint to advance remote_consistent_lsn pageserver_conn_options = {} @@ -334,7 +336,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): # We will wait for first segment removal. Make sure they exist for starter. 
first_segments = [ - os.path.join(sk.data_dir(), tenant_id, timeline_id, "000000010000000000000001") + os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id), "000000010000000000000001") for sk in env.safekeepers ] assert all(os.path.exists(p) for p in first_segments) @@ -346,7 +348,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): auth_token=env.auth_keys.generate_tenant_token(tenant_id) ) http_cli_other = env.safekeepers[0].http_client( - auth_token=env.auth_keys.generate_tenant_token(uuid4().hex) + auth_token=env.auth_keys.generate_tenant_token(ZTenantId.generate()) ) http_cli_noauth = env.safekeepers[0].http_client() @@ -367,7 +369,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): ) http_cli.record_safekeeper_info(tenant_id, timeline_id, {"backup_lsn": "FFFFFFFF/FEFFFFFF"}) assert ( - "FFFFFFFF/FEFFFFFF" + Lsn("FFFFFFFF/FEFFFFFF") == http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id).backup_lsn ) @@ -382,14 +384,14 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): time.sleep(0.5) -def wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end): +def wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end: Lsn): started_at = time.time() http_cli = live_sk.http_client() while True: tli_status = http_cli.timeline_status(tenant_id, timeline_id) log.info(f"live sk status is {tli_status}") - if lsn_from_hex(tli_status.backup_lsn) >= lsn_from_hex(seg_end): + if tli_status.backup_lsn >= seg_end: break elapsed = time.time() - started_at if elapsed > 30: @@ -399,23 +401,22 @@ def wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end): time.sleep(0.5) -def wait_wal_trim(tenant_id, timeline_id, sk, target_size): +def wait_wal_trim(tenant_id, timeline_id, sk, target_size_mb): started_at = time.time() http_cli = sk.http_client() while True: tli_status = http_cli.timeline_status(tenant_id, timeline_id) - sk_wal_size = ( - get_dir_size(os.path.join(sk.data_dir(), tenant_id, timeline_id)) / 1024 / 1024 - ) - log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size:.2f}MB status={tli_status}") + sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id))) + sk_wal_size_mb = sk_wal_size / 1024 / 1024 + log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}") - if sk_wal_size <= target_size: + if sk_wal_size_mb <= target_size_mb: break elapsed = time.time() - started_at if elapsed > 20: raise RuntimeError( - f"timed out waiting {elapsed:.0f}s for sk_id={sk.id} to trim WAL to {target_size:.2f}MB, current size is {sk_wal_size:.2f}MB" + f"timed out waiting {elapsed:.0f}s for sk_id={sk.id} to trim WAL to {target_size_mb:.2f}MB, current size is {sk_wal_size_mb:.2f}MB" ) time.sleep(0.5) @@ -437,8 +438,8 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Remot pg = env.postgres.create_start("test_safekeepers_wal_backup") # learn neon timeline from compute - tenant_id = pg.safe_psql("show neon.tenant_id")[0][0] - timeline_id = pg.safe_psql("show neon.timeline_id")[0][0] + tenant_id = ZTenantId(pg.safe_psql("show neon.tenant_id")[0][0]) + timeline_id = ZTimelineId(pg.safe_psql("show neon.timeline_id")[0][0]) pg_conn = pg.connect() cur = pg_conn.cursor() @@ -446,7 +447,7 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Remot # Shut down subsequently each of safekeepers and fill a segment while sk is # down; ensure segment gets offloaded by others. 
- offloaded_seg_end = ["0/2000000", "0/3000000", "0/4000000"] + offloaded_seg_end = [Lsn("0/2000000"), Lsn("0/3000000"), Lsn("0/4000000")] for victim, seg_end in zip(env.safekeepers, offloaded_seg_end): victim.stop() # roughly fills one segment @@ -465,7 +466,7 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Remot with closing(pg.connect()) as conn: with conn.cursor() as cur: cur.execute("insert into t select generate_series(1,250000), 'payload'") - wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], "0/5000000") + wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], Lsn("0/5000000")) @pytest.mark.parametrize("remote_storage_kind", available_remote_storages()) @@ -492,8 +493,8 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re pg = env.postgres.create_start("test_s3_wal_replay") # learn neon timeline from compute - tenant_id = pg.safe_psql("show neon.tenant_id")[0][0] - timeline_id = pg.safe_psql("show neon.timeline_id")[0][0] + tenant_id = ZTenantId(pg.safe_psql("show neon.tenant_id")[0][0]) + timeline_id = ZTimelineId(pg.safe_psql("show neon.timeline_id")[0][0]) expected_sum = 0 @@ -503,7 +504,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re cur.execute("insert into t values (1, 'payload')") expected_sum += 1 - offloaded_seg_end = ["0/3000000"] + offloaded_seg_end = [Lsn("0/3000000")] for seg_end in offloaded_seg_end: # roughly fills two segments cur.execute("insert into t select generate_series(1,500000), 'payload'") @@ -517,7 +518,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re # advance remote_consistent_lsn to trigger WAL trimming # this LSN should be less than commit_lsn, so timeline will be active=true in safekeepers, to push etcd updates env.safekeepers[0].http_client().record_safekeeper_info( - tenant_id, timeline_id, {"remote_consistent_lsn": offloaded_seg_end[-1]} + tenant_id, timeline_id, {"remote_consistent_lsn": str(offloaded_seg_end[-1])} ) for sk in env.safekeepers: @@ -526,10 +527,10 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re last_lsn = query_scalar(cur, "SELECT pg_current_wal_flush_lsn()") - pageserver_lsn = env.pageserver.http_client().timeline_detail( - uuid.UUID(tenant_id), uuid.UUID((timeline_id)) - )["local"]["last_record_lsn"] - lag = lsn_from_hex(last_lsn) - lsn_from_hex(pageserver_lsn) + pageserver_lsn = env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)["local"][ + "last_record_lsn" + ] + lag = Lsn(last_lsn) - Lsn(pageserver_lsn) log.info( f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb" ) @@ -554,10 +555,10 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re if elapsed > wait_lsn_timeout: raise RuntimeError("Timed out waiting for WAL redo") - pageserver_lsn = env.pageserver.http_client().timeline_detail( - uuid.UUID(tenant_id), uuid.UUID((timeline_id)) - )["local"]["last_record_lsn"] - lag = lsn_from_hex(last_lsn) - lsn_from_hex(pageserver_lsn) + pageserver_lsn = env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)[ + "local" + ]["last_record_lsn"] + lag = Lsn(last_lsn) - Lsn(pageserver_lsn) if time.time() > last_debug_print + 10 or lag <= 0: last_debug_print = time.time() @@ -583,8 +584,8 @@ class ProposerPostgres(PgProtocol): self, pgdata_dir: str, pg_bin, - timeline_id: uuid.UUID, - tenant_id: uuid.UUID, + tenant_id: ZTenantId, + 
timeline_id: ZTimelineId, listen_addr: str, port: int, ): @@ -592,8 +593,8 @@ class ProposerPostgres(PgProtocol): self.pgdata_dir: str = pgdata_dir self.pg_bin: PgBin = pg_bin - self.timeline_id: uuid.UUID = timeline_id - self.tenant_id: uuid.UUID = tenant_id + self.tenant_id: ZTenantId = tenant_id + self.timeline_id: ZTimelineId = timeline_id self.listen_addr: str = listen_addr self.port: int = port @@ -613,8 +614,8 @@ class ProposerPostgres(PgProtocol): cfg = [ "synchronous_standby_names = 'walproposer'\n", "shared_preload_libraries = 'neon'\n", - f"neon.timeline_id = '{self.timeline_id.hex}'\n", - f"neon.tenant_id = '{self.tenant_id.hex}'\n", + f"neon.timeline_id = '{self.timeline_id}'\n", + f"neon.tenant_id = '{self.tenant_id}'\n", "neon.pageserver_connstring = ''\n", f"neon.safekeepers = '{safekeepers}'\n", f"listen_addresses = '{self.listen_addr}'\n", @@ -623,7 +624,7 @@ class ProposerPostgres(PgProtocol): f.writelines(cfg) - def sync_safekeepers(self) -> str: + def sync_safekeepers(self) -> Lsn: """ Run 'postgres --sync-safekeepers'. Returns execution result, which is commit_lsn after sync. @@ -639,7 +640,7 @@ class ProposerPostgres(PgProtocol): with open(stdout_filename, "r") as stdout_f: stdout = stdout_f.read() - return stdout.strip("\n ") + return Lsn(stdout.strip("\n ")) def initdb(self): """Run initdb""" @@ -671,18 +672,18 @@ def test_sync_safekeepers( neon_env_builder.num_safekeepers = 3 env = neon_env_builder.init_start() - timeline_id = uuid.uuid4() - tenant_id = uuid.uuid4() + tenant_id = ZTenantId.generate() + timeline_id = ZTimelineId.generate() # write config for proposer pgdata_dir = os.path.join(env.repo_dir, "proposer_pgdata") pg = ProposerPostgres( - pgdata_dir, pg_bin, timeline_id, tenant_id, "127.0.0.1", port_distributor.get_port() + pgdata_dir, pg_bin, tenant_id, timeline_id, "127.0.0.1", port_distributor.get_port() ) pg.create_dir_config(env.get_safekeeper_connstrs()) # valid lsn, which is not in the segment start, nor in zero segment - epoch_start_lsn = 0x16B9188 # 0/16B9188 + epoch_start_lsn = Lsn("0/16B9188") begin_lsn = epoch_start_lsn # append and commit WAL @@ -697,14 +698,14 @@ def test_sync_safekeepers( "set_commit_lsn": True, "send_proposer_elected": True, "term": 2, - "begin_lsn": begin_lsn, - "epoch_start_lsn": epoch_start_lsn, - "truncate_lsn": epoch_start_lsn, + "begin_lsn": int(begin_lsn), + "epoch_start_lsn": int(epoch_start_lsn), + "truncate_lsn": int(epoch_start_lsn), }, ) - lsn_hex = lsn_to_hex(res["inserted_wal"]["end_lsn"]) - lsn_after_append.append(lsn_hex) - log.info(f"safekeeper[{i}] lsn after append: {lsn_hex}") + lsn = Lsn(res["inserted_wal"]["end_lsn"]) + lsn_after_append.append(lsn) + log.info(f"safekeeper[{i}] lsn after append: {lsn}") # run sync safekeepers lsn_after_sync = pg.sync_safekeepers() @@ -724,8 +725,8 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): wa = env.safekeepers[0] # learn neon timeline from compute - tenant_id = pg.safe_psql("show neon.tenant_id")[0][0] - timeline_id = pg.safe_psql("show neon.timeline_id")[0][0] + tenant_id = ZTenantId(pg.safe_psql("show neon.tenant_id")[0][0]) + timeline_id = ZTimelineId(pg.safe_psql("show neon.timeline_id")[0][0]) if not auth_enabled: wa_http_cli = wa.http_client() @@ -734,7 +735,7 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): wa_http_cli = wa.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id)) wa_http_cli.check_status() wa_http_cli_bad = wa.http_client( - 
auth_token=env.auth_keys.generate_tenant_token(uuid4().hex) + auth_token=env.auth_keys.generate_tenant_token(ZTenantId.generate()) ) wa_http_cli_bad.check_status() wa_http_cli_noauth = wa.http_client() @@ -784,15 +785,15 @@ class SafekeeperEnv: self.bin_safekeeper = os.path.join(str(neon_binpath), "safekeeper") self.safekeepers: Optional[List[subprocess.CompletedProcess[Any]]] = None self.postgres: Optional[ProposerPostgres] = None - self.tenant_id: Optional[uuid.UUID] = None - self.timeline_id: Optional[uuid.UUID] = None + self.tenant_id: Optional[ZTenantId] = None + self.timeline_id: Optional[ZTimelineId] = None def init(self) -> "SafekeeperEnv": assert self.postgres is None, "postgres is already initialized" assert self.safekeepers is None, "safekeepers are already initialized" - self.timeline_id = uuid.uuid4() - self.tenant_id = uuid.uuid4() + self.tenant_id = ZTenantId.generate() + self.timeline_id = ZTimelineId.generate() self.repo_dir.mkdir(exist_ok=True) # Create config and a Safekeeper object for each safekeeper @@ -841,8 +842,8 @@ class SafekeeperEnv: pg = ProposerPostgres( pgdata_dir, self.pg_bin, - self.timeline_id, self.tenant_id, + self.timeline_id, "127.0.0.1", self.port_distributor.get_port(), ) @@ -911,7 +912,9 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder): sum_after = query_scalar(cur, "SELECT SUM(key) FROM t") assert sum_after == sum_before + 5000050000 - def show_statuses(safekeepers: List[Safekeeper], tenant_id: str, timeline_id: str): + def show_statuses( + safekeepers: List[Safekeeper], tenant_id: ZTenantId, timeline_id: ZTimelineId + ): for sk in safekeepers: http_cli = sk.http_client() try: @@ -932,8 +935,8 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder): pg.start() # learn neon timeline from compute - tenant_id = pg.safe_psql("show neon.tenant_id")[0][0] - timeline_id = pg.safe_psql("show neon.timeline_id")[0][0] + tenant_id = ZTenantId(pg.safe_psql("show neon.tenant_id")[0][0]) + timeline_id = ZTimelineId(pg.safe_psql("show neon.timeline_id")[0][0]) execute_payload(pg) show_statuses(env.safekeepers, tenant_id, timeline_id) @@ -985,20 +988,21 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder): # of WAL segments. 
def test_wal_deleted_after_broadcast(neon_env_builder: NeonEnvBuilder): # used to calculate delta in collect_stats - last_lsn = 0.0 + last_lsn = Lsn(0) - # returns LSN and pg_wal size, all in MB + # returns pg_wal size in MB def collect_stats(pg: Postgres, cur, enable_logs=True): nonlocal last_lsn assert pg.pgdata_dir is not None log.info("executing INSERT to generate WAL") - current_lsn = lsn_from_hex(query_scalar(cur, "select pg_current_wal_lsn()")) / 1024 / 1024 - pg_wal_size = get_dir_size(os.path.join(pg.pgdata_dir, "pg_wal")) / 1024 / 1024 + current_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()")) + pg_wal_size_mb = get_dir_size(os.path.join(pg.pgdata_dir, "pg_wal")) / 1024 / 1024 if enable_logs: - log.info(f"LSN delta: {current_lsn - last_lsn} MB, current WAL size: {pg_wal_size} MB") + lsn_delta_mb = (current_lsn - last_lsn) / 1024 / 1024 + log.info(f"LSN delta: {lsn_delta_mb} MB, current WAL size: {pg_wal_size_mb} MB") last_lsn = current_lsn - return current_lsn, pg_wal_size + return pg_wal_size_mb # generates about ~20MB of WAL, to create at least one new segment def generate_wal(cur): @@ -1027,7 +1031,7 @@ def test_wal_deleted_after_broadcast(neon_env_builder: NeonEnvBuilder): log.info("executing checkpoint") cur.execute("CHECKPOINT") - wal_size_after_checkpoint = collect_stats(pg, cur)[1] + wal_size_after_checkpoint = collect_stats(pg, cur) # there shouldn't be more than 2 WAL segments (but dir may have archive_status files) assert wal_size_after_checkpoint < 16 * 2.5 @@ -1040,22 +1044,20 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): env = neon_env_builder.init_start() # Create two tenants: one will be deleted, other should be preserved. - tenant_id = env.initial_tenant.hex - timeline_id_1 = env.neon_cli.create_branch("br1").hex # Active, delete explicitly - timeline_id_2 = env.neon_cli.create_branch("br2").hex # Inactive, delete explicitly - timeline_id_3 = env.neon_cli.create_branch("br3").hex # Active, delete with the tenant - timeline_id_4 = env.neon_cli.create_branch("br4").hex # Inactive, delete with the tenant + tenant_id = env.initial_tenant + timeline_id_1 = env.neon_cli.create_branch("br1") # Active, delete explicitly + timeline_id_2 = env.neon_cli.create_branch("br2") # Inactive, delete explicitly + timeline_id_3 = env.neon_cli.create_branch("br3") # Active, delete with the tenant + timeline_id_4 = env.neon_cli.create_branch("br4") # Inactive, delete with the tenant - tenant_id_other_uuid, timeline_id_other_uuid = env.neon_cli.create_tenant() - tenant_id_other = tenant_id_other_uuid.hex - timeline_id_other = timeline_id_other_uuid.hex + tenant_id_other, timeline_id_other = env.neon_cli.create_tenant() # Populate branches pg_1 = env.postgres.create_start("br1") pg_2 = env.postgres.create_start("br2") pg_3 = env.postgres.create_start("br3") pg_4 = env.postgres.create_start("br4") - pg_other = env.postgres.create_start("main", tenant_id=uuid.UUID(hex=tenant_id_other)) + pg_other = env.postgres.create_start("main", tenant_id=tenant_id_other) for pg in [pg_1, pg_2, pg_3, pg_4, pg_other]: with closing(pg.connect()) as conn: with conn.cursor() as cur: @@ -1071,11 +1073,11 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): auth_token=env.auth_keys.generate_tenant_token(tenant_id_other) ) sk_http_noauth = sk.http_client() - assert (sk_data_dir / tenant_id / timeline_id_1).is_dir() - assert (sk_data_dir / tenant_id / timeline_id_2).is_dir() - assert (sk_data_dir / tenant_id / timeline_id_3).is_dir() - 
assert (sk_data_dir / tenant_id / timeline_id_4).is_dir() - assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_1)).is_dir() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_4)).is_dir() + assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir() # Stop branches which should be inactive and restart Safekeeper to drop its in-memory state. pg_2.stop_and_destroy() @@ -1094,22 +1096,22 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): "dir_existed": True, "was_active": True, } - assert not (sk_data_dir / tenant_id / timeline_id_1).exists() - assert (sk_data_dir / tenant_id / timeline_id_2).is_dir() - assert (sk_data_dir / tenant_id / timeline_id_3).is_dir() - assert (sk_data_dir / tenant_id / timeline_id_4).is_dir() - assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_4)).is_dir() + assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir() # Ensure repeated deletion succeeds assert sk_http.timeline_delete_force(tenant_id, timeline_id_1) == { "dir_existed": False, "was_active": False, } - assert not (sk_data_dir / tenant_id / timeline_id_1).exists() - assert (sk_data_dir / tenant_id / timeline_id_2).is_dir() - assert (sk_data_dir / tenant_id / timeline_id_3).is_dir() - assert (sk_data_dir / tenant_id / timeline_id_4).is_dir() - assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_2)).is_dir() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_4)).is_dir() + assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir() if auth_enabled: # Ensure we cannot delete the other tenant @@ -1118,44 +1120,44 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): assert sk_h.timeline_delete_force(tenant_id_other, timeline_id_other) with pytest.raises(sk_h.HTTPError, match="Forbidden|Unauthorized"): assert sk_h.tenant_delete_force(tenant_id_other) - assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir() # Remove initial tenant's br2 (inactive) assert sk_http.timeline_delete_force(tenant_id, timeline_id_2) == { "dir_existed": True, "was_active": False, } - assert not (sk_data_dir / tenant_id / timeline_id_1).exists() - assert not (sk_data_dir / tenant_id / timeline_id_2).exists() - assert (sk_data_dir / tenant_id / timeline_id_3).is_dir() - assert (sk_data_dir / tenant_id / timeline_id_4).is_dir() - assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists() + assert not (sk_data_dir / str(tenant_id) / str(timeline_id_2)).exists() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).is_dir() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_4)).is_dir() + assert (sk_data_dir / 
str(tenant_id_other) / str(timeline_id_other)).is_dir() # Remove non-existing branch, should succeed - assert sk_http.timeline_delete_force(tenant_id, "00" * 16) == { + assert sk_http.timeline_delete_force(tenant_id, ZTimelineId("00" * 16)) == { "dir_existed": False, "was_active": False, } - assert not (sk_data_dir / tenant_id / timeline_id_1).exists() - assert not (sk_data_dir / tenant_id / timeline_id_2).exists() - assert (sk_data_dir / tenant_id / timeline_id_3).exists() - assert (sk_data_dir / tenant_id / timeline_id_4).is_dir() - assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists() + assert not (sk_data_dir / str(tenant_id) / str(timeline_id_2)).exists() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_3)).exists() + assert (sk_data_dir / str(tenant_id) / str(timeline_id_4)).is_dir() + assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir() # Remove initial tenant fully (two branches are active) response = sk_http.tenant_delete_force(tenant_id) - assert response[timeline_id_3] == { + assert response[str(timeline_id_3)] == { "dir_existed": True, "was_active": True, } - assert not (sk_data_dir / tenant_id).exists() - assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + assert not (sk_data_dir / str(tenant_id)).exists() + assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir() # Remove initial tenant again. response = sk_http.tenant_delete_force(tenant_id) assert response == {} - assert not (sk_data_dir / tenant_id).exists() - assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + assert not (sk_data_dir / str(tenant_id)).exists() + assert (sk_data_dir / str(tenant_id_other) / str(timeline_id_other)).is_dir() # Ensure the other tenant still works sk_http_other.timeline_status(tenant_id_other, timeline_id_other) diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index 83285e0cbe..e36d3cf94b 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -1,14 +1,13 @@ import asyncio import random import time -import uuid from dataclasses import dataclass from typing import List, Optional import asyncpg from fixtures.log_helper import getLogger from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, Safekeeper -from fixtures.utils import lsn_from_hex, lsn_to_hex +from fixtures.types import Lsn, ZTenantId, ZTimelineId log = getLogger("root.safekeeper_async") @@ -104,9 +103,9 @@ async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accou async def wait_for_lsn( safekeeper: Safekeeper, - tenant_id: str, - timeline_id: str, - wait_lsn: str, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + wait_lsn: Lsn, polling_interval=1, timeout=60, ): @@ -124,7 +123,7 @@ async def wait_for_lsn( f"Safekeeper at port {safekeeper.port.pg} has flush_lsn {flush_lsn}, waiting for lsn {wait_lsn}" ) - while lsn_from_hex(wait_lsn) > lsn_from_hex(flush_lsn): + while wait_lsn > flush_lsn: elapsed = time.time() - started_at if elapsed > timeout: raise RuntimeError( @@ -156,8 +155,8 @@ async def run_restarts_under_load( test_timeout_at = time.monotonic() + 5 * 60 pg_conn = await pg.connect_async() - tenant_id = await pg_conn.fetchval("show neon.tenant_id") - timeline_id = await pg_conn.fetchval("show neon.timeline_id") + tenant_id = ZTenantId(await pg_conn.fetchval("show neon.tenant_id")) + timeline_id 
= ZTimelineId(await pg_conn.fetchval("show neon.timeline_id")) bank = BankClient(pg_conn, n_accounts=n_accounts, init_amount=init_amount) # create tables and initial balances @@ -176,14 +175,15 @@ async def run_restarts_under_load( victim = acceptors[victim_idx] victim.stop() - flush_lsn = await pg_conn.fetchval("SELECT pg_current_wal_flush_lsn()") - flush_lsn = lsn_to_hex(flush_lsn) + flush_lsn = Lsn(await pg_conn.fetchval("SELECT pg_current_wal_flush_lsn()")) log.info(f"Postgres flush_lsn {flush_lsn}") - pageserver_lsn = env.pageserver.http_client().timeline_detail( - uuid.UUID(tenant_id), uuid.UUID((timeline_id)) - )["local"]["last_record_lsn"] - sk_ps_lag = lsn_from_hex(flush_lsn) - lsn_from_hex(pageserver_lsn) + pageserver_lsn = Lsn( + env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)["local"][ + "last_record_lsn" + ] + ) + sk_ps_lag = flush_lsn - pageserver_lsn log.info(f"Pageserver last_record_lsn={pageserver_lsn} lag={sk_ps_lag / 1024}kb") # Wait until alive safekeepers catch up with postgres diff --git a/test_runner/regress/test_wal_restore.py b/test_runner/regress/test_wal_restore.py index 0847b5a505..6fd509c4d1 100644 --- a/test_runner/regress/test_wal_restore.py +++ b/test_runner/regress/test_wal_restore.py @@ -9,6 +9,7 @@ from fixtures.neon_fixtures import ( base_dir, pg_distrib_dir, ) +from fixtures.types import ZTenantId def test_wal_restore( @@ -21,7 +22,7 @@ def test_wal_restore( env.neon_cli.create_branch("test_wal_restore") pg = env.postgres.create_start("test_wal_restore") pg.safe_psql("create table t as select generate_series(1,300000)") - tenant_id = pg.safe_psql("show neon.tenant_id")[0][0] + tenant_id = ZTenantId(pg.safe_psql("show neon.tenant_id")[0][0]) env.neon_cli.pageserver_stop() port = port_distributor.get_port() data_dir = test_output_dir / "pgsql.restored"
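
Note for reviewers: the new fixtures.types module itself is not part of this diff. The sketch below is only an assumption of roughly what Lsn, ZTenantId and ZTimelineId provide, reconstructed from how the hunks above use them (parsing from "X/Y" strings and 32-character hex ids, str()/int() conversions, ordering, subtraction for lag, and a generate() constructor). The ZId base class, the validation, and the use of secrets.token_hex are illustrative choices, not the actual implementation.

    # Hypothetical sketch of fixtures/types.py, inferred from call sites in this diff.
    import secrets
    from functools import total_ordering


    @total_ordering
    class Lsn:
        """PostgreSQL LSN in 'X/Y' hex form, stored internally as an integer byte offset."""

        def __init__(self, x) -> None:
            if isinstance(x, int):
                self._value = x
            else:
                # accept the textual form returned by pg_current_wal_flush_lsn() etc.
                hi, lo = x.split("/")
                self._value = (int(hi, 16) << 32) + int(lo, 16)

        def __int__(self) -> int:
            return self._value

        def __str__(self) -> str:
            return f"{self._value >> 32:X}/{self._value & 0xFFFFFFFF:X}"

        def __repr__(self) -> str:
            return f'Lsn("{self}")'

        def __eq__(self, other) -> bool:
            return int(self) == int(other)

        def __lt__(self, other) -> bool:
            return int(self) < int(other)

        def __hash__(self) -> int:
            return hash(self._value)

        def __sub__(self, other) -> int:
            # difference in bytes, handy for the lag calculations above
            return int(self) - int(other)


    class ZId:
        """16-byte id rendered as a 32-character hex string (tenant / timeline ids)."""

        def __init__(self, value: str) -> None:
            assert len(value) == 32
            self._value = value.lower()

        @classmethod
        def generate(cls):
            return cls(secrets.token_hex(16))

        def __str__(self) -> str:
            return self._value

        def __repr__(self) -> str:
            return f'{type(self).__name__}("{self._value}")'

        def __eq__(self, other) -> bool:
            return str(self) == str(other)

        def __lt__(self, other) -> bool:
            # allows sorted() over id lists, as the CLI comparison tests do
            return str(self) < str(other)

        def __hash__(self) -> int:
            return hash(self._value)


    class ZTenantId(ZId):
        pass


    class ZTimelineId(ZId):
        pass

Under these assumptions, call sites can drop the .hex / lsn_from_hex / lsn_to_hex plumbing and pass the typed values directly into f-strings, dict keys, comparisons and arithmetic, which is exactly what the hunks above do.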