Add comments

This commit is contained in:
Arthur Petukhovsky
2023-09-27 12:45:14 +00:00
parent 3bf6ecb2a6
commit 1a19b4d2e7
4 changed files with 167 additions and 24 deletions

View File

@@ -18,6 +18,10 @@ extern char *neon_auth_token;
extern char *neon_timeline;
extern char *neon_tenant;
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);

View File

@@ -7,9 +7,9 @@
*
* We have two ways of launching WalProposer:
*
* 1. As a background worker which will run physical WalSender with
* am_wal_proposer flag set to true. WalSender in turn would handle WAL
* reading part and call WalProposer when ready to scatter WAL.
* 1. As a background worker which will pretend to be physical WalSender.
* WalProposer will receive notifications about new available WAL and
* will immediately broadcast it to alive safekeepers.
*
* 2. As a standalone utility by running `postgres --sync-safekeepers`. That
* is needed to create LSN from which it is safe to start postgres. More
@@ -28,6 +28,10 @@
* playing consensus game is impossible, so speculative 'let's just poll
* safekeepers, learn start LSN of future epoch and run basebackup'
* won't work.
*
* Both ways are implemented in walproposer_pg.c file. This file contains
* generic part of walproposer which can be used in both cases, but can also
* be used as an independent library.
*
*-------------------------------------------------------------------------
*/
@@ -227,13 +231,14 @@ WalProposerPoll(WalProposer *wp)
}
now = wp->api.get_current_timestamp();
if (rc == 0 || TimeToReconnect(wp, now) <= 0) /* timeout expired: poll state */
/* timeout expired: poll state */
if (rc == 0 || TimeToReconnect(wp, now) <= 0)
{
TimestampTz now;
/*
* If no WAL was generated during timeout (and we have already
* collected the quorum), then send pool message
* collected the quorum), then send empty keepalive message
*/
if (wp->availableLsn != InvalidXLogRecPtr)
{
@@ -904,7 +909,7 @@ HandleElectedProposer(WalProposer *wp)
return;
}
wp->api.start_streaming(wp, wp->propEpochStartLsn, wp->greetRequest.timeline);
wp->api.start_streaming(wp, wp->propEpochStartLsn);
/* Should not return here */
}

View File

@@ -22,10 +22,6 @@
*/
#define WL_NO_EVENTS 0
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */
typedef struct WalProposerConn WalProposerConn;
@@ -433,66 +429,198 @@ typedef enum
*/
typedef struct walproposer_api
{
/*
* Get WalproposerShmemState. This is used to store information about
* last elected term.
*/
WalproposerShmemState * (*get_shmem_state) (void);
void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos, TimeLineID timeline);
/*
* Start receiving notifications about new WAL. This is an infinite loop
* which calls WalProposerBroadcast() and WalProposerPoll() to send the WAL.
*/
void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos);
/* Get pointer to the latest available WAL. */
XLogRecPtr (*get_flush_rec_ptr) (void);
/* Get current time. */
TimestampTz (*get_current_timestamp) (void);
/* Get postgres timeline. */
TimeLineID (*get_timeline_id) (void);
/* Current error message, aka PQerrorMessage. */
char * (*conn_error_message) (WalProposerConn * conn);
/* Connection status, aka PQstatus. */
WalProposerConnStatusType (*conn_status) (WalProposerConn * conn);
/* Start the connection, aka PQconnectStart. */
WalProposerConn * (*conn_connect_start) (char *conninfo);
/* Poll an asynchronous connection, aka PQconnectPoll. */
WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn * conn);
/* Send a blocking SQL query, aka PQsendQuery. */
bool (*conn_send_query) (WalProposerConn * conn, char * query);
/* Read the query result, aka PQgetResult. */
WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn * conn);
pgsocket (*conn_socket) (WalProposerConn * conn);
/* Flush buffer to the network, aka PQflush. */
int (*conn_flush) (WalProposerConn * conn);
/* Close the connection, aka PQfinish. */
void (*conn_finish) (WalProposerConn * conn);
/* Try to read CopyData message, aka PQgetCopyData. */
PGAsyncReadResult (*conn_async_read) (WalProposerConn * conn, char **buf, int *amount);
/* Try to write CopyData message, aka PQputCopyData. */
PGAsyncWriteResult (*conn_async_write) (WalProposerConn * conn, void const *buf, size_t size);
/* Blocking CopyData write, aka PQputCopyData + PQflush. */
bool (*conn_blocking_write) (WalProposerConn * conn, void const *buf, size_t size);
/* Download WAL from startpos to endpos and make it available locally. */
bool (*recovery_download) (Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
/* Read WAL from disk to buf. */
void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count);
/* Allocate WAL reader. */
XLogReaderState * (*wal_reader_allocate) (void);
/* Deallocate event set. */
void (*free_event_set) (void);
/* Initialize event set. */
void (*init_event_set) (int n_safekeepers);
/* Update events for an existing safekeeper connection. */
void (*update_event_set) (Safekeeper * sk, uint32 events);
/* Add a new safekeeper connection to the event set. */
void (*add_safekeeper_event_set) (Safekeeper * sk, uint32 events);
/*
* Wait until some event happens:
* - timeout is reached
* - socket event for safekeeper connection
* - new WAL is available
*
* Returns 0 if timeout is reached, 1 if some event happened. Updates events mask
* to indicate events and sets sk to the safekeeper which has an event.
*/
int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events);
/* Read random bytes. */
bool (*strong_random) (void *buf, size_t len);
/*
* Get a basebackup LSN. Used to cross-validate with the latest available
* LSN on the safekeepers.
*/
XLogRecPtr (*get_redo_start_lsn) (void);
/*
* Finish sync safekeepers with the given LSN. This function should not
* return and should exit the program.
*/
void (*finish_sync_safekeepers) (XLogRecPtr lsn);
/*
* Called after every new message from the safekeeper. Used to propagate
* backpressure feedback and to confirm WAL persistence (has been commited on
* the quorum of safekeepers).
*/
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
/*
* Called on peer_horizon_lsn updates. Used to advance replication slot
* and to free up disk space by deleting unnecessary WAL.
*/
void (*confirm_wal_streamed) (XLogRecPtr lsn);
} walproposer_api;
/*
* Configuration of the WAL proposer.
*/
typedef struct WalProposerConfig {
/* hex-encoded TenantId cstr */
char *neon_tenant;
/* hex-encoded TimelineId cstr */
char *neon_timeline;
/*
* Comma-separated list of safekeepers, in the following format:
* host1:port1,host2:port2,host3:port3
*
* This cstr should be editable.
*/
char *safekeepers_list;
/*
* WalProposer reconnects to offline safekeepers once in this interval.
* Time is in milliseconds.
*/
int safekeeper_reconnect_timeout;
/*
* WalProposer terminates the connection if it doesn't receive any message
* from the safekeeper in this interval. Time is in milliseconds.
*/
int safekeeper_connection_timeout;
/*
* WAL segment size. Will be passed to safekeepers in greet request.
* Also used to detect page headers.
*/
int wal_segment_size;
/*
* If safekeeper was started in sync mode, walproposer will not subscribe
* for new WAL and will exit when quorum of safekeepers will be synced
* to the latest available LSN.
*/
bool syncSafekeepers;
/* Will be passed to safekeepers in greet request. */
uint64 systemId;
} WalProposerConfig;
/*
* WAL proposer state.
*/
typedef struct WalProposer {
WalProposerConfig *config;
int n_safekeepers;
/* (n_safekeepers / 2) + 1 */
int quorum;
Safekeeper safekeeper[MAX_SAFEKEEPERS];
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;
/* last commitLsn broadcast to safekeepers */
/* last commitLsn broadcasted to safekeepers */
XLogRecPtr lastSentCommitLsn;
ProposerGreeting greetRequest;
/* Vote request for safekeeper */
VoteRequest voteRequest;
/*
* Minimal LSN which may be needed for recovery of some safekeeper,
* record-aligned (first record which might not yet received by someone).
*/
XLogRecPtr truncateLsn;
/*
* Term of the proposer. We want our term to be highest and unique,
* so we collect terms from safekeepers quorum, choose max and +1.
@@ -501,19 +629,31 @@ typedef struct WalProposer {
* running compute, so we must stop immediately.
*/
term_t propTerm;
/* term history of the proposer */
TermHistory propTermHistory;
/* epoch start lsn of the proposer */
XLogRecPtr propEpochStartLsn;
/* Most advanced acceptor epoch */
term_t donorEpoch;
/* Most advanced acceptor */
int donor;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
/* number of votes collected from safekeepers */
int n_votes;
/* number of successful connections over the lifetime of walproposer */
int n_connected;
/* Timestamp of the last reconnection attempt. Related to config->safekeeper_reconnect_timeout */
TimestampTz last_reconnect_attempt;
walproposer_api api;
} WalProposer;

View File

@@ -52,7 +52,7 @@ int wal_acceptor_connection_timeout = 10000;
static AppendResponse quorumFeedback;
static WalproposerShmemState * walprop_shared;
static WalProposerConfig walprop_config;
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static const walproposer_api walprop_pg;
static void nwp_shmem_startup_hook(void);
@@ -75,8 +75,6 @@ static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
static void WalSndLoop(WalProposer *wp);
static void XLogBroadcastWalProposer(WalProposer *wp);
@@ -235,13 +233,13 @@ backpressure_lag_impl(void)
/*
* WalproposerShmemSize --- report amount of shared memory space needed
*/
Size
static Size
WalproposerShmemSize(void)
{
return sizeof(WalproposerShmemState);
}
bool
static bool
WalproposerShmemInit(void)
{
bool found;
@@ -401,14 +399,14 @@ replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRe
* Start walsender streaming replication
*/
static void
walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos, TimeLineID timeline)
walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos)
{
StartReplicationCmd cmd;
elog(LOG, "WAL proposer starts streaming at %X/%X",
LSN_FORMAT_ARGS(startpos));
cmd.slotname = WAL_PROPOSER_SLOT_NAME;
cmd.timeline = timeline;
cmd.timeline = wp->greetRequest.timeline;
cmd.startpoint = startpos;
StartProposerReplication(wp, &cmd);
}
@@ -1632,9 +1630,6 @@ walprop_pg_confirm_wal_streamed(XLogRecPtr lsn)
PhysicalConfirmReceivedLocation(lsn);
}
/*
* Temporary globally exported walproposer API for postgres.
*/
static const walproposer_api walprop_pg = {
.get_shmem_state = walprop_pg_get_shmem_state,
.start_streaming = walprop_pg_start_streaming,
@@ -1647,7 +1642,6 @@ static const walproposer_api walprop_pg = {
.conn_connect_poll = walprop_connect_poll,
.conn_send_query = walprop_send_query,
.conn_get_query_result = walprop_get_query_result,
.conn_socket = walprop_socket,
.conn_flush = walprop_flush,
.conn_finish = walprop_finish,
.conn_async_read = walprop_async_read,