From 30bb18f3a91cb4799531affbac1f6a8aec93be5c Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 28 Nov 2023 12:16:40 +0300 Subject: [PATCH] Implement the thing, test_late_init works. --- pgxn/neon/neon_walreader.c | 239 +++++++++++++++++++++++++++++++++++-- pgxn/neon/walproposer.c | 16 ++- pgxn/neon/walproposer_pg.c | 11 +- 3 files changed, 255 insertions(+), 11 deletions(-) diff --git a/pgxn/neon/neon_walreader.c b/pgxn/neon/neon_walreader.c index 0f68d19a3c..c217faa66b 100644 --- a/pgxn/neon/neon_walreader.c +++ b/pgxn/neon/neon_walreader.c @@ -1,22 +1,24 @@ /* - * Like WALRead, but returns error instead of throwing ERROR when segment is - * missing + doesn't attempt to read WAL before specified horizon -- basebackup - * LSN. Missing WAL should be fetched by peer recovery, or, alternatively, on - * demand WAL fetching from safekeepers should be implemented in NeonWALReader. + * Like WALRead, but when WAL segment doesn't exist locally instead of throwing + * ERROR asynchronously tries to fetch it from the most advanced safekeeper. * * We can't use libpqwalreceiver as it blocks during connection establishment * (and waiting for PQExec result), so use libpqwalproposer instead. * * TODO: keepalives are currently never sent, so the other side can close the * connection prematurely. + * + * TODO: close conn if reading takes too long to prevent stuck connections. */ #include "postgres.h" +#include #include #include "access/xlog_internal.h" #include "access/xlogdefs.h" #include "access/xlogreader.h" +#include "libpq/pqformat.h" #include "storage/fd.h" #include "utils/wait_event.h" @@ -25,13 +27,16 @@ #include "neon_walreader.h" #include "walproposer.h" -#define NEON_WALREADER_ERR_MSG_LEN 256 +#define NEON_WALREADER_ERR_MSG_LEN 512 static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli); +static NeonWALReadResult NeonWALReaderReadMsg(NeonWALReader *state); static void NeonWALReaderResetRemote(NeonWALReader *state); static bool NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli); static bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p); static void neon_wal_segment_close(NeonWALReader *state); +static bool is_wal_segment_exists(XLogSegNo segno, int segsize, + TimeLineID tli); /* * State of connection to donor safekeeper. @@ -79,10 +84,11 @@ struct NeonWALReader WalProposerConn *wp_conn; /* - * position in recvbuf from which we'll copy WAL next time, or NULL if - * there is no unprocessed message + * position in wp_conn recvbuf from which we'll copy WAL next time, or + * NULL if there is no unprocessed message */ char *wal_ptr; + Size wal_rem_len; /* how many unprocessed bytes left in recvbuf */ /* * LSN of wal_ptr position according to walsender to cross check against @@ -111,6 +117,8 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalPropose reader->wp = wp; + reader->rem_state = RS_NONE; + return reader; } @@ -119,6 +127,9 @@ NeonWALReaderFree(NeonWALReader *state) { if (state->seg.ws_file != -1) neon_wal_segment_close(state); + if (state->wp_conn) + libpqwp_disconnect(state->wp_conn); + Assert(false); pfree(state); } @@ -167,7 +178,8 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti } else if (state->wre_errno == ENOENT) { - elog(LOG, "local read failed with segment doesn't exist, attempting remote"); + elog(LOG, "local read failed as segment at %X/%X doesn't exist, attempting remote", + LSN_FORMAT_ARGS(startptr)); return NeonWALReadRemote(state, buf, startptr, count, tli); } else @@ -176,6 +188,7 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti } } +/* Do the read from remote safekeeper. */ static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli) { @@ -292,14 +305,211 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou NeonWALReaderResetRemote(state); return NEON_WALREAD_ERROR; } - elog(LOG, "moving ptr by %zu bytes restoring progress, req_lsn = %X/%X", state->req_progress, LSN_FORMAT_ARGS(startptr)); + elog(LOG, "continuing remote read at req_lsn=%X/%X len=%zu, req_progress=%zu", + LSN_FORMAT_ARGS(startptr), + count, + state->req_progress); buf += state->req_progress; } + else + { + state->req_lsn = startptr; + state->req_len = count; + state->req_progress = 0; + elog(LOG, "starting remote read req_lsn=%X/%X len=%zu", + LSN_FORMAT_ARGS(startptr), + count); + } + + while (true) + { + Size to_copy; + + /* + * If we have no ready data, receive new message. + */ + if (state->wal_rem_len == 0 && + + /* + * check for the sake of 0 length reads; walproposer does these for + * heartbeats, though generally they shouldn't hit remote source. + */ + state->req_len - state->req_progress > 0) + { + NeonWALReadResult read_msg_res = NeonWALReaderReadMsg(state); + + if (read_msg_res != NEON_WALREAD_SUCCESS) + return read_msg_res; + } + + if (state->req_lsn + state->req_progress != state->rem_lsn) + { + snprintf(state->err_msg, sizeof(state->err_msg), + "expected remote WAL at %X/%X but got %X/%X. Non monotonic read requests could have caused this. req_lsn=%X/%X len=%zu", + LSN_FORMAT_ARGS(state->req_lsn + state->req_progress), + LSN_FORMAT_ARGS(state->rem_lsn), + LSN_FORMAT_ARGS(state->req_lsn), + state->req_len); + NeonWALReaderResetRemote(state); + return NEON_WALREAD_ERROR; + } + + /* We can copy min of (available, requested) bytes. */ + to_copy = + Min(state->req_len - state->req_progress, state->wal_rem_len); + memcpy(buf, state->wal_ptr, to_copy); + state->wal_ptr += to_copy; + state->wal_rem_len -= to_copy; + state->rem_lsn += to_copy; + if (state->wal_rem_len == 0) + state->wal_ptr = NULL; /* freed by libpqwalproposer */ + state->req_progress += to_copy; + if (state->req_progress == state->req_len) + { + /* + * Request completed. If there is a chance of serving next one + * locally, close the connection. + */ + if (state->req_lsn < state->available_lsn && + state->rem_lsn >= state->available_lsn) + { + elog(LOG, "closing remote connection as available_lsn %X/%X crossed and next read is likely to be served locally", + LSN_FORMAT_ARGS(state->available_lsn)); + NeonWALReaderResetRemote(state); + } + else if (XLogSegmentOffset(state->rem_lsn, wal_segment_size) == 0) + { + XLogSegNo segno; + + XLByteToSeg(state->rem_lsn, segno, state->segcxt.ws_segsize); + if (is_wal_segment_exists(segno, state->segcxt.ws_segsize, tli)) + { + elog(LOG, "closing remote connection as WAL file at next lsn %X/%X exists", + LSN_FORMAT_ARGS(state->rem_lsn)); + NeonWALReaderResetRemote(state); + } + } + state->req_lsn = InvalidXLogRecPtr; + state->req_len = 0; + state->req_progress = 0; + return NEON_WALREAD_SUCCESS; + } + } snprintf(state->err_msg, sizeof(state->err_msg), "remote read failed: not implemented"); return NEON_WALREAD_ERROR; } +/* + * Read one WAL message from the stream, sets state->wal_ptr in case of success. + * Resets remote state in case of failure. + */ +static NeonWALReadResult +NeonWALReaderReadMsg(NeonWALReader *state) +{ + while (true) /* loop until we get 'w' */ + { + char *copydata_ptr; + int copydata_size; + StringInfoData s; + char msg_type; + int hdrlen; + + Assert(state->rem_state == RS_ESTABLISHED); + Assert(state->wal_ptr == NULL && state->wal_rem_len == 0); + + switch (libpqwp_async_read(state->wp_conn, + ©data_ptr, + ©data_size)) + { + case PG_ASYNC_READ_SUCCESS: + break; + case PG_ASYNC_READ_TRY_AGAIN: + return NEON_WALREAD_WOULDBLOCK; + case PG_ASYNC_READ_FAIL: + snprintf(state->err_msg, + sizeof(state->err_msg), + "req_lsn=%X/%X, req_len=%zu, req_progress=%zu, get copydata failed: %s", + LSN_FORMAT_ARGS(state->req_lsn), + state->req_len, + state->req_progress, + PQerrorMessage(state->wp_conn->pg_conn)); + goto err; + } + + /* put data on StringInfo to parse */ + s.data = copydata_ptr; + s.len = copydata_size; + s.cursor = 0; + s.maxlen = -1; + + if (copydata_size == 0) + { + snprintf(state->err_msg, + sizeof(state->err_msg), + "zero length copydata received"); + goto err; + } + msg_type = pq_getmsgbyte(&s); + switch (msg_type) + { + case 'w': + { + XLogRecPtr start_lsn; + + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); + if (s.len - s.cursor < hdrlen) + { + snprintf(state->err_msg, + sizeof(state->err_msg), + "invalid WAL message received from primary"); + goto err; + } + + start_lsn = pq_getmsgint64(&s); + pq_getmsgint64(&s); /* XLogRecPtr end_lsn; */ + pq_getmsgint64(&s); /* TimestampTz send_time */ + + state->rem_lsn = start_lsn; + state->wal_rem_len = (Size) (s.len - s.cursor); + state->wal_ptr = (char *) pq_getmsgbytes(&s, s.len - s.cursor); + elog(LOG, "received WAL msg at %X/%X len %zu", + LSN_FORMAT_ARGS(state->rem_lsn), state->wal_rem_len); + + return NEON_WALREAD_SUCCESS; + } + case 'k': + { + XLogRecPtr end_lsn; + bool reply_requested; + + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); + if (s.len - s.cursor < hdrlen) + { + snprintf(state->err_msg, sizeof(state->err_msg), + "invalid keepalive message received from primary"); + goto err; + } + + end_lsn = pq_getmsgint64(&s); + pq_getmsgint64(&s); /* TimestampTz timestamp; */ + reply_requested = pq_getmsgbyte(&s); + elog(DEBUG5, "received keepalive end_lsn=%X/%X reply_requested=%d", + LSN_FORMAT_ARGS(end_lsn), + reply_requested); + continue; + /* todo: send replies */ + } + default: + elog(WARNING, "invalid replication message type %d", msg_type); + continue; + } + } +err: + NeonWALReaderResetRemote(state); + return NEON_WALREAD_ERROR; +} + /* reset remote connection and request in progress */ static void NeonWALReaderResetRemote(NeonWALReader *state) @@ -315,6 +525,7 @@ NeonWALReaderResetRemote(NeonWALReader *state) } state->donor_name[0] = '\0'; state->wal_ptr = NULL; + state->wal_rem_len = 0; state->rem_lsn = InvalidXLogRecPtr; } @@ -476,6 +687,16 @@ neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, return false; } +static bool +is_wal_segment_exists(XLogSegNo segno, int segsize, TimeLineID tli) +{ + struct stat stat_buffer; + char path[MAXPGPATH]; + + XLogFilePath(path, tli, segno, segsize); + return stat(path, &stat_buffer) == 0; +} + /* copy of vanilla wal_segment_close with NeonWALReader */ static void neon_wal_segment_close(NeonWALReader *state) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 7d8e582b6b..53efa25d0c 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1257,6 +1257,14 @@ HandleActiveState(Safekeeper *sk, uint32 events) if (!RecvAppendResponses(sk)) return; + if (events & WL_SOCKET_CLOSED) + { + walprop_log(WARNING, "connection to %s:%s in active state failed, got WL_SOCKET_CLOSED on neon_walreader socket", + sk->host, sk->port); + ShutdownConnection(sk); + return; + } + /* configures event set for yield whatever is the substate */ wp->api.active_state_update_event_set(sk); } @@ -1312,7 +1320,7 @@ SendAppendRequests(Safekeeper *sk) req = &sk->appendRequest; PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn); - walprop_log(DEBUG2, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s", + walprop_log(LOG, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s", req->endLsn - req->beginLsn, LSN_FORMAT_ARGS(req->beginLsn), LSN_FORMAT_ARGS(req->endLsn), @@ -2109,6 +2117,9 @@ SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_even */ case SS_ACTIVE_SEND: *sk_events = WL_SOCKET_READABLE; + if (NeonWALReaderEvents(sk->xlogreader)) + *nwr_events = WL_SOCKET_CLOSED; /* c.f. + * walprop_pg_active_state_update_event_set */ return; /* @@ -2126,6 +2137,9 @@ SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_even */ case SS_ACTIVE_FLUSH: *sk_events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; + if (NeonWALReaderEvents(sk->xlogreader)) + *nwr_events = WL_SOCKET_CLOSED; /* c.f. + * walprop_pg_active_state_update_event_set */ return; } return; diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 8368f48187..0114c33241 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -812,6 +812,7 @@ walprop_flush(Safekeeper *sk) return (PQflush(sk->conn->pg_conn)); } +/* Like libpqrcv_receive. *buf is valid until the next call. */ PGAsyncReadResult libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount) { @@ -1568,7 +1569,15 @@ walprop_pg_active_state_update_event_set(Safekeeper *sk) } else { - update_nwr_event_set(sk, 0); + /* + * Hack: we should always set 0 here, but for random reasons + * WaitEventSet (WaitEventAdjustEpoll) asserts that there is at least + * some event. Since there is also no way to remove socket except + * reconstructing the whole set, SafekeeperStateDesiredEvents instead + * gives WL_SOCKET_CLOSED if socket exists. + */ + Assert(nwr_events == WL_SOCKET_CLOSED || nwr_events == 0); + update_nwr_event_set(sk, WL_SOCKET_CLOSED); } walprop_pg_update_event_set(sk, sk_events); }