Add nwr_log.

This commit is contained in:
Arseny Sher
2023-11-28 12:40:54 +03:00
parent 30bb18f3a9
commit eabfc5542e
3 changed files with 28 additions and 14 deletions

View File

@@ -29,6 +29,11 @@
#define NEON_WALREADER_ERR_MSG_LEN 512
/*
* Can be called where NeonWALReader *state is available in the context, adds log_prefix.
*/
#define nwr_log(elevel, fmt, ...) elog(elevel, "%s" fmt, state->log_prefix, ## __VA_ARGS__)
static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
static NeonWALReadResult NeonWALReaderReadMsg(NeonWALReader *state);
static void NeonWALReaderResetRemote(NeonWALReader *state);
@@ -95,11 +100,14 @@ struct NeonWALReader
* read request
*/
XLogRecPtr rem_lsn;
/* prepended to lines logged by neon_walreader, if provided */
char log_prefix[64];
};
/* palloc and initialize NeonWALReader */
NeonWALReader *
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp)
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix)
{
NeonWALReader *reader;
@@ -119,6 +127,9 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalPropose
reader->rem_state = RS_NONE;
if (log_prefix)
strncpy(reader->log_prefix, log_prefix, sizeof(reader->log_prefix));
return reader;
}
@@ -178,7 +189,7 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti
}
else if (state->wre_errno == ENOENT)
{
elog(LOG, "local read failed as segment at %X/%X doesn't exist, attempting remote",
nwr_log(LOG, "local read failed as segment at %X/%X doesn't exist, attempting remote",
LSN_FORMAT_ARGS(startptr));
return NeonWALReadRemote(state, buf, startptr, count, tli);
}
@@ -206,7 +217,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
return NEON_WALREAD_ERROR;
}
snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port);
elog(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL",
nwr_log(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL",
state->donor_name, LSN_FORMAT_ARGS(donor_lsn));
state->wp_conn = libpqwp_connect_start(donor->conninfo);
if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD)
@@ -245,7 +256,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
snprintf(start_repl_query, sizeof(start_repl_query),
"START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')",
LSN_FORMAT_ARGS(startptr), state->wp->propTerm);
elog(LOG, "connection to %s to fetch WAL succeeded, running %s",
nwr_log(LOG, "connection to %s to fetch WAL succeeded, running %s",
state->donor_name, start_repl_query);
if (!libpqwp_send_query(state->wp_conn, start_repl_query))
{
@@ -305,7 +316,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
}
elog(LOG, "continuing remote read at req_lsn=%X/%X len=%zu, req_progress=%zu",
nwr_log(LOG, "continuing remote read at req_lsn=%X/%X len=%zu, req_progress=%zu",
LSN_FORMAT_ARGS(startptr),
count,
state->req_progress);
@@ -316,7 +327,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
state->req_lsn = startptr;
state->req_len = count;
state->req_progress = 0;
elog(LOG, "starting remote read req_lsn=%X/%X len=%zu",
nwr_log(LOG, "starting remote read req_lsn=%X/%X len=%zu",
LSN_FORMAT_ARGS(startptr),
count);
}
@@ -373,7 +384,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
if (state->req_lsn < state->available_lsn &&
state->rem_lsn >= state->available_lsn)
{
elog(LOG, "closing remote connection as available_lsn %X/%X crossed and next read is likely to be served locally",
nwr_log(LOG, "closing remote connection as available_lsn %X/%X crossed and next read is likely to be served locally",
LSN_FORMAT_ARGS(state->available_lsn));
NeonWALReaderResetRemote(state);
}
@@ -384,7 +395,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
XLByteToSeg(state->rem_lsn, segno, state->segcxt.ws_segsize);
if (is_wal_segment_exists(segno, state->segcxt.ws_segsize, tli))
{
elog(LOG, "closing remote connection as WAL file at next lsn %X/%X exists",
nwr_log(LOG, "closing remote connection as WAL file at next lsn %X/%X exists",
LSN_FORMAT_ARGS(state->rem_lsn));
NeonWALReaderResetRemote(state);
}
@@ -473,7 +484,7 @@ NeonWALReaderReadMsg(NeonWALReader *state)
state->rem_lsn = start_lsn;
state->wal_rem_len = (Size) (s.len - s.cursor);
state->wal_ptr = (char *) pq_getmsgbytes(&s, s.len - s.cursor);
elog(LOG, "received WAL msg at %X/%X len %zu",
nwr_log(LOG, "received WAL msg at %X/%X len %zu",
LSN_FORMAT_ARGS(state->rem_lsn), state->wal_rem_len);
return NEON_WALREAD_SUCCESS;
@@ -494,14 +505,14 @@ NeonWALReaderReadMsg(NeonWALReader *state)
end_lsn = pq_getmsgint64(&s);
pq_getmsgint64(&s); /* TimestampTz timestamp; */
reply_requested = pq_getmsgbyte(&s);
elog(DEBUG5, "received keepalive end_lsn=%X/%X reply_requested=%d",
nwr_log(DEBUG5, "received keepalive end_lsn=%X/%X reply_requested=%d",
LSN_FORMAT_ARGS(end_lsn),
reply_requested);
continue;
/* todo: send replies */
}
default:
elog(WARNING, "invalid replication message type %d", msg_type);
nwr_log(WARNING, "invalid replication message type %d", msg_type);
continue;
}
}
@@ -537,7 +548,7 @@ pgsocket
NeonWALReaderSocket(NeonWALReader *state)
{
if (!state->wp_conn)
elog(FATAL, "NeonWALReaderSocket is called without active remote connection");
nwr_log(FATAL, "NeonWALReaderSocket is called without active remote connection");
return PQsocket(state->wp_conn->pg_conn);
}

View File

@@ -19,7 +19,7 @@ typedef enum
NEON_WALREAD_ERROR,
} NeonWALReadResult;
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp);
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix);
extern void NeonWALReaderFree(NeonWALReader *state);
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);

View File

@@ -1460,7 +1460,10 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count)
static void
walprop_pg_wal_reader_allocate(Safekeeper *sk)
{
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp);
char log_prefix[64];
snprintf(log_prefix, sizeof(log_prefix), "sk %s:%s nwr: ", sk->host, sk->port);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix);
if (sk->xlogreader == NULL)
elog(FATAL, "Failed to allocate xlog reader");
}