diff --git a/pgxn/neon/communicator.c b/pgxn/neon/communicator.c index 264207164d..e6ec24eaae 100644 --- a/pgxn/neon/communicator.c +++ b/pgxn/neon/communicator.c @@ -83,12 +83,6 @@ neon_shard_log(shard_no, elvl, "Broken connection state: " message, \ ##__VA_ARGS__) -/* - * Backend-local minimal in-flight prefetch LSN. - * We store it in neon_per_backend_counters_shared and not in separate array to minimize false cache sharing - */ -#define MIN_BACKEND_PREFETCH_LSN MyNeonCounters->min_prefetch_lsn - page_server_api *page_server; /* @@ -456,20 +450,6 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp) } } -/* - * Update min in-flight prefetch LSN for this backend. - */ -static void -update_min_prefetch_lsn(uint64 ring_index) -{ - if (ring_index + 1 < MyPState->ring_unused) - { - PrefetchRequest* next_slot = GetPrfSlot(ring_index + 1); - Assert(MIN_BACKEND_PREFETCH_LSN <= next_slot->request_lsns.request_lsn); - MIN_BACKEND_PREFETCH_LSN = next_slot->request_lsns.request_lsn; - } -} - /* * If there might be responses still in the TCP buffer, then we should try to * use those, to reduce any TCP backpressure on the OS/PS side. @@ -505,8 +485,6 @@ communicator_prefetch_pump_state(void) if (response == NULL) break; - update_min_prefetch_lsn(my_ring_index); - check_getpage_response(slot, response); /* The slot should still be valid */ @@ -545,6 +523,19 @@ communicator_prefetch_pump_state(void) END_PREFETCH_RECEIVE_WORK(); + if (RecoveryInProgress()) + { + /* + * Update backend's min in-flight prefetch LSN. + */ + XLogRecPtr min_prefetch_lsn = GetXLogReplayRecPtr(NULL); + for (uint64_t ring_index = MyPState->ring_receive; ring_index < MyPState->ring_unused; ring_index++) + { + PrefetchRequest* slot = GetPrfSlot(ring_index); + min_prefetch_lsn = Min(slot->request_lsns.request_lsn, min_prefetch_lsn); + } + MIN_BACKEND_PREFETCH_LSN = min_prefetch_lsn; + } communicator_reconfigure_timeout_if_needed(); } @@ -685,8 +676,6 @@ consume_prefetch_responses(void) if (MyPState->ring_receive < MyPState->ring_unused) prefetch_wait_for(MyPState->ring_unused - 1); - MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */ - /* * We know for sure we're not working on any prefetch pages after * this. @@ -829,8 +818,6 @@ prefetch_read(PrefetchRequest *slot) response = (NeonResponse *) page_server->receive(shard_no); MemoryContextSwitchTo(old); - update_min_prefetch_lsn(my_ring_index); - if (response) { check_getpage_response(slot, response); @@ -949,8 +936,6 @@ prefetch_on_ps_disconnect(void) MyNeonCounters->getpage_prefetch_discards_total += 1; } - MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */ - /* * We can have gone into retry due to network error, so update stats with * the latest available @@ -1052,8 +1037,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns Assert(slot->response == NULL); Assert(slot->my_ring_index == MyPState->ring_unused); - MIN_BACKEND_PREFETCH_LSN = Min(request.hdr.lsn, MIN_BACKEND_PREFETCH_LSN); - while (!page_server->send(slot->shard_no, (NeonRequest *) &request)) { Assert(mySlotNo == MyPState->ring_unused); @@ -1502,7 +1485,6 @@ page_server_request(void const *req) PG_TRY(); { before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); - MIN_BACKEND_PREFETCH_LSN = ((NeonRequest *)req)->lsn; do { while (!page_server->send(shard_no, (NeonRequest *) req) @@ -1514,12 +1496,10 @@ page_server_request(void const *req) resp = page_server->receive(shard_no); MyNeonCounters->pageserver_open_requests--; } while (resp == NULL); - MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); } PG_CATCH(); { - MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); /* Nothing should cancel disconnect: we should not leave connection in opaque state */ HOLD_INTERRUPTS(); @@ -2019,7 +1999,7 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns, * Each request to the pageserver has three LSN values associated with it: * `not_modified_since`, `request_lsn`, and 'effective_request_lsn'. * `not_modified_since` and `request_lsn` are sent to the pageserver, but - * in the primary node, we always use InvalidXLogRecPtr as the `request_lsn`, so + * in the primary node, we always use UINT64_MAX as the `request_lsn`, so * we remember `effective_request_lsn` separately. In a primary, * `effective_request_lsn` is the same as `not_modified_since`. * See comments in neon_get_request_lsns why we can not use last flush WAL position here. @@ -2477,18 +2457,15 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re PG_TRY(); { before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); - MIN_BACKEND_PREFETCH_LSN = request_lsns->request_lsn; do { while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no)); resp = page_server->receive(shard_no); } while (resp == NULL); - MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); } PG_CATCH(); { - MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no)); /* Nothing should cancel disconnect: we should not leave connection in opaque state */ HOLD_INTERRUPTS(); @@ -2639,16 +2616,22 @@ PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn); Datum neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS) { - XLogRecPtr min_lsn = RecoveryInProgress() - ? GetXLogReplayRecPtr(NULL) - : UINT64_MAX; - size_t n_procs = ProcGlobal->allProcCount; - for (size_t i = 0; i < n_procs; i++) + if (RecoveryInProgress()) { - if (neon_per_backend_counters_shared[i].min_prefetch_lsn != InvalidXLogRecPtr) - { - min_lsn = Min(min_lsn, neon_per_backend_counters_shared[i].min_prefetch_lsn); - } + /* Do not hold GC for primary */ + PG_RETURN_INT64(UINT64_MAX); + } + else + { + XLogRecPtr min_lsn = GetXLogReplayRecPtr(NULL); + size_t n_procs = ProcGlobal->allProcCount; + for (size_t i = 0; i < n_procs; i++) + { + if (neon_per_backend_counters_shared[i].min_prefetch_lsn != InvalidXLogRecPtr) + { + min_lsn = Min(min_lsn, neon_per_backend_counters_shared[i].min_prefetch_lsn); + } + } + PG_RETURN_INT64(min_lsn); } - PG_RETURN_INT64(min_lsn); } diff --git a/pgxn/neon/neon_perf_counters.h b/pgxn/neon/neon_perf_counters.h index ce93f3fbb4..7602caf88e 100644 --- a/pgxn/neon/neon_perf_counters.h +++ b/pgxn/neon/neon_perf_counters.h @@ -174,6 +174,12 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared; #define MyNeonCounters (&neon_per_backend_counters_shared[MyProcNumber]) +/* + * Backend-local minimal in-flight prefetch LSN. + * We store it in neon_per_backend_counters_shared and not in separate array to minimize false cache sharing + */ +#define MIN_BACKEND_PREFETCH_LSN MyNeonCounters->min_prefetch_lsn + extern void inc_getpage_wait(uint64 latency); extern void inc_page_cache_read_wait(uint64 latency); extern void inc_page_cache_write_wait(uint64 latency); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index ab6f2a5d67..50b8e64abd 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -588,6 +588,9 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, /* Request the page at the end of the last fully replayed LSN. */ XLogRecPtr replay_lsn = GetXLogReplayRecPtr(NULL); + if (MIN_BACKEND_PREFETCH_LSN == InvalidXLogRecPtr) + MIN_BACKEND_PREFETCH_LSN = replay_lsn; + for (int i = 0; i < nblocks; i++) { neon_request_lsns *result = &output[i];