tests: wait walreceiver on sks to be gone on 'immediate' ep restart. (#9099)

When endpoint is stopped in immediate mode and started again there is a
chance of old connection delivering some WAL to safekeepers after second
start checked need for sync-safekeepers and thus grabbed basebackup LSN.
It makes basebackup unusable, so compute panics. Avoid flakiness by
waiting for walreceivers on safekeepers to be gone in such cases. A
better way would be to bump term on safekeepers if sync-safekeepers is
skipped, but it needs more infrastructure.

ref https://github.com/neondatabase/neon/issues/9079
This commit is contained in:
Arseny Sher
2024-10-01 20:54:00 +03:00
committed by GitHub
parent 6efdb1d0f3
commit 17672c88ff
7 changed files with 52 additions and 22 deletions

View File

@@ -20,7 +20,7 @@ from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from fcntl import LOCK_EX, LOCK_UN, flock
from functools import cached_property, partial
from functools import cached_property
from itertools import chain, product
from pathlib import Path
from types import TracebackType
@@ -86,7 +86,7 @@ from fixtures.remote_storage import (
remote_storage_to_toml_dict,
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
allure_add_grafana_links,
@@ -4100,12 +4100,26 @@ class Endpoint(PgProtocol, LogUtils):
with open(remote_extensions_spec_path, "w") as file:
json.dump(spec, file, indent=4)
def stop(self, mode: str = "fast") -> "Endpoint":
def stop(
self,
mode: str = "fast",
sks_wait_walreceiver_gone: Optional[tuple[List[Safekeeper], TimelineId]] = None,
) -> "Endpoint":
"""
Stop the Postgres instance if it's running.
Because test teardown might try and stop an endpoint concurrently with test code
stopping the endpoint, this method is thread safe
Because test teardown might try and stop an endpoint concurrently with
test code stopping the endpoint, this method is thread safe
If sks_wait_walreceiever_gone is not None, wait for the safekeepers in
this list to have no walreceivers, i.e. compute endpoint connection be
gone. When endpoint is stopped in immediate mode and started again this
avoids race of old connection delivering some data after
sync-safekeepers check, which makes basebackup unusable. TimelineId is
needed because endpoint doesn't know it.
A better solution would be bump term when sync-safekeepers is skipped on
start, see #9079.
Returns self.
"""
@@ -4117,6 +4131,11 @@ class Endpoint(PgProtocol, LogUtils):
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
)
if sks_wait_walreceiver_gone is not None:
for sk in sks_wait_walreceiver_gone[0]:
cli = sk.http_client()
wait_walreceivers_absent(cli, self.tenant_id, sks_wait_walreceiver_gone[1])
return self
def stop_and_destroy(self, mode: str = "immediate") -> "Endpoint":
@@ -5209,7 +5228,7 @@ def flush_ep_to_pageserver(
for sk in env.safekeepers:
cli = sk.http_client()
# wait until compute connections are gone
wait_until(30, 0.5, partial(are_walreceivers_absent, cli, tenant, timeline))
wait_walreceivers_absent(cli, tenant, timeline)
commit_lsn = max(cli.get_commit_lsn(tenant, timeline), commit_lsn)
# Note: depending on WAL filtering implementation, probably most shards

View File

@@ -1,11 +1,20 @@
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.utils import wait_until
def are_walreceivers_absent(
def wait_walreceivers_absent(
sk_http_cli: SafekeeperHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
status = sk_http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}")
return len(status.walreceivers) == 0
"""
Wait until there is no walreceiver connections from the compute(s) on the
safekeeper.
"""
def walreceivers_absent():
status = sk_http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}")
assert len(status.walreceivers) == 0
wait_until(30, 0.5, walreceivers_absent)

View File

@@ -435,7 +435,9 @@ $$;
# Wait until pageserver has received all the data, and restart the endpoint
wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id)
endpoint.stop(mode="immediate") # 'immediate' to avoid writing shutdown checkpoint
endpoint.stop(
mode="immediate", sks_wait_walreceiver_gone=(env.safekeepers, timeline_id)
) # 'immediate' to avoid writing shutdown checkpoint
endpoint.start()
# Check that the next-multixid value wrapped around correctly

View File

@@ -103,6 +103,7 @@ def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv):
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
timeline_id = env.initial_timeline
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
@@ -114,7 +115,7 @@ def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv):
# chance to write abort records for them.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(100000)")
primary.stop(mode="immediate")
primary.stop(mode="immediate", sks_wait_walreceiver_gone=(env.safekeepers, timeline_id))
# Restart the primary. Do some light work, and shut it down cleanly
primary.start()
@@ -659,6 +660,7 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
# Initialize the primary and a test table
env = neon_simple_env
timeline_id = env.initial_timeline
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
with primary.cursor() as primary_cur:
primary_cur.execute("create table t(pk serial primary key, payload integer)")
@@ -667,7 +669,7 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
with primary.cursor() as primary_cur:
primary_cur.execute("insert into t (payload) values (0)")
# restart primary
primary.stop("immediate")
primary.stop("immediate", sks_wait_walreceiver_gone=(env.safekeepers, timeline_id))
primary.start()
# Wait for the WAL to be flushed

View File

@@ -13,7 +13,7 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
pub = env.endpoints.create("publisher")
pub.start()
env.neon_cli.create_branch("subscriber")
sub_timeline_id = env.neon_cli.create_branch("subscriber")
sub = env.endpoints.create("subscriber")
sub.start()
@@ -47,7 +47,7 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
for _ in range(n_restarts):
# restart subscriber
# time.sleep(2)
sub.stop("immediate")
sub.stop("immediate", sks_wait_walreceiver_gone=(env.safekeepers, sub_timeline_id))
sub.start()
thread.join()

View File

@@ -247,7 +247,7 @@ def test_vm_bit_clear_on_heap_lock_blackbox(neon_env_builder: NeonEnvBuilder):
# in a "clean" way. Our neon extension will write a full-page image of the VM
# page, and we want to avoid that. A clean shutdown will also not do, for the
# same reason.
endpoint.stop(mode="immediate")
endpoint.stop(mode="immediate", sks_wait_walreceiver_gone=(env.safekeepers, timeline_id))
endpoint.start()
pg_conn = endpoint.connect()

View File

@@ -47,7 +47,7 @@ from fixtures.remote_storage import (
s3_storage,
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
PropagatingThread,
get_dir_size,
@@ -1061,6 +1061,7 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder):
# https://github.com/neondatabase/neon/issues/8911
def test_restart_endpoint_after_switch_wal(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
@@ -1070,7 +1071,7 @@ def test_restart_endpoint_after_switch_wal(neon_env_builder: NeonEnvBuilder):
# we want immediate shutdown to have endpoint restart on xlog switch record,
# so prevent shutdown checkpoint.
endpoint.stop(mode="immediate")
endpoint.stop(mode="immediate", sks_wait_walreceiver_gone=(env.safekeepers, timeline_id))
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("SELECT 'works'")
@@ -1222,10 +1223,7 @@ def wait_flush_lsn_align_by_ep(env, branch, tenant_id, timeline_id, ep, sks):
# Even if there is no compute, there might be some in flight data; ensure
# all walreceivers die before rechecking.
for sk_http_cli in sk_http_clis:
wait(
partial(are_walreceivers_absent, sk_http_cli, tenant_id, timeline_id),
"walreceivers to be gone",
)
wait_walreceivers_absent(sk_http_cli, tenant_id, timeline_id)
# Now recheck again flush_lsn and exit if it is good
if is_flush_lsn_aligned(sk_http_clis, tenant_id, timeline_id):
return