Files
neon/test_runner/regress/test_twophase.py
Heikki Linnakangas 9a32aa828d Fix init of WAL page header at startup (#8914)
If the primary is started at an LSN within the first of a 16 MB WAL
segment, the "long XLOG page header" at the beginning of the segment was
not initialized correctly. That has gone unnnoticed, because under
normal circumstances, nothing looks at the page header. The WAL that is
streamed to the safekeepers starts at the new record's LSN, not at the
beginning of the page, so that bogus page header didn't propagate
elsewhere, and a primary server doesn't normally read the WAL its
written. Which is good because the contents of the page would be bogus
anyway, as it wouldn't contain any of the records before the LSN where
the new record is written.

Except that in the following cases a primary does read its own WAL:

1. When there are two-phase transactions in prepared state at
checkpoint. The checkpointer reads the two-phase state from the
XLOG_XACT_PREPARE record, and writes it to a file in pg_twophase/.

2. Logical decoding reads the WAL starting from the replication slot's
restart LSN.

This PR fixes the problem with two-phase transactions. For that, it's
sufficient to initialize the page header correctly. The checkpointer
only needs to read XLOG_XACT_PREPARE records that were generated after
the server startup, so it's still OK that older WAL is missing / bogus.

I have not investigated if we have a problem with logical decoding,
however. Let's deal with that separately.

Special thanks to @Lzjing-1997, who independently found the same bug
and opened a PR to fix it, although I did not use that PR.
2024-09-21 04:00:38 +03:00

164 lines
5.2 KiB
Python

import os
from pathlib import Path
from fixtures.common_types import TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
PgBin,
fork_at_current_lsn,
import_timeline_from_vanilla_postgres,
wait_for_wal_insert_lsn,
)
#
# Test branching, when a transaction is in prepared state
#
def twophase_test_on_timeline(env: NeonEnv):
endpoint = env.endpoints.create_start(
"test_twophase", config_lines=["max_prepared_transactions=5"]
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("CREATE TABLE foo (t text)")
# Prepare a transaction that will insert a row
cur.execute("BEGIN")
cur.execute("INSERT INTO foo VALUES ('one')")
cur.execute("PREPARE TRANSACTION 'insert_one'")
# Prepare another transaction that will insert a row
cur.execute("BEGIN")
cur.execute("INSERT INTO foo VALUES ('two')")
cur.execute("PREPARE TRANSACTION 'insert_two'")
# Prepare a transaction that will insert a row
cur.execute("BEGIN")
cur.execute("INSERT INTO foo VALUES ('three')")
cur.execute("PREPARE TRANSACTION 'insert_three'")
# Prepare another transaction that will insert a row
cur.execute("BEGIN")
cur.execute("INSERT INTO foo VALUES ('four')")
cur.execute("PREPARE TRANSACTION 'insert_four'")
# On checkpoint state data copied to files in
# pg_twophase directory and fsynced
cur.execute("CHECKPOINT")
twophase_files = os.listdir(endpoint.pg_twophase_dir_path())
log.info(twophase_files)
assert len(twophase_files) == 4
cur.execute("COMMIT PREPARED 'insert_three'")
cur.execute("ROLLBACK PREPARED 'insert_four'")
cur.execute("CHECKPOINT")
twophase_files = os.listdir(endpoint.pg_twophase_dir_path())
log.info(twophase_files)
assert len(twophase_files) == 2
# Create a branch with the transaction in prepared state
fork_at_current_lsn(env, endpoint, "test_twophase_prepared", "test_twophase")
# Start compute on the new branch
endpoint2 = env.endpoints.create_start(
"test_twophase_prepared",
config_lines=["max_prepared_transactions=5"],
)
# Check that we restored only needed twophase files
twophase_files2 = os.listdir(endpoint2.pg_twophase_dir_path())
log.info(twophase_files2)
assert twophase_files2.sort() == twophase_files.sort()
conn2 = endpoint2.connect()
cur2 = conn2.cursor()
# On the new branch, commit one of the prepared transactions,
# abort the other one.
cur2.execute("COMMIT PREPARED 'insert_one'")
cur2.execute("ROLLBACK PREPARED 'insert_two'")
cur2.execute("SELECT * FROM foo")
assert cur2.fetchall() == [("one",), ("three",)]
# Only one committed insert is visible on the original branch
cur.execute("SELECT * FROM foo")
assert cur.fetchall() == [("three",)]
def test_twophase(neon_simple_env: NeonEnv):
"""
Test branching, when a transaction is in prepared state
"""
env = neon_simple_env
env.neon_cli.create_branch("test_twophase")
twophase_test_on_timeline(env)
def test_twophase_nonzero_epoch(
neon_simple_env: NeonEnv,
test_output_dir: Path,
pg_bin: PgBin,
vanilla_pg,
):
"""
Same as 'test_twophase' test, but with a non-zero XID epoch, i.e. after 4 billion XIDs
have been consumed. (This is to ensure that we correctly use the full 64-bit XIDs in
pg_twophase filenames with PostgreSQL v17.)
"""
env = neon_simple_env
# Reset the vanilla Postgres instance with a higher XID epoch
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
cmd = [pg_resetwal_path, "--epoch=1000000000", "-D", str(vanilla_pg.pgdatadir)]
pg_bin.run_capture(cmd)
timeline_id = TimelineId.generate()
# Import the cluster to Neon
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
import_timeline_from_vanilla_postgres(
test_output_dir,
env,
pg_bin,
env.initial_tenant,
timeline_id,
"test_twophase",
vanilla_pg.connstr(),
)
vanilla_pg.stop() # don't need the original server anymore
twophase_test_on_timeline(env)
def test_twophase_at_wal_segment_start(neon_simple_env: NeonEnv):
"""
Same as 'test_twophase' test, but the server is started at an LSN at the beginning
of a WAL segment. We had a bug where we didn't initialize the "long XLOG page header"
at the beginning of the segment correctly, which was detected when the checkpointer
tried to read the XLOG_XACT_PREPARE record from the WAL, if that record was on the
very first page of a WAL segment and the server was started up at that first page.
"""
env = neon_simple_env
timeline_id = env.neon_cli.create_branch("test_twophase", "main")
endpoint = env.endpoints.create_start(
"test_twophase", config_lines=["max_prepared_transactions=5"]
)
endpoint.safe_psql("SELECT pg_switch_wal()")
# to avoid hitting https://github.com/neondatabase/neon/issues/9079, wait for the
# WAL to reach the pageserver.
wait_for_wal_insert_lsn(env, endpoint, env.initial_tenant, timeline_id)
endpoint.stop_and_destroy()
twophase_test_on_timeline(env)