diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index a9f174cd3c..4fecffb155 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -9,6 +9,7 @@ OBJS = \ libpagestore.o \ neon.o \ neon_utils.o \ + neon_walreader.o \ pagestore_smgr.o \ relsize_cache.o \ walproposer.o \ diff --git a/pgxn/neon/neon_walreader.c b/pgxn/neon/neon_walreader.c new file mode 100644 index 0000000000..ab94987b74 --- /dev/null +++ b/pgxn/neon/neon_walreader.c @@ -0,0 +1,222 @@ +/* + * Like WALRead, but returns error instead of throwing ERROR when segment is + * missing + doesn't attempt to read WAL before specified horizon -- basebackup + * LSN. Missing WAL should be fetched by peer recovery, or, alternatively, on + * demand WAL fetching from safekeepers should be implemented in NeonWALReader. + */ +#include "postgres.h" + +#include + +#include "access/xlog_internal.h" +#include "access/xlogdefs.h" +#include "access/xlogreader.h" +#include "storage/fd.h" +#include "utils/wait_event.h" + +#include "neon_walreader.h" +#include "walproposer.h" + +#define NEON_WALREADER_ERR_MSG_LEN 128 + +static void neon_wal_segment_close(NeonWALReader *state); + +struct NeonWALReader +{ + /* LSN before */ + XLogRecPtr available_lsn; + WALSegmentContext segcxt; + WALOpenSegment seg; + int wre_errno; + /* Explains failure to read, static for simplicity. */ + char err_msg[NEON_WALREADER_ERR_MSG_LEN]; +}; + +/* palloc and initialize NeonWALReader */ +NeonWALReader * +NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn) +{ + NeonWALReader *reader; + + reader = (NeonWALReader *) + palloc_extended(sizeof(NeonWALReader), + MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO); + if (!reader) + return NULL; + + reader->available_lsn = available_lsn; + reader->seg.ws_file = -1; + reader->seg.ws_segno = 0; + reader->seg.ws_tli = 0; + reader->segcxt.ws_segsize = wal_segment_size; + + return reader; +} + +void +NeonWALReaderFree(NeonWALReader *state) +{ + if (state->seg.ws_file != -1) + neon_wal_segment_close(state); + pfree(state); +} + + +/* + * Copy of vanilla wal_segment_open, but returns false in case of error instead + * of ERROR, with errno set. + * + * XLogReaderRoutine->segment_open callback for local pg_wal files + */ +static bool +neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, + TimeLineID *tli_p) +{ + TimeLineID tli = *tli_p; + char path[MAXPGPATH]; + + XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize); + walprop_log(LOG, "opening %s", path); + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file >= 0) + return true; + + return false; +} + +/* copy of vanilla wal_segment_close with NeonWALReader */ +void +neon_wal_segment_close(NeonWALReader *state) +{ + close(state->seg.ws_file); + /* need to check errno? */ + state->seg.ws_file = -1; +} + +/* + * Mostly copy of vanilla WALRead, but 1) returns error if requested data before + * available_lsn 2) returns error is segment is missing instead of throwing + * ERROR. + * + * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL + * fetched from timeline 'tli'. + * + * Returns true if succeeded, false if an error occurs, in which case + * 'state->errno' shows whether it was missing WAL (ENOENT) or something else, + * and 'err' the desciption. + */ +bool +NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli) +{ + char *p; + XLogRecPtr recptr; + Size nbytes; + + if (startptr < state->available_lsn) + { + state->wre_errno = 0; + snprintf(state->err_msg, sizeof(state->err_msg), "failed to read WAL at %X/%X which is earlier than available %X/%X", + LSN_FORMAT_ARGS(startptr), LSN_FORMAT_ARGS(state->available_lsn)); + return false; + } + + p = buf; + recptr = startptr; + nbytes = count; + + while (nbytes > 0) + { + uint32 startoff; + int segbytes; + int readbytes; + + startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize); + + /* + * If the data we want is not in a segment we have open, close what we + * have (if anything) and open the next one, using the caller's + * provided openSegment callback. + */ + if (state->seg.ws_file < 0 || + !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) || + tli != state->seg.ws_tli) + { + XLogSegNo nextSegNo; + + if (state->seg.ws_file >= 0) + neon_wal_segment_close(state); + + XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); + if (!neon_wal_segment_open(state, nextSegNo, &tli)) + { + char fname[MAXFNAMELEN]; + + state->wre_errno = errno; + + XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize); + snprintf(state->err_msg, sizeof(state->err_msg), "failed to open WAL segment %s while reading at %X/%X: %s", + fname, LSN_FORMAT_ARGS(recptr), strerror(state->wre_errno)); + return false; + } + + /* This shouldn't happen -- indicates a bug in segment_open */ + Assert(state->seg.ws_file >= 0); + + /* Update the current segment info. */ + state->seg.ws_tli = tli; + state->seg.ws_segno = nextSegNo; + } + + /* How many bytes are within this segment? */ + if (nbytes > (state->segcxt.ws_segsize - startoff)) + segbytes = state->segcxt.ws_segsize - startoff; + else + segbytes = nbytes; + +#ifndef FRONTEND + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); +#endif + + /* Reset errno first; eases reporting non-errno-affecting errors */ + errno = 0; + readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff); + +#ifndef FRONTEND + pgstat_report_wait_end(); +#endif + + if (readbytes <= 0) + { + char fname[MAXFNAMELEN]; + + XLogFileName(fname, state->seg.ws_tli, state->seg.ws_segno, state->segcxt.ws_segsize); + + if (readbytes < 0) + { + state->wre_errno = errno; + snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: %s", + fname, startoff, strerror(state->wre_errno)); + } + else + { + snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: unexpected EOF", + fname, startoff); + } + return false; + } + + /* Update state for read */ + recptr += readbytes; + nbytes -= readbytes; + p += readbytes; + } + + return true; + +} + +char * +NeonWALReaderErrMsg(NeonWALReader *state) +{ + return state->err_msg; +} diff --git a/pgxn/neon/neon_walreader.h b/pgxn/neon/neon_walreader.h new file mode 100644 index 0000000000..dfb50622a7 --- /dev/null +++ b/pgxn/neon/neon_walreader.h @@ -0,0 +1,13 @@ +#ifndef __NEON_WALREADER_H__ +#define __NEON_WALREADER_H__ + +#include "access/xlogdefs.h" + +/* forward declare so we don't have to expose the struct to the public */ +struct NeonWALReader; +typedef struct NeonWALReader NeonWALReader; + +extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn); +extern void NeonWALReaderFree(NeonWALReader *state); +extern bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli); +extern char *NeonWALReaderErrMsg(NeonWALReader *state); diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 7d9dbfdb7f..69c79c1adf 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -125,7 +125,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) } initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf); - wp->api.wal_reader_allocate(&wp->safekeeper[wp->n_safekeepers]); + wp->safekeeper[wp->n_safekeepers].xlogreader = NULL; wp->safekeeper[wp->n_safekeepers].flushWrite = false; wp->safekeeper[wp->n_safekeepers].startStreamingAt = InvalidXLogRecPtr; wp->safekeeper[wp->n_safekeepers].streamingAt = InvalidXLogRecPtr; @@ -355,6 +355,12 @@ ShutdownConnection(Safekeeper *sk) sk->voteResponse.termHistory.entries = NULL; HackyRemoveWalProposerEvent(sk); + + if (sk->xlogreader) + { + NeonWALReaderFree(sk->xlogreader); + sk->xlogreader = NULL; + } } /* @@ -1104,6 +1110,10 @@ SendProposerElected(Safekeeper *sk) term_t lastCommonTerm; int i; + /* Now that we are ready to send it's a good moment to create WAL reader */ + Assert(!sk->xlogreader); + wp->api.wal_reader_allocate(sk); + /* * Determine start LSN by comparing safekeeper's log term switch history * and proposer's, searching for the divergence point. @@ -1369,10 +1379,17 @@ SendAppendRequests(Safekeeper *sk) /* write the WAL itself */ enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn); /* wal_read will raise error on failure */ - wp->api.wal_read(sk, - &sk->outbuf.data[sk->outbuf.len], - req->beginLsn, - req->endLsn - req->beginLsn); + if (!wp->api.wal_read(sk, + &sk->outbuf.data[sk->outbuf.len], + req->beginLsn, + req->endLsn - req->beginLsn)) + { + walprop_log(WARNING, "WAL reading for node %s:%s failed: %s", + sk->host, sk->port, + NeonWALReaderErrMsg(sk->xlogreader)); + ShutdownConnection(sk); + return false; + } sk->outbuf.len += req->endLsn - req->beginLsn; writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len); diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index ae7812e710..6fdbabb905 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -10,6 +10,8 @@ #include "utils/uuid.h" #include "replication/walreceiver.h" +#include "neon_walreader.h" + #define SK_MAGIC 0xCafeCeefu #define SK_PROTOCOL_VERSION 2 @@ -369,7 +371,7 @@ typedef struct Safekeeper /* * WAL reader, allocated for each safekeeper. */ - XLogReaderState *xlogreader; + NeonWALReader *xlogreader; /* * Position in wait event set. Equal to -1 if no event @@ -509,7 +511,7 @@ typedef struct walproposer_api bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos); /* Read WAL from disk to buf. */ - void (*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count); + bool (*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count); /* Allocate WAL reader. */ void (*wal_reader_allocate) (Safekeeper *sk); diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 8fed127459..3bdc171690 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -45,6 +45,7 @@ #include "neon.h" #include "walproposer.h" +#include "neon_walreader.h" #include "libpq-fe.h" #define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */ @@ -1386,26 +1387,20 @@ XLogWalPropClose(XLogRecPtr recptr) walpropFile = -1; } -static void +static bool walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count) { - WALReadError errinfo; - - if (!WALRead(sk->xlogreader, - buf, - startptr, - count, - walprop_pg_get_timeline_id(), - &errinfo)) - { - WALReadRaiseError(&errinfo); - } + return NeonWALRead(sk->xlogreader, + buf, + startptr, + count, + walprop_pg_get_timeline_id()); } static void walprop_pg_wal_reader_allocate(Safekeeper *sk) { - sk->xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL); + sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn); if (sk->xlogreader == NULL) elog(FATAL, "Failed to allocate xlog reader"); }