Restore running xacts from CLOG on replica startup (#7288)

We have a pretty serious MVCC visibility bug with hot standby
replicas: any transactions that are in progress in the primary when
the standby is started are incorrectly treated as aborted. That can
break MVCC for queries running concurrently in the standby. It can
also lead to hint bits being set incorrectly, and that damage can last
until the replica is restarted.

The fundamental bug was that we treated any replica start as starting
from a cleanly shut down server. The fix for that is straightforward: we need
to set 'wasShutdown = false' in InitWalRecovery() (see changes in the
postgres repo).
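
In upstream PostgreSQL, InitWalRecovery() derives wasShutdown from the
kind of checkpoint record that recovery starts from. As a rough,
hedged sketch of the idea (not the exact patch; the real change lives
in the Neon postgres fork, and how it is guarded is an assumption
here):

    /* upstream: true only when starting from a shutdown checkpoint */
    wasShutdown = ((record->xl_info & ~XLR_INFO_MASK) == XLOG_CHECKPOINT_SHUTDOWN);

    /*
     * Neon: a replica can start at an arbitrary LSN, and transactions that
     * were open in the primary at that point have not actually ended, so
     * treat recovery as starting from an online checkpoint instead.
     * (Presumably restricted to replica startup in the actual change.)
     */
    wasShutdown = false;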

However, that introduces a new problem: with wasShutdown = false, the
standby will not open up for queries until it receives a running-xacts
WAL record from the primary. That's correct, and that's how Postgres
hot standby always works. But it's a problem for Neon, because:

* It changes the historical behavior for existing users. Currently,
  the standby immediately opens up for queries, so if they now need to
  wait, we can break existing use cases that were working fine
  (assuming you don't hit the MVCC issues).

* The problem is much worse for Neon than it is for standalone
  PostgreSQL, because in Neon, we can start a replica from an
  arbitrary LSN. In standalone PostgreSQL, the replica always starts
  WAL replay from a checkpoint record, and the primary arranges things
  so that there is always a running-xacts record soon after each
  checkpoint record. You can still hit this issue with PostgreSQL if
  you have a transaction with lots of subtransactions running in the
  primary, but it's pretty rare in practice.

To mitigate that, we introduce another way to collect the
running-xacts information at startup, without waiting for the
running-xacts WAL record: we scan the CLOG for XIDs that haven't been
marked as committed or aborted. That also has limitations with
subtransactions, but it should mitigate the problem for most users.
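
In outline, the startup scan works as sketched below (a simplified
sketch of the mechanism added to the neon extension in this commit;
prepared-transaction handling, the overflow limit, and the error
paths are omitted):

    /* Scan the CLOG from the oldest possibly-running XID up to nextXid. */
    from = TransactionIdPrecedes(checkpoint->oldestXid, checkpoint->oldestActiveXid)
        ? checkpoint->oldestActiveXid : checkpoint->oldestXid;
    till = XidFromFullTransactionId(checkpoint->nextXid);

    for (TransactionId xid = from; xid != till; TransactionIdAdvance(xid))
    {
        XLogRecPtr  lsn;

        /* anything not marked committed or aborted is still in progress */
        if (TransactionIdGetStatus(xid, &lsn) == TRANSACTION_STATUS_IN_PROGRESS)
            restored_xids[n_restored_xids++] = xid;
    }
    /* The collected XIDs are then installed via ProcArrayApplyRecoveryInfo(). */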

See https://github.com/neondatabase/neon/issues/7236.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Author: Heikki Linnakangas
Date: 2024-07-01 12:58:12 +03:00
Parent: 75c84c846a
Commit: 9ce193082a
10 changed files with 981 additions and 48 deletions

View File

@@ -343,7 +343,33 @@ impl WalIngest {
xlog_checkpoint.oldestActiveXid,
self.checkpoint.oldestActiveXid
);
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionId from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
self.checkpoint.oldestActiveXid = oldest_active_xid;
} else {
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
@@ -375,6 +401,7 @@ impl WalIngest {
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
self.checkpoint_modified = true;
}
}
pg_constants::RM_REPLORIGIN_ID => {
@@ -1277,13 +1304,10 @@ impl WalIngest {
xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
);
// Here we treat oldestXid and oldestXidDB
// differently from postgres redo routines.
// In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
// until checkpoint happens and updates the value.
// Here we can use the most recent value.
// It's just an optimization, though and can be deleted.
// TODO Figure out if there will be any issues with replica.
// In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
// truncated, but a checkpoint record with the updated values isn't written until
// later. In Neon, a server can start at any LSN, not just on a checkpoint record,
// so we keep the oldestXid and oldestXidDB up-to-date.
self.checkpoint.oldestXid = xlrec.oldest_xid;
self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
self.checkpoint_modified = true;

View File

@@ -12,6 +12,8 @@
#include "fmgr.h"
#include "miscadmin.h"
#include "access/subtrans.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "storage/buf_internals.h"
@@ -22,10 +24,12 @@
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
#include "utils/wait_event.h"
@@ -266,6 +270,293 @@ LogicalSlotsMonitorMain(Datum main_arg)
}
}
/*
* XXX: These are private to procarray.c, but we need them here.
*/
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
#define TOTAL_MAX_CACHED_SUBXIDS \
((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
/*
* Restore running-xact information by scanning the CLOG at startup.
*
* In PostgreSQL, a standby always has to wait for a running-xacts WAL record
* to arrive before it can start accepting queries. Furthermore, if there are
* transactions with too many subxids (> 64) open to fit in the in-memory
* subxids cache, the running-xacts record will be marked as "suboverflowed",
* and the standby will need to also wait for the currently in-progress
* transactions to finish.
*
* That's not great in PostgreSQL, because a hot standby does not necessarily
* open up for queries immediately as you might expect. But it's worse in
* Neon: A standby in Neon doesn't need to start WAL replay from a checkpoint
* record; it can start at any LSN. Postgres arranges things so that there is
* a running-xacts record soon after every checkpoint record, but when you
* start from an arbitrary LSN, that doesn't help. If the primary is idle, or
* not running at all, it might never write a new running-xacts record,
* leaving the replica in a limbo where it can never start accepting queries.
*
* To mitigate that, we have an additional mechanism to find the running-xacts
* information: we scan the CLOG, making note of any XIDs not marked as
* committed or aborted. They are added to the Postgres known-assigned XIDs
* array by calling ProcArrayApplyRecoveryInfo() in the caller of this
* function.
*
* There is one big limitation with that mechanism: the size of the
* known-assigned XIDs array is limited, so if there are a lot of in-progress XIDs,
* we have to give up. Furthermore, we don't know how many of the in-progress
* XIDs are subtransactions, and if we use up all the space in the
* known-assigned XIDs array for subtransactions, we might run out of space in
* the array later during WAL replay, causing the replica to shut down with
* "ERROR: too many KnownAssignedXids". The safe # of XIDs that we can add to
* the known-assigned array without risking that error later is very low,
* merely PGPROC_MAX_CACHED_SUBXIDS == 64, so we take our chances and use up
* to half of the known-assigned XIDs array for the subtransactions, even
* though that risks getting the error later.
*
* Note: It's OK if the recovered list of XIDs includes some transactions that
* have crashed in the primary, and hence will never commit. They will be seen
* as in-progress, until we see the next running-xacts record with an
* oldestActiveXid that invalidates them. That's how the known-assigned XIDs
* array always works.
*
* If scraping the CLOG doesn't succeed for some reason, like the subxid
* overflow, Postgres will fall back to waiting for a running-xacts record
* like usual.
*
* Returns true if a complete list of in-progress XIDs was scraped.
*/
static bool
RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *nxids)
{
TransactionId from;
TransactionId till;
int max_xcnt;
TransactionId *prepared_xids = NULL;
int n_prepared_xids;
TransactionId *restored_xids = NULL;
int n_restored_xids;
int next_prepared_idx;
Assert(*xids == NULL);
/*
* If the checkpoint doesn't have a valid oldestActiveXid, bail out. We
* don't know where to start the scan.
*
* This shouldn't happen, because the pageserver always maintains a valid
* oldestActiveXid nowadays. Except when starting at an old point in time
* that was ingested before the pageserver was taught to do that.
*/
if (!TransactionIdIsValid(checkpoint->oldestActiveXid))
{
elog(LOG, "cannot restore running-xacts from CLOG because oldestActiveXid is not set");
goto fail;
}
/*
* We will scan the CLOG starting from the oldest active XID.
*
* In some corner cases, the oldestActiveXid from the last checkpoint
* might already have been truncated from the CLOG. That is,
* oldestActiveXid might be older than oldestXid. That's possible because
* oldestActiveXid is only updated at checkpoints. After the last
* checkpoint, the oldest transaction might have committed, and the CLOG
* might also have been already truncated. So if oldestActiveXid is older
* than oldestXid, start at oldestXid instead. (Otherwise we'd try to
* access CLOG segments that have already been truncated away.)
*/
from = TransactionIdPrecedes(checkpoint->oldestXid, checkpoint->oldestActiveXid)
? checkpoint->oldestActiveXid : checkpoint->oldestXid;
till = XidFromFullTransactionId(checkpoint->nextXid);
/*
* To avoid "too many KnownAssignedXids" error later during replay, we
* limit number of collected transactions. This is a tradeoff: if we are
* willing to consume more of the KnownAssignedXids space for the XIDs
* now, that allows us to start up, but we might run out of space later.
*
* The size of the KnownAssignedXids array is TOTAL_MAX_CACHED_SUBXIDS,
* which is (PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS. In
* PostgreSQL, that's always enough because the primary will always write
* an XLOG_XACT_ASSIGNMENT record if a transaction has more than
* PGPROC_MAX_CACHED_SUBXIDS subtransactions. Seeing that record allows
* the standby to mark the XIDs in pg_subtrans and remove them from the
* KnownAssignedXids array.
*
* Here, we don't know which XIDs belong to subtransactions that have
* already been WAL-logged with an XLOG_XACT_ASSIGNMENT record. If we
* wanted to be totally safe and avoid the possibility of getting a "too
* many KnownAssignedXids" error later, we would have to limit ourselves
* to PGPROC_MAX_CACHED_SUBXIDS, which is not much. And that includes top
* transaction IDs too, because we cannot distinguish between top
* transaction IDs and subtransactions here.
*
* Somewhat arbitrarily, we use up to half of KnownAssignedXids. That
* strikes a sensible balance between being useful, and risking a "too
* many KnownAssignedXids" error later.
*/
max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2;
/*
* Collect XIDs of prepared transactions in an array. This includes only
* their top-level XIDs. We assume that StandbyRecoverPreparedTransactions
* has already been called, so we can find all the sub-transactions in
* pg_subtrans.
*/
PrescanPreparedTransactions(&prepared_xids, &n_prepared_xids);
qsort(prepared_xids, n_prepared_xids, sizeof(TransactionId), xidLogicalComparator);
/*
* Scan the CLOG, collecting in-progress XIDs into 'restored_xids'.
*/
elog(DEBUG1, "scanning CLOG between %u and %u for in-progress XIDs", from, till);
restored_xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId));
n_restored_xids = 0;
next_prepared_idx = 0;
for (TransactionId xid = from; xid != till;)
{
XLogRecPtr xidlsn;
XidStatus xidstatus;
xidstatus = TransactionIdGetStatus(xid, &xidlsn);
/*
* "Merge" the prepared transactions into the restored_xids array as
* we go. The prepared transactions array is sorted. This is mostly
* a sanity check to ensure that all the prepared transactions are
* seen as in-progress. (There is a check after the loop that we didn't
* miss any.)
*/
if (next_prepared_idx < n_prepared_xids && xid == prepared_xids[next_prepared_idx])
{
/*
* This is a top-level transaction ID of a prepared transaction.
* Include it in the array.
*/
/* sanity check */
if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS)
{
elog(LOG, "prepared transaction %u has unexpected status %X, cannot restore running-xacts from CLOG",
xid, xidstatus);
Assert(false);
goto fail;
}
elog(DEBUG1, "XID %u: was next prepared xact (%d / %d)", xid, next_prepared_idx, n_prepared_xids);
next_prepared_idx++;
}
else if (xidstatus == TRANSACTION_STATUS_COMMITTED)
{
elog(DEBUG1, "XID %u: was committed", xid);
goto skip;
}
else if (xidstatus == TRANSACTION_STATUS_ABORTED)
{
elog(DEBUG1, "XID %u: was aborted", xid);
goto skip;
}
else if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS)
{
/*
* In-progress transactions are included in the array.
*
* Except subtransactions of the prepared transactions. They are
* already set in pg_subtrans, and hence don't need to be tracked
* in the known-assigned XIDs array.
*/
if (n_prepared_xids > 0)
{
TransactionId parent = SubTransGetParent(xid);
if (TransactionIdIsValid(parent))
{
/*
* This is a subtransaction belonging to a prepared
* transaction.
*
* Sanity check that it is in the prepared XIDs array. It
* should be, because StandbyRecoverPreparedTransactions
* populated pg_subtrans, and no other XID should be set
* in it yet. (This also relies on the fact that
* StandbyRecoverPreparedTransactions sets the parent of
* each subxid to point directly to the top-level XID,
* rather than restoring the original subtransaction
* hierarchy.)
*/
if (bsearch(&parent, prepared_xids, next_prepared_idx,
sizeof(TransactionId), xidLogicalComparator) == NULL)
{
elog(LOG, "sub-XID %u has unexpected parent %u, cannot restore running-xacts from CLOG",
xid, parent);
Assert(false);
goto fail;
}
elog(DEBUG1, "XID %u: was a subtransaction of prepared xid %u", xid, parent);
goto skip;
}
}
/* include it in the array */
elog(DEBUG1, "XID %u: is in progress", xid);
}
else
{
/*
* SUB_COMMITTED is a transient state used at commit. We don't
* expect to see that here.
*/
elog(LOG, "XID %u has unexpected status %X in pg_xact, cannot restore running-xacts from CLOG",
xid, xidstatus);
Assert(false);
goto fail;
}
if (n_restored_xids >= max_xcnt)
{
/*
* Overflowed. We won't be able to install the RunningTransactions
* snapshot.
*/
elog(LOG, "too many running xacts to restore from the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
checkpoint->oldestXid, checkpoint->oldestActiveXid,
XidFromFullTransactionId(checkpoint->nextXid));
goto fail;
}
restored_xids[n_restored_xids++] = xid;
skip:
TransactionIdAdvance(xid);
continue;
}
/* sanity check */
if (next_prepared_idx != n_prepared_xids)
{
elog(LOG, "prepared transaction ID %u was not visited in the CLOG scan, cannot restore running-xacts from CLOG",
prepared_xids[next_prepared_idx]);
Assert(false);
goto fail;
}
elog(LOG, "restored %d running xacts by scanning the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
n_restored_xids, checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid));
*nxids = n_restored_xids;
*xids = restored_xids;
return true;
fail:
*nxids = 0;
*xids = NULL;
if (restored_xids)
pfree(restored_xids);
if (prepared_xids)
pfree(prepared_xids);
return false;
}
void
_PG_init(void)
{
@@ -288,6 +579,8 @@ _PG_init(void)
pg_init_extension_server();
restore_running_xacts_callback = RestoreRunningXactsFromClog;
/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the

View File

@@ -3856,7 +3856,9 @@ class EndpointFactory:
return self
def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
def new_replica(
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None

View File

@@ -198,7 +198,7 @@ def wait_for_last_record_lsn(
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(100):
for i in range(1000):
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
return current_lsn

View File

@@ -0,0 +1,646 @@
"""
In PostgreSQL, a standby always has to wait for a running-xacts WAL record to
arrive before it can start accepting queries. Furthermore, if there are
transactions with too many subxids (> 64) open to fit in the in-memory subxids
cache, the running-xacts record will be marked as "suboverflowed", and the
standby will need to also wait for the currently in-progress transactions to
finish.
In Neon, we have an additional mechanism that scans the CLOG at server startup
to determine the list of running transactions, so that the standby can start up
immediately without waiting for the running-xacts record, but that mechanism
only works if the # of active (sub-)transactions is reasonably small. Otherwise
it falls back to waiting. Furthermore, it's somewhat optimistic in using up the
known-assigned XIDs array: if too many transactions with subxids are started in
the primary later, the replay in the replica will crash with a "too many
KnownAssignedXids" error.
This module contains tests for those various cases at standby startup: starting
from shutdown checkpoint, using the CLOG scanning mechanism, waiting for
running-xacts record and for in-progress transactions to finish etc.
"""
import threading
from contextlib import closing
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar, wait_until
CREATE_SUBXACTS_FUNC = """
create or replace function create_subxacts(n integer) returns void as $$
declare
i integer;
begin
for i in 1..n loop
begin
insert into t (payload) values (0);
exception
when others then
raise exception 'caught something: %', sqlerrm;
end;
end loop;
end; $$ language plpgsql
"""
def test_replica_start_scan_clog(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup. There is one
transaction active in the primary when the standby is started. The primary
is killed before it has a chance to write a running-xacts record. The
CLOG-scanning at neon startup allows the standby to start up anyway.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
primary_cur.execute("select pg_switch_wal()")
# Start a transaction in the primary. Leave the transaction open.
#
# The transaction has some subtransactions, but not too many to cause the
# CLOG-scanning mechanism to give up.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(50)")
# Wait for the WAL to be flushed, but then immediately kill the primary,
# before it has a chance to generate a running-xacts record.
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
primary.stop(mode="immediate")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (0,)
def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup, after
leaving behind crashed transactions.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
primary_cur.execute("select pg_switch_wal()")
# Consume a lot of XIDs, then kill Postgres without giving it a
# chance to write abort records for them.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(100000)")
primary.stop(mode="immediate")
# Restart the primary. Do some light work, and shut it down cleanly
primary.start()
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("insert into t (payload) values (0)")
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism. (Restarting the primary writes a checkpoint and/or running-xacts
# record, which allows the standby to know that the crashed XIDs are aborted)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (1,)
def test_replica_start_at_running_xacts(neon_simple_env: NeonEnv, pg_version):
"""
Test that starting a replica works right after the primary has
created a running-xacts record. This may seem like a trivial case,
but during development, we had a bug that was triggered by having
oldestActiveXid == nextXid. Starting right after a running-xacts
record is one way to test that case.
See the module docstring for background.
"""
env = neon_simple_env
if env.pg_version == PgVersion.V14 or env.pg_version == PgVersion.V15:
pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("select pg_log_standby_snapshot()")
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select 123")
assert secondary_cur.fetchone() == (123,)
def test_replica_start_wait_subxids_finish(neon_simple_env: NeonEnv):
"""
Test replica startup when there are a lot of (sub)transactions active in the
primary. That's too many for the CLOG-scanning mechanism to handle, so the
replica has to wait for the large transaction to finish before it starts to
accept queries.
After replica startup, test MVCC with transactions that were in-progress
when the replica was started.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create
# lots of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Start a transaction with 100000 subtransactions, and leave it open. That's
# too many to fit in the "known-assigned XIDs array" in the replica, and
# also too many to fit in the subxid caches so the running-xacts record will
# also overflow.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(100000)")
# Start another, smaller transaction in the primary. We'll come back to this
# later.
primary_conn2 = primary.connect()
primary_cur2 = primary_conn2.cursor()
primary_cur2.execute("begin")
primary_cur2.execute("insert into t (payload) values (0)")
# Create a replica. But before that, wait for the WAL to be flushed to
# safekeepers, so that the replica is started at a point where the large
# transaction is already active. (The whole transaction might not be flushed
# yet, but that's OK.)
#
# Start it in a separate thread, so that we can do other stuff while it's
# blocked waiting for the startup to finish.
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
secondary = env.endpoints.new_replica(origin=primary, endpoint_id="secondary")
start_secondary_thread = threading.Thread(target=secondary.start)
start_secondary_thread.start()
# Verify that the replica has otherwise started up, but cannot start
# accepting queries yet.
log.info("Waiting 5 s to verify that the secondary does not start")
start_secondary_thread.join(5)
assert secondary.log_contains("consistent recovery state reached")
assert secondary.log_contains("started streaming WAL from primary")
# The "redo starts" message is printed when the first WAL record is
# received. It might or might not be present in the log depending on how
# far exactly the WAL was flushed when the replica was started, and whether
# background activity caused any more WAL records to be flushed on the
# primary afterwards.
#
# assert secondary.log_contains("redo starts")
# should not be open for connections yet
assert start_secondary_thread.is_alive()
assert not secondary.is_running()
assert not secondary.log_contains("database system is ready to accept read-only connections")
# Commit the large transaction in the primary.
#
# Within the next 15 s, the primary should write a new running-xacts record
# to the WAL which shows the transaction as completed. Once the replica
# replays that record, it will start accepting queries.
primary_cur.execute("commit")
start_secondary_thread.join()
# Verify that the large transaction is correctly visible in the secondary
# (but not the second, small transaction, which is still in-progress!)
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
# Perform some more MVCC testing using the second transaction that was
# started in the primary before the replica was created
primary_cur2.execute("select create_subxacts(10000)")
# The second transaction still hasn't committed
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
# Commit the second transaction in the primary
primary_cur2.execute("commit")
# Should still be invisible to the old snapshot
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
# Commit the REPEATABLE READ transaction in the replica. Both
# primary transactions should now be visible to a new snapshot.
secondary_cur.execute("commit")
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (110001,)
def test_replica_too_many_known_assigned_xids(neon_simple_env: NeonEnv):
"""
The CLOG-scanning mechanism fills the known-assigned XIDs array
optimistically at standby startup, betting that it can still fit
upcoming transactions replayed later from the WAL in the
array. This test tests what happens when that bet fails and the
known-assigned XID array fills up after the standby has already
been started. The WAL redo will fail with an error:
FATAL: too many KnownAssignedXids
CONTEXT: WAL redo at 0/1895CB0 for neon/INSERT: off: 25, flags: 0x08; blkref #0: rel 1663/5/16385, blk 64
which causes the standby to shut down.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Determine how many connections we can use
primary_cur.execute("show max_connections")
max_connections = int(primary_cur.fetchall()[0][0])
primary_cur.execute("show superuser_reserved_connections")
superuser_reserved_connections = int(primary_cur.fetchall()[0][0])
n_connections = max_connections - superuser_reserved_connections
n_subxids = 200
# Start one top transaction in primary, with lots of subtransactions. This
# uses up much of the known-assigned XIDs space in the standby, but doesn't
# cause it to overflow.
large_p_conn = primary.connect()
large_p_cur = large_p_conn.cursor()
large_p_cur.execute("begin")
large_p_cur.execute(f"select create_subxacts({max_connections} * 30)")
with closing(primary.connect()) as small_p_conn:
with small_p_conn.cursor() as small_p_cur:
small_p_cur.execute("select create_subxacts(1)")
# Create a replica at this LSN
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
# The transaction in primary has not committed yet.
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (1,)
# Start max number of top transactions in primary, with a lot of
# subtransactions each. We add the subtransactions to each top transaction
# in a round-robin fashion, instead of adding a lot of subtransactions to
# one top transaction at a time. This way, we will have the max number of
# subtransactions in the in-memory subxid cache of each top transaction,
# until they all overflow.
#
# Currently, PGPROC_MAX_CACHED_SUBXIDS == 64, so this will overflow all
# the subxid caches after creating 64 subxids in each top transaction. The
# point just before the caches have overflowed is the most interesting point
# in time, but we'll keep going beyond that, to ensure that this test is
# robust even if PGPROC_MAX_CACHED_SUBXIDS changes.
p_curs = []
for _ in range(0, n_connections):
p_cur = primary.connect().cursor()
p_cur.execute("begin")
p_curs.append(p_cur)
for _subxid in range(0, n_subxids):
for i in range(0, n_connections):
p_curs[i].execute("select create_subxacts(1)")
# Commit all the transactions in the primary
for i in range(0, n_connections):
p_curs[i].execute("commit")
large_p_cur.execute("commit")
# Wait until the replica crashes with "too many KnownAssignedXids" error.
def check_replica_crashed():
try:
secondary.connect()
except psycopg2.Error:
# Once the connection fails, return success
return None
raise RuntimeError("connection succeeded")
wait_until(20, 0.5, check_replica_crashed)
assert secondary.log_contains("too many KnownAssignedXids")
# Replica is crashed, so ignore stop result
secondary.check_stop_result = False
def test_replica_start_repro_visibility_bug(neon_simple_env: NeonEnv):
"""
Before PR #7288, a hot standby in neon incorrectly started up
immediately, before it had received a running-xacts record. That
led to visibility bugs if there were active transactions in the
primary. This test reproduces the incorrect query results and
incorrectly set hint bits, before that was fixed.
"""
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
p_cur = primary.connect().cursor()
p_cur.execute("begin")
p_cur.execute("create table t(pk integer primary key, payload integer)")
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
wait_replica_caughtup(primary, secondary)
s_cur = secondary.connect().cursor()
# Set hint bits for pg_class tuples. If primary's transaction is
# not marked as in-progress in MVCC snapshot, then XMIN_INVALID
# hint bit will be set for table's 't' tuple, making it invisible
# even after the commit record is replayed later.
s_cur.execute("select * from pg_class")
p_cur.execute("commit")
wait_replica_caughtup(primary, secondary)
s_cur.execute("select * from t where pk = 1")
assert s_cur.fetchone() == (1, 0)
@pytest.mark.parametrize("shutdown", [True, False])
def test_replica_start_with_prepared_xacts(neon_simple_env: NeonEnv, shutdown: bool):
"""
Test the CLOG-scanning mechanism at hot standby startup in the presence of
prepared transactions.
This test is run in two variants: one where the primary server is shut down
before starting the secondary, and one where it is kept running.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
)
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute("create table t1(pk integer primary key)")
primary_cur.execute("create table t2(pk integer primary key)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Prepare a transaction for two-phase commit
primary_cur.execute("begin")
primary_cur.execute("insert into t1 values (1)")
primary_cur.execute("prepare transaction 't1'")
# Prepare another transaction for two-phase commit, with a subtransaction
primary_cur.execute("begin")
primary_cur.execute("insert into t2 values (2)")
primary_cur.execute("savepoint sp")
primary_cur.execute("insert into t2 values (3)")
primary_cur.execute("prepare transaction 't2'")
# Start a transaction in the primary. Leave the transaction open.
#
# The transaction has some subtransactions, but not too many to cause the
# CLOG-scanning mechanism to give up.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(50)")
# Wait for the WAL to be flushed
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
if shutdown:
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
)
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (0,)
secondary_cur.execute("select count(*) from t1")
assert secondary_cur.fetchone() == (0,)
secondary_cur.execute("select count(*) from t2")
assert secondary_cur.fetchone() == (0,)
if shutdown:
primary.start()
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
else:
primary_cur.execute("commit")
primary_cur.execute("commit prepared 't1'")
primary_cur.execute("commit prepared 't2'")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
if shutdown:
assert secondary_cur.fetchone() == (0,)
else:
assert secondary_cur.fetchone() == (50,)
secondary_cur.execute("select * from t1")
assert secondary_cur.fetchall() == [(1,)]
secondary_cur.execute("select * from t2")
assert secondary_cur.fetchall() == [(2,), (3,)]
def test_replica_start_with_prepared_xacts_with_subxacts(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup in the presence of
prepared transactions, with subtransactions.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
)
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
# Install extension containing function needed for test
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Advance nextXid close to the beginning of the next pg_subtrans segment (2^16 XIDs)
#
# This is interesting, because it tests that pg_subtrans is initialized correctly
# at standby startup. (We had a bug at one point during development where it wasn't.)
while True:
xid = int(query_scalar(primary_cur, "SELECT txid_current()"))
log.info(f"xid now {xid}")
# Consume 500 transactions at a time until we get close
if xid < 65535 - 600:
primary_cur.execute("select test_consume_xids(500);")
else:
break
primary_cur.execute("checkpoint")
# Prepare a transaction for two-phase commit
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(1000)")
primary_cur.execute("prepare transaction 't1'")
# Wait for the WAL to be flushed, and stop the primary
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
)
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (0,)
primary.start()
# Open a lot of subtransactions in the primary, causing the subxids cache to overflow
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("select create_subxacts(100000)")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
primary_cur.execute("commit prepared 't1'")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (101000,)
def test_replica_start_with_prepared_xacts_with_many_subxacts(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup in the presence of
prepared transactions, with lots of subtransactions.
Like test_replica_start_with_prepared_xacts_with_subxacts, but with more
subxacts, to test that the prepared transaction's subxids don't consume
space in the known-assigned XIDs array. (They are set in pg_subtrans
instead)
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
)
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
# Install extension containing function needed for test
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Prepare a transaction for two-phase commit, with lots of subxids
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(50000)")
# to make things a bit more varied, intersperse a few other XIDs in between
# the prepared transaction's sub-XIDs
with primary.connect().cursor() as primary_cur2:
primary_cur2.execute("insert into t (payload) values (123)")
primary_cur2.execute("begin; insert into t (payload) values (-1); rollback")
primary_cur.execute("select create_subxacts(50000)")
primary_cur.execute("prepare transaction 't1'")
# Wait for the WAL to be flushed
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
)
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (1,)
primary.start()
# Open a lot of subtransactions in the primary, causing the subxids cache to overflow
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("select create_subxacts(100000)")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100001,)
primary_cur.execute("commit prepared 't1'")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (200001,)

View File

@@ -1,32 +0,0 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
@pytest.mark.xfail
def test_replication_start(neon_simple_env: NeonEnv):
env = neon_simple_env
with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary:
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute("begin")
p_cur.execute("create table t(pk integer primary key, payload integer)")
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
p_cur.execute("select txid_current()")
xid = p_cur.fetchall()[0][0]
log.info(f"Master transaction {xid}")
with env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary"
) as secondary:
wait_replica_caughtup(primary, secondary)
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
# Enforce setting hint bits for pg_class tuples.
# If master's transaction is not marked as in-progress in MVCC snapshot,
# then XMIN_INVALID hint bit will be set for table's 't' tuple makeing it invisible.
s_cur.execute("select * from pg_class")
p_cur.execute("commit")
wait_replica_caughtup(primary, secondary)
s_cur.execute("select * from t where pk = 1")
assert s_cur.fetchone() == (1, 0)

View File

@@ -1,5 +1,5 @@
{
"v16": ["16.3", "e06bebc75306b583e758b52c95946d41109239b2"],
"v15": ["15.7", "f54d7373eb0de5a54bce2becdb1c801026c7edff"],
"v14": ["14.12", "223dd925959f8124711dd3d867dc8ba6629d52c0"]
"v16": ["16.3", "b810fdfcbb59afea7ea7bbe0cf94eaccb55a2ea2"],
"v15": ["15.7", "4874c8e52ed349a9f8290bbdcd91eb92677a5d24"],
"v14": ["14.12", "ad73770c446ea361f43e4f0404798b7e5e7a62d8"]
}