mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
Improves `wait_until` by: * Use `timeout` instead of `iterations`. This allows changing the timeout/interval parameters independently. * Make `timeout` and `interval` optional (default 20s and 0.5s). Most callers don't care. * Only output status every 1s by default, and add optional `status_interval` parameter. * Remove `show_intermediate_error`, this was always emitted anyway. Most callers have been updated to use the defaults, except where they had good reason otherwise.
152 lines
6.5 KiB
Python
152 lines
6.5 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
from typing import TYPE_CHECKING
|
|
|
|
from fixtures.common_types import Lsn, TenantId
|
|
from fixtures.log_helper import log
|
|
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
|
|
from fixtures.utils import wait_until
|
|
|
|
if TYPE_CHECKING:
|
|
from typing import Any
|
|
|
|
|
|
# Checks that pageserver's walreceiver state is printed in the logs during WAL wait timeout.
|
|
# Ensures that walreceiver does not run without any data inserted and only starts after the insertion.
|
|
def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
|
|
# Trigger WAL wait timeout faster
|
|
neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'"
|
|
env = neon_env_builder.init_start()
|
|
env.pageserver.http_client()
|
|
|
|
# In this test we force 'Timed out while waiting for WAL record error' while
|
|
# fetching basebackup and don't want any retries.
|
|
os.environ["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = "1"
|
|
|
|
tenant_id, timeline_id = env.create_tenant()
|
|
expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive"
|
|
env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*")
|
|
|
|
try:
|
|
trigger_wait_lsn_timeout(env, tenant_id)
|
|
except Exception as e:
|
|
exception_string = str(e)
|
|
assert expected_timeout_error in exception_string, "Should time out during waiting for WAL"
|
|
assert (
|
|
"WalReceiver status: Not active" in exception_string
|
|
), "Walreceiver should not be active before any data writes"
|
|
|
|
insert_test_elements(env, tenant_id, start=0, count=1_000)
|
|
try:
|
|
trigger_wait_lsn_timeout(env, tenant_id)
|
|
except Exception as e:
|
|
exception_string = str(e)
|
|
assert expected_timeout_error in exception_string, "Should time out during waiting for WAL"
|
|
assert (
|
|
"WalReceiver status: Not active" not in exception_string
|
|
), "Should not be inactive anymore after INSERTs are made"
|
|
assert "WalReceiver status" in exception_string, "But still should have some other status"
|
|
|
|
|
|
# Checks that all active safekeepers are shown in pageserver's walreceiver state printed on WAL wait timeout.
|
|
# Kills one of the safekeepers and ensures that only the active ones are printed in the state.
|
|
def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder):
|
|
# Trigger WAL wait timeout faster
|
|
def customize_pageserver_toml(ps_cfg: dict[str, Any]):
|
|
ps_cfg["wait_lsn_timeout"] = "2s"
|
|
tenant_config = ps_cfg.setdefault("tenant_config", {})
|
|
tenant_config["walreceiver_connect_timeout"] = "2s"
|
|
tenant_config["lagging_wal_timeout"] = "2s"
|
|
|
|
# In this test we force 'Timed out while waiting for WAL record error' while
|
|
# fetching basebackup and don't want any retries.
|
|
os.environ["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = "1"
|
|
neon_env_builder.pageserver_config_override = customize_pageserver_toml
|
|
|
|
# Have notable SK ids to ensure we check logs for their presence, not some other random numbers
|
|
neon_env_builder.safekeepers_id_start = 12345
|
|
neon_env_builder.num_safekeepers = 3
|
|
env = neon_env_builder.init_start()
|
|
env.pageserver.http_client()
|
|
|
|
tenant_id, timeline_id = env.create_tenant()
|
|
|
|
expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive"
|
|
env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*")
|
|
# we configure wait_lsn_timeout to a shorter value than the lagging_wal_timeout / walreceiver_connect_timeout
|
|
# => after we run into a timeout and reconnect to a different SK, more time than wait_lsn_timeout has passed
|
|
# ==> we log this error
|
|
env.pageserver.allowed_errors.append(
|
|
".*ingesting record with timestamp lagging more than wait_lsn_timeout.*"
|
|
)
|
|
|
|
insert_test_elements(env, tenant_id, start=0, count=1)
|
|
|
|
def all_sks_in_wareceiver_state():
|
|
try:
|
|
trigger_wait_lsn_timeout(env, tenant_id)
|
|
except Exception as e:
|
|
exception_string = str(e)
|
|
assert (
|
|
expected_timeout_error in exception_string
|
|
), "Should time out during waiting for WAL"
|
|
|
|
for safekeeper in env.safekeepers:
|
|
assert (
|
|
str(safekeeper.id) in exception_string
|
|
), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after WAL wait timeout"
|
|
|
|
wait_until(all_sks_in_wareceiver_state, timeout=30)
|
|
|
|
stopped_safekeeper = env.safekeepers[-1]
|
|
stopped_safekeeper_id = stopped_safekeeper.id
|
|
log.info(f"Stopping safekeeper {stopped_safekeeper.id}")
|
|
stopped_safekeeper.stop()
|
|
|
|
def all_but_stopped_sks_in_wareceiver_state():
|
|
try:
|
|
trigger_wait_lsn_timeout(env, tenant_id)
|
|
except Exception as e:
|
|
# Strip out the part before stdout, as it contains full command with the list of all safekeepers
|
|
exception_string = str(e).split("stdout", 1)[-1]
|
|
assert (
|
|
expected_timeout_error in exception_string
|
|
), "Should time out during waiting for WAL"
|
|
|
|
for safekeeper in env.safekeepers:
|
|
if safekeeper.id == stopped_safekeeper_id:
|
|
assert (
|
|
str(safekeeper.id) not in exception_string
|
|
), f"Should not have stopped safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout"
|
|
else:
|
|
assert (
|
|
str(safekeeper.id) in exception_string
|
|
), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout"
|
|
|
|
wait_until(all_but_stopped_sks_in_wareceiver_state, timeout=30)
|
|
|
|
|
|
def insert_test_elements(env: NeonEnv, tenant_id: TenantId, start: int, count: int):
|
|
first_element_id = start
|
|
last_element_id = first_element_id + count
|
|
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
|
with endpoint.cursor() as cur:
|
|
cur.execute("CREATE TABLE IF NOT EXISTS t(key serial primary key, value text)")
|
|
cur.execute(
|
|
f"INSERT INTO t SELECT i, CONCAT('payload_', i) FROM generate_series({first_element_id},{last_element_id}) as i"
|
|
)
|
|
|
|
|
|
future_lsn = Lsn("0/FFFFFFFF")
|
|
|
|
|
|
def trigger_wait_lsn_timeout(env: NeonEnv, tenant_id: TenantId):
|
|
with env.endpoints.create_start(
|
|
"main",
|
|
tenant_id=tenant_id,
|
|
lsn=future_lsn,
|
|
) as endpoint:
|
|
with endpoint.cursor() as cur:
|
|
cur.execute("SELECT 1")
|