diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index f711a78052..3300c67456 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -18,6 +18,10 @@ extern char *neon_auth_token; extern char *neon_timeline; extern char *neon_tenant; +extern char *wal_acceptors_list; +extern int wal_acceptor_reconnect_timeout; +extern int wal_acceptor_connection_timeout; + extern void pg_init_libpagestore(void); extern void pg_init_walproposer(void); diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index a9b9c97465..5bfce37fc5 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -7,9 +7,9 @@ * * We have two ways of launching WalProposer: * - * 1. As a background worker which will run physical WalSender with - * am_wal_proposer flag set to true. WalSender in turn would handle WAL - * reading part and call WalProposer when ready to scatter WAL. + * 1. As a background worker which will pretend to be physical WalSender. + * WalProposer will receive notifications about new available WAL and + * will immediately broadcast it to alive safekeepers. * * 2. As a standalone utility by running `postgres --sync-safekeepers`. That * is needed to create LSN from which it is safe to start postgres. More @@ -28,6 +28,10 @@ * playing consensus game is impossible, so speculative 'let's just poll * safekeepers, learn start LSN of future epoch and run basebackup' * won't work. + * + * Both ways are implemented in walproposer_pg.c file. This file contains + * generic part of walproposer which can be used in both cases, but can also + * be used as an independent library. * *------------------------------------------------------------------------- */ @@ -227,13 +231,14 @@ WalProposerPoll(WalProposer *wp) } now = wp->api.get_current_timestamp(); - if (rc == 0 || TimeToReconnect(wp, now) <= 0) /* timeout expired: poll state */ + /* timeout expired: poll state */ + if (rc == 0 || TimeToReconnect(wp, now) <= 0) { TimestampTz now; /* * If no WAL was generated during timeout (and we have already - * collected the quorum), then send pool message + * collected the quorum), then send empty keepalive message */ if (wp->availableLsn != InvalidXLogRecPtr) { @@ -904,7 +909,7 @@ HandleElectedProposer(WalProposer *wp) return; } - wp->api.start_streaming(wp, wp->propEpochStartLsn, wp->greetRequest.timeline); + wp->api.start_streaming(wp, wp->propEpochStartLsn); /* Should not return here */ } diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 41d9ee95f1..63f7cf1eb0 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -22,10 +22,6 @@ */ #define WL_NO_EVENTS 0 -extern char *wal_acceptors_list; -extern int wal_acceptor_reconnect_timeout; -extern int wal_acceptor_connection_timeout; - struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */ typedef struct WalProposerConn WalProposerConn; @@ -433,66 +429,198 @@ typedef enum */ typedef struct walproposer_api { + /* + * Get WalproposerShmemState. This is used to store information about + * last elected term. + */ WalproposerShmemState * (*get_shmem_state) (void); - void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos, TimeLineID timeline); + + /* + * Start receiving notifications about new WAL. This is an infinite loop + * which calls WalProposerBroadcast() and WalProposerPoll() to send the WAL. + */ + void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos); + + /* Get pointer to the latest available WAL. */ XLogRecPtr (*get_flush_rec_ptr) (void); + + /* Get current time. */ TimestampTz (*get_current_timestamp) (void); + + /* Get postgres timeline. */ TimeLineID (*get_timeline_id) (void); + + /* Current error message, aka PQerrorMessage. */ char * (*conn_error_message) (WalProposerConn * conn); + + /* Connection status, aka PQstatus. */ WalProposerConnStatusType (*conn_status) (WalProposerConn * conn); + + /* Start the connection, aka PQconnectStart. */ WalProposerConn * (*conn_connect_start) (char *conninfo); + + /* Poll an asynchronous connection, aka PQconnectPoll. */ WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn * conn); + + /* Send a blocking SQL query, aka PQsendQuery. */ bool (*conn_send_query) (WalProposerConn * conn, char * query); + + /* Read the query result, aka PQgetResult. */ WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn * conn); - pgsocket (*conn_socket) (WalProposerConn * conn); + + /* Flush buffer to the network, aka PQflush. */ int (*conn_flush) (WalProposerConn * conn); + + /* Close the connection, aka PQfinish. */ void (*conn_finish) (WalProposerConn * conn); + + /* Try to read CopyData message, aka PQgetCopyData. */ PGAsyncReadResult (*conn_async_read) (WalProposerConn * conn, char **buf, int *amount); + + /* Try to write CopyData message, aka PQputCopyData. */ PGAsyncWriteResult (*conn_async_write) (WalProposerConn * conn, void const *buf, size_t size); + + /* Blocking CopyData write, aka PQputCopyData + PQflush. */ bool (*conn_blocking_write) (WalProposerConn * conn, void const *buf, size_t size); + + /* Download WAL from startpos to endpos and make it available locally. */ bool (*recovery_download) (Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos); + + /* Read WAL from disk to buf. */ void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count); + + /* Allocate WAL reader. */ XLogReaderState * (*wal_reader_allocate) (void); + + /* Deallocate event set. */ void (*free_event_set) (void); + + /* Initialize event set. */ void (*init_event_set) (int n_safekeepers); + + /* Update events for an existing safekeeper connection. */ void (*update_event_set) (Safekeeper * sk, uint32 events); + + /* Add a new safekeeper connection to the event set. */ void (*add_safekeeper_event_set) (Safekeeper * sk, uint32 events); + + /* + * Wait until some event happens: + * - timeout is reached + * - socket event for safekeeper connection + * - new WAL is available + * + * Returns 0 if timeout is reached, 1 if some event happened. Updates events mask + * to indicate events and sets sk to the safekeeper which has an event. + */ int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events); + + /* Read random bytes. */ bool (*strong_random) (void *buf, size_t len); + + /* + * Get a basebackup LSN. Used to cross-validate with the latest available + * LSN on the safekeepers. + */ XLogRecPtr (*get_redo_start_lsn) (void); + + /* + * Finish sync safekeepers with the given LSN. This function should not + * return and should exit the program. + */ void (*finish_sync_safekeepers) (XLogRecPtr lsn); + + /* + * Called after every new message from the safekeeper. Used to propagate + * backpressure feedback and to confirm WAL persistence (has been commited on + * the quorum of safekeepers). + */ void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn); + + /* + * Called on peer_horizon_lsn updates. Used to advance replication slot + * and to free up disk space by deleting unnecessary WAL. + */ void (*confirm_wal_streamed) (XLogRecPtr lsn); } walproposer_api; +/* + * Configuration of the WAL proposer. + */ typedef struct WalProposerConfig { + /* hex-encoded TenantId cstr */ char *neon_tenant; + + /* hex-encoded TimelineId cstr */ char *neon_timeline; + + /* + * Comma-separated list of safekeepers, in the following format: + * host1:port1,host2:port2,host3:port3 + * + * This cstr should be editable. + */ char *safekeepers_list; + + /* + * WalProposer reconnects to offline safekeepers once in this interval. + * Time is in milliseconds. + */ int safekeeper_reconnect_timeout; + + /* + * WalProposer terminates the connection if it doesn't receive any message + * from the safekeeper in this interval. Time is in milliseconds. + */ int safekeeper_connection_timeout; + + /* + * WAL segment size. Will be passed to safekeepers in greet request. + * Also used to detect page headers. + */ int wal_segment_size; + + /* + * If safekeeper was started in sync mode, walproposer will not subscribe + * for new WAL and will exit when quorum of safekeepers will be synced + * to the latest available LSN. + */ bool syncSafekeepers; + + /* Will be passed to safekeepers in greet request. */ uint64 systemId; } WalProposerConfig; + +/* + * WAL proposer state. + */ typedef struct WalProposer { WalProposerConfig *config; int n_safekeepers; + + /* (n_safekeepers / 2) + 1 */ int quorum; + Safekeeper safekeeper[MAX_SAFEKEEPERS]; + /* WAL has been generated up to this point */ XLogRecPtr availableLsn; - /* last commitLsn broadcast to safekeepers */ + + /* last commitLsn broadcasted to safekeepers */ XLogRecPtr lastSentCommitLsn; + ProposerGreeting greetRequest; + /* Vote request for safekeeper */ VoteRequest voteRequest; + /* * Minimal LSN which may be needed for recovery of some safekeeper, * record-aligned (first record which might not yet received by someone). */ XLogRecPtr truncateLsn; + /* * Term of the proposer. We want our term to be highest and unique, * so we collect terms from safekeepers quorum, choose max and +1. @@ -501,19 +629,31 @@ typedef struct WalProposer { * running compute, so we must stop immediately. */ term_t propTerm; + /* term history of the proposer */ TermHistory propTermHistory; + /* epoch start lsn of the proposer */ XLogRecPtr propEpochStartLsn; + /* Most advanced acceptor epoch */ term_t donorEpoch; + /* Most advanced acceptor */ int donor; + /* timeline globally starts at this LSN */ XLogRecPtr timelineStartLsn; + + /* number of votes collected from safekeepers */ int n_votes; + + /* number of successful connections over the lifetime of walproposer */ int n_connected; + + /* Timestamp of the last reconnection attempt. Related to config->safekeeper_reconnect_timeout */ TimestampTz last_reconnect_attempt; + walproposer_api api; } WalProposer; diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 7c3039dafe..113616c37b 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -52,7 +52,7 @@ int wal_acceptor_connection_timeout = 10000; static AppendResponse quorumFeedback; static WalproposerShmemState * walprop_shared; static WalProposerConfig walprop_config; - +static XLogRecPtr sentPtr = InvalidXLogRecPtr; static const walproposer_api walprop_pg; static void nwp_shmem_startup_hook(void); @@ -75,8 +75,6 @@ static shmem_request_hook_type prev_shmem_request_hook = NULL; static void walproposer_shmem_request(void); #endif -static XLogRecPtr sentPtr = InvalidXLogRecPtr; - static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd); static void WalSndLoop(WalProposer *wp); static void XLogBroadcastWalProposer(WalProposer *wp); @@ -235,13 +233,13 @@ backpressure_lag_impl(void) /* * WalproposerShmemSize --- report amount of shared memory space needed */ -Size +static Size WalproposerShmemSize(void) { return sizeof(WalproposerShmemState); } -bool +static bool WalproposerShmemInit(void) { bool found; @@ -401,14 +399,14 @@ replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRe * Start walsender streaming replication */ static void -walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos, TimeLineID timeline) +walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos) { StartReplicationCmd cmd; elog(LOG, "WAL proposer starts streaming at %X/%X", LSN_FORMAT_ARGS(startpos)); cmd.slotname = WAL_PROPOSER_SLOT_NAME; - cmd.timeline = timeline; + cmd.timeline = wp->greetRequest.timeline; cmd.startpoint = startpos; StartProposerReplication(wp, &cmd); } @@ -1632,9 +1630,6 @@ walprop_pg_confirm_wal_streamed(XLogRecPtr lsn) PhysicalConfirmReceivedLocation(lsn); } -/* - * Temporary globally exported walproposer API for postgres. - */ static const walproposer_api walprop_pg = { .get_shmem_state = walprop_pg_get_shmem_state, .start_streaming = walprop_pg_start_streaming, @@ -1647,7 +1642,6 @@ static const walproposer_api walprop_pg = { .conn_connect_poll = walprop_connect_poll, .conn_send_query = walprop_send_query, .conn_get_query_result = walprop_get_query_result, - .conn_socket = walprop_socket, .conn_flush = walprop_flush, .conn_finish = walprop_finish, .conn_async_read = walprop_async_read,