From 6e2af7ac3f3bc96783272dd1c88ce65e5990cb22 Mon Sep 17 00:00:00 2001 From: Kosntantin Knizhnik Date: Tue, 15 Jul 2025 18:17:11 +0300 Subject: [PATCH] Add function calculating min prefetch request LSN to be used for replica leases --- pgxn/neon/communicator.c | 131 +++++++++++++++++++++++++++++++-- pgxn/neon/communicator.h | 1 + pgxn/neon/file_cache.c | 34 +-------- pgxn/neon/libpagestore.c | 54 +------------- pgxn/neon/neon.c | 49 ++++++++++-- pgxn/neon/neon.h | 17 +++++ pgxn/neon/neon_lwlsncache.c | 37 ++-------- pgxn/neon/neon_perf_counters.c | 15 ++-- pgxn/neon/pagestore_client.h | 1 - pgxn/neon/relsize_cache.c | 38 ++-------- pgxn/neon/walproposer_pg.c | 48 +----------- 11 files changed, 218 insertions(+), 207 deletions(-) diff --git a/pgxn/neon/communicator.c b/pgxn/neon/communicator.c index 158b8940a3..da2f0643d1 100644 --- a/pgxn/neon/communicator.c +++ b/pgxn/neon/communicator.c @@ -54,6 +54,7 @@ */ #include "postgres.h" +#include "access/twophase.h" #include "access/xlog.h" #include "access/xlogdefs.h" #include "access/xlog_internal.h" @@ -75,11 +76,18 @@ #include "neon_perf_counters.h" #include "pagestore_client.h" -#if PG_VERSION_NUM >= 150000 +#if PG_MAJORVERSION_NUM >= 17 +#include "storage/procnumber.h" +#else +#define MyProcNumber MyProc->pgprocno +#endif + + +#if PG_MAJORVERSION_NUM >= 15 #include "access/xlogrecovery.h" #endif -#if PG_VERSION_NUM < 160000 +#if PG_MAJORVERSION_NUM < 16 typedef PGAlignedBlock PGIOAlignedBlock; #endif @@ -293,6 +301,7 @@ static PrefetchState *MyPState; ) static process_interrupts_callback_t prev_interrupt_cb; +static XLogRecPtr* minPrefetchLsn; static bool compact_prefetch_buffers(void); static void consume_prefetch_responses(void); @@ -316,6 +325,33 @@ pg_init_communicator(void) ProcessInterruptsCallback = communicator_processinterrupts; } +static Size +CommunicatorShmemSize(void) +{ + return (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts) * sizeof(XLogRecPtr); +} + +void +CommunicatorShmemRequest(void) +{ + RequestAddinShmemSpace(CommunicatorShmemSize()); +} + +void +CommunicatorShmemInit(void) +{ + bool found; + minPrefetchLsn = (XLogRecPtr*)ShmemInitStruct("Communicator shared state", + CommunicatorShmemSize(), + &found); + if (!found) + { + /* Fill with MAX_UINT64 */ + memset(minPrefetchLsn, 0xFF, CommunicatorShmemSize()); + } +} + + static bool compact_prefetch_buffers(void) { @@ -478,8 +514,9 @@ communicator_prefetch_pump_state(void) NeonResponse *response; PrefetchRequest *slot; MemoryContext old; + uint64 my_ring_index = MyPState->ring_receive; - slot = GetPrfSlot(MyPState->ring_receive); + slot = GetPrfSlot(my_ring_index); old = MemoryContextSwitchTo(MyPState->errctx); response = page_server->try_receive(slot->shard_no); @@ -488,17 +525,25 @@ communicator_prefetch_pump_state(void) if (response == NULL) break; + /* Update min in-flight prefetch reqwuest LSN */ + if (my_ring_index + 1 < MyPState->ring_unused) + { + PrefetchRequest* next_slot = GetPrfSlot(my_ring_index + 1); + Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn); + minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn; + } + check_getpage_response(slot, response); /* The slot should still be valid */ if (slot->status != PRFS_REQUESTED || slot->response != NULL || - slot->my_ring_index != MyPState->ring_receive) + slot->my_ring_index != my_ring_index) { neon_shard_log(slot->shard_no, PANIC, "Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "", slot->status, slot->response, - slot->my_ring_index, MyPState->ring_receive); + slot->my_ring_index, my_ring_index); } /* update prefetch state */ MyPState->n_responses_buffered += 1; @@ -665,6 +710,9 @@ consume_prefetch_responses(void) { if (MyPState->ring_receive < MyPState->ring_unused) prefetch_wait_for(MyPState->ring_unused - 1); + + minPrefetchLsn[MyProcNumber] = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */ + /* * We know for sure we're not working on any prefetch pages after * this. @@ -806,6 +854,15 @@ prefetch_read(PrefetchRequest *slot) old = MemoryContextSwitchTo(MyPState->errctx); response = (NeonResponse *) page_server->receive(shard_no); MemoryContextSwitchTo(old); + + /* Update min in-flight prefetch reqwuest LSN */ + if (my_ring_index + 1 < MyPState->ring_unused) + { + PrefetchRequest* next_slot = GetPrfSlot(my_ring_index + 1); + Assert(minPrefetchLsn[MyProcNumber] <= next_slot->request_lsns.request_lsn); + minPrefetchLsn[MyProcNumber] = next_slot->request_lsns.request_lsn; + } + if (response) { check_getpage_response(slot, response); @@ -924,6 +981,8 @@ prefetch_on_ps_disconnect(void) MyNeonCounters->getpage_prefetch_discards_total += 1; } + minPrefetchLsn[MyProcNumber] = InvalidXLogRecPtr; /* No more in-flight prefetch requests from this backend */ + /* * We can have gone into retry due to network error, so update stats with * the latest available @@ -1025,6 +1084,8 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns Assert(slot->response == NULL); Assert(slot->my_ring_index == MyPState->ring_unused); + minPrefetchLsn[MyProcNumber] = Min(request.hdr.lsn, minPrefetchLsn[MyProcNumber]); + while (!page_server->send(slot->shard_no, (NeonRequest *) &request)) { Assert(mySlotNo == MyPState->ring_unused); @@ -1045,6 +1106,34 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns Assert(!found); } +/* + * Check that pahge LSN returned by PS to replica is not beyand replay LSN. + * It can happen only in case of deteriorated lease. + */ +static bool +check_page_lsn(NeonGetPageResponse* resp, XLogRecPtr* replay_lsn_ptr) +{ + if (RecoveryInProgress()) + { + XLogRecPtr page_lsn = PageGetLSN((Page)resp->page); +#if PG_VERSION_NUM >= 150000 + XLogRecPtr replay_lsn = GetCurrentReplayRecPtr(NULL); +#else + /* + * PG14 doesn't have GetCurrentReplayRecPtr() function which returns end of currently applied record. + * And GetXLogReplayRecPtr returns end of WAL records which was already applied. + * So we have to use this hack with resp->req.lsn which is expected to contain end record ptr in this case. + * But it works onlyfor v3 protocol version. + */ + XLogRecPtr replay_lsn = Max(GetXLogReplayRecPtr(NULL), resp->req.hdr.lsn); +#endif + if (replay_lsn_ptr) + *replay_lsn_ptr = replay_lsn; + return replay_lsn == 0 || page_lsn <= replay_lsn; + } + return true; +} + /* * Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted. * Present pages are marked in "mask" bitmap and total number of such pages is returned. @@ -1068,7 +1157,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe for (int i = 0; i < nblocks; i++) { PrfHashEntry *entry; - + NeonGetPageResponse* resp; hashkey.buftag.blockNum = blocknum + i; entry = prfh_lookup(MyPState->prf_hash, &hashkey); @@ -1101,8 +1190,16 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe continue; } Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */ - memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ); + resp = (NeonGetPageResponse*)slot->response; + /* + * Ignore "in-future" responses caused by deteriorated lease + */ + if (!check_page_lsn(resp, NULL)) + { + continue; + } + memcpy(buffers[i], resp->page, BLCKSZ); /* * With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received @@ -2227,6 +2324,15 @@ Retry: case T_NeonGetPageResponse: { NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp; + XLogRecPtr replay_lsn; + if (!check_page_lsn(getpage_resp, &replay_lsn)) + { + /* Alternative to throw error is to repeat the query with request_lsn=replay_lsn */ + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg("There is no more version of page %u of relation %u/%u/%u.%u at LSN %X/%X at page server, request LSN %X/%X, latest version is at LSN %X/%X", + blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(replay_lsn), LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(PageGetLSN((Page)getpage_resp->page))))); + } memcpy(buffer, getpage_resp->page, BLCKSZ); /* @@ -2577,3 +2683,14 @@ communicator_processinterrupts(void) return prev_interrupt_cb(); } + +XLogRecPtr communicator_get_min_prefetch_lsn(void) +{ + XLogRecPtr min_lsn = InvalidXLogRecPtr; + size_t n_procs = ProcGlobal->allProcCount; + for (size_t i = 0; i < n_procs; i++) + { + min_lsn = Min(min_lsn, minPrefetchLsn[i]); + } + return min_lsn; +} diff --git a/pgxn/neon/communicator.h b/pgxn/neon/communicator.h index 5376c9b839..4f5dc30f15 100644 --- a/pgxn/neon/communicator.h +++ b/pgxn/neon/communicator.h @@ -46,5 +46,6 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno, extern void communicator_reconfigure_timeout_if_needed(void); extern void communicator_prefetch_pump_state(void); +extern XLogRecPtr communicator_get_min_prefetch_lsn(void); #endif diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 2c87f139af..0886f21584 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -219,10 +219,6 @@ static char *lfc_path; static uint64 lfc_generation; static FileCacheControl *lfc_ctl; static bool lfc_do_prewarm; -static shmem_startup_hook_type prev_shmem_startup_hook; -#if PG_VERSION_NUM>=150000 -static shmem_request_hook_type prev_shmem_request_hook; -#endif bool lfc_store_prefetch_result; bool lfc_prewarm_update_ws_estimation; @@ -342,19 +338,12 @@ lfc_ensure_opened(void) return true; } -static void -lfc_shmem_startup(void) +void +LfcShmemInit(void) { bool found; static HASHCTL info; - if (prev_shmem_startup_hook) - { - prev_shmem_startup_hook(); - } - - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - lfc_ctl = (FileCacheControl *) ShmemInitStruct("lfc", sizeof(FileCacheControl), &found); if (!found) { @@ -398,17 +387,11 @@ lfc_shmem_startup(void) ConditionVariableInit(&lfc_ctl->cv[i]); } - LWLockRelease(AddinShmemInitLock); } -static void -lfc_shmem_request(void) +void +LfcShmemRequest(void) { -#if PG_VERSION_NUM>=150000 - if (prev_shmem_request_hook) - prev_shmem_request_hook(); -#endif - RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE)); RequestNamedLWLockTranche("lfc_lock", 1); } @@ -645,15 +628,6 @@ lfc_init(void) if (lfc_max_size == 0) return; - - prev_shmem_startup_hook = shmem_startup_hook; - shmem_startup_hook = lfc_shmem_startup; -#if PG_VERSION_NUM>=150000 - prev_shmem_request_hook = shmem_request_hook; - shmem_request_hook = lfc_shmem_request; -#else - lfc_shmem_request(); -#endif } FileCacheState* diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 05ba6da663..596258007a 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -118,10 +118,6 @@ typedef struct ShardMap shard_map; } PagestoreShmemState; -#if PG_VERSION_NUM >= 150000 -static shmem_request_hook_type prev_shmem_request_hook = NULL; -#endif -static shmem_startup_hook_type prev_shmem_startup_hook; static PagestoreShmemState *pagestore_shared; static uint64 pagestore_local_counter = 0; @@ -1284,18 +1280,12 @@ check_neon_id(char **newval, void **extra, GucSource source) return **newval == '\0' || HexDecodeString(id, *newval, 16); } -static Size -PagestoreShmemSize(void) -{ - return add_size(sizeof(PagestoreShmemState), NeonPerfCountersShmemSize()); -} -static bool +void PagestoreShmemInit(void) { bool found; - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); pagestore_shared = ShmemInitStruct("libpagestore shared state", sizeof(PagestoreShmemState), &found); @@ -1306,44 +1296,12 @@ PagestoreShmemInit(void) memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap)); AssignPageserverConnstring(page_server_connstring, NULL); } - - NeonPerfCountersShmemInit(); - - LWLockRelease(AddinShmemInitLock); - return found; } -static void -pagestore_shmem_startup_hook(void) +void +PagestoreShmemRequest(void) { - if (prev_shmem_startup_hook) - prev_shmem_startup_hook(); - - PagestoreShmemInit(); -} - -static void -pagestore_shmem_request(void) -{ -#if PG_VERSION_NUM >= 150000 - if (prev_shmem_request_hook) - prev_shmem_request_hook(); -#endif - - RequestAddinShmemSpace(PagestoreShmemSize()); -} - -static void -pagestore_prepare_shmem(void) -{ -#if PG_VERSION_NUM >= 150000 - prev_shmem_request_hook = shmem_request_hook; - shmem_request_hook = pagestore_shmem_request; -#else - pagestore_shmem_request(); -#endif - prev_shmem_startup_hook = shmem_startup_hook; - shmem_startup_hook = pagestore_shmem_startup_hook; + RequestAddinShmemSpace(sizeof(PagestoreShmemState)); } /* @@ -1352,8 +1310,6 @@ pagestore_prepare_shmem(void) void pg_init_libpagestore(void) { - pagestore_prepare_shmem(); - DefineCustomStringVariable("neon.pageserver_connstring", "connection string to the page server", NULL, @@ -1504,8 +1460,6 @@ pg_init_libpagestore(void) 0, NULL, NULL, NULL); - relsize_hash_init(); - if (page_server != NULL) neon_log(ERROR, "libpagestore already loaded"); diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 7b749f1080..8347ec63a0 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -22,6 +22,7 @@ #include "replication/slot.h" #include "replication/walsender.h" #include "storage/proc.h" +#include "storage/ipc.h" #include "funcapi.h" #include "access/htup_details.h" #include "utils/builtins.h" @@ -59,11 +60,15 @@ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL; static void neon_ExecutorStart(QueryDesc *queryDesc, int eflags); static void neon_ExecutorEnd(QueryDesc *queryDesc); -#if PG_MAJORVERSION_NUM >= 16 static shmem_startup_hook_type prev_shmem_startup_hook; - static void neon_shmem_startup_hook(void); +static void neon_shmem_request_hook(void); + +#if PG_MAJORVERSION_NUM >= 15 +static shmem_request_hook_type prev_shmem_request_hook = NULL; #endif + + #if PG_MAJORVERSION_NUM >= 17 uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE; uint32 WAIT_EVENT_NEON_LFC_READ; @@ -450,15 +455,22 @@ _PG_init(void) */ #if PG_VERSION_NUM >= 160000 load_file("$libdir/neon_rmgr", false); +#endif +#if PG_VERSION_NUM >= 150000 + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = neon_shmem_request_hook; +#else + neon_shmem_request_hook(); +#endif prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = neon_shmem_startup_hook; -#endif /* dummy call to a Rust function in the communicator library, to check that it works */ (void) communicator_dummy(123); pg_init_libpagestore(); + relsize_hash_init(); lfc_init(); pg_init_walproposer(); init_lwlsncache(); @@ -637,7 +649,23 @@ approximate_working_set_size(PG_FUNCTION_ARGS) PG_RETURN_INT32(dc); } -#if PG_MAJORVERSION_NUM >= 16 +static void +neon_shmem_request_hook(void) +{ +#if PG_VERSION_NUM >= 150000 + if (prev_shmem_request_hook) + prev_shmem_request_hook(); +#endif + LfcShmemRequest(); + NeonPerfCountersShmemRequest(); + PagestoreShmemRequest(); + RelsizeCacheShmemRequest(); + CommunicatorShmemRequest(); + WalproposerShmemRequest(); + LwLsnCacheShmemRequest(); +} + + static void neon_shmem_startup_hook(void) { @@ -645,6 +673,16 @@ neon_shmem_startup_hook(void) if (prev_shmem_startup_hook) prev_shmem_startup_hook(); + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + LfcShmemInit(); + NeonPerfCountersShmemInit(); + PagestoreShmemInit(); + RelsizeCacheShmemInit(); + CommunicatorShmemInit(); + WalproposerShmemInit(); + LwLsnCacheShmemInit(); + #if PG_MAJORVERSION_NUM >= 17 WAIT_EVENT_NEON_LFC_MAINTENANCE = WaitEventExtensionNew("Neon/FileCache_Maintenance"); WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read"); @@ -657,8 +695,9 @@ neon_shmem_startup_hook(void) WAIT_EVENT_NEON_PS_READ = WaitEventExtensionNew("Neon/PS_ReadIO"); WAIT_EVENT_NEON_WAL_DL = WaitEventExtensionNew("Neon/WAL_Download"); #endif + + LWLockRelease(AddinShmemInitLock); } -#endif /* * ExecutorStart hook: start up tracking if needed diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 431dacb708..e0a5acf50f 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -71,4 +71,21 @@ extern PGDLLEXPORT void WalProposerSync(int argc, char *argv[]); extern PGDLLEXPORT void WalProposerMain(Datum main_arg); extern PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg); +extern void LfcShmemRequest(void); +extern void PagestoreShmemRequest(void); +extern void RelsizeCacheShmemRequest(void); +extern void CommunicatorShmemRequest(void); +extern void WalproposerShmemRequest(void); +extern void LwLsnCacheShmemRequest(void); +extern void NeonPerfCountersShmemRequest(void); + +extern void LfcShmemInit(void); +extern void PagestoreShmemInit(void); +extern void RelsizeCacheShmemInit(void); +extern void CommunicatorShmemInit(void); +extern void WalproposerShmemInit(void); +extern void LwLsnCacheShmemInit(void); +extern void NeonPerfCountersShmemInit(void); + + #endif /* NEON_H */ diff --git a/pgxn/neon/neon_lwlsncache.c b/pgxn/neon/neon_lwlsncache.c index a8cfa0f825..5887c02c36 100644 --- a/pgxn/neon/neon_lwlsncache.c +++ b/pgxn/neon/neon_lwlsncache.c @@ -1,5 +1,6 @@ #include "postgres.h" +#include "neon.h" #include "neon_lwlsncache.h" #include "miscadmin.h" @@ -81,14 +82,6 @@ static set_max_lwlsn_hook_type prev_set_max_lwlsn_hook = NULL; static set_lwlsn_relation_hook_type prev_set_lwlsn_relation_hook = NULL; static set_lwlsn_db_hook_type prev_set_lwlsn_db_hook = NULL; -static shmem_startup_hook_type prev_shmem_startup_hook; - -#if PG_VERSION_NUM >= 150000 -static shmem_request_hook_type prev_shmem_request_hook; -#endif - -static void shmemrequest(void); -static void shmeminit(void); static void neon_set_max_lwlsn(XLogRecPtr lsn); void @@ -99,16 +92,6 @@ init_lwlsncache(void) lwlc_register_gucs(); - prev_shmem_startup_hook = shmem_startup_hook; - shmem_startup_hook = shmeminit; - - #if PG_VERSION_NUM >= 150000 - prev_shmem_request_hook = shmem_request_hook; - shmem_request_hook = shmemrequest; - #else - shmemrequest(); - #endif - prev_set_lwlsn_block_range_hook = set_lwlsn_block_range_hook; set_lwlsn_block_range_hook = neon_set_lwlsn_block_range; prev_set_lwlsn_block_v_hook = set_lwlsn_block_v_hook; @@ -124,20 +107,19 @@ init_lwlsncache(void) } -static void shmemrequest(void) { +void +LwLsnCacheShmemRequest(void) +{ Size requested_size = sizeof(LwLsnCacheCtl); - + requested_size += hash_estimate_size(lwlsn_cache_size, sizeof(LastWrittenLsnCacheEntry)); RequestAddinShmemSpace(requested_size); - - #if PG_VERSION_NUM >= 150000 - if (prev_shmem_request_hook) - prev_shmem_request_hook(); - #endif } -static void shmeminit(void) { +void +LwLsnCacheShmemInit(void) +{ static HASHCTL info; bool found; if (lwlsn_cache_size > 0) @@ -157,9 +139,6 @@ static void shmeminit(void) { } dlist_init(&LwLsnCache->lastWrittenLsnLRU); LwLsnCache->maxLastWrittenLsn = GetRedoRecPtr(); - if (prev_shmem_startup_hook) { - prev_shmem_startup_hook(); - } } /* diff --git a/pgxn/neon/neon_perf_counters.c b/pgxn/neon/neon_perf_counters.c index d0a3d15108..63321d54f2 100644 --- a/pgxn/neon/neon_perf_counters.c +++ b/pgxn/neon/neon_perf_counters.c @@ -17,22 +17,21 @@ #include "storage/shmem.h" #include "utils/builtins.h" +#include "neon.h" #include "neon_perf_counters.h" #include "neon_pgversioncompat.h" neon_per_backend_counters *neon_per_backend_counters_shared; -Size -NeonPerfCountersShmemSize(void) +void +NeonPerfCountersShmemRequest(void) { - Size size = 0; - - size = add_size(size, mul_size(NUM_NEON_PERF_COUNTER_SLOTS, - sizeof(neon_per_backend_counters))); - - return size; + Size size = mul_size(NUM_NEON_PERF_COUNTER_SLOTS, sizeof(neon_per_backend_counters)); + RequestAddinShmemSpace(size); } + + void NeonPerfCountersShmemInit(void) { diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 4470d3a94d..2177e3c7d1 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -250,7 +250,6 @@ extern const f_smgr *smgr_neon(ProcNumber backend, NRelFileInfo rinfo); extern void smgr_init_neon(void); extern void readahead_buffer_resize(int newsize, void *extra); - /* * LSN values associated with each request to the pageserver */ diff --git a/pgxn/neon/relsize_cache.c b/pgxn/neon/relsize_cache.c index 60ca1675d9..cf8d1bd841 100644 --- a/pgxn/neon/relsize_cache.c +++ b/pgxn/neon/relsize_cache.c @@ -10,6 +10,7 @@ */ #include "postgres.h" +#include "neon.h" #include "neon_pgversioncompat.h" #include "pagestore_client.h" @@ -53,11 +54,6 @@ static HTAB *relsize_hash; static LWLockId relsize_lock; static int relsize_hash_size; static RelSizeHashControl* relsize_ctl; -static shmem_startup_hook_type prev_shmem_startup_hook = NULL; -#if PG_VERSION_NUM >= 150000 -static shmem_request_hook_type prev_shmem_request_hook = NULL; -static void relsize_shmem_request(void); -#endif /* * Size of a cache entry is 36 bytes. So this default will take about 2.3 MB, @@ -65,16 +61,12 @@ static void relsize_shmem_request(void); */ #define DEFAULT_RELSIZE_HASH_SIZE (64 * 1024) -static void -neon_smgr_shmem_startup(void) +void +RelsizeCacheShmemInit(void) { static HASHCTL info; bool found; - if (prev_shmem_startup_hook) - prev_shmem_startup_hook(); - - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); relsize_ctl = (RelSizeHashControl *) ShmemInitStruct("relsize_hash", sizeof(RelSizeHashControl), &found); if (!found) { @@ -85,7 +77,6 @@ neon_smgr_shmem_startup(void) relsize_hash_size, relsize_hash_size, &info, HASH_ELEM | HASH_BLOBS); - LWLockRelease(AddinShmemInitLock); relsize_ctl->size = 0; relsize_ctl->hits = 0; relsize_ctl->misses = 0; @@ -242,34 +233,15 @@ relsize_hash_init(void) PGC_POSTMASTER, 0, NULL, NULL, NULL); - - if (relsize_hash_size > 0) - { -#if PG_VERSION_NUM >= 150000 - prev_shmem_request_hook = shmem_request_hook; - shmem_request_hook = relsize_shmem_request; -#else - RequestAddinShmemSpace(hash_estimate_size(relsize_hash_size, sizeof(RelSizeEntry))); - RequestNamedLWLockTranche("neon_relsize", 1); -#endif - - prev_shmem_startup_hook = shmem_startup_hook; - shmem_startup_hook = neon_smgr_shmem_startup; - } } -#if PG_VERSION_NUM >= 150000 /* * shmem_request hook: request additional shared resources. We'll allocate or * attach to the shared resources in neon_smgr_shmem_startup(). */ -static void -relsize_shmem_request(void) +void +RelsizeCacheShmemRequest(void) { - if (prev_shmem_request_hook) - prev_shmem_request_hook(); - RequestAddinShmemSpace(sizeof(RelSizeHashControl) + hash_estimate_size(relsize_hash_size, sizeof(RelSizeEntry))); RequestNamedLWLockTranche("neon_relsize", 1); } -#endif diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index aaf8f43eeb..347b719dbf 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -83,10 +83,8 @@ static XLogRecPtr standby_flush_lsn = InvalidXLogRecPtr; static XLogRecPtr standby_apply_lsn = InvalidXLogRecPtr; static HotStandbyFeedback agg_hs_feedback; -static void nwp_shmem_startup_hook(void); static void nwp_register_gucs(void); static void assign_neon_safekeepers(const char *newval, void *extra); -static void nwp_prepare_shmem(void); static uint64 backpressure_lag_impl(void); static uint64 startup_backpressure_wrap(void); static bool backpressure_throttling_impl(void); @@ -99,11 +97,6 @@ static TimestampTz walprop_pg_get_current_timestamp(WalProposer *wp); static void walprop_pg_load_libpqwalreceiver(void); static process_interrupts_callback_t PrevProcessInterruptsCallback = NULL; -static shmem_startup_hook_type prev_shmem_startup_hook_type; -#if PG_VERSION_NUM >= 150000 -static shmem_request_hook_type prev_shmem_request_hook = NULL; -static void walproposer_shmem_request(void); -#endif static void WalproposerShmemInit_SyncSafekeeper(void); @@ -193,8 +186,6 @@ pg_init_walproposer(void) nwp_register_gucs(); - nwp_prepare_shmem(); - delay_backend_us = &startup_backpressure_wrap; PrevProcessInterruptsCallback = ProcessInterruptsCallback; ProcessInterruptsCallback = backpressure_throttling_impl; @@ -482,12 +473,11 @@ WalproposerShmemSize(void) return sizeof(WalproposerShmemState); } -static bool +void WalproposerShmemInit(void) { bool found; - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); walprop_shared = ShmemInitStruct("Walproposer shared state", sizeof(WalproposerShmemState), &found); @@ -504,9 +494,6 @@ WalproposerShmemInit(void) pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0); /* END_HADRON */ } - LWLockRelease(AddinShmemInitLock); - - return found; } static void @@ -609,42 +596,15 @@ walprop_register_bgworker(void) /* shmem handling */ -static void -nwp_prepare_shmem(void) -{ -#if PG_VERSION_NUM >= 150000 - prev_shmem_request_hook = shmem_request_hook; - shmem_request_hook = walproposer_shmem_request; -#else - RequestAddinShmemSpace(WalproposerShmemSize()); -#endif - prev_shmem_startup_hook_type = shmem_startup_hook; - shmem_startup_hook = nwp_shmem_startup_hook; -} - -#if PG_VERSION_NUM >= 150000 /* * shmem_request hook: request additional shared resources. We'll allocate or - * attach to the shared resources in nwp_shmem_startup_hook(). + * attach to the shared resources in WalproposerShmemInit(). */ -static void -walproposer_shmem_request(void) +void +WalproposerShmemRequest(void) { - if (prev_shmem_request_hook) - prev_shmem_request_hook(); - RequestAddinShmemSpace(WalproposerShmemSize()); } -#endif - -static void -nwp_shmem_startup_hook(void) -{ - if (prev_shmem_startup_hook_type) - prev_shmem_startup_hook_type(); - - WalproposerShmemInit(); -} WalproposerShmemState * GetWalpropShmemState(void)