From cb50291dcd68ba7b4240cab78671ead96e191e58 Mon Sep 17 00:00:00 2001 From: Victor Polevoy Date: Thu, 10 Jul 2025 11:02:32 +0200 Subject: [PATCH] Fetches the SLRU segment via the new communicator. The fetch is done not into a buffer as earlier, but directly into the file. --- libs/neon-shmem/src/hash.rs | 31 +++++++++ libs/neon-shmem/src/hash/core.rs | 30 +++++++++ libs/neon-shmem/src/shmem.rs | 2 + pgxn/neon/communicator/src/file_cache.rs | 2 + .../neon/communicator/src/integrated_cache.rs | 1 + pgxn/neon/communicator/src/lib.rs | 2 +- pgxn/neon/communicator/src/neon_request.rs | 31 ++++++++- .../src/worker_process/main_loop.rs | 38 ++++++++++- pgxn/neon/communicator_new.c | 66 ++++++++++++++++++- pgxn/neon/communicator_new.h | 8 ++- pgxn/neon/file_cache.c | 30 ++++----- pgxn/neon/libpagestore.c | 8 +-- pgxn/neon/neon.c | 13 ++-- pgxn/neon/neon.h | 1 - pgxn/neon/pagestore_smgr.c | 60 ++++++++--------- pgxn/neon/relsize_cache.c | 10 ++- test_runner/fixtures/neon_fixtures.py | 4 +- test_runner/regress/test_normal_work.py | 4 +- vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- vendor/postgres-v17 | 2 +- vendor/revisions.json | 8 +-- 23 files changed, 275 insertions(+), 82 deletions(-) diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs index 84c2be3637..6fc7baefcc 100644 --- a/libs/neon-shmem/src/hash.rs +++ b/libs/neon-shmem/src/hash.rs @@ -14,6 +14,7 @@ //! in-place and are at a high level achieved by expanding/reducing the bucket array and rebuilding the //! dictionary by rehashing all keys. +use std::fmt::Debug; use std::hash::{BuildHasher, Hash}; use std::mem::MaybeUninit; @@ -41,6 +42,22 @@ pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> { num_buckets: u32, } +impl<'a, K, V, S> Debug for HashMapInit<'a, K, V, S> +where + K: Debug, + V: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HashMapInit") + .field("shmem_handle", &self.shmem_handle) + .field("shared_ptr", &self.shared_ptr) + .field("shared_size", &self.shared_size) + // .field("hasher", &self.hasher) + .field("num_buckets", &self.num_buckets) + .finish() + } +} + /// This is a per-process handle to a hash table that (possibly) lives in shared memory. /// If a child process is launched with fork(), the child process should /// get its own HashMapAccess by calling HashMapInit::attach_writer/reader(). @@ -56,6 +73,20 @@ pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> { unsafe impl Sync for HashMapAccess<'_, K, V, S> {} unsafe impl Send for HashMapAccess<'_, K, V, S> {} +impl<'a, K, V, S> Debug for HashMapAccess<'a, K, V, S> +where + K: Debug, + V: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HashMapAccess") + .field("shmem_handle", &self.shmem_handle) + .field("shared_ptr", &self.shared_ptr) + // .field("hasher", &self.hasher) + .finish() + } +} + impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> { /// Change the 'hasher' used by the hash table. /// diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs index 013eb9a09c..67be7672d1 100644 --- a/libs/neon-shmem/src/hash/core.rs +++ b/libs/neon-shmem/src/hash/core.rs @@ -1,5 +1,6 @@ //! Simple hash table with chaining. 
+use std::fmt::Debug; use std::hash::Hash; use std::mem::MaybeUninit; @@ -17,6 +18,19 @@ pub(crate) struct Bucket { pub(crate) inner: Option<(K, V)>, } +impl Debug for Bucket +where + K: Debug, + V: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Bucket") + .field("next", &self.next) + .field("inner", &self.inner) + .finish() + } +} + /// Core hash table implementation. pub(crate) struct CoreHashMap<'a, K, V> { /// Dictionary used to map hashes to bucket indices. @@ -34,6 +48,22 @@ pub(crate) struct CoreHashMap<'a, K, V> { pub(crate) _user_list_head: u32, } +impl<'a, K, V> Debug for CoreHashMap<'a, K, V> +where + K: Debug, + V: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CoreHashMap") + .field("dictionary", &self.dictionary) + .field("buckets", &self.buckets) + .field("free_head", &self.free_head) + .field("alloc_limit", &self.alloc_limit) + .field("buckets_in_use", &self.buckets_in_use) + .finish() + } +} + /// Error for when there are no empty buckets left but one is needed. #[derive(Debug, PartialEq)] pub struct FullError(); diff --git a/libs/neon-shmem/src/shmem.rs b/libs/neon-shmem/src/shmem.rs index f19f402859..9c304d6540 100644 --- a/libs/neon-shmem/src/shmem.rs +++ b/libs/neon-shmem/src/shmem.rs @@ -21,6 +21,7 @@ use nix::unistd::ftruncate as nix_ftruncate; /// the underlying file is resized. Do not access the area beyond the current size. Currently, that /// will cause the file to be expanded, but we might use `mprotect()` etc. to enforce that in the /// future. +#[derive(Debug)] pub struct ShmemHandle { /// memfd file descriptor fd: OwnedFd, @@ -35,6 +36,7 @@ pub struct ShmemHandle { } /// This is stored at the beginning in the shared memory area. +#[derive(Debug)] struct SharedStruct { max_size: usize, diff --git a/pgxn/neon/communicator/src/file_cache.rs b/pgxn/neon/communicator/src/file_cache.rs index 1f60c97f2c..f153174c6b 100644 --- a/pgxn/neon/communicator/src/file_cache.rs +++ b/pgxn/neon/communicator/src/file_cache.rs @@ -22,6 +22,7 @@ pub type CacheBlock = u64; pub const INVALID_CACHE_BLOCK: CacheBlock = u64::MAX; +#[derive(Debug)] pub struct FileCache { file: Arc, @@ -35,6 +36,7 @@ pub struct FileCache { // TODO: We keep track of all free blocks in this vec. That doesn't really scale. // Idea: when free_blocks fills up with more than 1024 entries, write them all to // one block on disk. +#[derive(Debug)] struct FreeList { next_free_block: CacheBlock, max_blocks: u64, diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index e00e49bf3d..a69af44492 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -46,6 +46,7 @@ pub struct IntegratedCacheInitStruct<'t> { } /// Represents write-access to the integrated cache. This is used by the communicator process. 
+#[derive(Debug)] pub struct IntegratedCacheWriteAccess<'t> { relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>, block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>, diff --git a/pgxn/neon/communicator/src/lib.rs b/pgxn/neon/communicator/src/lib.rs index 734e89a89a..d0c5b758da 100644 --- a/pgxn/neon/communicator/src/lib.rs +++ b/pgxn/neon/communicator/src/lib.rs @@ -21,5 +21,5 @@ mod worker_process; mod global_allocator; -// FIXME get this from postgres headers somehow +// FIXME: get this from postgres headers somehow pub const BLCKSZ: usize = 8192; diff --git a/pgxn/neon/communicator/src/neon_request.rs b/pgxn/neon/communicator/src/neon_request.rs index 732c35d6ce..70ceaf8744 100644 --- a/pgxn/neon/communicator/src/neon_request.rs +++ b/pgxn/neon/communicator/src/neon_request.rs @@ -15,7 +15,9 @@ pub type COid = u32; // This conveniently matches PG_IOV_MAX pub const MAX_GETPAGEV_PAGES: usize = 32; -use pageserver_page_api as page_api; +use std::ffi::CStr; + +use pageserver_page_api::{self as page_api, SlruKind}; /// Request from a Postgres backend to the communicator process #[allow(clippy::large_enum_variant)] @@ -29,6 +31,7 @@ pub enum NeonIORequest { RelExists(CRelExistsRequest), RelSize(CRelSizeRequest), GetPageV(CGetPageVRequest), + ReadSlruSegment(CReadSlruSegmentRequest), PrefetchV(CPrefetchVRequest), DbSize(CDbSizeRequest), @@ -54,6 +57,9 @@ pub enum NeonIOResult { /// the result pages are written to the shared memory addresses given in the request GetPageV, + /// The result is written to the file, path to which is provided + /// in the request. The [`u64`] value here is the number of blocks. + ReadSlruSegment(u64), /// A prefetch request returns as soon as the request has been received by the communicator. /// It is processed in the background. @@ -83,6 +89,7 @@ impl NeonIORequest { RelExists(req) => req.request_id, RelSize(req) => req.request_id, GetPageV(req) => req.request_id, + ReadSlruSegment(req) => req.request_id, PrefetchV(req) => req.request_id, DbSize(req) => req.request_id, WritePage(req) => req.request_id, @@ -193,6 +200,28 @@ pub struct CGetPageVRequest { pub dest: [ShmemBuf; MAX_GETPAGEV_PAGES], } +#[repr(C)] +#[derive(Copy, Clone, Debug)] +pub struct CReadSlruSegmentRequest { + pub request_id: u64, + pub slru_kind: SlruKind, + pub segment_number: u32, + pub request_lsn: CLsn, + /// Must be a null-terminated C string containing the file path + /// where the communicator will write the SLRU segment. + pub destination_file_path: ShmemBuf, +} + +impl CReadSlruSegmentRequest { + /// Returns the file path where the communicator will write the + /// SLRU segment. 
+ pub(crate) fn destination_file_path(&self) -> String { + unsafe { CStr::from_ptr(self.destination_file_path.as_mut_ptr() as *const _) } + .to_string_lossy() + .into_owned() + } +} + #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CPrefetchVRequest { diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index c207132753..0b2f9da366 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -24,7 +24,7 @@ use utils::id::{TenantId, TimelineId}; use super::callbacks::{get_request_lsn, notify_proc}; -use tracing::{error, info, info_span, trace}; +use tracing::{debug, error, info, info_span, trace}; use utils::lsn::Lsn; @@ -58,6 +58,7 @@ pub struct CommunicatorWorkerProcessStruct<'a> { request_rel_exists_counter: IntCounter, request_rel_size_counter: IntCounter, request_get_pagev_counter: IntCounter, + request_read_slru_segment_counter: IntCounter, request_prefetchv_counter: IntCounter, request_db_size_counter: IntCounter, request_write_page_counter: IntCounter, @@ -106,6 +107,8 @@ pub(super) async fn init( .integrated_cache_init_struct .worker_process_init(last_lsn, file_cache); + debug!("Initialised integrated cache: {cache:?}"); + let tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID"); let timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID"); let shard_spec = ShardSpec::new(shard_map, stripe_size).expect("invalid shard spec"); @@ -123,6 +126,8 @@ pub(super) async fn init( let request_rel_exists_counter = request_counters.with_label_values(&["rel_exists"]); let request_rel_size_counter = request_counters.with_label_values(&["rel_size"]); let request_get_pagev_counter = request_counters.with_label_values(&["get_pagev"]); + let request_read_slru_segment_counter = + request_counters.with_label_values(&["read_slru_segment"]); let request_prefetchv_counter = request_counters.with_label_values(&["prefetchv"]); let request_db_size_counter = request_counters.with_label_values(&["db_size"]); let request_write_page_counter = request_counters.with_label_values(&["write_page"]); @@ -173,6 +178,7 @@ pub(super) async fn init( request_rel_exists_counter, request_rel_size_counter, request_get_pagev_counter, + request_read_slru_segment_counter, request_prefetchv_counter, request_db_size_counter, request_write_page_counter, @@ -418,6 +424,36 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { Err(errno) => NeonIOResult::Error(errno), } } + NeonIORequest::ReadSlruSegment(req) => { + self.request_read_slru_segment_counter.inc(); + let lsn = Lsn(req.request_lsn); + let file_path = req.destination_file_path(); + + match self + .client + .get_slru_segment(page_api::GetSlruSegmentRequest { + read_lsn: self.request_lsns(lsn), + kind: req.slru_kind, + segno: req.segment_number, + }) + .await + { + Ok(slru_bytes) => { + if let Err(e) = tokio::fs::write(&file_path, &slru_bytes).await { + info!("could not write slru segment to file {file_path}: {e}"); + return NeonIOResult::Error(e.raw_os_error().unwrap_or(libc::EIO)); + } + + let blocks_count = slru_bytes.len() / crate::BLCKSZ; + + NeonIOResult::ReadSlruSegment(blocks_count as _) + } + Err(err) => { + info!("tonic error: {err:?}"); + NeonIOResult::Error(0) + } + } + } NeonIORequest::PrefetchV(req) => { self.request_prefetchv_counter.inc(); self.request_prefetchv_nblocks_counter diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index bdd5a75d62..87c25af8e5 
100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -997,10 +997,58 @@ communicator_new_dbsize(Oid dbNode) } int -communicator_new_read_slru_segment(SlruKind kind, int64 segno, void *buffer) +communicator_new_read_slru_segment( + SlruKind kind, + uint32_t segno, + neon_request_lsns *request_lsns, + const char* path) { - /* TODO */ - elog(ERROR, "not implemented"); + NeonIOResult result = {}; + NeonIORequest request = { + .tag = NeonIORequest_ReadSlruSegment, + .read_slru_segment = { + .request_id = assign_request_id(), + .slru_kind = kind, + .segment_number = segno, + .request_lsn = request_lsns->request_lsn, + } + }; + int nblocks = -1; + char *temp_path = bounce_buf(); + + if (path == NULL) { + elog(ERROR, "read_slru_segment called with NULL path"); + return -1; + } + + strlcpy(temp_path, path, BLCKSZ); + request.read_slru_segment.destination_file_path.ptr = (uint8_t *) temp_path; + + elog(DEBUG5, "readslrusegment called for kind=%u, segno=%u, file_path=\"%s\"", + kind, segno, request.read_slru_segment.destination_file_path.ptr); + + /* FIXME: see `request_lsns` in main_loop.rs for why this is needed */ + XLogSetAsyncXactLSN(request_lsns->request_lsn); + + perform_request(&request, &result); + + switch (result.tag) + { + case NeonIOResult_ReadSlruSegment: + nblocks = result.read_slru_segment; + break; + case NeonIOResult_Error: + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read slru segment, kind=%u, segno=%u: %s", + kind, segno, pg_strerror(result.error)))); + break; + default: + elog(ERROR, "unexpected result for read SLRU operation: %d", result.tag); + break; + } + + return nblocks; } /* Write requests */ @@ -1305,6 +1353,18 @@ print_neon_io_request(NeonIORequest *request) r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, r->block_number + r->nblocks); return buf; } + case NeonIORequest_ReadSlruSegment: + { + CReadSlruSegmentRequest *r = &request->read_slru_segment; + + snprintf(buf, sizeof(buf), "ReadSlruSegment: req " UINT64_FORMAT " slrukind=%u, segno=%u, lsn=%X/%X, file_path=\"%s\"", + r->request_id, + r->slru_kind, + r->segment_number, + LSN_FORMAT_ARGS(r->request_lsn), + r->destination_file_path.ptr); + return buf; + } case NeonIORequest_PrefetchV: { CPrefetchVRequest *r = &request->prefetch_v; diff --git a/pgxn/neon/communicator_new.h b/pgxn/neon/communicator_new.h index 1323c48e15..a19feaaac6 100644 --- a/pgxn/neon/communicator_new.h +++ b/pgxn/neon/communicator_new.h @@ -38,8 +38,12 @@ extern void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkN BlockNumber nblocks); extern bool communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno); -extern int communicator_new_read_slru_segment(SlruKind kind, int64 segno, - void *buffer); +extern int communicator_new_read_slru_segment( + SlruKind kind, + uint32_t segno, + neon_request_lsns *request_lsns, + const char *path +); /* Write requests, to keep the caches up-to-date */ extern void communicator_new_write_page(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno, diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 909fd6fa36..754b1ca033 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -258,7 +258,7 @@ lfc_switch_off(void) { int fd; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (LFC_ENABLED()) { @@ -325,7 +325,7 @@ lfc_maybe_disabled(void) static bool lfc_ensure_opened(void) { - Assert(!neon_enable_new_communicator); + 
Assert(!neon_use_communicator_worker); if (lfc_generation != lfc_ctl->generation) { @@ -352,7 +352,7 @@ lfc_shmem_startup(void) bool found; static HASHCTL info; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (prev_shmem_startup_hook) { @@ -652,7 +652,7 @@ lfc_init(void) if (lfc_max_size == 0) return; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) return; prev_shmem_startup_hook = shmem_startup_hook; @@ -730,7 +730,7 @@ lfc_prewarm(FileCacheState* fcs, uint32 n_workers) dsm_segment *seg; BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS]; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (!lfc_ensure_opened()) return; @@ -885,7 +885,7 @@ lfc_prewarm_main(Datum main_arg) PrewarmWorkerState* ws; uint32 worker_id = DatumGetInt32(main_arg); - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); AmPrewarmWorker = true; @@ -987,7 +987,7 @@ lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks) FileCacheEntry *entry; uint32 hash; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; @@ -1034,7 +1034,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) bool found = false; uint32 hash; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; @@ -1071,7 +1071,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, uint32 hash; int i = 0; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return 0; @@ -1180,7 +1180,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int blocks_read = 0; int buf_offset = 0; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return -1; @@ -1547,7 +1547,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno); - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; @@ -1694,7 +1694,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, uint32 entry_offset; int buf_offset = 0; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; @@ -2211,7 +2211,7 @@ get_local_cache_state(PG_FUNCTION_ARGS) size_t max_entries = PG_ARGISNULL(0) ? 
lfc_prewarm_limit : PG_GETARG_INT32(0); FileCacheState* fcs; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) elog(ERROR, "TODO: not implemented"); fcs = lfc_get_state(max_entries); @@ -2231,7 +2231,7 @@ prewarm_local_cache(PG_FUNCTION_ARGS) uint32 n_workers = PG_GETARG_INT32(1); FileCacheState* fcs; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) elog(ERROR, "TODO: not implemented"); fcs = (FileCacheState*)state; @@ -2254,7 +2254,7 @@ get_prewarm_info(PG_FUNCTION_ARGS) uint32 total_pages; size_t n_workers; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) elog(ERROR, "TODO: not implemented"); if (lfc_size_limit == 0) diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 7dfc08e54a..1e41527fb5 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -123,7 +123,7 @@ static uint64 pagestore_local_counter = 0; typedef enum PSConnectionState { PS_Disconnected, /* no connection yet */ PS_Connecting_Startup, /* connection starting up */ - PS_Connecting_PageStream, /* negotiating pagestream */ + PS_Connecting_PageStream, /* negotiating pagestream */ PS_Connected, /* connected, pagestream established */ } PSConnectionState; @@ -253,7 +253,7 @@ AssignPageserverConnstring(const char *newval, void *extra) * In that case, the shard map is loaded from 'neon.pageserver_grpc_urls' * instead, and that happens in the communicator process only. */ - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) return; /* @@ -395,7 +395,7 @@ get_shard_number(BufferTag *tag) } static inline void -CLEANUP_AND_DISCONNECT(PageServer *shard) +CLEANUP_AND_DISCONNECT(PageServer *shard) { if (shard->wes_read) { @@ -417,7 +417,7 @@ CLEANUP_AND_DISCONNECT(PageServer *shard) * complete the connection (e.g. due to receiving an earlier cancellation * during connection start). * Returns true if successfully connected; false if the connection failed. - * + * * Throws errors in unrecoverable situations, or when this backend's query * is canceled. */ diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 68f00de761..10785f748f 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -52,7 +52,6 @@ PG_MODULE_MAGIC; void _PG_init(void); -bool neon_enable_new_communicator; static int running_xacts_overflow_policy; static bool monitor_query_exec_time = false; @@ -468,10 +467,10 @@ _PG_init(void) #endif DefineCustomBoolVariable( - "neon.enable_new_communicator", - "Enables new communicator implementation", + "neon.use_communicator_worker", + "Uses the communicator worker implementation", NULL, - &neon_enable_new_communicator, + &neon_use_communicator_worker, true, PGC_POSTMASTER, 0, @@ -483,7 +482,7 @@ _PG_init(void) init_lwlsncache(); pg_init_communicator(); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) pg_init_communicator_new(); Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines; @@ -648,7 +647,7 @@ approximate_working_set_size_seconds(PG_FUNCTION_ARGS) duration = PG_ARGISNULL(0) ? 
(time_t) -1 : PG_GETARG_INT32(0); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) dc = communicator_new_approximate_working_set_size_seconds(duration, false); else dc = lfc_approximate_working_set_size_seconds(duration, false); @@ -664,7 +663,7 @@ approximate_working_set_size(PG_FUNCTION_ARGS) bool reset = PG_GETARG_BOOL(0); int32 dc; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) dc = communicator_new_approximate_working_set_size_seconds(-1, reset); else dc = lfc_approximate_working_set_size_seconds(-1, reset); diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index f781b08aa0..215396ef7a 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -13,7 +13,6 @@ #include "utils/wait_event.h" /* GUCs */ -extern bool neon_enable_new_communicator; extern char *neon_auth_token; extern char *neon_timeline; extern char *neon_tenant; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 9340d49f5a..06ce61d2e5 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -822,7 +822,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) return false; } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) return communicator_new_rel_exists(InfoFromSMgrRel(reln), forkNum); else { @@ -900,7 +900,7 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo) * that's being replayed, so we should not have the correctness issue * mentioned in previous paragraph. */ - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { XLogRecPtr lsn = neon_get_write_lsn(); @@ -961,7 +961,7 @@ neon_unlink(NRelFileInfoBackend rinfo, ForkNumber forkNum, bool isRedo) if (!NRelFileInfoBackendIsTemp(rinfo)) { - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { XLogRecPtr lsn = neon_get_write_lsn(); @@ -1055,7 +1055,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, forkNum, blkno, (uint32) (lsn >> 32), (uint32) lsn); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { // FIXME: this can pass lsn == invalid. Is that ok? 
communicator_new_rel_extend(InfoFromSMgrRel(reln), forkNum, blkno, (const void *) buffer, lsn); @@ -1182,7 +1182,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber start_block, lsn = XLogInsert(RM_XLOG_ID, XLOG_FPI); - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) { for (int i = 0; i < count; i++) { @@ -1198,7 +1198,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber start_block, Assert(lsn != 0); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_rel_zeroextend(InfoFromSMgrRel(reln), forkNum, start_block, nblocks, lsn); } @@ -1266,7 +1266,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_prefetch_register_bufferv(InfoFromSMgrRel(reln), forknum, blocknum, nblocks); return false; @@ -1276,7 +1276,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, tag.dbOid = reln->smgr_rlocator.locator.dbOid; tag.relNumber = reln->smgr_rlocator.locator.relNumber; tag.forkNum = forknum; - + while (nblocks > 0) { int iterblocks = Min(nblocks, PG_IOV_MAX); @@ -1298,7 +1298,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, blocknum += iterblocks; } - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) communicator_prefetch_pump_state(); return false; @@ -1326,7 +1326,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_prefetch_register_bufferv(InfoFromSMgrRel(reln), forknum, blocknum, 1); } @@ -1388,7 +1388,7 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum, */ neon_log(SmgrTrace, "writeback noop"); - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) communicator_prefetch_pump_state(); if (debug_compare_local) @@ -1406,7 +1406,7 @@ void neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, neon_request_lsns request_lsns, void *buffer) { - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { // FIXME: request_lsns is ignored. That affects the neon_test_utils callers. // Add the capability to specify the LSNs explicitly, for the sake of neon_test_utils ? 
@@ -1539,7 +1539,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_read_at_lsnv(InfoFromSMgrRel(reln), forkNum, blkno, (void *) &buffer, 1); @@ -1650,12 +1650,12 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, nblocks, PG_IOV_MAX); /* Try to read PS results if they are available */ - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) communicator_prefetch_pump_state(); memset(read_pages, 0, sizeof(read_pages)); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, buffers, nblocks); @@ -1664,7 +1664,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, { neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks); - + prefetch_result = communicator_prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, read_pages); @@ -1811,7 +1811,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo forknum, blocknum, (uint32) (lsn >> 32), (uint32) lsn); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_write_page(InfoFromSMgrRel(reln), forknum, blocknum, buffer, lsn); } @@ -1881,7 +1881,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, neon_wallog_pagev(reln, forknum, blkno, nblocks, (const char **) buffers, false); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { for (int i = 0; i < nblocks; i++) { @@ -1936,7 +1936,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { n_blocks = communicator_new_rel_nblocks(InfoFromSMgrRel(reln), forknum); } @@ -1976,7 +1976,7 @@ neon_dbsize(Oid dbNode) neon_request_lsns request_lsns; NRelFileInfo dummy_node = {0}; - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { db_size = communicator_new_dbsize(dbNode); } @@ -2023,7 +2023,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { XLogRecPtr lsn = neon_get_write_lsn(); @@ -2104,7 +2104,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum) neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop"); - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) communicator_prefetch_pump_state(); if (debug_compare_local) @@ -2291,7 +2291,7 @@ neon_end_unlogged_build(SMgrRelation reln) nblocks = mdnblocks(reln, MAIN_FORKNUM); recptr = GetXLogInsertRecPtr(); - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) { neon_set_lwlsn_block_range(recptr, InfoFromNInfoB(rinfob), @@ -2308,7 +2308,7 @@ neon_end_unlogged_build(SMgrRelation reln) RelFileInfoFmt(InfoFromNInfoB(rinfob)), forknum); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { communicator_new_update_cached_rel_size(InfoFromSMgrRel(reln), forknum, nblocks, recptr); } @@ -2384,8 +2384,8 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf request_lsns.not_modified_since = not_modified_since; 
request_lsns.effective_request_lsn = request_lsn; - if (neon_enable_new_communicator) - n_blocks = communicator_new_read_slru_segment(kind, segno, buffer); + if (neon_use_communicator_worker) + n_blocks = communicator_new_read_slru_segment(kind, (uint32_t)segno, &request_lsns, path); else n_blocks = communicator_read_slru_segment(kind, segno, &request_lsns, buffer); @@ -2424,7 +2424,7 @@ AtEOXact_neon(XactEvent event, void *arg) } break; } - if (!neon_enable_new_communicator) + if (!neon_use_communicator_worker) communicator_reconfigure_timeout_if_needed(); } @@ -2483,7 +2483,7 @@ smgr_init_neon(void) smgr_init_standard(); neon_init(); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) communicator_new_init(); else communicator_init(); @@ -2498,7 +2498,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, /* This is only used in WAL replay */ Assert(RecoveryInProgress()); - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) { relsize = communicator_new_rel_nblocks(rinfo, forknum); @@ -2677,7 +2677,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) * We should perform this check after assigning LwLSN to prevent * prefetching of some older version of the page by some other backend. */ - if (neon_enable_new_communicator) + if (neon_use_communicator_worker) no_redo_needed = communicator_new_cache_contains(rinfo, forknum, blkno); else no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno); diff --git a/pgxn/neon/relsize_cache.c b/pgxn/neon/relsize_cache.c index 4ea303f996..f3ceec78cc 100644 --- a/pgxn/neon/relsize_cache.c +++ b/pgxn/neon/relsize_cache.c @@ -23,9 +23,7 @@ #include "utils/dynahash.h" #include "utils/guc.h" -#if PG_VERSION_NUM >= 150000 #include "miscadmin.h" -#endif typedef struct { @@ -100,7 +98,7 @@ get_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber *size) { bool found = false; - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (relsize_hash_size > 0) { @@ -133,7 +131,7 @@ get_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber *size) void set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) { - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (relsize_hash_size > 0) { @@ -183,7 +181,7 @@ set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) { - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (relsize_hash_size > 0) { @@ -219,7 +217,7 @@ update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size) void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum) { - Assert(!neon_enable_new_communicator); + Assert(!neon_use_communicator_worker); if (relsize_hash_size > 0) { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e5c646468f..7c90c0162c 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4369,9 +4369,9 @@ class Endpoint(PgProtocol, LogUtils): # XXX: By checking for None, we enable the new communicator for all tests # by default if grpc or grpc is None: - config_lines += [f"neon.enable_new_communicator=on"] + config_lines += ["neon.use_communicator_worker=on"] else: - config_lines += [f"neon.enable_new_communicator=off"] + config_lines += ["neon.use_communicator_worker=off"] # Delete file cache 
if it exists (and we're recreating the endpoint) if USE_LFC: diff --git a/test_runner/regress/test_normal_work.py b/test_runner/regress/test_normal_work.py index b815fee702..ae545664d2 100644 --- a/test_runner/regress/test_normal_work.py +++ b/test_runner/regress/test_normal_work.py @@ -17,7 +17,9 @@ def check_tenant( config_lines = [ f"neon.safekeeper_proto_version = {safekeeper_proto_version}", ] - endpoint = env.endpoints.create_start("main", tenant_id=tenant_id, config_lines=config_lines, grpc=True) + endpoint = env.endpoints.create_start( + "main", tenant_id=tenant_id, config_lines=config_lines, grpc=True + ) # we rely upon autocommit after each statement res_1 = endpoint.safe_psql_many( queries=[ diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index ac3c460e01..1cb207d1c9 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit ac3c460e01a31f11fb52fd8d8e88e60f0e1069b4 +Subproject commit 1cb207d1c9efb1f6c6f864a47bf45e992a7f0eb0 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 24313bf8f3..9d19780350 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 24313bf8f3de722968a2fdf764de7ef77ed64f06 +Subproject commit 9d19780350c0c7b536312dc3b891ade55628bc7b diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 51194dc5ce..1486f919d4 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 51194dc5ce2e3523068d8607852e6c3125a17e58 +Subproject commit 1486f919d4dc21637407ee7ed203497bb5bd516a diff --git a/vendor/postgres-v17 b/vendor/postgres-v17 index eac5279cd1..160d0b52d6 160000 --- a/vendor/postgres-v17 +++ b/vendor/postgres-v17 @@ -1 +1 @@ -Subproject commit eac5279cd147d4086e0eb242198aae2f4b766d7b +Subproject commit 160d0b52d66f4a5d21251a2912a50561bf600333 diff --git a/vendor/revisions.json b/vendor/revisions.json index e4b6c8e23a..69e7559c67 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,18 +1,18 @@ { "v17": [ "17.5", - "eac5279cd147d4086e0eb242198aae2f4b766d7b" + "160d0b52d66f4a5d21251a2912a50561bf600333" ], "v16": [ "16.9", - "51194dc5ce2e3523068d8607852e6c3125a17e58" + "1486f919d4dc21637407ee7ed203497bb5bd516a" ], "v15": [ "15.13", - "24313bf8f3de722968a2fdf764de7ef77ed64f06" + "9d19780350c0c7b536312dc3b891ade55628bc7b" ], "v14": [ "14.18", - "ac3c460e01a31f11fb52fd8d8e88e60f0e1069b4" + "1cb207d1c9efb1f6c6f864a47bf45e992a7f0eb0" ] }
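Note on the new request flow: for NeonIORequest::ReadSlruSegment the backend copies a null-terminated destination path into a BLCKSZ-sized bounce buffer (see communicator_new_read_slru_segment(), which strlcpy()s the caller's path and points the ShmemBuf at it), and the communicator worker fetches the segment via page_api::GetSlruSegmentRequest, writes the bytes straight to that path, and reports the segment length in BLCKSZ-sized blocks through NeonIOResult::ReadSlruSegment. The sketch below mirrors that worker-side step outside the communicator as a minimal, self-contained illustration; fetch_segment_stub() and the output file name are made-up stand-ins for the real async client.get_slru_segment() call, and only BLCKSZ and the len() / BLCKSZ block-count computation come from the patch itself.

use std::fs;
use std::io;
use std::path::Path;

/// Matches the constant in pgxn/neon/communicator/src/lib.rs.
const BLCKSZ: usize = 8192;

/// Stand-in for the real async `client.get_slru_segment(...)` call:
/// pretend the pageserver returned a four-block segment.
fn fetch_segment_stub() -> Vec<u8> {
    vec![0u8; 4 * BLCKSZ]
}

/// Write the segment to `dest` and return its length in BLCKSZ blocks,
/// i.e. the value the worker reports back in NeonIOResult::ReadSlruSegment.
fn write_slru_segment(dest: &Path, segment: &[u8]) -> io::Result<u64> {
    fs::write(dest, segment)?;
    Ok((segment.len() / BLCKSZ) as u64)
}

fn main() -> io::Result<()> {
    let segment = fetch_segment_stub();
    let nblocks = write_slru_segment(Path::new("slru_segment.tmp"), &segment)?;
    println!("wrote {nblocks} blocks");
    Ok(())
}

In the real worker the write is tokio::fs::write() inside the async main loop, and a write failure is mapped to NeonIOResult::Error carrying the raw OS errno (falling back to EIO), exactly as in the main_loop.rs hunk above.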
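Note on the hand-written Debug impls in libs/neon-shmem: HashMapInit and HashMapAccess implement Debug manually, bounding only K: Debug and V: Debug and skipping the hasher field, so no Debug requirement is placed on the hasher type parameter S. The toy sketch below shows the same pattern on a made-up type (the struct and field names are stand-ins, not the real ones); it also uses finish_non_exhaustive(), the standard-library builder method that prints ".." for omitted fields, whereas the patch leaves the .field("hasher", ...) lines commented out and calls .finish().

use std::fmt;

/// Toy stand-in for the shared-memory hash map handle; `S` is the hasher,
/// which deliberately carries no Debug bound.
#[allow(dead_code)]
struct Access<K, V, S> {
    first_entry: Option<(K, V)>,
    hasher: S,
}

impl<K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for Access<K, V, S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Access")
            .field("first_entry", &self.first_entry)
            // `hasher` is skipped, so `S: Debug` is never required;
            // `finish_non_exhaustive()` prints `..` to mark the omission.
            .finish_non_exhaustive()
    }
}

fn main() {
    let a = Access { first_entry: Some((1u32, 2u32)), hasher: () };
    println!("{a:?}"); // Access { first_entry: Some((1, 2)), .. }
}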