From 34812a6aaba2e2193b617cbb69ee4568aeca8274 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Mon, 4 Nov 2024 15:52:01 -0600 Subject: [PATCH] Improve some typing related to performance testing for LR Signed-off-by: Tristan Partin --- test_runner/fixtures/neon_api.py | 11 +++++--- .../performance/test_logical_replication.py | 27 ++++++++++++------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py index 5934baccff..89c1f324b4 100644 --- a/test_runner/fixtures/neon_api.py +++ b/test_runner/fixtures/neon_api.py @@ -1,7 +1,7 @@ from __future__ import annotations import time -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, cast, final import requests @@ -261,17 +261,22 @@ class NeonAPI: time.sleep(0.5) +@final class NeonApiEndpoint: def __init__(self, neon_api: NeonAPI, pg_version: PgVersion, project_id: Optional[str]): self.neon_api = neon_api + self.project_id: str + self.endpoint_id: str + self.connstr: str + if project_id is None: project = neon_api.create_project(pg_version) - neon_api.wait_for_operation_to_finish(project["project"]["id"]) + neon_api.wait_for_operation_to_finish(cast("str", project["project"]["id"])) self.project_id = project["project"]["id"] self.endpoint_id = project["endpoints"][0]["id"] self.connstr = project["connection_uris"][0]["connection_uri"] self.pgbench_env = connection_parameters_to_env( - project["connection_uris"][0]["connection_parameters"] + cast("dict[str, str]", project["connection_uris"][0]["connection_parameters"]) ) self.is_new = True else: diff --git a/test_runner/performance/test_logical_replication.py b/test_runner/performance/test_logical_replication.py index 8b2a296bdd..e62485905e 100644 --- a/test_runner/performance/test_logical_replication.py +++ b/test_runner/performance/test_logical_replication.py @@ -1,7 +1,7 @@ from __future__ import annotations import time -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast import psycopg2 import psycopg2.extras @@ -12,13 +12,16 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import logical_replication_sync if TYPE_CHECKING: + from subprocess import Popen + from typing import AnyStr + from fixtures.benchmark_fixture import NeonBenchmarker from fixtures.neon_api import NeonApiEndpoint - from fixtures.neon_fixtures import NeonEnv, PgBin + from fixtures.neon_fixtures import NeonEnv, PgBin, VanillaPostgres @pytest.mark.timeout(1000) -def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg): +def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg: VanillaPostgres): env = neon_simple_env endpoint = env.endpoints.create_start("main") @@ -47,24 +50,28 @@ def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg logical_replication_sync(vanilla_pg, endpoint) log.info(f"Sync with master took {time.time() - start} seconds") - sum_master = endpoint.safe_psql("select sum(abalance) from pgbench_accounts")[0][0] - sum_replica = vanilla_pg.safe_psql("select sum(abalance) from pgbench_accounts")[0][0] + sum_master = cast("int", endpoint.safe_psql("select sum(abalance) from pgbench_accounts")[0][0]) + sum_replica = cast( + "int", vanilla_pg.safe_psql("select sum(abalance) from pgbench_accounts")[0][0] + ) assert sum_master == sum_replica -def check_pgbench_still_running(pgbench, label=""): +def check_pgbench_still_running(pgbench: Popen[AnyStr], label: str = ""): rc = pgbench.poll() if rc is not None: raise RuntimeError(f"{label} pgbench terminated early with return code {rc}") -def measure_logical_replication_lag(sub_cur, pub_cur, timeout_sec=600): +def measure_logical_replication_lag( + sub_cur: psycopg2.cursor, pub_cur: psycopg2.cursor, timeout_sec: float = 600 +): start = time.time() pub_cur.execute("SELECT pg_current_wal_flush_lsn()") - pub_lsn = Lsn(pub_cur.fetchall()[0][0]) + pub_lsn = Lsn(cast("str", pub_cur.fetchall()[0][0])) while (time.time() - start) < timeout_sec: sub_cur.execute("SELECT latest_end_lsn FROM pg_catalog.pg_stat_subscription") - res = sub_cur.fetchall()[0][0] + res = cast("str", sub_cur.fetchall()[0][0]) if res: log.info(f"subscriber_lsn={res}") sub_lsn = Lsn(res) @@ -286,7 +293,7 @@ def test_snap_files( conn.autocommit = True with conn.cursor() as cur: cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'") - is_super = cur.fetchall()[0][0] + is_super = cast("bool", cur.fetchall()[0][0]) assert is_super, "This benchmark won't work if we don't have superuser" pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=env)