diff --git a/.github/actionlint.yml b/.github/actionlint.yml
index 25b2fc702a..8a4bcaf811 100644
--- a/.github/actionlint.yml
+++ b/.github/actionlint.yml
@@ -31,7 +31,7 @@ config-variables:
   - NEON_PROD_AWS_ACCOUNT_ID
   - PGREGRESS_PG16_PROJECT_ID
   - PGREGRESS_PG17_PROJECT_ID
-  - PREWARM_PGBENCH_SIZE
+  - PREWARM_PROJECT_ID
   - REMOTE_STORAGE_AZURE_CONTAINER
   - REMOTE_STORAGE_AZURE_REGION
   - SLACK_CICD_CHANNEL_ID
diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml
index df80bad579..c9a998bd4e 100644
--- a/.github/workflows/benchmarking.yml
+++ b/.github/workflows/benchmarking.yml
@@ -418,7 +418,7 @@ jobs:
       statuses: write
       id-token: write # aws-actions/configure-aws-credentials
     env:
-      PGBENCH_SIZE: ${{ vars.PREWARM_PGBENCH_SIZE }}
+      PROJECT_ID: ${{ vars.PREWARM_PROJECT_ID }}
       POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
       DEFAULT_PG_VERSION: 17
       TEST_OUTPUT: /tmp/test_output
diff --git a/test_runner/fixtures/compare_fixtures.py b/test_runner/fixtures/compare_fixtures.py
index 150046b99a..3d248efc04 100644
--- a/test_runner/fixtures/compare_fixtures.py
+++ b/test_runner/fixtures/compare_fixtures.py
@@ -16,6 +16,7 @@ from typing_extensions import override
 from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
+    Endpoint,
     NeonEnv,
     PgBin,
     PgProtocol,
@@ -129,6 +130,10 @@ class NeonCompare(PgCompare):
         # Start pg
         self._pg = self.env.endpoints.create_start("main", "main", self.tenant)
 
+    @property
+    def endpoint(self) -> Endpoint:
+        return self._pg
+
     @property
     @override
     def pg(self) -> PgProtocol:
diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py
index 64db2b1f17..d235ac2143 100644
--- a/test_runner/fixtures/endpoint/http.py
+++ b/test_runner/fixtures/endpoint/http.py
@@ -79,18 +79,28 @@ class EndpointHttpClient(requests.Session):
         return json
 
     def prewarm_lfc(self, from_endpoint_id: str | None = None):
+        """
+        Prewarm the LFC from the given endpoint and wait until the prewarm finishes or errors.
+        """
         params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
         self.post(self.prewarm_url, params=params).raise_for_status()
         self.prewarm_lfc_wait()
 
     def prewarm_lfc_wait(self):
+        """
+        Wait until LFC prewarm finishes with success or failure.
+        Errors if prewarm was not requested before calling this function.
+        """
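+        # terminal states reported by the prewarm status endpoint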
+        statuses = "failed", "completed", "skipped"
+
         def prewarmed():
             json = self.prewarm_lfc_status()
             status, err = json["status"], json.get("error")
-            assert status in ["failed", "completed", "skipped"], f"{status}, {err=}"
+            assert status in statuses, f"{status}, {err=}"
 
         wait_until(prewarmed, timeout=60)
-        assert self.prewarm_lfc_status()["status"] != "failed"
+        res = self.prewarm_lfc_status()
+        assert res["status"] != "failed", res
 
     def offload_lfc_status(self) -> dict[str, str]:
         res = self.get(self.offload_url)
@@ -99,17 +109,26 @@ class EndpointHttpClient(requests.Session):
         return json
 
     def offload_lfc(self):
+        """
+        Offload the LFC to endpoint storage and wait until the offload finishes or errors.
+        """
         self.post(self.offload_url).raise_for_status()
         self.offload_lfc_wait()
 
     def offload_lfc_wait(self):
+        """
+        Wait until LFC offload finishes with success or failure.
+        Errors if offload was not requested before calling this function.
+        """
+
         def offloaded():
             json = self.offload_lfc_status()
             status, err = json["status"], json.get("error")
             assert status in ["failed", "completed"], f"{status}, {err=}"
 
-        wait_until(offloaded)
-        assert self.offload_lfc_status()["status"] != "failed"
+        wait_until(offloaded, timeout=60)
+        res = self.offload_lfc_status()
+        assert res["status"] != "failed", res
 
     def promote(self, promote_spec: dict[str, Any], disconnect: bool = False):
         url = f"http://localhost:{self.external_port}/promote"
diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py
index 8d447c837f..69160dab20 100644
--- a/test_runner/fixtures/neon_api.py
+++ b/test_runner/fixtures/neon_api.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import re
 import time
 from typing import TYPE_CHECKING, cast, final
 
@@ -13,6 +14,17 @@ if TYPE_CHECKING:
     from fixtures.pg_version import PgVersion
 
 
+def connstr_to_env(connstr: str) -> dict[str, str]:
+    # postgresql://neondb_owner:npg_kuv6Rqi1cB@ep-old-silence-w26pxsvz-pooler.us-east-2.aws.neon.build/neondb?sslmode=require&channel_binding=...'
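+    # split the user:password@host/database?options URI into libpq environment variables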
+    parts = re.split(r":|@|\/|\?", connstr.removeprefix("postgresql://"))
+    return {
+        "PGUSER": parts[0],
+        "PGPASSWORD": parts[1],
+        "PGHOST": parts[2],
+        "PGDATABASE": parts[3],
+    }
+
+
 def connection_parameters_to_env(params: dict[str, str]) -> dict[str, str]:
     return {
         "PGHOST": params["host"],
diff --git a/test_runner/performance/test_lfc_prewarm.py b/test_runner/performance/test_lfc_prewarm.py
index 6c0083de95..d459f9f3bf 100644
--- a/test_runner/performance/test_lfc_prewarm.py
+++ b/test_runner/performance/test_lfc_prewarm.py
@@ -2,45 +2,48 @@ from __future__ import annotations
 
 import os
 import timeit
-import traceback
-from concurrent.futures import ThreadPoolExecutor as Exec
 from pathlib import Path
+from threading import Thread
 from time import sleep
-from typing import TYPE_CHECKING, Any, cast
+from typing import TYPE_CHECKING, cast
 
 import pytest
 from fixtures.benchmark_fixture import NeonBenchmarker, PgBenchRunResult
 from fixtures.log_helper import log
-from fixtures.neon_api import NeonAPI, connection_parameters_to_env
+from fixtures.neon_api import NeonAPI, connstr_to_env
+
+from performance.test_perf_pgbench import utc_now_timestamp
 
 if TYPE_CHECKING:
     from fixtures.compare_fixtures import NeonCompare
     from fixtures.neon_fixtures import Endpoint, PgBin
     from fixtures.pg_version import PgVersion
 
-from performance.test_perf_pgbench import utc_now_timestamp
-
 # These tests compare performance for a write-heavy and read-heavy workloads of an ordinary endpoint
-# compared to the endpoint which saves its LFC and prewarms using it on startup.
+# compared to the endpoint which saves its LFC and prewarms using it on startup
 
 
 def test_compare_prewarmed_pgbench_perf(neon_compare: NeonCompare):
     env = neon_compare.env
-    env.create_branch("normal")
     env.create_branch("prewarmed")
     pg_bin = neon_compare.pg_bin
-    ep_normal: Endpoint = env.endpoints.create_start("normal")
-    ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
+    ep_ordinary: Endpoint = neon_compare.endpoint
+    ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed")
 
-    for ep in [ep_normal, ep_prewarmed]:
+    for ep in [ep_ordinary, ep_prewarmed]:
         connstr: str = ep.connstr()
         pg_bin.run(["pgbench", "-i", "-I", "dtGvp", connstr, "-s100"])
-        ep.safe_psql("CREATE EXTENSION neon")
-        client = ep.http_client()
-        client.offload_lfc()
-        ep.stop()
-        ep.start()
-        client.prewarm_lfc_wait()
+        ep.safe_psql("CREATE SCHEMA neon; CREATE EXTENSION neon WITH SCHEMA neon")
+        if ep == ep_prewarmed:
+            client = ep.http_client()
+            client.offload_lfc()
+            ep.stop()
+            ep.start(autoprewarm=True)
+            client.prewarm_lfc_wait()
+        else:
+            ep.stop()
+            ep.start()
 
     run_start_timestamp = utc_now_timestamp()
     t0 = timeit.default_timer()
@@ -59,6 +62,36 @@
     neon_compare.zenbenchmark.record_pg_bench_result(name, res)
 
 
+def test_compare_prewarmed_read_perf(neon_compare: NeonCompare):
+    env = neon_compare.env
+    env.create_branch("prewarmed")
+    ep_ordinary: Endpoint = neon_compare.endpoint
+    ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed")
+
+    sql = [
+        "CREATE SCHEMA neon",
+        "CREATE EXTENSION neon WITH SCHEMA neon",
+        "CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
+        "INSERT INTO foo SELECT FROM generate_series(1,1000000)",
+    ]
+    sql_check = "SELECT count(*) from foo"
+
+    ep_ordinary.safe_psql_many(sql)
+    ep_ordinary.stop()
+    ep_ordinary.start()
+    with neon_compare.record_duration("ordinary_run_duration"):
+        ep_ordinary.safe_psql(sql_check)
+
+    ep_prewarmed.safe_psql_many(sql)
+    client = ep_prewarmed.http_client()
+    client.offload_lfc()
+    ep_prewarmed.stop()
+    ep_prewarmed.start(autoprewarm=True)
+    client.prewarm_lfc_wait()
+    with neon_compare.record_duration("prewarmed_run_duration"):
+        ep_prewarmed.safe_psql(sql_check)
+
+
 @pytest.mark.remote_cluster
 @pytest.mark.timeout(2 * 60 * 60)
 def test_compare_prewarmed_pgbench_perf_benchmark(
@@ -67,67 +100,66 @@
     pg_version: PgVersion,
     zenbenchmark: NeonBenchmarker,
 ):
-    name = f"Test prewarmed pgbench performance, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
-    project = neon_api.create_project(pg_version, name)
-    project_id = project["project"]["id"]
-    neon_api.wait_for_operation_to_finish(project_id)
-    err = False
-    try:
-        benchmark_impl(pg_bin, neon_api, project, zenbenchmark)
-    except Exception as e:
-        err = True
-        log.error(f"Caught exception: {e}")
-        log.error(traceback.format_exc())
-    finally:
-        assert not err
-        neon_api.delete_project(project_id)
+    """
+    The prewarm API is not public, so this test relies on a pre-created project
+    with pgbench scale 3424 (pgbench -i -IdtGvp -s3424). The sleep and offload
+    constants below are hardcoded for this size as well.
+    """
+    project_id = os.getenv("PROJECT_ID")
+    assert project_id
+    ordinary_branch_id = ""
+    prewarmed_branch_id = ""
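+    # look up the pre-created "ordinary" and "prewarmed" branches by name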
+    for branch in neon_api.get_branches(project_id)["branches"]:
+        if branch["name"] == "ordinary":
+            ordinary_branch_id = branch["id"]
+        if branch["name"] == "prewarmed":
+            prewarmed_branch_id = branch["id"]
+    assert len(ordinary_branch_id) > 0
+    assert len(prewarmed_branch_id) > 0
+
+    ep_ordinary = None
+    ep_prewarmed = None
+    for ep in neon_api.get_endpoints(project_id)["endpoints"]:
+        if ep["branch_id"] == ordinary_branch_id:
+            ep_ordinary = ep
+        if ep["branch_id"] == prewarmed_branch_id:
+            ep_prewarmed = ep
+    assert ep_ordinary
+    assert ep_prewarmed
+    ordinary_id = ep_ordinary["id"]
+    prewarmed_id = ep_prewarmed["id"]
 
-
-def benchmark_impl(
-    pg_bin: PgBin, neon_api: NeonAPI, project: dict[str, Any], zenbenchmark: NeonBenchmarker
-):
-    pgbench_size = int(os.getenv("PGBENCH_SIZE") or "3424")  # 50GB
     offload_secs = 20
-    test_duration_min = 5
+    test_duration_min = 3
     pgbench_duration = f"-T{test_duration_min * 60}"
-    # prewarm API is not publicly exposed. In order to test performance of a
-    # fully prewarmed endpoint, wait after it restarts.
-    # The number here is empirical, based on manual runs on staging
+    pgbench_init_cmd = ["pgbench", "-P10", "-n", "-c10", pgbench_duration, "-Mprepared"]
+    pgbench_perf_cmd = pgbench_init_cmd + ["-S"]
     prewarmed_sleep_secs = 180
 
-    branch_id = project["branch"]["id"]
-    project_id = project["project"]["id"]
-    normal_env = connection_parameters_to_env(
-        project["connection_uris"][0]["connection_parameters"]
-    )
-    normal_id = project["endpoints"][0]["id"]
-
-    prewarmed_branch_id = neon_api.create_branch(
-        project_id, "prewarmed", parent_id=branch_id, add_endpoint=False
-    )["branch"]["id"]
-    neon_api.wait_for_operation_to_finish(project_id)
-
-    ep_prewarmed = neon_api.create_endpoint(
-        project_id,
-        prewarmed_branch_id,
-        endpoint_type="read_write",
-        settings={"autoprewarm": True, "offload_lfc_interval_seconds": offload_secs},
-    )
-    neon_api.wait_for_operation_to_finish(project_id)
-
-    prewarmed_env = normal_env.copy()
-    prewarmed_env["PGHOST"] = ep_prewarmed["endpoint"]["host"]
-    prewarmed_id = ep_prewarmed["endpoint"]["id"]
+    ordinary_uri = neon_api.get_connection_uri(project_id, ordinary_branch_id, ordinary_id)["uri"]
+    prewarmed_uri = neon_api.get_connection_uri(project_id, prewarmed_branch_id, prewarmed_id)[
+        "uri"
+    ]
 
     def bench(endpoint_name, endpoint_id, env):
-        pg_bin.run(["pgbench", "-i", "-I", "dtGvp", f"-s{pgbench_size}"], env)
-        sleep(offload_secs * 2)  # ensure LFC is offloaded after pgbench finishes
-        neon_api.restart_endpoint(project_id, endpoint_id)
-        sleep(prewarmed_sleep_secs)
+        log.info(f"Running pgbench {pgbench_duration} to warm up the cache")
+        pg_bin.run_capture(pgbench_init_cmd, env)  # capture useful for debugging
+        log.info(f"Initialized {endpoint_name}")
+        if endpoint_name == "prewarmed":
+            log.info(f"sleeping {offload_secs * 2} to ensure LFC is offloaded")
+            sleep(offload_secs * 2)
+            neon_api.restart_endpoint(project_id, endpoint_id)
+            log.info(f"sleeping {prewarmed_sleep_secs} to ensure LFC is prewarmed")
+            sleep(prewarmed_sleep_secs)
+        else:
+            neon_api.restart_endpoint(project_id, endpoint_id)
+
+        log.info(f"Starting benchmark for {endpoint_name}")
         run_start_timestamp = utc_now_timestamp()
         t0 = timeit.default_timer()
-        out = pg_bin.run_capture(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env)
+        out = pg_bin.run_capture(pgbench_perf_cmd, env)
         run_duration = timeit.default_timer() - t0
         run_end_timestamp = utc_now_timestamp()
@@ -140,29 +172,9 @@
         )
         zenbenchmark.record_pg_bench_result(endpoint_name, res)
 
-    with Exec(max_workers=2) as exe:
-        exe.submit(bench, "normal", normal_id, normal_env)
-        exe.submit(bench, "prewarmed", prewarmed_id, prewarmed_env)
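+    # benchmark the prewarmed endpoint in a background thread and the ordinary endpoint in the main thread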
+    prewarmed_args = ("prewarmed", prewarmed_id, connstr_to_env(prewarmed_uri))
+    prewarmed_thread = Thread(target=bench, args=prewarmed_args)
+    prewarmed_thread.start()
 
-
-def test_compare_prewarmed_read_perf(neon_compare: NeonCompare):
-    env = neon_compare.env
-    env.create_branch("normal")
-    env.create_branch("prewarmed")
-    ep_normal: Endpoint = env.endpoints.create_start("normal")
-    ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
-
-    sql = [
-        "CREATE EXTENSION neon",
-        "CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
-        "INSERT INTO foo SELECT FROM generate_series(1,1000000)",
-    ]
-    for ep in [ep_normal, ep_prewarmed]:
-        ep.safe_psql_many(sql)
-        client = ep.http_client()
-        client.offload_lfc()
-        ep.stop()
-        ep.start()
-        client.prewarm_lfc_wait()
-        with neon_compare.record_duration(f"{ep.branch_name}_run_duration"):
-            ep.safe_psql("SELECT count(*) from foo")
+    bench("ordinary", ordinary_id, connstr_to_env(ordinary_uri))
+    prewarmed_thread.join()