diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 456da90d89..f711a78052 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -33,4 +33,7 @@ extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block extern uint64 BackpressureThrottlingTime(void); extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn); +extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]); +extern void PGDLLEXPORT WalProposerMain(Datum main_arg); + #endif /* NEON_H */ diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index efc7549deb..a9b9c97465 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -37,49 +37,13 @@ #include "walproposer.h" #include "neon_utils.h" -static bool syncSafekeepers = false; - -static int n_safekeepers = 0; -static int quorum = 0; -static Safekeeper safekeeper[MAX_SAFEKEEPERS]; -static XLogRecPtr availableLsn; /* WAL has been generated up to this point */ -static XLogRecPtr lastSentCommitLsn; /* last commitLsn broadcast to* - * safekeepers */ -static ProposerGreeting greetRequest; -static VoteRequest voteRequest; /* Vote request for safekeeper */ -/* - * Minimal LSN which may be needed for recovery of some safekeeper, - * record-aligned (first record which might not yet received by someone). - */ -static XLogRecPtr truncateLsn; - -/* - * Term of the proposer. We want our term to be highest and unique, - * so we collect terms from safekeepers quorum, choose max and +1. - * After that our term is fixed and must not change. If we observe - * that some safekeeper has higher term, it means that we have another - * running compute, so we must stop immediately. - */ -static term_t propTerm; -static TermHistory propTermHistory; /* term history of the proposer */ -static XLogRecPtr propEpochStartLsn; /* epoch start lsn of the proposer */ -static term_t donorEpoch; /* Most advanced acceptor epoch */ -static int donor; /* Most advanced acceptor */ -static XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */ -static int n_votes = 0; -static int n_connected = 0; -static TimestampTz last_reconnect_attempt; - /* Prototypes for private functions */ -static void WalProposerRegister(void); -static void WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId); -static void WalProposerStart(void); -static void WalProposerLoop(void); +static void WalProposerLoop(WalProposer *wp); static void HackyRemoveWalProposerEvent(Safekeeper *to_remove); static void ShutdownConnection(Safekeeper *sk); static void ResetConnection(Safekeeper *sk); -static long TimeToReconnect(TimestampTz now); -static void ReconnectSafekeepers(void); +static long TimeToReconnect(WalProposer *wp, TimestampTz now); +static void ReconnectSafekeepers(WalProposer *wp); static void AdvancePollState(Safekeeper *sk, uint32 events); static void HandleConnectionEvent(Safekeeper *sk); static void SendStartWALPush(Safekeeper *sk); @@ -88,60 +52,113 @@ static void SendProposerGreeting(Safekeeper *sk); static void RecvAcceptorGreeting(Safekeeper *sk); static void SendVoteRequest(Safekeeper *sk); static void RecvVoteResponse(Safekeeper *sk); -static void HandleElectedProposer(void); +static void HandleElectedProposer(WalProposer *wp); static term_t GetHighestTerm(TermHistory * th); static term_t GetEpoch(Safekeeper *sk); -static void DetermineEpochStartLsn(void); +static void DetermineEpochStartLsn(WalProposer *wp); static void SendProposerElected(Safekeeper *sk); static void StartStreaming(Safekeeper *sk); static void 
SendMessageToNode(Safekeeper *sk); -static void BroadcastAppendRequest(void); +static void BroadcastAppendRequest(WalProposer *wp); static void HandleActiveState(Safekeeper *sk, uint32 events); static bool SendAppendRequests(Safekeeper *sk); static bool RecvAppendResponses(Safekeeper *sk); -static XLogRecPtr CalculateMinFlushLsn(void); -static XLogRecPtr GetAcknowledgedByQuorumWALPosition(void); -static void HandleSafekeeperResponse(void); +static XLogRecPtr CalculateMinFlushLsn(WalProposer *wp); +static XLogRecPtr GetAcknowledgedByQuorumWALPosition(WalProposer *wp); +static void HandleSafekeeperResponse(WalProposer *wp); static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size); static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg); static bool BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState success_state); static bool AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_state); static bool AsyncFlush(Safekeeper *sk); +static int CompareLsn(const void *a, const void *b); +static char *FormatSafekeeperState(SafekeeperState state); +static void AssertEventsOkForState(uint32 events, Safekeeper *sk); +static uint32 SafekeeperStateDesiredEvents(SafekeeperState state); +static char *FormatEvents(uint32 events); -static int CompareLsn(const void *a, const void *b); -static char *FormatSafekeeperState(SafekeeperState state); -static void AssertEventsOkForState(uint32 events, Safekeeper *sk); -static uint32 SafekeeperStateDesiredEvents(SafekeeperState state); -static char *FormatEvents(uint32 events); - -/* - * Entry point for `postgres --sync-safekeepers`. - */ -PGDLLEXPORT void -WalProposerSync(int argc, char *argv[]) +WalProposer * +WalProposerCreate(WalProposerConfig *config, walproposer_api api) { - syncSafekeepers = true; - walprop_pg.init_standalone_sync_safekeepers(); + char *host; + char *sep; + char *port; + WalProposer *wp; + + wp = palloc0(sizeof(WalProposer)); + wp->config = config; + wp->api = api; - WalProposerInit(0, 0); - WalProposerStart(); -} + for (host = wp->config->safekeepers_list; host != NULL && *host != '\0'; host = sep) + { + port = strchr(host, ':'); + if (port == NULL) + { + elog(FATAL, "port is not specified"); + } + *port++ = '\0'; + sep = strchr(port, ','); + if (sep != NULL) + *sep++ = '\0'; + if (wp->n_safekeepers + 1 >= MAX_SAFEKEEPERS) + { + elog(FATAL, "Too many safekeepers"); + } + wp->safekeeper[wp->n_safekeepers].host = host; + wp->safekeeper[wp->n_safekeepers].port = port; + wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE; + wp->safekeeper[wp->n_safekeepers].conn = NULL; + wp->safekeeper[wp->n_safekeepers].wp = wp; -/* - * WAL proposer bgworker entry point. 
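/*
 * Illustrative sketch (annotation, not part of the patch): the
 * "host:port,host:port" list format that the parsing loop in
 * WalProposerCreate above expects, plus the majority quorum computed a few
 * lines below, reduced to a hypothetical standalone program.
 */
#include <stdio.h>
#include <string.h>

int
main(void)
{
    char        list[] = "sk1:6500,sk2:6500,sk3:6500";
    int         n = 0;
    char       *host;
    char       *sep;

    for (host = list; host != NULL && *host != '\0'; host = sep)
    {
        char       *port = strchr(host, ':');

        if (port == NULL)
            return 1;           /* a port is mandatory, as in the patch */
        *port++ = '\0';
        sep = strchr(port, ',');
        if (sep != NULL)
            *sep++ = '\0';
        printf("safekeeper %d: host=%s port=%s\n", n++, host, port);
    }
    /* majority quorum: any two quorums of this size intersect */
    printf("quorum = %d of %d\n", n / 2 + 1, n);
    return 0;
}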
- */ -PGDLLEXPORT void -WalProposerMain(Datum main_arg) -{ - uint64 systemId = walprop_pg.init_bgworker(); + { + Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers]; + int written = 0; - WalProposerInit(walprop_pg.get_flush_rec_ptr(), systemId); + written = snprintf((char *) &sk->conninfo, MAXCONNINFO, + "host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'", + sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant); + if (written > MAXCONNINFO || written < 0) + elog(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port); + } - last_reconnect_attempt = walprop_pg.get_current_timestamp(); + initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf); + wp->safekeeper[wp->n_safekeepers].xlogreader = wp->api.wal_reader_allocate(); + if (wp->safekeeper[wp->n_safekeepers].xlogreader == NULL) + elog(FATAL, "Failed to allocate xlog reader"); + wp->safekeeper[wp->n_safekeepers].flushWrite = false; + wp->safekeeper[wp->n_safekeepers].startStreamingAt = InvalidXLogRecPtr; + wp->safekeeper[wp->n_safekeepers].streamingAt = InvalidXLogRecPtr; + wp->n_safekeepers += 1; + } + if (wp->n_safekeepers < 1) + { + elog(FATAL, "Safekeeper addresses are not specified"); + } + wp->quorum = wp->n_safekeepers / 2 + 1; - walprop_pg.init_walsender(); + /* Fill the greeting message */ + wp->greetRequest.tag = 'g'; + wp->greetRequest.protocolVersion = SK_PROTOCOL_VERSION; + wp->greetRequest.pgVersion = PG_VERSION_NUM; + wp->api.strong_random(&wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId)); + wp->greetRequest.systemId = wp->config->systemId; + if (!wp->config->neon_timeline) + elog(FATAL, "neon.timeline_id is not provided"); + if (*wp->config->neon_timeline != '\0' && + !HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16)) + elog(FATAL, "Could not parse neon.timeline_id, %s", wp->config->neon_timeline); + if (!wp->config->neon_tenant) + elog(FATAL, "neon.tenant_id is not provided"); + if (*wp->config->neon_tenant != '\0' && + !HexDecodeString(wp->greetRequest.tenant_id, wp->config->neon_tenant, 16)) + elog(FATAL, "Could not parse neon.tenant_id, %s", wp->config->neon_tenant); - WalProposerStart(); + wp->greetRequest.timeline = wp->api.get_timeline_id(); + wp->greetRequest.walSegSize = wp->config->wal_segment_size; + + wp->api.init_event_set(wp->n_safekeepers); + + return wp; } /* @@ -149,11 +166,11 @@ WalProposerMain(Datum main_arg) * called from walsender every time the new WAL is available. */ void -WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos) +WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos) { - Assert(startpos == availableLsn && endpos >= availableLsn); - availableLsn = endpos; - BroadcastAppendRequest(); + Assert(startpos == wp->availableLsn && endpos >= wp->availableLsn); + wp->availableLsn = endpos; + BroadcastAppendRequest(wp); } /* @@ -162,17 +179,17 @@ WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos) * to walproposer.
*/ void -WalProposerPoll(void) +WalProposerPoll(WalProposer *wp) { while (true) { Safekeeper *sk = NULL; int rc = 0; uint32 events = 0; - TimestampTz now = walprop_pg.get_current_timestamp(); - long timeout = TimeToReconnect(now); + TimestampTz now = wp->api.get_current_timestamp(); + long timeout = TimeToReconnect(wp, now); - rc = walprop_pg.wait_event_set(timeout, &sk, &events); + rc = wp->api.wait_event_set(timeout, &sk, &events); /* Exit loop if latch is set (we got new WAL) */ if ((rc == 1 && events & WL_LATCH_SET)) @@ -192,7 +209,7 @@ WalProposerPoll(void) * If the timeout expired, attempt to reconnect to any safekeepers * that we dropped */ - ReconnectSafekeepers(); + ReconnectSafekeepers(wp); if (rc == 0) /* timeout expired */ { @@ -201,16 +218,16 @@ WalProposerPoll(void) * where we've not been notified of new WAL records when we were * planning on consuming them. */ - if (!syncSafekeepers) { - XLogRecPtr flushed = walprop_pg.get_flush_rec_ptr(); + if (!wp->config->syncSafekeepers) { + XLogRecPtr flushed = wp->api.get_flush_rec_ptr(); - if (flushed > availableLsn) + if (flushed > wp->availableLsn) break; } } - now = walprop_pg.get_current_timestamp(); - if (rc == 0 || TimeToReconnect(now) <= 0) /* timeout expired: poll state */ + now = wp->api.get_current_timestamp(); + if (rc == 0 || TimeToReconnect(wp, now) <= 0) /* timeout expired: poll state */ { TimestampTz now; @@ -218,24 +235,24 @@ WalProposerPoll(void) * If no WAL was generated during timeout (and we have already * collected the quorum), then send a poll message */ - if (availableLsn != InvalidXLogRecPtr) + if (wp->availableLsn != InvalidXLogRecPtr) { - BroadcastAppendRequest(); + BroadcastAppendRequest(wp); } /* * Abandon connection attempts which take too long. */ - now = walprop_pg.get_current_timestamp(); - for (int i = 0; i < n_safekeepers; i++) + now = wp->api.get_current_timestamp(); + for (int i = 0; i < wp->n_safekeepers; i++) { - Safekeeper *sk = &safekeeper[i]; + Safekeeper *sk = &wp->safekeeper[i]; if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now, - wal_acceptor_connection_timeout)) + wp->config->safekeeper_connection_timeout)) { elog(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that", - sk->host, sk->port, FormatSafekeeperState(sk->state), wal_acceptor_connection_timeout); + sk->host, sk->port, FormatSafekeeperState(sk->state), wp->config->safekeeper_connection_timeout); ShutdownConnection(sk); } } } } } -static void -WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) -{ - char *host; - char *sep; - char *port; - - walprop_pg.load_libpqwalreceiver(); - - for (host = wal_acceptors_list; host != NULL && *host != '\0'; host = sep) - { - port = strchr(host, ':'); - if (port == NULL) - { - elog(FATAL, "port is not specified"); - } - *port++ = '\0'; - sep = strchr(port, ','); - if (sep != NULL) - *sep++ = '\0'; - if (n_safekeepers + 1 >= MAX_SAFEKEEPERS) - { - elog(FATAL, "Too many safekeepers"); - } - safekeeper[n_safekeepers].host = host; - safekeeper[n_safekeepers].port = port; - safekeeper[n_safekeepers].state = SS_OFFLINE; - safekeeper[n_safekeepers].conn = NULL; - - { - Safekeeper *sk = &safekeeper[n_safekeepers]; - int written = 0; - - written = snprintf((char *) &sk->conninfo, MAXCONNINFO, - "host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'", - sk->host, sk->port, neon_timeline, neon_tenant); - if (written > MAXCONNINFO
|| written < 0) - elog(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port); - } - - initStringInfo(&safekeeper[n_safekeepers].outbuf); - safekeeper[n_safekeepers].xlogreader = walprop_pg.wal_reader_allocate(); - if (safekeeper[n_safekeepers].xlogreader == NULL) - elog(FATAL, "Failed to allocate xlog reader"); - safekeeper[n_safekeepers].flushWrite = false; - safekeeper[n_safekeepers].startStreamingAt = InvalidXLogRecPtr; - safekeeper[n_safekeepers].streamingAt = InvalidXLogRecPtr; - n_safekeepers += 1; - } - if (n_safekeepers < 1) - { - elog(FATAL, "Safekeepers addresses are not specified"); - } - quorum = n_safekeepers / 2 + 1; - - /* Fill the greeting package */ - greetRequest.tag = 'g'; - greetRequest.protocolVersion = SK_PROTOCOL_VERSION; - greetRequest.pgVersion = PG_VERSION_NUM; - walprop_pg.strong_random(&greetRequest.proposerId, sizeof(greetRequest.proposerId)); - greetRequest.systemId = systemId; - if (!neon_timeline) - elog(FATAL, "neon.timeline_id is not provided"); - if (*neon_timeline != '\0' && - !HexDecodeString(greetRequest.timeline_id, neon_timeline, 16)) - elog(FATAL, "Could not parse neon.timeline_id, %s", neon_timeline); - if (!neon_tenant) - elog(FATAL, "neon.tenant_id is not provided"); - if (*neon_tenant != '\0' && - !HexDecodeString(greetRequest.tenant_id, neon_tenant, 16)) - elog(FATAL, "Could not parse neon.tenant_id, %s", neon_tenant); - - greetRequest.timeline = walprop_pg.get_timeline_id(); - greetRequest.walSegSize = wal_segment_size; - - walprop_pg.init_event_set(n_safekeepers); -} - -static void -WalProposerStart(void) +void +WalProposerStart(WalProposer *wp) { /* Initiate connections to all safekeeper nodes */ - for (int i = 0; i < n_safekeepers; i++) + for (int i = 0; i < wp->n_safekeepers; i++) { - ResetConnection(&safekeeper[i]); + ResetConnection(&wp->safekeeper[i]); } - WalProposerLoop(); + WalProposerLoop(wp); } static void -WalProposerLoop(void) +WalProposerLoop(WalProposer *wp) { while (true) - WalProposerPoll(); + WalProposerPoll(wp); } /* @@ -349,20 +288,22 @@ WalProposerLoop(void) static void HackyRemoveWalProposerEvent(Safekeeper *to_remove) { + WalProposer *wp = to_remove->wp; + /* Remove the existing event set */ - walprop_pg.free_event_set(); + wp->api.free_event_set(); /* Re-initialize it without adding any safekeeper events */ - walprop_pg.init_event_set(n_safekeepers); + wp->api.init_event_set(wp->n_safekeepers); /* * loop through the existing safekeepers. If they aren't the one we're * removing, and if they have a socket we can use, re-add the applicable * events. 
*/ - for (int i = 0; i < n_safekeepers; i++) + for (int i = 0; i < wp->n_safekeepers; i++) { uint32 desired_events = WL_NO_EVENTS; - Safekeeper *sk = &safekeeper[i]; + Safekeeper *sk = &wp->safekeeper[i]; sk->eventPos = -1; @@ -374,7 +315,7 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove) { desired_events = SafekeeperStateDesiredEvents(sk->state); /* will set sk->eventPos */ - walprop_pg.add_safekeeper_event_set(sk, desired_events); + wp->api.add_safekeeper_event_set(sk, desired_events); } } } @@ -384,7 +325,7 @@ static void ShutdownConnection(Safekeeper *sk) { if (sk->conn) - walprop_pg.conn_finish(sk->conn); + sk->wp->api.conn_finish(sk->conn); sk->conn = NULL; sk->state = SS_OFFLINE; sk->flushWrite = false; @@ -406,6 +347,8 @@ ShutdownConnection(Safekeeper *sk) static void ResetConnection(Safekeeper *sk) { + WalProposer *wp = sk->wp; + if (sk->state != SS_OFFLINE) { ShutdownConnection(sk); @@ -414,7 +357,7 @@ ResetConnection(Safekeeper *sk) /* * Try to establish new connection */ - sk->conn = walprop_pg.conn_connect_start((char *) &sk->conninfo, neon_auth_token); + sk->conn = wp->api.conn_connect_start((char *) &sk->conninfo); /* * "If the result is null, then libpq has been unable to allocate a new @@ -428,7 +371,7 @@ ResetConnection(Safekeeper *sk) * PQconnectPoll. Before we do that though, we need to check that it * didn't immediately fail. */ - if (walprop_pg.conn_status(sk->conn) == WP_CONNECTION_BAD) + if (wp->api.conn_status(sk->conn) == WP_CONNECTION_BAD) { /*--- * According to libpq docs: @@ -440,13 +383,13 @@ ResetConnection(Safekeeper *sk) * https://www.postgresql.org/docs/devel/libpq-connect.html#LIBPQ-PQCONNECTSTARTPARAMS */ elog(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s", - sk->host, sk->port, walprop_pg.conn_error_message(sk->conn)); + sk->host, sk->port, wp->api.conn_error_message(sk->conn)); /* * Even though the connection failed, we still need to clean up the * object */ - walprop_pg.conn_finish(sk->conn); + wp->api.conn_finish(sk->conn); sk->conn = NULL; return; } @@ -467,9 +410,9 @@ ResetConnection(Safekeeper *sk) elog(LOG, "connecting with node %s:%s", sk->host, sk->port); sk->state = SS_CONNECTING_WRITE; - sk->latestMsgReceivedAt = walprop_pg.get_current_timestamp(); + sk->latestMsgReceivedAt = wp->api.get_current_timestamp(); - walprop_pg.add_safekeeper_event_set(sk, WL_SOCKET_WRITEABLE); + wp->api.add_safekeeper_event_set(sk, WL_SOCKET_WRITEABLE); return; } @@ -479,16 +422,16 @@ ResetConnection(Safekeeper *sk) * (do we actually need this?). 
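/*
 * Illustrative sketch (annotation, not part of the patch): TimeToReconnect
 * below mixes units -- TimestampTz values are microseconds, the reconnect
 * timeout setting is milliseconds, and the return value feeds a wait-set
 * timeout in milliseconds. Hypothetical standalone form of the arithmetic:
 */
#include <stdint.h>
#include <stdio.h>

static long
time_to_reconnect(int64_t now_us, int64_t last_attempt_us, int timeout_ms)
{
    int64_t     passed_us = now_us - last_attempt_us;
    int64_t     till_us = (int64_t) timeout_ms * 1000 - passed_us;

    if (timeout_ms <= 0)
        return -1;              /* reconnection disabled: wait indefinitely */
    if (till_us <= 0)
        return 0;               /* a reconnect attempt is already due */
    return (long) (till_us / 1000);
}

int
main(void)
{
    /* 400 ms since the last attempt with a 1000 ms timeout -> 600 ms left */
    printf("%ld ms until next reconnect\n",
           time_to_reconnect(1400000, 1000000, 1000));
    return 0;
}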
*/ static long -TimeToReconnect(TimestampTz now) +TimeToReconnect(WalProposer *wp, TimestampTz now) { TimestampTz passed; TimestampTz till_reconnect; - if (wal_acceptor_reconnect_timeout <= 0) + if (wp->config->safekeeper_reconnect_timeout <= 0) return -1; - passed = now - last_reconnect_attempt; - till_reconnect = wal_acceptor_reconnect_timeout * 1000 - passed; + passed = now - wp->last_reconnect_attempt; + till_reconnect = wp->config->safekeeper_reconnect_timeout * 1000 - passed; if (till_reconnect <= 0) return 0; return (long) (till_reconnect / 1000); @@ -496,17 +439,17 @@ TimeToReconnect(TimestampTz now) /* If the timeout has expired, attempt to reconnect to all offline safekeepers */ static void -ReconnectSafekeepers(void) +ReconnectSafekeepers(WalProposer *wp) { - TimestampTz now = walprop_pg.get_current_timestamp(); + TimestampTz now = wp->api.get_current_timestamp(); - if (TimeToReconnect(now) == 0) + if (TimeToReconnect(wp, now) == 0) { - last_reconnect_attempt = now; - for (int i = 0; i < n_safekeepers; i++) + wp->last_reconnect_attempt = now; + for (int i = 0; i < wp->n_safekeepers; i++) { - if (safekeeper[i].state == SS_OFFLINE) - ResetConnection(&safekeeper[i]); + if (wp->safekeeper[i].state == SS_OFFLINE) + ResetConnection(&wp->safekeeper[i]); } } } @@ -614,7 +557,8 @@ AdvancePollState(Safekeeper *sk, uint32 events) static void HandleConnectionEvent(Safekeeper *sk) { - WalProposerConnectPollStatusType result = walprop_pg.conn_connect_poll(sk->conn); + WalProposer *wp = sk->wp; + WalProposerConnectPollStatusType result = wp->api.conn_connect_poll(sk->conn); /* The new set of events we'll wait on, after updating */ uint32 new_events = WL_NO_EVENTS; @@ -624,7 +568,7 @@ HandleConnectionEvent(Safekeeper *sk) case WP_CONN_POLLING_OK: elog(LOG, "connected with node %s:%s", sk->host, sk->port); - sk->latestMsgReceivedAt = walprop_pg.get_current_timestamp(); + sk->latestMsgReceivedAt = wp->api.get_current_timestamp(); /* * We have to pick some event to update event set. We'll * eventually need the socket to be readable, so we go with that. @@ -646,7 +590,7 @@ HandleConnectionEvent(Safekeeper *sk) case WP_CONN_POLLING_FAILED: elog(WARNING, "failed to connect to node '%s:%s': %s", - sk->host, sk->port, walprop_pg.conn_error_message(sk->conn)); + sk->host, sk->port, wp->api.conn_error_message(sk->conn)); /* * If connecting failed, we don't want to restart the connection @@ -663,7 +607,7 @@ HandleConnectionEvent(Safekeeper *sk) * old event and re-register an event on the new socket. 
*/ HackyRemoveWalProposerEvent(sk); - walprop_pg.add_safekeeper_event_set(sk, new_events); + wp->api.add_safekeeper_event_set(sk, new_events); /* If we successfully connected, send START_WAL_PUSH query */ if (result == WP_CONN_POLLING_OK) @@ -678,21 +622,25 @@ static void SendStartWALPush(Safekeeper *sk) { - if (!walprop_pg.conn_send_query(sk->conn, "START_WAL_PUSH")) + WalProposer *wp = sk->wp; + + if (!wp->api.conn_send_query(sk->conn, "START_WAL_PUSH")) { elog(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s", - sk->host, sk->port, walprop_pg.conn_error_message(sk->conn)); + sk->host, sk->port, wp->api.conn_error_message(sk->conn)); ShutdownConnection(sk); return; } sk->state = SS_WAIT_EXEC_RESULT; - walprop_pg.update_event_set(sk, WL_SOCKET_READABLE); + wp->api.update_event_set(sk, WL_SOCKET_READABLE); } static void RecvStartWALPushResult(Safekeeper *sk) { - switch (walprop_pg.conn_get_query_result(sk->conn)) + WalProposer *wp = sk->wp; + + switch (wp->api.conn_get_query_result(sk->conn)) { /* * Successful result, move on to starting the handshake @@ -716,7 +664,7 @@ case WP_EXEC_FAILED: elog(WARNING, "Failed to send query to safekeeper %s:%s: %s", - sk->host, sk->port, walprop_pg.conn_error_message(sk->conn)); + sk->host, sk->port, wp->api.conn_error_message(sk->conn)); ShutdownConnection(sk); return; @@ -745,12 +693,14 @@ * On failure, logging & resetting the connection is handled. We just need * to handle the control flow. */ - BlockingWrite(sk, &greetRequest, sizeof(greetRequest), SS_HANDSHAKE_RECV); + BlockingWrite(sk, &sk->wp->greetRequest, sizeof(sk->wp->greetRequest), SS_HANDSHAKE_RECV); } static void RecvAcceptorGreeting(Safekeeper *sk) { + WalProposer *wp = sk->wp; + /* * If our reading doesn't immediately succeed, any necessary error * handling or state setting is taken care of. We can leave any other work @@ -770,32 +720,32 @@ * but at worst walproposer would restart with 'term rejected', so leave as * is for now. */ - ++n_connected; - if (n_connected <= quorum) + ++wp->n_connected; + if (wp->n_connected <= wp->quorum) { /* We're still collecting terms from the majority. */ - propTerm = Max(sk->greetResponse.term, propTerm); + wp->propTerm = Max(sk->greetResponse.term, wp->propTerm); /* Quorum is acquired, prepare the vote request. */ - if (n_connected == quorum) + if (wp->n_connected == wp->quorum) { - propTerm++; - elog(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, quorum, propTerm); + wp->propTerm++; + elog(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm); - voteRequest = (VoteRequest) + wp->voteRequest = (VoteRequest) { .tag = 'v', - .term = propTerm + .term = wp->propTerm }; - memcpy(voteRequest.proposerId.data, greetRequest.proposerId.data, UUID_LEN); + memcpy(wp->voteRequest.proposerId.data, wp->greetRequest.proposerId.data, UUID_LEN); } } - else if (sk->greetResponse.term > propTerm) + else if (sk->greetResponse.term > wp->propTerm) { /* Another compute with higher term is running.
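/*
 * Illustrative sketch (annotation, not part of the patch): the term-election
 * rule implemented by RecvAcceptorGreeting above -- collect terms from a
 * majority of greetings, take the maximum, and add one, so the proposer's
 * term is strictly higher than anything a quorum has seen. Hypothetical
 * standalone form:
 */
#include <stdint.h>
#include <stdio.h>

typedef uint64_t term_t;

static term_t
elect_term(const term_t greeted[], int n_connected)
{
    term_t      t = 0;

    for (int i = 0; i < n_connected; i++)
        if (greeted[i] > t)
            t = greeted[i];
    return t + 1;               /* strictly above every collected term */
}

int
main(void)
{
    term_t      greeted[] = {3, 5, 4};  /* terms from a quorum of three */

    printf("proposer term: %llu\n",
           (unsigned long long) elect_term(greeted, 3));    /* prints 6 */
    return 0;
}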
*/ elog(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", sk->host, sk->port, - sk->greetResponse.term, propTerm); + sk->greetResponse.term, wp->propTerm); } /* @@ -804,27 +754,27 @@ RecvAcceptorGreeting(Safekeeper *sk) * * If we do have quorum, we can start an election. */ - if (n_connected < quorum) + if (wp->n_connected < wp->quorum) { /* * SS_VOTING is an idle state; read-ready indicates the connection * closed. */ - walprop_pg.update_event_set(sk, WL_SOCKET_READABLE); + wp->api.update_event_set(sk, WL_SOCKET_READABLE); } else { /* * Now send voting request to the cohort and wait responses */ - for (int j = 0; j < n_safekeepers; j++) + for (int j = 0; j < wp->n_safekeepers; j++) { /* * Remember: SS_VOTING indicates that the safekeeper is * participating in voting, but hasn't sent anything yet. */ - if (safekeeper[j].state == SS_VOTING) - SendVoteRequest(&safekeeper[j]); + if (wp->safekeeper[j].state == SS_VOTING) + SendVoteRequest(&wp->safekeeper[j]); } } } @@ -832,10 +782,12 @@ RecvAcceptorGreeting(Safekeeper *sk) static void SendVoteRequest(Safekeeper *sk) { + WalProposer *wp = sk->wp; + /* We have quorum for voting, send our vote request */ - elog(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, voteRequest.term); + elog(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term); /* On failure, logging & resetting is handled */ - if (!BlockingWrite(sk, &voteRequest, sizeof(voteRequest), SS_WAIT_VERDICT)) + if (!BlockingWrite(sk, &wp->voteRequest, sizeof(wp->voteRequest), SS_WAIT_VERDICT)) return; /* If successful, wait for read-ready with SS_WAIT_VERDICT */ @@ -844,6 +796,8 @@ SendVoteRequest(Safekeeper *sk) static void RecvVoteResponse(Safekeeper *sk) { + WalProposer *wp = sk->wp; + sk->voteResponse.apm.tag = 'v'; if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->voteResponse)) return; @@ -861,21 +815,21 @@ RecvVoteResponse(Safekeeper *sk) * we are not elected yet and thus need the vote. */ if ((!sk->voteResponse.voteGiven) && - (sk->voteResponse.term > propTerm || n_votes < quorum)) + (sk->voteResponse.term > wp->propTerm || wp->n_votes < wp->quorum)) { elog(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", sk->host, sk->port, - sk->voteResponse.term, propTerm); + sk->voteResponse.term, wp->propTerm); } - Assert(sk->voteResponse.term == propTerm); + Assert(sk->voteResponse.term == wp->propTerm); /* Handshake completed, do we have quorum? */ - n_votes++; - if (n_votes < quorum) + wp->n_votes++; + if (wp->n_votes < wp->quorum) { sk->state = SS_IDLE; /* can't do much yet, no quorum */ } - else if (n_votes > quorum) + else if (wp->n_votes > wp->quorum) { /* recovery already performed, just start streaming */ SendProposerElected(sk); @@ -884,9 +838,9 @@ RecvVoteResponse(Safekeeper *sk) { sk->state = SS_IDLE; /* Idle state waits for read-ready events */ - walprop_pg.update_event_set(sk, WL_SOCKET_READABLE); + wp->api.update_event_set(sk, WL_SOCKET_READABLE); - HandleElectedProposer(); + HandleElectedProposer(sk->wp); } } @@ -898,36 +852,36 @@ RecvVoteResponse(Safekeeper *sk) * replication from walsender. 
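/*
 * Illustrative sketch (annotation, not part of the patch): RecvVoteResponse
 * above distinguishes three cases on the vote count. A hypothetical compact
 * form of the same decision:
 */
#include <stdio.h>

typedef enum
{
    WAIT_MORE_VOTES,            /* n_votes < quorum: no majority yet */
    BECOME_LEADER,              /* n_votes == quorum: majority just reached */
    STREAM_TO_LATECOMER         /* n_votes > quorum: recovery already done */
} vote_action;

static vote_action
on_vote_received(int n_votes, int quorum)
{
    if (n_votes < quorum)
        return WAIT_MORE_VOTES;
    if (n_votes == quorum)
        return BECOME_LEADER;
    return STREAM_TO_LATECOMER;
}

int
main(void)
{
    printf("2 votes, quorum 2 -> %d (BECOME_LEADER)\n", on_vote_received(2, 2));
    return 0;
}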
*/ static void -HandleElectedProposer(void) +HandleElectedProposer(WalProposer *wp) { - DetermineEpochStartLsn(); + DetermineEpochStartLsn(wp); /* * If not all safekeepers are up-to-date, we need to download the WAL * needed to synchronize them */ - if (truncateLsn < propEpochStartLsn) + if (wp->truncateLsn < wp->propEpochStartLsn) { elog(LOG, "start recovery because truncateLsn=%X/%X is not " "equal to epochStartLsn=%X/%X", - LSN_FORMAT_ARGS(truncateLsn), - LSN_FORMAT_ARGS(propEpochStartLsn)); + LSN_FORMAT_ARGS(wp->truncateLsn), + LSN_FORMAT_ARGS(wp->propEpochStartLsn)); /* Perform recovery */ - if (!walprop_pg.recovery_download(&safekeeper[donor], greetRequest.timeline, truncateLsn, propEpochStartLsn)) + if (!wp->api.recovery_download(&wp->safekeeper[wp->donor], wp->greetRequest.timeline, wp->truncateLsn, wp->propEpochStartLsn)) elog(FATAL, "Failed to recover state"); } - else if (syncSafekeepers) + else if (wp->config->syncSafekeepers) { /* Sync is not needed: just exit */ - walprop_pg.finish_sync_safekeepers(propEpochStartLsn); + wp->api.finish_sync_safekeepers(wp->propEpochStartLsn); /* unreachable */ } - for (int i = 0; i < n_safekeepers; i++) + for (int i = 0; i < wp->n_safekeepers; i++) { - if (safekeeper[i].state == SS_IDLE) - SendProposerElected(&safekeeper[i]); + if (wp->safekeeper[i].state == SS_IDLE) + SendProposerElected(&wp->safekeeper[i]); } /* @@ -936,7 +890,7 @@ * because that state is used only for quorum waiting. */ - if (syncSafekeepers) + if (wp->config->syncSafekeepers) { /* * Send an empty message to enforce receiving feedback even from nodes * not synced yet: e.g. it bumps commit_lsn up to epochStartLsn, which * finishes sync-safekeepers when no real new records are generated. * Will go away once we switch to async acks. */ - BroadcastAppendRequest(); + BroadcastAppendRequest(wp); /* keep polling until all safekeepers are synced */ return; } - walprop_pg.start_streaming(propEpochStartLsn, greetRequest.timeline); + wp->api.start_streaming(wp, wp->propEpochStartLsn, wp->greetRequest.timeline); /* Should not return here */ } @@ -970,9 +924,9 @@ GetEpoch(Safekeeper *sk) /* If LSN points to the page header, skip it */ static XLogRecPtr -SkipXLogPageHeader(XLogRecPtr lsn) +SkipXLogPageHeader(WalProposer *wp, XLogRecPtr lsn) { - if (XLogSegmentOffset(lsn, wal_segment_size) == 0) + if (XLogSegmentOffset(lsn, wp->config->wal_segment_size) == 0) { lsn += SizeOfXLogLongPHD; } @@ -992,41 +946,41 @@ * only for skipping recovery).
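/*
 * Illustrative sketch (annotation, not part of the patch): the
 * segment-boundary case of SkipXLogPageHeader visible in the hunk above. An
 * LSN sitting exactly on a segment boundary addresses the long page header,
 * so the first byte of record data lies one header length further. The sizes
 * here are hypothetical placeholders, not the real PostgreSQL constants:
 */
#include <stdint.h>
#include <stdio.h>

#define SEG_SIZE    (16 * 1024 * 1024)  /* illustrative wal_segment_size */
#define LONG_PHD    40                  /* illustrative long-header size */

static uint64_t
skip_long_page_header(uint64_t lsn)
{
    if (lsn % SEG_SIZE == 0)    /* segment start: long page header */
        lsn += LONG_PHD;
    return lsn;
}

int
main(void)
{
    uint64_t    lsn = 2ULL * SEG_SIZE;

    printf("%llX -> %llX\n",
           (unsigned long long) lsn,
           (unsigned long long) skip_long_page_header(lsn));
    return 0;
}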
*/ static void -DetermineEpochStartLsn(void) +DetermineEpochStartLsn(WalProposer *wp) { TermHistory *dth; - propEpochStartLsn = InvalidXLogRecPtr; - donorEpoch = 0; - truncateLsn = InvalidXLogRecPtr; - timelineStartLsn = InvalidXLogRecPtr; + wp->propEpochStartLsn = InvalidXLogRecPtr; + wp->donorEpoch = 0; + wp->truncateLsn = InvalidXLogRecPtr; + wp->timelineStartLsn = InvalidXLogRecPtr; - for (int i = 0; i < n_safekeepers; i++) + for (int i = 0; i < wp->n_safekeepers; i++) { - if (safekeeper[i].state == SS_IDLE) + if (wp->safekeeper[i].state == SS_IDLE) { - if (GetEpoch(&safekeeper[i]) > donorEpoch || - (GetEpoch(&safekeeper[i]) == donorEpoch && - safekeeper[i].voteResponse.flushLsn > propEpochStartLsn)) + if (GetEpoch(&wp->safekeeper[i]) > wp->donorEpoch || + (GetEpoch(&wp->safekeeper[i]) == wp->donorEpoch && + wp->safekeeper[i].voteResponse.flushLsn > wp->propEpochStartLsn)) { - donorEpoch = GetEpoch(&safekeeper[i]); - propEpochStartLsn = safekeeper[i].voteResponse.flushLsn; - donor = i; + wp->donorEpoch = GetEpoch(&wp->safekeeper[i]); + wp->propEpochStartLsn = wp->safekeeper[i].voteResponse.flushLsn; + wp->donor = i; } - truncateLsn = Max(safekeeper[i].voteResponse.truncateLsn, truncateLsn); + wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn); - if (safekeeper[i].voteResponse.timelineStartLsn != InvalidXLogRecPtr) + if (wp->safekeeper[i].voteResponse.timelineStartLsn != InvalidXLogRecPtr) { /* timelineStartLsn should be the same everywhere or unknown */ - if (timelineStartLsn != InvalidXLogRecPtr && - timelineStartLsn != safekeeper[i].voteResponse.timelineStartLsn) + if (wp->timelineStartLsn != InvalidXLogRecPtr && + wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn) { elog(WARNING, "inconsistent timelineStartLsn: current %X/%X, received %X/%X", - LSN_FORMAT_ARGS(timelineStartLsn), - LSN_FORMAT_ARGS(safekeeper[i].voteResponse.timelineStartLsn)); + LSN_FORMAT_ARGS(wp->timelineStartLsn), + LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn)); } - timelineStartLsn = safekeeper[i].voteResponse.timelineStartLsn; + wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn; } } } @@ -1035,14 +989,14 @@ DetermineEpochStartLsn(void) * If propEpochStartLsn is 0 everywhere, we are bootstrapping -- nothing * was committed yet. Start streaming then from the basebackup LSN. */ - if (propEpochStartLsn == InvalidXLogRecPtr && !syncSafekeepers) + if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers) { - propEpochStartLsn = truncateLsn = walprop_pg.get_redo_start_lsn(); - if (timelineStartLsn == InvalidXLogRecPtr) + wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(); + if (wp->timelineStartLsn == InvalidXLogRecPtr) { - timelineStartLsn = walprop_pg.get_redo_start_lsn(); + wp->timelineStartLsn = wp->api.get_redo_start_lsn(); } - elog(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(propEpochStartLsn)); + elog(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn)); } /* @@ -1050,40 +1004,40 @@ DetermineEpochStartLsn(void) * some connected safekeeper; it must have carried truncateLsn pointing to * the first record. 
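/*
 * Illustrative sketch (annotation, not part of the patch): the donor choice
 * in DetermineEpochStartLsn above. The donor is the safekeeper with the
 * lexicographically greatest (epoch, flushLsn) pair, i.e. the most advanced
 * acceptor. Hypothetical standalone form:
 */
#include <stdint.h>
#include <stdio.h>

typedef struct
{
    uint64_t    epoch;
    uint64_t    flush_lsn;
} sk_vote;

static int
choose_donor(const sk_vote v[], int n)
{
    int         donor = 0;

    for (int i = 1; i < n; i++)
        if (v[i].epoch > v[donor].epoch ||
            (v[i].epoch == v[donor].epoch &&
             v[i].flush_lsn > v[donor].flush_lsn))
            donor = i;
    return donor;
}

int
main(void)
{
    sk_vote     v[] = {{4, 0x100}, {5, 0x80}, {5, 0x90}};

    printf("donor = %d\n", choose_donor(v, 3));     /* prints 2 */
    return 0;
}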
*/ - Assert((truncateLsn != InvalidXLogRecPtr) || - (syncSafekeepers && truncateLsn == propEpochStartLsn)); + Assert((wp->truncateLsn != InvalidXLogRecPtr) || + (wp->config->syncSafekeepers && wp->truncateLsn == wp->propEpochStartLsn)); /* * We will be generating WAL since propEpochStartLsn, so we should set * availableLsn to mark this LSN as the latest available position. */ - availableLsn = propEpochStartLsn; + wp->availableLsn = wp->propEpochStartLsn; /* * Proposer's term history is the donor's + its own entry. */ - dth = &safekeeper[donor].voteResponse.termHistory; - propTermHistory.n_entries = dth->n_entries + 1; - propTermHistory.entries = palloc(sizeof(TermSwitchEntry) * propTermHistory.n_entries); - memcpy(propTermHistory.entries, dth->entries, sizeof(TermSwitchEntry) * dth->n_entries); - propTermHistory.entries[propTermHistory.n_entries - 1].term = propTerm; - propTermHistory.entries[propTermHistory.n_entries - 1].lsn = propEpochStartLsn; + dth = &wp->safekeeper[wp->donor].voteResponse.termHistory; + wp->propTermHistory.n_entries = dth->n_entries + 1; + wp->propTermHistory.entries = palloc(sizeof(TermSwitchEntry) * wp->propTermHistory.n_entries); + memcpy(wp->propTermHistory.entries, dth->entries, sizeof(TermSwitchEntry) * dth->n_entries); + wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm; + wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propEpochStartLsn; elog(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X", - quorum, - propTerm, - LSN_FORMAT_ARGS(propEpochStartLsn), - safekeeper[donor].host, safekeeper[donor].port, - LSN_FORMAT_ARGS(truncateLsn)); + wp->quorum, + wp->propTerm, + LSN_FORMAT_ARGS(wp->propEpochStartLsn), + wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port, + LSN_FORMAT_ARGS(wp->truncateLsn)); /* * Ensure the basebackup we are running (at RedoStartLsn) matches the LSN * from which we are going to write according to the consensus. If not, * we must bail out, as clog and other non-rel data is inconsistent. */ - if (!syncSafekeepers) + if (!wp->config->syncSafekeepers) { - WalproposerShmemState * walprop_shared = walprop_pg.get_shmem_state(); + WalproposerShmemState * walprop_shared = wp->api.get_shmem_state(); /* * Basebackup LSN always points to the beginning of the record (not @@ -1091,7 +1045,7 @@ * Safekeepers don't skip header as they need continuous stream of * data, so correct LSN for comparison. */ - if (SkipXLogPageHeader(propEpochStartLsn) != walprop_pg.get_redo_start_lsn()) + if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn()) { /* * However, allow to proceed if previously elected leader was me; @@ -1103,11 +1057,11 @@ { elog(PANIC, "collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X", - LSN_FORMAT_ARGS(propEpochStartLsn), - LSN_FORMAT_ARGS(walprop_pg.get_redo_start_lsn())); + LSN_FORMAT_ARGS(wp->propEpochStartLsn), + LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn())); } } - walprop_shared->mineLastElectedTerm = propTerm; + walprop_shared->mineLastElectedTerm = wp->propTerm; } } @@ -1123,6 +1077,7 @@ static void SendProposerElected(Safekeeper *sk) { + WalProposer *wp = sk->wp; ProposerElected msg; TermHistory *th; term_t lastCommonTerm; @@ -1140,22 +1095,22 @@ th = &sk->voteResponse.termHistory; /* We must start somewhere.
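/*
 * Illustrative sketch (annotation, not part of the patch): the divergence
 * search performed just below in SendProposerElected -- walk the proposer's
 * and the safekeeper's term histories in parallel and stop at the first
 * entry where the terms differ; the previous index is the last common entry,
 * or -1 when the histories share no prefix. Hypothetical standalone form:
 */
#include <stdint.h>
#include <stdio.h>

typedef struct
{
    uint64_t    term;
    uint64_t    lsn;
} th_entry;

static int
last_common_entry(const th_entry *a, int na, const th_entry *b, int nb)
{
    int         n = na < nb ? na : nb;
    int         i;

    for (i = 0; i < n; i++)
        if (a[i].term != b[i].term)
            break;
    return i - 1;               /* -1: no common point */
}

int
main(void)
{
    th_entry    prop[] = {{1, 0x0}, {3, 0x100}, {4, 0x180}};
    th_entry    sk[] = {{1, 0x0}, {2, 0x100}};

    printf("last common entry: %d\n",
           last_common_entry(prop, 3, sk, 2));      /* prints 0 */
    return 0;
}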
*/ - Assert(propTermHistory.n_entries >= 1); + Assert(wp->propTermHistory.n_entries >= 1); - for (i = 0; i < Min(propTermHistory.n_entries, th->n_entries); i++) + for (i = 0; i < Min(wp->propTermHistory.n_entries, th->n_entries); i++) { - if (propTermHistory.entries[i].term != th->entries[i].term) + if (wp->propTermHistory.entries[i].term != th->entries[i].term) break; /* term must begin everywhere at the same point */ - Assert(propTermHistory.entries[i].lsn == th->entries[i].lsn); + Assert(wp->propTermHistory.entries[i].lsn == th->entries[i].lsn); } i--; /* step back to the last common term */ if (i < 0) { /* safekeeper is empty or no common point, start from the beginning */ - sk->startStreamingAt = propTermHistory.entries[0].lsn; + sk->startStreamingAt = wp->propTermHistory.entries[0].lsn; - if (sk->startStreamingAt < truncateLsn) + if (sk->startStreamingAt < wp->truncateLsn) { /* * There's a gap between the WAL starting point and a truncateLsn, @@ -1176,10 +1131,10 @@ SendProposerElected(Safekeeper *sk) * safekeeper, and it's aligned to the WAL record, so we can * safely start streaming from this point. */ - sk->startStreamingAt = truncateLsn; + sk->startStreamingAt = wp->truncateLsn; elog(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X", - sk->host, sk->port, LSN_FORMAT_ARGS(propTermHistory.entries[0].lsn), + sk->host, sk->port, LSN_FORMAT_ARGS(wp->propTermHistory.entries[0].lsn), LSN_FORMAT_ARGS(sk->startStreamingAt)); } } @@ -1191,28 +1146,28 @@ SendProposerElected(Safekeeper *sk) * proposer, LSN it is currently writing, but then we just pick * safekeeper pos as it obviously can't be higher. */ - if (propTermHistory.entries[i].term == propTerm) + if (wp->propTermHistory.entries[i].term == wp->propTerm) { sk->startStreamingAt = sk->voteResponse.flushLsn; } else { - XLogRecPtr propEndLsn = propTermHistory.entries[i + 1].lsn; + XLogRecPtr propEndLsn = wp->propTermHistory.entries[i + 1].lsn; XLogRecPtr skEndLsn = (i + 1 < th->n_entries ? th->entries[i + 1].lsn : sk->voteResponse.flushLsn); sk->startStreamingAt = Min(propEndLsn, skEndLsn); } } - Assert(sk->startStreamingAt >= truncateLsn && sk->startStreamingAt <= availableLsn); + Assert(sk->startStreamingAt >= wp->truncateLsn && sk->startStreamingAt <= wp->availableLsn); msg.tag = 'e'; - msg.term = propTerm; + msg.term = wp->propTerm; msg.startStreamingAt = sk->startStreamingAt; - msg.termHistory = &propTermHistory; - msg.timelineStartLsn = timelineStartLsn; + msg.termHistory = &wp->propTermHistory; + msg.timelineStartLsn = wp->timelineStartLsn; - lastCommonTerm = i >= 0 ? propTermHistory.entries[i].term : 0; + lastCommonTerm = i >= 0 ? 
wp->propTermHistory.entries[i].term : 0; elog(LOG, "sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X", sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn)); @@ -1276,25 +1231,25 @@ SendMessageToNode(Safekeeper *sk) * Broadcast new message to all caught-up safekeepers */ static void -BroadcastAppendRequest() +BroadcastAppendRequest(WalProposer *wp) { - for (int i = 0; i < n_safekeepers; i++) - if (safekeeper[i].state == SS_ACTIVE) - SendMessageToNode(&safekeeper[i]); + for (int i = 0; i < wp->n_safekeepers; i++) + if (wp->safekeeper[i].state == SS_ACTIVE) + SendMessageToNode(&wp->safekeeper[i]); } static void -PrepareAppendRequest(AppendRequestHeader * req, XLogRecPtr beginLsn, XLogRecPtr endLsn) +PrepareAppendRequest(WalProposer *wp, AppendRequestHeader * req, XLogRecPtr beginLsn, XLogRecPtr endLsn) { Assert(endLsn >= beginLsn); req->tag = 'a'; - req->term = propTerm; - req->epochStartLsn = propEpochStartLsn; + req->term = wp->propTerm; + req->epochStartLsn = wp->propEpochStartLsn; req->beginLsn = beginLsn; req->endLsn = endLsn; - req->commitLsn = GetAcknowledgedByQuorumWALPosition(); - req->truncateLsn = truncateLsn; - req->proposerId = greetRequest.proposerId; + req->commitLsn = GetAcknowledgedByQuorumWALPosition(wp); + req->truncateLsn = wp->truncateLsn; + req->proposerId = wp->greetRequest.proposerId; } /* @@ -1303,6 +1258,8 @@ PrepareAppendRequest(AppendRequestHeader * req, XLogRecPtr beginLsn, XLogRecPtr static void HandleActiveState(Safekeeper *sk, uint32 events) { + WalProposer *wp = sk->wp; + uint32 newEvents = WL_SOCKET_READABLE; if (events & WL_SOCKET_WRITEABLE) @@ -1322,10 +1279,10 @@ HandleActiveState(Safekeeper *sk, uint32 events) * after arrival. But it's good to have it here in case we change this * behavior in the future. 
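/*
 * Illustrative sketch (annotation, not part of the patch): the readiness
 * mask chosen just below in HandleActiveState. Reads are always of interest
 * (acks, keepalives); writes only while WAL remains unsent or a flush is
 * pending. Bit values here are hypothetical, not the real wait-event flags:
 */
#include <stdint.h>
#include <stdio.h>

#define EV_READABLE     (1 << 0)    /* illustrative values */
#define EV_WRITEABLE    (1 << 1)

static unsigned
active_events(uint64_t streaming_at, uint64_t available_lsn, int flush_pending)
{
    unsigned    ev = EV_READABLE;   /* always watch for responses */

    if (streaming_at != available_lsn || flush_pending)
        ev |= EV_WRITEABLE;     /* more WAL (or a flush) to push */
    return ev;
}

int
main(void)
{
    printf("caught up: mask %u\n", active_events(100, 100, 0)); /* 1 */
    printf("behind:    mask %u\n", active_events(80, 100, 0));  /* 3 */
    return 0;
}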
*/ - if (sk->streamingAt != availableLsn || sk->flushWrite) + if (sk->streamingAt != wp->availableLsn || sk->flushWrite) newEvents |= WL_SOCKET_WRITEABLE; - walprop_pg.update_event_set(sk, newEvents); + wp->api.update_event_set(sk, newEvents); } /* @@ -1340,6 +1297,7 @@ HandleActiveState(Safekeeper *sk, uint32 events) static bool SendAppendRequests(Safekeeper *sk) { + WalProposer *wp = sk->wp; XLogRecPtr endLsn; AppendRequestHeader *req; PGAsyncWriteResult writeResult; @@ -1359,7 +1317,7 @@ SendAppendRequests(Safekeeper *sk) sk->flushWrite = false; } - while (sk->streamingAt != availableLsn || !sentAnything) + while (sk->streamingAt != wp->availableLsn || !sentAnything) { sentAnything = true; @@ -1367,13 +1325,13 @@ SendAppendRequests(Safekeeper *sk) endLsn += MAX_SEND_SIZE; /* if we went beyond available WAL, back off */ - if (endLsn > availableLsn) + if (endLsn > wp->availableLsn) { - endLsn = availableLsn; + endLsn = wp->availableLsn; } req = &sk->appendRequest; - PrepareAppendRequest(&sk->appendRequest, sk->streamingAt, endLsn); + PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn); ereport(DEBUG2, (errmsg("sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s", @@ -1381,7 +1339,7 @@ SendAppendRequests(Safekeeper *sk) LSN_FORMAT_ARGS(req->beginLsn), LSN_FORMAT_ARGS(req->endLsn), LSN_FORMAT_ARGS(req->commitLsn), - LSN_FORMAT_ARGS(truncateLsn), sk->host, sk->port))); + LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port))); resetStringInfo(&sk->outbuf); @@ -1391,13 +1349,13 @@ SendAppendRequests(Safekeeper *sk) /* write the WAL itself */ enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn); /* wal_read will raise error on failure */ - walprop_pg.wal_read(sk->xlogreader, + wp->api.wal_read(sk->xlogreader, &sk->outbuf.data[sk->outbuf.len], req->beginLsn, req->endLsn - req->beginLsn); sk->outbuf.len += req->endLsn - req->beginLsn; - writeResult = walprop_pg.conn_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len); + writeResult = wp->api.conn_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len); /* Mark current message as sent, whatever the result is */ sk->streamingAt = endLsn; @@ -1421,7 +1379,7 @@ SendAppendRequests(Safekeeper *sk) case PG_ASYNC_WRITE_FAIL: elog(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - walprop_pg.conn_error_message(sk->conn)); + wp->api.conn_error_message(sk->conn)); ShutdownConnection(sk); return false; default: @@ -1444,6 +1402,7 @@ SendAppendRequests(Safekeeper *sk) static bool RecvAppendResponses(Safekeeper *sk) { + WalProposer *wp = sk->wp; XLogRecPtr minQuorumLsn; bool readAnything = false; @@ -1465,12 +1424,12 @@ RecvAppendResponses(Safekeeper *sk) LSN_FORMAT_ARGS(sk->appendResponse.commitLsn), sk->host, sk->port))); - if (sk->appendResponse.term > propTerm) + if (sk->appendResponse.term > wp->propTerm) { /* Another compute with higher term is running. */ elog(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "", sk->host, sk->port, - sk->appendResponse.term, propTerm); + sk->appendResponse.term, wp->propTerm); } readAnything = true; @@ -1479,16 +1438,16 @@ RecvAppendResponses(Safekeeper *sk) if (!readAnything) return sk->state == SS_ACTIVE; - HandleSafekeeperResponse(); + HandleSafekeeperResponse(wp); /* * Also send the new commit lsn to all the safekeepers. 
*/ - minQuorumLsn = GetAcknowledgedByQuorumWALPosition(); - if (minQuorumLsn > lastSentCommitLsn) + minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp); + if (minQuorumLsn > wp->lastSentCommitLsn) { - BroadcastAppendRequest(); - lastSentCommitLsn = minQuorumLsn; + BroadcastAppendRequest(wp); + wp->lastSentCommitLsn = minQuorumLsn; } return sk->state == SS_ACTIVE; @@ -1577,15 +1536,15 @@ ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback * rf * last WAL record that can be safely discarded. */ static XLogRecPtr -CalculateMinFlushLsn(void) +CalculateMinFlushLsn(WalProposer *wp) { - XLogRecPtr lsn = n_safekeepers > 0 - ? safekeeper[0].appendResponse.flushLsn + XLogRecPtr lsn = wp->n_safekeepers > 0 + ? wp->safekeeper[0].appendResponse.flushLsn : InvalidXLogRecPtr; - for (int i = 1; i < n_safekeepers; i++) + for (int i = 1; i < wp->n_safekeepers; i++) { - lsn = Min(lsn, safekeeper[i].appendResponse.flushLsn); + lsn = Min(lsn, wp->safekeeper[i].appendResponse.flushLsn); } return lsn; } @@ -1594,37 +1553,37 @@ CalculateMinFlushLsn(void) * Calculate WAL position acknowledged by quorum */ static XLogRecPtr -GetAcknowledgedByQuorumWALPosition(void) +GetAcknowledgedByQuorumWALPosition(WalProposer *wp) { XLogRecPtr responses[MAX_SAFEKEEPERS]; /* * Sort acknowledged LSNs */ - for (int i = 0; i < n_safekeepers; i++) + for (int i = 0; i < wp->n_safekeepers; i++) { /* * Like in Raft, we aren't allowed to commit entries from previous * terms, so ignore reported LSN until it gets to epochStartLsn. */ - responses[i] = safekeeper[i].appendResponse.flushLsn >= propEpochStartLsn ? safekeeper[i].appendResponse.flushLsn : 0; + responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propEpochStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0; } - qsort(responses, n_safekeepers, sizeof(XLogRecPtr), CompareLsn); + qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn); /* * Get the smallest LSN committed by quorum */ - return responses[n_safekeepers - quorum]; + return responses[wp->n_safekeepers - wp->quorum]; } static void -HandleSafekeeperResponse(void) +HandleSafekeeperResponse(WalProposer *wp) { XLogRecPtr minQuorumLsn; XLogRecPtr minFlushLsn; - minQuorumLsn = GetAcknowledgedByQuorumWALPosition(); - walprop_pg.process_safekeeper_feedback(safekeeper, n_safekeepers, syncSafekeepers, minQuorumLsn); + minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp); + wp->api.process_safekeeper_feedback(wp, minQuorumLsn); /* * Try to advance truncateLsn to minFlushLsn, which is the last record @@ -1640,16 +1599,16 @@ HandleSafekeeperResponse(void) * term' in Raft); 2) chunks we read from WAL and send are plain sheets of * bytes, but safekeepers ack only on record boundaries. */ - minFlushLsn = CalculateMinFlushLsn(); - if (minFlushLsn > truncateLsn) + minFlushLsn = CalculateMinFlushLsn(wp); + if (minFlushLsn > wp->truncateLsn) { - truncateLsn = minFlushLsn; + wp->truncateLsn = minFlushLsn; /* * Advance the replication slot to free up old WAL files. Note that * slot doesn't exist if we are in syncSafekeepers mode. */ - walprop_pg.confirm_wal_streamed(truncateLsn); + wp->api.confirm_wal_streamed(wp->truncateLsn); } /* @@ -1664,15 +1623,15 @@ HandleSafekeeperResponse(void) * (due to pageserver connecting to not-synced-safekeeper) we currently * wait for all seemingly alive safekeepers to get synced. 
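/*
 * Illustrative sketch (annotation, not part of the patch): the commit rule
 * in GetAcknowledgedByQuorumWALPosition above -- sort the per-safekeeper
 * flush positions and take the (n - quorum)-th element, so at least `quorum`
 * nodes have flushed at least that LSN. Positions below epochStartLsn are
 * zeroed first, mirroring Raft's rule of never committing entries from
 * previous terms by counting. Hypothetical standalone form:
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

static int
cmp_u64(const void *a, const void *b)
{
    uint64_t    x = *(const uint64_t *) a;
    uint64_t    y = *(const uint64_t *) b;

    return (x < y) ? -1 : (x > y);
}

static uint64_t
quorum_lsn(uint64_t flush[], int n, int quorum)
{
    qsort(flush, n, sizeof(uint64_t), cmp_u64);
    return flush[n - quorum];   /* `quorum` nodes are at or above this */
}

int
main(void)
{
    uint64_t    flush[] = {0x500, 0x200, 0x400};

    /* n=3, quorum=2: sorted {200,400,500}, index 1 -> 0x400 */
    printf("commit lsn: 0x%llx\n",
           (unsigned long long) quorum_lsn(flush, 3, 2));
    return 0;
}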
*/ - if (syncSafekeepers) + if (wp->config->syncSafekeepers) { int n_synced; n_synced = 0; - for (int i = 0; i < n_safekeepers; i++) + for (int i = 0; i < wp->n_safekeepers; i++) { - Safekeeper *sk = &safekeeper[i]; - bool synced = sk->appendResponse.commitLsn >= propEpochStartLsn; + Safekeeper *sk = &wp->safekeeper[i]; + bool synced = sk->appendResponse.commitLsn >= wp->propEpochStartLsn; /* alive safekeeper which is not synced yet; wait for it */ if (sk->state != SS_OFFLINE && !synced) @@ -1681,7 +1640,7 @@ n_synced++; } - if (n_synced >= quorum) + if (n_synced >= wp->quorum) { /* A quorum of safekeepers has been synced! */ @@ -1694,9 +1653,9 @@ * and TCP should be able to deliver the message to the safekeepers if the * network is working properly. */ - BroadcastAppendRequest(); + BroadcastAppendRequest(wp); - walprop_pg.finish_sync_safekeepers(propEpochStartLsn); + wp->api.finish_sync_safekeepers(wp->propEpochStartLsn); /* unreachable */ } } @@ -1709,7 +1668,9 @@ static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size) { - switch (walprop_pg.conn_async_read(sk->conn, buf, buf_size)) + WalProposer *wp = sk->wp; + + switch (wp->api.conn_async_read(sk->conn, buf, buf_size)) { case PG_ASYNC_READ_SUCCESS: return true; @@ -1721,7 +1682,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size) case PG_ASYNC_READ_FAIL: elog(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - walprop_pg.conn_error_message(sk->conn)); + wp->api.conn_error_message(sk->conn)); ShutdownConnection(sk); return false; } @@ -1741,6 +1702,8 @@ static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg) { + WalProposer *wp = sk->wp; + char *buf; int buf_size; uint64 tag; @@ -1762,7 +1725,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg) ResetConnection(sk); return false; } - sk->latestMsgReceivedAt = walprop_pg.get_current_timestamp(); + sk->latestMsgReceivedAt = wp->api.get_current_timestamp(); switch (tag) { case 'g': @@ -1828,13 +1791,14 @@ static bool BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState success_state) { + WalProposer *wp = sk->wp; uint32 events; - if (!walprop_pg.conn_blocking_write(sk->conn, msg, msg_size)) + if (!wp->api.conn_blocking_write(sk->conn, msg, msg_size)) { elog(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - walprop_pg.conn_error_message(sk->conn)); + wp->api.conn_error_message(sk->conn)); ShutdownConnection(sk); return false; } @@ -1847,7 +1811,7 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes */ events = SafekeeperStateDesiredEvents(success_state); if (events) - walprop_pg.update_event_set(sk, events); + wp->api.update_event_set(sk, events); return true; } @@ -1862,7 +1826,9 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes static bool AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_state) { - switch (walprop_pg.conn_async_write(sk->conn, msg, msg_size)) + WalProposer *wp = sk->wp; + + switch (wp->api.conn_async_write(sk->conn, msg, msg_size)) { case PG_ASYNC_WRITE_SUCCESS: return true; @@ -1874,12 +1840,12 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState
flush_sta * this function */ sk->state = flush_state; - walprop_pg.update_event_set(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); + wp->api.update_event_set(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); return false; case PG_ASYNC_WRITE_FAIL: elog(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - walprop_pg.conn_error_message(sk->conn)); + wp->api.conn_error_message(sk->conn)); ShutdownConnection(sk); return false; default: @@ -1899,13 +1865,15 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta static bool AsyncFlush(Safekeeper *sk) { + WalProposer *wp = sk->wp; + /*--- * PQflush returns: * 0 if successful [we're good to move on] * 1 if unable to send everything yet [call PQflush again] * -1 if it failed [emit an error] */ - switch (walprop_pg.conn_flush(sk->conn)) + switch (wp->api.conn_flush(sk->conn)) { case 0: /* flush is done */ @@ -1916,7 +1884,7 @@ AsyncFlush(Safekeeper *sk) case -1: elog(WARNING, "Failed to flush write to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - walprop_pg.conn_error_message(sk->conn)); + wp->api.conn_error_message(sk->conn)); ResetConnection(sk); return false; default: diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 2b2f2d8354..41d9ee95f1 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -317,11 +317,16 @@ typedef struct AppendResponse /* Other fields are fixed part */ #define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf) +struct WalProposer; +typedef struct WalProposer WalProposer; + /* * Descriptor of safekeeper */ typedef struct Safekeeper { + WalProposer *wp; + char const *host; char const *port; @@ -369,13 +374,6 @@ typedef struct Safekeeper AppendResponse appendResponse; /* feedback for master */ } Safekeeper; -extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]); -extern void PGDLLEXPORT WalProposerMain(Datum main_arg); -extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos); -extern void WalProposerPoll(void); -extern void ParsePageserverFeedbackMessage(StringInfo reply_message, - PageserverFeedback *rf); - /* Re-exported PostgresPollingStatusType */ typedef enum { @@ -436,17 +434,13 @@ typedef enum typedef struct walproposer_api { WalproposerShmemState * (*get_shmem_state) (void); - void (*start_streaming) (XLogRecPtr startpos, TimeLineID timeline); - void (*init_walsender) (void); - void (*init_standalone_sync_safekeepers) (void); - uint64 (*init_bgworker) (void); + void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos, TimeLineID timeline); XLogRecPtr (*get_flush_rec_ptr) (void); TimestampTz (*get_current_timestamp) (void); TimeLineID (*get_timeline_id) (void); - void (*load_libpqwalreceiver) (void); char * (*conn_error_message) (WalProposerConn * conn); WalProposerConnStatusType (*conn_status) (WalProposerConn * conn); - WalProposerConn * (*conn_connect_start) (char *conninfo, char *password); + WalProposerConn * (*conn_connect_start) (char *conninfo); WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn * conn); bool (*conn_send_query) (WalProposerConn * conn, char * query); WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn * conn); @@ -467,10 +461,67 @@ typedef struct walproposer_api bool (*strong_random) (void *buf, size_t len); XLogRecPtr (*get_redo_start_lsn) (void); void (*finish_sync_safekeepers) (XLogRecPtr lsn); - void (*process_safekeeper_feedback) (Safekeeper * safekeepers, int 
n_safekeepers, bool isSync, XLogRecPtr commitLsn); + void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn); void (*confirm_wal_streamed) (XLogRecPtr lsn); } walproposer_api; -extern const walproposer_api walprop_pg; +typedef struct WalProposerConfig { + char *neon_tenant; + char *neon_timeline; + char *safekeepers_list; + int safekeeper_reconnect_timeout; + int safekeeper_connection_timeout; + int wal_segment_size; + bool syncSafekeepers; + uint64 systemId; +} WalProposerConfig; + +typedef struct WalProposer { + WalProposerConfig *config; + int n_safekeepers; + int quorum; + Safekeeper safekeeper[MAX_SAFEKEEPERS]; + /* WAL has been generated up to this point */ + XLogRecPtr availableLsn; + /* last commitLsn broadcast to safekeepers */ + XLogRecPtr lastSentCommitLsn; + ProposerGreeting greetRequest; + /* Vote request for safekeeper */ + VoteRequest voteRequest; + /* + * Minimal LSN which may be needed for recovery of some safekeeper, + * record-aligned (first record which might not yet have been received by someone). + */ + XLogRecPtr truncateLsn; + /* + * Term of the proposer. We want our term to be highest and unique, + * so we collect terms from a quorum of safekeepers, choose the max and add 1. + * After that our term is fixed and must not change. If we observe + * that some safekeeper has a higher term, it means that we have another + * running compute, so we must stop immediately. + */ + term_t propTerm; + /* term history of the proposer */ + TermHistory propTermHistory; + /* epoch start lsn of the proposer */ + XLogRecPtr propEpochStartLsn; + /* Most advanced acceptor epoch */ + term_t donorEpoch; + /* Most advanced acceptor */ + int donor; + /* timeline globally starts at this LSN */ + XLogRecPtr timelineStartLsn; + int n_votes; + int n_connected; + TimestampTz last_reconnect_attempt; + walproposer_api api; +} WalProposer; + +extern WalProposer *WalProposerCreate(WalProposerConfig *config, walproposer_api api); +extern void WalProposerStart(WalProposer *wp); +extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos); +extern void WalProposerPoll(WalProposer *wp); +extern void ParsePageserverFeedbackMessage(StringInfo reply_message, + PageserverFeedback *rf); #endif /* __NEON_WALPROPOSER_H__ */ diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index f9ee86d958..7c3039dafe 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -50,8 +50,10 @@ int wal_acceptor_reconnect_timeout = 1000; int wal_acceptor_connection_timeout = 10000; static AppendResponse quorumFeedback; - static WalproposerShmemState * walprop_shared; +static WalProposerConfig walprop_config; + +static const walproposer_api walprop_pg; static void nwp_shmem_startup_hook(void); static void nwp_register_gucs(void); @@ -60,6 +62,12 @@ static uint64 backpressure_lag_impl(void); static bool backpressure_throttling_impl(void); static void walprop_register_bgworker(void); +static void walprop_pg_init_standalone_sync_safekeepers(void); +static void walprop_pg_init_walsender(void); +static void walprop_pg_init_bgworker(void); +static TimestampTz walprop_pg_get_current_timestamp(void); +static void walprop_pg_load_libpqwalreceiver(void); + static process_interrupts_callback_t PrevProcessInterruptsCallback; static shmem_startup_hook_type prev_shmem_startup_hook_type; #if PG_VERSION_NUM >= 150000 @@ -69,13 +77,65 @@ static void walproposer_shmem_request(void); static XLogRecPtr sentPtr = InvalidXLogRecPtr; -static void StartProposerReplication(StartReplicationCmd
-static void WalSndLoop(void);
-static void XLogBroadcastWalProposer(void);
+static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
+static void WalSndLoop(WalProposer *wp);
+static void XLogBroadcastWalProposer(WalProposer *wp);
 static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalPropClose(XLogRecPtr recptr);
 
+static void
+init_walprop_config(bool syncSafekeepers)
+{
+	walprop_config.neon_tenant = neon_tenant;
+	walprop_config.neon_timeline = neon_timeline;
+	walprop_config.safekeepers_list = wal_acceptors_list;
+	walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
+	walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
+	walprop_config.wal_segment_size = wal_segment_size;
+	walprop_config.syncSafekeepers = syncSafekeepers;
+	if (!syncSafekeepers)
+		walprop_config.systemId = GetSystemIdentifier();
+	else
+		walprop_config.systemId = 0;
+}
+
+/*
+ * Entry point for `postgres --sync-safekeepers`.
+ */
+PGDLLEXPORT void
+WalProposerSync(int argc, char *argv[])
+{
+	WalProposer *wp;
+
+	init_walprop_config(true);
+	walprop_pg_init_standalone_sync_safekeepers();
+	walprop_pg_load_libpqwalreceiver();
+
+	wp = WalProposerCreate(&walprop_config, walprop_pg);
+
+	WalProposerStart(wp);
+}
+
+/*
+ * WAL proposer bgworker entry point.
+ */
+PGDLLEXPORT void
+WalProposerMain(Datum main_arg)
+{
+	WalProposer *wp;
+
+	init_walprop_config(false);
+	walprop_pg_init_bgworker();
+	walprop_pg_load_libpqwalreceiver();
+
+	wp = WalProposerCreate(&walprop_config, walprop_pg);
+	wp->last_reconnect_attempt = walprop_pg_get_current_timestamp();
+
+	walprop_pg_init_walsender();
+	WalProposerStart(wp);
+}
+
 /*
  * Initialize GUCs, bgworker, shmem and backpressure.
  */
@@ -341,7 +401,7 @@ replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRe
  * Start walsender streaming replication
  */
 static void
-walprop_pg_start_streaming(XLogRecPtr startpos, TimeLineID timeline)
+walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos, TimeLineID timeline)
 {
 	StartReplicationCmd cmd;
 
@@ -350,7 +410,7 @@ walprop_pg_start_streaming(XLogRecPtr startpos, TimeLineID timeline)
 	cmd.slotname = WAL_PROPOSER_SLOT_NAME;
 	cmd.timeline = timeline;
 	cmd.startpoint = startpos;
-	StartProposerReplication(&cmd);
+	StartProposerReplication(wp, &cmd);
 }
 
 static void
@@ -413,7 +473,7 @@ walprop_pg_init_standalone_sync_safekeepers(void)
 	BackgroundWorkerUnblockSignals();
 }
 
-static uint64
+static void
 walprop_pg_init_bgworker(void)
 {
 #if PG_VERSION_NUM >= 150000
@@ -436,8 +496,6 @@ walprop_pg_init_bgworker(void)
 #else
 	GetXLogReplayRecPtr(&ThisTimeLineID);
 #endif
-
-	return GetSystemIdentifier();
 }
 
 static XLogRecPtr
@@ -522,13 +580,14 @@ walprop_status(WalProposerConn *conn)
 }
 
 static WalProposerConn *
-walprop_connect_start(char *conninfo, char *password)
+walprop_connect_start(char *conninfo)
 {
 	WalProposerConn *conn;
 	PGconn *pg_conn;
 	const char *keywords[3];
 	const char *values[3];
 	int n;
+	char *password = neon_auth_token;
 
 	/*
 	 * Connect using the given connection string. If the
@@ -901,7 +960,7 @@ walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
 	 * to the main loop.
 	 */
 static void
-StartProposerReplication(StartReplicationCmd *cmd)
+StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
 {
 	XLogRecPtr FlushPtr;
 	TimeLineID currTLI;
@@ -986,7 +1045,7 @@ StartProposerReplication(StartReplicationCmd *cmd)
 	SyncRepInitConfig();
 
 	/* Infinite send loop, never returns */
-	WalSndLoop();
+	WalSndLoop(wp);
 
 	WalSndSetState(WALSNDSTATE_STARTUP);
@@ -999,7 +1058,7 @@
  * Synchronous replication sets latch in WalSndWakeup at walsender.c
  */
 static void
-WalSndLoop(void)
+WalSndLoop(WalProposer *wp)
 {
 	/* Clear any already-pending wakeups */
 	ResetLatch(MyLatch);
@@ -1008,11 +1067,11 @@
 	{
 		CHECK_FOR_INTERRUPTS();
 
-		XLogBroadcastWalProposer();
+		XLogBroadcastWalProposer(wp);
 
 		if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
 			WalSndSetState(WALSNDSTATE_STREAMING);
-		WalProposerPoll();
+		WalProposerPoll(wp);
 	}
 }
 
@@ -1020,7 +1079,7 @@
  * Notify walproposer about the new WAL position.
  */
 static void
-XLogBroadcastWalProposer(void)
+XLogBroadcastWalProposer(WalProposer *wp)
 {
 	XLogRecPtr startptr;
 	XLogRecPtr endptr;
@@ -1075,7 +1134,7 @@
 	if (endptr <= startptr)
 		return;
 
-	WalProposerBroadcast(startptr, endptr);
+	WalProposerBroadcast(wp, startptr, endptr);
 	sentPtr = endptr;
 
 	/* Update shared memory status */
@@ -1449,25 +1508,25 @@ walprop_pg_finish_sync_safekeepers(XLogRecPtr lsn)
  * Get PageserverFeedback fields from the most advanced safekeeper
  */
 static void
-GetLatestNeonFeedback(PageserverFeedback * rf, Safekeeper * safekeepers, int n_safekeepers)
+GetLatestNeonFeedback(PageserverFeedback * rf, WalProposer *wp)
 {
 	int latest_safekeeper = 0;
 	XLogRecPtr last_received_lsn = InvalidXLogRecPtr;
 
-	for (int i = 0; i < n_safekeepers; i++)
+	for (int i = 0; i < wp->n_safekeepers; i++)
 	{
-		if (safekeepers[i].appendResponse.rf.last_received_lsn > last_received_lsn)
+		if (wp->safekeeper[i].appendResponse.rf.last_received_lsn > last_received_lsn)
 		{
 			latest_safekeeper = i;
-			last_received_lsn = safekeepers[i].appendResponse.rf.last_received_lsn;
+			last_received_lsn = wp->safekeeper[i].appendResponse.rf.last_received_lsn;
 		}
 	}
 
-	rf->currentClusterSize = safekeepers[latest_safekeeper].appendResponse.rf.currentClusterSize;
-	rf->last_received_lsn = safekeepers[latest_safekeeper].appendResponse.rf.last_received_lsn;
-	rf->disk_consistent_lsn = safekeepers[latest_safekeeper].appendResponse.rf.disk_consistent_lsn;
-	rf->remote_consistent_lsn = safekeepers[latest_safekeeper].appendResponse.rf.remote_consistent_lsn;
-	rf->replytime = safekeepers[latest_safekeeper].appendResponse.rf.replytime;
+	rf->currentClusterSize = wp->safekeeper[latest_safekeeper].appendResponse.rf.currentClusterSize;
+	rf->last_received_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.last_received_lsn;
+	rf->disk_consistent_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.disk_consistent_lsn;
+	rf->remote_consistent_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.remote_consistent_lsn;
+	rf->replytime = wp->safekeeper[latest_safekeeper].appendResponse.rf.replytime;
 
 	elog(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu,"
 		 " last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu",
@@ -1484,17 +1543,17 @@ GetLatestNeonFeedback(PageserverFeedback * rf, Safekeeper * safekeepers, int n_s
  * Combine hot standby feedbacks from all safekeepers.
  */
 static void
-CombineHotStanbyFeedbacks(HotStandbyFeedback * hs, Safekeeper * safekeepers, int n_safekeepers)
+CombineHotStanbyFeedbacks(HotStandbyFeedback * hs, WalProposer *wp)
 {
 	hs->ts = 0;
 	hs->xmin.value = ~0;	/* largest unsigned value */
 	hs->catalog_xmin.value = ~0;	/* largest unsigned value */
 
-	for (int i = 0; i < n_safekeepers; i++)
+	for (int i = 0; i < wp->n_safekeepers; i++)
 	{
-		if (safekeepers[i].appendResponse.hs.ts != 0)
+		if (wp->safekeeper[i].appendResponse.hs.ts != 0)
 		{
-			HotStandbyFeedback *skhs = &safekeepers[i].appendResponse.hs;
+			HotStandbyFeedback *skhs = &wp->safekeeper[i].appendResponse.hs;
 
 			if (FullTransactionIdIsNormal(skhs->xmin)
 				&& FullTransactionIdPrecedes(skhs->xmin, hs->xmin))
 			{
@@ -1517,17 +1576,17 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback * hs, Safekeeper * safekeepers, int
 }
 
 static void
-walprop_pg_process_safekeeper_feedback(Safekeeper * safekeepers, int n_safekeepers, bool isSync, XLogRecPtr commitLsn)
+walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
 {
 	HotStandbyFeedback hsFeedback;
 	XLogRecPtr diskConsistentLsn;
 
 	diskConsistentLsn = quorumFeedback.rf.disk_consistent_lsn;
 
-	if (!isSync)
+	if (!wp->config->syncSafekeepers)
 	{
 		/* Get PageserverFeedback fields from the most advanced safekeeper */
-		GetLatestNeonFeedback(&quorumFeedback.rf, safekeepers, n_safekeepers);
+		GetLatestNeonFeedback(&quorumFeedback.rf, wp);
 		SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);
 	}
 
@@ -1538,7 +1597,7 @@ walprop_pg_process_safekeeper_feedback(Safekeeper * safekeepe
 		quorumFeedback.flushLsn = commitLsn;
 
 		/* advance the replication slot */
-		if (!isSync)
+		if (!wp->config->syncSafekeepers)
 			ProcessStandbyReply(
 			/* write_lsn - This is what durably stored in WAL service. */
 								quorumFeedback.flushLsn,
@@ -1553,11 +1612,11 @@ walprop_pg_process_safekeeper_feedback(Safekeeper * safekeepe
 								walprop_pg_get_current_timestamp(), false);
 	}
 
-	CombineHotStanbyFeedbacks(&hsFeedback, safekeepers, n_safekeepers);
+	CombineHotStanbyFeedbacks(&hsFeedback, wp);
 	if (hsFeedback.ts != 0 && memcmp(&hsFeedback, &quorumFeedback.hs, sizeof hsFeedback) != 0)
 	{
 		quorumFeedback.hs = hsFeedback;
-		if (!isSync)
+		if (!wp->config->syncSafekeepers)
 			ProcessStandbyHSFeedback(hsFeedback.ts,
 									 XidFromFullTransactionId(hsFeedback.xmin),
 									 EpochFromFullTransactionId(hsFeedback.xmin),
@@ -1576,16 +1635,12 @@ walprop_pg_confirm_wal_streamed(XLogRecPtr lsn)
 /*
- * Temporary globally exported walproposer API for postgres.
+ * Temporary walproposer API implementation for postgres.
  */
-const walproposer_api walprop_pg = {
+static const walproposer_api walprop_pg = {
 	.get_shmem_state = walprop_pg_get_shmem_state,
 	.start_streaming = walprop_pg_start_streaming,
-	.init_walsender = walprop_pg_init_walsender,
-	.init_standalone_sync_safekeepers = walprop_pg_init_standalone_sync_safekeepers,
-	.init_bgworker = walprop_pg_init_bgworker,
 	.get_flush_rec_ptr = walprop_pg_get_flush_rec_ptr,
 	.get_current_timestamp = walprop_pg_get_current_timestamp,
 	.get_timeline_id = walprop_pg_get_timeline_id,
-	.load_libpqwalreceiver = walprop_pg_load_libpqwalreceiver,
 	.conn_error_message = walprop_error_message,
 	.conn_status = walprop_status,
 	.conn_connect_start = walprop_connect_start,
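
With the proposer state and callbacks now funneled through WalProposerCreate() and the walproposer_api vtable, a host other than the postgres bgworker can in principle drive the proposer. The following is a minimal sketch of that seam, not part of this patch: the test_* stubs and run_walproposer_standalone are hypothetical names, and a real caller must populate every walproposer_api member, not just the two shown.

#include "walproposer.h"

/* Hypothetical stub: a simulated clock for tests. */
static TimestampTz
test_get_current_timestamp(void)
{
	return 0;
}

/* Hypothetical stub: observe the result instead of exiting the process. */
static void
test_finish_sync_safekeepers(XLogRecPtr lsn)
{
	elog(LOG, "sync-safekeepers would finish at %X/%X",
		 (uint32) (lsn >> 32), (uint32) lsn);
}

static void
run_walproposer_standalone(void)
{
	/* The safekeeper list is parsed in place, so it must be writable. */
	static char sk_list[] = "sk1:5454,sk2:5454,sk3:5454";

	WalProposerConfig config = {
		.neon_tenant = "test-tenant",
		.neon_timeline = "test-timeline",
		.safekeepers_list = sk_list,
		.safekeeper_reconnect_timeout = 1000,
		.safekeeper_connection_timeout = 10000,
		.wal_segment_size = 16 * 1024 * 1024,
		.syncSafekeepers = true,
		.systemId = 0,
	};
	walproposer_api api = {
		.get_current_timestamp = test_get_current_timestamp,
		.finish_sync_safekeepers = test_finish_sync_safekeepers,
		/* ...every other member must be filled in as well... */
	};
	WalProposer *wp = WalProposerCreate(&config, api);

	WalProposerStart(wp);
}

This is the same wiring that WalProposerSync and WalProposerMain above perform with the real walprop_pg implementation; keeping the config and the vtable as plain data is what allows an alternative event loop or a test harness to be substituted without touching walproposer.c.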