move into separate test and prettify

This commit is contained in:
Christian Schwarz
2025-07-10 20:36:00 +00:00
parent b401cec157
commit 3d8ca0245c
2 changed files with 247 additions and 112 deletions

View File

@@ -5655,7 +5655,6 @@ def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint, primary_cursor
return
time.sleep(1)
def log_replica_lag(primary: Endpoint, secondary: Endpoint):
last_replay_lsn = Lsn(
secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
import concurrent.futures
import contextlib
import os
import threading
import time
@@ -139,8 +140,6 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
# this test is largely about PS GC behavior, we control it manually
"gc_period": "0s",
"compaction_period": "0s",
# short gc_horizon to force the issue
"gc_horizon": 1,
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
timeline_id = env.initial_timeline
@@ -166,7 +165,15 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
wait_replica_caughtup(primary, secondary)
s_cur = secondary.connect().cursor()
s_cur2 = secondary.connect().cursor()
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
res = s_cur.fetchone()
assert res is not None
s_cur.execute("SHOW hot_standby_feedback")
res = s_cur.fetchone()
assert res is not None
assert res[0] == "off"
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
@@ -178,128 +185,257 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
# so we still remember the LSNs of the pages.
secondary.clear_buffers(cursor=s_cur)
# fault in evicted pages now, so that this cursor/backend doesn't need to do getpages
# all of this pain is needed solely because we don't have failpoints in compute to selectively
# pause just the secondary getpage requests
s_cur2.execute("SELECT pg_backend_pid()")
log.info(f"s_cur2 is {s_cur2.fetchone()}")
s_cur2.execute("SELECT pg_last_wal_replay_lsn()")
if pause_apply:
s_cur2.execute("SELECT pg_wal_replay_pause()")
s_cur.execute("SELECT pg_wal_replay_pause()")
# Do other stuff on the primary, to advance the WAL
p_cur.execute("CREATE TABLE test2 AS SELECT generate_series(1, 1000000) AS g")
# Run GC. The PITR interval is very small, so this advances the GC cutoff LSN
# very close to the primary's current insert LSN.
shards = tenant_get_shards(env, tenant_id, None)
for _, pageserver in shards:
for tenant_shard_id, pageserver in shards:
client = pageserver.http_client()
client.configure_failpoints(
(
"pagestream_read_message:before_gc_cutoff_check",
f"pause",
)
)
client.timeline_checkpoint(tenant_shard_id, timeline_id)
client.timeline_compact(tenant_shard_id, timeline_id)
client.timeline_gc(tenant_shard_id, timeline_id, 0)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# Re-execute the query. The GetPage requests that this
# generates use old not_modified_since LSNs, older than
# the GC cutoff, but new request LSNs. (In protocol
# version 1 there was only one LSN, and this failed.)
log_replica_lag(primary, secondary)
s_cur.execute("SELECT COUNT(*) FROM test")
log_replica_lag(primary, secondary)
res = s_cur.fetchone()
assert res == (10000,)
#
# kick off a bunch of getpage requests at the current apply_lsn
#
if pause_apply:
s_cur.execute("SELECT pg_wal_replay_resume()")
def make_replica_send_getpage():
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
assert res == (10000,)
wait_replica_caughtup(primary, secondary)
task1 = executor.submit(make_replica_send_getpage)
#
# wait until the requests have hit the failpoint, which is
# very early, before capturing gc cutoff rcu read; so, gc
# cutoff isn't held back by these requests; think of it
# as a network delay
#
time.sleep(5)
#
# advance apply_lsn by resuming replay
#
def replay_lsn():
s_cur2.execute("SELECT pg_last_wal_replay_lsn()")
[replay_lsn] = s_cur2.fetchone()
return Lsn(replay_lsn)
submitted_lsn = replay_lsn()
log.info(f"submitted getpages with request_lsn={submitted_lsn}")
log.info("resuming wal replay")
s_cur2.execute("SELECT pg_wal_replay_resume()")
log.info("waiting for secondary to catch up")
# Wait for PS's view of standby horizon to catch up.
# (When we switch to leases (LKB-88) we need to change this to watch the lease lsn move.)
# (TODO: instead of checking impl details here, somehow assert that gc can delete layers now.
# Tricky to do that without flakiness though.)
# We already waited for replica to catch up, so, this timeout is strictly on
# a few few in-memory only RPCs to propagate standby_horizon.
timeout_secs = 10
started_at = time.time()
shards = tenant_get_shards(env, tenant_id, None)
for tenant_shard_id, pageserver in shards:
client = pageserver.http_client()
while True:
current_replay_lsn = replay_lsn()
log.info(f"{current_replay_lsn=} {submitted_lsn}")
if current_replay_lsn > submitted_lsn + 8192:
break
time.sleep(1)
log.info("pausing wal replay")
s_cur2.execute("SELECT pg_wal_replay_pause()")
#
# secondary now is at a higher apply_lsn
# wait for it to propagate into standby_horizon
#
for tenant_shard_id, pageserver in shards:
client = pageserver.http_client()
# wait for standby horizon to catch up
while True:
metrics = client.get_metrics()
sample = metrics.query_one("pageserver_standby_horizon", {"tenant_id": str(tenant_shard_id.tenant_id), "shard_id": str(tenant_shard_id.shard_index), "timeline_id": str(timeline_id)})
current_standby_horizon = Lsn(int(sample.value))
log.info(f"{current_standby_horizon}")
current_replay_lsn = replay_lsn()
log.info(f"{current_standby_horizon=} {current_replay_lsn=}")
if current_standby_horizon == current_replay_lsn:
break
time.sleep(1)
#
# now trigger gc; it will cutoff at standby_horizon, i.e.,
# at the advanced apply_lsn, above the delayed requests' request_lsn
#
log.info("do gc")
for tenant_shard_id, pageserver in shards:
client.timeline_checkpoint(tenant_shard_id, timeline_id)
client.timeline_compact(tenant_shard_id, timeline_id)
gc_status = client.timeline_gc(tenant_shard_id, timeline_id, 0)
log.info(f"{gc_status=}")
client.timeline_compact(tenant_shard_id, timeline_id, enhanced_gc_bottom_most_compaction=True)
gc_status = client.timeline_gc(tenant_shard_id, timeline_id, 0)
log.info(f"{gc_status=}")
detail = client.timeline_detail(tenant_shard_id, timeline_id)
log.info(f"{detail=}")
assert Lsn(detail["applied_gc_cutoff_lsn"]) == current_standby_horizon
#
# unblock the requests that were delayed
# until we fix the bug, they will fail because their request_lsn is below standby horizon
#
log.info("unblock requests")
for _, pageserver in shards:
client = pageserver.http_client()
client.configure_failpoints(
(
"pagestream_read_message:before_gc_cutoff_check",
f"off",
secondary_apply_lsn = Lsn(
secondary.safe_psql_scalar(
"SELECT pg_last_wal_replay_lsn()", log_query=False
)
)
log.info("waiting for select to complete")
# until the bug is fixed, the blocked getpage requests will fail with the error below
expect_fail = f"requested at {submitted_lsn} gc cutoff {current_standby_horizon}"
log.info(f"until the bug is fixed, we expect task1 to fail with a postgres IO error because of failed getpage, witht he following messages: {expect_fail}")
task1.result()
log.info("the delayed requests completed without errors, wohoo, the bug is fixed")
standby_horizon_metric = client.get_metrics().query_one(
"pageserver_standby_horizon",
{
"tenant_id": str(tenant_shard_id.tenant_id),
"shard_id": str(tenant_shard_id.shard_index),
"timeline_id": str(timeline_id),
},
)
standby_horizon_at_ps = Lsn(int(standby_horizon_metric.value))
log.info(f"{tenant_shard_id.shard_index=}: {standby_horizon_at_ps=} {secondary_apply_lsn=}")
if secondary_apply_lsn == standby_horizon_at_ps:
break
if time.time() - started_at > timeout_secs:
pytest.fail(f"standby_horizon didn't propagate within {timeout_secs=}, this is holding up gc on secondary")
time.sleep(1)
def test_hot_standby_gc_with_inflight_requests(neon_env_builder: NeonEnvBuilder):
tenant_conf = {
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
# this test is largely about PS GC behavior, we control it manually
"gc_period": "0s",
"compaction_period": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
timeline_id = env.initial_timeline
tenant_id = env.initial_tenant
with contextlib.ExitStack() as stack:
executor = stack.enter_context(concurrent.futures.ThreadPoolExecutor(max_workers=2))
primary = stack.enter_context(env.endpoints.create_start( branch_name="main", endpoint_id="primary"))
secondary = stack.enter_context(env.endpoints.new_replica_start(
origin=primary,
endpoint_id="secondary",
# Protocol version 2 was introduced to fix the issue
# that this test exercises. With protocol version 1 it
# fails.
config_lines=["neon.protocol_version=2"],
))
p_cur = primary.connect().cursor()
s_cur = secondary.connect().cursor()
s_cur2 = secondary.connect().cursor()
p_cur.execute("CREATE EXTENSION neon_test_utils")
p_cur.execute("CREATE TABLE test (id int primary key) WITH (autovacuum_enabled=false)")
# helper
def replay_lsn():
s_cur2.execute("SELECT pg_last_wal_replay_lsn()")
[replay_lsn] = s_cur2.fetchone()
return Lsn(replay_lsn)
#
# create initial set of data the standby can query
#
p_cur.execute("INSERT INTO test SELECT generate_series(1, 10000) AS g")
#
# wait until standby is caught up
#
wait_replica_caughtup(primary, secondary)
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
assert res == (10000,)
#
# clear standby cache so that the select we
# do in make_replica_send_getpage() below actually
# sends getpage requests.
#
secondary.clear_buffers(cursor=s_cur)
#
# Even a simple SELECT pg_last_wal_replay_lsn() needs to read some pages.
# Fault them in now so that s_cur2 will be unaffected by
# the pausing of getpage request handling below.
# (This was quite tricky to debug; if something breaks in the future and
# you need to debug this test, try printing backend IDs and correlating them
# with "slow getpage" logs in compute.log)
# (All of this pain would go away if we could scope the failpoint to
# individual backends' getpage requests.
# TODO: add such capability in this PR, it's not much work and the failpoint
# is too high overhead anyway.
#
s_cur2.execute("SELECT pg_last_wal_replay_lsn()")
#
# Stop wal replay on secondary.
#
s_cur2.execute("SELECT pg_wal_replay_pause()")
#
# Advance lsn on primary, we'll use it later to advance lsn on secondary
# when we resume replay on secondary.
#
p_cur.execute("CREATE TABLE test2 AS SELECT generate_series(1, 1000000) AS g")
#
# Block any more getpage requests really early, before we get the RCU read
# for applied_gc_cutoff_lsn.
# This simulates delayed requests.
#
shards = tenant_get_shards(env, tenant_id, None)
for _, pageserver in shards:
client = pageserver.http_client()
client.configure_failpoints(
(
"pagestream_read_message:before_gc_cutoff_check",
f"pause",
)
)
#
# on the secondary, kick off a bunch of getpage requests
# at the current apply_lsn; they will hit the failpoint,
# simulating that they're stuck somewhere in the network
#
def make_replica_send_getpage():
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
assert res == (10000,)
getpage_requests_task = executor.submit(make_replica_send_getpage)
#
# wait until the requests have hit the failpoint
# TODO: use something not timing dependent here
#
time.sleep(5)
#
# advance apply_lsn by resuming replay temporarily
#
submitted_lsn = replay_lsn()
log.info(f"submitted getpages with request_lsn={submitted_lsn}")
log.info("resuming wal replay")
s_cur2.execute("SELECT pg_wal_replay_resume()")
log.info("waiting for secondary to catch up")
while True:
current_replay_lsn = replay_lsn()
log.info(f"{current_replay_lsn=} {submitted_lsn}")
if current_replay_lsn > submitted_lsn + 8192:
break
time.sleep(1)
log.info("pausing wal replay")
s_cur2.execute("SELECT pg_wal_replay_pause()")
#
# secondary now is at a higher apply_lsn
# wait for it to propagate into standby_horizon
#
for tenant_shard_id, pageserver in shards:
client = pageserver.http_client()
# wait for standby horizon to catch up
while True:
metrics = client.get_metrics()
sample = metrics.query_one("pageserver_standby_horizon", {"tenant_id": str(tenant_shard_id.tenant_id), "shard_id": str(tenant_shard_id.shard_index), "timeline_id": str(timeline_id)})
current_standby_horizon = Lsn(int(sample.value))
log.info(f"{current_standby_horizon}")
current_replay_lsn = replay_lsn()
log.info(f"{current_standby_horizon=} {current_replay_lsn=}")
if current_standby_horizon == current_replay_lsn:
break
time.sleep(1)
#
# now trigger gc; it will cutoff at standby_horizon, i.e.,
# at the advanced apply_lsn, above the delayed requests' request_lsn
#
log.info("do gc")
for tenant_shard_id, pageserver in shards:
client.timeline_checkpoint(tenant_shard_id, timeline_id)
client.timeline_compact(tenant_shard_id, timeline_id)
gc_status = client.timeline_gc(tenant_shard_id, timeline_id, 0)
log.info(f"{gc_status=}")
client.timeline_compact(tenant_shard_id, timeline_id, enhanced_gc_bottom_most_compaction=True)
gc_status = client.timeline_gc(tenant_shard_id, timeline_id, 0)
log.info(f"{gc_status=}")
detail = client.timeline_detail(tenant_shard_id, timeline_id)
log.info(f"{detail=}")
assert Lsn(detail["applied_gc_cutoff_lsn"]) == current_standby_horizon
#
# unblock the requests that were delayed
# until we fix the bug, they will fail because their request_lsn is below standby horizon
#
log.info("unblock requests")
for _, pageserver in shards:
client = pageserver.http_client()
client.configure_failpoints(
(
"pagestream_read_message:before_gc_cutoff_check",
f"off",
)
)
log.info("waiting for select to complete")
# until the bug is fixed, the blocked getpage requests will fail with the error below
expect_fail = f"requested at {submitted_lsn} gc cutoff {current_standby_horizon}"
log.info(f"until the bug is fixed, we expect task1 to fail with a postgres IO error because of failed getpage, witht he following messages: {expect_fail}")
getpage_requests_task.result()
log.info("the delayed requests completed without errors, wohoo, the bug is fixed")
def run_pgbench(connstr: str, pg_bin: PgBin):