import os
import re
import time

from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv


def _lsn_to_int(lsn: str) -> int:
    """Convert a textual LSN such as ``"0/16B3748"`` into a comparable integer.

    PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash
    (high 32 bits, then low 32 bits) without zero padding, so the strings
    themselves do not sort in LSN order once the digit counts differ.
    """
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def wait_caughtup(primary: Endpoint, secondary: Endpoint):
    """Block until ``secondary`` has replayed WAL up to the primary's insert LSN.

    The primary's insert position is sampled once, on entry; the secondary's
    replay position is then polled once per second. There is no timeout in
    this function, so it only returns once the replica has caught up.
    """
    primary_lsn = primary.safe_psql_scalar(
        "SELECT pg_current_wal_insert_lsn()::text", log_query=False
    )
    target = _lsn_to_int(str(primary_lsn))
    while True:
        secondary_lsn = secondary.safe_psql_scalar(
            "SELECT pg_last_wal_replay_lsn()", log_query=False
        )
        # The replay LSN can come back as NULL; treat that as "not caught up
        # yet" instead of failing on the comparison. Compare numerically:
        # plain string comparison mis-orders LSNs of different lengths.
        caught_up = secondary_lsn is not None and _lsn_to_int(str(secondary_lsn)) >= target
        log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
        if caught_up:
            return
        time.sleep(1)


# Check for corrupted WAL messages which might otherwise go unnoticed if
# reconnection fixes this.
def scan_standby_log_for_errors(secondary: Endpoint):
    """Fail if the standby's ``compute.log`` contains a known WAL-corruption message.

    Raises:
        AssertionError: on the first log line matching one of the markers; the
            offending line is logged and included in the exception message.
    """
    markers = re.compile(
        r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
    )
    log_path = secondary.endpoint_path() / "compute.log"
    with log_path.open("r") as f:
        for line in f:
            if markers.search(line):
                log.info(f"bad error in standby log: {line}")
                raise AssertionError(
                    f"WAL corruption marker found in standby log: {line.rstrip()}"
                )


def test_hot_standby(neon_simple_env: NeonEnv):
    """Check that a hot-standby replica serves the same query results as its primary.

    The queries are run on the primary first, then on the replica once it has
    caught up, and the results must be equal. Finally the replica's log is
    scanned for WAL-corruption messages.
    """
    env = neon_simple_env

    # We've had a bug caused by WAL records split across multiple XLogData
    # messages resulting in corrupted WAL complaints on standby. It reproduced
    # only when sending from safekeeper is slow enough to grab full
    # MAX_SEND_SIZE messages. So insert sleep through failpoints, but only in
    # one conf to decrease test time.
    slow_down_send = "[debug-pg16]" in os.environ.get("PYTEST_CURRENT_TEST", "")
    if slow_down_send:
        sk_http = env.safekeepers[0].http_client()
        sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")])

    try:
        with env.endpoints.create_start(
            branch_name="main",
            endpoint_id="primary",
        ) as primary:
            time.sleep(1)
            with env.endpoints.new_replica_start(
                origin=primary, endpoint_id="secondary"
            ) as secondary:
                queries = [
                    "SHOW neon.timeline_id",
                    "SHOW neon.tenant_id",
                    "SELECT relname FROM pg_class WHERE relnamespace = current_schema()::regnamespace::oid",
                    "SELECT COUNT(*), SUM(i) FROM test",
                ]
                # Expected result row for each query, as observed on the primary.
                responses = {}

                with primary.connect() as p_con:
                    with p_con.cursor() as p_cur:
                        p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")

                    for query in queries:
                        with p_con.cursor() as p_cur:
                            p_cur.execute(query)
                            res = p_cur.fetchone()
                            assert res is not None
                            responses[query] = res

                # insert more data to make safekeeper send MAX_SEND_SIZE messages
                if slow_down_send:
                    primary.safe_psql("create table t(key int, value text)")
                    primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")

                wait_caughtup(primary, secondary)

                with secondary.connect() as s_con:
                    with s_con.cursor() as s_cur:
                        # Returns a row only while the node is in recovery,
                        # i.e. it really is acting as a standby.
                        s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
                        res = s_cur.fetchone()
                        assert res is not None

                    for query in queries:
                        with s_con.cursor() as secondary_cursor:
                            secondary_cursor.execute(query)
                            response = secondary_cursor.fetchone()
                            assert response is not None
                            assert response == responses[query]

                scan_standby_log_for_errors(secondary)
    finally:
        # clean up: switch the failpoint off even when the test body fails, so
        # it does not stay enabled on the safekeeper.
        if slow_down_send:
            sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))