Files
neon/test_runner/regress/test_hot_standby.py
Alex Chi Z 643683f41a fixup(#7204 / postgres): revert IsPrimaryAlive checks (#7209)
Fix #7204.

https://github.com/neondatabase/postgres/pull/400
https://github.com/neondatabase/postgres/pull/401
https://github.com/neondatabase/postgres/pull/402

These commits never go into prod. Detailed investigation will be posted
in another issue. Reverting the commits so that things can keep running
in prod. This pull request adds the test to start two replicas. It fails
on the current main https://github.com/neondatabase/neon/pull/7210 but
passes in this pull request.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-03-23 01:01:51 +00:00

105 lines
4.0 KiB
Python

import os
import re
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
# Check for corrupted WAL messages which might otherwise go unnoticed if
# reconnection fixes this.
def scan_standby_log_for_errors(secondary):
log_path = secondary.endpoint_path() / "compute.log"
with log_path.open("r") as f:
markers = re.compile(
r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
)
for line in f:
if markers.search(line):
log.info(f"bad error in standby log: {line}")
raise AssertionError()
def test_hot_standby(neon_simple_env: NeonEnv):
env = neon_simple_env
# We've had a bug caused by WAL records split across multiple XLogData
# messages resulting in corrupted WAL complains on standby. It reproduced
# only when sending from safekeeper is slow enough to grab full
# MAX_SEND_SIZE messages. So insert sleep through failpoints, but only in
# one conf to decrease test time.
slow_down_send = "[debug-pg16]" in os.environ.get("PYTEST_CURRENT_TEST", "")
if slow_down_send:
sk_http = env.safekeepers[0].http_client()
sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")])
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
time.sleep(1)
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
queries = [
"SHOW neon.timeline_id",
"SHOW neon.tenant_id",
"SELECT relname FROM pg_class WHERE relnamespace = current_schema()::regnamespace::oid",
"SELECT COUNT(*), SUM(i) FROM test",
]
responses = dict()
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")
for query in queries:
with p_con.cursor() as p_cur:
p_cur.execute(query)
res = p_cur.fetchone()
assert res is not None
response = res
responses[query] = response
# insert more data to make safekeeper send MAX_SEND_SIZE messages
if slow_down_send:
primary.safe_psql("create table t(key int, value text)")
primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")
wait_replica_caughtup(primary, secondary)
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
res = s_cur.fetchone()
assert res is not None
for query in queries:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute(query)
response = secondary_cursor.fetchone()
assert response is not None
assert response == responses[query]
scan_standby_log_for_errors(secondary)
# clean up
if slow_down_send:
sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))
def test_2_replicas_start(neon_simple_env: NeonEnv):
env = neon_simple_env
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
time.sleep(1)
with env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary1"
) as secondary1:
with env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary2"
) as secondary2:
wait_replica_caughtup(primary, secondary1)
wait_replica_caughtup(primary, secondary2)