From aaaa39d9f52a46641c86314ddc9d15565275d9c2 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 29 Dec 2023 23:09:36 +0300 Subject: [PATCH] Add large insertion and slow WAL sending to test_hot_standby. To exercise MAX_SEND_SIZE sending from safekeeper; we've had a bug with WAL records torn across several XLogData messages. Add failpoint to safekeeper to slow down sending. Also check for corrupted WAL complains in standby log. Make the test a bit simpler in passing, e.g. we don't need explicit commits as autocommit is enabled by default. https://neondb.slack.com/archives/C05L7D1JAUS/p1703774799114719 https://github.com/neondatabase/cloud/issues/9057 --- safekeeper/src/send_wal.rs | 6 ++ test_runner/fixtures/neon_fixtures.py | 17 +++-- test_runner/regress/test_hot_standby.py | 91 +++++++++++++++---------- 3 files changed, 73 insertions(+), 41 deletions(-) diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 70590a0f95..bd1d306968 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -17,6 +17,7 @@ use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; +use utils::failpoint_support; use utils::id::TenantTimelineId; use utils::lsn::AtomicLsn; use utils::pageserver_feedback::PageserverFeedback; @@ -559,6 +560,11 @@ impl WalSender<'_, IO> { })) .await?; + if let Some(appname) = &self.appname { + if appname == "replica" { + failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep"); + } + } trace!( "sent {} bytes of WAL {}-{}", send_size, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 9aa82d8854..5b1a8ba27d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -347,7 +347,9 @@ class PgProtocol: """ return self.safe_psql_many([query], **kwargs)[0] - def safe_psql_many(self, queries: List[str], 
**kwargs: Any) -> List[List[Tuple[Any, ...]]]: + def safe_psql_many( + self, queries: List[str], log_query=True, **kwargs: Any + ) -> List[List[Tuple[Any, ...]]]: """ Execute queries against the node and return all rows. This method passes all extra params to connstr. @@ -356,7 +358,8 @@ class PgProtocol: with closing(self.connect(**kwargs)) as conn: with conn.cursor() as cur: for query in queries: - log.info(f"Executing query: {query}") + if log_query: + log.info(f"Executing query: {query}") cur.execute(query) if cur.description is None: @@ -365,11 +368,11 @@ class PgProtocol: result.append(cur.fetchall()) return result - def safe_psql_scalar(self, query) -> Any: + def safe_psql_scalar(self, query, log_query=True) -> Any: """ Execute query returning single row with single column. """ - return self.safe_psql(query)[0][0] + return self.safe_psql(query, log_query=log_query)[0][0] @dataclass @@ -2925,7 +2928,9 @@ class Safekeeper: def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient: is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper") - return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled) + return SafekeeperHttpClient( + port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled + ) def data_dir(self) -> str: return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}") @@ -2976,7 +2981,7 @@ class SafekeeperMetrics: class SafekeeperHttpClient(requests.Session): HTTPError = requests.HTTPError - def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled = False): + def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled=False): super().__init__() self.port = port self.auth_token = auth_token diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 031fd2857d..7822e29ed9 100644 --- a/test_runner/regress/test_hot_standby.py +++ 
b/test_runner/regress/test_hot_standby.py @@ -1,19 +1,59 @@ +import os +import re import time -from fixtures.neon_fixtures import NeonEnv +from fixtures.log_helper import log +from fixtures.neon_fixtures import Endpoint, NeonEnv + + +def wait_caughtup(primary: Endpoint, secondary: Endpoint): + primary_lsn = primary.safe_psql_scalar( + "SELECT pg_current_wal_insert_lsn()::text", log_query=False + ) + while True: + secondary_lsn = secondary.safe_psql_scalar( + "SELECT pg_last_wal_replay_lsn()", log_query=False + ) + caught_up = secondary_lsn >= primary_lsn + log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}") + if caught_up: + return + time.sleep(1) + + +# Check for corrupted WAL messages which might otherwise go unnoticed if +# reconnection fixes this. +def scan_standby_log_for_errors(secondary): + log_path = secondary.endpoint_path() / "compute.log" + with log_path.open("r") as f: + markers = re.compile( + r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr" + ) + for line in f: + if markers.search(line): + log.info(f"bad error in standby log: {line}") + raise AssertionError() def test_hot_standby(neon_simple_env: NeonEnv): env = neon_simple_env + # We've had a bug caused by WAL records split across multiple XLogData + # messages resulting in corrupted WAL complaints on standby. It reproduced + # only when sending from safekeeper is slow enough to grab full + # MAX_SEND_SIZE messages. So insert sleep through failpoints, but only in + # one conf to decrease test time. 
+ slow_down_send = "[debug-pg16]" in os.environ.get("PYTEST_CURRENT_TEST", "") + if slow_down_send: + sk_http = env.safekeepers[0].http_client() + sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")]) + with env.endpoints.create_start( branch_name="main", endpoint_id="primary", ) as primary: time.sleep(1) with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary: - primary_lsn = None - caught_up = False queries = [ "SHOW neon.timeline_id", "SHOW neon.tenant_id", @@ -26,23 +66,6 @@ def test_hot_standby(neon_simple_env: NeonEnv): with p_con.cursor() as p_cur: p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i") - # Explicit commit to make sure other connections (and replicas) can - # see the changes of this commit. - p_con.commit() - - with p_con.cursor() as p_cur: - p_cur.execute("SELECT pg_current_wal_insert_lsn()::text") - res = p_cur.fetchone() - assert res is not None - (lsn,) = res - primary_lsn = lsn - - # Explicit commit to make sure other connections (and replicas) can - # see the changes of this commit. - # Note that this may generate more WAL if the transaction has changed - # things, but we don't care about that. 
- p_con.commit() - for query in queries: with p_con.cursor() as p_cur: p_cur.execute(query) @@ -51,30 +74,28 @@ def test_hot_standby(neon_simple_env: NeonEnv): response = res responses[query] = response + # insert more data to make safekeeper send MAX_SEND_SIZE messages + if slow_down_send: + primary.safe_psql("create table t(key int, value text)") + primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'") + + wait_caughtup(primary, secondary) + with secondary.connect() as s_con: with s_con.cursor() as s_cur: s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()") res = s_cur.fetchone() assert res is not None - while not caught_up: - with s_con.cursor() as secondary_cursor: - secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()") - res = secondary_cursor.fetchone() - assert res is not None - (secondary_lsn,) = res - # There may be more changes on the primary after we got our LSN - # due to e.g. autovacuum, but that shouldn't impact the content - # of the tables, so we check whether we've replayed up to at - # least after the commit of the `test` table. - caught_up = secondary_lsn >= primary_lsn - - # Explicit commit to flush any transient transaction-level state. - s_con.commit() - for query in queries: with s_con.cursor() as secondary_cursor: secondary_cursor.execute(query) response = secondary_cursor.fetchone() assert response is not None assert response == responses[query] + + scan_standby_log_for_errors(secondary) + + # clean up + if slow_down_send: + sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))