diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py index be2aa2b346..294f86ffa7 100644 --- a/test_runner/regress/test_wal_receiver.py +++ b/test_runner/regress/test_wal_receiver.py @@ -1,11 +1,12 @@ from __future__ import annotations -import time +import os from typing import TYPE_CHECKING from fixtures.common_types import Lsn, TenantId from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder +from fixtures.utils import wait_until if TYPE_CHECKING: from typing import Any @@ -19,6 +20,10 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() env.pageserver.http_client() + # In this test we force 'Timed out while waiting for WAL record error' while + # fetching basebackup and don't want any retries. + os.environ["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = "1" + tenant_id, timeline_id = env.create_tenant() expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive" env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*") @@ -49,11 +54,14 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder): def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder): # Trigger WAL wait timeout faster def customize_pageserver_toml(ps_cfg: dict[str, Any]): - ps_cfg["wait_lsn_timeout"] = "1s" + ps_cfg["wait_lsn_timeout"] = "2s" tenant_config = ps_cfg.setdefault("tenant_config", {}) tenant_config["walreceiver_connect_timeout"] = "2s" tenant_config["lagging_wal_timeout"] = "2s" + # In this test we force 'Timed out while waiting for WAL record error' while + # fetching basebackup and don't want any retries. + os.environ["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = "1" neon_env_builder.pageserver_config_override = customize_pageserver_toml # Have notable SK ids to ensure we check logs for their presence, not some other random numbers @@ -64,7 +72,6 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil tenant_id, timeline_id = env.create_tenant() - elements_to_insert = 1_000_000 expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive" env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*") # we configure wait_lsn_timeout to a shorter value than the lagging_wal_timeout / walreceiver_connect_timeout @@ -74,45 +81,50 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil ".*ingesting record with timestamp lagging more than wait_lsn_timeout.*" ) - insert_test_elements(env, tenant_id, start=0, count=elements_to_insert) + insert_test_elements(env, tenant_id, start=0, count=1) - try: - trigger_wait_lsn_timeout(env, tenant_id) - except Exception as e: - exception_string = str(e) - assert expected_timeout_error in exception_string, "Should time out during waiting for WAL" - - for safekeeper in env.safekeepers: + def all_sks_in_wareceiver_state(): + try: + trigger_wait_lsn_timeout(env, tenant_id) + except Exception as e: + exception_string = str(e) assert ( - str(safekeeper.id) in exception_string - ), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after WAL wait timeout" + expected_timeout_error in exception_string + ), "Should time out during waiting for WAL" + + for safekeeper in env.safekeepers: + assert ( + str(safekeeper.id) in exception_string + ), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after WAL wait timeout" + + wait_until(60, 0.5, all_sks_in_wareceiver_state) stopped_safekeeper = env.safekeepers[-1] stopped_safekeeper_id = stopped_safekeeper.id log.info(f"Stopping safekeeper {stopped_safekeeper.id}") stopped_safekeeper.stop() - # sleep until stopped safekeeper is removed from candidates - time.sleep(2) - # Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats. - insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert) + def all_but_stopped_sks_in_wareceiver_state(): + try: + trigger_wait_lsn_timeout(env, tenant_id) + except Exception as e: + # Strip out the part before stdout, as it contains full command with the list of all safekeepers + exception_string = str(e).split("stdout", 1)[-1] + assert ( + expected_timeout_error in exception_string + ), "Should time out during waiting for WAL" - try: - trigger_wait_lsn_timeout(env, tenant_id) - except Exception as e: - # Strip out the part before stdout, as it contains full command with the list of all safekeepers - exception_string = str(e).split("stdout", 1)[-1] - assert expected_timeout_error in exception_string, "Should time out during waiting for WAL" + for safekeeper in env.safekeepers: + if safekeeper.id == stopped_safekeeper_id: + assert ( + str(safekeeper.id) not in exception_string + ), f"Should not have stopped safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout" + else: + assert ( + str(safekeeper.id) in exception_string + ), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout" - for safekeeper in env.safekeepers: - if safekeeper.id == stopped_safekeeper_id: - assert ( - str(safekeeper.id) not in exception_string - ), f"Should not have stopped safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout" - else: - assert ( - str(safekeeper.id) in exception_string - ), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout" + wait_until(60, 0.5, all_but_stopped_sks_in_wareceiver_state) def insert_test_elements(env: NeonEnv, tenant_id: TenantId, start: int, count: int):