Implement new approach of calculating min in-flight LSN in prefetch_pump_state

This commit is contained in:
Konstantin Knizhnik
2025-07-18 19:04:16 +03:00
committed by Konstantin Knizhnik
parent b41b85f8ec
commit 9bfba1b087
3 changed files with 39 additions and 47 deletions

View File

@@ -83,12 +83,6 @@
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
##__VA_ARGS__)
/*
* Backend-local minimal in-flight prefetch LSN.
* It is stored in neon_per_backend_counters_shared rather than in a separate array, to minimize false sharing of cache lines.
*/
#define MIN_BACKEND_PREFETCH_LSN MyNeonCounters->min_prefetch_lsn
page_server_api *page_server;
/*
@@ -456,20 +450,6 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
}
}
/*
 * Advance this backend's published minimal in-flight prefetch LSN past the
 * slot at 'ring_index'.
 *
 * If a slot with index 'ring_index + 1' has already been handed out (i.e. the
 * index is below MyPState->ring_unused), its request LSN becomes the new
 * MIN_BACKEND_PREFETCH_LSN. Otherwise the published value is left untouched.
 * The assertion checks that the published value never moves backwards.
 */
static void
update_min_prefetch_lsn(uint64 ring_index)
{
	uint64		following_index = ring_index + 1;
	PrefetchRequest *following_slot;

	/* Nothing queued after this slot: keep the current value. */
	if (following_index >= MyPState->ring_unused)
		return;

	following_slot = GetPrfSlot(following_index);
	Assert(MIN_BACKEND_PREFETCH_LSN <= following_slot->request_lsns.request_lsn);
	MIN_BACKEND_PREFETCH_LSN = following_slot->request_lsns.request_lsn;
}
/*
* If there might be responses still in the TCP buffer, then we should try to
* use those, to reduce any TCP backpressure on the OS/PS side.
@@ -505,8 +485,6 @@ communicator_prefetch_pump_state(void)
if (response == NULL)
break;
update_min_prefetch_lsn(my_ring_index);
check_getpage_response(slot, response);
/* The slot should still be valid */
@@ -545,6 +523,19 @@ communicator_prefetch_pump_state(void)
END_PREFETCH_RECEIVE_WORK();
if (RecoveryInProgress())
{
/*
* Update backend's min in-flight prefetch LSN.
*/
XLogRecPtr min_prefetch_lsn = GetXLogReplayRecPtr(NULL);
for (uint64_t ring_index = MyPState->ring_receive; ring_index < MyPState->ring_unused; ring_index++)
{
PrefetchRequest* slot = GetPrfSlot(ring_index);
min_prefetch_lsn = Min(slot->request_lsns.request_lsn, min_prefetch_lsn);
}
MIN_BACKEND_PREFETCH_LSN = min_prefetch_lsn;
}
communicator_reconfigure_timeout_if_needed();
}
@@ -685,8 +676,6 @@ consume_prefetch_responses(void)
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */
/*
* We know for sure we're not working on any prefetch pages after
* this.
@@ -829,8 +818,6 @@ prefetch_read(PrefetchRequest *slot)
response = (NeonResponse *) page_server->receive(shard_no);
MemoryContextSwitchTo(old);
update_min_prefetch_lsn(my_ring_index);
if (response)
{
check_getpage_response(slot, response);
@@ -949,8 +936,6 @@ prefetch_on_ps_disconnect(void)
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
@@ -1052,8 +1037,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
MIN_BACKEND_PREFETCH_LSN = Min(request.hdr.lsn, MIN_BACKEND_PREFETCH_LSN);
while (!page_server->send(slot->shard_no, (NeonRequest *) &request))
{
Assert(mySlotNo == MyPState->ring_unused);
@@ -1502,7 +1485,6 @@ page_server_request(void const *req)
PG_TRY();
{
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
MIN_BACKEND_PREFETCH_LSN = ((NeonRequest *)req)->lsn;
do
{
while (!page_server->send(shard_no, (NeonRequest *) req)
@@ -1514,12 +1496,10 @@ page_server_request(void const *req)
resp = page_server->receive(shard_no);
MyNeonCounters->pageserver_open_requests--;
} while (resp == NULL);
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
@@ -2019,7 +1999,7 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns,
* Each request to the pageserver has three LSN values associated with it:
* `not_modified_since`, `request_lsn`, and 'effective_request_lsn'.
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
* in the primary node, we always use InvalidXLogRecPtr as the `request_lsn`, so
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
* we remember `effective_request_lsn` separately. In a primary,
* `effective_request_lsn` is the same as `not_modified_since`.
* See comments in neon_get_request_lsns why we can not use last flush WAL position here.
@@ -2477,18 +2457,15 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
PG_TRY();
{
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
MIN_BACKEND_PREFETCH_LSN = request_lsns->request_lsn;
do
{
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
resp = page_server->receive(shard_no);
} while (resp == NULL);
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
@@ -2639,16 +2616,22 @@ PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
/*
 * SQL-callable function returning the minimal LSN among the in-flight
 * prefetch requests published by the backends of this node.
 *
 * On a primary (not in recovery) UINT64_MAX is returned immediately, so the
 * primary never holds back GC. On a replica the result starts at the current
 * replay LSN and is lowered to the smallest min_prefetch_lsn found in
 * neon_per_backend_counters_shared. Entries equal to InvalidXLogRecPtr denote
 * backends without in-flight prefetch requests and are skipped.
 *
 * NOTE(review): the per-backend values are read without any lock, as in the
 * original code; presumably a slightly stale value is acceptable here --
 * confirm against the consumer of this function.
 */
Datum
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
{
	if (!RecoveryInProgress())
	{
		/* Do not hold GC for primary */
		PG_RETURN_INT64(UINT64_MAX);
	}

	/* Replica: nothing can be requested at an LSN newer than replay LSN. */
	XLogRecPtr	min_lsn = GetXLogReplayRecPtr(NULL);
	size_t		n_procs = ProcGlobal->allProcCount;

	for (size_t i = 0; i < n_procs; i++)
	{
		XLogRecPtr	backend_lsn = neon_per_backend_counters_shared[i].min_prefetch_lsn;

		/* InvalidXLogRecPtr: this backend has no in-flight prefetch requests */
		if (backend_lsn != InvalidXLogRecPtr)
		{
			min_lsn = Min(min_lsn, backend_lsn);
		}
	}
	PG_RETURN_INT64(min_lsn);
}

View File

@@ -174,6 +174,12 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProcNumber])
/*
* Backend-local minimal in-flight prefetch LSN.
* It is stored in neon_per_backend_counters_shared rather than in a separate array, to minimize false sharing of cache lines.
*/
#define MIN_BACKEND_PREFETCH_LSN MyNeonCounters->min_prefetch_lsn
extern void inc_getpage_wait(uint64 latency);
extern void inc_page_cache_read_wait(uint64 latency);
extern void inc_page_cache_write_wait(uint64 latency);

View File

@@ -588,6 +588,9 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
/* Request the page at the end of the last fully replayed LSN. */
XLogRecPtr replay_lsn = GetXLogReplayRecPtr(NULL);
if (MIN_BACKEND_PREFETCH_LSN == InvalidXLogRecPtr)
MIN_BACKEND_PREFETCH_LSN = replay_lsn;
for (int i = 0; i < nblocks; i++)
{
neon_request_lsns *result = &output[i];