From 33549bad1daa2b9d91391ccd94bb159f11c60a5a Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 28 May 2025 23:57:55 +0300 Subject: [PATCH] use separate hash tables for relsize cache and block mappings --- Cargo.lock | 2 - libs/neon-shmem/src/hash.rs | 74 ++-- libs/neon-shmem/src/hash/core.rs | 45 ++- pgxn/neon/communicator/Cargo.toml | 2 - .../neon/communicator/src/integrated_cache.rs | 348 ++++++++---------- 5 files changed, 216 insertions(+), 255 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e0fa0a2b2..58821b37e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1373,8 +1373,6 @@ dependencies = [ "tracing-subscriber", "uring-common", "utils", - "zerocopy 0.8.24", - "zerocopy-derive 0.8.24", ] [[package]] diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs index c81d75c91d..dcb5343b42 100644 --- a/libs/neon-shmem/src/hash.rs +++ b/libs/neon-shmem/src/hash.rs @@ -7,9 +7,9 @@ //! [ ] Scalable to lots of concurrent accesses (currently uses a single spinlock) //! [ ] Resizable -use std::cmp::Eq; use std::fmt::Debug; use std::hash::Hash; +use std::mem::MaybeUninit; use std::ops::Deref; use crate::shmem::ShmemHandle; @@ -23,16 +23,6 @@ mod tests; use core::CoreHashMap; -/// Fixed-length key type -pub trait Key: Clone + Debug + Hash + Eq { - const KEY_LEN: usize; - - fn as_bytes(&self) -> &[u8]; -} - -/// Values stored in the hash table -pub trait Value {} - pub enum UpdateAction { Nothing, Insert(V), @@ -43,23 +33,21 @@ pub enum UpdateAction { pub struct OutOfMemoryError(); pub struct HashMapInit<'a, K, V> -where - K: Key, - V: Value, { - shmem: ShmemHandle, + // Hash table can be allocated in a fixed memory area, or in a resizeable ShmemHandle. + shmem: Option, shared_ptr: *mut HashMapShared<'a, K, V>, } -pub struct HashMapAccess<'a, K: Key, V: Value> { - _shmem: ShmemHandle, +pub struct HashMapAccess<'a, K, V> { + _shmem: Option, shared_ptr: *mut HashMapShared<'a, K, V>, } -unsafe impl<'a, K: Key + Sync, V: Value + Sync> Sync for HashMapAccess<'a, K, V> {} -unsafe impl<'a, K: Key + Send, V: Value + Send> Send for HashMapAccess<'a, K, V> {} +unsafe impl<'a, K: Sync, V: Sync> Sync for HashMapAccess<'a, K, V> {} +unsafe impl<'a, K: Send, V: Send> Send for HashMapAccess<'a, K, V> {} -impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> { +impl<'a, K, V> HashMapInit<'a, K, V> { pub fn attach_writer(self) -> HashMapAccess<'a, K, V> { HashMapAccess { _shmem: self.shmem, @@ -75,23 +63,37 @@ impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> { // This is stored in the shared memory area struct HashMapShared<'a, K, V> -where - K: Key, - V: Value, { inner: spin::RwLock>, } -impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> { +impl<'a, K, V> HashMapInit<'a, K, V> +where K: Clone + Hash + Eq, +{ + pub fn estimate_size(num_buckets: u32) -> usize { + // add some margin to cover alignment etc. + CoreHashMap::::estimate_size(num_buckets) + size_of::>() + 1000 + } + + pub fn init_in_fixed_area(num_buckets: u32, area: &'a mut [MaybeUninit]) -> HashMapInit<'a, K, V> { + Self::init_common(num_buckets, None, area.as_mut_ptr().cast(), area.len()) + } + /// Initialize a new hash map in the given shared memory area - pub fn init_in_shmem(mut shmem: ShmemHandle, size: usize) -> HashMapInit<'a, K, V> { + pub fn init_in_shmem(num_buckets: u32, mut shmem: ShmemHandle) -> HashMapInit<'a, K, V> { + let size = Self::estimate_size(num_buckets); shmem .set_size(size) .expect("could not resize shared memory area"); - // carve out HashMapShared from the struct. This does not include the hashmap's dictionary + let ptr = unsafe { shmem.data_ptr.as_mut() }; + Self::init_common(num_buckets, Some(shmem), ptr, size) + } + + fn init_common(num_buckets: u32, shmem_handle: Option, area_ptr: *mut u8, area_len: usize) -> HashMapInit<'a, K, V> { + // carve out HashMapShared from the area. This does not include the hashmap's dictionary // and buckets. - let mut ptr: *mut u8 = unsafe { shmem.data_ptr.as_mut() }; + let mut ptr: *mut u8 = area_ptr; ptr = unsafe { ptr.add(ptr.align_offset(align_of::>())) }; let shared_ptr: *mut HashMapShared = ptr.cast(); ptr = unsafe { ptr.add(size_of::>()) }; @@ -100,11 +102,11 @@ impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> { let remaining_area = unsafe { std::slice::from_raw_parts_mut( ptr, - size - ptr.offset_from(shmem.data_ptr.as_mut()) as usize, + area_len - ptr.offset_from(area_ptr) as usize, ) }; - let hashmap = CoreHashMap::new(remaining_area); + let hashmap = CoreHashMap::new(num_buckets, remaining_area); unsafe { std::ptr::write( shared_ptr, @@ -114,11 +116,17 @@ impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> { ); } - HashMapInit { shmem, shared_ptr } + HashMapInit { + shmem: shmem_handle, + shared_ptr, + } } + } -impl<'a, K: Key, V: Value> HashMapAccess<'a, K, V> { +impl<'a, K, V> HashMapAccess<'a, K, V> + where K: Clone + Hash + Eq, +{ pub fn get<'e>(&'e self, key: &K) -> Option> { let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); let lock_guard = map.inner.read(); @@ -248,12 +256,12 @@ impl<'a, K: Key, V: Value> HashMapAccess<'a, K, V> { } } -pub struct ValueReadGuard<'a, K: Key, V: Value> { +pub struct ValueReadGuard<'a, K, V> { _lock_guard: spin::RwLockReadGuard<'a, CoreHashMap<'a, K, V>>, value: *const V, } -impl<'a, K: Key, V: Value> Deref for ValueReadGuard<'a, K, V> { +impl<'a, K, V> Deref for ValueReadGuard<'a, K, V> { type Target = V; fn deref(&self) -> &Self::Target { diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs index 0b0528d5da..9729596870 100644 --- a/libs/neon-shmem/src/hash/core.rs +++ b/libs/neon-shmem/src/hash/core.rs @@ -1,20 +1,18 @@ //! Simple hash table with chaining -use std::hash::{DefaultHasher, Hasher}; +use std::hash::{DefaultHasher, Hash, Hasher}; use std::mem::MaybeUninit; -use crate::hash::Key; - const INVALID_POS: u32 = u32::MAX; // Bucket -struct Bucket { +struct Bucket { hash: u64, next: u32, inner: Option<(K, V)>, } -pub(crate) struct CoreHashMap<'a, K: Key, V> { +pub(crate) struct CoreHashMap<'a, K, V> { dictionary: &'a mut [u32], buckets: &'a mut [Bucket], free_head: u32, @@ -25,28 +23,35 @@ pub(crate) struct CoreHashMap<'a, K: Key, V> { pub struct FullError(); -impl<'a, K: Key, V> CoreHashMap<'a, K, V> { - const FILL_FACTOR: f32 = 0.5; +impl<'a, K, V> CoreHashMap<'a, K, V> + where K: Clone + Hash + Eq, +{ + const FILL_FACTOR: f32 = 0.60; - pub fn new(area: &'a mut [u8]) -> CoreHashMap<'a, K, V> { + pub fn estimate_size(num_buckets: u32) -> usize{ + let mut size = 0; + + // buckets + size += size_of::>() * num_buckets as usize; + + // dictionary + size += (f32::ceil( + (size_of::() * num_buckets as usize) as f32 / Self::FILL_FACTOR) + ) as usize; + + size + } + + pub fn new(num_buckets: u32, area: &'a mut [u8]) -> CoreHashMap<'a, K, V> { let len = area.len(); let mut ptr: *mut u8 = area.as_mut_ptr(); let end_ptr: *mut u8 = unsafe { area.as_mut_ptr().add(len) }; - // How much space is left? - let size_remain = unsafe { end_ptr.byte_offset_from(ptr) }; - - let num_buckets = f32::floor( - size_remain as f32 - / (size_of::>() as f32 - + size_of::() as f32 * 1.0 / Self::FILL_FACTOR), - ) as usize; - // carve out the buckets ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::>())) }; let buckets_ptr = ptr; - ptr = unsafe { ptr.add(size_of::>() * num_buckets) }; + ptr = unsafe { ptr.add(size_of::>() * num_buckets as usize) }; // use remaining space for the dictionary ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::())) }; @@ -59,7 +64,7 @@ impl<'a, K: Key, V> CoreHashMap<'a, K, V> { // Initialize the buckets let buckets = { let buckets_ptr: *mut MaybeUninit> = buckets_ptr.cast(); - let buckets = unsafe { std::slice::from_raw_parts_mut(buckets_ptr, num_buckets) }; + let buckets = unsafe { std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize) }; for i in 0..buckets.len() { buckets[i].write(Bucket { hash: 0, @@ -72,7 +77,7 @@ impl<'a, K: Key, V> CoreHashMap<'a, K, V> { }); } // TODO: use std::slice::assume_init_mut() once it stabilizes - unsafe { std::slice::from_raw_parts_mut(buckets_ptr.cast(), num_buckets) } + unsafe { std::slice::from_raw_parts_mut(buckets_ptr.cast(), num_buckets as usize) } }; // Initialize the dictionary diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml index be6d22610a..40400e2b4c 100644 --- a/pgxn/neon/communicator/Cargo.toml +++ b/pgxn/neon/communicator/Cargo.toml @@ -22,8 +22,6 @@ tokio-pipe = { version = "0.2.12" } thiserror.workspace = true tracing.workspace = true tracing-subscriber.workspace = true -zerocopy = "0.8.0" -zerocopy-derive = "0.8.0" metrics.workspace = true uring-common = { workspace = true, features = ["bytes"] } diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index f0b14233bf..32fe07b3fd 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -23,11 +23,9 @@ // use std::mem::MaybeUninit; -use std::ops::Range; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use utils::lsn::{AtomicLsn, Lsn}; -use zerocopy::FromBytes; use crate::file_cache::INVALID_CACHE_BLOCK; use crate::file_cache::{CacheBlock, FileCache}; @@ -39,18 +37,27 @@ use neon_shmem::hash::HashMapInit; use neon_shmem::hash::UpdateAction; use neon_shmem::shmem::ShmemHandle; -const CACHE_AREA_SIZE: usize = 10 * 1024 * 1024; +/// in bytes +/// FIXME: calculate some reasonable upper bound +const MAX_BLOCK_MAP_SIZE: usize = 1024*1024*1024; -type IntegratedCacheMapInitStruct<'t> = HashMapInit<'t, MapKey, MapEntry>; +/// # of entries in the block mapping +/// FIXME: make it resizable. +const BLOCK_MAP_SIZE: u32 = 1000; + +// in # of entries +const RELSIZE_CACHE_SIZE: u32 = 64 * 1024; /// This struct is initialized at postmaster startup, and passed to all the processes via fork(). pub struct IntegratedCacheInitStruct<'t> { - map_handle: IntegratedCacheMapInitStruct<'t>, + relsize_cache_handle: HashMapInit<'t, RelKey, RelEntry>, + block_map_handle: HashMapInit<'t, BlockKey, BlockEntry>, } /// Represents write-access to the integrated cache. This is used by the communicator process. pub struct IntegratedCacheWriteAccess<'t> { - cache_map: neon_shmem::hash::HashMapAccess<'t, MapKey, MapEntry>, + relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>, + block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>, global_lw_lsn: AtomicU64, @@ -64,36 +71,46 @@ pub struct IntegratedCacheWriteAccess<'t> { clock_iterations_counter: IntCounter, // metrics from the hash map - cache_map_num_buckets: IntGauge, - cache_map_num_buckets_in_use: IntGauge, + block_map_num_buckets: IntGauge, + block_map_num_buckets_in_use: IntGauge, + + relsize_cache_num_buckets: IntGauge, + relsize_cache_num_buckets_in_use: IntGauge, } /// Represents read-only access to the integrated cache. Backend processes have this. pub struct IntegratedCacheReadAccess<'t> { - cache_map: neon_shmem::hash::HashMapAccess<'t, MapKey, MapEntry>, + relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>, + block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>, } + + impl<'t> IntegratedCacheInitStruct<'t> { - /// Return the desired size in bytes of the shared memory area to reserve for the integrated - /// cache. + /// Return the desired size in bytes of the fixed-size shared memory area to reserve for the + /// integrated cache. pub fn shmem_size(_max_procs: u32) -> usize { - // FIXME: the map uses its own ShmemHandle now. This is just for fixed-size allocations - // in the general Postgres shared memory segment. - 0 + HashMapInit::::estimate_size(RELSIZE_CACHE_SIZE) } /// Initialize the shared memory segment. This runs once in postmaster. Returns a struct which /// will be inherited by all processes through fork. pub fn shmem_init( _max_procs: u32, - _shmem_area: &'t mut [MaybeUninit], + shmem_area: &'t mut [MaybeUninit], ) -> IntegratedCacheInitStruct<'t> { - let shmem_handle = ShmemHandle::new("integrated cache", 0, CACHE_AREA_SIZE).unwrap(); + // Initialize the hash map + let relsize_cache_handle = + neon_shmem::hash::HashMapInit::init_in_fixed_area(RELSIZE_CACHE_SIZE, shmem_area); - // Initialize the shared memory area - let map_handle = - neon_shmem::hash::HashMapInit::init_in_shmem(shmem_handle, CACHE_AREA_SIZE); - IntegratedCacheInitStruct { map_handle } + let shmem_handle = ShmemHandle::new("block mapping", 0, MAX_BLOCK_MAP_SIZE).unwrap(); + + let block_map_handle = + neon_shmem::hash::HashMapInit::init_in_shmem(BLOCK_MAP_SIZE, shmem_handle); + IntegratedCacheInitStruct { + relsize_cache_handle, + block_map_handle, + } } pub fn worker_process_init( @@ -101,11 +118,13 @@ impl<'t> IntegratedCacheInitStruct<'t> { lsn: Lsn, file_cache: Option, ) -> IntegratedCacheWriteAccess<'t> { - let IntegratedCacheInitStruct { map_handle } = self; - let map_writer = map_handle.attach_writer(); - + let IntegratedCacheInitStruct { + relsize_cache_handle, + block_map_handle, + } = self; IntegratedCacheWriteAccess { - cache_map: map_writer, + relsize_cache: relsize_cache_handle.attach_writer(), + block_map: block_map_handle.attach_writer(), global_lw_lsn: AtomicU64::new(lsn.0), file_cache, clock_hand: std::sync::Mutex::new(0), @@ -122,35 +141,44 @@ impl<'t> IntegratedCacheInitStruct<'t> { ) .unwrap(), - cache_map_num_buckets: metrics::IntGauge::new( - "cache_num_map_buckets", - "Allocated size of the cache hash map", + block_map_num_buckets: metrics::IntGauge::new( + "block_map_num_buckets", + "Allocated size of the block cache hash map", ) .unwrap(), - cache_map_num_buckets_in_use: metrics::IntGauge::new( - "cache_num_map_buckets_in_use", - "Number of buckets in use in the cache hash map", + block_map_num_buckets_in_use: metrics::IntGauge::new( + "block_map_num_buckets_in_use", + "Number of buckets in use in the block cache hash map", + ) + .unwrap(), + + relsize_cache_num_buckets: metrics::IntGauge::new( + "relsize_cache_num_buckets", + "Allocated size of the relsize cache hash map", + ) + .unwrap(), + relsize_cache_num_buckets_in_use: metrics::IntGauge::new( + "relsize_cache_num_buckets_in_use", + "Number of buckets in use in the relsize cache hash map", ) .unwrap(), } } pub fn backend_init(self) -> IntegratedCacheReadAccess<'t> { - let IntegratedCacheInitStruct { map_handle } = self; - - let map_reader = map_handle.attach_reader(); + let IntegratedCacheInitStruct { + relsize_cache_handle, + block_map_handle, + } = self; IntegratedCacheReadAccess { - cache_map: map_reader, + relsize_cache: relsize_cache_handle.attach_reader(), + block_map: block_map_handle.attach_reader(), } } } -enum MapEntry { - Rel(RelEntry), - Block(BlockEntry), -} - +/// Value stored in the cache mapping hash table. struct BlockEntry { lw_lsn: AtomicLsn, cache_block: AtomicU64, @@ -161,27 +189,30 @@ struct BlockEntry { referenced: AtomicBool, } +/// Value stored in the relsize cache hash table. struct RelEntry { /// cached size of the relation /// u32::MAX means 'not known' (that's InvalidBlockNumber in Postgres) nblocks: AtomicU32, } -impl std::fmt::Debug for MapEntry { +impl std::fmt::Debug for RelEntry { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - match self { - MapEntry::Rel(e) => fmt - .debug_struct("Rel") - .field("nblocks", &e.nblocks.load(Ordering::Relaxed)) - .finish(), - MapEntry::Block(e) => fmt - .debug_struct("Block") - .field("lw_lsn", &e.lw_lsn.load()) - .field("cache_block", &e.cache_block.load(Ordering::Relaxed)) - .field("pinned", &e.pinned.load(Ordering::Relaxed)) - .field("referenced", &e.referenced.load(Ordering::Relaxed)) - .finish(), - } + fmt + .debug_struct("Rel") + .field("nblocks", &self.nblocks.load(Ordering::Relaxed)) + .finish() + } +} +impl std::fmt::Debug for BlockEntry { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + fmt + .debug_struct("Block") + .field("lw_lsn", &self.lw_lsn.load()) + .field("cache_block", &self.cache_block.load(Ordering::Relaxed)) + .field("pinned", &self.pinned.load(Ordering::Relaxed)) + .field("referenced", &self.referenced.load(Ordering::Relaxed)) + .finish() } } @@ -193,71 +224,30 @@ impl std::fmt::Debug for MapEntry { Eq, Hash, Ord, - zerocopy_derive::IntoBytes, - zerocopy_derive::Immutable, - zerocopy_derive::FromBytes, )] -#[repr(packed)] -// Note: the fields are stored in big-endian order. If we used the keys in a radix tree, that would -// make pack the tree more tightly, and would make scans over ranges of blocks work correctly, -// i.e. return the entries in block number order. XXX: We currently use a hash map though, so it -// doesn't matter. -struct MapKey { - spc_oid_be: u32, - db_oid_be: u32, - rel_number_be: u32, - fork_number: u8, - block_number_be: u32, -} -impl<'a> From<&'a [u8]> for MapKey { - fn from(bytes: &'a [u8]) -> Self { - Self::read_from_bytes(bytes).expect("invalid key length") +struct RelKey(RelTag); + +impl From<&RelTag> for RelKey { + fn from(val: &RelTag) -> RelKey { + RelKey(val.clone()) } } -// fixme: currently unused -#[allow(dead_code)] -fn key_range_for_rel_blocks(rel: &RelTag) -> Range { - Range { - start: MapKey::from((rel, 0)), - end: MapKey::from((rel, u32::MAX)), - } +#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Hash, Ord)] +struct BlockKey { + rel: RelTag, + block_number: u32, } -impl From<&RelTag> for MapKey { - fn from(val: &RelTag) -> MapKey { - MapKey { - spc_oid_be: val.spc_oid.to_be(), - db_oid_be: val.db_oid.to_be(), - rel_number_be: val.rel_number.to_be(), - fork_number: val.fork_number.to_be(), - block_number_be: u32::MAX.to_be(), +impl From<(&RelTag, u32)> for BlockKey { + fn from(val: (&RelTag, u32)) -> BlockKey { + BlockKey { + rel: val.0.clone(), + block_number: val.1, } } } -impl From<(&RelTag, u32)> for MapKey { - fn from(val: (&RelTag, u32)) -> MapKey { - MapKey { - spc_oid_be: val.0.spc_oid.to_be(), - db_oid_be: val.0.db_oid.to_be(), - rel_number_be: val.0.rel_number.to_be(), - fork_number: val.0.fork_number.to_be(), - block_number_be: val.1.to_be(), - } - } -} - -impl neon_shmem::hash::Key for MapKey { - const KEY_LEN: usize = 4 + 4 + 4 + 1 + 4; - - fn as_bytes(&self) -> &[u8] { - zerocopy::IntoBytes::as_bytes(self) - } -} - -impl neon_shmem::hash::Value for MapEntry {} - /// Return type used in the cache's get_*() functions. 'Found' means that the page, or other /// information that was enqueried, exists in the cache. ' pub enum CacheResult { @@ -272,7 +262,7 @@ pub enum CacheResult { impl<'t> IntegratedCacheWriteAccess<'t> { pub fn get_rel_size(&'t self, rel: &RelTag) -> CacheResult { - if let Some(nblocks) = get_rel_size(&self.cache_map, rel) { + if let Some(nblocks) = get_rel_size(&self.relsize_cache, rel) { CacheResult::Found(nblocks) } else { let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); @@ -286,14 +276,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> { block_number: u32, dst: impl uring_common::buf::IoBufMut + Send + Sync, ) -> Result, std::io::Error> { - let x = if let Some(entry) = - self.cache_map.get(&MapKey::from((rel, block_number))) + let x = if let Some(block_entry) = + self.block_map.get(&BlockKey::from((rel, block_number))) { - let block_entry = if let MapEntry::Block(e) = &*entry { - e - } else { - panic!("unexpected map entry type for block key"); - }; block_entry.referenced.store(true, Ordering::Relaxed); let cache_block = block_entry.cache_block.load(Ordering::Relaxed); @@ -326,13 +311,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { rel: &RelTag, block_number: u32, ) -> Result, std::io::Error> { - if let Some(entry) = self.cache_map.get(&MapKey::from((rel, block_number))) { - let block_entry = if let MapEntry::Block(e) = &*entry { - e - } else { - panic!("unexpected map entry type for block key"); - }; - + if let Some(block_entry) = self.block_map.get(&BlockKey::from((rel, block_number))) { // This is used for prefetch requests. Treat the probe as an 'access', to keep it // in cache. block_entry.referenced.store(true, Ordering::Relaxed); @@ -354,7 +333,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { /// information, i.e. we don't know if the relation exists or not. pub fn get_rel_exists(&'t self, rel: &RelTag) -> CacheResult { // we don't currently cache negative entries, so if the relation is in the cache, it exists - if let Some(_rel_entry) = self.cache_map.get(&MapKey::from(rel)) { + if let Some(_rel_entry) = self.relsize_cache.get(&RelKey::from(rel)) { CacheResult::Found(true) } else { let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); @@ -374,16 +353,15 @@ impl<'t> IntegratedCacheWriteAccess<'t> { pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) { let result = self - .cache_map - .update_with_fn(&MapKey::from(rel), |existing| match existing { + .relsize_cache + .update_with_fn(&RelKey::from(rel), |existing| match existing { None => { tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks"); - UpdateAction::Insert(MapEntry::Rel(RelEntry { + UpdateAction::Insert(RelEntry { nblocks: AtomicU32::new(nblocks), - })) + }) } - Some(MapEntry::Block(_)) => panic!("unexpected map entry type for rel key"), - Some(MapEntry::Rel(e)) => { + Some(e) => { tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks"); e.nblocks.store(nblocks, Ordering::Relaxed); UpdateAction::Nothing @@ -404,7 +382,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { lw_lsn: Lsn, is_write: bool, ) { - let key = MapKey::from((rel, block_number)); + let key = BlockKey::from((rel, block_number)); // FIXME: make this work when file cache is disabled. Or make it mandatory let file_cache = self.file_cache.as_ref().unwrap(); @@ -418,14 +396,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let mut old_cache_block = None; let mut found_existing = false; - let res = self.cache_map.update_with_fn(&key, |existing| { - if let Some(existing) = existing { - let block_entry = if let MapEntry::Block(e) = existing { - e - } else { - panic!("unexpected map entry type for block key"); - }; - + let res = self.block_map.update_with_fn(&key, |existing| { + if let Some(block_entry) = existing { found_existing = true; // Prevent this entry from being evicted @@ -474,15 +446,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // FIXME: unpin the block entry on error // Update the block entry - let res = self.cache_map.update_with_fn(&key, |existing| { + let res = self.block_map.update_with_fn(&key, |existing| { assert_eq!(found_existing, existing.is_some()); - if let Some(existing) = existing { - let block_entry = if let MapEntry::Block(e) = existing { - e - } else { - panic!("unexpected map entry type for block key"); - }; - + if let Some(block_entry) = existing { // Update the cache block let old_blk = block_entry.cache_block.compare_exchange( INVALID_CACHE_BLOCK, @@ -500,12 +466,12 @@ impl<'t> IntegratedCacheWriteAccess<'t> { assert!(pin_count > 0); UpdateAction::Nothing } else { - UpdateAction::Insert(MapEntry::Block(BlockEntry { + UpdateAction::Insert(BlockEntry { lw_lsn: AtomicLsn::new(lw_lsn.0), cache_block: AtomicU64::new(cache_block), pinned: AtomicU64::new(0), referenced: AtomicBool::new(true), - })) + }) } }); @@ -538,14 +504,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> { .expect("error writing to cache"); // FIXME: handle errors gracefully. - let res = self.cache_map.update_with_fn(&key, |existing| { - if let Some(existing) = existing { - let block_entry = if let MapEntry::Block(e) = existing { - e - } else { - panic!("unexpected map entry type for block key"); - }; - + let res = self.block_map.update_with_fn(&key, |existing| { + if let Some(block_entry) = existing { // FIXME: could there be concurrent readers? assert!(block_entry.pinned.load(Ordering::Relaxed) == 0); @@ -555,12 +515,12 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } UpdateAction::Nothing } else { - UpdateAction::Insert(MapEntry::Block(BlockEntry { + UpdateAction::Insert(BlockEntry { lw_lsn: AtomicLsn::new(lw_lsn.0), cache_block: AtomicU64::new(cache_block), pinned: AtomicU64::new(0), referenced: AtomicBool::new(true), - })) + }) } }); @@ -573,7 +533,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { /// Forget information about given relation in the cache. (For DROP TABLE and such) pub fn forget_rel(&'t self, rel: &RelTag) { tracing::info!("forgetting rel entry for {rel:?}"); - self.cache_map.remove(&MapKey::from(rel)); + self.relsize_cache.remove(&RelKey::from(rel)); // also forget all cached blocks for the relation // FIXME @@ -632,20 +592,16 @@ impl<'t> IntegratedCacheWriteAccess<'t> { (*clock_hand) += 1; let mut evict_this = false; - let num_buckets = self.cache_map.get_num_buckets(); + let num_buckets = self.block_map.get_num_buckets(); match self - .cache_map + .block_map .get_bucket((*clock_hand) % num_buckets) .as_deref() { None => { // This bucket was unused } - Some(MapEntry::Rel(_)) => { - // ignore rel entries for now. - // TODO: They stick in the cache forever - } - Some(MapEntry::Block(blk_entry)) => { + Some(blk_entry) => { if !blk_entry.referenced.swap(false, Ordering::Relaxed) { // Evict this. Maybe. evict_this = true; @@ -657,12 +613,11 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // grab the write lock let mut evicted_cache_block = None; let res = - self.cache_map + self.block_map .update_with_fn_at_bucket(*clock_hand % num_buckets, |old| { match old { None => UpdateAction::Nothing, - Some(MapEntry::Rel(_)) => panic!("unexpected Rel entry"), - Some(MapEntry::Block(old)) => { + Some(old) => { // note: all the accesses to 'pinned' currently happen // within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent // updates. Otherwise, another thread could set the 'pinned' @@ -680,9 +635,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { if cache_block != INVALID_CACHE_BLOCK { evicted_cache_block = Some(cache_block); } - // TODO: we don't evict the entry, just the block. Does it make - // sense to keep the entry? - UpdateAction::Nothing + UpdateAction::Remove } } }); @@ -712,24 +665,34 @@ impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { descs.append(&mut self.page_evictions_counter.desc()); descs.append(&mut self.clock_iterations_counter.desc()); - descs.append(&mut self.cache_map_num_buckets.desc()); - descs.append(&mut self.cache_map_num_buckets_in_use.desc()); + descs.append(&mut self.block_map_num_buckets.desc()); + descs.append(&mut self.block_map_num_buckets_in_use.desc()); + + descs.append(&mut self.relsize_cache_num_buckets.desc()); + descs.append(&mut self.relsize_cache_num_buckets_in_use.desc()); descs } fn collect(&self) -> Vec { // Update gauges - self.cache_map_num_buckets - .set(self.cache_map.get_num_buckets() as i64); - self.cache_map_num_buckets_in_use - .set(self.cache_map.get_num_buckets_in_use() as i64); + self.block_map_num_buckets + .set(self.block_map.get_num_buckets() as i64); + self.block_map_num_buckets_in_use + .set(self.block_map.get_num_buckets_in_use() as i64); + self.relsize_cache_num_buckets + .set(self.relsize_cache.get_num_buckets() as i64); + self.relsize_cache_num_buckets_in_use + .set(self.relsize_cache.get_num_buckets_in_use() as i64); let mut values = Vec::new(); values.append(&mut self.page_evictions_counter.collect()); values.append(&mut self.clock_iterations_counter.collect()); - values.append(&mut self.cache_map_num_buckets.collect()); - values.append(&mut self.cache_map_num_buckets_in_use.collect()); + values.append(&mut self.block_map_num_buckets.collect()); + values.append(&mut self.block_map_num_buckets_in_use.collect()); + + values.append(&mut self.relsize_cache_num_buckets.collect()); + values.append(&mut self.relsize_cache_num_buckets_in_use.collect()); values } @@ -740,16 +703,10 @@ impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { /// This is in a separate function so that it can be shared by /// IntegratedCacheReadAccess::get_rel_size() and IntegratedCacheWriteAccess::get_rel_size() fn get_rel_size<'t>( - r: &neon_shmem::hash::HashMapAccess, + r: &neon_shmem::hash::HashMapAccess, rel: &RelTag, ) -> Option { - if let Some(existing) = r.get(&MapKey::from(rel)) { - let rel_entry = if let MapEntry::Rel(ref e) = *existing { - e - } else { - panic!("unexpected map entry type for rel key"); - }; - + if let Some(rel_entry) = r.get(&RelKey::from(rel)) { let nblocks = rel_entry.nblocks.load(Ordering::Relaxed); if nblocks != u32::MAX { Some(nblocks) @@ -767,7 +724,7 @@ fn get_rel_size<'t>( /// request to the communicator process. impl<'t> IntegratedCacheReadAccess<'t> { pub fn get_rel_size(&'t self, rel: &RelTag) -> Option { - get_rel_size(&self.cache_map, rel) + get_rel_size(&self.relsize_cache, rel) } pub fn start_read_op(&'t self) -> BackendCacheReadOp<'t> { @@ -793,16 +750,11 @@ impl<'e> BackendCacheReadOp<'e> { /// After you have completed the read, call BackendCacheReadResult::finish() to check if the /// read was in fact valid or not. If it was concurrently invalidated, you need to retry. pub fn get_page(&mut self, rel: &RelTag, block_number: u32) -> Option { - if let Some(entry) = self + if let Some(block_entry) = self .map_access - .cache_map - .get(&MapKey::from((rel, block_number))) + .block_map + .get(&BlockKey::from((rel, block_number))) { - let block_entry = if let MapEntry::Block(ref e) = *entry { - e - } else { - panic!("unexpected map entry type for block key"); - }; block_entry.referenced.store(true, Ordering::Relaxed); let cache_block = block_entry.cache_block.load(Ordering::Relaxed);