diff --git a/pgxn/neon/walproposer_utils.c b/pgxn/neon/walproposer_utils.c index e1dcaa081d..9e1fc11756 100644 --- a/pgxn/neon/walproposer_utils.c +++ b/pgxn/neon/walproposer_utils.c @@ -37,68 +37,14 @@ static XLogSegNo walpropSegNo = 0; /* START cloned file-local variables and functions from walsender.c */ -/* - * xlogreader used for replication. Note that a WAL sender doing physical - * replication does not need xlogreader to read WAL, but it needs one to - * keep a state of its work. - */ -static XLogReaderState *xlogreader = NULL; - -/* - * These variables keep track of the state of the timeline we're currently - * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric, - * the timeline is not the latest timeline on this server, and the server's - * history forked off from that timeline at sendTimeLineValidUpto. - */ -static TimeLineID sendTimeLine = 0; -static TimeLineID sendTimeLineNextTLI = 0; -static bool sendTimeLineIsHistoric = false; -static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; - -/* - * Timestamp of last ProcessRepliesIfAny() that saw a reply from the - * standby. Set to 0 if wal_sender_timeout doesn't need to be active. - */ -static TimestampTz last_reply_timestamp = 0; - -/* Have we sent a heartbeat message asking for reply, since last reply? */ -static bool waiting_for_ping_response = false; - -static bool streamingDoneSending; -static bool streamingDoneReceiving; - -/* Are we there yet? */ -static bool WalSndCaughtUp = false; - -/* Flags set by signal handlers for later service in main loop */ -static volatile sig_atomic_t got_STOPPING = false; - /* * How far have we sent WAL already? This is also advertised in * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) */ static XLogRecPtr sentPtr = InvalidXLogRecPtr; -/* - * This is set while we are streaming. When not set - * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set, - * the main loop is responsible for checking got_STOPPING and terminating when - * it's set (after streaming any remaining WAL). - */ -static volatile sig_atomic_t replication_active = false; - -typedef void (*WalSndSendDataCallback) (void); -static void WalSndLoop(WalSndSendDataCallback send_data); -static void XLogSendPhysical(void); -#if PG_VERSION_NUM >= 150000 -static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli); -#else -static XLogRecPtr GetStandbyFlushRecPtr(void); -#endif - -static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, - TimeLineID *tli_p); - +static void WalSndLoop(void); +static void XLogBroadcastWalProposer(void); /* END cloned file-level variables and functions from walsender.c */ int @@ -506,7 +452,7 @@ XLogWalPropClose(XLogRecPtr recptr) /* START of cloned functions from walsender.c */ /* - * Handle START_REPLICATION command. + * Subscribe for new WAL and stream it in the loop to safekeepers. * * At the moment, this never returns, but an ereport(ERROR) will take us back * to the main loop. @@ -524,18 +470,6 @@ StartProposerReplication(StartReplicationCmd *cmd) errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION"))); #endif - /* create xlogreader for physical replication */ - xlogreader = - XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(.segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close), - NULL); - - if (!xlogreader) - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - /* * We assume here that we're logging enough information in the WAL for * log-shipping, since this is checked in PostmasterMain(). @@ -569,341 +503,61 @@ StartProposerReplication(StartReplicationCmd *cmd) * we keep this code around to lighten the load for when we need it. */ #if PG_VERSION_NUM >= 150000 - if (am_cascading_walsender) - { - /* this also updates ThisTimeLineID */ - FlushPtr = GetStandbyFlushRecPtr(&currTLI); - } - else - FlushPtr = GetFlushRecPtr(&currTLI); + FlushPtr = GetFlushRecPtr(&currTLI); #else - if (am_cascading_walsender) - { - /* this also updates ThisTimeLineID */ - FlushPtr = GetStandbyFlushRecPtr(); - } - else - FlushPtr = GetFlushRecPtr(); - + FlushPtr = GetFlushRecPtr(); currTLI = ThisTimeLineID; #endif + /* + * When we first start replication the standby will be behind the + * primary. For some applications, for example synchronous + * replication, it is important to have a clear state for this initial + * catchup mode, so we can trigger actions when we change streaming + * state later. We may stay in this state for a long time, which is + * exactly why we want to be able to monitor whether or not we are + * still here. + */ + WalSndSetState(WALSNDSTATE_CATCHUP); - if (cmd->timeline != 0) + /* + * Don't allow a request to stream from a future point in WAL that + * hasn't been flushed to disk in this server yet. + */ + if (FlushPtr < cmd->startpoint) { - XLogRecPtr switchpoint; - - sendTimeLine = cmd->timeline; - if (sendTimeLine == currTLI) - { - sendTimeLineIsHistoric = false; - sendTimeLineValidUpto = InvalidXLogRecPtr; - } - else - { - List *timeLineHistory; - - sendTimeLineIsHistoric = true; - - /* - * Check that the timeline the client requested exists, and the - * requested start location is on that timeline. - */ - timeLineHistory = readTimeLineHistory(currTLI); - switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, - &sendTimeLineNextTLI); - list_free_deep(timeLineHistory); - - /* - * Found the requested timeline in the history. Check that - * requested startpoint is on that timeline in our history. - * - * This is quite loose on purpose. We only check that we didn't - * fork off the requested timeline before the switchpoint. We - * don't check that we switched *to* it before the requested - * starting point. This is because the client can legitimately - * request to start replication from the beginning of the WAL - * segment that contains switchpoint, but on the new timeline, so - * that it doesn't end up with a partial segment. If you ask for - * too old a starting point, you'll get an error later when we - * fail to find the requested WAL segment in pg_wal. - * - * XXX: we could be more strict here and only allow a startpoint - * that's older than the switchpoint, if it's still in the same - * WAL segment. - */ - if (!XLogRecPtrIsInvalid(switchpoint) && - switchpoint < cmd->startpoint) - { - ereport(ERROR, - (errmsg("requested starting point %X/%X on timeline %u is not in this server's history", - LSN_FORMAT_ARGS(cmd->startpoint), - cmd->timeline), - errdetail("This server's history forked from timeline %u at %X/%X.", - cmd->timeline, - LSN_FORMAT_ARGS(switchpoint)))); - } - sendTimeLineValidUpto = switchpoint; - } - } - else - { - sendTimeLine = currTLI; - sendTimeLineValidUpto = InvalidXLogRecPtr; - sendTimeLineIsHistoric = false; + ereport(ERROR, + (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X", + LSN_FORMAT_ARGS(cmd->startpoint), + LSN_FORMAT_ARGS(FlushPtr)))); } - streamingDoneSending = streamingDoneReceiving = false; + /* Start streaming from the requested point */ + sentPtr = cmd->startpoint; - /* If there is nothing to stream, don't even enter COPY mode */ - if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto) - { - /* - * When we first start replication the standby will be behind the - * primary. For some applications, for example synchronous - * replication, it is important to have a clear state for this initial - * catchup mode, so we can trigger actions when we change streaming - * state later. We may stay in this state for a long time, which is - * exactly why we want to be able to monitor whether or not we are - * still here. - */ - WalSndSetState(WALSNDSTATE_CATCHUP); + /* Initialize shared memory status, too */ + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->sentPtr = sentPtr; + SpinLockRelease(&MyWalSnd->mutex); - /* - * Don't allow a request to stream from a future point in WAL that - * hasn't been flushed to disk in this server yet. - */ - if (FlushPtr < cmd->startpoint) - { - ereport(ERROR, - (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X", - LSN_FORMAT_ARGS(cmd->startpoint), - LSN_FORMAT_ARGS(FlushPtr)))); - } + SyncRepInitConfig(); - /* Start streaming from the requested point */ - sentPtr = cmd->startpoint; + /* Infinite send loop, never returns */ + WalSndLoop(); - /* Initialize shared memory status, too */ - SpinLockAcquire(&MyWalSnd->mutex); - MyWalSnd->sentPtr = sentPtr; - SpinLockRelease(&MyWalSnd->mutex); - - SyncRepInitConfig(); - - /* Main loop of walsender */ - replication_active = true; - - WalSndLoop(XLogSendPhysical); - - replication_active = false; - if (got_STOPPING) - proc_exit(0); - WalSndSetState(WALSNDSTATE_STARTUP); - - Assert(streamingDoneSending && streamingDoneReceiving); - } + WalSndSetState(WALSNDSTATE_STARTUP); if (cmd->slotname) ReplicationSlotRelease(); - - /* - * Copy is finished now. Send a single-row result set indicating the next - * timeline. - */ - if (sendTimeLineIsHistoric) - { - char startpos_str[8 + 1 + 8 + 1]; - DestReceiver *dest; - TupOutputState *tstate; - TupleDesc tupdesc; - Datum values[2]; - bool nulls[2]; - - snprintf(startpos_str, sizeof(startpos_str), "%X/%X", - LSN_FORMAT_ARGS(sendTimeLineValidUpto)); - - dest = CreateDestReceiver(DestRemoteSimple); - MemSet(nulls, false, sizeof(nulls)); - - /* - * Need a tuple descriptor representing two columns. int8 may seem - * like a surprising data type for this, but in theory int4 would not - * be wide enough for this, as TimeLineID is unsigned. - */ - tupdesc = CreateTemplateTupleDesc(2); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli", - INT8OID, -1, 0); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos", - TEXTOID, -1, 0); - - /* prepare for projection of tuple */ - tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); - - values[0] = Int64GetDatum((int64) sendTimeLineNextTLI); - values[1] = CStringGetTextDatum(startpos_str); - - /* send it to dest */ - do_tup_output(tstate, values, nulls); - - end_tup_output(tstate); - } - - /* Send CommandComplete message */ - EndReplicationCommand("START_STREAMING"); } -#if PG_VERSION_NUM >= 150000 -static XLogRecPtr -GetStandbyFlushRecPtr(TimeLineID *tli) -{ - XLogRecPtr replayPtr; - TimeLineID replayTLI; - XLogRecPtr receivePtr; - TimeLineID receiveTLI; - XLogRecPtr result; - - /* - * We can safely send what's already been replayed. Also, if walreceiver - * is streaming WAL from the same timeline, we can send anything that it - * has streamed, but hasn't been replayed yet. - */ - - receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); - replayPtr = GetXLogReplayRecPtr(&replayTLI); - - *tli = replayTLI; - - result = replayPtr; - if (receiveTLI == replayTLI && receivePtr > replayPtr) - result = receivePtr; - - return result; -} -#else /* - * Returns the latest point in WAL that has been safely flushed to disk, and - * can be sent to the standby. This should only be called when in recovery, - * ie. we're streaming to a cascaded standby. - * - * As a side-effect, ThisTimeLineID is updated to the TLI of the last - * replayed WAL record. + * Main loop that waits for LSN updates and calls the walproposer. + * Synchronous replication sets latch in WalSndWakeup at walsender.c */ -static XLogRecPtr -GetStandbyFlushRecPtr(void) -{ - XLogRecPtr replayPtr; - TimeLineID replayTLI; - XLogRecPtr receivePtr; - TimeLineID receiveTLI; - XLogRecPtr result; - - /* - * We can safely send what's already been replayed. Also, if walreceiver - * is streaming WAL from the same timeline, we can send anything that it - * has streamed, but hasn't been replayed yet. - */ - - receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); - replayPtr = GetXLogReplayRecPtr(&replayTLI); - - ThisTimeLineID = replayTLI; - - result = replayPtr; - if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr) - result = receivePtr; - - return result; -} -#endif - - - -/* XLogReaderRoutine->segment_open callback */ static void -WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, - TimeLineID *tli_p) +WalSndLoop(void) { - char path[MAXPGPATH]; - - /*------- - * When reading from a historic timeline, and there is a timeline switch - * within this segment, read from the WAL segment belonging to the new - * timeline. - * - * For example, imagine that this server is currently on timeline 5, and - * we're streaming timeline 4. The switch from timeline 4 to 5 happened at - * 0/13002088. In pg_wal, we have these files: - * - * ... - * 000000040000000000000012 - * 000000040000000000000013 - * 000000050000000000000013 - * 000000050000000000000014 - * ... - * - * In this situation, when requested to send the WAL from segment 0x13, on - * timeline 4, we read the WAL from file 000000050000000000000013. Archive - * recovery prefers files from newer timelines, so if the segment was - * restored from the archive on this server, the file belonging to the old - * timeline, 000000040000000000000013, might not exist. Their contents are - * equal up to the switchpoint, because at a timeline switch, the used - * portion of the old segment is copied to the new file. ------- - */ - *tli_p = sendTimeLine; - if (sendTimeLineIsHistoric) - { - XLogSegNo endSegNo; - - XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize); - if (nextSegNo == endSegNo) - *tli_p = sendTimeLineNextTLI; - } - - XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize); - state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (state->seg.ws_file >= 0) - return; - - /* - * If the file is not found, assume it's because the standby asked for a - * too old WAL segment that has already been removed or recycled. - */ - if (errno == ENOENT) - { - char xlogfname[MAXFNAMELEN]; - int save_errno = errno; - - XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size); - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - xlogfname))); - } - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); -} - - -/* Main loop of walsender process that streams the WAL over Copy messages. */ -static void -WalSndLoop(WalSndSendDataCallback send_data) -{ - /* - * Initialize the last reply timestamp. That enables timeout processing - * from hereon. - */ - last_reply_timestamp = GetCurrentTimestamp(); - waiting_for_ping_response = false; - - /* - * Loop until we reach the end of this timeline or the client requests to - * stop streaming. - */ for (;;) { /* Clear any already-pending wakeups */ @@ -911,153 +565,41 @@ WalSndLoop(WalSndSendDataCallback send_data) CHECK_FOR_INTERRUPTS(); - /* Process any requests or signals received recently */ - if (ConfigReloadPending) - { - ConfigReloadPending = false; - ProcessConfigFile(PGC_SIGHUP); - SyncRepInitConfig(); - } + XLogBroadcastWalProposer(); - /* always true */ - if (am_wal_proposer) - { - send_data(); - if (WalSndCaughtUp) - { - if (MyWalSnd->state == WALSNDSTATE_CATCHUP) - WalSndSetState(WALSNDSTATE_STREAMING); - WalProposerPoll(); - WalSndCaughtUp = false; - } - continue; - } + if (MyWalSnd->state == WALSNDSTATE_CATCHUP) + WalSndSetState(WALSNDSTATE_STREAMING); + WalProposerPoll(); } } /* - * Send out the WAL in its normal physical/stored form. - * - * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, - * but not yet sent to the client, and buffer it in the libpq output - * buffer. - * - * If there is no unsent WAL remaining, WalSndCaughtUp is set to true, - * otherwise WalSndCaughtUp is set to false. + * Notify walproposer about the new WAL position. */ static void -XLogSendPhysical(void) +XLogBroadcastWalProposer(void) { - XLogRecPtr SendRqstPtr; XLogRecPtr startptr; XLogRecPtr endptr; - Size nbytes PG_USED_FOR_ASSERTS_ONLY; - TimeLineID currTLI; - /* If requested switch the WAL sender to the stopping state. */ - if (got_STOPPING) - WalSndSetState(WALSNDSTATE_STOPPING); + /* Start from the last sent position */ + startptr = sentPtr; - if (streamingDoneSending) - { - WalSndCaughtUp = true; - return; - } - - /* Figure out how far we can safely send the WAL. */ - if (sendTimeLineIsHistoric) - { - /* - * Streaming an old timeline that's in this server's history, but is - * not the one we're currently inserting or replaying. It can be - * streamed up to the point where we switched off that timeline. - */ - SendRqstPtr = sendTimeLineValidUpto; - } - else if (am_cascading_walsender) - { - /* - * Streaming the latest timeline on a standby. - * - * Attempt to send all WAL that has already been replayed, so that we - * know it's valid. If we're receiving WAL through streaming - * replication, it's also OK to send any WAL that has been received - * but not replayed. - * - * The timeline we're recovering from can change, or we can be - * promoted. In either case, the current timeline becomes historic. We - * need to detect that so that we don't try to stream past the point - * where we switched to another timeline. We check for promotion or - * timeline switch after calculating FlushPtr, to avoid a race - * condition: if the timeline becomes historic just after we checked - * that it was still current, it's still be OK to stream it up to the - * FlushPtr that was calculated before it became historic. - */ - bool becameHistoric = false; + /* + * Streaming the current timeline on a primary. + * + * Attempt to send all data that's already been written out and + * fsync'd to disk. We cannot go further than what's been written out + * given the current implementation of WALRead(). And in any case + * it's unsafe to send WAL that is not securely down to disk on the + * primary: if the primary subsequently crashes and restarts, standbys + * must not have applied any WAL that got lost on the primary. + */ #if PG_VERSION_NUM >= 150000 - SendRqstPtr = GetStandbyFlushRecPtr(&currTLI); + endptr = GetFlushRecPtr(NULL); #else - SendRqstPtr = GetStandbyFlushRecPtr(); - currTLI = ThisTimeLineID; + endptr = GetFlushRecPtr(); #endif - if (!RecoveryInProgress()) - { - /* - * We have been promoted. RecoveryInProgress() updated - * ThisTimeLineID to the new current timeline. - */ - am_cascading_walsender = false; - becameHistoric = true; - } - else - { - /* - * Still a cascading standby. But is the timeline we're sending - * still the one recovery is recovering from? currTLI was updated - * by the GetStandbyFlushRecPtr() call above. - */ - if (sendTimeLine != currTLI) - becameHistoric = true; - } - - if (becameHistoric) - { - /* - * The timeline we were sending has become historic. Read the - * timeline history file of the new timeline to see where exactly - * we forked off from the timeline we were sending. - */ - List *history; - - history = readTimeLineHistory(currTLI); - sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); - - Assert(sendTimeLine < sendTimeLineNextTLI); - list_free_deep(history); - - sendTimeLineIsHistoric = true; - - SendRqstPtr = sendTimeLineValidUpto; - } - } - else - { - /* - * Streaming the current timeline on a primary. - * - * Attempt to send all data that's already been written out and - * fsync'd to disk. We cannot go further than what's been written out - * given the current implementation of WALRead(). And in any case - * it's unsafe to send WAL that is not securely down to disk on the - * primary: if the primary subsequently crashes and restarts, standbys - * must not have applied any WAL that got lost on the primary. - */ -#if PG_VERSION_NUM >= 150000 - SendRqstPtr = GetFlushRecPtr(NULL); -#else - SendRqstPtr = GetFlushRecPtr(); -#endif - } /* * Record the current system time as an approximation of the time at which @@ -1083,91 +625,14 @@ XLogSendPhysical(void) * that arbitrary LSN is eventually reported as written, flushed and * applied, so that it can measure the elapsed time. */ - LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp()); - - /* - * If this is a historic timeline and we've reached the point where we - * forked to the next timeline, stop streaming. - * - * Note: We might already have sent WAL > sendTimeLineValidUpto. The - * startup process will normally replay all WAL that has been received - * from the primary, before promoting, but if the WAL streaming is - * terminated at a WAL page boundary, the valid portion of the timeline - * might end in the middle of a WAL record. We might've already sent the - * first half of that partial WAL record to the cascading standby, so that - * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't - * replay the partial WAL record either, so it can still follow our - * timeline switch. - */ - if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) - { - /* close the current file. */ - if (xlogreader->seg.ws_file >= 0) - wal_segment_close(xlogreader); - - /* Send CopyDone */ - pq_putmessage_noblock('c', NULL, 0); - streamingDoneSending = true; - - WalSndCaughtUp = true; - - elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", - LSN_FORMAT_ARGS(sendTimeLineValidUpto), - LSN_FORMAT_ARGS(sentPtr)); - return; - } + LagTrackerWrite(endptr, GetCurrentTimestamp()); /* Do we have any work to do? */ - Assert(sentPtr <= SendRqstPtr); - if (SendRqstPtr <= sentPtr) - { - WalSndCaughtUp = true; + Assert(startptr <= endptr); + if (endptr <= startptr) return; - } - /* - * Figure out how much to send in one message. If there's no more than - * MAX_SEND_SIZE bytes to send, send everything. Otherwise send - * MAX_SEND_SIZE bytes, but round back to logfile or page boundary. - * - * The rounding is not only for performance reasons. Walreceiver relies on - * the fact that we never split a WAL record across two messages. Since a - * long WAL record is split at page boundary into continuation records, - * page boundary is always a safe cut-off point. We also assume that - * SendRqstPtr never points to the middle of a WAL record. - */ - startptr = sentPtr; - endptr = startptr; - endptr += MAX_SEND_SIZE; - - /* if we went beyond SendRqstPtr, back off */ - if (SendRqstPtr <= endptr) - { - endptr = SendRqstPtr; - if (sendTimeLineIsHistoric) - WalSndCaughtUp = false; - else - WalSndCaughtUp = true; - } - else - { - /* round down to page boundary. */ - endptr -= (endptr % XLOG_BLCKSZ); - WalSndCaughtUp = false; - } - - nbytes = endptr - startptr; - Assert(nbytes <= MAX_SEND_SIZE); - - /* always true */ - if (am_wal_proposer) - { - WalProposerBroadcast(startptr, endptr); - } - else - { - /* code removed for brevity */ - } + WalProposerBroadcast(startptr, endptr); sentPtr = endptr; /* Update shared memory status */