mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 17:02:56 +00:00
Fix mistypings
This commit is contained in:
committed by
Konstantin Knizhnik
parent
89496a32d0
commit
32b801ea1c
@@ -83,6 +83,12 @@
|
||||
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
|
||||
##__VA_ARGS__)
|
||||
|
||||
/*
|
||||
* Backend-local minimal in-flight prefetch LSN.
|
||||
* We store it in neon_per_backend_counters_shared and not in separate array to minimize false cache sharing
|
||||
*/
|
||||
#define MIN_BACKEND_PREFETCH_LSN MyNeonCounters->min_prefetch_lsn
|
||||
|
||||
page_server_api *page_server;
|
||||
|
||||
/*
|
||||
@@ -450,6 +456,20 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Update min in-flight prefetch LSN for this backend.
|
||||
*/
|
||||
static void
|
||||
update_min_prefetch_lsn(uint64 ring_index)
|
||||
{
|
||||
if (ring_index + 1 < MyPState->ring_unused)
|
||||
{
|
||||
PrefetchRequest* next_slot = GetPrfSlot(ring_index + 1);
|
||||
Assert(MIN_BACKEND_PREFETCH_LSN <= next_slot->request_lsns.request_lsn);
|
||||
MIN_BACKEND_PREFETCH_LSN = next_slot->request_lsns.request_lsn;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* If there might be responses still in the TCP buffer, then we should try to
|
||||
* use those, to reduce any TCP backpressure on the OS/PS side.
|
||||
@@ -474,8 +494,9 @@ communicator_prefetch_pump_state(void)
|
||||
NeonResponse *response;
|
||||
PrefetchRequest *slot;
|
||||
MemoryContext old;
|
||||
uint64 my_ring_index = MyPState->ring_receive;
|
||||
|
||||
slot = GetPrfSlot(MyPState->ring_receive);
|
||||
slot = GetPrfSlot(my_ring_index);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = page_server->try_receive(slot->shard_no);
|
||||
@@ -484,17 +505,19 @@ communicator_prefetch_pump_state(void)
|
||||
if (response == NULL)
|
||||
break;
|
||||
|
||||
update_min_prefetch_lsn(my_ring_index);
|
||||
|
||||
check_getpage_response(slot, response);
|
||||
|
||||
/* The slot should still be valid */
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
slot->my_ring_index != my_ring_index)
|
||||
{
|
||||
neon_shard_log(slot->shard_no, PANIC,
|
||||
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
|
||||
slot->status, slot->response,
|
||||
slot->my_ring_index, MyPState->ring_receive);
|
||||
slot->my_ring_index, my_ring_index);
|
||||
}
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
@@ -661,6 +684,9 @@ consume_prefetch_responses(void)
|
||||
{
|
||||
if (MyPState->ring_receive < MyPState->ring_unused)
|
||||
prefetch_wait_for(MyPState->ring_unused - 1);
|
||||
|
||||
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */
|
||||
|
||||
/*
|
||||
* We know for sure we're not working on any prefetch pages after
|
||||
* this.
|
||||
@@ -802,6 +828,9 @@ prefetch_read(PrefetchRequest *slot)
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive(shard_no);
|
||||
MemoryContextSwitchTo(old);
|
||||
|
||||
update_min_prefetch_lsn(my_ring_index);
|
||||
|
||||
if (response)
|
||||
{
|
||||
check_getpage_response(slot, response);
|
||||
@@ -920,6 +949,8 @@ prefetch_on_ps_disconnect(void)
|
||||
MyNeonCounters->getpage_prefetch_discards_total += 1;
|
||||
}
|
||||
|
||||
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */
|
||||
|
||||
/*
|
||||
* We can have gone into retry due to network error, so update stats with
|
||||
* the latest available
|
||||
@@ -1021,6 +1052,8 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||
|
||||
MIN_BACKEND_PREFETCH_LSN = Min(request.hdr.lsn, MIN_BACKEND_PREFETCH_LSN);
|
||||
|
||||
while (!page_server->send(slot->shard_no, (NeonRequest *) &request))
|
||||
{
|
||||
Assert(mySlotNo == MyPState->ring_unused);
|
||||
@@ -1041,6 +1074,23 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(!found);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that returned page LSN is consistent with request lsns
|
||||
*/
|
||||
static void
|
||||
check_page_lsn(NeonGetPageResponse* resp)
|
||||
{
|
||||
if (PageGetLSN(resp->page) > resp->req.hdr.not_modified_since)
|
||||
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than last modified LSN %X/%08X",
|
||||
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
||||
LSN_FORMAT_ARGS(resp->req.hdr.not_modified_since));
|
||||
|
||||
if (PageGetLSN(resp->page) > resp->req.hdr.lsn)
|
||||
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than request LSN %X/%08X",
|
||||
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
||||
LSN_FORMAT_ARGS(resp->req.hdr.lsn));
|
||||
}
|
||||
|
||||
/*
|
||||
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
|
||||
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
|
||||
@@ -1064,7 +1114,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
PrfHashEntry *entry;
|
||||
|
||||
NeonGetPageResponse* resp;
|
||||
hashkey.buftag.blockNum = blocknum + i;
|
||||
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
||||
|
||||
@@ -1097,8 +1147,9 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
||||
continue;
|
||||
}
|
||||
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
|
||||
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
|
||||
|
||||
resp = (NeonGetPageResponse*)slot->response;
|
||||
check_page_lsn(resp);
|
||||
memcpy(buffers[i], resp->page, BLCKSZ);
|
||||
|
||||
/*
|
||||
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
|
||||
@@ -1449,6 +1500,7 @@ page_server_request(void const *req)
|
||||
PG_TRY();
|
||||
{
|
||||
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
MIN_BACKEND_PREFETCH_LSN = ((NeonRequest *)req)->lsn;
|
||||
do
|
||||
{
|
||||
while (!page_server->send(shard_no, (NeonRequest *) req)
|
||||
@@ -1460,10 +1512,12 @@ page_server_request(void const *req)
|
||||
resp = page_server->receive(shard_no);
|
||||
MyNeonCounters->pageserver_open_requests--;
|
||||
} while (resp == NULL);
|
||||
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
|
||||
HOLD_INTERRUPTS();
|
||||
@@ -1884,7 +1938,7 @@ communicator_init(void)
|
||||
* the check here. That's OK, we don't expect the logic to change in old
|
||||
* releases.
|
||||
*/
|
||||
#if PG_VERSION_NUM>=150000
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
|
||||
elog(ERROR, "MyNeonCounters points past end of array");
|
||||
#endif
|
||||
@@ -1963,7 +2017,7 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns,
|
||||
* Each request to the pageserver has three LSN values associated with it:
|
||||
* `not_modified_since`, `request_lsn`, and 'effective_request_lsn'.
|
||||
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
|
||||
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
|
||||
* in the primary node, we always use InvalidXLogRecPtr as the `request_lsn`, so
|
||||
* we remember `effective_request_lsn` separately. In a primary,
|
||||
* `effective_request_lsn` is the same as `not_modified_since`.
|
||||
* See comments in neon_get_request_lsns why we can not use last flush WAL position here.
|
||||
@@ -2223,6 +2277,7 @@ Retry:
|
||||
case T_NeonGetPageResponse:
|
||||
{
|
||||
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
|
||||
check_page_lsn(getpage_resp);
|
||||
memcpy(buffer, getpage_resp->page, BLCKSZ);
|
||||
|
||||
/*
|
||||
@@ -2420,15 +2475,18 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
|
||||
PG_TRY();
|
||||
{
|
||||
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
MIN_BACKEND_PREFETCH_LSN = request_lsns->request_lsn;
|
||||
do
|
||||
{
|
||||
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
|
||||
resp = page_server->receive(shard_no);
|
||||
} while (resp == NULL);
|
||||
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
MIN_BACKEND_PREFETCH_LSN = InvalidXLogRecPtr;
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
|
||||
HOLD_INTERRUPTS();
|
||||
@@ -2573,3 +2631,22 @@ communicator_processinterrupts(void)
|
||||
|
||||
return prev_interrupt_cb();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
|
||||
|
||||
Datum
|
||||
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
|
||||
{
|
||||
XLogRecPtr min_lsn = RecoveryInProgress()
|
||||
? GetXLogReplayRecPtr(NULL)
|
||||
: UINT64_MAX;
|
||||
size_t n_procs = ProcGlobal->allProcCount;
|
||||
for (size_t i = 0; i < n_procs; i++)
|
||||
{
|
||||
if (neon_per_backend_counters_shared[i].min_prefetch_lsn != InvalidXLogRecPtr)
|
||||
{
|
||||
min_lsn = Min(min_lsn, neon_per_backend_counters_shared[i].min_prefetch_lsn);
|
||||
}
|
||||
}
|
||||
PG_RETURN_INT64(min_lsn);
|
||||
}
|
||||
|
||||
@@ -156,7 +156,7 @@ typedef struct
|
||||
QTHistogramData query_time_hist;
|
||||
|
||||
/*
|
||||
* Minimal LSBN of infligth prefetch requests
|
||||
* Minimal LSN of infligth prefetch requests
|
||||
*/
|
||||
XLogRecPtr min_prefetch_lsn;
|
||||
} neon_per_backend_counters;
|
||||
|
||||
Reference in New Issue
Block a user