Merge commit 'e7d18bc18' into problame/standby-horizon-leases

This commit is contained in:
Christian Schwarz
2025-08-06 17:19:37 +02:00
11 changed files with 446 additions and 65 deletions

View File

@@ -2,11 +2,12 @@ from __future__ import annotations
import urllib.parse
from enum import StrEnum
from typing import TYPE_CHECKING, final
from typing import TYPE_CHECKING, Any, final
import requests
from requests.adapters import HTTPAdapter
from requests.auth import AuthBase
from requests.exceptions import ReadTimeout
from typing_extensions import override
from fixtures.log_helper import log
@@ -102,6 +103,18 @@ class EndpointHttpClient(requests.Session):
wait_until(offloaded)
def promote(self, safekeepers_lsn: dict[str, Any], disconnect: bool = False):
url = f"http://localhost:{self.external_port}/promote"
if disconnect:
try: # send first request to start promote and disconnect
self.post(url, data=safekeepers_lsn, timeout=0.001)
except ReadTimeout:
pass # wait on second request which returns on promotion finish
res = self.post(url, data=safekeepers_lsn)
res.raise_for_status()
json: dict[str, str] = res.json()
return json
def database_schema(self, database: str):
res = self.get(
f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",

View File

@@ -1,29 +1,51 @@
"""
File with secondary->primary promotion testing.
This far, only contains a test that we don't break and that the data is persisted.
Secondary -> primary promotion testing
"""
from enum import StrEnum
from typing import cast
import psycopg2
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import USE_LFC
from psycopg2.extensions import cursor as Cursor
from pytest import raises
def stop_and_check_lsn(ep: Endpoint, expected_lsn: Lsn | None):
ep.stop(mode="immediate-terminate")
lsn = ep.terminate_flush_lsn
if expected_lsn is not None:
assert (lsn is not None) == (expected_lsn is not None), f"{lsn=}, {expected_lsn=}"
if lsn is not None:
assert lsn >= expected_lsn, f"{expected_lsn=} < {lsn=}"
else:
assert lsn == expected_lsn, f"{expected_lsn=} != {lsn=}"
def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
def get_lsn_triple(cur: Cursor) -> tuple[str, str, str]:
cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
return cast("tuple[str, str, str]", cur.fetchone())
class PromoteMethod(StrEnum):
COMPUTE_CTL = "compute-ctl"
POSTGRES = "postgres"
METHOD_OPTIONS = [e for e in PromoteMethod]
METHOD_IDS = [e.value for e in PromoteMethod]
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
@pytest.mark.parametrize("method", METHOD_OPTIONS, ids=METHOD_IDS)
def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
"""
Test that a replica safely promotes, and can commit data updates which
show up when the primary boots up after the promoted secondary endpoint
@@ -38,29 +60,26 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
with primary.connect() as primary_conn:
primary_cur = primary_conn.cursor()
primary_cur.execute("create extension neon")
primary_cur.execute(
"create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)"
)
primary_cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
primary_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
lsn_triple = cast("tuple[str, str, str]", primary_cur.fetchone())
lsn_triple = get_lsn_triple(primary_cur)
log.info(f"Primary: Current LSN after workload is {lsn_triple}")
expected_primary_lsn: Lsn = Lsn(lsn_triple[2])
primary_cur.execute("show neon.safekeepers")
safekeepers = primary_cur.fetchall()[0][0]
wait_replica_caughtup(primary, secondary)
if method == PromoteMethod.COMPUTE_CTL:
primary.http_client().offload_lfc()
else:
wait_replica_caughtup(primary, secondary)
with secondary.connect() as secondary_conn:
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
with raises(psycopg2.Error):
@@ -71,28 +90,30 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
primary_endpoint_id = primary.endpoint_id
stop_and_check_lsn(primary, expected_primary_lsn)
# Reconnect to the secondary to make sure we get a read-write connection
promo_conn = secondary.connect()
promo_cur = promo_conn.cursor()
promo_cur.execute(f"alter system set neon.safekeepers='{safekeepers}'")
promo_cur.execute("select pg_reload_conf()")
if method == PromoteMethod.COMPUTE_CTL:
client = secondary.http_client()
client.prewarm_lfc(primary_endpoint_id)
# control plane knows safekeepers, simulate it by querying primary
assert (lsn := primary.terminate_flush_lsn)
safekeepers_lsn = {"safekeepers": safekeepers, "wal_flush_lsn": lsn}
assert client.promote(safekeepers_lsn)["status"] == "completed"
else:
promo_cur.execute(f"alter system set neon.safekeepers='{safekeepers}'")
promo_cur.execute("select pg_reload_conf()")
promo_cur.execute("SELECT * FROM pg_promote()")
assert promo_cur.fetchone() == (True,)
promo_cur.execute("SELECT * FROM pg_promote()")
assert promo_cur.fetchone() == (True,)
promo_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
log.info(f"Secondary: LSN after promotion is {promo_cur.fetchone()}")
lsn_triple = get_lsn_triple(promo_cur)
log.info(f"Secondary: LSN after promotion is {lsn_triple}")
# Reconnect to the secondary to make sure we get a read-write connection
with secondary.connect() as new_primary_conn:
new_primary_cur = new_primary_conn.cursor()
with secondary.connect() as conn, conn.cursor() as new_primary_cur:
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (100,)
@@ -101,43 +122,34 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
)
assert new_primary_cur.fetchall() == [(it,) for it in range(101, 201)]
new_primary_cur = new_primary_conn.cursor()
new_primary_cur = conn.cursor()
new_primary_cur.execute("select payload from t")
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (200,)
new_primary_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
log.info(f"Secondary: LSN after workload is {new_primary_cur.fetchone()}")
with secondary.connect() as second_viewpoint_conn:
new_primary_cur = second_viewpoint_conn.cursor()
lsn_triple = get_lsn_triple(new_primary_cur)
log.info(f"Secondary: LSN after workload is {lsn_triple}")
expected_promoted_lsn = Lsn(lsn_triple[2])
with secondary.connect() as conn, conn.cursor() as new_primary_cur:
new_primary_cur.execute("select payload from t")
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
# wait_for_last_flush_lsn(env, secondary, env.initial_tenant, env.initial_timeline)
# secondaries don't sync safekeepers on finish so LSN will be None
stop_and_check_lsn(secondary, None)
if method == PromoteMethod.COMPUTE_CTL:
# compute_ctl's /promote switches replica type to Primary so it syncs
# safekeepers on finish
stop_and_check_lsn(secondary, expected_promoted_lsn)
else:
# on testing postgres, we don't update replica type, secondaries don't
# sync so lsn should be None
stop_and_check_lsn(secondary, None)
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary2")
with primary.connect() as new_primary:
new_primary_cur = new_primary.cursor()
new_primary_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
lsn_triple = cast("tuple[str, str, str]", new_primary_cur.fetchone())
with primary.connect() as new_primary, new_primary.cursor() as new_primary_cur:
lsn_triple = get_lsn_triple(new_primary_cur)
expected_primary_lsn = Lsn(lsn_triple[2])
log.info(f"New primary: Boot LSN is {lsn_triple}")
@@ -146,5 +158,39 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(201, 300)")
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (300,)
stop_and_check_lsn(primary, expected_primary_lsn)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_replica_promote_handler_disconnects(neon_simple_env: NeonEnv):
"""
Test that if a handler disconnects from /promote route of compute_ctl, promotion still happens
once, and no error is thrown
"""
env: NeonEnv = neon_simple_env
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
with primary.connect() as conn, conn.cursor() as cur:
cur.execute("create extension neon")
cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)")
cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
cur.execute("show neon.safekeepers")
safekeepers = cur.fetchall()[0][0]
primary.http_client().offload_lfc()
primary_endpoint_id = primary.endpoint_id
primary.stop(mode="immediate-terminate")
assert (lsn := primary.terminate_flush_lsn)
client = secondary.http_client()
client.prewarm_lfc(primary_endpoint_id)
safekeepers_lsn = {"safekeepers": safekeepers, "wal_flush_lsn": lsn}
assert client.promote(safekeepers_lsn, disconnect=True)["status"] == "completed"
with secondary.connect() as conn, conn.cursor() as cur:
cur.execute("select count(*) from t")
assert cur.fetchone() == (100,)
cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload")
cur.execute("select count(*) from t")
assert cur.fetchone() == (200,)