diff --git a/pgxn/neon/neon_walreader.c b/pgxn/neon/neon_walreader.c index d5e3a38dbb..b4ffbe0b18 100644 --- a/pgxn/neon/neon_walreader.c +++ b/pgxn/neon/neon_walreader.c @@ -69,6 +69,7 @@ struct NeonWALReader WALSegmentContext segcxt; WALOpenSegment seg; int wre_errno; + TimeLineID local_active_tlid; /* Explains failure to read, static for simplicity. */ char err_msg[NEON_WALREADER_ERR_MSG_LEN]; @@ -106,7 +107,8 @@ struct NeonWALReader /* palloc and initialize NeonWALReader */ NeonWALReader * -NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix) +NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, + char *log_prefix, TimeLineID tlid) { NeonWALReader *reader; @@ -118,6 +120,7 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_ MemoryContextAllocZero(TopMemoryContext, sizeof(NeonWALReader)); reader->available_lsn = available_lsn; + reader->local_active_tlid = tlid; reader->seg.ws_file = -1; reader->seg.ws_segno = 0; reader->seg.ws_tli = 0; @@ -577,6 +580,17 @@ NeonWALReaderIsRemConnEstablished(NeonWALReader *state) return state->rem_state == RS_ESTABLISHED; } +/* + * Whether remote connection is established. Once this is done, until successful + * local read or error socket is stable and user can update socket events + * instead of readding it each time. + */ +TimeLineID +NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state) +{ + return state->local_active_tlid; +} + /* * Returns events user should wait on connection socket or 0 if remote * connection is not active. diff --git a/pgxn/neon/neon_walreader.h b/pgxn/neon/neon_walreader.h index 3e41825069..b89018fde2 100644 --- a/pgxn/neon/neon_walreader.h +++ b/pgxn/neon/neon_walreader.h @@ -19,9 +19,12 @@ typedef enum NEON_WALREAD_ERROR, } NeonWALReadResult; -extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix); +extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, + XLogRecPtr available_lsn, + char *log_prefix, TimeLineID tlid); extern void NeonWALReaderFree(NeonWALReader *state); extern void NeonWALReaderResetRemote(NeonWALReader *state); +extern TimeLineID NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state); extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli); extern pgsocket NeonWALReaderSocket(NeonWALReader *state); extern uint32 NeonWALReaderEvents(NeonWALReader *state); diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index db63b6352f..c345ce667d 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -98,6 +98,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) wp = palloc0(sizeof(WalProposer)); wp->config = config; wp->api = api; + wp->localTimeLineID = config->pgTimeline; wp->state = WPS_COLLECTING_TERMS; wp->mconf.generation = INVALID_GENERATION; wp->mconf.members.len = 0; diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 69ff5c409f..727cf276c1 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -810,6 +810,8 @@ typedef struct WalProposer /* WAL has been generated up to this point */ XLogRecPtr availableLsn; + /* Current local TimeLineId in use */ + TimeLineID localTimeLineID; /* cached GetAcknowledgedByQuorumWALPosition result */ XLogRecPtr commitLsn; diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 265a5561ad..af5ee6de46 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -172,6 +172,7 @@ WalProposerMain(Datum main_arg) walprop_pg_load_libpqwalreceiver(); wp = WalProposerCreate(&walprop_config, walprop_pg); + wp->localTimeLineID = GetWALInsertionTimeLine(); wp->last_reconnect_attempt = walprop_pg_get_current_timestamp(wp); walprop_pg_init_walsender(); @@ -1499,7 +1500,10 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk) snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port); Assert(!sk->xlogreader); - sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propTermStartLsn, log_prefix); + /* note that WalProposer shouldn't access safekeepers when active */ + sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, + sk->wp->propTermStartLsn, log_prefix, + sk->wp->localTimeLineID); if (sk->xlogreader == NULL) wpg_log(FATAL, "failed to allocate xlog reader"); } @@ -1513,7 +1517,7 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count, buf, startptr, count, - walprop_pg_get_timeline_id()); + sk->wp->localTimeLineID); if (res == NEON_WALREAD_SUCCESS) { diff --git a/pgxn/neon/walsender_hooks.c b/pgxn/neon/walsender_hooks.c index 81198d6c8d..704848aa45 100644 --- a/pgxn/neon/walsender_hooks.c +++ b/pgxn/neon/walsender_hooks.c @@ -68,8 +68,7 @@ NeonWALReadWaitForWAL(XLogRecPtr loc) } static int -NeonWALPageRead( - XLogReaderState *xlogreader, +NeonWALPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, @@ -106,12 +105,11 @@ NeonWALPageRead( for (;;) { - NeonWALReadResult res = NeonWALRead( - wal_reader, + NeonWALReadResult res = NeonWALRead(wal_reader, readBuf, targetPagePtr, count, - walprop_pg_get_timeline_id()); + NeonWALReaderLocalActiveTimeLineID(wal_reader)); if (res == NEON_WALREAD_SUCCESS) { @@ -202,7 +200,8 @@ NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr) { elog(ERROR, "unable to start walsender when basebackupLsn is 0"); } - wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn, "[walsender] "); + wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn, + "[walsender] ", 1); } xlr->page_read = NeonWALPageRead; xlr->segment_open = NeonWALReadSegmentOpen;