Mirror of https://github.com/neondatabase/neon.git, synced 2026-05-14 03:30:36 +00:00

Compare commits: skyzh/laye...restore_ru

6 commits: d99c1abfc3, b1eb0e135e, 87c701fca7, 52dcaea12e, 57ac0a122b, 315be6a664
@@ -373,6 +373,7 @@ impl WalIngest {
                if info == pg_constants::XLOG_RUNNING_XACTS {
                    let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
                    self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
                    self.checkpoint_modified = true;
                }
            }
            _x => {
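(Note: XLOG_RUNNING_XACTS is PostgreSQL's running-xacts record, and the XlRunningXacts::decode call above parses its payload. For orientation, here is a sketch of the record layout, following PostgreSQL's xl_running_xacts in src/include/storage/standbydefs.h; the Rust struct in walrecord.rs is assumed to mirror these fields, with oldest_running_xid corresponding to oldestRunningXid.)

/* Sketch of PostgreSQL's running-xacts WAL record payload, after
 * storage/standbydefs.h. */
typedef struct xl_running_xacts
{
    int             xcnt;               /* # of xact ids in xids[] */
    int             subxcnt;            /* # of subxact ids in xids[] */
    bool            subxid_overflow;    /* snapshot overflowed, subxids missing */
    TransactionId   nextXid;            /* xid of next transaction */
    TransactionId   oldestRunningXid;   /* *not* oldestXmin */
    TransactionId   latestCompletedXid; /* so we can set xmax */

    TransactionId   xids[FLEXIBLE_ARRAY_MEMBER];
} xl_running_xacts;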
@@ -81,10 +81,145 @@ neon_rm_redo(XLogReaderState *record)
    }
}

/*
 * FIXME: This is an odd place for RestoreRunningXactsFromClog(); it lives
 * here because it is handy to call it from neon_rm_startup(). Move this
 * somewhere else in pgxn/.
 */

#include "access/clog.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "common/controldata_utils.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/standby.h"
#include "catalog/pg_control.h"

/*
 * These definitions are taken from procarray.c.
 * TODO: would it be better to move them to a header file?
 */
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
#define TOTAL_MAX_CACHED_SUBXIDS \
    ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)

/*
 * We do not want to wait for the running-xacts record from the primary,
 * which is generated only every 15 seconds (and only if there is some
 * activity), so we try to restore the information about running
 * transactions from the CLOG instead.
 */
static void
RestoreRunningXactsFromClog(void)
{
    TransactionId from;
    TransactionId till;
    int           xcnt;
    TransactionId *xids;
    RunningTransactionsData running;
    TransactionId latestCompletedXid;
    int           max_xcnt;

    {
        ControlFileData *ControlFile;
        bool        crc_ok;

        LWLockAcquire(ControlFileLock, LW_SHARED);
        ControlFile = get_controlfile(DataDir, &crc_ok);
        from = ControlFile->checkPointCopy.oldestActiveXid;
        till = XidFromFullTransactionId(ShmemVariableCache->nextXid);
        LWLockRelease(ControlFileLock);
        pfree(ControlFile);
    }

    if (!TransactionIdIsNormal(from))
    {
        /*
         * No checkpoint or running-xacts record has been written yet, so
         * use the most conservative approximation for oldestActiveXid:
         * FirstNormalTransactionId. There should be no problem with
         * wraparound, because the XID counter cannot wrap around without
         * any checkpoint or running-xacts record being written.
         */
        from = FirstNormalTransactionId;
    }

    /*
     * To avoid a "too many KnownAssignedXids" error later during replay, we
     * limit the number of reported transactions. This is a tradeoff: if we
     * are willing to consume more of the KnownAssignedXids space for the
     * XIDs now, that allows us to start up, but we might run out of space
     * later.
     *
     * XXX: What would be the safe limit that would guarantee that we won't
     * run out of space later? And how much above that are we willing to
     * "overbook", hoping that we won't need the space later?
     */
    max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2;
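    /*
     * Worked example (illustrative; the actual values depend on the
     * configuration): with PGPROC_MAX_CACHED_SUBXIDS = 64, max_connections
     * = 100, roughly 20 auxiliary/worker slots, and max_prepared_xacts = 0,
     * MaxBackends is about 120, so TOTAL_MAX_CACHED_SUBXIDS is about
     * 65 * 120 = 7800, and up to about 3900 in-progress XIDs can be
     * reported before we give up.
     */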
    xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId));
    xcnt = 0;
    for (TransactionId xid = from; TransactionIdPrecedes(xid, till);)
    {
        XLogRecPtr  xidlsn;
        XidStatus   xidstatus = TransactionIdGetStatus(xid, &xidlsn);

        if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS)
        {
            if (xcnt < max_xcnt)
                xids[xcnt] = xid;
            else
            {
                /*
                 * Overflowed. We won't be able to install the
                 * RunningTransactions snapshot. We could bail out now, but
                 * keep going to count the XIDs, for the sake of the LOG
                 * message at the end of the function.
                 */
            }
            xcnt++;
        }
        TransactionIdAdvance(xid);
    }
    /*
     * Construct a RunningTransactions snapshot with the XIDs scanned from
     * the CLOG.
     *
     * TODO: test that this works right with prepared transactions.
     */
    if (xcnt <= max_xcnt)
    {
        running.xcnt = xcnt;
        running.subxcnt = 0;
        running.subxid_overflow = false;
        running.nextXid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
        running.oldestRunningXid = from;
        latestCompletedXid = till;
        TransactionIdRetreat(latestCompletedXid);
        Assert(TransactionIdIsNormal(latestCompletedXid));
        running.latestCompletedXid = latestCompletedXid;
        running.xids = xids;

        ProcArrayApplyRecoveryInfo(&running);

        StandbyRecoverPreparedTransactions();

        elog(LOG, "initialized known-assigned XIDs with %d in-progress XIDs between %u and %u (max %d)",
             xcnt, from, till, max_xcnt);
    }
    else
        elog(LOG, "could not initialize known-assigned XIDs because there are too many (sub)transactions active (%d in-progress XIDs between %u and %u, max %d)",
             xcnt, from, till, max_xcnt);

    pfree(xids);
}
static void
neon_rm_startup(void)
{
    if (standbyState == STANDBY_INITIALIZED)
        RestoreRunningXactsFromClog();
    else
        elog(LOG, "neon_rm_startup called with standbyState=%d", standbyState);
}

static void
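(Note: neon_rm_startup() above is the rm_startup callback of Neon's custom WAL resource manager, which PostgreSQL invokes when WAL replay starts; that is why it is a convenient, if odd, place to restore the running-xacts information. A minimal sketch of how such a callback is wired up with the PostgreSQL 15+ custom-rmgr API follows; the rmgr ID and the exact set of callbacks are illustrative assumptions, not necessarily what pgxn/neon_rmgr uses.)

#include "postgres.h"
#include "access/xlog_internal.h"
#include "fmgr.h"

PG_MODULE_MAGIC;

static void neon_rm_redo(XLogReaderState *record);  /* as in the diff above */
static void neon_rm_startup(void);                  /* as in the diff above */

static const RmgrData neon_rmgr_data = {
    .rm_name = "neon",
    .rm_redo = neon_rm_redo,
    .rm_startup = neon_rm_startup,      /* invoked when WAL replay starts */
};

void
_PG_init(void)
{
    /* Illustrative: custom rmgr IDs start at RM_MIN_CUSTOM_RMGRID (128) and
     * must be registered from a shared_preload_libraries _PG_init(). */
    RegisterCustomRmgr(RM_MIN_CUSTOM_RMGRID, &neon_rmgr_data);
}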
@@ -1,9 +1,7 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup


@pytest.mark.xfail
def test_replication_start(neon_simple_env: NeonEnv):
    env = neon_simple_env
test_runner/regress/test_replication_start_subxid_overflow.py (new file, 239 lines)
@@ -0,0 +1,239 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup


# This tests replica startup when there is a large number of active
# subtransactions at the primary node. The number of known-assigned XIDs
# in the standby is limited, and if we did not limit the number of
# transactions restored from the CLOG, the replica would crash.
def test_replication_start_subxid_overflow(neon_simple_env: NeonEnv):
    env = neon_simple_env

    with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary:
        with primary.connect() as p_con:
            with p_con.cursor() as p_cur:
                p_cur.execute("begin")
                p_cur.execute("create table t(pk serial primary key, payload integer)")
                p_cur.execute(
                    """create or replace function create_subxacts(n integer) returns void as $$
                    declare
                        i integer;
                    begin
                        for i in 1..n loop
                            begin
                                insert into t (payload) values (0);
                            exception
                                when others then
                                    raise exception 'caught something';
                            end;
                        end loop;
                    end; $$ language plpgsql"""
                )
                p_cur.execute("select create_subxacts(100000)")
                p_cur.execute("select txid_current()")
                xid = p_cur.fetchall()[0][0]
                log.info(f"Master transaction {xid}")
                with env.endpoints.new_replica_start(
                    origin=primary, endpoint_id="secondary"
                ) as secondary:
                    wait_replica_caughtup(primary, secondary)
                    with secondary.connect() as s_con:
                        with s_con.cursor() as s_cur:
                            # Force setting of hint bits for pg_class tuples.
                            # If the primary's transaction is not marked as
                            # in-progress in the MVCC snapshot, the
                            # XMIN_INVALID hint bit is set for table t's
                            # tuple, making it invisible.
                            s_cur.execute("select * from pg_class")
                            p_cur.execute("commit")
                            wait_replica_caughtup(primary, secondary)
                            s_cur.execute("select * from t where pk = 1")
                            assert s_cur.fetchone() == (1, 0)

# Test starting a replica at a point in time where there is a transaction
# running in the primary with lots of subtransactions. Test MVCC in the
# standby before and after the commit of the transaction.
#
# XXX: This currently fails; the query in the standby incorrectly sees some
# sub-xids as committed:
#
# test_runner/regress/test_replication_start_subxid_overflow.py:105: AssertionError
#
# FAILED test_runner/regress/test_replication_start_subxid_overflow.py::test_replication_start_subxid_overflow2[debug-pg16] - assert (93229,) == (0,)
def test_replication_start_subxid_overflow2(neon_simple_env: NeonEnv):
    env = neon_simple_env

    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")

    p_con = primary.connect()
    p_cur = p_con.cursor()

    p_cur.execute("create table t(pk serial primary key, payload integer)")
    p_cur.execute(
        """create or replace function create_subxacts(n integer) returns void as $$
        declare
            i integer;
        begin
            for i in 1..n loop
                begin
                    insert into t (payload) values (0);
                exception
                    when others then
                        raise exception 'caught something';
                end;
            end loop;
        end; $$ language plpgsql"""
    )

    # Start a new transaction in the primary, with a lot of subtransactions
    p_cur.execute("begin")
    p_cur.execute("select create_subxacts(100000)")

    # Create a replica at this LSN
    wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
    with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
        s_con = secondary.connect()
        s_cur = s_con.cursor()

        # The transaction in the primary has not committed yet.
        wait_replica_caughtup(primary, secondary)
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (0,)

        # Add more sub-xids to the same transaction in the primary
        p_cur.execute("select create_subxacts(10000)")

        # The transaction still hasn't committed
        wait_replica_caughtup(primary, secondary)
        s_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (0,)

        # Commit the transaction in the primary
        p_cur.execute("commit")

        # It should still be invisible to the old snapshot
        wait_replica_caughtup(primary, secondary)
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (0,)

        # Commit the REPEATABLE READ transaction in the standby
        s_cur.execute("commit")

        # The transaction should now be visible to a new snapshot in the standby
        s_cur.execute("select count(*) from t")
        assert s_cur.fetchone() == (200000,)

# Test starting a standby in a situation where there is one transaction
# running in the primary, with lots of subtransactions. Then, after the
# standby is running, start a large number of new transactions, with lots of
# subtransactions each, in the primary.
#
# XXX: This currently crashes the startup process in the secondary with:
#
# FATAL: too many KnownAssignedXids
# CONTEXT: WAL redo at 0/1895CB0 for neon/INSERT: off: 25, flags: 0x08; blkref #0: rel 1663/5/16385, blk 64
def test_replication_start_subxid_overflow3(neon_simple_env: NeonEnv):
    env = neon_simple_env

    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")

    with primary.connect() as p_con:
        p_cur = p_con.cursor()

        p_cur.execute("create table t(pk serial primary key, payload integer)")
        p_cur.execute(
            """create or replace function create_subxacts(n integer) returns void as $$
            declare
                i integer;
            begin
                for i in 1..n loop
                    begin
                        insert into t (payload) values (0);
                    exception
                        when others then
                            raise exception 'caught something';
                    end;
                end loop;
            end; $$ language plpgsql"""
        )

        p_cur.execute("show max_connections")
        max_connections = int(p_cur.fetchone()[0])

        n_connections = max_connections - 2
        n_subxids = 100

        # Start one top transaction in the primary, with lots of
        # subtransactions. This uses up much of the known-assigned XIDs
        # space in the standby, but doesn't cause it to overflow.
        large_p_conn = primary.connect()
        large_p_cur = large_p_conn.cursor()
        large_p_cur.execute("begin")
        large_p_cur.execute("select create_subxacts(2000)")

        # Create a replica at this LSN
        wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
        with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
            s_con = secondary.connect()
            s_cur = s_con.cursor()

            # The transaction in the primary has not committed yet.
            wait_replica_caughtup(primary, secondary)
            s_cur.execute("select count(*) from t")
            assert s_cur.fetchone() == (0,)

            # Start the max number of top transactions in the primary, with
            # a lot of subtransactions each. We add the subtransactions to
            # each top transaction in a round-robin fashion, instead of
            # adding a lot of subtransactions to one top transaction at a
            # time. This way, we will have the max number of subtransactions
            # in the in-memory subxid cache of each top transaction, until
            # they all overflow.
            #
            # Currently, PGPROC_MAX_CACHED_SUBXIDS == 64, so this will
            # overflow all the subxid caches after creating 64 subxids in
            # each top transaction. The point just before the caches have
            # overflowed is the most interesting point in time, but we'll
            # keep going beyond that, to ensure that this test is robust
            # even if PGPROC_MAX_CACHED_SUBXIDS changes.
            p_cons = []
            p_curs = []
            for i in range(0, n_connections):
                p_con = primary.connect()
                p_cur = p_con.cursor()

                p_cur.execute("begin")
                p_cons.append(p_con)
                p_curs.append(p_cur)

            for subxid in range(0, n_subxids):
                for i in range(0, n_connections):
                    p_curs[i].execute("select create_subxacts(1)")

            # None of the transactions have committed yet, so they should be
            # invisible in the standby.
            s_cur.execute("select count(*) from t")
            assert s_cur.fetchone() == (0,)

            # The transactions still haven't committed. Start a new
            # snapshot; we will use it later, after committing in the
            # primary.
            wait_replica_caughtup(primary, secondary)
            s_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")

            s_cur.execute("select count(*) from t")
            assert s_cur.fetchone() == (0,)

            # Commit all the transactions in the primary
            large_p_cur.execute("commit")
            for i in range(0, n_connections):
                p_curs[i].execute("commit")

            # All the XIDs should still be invisible to the old snapshot
            wait_replica_caughtup(primary, secondary)
            s_cur.execute("select count(*) from t")
            assert s_cur.fetchone() == (0,)

            # Commit the REPEATABLE READ transaction in the standby
            s_cur.execute("commit")

            # The transactions should now be visible to a new snapshot in
            # the standby
            s_cur.execute("select count(*) from t")
            assert s_cur.fetchone() == (n_connections * n_subxids,)

vendor/postgres-v14 (vendored, 2 lines changed)
Submodule vendor/postgres-v14 updated: 0d30e28f74...05d21e2b41

vendor/postgres-v15 (vendored, 2 lines changed)
Submodule vendor/postgres-v15 updated: 74fb144890...2712558896

vendor/postgres-v16 (vendored, 2 lines changed)
Submodule vendor/postgres-v16 updated: 3c2b9d576c...c83756da52

vendor/revisions.json (vendored, 6 lines changed)
@@ -1,5 +1,5 @@
 {
-    "v16": ["16.3", "3c2b9d576c580e0b5b7108001f959b8c5b42e0a2"],
-    "v15": ["15.7", "74fb144890c4f955db1ef50ee1eeb9d8a6c2f69d"],
-    "v14": ["14.12", "0d30e28f74f49fe6a27a6bd45dcfeb1060656b8f"]
+    "v16": ["16.3", "c83756da5260c784f68ca21c037c7e3fd4ca87c4"],
+    "v15": ["15.7", "2712558896d95fcf27963da6c64a5f844d0621fa"],
+    "v14": ["14.12", "05d21e2b4167245102eddee01a158264fb2eddfe"]
 }