Move check of page LSNB to heck_page_lsn() function

This commit is contained in:
Kosntantin Knizhnik
2025-07-14 18:03:58 +03:00
parent 1db119c657
commit 7c8e87056b

View File

@@ -1045,6 +1045,34 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(!found);
}
/*
* Check that pahge LSN returned by PS to replica is not beyand replay LSN.
* It can happen only in case of deteriorated lease.
*/
static bool
check_page_lsn(NeonGetPageResponse* resp, XLogRecPtr* replay_lsn_ptr)
{
if (RecoveryInProgress())
{
XLogRecPtr page_lsn = PageGetLSN((Page)resp->page);
#if PG_VERSION_NUM >= 150000
XLogRecPtr replay_lsn = GetCurrentReplayRecPtr(NULL);
#else
/*
* PG14 doesn't have GetCurrentReplayRecPtr() function which returns end of currently applied record.
* And GetXLogReplayRecPtr returns end of WAL records which was already applied.
* So we have to use this hack with resp->req.lsn which is expected to contain end record ptr in this case.
* But it works onlyfor v3 protocol version.
*/
XLogRecPtr replay_lsn = Max(GetXLogReplayRecPtr(NULL), resp->req.hdr.lsn);
#endif
if (replay_lsn_ptr)
*replay_lsn_ptr = replay_lsn;
return replay_lsn == 0 || page_lsn <= replay_lsn;
}
return true;
}
/*
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
@@ -1068,7 +1096,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
NeonGetPageResponse* resp;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
@@ -1101,8 +1129,16 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
resp = (NeonGetPageResponse*)slot->response;
/*
* Ignore "in-future" responses caused by deteriorated lease
*/
if (!check_page_lsn(resp, NULL))
{
continue;
}
memcpy(buffers[i], resp->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
@@ -2227,22 +2263,14 @@ Retry:
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
if (resp->lsn != UINT64_MAX) /* replica */
XLogRecPtr replay_lsn;
if (!check_page_lsn(getpage_resp, &replay_lsn))
{
XLogRecPtr page_lsn = PageGetLSN((Page)getpage_resp->page);
#if PG_VERSION_NUM >= 150000
XLogRecPtr replay_lsn = GetCurrentReplayRecPtr(NULL);
#else
XLogRecPtr replay_lsn = Max(GetXLogReplayRecPtr(NULL), resp->lsn);
#endif
if (replay_lsn != 0 && page_lsn > replay_lsn)
{
/* Alternative to throw error is to repeat the query with request_lsn=replay_lsn */
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("There is no more version of page %u of relation %u/%u/%u.%u at LSN %X/%X at page server, request LSN %X/%X, latest version is at LSN %X/%X",
blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(replay_lsn), LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(page_lsn))));
}
/* Alternative to throw error is to repeat the query with request_lsn=replay_lsn */
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("There is no more version of page %u of relation %u/%u/%u.%u at LSN %X/%X at page server, request LSN %X/%X, latest version is at LSN %X/%X",
blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(replay_lsn), LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(PageGetLSN((Page)getpage_resp->page)))));
}
memcpy(buffer, getpage_resp->page, BLCKSZ);