Add test for SLRU download (#7377)

Before PR #7377, on-demand SLRU download always used the basebackup's
LSN in the SLRU download, but that LSN might get garbage-collected away
in the pageserver. We should request the latest LSN, like with GetPage
requests, with the LSN just indicating that we know that the page hasn't
been changed since the LSN (since the basebackup in this case).

Add test to demonstrate the problem. Without the fix, it fails with
"tried to request a page version that was garbage collected" error from
the pageserver.

I wrote this test as part of earlier PR #6693, but that fell through
the cracks and was never applied. PR #7377 superseded the fix from
that older PR, but the test is still valid.
This commit is contained in:
Heikki Linnakangas
2024-04-25 19:45:48 +03:00
committed by Heikki Linnakangas
parent a2a44ea213
commit 0397427dcf

View File

@@ -0,0 +1,131 @@
from typing import Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, tenant_get_shards
from fixtures.types import Lsn
from fixtures.utils import query_scalar
#
# Test on-demand download of the pg_xact SLRUs
#
@pytest.mark.parametrize("shard_count", [None, 4])
def test_ondemand_download_pg_xact(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
tenant_conf = {
"lazy_slru_download": "true",
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
}
env = neon_env_builder.init_start(
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
)
timeline_id = env.initial_timeline
tenant_id = env.initial_tenant
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE EXTENSION neon_test_utils")
# Create a test table
cur.execute("CREATE TABLE clogtest (id integer)")
cur.execute("INSERT INTO clogtest VALUES (1)")
# Consume a lot of XIDs, to create more pg_xact segments
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (2)")
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (2)")
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (3)")
# Restart postgres. After restart, the new instance will download the
# pg_xact segments lazily.
endpoint.stop()
endpoint.start()
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Consume more WAL, so that the pageserver can compact and GC older data,
# including the LSN that we started the new endpoint at,
cur.execute("CREATE TABLE anothertable (i int, t text)")
cur.execute(
"INSERT INTO anothertable SELECT g, 'long string to consume some space' || g FROM generate_series(1, 10000) g"
)
# Run GC
shards = tenant_get_shards(env, tenant_id, None)
for tenant_shard_id, pageserver in shards:
client = pageserver.http_client()
client.timeline_checkpoint(tenant_shard_id, timeline_id)
client.timeline_compact(tenant_shard_id, timeline_id)
client.timeline_gc(tenant_shard_id, timeline_id, 0)
# Test that this can still on-demand download the old pg_xact segments
cur.execute("select xmin, xmax, * from clogtest")
tup = cur.fetchall()
log.info(f"tuples = {tup}")
@pytest.mark.parametrize("shard_count", [None, 4])
def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
tenant_conf = {
"lazy_slru_download": "true",
}
env = neon_env_builder.init_start(
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
)
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE EXTENSION neon_test_utils")
# Create a test table
cur.execute("CREATE TABLE clogtest (id integer)")
cur.execute("INSERT INTO clogtest VALUES (1)")
# Consume a lot of XIDs, to create more pg_xact segments
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
# Open a new connection and insert another row, but leave
# the transaction open
pg_conn2 = endpoint.connect()
cur2 = pg_conn2.cursor()
cur2.execute("BEGIN")
cur2.execute("INSERT INTO clogtest VALUES (2)")
# Another insert on the first connection, which is committed.
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (3)")
# Start standby at this point in time
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
endpoint_at_lsn = env.endpoints.create_start(
branch_name="main", endpoint_id="ep-at-lsn", lsn=lsn
)
# Commit transaction 2, after the standby was launched.
cur2.execute("COMMIT")
# The replica should not see transaction 2 as committed.
conn_replica = endpoint_at_lsn.connect()
cur_replica = conn_replica.cursor()
cur_replica.execute("SELECT * FROM clogtest")
assert cur_replica.fetchall() == [(1,), (3,)]