From 4e2c70b0e8cd036bd9fe3c284cca667b2caa1f94 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 20 Nov 2023 22:16:22 +0100 Subject: [PATCH] wip --- Makefile | 42 ++--- pgxn/neon/neon_walreader.c | 374 +++++++++++++++++++++++++++++++------ pgxn/neon/neon_walreader.h | 20 +- pgxn/neon/walproposer.c | 75 +++++++- pgxn/neon/walproposer.h | 26 ++- pgxn/neon/walproposer_pg.c | 131 +++++++++++-- 6 files changed, 563 insertions(+), 105 deletions(-) diff --git a/Makefile b/Makefile index 4bfab30044..840519713e 100644 --- a/Makefile +++ b/Makefile @@ -126,32 +126,32 @@ postgres-check-%: postgres-% $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 check .PHONY: neon-pg-ext-% -neon-pg-ext-%: postgres-% +neon-pg-ext-%: # postgres-% +@echo "Compiling neon $*" mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \ -C $(POSTGRES_INSTALL_DIR)/build/neon-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install - +@echo "Compiling neon_walredo $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install - +@echo "Compiling neon_rmgr $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon_rmgr/Makefile install - +@echo "Compiling neon_test_utils $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install - +@echo "Compiling neon_utils $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* - $(MAKE) 
PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install + # +@echo "Compiling neon_walredo $*" + # mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* + # $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \ + # -C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \ + # -f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install + # +@echo "Compiling neon_rmgr $*" + # mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* + # $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \ + # -C $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* \ + # -f $(ROOT_PROJECT_DIR)/pgxn/neon_rmgr/Makefile install + # +@echo "Compiling neon_test_utils $*" + # mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* + # $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \ + # -C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \ + # -f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install + # +@echo "Compiling neon_utils $*" + # mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* + # $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \ + # -C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \ + # -f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install .PHONY: neon-pg-ext-clean-% neon-pg-ext-clean-%: diff --git a/pgxn/neon/neon_walreader.c b/pgxn/neon/neon_walreader.c index ab94987b74..4ce0bc3ced 100644 --- a/pgxn/neon/neon_walreader.c +++ b/pgxn/neon/neon_walreader.c @@ -3,6 +3,12 @@ * missing + doesn't attempt to read WAL before specified horizon -- basebackup * LSN. Missing WAL should be fetched by peer recovery, or, alternatively, on * demand WAL fetching from safekeepers should be implemented in NeonWALReader. 
+ * + * We can't use libpqwalreceiver as it blocks during connection establishment + * (and waiting for PQExec result), so use libpqwalproposer instead. + * + * TODO: keepalives are currently never sent, so the other side can close the + * connection prematurely. */ #include "postgres.h" @@ -14,27 +20,80 @@ #include "storage/fd.h" #include "utils/wait_event.h" +#include "libpq-fe.h" + #include "neon_walreader.h" #include "walproposer.h" -#define NEON_WALREADER_ERR_MSG_LEN 128 +#define NEON_WALREADER_ERR_MSG_LEN 256 +static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli); +static void NeonWALReaderResetRemote(NeonWALReader *state); +static bool NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli); +static bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p); static void neon_wal_segment_close(NeonWALReader *state); +/* + * State of connection to donor safekeeper. + */ +typedef enum +{ + /* no remote connection */ + RS_NONE, + /* doing PQconnectPoll, need readable socket */ + RS_CONNECTING_READ, + /* doing PQconnectPoll, need writable socket */ + RS_CONNECTING_WRITE, + /* Waiting for START_REPLICATION result */ + RS_WAIT_EXEC_RESULT, + /* replication stream established */ + RS_ESTABLISHED, +} NeonWALReaderRemoteState; + struct NeonWALReader { - /* LSN before */ + /* + * LSN before which we assume WAL is not available locally. Exists because + * though first segment after startup always exists, part before + * basebackup LSN is filled with zeros. + */ XLogRecPtr available_lsn; WALSegmentContext segcxt; WALOpenSegment seg; int wre_errno; /* Explains failure to read, static for simplicity. */ char err_msg[NEON_WALREADER_ERR_MSG_LEN]; + + /* + * Saved info about request in progress, used to check validity of + * arguments after resume and remember how far we accomplished it. 
req_lsn + * is 0 if there is no request in progress. + */ + XLogRecPtr req_lsn; + Size req_len; + Size req_progress; + WalProposer *wp; /* we learn donor through walproposer */ + char donor_name[64]; /* saved donor safekeeper name for logging */ + /* state of connection to safekeeper */ + NeonWALReaderRemoteState rem_state; + WalProposerConn *wp_conn; + + /* + * position in recvbuf from which we'll copy WAL next time, or NULL if + * there is no unprocessed message + */ + char *wal_ptr; + + /* + * LSN of wal_ptr position according to walsender to cross check against + * read request + */ + XLogRecPtr rem_lsn; }; /* palloc and initialize NeonWALReader */ NeonWALReader * -NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn) +NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp) { NeonWALReader *reader; @@ -50,6 +109,8 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn) reader->seg.ws_tli = 0; reader->segcxt.ws_segsize = wal_segment_size; + reader->wp = wp; + return reader; } @@ -61,65 +122,242 @@ NeonWALReaderFree(NeonWALReader *state) pfree(state); } - /* - * Copy of vanilla wal_segment_open, but returns false in case of error instead - * of ERROR, with errno set. - * - * XLogReaderRoutine->segment_open callback for local pg_wal files - */ -static bool -neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, - TimeLineID *tli_p) -{ - TimeLineID tli = *tli_p; - char path[MAXPGPATH]; - - XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize); - walprop_log(LOG, "opening %s", path); - state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (state->seg.ws_file >= 0) - return true; - - return false; -} - -/* copy of vanilla wal_segment_close with NeonWALReader */ -void -neon_wal_segment_close(NeonWALReader *state) -{ - close(state->seg.ws_file); - /* need to check errno? 
*/ - state->seg.ws_file = -1; -} - -/* - * Mostly copy of vanilla WALRead, but 1) returns error if requested data before - * available_lsn 2) returns error is segment is missing instead of throwing - * ERROR. + * Like vanilla WALRead, but if requested position is before available_lsn or + * WAL segment doesn't exist on disk, it tries to fetch needed segment from the + * advanced safekeeper. * * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * fetched from timeline 'tli'. * - * Returns true if succeeded, false if an error occurs, in which case - * 'state->errno' shows whether it was missing WAL (ENOENT) or something else, - * and 'err' the desciption. + * Returns NEON_WALREAD_SUCCESS if succeeded, NEON_WALREAD_ERROR if an error + * occurs, in which case 'err' has the description. Error always closes remote + * connection, if there was any, so socket subscription should be removed. + * + * NEON_WALREAD_WOULDBLOCK means caller should obtain socket to wait for with + * NeonWALReaderSocket and call NeonWALRead again with exactly the same + * arguments when NeonWALReaderEvents happen on the socket. Note that per libpq + * docs during connection establishment (before first successful read) socket + * underneath might change. + * + * Also, eventually walreader should switch from remote to local read; caller + * should remove subscription to socket then by checking NeonWALReaderEvents + * after successful read (otherwise next read might reopen the connection with + * different socket). + * + * Reading not monotonically is not supported and will result in error. + * + * Caller should be sure that WAL up to requested LSN exists, otherwise + * NEON_WALREAD_WOULDBLOCK might be always returned. */ -bool +NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli) +{ + /* + * If requested data is before known available basebackup lsn or there is + * already active remote state, do remote read. 
+ */ + if (startptr < state->available_lsn || state->rem_state != RS_NONE) + { + return NeonWALReadRemote(state, buf, startptr, count, tli); + } + if (NeonWALReadLocal(state, buf, startptr, count, tli)) + { + return NEON_WALREAD_SUCCESS; + } + else if (state->wre_errno == ENOENT) + { + elog(LOG, "local read failed with segment doesn't exist, attempting remote"); + return NeonWALReadRemote(state, buf, startptr, count, tli); + } + else + { + return NEON_WALREAD_ERROR; + } +} + +static NeonWALReadResult +NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli) +{ + if (state->rem_state == RS_NONE) + { + /* no connection yet; start one */ + Safekeeper *donor = GetDonor(state->wp); + + if (donor == NULL) + { + snprintf(state->err_msg, sizeof(state->err_msg), + "failed to establish remote connection to fetch WAL: no donor available"); + return NEON_WALREAD_ERROR; + } + snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port); + elog(LOG, "establishing connection to %s to fetch WAL", state->donor_name); + state->wp_conn = libpqwp_connect_start(donor->conninfo); + if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD) + { + snprintf(state->err_msg, sizeof(state->err_msg), + "failed to connect to %s to fetch WAL: immediately failed with %s", + state->donor_name, PQerrorMessage(state->wp_conn->pg_conn)); + NeonWALReaderResetRemote(state); + return NEON_WALREAD_ERROR; + } + /* we'll poll immediately */ + state->rem_state = RS_CONNECTING_READ; + } + + if (state->rem_state == RS_CONNECTING_READ || state->rem_state == RS_CONNECTING_WRITE) + { + switch (PQconnectPoll(state->wp_conn->pg_conn)) + { + case PGRES_POLLING_FAILED: + snprintf(state->err_msg, sizeof(state->err_msg), + "failed to connect to %s to fetch WAL: poll error: %s", + state->donor_name, PQerrorMessage(state->wp_conn->pg_conn)); + NeonWALReaderResetRemote(state); + return NEON_WALREAD_ERROR; + case PGRES_POLLING_READING: + 
state->rem_state = RS_CONNECTING_READ; + return NEON_WALREAD_WOULDBLOCK; + case PGRES_POLLING_WRITING: + state->rem_state = RS_CONNECTING_WRITE; + return NEON_WALREAD_WOULDBLOCK; + case PGRES_POLLING_OK: + { + /* connection successfully established */ + char start_repl_query[128]; + + snprintf(start_repl_query, sizeof(start_repl_query), + "START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')", + LSN_FORMAT_ARGS(startptr), state->wp->propTerm); + elog(LOG, "connection to %s to fetch WAL succeeded, running %s", + state->donor_name, start_repl_query); + if (!libpqwp_send_query(state->wp_conn, start_repl_query)) + { + snprintf(state->err_msg, sizeof(state->err_msg), + "failed to send %s query to %s: %s", + start_repl_query, state->donor_name, PQerrorMessage(state->wp_conn->pg_conn)); + NeonWALReaderResetRemote(state); + return NEON_WALREAD_ERROR; + } + state->rem_state = RS_WAIT_EXEC_RESULT; + break; + } + + default: /* there is unused PGRES_POLLING_ACTIVE */ + Assert(false); + return NEON_WALREAD_ERROR; /* keep the compiler quiet */ + } + } + + if (state->rem_state == RS_WAIT_EXEC_RESULT) + { + switch (libpqwp_get_query_result(state->wp_conn)) + { + case WP_EXEC_SUCCESS_COPYBOTH: + state->rem_state = RS_ESTABLISHED; + break; + case WP_EXEC_NEEDS_INPUT: + return NEON_WALREAD_WOULDBLOCK; + case WP_EXEC_FAILED: + snprintf(state->err_msg, sizeof(state->err_msg), + "get result from %s failed: %s", + state->donor_name, PQerrorMessage(state->wp_conn->pg_conn)); + NeonWALReaderResetRemote(state); + return NEON_WALREAD_ERROR; + default: /* can't happen */ + snprintf(state->err_msg, sizeof(state->err_msg), + "get result from %s: unexpected result", + state->donor_name); + NeonWALReaderResetRemote(state); + return NEON_WALREAD_ERROR; + } + } + + Assert(state->rem_state == RS_ESTABLISHED); + + /* + * If we had the request before, verify args are the same and advance the + * result ptr according to the progress; otherwise register the request. 
+ */ + if (state->req_lsn != InvalidXLogRecPtr) + { + if (state->req_lsn != startptr || state->req_len != count) + { + snprintf(state->err_msg, sizeof(state->err_msg), + "args changed during request, was %X/%X %zu, now %X/%X %zu", + LSN_FORMAT_ARGS(state->req_lsn), state->req_len, LSN_FORMAT_ARGS(startptr), count); + NeonWALReaderResetRemote(state); + return NEON_WALREAD_ERROR; + } + elog(LOG, "moving ptr by %zu bytes restoring progress, req_lsn = %X/%X", state->req_progress, LSN_FORMAT_ARGS(startptr)); + buf += state->req_progress; + } + + snprintf(state->err_msg, sizeof(state->err_msg), "remote read failed: not implemented"); + return NEON_WALREAD_ERROR; +} + +/* reset remote connection and request in progress */ +static void +NeonWALReaderResetRemote(NeonWALReader *state) +{ + state->req_lsn = InvalidXLogRecPtr; + state->req_len = 0; + state->req_progress = 0; + state->rem_state = RS_NONE; + if (state->wp_conn) + { + libpqwp_disconnect(state->wp_conn); + state->wp_conn = NULL; + } + state->donor_name[0] = '\0'; + state->wal_ptr = NULL; + state->rem_lsn = InvalidXLogRecPtr; +} + +/* + * Return socket of connection to remote source. Must be called only when + * connection exists (NeonWALReaderEvents returns non zero). + */ +pgsocket +NeonWALReaderSocket(NeonWALReader *state) +{ + if (!state->wp_conn) + elog(FATAL, "NeonWALReaderSocket is called without active remote connection"); + return PQsocket(state->wp_conn->pg_conn); +} + +/* + * Returns events user should wait on connection socket or 0 if remote + * connection is not active. 
+ */ +uint32 +NeonWALReaderEvents(NeonWALReader *state) +{ + switch (state->rem_state) + { + case RS_NONE: + return 0; + case RS_CONNECTING_READ: + return WL_SOCKET_READABLE; + case RS_CONNECTING_WRITE: + return WL_SOCKET_WRITEABLE; + case RS_WAIT_EXEC_RESULT: + case RS_ESTABLISHED: + return WL_SOCKET_READABLE; + default: + Assert(false); + return 0; /* keep the compiler quiet */ + } +} + +static bool +NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli) { char *p; XLogRecPtr recptr; Size nbytes; - if (startptr < state->available_lsn) - { - state->wre_errno = 0; - snprintf(state->err_msg, sizeof(state->err_msg), "failed to read WAL at %X/%X which is earlier than available %X/%X", - LSN_FORMAT_ARGS(startptr), LSN_FORMAT_ARGS(state->available_lsn)); - return false; - } - p = buf; recptr = startptr; nbytes = count; @@ -143,8 +381,7 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti { XLogSegNo nextSegNo; - if (state->seg.ws_file >= 0) - neon_wal_segment_close(state); + neon_wal_segment_close(state); XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); if (!neon_wal_segment_open(state, nextSegNo, &tli)) @@ -212,7 +449,40 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti } return true; +} +/* + * Copy of vanilla wal_segment_open, but returns false in case of error instead + * of ERROR, with errno set. 
+ * + * XLogReaderRoutine->segment_open callback for local pg_wal files + */ +static bool +neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, + TimeLineID *tli_p) +{ + TimeLineID tli = *tli_p; + char path[MAXPGPATH]; + + XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize); + walprop_log(LOG, "opening %s", path); + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file >= 0) + return true; + + return false; +} + +/* copy of vanilla wal_segment_close with NeonWALReader */ +static void +neon_wal_segment_close(NeonWALReader *state) +{ + if (state->seg.ws_file >= 0) + { + close(state->seg.ws_file); + /* need to check errno? */ + state->seg.ws_file = -1; + } } char * diff --git a/pgxn/neon/neon_walreader.h b/pgxn/neon/neon_walreader.h index dfb50622a7..098588f4db 100644 --- a/pgxn/neon/neon_walreader.h +++ b/pgxn/neon/neon_walreader.h @@ -7,7 +7,23 @@ struct NeonWALReader; typedef struct NeonWALReader NeonWALReader; -extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn); +/* avoid including walproposer.h as it includes us */ +struct WalProposer; +typedef struct WalProposer WalProposer; + +/* NeonWALRead return value */ +typedef enum +{ + NEON_WALREAD_SUCCESS, + NEON_WALREAD_WOULDBLOCK, + NEON_WALREAD_ERROR, +} NeonWALReadResult; + +extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp); extern void NeonWALReaderFree(NeonWALReader *state); -extern bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli); +extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli); +extern pgsocket NeonWALReaderSocket(NeonWALReader *state); +extern uint32 NeonWALReaderEvents(NeonWALReader *state); extern char *NeonWALReaderErrMsg(NeonWALReader *state); + +#endif /* __NEON_WALREADER_H__ */ diff --git a/pgxn/neon/walproposer.c 
b/pgxn/neon/walproposer.c index 7dd652c232..6cc3f2a840 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1334,17 +1334,28 @@ SendAppendRequests(Safekeeper *sk) /* write the WAL itself */ enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn); /* wal_read will raise error on failure */ - if (!wp->api.wal_read(sk, - &sk->outbuf.data[sk->outbuf.len], - req->beginLsn, - req->endLsn - req->beginLsn)) + switch (wp->api.wal_read(sk, + &sk->outbuf.data[sk->outbuf.len], + req->beginLsn, + req->endLsn - req->beginLsn)) { - walprop_log(WARNING, "WAL reading for node %s:%s failed: %s", - sk->host, sk->port, - NeonWALReaderErrMsg(sk->xlogreader)); - ShutdownConnection(sk); - return false; + case NEON_WALREAD_SUCCESS: + break; + case NEON_WALREAD_WOULDBLOCK: + walprop_log(LOG, "wal reading wouldblock"); + /* todo */ + ShutdownConnection(sk); + return false; + case NEON_WALREAD_ERROR: + walprop_log(WARNING, "WAL reading for node %s:%s failed: %s", + sk->host, sk->port, + NeonWALReaderErrMsg(sk->xlogreader)); + ShutdownConnection(sk); + return false; + default: + Assert(false); } + sk->outbuf.len += req->endLsn - req->beginLsn; writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len); @@ -1567,6 +1578,52 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp) return responses[wp->n_safekeepers - wp->quorum]; } +/* + * Return safekeeper with active connection from which WAL can be downloaded, or + * none if it doesn't exist. + */ +Safekeeper * +GetDonor(WalProposer *wp) +{ + XLogRecPtr donor_lsn = InvalidXLogRecPtr; + Safekeeper *donor = NULL; + int i; + + if (wp->n_votes < wp->quorum) + { + walprop_log(WARNING, "GetDonor called before elections are winned"); + return NULL; + } + + /* + * First, consider node which had determined our term start LSN as we know + * about its position immediately after election before any feedbacks are + * sent. 
+ */ + if (wp->safekeeper[wp->donor].state == SS_ACTIVE) + { + donor = &wp->safekeeper[wp->donor]; + donor_lsn = wp->propEpochStartLsn; + } + + /* + * But also check feedbacks from all nodes with live connections and take + * the highest one. Note: if node sends feedbacks it already processed + * elected message so its term is fine. + */ + for (i = 0; i < wp->n_safekeepers; i++) + { + Safekeeper *sk = &wp->safekeeper[i]; + + if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn >= donor_lsn) + { + donor = sk; + donor_lsn = sk->appendResponse.flushLsn; + } + } + return donor; +} + static void HandleSafekeeperResponse(WalProposer *wp) { diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 0c09702589..f3d161a300 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -97,6 +97,23 @@ typedef enum */ SS_IDLE, + /* + * Waiting for neon walreader socket to receive chunk of WAL to send to + * this safekeeper. + * + * Note: socket management is done completely inside walproposer_pg for + * simplicity, and thus simulation doesn't test it. Which is fine as + * simulation is mainly aimed at consensus checks, not waiteventset + * management. + * + * Also, while in this state we don't touch safekeeper socket, so in + * theory it might close connection as inactive. This can be addressed if + * needed; however, while fetching WAL we should regularly send it, so the + * problem is unlikely. Vice versa is also true (SS_ACTIVE doesn't handle + * walreader socket), but similarly shouldn't be a problem. + */ + SS_WAIT_REMOTE_WAL, + /* * Active phase, when we acquired quorum and have WAL to send or feedback * to read. @@ -344,6 +361,11 @@ typedef struct Safekeeper * Position in wait event set. Equal to -1 if no event */ int eventPos; + + /* + * Neon WAL reader position in wait event set, or -1 if no socket. 
+ */ + int nwrEventPos; #endif @@ -453,7 +475,7 @@ typedef struct walproposer_api bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos); /* Read WAL from disk to buf. */ - bool (*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count); + NeonWALReadResult (*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count); /* Allocate WAL reader. */ void (*wal_reader_allocate) (Safekeeper *sk); @@ -660,7 +682,7 @@ extern void WalProposerFree(WalProposer *wp); * recreate set from scratch, hence the export. */ extern uint32 SafekeeperStateDesiredEvents(SafekeeperState state); - +extern Safekeeper *GetDonor(WalProposer *wp); #define WPEVENT 1337 /* special log level for walproposer internal * events */ diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index df522e3758..a48c5a776b 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -94,6 +94,10 @@ static void XLogBroadcastWalProposer(WalProposer *wp); static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalPropClose(XLogRecPtr recptr); +static void add_nwr_event_set(Safekeeper *sk, uint32 events); +static void update_nwr_event_set(Safekeeper *sk, uint32 events); +static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk); + static void init_walprop_config(bool syncSafekeepers) { @@ -1424,20 +1428,54 @@ XLogWalPropClose(XLogRecPtr recptr) walpropFile = -1; } -static bool +static NeonWALReadResult walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count) { - return NeonWALRead(sk->xlogreader, - buf, - startptr, - count, - walprop_pg_get_timeline_id()); + NeonWALReadResult res; + + res = NeonWALRead(sk->xlogreader, + buf, + startptr, + count, + walprop_pg_get_timeline_id()); + switch (res) + { + case NEON_WALREAD_SUCCESS: + /* don't wake up us until we send the chunk */ + update_nwr_event_set(sk, 0); + + /* + * If we have the socket 
subscribed, but walreader doesn't need + * any events, it must mean that remote connection just closed + * hoping to do next read locally. Remove the socket then. + */ + if (NeonWALReaderEvents(sk->xlogreader) == 0) + rm_safekeeper_event_set(sk, false); + return res; + case NEON_WALREAD_WOULDBLOCK: + + /* + * TODO: instead of reattaching socket (and thus recreating WES) + * each time we should keep it if possible, i.e. if connection is + * already established. Note that single neon_walreader object can + * switch between local and remote reads multiple times during its + * lifetime, so careful bookkeeping is needed here. + */ + rm_safekeeper_event_set(sk, false); + add_nwr_event_set(sk, NeonWALReaderEvents(sk->xlogreader)); + return res; + case NEON_WALREAD_ERROR: + rm_safekeeper_event_set(sk, false); + return res; + default: + elog(FATAL, "unexpected NeonWALRead result %d", (int) res); + } } static void walprop_pg_wal_reader_allocate(Safekeeper *sk) { - sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn); + sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp); if (sk->xlogreader == NULL) elog(FATAL, "Failed to allocate xlog reader"); } @@ -1465,13 +1503,30 @@ walprop_pg_init_event_set(WalProposer *wp) if (waitEvents) elog(FATAL, "double-initialization of event set"); - waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + wp->n_safekeepers); + /* for each sk, we have socket plus potentially socket for neon walreader */ + waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + 2 * wp->n_safekeepers); AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); } +/* add safekeeper socket to wait event set */ +static void +walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events) +{ + Assert(sk->eventPos == -1); + sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk), NULL, sk); +} + +/* add neon wal reader socket to 
wait event set */ +static void +add_nwr_event_set(Safekeeper *sk, uint32 events) +{ + Assert(sk->nwrEventPos == -1); + sk->nwrEventPos = AddWaitEventToSet(waitEvents, events, NeonWALReaderSocket(sk->xlogreader), NULL, sk); +} + static void walprop_pg_update_event_set(Safekeeper *sk, uint32 events) { @@ -1481,22 +1536,47 @@ walprop_pg_update_event_set(Safekeeper *sk, uint32 events) ModifyWaitEvent(waitEvents, sk->eventPos, events, NULL); } +/* Can be called when nwr socket doesn't exist, does nothing in this case. */ static void -walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events) +update_nwr_event_set(Safekeeper *sk, uint32 events) { - sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk), NULL, sk); + /* nwrEventPos = -1 when we don't have an event */ + if (sk->nwrEventPos != -1) + ModifyWaitEvent(waitEvents, sk->nwrEventPos, events, NULL); } -/* - * Hack: provides a way to remove the event corresponding to an individual walproposer from the set. - * - * Note: Internally, this completely reconstructs the event set. It should be avoided if possible. - */ static void walprop_pg_rm_safekeeper_event_set(Safekeeper *to_remove) +{ + rm_safekeeper_event_set(to_remove, true); +} + +/* + * A hacky way to remove a single event from the event set. Can be called if event + * doesn't exist, does nothing in this case. + * + * Note: Internally, this completely reconstructs the event set. It should be + * avoided if possible. + * + * If is_sk is true, socket of connection to safekeeper is removed; otherwise + * socket of neon wal reader. + */ +static void +rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk) { WalProposer *wp = to_remove->wp; + /* + * Short path for exiting if we have nothing to do. We never call this + * function with safekeeper socket not existing, but do that with neon + * walreader socket. 
+ */ + if ((is_sk && to_remove->eventPos == -1) || + (!is_sk && to_remove->nwrEventPos == -1)) + { + return; + } + /* Remove the existing event set, assign sk->eventPos = -1 */ walprop_pg_free_event_set(wp); @@ -1514,14 +1594,27 @@ walprop_pg_rm_safekeeper_event_set(Safekeeper *to_remove) Safekeeper *sk = &wp->safekeeper[i]; if (sk == to_remove) + { + if (is_sk) + sk->eventPos = -1; + else + sk->nwrEventPos = -1; continue; + } - /* If this safekeeper isn't offline, add an event for it! */ + /* If this safekeeper isn't offline, add events for it! */ if (sk->state != SS_OFFLINE) { - desired_events = SafekeeperStateDesiredEvents(sk->state); - /* will set sk->eventPos */ - wp->api.add_safekeeper_event_set(sk, desired_events); + if (!is_sk) + { + desired_events = SafekeeperStateDesiredEvents(sk->state); + /* will set sk->eventPos */ + wp->api.add_safekeeper_event_set(sk, desired_events); + } + else if (NeonWALReaderEvents(sk->xlogreader) != 0) + { + add_nwr_event_set(sk, NeonWALReaderEvents(sk->xlogreader)); + } } } }