|
|
|
|
@@ -260,7 +260,7 @@ typedef struct PrefetchState
|
|
|
|
|
|
|
|
|
|
/* the buffers */
|
|
|
|
|
prfh_hash *prf_hash;
|
|
|
|
|
int max_shard_no;
|
|
|
|
|
int max_unflushed_shard_no;
|
|
|
|
|
/* Mark shards involved in prefetch */
|
|
|
|
|
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
|
|
|
|
|
PrefetchRequest prf_buffer[]; /* prefetch buffers */
|
|
|
|
|
@@ -300,6 +300,7 @@ static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_
|
|
|
|
|
static bool prefetch_wait_for(uint64 ring_index);
|
|
|
|
|
static void prefetch_cleanup_trailing_unused(void);
|
|
|
|
|
static inline void prefetch_set_unused(uint64 ring_index);
|
|
|
|
|
static bool prefetch_flush_requests(void);
|
|
|
|
|
|
|
|
|
|
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
|
|
|
|
|
PrefetchRequest *slot);
|
|
|
|
|
@@ -469,13 +470,26 @@ communicator_prefetch_pump_state(void)
|
|
|
|
|
{
|
|
|
|
|
START_PREFETCH_RECEIVE_WORK();
|
|
|
|
|
|
|
|
|
|
if (MyPState->ring_receive == MyPState->ring_flush && MyPState->ring_flush < MyPState->ring_unused)
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* Flush request to avoid requests pending for arbitrary long time,
|
|
|
|
|
* pinning LSN and holding GC at PS.
|
|
|
|
|
*/
|
|
|
|
|
if (!prefetch_flush_requests())
|
|
|
|
|
{
|
|
|
|
|
END_PREFETCH_RECEIVE_WORK();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
while (MyPState->ring_receive != MyPState->ring_flush)
|
|
|
|
|
{
|
|
|
|
|
NeonResponse *response;
|
|
|
|
|
PrefetchRequest *slot;
|
|
|
|
|
MemoryContext old;
|
|
|
|
|
uint64 my_ring_index = MyPState->ring_receive;
|
|
|
|
|
|
|
|
|
|
slot = GetPrfSlot(MyPState->ring_receive);
|
|
|
|
|
slot = GetPrfSlot(my_ring_index);
|
|
|
|
|
|
|
|
|
|
old = MemoryContextSwitchTo(MyPState->errctx);
|
|
|
|
|
response = page_server->try_receive(slot->shard_no);
|
|
|
|
|
@@ -489,12 +503,12 @@ communicator_prefetch_pump_state(void)
|
|
|
|
|
/* The slot should still be valid */
|
|
|
|
|
if (slot->status != PRFS_REQUESTED ||
|
|
|
|
|
slot->response != NULL ||
|
|
|
|
|
slot->my_ring_index != MyPState->ring_receive)
|
|
|
|
|
slot->my_ring_index != my_ring_index)
|
|
|
|
|
{
|
|
|
|
|
neon_shard_log(slot->shard_no, PANIC,
|
|
|
|
|
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
|
|
|
|
|
slot->status, slot->response,
|
|
|
|
|
slot->my_ring_index, MyPState->ring_receive);
|
|
|
|
|
slot->my_ring_index, my_ring_index);
|
|
|
|
|
}
|
|
|
|
|
/* update prefetch state */
|
|
|
|
|
MyPState->n_responses_buffered += 1;
|
|
|
|
|
@@ -522,6 +536,19 @@ communicator_prefetch_pump_state(void)
|
|
|
|
|
|
|
|
|
|
END_PREFETCH_RECEIVE_WORK();
|
|
|
|
|
|
|
|
|
|
if (RecoveryInProgress())
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* Update backend's min in-flight prefetch LSN.
|
|
|
|
|
*/
|
|
|
|
|
XLogRecPtr min_backend_prefetch_lsn = last_replay_lsn != InvalidXLogRecPtr ? last_replay_lsn : GetXLogReplayRecPtr(NULL);
|
|
|
|
|
for (uint64_t ring_index = MyPState->ring_receive; ring_index < MyPState->ring_unused; ring_index++)
|
|
|
|
|
{
|
|
|
|
|
PrefetchRequest* slot = GetPrfSlot(ring_index);
|
|
|
|
|
min_backend_prefetch_lsn = Min(slot->request_lsns.request_lsn, min_backend_prefetch_lsn);
|
|
|
|
|
}
|
|
|
|
|
MIN_BACKEND_REQUEST_LSN = min_backend_prefetch_lsn;
|
|
|
|
|
}
|
|
|
|
|
communicator_reconfigure_timeout_if_needed();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -561,7 +588,7 @@ readahead_buffer_resize(int newsize, void *extra)
|
|
|
|
|
newPState->ring_last = newsize;
|
|
|
|
|
newPState->ring_unused = newsize;
|
|
|
|
|
newPState->ring_receive = newsize;
|
|
|
|
|
newPState->max_shard_no = MyPState->max_shard_no;
|
|
|
|
|
newPState->max_unflushed_shard_no = MyPState->max_unflushed_shard_no;
|
|
|
|
|
memcpy(newPState->shard_bitmap, MyPState->shard_bitmap, sizeof(MyPState->shard_bitmap));
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@@ -661,6 +688,7 @@ consume_prefetch_responses(void)
|
|
|
|
|
{
|
|
|
|
|
if (MyPState->ring_receive < MyPState->ring_unused)
|
|
|
|
|
prefetch_wait_for(MyPState->ring_unused - 1);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* We know for sure we're not working on any prefetch pages after
|
|
|
|
|
* this.
|
|
|
|
|
@@ -690,7 +718,7 @@ prefetch_cleanup_trailing_unused(void)
|
|
|
|
|
static bool
|
|
|
|
|
prefetch_flush_requests(void)
|
|
|
|
|
{
|
|
|
|
|
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
|
|
|
|
|
for (shardno_t shard_no = 0; shard_no < MyPState->max_unflushed_shard_no; shard_no++)
|
|
|
|
|
{
|
|
|
|
|
if (BITMAP_ISSET(MyPState->shard_bitmap, shard_no))
|
|
|
|
|
{
|
|
|
|
|
@@ -699,7 +727,8 @@ prefetch_flush_requests(void)
|
|
|
|
|
BITMAP_CLR(MyPState->shard_bitmap, shard_no);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
MyPState->max_shard_no = 0;
|
|
|
|
|
MyPState->max_unflushed_shard_no = 0;
|
|
|
|
|
MyPState->ring_flush = MyPState->ring_unused;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -723,7 +752,6 @@ prefetch_wait_for(uint64 ring_index)
|
|
|
|
|
{
|
|
|
|
|
if (!prefetch_flush_requests())
|
|
|
|
|
return false;
|
|
|
|
|
MyPState->ring_flush = MyPState->ring_unused;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Assert(MyPState->ring_unused > ring_index);
|
|
|
|
|
@@ -802,6 +830,7 @@ prefetch_read(PrefetchRequest *slot)
|
|
|
|
|
old = MemoryContextSwitchTo(MyPState->errctx);
|
|
|
|
|
response = (NeonResponse *) page_server->receive(shard_no);
|
|
|
|
|
MemoryContextSwitchTo(old);
|
|
|
|
|
|
|
|
|
|
if (response)
|
|
|
|
|
{
|
|
|
|
|
check_getpage_response(slot, response);
|
|
|
|
|
@@ -1010,11 +1039,16 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
|
|
|
|
Assert(mySlotNo == MyPState->ring_unused);
|
|
|
|
|
|
|
|
|
|
if (force_request_lsns)
|
|
|
|
|
{
|
|
|
|
|
slot->request_lsns = *force_request_lsns;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
|
|
|
|
|
slot->buftag.forkNum, slot->buftag.blockNum,
|
|
|
|
|
&slot->request_lsns, 1);
|
|
|
|
|
last_replay_lsn = InvalidXLogRecPtr;
|
|
|
|
|
}
|
|
|
|
|
request.hdr.lsn = slot->request_lsns.request_lsn;
|
|
|
|
|
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
|
|
|
|
|
|
|
|
|
|
@@ -1033,7 +1067,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
|
|
|
|
MyPState->n_unused -= 1;
|
|
|
|
|
MyPState->ring_unused += 1;
|
|
|
|
|
BITMAP_SET(MyPState->shard_bitmap, slot->shard_no);
|
|
|
|
|
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
|
|
|
|
|
MyPState->max_unflushed_shard_no = Max(slot->shard_no+1, MyPState->max_unflushed_shard_no);
|
|
|
|
|
|
|
|
|
|
/* update slot state */
|
|
|
|
|
slot->status = PRFS_REQUESTED;
|
|
|
|
|
@@ -1041,6 +1075,25 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
|
|
|
|
Assert(!found);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Check that returned page LSN is consistent with request lsns
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
check_page_lsn(NeonGetPageResponse* resp)
|
|
|
|
|
{
|
|
|
|
|
if (neon_protocol_version < 3) /* no information to check */
|
|
|
|
|
return;
|
|
|
|
|
if (PageGetLSN(resp->page) > resp->req.hdr.not_modified_since)
|
|
|
|
|
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than last modified LSN %X/%08X",
|
|
|
|
|
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
|
|
|
|
LSN_FORMAT_ARGS(resp->req.hdr.not_modified_since));
|
|
|
|
|
|
|
|
|
|
if (PageGetLSN(resp->page) > resp->req.hdr.lsn)
|
|
|
|
|
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than request LSN %X/%08X",
|
|
|
|
|
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
|
|
|
|
LSN_FORMAT_ARGS(resp->req.hdr.lsn));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
|
|
|
|
|
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
|
|
|
|
|
@@ -1064,7 +1117,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
|
|
|
|
for (int i = 0; i < nblocks; i++)
|
|
|
|
|
{
|
|
|
|
|
PrfHashEntry *entry;
|
|
|
|
|
|
|
|
|
|
NeonGetPageResponse* resp;
|
|
|
|
|
hashkey.buftag.blockNum = blocknum + i;
|
|
|
|
|
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
|
|
|
|
|
|
|
|
|
@@ -1097,8 +1150,9 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
|
|
|
|
|
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
|
|
|
|
|
|
|
|
|
|
resp = (NeonGetPageResponse*)slot->response;
|
|
|
|
|
check_page_lsn(resp);
|
|
|
|
|
memcpy(buffers[i], resp->page, BLCKSZ);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
|
|
|
|
|
@@ -1391,7 +1445,6 @@ Retry:
|
|
|
|
|
*/
|
|
|
|
|
goto Retry;
|
|
|
|
|
}
|
|
|
|
|
MyPState->ring_flush = MyPState->ring_unused;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return last_ring_index;
|
|
|
|
|
@@ -1461,10 +1514,12 @@ page_server_request(void const *req)
|
|
|
|
|
MyNeonCounters->pageserver_open_requests--;
|
|
|
|
|
} while (resp == NULL);
|
|
|
|
|
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
|
|
|
|
last_replay_lsn = InvalidXLogRecPtr;
|
|
|
|
|
}
|
|
|
|
|
PG_CATCH();
|
|
|
|
|
{
|
|
|
|
|
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
|
|
|
|
last_replay_lsn = InvalidXLogRecPtr;
|
|
|
|
|
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
|
|
|
|
|
HOLD_INTERRUPTS();
|
|
|
|
|
page_server->disconnect(shard_no);
|
|
|
|
|
@@ -1864,6 +1919,13 @@ nm_to_string(NeonMessage *msg)
|
|
|
|
|
return s.data;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
reset_min_request_lsn(int code, Datum arg)
|
|
|
|
|
{
|
|
|
|
|
if (MyProcNumber != -1)
|
|
|
|
|
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* communicator_init() -- Initialize per-backend private state
|
|
|
|
|
*/
|
|
|
|
|
@@ -1875,6 +1937,8 @@ communicator_init(void)
|
|
|
|
|
if (MyPState != NULL)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
before_shmem_exit(reset_min_request_lsn, 0);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Sanity check that the perf counters array is sized correctly. We got
|
|
|
|
|
* this wrong once, and the formula for max number of backends and aux
|
|
|
|
|
@@ -1884,7 +1948,7 @@ communicator_init(void)
|
|
|
|
|
* the check here. That's OK, we don't expect the logic to change in old
|
|
|
|
|
* releases.
|
|
|
|
|
*/
|
|
|
|
|
#if PG_VERSION_NUM>=150000
|
|
|
|
|
#if PG_MAJORVERSION_NUM >= 15
|
|
|
|
|
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
|
|
|
|
|
elog(ERROR, "MyNeonCounters points past end of array");
|
|
|
|
|
#endif
|
|
|
|
|
@@ -2223,6 +2287,7 @@ Retry:
|
|
|
|
|
case T_NeonGetPageResponse:
|
|
|
|
|
{
|
|
|
|
|
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
|
|
|
|
|
check_page_lsn(getpage_resp);
|
|
|
|
|
memcpy(buffer, getpage_resp->page, BLCKSZ);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@@ -2499,12 +2564,30 @@ communicator_reconfigure_timeout_if_needed(void)
|
|
|
|
|
!AmPrewarmWorker && /* do not pump prefetch state in prewarm worker */
|
|
|
|
|
readahead_getpage_pull_timeout_ms > 0;
|
|
|
|
|
|
|
|
|
|
if (!needs_set && MIN_BACKEND_REQUEST_LSN != InvalidXLogRecPtr)
|
|
|
|
|
{
|
|
|
|
|
if (last_replay_lsn == InvalidXLogRecPtr)
|
|
|
|
|
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
|
|
|
|
else
|
|
|
|
|
needs_set = true; /* Can not reset MIN_BACKEND_REQUEST_LSN now, have to do it later */
|
|
|
|
|
}
|
|
|
|
|
if (needs_set != timeout_set)
|
|
|
|
|
{
|
|
|
|
|
/* The background writer doens't (shouldn't) read any pages */
|
|
|
|
|
Assert(!AmBackgroundWriterProcess());
|
|
|
|
|
/* The checkpointer doens't (shouldn't) read any pages */
|
|
|
|
|
Assert(!AmCheckpointerProcess());
|
|
|
|
|
/*
|
|
|
|
|
* The background writer/checkpointer doesn't (shouldn't) read any pages.
|
|
|
|
|
* And definitely they should not run on replica.
|
|
|
|
|
* The only case when we can get here is replica promotion.
|
|
|
|
|
*/
|
|
|
|
|
if (AmBackgroundWriterProcess() || AmCheckpointerProcess())
|
|
|
|
|
{
|
|
|
|
|
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
|
|
|
|
if (timeout_set)
|
|
|
|
|
{
|
|
|
|
|
disable_timeout(PS_TIMEOUT_ID, false);
|
|
|
|
|
timeout_set = false;
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (unlikely(PS_TIMEOUT_ID == 0))
|
|
|
|
|
{
|
|
|
|
|
@@ -2537,14 +2620,6 @@ communicator_reconfigure_timeout_if_needed(void)
|
|
|
|
|
static void
|
|
|
|
|
pagestore_timeout_handler(void)
|
|
|
|
|
{
|
|
|
|
|
#if PG_MAJORVERSION_NUM <= 14
|
|
|
|
|
/*
|
|
|
|
|
* PG14: Setting a repeating timeout is not possible, so we signal here
|
|
|
|
|
* that the timeout has already been reset, and by telling the system
|
|
|
|
|
* that system will re-schedule it later if we need to.
|
|
|
|
|
*/
|
|
|
|
|
timeout_set = false;
|
|
|
|
|
#endif
|
|
|
|
|
timeout_signaled = true;
|
|
|
|
|
InterruptPending = true;
|
|
|
|
|
}
|
|
|
|
|
@@ -2564,6 +2639,14 @@ communicator_processinterrupts(void)
|
|
|
|
|
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
|
|
|
|
|
communicator_prefetch_pump_state();
|
|
|
|
|
|
|
|
|
|
#if PG_MAJORVERSION_NUM <= 14
|
|
|
|
|
/*
|
|
|
|
|
* PG14: Setting a repeating timeout is not possible, so we signal here
|
|
|
|
|
* that the timeout has already been reset, and by telling the system
|
|
|
|
|
* that system will re-schedule it later if we need to.
|
|
|
|
|
*/
|
|
|
|
|
timeout_set = false;
|
|
|
|
|
#endif
|
|
|
|
|
timeout_signaled = false;
|
|
|
|
|
communicator_reconfigure_timeout_if_needed();
|
|
|
|
|
}
|
|
|
|
|
@@ -2573,3 +2656,28 @@ communicator_processinterrupts(void)
|
|
|
|
|
|
|
|
|
|
return prev_interrupt_cb();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
|
|
|
|
|
|
|
|
|
|
Datum
|
|
|
|
|
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
|
|
|
|
|
{
|
|
|
|
|
if (RecoveryInProgress())
|
|
|
|
|
{
|
|
|
|
|
/* Do not hold GC for primary */
|
|
|
|
|
PG_RETURN_INT64(UINT64_MAX);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
XLogRecPtr min_lsn = GetXLogReplayRecPtr(NULL);
|
|
|
|
|
size_t n_procs = ProcGlobal->allProcCount;
|
|
|
|
|
for (size_t i = 0; i < n_procs; i++)
|
|
|
|
|
{
|
|
|
|
|
if (neon_per_backend_counters_shared[i].min_request_lsn != InvalidXLogRecPtr)
|
|
|
|
|
{
|
|
|
|
|
min_lsn = Min(min_lsn, neon_per_backend_counters_shared[i].min_request_lsn);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
PG_RETURN_INT64(min_lsn);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|