Run pgindent on walproposer.

author Arseny Sher
date 2023-09-29 13:16:48 +03:00
parent 8a0b3d7c32
commit a8a1cc5e80
3 changed files with 218 additions and 215 deletions

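The hunks below are purely mechanical reformatting. Two patterns account for most of the churn: pointer declarators are rebound to the variable name ("TermHistory * th" becomes "TermHistory *th", usually a sign that the typedef name is on the typedefs list this pgindent run was given), and block comments are rewrapped to pgindent's standard width. A minimal stand-alone illustration of the target style (a reference fragment, not code from the commit):

/*
 * Illustration only: the declaration style pgindent settles on once the
 * parameter types are known typedefs.  The trailing comments show the
 * pre-pgindent spelling visible in the hunks below.
 */
typedef struct TermHistory TermHistory; /* stand-in forward declarations */
typedef struct Safekeeper Safekeeper;
typedef unsigned long long term_t;      /* the real definition is uint64 */

static term_t GetHighestTerm(TermHistory *th);  /* was: TermHistory * th */
static term_t GetEpoch(Safekeeper *sk);         /* already in house style */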

@@ -28,7 +28,7 @@
* playing consensus game is impossible, so speculative 'let's just poll
* safekeepers, learn start LSN of future epoch and run basebackup'
* won't work.
*
*
* Both ways are implemented in walproposer_pg.c file. This file contains
* generic part of walproposer which can be used in both cases, but can also
* be used as an independent library.
@@ -57,7 +57,7 @@ static void RecvAcceptorGreeting(Safekeeper *sk);
static void SendVoteRequest(Safekeeper *sk);
static void RecvVoteResponse(Safekeeper *sk);
static void HandleElectedProposer(WalProposer *wp);
static term_t GetHighestTerm(TermHistory * th);
static term_t GetHighestTerm(TermHistory *th);
static term_t GetEpoch(Safekeeper *sk);
static void DetermineEpochStartLsn(WalProposer *wp);
static void SendProposerElected(Safekeeper *sk);
@@ -71,13 +71,13 @@ static XLogRecPtr CalculateMinFlushLsn(WalProposer *wp);
static XLogRecPtr GetAcknowledgedByQuorumWALPosition(WalProposer *wp);
static void HandleSafekeeperResponse(WalProposer *wp);
static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size);
static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg);
static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg);
static bool BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState success_state);
static bool AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_state);
static bool AsyncFlush(Safekeeper *sk);
static int CompareLsn(const void *a, const void *b);
static int CompareLsn(const void *a, const void *b);
static char *FormatSafekeeperState(SafekeeperState state);
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
static char *FormatEvents(uint32 events);
@@ -88,7 +88,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
char *sep;
char *port;
WalProposer *wp;
wp = palloc0(sizeof(WalProposer));
wp->config = config;
wp->api = api;
@@ -116,7 +116,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
{
Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers];
int written = 0;
int written = 0;
written = snprintf((char *) &sk->conninfo, MAXCONNINFO,
"host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
@@ -198,7 +198,7 @@ WalProposerPoll(WalProposer *wp)
/* Exit loop if latch is set (we got new WAL) */
if ((rc == 1 && events & WL_LATCH_SET))
break;
/*
* If the event contains something that one of our safekeeper states
* was waiting for, we'll advance its state.
@@ -215,15 +215,16 @@ WalProposerPoll(WalProposer *wp)
*/
ReconnectSafekeepers(wp);
if (rc == 0) /* timeout expired */
if (rc == 0) /* timeout expired */
{
/*
* Ensure flushrecptr is set to a recent value. This fixes a case
* where we've not been notified of new WAL records when we were
* planning on consuming them.
*/
if (!wp->config->syncSafekeepers) {
XLogRecPtr flushed = wp->api.get_flush_rec_ptr();
if (!wp->config->syncSafekeepers)
{
XLogRecPtr flushed = wp->api.get_flush_rec_ptr();
if (flushed > wp->availableLsn)
break;
@@ -574,6 +575,7 @@ HandleConnectionEvent(Safekeeper *sk)
elog(LOG, "connected with node %s:%s", sk->host,
sk->port);
sk->latestMsgReceivedAt = wp->api.get_current_timestamp();
/*
* We have to pick some event to update event set. We'll
* eventually need the socket to be readable, so we go with that.
@@ -712,7 +714,7 @@ RecvAcceptorGreeting(Safekeeper *sk)
* until later.
*/
sk->greetResponse.apm.tag = 'g';
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->greetResponse))
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse))
return;
elog(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port);
@@ -720,10 +722,10 @@ RecvAcceptorGreeting(Safekeeper *sk)
/* Protocol is all good, move to voting. */
sk->state = SS_VOTING;
/*
/*
* Note: it would be better to track the counter on per safekeeper basis,
* but at worst walproposer would restart with 'term rejected', so leave as
* is for now.
* but at worst walproposer would restart with 'term rejected', so leave
* as is for now.
*/
++wp->n_connected;
if (wp->n_connected <= wp->quorum)
@@ -804,7 +806,7 @@ RecvVoteResponse(Safekeeper *sk)
WalProposer *wp = sk->wp;
sk->voteResponse.apm.tag = 'v';
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->voteResponse))
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->voteResponse))
return;
elog(LOG,
@@ -915,7 +917,7 @@ HandleElectedProposer(WalProposer *wp)
/* latest term in TermHistory, or 0 is there is no entries */
static term_t
GetHighestTerm(TermHistory * th)
GetHighestTerm(TermHistory *th)
{
return th->n_entries > 0 ? th->entries[th->n_entries - 1].term : 0;
}
@@ -1042,7 +1044,7 @@ DetermineEpochStartLsn(WalProposer *wp)
*/
if (!wp->config->syncSafekeepers)
{
WalproposerShmemState * walprop_shared = wp->api.get_shmem_state();
WalproposerShmemState *walprop_shared = wp->api.get_shmem_state();
/*
* Basebackup LSN always points to the beginning of the record (not
@@ -1244,7 +1246,7 @@ BroadcastAppendRequest(WalProposer *wp)
}
static void
PrepareAppendRequest(WalProposer *wp, AppendRequestHeader * req, XLogRecPtr beginLsn, XLogRecPtr endLsn)
PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr beginLsn, XLogRecPtr endLsn)
{
Assert(endLsn >= beginLsn);
req->tag = 'a';
@@ -1355,9 +1357,9 @@ SendAppendRequests(Safekeeper *sk)
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
/* wal_read will raise error on failure */
wp->api.wal_read(sk->xlogreader,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn);
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn);
sk->outbuf.len += req->endLsn - req->beginLsn;
writeResult = wp->api.conn_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len);
@@ -1419,7 +1421,7 @@ RecvAppendResponses(Safekeeper *sk)
* work until later.
*/
sk->appendResponse.apm.tag = 'a';
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->appendResponse))
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->appendResponse))
break;
ereport(DEBUG2,
@@ -1460,7 +1462,7 @@ RecvAppendResponses(Safekeeper *sk)
/* Parse a PageserverFeedback message, or the PageserverFeedback part of an AppendResponse */
void
ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback * rf)
ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback *rf)
{
uint8 nkeys;
int i;
@@ -1544,8 +1546,8 @@ static XLogRecPtr
CalculateMinFlushLsn(WalProposer *wp)
{
XLogRecPtr lsn = wp->n_safekeepers > 0
? wp->safekeeper[0].appendResponse.flushLsn
: InvalidXLogRecPtr;
? wp->safekeeper[0].appendResponse.flushLsn
: InvalidXLogRecPtr;
for (int i = 1; i < wp->n_safekeepers; i++)
{
@@ -1648,15 +1650,15 @@ HandleSafekeeperResponse(WalProposer *wp)
if (n_synced >= wp->quorum)
{
/* A quorum of safekeepers has been synced! */
/*
* Send empty message to broadcast latest truncateLsn to all safekeepers.
* This helps to finish next sync-safekeepers eailier, by skipping recovery
* step.
*
* We don't need to wait for response because it doesn't affect correctness,
* and TCP should be able to deliver the message to safekeepers in case of
* network working properly.
* Send empty message to broadcast latest truncateLsn to all
* safekeepers. This helps to finish next sync-safekeepers
* eailier, by skipping recovery step.
*
* We don't need to wait for response because it doesn't affect
* correctness, and TCP should be able to deliver the message to
* safekeepers in case of network working properly.
*/
BroadcastAppendRequest(wp);
@@ -1705,7 +1707,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
* failed, a warning is emitted and the connection is reset.
*/
static bool
AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg)
AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
{
WalProposer *wp = sk->wp;
@@ -2072,12 +2074,12 @@ FormatEvents(uint32 events)
/* Helper variable to check if there's extra bits */
uint32 all_flags = WL_LATCH_SET
| WL_SOCKET_READABLE
| WL_SOCKET_WRITEABLE
| WL_TIMEOUT
| WL_POSTMASTER_DEATH
| WL_EXIT_ON_PM_DEATH
| WL_SOCKET_CONNECTED;
| WL_SOCKET_READABLE
| WL_SOCKET_WRITEABLE
| WL_TIMEOUT
| WL_POSTMASTER_DEATH
| WL_EXIT_ON_PM_DEATH
| WL_SOCKET_CONNECTED;
/*
* The formatting here isn't supposed to be *particularly* useful -- it's

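Two of the prototypes touched above, CompareLsn() and GetAcknowledgedByQuorumWALPosition(), point at the usual rule of committing once a quorum of safekeepers has flushed, with quorum defined later in this commit as (n_safekeepers / 2) + 1. The standalone sketch below shows one straightforward way such a position can be computed; the helper names, the stand-in XLogRecPtr typedef and the main() driver are invented for illustration and are not the code from this file.

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for the PostgreSQL typedef */

static int
CompareLsnExample(const void *a, const void *b)
{
    XLogRecPtr lsn1 = *(const XLogRecPtr *) a;
    XLogRecPtr lsn2 = *(const XLogRecPtr *) b;

    if (lsn1 < lsn2)
        return -1;
    if (lsn1 > lsn2)
        return 1;
    return 0;
}

/* Position flushed by at least 'quorum' of the 'n' safekeepers. */
static XLogRecPtr
QuorumAckedLsnExample(XLogRecPtr *flushLsns, int n, int quorum)
{
    /* ascending sort; index n - quorum then holds the quorum-th largest */
    qsort(flushLsns, n, sizeof(XLogRecPtr), CompareLsnExample);
    return flushLsns[n - quorum];
}

int
main(void)
{
    XLogRecPtr flush[] = {0x300, 0x100, 0x200};
    int n = 3;
    int quorum = n / 2 + 1;     /* matches "(n_safekeepers / 2) + 1" */

    printf("quorum-acked LSN: %llX\n",
           (unsigned long long) QuorumAckedLsnExample(flush, n, quorum));
    return 0;
}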

@@ -38,7 +38,7 @@ typedef enum
PG_ASYNC_READ_TRY_AGAIN,
/* Reading failed. Check PQerrorMessage(conn) */
PG_ASYNC_READ_FAIL,
} PGAsyncReadResult;
} PGAsyncReadResult;
/* Possible return values from WritePGAsync */
typedef enum
@@ -57,7 +57,7 @@ typedef enum
PG_ASYNC_WRITE_TRY_FLUSH,
/* Writing failed. Check PQerrorMessage(conn) */
PG_ASYNC_WRITE_FAIL,
} PGAsyncWriteResult;
} PGAsyncWriteResult;
/*
* WAL safekeeper state, which is used to wait for some event.
@@ -133,7 +133,7 @@ typedef enum
* to read.
*/
SS_ACTIVE,
} SafekeeperState;
} SafekeeperState;
/* Consensus logical timestamp. */
typedef uint64 term_t;
@@ -157,12 +157,12 @@ typedef struct ProposerGreeting
uint8 tenant_id[16];
TimeLineID timeline;
uint32 walSegSize;
} ProposerGreeting;
} ProposerGreeting;
typedef struct AcceptorProposerMessage
{
uint64 tag;
} AcceptorProposerMessage;
} AcceptorProposerMessage;
/*
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
@@ -172,7 +172,7 @@ typedef struct AcceptorGreeting
AcceptorProposerMessage apm;
term_t term;
NNodeId nodeId;
} AcceptorGreeting;
} AcceptorGreeting;
/*
* Proposer -> Acceptor vote request.
@@ -182,20 +182,20 @@ typedef struct VoteRequest
uint64 tag;
term_t term;
pg_uuid_t proposerId; /* for monitoring/debugging */
} VoteRequest;
} VoteRequest;
/* Element of term switching chain. */
typedef struct TermSwitchEntry
{
term_t term;
XLogRecPtr lsn;
} TermSwitchEntry;
} TermSwitchEntry;
typedef struct TermHistory
{
uint32 n_entries;
TermSwitchEntry *entries;
} TermHistory;
} TermHistory;
/* Vote itself, sent from safekeeper to proposer */
typedef struct VoteResponse
@@ -213,7 +213,7 @@ typedef struct VoteResponse
* recovery of some safekeeper */
TermHistory termHistory;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
} VoteResponse;
/*
* Proposer -> Acceptor message announcing proposer is elected and communicating
@@ -229,7 +229,7 @@ typedef struct ProposerElected
TermHistory *termHistory;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
} ProposerElected;
} ProposerElected;
/*
* Header of request with WAL message sent from proposer to safekeeper.
@@ -254,7 +254,7 @@ typedef struct AppendRequestHeader
*/
XLogRecPtr truncateLsn;
pg_uuid_t proposerId; /* for monitoring/debugging */
} AppendRequestHeader;
} AppendRequestHeader;
/*
* Hot standby feedback received from replica
@@ -264,7 +264,7 @@ typedef struct HotStandbyFeedback
TimestampTz ts;
FullTransactionId xmin;
FullTransactionId catalog_xmin;
} HotStandbyFeedback;
} HotStandbyFeedback;
typedef struct PageserverFeedback
{
@@ -275,7 +275,7 @@ typedef struct PageserverFeedback
XLogRecPtr disk_consistent_lsn;
XLogRecPtr remote_consistent_lsn;
TimestampTz replytime;
} PageserverFeedback;
} PageserverFeedback;
typedef struct WalproposerShmemState
{
@@ -283,7 +283,7 @@ typedef struct WalproposerShmemState
PageserverFeedback feedback;
term_t mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
} WalproposerShmemState;
} WalproposerShmemState;
/*
* Report safekeeper state to proposer
@@ -307,7 +307,7 @@ typedef struct AppendResponse
/* and custom neon feedback. */
/* This part of the message is extensible. */
PageserverFeedback rf;
} AppendResponse;
} AppendResponse;
/* PageserverFeedback is extensible part of the message that is parsed separately */
/* Other fields are fixed part */
@@ -331,7 +331,7 @@ typedef struct Safekeeper
*
* May contain private information like password and should not be logged.
*/
char conninfo[MAXCONNINFO];
char conninfo[MAXCONNINFO];
/*
* postgres protocol connection to the WAL acceptor
@@ -364,7 +364,7 @@ typedef struct Safekeeper
int eventPos; /* position in wait event set. Equal to -1 if*
* no event */
SafekeeperState state; /* safekeeper state machine state */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
AcceptorGreeting greetResponse; /* acceptor greeting */
VoteResponse voteResponse; /* the vote */
AppendResponse appendResponse; /* feedback for master */
@@ -382,7 +382,7 @@ typedef enum
* 'libpq-fe.h' still has PGRES_POLLING_ACTIVE, but says it's unused.
* We've removed it here to avoid clutter.
*/
} WalProposerConnectPollStatusType;
} WalProposerConnectPollStatusType;
/* Re-exported and modified ExecStatusType */
typedef enum
@@ -407,7 +407,7 @@ typedef enum
WP_EXEC_NEEDS_INPUT,
/* Catch-all failure. Check PQerrorMessage. */
WP_EXEC_FAILED,
} WalProposerExecStatusType;
} WalProposerExecStatusType;
/* Re-exported ConnStatusType */
typedef enum
@@ -421,7 +421,7 @@ typedef enum
* that extra functionality, so we collect them into a single tag here.
*/
WP_CONNECTION_IN_PROGRESS,
} WalProposerConnStatusType;
} WalProposerConnStatusType;
/*
* Collection of hooks for walproposer, to call postgres functions,
@@ -430,185 +430,187 @@ typedef enum
typedef struct walproposer_api
{
/*
* Get WalproposerShmemState. This is used to store information about
* last elected term.
* Get WalproposerShmemState. This is used to store information about last
* elected term.
*/
WalproposerShmemState * (*get_shmem_state) (void);
WalproposerShmemState *(*get_shmem_state) (void);
/*
* Start receiving notifications about new WAL. This is an infinite loop
* which calls WalProposerBroadcast() and WalProposerPoll() to send the WAL.
* which calls WalProposerBroadcast() and WalProposerPoll() to send the
* WAL.
*/
void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos);
void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos);
/* Get pointer to the latest available WAL. */
XLogRecPtr (*get_flush_rec_ptr) (void);
XLogRecPtr (*get_flush_rec_ptr) (void);
/* Get current time. */
TimestampTz (*get_current_timestamp) (void);
TimestampTz (*get_current_timestamp) (void);
/* Get postgres timeline. */
TimeLineID (*get_timeline_id) (void);
TimeLineID (*get_timeline_id) (void);
/* Current error message, aka PQerrorMessage. */
char * (*conn_error_message) (WalProposerConn * conn);
char *(*conn_error_message) (WalProposerConn *conn);
/* Connection status, aka PQstatus. */
WalProposerConnStatusType (*conn_status) (WalProposerConn * conn);
WalProposerConnStatusType (*conn_status) (WalProposerConn *conn);
/* Start the connection, aka PQconnectStart. */
WalProposerConn * (*conn_connect_start) (char *conninfo);
WalProposerConn *(*conn_connect_start) (char *conninfo);
/* Poll an asynchronous connection, aka PQconnectPoll. */
WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn * conn);
WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn *conn);
/* Send a blocking SQL query, aka PQsendQuery. */
bool (*conn_send_query) (WalProposerConn * conn, char * query);
bool (*conn_send_query) (WalProposerConn *conn, char *query);
/* Read the query result, aka PQgetResult. */
WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn * conn);
WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn *conn);
/* Flush buffer to the network, aka PQflush. */
int (*conn_flush) (WalProposerConn * conn);
int (*conn_flush) (WalProposerConn *conn);
/* Close the connection, aka PQfinish. */
void (*conn_finish) (WalProposerConn * conn);
void (*conn_finish) (WalProposerConn *conn);
/* Try to read CopyData message, aka PQgetCopyData. */
PGAsyncReadResult (*conn_async_read) (WalProposerConn * conn, char **buf, int *amount);
PGAsyncReadResult (*conn_async_read) (WalProposerConn *conn, char **buf, int *amount);
/* Try to write CopyData message, aka PQputCopyData. */
PGAsyncWriteResult (*conn_async_write) (WalProposerConn * conn, void const *buf, size_t size);
PGAsyncWriteResult (*conn_async_write) (WalProposerConn *conn, void const *buf, size_t size);
/* Blocking CopyData write, aka PQputCopyData + PQflush. */
bool (*conn_blocking_write) (WalProposerConn * conn, void const *buf, size_t size);
bool (*conn_blocking_write) (WalProposerConn *conn, void const *buf, size_t size);
/* Download WAL from startpos to endpos and make it available locally. */
bool (*recovery_download) (Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
/* Read WAL from disk to buf. */
void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count);
void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count);
/* Allocate WAL reader. */
XLogReaderState * (*wal_reader_allocate) (void);
XLogReaderState *(*wal_reader_allocate) (void);
/* Deallocate event set. */
void (*free_event_set) (void);
void (*free_event_set) (void);
/* Initialize event set. */
void (*init_event_set) (int n_safekeepers);
void (*init_event_set) (int n_safekeepers);
/* Update events for an existing safekeeper connection. */
void (*update_event_set) (Safekeeper * sk, uint32 events);
void (*update_event_set) (Safekeeper *sk, uint32 events);
/* Add a new safekeeper connection to the event set. */
void (*add_safekeeper_event_set) (Safekeeper * sk, uint32 events);
void (*add_safekeeper_event_set) (Safekeeper *sk, uint32 events);
/*
* Wait until some event happens:
* - timeout is reached
* - socket event for safekeeper connection
* - new WAL is available
*
* Returns 0 if timeout is reached, 1 if some event happened. Updates events mask
* to indicate events and sets sk to the safekeeper which has an event.
* Wait until some event happens: - timeout is reached - socket event for
* safekeeper connection - new WAL is available
*
* Returns 0 if timeout is reached, 1 if some event happened. Updates
* events mask to indicate events and sets sk to the safekeeper which has
* an event.
*/
int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events);
int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events);
/* Read random bytes. */
bool (*strong_random) (void *buf, size_t len);
bool (*strong_random) (void *buf, size_t len);
/*
* Get a basebackup LSN. Used to cross-validate with the latest available
* LSN on the safekeepers.
*/
XLogRecPtr (*get_redo_start_lsn) (void);
XLogRecPtr (*get_redo_start_lsn) (void);
/*
* Finish sync safekeepers with the given LSN. This function should not
* return and should exit the program.
*/
void (*finish_sync_safekeepers) (XLogRecPtr lsn);
void (*finish_sync_safekeepers) (XLogRecPtr lsn);
/*
* Called after every new message from the safekeeper. Used to propagate
* backpressure feedback and to confirm WAL persistence (has been commited on
* the quorum of safekeepers).
* backpressure feedback and to confirm WAL persistence (has been commited
* on the quorum of safekeepers).
*/
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
/*
* Called on peer_horizon_lsn updates. Used to advance replication slot
* and to free up disk space by deleting unnecessary WAL.
*/
void (*confirm_wal_streamed) (XLogRecPtr lsn);
void (*confirm_wal_streamed) (XLogRecPtr lsn);
} walproposer_api;
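The struct that closes above is essentially a vtable: every interaction with the outside world (shared memory, timestamps, WAL reads, libpq, wait events) goes through a callback, which is what lets the same consensus core run inside the postgres extension (walproposer_pg.c) or, as the header comment in the first hunk puts it, "as an independent library". The toy program below shows the general pattern with a deliberately tiny, hypothetical table; it is not the real walproposer_api, and all names in it are made up.

#include <stdio.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in typedefs for the demo only */
typedef int64_t TimestampTz;

typedef struct demo_api
{
    XLogRecPtr  (*get_flush_rec_ptr) (void);
    TimestampTz (*get_current_timestamp) (void);
} demo_api;

/* test-harness implementations of the callbacks */
static XLogRecPtr
harness_get_flush_rec_ptr(void)
{
    return 0x1000000;
}

static TimestampTz
harness_get_current_timestamp(void)
{
    return 0;
}

/* generic code only ever reaches the environment through the table */
static void
demo_poll(const demo_api *api)
{
    printf("flushed up to %llX at ts %lld\n",
           (unsigned long long) api->get_flush_rec_ptr(),
           (long long) api->get_current_timestamp());
}

int
main(void)
{
    demo_api api = {
        .get_flush_rec_ptr = harness_get_flush_rec_ptr,
        .get_current_timestamp = harness_get_current_timestamp,
    };

    demo_poll(&api);
    return 0;
}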
/*
* Configuration of the WAL proposer.
*/
typedef struct WalProposerConfig {
typedef struct WalProposerConfig
{
/* hex-encoded TenantId cstr */
char *neon_tenant;
char *neon_tenant;
/* hex-encoded TimelineId cstr */
char *neon_timeline;
char *neon_timeline;
/*
* Comma-separated list of safekeepers, in the following format:
* host1:port1,host2:port2,host3:port3
*
*
* This cstr should be editable.
*/
char *safekeepers_list;
char *safekeepers_list;
/*
* WalProposer reconnects to offline safekeepers once in this interval.
* Time is in milliseconds.
*/
int safekeeper_reconnect_timeout;
int safekeeper_reconnect_timeout;
/*
* WalProposer terminates the connection if it doesn't receive any message
* from the safekeeper in this interval. Time is in milliseconds.
*/
int safekeeper_connection_timeout;
int safekeeper_connection_timeout;
/*
* WAL segment size. Will be passed to safekeepers in greet request.
* Also used to detect page headers.
* WAL segment size. Will be passed to safekeepers in greet request. Also
* used to detect page headers.
*/
int wal_segment_size;
int wal_segment_size;
/*
* If safekeeper was started in sync mode, walproposer will not subscribe
* for new WAL and will exit when quorum of safekeepers will be synced
* to the latest available LSN.
* for new WAL and will exit when quorum of safekeepers will be synced to
* the latest available LSN.
*/
bool syncSafekeepers;
bool syncSafekeepers;
/* Will be passed to safekeepers in greet request. */
uint64 systemId;
uint64 systemId;
} WalProposerConfig;
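For orientation, the configuration above could plausibly be filled in as follows. The field meanings come from the comments in the struct; the 1000 ms and 10000 ms timeouts mirror the wal_acceptor_reconnect_timeout / wal_acceptor_connection_timeout defaults visible later in this commit, and every other literal is an invented placeholder. This assumes the header defining WalProposerConfig is included, so it is a fragment rather than a standalone program.

/*
 * Placeholder values only.  The safekeeper list lives in a writable array
 * because the comment above says the cstr should be editable.
 */
static char example_safekeepers[] = "sk1:5454,sk2:5454,sk3:5454";

static WalProposerConfig example_config = {
    .neon_tenant = "0000000000000000000000000000beef",   /* hex TenantId */
    .neon_timeline = "0000000000000000000000000000cafe", /* hex TimelineId */
    .safekeepers_list = example_safekeepers,
    .safekeeper_reconnect_timeout = 1000,   /* ms */
    .safekeeper_connection_timeout = 10000, /* ms */
    .wal_segment_size = 16 * 1024 * 1024,   /* the usual 16 MB segments */
    .syncSafekeepers = false,               /* stream WAL, not sync-only mode */
    .systemId = 0,
};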
/*
* WAL proposer state.
*/
typedef struct WalProposer {
typedef struct WalProposer
{
WalProposerConfig *config;
int n_safekeepers;
int n_safekeepers;
/* (n_safekeepers / 2) + 1 */
int quorum;
int quorum;
Safekeeper safekeeper[MAX_SAFEKEEPERS];
Safekeeper safekeeper[MAX_SAFEKEEPERS];
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;
XLogRecPtr availableLsn;
/* last commitLsn broadcasted to safekeepers */
XLogRecPtr lastSentCommitLsn;
XLogRecPtr lastSentCommitLsn;
ProposerGreeting greetRequest;
@@ -616,42 +618,45 @@ typedef struct WalProposer {
VoteRequest voteRequest;
/*
* Minimal LSN which may be needed for recovery of some safekeeper,
* record-aligned (first record which might not yet received by someone).
* Minimal LSN which may be needed for recovery of some safekeeper,
* record-aligned (first record which might not yet received by someone).
*/
XLogRecPtr truncateLsn;
XLogRecPtr truncateLsn;
/*
* Term of the proposer. We want our term to be highest and unique,
* so we collect terms from safekeepers quorum, choose max and +1.
* After that our term is fixed and must not change. If we observe
* that some safekeeper has higher term, it means that we have another
* running compute, so we must stop immediately.
* Term of the proposer. We want our term to be highest and unique, so we
* collect terms from safekeepers quorum, choose max and +1. After that
* our term is fixed and must not change. If we observe that some
* safekeeper has higher term, it means that we have another running
* compute, so we must stop immediately.
*/
term_t propTerm;
term_t propTerm;
/* term history of the proposer */
TermHistory propTermHistory;
/* epoch start lsn of the proposer */
XLogRecPtr propEpochStartLsn;
XLogRecPtr propEpochStartLsn;
/* Most advanced acceptor epoch */
term_t donorEpoch;
term_t donorEpoch;
/* Most advanced acceptor */
int donor;
int donor;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
XLogRecPtr timelineStartLsn;
/* number of votes collected from safekeepers */
int n_votes;
int n_votes;
/* number of successful connections over the lifetime of walproposer */
int n_connected;
int n_connected;
/* Timestamp of the last reconnection attempt. Related to config->safekeeper_reconnect_timeout */
/*
* Timestamp of the last reconnection attempt. Related to
* config->safekeeper_reconnect_timeout
*/
TimestampTz last_reconnect_attempt;
walproposer_api api;
@@ -662,6 +667,6 @@ extern void WalProposerStart(WalProposer *wp);
extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(WalProposer *wp);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
PageserverFeedback *rf);
PageserverFeedback *rf);
#endif /* __NEON_WALPROPOSER_H__ */

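One aside before the next file: the WalProposer struct above spells out the election rule ("collect terms from safekeepers quorum, choose max and +1"), and GetHighestTerm() in the first file returns the last entry of a TermHistory. The sketch below restates that rule as standalone code; the function name and the driver are invented, and term_t is redeclared locally only so the example compiles on its own.

#include <stdio.h>
#include <stdint.h>

typedef uint64_t term_t;        /* "Consensus logical timestamp." */

static term_t
ProposeNextTermExample(const term_t *quorum_terms, int n_votes)
{
    term_t max_term = 0;

    for (int i = 0; i < n_votes; i++)
        if (quorum_terms[i] > max_term)
            max_term = quorum_terms[i];

    /* our term must be strictly higher than anything the quorum has seen */
    return max_term + 1;
}

int
main(void)
{
    term_t terms[] = {3, 5, 4};  /* terms reported by a quorum of three */

    printf("propTerm = %llu\n",
           (unsigned long long) ProposeNextTermExample(terms, 3));
    return 0;
}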

@@ -58,7 +58,7 @@ int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
static AppendResponse quorumFeedback;
static WalproposerShmemState * walprop_shared;
static WalproposerShmemState *walprop_shared;
static WalProposerConfig walprop_config;
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static const walproposer_api walprop_pg;
@@ -87,8 +87,8 @@ static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
static void WalSndLoop(WalProposer *wp);
static void XLogBroadcastWalProposer(WalProposer *wp);
static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalPropClose(XLogRecPtr recptr);
static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalPropClose(XLogRecPtr recptr);
static void
init_walprop_config(bool syncSafekeepers)
@@ -130,7 +130,7 @@ PGDLLEXPORT void
WalProposerMain(Datum main_arg)
{
WalProposer *wp;
init_walprop_config(false);
walprop_pg_init_bgworker();
walprop_pg_load_libpqwalreceiver();
@@ -277,14 +277,14 @@ backpressure_throttling_impl(void)
TimestampTz start,
stop;
bool retry = PrevProcessInterruptsCallback
? PrevProcessInterruptsCallback()
: false;
? PrevProcessInterruptsCallback()
: false;
/*
* Don't throttle read only transactions or wal sender.
* Do throttle CREATE INDEX CONCURRENTLY, however. It performs some
* stages outside a transaction, even though it writes a lot of WAL.
* Check PROC_IN_SAFE_IC flag to cover that case.
* Don't throttle read only transactions or wal sender. Do throttle CREATE
* INDEX CONCURRENTLY, however. It performs some stages outside a
* transaction, even though it writes a lot of WAL. Check PROC_IN_SAFE_IC
* flag to cover that case.
*/
if (am_walsender
|| (!(MyProc->statusFlags & PROC_IN_SAFE_IC)
@@ -386,7 +386,7 @@ walprop_pg_get_shmem_state(void)
}
void
replication_feedback_set(PageserverFeedback * rf)
replication_feedback_set(PageserverFeedback *rf)
{
SpinLockAcquire(&walprop_shared->mutex);
memcpy(&walprop_shared->feedback, rf, sizeof(PageserverFeedback));
@@ -455,11 +455,11 @@ walprop_pg_init_standalone_sync_safekeepers(void)
if (pipe(postmaster_alive_fds) < 0)
ereport(FATAL,
(errcode_for_file_access(),
errmsg_internal("could not create pipe to monitor postmaster death: %m")));
errmsg_internal("could not create pipe to monitor postmaster death: %m")));
if (fcntl(postmaster_alive_fds[POSTMASTER_FD_WATCH], F_SETFL, O_NONBLOCK) == -1)
ereport(FATAL,
(errcode_for_socket_access(),
errmsg_internal("could not set postmaster death monitoring pipe to nonblocking mode: %m")));
errmsg_internal("could not set postmaster death monitoring pipe to nonblocking mode: %m")));
ChangeToDataDir();
@@ -471,8 +471,8 @@ walprop_pg_init_standalone_sync_safekeepers(void)
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not create directory \"%s\": %m",
XLOGDIR)));
errmsg("could not create directory \"%s\": %m",
XLOGDIR)));
exit(1);
}
}
@@ -544,8 +544,7 @@ struct WalProposerConn
{
PGconn *pg_conn;
bool is_nonblocking; /* whether the connection is non-blocking */
char *recvbuf; /* last received data from
* walprop_async_read */
char *recvbuf; /* last received data from walprop_async_read */
};
/* Helper function */
@@ -593,18 +592,16 @@ walprop_connect_start(char *conninfo)
const char *keywords[3];
const char *values[3];
int n;
char *password = neon_auth_token;
char *password = neon_auth_token;
/*
* Connect using the given connection string. If the
* NEON_AUTH_TOKEN environment variable was set, use that as
* the password.
* Connect using the given connection string. If the NEON_AUTH_TOKEN
* environment variable was set, use that as the password.
*
* The connection options are parsed in the order they're given, so
* when we set the password before the connection string, the
* connection string can override the password from the env variable.
* Seems useful, although we don't currently use that capability
* anywhere.
* The connection options are parsed in the order they're given, so when
* we set the password before the connection string, the connection string
* can override the password from the env variable. Seems useful, although
* we don't currently use that capability anywhere.
*/
n = 0;
if (password)
@@ -1018,19 +1015,18 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
#endif
/*
* When we first start replication the standby will be behind the
* primary. For some applications, for example synchronous
* replication, it is important to have a clear state for this initial
* catchup mode, so we can trigger actions when we change streaming
* state later. We may stay in this state for a long time, which is
* exactly why we want to be able to monitor whether or not we are
* still here.
* When we first start replication the standby will be behind the primary.
* For some applications, for example synchronous replication, it is
* important to have a clear state for this initial catchup mode, so we
* can trigger actions when we change streaming state later. We may stay
* in this state for a long time, which is exactly why we want to be able
* to monitor whether or not we are still here.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
/*
* Don't allow a request to stream from a future point in WAL that
* hasn't been flushed to disk in this server yet.
* Don't allow a request to stream from a future point in WAL that hasn't
* been flushed to disk in this server yet.
*/
if (FlushPtr < cmd->startpoint)
{
@@ -1096,12 +1092,12 @@ XLogBroadcastWalProposer(WalProposer *wp)
/*
* Streaming the current timeline on a primary.
*
* Attempt to send all data that's already been written out and
* fsync'd to disk. We cannot go further than what's been written out
* given the current implementation of WALRead(). And in any case
* it's unsafe to send WAL that is not securely down to disk on the
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
* Attempt to send all data that's already been written out and fsync'd to
* disk. We cannot go further than what's been written out given the
* current implementation of WALRead(). And in any case it's unsafe to
* send WAL that is not securely down to disk on the primary: if the
* primary subsequently crashes and restarts, standbys must not have
* applied any WAL that got lost on the primary.
*/
#if PG_VERSION_NUM >= 150000
endptr = GetFlushRecPtr(NULL);
@@ -1167,12 +1163,12 @@ XLogBroadcastWalProposer(WalProposer *wp)
* Receive WAL from most advanced safekeeper
*/
static bool
WalProposerRecovery(Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos)
WalProposerRecovery(Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos)
{
char *err;
WalReceiverConn *wrconn;
WalRcvStreamOptions options;
char conninfo[MAXCONNINFO];
char conninfo[MAXCONNINFO];
if (!neon_auth_token)
{
@@ -1180,7 +1176,7 @@ WalProposerRecovery(Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, X
}
else
{
int written = 0;
int written = 0;
written = snprintf((char *) conninfo, MAXCONNINFO, "password=%s %s", neon_auth_token, sk->conninfo);
if (written > MAXCONNINFO || written < 0)
@@ -1390,11 +1386,11 @@ walprop_pg_wal_read(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size
WALReadError errinfo;
if (!WALRead(state,
buf,
startptr,
count,
walprop_pg_get_timeline_id(),
&errinfo))
buf,
startptr,
count,
walprop_pg_get_timeline_id(),
&errinfo))
{
WALReadRaiseError(&errinfo);
}
@@ -1432,7 +1428,7 @@ walprop_pg_init_event_set(int n_safekeepers)
}
static void
walprop_pg_update_event_set(Safekeeper * sk, uint32 events)
walprop_pg_update_event_set(Safekeeper *sk, uint32 events)
{
/* eventPos = -1 when we don't have an event */
Assert(sk->eventPos != -1);
@@ -1441,7 +1437,7 @@ walprop_pg_update_event_set(Safekeeper * sk, uint32 events)
}
static void
walprop_pg_add_safekeeper_event_set(Safekeeper * sk, uint32 events)
walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events)
{
sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk->conn), NULL, sk);
}
@@ -1462,22 +1458,21 @@ walprop_pg_wait_event_set(long timeout, Safekeeper **sk, uint32 *events)
#endif
/*
* Wait for a wait event to happen, or timeout:
* - Safekeeper socket can become available for READ or WRITE
* - Our latch got set, because
* * PG15-: We got woken up by a process triggering the WalSender
* * PG16+: WalSndCtl->wal_flush_cv was triggered
* Wait for a wait event to happen, or timeout: - Safekeeper socket can
* become available for READ or WRITE - Our latch got set, because *
* PG15-: We got woken up by a process triggering the WalSender * PG16+:
* WalSndCtl->wal_flush_cv was triggered
*/
rc = WaitEventSetWait(waitEvents, timeout,
&event, 1, WAIT_EVENT_WAL_SENDER_MAIN);
&event, 1, WAIT_EVENT_WAL_SENDER_MAIN);
#if PG_MAJORVERSION_NUM >= 16
if (WalSndCtl != NULL)
late_cv_trigger = ConditionVariableCancelSleep();
#endif
/*
* If wait is terminated by latch set (walsenders' latch is set on
* each wal flush). (no need for pm death check due to WL_EXIT_ON_PM_DEATH)
* If wait is terminated by latch set (walsenders' latch is set on each
* wal flush). (no need for pm death check due to WL_EXIT_ON_PM_DEATH)
*/
if ((rc == 1 && event.events & WL_LATCH_SET) || late_cv_trigger)
{
@@ -1488,8 +1483,8 @@ walprop_pg_wait_event_set(long timeout, Safekeeper **sk, uint32 *events)
}
/*
* If the event contains something about the socket, it means we got
* an event from a safekeeper socket.
* If the event contains something about the socket, it means we got an
* event from a safekeeper socket.
*/
if (rc == 1 && (event.events & (WL_SOCKET_MASK)))
{
@@ -1514,7 +1509,7 @@ walprop_pg_finish_sync_safekeepers(XLogRecPtr lsn)
* Get PageserverFeedback fields from the most advanced safekeeper
*/
static void
GetLatestNeonFeedback(PageserverFeedback * rf, WalProposer *wp)
GetLatestNeonFeedback(PageserverFeedback *rf, WalProposer *wp)
{
int latest_safekeeper = 0;
XLogRecPtr last_received_lsn = InvalidXLogRecPtr;
@@ -1549,7 +1544,7 @@ GetLatestNeonFeedback(PageserverFeedback * rf, WalProposer *wp)
* Combine hot standby feedbacks from all safekeepers.
*/
static void
CombineHotStanbyFeedbacks(HotStandbyFeedback * hs, WalProposer *wp)
CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp)
{
hs->ts = 0;
hs->xmin.value = ~0; /* largest unsigned value */
@@ -1560,6 +1555,7 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback * hs, WalProposer *wp)
if (wp->safekeeper[i].appendResponse.hs.ts != 0)
{
HotStandbyFeedback *skhs = &wp->safekeeper[i].appendResponse.hs;
if (FullTransactionIdIsNormal(skhs->xmin)
&& FullTransactionIdPrecedes(skhs->xmin, hs->xmin))
{
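The last hunk above (CombineHotStanbyFeedbacks) seeds the combined xmin with the "largest unsigned value" and then, for every safekeeper that has reported hot-standby feedback, keeps whichever xmin is earlier, i.e. the minimum across all reporting safekeepers. A standalone restatement of that min-combining step, with a stand-in integer type instead of FullTransactionId and HotStandbyFeedback:

#include <stdio.h>
#include <stdint.h>

typedef uint64_t demo_xid;      /* stand-in for FullTransactionId */

static demo_xid
CombineXminExample(const demo_xid *xmins, int n)
{
    demo_xid combined = UINT64_MAX;     /* "largest unsigned value" */

    for (int i = 0; i < n; i++)
    {
        /* 0 plays the role of an invalid/absent feedback entry here */
        if (xmins[i] != 0 && xmins[i] < combined)
            combined = xmins[i];
    }
    return combined;
}

int
main(void)
{
    demo_xid xmins[] = {740, 0, 735};

    printf("combined xmin = %llu\n",
           (unsigned long long) CombineXminExample(xmins, 3));
    return 0;
}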