mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-28 00:23:00 +00:00
Introduce NeonWALReader downloading sk -> compute WAL on demand.
It is similar to XLogReader, but when either the requested segment is missing locally or the requested LSN is before basebackup_lsn, NeonWALReader asynchronously fetches WAL from one of the safekeepers. The patch includes the walproposer switch to NeonWALReader; splitting it wouldn't make much sense, as it is hard to test otherwise. This finally removes the risk of pg_wal explosion (as well as slow start time) when one safekeeper is lagging, while at the same time allowing it to recover. In the future the reader should also be used by the logical walsender for similar reasons (currently we download the tail on compute start synchronously). The main test is test_lagging_sk. However, I also ran it manually a lot, varying MAX_SEND_SIZE on both sides (on safekeeper and on walproposer) and testing various fragmentations (one side having a small buffer, the other, both), which brought up https://github.com/neondatabase/neon/issues/6055. Closes https://github.com/neondatabase/neon/issues/1012
This commit is contained in:
@@ -9,6 +9,7 @@ OBJS = \
|
||||
libpagestore.o \
|
||||
neon.o \
|
||||
neon_utils.o \
|
||||
neon_walreader.o \
|
||||
pagestore_smgr.o \
|
||||
relsize_cache.o \
|
||||
walproposer.o \
|
||||
|
||||
96
pgxn/neon/libpqwalproposer.h
Normal file
96
pgxn/neon/libpqwalproposer.h
Normal file
@@ -0,0 +1,96 @@
|
||||
/*
 * Interface to the set of libpq wrappers that walproposer and neon_walreader
 * need. Similar to libpqwalreceiver, but that one has blocking connection
 * establishment and PQexec, neither of which fits us. Implementation is at
 * walproposer_pg.c.
 */
|
||||
#ifndef ___LIBPQWALPROPOSER_H__
|
||||
#define ___LIBPQWALPROPOSER_H__
|
||||
|
||||
/*
 * Status of a previously sent query, as reported by
 * libpqwp_get_query_result. Re-exported and modified ExecStatusType.
 */
typedef enum
{
	/* We received a single CopyBoth result */
	WP_EXEC_SUCCESS_COPYBOTH,

	/*
	 * Any success result other than a single CopyBoth was received. The
	 * specifics of the result were already logged, but it may be useful to
	 * provide an error message indicating which safekeeper messed up.
	 *
	 * Do not expect PQerrorMessage to be appropriately set.
	 */
	WP_EXEC_UNEXPECTED_SUCCESS,

	/*
	 * No result available at this time. Wait until read-ready, then call
	 * again. Internally, this is returned when PQisBusy indicates that
	 * PQgetResult would block.
	 */
	WP_EXEC_NEEDS_INPUT,

	/* Catch-all failure. Check PQerrorMessage. */
	WP_EXEC_FAILED,
} WalProposerExecStatusType;
|
||||
|
||||
/* Possible return values from libpqwp_async_read */
typedef enum
{
	/* The full read was successful. buf now points to the data */
	PG_ASYNC_READ_SUCCESS,

	/*
	 * The read is ongoing. Wait until the connection is read-ready, then try
	 * again.
	 */
	PG_ASYNC_READ_TRY_AGAIN,

	/* Reading failed. Check PQerrorMessage(conn) */
	PG_ASYNC_READ_FAIL,
} PGAsyncReadResult;
|
||||
|
||||
/*
 * Possible return values from walprop_async_write.
 *
 * NOTE(review): no function of that name is declared in this header; the
 * write wrapper presumably lives with the rest of the implementation --
 * confirm and update the name if it has been renamed.
 */
typedef enum
{
	/* The write fully completed */
	PG_ASYNC_WRITE_SUCCESS,

	/*
	 * The write started, but you'll need to call PQflush some more times to
	 * finish it off. We just tried, so it's best to wait until the connection
	 * is read- or write-ready to try again.
	 *
	 * If it becomes read-ready, call PQconsumeInput and flush again. If it
	 * becomes write-ready, just call PQflush.
	 */
	PG_ASYNC_WRITE_TRY_FLUSH,

	/* Writing failed. Check PQerrorMessage(conn) */
	PG_ASYNC_WRITE_FAIL,
} PGAsyncWriteResult;
|
||||
|
||||
/*
|
||||
* This header is included by walproposer.h to define walproposer_api; if we're
|
||||
* building walproposer without pg, ignore libpq part, leaving only interface
|
||||
* types.
|
||||
*/
|
||||
#ifndef WALPROPOSER_LIB
|
||||
|
||||
#include "libpq-fe.h"
|
||||
|
||||
/*
|
||||
* Sometimes working directly with underlying PGconn is simpler, export the
|
||||
* whole thing for simplicity.
|
||||
*/
|
||||
typedef struct WalProposerConn
|
||||
{
|
||||
PGconn *pg_conn;
|
||||
bool is_nonblocking; /* whether the connection is non-blocking */
|
||||
char *recvbuf; /* last received CopyData message from
|
||||
* walprop_async_read */
|
||||
} WalProposerConn;
|
||||
|
||||
extern WalProposerConn *libpqwp_connect_start(char *conninfo);
|
||||
extern bool libpqwp_send_query(WalProposerConn *conn, char *query);
|
||||
extern WalProposerExecStatusType libpqwp_get_query_result(WalProposerConn *conn);
|
||||
extern PGAsyncReadResult libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount);
|
||||
extern void libpqwp_disconnect(WalProposerConn *conn);
|
||||
|
||||
#endif /* WALPROPOSER_LIB */
|
||||
#endif /* ___LIBPQWALPROPOSER_H__ */
|
||||
731
pgxn/neon/neon_walreader.c
Normal file
731
pgxn/neon/neon_walreader.c
Normal file
@@ -0,0 +1,731 @@
|
||||
/*
 * Like WALRead, but when a WAL segment doesn't exist locally, instead of
 * throwing ERROR it asynchronously tries to fetch the WAL from the most
 * advanced safekeeper.
 *
 * We can't use libpqwalreceiver as it blocks during connection establishment
 * (and while waiting for the PQexec result), so use libpqwalproposer instead.
 *
 * TODO: keepalives are currently never sent, so the other side can close the
 * connection prematurely.
 *
 * TODO: close conn if reading takes too long to prevent stuck connections.
 */
|
||||
#include "postgres.h"
|
||||
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "access/xlog_internal.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include "access/xlogreader.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "storage/fd.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
|
||||
#include "neon_walreader.h"
|
||||
#include "walproposer.h"
|
||||
|
||||
#define NEON_WALREADER_ERR_MSG_LEN 512
|
||||
|
||||
/*
 * Logging helper that prepends the reader's log_prefix to the message.
 *
 * The expansion reads state->log_prefix, so it can only be used where a
 * variable named 'state' of type NeonWALReader * is in scope. 'fmt' must be
 * a string literal because it is concatenated with "%s".
 */
#define nwr_log(elevel, fmt, ...) elog(elevel, "%s" fmt, state->log_prefix, ## __VA_ARGS__)
|
||||
|
||||
static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
||||
static NeonWALReadResult NeonWALReaderReadMsg(NeonWALReader *state);
|
||||
static void NeonWALReaderResetRemote(NeonWALReader *state);
|
||||
static bool NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
||||
static bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p);
|
||||
static void neon_wal_segment_close(NeonWALReader *state);
|
||||
static bool is_wal_segment_exists(XLogSegNo segno, int segsize,
|
||||
TimeLineID tli);
|
||||
|
||||
/*
 * State of connection to donor safekeeper. The states are listed in the
 * order a connection normally passes through them.
 */
typedef enum
{
	/* no remote connection */
	RS_NONE,
	/* doing PQconnectPoll, need readable socket */
	RS_CONNECTING_READ,
	/* doing PQconnectPoll, need writable socket */
	RS_CONNECTING_WRITE,
	/* Waiting for START_REPLICATION result */
	RS_WAIT_EXEC_RESULT,
	/* replication stream established */
	RS_ESTABLISHED,
} NeonWALReaderRemoteState;
|
||||
|
||||
struct NeonWALReader
|
||||
{
|
||||
/*
|
||||
* LSN before which we assume WAL is not available locally. Exists because
|
||||
* though first segment after startup always exists, part before
|
||||
* basebackup LSN is filled with zeros.
|
||||
*/
|
||||
XLogRecPtr available_lsn;
|
||||
WALSegmentContext segcxt;
|
||||
WALOpenSegment seg;
|
||||
int wre_errno;
|
||||
/* Explains failure to read, static for simplicity. */
|
||||
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
|
||||
|
||||
/*
|
||||
* Saved info about request in progress, used to check validity of
|
||||
* arguments after resume and remember how far we accomplished it. req_lsn
|
||||
* is 0 if there is no request in progress.
|
||||
*/
|
||||
XLogRecPtr req_lsn;
|
||||
Size req_len;
|
||||
Size req_progress;
|
||||
WalProposer *wp; /* we learn donor through walproposer */
|
||||
char donor_name[64]; /* saved donor safekeeper name for logging */
|
||||
/* state of connection to safekeeper */
|
||||
NeonWALReaderRemoteState rem_state;
|
||||
WalProposerConn *wp_conn;
|
||||
|
||||
/*
|
||||
* position in wp_conn recvbuf from which we'll copy WAL next time, or
|
||||
* NULL if there is no unprocessed message
|
||||
*/
|
||||
char *wal_ptr;
|
||||
Size wal_rem_len; /* how many unprocessed bytes left in recvbuf */
|
||||
|
||||
/*
|
||||
* LSN of wal_ptr position according to walsender to cross check against
|
||||
* read request
|
||||
*/
|
||||
XLogRecPtr rem_lsn;
|
||||
|
||||
/* prepended to lines logged by neon_walreader, if provided */
|
||||
char log_prefix[64];
|
||||
};
|
||||
|
||||
/* palloc and initialize NeonWALReader */
|
||||
NeonWALReader *
|
||||
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix)
|
||||
{
|
||||
NeonWALReader *reader;
|
||||
|
||||
reader = (NeonWALReader *)
|
||||
palloc_extended(sizeof(NeonWALReader),
|
||||
MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
|
||||
if (!reader)
|
||||
return NULL;
|
||||
|
||||
reader->available_lsn = available_lsn;
|
||||
reader->seg.ws_file = -1;
|
||||
reader->seg.ws_segno = 0;
|
||||
reader->seg.ws_tli = 0;
|
||||
reader->segcxt.ws_segsize = wal_segment_size;
|
||||
|
||||
reader->wp = wp;
|
||||
|
||||
reader->rem_state = RS_NONE;
|
||||
|
||||
if (log_prefix)
|
||||
strlcpy(reader->log_prefix, log_prefix, sizeof(reader->log_prefix));
|
||||
|
||||
return reader;
|
||||
}
|
||||
|
||||
void
|
||||
NeonWALReaderFree(NeonWALReader *state)
|
||||
{
|
||||
if (state->seg.ws_file != -1)
|
||||
neon_wal_segment_close(state);
|
||||
if (state->wp_conn)
|
||||
libpqwp_disconnect(state->wp_conn);
|
||||
pfree(state);
|
||||
}
|
||||
|
||||
/*
|
||||
* Like vanilla WALRead, but if requested position is before available_lsn or
|
||||
* WAL segment doesn't exist on disk, it tries to fetch needed segment from the
|
||||
* advanced safekeeper.
|
||||
*
|
||||
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
|
||||
* fetched from timeline 'tli'.
|
||||
*
|
||||
* Returns NEON_WALREAD_SUCCESS if succeeded, NEON_WALREAD_ERROR if an error
|
||||
* occurs, in which case 'err' has the desciption. Error always closes remote
|
||||
* connection, if there was any, so socket subscription should be removed.
|
||||
*
|
||||
* NEON_WALREAD_WOULDBLOCK means caller should obtain socket to wait for with
|
||||
* NeonWALReaderSocket and call NeonWALRead again with exactly the same
|
||||
* arguments when NeonWALReaderEvents happen on the socket. Note that per libpq
|
||||
* docs during connection establishment (before first successful read) socket
|
||||
* underneath might change.
|
||||
*
|
||||
* Also, eventually walreader should switch from remote to local read; caller
|
||||
* should remove subscription to socket then by checking NeonWALReaderEvents
|
||||
* after successful read (otherwise next read might reopen the connection with
|
||||
* different socket).
|
||||
*
|
||||
* Reading not monotonically is not supported and will result in error.
|
||||
*
|
||||
* Caller should be sure that WAL up to requested LSN exists, otherwise
|
||||
* NEON_WALREAD_WOULDBLOCK might be always returned.
|
||||
*/
|
||||
NeonWALReadResult
|
||||
NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
|
||||
{
|
||||
/*
|
||||
* If requested data is before known available basebackup lsn or there is
|
||||
* already active remote state, do remote read.
|
||||
*/
|
||||
if (startptr < state->available_lsn || state->rem_state != RS_NONE)
|
||||
{
|
||||
return NeonWALReadRemote(state, buf, startptr, count, tli);
|
||||
}
|
||||
if (NeonWALReadLocal(state, buf, startptr, count, tli))
|
||||
{
|
||||
return NEON_WALREAD_SUCCESS;
|
||||
}
|
||||
else if (state->wre_errno == ENOENT)
|
||||
{
|
||||
nwr_log(LOG, "local read failed as segment at %X/%X doesn't exist, attempting remote",
|
||||
LSN_FORMAT_ARGS(startptr));
|
||||
return NeonWALReadRemote(state, buf, startptr, count, tli);
|
||||
}
|
||||
else
|
||||
{
|
||||
return NEON_WALREAD_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
/* Do the read from remote safekeeper. */
|
||||
static NeonWALReadResult
|
||||
NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
|
||||
{
|
||||
if (state->rem_state == RS_NONE)
|
||||
{
|
||||
XLogRecPtr donor_lsn;
|
||||
|
||||
/* no connection yet; start one */
|
||||
Safekeeper *donor = GetDonor(state->wp, &donor_lsn);
|
||||
|
||||
if (donor == NULL)
|
||||
{
|
||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||
"failed to establish remote connection to fetch WAL: no donor available");
|
||||
return NEON_WALREAD_ERROR;
|
||||
}
|
||||
snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port);
|
||||
nwr_log(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL",
|
||||
state->donor_name, LSN_FORMAT_ARGS(donor_lsn));
|
||||
state->wp_conn = libpqwp_connect_start(donor->conninfo);
|
||||
if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD)
|
||||
{
|
||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||
"failed to connect to %s to fetch WAL: immediately failed with %s",
|
||||
state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
|
||||
NeonWALReaderResetRemote(state);
|
||||
return NEON_WALREAD_ERROR;
|
||||
}
|
||||
/* we'll poll immediately */
|
||||
state->rem_state = RS_CONNECTING_READ;
|
||||
}
|
||||
|
||||
if (state->rem_state == RS_CONNECTING_READ || state->rem_state == RS_CONNECTING_WRITE)
|
||||
{
|
||||
switch (PQconnectPoll(state->wp_conn->pg_conn))
|
||||
{
|
||||
case PGRES_POLLING_FAILED:
|
||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||
"failed to connect to %s to fetch WAL: poll error: %s",
|
||||
state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
|
||||
NeonWALReaderResetRemote(state);
|
||||
return NEON_WALREAD_ERROR;
|
||||
case PGRES_POLLING_READING:
|
||||
state->rem_state = RS_CONNECTING_READ;
|
||||
return NEON_WALREAD_WOULDBLOCK;
|
||||
case PGRES_POLLING_WRITING:
|
||||
state->rem_state = RS_CONNECTING_WRITE;
|
||||
return NEON_WALREAD_WOULDBLOCK;
|
||||
case PGRES_POLLING_OK:
|
||||
{
|
||||
/* connection successfully established */
|
||||
char start_repl_query[128];
|
||||
|
||||
snprintf(start_repl_query, sizeof(start_repl_query),
|
||||
"START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')",
|
||||
LSN_FORMAT_ARGS(startptr), state->wp->propTerm);
|
||||
nwr_log(LOG, "connection to %s to fetch WAL succeeded, running %s",
|
||||
state->donor_name, start_repl_query);
|
||||
if (!libpqwp_send_query(state->wp_conn, start_repl_query))
|
||||
{
|
||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||
"failed to send %s query to %s: %s",
|
||||
start_repl_query, state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
|
||||
NeonWALReaderResetRemote(state);
|
||||
return NEON_WALREAD_ERROR;
|
||||
}
|
||||
state->rem_state = RS_WAIT_EXEC_RESULT;
|
||||
break;
|
||||
}
|
||||
|
||||
default: /* there is unused PGRES_POLLING_ACTIVE */
|
||||
Assert(false);
|
||||
return NEON_WALREAD_ERROR; /* keep the compiler quiet */
|
||||
}
|
||||
}
|
||||
|
||||
if (state->rem_state == RS_WAIT_EXEC_RESULT)
|
||||
{
|
||||
switch (libpqwp_get_query_result(state->wp_conn))
|
||||
{
|
||||
case WP_EXEC_SUCCESS_COPYBOTH:
|
||||
state->rem_state = RS_ESTABLISHED;
|
||||
break;
|
||||
case WP_EXEC_NEEDS_INPUT:
|
||||
return NEON_WALREAD_WOULDBLOCK;
|
||||
case WP_EXEC_FAILED:
|
||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||
"get START_REPLICATION result from %s failed: %s",
|
||||
state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
|
||||
NeonWALReaderResetRemote(state);
|
||||
return NEON_WALREAD_ERROR;
|
||||
default: /* can't happen */
|
||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||
"get START_REPLICATION result from %s: unexpected result",
|
||||
state->donor_name);
|
||||
NeonWALReaderResetRemote(state);
|
||||
return NEON_WALREAD_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
Assert(state->rem_state == RS_ESTABLISHED);
|
||||
|
||||
/*
|
||||
* If we had the request before, verify args are the same and advance the
|
||||
* result ptr according to the progress; otherwise register the request.
|
||||
*/
|
||||
if (state->req_lsn != InvalidXLogRecPtr)
|
||||
{
|
||||
if (state->req_lsn != startptr || state->req_len != count)
|
||||
{
|
||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||
"args changed during request, was %X/%X %zu, now %X/%X %zu",
|
||||
LSN_FORMAT_ARGS(state->req_lsn), state->req_len, LSN_FORMAT_ARGS(startptr), count);
|
||||
NeonWALReaderResetRemote(state);
|
||||
return NEON_WALREAD_ERROR;
|
||||
}
|
||||
nwr_log(DEBUG5, "continuing remote read at req_lsn=%X/%X len=%zu, req_progress=%zu",
|
||||
LSN_FORMAT_ARGS(startptr),
|
||||
count,
|
||||
state->req_progress);
|
||||
buf += state->req_progress;
|
||||
}
|
||||
else
|
||||
{
|
||||
state->req_lsn = startptr;
|
||||
state->req_len = count;
|
||||
state->req_progress = 0;
|
||||
nwr_log(DEBUG5, "starting remote read req_lsn=%X/%X len=%zu",
|
||||
LSN_FORMAT_ARGS(startptr),
|
||||
count);
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
Size to_copy;
|
||||
|
||||
/*
|
||||
* If we have no ready data, receive new message.
|
||||
*/
|
||||
if (state->wal_rem_len == 0 &&
|
||||
|
||||
/*
|
||||
* check for the sake of 0 length reads; walproposer does these for
|
||||
* heartbeats, though generally they shouldn't hit remote source.
|
||||
*/
|
||||
state->req_len - state->req_progress > 0)
|
||||
{
|
||||
NeonWALReadResult read_msg_res = NeonWALReaderReadMsg(state);
|
||||
|
||||
if (read_msg_res != NEON_WALREAD_SUCCESS)
|
||||
return read_msg_res;
|
||||
}
|
||||
|
||||
if (state->req_lsn + state->req_progress != state->rem_lsn)
|
||||
{
|
||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||
"expected remote WAL at %X/%X but got %X/%X. Non monotonic read requests could have caused this. req_lsn=%X/%X len=%zu",
|
||||
LSN_FORMAT_ARGS(state->req_lsn + state->req_progress),
|
||||
LSN_FORMAT_ARGS(state->rem_lsn),
|
||||
LSN_FORMAT_ARGS(state->req_lsn),
|
||||
state->req_len);
|
||||
NeonWALReaderResetRemote(state);
|
||||
return NEON_WALREAD_ERROR;
|
||||
}
|
||||
|
||||
/* We can copy min of (available, requested) bytes. */
|
||||
to_copy =
|
||||
Min(state->req_len - state->req_progress, state->wal_rem_len);
|
||||
memcpy(buf, state->wal_ptr, to_copy);
|
||||
state->wal_ptr += to_copy;
|
||||
state->wal_rem_len -= to_copy;
|
||||
state->rem_lsn += to_copy;
|
||||
if (state->wal_rem_len == 0)
|
||||
state->wal_ptr = NULL; /* freed by libpqwalproposer */
|
||||
buf += to_copy;
|
||||
state->req_progress += to_copy;
|
||||
if (state->req_progress == state->req_len)
|
||||
{
|
||||
XLogSegNo next_segno;
|
||||
XLogSegNo req_segno;
|
||||
|
||||
XLByteToSeg(state->req_lsn, req_segno, state->segcxt.ws_segsize);
|
||||
XLByteToSeg(state->rem_lsn, next_segno, state->segcxt.ws_segsize);
|
||||
|
||||
/*
|
||||
* Request completed. If there is a chance of serving next one
|
||||
* locally, close the connection.
|
||||
*/
|
||||
if (state->req_lsn < state->available_lsn &&
|
||||
state->rem_lsn >= state->available_lsn)
|
||||
{
|
||||
nwr_log(LOG, "closing remote connection as available_lsn %X/%X crossed and next read at %X/%X is likely to be served locally",
|
||||
LSN_FORMAT_ARGS(state->available_lsn), LSN_FORMAT_ARGS(state->rem_lsn));
|
||||
NeonWALReaderResetRemote(state);
|
||||
}
|
||||
else if (state->rem_lsn >= state->available_lsn && next_segno > req_segno &&
|
||||
is_wal_segment_exists(next_segno, state->segcxt.ws_segsize, tli))
|
||||
{
|
||||
nwr_log(LOG, "closing remote connection as WAL file at next lsn %X/%X exists",
|
||||
LSN_FORMAT_ARGS(state->rem_lsn));
|
||||
NeonWALReaderResetRemote(state);
|
||||
}
|
||||
state->req_lsn = InvalidXLogRecPtr;
|
||||
state->req_len = 0;
|
||||
state->req_progress = 0;
|
||||
return NEON_WALREAD_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Read one WAL message from the stream, sets state->wal_ptr in case of success.
|
||||
* Resets remote state in case of failure.
|
||||
*/
|
||||
static NeonWALReadResult
|
||||
NeonWALReaderReadMsg(NeonWALReader *state)
|
||||
{
|
||||
while (true) /* loop until we get 'w' */
|
||||
{
|
||||
char *copydata_ptr;
|
||||
int copydata_size;
|
||||
StringInfoData s;
|
||||
char msg_type;
|
||||
int hdrlen;
|
||||
|
||||
Assert(state->rem_state == RS_ESTABLISHED);
|
||||
Assert(state->wal_ptr == NULL && state->wal_rem_len == 0);
|
||||
|
||||
switch (libpqwp_async_read(state->wp_conn,
|
||||
©data_ptr,
|
||||
©data_size))
|
||||
{
|
||||
case PG_ASYNC_READ_SUCCESS:
|
||||
break;
|
||||
case PG_ASYNC_READ_TRY_AGAIN:
|
||||
return NEON_WALREAD_WOULDBLOCK;
|
||||
case PG_ASYNC_READ_FAIL:
|
||||
snprintf(state->err_msg,
|
||||
sizeof(state->err_msg),
|
||||
"req_lsn=%X/%X, req_len=%zu, req_progress=%zu, get copydata failed: %s",
|
||||
LSN_FORMAT_ARGS(state->req_lsn),
|
||||
state->req_len,
|
||||
state->req_progress,
|
||||
PQerrorMessage(state->wp_conn->pg_conn));
|
||||
goto err;
|
||||
}
|
||||
|
||||
/* put data on StringInfo to parse */
|
||||
s.data = copydata_ptr;
|
||||
s.len = copydata_size;
|
||||
s.cursor = 0;
|
||||
s.maxlen = -1;
|
||||
|
||||
if (copydata_size == 0)
|
||||
{
|
||||
snprintf(state->err_msg,
|
||||
sizeof(state->err_msg),
|
||||
"zero length copydata received");
|
||||
goto err;
|
||||
}
|
||||
msg_type = pq_getmsgbyte(&s);
|
||||
switch (msg_type)
|
||||
{
|
||||
case 'w':
|
||||
{
|
||||
XLogRecPtr start_lsn;
|
||||
|
||||
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
|
||||
if (s.len - s.cursor < hdrlen)
|
||||
{
|
||||
snprintf(state->err_msg,
|
||||
sizeof(state->err_msg),
|
||||
"invalid WAL message received from primary");
|
||||
goto err;
|
||||
}
|
||||
|
||||
start_lsn = pq_getmsgint64(&s);
|
||||
pq_getmsgint64(&s); /* XLogRecPtr end_lsn; */
|
||||
pq_getmsgint64(&s); /* TimestampTz send_time */
|
||||
|
||||
state->rem_lsn = start_lsn;
|
||||
state->wal_rem_len = (Size) (s.len - s.cursor);
|
||||
state->wal_ptr = (char *) pq_getmsgbytes(&s, s.len - s.cursor);
|
||||
nwr_log(DEBUG5, "received WAL msg at %X/%X len %zu",
|
||||
LSN_FORMAT_ARGS(state->rem_lsn), state->wal_rem_len);
|
||||
|
||||
return NEON_WALREAD_SUCCESS;
|
||||
}
|
||||
case 'k':
|
||||
{
|
||||
XLogRecPtr end_lsn;
|
||||
bool reply_requested;
|
||||
|
||||
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
|
||||
if (s.len - s.cursor < hdrlen)
|
||||
{
|
||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||
"invalid keepalive message received from primary");
|
||||
goto err;
|
||||
}
|
||||
|
||||
end_lsn = pq_getmsgint64(&s);
|
||||
pq_getmsgint64(&s); /* TimestampTz timestamp; */
|
||||
reply_requested = pq_getmsgbyte(&s);
|
||||
nwr_log(DEBUG5, "received keepalive end_lsn=%X/%X reply_requested=%d",
|
||||
LSN_FORMAT_ARGS(end_lsn),
|
||||
reply_requested);
|
||||
if (end_lsn < state->req_lsn + state->req_len)
|
||||
{
|
||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||
"closing remote connection: requested WAL up to %X/%X, but current donor %s has only up to %X/%X",
|
||||
LSN_FORMAT_ARGS(state->req_lsn + state->req_len), state->donor_name, LSN_FORMAT_ARGS(end_lsn));
|
||||
goto err;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
default:
|
||||
nwr_log(WARNING, "invalid replication message type %d", msg_type);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
err:
|
||||
NeonWALReaderResetRemote(state);
|
||||
return NEON_WALREAD_ERROR;
|
||||
}
|
||||
|
||||
/* reset remote connection and request in progress */
|
||||
static void
|
||||
NeonWALReaderResetRemote(NeonWALReader *state)
|
||||
{
|
||||
state->req_lsn = InvalidXLogRecPtr;
|
||||
state->req_len = 0;
|
||||
state->req_progress = 0;
|
||||
state->rem_state = RS_NONE;
|
||||
if (state->wp_conn)
|
||||
{
|
||||
libpqwp_disconnect(state->wp_conn);
|
||||
state->wp_conn = NULL;
|
||||
}
|
||||
state->donor_name[0] = '\0';
|
||||
state->wal_ptr = NULL;
|
||||
state->wal_rem_len = 0;
|
||||
state->rem_lsn = InvalidXLogRecPtr;
|
||||
}
|
||||
|
||||
/*
|
||||
* Return socket of connection to remote source. Must be called only when
|
||||
* connection exists (NeonWALReaderEvents returns non zero).
|
||||
*/
|
||||
pgsocket
|
||||
NeonWALReaderSocket(NeonWALReader *state)
|
||||
{
|
||||
if (!state->wp_conn)
|
||||
nwr_log(FATAL, "NeonWALReaderSocket is called without active remote connection");
|
||||
return PQsocket(state->wp_conn->pg_conn);
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns events user should wait on connection socket or 0 if remote
|
||||
* connection is not active.
|
||||
*/
|
||||
extern uint32
|
||||
NeonWALReaderEvents(NeonWALReader *state)
|
||||
{
|
||||
switch (state->rem_state)
|
||||
{
|
||||
case RS_NONE:
|
||||
return 0;
|
||||
case RS_CONNECTING_READ:
|
||||
return WL_SOCKET_READABLE;
|
||||
case RS_CONNECTING_WRITE:
|
||||
return WL_SOCKET_WRITEABLE;
|
||||
case RS_WAIT_EXEC_RESULT:
|
||||
case RS_ESTABLISHED:
|
||||
return WL_SOCKET_READABLE;
|
||||
default:
|
||||
Assert(false);
|
||||
return 0; /* make compiler happy */
|
||||
}
|
||||
}
|
||||
|
||||
static bool
|
||||
NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
|
||||
{
|
||||
char *p;
|
||||
XLogRecPtr recptr;
|
||||
Size nbytes;
|
||||
|
||||
p = buf;
|
||||
recptr = startptr;
|
||||
nbytes = count;
|
||||
|
||||
while (nbytes > 0)
|
||||
{
|
||||
uint32 startoff;
|
||||
int segbytes;
|
||||
int readbytes;
|
||||
|
||||
startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
|
||||
|
||||
/*
|
||||
* If the data we want is not in a segment we have open, close what we
|
||||
* have (if anything) and open the next one, using the caller's
|
||||
* provided openSegment callback.
|
||||
*/
|
||||
if (state->seg.ws_file < 0 ||
|
||||
!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
|
||||
tli != state->seg.ws_tli)
|
||||
{
|
||||
XLogSegNo nextSegNo;
|
||||
|
||||
neon_wal_segment_close(state);
|
||||
|
||||
XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
|
||||
if (!neon_wal_segment_open(state, nextSegNo, &tli))
|
||||
{
|
||||
char fname[MAXFNAMELEN];
|
||||
|
||||
state->wre_errno = errno;
|
||||
|
||||
XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
|
||||
snprintf(state->err_msg, sizeof(state->err_msg), "failed to open WAL segment %s while reading at %X/%X: %s",
|
||||
fname, LSN_FORMAT_ARGS(recptr), strerror(state->wre_errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
/* This shouldn't happen -- indicates a bug in segment_open */
|
||||
Assert(state->seg.ws_file >= 0);
|
||||
|
||||
/* Update the current segment info. */
|
||||
state->seg.ws_tli = tli;
|
||||
state->seg.ws_segno = nextSegNo;
|
||||
}
|
||||
|
||||
/* How many bytes are within this segment? */
|
||||
if (nbytes > (state->segcxt.ws_segsize - startoff))
|
||||
segbytes = state->segcxt.ws_segsize - startoff;
|
||||
else
|
||||
segbytes = nbytes;
|
||||
|
||||
#ifndef FRONTEND
|
||||
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
|
||||
#endif
|
||||
|
||||
/* Reset errno first; eases reporting non-errno-affecting errors */
|
||||
errno = 0;
|
||||
readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
|
||||
|
||||
#ifndef FRONTEND
|
||||
pgstat_report_wait_end();
|
||||
#endif
|
||||
|
||||
if (readbytes <= 0)
|
||||
{
|
||||
char fname[MAXFNAMELEN];
|
||||
|
||||
XLogFileName(fname, state->seg.ws_tli, state->seg.ws_segno, state->segcxt.ws_segsize);
|
||||
|
||||
if (readbytes < 0)
|
||||
{
|
||||
state->wre_errno = errno;
|
||||
snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: %s",
|
||||
fname, startoff, strerror(state->wre_errno));
|
||||
}
|
||||
else
|
||||
{
|
||||
snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: unexpected EOF",
|
||||
fname, startoff);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Update state for read */
|
||||
recptr += readbytes;
|
||||
nbytes -= readbytes;
|
||||
p += readbytes;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Copy of vanilla wal_segment_open, but returns false in case of error instead
|
||||
* of ERROR, with errno set.
|
||||
*
|
||||
* XLogReaderRoutine->segment_open callback for local pg_wal files
|
||||
*/
|
||||
static bool
|
||||
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
|
||||
TimeLineID *tli_p)
|
||||
{
|
||||
TimeLineID tli = *tli_p;
|
||||
char path[MAXPGPATH];
|
||||
|
||||
XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
|
||||
nwr_log(DEBUG5, "opening %s", path);
|
||||
state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
|
||||
if (state->seg.ws_file >= 0)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool
|
||||
is_wal_segment_exists(XLogSegNo segno, int segsize, TimeLineID tli)
|
||||
{
|
||||
struct stat stat_buffer;
|
||||
char path[MAXPGPATH];
|
||||
|
||||
XLogFilePath(path, tli, segno, segsize);
|
||||
return stat(path, &stat_buffer) == 0;
|
||||
}
|
||||
|
||||
/* copy of vanilla wal_segment_close with NeonWALReader */
|
||||
static void
|
||||
neon_wal_segment_close(NeonWALReader *state)
|
||||
{
|
||||
if (state->seg.ws_file >= 0)
|
||||
{
|
||||
close(state->seg.ws_file);
|
||||
/* need to check errno? */
|
||||
state->seg.ws_file = -1;
|
||||
}
|
||||
}
|
||||
|
||||
char *
|
||||
NeonWALReaderErrMsg(NeonWALReader *state)
|
||||
{
|
||||
return state->err_msg;
|
||||
}
|
||||
29
pgxn/neon/neon_walreader.h
Normal file
29
pgxn/neon/neon_walreader.h
Normal file
@@ -0,0 +1,29 @@
|
||||
#ifndef __NEON_WALREADER_H__
|
||||
#define __NEON_WALREADER_H__
|
||||
|
||||
#include "access/xlogdefs.h"
|
||||
|
||||
/* forward declare so we don't have to expose the struct to the public */
struct NeonWALReader;
typedef struct NeonWALReader NeonWALReader;

/* avoid including walproposer.h as it includes us */
struct WalProposer;
typedef struct WalProposer WalProposer;
|
||||
|
||||
/* NeonWALRead return value */
typedef enum
{
	/* the requested bytes were read into the caller's buffer */
	NEON_WALREAD_SUCCESS,
	/* wait on the reader's socket, then repeat the call with same arguments */
	NEON_WALREAD_WOULDBLOCK,
	/* reading failed; NeonWALReaderErrMsg returns the description */
	NEON_WALREAD_ERROR,
} NeonWALReadResult;
|
||||
|
||||
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix);
|
||||
extern void NeonWALReaderFree(NeonWALReader *state);
|
||||
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
||||
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
|
||||
extern uint32 NeonWALReaderEvents(NeonWALReader *state);
|
||||
extern char *NeonWALReaderErrMsg(NeonWALReader *state);
|
||||
|
||||
#endif /* __NEON_WALREADER_H__ */
|
||||
@@ -45,7 +45,6 @@
|
||||
|
||||
/* Prototypes for private functions */
|
||||
static void WalProposerLoop(WalProposer *wp);
|
||||
static void HackyRemoveWalProposerEvent(Safekeeper *to_remove);
|
||||
static void ShutdownConnection(Safekeeper *sk);
|
||||
static void ResetConnection(Safekeeper *sk);
|
||||
static long TimeToReconnect(WalProposer *wp, TimestampTz now);
|
||||
@@ -78,11 +77,11 @@ static bool BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, Safekeeper
|
||||
static bool AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_state);
|
||||
static bool AsyncFlush(Safekeeper *sk);
|
||||
static int CompareLsn(const void *a, const void *b);
|
||||
static char *FormatSafekeeperState(SafekeeperState state);
|
||||
static char *FormatSafekeeperState(Safekeeper *sk);
|
||||
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
|
||||
static uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
|
||||
static char *FormatEvents(WalProposer *wp, uint32 events);
|
||||
|
||||
|
||||
WalProposer *
|
||||
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
{
|
||||
@@ -113,6 +112,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
wp->safekeeper[wp->n_safekeepers].host = host;
|
||||
wp->safekeeper[wp->n_safekeepers].port = port;
|
||||
wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE;
|
||||
wp->safekeeper[wp->n_safekeepers].active_state = SS_ACTIVE_SEND;
|
||||
wp->safekeeper[wp->n_safekeepers].wp = wp;
|
||||
|
||||
{
|
||||
@@ -127,8 +127,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
}
|
||||
|
||||
initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf);
|
||||
wp->api.wal_reader_allocate(&wp->safekeeper[wp->n_safekeepers]);
|
||||
wp->safekeeper[wp->n_safekeepers].flushWrite = false;
|
||||
wp->safekeeper[wp->n_safekeepers].startStreamingAt = InvalidXLogRecPtr;
|
||||
wp->safekeeper[wp->n_safekeepers].streamingAt = InvalidXLogRecPtr;
|
||||
wp->n_safekeepers += 1;
|
||||
@@ -277,7 +275,7 @@ WalProposerPoll(WalProposer *wp)
|
||||
wp->config->safekeeper_connection_timeout))
|
||||
{
|
||||
walprop_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that",
|
||||
sk->host, sk->port, FormatSafekeeperState(sk->state), wp->config->safekeeper_connection_timeout);
|
||||
sk->host, sk->port, FormatSafekeeperState(sk), wp->config->safekeeper_connection_timeout);
|
||||
ShutdownConnection(sk);
|
||||
}
|
||||
}
|
||||
@@ -305,58 +303,20 @@ WalProposerLoop(WalProposer *wp)
|
||||
WalProposerPoll(wp);
|
||||
}
|
||||
|
||||
/*
|
||||
* Hack: provides a way to remove the event corresponding to an individual walproposer from the set.
|
||||
*
|
||||
* Note: Internally, this completely reconstructs the event set. It should be avoided if possible.
|
||||
*/
|
||||
static void
|
||||
HackyRemoveWalProposerEvent(Safekeeper *to_remove)
|
||||
{
|
||||
WalProposer *wp = to_remove->wp;
|
||||
|
||||
/* Remove the existing event set, assign sk->eventPos = -1 */
|
||||
wp->api.free_event_set(wp);
|
||||
/* Re-initialize it without adding any safekeeper events */
|
||||
wp->api.init_event_set(wp);
|
||||
|
||||
/*
|
||||
* loop through the existing safekeepers. If they aren't the one we're
|
||||
* removing, and if they have a socket we can use, re-add the applicable
|
||||
* events.
|
||||
*/
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
uint32 desired_events = WL_NO_EVENTS;
|
||||
Safekeeper *sk = &wp->safekeeper[i];
|
||||
|
||||
if (sk == to_remove)
|
||||
continue;
|
||||
|
||||
/* If this safekeeper isn't offline, add an event for it! */
|
||||
if (sk->state != SS_OFFLINE)
|
||||
{
|
||||
desired_events = SafekeeperStateDesiredEvents(sk->state);
|
||||
/* will set sk->eventPos */
|
||||
wp->api.add_safekeeper_event_set(sk, desired_events);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Shuts down and cleans up the connection for a safekeeper. Sets its state to SS_OFFLINE */
|
||||
static void
|
||||
ShutdownConnection(Safekeeper *sk)
|
||||
{
|
||||
sk->wp->api.conn_finish(sk);
|
||||
sk->state = SS_OFFLINE;
|
||||
sk->flushWrite = false;
|
||||
sk->streamingAt = InvalidXLogRecPtr;
|
||||
|
||||
if (sk->voteResponse.termHistory.entries)
|
||||
pfree(sk->voteResponse.termHistory.entries);
|
||||
sk->voteResponse.termHistory.entries = NULL;
|
||||
|
||||
HackyRemoveWalProposerEvent(sk);
|
||||
sk->wp->api.conn_finish(sk);
|
||||
sk->wp->api.rm_safekeeper_event_set(sk);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -474,7 +434,9 @@ ReconnectSafekeepers(WalProposer *wp)
|
||||
static void
|
||||
AdvancePollState(Safekeeper *sk, uint32 events)
|
||||
{
|
||||
#ifdef WALPROPOSER_LIB /* walprop_log needs wp in lib build */
|
||||
WalProposer *wp = sk->wp;
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Sanity check. We assume further down that the operations don't block
|
||||
@@ -527,7 +489,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
|
||||
*/
|
||||
case SS_VOTING:
|
||||
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
|
||||
sk->port, FormatSafekeeperState(sk->state));
|
||||
sk->port, FormatSafekeeperState(sk));
|
||||
ResetConnection(sk);
|
||||
return;
|
||||
|
||||
@@ -556,7 +518,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
|
||||
*/
|
||||
case SS_IDLE:
|
||||
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
|
||||
sk->port, FormatSafekeeperState(sk->state));
|
||||
sk->port, FormatSafekeeperState(sk));
|
||||
ResetConnection(sk);
|
||||
return;
|
||||
|
||||
@@ -622,7 +584,7 @@ HandleConnectionEvent(Safekeeper *sk)
|
||||
* Because PQconnectPoll can change the socket, we have to un-register the
|
||||
* old event and re-register an event on the new socket.
|
||||
*/
|
||||
HackyRemoveWalProposerEvent(sk);
|
||||
wp->api.rm_safekeeper_event_set(sk);
|
||||
wp->api.add_safekeeper_event_set(sk, new_events);
|
||||
|
||||
/* If we successfully connected, send START_WAL_PUSH query */
|
||||
@@ -1112,6 +1074,9 @@ SendProposerElected(Safekeeper *sk)
|
||||
term_t lastCommonTerm;
|
||||
int i;
|
||||
|
||||
/* Now that we are ready to send it's a good moment to create WAL reader */
|
||||
wp->api.wal_reader_allocate(sk);
|
||||
|
||||
/*
|
||||
* Determine start LSN by comparing safekeeper's log term switch history
|
||||
* and proposer's, searching for the divergence point.
|
||||
@@ -1231,6 +1196,7 @@ StartStreaming(Safekeeper *sk)
|
||||
* once for a connection.
|
||||
*/
|
||||
sk->state = SS_ACTIVE;
|
||||
sk->active_state = SS_ACTIVE_SEND;
|
||||
sk->streamingAt = sk->startStreamingAt;
|
||||
|
||||
/* event set will be updated inside SendMessageToNode */
|
||||
@@ -1289,9 +1255,13 @@ HandleActiveState(Safekeeper *sk, uint32 events)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
|
||||
uint32 newEvents = WL_SOCKET_READABLE;
|
||||
|
||||
if (events & WL_SOCKET_WRITEABLE)
|
||||
/*
|
||||
* Note: we don't know which socket awoke us (sk or nwr). However, as
|
||||
* SendAppendRequests always tries to send at least one msg in
|
||||
* SS_ACTIVE_SEND be careful not to go there if we are only after sk
|
||||
* response, otherwise it'd create busy loop of pings.
|
||||
*/
|
||||
if (events & WL_SOCKET_WRITEABLE || sk->active_state == SS_ACTIVE_READ_WAL)
|
||||
if (!SendAppendRequests(sk))
|
||||
return;
|
||||
|
||||
@@ -1299,28 +1269,29 @@ HandleActiveState(Safekeeper *sk, uint32 events)
|
||||
if (!RecvAppendResponses(sk))
|
||||
return;
|
||||
|
||||
/*
|
||||
* We should wait for WL_SOCKET_WRITEABLE event if we have unflushed data
|
||||
* in the buffer.
|
||||
*
|
||||
* LSN comparison checks if we have pending unsent messages. This check
|
||||
* isn't necessary now, because we always send append messages immediately
|
||||
* after arrival. But it's good to have it here in case we change this
|
||||
* behavior in the future.
|
||||
*/
|
||||
if (sk->streamingAt != wp->availableLsn || sk->flushWrite)
|
||||
newEvents |= WL_SOCKET_WRITEABLE;
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
/* expected never to happen, c.f. walprop_pg_active_state_update_event_set */
|
||||
if (events & WL_SOCKET_CLOSED)
|
||||
{
|
||||
walprop_log(WARNING, "connection to %s:%s in active state failed, got WL_SOCKET_CLOSED on neon_walreader socket",
|
||||
sk->host, sk->port);
|
||||
ShutdownConnection(sk);
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
wp->api.update_event_set(sk, newEvents);
|
||||
/* configures event set for yield whatever is the substate */
|
||||
wp->api.active_state_update_event_set(sk);
|
||||
}
|
||||
|
||||
/*
|
||||
* Send WAL messages starting from sk->streamingAt until the end or non-writable
|
||||
* socket, whichever comes first. Caller should take care of updating event set.
|
||||
* Even if no unsent WAL is available, at least one empty message will be sent
|
||||
* as a heartbeat, if socket is ready.
|
||||
* socket or neon_walreader blocks, whichever comes first; active_state is
|
||||
* updated accordingly. Caller should take care of updating event set. Even if
|
||||
* no unsent WAL is available, at least one empty message will be sent as a
|
||||
* heartbeat, if socket is ready.
|
||||
*
|
||||
* Can change state if Async* functions encounter errors and reset connection.
|
||||
* Resets state and kills the connections if any error on them is encountered.
|
||||
* Returns false in this case, true otherwise.
|
||||
*/
|
||||
static bool
|
||||
@@ -1328,11 +1299,11 @@ SendAppendRequests(Safekeeper *sk)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
XLogRecPtr endLsn;
|
||||
AppendRequestHeader *req;
|
||||
PGAsyncWriteResult writeResult;
|
||||
bool sentAnything = false;
|
||||
AppendRequestHeader *req;
|
||||
|
||||
if (sk->flushWrite)
|
||||
if (sk->active_state == SS_ACTIVE_FLUSH)
|
||||
{
|
||||
if (!AsyncFlush(sk))
|
||||
|
||||
@@ -1343,76 +1314,101 @@ SendAppendRequests(Safekeeper *sk)
|
||||
return sk->state == SS_ACTIVE;
|
||||
|
||||
/* Event set will be updated in the end of HandleActiveState */
|
||||
sk->flushWrite = false;
|
||||
sk->active_state = SS_ACTIVE_SEND;
|
||||
}
|
||||
|
||||
while (sk->streamingAt != wp->availableLsn || !sentAnything)
|
||||
{
|
||||
sentAnything = true;
|
||||
|
||||
endLsn = sk->streamingAt;
|
||||
endLsn += MAX_SEND_SIZE;
|
||||
|
||||
/* if we went beyond available WAL, back off */
|
||||
if (endLsn > wp->availableLsn)
|
||||
if (sk->active_state == SS_ACTIVE_SEND)
|
||||
{
|
||||
endLsn = wp->availableLsn;
|
||||
sentAnything = true;
|
||||
|
||||
endLsn = sk->streamingAt;
|
||||
endLsn += MAX_SEND_SIZE;
|
||||
|
||||
/* if we went beyond available WAL, back off */
|
||||
if (endLsn > wp->availableLsn)
|
||||
{
|
||||
endLsn = wp->availableLsn;
|
||||
}
|
||||
|
||||
req = &sk->appendRequest;
|
||||
PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn);
|
||||
|
||||
walprop_log(DEBUG5, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
|
||||
req->endLsn - req->beginLsn,
|
||||
LSN_FORMAT_ARGS(req->beginLsn),
|
||||
LSN_FORMAT_ARGS(req->endLsn),
|
||||
LSN_FORMAT_ARGS(req->commitLsn),
|
||||
LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
|
||||
|
||||
resetStringInfo(&sk->outbuf);
|
||||
|
||||
/* write AppendRequest header */
|
||||
appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader));
|
||||
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
|
||||
sk->active_state = SS_ACTIVE_READ_WAL;
|
||||
}
|
||||
|
||||
req = &sk->appendRequest;
|
||||
PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn);
|
||||
|
||||
walprop_log(DEBUG2, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
|
||||
req->endLsn - req->beginLsn,
|
||||
LSN_FORMAT_ARGS(req->beginLsn),
|
||||
LSN_FORMAT_ARGS(req->endLsn),
|
||||
LSN_FORMAT_ARGS(req->commitLsn),
|
||||
LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
|
||||
|
||||
resetStringInfo(&sk->outbuf);
|
||||
|
||||
/* write AppendRequest header */
|
||||
appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader));
|
||||
|
||||
/* write the WAL itself */
|
||||
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
|
||||
/* wal_read will raise error on failure */
|
||||
wp->api.wal_read(sk,
|
||||
&sk->outbuf.data[sk->outbuf.len],
|
||||
req->beginLsn,
|
||||
req->endLsn - req->beginLsn);
|
||||
sk->outbuf.len += req->endLsn - req->beginLsn;
|
||||
|
||||
writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len);
|
||||
|
||||
/* Mark current message as sent, whatever the result is */
|
||||
sk->streamingAt = endLsn;
|
||||
|
||||
switch (writeResult)
|
||||
if (sk->active_state == SS_ACTIVE_READ_WAL)
|
||||
{
|
||||
case PG_ASYNC_WRITE_SUCCESS:
|
||||
/* Continue writing the next message */
|
||||
break;
|
||||
char *errmsg;
|
||||
|
||||
case PG_ASYNC_WRITE_TRY_FLUSH:
|
||||
req = &sk->appendRequest;
|
||||
|
||||
/*
|
||||
* * We still need to call PQflush some more to finish the
|
||||
* job. Caller function will handle this by setting right
|
||||
* event* set.
|
||||
*/
|
||||
sk->flushWrite = true;
|
||||
return true;
|
||||
switch (wp->api.wal_read(sk,
|
||||
&sk->outbuf.data[sk->outbuf.len],
|
||||
req->beginLsn,
|
||||
req->endLsn - req->beginLsn,
|
||||
&errmsg))
|
||||
{
|
||||
case NEON_WALREAD_SUCCESS:
|
||||
break;
|
||||
case NEON_WALREAD_WOULDBLOCK:
|
||||
return true;
|
||||
case NEON_WALREAD_ERROR:
|
||||
walprop_log(WARNING, "WAL reading for node %s:%s failed: %s",
|
||||
sk->host, sk->port, errmsg);
|
||||
ShutdownConnection(sk);
|
||||
return false;
|
||||
default:
|
||||
Assert(false);
|
||||
}
|
||||
|
||||
case PG_ASYNC_WRITE_FAIL:
|
||||
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
|
||||
sk->host, sk->port, FormatSafekeeperState(sk->state),
|
||||
wp->api.conn_error_message(sk));
|
||||
ShutdownConnection(sk);
|
||||
return false;
|
||||
default:
|
||||
Assert(false);
|
||||
return false;
|
||||
sk->outbuf.len += req->endLsn - req->beginLsn;
|
||||
|
||||
writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len);
|
||||
|
||||
/* Mark current message as sent, whatever the result is */
|
||||
sk->streamingAt = req->endLsn;
|
||||
|
||||
switch (writeResult)
|
||||
{
|
||||
case PG_ASYNC_WRITE_SUCCESS:
|
||||
/* Continue writing the next message */
|
||||
sk->active_state = SS_ACTIVE_SEND;
|
||||
break;
|
||||
|
||||
case PG_ASYNC_WRITE_TRY_FLUSH:
|
||||
|
||||
/*
|
||||
* We still need to call PQflush some more to finish the
|
||||
* job. Caller function will handle this by setting right
|
||||
* event set.
|
||||
*/
|
||||
sk->active_state = SS_ACTIVE_FLUSH;
|
||||
return true;
|
||||
|
||||
case PG_ASYNC_WRITE_FAIL:
|
||||
walprop_log(WARNING, "failed to send to node %s:%s in %s state: %s",
|
||||
sk->host, sk->port, FormatSafekeeperState(sk),
|
||||
wp->api.conn_error_message(sk));
|
||||
ShutdownConnection(sk);
|
||||
return false;
|
||||
default:
|
||||
Assert(false);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1422,7 +1418,7 @@ SendAppendRequests(Safekeeper *sk)
|
||||
/*
|
||||
* Receive and process all available feedback.
|
||||
*
|
||||
* Can change state if Async* functions encounter errors and reset connection.
|
||||
* Resets state and kills the connection if any error on it is encountered.
|
||||
* Returns false in this case, true otherwise.
|
||||
*
|
||||
* NB: This function can call SendMessageToNode and produce new messages.
|
||||
@@ -1608,6 +1604,53 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
|
||||
return responses[wp->n_safekeepers - wp->quorum];
|
||||
}
|
||||
|
||||
/*
|
||||
* Return safekeeper with active connection from which WAL can be downloaded, or
|
||||
* none if it doesn't exist. donor_lsn is set to end position of the donor to
|
||||
* the best of our knowledge.
|
||||
*/
|
||||
Safekeeper *
|
||||
GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
|
||||
{
|
||||
*donor_lsn = InvalidXLogRecPtr;
|
||||
Safekeeper *donor = NULL;
|
||||
int i;
|
||||
|
||||
if (wp->n_votes < wp->quorum)
|
||||
{
|
||||
walprop_log(WARNING, "GetDonor called before elections are won");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* First, consider node which had determined our term start LSN as we know
|
||||
* about its position immediately after election before any feedbacks are
|
||||
* sent.
|
||||
*/
|
||||
if (wp->safekeeper[wp->donor].state >= SS_IDLE)
|
||||
{
|
||||
donor = &wp->safekeeper[wp->donor];
|
||||
*donor_lsn = wp->propEpochStartLsn;
|
||||
}
|
||||
|
||||
/*
|
||||
* But also check feedbacks from all nodes with live connections and take
|
||||
* the highest one. Note: if node sends feedbacks it already processed
|
||||
* elected message so its term is fine.
|
||||
*/
|
||||
for (i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
Safekeeper *sk = &wp->safekeeper[i];
|
||||
|
||||
if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn > *donor_lsn)
|
||||
{
|
||||
donor = sk;
|
||||
*donor_lsn = sk->appendResponse.flushLsn;
|
||||
}
|
||||
}
|
||||
return donor;
|
||||
}
|
||||
|
||||
static void
|
||||
HandleSafekeeperResponse(WalProposer *wp)
|
||||
{
|
||||
@@ -1713,7 +1756,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
|
||||
|
||||
case PG_ASYNC_READ_FAIL:
|
||||
walprop_log(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host,
|
||||
sk->port, FormatSafekeeperState(sk->state),
|
||||
sk->port, FormatSafekeeperState(sk),
|
||||
wp->api.conn_error_message(sk));
|
||||
ShutdownConnection(sk);
|
||||
return false;
|
||||
@@ -1753,7 +1796,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
|
||||
if (tag != anymsg->tag)
|
||||
{
|
||||
walprop_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
|
||||
sk->port, FormatSafekeeperState(sk->state));
|
||||
sk->port, FormatSafekeeperState(sk));
|
||||
ResetConnection(sk);
|
||||
return false;
|
||||
}
|
||||
@@ -1824,12 +1867,13 @@ static bool
|
||||
BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState success_state)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
uint32 events;
|
||||
uint32 sk_events;
|
||||
uint32 nwr_events;
|
||||
|
||||
if (!wp->api.conn_blocking_write(sk, msg, msg_size))
|
||||
{
|
||||
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
|
||||
sk->host, sk->port, FormatSafekeeperState(sk->state),
|
||||
sk->host, sk->port, FormatSafekeeperState(sk),
|
||||
wp->api.conn_error_message(sk));
|
||||
ShutdownConnection(sk);
|
||||
return false;
|
||||
@@ -1841,9 +1885,15 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes
|
||||
* If the new state will be waiting for events to happen, update the event
|
||||
* set to wait for those
|
||||
*/
|
||||
events = SafekeeperStateDesiredEvents(success_state);
|
||||
if (events)
|
||||
wp->api.update_event_set(sk, events);
|
||||
SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);
|
||||
|
||||
/*
|
||||
* nwr_events is relevant only during SS_ACTIVE which doesn't use
|
||||
* BlockingWrite
|
||||
*/
|
||||
Assert(!nwr_events);
|
||||
if (sk_events)
|
||||
wp->api.update_event_set(sk, sk_events);
|
||||
|
||||
return true;
|
||||
}
|
||||
@@ -1876,7 +1926,7 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta
|
||||
return false;
|
||||
case PG_ASYNC_WRITE_FAIL:
|
||||
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
|
||||
sk->host, sk->port, FormatSafekeeperState(sk->state),
|
||||
sk->host, sk->port, FormatSafekeeperState(sk),
|
||||
wp->api.conn_error_message(sk));
|
||||
ShutdownConnection(sk);
|
||||
return false;
|
||||
@@ -1915,7 +1965,7 @@ AsyncFlush(Safekeeper *sk)
|
||||
return false;
|
||||
case -1:
|
||||
walprop_log(WARNING, "Failed to flush write to node %s:%s in %s state: %s",
|
||||
sk->host, sk->port, FormatSafekeeperState(sk->state),
|
||||
sk->host, sk->port, FormatSafekeeperState(sk),
|
||||
wp->api.conn_error_message(sk));
|
||||
ResetConnection(sk);
|
||||
return false;
|
||||
@@ -1945,18 +1995,18 @@ CompareLsn(const void *a, const void *b)
|
||||
*
|
||||
* The strings are intended to be used as a prefix to "state", e.g.:
|
||||
*
|
||||
* walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
|
||||
* walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk));
|
||||
*
|
||||
* If this sort of phrasing doesn't fit the message, instead use something like:
|
||||
*
|
||||
* walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
|
||||
* walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk));
|
||||
*/
|
||||
static char *
|
||||
FormatSafekeeperState(SafekeeperState state)
|
||||
FormatSafekeeperState(Safekeeper *sk)
|
||||
{
|
||||
char *return_val = NULL;
|
||||
|
||||
switch (state)
|
||||
switch (sk->state)
|
||||
{
|
||||
case SS_OFFLINE:
|
||||
return_val = "offline";
|
||||
@@ -1984,7 +2034,18 @@ FormatSafekeeperState(SafekeeperState state)
|
||||
return_val = "idle";
|
||||
break;
|
||||
case SS_ACTIVE:
|
||||
return_val = "active";
|
||||
switch (sk->active_state)
|
||||
{
|
||||
case SS_ACTIVE_SEND:
|
||||
return_val = "active send";
|
||||
break;
|
||||
case SS_ACTIVE_READ_WAL:
|
||||
return_val = "active read WAL";
|
||||
break;
|
||||
case SS_ACTIVE_FLUSH:
|
||||
return_val = "active flush";
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -1997,22 +2058,21 @@ FormatSafekeeperState(SafekeeperState state)
|
||||
static void
|
||||
AssertEventsOkForState(uint32 events, Safekeeper *sk)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
|
||||
|
||||
/*
|
||||
* The events are in-line with what we're expecting, under two conditions:
|
||||
* (a) if we aren't expecting anything, `events` has no read- or
|
||||
* write-ready component. (b) if we are expecting something, there's
|
||||
* overlap (i.e. `events & expected != 0`)
|
||||
*/
|
||||
uint32 sk_events;
|
||||
uint32 nwr_events;
|
||||
uint32 expected;
|
||||
bool events_ok_for_state; /* long name so the `Assert` is more
|
||||
* clear later */
|
||||
WalProposer *wp = sk->wp;
|
||||
|
||||
if (expected == WL_NO_EVENTS)
|
||||
events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
|
||||
else
|
||||
events_ok_for_state = ((events & expected) != 0);
|
||||
SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);
|
||||
|
||||
/*
|
||||
* Without one more level of notify target indirection we have no way to
|
||||
* distinguish which socket woke us up, so just union expected events.
|
||||
*/
|
||||
expected = sk_events | nwr_events;
|
||||
events_ok_for_state = ((events & expected) != 0);
|
||||
|
||||
if (!events_ok_for_state)
|
||||
{
|
||||
@@ -2021,36 +2081,39 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk)
|
||||
* and then an assertion that's guaranteed to fail.
|
||||
*/
|
||||
walprop_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
|
||||
FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk->state));
|
||||
FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk));
|
||||
Assert(events_ok_for_state);
|
||||
}
|
||||
}
|
||||
|
||||
/* Returns the set of events a safekeeper in this state should be waiting on
|
||||
/* Returns the set of events for both safekeeper (sk_events) and neon_walreader
|
||||
* (nwr_events) sockets a safekeeper in this state should be waiting on.
|
||||
*
|
||||
* This will return WL_NO_EVENTS (= 0) for some states. */
|
||||
static uint32
|
||||
SafekeeperStateDesiredEvents(SafekeeperState state)
|
||||
void
|
||||
SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events)
|
||||
{
|
||||
uint32 result = WL_NO_EVENTS;
|
||||
WalProposer *wp = sk->wp;
|
||||
|
||||
*nwr_events = 0; /* nwr_events is empty for most states */
|
||||
|
||||
/* If the state doesn't have a modifier, we can check the base state */
|
||||
switch (state)
|
||||
switch (sk->state)
|
||||
{
|
||||
/* Connecting states say what they want in the name */
|
||||
case SS_CONNECTING_READ:
|
||||
result = WL_SOCKET_READABLE;
|
||||
break;
|
||||
*sk_events = WL_SOCKET_READABLE;
|
||||
return;
|
||||
case SS_CONNECTING_WRITE:
|
||||
result = WL_SOCKET_WRITEABLE;
|
||||
break;
|
||||
*sk_events = WL_SOCKET_WRITEABLE;
|
||||
return;
|
||||
|
||||
/* Reading states need the socket to be read-ready to continue */
|
||||
case SS_WAIT_EXEC_RESULT:
|
||||
case SS_HANDSHAKE_RECV:
|
||||
case SS_WAIT_VERDICT:
|
||||
result = WL_SOCKET_READABLE;
|
||||
break;
|
||||
*sk_events = WL_SOCKET_READABLE;
|
||||
return;
|
||||
|
||||
/*
|
||||
* Idle states use read-readiness as a sign that the connection
|
||||
@@ -2058,32 +2121,66 @@ SafekeeperStateDesiredEvents(SafekeeperState state)
|
||||
*/
|
||||
case SS_VOTING:
|
||||
case SS_IDLE:
|
||||
result = WL_SOCKET_READABLE;
|
||||
break;
|
||||
*sk_events = WL_SOCKET_READABLE;
|
||||
return;
|
||||
|
||||
/*
|
||||
* Flush states require write-ready for flushing. Active state
|
||||
* does both reading and writing.
|
||||
*
|
||||
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
|
||||
* should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
|
||||
*/
|
||||
case SS_SEND_ELECTED_FLUSH:
|
||||
*sk_events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
|
||||
return;
|
||||
|
||||
case SS_ACTIVE:
|
||||
result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
|
||||
break;
|
||||
switch (sk->active_state)
|
||||
{
|
||||
/*
|
||||
* Everything is sent; we just wait for sk responses and
|
||||
* latch.
|
||||
*
|
||||
* Note: this assumes we send all available WAL to
|
||||
* safekeeper in one wakeup (unless it blocks). Otherwise
|
||||
* we would want WL_SOCKET_WRITEABLE here to finish the
|
||||
* work.
|
||||
*/
|
||||
case SS_ACTIVE_SEND:
|
||||
*sk_events = WL_SOCKET_READABLE;
|
||||
/* c.f. walprop_pg_active_state_update_event_set */
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
if (wp->api.wal_reader_events(sk))
|
||||
*nwr_events = WL_SOCKET_CLOSED;
|
||||
#endif /* on PG 14 nwr_events remains 0 */
|
||||
return;
|
||||
|
||||
/*
|
||||
* Waiting for neon_walreader socket, but we still read
|
||||
* responses from sk socket.
|
||||
*/
|
||||
case SS_ACTIVE_READ_WAL:
|
||||
*sk_events = WL_SOCKET_READABLE;
|
||||
*nwr_events = wp->api.wal_reader_events(sk);
|
||||
return;
|
||||
|
||||
/*
|
||||
* Need to flush the sk socket, so ignore neon_walreader
|
||||
* one and set write interest on sk.
|
||||
*/
|
||||
case SS_ACTIVE_FLUSH:
|
||||
*sk_events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
/* c.f. walprop_pg_active_state_update_event_set */
|
||||
if (wp->api.wal_reader_events(sk))
|
||||
*nwr_events = WL_SOCKET_CLOSED;
|
||||
#endif /* on PG 14 nwr_events remains 0 */
|
||||
return;
|
||||
}
|
||||
return;
|
||||
|
||||
/* The offline state expects no events. */
|
||||
case SS_OFFLINE:
|
||||
result = WL_NO_EVENTS;
|
||||
break;
|
||||
*sk_events = 0;
|
||||
return;
|
||||
|
||||
default:
|
||||
Assert(false);
|
||||
break;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/* Returns a human-readable string corresponding to the event set
|
||||
|
||||
@@ -8,6 +8,9 @@
|
||||
#include "replication/walreceiver.h"
|
||||
#include "utils/uuid.h"
|
||||
|
||||
#include "libpqwalproposer.h"
|
||||
#include "neon_walreader.h"
|
||||
|
||||
#define SK_MAGIC 0xCafeCeefu
|
||||
#define SK_PROTOCOL_VERSION 2
|
||||
|
||||
@@ -20,43 +23,9 @@
|
||||
*/
|
||||
#define WL_NO_EVENTS 0
|
||||
|
||||
struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */
|
||||
struct WalProposerConn; /* Defined in libpqwalproposer.h */
|
||||
typedef struct WalProposerConn WalProposerConn;
|
||||
|
||||
/* Possible return values from ReadPGAsync */
|
||||
typedef enum
|
||||
{
|
||||
/* The full read was successful. buf now points to the data */
|
||||
PG_ASYNC_READ_SUCCESS,
|
||||
|
||||
/*
|
||||
* The read is ongoing. Wait until the connection is read-ready, then try
|
||||
* again.
|
||||
*/
|
||||
PG_ASYNC_READ_TRY_AGAIN,
|
||||
/* Reading failed. Check PQerrorMessage(conn) */
|
||||
PG_ASYNC_READ_FAIL,
|
||||
} PGAsyncReadResult;
|
||||
|
||||
/* Possible return values from WritePGAsync */
|
||||
typedef enum
|
||||
{
|
||||
/* The write fully completed */
|
||||
PG_ASYNC_WRITE_SUCCESS,
|
||||
|
||||
/*
|
||||
* The write started, but you'll need to call PQflush some more times to
|
||||
* finish it off. We just tried, so it's best to wait until the connection
|
||||
* is read- or write-ready to try again.
|
||||
*
|
||||
* If it becomes read-ready, call PQconsumeInput and flush again. If it
|
||||
* becomes write-ready, just call PQflush.
|
||||
*/
|
||||
PG_ASYNC_WRITE_TRY_FLUSH,
|
||||
/* Writing failed. Check PQerrorMessage(conn) */
|
||||
PG_ASYNC_WRITE_FAIL,
|
||||
} PGAsyncWriteResult;
|
||||
|
||||
/*
|
||||
* WAL safekeeper state, which is used to wait for some event.
|
||||
*
|
||||
@@ -133,6 +102,40 @@ typedef enum
|
||||
SS_ACTIVE,
|
||||
} SafekeeperState;
|
||||
|
||||
/*
|
||||
* Sending WAL substates of SS_ACTIVE.
|
||||
*/
|
||||
typedef enum
|
||||
{
|
||||
/*
|
||||
* We are ready to send more WAL, waiting for latch set to learn about
|
||||
* more WAL becoming available (or just a timeout to send heartbeat).
|
||||
*/
|
||||
SS_ACTIVE_SEND,
|
||||
|
||||
/*
|
||||
* Polling neon_walreader to receive chunk of WAL (probably remotely) to
|
||||
* send to this safekeeper.
|
||||
*
|
||||
* Note: socket management is done completely inside walproposer_pg for
|
||||
* simplicity, and thus simulation doesn't test it. Which is fine as
|
||||
* simulation is mainly aimed at consensus checks, not waiteventset
|
||||
* management.
|
||||
*
|
||||
* Also, while in this state we don't touch safekeeper socket, so in
|
||||
* theory it might close connection as inactive. This can be addressed if
|
||||
* needed; however, while fetching WAL we should regularly send it, so the
|
||||
* problem is unlikely. Vice versa is also true (SS_ACTIVE doesn't handle
|
||||
* walreader socket), but similarly shouldn't be a problem.
|
||||
*/
|
||||
SS_ACTIVE_READ_WAL,
|
||||
|
||||
/*
|
||||
* Waiting for write readiness to flush the socket.
|
||||
*/
|
||||
SS_ACTIVE_FLUSH,
|
||||
} SafekeeperActiveState;
|
||||
|
||||
/* Consensus logical timestamp. */
|
||||
typedef uint64 term_t;
|
||||
|
||||
@@ -341,12 +344,11 @@ typedef struct Safekeeper
|
||||
*/
|
||||
XLogRecPtr startStreamingAt;
|
||||
|
||||
bool flushWrite; /* set to true if we need to call AsyncFlush,*
|
||||
* to flush pending messages */
|
||||
XLogRecPtr streamingAt; /* current streaming position */
|
||||
AppendRequestHeader appendRequest; /* request for sending to safekeeper */
|
||||
|
||||
SafekeeperState state; /* safekeeper state machine state */
|
||||
SafekeeperActiveState active_state;
|
||||
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
|
||||
AcceptorGreeting greetResponse; /* acceptor greeting */
|
||||
VoteResponse voteResponse; /* the vote */
|
||||
@@ -367,12 +369,17 @@ typedef struct Safekeeper
|
||||
/*
|
||||
* WAL reader, allocated for each safekeeper.
|
||||
*/
|
||||
XLogReaderState *xlogreader;
|
||||
NeonWALReader *xlogreader;
|
||||
|
||||
/*
|
||||
* Position in wait event set. Equal to -1 if no event
|
||||
*/
|
||||
int eventPos;
|
||||
|
||||
/*
|
||||
* Neon WAL reader position in wait event set, or -1 if no socket.
|
||||
*/
|
||||
int nwrEventPos;
|
||||
#endif
|
||||
|
||||
|
||||
@@ -401,31 +408,6 @@ typedef enum
|
||||
*/
|
||||
} WalProposerConnectPollStatusType;
|
||||
|
||||
/* Re-exported and modified ExecStatusType */
|
||||
typedef enum
|
||||
{
|
||||
/* We received a single CopyBoth result */
|
||||
WP_EXEC_SUCCESS_COPYBOTH,
|
||||
|
||||
/*
|
||||
* Any success result other than a single CopyBoth was received. The
|
||||
* specifics of the result were already logged, but it may be useful to
|
||||
* provide an error message indicating which safekeeper messed up.
|
||||
*
|
||||
* Do not expect PQerrorMessage to be appropriately set.
|
||||
*/
|
||||
WP_EXEC_UNEXPECTED_SUCCESS,
|
||||
|
||||
/*
|
||||
* No result available at this time. Wait until read-ready, then call
|
||||
* again. Internally, this is returned when PQisBusy indicates that
|
||||
* PQgetResult would block.
|
||||
*/
|
||||
WP_EXEC_NEEDS_INPUT,
|
||||
/* Catch-all failure. Check PQerrorMessage. */
|
||||
WP_EXEC_FAILED,
|
||||
} WalProposerExecStatusType;
|
||||
|
||||
/* Re-exported ConnStatusType */
|
||||
typedef enum
|
||||
{
|
||||
@@ -486,7 +468,7 @@ typedef struct walproposer_api
|
||||
/* Flush buffer to the network, aka PQflush. */
|
||||
int (*conn_flush) (Safekeeper *sk);
|
||||
|
||||
/* Close the connection, aka PQfinish. */
|
||||
/* Reset sk state: close pq connection, deallocate xlogreader. */
|
||||
void (*conn_finish) (Safekeeper *sk);
|
||||
|
||||
/*
|
||||
@@ -506,14 +488,14 @@ typedef struct walproposer_api
|
||||
/* Download WAL from startpos to endpos and make it available locally. */
|
||||
bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
|
||||
|
||||
/* Read WAL from disk to buf. */
|
||||
void (*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count);
|
||||
|
||||
/* Allocate WAL reader. */
|
||||
void (*wal_reader_allocate) (Safekeeper *sk);
|
||||
|
||||
/* Deallocate event set. */
|
||||
void (*free_event_set) (WalProposer *wp);
|
||||
/* Read WAL from disk to buf. */
|
||||
NeonWALReadResult (*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count, char **errmsg);
|
||||
|
||||
/* Returns events to be awaited on WAL reader, if any. */
|
||||
uint32 (*wal_reader_events) (Safekeeper *sk);
|
||||
|
||||
/* Initialize event set. */
|
||||
void (*init_event_set) (WalProposer *wp);
|
||||
@@ -521,9 +503,15 @@ typedef struct walproposer_api
|
||||
/* Update events for an existing safekeeper connection. */
|
||||
void (*update_event_set) (Safekeeper *sk, uint32 events);
|
||||
|
||||
/* Configure wait event set for yield in SS_ACTIVE. */
|
||||
void (*active_state_update_event_set) (Safekeeper *sk);
|
||||
|
||||
/* Add a new safekeeper connection to the event set. */
|
||||
void (*add_safekeeper_event_set) (Safekeeper *sk, uint32 events);
|
||||
|
||||
/* Remove safekeeper connection from event set */
|
||||
void (*rm_safekeeper_event_set) (Safekeeper *sk);
|
||||
|
||||
/*
|
||||
* Wait until some event happens: - timeout is reached - socket event for
|
||||
* safekeeper connection - new WAL is available
|
||||
@@ -709,6 +697,13 @@ extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPt
|
||||
extern void WalProposerPoll(WalProposer *wp);
|
||||
extern void WalProposerFree(WalProposer *wp);
|
||||
|
||||
/*
|
||||
* WaitEventSet API doesn't allow to remove socket, so walproposer_pg uses it to
|
||||
* recreate set from scratch, hence the export.
|
||||
*/
|
||||
extern void SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events);
|
||||
extern Safekeeper *GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn);
|
||||
|
||||
|
||||
#define WPEVENT 1337 /* special log level for walproposer internal
|
||||
* events */
|
||||
|
||||
@@ -44,10 +44,13 @@
|
||||
#include "utils/ps_status.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
#include "neon.h"
|
||||
#include "walproposer.h"
|
||||
#include "libpq-fe.h"
|
||||
|
||||
#include "libpqwalproposer.h"
|
||||
#include "neon.h"
|
||||
#include "neon_walreader.h"
|
||||
#include "walproposer.h"
|
||||
|
||||
#define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */
|
||||
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender*
|
||||
* message header */
|
||||
@@ -94,6 +97,10 @@ static void XLogBroadcastWalProposer(WalProposer *wp);
|
||||
static void XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr);
|
||||
static void XLogWalPropClose(XLogRecPtr recptr);
|
||||
|
||||
static void add_nwr_event_set(Safekeeper *sk, uint32 events);
|
||||
static void update_nwr_event_set(Safekeeper *sk, uint32 events);
|
||||
static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
|
||||
|
||||
static void
|
||||
init_walprop_config(bool syncSafekeepers)
|
||||
{
|
||||
@@ -543,14 +550,6 @@ walprop_pg_load_libpqwalreceiver(void)
|
||||
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
|
||||
}
|
||||
|
||||
/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
|
||||
struct WalProposerConn
|
||||
{
|
||||
PGconn *pg_conn;
|
||||
bool is_nonblocking; /* whether the connection is non-blocking */
|
||||
char *recvbuf; /* last received data from walprop_async_read */
|
||||
};
|
||||
|
||||
/* Helper function */
|
||||
static bool
|
||||
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
|
||||
@@ -588,16 +587,17 @@ walprop_status(Safekeeper *sk)
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_connect_start(Safekeeper *sk)
|
||||
WalProposerConn *
|
||||
libpqwp_connect_start(char *conninfo)
|
||||
{
|
||||
|
||||
PGconn *pg_conn;
|
||||
WalProposerConn *conn;
|
||||
const char *keywords[3];
|
||||
const char *values[3];
|
||||
int n;
|
||||
char *password = neon_auth_token;
|
||||
|
||||
Assert(sk->conn == NULL);
|
||||
|
||||
/*
|
||||
* Connect using the given connection string. If the NEON_AUTH_TOKEN
|
||||
@@ -616,7 +616,7 @@ walprop_connect_start(Safekeeper *sk)
|
||||
n++;
|
||||
}
|
||||
keywords[n] = "dbname";
|
||||
values[n] = sk->conninfo;
|
||||
values[n] = conninfo;
|
||||
n++;
|
||||
keywords[n] = NULL;
|
||||
values[n] = NULL;
|
||||
@@ -637,11 +637,20 @@ walprop_connect_start(Safekeeper *sk)
|
||||
* palloc will exit on failure though, so there's not much we could do if
|
||||
* it *did* fail.
|
||||
*/
|
||||
sk->conn = palloc(sizeof(WalProposerConn));
|
||||
sk->conn->pg_conn = pg_conn;
|
||||
sk->conn->is_nonblocking = false; /* connections always start in
|
||||
* blocking mode */
|
||||
sk->conn->recvbuf = NULL;
|
||||
conn = palloc(sizeof(WalProposerConn));
|
||||
conn->pg_conn = pg_conn;
|
||||
conn->is_nonblocking = false; /* connections always start in blocking
|
||||
* mode */
|
||||
conn->recvbuf = NULL;
|
||||
return conn;
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_connect_start(Safekeeper *sk)
|
||||
{
|
||||
Assert(sk->conn == NULL);
|
||||
sk->conn = libpqwp_connect_start(sk->conninfo);
|
||||
|
||||
}
|
||||
|
||||
static WalProposerConnectPollStatusType
|
||||
@@ -685,26 +694,33 @@ walprop_connect_poll(Safekeeper *sk)
|
||||
return return_val;
|
||||
}
|
||||
|
||||
static bool
|
||||
walprop_send_query(Safekeeper *sk, char *query)
|
||||
extern bool
|
||||
libpqwp_send_query(WalProposerConn *conn, char *query)
|
||||
{
|
||||
/*
|
||||
* We need to be in blocking mode for sending the query to run without
|
||||
* requiring a call to PQflush
|
||||
*/
|
||||
if (!ensure_nonblocking_status(sk->conn, false))
|
||||
if (!ensure_nonblocking_status(conn, false))
|
||||
return false;
|
||||
|
||||
/* PQsendQuery returns 1 on success, 0 on failure */
|
||||
if (!PQsendQuery(sk->conn->pg_conn, query))
|
||||
if (!PQsendQuery(conn->pg_conn, query))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static WalProposerExecStatusType
|
||||
walprop_get_query_result(Safekeeper *sk)
|
||||
static bool
|
||||
walprop_send_query(Safekeeper *sk, char *query)
|
||||
{
|
||||
return libpqwp_send_query(sk->conn, query);
|
||||
}
|
||||
|
||||
WalProposerExecStatusType
|
||||
libpqwp_get_query_result(WalProposerConn *conn)
|
||||
{
|
||||
|
||||
PGresult *result;
|
||||
WalProposerExecStatusType return_val;
|
||||
|
||||
@@ -712,14 +728,14 @@ walprop_get_query_result(Safekeeper *sk)
|
||||
char *unexpected_success = NULL;
|
||||
|
||||
/* Consume any input that we might be missing */
|
||||
if (!PQconsumeInput(sk->conn->pg_conn))
|
||||
if (!PQconsumeInput(conn->pg_conn))
|
||||
return WP_EXEC_FAILED;
|
||||
|
||||
if (PQisBusy(sk->conn->pg_conn))
|
||||
if (PQisBusy(conn->pg_conn))
|
||||
return WP_EXEC_NEEDS_INPUT;
|
||||
|
||||
|
||||
result = PQgetResult(sk->conn->pg_conn);
|
||||
result = PQgetResult(conn->pg_conn);
|
||||
|
||||
/*
|
||||
* PQgetResult returns NULL only if getting the result was successful &
|
||||
@@ -780,6 +796,12 @@ walprop_get_query_result(Safekeeper *sk)
|
||||
return return_val;
|
||||
}
|
||||
|
||||
static WalProposerExecStatusType
|
||||
walprop_get_query_result(Safekeeper *sk)
|
||||
{
|
||||
return libpqwp_get_query_result(sk->conn);
|
||||
}
|
||||
|
||||
static pgsocket
|
||||
walprop_socket(Safekeeper *sk)
|
||||
{
|
||||
@@ -792,38 +814,21 @@ walprop_flush(Safekeeper *sk)
|
||||
return (PQflush(sk->conn->pg_conn));
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_finish(Safekeeper *sk)
|
||||
/* Like libpqrcv_receive. *buf is valid until the next call. */
|
||||
PGAsyncReadResult
|
||||
libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount)
|
||||
{
|
||||
if (!sk->conn)
|
||||
return;
|
||||
|
||||
if (sk->conn->recvbuf != NULL)
|
||||
PQfreemem(sk->conn->recvbuf);
|
||||
PQfinish(sk->conn->pg_conn);
|
||||
pfree(sk->conn);
|
||||
sk->conn = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Receive a message from the safekeeper.
|
||||
*
|
||||
* On success, the data is placed in *buf. It is valid until the next call
|
||||
* to this function.
|
||||
*/
|
||||
static PGAsyncReadResult
|
||||
walprop_async_read(Safekeeper *sk, char **buf, int *amount)
|
||||
{
|
||||
int result;
|
||||
|
||||
if (sk->conn->recvbuf != NULL)
|
||||
if (conn->recvbuf != NULL)
|
||||
{
|
||||
PQfreemem(sk->conn->recvbuf);
|
||||
sk->conn->recvbuf = NULL;
|
||||
PQfreemem(conn->recvbuf);
|
||||
conn->recvbuf = NULL;
|
||||
}
|
||||
|
||||
/* Call PQconsumeInput so that we have the data we need */
|
||||
if (!PQconsumeInput(sk->conn->pg_conn))
|
||||
if (!PQconsumeInput(conn->pg_conn))
|
||||
{
|
||||
*amount = 0;
|
||||
*buf = NULL;
|
||||
@@ -841,7 +846,7 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount)
|
||||
* sometimes be triggered by the server returning an ErrorResponse (which
|
||||
* also happens to have the effect that the copy is done).
|
||||
*/
|
||||
switch (result = PQgetCopyData(sk->conn->pg_conn, &sk->conn->recvbuf, true))
|
||||
switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true))
|
||||
{
|
||||
case 0:
|
||||
*amount = 0;
|
||||
@@ -856,7 +861,7 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount)
|
||||
* We can check PQgetResult to make sure that the server
|
||||
* failed; it'll always result in PGRES_FATAL_ERROR
|
||||
*/
|
||||
ExecStatusType status = PQresultStatus(PQgetResult(sk->conn->pg_conn));
|
||||
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
|
||||
|
||||
if (status != PGRES_FATAL_ERROR)
|
||||
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
|
||||
@@ -877,11 +882,23 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount)
|
||||
default:
|
||||
/* Positive values indicate the size of the returned result */
|
||||
*amount = result;
|
||||
*buf = sk->conn->recvbuf;
|
||||
*buf = conn->recvbuf;
|
||||
return PG_ASYNC_READ_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Receive a message from the safekeeper.
|
||||
*
|
||||
* On success, the data is placed in *buf. It is valid until the next call
|
||||
* to this function.
|
||||
*/
|
||||
static PGAsyncReadResult
|
||||
walprop_async_read(Safekeeper *sk, char **buf, int *amount)
|
||||
{
|
||||
return libpqwp_async_read(sk->conn, buf, amount);
|
||||
}
|
||||
|
||||
static PGAsyncWriteResult
|
||||
walprop_async_write(Safekeeper *sk, void const *buf, size_t size)
|
||||
{
|
||||
@@ -964,6 +981,33 @@ walprop_blocking_write(Safekeeper *sk, void const *buf, size_t size)
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
libpqwp_disconnect(WalProposerConn *conn)
|
||||
{
|
||||
if (conn->recvbuf != NULL)
|
||||
PQfreemem(conn->recvbuf);
|
||||
PQfinish(conn->pg_conn);
|
||||
pfree(conn);
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_finish(Safekeeper *sk)
|
||||
{
|
||||
if (sk->conn)
|
||||
{
|
||||
libpqwp_disconnect(sk->conn);
|
||||
sk->conn = NULL;
|
||||
}
|
||||
|
||||
/* free xlogreader */
|
||||
if (sk->xlogreader)
|
||||
{
|
||||
NeonWALReaderFree(sk->xlogreader);
|
||||
sk->xlogreader = NULL;
|
||||
}
|
||||
rm_safekeeper_event_set(sk, false);
|
||||
}
|
||||
|
||||
/*
|
||||
* Subscribe for new WAL and stream it in the loop to safekeepers.
|
||||
*
|
||||
@@ -1402,30 +1446,56 @@ XLogWalPropClose(XLogRecPtr recptr)
|
||||
walpropFile = -1;
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count)
|
||||
{
|
||||
WALReadError errinfo;
|
||||
|
||||
if (!WALRead(sk->xlogreader,
|
||||
buf,
|
||||
startptr,
|
||||
count,
|
||||
walprop_pg_get_timeline_id(),
|
||||
&errinfo))
|
||||
{
|
||||
WALReadRaiseError(&errinfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_pg_wal_reader_allocate(Safekeeper *sk)
|
||||
{
|
||||
sk->xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL);
|
||||
char log_prefix[64];
|
||||
|
||||
snprintf(log_prefix, sizeof(log_prefix), "sk %s:%s nwr: ", sk->host, sk->port);
|
||||
Assert(!sk->xlogreader);
|
||||
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix);
|
||||
if (sk->xlogreader == NULL)
|
||||
elog(FATAL, "Failed to allocate xlog reader");
|
||||
}
|
||||
|
||||
static NeonWALReadResult
|
||||
walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count, char **errmsg)
|
||||
{
|
||||
NeonWALReadResult res;
|
||||
|
||||
res = NeonWALRead(sk->xlogreader,
|
||||
buf,
|
||||
startptr,
|
||||
count,
|
||||
walprop_pg_get_timeline_id());
|
||||
|
||||
if (res == NEON_WALREAD_SUCCESS)
|
||||
{
|
||||
/*
|
||||
* If we have the socket subscribed, but walreader doesn't need any
|
||||
* events, it must mean that remote connection just closed hoping to
|
||||
* do next read locally. Remove the socket then. It is important to do
|
||||
* as otherwise next read might open another connection and we won't
|
||||
* be able to distinguish whether we have correct socket added in wait
|
||||
* event set.
|
||||
*/
|
||||
if (NeonWALReaderEvents(sk->xlogreader) == 0)
|
||||
rm_safekeeper_event_set(sk, false);
|
||||
}
|
||||
else if (res == NEON_WALREAD_ERROR)
|
||||
{
|
||||
*errmsg = NeonWALReaderErrMsg(sk->xlogreader);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
static uint32
|
||||
walprop_pg_wal_reader_events(Safekeeper *sk)
|
||||
{
|
||||
return NeonWALReaderEvents(sk->xlogreader);
|
||||
}
|
||||
|
||||
static WaitEventSet *waitEvents;
|
||||
|
||||
static void
|
||||
@@ -1440,6 +1510,7 @@ walprop_pg_free_event_set(WalProposer *wp)
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
wp->safekeeper[i].eventPos = -1;
|
||||
wp->safekeeper[i].nwrEventPos = -1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1449,11 +1520,35 @@ walprop_pg_init_event_set(WalProposer *wp)
|
||||
if (waitEvents)
|
||||
elog(FATAL, "double-initialization of event set");
|
||||
|
||||
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + wp->n_safekeepers);
|
||||
/* for each sk, we have socket plus potentially socket for neon walreader */
|
||||
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + 2 * wp->n_safekeepers);
|
||||
AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
MyLatch, NULL);
|
||||
AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
NULL, NULL);
|
||||
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
wp->safekeeper[i].eventPos = -1;
|
||||
wp->safekeeper[i].nwrEventPos = -1;
|
||||
}
|
||||
}
|
||||
|
||||
/* add safekeeper socket to wait event set */
|
||||
static void
|
||||
walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events)
|
||||
{
|
||||
Assert(sk->eventPos == -1);
|
||||
sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk), NULL, sk);
|
||||
}
|
||||
|
||||
/* add neon wal reader socket to wait event set */
|
||||
static void
|
||||
add_nwr_event_set(Safekeeper *sk, uint32 events)
|
||||
{
|
||||
Assert(sk->nwrEventPos == -1);
|
||||
sk->nwrEventPos = AddWaitEventToSet(waitEvents, events, NeonWALReaderSocket(sk->xlogreader), NULL, sk);
|
||||
elog(DEBUG5, "sk %s:%s: added nwr socket events %d", sk->host, sk->port, events);
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -1465,10 +1560,147 @@ walprop_pg_update_event_set(Safekeeper *sk, uint32 events)
|
||||
ModifyWaitEvent(waitEvents, sk->eventPos, events, NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* Update neon_walreader event.
|
||||
* Can be called when nwr socket doesn't exist, does nothing in this case.
|
||||
*/
|
||||
static void
|
||||
walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events)
|
||||
update_nwr_event_set(Safekeeper *sk, uint32 events)
|
||||
{
|
||||
sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk), NULL, sk);
|
||||
/* eventPos = -1 when we don't have an event */
|
||||
if (sk->nwrEventPos != -1)
|
||||
ModifyWaitEvent(waitEvents, sk->nwrEventPos, events, NULL);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
walprop_pg_active_state_update_event_set(Safekeeper *sk)
|
||||
{
|
||||
uint32 sk_events;
|
||||
uint32 nwr_events;
|
||||
|
||||
Assert(sk->state == SS_ACTIVE);
|
||||
SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);
|
||||
|
||||
/*
|
||||
* If we need to wait for neon_walreader, ensure we have up to date socket
|
||||
* in the wait event set.
|
||||
*/
|
||||
if (sk->active_state == SS_ACTIVE_READ_WAL)
|
||||
{
|
||||
/*
|
||||
* TODO: instead of reattaching socket (and thus recreating WES) each
|
||||
* time we should keep it if possible, i.e. if connection is already
|
||||
* established. Note that single neon_walreader object can switch
|
||||
* between local and remote reads multiple times during its lifetime,
|
||||
* so careful bookkeeping is needed here.
|
||||
*/
|
||||
rm_safekeeper_event_set(sk, false);
|
||||
add_nwr_event_set(sk, nwr_events);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Hack: we should always set 0 here, but for random reasons
|
||||
* WaitEventSet (WaitEventAdjustEpoll) asserts that there is at least
|
||||
* some event. Since there is also no way to remove socket except
|
||||
* reconstructing the whole set, SafekeeperStateDesiredEvents instead
|
||||
* gives WL_SOCKET_CLOSED if socket exists. We never expect it to
|
||||
* trigger.
|
||||
*
|
||||
* On PG 14 which doesn't have WL_SOCKET_CLOSED resort to event
|
||||
* removal.
|
||||
*/
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
Assert(nwr_events == WL_SOCKET_CLOSED || nwr_events == 0);
|
||||
update_nwr_event_set(sk, WL_SOCKET_CLOSED);
|
||||
#else /* pg 14 */
|
||||
rm_safekeeper_event_set(sk, false);
|
||||
#endif
|
||||
}
|
||||
walprop_pg_update_event_set(sk, sk_events);
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_pg_rm_safekeeper_event_set(Safekeeper *to_remove)
|
||||
{
|
||||
rm_safekeeper_event_set(to_remove, true);
|
||||
}
|
||||
|
||||
/*
|
||||
* A hacky way to remove single event from the event set. Can be called if event
|
||||
* doesn't exist, does nothing in this case.
|
||||
*
|
||||
* Note: Internally, this completely reconstructs the event set. It should be
|
||||
* avoided if possible.
|
||||
*
|
||||
* If is_sk is true, socket of connection to safekeeper is removed; otherwise
|
||||
* socket of neon_walreader.
|
||||
*/
|
||||
static void
|
||||
rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk)
|
||||
{
|
||||
WalProposer *wp = to_remove->wp;
|
||||
|
||||
elog(DEBUG5, "sk %s:%s: removing event, is_sk %d",
|
||||
to_remove->host, to_remove->port, is_sk);
|
||||
|
||||
/*
|
||||
* Shortpath for exiting if have nothing to do. We never call this
|
||||
* function with safekeeper socket not existing, but do that with neon
|
||||
* walreader socket.
|
||||
*/
|
||||
if ((is_sk && to_remove->eventPos == -1) ||
|
||||
(!is_sk && to_remove->nwrEventPos == -1))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
/* Remove the existing event set, assign sk->eventPos = -1 */
|
||||
walprop_pg_free_event_set(wp);
|
||||
|
||||
/* Re-initialize it without adding any safekeeper events */
|
||||
wp->api.init_event_set(wp);
|
||||
|
||||
/*
|
||||
* loop through the existing safekeepers. If they aren't the one we're
|
||||
* removing, and if they have a socket we can use, re-add the applicable
|
||||
* events.
|
||||
*/
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
Safekeeper *sk = &wp->safekeeper[i];
|
||||
|
||||
if (sk == to_remove)
|
||||
{
|
||||
if (is_sk)
|
||||
sk->eventPos = -1;
|
||||
else
|
||||
sk->nwrEventPos = -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* If this safekeeper isn't offline, add events for it, except for the
|
||||
* event requested to remove.
|
||||
*/
|
||||
if (sk->state != SS_OFFLINE)
|
||||
{
|
||||
uint32 sk_events;
|
||||
uint32 nwr_events;
|
||||
|
||||
SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);
|
||||
|
||||
if (sk != to_remove || !is_sk)
|
||||
{
|
||||
/* will set sk->eventPos */
|
||||
wp->api.add_safekeeper_event_set(sk, sk_events);
|
||||
}
|
||||
else if ((sk != to_remove || is_sk) && nwr_events)
|
||||
{
|
||||
add_nwr_event_set(sk, nwr_events);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
@@ -1750,12 +1982,14 @@ static const walproposer_api walprop_pg = {
|
||||
.conn_async_write = walprop_async_write,
|
||||
.conn_blocking_write = walprop_blocking_write,
|
||||
.recovery_download = WalProposerRecovery,
|
||||
.wal_read = walprop_pg_wal_read,
|
||||
.wal_reader_allocate = walprop_pg_wal_reader_allocate,
|
||||
.free_event_set = walprop_pg_free_event_set,
|
||||
.wal_read = walprop_pg_wal_read,
|
||||
.wal_reader_events = walprop_pg_wal_reader_events,
|
||||
.init_event_set = walprop_pg_init_event_set,
|
||||
.update_event_set = walprop_pg_update_event_set,
|
||||
.active_state_update_event_set = walprop_pg_active_state_update_event_set,
|
||||
.add_safekeeper_event_set = walprop_pg_add_safekeeper_event_set,
|
||||
.rm_safekeeper_event_set = walprop_pg_rm_safekeeper_event_set,
|
||||
.wait_event_set = walprop_pg_wait_event_set,
|
||||
.strong_random = walprop_pg_strong_random,
|
||||
.get_redo_start_lsn = walprop_pg_get_redo_start_lsn,
|
||||
|
||||
Reference in New Issue
Block a user