Compare commits

...

4 Commits

Author SHA1 Message Date
Konstantin Knizhnik
2e448a22ba Bump Postgres versions 2024-08-21 08:41:48 +03:00
Konstantin Knizhnik
d1231431d6 Bump Postgres version 2024-08-21 08:41:47 +03:00
Konstantin Knizhnik
8b445a7222 Add more traces to PS to understand the reasons of last commits in test_restart_subscriber 2024-08-21 08:41:47 +03:00
Konstantin Knizhnik
2f2aa9b79b Add trace to investigate fukyness of test_subscriber_restart 2024-08-21 08:41:47 +03:00
7 changed files with 32 additions and 8 deletions

View File

@@ -413,6 +413,10 @@ where
let mut content = Vec::with_capacity(n_origins * 16 + 8);
content.extend_from_slice(&pg_constants::REPLICATION_STATE_MAGIC.to_le_bytes());
for (origin_id, origin_lsn) in repl_origins {
info!(
"Include origin_id={}, origin_lsn={} in basebackup",
origin_id, origin_lsn
);
content.extend_from_slice(&origin_id.to_le_bytes());
content.extend_from_slice(&[0u8; 6]); // align to 8 bytes
content.extend_from_slice(&origin_lsn.0.to_le_bytes());

View File

@@ -235,6 +235,7 @@ impl WalIngest {
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
decoded.origin_id,
lsn,
ctx,
)
.await?;
@@ -248,6 +249,7 @@ impl WalIngest {
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
decoded.origin_id,
lsn,
ctx,
)
.await?;
@@ -408,11 +410,16 @@ impl WalIngest {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_REPLORIGIN_SET {
let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf);
info!(
"Set replication origin_id={}, origin_lsn={}",
xlrec.node_id, xlrec.remote_lsn
);
modification
.set_replorigin(xlrec.node_id, xlrec.remote_lsn)
.await?
} else if info == pg_constants::XLOG_REPLORIGIN_DROP {
let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
info!("Drop replication origin_id={}", xlrec.node_id);
modification.drop_replorigin(xlrec.node_id).await?
}
}
@@ -1220,6 +1227,7 @@ impl WalIngest {
parsed: &XlXactParsedRecord,
is_commit: bool,
origin_id: u16,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Record update of CLOG pages
@@ -1286,6 +1294,10 @@ impl WalIngest {
}
}
if origin_id != 0 {
info!(
"Commit at {} origin_id={}, origin_lsn={}",
lsn, origin_id, parsed.origin_lsn
);
modification
.set_replorigin(origin_id, parsed.origin_lsn)
.await?;

View File

@@ -1,6 +1,7 @@
import threading
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import wait_until
@@ -21,9 +22,13 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
n_restarts = 100
def check_that_changes_propagated():
scur.execute("SELECT received_lsn from pg_stat_subscription")
received_lsn = scur.fetchall()[0][0]
log.info(f"received_lsn={received_lsn}")
scur.execute("SELECT count(*) FROM t")
res = scur.fetchall()
assert res[0][0] == n_records
count = scur.fetchall()[0][0]
log.info(f"count={count}")
assert count == n_records
def insert_data(pub):
with pub.cursor() as pcur:
@@ -55,5 +60,8 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
thread.join()
pcur.execute(f"INSERT into t values ({n_records}, 0)")
n_records += 1
pcur.execute("SELECT pg_current_wal_flush_lsn()")
flush_lsn = pcur.fetchall()[0][0]
log.info(f"flush_lsn={flush_lsn}")
with sub.cursor() as scur:
wait_until(60, 0.5, check_that_changes_propagated)

View File

@@ -1,14 +1,14 @@
{
"v16": [
"16.3",
"47a9122a5a150a3217fafd3f3d4fe8e020ea718a"
"60905ba5d5c3f507c0a12b71cb3e95c818ef5fda"
],
"v15": [
"15.7",
"46b4b235f38413ab5974bb22c022f9b829257674"
"5c4076c873026f2a3314f1432b6b3d0cf8e57544"
],
"v14": [
"14.12",
"3fd7a45f8aae85c080df6329e3c85887b7f3a737"
"796e0cfa111617f5186aa1ee29a1c974317dbeea"
]
}