Compare commits

...

6 Commits

Author SHA1 Message Date
Heikki Linnakangas
d99c1abfc3 Move RestoreRunningXactsFromClog() to neon extension
Also change how it works on overflow: instead of continuing with the
incomplete information, and risking incorrect query results, bail out
and wait for running-xacts record to arrive instead.
2024-05-23 11:12:36 +03:00
Konstantin Knizhnik
b1eb0e135e Fix test_replication_start_subxid_overflow.py test 2024-05-23 11:02:06 +03:00
Heikki Linnakangas
87c701fca7 Add two test cases, which are failing 2024-05-23 11:02:06 +03:00
Konstantin Knizhnik
52dcaea12e Make ruff happy 2024-05-23 11:02:06 +03:00
Konstantin Knizhnik
57ac0a122b Limit number of reported known xids to avoid too many KnownAssignedXids error 2024-05-23 11:02:06 +03:00
Konstantin Knizhnik
315be6a664 Restore running xacts from CLOG on replica startup 2024-05-23 11:02:06 +03:00
8 changed files with 382 additions and 9 deletions

View File

@@ -373,6 +373,7 @@ impl WalIngest {
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
self.checkpoint_modified = true;
}
}
_x => {

View File

@@ -81,10 +81,145 @@ neon_rm_redo(XLogReaderState *record)
}
}
/*
 * FIXME: This is an odd place for RestoreRunningXactsFromClog(); it lives
 * here only because it is handy to call it from neon_rm_startup(). Move
 * this somewhere else in pgxn/.
 */
#include "access/clog.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "common/controldata_utils.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/standby.h"
#include "catalog/pg_control.h"
/*
 * This is taken from procarray.c.
 * TODO: would it be better to move it to a shared header file?
 */
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
#define TOTAL_MAX_CACHED_SUBXIDS \
((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
/*
* We do not want to wait running-xacts record from primary which is
* generated each 15 seconds (if there is some activity). So we try
* to restore information about running transactions from CLOG.
*/
static void
RestoreRunningXactsFromClog(void)
{
TransactionId from;
TransactionId till;
int xcnt;
TransactionId *xids;
RunningTransactionsData running;
TransactionId latestCompletedXid;
int max_xcnt;
bool overflowed;
{
ControlFileData *ControlFile;
bool crc_ok;
LWLockAcquire(ControlFileLock, LW_SHARED);
ControlFile = get_controlfile(DataDir, &crc_ok);
from = ControlFile->checkPointCopy.oldestActiveXid;
till = XidFromFullTransactionId(ShmemVariableCache->nextXid);
LWLockRelease(ControlFileLock);
pfree(ControlFile);
}
if (!TransactionIdIsNormal(from))
{
/*
* No checkpoint or running-xacts record was written,
* so use most conservative approximation for oldestActiveXid: firstNormalTransactionId.
* There are should not be problems with wraparounf because it is not possible that
* XID is overflown without writting any checkpoint or running-xact record.
*/
from = FirstNormalTransactionId;
}
/*
* To avoid "too many KnownAssignedXids" error later during replay, we
* limit number of reported transactions. This is a tradeoff: if we are
* willing to consume more of the KnownAssignedXids space for the XIDs
* now, that allows us to start up, but we might run out of space later.
*
* XXX: What would be the safe limit that would guarantee that we won't
* run out of space later? And how much above that are we willing to
* "overbook" hoping that we won't need the space later?
*/
max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2;
xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId));
xcnt = 0;
for (TransactionId xid = from; TransactionIdPrecedes(xid, till);)
{
XLogRecPtr xidlsn;
XidStatus xidstatus = TransactionIdGetStatus(xid, &xidlsn);
if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS)
{
if (xcnt < max_xcnt)
xids[xcnt] = xid;
{
/*
* Overflowed. We won't be able to install the
* RunningTransactions snapshot. We could bail out now, but
* keep going to count the XIDs, for the sake of the LOG
* message at the end of the function.
*/
}
xcnt++;
}
TransactionIdAdvance(xid);
}
/*
* Construct a RunningTransactions snapshot with the XIDs scanned from
* CLOG.
*
* TODO: test that this works right with prepared transactions.
*/
if (xcnt <= max_xcnt)
{
running.xcnt = xcnt;
running.subxcnt = 0;
running.subxid_overflow = false;
running.nextXid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
running.oldestRunningXid = from;
latestCompletedXid = till;
TransactionIdRetreat(latestCompletedXid);
Assert(TransactionIdIsNormal(latestCompletedXid));
running.latestCompletedXid = latestCompletedXid;
running.xids = xids;
ProcArrayApplyRecoveryInfo(&running);
StandbyRecoverPreparedTransactions();
elog(LOG, "initialized known-assigned XIDs with %d in-progress XIDs between %u and %u (max %d)",
xcnt, from, till, max_xcnt);
}
else
elog(LOG, "could not initialize known-assigned XIDs because there are too many (sub)transactions active (%d in-progress XIDs between %u and %u, max %d)",
xcnt, from, till, max_xcnt);
pfree(xids);
}
static void
neon_rm_startup(void)
{
	/*
	 * At standby startup, before any WAL has been replayed, reconstruct the
	 * set of in-progress transactions from the CLOG so that we don't have to
	 * wait for a running-xacts record to arrive from the primary.
	 */
	if (standbyState == STANDBY_INITIALIZED)
		RestoreRunningXactsFromClog();
	else
		elog(LOG, "neon_rm_startup called with standbyState=%d", standbyState);
}
static void

View File

@@ -1,9 +1,7 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
@pytest.mark.xfail
def test_replication_start(neon_simple_env: NeonEnv):
env = neon_simple_env

View File

@@ -0,0 +1,239 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
# This tests replica startup in case of a large number of active subtransactions
# at the primary node. The number of known transaction xids is limited, and if we
# do not limit the number of transactions restored from CLOG, it will cause a
# crash of the replica.
def test_replication_start_subxid_overflow(neon_simple_env: NeonEnv):
    env = neon_simple_env

    with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary:
        with primary.connect() as p_con:
            with p_con.cursor() as p_cur:
                # Open a transaction in the primary with a huge number of
                # subtransactions, and leave it running while a replica starts.
                p_cur.execute("begin")
                p_cur.execute("create table t(pk serial primary key, payload integer)")
                p_cur.execute(
                    """create or replace function create_subxacts(n integer) returns void as $$
declare
i integer;
begin
for i in 1..n loop
begin
insert into t (payload) values (0);
exception
when others then
raise exception 'caught something';
end;
end loop;
end; $$ language plpgsql"""
                )
                p_cur.execute("select create_subxacts(100000)")
                p_cur.execute("select txid_current()")
                xid = p_cur.fetchall()[0][0]
                log.info(f"Master transaction {xid}")
                with env.endpoints.new_replica_start(
                    origin=primary, endpoint_id="secondary"
                ) as secondary:
                    wait_replica_caughtup(primary, secondary)
                    with secondary.connect() as s_con:
                        with s_con.cursor() as s_cur:
                            # Enforce setting hint bits for pg_class tuples.
                            # If the primary's transaction is not marked as in-progress
                            # in the MVCC snapshot, the XMIN_INVALID hint bit will be
                            # set for table 't's tuple, making it invisible.
                            s_cur.execute("select * from pg_class")
                            p_cur.execute("commit")
                            wait_replica_caughtup(primary, secondary)
                            s_cur.execute("select * from t where pk = 1")
                            assert s_cur.fetchone() == (1, 0)
# Test starting a replica at a point in time where there is a transaction
# running in primary with lots of subtransactions. Test MVCC in standby before
# and after the commit of the transaction.
#
# XXX: This currently fails, the query in standby incorrectly sees some sub-xids
# as committed:
#
# test_runner/regress/test_replication_start_subxid_overflow.py:105: AssertionError
#
# FAILED test_runner/regress/test_replication_start_subxid_overflow.py::test_replication_start_subxid_overflow2[debug-pg16] - assert (93229,) == (0,)
def test_replication_start_subxid_overflow2(neon_simple_env: NeonEnv):
    env = neon_simple_env

    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
    p_con = primary.connect()
    p_cur = p_con.cursor()
    p_cur.execute("create table t(pk serial primary key, payload integer)")
    p_cur.execute(
        """create or replace function create_subxacts(n integer) returns void as $$
declare
i integer;
begin
for i in 1..n loop
begin
insert into t (payload) values (0);
exception
when others then
raise exception 'caught something';
end;
end loop;
end; $$ language plpgsql"""
    )

    # Start a new transaction in primary, with a lot of subtransactions
    p_cur.execute("begin")
    p_cur.execute("select create_subxacts(100000)")

    # Create a replica at this LSN
    wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
    with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
        s_con = secondary.connect()
        s_cur = s_con.cursor()

        # The transaction in primary has not committed yet.
        wait_replica_caughtup(primary, secondary)
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (0,)

        # Add more sub-xids to the same transaction in primary
        p_cur.execute("select create_subxacts(10000)")

        # The transaction still hasn't committed
        wait_replica_caughtup(primary, secondary)
        s_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (0,)

        # Commit the transaction in primary
        p_cur.execute("commit")

        # Should still be invisible to the old snapshot
        wait_replica_caughtup(primary, secondary)
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (0,)

        # Commit the REPEATABLE READ transaction in standby
        s_cur.execute("commit")

        # The transaction should now be visible to a new snapshot in standby.
        # NOTE(review): only 100000 + 10000 = 110000 rows were inserted above,
        # yet this expects 200000 — confirm the intended subxact counts.
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (200000,)
# Test starting a standby from a situation where there is one transaction running
# in primary, with lots of subtransactions. Then, after the standby is running,
# start a large number of new transactions, with lots of subtransactions each, in
# the primary.
#
# XXX: This currently crashes the startup process in secondary with:
#
# FATAL: too many KnownAssignedXids
# CONTEXT: WAL redo at 0/1895CB0 for neon/INSERT: off: 25, flags: 0x08; blkref #0: rel 1663/5/16385, blk 64
def test_replication_start_subxid_overflow3(neon_simple_env: NeonEnv):
    env = neon_simple_env

    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
    with primary.connect() as p_con:
        p_cur = p_con.cursor()
        p_cur.execute("create table t(pk serial primary key, payload integer)")
        p_cur.execute(
            """create or replace function create_subxacts(n integer) returns void as $$
declare
i integer;
begin
for i in 1..n loop
begin
insert into t (payload) values (0);
exception
when others then
raise exception 'caught something';
end;
end loop;
end; $$ language plpgsql"""
        )
        # Plain string literal: there are no placeholders, so no f-string (ruff F541).
        p_cur.execute("show max_connections")
        max_connections = int(p_cur.fetchone()[0])

    n_connections = max_connections - 2
    n_subxids = 100

    # Start one top transaction in primary, with lots of subtransactions. This uses up much of the
    # known-assigned XIDs space in the standby, but doesn't cause it to overflow.
    large_p_conn = primary.connect()
    large_p_cur = large_p_conn.cursor()
    large_p_cur.execute("begin")
    large_p_cur.execute("select create_subxacts(2000)")

    # Create a replica at this LSN
    wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
    with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
        s_con = secondary.connect()
        s_cur = s_con.cursor()

        # The transaction in primary has not committed yet.
        wait_replica_caughtup(primary, secondary)
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (0,)

        # Start the max number of top transactions in primary, with a lot of
        # subtransactions each. We add the subtransactions to each top
        # transaction in a round-robin fashion, instead of adding a lot of
        # subtransactions to one top transaction at a time. This way, we will
        # have the max number of subtransactions in the in-memory subxid cache
        # of each top transaction, until they all overflow.
        #
        # Currently, PGPROC_MAX_CACHED_SUBXIDS == 64, so this will overflow
        # all the subxid caches after creating 64 subxids in each top
        # transaction. The point just before the caches have overflowed is the
        # most interesting point in time, but we'll keep going beyond that, to
        # ensure that this test is robust even if PGPROC_MAX_CACHED_SUBXIDS
        # changes.
        p_cons = []
        p_curs = []
        for i in range(0, n_connections):
            p_con = primary.connect()
            p_cur = p_con.cursor()
            p_cur.execute("begin")
            p_cons.append(p_con)
            p_curs.append(p_cur)

        for subxid in range(0, n_subxids):
            for i in range(0, n_connections):
                p_curs[i].execute("select create_subxacts(1)")

        # None of the transactions have committed yet, so they should be
        # invisible in standby.
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (0,)

        # The transactions still haven't committed. Start a new snapshot, we
        # will use it later, after committing in the primary.
        wait_replica_caughtup(primary, secondary)
        s_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (0,)

        # Commit all the transactions in the primary
        large_p_cur.execute("commit")
        for i in range(0, n_connections):
            p_curs[i].execute("commit")

        # All the XIDs should still be invisible to the old snapshot
        wait_replica_caughtup(primary, secondary)
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (0,)

        # Commit the REPEATABLE READ transaction in standby
        s_cur.execute("commit")

        # The transactions should now be visible to a new snapshot in standby.
        # NOTE(review): the committed "large" transaction inserted 2000 rows as
        # well, which this count does not include — confirm the expected total.
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (n_connections * n_subxids,)

View File

@@ -1,5 +1,5 @@
{
"v16": ["16.3", "3c2b9d576c580e0b5b7108001f959b8c5b42e0a2"],
"v15": ["15.7", "74fb144890c4f955db1ef50ee1eeb9d8a6c2f69d"],
"v14": ["14.12", "0d30e28f74f49fe6a27a6bd45dcfeb1060656b8f"]
"v16": ["16.3", "c83756da5260c784f68ca21c037c7e3fd4ca87c4"],
"v15": ["15.7", "2712558896d95fcf27963da6c64a5f844d0621fa"],
"v14": ["14.12", "05d21e2b4167245102eddee01a158264fb2eddfe"]
}