From 44269fcd5e5942d088ac1e261d9b3fbe032a8bad Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Mon, 5 May 2025 13:32:40 +0300
Subject: [PATCH] Implement simple eviction and free block tracking

---
 libs/neonart/src/lib.rs                        |  27 ++++-
 libs/neonart/src/tests.rs                      |   5 +-
 pgxn/neon/communicator/src/file_cache.rs       |  41 +++++--
 .../neon/communicator/src/integrated_cache.rs  | 107 +++++++++++++++---
 .../src/worker_process/main_loop.rs            |   6 +-
 5 files changed, 156 insertions(+), 30 deletions(-)

diff --git a/libs/neonart/src/lib.rs b/libs/neonart/src/lib.rs
index 5be80c8a81..9b6a8389bd 100644
--- a/libs/neonart/src/lib.rs
+++ b/libs/neonart/src/lib.rs
@@ -358,8 +358,13 @@ impl<'t, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'t, K, V, A> {
     }
 
     /// Remove value
-    pub fn remove(self, key: &K) {
-        self.update_with_fn(key, |_| None)
+    pub fn remove(self, key: &K) -> Option<V> {
+        let mut old = None;
+        self.update_with_fn(key, |existing| {
+            old = existing.cloned();
+            None
+        });
+        old
     }
 
     /// Update key using the given function. All the other modifying operations are based on this.
@@ -419,6 +424,17 @@ pub struct TreeIterator
 impl<K> TreeIterator<K>
 where
     K: Key + for<'a> From<&'a [u8]>,
 {
+    pub fn new_wrapping() -> TreeIterator<K> {
+        let mut next_key = Vec::new();
+        next_key.resize(K::KEY_LEN, 0);
+        TreeIterator {
+            done: false,
+            next_key,
+            max_key: None,
+            phantom_key: PhantomData,
+        }
+    }
+
     pub fn new(range: &std::ops::Range<K>) -> TreeIterator<K> {
         TreeIterator {
             done: false,
@@ -429,7 +445,7 @@ impl TreeIterator
     }
 
-    pub fn next<'g, V>(&mut self, read_guard: TreeReadGuard<'g, K, V>) -> Option<(K, V)>
+    pub fn next<'g, V>(&mut self, read_guard: &'g TreeReadGuard<'g, K, V>) -> Option<(K, &'g V)>
     where
         V: Value
     {
         if self.done {
@@ -437,6 +453,8 @@ impl TreeIterator
         }
         if let Some((k, v)) = algorithm::iter_next(&mut self.next_key, read_guard.tree.root, &read_guard.epoch_pin) {
             assert_eq!(k.len(), self.next_key.len());
+
+            // Check if we reached the end of the range
             if let Some(max_key) = &self.max_key {
                 assert_eq!(k.len(), max_key.len());
                 if k.as_slice() >= max_key.as_slice() {
@@ -444,12 +462,13 @@ impl TreeIterator
                     return None;
                 }
             }
+            // increment the key
             self.next_key = k.clone();
             increment_key(self.next_key.as_mut_slice());
 
             let k = k.as_slice().into();
-            Some((k, v.clone()))
+            Some((k, v))
         } else {
             self.done = true;
             None
diff --git a/libs/neonart/src/tests.rs b/libs/neonart/src/tests.rs
index 308001f8ce..0b6ab685e8 100644
--- a/libs/neonart/src/tests.rs
+++ b/libs/neonart/src/tests.rs
@@ -131,8 +131,9 @@ fn test_iter
     let mut iter = TreeIterator::new(&(TestKey::MIN..TestKey::MAX));
     loop {
-        let shadow_item = shadow_iter.next().map(|(k, v)| (k.clone(), v.clone()));
-        let item = iter.next(tree.start_read());
+        let shadow_item = shadow_iter.next().map(|(k, v)| (k.clone(), v));
+        let r = tree.start_read();
+        let item = iter.next(&r);
 
         if shadow_item != item {
             eprintln!("FAIL: iterator returned {:?}, expected {:?}", item, shadow_item);
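Note on the neonart API change above: `next()` now borrows the read guard and yields a `&V` instead of a cloned `V`, so the caller pins an epoch per step and must keep the guard alive while holding the reference. A usage sketch, not part of the patch; `tree` and `TestKey` are stand-ins mirroring tests.rs, and the `Debug` formatting is an assumption:

```rust
// Sketch: driving the borrowing iterator over the whole key range.
let mut iter = TreeIterator::new(&(TestKey::MIN..TestKey::MAX));
loop {
    // Pin an epoch for this step; `value` borrows from `r`.
    let r = tree.start_read();
    let Some((key, value)) = iter.next(&r) else { break };
    println!("{key:?} => {value:?}"); // clone `value` if it must outlive `r`
}
```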
diff --git a/pgxn/neon/communicator/src/file_cache.rs b/pgxn/neon/communicator/src/file_cache.rs
index 9509c15d25..45cc7b02a2 100644
--- a/pgxn/neon/communicator/src/file_cache.rs
+++ b/pgxn/neon/communicator/src/file_cache.rs
@@ -11,10 +11,11 @@
 use std::fs::File;
 use std::path::Path;
 use std::sync::Arc;
-use std::sync::atomic::{AtomicU64, Ordering};
 
 use tokio_epoll_uring;
 
+use std::sync::Mutex;
+
 use crate::BLCKSZ;
 
 pub type CacheBlock = u64;
@@ -24,15 +25,21 @@
 pub struct FileCache {
     file: Arc<File>,
 
-    // TODO: there's no reclamation mechanism, the cache grows
-    // indefinitely. This is the next free block, i.e. the current
-    // size of the file
-    next_free_block: AtomicU64,
+    free_list: Mutex<FreeList>,
+}
+
+// TODO: We keep track of all free blocks in this Vec. That doesn't really scale.
+struct FreeList {
+    next_free_block: CacheBlock,
+    max_blocks: u64,
+
+    free_blocks: Vec<CacheBlock>,
 }
 
 impl FileCache {
     pub fn new(
         file_cache_path: &Path,
+        initial_size: u64,
         uring_system: tokio_epoll_uring::SystemHandle,
     ) -> Result<FileCache> {
         let file = std::fs::OpenOptions::new()
@@ -47,7 +54,11 @@ impl FileCache {
         Ok(FileCache {
             file: Arc::new(file),
            uring_system,
-            next_free_block: AtomicU64::new(0),
+            free_list: Mutex::new(FreeList {
+                next_free_block: 0,
+                max_blocks: initial_size,
+                free_blocks: Vec::new(),
+            }),
         })
     }
@@ -94,8 +105,22 @@ impl FileCache {
         Ok(())
     }
 
-    pub fn alloc_block(&self) -> CacheBlock {
-        self.next_free_block.fetch_add(1, Ordering::Relaxed)
+    /// Allocate a block from the cache file. Returns None if the cache is full.
+    pub fn alloc_block(&self) -> Option<CacheBlock> {
+        let mut free_list = self.free_list.lock().unwrap();
+        if let Some(x) = free_list.free_blocks.pop() {
+            return Some(x);
+        }
+        if free_list.next_free_block < free_list.max_blocks {
+            let result = free_list.next_free_block;
+            free_list.next_free_block += 1;
+            return Some(result);
+        }
+        None
+    }
+
+    pub fn dealloc_block(&self, cache_block: CacheBlock) {
+        let mut free_list = self.free_list.lock().unwrap();
+        free_list.free_blocks.push(cache_block);
     }
 }
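The free-block tracking above is small enough to model in a standalone sketch: blocks are handed out sequentially until `max_blocks` is reached, freed blocks are recycled in LIFO order, and `None` signals that the caller has to evict something first. Names mirror the patch, but this is an illustration with the file I/O omitted, not the real module:

```rust
use std::sync::Mutex;

type CacheBlock = u64;

struct FreeList {
    next_free_block: CacheBlock,
    max_blocks: u64,
    free_blocks: Vec<CacheBlock>,
}

struct BlockAllocator {
    free_list: Mutex<FreeList>,
}

impl BlockAllocator {
    fn new(max_blocks: u64) -> Self {
        BlockAllocator {
            free_list: Mutex::new(FreeList {
                next_free_block: 0,
                max_blocks,
                free_blocks: Vec::new(),
            }),
        }
    }

    fn alloc_block(&self) -> Option<CacheBlock> {
        let mut fl = self.free_list.lock().unwrap();
        // Prefer recycling a previously freed block.
        if let Some(b) = fl.free_blocks.pop() {
            return Some(b);
        }
        // Otherwise extend by one never-used block, up to the size limit.
        if fl.next_free_block < fl.max_blocks {
            let b = fl.next_free_block;
            fl.next_free_block += 1;
            return Some(b);
        }
        None // full: the caller must evict something first
    }

    fn dealloc_block(&self, b: CacheBlock) {
        self.free_list.lock().unwrap().free_blocks.push(b);
    }
}

fn main() {
    let a = BlockAllocator::new(2);
    assert_eq!(a.alloc_block(), Some(0));
    assert_eq!(a.alloc_block(), Some(1));
    assert_eq!(a.alloc_block(), None); // at capacity
    a.dealloc_block(0);
    assert_eq!(a.alloc_block(), Some(0)); // recycled from the free list
}
```

Each operation is O(1) under a single mutex, but as the TODO in the patch notes, keeping every free block in a plain `Vec` does not scale to very large caches.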
diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs
index 17d374d697..a9ba1930e0 100644
--- a/pgxn/neon/communicator/src/integrated_cache.rs
+++ b/pgxn/neon/communicator/src/integrated_cache.rs
@@ -1,6 +1,6 @@
 //! Integrated communicator cache
 //!
-//! Tracks:
+//! It tracks:
 //! - Relation sizes and existence
 //! - Last-written LSN
 //! - TODO: Block cache (also known as LFC)
@@ -24,6 +24,7 @@
 use std::mem::MaybeUninit;
 use std::ops::Range;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 
 use utils::lsn::Lsn;
 use zerocopy::FromBytes;
@@ -55,9 +56,12 @@ pub struct IntegratedCacheWriteAccess<'t> {
         neonart::ArtMultiSlabAllocator<'t, TreeEntry>,
     >,
 
-    global_lw_lsn: Lsn,
+    global_lw_lsn: AtomicU64,
 
     file_cache: Option<FileCache>,
+
+    // Fields for eviction
+    clock_hand: std::sync::Mutex<TreeIterator<TreeKey>>,
 }
 
 /// Represents read-only access to the integrated cache. Backend processes have this.
@@ -99,8 +103,9 @@ impl<'t> IntegratedCacheInitStruct<'t> {
         IntegratedCacheWriteAccess {
             cache_tree: tree_writer,
-            global_lw_lsn: lsn,
+            global_lw_lsn: AtomicU64::new(lsn.0),
             file_cache,
+            clock_hand: std::sync::Mutex::new(TreeIterator::new_wrapping()),
         }
     }
@@ -124,10 +129,22 @@ enum TreeEntry {
     Block(BlockEntry),
 }
 
-#[derive(Clone)]
 struct BlockEntry {
     lw_lsn: Lsn,
     cache_block: Option<CacheBlock>,
+
+    // 'referenced' bit for the clock algorithm
+    referenced: AtomicBool,
+}
+
+impl Clone for BlockEntry {
+    fn clone(&self) -> BlockEntry {
+        BlockEntry {
+            lw_lsn: self.lw_lsn,
+            cache_block: self.cache_block,
+            referenced: AtomicBool::new(self.referenced.load(Ordering::Relaxed)),
+        }
+    }
 }
 
 #[derive(Clone, Default)]
@@ -233,7 +250,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
         if let Some(nblocks) = get_rel_size(&r, rel) {
             CacheResult::Found(nblocks)
         } else {
-            CacheResult::NotFound(self.global_lw_lsn)
+            let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
+            CacheResult::NotFound(lsn)
         }
     }
@@ -262,7 +280,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
                 Ok(CacheResult::NotFound(block_entry.lw_lsn))
             }
         } else {
-            Ok(CacheResult::NotFound(self.global_lw_lsn))
+            let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
+            Ok(CacheResult::NotFound(lsn))
         }
     }
@@ -285,7 +304,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
                 Ok(CacheResult::NotFound(block_entry.lw_lsn))
             }
         } else {
-            Ok(CacheResult::NotFound(self.global_lw_lsn))
+            let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
+            Ok(CacheResult::NotFound(lsn))
         }
     }
@@ -297,13 +317,19 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
         if let Some(_rel_entry) = r.get(&TreeKey::from(rel)) {
             CacheResult::Found(true)
         } else {
-            CacheResult::NotFound(self.global_lw_lsn)
+            let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
+            CacheResult::NotFound(lsn)
         }
     }
 
     pub fn get_db_size(&'t self, _db_oid: u32) -> CacheResult<u64> {
+        // TODO: It would be nice to cache database sizes too. Getting the database
+        // size is not a very common operation, but when you do it, it's often
+        // interactive, e.g. with psql's \l+ command, so the user will feel the latency.
+
         // fixme: is this right lsn?
-        CacheResult::NotFound(self.global_lw_lsn)
+        let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
+        CacheResult::NotFound(lsn)
     }
 
     pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) {
@@ -329,6 +355,15 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
 
             let key = TreeKey::from((rel, block_number));
 
+            let mut reserved_cache_block = loop {
+                if let Some(x) = file_cache.alloc_block() {
+                    break Some(x);
+                }
+                if let Some(x) = self.try_evict_one_cache_block() {
+                    break Some(x);
+                }
+            };
+
             let mut cache_block = None;
 
             w.update_with_fn(&key, |existing| {
@@ -340,24 +375,30 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
                 };
                 block_entry.lw_lsn = lw_lsn;
                 if block_entry.cache_block.is_none() {
-                    block_entry.cache_block = Some(file_cache.alloc_block());
+                    block_entry.cache_block = reserved_cache_block.take();
                 }
                 cache_block = block_entry.cache_block;
                 Some(TreeEntry::Block(block_entry))
             } else {
-                cache_block = Some(file_cache.alloc_block());
+                cache_block = reserved_cache_block.take();
                 Some(TreeEntry::Block(BlockEntry {
                     lw_lsn: lw_lsn,
                     cache_block: cache_block,
+                    referenced: AtomicBool::new(true),
                 }))
             }
         });
+
+        if let Some(x) = reserved_cache_block {
+            file_cache.dealloc_block(x);
+        }
+
         let cache_block = cache_block.unwrap();
         file_cache
             .write_block(cache_block, src)
             .await
             .expect("error writing to cache");
-        }
+        };
     }
 
     /// Forget information about given relation in the cache.
     /// (For DROP TABLE and such)
@@ -367,11 +408,51 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
 
         // also forget all cached blocks for the relation
         let mut iter = TreeIterator::new(&key_range_for_rel_blocks(rel));
-        while let Some((k, _v)) = iter.next(self.cache_tree.start_read()) {
+        let r = self.cache_tree.start_read();
+        while let Some((k, _v)) = iter.next(&r) {
            let w = self.cache_tree.start_write();
            w.remove(&k);
         }
     }
+
+    // Maintenance routines
+
+    /// Evict one block from the file cache. This is used when the file cache
+    /// fills up. The evicted block is not put on the free list; it is returned
+    /// so that the caller can use it immediately.
+    pub fn try_evict_one_cache_block(&self) -> Option<CacheBlock> {
+        let mut clock_hand = self.clock_hand.lock().unwrap();
+        for _ in 0..1000 {
+            let r = self.cache_tree.start_read();
+            match clock_hand.next(&r) {
+                None => {
+                    // The cache is completely empty. Pretty unexpected that this
+                    // function was called, then.
+                },
+                Some((_k, TreeEntry::Rel(_))) => {
+                    // Ignore rel entries for now.
+                    // TODO: they stick in the cache forever
+                },
+                Some((k, TreeEntry::Block(blk_entry))) => {
+                    if !blk_entry.referenced.swap(false, Ordering::Relaxed) {
+                        // Evict this block
+                        let w = self.cache_tree.start_write();
+                        let old = w.remove(&k);
+                        if let Some(TreeEntry::Block(old)) = old {
+                            let _ = self.global_lw_lsn.fetch_max(old.lw_lsn.0, Ordering::Relaxed);
+                            if let Some(cache_block) = old.cache_block {
+                                return Some(cache_block);
+                            }
+                        } else {
+                            assert!(old.is_none());
+                        }
+                    }
+                },
+            }
+        }
+        // Give up if we didn't find anything to evict
+        None
+    }
 }
 
 /// Read relation size from the cache.
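The eviction routine above is a second-chance ("clock") sweep: access sets an entry's `referenced` bit, and the wrapping clock hand clears bits as it passes, evicting the first block entry whose bit was already clear. A self-contained sketch of that policy over a flat array; the ART tree, epoch guards, and LSN bookkeeping are left out, and none of these names are the real API:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct Entry {
    block: u64,
    referenced: AtomicBool, // set on access, cleared by the clock hand
}

/// Advance the hand until a victim is found; give up after two full sweeps
/// (anything referenced on the first sweep has its bit clear by the second).
fn evict(entries: &[Entry], hand: &mut usize) -> Option<u64> {
    if entries.is_empty() {
        return None;
    }
    for _ in 0..entries.len() * 2 {
        let e = &entries[*hand];
        *hand = (*hand + 1) % entries.len(); // wrap, like the wrapping tree iterator
        // swap(false) returns the old value: evict only if the bit was already
        // clear, i.e. the entry was not touched since the hand last passed it.
        if !e.referenced.swap(false, Ordering::Relaxed) {
            return Some(e.block);
        }
    }
    None
}

fn main() {
    let entries: Vec<Entry> = (0..3)
        .map(|b| Entry { block: b, referenced: AtomicBool::new(true) })
        .collect();
    let mut hand = 0;
    // The first sweep clears every bit; the second finds block 0 unreferenced.
    assert_eq!(evict(&entries, &mut hand), Some(0));
}
```

The `fetch_max` on `global_lw_lsn` in the patch is what keeps eviction correct: a page that is no longer tracked individually falls back to the global last-written LSN, so that fallback must be at least the evicted entry's LSN.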
diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs
index 948b8b7394..afb12e4e4e 100644
--- a/pgxn/neon/communicator/src/worker_process/main_loop.rs
+++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs
@@ -39,7 +39,7 @@ pub(super) async fn init(
     timeline_id: String,
     auth_token: Option<String>,
     shard_map: HashMap,
-    _file_cache_size: u64,
+    file_cache_size: u64,
     file_cache_path: Option<PathBuf>,
 ) -> CommunicatorWorkerProcessStruct<'static> {
     let last_lsn = get_request_lsn();
@@ -47,11 +47,11 @@ pub(super) async fn init(
     let uring_system = tokio_epoll_uring::System::launch().await.unwrap();
 
     let file_cache = if let Some(path) = file_cache_path {
-        Some(FileCache::new(&path, uring_system).expect("could not create cache file"))
+        Some(FileCache::new(&path, file_cache_size, uring_system).expect("could not create cache file"))
     } else {
         // FIXME: temporarily for testing, use LFC even if disabled
         Some(
-            FileCache::new(&PathBuf::from("new_filecache"), uring_system)
+            FileCache::new(&PathBuf::from("new_filecache"), 1000, uring_system)
                 .expect("could not create cache file"),
         )
     };
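A closing design note: `remember_page()` reserves a block up front (allocate or evict, in a loop), publishes it under `update_with_fn()`, and returns an unused reservation with `dealloc_block()`. As written, the reservation loop retries indefinitely if `alloc_block()` keeps failing and `try_evict_one_cache_block()` keeps giving up, for example when every entry was referenced recently. A bounded variant might look like the sketch below; the skip-caching fallback is an assumption for illustration, not what the patch does:

```rust
// Hypothetical bounded reservation, replacing the `loop` in remember_page():
// gives up after a fixed number of attempts instead of spinning forever.
let mut reserved_cache_block = None;
for _attempt in 0..10 {
    if let Some(b) = file_cache
        .alloc_block()
        .or_else(|| self.try_evict_one_cache_block())
    {
        reserved_cache_block = Some(b);
        break;
    }
}
let Some(reserved) = reserved_cache_block else {
    // Every block is busy; skip caching this page rather than spinning forever.
    return;
};
// ...proceed as in the patch, using `reserved` for the page.
```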