This commit is contained in:
Arseny Sher
2023-11-20 22:16:22 +01:00
parent c982540705
commit 4e2c70b0e8
6 changed files with 563 additions and 105 deletions

View File

@@ -126,32 +126,32 @@ postgres-check-%: postgres-%
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 check
.PHONY: neon-pg-ext-%
neon-pg-ext-%: postgres-%
neon-pg-ext-%: # postgres-%
+@echo "Compiling neon $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install
+@echo "Compiling neon_walredo $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install
+@echo "Compiling neon_rmgr $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_rmgr/Makefile install
+@echo "Compiling neon_test_utils $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install
+@echo "Compiling neon_utils $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-utils-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install
# +@echo "Compiling neon_walredo $*"
# mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$*
# $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
# -C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \
# -f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install
# +@echo "Compiling neon_rmgr $*"
# mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$*
# $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
# -C $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* \
# -f $(ROOT_PROJECT_DIR)/pgxn/neon_rmgr/Makefile install
# +@echo "Compiling neon_test_utils $*"
# mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$*
# $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
# -C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \
# -f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install
# +@echo "Compiling neon_utils $*"
# mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-utils-$*
# $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
# -C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
# -f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install
.PHONY: neon-pg-ext-clean-%
neon-pg-ext-clean-%:

View File

@@ -3,6 +3,12 @@
* missing + doesn't attempt to read WAL before specified horizon -- basebackup
* LSN. Missing WAL should be fetched by peer recovery, or, alternatively, on
* demand WAL fetching from safekeepers should be implemented in NeonWALReader.
*
* We can't use libpqwalreceiver as it blocks during connection establishment
* (and waiting for PQExec result), so use libpqwalproposer instead.
*
* TODO: keepalives are currently never sent, so the other side can close the
* connection prematurely.
*/
#include "postgres.h"
@@ -14,27 +20,80 @@
#include "storage/fd.h"
#include "utils/wait_event.h"
#include "libpq-fe.h"
#include "neon_walreader.h"
#include "walproposer.h"
#define NEON_WALREADER_ERR_MSG_LEN 128
#define NEON_WALREADER_ERR_MSG_LEN 256
static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
static void NeonWALReaderResetRemote(NeonWALReader *state);
static bool NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
static bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p);
static void neon_wal_segment_close(NeonWALReader *state);
/*
* State of connection to donor safekeeper.
*/
typedef enum
{
/* no remote connection */
RS_NONE,
/* doing PQconnectPoll, need readable socket */
RS_CONNECTING_READ,
/* doing PQconnectPoll, need writable socket */
RS_CONNECTING_WRITE,
/* Waiting for START_REPLICATION result */
RS_WAIT_EXEC_RESULT,
/* replication stream established */
RS_ESTABLISHED,
} NeonWALReaderRemoteState;
struct NeonWALReader
{
/* LSN before */
/*
* LSN before which we assume WAL is not available locally. Exists because
* though first segment after startup always exists, part before
* basebackup LSN is filled with zeros.
*/
XLogRecPtr available_lsn;
WALSegmentContext segcxt;
WALOpenSegment seg;
int wre_errno;
/* Explains failure to read, static for simplicity. */
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
/*
* Saved info about request in progress, used to check validity of
* arguments after resume and remember how far we accomplished it. req_lsn
* is 0 if there is no request in progress.
*/
XLogRecPtr req_lsn;
Size req_len;
Size req_progress;
WalProposer *wp; /* we learn donor through walproposer */
char donor_name[64]; /* saved donor safekeeper name for logging */
/* state of connection to safekeeper */
NeonWALReaderRemoteState rem_state;
WalProposerConn *wp_conn;
/*
* position in recvbuf from which we'll copy WAL next time, or NULL if
* there is no unprocessed message
*/
char *wal_ptr;
/*
* LSN of wal_ptr position according to walsender to cross check against
* read request
*/
XLogRecPtr rem_lsn;
};
/* palloc and initialize NeonWALReader */
NeonWALReader *
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn)
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp)
{
NeonWALReader *reader;
@@ -50,6 +109,8 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn)
reader->seg.ws_tli = 0;
reader->segcxt.ws_segsize = wal_segment_size;
reader->wp = wp;
return reader;
}
@@ -61,65 +122,242 @@ NeonWALReaderFree(NeonWALReader *state)
pfree(state);
}
/*
* Copy of vanilla wal_segment_open, but returns false in case of error instead
* of ERROR, with errno set.
*
* XLogReaderRoutine->segment_open callback for local pg_wal files
*/
static bool
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
char path[MAXPGPATH];
XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
walprop_log(LOG, "opening %s", path);
state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
if (state->seg.ws_file >= 0)
return true;
return false;
}
/* copy of vanilla wal_segment_close with NeonWALReader */
void
neon_wal_segment_close(NeonWALReader *state)
{
close(state->seg.ws_file);
/* need to check errno? */
state->seg.ws_file = -1;
}
/*
* Mostly copy of vanilla WALRead, but 1) returns error if requested data before
* available_lsn 2) returns error is segment is missing instead of throwing
* ERROR.
* Like vanilla WALRead, but if requested position is before available_lsn or
* WAL segment doesn't exist on disk, it tries to fetch needed segment from the
* advanced safekeeper.
*
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
*
* Returns true if succeeded, false if an error occurs, in which case
* 'state->errno' shows whether it was missing WAL (ENOENT) or something else,
* and 'err' the desciption.
* Returns NEON_WALREAD_SUCCESS if succeeded, NEON_WALREAD_ERROR if an error
* occurs, in which case 'err' has the desciption. Error always closes remote
* connection, if there was any, so socket subscription should be removed.
*
* NEON_WALREAD_WOULDBLOCK means caller should obtain socket to wait for with
* NeonWALReaderSocket and call NeonWALRead again with exactly the same
* arguments when NeonWALReaderEvents happen on the socket. Note that per libpq
* docs during connection establishment (before first successful read) socket
* underneath might change.
*
* Also, eventually walreader should switch from remote to local read; caller
* should remove subscription to socket then by checking NeonWALReaderEvents
* after successful read (otherwise next read might reopen the connection with
* different socket).
*
* Reading not monotonically is not supported and will result in error.
*
* Caller should be sure that WAL up to requested LSN exists, otherwise
* NEON_WALREAD_WOULDBLOCK might be always returned.
*/
bool
NeonWALReadResult
NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
{
/*
* If requested data is before known available basebackup lsn or there is
* already active remote state, do remote read.
*/
if (startptr < state->available_lsn || state->rem_state != RS_NONE)
{
return NeonWALReadRemote(state, buf, startptr, count, tli);
}
if (NeonWALReadLocal(state, buf, startptr, count, tli))
{
return NEON_WALREAD_SUCCESS;
}
else if (state->wre_errno == ENOENT)
{
elog(LOG, "local read failed with segment doesn't exist, attempting remote");
return NeonWALReadRemote(state, buf, startptr, count, tli);
}
else
{
return NEON_WALREAD_ERROR;
}
}
static NeonWALReadResult
NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
{
if (state->rem_state == RS_NONE)
{
/* no connection yet; start one */
Safekeeper *donor = GetDonor(state->wp);
if (donor == NULL)
{
snprintf(state->err_msg, sizeof(state->err_msg),
"failed to establish remote connection to fetch WAL: no donor available");
return NEON_WALREAD_ERROR;
}
snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port);
elog(LOG, "establishing connection to %s to fetch WAL", state->donor_name);
state->wp_conn = libpqwp_connect_start(donor->conninfo);
if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD)
{
snprintf(state->err_msg, sizeof(state->err_msg),
"failed to connect to %s:%s to fetch WAL: immediately failed with %s",
state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
}
/* we'll poll immediately */
state->rem_state = RS_CONNECTING_READ;
}
if (state->rem_state == RS_CONNECTING_READ || state->rem_state == RS_CONNECTING_WRITE)
{
switch (PQconnectPoll(state->wp_conn->pg_conn))
{
case PGRES_POLLING_FAILED:
snprintf(state->err_msg, sizeof(state->err_msg),
"failed to connect to %s to fetch WAL: poll error: %s",
state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
case PGRES_POLLING_READING:
state->rem_state = RS_CONNECTING_READ;
return NEON_WALREAD_WOULDBLOCK;
case PGRES_POLLING_WRITING:
state->rem_state = RS_CONNECTING_WRITE;
return NEON_WALREAD_WOULDBLOCK;
case PGRES_POLLING_OK:
{
/* connection successfully established */
char start_repl_query[128];
snprintf(start_repl_query, sizeof(start_repl_query),
"START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')",
LSN_FORMAT_ARGS(startptr), state->wp->propTerm);
elog(LOG, "connection to %s to fetch WAL succeeded, running %s",
state->donor_name, start_repl_query);
if (!libpqwp_send_query(state->wp_conn, start_repl_query))
{
snprintf(state->err_msg, sizeof(state->err_msg),
"failed to send %s query to %s: %s",
start_repl_query, state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
}
state->rem_state = RS_WAIT_EXEC_RESULT;
break;
}
default: /* there is unused PGRES_POLLING_ACTIVE */
Assert(false);
return NEON_WALREAD_ERROR; /* keep the compiler quiet */
}
}
if (state->rem_state == RS_WAIT_EXEC_RESULT)
{
switch (libpqwp_get_query_result(state->wp_conn))
{
case WP_EXEC_SUCCESS_COPYBOTH:
state->rem_state = RS_ESTABLISHED;
break;
case WP_EXEC_NEEDS_INPUT:
return NEON_WALREAD_WOULDBLOCK;
case WP_EXEC_FAILED:
snprintf(state->err_msg, sizeof(state->err_msg),
"get result from %s failed: %s",
state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
default: /* can't happen */
snprintf(state->err_msg, sizeof(state->err_msg),
"get result from %s: unexpected result",
state->donor_name);
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
}
}
Assert(state->rem_state == RS_ESTABLISHED);
/*
* If we had the request before, verify args are the same and advance the
* result ptr according to the progress; otherwise register the request.
*/
if (state->req_lsn != InvalidXLogRecPtr)
{
if (state->req_lsn != startptr || state->req_len != count)
{
snprintf(state->err_msg, sizeof(state->err_msg),
"args changed during request, was %X/%X %zu, now %X/%X %zu",
LSN_FORMAT_ARGS(state->req_lsn), state->req_len, LSN_FORMAT_ARGS(startptr), count);
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
}
elog(LOG, "moving ptr by %zu bytes restoring progress, req_lsn = %X/%X", state->req_progress, LSN_FORMAT_ARGS(startptr));
buf += state->req_progress;
}
snprintf(state->err_msg, sizeof(state->err_msg), "remote read failed: not implemented");
return NEON_WALREAD_ERROR;
}
/* reset remote connection and request in progress */
static void
NeonWALReaderResetRemote(NeonWALReader *state)
{
state->req_lsn = InvalidXLogRecPtr;
state->req_len = 0;
state->req_progress = 0;
state->rem_state = RS_NONE;
if (state->wp_conn)
{
libpqwp_disconnect(state->wp_conn);
state->wp_conn = NULL;
}
state->donor_name[0] = '\0';
state->wal_ptr = NULL;
state->rem_lsn = InvalidXLogRecPtr;
}
/*
* Return socket of connection to remote source. Must be called only when
* connection exists (NeonWALReaderEvents returns non zero).
*/
pgsocket
NeonWALReaderSocket(NeonWALReader *state)
{
if (!state->wp_conn)
elog(FATAL, "NeonWALReaderSocket is called without active remote connection");
return PQsocket(state->wp_conn->pg_conn);
}
/*
* Returns events user should wait on connection socket or 0 if remote
* connection is not active.
*/
extern uint32
NeonWALReaderEvents(NeonWALReader *state)
{
switch (state->rem_state)
{
case RS_NONE:
return 0;
case RS_CONNECTING_READ:
return WL_SOCKET_READABLE;
case RS_CONNECTING_WRITE:
return WL_SOCKET_WRITEABLE;
case RS_WAIT_EXEC_RESULT:
case RS_ESTABLISHED:
return WL_SOCKET_READABLE;
default:
Assert(false);
break;
}
}
static bool
NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
{
char *p;
XLogRecPtr recptr;
Size nbytes;
if (startptr < state->available_lsn)
{
state->wre_errno = 0;
snprintf(state->err_msg, sizeof(state->err_msg), "failed to read WAL at %X/%X which is earlier than available %X/%X",
LSN_FORMAT_ARGS(startptr), LSN_FORMAT_ARGS(state->available_lsn));
return false;
}
p = buf;
recptr = startptr;
nbytes = count;
@@ -143,8 +381,7 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti
{
XLogSegNo nextSegNo;
if (state->seg.ws_file >= 0)
neon_wal_segment_close(state);
neon_wal_segment_close(state);
XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
if (!neon_wal_segment_open(state, nextSegNo, &tli))
@@ -212,7 +449,40 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti
}
return true;
}
/*
* Copy of vanilla wal_segment_open, but returns false in case of error instead
* of ERROR, with errno set.
*
* XLogReaderRoutine->segment_open callback for local pg_wal files
*/
static bool
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
char path[MAXPGPATH];
XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
walprop_log(LOG, "opening %s", path);
state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
if (state->seg.ws_file >= 0)
return true;
return false;
}
/* copy of vanilla wal_segment_close with NeonWALReader */
static void
neon_wal_segment_close(NeonWALReader *state)
{
if (state->seg.ws_file >= 0)
{
close(state->seg.ws_file);
/* need to check errno? */
state->seg.ws_file = -1;
}
}
char *

View File

@@ -7,7 +7,23 @@
struct NeonWALReader;
typedef struct NeonWALReader NeonWALReader;
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn);
/* avoid including walproposer.h as it includes us */
struct WalProposer;
typedef struct WalProposer WalProposer;
/* NeonWALRead return value */
typedef enum
{
NEON_WALREAD_SUCCESS,
NEON_WALREAD_WOULDBLOCK,
NEON_WALREAD_ERROR,
} NeonWALReadResult;
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp);
extern void NeonWALReaderFree(NeonWALReader *state);
extern bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
extern uint32 NeonWALReaderEvents(NeonWALReader *state);
extern char *NeonWALReaderErrMsg(NeonWALReader *state);
#endif /* __NEON_WALREADER_H__ */

View File

@@ -1334,17 +1334,28 @@ SendAppendRequests(Safekeeper *sk)
/* write the WAL itself */
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
/* wal_read will raise error on failure */
if (!wp->api.wal_read(sk,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn))
switch (wp->api.wal_read(sk,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn))
{
walprop_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port,
NeonWALReaderErrMsg(sk->xlogreader));
ShutdownConnection(sk);
return false;
case NEON_WALREAD_SUCCESS:
break;
case NEON_WALREAD_WOULDBLOCK:
walprop_log(LOG, "wal reading wouldblock");
/* todo */
ShutdownConnection(sk);
return false;
case NEON_WALREAD_ERROR:
walprop_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port,
NeonWALReaderErrMsg(sk->xlogreader));
ShutdownConnection(sk);
return false;
default:
Assert(false);
}
sk->outbuf.len += req->endLsn - req->beginLsn;
writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len);
@@ -1567,6 +1578,52 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
return responses[wp->n_safekeepers - wp->quorum];
}
/*
* Return safekeeper with active connection from which WAL can be downloaded, or
* none if it doesn't exist.
*/
Safekeeper *
GetDonor(WalProposer *wp)
{
XLogRecPtr donor_lsn = InvalidXLogRecPtr;
Safekeeper *donor = NULL;
int i;
if (wp->n_votes < wp->quorum)
{
walprop_log(WARNING, "GetDonor called before elections are winned");
return NULL;
}
/*
* First, consider node which had determined our term start LSN as we know
* about its position immediately after election before any feedbacks are
* sent.
*/
if (wp->safekeeper[wp->donor].state == SS_ACTIVE)
{
donor = &wp->safekeeper[wp->donor];
donor_lsn = wp->propEpochStartLsn;
}
/*
* But also check feedbacks from all nodes with live connections and take
* the highest one. Note: if node sends feedbacks it already processed
* elected message so its term is fine.
*/
for (i = 0; i < wp->n_safekeepers; i++)
{
Safekeeper *sk = &wp->safekeeper[i];
if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn >= donor_lsn)
{
donor = sk;
donor_lsn = sk->appendResponse.flushLsn;
}
}
return donor;
}
static void
HandleSafekeeperResponse(WalProposer *wp)
{

View File

@@ -97,6 +97,23 @@ typedef enum
*/
SS_IDLE,
/*
* Waiting for neon walreader socket to receive chunk of WAL to send to
* this safekeeper.
*
* Note: socket management is done completely inside walproposer_pg for
* simplicity, and thus simulation doesn't test it. Which is fine as
* simulation is mainly aimed at consensus checks, not waiteventset
* management.
*
* Also, while in this state we don't touch safekeeper socket, so in
* theory it might close connection as inactive. This can be addressed if
* needed; however, while fetching WAL we should regularly send it, so the
* problem is unlikely. Vice versa is also true (SS_ACTIVE doesn't handle
* walreader socket), but similarly shouldn't be a problem.
*/
SS_WAIT_REMOTE_WAL,
/*
* Active phase, when we acquired quorum and have WAL to send or feedback
* to read.
@@ -344,6 +361,11 @@ typedef struct Safekeeper
* Position in wait event set. Equal to -1 if no event
*/
int eventPos;
/*
* Neon WAL reader position in wait event set, or -1 if no socket.
*/
int nwrEventPos;
#endif
@@ -453,7 +475,7 @@ typedef struct walproposer_api
bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
/* Read WAL from disk to buf. */
bool (*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count);
NeonWALReadResult (*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count);
/* Allocate WAL reader. */
void (*wal_reader_allocate) (Safekeeper *sk);
@@ -660,7 +682,7 @@ extern void WalProposerFree(WalProposer *wp);
* recreate set from scratch, hence the export.
*/
extern uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
extern Safekeeper *GetDonor(WalProposer *wp);
#define WPEVENT 1337 /* special log level for walproposer internal
* events */

View File

@@ -94,6 +94,10 @@ static void XLogBroadcastWalProposer(WalProposer *wp);
static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalPropClose(XLogRecPtr recptr);
static void add_nwr_event_set(Safekeeper *sk, uint32 events);
static void update_nwr_event_set(Safekeeper *sk, uint32 events);
static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
static void
init_walprop_config(bool syncSafekeepers)
{
@@ -1424,20 +1428,54 @@ XLogWalPropClose(XLogRecPtr recptr)
walpropFile = -1;
}
static bool
static NeonWALReadResult
walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count)
{
return NeonWALRead(sk->xlogreader,
buf,
startptr,
count,
walprop_pg_get_timeline_id());
NeonWALReadResult res;
res = NeonWALRead(sk->xlogreader,
buf,
startptr,
count,
walprop_pg_get_timeline_id());
switch (res)
{
case NEON_WALREAD_SUCCESS:
/* don't wake up us until we send the chunk */
update_nwr_event_set(sk, 0);
/*
* If we have the socket subscribed, but walreader doesn't need
* any events, it must mean that remote connection just closed
* hoping to do next read locally. Remove the socket then.
*/
if (NeonWALReaderEvents(sk->xlogreader) == 0)
rm_safekeeper_event_set(sk, false);
return res;
case NEON_WALREAD_WOULDBLOCK:
/*
* TODO: instead of reattaching socket (and thus recreating WES)
* each time we should keep it if possible, i.e. if connection is
* already established. Note that single neon_walreader object can
* switch between local and remote reads multiple times during its
* lifetime, so careful bookkeeping is needed here.
*/
rm_safekeeper_event_set(sk, false);
add_nwr_event_set(sk, NeonWALReaderEvents(sk->xlogreader));
return res;
case NEON_WALREAD_ERROR:
rm_safekeeper_event_set(sk, false);
return res;
default:
Assert(false);
}
}
static void
walprop_pg_wal_reader_allocate(Safekeeper *sk)
{
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp);
if (sk->xlogreader == NULL)
elog(FATAL, "Failed to allocate xlog reader");
}
@@ -1465,13 +1503,30 @@ walprop_pg_init_event_set(WalProposer *wp)
if (waitEvents)
elog(FATAL, "double-initialization of event set");
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + wp->n_safekeepers);
/* for each sk, we have socket plus potentially socket for neon walreader */
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + 2 * wp->n_safekeepers);
AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
}
/* add safekeeper socket to wait event set */
static void
walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events)
{
Assert(sk->eventPos == -1);
sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk), NULL, sk);
}
/* add neon wal reader socket to wait event set */
static void
add_nwr_event_set(Safekeeper *sk, uint32 events)
{
Assert(sk->nwrEventPos == -1);
sk->nwrEventPos = AddWaitEventToSet(waitEvents, events, NeonWALReaderSocket(sk->xlogreader), NULL, sk);
}
static void
walprop_pg_update_event_set(Safekeeper *sk, uint32 events)
{
@@ -1481,22 +1536,47 @@ walprop_pg_update_event_set(Safekeeper *sk, uint32 events)
ModifyWaitEvent(waitEvents, sk->eventPos, events, NULL);
}
/* Can be called when nwr socket doesn't exist, does nothing in this case. */
static void
walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events)
update_nwr_event_set(Safekeeper *sk, uint32 events)
{
sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk), NULL, sk);
/* eventPos = -1 when we don't have an event */
if (sk->nwrEventPos != -1)
ModifyWaitEvent(waitEvents, sk->nwrEventPos, events, NULL);
}
/*
* Hack: provides a way to remove the event corresponding to an individual walproposer from the set.
*
* Note: Internally, this completely reconstructs the event set. It should be avoided if possible.
*/
static void
walprop_pg_rm_safekeeper_event_set(Safekeeper *to_remove)
{
rm_safekeeper_event_set(to_remove, true);
}
/*
* A hacky way to remove single event from the event set. Can be called if event
* doesn't exist, does nothing in this case.
*
* Note: Internally, this completely reconstructs the event set. It should be
* avoided if possible.
*
* If is_sk is true, socket of connection to safekeeper is removed; otherwise
* socket of neon wal reader.
*/
static void
rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk)
{
WalProposer *wp = to_remove->wp;
/*
* Shortpath for exiting if have nothing to do. We never call this
* function with safekeeper socket not existing, but do that with neon
* walreader socket.
*/
if ((is_sk && to_remove->eventPos == -1) ||
(!is_sk && to_remove->nwrEventPos == -1))
{
return;
}
/* Remove the existing event set, assign sk->eventPos = -1 */
walprop_pg_free_event_set(wp);
@@ -1514,14 +1594,27 @@ walprop_pg_rm_safekeeper_event_set(Safekeeper *to_remove)
Safekeeper *sk = &wp->safekeeper[i];
if (sk == to_remove)
{
if (is_sk)
sk->eventPos = -1;
else
sk->nwrEventPos = -1;
continue;
}
/* If this safekeeper isn't offline, add an event for it! */
/* If this safekeeper isn't offline, add events for it! */
if (sk->state != SS_OFFLINE)
{
desired_events = SafekeeperStateDesiredEvents(sk->state);
/* will set sk->eventPos */
wp->api.add_safekeeper_event_set(sk, desired_events);
if (!is_sk)
{
desired_events = SafekeeperStateDesiredEvents(sk->state);
/* will set sk->eventPos */
wp->api.add_safekeeper_event_set(sk, desired_events);
}
else if (NeonWALReaderEvents(sk->xlogreader) != 0)
{
add_nwr_event_set(sk, NeonWALReaderEvents(sk->xlogreader));
}
}
}
}