Allow reusing projects between runs of logical replication benchmarks (#8393)

This commit is contained in:
Sasha Krassovsky
2024-07-15 14:55:57 -07:00
committed by GitHub
parent 730db859c7
commit 7eb37fea26
3 changed files with 179 additions and 214 deletions

View File

@@ -261,3 +261,47 @@ class NeonAPI:
if op["status"] in {"scheduling", "running", "cancelling"}:
has_running = True
time.sleep(0.5)
class NeonApiEndpoint:
def __init__(self, neon_api: NeonAPI, pg_version: PgVersion, project_id: Optional[str]):
self.neon_api = neon_api
if project_id is None:
project = neon_api.create_project(pg_version)
neon_api.wait_for_operation_to_finish(project["project"]["id"])
self.project_id = project["project"]["id"]
self.endpoint_id = project["endpoints"][0]["id"]
self.connstr = project["connection_uris"][0]["connection_uri"]
self.pgbench_env = connection_parameters_to_env(
project["connection_uris"][0]["connection_parameters"]
)
self.is_new = True
else:
project = neon_api.get_project_details(project_id)
if int(project["project"]["pg_version"]) != int(pg_version):
raise Exception(
f"A project with the provided ID exists, but it's not of the specified version (expected {pg_version}, got {project['project']['pg_version']})"
)
self.project_id = project_id
eps = neon_api.get_endpoints(project_id)["endpoints"]
self.endpoint_id = eps[0]["id"]
self.connstr = neon_api.get_connection_uri(project_id, endpoint_id=self.endpoint_id)[
"uri"
]
pw = self.connstr.split("@")[0].split(":")[-1]
self.pgbench_env = {
"PGHOST": eps[0]["host"],
"PGDATABASE": "neondb",
"PGUSER": "neondb_owner",
"PGPASSWORD": pw,
}
self.is_new = False
def restart(self):
self.neon_api.restart_endpoint(self.project_id, self.endpoint_id)
self.neon_api.wait_for_operation_to_finish(self.project_id)
def get_synthetic_storage_size(self) -> int:
return int(
self.neon_api.get_project_details(self.project_id)["project"]["synthetic_storage_size"]
)

View File

@@ -87,7 +87,7 @@ from fixtures.utils import (
)
from fixtures.utils import AuxFileStore as AuxFileStore # reexport
from .neon_api import NeonAPI
from .neon_api import NeonAPI, NeonApiEndpoint
"""
This file contains pytest fixtures. A fixture is a test resource that can be
@@ -3158,6 +3158,18 @@ class RemotePostgres(PgProtocol):
pass
@pytest.fixture(scope="function")
def benchmark_project_pub(neon_api: NeonAPI, pg_version: PgVersion) -> NeonApiEndpoint:
project_id = os.getenv("BENCHMARK_PROJECT_ID_PUB")
return NeonApiEndpoint(neon_api, pg_version, project_id)
@pytest.fixture(scope="function")
def benchmark_project_sub(neon_api: NeonAPI, pg_version: PgVersion) -> NeonApiEndpoint:
project_id = os.getenv("BENCHMARK_PROJECT_ID_SUB")
return NeonApiEndpoint(neon_api, pg_version, project_id)
@pytest.fixture(scope="function")
def remote_pg(
test_output_dir: Path, pg_distrib_dir: Path, pg_version: PgVersion

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import time
import traceback
from typing import TYPE_CHECKING
import psycopg2
@@ -10,15 +9,12 @@ import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_api import connection_parameters_to_env
from fixtures.neon_fixtures import AuxFileStore, logical_replication_sync
from fixtures.pg_version import PgVersion
if TYPE_CHECKING:
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_api import NeonAPI
from fixtures.neon_api import NeonApiEndpoint
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.pg_version import PgVersion
@pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.V2])
@@ -86,8 +82,8 @@ def measure_logical_replication_lag(sub_cur, pub_cur, timeout_sec=600):
@pytest.mark.timeout(2 * 60 * 60)
def test_subscriber_lag(
pg_bin: PgBin,
neon_api: NeonAPI,
pg_version: PgVersion,
benchmark_project_pub: NeonApiEndpoint,
benchmark_project_sub: NeonApiEndpoint,
zenbenchmark: NeonBenchmarker,
):
"""
@@ -99,125 +95,82 @@ def test_subscriber_lag(
sync_interval_min = 5
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
pub_project = neon_api.create_project(pg_version)
pub_project_id = pub_project["project"]["id"]
neon_api.wait_for_operation_to_finish(pub_project_id)
error_occurred = False
pub_env = benchmark_project_pub.pgbench_env
sub_env = benchmark_project_sub.pgbench_env
pub_connstr = benchmark_project_pub.connstr
sub_connstr = benchmark_project_sub.connstr
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
if benchmark_project_pub.is_new:
pub_cur.execute("create publication pub1 for table pgbench_accounts, pgbench_history")
if benchmark_project_sub.is_new:
sub_cur.execute("truncate table pgbench_accounts")
sub_cur.execute("truncate table pgbench_history")
sub_cur.execute(f"create subscription sub1 connection '{pub_connstr}' publication pub1")
initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
zenbenchmark.record("initial_sync_lag", initial_sync_lag, "s", MetricReport.LOWER_IS_BETTER)
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=pub_env
)
try:
sub_project = neon_api.create_project(pg_version)
sub_project_id = sub_project["project"]["id"]
sub_endpoint_id = sub_project["endpoints"][0]["id"]
neon_api.wait_for_operation_to_finish(sub_project_id)
sub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=sub_env,
)
try:
pub_env = connection_parameters_to_env(
pub_project["connection_uris"][0]["connection_parameters"]
)
sub_env = connection_parameters_to_env(
sub_project["connection_uris"][0]["connection_parameters"]
)
pub_connstr = pub_project["connection_uris"][0]["connection_uri"]
sub_connstr = sub_project["connection_uris"][0]["connection_uri"]
start = time.time()
while time.time() - start < test_duration_min * 60:
time.sleep(sync_interval_min * 60)
check_pgbench_still_running(pub_workload, "pub")
check_pgbench_still_running(sub_workload, "sub")
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
with psycopg2.connect(pub_connstr) as pub_conn, psycopg2.connect(
sub_connstr
) as sub_conn:
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
sub_cur.execute("truncate table pgbench_accounts")
sub_cur.execute("truncate table pgbench_history")
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
sub_workload.terminate()
benchmark_project_sub.restart()
pub_cur.execute(
"create publication pub1 for table pgbench_accounts, pgbench_history"
)
sub_cur.execute(
f"create subscription sub1 connection '{pub_connstr}' publication pub1"
)
initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
zenbenchmark.record(
"initial_sync_lag", initial_sync_lag, "s", MetricReport.LOWER_IS_BETTER
)
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=pub_env
)
try:
sub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=sub_env,
)
try:
start = time.time()
while time.time() - start < test_duration_min * 60:
time.sleep(sync_interval_min * 60)
check_pgbench_still_running(pub_workload, "pub")
check_pgbench_still_running(sub_workload, "sub")
with psycopg2.connect(pub_connstr) as pub_conn, psycopg2.connect(
sub_connstr
) as sub_conn:
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
sub_workload.terminate()
neon_api.restart_endpoint(
sub_project_id,
sub_endpoint_id,
)
neon_api.wait_for_operation_to_finish(sub_project_id)
sub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=sub_env,
)
# Measure storage to make sure replication information isn't bloating storage
sub_storage = neon_api.get_project_details(sub_project_id)["project"][
"synthetic_storage_size"
]
pub_storage = neon_api.get_project_details(pub_project_id)["project"][
"synthetic_storage_size"
]
zenbenchmark.record(
"sub_storage", sub_storage, "B", MetricReport.LOWER_IS_BETTER
)
zenbenchmark.record(
"pub_storage", pub_storage, "B", MetricReport.LOWER_IS_BETTER
)
finally:
sub_workload.terminate()
finally:
pub_workload.terminate()
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
# Measure storage to make sure replication information isn't bloating storage
sub_storage = benchmark_project_sub.get_synthetic_storage_size()
pub_storage = benchmark_project_pub.get_synthetic_storage_size()
zenbenchmark.record("sub_storage", sub_storage, "B", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("pub_storage", pub_storage, "B", MetricReport.LOWER_IS_BETTER)
finally:
if not error_occurred:
neon_api.delete_project(sub_project_id)
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
sub_workload.terminate()
finally:
assert not error_occurred
neon_api.delete_project(pub_project_id)
pub_workload.terminate()
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_publisher_restart(
pg_bin: PgBin,
neon_api: NeonAPI,
pg_version: PgVersion,
benchmark_project_pub: NeonApiEndpoint,
benchmark_project_sub: NeonApiEndpoint,
zenbenchmark: NeonBenchmarker,
):
"""
@@ -229,114 +182,70 @@ def test_publisher_restart(
sync_interval_min = 5
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
pub_project = neon_api.create_project(pg_version)
pub_project_id = pub_project["project"]["id"]
pub_endpoint_id = pub_project["endpoints"][0]["id"]
neon_api.wait_for_operation_to_finish(pub_project_id)
error_occurred = False
pub_env = benchmark_project_pub.pgbench_env
sub_env = benchmark_project_sub.pgbench_env
pub_connstr = benchmark_project_pub.connstr
sub_connstr = benchmark_project_sub.connstr
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
if benchmark_project_pub.is_new:
pub_cur.execute("create publication pub1 for table pgbench_accounts, pgbench_history")
if benchmark_project_sub.is_new:
sub_cur.execute("truncate table pgbench_accounts")
sub_cur.execute("truncate table pgbench_history")
sub_cur.execute(f"create subscription sub1 connection '{pub_connstr}' publication pub1")
initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
zenbenchmark.record("initial_sync_lag", initial_sync_lag, "s", MetricReport.LOWER_IS_BETTER)
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=pub_env
)
try:
sub_project = neon_api.create_project(pg_version)
sub_project_id = sub_project["project"]["id"]
neon_api.wait_for_operation_to_finish(sub_project_id)
sub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=sub_env,
)
try:
pub_env = connection_parameters_to_env(
pub_project["connection_uris"][0]["connection_parameters"]
)
sub_env = connection_parameters_to_env(
sub_project["connection_uris"][0]["connection_parameters"]
)
pub_connstr = pub_project["connection_uris"][0]["connection_uri"]
sub_connstr = sub_project["connection_uris"][0]["connection_uri"]
start = time.time()
while time.time() - start < test_duration_min * 60:
time.sleep(sync_interval_min * 60)
check_pgbench_still_running(pub_workload, "pub")
check_pgbench_still_running(sub_workload, "sub")
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
sub_cur.execute("truncate table pgbench_accounts")
sub_cur.execute("truncate table pgbench_history")
pub_cur.execute(
"create publication pub1 for table pgbench_accounts, pgbench_history"
)
sub_cur.execute(
f"create subscription sub1 connection '{pub_connstr}' publication pub1"
)
initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
zenbenchmark.record(
"initial_sync_lag", initial_sync_lag, "s", MetricReport.LOWER_IS_BETTER
)
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=pub_env
)
try:
sub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=sub_env,
)
try:
start = time.time()
while time.time() - start < test_duration_min * 60:
time.sleep(sync_interval_min * 60)
check_pgbench_still_running(pub_workload, "pub")
check_pgbench_still_running(sub_workload, "sub")
pub_workload.terminate()
neon_api.restart_endpoint(
pub_project_id,
pub_endpoint_id,
)
neon_api.wait_for_operation_to_finish(pub_project_id)
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"],
env=pub_env,
)
with psycopg2.connect(pub_connstr) as pub_conn, psycopg2.connect(
sub_connstr
) as sub_conn:
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
# Measure storage to make sure replication information isn't bloating storage
sub_storage = neon_api.get_project_details(sub_project_id)["project"][
"synthetic_storage_size"
]
pub_storage = neon_api.get_project_details(pub_project_id)["project"][
"synthetic_storage_size"
]
zenbenchmark.record(
"sub_storage", sub_storage, "B", MetricReport.LOWER_IS_BETTER
)
zenbenchmark.record(
"pub_storage", pub_storage, "B", MetricReport.LOWER_IS_BETTER
)
finally:
sub_workload.terminate()
finally:
pub_workload.terminate()
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
benchmark_project_pub.restart()
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"],
env=pub_env,
)
with psycopg2.connect(pub_connstr) as pub_conn, psycopg2.connect(
sub_connstr
) as sub_conn:
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
# Measure storage to make sure replication information isn't bloating storage
sub_storage = benchmark_project_sub.get_synthetic_storage_size()
pub_storage = benchmark_project_pub.get_synthetic_storage_size()
zenbenchmark.record("sub_storage", sub_storage, "B", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("pub_storage", pub_storage, "B", MetricReport.LOWER_IS_BETTER)
finally:
if not error_occurred:
neon_api.delete_project(sub_project_id)
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
sub_workload.terminate()
finally:
assert not error_occurred
neon_api.delete_project(pub_project_id)
pub_workload.terminate()