From b147722e93a40eaeec2fc5225f1226ef79cf1773 Mon Sep 17 00:00:00 2001 From: Kosntantin Knizhnik Date: Wed, 16 Jul 2025 14:53:26 +0300 Subject: [PATCH] Add neon_communicator_min_inflight_request_lsn function to neon extension --- pgxn/neon/Makefile | 2 + pgxn/neon/communicator.c | 79 +++++++++++++++++++++++------------- pgxn/neon/communicator.h | 2 - pgxn/neon/file_cache.c | 10 ++++- pgxn/neon/neon--1.6--1.7.sql | 3 ++ pgxn/neon/neon--1.7--1.6.sql | 1 + pgxn/neon/neon.h | 1 + pgxn/neon/pagestore_smgr.c | 6 +-- 8 files changed, 69 insertions(+), 35 deletions(-) create mode 100644 pgxn/neon/neon--1.6--1.7.sql create mode 100644 pgxn/neon/neon--1.7--1.6.sql diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index bf7aeb4108..d1ff4ef6a1 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -38,6 +38,8 @@ DATA = \ neon--1.3--1.4.sql \ neon--1.4--1.5.sql \ neon--1.5--1.6.sql \ + neon--1.6--1.7.sql \ + neon--1.7--1.6.sql \ neon--1.6--1.5.sql \ neon--1.5--1.4.sql \ neon--1.4--1.3.sql \ diff --git a/pgxn/neon/communicator.c b/pgxn/neon/communicator.c index ddbdaa1c97..d4ed9db3dd 100644 --- a/pgxn/neon/communicator.c +++ b/pgxn/neon/communicator.c @@ -302,6 +302,14 @@ static PrefetchState *MyPState; ) static process_interrupts_callback_t prev_interrupt_cb; + +/* + * Array in shared memory each cell of which contains minimal in-flight request LSN sent to PS by the backend which procno is + * used as index in this array. This array is initially filled with InfiniteXlogRecPtr (UINT64_MAX) so if backend + * didn't send any request to PS, then this value doesn't effect global min. + * + * We support only 64-bit platforms and so assume that access to array elements is atomic and no any synchronization is needed. + */ static XLogRecPtr* minPrefetchLsn; static bool compact_prefetch_buffers(void); @@ -329,7 +337,12 @@ pg_init_communicator(void) static Size CommunicatorShmemSize(void) { +#if PG_MAJORVERSION_NUM >= 15 + Assert(MaxBackends != 0); + return (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr); +#else return (MAX_BACKENDS + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr); +#endif } void @@ -347,7 +360,10 @@ CommunicatorShmemInit(void) &found); if (!found) { - /* Fill with MAX_UINT64 */ + /* + * Fill with InfiniteXLogRecPtr (UINT64_MAX). + * If backend didn't send any requests to PS, then InfiniteXLogRecPtr doesn't affect global minimal value. + */ memset(minPrefetchLsn, 0xFF, CommunicatorShmemSize()); } } @@ -491,6 +507,20 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp) } } +/* + * Update min in-flight prefetch LSN for this backend. + */ +static void +update_min_prefetch_lsn(uint64 ring_index) +{ + if (ring_index + 1 < MyPState->ring_unused) + { + PrefetchRequest* next_slot = GetPrfSlot(ring_index + 1); + Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn); + minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn; + } +} + /* * If there might be responses still in the TCP buffer, then we should try to * use those, to reduce any TCP backpressure on the OS/PS side. @@ -526,13 +556,7 @@ communicator_prefetch_pump_state(void) if (response == NULL) break; - /* Update min in-flight prefetch reqwuest LSN */ - if (my_ring_index + 1 < MyPState->ring_unused) - { - PrefetchRequest* next_slot = GetPrfSlot(my_ring_index + 1); - Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn); - minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn; - } + update_min_prefetch_lsn(my_ring_index); check_getpage_response(slot, response); @@ -622,7 +646,7 @@ readahead_buffer_resize(int newsize, void *extra) * iteration on the dataset, and trivial compaction. */ for (end = MyPState->ring_unused - 1; - end >= MyPState->ring_last && end != UINT64_MAX && nfree != 0; + end >= MyPState->ring_last && end != InfiniteXLogRecPtr && nfree != 0; end -= 1) { PrefetchRequest *slot = GetPrfSlot(end); @@ -668,7 +692,7 @@ readahead_buffer_resize(int newsize, void *extra) MyNeonCounters->pageserver_open_requests = MyPState->n_requests_inflight; - for (; end >= MyPState->ring_last && end != UINT64_MAX; end -= 1) + for (; end >= MyPState->ring_last && end != InfiniteXLogRecPtr; end -= 1) { PrefetchRequest *slot = GetPrfSlot(end); Assert(slot->status != PRFS_REQUESTED); @@ -712,7 +736,7 @@ consume_prefetch_responses(void) if (MyPState->ring_receive < MyPState->ring_unused) prefetch_wait_for(MyPState->ring_unused - 1); - minPrefetchLsn[MyProcNumber] = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */ + minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr; /* No more in-flight prefetch requests from this backend */ /* * We know for sure we're not working on any prefetch pages after @@ -856,13 +880,7 @@ prefetch_read(PrefetchRequest *slot) response = (NeonResponse *) page_server->receive(shard_no); MemoryContextSwitchTo(old); - /* Update min in-flight prefetch reqwuest LSN */ - if (my_ring_index + 1 < MyPState->ring_unused) - { - PrefetchRequest* next_slot = GetPrfSlot(my_ring_index + 1); - Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn); - minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn; - } + update_min_prefetch_lsn(my_ring_index); if (response) { @@ -982,7 +1000,7 @@ prefetch_on_ps_disconnect(void) MyNeonCounters->getpage_prefetch_discards_total += 1; } - minPrefetchLsn[MyProcNumber] = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */ + minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr; /* No more in-flight prefetch requests from this backend */ /* * We can have gone into retry due to network error, so update stats with @@ -1117,7 +1135,7 @@ check_page_lsn(NeonGetPageResponse* resp, XLogRecPtr* replay_lsn_ptr) if (RecoveryInProgress()) { XLogRecPtr page_lsn = PageGetLSN((Page)resp->page); -#if PG_VERSION_NUM >= 150000 +#if PG_MAJORVERSION_NUM >= 15 XLogRecPtr replay_lsn = GetCurrentReplayRecPtr(NULL); #else /* @@ -1286,7 +1304,7 @@ Retry: MyPState->ring_unused - MyPState->ring_receive; MyNeonCounters->getpage_prefetches_buffered = MyPState->n_responses_buffered; - last_ring_index = UINT64_MAX; + last_ring_index = InfiniteXLogRecPtr; for (int i = 0; i < nblocks; i++) { @@ -1475,7 +1493,7 @@ Retry: MyPState->ring_unused - MyPState->ring_receive; Assert(any_hits); - Assert(last_ring_index != UINT64_MAX); + Assert(last_ring_index != InfiniteXLogRecPtr); Assert(GetPrfSlot(last_ring_index)->status == PRFS_REQUESTED || GetPrfSlot(last_ring_index)->status == PRFS_RECEIVED); @@ -1986,7 +2004,7 @@ communicator_init(void) * the check here. That's OK, we don't expect the logic to change in old * releases. */ -#if PG_VERSION_NUM>=150000 +#if PG_MAJORVERSION_NUM >= 15 if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS]) elog(ERROR, "MyNeonCounters points past end of array"); #endif @@ -2065,7 +2083,7 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns, * Each request to the pageserver has three LSN values associated with it: * `not_modified_since`, `request_lsn`, and 'effective_request_lsn'. * `not_modified_since` and `request_lsn` are sent to the pageserver, but - * in the primary node, we always use UINT64_MAX as the `request_lsn`, so + * in the primary node, we always use InfiniteXLogRecPtr as the `request_lsn`, so * we remember `effective_request_lsn` separately. In a primary, * `effective_request_lsn` is the same as `not_modified_since`. * See comments in neon_get_request_lsns why we can not use last flush WAL position here. @@ -2290,7 +2308,7 @@ Retry: if (entry == NULL) { ring_index = prefetch_register_bufferv(hashkey.buftag, reqlsns, 1, NULL, false); - Assert(ring_index != UINT64_MAX); + Assert(ring_index != InfiniteXLogRecPtr); slot = GetPrfSlot(ring_index); } else @@ -2685,13 +2703,18 @@ communicator_processinterrupts(void) return prev_interrupt_cb(); } -XLogRecPtr communicator_get_min_prefetch_lsn(void) +PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn); + +Datum +neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS) { - XLogRecPtr min_lsn = GetXLogReplayRecPtr(NULL); + XLogRecPtr min_lsn = RecoveryInProgress() + ? GetXLogReplayRecPtr(NULL) + : InfiniteXLogRecPtr; size_t n_procs = ProcGlobal->allProcCount; for (size_t i = 0; i < n_procs; i++) { min_lsn = Min(min_lsn, minPrefetchLsn[i]); } - return min_lsn; + PG_RETURN_INT64(min_lsn); } diff --git a/pgxn/neon/communicator.h b/pgxn/neon/communicator.h index 4f5dc30f15..9563c7421b 100644 --- a/pgxn/neon/communicator.h +++ b/pgxn/neon/communicator.h @@ -46,6 +46,4 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno, extern void communicator_reconfigure_timeout_if_needed(void); extern void communicator_prefetch_pump_state(void); -extern XLogRecPtr communicator_get_min_prefetch_lsn(void); - #endif diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 54c941ce7a..7cfa769959 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -344,6 +344,9 @@ LfcShmemInit(void) bool found; static HASHCTL info; + if (lfc_max_size <= 0) + return; + lfc_ctl = (FileCacheControl *) ShmemInitStruct("lfc", sizeof(FileCacheControl), &found); if (!found) { @@ -392,8 +395,11 @@ LfcShmemInit(void) void LfcShmemRequest(void) { - RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE)); - RequestNamedLWLockTranche("lfc_lock", 1); + if (lfc_max_size > 0) + { + RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE)); + RequestNamedLWLockTranche("lfc_lock", 1); + } } static bool diff --git a/pgxn/neon/neon--1.6--1.7.sql b/pgxn/neon/neon--1.6--1.7.sql new file mode 100644 index 0000000000..6954849787 --- /dev/null +++ b/pgxn/neon/neon--1.6--1.7.sql @@ -0,0 +1,3 @@ +create function neon_communicator_min_inflight_request_lsn() returns pg_lsn +AS 'MODULE_PATHNAME', 'neon_communicator_min_inflight_request_lsn' +LANGUAGE C; diff --git a/pgxn/neon/neon--1.7--1.6.sql b/pgxn/neon/neon--1.7--1.6.sql new file mode 100644 index 0000000000..08f7022d84 --- /dev/null +++ b/pgxn/neon/neon--1.7--1.6.sql @@ -0,0 +1 @@ +drop function neon_communicator_min_inflight_request_lsn(); diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index e0a5acf50f..cb61ce8462 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -58,6 +58,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL; (errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \ errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) +#define InfiniteXLogRecPtr UINT64_MAX extern void pg_init_libpagestore(void); extern void pg_init_walproposer(void); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 9d25266e10..554a9d4b33 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -675,7 +675,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, * always have that problem as the can always lag behind the * primary, but for the primary we can avoid it by always * requesting the latest page, by setting request LSN to - * UINT64_MAX. + * InfiniteXLogRecPtr. * * effective_request_lsn is used to check that received response is still valid. * In case of primary node it is last written LSN. Originally we used flush_lsn here, @@ -703,7 +703,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, * The problem can be fixed by callingGetFlushRecPtr() before checking if the page is in the buffer cache. * But you can't do that within smgrprefetch(), would need to modify the caller. */ - result->request_lsn = UINT64_MAX; + result->request_lsn = InfiniteXLogRecPtr; result->not_modified_since = last_written_lsn; result->effective_request_lsn = last_written_lsn; } @@ -2158,7 +2158,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf request_lsn = nm_adjust_lsn(request_lsn); } else - request_lsn = UINT64_MAX; + request_lsn = InfiniteXLogRecPtr; /* * GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU