From 32b801ea1c67271a70226b94e34e2c9f2cfd7422 Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Thu, 17 Jul 2025 17:02:41 +0300
Subject: [PATCH] Fix typos

---
 pgxn/neon/communicator.c       | 93 +++++++++++++++++++++++++++++++---
 pgxn/neon/neon_perf_counters.h |  2 +-
 2 files changed, 86 insertions(+), 9 deletions(-)

diff --git a/pgxn/neon/communicator.c b/pgxn/neon/communicator.c
index 4c03193d7e..5626b91aa7 100644
--- a/pgxn/neon/communicator.c
+++ b/pgxn/neon/communicator.c
@@ -83,6 +83,12 @@
 	neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
 				   ##__VA_ARGS__)
 
+/*
+ * Backend-local minimum in-flight prefetch LSN.
+ * We store it in neon_per_backend_counters_shared rather than in a separate array to minimize false sharing of cache lines.
+ */
+#define MIN_BACKEND_PREFETCH_LSN MyNeonCounters->min_prefetch_lsn
+
 page_server_api *page_server;
 
 /*
@@ -450,6 +456,20 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
 	}
 }
 
+/*
+ * Update the minimum in-flight prefetch LSN for this backend.
+ */
+static void
+update_min_prefetch_lsn(uint64 ring_index)
+{
+	if (ring_index + 1 < MyPState->ring_unused)
+	{
+		PrefetchRequest* next_slot = GetPrfSlot(ring_index + 1);
+		Assert(MIN_BACKEND_PREFETCH_LSN <= next_slot->request_lsns.request_lsn);
+		MIN_BACKEND_PREFETCH_LSN = next_slot->request_lsns.request_lsn;
+	}
+}
+
 /*
  * If there might be responses still in the TCP buffer, then we should try to
  * use those, to reduce any TCP backpressure on the OS/PS side.
@@ -474,8 +494,9 @@ communicator_prefetch_pump_state(void)
 		NeonResponse *response;
 		PrefetchRequest *slot;
 		MemoryContext old;
+		uint64 my_ring_index = MyPState->ring_receive;
 
-		slot = GetPrfSlot(MyPState->ring_receive);
+		slot = GetPrfSlot(my_ring_index);
 
 		old = MemoryContextSwitchTo(MyPState->errctx);
 		response = page_server->try_receive(slot->shard_no);
@@ -484,17 +505,19 @@ communicator_prefetch_pump_state(void)
 		if (response == NULL)
 			break;
 
+		update_min_prefetch_lsn(my_ring_index);
+
 		check_getpage_response(slot, response);
 
 		/* The slot should still be valid */
 		if (slot->status != PRFS_REQUESTED ||
 			slot->response != NULL ||
-			slot->my_ring_index != MyPState->ring_receive)
+			slot->my_ring_index != my_ring_index)
 		{
 			neon_shard_log(slot->shard_no, PANIC,
 						   "Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
 						   slot->status, slot->response,
-						   slot->my_ring_index, MyPState->ring_receive);
+						   slot->my_ring_index, my_ring_index);
 		}
 		/* update prefetch state */
 		MyPState->n_responses_buffered += 1;
@@ -661,6 +684,9 @@ consume_prefetch_responses(void)
 {
 	if (MyPState->ring_receive < MyPState->ring_unused)
 		prefetch_wait_for(MyPState->ring_unused - 1);
+
+	MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */
+
 	/*
 	 * We know for sure we're not working on any prefetch pages after
 	 * this.
@@ -802,6 +828,9 @@ prefetch_read(PrefetchRequest *slot)
 	old = MemoryContextSwitchTo(MyPState->errctx);
 	response = (NeonResponse *) page_server->receive(shard_no);
 	MemoryContextSwitchTo(old);
+
+	update_min_prefetch_lsn(my_ring_index);
+
 	if (response)
 	{
 		check_getpage_response(slot, response);
@@ -920,6 +949,8 @@ prefetch_on_ps_disconnect(void)
 		MyNeonCounters->getpage_prefetch_discards_total += 1;
 	}
 
+	MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */
+
 	/*
 	 * We can have gone into retry due to network error, so update stats with
 	 * the latest available
@@ -1021,6 +1052,8 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
 	Assert(slot->response == NULL);
 	Assert(slot->my_ring_index == MyPState->ring_unused);
 
+	MIN_BACKEND_PREFETCH_LSN = Min(request.hdr.lsn, MIN_BACKEND_PREFETCH_LSN);
+
 	while (!page_server->send(slot->shard_no, (NeonRequest *) &request))
 	{
 		Assert(mySlotNo == MyPState->ring_unused);
@@ -1041,6 +1074,23 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
 	Assert(!found);
 }
 
+/*
+ * Check that the returned page LSN is consistent with the request LSNs
+ */
+static void
+check_page_lsn(NeonGetPageResponse* resp)
+{
+	if (PageGetLSN(resp->page) > resp->req.hdr.not_modified_since)
+		neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than last modified LSN %X/%08X",
+				 LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
+				 LSN_FORMAT_ARGS(resp->req.hdr.not_modified_since));
+
+	if (PageGetLSN(resp->page) > resp->req.hdr.lsn)
+		neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than request LSN %X/%08X",
+				 LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
+				 LSN_FORMAT_ARGS(resp->req.hdr.lsn));
+}
+
 /*
  * Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
  * Present pages are marked in "mask" bitmap and total number of such pages is returned.
@@ -1064,7 +1114,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
 	for (int i = 0; i < nblocks; i++)
 	{
 		PrfHashEntry *entry;
-
+		NeonGetPageResponse* resp;
 		hashkey.buftag.blockNum = blocknum + i;
 
 		entry = prfh_lookup(MyPState->prf_hash, &hashkey);
@@ -1097,8 +1147,9 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
 			continue;
 		}
 		Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
-		memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
-
+		resp = (NeonGetPageResponse*)slot->response;
+		check_page_lsn(resp);
+		memcpy(buffers[i], resp->page, BLCKSZ);
 
 		/*
 		 * With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
@@ -1449,6 +1500,7 @@ page_server_request(void const *req)
 	PG_TRY();
 	{
 		before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
+		MIN_BACKEND_PREFETCH_LSN = ((NeonRequest *)req)->lsn;
 		do
 		{
 			while (!page_server->send(shard_no, (NeonRequest *) req)
@@ -1460,10 +1512,12 @@ page_server_request(void const *req)
 			resp = page_server->receive(shard_no);
 			MyNeonCounters->pageserver_open_requests--;
 		} while (resp == NULL);
+		MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
 		cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
 	}
 	PG_CATCH();
 	{
+		MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
 		cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
 		/* Nothing should cancel disconnect: we should not leave connection in opaque state */
 		HOLD_INTERRUPTS();
@@ -1884,7 +1938,7 @@ communicator_init(void)
 	 * the check here. That's OK, we don't expect the logic to change in old
 	 * releases.
 	 */
-#if PG_VERSION_NUM>=150000
+#if PG_MAJORVERSION_NUM >= 15
 	if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
 		elog(ERROR, "MyNeonCounters points past end of array");
 #endif
@@ -1963,7 +2017,7 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns,
 	 * Each request to the pageserver has three LSN values associated with it:
 	 * `not_modified_since`, `request_lsn`, and 'effective_request_lsn'.
 	 * `not_modified_since` and `request_lsn` are sent to the pageserver, but
-	 * in the primary node, we always use UINT64_MAX as the `request_lsn`, so
+	 * in the primary node, we always use InvalidXLogRecPtr as the `request_lsn`, so
 	 * we remember `effective_request_lsn` separately. In a primary,
 	 * `effective_request_lsn` is the same as `not_modified_since`.
 	 * See comments in neon_get_request_lsns why we can not use last flush WAL position here.
@@ -2223,6 +2277,7 @@ Retry:
 		case T_NeonGetPageResponse:
 		{
 			NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
+			check_page_lsn(getpage_resp);
 			memcpy(buffer, getpage_resp->page, BLCKSZ);
 
 			/*
@@ -2420,15 +2475,18 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
 	PG_TRY();
 	{
 		before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
+		MIN_BACKEND_PREFETCH_LSN = request_lsns->request_lsn;
 		do
 		{
 			while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
 			resp = page_server->receive(shard_no);
 		} while (resp == NULL);
+		MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
 		cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
 	}
 	PG_CATCH();
 	{
+		MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
 		cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
 		/* Nothing should cancel disconnect: we should not leave connection in opaque state */
 		HOLD_INTERRUPTS();
@@ -2573,3 +2631,22 @@ communicator_processinterrupts(void)
 
 	return prev_interrupt_cb();
 }
+
+PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
+
+Datum
+neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr min_lsn = RecoveryInProgress()
+		? GetXLogReplayRecPtr(NULL)
+		: UINT64_MAX;
+	size_t n_procs = ProcGlobal->allProcCount;
+	for (size_t i = 0; i < n_procs; i++)
+	{
+		if (neon_per_backend_counters_shared[i].min_prefetch_lsn != InvalidXLogRecPtr)
+		{
+			min_lsn = Min(min_lsn, neon_per_backend_counters_shared[i].min_prefetch_lsn);
+		}
+	}
+	PG_RETURN_INT64(min_lsn);
+}
diff --git a/pgxn/neon/neon_perf_counters.h b/pgxn/neon/neon_perf_counters.h
index 3e12313469..ce93f3fbb4 100644
--- a/pgxn/neon/neon_perf_counters.h
+++ b/pgxn/neon/neon_perf_counters.h
@@ -156,7 +156,7 @@ typedef struct
 	QTHistogramData query_time_hist;
 
 	/*
-	 * Minimal LSBN of infligth prefetch requests
+	 * Minimal LSN of in-flight prefetch requests
	 */
 	XLogRecPtr min_prefetch_lsn;
 } neon_per_backend_counters;
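Note on the "false sharing" remark in the comment added near the top of communicator.c: keeping the per-backend minimum inside neon_per_backend_counters_shared means that each backend's frequent updates land in cache lines only that backend writes, whereas a separate densely packed XLogRecPtr array would place eight backends' entries on every 64-byte cache line. Below is a minimal stand-alone C sketch of the two layouts; the typedef, array size, and placeholder counters are assumptions for illustration only and are not part of this patch.

#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's XLogRecPtr */
#define MAX_BACKENDS 128		/* hypothetical number of backend slots */

/*
 * Alternative layout (not used): a dense array.  Eight adjacent 8-byte
 * entries share each 64-byte cache line, so an update by one backend
 * invalidates the line for up to seven unrelated backends (false sharing).
 */
static XLogRecPtr min_prefetch_lsn_dense[MAX_BACKENDS];

/*
 * Layout chosen by the patch: the field sits inside the per-backend counters
 * struct, which already spans several cache lines per backend, so updates
 * mostly stay within that backend's own cache lines.  The counters here are
 * placeholders; the real struct is neon_per_backend_counters in
 * pgxn/neon/neon_perf_counters.h.
 */
typedef struct
{
	uint64_t placeholder_counters[32];	/* stands in for the existing counters */
	XLogRecPtr min_prefetch_lsn;		/* field added by this patch */
} per_backend_counters_sketch;

static per_backend_counters_sketch counters_shared_sketch[MAX_BACKENDS];

int
main(void)
{
	/* The per-backend stride is what differs between the two layouts. */
	printf("dense array stride:     %zu bytes\n", sizeof(min_prefetch_lsn_dense[0]));
	printf("counters struct stride: %zu bytes\n", sizeof(counters_shared_sketch[0]));
	return 0;
}

The dense array gives an 8-byte stride (many backends per cache line); the counters struct gives a stride of a few hundred bytes, so each backend's writes stay on lines no other backend updates.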