Compare commits

...

7 Commits

Author SHA1 Message Date
Kosntantin Knizhnik
27315d985d Address review comments 2025-07-16 18:39:31 +03:00
Konstantin Knizhnik
fb139345c4 Update pgxn/neon/neon--1.6--1.7.sql
Co-authored-by: Matthias van de Meent <matthias@neon.tech>
2025-07-16 18:25:59 +03:00
Kosntantin Knizhnik
502c36695c Address review comments 2025-07-16 18:18:47 +03:00
Konstantin Knizhnik
8002c591c2 Update pgxn/neon/communicator.c
Co-authored-by: Matthias van de Meent <matthias@neon.tech>
2025-07-16 17:38:28 +03:00
Kosntantin Knizhnik
b147722e93 Add neon_communicator_min_inflight_request_lsn function to neon extension 2025-07-16 14:53:26 +03:00
Kosntantin Knizhnik
855b6ea6aa Replace MaxBackends with MAX)_BACKENDS in array size estimation 2025-07-16 09:30:22 +03:00
Kosntantin Knizhnik
6e2af7ac3f Add function calculating min prefetch request LSN to be used for replica leases 2025-07-15 18:17:11 +03:00
15 changed files with 250 additions and 225 deletions

View File

@@ -38,6 +38,8 @@ DATA = \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.7.sql \
neon--1.7--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \

View File

@@ -54,6 +54,7 @@
*/
#include "postgres.h"
#include "access/twophase.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "access/xlog_internal.h"
@@ -64,6 +65,7 @@
#include "miscadmin.h"
#include "port/pg_iovec.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "utils/timeout.h"
@@ -75,11 +77,18 @@
#include "neon_perf_counters.h"
#include "pagestore_client.h"
#if PG_VERSION_NUM >= 150000
#if PG_MAJORVERSION_NUM >= 17
#include "storage/procnumber.h"
#else
#define MyProcNumber MyProc->pgprocno
#endif
#if PG_MAJORVERSION_NUM >= 15
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM < 160000
#if PG_MAJORVERSION_NUM < 16
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
@@ -294,6 +303,15 @@ static PrefetchState *MyPState;
static process_interrupts_callback_t prev_interrupt_cb;
/*
* Array in shared memory each cell of which contains minimal in-flight request LSN sent to PS by the backend which procno is
* used as index in this array. This array is initially filled with InfiniteXlogRecPtr (UINT64_MAX) so if backend
* didn't send any request to PS, then this value doesn't effect global min.
*
* We support only 64-bit platforms and so assume that access to array elements is atomic and no any synchronization is needed.
*/
static XLogRecPtr* minPrefetchLsn;
static bool compact_prefetch_buffers(void);
static void consume_prefetch_responses(void);
static uint64 prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
@@ -316,6 +334,41 @@ pg_init_communicator(void)
ProcessInterruptsCallback = communicator_processinterrupts;
}
static Size
CommunicatorShmemSize(void)
{
#if PG_MAJORVERSION_NUM >= 15
Assert(MaxBackends != 0);
return (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr);
#else
return (MAX_BACKENDS + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr);
#endif
}
void
CommunicatorShmemRequest(void)
{
RequestAddinShmemSpace(CommunicatorShmemSize());
}
void
CommunicatorShmemInit(void)
{
bool found;
minPrefetchLsn = (XLogRecPtr*)ShmemInitStruct("Communicator shared state",
CommunicatorShmemSize(),
&found);
if (!found)
{
/*
* Fill with InfiniteXLogRecPtr (UINT64_MAX).
* If backend didn't send any requests to PS, then InfiniteXLogRecPtr doesn't affect global minimal value.
*/
memset(minPrefetchLsn, 0xFF, CommunicatorShmemSize());
}
}
static bool
compact_prefetch_buffers(void)
{
@@ -454,6 +507,20 @@ check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
}
}
/*
* Update min in-flight prefetch LSN for this backend.
*/
static void
update_min_prefetch_lsn(uint64 ring_index)
{
if (ring_index + 1 < MyPState->ring_unused)
{
PrefetchRequest* next_slot = GetPrfSlot(ring_index + 1);
Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn);
minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn;
}
}
/*
* If there might be responses still in the TCP buffer, then we should try to
* use those, to reduce any TCP backpressure on the OS/PS side.
@@ -478,8 +545,9 @@ communicator_prefetch_pump_state(void)
NeonResponse *response;
PrefetchRequest *slot;
MemoryContext old;
uint64 my_ring_index = MyPState->ring_receive;
slot = GetPrfSlot(MyPState->ring_receive);
slot = GetPrfSlot(my_ring_index);
old = MemoryContextSwitchTo(MyPState->errctx);
response = page_server->try_receive(slot->shard_no);
@@ -488,17 +556,19 @@ communicator_prefetch_pump_state(void)
if (response == NULL)
break;
update_min_prefetch_lsn(my_ring_index);
check_getpage_response(slot, response);
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
slot->my_ring_index != my_ring_index)
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
slot->status, slot->response,
slot->my_ring_index, MyPState->ring_receive);
slot->my_ring_index, my_ring_index);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
@@ -665,6 +735,9 @@ consume_prefetch_responses(void)
{
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr; /* No more in-flight prefetch requests from this backend */
/*
* We know for sure we're not working on any prefetch pages after
* this.
@@ -806,6 +879,9 @@ prefetch_read(PrefetchRequest *slot)
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(shard_no);
MemoryContextSwitchTo(old);
update_min_prefetch_lsn(my_ring_index);
if (response)
{
check_getpage_response(slot, response);
@@ -924,6 +1000,8 @@ prefetch_on_ps_disconnect(void)
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr; /* No more in-flight prefetch requests from this backend */
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
@@ -1025,6 +1103,8 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
minPrefetchLsn[MyProcNumber] = Min(request.hdr.lsn, minPrefetchLsn[MyProcNumber]);
while (!page_server->send(slot->shard_no, (NeonRequest *) &request))
{
Assert(mySlotNo == MyPState->ring_unused);
@@ -1045,6 +1125,23 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(!found);
}
/*
* Check that returned page LSN is consistent with request lsns
*/
static void
check_page_lsn(NeonGetPageResponse* resp)
{
if (PageGetLSN(resp->page) > resp->req.hdr.not_modified_since)
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than last modified LSN %X/%08X",
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
LSN_FORMAT_ARGS(resp->req.hdr.not_modified_since));
if (PageGetLSN(resp->page) > resp->req.hdr.lsn)
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than request LSN %X/%08X",
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
LSN_FORMAT_ARGS(resp->req.hdr.lsn));
}
/*
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
@@ -1068,7 +1165,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
NeonGetPageResponse* resp;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
@@ -1101,8 +1198,9 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
resp = (NeonGetPageResponse*)slot->response;
check_page_lsn(resp);
memcpy(buffers[i], resp->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
@@ -1453,6 +1551,7 @@ page_server_request(void const *req)
PG_TRY();
{
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
minPrefetchLsn[MyProcNumber] = ((NeonRequest *)req)->lsn;
do
{
while (!page_server->send(shard_no, (NeonRequest *) req)
@@ -1464,10 +1563,12 @@ page_server_request(void const *req)
resp = page_server->receive(shard_no);
MyNeonCounters->pageserver_open_requests--;
} while (resp == NULL);
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
@@ -1888,7 +1989,7 @@ communicator_init(void)
* the check here. That's OK, we don't expect the logic to change in old
* releases.
*/
#if PG_VERSION_NUM>=150000
#if PG_MAJORVERSION_NUM >= 15
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
elog(ERROR, "MyNeonCounters points past end of array");
#endif
@@ -1967,7 +2068,7 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns,
* Each request to the pageserver has three LSN values associated with it:
* `not_modified_since`, `request_lsn`, and 'effective_request_lsn'.
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
* in the primary node, we always use InfiniteXLogRecPtr as the `request_lsn`, so
* we remember `effective_request_lsn` separately. In a primary,
* `effective_request_lsn` is the same as `not_modified_since`.
* See comments in neon_get_request_lsns why we can not use last flush WAL position here.
@@ -2227,6 +2328,7 @@ Retry:
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
check_page_lsn(getpage_resp);
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
@@ -2424,15 +2526,18 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
PG_TRY();
{
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
minPrefetchLsn[MyProcNumber] = request_lsns->request_lsn;
do
{
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
resp = page_server->receive(shard_no);
} while (resp == NULL);
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
minPrefetchLsn[MyProcNumber] = InfiniteXLogRecPtr;
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
@@ -2577,3 +2682,19 @@ communicator_processinterrupts(void)
return prev_interrupt_cb();
}
PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
Datum
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
{
XLogRecPtr min_lsn = RecoveryInProgress()
? GetXLogReplayRecPtr(NULL)
: InfiniteXLogRecPtr;
size_t n_procs = ProcGlobal->allProcCount;
for (size_t i = 0; i < n_procs; i++)
{
min_lsn = Min(min_lsn, minPrefetchLsn[i]);
}
PG_RETURN_INT64(min_lsn);
}

View File

@@ -46,5 +46,4 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
extern void communicator_reconfigure_timeout_if_needed(void);
extern void communicator_prefetch_pump_state(void);
#endif

View File

@@ -219,10 +219,6 @@ static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
static bool lfc_do_prewarm;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
bool lfc_store_prefetch_result;
bool lfc_prewarm_update_ws_estimation;
@@ -342,18 +338,14 @@ lfc_ensure_opened(void)
return true;
}
static void
lfc_shmem_startup(void)
void
LfcShmemInit(void)
{
bool found;
static HASHCTL info;
if (prev_shmem_startup_hook)
{
prev_shmem_startup_hook();
}
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
if (lfc_max_size <= 0)
return;
lfc_ctl = (FileCacheControl *) ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
if (!found)
@@ -398,19 +390,16 @@ lfc_shmem_startup(void)
ConditionVariableInit(&lfc_ctl->cv[i]);
}
LWLockRelease(AddinShmemInitLock);
}
static void
lfc_shmem_request(void)
void
LfcShmemRequest(void)
{
#if PG_VERSION_NUM>=150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
RequestNamedLWLockTranche("lfc_lock", 1);
if (lfc_max_size > 0)
{
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
RequestNamedLWLockTranche("lfc_lock", 1);
}
}
static bool
@@ -642,18 +631,6 @@ lfc_init(void)
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = lfc_shmem_startup;
#if PG_VERSION_NUM>=150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = lfc_shmem_request;
#else
lfc_shmem_request();
#endif
}
FileCacheState*

View File

@@ -118,10 +118,6 @@ typedef struct
ShardMap shard_map;
} PagestoreShmemState;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
#endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
@@ -1284,18 +1280,12 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}
static Size
PagestoreShmemSize(void)
{
return add_size(sizeof(PagestoreShmemState), NeonPerfCountersShmemSize());
}
static bool
void
PagestoreShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pagestore_shared = ShmemInitStruct("libpagestore shared state",
sizeof(PagestoreShmemState),
&found);
@@ -1306,44 +1296,12 @@ PagestoreShmemInit(void)
memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap));
AssignPageserverConnstring(page_server_connstring, NULL);
}
NeonPerfCountersShmemInit();
LWLockRelease(AddinShmemInitLock);
return found;
}
static void
pagestore_shmem_startup_hook(void)
void
PagestoreShmemRequest(void)
{
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
PagestoreShmemInit();
}
static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(PagestoreShmemSize());
}
static void
pagestore_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
RequestAddinShmemSpace(sizeof(PagestoreShmemState));
}
/*
@@ -1352,8 +1310,6 @@ pagestore_prepare_shmem(void)
void
pg_init_libpagestore(void)
{
pagestore_prepare_shmem();
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
@@ -1504,8 +1460,6 @@ pg_init_libpagestore(void)
0,
NULL, NULL, NULL);
relsize_hash_init();
if (page_server != NULL)
neon_log(ERROR, "libpagestore already loaded");

View File

@@ -0,0 +1,3 @@
create function neon_communicator_min_inflight_request_lsn() returns pg_catalog.pg_lsn
AS 'MODULE_PATHNAME', 'neon_communicator_min_inflight_request_lsn'
LANGUAGE C;

View File

@@ -0,0 +1 @@
drop function neon_communicator_min_inflight_request_lsn();

View File

@@ -22,6 +22,7 @@
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/proc.h"
#include "storage/ipc.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
@@ -59,11 +60,15 @@ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
static void neon_ExecutorStart(QueryDesc *queryDesc, int eflags);
static void neon_ExecutorEnd(QueryDesc *queryDesc);
#if PG_MAJORVERSION_NUM >= 16
static shmem_startup_hook_type prev_shmem_startup_hook;
static void neon_shmem_startup_hook(void);
static void neon_shmem_request_hook(void);
#if PG_MAJORVERSION_NUM >= 15
static shmem_request_hook_type prev_shmem_request_hook = NULL;
#endif
#if PG_MAJORVERSION_NUM >= 17
uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
uint32 WAIT_EVENT_NEON_LFC_READ;
@@ -450,15 +455,13 @@ _PG_init(void)
*/
#if PG_VERSION_NUM >= 160000
load_file("$libdir/neon_rmgr", false);
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = neon_shmem_startup_hook;
#endif
/* dummy call to a Rust function in the communicator library, to check that it works */
(void) communicator_dummy(123);
pg_init_libpagestore();
relsize_hash_init();
lfc_init();
pg_init_walproposer();
init_lwlsncache();
@@ -552,6 +555,16 @@ _PG_init(void)
ReportSearchPath();
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = neon_shmem_request_hook;
#else
neon_shmem_request_hook();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = neon_shmem_startup_hook;
prev_ExecutorStart = ExecutorStart_hook;
ExecutorStart_hook = neon_ExecutorStart;
prev_ExecutorEnd = ExecutorEnd_hook;
@@ -637,7 +650,24 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
PG_RETURN_INT32(dc);
}
#if PG_MAJORVERSION_NUM >= 16
static void
neon_shmem_request_hook(void)
{
#if PG_VERSION_NUM >= 150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
LfcShmemRequest();
NeonPerfCountersShmemRequest();
PagestoreShmemRequest();
RelsizeCacheShmemRequest();
CommunicatorShmemRequest();
WalproposerShmemRequest();
LwLsnCacheShmemRequest();
}
static void
neon_shmem_startup_hook(void)
{
@@ -645,6 +675,16 @@ neon_shmem_startup_hook(void)
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
LfcShmemInit();
NeonPerfCountersShmemInit();
PagestoreShmemInit();
RelsizeCacheShmemInit();
CommunicatorShmemInit();
WalproposerShmemInit();
LwLsnCacheShmemInit();
#if PG_MAJORVERSION_NUM >= 17
WAIT_EVENT_NEON_LFC_MAINTENANCE = WaitEventExtensionNew("Neon/FileCache_Maintenance");
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
@@ -657,8 +697,9 @@ neon_shmem_startup_hook(void)
WAIT_EVENT_NEON_PS_READ = WaitEventExtensionNew("Neon/PS_ReadIO");
WAIT_EVENT_NEON_WAL_DL = WaitEventExtensionNew("Neon/WAL_Download");
#endif
LWLockRelease(AddinShmemInitLock);
}
#endif
/*
* ExecutorStart hook: start up tracking if needed

View File

@@ -58,6 +58,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
#define InfiniteXLogRecPtr UINT64_MAX
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
@@ -71,4 +72,21 @@ extern PGDLLEXPORT void WalProposerSync(int argc, char *argv[]);
extern PGDLLEXPORT void WalProposerMain(Datum main_arg);
extern PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
extern void LfcShmemRequest(void);
extern void PagestoreShmemRequest(void);
extern void RelsizeCacheShmemRequest(void);
extern void CommunicatorShmemRequest(void);
extern void WalproposerShmemRequest(void);
extern void LwLsnCacheShmemRequest(void);
extern void NeonPerfCountersShmemRequest(void);
extern void LfcShmemInit(void);
extern void PagestoreShmemInit(void);
extern void RelsizeCacheShmemInit(void);
extern void CommunicatorShmemInit(void);
extern void WalproposerShmemInit(void);
extern void LwLsnCacheShmemInit(void);
extern void NeonPerfCountersShmemInit(void);
#endif /* NEON_H */

View File

@@ -1,5 +1,6 @@
#include "postgres.h"
#include "neon.h"
#include "neon_lwlsncache.h"
#include "miscadmin.h"
@@ -81,14 +82,6 @@ static set_max_lwlsn_hook_type prev_set_max_lwlsn_hook = NULL;
static set_lwlsn_relation_hook_type prev_set_lwlsn_relation_hook = NULL;
static set_lwlsn_db_hook_type prev_set_lwlsn_db_hook = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static void shmemrequest(void);
static void shmeminit(void);
static void neon_set_max_lwlsn(XLogRecPtr lsn);
void
@@ -99,16 +92,6 @@ init_lwlsncache(void)
lwlc_register_gucs();
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = shmeminit;
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = shmemrequest;
#else
shmemrequest();
#endif
prev_set_lwlsn_block_range_hook = set_lwlsn_block_range_hook;
set_lwlsn_block_range_hook = neon_set_lwlsn_block_range;
prev_set_lwlsn_block_v_hook = set_lwlsn_block_v_hook;
@@ -124,20 +107,19 @@ init_lwlsncache(void)
}
static void shmemrequest(void) {
void
LwLsnCacheShmemRequest(void)
{
Size requested_size = sizeof(LwLsnCacheCtl);
requested_size += hash_estimate_size(lwlsn_cache_size, sizeof(LastWrittenLsnCacheEntry));
RequestAddinShmemSpace(requested_size);
#if PG_VERSION_NUM >= 150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
}
static void shmeminit(void) {
void
LwLsnCacheShmemInit(void)
{
static HASHCTL info;
bool found;
if (lwlsn_cache_size > 0)
@@ -157,9 +139,6 @@ static void shmeminit(void) {
}
dlist_init(&LwLsnCache->lastWrittenLsnLRU);
LwLsnCache->maxLastWrittenLsn = GetRedoRecPtr();
if (prev_shmem_startup_hook) {
prev_shmem_startup_hook();
}
}
/*

View File

@@ -17,22 +17,21 @@
#include "storage/shmem.h"
#include "utils/builtins.h"
#include "neon.h"
#include "neon_perf_counters.h"
#include "neon_pgversioncompat.h"
neon_per_backend_counters *neon_per_backend_counters_shared;
Size
NeonPerfCountersShmemSize(void)
void
NeonPerfCountersShmemRequest(void)
{
Size size = 0;
size = add_size(size, mul_size(NUM_NEON_PERF_COUNTER_SLOTS,
sizeof(neon_per_backend_counters)));
return size;
Size size = mul_size(NUM_NEON_PERF_COUNTER_SLOTS, sizeof(neon_per_backend_counters));
RequestAddinShmemSpace(size);
}
void
NeonPerfCountersShmemInit(void)
{

View File

@@ -250,7 +250,6 @@ extern const f_smgr *smgr_neon(ProcNumber backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra);
/*
* LSN values associated with each request to the pageserver
*/

View File

@@ -675,7 +675,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* always have that problem as the can always lag behind the
* primary, but for the primary we can avoid it by always
* requesting the latest page, by setting request LSN to
* UINT64_MAX.
* InfiniteXLogRecPtr.
*
* effective_request_lsn is used to check that received response is still valid.
* In case of primary node it is last written LSN. Originally we used flush_lsn here,
@@ -703,7 +703,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* The problem can be fixed by callingGetFlushRecPtr() before checking if the page is in the buffer cache.
* But you can't do that within smgrprefetch(), would need to modify the caller.
*/
result->request_lsn = UINT64_MAX;
result->request_lsn = InfiniteXLogRecPtr;
result->not_modified_since = last_written_lsn;
result->effective_request_lsn = last_written_lsn;
}
@@ -2158,7 +2158,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
request_lsn = nm_adjust_lsn(request_lsn);
}
else
request_lsn = UINT64_MAX;
request_lsn = InfiniteXLogRecPtr;
/*
* GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU

View File

@@ -10,6 +10,7 @@
*/
#include "postgres.h"
#include "neon.h"
#include "neon_pgversioncompat.h"
#include "pagestore_client.h"
@@ -49,32 +50,23 @@ typedef struct
* algorithm */
} RelSizeHashControl;
static HTAB *relsize_hash;
static LWLockId relsize_lock;
static int relsize_hash_size;
static RelSizeHashControl* relsize_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void relsize_shmem_request(void);
#endif
/*
* Size of a cache entry is 36 bytes. So this default will take about 2.3 MB,
* which seems reasonable.
*/
#define DEFAULT_RELSIZE_HASH_SIZE (64 * 1024)
static void
neon_smgr_shmem_startup(void)
static HTAB *relsize_hash;
static LWLockId relsize_lock;
static int relsize_hash_size = DEFAULT_RELSIZE_HASH_SIZE;
static RelSizeHashControl* relsize_ctl;
void
RelsizeCacheShmemInit(void)
{
static HASHCTL info;
bool found;
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
relsize_ctl = (RelSizeHashControl *) ShmemInitStruct("relsize_hash", sizeof(RelSizeHashControl), &found);
if (!found)
{
@@ -85,7 +77,6 @@ neon_smgr_shmem_startup(void)
relsize_hash_size, relsize_hash_size,
&info,
HASH_ELEM | HASH_BLOBS);
LWLockRelease(AddinShmemInitLock);
relsize_ctl->size = 0;
relsize_ctl->hits = 0;
relsize_ctl->misses = 0;
@@ -242,34 +233,15 @@ relsize_hash_init(void)
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
if (relsize_hash_size > 0)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = relsize_shmem_request;
#else
RequestAddinShmemSpace(hash_estimate_size(relsize_hash_size, sizeof(RelSizeEntry)));
RequestNamedLWLockTranche("neon_relsize", 1);
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = neon_smgr_shmem_startup;
}
}
#if PG_VERSION_NUM >= 150000
/*
* shmem_request hook: request additional shared resources. We'll allocate or
* attach to the shared resources in neon_smgr_shmem_startup().
*/
static void
relsize_shmem_request(void)
void
RelsizeCacheShmemRequest(void)
{
if (prev_shmem_request_hook)
prev_shmem_request_hook();
RequestAddinShmemSpace(sizeof(RelSizeHashControl) + hash_estimate_size(relsize_hash_size, sizeof(RelSizeEntry)));
RequestNamedLWLockTranche("neon_relsize", 1);
}
#endif

View File

@@ -83,10 +83,8 @@ static XLogRecPtr standby_flush_lsn = InvalidXLogRecPtr;
static XLogRecPtr standby_apply_lsn = InvalidXLogRecPtr;
static HotStandbyFeedback agg_hs_feedback;
static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
static void assign_neon_safekeepers(const char *newval, void *extra);
static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void);
static uint64 startup_backpressure_wrap(void);
static bool backpressure_throttling_impl(void);
@@ -99,11 +97,6 @@ static TimestampTz walprop_pg_get_current_timestamp(WalProposer *wp);
static void walprop_pg_load_libpqwalreceiver(void);
static process_interrupts_callback_t PrevProcessInterruptsCallback = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook_type;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static void WalproposerShmemInit_SyncSafekeeper(void);
@@ -193,8 +186,6 @@ pg_init_walproposer(void)
nwp_register_gucs();
nwp_prepare_shmem();
delay_backend_us = &startup_backpressure_wrap;
PrevProcessInterruptsCallback = ProcessInterruptsCallback;
ProcessInterruptsCallback = backpressure_throttling_impl;
@@ -482,12 +473,11 @@ WalproposerShmemSize(void)
return sizeof(WalproposerShmemState);
}
static bool
void
WalproposerShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
walprop_shared = ShmemInitStruct("Walproposer shared state",
sizeof(WalproposerShmemState),
&found);
@@ -504,9 +494,6 @@ WalproposerShmemInit(void)
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
/* END_HADRON */
}
LWLockRelease(AddinShmemInitLock);
return found;
}
static void
@@ -609,42 +596,15 @@ walprop_register_bgworker(void)
/* shmem handling */
static void
nwp_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = walproposer_shmem_request;
#else
RequestAddinShmemSpace(WalproposerShmemSize());
#endif
prev_shmem_startup_hook_type = shmem_startup_hook;
shmem_startup_hook = nwp_shmem_startup_hook;
}
#if PG_VERSION_NUM >= 150000
/*
* shmem_request hook: request additional shared resources. We'll allocate or
* attach to the shared resources in nwp_shmem_startup_hook().
* attach to the shared resources in WalproposerShmemInit().
*/
static void
walproposer_shmem_request(void)
void
WalproposerShmemRequest(void)
{
if (prev_shmem_request_hook)
prev_shmem_request_hook();
RequestAddinShmemSpace(WalproposerShmemSize());
}
#endif
static void
nwp_shmem_startup_hook(void)
{
if (prev_shmem_startup_hook_type)
prev_shmem_startup_hook_type();
WalproposerShmemInit();
}
WalproposerShmemState *
GetWalpropShmemState(void)