Add test for replica promotion

This validates that replicas can promote, and start write changes,
and that these changes are also persisted.  However, this does not
check any less-than-happy paths.
This commit is contained in:
Matthias van de Meent
2025-05-09 20:03:59 +02:00
committed by Konstantin Knizhnik
parent 397e030fb0
commit 19c5eb53e5
2 changed files with 43 additions and 27 deletions

View File

@@ -74,8 +74,9 @@ def test_hot_standby(neon_simple_env: NeonEnv):
for query in queries:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute(query)
response = secondary_cursor.fetchone()
assert response is not None
res = secondary_cursor.fetchone()
assert res is not None
response = res
assert response == responses[query]
# Check for corrupted WAL messages which might otherwise go unnoticed if
@@ -164,7 +165,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
assert res[0] == 10000
assert res == (10000,)
# Clear the cache in the standby, so that when we
# re-execute the query, it will make GetPage
@@ -195,7 +196,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
s_cur.execute("SELECT COUNT(*) FROM test")
log_replica_lag(primary, secondary)
res = s_cur.fetchone()
assert res[0] == 10000
assert res == (10000,)
def run_pgbench(connstr: str, pg_bin: PgBin):

View File

@@ -4,19 +4,13 @@ File with secondary->primary promotion testing.
This far, only contains a test that we don't break and that the data is persisted.
"""
from __future__ import annotations
import threading
from contextlib import closing
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar, skip_on_postgres, wait_until
from pytest import raises
def test_replica_promotes(neon_simple_env: NeonEnv):
def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
"""
Test that a replica safely promotes, and can commit data updates which
show up when the primary boots up after the promoted secondary endpoint
@@ -31,7 +25,9 @@ def test_replica_promotes(neon_simple_env: NeonEnv):
with primary.connect() as primary_conn:
primary_cur = primary_conn.cursor()
primary_cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)")
primary_cur.execute(
"create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)"
)
primary_cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
primary_cur.execute("select pg_switch_wal()")
@@ -42,30 +38,49 @@ def test_replica_promotes(neon_simple_env: NeonEnv):
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
with pytest.raises():
secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
secondary_cur.commit()
with raises(psycopg2.Error):
secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
secondary_conn.commit()
secondary_conn.rollback()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
wait_replica_caughtup(primary, secondary)
primary.stop(mode="immediate")
wait_replica_caughtup(primary, secondary)
primary.stop()
secondary_cur = secondary_conn.cursor()
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
if env.pg_version() is PgVersion.V14:
signalfile = secondary.pgdata_dir / "standby.signal"
assert signalfile.exists()
signalfile.unlink()
promoted = False
while not promoted:
with secondary.connect() as try_it:
try_cursor = try_it.cursor()
try_cursor.execute(
"SELECT setting FROM pg_settings WHERE name = 'transaction_read_only'"
)
promoted = ("off",) == try_cursor.fetchone()
else:
secondary_cur.execute("SELECT * FROM pg_promote()")
assert secondary_cur.fetchone() == (True,)
secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
secondary_cur.fetchall()
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (200,)
secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
secondary_cur.fetchall()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (200,)
secondary.stop(mode="immediate")
primary.start()
with primary.connect() as primary_conn: