Compare commits

...

26 Commits

Author SHA1 Message Date
Konstantin Knizhnik
3fe7f8802a Register reset_min_request_lsn using before_shmem_exit to make it be called before ProcKill which cleans MyProc 2025-07-28 21:59:35 +03:00
Konstantin Knizhnik
7841303df9 Cleanup perf_counters on backend exit 2025-07-28 21:04:15 +03:00
Konstantin Knizhnik
0e9d3fd2b1 Cleanup perf_counters on backend exit 2025-07-28 21:04:15 +03:00
Konstantin Knizhnik
b2a87b501f Fix cleanup of min_request_lsn on backend exit 2025-07-28 21:04:13 +03:00
Konstantin Knizhnik
06417f2ff9 Some minor refactoring addressing review comments 2025-07-28 21:03:50 +03:00
Konstantin Knizhnik
252876515c Rewrite min-request-lsn reset mechanism on backend exit 2025-07-28 21:03:49 +03:00
Konstantin Knizhnik
d56a72afec Reset backend's perf cpounters on exit 2025-07-28 21:03:09 +03:00
Konstantin Knizhnik
3847ab73a7 Cleanup perf_counters on backend exit 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
3e5bbe7027 Address review comments 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
546a45f57a Do flush only iof there are no in-flight prefetch requests 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
588cb289d5 Do flush only iof there are no in-glight prefetch requests 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
b947deb07c Flush requests in prefetch+_pump_state 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
2f455baa73 Correctly handle communicator_reconfigure_timeout in case of replica promotion 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
f2e65e1d2c Remove assert checks from communicator_reconfigure_timeout_if_needed 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
df49def453 Add assert check thsat timeout is registered 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
1ef0f71a95 Always configure prefetch timeout at replicas 2025-07-28 21:02:14 +03:00
Konstantin Knizhnik
96df649858 Update pgxn/neon/pagestore_smgr.c
Co-authored-by: Matthias van de Meent <matthias@neon.tech>
2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
9bfba1b087 Implement new apporach of calculating min in-flight LSN in prefetch_pump_state 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
b41b85f8ec Perform page LSN check only for v3 version of protocol 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
32b801ea1c Fix mistypings 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
89496a32d0 Return end record LSN in log_newpages_copy 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
9617b8d328 Update makefile 2025-07-28 21:02:14 +03:00
Kosntantin Knizhnik
a504516b8c Maintain min in-flight prefetch request LSN 2025-07-28 21:02:14 +03:00
Alex Chi Z.
fe7a4e1ab6 fix(test): wait compaction in timeline offload test (#12673)
## Problem

close LKB-753. `test_pageserver_metrics_removed_after_offload` is
unstable and it sometimes leave the metrics behind after tenant
offloading. It turns out that we triggered an image compaction before
the offload and the job was stopped after the offload request was
completed.

## Summary of changes

Wait all background tasks to finish before checking the metrics.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-28 16:27:55 +00:00
Heikki Linnakangas
40cae8cc36 Fix misc typos and some cosmetic code cleanup (#12695) 2025-07-28 16:21:35 +00:00
Heikki Linnakangas
02fc8b7c70 Add compatibility macros for MyProcNumber and PGIOAlignedBlock (#12715)
There were a few uses of these already, so collect them to the
compatibility header to avoid the repetition and scattered #ifdefs.

The definition of MyProcNumber is a little different from what was used
before, but the end result is the same. (PGPROC->pgprocno values were
just assigned sequentially to all PGPROC array members, see
InitProcGlobal(). That's a bit silly, which is why it was removed in
v17.)
2025-07-28 15:05:36 +00:00
14 changed files with 215 additions and 53 deletions

View File

@@ -48,6 +48,8 @@ DATA = \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.7.sql \
neon--1.7--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \

View File

@@ -79,10 +79,6 @@
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM < 160000
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#define NEON_PANIC_CONNECTION_STATE(shard_no, elvl, message, ...) \
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
##__VA_ARGS__)
@@ -264,7 +260,7 @@ typedef struct PrefetchState
/* the buffers */
prfh_hash *prf_hash;
int max_shard_no;
int max_unflushed_shard_no;
/* Mark shards involved in prefetch */
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
PrefetchRequest prf_buffer[]; /* prefetch buffers */
@@ -304,6 +300,7 @@ static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_
static bool prefetch_wait_for(uint64 ring_index);
static void prefetch_cleanup_trailing_unused(void);
static inline void prefetch_set_unused(uint64 ring_index);
static bool prefetch_flush_requests(void);
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
PrefetchRequest *slot);
@@ -473,13 +470,26 @@ communicator_prefetch_pump_state(void)
{
START_PREFETCH_RECEIVE_WORK();
if (MyPState->ring_receive == MyPState->ring_flush && MyPState->ring_flush < MyPState->ring_unused)
{
/*
* Flush request to avoid requests pending for arbitrary long time,
* pinning LSN and holding GC at PS.
*/
if (!prefetch_flush_requests())
{
END_PREFETCH_RECEIVE_WORK();
return;
}
}
while (MyPState->ring_receive != MyPState->ring_flush)
{
NeonResponse *response;
PrefetchRequest *slot;
MemoryContext old;
uint64 my_ring_index = MyPState->ring_receive;
slot = GetPrfSlot(MyPState->ring_receive);
slot = GetPrfSlot(my_ring_index);
old = MemoryContextSwitchTo(MyPState->errctx);
response = page_server->try_receive(slot->shard_no);
@@ -493,12 +503,12 @@ communicator_prefetch_pump_state(void)
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
slot->my_ring_index != my_ring_index)
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
slot->status, slot->response,
slot->my_ring_index, MyPState->ring_receive);
slot->my_ring_index, my_ring_index);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
@@ -526,6 +536,19 @@ communicator_prefetch_pump_state(void)
END_PREFETCH_RECEIVE_WORK();
if (RecoveryInProgress())
{
/*
* Update backend's min in-flight prefetch LSN.
*/
XLogRecPtr min_backend_prefetch_lsn = last_replay_lsn != InvalidXLogRecPtr ? last_replay_lsn : GetXLogReplayRecPtr(NULL);
for (uint64_t ring_index = MyPState->ring_receive; ring_index < MyPState->ring_unused; ring_index++)
{
PrefetchRequest* slot = GetPrfSlot(ring_index);
min_backend_prefetch_lsn = Min(slot->request_lsns.request_lsn, min_backend_prefetch_lsn);
}
MIN_BACKEND_REQUEST_LSN = min_backend_prefetch_lsn;
}
communicator_reconfigure_timeout_if_needed();
}
@@ -565,7 +588,7 @@ readahead_buffer_resize(int newsize, void *extra)
newPState->ring_last = newsize;
newPState->ring_unused = newsize;
newPState->ring_receive = newsize;
newPState->max_shard_no = MyPState->max_shard_no;
newPState->max_unflushed_shard_no = MyPState->max_unflushed_shard_no;
memcpy(newPState->shard_bitmap, MyPState->shard_bitmap, sizeof(MyPState->shard_bitmap));
/*
@@ -665,6 +688,7 @@ consume_prefetch_responses(void)
{
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
/*
* We know for sure we're not working on any prefetch pages after
* this.
@@ -694,7 +718,7 @@ prefetch_cleanup_trailing_unused(void)
static bool
prefetch_flush_requests(void)
{
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
for (shardno_t shard_no = 0; shard_no < MyPState->max_unflushed_shard_no; shard_no++)
{
if (BITMAP_ISSET(MyPState->shard_bitmap, shard_no))
{
@@ -703,7 +727,8 @@ prefetch_flush_requests(void)
BITMAP_CLR(MyPState->shard_bitmap, shard_no);
}
}
MyPState->max_shard_no = 0;
MyPState->max_unflushed_shard_no = 0;
MyPState->ring_flush = MyPState->ring_unused;
return true;
}
@@ -727,7 +752,6 @@ prefetch_wait_for(uint64 ring_index)
{
if (!prefetch_flush_requests())
return false;
MyPState->ring_flush = MyPState->ring_unused;
}
Assert(MyPState->ring_unused > ring_index);
@@ -806,6 +830,7 @@ prefetch_read(PrefetchRequest *slot)
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(shard_no);
MemoryContextSwitchTo(old);
if (response)
{
check_getpage_response(slot, response);
@@ -1014,11 +1039,16 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(mySlotNo == MyPState->ring_unused);
if (force_request_lsns)
{
slot->request_lsns = *force_request_lsns;
}
else
{
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum, slot->buftag.blockNum,
&slot->request_lsns, 1);
last_replay_lsn = InvalidXLogRecPtr;
}
request.hdr.lsn = slot->request_lsns.request_lsn;
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
@@ -1037,7 +1067,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
MyPState->n_unused -= 1;
MyPState->ring_unused += 1;
BITMAP_SET(MyPState->shard_bitmap, slot->shard_no);
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
MyPState->max_unflushed_shard_no = Max(slot->shard_no+1, MyPState->max_unflushed_shard_no);
/* update slot state */
slot->status = PRFS_REQUESTED;
@@ -1045,6 +1075,25 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(!found);
}
/*
 * Check that returned page LSN is consistent with request lsns.
 *
 * For protocol v3 and later the response carries the request header back
 * (resp->req.hdr), so we can sanity-check that the page image the
 * pageserver returned is not newer than what we asked for.  Any violation
 * means the response is corrupt or mismatched, so we PANIC.
 */
static void
check_page_lsn(NeonGetPageResponse* resp)
{
/* Pre-v3 responses carry no request LSNs to compare against */
if (neon_protocol_version < 3) /* no information to check */
return;
/* The page must not have been modified after not_modified_since */
if (PageGetLSN(resp->page) > resp->req.hdr.not_modified_since)
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than last modified LSN %X/%08X",
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
LSN_FORMAT_ARGS(resp->req.hdr.not_modified_since));
/* ... and must not exceed the request LSN either */
if (PageGetLSN(resp->page) > resp->req.hdr.lsn)
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than request LSN %X/%08X",
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
LSN_FORMAT_ARGS(resp->req.hdr.lsn));
}
/*
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
@@ -1068,7 +1117,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
NeonGetPageResponse* resp;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
@@ -1101,8 +1150,9 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
resp = (NeonGetPageResponse*)slot->response;
check_page_lsn(resp);
memcpy(buffers[i], resp->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
@@ -1395,7 +1445,6 @@ Retry:
*/
goto Retry;
}
MyPState->ring_flush = MyPState->ring_unused;
}
return last_ring_index;
@@ -1465,10 +1514,12 @@ page_server_request(void const *req)
MyNeonCounters->pageserver_open_requests--;
} while (resp == NULL);
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
last_replay_lsn = InvalidXLogRecPtr;
}
PG_CATCH();
{
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
last_replay_lsn = InvalidXLogRecPtr;
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
page_server->disconnect(shard_no);
@@ -1868,6 +1919,13 @@ nm_to_string(NeonMessage *msg)
return s.data;
}
/*
 * before_shmem_exit callback: clear this backend's advertised minimal
 * in-flight request LSN so that an exiting backend does not keep pinning
 * GC at the pageserver.
 *
 * MIN_BACKEND_REQUEST_LSN lives in the shared per-backend counters array
 * indexed by backend number, so the MyProcNumber != -1 guard skips
 * processes that never obtained a backend slot.
 */
static void
reset_min_request_lsn(int code, Datum arg)
{
/* Only touch shared state if this process has a counters slot */
if (MyProcNumber != -1)
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
}
/*
* communicator_init() -- Initialize per-backend private state
*/
@@ -1879,6 +1937,8 @@ communicator_init(void)
if (MyPState != NULL)
return;
before_shmem_exit(reset_min_request_lsn, 0);
/*
* Sanity check that theperf counters array is sized correctly. We got
* this wrong once, and the formula for max number of backends and aux
@@ -1888,7 +1948,7 @@ communicator_init(void)
* the check here. That's OK, we don't expect the logic to change in old
* releases.
*/
#if PG_VERSION_NUM>=150000
#if PG_MAJORVERSION_NUM >= 15
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
elog(ERROR, "MyNeonCounters points past end of array");
#endif
@@ -2227,6 +2287,7 @@ Retry:
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
check_page_lsn(getpage_resp);
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
@@ -2503,12 +2564,30 @@ communicator_reconfigure_timeout_if_needed(void)
!AmPrewarmWorker && /* do not pump prefetch state in prewarm worker */
readahead_getpage_pull_timeout_ms > 0;
if (!needs_set && MIN_BACKEND_REQUEST_LSN != InvalidXLogRecPtr)
{
if (last_replay_lsn == InvalidXLogRecPtr)
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
else
needs_set = true; /* Can not reset MIN_BACKEND_REQUEST_LSN now, have to do it later */
}
if (needs_set != timeout_set)
{
/* The background writer doens't (shouldn't) read any pages */
Assert(!AmBackgroundWriterProcess());
/* The checkpointer doens't (shouldn't) read any pages */
Assert(!AmCheckpointerProcess());
/*
* The background writer/checkpointer doesn't (shouldn't) read any pages.
* And definitely they should not run on replica.
* The only case when we can get here is replica promotion.
*/
if (AmBackgroundWriterProcess() || AmCheckpointerProcess())
{
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
if (timeout_set)
{
disable_timeout(PS_TIMEOUT_ID, false);
timeout_set = false;
}
return;
}
if (unlikely(PS_TIMEOUT_ID == 0))
{
@@ -2541,14 +2620,6 @@ communicator_reconfigure_timeout_if_needed(void)
static void
pagestore_timeout_handler(void)
{
#if PG_MAJORVERSION_NUM <= 14
/*
* PG14: Setting a repeating timeout is not possible, so we signal here
* that the timeout has already been reset, and by telling the system
* that system will re-schedule it later if we need to.
*/
timeout_set = false;
#endif
timeout_signaled = true;
InterruptPending = true;
}
@@ -2568,6 +2639,14 @@ communicator_processinterrupts(void)
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
communicator_prefetch_pump_state();
#if PG_MAJORVERSION_NUM <= 14
/*
* PG14: Setting a repeating timeout is not possible, so we signal here
* that the timeout has already been reset, and by telling the system
* that system will re-schedule it later if we need to.
*/
timeout_set = false;
#endif
timeout_signaled = false;
communicator_reconfigure_timeout_if_needed();
}
@@ -2577,3 +2656,28 @@ communicator_processinterrupts(void)
return prev_interrupt_cb();
}
PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);

/*
 * SQL-callable function returning the minimal LSN among all backends'
 * in-flight getpage requests, bounded above by the current replay LSN.
 * The caller can use this to avoid garbage-collecting data that some
 * backend may still request.
 *
 * NOTE(review): the branch guarded by RecoveryInProgress() returns
 * UINT64_MAX while its comment refers to the primary, and the scanning
 * branch calls GetXLogReplayRecPtr(), which is only meaningful during
 * recovery -- the condition's polarity looks inverted relative to the
 * comment; confirm intended behavior.
 */
Datum
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
{
if (RecoveryInProgress())
{
/* Do not hold GC for primary */
PG_RETURN_INT64(UINT64_MAX);
}
else
{
/* Start from the last replayed LSN as the upper bound... */
XLogRecPtr min_lsn = GetXLogReplayRecPtr(NULL);
size_t n_procs = ProcGlobal->allProcCount;
/* ...then take the minimum over every backend's advertised in-flight LSN */
for (size_t i = 0; i < n_procs; i++)
{
if (neon_per_backend_counters_shared[i].min_request_lsn != InvalidXLogRecPtr)
{
min_lsn = Min(min_lsn, neon_per_backend_counters_shared[i].min_request_lsn);
}
}
PG_RETURN_INT64(min_lsn);
}
}

View File

@@ -635,6 +635,11 @@ lfc_init(void)
NULL);
}
/*
* Dump a list of pages that are currently in the LFC
*
* This is used to get a snapshot that can be used to prewarm the LFC later.
*/
FileCacheState*
lfc_get_state(size_t max_entries)
{
@@ -2267,4 +2272,3 @@ get_prewarm_info(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -0,0 +1,3 @@
-- Expose the communicator's minimal in-flight request LSN to SQL.
-- Backed by the C function of the same name in the neon extension
-- (presumably this is the neon--1.6--1.7.sql upgrade script -- confirm).
create function neon_communicator_min_inflight_request_lsn() returns pg_catalog.pg_lsn
AS 'MODULE_PATHNAME', 'neon_communicator_min_inflight_request_lsn'
LANGUAGE C;

View File

@@ -0,0 +1 @@
drop function neon_communicator_min_inflight_request_lsn();

View File

@@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
* neon.c
* Main entry point into the neon exension
* Main entry point into the neon extension
*
*-------------------------------------------------------------------------
*/
@@ -508,7 +508,7 @@ _PG_init(void)
DefineCustomBoolVariable(
"neon.disable_logical_replication_subscribers",
"Disables incomming logical replication",
"Disable incoming logical replication",
NULL,
&disable_logical_replication_subscribers,
false,
@@ -567,7 +567,7 @@ _PG_init(void)
DefineCustomEnumVariable(
"neon.debug_compare_local",
"Debug mode for compaing content of pages in prefetch ring/LFC/PS and local disk",
"Debug mode for comparing content of pages in prefetch ring/LFC/PS and local disk",
NULL,
&debug_compare_local,
DEBUG_COMPARE_LOCAL_NONE,
@@ -735,7 +735,6 @@ neon_shmem_request_hook(void)
static void
neon_shmem_startup_hook(void)
{
/* Initialize */
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();

View File

@@ -42,7 +42,6 @@ NeonPerfCountersShmemRequest(void)
}
void
NeonPerfCountersShmemInit(void)
{

View File

@@ -154,6 +154,11 @@ typedef struct
* Histogram of query execution time.
*/
QTHistogramData query_time_hist;
/*
* Minimal LSN of in-flight requests
*/
XLogRecPtr min_request_lsn;
} neon_per_backend_counters;
/* Pointer to the shared memory array of neon_per_backend_counters structs */
@@ -167,11 +172,13 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
*/
#define NUM_NEON_PERF_COUNTER_SLOTS (MaxBackends + NUM_AUXILIARY_PROCS)
#if PG_VERSION_NUM >= 170000
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProcNumber])
#else
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProc->pgprocno])
#endif
/*
* Backend-local minimal in-flight request LSN.
* We store it in neon_per_backend_counters_shared and not in separate array to minimize false cache sharing
*/
#define MIN_BACKEND_REQUEST_LSN MyNeonCounters->min_request_lsn
extern void inc_getpage_wait(uint64 latency);
extern void inc_page_cache_read_wait(uint64 latency);

View File

@@ -9,6 +9,10 @@
#include "fmgr.h"
#include "storage/buf_internals.h"
#if PG_MAJORVERSION_NUM < 16
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#if PG_MAJORVERSION_NUM < 17
#define NRelFileInfoBackendIsTemp(rinfo) (rinfo.backend != InvalidBackendId)
#else
@@ -158,6 +162,10 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#define AmAutoVacuumWorkerProcess() (IsAutoVacuumWorkerProcess())
#endif
#if PG_MAJORVERSION_NUM < 17
#define MyProcNumber (MyProc - &ProcGlobal->allProcs[0])
#endif
#if PG_MAJORVERSION_NUM < 15
extern void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags);
extern TimeLineID GetWALInsertionTimeLine(void);

View File

@@ -243,6 +243,7 @@ extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern XLogRecPtr last_replay_lsn;
extern shardno_t get_shard_number(BufferTag* tag);

View File

@@ -72,10 +72,6 @@
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM < 160000
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#include "access/nbtree.h"
#include "storage/bufpage.h"
#include "access/xlog_internal.h"
@@ -100,6 +96,8 @@ typedef enum
int debug_compare_local;
XLogRecPtr last_replay_lsn;
static NRelFileInfo unlogged_build_rel_info;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
@@ -163,7 +161,7 @@ log_newpages_copy(NRelFileInfo * rinfo, ForkNumber forkNum, BlockNumber blkno,
page_std);
}
return ProcLastRecPtr;
return GetXLogInsertRecPtr();
}
#endif /* PG_MAJORVERSION_NUM >= 17 */
@@ -592,6 +590,17 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
/* Request the page at the end of the last fully replayed LSN. */
XLogRecPtr replay_lsn = GetXLogReplayRecPtr(NULL);
if (MIN_BACKEND_REQUEST_LSN == InvalidXLogRecPtr)
{
/* mark the backend's replay_lsn as "we have a request ongoing", blocking the expiration of any current LSN */
MIN_BACKEND_REQUEST_LSN = replay_lsn;
/* make sure memory operations are in correct order, even in concurrent systems */
pg_memory_barrier();
/* get the current LSN to register */
replay_lsn = GetXLogReplayRecPtr(NULL);
MIN_BACKEND_REQUEST_LSN = replay_lsn;
}
last_replay_lsn = replay_lsn;
for (int i = 0; i < nblocks; i++)
{
neon_request_lsns *result = &output[i];

View File

@@ -13,6 +13,7 @@
#include "neon.h"
#include "neon_pgversioncompat.h"
#include "miscadmin.h"
#include "pagestore_client.h"
#include RELFILEINFO_HDR
#include "storage/smgr.h"
@@ -23,10 +24,6 @@
#include "utils/dynahash.h"
#include "utils/guc.h"
#if PG_VERSION_NUM >= 150000
#include "miscadmin.h"
#endif
typedef struct
{
NRelFileInfo rinfo;

View File

@@ -55,7 +55,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.6",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
all_versions = ["1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
all_versions = ["1.7", "1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
current_version = "1.6"
for idx, begin_version in enumerate(all_versions):
for target_version in all_versions[idx + 1 :]:

View File

@@ -298,15 +298,26 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
assert post_detach_samples == set()
def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("compaction", ["compaction_enabled", "compaction_disabled"])
def test_pageserver_metrics_removed_after_offload(
neon_env_builder: NeonEnvBuilder, compaction: str
):
"""Tests that when a timeline is offloaded, the tenant specific metrics are not left behind"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_1, _ = env.create_tenant()
tenant_1, _ = env.create_tenant(
conf={
# disable background compaction and GC so that we don't have leftover tasks
# after offloading.
"gc_period": "0s",
"compaction_period": "0s",
}
if compaction == "compaction_disabled"
else None
)
timeline_1 = env.create_timeline("test_metrics_removed_after_offload_1", tenant_id=tenant_1)
timeline_2 = env.create_timeline("test_metrics_removed_after_offload_2", tenant_id=tenant_1)
@@ -351,6 +362,23 @@ def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuild
state=TimelineArchivalState.ARCHIVED,
)
env.pageserver.http_client().timeline_offload(tenant_1, timeline)
# We need to wait until all background jobs are finished before we can check the metrics.
# There're many of them: compaction, GC, etc.
wait_until(
lambda: all(
sample.value == 0
for sample in env.pageserver.http_client()
.get_metrics()
.query_all("pageserver_background_loop_semaphore_waiting_tasks")
)
and all(
sample.value == 0
for sample in env.pageserver.http_client()
.get_metrics()
.query_all("pageserver_background_loop_semaphore_running_tasks")
)
)
post_offload_samples = set(
[x.name for x in get_ps_metric_samples_for_timeline(tenant_1, timeline)]
)