mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 01:50:38 +00:00
Generalize map to allow arbitrary hash fns, add clear() helper method
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3843,9 +3843,11 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
|
||||
name = "neon-shmem"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"criterion",
|
||||
"nix 0.30.1",
|
||||
"rand 0.9.1",
|
||||
"rand_distr 0.5.1",
|
||||
"rustc-hash 1.1.0",
|
||||
"tempfile",
|
||||
"thiserror 1.0.69",
|
||||
"workspace_hack",
|
||||
|
||||
@@ -8,10 +8,22 @@ license.workspace = true
|
||||
thiserror.workspace = true
|
||||
nix.workspace = true
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
rustc-hash = { version = "2.1.1" }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = { workspace = true, features = ["html_reports"] }
|
||||
rand = "0.9.1"
|
||||
rand_distr = "0.5.1"
|
||||
xxhash-rust = { version = "0.8.15", features = ["xxh3"] }
|
||||
ahash.workspace = true
|
||||
|
||||
[target.'cfg(target_os = "macos")'.dependencies]
|
||||
tempfile = "3.14.0"
|
||||
|
||||
[[bench]]
|
||||
name = "hmap_resize"
|
||||
harness = false
|
||||
|
||||
[[bin]]
|
||||
name = "hmap_test"
|
||||
path = "main.rs"
|
||||
|
||||
@@ -8,9 +8,11 @@
|
||||
//! [ ] Resizable
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::hash::{DefaultHasher, Hash, Hasher};
|
||||
use std::hash::{Hash, Hasher, BuildHasher};
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
use rustc_hash::FxBuildHasher;
|
||||
|
||||
use crate::shmem::ShmemHandle;
|
||||
|
||||
mod core;
|
||||
@@ -25,29 +27,32 @@ use entry::{Entry, OccupiedEntry, PrevPos};
|
||||
#[derive(Debug)]
|
||||
pub struct OutOfMemoryError();
|
||||
|
||||
pub struct HashMapInit<'a, K, V> {
|
||||
pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> {
|
||||
// Hash table can be allocated in a fixed memory area, or in a resizeable ShmemHandle.
|
||||
shmem_handle: Option<ShmemHandle>,
|
||||
shared_ptr: *mut HashMapShared<'a, K, V>,
|
||||
hasher: S,
|
||||
}
|
||||
|
||||
pub struct HashMapAccess<'a, K, V> {
|
||||
pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> {
|
||||
shmem_handle: Option<ShmemHandle>,
|
||||
shared_ptr: *mut HashMapShared<'a, K, V>,
|
||||
hasher: S,
|
||||
}
|
||||
|
||||
unsafe impl<'a, K: Sync, V: Sync> Sync for HashMapAccess<'a, K, V> {}
|
||||
unsafe impl<'a, K: Send, V: Send> Send for HashMapAccess<'a, K, V> {}
|
||||
unsafe impl<'a, K: Sync, V: Sync, S> Sync for HashMapAccess<'a, K, V, S> {}
|
||||
unsafe impl<'a, K: Send, V: Send, S> Send for HashMapAccess<'a, K, V, S> {}
|
||||
|
||||
impl<'a, K, V> HashMapInit<'a, K, V> {
|
||||
pub fn attach_writer(self) -> HashMapAccess<'a, K, V> {
|
||||
impl<'a, K, V, S> HashMapInit<'a, K, V, S> {
|
||||
pub fn attach_writer(self) -> HashMapAccess<'a, K, V, S> {
|
||||
HashMapAccess {
|
||||
shmem_handle: self.shmem_handle,
|
||||
shared_ptr: self.shared_ptr,
|
||||
hasher: self.hasher,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn attach_reader(self) -> HashMapAccess<'a, K, V> {
|
||||
pub fn attach_reader(self) -> HashMapAccess<'a, K, V, S> {
|
||||
// no difference to attach_writer currently
|
||||
self.attach_writer()
|
||||
}
|
||||
@@ -65,42 +70,60 @@ impl<'a, K, V> HashMapInit<'a, K, V> {
|
||||
///
|
||||
/// In between the above parts, there can be padding bytes to align the parts correctly.
|
||||
struct HashMapShared<'a, K, V> {
|
||||
inner: CoreHashMap<'a, K, V>,
|
||||
inner: CoreHashMap<'a, K, V>
|
||||
}
|
||||
|
||||
impl<'a, K, V> HashMapInit<'a, K, V>
|
||||
impl<'a, K, V> HashMapInit<'a, K, V, rustc_hash::FxBuildHasher>
|
||||
where
|
||||
K: Clone + Hash + Eq,
|
||||
K: Clone + Hash + Eq
|
||||
{
|
||||
pub fn init_in_fixed_area(
|
||||
num_buckets: u32,
|
||||
area: &'a mut [MaybeUninit<u8>],
|
||||
) -> HashMapInit<'a, K, V> {
|
||||
Self::init_in_fixed_area_with_hasher(num_buckets, area, rustc_hash::FxBuildHasher::default())
|
||||
}
|
||||
|
||||
/// Initialize a new hash map in the given shared memory area
|
||||
pub fn init_in_shmem(num_buckets: u32, shmem: ShmemHandle) -> HashMapInit<'a, K, V> {
|
||||
Self::init_in_shmem_with_hasher(num_buckets, shmem, rustc_hash::FxBuildHasher::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, K, V, S: BuildHasher> HashMapInit<'a, K, V, S>
|
||||
where
|
||||
K: Clone + Hash + Eq
|
||||
{
|
||||
pub fn estimate_size(num_buckets: u32) -> usize {
|
||||
// add some margin to cover alignment etc.
|
||||
CoreHashMap::<K, V>::estimate_size(num_buckets) + size_of::<HashMapShared<K, V>>() + 1000
|
||||
}
|
||||
|
||||
pub fn init_in_fixed_area(
|
||||
num_buckets: u32,
|
||||
area: &'a mut [MaybeUninit<u8>],
|
||||
) -> HashMapInit<'a, K, V> {
|
||||
Self::init_common(num_buckets, None, area.as_mut_ptr().cast(), area.len())
|
||||
}
|
||||
|
||||
/// Initialize a new hash map in the given shared memory area
|
||||
pub fn init_in_shmem(num_buckets: u32, mut shmem: ShmemHandle) -> HashMapInit<'a, K, V> {
|
||||
|
||||
pub fn init_in_shmem_with_hasher(num_buckets: u32, mut shmem: ShmemHandle, hasher: S) -> HashMapInit<'a, K, V, S> {
|
||||
let size = Self::estimate_size(num_buckets);
|
||||
shmem
|
||||
.set_size(size)
|
||||
.expect("could not resize shared memory area");
|
||||
|
||||
let ptr = unsafe { shmem.data_ptr.as_mut() };
|
||||
Self::init_common(num_buckets, Some(shmem), ptr, size)
|
||||
Self::init_common(num_buckets, Some(shmem), ptr, size, hasher)
|
||||
}
|
||||
|
||||
pub fn init_in_fixed_area_with_hasher(
|
||||
num_buckets: u32,
|
||||
area: &'a mut [MaybeUninit<u8>],
|
||||
hasher: S,
|
||||
) -> HashMapInit<'a, K, V, S> {
|
||||
Self::init_common(num_buckets, None, area.as_mut_ptr().cast(), area.len(), hasher)
|
||||
}
|
||||
|
||||
fn init_common(
|
||||
num_buckets: u32,
|
||||
shmem_handle: Option<ShmemHandle>,
|
||||
area_ptr: *mut u8,
|
||||
area_len: usize,
|
||||
) -> HashMapInit<'a, K, V> {
|
||||
hasher: S,
|
||||
) -> HashMapInit<'a, K, V, S> {
|
||||
// carve out the HashMapShared struct from the area.
|
||||
let mut ptr: *mut u8 = area_ptr;
|
||||
let end_ptr: *mut u8 = unsafe { area_ptr.add(area_len) };
|
||||
@@ -133,18 +156,17 @@ where
|
||||
HashMapInit {
|
||||
shmem_handle,
|
||||
shared_ptr,
|
||||
hasher,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, K, V> HashMapAccess<'a, K, V>
|
||||
impl<'a, K, V, S: BuildHasher> HashMapAccess<'a, K, V, S>
|
||||
where
|
||||
K: Clone + Hash + Eq,
|
||||
{
|
||||
pub fn get_hash_value(&self, key: &K) -> u64 {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
key.hash(&mut hasher);
|
||||
hasher.finish()
|
||||
self.hasher.hash_one(key)
|
||||
}
|
||||
|
||||
pub fn get_with_hash<'e>(&'e self, key: &K, hash: u64) -> Option<&'e V> {
|
||||
@@ -210,6 +232,12 @@ where
|
||||
map.inner.buckets_in_use as usize
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
let inner = &mut map.inner;
|
||||
inner.clear()
|
||||
}
|
||||
|
||||
/// Helper function that abstracts the common logic between growing and shrinking.
|
||||
/// The only significant difference in the rehashing step is how many buckets to rehash.
|
||||
fn rehash_dict(
|
||||
@@ -243,10 +271,7 @@ where
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut hasher = DefaultHasher::new();
|
||||
buckets[i].inner.as_ref().unwrap().0.hash(&mut hasher);
|
||||
let hash = hasher.finish();
|
||||
|
||||
let hash = self.hasher.hash_one(&buckets[i].inner.as_ref().unwrap().0);
|
||||
let pos: usize = (hash % dictionary.len() as u64) as usize;
|
||||
buckets[i].next = dictionary[pos];
|
||||
dictionary[pos] = i as u32;
|
||||
@@ -256,6 +281,23 @@ where
|
||||
inner.dictionary = dictionary;
|
||||
inner.buckets = buckets;
|
||||
}
|
||||
|
||||
/// Rehash the map. Intended for benchmarking only.
|
||||
pub fn shuffle(&mut self) {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
let inner = &mut map.inner;
|
||||
|
||||
let shmem_handle = self
|
||||
.shmem_handle
|
||||
.as_ref()
|
||||
.expect("TODO(quantumish): make shuffle work w/ fixed-size table");
|
||||
let num_buckets = inner.get_num_buckets() as u32;
|
||||
let size_bytes = HashMapInit::<K, V, S>::estimate_size(num_buckets);
|
||||
let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
|
||||
let buckets_ptr = inner.buckets.as_mut_ptr();
|
||||
self.rehash_dict(inner, buckets_ptr, end_ptr, num_buckets, num_buckets);
|
||||
}
|
||||
|
||||
|
||||
/// Grow
|
||||
///
|
||||
@@ -278,7 +320,7 @@ where
|
||||
.as_ref()
|
||||
.expect("grow called on a fixed-size hash table");
|
||||
|
||||
let size_bytes = HashMapInit::<K, V>::estimate_size(num_buckets);
|
||||
let size_bytes = HashMapInit::<K, V, S>::estimate_size(num_buckets);
|
||||
shmem_handle.set_size(size_bytes)?;
|
||||
let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
|
||||
|
||||
@@ -371,7 +413,7 @@ where
|
||||
.as_ref()
|
||||
.expect("shrink called on a fixed-size hash table");
|
||||
|
||||
let size_bytes = HashMapInit::<K, V>::estimate_size(num_buckets);
|
||||
let size_bytes = HashMapInit::<K, V, S>::estimate_size(num_buckets);
|
||||
shmem_handle.set_size(size_bytes)?;
|
||||
let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
|
||||
let buckets_ptr = inner.buckets.as_mut_ptr();
|
||||
|
||||
@@ -50,7 +50,7 @@ where
|
||||
as usize;
|
||||
|
||||
size
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
buckets: &'a mut [MaybeUninit<Bucket<K, V>>],
|
||||
@@ -160,6 +160,33 @@ where
|
||||
self.alloc_limit != INVALID_POS
|
||||
}
|
||||
|
||||
|
||||
// TODO(quantumish): How does this interact with an ongoing shrink?
|
||||
pub fn clear(&mut self) {
|
||||
for i in 0..self.buckets.len() {
|
||||
self.buckets[i] = Bucket {
|
||||
next: if i < self.buckets.len() - 1 {
|
||||
i as u32 + 1
|
||||
} else {
|
||||
INVALID_POS
|
||||
},
|
||||
prev: if i > 0 {
|
||||
PrevPos::Chained(i as u32 - 1)
|
||||
} else {
|
||||
PrevPos::First(INVALID_POS)
|
||||
},
|
||||
inner: None,
|
||||
}
|
||||
}
|
||||
|
||||
for i in 0..self.dictionary.len() {
|
||||
self.dictionary[i] = INVALID_POS;
|
||||
}
|
||||
|
||||
self.buckets_in_use = 0;
|
||||
self.alloc_limit = INVALID_POS;
|
||||
}
|
||||
|
||||
pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<'a, '_, K, V>> {
|
||||
if pos >= self.buckets.len() {
|
||||
return None;
|
||||
|
||||
Reference in New Issue
Block a user