From ae3eaf99957433b2df51aa79fb7b63f6959156f9 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 2 Jan 2024 20:27:53 +0300 Subject: [PATCH] Add [WP] prefix to all walproposer logging. - rename walpop_log to wp_log - create also wpg_log which is used in postgres-specific code - in passing format messages to start with lower case --- libs/walproposer/src/walproposer.rs | 2 +- pgxn/neon/walproposer.c | 240 ++++++++++++++-------------- pgxn/neon/walproposer.h | 16 +- pgxn/neon/walproposer_pg.c | 68 ++++---- 4 files changed, 169 insertions(+), 157 deletions(-) diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs index 35c8f6904d..7251545792 100644 --- a/libs/walproposer/src/walproposer.rs +++ b/libs/walproposer/src/walproposer.rs @@ -425,7 +425,7 @@ mod tests { } fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) { - println!("walprop_log[{}] {}", level, msg); + println!("wp_log[{}] {}", level, msg); } fn after_election(&self, _wp: &mut crate::bindings::WalProposer) { diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 7fb0cab9a0..2ea724f927 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -99,7 +99,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) port = strchr(host, ':'); if (port == NULL) { - walprop_log(FATAL, "port is not specified"); + wp_log(FATAL, "port is not specified"); } *port++ = '\0'; sep = strchr(port, ','); @@ -107,7 +107,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) *sep++ = '\0'; if (wp->n_safekeepers + 1 >= MAX_SAFEKEEPERS) { - walprop_log(FATAL, "Too many safekeepers"); + wp_log(FATAL, "too many safekeepers"); } wp->safekeeper[wp->n_safekeepers].host = host; wp->safekeeper[wp->n_safekeepers].port = port; @@ -123,7 +123,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) "host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'", sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant); if (written > MAXCONNINFO || written < 0) - walprop_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port); + wp_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port); } initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf); @@ -133,7 +133,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) } if (wp->n_safekeepers < 1) { - walprop_log(FATAL, "Safekeepers addresses are not specified"); + wp_log(FATAL, "safekeepers addresses are not specified"); } wp->quorum = wp->n_safekeepers / 2 + 1; @@ -144,15 +144,15 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) wp->api.strong_random(wp, &wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId)); wp->greetRequest.systemId = wp->config->systemId; if (!wp->config->neon_timeline) - walprop_log(FATAL, "neon.timeline_id is not provided"); + wp_log(FATAL, "neon.timeline_id is not provided"); if (*wp->config->neon_timeline != '\0' && !HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16)) - walprop_log(FATAL, "Could not parse neon.timeline_id, %s", wp->config->neon_timeline); + wp_log(FATAL, "could not parse neon.timeline_id, %s", wp->config->neon_timeline); if (!wp->config->neon_tenant) - walprop_log(FATAL, "neon.tenant_id is not provided"); + wp_log(FATAL, "neon.tenant_id is not provided"); if (*wp->config->neon_tenant != '\0' && !HexDecodeString(wp->greetRequest.tenant_id, wp->config->neon_tenant, 16)) - walprop_log(FATAL, "Could not parse neon.tenant_id, %s", wp->config->neon_tenant); + wp_log(FATAL, "could not parse neon.tenant_id, %s", wp->config->neon_tenant); wp->greetRequest.timeline = wp->config->pgTimeline; wp->greetRequest.walSegSize = wp->config->wal_segment_size; @@ -274,8 +274,8 @@ WalProposerPoll(WalProposer *wp) if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now, wp->config->safekeeper_connection_timeout)) { - walprop_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that", - sk->host, sk->port, FormatSafekeeperState(sk), wp->config->safekeeper_connection_timeout); + wp_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that", + sk->host, sk->port, FormatSafekeeperState(sk), wp->config->safekeeper_connection_timeout); ShutdownConnection(sk); } } @@ -356,8 +356,8 @@ ResetConnection(Safekeeper *sk) * * https://www.postgresql.org/docs/devel/libpq-connect.html#LIBPQ-PQCONNECTSTARTPARAMS */ - walprop_log(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s", - sk->host, sk->port, wp->api.conn_error_message(sk)); + wp_log(WARNING, "immediate failure to connect with node '%s:%s':\n\terror: %s", + sk->host, sk->port, wp->api.conn_error_message(sk)); /* * Even though the connection failed, we still need to clean up the @@ -380,7 +380,7 @@ ResetConnection(Safekeeper *sk) * (see libpqrcv_connect, defined in * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c) */ - walprop_log(LOG, "connecting with node %s:%s", sk->host, sk->port); + wp_log(LOG, "connecting with node %s:%s", sk->host, sk->port); sk->state = SS_CONNECTING_WRITE; sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp); @@ -434,7 +434,7 @@ ReconnectSafekeepers(WalProposer *wp) static void AdvancePollState(Safekeeper *sk, uint32 events) { -#ifdef WALPROPOSER_LIB /* walprop_log needs wp in lib build */ +#ifdef WALPROPOSER_LIB /* wp_log needs wp in lib build */ WalProposer *wp = sk->wp; #endif @@ -452,8 +452,8 @@ AdvancePollState(Safekeeper *sk, uint32 events) * ResetConnection */ case SS_OFFLINE: - walprop_log(FATAL, "Unexpected safekeeper %s:%s state advancement: is offline", - sk->host, sk->port); + wp_log(FATAL, "unexpected safekeeper %s:%s state advancement: is offline", + sk->host, sk->port); break; /* actually unreachable, but prevents * -Wimplicit-fallthrough */ @@ -488,8 +488,8 @@ AdvancePollState(Safekeeper *sk, uint32 events) * requests. */ case SS_VOTING: - walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host, - sk->port, FormatSafekeeperState(sk)); + wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host, + sk->port, FormatSafekeeperState(sk)); ResetConnection(sk); return; @@ -517,8 +517,8 @@ AdvancePollState(Safekeeper *sk, uint32 events) * Idle state for waiting votes from quorum. */ case SS_IDLE: - walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host, - sk->port, FormatSafekeeperState(sk)); + wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host, + sk->port, FormatSafekeeperState(sk)); ResetConnection(sk); return; @@ -543,8 +543,8 @@ HandleConnectionEvent(Safekeeper *sk) switch (result) { case WP_CONN_POLLING_OK: - walprop_log(LOG, "connected with node %s:%s", sk->host, - sk->port); + wp_log(LOG, "connected with node %s:%s", sk->host, + sk->port); sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp); /* @@ -567,8 +567,8 @@ HandleConnectionEvent(Safekeeper *sk) break; case WP_CONN_POLLING_FAILED: - walprop_log(WARNING, "failed to connect to node '%s:%s': %s", - sk->host, sk->port, wp->api.conn_error_message(sk)); + wp_log(WARNING, "failed to connect to node '%s:%s': %s", + sk->host, sk->port, wp->api.conn_error_message(sk)); /* * If connecting failed, we don't want to restart the connection @@ -604,8 +604,8 @@ SendStartWALPush(Safekeeper *sk) if (!wp->api.conn_send_query(sk, "START_WAL_PUSH")) { - walprop_log(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s", - sk->host, sk->port, wp->api.conn_error_message(sk)); + wp_log(WARNING, "failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s", + sk->host, sk->port, wp->api.conn_error_message(sk)); ShutdownConnection(sk); return; } @@ -641,8 +641,8 @@ RecvStartWALPushResult(Safekeeper *sk) break; case WP_EXEC_FAILED: - walprop_log(WARNING, "Failed to send query to safekeeper %s:%s: %s", - sk->host, sk->port, wp->api.conn_error_message(sk)); + wp_log(WARNING, "failed to send query to safekeeper %s:%s: %s", + sk->host, sk->port, wp->api.conn_error_message(sk)); ShutdownConnection(sk); return; @@ -652,8 +652,8 @@ RecvStartWALPushResult(Safekeeper *sk) * wrong" */ case WP_EXEC_UNEXPECTED_SUCCESS: - walprop_log(WARNING, "Received bad response from safekeeper %s:%s query execution", - sk->host, sk->port); + wp_log(WARNING, "received bad response from safekeeper %s:%s query execution", + sk->host, sk->port); ShutdownConnection(sk); return; } @@ -688,7 +688,7 @@ RecvAcceptorGreeting(Safekeeper *sk) if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse)) return; - walprop_log(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port); + wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port); /* Protocol is all good, move to voting. */ sk->state = SS_VOTING; @@ -708,7 +708,7 @@ RecvAcceptorGreeting(Safekeeper *sk) if (wp->n_connected == wp->quorum) { wp->propTerm++; - walprop_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm); + wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm); wp->voteRequest = (VoteRequest) { @@ -721,9 +721,9 @@ RecvAcceptorGreeting(Safekeeper *sk) else if (sk->greetResponse.term > wp->propTerm) { /* Another compute with higher term is running. */ - walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", - sk->host, sk->port, - sk->greetResponse.term, wp->propTerm); + wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", + sk->host, sk->port, + sk->greetResponse.term, wp->propTerm); } /* @@ -763,7 +763,7 @@ SendVoteRequest(Safekeeper *sk) WalProposer *wp = sk->wp; /* We have quorum for voting, send our vote request */ - walprop_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term); + wp_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term); /* On failure, logging & resetting is handled */ if (!BlockingWrite(sk, &wp->voteRequest, sizeof(wp->voteRequest), SS_WAIT_VERDICT)) return; @@ -780,12 +780,12 @@ RecvVoteResponse(Safekeeper *sk) if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->voteResponse)) return; - walprop_log(LOG, - "got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X", - sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory), - LSN_FORMAT_ARGS(sk->voteResponse.flushLsn), - LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn), - LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn)); + wp_log(LOG, + "got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X", + sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory), + LSN_FORMAT_ARGS(sk->voteResponse.flushLsn), + LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn), + LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn)); /* * In case of acceptor rejecting our vote, bail out, but only if either it @@ -795,9 +795,9 @@ RecvVoteResponse(Safekeeper *sk) if ((!sk->voteResponse.voteGiven) && (sk->voteResponse.term > wp->propTerm || wp->n_votes < wp->quorum)) { - walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", - sk->host, sk->port, - sk->voteResponse.term, wp->propTerm); + wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", + sk->host, sk->port, + sk->voteResponse.term, wp->propTerm); } Assert(sk->voteResponse.term == wp->propTerm); @@ -841,7 +841,7 @@ HandleElectedProposer(WalProposer *wp) */ if (!wp->api.recovery_download(wp, &wp->safekeeper[wp->donor])) { - walprop_log(FATAL, "failed to download WAL for logical replicaiton"); + wp_log(FATAL, "failed to download WAL for logical replicaiton"); } if (wp->truncateLsn == wp->propEpochStartLsn && wp->config->syncSafekeepers) @@ -948,10 +948,10 @@ DetermineEpochStartLsn(WalProposer *wp) if (wp->timelineStartLsn != InvalidXLogRecPtr && wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn) { - walprop_log(WARNING, - "inconsistent timelineStartLsn: current %X/%X, received %X/%X", - LSN_FORMAT_ARGS(wp->timelineStartLsn), - LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn)); + wp_log(WARNING, + "inconsistent timelineStartLsn: current %X/%X, received %X/%X", + LSN_FORMAT_ARGS(wp->timelineStartLsn), + LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn)); } wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn; } @@ -969,7 +969,7 @@ DetermineEpochStartLsn(WalProposer *wp) { wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp); } - walprop_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn)); + wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn)); } /* @@ -996,12 +996,12 @@ DetermineEpochStartLsn(WalProposer *wp) wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm; wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propEpochStartLsn; - walprop_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X", - wp->quorum, - wp->propTerm, - LSN_FORMAT_ARGS(wp->propEpochStartLsn), - wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port, - LSN_FORMAT_ARGS(wp->truncateLsn)); + wp_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X", + wp->quorum, + wp->propTerm, + LSN_FORMAT_ARGS(wp->propEpochStartLsn), + wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port, + LSN_FORMAT_ARGS(wp->truncateLsn)); /* * Ensure the basebackup we are running (at RedoStartLsn) matches LSN @@ -1034,10 +1034,10 @@ DetermineEpochStartLsn(WalProposer *wp) * scenario. */ disable_core_dump(); - walprop_log(PANIC, - "collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X", - LSN_FORMAT_ARGS(wp->propEpochStartLsn), - LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp))); + wp_log(PANIC, + "collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X", + LSN_FORMAT_ARGS(wp->propEpochStartLsn), + LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp))); } } walprop_shared->mineLastElectedTerm = wp->propTerm; @@ -1115,9 +1115,9 @@ SendProposerElected(Safekeeper *sk) */ sk->startStreamingAt = wp->truncateLsn; - walprop_log(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X", - sk->host, sk->port, LSN_FORMAT_ARGS(wp->propTermHistory.entries[0].lsn), - LSN_FORMAT_ARGS(sk->startStreamingAt)); + wp_log(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X", + sk->host, sk->port, LSN_FORMAT_ARGS(wp->propTermHistory.entries[0].lsn), + LSN_FORMAT_ARGS(sk->startStreamingAt)); } } else @@ -1150,9 +1150,9 @@ SendProposerElected(Safekeeper *sk) msg.timelineStartLsn = wp->timelineStartLsn; lastCommonTerm = i >= 0 ? wp->propTermHistory.entries[i].term : 0; - walprop_log(LOG, - "sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X", - sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn)); + wp_log(LOG, + "sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X", + sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn)); resetStringInfo(&sk->outbuf); pq_sendint64_le(&sk->outbuf, msg.tag); @@ -1261,8 +1261,8 @@ HandleActiveState(Safekeeper *sk, uint32 events) /* expected never to happen, c.f. walprop_pg_active_state_update_event_set */ if (events & WL_SOCKET_CLOSED) { - walprop_log(WARNING, "connection to %s:%s in active state failed, got WL_SOCKET_CLOSED on neon_walreader socket", - sk->host, sk->port); + wp_log(WARNING, "connection to %s:%s in active state failed, got WL_SOCKET_CLOSED on neon_walreader socket", + sk->host, sk->port); ShutdownConnection(sk); return; } @@ -1323,12 +1323,12 @@ SendAppendRequests(Safekeeper *sk) req = &sk->appendRequest; PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn); - walprop_log(DEBUG5, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s", - req->endLsn - req->beginLsn, - LSN_FORMAT_ARGS(req->beginLsn), - LSN_FORMAT_ARGS(req->endLsn), - LSN_FORMAT_ARGS(req->commitLsn), - LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port); + wp_log(DEBUG5, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s", + req->endLsn - req->beginLsn, + LSN_FORMAT_ARGS(req->beginLsn), + LSN_FORMAT_ARGS(req->endLsn), + LSN_FORMAT_ARGS(req->commitLsn), + LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port); resetStringInfo(&sk->outbuf); @@ -1355,8 +1355,8 @@ SendAppendRequests(Safekeeper *sk) case NEON_WALREAD_WOULDBLOCK: return true; case NEON_WALREAD_ERROR: - walprop_log(WARNING, "WAL reading for node %s:%s failed: %s", - sk->host, sk->port, errmsg); + wp_log(WARNING, "WAL reading for node %s:%s failed: %s", + sk->host, sk->port, errmsg); ShutdownConnection(sk); return false; default: @@ -1388,9 +1388,9 @@ SendAppendRequests(Safekeeper *sk) return true; case PG_ASYNC_WRITE_FAIL: - walprop_log(WARNING, "failed to send to node %s:%s in %s state: %s", - sk->host, sk->port, FormatSafekeeperState(sk), - wp->api.conn_error_message(sk)); + wp_log(WARNING, "failed to send to node %s:%s in %s state: %s", + sk->host, sk->port, FormatSafekeeperState(sk), + wp->api.conn_error_message(sk)); ShutdownConnection(sk); return false; default: @@ -1429,11 +1429,11 @@ RecvAppendResponses(Safekeeper *sk) if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->appendResponse)) break; - walprop_log(DEBUG2, "received message term=" INT64_FORMAT " flushLsn=%X/%X commitLsn=%X/%X from %s:%s", - sk->appendResponse.term, - LSN_FORMAT_ARGS(sk->appendResponse.flushLsn), - LSN_FORMAT_ARGS(sk->appendResponse.commitLsn), - sk->host, sk->port); + wp_log(DEBUG2, "received message term=" INT64_FORMAT " flushLsn=%X/%X commitLsn=%X/%X from %s:%s", + sk->appendResponse.term, + LSN_FORMAT_ARGS(sk->appendResponse.flushLsn), + LSN_FORMAT_ARGS(sk->appendResponse.commitLsn), + sk->host, sk->port); if (sk->appendResponse.term > wp->propTerm) { @@ -1443,9 +1443,9 @@ RecvAppendResponses(Safekeeper *sk) * core as this is kinda expected scenario. */ disable_core_dump(); - walprop_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "", - sk->host, sk->port, - sk->appendResponse.term, wp->propTerm); + wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "", + sk->host, sk->port, + sk->appendResponse.term, wp->propTerm); } readAnything = true; @@ -1489,32 +1489,32 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->currentClusterSize = pq_getmsgint64(reply_message); - walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu", - rf->currentClusterSize); + wp_log(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu", + rf->currentClusterSize); } else if ((strcmp(key, "ps_writelsn") == 0) || (strcmp(key, "last_received_lsn") == 0)) { pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->last_received_lsn = pq_getmsgint64(reply_message); - walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X", - LSN_FORMAT_ARGS(rf->last_received_lsn)); + wp_log(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X", + LSN_FORMAT_ARGS(rf->last_received_lsn)); } else if ((strcmp(key, "ps_flushlsn") == 0) || (strcmp(key, "disk_consistent_lsn") == 0)) { pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->disk_consistent_lsn = pq_getmsgint64(reply_message); - walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X", - LSN_FORMAT_ARGS(rf->disk_consistent_lsn)); + wp_log(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X", + LSN_FORMAT_ARGS(rf->disk_consistent_lsn)); } else if ((strcmp(key, "ps_applylsn") == 0) || (strcmp(key, "remote_consistent_lsn") == 0)) { pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->remote_consistent_lsn = pq_getmsgint64(reply_message); - walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X", - LSN_FORMAT_ARGS(rf->remote_consistent_lsn)); + wp_log(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X", + LSN_FORMAT_ARGS(rf->remote_consistent_lsn)); } else if ((strcmp(key, "ps_replytime") == 0) || (strcmp(key, "replytime") == 0)) { @@ -1526,8 +1526,8 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese /* Copy because timestamptz_to_str returns a static buffer */ replyTimeStr = pstrdup(timestamptz_to_str(rf->replytime)); - walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s", - rf->replytime, replyTimeStr); + wp_log(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s", + rf->replytime, replyTimeStr); pfree(replyTimeStr); } @@ -1541,7 +1541,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese * Skip unknown keys to support backward compatibile protocol * changes */ - walprop_log(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len); + wp_log(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len); pq_getmsgbytes(reply_message, len); }; } @@ -1606,7 +1606,7 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn) if (wp->n_votes < wp->quorum) { - walprop_log(WARNING, "GetDonor called before elections are won"); + wp_log(WARNING, "GetDonor called before elections are won"); return NULL; } @@ -1734,9 +1734,9 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size) return false; case PG_ASYNC_READ_FAIL: - walprop_log(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host, - sk->port, FormatSafekeeperState(sk), - wp->api.conn_error_message(sk)); + wp_log(WARNING, "failed to read from node %s:%s in %s state: %s", sk->host, + sk->port, FormatSafekeeperState(sk), + wp->api.conn_error_message(sk)); ShutdownConnection(sk); return false; } @@ -1774,8 +1774,8 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) tag = pq_getmsgint64_le(&s); if (tag != anymsg->tag) { - walprop_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host, - sk->port, FormatSafekeeperState(sk)); + wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host, + sk->port, FormatSafekeeperState(sk)); ResetConnection(sk); return false; } @@ -1851,9 +1851,9 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes if (!wp->api.conn_blocking_write(sk, msg, msg_size)) { - walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s", - sk->host, sk->port, FormatSafekeeperState(sk), - wp->api.conn_error_message(sk)); + wp_log(WARNING, "failed to send to node %s:%s in %s state: %s", + sk->host, sk->port, FormatSafekeeperState(sk), + wp->api.conn_error_message(sk)); ShutdownConnection(sk); return false; } @@ -1904,9 +1904,9 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta wp->api.update_event_set(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); return false; case PG_ASYNC_WRITE_FAIL: - walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s", - sk->host, sk->port, FormatSafekeeperState(sk), - wp->api.conn_error_message(sk)); + wp_log(WARNING, "failed to send to node %s:%s in %s state: %s", + sk->host, sk->port, FormatSafekeeperState(sk), + wp->api.conn_error_message(sk)); ShutdownConnection(sk); return false; default: @@ -1943,9 +1943,9 @@ AsyncFlush(Safekeeper *sk) /* Nothing to do; try again when the socket's ready */ return false; case -1: - walprop_log(WARNING, "Failed to flush write to node %s:%s in %s state: %s", - sk->host, sk->port, FormatSafekeeperState(sk), - wp->api.conn_error_message(sk)); + wp_log(WARNING, "failed to flush write to node %s:%s in %s state: %s", + sk->host, sk->port, FormatSafekeeperState(sk), + wp->api.conn_error_message(sk)); ResetConnection(sk); return false; default: @@ -1974,11 +1974,11 @@ CompareLsn(const void *a, const void *b) * * The strings are intended to be used as a prefix to "state", e.g.: * - * walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk)); + * wp_log(LOG, "currently in %s state", FormatSafekeeperState(sk)); * * If this sort of phrasing doesn't fit the message, instead use something like: * - * walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk)); + * wp_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk)); */ static char * FormatSafekeeperState(Safekeeper *sk) @@ -2059,8 +2059,8 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk) * To give a descriptive message in the case of failure, we use elog * and then an assertion that's guaranteed to fail. */ - walprop_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]", - FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk)); + wp_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]", + FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk)); Assert(events_ok_for_state); } } @@ -2199,8 +2199,8 @@ FormatEvents(WalProposer *wp, uint32 events) if (events & (~all_flags)) { - walprop_log(WARNING, "Event formatting found unexpected component %d", - events & (~all_flags)); + wp_log(WARNING, "event formatting found unexpected component %d", + events & (~all_flags)); return_str[6] = '*'; return_str[7] = '\0'; } diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 6d478076fe..688d8e6e52 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -707,11 +707,23 @@ extern Safekeeper *GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn); #define WPEVENT 1337 /* special log level for walproposer internal * events */ +#define WP_LOG_PREFIX "[WP] " + +/* + * wp_log is used in pure wp code (walproposer.c), allowing API callback to + * catch logging. + */ #ifdef WALPROPOSER_LIB extern void WalProposerLibLog(WalProposer *wp, int elevel, char *fmt,...); -#define walprop_log(elevel, ...) WalProposerLibLog(wp, elevel, __VA_ARGS__) +#define wp_log(elevel, fmt, ...) WalProposerLibLog(wp, elevel, fmt, ## __VA_ARGS__) #else -#define walprop_log(elevel, ...) elog(elevel, __VA_ARGS__) +#define wp_log(elevel, fmt, ...) elog(elevel, WP_LOG_PREFIX fmt, ## __VA_ARGS__) #endif +/* + * And wpg_log is used all other (postgres specific) walproposer code, just + * adding prefix. + */ +#define wpg_log(elevel, fmt, ...) elog(elevel, WP_LOG_PREFIX fmt, ## __VA_ARGS__) + #endif /* __NEON_WALPROPOSER_H__ */ diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 7773aabfab..a3edffa6cb 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -424,8 +424,8 @@ walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos) { StartReplicationCmd cmd; - elog(LOG, "WAL proposer starts streaming at %X/%X", - LSN_FORMAT_ARGS(startpos)); + wpg_log(LOG, "WAL proposer starts streaming at %X/%X", + LSN_FORMAT_ARGS(startpos)); cmd.slotname = WAL_PROPOSER_SLOT_NAME; cmd.timeline = wp->greetRequest.timeline; cmd.startpoint = startpos; @@ -549,7 +549,7 @@ walprop_pg_load_libpqwalreceiver(void) { load_file("libpqwalreceiver", false); if (WalReceiverFunctions == NULL) - elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + wpg_log(ERROR, "libpqwalreceiver didn't initialize correctly"); } /* Helper function */ @@ -630,7 +630,7 @@ libpqwp_connect_start(char *conninfo) * PGconn structure" */ if (!pg_conn) - elog(FATAL, "failed to allocate new PGconn object"); + wpg_log(FATAL, "failed to allocate new PGconn object"); /* * And in theory this allocation can fail as well, but it's incredibly @@ -680,7 +680,7 @@ walprop_connect_poll(Safekeeper *sk) * unused. We'll expect it's never returned. */ case PGRES_POLLING_ACTIVE: - elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll"); + wpg_log(FATAL, "unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll"); /* * This return is never actually reached, but it's here to make @@ -745,7 +745,7 @@ libpqwp_get_query_result(WalProposerConn *conn) */ if (!result) { - elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results"); + wpg_log(WARNING, "[libpqwalproposer] Unexpected successful end of command results"); return WP_EXEC_UNEXPECTED_SUCCESS; } @@ -793,7 +793,7 @@ libpqwp_get_query_result(WalProposerConn *conn) } if (unexpected_success) - elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success); + wpg_log(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success); return return_val; } @@ -872,7 +872,7 @@ libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount) ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn)); if (status != PGRES_FATAL_ERROR) - elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status); + wpg_log(FATAL, "unexpected result status %d after failed PQgetCopyData", status); /* * If there was actually an error, it'll be properly reported @@ -937,7 +937,7 @@ walprop_async_write(Safekeeper *sk, void const *buf, size_t size) case -1: return PG_ASYNC_WRITE_FAIL; default: - elog(FATAL, "invalid return %d from PQputCopyData", result); + wpg_log(FATAL, "invalid return %d from PQputCopyData", result); } /* @@ -958,7 +958,7 @@ walprop_async_write(Safekeeper *sk, void const *buf, size_t size) case -1: return PG_ASYNC_WRITE_FAIL; default: - elog(FATAL, "invalid return %d from PQflush", result); + wpg_log(FATAL, "invalid return %d from PQflush", result); } } @@ -1247,8 +1247,8 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk) if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb) { startpos = endpos - max_slot_wal_keep_size_mb * 1024 * 1024; - walprop_log(WARNING, "capped WAL download for logical replication to %X/%X as max_slot_wal_keep_size=%dMB", - LSN_FORMAT_ARGS(startpos), max_slot_wal_keep_size_mb); + wpg_log(WARNING, "capped WAL download for logical replication to %X/%X as max_slot_wal_keep_size=%dMB", + LSN_FORMAT_ARGS(startpos), max_slot_wal_keep_size_mb); } timeline = wp->greetRequest.timeline; @@ -1262,7 +1262,7 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk) written = snprintf((char *) conninfo, MAXCONNINFO, "password=%s %s", neon_auth_token, sk->conninfo); if (written > MAXCONNINFO || written < 0) - elog(FATAL, "could not append password to the safekeeper connection string"); + wpg_log(FATAL, "could not append password to the safekeeper connection string"); } #if PG_MAJORVERSION_NUM < 16 @@ -1279,11 +1279,11 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk) err))); return false; } - elog(LOG, - "start recovery for logical replication from %s:%s starting from %X/%08X till %X/%08X timeline " - "%d", - sk->host, sk->port, (uint32) (startpos >> 32), - (uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline); + wpg_log(LOG, + "start recovery for logical replication from %s:%s starting from %X/%08X till %X/%08X timeline " + "%d", + sk->host, sk->port, (uint32) (startpos >> 32), + (uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline); options.logical = false; options.startpoint = startpos; @@ -1481,11 +1481,11 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk) { char log_prefix[64]; - snprintf(log_prefix, sizeof(log_prefix), "sk %s:%s nwr: ", sk->host, sk->port); + snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port); Assert(!sk->xlogreader); sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix); if (sk->xlogreader == NULL) - elog(FATAL, "Failed to allocate xlog reader"); + wpg_log(FATAL, "failed to allocate xlog reader"); } static NeonWALReadResult @@ -1549,7 +1549,7 @@ static void walprop_pg_init_event_set(WalProposer *wp) { if (waitEvents) - elog(FATAL, "double-initialization of event set"); + wpg_log(FATAL, "double-initialization of event set"); /* for each sk, we have socket plus potentially socket for neon walreader */ waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + 2 * wp->n_safekeepers); @@ -1581,7 +1581,7 @@ add_nwr_event_set(Safekeeper *sk, uint32 events) Assert(sk->nwrEventPos == -1); sk->nwrEventPos = AddWaitEventToSet(waitEvents, events, NeonWALReaderSocket(sk->xlogreader), NULL, sk); sk->nwrConnEstablished = NeonWALReaderIsRemConnEstablished(sk->xlogreader); - elog(DEBUG5, "sk %s:%s: added nwr socket events %d", sk->host, sk->port, events); + wpg_log(DEBUG5, "sk %s:%s: added nwr socket events %d", sk->host, sk->port, events); } static void @@ -1680,8 +1680,8 @@ rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk) { WalProposer *wp = to_remove->wp; - elog(DEBUG5, "sk %s:%s: removing event, is_sk %d", - to_remove->host, to_remove->port, is_sk); + wpg_log(DEBUG5, "sk %s:%s: removing event, is_sk %d", + to_remove->host, to_remove->port, is_sk); /* * Shortpath for exiting if have nothing to do. We never call this @@ -1835,13 +1835,13 @@ GetLatestNeonFeedback(PageserverFeedback *rf, WalProposer *wp) rf->remote_consistent_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.remote_consistent_lsn; rf->replytime = wp->safekeeper[latest_safekeeper].appendResponse.rf.replytime; - elog(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu," - " last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu", - rf->currentClusterSize, - LSN_FORMAT_ARGS(rf->last_received_lsn), - LSN_FORMAT_ARGS(rf->disk_consistent_lsn), - LSN_FORMAT_ARGS(rf->remote_consistent_lsn), - rf->replytime); + wpg_log(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu," + " last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu", + rf->currentClusterSize, + LSN_FORMAT_ARGS(rf->last_received_lsn), + LSN_FORMAT_ARGS(rf->disk_consistent_lsn), + LSN_FORMAT_ARGS(rf->remote_consistent_lsn), + rf->replytime); } /* @@ -1987,7 +1987,7 @@ GetLogRepRestartLSN(WalProposer *wp) { uint64 download_range_mb; - elog(LOG, "logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn)); + wpg_log(LOG, "logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn)); /* * If we need to download more than a max_slot_wal_keep_size, @@ -1999,8 +1999,8 @@ GetLogRepRestartLSN(WalProposer *wp) download_range_mb = (wp->propEpochStartLsn - lrRestartLsn) / MB; if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb) { - walprop_log(WARNING, "not downloading WAL for logical replication since %X/%X as max_slot_wal_keep_size=%dMB", - LSN_FORMAT_ARGS(lrRestartLsn), max_slot_wal_keep_size_mb); + wpg_log(WARNING, "not downloading WAL for logical replication since %X/%X as max_slot_wal_keep_size=%dMB", + LSN_FORMAT_ARGS(lrRestartLsn), max_slot_wal_keep_size_mb); return InvalidXLogRecPtr; }