From 7f63cecd5f9631c4d5dd4774ce18a2864694d50e Mon Sep 17 00:00:00 2001
From: quantumish
Date: Thu, 24 Jul 2025 19:25:13 -0700
Subject: [PATCH] Add LFC resizing implementation and utilities for hole punching

---
 libs/neon-shmem/src/hash.rs                   |  10 ++
 pgxn/neon/communicator/src/file_cache.rs      |  59 +++++++++-
 .../neon/communicator/src/integrated_cache.rs | 102 +++++++++++++-----
 3 files changed, 144 insertions(+), 27 deletions(-)

diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs
index b58f900029..28309fa5de 100644
--- a/libs/neon-shmem/src/hash.rs
+++ b/libs/neon-shmem/src/hash.rs
@@ -390,6 +390,16 @@ where
         map.get_num_buckets()
     }
 
+    /// Returns the logical number of buckets in the table (i.e. the number of allocatable buckets).
+    pub fn get_num_logical_buckets(&self) -> usize {
+        let map = unsafe { self.shared_ptr.as_ref() }.unwrap().read();
+        if map.alloc_limit == INVALID_POS {
+            map.get_num_buckets()
+        } else {
+            map.alloc_limit as usize
+        }
+    }
+
     /// Return the key and value stored in bucket with given index. This can be used to
     /// iterate through the hash map.
     // TODO: An Iterator might be nicer. The communicator's clock algorithm needs to
diff --git a/pgxn/neon/communicator/src/file_cache.rs b/pgxn/neon/communicator/src/file_cache.rs
index f153174c6b..88141c6798 100644
--- a/pgxn/neon/communicator/src/file_cache.rs
+++ b/pgxn/neon/communicator/src/file_cache.rs
@@ -27,7 +27,12 @@ pub struct FileCache {
     file: Arc,
     free_list: Mutex,
-
+    // NOTE(quantumish): when we punch holes in the LFC to shrink the file size, the freed
+    // blocks *shouldn't* go back on the free list (that is used to place new entries), but
+    // we still need to remember them so they can be returned to the free list when the
+    // cache later grows again. This has the same scalability flaws as the free list itself.
+    hole_list: Mutex<Vec<CacheBlock>>,
+
     // metrics
     max_blocks_gauge: metrics::IntGauge,
     num_free_blocks_gauge: metrics::IntGauge,
@@ -81,6 +86,7 @@ impl FileCache {
                 max_blocks: initial_size,
                 free_blocks: Vec::new(),
             }),
+            hole_list: Mutex::new(Vec::new()),
             max_blocks_gauge,
             num_free_blocks_gauge,
         })
@@ -129,11 +135,60 @@ impl FileCache {
         }
         None
     }
-
+
     pub fn dealloc_block(&self, cache_block: CacheBlock) {
         let mut free_list = self.free_list.lock().unwrap();
         free_list.free_blocks.push(cache_block);
     }
+
+    /// "Deletes" a block via fallocate's hole-punching feature.
+    // TODO(quantumish): possibly implement some batching? This is a lot of syscalls...
+    // Unfortunately batching is at odds with our access pattern, since entries in the hashmap
+    // have no correlation with the location of blocks in the actual LFC file.
+    pub fn delete_block(&self, cache_block: CacheBlock) {
+        use nix::fcntl as nix;
+        if let Err(e) = nix::fallocate(
+            self.file.clone(),
+            nix::FallocateFlags::FALLOC_FL_PUNCH_HOLE
+                .union(nix::FallocateFlags::FALLOC_FL_KEEP_SIZE),
+            (cache_block as usize * BLCKSZ) as libc::off_t,
+            BLCKSZ as libc::off_t,
+        ) {
+            tracing::error!("failed to punch hole in LFC at block {cache_block}: {e}");
+            return;
+        }
+
+        let mut hole_list = self.hole_list.lock().unwrap();
+        hole_list.push(cache_block);
+    }
+
+    /// Attempts to reclaim up to `num_blocks` previously hole-punched blocks, returning
+    /// the number of blocks actually reclaimed.
+    // TODO(quantumish): could probably just be merged with grow() - is there ever a reason
+    // to call this separately?
+    pub fn undelete_blocks(&self, num_blocks: u64) -> u64 {
+        // Deadlock safety: nothing else should ever need to take both of these locks at once.
+        // TODO(quantumish): may just be worth putting both under the same lock.
+        let mut hole_list = self.hole_list.lock().unwrap();
+        let mut free_list = self.free_list.lock().unwrap();
+        let amt = hole_list.len().min(num_blocks as usize);
+        for _ in 0..amt {
+            free_list.free_blocks.push(hole_list.pop().unwrap());
+        }
+        amt as u64
+    }
+
+    /// Grows the cache by `num_blocks` by raising the free list's block limit.
+    pub fn grow(&self, num_blocks: u64) {
+        self.free_list.lock().unwrap().max_blocks += num_blocks;
+    }
+
+    /// Returns the number of free blocks remaining (never-used slab space plus freed fragments).
+    pub fn free_space(&self) -> u64 {
+        let free_list = self.free_list.lock().unwrap();
+        let slab = free_list.max_blocks - free_list.next_free_block.min(free_list.max_blocks);
+        let fragments = free_list.free_blocks.len() as u64;
+        slab + fragments
+    }
 }
 
 impl metrics::core::Collector for FileCache {
diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs
index 469054a1b9..861b5d2865 100644
--- a/pgxn/neon/communicator/src/integrated_cache.rs
+++ b/pgxn/neon/communicator/src/integrated_cache.rs
@@ -587,7 +587,7 @@ impl IntegratedCacheWriteAccess {
         (*clock_hand) += 1;
 
         let mut evict_this = false;
-        let num_buckets = self.block_map.get_num_buckets();
+        let num_buckets = self.block_map.get_num_logical_buckets();
         match self
             .block_map
             .get_at_bucket((*clock_hand) % num_buckets)
@@ -638,26 +638,43 @@
 
     /// Resize the local file cache.
     pub fn resize_file_cache(&'static self, num_blocks: u32) {
+        // TODO(quantumish): unclear what the semantics of this entire operation are
+        // if there is no file cache.
+        let file_cache = self.file_cache.as_ref().unwrap();
         let old_num_blocks = self.block_map.get_num_buckets() as u32;
-
+        tracing::info!("trying to resize file cache to {num_blocks} blocks");
+        let difference = old_num_blocks.abs_diff(num_blocks);
         if old_num_blocks < num_blocks {
-            tracing::error!("growing to {num_blocks}!");
             if let Err(err) = self.block_map.grow(num_blocks) {
-                tracing::warn!(
+                tracing::error!(
                     "could not grow file cache to {} blocks (old size {}): {}",
                     num_blocks,
                     old_num_blocks,
                     err
                 );
             }
+            // Reclaim previously punched holes first; only the remainder needs new space.
+            let reclaimed = file_cache.undelete_blocks(difference as u64);
+            file_cache.grow(difference as u64 - reclaimed);
+            debug_assert!(file_cache.free_space() >= difference as u64);
         } else {
             let page_evictions = &self.page_evictions_counter;
             let global_lw_lsn = &self.global_lw_lsn;
             let block_map = self.block_map.clone();
             tokio::task::spawn_blocking(move || {
-                block_map.begin_shrink(num_blocks);
-                // Evict everything in to-be-shrinked space
+                // Don't hold the clock hand lock any longer than necessary: evicting in parallel is
+                // fine, but we don't want to compete with the eviction logic in the to-be-shrunk region.
+                {
+                    let mut clock_hand = self.clock_hand.lock().unwrap();
+
+                    block_map.begin_shrink(num_blocks);
+                    // Avoid skipping over entries at the beginning due to the modulo shift.
+                    if *clock_hand > num_blocks as usize {
+                        *clock_hand = num_blocks as usize - 1;
+                    }
+                }
+                // Try to evict everything in the to-be-shrunk space.
+                // TODO(quantumish): consider moving things ahead of clock hand?
+                let mut successful_evictions = 0;
                 for i in num_blocks..old_num_blocks {
                     let Some(entry) = block_map.entry_at_bucket(i as usize) else {
                         continue;
                     };
@@ -673,30 +690,65 @@ impl IntegratedCacheWriteAccess {
                         continue;
                     }
                     _ = global_lw_lsn.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
-                    old.cache_block.store(INVALID_CACHE_BLOCK, Ordering::Relaxed);
+                    let cache_block = old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
                     entry.remove();
+
+                    file_cache.delete_block(cache_block);
+
+                    successful_evictions += 1;
                     // TODO(quantumish): is this expected behavior?
                     page_evictions.inc();
                 }
-                // if let Err(err) = block_map.finish_shrink() {
-                //     tracing::warn!(
-                //         "could not shrink file cache to {} blocks (old size {}): {}",
-                //         num_blocks,
-                //         old_num_blocks,
-                //         err
-                //     );
-                // }
-                // Don hold lock for longer than necessary.
-                // {
-                //     let mut clock_hand = self.clock_hand.lock().unwrap();
-
-                //     // Make sure the clock hand resets properly.
-                //     // TODO(quantumish): confirm this is expected behavior?
-                //     if *clock_hand > num_blocks as usize {
-                //         *clock_hand = num_blocks as usize - 1;
-                //     }
-                // }
+                // We want to clear space in the LFC quickly: the regression tests expect to see
+                // the file size change more or less immediately, so evict other entries until we
+                // have reclaimed enough space. Waiting on the stragglers at the end of the map
+                // could in theory take unbounded time, depending on how long they stay pinned.
+                while successful_evictions < difference {
+                    if let Some(cache_block) = self.try_evict_one_cache_block() {
+                        file_cache.delete_block(cache_block);
+                        successful_evictions += 1;
+                    }
+                }
+
+                // Retry evicting the entries in the to-be-shrunk region, but don't give up this time.
+                // Not a great solution all around: unnecessary scanning, spinning, and code duplication.
+                // It's unclear what a good alternative is, though: there may be too many of these
+                // entries to collect them into a Vec, and we ultimately can't proceed until they're
+                // evicted. Maybe a notification mechanism for unpinning? This also suggests that
+                // pinning of entries should be a first-class concept in the hashmap implementation...
+                'outer: for i in num_blocks..old_num_blocks {
+                    loop {
+                        let Some(entry) = block_map.entry_at_bucket(i as usize) else {
+                            continue 'outer;
+                        };
+                        let old = entry.get();
+                        if old.pinned.load(Ordering::Relaxed) != 0 {
+                            drop(entry);
+                            // Painful, but we can't proceed until the entry is unpinned.
+                            std::thread::sleep(std::time::Duration::from_secs(1));
+                            continue;
+                        }
+                        _ = global_lw_lsn.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
+                        let cache_block = old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
+                        entry.remove();
+
+                        file_cache.delete_block(cache_block);
+
+                        // TODO(quantumish): is this expected behavior?
+                        page_evictions.inc();
+                        continue 'outer;
+                    }
+                }
+
+                if let Err(err) = self.block_map.finish_shrink() {
+                    tracing::warn!(
+                        "could not shrink file cache to {} blocks (old size {}): {}",
+                        num_blocks,
+                        old_num_blocks,
+                        err
+                    );
+                }
             });
         }
     }
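
Note for reviewers (not part of the patch): if the FALLOC_FL_PUNCH_HOLE semantics are
unfamiliar, the standalone sketch below shows why delete_block() shrinks disk usage even
though the file length never changes, which is what the regression tests mentioned in the
shrink path observe. It assumes the nix and libc crates that file_cache.rs already uses
(and a nix version whose fallocate takes an AsFd handle, as the patch itself relies on);
the path and BLCKSZ value are illustrative only.

// hole_punch_demo.rs - standalone sketch, not part of the patch.
// Punching a hole with FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE frees the underlying
// disk blocks (st_blocks) while leaving the file length (st_size) untouched.
use std::io::Write;
use std::os::unix::fs::MetadataExt;

use nix::fcntl::{fallocate, FallocateFlags};

const BLCKSZ: usize = 8192; // assumed Postgres block size, as in the LFC

fn main() -> std::io::Result<()> {
    let path = "/tmp/hole_punch_demo";
    let mut file = std::fs::File::create(path)?;

    // Fill four "cache blocks" so they are actually allocated on disk.
    file.write_all(&vec![0xAAu8; 4 * BLCKSZ])?;
    file.sync_all()?;
    let before = file.metadata()?;
    println!("before: st_size={} st_blocks={}", before.len(), before.blocks());

    // Punch out block 1, the same call FileCache::delete_block makes.
    let cache_block = 1usize;
    fallocate(
        &file,
        FallocateFlags::FALLOC_FL_PUNCH_HOLE | FallocateFlags::FALLOC_FL_KEEP_SIZE,
        (cache_block * BLCKSZ) as libc::off_t,
        BLCKSZ as libc::off_t,
    )
    .expect("fallocate failed");

    // st_size is unchanged thanks to KEEP_SIZE; st_blocks drops by BLCKSZ / 512.
    let after = file.metadata()?;
    println!("after:  st_size={} st_blocks={}", after.len(), after.blocks());

    std::fs::remove_file(path)
}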
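
Also not part of the patch: a toy model of the grow-path accounting in resize_file_cache(),
using hypothetical names rather than the real free-list types. It checks that a grow of
`difference` blocks is served first from previously punched holes and that only the remainder
raises the block limit, which is the invariant the debug_assert in the grow branch relies on.

// toy_grow_accounting.rs - standalone sketch with made-up types.
struct ToyCache {
    max_blocks: u64,       // block limit entries may be placed under
    next_free_block: u64,  // first never-allocated block
    free_blocks: Vec<u64>, // blocks freed back for reuse
    hole_list: Vec<u64>,   // blocks punched out by an earlier shrink
}

impl ToyCache {
    fn free_space(&self) -> u64 {
        self.max_blocks.saturating_sub(self.next_free_block) + self.free_blocks.len() as u64
    }

    // Mirrors FileCache::undelete_blocks: move up to `n` holes back onto the free list.
    fn undelete_blocks(&mut self, n: u64) -> u64 {
        let amt = (self.hole_list.len() as u64).min(n);
        for _ in 0..amt {
            self.free_blocks.push(self.hole_list.pop().unwrap());
        }
        amt
    }

    // Mirrors FileCache::grow: raise the block limit.
    fn grow(&mut self, n: u64) {
        self.max_blocks += n;
    }
}

fn main() {
    // A fully used 100-block cache that an earlier shrink left with 10 punched holes.
    let mut cache = ToyCache {
        max_blocks: 100,
        next_free_block: 100,
        free_blocks: Vec::new(),
        hole_list: (90..100).collect(),
    };

    // Grow by 25: 10 blocks come back from the hole list, 15 are genuinely new.
    let difference: u64 = 25;
    let reclaimed = cache.undelete_blocks(difference);
    cache.grow(difference - reclaimed);

    assert_eq!(reclaimed, 10);
    assert_eq!(cache.free_space(), difference);
    println!("reclaimed {reclaimed} holes, free space is now {}", cache.free_space());
}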