Spend some time

This commit is contained in:
Arthur Petukhovsky
2023-05-26 14:45:25 +03:00
parent b55005d2c4
commit 9ccbec0e14
7 changed files with 13957 additions and 4 deletions

View File

@@ -13,5 +13,5 @@
# -lpgcommon -lpgport -lz -lreadline -lm \
# -o walproposer.so walproposer.o
clang -c -o walproposer.o walproposer.c
clang -c -o walproposer.o walproposer.c -ferror-limit=1
ar rcs libwalproposer.a walproposer.o

10668
libs/walproposer/deps.c Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,182 @@
#include "deps.c"
char *wal_acceptors_list;
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 5000;
// static void
// nwp_register_gucs(void)
// {
// DefineCustomStringVariable(
// "neon.safekeepers",
// "List of Neon WAL acceptors (host:port)",
// NULL, /* long_desc */
// &wal_acceptors_list, /* valueAddr */
// "", /* bootValue */
// PGC_POSTMASTER,
// GUC_LIST_INPUT, /* extensions can't use*
// * GUC_LIST_QUOTE */
// NULL, NULL, NULL);
// DefineCustomIntVariable(
// "neon.safekeeper_reconnect_timeout",
// "Timeout for reconnecting to offline wal acceptor.",
// NULL,
// &wal_acceptor_reconnect_timeout,
// 1000, 0, INT_MAX, /* default, min, max */
// PGC_SIGHUP, /* context */
// GUC_UNIT_MS, /* flags */
// NULL, NULL, NULL);
// DefineCustomIntVariable(
// "neon.safekeeper_connect_timeout",
// "Timeout for connection establishement and it's maintenance against safekeeper",
// NULL,
// &wal_acceptor_connection_timeout,
// 5000, 0, INT_MAX,
// PGC_SIGHUP,
// GUC_UNIT_MS,
// NULL, NULL, NULL);
// }
/*
* Get latest redo apply position.
*
* Exported to allow WALReceiver to read the pointer directly.
*/
XLogRecPtr
GetXLogReplayRecPtr(void)
{
// TODO
return 0;
}
/*
* GetFlushRecPtr -- Returns the current flush position, ie, the last WAL
* position known to be fsync'd to disk.
*/
XLogRecPtr
GetFlushRecPtr(void)
{
// TODO:
return 0;
}
/*
* RedoStartLsn is set only once by startup process, locking is not required
* after its exit.
*/
XLogRecPtr
GetRedoStartLsn(void)
{
// TODO:
return 0;
}
TimestampTz
GetCurrentTimestamp(void)
{
// TODO:
return 0;
}
/* typedef in latch.h */
struct WaitEventSet
{
};
typedef struct WaitEventSet WaitEventSet;
typedef struct WaitEvent
{
int pos; /* position in the event data structure */
uint32 events; /* triggered events */
pgsocket fd; /* socket fd associated with event */
void *user_data; /* pointer provided in AddWaitEventToSet */
} WaitEvent;
int WaitEventSetWait(WaitEventSet *set, long timeout,
WaitEvent *occurred_events, int nevents,
uint32 wait_event_info)
{
// TODO:
return 0;
}
extern PGDLLIMPORT struct Latch *MyLatch;
int WaitLatchOrSocket(Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info)
{
// TODO:
return 0;
}
XLogReaderState *
XLogReaderAllocate()
{
// TODO:
return NULL;
}
uint64 systemId = 0;
/*
* This is the default value for wal_segment_size to be used when initdb is run
* without the --wal-segsize option. It must be a valid segment size.
*/
#define DEFAULT_XLOG_SEG_SIZE (16*1024*1024)
int wal_segment_size = DEFAULT_XLOG_SEG_SIZE;
WaitEventSet *CreateWaitEventSet(int nevents)
{
// TODO:
return NULL;
}
void FreeWaitEventSet(WaitEventSet *set)
{
// TODO:
}
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
Latch *latch, void *user_data)
{
// TODO:
return 0;
}
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
{
// TODO:
}
#define SizeOfXLogLongPHD 40UL
#define SizeOfXLogShortPHD 24UL
/*
* Error information from WALRead that both backend and frontend caller can
* process. Currently only errors from pg_pread can be reported.
*/
typedef struct WALReadError
{
int wre_errno; /* errno set by the last pg_pread() */
int wre_off; /* Offset we tried to read from. */
int wre_req; /* Bytes requested to be read. */
int wre_read; /* Bytes read by the last read(). */
} WALReadError;
bool WALRead(XLogReaderState *state,
char *buf, XLogRecPtr startptr, Size count,
TimeLineID tli, WALReadError *errinfo)
{
return false;
}
void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
{
}
void ResetLatch(Latch *latch)
{
}

Binary file not shown.

380
libs/walproposer/utils.c Normal file
View File

@@ -0,0 +1,380 @@
#include "deps.c"
#include "interface.c"
#include "walproposer.h"
/*
* These variables keep track of the state of the timeline we're currently
* sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
* the timeline is not the latest timeline on this server, and the server's
* history forked off from that timeline at sendTimeLineValidUpto.
*/
static TimeLineID sendTimeLine = 0;
static TimeLineID sendTimeLineNextTLI = 0;
static bool sendTimeLineIsHistoric = false;
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
/*
* Timestamp of last ProcessRepliesIfAny() that saw a reply from the
* standby. Set to 0 if wal_sender_timeout doesn't need to be active.
*/
static TimestampTz last_reply_timestamp = 0;
/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool waiting_for_ping_response = false;
static bool streamingDoneSending;
static bool streamingDoneReceiving;
/* Are we there yet? */
static bool WalSndCaughtUp = false;
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_STOPPING = false;
/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
/*
* This is set while we are streaming. When not set
* PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
* the main loop is responsible for checking got_STOPPING and terminating when
* it's set (after streaming any remaining WAL).
*/
static volatile sig_atomic_t replication_active = false;
typedef void (*WalSndSendDataCallback) (void);
static void WalSndLoop(WalSndSendDataCallback send_data);
#if PG_VERSION_NUM >= 150000
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
#else
static XLogRecPtr GetStandbyFlushRecPtr(void);
#endif
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
TimeLineID *tli_p);
/* END cloned file-level variables and functions from walsender.c */
int
CompareLsn(const void *a, const void *b)
{
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
if (lsn1 < lsn2)
return -1;
else if (lsn1 == lsn2)
return 0;
else
return 1;
}
/* Returns a human-readable string corresonding to the SafekeeperState
*
* The string should not be freed.
*
* The strings are intended to be used as a prefix to "state", e.g.:
*
* elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
*
* If this sort of phrasing doesn't fit the message, instead use something like:
*
* elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
*/
char *
FormatSafekeeperState(SafekeeperState state)
{
char *return_val = NULL;
switch (state)
{
case SS_OFFLINE:
return_val = "offline";
break;
case SS_CONNECTING_READ:
case SS_CONNECTING_WRITE:
return_val = "connecting";
break;
case SS_WAIT_EXEC_RESULT:
return_val = "receiving query result";
break;
case SS_HANDSHAKE_RECV:
return_val = "handshake (receiving)";
break;
case SS_VOTING:
return_val = "voting";
break;
case SS_WAIT_VERDICT:
return_val = "wait-for-verdict";
break;
case SS_SEND_ELECTED_FLUSH:
return_val = "send-announcement-flush";
break;
case SS_IDLE:
return_val = "idle";
break;
case SS_ACTIVE:
return_val = "active";
break;
}
Assert(return_val != NULL);
return return_val;
}
uint32
SafekeeperStateDesiredEvents(SafekeeperState state);
char *
FormatEvents(uint32 events);
/* Asserts that the provided events are expected for given safekeeper's state */
void
AssertEventsOkForState(uint32 events, Safekeeper *sk)
{
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
/*
* The events are in-line with what we're expecting, under two conditions:
* (a) if we aren't expecting anything, `events` has no read- or
* write-ready component. (b) if we are expecting something, there's
* overlap (i.e. `events & expected != 0`)
*/
bool events_ok_for_state; /* long name so the `Assert` is more
* clear later */
if (expected == WL_NO_EVENTS)
events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
else
events_ok_for_state = ((events & expected) != 0);
if (!events_ok_for_state)
{
/*
* To give a descriptive message in the case of failure, we use elog
* and then an assertion that's guaranteed to fail.
*/
elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state));
Assert(events_ok_for_state);
}
}
/* Returns the set of events a safekeeper in this state should be waiting on
*
* This will return WL_NO_EVENTS (= 0) for some events. */
uint32
SafekeeperStateDesiredEvents(SafekeeperState state)
{
uint32 result = WL_NO_EVENTS;
/* If the state doesn't have a modifier, we can check the base state */
switch (state)
{
/* Connecting states say what they want in the name */
case SS_CONNECTING_READ:
result = WL_SOCKET_READABLE;
break;
case SS_CONNECTING_WRITE:
result = WL_SOCKET_WRITEABLE;
break;
/* Reading states need the socket to be read-ready to continue */
case SS_WAIT_EXEC_RESULT:
case SS_HANDSHAKE_RECV:
case SS_WAIT_VERDICT:
result = WL_SOCKET_READABLE;
break;
/*
* Idle states use read-readiness as a sign that the connection
* has been disconnected.
*/
case SS_VOTING:
case SS_IDLE:
result = WL_SOCKET_READABLE;
break;
/*
* Flush states require write-ready for flushing. Active state
* does both reading and writing.
*
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
* should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
*/
case SS_SEND_ELECTED_FLUSH:
case SS_ACTIVE:
result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
break;
/* The offline state expects no events. */
case SS_OFFLINE:
result = WL_NO_EVENTS;
break;
default:
Assert(false);
break;
}
return result;
}
/* Returns a human-readable string corresponding to the event set
*
* If the events do not correspond to something set as the `events` field of a `WaitEvent`, the
* returned string may be meaingless.
*
* The string should not be freed. It should also not be expected to remain the same between
* function calls. */
char *
FormatEvents(uint32 events)
{
static char return_str[8];
/* Helper variable to check if there's extra bits */
uint32 all_flags = WL_LATCH_SET
| WL_SOCKET_READABLE
| WL_SOCKET_WRITEABLE
| WL_TIMEOUT
| WL_POSTMASTER_DEATH
| WL_EXIT_ON_PM_DEATH
| WL_SOCKET_CONNECTED;
/*
* The formatting here isn't supposed to be *particularly* useful -- it's
* just to give an sense of what events have been triggered without
* needing to remember your powers of two.
*/
return_str[0] = (events & WL_LATCH_SET) ? 'L' : '_';
return_str[1] = (events & WL_SOCKET_READABLE) ? 'R' : '_';
return_str[2] = (events & WL_SOCKET_WRITEABLE) ? 'W' : '_';
return_str[3] = (events & WL_TIMEOUT) ? 'T' : '_';
return_str[4] = (events & WL_POSTMASTER_DEATH) ? 'D' : '_';
return_str[5] = (events & WL_EXIT_ON_PM_DEATH) ? 'E' : '_';
return_str[5] = (events & WL_SOCKET_CONNECTED) ? 'C' : '_';
if (events & (~all_flags))
{
elog(WARNING, "Event formatting found unexpected component %d",
events & (~all_flags));
return_str[6] = '*';
return_str[7] = '\0';
}
else
return_str[6] = '\0';
return (char *) &return_str;
}
/*
* Convert a character which represents a hexadecimal digit to an integer.
*
* Returns -1 if the character is not a hexadecimal digit.
*/
static int
HexDecodeChar(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;
return -1;
}
/*
* Decode a hex string into a byte string, 2 hex chars per byte.
*
* Returns false if invalid characters are encountered; otherwise true.
*/
bool
HexDecodeString(uint8 *result, char *input, int nbytes)
{
int i;
for (i = 0; i < nbytes; ++i)
{
int n1 = HexDecodeChar(input[i * 2]);
int n2 = HexDecodeChar(input[i * 2 + 1]);
if (n1 < 0 || n2 < 0)
return false;
result[i] = n1 * 16 + n2;
}
return true;
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint32
pq_getmsgint32_le(StringInfo msg)
{
uint32 n32;
pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));
return n32;
}
/* --------------------------------
* pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint64
pq_getmsgint64_le(StringInfo msg)
{
uint64 n64;
pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));
return n64;
}
/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
void
pq_sendint32_le(StringInfo buf, uint32 i)
{
enlargeStringInfo(buf, sizeof(uint32));
memcpy(buf->data + buf->len, &i, sizeof(uint32));
buf->len += sizeof(uint32);
}
/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
void
pq_sendint64_le(StringInfo buf, uint64 i)
{
enlargeStringInfo(buf, sizeof(uint64));
memcpy(buf->data + buf->len, &i, sizeof(uint64));
buf->len += sizeof(uint64);
}
/* START of cloned functions from walsender.c */
/*
* Handle START_REPLICATION command.
*
* At the moment, this never returns, but an ereport(ERROR) will take us back
* to the main loop.
*/
void
StartProposerReplication(XLogRecPtr startpos)
{
for (;;)
{
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
WalProposerPoll();
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1 +1,557 @@
int WalProposerRust(void);
#ifndef __NEON_WALPROPOSER_H__
#define __NEON_WALPROPOSER_H__
#include "deps.c"
// #include "access/xlogdefs.h"
// #include "postgres.h"
// #include "port.h"
// #include "access/xlog_internal.h"
// #include "access/transam.h"
// #include "nodes/replnodes.h"
/*-------------------------------------------------------------------------
*
* uuid.h
* Header file for the "uuid" ADT. In C, we use the name pg_uuid_t,
* to avoid conflicts with any uuid_t type that might be defined by
* the system headers.
*
* Copyright (c) 2007-2021, PostgreSQL Global Development Group
*
* src/include/utils/uuid.h
*
*-------------------------------------------------------------------------
*/
#ifndef UUID_H
#define UUID_H
/* uuid size in bytes */
#define UUID_LEN 16
typedef struct pg_uuid_t
{
unsigned char data[UUID_LEN];
} pg_uuid_t;
/* fmgr interface macros */
#define UUIDPGetDatum(X) PointerGetDatum(X)
#define PG_RETURN_UUID_P(X) return UUIDPGetDatum(X)
#define DatumGetUUIDP(X) ((pg_uuid_t *) DatumGetPointer(X))
#define PG_GETARG_UUID_P(X) DatumGetUUIDP(PG_GETARG_DATUM(X))
#endif /* UUID_H */
// #include "replication/walreceiver.h"
#define SK_MAGIC 0xCafeCeefu
#define SK_PROTOCOL_VERSION 2
#define MAX_SAFEKEEPERS 32
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL
* message */
#define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender*
* message header */
#define XLOG_HDR_END_POS (1 + 8) /* offset of end position in wal sender*
* message header */
/*
* In the spirit of WL_SOCKET_READABLE and others, this corresponds to no events having occured,
* because all WL_* events are given flags equal to some (1 << i), starting from i = 0
*/
#define WL_NO_EVENTS 0
#define MAXCONNINFO 1024
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern bool am_wal_proposer;
struct WalProposerConn; /* Defined in libpqwalproposer */
typedef struct WalProposerConn WalProposerConn;
struct WalMessage;
typedef struct WalMessage WalMessage;
extern char *neon_timeline_walproposer;
extern char *neon_tenant_walproposer;
extern char *neon_safekeeper_token_walproposer;
/* Possible return values from ReadPGAsync */
typedef enum
{
/* The full read was successful. buf now points to the data */
PG_ASYNC_READ_SUCCESS,
/*
* The read is ongoing. Wait until the connection is read-ready, then try
* again.
*/
PG_ASYNC_READ_TRY_AGAIN,
/* Reading failed. Check PQerrorMessage(conn) */
PG_ASYNC_READ_FAIL,
} PGAsyncReadResult;
/* Possible return values from WritePGAsync */
typedef enum
{
/* The write fully completed */
PG_ASYNC_WRITE_SUCCESS,
/*
* The write started, but you'll need to call PQflush some more times to
* finish it off. We just tried, so it's best to wait until the connection
* is read- or write-ready to try again.
*
* If it becomes read-ready, call PQconsumeInput and flush again. If it
* becomes write-ready, just call PQflush.
*/
PG_ASYNC_WRITE_TRY_FLUSH,
/* Writing failed. Check PQerrorMessage(conn) */
PG_ASYNC_WRITE_FAIL,
} PGAsyncWriteResult;
/*
* WAL safekeeper state, which is used to wait for some event.
*
* States are listed here in the order that they're executed.
*
* Most states, upon failure, will move back to SS_OFFLINE by calls to
* ResetConnection or ShutdownConnection.
*/
typedef enum
{
/*
* Does not have an active connection and will stay that way until further
* notice.
*
* Moves to SS_CONNECTING_WRITE by calls to ResetConnection.
*/
SS_OFFLINE,
/*
* Connecting states. "_READ" waits for the socket to be available for
* reading, "_WRITE" waits for writing. There's no difference in the code
* they execute when polled, but we have this distinction in order to
* recreate the event set in HackyRemoveWalProposerEvent.
*
* After the connection is made, "START_WAL_PUSH" query is sent.
*/
SS_CONNECTING_WRITE,
SS_CONNECTING_READ,
/*
* Waiting for the result of the "START_WAL_PUSH" command.
*
* After we get a successful result, sends handshake to safekeeper.
*/
SS_WAIT_EXEC_RESULT,
/*
* Executing the receiving half of the handshake. After receiving, moves
* to SS_VOTING.
*/
SS_HANDSHAKE_RECV,
/*
* Waiting to participate in voting, but a quorum hasn't yet been reached.
* This is an idle state - we do not expect AdvancePollState to be called.
*
* Moved externally by execution of SS_HANDSHAKE_RECV, when we received a
* quorum of handshakes.
*/
SS_VOTING,
/*
* Already sent voting information, waiting to receive confirmation from
* the node. After receiving, moves to SS_IDLE, if the quorum isn't
* reached yet.
*/
SS_WAIT_VERDICT,
/* Need to flush ProposerElected message. */
SS_SEND_ELECTED_FLUSH,
/*
* Waiting for quorum to send WAL. Idle state. If the socket becomes
* read-ready, the connection has been closed.
*
* Moves to SS_ACTIVE only by call to StartStreaming.
*/
SS_IDLE,
/*
* Active phase, when we acquired quorum and have WAL to send or feedback
* to read.
*/
SS_ACTIVE,
} SafekeeperState;
/* Consensus logical timestamp. */
typedef uint64 term_t;
/* neon storage node id */
typedef uint64 NNodeId;
/*
* Proposer <-> Acceptor messaging.
*/
/* Initial Proposer -> Acceptor message */
typedef struct ProposerGreeting
{
uint64 tag; /* message tag */
uint32 protocolVersion; /* proposer-safekeeper protocol version */
uint32 pgVersion;
pg_uuid_t proposerId;
uint64 systemId; /* Postgres system identifier */
uint8 timeline_id[16]; /* Neon timeline id */
uint8 tenant_id[16];
TimeLineID timeline;
uint32 walSegSize;
} ProposerGreeting;
typedef struct AcceptorProposerMessage
{
uint64 tag;
} AcceptorProposerMessage;
/*
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
*/
typedef struct AcceptorGreeting
{
AcceptorProposerMessage apm;
term_t term;
NNodeId nodeId;
} AcceptorGreeting;
/*
* Proposer -> Acceptor vote request.
*/
typedef struct VoteRequest
{
uint64 tag;
term_t term;
pg_uuid_t proposerId; /* for monitoring/debugging */
} VoteRequest;
/* Element of term switching chain. */
typedef struct TermSwitchEntry
{
term_t term;
XLogRecPtr lsn;
} TermSwitchEntry;
typedef struct TermHistory
{
uint32 n_entries;
TermSwitchEntry *entries;
} TermHistory;
/* Vote itself, sent from safekeeper to proposer */
typedef struct VoteResponse
{
AcceptorProposerMessage apm;
term_t term;
uint64 voteGiven;
/*
* Safekeeper flush_lsn (end of WAL) + history of term switches allow
* proposer to choose the most advanced one.
*/
XLogRecPtr flushLsn;
XLogRecPtr truncateLsn; /* minimal LSN which may be needed for*
* recovery of some safekeeper */
TermHistory termHistory;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
/*
* Proposer -> Acceptor message announcing proposer is elected and communicating
* epoch history to it.
*/
typedef struct ProposerElected
{
uint64 tag;
term_t term;
/* proposer will send since this point */
XLogRecPtr startStreamingAt;
/* history of term switches up to this proposer */
TermHistory *termHistory;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
} ProposerElected;
/*
* Header of request with WAL message sent from proposer to safekeeper.
*/
typedef struct AppendRequestHeader
{
uint64 tag;
term_t term; /* term of the proposer */
/*
* LSN since which current proposer appends WAL (begin_lsn of its first
* record); determines epoch switch point.
*/
XLogRecPtr epochStartLsn;
XLogRecPtr beginLsn; /* start position of message in WAL */
XLogRecPtr endLsn; /* end position of message in WAL */
XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */
/*
* minimal LSN which may be needed for recovery of some safekeeper (end
* lsn + 1 of last chunk streamed to everyone)
*/
XLogRecPtr truncateLsn;
pg_uuid_t proposerId; /* for monitoring/debugging */
} AppendRequestHeader;
typedef struct FullTransactionId
{
uint64 value;
} FullTransactionId;
/*
* Hot standby feedback received from replica
*/
typedef struct HotStandbyFeedback
{
TimestampTz ts;
FullTransactionId xmin;
FullTransactionId catalog_xmin;
} HotStandbyFeedback;
typedef struct ReplicationFeedback
{
/* current size of the timeline on pageserver */
uint64 currentClusterSize;
/* standby_status_update fields that safekeeper received from pageserver */
XLogRecPtr ps_writelsn;
XLogRecPtr ps_flushlsn;
XLogRecPtr ps_applylsn;
TimestampTz ps_replytime;
} ReplicationFeedback;
typedef struct WalproposerShmemState
{
ReplicationFeedback feedback;
term_t mineLastElectedTerm;
uint64 backpressureThrottlingTime;
} WalproposerShmemState;
/*
* Report safekeeper state to proposer
*/
typedef struct AppendResponse
{
AcceptorProposerMessage apm;
/*
* Current term of the safekeeper; if it is higher than proposer's, the
* compute is out of date.
*/
term_t term;
/* TODO: add comment */
XLogRecPtr flushLsn;
/* Safekeeper reports back his awareness about which WAL is committed, as */
/* this is a criterion for walproposer --sync mode exit */
XLogRecPtr commitLsn;
HotStandbyFeedback hs;
/* Feedback recieved from pageserver includes standby_status_update fields */
/* and custom neon feedback. */
/* This part of the message is extensible. */
ReplicationFeedback rf;
} AppendResponse;
/* ReplicationFeedback is extensible part of the message that is parsed separately */
/* Other fields are fixed part */
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
/*
* Descriptor of safekeeper
*/
typedef struct Safekeeper
{
char const *host;
char const *port;
/*
* connection string for connecting/reconnecting.
*
* May contain private information like password and should not be logged.
*/
char conninfo[MAXCONNINFO];
/*
* postgres protocol connection to the WAL acceptor
*
* Equals NULL only when state = SS_OFFLINE. Nonblocking is set once we
* reach SS_ACTIVE; not before.
*/
WalProposerConn *conn;
/*
* Temporary buffer for the message being sent to the safekeeper.
*/
StringInfoData outbuf;
/*
* WAL reader, allocated for each safekeeper.
*/
XLogReaderState *xlogreader;
/*
* Streaming will start here; must be record boundary.
*/
XLogRecPtr startStreamingAt;
bool flushWrite; /* set to true if we need to call AsyncFlush,*
* to flush pending messages */
XLogRecPtr streamingAt; /* current streaming position */
AppendRequestHeader appendRequest; /* request for sending to safekeeper */
int eventPos; /* position in wait event set. Equal to -1 if*
* no event */
SafekeeperState state; /* safekeeper state machine state */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
AcceptorGreeting greetResponse; /* acceptor greeting */
VoteResponse voteResponse; /* the vote */
AppendResponse appendResponse; /* feedback for master */
} Safekeeper;
extern void WalProposerSync(int argc, char *argv[]);
extern int WalProposerRust(void);
extern void WalProposerMain(Datum main_arg);
extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(void);
extern void ParseReplicationFeedbackMessage(StringInfo reply_message,
ReplicationFeedback *rf);
extern void StartProposerReplication(XLogRecPtr startpos);
extern Size WalproposerShmemSize(void);
extern bool WalproposerShmemInit(void);
extern void replication_feedback_set(ReplicationFeedback *rf);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
/* libpqwalproposer hooks & helper type */
/* Re-exported PostgresPollingStatusType */
typedef enum
{
WP_CONN_POLLING_FAILED = 0,
WP_CONN_POLLING_READING,
WP_CONN_POLLING_WRITING,
WP_CONN_POLLING_OK,
/*
* 'libpq-fe.h' still has PGRES_POLLING_ACTIVE, but says it's unused.
* We've removed it here to avoid clutter.
*/
} WalProposerConnectPollStatusType;
/* Re-exported and modified ExecStatusType */
typedef enum
{
/* We received a single CopyBoth result */
WP_EXEC_SUCCESS_COPYBOTH,
/*
* Any success result other than a single CopyBoth was received. The
* specifics of the result were already logged, but it may be useful to
* provide an error message indicating which safekeeper messed up.
*
* Do not expect PQerrorMessage to be appropriately set.
*/
WP_EXEC_UNEXPECTED_SUCCESS,
/*
* No result available at this time. Wait until read-ready, then call
* again. Internally, this is returned when PQisBusy indicates that
* PQgetResult would block.
*/
WP_EXEC_NEEDS_INPUT,
/* Catch-all failure. Check PQerrorMessage. */
WP_EXEC_FAILED,
} WalProposerExecStatusType;
/* Re-exported ConnStatusType */
typedef enum
{
WP_CONNECTION_OK,
WP_CONNECTION_BAD,
/*
* The original ConnStatusType has many more tags, but requests that they
* not be relied upon (except for displaying to the user). We don't need
* that extra functionality, so we collect them into a single tag here.
*/
WP_CONNECTION_IN_PROGRESS,
} WalProposerConnStatusType;
/* Re-exported PQerrorMessage */
extern char *walprop_error_message(WalProposerConn *conn);
/* Re-exported PQstatus */
extern WalProposerConnStatusType walprop_status(WalProposerConn *conn);
/* Re-exported PQconnectStart */
extern WalProposerConn * walprop_connect_start(char *conninfo);
/* Re-exported PQconectPoll */
extern WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn);
/* Blocking wrapper around PQsendQuery */
extern bool walprop_send_query(WalProposerConn *conn, char *query);
/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
extern WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn);
/* Re-exported PQsocket */
extern pgsocket walprop_socket(WalProposerConn *conn);
/* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */
extern int walprop_flush(WalProposerConn *conn);
/* Re-exported PQfinish */
extern void walprop_finish(WalProposerConn *conn);
/*
* Ergonomic wrapper around PGgetCopyData
*
* Reads a CopyData block from a safekeeper, setting *amount to the number
* of bytes returned.
*
* This function is allowed to assume certain properties specific to the
* protocol with the safekeepers, so it should not be used as-is for any
* other purpose.
*
* Note: If possible, using <AsyncRead> is generally preferred, because it
* performs a bit of extra checking work that's always required and is normally
* somewhat verbose.
*/
extern PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount);
/*
* Ergonomic wrapper around PQputCopyData + PQflush
*
* Starts to write a CopyData block to a safekeeper.
*
* For information on the meaning of return codes, refer to PGAsyncWriteResult.
*/
extern PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size);
/*
* Blocking equivalent to walprop_async_write_fn
*
* Returns 'true' if successful, 'false' on failure.
*/
extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size);
extern uint64 BackpressureThrottlingTime(void);
#endif /* __NEON_WALPROPOSER_H__ */