From 3ffe6de0b9a4f49cf18f6a2ebf0fc2c6274dfccd Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 29 Nov 2024 10:40:08 +0100 Subject: [PATCH] test_runner/performance: add logical message ingest benchmark (#9749) Adds a benchmark for logical message WAL ingestion throughput end-to-end. Logical messages are essentially noops, and thus ignored by the Pageserver. Example results from my MacBook, with fsync enabled: ``` postgres_ingest: 14.445 s safekeeper_ingest: 29.948 s pageserver_ingest: 30.013 s pageserver_recover_ingest: 8.633 s wal_written: 10,340 MB message_count: 1310720 messages postgres_throughput: 715 MB/s safekeeper_throughput: 345 MB/s pageserver_throughput: 344 MB/s pageserver_recover_throughput: 1197 MB/s ``` See https://github.com/neondatabase/neon/issues/9642#issuecomment-2475995205 for running analysis. Touches #9642. --- test_runner/fixtures/neon_fixtures.py | 31 ++++++ .../test_ingest_logical_message.py | 101 ++++++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 test_runner/performance/test_ingest_logical_message.py diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 1f4d2aa5ec..e3c88e9965 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4404,6 +4404,10 @@ class Safekeeper(LogUtils): log.info(f"sk {self.id} flush LSN: {flush_lsn}") return flush_lsn + def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn: + timeline_status = self.http_client().timeline_status(tenant_id, timeline_id) + return timeline_status.commit_lsn + def pull_timeline( self, srcs: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId ) -> dict[str, Any]: @@ -4949,6 +4953,33 @@ def wait_for_last_flush_lsn( return min(results) +def wait_for_commit_lsn( + env: NeonEnv, + tenant: TenantId, + timeline: TimelineId, + lsn: Lsn, +) -> Lsn: + # TODO: it would be better to poll this in the compute, but there's no API for it. See: + # https://github.com/neondatabase/neon/issues/9758 + "Wait for the given LSN to be committed on any Safekeeper" + + max_commit_lsn = Lsn(0) + for i in range(1000): + for sk in env.safekeepers: + commit_lsn = sk.get_commit_lsn(tenant, timeline) + if commit_lsn >= lsn: + log.info(f"{tenant}/{timeline} at commit_lsn {commit_lsn}") + return commit_lsn + max_commit_lsn = max(max_commit_lsn, commit_lsn) + + if i % 10 == 0: + log.info( + f"{tenant}/{timeline} waiting for commit_lsn to reach {lsn}, now {max_commit_lsn}" + ) + time.sleep(0.1) + raise Exception(f"timed out while waiting for commit_lsn to reach {lsn}, was {max_commit_lsn}") + + def flush_ep_to_pageserver( env: NeonEnv, ep: Endpoint, diff --git a/test_runner/performance/test_ingest_logical_message.py b/test_runner/performance/test_ingest_logical_message.py new file mode 100644 index 0000000000..d3118eb15a --- /dev/null +++ b/test_runner/performance/test_ingest_logical_message.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import pytest +from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker +from fixtures.common_types import Lsn +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + wait_for_commit_lsn, + wait_for_last_flush_lsn, +) +from fixtures.pageserver.utils import wait_for_last_record_lsn + + +@pytest.mark.timeout(600) +@pytest.mark.parametrize("size", [1024, 8192, 131072]) +@pytest.mark.parametrize("fsync", [True, False], ids=["fsync", "nofsync"]) +def test_ingest_logical_message( + request: pytest.FixtureRequest, + neon_env_builder: NeonEnvBuilder, + zenbenchmark: NeonBenchmarker, + fsync: bool, + size: int, +): + """ + Benchmarks ingestion of 10 GB of logical message WAL. These are essentially noops, and don't + incur any pageserver writes. + """ + + VOLUME = 10 * 1024**3 + count = VOLUME // size + + neon_env_builder.safekeepers_enable_fsync = fsync + + env = neon_env_builder.init_start() + endpoint = env.endpoints.create_start( + "main", + config_lines=[ + f"fsync = {fsync}", + # Disable backpressure. We don't want to block on pageserver. + "max_replication_apply_lag = 0", + "max_replication_flush_lag = 0", + "max_replication_write_lag = 0", + ], + ) + client = env.pageserver.http_client() + + # Wait for the timeline to be propagated to the pageserver. + wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline) + + # Ingest data and measure durations. + start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0]) + + with endpoint.cursor() as cur: + cur.execute("set statement_timeout = 0") + + # Postgres will return once the logical messages have been written to its local WAL, without + # waiting for Safekeeper commit. We measure ingestion time both for Postgres, Safekeeper, + # and Pageserver to detect bottlenecks. + log.info("Ingesting data") + with zenbenchmark.record_duration("pageserver_ingest"): + with zenbenchmark.record_duration("safekeeper_ingest"): + with zenbenchmark.record_duration("postgres_ingest"): + cur.execute(f""" + select pg_logical_emit_message(false, '', repeat('x', {size})) + from generate_series(1, {count}) + """) + + end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0]) + + # Wait for Safekeeper. + log.info("Waiting for Safekeeper to catch up") + wait_for_commit_lsn(env, env.initial_tenant, env.initial_timeline, end_lsn) + + # Wait for Pageserver. + log.info("Waiting for Pageserver to catch up") + wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn) + + # Now that all data is ingested, delete and recreate the tenant in the pageserver. This will + # reingest all the WAL from the safekeeper without any other constraints. This gives us a + # baseline of how fast the pageserver can ingest this WAL in isolation. + status = env.storage_controller.inspect(tenant_shard_id=env.initial_tenant) + assert status is not None + + client.tenant_delete(env.initial_tenant) + env.pageserver.tenant_create(tenant_id=env.initial_tenant, generation=status[0]) + + with zenbenchmark.record_duration("pageserver_recover_ingest"): + log.info("Recovering WAL into pageserver") + client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline) + wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline) + + # Emit metrics. + wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024)) + zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM) + zenbenchmark.record("message_count", count, "messages", MetricReport.TEST_PARAM) + + props = {p["name"]: p["value"] for _, p in request.node.user_properties} + for name in ("postgres", "safekeeper", "pageserver", "pageserver_recover"): + throughput = int(wal_written_mb / props[f"{name}_ingest"]) + zenbenchmark.record(f"{name}_throughput", throughput, "MB/s", MetricReport.HIGHER_IS_BETTER)