Add neon.running_xacts_overflow_policy to make it possible for an RO replica to start up without the primary even in case of running-xacts overflow (#8323)

## Problem

Right now, if there are too many running xacts to be restored from the CLOG
at replica startup, the replica does not try to restore them and instead
waits for a non-overflown running-xacts WAL record from the primary.
But if the primary is not active, the replica will not start at all.

Too many running xacts can be caused by transactions with a large number
of subtransactions (see the sketch at the end of this section).
But right now it can also be caused by two other reasons:
- Lack of a shutdown checkpoint, which updates `oldestRunningXid` (because
of immediate shutdown)
- nextXid alignment on a 1024 boundary (which causes losing ~1k XIDs on
each restart)

Both problems have already been addressed to some extent.
But we have existing customers with a "sparse" CLOG and a lack of
checkpoints.
To be able to start RO replicas for such customers, I suggest adding a GUC
which allows the replica to start even in case of subxacts overflow.
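
For illustration, here is a rough sketch (not from this PR; `psycopg2` and the connection string are placeholder choices) of the kind of workload that produces many in-progress XIDs: every SAVEPOINT that writes gets its own XID, so a transaction with enough open subtransactions can push the number of running XIDs past what the replica is willing to restore from the CLOG (the exact threshold scales with `max_connections`).

```python
import psycopg2  # assumed client library; any PostgreSQL driver works

conn = psycopg2.connect("dbname=postgres")  # placeholder connection string
cur = conn.cursor()
# psycopg2 starts a transaction implicitly on the first statement (autocommit off)
cur.execute("CREATE TABLE IF NOT EXISTS t (payload integer)")
for i in range(100_000):  # each writing subtransaction is assigned its own XID
    cur.execute(f"SAVEPOINT s{i}")
    cur.execute("INSERT INTO t (payload) VALUES (%s)", (i,))
# While this transaction stays open, all of these XIDs count as running; a replica
# starting from a checkpoint taken now can hit the overflow path described above.
```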

## Summary of changes

Add `neon.running_xacts_overflow_policy` with the following values:
- `ignore`: restore the last N XIDs from the CLOG and accept connections
- `skip`: do not restore any XIDs from the CLOG but still accept connections
- `wait`: wait for a non-overflown running-xacts record from the primary node
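
For example, a read-only endpoint can select the relaxed policy through its Postgres config (the GUC is `PGC_POSTMASTER`, so it must be set at server start). The snippet below mirrors the usage added to the regression tests further down; the `env` and `primary` objects come from that test harness:

```python
# Start a replica that restores what it can from the CLOG and accepts connections
# even when the running-xacts snapshot is overflowed.
secondary = env.endpoints.new_replica_start(
    origin=primary,
    endpoint_id="secondary",
    config_lines=["neon.running_xacts_overflow_policy='ignore'"],
)
```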

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? If so, did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>


@@ -46,6 +46,21 @@ void _PG_init(void);
static int logical_replication_max_snap_files = 300;
static int running_xacts_overflow_policy;
enum RunningXactsOverflowPolicies {
OP_IGNORE,
OP_SKIP,
OP_WAIT
};
static const struct config_enum_entry running_xacts_overflow_policies[] = {
{"ignore", OP_IGNORE, false},
{"skip", OP_SKIP, false},
{"wait", OP_WAIT, false},
{NULL, 0, false}
};
static void
InitLogicalReplicationMonitor(void)
{
@@ -414,6 +429,7 @@ RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *n
restored_xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId));
n_restored_xids = 0;
next_prepared_idx = 0;
for (TransactionId xid = from; xid != till;)
{
XLogRecPtr xidlsn;
@@ -424,7 +440,7 @@ RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *n
/*
* "Merge" the prepared transactions into the restored_xids array as
* we go. The prepared transactions array is sorted. This is mostly
-* a sanity check to ensure that all the prpeared transactions are
+* a sanity check to ensure that all the prepared transactions are
* seen as in-progress. (There is a check after the loop that we didn't
* miss any.)
*/
@@ -522,14 +538,23 @@ RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *n
elog(LOG, "too many running xacts to restore from the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
checkpoint->oldestXid, checkpoint->oldestActiveXid,
XidFromFullTransactionId(checkpoint->nextXid));
-goto fail;
+switch (running_xacts_overflow_policy)
+{
+case OP_WAIT:
+goto fail;
+case OP_IGNORE:
+goto success;
+case OP_SKIP:
+n_restored_xids = 0;
+goto success;
+}
}
restored_xids[n_restored_xids++] = xid;
skip:
TransactionIdAdvance(xid);
continue;
}
/* sanity check */
@@ -540,11 +565,13 @@ RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *n
Assert(false);
goto fail;
}
success:
elog(LOG, "restored %d running xacts by scanning the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
n_restored_xids, checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid));
*nxids = n_restored_xids;
*xids = restored_xids;
if (prepared_xids)
pfree(prepared_xids);
return true;
fail:
@@ -581,6 +608,18 @@ _PG_init(void)
restore_running_xacts_callback = RestoreRunningXactsFromClog;
DefineCustomEnumVariable(
"neon.running_xacts_overflow_policy",
"Action performed on snapshot overflow when restoring runnings xacts from CLOG",
NULL,
&running_xacts_overflow_policy,
OP_IGNORE,
running_xacts_overflow_policies,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the


@@ -210,7 +210,11 @@ def test_replica_start_wait_subxids_finish(neon_simple_env: NeonEnv):
# Start it in a separate thread, so that we can do other stuff while it's
# blocked waiting for the startup to finish.
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
-secondary = env.endpoints.new_replica(origin=primary, endpoint_id="secondary")
+secondary = env.endpoints.new_replica(
+origin=primary,
+endpoint_id="secondary",
+config_lines=["neon.running_xacts_overflow_policy='wait'"],
+)
start_secondary_thread = threading.Thread(target=secondary.start)
start_secondary_thread.start()
@@ -644,3 +648,43 @@ def test_replica_start_with_prepared_xacts_with_many_subxacts(neon_simple_env: N
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (200001,)
def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup in the presence of
large number of unsued XIDs, caused by XID alignment and frequent primary restarts
"""
n_restarts = 50
# Initialize the primary and a test table
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
with primary.cursor() as primary_cur:
primary_cur.execute("create table t(pk serial primary key, payload integer)")
for _ in range(n_restarts):
with primary.cursor() as primary_cur:
primary_cur.execute("insert into t (payload) values (0)")
# restart primary
primary.stop("immediate")
primary.start()
# Wait for the WAL to be flushed
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
# stop primary to check that we can start replica without it
primary.stop(mode="immediate")
# Create a replica. It should start up normally, because of ignore policy
# mechanism.
secondary = env.endpoints.new_replica_start(
origin=primary,
endpoint_id="secondary",
config_lines=["neon.running_xacts_overflow_policy='ignore'"],
)
# Check that replica see all changes
with secondary.cursor() as secondary_cur:
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (n_restarts,)