From 42f37c27e5eef38223a17764d69bcc945f22f894 Mon Sep 17 00:00:00 2001 From: Victor Polevoy Date: Wed, 16 Jul 2025 15:36:39 +0200 Subject: [PATCH] Rename the communicator switch variable --- pgxn/neon/file_cache.c | 30 +++++++-------- pgxn/neon/libpagestore.c | 8 ++-- pgxn/neon/neon.c | 13 +++---- pgxn/neon/neon.h | 1 - pgxn/neon/pagestore_smgr.c | 54 +++++++++++++-------------- pgxn/neon/relsize_cache.c | 8 ++-- test_runner/fixtures/neon_fixtures.py | 4 +- vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- vendor/postgres-v17 | 2 +- 11 files changed, 62 insertions(+), 64 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 847e2ba9f6..6a6a358eca 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -230,7 +230,7 @@ lfc_switch_off(void) { int fd; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (LFC_ENABLED()) { @@ -297,7 +297,7 @@ lfc_maybe_disabled(void) static bool lfc_ensure_opened(void) { - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_generation != lfc_ctl->generation) { @@ -324,7 +324,7 @@ lfc_shmem_startup(void) bool found; static HASHCTL info; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (prev_shmem_startup_hook) { @@ -624,7 +624,7 @@ lfc_init(void) if (lfc_max_size == 0) return; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) return; prev_shmem_startup_hook = shmem_startup_hook; @@ -702,7 +702,7 @@ lfc_prewarm(FileCacheState* fcs, uint32 n_workers) dsm_segment *seg; BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS]; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (!lfc_ensure_opened()) return; @@ -857,7 +857,7 @@ lfc_prewarm_main(Datum main_arg) PrewarmWorkerState* ws; uint32 worker_id = DatumGetInt32(main_arg); - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); AmPrewarmWorker = true; @@ -959,7 +959,7 @@ lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks) FileCacheEntry *entry; uint32 hash; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; @@ -1006,7 +1006,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) bool found = false; uint32 hash; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; @@ -1043,7 +1043,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, uint32 hash; int i = 0; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return 0; @@ -1152,7 +1152,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int blocks_read = 0; int buf_offset = 0; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return -1; @@ -1520,7 +1520,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno); - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; @@ -1667,7 +1667,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, uint32 entry_offset; int buf_offset = 0; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; @@ -2184,7 +2184,7 @@ get_local_cache_state(PG_FUNCTION_ARGS) size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0); FileCacheState* fcs; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) elog(ERROR, "TODO: not implemented"); fcs = lfc_get_state(max_entries); @@ -2204,7 +2204,7 @@ prewarm_local_cache(PG_FUNCTION_ARGS) uint32 n_workers = PG_GETARG_INT32(1); FileCacheState* fcs; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) elog(ERROR, "TODO: not implemented"); fcs = (FileCacheState*)state; @@ -2227,7 +2227,7 @@ get_prewarm_info(PG_FUNCTION_ARGS) uint32 total_pages; size_t n_workers; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) elog(ERROR, "TODO: not implemented"); if (lfc_size_limit == 0) diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 7dfc08e54a..1e41527fb5 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -123,7 +123,7 @@ static uint64 pagestore_local_counter = 0; typedef enum PSConnectionState { PS_Disconnected, /* no connection yet */ PS_Connecting_Startup, /* connection starting up */ - PS_Connecting_PageStream, /* negotiating pagestream */ + PS_Connecting_PageStream, /* negotiating pagestream */ PS_Connected, /* connected, pagestream established */ } PSConnectionState; @@ -253,7 +253,7 @@ AssignPageserverConnstring(const char *newval, void *extra) * In that case, the shard map is loaded from 'neon.pageserver_grpc_urls' * instead, and that happens in the communicator process only. */ - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) return; /* @@ -395,7 +395,7 @@ get_shard_number(BufferTag *tag) } static inline void -CLEANUP_AND_DISCONNECT(PageServer *shard) +CLEANUP_AND_DISCONNECT(PageServer *shard) { if (shard->wes_read) { @@ -417,7 +417,7 @@ CLEANUP_AND_DISCONNECT(PageServer *shard) * complete the connection (e.g. due to receiving an earlier cancellation * during connection start). * Returns true if successfully connected; false if the connection failed. - * + * * Throws errors in unrecoverable situations, or when this backend's query * is canceled. */ diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 548bdd9bb8..623492c506 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -52,7 +52,6 @@ PG_MODULE_MAGIC; void _PG_init(void); -bool neon_enable_new_communicator; static int running_xacts_overflow_policy; static bool monitor_query_exec_time = false; @@ -468,10 +467,10 @@ _PG_init(void) #endif DefineCustomBoolVariable( - "neon.enable_new_communicator", - "Enables new communicator implementation", + "neon.use_communicator_worker", + "Uses the communicator worker implementation", NULL, - &neon_enable_new_communicator, + &neon_use_communicator_worker, true, PGC_POSTMASTER, 0, @@ -483,7 +482,7 @@ _PG_init(void) init_lwlsncache(); pg_init_communicator(); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) pg_init_communicator_new(); Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines; @@ -639,7 +638,7 @@ approximate_working_set_size_seconds(PG_FUNCTION_ARGS) duration = PG_ARGISNULL(0) ? (time_t) -1 : PG_GETARG_INT32(0); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) dc = communicator_new_approximate_working_set_size_seconds(duration, false); else dc = lfc_approximate_working_set_size_seconds(duration, false); @@ -655,7 +654,7 @@ approximate_working_set_size(PG_FUNCTION_ARGS) int32 dc; bool reset = PG_GETARG_BOOL(0); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) dc = communicator_new_approximate_working_set_size_seconds(-1, reset); else dc = lfc_approximate_working_set_size_seconds(-1, reset); diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 149ed5ebed..431dacb708 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -13,7 +13,6 @@ #include "utils/wait_event.h" /* GUCs */ -extern bool neon_enable_new_communicator; extern char *neon_auth_token; extern char *neon_timeline; extern char *neon_tenant; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 79cd4685cb..06ce61d2e5 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -822,7 +822,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) return false; } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) return communicator_new_rel_exists(InfoFromSMgrRel(reln), forkNum); else { @@ -900,7 +900,7 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo) * that's being replayed, so we should not have the correctness issue * mentioned in previous paragraph. */ - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { XLogRecPtr lsn = neon_get_write_lsn(); @@ -961,7 +961,7 @@ neon_unlink(NRelFileInfoBackend rinfo, ForkNumber forkNum, bool isRedo) if (!NRelFileInfoBackendIsTemp(rinfo)) { - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { XLogRecPtr lsn = neon_get_write_lsn(); @@ -1055,7 +1055,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, forkNum, blkno, (uint32) (lsn >> 32), (uint32) lsn); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { // FIXME: this can pass lsn == invalid. Is that ok? communicator_new_rel_extend(InfoFromSMgrRel(reln), forkNum, blkno, (const void *) buffer, lsn); @@ -1182,7 +1182,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber start_block, lsn = XLogInsert(RM_XLOG_ID, XLOG_FPI); - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) { for (int i = 0; i < count; i++) { @@ -1198,7 +1198,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber start_block, Assert(lsn != 0); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_rel_zeroextend(InfoFromSMgrRel(reln), forkNum, start_block, nblocks, lsn); } @@ -1266,7 +1266,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_prefetch_register_bufferv(InfoFromSMgrRel(reln), forknum, blocknum, nblocks); return false; @@ -1298,7 +1298,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, blocknum += iterblocks; } - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) communicator_prefetch_pump_state(); return false; @@ -1326,7 +1326,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_prefetch_register_bufferv(InfoFromSMgrRel(reln), forknum, blocknum, 1); } @@ -1388,7 +1388,7 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum, */ neon_log(SmgrTrace, "writeback noop"); - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) communicator_prefetch_pump_state(); if (debug_compare_local) @@ -1406,7 +1406,7 @@ void neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, neon_request_lsns request_lsns, void *buffer) { - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { // FIXME: request_lsns is ignored. That affects the neon_test_utils callers. // Add the capability to specify the LSNs explicitly, for the sake of neon_test_utils ? @@ -1539,7 +1539,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_read_at_lsnv(InfoFromSMgrRel(reln), forkNum, blkno, (void *) &buffer, 1); @@ -1650,12 +1650,12 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, nblocks, PG_IOV_MAX); /* Try to read PS results if they are available */ - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) communicator_prefetch_pump_state(); memset(read_pages, 0, sizeof(read_pages)); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, buffers, nblocks); @@ -1811,7 +1811,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo forknum, blocknum, (uint32) (lsn >> 32), (uint32) lsn); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_write_page(InfoFromSMgrRel(reln), forknum, blocknum, buffer, lsn); } @@ -1881,7 +1881,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, neon_wallog_pagev(reln, forknum, blkno, nblocks, (const char **) buffers, false); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { for (int i = 0; i < nblocks; i++) { @@ -1936,7 +1936,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { n_blocks = communicator_new_rel_nblocks(InfoFromSMgrRel(reln), forknum); } @@ -1976,7 +1976,7 @@ neon_dbsize(Oid dbNode) neon_request_lsns request_lsns; NRelFileInfo dummy_node = {0}; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { db_size = communicator_new_dbsize(dbNode); } @@ -2023,7 +2023,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { XLogRecPtr lsn = neon_get_write_lsn(); @@ -2104,7 +2104,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum) neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop"); - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) communicator_prefetch_pump_state(); if (debug_compare_local) @@ -2291,7 +2291,7 @@ neon_end_unlogged_build(SMgrRelation reln) nblocks = mdnblocks(reln, MAIN_FORKNUM); recptr = GetXLogInsertRecPtr(); - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) { neon_set_lwlsn_block_range(recptr, InfoFromNInfoB(rinfob), @@ -2308,7 +2308,7 @@ neon_end_unlogged_build(SMgrRelation reln) RelFileInfoFmt(InfoFromNInfoB(rinfob)), forknum); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_update_cached_rel_size(InfoFromSMgrRel(reln), forknum, nblocks, recptr); } @@ -2384,7 +2384,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf request_lsns.not_modified_since = not_modified_since; request_lsns.effective_request_lsn = request_lsn; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) n_blocks = communicator_new_read_slru_segment(kind, (uint32_t)segno, &request_lsns, path); else n_blocks = communicator_read_slru_segment(kind, segno, &request_lsns, buffer); @@ -2424,7 +2424,7 @@ AtEOXact_neon(XactEvent event, void *arg) } break; } - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) communicator_reconfigure_timeout_if_needed(); } @@ -2483,7 +2483,7 @@ smgr_init_neon(void) smgr_init_standard(); neon_init(); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) communicator_new_init(); else communicator_init(); @@ -2498,7 +2498,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, /* This is only used in WAL replay */ Assert(RecoveryInProgress()); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { relsize = communicator_new_rel_nblocks(rinfo, forknum); @@ -2677,7 +2677,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) * We should perform this check after assigning LwLSN to prevent * prefetching of some older version of the page by some other backend. */ - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) no_redo_needed = communicator_new_cache_contains(rinfo, forknum, blkno); else no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno); diff --git a/pgxn/neon/relsize_cache.c b/pgxn/neon/relsize_cache.c index 4ea303f996..a880e6bc71 100644 --- a/pgxn/neon/relsize_cache.c +++ b/pgxn/neon/relsize_cache.c @@ -100,7 +100,7 @@ get_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber *size) { bool found = false; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (relsize_hash_size > 0) { @@ -133,7 +133,7 @@ get_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber *size) void set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) { - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (relsize_hash_size > 0) { @@ -183,7 +183,7 @@ set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) { - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (relsize_hash_size > 0) { @@ -219,7 +219,7 @@ update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum) { - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (relsize_hash_size > 0) { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index d594d2132b..a839373588 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4353,9 +4353,9 @@ class Endpoint(PgProtocol, LogUtils): # XXX: By checking for None, we enable the new communicator for all tests # by default if grpc or grpc is None: - config_lines += ["neon.enable_new_communicator=on"] + config_lines += ["neon.use_communicator_worker=on"] else: - config_lines += ["neon.enable_new_communicator=off"] + config_lines += ["neon.use_communicator_worker=off"] # Delete file cache if it exists (and we're recreating the endpoint) if USE_LFC: diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 9085654ee8..47304b9215 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 9085654ee8022d5cc4ca719380a1dc53e5e3246f +Subproject commit 47304b921555b3f33eb3b49daada3078e774cfd7 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 8c3249f36c..cef72d5308 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 8c3249f36c7df6ac0efb8ee9f1baf4aa1b83e5c9 +Subproject commit cef72d5308ddce3795a9043fcd94f8849f7f4800 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 7a4c0eacae..e9db1ff5a6 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 7a4c0eacaeb9b97416542fa19103061c166460b1 +Subproject commit e9db1ff5a6f3ca18f626ba3d62ab475e6c688a96 diff --git a/vendor/postgres-v17 b/vendor/postgres-v17 index db424d42d7..a50d80c750 160000 --- a/vendor/postgres-v17 +++ b/vendor/postgres-v17 @@ -1 +1 @@ -Subproject commit db424d42d748f8ad91ac00e28db2c7f2efa42f7f +Subproject commit a50d80c7507e8ae9fc37bf1869051cf2d51370ab