Update integrated_cache.rs to use new hashmap API

This commit is contained in:
David Freifeld
2025-07-02 12:18:37 -07:00
parent 0c099b0944
commit 86fb7b966a
2 changed files with 34 additions and 55 deletions

View File

@@ -302,7 +302,6 @@ where
}),
_ => None,
}
>>>>>>> quantumish/lfc-resizable-map
}
/// Returns the number of buckets in the table.

View File

@@ -267,8 +267,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
block_number: u32,
dst: impl uring_common::buf::IoBufMut + Send + Sync,
) -> Result<CacheResult<()>, std::io::Error> {
let hash = self.block_map.get_hash_value(&BlockKey::from((rel, block_number)));
let x = if let Some(block_entry) = self.block_map.get_with_hash(&BlockKey::from((rel, block_number)), hash)
let x = if let Some(block_entry) = self.block_map.get(&BlockKey::from((rel, block_number)))
{
block_entry.referenced.store(true, Ordering::Relaxed);
@@ -302,8 +301,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
rel: &RelTag,
block_number: u32,
) -> Result<CacheResult<()>, std::io::Error> {
let hash = self.block_map.get_hash_value(&BlockKey::from((rel, block_number)));
if let Some(block_entry) = self.block_map.get_with_hash(&BlockKey::from((rel, block_number)), hash) {
if let Some(block_entry) = self.block_map.get(&BlockKey::from((rel, block_number))) {
// This is used for prefetch requests. Treat the probe as an 'access', to keep it
// in cache.
block_entry.referenced.store(true, Ordering::Relaxed);
@@ -325,8 +323,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
/// information, i.e. we don't know if the relation exists or not.
pub fn get_rel_exists(&'t self, rel: &RelTag) -> CacheResult<bool> {
// we don't currently cache negative entries, so if the relation is in the cache, it exists
let hash = self.relsize_cache.get_hash_value(&RelKey::from(rel));
if let Some(_rel_entry) = self.relsize_cache.get_with_hash(&RelKey::from(rel), hash) {
if let Some(_rel_entry) = self.relsize_cache.get(&RelKey::from(rel)) {
CacheResult::Found(true)
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
@@ -345,12 +342,11 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
}
pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) {
let hash = self.relsize_cache.get_hash_value(&RelKey::from(rel));
match self.relsize_cache.entry_with_hash(RelKey::from(rel), hash) {
match self.relsize_cache.entry(RelKey::from(rel)) {
Entry::Vacant(e) => {
tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks");
// FIXME: what to do if we run out of memory? Evict other relation entries?
e.insert(RelEntry {
_ = e.insert(RelEntry {
nblocks: AtomicU32::new(nblocks),
}).expect("out of memory");
},
@@ -384,8 +380,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
let mut old_cache_block = None;
let mut found_existing = false;
let hash = self.block_map.get_hash_value(&key);
if let Entry::Occupied(e) = self.block_map.entry_with_hash(key.clone(), hash) {
// NOTE(quantumish): honoring original semantics here (used to be update_with_fn)
// but I don't see any reason why this has to take a write lock.
if let Entry::Occupied(e) = self.block_map.entry(key.clone()) {
let block_entry = e.get();
found_existing = true;
@@ -428,8 +425,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
// FIXME: unpin the block entry on error
// Update the block entry
let hash = self.block_map.get_hash_value(&key);
let entry = self.block_map.entry_with_hash(key, hash);
let entry = self.block_map.entry(key);
assert_eq!(found_existing, matches!(entry, Entry::Occupied(_)));
match entry {
Entry::Occupied(e) => {
@@ -453,7 +449,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
Entry::Vacant(e) => {
// FIXME: what to do if we run out of memory? Evict other relation entries? Remove
// block entries first?
e.insert(BlockEntry {
_ = e.insert(BlockEntry {
lw_lsn: AtomicLsn::new(lw_lsn.0),
cache_block: AtomicU64::new(cache_block),
pinned: AtomicU64::new(0),
@@ -487,8 +483,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
.expect("error writing to cache");
// FIXME: handle errors gracefully.
let hash = self.block_map.get_hash_value(&key);
match self.block_map.entry_with_hash(key, hash) {
match self.block_map.entry(key) {
Entry::Occupied(e) => {
let block_entry = e.get();
// FIXME: could there be concurrent readers?
@@ -502,7 +497,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
Entry::Vacant(e) => {
// FIXME: what to do if we run out of memory? Evict other relation entries? Remove
// block entries first?
e.insert(BlockEntry {
_ = e.insert(BlockEntry {
lw_lsn: AtomicLsn::new(lw_lsn.0),
cache_block: AtomicU64::new(cache_block),
pinned: AtomicU64::new(0),
@@ -516,8 +511,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
/// Forget information about given relation in the cache. (For DROP TABLE and such)
pub fn forget_rel(&'t self, rel: &RelTag) {
tracing::info!("forgetting rel entry for {rel:?}");
let hash = self.relsize_cache.get_hash_value(&RelKey::from(rel));
self.relsize_cache.remove_with_hash(&RelKey::from(rel), hash);
self.relsize_cache.remove(&RelKey::from(rel));
// also forget all cached blocks for the relation
// FIXME
@@ -596,37 +590,25 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
if evict_this {
// grab the write lock
let mut evicted_cache_block = None;
todo!("quantumish: re-add support for point removal without demolishing performance");
// self.block_map
// .update_with_fn_at_bucket(*clock_hand % num_buckets, |old| {
// match old {
// None => UpdateAction::Nothing,
// Some(old) => {
// // note: all the accesses to 'pinned' currently happen
// // within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent
// // updates. Otherwise, another thread could set the 'pinned'
// // flag just after we have checked it here.
// if old.pinned.load(Ordering::Relaxed) != 0 {
// return UpdateAction::Nothing;
// }
// let _ = self
// .global_lw_lsn
// .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
// let cache_block = old
// .cache_block
// .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
// if cache_block != INVALID_CACHE_BLOCK {
// evicted_cache_block = Some(cache_block);
// }
// UpdateAction::Remove
// }
// }
// });
// Out of memory should not happen here, as we're only updating existing values,
// not inserting new entries to the map.
// res.expect("out of memory");
if let Some(e) = self.block_map.entry_at_bucket(*clock_hand % num_buckets) {
let old = e.get();
// note: all the accesses to 'pinned' currently happen
// within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent
// updates. Otherwise, another thread could set the 'pinned'
// flag just after we have checked it here.
if old.pinned.load(Ordering::Relaxed) == 0 {
let _ = self
.global_lw_lsn
.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
let cache_block = old
.cache_block
.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
evicted_cache_block = Some(cache_block);
}
e.remove();
}
}
if evicted_cache_block.is_some() {
self.page_evictions_counter.inc();
@@ -705,8 +687,7 @@ fn get_rel_size<'t>(
r: &neon_shmem::hash::HashMapAccess<RelKey, RelEntry>,
rel: &RelTag,
) -> Option<u32> {
let hash = r.get_hash_value(&RelKey::from(rel));
if let Some(rel_entry) = r.get_with_hash(&RelKey::from(rel), hash) {
if let Some(rel_entry) = r.get(&RelKey::from(rel)) {
let nblocks = rel_entry.nblocks.load(Ordering::Relaxed);
if nblocks != u32::MAX {
Some(nblocks)
@@ -750,11 +731,10 @@ impl<'e> BackendCacheReadOp<'e> {
/// After you have completed the read, call BackendCacheReadResult::finish() to check if the
/// read was in fact valid or not. If it was concurrently invalidated, you need to retry.
pub fn get_page(&mut self, rel: &RelTag, block_number: u32) -> Option<u64> {
let hash = self.map_access.block_map.get_hash_value(&BlockKey::from((rel, block_number)));
if let Some(block_entry) = self
.map_access
.block_map
.get_with_hash(&BlockKey::from((rel, block_number)), hash)
.get(&BlockKey::from((rel, block_number)))
{
block_entry.referenced.store(true, Ordering::Relaxed);