mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
312 Commits
hackathon/
...
release-58
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
feeb2dc6fa | ||
|
|
57f476ff5a | ||
|
|
7ee2bebdb7 | ||
|
|
be598f1bf4 | ||
|
|
939b5954a5 | ||
|
|
371020fe6a | ||
|
|
f45818abed | ||
|
|
0384267d58 | ||
|
|
62b3bd968a | ||
|
|
e3e3bc3542 | ||
|
|
be014a2222 | ||
|
|
2e1fe71cc0 | ||
|
|
068c158ca5 | ||
|
|
b16e4f689f | ||
|
|
dbff725a0c | ||
|
|
7fa4628434 | ||
|
|
fc538a38b9 | ||
|
|
c2e7cb324f | ||
|
|
101043122e | ||
|
|
c4d7d59825 | ||
|
|
0de1e1d664 | ||
|
|
271598b77f | ||
|
|
459bc479dc | ||
|
|
c213373a59 | ||
|
|
e0addc100d | ||
|
|
0519138b04 | ||
|
|
5da39b469c | ||
|
|
82027e22dd | ||
|
|
c431e2f1c5 | ||
|
|
4e5724d9c3 | ||
|
|
0d3e499059 | ||
|
|
7b860b837c | ||
|
|
41fc96e20f | ||
|
|
fb2b1ce57b | ||
|
|
464717451b | ||
|
|
c6ed86d3d0 | ||
|
|
f0a9017008 | ||
|
|
bb7949ba00 | ||
|
|
1df0f69664 | ||
|
|
970066a914 | ||
|
|
1ebd3897c0 | ||
|
|
6460beffcd | ||
|
|
6f7f8958db | ||
|
|
936a00e077 | ||
|
|
96a4e8de66 | ||
|
|
01180666b0 | ||
|
|
6c94269c32 | ||
|
|
edc691647d | ||
|
|
855d7b4781 | ||
|
|
c49c9707ce | ||
|
|
2227540a0d | ||
|
|
f1347f2417 | ||
|
|
30b295b017 | ||
|
|
1cef395266 | ||
|
|
78d160f76d | ||
|
|
b9238059d6 | ||
|
|
d0cb4b88c8 | ||
|
|
1ec3e39d4e | ||
|
|
a1a74eef2c | ||
|
|
90e689adda | ||
|
|
f0b2d4b053 | ||
|
|
299d9474c9 | ||
|
|
7234208b36 | ||
|
|
93450f11f5 | ||
|
|
2f0f9edf33 | ||
|
|
d424f2b7c8 | ||
|
|
21315e80bc | ||
|
|
483b66d383 | ||
|
|
aa72a22661 | ||
|
|
5c0264b591 | ||
|
|
9f13277729 | ||
|
|
54aa319805 | ||
|
|
4a227484bf | ||
|
|
2f83f85291 | ||
|
|
d6cfcb0d93 | ||
|
|
392843ad2a | ||
|
|
bd4dae8f4a | ||
|
|
b05fe53cfd | ||
|
|
c13a2f0df1 | ||
|
|
39be366fc5 | ||
|
|
6eda0a3158 | ||
|
|
306c7a1813 | ||
|
|
80be423a58 | ||
|
|
5dcfef82f2 | ||
|
|
e67b8f69c0 | ||
|
|
e546872ab4 | ||
|
|
322ea1cf7c | ||
|
|
3633742de9 | ||
|
|
079d3a37ba | ||
|
|
a46e77b476 | ||
|
|
a92702b01e | ||
|
|
8ff3253f20 | ||
|
|
04b82c92a7 | ||
|
|
e5bf423e68 | ||
|
|
60af392e45 | ||
|
|
661fc41e71 | ||
|
|
702c488f32 | ||
|
|
45c5122754 | ||
|
|
558394f710 | ||
|
|
73b0898608 | ||
|
|
e65be4c2dc | ||
|
|
40087b8164 | ||
|
|
c762b59483 | ||
|
|
5d71601ca9 | ||
|
|
a113c3e433 | ||
|
|
e81fc598f4 | ||
|
|
48b845fa76 | ||
|
|
27096858dc | ||
|
|
4430d0ae7d | ||
|
|
6e183aa0de | ||
|
|
fd6d0b7635 | ||
|
|
3710c32aae | ||
|
|
be83bee49d | ||
|
|
cf28e5922a | ||
|
|
7d384d6953 | ||
|
|
4b3b37b912 | ||
|
|
1d8d200f4d | ||
|
|
0d80d6ce18 | ||
|
|
f653ee039f | ||
|
|
e614a95853 | ||
|
|
850db4cc13 | ||
|
|
8a316b1277 | ||
|
|
4d13bae449 | ||
|
|
49377abd98 | ||
|
|
a6b2f4e54e | ||
|
|
face60d50b | ||
|
|
9768aa27f2 | ||
|
|
96b2e575e1 | ||
|
|
7222777784 | ||
|
|
5469fdede0 | ||
|
|
72aa6b9fdd | ||
|
|
ae0634b7be | ||
|
|
70711f32fa | ||
|
|
52a88af0aa | ||
|
|
b7a43bf817 | ||
|
|
dce91b33a4 | ||
|
|
23ee4f3050 | ||
|
|
46857e8282 | ||
|
|
368ab0ce54 | ||
|
|
a5987eebfd | ||
|
|
6686ede30f | ||
|
|
373c7057cc | ||
|
|
7d6ec16166 | ||
|
|
0e6fdc8a58 | ||
|
|
521438a5c6 | ||
|
|
07d7874bc8 | ||
|
|
1804111a02 | ||
|
|
cd0178efed | ||
|
|
333574be57 | ||
|
|
79a799a143 | ||
|
|
9da06af6c9 | ||
|
|
ce1753d036 | ||
|
|
67db8432b4 | ||
|
|
4e2e44e524 | ||
|
|
ed786104f3 | ||
|
|
84b74f2bd1 | ||
|
|
fec2ad6283 | ||
|
|
98eebd4682 | ||
|
|
2f74287c9b | ||
|
|
aee1bf95e3 | ||
|
|
b9de9d75ff | ||
|
|
7943b709e6 | ||
|
|
d7d066d493 | ||
|
|
e78ac22107 | ||
|
|
76a8f2bb44 | ||
|
|
8d59a8581f | ||
|
|
b1ddd01289 | ||
|
|
6eae4fc9aa | ||
|
|
765455bca2 | ||
|
|
4204960942 | ||
|
|
67345d66ea | ||
|
|
2266ee5971 | ||
|
|
b58445d855 | ||
|
|
36050e7f3d | ||
|
|
33360ed96d | ||
|
|
39a28d1108 | ||
|
|
efa6aa134f | ||
|
|
2c724e56e2 | ||
|
|
feff887c6f | ||
|
|
353d915fcf | ||
|
|
2e38098cbc | ||
|
|
a6fe5ea1ac | ||
|
|
05b0aed0c1 | ||
|
|
cd1705357d | ||
|
|
6bc7561290 | ||
|
|
fbd3ac14b5 | ||
|
|
e437787c8f | ||
|
|
3460dbf90b | ||
|
|
6b89d99677 | ||
|
|
6cc8ea86e4 | ||
|
|
e62a492d6f | ||
|
|
a475cdf642 | ||
|
|
7002c79a47 | ||
|
|
ee6cf357b4 | ||
|
|
e5c2086b5f | ||
|
|
5f1208296a | ||
|
|
88e8e473cd | ||
|
|
b0a77844f6 | ||
|
|
1baf464307 | ||
|
|
e9b8e81cea | ||
|
|
85d6194aa4 | ||
|
|
333a7a68ef | ||
|
|
6aa4e41bee | ||
|
|
840183e51f | ||
|
|
cbccc94b03 | ||
|
|
fce227df22 | ||
|
|
bd787e800f | ||
|
|
4a7704b4a3 | ||
|
|
ff1119da66 | ||
|
|
4c3ba1627b | ||
|
|
1407174fb2 | ||
|
|
ec9dcb1889 | ||
|
|
d11d781afc | ||
|
|
4e44565b71 | ||
|
|
4ed51ad33b | ||
|
|
1c1ebe5537 | ||
|
|
c19cb7f386 | ||
|
|
4b97d31b16 | ||
|
|
923ade3dd7 | ||
|
|
b04e711975 | ||
|
|
afd0a6b39a | ||
|
|
99752286d8 | ||
|
|
15df93363c | ||
|
|
bc0ab741af | ||
|
|
51d9dfeaa3 | ||
|
|
f63cb18155 | ||
|
|
0de603d88e | ||
|
|
240913912a | ||
|
|
91a4ea0de2 | ||
|
|
8608704f49 | ||
|
|
efef68ce99 | ||
|
|
8daefd24da | ||
|
|
46cc8b7982 | ||
|
|
38cd90dd0c | ||
|
|
a51b269f15 | ||
|
|
43bf6d0a0f | ||
|
|
15273a9b66 | ||
|
|
78aca668d0 | ||
|
|
acbf4148ea | ||
|
|
6508540561 | ||
|
|
a41b5244a8 | ||
|
|
2b3189be95 | ||
|
|
248563c595 | ||
|
|
14cd6ca933 | ||
|
|
eb36403e71 | ||
|
|
3c6f779698 | ||
|
|
f67f0c1c11 | ||
|
|
edb02d3299 | ||
|
|
664a69e65b | ||
|
|
478322ebf9 | ||
|
|
802f174072 | ||
|
|
47f9890bae | ||
|
|
262265daad | ||
|
|
300da5b872 | ||
|
|
7b22b5c433 | ||
|
|
ffca97bc1e | ||
|
|
cb356f3259 | ||
|
|
c85374295f | ||
|
|
4992160677 | ||
|
|
bd535b3371 | ||
|
|
d90c5a03af | ||
|
|
2d02cc9079 | ||
|
|
49ad94b99f | ||
|
|
948a217398 | ||
|
|
125381eae7 | ||
|
|
cd01bbc715 | ||
|
|
d8b5e3b88d | ||
|
|
06d25f2186 | ||
|
|
f759b561f3 | ||
|
|
ece0555600 | ||
|
|
73ea0a0b01 | ||
|
|
d8f6d6fd6f | ||
|
|
d24de169a7 | ||
|
|
0816168296 | ||
|
|
277b44d57a | ||
|
|
68c2c3880e | ||
|
|
49da498f65 | ||
|
|
2c76ba3dd7 | ||
|
|
dbe3dc69ad | ||
|
|
8e5bb3ed49 | ||
|
|
ab0be7b8da | ||
|
|
b4c55f5d24 | ||
|
|
ede70d833c | ||
|
|
70c3d18bb0 | ||
|
|
7a491f52c4 | ||
|
|
323c4ecb4f | ||
|
|
3d2466607e | ||
|
|
ed478b39f4 | ||
|
|
91585a558d | ||
|
|
93467eae1f | ||
|
|
f3aac81d19 | ||
|
|
979ad60c19 | ||
|
|
9316cb1b1f | ||
|
|
e7939a527a | ||
|
|
36d26665e1 | ||
|
|
873347f977 | ||
|
|
e814ac16f9 | ||
|
|
ad3055d386 | ||
|
|
94e03eb452 | ||
|
|
380f26ef79 | ||
|
|
3c5b7f59d7 | ||
|
|
fee89f80b5 | ||
|
|
41cce8eaf1 | ||
|
|
f88fe0218d | ||
|
|
cc856eca85 | ||
|
|
cf350c6002 | ||
|
|
0ce6b6a0a3 | ||
|
|
73f247d537 | ||
|
|
960be82183 | ||
|
|
806e5a6c19 | ||
|
|
8d5df07cce | ||
|
|
df7a9d1407 |
@@ -343,7 +343,33 @@ impl WalIngest {
|
||||
xlog_checkpoint.oldestActiveXid,
|
||||
self.checkpoint.oldestActiveXid
|
||||
);
|
||||
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
|
||||
|
||||
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
|
||||
// because at shutdown, all in-progress transactions will implicitly
|
||||
// end. Postgres startup code knows that, and allows hot standby to start
|
||||
// immediately from a shutdown checkpoint.
|
||||
//
|
||||
// In Neon, Postgres hot standby startup always behaves as if starting from
|
||||
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
|
||||
// instead of overwriting self.checkpoint.oldestActiveXid with
|
||||
// InvalidTransactionid from the checkpoint WAL record, update it to a
|
||||
// proper value, knowing that there are no in-progress transactions at this
|
||||
// point, except for prepared transactions.
|
||||
//
|
||||
// See also the neon code changes in the InitWalRecovery() function.
|
||||
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
|
||||
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
|
||||
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
|
||||
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
|
||||
oldest_active_xid = xid;
|
||||
}
|
||||
}
|
||||
self.checkpoint.oldestActiveXid = oldest_active_xid;
|
||||
} else {
|
||||
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
|
||||
}
|
||||
|
||||
// Write a new checkpoint key-value pair on every checkpoint record, even
|
||||
// if nothing really changed. Not strictly required, but it seems nice to
|
||||
@@ -375,6 +401,7 @@ impl WalIngest {
|
||||
if info == pg_constants::XLOG_RUNNING_XACTS {
|
||||
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
|
||||
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
}
|
||||
pg_constants::RM_REPLORIGIN_ID => {
|
||||
@@ -1277,13 +1304,10 @@ impl WalIngest {
|
||||
xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
|
||||
);
|
||||
|
||||
// Here we treat oldestXid and oldestXidDB
|
||||
// differently from postgres redo routines.
|
||||
// In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
|
||||
// until checkpoint happens and updates the value.
|
||||
// Here we can use the most recent value.
|
||||
// It's just an optimization, though and can be deleted.
|
||||
// TODO Figure out if there will be any issues with replica.
|
||||
// In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
|
||||
// truncated, but a checkpoint record with the updated values isn't written until
|
||||
// later. In Neon, a server can start at any LSN, not just on a checkpoint record,
|
||||
// so we keep the oldestXid and oldestXidDB up-to-date.
|
||||
self.checkpoint.oldestXid = xlrec.oldest_xid;
|
||||
self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
|
||||
self.checkpoint_modified = true;
|
||||
|
||||
293
pgxn/neon/neon.c
293
pgxn/neon/neon.c
@@ -12,6 +12,8 @@
|
||||
#include "fmgr.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "access/subtrans.h"
|
||||
#include "access/twophase.h"
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog.h"
|
||||
#include "storage/buf_internals.h"
|
||||
@@ -22,10 +24,12 @@
|
||||
#include "replication/logical.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/procsignal.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "funcapi.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/wait_event.h"
|
||||
@@ -266,6 +270,293 @@ LogicalSlotsMonitorMain(Datum main_arg)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* XXX: These private to procarray.c, but we need them here.
|
||||
*/
|
||||
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
|
||||
#define TOTAL_MAX_CACHED_SUBXIDS \
|
||||
((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
|
||||
|
||||
/*
|
||||
* Restore running-xact information by scanning the CLOG at startup.
|
||||
*
|
||||
* In PostgreSQL, a standby always has to wait for a running-xacts WAL record
|
||||
* to arrive before it can start accepting queries. Furthermore, if there are
|
||||
* transactions with too many subxids (> 64) open to fit in the in-memory
|
||||
* subxids cache, the running-xacts record will be marked as "suboverflowed",
|
||||
* and the standby will need to also wait for the currently in-progress
|
||||
* transactions to finish.
|
||||
*
|
||||
* That's not great in PostgreSQL, because a hot standby does not necessary
|
||||
* open up for queries immediately as you might expect. But it's worse in
|
||||
* Neon: A standby in Neon doesn't need to start WAL replay from a checkpoint
|
||||
* record; it can start at any LSN. Postgres arranges things so that there is
|
||||
* a running-xacts record soon after every checkpoint record, but when you
|
||||
* start from an arbitrary LSN, that doesn't help. If the primary is idle, or
|
||||
* not running at all, it might never write a new running-xacts record,
|
||||
* leaving the replica in a limbo where it can never start accepting queries.
|
||||
*
|
||||
* To mitigate that, we have an additional mechanism to find the running-xacts
|
||||
* information: we scan the CLOG, making note of any XIDs not marked as
|
||||
* committed or aborted. They are added to the Postgres known-assigned XIDs
|
||||
* array by calling ProcArrayApplyRecoveryInfo() in the caller of this
|
||||
* function.
|
||||
*
|
||||
* There is one big limitation with that mechanism: The size of the
|
||||
* known-assigned XIDs is limited, so if there are a lot of in-progress XIDs,
|
||||
* we have to give up. Furthermore, we don't know how many of the in-progress
|
||||
* XIDs are subtransactions, and if we use up all the space in the
|
||||
* known-assigned XIDs array for subtransactions, we might run out of space in
|
||||
* the array later during WAL replay, causing the replica to shut down with
|
||||
* "ERROR: too many KnownAssignedXids". The safe # of XIDs that we can add to
|
||||
* the known-assigned array without risking that error later is very low,
|
||||
* merely PGPROC_MAX_CACHED_SUBXIDS == 64, so we take our chances and use up
|
||||
* to half of the known-assigned XIDs array for the subtransactions, even
|
||||
* though that risks getting the error later.
|
||||
*
|
||||
* Note: It's OK if the recovered list of XIDs includes some transactions that
|
||||
* have crashed in the primary, and hence will never commit. They will be seen
|
||||
* as in-progress, until we see a new next running-acts record with an
|
||||
* oldestActiveXid that invalidates them. That's how the known-assigned XIDs
|
||||
* array always works.
|
||||
*
|
||||
* If scraping the CLOG doesn't succeed for some reason, like the subxid
|
||||
* overflow, Postgres will fall back to waiting for a running-xacts record
|
||||
* like usual.
|
||||
*
|
||||
* Returns true if a complete list of in-progress XIDs was scraped.
|
||||
*/
|
||||
static bool
|
||||
RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *nxids)
|
||||
{
|
||||
TransactionId from;
|
||||
TransactionId till;
|
||||
int max_xcnt;
|
||||
TransactionId *prepared_xids = NULL;
|
||||
int n_prepared_xids;
|
||||
TransactionId *restored_xids = NULL;
|
||||
int n_restored_xids;
|
||||
int next_prepared_idx;
|
||||
|
||||
Assert(*xids == NULL);
|
||||
|
||||
/*
|
||||
* If the checkpoint doesn't have a valid oldestActiveXid, bail out. We
|
||||
* don't know where to start the scan.
|
||||
*
|
||||
* This shouldn't happen, because the pageserver always maintains a valid
|
||||
* oldestActiveXid nowadays. Except when starting at an old point in time
|
||||
* that was ingested before the pageserver was taught to do that.
|
||||
*/
|
||||
if (!TransactionIdIsValid(checkpoint->oldestActiveXid))
|
||||
{
|
||||
elog(LOG, "cannot restore running-xacts from CLOG because oldestActiveXid is not set");
|
||||
goto fail;
|
||||
}
|
||||
|
||||
/*
|
||||
* We will scan the CLOG starting from the oldest active XID.
|
||||
*
|
||||
* In some corner cases, the oldestActiveXid from the last checkpoint
|
||||
* might already have been truncated from the CLOG. That is,
|
||||
* oldestActiveXid might be older than oldestXid. That's possible because
|
||||
* oldestActiveXid is only updated at checkpoints. After the last
|
||||
* checkpoint, the oldest transaction might have committed, and the CLOG
|
||||
* might also have been already truncated. So if oldestActiveXid is older
|
||||
* than oldestXid, start at oldestXid instead. (Otherwise we'd try to
|
||||
* access CLOG segments that have already been truncated away.)
|
||||
*/
|
||||
from = TransactionIdPrecedes(checkpoint->oldestXid, checkpoint->oldestActiveXid)
|
||||
? checkpoint->oldestActiveXid : checkpoint->oldestXid;
|
||||
till = XidFromFullTransactionId(checkpoint->nextXid);
|
||||
|
||||
/*
|
||||
* To avoid "too many KnownAssignedXids" error later during replay, we
|
||||
* limit number of collected transactions. This is a tradeoff: if we are
|
||||
* willing to consume more of the KnownAssignedXids space for the XIDs
|
||||
* now, that allows us to start up, but we might run out of space later.
|
||||
*
|
||||
* The size of the KnownAssignedXids array is TOTAL_MAX_CACHED_SUBXIDS,
|
||||
* which is (PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS). In
|
||||
* PostgreSQL, that's always enough because the primary will always write
|
||||
* an XLOG_XACT_ASSIGNMENT record if a transaction has more than
|
||||
* PGPROC_MAX_CACHED_SUBXIDS subtransactions. Seeing that record allows
|
||||
* the standby to mark the XIDs in pg_subtrans and removing them from the
|
||||
* KnowingAssignedXids array.
|
||||
*
|
||||
* Here, we don't know which XIDs belong to subtransactions that have
|
||||
* already been WAL-logged with an XLOG_XACT_ASSIGNMENT record. If we
|
||||
* wanted to be totally safe and avoid the possibility of getting a "too
|
||||
* many KnownAssignedXids" error later, we would have to limit ourselves
|
||||
* to PGPROC_MAX_CACHED_SUBXIDS, which is not much. And that includes top
|
||||
* transaction IDs too, because we cannot distinguish between top
|
||||
* transaction IDs and subtransactions here.
|
||||
*
|
||||
* Somewhat arbitrarily, we use up to half of KnownAssignedXids. That
|
||||
* strikes a sensible balance between being useful, and risking a "too
|
||||
* many KnownAssignedXids" error later.
|
||||
*/
|
||||
max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2;
|
||||
|
||||
/*
|
||||
* Collect XIDs of prepared transactions in an array. This includes only
|
||||
* their top-level XIDs. We assume that StandbyRecoverPreparedTransactions
|
||||
* has already been called, so we can find all the sub-transactions in
|
||||
* pg_subtrans.
|
||||
*/
|
||||
PrescanPreparedTransactions(&prepared_xids, &n_prepared_xids);
|
||||
qsort(prepared_xids, n_prepared_xids, sizeof(TransactionId), xidLogicalComparator);
|
||||
|
||||
/*
|
||||
* Scan the CLOG, collecting in-progress XIDs into 'restored_xids'.
|
||||
*/
|
||||
elog(DEBUG1, "scanning CLOG between %u and %u for in-progress XIDs", from, till);
|
||||
restored_xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId));
|
||||
n_restored_xids = 0;
|
||||
next_prepared_idx = 0;
|
||||
for (TransactionId xid = from; xid != till;)
|
||||
{
|
||||
XLogRecPtr xidlsn;
|
||||
XidStatus xidstatus;
|
||||
|
||||
xidstatus = TransactionIdGetStatus(xid, &xidlsn);
|
||||
|
||||
/*
|
||||
* "Merge" the prepared transactions into the restored_xids array as
|
||||
* we go. The prepared transactions array is sorted. This is mostly
|
||||
* a sanity check to ensure that all the prpeared transactions are
|
||||
* seen as in-progress. (There is a check after the loop that we didn't
|
||||
* miss any.)
|
||||
*/
|
||||
if (next_prepared_idx < n_prepared_xids && xid == prepared_xids[next_prepared_idx])
|
||||
{
|
||||
/*
|
||||
* This is a top-level transaction ID of a prepared transaction.
|
||||
* Include it in the array.
|
||||
*/
|
||||
|
||||
/* sanity check */
|
||||
if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS)
|
||||
{
|
||||
elog(LOG, "prepared transaction %u has unexpected status %X, cannot restore running-xacts from CLOG",
|
||||
xid, xidstatus);
|
||||
Assert(false);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
elog(DEBUG1, "XID %u: was next prepared xact (%d / %d)", xid, next_prepared_idx, n_prepared_xids);
|
||||
next_prepared_idx++;
|
||||
}
|
||||
else if (xidstatus == TRANSACTION_STATUS_COMMITTED)
|
||||
{
|
||||
elog(DEBUG1, "XID %u: was committed", xid);
|
||||
goto skip;
|
||||
}
|
||||
else if (xidstatus == TRANSACTION_STATUS_ABORTED)
|
||||
{
|
||||
elog(DEBUG1, "XID %u: was aborted", xid);
|
||||
goto skip;
|
||||
}
|
||||
else if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS)
|
||||
{
|
||||
/*
|
||||
* In-progress transactions are included in the array.
|
||||
*
|
||||
* Except subtransactions of the prepared transactions. They are
|
||||
* already set in pg_subtrans, and hence don't need to be tracked
|
||||
* in the known-assigned XIDs array.
|
||||
*/
|
||||
if (n_prepared_xids > 0)
|
||||
{
|
||||
TransactionId parent = SubTransGetParent(xid);
|
||||
|
||||
if (TransactionIdIsValid(parent))
|
||||
{
|
||||
/*
|
||||
* This is a subtransaction belonging to a prepared
|
||||
* transaction.
|
||||
*
|
||||
* Sanity check that it is in the prepared XIDs array. It
|
||||
* should be, because StandbyRecoverPreparedTransactions
|
||||
* populated pg_subtrans, and no other XID should be set
|
||||
* in it yet. (This also relies on the fact that
|
||||
* StandbyRecoverPreparedTransactions sets the parent of
|
||||
* each subxid to point directly to the top-level XID,
|
||||
* rather than restoring the original subtransaction
|
||||
* hierarchy.)
|
||||
*/
|
||||
if (bsearch(&parent, prepared_xids, next_prepared_idx,
|
||||
sizeof(TransactionId), xidLogicalComparator) == NULL)
|
||||
{
|
||||
elog(LOG, "sub-XID %u has unexpected parent %u, cannot restore running-xacts from CLOG",
|
||||
xid, parent);
|
||||
Assert(false);
|
||||
goto fail;
|
||||
}
|
||||
elog(DEBUG1, "XID %u: was a subtransaction of prepared xid %u", xid, parent);
|
||||
goto skip;
|
||||
}
|
||||
}
|
||||
|
||||
/* include it in the array */
|
||||
elog(DEBUG1, "XID %u: is in progress", xid);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* SUB_COMMITTED is a transient state used at commit. We don't
|
||||
* expect to see that here.
|
||||
*/
|
||||
elog(LOG, "XID %u has unexpected status %X in pg_xact, cannot restore running-xacts from CLOG",
|
||||
xid, xidstatus);
|
||||
Assert(false);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (n_restored_xids >= max_xcnt)
|
||||
{
|
||||
/*
|
||||
* Overflowed. We won't be able to install the RunningTransactions
|
||||
* snapshot.
|
||||
*/
|
||||
elog(LOG, "too many running xacts to restore from the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
|
||||
checkpoint->oldestXid, checkpoint->oldestActiveXid,
|
||||
XidFromFullTransactionId(checkpoint->nextXid));
|
||||
goto fail;
|
||||
}
|
||||
|
||||
restored_xids[n_restored_xids++] = xid;
|
||||
|
||||
skip:
|
||||
TransactionIdAdvance(xid);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* sanity check */
|
||||
if (next_prepared_idx != n_prepared_xids)
|
||||
{
|
||||
elog(LOG, "prepared transaction ID %u was not visited in the CLOG scan, cannot restore running-xacts from CLOG",
|
||||
prepared_xids[next_prepared_idx]);
|
||||
Assert(false);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
elog(LOG, "restored %d running xacts by scanning the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
|
||||
n_restored_xids, checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid));
|
||||
*nxids = n_restored_xids;
|
||||
*xids = restored_xids;
|
||||
return true;
|
||||
|
||||
fail:
|
||||
*nxids = 0;
|
||||
*xids = NULL;
|
||||
if (restored_xids)
|
||||
pfree(restored_xids);
|
||||
if (prepared_xids)
|
||||
pfree(prepared_xids);
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
_PG_init(void)
|
||||
{
|
||||
@@ -288,6 +579,8 @@ _PG_init(void)
|
||||
|
||||
pg_init_extension_server();
|
||||
|
||||
restore_running_xacts_callback = RestoreRunningXactsFromClog;
|
||||
|
||||
/*
|
||||
* Important: This must happen after other parts of the extension are
|
||||
* loaded, otherwise any settings to GUCs that were set before the
|
||||
|
||||
@@ -7,7 +7,7 @@ OBJS = \
|
||||
neontest.o
|
||||
|
||||
EXTENSION = neon_test_utils
|
||||
DATA = neon_test_utils--1.1.sql
|
||||
DATA = neon_test_utils--1.2.sql
|
||||
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
|
||||
|
||||
PG_CONFIG = pg_config
|
||||
|
||||
@@ -41,7 +41,7 @@ RETURNS bytea
|
||||
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
|
||||
CREATE FUNCTION neon_xlogflush(lsn pg_lsn DEFAULT NULL)
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'neon_xlogflush'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
@@ -1,6 +1,6 @@
|
||||
# neon_test_utils extension
|
||||
comment = 'helpers for neon testing and debugging'
|
||||
default_version = '1.1'
|
||||
default_version = '1.2'
|
||||
module_pathname = '$libdir/neon_test_utils'
|
||||
relocatable = true
|
||||
trusted = true
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include "access/relation.h"
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "fmgr.h"
|
||||
#include "funcapi.h"
|
||||
@@ -444,11 +445,46 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
|
||||
|
||||
/*
|
||||
* Directly calls XLogFlush(lsn) to flush WAL buffers.
|
||||
*
|
||||
* If 'lsn' is not specified (is NULL), flush all generated WAL.
|
||||
*/
|
||||
Datum
|
||||
neon_xlogflush(PG_FUNCTION_ARGS)
|
||||
{
|
||||
XLogRecPtr lsn = PG_GETARG_LSN(0);
|
||||
XLogRecPtr lsn;
|
||||
|
||||
if (RecoveryInProgress())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("recovery is in progress"),
|
||||
errhint("cannot flush WAL during recovery.")));
|
||||
|
||||
if (!PG_ARGISNULL(0))
|
||||
lsn = PG_GETARG_LSN(0);
|
||||
else
|
||||
{
|
||||
lsn = GetXLogInsertRecPtr();
|
||||
|
||||
/*---
|
||||
* The LSN returned by GetXLogInsertRecPtr() is the position where the
|
||||
* next inserted record would begin. If the last record ended just at
|
||||
* the page boundary, the next record will begin after the page header
|
||||
* on the next page, and that's what GetXLogInsertRecPtr().returns,
|
||||
* but the page header has not been written yet. If we tried to flush
|
||||
* it, XLogFlush() would throw an error:
|
||||
*
|
||||
* ERROR : xlog flush request %X/%X is not satisfied --- flushed only to %X/%X
|
||||
*
|
||||
* To avoid that, if the insert position points to just after the page
|
||||
* header, back off to page boundary.
|
||||
*/
|
||||
if (lsn % XLOG_BLCKSZ == SizeOfXLogShortPHD &&
|
||||
XLogSegmentOffset(lsn, wal_segment_size) > XLOG_BLCKSZ)
|
||||
lsn -= SizeOfXLogShortPHD;
|
||||
else if (lsn % XLOG_BLCKSZ == SizeOfXLogLongPHD &&
|
||||
XLogSegmentOffset(lsn, wal_segment_size) < XLOG_BLCKSZ)
|
||||
lsn -= SizeOfXLogLongPHD;
|
||||
}
|
||||
|
||||
XLogFlush(lsn);
|
||||
PG_RETURN_VOID();
|
||||
|
||||
@@ -3491,7 +3491,6 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
):
|
||||
super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres")
|
||||
self.env = env
|
||||
self.running = False
|
||||
self.branch_name: Optional[str] = None # dubious
|
||||
self.endpoint_id: Optional[str] = None # dubious, see asserts below
|
||||
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
|
||||
@@ -3857,7 +3856,9 @@ class EndpointFactory:
|
||||
|
||||
return self
|
||||
|
||||
def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
|
||||
def new_replica(
|
||||
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
|
||||
):
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
assert branch_name is not None
|
||||
|
||||
@@ -198,7 +198,7 @@ def wait_for_last_record_lsn(
|
||||
lsn: Lsn,
|
||||
) -> Lsn:
|
||||
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
|
||||
for i in range(100):
|
||||
for i in range(1000):
|
||||
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
|
||||
if current_lsn >= lsn:
|
||||
return current_lsn
|
||||
|
||||
646
test_runner/regress/test_replica_start.py
Normal file
646
test_runner/regress/test_replica_start.py
Normal file
@@ -0,0 +1,646 @@
|
||||
"""
|
||||
In PostgreSQL, a standby always has to wait for a running-xacts WAL record to
|
||||
arrive before it can start accepting queries. Furthermore, if there are
|
||||
transactions with too many subxids (> 64) open to fit in the in-memory subxids
|
||||
cache, the running-xacts record will be marked as "suboverflowed", and the
|
||||
standby will need to also wait for the currently in-progress transactions to
|
||||
finish.
|
||||
|
||||
In Neon, we have an additional mechanism that scans the CLOG at server startup
|
||||
to determine the list of running transactions, so that the standby can start up
|
||||
immediately without waiting for the running-xacts record, but that mechanism
|
||||
only works if the # of active (sub-)transactions is reasonably small. Otherwise
|
||||
it falls back to waiting. Furthermore, it's somewhat optimistic in using up the
|
||||
known-assigned XIDs array: if too many transactions with subxids are started in
|
||||
the primary later, the replay in the replica will crash with "too many
|
||||
KnownAssignedXids" error.
|
||||
|
||||
This module contains tests for those various cases at standby startup: starting
|
||||
from shutdown checkpoint, using the CLOG scanning mechanism, waiting for
|
||||
running-xacts record and for in-progress transactions to finish etc.
|
||||
"""
|
||||
|
||||
import threading
|
||||
from contextlib import closing
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
|
||||
CREATE_SUBXACTS_FUNC = """
|
||||
create or replace function create_subxacts(n integer) returns void as $$
|
||||
declare
|
||||
i integer;
|
||||
begin
|
||||
for i in 1..n loop
|
||||
begin
|
||||
insert into t (payload) values (0);
|
||||
exception
|
||||
when others then
|
||||
raise exception 'caught something: %', sqlerrm;
|
||||
end;
|
||||
end loop;
|
||||
end; $$ language plpgsql
|
||||
"""
|
||||
|
||||
|
||||
def test_replica_start_scan_clog(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test the CLOG-scanning mechanism at hot standby startup. There is one
|
||||
transaction active in the primary when the standby is started. The primary
|
||||
is killed before it has a chance to write a running-xacts record. The
|
||||
CLOG-scanning at neon startup allows the standby to start up anyway.
|
||||
|
||||
See the module docstring for background.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
primary_cur.execute("select pg_switch_wal()")
|
||||
|
||||
# Start a transaction in the primary. Leave the transaction open.
|
||||
#
|
||||
# The transaction has some subtransactions, but not too many to cause the
|
||||
# CLOG-scanning mechanism to give up.
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(50)")
|
||||
|
||||
# Wait for the WAL to be flushed, but then immediately kill the primary,
|
||||
# before it has a chance to generate a running-xacts record.
|
||||
primary_cur.execute("select neon_xlogflush()")
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
primary.stop(mode="immediate")
|
||||
|
||||
# Create a replica. It should start up normally, thanks to the CLOG-scanning
|
||||
# mechanism.
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
|
||||
# The transaction did not commit, so it should not be visible in the secondary
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
|
||||
|
||||
def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test the CLOG-scanning mechanism at hot standby startup, after
|
||||
leaving behind crashed transactions.
|
||||
|
||||
See the module docstring for background.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
primary_cur.execute("select pg_switch_wal()")
|
||||
|
||||
# Consume a lot of XIDs, then kill Postgres without giving it a
|
||||
# chance to write abort records for them.
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(100000)")
|
||||
primary.stop(mode="immediate")
|
||||
|
||||
# Restart the primary. Do some light work, and shut it down cleanly
|
||||
primary.start()
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("insert into t (payload) values (0)")
|
||||
primary.stop(mode="fast")
|
||||
|
||||
# Create a replica. It should start up normally, thanks to the CLOG-scanning
|
||||
# mechanism. (Restarting the primary writes a checkpoint and/or running-xacts
|
||||
# record, which allows the standby to know that the crashed XIDs are aborted)
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (1,)
|
||||
|
||||
|
||||
def test_replica_start_at_running_xacts(neon_simple_env: NeonEnv, pg_version):
|
||||
"""
|
||||
Test that starting a replica works right after the primary has
|
||||
created a running-xacts record. This may seem like a trivial case,
|
||||
but during development, we had a bug that was triggered by having
|
||||
oldestActiveXid == nextXid. Starting right after a running-xacts
|
||||
record is one way to test that case.
|
||||
|
||||
See the module docstring for background.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
if env.pg_version == PgVersion.V14 or env.pg_version == PgVersion.V15:
|
||||
pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
primary_cur.execute("select pg_log_standby_snapshot()")
|
||||
primary_cur.execute("select neon_xlogflush()")
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select 123")
|
||||
assert secondary_cur.fetchone() == (123,)
|
||||
|
||||
|
||||
def test_replica_start_wait_subxids_finish(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test replica startup when there are a lot of (sub)transactions active in the
|
||||
primary. That's too many for the CLOG-scanning mechanism to handle, so the
|
||||
replica has to wait for the large transaction to finish before it starts to
|
||||
accept queries.
|
||||
|
||||
After replica startup, test MVCC with transactions that were in-progress
|
||||
when the replica was started.
|
||||
|
||||
See the module docstring for background.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create
|
||||
# lots of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
|
||||
# Start a transaction with 100000 subtransactions, and leave it open. That's
|
||||
# too many to fit in the "known-assigned XIDs array" in the replica, and
|
||||
# also too many to fit in the subxid caches so the running-xacts record will
|
||||
# also overflow.
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(100000)")
|
||||
|
||||
# Start another, smaller transaction in the primary. We'll come back to this
|
||||
# later.
|
||||
primary_conn2 = primary.connect()
|
||||
primary_cur2 = primary_conn2.cursor()
|
||||
primary_cur2.execute("begin")
|
||||
primary_cur2.execute("insert into t (payload) values (0)")
|
||||
|
||||
# Create a replica. but before that, wait for the wal to be flushed to
|
||||
# safekeepers, so that the replica is started at a point where the large
|
||||
# transaction is already active. (The whole transaction might not be flushed
|
||||
# yet, but that's OK.)
|
||||
#
|
||||
# Start it in a separate thread, so that we can do other stuff while it's
|
||||
# blocked waiting for the startup to finish.
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
secondary = env.endpoints.new_replica(origin=primary, endpoint_id="secondary")
|
||||
start_secondary_thread = threading.Thread(target=secondary.start)
|
||||
start_secondary_thread.start()
|
||||
|
||||
# Verify that the replica has otherwise started up, but cannot start
|
||||
# accepting queries yet.
|
||||
log.info("Waiting 5 s to verify that the secondary does not start")
|
||||
start_secondary_thread.join(5)
|
||||
assert secondary.log_contains("consistent recovery state reached")
|
||||
assert secondary.log_contains("started streaming WAL from primary")
|
||||
# The "redo starts" message is printed when the first WAL record is
|
||||
# received. It might or might not be present in the log depending on how
|
||||
# far exactly the WAL was flushed when the replica was started, and whether
|
||||
# background activity caused any more WAL records to be flushed on the
|
||||
# primary afterwards.
|
||||
#
|
||||
# assert secondary.log_contains("redo # starts")
|
||||
|
||||
# should not be open for connections yet
|
||||
assert start_secondary_thread.is_alive()
|
||||
assert not secondary.is_running()
|
||||
assert not secondary.log_contains("database system is ready to accept read-only connections")
|
||||
|
||||
# Commit the large transaction in the primary.
|
||||
#
|
||||
# Within the next 15 s, the primary should write a new running-xacts record
|
||||
# to the WAL which shows the transaction as completed. Once the replica
|
||||
# replays that record, it will start accepting queries.
|
||||
primary_cur.execute("commit")
|
||||
start_secondary_thread.join()
|
||||
|
||||
# Verify that the large transaction is correctly visible in the secondary
|
||||
# (but not the second, small transaction, which is still in-progress!)
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100000,)
|
||||
|
||||
# Perform some more MVCC testing using the second transaction that was
|
||||
# started in the primary before the replica was created
|
||||
primary_cur2.execute("select create_subxacts(10000)")
|
||||
|
||||
# The second transaction still hasn't committed
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
secondary_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100000,)
|
||||
|
||||
# Commit the second transaction in the primary
|
||||
primary_cur2.execute("commit")
|
||||
|
||||
# Should still be invisible to the old snapshot
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100000,)
|
||||
|
||||
# Commit the REPEATABLE READ transaction in the replica. Both
|
||||
# primary transactions should now be visible to a new snapshot.
|
||||
secondary_cur.execute("commit")
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (110001,)
|
||||
|
||||
|
||||
def test_replica_too_many_known_assigned_xids(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
The CLOG-scanning mechanism fills the known-assigned XIDs array
|
||||
optimistically at standby startup, betting that it can still fit
|
||||
upcoming transactions replayed later from the WAL in the
|
||||
array. This test tests what happens when that bet fails and the
|
||||
known-assigned XID array fills up after the standby has already
|
||||
been started. The WAL redo will fail with an error:
|
||||
|
||||
FATAL: too many KnownAssignedXids
|
||||
CONTEXT: WAL redo at 0/1895CB0 for neon/INSERT: off: 25, flags: 0x08; blkref #0: rel 1663/5/16385, blk 64
|
||||
|
||||
which causes the standby to shut down.
|
||||
|
||||
See the module docstring for background.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
|
||||
# Determine how many connections we can use
|
||||
primary_cur.execute("show max_connections")
|
||||
max_connections = int(primary_cur.fetchall()[0][0])
|
||||
primary_cur.execute("show superuser_reserved_connections")
|
||||
superuser_reserved_connections = int(primary_cur.fetchall()[0][0])
|
||||
n_connections = max_connections - superuser_reserved_connections
|
||||
n_subxids = 200
|
||||
|
||||
# Start one top transaction in primary, with lots of subtransactions. This
|
||||
# uses up much of the known-assigned XIDs space in the standby, but doesn't
|
||||
# cause it to overflow.
|
||||
large_p_conn = primary.connect()
|
||||
large_p_cur = large_p_conn.cursor()
|
||||
large_p_cur.execute("begin")
|
||||
large_p_cur.execute(f"select create_subxacts({max_connections} * 30)")
|
||||
|
||||
with closing(primary.connect()) as small_p_conn:
|
||||
with small_p_conn.cursor() as small_p_cur:
|
||||
small_p_cur.execute("select create_subxacts(1)")
|
||||
|
||||
# Create a replica at this LSN
|
||||
primary_cur.execute("select neon_xlogflush()")
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
|
||||
# The transaction in primary has not committed yet.
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (1,)
|
||||
|
||||
# Start max number of top transactions in primary, with a lot of
|
||||
# subtransactions each. We add the subtransactions to each top transaction
|
||||
# in a round-robin fashion, instead of adding a lot of subtransactions to
|
||||
# one top transaction at a time. This way, we will have the max number of
|
||||
# subtransactions in the in-memory subxid cache of each top transaction,
|
||||
# until they all overflow.
|
||||
#
|
||||
# Currently, PGPROC_MAX_CACHED_SUBXIDS == 64, so this will overflow the all
|
||||
# the subxid caches after creating 64 subxids in each top transaction. The
|
||||
# point just before the caches have overflowed is the most interesting point
|
||||
# in time, but we'll keep going beyond that, to ensure that this test is
|
||||
# robust even if PGPROC_MAX_CACHED_SUBXIDS changes.
|
||||
p_curs = []
|
||||
for _ in range(0, n_connections):
|
||||
p_cur = primary.connect().cursor()
|
||||
p_cur.execute("begin")
|
||||
p_curs.append(p_cur)
|
||||
|
||||
for _subxid in range(0, n_subxids):
|
||||
for i in range(0, n_connections):
|
||||
p_curs[i].execute("select create_subxacts(1)")
|
||||
|
||||
# Commit all the transactions in the primary
|
||||
for i in range(0, n_connections):
|
||||
p_curs[i].execute("commit")
|
||||
large_p_cur.execute("commit")
|
||||
|
||||
# Wait until the replica crashes with "too many KnownAssignedXids" error.
|
||||
def check_replica_crashed():
|
||||
try:
|
||||
secondary.connect()
|
||||
except psycopg2.Error:
|
||||
# Once the connection fails, return success
|
||||
return None
|
||||
raise RuntimeError("connection succeeded")
|
||||
|
||||
wait_until(20, 0.5, check_replica_crashed)
|
||||
assert secondary.log_contains("too many KnownAssignedXids")
|
||||
|
||||
# Replica is crashed, so ignore stop result
|
||||
secondary.check_stop_result = False
|
||||
|
||||
|
||||
def test_replica_start_repro_visibility_bug(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Before PR #7288, a hot standby in neon incorrectly started up
|
||||
immediately, before it had received a running-xacts record. That
|
||||
led to visibility bugs if there were active transactions in the
|
||||
primary. This test reproduces the incorrect query results and
|
||||
incorrectly set hint bits, before that was fixed.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
p_cur = primary.connect().cursor()
|
||||
|
||||
p_cur.execute("begin")
|
||||
p_cur.execute("create table t(pk integer primary key, payload integer)")
|
||||
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
|
||||
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
s_cur = secondary.connect().cursor()
|
||||
|
||||
# Set hint bits for pg_class tuples. If primary's transaction is
|
||||
# not marked as in-progress in MVCC snapshot, then XMIN_INVALID
|
||||
# hint bit will be set for table's 't' tuple, making it invisible
|
||||
# even after the commit record is replayed later.
|
||||
s_cur.execute("select * from pg_class")
|
||||
|
||||
p_cur.execute("commit")
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
s_cur.execute("select * from t where pk = 1")
|
||||
assert s_cur.fetchone() == (1, 0)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("shutdown", [True, False])
|
||||
def test_replica_start_with_prepared_xacts(neon_simple_env: NeonEnv, shutdown: bool):
|
||||
"""
|
||||
Test the CLOG-scanning mechanism at hot standby startup in the presence of
|
||||
prepared transactions.
|
||||
|
||||
This test is run in two variants: one where the primary server is shut down
|
||||
before starting the secondary, or not.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(
|
||||
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute("create table t1(pk integer primary key)")
|
||||
primary_cur.execute("create table t2(pk integer primary key)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
|
||||
# Prepare a transaction for two-phase commit
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("insert into t1 values (1)")
|
||||
primary_cur.execute("prepare transaction 't1'")
|
||||
|
||||
# Prepare another transaction for two-phase commit, with a subtransaction
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("insert into t2 values (2)")
|
||||
primary_cur.execute("savepoint sp")
|
||||
primary_cur.execute("insert into t2 values (3)")
|
||||
primary_cur.execute("prepare transaction 't2'")
|
||||
|
||||
# Start a transaction in the primary. Leave the transaction open.
|
||||
#
|
||||
# The transaction has some subtransactions, but not too many to cause the
|
||||
# CLOG-scanning mechanism to give up.
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(50)")
|
||||
|
||||
# Wait for the WAL to be flushed
|
||||
primary_cur.execute("select neon_xlogflush()")
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
if shutdown:
|
||||
primary.stop(mode="fast")
|
||||
|
||||
# Create a replica. It should start up normally, thanks to the CLOG-scanning
|
||||
# mechanism.
|
||||
secondary = env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
|
||||
# The transaction did not commit, so it should not be visible in the secondary
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
secondary_cur.execute("select count(*) from t1")
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
secondary_cur.execute("select count(*) from t2")
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
|
||||
if shutdown:
|
||||
primary.start()
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
else:
|
||||
primary_cur.execute("commit")
|
||||
primary_cur.execute("commit prepared 't1'")
|
||||
primary_cur.execute("commit prepared 't2'")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
if shutdown:
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
else:
|
||||
assert secondary_cur.fetchone() == (50,)
|
||||
secondary_cur.execute("select * from t1")
|
||||
assert secondary_cur.fetchall() == [(1,)]
|
||||
secondary_cur.execute("select * from t2")
|
||||
assert secondary_cur.fetchall() == [(2,), (3,)]
|
||||
|
||||
|
||||
def test_replica_start_with_prepared_xacts_with_subxacts(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test the CLOG-scanning mechanism at hot standby startup in the presence of
|
||||
prepared transactions, with subtransactions.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(
|
||||
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
|
||||
# Install extension containing function needed for test
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
|
||||
# Advance nextXid close to the beginning of the next pg_subtrans segment (2^16 XIDs)
|
||||
#
|
||||
# This is interesting, because it tests that pg_subtrans is initialized correctly
|
||||
# at standby startup. (We had a bug where it didn't at one point during development.)
|
||||
while True:
|
||||
xid = int(query_scalar(primary_cur, "SELECT txid_current()"))
|
||||
log.info(f"xid now {xid}")
|
||||
# Consume 500 transactions at a time until we get close
|
||||
if xid < 65535 - 600:
|
||||
primary_cur.execute("select test_consume_xids(500);")
|
||||
else:
|
||||
break
|
||||
primary_cur.execute("checkpoint")
|
||||
|
||||
# Prepare a transaction for two-phase commit
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(1000)")
|
||||
primary_cur.execute("prepare transaction 't1'")
|
||||
|
||||
# Wait for the WAL to be flushed, and stop the primary
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
primary.stop(mode="fast")
|
||||
|
||||
# Create a replica. It should start up normally, thanks to the CLOG-scanning
|
||||
# mechanism.
|
||||
secondary = env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
|
||||
# The transaction did not commit, so it should not be visible in the secondary
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
|
||||
primary.start()
|
||||
|
||||
# Open a lot of subtransactions in the primary, causing the subxids cache to overflow
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("select create_subxacts(100000)")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100000,)
|
||||
|
||||
primary_cur.execute("commit prepared 't1'")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (101000,)
|
||||
|
||||
|
||||
def test_replica_start_with_prepared_xacts_with_many_subxacts(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test the CLOG-scanning mechanism at hot standby startup in the presence of
|
||||
prepared transactions, with lots of subtransactions.
|
||||
|
||||
Like test_replica_start_with_prepared_xacts_with_subxacts, but with more
|
||||
subxacts, to test that the prepared transaction's subxids don't consume
|
||||
space in the known-assigned XIDs array. (They are set in pg_subtrans
|
||||
instead)
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(
|
||||
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
|
||||
# Install extension containing function needed for test
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
|
||||
# Prepare a transaction for two-phase commit, with lots of subxids
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(50000)")
|
||||
|
||||
# to make things a bit more varied, intersperse a few other XIDs in between
|
||||
# the prepared transaction's sub-XIDs
|
||||
with primary.connect().cursor() as primary_cur2:
|
||||
primary_cur2.execute("insert into t (payload) values (123)")
|
||||
primary_cur2.execute("begin; insert into t (payload) values (-1); rollback")
|
||||
|
||||
primary_cur.execute("select create_subxacts(50000)")
|
||||
primary_cur.execute("prepare transaction 't1'")
|
||||
|
||||
# Wait for the WAL to be flushed
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
primary.stop(mode="fast")
|
||||
|
||||
# Create a replica. It should start up normally, thanks to the CLOG-scanning
|
||||
# mechanism.
|
||||
secondary = env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
|
||||
# The transaction did not commit, so it should not be visible in the secondary
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (1,)
|
||||
|
||||
primary.start()
|
||||
|
||||
# Open a lot of subtransactions in the primary, causing the subxids cache to overflow
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("select create_subxacts(100000)")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100001,)
|
||||
|
||||
primary_cur.execute("commit prepared 't1'")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (200001,)
|
||||
@@ -1,32 +0,0 @@
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
|
||||
|
||||
@pytest.mark.xfail
|
||||
def test_replication_start(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary:
|
||||
with primary.connect() as p_con:
|
||||
with p_con.cursor() as p_cur:
|
||||
p_cur.execute("begin")
|
||||
p_cur.execute("create table t(pk integer primary key, payload integer)")
|
||||
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
|
||||
p_cur.execute("select txid_current()")
|
||||
xid = p_cur.fetchall()[0][0]
|
||||
log.info(f"Master transaction {xid}")
|
||||
with env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary"
|
||||
) as secondary:
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
with secondary.connect() as s_con:
|
||||
with s_con.cursor() as s_cur:
|
||||
# Enforce setting hint bits for pg_class tuples.
|
||||
# If master's transaction is not marked as in-progress in MVCC snapshot,
|
||||
# then XMIN_INVALID hint bit will be set for table's 't' tuple makeing it invisible.
|
||||
s_cur.execute("select * from pg_class")
|
||||
p_cur.execute("commit")
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
s_cur.execute("select * from t where pk = 1")
|
||||
assert s_cur.fetchone() == (1, 0)
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 223dd92595...ad73770c44
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: f54d7373eb...4874c8e52e
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: e06bebc753...b810fdfcbb
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"v16": ["16.3", "e06bebc75306b583e758b52c95946d41109239b2"],
|
||||
"v15": ["15.7", "f54d7373eb0de5a54bce2becdb1c801026c7edff"],
|
||||
"v14": ["14.12", "223dd925959f8124711dd3d867dc8ba6629d52c0"]
|
||||
"v16": ["16.3", "b810fdfcbb59afea7ea7bbe0cf94eaccb55a2ea2"],
|
||||
"v15": ["15.7", "4874c8e52ed349a9f8290bbdcd91eb92677a5d24"],
|
||||
"v14": ["14.12", "ad73770c446ea361f43e4f0404798b7e5e7a62d8"]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user