neon/test_runner/regress/test_pageserver_restart.py

from __future__ import annotations
import random
from contextlib import closing
from typing import TYPE_CHECKING
import psycopg2.errors as pgerr
import pytest
from fixtures.log_helper import log
from fixtures.remote_storage import s3_storage
from fixtures.utils import skip_in_debug_build, wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder
# Test restarting the page server, while the safekeeper and compute node keep
# running.
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
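# Back the pageserver with remote storage (the s3_storage fixture uses mock or
# real S3 depending on the test environment), so that restarts exercise
# re-attaching the tenant from remote storage.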
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
# We inject a delay of 15 seconds for tenant activation below.
# Hence, bump the max delay here to not skip over the activation.
neon_env_builder.pageserver_config_override = 'background_task_maximum_delay="20s"'
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
pageserver_http = env.pageserver.http_client()
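# Right after startup, the initial tenant should be the only attached tenant.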
assert (
pageserver_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"})
== 1
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return the answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# Verify that the table is larger than shared_buffers
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
row = cur.fetchone()
assert row is not None
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])
# Stop the pageserver gracefully and restart it.
env.pageserver.stop()
env.pageserver.start()
# We reloaded our tenant
assert (
pageserver_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"})
== 1
)
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
# Restart the server again, but delay the attaching of tenants, and test what the
# pageserver does if a compute node connects and sends a request for the tenant
# while it's still in Attaching state. (It waits for the attach to finish, and then
# processes the request.)
tenant_load_delay_ms = 15000
env.pageserver.stop()
env.pageserver.start(
extra_env_vars={"FAILPOINTS": f"before-attaching-tenant=return({tenant_load_delay_ms})"}
)
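# The failpoint makes the pageserver delay tenant attach by tenant_load_delay_ms
# (this is the 15 second delay referred to at the top of the test).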
# Check that it's in Attaching state
client = env.pageserver.http_client()
tenant_status = client.tenant_status(env.initial_tenant)
log.info("Tenant status : %s", tenant_status)
assert tenant_status["state"]["slug"] == "Attaching"
# Try to read. This waits until the attach finishes, and then returns normally.
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
# Wait for metrics to indicate that startup is complete, so that we know all
# startup phases will be reflected in the subsequent checks.
def assert_complete():
for sample in pageserver_http.get_metrics().query_all(
"pageserver_startup_duration_seconds"
):
labels = dict(sample.labels)
log.info(f"metric {labels['phase']}={sample.value}")
if labels["phase"] == "complete" and sample.value > 0:
return
raise AssertionError("No 'complete' metric yet")
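# wait_until retries assert_complete until it stops raising (or the retries time out).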
wait_until(assert_complete)
# Expectation callbacks: arg t is sample value, arg p is the previous phase's sample value
expectations = [
(
"initial",
lambda t, p: True,
), # make no assumptions about the initial time point, it could be 0 in theory
# Remote phase of initial_tenant_load should happen before overall phase is complete
("initial_tenant_load_remote", lambda t, p: t >= 0.0 and t >= p),
# Initial tenant load should reflect the delay we injected
("initial_tenant_load", lambda t, p: t >= (tenant_load_delay_ms / 1000.0) and t >= p),
# Subsequent steps should occur in expected order
("background_jobs_can_start", lambda t, p: t > 0 and t >= p),
("complete", lambda t, p: t > 0 and t >= p),
]
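# The phase values are expected to be non-decreasing in the order listed above,
# which is what the t >= p checks against the previous phase's value verify.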
# Accumulate the runtime of each startup phase
values = {}
metrics = pageserver_http.get_metrics()
prev_value = None
for sample in metrics.query_all("pageserver_startup_duration_seconds"):
phase = sample.labels["phase"]
log.info(f"metric {phase}={sample.value}")
assert phase in [e[0] for e in expectations], f"Unexpected phase {phase}"
values[phase] = sample
# Apply expectations to the metrics retrieved
for phase, expectation in expectations:
assert phase in values, f"No data for phase {phase}"
sample = values[phase]
assert expectation(sample.value, prev_value), (
f"Unexpected value for {phase}: {sample.value}"
)
prev_value = sample.value
# Startup is complete, this metric should exist but be zero
assert metrics.query_one("pageserver_startup_is_loading").value == 0
# This histogram should have been populated, although we aren't specific about exactly
# which bucket values: just nonzero
assert any(
bucket.value > 0
for bucket in metrics.query_all("pageserver_tenant_activation_seconds_bucket")
)
# Test that repeatedly kills and restarts the page server, while the
# safekeeper and compute node keep running.
@pytest.mark.timeout(540)
@pytest.mark.parametrize("shard_count", [None, 4])
@skip_in_debug_build("times out in debug builds")
def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder, shard_count: int | None):
# same rationale as with the immediate stop; we might leave orphan layers behind.
neon_env_builder.disable_scrub_on_exit()
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
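# When testing with sharding, provision one pageserver per shard so that the kill
# loop below has several pageservers to choose from.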
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
# Use a tiny checkpoint distance, to create a lot of layers quickly.
# That allows us to stress the compaction and layer flushing logic more.
tenant, _ = env.create_tenant(
conf={
"checkpoint_distance": "5000000",
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return the answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE foo (id int, t text, updates int)")
cur.execute("CREATE INDEX ON foo (id)")
cur.execute(
"""
INSERT INTO foo
SELECT g, 'long string to consume some space' || g, 0
FROM generate_series(1, 100000) g
"""
)
# Verify that the table is larger than shared_buffers
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
row = cur.fetchone()
assert row is not None
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])
# We run "random" kills using a fixed seed, to improve reproducibility if a test
# failure is related to a particular order of operations.
seed = 0xDEADBEEF
rng = random.Random(seed)
# Update the whole table, then immediately kill and restart the pageserver
for i in range(1, 15):
endpoint.safe_psql("UPDATE foo set updates = updates + 1")
# This kills the pageserver immediately, to simulate a crash
to_kill = rng.choice(env.pageservers)
to_kill.stop(immediate=True)
to_kill.start()
# Check that all the updates are visible
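# After i full-table updates, each of the 100000 rows has updates == i, so the
# sum should be i * 100000.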
num_updates = endpoint.safe_psql("SELECT sum(updates) FROM foo")[0][0]
assert num_updates == i * 100000
# Currently the pageserver cannot tolerate "s3" going away, and if we succeeded
# in a compaction before shutdown, there might be a lot of uploads pending,
# certainly more than what we can ingest with MOCK_S3.
#
# So instead, do a fast shutdown for this one test.
# See https://github.com/neondatabase/neon/issues/8709
env.stop(immediate=True)
def test_pageserver_lost_and_transaction_aborted(neon_env_builder: NeonEnvBuilder):
"""
If pageserver is unavailable during a transaction abort and target relation is
not present in cache, we abort the transaction in ABORT state which triggers a sigabrt.
This is _expected_ behavour
"""
env = neon_env_builder.init_start()
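# Setting neon.relsize_hash_size=0 effectively disables the relation size cache, so
# size lookups must go to the pageserver (hence "not present in cache" in the
# docstring above).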
endpoint = env.endpoints.create_start("main", config_lines=["neon.relsize_hash_size=0"])
with closing(endpoint.connect()) as conn, conn.cursor() as cur:
cur.execute("CREATE DATABASE test")
with (
pytest.raises((pgerr.InterfaceError, pgerr.InternalError)),
endpoint.connect(dbname="test") as conn,
conn.cursor() as cur,
):
cur.execute("create table t(b box)")
env.pageserver.stop()
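# With the pageserver stopped, this index build fails, and the abort path then needs
# the (uncached) relation size from the pageserver; the backend dies and the client
# sees an InterfaceError/InternalError.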
cur.execute("create index ti on t using gist(b)")
def test_pageserver_lost_and_transaction_committed(neon_env_builder: NeonEnvBuilder):
"""
If pageserver is unavailable during a transaction commit and target relation is
not present in cache, we abort the transaction in COMMIT state which triggers a sigabrt.
This is _expected_ behavour
"""
env = neon_env_builder.init_start()
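# As above, neon.relsize_hash_size=0 disables the relation size cache so that
# relation size lookups must go to the pageserver.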
endpoint = env.endpoints.create_start("main", config_lines=["neon.relsize_hash_size=0"])
with closing(endpoint.connect()) as conn, conn.cursor() as cur:
cur.execute("CREATE DATABASE test")
with (
pytest.raises((pgerr.InterfaceError, pgerr.InternalError)),
endpoint.connect(dbname="test") as conn,
conn.cursor() as cur,
):
cur.execute("create table t(t boolean)")
env.pageserver.stop()
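# With the pageserver stopped, committing the DROP needs the (uncached) relation size
# from the pageserver; the backend dies and the client sees an
# InterfaceError/InternalError.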
cur.execute("drop table t")