diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index c7f84d7bad..4cc7c7feb5 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -425,7 +425,7 @@ pub fn save_decoded_record( let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); save_xact_record(timeline, lsn, &parsed_xact, decoded)?; // Remove twophase file. see RemoveTwoPhaseFile() in postgres code - info!( + trace!( "unlink twophaseFile for xid {} parsed_xact.xid {} here at {}", decoded.xl_xid, parsed_xact.xid, lsn ); diff --git a/test_runner/batch_others/test_twophase.py b/test_runner/batch_others/test_twophase.py index ab1cdf3001..1d572c5992 100644 --- a/test_runner/batch_others/test_twophase.py +++ b/test_runner/batch_others/test_twophase.py @@ -1,3 +1,5 @@ +import os + from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver @@ -28,24 +30,59 @@ def test_twophase(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFa cur.execute("INSERT INTO foo VALUES ('two')") cur.execute("PREPARE TRANSACTION 'insert_two'") + # Prepare a transaction that will insert a row + cur.execute('BEGIN') + cur.execute("INSERT INTO foo VALUES ('three')") + cur.execute("PREPARE TRANSACTION 'insert_three'") + + # Prepare another transaction that will insert a row + cur.execute('BEGIN') + cur.execute("INSERT INTO foo VALUES ('four')") + cur.execute("PREPARE TRANSACTION 'insert_four'") + + # On checkpoint state data copied to files in + # pg_twophase directory and fsynced + cur.execute('CHECKPOINT') + + twophase_files = os.listdir(pg.pg_twophase_dir_path()) + print(twophase_files) + assert len(twophase_files) == 4 + + cur.execute("COMMIT PREPARED 'insert_three'") + cur.execute("ROLLBACK PREPARED 'insert_four'") + cur.execute('CHECKPOINT') + + twophase_files = os.listdir(pg.pg_twophase_dir_path()) + print(twophase_files) + assert len(twophase_files) == 2 + # Create a branch with the transaction in prepared state zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase"]) - pg2 = postgres.create_start( + # Create compute node, but don't start. + # We want to observe pgdata before postgres starts + pg2 = postgres.create( 'test_twophase_prepared', config_lines=['max_prepared_transactions=5'], ) + + # Check that we restored only needed twophase files + twophase_files2 = os.listdir(pg2.pg_twophase_dir_path()) + print(twophase_files2) + assert twophase_files2.sort() == twophase_files.sort() + + pg2 = pg2.start() conn2 = pg2.connect() cur2 = conn2.cursor() - # On the new branch, commit one of the prepared transactions, abort the other one. + # On the new branch, commit one of the prepared transactions, + # abort the other one. cur2.execute("COMMIT PREPARED 'insert_one'") cur2.execute("ROLLBACK PREPARED 'insert_two'") cur2.execute('SELECT * FROM foo') - assert cur2.fetchall() == [('one', )] + assert cur2.fetchall() == [('one',), ('three',)] - # Neither insert is visible on the original branch, the transactions are still - # in prepared state there. + # Only one committed insert is visible on the original branch cur.execute('SELECT * FROM foo') - assert cur.fetchall() == [] + assert cur.fetchall() == [('three',)] diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 2a1081af8f..afb350bfea 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -309,6 +309,13 @@ class Postgres(PgProtocol): path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_xact' return os.path.join(self.repo_dir, path) + def pg_twophase_dir_path(self) -> str: + """ Path to pg_twophase dir """ + print(self.tenant_id) + print(self.branch) + path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_twophase' + return os.path.join(self.repo_dir, path) + def config_file_path(self) -> str: """ Path to postgresql.conf """ filename = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'postgresql.conf'