Pass PG's current TimeLineID around to the right places

This allows pg_promote() to change PG's local timeline ID to a new one.
This commit is contained in:
Matthias van de Meent
2025-05-28 14:34:40 +02:00
parent d33fbeee06
commit efb58f0743
6 changed files with 33 additions and 10 deletions

View File

@@ -69,6 +69,7 @@ struct NeonWALReader
WALSegmentContext segcxt;
WALOpenSegment seg;
int wre_errno;
TimeLineID local_active_tlid;
/* Explains failure to read, static for simplicity. */
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
@@ -106,7 +107,8 @@ struct NeonWALReader
/* palloc and initialize NeonWALReader */
NeonWALReader *
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix)
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn,
char *log_prefix, TimeLineID tlid)
{
NeonWALReader *reader;
@@ -118,6 +120,7 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_
MemoryContextAllocZero(TopMemoryContext, sizeof(NeonWALReader));
reader->available_lsn = available_lsn;
reader->local_active_tlid = tlid;
reader->seg.ws_file = -1;
reader->seg.ws_segno = 0;
reader->seg.ws_tli = 0;
@@ -577,6 +580,17 @@ NeonWALReaderIsRemConnEstablished(NeonWALReader *state)
return state->rem_state == RS_ESTABLISHED;
}
/*
* Whether remote connection is established. Once this is done, until successful
* local read or error socket is stable and user can update socket events
* instead of readding it each time.
*/
TimeLineID
NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state)
{
return state->local_active_tlid;
}
/*
* Returns events user should wait on connection socket or 0 if remote
* connection is not active.

View File

@@ -19,9 +19,12 @@ typedef enum
NEON_WALREAD_ERROR,
} NeonWALReadResult;
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix);
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size,
XLogRecPtr available_lsn,
char *log_prefix, TimeLineID tlid);
extern void NeonWALReaderFree(NeonWALReader *state);
extern void NeonWALReaderResetRemote(NeonWALReader *state);
extern TimeLineID NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state);
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
extern uint32 NeonWALReaderEvents(NeonWALReader *state);

View File

@@ -98,6 +98,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp = palloc0(sizeof(WalProposer));
wp->config = config;
wp->api = api;
wp->localTimeLineID = config->pgTimeline;
wp->state = WPS_COLLECTING_TERMS;
wp->mconf.generation = INVALID_GENERATION;
wp->mconf.members.len = 0;

View File

@@ -810,6 +810,8 @@ typedef struct WalProposer
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;
/* Current local TimeLineId in use */
TimeLineID localTimeLineID;
/* cached GetAcknowledgedByQuorumWALPosition result */
XLogRecPtr commitLsn;

View File

@@ -172,6 +172,7 @@ WalProposerMain(Datum main_arg)
walprop_pg_load_libpqwalreceiver();
wp = WalProposerCreate(&walprop_config, walprop_pg);
wp->localTimeLineID = GetWALInsertionTimeLine();
wp->last_reconnect_attempt = walprop_pg_get_current_timestamp(wp);
walprop_pg_init_walsender();
@@ -1499,7 +1500,10 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)
snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
Assert(!sk->xlogreader);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propTermStartLsn, log_prefix);
/* note that WalProposer shouldn't access safekeepers when active */
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size,
sk->wp->propTermStartLsn, log_prefix,
sk->wp->localTimeLineID);
if (sk->xlogreader == NULL)
wpg_log(FATAL, "failed to allocate xlog reader");
}
@@ -1513,7 +1517,7 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count,
buf,
startptr,
count,
walprop_pg_get_timeline_id());
sk->wp->localTimeLineID);
if (res == NEON_WALREAD_SUCCESS)
{

View File

@@ -68,8 +68,7 @@ NeonWALReadWaitForWAL(XLogRecPtr loc)
}
static int
NeonWALPageRead(
XLogReaderState *xlogreader,
NeonWALPageRead(XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen,
XLogRecPtr targetRecPtr,
@@ -106,12 +105,11 @@ NeonWALPageRead(
for (;;)
{
NeonWALReadResult res = NeonWALRead(
wal_reader,
NeonWALReadResult res = NeonWALRead(wal_reader,
readBuf,
targetPagePtr,
count,
walprop_pg_get_timeline_id());
NeonWALReaderLocalActiveTimeLineID(wal_reader));
if (res == NEON_WALREAD_SUCCESS)
{
@@ -202,7 +200,8 @@ NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr)
{
elog(ERROR, "unable to start walsender when basebackupLsn is 0");
}
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn, "[walsender] ");
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn,
"[walsender] ", 1);
}
xlr->page_read = NeonWALPageRead;
xlr->segment_open = NeonWALReadSegmentOpen;