From b401cec1573fb5df4d64f8ad238f70893e1a03f5 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Thu, 10 Jul 2025 20:09:53 +0000
Subject: [PATCH] WIP

---
 pageserver/src/page_service.rs          |   2 +
 test_runner/fixtures/neon_fixtures.py   |  24 +++-
 test_runner/regress/test_hot_standby.py | 170 ++++++++++++++++--------
 3 files changed, 132 insertions(+), 64 deletions(-)

diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 70fdb2e789..1082f6daa8 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -1195,6 +1195,8 @@ impl PageServerHandler {
             })
             .await?;
 
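+        // Pausable failpoint used by test_hot_standby_gc: it parks incoming
+        // getpage requests here, before the gc cutoff check, to simulate
+        // requests that were delayed in the network.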
+        failpoint_support::pausable_failpoint!("pagestream_read_message:before_gc_cutoff_check", cancel).unwrap();
+
         // We're holding the Handle
         let effective_lsn = match Self::effective_request_lsn(
             &shard,
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 42924f9b83..677873906b 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -5631,14 +5631,24 @@ def tenant_get_shards(
     ]
 
 
-def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
-    primary_lsn = Lsn(
-        primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
-    )
-    while True:
-        secondary_lsn = Lsn(
-            secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
+def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint, primary_cursor=None, secondary_cursor=None):
+    if primary_cursor is not None:
+        primary_cursor.execute("SELECT pg_current_wal_flush_lsn()")
+        [res] = primary_cursor.fetchone()
+        primary_lsn = Lsn(res)
+    else:
+        primary_lsn = Lsn(
+            primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
         )
+    while True:
+        if secondary_cursor is not None:
+            secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
+            [res] = secondary_cursor.fetchone()
+            secondary_lsn = Lsn(res)
+        else:
+            secondary_lsn = Lsn(
+                secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
+            )
         caught_up = secondary_lsn >= primary_lsn
         log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
         if caught_up:
diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py
index e384c57c2e..66e8226e11 100644
--- a/test_runner/regress/test_hot_standby.py
+++ b/test_runner/regress/test_hot_standby.py
@@ -1,11 +1,13 @@
 from __future__ import annotations
 
 import asyncio
+import concurrent.futures
 import os
 import threading
 import time
 from functools import partial
 
+import psycopg2
 import pytest
 from fixtures.common_types import Lsn
 from fixtures.log_helper import log
@@ -137,6 +139,8 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
         # this test is largely about PS GC behavior, we control it manually
         "gc_period": "0s",
         "compaction_period": "0s",
+        # short gc_horizon to force the issue
+        "gc_horizon": 1,
     }
     env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
     timeline_id = env.initial_timeline
@@ -162,15 +166,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
     wait_replica_caughtup(primary, secondary)
 
     s_cur = secondary.connect().cursor()
-
-    s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
-    res = s_cur.fetchone()
-    assert res is not None
-
-    s_cur.execute("SHOW hot_standby_feedback")
-    res = s_cur.fetchone()
-    assert res is not None
-    assert res[0] == "off"
+    s_cur2 = secondary.connect().cursor()
 
     s_cur.execute("SELECT COUNT(*) FROM test")
     res = s_cur.fetchone()
@@ -182,68 +178,128 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
     # so we still remember the LSNs of the pages.
     secondary.clear_buffers(cursor=s_cur)
 
+    # Fault in the evicted pages now, so that this cursor/backend doesn't need
+    # to do getpages later. All of this pain is needed solely because we don't
+    # have failpoints in compute to selectively pause just the secondary's
+    # getpage requests.
+    s_cur2.execute("SELECT pg_backend_pid()")
+    log.info(f"s_cur2 is {s_cur2.fetchone()}")
+    s_cur2.execute("SELECT pg_last_wal_replay_lsn()")
+
     if pause_apply:
-        s_cur.execute("SELECT pg_wal_replay_pause()")
+        s_cur2.execute("SELECT pg_wal_replay_pause()")
 
     # Do other stuff on the primary, to advance the WAL
     p_cur.execute("CREATE TABLE test2 AS SELECT generate_series(1, 1000000) AS g")
 
-    # Run GC. The PITR interval is very small, so this advances the GC cutoff LSN
-    # very close to the primary's current insert LSN.
     shards = tenant_get_shards(env, tenant_id, None)
-    for tenant_shard_id, pageserver in shards:
+
+    for _, pageserver in shards:
         client = pageserver.http_client()
-        client.timeline_checkpoint(tenant_shard_id, timeline_id)
-        client.timeline_compact(tenant_shard_id, timeline_id)
-        client.timeline_gc(tenant_shard_id, timeline_id, 0)
+        client.configure_failpoints(
+            (
+                "pagestream_read_message:before_gc_cutoff_check",
+                "pause",
+            )
+        )
 
-    # Re-execute the query. The GetPage requests that this
-    # generates use old not_modified_since LSNs, older than
-    # the GC cutoff, but new request LSNs. (In protocol
-    # version 1 there was only one LSN, and this failed.)
-    log_replica_lag(primary, secondary)
-    s_cur.execute("SELECT COUNT(*) FROM test")
-    log_replica_lag(primary, secondary)
-    res = s_cur.fetchone()
-    assert res == (10000,)
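+    # From here on the failpoint is armed: a worker thread issues getpage
+    # requests that park on the failpoint while this thread advances WAL
+    # replay (and thereby standby_horizon) and then triggers gc.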
+    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
 
-    if pause_apply:
-        s_cur.execute("SELECT pg_wal_replay_resume()")
+        #
+        # Kick off a bunch of getpage requests at the current apply_lsn.
+        #
-    wait_replica_caughtup(primary, secondary)
+        def make_replica_send_getpage():
+            s_cur.execute("SELECT COUNT(*) FROM test")
+            res = s_cur.fetchone()
+            assert res == (10000,)
 
-    # Wait for PS's view of standby horizon to catch up.
-    # (When we switch to leases (LKB-88) we need to change this to watch the lease lsn move.)
-    # (TODO: instead of checking impl details here, somehow assert that gc can delete layers now.
-    #  Tricky to do that without flakiness though.)
-    # We already waited for replica to catch up, so, this timeout is strictly on
-    # a few few in-memory only RPCs to propagate standby_horizon.
-    timeout_secs = 10
-    started_at = time.time()
-    shards = tenant_get_shards(env, tenant_id, None)
-    for tenant_shard_id, pageserver in shards:
-        client = pageserver.http_client()
+        task1 = executor.submit(make_replica_send_getpage)
+
+        #
+        # Wait until the requests have hit the failpoint, which sits very
+        # early in request handling, before the gc cutoff rcu read is
+        # captured; so, the gc cutoff isn't held back by these requests.
+        # Think of it as a network delay.
+        #
+        time.sleep(5)
+
+        #
+        # Advance apply_lsn by resuming replay.
+        #
+        def replay_lsn():
+            s_cur2.execute("SELECT pg_last_wal_replay_lsn()")
+            [res] = s_cur2.fetchone()
+            return Lsn(res)
+
+        submitted_lsn = replay_lsn()
+        log.info(f"submitted getpages with request_lsn={submitted_lsn}")
+        log.info("resuming wal replay")
+        s_cur2.execute("SELECT pg_wal_replay_resume()")
+        log.info("waiting for secondary to catch up")
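+        # The 8192-byte margin below is arbitrary; replay just needs to move
+        # measurably past the LSN at which the getpages were submitted.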
         while True:
-            secondary_apply_lsn = Lsn(
-                secondary.safe_psql_scalar(
-                    "SELECT pg_last_wal_replay_lsn()", log_query=False
+            current_replay_lsn = replay_lsn()
+            log.info(f"{current_replay_lsn=} {submitted_lsn=}")
+            if current_replay_lsn > submitted_lsn + 8192:
+                break
+            time.sleep(1)
+        log.info("pausing wal replay")
+        s_cur2.execute("SELECT pg_wal_replay_pause()")
+
+        #
+        # The secondary is now at a higher apply_lsn;
+        # wait for it to propagate into standby_horizon.
+        #
+        for tenant_shard_id, pageserver in shards:
+            client = pageserver.http_client()
+            # wait for standby horizon to catch up
+            while True:
+                metrics = client.get_metrics()
+                sample = metrics.query_one(
+                    "pageserver_standby_horizon",
+                    {
+                        "tenant_id": str(tenant_shard_id.tenant_id),
+                        "shard_id": str(tenant_shard_id.shard_index),
+                        "timeline_id": str(timeline_id),
+                    },
+                )
+                current_standby_horizon = Lsn(int(sample.value))
+                current_replay_lsn = replay_lsn()
+                log.info(f"{current_standby_horizon=} {current_replay_lsn=}")
+                if current_standby_horizon == current_replay_lsn:
+                    break
+                time.sleep(1)
+
+        #
+        # Now trigger gc; it will cut off at standby_horizon, i.e., at the
+        # advanced apply_lsn, above the delayed requests' request_lsn.
+        #
+        log.info("do gc")
+        for tenant_shard_id, pageserver in shards:
+            client = pageserver.http_client()
+            client.timeline_checkpoint(tenant_shard_id, timeline_id)
+            client.timeline_compact(tenant_shard_id, timeline_id)
+            gc_status = client.timeline_gc(tenant_shard_id, timeline_id, 0)
+            log.info(f"{gc_status=}")
+            client.timeline_compact(tenant_shard_id, timeline_id, enhanced_gc_bottom_most_compaction=True)
+            gc_status = client.timeline_gc(tenant_shard_id, timeline_id, 0)
+            log.info(f"{gc_status=}")
+            detail = client.timeline_detail(tenant_shard_id, timeline_id)
+            log.info(f"{detail=}")
+            assert Lsn(detail["applied_gc_cutoff_lsn"]) == current_standby_horizon
+
+        #
+        # Unblock the requests that were delayed.
+        # Until we fix the bug, they will fail because their request_lsn is below the standby horizon.
+        #
+        log.info("unblock requests")
+        for _, pageserver in shards:
+            client = pageserver.http_client()
+            client.configure_failpoints(
+                (
+                    "pagestream_read_message:before_gc_cutoff_check",
+                    "off",
                 )
             )
-            standby_horizon_metric = client.get_metrics().query_one(
-                "pageserver_standby_horizon",
-                {
-                    "tenant_id": str(tenant_shard_id.tenant_id),
-                    "shard_id": str(tenant_shard_id.shard_index),
-                    "timeline_id": str(timeline_id),
-                },
-            )
-            standby_horizon_at_ps = Lsn(int(standby_horizon_metric.value))
-            log.info(f"{tenant_shard_id.shard_index=}: {standby_horizon_at_ps=} {secondary_apply_lsn=}")
-            if secondary_apply_lsn == standby_horizon_at_ps:
-                break
-            if time.time() - started_at > timeout_secs:
-                pytest.fail(f"standby_horizon didn't propagate within {timeout_secs=}, this is holding up gc on secondary")
-            time.sleep(1)
+        log.info("waiting for select to complete")
+        # Until the bug is fixed, the blocked getpage requests will fail with the error below.
+        expect_fail = f"requested at {submitted_lsn} gc cutoff {current_standby_horizon}"
+        log.info(f"until the bug is fixed, we expect task1 to fail with a postgres IO error because of a failed getpage, with the following message: {expect_fail}")
+        task1.result()
+        log.info("the delayed requests completed without errors, woohoo, the bug is fixed")
 
 
 def run_pgbench(connstr: str, pg_bin: PgBin):