Rename walproposer_utils to neon_utils

This commit is contained in:
Arthur Petukhovsky
2023-09-25 13:31:48 +00:00
parent 0e1ff5db4c
commit 227eb21303
9 changed files with 783 additions and 792 deletions

View File

@@ -8,10 +8,10 @@ OBJS = \
file_cache.o \
libpagestore.o \
neon.o \
neon_utils.o \
pagestore_smgr.o \
relsize_cache.o \
walproposer.o \
walproposer_utils.o \
walproposer_pg.o \
control_plane_connector.o

View File

@@ -30,7 +30,7 @@
#include "neon.h"
#include "walproposer.h"
#include "walproposer_utils.h"
#include "neon_utils.h"
#define PageStoreTrace DEBUG5

116
pgxn/neon/neon_utils.c Normal file
View File

@@ -0,0 +1,116 @@
#include "postgres.h"
#include "access/timeline.h"
#include "access/xlogutils.h"
#include "common/logging.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"
#include "libpq-fe.h"
#include <netinet/tcp.h>
#include <unistd.h>
#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM >= 16
#include "utils/guc.h"
#endif
/*
* Convert a character which represents a hexadecimal digit to an integer.
*
* Returns -1 if the character is not a hexadecimal digit.
*/
int
HexDecodeChar(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;
return -1;
}
/*
* Decode a hex string into a byte string, 2 hex chars per byte.
*
* Returns false if invalid characters are encountered; otherwise true.
*/
bool
HexDecodeString(uint8 *result, char *input, int nbytes)
{
int i;
for (i = 0; i < nbytes; ++i)
{
int n1 = HexDecodeChar(input[i * 2]);
int n2 = HexDecodeChar(input[i * 2 + 1]);
if (n1 < 0 || n2 < 0)
return false;
result[i] = n1 * 16 + n2;
}
return true;
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint32
pq_getmsgint32_le(StringInfo msg)
{
uint32 n32;
pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));
return n32;
}
/* --------------------------------
* pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint64
pq_getmsgint64_le(StringInfo msg)
{
uint64 n64;
pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));
return n64;
}
/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
void
pq_sendint32_le(StringInfo buf, uint32 i)
{
enlargeStringInfo(buf, sizeof(uint32));
memcpy(buf->data + buf->len, &i, sizeof(uint32));
buf->len += sizeof(uint32);
}
/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
void
pq_sendint64_le(StringInfo buf, uint64 i)
{
enlargeStringInfo(buf, sizeof(uint64));
memcpy(buf->data + buf->len, &i, sizeof(uint64));
buf->len += sizeof(uint64);
}

13
pgxn/neon/neon_utils.h Normal file
View File

@@ -0,0 +1,13 @@
#ifndef __NEON_UTILS_H__
#define __NEON_UTILS_H__
#include "postgres.h"
#include "libpq/pqformat.h"
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
#endif /* __NEON_UTILS_H__ */

View File

@@ -72,7 +72,7 @@
#include "neon.h"
#include "walproposer.h"
#include "walproposer_utils.h"
#include "neon_utils.h"
static bool syncSafekeepers = false;
@@ -133,7 +133,6 @@ static void HandleElectedProposer(void);
static term_t GetHighestTerm(TermHistory * th);
static term_t GetEpoch(Safekeeper *sk);
static void DetermineEpochStartLsn(void);
static bool WalProposerRecovery(int donor, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
static void SendProposerElected(Safekeeper *sk);
static void StartStreaming(Safekeeper *sk);
static void SendMessageToNode(Safekeeper *sk);
@@ -151,6 +150,12 @@ static bool BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, Safekeeper
static bool AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_state);
static bool AsyncFlush(Safekeeper *sk);
static int CompareLsn(const void *a, const void *b);
static char *FormatSafekeeperState(SafekeeperState state);
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
static char *FormatEvents(uint32 events);
/*
* Entry point for `postgres --sync-safekeepers`.
*/
@@ -1018,7 +1023,7 @@ HandleElectedProposer(void)
LSN_FORMAT_ARGS(truncateLsn),
LSN_FORMAT_ARGS(propEpochStartLsn));
/* Perform recovery */
if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
if (!walprop_pg.recovery_download(&safekeeper[donor], greetRequest.timeline, truncateLsn, propEpochStartLsn))
elog(FATAL, "Failed to recover state");
}
else if (syncSafekeepers)
@@ -1215,111 +1220,6 @@ DetermineEpochStartLsn(void)
}
}
/*
* Receive WAL from most advanced safekeeper
*/
static bool
WalProposerRecovery(int donor, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos)
{
char *err;
WalReceiverConn *wrconn;
WalRcvStreamOptions options;
char conninfo[MAXCONNINFO];
if (!neon_auth_token)
{
memcpy(conninfo, safekeeper[donor].conninfo, MAXCONNINFO);
}
else
{
int written = 0;
written = snprintf((char *) conninfo, MAXCONNINFO, "password=%s %s", neon_auth_token, safekeeper[donor].conninfo);
if (written > MAXCONNINFO || written < 0)
elog(FATAL, "could not append password to the safekeeper connection string");
}
#if PG_MAJORVERSION_NUM < 16
wrconn = walrcv_connect(conninfo, false, "wal_proposer_recovery", &err);
#else
wrconn = walrcv_connect(conninfo, false, false, "wal_proposer_recovery", &err);
#endif
if (!wrconn)
{
ereport(WARNING,
(errmsg("could not connect to WAL acceptor %s:%s: %s",
safekeeper[donor].host, safekeeper[donor].port,
err)));
return false;
}
elog(LOG,
"start recovery from %s:%s starting from %X/%08X till %X/%08X timeline "
"%d",
safekeeper[donor].host, safekeeper[donor].port, (uint32) (startpos >> 32),
(uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline);
options.logical = false;
options.startpoint = startpos;
options.slotname = NULL;
options.proto.physical.startpointTLI = timeline;
if (walrcv_startstreaming(wrconn, &options))
{
XLogRecPtr rec_start_lsn;
XLogRecPtr rec_end_lsn = 0;
int len;
char *buf;
pgsocket wait_fd = PGINVALID_SOCKET;
while ((len = walrcv_receive(wrconn, &buf, &wait_fd)) >= 0)
{
if (len == 0)
{
(void) WaitLatchOrSocket(
MyLatch, WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE, wait_fd,
-1, WAIT_EVENT_WAL_RECEIVER_MAIN);
}
else
{
Assert(buf[0] == 'w' || buf[0] == 'k');
if (buf[0] == 'k')
continue; /* keepalive */
memcpy(&rec_start_lsn, &buf[XLOG_HDR_START_POS],
sizeof rec_start_lsn);
rec_start_lsn = pg_ntoh64(rec_start_lsn);
rec_end_lsn = rec_start_lsn + len - XLOG_HDR_SIZE;
/* write WAL to disk */
XLogWalPropWrite(&buf[XLOG_HDR_SIZE], len - XLOG_HDR_SIZE, rec_start_lsn);
ereport(DEBUG1,
(errmsg("Recover message %X/%X length %d",
LSN_FORMAT_ARGS(rec_start_lsn), len)));
if (rec_end_lsn >= endpos)
break;
}
}
ereport(LOG,
(errmsg("end of replication stream at %X/%X: %m",
LSN_FORMAT_ARGS(rec_end_lsn))));
walrcv_disconnect(wrconn);
/* failed to receive all WAL till endpos */
if (rec_end_lsn < endpos)
return false;
}
else
{
ereport(LOG,
(errmsg("primary server contains no more WAL on requested timeline %u LSN %X/%08X",
timeline, (uint32) (startpos >> 32), (uint32) startpos)));
return false;
}
return true;
}
/*
* Determine for sk the starting streaming point and send it message
* 1) Announcing we are elected proposer (which immediately advances epoch if
@@ -2253,3 +2153,211 @@ AsyncFlush(Safekeeper *sk)
return false;
}
}
static int
CompareLsn(const void *a, const void *b)
{
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
if (lsn1 < lsn2)
return -1;
else if (lsn1 == lsn2)
return 0;
else
return 1;
}
/* Returns a human-readable string corresonding to the SafekeeperState
*
* The string should not be freed.
*
* The strings are intended to be used as a prefix to "state", e.g.:
*
* elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
*
* If this sort of phrasing doesn't fit the message, instead use something like:
*
* elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
*/
static char *
FormatSafekeeperState(SafekeeperState state)
{
char *return_val = NULL;
switch (state)
{
case SS_OFFLINE:
return_val = "offline";
break;
case SS_CONNECTING_READ:
case SS_CONNECTING_WRITE:
return_val = "connecting";
break;
case SS_WAIT_EXEC_RESULT:
return_val = "receiving query result";
break;
case SS_HANDSHAKE_RECV:
return_val = "handshake (receiving)";
break;
case SS_VOTING:
return_val = "voting";
break;
case SS_WAIT_VERDICT:
return_val = "wait-for-verdict";
break;
case SS_SEND_ELECTED_FLUSH:
return_val = "send-announcement-flush";
break;
case SS_IDLE:
return_val = "idle";
break;
case SS_ACTIVE:
return_val = "active";
break;
}
Assert(return_val != NULL);
return return_val;
}
/* Asserts that the provided events are expected for given safekeeper's state */
static void
AssertEventsOkForState(uint32 events, Safekeeper *sk)
{
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
/*
* The events are in-line with what we're expecting, under two conditions:
* (a) if we aren't expecting anything, `events` has no read- or
* write-ready component. (b) if we are expecting something, there's
* overlap (i.e. `events & expected != 0`)
*/
bool events_ok_for_state; /* long name so the `Assert` is more
* clear later */
if (expected == WL_NO_EVENTS)
events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
else
events_ok_for_state = ((events & expected) != 0);
if (!events_ok_for_state)
{
/*
* To give a descriptive message in the case of failure, we use elog
* and then an assertion that's guaranteed to fail.
*/
elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state));
Assert(events_ok_for_state);
}
}
/* Returns the set of events a safekeeper in this state should be waiting on
*
* This will return WL_NO_EVENTS (= 0) for some events. */
static uint32
SafekeeperStateDesiredEvents(SafekeeperState state)
{
uint32 result = WL_NO_EVENTS;
/* If the state doesn't have a modifier, we can check the base state */
switch (state)
{
/* Connecting states say what they want in the name */
case SS_CONNECTING_READ:
result = WL_SOCKET_READABLE;
break;
case SS_CONNECTING_WRITE:
result = WL_SOCKET_WRITEABLE;
break;
/* Reading states need the socket to be read-ready to continue */
case SS_WAIT_EXEC_RESULT:
case SS_HANDSHAKE_RECV:
case SS_WAIT_VERDICT:
result = WL_SOCKET_READABLE;
break;
/*
* Idle states use read-readiness as a sign that the connection
* has been disconnected.
*/
case SS_VOTING:
case SS_IDLE:
result = WL_SOCKET_READABLE;
break;
/*
* Flush states require write-ready for flushing. Active state
* does both reading and writing.
*
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
* should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
*/
case SS_SEND_ELECTED_FLUSH:
case SS_ACTIVE:
result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
break;
/* The offline state expects no events. */
case SS_OFFLINE:
result = WL_NO_EVENTS;
break;
default:
Assert(false);
break;
}
return result;
}
/* Returns a human-readable string corresponding to the event set
*
* If the events do not correspond to something set as the `events` field of a `WaitEvent`, the
* returned string may be meaingless.
*
* The string should not be freed. It should also not be expected to remain the same between
* function calls. */
static char *
FormatEvents(uint32 events)
{
static char return_str[8];
/* Helper variable to check if there's extra bits */
uint32 all_flags = WL_LATCH_SET
| WL_SOCKET_READABLE
| WL_SOCKET_WRITEABLE
| WL_TIMEOUT
| WL_POSTMASTER_DEATH
| WL_EXIT_ON_PM_DEATH
| WL_SOCKET_CONNECTED;
/*
* The formatting here isn't supposed to be *particularly* useful -- it's
* just to give an sense of what events have been triggered without
* needing to remember your powers of two.
*/
return_str[0] = (events & WL_LATCH_SET) ? 'L' : '_';
return_str[1] = (events & WL_SOCKET_READABLE) ? 'R' : '_';
return_str[2] = (events & WL_SOCKET_WRITEABLE) ? 'W' : '_';
return_str[3] = (events & WL_TIMEOUT) ? 'T' : '_';
return_str[4] = (events & WL_POSTMASTER_DEATH) ? 'D' : '_';
return_str[5] = (events & WL_EXIT_ON_PM_DEATH) ? 'E' : '_';
return_str[5] = (events & WL_SOCKET_CONNECTED) ? 'C' : '_';
if (events & (~all_flags))
{
elog(WARNING, "Event formatting found unexpected component %d",
events & (~all_flags));
return_str[6] = '*';
return_str[7] = '\0';
}
else
return_str[6] = '\0';
return (char *) &return_str;
}

View File

@@ -384,7 +384,6 @@ extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(void);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
PageserverFeedback *rf);
extern void StartProposerReplication(StartReplicationCmd *cmd);
extern void replication_feedback_set(PageserverFeedback *rf);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
@@ -474,6 +473,7 @@ typedef struct walproposer_api
PGAsyncReadResult (*conn_async_read) (WalProposerConn * conn, char **buf, int *amount);
PGAsyncWriteResult (*conn_async_write) (WalProposerConn * conn, void const *buf, size_t size);
bool (*conn_blocking_write) (WalProposerConn * conn, void const *buf, size_t size);
bool (*recovery_download) (Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
} walproposer_api;
extern const walproposer_api walprop_pg;

View File

@@ -18,9 +18,7 @@
#include "libpq/pqformat.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#if PG_VERSION_NUM >= 160000
#include "replication/walsender_private.h"
#endif
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
@@ -39,7 +37,6 @@
#include "neon.h"
#include "walproposer.h"
#include "walproposer_utils.h"
#include "libpq-fe.h"
char *wal_acceptors_list = "";
@@ -64,6 +61,15 @@ static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static void StartProposerReplication(StartReplicationCmd *cmd);
static void WalSndLoop(void);
static void XLogBroadcastWalProposer(void);
static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalPropClose(XLogRecPtr recptr);
/*
* Initialize GUCs, bgworker, shmem and backpressure.
*/
@@ -882,6 +888,431 @@ walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
return true;
}
/*
* Subscribe for new WAL and stream it in the loop to safekeepers.
*
* At the moment, this never returns, but an ereport(ERROR) will take us back
* to the main loop.
*/
static void
StartProposerReplication(StartReplicationCmd *cmd)
{
XLogRecPtr FlushPtr;
TimeLineID currTLI;
#if PG_VERSION_NUM < 150000
if (ThisTimeLineID == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
#endif
/*
* We assume here that we're logging enough information in the WAL for
* log-shipping, since this is checked in PostmasterMain().
*
* NOTE: wal_level can only change at shutdown, so in most cases it is
* difficult for there to be WAL data that we can still see that was
* written at wal_level='minimal'.
*/
if (cmd->slotname)
{
ReplicationSlotAcquire(cmd->slotname, true);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use a logical replication slot for physical replication")));
/*
* We don't need to verify the slot's restart_lsn here; instead we
* rely on the caller requesting the starting point to use. If the
* WAL segment doesn't exist, we'll fail later.
*/
}
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record, which is
* kept in ThisTimeLineID.
*
* Neon doesn't currently use PG Timelines, but it may in the future, so
* we keep this code around to lighten the load for when we need it.
*/
#if PG_VERSION_NUM >= 150000
FlushPtr = GetFlushRecPtr(&currTLI);
#else
FlushPtr = GetFlushRecPtr();
currTLI = ThisTimeLineID;
#endif
/*
* When we first start replication the standby will be behind the
* primary. For some applications, for example synchronous
* replication, it is important to have a clear state for this initial
* catchup mode, so we can trigger actions when we change streaming
* state later. We may stay in this state for a long time, which is
* exactly why we want to be able to monitor whether or not we are
* still here.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
/*
* Don't allow a request to stream from a future point in WAL that
* hasn't been flushed to disk in this server yet.
*/
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
LSN_FORMAT_ARGS(cmd->startpoint),
LSN_FORMAT_ARGS(FlushPtr))));
}
/* Start streaming from the requested point */
sentPtr = cmd->startpoint;
/* Initialize shared memory status, too */
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
/* Infinite send loop, never returns */
WalSndLoop();
WalSndSetState(WALSNDSTATE_STARTUP);
if (cmd->slotname)
ReplicationSlotRelease();
}
/*
* Main loop that waits for LSN updates and calls the walproposer.
* Synchronous replication sets latch in WalSndWakeup at walsender.c
*/
static void
WalSndLoop(void)
{
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
for (;;)
{
CHECK_FOR_INTERRUPTS();
XLogBroadcastWalProposer();
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
WalProposerPoll();
}
}
/*
* Notify walproposer about the new WAL position.
*/
static void
XLogBroadcastWalProposer(void)
{
XLogRecPtr startptr;
XLogRecPtr endptr;
/* Start from the last sent position */
startptr = sentPtr;
/*
* Streaming the current timeline on a primary.
*
* Attempt to send all data that's already been written out and
* fsync'd to disk. We cannot go further than what's been written out
* given the current implementation of WALRead(). And in any case
* it's unsafe to send WAL that is not securely down to disk on the
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
*/
#if PG_VERSION_NUM >= 150000
endptr = GetFlushRecPtr(NULL);
#else
endptr = GetFlushRecPtr();
#endif
/*
* Record the current system time as an approximation of the time at which
* this WAL location was written for the purposes of lag tracking.
*
* In theory we could make XLogFlush() record a time in shmem whenever WAL
* is flushed and we could get that time as well as the LSN when we call
* GetFlushRecPtr() above (and likewise for the cascading standby
* equivalent), but rather than putting any new code into the hot WAL path
* it seems good enough to capture the time here. We should reach this
* after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
* may take some time, we read the WAL flush pointer and take the time
* very close to together here so that we'll get a later position if it is
* still moving.
*
* Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
* this gives us a cheap approximation for the WAL flush time for this
* LSN.
*
* Note that the LSN is not necessarily the LSN for the data contained in
* the present message; it's the end of the WAL, which might be further
* ahead. All the lag tracking machinery cares about is finding out when
* that arbitrary LSN is eventually reported as written, flushed and
* applied, so that it can measure the elapsed time.
*/
LagTrackerWrite(endptr, GetCurrentTimestamp());
/* Do we have any work to do? */
Assert(startptr <= endptr);
if (endptr <= startptr)
return;
WalProposerBroadcast(startptr, endptr);
sentPtr = endptr;
/* Update shared memory status */
{
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->sentPtr = sentPtr;
SpinLockRelease(&walsnd->mutex);
}
/* Report progress of XLOG streaming in PS display */
if (update_process_title)
{
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
LSN_FORMAT_ARGS(sentPtr));
set_ps_display(activitymsg);
}
}
/*
* Receive WAL from most advanced safekeeper
*/
static bool
WalProposerRecovery(Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos)
{
char *err;
WalReceiverConn *wrconn;
WalRcvStreamOptions options;
char conninfo[MAXCONNINFO];
if (!neon_auth_token)
{
memcpy(conninfo, sk->conninfo, MAXCONNINFO);
}
else
{
int written = 0;
written = snprintf((char *) conninfo, MAXCONNINFO, "password=%s %s", neon_auth_token, sk->conninfo);
if (written > MAXCONNINFO || written < 0)
elog(FATAL, "could not append password to the safekeeper connection string");
}
#if PG_MAJORVERSION_NUM < 16
wrconn = walrcv_connect(conninfo, false, "wal_proposer_recovery", &err);
#else
wrconn = walrcv_connect(conninfo, false, false, "wal_proposer_recovery", &err);
#endif
if (!wrconn)
{
ereport(WARNING,
(errmsg("could not connect to WAL acceptor %s:%s: %s",
sk->host, sk->port,
err)));
return false;
}
elog(LOG,
"start recovery from %s:%s starting from %X/%08X till %X/%08X timeline "
"%d",
sk->host, sk->port, (uint32) (startpos >> 32),
(uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline);
options.logical = false;
options.startpoint = startpos;
options.slotname = NULL;
options.proto.physical.startpointTLI = timeline;
if (walrcv_startstreaming(wrconn, &options))
{
XLogRecPtr rec_start_lsn;
XLogRecPtr rec_end_lsn = 0;
int len;
char *buf;
pgsocket wait_fd = PGINVALID_SOCKET;
while ((len = walrcv_receive(wrconn, &buf, &wait_fd)) >= 0)
{
if (len == 0)
{
(void) WaitLatchOrSocket(
MyLatch, WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE, wait_fd,
-1, WAIT_EVENT_WAL_RECEIVER_MAIN);
}
else
{
Assert(buf[0] == 'w' || buf[0] == 'k');
if (buf[0] == 'k')
continue; /* keepalive */
memcpy(&rec_start_lsn, &buf[XLOG_HDR_START_POS],
sizeof rec_start_lsn);
rec_start_lsn = pg_ntoh64(rec_start_lsn);
rec_end_lsn = rec_start_lsn + len - XLOG_HDR_SIZE;
/* write WAL to disk */
XLogWalPropWrite(&buf[XLOG_HDR_SIZE], len - XLOG_HDR_SIZE, rec_start_lsn);
ereport(DEBUG1,
(errmsg("Recover message %X/%X length %d",
LSN_FORMAT_ARGS(rec_start_lsn), len)));
if (rec_end_lsn >= endpos)
break;
}
}
ereport(LOG,
(errmsg("end of replication stream at %X/%X: %m",
LSN_FORMAT_ARGS(rec_end_lsn))));
walrcv_disconnect(wrconn);
/* failed to receive all WAL till endpos */
if (rec_end_lsn < endpos)
return false;
}
else
{
ereport(LOG,
(errmsg("primary server contains no more WAL on requested timeline %u LSN %X/%08X",
timeline, (uint32) (startpos >> 32), (uint32) startpos)));
return false;
}
return true;
}
/*
* These variables are used similarly to openLogFile/SegNo,
* but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID
* corresponding the filename of walpropFile.
*/
static int walpropFile = -1;
static TimeLineID walpropFileTLI = 0;
static XLogSegNo walpropSegNo = 0;
/*
* Write XLOG data to disk.
*/
static void
XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
int startoff;
int byteswritten;
while (nbytes > 0)
{
int segbytes;
/* Close the current segment if it's completed */
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
XLogWalPropClose(recptr);
if (walpropFile < 0)
{
#if PG_VERSION_NUM >= 150000
/* FIXME Is it ok to use hardcoded value here? */
TimeLineID tli = 1;
#else
bool use_existent = true;
#endif
/* Create/use new log file */
XLByteToSeg(recptr, walpropSegNo, wal_segment_size);
#if PG_VERSION_NUM >= 150000
walpropFile = XLogFileInit(walpropSegNo, tli);
walpropFileTLI = tli;
#else
walpropFile = XLogFileInit(walpropSegNo, &use_existent, false);
walpropFileTLI = ThisTimeLineID;
#endif
}
/* Calculate the start offset of the received logs */
startoff = XLogSegmentOffset(recptr, wal_segment_size);
if (startoff + nbytes > wal_segment_size)
segbytes = wal_segment_size - startoff;
else
segbytes = nbytes;
/* OK to write the logs */
errno = 0;
byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff);
if (byteswritten <= 0)
{
char xlogfname[MAXFNAMELEN];
int save_errno;
/* if write didn't set errno, assume no disk space */
if (errno == 0)
errno = ENOSPC;
save_errno = errno;
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
errno = save_errno;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log segment %s "
"at offset %u, length %lu: %m",
xlogfname, startoff, (unsigned long) segbytes)));
}
/* Update state for write */
recptr += byteswritten;
nbytes -= byteswritten;
buf += byteswritten;
}
/*
* Close the current segment if it's fully written up in the last cycle of
* the loop.
*/
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
{
XLogWalPropClose(recptr);
}
}
/*
* Close the current segment.
*/
static void
XLogWalPropClose(XLogRecPtr recptr)
{
Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size));
if (close(walpropFile) != 0)
{
char xlogfname[MAXFNAMELEN];
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
xlogfname)));
}
walpropFile = -1;
}
/*
* Temporary globally exported walproposer API for postgres.
*/
@@ -908,4 +1339,5 @@ const walproposer_api walprop_pg = {
.conn_async_read = walprop_async_read,
.conn_async_write = walprop_async_write,
.conn_blocking_write = walprop_blocking_write,
.recovery_download = WalProposerRecovery,
};

View File

@@ -1,659 +0,0 @@
#include "postgres.h"
#include "access/timeline.h"
#include "access/xlogutils.h"
#include "common/logging.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "walproposer_utils.h"
#include "replication/walsender_private.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"
#include "libpq-fe.h"
#include <netinet/tcp.h>
#include <unistd.h>
#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM >= 16
#include "utils/guc.h"
#endif
/*
* These variables are used similarly to openLogFile/SegNo,
* but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID
* corresponding the filename of walpropFile.
*/
static int walpropFile = -1;
static TimeLineID walpropFileTLI = 0;
static XLogSegNo walpropSegNo = 0;
/* START cloned file-local variables and functions from walsender.c */
/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static void WalSndLoop(void);
static void XLogBroadcastWalProposer(void);
/* END cloned file-level variables and functions from walsender.c */
int
CompareLsn(const void *a, const void *b)
{
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
if (lsn1 < lsn2)
return -1;
else if (lsn1 == lsn2)
return 0;
else
return 1;
}
/* Returns a human-readable string corresonding to the SafekeeperState
*
* The string should not be freed.
*
* The strings are intended to be used as a prefix to "state", e.g.:
*
* elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
*
* If this sort of phrasing doesn't fit the message, instead use something like:
*
* elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
*/
char *
FormatSafekeeperState(SafekeeperState state)
{
char *return_val = NULL;
switch (state)
{
case SS_OFFLINE:
return_val = "offline";
break;
case SS_CONNECTING_READ:
case SS_CONNECTING_WRITE:
return_val = "connecting";
break;
case SS_WAIT_EXEC_RESULT:
return_val = "receiving query result";
break;
case SS_HANDSHAKE_RECV:
return_val = "handshake (receiving)";
break;
case SS_VOTING:
return_val = "voting";
break;
case SS_WAIT_VERDICT:
return_val = "wait-for-verdict";
break;
case SS_SEND_ELECTED_FLUSH:
return_val = "send-announcement-flush";
break;
case SS_IDLE:
return_val = "idle";
break;
case SS_ACTIVE:
return_val = "active";
break;
}
Assert(return_val != NULL);
return return_val;
}
/* Asserts that the provided events are expected for given safekeeper's state */
void
AssertEventsOkForState(uint32 events, Safekeeper *sk)
{
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
/*
* The events are in-line with what we're expecting, under two conditions:
* (a) if we aren't expecting anything, `events` has no read- or
* write-ready component. (b) if we are expecting something, there's
* overlap (i.e. `events & expected != 0`)
*/
bool events_ok_for_state; /* long name so the `Assert` is more
* clear later */
if (expected == WL_NO_EVENTS)
events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
else
events_ok_for_state = ((events & expected) != 0);
if (!events_ok_for_state)
{
/*
* To give a descriptive message in the case of failure, we use elog
* and then an assertion that's guaranteed to fail.
*/
elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state));
Assert(events_ok_for_state);
}
}
/* Returns the set of events a safekeeper in this state should be waiting on
*
* This will return WL_NO_EVENTS (= 0) for some events. */
uint32
SafekeeperStateDesiredEvents(SafekeeperState state)
{
uint32 result = WL_NO_EVENTS;
/* If the state doesn't have a modifier, we can check the base state */
switch (state)
{
/* Connecting states say what they want in the name */
case SS_CONNECTING_READ:
result = WL_SOCKET_READABLE;
break;
case SS_CONNECTING_WRITE:
result = WL_SOCKET_WRITEABLE;
break;
/* Reading states need the socket to be read-ready to continue */
case SS_WAIT_EXEC_RESULT:
case SS_HANDSHAKE_RECV:
case SS_WAIT_VERDICT:
result = WL_SOCKET_READABLE;
break;
/*
* Idle states use read-readiness as a sign that the connection
* has been disconnected.
*/
case SS_VOTING:
case SS_IDLE:
result = WL_SOCKET_READABLE;
break;
/*
* Flush states require write-ready for flushing. Active state
* does both reading and writing.
*
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
* should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
*/
case SS_SEND_ELECTED_FLUSH:
case SS_ACTIVE:
result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
break;
/* The offline state expects no events. */
case SS_OFFLINE:
result = WL_NO_EVENTS;
break;
default:
Assert(false);
break;
}
return result;
}
/* Returns a human-readable string corresponding to the event set
*
* If the events do not correspond to something set as the `events` field of a `WaitEvent`, the
* returned string may be meaingless.
*
* The string should not be freed. It should also not be expected to remain the same between
* function calls. */
char *
FormatEvents(uint32 events)
{
static char return_str[8];
/* Helper variable to check if there's extra bits */
uint32 all_flags = WL_LATCH_SET
| WL_SOCKET_READABLE
| WL_SOCKET_WRITEABLE
| WL_TIMEOUT
| WL_POSTMASTER_DEATH
| WL_EXIT_ON_PM_DEATH
| WL_SOCKET_CONNECTED;
/*
* The formatting here isn't supposed to be *particularly* useful -- it's
* just to give an sense of what events have been triggered without
* needing to remember your powers of two.
*/
return_str[0] = (events & WL_LATCH_SET) ? 'L' : '_';
return_str[1] = (events & WL_SOCKET_READABLE) ? 'R' : '_';
return_str[2] = (events & WL_SOCKET_WRITEABLE) ? 'W' : '_';
return_str[3] = (events & WL_TIMEOUT) ? 'T' : '_';
return_str[4] = (events & WL_POSTMASTER_DEATH) ? 'D' : '_';
return_str[5] = (events & WL_EXIT_ON_PM_DEATH) ? 'E' : '_';
return_str[5] = (events & WL_SOCKET_CONNECTED) ? 'C' : '_';
if (events & (~all_flags))
{
elog(WARNING, "Event formatting found unexpected component %d",
events & (~all_flags));
return_str[6] = '*';
return_str[7] = '\0';
}
else
return_str[6] = '\0';
return (char *) &return_str;
}
/*
* Convert a character which represents a hexadecimal digit to an integer.
*
* Returns -1 if the character is not a hexadecimal digit.
*/
static int
HexDecodeChar(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;
return -1;
}
/*
* Decode a hex string into a byte string, 2 hex chars per byte.
*
* Returns false if invalid characters are encountered; otherwise true.
*/
bool
HexDecodeString(uint8 *result, char *input, int nbytes)
{
int i;
for (i = 0; i < nbytes; ++i)
{
int n1 = HexDecodeChar(input[i * 2]);
int n2 = HexDecodeChar(input[i * 2 + 1]);
if (n1 < 0 || n2 < 0)
return false;
result[i] = n1 * 16 + n2;
}
return true;
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint32
pq_getmsgint32_le(StringInfo msg)
{
uint32 n32;
pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));
return n32;
}
/* --------------------------------
* pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint64
pq_getmsgint64_le(StringInfo msg)
{
uint64 n64;
pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));
return n64;
}
/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
void
pq_sendint32_le(StringInfo buf, uint32 i)
{
enlargeStringInfo(buf, sizeof(uint32));
memcpy(buf->data + buf->len, &i, sizeof(uint32));
buf->len += sizeof(uint32);
}
/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
void
pq_sendint64_le(StringInfo buf, uint64 i)
{
enlargeStringInfo(buf, sizeof(uint64));
memcpy(buf->data + buf->len, &i, sizeof(uint64));
buf->len += sizeof(uint64);
}
/*
* Write XLOG data to disk.
*/
void
XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
int startoff;
int byteswritten;
while (nbytes > 0)
{
int segbytes;
/* Close the current segment if it's completed */
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
XLogWalPropClose(recptr);
if (walpropFile < 0)
{
#if PG_VERSION_NUM >= 150000
/* FIXME Is it ok to use hardcoded value here? */
TimeLineID tli = 1;
#else
bool use_existent = true;
#endif
/* Create/use new log file */
XLByteToSeg(recptr, walpropSegNo, wal_segment_size);
#if PG_VERSION_NUM >= 150000
walpropFile = XLogFileInit(walpropSegNo, tli);
walpropFileTLI = tli;
#else
walpropFile = XLogFileInit(walpropSegNo, &use_existent, false);
walpropFileTLI = ThisTimeLineID;
#endif
}
/* Calculate the start offset of the received logs */
startoff = XLogSegmentOffset(recptr, wal_segment_size);
if (startoff + nbytes > wal_segment_size)
segbytes = wal_segment_size - startoff;
else
segbytes = nbytes;
/* OK to write the logs */
errno = 0;
byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff);
if (byteswritten <= 0)
{
char xlogfname[MAXFNAMELEN];
int save_errno;
/* if write didn't set errno, assume no disk space */
if (errno == 0)
errno = ENOSPC;
save_errno = errno;
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
errno = save_errno;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log segment %s "
"at offset %u, length %lu: %m",
xlogfname, startoff, (unsigned long) segbytes)));
}
/* Update state for write */
recptr += byteswritten;
nbytes -= byteswritten;
buf += byteswritten;
}
/*
* Close the current segment if it's fully written up in the last cycle of
* the loop.
*/
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
{
XLogWalPropClose(recptr);
}
}
/*
* Close the current segment.
*/
void
XLogWalPropClose(XLogRecPtr recptr)
{
Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size));
if (close(walpropFile) != 0)
{
char xlogfname[MAXFNAMELEN];
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
xlogfname)));
}
walpropFile = -1;
}
/* START of cloned functions from walsender.c */
/*
* Subscribe for new WAL and stream it in the loop to safekeepers.
*
* At the moment, this never returns, but an ereport(ERROR) will take us back
* to the main loop.
*/
void
StartProposerReplication(StartReplicationCmd *cmd)
{
XLogRecPtr FlushPtr;
TimeLineID currTLI;
#if PG_VERSION_NUM < 150000
if (ThisTimeLineID == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
#endif
/*
* We assume here that we're logging enough information in the WAL for
* log-shipping, since this is checked in PostmasterMain().
*
* NOTE: wal_level can only change at shutdown, so in most cases it is
* difficult for there to be WAL data that we can still see that was
* written at wal_level='minimal'.
*/
if (cmd->slotname)
{
ReplicationSlotAcquire(cmd->slotname, true);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use a logical replication slot for physical replication")));
/*
* We don't need to verify the slot's restart_lsn here; instead we
* rely on the caller requesting the starting point to use. If the
* WAL segment doesn't exist, we'll fail later.
*/
}
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record, which is
* kept in ThisTimeLineID.
*
* Neon doesn't currently use PG Timelines, but it may in the future, so
* we keep this code around to lighten the load for when we need it.
*/
#if PG_VERSION_NUM >= 150000
FlushPtr = GetFlushRecPtr(&currTLI);
#else
FlushPtr = GetFlushRecPtr();
currTLI = ThisTimeLineID;
#endif
/*
* When we first start replication the standby will be behind the
* primary. For some applications, for example synchronous
* replication, it is important to have a clear state for this initial
* catchup mode, so we can trigger actions when we change streaming
* state later. We may stay in this state for a long time, which is
* exactly why we want to be able to monitor whether or not we are
* still here.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
/*
* Don't allow a request to stream from a future point in WAL that
* hasn't been flushed to disk in this server yet.
*/
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
LSN_FORMAT_ARGS(cmd->startpoint),
LSN_FORMAT_ARGS(FlushPtr))));
}
/* Start streaming from the requested point */
sentPtr = cmd->startpoint;
/* Initialize shared memory status, too */
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
/* Infinite send loop, never returns */
WalSndLoop();
WalSndSetState(WALSNDSTATE_STARTUP);
if (cmd->slotname)
ReplicationSlotRelease();
}
/*
* Main loop that waits for LSN updates and calls the walproposer.
* Synchronous replication sets latch in WalSndWakeup at walsender.c
*/
static void
WalSndLoop(void)
{
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
for (;;)
{
CHECK_FOR_INTERRUPTS();
XLogBroadcastWalProposer();
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
WalProposerPoll();
}
}
/*
* Notify walproposer about the new WAL position.
*/
static void
XLogBroadcastWalProposer(void)
{
XLogRecPtr startptr;
XLogRecPtr endptr;
/* Start from the last sent position */
startptr = sentPtr;
/*
* Streaming the current timeline on a primary.
*
* Attempt to send all data that's already been written out and
* fsync'd to disk. We cannot go further than what's been written out
* given the current implementation of WALRead(). And in any case
* it's unsafe to send WAL that is not securely down to disk on the
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
*/
#if PG_VERSION_NUM >= 150000
endptr = GetFlushRecPtr(NULL);
#else
endptr = GetFlushRecPtr();
#endif
/*
* Record the current system time as an approximation of the time at which
* this WAL location was written for the purposes of lag tracking.
*
* In theory we could make XLogFlush() record a time in shmem whenever WAL
* is flushed and we could get that time as well as the LSN when we call
* GetFlushRecPtr() above (and likewise for the cascading standby
* equivalent), but rather than putting any new code into the hot WAL path
* it seems good enough to capture the time here. We should reach this
* after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
* may take some time, we read the WAL flush pointer and take the time
* very close to together here so that we'll get a later position if it is
* still moving.
*
* Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
* this gives us a cheap approximation for the WAL flush time for this
* LSN.
*
* Note that the LSN is not necessarily the LSN for the data contained in
* the present message; it's the end of the WAL, which might be further
* ahead. All the lag tracking machinery cares about is finding out when
* that arbitrary LSN is eventually reported as written, flushed and
* applied, so that it can measure the elapsed time.
*/
LagTrackerWrite(endptr, GetCurrentTimestamp());
/* Do we have any work to do? */
Assert(startptr <= endptr);
if (endptr <= startptr)
return;
WalProposerBroadcast(startptr, endptr);
sentPtr = endptr;
/* Update shared memory status */
{
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->sentPtr = sentPtr;
SpinLockRelease(&walsnd->mutex);
}
/* Report progress of XLOG streaming in PS display */
if (update_process_title)
{
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
LSN_FORMAT_ARGS(sentPtr));
set_ps_display(activitymsg);
}
}

View File

@@ -1,19 +0,0 @@
#ifndef __NEON_WALPROPOSER_UTILS_H__
#define __NEON_WALPROPOSER_UTILS_H__
#include "walproposer.h"
int CompareLsn(const void *a, const void *b);
char *FormatSafekeeperState(SafekeeperState state);
void AssertEventsOkForState(uint32 events, Safekeeper *sk);
uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
char *FormatEvents(uint32 events);
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
void XLogWalPropClose(XLogRecPtr recptr);
#endif /* __NEON_WALPROPOSER_UTILS_H__ */