diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs
index b46a58faaf..733e4b6f33 100644
--- a/libs/neon-shmem/src/hash.rs
+++ b/libs/neon-shmem/src/hash.rs
@@ -302,7 +302,6 @@ where
             }),
             _ => None,
         }
->>>>>>> quantumish/lfc-resizable-map
     }
 
     /// Returns the number of buckets in the table.
diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs
index d710f0e35b..acd73b3b40 100644
--- a/pgxn/neon/communicator/src/integrated_cache.rs
+++ b/pgxn/neon/communicator/src/integrated_cache.rs
@@ -267,8 +267,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
         block_number: u32,
         dst: impl uring_common::buf::IoBufMut + Send + Sync,
     ) -> Result, std::io::Error> {
-        let hash = self.block_map.get_hash_value(&BlockKey::from((rel, block_number)));
-        let x = if let Some(block_entry) = self.block_map.get_with_hash(&BlockKey::from((rel, block_number)), hash)
+        let x = if let Some(block_entry) = self.block_map.get(&BlockKey::from((rel, block_number)))
         {
             block_entry.referenced.store(true, Ordering::Relaxed);
 
@@ -302,8 +301,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
         rel: &RelTag,
         block_number: u32,
     ) -> Result, std::io::Error> {
-        let hash = self.block_map.get_hash_value(&BlockKey::from((rel, block_number)));
-        if let Some(block_entry) = self.block_map.get_with_hash(&BlockKey::from((rel, block_number)), hash) {
+        if let Some(block_entry) = self.block_map.get(&BlockKey::from((rel, block_number))) {
             // This is used for prefetch requests. Treat the probe as an 'access', to keep it
             // in cache.
             block_entry.referenced.store(true, Ordering::Relaxed);
@@ -325,8 +323,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
     /// information, i.e. we don't know if the relation exists or not.
     pub fn get_rel_exists(&'t self, rel: &RelTag) -> CacheResult {
         // we don't currently cache negative entries, so if the relation is in the cache, it exists
-        let hash = self.relsize_cache.get_hash_value(&RelKey::from(rel));
-        if let Some(_rel_entry) = self.relsize_cache.get_with_hash(&RelKey::from(rel), hash) {
+        if let Some(_rel_entry) = self.relsize_cache.get(&RelKey::from(rel)) {
             CacheResult::Found(true)
         } else {
             let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
@@ -345,12 +342,11 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
     }
 
     pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) {
-        let hash = self.relsize_cache.get_hash_value(&RelKey::from(rel));
-        match self.relsize_cache.entry_with_hash(RelKey::from(rel), hash) {
+        match self.relsize_cache.entry(RelKey::from(rel)) {
             Entry::Vacant(e) => {
                 tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks");
                 // FIXME: what to do if we run out of memory? Evict other relation entries?
-                e.insert(RelEntry {
+                _ = e.insert(RelEntry {
                     nblocks: AtomicU32::new(nblocks),
                 }).expect("out of memory");
             },
@@ -384,8 +380,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
         let mut old_cache_block = None;
         let mut found_existing = false;
 
-        let hash = self.block_map.get_hash_value(&key);
-        if let Entry::Occupied(e) = self.block_map.entry_with_hash(key.clone(), hash) {
+        // NOTE(quantumish): honoring original semantics here (used to be update_with_fn)
+        // but I don't see any reason why this has to take a write lock.
+        if let Entry::Occupied(e) = self.block_map.entry(key.clone()) {
             let block_entry = e.get();
             found_existing = true;
 
@@ -428,8 +425,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
        // FIXME: unpin the block entry on error
 
         // Update the block entry
-        let hash = self.block_map.get_hash_value(&key);
-        let entry = self.block_map.entry_with_hash(key, hash);
+        let entry = self.block_map.entry(key);
         assert_eq!(found_existing, matches!(entry, Entry::Occupied(_)));
         match entry {
             Entry::Occupied(e) => {
@@ -453,7 +449,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
             Entry::Vacant(e) => {
                 // FIXME: what to do if we run out of memory? Evict other relation entries? Remove
                 // block entries first?
-                e.insert(BlockEntry {
+                _ = e.insert(BlockEntry {
                     lw_lsn: AtomicLsn::new(lw_lsn.0),
                     cache_block: AtomicU64::new(cache_block),
                     pinned: AtomicU64::new(0),
@@ -487,8 +483,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
             .expect("error writing to cache");
         // FIXME: handle errors gracefully.
 
-        let hash = self.block_map.get_hash_value(&key);
-        match self.block_map.entry_with_hash(key, hash) {
+        match self.block_map.entry(key) {
             Entry::Occupied(e) => {
                 let block_entry = e.get();
                 // FIXME: could there be concurrent readers?
@@ -502,7 +497,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
             Entry::Vacant(e) => {
                 // FIXME: what to do if we run out of memory? Evict other relation entries? Remove
                 // block entries first?
-                e.insert(BlockEntry {
+                _ = e.insert(BlockEntry {
                     lw_lsn: AtomicLsn::new(lw_lsn.0),
                     cache_block: AtomicU64::new(cache_block),
                     pinned: AtomicU64::new(0),
@@ -516,8 +511,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
     /// Forget information about given relation in the cache. (For DROP TABLE and such)
     pub fn forget_rel(&'t self, rel: &RelTag) {
         tracing::info!("forgetting rel entry for {rel:?}");
-        let hash = self.relsize_cache.get_hash_value(&RelKey::from(rel));
-        self.relsize_cache.remove_with_hash(&RelKey::from(rel), hash);
+        self.relsize_cache.remove(&RelKey::from(rel));
 
         // also forget all cached blocks for the relation
         // FIXME
@@ -596,37 +590,25 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
             if evict_this {
                 // grab the write lock
                 let mut evicted_cache_block = None;
-                todo!("quantumish: re-add support for point removal without demolishing performance");
-                // self.block_map
-                //     .update_with_fn_at_bucket(*clock_hand % num_buckets, |old| {
-                //         match old {
-                //             None => UpdateAction::Nothing,
-                //             Some(old) => {
-                //                 // note: all the accesses to 'pinned' currently happen
-                //                 // within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent
-                //                 // updates. Otherwise, another thread could set the 'pinned'
-                //                 // flag just after we have checked it here.
-                //                 if old.pinned.load(Ordering::Relaxed) != 0 {
-                //                     return UpdateAction::Nothing;
-                //                 }
-
-                //                 let _ = self
-                //                     .global_lw_lsn
-                //                     .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
-                //                 let cache_block = old
-                //                     .cache_block
-                //                     .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
-                //                 if cache_block != INVALID_CACHE_BLOCK {
-                //                     evicted_cache_block = Some(cache_block);
-                //                 }
-                //                 UpdateAction::Remove
-                //             }
-                //         }
-                //     });
-
-                // Out of memory should not happen here, as we're only updating existing values,
-                // not inserting new entries to the map.
-                // res.expect("out of memory");
+                if let Some(e) = self.block_map.entry_at_bucket(*clock_hand % num_buckets) {
+                    let old = e.get();
+                    // note: all the accesses to 'pinned' currently happen
+                    // within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent
+                    // updates. Otherwise, another thread could set the 'pinned'
+                    // flag just after we have checked it here.
+                    if old.pinned.load(Ordering::Relaxed) == 0 {
+                        let _ = self
+                            .global_lw_lsn
+                            .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
+                        let cache_block = old
+                            .cache_block
+                            .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
+                        if cache_block != INVALID_CACHE_BLOCK {
+                            evicted_cache_block = Some(cache_block);
+                        }
+                        e.remove();
+                    }
+                }
 
                 if evicted_cache_block.is_some() {
                     self.page_evictions_counter.inc();
@@ -705,8 +687,7 @@ fn get_rel_size<'t>(
     r: &neon_shmem::hash::HashMapAccess,
     rel: &RelTag,
 ) -> Option {
-    let hash = r.get_hash_value(&RelKey::from(rel));
-    if let Some(rel_entry) = r.get_with_hash(&RelKey::from(rel), hash) {
+    if let Some(rel_entry) = r.get(&RelKey::from(rel)) {
        let nblocks = rel_entry.nblocks.load(Ordering::Relaxed);
        if nblocks != u32::MAX {
            Some(nblocks)
@@ -750,11 +731,10 @@ impl<'e> BackendCacheReadOp<'e> {
    /// After you have completed the read, call BackendCacheReadResult::finish() to check if the
    /// read was in fact valid or not. If it was concurrently invalidated, you need to retry.
    pub fn get_page(&mut self, rel: &RelTag, block_number: u32) -> Option {
-        let hash = self.map_access.block_map.get_hash_value(&BlockKey::from((rel, block_number)));
        if let Some(block_entry) = self
            .map_access
            .block_map
-            .get_with_hash(&BlockKey::from((rel, block_number)), hash)
+            .get(&BlockKey::from((rel, block_number)))
        {
            block_entry.referenced.store(true, Ordering::Relaxed);
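The rewritten eviction hunk above is a clock-sweep: probe the bucket under the clock hand, skip pinned entries, fold the entry's last-written LSN into the global low-water mark, reclaim its cache block, and remove the entry. Below is a minimal, self-contained Rust sketch of that pattern. It does not use the neon_shmem API: the bucket array, BlockEntry fields, INVALID_CACHE_BLOCK sentinel, and the referenced/second-chance check (which in the real code happens before the shown hunk) are all stand-ins chosen for illustration, and the real map uses per-bucket locking rather than &mut self.

// Standalone sketch of the clock-sweep eviction performed in the hunk above.
// All types here are illustrative stand-ins; only the control flow mirrors the diff.
use std::sync::atomic::{AtomicU64, Ordering};

const INVALID_CACHE_BLOCK: u64 = u64::MAX;

struct BlockEntry {
    lw_lsn: AtomicU64,      // last-written LSN for the cached page (AtomicLsn in the real code)
    cache_block: AtomicU64, // slot in the file cache, or INVALID_CACHE_BLOCK
    pinned: AtomicU64,      // non-zero while some reader/writer holds the entry
    referenced: AtomicU64,  // "second chance" bit set on every access
}

struct ClockCache {
    buckets: Vec<Option<BlockEntry>>, // stand-in for the shared hash map's buckets
    global_lw_lsn: AtomicU64,         // low-water mark folded in on eviction
}

impl ClockCache {
    /// Advance the clock hand by one bucket and try to evict what it points at.
    /// Returns the reclaimed cache block, if any.
    fn sweep_once(&mut self, clock_hand: &mut usize) -> Option<u64> {
        let num_buckets = self.buckets.len();
        let idx = *clock_hand % num_buckets;
        *clock_hand += 1;

        let mut evicted_cache_block = None;
        let evict_this = match self.buckets[idx].as_ref() {
            // Recently referenced: clear the bit and give it a second chance.
            Some(e) if e.referenced.swap(0, Ordering::Relaxed) != 0 => false,
            // Pinned entries are never evicted.
            Some(e) if e.pinned.load(Ordering::Relaxed) != 0 => false,
            Some(_) => true,
            None => false,
        };

        if evict_this {
            if let Some(old) = self.buckets[idx].take() {
                // Keep the global low-water LSN at least as high as this entry's.
                let _ = self
                    .global_lw_lsn
                    .fetch_max(old.lw_lsn.load(Ordering::Relaxed), Ordering::Relaxed);
                let cache_block = old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
                if cache_block != INVALID_CACHE_BLOCK {
                    evicted_cache_block = Some(cache_block);
                }
            }
        }
        evicted_cache_block
    }
}

A caller would repeat sweep_once until it yields a block or some sweep limit is reached, which corresponds to how the surrounding (unshown) loop advances the clock hand in the patched function.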