From f0cbd5353ae057dbbeef1968901fe81c0295fd70 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 4 Oct 2023 12:51:26 +0300 Subject: [PATCH] Use in wp custom WAL reader gracefully handling missing WAL. and disable recovery on start. --- pgxn/neon/walproposer.c | 35 +++++-- pgxn/neon/walproposer.h | 26 ++++- pgxn/neon/walproposer_utils.c | 183 ++++++++++++++++++++++++++++++++++ 3 files changed, 233 insertions(+), 11 deletions(-) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index a9342bd984..4630b159fa 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -566,9 +566,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) } initStringInfo(&safekeeper[n_safekeepers].outbuf); - safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL); - if (safekeeper[n_safekeepers].xlogreader == NULL) - elog(FATAL, "Failed to allocate xlog reader"); + safekeeper[n_safekeepers].xlogreader = NULL; safekeeper[n_safekeepers].flushWrite = false; safekeeper[n_safekeepers].startStreamingAt = InvalidXLogRecPtr; safekeeper[n_safekeepers].streamingAt = InvalidXLogRecPtr; @@ -716,6 +714,12 @@ ShutdownConnection(Safekeeper *sk) sk->voteResponse.termHistory.entries = NULL; HackyRemoveWalProposerEvent(sk); + + if (sk->xlogreader) + { + NeonWALReaderFree(sk->xlogreader); + sk->xlogreader = NULL; + } } /* @@ -1238,8 +1242,8 @@ HandleElectedProposer(void) LSN_FORMAT_ARGS(truncateLsn), LSN_FORMAT_ARGS(propEpochStartLsn)); /* Perform recovery */ - if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn)) - elog(FATAL, "Failed to recover state"); + // if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn)) + // elog(FATAL, "Failed to recover state"); } else if (syncSafekeepers) { @@ -1555,6 +1559,12 @@ SendProposerElected(Safekeeper *sk) term_t lastCommonTerm; int i; + /* It's a good moment 
to create WAL reader */ + Assert(!sk->xlogreader); + sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, propEpochStartLsn); + if (!sk->xlogreader) + elog(FATAL, "failed to allocate xlog reader"); + /* * Determine start LSN by comparing safekeeper's log term switch history * and proposer's, searching for the divergence point. @@ -1834,19 +1844,24 @@ SendAppendRequests(Safekeeper *sk) /* write the WAL itself */ enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn); - if (!WALRead(sk->xlogreader, + + if (!NeonWALRead(sk->xlogreader, &sk->outbuf.data[sk->outbuf.len], req->beginLsn, req->endLsn - req->beginLsn, #if PG_VERSION_NUM >= 150000 /* FIXME don't use hardcoded timeline_id here */ - 1, + 1 #else - ThisTimeLineID, + ThisTimeLineID #endif - &errinfo)) + )) { - WALReadRaiseError(&errinfo); + elog(WARNING, "WAL reading for node %s:%s failed: %s", + sk->host, sk->port, + sk->xlogreader->err_msg); + ShutdownConnection(sk); + return false; } sk->outbuf.len += req->endLsn - req->beginLsn; diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index fa1ba30a8f..0fb92f6910 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -2,6 +2,7 @@ #define __NEON_WALPROPOSER_H__ #include "access/xlogdefs.h" +#include "access/xlogreader.h" #include "postgres.h" #include "port.h" #include "access/xlog_internal.h" @@ -327,6 +328,24 @@ typedef struct AppendResponse /* Other fields are fixed part */ #define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf) +#define NEON_WALREADER_ERR_MSG_LEN 128 + +/* + * Like WALRead, but returns error instead of throwing ERROR when segment is + * missing + doesn't attempt to read WAL before specified horizon -- basebackup + * LSN. Missing WAL should be fetched by peer recovery, or, alternatively, on + * demand WAL fetching from safekeepers should be implemented in NeonWALReader. 
+ */
+typedef struct {
+	/* NeonWALRead fails any request that starts before this LSN */
+	XLogRecPtr	available_lsn;
+	WALSegmentContext segcxt;	/* only ws_segsize is filled in by NeonWALReaderAllocate */
+	WALOpenSegment seg;			/* segment currently open; ws_file is -1 when none is */
+	int			wre_errno;		/* errno of a failed segment open/read; 0 for a read before available_lsn */
+	/* Text describing the last NeonWALRead failure; fixed-size buffer for simplicity. */
+	char		err_msg[NEON_WALREADER_ERR_MSG_LEN];
+} NeonWALReader;
+
 /*
  * Descriptor of safekeeper
  */
@@ -358,7 +377,7 @@ typedef struct Safekeeper
 	/*
 	 * WAL reader, allocated for each safekeeper.
 	 */
-	XLogReaderState *xlogreader;
+	NeonWALReader *xlogreader;
 
 	/*
 	 * Streaming will start here; must be record boundary.
@@ -508,4 +527,9 @@ extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_
 
 extern uint64 BackpressureThrottlingTime(void);
 
+extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn);
+extern void NeonWALReaderFree(NeonWALReader *state);
+extern bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
+
+
 #endif							/* __NEON_WALPROPOSER_H__ */
diff --git a/pgxn/neon/walproposer_utils.c b/pgxn/neon/walproposer_utils.c
index 05030360f6..446385795e 100644
--- a/pgxn/neon/walproposer_utils.c
+++ b/pgxn/neon/walproposer_utils.c
@@ -12,6 +12,7 @@
 #include "replication/slot.h"
 #include "walproposer_utils.h"
 #include "replication/walsender_private.h"
+#include "utils/wait_event.h"
 
 #include "storage/ipc.h"
 #include "utils/builtins.h"
@@ -657,3 +658,185 @@ XLogBroadcastWalProposer(void)
 		set_ps_display(activitymsg);
 	}
 }
+
+/* palloc a zero-filled NeonWALReader with no segment open; returns NULL if the allocation fails */
+NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn)
+{
+	NeonWALReader *reader;
+
+	reader = (NeonWALReader *)
+		palloc_extended(sizeof(NeonWALReader),
+						MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
+	if (!reader)
+		return NULL;
+
+	reader->available_lsn = available_lsn;
+	reader->seg.ws_file = -1;	/* no segment file open yet */
+	reader->seg.ws_segno = 0;
+	reader->seg.ws_tli = 0;
+	reader->segcxt.ws_segsize = wal_segment_size;
+
+	return reader;
+}
+
+static void neon_wal_segment_close(NeonWALReader *state);
+
+void
+NeonWALReaderFree(NeonWALReader *state)
+{
+	/*
+	 * Release a reader obtained from NeonWALReaderAllocate, closing its
+	 * segment file first if one is open.  A NULL reader is accepted and
+	 * ignored so that callers need not guard the call.
+	 */
+	if (state == NULL)
+		return;
+
+	if (state->seg.ws_file >= 0)
+		neon_wal_segment_close(state);
+	pfree(state);
+}
+
+
+/*
+ * Copy of vanilla wal_segment_open, but returns false in case of error instead
+ * of throwing ERROR.
+ *
+ * Opens the segment file for 'nextSegNo' on timeline '*tli_p' read-only and
+ * stores the descriptor in state->seg.ws_file.  Returns true when the
+ * descriptor is valid.  On failure ws_file is left negative and errno is
+ * whatever BasicOpenFile left behind, so the caller must read errno before
+ * making any other call.  '*tli_p' is only read, never written.
+ */
+static bool
+neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
+					  TimeLineID *tli_p)
+{
+	TimeLineID	tli = *tli_p;
+	char		path[MAXPGPATH];
+
+	XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
+	state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+
+	/* Nothing may run between the open and this return: errno must survive. */
+	return state->seg.ws_file >= 0;
+}
+
+/*
+ * Copy of vanilla wal_segment_close for NeonWALReader: close the open segment
+ * file and mark the reader as having none.
+ */
+static void
+neon_wal_segment_close(NeonWALReader *state)
+{
+	/* The file was opened read-only, so the close() result is ignored. */
+	close(state->seg.ws_file);
+	state->seg.ws_file = -1;
+}
+
+/*
+ * Mostly copy of vanilla WALRead, but 1) returns error if the requested data
+ * is before available_lsn 2) returns error if a segment is missing instead of
+ * throwing ERROR.
+ *
+ * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
+ * fetched from timeline 'tli'.
+ *
+ * Returns true if succeeded, false if an error occurs, in which case
+ * 'state->wre_errno' shows whether it was missing WAL (ENOENT) or something
+ * else, and 'state->err_msg' holds the description.
+ */
+bool
+NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
+{
+	char	   *p;
+	XLogRecPtr	recptr;
+	Size		nbytes;
+
+	/*
+	 * Refuse to read below the horizon given at allocation time.  This is not
+	 * an OS-level failure, so wre_errno is reported as 0.
+	 */
+	if (startptr < state->available_lsn)
+	{
+		state->wre_errno = 0;
+		snprintf(state->err_msg, sizeof(state->err_msg), "failed to read WAL at %X/%X which is earlier than available %X/%X",
+				 LSN_FORMAT_ARGS(startptr), LSN_FORMAT_ARGS(state->available_lsn));
+		return false;
+	}
+
+	p = buf;
+	recptr = startptr;
+	nbytes = count;
+
+	/* Each iteration reads from a single segment; a request may span several. */
+	while (nbytes > 0)
+	{
+		uint32		startoff;
+		int			segbytes;
+		int			readbytes;
+
+		startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
+
+		/*
+		 * If the data we want is not in the segment we have open (no file,
+		 * different segment number, or different timeline), close what we
+		 * have (if anything) and open the segment containing 'recptr'.
+		 */
+		if (state->seg.ws_file < 0 ||
+			!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
+			tli != state->seg.ws_tli)
+		{
+			XLogSegNo	nextSegNo;
+
+			if (state->seg.ws_file >= 0)
+				neon_wal_segment_close(state);
+
+			XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
+			if (!neon_wal_segment_open(state, nextSegNo, &tli))
+			{
+				char		fname[MAXFNAMELEN];
+
+				/* Save errno right away, before any call can clobber it. */
+				state->wre_errno = errno;
+
+				XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
+				snprintf(state->err_msg, sizeof(state->err_msg), "failed to open WAL segment %s while reading at %X/%X: %s",
+						 fname, LSN_FORMAT_ARGS(recptr), strerror(state->wre_errno));
+				return false;
+			}
+
+			/* This shouldn't happen -- indicates a bug in neon_wal_segment_open */
+			Assert(state->seg.ws_file >= 0);
+
+			/* Update the current segment info. */
+			state->seg.ws_tli = tli;
+			state->seg.ws_segno = nextSegNo;
+		}
+
+		/* How many bytes are within this segment? */
+		if (nbytes > (state->segcxt.ws_segsize - startoff))
+			segbytes = state->segcxt.ws_segsize - startoff;
+		else
+			segbytes = nbytes;
+
+#ifndef FRONTEND
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+		/* Reset errno first; eases reporting non-errno-affecting errors */
+		errno = 0;
+		readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
+
+#ifndef FRONTEND
+		pgstat_report_wait_end();
+#endif
+
+		if (readbytes <= 0)
+		{
+			char		fname[MAXFNAMELEN];
+
+			/* Save errno before XLogFileName/snprintf get a chance to change it. */
+			state->wre_errno = (readbytes < 0) ? errno : 0;
+
+			XLogFileName(fname, state->seg.ws_tli, state->seg.ws_segno, state->segcxt.ws_segsize);
+
+			/*
+			 * The errno text is appended exactly once via strerror(); an
+			 * additional %m would repeat it, and in the EOF case (errno is
+			 * 0) would print the "no error" text.  startoff is unsigned,
+			 * hence %u.
+			 */
+			if (readbytes < 0)
+			{
+				snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %u: %s",
+						 fname, startoff, strerror(state->wre_errno));
+			}
+			else
+			{
+				/* Zero bytes read: the segment is shorter than the request needs. */
+				snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %u: unexpected EOF",
+						 fname, startoff);
+			}
+			return false;
+		}
+
+		/* Update state for read; a short read simply loops again. */
+		recptr += readbytes;
+		nbytes -= readbytes;
+		p += readbytes;
+	}
+
+	return true;
+}
\ No newline at end of file