Add test restarting compute at WAL page boundary (#10111)

## Problem

We've had similar test in test_logical_replication, but then removed it
because it wasn't needed to trigger LR related bug. Restarting at WAL
page boundary is still a useful test, so add it separately back.

## Summary of changes

Add the test.
This commit is contained in:
Arseny Sher
2024-12-16 17:53:04 +03:00
committed by GitHub
parent 1ed0e52bc8
commit c5e3314c6e

View File

@@ -1090,6 +1090,62 @@ def test_restart_endpoint_after_switch_wal(neon_env_builder: NeonEnvBuilder):
endpoint.safe_psql("SELECT 'works'")
# Test restarting compute at WAL page boundary.
def test_restart_endpoint_wal_page_boundary(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
ep = env.endpoints.create_start("main")
ep.safe_psql("create table t (i int)")
with ep.cursor() as cur:
# measure how much space logical message takes. Sometimes first attempt
# creates huge message and then it stabilizes, have no idea why.
for _ in range(3):
lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"current_lsn={lsn_before}")
# Non-transactional logical message doesn't write WAL, only XLogInsert's
# it, so use transactional. Which is a bit problematic as transactional
# necessitates commit record. Alternatively we can do smth like
# select neon_xlogflush(pg_current_wal_insert_lsn());
# but isn't much better + that particular call complains on 'xlog flush
# request 0/282C018 is not satisfied' as pg_current_wal_insert_lsn skips
# page headers.
payload = "blahblah"
cur.execute(f"select pg_logical_emit_message(true, 'pref', '{payload}')")
lsn_after_by_curr_wal_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
lsn_diff = lsn_after_by_curr_wal_lsn - lsn_before
logical_message_base = lsn_after_by_curr_wal_lsn - lsn_before - len(payload)
log.info(
f"before {lsn_before}, after {lsn_after_by_curr_wal_lsn}, lsn diff is {lsn_diff}, base {logical_message_base}"
)
# and write logical message spanning exactly as we want
lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"current_lsn={lsn_before}")
curr_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
offs = int(curr_lsn) % 8192
till_page = 8192 - offs
target_lsn = curr_lsn + till_page
payload_len = (
till_page - logical_message_base - 8
) # not sure why 8 is here, it is deduced from experiments
log.info(
f"current_lsn={curr_lsn}, offs {offs}, till_page {till_page}, target_lsn {target_lsn}"
)
cur.execute(f"select pg_logical_emit_message(true, 'pref', 'f{'a' * payload_len}')")
supposedly_contrecord_end = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"supposedly_page_boundary={supposedly_contrecord_end}")
# The calculations to hit the page boundary are very fuzzy, so just
# ignore test if we fail to reach it.
if not (int(supposedly_contrecord_end) % 8192 == 0):
pytest.skip(f"missed page boundary, bad luck: lsn is {supposedly_contrecord_end}")
ep.stop(mode="immediate")
ep = env.endpoints.create_start("main")
ep.safe_psql("insert into t values (42)") # should be ok
# Context manager which logs passed time on exit.
class DurationLogger:
def __init__(self, desc):