Add check that server is enough up to date.

This commit is contained in:
Arseny Sher
2023-11-28 13:09:06 +03:00
parent eabfc5542e
commit 1a1162cd56
2 changed files with 22 additions and 17 deletions

View File

@@ -102,7 +102,7 @@ struct NeonWALReader
XLogRecPtr rem_lsn;
/* prepended to lines logged by neon_walreader, if provided */
char log_prefix[64];
char log_prefix[64];
};
/* palloc and initialize NeonWALReader */
@@ -140,7 +140,6 @@ NeonWALReaderFree(NeonWALReader *state)
neon_wal_segment_close(state);
if (state->wp_conn)
libpqwp_disconnect(state->wp_conn);
Assert(false);
pfree(state);
}
@@ -190,7 +189,7 @@ NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, Ti
else if (state->wre_errno == ENOENT)
{
nwr_log(LOG, "local read failed as segment at %X/%X doesn't exist, attempting remote",
LSN_FORMAT_ARGS(startptr));
LSN_FORMAT_ARGS(startptr));
return NeonWALReadRemote(state, buf, startptr, count, tli);
}
else
@@ -218,7 +217,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
}
snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port);
nwr_log(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL",
state->donor_name, LSN_FORMAT_ARGS(donor_lsn));
state->donor_name, LSN_FORMAT_ARGS(donor_lsn));
state->wp_conn = libpqwp_connect_start(donor->conninfo);
if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD)
{
@@ -257,7 +256,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
"START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')",
LSN_FORMAT_ARGS(startptr), state->wp->propTerm);
nwr_log(LOG, "connection to %s to fetch WAL succeeded, running %s",
state->donor_name, start_repl_query);
state->donor_name, start_repl_query);
if (!libpqwp_send_query(state->wp_conn, start_repl_query))
{
snprintf(state->err_msg, sizeof(state->err_msg),
@@ -317,9 +316,9 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
return NEON_WALREAD_ERROR;
}
nwr_log(LOG, "continuing remote read at req_lsn=%X/%X len=%zu, req_progress=%zu",
LSN_FORMAT_ARGS(startptr),
count,
state->req_progress);
LSN_FORMAT_ARGS(startptr),
count,
state->req_progress);
buf += state->req_progress;
}
else
@@ -328,8 +327,8 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
state->req_len = count;
state->req_progress = 0;
nwr_log(LOG, "starting remote read req_lsn=%X/%X len=%zu",
LSN_FORMAT_ARGS(startptr),
count);
LSN_FORMAT_ARGS(startptr),
count);
}
while (true)
@@ -385,7 +384,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
state->rem_lsn >= state->available_lsn)
{
nwr_log(LOG, "closing remote connection as available_lsn %X/%X crossed and next read is likely to be served locally",
LSN_FORMAT_ARGS(state->available_lsn));
LSN_FORMAT_ARGS(state->available_lsn));
NeonWALReaderResetRemote(state);
}
else if (XLogSegmentOffset(state->rem_lsn, wal_segment_size) == 0)
@@ -396,7 +395,7 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
if (is_wal_segment_exists(segno, state->segcxt.ws_segsize, tli))
{
nwr_log(LOG, "closing remote connection as WAL file at next lsn %X/%X exists",
LSN_FORMAT_ARGS(state->rem_lsn));
LSN_FORMAT_ARGS(state->rem_lsn));
NeonWALReaderResetRemote(state);
}
}
@@ -485,7 +484,7 @@ NeonWALReaderReadMsg(NeonWALReader *state)
state->wal_rem_len = (Size) (s.len - s.cursor);
state->wal_ptr = (char *) pq_getmsgbytes(&s, s.len - s.cursor);
nwr_log(LOG, "received WAL msg at %X/%X len %zu",
LSN_FORMAT_ARGS(state->rem_lsn), state->wal_rem_len);
LSN_FORMAT_ARGS(state->rem_lsn), state->wal_rem_len);
return NEON_WALREAD_SUCCESS;
}
@@ -506,10 +505,16 @@ NeonWALReaderReadMsg(NeonWALReader *state)
pq_getmsgint64(&s); /* TimestampTz timestamp; */
reply_requested = pq_getmsgbyte(&s);
nwr_log(DEBUG5, "received keepalive end_lsn=%X/%X reply_requested=%d",
LSN_FORMAT_ARGS(end_lsn),
reply_requested);
LSN_FORMAT_ARGS(end_lsn),
reply_requested);
if (end_lsn < state->req_lsn + state->req_len)
{
snprintf(state->err_msg, sizeof(state->err_msg),
"closing remote connection: requested WAL up to %X/%X, but current donor %s has only up to %X/%X",
LSN_FORMAT_ARGS(state->req_lsn + state->req_len), state->donor_name, LSN_FORMAT_ARGS(end_lsn));
goto err;
}
continue;
/* todo: send replies */
}
default:
nwr_log(WARNING, "invalid replication message type %d", msg_type);

View File

@@ -1460,7 +1460,7 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count)
static void
walprop_pg_wal_reader_allocate(Safekeeper *sk)
{
char log_prefix[64];
char log_prefix[64];
snprintf(log_prefix, sizeof(log_prefix), "sk %s:%s nwr: ", sk->host, sk->port);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix);