diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 3bcaf5617c..2a0ef37f4a 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -351,19 +351,16 @@ impl PostgresRedoManager { if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT || parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED { - transaction_id_set_status( - parsed_xact.xid, - pg_constants::TRANSACTION_STATUS_COMMITTED, - &mut page, - ); - for subxact in &parsed_xact.subxacts { - let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + // Iterate the main XID, followed by all the subxids. + for xid in std::iter::once(&parsed_xact.xid).chain(parsed_xact.subxacts.iter()) + { + let pageno = *xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE; let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; // only update xids on the requested page if rec_segno == segno && blknum == rpageno { transaction_id_set_status( - *subxact, + *xid, pg_constants::TRANSACTION_STATUS_COMMITTED, &mut page, ); @@ -372,19 +369,16 @@ impl PostgresRedoManager { } else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT || parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED { - transaction_id_set_status( - parsed_xact.xid, - pg_constants::TRANSACTION_STATUS_ABORTED, - &mut page, - ); - for subxact in &parsed_xact.subxacts { - let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + // Iterate the main XID, followed by all the subxids. + for xid in std::iter::once(&parsed_xact.xid).chain(parsed_xact.subxacts.iter()) + { + let pageno = *xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE; let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; // only update xids on the requested page if rec_segno == segno && blknum == rpageno { transaction_id_set_status( - *subxact, + *xid, pg_constants::TRANSACTION_STATUS_ABORTED, &mut page, ); diff --git a/test_runner/batch_others/test_subxacts.py b/test_runner/batch_others/test_subxacts.py new file mode 100644 index 0000000000..562c4786c5 --- /dev/null +++ b/test_runner/batch_others/test_subxacts.py @@ -0,0 +1,41 @@ +from fixtures.zenith_fixtures import ZenithEnv, check_restored_datadir_content +from fixtures.log_helper import log + +pytest_plugins = ("fixtures.zenith_fixtures") + + +# Test subtransactions +# +# The pg_subxact SLRU is not preserved on restarts, and doesn't need to be +# maintained in the pageserver, so subtransactions are not very exciting for +# Zenith. They are included in the commit record though and updated in the +# CLOG. +def test_subxacts(zenith_simple_env: ZenithEnv, test_output_dir): + env = zenith_simple_env + # Create a branch for us + env.zenith_cli(["branch", "test_subxacts", "empty"]) + pg = env.postgres.create_start('test_subxacts') + + log.info("postgres is running on 'test_subxacts' branch") + pg_conn = pg.connect() + cur = pg_conn.cursor() + + cur.execute(''' + CREATE TABLE t1(i int, j int); + ''') + + cur.execute('select pg_switch_wal();') + + # Issue 100 transactions, with 1000 subtransactions in each. + for i in range(100): + cur.execute('begin') + for j in range(1000): + cur.execute(f'savepoint sp{j}') + cur.execute(f'insert into t1 values ({i}, {j})') + cur.execute('commit') + + # force wal flush + cur.execute('checkpoint') + + # Check that we can restore the content of the datadir correctly + check_restored_datadir_content(test_output_dir, env, pg)