From da3f9ee72dacb86e3f76e826cdcdd27f938b8ac0 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 4 Jul 2025 12:39:41 +0300 Subject: [PATCH] cargo fmt --- libs/neon-shmem/benches/hmap_resize.rs | 472 +++++++++-------- libs/neon-shmem/src/hash.rs | 473 +++++++++--------- libs/neon-shmem/src/hash/core.rs | 105 ++-- libs/neon-shmem/src/hash/entry.rs | 106 ++-- libs/neon-shmem/src/hash/tests.rs | 427 ++++++++-------- libs/neon-shmem/src/shmem.rs | 31 +- libs/neon-shmem/src/sync.rs | 159 +++--- .../neon/communicator/src/integrated_cache.rs | 134 ++--- 8 files changed, 978 insertions(+), 929 deletions(-) diff --git a/libs/neon-shmem/benches/hmap_resize.rs b/libs/neon-shmem/benches/hmap_resize.rs index 6b86e7ed27..edc0eec50b 100644 --- a/libs/neon-shmem/benches/hmap_resize.rs +++ b/libs/neon-shmem/benches/hmap_resize.rs @@ -1,12 +1,12 @@ -use criterion::{criterion_group, criterion_main, BatchSize, Criterion, BenchmarkId}; +use criterion::{BatchSize, BenchmarkId, Criterion, criterion_group, criterion_main}; use neon_shmem::hash::HashMapAccess; use neon_shmem::hash::HashMapInit; use neon_shmem::hash::entry::Entry; -use rand::prelude::*; use rand::distr::{Distribution, StandardUniform}; -use std::hash::BuildHasher; +use rand::prelude::*; use std::default::Default; - +use std::hash::BuildHasher; + // Taken from bindings to C code #[derive(Clone, Debug, Hash, Eq, PartialEq)] @@ -20,15 +20,15 @@ pub struct FileCacheKey { } impl Distribution for StandardUniform { - // questionable, but doesn't need to be good randomness - fn sample(&self, rng: &mut R) -> FileCacheKey { - FileCacheKey { - _spc_id: rng.random(), - _db_id: rng.random(), - _rel_number: rng.random(), - _fork_num: rng.random(), - _block_num: rng.random() - } + // questionable, but doesn't need to be good randomness + fn sample(&self, rng: &mut R) -> FileCacheKey { + FileCacheKey { + _spc_id: rng.random(), + _db_id: rng.random(), + _rel_number: rng.random(), + _fork_num: rng.random(), + _block_num: rng.random(), + } } } @@ -43,240 +43,288 @@ pub struct FileCacheEntry { } impl FileCacheEntry { - fn dummy() -> Self { - Self { - _offset: 0, - _access_count: 0, - _prev: std::ptr::null_mut(), - _next: std::ptr::null_mut(), - _state: [0; 8] - } - } + fn dummy() -> Self { + Self { + _offset: 0, + _access_count: 0, + _prev: std::ptr::null_mut(), + _next: std::ptr::null_mut(), + _state: [0; 8], + } + } } // Utilities for applying operations. 
#[derive(Clone, Debug)] -struct TestOp(K, Option); +struct TestOp(K, Option); fn apply_op( - op: TestOp, - map: &mut HashMapAccess, + op: TestOp, + map: &mut HashMapAccess, ) { - let entry = map.entry(op.0); + let entry = map.entry(op.0); match op.1 { - Some(new) => { - match entry { - Entry::Occupied(mut e) => Some(e.insert(new)), - Entry::Vacant(e) => { _ = e.insert(new).unwrap(); None }, - } - }, - None => { - match entry { - Entry::Occupied(e) => Some(e.remove()), - Entry::Vacant(_) => None, - } - }, - }; + Some(new) => match entry { + Entry::Occupied(mut e) => Some(e.insert(new)), + Entry::Vacant(e) => { + _ = e.insert(new).unwrap(); + None + } + }, + None => match entry { + Entry::Occupied(e) => Some(e.remove()), + Entry::Vacant(_) => None, + }, + }; } // Hash utilities struct SeaRandomState { - k1: u64, - k2: u64, - k3: u64, - k4: u64 + k1: u64, + k2: u64, + k3: u64, + k4: u64, } impl std::hash::BuildHasher for SeaRandomState { - type Hasher = seahash::SeaHasher; - - fn build_hasher(&self) -> Self::Hasher { - seahash::SeaHasher::with_seeds(self.k1, self.k2, self.k3, self.k4) - } + type Hasher = seahash::SeaHasher; + + fn build_hasher(&self) -> Self::Hasher { + seahash::SeaHasher::with_seeds(self.k1, self.k2, self.k3, self.k4) + } } impl SeaRandomState { - fn new() -> Self { - let mut rng = rand::rng(); - Self { k1: rng.random(), k2: rng.random(), k3: rng.random(), k4: rng.random() } - } + fn new() -> Self { + let mut rng = rand::rng(); + Self { + k1: rng.random(), + k2: rng.random(), + k3: rng.random(), + k4: rng.random(), + } + } } fn small_benchs(c: &mut Criterion) { - let mut group = c.benchmark_group("Small maps"); + let mut group = c.benchmark_group("Small maps"); group.sample_size(10); - - group.bench_function("small_rehash", |b| { - let ideal_filled = 4_000_000; - let size = 5_000_000; - let mut writer = HashMapInit::new_resizeable(size, size * 2).attach_writer(); - let mut rng = rand::rng(); - while writer.get_num_buckets_in_use() < ideal_filled as usize { - let key: FileCacheKey = rng.random(); - let val = FileCacheEntry::dummy(); - apply_op(TestOp(key, Some(val)), &mut writer); - } - b.iter(|| writer.shuffle()); - }); - - group.bench_function("small_rehash_xxhash", |b| { - let ideal_filled = 4_000_000; - let size = 5_000_000; - let mut writer = HashMapInit::new_resizeable(size, size * 2) - .with_hasher(twox_hash::xxhash64::RandomState::default()) - .attach_writer(); - let mut rng = rand::rng(); - while writer.get_num_buckets_in_use() < ideal_filled as usize { - let key: FileCacheKey = rng.random(); - let val = FileCacheEntry::dummy(); - apply_op(TestOp(key, Some(val)), &mut writer); - } - b.iter(|| writer.shuffle()); - }); + group.bench_function("small_rehash", |b| { + let ideal_filled = 4_000_000; + let size = 5_000_000; + let mut writer = HashMapInit::new_resizeable(size, size * 2).attach_writer(); + let mut rng = rand::rng(); + while writer.get_num_buckets_in_use() < ideal_filled as usize { + let key: FileCacheKey = rng.random(); + let val = FileCacheEntry::dummy(); + apply_op(TestOp(key, Some(val)), &mut writer); + } + b.iter(|| writer.shuffle()); + }); - - group.bench_function("small_rehash_ahash", |b| { - let ideal_filled = 4_000_000; - let size = 5_000_000; - let mut writer = HashMapInit::new_resizeable(size, size * 2) - .with_hasher(ahash::RandomState::default()) - .attach_writer(); - let mut rng = rand::rng(); - while writer.get_num_buckets_in_use() < ideal_filled as usize { - let key: FileCacheKey = rng.random(); - let val = FileCacheEntry::dummy(); - 
apply_op(TestOp(key, Some(val)), &mut writer); - } - b.iter(|| writer.shuffle()); - }); + group.bench_function("small_rehash_xxhash", |b| { + let ideal_filled = 4_000_000; + let size = 5_000_000; + let mut writer = HashMapInit::new_resizeable(size, size * 2) + .with_hasher(twox_hash::xxhash64::RandomState::default()) + .attach_writer(); + let mut rng = rand::rng(); + while writer.get_num_buckets_in_use() < ideal_filled as usize { + let key: FileCacheKey = rng.random(); + let val = FileCacheEntry::dummy(); + apply_op(TestOp(key, Some(val)), &mut writer); + } + b.iter(|| writer.shuffle()); + }); - group.bench_function("small_rehash_seahash", |b| { - let ideal_filled = 4_000_000; - let size = 5_000_000; - let mut writer = HashMapInit::new_resizeable(size, size * 2) - .with_hasher(SeaRandomState::new()) - .attach_writer(); - let mut rng = rand::rng(); - while writer.get_num_buckets_in_use() < ideal_filled as usize { - let key: FileCacheKey = rng.random(); - let val = FileCacheEntry::dummy(); - apply_op(TestOp(key, Some(val)), &mut writer); - } - b.iter(|| writer.shuffle()); - }); + group.bench_function("small_rehash_ahash", |b| { + let ideal_filled = 4_000_000; + let size = 5_000_000; + let mut writer = HashMapInit::new_resizeable(size, size * 2) + .with_hasher(ahash::RandomState::default()) + .attach_writer(); + let mut rng = rand::rng(); + while writer.get_num_buckets_in_use() < ideal_filled as usize { + let key: FileCacheKey = rng.random(); + let val = FileCacheEntry::dummy(); + apply_op(TestOp(key, Some(val)), &mut writer); + } + b.iter(|| writer.shuffle()); + }); - group.finish(); + group.bench_function("small_rehash_seahash", |b| { + let ideal_filled = 4_000_000; + let size = 5_000_000; + let mut writer = HashMapInit::new_resizeable(size, size * 2) + .with_hasher(SeaRandomState::new()) + .attach_writer(); + let mut rng = rand::rng(); + while writer.get_num_buckets_in_use() < ideal_filled as usize { + let key: FileCacheKey = rng.random(); + let val = FileCacheEntry::dummy(); + apply_op(TestOp(key, Some(val)), &mut writer); + } + b.iter(|| writer.shuffle()); + }); + + group.finish(); } fn real_benchs(c: &mut Criterion) { - let mut group = c.benchmark_group("Realistic workloads"); - group.sample_size(10); + let mut group = c.benchmark_group("Realistic workloads"); + group.sample_size(10); group.bench_function("real_bulk_insert", |b| { - let size = 125_000_000; - let ideal_filled = 100_000_000; - let mut rng = rand::rng(); - b.iter_batched( - || HashMapInit::new_resizeable(size, size * 2).attach_writer(), - |writer| { - for _ in 0..ideal_filled { - let key: FileCacheKey = rng.random(); - let val = FileCacheEntry::dummy(); - let entry = writer.entry(key); - std::hint::black_box(match entry { - Entry::Occupied(mut e) => { e.insert(val); }, - Entry::Vacant(e) => { _ = e.insert(val).unwrap(); }, - }) - } - }, - BatchSize::SmallInput, - ) - }); - - group.bench_function("real_rehash", |b| { - let size = 125_000_000; - let ideal_filled = 100_000_000; - let mut writer = HashMapInit::new_resizeable(size, size).attach_writer(); - let mut rng = rand::rng(); - while writer.get_num_buckets_in_use() < ideal_filled { - let key: FileCacheKey = rng.random(); - let val = FileCacheEntry::dummy(); - apply_op(TestOp(key, Some(val)), &mut writer); - } - b.iter(|| writer.shuffle()); - }); - - group.bench_function("real_rehash_hashbrown", |b| { - let size = 125_000_000; - let ideal_filled = 100_000_000; - let mut writer = hashbrown::raw::RawTable::new(); - let mut rng = rand::rng(); - let hasher = 
rustc_hash::FxBuildHasher::default(); - unsafe { - writer.resize(size, |(k,_)| hasher.hash_one(&k), - hashbrown::raw::Fallibility::Infallible).unwrap(); - } - while writer.len() < ideal_filled as usize { - let key: FileCacheKey = rng.random(); - let val = FileCacheEntry::dummy(); - writer.insert(hasher.hash_one(&key), (key, val), |(k,_)| hasher.hash_one(&k)); - } - b.iter(|| unsafe { writer.table.rehash_in_place( - &|table, index| hasher.hash_one(&table.bucket::<(FileCacheKey, FileCacheEntry)>(index).as_ref().0), - std::mem::size_of::<(FileCacheKey, FileCacheEntry)>(), - if std::mem::needs_drop::<(FileCacheKey, FileCacheEntry)>() { - Some(|ptr| std::ptr::drop_in_place(ptr as *mut (FileCacheKey, FileCacheEntry))) - } else { - None + let size = 125_000_000; + let ideal_filled = 100_000_000; + let mut rng = rand::rng(); + b.iter_batched( + || HashMapInit::new_resizeable(size, size * 2).attach_writer(), + |writer| { + for _ in 0..ideal_filled { + let key: FileCacheKey = rng.random(); + let val = FileCacheEntry::dummy(); + let entry = writer.entry(key); + std::hint::black_box(match entry { + Entry::Occupied(mut e) => { + e.insert(val); + } + Entry::Vacant(e) => { + _ = e.insert(val).unwrap(); + } + }) + } }, - ) }); - }); + BatchSize::SmallInput, + ) + }); - for elems in [2, 4, 8, 16, 32, 64, 96, 112] { - group.bench_with_input(BenchmarkId::new("real_rehash_varied", elems), &elems, |b, &size| { - let ideal_filled = size * 1_000_000; - let size = 125_000_000; - let mut writer = HashMapInit::new_resizeable(size, size).attach_writer(); - let mut rng = rand::rng(); - while writer.get_num_buckets_in_use() < ideal_filled as usize { - let key: FileCacheKey = rng.random(); - let val = FileCacheEntry::dummy(); - apply_op(TestOp(key, Some(val)), &mut writer); - } - b.iter(|| writer.shuffle()); - }); - group.bench_with_input(BenchmarkId::new("real_rehash_varied_hashbrown", elems), &elems, |b, &size| { - let ideal_filled = size * 1_000_000; - let size = 125_000_000; - let mut writer = hashbrown::raw::RawTable::new(); - let mut rng = rand::rng(); - let hasher = rustc_hash::FxBuildHasher::default(); - unsafe { - writer.resize(size, |(k,_)| hasher.hash_one(&k), - hashbrown::raw::Fallibility::Infallible).unwrap(); - } - while writer.len() < ideal_filled as usize { - let key: FileCacheKey = rng.random(); - let val = FileCacheEntry::dummy(); - writer.insert(hasher.hash_one(&key), (key, val), |(k,_)| hasher.hash_one(&k)); - } - b.iter(|| unsafe { writer.table.rehash_in_place( - &|table, index| hasher.hash_one(&table.bucket::<(FileCacheKey, FileCacheEntry)>(index).as_ref().0), - std::mem::size_of::<(FileCacheKey, FileCacheEntry)>(), - if std::mem::needs_drop::<(FileCacheKey, FileCacheEntry)>() { - Some(|ptr| std::ptr::drop_in_place(ptr as *mut (FileCacheKey, FileCacheEntry))) - } else { - None - }, - ) }); - }); - } - - group.finish(); + group.bench_function("real_rehash", |b| { + let size = 125_000_000; + let ideal_filled = 100_000_000; + let mut writer = HashMapInit::new_resizeable(size, size).attach_writer(); + let mut rng = rand::rng(); + while writer.get_num_buckets_in_use() < ideal_filled { + let key: FileCacheKey = rng.random(); + let val = FileCacheEntry::dummy(); + apply_op(TestOp(key, Some(val)), &mut writer); + } + b.iter(|| writer.shuffle()); + }); + + group.bench_function("real_rehash_hashbrown", |b| { + let size = 125_000_000; + let ideal_filled = 100_000_000; + let mut writer = hashbrown::raw::RawTable::new(); + let mut rng = rand::rng(); + let hasher = rustc_hash::FxBuildHasher::default(); + unsafe 
{ + writer + .resize( + size, + |(k, _)| hasher.hash_one(&k), + hashbrown::raw::Fallibility::Infallible, + ) + .unwrap(); + } + while writer.len() < ideal_filled as usize { + let key: FileCacheKey = rng.random(); + let val = FileCacheEntry::dummy(); + writer.insert(hasher.hash_one(&key), (key, val), |(k, _)| { + hasher.hash_one(&k) + }); + } + b.iter(|| unsafe { + writer.table.rehash_in_place( + &|table, index| { + hasher.hash_one( + &table + .bucket::<(FileCacheKey, FileCacheEntry)>(index) + .as_ref() + .0, + ) + }, + std::mem::size_of::<(FileCacheKey, FileCacheEntry)>(), + if std::mem::needs_drop::<(FileCacheKey, FileCacheEntry)>() { + Some(|ptr| std::ptr::drop_in_place(ptr as *mut (FileCacheKey, FileCacheEntry))) + } else { + None + }, + ) + }); + }); + + for elems in [2, 4, 8, 16, 32, 64, 96, 112] { + group.bench_with_input( + BenchmarkId::new("real_rehash_varied", elems), + &elems, + |b, &size| { + let ideal_filled = size * 1_000_000; + let size = 125_000_000; + let mut writer = HashMapInit::new_resizeable(size, size).attach_writer(); + let mut rng = rand::rng(); + while writer.get_num_buckets_in_use() < ideal_filled as usize { + let key: FileCacheKey = rng.random(); + let val = FileCacheEntry::dummy(); + apply_op(TestOp(key, Some(val)), &mut writer); + } + b.iter(|| writer.shuffle()); + }, + ); + group.bench_with_input( + BenchmarkId::new("real_rehash_varied_hashbrown", elems), + &elems, + |b, &size| { + let ideal_filled = size * 1_000_000; + let size = 125_000_000; + let mut writer = hashbrown::raw::RawTable::new(); + let mut rng = rand::rng(); + let hasher = rustc_hash::FxBuildHasher::default(); + unsafe { + writer + .resize( + size, + |(k, _)| hasher.hash_one(&k), + hashbrown::raw::Fallibility::Infallible, + ) + .unwrap(); + } + while writer.len() < ideal_filled as usize { + let key: FileCacheKey = rng.random(); + let val = FileCacheEntry::dummy(); + writer.insert(hasher.hash_one(&key), (key, val), |(k, _)| { + hasher.hash_one(&k) + }); + } + b.iter(|| unsafe { + writer.table.rehash_in_place( + &|table, index| { + hasher.hash_one( + &table + .bucket::<(FileCacheKey, FileCacheEntry)>(index) + .as_ref() + .0, + ) + }, + std::mem::size_of::<(FileCacheKey, FileCacheEntry)>(), + if std::mem::needs_drop::<(FileCacheKey, FileCacheEntry)>() { + Some(|ptr| { + std::ptr::drop_in_place(ptr as *mut (FileCacheKey, FileCacheEntry)) + }) + } else { + None + }, + ) + }); + }, + ); + } + + group.finish(); } - + criterion_group!(benches, small_benchs, real_benchs); criterion_main!(benches); diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs index f0d198af41..b4671ac1a6 100644 --- a/libs/neon-shmem/src/hash.rs +++ b/libs/neon-shmem/src/hash.rs @@ -1,7 +1,7 @@ //! Resizable hash table implementation on top of byte-level storage (either a [`ShmemHandle`] or a fixed byte array). //! //! This hash table has two major components: the bucket array and the dictionary. Each bucket within the -//! bucket array contains a `Option<(K, V)>` and an index of another bucket. In this way there is both an +//! bucket array contains a `Option<(K, V)>` and an index of another bucket. In this way there is both an //! implicit freelist within the bucket array (`None` buckets point to other `None` entries) and various hash //! chains within the bucket array (a Some bucket will point to other Some buckets that had the same hash). //! @@ -14,11 +14,11 @@ //! in-place and are at a high level achieved by expanding/reducing the bucket array and rebuilding the //! dictionary by rehashing all keys. 
-use std::hash::{Hash, BuildHasher}; +use std::hash::{BuildHasher, Hash}; use std::mem::MaybeUninit; -use crate::{shmem, sync::*}; use crate::shmem::ShmemHandle; +use crate::{shmem, sync::*}; mod core; pub mod entry; @@ -27,58 +27,58 @@ pub mod entry; mod tests; use core::{Bucket, CoreHashMap, INVALID_POS}; -use entry::{Entry, OccupiedEntry, VacantEntry, PrevPos}; +use entry::{Entry, OccupiedEntry, PrevPos, VacantEntry}; /// Builder for a [`HashMapAccess`]. #[must_use] pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> { shmem_handle: Option, shared_ptr: *mut RwLock>, - shared_size: usize, - hasher: S, - num_buckets: u32, + shared_size: usize, + hasher: S, + num_buckets: u32, } -/// Accessor for a hash table. +/// Accessor for a hash table. pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> { shmem_handle: Option, shared_ptr: *mut HashMapShared<'a, K, V>, - hasher: S, + hasher: S, } unsafe impl Sync for HashMapAccess<'_, K, V, S> {} unsafe impl Send for HashMapAccess<'_, K, V, S> {} impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> { - pub fn with_hasher(self, hasher: T) -> HashMapInit<'a, K, V, T> { - HashMapInit { - hasher, - shmem_handle: self.shmem_handle, - shared_ptr: self.shared_ptr, - shared_size: self.shared_size, - num_buckets: self.num_buckets, - } - } + pub fn with_hasher(self, hasher: T) -> HashMapInit<'a, K, V, T> { + HashMapInit { + hasher, + shmem_handle: self.shmem_handle, + shared_ptr: self.shared_ptr, + shared_size: self.shared_size, + num_buckets: self.num_buckets, + } + } - /// Loosely (over)estimate the size needed to store a hash table with `num_buckets` buckets. - pub fn estimate_size(num_buckets: u32) -> usize { + /// Loosely (over)estimate the size needed to store a hash table with `num_buckets` buckets. + pub fn estimate_size(num_buckets: u32) -> usize { // add some margin to cover alignment etc. CoreHashMap::::estimate_size(num_buckets) + size_of::>() + 1000 } - /// Initialize a table for writing. + /// Initialize a table for writing. pub fn attach_writer(self) -> HashMapAccess<'a, K, V, S> { let mut ptr: *mut u8 = self.shared_ptr.cast(); let end_ptr: *mut u8 = unsafe { ptr.add(self.shared_size) }; - // carve out area for the One Big Lock (TM) and the HashMapShared. - ptr = unsafe { ptr.add(ptr.align_offset(align_of::())) }; - let raw_lock_ptr = ptr; - ptr = unsafe { ptr.add(size_of::()) }; - ptr = unsafe { ptr.add(ptr.align_offset(align_of::>())) }; - let shared_ptr: *mut HashMapShared = ptr.cast(); + // carve out area for the One Big Lock (TM) and the HashMapShared. 
+ ptr = unsafe { ptr.add(ptr.align_offset(align_of::())) }; + let raw_lock_ptr = ptr; + ptr = unsafe { ptr.add(size_of::()) }; + ptr = unsafe { ptr.add(ptr.align_offset(align_of::>())) }; + let shared_ptr: *mut HashMapShared = ptr.cast(); ptr = unsafe { ptr.add(size_of::>()) }; - + // carve out the buckets ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::>())) }; let buckets_ptr = ptr; @@ -91,26 +91,27 @@ impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> { let dictionary_size = unsafe { end_ptr.byte_offset_from(ptr) / size_of::() as isize }; assert!(dictionary_size > 0); - let buckets = - unsafe { std::slice::from_raw_parts_mut(buckets_ptr.cast(), self.num_buckets as usize) }; + let buckets = unsafe { + std::slice::from_raw_parts_mut(buckets_ptr.cast(), self.num_buckets as usize) + }; let dictionary = unsafe { std::slice::from_raw_parts_mut(dictionary_ptr.cast(), dictionary_size as usize) }; let hashmap = CoreHashMap::new(buckets, dictionary); - let lock = RwLock::from_raw(PthreadRwLock::new(raw_lock_ptr.cast()), hashmap); - unsafe { - std::ptr::write(shared_ptr, lock); - } - + let lock = RwLock::from_raw(PthreadRwLock::new(raw_lock_ptr.cast()), hashmap); + unsafe { + std::ptr::write(shared_ptr, lock); + } + HashMapAccess { shmem_handle: self.shmem_handle, shared_ptr, - hasher: self.hasher, + hasher: self.hasher, } } - /// Initialize a table for reading. Currently identical to [`HashMapInit::attach_writer`]. + /// Initialize a table for reading. Currently identical to [`HashMapInit::attach_writer`]. pub fn attach_reader(self) -> HashMapAccess<'a, K, V, S> { self.attach_writer() } @@ -132,78 +133,75 @@ type HashMapShared<'a, K, V> = RwLock>; impl<'a, K, V> HashMapInit<'a, K, V, rustc_hash::FxBuildHasher> where - K: Clone + Hash + Eq + K: Clone + Hash + Eq, { - /// Place the hash table within a user-supplied fixed memory area. - pub fn with_fixed( - num_buckets: u32, - area: &'a mut [MaybeUninit], - ) -> Self { - Self { - num_buckets, - shmem_handle: None, - shared_ptr: area.as_mut_ptr().cast(), - shared_size: area.len(), - hasher: rustc_hash::FxBuildHasher, - } + /// Place the hash table within a user-supplied fixed memory area. + pub fn with_fixed(num_buckets: u32, area: &'a mut [MaybeUninit]) -> Self { + Self { + num_buckets, + shmem_handle: None, + shared_ptr: area.as_mut_ptr().cast(), + shared_size: area.len(), + hasher: rustc_hash::FxBuildHasher, + } } /// Place a new hash map in the given shared memory area - /// - /// # Panics - /// Will panic on failure to resize area to expected map size. + /// + /// # Panics + /// Will panic on failure to resize area to expected map size. pub fn with_shmem(num_buckets: u32, shmem: ShmemHandle) -> Self { - let size = Self::estimate_size(num_buckets); - shmem + let size = Self::estimate_size(num_buckets); + shmem .set_size(size) .expect("could not resize shared memory area"); - Self { - num_buckets, - shared_ptr: shmem.data_ptr.as_ptr().cast(), - shmem_handle: Some(shmem), - shared_size: size, - hasher: rustc_hash::FxBuildHasher - } + Self { + num_buckets, + shared_ptr: shmem.data_ptr.as_ptr().cast(), + shmem_handle: Some(shmem), + shared_size: size, + hasher: rustc_hash::FxBuildHasher, + } } - /// Make a resizable hash map within a new shared memory area with the given name. 
- pub fn new_resizeable_named(num_buckets: u32, max_buckets: u32, name: &str) -> Self { - let size = Self::estimate_size(num_buckets); - let max_size = Self::estimate_size(max_buckets); - let shmem = ShmemHandle::new(name, size, max_size) - .expect("failed to make shared memory area"); - - Self { - num_buckets, - shared_ptr: shmem.data_ptr.as_ptr().cast(), - shmem_handle: Some(shmem), - shared_size: size, - hasher: rustc_hash::FxBuildHasher - } - } + /// Make a resizable hash map within a new shared memory area with the given name. + pub fn new_resizeable_named(num_buckets: u32, max_buckets: u32, name: &str) -> Self { + let size = Self::estimate_size(num_buckets); + let max_size = Self::estimate_size(max_buckets); + let shmem = + ShmemHandle::new(name, size, max_size).expect("failed to make shared memory area"); - /// Make a resizable hash map within a new anonymous shared memory area. - pub fn new_resizeable(num_buckets: u32, max_buckets: u32) -> Self { - use std::sync::atomic::{AtomicUsize, Ordering}; - static COUNTER: AtomicUsize = AtomicUsize::new(0); - let val = COUNTER.fetch_add(1, Ordering::Relaxed); - let name = format!("neon_shmem_hmap{val}"); - Self::new_resizeable_named(num_buckets, max_buckets, &name) - } + Self { + num_buckets, + shared_ptr: shmem.data_ptr.as_ptr().cast(), + shmem_handle: Some(shmem), + shared_size: size, + hasher: rustc_hash::FxBuildHasher, + } + } + + /// Make a resizable hash map within a new anonymous shared memory area. + pub fn new_resizeable(num_buckets: u32, max_buckets: u32) -> Self { + use std::sync::atomic::{AtomicUsize, Ordering}; + static COUNTER: AtomicUsize = AtomicUsize::new(0); + let val = COUNTER.fetch_add(1, Ordering::Relaxed); + let name = format!("neon_shmem_hmap{val}"); + Self::new_resizeable_named(num_buckets, max_buckets, &name) + } } impl<'a, K, V, S: BuildHasher> HashMapAccess<'a, K, V, S> where K: Clone + Hash + Eq, { - /// Hash a key using the map's hasher. - #[inline] + /// Hash a key using the map's hasher. + #[inline] fn get_hash_value(&self, key: &K) -> u64 { - self.hasher.hash_one(key) + self.hasher.hash_one(key) } - fn entry_with_hash(&self, key: K, hash: u64) -> Entry<'a, '_, K, V> { - let mut map = unsafe { self.shared_ptr.as_ref() }.unwrap().write(); + fn entry_with_hash(&self, key: K, hash: u64) -> Entry<'a, '_, K, V> { + let mut map = unsafe { self.shared_ptr.as_ref() }.unwrap().write(); let dict_pos = hash as usize % map.dictionary.len(); let first = map.dictionary[dict_pos]; if first == INVALID_POS { @@ -241,71 +239,69 @@ where prev_pos = PrevPos::Chained(next); next = bucket.next; } - } - - /// Get a reference to the corresponding value for a key. + } + + /// Get a reference to the corresponding value for a key. pub fn get<'e>(&'e self, key: &K) -> Option> { - let hash = self.get_hash_value(key); + let hash = self.get_hash_value(key); let map = unsafe { self.shared_ptr.as_ref() }.unwrap().read(); - RwLockReadGuard::try_map(map, |m| m.get_with_hash(key, hash)).ok() + RwLockReadGuard::try_map(map, |m| m.get_with_hash(key, hash)).ok() } - /// Get a reference to the entry containing a key. + /// Get a reference to the entry containing a key. pub fn entry(&self, key: K) -> Entry<'a, '_, K, V> { - let hash = self.get_hash_value(&key); - self.entry_with_hash(key, hash) + let hash = self.get_hash_value(&key); + self.entry_with_hash(key, hash) } - /// Remove a key given its hash. Returns the associated value if it existed. + /// Remove a key given its hash. Returns the associated value if it existed. 
pub fn remove(&self, key: &K) -> Option { - let hash = self.get_hash_value(&key); + let hash = self.get_hash_value(&key); match self.entry_with_hash(key.clone(), hash) { Entry::Occupied(e) => Some(e.remove()), - Entry::Vacant(_) => None + Entry::Vacant(_) => None, } } - /// Insert/update a key. Returns the previous associated value if it existed. - /// - /// # Errors - /// Will return [`core::FullError`] if there is no more space left in the map. + /// Insert/update a key. Returns the previous associated value if it existed. + /// + /// # Errors + /// Will return [`core::FullError`] if there is no more space left in the map. pub fn insert(&self, key: K, value: V) -> Result, core::FullError> { - let hash = self.get_hash_value(&key); + let hash = self.get_hash_value(&key); match self.entry_with_hash(key.clone(), hash) { Entry::Occupied(mut e) => Ok(Some(e.insert(value))), Entry::Vacant(e) => { - _ = e.insert(value)?; - Ok(None) - } + _ = e.insert(value)?; + Ok(None) + } } } - - /// Optionally return the entry for a bucket at a given index if it exists. - /// - /// Has more overhead than one would intuitively expect: performs both a clone of the key - /// due to the [`OccupiedEntry`] type owning the key and also a hash of the key in order - /// to enable repairing the hash chain if the entry is removed. + + /// Optionally return the entry for a bucket at a given index if it exists. + /// + /// Has more overhead than one would intuitively expect: performs both a clone of the key + /// due to the [`OccupiedEntry`] type owning the key and also a hash of the key in order + /// to enable repairing the hash chain if the entry is removed. pub fn entry_at_bucket(&self, pos: usize) -> Option> { let map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); - if pos >= map.buckets.len() { - return None; - } + if pos >= map.buckets.len() { + return None; + } - let entry = map.buckets[pos].inner.as_ref(); - match entry { - Some((key, _)) => Some(OccupiedEntry { - _key: key.clone(), - bucket_pos: pos as u32, - prev_pos: entry::PrevPos::Unknown( - self.get_hash_value(&key) - ), - map, - }), - _ => None, - } + let entry = map.buckets[pos].inner.as_ref(); + match entry { + Some((key, _)) => Some(OccupiedEntry { + _key: key.clone(), + bucket_pos: pos as u32, + prev_pos: entry::PrevPos::Unknown(self.get_hash_value(&key)), + map, + }), + _ => None, + } } - /// Returns the number of buckets in the table. + /// Returns the number of buckets in the table. pub fn get_num_buckets(&self) -> usize { let map = unsafe { self.shared_ptr.as_ref() }.unwrap().read(); map.get_num_buckets() @@ -313,18 +309,18 @@ where /// Return the key and value stored in bucket with given index. This can be used to /// iterate through the hash map. - // TODO: An Iterator might be nicer. The communicator's clock algorithm needs to - // _slowly_ iterate through all buckets with its clock hand, without holding a lock. - // If we switch to an Iterator, it must not hold the lock. + // TODO: An Iterator might be nicer. The communicator's clock algorithm needs to + // _slowly_ iterate through all buckets with its clock hand, without holding a lock. + // If we switch to an Iterator, it must not hold the lock. 
pub fn get_at_bucket(&self, pos: usize) -> Option> { let map = unsafe { self.shared_ptr.as_ref() }.unwrap().read(); if pos >= map.buckets.len() { return None; } - RwLockReadGuard::try_map(map, |m| m.buckets[pos].inner.as_ref()).ok() + RwLockReadGuard::try_map(map, |m| m.buckets[pos].inner.as_ref()).ok() } - /// Returns the index of the bucket a given value corresponds to. + /// Returns the index of the bucket a given value corresponds to. pub fn get_bucket_for_value(&self, val_ptr: *const V) -> usize { let map = unsafe { self.shared_ptr.as_ref() }.unwrap().read(); @@ -341,25 +337,25 @@ where map.buckets_in_use as usize } - /// Clears all entries in a table. Does not reset any shrinking operations. - pub fn clear(&self) { - let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); + /// Clears all entries in a table. Does not reset any shrinking operations. + pub fn clear(&self) { + let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); map.clear(); - } - - /// Perform an in-place rehash of some region (0..`rehash_buckets`) of the table and reset - /// the `buckets` and `dictionary` slices to be as long as `num_buckets`. Resets the freelist - /// in the process. - fn rehash_dict( - &self, - inner: &mut CoreHashMap<'a, K, V>, - buckets_ptr: *mut core::Bucket, - end_ptr: *mut u8, - num_buckets: u32, - rehash_buckets: u32, - ) { - inner.free_head = INVALID_POS; - + } + + /// Perform an in-place rehash of some region (0..`rehash_buckets`) of the table and reset + /// the `buckets` and `dictionary` slices to be as long as `num_buckets`. Resets the freelist + /// in the process. + fn rehash_dict( + &self, + inner: &mut CoreHashMap<'a, K, V>, + buckets_ptr: *mut core::Bucket, + end_ptr: *mut u8, + num_buckets: u32, + rehash_buckets: u32, + ) { + inner.free_head = INVALID_POS; + let buckets; let dictionary; unsafe { @@ -372,19 +368,19 @@ where buckets = std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize); dictionary = std::slice::from_raw_parts_mut(dictionary_ptr, dictionary_size); - } + } for e in dictionary.iter_mut() { *e = INVALID_POS; } - + for (i, bucket) in buckets.iter_mut().enumerate().take(rehash_buckets as usize) { if bucket.inner.is_none() { - bucket.next = inner.free_head; + bucket.next = inner.free_head; inner.free_head = i as u32; - continue; + continue; } - let hash = self.hasher.hash_one(&bucket.inner.as_ref().unwrap().0); + let hash = self.hasher.hash_one(&bucket.inner.as_ref().unwrap().0); let pos: usize = (hash % dictionary.len() as u64) as usize; bucket.next = dictionary[pos]; dictionary[pos] = i as u32; @@ -392,34 +388,37 @@ where inner.dictionary = dictionary; inner.buckets = buckets; - } + } - /// Rehash the map without growing or shrinking. - pub fn shuffle(&self) { - let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); - let num_buckets = map.get_num_buckets() as u32; - let size_bytes = HashMapInit::::estimate_size(num_buckets); - let end_ptr: *mut u8 = unsafe { self.shared_ptr.byte_add(size_bytes).cast() }; + /// Rehash the map without growing or shrinking. 
+ pub fn shuffle(&self) { + let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); + let num_buckets = map.get_num_buckets() as u32; + let size_bytes = HashMapInit::::estimate_size(num_buckets); + let end_ptr: *mut u8 = unsafe { self.shared_ptr.byte_add(size_bytes).cast() }; let buckets_ptr = map.buckets.as_mut_ptr(); - self.rehash_dict(&mut map, buckets_ptr, end_ptr, num_buckets, num_buckets); - } + self.rehash_dict(&mut map, buckets_ptr, end_ptr, num_buckets, num_buckets); + } - /// Grow the number of buckets within the table. + /// Grow the number of buckets within the table. /// /// 1. Grows the underlying shared memory area /// 2. Initializes new buckets and overwrites the current dictionary /// 3. Rehashes the dictionary - /// - /// # Panics - /// Panics if called on a map initialized with [`HashMapInit::with_fixed`]. - /// - /// # Errors - /// Returns an [`shmem::Error`] if any errors occur resizing the memory region. + /// + /// # Panics + /// Panics if called on a map initialized with [`HashMapInit::with_fixed`]. + /// + /// # Errors + /// Returns an [`shmem::Error`] if any errors occur resizing the memory region. pub fn grow(&self, num_buckets: u32) -> Result<(), shmem::Error> { let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); let old_num_buckets = map.buckets.len() as u32; - assert!(num_buckets >= old_num_buckets, "grow called with a smaller number of buckets"); + assert!( + num_buckets >= old_num_buckets, + "grow called with a smaller number of buckets" + ); if num_buckets == old_num_buckets { return Ok(()); } @@ -433,13 +432,13 @@ where let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) }; // Initialize new buckets. The new buckets are linked to the free list. - // NB: This overwrites the dictionary! + // NB: This overwrites the dictionary! let buckets_ptr = map.buckets.as_mut_ptr(); unsafe { for i in old_num_buckets..num_buckets { let bucket = buckets_ptr.add(i as usize); bucket.write(core::Bucket { - next: if i < num_buckets-1 { + next: if i < num_buckets - 1 { i + 1 } else { map.free_head @@ -449,86 +448,90 @@ where } } - self.rehash_dict(&mut map, buckets_ptr, end_ptr, num_buckets, old_num_buckets); + self.rehash_dict(&mut map, buckets_ptr, end_ptr, num_buckets, old_num_buckets); map.free_head = old_num_buckets; Ok(()) } - /// Begin a shrink, limiting all new allocations to be in buckets with index below `num_buckets`. - /// - /// # Panics - /// Panics if called on a map initialized with [`HashMapInit::with_fixed`] or if `num_buckets` is - /// greater than the number of buckets in the map. - pub fn begin_shrink(&mut self, num_buckets: u32) { - let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); - assert!( - num_buckets <= map.get_num_buckets() as u32, + /// Begin a shrink, limiting all new allocations to be in buckets with index below `num_buckets`. + /// + /// # Panics + /// Panics if called on a map initialized with [`HashMapInit::with_fixed`] or if `num_buckets` is + /// greater than the number of buckets in the map. + pub fn begin_shrink(&mut self, num_buckets: u32) { + let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); + assert!( + num_buckets <= map.get_num_buckets() as u32, "shrink called with a larger number of buckets" ); - _ = self + _ = self .shmem_handle .as_ref() .expect("shrink called on a fixed-size hash table"); - map.alloc_limit = num_buckets; - } + map.alloc_limit = num_buckets; + } - /// If a shrink operation is underway, returns the target size of the map. 
Otherwise, returns None. - pub fn shrink_goal(&self) -> Option { - let map = unsafe { self.shared_ptr.as_mut() }.unwrap().read(); + /// If a shrink operation is underway, returns the target size of the map. Otherwise, returns None. + pub fn shrink_goal(&self) -> Option { + let map = unsafe { self.shared_ptr.as_mut() }.unwrap().read(); let goal = map.alloc_limit; - if goal == INVALID_POS { None } else { Some(goal as usize) } - } - - /// Complete a shrink after caller has evicted entries, removing the unused buckets and rehashing. - /// - /// # Panics - /// The following cases result in a panic: - /// - Calling this function on a map initialized with [`HashMapInit::with_fixed`]. - /// - Calling this function on a map when no shrink operation is in progress. - /// - Calling this function on a map with `shrink_mode` set to [`HashMapShrinkMode::Remap`] and - /// there are more buckets in use than the value returned by [`HashMapAccess::shrink_goal`]. - /// - /// # Errors - /// Returns an [`shmem::Error`] if any errors occur resizing the memory region. - pub fn finish_shrink(&self) -> Result<(), shmem::Error> { - let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); - assert!( - map.alloc_limit != INVALID_POS, - "called finish_shrink when no shrink is in progress" - ); + if goal == INVALID_POS { + None + } else { + Some(goal as usize) + } + } - let num_buckets = map.alloc_limit; + /// Complete a shrink after caller has evicted entries, removing the unused buckets and rehashing. + /// + /// # Panics + /// The following cases result in a panic: + /// - Calling this function on a map initialized with [`HashMapInit::with_fixed`]. + /// - Calling this function on a map when no shrink operation is in progress. + /// - Calling this function on a map with `shrink_mode` set to [`HashMapShrinkMode::Remap`] and + /// there are more buckets in use than the value returned by [`HashMapAccess::shrink_goal`]. + /// + /// # Errors + /// Returns an [`shmem::Error`] if any errors occur resizing the memory region. 
+ pub fn finish_shrink(&self) -> Result<(), shmem::Error> { + let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); + assert!( + map.alloc_limit != INVALID_POS, + "called finish_shrink when no shrink is in progress" + ); - if map.get_num_buckets() == num_buckets as usize { + let num_buckets = map.alloc_limit; + + if map.get_num_buckets() == num_buckets as usize { return Ok(()); } - assert!( - map.buckets_in_use <= num_buckets, - "called finish_shrink before enough entries were removed" - ); - - for i in (num_buckets as usize)..map.buckets.len() { - if let Some((k, v)) = map.buckets[i].inner.take() { - // alloc_bucket increases count, so need to decrease since we're just moving - map.buckets_in_use -= 1; - map.alloc_bucket(k, v).unwrap(); - } - } + assert!( + map.buckets_in_use <= num_buckets, + "called finish_shrink before enough entries were removed" + ); + + for i in (num_buckets as usize)..map.buckets.len() { + if let Some((k, v)) = map.buckets[i].inner.take() { + // alloc_bucket increases count, so need to decrease since we're just moving + map.buckets_in_use -= 1; + map.alloc_bucket(k, v).unwrap(); + } + } let shmem_handle = self .shmem_handle .as_ref() .expect("shrink called on a fixed-size hash table"); - let size_bytes = HashMapInit::::estimate_size(num_buckets); + let size_bytes = HashMapInit::::estimate_size(num_buckets); shmem_handle.set_size(size_bytes)?; let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) }; - let buckets_ptr = map.buckets.as_mut_ptr(); - self.rehash_dict(&mut map, buckets_ptr, end_ptr, num_buckets, num_buckets); - map.alloc_limit = INVALID_POS; - - Ok(()) - } + let buckets_ptr = map.buckets.as_mut_ptr(); + self.rehash_dict(&mut map, buckets_ptr, end_ptr, num_buckets, num_buckets); + map.alloc_limit = INVALID_POS; + + Ok(()) + } } diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs index aea89358df..013eb9a09c 100644 --- a/libs/neon-shmem/src/hash/core.rs +++ b/libs/neon-shmem/src/hash/core.rs @@ -11,26 +11,26 @@ pub(crate) const INVALID_POS: u32 = u32::MAX; /// Fundamental storage unit within the hash table. Either empty or contains a key-value pair. /// Always part of a chain of some kind (either a freelist if empty or a hash chain if full). pub(crate) struct Bucket { - /// Index of next bucket in the chain. - pub(crate) next: u32, - /// Key-value pair contained within bucket. + /// Index of next bucket in the chain. + pub(crate) next: u32, + /// Key-value pair contained within bucket. pub(crate) inner: Option<(K, V)>, } /// Core hash table implementation. pub(crate) struct CoreHashMap<'a, K, V> { - /// Dictionary used to map hashes to bucket indices. + /// Dictionary used to map hashes to bucket indices. pub(crate) dictionary: &'a mut [u32], - /// Buckets containing key-value pairs. + /// Buckets containing key-value pairs. pub(crate) buckets: &'a mut [Bucket], - /// Head of the freelist. + /// Head of the freelist. pub(crate) free_head: u32, - /// Maximum index of a bucket allowed to be allocated. [`INVALID_POS`] if no limit. - pub(crate) alloc_limit: u32, + /// Maximum index of a bucket allowed to be allocated. [`INVALID_POS`] if no limit. + pub(crate) alloc_limit: u32, /// The number of currently occupied buckets. pub(crate) buckets_in_use: u32, - // pub(crate) lock: libc::pthread_mutex_t, - // Unclear what the purpose of this is. + // pub(crate) lock: libc::pthread_mutex_t, + // Unclear what the purpose of this is. 
pub(crate) _user_list_head: u32, } @@ -41,7 +41,7 @@ pub struct FullError(); impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> { const FILL_FACTOR: f32 = 0.60; - /// Estimate the size of data contained within the the hash map. + /// Estimate the size of data contained within the the hash map. pub fn estimate_size(num_buckets: u32) -> usize { let mut size = 0; @@ -53,7 +53,7 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> { as usize; size - } + } pub fn new( buckets: &'a mut [MaybeUninit>], @@ -66,7 +66,7 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> { i as u32 + 1 } else { INVALID_POS - }, + }, inner: None, }); } @@ -89,11 +89,11 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> { free_head: 0, buckets_in_use: 0, _user_list_head: INVALID_POS, - alloc_limit: INVALID_POS, + alloc_limit: INVALID_POS, } } - /// Get the value associated with a key (if it exists) given its hash. + /// Get the value associated with a key (if it exists) given its hash. pub fn get_with_hash(&self, key: &K, hash: u64) -> Option<&V> { let mut next = self.dictionary[hash as usize % self.dictionary.len()]; loop { @@ -110,22 +110,22 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> { } } - /// Get number of buckets in map. + /// Get number of buckets in map. pub fn get_num_buckets(&self) -> usize { self.buckets.len() } - /// Clears all entries from the hashmap. - /// - /// Does not reset any allocation limits, but does clear any entries beyond them. - pub fn clear(&mut self) { - for i in 0..self.buckets.len() { + /// Clears all entries from the hashmap. + /// + /// Does not reset any allocation limits, but does clear any entries beyond them. + pub fn clear(&mut self) { + for i in 0..self.buckets.len() { self.buckets[i] = Bucket { next: if i < self.buckets.len() - 1 { i as u32 + 1 } else { INVALID_POS - }, + }, inner: None, } } @@ -133,45 +133,46 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> { self.dictionary[i] = INVALID_POS; } - self.free_head = 0; - self.buckets_in_use = 0; - } + self.free_head = 0; + self.buckets_in_use = 0; + } - /// Find the position of an unused bucket via the freelist and initialize it. + /// Find the position of an unused bucket via the freelist and initialize it. pub(crate) fn alloc_bucket(&mut self, key: K, value: V) -> Result { let mut pos = self.free_head; - // Find the first bucket we're *allowed* to use. - let mut prev = PrevPos::First(self.free_head); - while pos != INVALID_POS && pos >= self.alloc_limit { - let bucket = &mut self.buckets[pos as usize]; - prev = PrevPos::Chained(pos); - pos = bucket.next; - } - if pos == INVALID_POS { - return Err(FullError()); - } + // Find the first bucket we're *allowed* to use. + let mut prev = PrevPos::First(self.free_head); + while pos != INVALID_POS && pos >= self.alloc_limit { + let bucket = &mut self.buckets[pos as usize]; + prev = PrevPos::Chained(pos); + pos = bucket.next; + } + if pos == INVALID_POS { + return Err(FullError()); + } - // Repair the freelist. - match prev { - PrevPos::First(_) => { - let next_pos = self.buckets[pos as usize].next; - self.free_head = next_pos; - } - PrevPos::Chained(p) => if p != INVALID_POS { - let next_pos = self.buckets[pos as usize].next; - self.buckets[p as usize].next = next_pos; - }, - _ => unreachable!() - } + // Repair the freelist. 
+ match prev { + PrevPos::First(_) => { + let next_pos = self.buckets[pos as usize].next; + self.free_head = next_pos; + } + PrevPos::Chained(p) => { + if p != INVALID_POS { + let next_pos = self.buckets[pos as usize].next; + self.buckets[p as usize].next = next_pos; + } + } + _ => unreachable!(), + } - // Initialize the bucket. - let bucket = &mut self.buckets[pos as usize]; - self.buckets_in_use += 1; + // Initialize the bucket. + let bucket = &mut self.buckets[pos as usize]; + self.buckets_in_use += 1; bucket.next = INVALID_POS; bucket.inner = Some((key, value)); Ok(pos) } } - diff --git a/libs/neon-shmem/src/hash/entry.rs b/libs/neon-shmem/src/hash/entry.rs index a5832665aa..cc7e48652a 100644 --- a/libs/neon-shmem/src/hash/entry.rs +++ b/libs/neon-shmem/src/hash/entry.rs @@ -6,31 +6,30 @@ use crate::sync::{RwLockWriteGuard, ValueWriteGuard}; use std::hash::Hash; use std::mem; - pub enum Entry<'a, 'b, K, V> { - Occupied(OccupiedEntry<'a, 'b, K, V>), + Occupied(OccupiedEntry<'a, 'b, K, V>), Vacant(VacantEntry<'a, 'b, K, V>), } /// Enum representing the previous position within a chain. #[derive(Clone, Copy)] pub(crate) enum PrevPos { - /// Starting index within the dictionary. + /// Starting index within the dictionary. First(u32), - /// Regular index within the buckets. + /// Regular index within the buckets. Chained(u32), - /// Unknown - e.g. the associated entry was retrieved by index instead of chain. - Unknown(u64), + /// Unknown - e.g. the associated entry was retrieved by index instead of chain. + Unknown(u64), } pub struct OccupiedEntry<'a, 'b, K, V> { - /// Mutable reference to the map containing this entry. - pub(crate) map: RwLockWriteGuard<'b, CoreHashMap<'a, K, V>>, - /// The key of the occupied entry + /// Mutable reference to the map containing this entry. + pub(crate) map: RwLockWriteGuard<'b, CoreHashMap<'a, K, V>>, + /// The key of the occupied entry pub(crate) _key: K, - /// The index of the previous entry in the chain. + /// The index of the previous entry in the chain. pub(crate) prev_pos: PrevPos, - /// The position of the bucket in the [`CoreHashMap`] bucket array. + /// The position of the bucket in the [`CoreHashMap`] bucket array. pub(crate) bucket_pos: u32, } @@ -51,56 +50,56 @@ impl OccupiedEntry<'_, '_, K, V> { .1 } - /// Inserts a value into the entry, replacing (and returning) the existing value. + /// Inserts a value into the entry, replacing (and returning) the existing value. pub fn insert(&mut self, value: V) -> V { let bucket = &mut self.map.buckets[self.bucket_pos as usize]; // This assumes inner is Some, which it must be for an OccupiedEntry mem::replace(&mut bucket.inner.as_mut().unwrap().1, value) } - /// Removes the entry from the hash map, returning the value originally stored within it. - /// - /// This may result in multiple bucket accesses if the entry was obtained by index as the - /// previous chain entry needs to be discovered in this case. - /// - /// # Panics - /// Panics if the `prev_pos` field is equal to [`PrevPos::Unknown`]. In practice, this means - /// the entry was obtained via calling something like [`CoreHashMap::entry_at_bucket`]. + /// Removes the entry from the hash map, returning the value originally stored within it. + /// + /// This may result in multiple bucket accesses if the entry was obtained by index as the + /// previous chain entry needs to be discovered in this case. + /// + /// # Panics + /// Panics if the `prev_pos` field is equal to [`PrevPos::Unknown`]. 
In practice, this means + /// the entry was obtained via calling something like [`CoreHashMap::entry_at_bucket`]. pub fn remove(mut self) -> V { - // If this bucket was queried by index, go ahead and follow its chain from the start. - let prev = if let PrevPos::Unknown(hash) = self.prev_pos { - let dict_idx = hash as usize % self.map.dictionary.len(); - let mut prev = PrevPos::First(dict_idx as u32); - let mut curr = self.map.dictionary[dict_idx]; - while curr != self.bucket_pos { - curr = self.map.buckets[curr as usize].next; - prev = PrevPos::Chained(curr); - } - prev - } else { - self.prev_pos - }; - + // If this bucket was queried by index, go ahead and follow its chain from the start. + let prev = if let PrevPos::Unknown(hash) = self.prev_pos { + let dict_idx = hash as usize % self.map.dictionary.len(); + let mut prev = PrevPos::First(dict_idx as u32); + let mut curr = self.map.dictionary[dict_idx]; + while curr != self.bucket_pos { + curr = self.map.buckets[curr as usize].next; + prev = PrevPos::Chained(curr); + } + prev + } else { + self.prev_pos + }; + // CoreHashMap::remove returns Option<(K, V)>. We know it's Some for an OccupiedEntry. let bucket = &mut self.map.buckets[self.bucket_pos as usize]; - + // unlink it from the chain match prev { PrevPos::First(dict_pos) => { - self.map.dictionary[dict_pos as usize] = bucket.next; - }, + self.map.dictionary[dict_pos as usize] = bucket.next; + } PrevPos::Chained(bucket_pos) => { - // println!("we think prev of {} is {bucket_pos}", self.bucket_pos); + // println!("we think prev of {} is {bucket_pos}", self.bucket_pos); self.map.buckets[bucket_pos as usize].next = bucket.next; - }, - _ => unreachable!(), + } + _ => unreachable!(), } // and add it to the freelist - let free = self.map.free_head; + let free = self.map.free_head; let bucket = &mut self.map.buckets[self.bucket_pos as usize]; let old_value = bucket.inner.take(); - bucket.next = free; + bucket.next = free; self.map.free_head = self.bucket_pos; self.map.buckets_in_use -= 1; @@ -110,19 +109,19 @@ impl OccupiedEntry<'_, '_, K, V> { /// An abstract view into a vacant entry within the map. pub struct VacantEntry<'a, 'b, K, V> { - /// Mutable reference to the map containing this entry. - pub(crate) map: RwLockWriteGuard<'b, CoreHashMap<'a, K, V>>, - /// The key to be inserted into this entry. + /// Mutable reference to the map containing this entry. + pub(crate) map: RwLockWriteGuard<'b, CoreHashMap<'a, K, V>>, + /// The key to be inserted into this entry. pub(crate) key: K, - /// The position within the dictionary corresponding to the key's hash. + /// The position within the dictionary corresponding to the key's hash. pub(crate) dict_pos: u32, } impl<'b, K: Clone + Hash + Eq, V> VacantEntry<'_, 'b, K, V> { - /// Insert a value into the vacant entry, finding and populating an empty bucket in the process. - /// - /// # Errors - /// Will return [`FullError`] if there are no unoccupied buckets in the map. + /// Insert a value into the vacant entry, finding and populating an empty bucket in the process. + /// + /// # Errors + /// Will return [`FullError`] if there are no unoccupied buckets in the map. 
pub fn insert(mut self, value: V) -> Result, FullError> { let pos = self.map.alloc_bucket(self.key, value)?; if pos == INVALID_POS { @@ -131,9 +130,8 @@ impl<'b, K: Clone + Hash + Eq, V> VacantEntry<'_, 'b, K, V> { self.map.buckets[pos as usize].next = self.map.dictionary[self.dict_pos as usize]; self.map.dictionary[self.dict_pos as usize] = pos; - Ok(RwLockWriteGuard::map( - self.map, - |m| &mut m.buckets[pos as usize].inner.as_mut().unwrap().1 - )) + Ok(RwLockWriteGuard::map(self.map, |m| { + &mut m.buckets[pos as usize].inner.as_mut().unwrap().1 + })) } } diff --git a/libs/neon-shmem/src/hash/tests.rs b/libs/neon-shmem/src/hash/tests.rs index 5eeb18c0a1..aee47a0b3e 100644 --- a/libs/neon-shmem/src/hash/tests.rs +++ b/libs/neon-shmem/src/hash/tests.rs @@ -3,9 +3,9 @@ use std::collections::HashSet; use std::fmt::Debug; use std::mem::MaybeUninit; +use crate::hash::Entry; use crate::hash::HashMapAccess; use crate::hash::HashMapInit; -use crate::hash::Entry; use crate::hash::core::FullError; use rand::seq::SliceRandom; @@ -35,20 +35,21 @@ impl<'a> From<&'a [u8]> for TestKey { } } -fn test_inserts + Copy>(keys: &[K]) { - let w = HashMapInit::::new_resizeable_named( - 100000, 120000, "test_inserts" - ).attach_writer(); +fn test_inserts + Copy>(keys: &[K]) { + let w = HashMapInit::::new_resizeable_named(100000, 120000, "test_inserts") + .attach_writer(); for (idx, k) in keys.iter().enumerate() { - let res = w.entry((*k).into()); - match res { - Entry::Occupied(mut e) => { e.insert(idx); } - Entry::Vacant(e) => { - let res = e.insert(idx); - assert!(res.is_ok()); - }, - }; + let res = w.entry((*k).into()); + match res { + Entry::Occupied(mut e) => { + e.insert(idx); + } + Entry::Vacant(e) => { + let res = e.insert(idx); + assert!(res.is_ok()); + } + }; } for (idx, k) in keys.iter().enumerate() { @@ -109,79 +110,85 @@ fn apply_op( shadow.remove(&op.0) }; - let entry = map.entry(op.0); + let entry = map.entry(op.0); let hash_existing = match op.1 { - Some(new) => { - match entry { - Entry::Occupied(mut e) => Some(e.insert(new)), - Entry::Vacant(e) => { _ = e.insert(new).unwrap(); None }, - } - }, - None => { - match entry { - Entry::Occupied(e) => Some(e.remove()), - Entry::Vacant(_) => None, - } - }, - }; + Some(new) => match entry { + Entry::Occupied(mut e) => Some(e.insert(new)), + Entry::Vacant(e) => { + _ = e.insert(new).unwrap(); + None + } + }, + None => match entry { + Entry::Occupied(e) => Some(e.remove()), + Entry::Vacant(_) => None, + }, + }; - assert_eq!(shadow_existing, hash_existing); + assert_eq!(shadow_existing, hash_existing); } fn do_random_ops( - num_ops: usize, - size: u32, - del_prob: f64, - writer: &mut HashMapAccess, - shadow: &mut BTreeMap, - rng: &mut rand::rngs::ThreadRng, + num_ops: usize, + size: u32, + del_prob: f64, + writer: &mut HashMapAccess, + shadow: &mut BTreeMap, + rng: &mut rand::rngs::ThreadRng, ) { - for i in 0..num_ops { + for i in 0..num_ops { let key: TestKey = ((rng.next_u32() % size) as u128).into(); - let op = TestOp(key, if rng.random_bool(del_prob) { Some(i) } else { None }); + let op = TestOp( + key, + if rng.random_bool(del_prob) { + Some(i) + } else { + None + }, + ); apply_op(&op, writer, shadow); } } fn do_deletes( - num_ops: usize, - writer: &mut HashMapAccess, - shadow: &mut BTreeMap, + num_ops: usize, + writer: &mut HashMapAccess, + shadow: &mut BTreeMap, ) { - for _ in 0..num_ops { - let (k, _) = shadow.pop_first().unwrap(); - writer.remove(&k); - } + for _ in 0..num_ops { + let (k, _) = shadow.pop_first().unwrap(); + writer.remove(&k); + } 
} fn do_shrink( - writer: &mut HashMapAccess, - shadow: &mut BTreeMap, - to: u32 + writer: &mut HashMapAccess, + shadow: &mut BTreeMap, + to: u32, ) { - assert!(writer.shrink_goal().is_none()); - writer.begin_shrink(to); - assert_eq!(writer.shrink_goal(), Some(to as usize)); - while writer.get_num_buckets_in_use() > to as usize { - let (k, _) = shadow.pop_first().unwrap(); - let entry = writer.entry(k); - if let Entry::Occupied(e) = entry { - e.remove(); - } - } - let old_usage = writer.get_num_buckets_in_use(); - writer.finish_shrink().unwrap(); - assert!(writer.shrink_goal().is_none()); - assert_eq!(writer.get_num_buckets_in_use(), old_usage); + assert!(writer.shrink_goal().is_none()); + writer.begin_shrink(to); + assert_eq!(writer.shrink_goal(), Some(to as usize)); + while writer.get_num_buckets_in_use() > to as usize { + let (k, _) = shadow.pop_first().unwrap(); + let entry = writer.entry(k); + if let Entry::Occupied(e) = entry { + e.remove(); + } + } + let old_usage = writer.get_num_buckets_in_use(); + writer.finish_shrink().unwrap(); + assert!(writer.shrink_goal().is_none()); + assert_eq!(writer.get_num_buckets_in_use(), old_usage); } #[test] fn random_ops() { - let mut writer = HashMapInit::::new_resizeable_named( - 100000, 120000, "test_random" - ).attach_writer(); + let mut writer = + HashMapInit::::new_resizeable_named(100000, 120000, "test_random") + .attach_writer(); let mut shadow: std::collections::BTreeMap = BTreeMap::new(); - + let distribution = Zipf::new(u128::MAX as f64, 1.1).unwrap(); let mut rng = rand::rng(); for i in 0..100000 { @@ -193,234 +200,230 @@ fn random_ops() { } } - #[test] fn test_shuffle() { - let mut writer = HashMapInit::::new_resizeable_named( - 1000, 1200, "test_shuf" - ).attach_writer(); + let mut writer = HashMapInit::::new_resizeable_named(1000, 1200, "test_shuf") + .attach_writer(); let mut shadow: std::collections::BTreeMap = BTreeMap::new(); let mut rng = rand::rng(); do_random_ops(10000, 1000, 0.75, &mut writer, &mut shadow, &mut rng); writer.shuffle(); - do_random_ops(10000, 1000, 0.75, &mut writer, &mut shadow, &mut rng); + do_random_ops(10000, 1000, 0.75, &mut writer, &mut shadow, &mut rng); } #[test] fn test_grow() { - let mut writer = HashMapInit::::new_resizeable_named( - 1000, 2000, "test_grow" - ).attach_writer(); + let mut writer = HashMapInit::::new_resizeable_named(1000, 2000, "test_grow") + .attach_writer(); let mut shadow: std::collections::BTreeMap = BTreeMap::new(); let mut rng = rand::rng(); do_random_ops(10000, 1000, 0.75, &mut writer, &mut shadow, &mut rng); - let old_usage = writer.get_num_buckets_in_use(); + let old_usage = writer.get_num_buckets_in_use(); writer.grow(1500).unwrap(); - assert_eq!(writer.get_num_buckets_in_use(), old_usage); - assert_eq!(writer.get_num_buckets(), 1500); - do_random_ops(10000, 1500, 0.75, &mut writer, &mut shadow, &mut rng); + assert_eq!(writer.get_num_buckets_in_use(), old_usage); + assert_eq!(writer.get_num_buckets(), 1500); + do_random_ops(10000, 1500, 0.75, &mut writer, &mut shadow, &mut rng); } #[test] fn test_clear() { - let mut writer = HashMapInit::::new_resizeable_named( - 1500, 2000, "test_clear" - ).attach_writer(); + let mut writer = HashMapInit::::new_resizeable_named(1500, 2000, "test_clear") + .attach_writer(); let mut shadow: std::collections::BTreeMap = BTreeMap::new(); let mut rng = rand::rng(); do_random_ops(2000, 1500, 0.75, &mut writer, &mut shadow, &mut rng); - writer.clear(); - assert_eq!(writer.get_num_buckets_in_use(), 0); - assert_eq!(writer.get_num_buckets(), 1500); 
- while let Some((key, _)) = shadow.pop_first() { - assert!(writer.get(&key).is_none()); - } - do_random_ops(2000, 1500, 0.75, &mut writer, &mut shadow, &mut rng); - for i in 0..(1500 - writer.get_num_buckets_in_use()) { - writer.insert((1500 + i as u128).into(), 0).unwrap(); - } - assert_eq!(writer.insert(5000.into(), 0), Err(FullError {})); - writer.clear(); - assert!(writer.insert(5000.into(), 0).is_ok()); + writer.clear(); + assert_eq!(writer.get_num_buckets_in_use(), 0); + assert_eq!(writer.get_num_buckets(), 1500); + while let Some((key, _)) = shadow.pop_first() { + assert!(writer.get(&key).is_none()); + } + do_random_ops(2000, 1500, 0.75, &mut writer, &mut shadow, &mut rng); + for i in 0..(1500 - writer.get_num_buckets_in_use()) { + writer.insert((1500 + i as u128).into(), 0).unwrap(); + } + assert_eq!(writer.insert(5000.into(), 0), Err(FullError {})); + writer.clear(); + assert!(writer.insert(5000.into(), 0).is_ok()); } #[test] fn test_idx_remove() { - let mut writer = HashMapInit::::new_resizeable_named( - 1500, 2000, "test_clear" - ).attach_writer(); + let mut writer = HashMapInit::::new_resizeable_named(1500, 2000, "test_clear") + .attach_writer(); let mut shadow: std::collections::BTreeMap = BTreeMap::new(); let mut rng = rand::rng(); do_random_ops(2000, 1500, 0.25, &mut writer, &mut shadow, &mut rng); - for _ in 0..100 { - let idx = (rng.next_u32() % 1500) as usize; - if let Some(e) = writer.entry_at_bucket(idx) { - shadow.remove(&e._key); - e.remove(); - } - - } - while let Some((key, val)) = shadow.pop_first() { - assert_eq!(*writer.get(&key).unwrap(), val); - } + for _ in 0..100 { + let idx = (rng.next_u32() % 1500) as usize; + if let Some(e) = writer.entry_at_bucket(idx) { + shadow.remove(&e._key); + e.remove(); + } + } + while let Some((key, val)) = shadow.pop_first() { + assert_eq!(*writer.get(&key).unwrap(), val); + } } #[test] fn test_idx_get() { - let mut writer = HashMapInit::::new_resizeable_named( - 1500, 2000, "test_clear" - ).attach_writer(); + let mut writer = HashMapInit::::new_resizeable_named(1500, 2000, "test_clear") + .attach_writer(); let mut shadow: std::collections::BTreeMap = BTreeMap::new(); let mut rng = rand::rng(); do_random_ops(2000, 1500, 0.25, &mut writer, &mut shadow, &mut rng); - for _ in 0..100 { - let idx = (rng.next_u32() % 1500) as usize; - if let Some(pair) = writer.get_at_bucket(idx) { - { - let v: *const usize = &pair.1; - assert_eq!(writer.get_bucket_for_value(v), idx); - } - { - let v: *const usize = &pair.1; - assert_eq!(writer.get_bucket_for_value(v), idx); - } - } - } + for _ in 0..100 { + let idx = (rng.next_u32() % 1500) as usize; + if let Some(pair) = writer.get_at_bucket(idx) { + { + let v: *const usize = &pair.1; + assert_eq!(writer.get_bucket_for_value(v), idx); + } + { + let v: *const usize = &pair.1; + assert_eq!(writer.get_bucket_for_value(v), idx); + } + } + } } #[test] fn test_shrink() { - let mut writer = HashMapInit::::new_resizeable_named( - 1500, 2000, "test_shrink" - ).attach_writer(); + let mut writer = HashMapInit::::new_resizeable_named(1500, 2000, "test_shrink") + .attach_writer(); let mut shadow: std::collections::BTreeMap = BTreeMap::new(); let mut rng = rand::rng(); - - do_random_ops(10000, 1500, 0.75, &mut writer, &mut shadow, &mut rng); - do_shrink(&mut writer, &mut shadow, 1000); - assert_eq!(writer.get_num_buckets(), 1000); - do_deletes(500, &mut writer, &mut shadow); - do_random_ops(10000, 500, 0.75, &mut writer, &mut shadow, &mut rng); - assert!(writer.get_num_buckets_in_use() <= 1000); + + 
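
The test_idx_get hunk above exercises the bucket-index accessors: get_at_bucket returns the (key, value) pair stored in a given bucket, and get_bucket_for_value maps a pointer to a stored value back to its bucket index. A minimal round-trip sketch under the same assumptions as the previous snippet:

    // Sketch only: round trip between a stored value and its bucket index.
    let mut writer =
        HashMapInit::<TestKey, usize>::new_resizeable_named(1000, 1200, "idx_sketch")
            .attach_writer();
    writer.insert(7.into(), 42).unwrap();

    for idx in 0..writer.get_num_buckets() {
        if let Some(pair) = writer.get_at_bucket(idx) {
            // `pair` dereferences to the stored (key, value); a pointer to the
            // value maps back to the bucket it lives in.
            let v: *const usize = &pair.1;
            assert_eq!(writer.get_bucket_for_value(v), idx);
        }
    }
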
do_random_ops(10000, 1500, 0.75, &mut writer, &mut shadow, &mut rng); + do_shrink(&mut writer, &mut shadow, 1000); + assert_eq!(writer.get_num_buckets(), 1000); + do_deletes(500, &mut writer, &mut shadow); + do_random_ops(10000, 500, 0.75, &mut writer, &mut shadow, &mut rng); + assert!(writer.get_num_buckets_in_use() <= 1000); } #[test] fn test_shrink_grow_seq() { - let mut writer = HashMapInit::::new_resizeable_named( - 1000, 20000, "test_grow_seq" - ).attach_writer(); + let mut writer = + HashMapInit::::new_resizeable_named(1000, 20000, "test_grow_seq") + .attach_writer(); let mut shadow: std::collections::BTreeMap = BTreeMap::new(); let mut rng = rand::rng(); do_random_ops(500, 1000, 0.1, &mut writer, &mut shadow, &mut rng); - eprintln!("Shrinking to 750"); + eprintln!("Shrinking to 750"); do_shrink(&mut writer, &mut shadow, 750); - do_random_ops(200, 1000, 0.5, &mut writer, &mut shadow, &mut rng); - eprintln!("Growing to 1500"); - writer.grow(1500).unwrap(); - do_random_ops(600, 1500, 0.1, &mut writer, &mut shadow, &mut rng); - eprintln!("Shrinking to 200"); - while shadow.len() > 100 { - do_deletes(1, &mut writer, &mut shadow); - } - do_shrink(&mut writer, &mut shadow, 200); - do_random_ops(50, 1500, 0.25, &mut writer, &mut shadow, &mut rng); - eprintln!("Growing to 10k"); - writer.grow(10000).unwrap(); - do_random_ops(10000, 5000, 0.25, &mut writer, &mut shadow, &mut rng); + do_random_ops(200, 1000, 0.5, &mut writer, &mut shadow, &mut rng); + eprintln!("Growing to 1500"); + writer.grow(1500).unwrap(); + do_random_ops(600, 1500, 0.1, &mut writer, &mut shadow, &mut rng); + eprintln!("Shrinking to 200"); + while shadow.len() > 100 { + do_deletes(1, &mut writer, &mut shadow); + } + do_shrink(&mut writer, &mut shadow, 200); + do_random_ops(50, 1500, 0.25, &mut writer, &mut shadow, &mut rng); + eprintln!("Growing to 10k"); + writer.grow(10000).unwrap(); + do_random_ops(10000, 5000, 0.25, &mut writer, &mut shadow, &mut rng); } #[test] fn test_bucket_ops() { - let writer = HashMapInit::::new_resizeable_named( - 1000, 1200, "test_bucket_ops" - ).attach_writer(); - match writer.entry(1.into()) { - Entry::Occupied(mut e) => { e.insert(2); }, - Entry::Vacant(e) => { _ = e.insert(2).unwrap(); }, - } - assert_eq!(writer.get_num_buckets_in_use(), 1); - assert_eq!(writer.get_num_buckets(), 1000); - assert_eq!(*writer.get(&1.into()).unwrap(), 2); - let pos = match writer.entry(1.into()) { - Entry::Occupied(e) => { - assert_eq!(e._key, 1.into()); - let pos = e.bucket_pos as usize; - pos - }, - Entry::Vacant(_) => { panic!("Insert didn't affect entry"); }, - }; - assert_eq!(writer.entry_at_bucket(pos).unwrap()._key, 1.into()); - assert_eq!(*writer.get_at_bucket(pos).unwrap(), (1.into(), 2)); - { - let ptr: *const usize = &*writer.get(&1.into()).unwrap(); - assert_eq!(writer.get_bucket_for_value(ptr), pos); - } - writer.remove(&1.into()); - assert!(writer.get(&1.into()).is_none()); + let writer = HashMapInit::::new_resizeable_named(1000, 1200, "test_bucket_ops") + .attach_writer(); + match writer.entry(1.into()) { + Entry::Occupied(mut e) => { + e.insert(2); + } + Entry::Vacant(e) => { + _ = e.insert(2).unwrap(); + } + } + assert_eq!(writer.get_num_buckets_in_use(), 1); + assert_eq!(writer.get_num_buckets(), 1000); + assert_eq!(*writer.get(&1.into()).unwrap(), 2); + let pos = match writer.entry(1.into()) { + Entry::Occupied(e) => { + assert_eq!(e._key, 1.into()); + let pos = e.bucket_pos as usize; + pos + } + Entry::Vacant(_) => { + panic!("Insert didn't affect entry"); + } + }; + 
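
The test_bucket_ops hunk around this point drives the Entry-based upsert pattern that integrated_cache.rs also relies on later in this patch: Occupied::insert overwrites in place, while Vacant::insert returns a Result because the map may be full. A reduced upsert sketch, same assumptions as above:

    use neon_shmem::hash::entry::Entry;

    // Sketch only: insert-or-overwrite via the Entry API.
    let writer =
        HashMapInit::<TestKey, usize>::new_resizeable_named(1000, 1200, "upsert_sketch")
            .attach_writer();
    match writer.entry(1.into()) {
        Entry::Occupied(mut e) => {
            // Overwrite in place; insert returns the previous value.
            e.insert(2);
        }
        Entry::Vacant(e) => {
            // Vacant::insert is fallible: a full map surfaces as FullError.
            e.insert(2).expect("map full");
        }
    }
    assert_eq!(*writer.get(&1.into()).unwrap(), 2);
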
assert_eq!(writer.entry_at_bucket(pos).unwrap()._key, 1.into()); + assert_eq!(*writer.get_at_bucket(pos).unwrap(), (1.into(), 2)); + { + let ptr: *const usize = &*writer.get(&1.into()).unwrap(); + assert_eq!(writer.get_bucket_for_value(ptr), pos); + } + writer.remove(&1.into()); + assert!(writer.get(&1.into()).is_none()); } #[test] fn test_shrink_zero() { - let mut writer = HashMapInit::::new_resizeable_named( - 1500, 2000, "test_shrink_zero" - ).attach_writer(); - writer.begin_shrink(0); - for i in 0..1500 { - writer.entry_at_bucket(i).map(|x| x.remove()); - } - writer.finish_shrink().unwrap(); - assert_eq!(writer.get_num_buckets_in_use(), 0); - let entry = writer.entry(1.into()); - if let Entry::Vacant(v) = entry { - assert!(v.insert(2).is_err()); - } else { - panic!("Somehow got non-vacant entry in empty map.") - } - writer.grow(50).unwrap(); - let entry = writer.entry(1.into()); - if let Entry::Vacant(v) = entry { - assert!(v.insert(2).is_ok()); - } else { - panic!("Somehow got non-vacant entry in empty map.") - } - assert_eq!(writer.get_num_buckets_in_use(), 1); + let mut writer = + HashMapInit::::new_resizeable_named(1500, 2000, "test_shrink_zero") + .attach_writer(); + writer.begin_shrink(0); + for i in 0..1500 { + writer.entry_at_bucket(i).map(|x| x.remove()); + } + writer.finish_shrink().unwrap(); + assert_eq!(writer.get_num_buckets_in_use(), 0); + let entry = writer.entry(1.into()); + if let Entry::Vacant(v) = entry { + assert!(v.insert(2).is_err()); + } else { + panic!("Somehow got non-vacant entry in empty map.") + } + writer.grow(50).unwrap(); + let entry = writer.entry(1.into()); + if let Entry::Vacant(v) = entry { + assert!(v.insert(2).is_ok()); + } else { + panic!("Somehow got non-vacant entry in empty map.") + } + assert_eq!(writer.get_num_buckets_in_use(), 1); } #[test] #[should_panic] fn test_grow_oom() { - let writer = HashMapInit::::new_resizeable_named( - 1500, 2000, "test_grow_oom" - ).attach_writer(); - writer.grow(20000).unwrap(); + let writer = HashMapInit::::new_resizeable_named(1500, 2000, "test_grow_oom") + .attach_writer(); + writer.grow(20000).unwrap(); } #[test] #[should_panic] fn test_shrink_bigger() { - let mut writer = HashMapInit::::new_resizeable_named( - 1500, 2500, "test_shrink_bigger" - ).attach_writer(); - writer.begin_shrink(2000); + let mut writer = + HashMapInit::::new_resizeable_named(1500, 2500, "test_shrink_bigger") + .attach_writer(); + writer.begin_shrink(2000); } #[test] #[should_panic] fn test_shrink_early_finish() { - let writer = HashMapInit::::new_resizeable_named( - 1500, 2500, "test_shrink_early_finish" - ).attach_writer(); - writer.finish_shrink().unwrap(); + let writer = + HashMapInit::::new_resizeable_named(1500, 2500, "test_shrink_early_finish") + .attach_writer(); + writer.finish_shrink().unwrap(); } #[test] #[should_panic] fn test_shrink_fixed_size() { - let mut area = [MaybeUninit::uninit(); 10000]; + let mut area = [MaybeUninit::uninit(); 10000]; let init_struct = HashMapInit::::with_fixed(3, &mut area); let mut writer = init_struct.attach_writer(); - writer.begin_shrink(1); + writer.begin_shrink(1); } diff --git a/libs/neon-shmem/src/shmem.rs b/libs/neon-shmem/src/shmem.rs index ea654337ff..f19f402859 100644 --- a/libs/neon-shmem/src/shmem.rs +++ b/libs/neon-shmem/src/shmem.rs @@ -76,19 +76,15 @@ impl ShmemHandle { Self::new_with_fd(fd, initial_size, max_size) } - fn new_with_fd( - fd: OwnedFd, - initial_size: usize, - max_size: usize, - ) -> Result { + fn new_with_fd(fd: OwnedFd, initial_size: usize, max_size: usize) -> 
Result { // We reserve the high-order bit for the `RESIZE_IN_PROGRESS` flag, and the actual size // is a little larger than this because of the SharedStruct header. Make the upper limit // somewhat smaller than that, because with anything close to that, you'll run out of // memory anyway. assert!(max_size < 1 << 48, "max size {max_size} too large"); - + assert!( - initial_size <= max_size, + initial_size <= max_size, "initial size {initial_size} larger than max size {max_size}" ); @@ -150,12 +146,12 @@ impl ShmemHandle { let shared = self.shared(); assert!( - new_size <= self.max_size, + new_size <= self.max_size, "new size ({new_size}) is greater than max size ({})", - self.max_size + self.max_size ); - assert_eq!(self.max_size, shared.max_size); + assert_eq!(self.max_size, shared.max_size); // Lock the area by setting the bit in `current_size` // @@ -187,9 +183,8 @@ impl ShmemHandle { let result = { use std::cmp::Ordering::{Equal, Greater, Less}; match new_size.cmp(&old_size) { - Less => nix_ftruncate(&self.fd, new_size as i64).map_err(|e| { - Error::new("could not shrink shmem segment, ftruncate failed", e) - }), + Less => nix_ftruncate(&self.fd, new_size as i64) + .map_err(|e| Error::new("could not shrink shmem segment, ftruncate failed", e)), Equal => Ok(()), Greater => enlarge_file(self.fd.as_fd(), new_size as u64), } @@ -207,7 +202,7 @@ impl ShmemHandle { /// Returns the current user-visible size of the shared memory segment. /// /// NOTE: a concurrent [`ShmemHandle::set_size()`] call can change the size at any time. - /// It is the caller's responsibility not to access the area beyond the current size. + /// It is the caller's responsibility not to access the area beyond the current size. pub fn current_size(&self) -> usize { let total_current_size = self.shared().current_size.load(Ordering::Relaxed) & !RESIZE_IN_PROGRESS; @@ -253,12 +248,8 @@ fn enlarge_file(fd: BorrowedFd, size: u64) -> Result<(), Error> { // we don't get a segfault later when trying to actually use it. #[cfg(not(target_os = "macos"))] { - nix::fcntl::posix_fallocate(fd, 0, size as i64).map_err(|e| { - Error::new( - "could not grow shmem segment, posix_fallocate failed", - e, - ) - }) + nix::fcntl::posix_fallocate(fd, 0, size as i64) + .map_err(|e| Error::new("could not grow shmem segment, posix_fallocate failed", e)) } // As a fallback on macos, which doesn't have posix_fallocate, use plain 'fallocate' #[cfg(target_os = "macos")] diff --git a/libs/neon-shmem/src/sync.rs b/libs/neon-shmem/src/sync.rs index 271923fd74..5a296b4047 100644 --- a/libs/neon-shmem/src/sync.rs +++ b/libs/neon-shmem/src/sync.rs @@ -15,91 +15,90 @@ pub type ValueWriteGuard<'a, T> = lock_api::MappedRwLockWriteGuard<'a, PthreadRw pub struct PthreadRwLock(Option>); impl PthreadRwLock { - pub fn new(lock: *mut libc::pthread_rwlock_t) -> Self { - unsafe { - let mut attrs = MaybeUninit::uninit(); - // Ignoring return value here - only possible error is OOM. - libc::pthread_rwlockattr_init(attrs.as_mut_ptr()); - libc::pthread_rwlockattr_setpshared( - attrs.as_mut_ptr(), - libc::PTHREAD_PROCESS_SHARED - ); - // TODO(quantumish): worth making this function return Result? - libc::pthread_rwlock_init(lock, attrs.as_mut_ptr()); - // Safety: POSIX specifies that "any function affecting the attributes - // object (including destruction) shall not affect any previously - // initialized read-write locks". 
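
PthreadRwLock::new initializes a caller-provided pthread_rwlock_t as PROCESS_SHARED so the lock storage can live in shared memory while still being driven through lock_api. A hedged usage sketch, assuming PthreadRwLock is in scope; a local MaybeUninit stands in for the shared-memory slot, and pthread_rwlock_destroy/cleanup is omitted:

    use lock_api::RawRwLock as _;
    use std::mem::MaybeUninit;

    // Sketch only: in the real code the pthread_rwlock_t lives inside the
    // shared memory mapping; this local slot merely stands in for it.
    let mut storage: MaybeUninit<libc::pthread_rwlock_t> = MaybeUninit::uninit();
    let lock = PthreadRwLock::new(storage.as_mut_ptr());

    // The lock is then driven through the lock_api::RawRwLock impl below.
    lock.lock_exclusive();
    unsafe { lock.unlock_exclusive() };
    assert!(lock.try_lock_shared());
    unsafe { lock.unlock_shared() };
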
- libc::pthread_rwlockattr_destroy(attrs.as_mut_ptr()); - Self(Some(NonNull::new_unchecked(lock))) - } - } - - fn inner(&self) -> NonNull { - match self.0 { - None => panic!("PthreadRwLock constructed badly - something likely used RawMutex::INIT"), - Some(x) => x, - } - } + pub fn new(lock: *mut libc::pthread_rwlock_t) -> Self { + unsafe { + let mut attrs = MaybeUninit::uninit(); + // Ignoring return value here - only possible error is OOM. + libc::pthread_rwlockattr_init(attrs.as_mut_ptr()); + libc::pthread_rwlockattr_setpshared(attrs.as_mut_ptr(), libc::PTHREAD_PROCESS_SHARED); + // TODO(quantumish): worth making this function return Result? + libc::pthread_rwlock_init(lock, attrs.as_mut_ptr()); + // Safety: POSIX specifies that "any function affecting the attributes + // object (including destruction) shall not affect any previously + // initialized read-write locks". + libc::pthread_rwlockattr_destroy(attrs.as_mut_ptr()); + Self(Some(NonNull::new_unchecked(lock))) + } + } + + fn inner(&self) -> NonNull { + match self.0 { + None => { + panic!("PthreadRwLock constructed badly - something likely used RawMutex::INIT") + } + Some(x) => x, + } + } } unsafe impl lock_api::RawRwLock for PthreadRwLock { - type GuardMarker = lock_api::GuardSend; - const INIT: Self = Self(None); - - fn lock_shared(&self) { - unsafe { - let res = libc::pthread_rwlock_rdlock(self.inner().as_ptr()); - if res != 0 { - panic!("rdlock failed with {}", Errno::from_raw(res)); - } - } - } + type GuardMarker = lock_api::GuardSend; + const INIT: Self = Self(None); - fn try_lock_shared(&self) -> bool { - unsafe { - let res = libc::pthread_rwlock_tryrdlock(self.inner().as_ptr()); - match res { - 0 => true, - libc::EAGAIN => false, - _ => panic!("try_rdlock failed with {}", Errno::from_raw(res)), - } - } - } + fn lock_shared(&self) { + unsafe { + let res = libc::pthread_rwlock_rdlock(self.inner().as_ptr()); + if res != 0 { + panic!("rdlock failed with {}", Errno::from_raw(res)); + } + } + } - fn lock_exclusive(&self) { - unsafe { - let res = libc::pthread_rwlock_wrlock(self.inner().as_ptr()); - if res != 0 { - panic!("wrlock failed with {}", Errno::from_raw(res)); - } - } - } + fn try_lock_shared(&self) -> bool { + unsafe { + let res = libc::pthread_rwlock_tryrdlock(self.inner().as_ptr()); + match res { + 0 => true, + libc::EAGAIN => false, + _ => panic!("try_rdlock failed with {}", Errno::from_raw(res)), + } + } + } - fn try_lock_exclusive(&self) -> bool { - unsafe { - let res = libc::pthread_rwlock_trywrlock(self.inner().as_ptr()); - match res { - 0 => true, - libc::EAGAIN => false, - _ => panic!("try_wrlock failed with {}", Errno::from_raw(res)), - } - } - } + fn lock_exclusive(&self) { + unsafe { + let res = libc::pthread_rwlock_wrlock(self.inner().as_ptr()); + if res != 0 { + panic!("wrlock failed with {}", Errno::from_raw(res)); + } + } + } - unsafe fn unlock_exclusive(&self) { - unsafe { - let res = libc::pthread_rwlock_unlock(self.inner().as_ptr()); - if res != 0 { - panic!("unlock failed with {}", Errno::from_raw(res)); - } - } - } - unsafe fn unlock_shared(&self) { - unsafe { - let res = libc::pthread_rwlock_unlock(self.inner().as_ptr()); - if res != 0 { - panic!("unlock failed with {}", Errno::from_raw(res)); - } - } - } + fn try_lock_exclusive(&self) -> bool { + unsafe { + let res = libc::pthread_rwlock_trywrlock(self.inner().as_ptr()); + match res { + 0 => true, + libc::EAGAIN => false, + _ => panic!("try_wrlock failed with {}", Errno::from_raw(res)), + } + } + } + + unsafe fn unlock_exclusive(&self) { + unsafe { 
+ let res = libc::pthread_rwlock_unlock(self.inner().as_ptr()); + if res != 0 { + panic!("unlock failed with {}", Errno::from_raw(res)); + } + } + } + unsafe fn unlock_shared(&self) { + unsafe { + let res = libc::pthread_rwlock_unlock(self.inner().as_ptr()); + if res != 0 { + panic!("unlock failed with {}", Errno::from_raw(res)); + } + } + } } diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index 5223871a71..193039f6af 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -102,10 +102,8 @@ impl<'t> IntegratedCacheInitStruct<'t> { // Initialize the block map in a separate resizable shared memory area let shmem_handle = ShmemHandle::new("block mapping", 0, max_bytes).unwrap(); - let block_map_handle = neon_shmem::hash::HashMapInit::with_shmem( - initial_file_cache_size as u32, - shmem_handle, - ); + let block_map_handle = + neon_shmem::hash::HashMapInit::with_shmem(initial_file_cache_size as u32, shmem_handle); IntegratedCacheInitStruct { relsize_cache_handle, block_map_handle, @@ -343,18 +341,20 @@ impl<'t> IntegratedCacheWriteAccess<'t> { pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) { match self.relsize_cache.entry(RelKey::from(rel)) { - Entry::Vacant(e) => { - tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks"); - // FIXME: what to do if we run out of memory? Evict other relation entries? - _ = e.insert(RelEntry { - nblocks: AtomicU32::new(nblocks), - }).expect("out of memory"); - }, - Entry::Occupied(e) => { - tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks"); - e.get().nblocks.store(nblocks, Ordering::Relaxed); + Entry::Vacant(e) => { + tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks"); + // FIXME: what to do if we run out of memory? Evict other relation entries? + _ = e + .insert(RelEntry { + nblocks: AtomicU32::new(nblocks), + }) + .expect("out of memory"); } - }; + Entry::Occupied(e) => { + tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks"); + e.get().nblocks.store(nblocks, Ordering::Relaxed); + } + }; } /// Remember the given page contents in the cache. @@ -380,12 +380,12 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let mut old_cache_block = None; let mut found_existing = false; - // NOTE(quantumish): honoring original semantics here (used to be update_with_fn) - // but I don't see any reason why this has to take a write lock. + // NOTE(quantumish): honoring original semantics here (used to be update_with_fn) + // but I don't see any reason why this has to take a write lock. if let Entry::Occupied(e) = self.block_map.entry(key.clone()) { - let block_entry = e.get(); + let block_entry = e.get(); found_existing = true; - + // Prevent this entry from being evicted let pin_count = block_entry.pinned.fetch_add(1, Ordering::Relaxed); if pin_count > 0 { @@ -395,7 +395,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // buffer is held locked. TODO: check these conditions and tidy this up a little. Seems fragile to just panic. 
panic!("block entry was unexpectedly pinned"); } - + let cache_block = block_entry.cache_block.load(Ordering::Relaxed); old_cache_block = if cache_block != INVALID_CACHE_BLOCK { Some(cache_block) @@ -425,11 +425,11 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // FIXME: unpin the block entry on error // Update the block entry - let entry = self.block_map.entry(key); - assert_eq!(found_existing, matches!(entry, Entry::Occupied(_))); + let entry = self.block_map.entry(key); + assert_eq!(found_existing, matches!(entry, Entry::Occupied(_))); match entry { - Entry::Occupied(e) => { - let block_entry = e.get(); + Entry::Occupied(e) => { + let block_entry = e.get(); // Update the cache block let old_blk = block_entry.cache_block.compare_exchange( INVALID_CACHE_BLOCK, @@ -445,18 +445,20 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let pin_count = block_entry.pinned.fetch_sub(1, Ordering::Relaxed); assert!(pin_count > 0); - } - Entry::Vacant(e) => { - // FIXME: what to do if we run out of memory? Evict other relation entries? Remove - // block entries first? - _ = e.insert(BlockEntry { - lw_lsn: AtomicLsn::new(lw_lsn.0), - cache_block: AtomicU64::new(cache_block), - pinned: AtomicU64::new(0), - referenced: AtomicBool::new(true), - }).expect("out of memory"); } - } + Entry::Vacant(e) => { + // FIXME: what to do if we run out of memory? Evict other relation entries? Remove + // block entries first? + _ = e + .insert(BlockEntry { + lw_lsn: AtomicLsn::new(lw_lsn.0), + cache_block: AtomicU64::new(cache_block), + pinned: AtomicU64::new(0), + referenced: AtomicBool::new(true), + }) + .expect("out of memory"); + } + } } else { // !is_write // @@ -483,26 +485,31 @@ impl<'t> IntegratedCacheWriteAccess<'t> { .expect("error writing to cache"); // FIXME: handle errors gracefully. - match self.block_map.entry(key) { - Entry::Occupied(e) => { - let block_entry = e.get(); - // FIXME: could there be concurrent readers? + match self.block_map.entry(key) { + Entry::Occupied(e) => { + let block_entry = e.get(); + // FIXME: could there be concurrent readers? assert!(block_entry.pinned.load(Ordering::Relaxed) == 0); - let old_cache_block = block_entry.cache_block.swap(cache_block, Ordering::Relaxed); + let old_cache_block = + block_entry.cache_block.swap(cache_block, Ordering::Relaxed); if old_cache_block != INVALID_CACHE_BLOCK { - panic!("remember_page called in !is_write mode, but page is already cached at blk {old_cache_block}"); + panic!( + "remember_page called in !is_write mode, but page is already cached at blk {old_cache_block}" + ); } - }, - Entry::Vacant(e) => { - // FIXME: what to do if we run out of memory? Evict other relation entries? Remove - // block entries first? - _ = e.insert(BlockEntry { - lw_lsn: AtomicLsn::new(lw_lsn.0), - cache_block: AtomicU64::new(cache_block), - pinned: AtomicU64::new(0), - referenced: AtomicBool::new(true), - }).expect("out of memory"); + } + Entry::Vacant(e) => { + // FIXME: what to do if we run out of memory? Evict other relation entries? Remove + // block entries first? 
+ _ = e + .insert(BlockEntry { + lw_lsn: AtomicLsn::new(lw_lsn.0), + cache_block: AtomicU64::new(cache_block), + pinned: AtomicU64::new(0), + referenced: AtomicBool::new(true), + }) + .expect("out of memory"); } } } @@ -591,23 +598,22 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // grab the write lock let mut evicted_cache_block = None; if let Some(e) = self.block_map.entry_at_bucket(*clock_hand % num_buckets) { - let old = e.get(); - // note: all the accesses to 'pinned' currently happen + let old = e.get(); + // note: all the accesses to 'pinned' currently happen // within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent // updates. Otherwise, another thread could set the 'pinned' // flag just after we have checked it here. if old.pinned.load(Ordering::Relaxed) == 0 { - let _ = self - .global_lw_lsn - .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); - let cache_block = old - .cache_block - .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); - if cache_block != INVALID_CACHE_BLOCK { - evicted_cache_block = Some(cache_block); - } - e.remove(); - } + let _ = self + .global_lw_lsn + .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); + let cache_block = + old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); + if cache_block != INVALID_CACHE_BLOCK { + evicted_cache_block = Some(cache_block); + } + e.remove(); + } } if evicted_cache_block.is_some() {
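
The eviction path above is a clock sweep over bucket indexes: entry_at_bucket addresses an arbitrary slot, pinned entries are skipped, and the victim's cache block is reclaimed before the mapping itself is removed. A reduced sketch of one sweep, assuming `block_map` is the writer-side block map access and `clock_hand: &mut usize` the sweep position as in the surrounding code; the lw_lsn bookkeeping and the `referenced` flag are omitted:

    use std::sync::atomic::Ordering;

    // Sketch only: evict one unpinned block entry, if any.
    let num_buckets = block_map.get_num_buckets();
    let mut evicted_cache_block = None;
    for _ in 0..num_buckets {
        *clock_hand = (*clock_hand + 1) % num_buckets;
        if let Some(e) = block_map.entry_at_bucket(*clock_hand) {
            let entry = e.get();
            // Skip entries pinned by a concurrent remember_page call.
            if entry.pinned.load(Ordering::Relaxed) != 0 {
                continue;
            }
            // Reclaim the cache block (if any), then drop the mapping itself.
            let blk = entry.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
            e.remove();
            if blk != INVALID_CACHE_BLOCK {
                evicted_cache_block = Some(blk);
                break;
            }
        }
    }
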