From 245db281f356fd2aa5b26f04145f02ec1e29cb0b Mon Sep 17 00:00:00 2001
From: Victor Polevoy
Date: Thu, 10 Jul 2025 15:34:48 +0200
Subject: [PATCH] Snapshot

---
 libs/neon-shmem/src/hash.rs                    | 31 +++++++++++++++++++
 libs/neon-shmem/src/hash/core.rs               | 30 ++++++++++++++++++
 libs/neon-shmem/src/shmem.rs                   |  2 ++
 pgxn/neon/communicator/src/file_cache.rs       |  2 ++
 .../neon/communicator/src/integrated_cache.rs  |  1 +
 pgxn/neon/communicator/src/lib.rs              | 28 ++++++++++++++++-
 pgxn/neon/communicator/src/neon_request.rs     |  4 +--
 .../src/worker_process/main_loop.rs            |  7 ++++-
 pgxn/neon/communicator_new.c                   | 11 ++++++--
 pgxn/neon/communicator_new.h                   |  8 ++++--
 pgxn/neon/pagestore_smgr.c                     |  2 +-
 11 files changed, 117 insertions(+), 9 deletions(-)

diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs
index 84c2be3637..6fc7baefcc 100644
--- a/libs/neon-shmem/src/hash.rs
+++ b/libs/neon-shmem/src/hash.rs
@@ -14,6 +14,7 @@
 //! in-place and are at a high level achieved by expanding/reducing the bucket array and rebuilding the
 //! dictionary by rehashing all keys.
 
+use std::fmt::Debug;
 use std::hash::{BuildHasher, Hash};
 use std::mem::MaybeUninit;
 
@@ -41,6 +42,22 @@ pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> {
     num_buckets: u32,
 }
 
+impl<'a, K, V, S> Debug for HashMapInit<'a, K, V, S>
+where
+    K: Debug,
+    V: Debug,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("HashMapInit")
+            .field("shmem_handle", &self.shmem_handle)
+            .field("shared_ptr", &self.shared_ptr)
+            .field("shared_size", &self.shared_size)
+            .field("num_buckets", &self.num_buckets)
+            // the hasher `S` has no `Debug` bound, so it is omitted here
+            .finish_non_exhaustive()
+    }
+}
+
 /// This is a per-process handle to a hash table that (possibly) lives in shared memory.
 /// If a child process is launched with fork(), the child process should
 /// get its own HashMapAccess by calling HashMapInit::attach_writer/reader().
@@ -56,6 +73,20 @@ pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> {
 unsafe impl<K, V, S> Sync for HashMapAccess<'_, K, V, S> {}
 unsafe impl<K, V, S> Send for HashMapAccess<'_, K, V, S> {}
 
+impl<'a, K, V, S> Debug for HashMapAccess<'a, K, V, S>
+where
+    K: Debug,
+    V: Debug,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("HashMapAccess")
+            .field("shmem_handle", &self.shmem_handle)
+            .field("shared_ptr", &self.shared_ptr)
+            // the hasher `S` has no `Debug` bound, so it is omitted here
+            .finish_non_exhaustive()
+    }
+}
+
 impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
     /// Change the 'hasher' used by the hash table.
     ///
diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs
index 013eb9a09c..67be7672d1 100644
--- a/libs/neon-shmem/src/hash/core.rs
+++ b/libs/neon-shmem/src/hash/core.rs
@@ -1,5 +1,6 @@
 //! Simple hash table with chaining.
 
+use std::fmt::Debug;
 use std::hash::Hash;
 use std::mem::MaybeUninit;
 
@@ -17,6 +18,19 @@ pub(crate) struct Bucket<K, V> {
     pub(crate) inner: Option<(K, V)>,
 }
 
+impl<K, V> Debug for Bucket<K, V>
+where
+    K: Debug,
+    V: Debug,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Bucket")
+            .field("next", &self.next)
+            .field("inner", &self.inner)
+            .finish()
+    }
+}
+
 /// Core hash table implementation.
 pub(crate) struct CoreHashMap<'a, K, V> {
     /// Dictionary used to map hashes to bucket indices.
@@ -34,6 +48,22 @@ pub(crate) struct CoreHashMap<'a, K, V> {
     pub(crate) _user_list_head: u32,
 }
 
+impl<'a, K, V> Debug for CoreHashMap<'a, K, V>
+where
+    K: Debug,
+    V: Debug,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CoreHashMap")
+            .field("dictionary", &self.dictionary)
+            .field("buckets", &self.buckets)
+            .field("free_head", &self.free_head)
+            .field("alloc_limit", &self.alloc_limit)
+            .field("buckets_in_use", &self.buckets_in_use)
+            .finish()
+    }
+}
+
 /// Error for when there are no empty buckets left but one is needed.
 #[derive(Debug, PartialEq)]
 pub struct FullError();
diff --git a/libs/neon-shmem/src/shmem.rs b/libs/neon-shmem/src/shmem.rs
index f19f402859..9c304d6540 100644
--- a/libs/neon-shmem/src/shmem.rs
+++ b/libs/neon-shmem/src/shmem.rs
@@ -21,6 +21,7 @@ use nix::unistd::ftruncate as nix_ftruncate;
 /// the underlying file is resized. Do not access the area beyond the current size. Currently, that
 /// will cause the file to be expanded, but we might use `mprotect()` etc. to enforce that in the
 /// future.
+#[derive(Debug)]
 pub struct ShmemHandle {
     /// memfd file descriptor
     fd: OwnedFd,
@@ -35,6 +36,7 @@ pub struct ShmemHandle {
 }
 
 /// This is stored at the beginning in the shared memory area.
+#[derive(Debug)]
 struct SharedStruct {
     max_size: usize,
 
diff --git a/pgxn/neon/communicator/src/file_cache.rs b/pgxn/neon/communicator/src/file_cache.rs
index 1f60c97f2c..f153174c6b 100644
--- a/pgxn/neon/communicator/src/file_cache.rs
+++ b/pgxn/neon/communicator/src/file_cache.rs
@@ -22,6 +22,7 @@ pub type CacheBlock = u64;
 
 pub const INVALID_CACHE_BLOCK: CacheBlock = u64::MAX;
 
+#[derive(Debug)]
 pub struct FileCache {
     file: Arc<File>,
 
@@ -35,6 +36,7 @@ pub struct FileCache {
 // TODO: We keep track of all free blocks in this vec. That doesn't really scale.
 // Idea: when free_blocks fills up with more than 1024 entries, write them all to
 // one block on disk.
+#[derive(Debug)]
 struct FreeList {
     next_free_block: CacheBlock,
     max_blocks: u64,
diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs
index 10103cb1b5..6b6ce03a66 100644
--- a/pgxn/neon/communicator/src/integrated_cache.rs
+++ b/pgxn/neon/communicator/src/integrated_cache.rs
@@ -46,6 +46,7 @@ pub struct IntegratedCacheInitStruct<'t> {
 }
 
 /// Represents write-access to the integrated cache. This is used by the communicator process.
+#[derive(Debug)]
 pub struct IntegratedCacheWriteAccess<'t> {
     relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>,
     block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>,
diff --git a/pgxn/neon/communicator/src/lib.rs b/pgxn/neon/communicator/src/lib.rs
index 734e89a89a..30fa698d83 100644
--- a/pgxn/neon/communicator/src/lib.rs
+++ b/pgxn/neon/communicator/src/lib.rs
@@ -21,5 +21,31 @@
 mod worker_process;
 
 mod global_allocator;
 
-// FIXME get this from postgres headers somehow
+// FIXME: get this from postgres headers somehow
+/// Size of a disk block: this also limits the size of a tuple. You can
+/// set it bigger if you need bigger tuples (although `TOAST` should
+/// reduce the need to have large tuples, since fields can be spread
+/// across multiple tuples). [`BLCKSZ`] must be a power of `2`. The
+/// maximum possible value of [`BLCKSZ`] is currently `2^15` (`32768`),
+/// determined by the 15-bit widths of the `lp_off` and `lp_len` fields
+/// in `ItemIdData` (see `include/storage/itemid.h`). Changing
+/// [`BLCKSZ`] requires an `initdb`.
+pub const BLCKSZ: usize = 8192;
+
+/// Define SLRU segment size. A page is the same [`BLCKSZ`] as is used
+/// everywhere else in Postgres. The segment size can be chosen somewhat
+/// arbitrarily; we make it `32` pages by default, or `256kB`, i.e. one
+/// million transactions for `CLOG` or `64K` transactions for
+/// `SUBTRANS`.
+///
+/// # Note
+///
+/// Because TransactionIds are 32 bits and wrap around at `0xFFFFFFFF`,
+/// page numbering also wraps around at `0xFFFFFFFF/xxxx_XACTS_PER_PAGE`
+/// (where `xxxx` is `CLOG` or `SUBTRANS`, respectively), and segment
+/// numbering at `0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT`.
+/// We need take no explicit notice of that fact in `slru.c`, except when
+/// comparing segment and page numbers in `SimpleLruTruncate()` (see
+/// `PagePrecedes()`).
+pub const SLRU_PAGES_PER_SEGMENT: usize = 32;
diff --git a/pgxn/neon/communicator/src/neon_request.rs b/pgxn/neon/communicator/src/neon_request.rs
index 89f66b619c..70dfae0261 100644
--- a/pgxn/neon/communicator/src/neon_request.rs
+++ b/pgxn/neon/communicator/src/neon_request.rs
@@ -44,8 +44,8 @@ pub enum NeonIOResult {
     /// the result pages are written to the shared memory addresses given in the request
     GetPageV,
     /// The result is written to the shared memory address given in the
-    /// request.
-    ReadSlruSegment,
+    /// request. The [`u64`] value is the number of blocks read.
+    ReadSlruSegment(u64),
 
     /// A prefetch request returns as soon as the request has been received by the communicator.
     /// It is processed in the background.
diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs
index 4dce64b5f7..39824ba9e5 100644
--- a/pgxn/neon/communicator/src/worker_process/main_loop.rs
+++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs
@@ -107,6 +107,9 @@ pub(super) async fn init(
         .integrated_cache_init_struct
         .worker_process_init(last_lsn, file_cache);
 
+    info!("Initialised integrated cache: {cache:?}");
+
+    // TODO: plumb through the stripe size.
     let tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID");
     let timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID");
     let shard_spec = ShardSpec::new(shard_map, stripe_size).expect("invalid shard spec");
@@ -444,7 +447,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                     std::ptr::copy_nonoverlapping(src.as_ptr(), dest.as_mut_ptr(), len);
                 };
 
-                NeonIOResult::ReadSlruSegment
+                let blocks_count = len / crate::BLCKSZ; // one block per BLCKSZ bytes copied
+
+                NeonIOResult::ReadSlruSegment(blocks_count as _)
             }
             Err(err) => {
                 info!("tonic error: {err:?}");
diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c
index 137c263993..d6183dc6c4 100644
--- a/pgxn/neon/communicator_new.c
+++ b/pgxn/neon/communicator_new.c
@@ -997,7 +997,11 @@ communicator_new_dbsize(Oid dbNode)
 }
 
 int
-communicator_new_read_slru_segment(SlruKind kind, uint32_t segno, void *buffer)
+communicator_new_read_slru_segment(
+	SlruKind kind,
+	uint32_t segno,
+	neon_request_lsns *request_lsns,
+	void *buffer)
 {
 	NeonIOResult result = {};
 	NeonIORequest request = {
@@ -1012,11 +1016,14 @@
 	elog(DEBUG5, "readslrusegment called for kind=%u, segno=%u",
 		 kind, segno);
 
+	/* FIXME: see `request_lsns` in main_loop.rs for why this is needed */
+	XLogSetAsyncXactLSN(request_lsns->request_lsn);
+
 	perform_request(&request, &result);
 	switch (result.tag)
 	{
 		case NeonIOResult_ReadSlruSegment:
-			return 0;
+			return result.read_slru_segment;
 		case NeonIOResult_Error:
 			ereport(ERROR,
 					(errcode_for_file_access(),
diff --git a/pgxn/neon/communicator_new.h b/pgxn/neon/communicator_new.h
index 8114d75445..1e2497f8a7 100644
--- a/pgxn/neon/communicator_new.h
+++ b/pgxn/neon/communicator_new.h
@@ -38,8 +38,12 @@ extern void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkN
 									BlockNumber nblocks);
 extern bool communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
 							   BlockNumber blockno);
-extern int communicator_new_read_slru_segment(SlruKind kind, uint32_t segno,
-					void *buffer);
+extern int communicator_new_read_slru_segment(
+	SlruKind kind,
+	uint32_t segno,
+	neon_request_lsns *request_lsns,
+	void *buffer
+);
 
 /* Write requests, to keep the caches up-to-date */
 extern void communicator_new_write_page(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno,
diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c
index c70159d4b9..23841e9e91 100644
--- a/pgxn/neon/pagestore_smgr.c
+++ b/pgxn/neon/pagestore_smgr.c
@@ -2385,7 +2385,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
 	request_lsns.effective_request_lsn = request_lsn;
 
 	if (neon_enable_new_communicator)
-		n_blocks = communicator_new_read_slru_segment(kind, (uint32_t)segno, buffer);
+		n_blocks = communicator_new_read_slru_segment(kind, (uint32_t)segno, &request_lsns, buffer);
 	else
 		n_blocks = communicator_read_slru_segment(kind, segno, &request_lsns, buffer);
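
For reference, the arithmetic behind the new `ReadSlruSegment(u64)` payload, as a
self-contained Rust sketch. The constants mirror lib.rs; `blocks_in_copied` is a
hypothetical helper, not part of the patch, that mirrors the main_loop.rs computation:

    const BLCKSZ: usize = 8192;
    const SLRU_PAGES_PER_SEGMENT: usize = 32;

    /// Hypothetical helper mirroring the main_loop.rs computation:
    /// one block is reported per BLCKSZ bytes copied into the buffer.
    fn blocks_in_copied(len: usize) -> u64 {
        debug_assert_eq!(len % BLCKSZ, 0, "SLRU responses are whole pages");
        (len / BLCKSZ) as u64
    }

    fn main() {
        // A full segment is 32 pages * 8192 bytes = 256kB.
        assert_eq!(SLRU_PAGES_PER_SEGMENT * BLCKSZ, 256 * 1024);
        // A full-segment read therefore reports 32 blocks, which is what
        // neon_read_slru_segment() in pagestore_smgr.c expects in n_blocks.
        assert_eq!(blocks_in_copied(SLRU_PAGES_PER_SEGMENT * BLCKSZ), 32);
    }

Note that dividing by `BLCKSZ * SLRU_PAGES_PER_SEGMENT` would count whole segments
(0 or 1) rather than pages; dividing by `BLCKSZ` alone keeps the returned count in
agreement with the old `communicator_read_slru_segment()` path.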
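The manual `Debug` impls in hash.rs and hash/core.rs all exist because the hasher
type parameter `S` carries no `Debug` bound, so `#[derive(Debug)]` is unavailable.
A minimal sketch of the pattern with a hypothetical `Wrapper` type, not part of
the patch:

    use std::collections::hash_map::RandomState;
    use std::fmt::{self, Debug};

    struct Wrapper<S> {
        len: usize,
        hasher: S, // `S` is not required to implement `Debug`
    }

    impl<S> Debug for Wrapper<S> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_struct("Wrapper")
                .field("len", &self.len)
                // finish_non_exhaustive() prints `..` for the omitted field
                .finish_non_exhaustive()
        }
    }

    fn main() {
        let w = Wrapper { len: 3, hasher: RandomState::new() };
        println!("{w:?}"); // prints: Wrapper { len: 3, .. }
    }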