Mirror of https://github.com/neondatabase/neon.git
Add LFC resizing implementation and utilities for hole punching
@@ -390,6 +390,16 @@ where
        map.get_num_buckets()
    }

    /// Returns the logical number of buckets in the table (i.e. the number of allocatable buckets).
    pub fn get_num_logical_buckets(&self) -> usize {
        let map = unsafe { self.shared_ptr.as_ref() }.unwrap().read();
        if map.alloc_limit == INVALID_POS {
            map.get_num_buckets()
        } else {
            map.alloc_limit as usize
        }
    }

    /// Return the key and value stored in the bucket with the given index. This can be used to
    /// iterate through the hash map.
    // TODO: An Iterator might be nicer. The communicator's clock algorithm needs to

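To make the `alloc_limit` mechanism above concrete, here is a minimal standalone sketch. It uses a hypothetical `ToyMap` type rather than the crate's real shared-memory hash map; only the logical/physical distinction is carried over: a shrink caps the allocatable prefix while the physical bucket array keeps its old size until the shrink completes.

const INVALID_POS: u32 = u32::MAX;

struct ToyMap {
    buckets: Vec<Option<u64>>, // physical storage
    alloc_limit: u32,          // INVALID_POS means "no limit in effect"
}

impl ToyMap {
    fn get_num_buckets(&self) -> usize {
        self.buckets.len()
    }

    // Mirrors the accessor added above: only the allocatable prefix of the table.
    fn get_num_logical_buckets(&self) -> usize {
        if self.alloc_limit == INVALID_POS {
            self.get_num_buckets()
        } else {
            self.alloc_limit as usize
        }
    }
}

fn main() {
    let map = ToyMap { buckets: vec![None; 1024], alloc_limit: 256 };
    assert_eq!(map.get_num_buckets(), 1024);
    assert_eq!(map.get_num_logical_buckets(), 256);
    // A clock-sweep caller wraps its hand over the logical count, so it never
    // lands in the tail region that is being drained for a shrink.
    let clock_hand = 900usize;
    println!("sweep position: {}", clock_hand % map.get_num_logical_buckets());
}

The `@@ -587,7 +587,7 @@` hunk further down switches the clock sweep's modulo from the physical to the logical count for exactly this reason.
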
@@ -27,7 +27,12 @@ pub struct FileCache {
    file: Arc<File>,

    free_list: Mutex<FreeList>,

    // NOTE(quantumish): when we punch holes in the LFC to shrink the file size,
    // we *shouldn't* add them to the free list (since that's used to write new entries),
    // but we should still remember them so that we can add them to the free list when
    // a later growth occurs. This still has the same scalability flaws as the freelist.
    hole_list: Mutex<Vec<CacheBlock>>,

    // metrics
    max_blocks_gauge: metrics::IntGauge,
    num_free_blocks_gauge: metrics::IntGauge,
@@ -81,6 +86,7 @@ impl FileCache {
                max_blocks: initial_size,
                free_blocks: Vec::new(),
            }),
            hole_list: Mutex::new(Vec::new()),
            max_blocks_gauge,
            num_free_blocks_gauge,
        })
@@ -129,11 +135,60 @@ impl FileCache {
        }
        None
    }

    pub fn dealloc_block(&self, cache_block: CacheBlock) {
        let mut free_list = self.free_list.lock().unwrap();
        free_list.free_blocks.push(cache_block);
    }

    /// "Delete" a block via fallocate's hole punching feature.
    // TODO(quantumish): possibly implement some batching? lots of syscalls...
    // unfortunately batching is at odds with our access pattern, as entries in the hashmap
    // have no correlation with the location of blocks in the actual LFC file.
    pub fn delete_block(&self, cache_block: CacheBlock) {
        use nix::fcntl as nix;
        if let Err(e) = nix::fallocate(
            self.file.clone(),
            nix::FallocateFlags::FALLOC_FL_PUNCH_HOLE
                .union(nix::FallocateFlags::FALLOC_FL_KEEP_SIZE),
            (cache_block as usize * BLCKSZ) as libc::off_t,
            BLCKSZ as libc::off_t
        ) {
            tracing::error!("failed to punch hole in LFC at block {cache_block}: {e}");
            return;
        }

        let mut hole_list = self.hole_list.lock().unwrap();
        hole_list.push(cache_block);
    }

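As a sanity check on the behaviour `delete_block` relies on, here is a small standalone, Linux-only sketch of `FALLOC_FL_PUNCH_HOLE` combined with `FALLOC_FL_KEEP_SIZE`: the logical file size (st_size) stays the same while the allocated size (st_blocks) drops. The file path and the 8 KiB block size are assumptions for the demo, and it assumes the `nix` crate (with its `fs` feature) and `libc` are available; whether the punch actually frees space depends on the underlying filesystem.

use std::fs::OpenOptions;
use std::io::Write;
use std::os::unix::fs::MetadataExt;

use nix::fcntl::{fallocate, FallocateFlags};

const BLCKSZ: usize = 8192; // assumed block size for the demo

fn main() -> std::io::Result<()> {
    let mut file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open("/tmp/lfc_hole_punch_demo")?;

    // Write four blocks of real data so the file has allocated extents.
    file.write_all(&vec![0xABu8; BLCKSZ * 4])?;
    file.sync_all()?;
    let before = file.metadata()?;

    // Punch out block 1 (bytes [BLCKSZ, 2*BLCKSZ)) without shrinking the file.
    fallocate(
        &file,
        FallocateFlags::FALLOC_FL_PUNCH_HOLE | FallocateFlags::FALLOC_FL_KEEP_SIZE,
        BLCKSZ as libc::off_t,
        BLCKSZ as libc::off_t,
    )
    .map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;
    let after = file.metadata()?;

    assert_eq!(before.size(), after.size()); // logical size is unchanged
    // On filesystems that support hole punching, the allocated 512-byte sector
    // count drops by roughly one block's worth.
    println!("sectors allocated: {} -> {}", before.blocks(), after.blocks());
    Ok(())
}

This is the property the shrink path further down leans on: punching holes frees disk space immediately even though the file is never truncated.
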
    /// Attempt to reclaim up to `num_blocks` previously hole-punched blocks, returning how many
    /// were actually reclaimed.
    // TODO(quantumish): could probably just be merged w/ grow() - is there ever a reason
    // to call this separately?
    pub fn undelete_blocks(&self, num_blocks: u64) -> u64 {
        // Deadlock safety: nothing else should ever need to take both of these locks at once.
        // TODO(quantumish): may just be worth putting both under the same lock.
        let mut hole_list = self.hole_list.lock().unwrap();
        let mut free_list = self.free_list.lock().unwrap();
        let amt = hole_list.len().min(num_blocks as usize);
        for _ in 0..amt {
            free_list.free_blocks.push(hole_list.pop().unwrap());
        }
        amt as u64
    }

    /// Physically grows the file and expands the freelist.
    pub fn grow(&self, num_blocks: u64) {
        self.free_list.lock().unwrap().max_blocks += num_blocks;
    }

    /// Returns the number of blocks of remaining free space.
    pub fn free_space(&self) -> u64 {
        let free_list = self.free_list.lock().unwrap();
        let slab = free_list.max_blocks - free_list.next_free_block.min(free_list.max_blocks);
        let fragments = free_list.free_blocks.len() as u64;
        slab + fragments
    }
}

impl metrics::core::Collector for FileCache {

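The interplay between the free list, the hole list, and `grow()` can be modelled in a few lines. The sketch below is a toy (hypothetical `ToyFreeList`/`ToyCache` names, plain `Vec`s, no locking), and it folds the undelete step into `grow()` as the TODO above suggests; it only illustrates the accounting: free space = untouched tail capacity ("slab") plus returned blocks ("fragments"), with punched holes parked separately until a later grow recycles them.

struct ToyFreeList {
    max_blocks: u64,        // logical capacity in blocks
    next_free_block: u64,   // high-water mark of blocks ever handed out
    free_blocks: Vec<u64>,  // blocks returned to the allocator
}

struct ToyCache {
    free_list: ToyFreeList,
    hole_list: Vec<u64>,    // punched-out blocks, unusable until we grow again
}

impl ToyCache {
    fn free_space(&self) -> u64 {
        let fl = &self.free_list;
        let slab = fl.max_blocks - fl.next_free_block.min(fl.max_blocks);
        slab + fl.free_blocks.len() as u64
    }

    // Grow path: first recycle punched holes, then extend the logical capacity
    // by whatever is still missing.
    fn grow(&mut self, num_blocks: u64) {
        let reclaim = self.hole_list.len().min(num_blocks as usize);
        for _ in 0..reclaim {
            self.free_list.free_blocks.push(self.hole_list.pop().unwrap());
        }
        self.free_list.max_blocks += num_blocks - reclaim as u64;
    }
}

fn main() {
    let mut cache = ToyCache {
        free_list: ToyFreeList { max_blocks: 100, next_free_block: 100, free_blocks: vec![7] },
        hole_list: vec![40, 41, 42],
    };
    assert_eq!(cache.free_space(), 1); // no tail slab left, one freed fragment
    cache.grow(5);                     // recycles 3 punched holes, extends capacity by 2
    assert_eq!(cache.free_space(), 6);
}

In the real code `undelete_blocks()` and `grow()` remain separate calls; the grow branch of `resize_file_cache` below chains them.
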
@@ -587,7 +587,7 @@ impl IntegratedCacheWriteAccess {
            (*clock_hand) += 1;

            let mut evict_this = false;
            let num_buckets = self.block_map.get_num_buckets();
            let num_buckets = self.block_map.get_num_logical_buckets();
            match self
                .block_map
                .get_at_bucket((*clock_hand) % num_buckets)
@@ -638,26 +638,43 @@ impl IntegratedCacheWriteAccess {

    /// Resize the local file cache.
    pub fn resize_file_cache(&'static self, num_blocks: u32) {
        // TODO(quantumish): unclear what the semantics of this entire operation are
        // if there is no file cache.
        let file_cache = self.file_cache.as_ref().unwrap();
        let old_num_blocks = self.block_map.get_num_buckets() as u32;

        tracing::error!("trying to resize cache to {num_blocks} blocks");
        let difference = old_num_blocks.abs_diff(num_blocks);
        if old_num_blocks < num_blocks {
            tracing::error!("growing to {num_blocks}!");
            if let Err(err) = self.block_map.grow(num_blocks) {
                tracing::warn!(
                tracing::error!(
                    "could not grow file cache to {} blocks (old size {}): {}",
                    num_blocks,
                    old_num_blocks,
                    err
                );
            }
            let remaining = file_cache.undelete_blocks(difference as u64);
            file_cache.grow(remaining);
            debug_assert!(file_cache.free_space() > remaining);
        } else {
            let page_evictions = &self.page_evictions_counter;
            let global_lw_lsn = &self.global_lw_lsn;
            let block_map = self.block_map.clone();
            tokio::task::spawn_blocking(move || {
                block_map.begin_shrink(num_blocks);
                // Evict everything in the to-be-shrunk space.
                // Don't hold the clock hand lock any longer than necessary; it should be OK to
                // evict in parallel, but we don't want to compete with the eviction logic in the
                // to-be-shrunk region.
                {
                    let mut clock_hand = self.clock_hand.lock().unwrap();

                    block_map.begin_shrink(num_blocks);
                    // Avoid skipping over beginning entries due to the modulo shift.
                    if *clock_hand > num_blocks as usize {
                        *clock_hand = num_blocks as usize - 1;
                    }
                }
                // Try to evict everything in the to-be-shrunk space.
                // TODO(quantumish): consider moving things ahead of the clock hand?
                let mut successful_evictions = 0;
                for i in num_blocks..old_num_blocks {
                    let Some(entry) = block_map.entry_at_bucket(i as usize) else {
                        continue;
@@ -673,30 +690,65 @@ impl IntegratedCacheWriteAccess {
                        continue;
                    }
                    _ = global_lw_lsn.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
                    old.cache_block.store(INVALID_CACHE_BLOCK, Ordering::Relaxed);
                    let cache_block = old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
                    entry.remove();

                    file_cache.delete_block(cache_block);

                    successful_evictions += 1;
                    // TODO(quantumish): is this expected behavior?
                    page_evictions.inc();
                }

                // if let Err(err) = block_map.finish_shrink() {
                //     tracing::warn!(
                //         "could not shrink file cache to {} blocks (old size {}): {}",
                //         num_blocks,
                //         old_num_blocks,
                //         err
                //     );
                // }
                // Don't hold the lock for longer than necessary.
                // {
                //     let mut clock_hand = self.clock_hand.lock().unwrap();

                //     // Make sure the clock hand resets properly.
                //     // TODO(quantumish): confirm this is expected behavior?
                //     if *clock_hand > num_blocks as usize {
                //         *clock_hand = num_blocks as usize - 1;
                //     }
                // }
                // We want to quickly clear space in the LFC. Regression tests expect to see
                // an immediate-ish change in the file size, so we evict other entries to reclaim
                // enough space. Waiting for stragglers at the end of the map could *in theory*
                // take an indefinite amount of time depending on how long they stay pinned.
                while successful_evictions < difference {
                    if let Some(i) = self.try_evict_one_cache_block() {
                        file_cache.delete_block(i);
                        successful_evictions += 1;
                    }
                }

                // Try again to evict entries in the to-be-shrunk region, except don't give up this time.
                // Not a great solution all around: unnecessary scanning, spinning, and code duplication.
                // Not sure what a good alternative is though, as there may be enough of these entries that
                // we can't store a Vec of them, and we ultimately can't proceed until they're evicted.
                // Maybe a notification system for unpinning somehow? This also makes me think that pinning
                // of entries should be a first-class concept within the hashmap implementation...
                'outer: for i in num_blocks..old_num_blocks {
                    loop {
                        let Some(entry) = block_map.entry_at_bucket(i as usize) else {
                            continue 'outer;
                        };
                        let old = entry.get();
                        if old.pinned.load(Ordering::Relaxed) != 0 {
                            drop(entry);
                            // Painful...
                            std::thread::sleep(std::time::Duration::from_secs(1));
                            continue;
                        }
                        _ = global_lw_lsn.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
                        let cache_block = old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
                        entry.remove();

                        file_cache.delete_block(cache_block);

                        // TODO(quantumish): is this expected behavior?
                        page_evictions.inc();
                        continue 'outer;
                    }
                }

                if let Err(err) = self.block_map.finish_shrink() {
                    tracing::warn!(
                        "could not shrink file cache to {} blocks (old size {}): {}",
                        num_blocks,
                        old_num_blocks,
                        err
                    );
                }
            });
        }
    }
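
For readers unfamiliar with the begin_shrink/finish_shrink handshake used above (those methods live elsewhere in the hash map code and are not shown in this diff), here is a heavily simplified, standalone sketch of the protocol with entirely hypothetical toy types: cap allocation first, drain everything past the new limit (retrying anything still pinned), and only then truncate the physical storage.

use std::sync::atomic::{AtomicU32, Ordering};

struct Entry {
    pinned: AtomicU32,
    cache_block: u64,
}

struct ToyBlockMap {
    buckets: Vec<Option<Entry>>,
    alloc_limit: Option<usize>, // None = no shrink in progress
}

impl ToyBlockMap {
    fn begin_shrink(&mut self, new_len: usize) {
        // From this point on, new insertions must stay below the limit
        // (the logical bucket count would now report `new_len`).
        self.alloc_limit = Some(new_len);
    }

    fn finish_shrink(&mut self) -> Result<(), &'static str> {
        let new_len = self.alloc_limit.take().ok_or("no shrink in progress")?;
        if self.buckets[new_len..].iter().any(|b| b.is_some()) {
            return Err("tail still occupied");
        }
        self.buckets.truncate(new_len);
        Ok(())
    }
}

fn main() {
    let mut map = ToyBlockMap {
        buckets: (0u64..8)
            .map(|i| Some(Entry { pinned: AtomicU32::new(0), cache_block: i }))
            .collect(),
        alloc_limit: None,
    };

    map.begin_shrink(4);
    // Drain the to-be-removed tail. This toy just waits on the pin count before
    // dropping an entry; the real code above sleeps and rescans the bucket instead.
    for bucket in &mut map.buckets[4..] {
        let Some(entry) = bucket.take() else { continue };
        while entry.pinned.load(Ordering::Relaxed) != 0 {
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        // Here the real code would punch a hole for the evicted cache block.
        let _freed_block = entry.cache_block;
    }
    map.finish_shrink().expect("tail fully evicted");
    assert_eq!(map.buckets.len(), 4);
}

The two-pass structure in the actual shrink path (give up on pinned tail entries at first, reclaim space elsewhere, then come back and insist) exists so the file shrinks promptly even when some tail entries stay pinned for a while.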