diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 5bfce37fc5..c1fd5e3ef3 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -28,7 +28,7 @@ * playing consensus game is impossible, so speculative 'let's just poll * safekeepers, learn start LSN of future epoch and run basebackup' * won't work. - * + * * Both ways are implemented in walproposer_pg.c file. This file contains * generic part of walproposer which can be used in both cases, but can also * be used as an independent library. @@ -57,7 +57,7 @@ static void RecvAcceptorGreeting(Safekeeper *sk); static void SendVoteRequest(Safekeeper *sk); static void RecvVoteResponse(Safekeeper *sk); static void HandleElectedProposer(WalProposer *wp); -static term_t GetHighestTerm(TermHistory * th); +static term_t GetHighestTerm(TermHistory *th); static term_t GetEpoch(Safekeeper *sk); static void DetermineEpochStartLsn(WalProposer *wp); static void SendProposerElected(Safekeeper *sk); @@ -71,13 +71,13 @@ static XLogRecPtr CalculateMinFlushLsn(WalProposer *wp); static XLogRecPtr GetAcknowledgedByQuorumWALPosition(WalProposer *wp); static void HandleSafekeeperResponse(WalProposer *wp); static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size); -static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg); +static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg); static bool BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState success_state); static bool AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_state); static bool AsyncFlush(Safekeeper *sk); -static int CompareLsn(const void *a, const void *b); +static int CompareLsn(const void *a, const void *b); static char *FormatSafekeeperState(SafekeeperState state); -static void AssertEventsOkForState(uint32 events, Safekeeper *sk); +static void AssertEventsOkForState(uint32 events, Safekeeper *sk); static uint32 SafekeeperStateDesiredEvents(SafekeeperState state); static char *FormatEvents(uint32 events); @@ -88,7 +88,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) char *sep; char *port; WalProposer *wp; - + wp = palloc0(sizeof(WalProposer)); wp->config = config; wp->api = api; @@ -116,7 +116,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) { Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers]; - int written = 0; + int written = 0; written = snprintf((char *) &sk->conninfo, MAXCONNINFO, "host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'", @@ -198,7 +198,7 @@ WalProposerPoll(WalProposer *wp) /* Exit loop if latch is set (we got new WAL) */ if ((rc == 1 && events & WL_LATCH_SET)) break; - + /* * If the event contains something that one of our safekeeper states * was waiting for, we'll advance its state. @@ -215,15 +215,16 @@ WalProposerPoll(WalProposer *wp) */ ReconnectSafekeepers(wp); - if (rc == 0) /* timeout expired */ + if (rc == 0) /* timeout expired */ { /* * Ensure flushrecptr is set to a recent value. This fixes a case * where we've not been notified of new WAL records when we were * planning on consuming them. */ - if (!wp->config->syncSafekeepers) { - XLogRecPtr flushed = wp->api.get_flush_rec_ptr(); + if (!wp->config->syncSafekeepers) + { + XLogRecPtr flushed = wp->api.get_flush_rec_ptr(); if (flushed > wp->availableLsn) break; @@ -574,6 +575,7 @@ HandleConnectionEvent(Safekeeper *sk) elog(LOG, "connected with node %s:%s", sk->host, sk->port); sk->latestMsgReceivedAt = wp->api.get_current_timestamp(); + /* * We have to pick some event to update event set. We'll * eventually need the socket to be readable, so we go with that. @@ -712,7 +714,7 @@ RecvAcceptorGreeting(Safekeeper *sk) * until later. */ sk->greetResponse.apm.tag = 'g'; - if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->greetResponse)) + if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse)) return; elog(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port); @@ -720,10 +722,10 @@ RecvAcceptorGreeting(Safekeeper *sk) /* Protocol is all good, move to voting. */ sk->state = SS_VOTING; - /* + /* * Note: it would be better to track the counter on per safekeeper basis, - * but at worst walproposer would restart with 'term rejected', so leave as - * is for now. + * but at worst walproposer would restart with 'term rejected', so leave + * as is for now. */ ++wp->n_connected; if (wp->n_connected <= wp->quorum) @@ -804,7 +806,7 @@ RecvVoteResponse(Safekeeper *sk) WalProposer *wp = sk->wp; sk->voteResponse.apm.tag = 'v'; - if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->voteResponse)) + if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->voteResponse)) return; elog(LOG, @@ -915,7 +917,7 @@ HandleElectedProposer(WalProposer *wp) /* latest term in TermHistory, or 0 is there is no entries */ static term_t -GetHighestTerm(TermHistory * th) +GetHighestTerm(TermHistory *th) { return th->n_entries > 0 ? th->entries[th->n_entries - 1].term : 0; } @@ -1042,7 +1044,7 @@ DetermineEpochStartLsn(WalProposer *wp) */ if (!wp->config->syncSafekeepers) { - WalproposerShmemState * walprop_shared = wp->api.get_shmem_state(); + WalproposerShmemState *walprop_shared = wp->api.get_shmem_state(); /* * Basebackup LSN always points to the beginning of the record (not @@ -1244,7 +1246,7 @@ BroadcastAppendRequest(WalProposer *wp) } static void -PrepareAppendRequest(WalProposer *wp, AppendRequestHeader * req, XLogRecPtr beginLsn, XLogRecPtr endLsn) +PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr beginLsn, XLogRecPtr endLsn) { Assert(endLsn >= beginLsn); req->tag = 'a'; @@ -1355,9 +1357,9 @@ SendAppendRequests(Safekeeper *sk) enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn); /* wal_read will raise error on failure */ wp->api.wal_read(sk->xlogreader, - &sk->outbuf.data[sk->outbuf.len], - req->beginLsn, - req->endLsn - req->beginLsn); + &sk->outbuf.data[sk->outbuf.len], + req->beginLsn, + req->endLsn - req->beginLsn); sk->outbuf.len += req->endLsn - req->beginLsn; writeResult = wp->api.conn_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len); @@ -1419,7 +1421,7 @@ RecvAppendResponses(Safekeeper *sk) * work until later. */ sk->appendResponse.apm.tag = 'a'; - if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->appendResponse)) + if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->appendResponse)) break; ereport(DEBUG2, @@ -1460,7 +1462,7 @@ RecvAppendResponses(Safekeeper *sk) /* Parse a PageserverFeedback message, or the PageserverFeedback part of an AppendResponse */ void -ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback * rf) +ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback *rf) { uint8 nkeys; int i; @@ -1544,8 +1546,8 @@ static XLogRecPtr CalculateMinFlushLsn(WalProposer *wp) { XLogRecPtr lsn = wp->n_safekeepers > 0 - ? wp->safekeeper[0].appendResponse.flushLsn - : InvalidXLogRecPtr; + ? wp->safekeeper[0].appendResponse.flushLsn + : InvalidXLogRecPtr; for (int i = 1; i < wp->n_safekeepers; i++) { @@ -1648,15 +1650,15 @@ HandleSafekeeperResponse(WalProposer *wp) if (n_synced >= wp->quorum) { /* A quorum of safekeepers has been synced! */ - + /* - * Send empty message to broadcast latest truncateLsn to all safekeepers. - * This helps to finish next sync-safekeepers eailier, by skipping recovery - * step. - * - * We don't need to wait for response because it doesn't affect correctness, - * and TCP should be able to deliver the message to safekeepers in case of - * network working properly. + * Send empty message to broadcast latest truncateLsn to all + * safekeepers. This helps to finish next sync-safekeepers + * eailier, by skipping recovery step. + * + * We don't need to wait for response because it doesn't affect + * correctness, and TCP should be able to deliver the message to + * safekeepers in case of network working properly. */ BroadcastAppendRequest(wp); @@ -1705,7 +1707,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size) * failed, a warning is emitted and the connection is reset. */ static bool -AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg) +AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) { WalProposer *wp = sk->wp; @@ -2072,12 +2074,12 @@ FormatEvents(uint32 events) /* Helper variable to check if there's extra bits */ uint32 all_flags = WL_LATCH_SET - | WL_SOCKET_READABLE - | WL_SOCKET_WRITEABLE - | WL_TIMEOUT - | WL_POSTMASTER_DEATH - | WL_EXIT_ON_PM_DEATH - | WL_SOCKET_CONNECTED; + | WL_SOCKET_READABLE + | WL_SOCKET_WRITEABLE + | WL_TIMEOUT + | WL_POSTMASTER_DEATH + | WL_EXIT_ON_PM_DEATH + | WL_SOCKET_CONNECTED; /* * The formatting here isn't supposed to be *particularly* useful -- it's diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 63f7cf1eb0..a1a9ccdfdd 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -38,7 +38,7 @@ typedef enum PG_ASYNC_READ_TRY_AGAIN, /* Reading failed. Check PQerrorMessage(conn) */ PG_ASYNC_READ_FAIL, -} PGAsyncReadResult; +} PGAsyncReadResult; /* Possible return values from WritePGAsync */ typedef enum @@ -57,7 +57,7 @@ typedef enum PG_ASYNC_WRITE_TRY_FLUSH, /* Writing failed. Check PQerrorMessage(conn) */ PG_ASYNC_WRITE_FAIL, -} PGAsyncWriteResult; +} PGAsyncWriteResult; /* * WAL safekeeper state, which is used to wait for some event. @@ -133,7 +133,7 @@ typedef enum * to read. */ SS_ACTIVE, -} SafekeeperState; +} SafekeeperState; /* Consensus logical timestamp. */ typedef uint64 term_t; @@ -157,12 +157,12 @@ typedef struct ProposerGreeting uint8 tenant_id[16]; TimeLineID timeline; uint32 walSegSize; -} ProposerGreeting; +} ProposerGreeting; typedef struct AcceptorProposerMessage { uint64 tag; -} AcceptorProposerMessage; +} AcceptorProposerMessage; /* * Acceptor -> Proposer initial response: the highest term acceptor voted for. @@ -172,7 +172,7 @@ typedef struct AcceptorGreeting AcceptorProposerMessage apm; term_t term; NNodeId nodeId; -} AcceptorGreeting; +} AcceptorGreeting; /* * Proposer -> Acceptor vote request. @@ -182,20 +182,20 @@ typedef struct VoteRequest uint64 tag; term_t term; pg_uuid_t proposerId; /* for monitoring/debugging */ -} VoteRequest; +} VoteRequest; /* Element of term switching chain. */ typedef struct TermSwitchEntry { term_t term; XLogRecPtr lsn; -} TermSwitchEntry; +} TermSwitchEntry; typedef struct TermHistory { uint32 n_entries; TermSwitchEntry *entries; -} TermHistory; +} TermHistory; /* Vote itself, sent from safekeeper to proposer */ typedef struct VoteResponse @@ -213,7 +213,7 @@ typedef struct VoteResponse * recovery of some safekeeper */ TermHistory termHistory; XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */ -} VoteResponse; +} VoteResponse; /* * Proposer -> Acceptor message announcing proposer is elected and communicating @@ -229,7 +229,7 @@ typedef struct ProposerElected TermHistory *termHistory; /* timeline globally starts at this LSN */ XLogRecPtr timelineStartLsn; -} ProposerElected; +} ProposerElected; /* * Header of request with WAL message sent from proposer to safekeeper. @@ -254,7 +254,7 @@ typedef struct AppendRequestHeader */ XLogRecPtr truncateLsn; pg_uuid_t proposerId; /* for monitoring/debugging */ -} AppendRequestHeader; +} AppendRequestHeader; /* * Hot standby feedback received from replica @@ -264,7 +264,7 @@ typedef struct HotStandbyFeedback TimestampTz ts; FullTransactionId xmin; FullTransactionId catalog_xmin; -} HotStandbyFeedback; +} HotStandbyFeedback; typedef struct PageserverFeedback { @@ -275,7 +275,7 @@ typedef struct PageserverFeedback XLogRecPtr disk_consistent_lsn; XLogRecPtr remote_consistent_lsn; TimestampTz replytime; -} PageserverFeedback; +} PageserverFeedback; typedef struct WalproposerShmemState { @@ -283,7 +283,7 @@ typedef struct WalproposerShmemState PageserverFeedback feedback; term_t mineLastElectedTerm; pg_atomic_uint64 backpressureThrottlingTime; -} WalproposerShmemState; +} WalproposerShmemState; /* * Report safekeeper state to proposer @@ -307,7 +307,7 @@ typedef struct AppendResponse /* and custom neon feedback. */ /* This part of the message is extensible. */ PageserverFeedback rf; -} AppendResponse; +} AppendResponse; /* PageserverFeedback is extensible part of the message that is parsed separately */ /* Other fields are fixed part */ @@ -331,7 +331,7 @@ typedef struct Safekeeper * * May contain private information like password and should not be logged. */ - char conninfo[MAXCONNINFO]; + char conninfo[MAXCONNINFO]; /* * postgres protocol connection to the WAL acceptor @@ -364,7 +364,7 @@ typedef struct Safekeeper int eventPos; /* position in wait event set. Equal to -1 if* * no event */ SafekeeperState state; /* safekeeper state machine state */ - TimestampTz latestMsgReceivedAt; /* when latest msg is received */ + TimestampTz latestMsgReceivedAt; /* when latest msg is received */ AcceptorGreeting greetResponse; /* acceptor greeting */ VoteResponse voteResponse; /* the vote */ AppendResponse appendResponse; /* feedback for master */ @@ -382,7 +382,7 @@ typedef enum * 'libpq-fe.h' still has PGRES_POLLING_ACTIVE, but says it's unused. * We've removed it here to avoid clutter. */ -} WalProposerConnectPollStatusType; +} WalProposerConnectPollStatusType; /* Re-exported and modified ExecStatusType */ typedef enum @@ -407,7 +407,7 @@ typedef enum WP_EXEC_NEEDS_INPUT, /* Catch-all failure. Check PQerrorMessage. */ WP_EXEC_FAILED, -} WalProposerExecStatusType; +} WalProposerExecStatusType; /* Re-exported ConnStatusType */ typedef enum @@ -421,7 +421,7 @@ typedef enum * that extra functionality, so we collect them into a single tag here. */ WP_CONNECTION_IN_PROGRESS, -} WalProposerConnStatusType; +} WalProposerConnStatusType; /* * Collection of hooks for walproposer, to call postgres functions, @@ -430,185 +430,187 @@ typedef enum typedef struct walproposer_api { /* - * Get WalproposerShmemState. This is used to store information about - * last elected term. + * Get WalproposerShmemState. This is used to store information about last + * elected term. */ - WalproposerShmemState * (*get_shmem_state) (void); + WalproposerShmemState *(*get_shmem_state) (void); /* * Start receiving notifications about new WAL. This is an infinite loop - * which calls WalProposerBroadcast() and WalProposerPoll() to send the WAL. + * which calls WalProposerBroadcast() and WalProposerPoll() to send the + * WAL. */ - void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos); + void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos); /* Get pointer to the latest available WAL. */ - XLogRecPtr (*get_flush_rec_ptr) (void); + XLogRecPtr (*get_flush_rec_ptr) (void); /* Get current time. */ - TimestampTz (*get_current_timestamp) (void); + TimestampTz (*get_current_timestamp) (void); /* Get postgres timeline. */ - TimeLineID (*get_timeline_id) (void); + TimeLineID (*get_timeline_id) (void); /* Current error message, aka PQerrorMessage. */ - char * (*conn_error_message) (WalProposerConn * conn); + char *(*conn_error_message) (WalProposerConn *conn); /* Connection status, aka PQstatus. */ - WalProposerConnStatusType (*conn_status) (WalProposerConn * conn); + WalProposerConnStatusType (*conn_status) (WalProposerConn *conn); /* Start the connection, aka PQconnectStart. */ - WalProposerConn * (*conn_connect_start) (char *conninfo); + WalProposerConn *(*conn_connect_start) (char *conninfo); /* Poll an asynchronous connection, aka PQconnectPoll. */ - WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn * conn); - + WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn *conn); + /* Send a blocking SQL query, aka PQsendQuery. */ - bool (*conn_send_query) (WalProposerConn * conn, char * query); - + bool (*conn_send_query) (WalProposerConn *conn, char *query); + /* Read the query result, aka PQgetResult. */ - WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn * conn); + WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn *conn); /* Flush buffer to the network, aka PQflush. */ - int (*conn_flush) (WalProposerConn * conn); + int (*conn_flush) (WalProposerConn *conn); /* Close the connection, aka PQfinish. */ - void (*conn_finish) (WalProposerConn * conn); + void (*conn_finish) (WalProposerConn *conn); /* Try to read CopyData message, aka PQgetCopyData. */ - PGAsyncReadResult (*conn_async_read) (WalProposerConn * conn, char **buf, int *amount); + PGAsyncReadResult (*conn_async_read) (WalProposerConn *conn, char **buf, int *amount); /* Try to write CopyData message, aka PQputCopyData. */ - PGAsyncWriteResult (*conn_async_write) (WalProposerConn * conn, void const *buf, size_t size); - + PGAsyncWriteResult (*conn_async_write) (WalProposerConn *conn, void const *buf, size_t size); + /* Blocking CopyData write, aka PQputCopyData + PQflush. */ - bool (*conn_blocking_write) (WalProposerConn * conn, void const *buf, size_t size); - + bool (*conn_blocking_write) (WalProposerConn *conn, void const *buf, size_t size); + /* Download WAL from startpos to endpos and make it available locally. */ - bool (*recovery_download) (Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos); - + bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos); + /* Read WAL from disk to buf. */ - void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count); - + void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count); + /* Allocate WAL reader. */ - XLogReaderState * (*wal_reader_allocate) (void); - + XLogReaderState *(*wal_reader_allocate) (void); + /* Deallocate event set. */ - void (*free_event_set) (void); + void (*free_event_set) (void); /* Initialize event set. */ - void (*init_event_set) (int n_safekeepers); - + void (*init_event_set) (int n_safekeepers); + /* Update events for an existing safekeeper connection. */ - void (*update_event_set) (Safekeeper * sk, uint32 events); + void (*update_event_set) (Safekeeper *sk, uint32 events); /* Add a new safekeeper connection to the event set. */ - void (*add_safekeeper_event_set) (Safekeeper * sk, uint32 events); - + void (*add_safekeeper_event_set) (Safekeeper *sk, uint32 events); + /* - * Wait until some event happens: - * - timeout is reached - * - socket event for safekeeper connection - * - new WAL is available - * - * Returns 0 if timeout is reached, 1 if some event happened. Updates events mask - * to indicate events and sets sk to the safekeeper which has an event. + * Wait until some event happens: - timeout is reached - socket event for + * safekeeper connection - new WAL is available + * + * Returns 0 if timeout is reached, 1 if some event happened. Updates + * events mask to indicate events and sets sk to the safekeeper which has + * an event. */ - int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events); - + int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events); + /* Read random bytes. */ - bool (*strong_random) (void *buf, size_t len); - + bool (*strong_random) (void *buf, size_t len); + /* * Get a basebackup LSN. Used to cross-validate with the latest available * LSN on the safekeepers. */ - XLogRecPtr (*get_redo_start_lsn) (void); + XLogRecPtr (*get_redo_start_lsn) (void); /* * Finish sync safekeepers with the given LSN. This function should not * return and should exit the program. */ - void (*finish_sync_safekeepers) (XLogRecPtr lsn); + void (*finish_sync_safekeepers) (XLogRecPtr lsn); /* * Called after every new message from the safekeeper. Used to propagate - * backpressure feedback and to confirm WAL persistence (has been commited on - * the quorum of safekeepers). + * backpressure feedback and to confirm WAL persistence (has been commited + * on the quorum of safekeepers). */ - void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn); - + void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn); + /* * Called on peer_horizon_lsn updates. Used to advance replication slot * and to free up disk space by deleting unnecessary WAL. */ - void (*confirm_wal_streamed) (XLogRecPtr lsn); + void (*confirm_wal_streamed) (XLogRecPtr lsn); } walproposer_api; /* * Configuration of the WAL proposer. */ -typedef struct WalProposerConfig { +typedef struct WalProposerConfig +{ /* hex-encoded TenantId cstr */ - char *neon_tenant; + char *neon_tenant; /* hex-encoded TimelineId cstr */ - char *neon_timeline; + char *neon_timeline; /* * Comma-separated list of safekeepers, in the following format: * host1:port1,host2:port2,host3:port3 - * + * * This cstr should be editable. */ - char *safekeepers_list; + char *safekeepers_list; /* * WalProposer reconnects to offline safekeepers once in this interval. * Time is in milliseconds. */ - int safekeeper_reconnect_timeout; + int safekeeper_reconnect_timeout; /* * WalProposer terminates the connection if it doesn't receive any message * from the safekeeper in this interval. Time is in milliseconds. */ - int safekeeper_connection_timeout; + int safekeeper_connection_timeout; /* - * WAL segment size. Will be passed to safekeepers in greet request. - * Also used to detect page headers. + * WAL segment size. Will be passed to safekeepers in greet request. Also + * used to detect page headers. */ - int wal_segment_size; + int wal_segment_size; /* * If safekeeper was started in sync mode, walproposer will not subscribe - * for new WAL and will exit when quorum of safekeepers will be synced - * to the latest available LSN. + * for new WAL and will exit when quorum of safekeepers will be synced to + * the latest available LSN. */ - bool syncSafekeepers; + bool syncSafekeepers; /* Will be passed to safekeepers in greet request. */ - uint64 systemId; + uint64 systemId; } WalProposerConfig; /* * WAL proposer state. */ -typedef struct WalProposer { +typedef struct WalProposer +{ WalProposerConfig *config; - int n_safekeepers; + int n_safekeepers; /* (n_safekeepers / 2) + 1 */ - int quorum; + int quorum; - Safekeeper safekeeper[MAX_SAFEKEEPERS]; + Safekeeper safekeeper[MAX_SAFEKEEPERS]; /* WAL has been generated up to this point */ - XLogRecPtr availableLsn; + XLogRecPtr availableLsn; /* last commitLsn broadcasted to safekeepers */ - XLogRecPtr lastSentCommitLsn; + XLogRecPtr lastSentCommitLsn; ProposerGreeting greetRequest; @@ -616,42 +618,45 @@ typedef struct WalProposer { VoteRequest voteRequest; /* - * Minimal LSN which may be needed for recovery of some safekeeper, - * record-aligned (first record which might not yet received by someone). + * Minimal LSN which may be needed for recovery of some safekeeper, + * record-aligned (first record which might not yet received by someone). */ - XLogRecPtr truncateLsn; + XLogRecPtr truncateLsn; /* - * Term of the proposer. We want our term to be highest and unique, - * so we collect terms from safekeepers quorum, choose max and +1. - * After that our term is fixed and must not change. If we observe - * that some safekeeper has higher term, it means that we have another - * running compute, so we must stop immediately. + * Term of the proposer. We want our term to be highest and unique, so we + * collect terms from safekeepers quorum, choose max and +1. After that + * our term is fixed and must not change. If we observe that some + * safekeeper has higher term, it means that we have another running + * compute, so we must stop immediately. */ - term_t propTerm; + term_t propTerm; /* term history of the proposer */ TermHistory propTermHistory; /* epoch start lsn of the proposer */ - XLogRecPtr propEpochStartLsn; + XLogRecPtr propEpochStartLsn; /* Most advanced acceptor epoch */ - term_t donorEpoch; + term_t donorEpoch; /* Most advanced acceptor */ - int donor; + int donor; /* timeline globally starts at this LSN */ - XLogRecPtr timelineStartLsn; + XLogRecPtr timelineStartLsn; /* number of votes collected from safekeepers */ - int n_votes; + int n_votes; /* number of successful connections over the lifetime of walproposer */ - int n_connected; + int n_connected; - /* Timestamp of the last reconnection attempt. Related to config->safekeeper_reconnect_timeout */ + /* + * Timestamp of the last reconnection attempt. Related to + * config->safekeeper_reconnect_timeout + */ TimestampTz last_reconnect_attempt; walproposer_api api; @@ -662,6 +667,6 @@ extern void WalProposerStart(WalProposer *wp); extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos); extern void WalProposerPoll(WalProposer *wp); extern void ParsePageserverFeedbackMessage(StringInfo reply_message, - PageserverFeedback *rf); + PageserverFeedback *rf); #endif /* __NEON_WALPROPOSER_H__ */ diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index fe59db50d5..654b411e94 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -58,7 +58,7 @@ int wal_acceptor_reconnect_timeout = 1000; int wal_acceptor_connection_timeout = 10000; static AppendResponse quorumFeedback; -static WalproposerShmemState * walprop_shared; +static WalproposerShmemState *walprop_shared; static WalProposerConfig walprop_config; static XLogRecPtr sentPtr = InvalidXLogRecPtr; static const walproposer_api walprop_pg; @@ -87,8 +87,8 @@ static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd); static void WalSndLoop(WalProposer *wp); static void XLogBroadcastWalProposer(WalProposer *wp); -static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr); -static void XLogWalPropClose(XLogRecPtr recptr); +static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr); +static void XLogWalPropClose(XLogRecPtr recptr); static void init_walprop_config(bool syncSafekeepers) @@ -130,7 +130,7 @@ PGDLLEXPORT void WalProposerMain(Datum main_arg) { WalProposer *wp; - + init_walprop_config(false); walprop_pg_init_bgworker(); walprop_pg_load_libpqwalreceiver(); @@ -277,14 +277,14 @@ backpressure_throttling_impl(void) TimestampTz start, stop; bool retry = PrevProcessInterruptsCallback - ? PrevProcessInterruptsCallback() - : false; + ? PrevProcessInterruptsCallback() + : false; /* - * Don't throttle read only transactions or wal sender. - * Do throttle CREATE INDEX CONCURRENTLY, however. It performs some - * stages outside a transaction, even though it writes a lot of WAL. - * Check PROC_IN_SAFE_IC flag to cover that case. + * Don't throttle read only transactions or wal sender. Do throttle CREATE + * INDEX CONCURRENTLY, however. It performs some stages outside a + * transaction, even though it writes a lot of WAL. Check PROC_IN_SAFE_IC + * flag to cover that case. */ if (am_walsender || (!(MyProc->statusFlags & PROC_IN_SAFE_IC) @@ -386,7 +386,7 @@ walprop_pg_get_shmem_state(void) } void -replication_feedback_set(PageserverFeedback * rf) +replication_feedback_set(PageserverFeedback *rf) { SpinLockAcquire(&walprop_shared->mutex); memcpy(&walprop_shared->feedback, rf, sizeof(PageserverFeedback)); @@ -455,11 +455,11 @@ walprop_pg_init_standalone_sync_safekeepers(void) if (pipe(postmaster_alive_fds) < 0) ereport(FATAL, (errcode_for_file_access(), - errmsg_internal("could not create pipe to monitor postmaster death: %m"))); + errmsg_internal("could not create pipe to monitor postmaster death: %m"))); if (fcntl(postmaster_alive_fds[POSTMASTER_FD_WATCH], F_SETFL, O_NONBLOCK) == -1) ereport(FATAL, (errcode_for_socket_access(), - errmsg_internal("could not set postmaster death monitoring pipe to nonblocking mode: %m"))); + errmsg_internal("could not set postmaster death monitoring pipe to nonblocking mode: %m"))); ChangeToDataDir(); @@ -471,8 +471,8 @@ walprop_pg_init_standalone_sync_safekeepers(void) { ereport(ERROR, (errcode_for_file_access(), - errmsg("could not create directory \"%s\": %m", - XLOGDIR))); + errmsg("could not create directory \"%s\": %m", + XLOGDIR))); exit(1); } } @@ -544,8 +544,7 @@ struct WalProposerConn { PGconn *pg_conn; bool is_nonblocking; /* whether the connection is non-blocking */ - char *recvbuf; /* last received data from - * walprop_async_read */ + char *recvbuf; /* last received data from walprop_async_read */ }; /* Helper function */ @@ -593,18 +592,16 @@ walprop_connect_start(char *conninfo) const char *keywords[3]; const char *values[3]; int n; - char *password = neon_auth_token; + char *password = neon_auth_token; /* - * Connect using the given connection string. If the - * NEON_AUTH_TOKEN environment variable was set, use that as - * the password. + * Connect using the given connection string. If the NEON_AUTH_TOKEN + * environment variable was set, use that as the password. * - * The connection options are parsed in the order they're given, so - * when we set the password before the connection string, the - * connection string can override the password from the env variable. - * Seems useful, although we don't currently use that capability - * anywhere. + * The connection options are parsed in the order they're given, so when + * we set the password before the connection string, the connection string + * can override the password from the env variable. Seems useful, although + * we don't currently use that capability anywhere. */ n = 0; if (password) @@ -1018,19 +1015,18 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd) #endif /* - * When we first start replication the standby will be behind the - * primary. For some applications, for example synchronous - * replication, it is important to have a clear state for this initial - * catchup mode, so we can trigger actions when we change streaming - * state later. We may stay in this state for a long time, which is - * exactly why we want to be able to monitor whether or not we are - * still here. + * When we first start replication the standby will be behind the primary. + * For some applications, for example synchronous replication, it is + * important to have a clear state for this initial catchup mode, so we + * can trigger actions when we change streaming state later. We may stay + * in this state for a long time, which is exactly why we want to be able + * to monitor whether or not we are still here. */ WalSndSetState(WALSNDSTATE_CATCHUP); /* - * Don't allow a request to stream from a future point in WAL that - * hasn't been flushed to disk in this server yet. + * Don't allow a request to stream from a future point in WAL that hasn't + * been flushed to disk in this server yet. */ if (FlushPtr < cmd->startpoint) { @@ -1096,12 +1092,12 @@ XLogBroadcastWalProposer(WalProposer *wp) /* * Streaming the current timeline on a primary. * - * Attempt to send all data that's already been written out and - * fsync'd to disk. We cannot go further than what's been written out - * given the current implementation of WALRead(). And in any case - * it's unsafe to send WAL that is not securely down to disk on the - * primary: if the primary subsequently crashes and restarts, standbys - * must not have applied any WAL that got lost on the primary. + * Attempt to send all data that's already been written out and fsync'd to + * disk. We cannot go further than what's been written out given the + * current implementation of WALRead(). And in any case it's unsafe to + * send WAL that is not securely down to disk on the primary: if the + * primary subsequently crashes and restarts, standbys must not have + * applied any WAL that got lost on the primary. */ #if PG_VERSION_NUM >= 150000 endptr = GetFlushRecPtr(NULL); @@ -1167,12 +1163,12 @@ XLogBroadcastWalProposer(WalProposer *wp) * Receive WAL from most advanced safekeeper */ static bool -WalProposerRecovery(Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos) +WalProposerRecovery(Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos) { char *err; WalReceiverConn *wrconn; WalRcvStreamOptions options; - char conninfo[MAXCONNINFO]; + char conninfo[MAXCONNINFO]; if (!neon_auth_token) { @@ -1180,7 +1176,7 @@ WalProposerRecovery(Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, X } else { - int written = 0; + int written = 0; written = snprintf((char *) conninfo, MAXCONNINFO, "password=%s %s", neon_auth_token, sk->conninfo); if (written > MAXCONNINFO || written < 0) @@ -1390,11 +1386,11 @@ walprop_pg_wal_read(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size WALReadError errinfo; if (!WALRead(state, - buf, - startptr, - count, - walprop_pg_get_timeline_id(), - &errinfo)) + buf, + startptr, + count, + walprop_pg_get_timeline_id(), + &errinfo)) { WALReadRaiseError(&errinfo); } @@ -1432,7 +1428,7 @@ walprop_pg_init_event_set(int n_safekeepers) } static void -walprop_pg_update_event_set(Safekeeper * sk, uint32 events) +walprop_pg_update_event_set(Safekeeper *sk, uint32 events) { /* eventPos = -1 when we don't have an event */ Assert(sk->eventPos != -1); @@ -1441,7 +1437,7 @@ walprop_pg_update_event_set(Safekeeper * sk, uint32 events) } static void -walprop_pg_add_safekeeper_event_set(Safekeeper * sk, uint32 events) +walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events) { sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk->conn), NULL, sk); } @@ -1462,22 +1458,21 @@ walprop_pg_wait_event_set(long timeout, Safekeeper **sk, uint32 *events) #endif /* - * Wait for a wait event to happen, or timeout: - * - Safekeeper socket can become available for READ or WRITE - * - Our latch got set, because - * * PG15-: We got woken up by a process triggering the WalSender - * * PG16+: WalSndCtl->wal_flush_cv was triggered + * Wait for a wait event to happen, or timeout: - Safekeeper socket can + * become available for READ or WRITE - Our latch got set, because * + * PG15-: We got woken up by a process triggering the WalSender * PG16+: + * WalSndCtl->wal_flush_cv was triggered */ rc = WaitEventSetWait(waitEvents, timeout, - &event, 1, WAIT_EVENT_WAL_SENDER_MAIN); + &event, 1, WAIT_EVENT_WAL_SENDER_MAIN); #if PG_MAJORVERSION_NUM >= 16 if (WalSndCtl != NULL) late_cv_trigger = ConditionVariableCancelSleep(); #endif /* - * If wait is terminated by latch set (walsenders' latch is set on - * each wal flush). (no need for pm death check due to WL_EXIT_ON_PM_DEATH) + * If wait is terminated by latch set (walsenders' latch is set on each + * wal flush). (no need for pm death check due to WL_EXIT_ON_PM_DEATH) */ if ((rc == 1 && event.events & WL_LATCH_SET) || late_cv_trigger) { @@ -1488,8 +1483,8 @@ walprop_pg_wait_event_set(long timeout, Safekeeper **sk, uint32 *events) } /* - * If the event contains something about the socket, it means we got - * an event from a safekeeper socket. + * If the event contains something about the socket, it means we got an + * event from a safekeeper socket. */ if (rc == 1 && (event.events & (WL_SOCKET_MASK))) { @@ -1514,7 +1509,7 @@ walprop_pg_finish_sync_safekeepers(XLogRecPtr lsn) * Get PageserverFeedback fields from the most advanced safekeeper */ static void -GetLatestNeonFeedback(PageserverFeedback * rf, WalProposer *wp) +GetLatestNeonFeedback(PageserverFeedback *rf, WalProposer *wp) { int latest_safekeeper = 0; XLogRecPtr last_received_lsn = InvalidXLogRecPtr; @@ -1549,7 +1544,7 @@ GetLatestNeonFeedback(PageserverFeedback * rf, WalProposer *wp) * Combine hot standby feedbacks from all safekeepers. */ static void -CombineHotStanbyFeedbacks(HotStandbyFeedback * hs, WalProposer *wp) +CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp) { hs->ts = 0; hs->xmin.value = ~0; /* largest unsigned value */ @@ -1560,6 +1555,7 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback * hs, WalProposer *wp) if (wp->safekeeper[i].appendResponse.hs.ts != 0) { HotStandbyFeedback *skhs = &wp->safekeeper[i].appendResponse.hs; + if (FullTransactionIdIsNormal(skhs->xmin) && FullTransactionIdPrecedes(skhs->xmin, hs->xmin)) {