Add [WP] prefix to all walproposer logging.

- rename walpop_log to wp_log
- create also wpg_log which is used in postgres-specific code
- in passing format messages to start with lower case
This commit is contained in:
Arseny Sher
2024-01-02 20:27:53 +03:00
committed by Arseny Sher
parent aa9f1d4b69
commit ae3eaf9995
4 changed files with 169 additions and 157 deletions

View File

@@ -425,7 +425,7 @@ mod tests {
}
fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
println!("walprop_log[{}] {}", level, msg);
println!("wp_log[{}] {}", level, msg);
}
fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {

View File

@@ -99,7 +99,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
port = strchr(host, ':');
if (port == NULL)
{
walprop_log(FATAL, "port is not specified");
wp_log(FATAL, "port is not specified");
}
*port++ = '\0';
sep = strchr(port, ',');
@@ -107,7 +107,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
*sep++ = '\0';
if (wp->n_safekeepers + 1 >= MAX_SAFEKEEPERS)
{
walprop_log(FATAL, "Too many safekeepers");
wp_log(FATAL, "too many safekeepers");
}
wp->safekeeper[wp->n_safekeepers].host = host;
wp->safekeeper[wp->n_safekeepers].port = port;
@@ -123,7 +123,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
"host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant);
if (written > MAXCONNINFO || written < 0)
walprop_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
wp_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
}
initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf);
@@ -133,7 +133,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
}
if (wp->n_safekeepers < 1)
{
walprop_log(FATAL, "Safekeepers addresses are not specified");
wp_log(FATAL, "safekeepers addresses are not specified");
}
wp->quorum = wp->n_safekeepers / 2 + 1;
@@ -144,15 +144,15 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp->api.strong_random(wp, &wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId));
wp->greetRequest.systemId = wp->config->systemId;
if (!wp->config->neon_timeline)
walprop_log(FATAL, "neon.timeline_id is not provided");
wp_log(FATAL, "neon.timeline_id is not provided");
if (*wp->config->neon_timeline != '\0' &&
!HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16))
walprop_log(FATAL, "Could not parse neon.timeline_id, %s", wp->config->neon_timeline);
wp_log(FATAL, "could not parse neon.timeline_id, %s", wp->config->neon_timeline);
if (!wp->config->neon_tenant)
walprop_log(FATAL, "neon.tenant_id is not provided");
wp_log(FATAL, "neon.tenant_id is not provided");
if (*wp->config->neon_tenant != '\0' &&
!HexDecodeString(wp->greetRequest.tenant_id, wp->config->neon_tenant, 16))
walprop_log(FATAL, "Could not parse neon.tenant_id, %s", wp->config->neon_tenant);
wp_log(FATAL, "could not parse neon.tenant_id, %s", wp->config->neon_tenant);
wp->greetRequest.timeline = wp->config->pgTimeline;
wp->greetRequest.walSegSize = wp->config->wal_segment_size;
@@ -274,8 +274,8 @@ WalProposerPoll(WalProposer *wp)
if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now,
wp->config->safekeeper_connection_timeout))
{
walprop_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that",
sk->host, sk->port, FormatSafekeeperState(sk), wp->config->safekeeper_connection_timeout);
wp_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that",
sk->host, sk->port, FormatSafekeeperState(sk), wp->config->safekeeper_connection_timeout);
ShutdownConnection(sk);
}
}
@@ -356,8 +356,8 @@ ResetConnection(Safekeeper *sk)
*
* https://www.postgresql.org/docs/devel/libpq-connect.html#LIBPQ-PQCONNECTSTARTPARAMS
*/
walprop_log(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
wp_log(WARNING, "immediate failure to connect with node '%s:%s':\n\terror: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
/*
* Even though the connection failed, we still need to clean up the
@@ -380,7 +380,7 @@ ResetConnection(Safekeeper *sk)
* (see libpqrcv_connect, defined in
* src/backend/replication/libpqwalreceiver/libpqwalreceiver.c)
*/
walprop_log(LOG, "connecting with node %s:%s", sk->host, sk->port);
wp_log(LOG, "connecting with node %s:%s", sk->host, sk->port);
sk->state = SS_CONNECTING_WRITE;
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
@@ -434,7 +434,7 @@ ReconnectSafekeepers(WalProposer *wp)
static void
AdvancePollState(Safekeeper *sk, uint32 events)
{
#ifdef WALPROPOSER_LIB /* walprop_log needs wp in lib build */
#ifdef WALPROPOSER_LIB /* wp_log needs wp in lib build */
WalProposer *wp = sk->wp;
#endif
@@ -452,8 +452,8 @@ AdvancePollState(Safekeeper *sk, uint32 events)
* ResetConnection
*/
case SS_OFFLINE:
walprop_log(FATAL, "Unexpected safekeeper %s:%s state advancement: is offline",
sk->host, sk->port);
wp_log(FATAL, "unexpected safekeeper %s:%s state advancement: is offline",
sk->host, sk->port);
break; /* actually unreachable, but prevents
* -Wimplicit-fallthrough */
@@ -488,8 +488,8 @@ AdvancePollState(Safekeeper *sk, uint32 events)
* requests.
*/
case SS_VOTING:
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk));
wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return;
@@ -517,8 +517,8 @@ AdvancePollState(Safekeeper *sk, uint32 events)
* Idle state for waiting votes from quorum.
*/
case SS_IDLE:
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk));
wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return;
@@ -543,8 +543,8 @@ HandleConnectionEvent(Safekeeper *sk)
switch (result)
{
case WP_CONN_POLLING_OK:
walprop_log(LOG, "connected with node %s:%s", sk->host,
sk->port);
wp_log(LOG, "connected with node %s:%s", sk->host,
sk->port);
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
/*
@@ -567,8 +567,8 @@ HandleConnectionEvent(Safekeeper *sk)
break;
case WP_CONN_POLLING_FAILED:
walprop_log(WARNING, "failed to connect to node '%s:%s': %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
wp_log(WARNING, "failed to connect to node '%s:%s': %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
/*
* If connecting failed, we don't want to restart the connection
@@ -604,8 +604,8 @@ SendStartWALPush(Safekeeper *sk)
if (!wp->api.conn_send_query(sk, "START_WAL_PUSH"))
{
walprop_log(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
wp_log(WARNING, "failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return;
}
@@ -641,8 +641,8 @@ RecvStartWALPushResult(Safekeeper *sk)
break;
case WP_EXEC_FAILED:
walprop_log(WARNING, "Failed to send query to safekeeper %s:%s: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
wp_log(WARNING, "failed to send query to safekeeper %s:%s: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return;
@@ -652,8 +652,8 @@ RecvStartWALPushResult(Safekeeper *sk)
* wrong"
*/
case WP_EXEC_UNEXPECTED_SUCCESS:
walprop_log(WARNING, "Received bad response from safekeeper %s:%s query execution",
sk->host, sk->port);
wp_log(WARNING, "received bad response from safekeeper %s:%s query execution",
sk->host, sk->port);
ShutdownConnection(sk);
return;
}
@@ -688,7 +688,7 @@ RecvAcceptorGreeting(Safekeeper *sk)
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse))
return;
walprop_log(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port);
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port);
/* Protocol is all good, move to voting. */
sk->state = SS_VOTING;
@@ -708,7 +708,7 @@ RecvAcceptorGreeting(Safekeeper *sk)
if (wp->n_connected == wp->quorum)
{
wp->propTerm++;
walprop_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
wp->voteRequest = (VoteRequest)
{
@@ -721,9 +721,9 @@ RecvAcceptorGreeting(Safekeeper *sk)
else if (sk->greetResponse.term > wp->propTerm)
{
/* Another compute with higher term is running. */
walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
sk->host, sk->port,
sk->greetResponse.term, wp->propTerm);
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
sk->host, sk->port,
sk->greetResponse.term, wp->propTerm);
}
/*
@@ -763,7 +763,7 @@ SendVoteRequest(Safekeeper *sk)
WalProposer *wp = sk->wp;
/* We have quorum for voting, send our vote request */
walprop_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term);
wp_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term);
/* On failure, logging & resetting is handled */
if (!BlockingWrite(sk, &wp->voteRequest, sizeof(wp->voteRequest), SS_WAIT_VERDICT))
return;
@@ -780,12 +780,12 @@ RecvVoteResponse(Safekeeper *sk)
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->voteResponse))
return;
walprop_log(LOG,
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
wp_log(LOG,
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
/*
* In case of acceptor rejecting our vote, bail out, but only if either it
@@ -795,9 +795,9 @@ RecvVoteResponse(Safekeeper *sk)
if ((!sk->voteResponse.voteGiven) &&
(sk->voteResponse.term > wp->propTerm || wp->n_votes < wp->quorum))
{
walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
sk->host, sk->port,
sk->voteResponse.term, wp->propTerm);
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
sk->host, sk->port,
sk->voteResponse.term, wp->propTerm);
}
Assert(sk->voteResponse.term == wp->propTerm);
@@ -841,7 +841,7 @@ HandleElectedProposer(WalProposer *wp)
*/
if (!wp->api.recovery_download(wp, &wp->safekeeper[wp->donor]))
{
walprop_log(FATAL, "failed to download WAL for logical replicaiton");
wp_log(FATAL, "failed to download WAL for logical replicaiton");
}
if (wp->truncateLsn == wp->propEpochStartLsn && wp->config->syncSafekeepers)
@@ -948,10 +948,10 @@ DetermineEpochStartLsn(WalProposer *wp)
if (wp->timelineStartLsn != InvalidXLogRecPtr &&
wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn)
{
walprop_log(WARNING,
"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
LSN_FORMAT_ARGS(wp->timelineStartLsn),
LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
wp_log(WARNING,
"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
LSN_FORMAT_ARGS(wp->timelineStartLsn),
LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
}
wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn;
}
@@ -969,7 +969,7 @@ DetermineEpochStartLsn(WalProposer *wp)
{
wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp);
}
walprop_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
}
/*
@@ -996,12 +996,12 @@ DetermineEpochStartLsn(WalProposer *wp)
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm;
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propEpochStartLsn;
walprop_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
wp->quorum,
wp->propTerm,
LSN_FORMAT_ARGS(wp->propEpochStartLsn),
wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port,
LSN_FORMAT_ARGS(wp->truncateLsn));
wp_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
wp->quorum,
wp->propTerm,
LSN_FORMAT_ARGS(wp->propEpochStartLsn),
wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port,
LSN_FORMAT_ARGS(wp->truncateLsn));
/*
* Ensure the basebackup we are running (at RedoStartLsn) matches LSN
@@ -1034,10 +1034,10 @@ DetermineEpochStartLsn(WalProposer *wp)
* scenario.
*/
disable_core_dump();
walprop_log(PANIC,
"collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X",
LSN_FORMAT_ARGS(wp->propEpochStartLsn),
LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));
wp_log(PANIC,
"collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X",
LSN_FORMAT_ARGS(wp->propEpochStartLsn),
LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));
}
}
walprop_shared->mineLastElectedTerm = wp->propTerm;
@@ -1115,9 +1115,9 @@ SendProposerElected(Safekeeper *sk)
*/
sk->startStreamingAt = wp->truncateLsn;
walprop_log(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X",
sk->host, sk->port, LSN_FORMAT_ARGS(wp->propTermHistory.entries[0].lsn),
LSN_FORMAT_ARGS(sk->startStreamingAt));
wp_log(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X",
sk->host, sk->port, LSN_FORMAT_ARGS(wp->propTermHistory.entries[0].lsn),
LSN_FORMAT_ARGS(sk->startStreamingAt));
}
}
else
@@ -1150,9 +1150,9 @@ SendProposerElected(Safekeeper *sk)
msg.timelineStartLsn = wp->timelineStartLsn;
lastCommonTerm = i >= 0 ? wp->propTermHistory.entries[i].term : 0;
walprop_log(LOG,
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
wp_log(LOG,
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
resetStringInfo(&sk->outbuf);
pq_sendint64_le(&sk->outbuf, msg.tag);
@@ -1261,8 +1261,8 @@ HandleActiveState(Safekeeper *sk, uint32 events)
/* expected never to happen, c.f. walprop_pg_active_state_update_event_set */
if (events & WL_SOCKET_CLOSED)
{
walprop_log(WARNING, "connection to %s:%s in active state failed, got WL_SOCKET_CLOSED on neon_walreader socket",
sk->host, sk->port);
wp_log(WARNING, "connection to %s:%s in active state failed, got WL_SOCKET_CLOSED on neon_walreader socket",
sk->host, sk->port);
ShutdownConnection(sk);
return;
}
@@ -1323,12 +1323,12 @@ SendAppendRequests(Safekeeper *sk)
req = &sk->appendRequest;
PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn);
walprop_log(DEBUG5, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
req->endLsn - req->beginLsn,
LSN_FORMAT_ARGS(req->beginLsn),
LSN_FORMAT_ARGS(req->endLsn),
LSN_FORMAT_ARGS(req->commitLsn),
LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
wp_log(DEBUG5, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
req->endLsn - req->beginLsn,
LSN_FORMAT_ARGS(req->beginLsn),
LSN_FORMAT_ARGS(req->endLsn),
LSN_FORMAT_ARGS(req->commitLsn),
LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
resetStringInfo(&sk->outbuf);
@@ -1355,8 +1355,8 @@ SendAppendRequests(Safekeeper *sk)
case NEON_WALREAD_WOULDBLOCK:
return true;
case NEON_WALREAD_ERROR:
walprop_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port, errmsg);
wp_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port, errmsg);
ShutdownConnection(sk);
return false;
default:
@@ -1388,9 +1388,9 @@ SendAppendRequests(Safekeeper *sk)
return true;
case PG_ASYNC_WRITE_FAIL:
walprop_log(WARNING, "failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
wp_log(WARNING, "failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
default:
@@ -1429,11 +1429,11 @@ RecvAppendResponses(Safekeeper *sk)
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->appendResponse))
break;
walprop_log(DEBUG2, "received message term=" INT64_FORMAT " flushLsn=%X/%X commitLsn=%X/%X from %s:%s",
sk->appendResponse.term,
LSN_FORMAT_ARGS(sk->appendResponse.flushLsn),
LSN_FORMAT_ARGS(sk->appendResponse.commitLsn),
sk->host, sk->port);
wp_log(DEBUG2, "received message term=" INT64_FORMAT " flushLsn=%X/%X commitLsn=%X/%X from %s:%s",
sk->appendResponse.term,
LSN_FORMAT_ARGS(sk->appendResponse.flushLsn),
LSN_FORMAT_ARGS(sk->appendResponse.commitLsn),
sk->host, sk->port);
if (sk->appendResponse.term > wp->propTerm)
{
@@ -1443,9 +1443,9 @@ RecvAppendResponses(Safekeeper *sk)
* core as this is kinda expected scenario.
*/
disable_core_dump();
walprop_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "",
sk->host, sk->port,
sk->appendResponse.term, wp->propTerm);
wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "",
sk->host, sk->port,
sk->appendResponse.term, wp->propTerm);
}
readAnything = true;
@@ -1489,32 +1489,32 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->currentClusterSize = pq_getmsgint64(reply_message);
walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu",
rf->currentClusterSize);
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu",
rf->currentClusterSize);
}
else if ((strcmp(key, "ps_writelsn") == 0) || (strcmp(key, "last_received_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->last_received_lsn = pq_getmsgint64(reply_message);
walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X",
LSN_FORMAT_ARGS(rf->last_received_lsn));
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X",
LSN_FORMAT_ARGS(rf->last_received_lsn));
}
else if ((strcmp(key, "ps_flushlsn") == 0) || (strcmp(key, "disk_consistent_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->disk_consistent_lsn = pq_getmsgint64(reply_message);
walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->disk_consistent_lsn));
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->disk_consistent_lsn));
}
else if ((strcmp(key, "ps_applylsn") == 0) || (strcmp(key, "remote_consistent_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->remote_consistent_lsn = pq_getmsgint64(reply_message);
walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->remote_consistent_lsn));
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->remote_consistent_lsn));
}
else if ((strcmp(key, "ps_replytime") == 0) || (strcmp(key, "replytime") == 0))
{
@@ -1526,8 +1526,8 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
/* Copy because timestamptz_to_str returns a static buffer */
replyTimeStr = pstrdup(timestamptz_to_str(rf->replytime));
walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s",
rf->replytime, replyTimeStr);
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s",
rf->replytime, replyTimeStr);
pfree(replyTimeStr);
}
@@ -1541,7 +1541,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
* Skip unknown keys to support backward compatibile protocol
* changes
*/
walprop_log(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len);
wp_log(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len);
pq_getmsgbytes(reply_message, len);
};
}
@@ -1606,7 +1606,7 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
if (wp->n_votes < wp->quorum)
{
walprop_log(WARNING, "GetDonor called before elections are won");
wp_log(WARNING, "GetDonor called before elections are won");
return NULL;
}
@@ -1734,9 +1734,9 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
return false;
case PG_ASYNC_READ_FAIL:
walprop_log(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host,
sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
wp_log(WARNING, "failed to read from node %s:%s in %s state: %s", sk->host,
sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
}
@@ -1774,8 +1774,8 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
tag = pq_getmsgint64_le(&s);
if (tag != anymsg->tag)
{
walprop_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return false;
}
@@ -1851,9 +1851,9 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes
if (!wp->api.conn_blocking_write(sk, msg, msg_size))
{
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
wp_log(WARNING, "failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
}
@@ -1904,9 +1904,9 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta
wp->api.update_event_set(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
return false;
case PG_ASYNC_WRITE_FAIL:
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
wp_log(WARNING, "failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
default:
@@ -1943,9 +1943,9 @@ AsyncFlush(Safekeeper *sk)
/* Nothing to do; try again when the socket's ready */
return false;
case -1:
walprop_log(WARNING, "Failed to flush write to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
wp_log(WARNING, "failed to flush write to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk),
wp->api.conn_error_message(sk));
ResetConnection(sk);
return false;
default:
@@ -1974,11 +1974,11 @@ CompareLsn(const void *a, const void *b)
*
* The strings are intended to be used as a prefix to "state", e.g.:
*
* walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk));
* wp_log(LOG, "currently in %s state", FormatSafekeeperState(sk));
*
* If this sort of phrasing doesn't fit the message, instead use something like:
*
* walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk));
* wp_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk));
*/
static char *
FormatSafekeeperState(Safekeeper *sk)
@@ -2059,8 +2059,8 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk)
* To give a descriptive message in the case of failure, we use elog
* and then an assertion that's guaranteed to fail.
*/
walprop_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk));
wp_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk));
Assert(events_ok_for_state);
}
}
@@ -2199,8 +2199,8 @@ FormatEvents(WalProposer *wp, uint32 events)
if (events & (~all_flags))
{
walprop_log(WARNING, "Event formatting found unexpected component %d",
events & (~all_flags));
wp_log(WARNING, "event formatting found unexpected component %d",
events & (~all_flags));
return_str[6] = '*';
return_str[7] = '\0';
}

View File

@@ -707,11 +707,23 @@ extern Safekeeper *GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn);
#define WPEVENT 1337 /* special log level for walproposer internal
* events */
#define WP_LOG_PREFIX "[WP] "
/*
* wp_log is used in pure wp code (walproposer.c), allowing API callback to
* catch logging.
*/
#ifdef WALPROPOSER_LIB
extern void WalProposerLibLog(WalProposer *wp, int elevel, char *fmt,...);
#define walprop_log(elevel, ...) WalProposerLibLog(wp, elevel, __VA_ARGS__)
#define wp_log(elevel, fmt, ...) WalProposerLibLog(wp, elevel, fmt, ## __VA_ARGS__)
#else
#define walprop_log(elevel, ...) elog(elevel, __VA_ARGS__)
#define wp_log(elevel, fmt, ...) elog(elevel, WP_LOG_PREFIX fmt, ## __VA_ARGS__)
#endif
/*
* And wpg_log is used all other (postgres specific) walproposer code, just
* adding prefix.
*/
#define wpg_log(elevel, fmt, ...) elog(elevel, WP_LOG_PREFIX fmt, ## __VA_ARGS__)
#endif /* __NEON_WALPROPOSER_H__ */

View File

@@ -424,8 +424,8 @@ walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos)
{
StartReplicationCmd cmd;
elog(LOG, "WAL proposer starts streaming at %X/%X",
LSN_FORMAT_ARGS(startpos));
wpg_log(LOG, "WAL proposer starts streaming at %X/%X",
LSN_FORMAT_ARGS(startpos));
cmd.slotname = WAL_PROPOSER_SLOT_NAME;
cmd.timeline = wp->greetRequest.timeline;
cmd.startpoint = startpos;
@@ -549,7 +549,7 @@ walprop_pg_load_libpqwalreceiver(void)
{
load_file("libpqwalreceiver", false);
if (WalReceiverFunctions == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
wpg_log(ERROR, "libpqwalreceiver didn't initialize correctly");
}
/* Helper function */
@@ -630,7 +630,7 @@ libpqwp_connect_start(char *conninfo)
* PGconn structure"
*/
if (!pg_conn)
elog(FATAL, "failed to allocate new PGconn object");
wpg_log(FATAL, "failed to allocate new PGconn object");
/*
* And in theory this allocation can fail as well, but it's incredibly
@@ -680,7 +680,7 @@ walprop_connect_poll(Safekeeper *sk)
* unused. We'll expect it's never returned.
*/
case PGRES_POLLING_ACTIVE:
elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
wpg_log(FATAL, "unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
/*
* This return is never actually reached, but it's here to make
@@ -745,7 +745,7 @@ libpqwp_get_query_result(WalProposerConn *conn)
*/
if (!result)
{
elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
wpg_log(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
return WP_EXEC_UNEXPECTED_SUCCESS;
}
@@ -793,7 +793,7 @@ libpqwp_get_query_result(WalProposerConn *conn)
}
if (unexpected_success)
elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);
wpg_log(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);
return return_val;
}
@@ -872,7 +872,7 @@ libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount)
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
if (status != PGRES_FATAL_ERROR)
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
wpg_log(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
/*
* If there was actually an error, it'll be properly reported
@@ -937,7 +937,7 @@ walprop_async_write(Safekeeper *sk, void const *buf, size_t size)
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQputCopyData", result);
wpg_log(FATAL, "invalid return %d from PQputCopyData", result);
}
/*
@@ -958,7 +958,7 @@ walprop_async_write(Safekeeper *sk, void const *buf, size_t size)
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQflush", result);
wpg_log(FATAL, "invalid return %d from PQflush", result);
}
}
@@ -1247,8 +1247,8 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb)
{
startpos = endpos - max_slot_wal_keep_size_mb * 1024 * 1024;
walprop_log(WARNING, "capped WAL download for logical replication to %X/%X as max_slot_wal_keep_size=%dMB",
LSN_FORMAT_ARGS(startpos), max_slot_wal_keep_size_mb);
wpg_log(WARNING, "capped WAL download for logical replication to %X/%X as max_slot_wal_keep_size=%dMB",
LSN_FORMAT_ARGS(startpos), max_slot_wal_keep_size_mb);
}
timeline = wp->greetRequest.timeline;
@@ -1262,7 +1262,7 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
written = snprintf((char *) conninfo, MAXCONNINFO, "password=%s %s", neon_auth_token, sk->conninfo);
if (written > MAXCONNINFO || written < 0)
elog(FATAL, "could not append password to the safekeeper connection string");
wpg_log(FATAL, "could not append password to the safekeeper connection string");
}
#if PG_MAJORVERSION_NUM < 16
@@ -1279,11 +1279,11 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
err)));
return false;
}
elog(LOG,
"start recovery for logical replication from %s:%s starting from %X/%08X till %X/%08X timeline "
"%d",
sk->host, sk->port, (uint32) (startpos >> 32),
(uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline);
wpg_log(LOG,
"start recovery for logical replication from %s:%s starting from %X/%08X till %X/%08X timeline "
"%d",
sk->host, sk->port, (uint32) (startpos >> 32),
(uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline);
options.logical = false;
options.startpoint = startpos;
@@ -1481,11 +1481,11 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)
{
char log_prefix[64];
snprintf(log_prefix, sizeof(log_prefix), "sk %s:%s nwr: ", sk->host, sk->port);
snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
Assert(!sk->xlogreader);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix);
if (sk->xlogreader == NULL)
elog(FATAL, "Failed to allocate xlog reader");
wpg_log(FATAL, "failed to allocate xlog reader");
}
static NeonWALReadResult
@@ -1549,7 +1549,7 @@ static void
walprop_pg_init_event_set(WalProposer *wp)
{
if (waitEvents)
elog(FATAL, "double-initialization of event set");
wpg_log(FATAL, "double-initialization of event set");
/* for each sk, we have socket plus potentially socket for neon walreader */
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + 2 * wp->n_safekeepers);
@@ -1581,7 +1581,7 @@ add_nwr_event_set(Safekeeper *sk, uint32 events)
Assert(sk->nwrEventPos == -1);
sk->nwrEventPos = AddWaitEventToSet(waitEvents, events, NeonWALReaderSocket(sk->xlogreader), NULL, sk);
sk->nwrConnEstablished = NeonWALReaderIsRemConnEstablished(sk->xlogreader);
elog(DEBUG5, "sk %s:%s: added nwr socket events %d", sk->host, sk->port, events);
wpg_log(DEBUG5, "sk %s:%s: added nwr socket events %d", sk->host, sk->port, events);
}
static void
@@ -1680,8 +1680,8 @@ rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk)
{
WalProposer *wp = to_remove->wp;
elog(DEBUG5, "sk %s:%s: removing event, is_sk %d",
to_remove->host, to_remove->port, is_sk);
wpg_log(DEBUG5, "sk %s:%s: removing event, is_sk %d",
to_remove->host, to_remove->port, is_sk);
/*
* Shortpath for exiting if have nothing to do. We never call this
@@ -1835,13 +1835,13 @@ GetLatestNeonFeedback(PageserverFeedback *rf, WalProposer *wp)
rf->remote_consistent_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.remote_consistent_lsn;
rf->replytime = wp->safekeeper[latest_safekeeper].appendResponse.rf.replytime;
elog(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu,"
" last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu",
rf->currentClusterSize,
LSN_FORMAT_ARGS(rf->last_received_lsn),
LSN_FORMAT_ARGS(rf->disk_consistent_lsn),
LSN_FORMAT_ARGS(rf->remote_consistent_lsn),
rf->replytime);
wpg_log(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu,"
" last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu",
rf->currentClusterSize,
LSN_FORMAT_ARGS(rf->last_received_lsn),
LSN_FORMAT_ARGS(rf->disk_consistent_lsn),
LSN_FORMAT_ARGS(rf->remote_consistent_lsn),
rf->replytime);
}
/*
@@ -1987,7 +1987,7 @@ GetLogRepRestartLSN(WalProposer *wp)
{
uint64 download_range_mb;
elog(LOG, "logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
wpg_log(LOG, "logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
/*
* If we need to download more than a max_slot_wal_keep_size,
@@ -1999,8 +1999,8 @@ GetLogRepRestartLSN(WalProposer *wp)
download_range_mb = (wp->propEpochStartLsn - lrRestartLsn) / MB;
if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb)
{
walprop_log(WARNING, "not downloading WAL for logical replication since %X/%X as max_slot_wal_keep_size=%dMB",
LSN_FORMAT_ARGS(lrRestartLsn), max_slot_wal_keep_size_mb);
wpg_log(WARNING, "not downloading WAL for logical replication since %X/%X as max_slot_wal_keep_size=%dMB",
LSN_FORMAT_ARGS(lrRestartLsn), max_slot_wal_keep_size_mb);
return InvalidXLogRecPtr;
}