neon/test_runner/regress/test_wal_acceptor.py
Suhas Thalanki 842a5091d5 [BRC-3051] Walproposer: Safekeeper quorum health metrics (#930) (#12750)
Today we have no indication (other than spammy PG logs that nobody
monitors) when the Walproposer in PG cannot connect to, or get votes
from, all Safekeepers. This means we have no signal that the
Safekeepers are operating at degraded redundancy. We need these
signals.

Added plumbing in the PG extension so that the `neon_perf_counters` view
exports the following gauge metrics on safekeeper health:
- `num_configured_safekeepers`: The total number of safekeepers
configured in PG.
- `num_active_safekeepers`: The number of safekeepers that PG is
actively streaming WAL to.

An alert should be raised whenever `num_active_safekeepers` <
`num_configured_safekeepers`.
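
For illustration, a minimal sketch in Python of what such a check could look
like, assuming the view exposes one row per counter with hypothetical
`metric`/`value` columns (not the actual view schema):

```python
# Minimal redundancy check against the neon_perf_counters view.
# The column names ("metric", "value") and connstr are illustrative
# assumptions; consult the actual view definition.
import psycopg2


def safekeeper_quorum_degraded(connstr: str) -> bool:
    with psycopg2.connect(connstr) as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT metric, value FROM neon_perf_counters "
            "WHERE metric IN ('num_configured_safekeepers', 'num_active_safekeepers')"
        )
        gauges = dict(cur.fetchall())
        return gauges["num_active_safekeepers"] < gauges["num_configured_safekeepers"]
```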

The metrics are implemented by adding state to the Walproposer shared
memory that tracks each safekeeper's active status in a simple array.
A safekeeper's status is set to active (1) after the Walproposer
acquires a quorum and starts streaming WAL to that safekeeper, and set
to inactive (0) when the connection to the safekeeper is shut down. When
collecting the metrics, we scan the status array in Walproposer shared
memory to produce the gauge values.
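
A rough Python model of this bookkeeping (names are hypothetical; the real
state lives in the C extension's shared memory):

```python
# Illustrative model of the shared-memory status array described above.
NUM_SAFEKEEPERS = 3                # num_configured_safekeepers
sk_active = [0] * NUM_SAFEKEEPERS  # 1 = actively streamed to, 0 = disconnected


def on_streaming_started(i: int) -> None:
    # After the walproposer acquires a quorum and starts streaming to sk i.
    sk_active[i] = 1


def on_connection_shutdown(i: int) -> None:
    sk_active[i] = 0


def collect_gauges() -> dict[str, int]:
    # Metric collection scans the status array.
    return {
        "num_configured_safekeepers": NUM_SAFEKEEPERS,
        "num_active_safekeepers": sum(sk_active),
    }
```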

Added coverage for the metrics to the integration test
`test_wal_acceptor.py::test_timeline_disk_usage_limit`.

---------

Co-authored-by: William Huang <william.huang@databricks.com>
2025-07-30 15:14:59 +00:00


from __future__ import annotations
import filecmp
import logging
import os
import random
import shutil
import signal
import subprocess
import sys
import threading
import time
from contextlib import closing
from dataclasses import dataclass, field
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING
import psycopg2
import psycopg2.errors
import psycopg2.extras
import pytest
import requests
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
Endpoint,
NeonEnvBuilder,
PgBin,
PgProtocol,
Safekeeper,
SafekeeperPort,
last_flush_lsn_upload,
)
from fixtures.pageserver.utils import (
assert_prefix_empty,
assert_prefix_not_empty,
timeline_delete_wait_completed,
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import (
RemoteStorageKind,
default_remote_storage,
s3_storage,
)
from fixtures.safekeeper.http import (
MembershipConfiguration,
SafekeeperHttpClient,
SafekeeperId,
TimelineCreateRequest,
)
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.safekeeper_utils import (
is_flush_lsn_caught_up,
is_segment_offloaded,
is_wal_trimmed,
wait_lsn_force_checkpoint,
wait_lsn_force_checkpoint_at,
wait_lsn_force_checkpoint_at_sk,
)
from fixtures.utils import (
PropagatingThread,
query_scalar,
run_only_on_default_postgres,
skip_in_debug_build,
start_in_background,
wait_until,
)
if TYPE_CHECKING:
from typing import Any, Self
from fixtures.port_distributor import PortDistributor
@dataclass
class TimelineMetrics:
timeline_id: TimelineId
last_record_lsn: Lsn
# One entry per safekeeper, in the same order as env.safekeepers
flush_lsns: list[Lsn] = field(default_factory=list)
commit_lsns: list[Lsn] = field(default_factory=list)
# Run page server and multiple acceptors, and multiple compute nodes running
# against different timelines.
def test_many_timelines(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
n_timelines = 3
branch_names = [f"test_safekeepers_many_timelines_{tlin}" for tlin in range(n_timelines)]
# pageserver and safekeeper operate on timelines via their ids (hex strings like 'ad50847381e248feaac9876cc71ae418'),
# which are not really human-readable, so branch names are introduced in the Neon CLI.
# The Neon CLI stores its branch <-> timeline mapping internally,
# but we need the mapping to collect timeline-related metrics from other servers.
branch_names_to_timeline_ids = {}
# start postgres on each timeline
endpoints = []
for branch_name in branch_names:
new_timeline_id = env.create_branch(branch_name)
endpoints.append(env.endpoints.create_start(branch_name))
branch_names_to_timeline_ids[branch_name] = new_timeline_id
tenant_id = env.initial_tenant
def collect_metrics(message: str) -> list[TimelineMetrics]:
with env.pageserver.http_client() as pageserver_http:
timeline_details = [
pageserver_http.timeline_detail(
tenant_id=tenant_id,
timeline_id=branch_names_to_timeline_ids[branch_name],
)
for branch_name in branch_names
]
# All changes visible to pageserver (last_record_lsn) should be
# confirmed by safekeepers first. As we cannot atomically get
# state of both pageserver and safekeepers, we should start with
# pageserver. Looking at outdated data from pageserver is ok.
# Asking safekeepers first is not ok because new commits may arrive
# to both safekeepers and pageserver after we've already obtained
# the safekeepers' state, and it would look contradictory.
sk_metrics = [sk.http_client().get_metrics() for sk in env.safekeepers]
timeline_metrics = []
for timeline_detail in timeline_details:
timeline_id = TimelineId(timeline_detail["timeline_id"])
m = TimelineMetrics(
timeline_id=timeline_id,
last_record_lsn=Lsn(timeline_detail["last_record_lsn"]),
)
for sk_m in sk_metrics:
m.flush_lsns.append(Lsn(int(sk_m.flush_lsn_inexact(tenant_id, timeline_id))))
m.commit_lsns.append(Lsn(int(sk_m.commit_lsn_inexact(tenant_id, timeline_id))))
for flush_lsn, commit_lsn in zip(m.flush_lsns, m.commit_lsns, strict=False):
# Invariant. May be < when transaction is in progress.
assert commit_lsn <= flush_lsn, (
f"timeline_id={timeline_id}, timeline_detail={timeline_detail}, sk_metrics={sk_metrics}"
)
# We only call collect_metrics() after a transaction is confirmed by
# the compute node, which only happens after a consensus of safekeepers
# has confirmed the transaction. We assume majority consensus here.
assert (
2 * sum(m.last_record_lsn <= lsn for lsn in m.flush_lsns)
> neon_env_builder.num_safekeepers
), (
f"timeline_id={timeline_id}, timeline_detail={timeline_detail}, sk_metrics={sk_metrics}"
)
assert (
2 * sum(m.last_record_lsn <= lsn for lsn in m.commit_lsns)
> neon_env_builder.num_safekeepers
), (
f"timeline_id={timeline_id}, timeline_detail={timeline_detail}, sk_metrics={sk_metrics}"
)
timeline_metrics.append(m)
log.info(f"{message}: {timeline_metrics}")
return timeline_metrics
# TODO: https://github.com/neondatabase/neon/issues/809
# collect_metrics("before CREATE TABLE")
# Do everything in different loops to have actions on different timelines
# interleaved.
# create schema
for endpoint in endpoints:
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
init_m = collect_metrics("after CREATE TABLE")
# Populate data for 2/3 timelines
class MetricsChecker(threading.Thread):
def __init__(self) -> None:
super().__init__(daemon=True)
self.should_stop = threading.Event()
self.exception: BaseException | None = None
def run(self) -> None:
try:
while not self.should_stop.is_set():
collect_metrics("during INSERT INTO")
time.sleep(1)
except: # noqa: E722
log.error(
"MetricsChecker's thread failed, the test will be failed on .stop() call",
exc_info=True,
)
# We want to preserve traceback as well as the exception
exc_type, exc_value, exc_tb = sys.exc_info()
assert exc_type
e = exc_type(exc_value)
e.__traceback__ = exc_tb
self.exception = e
def stop(self) -> None:
self.should_stop.set()
self.join()
if self.exception:
raise self.exception
metrics_checker = MetricsChecker()
metrics_checker.start()
for endpoint in endpoints[:-1]:
endpoint.safe_psql("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
metrics_checker.stop()
collect_metrics("after INSERT INTO")
# Check data for 2/3 timelines
for endpoint in endpoints[:-1]:
res = endpoint.safe_psql("SELECT sum(key) FROM t")
assert res[0] == (5000050000,)
final_m = collect_metrics("after SELECT")
# Assume that LSNs (a) behave similarly in all timelines; and (b) INSERT INTO alters LSN significantly.
# Also assume that safekeepers will not be significantly out of sync in this test.
middle_lsn = Lsn((int(init_m[0].last_record_lsn) + int(final_m[0].last_record_lsn)) // 2)
assert max(init_m[0].flush_lsns) < middle_lsn < min(final_m[0].flush_lsns)
assert max(init_m[0].commit_lsns) < middle_lsn < min(final_m[0].commit_lsns)
assert max(init_m[1].flush_lsns) < middle_lsn < min(final_m[1].flush_lsns)
assert max(init_m[1].commit_lsns) < middle_lsn < min(final_m[1].commit_lsns)
assert max(init_m[2].flush_lsns) <= min(final_m[2].flush_lsns) < middle_lsn
assert max(init_m[2].commit_lsns) <= min(final_m[2].commit_lsns) < middle_lsn
# Test timeline_list endpoint.
http_cli = env.safekeepers[0].http_client()
assert len(http_cli.timeline_list()) == 4
# Check that a dead minority doesn't prevent commits: execute insert n_inserts
# times, with fault_probability chance of taking a wal acceptor down or up
# along the way. 2 of 3 are always alive, so the work keeps going.
def test_restarts(neon_env_builder: NeonEnvBuilder):
fault_probability = 0.01
n_inserts = 1000
n_acceptors = 3
neon_env_builder.num_safekeepers = n_acceptors
env = neon_env_builder.init_start()
env.create_branch("test_safekeepers_restarts")
endpoint = env.endpoints.create_start("test_safekeepers_restarts")
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
failed_node = None
cur.execute("CREATE TABLE t(key int primary key, value text)")
for i in range(n_inserts):
cur.execute("INSERT INTO t values (%s, 'payload');", (i + 1,))
if random.random() <= fault_probability:
if failed_node is None:
failed_node = env.safekeepers[random.randrange(0, n_acceptors)]
failed_node.stop()
else:
failed_node.start()
failed_node = None
assert query_scalar(cur, "SELECT sum(key) FROM t") == (n_inserts * (n_inserts + 1)) // 2
# Test that safekeepers push their info to the broker and learn peer status from it
def test_broker(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_broker", ancestor_branch_name="main")
endpoint = env.endpoints.create_start("test_broker")
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
# wait until remote_consistent_lsn gets advanced on all safekeepers
clients = [sk.http_client() for sk in env.safekeepers]
stat_before = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
log.info(f"statuses before insert: {stat_before}")
endpoint.safe_psql("INSERT INTO t SELECT generate_series(1,100), 'payload'")
# wait for remote_consistent_lsn to reach flush_lsn, forcing it with checkpoint
new_rcl = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
log.info(f"new_rcl: {new_rcl}")
endpoint.stop()
# and wait till remote_consistent_lsn propagates to all safekeepers
#
# This timeout is long: safekeepers learn about remote_consistent_lsn updates when a pageserver
# connects, receives a PrimaryKeepAlive, and sends a PageserverFeedback. So the timeout has to encompass:
# - pageserver deletion_queue to validate + publish the remote_consistent_lsn
# - pageserver to reconnect to all safekeepers one by one, with multi-second delays between
#
# TODO: timeline status on safekeeper should take into account peers state as well.
rcl_propagate_secs = 60
started_at = time.time()
while True:
stat_after = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
if all([s_after.remote_consistent_lsn >= new_rcl for s_after in stat_after]):
break
elapsed = time.time() - started_at
if elapsed > rcl_propagate_secs:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for remote_consistent_lsn propagation: status before {stat_before}, status current {stat_after}"
)
time.sleep(1)
# Ensure that safekeepers don't lose remote_consistent_lsn on restart.
for sk in env.safekeepers:
# force persist cfile
sk.http_client().checkpoint(tenant_id, timeline_id)
sk.stop()
sk.start()
stat_after_restart = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
log.info(f"statuses after {stat_after_restart}")
assert all([s.remote_consistent_lsn >= new_rcl for s in stat_after_restart])
# Test that old WAL consumed by peers and pageserver is removed from safekeepers.
@pytest.mark.parametrize("auth_enabled", [False, True])
def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.num_safekeepers = 2
# to advance remote_consistent_lsn
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_safekeepers_wal_removal")
endpoint = env.endpoints.create_start("test_safekeepers_wal_removal")
# Note: it is important to insert at least two segments, as currently the
# control file is synced roughly once per segment and WAL is not
# removed until all horizons are persisted.
endpoint.safe_psql_many(
[
"CREATE TABLE t(key int primary key, value text)",
"INSERT INTO t SELECT generate_series(1,200000), 'payload'",
]
)
# force checkpoint to advance remote_consistent_lsn
pageserver_conn_options = {}
if auth_enabled:
pageserver_conn_options["password"] = env.auth_keys.generate_tenant_token(tenant_id)
wait_lsn_force_checkpoint(
tenant_id, timeline_id, endpoint, env.pageserver, pageserver_conn_options
)
# We will wait for removal of the first segment. Make sure the segments exist to start with.
first_segments = [
sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001"
for sk in env.safekeepers
]
assert all(os.path.exists(p) for p in first_segments)
if not auth_enabled:
http_cli = env.safekeepers[0].http_client()
else:
http_cli = env.safekeepers[0].http_client(
auth_token=env.auth_keys.generate_tenant_token(tenant_id)
)
http_cli_other = env.safekeepers[0].http_client(
auth_token=env.auth_keys.generate_tenant_token(TenantId.generate())
)
http_cli_noauth = env.safekeepers[0].http_client(gen_sk_wide_token=False)
# Pretend WAL is offloaded to s3.
if auth_enabled:
old_backup_lsn = http_cli.timeline_status(
tenant_id=tenant_id, timeline_id=timeline_id
).backup_lsn
assert Lsn("FFFFFFFF/FEFFFFFF") != old_backup_lsn
for cli in [http_cli_other, http_cli_noauth]:
with pytest.raises(cli.HTTPError, match="Forbidden|Unauthorized"):
cli.record_safekeeper_info(
tenant_id, timeline_id, {"backup_lsn": "FFFFFFFF/FEFFFFFF"}
)
assert (
old_backup_lsn
== http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id).backup_lsn
)
http_cli.record_safekeeper_info(tenant_id, timeline_id, {"backup_lsn": "FFFFFFFF/FEFFFFFF"})
assert (
Lsn("FFFFFFFF/FEFFFFFF")
== http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id).backup_lsn
)
# wait till first segment is removed on all safekeepers
wait(
lambda first_segments=first_segments: all(not os.path.exists(p) for p in first_segments),
"first segment gets removed",
wait_f=lambda http_cli=http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info(
f"waiting for segments removal, sk info: {http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id)}"
),
)
# Wait for something, defined as f() returning True, raising an error if this
# doesn't happen within timeout seconds, and calling wait_f while waiting.
def wait(f, desc, timeout=30, wait_f=None):
started_at = time.time()
while True:
try:
if f():
break
except Exception as e:
log.info(f"got exception while waiting for {desc}: {e}")
elapsed = time.time() - started_at
if elapsed > timeout:
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
time.sleep(0.5)
if wait_f is not None:
wait_f()
def test_wal_backup(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
remote_storage_kind = s3_storage()
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
# These are expected after timeline deletion on safekeepers.
env.pageserver.allowed_errors.extend(
[
".*Timeline .* was not found in global map.*",
".*Timeline .* has been deleted.*",
".*Timeline .* was cancelled and cannot be used anymore.*",
]
)
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_safekeepers_wal_backup")
endpoint = env.endpoints.create_start("test_safekeepers_wal_backup")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("create table t(key int, value text)")
# Shut down each safekeeper in turn and fill a segment while that sk is
# down; ensure the segment gets offloaded by the others.
offloaded_seg_end = [Lsn("0/2000000"), Lsn("0/3000000"), Lsn("0/4000000")]
for victim, seg_end in zip(env.safekeepers, offloaded_seg_end, strict=False):
victim.stop()
# roughly fills one segment
cur.execute("insert into t select generate_series(1,250000), 'payload'")
live_sk = [sk for sk in env.safekeepers if sk != victim][0]
wait(
partial(is_segment_offloaded, live_sk, tenant_id, timeline_id, seg_end),
f"segment ending at {seg_end} gets offloaded",
)
victim.start()
# put one of safekeepers down again
env.safekeepers[0].stop()
# restart postgres
endpoint.stop()
endpoint = env.endpoints.create_start("test_safekeepers_wal_backup")
# and ensure offloading still works
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("insert into t select generate_series(1,250000), 'payload'")
seg_end = Lsn("0/5000000")
wait(
partial(is_segment_offloaded, env.safekeepers[1], tenant_id, timeline_id, seg_end),
f"segment ending at {seg_end} gets offloaded",
)
env.safekeepers[0].start()
endpoint.stop()
# Test that after timeline deletion remote objects are gone.
prefix = "/".join([str(tenant_id), str(timeline_id)])
assert_prefix_not_empty(neon_env_builder.safekeepers_remote_storage, prefix)
for sk in env.safekeepers:
sk_http = sk.http_client()
sk_http.timeline_delete(tenant_id, timeline_id)
assert_prefix_empty(neon_env_builder.safekeepers_remote_storage, prefix)
# This test is flaky, probably because PUTs of local fs storage are not atomic.
# Let's keep both remote storage kinds for a while to see if this is the case.
# https://github.com/neondatabase/neon/issues/10761
@pytest.mark.parametrize("remote_storage_kind", [s3_storage(), RemoteStorageKind.LOCAL_FS])
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_s3_wal_replay")
endpoint = env.endpoints.create_start("test_s3_wal_replay")
expected_sum = 0
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t(key int, value text)")
cur.execute("insert into t values (1, 'payload')")
expected_sum += 1
offloaded_seg_end = Lsn("0/3000000")
# roughly fills two segments
cur.execute("insert into t select generate_series(1,500000), 'payload'")
expected_sum += 500000 * 500001 // 2
assert query_scalar(cur, "select sum(key) from t") == expected_sum
for sk in env.safekeepers:
wait(
partial(is_segment_offloaded, sk, tenant_id, timeline_id, offloaded_seg_end),
f"segment ending at {offloaded_seg_end} gets offloaded",
)
# advance remote_consistent_lsn to trigger WAL trimming
# this LSN should be less than commit_lsn, so the timeline stays active=true on safekeepers and keeps pushing broker updates
env.safekeepers[0].http_client().record_safekeeper_info(
tenant_id, timeline_id, {"remote_consistent_lsn": str(offloaded_seg_end)}
)
last_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
for sk in env.safekeepers:
# require WAL to be trimmed, so no more than one segment is left
# on disk
# TODO: WAL removal uses persistent values and the control
# file is fsynced roughly once per segment, so there is a small
# chance that two segments are left on disk, not one. We could
# force-persist the control file and use 16 instead of 32 here.
target_size_mb = 32 * 1.5
wait(
partial(is_wal_trimmed, sk, tenant_id, timeline_id, target_size_mb),
f"sk_id={sk.id} to trim WAL to {target_size_mb:.2f}MB",
)
# wait till everyone puts data up to last_lsn on disk, since we are
# going to recreate state on safekeepers claiming they have data up to last_lsn.
wait(
partial(is_flush_lsn_caught_up, sk, tenant_id, timeline_id, last_lsn),
f"sk_id={sk.id} to flush {last_lsn}",
)
ps_http = env.pageserver.http_client()
pageserver_lsn = Lsn(ps_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
lag = last_lsn - pageserver_lsn
log.info(
f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb"
)
endpoint.stop()
timeline_delete_wait_completed(ps_http, tenant_id, timeline_id)
# Also delete and manually recreate the timeline on safekeepers -- this tests
# the scenario of manual recovery on a different set of safekeepers.
# save the last (partial) file to put it back after recreation; others will be fetched from s3
sk = env.safekeepers[0]
tli_dir = Path(sk.data_dir) / str(tenant_id) / str(timeline_id)
f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0])
f_partial_path = tli_dir / f_partial
f_partial_saved = Path(sk.data_dir) / f_partial.name
f_partial_path.rename(f_partial_saved)
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
# First terminate all safekeepers to prevent communication from unexpectedly
# advancing peer_horizon_lsn.
for sk in env.safekeepers:
cli = sk.http_client()
cli.timeline_delete(tenant_id, timeline_id, only_local=True)
# restart safekeeper to clear its in-memory state
sk.stop()
# wait for all potential in-flight pushes to the broker to arrive before starting
# safekeepers (even without the sleep, it is very unlikely they are not
# delivered yet).
time.sleep(1)
for sk in env.safekeepers:
sk.start()
cli = sk.http_client()
mconf = MembershipConfiguration(generation=0, members=[], new_members=None)
# set start_lsn to the beginning of the first segment to allow reading
# WAL from there (could use the initdb LSN as well).
r = TimelineCreateRequest(
tenant_id, timeline_id, mconf, pg_version, Lsn("0/1000000"), commit_lsn=last_lsn
)
cli.timeline_create(r)
f_partial_path = (
Path(sk.data_dir) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
)
shutil.copy(f_partial_saved, f_partial_path)
# recreate timeline on pageserver from scratch
ps_http.timeline_create(
pg_version=PgVersion(str(pg_version)),
tenant_id=tenant_id,
new_timeline_id=timeline_id,
)
wait_lsn_timeout = 60 * 3
started_at = time.time()
last_debug_print = 0.0
while True:
elapsed = time.time() - started_at
if elapsed > wait_lsn_timeout:
raise RuntimeError("Timed out waiting for WAL redo")
tenant_status = ps_http.tenant_status(tenant_id)
if tenant_status["state"]["slug"] == "Loading":
log.debug(f"Tenant {tenant_id} is still loading, retrying")
else:
pageserver_lsn = Lsn(
env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)[
"last_record_lsn"
]
)
lag = last_lsn - pageserver_lsn
if time.time() > last_debug_print + 10 or lag <= 0:
last_debug_print = time.time()
log.info(f"Pageserver last_record_lsn={pageserver_lsn}; lag is {lag / 1024}kb")
if lag <= 0:
break
time.sleep(1)
log.info(f"WAL redo took {elapsed} s")
# verify data
endpoint.create_start("test_s3_wal_replay")
assert endpoint.safe_psql("select sum(key) from t")[0][0] == expected_sum
class ProposerPostgres(PgProtocol):
"""Object for running postgres without NeonEnv"""
def __init__(
self,
pgdata_dir: str,
pg_bin: PgBin,
tenant_id: TenantId,
timeline_id: TimelineId,
listen_addr: str,
port: int,
):
super().__init__(host=listen_addr, port=port, user="cloud_admin", dbname="postgres")
self.pgdata_dir: str = pgdata_dir
self.pg_bin: PgBin = pg_bin
self.tenant_id: TenantId = tenant_id
self.timeline_id: TimelineId = timeline_id
self.listen_addr: str = listen_addr
self.port: int = port
def pg_data_dir_path(self) -> str:
"""Path to data directory"""
return self.pgdata_dir
def config_file_path(self) -> str:
"""Path to postgresql.conf"""
return os.path.join(self.pgdata_dir, "postgresql.conf")
def create_dir_config(self, safekeepers: str):
"""Create dir and config for running --sync-safekeepers"""
Path(self.pg_data_dir_path()).mkdir(exist_ok=True)
with open(self.config_file_path(), "w") as f:
cfg = [
"synchronous_standby_names = 'walproposer'\n",
"shared_preload_libraries = 'neon'\n",
f"neon.timeline_id = '{self.timeline_id}'\n",
f"neon.tenant_id = '{self.tenant_id}'\n",
"neon.pageserver_connstring = ''\n",
f"neon.safekeepers = '{safekeepers}'\n",
f"listen_addresses = '{self.listen_addr}'\n",
f"port = '{self.port}'\n",
]
f.writelines(cfg)
def sync_safekeepers(self) -> Lsn:
"""
Run 'postgres --sync-safekeepers'.
Returns execution result, which is commit_lsn after sync.
"""
command = ["postgres", "--sync-safekeepers"]
env = {
"PGDATA": self.pg_data_dir_path(),
}
basepath = self.pg_bin.run_capture(command, env, with_command_header=False)
log.info(f"postgres --sync-safekeepers output: {basepath}")
stdout_filename = basepath + ".stdout"
with open(stdout_filename) as stdout_f:
stdout = stdout_f.read()
return Lsn(stdout.strip("\n "))
def initdb(self):
"""Run initdb"""
args = ["initdb", "--username", "cloud_admin", "--pgdata", self.pg_data_dir_path()]
self.pg_bin.run(args)
def start(self):
"""Start postgres with pg_ctl"""
log_path = os.path.join(self.pg_data_dir_path(), "pg.log")
args = ["pg_ctl", "-D", self.pg_data_dir_path(), "-l", log_path, "-w", "start"]
self.pg_bin.run(args)
def stop(self):
"""Stop postgres with pg_ctl"""
args = ["pg_ctl", "-D", self.pg_data_dir_path(), "-m", "immediate", "-w", "stop"]
self.pg_bin.run(args)
@pytest.mark.parametrize("auth_enabled", [False, True])
def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
wa = env.safekeepers[0]
if not auth_enabled:
wa_http_cli = wa.http_client()
wa_http_cli.check_status()
wa_http_cli_debug = wa.http_client()
wa_http_cli_debug.check_status()
else:
wa_http_cli = wa.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
wa_http_cli.check_status()
wa_http_cli_bad = wa.http_client(
auth_token=env.auth_keys.generate_tenant_token(TenantId.generate())
)
wa_http_cli_bad.check_status()
wa_http_cli_noauth = wa.http_client(gen_sk_wide_token=False)
wa_http_cli_noauth.check_status()
# debug endpoint requires safekeeper scope
wa_http_cli_debug = wa.http_client(auth_token=env.auth_keys.generate_safekeeper_token())
wa_http_cli_debug.check_status()
# create a dummy table to wait for timeline initialization in safekeeper
endpoint.safe_psql("create table wait_for_sk()")
# fetch something sensible from status
tli_status = wa_http_cli.timeline_status(tenant_id, timeline_id)
term = tli_status.term
timeline_start_lsn = tli_status.timeline_start_lsn
if auth_enabled:
for cli in [wa_http_cli_bad, wa_http_cli_noauth]:
with pytest.raises(cli.HTTPError, match="Forbidden|Unauthorized"):
cli.timeline_status(tenant_id, timeline_id)
# fetch debug_dump endpoint
debug_dump_0 = wa_http_cli_debug.debug_dump({"dump_all": "true"})
log.info(f"debug_dump before reboot {debug_dump_0}")
assert debug_dump_0["timelines_count"] == 1
assert debug_dump_0["timelines"][0]["timeline_id"] == str(timeline_id)
assert debug_dump_0["timelines"][0]["wal_last_modified"] != ""
# debug dump for a non-existent tenant should return no timelines.
debug_dump_non_existent = wa_http_cli_debug.debug_dump(
{"tenant_id": "deadbeefdeadbeefdeadbeefdeadbeef"}
)
log.info(f"debug_dump_non_existent: {debug_dump_non_existent}")
assert len(debug_dump_non_existent["timelines"]) == 0
endpoint.safe_psql("create table t(i int)")
# ensure epoch goes up after reboot
endpoint.stop().start()
endpoint.safe_psql("insert into t values(10)")
tli_status = wa_http_cli.timeline_status(tenant_id, timeline_id)
term_after_reboot = tli_status.term
assert term_after_reboot > term
# and timeline_start_lsn stays the same
assert tli_status.timeline_start_lsn == timeline_start_lsn
# fetch debug_dump after reboot
debug_dump_1 = wa_http_cli_debug.debug_dump({"dump_all": "true"})
log.info(f"debug_dump after reboot {debug_dump_1}")
assert debug_dump_1["timelines_count"] == 1
assert debug_dump_1["timelines"][0]["timeline_id"] == str(timeline_id)
# check that commit_lsn and flush_lsn did not decrease
assert (
debug_dump_1["timelines"][0]["memory"]["mem_state"]["commit_lsn"]
>= debug_dump_0["timelines"][0]["memory"]["mem_state"]["commit_lsn"]
)
assert (
debug_dump_1["timelines"][0]["memory"]["flush_lsn"]
>= debug_dump_0["timelines"][0]["memory"]["flush_lsn"]
)
# check .config in response
assert debug_dump_1["config"]["id"] == env.safekeepers[0].id
class DummyConsumer:
def __call__(self, msg):
pass
def test_start_replication_term(neon_env_builder: NeonEnvBuilder):
"""
Test START_REPLICATION of uncommitted part specifying leader term. It must
error if safekeeper switched to different term.
"""
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_start_replication_term")
endpoint = env.endpoints.create_start("test_start_replication_term")
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
sk = env.safekeepers[0]
sk_http_cli = sk.http_client()
tli_status = sk_http_cli.timeline_status(tenant_id, timeline_id)
timeline_start_lsn = tli_status.timeline_start_lsn
conn_opts = {
"host": "127.0.0.1",
"options": f"-c timeline_id={timeline_id} tenant_id={tenant_id}",
"port": sk.port.pg,
"connection_factory": psycopg2.extras.PhysicalReplicationConnection,
}
sk_pg_conn = psycopg2.connect(**conn_opts) # type: ignore
with sk_pg_conn.cursor() as cur:
# should fail, as first start has term 2
cur.start_replication_expert(f"START_REPLICATION {timeline_start_lsn} (term='3')")
dummy_consumer = DummyConsumer()
with pytest.raises(psycopg2.errors.InternalError_) as excinfo:
cur.consume_stream(dummy_consumer)
assert "failed to acquire term 3" in str(excinfo.value)
# Test auth on all ports: WAL service (postgres protocol), WAL service tenant-only, and HTTP.
def test_sk_auth(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_sk_auth")
env.endpoints.create_start("test_sk_auth")
sk = env.safekeepers[0]
tenant_token = env.auth_keys.generate_tenant_token(tenant_id)
full_token = env.auth_keys.generate_safekeeper_token()
conn_opts = {
"host": "127.0.0.1",
"options": f"-c timeline_id={timeline_id} tenant_id={tenant_id}",
}
connector = PgProtocol(**conn_opts)
# no password, should fail
with pytest.raises(psycopg2.OperationalError):
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg)
# giving password, should be ok with either token on main pg port
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg, password=tenant_token)
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg, password=full_token)
# on tenant only port tenant only token should work
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg_tenant_only, password=tenant_token)
# but full token should fail
with pytest.raises(psycopg2.OperationalError):
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg_tenant_only, password=full_token)
# Now test that auth on http/pg can be enabled separately.
# By default, neon_local enables auth on all services if auth is configured,
# so http must require the token.
sk_http_cli_noauth = sk.http_client(gen_sk_wide_token=False)
sk_http_cli_auth = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
with pytest.raises(sk_http_cli_noauth.HTTPError, match="Forbidden|Unauthorized"):
sk_http_cli_noauth.timeline_status(tenant_id, timeline_id)
sk_http_cli_auth.timeline_status(tenant_id, timeline_id)
# now, disable auth on http
sk.stop()
sk.start(extra_opts=["--http-auth-public-key-path="])
sk_http_cli_noauth.timeline_status(tenant_id, timeline_id) # must work without token
# but pg should still require the token
with pytest.raises(psycopg2.OperationalError):
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg)
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg, password=tenant_token)
# now also disable auth on pg, but leave on pg tenant only
sk.stop()
sk.start(extra_opts=["--http-auth-public-key-path=", "--pg-auth-public-key-path="])
sk_http_cli_noauth.timeline_status(tenant_id, timeline_id) # must work without token
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg) # must work without token
# but pg tenant only should still require the token
with pytest.raises(psycopg2.OperationalError):
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg_tenant_only)
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg_tenant_only, password=tenant_token)
# Try restarting endpoint with enabled auth.
def test_restart_endpoint(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.create_branch("test_sk_auth_restart_endpoint")
endpoint = env.endpoints.create_start("test_sk_auth_restart_endpoint")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t(i int)")
# Restarting endpoints and random safekeepers, to trigger recovery.
for _i in range(3):
random_sk = random.choice(env.safekeepers)
random_sk.stop()
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
start = random.randint(1, 100000)
end = start + random.randint(1, 10000)
cur.execute("insert into t select generate_series(%s,%s)", (start, end))
endpoint.stop()
random_sk.start()
endpoint.start()
# Try restarting endpoint immediately after xlog switch.
# https://github.com/neondatabase/neon/issues/8911
def test_restart_endpoint_after_switch_wal(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("create table t (i int)")
endpoint.safe_psql("SELECT pg_switch_wal()")
# we want an immediate shutdown so the endpoint restarts on the xlog switch
# record, so prevent the shutdown checkpoint.
endpoint.stop(mode="immediate", sks_wait_walreceiver_gone=(env.safekeepers, timeline_id))
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("SELECT 'works'")
# Test restarting compute at WAL page boundary.
def test_restart_endpoint_wal_page_boundary(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
ep = env.endpoints.create_start("main")
ep.safe_psql("create table t (i int)")
with ep.cursor() as cur:
# measure how much space a logical message takes. Sometimes the first attempt
# creates a huge message and then it stabilizes; no idea why.
for _ in range(3):
lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"current_lsn={lsn_before}")
# A non-transactional logical message doesn't write WAL, it only XLogInsert's
# it, so use a transactional one. That is a bit problematic, as a transactional
# message necessitates a commit record. Alternatively we could do something like
# select neon_xlogflush(pg_current_wal_insert_lsn());
# but that isn't much better, plus that particular call complains about 'xlog flush
# request 0/282C018 is not satisfied' because pg_current_wal_insert_lsn skips
# page headers.
payload = "blahblah"
cur.execute(f"select pg_logical_emit_message(true, 'pref', '{payload}')")
lsn_after_by_curr_wal_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
lsn_diff = lsn_after_by_curr_wal_lsn - lsn_before
logical_message_base = lsn_after_by_curr_wal_lsn - lsn_before - len(payload)
log.info(
f"before {lsn_before}, after {lsn_after_by_curr_wal_lsn}, lsn diff is {lsn_diff}, base {logical_message_base}"
)
# and write logical message spanning exactly as we want
lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"current_lsn={lsn_before}")
curr_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
offs = int(curr_lsn) % 8192
till_page = 8192 - offs
target_lsn = curr_lsn + till_page
payload_len = (
till_page - logical_message_base - 8
) # not sure why 8 is here, it is deduced from experiments
log.info(
f"current_lsn={curr_lsn}, offs {offs}, till_page {till_page}, target_lsn {target_lsn}"
)
cur.execute(f"select pg_logical_emit_message(true, 'pref', 'f{'a' * payload_len}')")
supposedly_contrecord_end = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"supposedly_page_boundary={supposedly_contrecord_end}")
# The calculations to hit the page boundary are very fuzzy, so just
# ignore test if we fail to reach it.
if not (int(supposedly_contrecord_end) % 8192 == 0):
pytest.skip(f"missed page boundary, bad luck: lsn is {supposedly_contrecord_end}")
ep.stop(mode="immediate")
ep = env.endpoints.create_start("main")
ep.safe_psql("insert into t values (42)") # should be ok
# Context manager which logs elapsed time on exit.
class DurationLogger:
def __init__(self, desc):
self.desc = desc
def __enter__(self):
self.ts_before = time.time()
def __exit__(self, *exc):
log.info(f"{self.desc} finished in {time.time() - self.ts_before}s")
# Context manager which logs WAL position change on exit.
class WalChangeLogger:
def __init__(self, ep, desc_before):
self.ep = ep
self.desc_before = desc_before
def __enter__(self):
self.ts_before = time.time()
self.lsn_before = Lsn(self.ep.safe_psql_scalar("select pg_current_wal_lsn()"))
log.info(f"{self.desc_before}, lsn_before={self.lsn_before}")
def __exit__(self, *exc):
lsn_after = Lsn(self.ep.safe_psql_scalar("select pg_current_wal_lsn()"))
log.info(
f"inserted {((lsn_after - self.lsn_before) / 1024 / 1024):.3f} MB of WAL in {(time.time() - self.ts_before):.3f}s"
)
# Test that we can create a timeline with one safekeeper down and initialize it
# later, when some data has already been written. It is strictly weaker than
# test_lagging_sk, but is also the simplest test to trigger WAL sk -> compute
# download (recovery), and as such is useful for development/testing.
def test_late_init(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
sk1 = env.safekeepers[0]
sk1.stop()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_late_init")
endpoint = env.endpoints.create_start("test_late_init")
# create and insert something while the safekeeper is down...
endpoint.safe_psql("create table t(key int, value text)")
with WalChangeLogger(endpoint, "doing insert with sk1 down"):
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
endpoint.stop() # stop compute
# stop another safekeeper, and start one which missed timeline creation
sk2 = env.safekeepers[1]
sk2.stop()
sk1.start()
# insert some more
with DurationLogger("recovery"):
endpoint = env.endpoints.create_start("test_late_init")
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
wait_flush_lsn_align_by_ep(
env, "test_late_init", tenant_id, timeline_id, endpoint, [sk1, env.safekeepers[2]]
)
# Check that WALs are the same.
cmp_sk_wal([sk1, env.safekeepers[2]], tenant_id, timeline_id)
# is timeline flush_lsn equal on provided safekeepers?
def is_flush_lsn_aligned(sk_http_clis, tenant_id, timeline_id):
flush_lsns = [
sk_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
for sk_http_cli in sk_http_clis
]
log.info(f"waiting for flush_lsn alignment, flush_lsns={flush_lsns}")
return all([flush_lsns[0] == flsn for flsn in flush_lsns])
# Assert by xxd that WAL on the given safekeepers is identical. No compute may
# be running for this to be reliable.
def cmp_sk_wal(sks: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
assert len(sks) >= 2, "cmp_sk_wal makes sense with >= 2 safekeepers passed"
sk_http_clis = [sk.http_client() for sk in sks]
# First check that term / flush_lsn are the same: if WALs differ because of
# that, it is easier to report/understand.
statuses = [sk_http_cli.timeline_status(tenant_id, timeline_id) for sk_http_cli in sk_http_clis]
term_flush_lsns = [(s.last_log_term, s.flush_lsn) for s in statuses]
for tfl, sk in zip(term_flush_lsns[1:], sks[1:], strict=False):
assert term_flush_lsns[0] == tfl, (
f"(last_log_term, flush_lsn) are not equal on sks {sks[0].id} and {sk.id}: {term_flush_lsns[0]} != {tfl}"
)
# check that WALs are identical.
segs = [sk.list_segments(tenant_id, timeline_id) for sk in sks]
for cmp_segs, sk in zip(segs[1:], sks[1:], strict=False):
assert segs[0] == cmp_segs, (
f"lists of segments on sks {sks[0].id} and {sk.id} are not identical: {segs[0]} and {cmp_segs}"
)
log.info(f"comparing segs {segs[0]}")
sk0 = sks[0]
for sk in sks[1:]:
(_, mismatch, not_regular) = filecmp.cmpfiles(
sk0.timeline_dir(tenant_id, timeline_id),
sk.timeline_dir(tenant_id, timeline_id),
segs[0],
shallow=False,
)
log.info(
f"filecmp result mismatch and not regular files:\n\t mismatch={mismatch}\n\t not_regular={not_regular}"
)
for f in mismatch:
f1 = sk0.timeline_dir(tenant_id, timeline_id) / f
f2 = sk.timeline_dir(tenant_id, timeline_id) / f
stdout_filename = f"{f2}.filediff"
with open(stdout_filename, "w") as stdout_f:
subprocess.run(f"xxd {f1} > {f1}.hex ", shell=True)
subprocess.run(f"xxd {f2} > {f2}.hex ", shell=True)
cmd = f"diff {f1}.hex {f2}.hex"
subprocess.run([cmd], stdout=stdout_f, shell=True)
assert (mismatch, not_regular) == (
[],
[],
), f"WAL segs {f1} and {f2} on sks {sks[0].id} and {sk.id} are not identical"
# Wait until flush_lsn on the given sks becomes equal, assuming endpoint ep is
# running. ep is stopped by this function. This is used in tests which check
# binary equality of WAL segments on safekeepers, which is inherently racy as
# shutting down the endpoint may write some WAL that reaches only one
# safekeeper. So we recheck flush_lsn again after ep shutdown and retry if
# it has changed.
def wait_flush_lsn_align_by_ep(env, branch, tenant_id, timeline_id, ep, sks):
sk_http_clis = [sk.http_client() for sk in sks]
# First wait for the alignment.
wait(
partial(is_flush_lsn_aligned, sk_http_clis, tenant_id, timeline_id),
"flush_lsn to get aligned",
)
ep.stop() # then stop endpoint
# Even if there is no compute, there might be some in flight data; ensure
# all walreceivers die before rechecking.
for sk_http_cli in sk_http_clis:
wait_walreceivers_absent(sk_http_cli, tenant_id, timeline_id)
# Now recheck flush_lsn and exit if it is good
if is_flush_lsn_aligned(sk_http_clis, tenant_id, timeline_id):
return
# Otherwise restart the endpoint and repeat.
log.info("flush_lsn changed during endpoint shutdown; retrying alignment")
ep = env.endpoints.create_start(branch)
wait_flush_lsn_align_by_ep(env, branch, tenant_id, timeline_id, ep, sks)
# Test behaviour with one safekeeper down and missing a lot of WAL, exercising
# neon_walreader and checking that pg_wal never bloats. Namely, ensures that
# compute doesn't keep much WAL for a lagging sk, but can still recover it with
# neon_walreader, in two scenarios: a) WAL never existed on compute (it started
# on a basebackup LSN later than the lagging sk position) though the segment
# file exists; b) WAL had been recycled on it and the segment file doesn't exist.
#
# Also checks along the way that whenever there are two sks alive, compute
# should be able to commit.
def test_lagging_sk(neon_env_builder: NeonEnvBuilder):
# inserts ~20MB of WAL, a bit more than a segment.
def fill_segment(ep):
ep.safe_psql("insert into t select generate_series(1, 180000), 'payload'")
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
(sk1, sk2, sk3) = env.safekeepers
# create and insert something while the safekeeper is down...
sk1.stop()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_lagging_sk")
ep = env.endpoints.create_start("test_lagging_sk")
ep.safe_psql("create table t(key int, value text)")
# make small insert to be on the same segment
ep.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
log.info("insert with safekeeper down done")
ep.stop() # stop compute
# Stop another safekeeper, and start one which missed timeline creation.
sk2.stop()
sk1.start()
# Start new ep and insert some more. neon_walreader should download WAL for
# sk1 because sk1 must be filled from the horizon (initial LSN), which is
# earlier than the basebackup LSN.
ep = env.endpoints.create_start("test_lagging_sk")
ep.safe_psql("insert into t select generate_series(1,100), 'payload'")
# stop ep and ensure WAL is identical after recovery.
wait_flush_lsn_align_by_ep(env, "test_lagging_sk", tenant_id, timeline_id, ep, [sk1, sk3])
# Check that WALs are the same.
cmp_sk_wal([sk1, sk3], tenant_id, timeline_id)
# Now repeat insertion with sk1 down, but with inserting more data to check
# that WAL on compute is removed.
sk1.stop()
sk2.start()
# min_wal_size must be at least 2x segment size.
min_wal_config = [
"min_wal_size=32MB",
"max_wal_size=32MB",
"wal_keep_size=0",
"log_checkpoints=on",
]
ep = env.endpoints.create_start(
"test_lagging_sk",
config_lines=min_wal_config,
)
with WalChangeLogger(ep, "doing large insert with sk1 down"):
for _ in range(0, 5):
fill_segment(ep)
# there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
assert ep.get_pg_wal_size() < 16 * 2.5
sk2.stop() # stop another sk to ensure sk1 and sk3 can work
sk1.start()
with DurationLogger("recovery"):
ep.safe_psql("insert into t select generate_series(1,100), 'payload'") # forces recovery
# stop ep and ensure WAL is identical after recovery.
wait_flush_lsn_align_by_ep(env, "test_lagging_sk", tenant_id, timeline_id, ep, [sk1, sk3])
# Check that WALs are the same.
cmp_sk_wal([sk1, sk3], tenant_id, timeline_id)
# Now do the same with a different safekeeper, sk2, down, restarting the ep
# before recovery (again the scenario where recovery starts below
# basebackup_lsn, but multi-segment now).
ep = env.endpoints.create_start(
"test_lagging_sk",
config_lines=["min_wal_size=32MB", "max_wal_size=32MB", "log_checkpoints=on"],
)
with WalChangeLogger(ep, "doing large insert with sk2 down"):
for _ in range(0, 5):
fill_segment(ep)
# there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
assert ep.get_pg_wal_size() < 16 * 2.5
ep.stop()
ep = env.endpoints.create_start(
"test_lagging_sk",
config_lines=min_wal_config,
)
sk2.start()
with DurationLogger("recovery"):
wait_flush_lsn_align_by_ep(env, "test_lagging_sk", tenant_id, timeline_id, ep, [sk2, sk3])
# Check that WALs are the same.
cmp_sk_wal([sk1, sk2, sk3], tenant_id, timeline_id)
env.stop(immediate=True)
# Smaller version of test_one_sk_down testing peer recovery in isolation: that
# it works without compute at all.
def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
# timelines should be created the old way
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_peer_recovery")
endpoint = env.endpoints.create_start("test_peer_recovery")
endpoint.safe_psql("create table t(key int, value text)")
sk1 = env.safekeepers[0]
sk2 = env.safekeepers[1]
sk1_http_cli = sk1.http_client()
sk2_http_cli = sk2.http_client()
# ensure tli gets created on sk1, peer recovery won't do that
wait(
partial(is_flush_lsn_aligned, [sk1_http_cli, sk2_http_cli], tenant_id, timeline_id),
"flush_lsn to get aligned",
)
sk1 = env.safekeepers[0]
sk1.stop()
# roughly fills one segment
endpoint.safe_psql("insert into t select generate_series(1,250000), 'payload'")
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
endpoint.stop() # stop compute
# now start safekeeper, but with peer recovery disabled; it should lag for about a segment
sk1.start(extra_opts=["--peer-recovery=false"])
sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
log.info(
f"flush_lsns after insertion: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
)
assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
# wait a bit, lsns shouldn't change
time.sleep(2)
sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
log.info(
f"flush_lsns after waiting: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
)
assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
# now restart safekeeper with peer recovery enabled and wait for recovery
sk1.stop().start(extra_opts=["--peer-recovery=true"])
wait(
partial(is_flush_lsn_aligned, [sk1_http_cli, sk2_http_cli], tenant_id, timeline_id),
"flush_lsn to get aligned",
)
sk1_digest = sk1.http_client().timeline_digest(
tenant_id, timeline_id, sk1.get_timeline_start_lsn(tenant_id, timeline_id), lsn
)
sk2_digest = sk2.http_client().timeline_digest(
tenant_id, timeline_id, sk2.get_timeline_start_lsn(tenant_id, timeline_id), lsn
)
assert sk1_digest == sk2_digest
# stop one of the safekeepers which wasn't recovering and insert a bit more to check we can commit
env.safekeepers[2].stop()
endpoint = env.endpoints.create_start("test_peer_recovery")
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
# Test that when compute is terminated in fast (or smart) mode, the walproposer
# is allowed to keep running and self-terminate after the shutdown checkpoint is
# written, so it commits it to safekeepers before exiting. This is not required
# for correctness, but is needed for tests using check_restored_datadir_content.
def test_wp_graceful_shutdown(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_wp_graceful_shutdown")
ep = env.endpoints.create_start("test_wp_graceful_shutdown")
ep.safe_psql("create table t(key int, value text)")
ep.stop()
# figure out checkpoint lsn
ckpt_lsn = pg_bin.get_pg_controldata_checkpoint_lsn(ep.pg_data_dir_path())
sk_http_cli = env.safekeepers[0].http_client()
commit_lsn = sk_http_cli.timeline_status(tenant_id, timeline_id).commit_lsn
# Note: this is an in-memory value. Graceful shutdown of the walproposer
# currently doesn't guarantee a persisted value, which is ok, as we need it only
# for tests. Persisting it without risking too many control file flushes needs a
# wp -> sk protocol change. (Though in reality the shutdown sync-safekeepers
# does flush the control file, so most of the time the persisted value wouldn't lag.)
log.info(f"sk commit_lsn {commit_lsn}")
# note that ckpt_lsn is the *beginning* of the checkpoint record, so commit_lsn
# must actually be higher
assert commit_lsn > ckpt_lsn, "safekeeper must have checkpoint record"
class SafekeeperEnv:
def __init__(
self,
repo_dir: Path,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
num_safekeepers: int = 1,
):
self.repo_dir = repo_dir
self.port_distributor = port_distributor
self.fake_broker_endpoint = f"http://127.0.0.1:{port_distributor.get_port()}"
self.pg_bin = pg_bin
self.num_safekeepers = num_safekeepers
self.bin_safekeeper = str(neon_binpath / "safekeeper")
self.safekeepers: list[subprocess.CompletedProcess[Any]] | None = None
self.postgres: ProposerPostgres | None = None
self.tenant_id: TenantId | None = None
self.timeline_id: TimelineId | None = None
def init(self) -> Self:
assert self.postgres is None, "postgres is already initialized"
assert self.safekeepers is None, "safekeepers are already initialized"
self.tenant_id = TenantId.generate()
self.timeline_id = TimelineId.generate()
self.repo_dir.mkdir(exist_ok=True)
# Create config and a Safekeeper object for each safekeeper
self.safekeepers = []
for i in range(1, self.num_safekeepers + 1):
self.safekeepers.append(self.start_safekeeper(i))
# Create and start postgres
self.postgres = self.create_postgres()
self.postgres.start()
return self
def start_safekeeper(self, i):
port = SafekeeperPort(
pg=self.port_distributor.get_port(),
pg_tenant_only=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
https=None,
)
safekeeper_dir = self.repo_dir / f"sk{i}"
safekeeper_dir.mkdir(exist_ok=True)
cmd = [
self.bin_safekeeper,
"-l",
f"127.0.0.1:{port.pg}",
"--listen-http",
f"127.0.0.1:{port.http}",
"-D",
str(safekeeper_dir),
"--id",
str(i),
"--broker-endpoint",
self.fake_broker_endpoint,
]
log.info(f'Running command "{" ".join(cmd)}"')
safekeeper_client = SafekeeperHttpClient(
port=port.http,
auth_token=None,
)
safekeeper_process = start_in_background(
cmd, safekeeper_dir, "safekeeper.log", safekeeper_client.check_status
)
return safekeeper_process
def get_safekeeper_connstrs(self):
assert self.safekeepers is not None, "safekeepers are not initialized"
return ",".join([sk_proc.args[2] for sk_proc in self.safekeepers])
def create_postgres(self):
assert self.tenant_id is not None, "tenant_id is not initialized"
assert self.timeline_id is not None, "timeline_id is not initialized"
pgdata_dir = os.path.join(self.repo_dir, "proposer_pgdata")
pg = ProposerPostgres(
pgdata_dir,
self.pg_bin,
self.tenant_id,
self.timeline_id,
"127.0.0.1",
self.port_distributor.get_port(),
)
pg.initdb()
pg.create_dir_config(self.get_safekeeper_connstrs())
return pg
def kill_safekeeper(self, sk_dir):
"""Read pid file and kill process"""
pid_file = os.path.join(sk_dir, "safekeeper.pid")
with open(pid_file) as f:
pid = int(f.read())
log.info(f"Killing safekeeper with pid {pid}")
os.kill(pid, signal.SIGKILL)
def __enter__(self) -> Self:
return self
def __exit__(self, exc_type, exc_value, traceback):
log.info("Cleaning up all safekeeper and compute nodes")
# Stop all the nodes
if self.postgres is not None:
self.postgres.stop()
if self.safekeepers is not None:
for sk_proc in self.safekeepers:
self.kill_safekeeper(sk_proc.args[6])
def test_safekeeper_without_pageserver(
test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
):
# Create the environment in the test-specific output dir
repo_dir = Path(os.path.join(test_output_dir, "repo"))
env = SafekeeperEnv(
repo_dir,
port_distributor,
pg_bin,
neon_binpath,
)
with env:
env.init()
assert env.postgres is not None
env.postgres.safe_psql("create table t(i int)")
env.postgres.safe_psql("insert into t select generate_series(1, 100)")
res = env.postgres.safe_psql("select sum(i) from t")[0][0]
assert res == 5050
def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
# timelines should be created the old way manually until we have migration support
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
def execute_payload(endpoint: Endpoint):
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)")
cur.execute("INSERT INTO t VALUES (0, 'something')")
sum_before = query_scalar(cur, "SELECT SUM(key) FROM t")
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
sum_after = query_scalar(cur, "SELECT SUM(key) FROM t")
assert sum_after == sum_before + 5000050000
def show_statuses(safekeepers: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
for sk in safekeepers:
http_cli = sk.http_client()
try:
status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"Safekeeper {sk.id} status: {status}")
except Exception as e:
log.info(f"Safekeeper {sk.id} status error: {e}")
neon_env_builder.num_safekeepers = 4
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_replace_safekeeper")
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
endpoint = env.endpoints.create("test_replace_safekeeper")
endpoint.active_safekeepers = [1, 2, 3]
endpoint.start()
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Restart all safekeepers to flush everything")
env.safekeepers[0].stop(immediate=True)
execute_payload(endpoint)
env.safekeepers[0].start()
env.safekeepers[1].stop(immediate=True)
execute_payload(endpoint)
env.safekeepers[1].start()
env.safekeepers[2].stop(immediate=True)
execute_payload(endpoint)
env.safekeepers[2].start()
env.safekeepers[0].stop(immediate=True)
env.safekeepers[1].stop(immediate=True)
env.safekeepers[2].stop(immediate=True)
env.safekeepers[0].start()
env.safekeepers[1].start()
env.safekeepers[2].start()
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Stop sk1 (simulate failure) and use only quorum of sk2 and sk3")
env.safekeepers[0].stop(immediate=True)
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Recreate postgres to replace failed sk1 with new sk4")
endpoint.stop_and_destroy().create("test_replace_safekeeper")
env.safekeepers[3].start()
endpoint.active_safekeepers = [2, 3, 4]
endpoint.start()
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Stop sk2 to require quorum of sk3 and sk4 for normal work")
env.safekeepers[1].stop(immediate=True)
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
# Basic pull_timeline test.
# When live_sk_change is False, the compute is restarted to change the set of
# safekeepers; otherwise it is a live reload.
@pytest.mark.parametrize("live_sk_change", [False, True])
def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
neon_env_builder.auth_enabled = True
def execute_payload(endpoint: Endpoint):
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)")
cur.execute("INSERT INTO t VALUES (0, 'something')")
sum_before = query_scalar(cur, "SELECT SUM(key) FROM t")
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
sum_after = query_scalar(cur, "SELECT SUM(key) FROM t")
assert sum_after == sum_before + 5000050000
def show_statuses(safekeepers: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
for sk in safekeepers:
http_cli = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
try:
status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"Safekeeper {sk.id} status: {status}")
except Exception as e:
log.info(f"Safekeeper {sk.id} status error: {e}")
neon_env_builder.num_safekeepers = 4
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
endpoint = env.endpoints.create("main")
endpoint.start(safekeepers=[1, 2, 3])
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Kill safekeeper 2, continue with payload")
env.safekeepers[1].stop(immediate=True)
execute_payload(endpoint)
log.info("Initialize new safekeeper 4, pull data from 1 & 3")
env.safekeepers[3].start()
res = env.safekeepers[3].pull_timeline(
[env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id
)
sk_id_1 = env.safekeepers[0].safekeeper_id()
sk_id_3 = env.safekeepers[2].safekeeper_id()
sk_id_4 = env.safekeepers[3].safekeeper_id()
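    # Switch the surviving members (sk1, sk3) plus the seeded sk4 into a new
    # generation-2 membership configuration.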
new_conf = MembershipConfiguration(
generation=2, members=[sk_id_1, sk_id_3, sk_id_4], new_members=None
)
for i in [0, 2, 3]:
env.safekeepers[i].http_client().membership_switch(tenant_id, timeline_id, new_conf)
log.info("Finished pulling timeline")
log.info(res)
show_statuses(env.safekeepers, tenant_id, timeline_id)
action = "reconfiguing" if live_sk_change else "restarting"
log.info(f"{action} compute with new config to verify that it works")
new_sks = [1, 3, 4]
if not live_sk_change:
endpoint.stop_and_destroy().create("main")
endpoint.start(safekeepers=new_sks)
else:
endpoint.reconfigure(safekeepers=new_sks)
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Stop sk1 (simulate failure) and use only quorum of sk3 and sk4")
env.safekeepers[0].stop(immediate=True)
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
log.info("Restart sk4 and and use quorum of sk1 and sk4")
env.safekeepers[3].stop()
env.safekeepers[2].stop()
env.safekeepers[0].start()
env.safekeepers[3].start()
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
# Test pull_timeline while concurrently gc'ing WAL on safekeeper:
# 1) Start pull_timeline, listing files to fetch.
# 2) Write segment, do gc.
# 3) Finish pull_timeline.
# 4) Do some write, verify integrity with timeline_digest.
# Expected to fail until holding off WAL gc (plus fetching the WAL segment
# containing commit_lsn) is implemented.
def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
dst_sk.stop()
[tenant_id, timeline_id] = env.create_tenant()
log.info("use only first 2 safekeepers, 3rd will be seeded")
endpoint = env.endpoints.create("main", tenant_id=tenant_id)
endpoint.active_safekeepers = [1, 2]
endpoint.start()
endpoint.safe_psql("create table t(key int, value text)")
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
src_http = src_sk.http_client()
# run pull_timeline which will halt before downloading files
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
dst_sk.start()
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
pt_handle.start()
src_sk.wait_until_paused("sk-snapshot-after-list-pausable")
    # write enough WAL to fill complete segments so gc has something to remove
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
lsn = last_flush_lsn_upload(
env,
endpoint,
tenant_id,
timeline_id,
auth_token=env.auth_keys.generate_tenant_token(tenant_id),
)
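    # 0/2000000 is 32 MiB into the WAL, i.e. past the first two 16 MiB
    # segments, so the checkpoint below can remove the first segment.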
assert lsn > Lsn("0/2000000")
# Checkpoint timeline beyond lsn.
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn, wait_wal_removal=False)
first_segment_p = src_sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001"
log.info(f"first segment exist={os.path.exists(first_segment_p)}")
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off"))
pt_handle.join()
# after pull_timeline is finished WAL should be removed on donor
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn, wait_wal_removal=True)
timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id)
dst_flush_lsn = dst_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on dst after pull_timeline: {dst_flush_lsn}")
assert dst_flush_lsn >= src_flush_lsn
digests = [
sk.http_client().timeline_digest(tenant_id, timeline_id, timeline_start_lsn, dst_flush_lsn)
for sk in [src_sk, dst_sk]
]
assert digests[0] == digests[1], f"digest on src is {digests[0]} but on dst is {digests[1]}"
# Test pull_timeline while concurrently changing term on the donor:
# 1) Start pull_timeline, listing files to fetch.
# 2) Change term on the donor
# 3) Finish pull_timeline.
#
# Currently (until a proper membership change procedure exists), we want
# pull_timeline to fetch the log up to <last_log_term, flush_lsn>. This is
# unsafe if the term changes during the procedure (unless the timeline is
# locked the whole time, which we don't want): the recipient might end up with
# a mix of WAL from different histories. Thus the schedule above is expected
# to fail. Later we'd allow pull_timeline to only initialize the timeline to
# any valid state (up to commit_lsn), holding the switch to the fully new
# configuration until it recovers enough that a term change can no longer
# affect it.
#
# Expected to fail until the term check is implemented.
def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
dst_sk.stop()
src_http = src_sk.http_client()
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
timeline_id = env.create_branch("pull_timeline_term_changes")
# run pull_timeline which will halt before downloading files
log.info("use only first 2 safekeepers, 3rd will be seeded")
ep = env.endpoints.create("pull_timeline_term_changes")
ep.active_safekeepers = [1, 2]
ep.start()
ep.safe_psql("create table t(key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
dst_sk.start()
pt_handle.start()
src_sk.wait_until_paused("sk-snapshot-after-list-pausable")
src_http = src_sk.http_client()
term_before = src_http.timeline_status(tenant_id, timeline_id).term
# restart compute to bump term
ep.stop()
ep = env.endpoints.create("pull_timeline_term_changes")
ep.active_safekeepers = [1, 2]
ep.start()
ep.safe_psql("insert into t select generate_series(1, 100), 'pear'")
term_after = src_http.timeline_status(tenant_id, timeline_id).term
assert term_after > term_before, f"term_after={term_after}, term_before={term_before}"
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off"))
with pytest.raises(requests.exceptions.HTTPError):
pt_handle.join()
def test_pull_timeline_while_evicted(neon_env_builder: NeonEnvBuilder):
"""
Verify that when pull_timeline is used on an evicted timeline, it does not result in
promoting any segments to local disk on the source, and the timeline is correctly instantiated
in evicted state on the destination. This behavior is important to avoid ballooning disk
usage when doing mass migration of timelines.
"""
neon_env_builder.num_safekeepers = 4
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
# Configure safekeepers with ultra-fast eviction policy
neon_env_builder.safekeeper_extra_opts = [
"--enable-offload",
"--partial-backup-timeout",
"50ms",
"--control-file-save-interval",
"1s",
# Safekeepers usually wait a while before evicting something: for this test we want them to
# evict things as soon as they are inactive.
"--eviction-min-resident=100ms",
"--delete-offloaded-wal",
]
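    # Aggressive tenant timeouts so the timeline deactivates quickly and
    # eviction can kick in.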
initial_tenant_conf = {"lagging_wal_timeout": "1s", "checkpoint_timeout": "100ms"}
env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[-1])
log.info(f"Will pull_timeline on destination {dst_sk.id} from source {src_sk.id}")
ep = env.endpoints.create("main")
    # Exclude dst_sk from the set the compute writes to initially
    ep.active_safekeepers = [s.id for s in env.safekeepers if s.id != dst_sk.id]
    log.info(f"Compute writing initially to safekeepers: {ep.active_safekeepers}")
ep.start()
ep.safe_psql("CREATE TABLE t(i int)")
ep.safe_psql("INSERT INTO t VALUES (0)")
ep.stop()
wait_lsn_force_checkpoint_at_sk(src_sk, tenant_id, timeline_id, env.pageserver)
src_http = src_sk.http_client()
dst_http = dst_sk.http_client()
def evicted_on_source():
# Wait for timeline to go into evicted state
assert src_http.get_eviction_state(timeline_id) != "Present"
        # get_metric_value may return None, so coalesce to 0 before comparing
        assert (
            src_http.get_metric_value(
                "safekeeper_eviction_events_completed_total", {"kind": "evict"}
            )
            or 0
        ) > 0
        assert (src_http.get_metric_value("safekeeper_evicted_timelines") or 0) > 0
# Check that on source no segment files are present
assert src_sk.list_segments(tenant_id, timeline_id) == []
wait_until(evicted_on_source, timeout=60)
# Invoke pull_timeline: source should serve snapshot request without promoting anything to local disk,
# destination should import the control file only & go into evicted mode immediately
dst_sk.pull_timeline([src_sk], tenant_id, timeline_id)
# Check that on source and destination no segment files are present
assert src_sk.list_segments(tenant_id, timeline_id) == []
assert dst_sk.list_segments(tenant_id, timeline_id) == []
# Check that the timeline on the destination is in the expected evicted state.
evicted_on_source() # It should still be evicted on the source
def evicted_on_destination():
assert dst_http.get_eviction_state(timeline_id) != "Present"
        assert (dst_http.get_metric_value("safekeeper_evicted_timelines") or 0) > 0
# This should be fast, it is a wait_until because eviction state is updated
# in the background wrt pull_timeline.
wait_until(evicted_on_destination, timeout=1.0, interval=0.1)
# Delete the timeline on the source, to prove that deletion works on an
# evicted timeline _and_ that the final compute test is really not using
# the original location
src_sk.http_client().timeline_delete(tenant_id, timeline_id, only_local=True)
# Check that using the timeline correctly un-evicts it on the new location
ep.active_safekeepers = [2, 3, 4]
ep.start()
ep.safe_psql("INSERT INTO t VALUES (0)")
ep.stop()
def unevicted_on_dest():
        assert (
            dst_http.get_metric_value(
                "safekeeper_eviction_events_completed_total", {"kind": "restore"}
            )
            or 0
        ) > 0
n_evicted = dst_sk.http_client().get_metric_value("safekeeper_evicted_timelines")
assert n_evicted == 0
wait_until(unevicted_on_dest, interval=0.1, timeout=1.0)
# Basic test for HTTP API membership-related calls: create a timeline and
# switch configurations. Normally these are called by the storage controller,
# but this allows testing them separately.
@run_only_on_default_postgres("tests only safekeeper API")
def test_membership_api(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
# timelines should be created the old way
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
# These are expected after timeline deletion on safekeepers.
env.pageserver.allowed_errors.extend(
[
".*Timeline .* was not found in global map.*",
".*Timeline .* has been deleted.*",
".*Timeline .* was cancelled and cannot be used anymore.*",
]
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
sk = env.safekeepers[0]
http_cli = sk.http_client()
sk_id_1 = SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only)
sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock
# Request to switch before timeline creation should fail.
init_conf = MembershipConfiguration(generation=1, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, init_conf)
# Create timeline.
create_r = TimelineCreateRequest(
tenant_id, timeline_id, init_conf, 150002, Lsn("0/1000000"), commit_lsn=None
)
log.info(f"sending {create_r.to_json()}")
http_cli.timeline_create(create_r)
# Switch into some conf.
joint_conf = MembershipConfiguration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf)
log.info(f"joint switch resp: {resp}")
assert resp.previous_conf.generation == 1
assert resp.current_conf.generation == 4
# Restart sk, conf should be preserved.
sk.stop().start()
after_restart = http_cli.get_membership(tenant_id, timeline_id)
log.info(f"conf after restart: {after_restart}")
assert after_restart.generation == 4
# Switch into non joint conf of which sk is not a member, must fail.
non_joint_not_member = MembershipConfiguration(
generation=5, members=[sk_id_2], new_members=None
)
with pytest.raises(requests.exceptions.HTTPError):
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint_not_member)
# Switch into good non joint conf.
non_joint = MembershipConfiguration(generation=6, members=[sk_id_1], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint)
log.info(f"non joint switch resp: {resp}")
assert resp.previous_conf.generation == 4
assert resp.current_conf.generation == 6
# Switch request to lower conf should be rejected.
lower_conf = MembershipConfiguration(generation=3, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
# Now, exclude sk from the membership, timeline should be deleted.
excluded_conf = MembershipConfiguration(generation=7, members=[sk_id_2], new_members=None)
http_cli.timeline_exclude(tenant_id, timeline_id, excluded_conf)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.timeline_status(tenant_id, timeline_id)
def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
"""
    Test that having neon.safekeepers start with g#n: for non-zero n enables
    generations, which as a side effect disables automatic timeline creation.
    This is a kind of bootstrapping test: here the membership conf & timeline
    are created manually; later the storcon will do that.
"""
neon_env_builder.num_safekeepers = 3
# timelines should be created the old way manually
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
ep = env.endpoints.create("main")
# expected to fail because timeline is not created on safekeepers
with pytest.raises(Exception, match=r".*timed out.*"):
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3], timeout="2s")
    # create the initial mconf
mconf = MembershipConfiguration(
generation=1, members=Safekeeper.sks_to_safekeeper_ids(env.safekeepers), new_members=None
)
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, env.safekeepers)
# Once timeline created endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
def test_explicit_timeline_creation_storcon(neon_env_builder: NeonEnvBuilder):
"""
    Test that having neon.safekeepers start with g#n: for non-zero n enables
    generations, which as a side effect disables automatic timeline creation.
    Like test_explicit_timeline_creation, but asks the storcon to create the
    membership conf & timeline.
"""
neon_env_builder.num_safekeepers = 3
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
}
env = neon_env_builder.init_start()
ep = env.endpoints.create("main")
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
# when the compute is active but there are no writes to the timeline. In that case
# the pageserver should maintain a single connection to a safekeeper and not
# attempt to reconnect extra times.
#
# The only way to verify this without manipulating time is to sleep for a while.
# In this test we sleep for 60 seconds, so it takes at least 1 minute to run. This
# is longer than most other tests, so we run it only on the default Postgres
# version and in release builds to save CI resources.
@run_only_on_default_postgres("run only on default Postgres version to save CI resources")
@skip_in_debug_build("run only on release build to save CI resources")
def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
def collect_stats() -> dict[str, float]:
# we need to collect safekeeper_pg_queries_received_total metric from all safekeepers
sk_metrics = [
parse_metrics(sk.http_client().get_metrics_str(), f"safekeeper_{sk.id}")
for sk in env.safekeepers
]
total: dict[str, float] = {}
for sk in sk_metrics:
queries_received = sk.query_all("safekeeper_pg_queries_received_total")
log.info(f"{sk.name} queries received: {queries_received}")
for sample in queries_received:
total[sample.labels["query"]] = total.get(sample.labels["query"], 0) + sample.value
log.info(f"Total queries received: {total}")
# in the perfect world, we should see only one START_REPLICATION query,
# here we check for 5 to prevent flakiness
assert total.get("START_REPLICATION", 0) <= 5
# in the perfect world, we should see ~6 START_WAL_PUSH queries,
# here we check for 15 to prevent flakiness
assert total.get("START_WAL_PUSH", 0) <= 15
return total
collect_stats()
endpoint = env.endpoints.create_start("main")
# just write something to the timeline
endpoint.safe_psql("create table t(i int)")
collect_stats()
# sleep a bit
time.sleep(30)
# force checkpoint in pageserver to advance remote_consistent_lsn
wait_lsn_force_checkpoint(tenant_id, timeline_id, endpoint, env.pageserver)
collect_stats()
time.sleep(30)
final_stats = collect_stats()
# pageserver should connect to safekeepers at least once
assert final_stats.get("START_REPLICATION", 0) >= 1
# walproposer should connect to each safekeeper at least once
assert final_stats.get("START_WAL_PUSH", 0) >= 3
@pytest.mark.parametrize("insert_rows", [0, 100, 100000, 500000])
def test_timeline_copy(neon_env_builder: NeonEnvBuilder, insert_rows: int):
target_percents = [10, 50, 90, 100]
neon_env_builder.num_safekeepers = 3
# we need remote storage that supports copy_object S3 API
neon_env_builder.enable_safekeeper_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
lsns = []
def remember_lsn():
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
lsns.append(lsn)
return lsn
# remember LSN right after timeline creation
lsn = remember_lsn()
log.info(f"LSN after timeline creation: {lsn}")
endpoint.safe_psql("create table t(key int, value text)")
    # Note: currently timelines on sks are created by the compute, and the commit
    # of the transaction above finishes when 2/3 sks have received it, so there is
    # a small chance that the timeline on this sk is not created/initialized yet,
    # hence the use of a waiting function to prevent flakiness.
timeline_start_lsn = (
env.safekeepers[0].http_client().get_non_zero_timeline_start_lsn(tenant_id, timeline_id)
)
log.info(f"Timeline start LSN: {timeline_start_lsn}")
current_percent = 0.0
for new_percent in target_percents:
new_rows = insert_rows * (new_percent - current_percent) / 100
current_percent = new_percent
if new_rows == 0:
continue
endpoint.safe_psql(
f"insert into t select generate_series(1, {new_rows}), repeat('payload!', 10)"
)
# remember LSN right after reaching new_percent
lsn = remember_lsn()
log.info(f"LSN after inserting {new_rows} rows: {lsn}")
# TODO: would be also good to test cases where not all segments are uploaded to S3
for lsn in lsns:
new_timeline_id = TimelineId.generate()
log.info(f"Copying branch for LSN {lsn}, to timeline {new_timeline_id}")
orig_digest = (
env.safekeepers[0]
.http_client()
.timeline_digest(tenant_id, timeline_id, timeline_start_lsn, lsn)
)
log.info(f"Original digest: {orig_digest}")
for sk in env.safekeepers:
wait(
partial(is_flush_lsn_caught_up, sk, tenant_id, timeline_id, lsn),
f"sk_id={sk.id} to flush {lsn}",
)
sk.http_client().copy_timeline(
tenant_id,
timeline_id,
{
"target_timeline_id": str(new_timeline_id),
"until_lsn": str(lsn),
},
)
new_digest = sk.http_client().timeline_digest(
tenant_id, new_timeline_id, timeline_start_lsn, lsn
)
log.info(f"Digest after timeline copy on safekeeper {sk.id}: {new_digest}")
assert orig_digest == new_digest
# TODO: test timelines can start after copy
def test_patch_control_file(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
# initialize safekeeper
endpoint.safe_psql("create table t(key int, value text)")
# update control file
res = (
env.safekeepers[0]
.http_client()
.patch_control_file(
tenant_id,
timeline_id,
{
"timeline_start_lsn": "0/1",
},
)
)
timeline_start_lsn_before = res["old_control_file"]["timeline_start_lsn"]
timeline_start_lsn_after = res["new_control_file"]["timeline_start_lsn"]
log.info(f"patch_control_file response: {res}")
log.info(
f"updated control file timeline_start_lsn, before {timeline_start_lsn_before}, after {timeline_start_lsn_after}"
)
assert timeline_start_lsn_after == "0/1"
env.safekeepers[0].stop().start()
# wait/check that safekeeper is alive
endpoint.safe_psql("insert into t values (1, 'payload')")
# check that timeline_start_lsn is updated
res = (
env.safekeepers[0]
.http_client()
.debug_dump({"dump_control_file": "true", "timeline_id": str(timeline_id)})
)
log.info(f"dump_control_file response: {res}")
assert res["timelines"][0]["control_file"]["timeline_start_lsn"] == "0/1"
def test_term_bump(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
# initialize safekeeper
endpoint.safe_psql("create table t(key int, value text)")
http_cli = env.safekeepers[0].http_client()
# check that bump up to specific term works
curr_term = http_cli.timeline_status(tenant_id, timeline_id).term
bump_to = curr_term + 3
res = http_cli.term_bump(tenant_id, timeline_id, bump_to)
log.info(f"bump to {bump_to} res: {res}")
assert res.current_term >= bump_to
# check that bump to none increments current term
res = http_cli.term_bump(tenant_id, timeline_id, None)
log.info(f"bump to None res: {res}")
assert res.current_term > bump_to
assert res.current_term > res.previous_term
# check that bumping doesn't work downward
res = http_cli.term_bump(tenant_id, timeline_id, 2)
log.info(f"bump to 2 res: {res}")
assert res.current_term > bump_to
assert res.current_term == res.previous_term
# check that this doesn't kill endpoint because last WAL flush was his and
# thus its basebackup is still good
endpoint.safe_psql("insert into t values (1, 'payload')")
# The test disables periodic pushes from safekeepers to the broker and checks
# that the pageserver can still discover safekeepers with discovery requests.
def test_broker_discovery(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
env.create_branch("test_broker_discovery")
endpoint = env.endpoints.create_start(
"test_broker_discovery",
config_lines=["shared_buffers=1MB"],
)
endpoint.safe_psql("create table t(i int, payload text)")
# Install extension containing function needed to clear buffer
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
def do_something():
time.sleep(1)
# generate some data to commit WAL on safekeepers
endpoint.safe_psql("insert into t select generate_series(1,100), 'action'")
# clear the buffers
endpoint.clear_buffers()
# read data to fetch pages from pageserver
endpoint.safe_psql("select sum(i) from t")
do_something()
do_something()
for sk in env.safekeepers:
# Disable periodic broker push, so pageserver won't be able to discover
# safekeepers without sending a discovery request
sk.stop().start(extra_opts=["--disable-periodic-broker-push"])
do_something()
do_something()
# restart pageserver and check how everything works
env.pageserver.stop().start()
do_something()
do_something()
# The test creates 5 endpoints and wakes them up randomly. All timeouts are
# configured to be very short, so we expect that:
# - the pageserver will update remote_consistent_lsn very often
# - safekeepers will upload partial WAL segments very often
# - safekeepers will try to evict and unevict timelines
#
# The test checks that there are no critical errors while doing this. It also
# checks that at least one safekeeper has a successful eviction (requiring it
# on every safekeeper is a TODO below).
@pytest.mark.parametrize("delete_offloaded_wal", [False, True])
@pytest.mark.parametrize("restart_chance", [0.0, 0.2])
def test_s3_eviction(
neon_env_builder: NeonEnvBuilder, delete_offloaded_wal: bool, restart_chance: float
):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.safekeeper_extra_opts = [
"--enable-offload",
"--partial-backup-timeout",
"50ms",
"--control-file-save-interval",
"1s",
# Safekeepers usually wait a while before evicting something: for this test we want them to
# evict things as soon as they are inactive.
"--eviction-min-resident=100ms",
]
if delete_offloaded_wal:
neon_env_builder.safekeeper_extra_opts.append("--delete-offloaded-wal")
    # Make lagging_wal_timeout small to force the pageserver to quickly forget
    # about a safekeeper after it stops sending updates (timeline is deactivated),
    # which makes the test faster. Won't be needed once
    # https://github.com/neondatabase/neon/issues/8148 is fixed.
initial_tenant_conf = {"lagging_wal_timeout": "1s", "checkpoint_timeout": "100ms"}
env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
n_timelines = 5
branch_names = [f"branch{tlin}" for tlin in range(n_timelines)]
timelines = []
ps_client = env.pageservers[0].http_client()
# start postgres on each timeline
endpoints: list[Endpoint] = []
for branch_name in branch_names:
timeline_id = env.create_branch(branch_name)
timelines.append(timeline_id)
endpoints.append(env.endpoints.create_start(branch_name))
endpoints[-1].safe_psql("CREATE TABLE t(i int)")
endpoints[-1].safe_psql("INSERT INTO t VALUES (0)")
lsn = endpoints[-1].safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]
log.info(f"{branch_name}: LSN={lsn}")
endpoints[-1].stop()
# update remote_consistent_lsn on pageserver
ps_client.timeline_checkpoint(env.initial_tenant, timelines[-1], wait_until_uploaded=True)
check_values = [0] * n_timelines
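    # check_values[i] is the expected value of t.i on timeline i, incremented
    # each time that endpoint is woken up and updated.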
event_metrics_seen = False
n_iters = 20
for _ in range(n_iters):
if log.isEnabledFor(logging.DEBUG):
for j in range(n_timelines):
detail = ps_client.timeline_detail(env.initial_tenant, timelines[j])
log.debug(
f"{branch_names[j]}: RCL={detail['remote_consistent_lsn']}, LRL={detail['last_record_lsn']}"
)
i = random.randint(0, n_timelines - 1)
log.info(f"Starting endpoint {i}")
endpoints[i].start()
check_values[i] += 1
res = endpoints[i].safe_psql("UPDATE t SET i = i + 1 RETURNING i")
assert res[0][0] == check_values[i]
lsn = endpoints[i].safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]
log.info(f"{branch_names[i]}: LSN={lsn}")
endpoints[i].stop()
# update remote_consistent_lsn on pageserver
ps_client.timeline_checkpoint(env.initial_tenant, timelines[i], wait_until_uploaded=True)
        # Do metrics check before restarts, since these will reset to zero across a restart.
        # get_metric_value may return None, so coalesce to 0 before comparing.
        event_metrics_seen |= any(
            all(
                (sk.http_client().get_metric_value(metric, {"kind": kind}) or 0) > 0
                for metric in (
                    "safekeeper_eviction_events_started_total",
                    "safekeeper_eviction_events_completed_total",
                )
                for kind in ("evict", "restore")
            )
            for sk in env.safekeepers
        )
# restarting random safekeepers
for sk in env.safekeepers:
if random.random() < restart_chance:
sk.stop().start()
time.sleep(0.5)
# require at least one successful eviction in at least one safekeeper
# TODO: require eviction in each safekeeper after https://github.com/neondatabase/neon/issues/8148 is fixed
assert any(
sk.log_contains("successfully evicted timeline")
and sk.log_contains("successfully restored evicted timeline")
for sk in env.safekeepers
)
assert event_metrics_seen
# test safekeeper_evicted_timelines metric
log.info("testing safekeeper_evicted_timelines metric")
# checkpoint pageserver to force remote_consistent_lsn update
for i in range(n_timelines):
ps_client.timeline_checkpoint(env.initial_tenant, timelines[i], wait_until_uploaded=True)
for ep in endpoints:
log.info(ep.is_running())
sk = env.safekeepers[0]
# all timelines must be evicted eventually
def all_evicted():
n_evicted = sk.http_client().get_metric_value("safekeeper_evicted_timelines")
assert n_evicted # make mypy happy
assert int(n_evicted) == n_timelines
wait_until(all_evicted, timeout=30)
# restart should preserve the metric value
sk.stop().start()
wait_until(all_evicted)
    # and an endpoint start should reduce it
endpoints[0].start()
def one_unevicted():
n_evicted = sk.http_client().get_metric_value("safekeeper_evicted_timelines")
assert n_evicted # make mypy happy
assert int(n_evicted) < n_timelines
wait_until(one_unevicted)
# Test resetting uploaded partial segment state.
def test_backup_partial_reset(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
    # We want to upload/evict quickly, but not so quickly that we can't check
    # that s3 is empty before the next round of uploads happens.
    # Note: this test fails with --delete-offloaded-wal; this is expected.
neon_env_builder.safekeeper_extra_opts = [
"--enable-offload",
"--partial-backup-timeout",
"1s",
"--control-file-save-interval",
"1s",
"--eviction-min-resident=1s",
]
    # XXX: the pageserver currently connects to a safekeeper as long as the
    # connection manager doesn't remove its entry (default lagging_wal_timeout
    # is 10s), causing uneviction. It should be fixed to not reconnect if the
    # last remote_consistent_lsn has been communicated and there is nothing to
    # fetch. Make the value lower to speed up the test.
initial_tenant_conf = {
"lagging_wal_timeout": "1s",
}
env = neon_env_builder.init_start(initial_tenant_conf)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create("main")
endpoint.start()
endpoint.safe_psql("create table t(key int, value text)")
endpoint.stop()
sk = env.safekeepers[0]
# eviction won't happen until remote_consistent_lsn catches up.
wait_lsn_force_checkpoint_at_sk(sk, tenant_id, timeline_id, env.pageserver)
http_cli = env.safekeepers[0].http_client()
# wait until eviction happens
def evicted():
eviction_state = http_cli.get_eviction_state(timeline_id)
log.info(f"eviction_state: {eviction_state}")
if isinstance(eviction_state, str) and eviction_state == "Present":
raise Exception("eviction didn't happen yet")
wait_until(evicted)
# it must have uploaded something
uploaded_segs = sk.list_uploaded_segments(tenant_id, timeline_id)
log.info(f"uploaded segments before reset: {uploaded_segs}")
assert len(uploaded_segs) > 0
reset_res = http_cli.backup_partial_reset(tenant_id, timeline_id)
log.info(f"reset res: {reset_res}")
# Backup_partial_reset must have reset the state and dropped s3 segment.
#
    # Note: if listing takes more than --partial-backup-timeout, the test becomes
    # flaky because the file might be re-uploaded. With local fs it shouldn't be
    # an issue, but we can add a retry if this appears.
uploaded_segs = sk.list_uploaded_segments(tenant_id, timeline_id)
log.info(f"uploaded segments after reset: {uploaded_segs}")
assert len(uploaded_segs) == 0
# calling second time should be ok
http_cli.backup_partial_reset(tenant_id, timeline_id)
# inserting data should be ok
endpoint.start()
endpoint.safe_psql("insert into t values(1, 'hehe')")
def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder):
"""
Verify that pulling timeline from a SK with an uploaded partial segment
does not lead to consistency issues:
1. Start 3 SKs - only use two
2. Ingest a bit of WAL
3. Wait for partial to be uploaded
4. Pull timeline to the third SK
    5. Replace source with destination SK and start compute
    6. Wait for source SK to evict timeline
    7. Go back to initial compute SK config and validate that
source SK can unevict the timeline (S3 state is consistent)
"""
neon_env_builder.auth_enabled = True
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
neon_env_builder.safekeeper_extra_opts = [
"--enable-offload",
"--delete-offloaded-wal",
"--partial-backup-timeout",
"500ms",
"--control-file-save-interval",
"500ms",
"--eviction-min-resident=500ms",
]
    # XXX: the pageserver currently connects to a safekeeper as long as the
    # connection manager doesn't remove its entry (default lagging_wal_timeout
    # is 10s), causing uneviction. It should be fixed to not reconnect if the
    # last remote_consistent_lsn has been communicated and there is nothing to
    # fetch. Until this is fixed, make the value lower to speed up the test.
initial_tenant_conf = {
"lagging_wal_timeout": "1s",
"checkpoint_timeout": "100ms",
}
env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
log.info("use only first 2 safekeepers, 3rd will be seeded")
endpoint = env.endpoints.create("main")
endpoint.active_safekeepers = [1, 2]
endpoint.start()
endpoint.safe_psql("create table t(key int, value text)")
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
endpoint.stop()
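    # Uploaded partial segment names embed the uploading safekeeper's id,
    # hence the "sk1"/"sk3" substring checks below.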
def source_partial_segment_uploaded():
first_segment_name = "000000010000000000000001"
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
candidate_seg = None
for seg in segs:
if "partial" in seg and "sk1" in seg and not seg.startswith(first_segment_name):
candidate_seg = seg
if candidate_seg is not None:
# The term might change, causing the segment to be gc-ed shortly after,
# so give it a bit of time to make sure it's stable.
time.sleep(2)
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
assert candidate_seg in segs
return candidate_seg
raise Exception("Partial segment not uploaded yet")
source_partial_segment = wait_until(source_partial_segment_uploaded)
log.info(
f"Uploaded segments before pull are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
log.info(f"Tracking source partial segment: {source_partial_segment}")
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
pageserver_conn_options = {"password": env.auth_keys.generate_tenant_token(tenant_id)}
wait_lsn_force_checkpoint_at(
src_flush_lsn, tenant_id, timeline_id, env.pageserver, pageserver_conn_options
)
dst_sk.pull_timeline([src_sk], tenant_id, timeline_id)
def evicted():
evictions = src_sk.http_client().get_metric_value(
"safekeeper_eviction_events_completed_total", {"kind": "evict"}
)
if evictions is None or evictions == 0:
raise Exception("Eviction did not happen on source safekeeper yet")
wait_until(evicted)
endpoint.start(safekeepers=[2, 3])
def new_partial_segment_uploaded():
segs = dst_sk.list_uploaded_segments(tenant_id, timeline_id)
for seg in segs:
if "partial" in seg and "sk3" in seg:
return seg
raise Exception("Partial segment not uploaded yet")
log.info(
f"Uploaded segments before post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
wait_until(new_partial_segment_uploaded)
log.info(
f"Uploaded segments after post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
# Allow for some gc iterations to happen and assert that the original
# uploaded partial segment remains in place.
time.sleep(5)
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
assert source_partial_segment in segs
log.info(
f"Uploaded segments at the end are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
# Restart the endpoint in order to check that the source safekeeper
# can unevict the timeline
endpoint.stop()
endpoint.start(safekeepers=[1, 2])
def unevicted():
unevictions = src_sk.http_client().get_metric_value(
"safekeeper_eviction_events_completed_total", {"kind": "restore"}
)
if unevictions is None or unevictions == 0:
raise Exception("Uneviction did not happen on source safekeeper yet")
wait_until(unevicted)
@pytest.mark.skip(reason="Lakebase mode")
def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
"""
Test that the timeline disk usage circuit breaker works as expected. We test that:
1. The circuit breaker kicks in when the timeline's disk usage exceeds the configured limit,
and it causes writes to hang.
2. The hanging writes unblock when the issue resolves (by restarting the safekeeper in the
test to simulate a more realistic production troubleshooting scenario).
3. We can continue to write as normal after the issue resolves.
4. There is no data corruption throughout the test.
"""
# Set up environment with a very small disk usage limit (1KB)
neon_env_builder.num_safekeepers = 1
remote_storage_kind = s3_storage()
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
# Create a timeline and endpoint
env.create_branch("test_timeline_disk_usage_limit")
endpoint = env.endpoints.create_start("test_timeline_disk_usage_limit")
# Install the neon extension in the test database. We need it to query perf counter metrics.
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE EXTENSION IF NOT EXISTS neon")
# Sanity-check safekeeper connection status in neon_perf_counters in the happy case.
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 active safekeeper"
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
# Get the safekeeper
sk = env.safekeepers[0]
# Restart the safekeeper with a very small disk usage limit (1KB)
sk.stop().start(["--max-timeline-disk-usage-bytes=1024"])
# Inject a failpoint to stop WAL backup
with sk.http_client() as http_cli:
http_cli.configure_failpoints([("backup-lsn-range-pausable", "pause")])
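    # With backup paused, WAL cannot be offloaded to remote storage, so local
    # disk usage grows past the 1KB limit and trips the circuit breaker.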
# Write some data that will exceed the 1KB limit. While the failpoint is active, this operation
# will hang as Postgres encounters safekeeper-returned errors and retries.
def run_hanging_insert():
with closing(endpoint.connect()) as bg_conn:
with bg_conn.cursor() as bg_cur:
# This should generate more than 1KB of WAL
bg_cur.execute("create table t(key int, value text)")
bg_cur.execute("insert into t select generate_series(1,2000), 'payload'")
# Start the inserts in a background thread
bg_thread = threading.Thread(target=run_hanging_insert)
bg_thread.start()
# Wait for the error message to appear in the compute log
def error_logged():
if endpoint.log_contains("WAL storage utilization exceeds configured limit") is None:
raise Exception("Expected error message not found in compute log yet")
wait_until(error_logged)
log.info("Found expected error message in compute log, resuming.")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# Confirm that neon_perf_counters also indicates that there are no active safekeepers
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
)
assert cur.fetchone() == (0,), "Expected 0 active safekeepers"
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
    # Sanity check that the hanging insert is indeed still hanging; otherwise the
    # circuit breaker we implemented didn't work as expected.
time.sleep(2)
assert bg_thread.is_alive(), (
"The hanging insert somehow unblocked without resolving the disk usage issue!"
)
log.info("Restarting the safekeeper to resume WAL backup.")
# Restart the safekeeper with defaults to both clear the failpoint and resume the larger disk usage limit.
for sk in env.safekeepers:
sk.stop().start(extra_opts=[])
# The hanging insert will now complete. Join the background thread so that we can
# verify that the insert completed successfully.
bg_thread.join(timeout=120)
assert not bg_thread.is_alive(), "Hanging insert did not complete after safekeeper restart"
log.info("Hanging insert unblocked.")
# Verify we can continue to write as normal
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("insert into t select generate_series(2001,3000), 'payload'")
# Sanity check data correctness
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("select count(*) from t")
# 2000 rows from first insert + 1000 from last insert
assert cur.fetchone() == (3000,)
def test_global_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
"""
Similar to `test_timeline_disk_usage_limit`, but test that the global disk usage circuit breaker
also works as expected. The test scenario:
1. Create a timeline and endpoint.
2. Mock high disk usage via failpoint
3. Write data to the timeline so that disk usage exceeds the limit.
4. Verify that the writes hang and the expected error message appears in the compute log.
5. Mock low disk usage via failpoint
6. Verify that the hanging writes unblock and we can continue to write as normal.
"""
neon_env_builder.num_safekeepers = 1
remote_storage_kind = s3_storage()
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
env.create_branch("test_global_disk_usage_limit")
endpoint = env.endpoints.create_start("test_global_disk_usage_limit")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t2(key int, value text)")
for sk in env.safekeepers:
sk.stop().start(
extra_opts=["--global-disk-check-interval=1s", "--max-global-disk-usage-ratio=0.8"]
)
# Set the failpoint to have the disk usage check return u64::MAX, which definitely exceeds the practical
# limits in the test environment.
for sk in env.safekeepers:
sk.http_client().configure_failpoints(
[("sk-global-disk-usage", "return(18446744073709551615)")]
)
# Wait until the global disk usage limit watcher trips the circuit breaker.
def error_logged_in_sk():
for sk in env.safekeepers:
if sk.log_contains("Global disk usage exceeded limit") is None:
raise Exception("Expected error message not found in safekeeper log yet")
wait_until(error_logged_in_sk)
def run_hanging_insert_global():
with closing(endpoint.connect()) as bg_conn:
with bg_conn.cursor() as bg_cur:
# This should generate more than 1KiB of WAL
bg_cur.execute("insert into t2 select generate_series(1,2000), 'payload'")
bg_thread_global = threading.Thread(target=run_hanging_insert_global)
bg_thread_global.start()
def error_logged_in_compute():
if endpoint.log_contains("Global disk usage exceeded limit") is None:
raise Exception("Expected error message not found in compute log yet")
wait_until(error_logged_in_compute)
log.info("Found the expected error message in compute log, resuming.")
time.sleep(2)
assert bg_thread_global.is_alive(), "Global hanging insert unblocked prematurely!"
# Make the disk usage check always return 0 through the failpoint to simulate the disk pressure easing.
# The SKs should resume accepting WAL writes without restarting.
for sk in env.safekeepers:
sk.http_client().configure_failpoints([("sk-global-disk-usage", "return(0)")])
bg_thread_global.join(timeout=120)
assert not bg_thread_global.is_alive(), "Hanging global insert did not complete after restart"
log.info("Global hanging insert unblocked.")
# Verify that we can continue to write as normal and we don't have obvious data corruption
# following the recovery.
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("insert into t2 select generate_series(2001,3000), 'payload'")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("select count(*) from t2")
assert cur.fetchone() == (3000,)