mirror of https://github.com/neondatabase/neon.git
Add large insertion and slow WAL sending to test_hot_standby.

This exercises MAX_SEND_SIZE-sized sends from the safekeeper; we've had a bug where WAL records were torn across several XLogData messages. Add a failpoint to the safekeeper to slow down sending, and check the standby log for corrupted-WAL complaints. In passing, make the test a bit simpler: e.g. we don't need explicit commits, as autocommit is enabled by default.

https://neondb.slack.com/archives/C05L7D1JAUS/p1703774799114719
https://github.com/neondatabase/cloud/issues/9057
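In short, the new test path enables a sleep failpoint on the safekeeper's WAL sender, pushes enough WAL through it that records straddle send boundaries, and disables the failpoint afterwards. A condensed sketch of the flow (all names are taken from the diff below; `env` and `primary` are the usual NeonEnv/Endpoint test fixtures):

```python
# Make every XLogData send to a replica sleep 100 ms (failpoint added below).
sk_http = env.safekeepers[0].http_client()
sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")])

# Generate enough WAL that the sender's buffer fills up to MAX_SEND_SIZE
# and records get split across XLogData messages.
primary.safe_psql("create table t(key int, value text)")
primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")

# Clean up so later tests are not slowed down.
sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))
```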
@@ -17,6 +17,7 @@ use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
 use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
 use serde::{Deserialize, Serialize};
 use tokio::io::{AsyncRead, AsyncWrite};
+use utils::failpoint_support;
 use utils::id::TenantTimelineId;
 use utils::lsn::AtomicLsn;
 use utils::pageserver_feedback::PageserverFeedback;
@@ -559,6 +560,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
             }))
             .await?;

+        if let Some(appname) = &self.appname {
+            if appname == "replica" {
+                failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
+            }
+        }
         trace!(
             "sent {} bytes of WAL {}-{}",
             send_size,
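The failpoint only fires when the receiver's application name is "replica", i.e. a hot standby, and `return(100)` presumably makes each send sleep 100 ms (going by the macro name), so the outgoing buffer fills to MAX_SEND_SIZE. To see why that matters, here is an illustrative Python sketch (not safekeeper code) of how a WAL record can straddle two fixed-size XLogData payloads; the 128 KiB value for MAX_SEND_SIZE is an assumption for the illustration:

```python
MAX_SEND_SIZE = 128 * 1024  # assumed value, for illustration only

wal = bytearray(300 * 1024)          # stand-in for a stretch of WAL
record_start = MAX_SEND_SIZE - 100   # a record beginning 100 bytes before a boundary
record_len = 4096                    # ...that extends well into the next chunk

# Sending in full MAX_SEND_SIZE chunks tears the record across two messages.
chunks = [wal[i : i + MAX_SEND_SIZE] for i in range(0, len(wal), MAX_SEND_SIZE)]
first_msg = record_start // MAX_SEND_SIZE
last_msg = (record_start + record_len - 1) // MAX_SEND_SIZE
assert first_msg != last_msg  # the standby must reassemble the record correctly
```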
@@ -347,7 +347,9 @@ class PgProtocol:
         """
         return self.safe_psql_many([query], **kwargs)[0]

-    def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
+    def safe_psql_many(
+        self, queries: List[str], log_query=True, **kwargs: Any
+    ) -> List[List[Tuple[Any, ...]]]:
         """
         Execute queries against the node and return all rows.
         This method passes all extra params to connstr.
@@ -356,7 +358,8 @@ class PgProtocol:
         with closing(self.connect(**kwargs)) as conn:
             with conn.cursor() as cur:
                 for query in queries:
-                    log.info(f"Executing query: {query}")
+                    if log_query:
+                        log.info(f"Executing query: {query}")
                     cur.execute(query)

                     if cur.description is None:
@@ -365,11 +368,11 @@ class PgProtocol:
                     result.append(cur.fetchall())
         return result

-    def safe_psql_scalar(self, query) -> Any:
+    def safe_psql_scalar(self, query, log_query=True) -> Any:
         """
         Execute query returning single row with single column.
         """
-        return self.safe_psql(query)[0][0]
+        return self.safe_psql(query, log_query=log_query)[0][0]


 @dataclass
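The new `log_query` flag exists so that per-second polling (see `wait_caughtup` below) doesn't flood the test log with "Executing query" lines. A minimal usage sketch, assuming `primary` and `secondary` are started `Endpoint` fixtures:

```python
# Default: the query text is logged once at INFO level.
rows = primary.safe_psql("SELECT count(*) FROM test")

# Called in a loop: suppress the per-iteration "Executing query" line.
lsn = secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
```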
@@ -2925,7 +2928,9 @@ class Safekeeper:

     def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient:
         is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper")
-        return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled)
+        return SafekeeperHttpClient(
+            port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled
+        )

     def data_dir(self) -> str:
         return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
@@ -2976,7 +2981,7 @@ class SafekeeperMetrics:
 class SafekeeperHttpClient(requests.Session):
     HTTPError = requests.HTTPError

-    def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled = False):
+    def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled=False):
         super().__init__()
         self.port = port
         self.auth_token = auth_token
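Failpoints are compiled into the safekeeper only when it is built with the `testing` Cargo feature, which is what the `'"testing"' in ... get_binary_version("safekeeper")` probe above detects. A hedged sketch of how a test could guard on that flag; the `is_testing_enabled` attribute on the client is an assumption here (only the constructor parameter is visible in this diff):

```python
import pytest

sk_http = env.safekeepers[0].http_client()
# Assumed attribute: the constructor flag stored as self.is_testing_enabled.
if not sk_http.is_testing_enabled:
    pytest.skip("safekeeper binary built without the 'testing' feature")
sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")])
```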
@@ -1,19 +1,59 @@
+import os
+import re
 import time

-from fixtures.neon_fixtures import NeonEnv
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import Endpoint, NeonEnv


+def wait_caughtup(primary: Endpoint, secondary: Endpoint):
+    primary_lsn = primary.safe_psql_scalar(
+        "SELECT pg_current_wal_insert_lsn()::text", log_query=False
+    )
+    while True:
+        secondary_lsn = secondary.safe_psql_scalar(
+            "SELECT pg_last_wal_replay_lsn()", log_query=False
+        )
+        caught_up = secondary_lsn >= primary_lsn
+        log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
+        if caught_up:
+            return
+        time.sleep(1)
+
+
+# Check for corrupted WAL messages which might otherwise go unnoticed if
+# reconnection fixes them.
+def scan_standby_log_for_errors(secondary):
+    log_path = secondary.endpoint_path() / "compute.log"
+    with log_path.open("r") as f:
+        markers = re.compile(
+            r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
+        )
+        for line in f:
+            if markers.search(line):
+                log.info(f"bad error in standby log: {line}")
+                raise AssertionError()
+
+
 def test_hot_standby(neon_simple_env: NeonEnv):
     env = neon_simple_env

+    # We've had a bug caused by WAL records split across multiple XLogData
+    # messages resulting in corrupted WAL complaints on standby. It reproduced
+    # only when sending from the safekeeper is slow enough to accumulate full
+    # MAX_SEND_SIZE messages, so insert a sleep through a failpoint, but only
+    # in one configuration to keep test time down.
+    slow_down_send = "[debug-pg16]" in os.environ.get("PYTEST_CURRENT_TEST", "")
+    if slow_down_send:
+        sk_http = env.safekeepers[0].http_client()
+        sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")])
+
     with env.endpoints.create_start(
         branch_name="main",
         endpoint_id="primary",
     ) as primary:
         time.sleep(1)
         with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
-            primary_lsn = None
-            caught_up = False
             queries = [
                 "SHOW neon.timeline_id",
                 "SHOW neon.tenant_id",
@@ -26,23 +66,6 @@ def test_hot_standby(neon_simple_env: NeonEnv):
             with p_con.cursor() as p_cur:
                 p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")

-            # Explicit commit to make sure other connections (and replicas) can
-            # see the changes of this commit.
-            p_con.commit()
-
-            with p_con.cursor() as p_cur:
-                p_cur.execute("SELECT pg_current_wal_insert_lsn()::text")
-                res = p_cur.fetchone()
-                assert res is not None
-                (lsn,) = res
-                primary_lsn = lsn
-
-            # Explicit commit to make sure other connections (and replicas) can
-            # see the changes of this commit.
-            # Note that this may generate more WAL if the transaction has changed
-            # things, but we don't care about that.
-            p_con.commit()
-
             for query in queries:
                 with p_con.cursor() as p_cur:
                     p_cur.execute(query)
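The deleted commits were redundant because, per the commit message, the fixture connections run with autocommit enabled by default, so each `execute()` is durable and visible to replicas as soon as it returns. A minimal sketch of the equivalent behavior (assuming the fixtures' psycopg2-style connections with autocommit on):

```python
conn = primary.connect()  # fixture connection; autocommit is on by default
with conn.cursor() as cur:
    cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")
# No conn.commit() needed: the statement committed on its own, so its WAL
# is already flushed and available for streaming to the standby.
```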
@@ -51,30 +74,28 @@ def test_hot_standby(neon_simple_env: NeonEnv):
                         response = res
                     responses[query] = response

+            # Insert more data to make the safekeeper send MAX_SEND_SIZE messages.
+            if slow_down_send:
+                primary.safe_psql("create table t(key int, value text)")
+                primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")
+
+            wait_caughtup(primary, secondary)
+
             with secondary.connect() as s_con:
                 with s_con.cursor() as s_cur:
                     s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
                     res = s_cur.fetchone()
                     assert res is not None

-                while not caught_up:
-                    with s_con.cursor() as secondary_cursor:
-                        secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
-                        res = secondary_cursor.fetchone()
-                        assert res is not None
-                        (secondary_lsn,) = res
-                        # There may be more changes on the primary after we got our LSN
-                        # due to e.g. autovacuum, but that shouldn't impact the content
-                        # of the tables, so we check whether we've replayed up to at
-                        # least after the commit of the `test` table.
-                        caught_up = secondary_lsn >= primary_lsn
-
-                # Explicit commit to flush any transient transaction-level state.
-                s_con.commit()
-
                 for query in queries:
                     with s_con.cursor() as secondary_cursor:
                         secondary_cursor.execute(query)
                         response = secondary_cursor.fetchone()
                         assert response is not None
                         assert response == responses[query]
+
+            scan_standby_log_for_errors(secondary)
+
+    # clean up
+    if slow_down_send:
+        sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))
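A note on the `[debug-pg16]` gating used above: pytest exports the id of the currently running test in the `PYTEST_CURRENT_TEST` environment variable, so substring-matching the parametrization picks out a single build-type/Postgres-version combination. Roughly:

```python
import os

# pytest sets PYTEST_CURRENT_TEST to something like
# "test_runner/regress/test_hot_standby.py::test_hot_standby[debug-pg16] (call)"
current_test = os.environ.get("PYTEST_CURRENT_TEST", "")
slow_down_send = "[debug-pg16]" in current_test  # only the debug + pg16 run
```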