From df4b944906c02d4ca04ffa489e551bdd63a17ee3 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 24 Jan 2024 23:11:52 +0200 Subject: [PATCH] Fix decoding of got standby feedback --- pgxn/neon/walproposer_pg.c | 11 ++++++----- safekeeper/src/send_wal.rs | 9 ++++++++- test_runner/regress/test_hot_standby.py | 2 +- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index c8bb8328c8..95739c0d9a 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1838,23 +1838,24 @@ static void CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp) { hs->ts = 0; - hs->xmin.value = InvalidFullTransactionId; - hs->catalog_xmin.value = InvalidFullTransactionId; + hs->xmin = InvalidFullTransactionId; + hs->catalog_xmin = InvalidFullTransactionId; for (int i = 0; i < wp->n_safekeepers; i++) { - if (wp->safekeeper[i].appendResponse.hs.ts != 0) + elog(LOG, "hs.ts=%ld hs.xmin=%ld", wp->safekeeper[i].appendResponse.hs.ts, wp->safekeeper[i].appendResponse.hs.xmin); + if (wp->safekeeper[i].appendResponse.hs.ts != 0) { HotStandbyFeedback *skhs = &wp->safekeeper[i].appendResponse.hs; if (FullTransactionIdIsNormal(skhs->xmin) - && (hs->xmin.value == InvalidFullTransactionId || FullTransactionIdPrecedes(skhs->xmin, hs->xmin))) + && (!FullTransactionIdIsValid(hs->xmin) || FullTransactionIdPrecedes(skhs->xmin, hs->xmin))) { hs->xmin = skhs->xmin; hs->ts = skhs->ts; } if (FullTransactionIdIsNormal(skhs->catalog_xmin) - && (hs->xmin.value == InvalidFullTransactionId || FullTransactionIdPrecedes(skhs->catalog_xmin, hs->catalog_xmin))) + && (!FullTransactionIdIsValid(hs->catalog_xmin) || FullTransactionIdPrecedes(skhs->catalog_xmin, hs->catalog_xmin))) { hs->catalog_xmin = skhs->catalog_xmin; hs->ts = skhs->ts; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 9167c45c93..a2956799dc 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -689,8 +689,15 @@ impl ReplyReader { match msg.first().cloned() { Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => { // Note: deserializing is on m[1..] because we skip the tag byte. - let hs_feedback = HotStandbyFeedback::des(&msg[1..]) + let mut hs_feedback = HotStandbyFeedback::des(&msg[1..]) .context("failed to deserialize HotStandbyFeedback")?; + // TODO: xmin/catalog_xmin are serialized by walreceiver.c in this way: + // pq_sendint32(&reply_message, xmin); + // pq_sendint32(&reply_message, xmin_epoch); + // So it is two big endian 32-bit words in low endian order! + hs_feedback.xmin = (hs_feedback.xmin >> 32) | (hs_feedback.xmin << 32); + hs_feedback.catalog_xmin = + (hs_feedback.catalog_xmin >> 32) | (hs_feedback.catalog_xmin << 32); self.ws_guard .walsenders .record_hs_feedback(self.ws_guard.id, &hs_feedback); diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 33859caab2..d9ea95266b 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -3,7 +3,7 @@ import re import time from fixtures.log_helper import log -from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_replica_caughtup +from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup # Check for corrupted WAL messages which might otherwise go unnoticed if # reconnection fixes this.