Files
neon/test_runner/regress/test_readonly_node.py
Christian Schwarz 2b2a547671 fix(tests): periodic and immediate gc is effectively a no-op in tests (#12431)
The introduction of the default lease deadline feature 9 months ago made
it so that, after a pageserver (PS) restart, `.timeline_gc()` calls in
Python tests are no-ops for 10 minutes after startup: `gc_iteration()`
bails with `Skipping GC because lsn lease deadline is not reached`.
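
To illustrate, a minimal sketch using the Python fixtures from the test file
below (the comment describes the observed behavior; the sketch itself is not
taken from any one affected test):

```python
env = neon_env_builder.init_start()
env.pageserver.stop()
env.pageserver.start()
# For the first 10 minutes after startup, this call is effectively a no-op:
# gc_iteration() bails with "Skipping GC because lsn lease deadline is not
# reached" and removes no layers, so the GC the test wanted never happens.
env.pageserver.http_client().timeline_gc(env.initial_tenant, env.initial_timeline, 0)
```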

I did some impact analysis in the following PR. About 30 Python tests
are affected:
- https://github.com/neondatabase/neon/pull/12411

Rust tests that don't explicitly enable periodic GC or invoke GC manually
are unaffected because we disable periodic GC by default in the
`TenantHarness`'s tenant config.
Two tests worked around the deadline explicitly via `start_paused=true` +
`tokio::time::advance()`, but taking that route would add cognitive
overhead and code bloat to each existing and future test case that uses
`TenantHarness`.

So, this PR sets the default lease deadline to zero by default in both
Python and Rust tests. The tests that actually exercise the feature were
thus identified because they started failing:
- Python tests `test_readonly_node_gc` + `test_lsn_lease_size`
- Rust test `test_lsn_lease`

To accomplish the above, I changed the code that computes the initial
lease deadline to respect the pageserver.toml's default tenant config,
which it didn't before (I would consider that a bug). The Python test
harness and the Rust `TenantHarness` then simply set that default tenant
config field to zero.
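
As a sketch of the Python-harness side (the exact override mechanism lives in
the harness; the knob shown here is the same tenant config field this test
file already sets, used per-tenant for illustration):

```python
# Hypothetical harness-level default: a zero lease length means the initial
# lease deadline is already reached, so post-restart GC is not skipped.
env = neon_env_builder.init_start(
    initial_tenant_conf={"lsn_lease_length": "0s"},
)
```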

Drive-by:
- `test_lsn_lease_size` was writing a lot of data unnecessarily; reduce
the amount of data written and speed up the test

refs
- PR that introduced default lease deadline:
https://github.com/neondatabase/neon/pull/9055/files
- fixes https://databricks.atlassian.net/browse/LKB-92

---------

Co-authored-by: Christian Schwarz <Christian Schwarz>
2025-07-08 12:56:22 +00:00

351 lines · 12 KiB · Python

from __future__ import annotations

import time
from typing import TYPE_CHECKING

import pytest
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    Endpoint,
    LogCursor,
    NeonEnv,
    NeonEnvBuilder,
    last_flush_lsn_upload,
    tenant_get_shards,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.utils import query_scalar, wait_until

if TYPE_CHECKING:
    from fixtures.pageserver.http import PageserverHttpClient


#
# Create read-only compute nodes, anchored at historical points in time.
#
# This is very similar to the 'test_branch_behind' test, but instead of
# creating branches, creates read-only nodes.
#
def test_readonly_node(neon_simple_env: NeonEnv):
    env = neon_simple_env
    endpoint_main = env.endpoints.create_start("main")

    env.pageserver.allowed_errors.extend(
        [
            ".*basebackup .* failed: invalid basebackup lsn.*",
            ".*/lsn_lease.*invalid lsn lease request.*",
        ]
    )

    main_pg_conn = endpoint_main.connect()
    main_cur = main_pg_conn.cursor()

    # Create table, and insert the first 100 rows
    main_cur.execute("CREATE TABLE foo (t text)")
    main_cur.execute(
        """
        INSERT INTO foo
        SELECT 'long string to consume some space' || g
        FROM generate_series(1, 100) g
        """
    )
    # (A redundant bare execute of the same query was dropped here;
    # query_scalar already runs it and fetches the result.)
    lsn_a = query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()")
    log.info("LSN after 100 rows: " + lsn_a)
    # Insert some more rows. (This generates enough WAL to fill a few segments.)
    main_cur.execute(
        """
        INSERT INTO foo
        SELECT 'long string to consume some space' || g
        FROM generate_series(1, 200000) g
        """
    )
    lsn_b = query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()")
    log.info("LSN after 200100 rows: " + lsn_b)

    # Insert many more rows. This generates enough WAL to fill a few segments.
    main_cur.execute(
        """
        INSERT INTO foo
        SELECT 'long string to consume some space' || g
        FROM generate_series(1, 200000) g
        """
    )
    lsn_c = query_scalar(main_cur, "SELECT pg_current_wal_insert_lsn()")
    log.info("LSN after 400100 rows: " + lsn_c)

    # Create first read-only node at the point where only 100 rows were inserted
    endpoint_hundred = env.endpoints.create_start(
        branch_name="main", endpoint_id="ep-readonly_node_hundred", lsn=lsn_a
    )
    # And another at the point where 200100 rows were inserted
    endpoint_more = env.endpoints.create_start(
        branch_name="main", endpoint_id="ep-readonly_node_more", lsn=lsn_b
    )

    # On the 'hundred' node, we should see only 100 rows
    hundred_pg_conn = endpoint_hundred.connect()
    hundred_cur = hundred_pg_conn.cursor()
    hundred_cur.execute("SELECT count(*) FROM foo")
    assert hundred_cur.fetchone() == (100,)
    # On the 'more' node, we should see 200100 rows
    more_pg_conn = endpoint_more.connect()
    more_cur = more_pg_conn.cursor()
    more_cur.execute("SELECT count(*) FROM foo")
    assert more_cur.fetchone() == (200100,)

    # All the rows are visible on the main branch
    main_cur.execute("SELECT count(*) FROM foo")
    assert main_cur.fetchone() == (400100,)

    # Check creating a node at segment boundary
    endpoint = env.endpoints.create_start(
        branch_name="main",
        endpoint_id="ep-branch_segment_boundary",
        lsn=Lsn("0/3000000"),
    )
    cur = endpoint.connect().cursor()
    cur.execute("SELECT 1")
    assert cur.fetchone() == (1,)

    # Create node at pre-initdb lsn
    with pytest.raises(Exception, match="invalid lsn lease request"):
        # compute node startup with invalid LSN should fail
        env.endpoints.create_start(
            branch_name="main",
            endpoint_id="ep-readonly_node_preinitdb",
            lsn=Lsn("0/42"),
        )


def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
    """
    Test static endpoint is protected from GC by acquiring and renewing lsn leases.
    """
    LSN_LEASE_LENGTH = 8
    neon_env_builder.num_pageservers = 2
    # GC is triggered manually.
    env = neon_env_builder.init_start(
        initial_tenant_conf={
            # small checkpointing and compaction targets to ensure we generate many upload operations
            "checkpoint_distance": f"{128 * 1024}",
            "compaction_threshold": "1",
            "compaction_target_size": f"{128 * 1024}",
            # no PITR horizon, we specify the horizon when we request on-demand GC
            "pitr_interval": "0s",
            # disable background compaction and GC. We invoke it manually when we want it to happen.
            "gc_period": "0s",
            "compaction_period": "0s",
            # create image layers eagerly, so that GC can remove some layers
            "image_creation_threshold": "1",
            "image_layer_creation_check_threshold": "0",
            # Short lease length to fit test.
            "lsn_lease_length": f"{LSN_LEASE_LENGTH}s",
        },
        initial_tenant_shard_count=2,
    )

    ROW_COUNT = 500

    def generate_updates_on_main(
        env: NeonEnv,
        ep_main: Endpoint,
        data: int,
        start=1,
        end=ROW_COUNT,
    ) -> Lsn:
        """
        Generates some load on main branch that results in some uploads.
        """
        with ep_main.cursor() as cur:
            cur.execute(
                f"INSERT INTO t0 (v0, v1) SELECT g, '{data}' FROM generate_series({start}, {end}) g ON CONFLICT (v0) DO UPDATE SET v1 = EXCLUDED.v1"
            )
            cur.execute("VACUUM t0")
        last_flush_lsn = last_flush_lsn_upload(
            env, ep_main, env.initial_tenant, env.initial_timeline
        )
        return last_flush_lsn

    def get_layers_protected_by_lease(
        ps_http: PageserverHttpClient,
        tenant_id: TenantId | TenantShardId,
        timeline_id: TimelineId,
        lease_lsn: Lsn,
    ) -> set[str]:
        """Get all layers whose start_lsn is less than or equal to the lease lsn."""
        layer_map_info = ps_http.layer_map_info(tenant_id, timeline_id)
        return set(
            x.layer_file_name
            for x in layer_map_info.historic_layers
            if Lsn(x.lsn_start) <= lease_lsn
        )

    def trigger_gc_and_select(
        env: NeonEnv,
        ep_static: Endpoint,
        lease_lsn: Lsn,
        ctx: str,
        offset: None | LogCursor = None,
    ) -> LogCursor:
        """
        Trigger GC manually on all pageservers, then run a `SELECT` query.
        """
        for shard, ps in tenant_get_shards(env, env.initial_tenant):
            client = ps.http_client()
            layers_guarded_before_gc = get_layers_protected_by_lease(
                client, shard, env.initial_timeline, lease_lsn=lease_lsn
            )
            gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
            layers_guarded_after_gc = get_layers_protected_by_lease(
                client, shard, env.initial_timeline, lease_lsn=lease_lsn
            )
            # Note: cannot assert on `layers_removed` here because the removed layers
            # could be ones not guarded by the lease. Instead, compare layer map dumps.
            assert layers_guarded_before_gc.issubset(layers_guarded_after_gc), (
                "Layers guarded by lease before GC should not be removed"
            )
            log.info(f"{gc_result=}")

        # Wait for lease renewal before running the query.
        _, offset = wait_until(
            lambda: ep_static.assert_log_contains(
                "lsn_lease_bg_task.*Request succeeded", offset=offset
            ),
        )

        with ep_static.cursor() as cur:
            # The following query should succeed if pages are properly guarded by leases.
            cur.execute("SELECT count(*) FROM t0")
            assert cur.fetchone() == (ROW_COUNT,)
        log.info(f"`SELECT` query succeeded after GC, {ctx=}")
        return offset

    # Insert some records on main branch
    with env.endpoints.create_start("main", config_lines=["shared_buffers=1MB"]) as ep_main:
        with ep_main.cursor() as cur:
            cur.execute("CREATE TABLE t0(v0 int primary key, v1 text)")
        lsn = Lsn(0)
        for i in range(2):
            lsn = generate_updates_on_main(env, ep_main, i)

        # Round down to the closest LSN on page boundary (unnormalized).
        XLOG_BLCKSZ = 8192
        lsn = Lsn((int(lsn) // XLOG_BLCKSZ) * XLOG_BLCKSZ)

        with env.endpoints.create_start(
            branch_name="main",
            endpoint_id="static",
            lsn=lsn,
        ) as ep_static:
            with ep_static.cursor() as cur:
                cur.execute("SELECT count(*) FROM t0")
                assert cur.fetchone() == (ROW_COUNT,)

            # Wait for static compute to renew lease at least once.
            time.sleep(LSN_LEASE_LENGTH / 2)

            generate_updates_on_main(env, ep_main, 3, end=100)
            offset = trigger_gc_and_select(
                env, ep_static, lease_lsn=lsn, ctx="Before pageservers restart"
            )

            # Trigger pageserver restarts
            for ps in env.pageservers:
                ps.stop()
                # The static compute should see at least one lease request failure
                # while the pageserver is down.
                time.sleep(LSN_LEASE_LENGTH / 2)
                ps.start()

            trigger_gc_and_select(
                env,
                ep_static,
                lease_lsn=lsn,
                ctx="After pageservers restart",
                offset=offset,
            )

            # Reconfigure pageservers
            env.pageservers[0].stop()
            env.storage_controller.node_configure(
                env.pageservers[0].id, {"availability": "Offline"}
            )
            env.storage_controller.reconcile_until_idle()

            trigger_gc_and_select(
                env,
                ep_static,
                lease_lsn=lsn,
                ctx="After putting pageserver 0 offline",
                offset=offset,
            )

        # Do some update so we can increment gc_cutoff
        generate_updates_on_main(env, ep_main, i, end=100)

    # Wait for the existing lease to expire.
    time.sleep(LSN_LEASE_LENGTH + 1)

    # Now trigger GC again, layers should be removed.
    for shard, ps in tenant_get_shards(env, env.initial_tenant):
        client = ps.http_client()
        gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
        log.info(f"{gc_result=}")

        assert gc_result["layers_removed"] > 0, "Old layers should be removed after leases expired."


# Similar test, but with more data, and we force checkpoints
def test_timetravel(neon_simple_env: NeonEnv):
    env = neon_simple_env
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline
    client = env.pageserver.http_client()
    endpoint = env.endpoints.create_start("main")

    lsns = []

    with endpoint.cursor() as cur:
        cur.execute(
            """
            CREATE TABLE testtab(id serial primary key, iteration int, data text);
            INSERT INTO testtab (iteration, data) SELECT 0, 'data' FROM generate_series(1, 100000);
            """
        )
        current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
    lsns.append((0, current_lsn))

    for i in range(1, 5):
        with endpoint.cursor() as cur:
            cur.execute(f"UPDATE testtab SET iteration = {i}")
            current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
        lsns.append((i, current_lsn))

        # wait until pageserver receives that data
        wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)

        # run checkpoint manually to force a new layer file
        client.timeline_checkpoint(tenant_id, timeline_id)

    ##### Restart pageserver
    env.endpoints.stop_all()
    env.pageserver.stop()
    env.pageserver.start()

    for i, lsn in lsns:
        endpoint_old = env.endpoints.create_start(
            branch_name="main", endpoint_id=f"ep-old_lsn_{i}", lsn=lsn
        )
        with endpoint_old.cursor() as cur:
            assert query_scalar(cur, f"select count(*) from testtab where iteration={i}") == 100000
            assert query_scalar(cur, f"select count(*) from testtab where iteration<>{i}") == 0