Use in wp custom WAL reader gracefully handling missing WAL.

and disable recovery on start.
This commit is contained in:
Arseny Sher
2023-10-04 12:51:26 +03:00
parent 8ea21686e1
commit f0cbd5353a
3 changed files with 233 additions and 11 deletions

View File

@@ -566,9 +566,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
}
initStringInfo(&safekeeper[n_safekeepers].outbuf);
safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL);
if (safekeeper[n_safekeepers].xlogreader == NULL)
elog(FATAL, "Failed to allocate xlog reader");
safekeeper[n_safekeepers].xlogreader = NULL;
safekeeper[n_safekeepers].flushWrite = false;
safekeeper[n_safekeepers].startStreamingAt = InvalidXLogRecPtr;
safekeeper[n_safekeepers].streamingAt = InvalidXLogRecPtr;
@@ -716,6 +714,12 @@ ShutdownConnection(Safekeeper *sk)
sk->voteResponse.termHistory.entries = NULL;
HackyRemoveWalProposerEvent(sk);
if (sk->xlogreader)
{
NeonWALReaderFree(sk->xlogreader);
sk->xlogreader = NULL;
}
}
/*
@@ -1238,8 +1242,8 @@ HandleElectedProposer(void)
LSN_FORMAT_ARGS(truncateLsn),
LSN_FORMAT_ARGS(propEpochStartLsn));
/* Perform recovery */
if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
elog(FATAL, "Failed to recover state");
// if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
// elog(FATAL, "Failed to recover state");
}
else if (syncSafekeepers)
{
@@ -1555,6 +1559,12 @@ SendProposerElected(Safekeeper *sk)
term_t lastCommonTerm;
int i;
/* It's a good moment to create WAL reader */
Assert(!sk->xlogreader);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, propEpochStartLsn);
if (!sk->xlogreader)
elog(FATAL, "failed to allocate xlog reader");
/*
* Determine start LSN by comparing safekeeper's log term switch history
* and proposer's, searching for the divergence point.
@@ -1834,19 +1844,24 @@ SendAppendRequests(Safekeeper *sk)
/* write the WAL itself */
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
if (!WALRead(sk->xlogreader,
if (!NeonWALRead(sk->xlogreader,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn,
#if PG_VERSION_NUM >= 150000
/* FIXME don't use hardcoded timeline_id here */
1,
1
#else
ThisTimeLineID,
ThisTimeLineID
#endif
&errinfo))
))
{
WALReadRaiseError(&errinfo);
elog(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port,
sk->xlogreader->err_msg);
ShutdownConnection(sk);
return false;
}
sk->outbuf.len += req->endLsn - req->beginLsn;

View File

@@ -2,6 +2,7 @@
#define __NEON_WALPROPOSER_H__
#include "access/xlogdefs.h"
#include "access/xlogreader.h"
#include "postgres.h"
#include "port.h"
#include "access/xlog_internal.h"
@@ -327,6 +328,24 @@ typedef struct AppendResponse
/* Other fields are fixed part */
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
#define NEON_WALREADER_ERR_MSG_LEN 128
/*
* Like WALRead, but returns error instead of throwing ERROR when segment is
* missing + doesn't attempt to read WAL before specified horizon -- basebackup
* LSN. Missing WAL should be fetched by peer recovery, or, alternatively, on
* demand WAL fetching from safekeepers should be implemented in NeonWALReader.
*/
typedef struct {
/* LSN before */
XLogRecPtr available_lsn;
WALSegmentContext segcxt;
WALOpenSegment seg;
int wre_errno;
/* Explains failure to read, static for simplicity. */
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
} NeonWALReader;
/*
* Descriptor of safekeeper
*/
@@ -358,7 +377,7 @@ typedef struct Safekeeper
/*
* WAL reader, allocated for each safekeeper.
*/
XLogReaderState *xlogreader;
NeonWALReader *xlogreader;
/*
* Streaming will start here; must be record boundary.
@@ -508,4 +527,9 @@ extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_
extern uint64 BackpressureThrottlingTime(void);
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn);
extern void NeonWALReaderFree(NeonWALReader *state);
extern bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
#endif /* __NEON_WALPROPOSER_H__ */

View File

@@ -12,6 +12,7 @@
#include "replication/slot.h"
#include "walproposer_utils.h"
#include "replication/walsender_private.h"
#include "utils/wait_event.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
@@ -657,3 +658,185 @@ XLogBroadcastWalProposer(void)
set_ps_display(activitymsg);
}
}
/* palloc and initialize NeonWALReader */
NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn)
{
NeonWALReader *reader;
reader = (NeonWALReader *)
palloc_extended(sizeof(NeonWALReader),
MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
if (!reader)
return NULL;
reader->available_lsn = available_lsn;
reader->seg.ws_file = -1;
reader->seg.ws_segno = 0;
reader->seg.ws_tli = 0;
reader->segcxt.ws_segsize = wal_segment_size;
return reader;
}
static void neon_wal_segment_close(NeonWALReader *state);
void
NeonWALReaderFree(NeonWALReader *state)
{
if (state->seg.ws_file != -1)
neon_wal_segment_close(state);
pfree(state);
}
/*
* Copy of vanilla wal_segment_open, but returns false in case of error instead
* of ERROR, with errno set.
*
* XLogReaderRoutine->segment_open callback for local pg_wal files
*/
static bool
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
char path[MAXPGPATH];
XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
if (state->seg.ws_file >= 0)
return true;
return false;
}
/* copy of vanilla wal_segment_close with NeonWALReader */
void
neon_wal_segment_close(NeonWALReader *state)
{
close(state->seg.ws_file);
/* need to check errno? */
state->seg.ws_file = -1;
}
/*
* Mostly copy of vanilla WALRead, but 1) returns error if requested data before
* available_lsn 2) returns error is segment is missing instead of throwing
* ERROR.
*
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
*
* Returns true if succeeded, false if an error occurs, in which case
* 'state->errno' shows whether it was missing WAL (ENOENT) or something else,
* and 'err' the desciption.
*/
bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
{
char *p;
XLogRecPtr recptr;
Size nbytes;
if (startptr < state->available_lsn)
{
state->wre_errno = 0;
snprintf(state->err_msg, sizeof(state->err_msg), "failed to read WAL at %X/%X which is earlier than available %X/%X",
LSN_FORMAT_ARGS(startptr), LSN_FORMAT_ARGS(state->available_lsn));
return false;
}
p = buf;
recptr = startptr;
nbytes = count;
while (nbytes > 0)
{
uint32 startoff;
int segbytes;
int readbytes;
startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
/*
* If the data we want is not in a segment we have open, close what we
* have (if anything) and open the next one, using the caller's
* provided openSegment callback.
*/
if (state->seg.ws_file < 0 ||
!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
tli != state->seg.ws_tli)
{
XLogSegNo nextSegNo;
if (state->seg.ws_file >= 0)
neon_wal_segment_close(state);
XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
if (!neon_wal_segment_open(state, nextSegNo, &tli))
{
char fname[MAXFNAMELEN];
state->wre_errno = errno;
XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
snprintf(state->err_msg, sizeof(state->err_msg), "failed to open WAL segment %s while reading at %X/%X: %s",
fname, LSN_FORMAT_ARGS(recptr), strerror(state->wre_errno));
return false;
}
/* This shouldn't happen -- indicates a bug in segment_open */
Assert(state->seg.ws_file >= 0);
/* Update the current segment info. */
state->seg.ws_tli = tli;
state->seg.ws_segno = nextSegNo;
}
/* How many bytes are within this segment? */
if (nbytes > (state->segcxt.ws_segsize - startoff))
segbytes = state->segcxt.ws_segsize - startoff;
else
segbytes = nbytes;
#ifndef FRONTEND
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
#endif
/* Reset errno first; eases reporting non-errno-affecting errors */
errno = 0;
readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
#ifndef FRONTEND
pgstat_report_wait_end();
#endif
if (readbytes <= 0)
{
char fname[MAXFNAMELEN];
XLogFileName(fname, state->seg.ws_tli, state->seg.ws_segno, state->segcxt.ws_segsize);
if (readbytes < 0)
{
state->wre_errno = errno;
snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: %s",
fname, startoff, strerror(state->wre_errno));
}
else
{
snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: unexpected EOF",
fname, startoff);
}
return false;
}
/* Update state for read */
recptr += readbytes;
nbytes -= readbytes;
p += readbytes;
}
return true;
}