Check prefetch response before assignment to slot (#12371)

## Problem

See [Slack
Channel](https://databricks.enterprise.slack.com/archives/C091LHU6NNB)

Dropping connection without resetting prefetch state can cause
request/response mismatch.
And lack of check response correctness in communicator_prefetch_lookupv
can cause data corruption.

## Summary of changes

1. Validate response before assignment to prefetch slot.
2. Consume prefetch requests before sending any other requests.

---------

Co-authored-by: Kosntantin Knizhnik <konstantin.knizhnik@databricks.com>
Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
This commit is contained in:
Konstantin Knizhnik
2025-07-09 15:49:21 +03:00
committed by GitHub
parent 7049003cf7
commit 4ee0da0a20

View File

@@ -65,6 +65,7 @@
#include "port/pg_iovec.h"
#include "postmaster/interrupt.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "utils/timeout.h"
#include "bitmap.h"
@@ -412,6 +413,47 @@ compact_prefetch_buffers(void)
return false;
}
/*
* Check that prefetch response matches the slot
*/
static void
check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
{
if (resp->tag != T_NeonGetPageResponse && resp->tag != T_NeonErrorResponse)
{
neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
resp->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
}
if (neon_protocol_version >= 3)
{
NRelFileInfo rinfo = BufTagGetNRelFileInfo(slot->buftag);
if (resp->tag == T_NeonGetPageResponse)
{
NeonGetPageResponse * getpage_resp = (NeonGetPageResponse *)resp;
if (resp->reqid != slot->reqid ||
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since ||
!RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) ||
getpage_resp->req.forknum != slot->buftag.forkNum ||
getpage_resp->req.blkno != slot->buftag.blockNum)
{
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
"Receive unexpected getpage response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno,
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), slot->buftag.forkNum, slot->buftag.blockNum);
}
}
else if (resp->reqid != slot->reqid ||
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since)
{
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since));
}
}
}
/*
* If there might be responses still in the TCP buffer, then we should try to
* use those, to reduce any TCP backpressure on the OS/PS side.
@@ -446,15 +488,18 @@ communicator_prefetch_pump_state(void)
if (response == NULL)
break;
check_getpage_response(slot, response);
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(slot->shard_no, ERROR,
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
@@ -593,6 +638,21 @@ readahead_buffer_resize(int newsize, void *extra)
}
/*
* Callback to be called on backend exit to ensure correct state of compute-PS communication
* in case of backend cancel
*/
static void
prefetch_on_exit(int code, Datum arg)
{
if (code != 0) /* do disconnect only on abnormal backend termination */
{
shardno_t shard_no = DatumGetInt32(arg);
prefetch_on_ps_disconnect();
page_server->disconnect(shard_no);
}
}
/*
* Make sure that there are no responses still in the buffer.
@@ -605,6 +665,11 @@ consume_prefetch_responses(void)
{
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
/*
* We know for sure we're not working on any prefetch pages after
* this.
*/
END_PREFETCH_RECEIVE_WORK();
}
static void
@@ -722,10 +787,12 @@ prefetch_read(PrefetchRequest *slot)
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(slot->shard_no, ERROR,
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);
}
/*
* Copy the request info so that if an error happens and the prefetch
@@ -741,14 +808,18 @@ prefetch_read(PrefetchRequest *slot)
MemoryContextSwitchTo(old);
if (response)
{
check_getpage_response(slot, response);
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(shard_no, ERROR,
{
neon_shard_log(shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
@@ -820,11 +891,10 @@ communicator_prefetch_receive(BufferTag tag)
void
prefetch_on_ps_disconnect(void)
{
bool save_readpage_reentrant_guard = readpage_reentrant_guard;
MyPState->ring_flush = MyPState->ring_unused;
/* Prohibit callig of prefetch_pump_state */
START_PREFETCH_RECEIVE_WORK();
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
while (MyPState->ring_receive < MyPState->ring_unused)
{
@@ -854,9 +924,6 @@ prefetch_on_ps_disconnect(void)
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
/* Restore guard */
readpage_reentrant_guard = save_readpage_reentrant_guard;
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
@@ -865,6 +932,8 @@ prefetch_on_ps_disconnect(void)
MyPState->n_requests_inflight;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
RESUME_INTERRUPTS();
}
/*
@@ -1027,16 +1096,11 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
/*
* Ignore errors
*/
if (slot->response->tag != T_NeonGetPageResponse)
if (slot->response->tag == T_NeonErrorResponse)
{
if (slot->response->tag != T_NeonErrorResponse)
{
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
"Expected GetPage (0x%02x) or Error (0x%02x) response to GetPageRequest, but got 0x%02x",
T_NeonGetPageResponse, T_NeonErrorResponse, slot->response->tag);
}
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
@@ -1351,7 +1415,7 @@ equal_requests(NeonRequest* a, NeonRequest* b)
static NeonResponse *
page_server_request(void const *req)
{
NeonResponse *resp;
NeonResponse *resp = NULL;
BufferTag tag = {0};
shardno_t shard_no;
@@ -1371,7 +1435,7 @@ page_server_request(void const *req)
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
neon_log(ERROR, "Unexpected request tag: %d", messageTag(req));
neon_log(PANIC, "Unexpected request tag: %d", messageTag(req));
}
shard_no = get_shard_number(&tag);
@@ -1384,9 +1448,12 @@ page_server_request(void const *req)
shard_no = 0;
}
do
consume_prefetch_responses();
PG_TRY();
{
PG_TRY();
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
do
{
while (!page_server->send(shard_no, (NeonRequest *) req)
|| !page_server->flush(shard_no))
@@ -1394,30 +1461,24 @@ page_server_request(void const *req)
/* do nothing */
}
MyNeonCounters->pageserver_open_requests++;
consume_prefetch_responses();
resp = page_server->receive(shard_no);
MyNeonCounters->pageserver_open_requests--;
}
PG_CATCH();
{
/*
* Cancellation in this code needs to be handled better at some
* point, but this currently seems fine for now.
*/
page_server->disconnect(shard_no);
MyNeonCounters->pageserver_open_requests = 0;
} while (resp == NULL);
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
page_server->disconnect(shard_no);
MyNeonCounters->pageserver_open_requests = 0;
RESUME_INTERRUPTS();
/*
* We know for sure we're not working on any prefetch pages after
* this.
*/
END_PREFETCH_RECEIVE_WORK();
PG_RE_THROW();
}
PG_END_TRY();
PG_RE_THROW();
}
PG_END_TRY();
} while (resp == NULL);
return resp;
}
@@ -1502,7 +1563,7 @@ nm_pack_request(NeonRequest *msg)
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
default:
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
neon_log(PANIC, "unexpected neon message tag 0x%02x", msg->tag);
break;
}
return s;
@@ -1654,7 +1715,7 @@ nm_unpack_response(StringInfo s)
case T_NeonDbSizeRequest:
case T_NeonGetSlruSegmentRequest:
default:
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
neon_log(PANIC, "unexpected neon message tag 0x%02x", tag);
break;
}
@@ -1983,7 +2044,7 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
!RelFileInfoEquals(exists_resp->req.rinfo, request.rinfo) ||
exists_resp->req.forknum != request.forknum)
{
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(exists_resp->req.rinfo), exists_resp->req.forknum,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), request.forknum);
@@ -2014,7 +2075,7 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
break;
default:
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x",
T_NeonExistsResponse, T_NeonErrorResponse, resp->tag);
}
@@ -2158,6 +2219,7 @@ Retry:
Assert(memcmp(&hashkey.buftag, &slot->buftag, sizeof(BufferTag)) == 0);
Assert(hashkey.buftag.blockNum == base_blockno + i);
/* We already checked that response match request when storing it in slot */
resp = slot->response;
switch (resp->tag)
@@ -2165,21 +2227,6 @@ Retry:
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
if (neon_protocol_version >= 3)
{
if (resp->reqid != slot->reqid ||
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since ||
!RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) ||
getpage_resp->req.forknum != forkNum ||
getpage_resp->req.blkno != base_blockno + i)
{
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno,
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, base_blockno + i);
}
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
@@ -2192,17 +2239,6 @@ Retry:
break;
}
case T_NeonErrorResponse:
if (neon_protocol_version >= 3)
{
if (resp->reqid != slot->reqid ||
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since)
{
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[shard %d, reqid %lx] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
@@ -2257,7 +2293,7 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
!RelFileInfoEquals(relsize_resp->req.rinfo, request.rinfo) ||
relsize_resp->req.forknum != forknum)
{
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(relsize_resp->req.rinfo), relsize_resp->req.forknum,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), forknum);
@@ -2288,7 +2324,7 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
break;
default:
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x",
T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag);
}
@@ -2327,7 +2363,7 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
if (!equal_requests(resp, &request.hdr) ||
dbsize_resp->req.dbNode != dbNode)
{
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), dbsize_resp->req.dbNode,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), dbNode);
@@ -2356,7 +2392,7 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
break;
default:
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x",
T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag);
}
@@ -2372,7 +2408,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
{
int n_blocks;
shardno_t shard_no = 0; /* All SLRUs are at shard 0 */
NeonResponse *resp;
NeonResponse *resp = NULL;
NeonGetSlruSegmentRequest request;
request = (NeonGetSlruSegmentRequest) {
@@ -2383,14 +2419,29 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
.segno = segno
};
do
consume_prefetch_responses();
PG_TRY();
{
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
do
{
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
resp = page_server->receive(shard_no);
} while (resp == NULL);
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
page_server->disconnect(shard_no);
RESUME_INTERRUPTS();
consume_prefetch_responses();
resp = page_server->receive(shard_no);
} while (resp == NULL);
PG_RE_THROW();
}
PG_END_TRY();
switch (resp->tag)
{
@@ -2403,7 +2454,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
slru_resp->req.kind != kind ||
slru_resp->req.segno != segno)
{
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%lluu}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slru_resp->req.kind, slru_resp->req.segno,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), kind, (unsigned long long) segno);
@@ -2435,7 +2486,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
break;
default:
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Expected GetSlruSegment (0x%02x) or Error (0x%02x) response to GetSlruSegmentRequest, but got 0x%02x",
T_NeonGetSlruSegmentResponse, T_NeonErrorResponse, resp->tag);
}