diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index e4968bdf89..3197a7e715 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -46,6 +46,21 @@ void _PG_init(void); static int logical_replication_max_snap_files = 300; +static int running_xacts_overflow_policy; + +enum RunningXactsOverflowPolicies { + OP_IGNORE, + OP_SKIP, + OP_WAIT +}; + +static const struct config_enum_entry running_xacts_overflow_policies[] = { + {"ignore", OP_IGNORE, false}, + {"skip", OP_SKIP, false}, + {"wait", OP_WAIT, false}, + {NULL, 0, false} +}; + static void InitLogicalReplicationMonitor(void) { @@ -414,6 +429,7 @@ RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *n restored_xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId)); n_restored_xids = 0; next_prepared_idx = 0; + for (TransactionId xid = from; xid != till;) { XLogRecPtr xidlsn; @@ -424,7 +440,7 @@ RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *n /* * "Merge" the prepared transactions into the restored_xids array as * we go. The prepared transactions array is sorted. This is mostly - * a sanity check to ensure that all the prpeared transactions are + * a sanity check to ensure that all the prepared transactions are * seen as in-progress. (There is a check after the loop that we didn't * miss any.) */ @@ -522,14 +538,23 @@ RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *n elog(LOG, "too many running xacts to restore from the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u", checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid)); - goto fail; + + switch (running_xacts_overflow_policy) + { + case OP_WAIT: + goto fail; + case OP_IGNORE: + goto success; + case OP_SKIP: + n_restored_xids = 0; + goto success; + } } restored_xids[n_restored_xids++] = xid; skip: TransactionIdAdvance(xid); - continue; } /* sanity check */ @@ -540,11 +565,13 @@ RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *n Assert(false); goto fail; } - + success: elog(LOG, "restored %d running xacts by scanning the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u", n_restored_xids, checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid)); *nxids = n_restored_xids; *xids = restored_xids; + if (prepared_xids) + pfree(prepared_xids); return true; fail: @@ -581,6 +608,18 @@ _PG_init(void) restore_running_xacts_callback = RestoreRunningXactsFromClog; + + DefineCustomEnumVariable( + "neon.running_xacts_overflow_policy", + "Action performed on snapshot overflow when restoring runnings xacts from CLOG", + NULL, + &running_xacts_overflow_policy, + OP_IGNORE, + running_xacts_overflow_policies, + PGC_POSTMASTER, + 0, + NULL, NULL, NULL); + /* * Important: This must happen after other parts of the extension are * loaded, otherwise any settings to GUCs that were set before the diff --git a/test_runner/regress/test_replica_start.py b/test_runner/regress/test_replica_start.py index 17d476a8a6..0d95109d6b 100644 --- a/test_runner/regress/test_replica_start.py +++ b/test_runner/regress/test_replica_start.py @@ -210,7 +210,11 @@ def test_replica_start_wait_subxids_finish(neon_simple_env: NeonEnv): # Start it in a separate thread, so that we can do other stuff while it's # blocked waiting for the startup to finish. wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) - secondary = env.endpoints.new_replica(origin=primary, endpoint_id="secondary") + secondary = env.endpoints.new_replica( + origin=primary, + endpoint_id="secondary", + config_lines=["neon.running_xacts_overflow_policy='wait'"], + ) start_secondary_thread = threading.Thread(target=secondary.start) start_secondary_thread.start() @@ -644,3 +648,43 @@ def test_replica_start_with_prepared_xacts_with_many_subxacts(neon_simple_env: N wait_replica_caughtup(primary, secondary) secondary_cur.execute("select count(*) from t") assert secondary_cur.fetchone() == (200001,) + + +def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv): + """ + Test the CLOG-scanning mechanism at hot standby startup in the presence of + large number of unsued XIDs, caused by XID alignment and frequent primary restarts + """ + n_restarts = 50 + + # Initialize the primary and a test table + env = neon_simple_env + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + with primary.cursor() as primary_cur: + primary_cur.execute("create table t(pk serial primary key, payload integer)") + + for _ in range(n_restarts): + with primary.cursor() as primary_cur: + primary_cur.execute("insert into t (payload) values (0)") + # restart primary + primary.stop("immediate") + primary.start() + + # Wait for the WAL to be flushed + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + + # stop primary to check that we can start replica without it + primary.stop(mode="immediate") + + # Create a replica. It should start up normally, because of ignore policy + # mechanism. + secondary = env.endpoints.new_replica_start( + origin=primary, + endpoint_id="secondary", + config_lines=["neon.running_xacts_overflow_policy='ignore'"], + ) + + # Check that replica see all changes + with secondary.cursor() as secondary_cur: + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (n_restarts,)