diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 677873906b..2a9c545404 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -5655,7 +5655,6 @@ def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint, primary_cursor return time.sleep(1) - def log_replica_lag(primary: Endpoint, secondary: Endpoint): last_replay_lsn = Lsn( secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False) diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 66e8226e11..7f1f6867e5 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio import concurrent.futures +import contextlib import os import threading import time @@ -139,8 +140,6 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool): # this test is largely about PS GC behavior, we control it manually "gc_period": "0s", "compaction_period": "0s", - # short gc_horizon to force the issue - "gc_horizon": 1, } env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf) timeline_id = env.initial_timeline @@ -166,7 +165,15 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool): wait_replica_caughtup(primary, secondary) s_cur = secondary.connect().cursor() - s_cur2 = secondary.connect().cursor() + + s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()") + res = s_cur.fetchone() + assert res is not None + + s_cur.execute("SHOW hot_standby_feedback") + res = s_cur.fetchone() + assert res is not None + assert res[0] == "off" s_cur.execute("SELECT COUNT(*) FROM test") res = s_cur.fetchone() @@ -178,128 +185,257 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool): # so we still remember the LSNs of the pages. 
secondary.clear_buffers(cursor=s_cur) - # fault in evicted pages now, so that this cursor/backend doesn't need to do getpages - # all of this pain is needed solely because we don't have failpoints in compute to selectively - # pause just the secondary getpage requests - s_cur2.execute("SELECT pg_backend_pid()") - log.info(f"s_cur2 is {s_cur2.fetchone()}") - s_cur2.execute("SELECT pg_last_wal_replay_lsn()") - if pause_apply: - s_cur2.execute("SELECT pg_wal_replay_pause()") + s_cur.execute("SELECT pg_wal_replay_pause()") # Do other stuff on the primary, to advance the WAL p_cur.execute("CREATE TABLE test2 AS SELECT generate_series(1, 1000000) AS g") + # Run GC. The PITR interval is very small, so this advances the GC cutoff LSN + # very close to the primary's current insert LSN. shards = tenant_get_shards(env, tenant_id, None) - - - for _, pageserver in shards: + for tenant_shard_id, pageserver in shards: client = pageserver.http_client() - client.configure_failpoints( - ( - "pagestream_read_message:before_gc_cutoff_check", - f"pause", - ) - ) + client.timeline_checkpoint(tenant_shard_id, timeline_id) + client.timeline_compact(tenant_shard_id, timeline_id) + client.timeline_gc(tenant_shard_id, timeline_id, 0) - with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + # Re-execute the query. The GetPage requests that this + # generates use old not_modified_since LSNs, older than + # the GC cutoff, but new request LSNs. (In protocol + # version 1 there was only one LSN, and this failed.) 
+ log_replica_lag(primary, secondary) + s_cur.execute("SELECT COUNT(*) FROM test") + log_replica_lag(primary, secondary) + res = s_cur.fetchone() + assert res == (10000,) - # - # kick off a bunch of getpage requests at the current apply_lsn - # + if pause_apply: + s_cur.execute("SELECT pg_wal_replay_resume()") - def make_replica_send_getpage(): - s_cur.execute("SELECT COUNT(*) FROM test") - res = s_cur.fetchone() - assert res == (10000,) + wait_replica_caughtup(primary, secondary) - task1 = executor.submit(make_replica_send_getpage) - - # - # wait until the requests have hit the failpoint, which is - # very early, before capturing gc cutoff rcu read; so, gc - # cutoff isn't held back by these requests; think of it - # as a network delay - # - time.sleep(5) - - # - # advance apply_lsn by resuming replay - # - def replay_lsn(): - s_cur2.execute("SELECT pg_last_wal_replay_lsn()") - [replay_lsn] = s_cur2.fetchone() - return Lsn(replay_lsn) - submitted_lsn = replay_lsn() - log.info(f"submitted getpages with request_lsn={submitted_lsn}") - log.info("resuming wal replay") - s_cur2.execute("SELECT pg_wal_replay_resume()") - log.info("waiting for secondary to catch up") + # Wait for PS's view of standby horizon to catch up. + # (When we switch to leases (LKB-88) we need to change this to watch the lease lsn move.) + # (TODO: instead of checking impl details here, somehow assert that gc can delete layers now. + # Tricky to do that without flakiness though.) + # We already waited for replica to catch up, so, this timeout is strictly on + # a few in-memory only RPCs to propagate standby_horizon. 
+ timeout_secs = 10 + started_at = time.time() + shards = tenant_get_shards(env, tenant_id, None) + for tenant_shard_id, pageserver in shards: + client = pageserver.http_client() while True: - current_replay_lsn = replay_lsn() - log.info(f"{current_replay_lsn=} {submitted_lsn}") - if current_replay_lsn > submitted_lsn + 8192: - break - time.sleep(1) - log.info("pausing wal replay") - s_cur2.execute("SELECT pg_wal_replay_pause()") - - # - # secondary now is at a higher apply_lsn - # wait for it to propagate into standby_horizon - # - for tenant_shard_id, pageserver in shards: - client = pageserver.http_client() - # wait for standby horizon to catch up - while True: - metrics = client.get_metrics() - sample = metrics.query_one("pageserver_standby_horizon", {"tenant_id": str(tenant_shard_id.tenant_id), "shard_id": str(tenant_shard_id.shard_index), "timeline_id": str(timeline_id)}) - current_standby_horizon = Lsn(int(sample.value)) - log.info(f"{current_standby_horizon}") - current_replay_lsn = replay_lsn() - log.info(f"{current_standby_horizon=} {current_replay_lsn=}") - if current_standby_horizon == current_replay_lsn: - break - time.sleep(1) - - # - # now trigger gc; it will cutoff at standby_horizon, i.e., - # at the advanced apply_lsn, above the delayed requests' request_lsn - # - - log.info("do gc") - for tenant_shard_id, pageserver in shards: - client.timeline_checkpoint(tenant_shard_id, timeline_id) - client.timeline_compact(tenant_shard_id, timeline_id) - gc_status = client.timeline_gc(tenant_shard_id, timeline_id, 0) - log.info(f"{gc_status=}") - client.timeline_compact(tenant_shard_id, timeline_id, enhanced_gc_bottom_most_compaction=True) - gc_status = client.timeline_gc(tenant_shard_id, timeline_id, 0) - log.info(f"{gc_status=}") - detail = client.timeline_detail(tenant_shard_id, timeline_id) - log.info(f"{detail=}") - assert Lsn(detail["applied_gc_cutoff_lsn"]) == current_standby_horizon - - # - # unblock the requests that were delayed - # until we fix the 
bug, they will fail because their request_lsn is below standby horizon - # - log.info("unblock requests") - for _, pageserver in shards: - client = pageserver.http_client() - client.configure_failpoints( - ( - "pagestream_read_message:before_gc_cutoff_check", - f"off", + secondary_apply_lsn = Lsn( + secondary.safe_psql_scalar( + "SELECT pg_last_wal_replay_lsn()", log_query=False ) ) - log.info("waiting for select to complete") - # until the bug is fixed, the blocked getpage requests will fail with the error below - expect_fail = f"requested at {submitted_lsn} gc cutoff {current_standby_horizon}" - log.info(f"until the bug is fixed, we expect task1 to fail with a postgres IO error because of failed getpage, witht he following messages: {expect_fail}") - task1.result() - log.info("the delayed requests completed without errors, wohoo, the bug is fixed") + standby_horizon_metric = client.get_metrics().query_one( + "pageserver_standby_horizon", + { + "tenant_id": str(tenant_shard_id.tenant_id), + "shard_id": str(tenant_shard_id.shard_index), + "timeline_id": str(timeline_id), + }, + ) + standby_horizon_at_ps = Lsn(int(standby_horizon_metric.value)) + log.info(f"{tenant_shard_id.shard_index=}: {standby_horizon_at_ps=} {secondary_apply_lsn=}") + if secondary_apply_lsn == standby_horizon_at_ps: + break + if time.time() - started_at > timeout_secs: + pytest.fail(f"standby_horizon didn't propagate within {timeout_secs=}, this is holding up gc on secondary") + time.sleep(1) + + +def test_hot_standby_gc_with_inflight_requests(neon_env_builder: NeonEnvBuilder): + tenant_conf = { + # set PITR interval to be small, so we can do GC + "pitr_interval": "0 s", + # this test is largely about PS GC behavior, we control it manually + "gc_period": "0s", + "compaction_period": "0s", + } + env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf) + timeline_id = env.initial_timeline + tenant_id = env.initial_tenant + + with contextlib.ExitStack() as stack: + executor = 
stack.enter_context(concurrent.futures.ThreadPoolExecutor(max_workers=2)) + primary = stack.enter_context(env.endpoints.create_start( branch_name="main", endpoint_id="primary")) + secondary = stack.enter_context(env.endpoints.new_replica_start( + origin=primary, + endpoint_id="secondary", + # Protocol version 2 was introduced to fix the issue + # that this test exercises. With protocol version 1 it + # fails. + config_lines=["neon.protocol_version=2"], + )) + + + p_cur = primary.connect().cursor() + s_cur = secondary.connect().cursor() + s_cur2 = secondary.connect().cursor() + + p_cur.execute("CREATE EXTENSION neon_test_utils") + p_cur.execute("CREATE TABLE test (id int primary key) WITH (autovacuum_enabled=false)") + + # helper + def replay_lsn(): + s_cur2.execute("SELECT pg_last_wal_replay_lsn()") + [replay_lsn] = s_cur2.fetchone() + return Lsn(replay_lsn) + + # + # create initial set of data the standby can query + # + p_cur.execute("INSERT INTO test SELECT generate_series(1, 10000) AS g") + + # + # wait until standby is caught up + # + wait_replica_caughtup(primary, secondary) + s_cur.execute("SELECT COUNT(*) FROM test") + res = s_cur.fetchone() + assert res == (10000,) + + # + # clear standby cache so that the select we + # do in make_replica_send_getpage() below actually + # sends getpage requests. + # + secondary.clear_buffers(cursor=s_cur) + + # + # Even a simple SELECT pg_last_wal_replay_lsn() needs to read some pages. + # Fault them in now so that s_cur2 will be unaffected by + # the pausing of getpage request handling below. + # (This was quite tricky to debug; if something breaks in the future and + # you need to debug this test, try printing backend IDs and correlating them + # with "slow getpage" logs in compute.log) + # (All of this pain would go away if we could scope the failpoint to + # individual backends' getpage requests. + # TODO: add such capability in this PR, it's not much work and the failpoint + # is too high overhead anyway.) 
+ # + s_cur2.execute("SELECT pg_last_wal_replay_lsn()") + + # + # Stop wal replay on secondary. + # + s_cur2.execute("SELECT pg_wal_replay_pause()") + + # + # Advance lsn on primary, we'll use it later to advance lsn on secondary + # when we resume replay on secondary. + # + p_cur.execute("CREATE TABLE test2 AS SELECT generate_series(1, 1000000) AS g") + + # + # Block any more getpage requests really early, before we get the RCU read + # for applied_gc_cutoff_lsn. + # This simulates delayed requests. + # + shards = tenant_get_shards(env, tenant_id, None) + for _, pageserver in shards: + client = pageserver.http_client() + client.configure_failpoints( + ( + "pagestream_read_message:before_gc_cutoff_check", + f"pause", + ) + ) + + # + # on the secondary, kick off a bunch of getpage requests + # at the current apply_lsn; they will hit the failpoint, + # simulating that they're stuck somewhere in the network + # + def make_replica_send_getpage(): + s_cur.execute("SELECT COUNT(*) FROM test") + res = s_cur.fetchone() + assert res == (10000,) + + getpage_requests_task = executor.submit(make_replica_send_getpage) + + # + # wait until the requests have hit the failpoint + # TODO: use something not timing dependent here + # + time.sleep(5) + + # + # advance apply_lsn by resuming replay temporarily + # + submitted_lsn = replay_lsn() + log.info(f"submitted getpages with request_lsn={submitted_lsn}") + log.info("resuming wal replay") + s_cur2.execute("SELECT pg_wal_replay_resume()") + log.info("waiting for secondary to catch up") + while True: + current_replay_lsn = replay_lsn() + log.info(f"{current_replay_lsn=} {submitted_lsn}") + if current_replay_lsn > submitted_lsn + 8192: + break + time.sleep(1) + log.info("pausing wal replay") + s_cur2.execute("SELECT pg_wal_replay_pause()") + + # + # secondary now is at a higher apply_lsn + # wait for it to propagate into standby_horizon + # + for tenant_shard_id, pageserver in shards: + client = pageserver.http_client() + # wait for 
standby horizon to catch up + while True: + metrics = client.get_metrics() + sample = metrics.query_one("pageserver_standby_horizon", {"tenant_id": str(tenant_shard_id.tenant_id), "shard_id": str(tenant_shard_id.shard_index), "timeline_id": str(timeline_id)}) + current_standby_horizon = Lsn(int(sample.value)) + log.info(f"{current_standby_horizon}") + current_replay_lsn = replay_lsn() + log.info(f"{current_standby_horizon=} {current_replay_lsn=}") + if current_standby_horizon == current_replay_lsn: + break + time.sleep(1) + + # + # now trigger gc; it will cutoff at standby_horizon, i.e., + # at the advanced apply_lsn, above the delayed requests' request_lsn + # + log.info("do gc") + for tenant_shard_id, pageserver in shards: + client.timeline_checkpoint(tenant_shard_id, timeline_id) + client.timeline_compact(tenant_shard_id, timeline_id) + gc_status = client.timeline_gc(tenant_shard_id, timeline_id, 0) + log.info(f"{gc_status=}") + client.timeline_compact(tenant_shard_id, timeline_id, enhanced_gc_bottom_most_compaction=True) + gc_status = client.timeline_gc(tenant_shard_id, timeline_id, 0) + log.info(f"{gc_status=}") + detail = client.timeline_detail(tenant_shard_id, timeline_id) + log.info(f"{detail=}") + assert Lsn(detail["applied_gc_cutoff_lsn"]) == current_standby_horizon + + # + # unblock the requests that were delayed + # until we fix the bug, they will fail because their request_lsn is below standby horizon + # + log.info("unblock requests") + for _, pageserver in shards: + client = pageserver.http_client() + client.configure_failpoints( + ( + "pagestream_read_message:before_gc_cutoff_check", + f"off", + ) + ) + log.info("waiting for select to complete") + # until the bug is fixed, the blocked getpage requests will fail with the error below + expect_fail = f"requested at {submitted_lsn} gc cutoff {current_standby_horizon}" + log.info(f"until the bug is fixed, we expect task1 to fail with a postgres IO error because of failed getpage, witht he following 
messages: {expect_fail}") + getpage_requests_task.result() + log.info("the delayed requests completed without errors, wohoo, the bug is fixed") def run_pgbench(connstr: str, pg_bin: PgBin):