mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
Compare commits
26 Commits
bodobolero
...
min_inflig
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3fe7f8802a | ||
|
|
7841303df9 | ||
|
|
0e9d3fd2b1 | ||
|
|
b2a87b501f | ||
|
|
06417f2ff9 | ||
|
|
252876515c | ||
|
|
d56a72afec | ||
|
|
3847ab73a7 | ||
|
|
3e5bbe7027 | ||
|
|
546a45f57a | ||
|
|
588cb289d5 | ||
|
|
b947deb07c | ||
|
|
2f455baa73 | ||
|
|
f2e65e1d2c | ||
|
|
df49def453 | ||
|
|
1ef0f71a95 | ||
|
|
96df649858 | ||
|
|
9bfba1b087 | ||
|
|
b41b85f8ec | ||
|
|
32b801ea1c | ||
|
|
89496a32d0 | ||
|
|
9617b8d328 | ||
|
|
a504516b8c | ||
|
|
fe7a4e1ab6 | ||
|
|
40cae8cc36 | ||
|
|
02fc8b7c70 |
@@ -48,6 +48,8 @@ DATA = \
|
||||
neon--1.3--1.4.sql \
|
||||
neon--1.4--1.5.sql \
|
||||
neon--1.5--1.6.sql \
|
||||
neon--1.6--1.7.sql \
|
||||
neon--1.7--1.6.sql \
|
||||
neon--1.6--1.5.sql \
|
||||
neon--1.5--1.4.sql \
|
||||
neon--1.4--1.3.sql \
|
||||
|
||||
@@ -79,10 +79,6 @@
|
||||
#include "access/xlogrecovery.h"
|
||||
#endif
|
||||
|
||||
#if PG_VERSION_NUM < 160000
|
||||
typedef PGAlignedBlock PGIOAlignedBlock;
|
||||
#endif
|
||||
|
||||
#define NEON_PANIC_CONNECTION_STATE(shard_no, elvl, message, ...) \
|
||||
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
|
||||
##__VA_ARGS__)
|
||||
@@ -264,7 +260,7 @@ typedef struct PrefetchState
|
||||
|
||||
/* the buffers */
|
||||
prfh_hash *prf_hash;
|
||||
int max_shard_no;
|
||||
int max_unflushed_shard_no;
|
||||
/* Mark shards involved in prefetch */
|
||||
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
|
||||
PrefetchRequest prf_buffer[]; /* prefetch buffers */
|
||||
@@ -304,6 +300,7 @@ static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_
|
||||
static bool prefetch_wait_for(uint64 ring_index);
|
||||
static void prefetch_cleanup_trailing_unused(void);
|
||||
static inline void prefetch_set_unused(uint64 ring_index);
|
||||
static bool prefetch_flush_requests(void);
|
||||
|
||||
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
|
||||
PrefetchRequest *slot);
|
||||
@@ -473,13 +470,26 @@ communicator_prefetch_pump_state(void)
|
||||
{
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
if (MyPState->ring_receive == MyPState->ring_flush && MyPState->ring_flush < MyPState->ring_unused)
|
||||
{
|
||||
/*
|
||||
* Flush request to avoid requests pending for arbitrary long time,
|
||||
* pinning LSN and holding GC at PS.
|
||||
*/
|
||||
if (!prefetch_flush_requests())
|
||||
{
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
return;
|
||||
}
|
||||
}
|
||||
while (MyPState->ring_receive != MyPState->ring_flush)
|
||||
{
|
||||
NeonResponse *response;
|
||||
PrefetchRequest *slot;
|
||||
MemoryContext old;
|
||||
uint64 my_ring_index = MyPState->ring_receive;
|
||||
|
||||
slot = GetPrfSlot(MyPState->ring_receive);
|
||||
slot = GetPrfSlot(my_ring_index);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = page_server->try_receive(slot->shard_no);
|
||||
@@ -493,12 +503,12 @@ communicator_prefetch_pump_state(void)
|
||||
/* The slot should still be valid */
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
slot->my_ring_index != my_ring_index)
|
||||
{
|
||||
neon_shard_log(slot->shard_no, PANIC,
|
||||
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
|
||||
slot->status, slot->response,
|
||||
slot->my_ring_index, MyPState->ring_receive);
|
||||
slot->my_ring_index, my_ring_index);
|
||||
}
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
@@ -526,6 +536,19 @@ communicator_prefetch_pump_state(void)
|
||||
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
if (RecoveryInProgress())
|
||||
{
|
||||
/*
|
||||
* Update backend's min in-flight prefetch LSN.
|
||||
*/
|
||||
XLogRecPtr min_backend_prefetch_lsn = last_replay_lsn != InvalidXLogRecPtr ? last_replay_lsn : GetXLogReplayRecPtr(NULL);
|
||||
for (uint64_t ring_index = MyPState->ring_receive; ring_index < MyPState->ring_unused; ring_index++)
|
||||
{
|
||||
PrefetchRequest* slot = GetPrfSlot(ring_index);
|
||||
min_backend_prefetch_lsn = Min(slot->request_lsns.request_lsn, min_backend_prefetch_lsn);
|
||||
}
|
||||
MIN_BACKEND_REQUEST_LSN = min_backend_prefetch_lsn;
|
||||
}
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
}
|
||||
|
||||
@@ -565,7 +588,7 @@ readahead_buffer_resize(int newsize, void *extra)
|
||||
newPState->ring_last = newsize;
|
||||
newPState->ring_unused = newsize;
|
||||
newPState->ring_receive = newsize;
|
||||
newPState->max_shard_no = MyPState->max_shard_no;
|
||||
newPState->max_unflushed_shard_no = MyPState->max_unflushed_shard_no;
|
||||
memcpy(newPState->shard_bitmap, MyPState->shard_bitmap, sizeof(MyPState->shard_bitmap));
|
||||
|
||||
/*
|
||||
@@ -665,6 +688,7 @@ consume_prefetch_responses(void)
|
||||
{
|
||||
if (MyPState->ring_receive < MyPState->ring_unused)
|
||||
prefetch_wait_for(MyPState->ring_unused - 1);
|
||||
|
||||
/*
|
||||
* We know for sure we're not working on any prefetch pages after
|
||||
* this.
|
||||
@@ -694,7 +718,7 @@ prefetch_cleanup_trailing_unused(void)
|
||||
static bool
|
||||
prefetch_flush_requests(void)
|
||||
{
|
||||
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
|
||||
for (shardno_t shard_no = 0; shard_no < MyPState->max_unflushed_shard_no; shard_no++)
|
||||
{
|
||||
if (BITMAP_ISSET(MyPState->shard_bitmap, shard_no))
|
||||
{
|
||||
@@ -703,7 +727,8 @@ prefetch_flush_requests(void)
|
||||
BITMAP_CLR(MyPState->shard_bitmap, shard_no);
|
||||
}
|
||||
}
|
||||
MyPState->max_shard_no = 0;
|
||||
MyPState->max_unflushed_shard_no = 0;
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -727,7 +752,6 @@ prefetch_wait_for(uint64 ring_index)
|
||||
{
|
||||
if (!prefetch_flush_requests())
|
||||
return false;
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
Assert(MyPState->ring_unused > ring_index);
|
||||
@@ -806,6 +830,7 @@ prefetch_read(PrefetchRequest *slot)
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive(shard_no);
|
||||
MemoryContextSwitchTo(old);
|
||||
|
||||
if (response)
|
||||
{
|
||||
check_getpage_response(slot, response);
|
||||
@@ -1014,11 +1039,16 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(mySlotNo == MyPState->ring_unused);
|
||||
|
||||
if (force_request_lsns)
|
||||
{
|
||||
slot->request_lsns = *force_request_lsns;
|
||||
}
|
||||
else
|
||||
{
|
||||
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum, slot->buftag.blockNum,
|
||||
&slot->request_lsns, 1);
|
||||
last_replay_lsn = InvalidXLogRecPtr;
|
||||
}
|
||||
request.hdr.lsn = slot->request_lsns.request_lsn;
|
||||
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
|
||||
|
||||
@@ -1037,7 +1067,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
MyPState->n_unused -= 1;
|
||||
MyPState->ring_unused += 1;
|
||||
BITMAP_SET(MyPState->shard_bitmap, slot->shard_no);
|
||||
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
|
||||
MyPState->max_unflushed_shard_no = Max(slot->shard_no+1, MyPState->max_unflushed_shard_no);
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_REQUESTED;
|
||||
@@ -1045,6 +1075,25 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(!found);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that returned page LSN is consistent with request lsns
|
||||
*/
|
||||
static void
|
||||
check_page_lsn(NeonGetPageResponse* resp)
|
||||
{
|
||||
if (neon_protocol_version < 3) /* no information to check */
|
||||
return;
|
||||
if (PageGetLSN(resp->page) > resp->req.hdr.not_modified_since)
|
||||
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than last modified LSN %X/%08X",
|
||||
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
||||
LSN_FORMAT_ARGS(resp->req.hdr.not_modified_since));
|
||||
|
||||
if (PageGetLSN(resp->page) > resp->req.hdr.lsn)
|
||||
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than request LSN %X/%08X",
|
||||
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
|
||||
LSN_FORMAT_ARGS(resp->req.hdr.lsn));
|
||||
}
|
||||
|
||||
/*
|
||||
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
|
||||
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
|
||||
@@ -1068,7 +1117,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
PrfHashEntry *entry;
|
||||
|
||||
NeonGetPageResponse* resp;
|
||||
hashkey.buftag.blockNum = blocknum + i;
|
||||
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
||||
|
||||
@@ -1101,8 +1150,9 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
|
||||
continue;
|
||||
}
|
||||
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
|
||||
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
|
||||
|
||||
resp = (NeonGetPageResponse*)slot->response;
|
||||
check_page_lsn(resp);
|
||||
memcpy(buffers[i], resp->page, BLCKSZ);
|
||||
|
||||
/*
|
||||
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
|
||||
@@ -1395,7 +1445,6 @@ Retry:
|
||||
*/
|
||||
goto Retry;
|
||||
}
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
return last_ring_index;
|
||||
@@ -1465,10 +1514,12 @@ page_server_request(void const *req)
|
||||
MyNeonCounters->pageserver_open_requests--;
|
||||
} while (resp == NULL);
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
last_replay_lsn = InvalidXLogRecPtr;
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
|
||||
last_replay_lsn = InvalidXLogRecPtr;
|
||||
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
|
||||
HOLD_INTERRUPTS();
|
||||
page_server->disconnect(shard_no);
|
||||
@@ -1868,6 +1919,13 @@ nm_to_string(NeonMessage *msg)
|
||||
return s.data;
|
||||
}
|
||||
|
||||
static void
|
||||
reset_min_request_lsn(int code, Datum arg)
|
||||
{
|
||||
if (MyProcNumber != -1)
|
||||
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
||||
}
|
||||
|
||||
/*
|
||||
* communicator_init() -- Initialize per-backend private state
|
||||
*/
|
||||
@@ -1879,6 +1937,8 @@ communicator_init(void)
|
||||
if (MyPState != NULL)
|
||||
return;
|
||||
|
||||
before_shmem_exit(reset_min_request_lsn, 0);
|
||||
|
||||
/*
|
||||
* Sanity check that theperf counters array is sized correctly. We got
|
||||
* this wrong once, and the formula for max number of backends and aux
|
||||
@@ -1888,7 +1948,7 @@ communicator_init(void)
|
||||
* the check here. That's OK, we don't expect the logic to change in old
|
||||
* releases.
|
||||
*/
|
||||
#if PG_VERSION_NUM>=150000
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
|
||||
elog(ERROR, "MyNeonCounters points past end of array");
|
||||
#endif
|
||||
@@ -2227,6 +2287,7 @@ Retry:
|
||||
case T_NeonGetPageResponse:
|
||||
{
|
||||
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
|
||||
check_page_lsn(getpage_resp);
|
||||
memcpy(buffer, getpage_resp->page, BLCKSZ);
|
||||
|
||||
/*
|
||||
@@ -2503,12 +2564,30 @@ communicator_reconfigure_timeout_if_needed(void)
|
||||
!AmPrewarmWorker && /* do not pump prefetch state in prewarm worker */
|
||||
readahead_getpage_pull_timeout_ms > 0;
|
||||
|
||||
if (!needs_set && MIN_BACKEND_REQUEST_LSN != InvalidXLogRecPtr)
|
||||
{
|
||||
if (last_replay_lsn == InvalidXLogRecPtr)
|
||||
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
||||
else
|
||||
needs_set = true; /* Can not reset MIN_BACKEND_REQUEST_LSN now, have to do it later */
|
||||
}
|
||||
if (needs_set != timeout_set)
|
||||
{
|
||||
/* The background writer doens't (shouldn't) read any pages */
|
||||
Assert(!AmBackgroundWriterProcess());
|
||||
/* The checkpointer doens't (shouldn't) read any pages */
|
||||
Assert(!AmCheckpointerProcess());
|
||||
/*
|
||||
* The background writer/checkpointer doens't (shouldn't) read any pages.
|
||||
* And definitely they should not run on replica.
|
||||
* The only case when we can get here is replica promotion.
|
||||
*/
|
||||
if (AmBackgroundWriterProcess() || AmCheckpointerProcess())
|
||||
{
|
||||
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
|
||||
if (timeout_set)
|
||||
{
|
||||
disable_timeout(PS_TIMEOUT_ID, false);
|
||||
timeout_set = false;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (unlikely(PS_TIMEOUT_ID == 0))
|
||||
{
|
||||
@@ -2541,14 +2620,6 @@ communicator_reconfigure_timeout_if_needed(void)
|
||||
static void
|
||||
pagestore_timeout_handler(void)
|
||||
{
|
||||
#if PG_MAJORVERSION_NUM <= 14
|
||||
/*
|
||||
* PG14: Setting a repeating timeout is not possible, so we signal here
|
||||
* that the timeout has already been reset, and by telling the system
|
||||
* that system will re-schedule it later if we need to.
|
||||
*/
|
||||
timeout_set = false;
|
||||
#endif
|
||||
timeout_signaled = true;
|
||||
InterruptPending = true;
|
||||
}
|
||||
@@ -2568,6 +2639,14 @@ communicator_processinterrupts(void)
|
||||
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#if PG_MAJORVERSION_NUM <= 14
|
||||
/*
|
||||
* PG14: Setting a repeating timeout is not possible, so we signal here
|
||||
* that the timeout has already been reset, and by telling the system
|
||||
* that system will re-schedule it later if we need to.
|
||||
*/
|
||||
timeout_set = false;
|
||||
#endif
|
||||
timeout_signaled = false;
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
}
|
||||
@@ -2577,3 +2656,28 @@ communicator_processinterrupts(void)
|
||||
|
||||
return prev_interrupt_cb();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
|
||||
|
||||
Datum
|
||||
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
|
||||
{
|
||||
if (RecoveryInProgress())
|
||||
{
|
||||
/* Do not hold GC for primary */
|
||||
PG_RETURN_INT64(UINT64_MAX);
|
||||
}
|
||||
else
|
||||
{
|
||||
XLogRecPtr min_lsn = GetXLogReplayRecPtr(NULL);
|
||||
size_t n_procs = ProcGlobal->allProcCount;
|
||||
for (size_t i = 0; i < n_procs; i++)
|
||||
{
|
||||
if (neon_per_backend_counters_shared[i].min_request_lsn != InvalidXLogRecPtr)
|
||||
{
|
||||
min_lsn = Min(min_lsn, neon_per_backend_counters_shared[i].min_request_lsn);
|
||||
}
|
||||
}
|
||||
PG_RETURN_INT64(min_lsn);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -635,6 +635,11 @@ lfc_init(void)
|
||||
NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* Dump a list of pages that are currently in the LFC
|
||||
*
|
||||
* This is used to get a snapshot that can be used to prewarm the LFC later.
|
||||
*/
|
||||
FileCacheState*
|
||||
lfc_get_state(size_t max_entries)
|
||||
{
|
||||
@@ -2267,4 +2272,3 @@ get_prewarm_info(PG_FUNCTION_ARGS)
|
||||
|
||||
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
|
||||
}
|
||||
|
||||
|
||||
3
pgxn/neon/neon--1.6--1.7.sql
Normal file
3
pgxn/neon/neon--1.6--1.7.sql
Normal file
@@ -0,0 +1,3 @@
|
||||
create function neon_communicator_min_inflight_request_lsn() returns pg_catalog.pg_lsn
|
||||
AS 'MODULE_PATHNAME', 'neon_communicator_min_inflight_request_lsn'
|
||||
LANGUAGE C;
|
||||
1
pgxn/neon/neon--1.7--1.6.sql
Normal file
1
pgxn/neon/neon--1.7--1.6.sql
Normal file
@@ -0,0 +1 @@
|
||||
drop function neon_communicator_min_inflight_request_lsn();
|
||||
@@ -1,7 +1,7 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* neon.c
|
||||
* Main entry point into the neon exension
|
||||
* Main entry point into the neon extension
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@@ -508,7 +508,7 @@ _PG_init(void)
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"neon.disable_logical_replication_subscribers",
|
||||
"Disables incomming logical replication",
|
||||
"Disable incoming logical replication",
|
||||
NULL,
|
||||
&disable_logical_replication_subscribers,
|
||||
false,
|
||||
@@ -567,7 +567,7 @@ _PG_init(void)
|
||||
|
||||
DefineCustomEnumVariable(
|
||||
"neon.debug_compare_local",
|
||||
"Debug mode for compaing content of pages in prefetch ring/LFC/PS and local disk",
|
||||
"Debug mode for comparing content of pages in prefetch ring/LFC/PS and local disk",
|
||||
NULL,
|
||||
&debug_compare_local,
|
||||
DEBUG_COMPARE_LOCAL_NONE,
|
||||
@@ -735,7 +735,6 @@ neon_shmem_request_hook(void)
|
||||
static void
|
||||
neon_shmem_startup_hook(void)
|
||||
{
|
||||
/* Initialize */
|
||||
if (prev_shmem_startup_hook)
|
||||
prev_shmem_startup_hook();
|
||||
|
||||
|
||||
@@ -42,7 +42,6 @@ NeonPerfCountersShmemRequest(void)
|
||||
}
|
||||
|
||||
|
||||
|
||||
void
|
||||
NeonPerfCountersShmemInit(void)
|
||||
{
|
||||
|
||||
@@ -154,6 +154,11 @@ typedef struct
|
||||
* Histogram of query execution time.
|
||||
*/
|
||||
QTHistogramData query_time_hist;
|
||||
|
||||
/*
|
||||
* Minimal LSN of in-fligth request requests
|
||||
*/
|
||||
XLogRecPtr min_request_lsn;
|
||||
} neon_per_backend_counters;
|
||||
|
||||
/* Pointer to the shared memory array of neon_per_backend_counters structs */
|
||||
@@ -167,11 +172,13 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
|
||||
*/
|
||||
#define NUM_NEON_PERF_COUNTER_SLOTS (MaxBackends + NUM_AUXILIARY_PROCS)
|
||||
|
||||
#if PG_VERSION_NUM >= 170000
|
||||
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProcNumber])
|
||||
#else
|
||||
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProc->pgprocno])
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Backend-local minimal in-flight request LSN.
|
||||
* We store it in neon_per_backend_counters_shared and not in separate array to minimize false cache sharing
|
||||
*/
|
||||
#define MIN_BACKEND_REQUEST_LSN MyNeonCounters->min_request_lsn
|
||||
|
||||
extern void inc_getpage_wait(uint64 latency);
|
||||
extern void inc_page_cache_read_wait(uint64 latency);
|
||||
|
||||
@@ -9,6 +9,10 @@
|
||||
#include "fmgr.h"
|
||||
#include "storage/buf_internals.h"
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
typedef PGAlignedBlock PGIOAlignedBlock;
|
||||
#endif
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
#define NRelFileInfoBackendIsTemp(rinfo) (rinfo.backend != InvalidBackendId)
|
||||
#else
|
||||
@@ -158,6 +162,10 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
|
||||
#define AmAutoVacuumWorkerProcess() (IsAutoVacuumWorkerProcess())
|
||||
#endif
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
#define MyProcNumber (MyProc - &ProcGlobal->allProcs[0])
|
||||
#endif
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 15
|
||||
extern void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags);
|
||||
extern TimeLineID GetWALInsertionTimeLine(void);
|
||||
|
||||
@@ -243,6 +243,7 @@ extern char *neon_timeline;
|
||||
extern char *neon_tenant;
|
||||
extern int32 max_cluster_size;
|
||||
extern int neon_protocol_version;
|
||||
extern XLogRecPtr last_replay_lsn;
|
||||
|
||||
extern shardno_t get_shard_number(BufferTag* tag);
|
||||
|
||||
|
||||
@@ -72,10 +72,6 @@
|
||||
#include "access/xlogrecovery.h"
|
||||
#endif
|
||||
|
||||
#if PG_VERSION_NUM < 160000
|
||||
typedef PGAlignedBlock PGIOAlignedBlock;
|
||||
#endif
|
||||
|
||||
#include "access/nbtree.h"
|
||||
#include "storage/bufpage.h"
|
||||
#include "access/xlog_internal.h"
|
||||
@@ -100,6 +96,8 @@ typedef enum
|
||||
|
||||
int debug_compare_local;
|
||||
|
||||
XLogRecPtr last_replay_lsn;
|
||||
|
||||
static NRelFileInfo unlogged_build_rel_info;
|
||||
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
|
||||
@@ -163,7 +161,7 @@ log_newpages_copy(NRelFileInfo * rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
page_std);
|
||||
}
|
||||
|
||||
return ProcLastRecPtr;
|
||||
return GetXLogInsertRecPtr();
|
||||
}
|
||||
#endif /* PG_MAJORVERSION_NUM >= 17 */
|
||||
|
||||
@@ -592,6 +590,17 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
/* Request the page at the end of the last fully replayed LSN. */
|
||||
XLogRecPtr replay_lsn = GetXLogReplayRecPtr(NULL);
|
||||
|
||||
if (MIN_BACKEND_REQUEST_LSN == InvalidXLogRecPtr)
|
||||
{
|
||||
/* mark the backend's replay_lsn as "we have a request ongoing", blocking the expiration of any current LSN */
|
||||
MIN_BACKEND_REQUEST_LSN = replay_lsn;
|
||||
/* make sure memory operations are in correct order, even in concurrent systems */
|
||||
pg_memory_barrier();
|
||||
/* get the current LSN to register */
|
||||
replay_lsn = GetXLogReplayRecPtr(NULL);
|
||||
MIN_BACKEND_REQUEST_LSN = replay_lsn;
|
||||
}
|
||||
last_replay_lsn = replay_lsn;
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
neon_request_lsns *result = &output[i];
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "neon.h"
|
||||
#include "neon_pgversioncompat.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "pagestore_client.h"
|
||||
#include RELFILEINFO_HDR
|
||||
#include "storage/smgr.h"
|
||||
@@ -23,10 +24,6 @@
|
||||
#include "utils/dynahash.h"
|
||||
#include "utils/guc.h"
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
#include "miscadmin.h"
|
||||
#endif
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NRelFileInfo rinfo;
|
||||
|
||||
@@ -55,7 +55,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
|
||||
# Ensure that the default version is also updated in the neon.control file
|
||||
assert cur.fetchone() == ("1.6",)
|
||||
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
|
||||
all_versions = ["1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
|
||||
all_versions = ["1.7", "1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
|
||||
current_version = "1.6"
|
||||
for idx, begin_version in enumerate(all_versions):
|
||||
for target_version in all_versions[idx + 1 :]:
|
||||
|
||||
@@ -298,15 +298,26 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
|
||||
assert post_detach_samples == set()
|
||||
|
||||
|
||||
def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("compaction", ["compaction_enabled", "compaction_disabled"])
|
||||
def test_pageserver_metrics_removed_after_offload(
|
||||
neon_env_builder: NeonEnvBuilder, compaction: str
|
||||
):
|
||||
"""Tests that when a timeline is offloaded, the tenant specific metrics are not left behind"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
|
||||
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_1, _ = env.create_tenant()
|
||||
tenant_1, _ = env.create_tenant(
|
||||
conf={
|
||||
# disable background compaction and GC so that we don't have leftover tasks
|
||||
# after offloading.
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
}
|
||||
if compaction == "compaction_disabled"
|
||||
else None
|
||||
)
|
||||
|
||||
timeline_1 = env.create_timeline("test_metrics_removed_after_offload_1", tenant_id=tenant_1)
|
||||
timeline_2 = env.create_timeline("test_metrics_removed_after_offload_2", tenant_id=tenant_1)
|
||||
@@ -351,6 +362,23 @@ def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuild
|
||||
state=TimelineArchivalState.ARCHIVED,
|
||||
)
|
||||
env.pageserver.http_client().timeline_offload(tenant_1, timeline)
|
||||
# We need to wait until all background jobs are finished before we can check the metrics.
|
||||
# There're many of them: compaction, GC, etc.
|
||||
wait_until(
|
||||
lambda: all(
|
||||
sample.value == 0
|
||||
for sample in env.pageserver.http_client()
|
||||
.get_metrics()
|
||||
.query_all("pageserver_background_loop_semaphore_waiting_tasks")
|
||||
)
|
||||
and all(
|
||||
sample.value == 0
|
||||
for sample in env.pageserver.http_client()
|
||||
.get_metrics()
|
||||
.query_all("pageserver_background_loop_semaphore_running_tasks")
|
||||
)
|
||||
)
|
||||
|
||||
post_offload_samples = set(
|
||||
[x.name for x in get_ps_metric_samples_for_timeline(tenant_1, timeline)]
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user