Add neon_communicator_min_inflight_request_lsn function to neon extension

This commit is contained in:
Kosntantin Knizhnik
2025-07-16 14:53:26 +03:00
parent 855b6ea6aa
commit b147722e93
8 changed files with 69 additions and 35 deletions

View File

@@ -38,6 +38,8 @@ DATA = \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.7.sql \
neon--1.7--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \

View File

@@ -302,6 +302,14 @@ static PrefetchState *MyPState;
)
static process_interrupts_callback_t prev_interrupt_cb;
/*
* Array in shared memory each cell of which contains minimal in-flight request LSN sent to PS by the backend which procno is
* used as index in this array. This array is initially filled with InfiniteXlogRecPtr (UINT64_MAX) so if backend
* didn't send any request to PS, then this value doesn't effect global min.
*
* We support only 64-bit platforms and so assume that access to array elements is atomic and no any synchronization is needed.
*/
static XLogRecPtr* minPrefetchLsn;
static bool compact_prefetch_buffers(void);
@@ -329,7 +337,12 @@ pg_init_communicator(void)
static Size
CommunicatorShmemSize(void)
{
#if PG_MAJORVERSION_NUM >= 15
Assert(MaxBackends != 0);
return (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr);
#else
return (MAX_BACKENDS + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr);
#endif
}
void
@@ -347,7 +360,10 @@ CommunicatorShmemInit(void)
&found);
if (!found)
{
/* Fill with MAX_UINT64 */
/*
* Fill with InfiniteXLogRecPtr (UINT64_MAX).
* If backend didn't send any requests to PS, then InfiniteXLogRecPtr doesn't affect global minimal value.
*/
memset(minPrefetchLsn, 0xFF, CommunicatorShmemSize());
}
}
@@ -491,6 +507,20 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
}
}
/*
* Update min in-flight prefetch LSN for this backend.
*/
static void
update_min_prefetch_lsn(uint64 ring_index)
{
if (ring_index + 1 < MyPState->ring_unused)
{
PrefetchRequest* next_slot = GetPrfSlot(ring_index + 1);
Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn);
minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn;
}
}
/*
* If there might be responses still in the TCP buffer, then we should try to
* use those, to reduce any TCP backpressure on the OS/PS side.
@@ -526,13 +556,7 @@ communicator_prefetch_pump_state(void)
if (response == NULL)
break;
/* Update min in-flight prefetch reqwuest LSN */
if (my_ring_index + 1 < MyPState->ring_unused)
{
PrefetchRequest* next_slot = GetPrfSlot(my_ring_index + 1);
Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn);
minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn;
}
update_min_prefetch_lsn(my_ring_index);
check_getpage_response(slot, response);
@@ -622,7 +646,7 @@ readahead_buffer_resize(int newsize, void *extra)
* iteration on the dataset, and trivial compaction.
*/
for (end = MyPState->ring_unused - 1;
end >= MyPState->ring_last && end != UINT64_MAX && nfree != 0;
end >= MyPState->ring_last && end != InfiniteXLogRecPtr && nfree != 0;
end -= 1)
{
PrefetchRequest *slot = GetPrfSlot(end);
@@ -668,7 +692,7 @@ readahead_buffer_resize(int newsize, void *extra)
MyNeonCounters->pageserver_open_requests =
MyPState->n_requests_inflight;
for (; end >= MyPState->ring_last && end != UINT64_MAX; end -= 1)
for (; end >= MyPState->ring_last && end != InfiniteXLogRecPtr; end -= 1)
{
PrefetchRequest *slot = GetPrfSlot(end);
Assert(slot->status != PRFS_REQUESTED);
@@ -712,7 +736,7 @@ consume_prefetch_responses(void)
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
minPrefetchLsn[MyProcNumber] = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr; /* No more in-flight prefetch requests from this backend */
/*
* We know for sure we're not working on any prefetch pages after
@@ -856,13 +880,7 @@ prefetch_read(PrefetchRequest *slot)
response = (NeonResponse *) page_server->receive(shard_no);
MemoryContextSwitchTo(old);
/* Update min in-flight prefetch reqwuest LSN */
if (my_ring_index + 1 < MyPState->ring_unused)
{
PrefetchRequest* next_slot = GetPrfSlot(my_ring_index + 1);
Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn);
minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn;
}
update_min_prefetch_lsn(my_ring_index);
if (response)
{
@@ -982,7 +1000,7 @@ prefetch_on_ps_disconnect(void)
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
minPrefetchLsn[MyProcNumber] = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr; /* No more in-flight prefetch requests from this backend */
/*
* We can have gone into retry due to network error, so update stats with
@@ -1117,7 +1135,7 @@ check_page_lsn(NeonGetPageResponse* resp, XLogRecPtr* replay_lsn_ptr)
if (RecoveryInProgress())
{
XLogRecPtr page_lsn = PageGetLSN((Page)resp->page);
#if PG_VERSION_NUM >= 150000
#if PG_MAJORVERSION_NUM >= 15
XLogRecPtr replay_lsn = GetCurrentReplayRecPtr(NULL);
#else
/*
@@ -1286,7 +1304,7 @@ Retry:
MyPState->ring_unused - MyPState->ring_receive;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
last_ring_index = UINT64_MAX;
last_ring_index = InfiniteXLogRecPtr;
for (int i = 0; i < nblocks; i++)
{
@@ -1475,7 +1493,7 @@ Retry:
MyPState->ring_unused - MyPState->ring_receive;
Assert(any_hits);
Assert(last_ring_index != UINT64_MAX);
Assert(last_ring_index != InfiniteXLogRecPtr);
Assert(GetPrfSlot(last_ring_index)->status == PRFS_REQUESTED ||
GetPrfSlot(last_ring_index)->status == PRFS_RECEIVED);
@@ -1986,7 +2004,7 @@ communicator_init(void)
* the check here. That's OK, we don't expect the logic to change in old
* releases.
*/
#if PG_VERSION_NUM>=150000
#if PG_MAJORVERSION_NUM >= 15
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
elog(ERROR, "MyNeonCounters points past end of array");
#endif
@@ -2065,7 +2083,7 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns,
* Each request to the pageserver has three LSN values associated with it:
* `not_modified_since`, `request_lsn`, and 'effective_request_lsn'.
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
* in the primary node, we always use InfiniteXLogRecPtr as the `request_lsn`, so
* we remember `effective_request_lsn` separately. In a primary,
* `effective_request_lsn` is the same as `not_modified_since`.
* See comments in neon_get_request_lsns why we can not use last flush WAL position here.
@@ -2290,7 +2308,7 @@ Retry:
if (entry == NULL)
{
ring_index = prefetch_register_bufferv(hashkey.buftag, reqlsns, 1, NULL, false);
Assert(ring_index != UINT64_MAX);
Assert(ring_index != InfiniteXLogRecPtr);
slot = GetPrfSlot(ring_index);
}
else
@@ -2685,13 +2703,18 @@ communicator_processinterrupts(void)
return prev_interrupt_cb();
}
XLogRecPtr communicator_get_min_prefetch_lsn(void)
PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
Datum
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
{
XLogRecPtr min_lsn = GetXLogReplayRecPtr(NULL);
XLogRecPtr min_lsn = RecoveryInProgress()
? GetXLogReplayRecPtr(NULL)
: InfiniteXLogRecPtr;
size_t n_procs = ProcGlobal->allProcCount;
for (size_t i = 0; i < n_procs; i++)
{
min_lsn = Min(min_lsn, minPrefetchLsn[i]);
}
return min_lsn;
PG_RETURN_INT64(min_lsn);
}

View File

@@ -46,6 +46,4 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
extern void communicator_reconfigure_timeout_if_needed(void);
extern void communicator_prefetch_pump_state(void);
extern XLogRecPtr communicator_get_min_prefetch_lsn(void);
#endif

View File

@@ -344,6 +344,9 @@ LfcShmemInit(void)
bool found;
static HASHCTL info;
if (lfc_max_size <= 0)
return;
lfc_ctl = (FileCacheControl *) ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
if (!found)
{
@@ -392,8 +395,11 @@ LfcShmemInit(void)
void
LfcShmemRequest(void)
{
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
RequestNamedLWLockTranche("lfc_lock", 1);
if (lfc_max_size > 0)
{
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
RequestNamedLWLockTranche("lfc_lock", 1);
}
}
static bool

View File

@@ -0,0 +1,3 @@
create function neon_communicator_min_inflight_request_lsn() returns pg_lsn
AS 'MODULE_PATHNAME', 'neon_communicator_min_inflight_request_lsn'
LANGUAGE C;

View File

@@ -0,0 +1 @@
drop function neon_communicator_min_inflight_request_lsn();

View File

@@ -58,6 +58,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
#define InfiniteXLogRecPtr UINT64_MAX
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);

View File

@@ -675,7 +675,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* always have that problem as the can always lag behind the
* primary, but for the primary we can avoid it by always
* requesting the latest page, by setting request LSN to
* UINT64_MAX.
* InfiniteXLogRecPtr.
*
* effective_request_lsn is used to check that received response is still valid.
* In case of primary node it is last written LSN. Originally we used flush_lsn here,
@@ -703,7 +703,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* The problem can be fixed by callingGetFlushRecPtr() before checking if the page is in the buffer cache.
* But you can't do that within smgrprefetch(), would need to modify the caller.
*/
result->request_lsn = UINT64_MAX;
result->request_lsn = InfiniteXLogRecPtr;
result->not_modified_since = last_written_lsn;
result->effective_request_lsn = last_written_lsn;
}
@@ -2158,7 +2158,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
request_lsn = nm_adjust_lsn(request_lsn);
}
else
request_lsn = UINT64_MAX;
request_lsn = InfiniteXLogRecPtr;
/*
* GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU