Add more checks for prefetch ring state

This commit is contained in:
Kosntantin Knizhnik
2025-06-25 09:37:36 +03:00
parent d9a639e540
commit a2fd5dfce8
4 changed files with 37 additions and 13 deletions

View File

@@ -412,6 +412,18 @@ compact_prefetch_buffers(void)
return false;
}
static void
dump_prefetch_state(void)
{
neon_log(LOG, "PREFETCH STATE: ring_last=%lx, ring_receive=%lx, ring_flush=%lx, ring_unused=%lx",
MyPState->ring_last, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
for (uint64 i = MyPState->ring_last; i < MyPState->ring_unused; i++)
{
PrefetchRequest *slot = GetPrfSlot(i);
neon_log(LOG, "PREFETCH STATE: slot %lx status=%d, reqid=%lx", i, slot->status, slot->reqid);
}
}
/*
* If there might be responses still in the TCP buffer, then we should try to
* use those, to reduce any TCP backpressure on the OS/PS side.
@@ -448,7 +460,8 @@ communicator_prefetch_pump_state(void)
if (response->tag != T_NeonGetPageResponse && response->tag != T_NeonErrorResponse)
{
neon_shard_log(slot->shard_no, ERROR, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
dump_prefetch_state();
neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
response->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
}
@@ -456,11 +469,13 @@ communicator_prefetch_pump_state(void)
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
{
dump_prefetch_state();
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
@@ -468,10 +483,11 @@ communicator_prefetch_pump_state(void)
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
if (response->reqid != slot->reqid)
if (response->reqid != slot->reqid && response->tag != T_NeonErrorResponse)
{
dump_prefetch_state();
ereport(PANIC,
(errmsg(NEON_TAG "[shard %d, reqid %lx] pump state receive unexpected response with reqid %lx", slot->shard_no, slot->reqid, response->reqid),
(errmsg(NEON_TAG "[shard %d, reqid %lx] pump state receive unexpected response %d with reqid %lx", slot->shard_no, slot->reqid, response->tag, response->reqid),
errbacktrace()));
}
/* update slot state */
@@ -734,10 +750,13 @@ prefetch_read(PrefetchRequest *slot)
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(slot->shard_no, ERROR,
{
dump_prefetch_state();
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);
}
/*
* Copy the request info so that if an error happens and the prefetch
@@ -757,21 +776,26 @@ prefetch_read(PrefetchRequest *slot)
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
{
dump_prefetch_state();
neon_shard_log(shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
if (response->tag != T_NeonGetPageResponse && response->tag != T_NeonErrorResponse)
{
neon_shard_log(shard_no, ERROR, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
dump_prefetch_state();
neon_shard_log(shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
response->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
}
if (response->reqid != slot->reqid)
{
dump_prefetch_state();
ereport(PANIC,
(errmsg(NEON_TAG "[shard %d, reqid %lx] prefetch_read receive unexpected response with reqid %lx", slot->shard_no, slot->reqid, response->reqid),
(errmsg(NEON_TAG "[shard %d, reqid %lx] prefetch_read receive unexpected response %d with reqid %lx", slot->shard_no, slot->reqid, response->tag, response->reqid),
errbacktrace()));
}
@@ -1396,7 +1420,7 @@ page_server_request(void const *req)
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
neon_log(ERROR, "Unexpected request tag: %d", messageTag(req));
neon_log(PANIC, "Unexpected request tag: %d", messageTag(req));
}
shard_no = get_shard_number(&tag);
@@ -1527,7 +1551,7 @@ nm_pack_request(NeonRequest *msg)
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
default:
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
neon_log(PANIC, "unexpected neon message tag 0x%02x", msg->tag);
break;
}
return s;
@@ -1679,7 +1703,7 @@ nm_unpack_response(StringInfo s)
case T_NeonDbSizeRequest:
case T_NeonGetSlruSegmentRequest:
default:
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
neon_log(PANIC, "unexpected neon message tag 0x%02x", tag);
break;
}

View File

@@ -998,7 +998,7 @@ pageserver_disconnect_shard(shardno_t shard_no)
* to attach wait events to the WaitEventSets.
*/
CLEANUP_AND_DISCONNECT(shard);
neon_log(LOG, "Disconnect shard %d", shard_no);
shard->state = PS_Disconnected;
}

View File

@@ -5,7 +5,7 @@
],
"v16": [
"16.9",
"7a4c0eacaeb9b97416542fa19103061c166460b1"
"85091d9c28958f19f24aee14526735392da11656"
],
"v15": [
"15.13",