mirror of https://github.com/neondatabase/neon.git
Exercise MAX_SEND_SIZE sending from the safekeeper: we've had a bug with WAL records torn across several XLogData messages. Add a failpoint to the safekeeper to slow down sending, and check for corrupted-WAL complaints in the standby log. In passing, make the test a bit simpler; e.g. we don't need explicit commits since autocommit is enabled by default. https://neondb.slack.com/archives/C05L7D1JAUS/p1703774799114719 https://github.com/neondatabase/cloud/issues/9057
102 lines
3.9 KiB
Python
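For context, the reproduction hinges on a safekeeper-side failpoint: "return(100)" makes the failpoint fire yielding the value 100, which the sending path presumably uses as a sleep duration, so outgoing WAL batches up to full MAX_SEND_SIZE messages. A minimal sketch of the toggle, using the same client calls as the test below (env is the NeonEnv fixture):

sk_http = env.safekeepers[0].http_client()
sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")])  # enable the sleep
# ... generate WAL, run checks ...
sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))  # disable it again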
import os
import re
import time

# Lsn compares LSNs numerically; raw strings would compare lexicographically.
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv

def wait_caughtup(primary: Endpoint, secondary: Endpoint):
    # Parse LSNs with Lsn so they compare numerically; comparing the raw
    # strings would be lexicographic and therefore wrong.
    primary_lsn = Lsn(
        primary.safe_psql_scalar("SELECT pg_current_wal_insert_lsn()::text", log_query=False)
    )
    while True:
        secondary_lsn = Lsn(
            secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()::text", log_query=False)
        )
        caught_up = secondary_lsn >= primary_lsn
        log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
        if caught_up:
            return
        time.sleep(1)
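
# Why the Lsn wrapper above matters: comparing LSNs as raw strings is
# lexicographic and misorders them once the high word grows. A minimal,
# hypothetical illustration (not part of the original test):
def _lsn_comparison_example():
    assert "10/0" < "2/0"  # string order: '1' sorts before '2', so wrong
    assert Lsn("10/0") > Lsn("2/0")  # numeric order: 0x10 > 0x2, correct
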
# Check the standby log for corrupted-WAL complaints which might otherwise go
# unnoticed if a reconnection recovers from them.
def scan_standby_log_for_errors(secondary: Endpoint):
    log_path = secondary.endpoint_path() / "compute.log"
    with log_path.open("r") as f:
        markers = re.compile(
            r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
        )
        for line in f:
            if markers.search(line):
                log.info(f"bad error in standby log: {line}")
                raise AssertionError(f"corrupted WAL complaint in standby log: {line.strip()}")
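
# The markers above target the usual Postgres xlogreader complaints. A small,
# hypothetical self-check of the pattern (the sample wording approximates real
# server messages and is not taken from an actual log):
def _markers_example():
    markers = re.compile(
        r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
    )
    sample = "LOG:  invalid magic number 0000 in log segment 000000010000000000000002, offset 0"
    assert markers.search(sample) is not None
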
def test_hot_standby(neon_simple_env: NeonEnv):
    env = neon_simple_env

    # We've had a bug caused by WAL records split across multiple XLogData
    # messages resulting in corrupted-WAL complaints on the standby. It
    # reproduced only when sending from the safekeeper was slow enough to
    # accumulate full MAX_SEND_SIZE messages, so insert a sleep through a
    # failpoint, but only in one configuration to keep overall test time down.
    slow_down_send = "[debug-pg16]" in os.environ.get("PYTEST_CURRENT_TEST", "")
    if slow_down_send:
        sk_http = env.safekeepers[0].http_client()
        sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")])

    with env.endpoints.create_start(
        branch_name="main",
        endpoint_id="primary",
    ) as primary:
        time.sleep(1)
        with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
            queries = [
                "SHOW neon.timeline_id",
                "SHOW neon.tenant_id",
                "SELECT relname FROM pg_class WHERE relnamespace = current_schema()::regnamespace::oid",
                "SELECT COUNT(*), SUM(i) FROM test",
            ]
            responses = dict()

            with primary.connect() as p_con:
                with p_con.cursor() as p_cur:
                    p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")

                for query in queries:
                    with p_con.cursor() as p_cur:
                        p_cur.execute(query)
                        res = p_cur.fetchone()
                        assert res is not None
                        response = res
                        responses[query] = response

            # Insert more data to make the safekeeper send full MAX_SEND_SIZE
            # messages.
            if slow_down_send:
                primary.safe_psql("create table t(key int, value text)")
                primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")

            wait_caughtup(primary, secondary)

            with secondary.connect() as s_con:
                with s_con.cursor() as s_cur:
                    s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
                    res = s_cur.fetchone()
                    assert res is not None

                for query in queries:
                    with s_con.cursor() as secondary_cursor:
                        secondary_cursor.execute(query)
                        response = secondary_cursor.fetchone()
                        assert response is not None
                        assert response == responses[query]

            scan_standby_log_for_errors(secondary)

    # Clean up: disable the failpoint again.
    if slow_down_send:
        sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))
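A note on the slow_down_send gate: pytest sets PYTEST_CURRENT_TEST to the running test's node ID plus stage, so the "[debug-pg16]" substring enables the failpoint only for the debug-build, Postgres 16 parametrization. The shape of the value (the path shown is illustrative):

# PYTEST_CURRENT_TEST ~= "test_runner/regress/test_hot_standby.py::test_hot_standby[debug-pg16] (call)"
slow_down_send = "[debug-pg16]" in os.environ.get("PYTEST_CURRENT_TEST", "")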