Introduce NeonWALReader fetching missing WAL from safekeepers.

This commit is contained in:
Arseny Sher
2023-10-23 17:05:41 +03:00
parent a8c96bb16b
commit 356abb3280
7 changed files with 1466 additions and 323 deletions

View File

@@ -9,6 +9,7 @@ OBJS = \
libpagestore.o \
neon.o \
neon_utils.o \
neon_walreader.o \
pagestore_smgr.o \
relsize_cache.o \
walproposer.o \

View File

@@ -0,0 +1,96 @@
/*
* Interface to set of libpq wrappers walproposer and neon_walreader need.
* Similar to libpqwalreceiver, but it has blocking connection establishment and
* pqexec which don't fit us. Implementation is at walproposer_pg.c.
*/
#ifndef ___LIBPQWALPROPOSER_H__
#define ___LIBPQWALPROPOSER_H__
/*
 * Outcome of waiting for a query result; a re-exported and modified variant
 * of libpq's ExecStatusType.
 */
typedef enum
{
	/* Exactly one CopyBoth result arrived. */
	WP_EXEC_SUCCESS_COPYBOTH = 0,

	/*
	 * The query succeeded, but with something other than a single CopyBoth
	 * result. The details were already logged; the caller may still want to
	 * report which safekeeper misbehaved.
	 *
	 * PQerrorMessage is not expected to be set appropriately here.
	 */
	WP_EXEC_UNEXPECTED_SUCCESS = 1,

	/*
	 * Nothing to report yet: wait until the socket is read-ready and ask
	 * again. Internally returned when PQisBusy says PQgetResult would block.
	 */
	WP_EXEC_NEEDS_INPUT = 2,

	/* Catch-all failure; consult PQerrorMessage. */
	WP_EXEC_FAILED = 3,
} WalProposerExecStatusType;
/* Values walprop_async_read can return. */
typedef enum
{
	/* A complete message was read; buf now points at its data. */
	PG_ASYNC_READ_SUCCESS = 0,

	/*
	 * The read has not finished. Wait for the connection to become
	 * read-ready and retry.
	 */
	PG_ASYNC_READ_TRY_AGAIN = 1,

	/* The read failed; consult PQerrorMessage(conn). */
	PG_ASYNC_READ_FAIL = 2,
} PGAsyncReadResult;
/* Values walprop_async_write can return. */
typedef enum
{
	/* Everything was written out. */
	PG_ASYNC_WRITE_SUCCESS = 0,

	/*
	 * The write was started but more PQflush calls are needed to complete
	 * it. A flush was just attempted, so wait until the connection is read-
	 * or write-ready before trying again.
	 *
	 * On read-ready: call PQconsumeInput, then flush again. On write-ready:
	 * just call PQflush.
	 */
	PG_ASYNC_WRITE_TRY_FLUSH = 1,

	/* The write failed; consult PQerrorMessage(conn). */
	PG_ASYNC_WRITE_FAIL = 2,
} PGAsyncWriteResult;
/*
* This header is included by walproposer.h to define walproposer_api; if we're
* building walproposer without pg, ignore libpq part, leaving only interface
* types.
*/
#ifndef WALPROPOSER_LIB
#include "libpq-fe.h"
/*
 * Wrapper around a libpq connection used by the libpqwp_* functions below.
 *
 * Sometimes working directly with underlying PGconn is simpler, export the
 * whole thing for simplicity.
 */
typedef struct WalProposerConn
{
	PGconn	   *pg_conn;		/* the underlying libpq connection */
	bool		is_nonblocking; /* whether the connection is non-blocking */
	char	   *recvbuf;		/* last received CopyData message from
								 * walprop_async_read */
} WalProposerConn;
extern WalProposerConn *libpqwp_connect_start(char *conninfo);
extern bool libpqwp_send_query(WalProposerConn *conn, char *query);
extern WalProposerExecStatusType libpqwp_get_query_result(WalProposerConn *conn);
extern PGAsyncReadResult libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount);
extern void libpqwp_disconnect(WalProposerConn *conn);
#endif /* WALPROPOSER_LIB */
#endif /* ___LIBPQWALPROPOSER_H__ */

731
pgxn/neon/neon_walreader.c Normal file
View File

@@ -0,0 +1,731 @@
/*
* Like WALRead, but when WAL segment doesn't exist locally instead of throwing
* ERROR asynchronously tries to fetch it from the most advanced safekeeper.
*
* We can't use libpqwalreceiver as it blocks during connection establishment
* (and waiting for PQExec result), so use libpqwalproposer instead.
*
* TODO: keepalives are currently never sent, so the other side can close the
* connection prematurely.
*
* TODO: close conn if reading takes too long to prevent stuck connections.
*/
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/xlog_internal.h"
#include "access/xlogdefs.h"
#include "access/xlogreader.h"
#include "libpq/pqformat.h"
#include "storage/fd.h"
#include "utils/wait_event.h"
#include "libpq-fe.h"
#include "neon_walreader.h"
#include "walproposer.h"
/* Size in bytes of the fixed error message buffer kept in NeonWALReader. */
#define NEON_WALREADER_ERR_MSG_LEN 512
/*
 * elog() wrapper which prepends the reader's log_prefix to the message.
 *
 * Can be called only where a variable named 'state' of type NeonWALReader *
 * is in scope, since the macro body references state->log_prefix directly.
 */
#define nwr_log(elevel, fmt, ...) elog(elevel, "%s" fmt, state->log_prefix, ## __VA_ARGS__)
/* Forward declarations of file-local helpers. */
static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
static NeonWALReadResult NeonWALReaderReadMsg(NeonWALReader *state);
static void NeonWALReaderResetRemote(NeonWALReader *state);
static bool NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
static bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p);
static void neon_wal_segment_close(NeonWALReader *state);
static bool is_wal_segment_exists(XLogSegNo segno, int segsize,
								  TimeLineID tli);
/*
 * Phases of the connection to the donor safekeeper, in the order the
 * connection normally goes through them.
 */
typedef enum
{
	/* there is no remote connection */
	RS_NONE = 0,
	/* PQconnectPoll in progress, waiting for the socket to become readable */
	RS_CONNECTING_READ = 1,
	/* PQconnectPoll in progress, waiting for the socket to become writable */
	RS_CONNECTING_WRITE = 2,
	/* START_REPLICATION was sent, waiting for its result */
	RS_WAIT_EXEC_RESULT = 3,
	/* replication stream is established */
	RS_ESTABLISHED = 4,
} NeonWALReaderRemoteState;
/*
 * State of a WAL reader which serves reads from local segment files and falls
 * back to streaming from a donor safekeeper when local WAL is missing.
 */
struct NeonWALReader
{
	/*
	 * LSN before which we assume WAL is not available locally. Exists because
	 * though first segment after startup always exists, part before
	 * basebackup LSN is filled with zeros.
	 */
	XLogRecPtr	available_lsn;
	/* segment context; ws_segsize is set from wal_segment_size at allocation */
	WALSegmentContext segcxt;
	/* currently open local segment (ws_file is -1 when none is open) */
	WALOpenSegment seg;
	/*
	 * errno saved by the last failed local open/read. NeonWALRead compares it
	 * against ENOENT to decide whether to fall back to the remote read.
	 */
	int			wre_errno;
	/* Explains failure to read, static for simplicity. */
	char		err_msg[NEON_WALREADER_ERR_MSG_LEN];
	/*
	 * Saved info about request in progress, used to check validity of
	 * arguments after resume and remember how far we accomplished it. req_lsn
	 * is 0 if there is no request in progress.
	 */
	XLogRecPtr	req_lsn;
	Size		req_len;
	Size		req_progress;
	WalProposer *wp;			/* we learn donor through walproposer */
	char		donor_name[64]; /* saved donor safekeeper name for logging */
	/* state of connection to safekeeper */
	NeonWALReaderRemoteState rem_state;
	/* connection to the donor safekeeper, NULL when there is none */
	WalProposerConn *wp_conn;
	/*
	 * position in wp_conn recvbuf from which we'll copy WAL next time, or
	 * NULL if there is no unprocessed message
	 */
	char	   *wal_ptr;
	Size		wal_rem_len;	/* how many unprocessed bytes left in recvbuf */
	/*
	 * LSN of wal_ptr position according to walsender to cross check against
	 * read request
	 */
	XLogRecPtr	rem_lsn;
	/* prepended to lines logged by neon_walreader, if provided */
	char		log_prefix[64];
};
/*
 * palloc and initialize NeonWALReader.
 *
 * wal_segment_size: WAL segment size, stored in the segment context.
 * available_lsn: LSN before which WAL is assumed to be missing locally.
 * wp: walproposer used later to learn the donor safekeeper.
 * log_prefix: optional (may be NULL) string prepended to every log line; it
 *		is copied and silently truncated to fit the internal buffer.
 *
 * Returns the new reader, or NULL if the allocation failed.
 */
NeonWALReader *
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix)
{
	NeonWALReader *reader;

	/* zero-filled allocation; fields not set below start out as 0/NULL */
	reader = (NeonWALReader *)
		palloc_extended(sizeof(NeonWALReader),
						MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
	if (!reader)
		return NULL;

	reader->available_lsn = available_lsn;
	reader->seg.ws_file = -1;	/* no local segment is open yet */
	reader->seg.ws_segno = 0;
	reader->seg.ws_tli = 0;
	reader->segcxt.ws_segsize = wal_segment_size;
	reader->wp = wp;
	reader->rem_state = RS_NONE;

	/*
	 * snprintf always NUL-terminates, unlike strncpy which would leave the
	 * buffer unterminated for a prefix of sizeof(log_prefix) or more chars;
	 * the prefix is later printed with "%s" by nwr_log.
	 */
	if (log_prefix)
		snprintf(reader->log_prefix, sizeof(reader->log_prefix), "%s", log_prefix);

	return reader;
}
/*
 * Release everything owned by the reader: the open local segment (if any),
 * the remote connection (if any) and the reader itself.
 */
void
NeonWALReaderFree(NeonWALReader *state)
{
	WalProposerConn *remote = state->wp_conn;

	/* close the local segment file descriptor */
	if (state->seg.ws_file != -1)
		neon_wal_segment_close(state);

	/* drop the connection to the donor safekeeper */
	if (remote)
		libpqwp_disconnect(remote);

	pfree(state);
}
/*
* Like vanilla WALRead, but if requested position is before available_lsn or
* WAL segment doesn't exist on disk, it tries to fetch needed segment from the
* advanced safekeeper.
*
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
*
* Returns NEON_WALREAD_SUCCESS if succeeded, NEON_WALREAD_ERROR if an error
* occurs, in which case NeonWALReaderErrMsg returns the description. Error
* always closes remote connection, if there was any, so socket subscription
* should be removed.
*
* NEON_WALREAD_WOULDBLOCK means caller should obtain socket to wait for with
* NeonWALReaderSocket and call NeonWALRead again with exactly the same
* arguments when NeonWALReaderEvents happen on the socket. Note that per libpq
* docs during connection establishment (before first successful read) socket
* underneath might change.
*
* Also, eventually walreader should switch from remote to local read; caller
* should remove subscription to socket then by checking NeonWALReaderEvents
* after successful read (otherwise next read might reopen the connection with
* different socket).
*
* Reading not monotonically is not supported and will result in error.
*
* Caller should be sure that WAL up to requested LSN exists, otherwise
* NEON_WALREAD_WOULDBLOCK might be always returned.
*/
NeonWALReadResult
NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
{
	bool		before_available = (startptr < state->available_lsn);
	bool		remote_active = (state->rem_state != RS_NONE);

	/*
	 * Go remote straight away when the data precedes the known available
	 * basebackup lsn, or when a remote connection is already in use.
	 */
	if (before_available || remote_active)
		return NeonWALReadRemote(state, buf, startptr, count, tli);

	/* Otherwise the local files get the first try. */
	if (NeonWALReadLocal(state, buf, startptr, count, tli))
		return NEON_WALREAD_SUCCESS;

	/* Only a missing segment file justifies falling back to remote. */
	if (state->wre_errno != ENOENT)
		return NEON_WALREAD_ERROR;

	nwr_log(LOG, "local read failed as segment at %X/%X doesn't exist, attempting remote",
			LSN_FORMAT_ARGS(startptr));
	return NeonWALReadRemote(state, buf, startptr, count, tli);
}
/*
 * Do the read from remote safekeeper.
 *
 * Advances the connection state machine (rem_state) as far as possible
 * without blocking: start the connection to the donor, poll it until
 * established, send START_REPLICATION at startptr, wait for its result, and
 * finally copy streamed WAL into buf until 'count' bytes are delivered.
 *
 * Returns NEON_WALREAD_WOULDBLOCK whenever the socket must be waited on; the
 * caller must then repeat the call with the same startptr/count, progress is
 * remembered in req_lsn/req_len/req_progress. Returns NEON_WALREAD_ERROR with
 * err_msg filled and the remote state reset on any failure. 'tli' is used
 * only to probe whether the next segment exists locally.
 */
static NeonWALReadResult
NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
{
	if (state->rem_state == RS_NONE)
	{
		XLogRecPtr	donor_lsn;

		/* no connection yet; start one */
		Safekeeper *donor = GetDonor(state->wp, &donor_lsn);

		if (donor == NULL)
		{
			snprintf(state->err_msg, sizeof(state->err_msg),
					 "failed to establish remote connection to fetch WAL: no donor available");
			return NEON_WALREAD_ERROR;
		}
		snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port);
		nwr_log(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL",
				state->donor_name, LSN_FORMAT_ARGS(donor_lsn));
		state->wp_conn = libpqwp_connect_start(donor->conninfo);

		/*
		 * Guard against a NULL connection object before dereferencing it
		 * below. NOTE(review): presumably libpqwp_connect_start returns NULL
		 * when the connection can't even be allocated -- confirm against its
		 * implementation.
		 */
		if (state->wp_conn == NULL)
		{
			snprintf(state->err_msg, sizeof(state->err_msg),
					 "failed to connect to %s to fetch WAL: could not start connection",
					 state->donor_name);
			NeonWALReaderResetRemote(state);
			return NEON_WALREAD_ERROR;
		}
		if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD)
		{
			snprintf(state->err_msg, sizeof(state->err_msg),
					 "failed to connect to %s to fetch WAL: immediately failed with %s",
					 state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
			NeonWALReaderResetRemote(state);
			return NEON_WALREAD_ERROR;
		}
		/* we'll poll immediately */
		state->rem_state = RS_CONNECTING_READ;
	}

	if (state->rem_state == RS_CONNECTING_READ || state->rem_state == RS_CONNECTING_WRITE)
	{
		switch (PQconnectPoll(state->wp_conn->pg_conn))
		{
			case PGRES_POLLING_FAILED:
				snprintf(state->err_msg, sizeof(state->err_msg),
						 "failed to connect to %s to fetch WAL: poll error: %s",
						 state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
				NeonWALReaderResetRemote(state);
				return NEON_WALREAD_ERROR;
			case PGRES_POLLING_READING:
				state->rem_state = RS_CONNECTING_READ;
				return NEON_WALREAD_WOULDBLOCK;
			case PGRES_POLLING_WRITING:
				state->rem_state = RS_CONNECTING_WRITE;
				return NEON_WALREAD_WOULDBLOCK;
			case PGRES_POLLING_OK:
				{
					/* connection successfully established */
					char		start_repl_query[128];

					snprintf(start_repl_query, sizeof(start_repl_query),
							 "START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')",
							 LSN_FORMAT_ARGS(startptr), state->wp->propTerm);
					nwr_log(LOG, "connection to %s to fetch WAL succeeded, running %s",
							state->donor_name, start_repl_query);
					if (!libpqwp_send_query(state->wp_conn, start_repl_query))
					{
						snprintf(state->err_msg, sizeof(state->err_msg),
								 "failed to send %s query to %s: %s",
								 start_repl_query, state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
						NeonWALReaderResetRemote(state);
						return NEON_WALREAD_ERROR;
					}
					state->rem_state = RS_WAIT_EXEC_RESULT;
					break;
				}
			default:			/* there is unused PGRES_POLLING_ACTIVE */
				Assert(false);

				/*
				 * In builds without assertions still honour the contract
				 * that an error carries a message and closes the connection.
				 */
				snprintf(state->err_msg, sizeof(state->err_msg),
						 "failed to connect to %s to fetch WAL: unexpected PQconnectPoll result",
						 state->donor_name);
				NeonWALReaderResetRemote(state);
				return NEON_WALREAD_ERROR;
		}
	}

	if (state->rem_state == RS_WAIT_EXEC_RESULT)
	{
		switch (libpqwp_get_query_result(state->wp_conn))
		{
			case WP_EXEC_SUCCESS_COPYBOTH:
				state->rem_state = RS_ESTABLISHED;
				break;
			case WP_EXEC_NEEDS_INPUT:
				return NEON_WALREAD_WOULDBLOCK;
			case WP_EXEC_FAILED:
				snprintf(state->err_msg, sizeof(state->err_msg),
						 "get START_REPLICATION result from %s failed: %s",
						 state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
				NeonWALReaderResetRemote(state);
				return NEON_WALREAD_ERROR;
			default:			/* can't happen */
				snprintf(state->err_msg, sizeof(state->err_msg),
						 "get START_REPLICATION result from %s: unexpected result",
						 state->donor_name);
				NeonWALReaderResetRemote(state);
				return NEON_WALREAD_ERROR;
		}
	}

	Assert(state->rem_state == RS_ESTABLISHED);

	/*
	 * If we had the request before, verify args are the same and advance the
	 * result ptr according to the progress; otherwise register the request.
	 */
	if (state->req_lsn != InvalidXLogRecPtr)
	{
		if (state->req_lsn != startptr || state->req_len != count)
		{
			snprintf(state->err_msg, sizeof(state->err_msg),
					 "args changed during request, was %X/%X %zu, now %X/%X %zu",
					 LSN_FORMAT_ARGS(state->req_lsn), state->req_len, LSN_FORMAT_ARGS(startptr), count);
			NeonWALReaderResetRemote(state);
			return NEON_WALREAD_ERROR;
		}
		nwr_log(DEBUG5, "continuing remote read at req_lsn=%X/%X len=%zu, req_progress=%zu",
				LSN_FORMAT_ARGS(startptr),
				count,
				state->req_progress);
		/* skip the part of the caller's buffer filled by previous calls */
		buf += state->req_progress;
	}
	else
	{
		state->req_lsn = startptr;
		state->req_len = count;
		state->req_progress = 0;
		nwr_log(DEBUG5, "starting remote read req_lsn=%X/%X len=%zu",
				LSN_FORMAT_ARGS(startptr),
				count);
	}

	while (true)
	{
		Size		to_copy;

		/*
		 * If we have no ready data, receive new message.
		 */
		if (state->wal_rem_len == 0 &&

		/*
		 * check for the sake of 0 length reads; walproposer does these for
		 * heartbeats, though generally they shouldn't hit remote source.
		 */
			state->req_len - state->req_progress > 0)
		{
			NeonWALReadResult read_msg_res = NeonWALReaderReadMsg(state);

			if (read_msg_res != NEON_WALREAD_SUCCESS)
				return read_msg_res;
		}

		/* the stream position must match the next byte we want */
		if (state->req_lsn + state->req_progress != state->rem_lsn)
		{
			snprintf(state->err_msg, sizeof(state->err_msg),
					 "expected remote WAL at %X/%X but got %X/%X. Non monotonic read requests could have caused this. req_lsn=%X/%X len=%zu",
					 LSN_FORMAT_ARGS(state->req_lsn + state->req_progress),
					 LSN_FORMAT_ARGS(state->rem_lsn),
					 LSN_FORMAT_ARGS(state->req_lsn),
					 state->req_len);
			NeonWALReaderResetRemote(state);
			return NEON_WALREAD_ERROR;
		}

		/* We can copy min of (available, requested) bytes. */
		to_copy =
			Min(state->req_len - state->req_progress, state->wal_rem_len);
		memcpy(buf, state->wal_ptr, to_copy);
		state->wal_ptr += to_copy;
		state->wal_rem_len -= to_copy;
		state->rem_lsn += to_copy;
		if (state->wal_rem_len == 0)
			state->wal_ptr = NULL;	/* freed by libpqwalproposer */
		buf += to_copy;
		state->req_progress += to_copy;

		if (state->req_progress == state->req_len)
		{
			XLogSegNo	next_segno;
			XLogSegNo	req_segno;

			XLByteToSeg(state->req_lsn, req_segno, state->segcxt.ws_segsize);
			XLByteToSeg(state->rem_lsn, next_segno, state->segcxt.ws_segsize);

			/*
			 * Request completed. If there is a chance of serving next one
			 * locally, close the connection.
			 */
			if (state->req_lsn < state->available_lsn &&
				state->rem_lsn >= state->available_lsn)
			{
				nwr_log(LOG, "closing remote connection as available_lsn %X/%X crossed and next read at %X/%X is likely to be served locally",
						LSN_FORMAT_ARGS(state->available_lsn), LSN_FORMAT_ARGS(state->rem_lsn));
				NeonWALReaderResetRemote(state);
			}
			else if (state->rem_lsn >= state->available_lsn && next_segno > req_segno &&
					 is_wal_segment_exists(next_segno, state->segcxt.ws_segsize, tli))
			{
				nwr_log(LOG, "closing remote connection as WAL file at next lsn %X/%X exists",
						LSN_FORMAT_ARGS(state->rem_lsn));
				NeonWALReaderResetRemote(state);
			}

			/* forget the request so that the next call registers a new one */
			state->req_lsn = InvalidXLogRecPtr;
			state->req_len = 0;
			state->req_progress = 0;
			return NEON_WALREAD_SUCCESS;
		}
	}
}
/*
 * Receive CopyData messages from the established stream until a WAL ('w')
 * message arrives; on success wal_ptr, wal_rem_len and rem_lsn describe its
 * payload. Keepalives ('k') are consumed along the way, and messages of an
 * unknown type are skipped with a WARNING.
 *
 * Returns NEON_WALREAD_WOULDBLOCK if more input is needed. On failure fills
 * err_msg, resets the remote state and returns NEON_WALREAD_ERROR.
 */
static NeonWALReadResult
NeonWALReaderReadMsg(NeonWALReader *state)
{
	for (;;)					/* loop until we get 'w' */
	{
		char	   *payload;
		int			payload_len;
		PGAsyncReadResult read_res;
		StringInfoData s;
		char		msg_type;

		Assert(state->rem_state == RS_ESTABLISHED);
		Assert(state->wal_ptr == NULL && state->wal_rem_len == 0);

		read_res = libpqwp_async_read(state->wp_conn, &payload, &payload_len);
		if (read_res == PG_ASYNC_READ_TRY_AGAIN)
			return NEON_WALREAD_WOULDBLOCK;
		if (read_res == PG_ASYNC_READ_FAIL)
		{
			snprintf(state->err_msg,
					 sizeof(state->err_msg),
					 "req_lsn=%X/%X, req_len=%zu, req_progress=%zu, get copydata failed: %s",
					 LSN_FORMAT_ARGS(state->req_lsn),
					 state->req_len,
					 state->req_progress,
					 PQerrorMessage(state->wp_conn->pg_conn));
			goto err;
		}

		/* wrap the received bytes into a StringInfo for the pq_getmsg* API */
		s.data = payload;
		s.len = payload_len;
		s.cursor = 0;
		s.maxlen = -1;

		if (payload_len == 0)
		{
			snprintf(state->err_msg,
					 sizeof(state->err_msg),
					 "zero length copydata received");
			goto err;
		}

		msg_type = pq_getmsgbyte(&s);

		if (msg_type == 'w')
		{
			/* WAL data: three int64 header fields, then the WAL itself */
			int			hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
			XLogRecPtr	start_lsn;
			int			body_len;

			if (s.len - s.cursor < hdrlen)
			{
				snprintf(state->err_msg,
						 sizeof(state->err_msg),
						 "invalid WAL message received from primary");
				goto err;
			}
			start_lsn = pq_getmsgint64(&s);
			pq_getmsgint64(&s); /* XLogRecPtr end_lsn; */
			pq_getmsgint64(&s); /* TimestampTz send_time */

			/* whatever follows the header is WAL */
			body_len = s.len - s.cursor;
			state->rem_lsn = start_lsn;
			state->wal_rem_len = (Size) body_len;
			state->wal_ptr = (char *) pq_getmsgbytes(&s, body_len);
			nwr_log(DEBUG5, "received WAL msg at %X/%X len %zu",
					LSN_FORMAT_ARGS(state->rem_lsn), state->wal_rem_len);
			return NEON_WALREAD_SUCCESS;
		}
		else if (msg_type == 'k')
		{
			/* keepalive: end LSN, timestamp and a reply-requested byte */
			int			hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
			XLogRecPtr	end_lsn;
			bool		reply_requested;

			if (s.len - s.cursor < hdrlen)
			{
				snprintf(state->err_msg, sizeof(state->err_msg),
						 "invalid keepalive message received from primary");
				goto err;
			}
			end_lsn = pq_getmsgint64(&s);
			pq_getmsgint64(&s); /* TimestampTz timestamp; */
			reply_requested = pq_getmsgbyte(&s);
			nwr_log(DEBUG5, "received keepalive end_lsn=%X/%X reply_requested=%d",
					LSN_FORMAT_ARGS(end_lsn),
					reply_requested);

			/* give up if the donor reports less WAL than the request needs */
			if (end_lsn < state->req_lsn + state->req_len)
			{
				snprintf(state->err_msg, sizeof(state->err_msg),
						 "closing remote connection: requested WAL up to %X/%X, but current donor %s has only up to %X/%X",
						 LSN_FORMAT_ARGS(state->req_lsn + state->req_len), state->donor_name, LSN_FORMAT_ARGS(end_lsn));
				goto err;
			}
		}
		else
		{
			nwr_log(WARNING, "invalid replication message type %d", msg_type);
		}
		/* not a WAL message: go read the next one */
	}

err:
	NeonWALReaderResetRemote(state);
	return NEON_WALREAD_ERROR;
}
/*
 * Drop the remote connection (if any) and forget both the request in progress
 * and any buffered stream data, returning to the RS_NONE state.
 */
static void
NeonWALReaderResetRemote(NeonWALReader *state)
{
	/* close the connection first */
	if (state->wp_conn != NULL)
	{
		libpqwp_disconnect(state->wp_conn);
		state->wp_conn = NULL;
	}
	state->rem_state = RS_NONE;
	state->donor_name[0] = '\0';

	/* no request is in progress anymore */
	state->req_lsn = InvalidXLogRecPtr;
	state->req_len = 0;
	state->req_progress = 0;

	/* and there is no unprocessed WAL left */
	state->wal_ptr = NULL;
	state->wal_rem_len = 0;
	state->rem_lsn = InvalidXLogRecPtr;
}
/*
 * Socket of the connection to the remote source. Valid to call only while
 * the connection exists (NeonWALReaderEvents returns non zero); otherwise
 * FATAL is raised.
 */
pgsocket
NeonWALReaderSocket(NeonWALReader *state)
{
	WalProposerConn *remote = state->wp_conn;

	if (remote == NULL)
		nwr_log(FATAL, "NeonWALReaderSocket is called without active remote connection");

	return PQsocket(state->wp_conn->pg_conn);
}
/*
 * Events the caller should wait for on the connection socket, or 0 when
 * there is no active remote connection.
 */
extern uint32
NeonWALReaderEvents(NeonWALReader *state)
{
	NeonWALReaderRemoteState rs = state->rem_state;

	if (rs == RS_NONE)
		return 0;

	/* only a connection attempt waiting to write needs writability */
	if (rs == RS_CONNECTING_WRITE)
		return WL_SOCKET_WRITEABLE;

	if (rs == RS_CONNECTING_READ ||
		rs == RS_WAIT_EXEC_RESULT ||
		rs == RS_ESTABLISHED)
		return WL_SOCKET_READABLE;

	Assert(false);
	return 0;					/* make compiler happy */
}
/*
 * Read 'count' bytes starting at 'startptr' on timeline 'tli' from local WAL
 * segment files into 'buf', opening segments as the read crosses segment
 * boundaries.
 *
 * Returns true on success. On failure returns false with err_msg describing
 * the problem and wre_errno holding the errno of the failed open/read, or 0
 * if the failure (unexpected EOF) has no errno.
 */
static bool
NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
{
	char	   *p;
	XLogRecPtr	recptr;
	Size		nbytes;

	p = buf;
	recptr = startptr;
	nbytes = count;

	/*
	 * Forget errno of previous failures: the caller inspects wre_errno after
	 * we return false, and a stale ENOENT must not be taken for the cause of
	 * a failure which doesn't set it (unexpected EOF).
	 */
	state->wre_errno = 0;

	while (nbytes > 0)
	{
		uint32		startoff;
		int			segbytes;
		int			readbytes;

		startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);

		/*
		 * If the data we want is not in a segment we have open, close what we
		 * have (if anything) and open the segment containing recptr.
		 */
		if (state->seg.ws_file < 0 ||
			!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
			tli != state->seg.ws_tli)
		{
			XLogSegNo	nextSegNo;

			neon_wal_segment_close(state);
			XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
			if (!neon_wal_segment_open(state, nextSegNo, &tli))
			{
				char		fname[MAXFNAMELEN];

				/* save errno before anything else can overwrite it */
				state->wre_errno = errno;
				XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
				snprintf(state->err_msg, sizeof(state->err_msg), "failed to open WAL segment %s while reading at %X/%X: %s",
						 fname, LSN_FORMAT_ARGS(recptr), strerror(state->wre_errno));
				return false;
			}

			/* This shouldn't happen -- indicates a bug in segment_open */
			Assert(state->seg.ws_file >= 0);

			/* Update the current segment info. */
			state->seg.ws_tli = tli;
			state->seg.ws_segno = nextSegNo;
		}

		/* How many bytes are within this segment? */
		if (nbytes > (state->segcxt.ws_segsize - startoff))
			segbytes = state->segcxt.ws_segsize - startoff;
		else
			segbytes = nbytes;

#ifndef FRONTEND
		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
#endif

		/* Reset errno first; eases reporting non-errno-affecting errors */
		errno = 0;
		readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);

#ifndef FRONTEND
		pgstat_report_wait_end();
#endif

		if (readbytes <= 0)
		{
			char		fname[MAXFNAMELEN];

			if (readbytes < 0)
				state->wre_errno = errno;

			XLogFileName(fname, state->seg.ws_tli, state->seg.ws_segno, state->segcxt.ws_segsize);

			/*
			 * The error text is taken from the saved errno only: an extra %m
			 * would repeat it, and on EOF (errno is 0) would print a
			 * meaningless description.
			 */
			if (readbytes < 0)
			{
				snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %u: %s",
						 fname, startoff, strerror(state->wre_errno));
			}
			else
			{
				snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %u: unexpected EOF",
						 fname, startoff);
			}
			return false;
		}

		/* Update state for read */
		recptr += readbytes;
		nbytes -= readbytes;
		p += readbytes;
	}

	return true;
}
/*
 * Variant of vanilla wal_segment_open for local pg_wal files which reports
 * failure through the return value instead of ERROR: the result is true iff
 * the segment file was opened, in which case its descriptor is stored in
 * state->seg.ws_file. *tli_p is only read.
 */
static bool
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
					  TimeLineID *tli_p)
{
	char		seg_path[MAXPGPATH];
	int			fd;

	XLogFilePath(seg_path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
	nwr_log(LOG, "opening %s", seg_path);

	fd = BasicOpenFile(seg_path, O_RDONLY | PG_BINARY);
	state->seg.ws_file = fd;

	return fd >= 0;
}
/*
 * Tell whether stat() succeeds on the file of WAL segment 'segno' of
 * timeline 'tli'.
 */
static bool
is_wal_segment_exists(XLogSegNo segno, int segsize, TimeLineID tli)
{
	char		seg_path[MAXPGPATH];
	struct stat st;

	XLogFilePath(seg_path, tli, segno, segsize);

	if (stat(seg_path, &st) != 0)
		return false;
	return true;
}
/*
 * Counterpart of vanilla wal_segment_close for NeonWALReader: closes the
 * currently open segment file, if there is one, and marks it as closed.
 */
static void
neon_wal_segment_close(NeonWALReader *state)
{
	/* nothing is open */
	if (state->seg.ws_file < 0)
		return;

	close(state->seg.ws_file);
	/* need to check errno? */
	state->seg.ws_file = -1;
}
/*
 * Give access to the description of the last error. The result points into
 * the reader itself and is overwritten by subsequent errors.
 */
char *
NeonWALReaderErrMsg(NeonWALReader *state)
{
	char	   *msg = &state->err_msg[0];

	return msg;
}

View File

@@ -0,0 +1,29 @@
#ifndef __NEON_WALREADER_H__
#define __NEON_WALREADER_H__
#include "access/xlogdefs.h"
/* forward declare so we don't have to expose the struct to the public */
struct NeonWALReader;
typedef struct NeonWALReader NeonWALReader;
/* avoid including walproposer.h as it includes us */
struct WalProposer;
typedef struct WalProposer WalProposer;
/* Outcome of NeonWALRead */
typedef enum
{
	/* the requested bytes were delivered */
	NEON_WALREAD_SUCCESS = 0,
	/* the remote socket must be waited on before calling again */
	NEON_WALREAD_WOULDBLOCK = 1,
	/* the read failed */
	NEON_WALREAD_ERROR = 2,
} NeonWALReadResult;
/*
 * Create a reader. Returns NULL if memory can't be allocated. log_prefix may
 * be NULL; otherwise it is copied and prepended to the reader's log lines.
 */
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix);
/* Close the open segment file and remote connection, if any, and free the reader. */
extern void NeonWALReaderFree(NeonWALReader *state);
/*
 * Read 'count' bytes at 'startptr' into 'buf', locally or from a safekeeper.
 * On NEON_WALREAD_WOULDBLOCK repeat the call with the same arguments once the
 * socket is ready; on NEON_WALREAD_ERROR see NeonWALReaderErrMsg.
 */
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
/* Socket of the remote connection; FATAL if there is no connection. */
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
/* Events to wait for on the socket, or 0 if there is no active remote connection. */
extern uint32 NeonWALReaderEvents(NeonWALReader *state);
/* Description of the last error; points into the reader's own buffer. */
extern char *NeonWALReaderErrMsg(NeonWALReader *state);
#endif /* __NEON_WALREADER_H__ */

View File

@@ -43,7 +43,6 @@
/* Prototypes for private functions */
static void WalProposerLoop(WalProposer *wp);
static void HackyRemoveWalProposerEvent(Safekeeper *to_remove);
static void ShutdownConnection(Safekeeper *sk);
static void ResetConnection(Safekeeper *sk);
static long TimeToReconnect(WalProposer *wp, TimestampTz now);
@@ -76,10 +75,9 @@ static bool BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, Safekeeper
static bool AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_state);
static bool AsyncFlush(Safekeeper *sk);
static int CompareLsn(const void *a, const void *b);
static char *FormatSafekeeperState(SafekeeperState state);
static char *FormatSafekeeperState(SafekeeperState state, SafekeeperActiveState active_state);
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
static char *FormatEvents(WalProposer *wp, uint32 events);
static char *FormatEvents(uint32 events);
WalProposer *
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
@@ -125,8 +123,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
}
initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf);
wp->api.wal_reader_allocate(&wp->safekeeper[wp->n_safekeepers]);
wp->safekeeper[wp->n_safekeepers].flushWrite = false;
wp->safekeeper[wp->n_safekeepers].xlogreader = NULL;
wp->safekeeper[wp->n_safekeepers].startStreamingAt = InvalidXLogRecPtr;
wp->safekeeper[wp->n_safekeepers].streamingAt = InvalidXLogRecPtr;
wp->n_safekeepers += 1;
@@ -275,7 +272,7 @@ WalProposerPoll(WalProposer *wp)
wp->config->safekeeper_connection_timeout))
{
walprop_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that",
sk->host, sk->port, FormatSafekeeperState(sk->state), wp->config->safekeeper_connection_timeout);
sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state), wp->config->safekeeper_connection_timeout);
ShutdownConnection(sk);
}
}
@@ -303,43 +300,6 @@ WalProposerLoop(WalProposer *wp)
WalProposerPoll(wp);
}
/*
* Hack: provides a way to remove the event corresponding to an individual walproposer from the set.
*
* Note: Internally, this completely reconstructs the event set. It should be avoided if possible.
*/
static void
HackyRemoveWalProposerEvent(Safekeeper *to_remove)
{
WalProposer *wp = to_remove->wp;
/* Remove the existing event set, assign sk->eventPos = -1 */
wp->api.free_event_set(wp);
/* Re-initialize it without adding any safekeeper events */
wp->api.init_event_set(wp);
/*
* loop through the existing safekeepers. If they aren't the one we're
* removing, and if they have a socket we can use, re-add the applicable
* events.
*/
for (int i = 0; i < wp->n_safekeepers; i++)
{
uint32 desired_events = WL_NO_EVENTS;
Safekeeper *sk = &wp->safekeeper[i];
if (sk == to_remove)
continue;
/* If this safekeeper isn't offline, add an event for it! */
if (sk->state != SS_OFFLINE)
{
desired_events = SafekeeperStateDesiredEvents(sk->state);
/* will set sk->eventPos */
wp->api.add_safekeeper_event_set(sk, desired_events);
}
}
}
/* Shuts down and cleans up the connection for a safekeeper. Sets its state to SS_OFFLINE */
static void
@@ -347,14 +307,13 @@ ShutdownConnection(Safekeeper *sk)
{
sk->wp->api.conn_finish(sk);
sk->state = SS_OFFLINE;
sk->flushWrite = false;
sk->streamingAt = InvalidXLogRecPtr;
if (sk->voteResponse.termHistory.entries)
pfree(sk->voteResponse.termHistory.entries);
sk->voteResponse.termHistory.entries = NULL;
HackyRemoveWalProposerEvent(sk);
sk->wp->api.rm_safekeeper_event_set(sk);
}
/*
@@ -472,8 +431,6 @@ ReconnectSafekeepers(WalProposer *wp)
static void
AdvancePollState(Safekeeper *sk, uint32 events)
{
WalProposer *wp = sk->wp;
/*
* Sanity check. We assume further down that the operations don't block
* because the socket is ready.
@@ -525,7 +482,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
*/
case SS_VOTING:
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk->state));
sk->port, FormatSafekeeperState(sk->state, sk->active_state));
ResetConnection(sk);
return;
@@ -554,7 +511,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
*/
case SS_IDLE:
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk->state));
sk->port, FormatSafekeeperState(sk->state, sk->active_state));
ResetConnection(sk);
return;
@@ -620,7 +577,7 @@ HandleConnectionEvent(Safekeeper *sk)
* Because PQconnectPoll can change the socket, we have to un-register the
* old event and re-register an event on the new socket.
*/
HackyRemoveWalProposerEvent(sk);
wp->api.rm_safekeeper_event_set(sk);
wp->api.add_safekeeper_event_set(sk, new_events);
/* If we successfully connected, send START_WAL_PUSH query */
@@ -1104,6 +1061,10 @@ SendProposerElected(Safekeeper *sk)
term_t lastCommonTerm;
int i;
/* Now that we are ready to send it's a good moment to create WAL reader */
Assert(!sk->xlogreader);
wp->api.wal_reader_allocate(sk);
/*
* Determine start LSN by comparing safekeeper's log term switch history
* and proposer's, searching for the divergence point.
@@ -1223,6 +1184,7 @@ StartStreaming(Safekeeper *sk)
* once for a connection.
*/
sk->state = SS_ACTIVE;
sk->active_state = SS_ACTIVE_SEND;
sk->streamingAt = sk->startStreamingAt;
/* event set will be updated inside SendMessageToNode */
@@ -1281,9 +1243,13 @@ HandleActiveState(Safekeeper *sk, uint32 events)
{
WalProposer *wp = sk->wp;
uint32 newEvents = WL_SOCKET_READABLE;
if (events & WL_SOCKET_WRITEABLE)
/*
* Note: we don't know which socket awoke us (sk or nwr). However, as
* SendAppendRequests always tries to send at least one msg in
* SS_ACTIVE_SEND, be careful not to go there if we are only after sk
* response, otherwise it'd create a busy loop of pings.
*/
if (events & WL_SOCKET_WRITEABLE || sk->active_state == SS_ACTIVE_READ_WAL)
if (!SendAppendRequests(sk))
return;
@@ -1291,28 +1257,26 @@ HandleActiveState(Safekeeper *sk, uint32 events)
if (!RecvAppendResponses(sk))
return;
/*
* We should wait for WL_SOCKET_WRITEABLE event if we have unflushed data
* in the buffer.
*
* LSN comparison checks if we have pending unsent messages. This check
* isn't necessary now, because we always send append messages immediately
* after arrival. But it's good to have it here in case we change this
* behavior in the future.
*/
if (sk->streamingAt != wp->availableLsn || sk->flushWrite)
newEvents |= WL_SOCKET_WRITEABLE;
if (events & WL_SOCKET_CLOSED)
{
walprop_log(WARNING, "connection to %s:%s in active state failed, got WL_SOCKET_CLOSED on neon_walreader socket",
sk->host, sk->port);
ShutdownConnection(sk);
return;
}
wp->api.update_event_set(sk, newEvents);
/* configures event set for yield whatever is the substate */
wp->api.active_state_update_event_set(sk);
}
/*
* Send WAL messages starting from sk->streamingAt until the end or non-writable
* socket, whichever comes first. Caller should take care of updating event set.
* Even if no unsent WAL is available, at least one empty message will be sent
* as a heartbeat, if socket is ready.
* socket or neon_walreader blocks, whichever comes first; active_state is
* updated accordingly. Caller should take care of updating event set. Even if
* no unsent WAL is available, at least one empty message will be sent as a
* heartbeat, if socket is ready.
*
* Can change state if Async* functions encounter errors and reset connection.
* Resets state and kills the connections if any error on them is encountered.
* Returns false in this case, true otherwise.
*/
static bool
@@ -1320,11 +1284,11 @@ SendAppendRequests(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
XLogRecPtr endLsn;
AppendRequestHeader *req;
PGAsyncWriteResult writeResult;
bool sentAnything = false;
AppendRequestHeader *req;
if (sk->flushWrite)
if (sk->active_state == SS_ACTIVE_FLUSH)
{
if (!AsyncFlush(sk))
@@ -1335,76 +1299,99 @@ SendAppendRequests(Safekeeper *sk)
return sk->state == SS_ACTIVE;
/* Event set will be updated in the end of HandleActiveState */
sk->flushWrite = false;
sk->active_state = SS_ACTIVE_SEND;
}
while (sk->streamingAt != wp->availableLsn || !sentAnything)
{
sentAnything = true;
endLsn = sk->streamingAt;
endLsn += MAX_SEND_SIZE;
/* if we went beyond available WAL, back off */
if (endLsn > wp->availableLsn)
if (sk->active_state == SS_ACTIVE_SEND)
{
endLsn = wp->availableLsn;
sentAnything = true;
endLsn = sk->streamingAt;
endLsn += MAX_SEND_SIZE;
/* if we went beyond available WAL, back off */
if (endLsn > wp->availableLsn)
{
endLsn = wp->availableLsn;
}
req = &sk->appendRequest;
PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn);
walprop_log(DEBUG5, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
req->endLsn - req->beginLsn,
LSN_FORMAT_ARGS(req->beginLsn),
LSN_FORMAT_ARGS(req->endLsn),
LSN_FORMAT_ARGS(req->commitLsn),
LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
resetStringInfo(&sk->outbuf);
/* write AppendRequest header */
appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader));
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
sk->active_state = SS_ACTIVE_READ_WAL;
}
req = &sk->appendRequest;
PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn);
walprop_log(DEBUG2, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
req->endLsn - req->beginLsn,
LSN_FORMAT_ARGS(req->beginLsn),
LSN_FORMAT_ARGS(req->endLsn),
LSN_FORMAT_ARGS(req->commitLsn),
LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
resetStringInfo(&sk->outbuf);
/* write AppendRequest header */
appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader));
/* write the WAL itself */
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
/* wal_read will raise error on failure */
wp->api.wal_read(sk,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn);
sk->outbuf.len += req->endLsn - req->beginLsn;
writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len);
/* Mark current message as sent, whatever the result is */
sk->streamingAt = endLsn;
switch (writeResult)
if (sk->active_state == SS_ACTIVE_READ_WAL)
{
case PG_ASYNC_WRITE_SUCCESS:
/* Continue writing the next message */
break;
req = &sk->appendRequest;
case PG_ASYNC_WRITE_TRY_FLUSH:
switch (wp->api.wal_read(sk,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn))
{
case NEON_WALREAD_SUCCESS:
break;
case NEON_WALREAD_WOULDBLOCK:
return true;
case NEON_WALREAD_ERROR:
walprop_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port,
NeonWALReaderErrMsg(sk->xlogreader));
ShutdownConnection(sk);
return false;
default:
Assert(false);
}
/*
* * We still need to call PQflush some more to finish the
* job. Caller function will handle this by setting right
* event* set.
*/
sk->flushWrite = true;
return true;
sk->outbuf.len += req->endLsn - req->beginLsn;
case PG_ASYNC_WRITE_FAIL:
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
default:
Assert(false);
return false;
writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len);
/* Mark current message as sent, whatever the result is */
sk->streamingAt = req->endLsn;
switch (writeResult)
{
case PG_ASYNC_WRITE_SUCCESS:
/* Continue writing the next message */
sk->active_state = SS_ACTIVE_SEND;
break;
case PG_ASYNC_WRITE_TRY_FLUSH:
/*
* We still need to call PQflush some more to finish the
* job. Caller function will handle this by setting right
* event set.
*/
sk->active_state = SS_ACTIVE_FLUSH;
return true;
case PG_ASYNC_WRITE_FAIL:
walprop_log(WARNING, "failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
default:
Assert(false);
return false;
}
}
}
@@ -1414,7 +1401,7 @@ SendAppendRequests(Safekeeper *sk)
/*
* Receive and process all available feedback.
*
* Can change state if Async* functions encounter errors and reset connection.
* Resets state and kills the connection if any error on it is encountered.
* Returns false in this case, true otherwise.
*
* NB: This function can call SendMessageToNode and produce new messages.
@@ -1595,6 +1582,53 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
return responses[wp->n_safekeepers - wp->quorum];
}
/*
 * Return safekeeper with active connection from which WAL can be downloaded,
 * or NULL if there is none.
 *
 * *donor_lsn is set to the end position of the donor to the best of our
 * knowledge. It is InvalidXLogRecPtr whenever NULL is returned.
 *
 * Must be called only after elections are won; otherwise a WARNING is logged
 * and NULL is returned.
 */
Safekeeper *
GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
{
	Safekeeper *donor = NULL;
	int			i;

	/*
	 * Declarations are kept ahead of the first statement, as PostgreSQL
	 * style requires; the output parameter is reset right after them.
	 */
	*donor_lsn = InvalidXLogRecPtr;

	if (wp->n_votes < wp->quorum)
	{
		walprop_log(WARNING, "GetDonor called before elections are won");
		return NULL;
	}

	/*
	 * First, consider node which had determined our term start LSN as we know
	 * about its position immediately after election before any feedbacks are
	 * sent.
	 */
	if (wp->safekeeper[wp->donor].state >= SS_IDLE)
	{
		donor = &wp->safekeeper[wp->donor];
		*donor_lsn = wp->propEpochStartLsn;
	}

	/*
	 * But also check feedbacks from all nodes with live connections and take
	 * the highest one. Note: if node sends feedbacks it already processed
	 * elected message so its term is fine.
	 *
	 * The comparison is strict, so on equal positions the candidate found
	 * earlier (the election donor, or a lower-indexed safekeeper) is kept.
	 */
	for (i = 0; i < wp->n_safekeepers; i++)
	{
		Safekeeper *sk = &wp->safekeeper[i];

		if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn > *donor_lsn)
		{
			donor = sk;
			*donor_lsn = sk->appendResponse.flushLsn;
		}
	}

	return donor;
}
static void
HandleSafekeeperResponse(WalProposer *wp)
{
@@ -1700,7 +1734,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
case PG_ASYNC_READ_FAIL:
walprop_log(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host,
sk->port, FormatSafekeeperState(sk->state),
sk->port, FormatSafekeeperState(sk->state, sk->active_state),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
@@ -1740,7 +1774,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
if (tag != anymsg->tag)
{
walprop_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk->state));
sk->port, FormatSafekeeperState(sk->state, sk->active_state));
ResetConnection(sk);
return false;
}
@@ -1811,12 +1845,13 @@ static bool
BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState success_state)
{
WalProposer *wp = sk->wp;
uint32 events;
uint32 sk_events;
uint32 nwr_events;
if (!wp->api.conn_blocking_write(sk, msg, msg_size))
{
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
@@ -1828,9 +1863,15 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes
* If the new state will be waiting for events to happen, update the event
* set to wait for those
*/
events = SafekeeperStateDesiredEvents(success_state);
if (events)
wp->api.update_event_set(sk, events);
SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);
/*
* nwr_events is relevant only during SS_ACTIVE which doesn't use
* BlockingWrite
*/
Assert(!nwr_events);
if (sk_events)
wp->api.update_event_set(sk, sk_events);
return true;
}
@@ -1863,7 +1904,7 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta
return false;
case PG_ASYNC_WRITE_FAIL:
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
@@ -1902,7 +1943,7 @@ AsyncFlush(Safekeeper *sk)
return false;
case -1:
walprop_log(WARNING, "Failed to flush write to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state),
wp->api.conn_error_message(sk));
ResetConnection(sk);
return false;
@@ -1932,14 +1973,14 @@ CompareLsn(const void *a, const void *b)
*
* The strings are intended to be used as a prefix to "state", e.g.:
*
* walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
* walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk->state, sk->active_state));
*
* If this sort of phrasing doesn't fit the message, instead use something like:
*
* walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
* walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state, sk->active_state));
*/
static char *
FormatSafekeeperState(SafekeeperState state)
FormatSafekeeperState(SafekeeperState state, SafekeeperActiveState active_state)
{
char *return_val = NULL;
@@ -1971,7 +2012,18 @@ FormatSafekeeperState(SafekeeperState state)
return_val = "idle";
break;
case SS_ACTIVE:
return_val = "active";
switch (active_state)
{
case SS_ACTIVE_SEND:
return_val = "active send";
break;
case SS_ACTIVE_READ_WAL:
return_val = "active read WAL";
break;
case SS_ACTIVE_FLUSH:
return_val = "active flush";
break;
}
break;
}
@@ -1984,22 +2036,20 @@ FormatSafekeeperState(SafekeeperState state)
static void
AssertEventsOkForState(uint32 events, Safekeeper *sk)
{
WalProposer *wp = sk->wp;
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
/*
* The events are in-line with what we're expecting, under two conditions:
* (a) if we aren't expecting anything, `events` has no read- or
* write-ready component. (b) if we are expecting something, there's
* overlap (i.e. `events & expected != 0`)
*/
uint32 sk_events;
uint32 nwr_events;
uint32 expected;
bool events_ok_for_state; /* long name so the `Assert` is more
* clear later */
if (expected == WL_NO_EVENTS)
events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
else
events_ok_for_state = ((events & expected) != 0);
SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);
/*
* Without one more level of notify target indirection we have no way to
* distinguish which socket woke up us, so just union expected events.
*/
expected = sk_events | nwr_events;
events_ok_for_state = ((events & expected) != 0);
if (!events_ok_for_state)
{
@@ -2008,36 +2058,37 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk)
* and then an assertion that's guaranteed to fail.
*/
walprop_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk->state));
FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state));
Assert(events_ok_for_state);
}
}
/* Returns the set of events a safekeeper in this state should be waiting on
/* Returns the set of events for both safekeeper (sk_events) and neon_walreader
* (nwr_events) sockets a safekeeper in this state should be waiting on.
*
* This will yield WL_NO_EVENTS (= 0) for some states. */
static uint32
SafekeeperStateDesiredEvents(SafekeeperState state)
void
SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events)
{
uint32 result = WL_NO_EVENTS;
*nwr_events = 0; /* nwr_events is empty for most states */
/* If the state doesn't have a modifier, we can check the base state */
switch (state)
switch (sk->state)
{
/* Connecting states say what they want in the name */
case SS_CONNECTING_READ:
result = WL_SOCKET_READABLE;
break;
*sk_events = WL_SOCKET_READABLE;
return;
case SS_CONNECTING_WRITE:
result = WL_SOCKET_WRITEABLE;
break;
*sk_events = WL_SOCKET_WRITEABLE;
return;
/* Reading states need the socket to be read-ready to continue */
case SS_WAIT_EXEC_RESULT:
case SS_HANDSHAKE_RECV:
case SS_WAIT_VERDICT:
result = WL_SOCKET_READABLE;
break;
*sk_events = WL_SOCKET_READABLE;
return;
/*
* Idle states use read-readiness as a sign that the connection
@@ -2045,32 +2096,62 @@ SafekeeperStateDesiredEvents(SafekeeperState state)
*/
case SS_VOTING:
case SS_IDLE:
result = WL_SOCKET_READABLE;
break;
*sk_events = WL_SOCKET_READABLE;
return;
/*
* Flush states require write-ready for flushing. Active state
* does both reading and writing.
*
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
* should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
*/
case SS_SEND_ELECTED_FLUSH:
*sk_events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
return;
case SS_ACTIVE:
result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
break;
switch (sk->active_state)
{
/*
* Everything is sent; we just wait for sk responses and
* latch.
*
* Note: this assumes we send all available WAL to
* safekeeper in one wakeup (unless it blocks). Otherwise
* we would want WL_SOCKET_WRITEABLE here to finish the
* work.
*/
case SS_ACTIVE_SEND:
*sk_events = WL_SOCKET_READABLE;
if (NeonWALReaderEvents(sk->xlogreader))
*nwr_events = WL_SOCKET_CLOSED; /* c.f.
* walprop_pg_active_state_update_event_set */
return;
/*
* Waiting for neon_walreader socket, but we still read
* responses from sk socket.
*/
case SS_ACTIVE_READ_WAL:
*sk_events = WL_SOCKET_READABLE;
*nwr_events = NeonWALReaderEvents(sk->xlogreader);
return;
/*
* Need to flush the sk socket, so ignore neon_walreader
* one and set write interest on sk.
*/
case SS_ACTIVE_FLUSH:
*sk_events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
if (NeonWALReaderEvents(sk->xlogreader))
*nwr_events = WL_SOCKET_CLOSED; /* c.f.
* walprop_pg_active_state_update_event_set */
return;
}
return;
/* The offline state expects no events. */
case SS_OFFLINE:
result = WL_NO_EVENTS;
break;
*sk_events = 0;
return;
default:
Assert(false);
break;
}
return result;
}
/* Returns a human-readable string corresponding to the event set
@@ -2081,7 +2162,7 @@ SafekeeperStateDesiredEvents(SafekeeperState state)
* The string should not be freed. It should also not be expected to remain the same between
* function calls. */
static char *
FormatEvents(WalProposer *wp, uint32 events)
FormatEvents(uint32 events)
{
static char return_str[8];

View File

@@ -10,6 +10,9 @@
#include "utils/uuid.h"
#include "replication/walreceiver.h"
#include "libpqwalproposer.h"
#include "neon_walreader.h"
#define SK_MAGIC 0xCafeCeefu
#define SK_PROTOCOL_VERSION 2
@@ -22,43 +25,9 @@
*/
#define WL_NO_EVENTS 0
struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */
struct WalProposerConn; /* Defined in libpqwalproposer.h */
typedef struct WalProposerConn WalProposerConn;
/* Possible return values from ReadPGAsync */
typedef enum
{
/* The full read was successful. buf now points to the data */
PG_ASYNC_READ_SUCCESS,
/*
* The read is ongoing. Wait until the connection is read-ready, then try
* again.
*/
PG_ASYNC_READ_TRY_AGAIN,
/* Reading failed. Check PQerrorMessage(conn) */
PG_ASYNC_READ_FAIL,
} PGAsyncReadResult;
/* Possible return values from WritePGAsync */
typedef enum
{
/* The write fully completed */
PG_ASYNC_WRITE_SUCCESS,
/*
* The write started, but you'll need to call PQflush some more times to
* finish it off. We just tried, so it's best to wait until the connection
* is read- or write-ready to try again.
*
* If it becomes read-ready, call PQconsumeInput and flush again. If it
* becomes write-ready, just call PQflush.
*/
PG_ASYNC_WRITE_TRY_FLUSH,
/* Writing failed. Check PQerrorMessage(conn) */
PG_ASYNC_WRITE_FAIL,
} PGAsyncWriteResult;
/*
* WAL safekeeper state, which is used to wait for some event.
*
@@ -135,6 +104,40 @@ typedef enum
SS_ACTIVE,
} SafekeeperState;
/*
 * Sending WAL substates of SS_ACTIVE.
 *
 * SendAppendRequests drives the transitions: SS_ACTIVE_SEND prepares the next
 * AppendRequest and moves to SS_ACTIVE_READ_WAL; once the WAL chunk is read
 * and written out, the state goes back to SS_ACTIVE_SEND, or to
 * SS_ACTIVE_FLUSH if the write could not be fully flushed; a completed flush
 * returns to SS_ACTIVE_SEND.
 */
typedef enum
{
/*
 * We are ready to send more WAL, waiting for latch set to learn about
 * more WAL becoming available (or just a timeout to send heartbeat).
 * Safekeeper responses are still read in this state.
 */
SS_ACTIVE_SEND,
/*
 * Polling neon_walreader to receive chunk of WAL (probably remotely) to
 * send to this safekeeper.
 *
 * Note: socket management is done completely inside walproposer_pg for
 * simplicity, and thus simulation doesn't test it. Which is fine as
 * simulation is mainly aimed at consensus checks, not waiteventset
 * management.
 *
 * Also, while in this state we don't touch safekeeper socket, so in
 * theory it might close connection as inactive. This can be addressed if
 * needed; however, while fetching WAL we should regularly send it, so the
 * problem is unlikely. Vice versa is also true (SS_ACTIVE doesn't handle
 * walreader socket), but similarly shouldn't be a problem.
 *
 * NOTE(review): SafekeeperStateDesiredEvents requests WL_SOCKET_READABLE
 * on the safekeeper socket in this state too, so "don't touch safekeeper
 * socket" presumably refers to writes only -- confirm and reword.
 */
SS_ACTIVE_READ_WAL,
/*
 * Waiting for write readiness to flush the socket.
 */
SS_ACTIVE_FLUSH,
} SafekeeperActiveState;
/* Consensus logical timestamp. */
typedef uint64 term_t;
@@ -343,12 +346,11 @@ typedef struct Safekeeper
*/
XLogRecPtr startStreamingAt;
bool flushWrite; /* set to true if we need to call AsyncFlush,*
* to flush pending messages */
XLogRecPtr streamingAt; /* current streaming position */
AppendRequestHeader appendRequest; /* request for sending to safekeeper */
SafekeeperState state; /* safekeeper state machine state */
SafekeeperActiveState active_state;
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
AcceptorGreeting greetResponse; /* acceptor greeting */
VoteResponse voteResponse; /* the vote */
@@ -369,12 +371,17 @@ typedef struct Safekeeper
/*
* WAL reader, allocated for each safekeeper.
*/
XLogReaderState *xlogreader;
NeonWALReader *xlogreader;
/*
* Position in wait event set. Equal to -1 if no event
*/
int eventPos;
/*
* Neon WAL reader position in wait event set, or -1 if no socket.
*/
int nwrEventPos;
#endif
@@ -403,31 +410,6 @@ typedef enum
*/
} WalProposerConnectPollStatusType;
/* Re-exported and modified ExecStatusType */
typedef enum
{
/* We received a single CopyBoth result */
WP_EXEC_SUCCESS_COPYBOTH,
/*
* Any success result other than a single CopyBoth was received. The
* specifics of the result were already logged, but it may be useful to
* provide an error message indicating which safekeeper messed up.
*
* Do not expect PQerrorMessage to be appropriately set.
*/
WP_EXEC_UNEXPECTED_SUCCESS,
/*
* No result available at this time. Wait until read-ready, then call
* again. Internally, this is returned when PQisBusy indicates that
* PQgetResult would block.
*/
WP_EXEC_NEEDS_INPUT,
/* Catch-all failure. Check PQerrorMessage. */
WP_EXEC_FAILED,
} WalProposerExecStatusType;
/* Re-exported ConnStatusType */
typedef enum
{
@@ -509,23 +491,26 @@ typedef struct walproposer_api
bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
/* Read WAL from disk to buf. */
void (*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count);
NeonWALReadResult (*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count);
/* Allocate WAL reader. */
void (*wal_reader_allocate) (Safekeeper *sk);
/* Deallocate event set. */
void (*free_event_set) (WalProposer *wp);
/* Initialize event set. */
void (*init_event_set) (WalProposer *wp);
/* Update events for an existing safekeeper connection. */
void (*update_event_set) (Safekeeper *sk, uint32 events);
/* Configure wait event set for yield in SS_ACTIVE. */
void (*active_state_update_event_set) (Safekeeper *sk);
/* Add a new safekeeper connection to the event set. */
void (*add_safekeeper_event_set) (Safekeeper *sk, uint32 events);
/* Remove safekeeper connection from event set */
void (*rm_safekeeper_event_set) (Safekeeper *sk);
/*
* Wait until some event happens: - timeout is reached - socket event for
* safekeeper connection - new WAL is available
@@ -711,6 +696,13 @@ extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPt
extern void WalProposerPoll(WalProposer *wp);
extern void WalProposerFree(WalProposer *wp);
/*
* WaitEventSet API doesn't allow to remove socket, so walproposer_pg uses it to
* recreate set from scratch, hence the export.
*/
extern void SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events);
extern Safekeeper *GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn);
#define WPEVENT 1337 /* special log level for walproposer internal
* events */

View File

@@ -43,10 +43,13 @@
#include "utils/ps_status.h"
#include "utils/timestamp.h"
#include "neon.h"
#include "walproposer.h"
#include "libpq-fe.h"
#include "libpqwalproposer.h"
#include "neon.h"
#include "neon_walreader.h"
#include "walproposer.h"
#define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender*
* message header */
@@ -91,6 +94,10 @@ static void XLogBroadcastWalProposer(WalProposer *wp);
static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalPropClose(XLogRecPtr recptr);
static void add_nwr_event_set(Safekeeper *sk, uint32 events);
static void update_nwr_event_set(Safekeeper *sk, uint32 events);
static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
static void
init_walprop_config(bool syncSafekeepers)
{
@@ -541,14 +548,6 @@ walprop_pg_load_libpqwalreceiver(void)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
}
/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
struct WalProposerConn
{
PGconn *pg_conn;
bool is_nonblocking; /* whether the connection is non-blocking */
char *recvbuf; /* last received data from walprop_async_read */
};
/* Helper function */
static bool
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
@@ -586,16 +585,17 @@ walprop_status(Safekeeper *sk)
}
}
static void
walprop_connect_start(Safekeeper *sk)
WalProposerConn *
libpqwp_connect_start(char *conninfo)
{
PGconn *pg_conn;
WalProposerConn *conn;
const char *keywords[3];
const char *values[3];
int n;
char *password = neon_auth_token;
Assert(sk->conn == NULL);
/*
* Connect using the given connection string. If the NEON_AUTH_TOKEN
@@ -614,7 +614,7 @@ walprop_connect_start(Safekeeper *sk)
n++;
}
keywords[n] = "dbname";
values[n] = sk->conninfo;
values[n] = conninfo;
n++;
keywords[n] = NULL;
values[n] = NULL;
@@ -635,11 +635,20 @@ walprop_connect_start(Safekeeper *sk)
* palloc will exit on failure though, so there's not much we could do if
* it *did* fail.
*/
sk->conn = palloc(sizeof(WalProposerConn));
sk->conn->pg_conn = pg_conn;
sk->conn->is_nonblocking = false; /* connections always start in
* blocking mode */
sk->conn->recvbuf = NULL;
conn = palloc(sizeof(WalProposerConn));
conn->pg_conn = pg_conn;
conn->is_nonblocking = false; /* connections always start in blocking
* mode */
conn->recvbuf = NULL;
return conn;
}
/*
 * Start connecting to the safekeeper: delegates to libpqwp_connect_start
 * with sk->conninfo and stores the resulting WalProposerConn in sk->conn.
 */
static void
walprop_connect_start(Safekeeper *sk)
{
/* The safekeeper must not already own a connection. */
Assert(sk->conn == NULL);
sk->conn = libpqwp_connect_start(sk->conninfo);
}
static WalProposerConnectPollStatusType
@@ -683,26 +692,33 @@ walprop_connect_poll(Safekeeper *sk)
return return_val;
}
static bool
walprop_send_query(Safekeeper *sk, char *query)
extern bool
libpqwp_send_query(WalProposerConn *conn, char *query)
{
/*
* We need to be in blocking mode for sending the query to run without
* requiring a call to PQflush
*/
if (!ensure_nonblocking_status(sk->conn, false))
if (!ensure_nonblocking_status(conn, false))
return false;
/* PQsendQuery returns 1 on success, 0 on failure */
if (!PQsendQuery(sk->conn->pg_conn, query))
if (!PQsendQuery(conn->pg_conn, query))
return false;
return true;
}
static WalProposerExecStatusType
walprop_get_query_result(Safekeeper *sk)
/*
 * Send a query over the safekeeper's connection.
 *
 * Thin wrapper around libpqwp_send_query operating on sk->conn; its result
 * is returned unchanged (true on success, false on failure).
 */
static bool
walprop_send_query(Safekeeper *sk, char *query)
{
return libpqwp_send_query(sk->conn, query);
}
WalProposerExecStatusType
libpqwp_get_query_result(WalProposerConn *conn)
{
PGresult *result;
WalProposerExecStatusType return_val;
@@ -710,14 +726,14 @@ walprop_get_query_result(Safekeeper *sk)
char *unexpected_success = NULL;
/* Consume any input that we might be missing */
if (!PQconsumeInput(sk->conn->pg_conn))
if (!PQconsumeInput(conn->pg_conn))
return WP_EXEC_FAILED;
if (PQisBusy(sk->conn->pg_conn))
if (PQisBusy(conn->pg_conn))
return WP_EXEC_NEEDS_INPUT;
result = PQgetResult(sk->conn->pg_conn);
result = PQgetResult(conn->pg_conn);
/*
* PQgetResult returns NULL only if getting the result was successful &
@@ -778,6 +794,12 @@ walprop_get_query_result(Safekeeper *sk)
return return_val;
}
/*
 * Fetch the result of a previously sent query from the safekeeper's
 * connection.
 *
 * Thin wrapper around libpqwp_get_query_result operating on sk->conn; the
 * WalProposerExecStatusType it produces is returned unchanged.
 */
static WalProposerExecStatusType
walprop_get_query_result(Safekeeper *sk)
{
return libpqwp_get_query_result(sk->conn);
}
static pgsocket
walprop_socket(Safekeeper *sk)
{
@@ -790,38 +812,21 @@ walprop_flush(Safekeeper *sk)
return (PQflush(sk->conn->pg_conn));
}
static void
walprop_finish(Safekeeper *sk)
/* Like libpqrcv_receive. *buf is valid until the next call. */
PGAsyncReadResult
libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount)
{
if (!sk->conn)
return;
if (sk->conn->recvbuf != NULL)
PQfreemem(sk->conn->recvbuf);
PQfinish(sk->conn->pg_conn);
pfree(sk->conn);
sk->conn = NULL;
}
/*
* Receive a message from the safekeeper.
*
* On success, the data is placed in *buf. It is valid until the next call
* to this function.
*/
static PGAsyncReadResult
walprop_async_read(Safekeeper *sk, char **buf, int *amount)
{
int result;
if (sk->conn->recvbuf != NULL)
if (conn->recvbuf != NULL)
{
PQfreemem(sk->conn->recvbuf);
sk->conn->recvbuf = NULL;
PQfreemem(conn->recvbuf);
conn->recvbuf = NULL;
}
/* Call PQconsumeInput so that we have the data we need */
if (!PQconsumeInput(sk->conn->pg_conn))
if (!PQconsumeInput(conn->pg_conn))
{
*amount = 0;
*buf = NULL;
@@ -839,7 +844,7 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount)
* sometimes be triggered by the server returning an ErrorResponse (which
* also happens to have the effect that the copy is done).
*/
switch (result = PQgetCopyData(sk->conn->pg_conn, &sk->conn->recvbuf, true))
switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true))
{
case 0:
*amount = 0;
@@ -854,7 +859,7 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount)
* We can check PQgetResult to make sure that the server
* failed; it'll always result in PGRES_FATAL_ERROR
*/
ExecStatusType status = PQresultStatus(PQgetResult(sk->conn->pg_conn));
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
if (status != PGRES_FATAL_ERROR)
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
@@ -875,11 +880,23 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount)
default:
/* Positive values indicate the size of the returned result */
*amount = result;
*buf = sk->conn->recvbuf;
*buf = conn->recvbuf;
return PG_ASYNC_READ_SUCCESS;
}
}
/*
 * Receive a message from the safekeeper.
 *
 * Thin wrapper around libpqwp_async_read operating on sk->conn; the
 * PGAsyncReadResult it produces is returned unchanged.
 *
 * On success, the data is placed in *buf and its size in *amount. The data
 * is valid until the next call to this function.
 */
static PGAsyncReadResult
walprop_async_read(Safekeeper *sk, char **buf, int *amount)
{
return libpqwp_async_read(sk->conn, buf, amount);
}
static PGAsyncWriteResult
walprop_async_write(Safekeeper *sk, void const *buf, size_t size)
{
@@ -962,6 +979,32 @@ walprop_blocking_write(Safekeeper *sk, void const *buf, size_t size)
return true;
}
/*
 * Close the libpq connection wrapped by conn and release the wrapper itself.
 *
 * Any buffer left over from the last async read is freed first. conn must not
 * be used after this call; callers are expected to reset their own pointer.
 */
void
libpqwp_disconnect(WalProposerConn *conn)
{
	char	   *leftover = conn->recvbuf;

	/* the receive buffer is handed out by libpq, so release it via libpq */
	if (leftover)
		PQfreemem(leftover);

	PQfinish(conn->pg_conn);
	pfree(conn);
}
/*
 * Release per-safekeeper resources: the libpq connection and the WAL reader.
 *
 * Each resource is released only if present and its pointer is reset to NULL
 * afterwards, so calling this repeatedly is harmless.
 */
static void
walprop_finish(Safekeeper *sk)
{
	/* drop the connection to the safekeeper, if any */
	if (sk->conn != NULL)
	{
		libpqwp_disconnect(sk->conn);
		sk->conn = NULL;
	}

	/* drop the WAL reader, if any */
	if (sk->xlogreader != NULL)
	{
		NeonWALReaderFree(sk->xlogreader);
		sk->xlogreader = NULL;
	}
}
/*
* Subscribe for new WAL and stream it in the loop to safekeepers.
*
@@ -1386,26 +1429,41 @@ XLogWalPropClose(XLogRecPtr recptr)
walpropFile = -1;
}
static void
static NeonWALReadResult
walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count)
{
WALReadError errinfo;
NeonWALReadResult res;
if (!WALRead(sk->xlogreader,
buf,
startptr,
count,
walprop_pg_get_timeline_id(),
&errinfo))
res = NeonWALRead(sk->xlogreader,
buf,
startptr,
count,
walprop_pg_get_timeline_id());
if (res == NEON_WALREAD_SUCCESS)
{
WALReadRaiseError(&errinfo);
/*
* If we have the socket subscribed, but walreader doesn't need any
* events, it must mean that remote connection just closed hoping to
* do next read locally. Remove the socket then. It is important to do
* as otherwise next read might open another connection and we won't
* be able to distinguish whether we have correct socket added in wait
* event set.
*/
if (NeonWALReaderEvents(sk->xlogreader) == 0)
rm_safekeeper_event_set(sk, false);
}
return res;
}
static void
walprop_pg_wal_reader_allocate(Safekeeper *sk)
{
sk->xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL);
char log_prefix[64];
snprintf(log_prefix, sizeof(log_prefix), "sk %s:%s nwr: ", sk->host, sk->port);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix);
if (sk->xlogreader == NULL)
elog(FATAL, "Failed to allocate xlog reader");
}
@@ -1424,6 +1482,7 @@ walprop_pg_free_event_set(WalProposer *wp)
for (int i = 0; i < wp->n_safekeepers; i++)
{
wp->safekeeper[i].eventPos = -1;
wp->safekeeper[i].nwrEventPos = -1;
}
}
@@ -1433,11 +1492,35 @@ walprop_pg_init_event_set(WalProposer *wp)
if (waitEvents)
elog(FATAL, "double-initialization of event set");
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + wp->n_safekeepers);
/* for each sk, we have socket plus potentially socket for neon walreader */
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + 2 * wp->n_safekeepers);
AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
for (int i = 0; i < wp->n_safekeepers; i++)
{
wp->safekeeper[i].eventPos = -1;
wp->safekeeper[i].nwrEventPos = -1;
}
}
/*
 * Add safekeeper socket to wait event set.
 *
 * Registers the socket of sk's connection for the given events, with sk as
 * the event's user data, and remembers the returned position in
 * sk->eventPos.
 */
static void
walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events)
{
/* The socket must not be registered already. */
Assert(sk->eventPos == -1);
sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk), NULL, sk);
}
/*
 * Add neon wal reader socket to wait event set.
 *
 * Registers the socket reported by NeonWALReaderSocket for the given events,
 * with sk as the event's user data, and remembers the returned position in
 * sk->nwrEventPos.
 */
static void
add_nwr_event_set(Safekeeper *sk, uint32 events)
{
/* The walreader socket must not be registered already. */
Assert(sk->nwrEventPos == -1);
sk->nwrEventPos = AddWaitEventToSet(waitEvents, events, NeonWALReaderSocket(sk->xlogreader), NULL, sk);
elog(DEBUG5, "sk %s:%s: added nwr socket events %d", sk->host, sk->port, events);
}
static void
@@ -1449,10 +1532,139 @@ walprop_pg_update_event_set(Safekeeper *sk, uint32 events)
ModifyWaitEvent(waitEvents, sk->eventPos, events, NULL);
}
/*
* Update neon_walreader event.
* Can be called when nwr socket doesn't exist, does nothing in this case.
*/
static void
walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events)
update_nwr_event_set(Safekeeper *sk, uint32 events)
{
sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk), NULL, sk);
/* nwrEventPos = -1 when we don't have an event */
if (sk->nwrEventPos != -1)
ModifyWaitEvent(waitEvents, sk->nwrEventPos, events, NULL);
}
/*
 * Configure the wait event set for a safekeeper in SS_ACTIVE according to its
 * current substate: both the safekeeper socket and, if relevant, the
 * neon_walreader socket are brought in line with what
 * SafekeeperStateDesiredEvents asks for.
 */
static void
walprop_pg_active_state_update_event_set(Safekeeper *sk)
{
	uint32		wanted_sk;
	uint32		wanted_nwr;

	Assert(sk->state == SS_ACTIVE);
	SafekeeperStateDesiredEvents(sk, &wanted_sk, &wanted_nwr);

	if (sk->active_state != SS_ACTIVE_READ_WAL)
	{
		/*
		 * Not waiting for neon_walreader. Ideally no events would be set on
		 * its socket at all, but WaitEventSet (WaitEventAdjustEpoll) asserts
		 * that at least one event is requested, and a socket cannot be
		 * removed without reconstructing the whole set. Hence the hack:
		 * SafekeeperStateDesiredEvents gives WL_SOCKET_CLOSED if the socket
		 * exists, and that is what gets subscribed here.
		 */
		Assert(wanted_nwr == WL_SOCKET_CLOSED || wanted_nwr == 0);
		update_nwr_event_set(sk, WL_SOCKET_CLOSED);
	}
	else
	{
		/*
		 * Waiting for neon_walreader: make sure the wait event set holds its
		 * up to date socket by dropping the old entry and adding a new one.
		 *
		 * TODO: reattaching the socket recreates the WES each time; keep the
		 * entry instead when the connection is already established. A single
		 * neon_walreader object can switch between local and remote reads
		 * multiple times during its lifetime, so this needs careful
		 * bookkeeping.
		 */
		rm_safekeeper_event_set(sk, false);
		add_nwr_event_set(sk, wanted_nwr);
	}

	walprop_pg_update_event_set(sk, wanted_sk);
}
/*
 * walproposer_api callback: remove the event of the connection to the given
 * safekeeper (as opposed to its neon_walreader event) from the event set.
 */
static void
walprop_pg_rm_safekeeper_event_set(Safekeeper *to_remove)
{
	const bool	remove_sk_socket = true;

	rm_safekeeper_event_set(to_remove, remove_sk_socket);
}
/*
 * A hacky way to remove a single event from the event set. Can be called if
 * the event doesn't exist, does nothing in this case.
 *
 * Note: Internally, this completely reconstructs the event set. It should be
 * avoided if possible.
 *
 * If is_sk is true, the socket of the connection to the safekeeper is
 * removed; otherwise the socket of its neon_walreader. Every other event of
 * every non-offline safekeeper (including the other event of to_remove) is
 * re-added to the rebuilt set.
 */
static void
rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk)
{
	WalProposer *wp = to_remove->wp;

	elog(DEBUG5, "sk %s:%s: removing event, is_sk %d",
		 to_remove->host, to_remove->port, is_sk);

	/*
	 * Shortpath for exiting if have nothing to do. We never call this
	 * function with safekeeper socket not existing, but do that with neon
	 * walreader socket.
	 */
	if ((is_sk && to_remove->eventPos == -1) ||
		(!is_sk && to_remove->nwrEventPos == -1))
	{
		return;
	}

	/* Remove the existing event set, assign sk->eventPos = -1 */
	walprop_pg_free_event_set(wp);
	/* Re-initialize it without adding any safekeeper events */
	wp->api.init_event_set(wp);

	/*
	 * Loop through the existing safekeepers and re-add the applicable events
	 * for every one that has a socket we can use, skipping only the event we
	 * were asked to remove.
	 */
	for (int i = 0; i < wp->n_safekeepers; i++)
	{
		Safekeeper *sk = &wp->safekeeper[i];

		/* Forget the position of the event being removed. */
		if (sk == to_remove)
		{
			if (is_sk)
				sk->eventPos = -1;
			else
				sk->nwrEventPos = -1;
		}

		/*
		 * If this safekeeper isn't offline, add events for it, except for the
		 * event requested to remove.
		 */
		if (sk->state != SS_OFFLINE)
		{
			uint32		sk_events;
			uint32		nwr_events;

			SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);

			if (sk != to_remove || !is_sk)
			{
				/* will set sk->eventPos */
				wp->api.add_safekeeper_event_set(sk, sk_events);
			}

			/*
			 * Checked independently of the safekeeper socket above: the set
			 * was rebuilt from scratch, so the neon_walreader event of every
			 * safekeeper that wants one must be re-added too, not only that
			 * of to_remove.
			 */
			if ((sk != to_remove || is_sk) && nwr_events)
			{
				add_nwr_event_set(sk, nwr_events);
			}
		}
	}
}
static int
@@ -1718,10 +1930,11 @@ static const walproposer_api walprop_pg = {
.recovery_download = WalProposerRecovery,
.wal_read = walprop_pg_wal_read,
.wal_reader_allocate = walprop_pg_wal_reader_allocate,
.free_event_set = walprop_pg_free_event_set,
.init_event_set = walprop_pg_init_event_set,
.update_event_set = walprop_pg_update_event_set,
.active_state_update_event_set = walprop_pg_active_state_update_event_set,
.add_safekeeper_event_set = walprop_pg_add_safekeeper_event_set,
.rm_safekeeper_event_set = walprop_pg_rm_safekeeper_event_set,
.wait_event_set = walprop_pg_wait_event_set,
.strong_random = walprop_pg_strong_random,
.get_redo_start_lsn = walprop_pg_get_redo_start_lsn,