mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 23:50:39 +00:00
Compare commits
52 Commits
arpad/vect
...
vlad/asdas
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4529b10553 | ||
|
|
81a28a74c2 | ||
|
|
98d5c1a4f1 | ||
|
|
04b7e56541 | ||
|
|
cc4c0619b8 | ||
|
|
94e9811ca4 | ||
|
|
a020f55a80 | ||
|
|
1d28c8ea4e | ||
|
|
8719d813ec | ||
|
|
7918fdd2ab | ||
|
|
e874b08914 | ||
|
|
357d4233ec | ||
|
|
37f7ed1f30 | ||
|
|
e55993c043 | ||
|
|
82892adcc6 | ||
|
|
2e67e48ac1 | ||
|
|
3f84ecac31 | ||
|
|
9acc5613ce | ||
|
|
2e792927fd | ||
|
|
29c35bb631 | ||
|
|
f5a96ac5f0 | ||
|
|
cead53dafc | ||
|
|
bb5f9dd423 | ||
|
|
dad88b0b32 | ||
|
|
a610ddb307 | ||
|
|
9603ae7bca | ||
|
|
af94059c5a | ||
|
|
821119cf0c | ||
|
|
c72511c8cf | ||
|
|
b0dc5e62c2 | ||
|
|
b579306e47 | ||
|
|
ab34898b86 | ||
|
|
0f55da3629 | ||
|
|
bfae30a086 | ||
|
|
d80c2690e5 | ||
|
|
5028575672 | ||
|
|
5c58d976b7 | ||
|
|
48844436fc | ||
|
|
74bfb93498 | ||
|
|
13a255801c | ||
|
|
bb96416a79 | ||
|
|
0b365fdac7 | ||
|
|
256e8e0a90 | ||
|
|
0edba09730 | ||
|
|
1c1ff34490 | ||
|
|
e948e2d2b8 | ||
|
|
54d039d143 | ||
|
|
5cb73c34c0 | ||
|
|
9cd20e6a21 | ||
|
|
575c8e0bbf | ||
|
|
cc843d14fb | ||
|
|
d3250be5db |
@@ -693,7 +693,6 @@ impl DeletionQueue {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(unused)]
|
||||
mod test {
|
||||
use camino::Utf8Path;
|
||||
use hex_literal::hex;
|
||||
@@ -992,7 +991,6 @@ mod test {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[cfg(any())]
|
||||
async fn deletion_queue_validation() -> anyhow::Result<()> {
|
||||
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
#![recursion_limit = "300"]
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
#![allow(unused)]
|
||||
|
||||
mod auth;
|
||||
pub mod basebackup;
|
||||
|
||||
@@ -3937,8 +3937,7 @@ pub(crate) mod harness {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any())]
|
||||
#[allow(unused)]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
@@ -4703,7 +4702,6 @@ mod tests {
|
||||
// There's one major downside to this test: delta layers only contains images,
|
||||
// so the search can stop at the first delta layer and doesn't traverse any deeper.
|
||||
#[tokio::test]
|
||||
#[cfg(any())]
|
||||
async fn test_get_vectored() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_get_vectored")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
@@ -4852,7 +4850,6 @@ mod tests {
|
||||
// ------------------------------+
|
||||
// ```
|
||||
#[tokio::test]
|
||||
#[cfg(any())]
|
||||
async fn test_get_vectored_key_gap() -> anyhow::Result<()> {
|
||||
let tenant_conf = TenantConf {
|
||||
// Make compaction deterministic
|
||||
@@ -5012,7 +5009,6 @@ mod tests {
|
||||
// * X - page images
|
||||
// ```
|
||||
#[tokio::test]
|
||||
#[cfg(any())]
|
||||
async fn test_get_vectored_ancestor_descent() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_get_vectored_on_lsn_axis")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
@@ -394,7 +394,7 @@ impl BlobWriter<false> {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any())]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
|
||||
|
||||
@@ -86,7 +86,7 @@ pub(crate) enum BlockReaderRef<'a> {
|
||||
Slice(&'a [u8]),
|
||||
#[cfg(test)]
|
||||
TestDisk(&'a super::disk_btree::tests::TestDisk),
|
||||
#[cfg(any())]
|
||||
#[cfg(test)]
|
||||
VirtualFile(&'a VirtualFile),
|
||||
}
|
||||
|
||||
@@ -105,7 +105,7 @@ impl<'a> BlockReaderRef<'a> {
|
||||
Slice(s) => Self::read_blk_slice(s, blknum),
|
||||
#[cfg(test)]
|
||||
TestDisk(r) => r.read_blk(blknum),
|
||||
#[cfg(any())]
|
||||
#[cfg(test)]
|
||||
VirtualFile(r) => r.read_blk(blknum, ctx).await,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1498,7 +1498,7 @@ impl DeltaLayerInner {
|
||||
offset
|
||||
}
|
||||
|
||||
#[cfg(any())]
|
||||
#[cfg(test)]
|
||||
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader =
|
||||
@@ -1638,7 +1638,7 @@ impl<'a> DeltaLayerIterator<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any())]
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
|
||||
@@ -1290,7 +1290,6 @@ mod test {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any())]
|
||||
#[tokio::test]
|
||||
async fn image_layer_iterator() {
|
||||
let harness = TenantHarness::create("image_layer_iterator").unwrap();
|
||||
|
||||
@@ -1890,7 +1890,7 @@ impl ResidentLayer {
|
||||
self.owner.metadata()
|
||||
}
|
||||
|
||||
#[cfg(any())]
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn get_as_delta(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
|
||||
@@ -315,7 +315,7 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
read.size(),
|
||||
buf.capacity()
|
||||
);
|
||||
let mut buf = self
|
||||
let buf = self
|
||||
.file
|
||||
.read_exact_at(buf.slice(0..read.size()), read.start, ctx)
|
||||
.await?
|
||||
@@ -364,8 +364,6 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
|
||||
assert_eq!(end - start, blob_size);
|
||||
|
||||
buf[start as usize..end as usize].fill(0xaf);
|
||||
|
||||
metas.push(VectoredBlob {
|
||||
start: start as usize,
|
||||
end: end as usize,
|
||||
|
||||
@@ -1015,7 +1015,6 @@ impl tokio_epoll_uring::IoFd for FileGuard {
|
||||
|
||||
#[cfg(test)]
|
||||
impl VirtualFile {
|
||||
#[cfg(any())]
|
||||
pub(crate) async fn read_blk(
|
||||
&self,
|
||||
blknum: u32,
|
||||
|
||||
@@ -343,33 +343,7 @@ impl WalIngest {
|
||||
xlog_checkpoint.oldestActiveXid,
|
||||
self.checkpoint.oldestActiveXid
|
||||
);
|
||||
|
||||
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
|
||||
// because at shutdown, all in-progress transactions will implicitly
|
||||
// end. Postgres startup code knows that, and allows hot standby to start
|
||||
// immediately from a shutdown checkpoint.
|
||||
//
|
||||
// In Neon, Postgres hot standby startup always behaves as if starting from
|
||||
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
|
||||
// instead of overwriting self.checkpoint.oldestActiveXid with
|
||||
// InvalidTransactionid from the checkpoint WAL record, update it to a
|
||||
// proper value, knowing that there are no in-progress transactions at this
|
||||
// point, except for prepared transactions.
|
||||
//
|
||||
// See also the neon code changes in the InitWalRecovery() function.
|
||||
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
|
||||
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
|
||||
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
|
||||
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
|
||||
oldest_active_xid = xid;
|
||||
}
|
||||
}
|
||||
self.checkpoint.oldestActiveXid = oldest_active_xid;
|
||||
} else {
|
||||
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
|
||||
}
|
||||
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
|
||||
|
||||
// Write a new checkpoint key-value pair on every checkpoint record, even
|
||||
// if nothing really changed. Not strictly required, but it seems nice to
|
||||
@@ -401,7 +375,6 @@ impl WalIngest {
|
||||
if info == pg_constants::XLOG_RUNNING_XACTS {
|
||||
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
|
||||
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
}
|
||||
pg_constants::RM_REPLORIGIN_ID => {
|
||||
@@ -1304,10 +1277,13 @@ impl WalIngest {
|
||||
xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
|
||||
);
|
||||
|
||||
// In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
|
||||
// truncated, but a checkpoint record with the updated values isn't written until
|
||||
// later. In Neon, a server can start at any LSN, not just on a checkpoint record,
|
||||
// so we keep the oldestXid and oldestXidDB up-to-date.
|
||||
// Here we treat oldestXid and oldestXidDB
|
||||
// differently from postgres redo routines.
|
||||
// In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
|
||||
// until checkpoint happens and updates the value.
|
||||
// Here we can use the most recent value.
|
||||
// It's just an optimization, though and can be deleted.
|
||||
// TODO Figure out if there will be any issues with replica.
|
||||
self.checkpoint.oldestXid = xlrec.oldest_xid;
|
||||
self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
|
||||
self.checkpoint_modified = true;
|
||||
|
||||
293
pgxn/neon/neon.c
293
pgxn/neon/neon.c
@@ -12,8 +12,6 @@
|
||||
#include "fmgr.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "access/subtrans.h"
|
||||
#include "access/twophase.h"
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog.h"
|
||||
#include "storage/buf_internals.h"
|
||||
@@ -24,12 +22,10 @@
|
||||
#include "replication/logical.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/procsignal.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "funcapi.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/wait_event.h"
|
||||
@@ -270,293 +266,6 @@ LogicalSlotsMonitorMain(Datum main_arg)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* XXX: These private to procarray.c, but we need them here.
|
||||
*/
|
||||
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
|
||||
#define TOTAL_MAX_CACHED_SUBXIDS \
|
||||
((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
|
||||
|
||||
/*
|
||||
* Restore running-xact information by scanning the CLOG at startup.
|
||||
*
|
||||
* In PostgreSQL, a standby always has to wait for a running-xacts WAL record
|
||||
* to arrive before it can start accepting queries. Furthermore, if there are
|
||||
* transactions with too many subxids (> 64) open to fit in the in-memory
|
||||
* subxids cache, the running-xacts record will be marked as "suboverflowed",
|
||||
* and the standby will need to also wait for the currently in-progress
|
||||
* transactions to finish.
|
||||
*
|
||||
* That's not great in PostgreSQL, because a hot standby does not necessary
|
||||
* open up for queries immediately as you might expect. But it's worse in
|
||||
* Neon: A standby in Neon doesn't need to start WAL replay from a checkpoint
|
||||
* record; it can start at any LSN. Postgres arranges things so that there is
|
||||
* a running-xacts record soon after every checkpoint record, but when you
|
||||
* start from an arbitrary LSN, that doesn't help. If the primary is idle, or
|
||||
* not running at all, it might never write a new running-xacts record,
|
||||
* leaving the replica in a limbo where it can never start accepting queries.
|
||||
*
|
||||
* To mitigate that, we have an additional mechanism to find the running-xacts
|
||||
* information: we scan the CLOG, making note of any XIDs not marked as
|
||||
* committed or aborted. They are added to the Postgres known-assigned XIDs
|
||||
* array by calling ProcArrayApplyRecoveryInfo() in the caller of this
|
||||
* function.
|
||||
*
|
||||
* There is one big limitation with that mechanism: The size of the
|
||||
* known-assigned XIDs is limited, so if there are a lot of in-progress XIDs,
|
||||
* we have to give up. Furthermore, we don't know how many of the in-progress
|
||||
* XIDs are subtransactions, and if we use up all the space in the
|
||||
* known-assigned XIDs array for subtransactions, we might run out of space in
|
||||
* the array later during WAL replay, causing the replica to shut down with
|
||||
* "ERROR: too many KnownAssignedXids". The safe # of XIDs that we can add to
|
||||
* the known-assigned array without risking that error later is very low,
|
||||
* merely PGPROC_MAX_CACHED_SUBXIDS == 64, so we take our chances and use up
|
||||
* to half of the known-assigned XIDs array for the subtransactions, even
|
||||
* though that risks getting the error later.
|
||||
*
|
||||
* Note: It's OK if the recovered list of XIDs includes some transactions that
|
||||
* have crashed in the primary, and hence will never commit. They will be seen
|
||||
* as in-progress, until we see a new next running-acts record with an
|
||||
* oldestActiveXid that invalidates them. That's how the known-assigned XIDs
|
||||
* array always works.
|
||||
*
|
||||
* If scraping the CLOG doesn't succeed for some reason, like the subxid
|
||||
* overflow, Postgres will fall back to waiting for a running-xacts record
|
||||
* like usual.
|
||||
*
|
||||
* Returns true if a complete list of in-progress XIDs was scraped.
|
||||
*/
|
||||
static bool
|
||||
RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *nxids)
|
||||
{
|
||||
TransactionId from;
|
||||
TransactionId till;
|
||||
int max_xcnt;
|
||||
TransactionId *prepared_xids = NULL;
|
||||
int n_prepared_xids;
|
||||
TransactionId *restored_xids = NULL;
|
||||
int n_restored_xids;
|
||||
int next_prepared_idx;
|
||||
|
||||
Assert(*xids == NULL);
|
||||
|
||||
/*
|
||||
* If the checkpoint doesn't have a valid oldestActiveXid, bail out. We
|
||||
* don't know where to start the scan.
|
||||
*
|
||||
* This shouldn't happen, because the pageserver always maintains a valid
|
||||
* oldestActiveXid nowadays. Except when starting at an old point in time
|
||||
* that was ingested before the pageserver was taught to do that.
|
||||
*/
|
||||
if (!TransactionIdIsValid(checkpoint->oldestActiveXid))
|
||||
{
|
||||
elog(LOG, "cannot restore running-xacts from CLOG because oldestActiveXid is not set");
|
||||
goto fail;
|
||||
}
|
||||
|
||||
/*
|
||||
* We will scan the CLOG starting from the oldest active XID.
|
||||
*
|
||||
* In some corner cases, the oldestActiveXid from the last checkpoint
|
||||
* might already have been truncated from the CLOG. That is,
|
||||
* oldestActiveXid might be older than oldestXid. That's possible because
|
||||
* oldestActiveXid is only updated at checkpoints. After the last
|
||||
* checkpoint, the oldest transaction might have committed, and the CLOG
|
||||
* might also have been already truncated. So if oldestActiveXid is older
|
||||
* than oldestXid, start at oldestXid instead. (Otherwise we'd try to
|
||||
* access CLOG segments that have already been truncated away.)
|
||||
*/
|
||||
from = TransactionIdPrecedes(checkpoint->oldestXid, checkpoint->oldestActiveXid)
|
||||
? checkpoint->oldestActiveXid : checkpoint->oldestXid;
|
||||
till = XidFromFullTransactionId(checkpoint->nextXid);
|
||||
|
||||
/*
|
||||
* To avoid "too many KnownAssignedXids" error later during replay, we
|
||||
* limit number of collected transactions. This is a tradeoff: if we are
|
||||
* willing to consume more of the KnownAssignedXids space for the XIDs
|
||||
* now, that allows us to start up, but we might run out of space later.
|
||||
*
|
||||
* The size of the KnownAssignedXids array is TOTAL_MAX_CACHED_SUBXIDS,
|
||||
* which is (PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS). In
|
||||
* PostgreSQL, that's always enough because the primary will always write
|
||||
* an XLOG_XACT_ASSIGNMENT record if a transaction has more than
|
||||
* PGPROC_MAX_CACHED_SUBXIDS subtransactions. Seeing that record allows
|
||||
* the standby to mark the XIDs in pg_subtrans and removing them from the
|
||||
* KnowingAssignedXids array.
|
||||
*
|
||||
* Here, we don't know which XIDs belong to subtransactions that have
|
||||
* already been WAL-logged with an XLOG_XACT_ASSIGNMENT record. If we
|
||||
* wanted to be totally safe and avoid the possibility of getting a "too
|
||||
* many KnownAssignedXids" error later, we would have to limit ourselves
|
||||
* to PGPROC_MAX_CACHED_SUBXIDS, which is not much. And that includes top
|
||||
* transaction IDs too, because we cannot distinguish between top
|
||||
* transaction IDs and subtransactions here.
|
||||
*
|
||||
* Somewhat arbitrarily, we use up to half of KnownAssignedXids. That
|
||||
* strikes a sensible balance between being useful, and risking a "too
|
||||
* many KnownAssignedXids" error later.
|
||||
*/
|
||||
max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2;
|
||||
|
||||
/*
|
||||
* Collect XIDs of prepared transactions in an array. This includes only
|
||||
* their top-level XIDs. We assume that StandbyRecoverPreparedTransactions
|
||||
* has already been called, so we can find all the sub-transactions in
|
||||
* pg_subtrans.
|
||||
*/
|
||||
PrescanPreparedTransactions(&prepared_xids, &n_prepared_xids);
|
||||
qsort(prepared_xids, n_prepared_xids, sizeof(TransactionId), xidLogicalComparator);
|
||||
|
||||
/*
|
||||
* Scan the CLOG, collecting in-progress XIDs into 'restored_xids'.
|
||||
*/
|
||||
elog(DEBUG1, "scanning CLOG between %u and %u for in-progress XIDs", from, till);
|
||||
restored_xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId));
|
||||
n_restored_xids = 0;
|
||||
next_prepared_idx = 0;
|
||||
for (TransactionId xid = from; xid != till;)
|
||||
{
|
||||
XLogRecPtr xidlsn;
|
||||
XidStatus xidstatus;
|
||||
|
||||
xidstatus = TransactionIdGetStatus(xid, &xidlsn);
|
||||
|
||||
/*
|
||||
* "Merge" the prepared transactions into the restored_xids array as
|
||||
* we go. The prepared transactions array is sorted. This is mostly
|
||||
* a sanity check to ensure that all the prpeared transactions are
|
||||
* seen as in-progress. (There is a check after the loop that we didn't
|
||||
* miss any.)
|
||||
*/
|
||||
if (next_prepared_idx < n_prepared_xids && xid == prepared_xids[next_prepared_idx])
|
||||
{
|
||||
/*
|
||||
* This is a top-level transaction ID of a prepared transaction.
|
||||
* Include it in the array.
|
||||
*/
|
||||
|
||||
/* sanity check */
|
||||
if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS)
|
||||
{
|
||||
elog(LOG, "prepared transaction %u has unexpected status %X, cannot restore running-xacts from CLOG",
|
||||
xid, xidstatus);
|
||||
Assert(false);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
elog(DEBUG1, "XID %u: was next prepared xact (%d / %d)", xid, next_prepared_idx, n_prepared_xids);
|
||||
next_prepared_idx++;
|
||||
}
|
||||
else if (xidstatus == TRANSACTION_STATUS_COMMITTED)
|
||||
{
|
||||
elog(DEBUG1, "XID %u: was committed", xid);
|
||||
goto skip;
|
||||
}
|
||||
else if (xidstatus == TRANSACTION_STATUS_ABORTED)
|
||||
{
|
||||
elog(DEBUG1, "XID %u: was aborted", xid);
|
||||
goto skip;
|
||||
}
|
||||
else if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS)
|
||||
{
|
||||
/*
|
||||
* In-progress transactions are included in the array.
|
||||
*
|
||||
* Except subtransactions of the prepared transactions. They are
|
||||
* already set in pg_subtrans, and hence don't need to be tracked
|
||||
* in the known-assigned XIDs array.
|
||||
*/
|
||||
if (n_prepared_xids > 0)
|
||||
{
|
||||
TransactionId parent = SubTransGetParent(xid);
|
||||
|
||||
if (TransactionIdIsValid(parent))
|
||||
{
|
||||
/*
|
||||
* This is a subtransaction belonging to a prepared
|
||||
* transaction.
|
||||
*
|
||||
* Sanity check that it is in the prepared XIDs array. It
|
||||
* should be, because StandbyRecoverPreparedTransactions
|
||||
* populated pg_subtrans, and no other XID should be set
|
||||
* in it yet. (This also relies on the fact that
|
||||
* StandbyRecoverPreparedTransactions sets the parent of
|
||||
* each subxid to point directly to the top-level XID,
|
||||
* rather than restoring the original subtransaction
|
||||
* hierarchy.)
|
||||
*/
|
||||
if (bsearch(&parent, prepared_xids, next_prepared_idx,
|
||||
sizeof(TransactionId), xidLogicalComparator) == NULL)
|
||||
{
|
||||
elog(LOG, "sub-XID %u has unexpected parent %u, cannot restore running-xacts from CLOG",
|
||||
xid, parent);
|
||||
Assert(false);
|
||||
goto fail;
|
||||
}
|
||||
elog(DEBUG1, "XID %u: was a subtransaction of prepared xid %u", xid, parent);
|
||||
goto skip;
|
||||
}
|
||||
}
|
||||
|
||||
/* include it in the array */
|
||||
elog(DEBUG1, "XID %u: is in progress", xid);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* SUB_COMMITTED is a transient state used at commit. We don't
|
||||
* expect to see that here.
|
||||
*/
|
||||
elog(LOG, "XID %u has unexpected status %X in pg_xact, cannot restore running-xacts from CLOG",
|
||||
xid, xidstatus);
|
||||
Assert(false);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (n_restored_xids >= max_xcnt)
|
||||
{
|
||||
/*
|
||||
* Overflowed. We won't be able to install the RunningTransactions
|
||||
* snapshot.
|
||||
*/
|
||||
elog(LOG, "too many running xacts to restore from the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
|
||||
checkpoint->oldestXid, checkpoint->oldestActiveXid,
|
||||
XidFromFullTransactionId(checkpoint->nextXid));
|
||||
goto fail;
|
||||
}
|
||||
|
||||
restored_xids[n_restored_xids++] = xid;
|
||||
|
||||
skip:
|
||||
TransactionIdAdvance(xid);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* sanity check */
|
||||
if (next_prepared_idx != n_prepared_xids)
|
||||
{
|
||||
elog(LOG, "prepared transaction ID %u was not visited in the CLOG scan, cannot restore running-xacts from CLOG",
|
||||
prepared_xids[next_prepared_idx]);
|
||||
Assert(false);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
elog(LOG, "restored %d running xacts by scanning the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
|
||||
n_restored_xids, checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid));
|
||||
*nxids = n_restored_xids;
|
||||
*xids = restored_xids;
|
||||
return true;
|
||||
|
||||
fail:
|
||||
*nxids = 0;
|
||||
*xids = NULL;
|
||||
if (restored_xids)
|
||||
pfree(restored_xids);
|
||||
if (prepared_xids)
|
||||
pfree(prepared_xids);
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
_PG_init(void)
|
||||
{
|
||||
@@ -579,8 +288,6 @@ _PG_init(void)
|
||||
|
||||
pg_init_extension_server();
|
||||
|
||||
restore_running_xacts_callback = RestoreRunningXactsFromClog;
|
||||
|
||||
/*
|
||||
* Important: This must happen after other parts of the extension are
|
||||
* loaded, otherwise any settings to GUCs that were set before the
|
||||
|
||||
8
poetry.lock
generated
8
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohttp"
|
||||
@@ -734,13 +734,13 @@ typing-extensions = ">=4.1.0"
|
||||
|
||||
[[package]]
|
||||
name = "certifi"
|
||||
version = "2023.7.22"
|
||||
version = "2024.7.4"
|
||||
description = "Python package for providing Mozilla's CA Bundle."
|
||||
optional = false
|
||||
python-versions = ">=3.6"
|
||||
files = [
|
||||
{file = "certifi-2023.7.22-py3-none-any.whl", hash = "sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9"},
|
||||
{file = "certifi-2023.7.22.tar.gz", hash = "sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082"},
|
||||
{file = "certifi-2024.7.4-py3-none-any.whl", hash = "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"},
|
||||
{file = "certifi-2024.7.4.tar.gz", hash = "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -3542,6 +3542,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
):
|
||||
super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres")
|
||||
self.env = env
|
||||
self.running = False
|
||||
self.branch_name: Optional[str] = None # dubious
|
||||
self.endpoint_id: Optional[str] = None # dubious, see asserts below
|
||||
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
|
||||
@@ -3915,9 +3916,7 @@ class EndpointFactory:
|
||||
|
||||
return self
|
||||
|
||||
def new_replica(
|
||||
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
|
||||
):
|
||||
def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
assert branch_name is not None
|
||||
|
||||
@@ -198,7 +198,7 @@ def wait_for_last_record_lsn(
|
||||
lsn: Lsn,
|
||||
) -> Lsn:
|
||||
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
|
||||
for i in range(1000):
|
||||
for i in range(100):
|
||||
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
|
||||
if current_lsn >= lsn:
|
||||
return current_lsn
|
||||
|
||||
@@ -1,646 +0,0 @@
|
||||
"""
|
||||
In PostgreSQL, a standby always has to wait for a running-xacts WAL record to
|
||||
arrive before it can start accepting queries. Furthermore, if there are
|
||||
transactions with too many subxids (> 64) open to fit in the in-memory subxids
|
||||
cache, the running-xacts record will be marked as "suboverflowed", and the
|
||||
standby will need to also wait for the currently in-progress transactions to
|
||||
finish.
|
||||
|
||||
In Neon, we have an additional mechanism that scans the CLOG at server startup
|
||||
to determine the list of running transactions, so that the standby can start up
|
||||
immediately without waiting for the running-xacts record, but that mechanism
|
||||
only works if the # of active (sub-)transactions is reasonably small. Otherwise
|
||||
it falls back to waiting. Furthermore, it's somewhat optimistic in using up the
|
||||
known-assigned XIDs array: if too many transactions with subxids are started in
|
||||
the primary later, the replay in the replica will crash with "too many
|
||||
KnownAssignedXids" error.
|
||||
|
||||
This module contains tests for those various cases at standby startup: starting
|
||||
from shutdown checkpoint, using the CLOG scanning mechanism, waiting for
|
||||
running-xacts record and for in-progress transactions to finish etc.
|
||||
"""
|
||||
|
||||
import threading
|
||||
from contextlib import closing
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
|
||||
CREATE_SUBXACTS_FUNC = """
|
||||
create or replace function create_subxacts(n integer) returns void as $$
|
||||
declare
|
||||
i integer;
|
||||
begin
|
||||
for i in 1..n loop
|
||||
begin
|
||||
insert into t (payload) values (0);
|
||||
exception
|
||||
when others then
|
||||
raise exception 'caught something: %', sqlerrm;
|
||||
end;
|
||||
end loop;
|
||||
end; $$ language plpgsql
|
||||
"""
|
||||
|
||||
|
||||
def test_replica_start_scan_clog(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test the CLOG-scanning mechanism at hot standby startup. There is one
|
||||
transaction active in the primary when the standby is started. The primary
|
||||
is killed before it has a chance to write a running-xacts record. The
|
||||
CLOG-scanning at neon startup allows the standby to start up anyway.
|
||||
|
||||
See the module docstring for background.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
primary_cur.execute("select pg_switch_wal()")
|
||||
|
||||
# Start a transaction in the primary. Leave the transaction open.
|
||||
#
|
||||
# The transaction has some subtransactions, but not too many to cause the
|
||||
# CLOG-scanning mechanism to give up.
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(50)")
|
||||
|
||||
# Wait for the WAL to be flushed, but then immediately kill the primary,
|
||||
# before it has a chance to generate a running-xacts record.
|
||||
primary_cur.execute("select neon_xlogflush()")
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
primary.stop(mode="immediate")
|
||||
|
||||
# Create a replica. It should start up normally, thanks to the CLOG-scanning
|
||||
# mechanism.
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
|
||||
# The transaction did not commit, so it should not be visible in the secondary
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
|
||||
|
||||
def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test the CLOG-scanning mechanism at hot standby startup, after
|
||||
leaving behind crashed transactions.
|
||||
|
||||
See the module docstring for background.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
primary_cur.execute("select pg_switch_wal()")
|
||||
|
||||
# Consume a lot of XIDs, then kill Postgres without giving it a
|
||||
# chance to write abort records for them.
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(100000)")
|
||||
primary.stop(mode="immediate")
|
||||
|
||||
# Restart the primary. Do some light work, and shut it down cleanly
|
||||
primary.start()
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("insert into t (payload) values (0)")
|
||||
primary.stop(mode="fast")
|
||||
|
||||
# Create a replica. It should start up normally, thanks to the CLOG-scanning
|
||||
# mechanism. (Restarting the primary writes a checkpoint and/or running-xacts
|
||||
# record, which allows the standby to know that the crashed XIDs are aborted)
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (1,)
|
||||
|
||||
|
||||
def test_replica_start_at_running_xacts(neon_simple_env: NeonEnv, pg_version):
|
||||
"""
|
||||
Test that starting a replica works right after the primary has
|
||||
created a running-xacts record. This may seem like a trivial case,
|
||||
but during development, we had a bug that was triggered by having
|
||||
oldestActiveXid == nextXid. Starting right after a running-xacts
|
||||
record is one way to test that case.
|
||||
|
||||
See the module docstring for background.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
if env.pg_version == PgVersion.V14 or env.pg_version == PgVersion.V15:
|
||||
pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
primary_cur.execute("select pg_log_standby_snapshot()")
|
||||
primary_cur.execute("select neon_xlogflush()")
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select 123")
|
||||
assert secondary_cur.fetchone() == (123,)
|
||||
|
||||
|
||||
def test_replica_start_wait_subxids_finish(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test replica startup when there are a lot of (sub)transactions active in the
|
||||
primary. That's too many for the CLOG-scanning mechanism to handle, so the
|
||||
replica has to wait for the large transaction to finish before it starts to
|
||||
accept queries.
|
||||
|
||||
After replica startup, test MVCC with transactions that were in-progress
|
||||
when the replica was started.
|
||||
|
||||
See the module docstring for background.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create
|
||||
# lots of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
|
||||
# Start a transaction with 100000 subtransactions, and leave it open. That's
|
||||
# too many to fit in the "known-assigned XIDs array" in the replica, and
|
||||
# also too many to fit in the subxid caches so the running-xacts record will
|
||||
# also overflow.
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(100000)")
|
||||
|
||||
# Start another, smaller transaction in the primary. We'll come back to this
|
||||
# later.
|
||||
primary_conn2 = primary.connect()
|
||||
primary_cur2 = primary_conn2.cursor()
|
||||
primary_cur2.execute("begin")
|
||||
primary_cur2.execute("insert into t (payload) values (0)")
|
||||
|
||||
# Create a replica. but before that, wait for the wal to be flushed to
|
||||
# safekeepers, so that the replica is started at a point where the large
|
||||
# transaction is already active. (The whole transaction might not be flushed
|
||||
# yet, but that's OK.)
|
||||
#
|
||||
# Start it in a separate thread, so that we can do other stuff while it's
|
||||
# blocked waiting for the startup to finish.
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
secondary = env.endpoints.new_replica(origin=primary, endpoint_id="secondary")
|
||||
start_secondary_thread = threading.Thread(target=secondary.start)
|
||||
start_secondary_thread.start()
|
||||
|
||||
# Verify that the replica has otherwise started up, but cannot start
|
||||
# accepting queries yet.
|
||||
log.info("Waiting 5 s to verify that the secondary does not start")
|
||||
start_secondary_thread.join(5)
|
||||
assert secondary.log_contains("consistent recovery state reached")
|
||||
assert secondary.log_contains("started streaming WAL from primary")
|
||||
# The "redo starts" message is printed when the first WAL record is
|
||||
# received. It might or might not be present in the log depending on how
|
||||
# far exactly the WAL was flushed when the replica was started, and whether
|
||||
# background activity caused any more WAL records to be flushed on the
|
||||
# primary afterwards.
|
||||
#
|
||||
# assert secondary.log_contains("redo # starts")
|
||||
|
||||
# should not be open for connections yet
|
||||
assert start_secondary_thread.is_alive()
|
||||
assert not secondary.is_running()
|
||||
assert not secondary.log_contains("database system is ready to accept read-only connections")
|
||||
|
||||
# Commit the large transaction in the primary.
|
||||
#
|
||||
# Within the next 15 s, the primary should write a new running-xacts record
|
||||
# to the WAL which shows the transaction as completed. Once the replica
|
||||
# replays that record, it will start accepting queries.
|
||||
primary_cur.execute("commit")
|
||||
start_secondary_thread.join()
|
||||
|
||||
# Verify that the large transaction is correctly visible in the secondary
|
||||
# (but not the second, small transaction, which is still in-progress!)
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100000,)
|
||||
|
||||
# Perform some more MVCC testing using the second transaction that was
|
||||
# started in the primary before the replica was created
|
||||
primary_cur2.execute("select create_subxacts(10000)")
|
||||
|
||||
# The second transaction still hasn't committed
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
secondary_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100000,)
|
||||
|
||||
# Commit the second transaction in the primary
|
||||
primary_cur2.execute("commit")
|
||||
|
||||
# Should still be invisible to the old snapshot
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100000,)
|
||||
|
||||
# Commit the REPEATABLE READ transaction in the replica. Both
|
||||
# primary transactions should now be visible to a new snapshot.
|
||||
secondary_cur.execute("commit")
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (110001,)
|
||||
|
||||
|
||||
def test_replica_too_many_known_assigned_xids(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
The CLOG-scanning mechanism fills the known-assigned XIDs array
|
||||
optimistically at standby startup, betting that it can still fit
|
||||
upcoming transactions replayed later from the WAL in the
|
||||
array. This test tests what happens when that bet fails and the
|
||||
known-assigned XID array fills up after the standby has already
|
||||
been started. The WAL redo will fail with an error:
|
||||
|
||||
FATAL: too many KnownAssignedXids
|
||||
CONTEXT: WAL redo at 0/1895CB0 for neon/INSERT: off: 25, flags: 0x08; blkref #0: rel 1663/5/16385, blk 64
|
||||
|
||||
which causes the standby to shut down.
|
||||
|
||||
See the module docstring for background.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
|
||||
# Determine how many connections we can use
|
||||
primary_cur.execute("show max_connections")
|
||||
max_connections = int(primary_cur.fetchall()[0][0])
|
||||
primary_cur.execute("show superuser_reserved_connections")
|
||||
superuser_reserved_connections = int(primary_cur.fetchall()[0][0])
|
||||
n_connections = max_connections - superuser_reserved_connections
|
||||
n_subxids = 200
|
||||
|
||||
# Start one top transaction in primary, with lots of subtransactions. This
|
||||
# uses up much of the known-assigned XIDs space in the standby, but doesn't
|
||||
# cause it to overflow.
|
||||
large_p_conn = primary.connect()
|
||||
large_p_cur = large_p_conn.cursor()
|
||||
large_p_cur.execute("begin")
|
||||
large_p_cur.execute(f"select create_subxacts({max_connections} * 30)")
|
||||
|
||||
with closing(primary.connect()) as small_p_conn:
|
||||
with small_p_conn.cursor() as small_p_cur:
|
||||
small_p_cur.execute("select create_subxacts(1)")
|
||||
|
||||
# Create a replica at this LSN
|
||||
primary_cur.execute("select neon_xlogflush()")
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
|
||||
# The transaction in primary has not committed yet.
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (1,)
|
||||
|
||||
# Start max number of top transactions in primary, with a lot of
|
||||
# subtransactions each. We add the subtransactions to each top transaction
|
||||
# in a round-robin fashion, instead of adding a lot of subtransactions to
|
||||
# one top transaction at a time. This way, we will have the max number of
|
||||
# subtransactions in the in-memory subxid cache of each top transaction,
|
||||
# until they all overflow.
|
||||
#
|
||||
# Currently, PGPROC_MAX_CACHED_SUBXIDS == 64, so this will overflow the all
|
||||
# the subxid caches after creating 64 subxids in each top transaction. The
|
||||
# point just before the caches have overflowed is the most interesting point
|
||||
# in time, but we'll keep going beyond that, to ensure that this test is
|
||||
# robust even if PGPROC_MAX_CACHED_SUBXIDS changes.
|
||||
p_curs = []
|
||||
for _ in range(0, n_connections):
|
||||
p_cur = primary.connect().cursor()
|
||||
p_cur.execute("begin")
|
||||
p_curs.append(p_cur)
|
||||
|
||||
for _subxid in range(0, n_subxids):
|
||||
for i in range(0, n_connections):
|
||||
p_curs[i].execute("select create_subxacts(1)")
|
||||
|
||||
# Commit all the transactions in the primary
|
||||
for i in range(0, n_connections):
|
||||
p_curs[i].execute("commit")
|
||||
large_p_cur.execute("commit")
|
||||
|
||||
# Wait until the replica crashes with "too many KnownAssignedXids" error.
|
||||
def check_replica_crashed():
|
||||
try:
|
||||
secondary.connect()
|
||||
except psycopg2.Error:
|
||||
# Once the connection fails, return success
|
||||
return None
|
||||
raise RuntimeError("connection succeeded")
|
||||
|
||||
wait_until(20, 0.5, check_replica_crashed)
|
||||
assert secondary.log_contains("too many KnownAssignedXids")
|
||||
|
||||
# Replica is crashed, so ignore stop result
|
||||
secondary.check_stop_result = False
|
||||
|
||||
|
||||
def test_replica_start_repro_visibility_bug(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Before PR #7288, a hot standby in neon incorrectly started up
|
||||
immediately, before it had received a running-xacts record. That
|
||||
led to visibility bugs if there were active transactions in the
|
||||
primary. This test reproduces the incorrect query results and
|
||||
incorrectly set hint bits, before that was fixed.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
p_cur = primary.connect().cursor()
|
||||
|
||||
p_cur.execute("begin")
|
||||
p_cur.execute("create table t(pk integer primary key, payload integer)")
|
||||
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
|
||||
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
s_cur = secondary.connect().cursor()
|
||||
|
||||
# Set hint bits for pg_class tuples. If primary's transaction is
|
||||
# not marked as in-progress in MVCC snapshot, then XMIN_INVALID
|
||||
# hint bit will be set for table's 't' tuple, making it invisible
|
||||
# even after the commit record is replayed later.
|
||||
s_cur.execute("select * from pg_class")
|
||||
|
||||
p_cur.execute("commit")
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
s_cur.execute("select * from t where pk = 1")
|
||||
assert s_cur.fetchone() == (1, 0)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("shutdown", [True, False])
|
||||
def test_replica_start_with_prepared_xacts(neon_simple_env: NeonEnv, shutdown: bool):
|
||||
"""
|
||||
Test the CLOG-scanning mechanism at hot standby startup in the presence of
|
||||
prepared transactions.
|
||||
|
||||
This test is run in two variants: one where the primary server is shut down
|
||||
before starting the secondary, or not.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(
|
||||
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute("create table t1(pk integer primary key)")
|
||||
primary_cur.execute("create table t2(pk integer primary key)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
|
||||
# Prepare a transaction for two-phase commit
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("insert into t1 values (1)")
|
||||
primary_cur.execute("prepare transaction 't1'")
|
||||
|
||||
# Prepare another transaction for two-phase commit, with a subtransaction
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("insert into t2 values (2)")
|
||||
primary_cur.execute("savepoint sp")
|
||||
primary_cur.execute("insert into t2 values (3)")
|
||||
primary_cur.execute("prepare transaction 't2'")
|
||||
|
||||
# Start a transaction in the primary. Leave the transaction open.
|
||||
#
|
||||
# The transaction has some subtransactions, but not too many to cause the
|
||||
# CLOG-scanning mechanism to give up.
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(50)")
|
||||
|
||||
# Wait for the WAL to be flushed
|
||||
primary_cur.execute("select neon_xlogflush()")
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
if shutdown:
|
||||
primary.stop(mode="fast")
|
||||
|
||||
# Create a replica. It should start up normally, thanks to the CLOG-scanning
|
||||
# mechanism.
|
||||
secondary = env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
|
||||
# The transaction did not commit, so it should not be visible in the secondary
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
secondary_cur.execute("select count(*) from t1")
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
secondary_cur.execute("select count(*) from t2")
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
|
||||
if shutdown:
|
||||
primary.start()
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
else:
|
||||
primary_cur.execute("commit")
|
||||
primary_cur.execute("commit prepared 't1'")
|
||||
primary_cur.execute("commit prepared 't2'")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
if shutdown:
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
else:
|
||||
assert secondary_cur.fetchone() == (50,)
|
||||
secondary_cur.execute("select * from t1")
|
||||
assert secondary_cur.fetchall() == [(1,)]
|
||||
secondary_cur.execute("select * from t2")
|
||||
assert secondary_cur.fetchall() == [(2,), (3,)]
|
||||
|
||||
|
||||
def test_replica_start_with_prepared_xacts_with_subxacts(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test the CLOG-scanning mechanism at hot standby startup in the presence of
|
||||
prepared transactions, with subtransactions.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(
|
||||
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
|
||||
# Install extension containing function needed for test
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
|
||||
# Advance nextXid close to the beginning of the next pg_subtrans segment (2^16 XIDs)
|
||||
#
|
||||
# This is interesting, because it tests that pg_subtrans is initialized correctly
|
||||
# at standby startup. (We had a bug where it didn't at one point during development.)
|
||||
while True:
|
||||
xid = int(query_scalar(primary_cur, "SELECT txid_current()"))
|
||||
log.info(f"xid now {xid}")
|
||||
# Consume 500 transactions at a time until we get close
|
||||
if xid < 65535 - 600:
|
||||
primary_cur.execute("select test_consume_xids(500);")
|
||||
else:
|
||||
break
|
||||
primary_cur.execute("checkpoint")
|
||||
|
||||
# Prepare a transaction for two-phase commit
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(1000)")
|
||||
primary_cur.execute("prepare transaction 't1'")
|
||||
|
||||
# Wait for the WAL to be flushed, and stop the primary
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
primary.stop(mode="fast")
|
||||
|
||||
# Create a replica. It should start up normally, thanks to the CLOG-scanning
|
||||
# mechanism.
|
||||
secondary = env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
|
||||
# The transaction did not commit, so it should not be visible in the secondary
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (0,)
|
||||
|
||||
primary.start()
|
||||
|
||||
# Open a lot of subtransactions in the primary, causing the subxids cache to overflow
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("select create_subxacts(100000)")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100000,)
|
||||
|
||||
primary_cur.execute("commit prepared 't1'")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (101000,)
|
||||
|
||||
|
||||
def test_replica_start_with_prepared_xacts_with_many_subxacts(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test the CLOG-scanning mechanism at hot standby startup in the presence of
|
||||
prepared transactions, with lots of subtransactions.
|
||||
|
||||
Like test_replica_start_with_prepared_xacts_with_subxacts, but with more
|
||||
subxacts, to test that the prepared transaction's subxids don't consume
|
||||
space in the known-assigned XIDs array. (They are set in pg_subtrans
|
||||
instead)
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env = neon_simple_env
|
||||
primary = env.endpoints.create_start(
|
||||
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
|
||||
# Install extension containing function needed for test
|
||||
primary_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
|
||||
primary_cur.execute("create table t(pk serial primary key, payload integer)")
|
||||
primary_cur.execute(CREATE_SUBXACTS_FUNC)
|
||||
|
||||
# Prepare a transaction for two-phase commit, with lots of subxids
|
||||
primary_cur.execute("begin")
|
||||
primary_cur.execute("select create_subxacts(50000)")
|
||||
|
||||
# to make things a bit more varied, intersperse a few other XIDs in between
|
||||
# the prepared transaction's sub-XIDs
|
||||
with primary.connect().cursor() as primary_cur2:
|
||||
primary_cur2.execute("insert into t (payload) values (123)")
|
||||
primary_cur2.execute("begin; insert into t (payload) values (-1); rollback")
|
||||
|
||||
primary_cur.execute("select create_subxacts(50000)")
|
||||
primary_cur.execute("prepare transaction 't1'")
|
||||
|
||||
# Wait for the WAL to be flushed
|
||||
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
primary.stop(mode="fast")
|
||||
|
||||
# Create a replica. It should start up normally, thanks to the CLOG-scanning
|
||||
# mechanism.
|
||||
secondary = env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
|
||||
# The transaction did not commit, so it should not be visible in the secondary
|
||||
secondary_conn = secondary.connect()
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (1,)
|
||||
|
||||
primary.start()
|
||||
|
||||
# Open a lot of subtransactions in the primary, causing the subxids cache to overflow
|
||||
primary_conn = primary.connect()
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute("select create_subxacts(100000)")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100001,)
|
||||
|
||||
primary_cur.execute("commit prepared 't1'")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (200001,)
|
||||
32
test_runner/regress/test_replication_start.py
Normal file
32
test_runner/regress/test_replication_start.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
|
||||
|
||||
@pytest.mark.xfail
|
||||
def test_replication_start(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary:
|
||||
with primary.connect() as p_con:
|
||||
with p_con.cursor() as p_cur:
|
||||
p_cur.execute("begin")
|
||||
p_cur.execute("create table t(pk integer primary key, payload integer)")
|
||||
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
|
||||
p_cur.execute("select txid_current()")
|
||||
xid = p_cur.fetchall()[0][0]
|
||||
log.info(f"Master transaction {xid}")
|
||||
with env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary"
|
||||
) as secondary:
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
with secondary.connect() as s_con:
|
||||
with s_con.cursor() as s_cur:
|
||||
# Enforce setting hint bits for pg_class tuples.
|
||||
# If master's transaction is not marked as in-progress in MVCC snapshot,
|
||||
# then XMIN_INVALID hint bit will be set for table's 't' tuple makeing it invisible.
|
||||
s_cur.execute("select * from pg_class")
|
||||
p_cur.execute("commit")
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
s_cur.execute("select * from t where pk = 1")
|
||||
assert s_cur.fetchone() == (1, 0)
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: ad73770c44...223dd92595
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 4874c8e52e...f54d7373eb
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: b810fdfcbb...e06bebc753
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"v16": ["16.3", "b810fdfcbb59afea7ea7bbe0cf94eaccb55a2ea2"],
|
||||
"v15": ["15.7", "4874c8e52ed349a9f8290bbdcd91eb92677a5d24"],
|
||||
"v14": ["14.12", "ad73770c446ea361f43e4f0404798b7e5e7a62d8"]
|
||||
"v16": ["16.3", "e06bebc75306b583e758b52c95946d41109239b2"],
|
||||
"v15": ["15.7", "f54d7373eb0de5a54bce2becdb1c801026c7edff"],
|
||||
"v14": ["14.12", "223dd925959f8124711dd3d867dc8ba6629d52c0"]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user