From 17672c88ff1074722df36d543f34696c9c291732 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 1 Oct 2024 20:54:00 +0300 Subject: [PATCH] tests: wait walreceiver on sks to be gone on 'immediate' ep restart. (#9099) When endpoint is stopped in immediate mode and started again there is a chance of old connection delivering some WAL to safekeepers after second start checked need for sync-safekeepers and thus grabbed basebackup LSN. It makes basebackup unusable, so compute panics. Avoid flakiness by waiting for walreceivers on safekeepers to be gone in such cases. A better way would be to bump term on safekeepers if sync-safekeepers is skipped, but it needs more infrastructure. ref https://github.com/neondatabase/neon/issues/9079 --- test_runner/fixtures/neon_fixtures.py | 31 +++++++++++++++---- test_runner/fixtures/safekeeper/utils.py | 17 +++++++--- test_runner/regress/test_next_xid.py | 4 ++- test_runner/regress/test_replica_start.py | 6 ++-- .../regress/test_subscriber_restart.py | 4 +-- test_runner/regress/test_vm_bits.py | 2 +- test_runner/regress/test_wal_acceptor.py | 10 +++--- 7 files changed, 52 insertions(+), 22 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index f5019e39dc..6a53a34bc9 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -20,7 +20,7 @@ from dataclasses import dataclass from datetime import datetime from enum import Enum from fcntl import LOCK_EX, LOCK_UN, flock -from functools import cached_property, partial +from functools import cached_property from itertools import chain, product from pathlib import Path from types import TracebackType @@ -86,7 +86,7 @@ from fixtures.remote_storage import ( remote_storage_to_toml_dict, ) from fixtures.safekeeper.http import SafekeeperHttpClient -from fixtures.safekeeper.utils import are_walreceivers_absent +from fixtures.safekeeper.utils import wait_walreceivers_absent from fixtures.utils import ( ATTACHMENT_NAME_REGEX, allure_add_grafana_links, @@ -4100,12 +4100,26 @@ class Endpoint(PgProtocol, LogUtils): with open(remote_extensions_spec_path, "w") as file: json.dump(spec, file, indent=4) - def stop(self, mode: str = "fast") -> "Endpoint": + def stop( + self, + mode: str = "fast", + sks_wait_walreceiver_gone: Optional[tuple[List[Safekeeper], TimelineId]] = None, + ) -> "Endpoint": """ Stop the Postgres instance if it's running. - Because test teardown might try and stop an endpoint concurrently with test code - stopping the endpoint, this method is thread safe + Because test teardown might try and stop an endpoint concurrently with + test code stopping the endpoint, this method is thread safe + + If sks_wait_walreceiever_gone is not None, wait for the safekeepers in + this list to have no walreceivers, i.e. compute endpoint connection be + gone. When endpoint is stopped in immediate mode and started again this + avoids race of old connection delivering some data after + sync-safekeepers check, which makes basebackup unusable. TimelineId is + needed because endpoint doesn't know it. + + A better solution would be bump term when sync-safekeepers is skipped on + start, see #9079. Returns self. """ @@ -4117,6 +4131,11 @@ class Endpoint(PgProtocol, LogUtils): self.endpoint_id, check_return_code=self.check_stop_result, mode=mode ) + if sks_wait_walreceiver_gone is not None: + for sk in sks_wait_walreceiver_gone[0]: + cli = sk.http_client() + wait_walreceivers_absent(cli, self.tenant_id, sks_wait_walreceiver_gone[1]) + return self def stop_and_destroy(self, mode: str = "immediate") -> "Endpoint": @@ -5209,7 +5228,7 @@ def flush_ep_to_pageserver( for sk in env.safekeepers: cli = sk.http_client() # wait until compute connections are gone - wait_until(30, 0.5, partial(are_walreceivers_absent, cli, tenant, timeline)) + wait_walreceivers_absent(cli, tenant, timeline) commit_lsn = max(cli.get_commit_lsn(tenant, timeline), commit_lsn) # Note: depending on WAL filtering implementation, probably most shards diff --git a/test_runner/fixtures/safekeeper/utils.py b/test_runner/fixtures/safekeeper/utils.py index 0e4b5d7883..2a081c6ccb 100644 --- a/test_runner/fixtures/safekeeper/utils.py +++ b/test_runner/fixtures/safekeeper/utils.py @@ -1,11 +1,20 @@ from fixtures.common_types import TenantId, TimelineId from fixtures.log_helper import log from fixtures.safekeeper.http import SafekeeperHttpClient +from fixtures.utils import wait_until -def are_walreceivers_absent( +def wait_walreceivers_absent( sk_http_cli: SafekeeperHttpClient, tenant_id: TenantId, timeline_id: TimelineId ): - status = sk_http_cli.timeline_status(tenant_id, timeline_id) - log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}") - return len(status.walreceivers) == 0 + """ + Wait until there is no walreceiver connections from the compute(s) on the + safekeeper. + """ + + def walreceivers_absent(): + status = sk_http_cli.timeline_status(tenant_id, timeline_id) + log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}") + assert len(status.walreceivers) == 0 + + wait_until(30, 0.5, walreceivers_absent) diff --git a/test_runner/regress/test_next_xid.py b/test_runner/regress/test_next_xid.py index 51e847135e..cac74492d7 100644 --- a/test_runner/regress/test_next_xid.py +++ b/test_runner/regress/test_next_xid.py @@ -435,7 +435,9 @@ $$; # Wait until pageserver has received all the data, and restart the endpoint wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id) - endpoint.stop(mode="immediate") # 'immediate' to avoid writing shutdown checkpoint + endpoint.stop( + mode="immediate", sks_wait_walreceiver_gone=(env.safekeepers, timeline_id) + ) # 'immediate' to avoid writing shutdown checkpoint endpoint.start() # Check that the next-multixid value wrapped around correctly diff --git a/test_runner/regress/test_replica_start.py b/test_runner/regress/test_replica_start.py index 0d95109d6b..d5e92b92d1 100644 --- a/test_runner/regress/test_replica_start.py +++ b/test_runner/regress/test_replica_start.py @@ -103,6 +103,7 @@ def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv): # Initialize the primary, a test table, and a helper function to create lots # of subtransactions. env = neon_simple_env + timeline_id = env.initial_timeline primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") primary_conn = primary.connect() primary_cur = primary_conn.cursor() @@ -114,7 +115,7 @@ def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv): # chance to write abort records for them. primary_cur.execute("begin") primary_cur.execute("select create_subxacts(100000)") - primary.stop(mode="immediate") + primary.stop(mode="immediate", sks_wait_walreceiver_gone=(env.safekeepers, timeline_id)) # Restart the primary. Do some light work, and shut it down cleanly primary.start() @@ -659,6 +660,7 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv): # Initialize the primary and a test table env = neon_simple_env + timeline_id = env.initial_timeline primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") with primary.cursor() as primary_cur: primary_cur.execute("create table t(pk serial primary key, payload integer)") @@ -667,7 +669,7 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv): with primary.cursor() as primary_cur: primary_cur.execute("insert into t (payload) values (0)") # restart primary - primary.stop("immediate") + primary.stop("immediate", sks_wait_walreceiver_gone=(env.safekeepers, timeline_id)) primary.start() # Wait for the WAL to be flushed diff --git a/test_runner/regress/test_subscriber_restart.py b/test_runner/regress/test_subscriber_restart.py index 91caad7220..647a2e6b14 100644 --- a/test_runner/regress/test_subscriber_restart.py +++ b/test_runner/regress/test_subscriber_restart.py @@ -13,7 +13,7 @@ def test_subscriber_restart(neon_simple_env: NeonEnv): pub = env.endpoints.create("publisher") pub.start() - env.neon_cli.create_branch("subscriber") + sub_timeline_id = env.neon_cli.create_branch("subscriber") sub = env.endpoints.create("subscriber") sub.start() @@ -47,7 +47,7 @@ def test_subscriber_restart(neon_simple_env: NeonEnv): for _ in range(n_restarts): # restart subscriber # time.sleep(2) - sub.stop("immediate") + sub.stop("immediate", sks_wait_walreceiver_gone=(env.safekeepers, sub_timeline_id)) sub.start() thread.join() diff --git a/test_runner/regress/test_vm_bits.py b/test_runner/regress/test_vm_bits.py index 3075211ada..ae1b6fdab3 100644 --- a/test_runner/regress/test_vm_bits.py +++ b/test_runner/regress/test_vm_bits.py @@ -247,7 +247,7 @@ def test_vm_bit_clear_on_heap_lock_blackbox(neon_env_builder: NeonEnvBuilder): # in a "clean" way. Our neon extension will write a full-page image of the VM # page, and we want to avoid that. A clean shutdown will also not do, for the # same reason. - endpoint.stop(mode="immediate") + endpoint.stop(mode="immediate", sks_wait_walreceiver_gone=(env.safekeepers, timeline_id)) endpoint.start() pg_conn = endpoint.connect() diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index c75235a04b..25c66c3cae 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -47,7 +47,7 @@ from fixtures.remote_storage import ( s3_storage, ) from fixtures.safekeeper.http import SafekeeperHttpClient -from fixtures.safekeeper.utils import are_walreceivers_absent +from fixtures.safekeeper.utils import wait_walreceivers_absent from fixtures.utils import ( PropagatingThread, get_dir_size, @@ -1061,6 +1061,7 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder): # https://github.com/neondatabase/neon/issues/8911 def test_restart_endpoint_after_switch_wal(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() + timeline_id = env.initial_timeline endpoint = env.endpoints.create_start("main") @@ -1070,7 +1071,7 @@ def test_restart_endpoint_after_switch_wal(neon_env_builder: NeonEnvBuilder): # we want immediate shutdown to have endpoint restart on xlog switch record, # so prevent shutdown checkpoint. - endpoint.stop(mode="immediate") + endpoint.stop(mode="immediate", sks_wait_walreceiver_gone=(env.safekeepers, timeline_id)) endpoint = env.endpoints.create_start("main") endpoint.safe_psql("SELECT 'works'") @@ -1222,10 +1223,7 @@ def wait_flush_lsn_align_by_ep(env, branch, tenant_id, timeline_id, ep, sks): # Even if there is no compute, there might be some in flight data; ensure # all walreceivers die before rechecking. for sk_http_cli in sk_http_clis: - wait( - partial(are_walreceivers_absent, sk_http_cli, tenant_id, timeline_id), - "walreceivers to be gone", - ) + wait_walreceivers_absent(sk_http_cli, tenant_id, timeline_id) # Now recheck again flush_lsn and exit if it is good if is_flush_lsn_aligned(sk_http_clis, tenant_id, timeline_id): return