Add function calculating min prefetch request LSN to be used for replica leases

This commit is contained in:
Kosntantin Knizhnik
2025-07-15 18:17:11 +03:00
parent 921a4f2009
commit 6e2af7ac3f
11 changed files with 218 additions and 207 deletions

View File

@@ -54,6 +54,7 @@
*/
#include "postgres.h"
#include "access/twophase.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "access/xlog_internal.h"
@@ -75,11 +76,18 @@
#include "neon_perf_counters.h"
#include "pagestore_client.h"
#if PG_VERSION_NUM >= 150000
#if PG_MAJORVERSION_NUM >= 17
#include "storage/procnumber.h"
#else
#define MyProcNumber MyProc->pgprocno
#endif
#if PG_MAJORVERSION_NUM >= 15
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM < 160000
#if PG_MAJORVERSION_NUM < 16
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
@@ -293,6 +301,7 @@ static PrefetchState *MyPState;
)
static process_interrupts_callback_t prev_interrupt_cb;
static XLogRecPtr* minPrefetchLsn;
static bool compact_prefetch_buffers(void);
static void consume_prefetch_responses(void);
@@ -316,6 +325,33 @@ pg_init_communicator(void)
ProcessInterruptsCallback = communicator_processinterrupts;
}
static Size
CommunicatorShmemSize(void)
{
return (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr);
}
void
CommunicatorShmemRequest(void)
{
RequestAddinShmemSpace(CommunicatorShmemSize());
}
void
CommunicatorShmemInit(void)
{
bool found;
minPrefetchLsn = (XLogRecPtr*)ShmemInitStruct("Communicator shared state",
CommunicatorShmemSize(),
&found);
if (!found)
{
/* Fill with MAX_UINT64 */
memset(minPrefetchLsn, 0xFF, CommunicatorShmemSize());
}
}
static bool
compact_prefetch_buffers(void)
{
@@ -478,8 +514,9 @@ communicator_prefetch_pump_state(void)
NeonResponse *response;
PrefetchRequest *slot;
MemoryContext old;
uint64 my_ring_index = MyPState->ring_receive;
slot = GetPrfSlot(MyPState->ring_receive);
slot = GetPrfSlot(my_ring_index);
old = MemoryContextSwitchTo(MyPState->errctx);
response = page_server->try_receive(slot->shard_no);
@@ -488,17 +525,25 @@ communicator_prefetch_pump_state(void)
if (response == NULL)
break;
/* Update min in-flight prefetch reqwuest LSN */
if (my_ring_index + 1 < MyPState->ring_unused)
{
PrefetchRequest* next_slot = GetPrfSlot(my_ring_index + 1);
Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn);
minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn;
}
check_getpage_response(slot, response);
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
slot->my_ring_index != my_ring_index)
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
slot->status, slot->response,
slot->my_ring_index, MyPState->ring_receive);
slot->my_ring_index, my_ring_index);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
@@ -665,6 +710,9 @@ consume_prefetch_responses(void)
{
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
minPrefetchLsn[MyProcNumber] = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */
/*
* We know for sure we're not working on any prefetch pages after
* this.
@@ -806,6 +854,15 @@ prefetch_read(PrefetchRequest *slot)
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(shard_no);
MemoryContextSwitchTo(old);
/* Update min in-flight prefetch reqwuest LSN */
if (my_ring_index + 1 < MyPState->ring_unused)
{
PrefetchRequest* next_slot = GetPrfSlot(my_ring_index + 1);
Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn);
minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn;
}
if (response)
{
check_getpage_response(slot, response);
@@ -924,6 +981,8 @@ prefetch_on_ps_disconnect(void)
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
minPrefetchLsn[MyProcNumber] = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
@@ -1025,6 +1084,8 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
minPrefetchLsn[MyProcNumber] = Min(request.hdr.lsn, minPrefetchLsn[MyProcNumber]);
while (!page_server->send(slot->shard_no, (NeonRequest *) &request))
{
Assert(mySlotNo == MyPState->ring_unused);
@@ -1045,6 +1106,34 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(!found);
}
/*
* Check that pahge LSN returned by PS to replica is not beyand replay LSN.
* It can happen only in case of deteriorated lease.
*/
static bool
check_page_lsn(NeonGetPageResponse* resp, XLogRecPtr* replay_lsn_ptr)
{
if (RecoveryInProgress())
{
XLogRecPtr page_lsn = PageGetLSN((Page)resp->page);
#if PG_VERSION_NUM >= 150000
XLogRecPtr replay_lsn = GetCurrentReplayRecPtr(NULL);
#else
/*
* PG14 doesn't have GetCurrentReplayRecPtr() function which returns end of currently applied record.
* And GetXLogReplayRecPtr returns end of WAL records which was already applied.
* So we have to use this hack with resp->req.lsn which is expected to contain end record ptr in this case.
* But it works onlyfor v3 protocol version.
*/
XLogRecPtr replay_lsn = Max(GetXLogReplayRecPtr(NULL), resp->req.hdr.lsn);
#endif
if (replay_lsn_ptr)
*replay_lsn_ptr = replay_lsn;
return replay_lsn == 0 || page_lsn <= replay_lsn;
}
return true;
}
/*
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
@@ -1068,7 +1157,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
NeonGetPageResponse* resp;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
@@ -1101,8 +1190,16 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
resp = (NeonGetPageResponse*)slot->response;
/*
* Ignore "in-future" responses caused by deteriorated lease
*/
if (!check_page_lsn(resp, NULL))
{
continue;
}
memcpy(buffers[i], resp->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
@@ -2227,6 +2324,15 @@ Retry:
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
XLogRecPtr replay_lsn;
if (!check_page_lsn(getpage_resp, &replay_lsn))
{
/* Alternative to throw error is to repeat the query with request_lsn=replay_lsn */
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("There is no more version of page %u of relation %u/%u/%u.%u at LSN %X/%X at page server, request LSN %X/%X, latest version is at LSN %X/%X",
blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(replay_lsn), LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(PageGetLSN((Page)getpage_resp->page)))));
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
@@ -2577,3 +2683,14 @@ communicator_processinterrupts(void)
return prev_interrupt_cb();
}
XLogRecPtr communicator_get_min_prefetch_lsn(void)
{
XLogRecPtr min_lsn = InvalidXLogRecPtr;
size_t n_procs = ProcGlobal->allProcCount;
for (size_t i = 0; i < n_procs; i++)
{
min_lsn = Min(min_lsn, minPrefetchLsn[i]);
}
return min_lsn;
}

View File

@@ -46,5 +46,6 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
extern void communicator_reconfigure_timeout_if_needed(void);
extern void communicator_prefetch_pump_state(void);
extern XLogRecPtr communicator_get_min_prefetch_lsn(void);
#endif

View File

@@ -219,10 +219,6 @@ static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
static bool lfc_do_prewarm;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
bool lfc_store_prefetch_result;
bool lfc_prewarm_update_ws_estimation;
@@ -342,19 +338,12 @@ lfc_ensure_opened(void)
return true;
}
static void
lfc_shmem_startup(void)
void
LfcShmemInit(void)
{
bool found;
static HASHCTL info;
if (prev_shmem_startup_hook)
{
prev_shmem_startup_hook();
}
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
lfc_ctl = (FileCacheControl *) ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
if (!found)
{
@@ -398,17 +387,11 @@ lfc_shmem_startup(void)
ConditionVariableInit(&lfc_ctl->cv[i]);
}
LWLockRelease(AddinShmemInitLock);
}
static void
lfc_shmem_request(void)
void
LfcShmemRequest(void)
{
#if PG_VERSION_NUM>=150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
RequestNamedLWLockTranche("lfc_lock", 1);
}
@@ -645,15 +628,6 @@ lfc_init(void)
if (lfc_max_size == 0)
return;
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = lfc_shmem_startup;
#if PG_VERSION_NUM>=150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = lfc_shmem_request;
#else
lfc_shmem_request();
#endif
}
FileCacheState*

View File

@@ -118,10 +118,6 @@ typedef struct
ShardMap shard_map;
} PagestoreShmemState;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
#endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
@@ -1284,18 +1280,12 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}
static Size
PagestoreShmemSize(void)
{
return add_size(sizeof(PagestoreShmemState), NeonPerfCountersShmemSize());
}
static bool
void
PagestoreShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pagestore_shared = ShmemInitStruct("libpagestore shared state",
sizeof(PagestoreShmemState),
&found);
@@ -1306,44 +1296,12 @@ PagestoreShmemInit(void)
memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap));
AssignPageserverConnstring(page_server_connstring, NULL);
}
NeonPerfCountersShmemInit();
LWLockRelease(AddinShmemInitLock);
return found;
}
static void
pagestore_shmem_startup_hook(void)
void
PagestoreShmemRequest(void)
{
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
PagestoreShmemInit();
}
static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(PagestoreShmemSize());
}
static void
pagestore_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
RequestAddinShmemSpace(sizeof(PagestoreShmemState));
}
/*
@@ -1352,8 +1310,6 @@ pagestore_prepare_shmem(void)
void
pg_init_libpagestore(void)
{
pagestore_prepare_shmem();
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
@@ -1504,8 +1460,6 @@ pg_init_libpagestore(void)
0,
NULL, NULL, NULL);
relsize_hash_init();
if (page_server != NULL)
neon_log(ERROR, "libpagestore already loaded");

View File

@@ -22,6 +22,7 @@
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/proc.h"
#include "storage/ipc.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
@@ -59,11 +60,15 @@ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
static void neon_ExecutorStart(QueryDesc *queryDesc, int eflags);
static void neon_ExecutorEnd(QueryDesc *queryDesc);
#if PG_MAJORVERSION_NUM >= 16
static shmem_startup_hook_type prev_shmem_startup_hook;
static void neon_shmem_startup_hook(void);
static void neon_shmem_request_hook(void);
#if PG_MAJORVERSION_NUM >= 15
static shmem_request_hook_type prev_shmem_request_hook = NULL;
#endif
#if PG_MAJORVERSION_NUM >= 17
uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
uint32 WAIT_EVENT_NEON_LFC_READ;
@@ -450,15 +455,22 @@ _PG_init(void)
*/
#if PG_VERSION_NUM >= 160000
load_file("$libdir/neon_rmgr", false);
#endif
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = neon_shmem_request_hook;
#else
neon_shmem_request_hook();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = neon_shmem_startup_hook;
#endif
/* dummy call to a Rust function in the communicator library, to check that it works */
(void) communicator_dummy(123);
pg_init_libpagestore();
relsize_hash_init();
lfc_init();
pg_init_walproposer();
init_lwlsncache();
@@ -637,7 +649,23 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
PG_RETURN_INT32(dc);
}
#if PG_MAJORVERSION_NUM >= 16
static void
neon_shmem_request_hook(void)
{
#if PG_VERSION_NUM >= 150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
LfcShmemRequest();
NeonPerfCountersShmemRequest();
PagestoreShmemRequest();
RelsizeCacheShmemRequest();
CommunicatorShmemRequest();
WalproposerShmemRequest();
LwLsnCacheShmemRequest();
}
static void
neon_shmem_startup_hook(void)
{
@@ -645,6 +673,16 @@ neon_shmem_startup_hook(void)
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
LfcShmemInit();
NeonPerfCountersShmemInit();
PagestoreShmemInit();
RelsizeCacheShmemInit();
CommunicatorShmemInit();
WalproposerShmemInit();
LwLsnCacheShmemInit();
#if PG_MAJORVERSION_NUM >= 17
WAIT_EVENT_NEON_LFC_MAINTENANCE = WaitEventExtensionNew("Neon/FileCache_Maintenance");
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
@@ -657,8 +695,9 @@ neon_shmem_startup_hook(void)
WAIT_EVENT_NEON_PS_READ = WaitEventExtensionNew("Neon/PS_ReadIO");
WAIT_EVENT_NEON_WAL_DL = WaitEventExtensionNew("Neon/WAL_Download");
#endif
LWLockRelease(AddinShmemInitLock);
}
#endif
/*
* ExecutorStart hook: start up tracking if needed

View File

@@ -71,4 +71,21 @@ extern PGDLLEXPORT void WalProposerSync(int argc, char *argv[]);
extern PGDLLEXPORT void WalProposerMain(Datum main_arg);
extern PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
extern void LfcShmemRequest(void);
extern void PagestoreShmemRequest(void);
extern void RelsizeCacheShmemRequest(void);
extern void CommunicatorShmemRequest(void);
extern void WalproposerShmemRequest(void);
extern void LwLsnCacheShmemRequest(void);
extern void NeonPerfCountersShmemRequest(void);
extern void LfcShmemInit(void);
extern void PagestoreShmemInit(void);
extern void RelsizeCacheShmemInit(void);
extern void CommunicatorShmemInit(void);
extern void WalproposerShmemInit(void);
extern void LwLsnCacheShmemInit(void);
extern void NeonPerfCountersShmemInit(void);
#endif /* NEON_H */

View File

@@ -1,5 +1,6 @@
#include "postgres.h"
#include "neon.h"
#include "neon_lwlsncache.h"
#include "miscadmin.h"
@@ -81,14 +82,6 @@ static set_max_lwlsn_hook_type prev_set_max_lwlsn_hook = NULL;
static set_lwlsn_relation_hook_type prev_set_lwlsn_relation_hook = NULL;
static set_lwlsn_db_hook_type prev_set_lwlsn_db_hook = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static void shmemrequest(void);
static void shmeminit(void);
static void neon_set_max_lwlsn(XLogRecPtr lsn);
void
@@ -99,16 +92,6 @@ init_lwlsncache(void)
lwlc_register_gucs();
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = shmeminit;
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = shmemrequest;
#else
shmemrequest();
#endif
prev_set_lwlsn_block_range_hook = set_lwlsn_block_range_hook;
set_lwlsn_block_range_hook = neon_set_lwlsn_block_range;
prev_set_lwlsn_block_v_hook = set_lwlsn_block_v_hook;
@@ -124,20 +107,19 @@ init_lwlsncache(void)
}
static void shmemrequest(void) {
void
LwLsnCacheShmemRequest(void)
{
Size requested_size = sizeof(LwLsnCacheCtl);
requested_size += hash_estimate_size(lwlsn_cache_size, sizeof(LastWrittenLsnCacheEntry));
RequestAddinShmemSpace(requested_size);
#if PG_VERSION_NUM >= 150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
}
static void shmeminit(void) {
void
LwLsnCacheShmemInit(void)
{
static HASHCTL info;
bool found;
if (lwlsn_cache_size > 0)
@@ -157,9 +139,6 @@ static void shmeminit(void) {
}
dlist_init(&LwLsnCache->lastWrittenLsnLRU);
LwLsnCache->maxLastWrittenLsn = GetRedoRecPtr();
if (prev_shmem_startup_hook) {
prev_shmem_startup_hook();
}
}
/*

View File

@@ -17,22 +17,21 @@
#include "storage/shmem.h"
#include "utils/builtins.h"
#include "neon.h"
#include "neon_perf_counters.h"
#include "neon_pgversioncompat.h"
neon_per_backend_counters *neon_per_backend_counters_shared;
Size
NeonPerfCountersShmemSize(void)
void
NeonPerfCountersShmemRequest(void)
{
Size size = 0;
size = add_size(size, mul_size(NUM_NEON_PERF_COUNTER_SLOTS,
sizeof(neon_per_backend_counters)));
return size;
Size size = mul_size(NUM_NEON_PERF_COUNTER_SLOTS, sizeof(neon_per_backend_counters));
RequestAddinShmemSpace(size);
}
void
NeonPerfCountersShmemInit(void)
{

View File

@@ -250,7 +250,6 @@ extern const f_smgr *smgr_neon(ProcNumber backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra);
/*
* LSN values associated with each request to the pageserver
*/

View File

@@ -10,6 +10,7 @@
*/
#include "postgres.h"
#include "neon.h"
#include "neon_pgversioncompat.h"
#include "pagestore_client.h"
@@ -53,11 +54,6 @@ static HTAB *relsize_hash;
static LWLockId relsize_lock;
static int relsize_hash_size;
static RelSizeHashControl* relsize_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void relsize_shmem_request(void);
#endif
/*
* Size of a cache entry is 36 bytes. So this default will take about 2.3 MB,
@@ -65,16 +61,12 @@ static void relsize_shmem_request(void);
*/
#define DEFAULT_RELSIZE_HASH_SIZE (64 * 1024)
static void
neon_smgr_shmem_startup(void)
void
RelsizeCacheShmemInit(void)
{
static HASHCTL info;
bool found;
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
relsize_ctl = (RelSizeHashControl *) ShmemInitStruct("relsize_hash", sizeof(RelSizeHashControl), &found);
if (!found)
{
@@ -85,7 +77,6 @@ neon_smgr_shmem_startup(void)
relsize_hash_size, relsize_hash_size,
&info,
HASH_ELEM | HASH_BLOBS);
LWLockRelease(AddinShmemInitLock);
relsize_ctl->size = 0;
relsize_ctl->hits = 0;
relsize_ctl->misses = 0;
@@ -242,34 +233,15 @@ relsize_hash_init(void)
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
if (relsize_hash_size > 0)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = relsize_shmem_request;
#else
RequestAddinShmemSpace(hash_estimate_size(relsize_hash_size, sizeof(RelSizeEntry)));
RequestNamedLWLockTranche("neon_relsize", 1);
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = neon_smgr_shmem_startup;
}
}
#if PG_VERSION_NUM >= 150000
/*
* shmem_request hook: request additional shared resources. We'll allocate or
* attach to the shared resources in neon_smgr_shmem_startup().
*/
static void
relsize_shmem_request(void)
void
RelsizeCacheShmemRequest(void)
{
if (prev_shmem_request_hook)
prev_shmem_request_hook();
RequestAddinShmemSpace(sizeof(RelSizeHashControl) + hash_estimate_size(relsize_hash_size, sizeof(RelSizeEntry)));
RequestNamedLWLockTranche("neon_relsize", 1);
}
#endif

View File

@@ -83,10 +83,8 @@ static XLogRecPtr standby_flush_lsn = InvalidXLogRecPtr;
static XLogRecPtr standby_apply_lsn = InvalidXLogRecPtr;
static HotStandbyFeedback agg_hs_feedback;
static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
static void assign_neon_safekeepers(const char *newval, void *extra);
static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void);
static uint64 startup_backpressure_wrap(void);
static bool backpressure_throttling_impl(void);
@@ -99,11 +97,6 @@ static TimestampTz walprop_pg_get_current_timestamp(WalProposer *wp);
static void walprop_pg_load_libpqwalreceiver(void);
static process_interrupts_callback_t PrevProcessInterruptsCallback = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook_type;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static void WalproposerShmemInit_SyncSafekeeper(void);
@@ -193,8 +186,6 @@ pg_init_walproposer(void)
nwp_register_gucs();
nwp_prepare_shmem();
delay_backend_us = &startup_backpressure_wrap;
PrevProcessInterruptsCallback = ProcessInterruptsCallback;
ProcessInterruptsCallback = backpressure_throttling_impl;
@@ -482,12 +473,11 @@ WalproposerShmemSize(void)
return sizeof(WalproposerShmemState);
}
static bool
void
WalproposerShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
walprop_shared = ShmemInitStruct("Walproposer shared state",
sizeof(WalproposerShmemState),
&found);
@@ -504,9 +494,6 @@ WalproposerShmemInit(void)
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
/* END_HADRON */
}
LWLockRelease(AddinShmemInitLock);
return found;
}
static void
@@ -609,42 +596,15 @@ walprop_register_bgworker(void)
/* shmem handling */
static void
nwp_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = walproposer_shmem_request;
#else
RequestAddinShmemSpace(WalproposerShmemSize());
#endif
prev_shmem_startup_hook_type = shmem_startup_hook;
shmem_startup_hook = nwp_shmem_startup_hook;
}
#if PG_VERSION_NUM >= 150000
/*
* shmem_request hook: request additional shared resources. We'll allocate or
* attach to the shared resources in nwp_shmem_startup_hook().
* attach to the shared resources in WalproposerShmemInit().
*/
static void
walproposer_shmem_request(void)
void
WalproposerShmemRequest(void)
{
if (prev_shmem_request_hook)
prev_shmem_request_hook();
RequestAddinShmemSpace(WalproposerShmemSize());
}
#endif
static void
nwp_shmem_startup_hook(void)
{
if (prev_shmem_startup_hook_type)
prev_shmem_startup_hook_type();
WalproposerShmemInit();
}
WalproposerShmemState *
GetWalpropShmemState(void)