From 40164bd5899878878be701798f4415dc2c968e67 Mon Sep 17 00:00:00 2001 From: andres Date: Tue, 25 Oct 2022 18:05:26 +0200 Subject: [PATCH] Use latestMsgReceivedAt in walproposer --- pgxn/neon/walproposer.c | 10 ++++------ pgxn/neon/walproposer.h | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index c78c79a9bb..c5f283aa22 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -438,9 +438,7 @@ WalProposerPoll(void) { Safekeeper *sk = &safekeeper[i]; - if ((sk->state == SS_CONNECTING_WRITE || - sk->state == SS_CONNECTING_READ) && - TimestampDifferenceExceeds(sk->startedConnAt, now, + if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now, wal_acceptor_connect_timeout)) { elog(WARNING, "failed to connect to node '%s:%s': exceeded connection timeout %dms", @@ -760,7 +758,7 @@ ResetConnection(Safekeeper *sk) elog(LOG, "connecting with node %s:%s", sk->host, sk->port); sk->state = SS_CONNECTING_WRITE; - sk->startedConnAt = GetCurrentTimestamp(); + sk->latestMsgReceivedAt = GetCurrentTimestamp(); sock = walprop_socket(sk->conn); sk->eventPos = AddWaitEventToSet(waitEvents, WL_SOCKET_WRITEABLE, sock, NULL, sk); @@ -918,7 +916,7 @@ HandleConnectionEvent(Safekeeper *sk) case WP_CONN_POLLING_OK: elog(LOG, "connected with node %s:%s", sk->host, sk->port); - + sk->latestMsgReceivedAt = GetCurrentTimestamp(); /* * We have to pick some event to update event set. We'll * eventually need the socket to be readable, so we go with that. @@ -2304,7 +2302,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg) ResetConnection(sk); return false; } - + sk->latestMsgReceivedAt = GetCurrentTimestamp(); switch (tag) { case 'g': diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index e237947441..0d3af54a68 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -371,7 +371,7 @@ typedef struct Safekeeper int eventPos; /* position in wait event set. Equal to -1 if* * no event */ SafekeeperState state; /* safekeeper state machine state */ - TimestampTz startedConnAt; /* when connection attempt started */ + TimestampTz latestMsgReceivedAt; /* when latest msg is received */ AcceptorGreeting greetResponse; /* acceptor greeting */ VoteResponse voteResponse; /* the vote */ AppendResponse appendResponse; /* feedback for master */