Implement the thing, test_late_init works.

This commit is contained in:
Arseny Sher
2023-11-28 12:16:40 +03:00
parent 3ce3ccac0d
commit 30bb18f3a9
3 changed files with 255 additions and 11 deletions

View File

@@ -1,22 +1,24 @@
/*
* Like WALRead, but returns error instead of throwing ERROR when segment is
* missing + doesn't attempt to read WAL before specified horizon -- basebackup
* LSN. Missing WAL should be fetched by peer recovery, or, alternatively, on
* demand WAL fetching from safekeepers should be implemented in NeonWALReader.
* Like WALRead, but when WAL segment doesn't exist locally instead of throwing
* ERROR asynchronously tries to fetch it from the most advanced safekeeper.
*
* We can't use libpqwalreceiver as it blocks during connection establishment
* (and waiting for PQExec result), so use libpqwalproposer instead.
*
* TODO: keepalives are currently never sent, so the other side can close the
* connection prematurely.
*
* TODO: close conn if reading takes too long to prevent stuck connections.
*/
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/xlog_internal.h"
#include "access/xlogdefs.h"
#include "access/xlogreader.h"
#include "libpq/pqformat.h"
#include "storage/fd.h"
#include "utils/wait_event.h"
@@ -25,13 +27,16 @@
#include "neon_walreader.h"
#include "walproposer.h"
#define NEON_WALREADER_ERR_MSG_LEN 256
#define NEON_WALREADER_ERR_MSG_LEN 512
static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
static NeonWALReadResult NeonWALReaderReadMsg(NeonWALReader *state);
static void NeonWALReaderResetRemote(NeonWALReader *state);
static bool NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
static bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p);
static void neon_wal_segment_close(NeonWALReader *state);
static bool is_wal_segment_exists(XLogSegNo segno, int segsize,
TimeLineID tli);
/*
* State of connection to donor safekeeper.
@@ -79,10 +84,11 @@ struct NeonWALReader
WalProposerConn *wp_conn;
/*
* position in recvbuf from which we'll copy WAL next time, or NULL if
* there is no unprocessed message
* position in wp_conn recvbuf from which we'll copy WAL next time, or
* NULL if there is no unprocessed message
*/
char *wal_ptr;
Size wal_rem_len; /* how many unprocessed bytes left in recvbuf */
/*
* LSN of wal_ptr position according to walsender to cross check against
@@ -111,6 +117,8 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalPropose
reader->wp = wp;
reader->rem_state = RS_NONE;
return reader;
}
@@ -119,6 +127,9 @@ NeonWALReaderFree(NeonWALReader *state)
{
if (state->seg.ws_file != -1)
neon_wal_segment_close(state);
if (state->wp_conn)
libpqwp_disconnect(state->wp_conn);
Assert(false);
pfree(state);
}
@@ -167,7 +178,8 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti
}
else if (state->wre_errno == ENOENT)
{
elog(LOG, "local read failed with segment doesn't exist, attempting remote");
elog(LOG, "local read failed as segment at %X/%X doesn't exist, attempting remote",
LSN_FORMAT_ARGS(startptr));
return NeonWALReadRemote(state, buf, startptr, count, tli);
}
else
@@ -176,6 +188,7 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti
}
}
/* Do the read from remote safekeeper. */
static NeonWALReadResult
NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
{
@@ -292,14 +305,211 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
}
elog(LOG, "moving ptr by %zu bytes restoring progress, req_lsn = %X/%X", state->req_progress, LSN_FORMAT_ARGS(startptr));
elog(LOG, "continuing remote read at req_lsn=%X/%X len=%zu, req_progress=%zu",
LSN_FORMAT_ARGS(startptr),
count,
state->req_progress);
buf += state->req_progress;
}
else
{
state->req_lsn = startptr;
state->req_len = count;
state->req_progress = 0;
elog(LOG, "starting remote read req_lsn=%X/%X len=%zu",
LSN_FORMAT_ARGS(startptr),
count);
}
while (true)
{
Size to_copy;
/*
* If we have no ready data, receive new message.
*/
if (state->wal_rem_len == 0 &&
/*
* check for the sake of 0 length reads; walproposer does these for
* heartbeats, though generally they shouldn't hit remote source.
*/
state->req_len - state->req_progress > 0)
{
NeonWALReadResult read_msg_res = NeonWALReaderReadMsg(state);
if (read_msg_res != NEON_WALREAD_SUCCESS)
return read_msg_res;
}
if (state->req_lsn + state->req_progress != state->rem_lsn)
{
snprintf(state->err_msg, sizeof(state->err_msg),
"expected remote WAL at %X/%X but got %X/%X. Non monotonic read requests could have caused this. req_lsn=%X/%X len=%zu",
LSN_FORMAT_ARGS(state->req_lsn + state->req_progress),
LSN_FORMAT_ARGS(state->rem_lsn),
LSN_FORMAT_ARGS(state->req_lsn),
state->req_len);
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
}
/* We can copy min of (available, requested) bytes. */
to_copy =
Min(state->req_len - state->req_progress, state->wal_rem_len);
memcpy(buf, state->wal_ptr, to_copy);
state->wal_ptr += to_copy;
state->wal_rem_len -= to_copy;
state->rem_lsn += to_copy;
if (state->wal_rem_len == 0)
state->wal_ptr = NULL; /* freed by libpqwalproposer */
state->req_progress += to_copy;
if (state->req_progress == state->req_len)
{
/*
* Request completed. If there is a chance of serving next one
* locally, close the connection.
*/
if (state->req_lsn < state->available_lsn &&
state->rem_lsn >= state->available_lsn)
{
elog(LOG, "closing remote connection as available_lsn %X/%X crossed and next read is likely to be served locally",
LSN_FORMAT_ARGS(state->available_lsn));
NeonWALReaderResetRemote(state);
}
else if (XLogSegmentOffset(state->rem_lsn, wal_segment_size) == 0)
{
XLogSegNo segno;
XLByteToSeg(state->rem_lsn, segno, state->segcxt.ws_segsize);
if (is_wal_segment_exists(segno, state->segcxt.ws_segsize, tli))
{
elog(LOG, "closing remote connection as WAL file at next lsn %X/%X exists",
LSN_FORMAT_ARGS(state->rem_lsn));
NeonWALReaderResetRemote(state);
}
}
state->req_lsn = InvalidXLogRecPtr;
state->req_len = 0;
state->req_progress = 0;
return NEON_WALREAD_SUCCESS;
}
}
snprintf(state->err_msg, sizeof(state->err_msg), "remote read failed: not implemented");
return NEON_WALREAD_ERROR;
}
/*
* Read one WAL message from the stream, sets state->wal_ptr in case of success.
* Resets remote state in case of failure.
*/
static NeonWALReadResult
NeonWALReaderReadMsg(NeonWALReader *state)
{
while (true) /* loop until we get 'w' */
{
char *copydata_ptr;
int copydata_size;
StringInfoData s;
char msg_type;
int hdrlen;
Assert(state->rem_state == RS_ESTABLISHED);
Assert(state->wal_ptr == NULL && state->wal_rem_len == 0);
switch (libpqwp_async_read(state->wp_conn,
&copydata_ptr,
&copydata_size))
{
case PG_ASYNC_READ_SUCCESS:
break;
case PG_ASYNC_READ_TRY_AGAIN:
return NEON_WALREAD_WOULDBLOCK;
case PG_ASYNC_READ_FAIL:
snprintf(state->err_msg,
sizeof(state->err_msg),
"req_lsn=%X/%X, req_len=%zu, req_progress=%zu, get copydata failed: %s",
LSN_FORMAT_ARGS(state->req_lsn),
state->req_len,
state->req_progress,
PQerrorMessage(state->wp_conn->pg_conn));
goto err;
}
/* put data on StringInfo to parse */
s.data = copydata_ptr;
s.len = copydata_size;
s.cursor = 0;
s.maxlen = -1;
if (copydata_size == 0)
{
snprintf(state->err_msg,
sizeof(state->err_msg),
"zero length copydata received");
goto err;
}
msg_type = pq_getmsgbyte(&s);
switch (msg_type)
{
case 'w':
{
XLogRecPtr start_lsn;
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
if (s.len - s.cursor < hdrlen)
{
snprintf(state->err_msg,
sizeof(state->err_msg),
"invalid WAL message received from primary");
goto err;
}
start_lsn = pq_getmsgint64(&s);
pq_getmsgint64(&s); /* XLogRecPtr end_lsn; */
pq_getmsgint64(&s); /* TimestampTz send_time */
state->rem_lsn = start_lsn;
state->wal_rem_len = (Size) (s.len - s.cursor);
state->wal_ptr = (char *) pq_getmsgbytes(&s, s.len - s.cursor);
elog(LOG, "received WAL msg at %X/%X len %zu",
LSN_FORMAT_ARGS(state->rem_lsn), state->wal_rem_len);
return NEON_WALREAD_SUCCESS;
}
case 'k':
{
XLogRecPtr end_lsn;
bool reply_requested;
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
if (s.len - s.cursor < hdrlen)
{
snprintf(state->err_msg, sizeof(state->err_msg),
"invalid keepalive message received from primary");
goto err;
}
end_lsn = pq_getmsgint64(&s);
pq_getmsgint64(&s); /* TimestampTz timestamp; */
reply_requested = pq_getmsgbyte(&s);
elog(DEBUG5, "received keepalive end_lsn=%X/%X reply_requested=%d",
LSN_FORMAT_ARGS(end_lsn),
reply_requested);
continue;
/* todo: send replies */
}
default:
elog(WARNING, "invalid replication message type %d", msg_type);
continue;
}
}
err:
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
}
/* reset remote connection and request in progress */
static void
NeonWALReaderResetRemote(NeonWALReader *state)
@@ -315,6 +525,7 @@ NeonWALReaderResetRemote(NeonWALReader *state)
}
state->donor_name[0] = '\0';
state->wal_ptr = NULL;
state->wal_rem_len = 0;
state->rem_lsn = InvalidXLogRecPtr;
}
@@ -476,6 +687,16 @@ neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
return false;
}
static bool
is_wal_segment_exists(XLogSegNo segno, int segsize, TimeLineID tli)
{
struct stat stat_buffer;
char path[MAXPGPATH];
XLogFilePath(path, tli, segno, segsize);
return stat(path, &stat_buffer) == 0;
}
/* copy of vanilla wal_segment_close with NeonWALReader */
static void
neon_wal_segment_close(NeonWALReader *state)

View File

@@ -1257,6 +1257,14 @@ HandleActiveState(Safekeeper *sk, uint32 events)
if (!RecvAppendResponses(sk))
return;
if (events & WL_SOCKET_CLOSED)
{
walprop_log(WARNING, "connection to %s:%s in active state failed, got WL_SOCKET_CLOSED on neon_walreader socket",
sk->host, sk->port);
ShutdownConnection(sk);
return;
}
/* configures event set for yield whatever is the substate */
wp->api.active_state_update_event_set(sk);
}
@@ -1312,7 +1320,7 @@ SendAppendRequests(Safekeeper *sk)
req = &sk->appendRequest;
PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn);
walprop_log(DEBUG2, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
walprop_log(LOG, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
req->endLsn - req->beginLsn,
LSN_FORMAT_ARGS(req->beginLsn),
LSN_FORMAT_ARGS(req->endLsn),
@@ -2109,6 +2117,9 @@ SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_even
*/
case SS_ACTIVE_SEND:
*sk_events = WL_SOCKET_READABLE;
if (NeonWALReaderEvents(sk->xlogreader))
*nwr_events = WL_SOCKET_CLOSED; /* c.f.
* walprop_pg_active_state_update_event_set */
return;
/*
@@ -2126,6 +2137,9 @@ SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_even
*/
case SS_ACTIVE_FLUSH:
*sk_events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
if (NeonWALReaderEvents(sk->xlogreader))
*nwr_events = WL_SOCKET_CLOSED; /* c.f.
* walprop_pg_active_state_update_event_set */
return;
}
return;

View File

@@ -812,6 +812,7 @@ walprop_flush(Safekeeper *sk)
return (PQflush(sk->conn->pg_conn));
}
/* Like libpqrcv_receive. *buf is valid until the next call. */
PGAsyncReadResult
libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount)
{
@@ -1568,7 +1569,15 @@ walprop_pg_active_state_update_event_set(Safekeeper *sk)
}
else
{
update_nwr_event_set(sk, 0);
/*
* Hack: we should always set 0 here, but for random reasons
* WaitEventSet (WaitEventAdjustEpoll) asserts that there is at least
* some event. Since there is also no way to remove socket except
* reconstructing the whole set, SafekeeperStateDesiredEvents instead
* gives WL_SOCKET_CLOSED if socket exists.
*/
Assert(nwr_events == WL_SOCKET_CLOSED || nwr_events == 0);
update_nwr_event_set(sk, WL_SOCKET_CLOSED);
}
walprop_pg_update_event_set(sk, sk_events);
}