Implement simple eviction and free block tracking

Heikki Linnakangas
2025-05-05 13:32:40 +03:00
parent 44cc648dc8
commit 44269fcd5e
5 changed files with 156 additions and 30 deletions
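In outline, the commit combines two mechanisms: the file cache hands out blocks from a mutex-protected free list with a high-water mark, and when the cache is full a clock (second-chance) sweep clears each entry's 'referenced' bit and evicts the first entry whose bit was already clear. The standalone sketch below only illustrates that idea; the types (FreeList, ClockCache) and the HashMap-based sweep order are simplified stand-ins, not the code in this diff.

use std::collections::HashMap;

// Simplified free list: hand out recycled blocks first, then extend the file
// up to a fixed maximum number of blocks.
struct FreeList {
    next_free_block: u64,
    max_blocks: u64,
    free_blocks: Vec<u64>,
}

impl FreeList {
    fn alloc(&mut self) -> Option<u64> {
        if let Some(b) = self.free_blocks.pop() {
            return Some(b);
        }
        if self.next_free_block < self.max_blocks {
            let b = self.next_free_block;
            self.next_free_block += 1;
            return Some(b);
        }
        None // full: the caller has to evict something
    }

    fn dealloc(&mut self, block: u64) {
        self.free_blocks.push(block);
    }
}

// Simplified clock sweep: each entry carries a 'referenced' bit. The hand
// clears set bits (second chance) and evicts the first entry whose bit was
// already clear.
struct ClockCache {
    entries: HashMap<u64, bool>, // key -> referenced bit
    hand: Vec<u64>,              // fixed sweep order, for the sketch only
    pos: usize,
}

impl ClockCache {
    fn try_evict_one(&mut self) -> Option<u64> {
        if self.hand.is_empty() {
            return None;
        }
        for _ in 0..self.hand.len() * 2 {
            let key = self.hand[self.pos];
            self.pos = (self.pos + 1) % self.hand.len();
            match self.entries.get(&key).copied() {
                Some(true) => {
                    // Recently referenced: clear the bit and move on.
                    self.entries.insert(key, false);
                }
                Some(false) => {
                    // Not referenced since the last sweep: evict it.
                    self.entries.remove(&key);
                    return Some(key);
                }
                None => {}
            }
        }
        None
    }
}

fn main() {
    let mut free = FreeList { next_free_block: 0, max_blocks: 2, free_blocks: Vec::new() };
    assert_eq!(free.alloc(), Some(0));
    assert_eq!(free.alloc(), Some(1));
    assert_eq!(free.alloc(), None); // cache is full

    let mut clock = ClockCache {
        entries: HashMap::from([(0, true), (1, false)]),
        hand: vec![0, 1],
        pos: 0,
    };
    // Entry 0 was recently referenced and gets a second chance; entry 1 is evicted.
    assert_eq!(clock.try_evict_one(), Some(1));

    // The evicted entry's block can be recycled through the free list.
    free.dealloc(1);
    assert_eq!(free.alloc(), Some(1));
}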

View File

@@ -358,8 +358,13 @@ impl<'t, K: Key, V: Value, A: ArtAllocator<V>> TreeWriteGuard<'t, K, V, A> {
}
/// Remove value
pub fn remove(self, key: &K) {
self.update_with_fn(key, |_| None)
pub fn remove(self, key: &K) -> Option<V> {
let mut old = None;
self.update_with_fn(key, |existing| {
old = existing.cloned();
None
});
old
}
/// Update key using the given function. All the other modifying operations are based on this.
@@ -419,6 +424,17 @@ pub struct TreeIterator<K>
impl<K> TreeIterator<K>
where K: Key + for<'a> From<&'a [u8]>,
{
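/// Create an iterator over the entire key space, with no end bound, starting
/// from the all-zeroes key. The integrated cache uses this as its clock hand
/// for eviction.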
pub fn new_wrapping() -> TreeIterator<K> {
let mut next_key = Vec::new();
next_key.resize(K::KEY_LEN, 0);
TreeIterator {
done: false,
next_key,
max_key: None,
phantom_key: PhantomData,
}
}
pub fn new(range: &std::ops::Range<K>) -> TreeIterator<K> {
TreeIterator {
done: false,
@@ -429,7 +445,7 @@ impl<K> TreeIterator<K>
}
pub fn next<'g, V>(&mut self, read_guard: TreeReadGuard<'g, K, V>) -> Option<(K, V)>
pub fn next<'g, V>(&mut self, read_guard: &'g TreeReadGuard<'g, K, V>) -> Option<(K, &'g V)>
where V: Value
{
if self.done {
@@ -437,6 +453,8 @@ impl<K> TreeIterator<K>
}
if let Some((k, v)) = algorithm::iter_next(&mut self.next_key, read_guard.tree.root, &read_guard.epoch_pin) {
assert_eq!(k.len(), self.next_key.len());
// Check if we reached the end of the range
if let Some(max_key) = &self.max_key {
assert_eq!(k.len(), max_key.len());
if k.as_slice() >= max_key.as_slice() {
@@ -444,12 +462,13 @@ impl<K> TreeIterator<K>
return None;
}
}
// increment the key
self.next_key = k.clone();
increment_key(self.next_key.as_mut_slice());
let k = k.as_slice().into();
Some((k, v.clone()))
Some((k, v))
} else {
self.done = true;
None

View File

@@ -131,8 +131,9 @@ fn test_iter<A: ArtAllocator<usize>>(tree: &TreeWriteAccess<TestKey, usize, A>,
let mut iter = TreeIterator::new(&(TestKey::MIN..TestKey::MAX));
loop {
let shadow_item = shadow_iter.next().map(|(k, v)| (k.clone(), v.clone()));
let item = iter.next(tree.start_read());
let shadow_item = shadow_iter.next().map(|(k, v)| (k.clone(), v));
let r = tree.start_read();
let item = iter.next(&r);
if shadow_item != item {
eprintln!("FAIL: iterator returned {:?}, expected {:?}", item, shadow_item);

View File

@@ -11,10 +11,11 @@
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio_epoll_uring;
use std::sync::Mutex;
use crate::BLCKSZ;
pub type CacheBlock = u64;
@@ -24,15 +25,21 @@ pub struct FileCache {
file: Arc<File>,
// TODO: there's no reclamation mechanism, the cache grows
// indefinitely. This is the next free block, i.e. the current
// size of the file
next_free_block: AtomicU64,
free_list: Mutex<FreeList>
}
// TODO: We keep track of all free blocks in this vec. That doesn't really scale.
struct FreeList {
next_free_block: CacheBlock,
max_blocks: u64,
free_blocks: Vec<CacheBlock>,
}
impl FileCache {
pub fn new(
file_cache_path: &Path,
initial_size: u64,
uring_system: tokio_epoll_uring::SystemHandle,
) -> Result<FileCache, std::io::Error> {
let file = std::fs::OpenOptions::new()
@@ -47,7 +54,11 @@ impl FileCache {
Ok(FileCache {
file: Arc::new(file),
uring_system,
next_free_block: AtomicU64::new(0),
free_list: Mutex::new(FreeList {
next_free_block: 0,
max_blocks: initial_size,
free_blocks: Vec::new(),
}),
})
}
@@ -94,8 +105,22 @@ impl FileCache {
Ok(())
}
pub fn alloc_block(&self) -> CacheBlock {
self.next_free_block.fetch_add(1, Ordering::Relaxed)
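/// Allocate a block from the file cache. Returns None if the cache is full;
/// the caller is expected to evict a block (and may reuse the freed block
/// directly) before retrying.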
pub fn alloc_block(&self) -> Option<CacheBlock> {
let mut free_list = self.free_list.lock().unwrap();
if let Some(x) = free_list.free_blocks.pop() {
return Some(x);
}
if free_list.next_free_block < free_list.max_blocks {
let result = free_list.next_free_block;
free_list.next_free_block += 1;
return Some(result);
}
None
}
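/// Return a previously allocated block to the free list.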
pub fn dealloc_block(&self, cache_block: CacheBlock) {
let mut free_list = self.free_list.lock().unwrap();
free_list.free_blocks.push(cache_block);
}
}

View File

@@ -1,6 +1,6 @@
//! Integrated communicator cache
//!
//! Tracks:
//! It tracks:
//! - Relation sizes and existence
//! - Last-written LSN
//! - TODO: Block cache (also known as LFC)
@@ -24,6 +24,7 @@
use std::mem::MaybeUninit;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use utils::lsn::Lsn;
use zerocopy::FromBytes;
@@ -55,9 +56,12 @@ pub struct IntegratedCacheWriteAccess<'t> {
neonart::ArtMultiSlabAllocator<'t, TreeEntry>,
>,
global_lw_lsn: Lsn,
global_lw_lsn: AtomicU64,
file_cache: Option<FileCache>,
// Fields for eviction
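// The clock hand is a wrapping iterator over the cache tree; the mutex
// ensures only one evictor advances it at a time.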
clock_hand: std::sync::Mutex<TreeIterator<TreeKey>>,
}
/// Represents read-only access to the integrated cache. Backend processes have this.
@@ -99,8 +103,9 @@ impl<'t> IntegratedCacheInitStruct<'t> {
IntegratedCacheWriteAccess {
cache_tree: tree_writer,
global_lw_lsn: lsn,
global_lw_lsn: AtomicU64::new(lsn.0),
file_cache,
clock_hand: std::sync::Mutex::new(TreeIterator::new_wrapping()),
}
}
@@ -124,10 +129,22 @@ enum TreeEntry {
Block(BlockEntry),
}
#[derive(Clone)]
struct BlockEntry {
lw_lsn: Lsn,
cache_block: Option<CacheBlock>,
// 'referenced' bit for the clock algorithm
referenced: AtomicBool,
}
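// AtomicBool does not implement Clone, so BlockEntry needs a manual Clone
// impl that snapshots the current bit into a fresh atomic.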
impl Clone for BlockEntry {
fn clone(&self) -> BlockEntry {
BlockEntry {
lw_lsn: self.lw_lsn,
cache_block: self.cache_block,
referenced: AtomicBool::new(self.referenced.load(Ordering::Relaxed)),
}
}
}
#[derive(Clone, Default)]
@@ -233,7 +250,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
if let Some(nblocks) = get_rel_size(&r, rel) {
CacheResult::Found(nblocks)
} else {
CacheResult::NotFound(self.global_lw_lsn)
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
CacheResult::NotFound(lsn)
}
}
@@ -262,7 +280,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
Ok(CacheResult::NotFound(block_entry.lw_lsn))
}
} else {
Ok(CacheResult::NotFound(self.global_lw_lsn))
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
Ok(CacheResult::NotFound(lsn))
}
}
@@ -285,7 +304,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
Ok(CacheResult::NotFound(block_entry.lw_lsn))
}
} else {
Ok(CacheResult::NotFound(self.global_lw_lsn))
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
Ok(CacheResult::NotFound(lsn))
}
}
@@ -297,13 +317,19 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
if let Some(_rel_entry) = r.get(&TreeKey::from(rel)) {
CacheResult::Found(true)
} else {
CacheResult::NotFound(self.global_lw_lsn)
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
CacheResult::NotFound(lsn)
}
}
pub fn get_db_size(&'t self, _db_oid: u32) -> CacheResult<u64> {
// TODO: it would be nice to cache database sizes too. Getting the database size
// is not a very common operation, but when it happens it's often interactive,
// e.g. via psql's \l+ command, so the user will feel the latency.
// FIXME: is this the right LSN?
CacheResult::NotFound(self.global_lw_lsn)
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
CacheResult::NotFound(lsn)
}
pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) {
@@ -329,6 +355,15 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
let key = TreeKey::from((rel, block_number));
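// Reserve a cache block up front, before updating the tree: take one from
// the free list, or evict another entry and reuse its block. If the entry
// being updated already has a cache block, the reservation is released below.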
let mut reserved_cache_block = loop {
if let Some(x) = file_cache.alloc_block() {
break Some(x);
}
if let Some(x) = self.try_evict_one_cache_block() {
break Some(x);
}
};
let mut cache_block = None;
w.update_with_fn(&key, |existing| {
@@ -340,24 +375,30 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
};
block_entry.lw_lsn = lw_lsn;
if block_entry.cache_block.is_none() {
block_entry.cache_block = Some(file_cache.alloc_block());
block_entry.cache_block = reserved_cache_block.take();
}
cache_block = block_entry.cache_block;
Some(TreeEntry::Block(block_entry))
} else {
cache_block = Some(file_cache.alloc_block());
cache_block = reserved_cache_block.take();
Some(TreeEntry::Block(BlockEntry {
lw_lsn: lw_lsn,
cache_block: cache_block,
referenced: AtomicBool::new(true),
}))
}
});
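// The reservation was not consumed (the entry already had a cache block),
// so return it to the free list.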
if let Some(x) = reserved_cache_block {
file_cache.dealloc_block(x);
}
let cache_block = cache_block.unwrap();
file_cache
.write_block(cache_block, src)
.await
.expect("error writing to cache");
}
};
}
/// Forget information about given relation in the cache. (For DROP TABLE and such)
@@ -367,11 +408,51 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
// also forget all cached blocks for the relation
let mut iter = TreeIterator::new(&key_range_for_rel_blocks(rel));
while let Some((k, _v)) = iter.next(self.cache_tree.start_read()) {
let r = self.cache_tree.start_read();
while let Some((k, _v)) = iter.next(&r) {
let w = self.cache_tree.start_write();
w.remove(&k);
}
}
// Maintenance routines
/// Evict one block from the file cache. This is used when the file cache fills up.
/// Returns the evicted block. It is not put back on the free list, so it is available
/// for the caller to use immediately.
pub fn try_evict_one_cache_block(&self) -> Option<CacheBlock> {
let mut clock_hand = self.clock_hand.lock().unwrap();
for _ in 0..1000 {
let r = self.cache_tree.start_read();
match clock_hand.next(&r) {
None => {
// The cache is completely empty. It is surprising that this function
// was called at all in that case.
},
Some((_k, TreeEntry::Rel(_))) => {
// ignore rel entries for now.
// TODO: They stick in the cache forever
},
Some((k, TreeEntry::Block(blk_entry))) => {
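// Clock second chance: clear the referenced bit; if it was already clear,
// the block has not been accessed since the last sweep and can be evicted.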
if !blk_entry.referenced.swap(false, Ordering::Relaxed) {
// Evict this
let w = self.cache_tree.start_write();
let old = w.remove(&k);
if let Some(TreeEntry::Block(old)) = old {
let _ = self.global_lw_lsn.fetch_max(old.lw_lsn.0, Ordering::Relaxed);
if let Some(cache_block) = old.cache_block {
return Some(cache_block);
}
} else {
assert!(old.is_none());
}
}
},
}
}
// Give up if we didn't find anything
None
}
}
/// Read relation size from the cache.

View File

@@ -39,7 +39,7 @@ pub(super) async fn init(
timeline_id: String,
auth_token: Option<String>,
shard_map: HashMap<utils::shard::ShardIndex, String>,
_file_cache_size: u64,
file_cache_size: u64,
file_cache_path: Option<PathBuf>,
) -> CommunicatorWorkerProcessStruct<'static> {
let last_lsn = get_request_lsn();
@@ -47,11 +47,11 @@ pub(super) async fn init(
let uring_system = tokio_epoll_uring::System::launch().await.unwrap();
let file_cache = if let Some(path) = file_cache_path {
Some(FileCache::new(&path, uring_system).expect("could not create cache file"))
Some(FileCache::new(&path, file_cache_size, uring_system).expect("could not create cache file"))
} else {
// FIXME: temporarily for testing, use LFC even if disabled
Some(
FileCache::new(&PathBuf::from("new_filecache"), uring_system)
FileCache::new(&PathBuf::from("new_filecache"), 1000, uring_system)
.expect("could not create cache file"),
)
};