diff --git a/test_runner/regress/test_bad_connection.py b/test_runner/regress/test_bad_connection.py index e89f9f2ad3..561d550296 100644 --- a/test_runner/regress/test_bad_connection.py +++ b/test_runner/regress/test_bad_connection.py @@ -3,11 +3,12 @@ from __future__ import annotations import asyncio import random import time +from asyncio import TaskGroup import psycopg2.errors import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.neon_fixtures import NeonEnvBuilder, PgBin from fixtures.pageserver_mitm import BreakConnectionException, PageserverProxy from fixtures.port_distributor import PortDistributor @@ -85,32 +86,91 @@ def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder): env.pageserver.stop() -@pytest.mark.timeout(600) +# +# Observations: +# +# 1. If the backend is waiting for response to GetPage request, and the client disconnects, +# the backend will not immediately abort the GetPage request. It will not notice that the +# client is gone, until it tries to send something back to the client, or if a timeout +# kills the query. +# +# So to reproduce the traffic jam, you need: +# +# - A network glitch, which causes one GetPage request/response to be lost or delayed. +# It might get lost at IP level, and TCP retransmits might take a long time. Or there might +# be a glitch in the pageserver or compute, which causes the request to be "stuck". +# +# - An application with a application level timeout and retry. If the +# query doesn't return in a timely a fashion, the application kills the connection and +# retries the query, or a runs similar query that needs the same page. +# +# The first time the GetPage request is stuck and it disconnects, it leaves behind a +# backend that's still waiting for the GetPage response, and is holding the buffer lock. +# The client has closed the connection, but the server doesn't get the memo. +# On each subsequent retry, the connection will block waiting for the buffer lock, give up, +# and leave behind another backend blocked indefinitely. +# +# The situation unravels when the original backend doing the GetPage request finally +# gets a response, or it gets confirmation that the TCP connection is lost. +# +# This test reproduces the traffic jam using a MITM proxy between pageserver and compute, +# and forcing one GetPage request to get stuck. +# +# Recommendations: +# - set client_connection_check_interval = '10s'. This makes Postgres wake up and check +# for client connection loss. It's not perfect, it might not notice if the client has +# e.g rebooted without sending a RST packet, but there's no downside +# +# - Add a timeout to GetPage requests. If no response is received from the pageserver +# in, say, 10 s, terminate the pageserver connection and retry. XXX: Negotiate this +# behavior with the storage team +# +# +@pytest.mark.timeout(120) def test_compute_pageserver_connection_stress2( - neon_env_builder: NeonEnvBuilder, port_distributor: PortDistributor, - pg_bin: PgBin + neon_env_builder: NeonEnvBuilder, port_distributor: PortDistributor, pg_bin: PgBin ): env = neon_env_builder.init_start() # Set up the MITM proxy + global error_fraction + global delay_fraction + error_fraction = 0 + delay_fraction = 0 async def response_cb(conn_id): + global delay_fraction global error_fraction + if random.random() < error_fraction: raise BreakConnectionException("unlucky") + orig_delay_fraction = delay_fraction + if random.random() < delay_fraction: + delay_fraction = 0 + log.info(f"[{conn_id}] making getpage request STUCK") + try: + await asyncio.sleep(300) + finally: + delay_fraction = orig_delay_fraction + log.info(f"[{conn_id}] delay finished") + mitm_listen_port = port_distributor.get_port() mitm = PageserverProxy(mitm_listen_port, env.pageserver.service_port.pg, response_cb) def main(): - global error_fraction + global error_fraction, delay_fraction endpoint = env.endpoints.create( "main", config_lines=[ "max_connections=1000", - ]) + "shared_buffers=8MB", + "log_connections=on", + "log_disconnections=on", + ], + ) endpoint.start() with open(endpoint.pg_data_dir_path() / "postgresql.conf", "a") as conf: @@ -127,22 +187,94 @@ def test_compute_pageserver_connection_stress2( connstr = endpoint.connstr() log.info(f"Start a pgbench workload on pg {connstr}") - error_fraction=0.001 + error_fraction = 0.001 pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", f"-s{scale}", connstr]) - error_fraction=0.01 - pg_bin.run_capture(["pgbench", "-S", "-c80", "-j4", "-P1", "-T60", connstr]) + error_fraction = 0.00 + delay_fraction = 0.001 + + cur.execute("select max(aid) from pgbench_accounts") + num_accounts = 100000 * scale + + num_clients = 200 + + app = WalkingApplication(num_accounts, num_clients, endpoint, 1000000) + asyncio.run(app.run()) mitm.shutdown() async def mm(): - await asyncio.gather( - asyncio.to_thread(main), - mitm.run_server() - ) + await asyncio.gather(asyncio.to_thread(main), mitm.run_server()) asyncio.run(mm()) # do a graceful shutdown which would had caught the allowed_errors before # https://github.com/neondatabase/neon/pull/8632 env.pageserver.stop() + + +class WalkingApplication: + """ + A test application with following characteristics: + + - It performs single-row lookups in pgbench_accounts table, just like pgbench -S + + - Whenever a query takes longer than 10s, the application disconnects, reconnects, + and retries the query, with the same parameter. This way, if there's a problem + with a single page, the application will keep retrying it rather than work + around it. + + - The lookups are not randomly distributed, but form a "walking herd" pattern, + where the queries walk through all accounts, with some randomness. This way, + there's a lot of locality of access, but the locality moves throughout the + table. + + """ + + def __init__(self, num_accounts, num_clients, endpoint, num_xacts): + self.num_accounts = num_accounts + self.num_clients = num_clients + self.endpoint = endpoint + self.running = True + self.num_xacts = num_xacts + + self.xacts_started = 0 + self.xacts_performed = 0 + self.xacts_failed = 0 + + async def run(self): + async with TaskGroup() as group: + for i in range(1, self.num_clients): + group.create_task(self.walking_client(i)) + + async def walking_client(self, client_id): + local_xacts_performed = 0 + + conn = None + stmt = None + failed = False + while self.running and self.xacts_started < self.num_xacts: + self.xacts_started += 1 + if not failed: + aid = (self.xacts_started * 100 + random.randint(0, 100)) % self.num_accounts + 1 + + if conn is None: + conn = await self.endpoint.connect_async() + await conn.execute("set statement_timeout=0") + stmt = await conn.prepare("SELECT abalance FROM pgbench_accounts WHERE aid = $1") + + try: + async with asyncio.timeout(10): + res = await stmt.fetchval(aid) + if local_xacts_performed % 1000 == 0: + log.info( + f"[{client_id}] result {self.xacts_performed}/{self.num_xacts}: balance of account {aid}: {res}" + ) + self.xacts_performed += 1 + local_xacts_performed += 1 + failed = False + except TimeoutError: + log.info(f"[{client_id}] query on aid {aid} timed out. Reconnecting") + conn.terminate() + conn = None + failed = True