diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 631798d643..aaac4570b0 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -11,7 +11,7 @@ from contextlib import closing from dataclasses import dataclass, field from functools import partial from pathlib import Path -from typing import Any, List, Optional +from typing import Any, Dict, List, Optional import psycopg2 import psycopg2.errors @@ -19,6 +19,7 @@ import psycopg2.extras import pytest from fixtures.broker import NeonBroker from fixtures.log_helper import log +from fixtures.metrics import parse_metrics from fixtures.neon_fixtures import ( Endpoint, NeonEnv, @@ -1477,3 +1478,72 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder): execute_payload(endpoint) show_statuses(env.safekeepers, tenant_id, timeline_id) + + +# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries +# when compute is active, but there are no writes to the timeline. In that case +# pageserver should maintain a single connection to safekeeper and not attempt +# to reconnect extra times. +# +# The only way to verify this without manipulating time is to sleep for a while. +# In this test we sleep for 60 seconds, so this test takes at least 1 minute to run. +# This is longer than most other tests, so we run it only for v16 to save CI resources. 
def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
    """
    Check that an idle timeline does not trigger excessive safekeeper reconnections.

    The test starts three safekeepers and an endpoint, performs a single write,
    and then stays idle for about a minute. At several points it sums the
    `safekeeper_pg_queries_received_total` metric over all safekeepers and asserts
    that the number of START_REPLICATION and START_WAL_PUSH queries stays below
    a lenient upper bound. At the end it also asserts that each kind of connection
    was established at least the minimal expected number of times.

    Because of the sleeps, the test is skipped unless the current pytest test id
    contains "[debug-pg16]".
    """
    # The test is slow by design (it sleeps), so run it for a single
    # build-type / postgres-version combination only.
    if "[debug-pg16]" not in os.environ.get("PYTEST_CURRENT_TEST", ""):
        pytest.skip("run only on debug postgres v16 to save CI resources")

    # Upper bounds are deliberately loose to keep the test from being flaky.
    max_start_replication = 5
    max_start_wal_push = 15
    # The test idles twice for this many seconds.
    idle_interval_s = 30

    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()

    # Single name shared by the branch and the endpoint started on it.
    branch_name = "test_idle_reconnections"
    tenant_id = env.initial_tenant
    timeline_id = env.neon_cli.create_branch(branch_name)

    def collect_stats() -> Dict[str, float]:
        """
        Sum `safekeeper_pg_queries_received_total` per query kind over all safekeepers.

        Asserts that the totals for START_REPLICATION and START_WAL_PUSH do not
        exceed the upper bounds, and returns the mapping from query label to total.
        """
        # we need to collect safekeeper_pg_queries_received_total metric from all safekeepers
        parsed_metrics = [
            parse_metrics(sk.http_client().get_metrics_str(), f"safekeeper_{sk.id}")
            for sk in env.safekeepers
        ]

        total: Dict[str, float] = {}

        for metrics in parsed_metrics:
            queries_received = metrics.query_all("safekeeper_pg_queries_received_total")
            log.info(f"{metrics.name} queries received: {queries_received}")
            for sample in queries_received:
                query = sample.labels["query"]
                total[query] = total.get(query, 0) + sample.value

        log.info(f"Total queries received: {total}")

        # in the perfect world, we should see only one START_REPLICATION query,
        # here we allow a few more to prevent flakiness
        assert total.get("START_REPLICATION", 0) <= max_start_replication

        # in the perfect world, we should see ~6 START_WAL_PUSH queries,
        # here we allow more to prevent flakiness
        assert total.get("START_WAL_PUSH", 0) <= max_start_wal_push

        return total

    collect_stats()

    endpoint = env.endpoints.create_start(branch_name)
    # just write something to the timeline
    endpoint.safe_psql("create table t(i int)")
    collect_stats()

    # stay idle for a while
    time.sleep(idle_interval_s)

    # force checkpoint in pageserver to advance remote_consistent_lsn
    wait_lsn_force_checkpoint(tenant_id, timeline_id, endpoint, env.pageserver)

    collect_stats()

    time.sleep(idle_interval_s)

    final_stats = collect_stats()
    # pageserver should connect to safekeepers at least once
    assert final_stats.get("START_REPLICATION", 0) >= 1
    # walproposer should connect to each safekeeper at least once
    assert final_stats.get("START_WAL_PUSH", 0) >= 3