From fb510de86c6c4fa0f13076419184e1836c53d7e8 Mon Sep 17 00:00:00 2001 From: David Freifeld Date: Mon, 21 Jul 2025 14:57:30 -0700 Subject: [PATCH] Connect LFC resize logic to hashmap shrink API --- libs/neon-shmem/src/hash.rs | 2 +- .../neon/communicator/src/integrated_cache.rs | 44 ++++++++++++++++++- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs index 58726b9ba3..a8b60ba64c 100644 --- a/libs/neon-shmem/src/hash.rs +++ b/libs/neon-shmem/src/hash.rs @@ -510,7 +510,7 @@ where /// # Panics /// Panics if called on a map initialized with [`HashMapInit::with_fixed`] or if `num_buckets` is /// greater than the number of buckets in the map. - pub fn begin_shrink(&mut self, num_buckets: u32) { + pub fn begin_shrink(&self, num_buckets: u32) { let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); assert!( num_buckets <= map.get_num_buckets() as u32, diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index 5f0ca5f510..63c771b94c 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -639,7 +639,49 @@ impl<'t> IntegratedCacheWriteAccess<'t> { ); } } else { - // TODO: Shrinking not implemented yet + // Don't hold lock for longer than necessary. + { + let mut clock_hand = self.clock_hand.lock().unwrap(); + + // Make sure the clock hand resets properly. + // TODO(quantumish): confirm this is expected behavior? + if *clock_hand > num_blocks as usize { + *clock_hand = num_blocks as usize - 1; + } + self.block_map.begin_shrink(num_blocks); + } + + // Evict everything in to-be-shrinked space + // TODO(quantumish): consider moving ahead of clock hand? + for i in num_blocks..old_num_blocks { + let Some(entry) = self.block_map.entry_at_bucket(i as usize) else { + continue; + }; + let old = entry.get(); + if old.pinned.load(Ordering::Relaxed) != 0 { + tracing::warn!( + "could not shrink file cache to {} blocks (old size {}): entry {} is pinned", + num_blocks, + old_num_blocks, + i + ); + return; + } + _ = self.global_lw_lsn.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); + old.cache_block.store(INVALID_CACHE_BLOCK, Ordering::Relaxed); + entry.remove(); + // TODO(quantumish): is this expected behavior? + self.page_evictions_counter.inc(); + } + + if let Err(err) = self.block_map.finish_shrink() { + tracing::warn!( + "could not shrink file cache to {} blocks (old size {}): {}", + num_blocks, + old_num_blocks, + err + ); + } } }