Compare commits

...

6 Commits

Author SHA1 Message Date
Kosntantin Knizhnik
7c8e87056b Move check of page LSNB to heck_page_lsn() function 2025-07-14 18:03:58 +03:00
Konstantin Knizhnik
1db119c657 Use GetCurrentReplayRecPtr instead of GetXLogReplayRecPtr in the check for returned page LSN 2025-07-14 08:44:05 +03:00
Kosntantin Knizhnik
00826b4082 Handle case of 0 replay_lsn 2025-07-13 20:41:49 +03:00
Kosntantin Knizhnik
b726293ec3 Fix indentation 2025-07-13 16:55:29 +03:00
Kosntantin Knizhnik
c42c38138e Set request_lsn=max(gc_cutoff,request_lsn) 2025-07-12 15:55:41 +03:00
Kosntantin Knizhnik
af61b7238d Set request_lsn=max(gc_cutoff,request_lsn) 2025-07-12 15:55:35 +03:00
2 changed files with 57 additions and 8 deletions

View File

@@ -2167,7 +2167,7 @@ impl PageServerHandler {
fn effective_request_lsn(
timeline: &Timeline,
last_record_lsn: Lsn,
request_lsn: Lsn,
mut request_lsn: Lsn,
not_modified_since: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
) -> Result<Lsn, PageStreamError> {
@@ -2195,12 +2195,16 @@ impl PageServerHandler {
if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
let gc_info = &timeline.gc_info.read().unwrap();
if !gc_info.lsn_covered_by_lease(request_lsn) {
return Err(
PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
request_lsn, **latest_gc_cutoff_lsn
).into())
// While request was in flight, replica apply_lsn may be advanced.
// latest_gc_cutoff_lsn is conservative estimation for min(redo_lsn) for all replicas,
// so it is safe to move request_lsn forward to latest_gc_cutoff_lsn.
// If replica lease is expired and latest_gc_cutoff_lsn>redo_lsn for this replica,
// then check of page LSN at replia protects it from getting too new version of the page.
warn!(
"Tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
request_lsn, **latest_gc_cutoff_lsn
);
request_lsn = **latest_gc_cutoff_lsn;
}
}

View File

@@ -1045,6 +1045,34 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(!found);
}
/*
* Check that pahge LSN returned by PS to replica is not beyand replay LSN.
* It can happen only in case of deteriorated lease.
*/
static bool
check_page_lsn(NeonGetPageResponse* resp, XLogRecPtr* replay_lsn_ptr)
{
if (RecoveryInProgress())
{
XLogRecPtr page_lsn = PageGetLSN((Page)resp->page);
#if PG_VERSION_NUM >= 150000
XLogRecPtr replay_lsn = GetCurrentReplayRecPtr(NULL);
#else
/*
* PG14 doesn't have GetCurrentReplayRecPtr() function which returns end of currently applied record.
* And GetXLogReplayRecPtr returns end of WAL records which was already applied.
* So we have to use this hack with resp->req.lsn which is expected to contain end record ptr in this case.
* But it works onlyfor v3 protocol version.
*/
XLogRecPtr replay_lsn = Max(GetXLogReplayRecPtr(NULL), resp->req.hdr.lsn);
#endif
if (replay_lsn_ptr)
*replay_lsn_ptr = replay_lsn;
return replay_lsn == 0 || page_lsn <= replay_lsn;
}
return true;
}
/*
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
@@ -1068,7 +1096,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
NeonGetPageResponse* resp;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
@@ -1101,8 +1129,16 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
resp = (NeonGetPageResponse*)slot->response;
/*
* Ignore "in-future" responses caused by deteriorated lease
*/
if (!check_page_lsn(resp, NULL))
{
continue;
}
memcpy(buffers[i], resp->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
@@ -2227,6 +2263,15 @@ Retry:
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
XLogRecPtr replay_lsn;
if (!check_page_lsn(getpage_resp, &replay_lsn))
{
/* Alternative to throw error is to repeat the query with request_lsn=replay_lsn */
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("There is no more version of page %u of relation %u/%u/%u.%u at LSN %X/%X at page server, request LSN %X/%X, latest version is at LSN %X/%X",
blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(replay_lsn), LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(PageGetLSN((Page)getpage_resp->page)))));
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*