From 1a1162cd56acbcc2d90e9163dfd06a583c812f95 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 28 Nov 2023 13:09:06 +0300 Subject: [PATCH] Add check that server is enough up to date. --- pgxn/neon/neon_walreader.c | 37 +++++++++++++++++++++---------------- pgxn/neon/walproposer_pg.c | 2 +- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/pgxn/neon/neon_walreader.c b/pgxn/neon/neon_walreader.c index 268a887c38..a830175386 100644 --- a/pgxn/neon/neon_walreader.c +++ b/pgxn/neon/neon_walreader.c @@ -102,7 +102,7 @@ struct NeonWALReader XLogRecPtr rem_lsn; /* prepended to lines logged by neon_walreader, if provided */ - char log_prefix[64]; + char log_prefix[64]; }; /* palloc and initialize NeonWALReader */ @@ -140,7 +140,6 @@ NeonWALReaderFree(NeonWALReader *state) neon_wal_segment_close(state); if (state->wp_conn) libpqwp_disconnect(state->wp_conn); - Assert(false); pfree(state); } @@ -190,7 +189,7 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti else if (state->wre_errno == ENOENT) { nwr_log(LOG, "local read failed as segment at %X/%X doesn't exist, attempting remote", - LSN_FORMAT_ARGS(startptr)); + LSN_FORMAT_ARGS(startptr)); return NeonWALReadRemote(state, buf, startptr, count, tli); } else @@ -218,7 +217,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou } snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port); nwr_log(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL", - state->donor_name, LSN_FORMAT_ARGS(donor_lsn)); + state->donor_name, LSN_FORMAT_ARGS(donor_lsn)); state->wp_conn = libpqwp_connect_start(donor->conninfo); if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD) { @@ -257,7 +256,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou "START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')", LSN_FORMAT_ARGS(startptr), state->wp->propTerm); nwr_log(LOG, "connection to %s to fetch WAL succeeded, running %s", - state->donor_name, start_repl_query); + state->donor_name, start_repl_query); if (!libpqwp_send_query(state->wp_conn, start_repl_query)) { snprintf(state->err_msg, sizeof(state->err_msg), @@ -317,9 +316,9 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou return NEON_WALREAD_ERROR; } nwr_log(LOG, "continuing remote read at req_lsn=%X/%X len=%zu, req_progress=%zu", - LSN_FORMAT_ARGS(startptr), - count, - state->req_progress); + LSN_FORMAT_ARGS(startptr), + count, + state->req_progress); buf += state->req_progress; } else @@ -328,8 +327,8 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou state->req_len = count; state->req_progress = 0; nwr_log(LOG, "starting remote read req_lsn=%X/%X len=%zu", - LSN_FORMAT_ARGS(startptr), - count); + LSN_FORMAT_ARGS(startptr), + count); } while (true) @@ -385,7 +384,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou state->rem_lsn >= state->available_lsn) { nwr_log(LOG, "closing remote connection as available_lsn %X/%X crossed and next read is likely to be served locally", - LSN_FORMAT_ARGS(state->available_lsn)); + LSN_FORMAT_ARGS(state->available_lsn)); NeonWALReaderResetRemote(state); } else if (XLogSegmentOffset(state->rem_lsn, wal_segment_size) == 0) @@ -396,7 +395,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou if (is_wal_segment_exists(segno, state->segcxt.ws_segsize, tli)) { nwr_log(LOG, "closing remote connection as WAL file at next lsn %X/%X exists", - LSN_FORMAT_ARGS(state->rem_lsn)); + LSN_FORMAT_ARGS(state->rem_lsn)); NeonWALReaderResetRemote(state); } } @@ -485,7 +484,7 @@ NeonWALReaderReadMsg(NeonWALReader *state) state->wal_rem_len = (Size) (s.len - s.cursor); state->wal_ptr = (char *) pq_getmsgbytes(&s, s.len - s.cursor); nwr_log(LOG, "received WAL msg at %X/%X len %zu", - LSN_FORMAT_ARGS(state->rem_lsn), state->wal_rem_len); + LSN_FORMAT_ARGS(state->rem_lsn), state->wal_rem_len); return NEON_WALREAD_SUCCESS; } @@ -506,10 +505,16 @@ NeonWALReaderReadMsg(NeonWALReader *state) pq_getmsgint64(&s); /* TimestampTz timestamp; */ reply_requested = pq_getmsgbyte(&s); nwr_log(DEBUG5, "received keepalive end_lsn=%X/%X reply_requested=%d", - LSN_FORMAT_ARGS(end_lsn), - reply_requested); + LSN_FORMAT_ARGS(end_lsn), + reply_requested); + if (end_lsn < state->req_lsn + state->req_len) + { + snprintf(state->err_msg, sizeof(state->err_msg), + "closing remote connection: requested WAL up to %X/%X, but current donor %s has only up to %X/%X", + LSN_FORMAT_ARGS(state->req_lsn + state->req_len), state->donor_name, LSN_FORMAT_ARGS(end_lsn)); + goto err; + } continue; - /* todo: send replies */ } default: nwr_log(WARNING, "invalid replication message type %d", msg_type); diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 821b1d017a..181b4c07a5 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1460,7 +1460,7 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count) static void walprop_pg_wal_reader_allocate(Safekeeper *sk) { - char log_prefix[64]; + char log_prefix[64]; snprintf(log_prefix, sizeof(log_prefix), "sk %s:%s nwr: ", sk->host, sk->port); sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix);