mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
Move RestoreRunningXactsFromClog() to neon extension
Also change how it works on overflow: instead of continuing with the incomplete information, and risking incorrect query results, bail out and wait for running-xacts record to arrive instead.
This commit is contained in:
@@ -81,10 +81,145 @@ neon_rm_redo(XLogReaderState *record)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* FIXME: This is an odd place for RestoreRunningXactsFromClog(). It's
|
||||
* handy to call it from neon_rm_startup(). Move this somewhere else
|
||||
* in pgxn/.
|
||||
*/
|
||||
|
||||
#include "access/clog.h"
|
||||
#include "access/transam.h"
|
||||
#include "access/twophase.h"
|
||||
#include "common/controldata_utils.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/procarray.h"
|
||||
#include "storage/standby.h"
|
||||
#include "catalog/pg_control.h"
|
||||
|
||||
/*
|
||||
* This is taken from procarry.c
|
||||
* TODO: should we better move it to some header file?
|
||||
*/
|
||||
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
|
||||
#define TOTAL_MAX_CACHED_SUBXIDS \
|
||||
((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
|
||||
|
||||
/*
|
||||
* We do not want to wait running-xacts record from primary which is
|
||||
* generated each 15 seconds (if there is some activity). So we try
|
||||
* to restore information about running transactions from CLOG.
|
||||
*/
|
||||
static void
|
||||
RestoreRunningXactsFromClog(void)
|
||||
{
|
||||
TransactionId from;
|
||||
TransactionId till;
|
||||
int xcnt;
|
||||
TransactionId *xids;
|
||||
RunningTransactionsData running;
|
||||
TransactionId latestCompletedXid;
|
||||
int max_xcnt;
|
||||
bool overflowed;
|
||||
|
||||
{
|
||||
ControlFileData *ControlFile;
|
||||
bool crc_ok;
|
||||
|
||||
LWLockAcquire(ControlFileLock, LW_SHARED);
|
||||
ControlFile = get_controlfile(DataDir, &crc_ok);
|
||||
from = ControlFile->checkPointCopy.oldestActiveXid;
|
||||
till = XidFromFullTransactionId(ShmemVariableCache->nextXid);
|
||||
LWLockRelease(ControlFileLock);
|
||||
pfree(ControlFile);
|
||||
}
|
||||
|
||||
if (!TransactionIdIsNormal(from))
|
||||
{
|
||||
/*
|
||||
* No checkpoint or running-xacts record was written,
|
||||
* so use most conservative approximation for oldestActiveXid: firstNormalTransactionId.
|
||||
* There are should not be problems with wraparounf because it is not possible that
|
||||
* XID is overflown without writting any checkpoint or running-xact record.
|
||||
*/
|
||||
from = FirstNormalTransactionId;
|
||||
}
|
||||
|
||||
/*
|
||||
* To avoid "too many KnownAssignedXids" error later during replay, we
|
||||
* limit number of reported transactions. This is a tradeoff: if we are
|
||||
* willing to consume more of the KnownAssignedXids space for the XIDs
|
||||
* now, that allows us to start up, but we might run out of space later.
|
||||
*
|
||||
* XXX: What would be the safe limit that would guarantee that we won't
|
||||
* run out of space later? And how much above that are we willing to
|
||||
* "overbook" hoping that we won't need the space later?
|
||||
*/
|
||||
max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2;
|
||||
|
||||
xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId));
|
||||
xcnt = 0;
|
||||
for (TransactionId xid = from; TransactionIdPrecedes(xid, till);)
|
||||
{
|
||||
XLogRecPtr xidlsn;
|
||||
XidStatus xidstatus = TransactionIdGetStatus(xid, &xidlsn);
|
||||
|
||||
if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS)
|
||||
{
|
||||
if (xcnt < max_xcnt)
|
||||
xids[xcnt] = xid;
|
||||
{
|
||||
/*
|
||||
* Overflowed. We won't be able to install the
|
||||
* RunningTransactions snapshot. We could bail out now, but
|
||||
* keep going to count the XIDs, for the sake of the LOG
|
||||
* message at the end of the function.
|
||||
*/
|
||||
}
|
||||
xcnt++;
|
||||
}
|
||||
TransactionIdAdvance(xid);
|
||||
}
|
||||
|
||||
/*
|
||||
* Construct a RunningTransactions snapshot with the XIDs scanned from
|
||||
* CLOG.
|
||||
*
|
||||
* TODO: test that this works right with prepared transactions.
|
||||
*/
|
||||
if (xcnt <= max_xcnt)
|
||||
{
|
||||
running.xcnt = xcnt;
|
||||
running.subxcnt = 0;
|
||||
running.subxid_overflow = false;
|
||||
running.nextXid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
|
||||
running.oldestRunningXid = from;
|
||||
latestCompletedXid = till;
|
||||
TransactionIdRetreat(latestCompletedXid);
|
||||
Assert(TransactionIdIsNormal(latestCompletedXid));
|
||||
running.latestCompletedXid = latestCompletedXid;
|
||||
running.xids = xids;
|
||||
|
||||
ProcArrayApplyRecoveryInfo(&running);
|
||||
|
||||
StandbyRecoverPreparedTransactions();
|
||||
|
||||
elog(LOG, "initialized known-assigned XIDs with %d in-progress XIDs between %u and %u (max %d)",
|
||||
xcnt, from, till, max_xcnt);
|
||||
}
|
||||
else
|
||||
elog(LOG, "could not initialize known-assigned XIDs because there are too many (sub)transactions active (%d in-progress XIDs between %u and %u, max %d)",
|
||||
xcnt, from, till, max_xcnt);
|
||||
|
||||
pfree(xids);
|
||||
}
|
||||
|
||||
static void
|
||||
neon_rm_startup(void)
|
||||
{
|
||||
/* nothing to do here */
|
||||
if (standbyState == STANDBY_INITIALIZED)
|
||||
RestoreRunningXactsFromClog();
|
||||
else
|
||||
elog(LOG, "neon_rm_startup called with standbyState=%d", standbyState);
|
||||
}
|
||||
|
||||
static void
|
||||
|
||||
@@ -163,12 +163,12 @@ def test_replication_start_subxid_overflow3(neon_simple_env: NeonEnv):
|
||||
n_connections = max_connections - 2
|
||||
n_subxids = 100
|
||||
|
||||
# Start one top tranaction in primary, with lots of subtransactions. This fills up the
|
||||
# known-assigned XIDs space in the standby.
|
||||
# Start one top tranaction in primary, with lots of subtransactions. This uses up much the
|
||||
# known-assigned XIDs space in the standby, but doesn't cause it to overflow.
|
||||
large_p_conn = primary.connect()
|
||||
large_p_cur = large_p_conn.cursor()
|
||||
large_p_cur.execute("begin")
|
||||
large_p_cur.execute("select create_subxacts(20000)")
|
||||
large_p_cur.execute("select create_subxacts(2000)")
|
||||
|
||||
# Create a replica at this LSN
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 0d30e28f74...05d21e2b41
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 74fb144890...2712558896
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 3c2b9d576c...c83756da52
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"v16": ["16.3", "3c2b9d576c580e0b5b7108001f959b8c5b42e0a2"],
|
||||
"v15": ["15.7", "74fb144890c4f955db1ef50ee1eeb9d8a6c2f69d"],
|
||||
"v14": ["14.12", "0d30e28f74f49fe6a27a6bd45dcfeb1060656b8f"]
|
||||
"v16": ["16.3", "c83756da5260c784f68ca21c037c7e3fd4ca87c4"],
|
||||
"v15": ["15.7", "2712558896d95fcf27963da6c64a5f844d0621fa"],
|
||||
"v14": ["14.12", "05d21e2b4167245102eddee01a158264fb2eddfe"]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user