use separate hash tables for relsize cache and block mappings

This commit is contained in:
Heikki Linnakangas
2025-05-28 23:57:55 +03:00
parent 009168d711
commit 33549bad1d
5 changed files with 216 additions and 255 deletions

2
Cargo.lock generated
View File

@@ -1373,8 +1373,6 @@ dependencies = [
"tracing-subscriber",
"uring-common",
"utils",
"zerocopy 0.8.24",
"zerocopy-derive 0.8.24",
]
[[package]]

View File

@@ -7,9 +7,9 @@
//! [ ] Scalable to lots of concurrent accesses (currently uses a single spinlock)
//! [ ] Resizable
use std::cmp::Eq;
use std::fmt::Debug;
use std::hash::Hash;
use std::mem::MaybeUninit;
use std::ops::Deref;
use crate::shmem::ShmemHandle;
@@ -23,16 +23,6 @@ mod tests;
use core::CoreHashMap;
/// Fixed-length key type
pub trait Key: Clone + Debug + Hash + Eq {
const KEY_LEN: usize;
fn as_bytes(&self) -> &[u8];
}
/// Values stored in the hash table
pub trait Value {}
pub enum UpdateAction<V> {
Nothing,
Insert(V),
@@ -43,23 +33,21 @@ pub enum UpdateAction<V> {
pub struct OutOfMemoryError();
pub struct HashMapInit<'a, K, V>
where
K: Key,
V: Value,
{
shmem: ShmemHandle,
// Hash table can be allocated in a fixed memory area, or in a resizeable ShmemHandle.
shmem: Option<ShmemHandle>,
shared_ptr: *mut HashMapShared<'a, K, V>,
}
pub struct HashMapAccess<'a, K: Key, V: Value> {
_shmem: ShmemHandle,
pub struct HashMapAccess<'a, K, V> {
_shmem: Option<ShmemHandle>,
shared_ptr: *mut HashMapShared<'a, K, V>,
}
unsafe impl<'a, K: Key + Sync, V: Value + Sync> Sync for HashMapAccess<'a, K, V> {}
unsafe impl<'a, K: Key + Send, V: Value + Send> Send for HashMapAccess<'a, K, V> {}
unsafe impl<'a, K: Sync, V: Sync> Sync for HashMapAccess<'a, K, V> {}
unsafe impl<'a, K: Send, V: Send> Send for HashMapAccess<'a, K, V> {}
impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> {
impl<'a, K, V> HashMapInit<'a, K, V> {
pub fn attach_writer(self) -> HashMapAccess<'a, K, V> {
HashMapAccess {
_shmem: self.shmem,
@@ -75,23 +63,37 @@ impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> {
// This is stored in the shared memory area
struct HashMapShared<'a, K, V>
where
K: Key,
V: Value,
{
inner: spin::RwLock<CoreHashMap<'a, K, V>>,
}
impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> {
impl<'a, K, V> HashMapInit<'a, K, V>
where K: Clone + Hash + Eq,
{
pub fn estimate_size(num_buckets: u32) -> usize {
// add some margin to cover alignment etc.
CoreHashMap::<K, V>::estimate_size(num_buckets) + size_of::<HashMapShared<K, V>>() + 1000
}
pub fn init_in_fixed_area(num_buckets: u32, area: &'a mut [MaybeUninit<u8>]) -> HashMapInit<'a, K, V> {
Self::init_common(num_buckets, None, area.as_mut_ptr().cast(), area.len())
}
/// Initialize a new hash map in the given shared memory area
pub fn init_in_shmem(mut shmem: ShmemHandle, size: usize) -> HashMapInit<'a, K, V> {
pub fn init_in_shmem(num_buckets: u32, mut shmem: ShmemHandle) -> HashMapInit<'a, K, V> {
let size = Self::estimate_size(num_buckets);
shmem
.set_size(size)
.expect("could not resize shared memory area");
// carve out HashMapShared from the struct. This does not include the hashmap's dictionary
let ptr = unsafe { shmem.data_ptr.as_mut() };
Self::init_common(num_buckets, Some(shmem), ptr, size)
}
fn init_common(num_buckets: u32, shmem_handle: Option<ShmemHandle>, area_ptr: *mut u8, area_len: usize) -> HashMapInit<'a, K, V> {
// carve out HashMapShared from the area. This does not include the hashmap's dictionary
// and buckets.
let mut ptr: *mut u8 = unsafe { shmem.data_ptr.as_mut() };
let mut ptr: *mut u8 = area_ptr;
ptr = unsafe { ptr.add(ptr.align_offset(align_of::<HashMapShared<K, V>>())) };
let shared_ptr: *mut HashMapShared<K, V> = ptr.cast();
ptr = unsafe { ptr.add(size_of::<HashMapShared<K, V>>()) };
@@ -100,11 +102,11 @@ impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> {
let remaining_area = unsafe {
std::slice::from_raw_parts_mut(
ptr,
size - ptr.offset_from(shmem.data_ptr.as_mut()) as usize,
area_len - ptr.offset_from(area_ptr) as usize,
)
};
let hashmap = CoreHashMap::new(remaining_area);
let hashmap = CoreHashMap::new(num_buckets, remaining_area);
unsafe {
std::ptr::write(
shared_ptr,
@@ -114,11 +116,17 @@ impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> {
);
}
HashMapInit { shmem, shared_ptr }
HashMapInit {
shmem: shmem_handle,
shared_ptr,
}
}
}
impl<'a, K: Key, V: Value> HashMapAccess<'a, K, V> {
impl<'a, K, V> HashMapAccess<'a, K, V>
where K: Clone + Hash + Eq,
{
pub fn get<'e>(&'e self, key: &K) -> Option<ValueReadGuard<'e, K, V>> {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
let lock_guard = map.inner.read();
@@ -248,12 +256,12 @@ impl<'a, K: Key, V: Value> HashMapAccess<'a, K, V> {
}
}
pub struct ValueReadGuard<'a, K: Key, V: Value> {
pub struct ValueReadGuard<'a, K, V> {
_lock_guard: spin::RwLockReadGuard<'a, CoreHashMap<'a, K, V>>,
value: *const V,
}
impl<'a, K: Key, V: Value> Deref for ValueReadGuard<'a, K, V> {
impl<'a, K, V> Deref for ValueReadGuard<'a, K, V> {
type Target = V;
fn deref(&self) -> &Self::Target {

View File

@@ -1,20 +1,18 @@
//! Simple hash table with chaining
use std::hash::{DefaultHasher, Hasher};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::mem::MaybeUninit;
use crate::hash::Key;
const INVALID_POS: u32 = u32::MAX;
// Bucket
struct Bucket<K: Key, V> {
struct Bucket<K, V> {
hash: u64,
next: u32,
inner: Option<(K, V)>,
}
pub(crate) struct CoreHashMap<'a, K: Key, V> {
pub(crate) struct CoreHashMap<'a, K, V> {
dictionary: &'a mut [u32],
buckets: &'a mut [Bucket<K, V>],
free_head: u32,
@@ -25,28 +23,35 @@ pub(crate) struct CoreHashMap<'a, K: Key, V> {
pub struct FullError();
impl<'a, K: Key, V> CoreHashMap<'a, K, V> {
const FILL_FACTOR: f32 = 0.5;
impl<'a, K, V> CoreHashMap<'a, K, V>
where K: Clone + Hash + Eq,
{
const FILL_FACTOR: f32 = 0.60;
pub fn new(area: &'a mut [u8]) -> CoreHashMap<'a, K, V> {
pub fn estimate_size(num_buckets: u32) -> usize{
let mut size = 0;
// buckets
size += size_of::<Bucket<K, V>>() * num_buckets as usize;
// dictionary
size += (f32::ceil(
(size_of::<u32>() * num_buckets as usize) as f32 / Self::FILL_FACTOR)
) as usize;
size
}
pub fn new(num_buckets: u32, area: &'a mut [u8]) -> CoreHashMap<'a, K, V> {
let len = area.len();
let mut ptr: *mut u8 = area.as_mut_ptr();
let end_ptr: *mut u8 = unsafe { area.as_mut_ptr().add(len) };
// How much space is left?
let size_remain = unsafe { end_ptr.byte_offset_from(ptr) };
let num_buckets = f32::floor(
size_remain as f32
/ (size_of::<Bucket<K, V>>() as f32
+ size_of::<u32>() as f32 * 1.0 / Self::FILL_FACTOR),
) as usize;
// carve out the buckets
ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<Bucket<K, V>>())) };
let buckets_ptr = ptr;
ptr = unsafe { ptr.add(size_of::<Bucket<K, V>>() * num_buckets) };
ptr = unsafe { ptr.add(size_of::<Bucket<K, V>>() * num_buckets as usize) };
// use remaining space for the dictionary
ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<u32>())) };
@@ -59,7 +64,7 @@ impl<'a, K: Key, V> CoreHashMap<'a, K, V> {
// Initialize the buckets
let buckets = {
let buckets_ptr: *mut MaybeUninit<Bucket<K, V>> = buckets_ptr.cast();
let buckets = unsafe { std::slice::from_raw_parts_mut(buckets_ptr, num_buckets) };
let buckets = unsafe { std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize) };
for i in 0..buckets.len() {
buckets[i].write(Bucket {
hash: 0,
@@ -72,7 +77,7 @@ impl<'a, K: Key, V> CoreHashMap<'a, K, V> {
});
}
// TODO: use std::slice::assume_init_mut() once it stabilizes
unsafe { std::slice::from_raw_parts_mut(buckets_ptr.cast(), num_buckets) }
unsafe { std::slice::from_raw_parts_mut(buckets_ptr.cast(), num_buckets as usize) }
};
// Initialize the dictionary

View File

@@ -22,8 +22,6 @@ tokio-pipe = { version = "0.2.12" }
thiserror.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
zerocopy = "0.8.0"
zerocopy-derive = "0.8.0"
metrics.workspace = true
uring-common = { workspace = true, features = ["bytes"] }

View File

@@ -23,11 +23,9 @@
//
use std::mem::MaybeUninit;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use utils::lsn::{AtomicLsn, Lsn};
use zerocopy::FromBytes;
use crate::file_cache::INVALID_CACHE_BLOCK;
use crate::file_cache::{CacheBlock, FileCache};
@@ -39,18 +37,27 @@ use neon_shmem::hash::HashMapInit;
use neon_shmem::hash::UpdateAction;
use neon_shmem::shmem::ShmemHandle;
const CACHE_AREA_SIZE: usize = 10 * 1024 * 1024;
/// in bytes
/// FIXME: calculate some reasonable upper bound
const MAX_BLOCK_MAP_SIZE: usize = 1024*1024*1024;
type IntegratedCacheMapInitStruct<'t> = HashMapInit<'t, MapKey, MapEntry>;
/// # of entries in the block mapping
/// FIXME: make it resizable.
const BLOCK_MAP_SIZE: u32 = 1000;
// in # of entries
const RELSIZE_CACHE_SIZE: u32 = 64 * 1024;
/// This struct is initialized at postmaster startup, and passed to all the processes via fork().
pub struct IntegratedCacheInitStruct<'t> {
map_handle: IntegratedCacheMapInitStruct<'t>,
relsize_cache_handle: HashMapInit<'t, RelKey, RelEntry>,
block_map_handle: HashMapInit<'t, BlockKey, BlockEntry>,
}
/// Represents write-access to the integrated cache. This is used by the communicator process.
pub struct IntegratedCacheWriteAccess<'t> {
cache_map: neon_shmem::hash::HashMapAccess<'t, MapKey, MapEntry>,
relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>,
block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>,
global_lw_lsn: AtomicU64,
@@ -64,36 +71,46 @@ pub struct IntegratedCacheWriteAccess<'t> {
clock_iterations_counter: IntCounter,
// metrics from the hash map
cache_map_num_buckets: IntGauge,
cache_map_num_buckets_in_use: IntGauge,
block_map_num_buckets: IntGauge,
block_map_num_buckets_in_use: IntGauge,
relsize_cache_num_buckets: IntGauge,
relsize_cache_num_buckets_in_use: IntGauge,
}
/// Represents read-only access to the integrated cache. Backend processes have this.
pub struct IntegratedCacheReadAccess<'t> {
cache_map: neon_shmem::hash::HashMapAccess<'t, MapKey, MapEntry>,
relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>,
block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>,
}
impl<'t> IntegratedCacheInitStruct<'t> {
/// Return the desired size in bytes of the shared memory area to reserve for the integrated
/// cache.
/// Return the desired size in bytes of the fixed-size shared memory area to reserve for the
/// integrated cache.
pub fn shmem_size(_max_procs: u32) -> usize {
// FIXME: the map uses its own ShmemHandle now. This is just for fixed-size allocations
// in the general Postgres shared memory segment.
0
HashMapInit::<RelKey, RelEntry>::estimate_size(RELSIZE_CACHE_SIZE)
}
/// Initialize the shared memory segment. This runs once in postmaster. Returns a struct which
/// will be inherited by all processes through fork.
pub fn shmem_init(
_max_procs: u32,
_shmem_area: &'t mut [MaybeUninit<u8>],
shmem_area: &'t mut [MaybeUninit<u8>],
) -> IntegratedCacheInitStruct<'t> {
let shmem_handle = ShmemHandle::new("integrated cache", 0, CACHE_AREA_SIZE).unwrap();
// Initialize the hash map
let relsize_cache_handle =
neon_shmem::hash::HashMapInit::init_in_fixed_area(RELSIZE_CACHE_SIZE, shmem_area);
// Initialize the shared memory area
let map_handle =
neon_shmem::hash::HashMapInit::init_in_shmem(shmem_handle, CACHE_AREA_SIZE);
IntegratedCacheInitStruct { map_handle }
let shmem_handle = ShmemHandle::new("block mapping", 0, MAX_BLOCK_MAP_SIZE).unwrap();
let block_map_handle =
neon_shmem::hash::HashMapInit::init_in_shmem(BLOCK_MAP_SIZE, shmem_handle);
IntegratedCacheInitStruct {
relsize_cache_handle,
block_map_handle,
}
}
pub fn worker_process_init(
@@ -101,11 +118,13 @@ impl<'t> IntegratedCacheInitStruct<'t> {
lsn: Lsn,
file_cache: Option<FileCache>,
) -> IntegratedCacheWriteAccess<'t> {
let IntegratedCacheInitStruct { map_handle } = self;
let map_writer = map_handle.attach_writer();
let IntegratedCacheInitStruct {
relsize_cache_handle,
block_map_handle,
} = self;
IntegratedCacheWriteAccess {
cache_map: map_writer,
relsize_cache: relsize_cache_handle.attach_writer(),
block_map: block_map_handle.attach_writer(),
global_lw_lsn: AtomicU64::new(lsn.0),
file_cache,
clock_hand: std::sync::Mutex::new(0),
@@ -122,35 +141,44 @@ impl<'t> IntegratedCacheInitStruct<'t> {
)
.unwrap(),
cache_map_num_buckets: metrics::IntGauge::new(
"cache_num_map_buckets",
"Allocated size of the cache hash map",
block_map_num_buckets: metrics::IntGauge::new(
"block_map_num_buckets",
"Allocated size of the block cache hash map",
)
.unwrap(),
cache_map_num_buckets_in_use: metrics::IntGauge::new(
"cache_num_map_buckets_in_use",
"Number of buckets in use in the cache hash map",
block_map_num_buckets_in_use: metrics::IntGauge::new(
"block_map_num_buckets_in_use",
"Number of buckets in use in the block cache hash map",
)
.unwrap(),
relsize_cache_num_buckets: metrics::IntGauge::new(
"relsize_cache_num_buckets",
"Allocated size of the relsize cache hash map",
)
.unwrap(),
relsize_cache_num_buckets_in_use: metrics::IntGauge::new(
"relsize_cache_num_buckets_in_use",
"Number of buckets in use in the relsize cache hash map",
)
.unwrap(),
}
}
pub fn backend_init(self) -> IntegratedCacheReadAccess<'t> {
let IntegratedCacheInitStruct { map_handle } = self;
let map_reader = map_handle.attach_reader();
let IntegratedCacheInitStruct {
relsize_cache_handle,
block_map_handle,
} = self;
IntegratedCacheReadAccess {
cache_map: map_reader,
relsize_cache: relsize_cache_handle.attach_reader(),
block_map: block_map_handle.attach_reader(),
}
}
}
enum MapEntry {
Rel(RelEntry),
Block(BlockEntry),
}
/// Value stored in the cache mapping hash table.
struct BlockEntry {
lw_lsn: AtomicLsn,
cache_block: AtomicU64,
@@ -161,27 +189,30 @@ struct BlockEntry {
referenced: AtomicBool,
}
/// Value stored in the relsize cache hash table.
struct RelEntry {
/// cached size of the relation
/// u32::MAX means 'not known' (that's InvalidBlockNumber in Postgres)
nblocks: AtomicU32,
}
impl std::fmt::Debug for MapEntry {
impl std::fmt::Debug for RelEntry {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
MapEntry::Rel(e) => fmt
.debug_struct("Rel")
.field("nblocks", &e.nblocks.load(Ordering::Relaxed))
.finish(),
MapEntry::Block(e) => fmt
.debug_struct("Block")
.field("lw_lsn", &e.lw_lsn.load())
.field("cache_block", &e.cache_block.load(Ordering::Relaxed))
.field("pinned", &e.pinned.load(Ordering::Relaxed))
.field("referenced", &e.referenced.load(Ordering::Relaxed))
.finish(),
}
fmt
.debug_struct("Rel")
.field("nblocks", &self.nblocks.load(Ordering::Relaxed))
.finish()
}
}
impl std::fmt::Debug for BlockEntry {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
fmt
.debug_struct("Block")
.field("lw_lsn", &self.lw_lsn.load())
.field("cache_block", &self.cache_block.load(Ordering::Relaxed))
.field("pinned", &self.pinned.load(Ordering::Relaxed))
.field("referenced", &self.referenced.load(Ordering::Relaxed))
.finish()
}
}
@@ -193,71 +224,30 @@ impl std::fmt::Debug for MapEntry {
Eq,
Hash,
Ord,
zerocopy_derive::IntoBytes,
zerocopy_derive::Immutable,
zerocopy_derive::FromBytes,
)]
#[repr(packed)]
// Note: the fields are stored in big-endian order. If we used the keys in a radix tree, that would
// make pack the tree more tightly, and would make scans over ranges of blocks work correctly,
// i.e. return the entries in block number order. XXX: We currently use a hash map though, so it
// doesn't matter.
struct MapKey {
spc_oid_be: u32,
db_oid_be: u32,
rel_number_be: u32,
fork_number: u8,
block_number_be: u32,
}
impl<'a> From<&'a [u8]> for MapKey {
fn from(bytes: &'a [u8]) -> Self {
Self::read_from_bytes(bytes).expect("invalid key length")
struct RelKey(RelTag);
impl From<&RelTag> for RelKey {
fn from(val: &RelTag) -> RelKey {
RelKey(val.clone())
}
}
// fixme: currently unused
#[allow(dead_code)]
fn key_range_for_rel_blocks(rel: &RelTag) -> Range<MapKey> {
Range {
start: MapKey::from((rel, 0)),
end: MapKey::from((rel, u32::MAX)),
}
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Hash, Ord)]
struct BlockKey {
rel: RelTag,
block_number: u32,
}
impl From<&RelTag> for MapKey {
fn from(val: &RelTag) -> MapKey {
MapKey {
spc_oid_be: val.spc_oid.to_be(),
db_oid_be: val.db_oid.to_be(),
rel_number_be: val.rel_number.to_be(),
fork_number: val.fork_number.to_be(),
block_number_be: u32::MAX.to_be(),
impl From<(&RelTag, u32)> for BlockKey {
fn from(val: (&RelTag, u32)) -> BlockKey {
BlockKey {
rel: val.0.clone(),
block_number: val.1,
}
}
}
impl From<(&RelTag, u32)> for MapKey {
fn from(val: (&RelTag, u32)) -> MapKey {
MapKey {
spc_oid_be: val.0.spc_oid.to_be(),
db_oid_be: val.0.db_oid.to_be(),
rel_number_be: val.0.rel_number.to_be(),
fork_number: val.0.fork_number.to_be(),
block_number_be: val.1.to_be(),
}
}
}
impl neon_shmem::hash::Key for MapKey {
const KEY_LEN: usize = 4 + 4 + 4 + 1 + 4;
fn as_bytes(&self) -> &[u8] {
zerocopy::IntoBytes::as_bytes(self)
}
}
impl neon_shmem::hash::Value for MapEntry {}
/// Return type used in the cache's get_*() functions. 'Found' means that the page, or other
/// information that was enqueried, exists in the cache. '
pub enum CacheResult<V> {
@@ -272,7 +262,7 @@ pub enum CacheResult<V> {
impl<'t> IntegratedCacheWriteAccess<'t> {
pub fn get_rel_size(&'t self, rel: &RelTag) -> CacheResult<u32> {
if let Some(nblocks) = get_rel_size(&self.cache_map, rel) {
if let Some(nblocks) = get_rel_size(&self.relsize_cache, rel) {
CacheResult::Found(nblocks)
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
@@ -286,14 +276,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
block_number: u32,
dst: impl uring_common::buf::IoBufMut + Send + Sync,
) -> Result<CacheResult<()>, std::io::Error> {
let x = if let Some(entry) =
self.cache_map.get(&MapKey::from((rel, block_number)))
let x = if let Some(block_entry) =
self.block_map.get(&BlockKey::from((rel, block_number)))
{
let block_entry = if let MapEntry::Block(e) = &*entry {
e
} else {
panic!("unexpected map entry type for block key");
};
block_entry.referenced.store(true, Ordering::Relaxed);
let cache_block = block_entry.cache_block.load(Ordering::Relaxed);
@@ -326,13 +311,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
rel: &RelTag,
block_number: u32,
) -> Result<CacheResult<()>, std::io::Error> {
if let Some(entry) = self.cache_map.get(&MapKey::from((rel, block_number))) {
let block_entry = if let MapEntry::Block(e) = &*entry {
e
} else {
panic!("unexpected map entry type for block key");
};
if let Some(block_entry) = self.block_map.get(&BlockKey::from((rel, block_number))) {
// This is used for prefetch requests. Treat the probe as an 'access', to keep it
// in cache.
block_entry.referenced.store(true, Ordering::Relaxed);
@@ -354,7 +333,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
/// information, i.e. we don't know if the relation exists or not.
pub fn get_rel_exists(&'t self, rel: &RelTag) -> CacheResult<bool> {
// we don't currently cache negative entries, so if the relation is in the cache, it exists
if let Some(_rel_entry) = self.cache_map.get(&MapKey::from(rel)) {
if let Some(_rel_entry) = self.relsize_cache.get(&RelKey::from(rel)) {
CacheResult::Found(true)
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
@@ -374,16 +353,15 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) {
let result = self
.cache_map
.update_with_fn(&MapKey::from(rel), |existing| match existing {
.relsize_cache
.update_with_fn(&RelKey::from(rel), |existing| match existing {
None => {
tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks");
UpdateAction::Insert(MapEntry::Rel(RelEntry {
UpdateAction::Insert(RelEntry {
nblocks: AtomicU32::new(nblocks),
}))
})
}
Some(MapEntry::Block(_)) => panic!("unexpected map entry type for rel key"),
Some(MapEntry::Rel(e)) => {
Some(e) => {
tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks");
e.nblocks.store(nblocks, Ordering::Relaxed);
UpdateAction::Nothing
@@ -404,7 +382,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
lw_lsn: Lsn,
is_write: bool,
) {
let key = MapKey::from((rel, block_number));
let key = BlockKey::from((rel, block_number));
// FIXME: make this work when file cache is disabled. Or make it mandatory
let file_cache = self.file_cache.as_ref().unwrap();
@@ -418,14 +396,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
let mut old_cache_block = None;
let mut found_existing = false;
let res = self.cache_map.update_with_fn(&key, |existing| {
if let Some(existing) = existing {
let block_entry = if let MapEntry::Block(e) = existing {
e
} else {
panic!("unexpected map entry type for block key");
};
let res = self.block_map.update_with_fn(&key, |existing| {
if let Some(block_entry) = existing {
found_existing = true;
// Prevent this entry from being evicted
@@ -474,15 +446,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
// FIXME: unpin the block entry on error
// Update the block entry
let res = self.cache_map.update_with_fn(&key, |existing| {
let res = self.block_map.update_with_fn(&key, |existing| {
assert_eq!(found_existing, existing.is_some());
if let Some(existing) = existing {
let block_entry = if let MapEntry::Block(e) = existing {
e
} else {
panic!("unexpected map entry type for block key");
};
if let Some(block_entry) = existing {
// Update the cache block
let old_blk = block_entry.cache_block.compare_exchange(
INVALID_CACHE_BLOCK,
@@ -500,12 +466,12 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
assert!(pin_count > 0);
UpdateAction::Nothing
} else {
UpdateAction::Insert(MapEntry::Block(BlockEntry {
UpdateAction::Insert(BlockEntry {
lw_lsn: AtomicLsn::new(lw_lsn.0),
cache_block: AtomicU64::new(cache_block),
pinned: AtomicU64::new(0),
referenced: AtomicBool::new(true),
}))
})
}
});
@@ -538,14 +504,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
.expect("error writing to cache");
// FIXME: handle errors gracefully.
let res = self.cache_map.update_with_fn(&key, |existing| {
if let Some(existing) = existing {
let block_entry = if let MapEntry::Block(e) = existing {
e
} else {
panic!("unexpected map entry type for block key");
};
let res = self.block_map.update_with_fn(&key, |existing| {
if let Some(block_entry) = existing {
// FIXME: could there be concurrent readers?
assert!(block_entry.pinned.load(Ordering::Relaxed) == 0);
@@ -555,12 +515,12 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
}
UpdateAction::Nothing
} else {
UpdateAction::Insert(MapEntry::Block(BlockEntry {
UpdateAction::Insert(BlockEntry {
lw_lsn: AtomicLsn::new(lw_lsn.0),
cache_block: AtomicU64::new(cache_block),
pinned: AtomicU64::new(0),
referenced: AtomicBool::new(true),
}))
})
}
});
@@ -573,7 +533,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
/// Forget information about given relation in the cache. (For DROP TABLE and such)
pub fn forget_rel(&'t self, rel: &RelTag) {
tracing::info!("forgetting rel entry for {rel:?}");
self.cache_map.remove(&MapKey::from(rel));
self.relsize_cache.remove(&RelKey::from(rel));
// also forget all cached blocks for the relation
// FIXME
@@ -632,20 +592,16 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
(*clock_hand) += 1;
let mut evict_this = false;
let num_buckets = self.cache_map.get_num_buckets();
let num_buckets = self.block_map.get_num_buckets();
match self
.cache_map
.block_map
.get_bucket((*clock_hand) % num_buckets)
.as_deref()
{
None => {
// This bucket was unused
}
Some(MapEntry::Rel(_)) => {
// ignore rel entries for now.
// TODO: They stick in the cache forever
}
Some(MapEntry::Block(blk_entry)) => {
Some(blk_entry) => {
if !blk_entry.referenced.swap(false, Ordering::Relaxed) {
// Evict this. Maybe.
evict_this = true;
@@ -657,12 +613,11 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
// grab the write lock
let mut evicted_cache_block = None;
let res =
self.cache_map
self.block_map
.update_with_fn_at_bucket(*clock_hand % num_buckets, |old| {
match old {
None => UpdateAction::Nothing,
Some(MapEntry::Rel(_)) => panic!("unexpected Rel entry"),
Some(MapEntry::Block(old)) => {
Some(old) => {
// note: all the accesses to 'pinned' currently happen
// within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent
// updates. Otherwise, another thread could set the 'pinned'
@@ -680,9 +635,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
if cache_block != INVALID_CACHE_BLOCK {
evicted_cache_block = Some(cache_block);
}
// TODO: we don't evict the entry, just the block. Does it make
// sense to keep the entry?
UpdateAction::Nothing
UpdateAction::Remove
}
}
});
@@ -712,24 +665,34 @@ impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> {
descs.append(&mut self.page_evictions_counter.desc());
descs.append(&mut self.clock_iterations_counter.desc());
descs.append(&mut self.cache_map_num_buckets.desc());
descs.append(&mut self.cache_map_num_buckets_in_use.desc());
descs.append(&mut self.block_map_num_buckets.desc());
descs.append(&mut self.block_map_num_buckets_in_use.desc());
descs.append(&mut self.relsize_cache_num_buckets.desc());
descs.append(&mut self.relsize_cache_num_buckets_in_use.desc());
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
// Update gauges
self.cache_map_num_buckets
.set(self.cache_map.get_num_buckets() as i64);
self.cache_map_num_buckets_in_use
.set(self.cache_map.get_num_buckets_in_use() as i64);
self.block_map_num_buckets
.set(self.block_map.get_num_buckets() as i64);
self.block_map_num_buckets_in_use
.set(self.block_map.get_num_buckets_in_use() as i64);
self.relsize_cache_num_buckets
.set(self.relsize_cache.get_num_buckets() as i64);
self.relsize_cache_num_buckets_in_use
.set(self.relsize_cache.get_num_buckets_in_use() as i64);
let mut values = Vec::new();
values.append(&mut self.page_evictions_counter.collect());
values.append(&mut self.clock_iterations_counter.collect());
values.append(&mut self.cache_map_num_buckets.collect());
values.append(&mut self.cache_map_num_buckets_in_use.collect());
values.append(&mut self.block_map_num_buckets.collect());
values.append(&mut self.block_map_num_buckets_in_use.collect());
values.append(&mut self.relsize_cache_num_buckets.collect());
values.append(&mut self.relsize_cache_num_buckets_in_use.collect());
values
}
@@ -740,16 +703,10 @@ impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> {
/// This is in a separate function so that it can be shared by
/// IntegratedCacheReadAccess::get_rel_size() and IntegratedCacheWriteAccess::get_rel_size()
fn get_rel_size<'t>(
r: &neon_shmem::hash::HashMapAccess<MapKey, MapEntry>,
r: &neon_shmem::hash::HashMapAccess<RelKey, RelEntry>,
rel: &RelTag,
) -> Option<u32> {
if let Some(existing) = r.get(&MapKey::from(rel)) {
let rel_entry = if let MapEntry::Rel(ref e) = *existing {
e
} else {
panic!("unexpected map entry type for rel key");
};
if let Some(rel_entry) = r.get(&RelKey::from(rel)) {
let nblocks = rel_entry.nblocks.load(Ordering::Relaxed);
if nblocks != u32::MAX {
Some(nblocks)
@@ -767,7 +724,7 @@ fn get_rel_size<'t>(
/// request to the communicator process.
impl<'t> IntegratedCacheReadAccess<'t> {
pub fn get_rel_size(&'t self, rel: &RelTag) -> Option<u32> {
get_rel_size(&self.cache_map, rel)
get_rel_size(&self.relsize_cache, rel)
}
pub fn start_read_op(&'t self) -> BackendCacheReadOp<'t> {
@@ -793,16 +750,11 @@ impl<'e> BackendCacheReadOp<'e> {
/// After you have completed the read, call BackendCacheReadResult::finish() to check if the
/// read was in fact valid or not. If it was concurrently invalidated, you need to retry.
pub fn get_page(&mut self, rel: &RelTag, block_number: u32) -> Option<u64> {
if let Some(entry) = self
if let Some(block_entry) = self
.map_access
.cache_map
.get(&MapKey::from((rel, block_number)))
.block_map
.get(&BlockKey::from((rel, block_number)))
{
let block_entry = if let MapEntry::Block(ref e) = *entry {
e
} else {
panic!("unexpected map entry type for block key");
};
block_entry.referenced.store(true, Ordering::Relaxed);
let cache_block = block_entry.cache_block.load(Ordering::Relaxed);