diff --git a/pgxn/neon/communicator.c b/pgxn/neon/communicator.c index 6f8b295862..f09254d068 100644 --- a/pgxn/neon/communicator.c +++ b/pgxn/neon/communicator.c @@ -1045,6 +1045,34 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns Assert(!found); } +/* + * Check that the page LSN returned by PS to a replica is not beyond the replay LSN; that can happen only in case of a deteriorated lease. + * Returns false only during recovery, when the replay LSN is non-zero and the page LSN exceeds it; during recovery the replay LSN is also stored to *replay_lsn_ptr if that pointer is not NULL. + */ +static bool +check_page_lsn(NeonGetPageResponse* resp, XLogRecPtr* replay_lsn_ptr) +{ + if (RecoveryInProgress()) + { + XLogRecPtr page_lsn = PageGetLSN((Page)resp->page); +#if PG_VERSION_NUM >= 150000 + XLogRecPtr replay_lsn = GetCurrentReplayRecPtr(NULL); +#else + /* + * PG14 doesn't have the GetCurrentReplayRecPtr() function, which returns the end of the currently applied record. + * And GetXLogReplayRecPtr() returns the end of the WAL records which were already applied. + * So we have to use this hack with resp->req.hdr.lsn, which is expected to contain the end record ptr in this case. + * But it works only for the v3 protocol version. + */ + XLogRecPtr replay_lsn = Max(GetXLogReplayRecPtr(NULL), resp->req.hdr.lsn); +#endif + if (replay_lsn_ptr) + *replay_lsn_ptr = replay_lsn; + return replay_lsn == 0 || page_lsn <= replay_lsn; + } + return true; +} + /* * Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted. * Present pages are marked in "mask" bitmap and total number of such pages is returned. 
@@ -1068,7 +1096,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe for (int i = 0; i < nblocks; i++) { PrfHashEntry *entry; - + NeonGetPageResponse* resp; hashkey.buftag.blockNum = blocknum + i; entry = prfh_lookup(MyPState->prf_hash, &hashkey); @@ -1101,8 +1129,16 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe continue; } Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */ - memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ); + resp = (NeonGetPageResponse*)slot->response; + /* + * Ignore "in-future" responses caused by deteriorated lease + */ + if (!check_page_lsn(resp, NULL)) + { + continue; + } + memcpy(buffers[i], resp->page, BLCKSZ); /* * With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received @@ -2227,22 +2263,14 @@ Retry: case T_NeonGetPageResponse: { NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp; - if (resp->lsn != UINT64_MAX) /* replica */ + XLogRecPtr replay_lsn; + if (!check_page_lsn(getpage_resp, &replay_lsn)) { - XLogRecPtr page_lsn = PageGetLSN((Page)getpage_resp->page); -#if PG_VERSION_NUM >= 150000 - XLogRecPtr replay_lsn = GetCurrentReplayRecPtr(NULL); -#else - XLogRecPtr replay_lsn = Max(GetXLogReplayRecPtr(NULL), resp->lsn); -#endif - if (replay_lsn != 0 && page_lsn > replay_lsn) - { - /* Alternative to throw error is to repeat the query with request_lsn=replay_lsn */ - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg("There is no more version of page %u of relation %u/%u/%u.%u at LSN %X/%X at page server, request LSN %X/%X, latest version is at LSN %X/%X", - blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(replay_lsn), LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(page_lsn)))); - } + /* Alternative to throw error is to repeat the query with request_lsn=replay_lsn */ + ereport(ERROR, 
+ (errcode(ERRCODE_IO_ERROR), + errmsg("There is no more version of page %u of relation %u/%u/%u.%u at LSN %X/%X at page server, request LSN %X/%X, latest version is at LSN %X/%X", + blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(replay_lsn), LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(PageGetLSN((Page)getpage_resp->page))))); } memcpy(buffer, getpage_resp->page, BLCKSZ);