Remove dead code from walproposer_utils.c (#4525)

This code was mostly copied from walsender.c and the idea was to keep it
similar to walsender.c, so that we can easily copy-paste future upstream
changes to walsender.c to waproposer_utils.c, too. But right now I see that
deleting it doesn't break anything, so it's better to remove unused parts.
This commit is contained in:
Arthur Petukhovsky
2023-08-14 10:49:51 +02:00
committed by GitHub
parent 8173813584
commit 763f5c0641

View File

@@ -37,68 +37,14 @@ static XLogSegNo walpropSegNo = 0;
/* START cloned file-local variables and functions from walsender.c */
/*
* xlogreader used for replication. Note that a WAL sender doing physical
* replication does not need xlogreader to read WAL, but it needs one to
* keep a state of its work.
*/
static XLogReaderState *xlogreader = NULL;
/*
* These variables keep track of the state of the timeline we're currently
* sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
* the timeline is not the latest timeline on this server, and the server's
* history forked off from that timeline at sendTimeLineValidUpto.
*/
static TimeLineID sendTimeLine = 0;
static TimeLineID sendTimeLineNextTLI = 0;
static bool sendTimeLineIsHistoric = false;
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
/*
* Timestamp of last ProcessRepliesIfAny() that saw a reply from the
* standby. Set to 0 if wal_sender_timeout doesn't need to be active.
*/
static TimestampTz last_reply_timestamp = 0;
/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool waiting_for_ping_response = false;
static bool streamingDoneSending;
static bool streamingDoneReceiving;
/* Are we there yet? */
static bool WalSndCaughtUp = false;
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_STOPPING = false;
/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
/*
* This is set while we are streaming. When not set
* PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
* the main loop is responsible for checking got_STOPPING and terminating when
* it's set (after streaming any remaining WAL).
*/
static volatile sig_atomic_t replication_active = false;
typedef void (*WalSndSendDataCallback) (void);
static void WalSndLoop(WalSndSendDataCallback send_data);
static void XLogSendPhysical(void);
#if PG_VERSION_NUM >= 150000
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
#else
static XLogRecPtr GetStandbyFlushRecPtr(void);
#endif
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
TimeLineID *tli_p);
static void WalSndLoop(void);
static void XLogBroadcastWalProposer(void);
/* END cloned file-level variables and functions from walsender.c */
int
@@ -506,7 +452,7 @@ XLogWalPropClose(XLogRecPtr recptr)
/* START of cloned functions from walsender.c */
/*
* Handle START_REPLICATION command.
* Subscribe for new WAL and stream it in the loop to safekeepers.
*
* At the moment, this never returns, but an ereport(ERROR) will take us back
* to the main loop.
@@ -524,18 +470,6 @@ StartProposerReplication(StartReplicationCmd *cmd)
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
#endif
/* create xlogreader for physical replication */
xlogreader =
XLogReaderAllocate(wal_segment_size, NULL,
XL_ROUTINE(.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close),
NULL);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
/*
* We assume here that we're logging enough information in the WAL for
* log-shipping, since this is checked in PostmasterMain().
@@ -569,341 +503,61 @@ StartProposerReplication(StartReplicationCmd *cmd)
* we keep this code around to lighten the load for when we need it.
*/
#if PG_VERSION_NUM >= 150000
if (am_cascading_walsender)
{
/* this also updates ThisTimeLineID */
FlushPtr = GetStandbyFlushRecPtr(&currTLI);
}
else
FlushPtr = GetFlushRecPtr(&currTLI);
FlushPtr = GetFlushRecPtr(&currTLI);
#else
if (am_cascading_walsender)
{
/* this also updates ThisTimeLineID */
FlushPtr = GetStandbyFlushRecPtr();
}
else
FlushPtr = GetFlushRecPtr();
FlushPtr = GetFlushRecPtr();
currTLI = ThisTimeLineID;
#endif
/*
* When we first start replication the standby will be behind the
* primary. For some applications, for example synchronous
* replication, it is important to have a clear state for this initial
* catchup mode, so we can trigger actions when we change streaming
* state later. We may stay in this state for a long time, which is
* exactly why we want to be able to monitor whether or not we are
* still here.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
if (cmd->timeline != 0)
/*
* Don't allow a request to stream from a future point in WAL that
* hasn't been flushed to disk in this server yet.
*/
if (FlushPtr < cmd->startpoint)
{
XLogRecPtr switchpoint;
sendTimeLine = cmd->timeline;
if (sendTimeLine == currTLI)
{
sendTimeLineIsHistoric = false;
sendTimeLineValidUpto = InvalidXLogRecPtr;
}
else
{
List *timeLineHistory;
sendTimeLineIsHistoric = true;
/*
* Check that the timeline the client requested exists, and the
* requested start location is on that timeline.
*/
timeLineHistory = readTimeLineHistory(currTLI);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
&sendTimeLineNextTLI);
list_free_deep(timeLineHistory);
/*
* Found the requested timeline in the history. Check that
* requested startpoint is on that timeline in our history.
*
* This is quite loose on purpose. We only check that we didn't
* fork off the requested timeline before the switchpoint. We
* don't check that we switched *to* it before the requested
* starting point. This is because the client can legitimately
* request to start replication from the beginning of the WAL
* segment that contains switchpoint, but on the new timeline, so
* that it doesn't end up with a partial segment. If you ask for
* too old a starting point, you'll get an error later when we
* fail to find the requested WAL segment in pg_wal.
*
* XXX: we could be more strict here and only allow a startpoint
* that's older than the switchpoint, if it's still in the same
* WAL segment.
*/
if (!XLogRecPtrIsInvalid(switchpoint) &&
switchpoint < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
LSN_FORMAT_ARGS(cmd->startpoint),
cmd->timeline),
errdetail("This server's history forked from timeline %u at %X/%X.",
cmd->timeline,
LSN_FORMAT_ARGS(switchpoint))));
}
sendTimeLineValidUpto = switchpoint;
}
}
else
{
sendTimeLine = currTLI;
sendTimeLineValidUpto = InvalidXLogRecPtr;
sendTimeLineIsHistoric = false;
ereport(ERROR,
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
LSN_FORMAT_ARGS(cmd->startpoint),
LSN_FORMAT_ARGS(FlushPtr))));
}
streamingDoneSending = streamingDoneReceiving = false;
/* Start streaming from the requested point */
sentPtr = cmd->startpoint;
/* If there is nothing to stream, don't even enter COPY mode */
if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
{
/*
* When we first start replication the standby will be behind the
* primary. For some applications, for example synchronous
* replication, it is important to have a clear state for this initial
* catchup mode, so we can trigger actions when we change streaming
* state later. We may stay in this state for a long time, which is
* exactly why we want to be able to monitor whether or not we are
* still here.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
/* Initialize shared memory status, too */
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
/*
* Don't allow a request to stream from a future point in WAL that
* hasn't been flushed to disk in this server yet.
*/
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
LSN_FORMAT_ARGS(cmd->startpoint),
LSN_FORMAT_ARGS(FlushPtr))));
}
SyncRepInitConfig();
/* Start streaming from the requested point */
sentPtr = cmd->startpoint;
/* Infinite send loop, never returns */
WalSndLoop();
/* Initialize shared memory status, too */
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
/* Main loop of walsender */
replication_active = true;
WalSndLoop(XLogSendPhysical);
replication_active = false;
if (got_STOPPING)
proc_exit(0);
WalSndSetState(WALSNDSTATE_STARTUP);
Assert(streamingDoneSending && streamingDoneReceiving);
}
WalSndSetState(WALSNDSTATE_STARTUP);
if (cmd->slotname)
ReplicationSlotRelease();
/*
* Copy is finished now. Send a single-row result set indicating the next
* timeline.
*/
if (sendTimeLineIsHistoric)
{
char startpos_str[8 + 1 + 8 + 1];
DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Datum values[2];
bool nulls[2];
snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
LSN_FORMAT_ARGS(sendTimeLineValidUpto));
dest = CreateDestReceiver(DestRemoteSimple);
MemSet(nulls, false, sizeof(nulls));
/*
* Need a tuple descriptor representing two columns. int8 may seem
* like a surprising data type for this, but in theory int4 would not
* be wide enough for this, as TimeLineID is unsigned.
*/
tupdesc = CreateTemplateTupleDesc(2);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
INT8OID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
TEXTOID, -1, 0);
/* prepare for projection of tuple */
tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
values[1] = CStringGetTextDatum(startpos_str);
/* send it to dest */
do_tup_output(tstate, values, nulls);
end_tup_output(tstate);
}
/* Send CommandComplete message */
EndReplicationCommand("START_STREAMING");
}
#if PG_VERSION_NUM >= 150000
static XLogRecPtr
GetStandbyFlushRecPtr(TimeLineID *tli)
{
XLogRecPtr replayPtr;
TimeLineID replayTLI;
XLogRecPtr receivePtr;
TimeLineID receiveTLI;
XLogRecPtr result;
/*
* We can safely send what's already been replayed. Also, if walreceiver
* is streaming WAL from the same timeline, we can send anything that it
* has streamed, but hasn't been replayed yet.
*/
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
*tli = replayTLI;
result = replayPtr;
if (receiveTLI == replayTLI && receivePtr > replayPtr)
result = receivePtr;
return result;
}
#else
/*
* Returns the latest point in WAL that has been safely flushed to disk, and
* can be sent to the standby. This should only be called when in recovery,
* ie. we're streaming to a cascaded standby.
*
* As a side-effect, ThisTimeLineID is updated to the TLI of the last
* replayed WAL record.
* Main loop that waits for LSN updates and calls the walproposer.
* Synchronous replication sets latch in WalSndWakeup at walsender.c
*/
static XLogRecPtr
GetStandbyFlushRecPtr(void)
{
XLogRecPtr replayPtr;
TimeLineID replayTLI;
XLogRecPtr receivePtr;
TimeLineID receiveTLI;
XLogRecPtr result;
/*
* We can safely send what's already been replayed. Also, if walreceiver
* is streaming WAL from the same timeline, we can send anything that it
* has streamed, but hasn't been replayed yet.
*/
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
ThisTimeLineID = replayTLI;
result = replayPtr;
if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
result = receivePtr;
return result;
}
#endif
/* XLogReaderRoutine->segment_open callback */
static void
WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
TimeLineID *tli_p)
WalSndLoop(void)
{
char path[MAXPGPATH];
/*-------
* When reading from a historic timeline, and there is a timeline switch
* within this segment, read from the WAL segment belonging to the new
* timeline.
*
* For example, imagine that this server is currently on timeline 5, and
* we're streaming timeline 4. The switch from timeline 4 to 5 happened at
* 0/13002088. In pg_wal, we have these files:
*
* ...
* 000000040000000000000012
* 000000040000000000000013
* 000000050000000000000013
* 000000050000000000000014
* ...
*
* In this situation, when requested to send the WAL from segment 0x13, on
* timeline 4, we read the WAL from file 000000050000000000000013. Archive
* recovery prefers files from newer timelines, so if the segment was
* restored from the archive on this server, the file belonging to the old
* timeline, 000000040000000000000013, might not exist. Their contents are
* equal up to the switchpoint, because at a timeline switch, the used
* portion of the old segment is copied to the new file. -------
*/
*tli_p = sendTimeLine;
if (sendTimeLineIsHistoric)
{
XLogSegNo endSegNo;
XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
if (nextSegNo == endSegNo)
*tli_p = sendTimeLineNextTLI;
}
XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
if (state->seg.ws_file >= 0)
return;
/*
* If the file is not found, assume it's because the standby asked for a
* too old WAL segment that has already been removed or recycled.
*/
if (errno == ENOENT)
{
char xlogfname[MAXFNAMELEN];
int save_errno = errno;
XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
errno = save_errno;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
xlogfname)));
}
else
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m",
path)));
}
/* Main loop of walsender process that streams the WAL over Copy messages. */
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
/*
* Initialize the last reply timestamp. That enables timeout processing
* from hereon.
*/
last_reply_timestamp = GetCurrentTimestamp();
waiting_for_ping_response = false;
/*
* Loop until we reach the end of this timeline or the client requests to
* stop streaming.
*/
for (;;)
{
/* Clear any already-pending wakeups */
@@ -911,153 +565,41 @@ WalSndLoop(WalSndSendDataCallback send_data)
CHECK_FOR_INTERRUPTS();
/* Process any requests or signals received recently */
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
SyncRepInitConfig();
}
XLogBroadcastWalProposer();
/* always true */
if (am_wal_proposer)
{
send_data();
if (WalSndCaughtUp)
{
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
WalProposerPoll();
WalSndCaughtUp = false;
}
continue;
}
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
WalProposerPoll();
}
}
/*
* Send out the WAL in its normal physical/stored form.
*
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and buffer it in the libpq output
* buffer.
*
* If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
* otherwise WalSndCaughtUp is set to false.
* Notify walproposer about the new WAL position.
*/
static void
XLogSendPhysical(void)
XLogBroadcastWalProposer(void)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes PG_USED_FOR_ASSERTS_ONLY;
TimeLineID currTLI;
/* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING)
WalSndSetState(WALSNDSTATE_STOPPING);
/* Start from the last sent position */
startptr = sentPtr;
if (streamingDoneSending)
{
WalSndCaughtUp = true;
return;
}
/* Figure out how far we can safely send the WAL. */
if (sendTimeLineIsHistoric)
{
/*
* Streaming an old timeline that's in this server's history, but is
* not the one we're currently inserting or replaying. It can be
* streamed up to the point where we switched off that timeline.
*/
SendRqstPtr = sendTimeLineValidUpto;
}
else if (am_cascading_walsender)
{
/*
* Streaming the latest timeline on a standby.
*
* Attempt to send all WAL that has already been replayed, so that we
* know it's valid. If we're receiving WAL through streaming
* replication, it's also OK to send any WAL that has been received
* but not replayed.
*
* The timeline we're recovering from can change, or we can be
* promoted. In either case, the current timeline becomes historic. We
* need to detect that so that we don't try to stream past the point
* where we switched to another timeline. We check for promotion or
* timeline switch after calculating FlushPtr, to avoid a race
* condition: if the timeline becomes historic just after we checked
* that it was still current, it's still be OK to stream it up to the
* FlushPtr that was calculated before it became historic.
*/
bool becameHistoric = false;
/*
* Streaming the current timeline on a primary.
*
* Attempt to send all data that's already been written out and
* fsync'd to disk. We cannot go further than what's been written out
* given the current implementation of WALRead(). And in any case
* it's unsafe to send WAL that is not securely down to disk on the
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
*/
#if PG_VERSION_NUM >= 150000
SendRqstPtr = GetStandbyFlushRecPtr(&currTLI);
endptr = GetFlushRecPtr(NULL);
#else
SendRqstPtr = GetStandbyFlushRecPtr();
currTLI = ThisTimeLineID;
endptr = GetFlushRecPtr();
#endif
if (!RecoveryInProgress())
{
/*
* We have been promoted. RecoveryInProgress() updated
* ThisTimeLineID to the new current timeline.
*/
am_cascading_walsender = false;
becameHistoric = true;
}
else
{
/*
* Still a cascading standby. But is the timeline we're sending
* still the one recovery is recovering from? currTLI was updated
* by the GetStandbyFlushRecPtr() call above.
*/
if (sendTimeLine != currTLI)
becameHistoric = true;
}
if (becameHistoric)
{
/*
* The timeline we were sending has become historic. Read the
* timeline history file of the new timeline to see where exactly
* we forked off from the timeline we were sending.
*/
List *history;
history = readTimeLineHistory(currTLI);
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
Assert(sendTimeLine < sendTimeLineNextTLI);
list_free_deep(history);
sendTimeLineIsHistoric = true;
SendRqstPtr = sendTimeLineValidUpto;
}
}
else
{
/*
* Streaming the current timeline on a primary.
*
* Attempt to send all data that's already been written out and
* fsync'd to disk. We cannot go further than what's been written out
* given the current implementation of WALRead(). And in any case
* it's unsafe to send WAL that is not securely down to disk on the
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
*/
#if PG_VERSION_NUM >= 150000
SendRqstPtr = GetFlushRecPtr(NULL);
#else
SendRqstPtr = GetFlushRecPtr();
#endif
}
/*
* Record the current system time as an approximation of the time at which
@@ -1083,91 +625,14 @@ XLogSendPhysical(void)
* that arbitrary LSN is eventually reported as written, flushed and
* applied, so that it can measure the elapsed time.
*/
LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
/*
* If this is a historic timeline and we've reached the point where we
* forked to the next timeline, stop streaming.
*
* Note: We might already have sent WAL > sendTimeLineValidUpto. The
* startup process will normally replay all WAL that has been received
* from the primary, before promoting, but if the WAL streaming is
* terminated at a WAL page boundary, the valid portion of the timeline
* might end in the middle of a WAL record. We might've already sent the
* first half of that partial WAL record to the cascading standby, so that
* sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
* replay the partial WAL record either, so it can still follow our
* timeline switch.
*/
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
{
/* close the current file. */
if (xlogreader->seg.ws_file >= 0)
wal_segment_close(xlogreader);
/* Send CopyDone */
pq_putmessage_noblock('c', NULL, 0);
streamingDoneSending = true;
WalSndCaughtUp = true;
elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
LSN_FORMAT_ARGS(sendTimeLineValidUpto),
LSN_FORMAT_ARGS(sentPtr));
return;
}
LagTrackerWrite(endptr, GetCurrentTimestamp());
/* Do we have any work to do? */
Assert(sentPtr <= SendRqstPtr);
if (SendRqstPtr <= sentPtr)
{
WalSndCaughtUp = true;
Assert(startptr <= endptr);
if (endptr <= startptr)
return;
}
/*
* Figure out how much to send in one message. If there's no more than
* MAX_SEND_SIZE bytes to send, send everything. Otherwise send
* MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
*
* The rounding is not only for performance reasons. Walreceiver relies on
* the fact that we never split a WAL record across two messages. Since a
* long WAL record is split at page boundary into continuation records,
* page boundary is always a safe cut-off point. We also assume that
* SendRqstPtr never points to the middle of a WAL record.
*/
startptr = sentPtr;
endptr = startptr;
endptr += MAX_SEND_SIZE;
/* if we went beyond SendRqstPtr, back off */
if (SendRqstPtr <= endptr)
{
endptr = SendRqstPtr;
if (sendTimeLineIsHistoric)
WalSndCaughtUp = false;
else
WalSndCaughtUp = true;
}
else
{
/* round down to page boundary. */
endptr -= (endptr % XLOG_BLCKSZ);
WalSndCaughtUp = false;
}
nbytes = endptr - startptr;
Assert(nbytes <= MAX_SEND_SIZE);
/* always true */
if (am_wal_proposer)
{
WalProposerBroadcast(startptr, endptr);
}
else
{
/* code removed for brevity */
}
WalProposerBroadcast(startptr, endptr);
sentPtr = endptr;
/* Update shared memory status */