diff --git a/pgxn/neon/neon_walreader.c b/pgxn/neon/neon_walreader.c index c217faa66b..268a887c38 100644 --- a/pgxn/neon/neon_walreader.c +++ b/pgxn/neon/neon_walreader.c @@ -29,6 +29,11 @@ #define NEON_WALREADER_ERR_MSG_LEN 512 +/* + * Can be called where NeonWALReader *state is available in the context, adds log_prefix. + */ +#define nwr_log(elevel, fmt, ...) elog(elevel, "%s" fmt, state->log_prefix, ## __VA_ARGS__) + static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli); static NeonWALReadResult NeonWALReaderReadMsg(NeonWALReader *state); static void NeonWALReaderResetRemote(NeonWALReader *state); @@ -95,11 +100,14 @@ struct NeonWALReader * read request */ XLogRecPtr rem_lsn; + + /* prepended to lines logged by neon_walreader, if provided */ + char log_prefix[64]; }; /* palloc and initialize NeonWALReader */ NeonWALReader * -NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp) +NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix) { NeonWALReader *reader; @@ -119,6 +127,9 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalPropose reader->rem_state = RS_NONE; + if (log_prefix) + strncpy(reader->log_prefix, log_prefix, sizeof(reader->log_prefix)); + return reader; } @@ -178,7 +189,7 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti } else if (state->wre_errno == ENOENT) { - elog(LOG, "local read failed as segment at %X/%X doesn't exist, attempting remote", + nwr_log(LOG, "local read failed as segment at %X/%X doesn't exist, attempting remote", LSN_FORMAT_ARGS(startptr)); return NeonWALReadRemote(state, buf, startptr, count, tli); } @@ -206,7 +217,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou return NEON_WALREAD_ERROR; } snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port); - elog(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL", + nwr_log(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL", state->donor_name, LSN_FORMAT_ARGS(donor_lsn)); state->wp_conn = libpqwp_connect_start(donor->conninfo); if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD) @@ -245,7 +256,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou snprintf(start_repl_query, sizeof(start_repl_query), "START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')", LSN_FORMAT_ARGS(startptr), state->wp->propTerm); - elog(LOG, "connection to %s to fetch WAL succeeded, running %s", + nwr_log(LOG, "connection to %s to fetch WAL succeeded, running %s", state->donor_name, start_repl_query); if (!libpqwp_send_query(state->wp_conn, start_repl_query)) { @@ -305,7 +316,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou NeonWALReaderResetRemote(state); return NEON_WALREAD_ERROR; } - elog(LOG, "continuing remote read at req_lsn=%X/%X len=%zu, req_progress=%zu", + nwr_log(LOG, "continuing remote read at req_lsn=%X/%X len=%zu, req_progress=%zu", LSN_FORMAT_ARGS(startptr), count, state->req_progress); @@ -316,7 +327,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou state->req_lsn = startptr; state->req_len = count; state->req_progress = 0; - elog(LOG, "starting remote read req_lsn=%X/%X len=%zu", + nwr_log(LOG, "starting remote read req_lsn=%X/%X len=%zu", LSN_FORMAT_ARGS(startptr), count); } @@ -373,7 +384,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou if (state->req_lsn < state->available_lsn && state->rem_lsn >= state->available_lsn) { - elog(LOG, "closing remote connection as available_lsn %X/%X crossed and next read is likely to be served locally", + nwr_log(LOG, "closing remote connection as available_lsn %X/%X crossed and next read is likely to be served locally", LSN_FORMAT_ARGS(state->available_lsn)); NeonWALReaderResetRemote(state); } @@ -384,7 +395,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou XLByteToSeg(state->rem_lsn, segno, state->segcxt.ws_segsize); if (is_wal_segment_exists(segno, state->segcxt.ws_segsize, tli)) { - elog(LOG, "closing remote connection as WAL file at next lsn %X/%X exists", + nwr_log(LOG, "closing remote connection as WAL file at next lsn %X/%X exists", LSN_FORMAT_ARGS(state->rem_lsn)); NeonWALReaderResetRemote(state); } @@ -473,7 +484,7 @@ NeonWALReaderReadMsg(NeonWALReader *state) state->rem_lsn = start_lsn; state->wal_rem_len = (Size) (s.len - s.cursor); state->wal_ptr = (char *) pq_getmsgbytes(&s, s.len - s.cursor); - elog(LOG, "received WAL msg at %X/%X len %zu", + nwr_log(LOG, "received WAL msg at %X/%X len %zu", LSN_FORMAT_ARGS(state->rem_lsn), state->wal_rem_len); return NEON_WALREAD_SUCCESS; @@ -494,14 +505,14 @@ NeonWALReaderReadMsg(NeonWALReader *state) end_lsn = pq_getmsgint64(&s); pq_getmsgint64(&s); /* TimestampTz timestamp; */ reply_requested = pq_getmsgbyte(&s); - elog(DEBUG5, "received keepalive end_lsn=%X/%X reply_requested=%d", + nwr_log(DEBUG5, "received keepalive end_lsn=%X/%X reply_requested=%d", LSN_FORMAT_ARGS(end_lsn), reply_requested); continue; /* todo: send replies */ } default: - elog(WARNING, "invalid replication message type %d", msg_type); + nwr_log(WARNING, "invalid replication message type %d", msg_type); continue; } } @@ -537,7 +548,7 @@ pgsocket NeonWALReaderSocket(NeonWALReader *state) { if (!state->wp_conn) - elog(FATAL, "NeonWALReaderSocket is called without active remote connection"); + nwr_log(FATAL, "NeonWALReaderSocket is called without active remote connection"); return PQsocket(state->wp_conn->pg_conn); } diff --git a/pgxn/neon/neon_walreader.h b/pgxn/neon/neon_walreader.h index 098588f4db..805c94fc53 100644 --- a/pgxn/neon/neon_walreader.h +++ b/pgxn/neon/neon_walreader.h @@ -19,7 +19,7 @@ typedef enum NEON_WALREAD_ERROR, } NeonWALReadResult; -extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp); +extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix); extern void NeonWALReaderFree(NeonWALReader *state); extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli); extern pgsocket NeonWALReaderSocket(NeonWALReader *state); diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 0114c33241..821b1d017a 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1460,7 +1460,10 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count) static void walprop_pg_wal_reader_allocate(Safekeeper *sk) { - sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp); + char log_prefix[64]; + + snprintf(log_prefix, sizeof(log_prefix), "sk %s:%s nwr: ", sk->host, sk->port); + sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix); if (sk->xlogreader == NULL) elog(FATAL, "Failed to allocate xlog reader"); }