Document problems and pitfalls with fine-grained hashmap impl
@@ -344,6 +344,7 @@ where
shard_pos: shard_off,
bucket_pos: pos,
bucket_arr: &map.bucket_arr,
key_pos: entry_pos,
}
})
}
@@ -376,7 +377,13 @@ where
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
map.clear();
}

/// Begin a rehash operation. Converts all existing entries
// TODO: missing logic to prevent further resize operations when one is already underway.
// One future feature could be to allow interruptible resizes. We wouldn't pay much of a
// space penalty if we used something like https://crates.io/crates/u4 inside EntryTag
// to allow for many tiers of older chains (we would have to track previous sizes within
// a sliding window at the front of the memory region or something)
fn begin_rehash(
&self,
shards: &mut Vec<RwLockWriteGuard<'_, DictShard<'_, K>>>,
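The TODO above asks for a guard against kicking off a second resize while one is still in flight. Below is a minimal sketch of that check using stand-in types; RehashWindow and try_begin are hypothetical names, and the real counterparts are the rehash_index/rehash_end atomics that CoreHashMap declares later in this diff.

use std::sync::atomic::{AtomicUsize, Ordering};

struct RehashWindow {
    rehash_index: AtomicUsize, // next entry to migrate (mirrors CoreHashMap::rehash_index)
    rehash_end: AtomicUsize,   // end of the range being migrated (mirrors CoreHashMap::rehash_end)
}

impl RehashWindow {
    /// Refuse to begin a new resize while the previous window has not been drained.
    fn try_begin(&self, new_end: usize) -> bool {
        if self.rehash_index.load(Ordering::Relaxed) < self.rehash_end.load(Ordering::Relaxed) {
            return false; // a rehash is already underway
        }
        self.rehash_index.store(0, Ordering::Relaxed);
        self.rehash_end.store(new_end, Ordering::Relaxed);
        true
    }
}

In the real map such a check would presumably run while the shard write locks are held, so the separate load/store pair would not itself need to be a single atomic operation.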
@@ -402,18 +409,23 @@ where
true
}

// TODO(quantumish): off by one for return value logic?
// Unfinished, final large-ish piece standing in the way of a prototype.
//
// Based off the hashbrown implementation but adapted to an incremental context. See below:
// https://github.com/quantumish/hashbrown/blob/6610e6d2b1f288ef7b0709a3efefbc846395dc5e/src/raw/mod.rs#L2866
fn do_rehash(&self) -> bool {
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
// TODO(quantumish): refactor these out into settable quantities
const REHASH_CHUNK_SIZE: usize = 10;
const REHASH_ATTEMPTS: usize = 5;

let end = map.rehash_end.load(Ordering::Relaxed);
let ind = map.rehash_index.load(Ordering::Relaxed);
if ind >= end { return true }

// We have to use a mutex to prevent concurrent rehashes as they provide a pretty
// obvious chance at a deadlock: one thread wants to rehash an entry into a shard
// which is held by another thread which wants to rehash its block into the shard
// held by the first. Doesn't seem like there's an obvious way around this?
let _guard = self.resize_lock.try_lock();
if _guard.is_none() { return false }
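The chunked structure of do_rehash is easier to see in isolation. The sketch below restates the same pattern with plain std types: take the resize lock with try_lock and bail out instead of blocking (which is what avoids the shard-lock deadlock described above), then migrate at most one chunk of entries and advance rehash_index. Rehasher and rehash_one are stand-ins rather than the project's types, and the real resize_lock appears to be a parking_lot-style lock whose try_lock returns an Option rather than a Result.

use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};

const REHASH_CHUNK_SIZE: usize = 10;

struct Rehasher {
    resize_lock: Mutex<()>,    // serializes rehash work so only one thread migrates at a time
    rehash_index: AtomicUsize, // next entry to migrate
    rehash_end: AtomicUsize,   // one past the last entry that needs migrating
}

impl Rehasher {
    /// Returns true once every old entry has been migrated.
    fn do_rehash_step(&self, mut rehash_one: impl FnMut(usize)) -> bool {
        let end = self.rehash_end.load(Ordering::Relaxed);
        let ind = self.rehash_index.load(Ordering::Relaxed);
        if ind >= end { return true; }

        // Only one thread rehashes at a time; others simply report "not done yet".
        let Ok(_guard) = self.resize_lock.try_lock() else { return false };

        let stop = (ind + REHASH_CHUNK_SIZE).min(end);
        for pos in ind..stop {
            rehash_one(pos); // move one old-chain entry into its new chain
        }
        self.rehash_index.store(stop, Ordering::Relaxed);
        stop >= end
    }
}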
@@ -438,27 +450,31 @@ where
EntryTag::Tombstone => core::MapEntryType::Skip,
_ => core::MapEntryType::Tombstone,
}).unwrap();
let new_pos = new.pos();

match new.tag() {
EntryTag::Empty | EntryTag::RehashTombstone => {
shard.keys[shard_off].tag = EntryTag::Empty;
unsafe {
std::mem::swap(
shard.keys[shard_off].val.assume_init_mut(),
new.
},
EntryTag::Rehash => {
// I believe the blocker here is that, unfortunately, this would require
// duplicating a lot of the logic of a write lookup again but with the caveat
// that we're already holding one of the shard locks and need to pass that
// context on. One thing I was considering at the time was using a hashmap to
// manage the lock guards and passing that around?
todo!("finish rehash implementation")
// match new.tag() {
// EntryTag::Empty | EntryTag::RehashTombstone => {
// shard.keys[shard_off].tag = EntryTag::Empty;
// unsafe {
// std::mem::swap(
// shard.keys[shard_off].val.assume_init_mut(),
// new.
// },
// EntryTag::Rehash => {

},
_ => unreachable!()
}
// },
// _ => unreachable!()
// }
}
}
false
}

pub fn finish_rehash(&self) {
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
while self.do_rehash() {}
@@ -572,7 +588,7 @@ where
pub fn shrink_goal(&self) -> Option<usize> {
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
let goal = map.bucket_arr.alloc_limit.load(Ordering::Relaxed);
goal.next_checkeddd()
goal.next_checked()
}

pub fn finish_shrink(&self) -> Result<(), shmem::Error> {
@@ -582,7 +598,7 @@ where

let num_buckets = map.bucket_arr.alloc_limit
.load(Ordering::Relaxed)
.next_checkeddd()
.next_checked()
.expect("called finish_shrink when no shrink is in progress");

if map.get_num_buckets() == num_buckets {

@@ -1,3 +1,20 @@
//! Lock-free stable array of buckets managed with a freelist.
//!
//! Since the positions of entries in the dictionary and the bucket array are not correlated,
//! we either had to separately shard both and deal with the overhead of two lock acquisitions
//! per read/write, or make the bucket array lock free. This is *generally* fine since most
//! accesses of the bucket array are done while holding the lock on the corresponding dict shard
//! and thus synchronized. May not hold up to the removals done by the LFC which is a problem.
//!
//! Routines are pretty closely adapted from https://timharris.uk/papers/2001-disc.pdf
//!
//! Notable caveats:
//! - Can only store around 2^30 entries, which is actually only 10x our current workload.
//! - This is because we need two tag bits to distinguish full/empty and marked/unmarked entries.
//! - Has not been seriously tested.
//!
//! Full entries also store the index to their corresponding dictionary entry in order
//! to enable .entry_at_bucket() which is needed for the clock eviction algo in the LFC.

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
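The "two tag bits" caveat above comes straight from the index encoding BucketIdx uses below: the top two bits of the u32 carry the tag (the diff shows FULL_TAG = 0b10 << 30 and RSVD_TAG = 0b11 << 30) and the low 30 bits carry the index, which is what caps the structure at roughly 2^30 entries. A self-contained sketch of that packing follows; NEXT_TAG's value is an assumption, since this hunk does not show it.

const TAG_MASK: u32 = 0b11 << 30; // two tag bits, as described in the module docs
const IDX_MASK: u32 = !TAG_MASK;  // low 30 bits => at most ~2^30 distinct indices
const FULL_TAG: u32 = 0b10 << 30; // value shown in the diff
const NEXT_TAG: u32 = 0b00 << 30; // assumption: the diff does not show NEXT_TAG's value

fn pack(tag: u32, idx: u32) -> u32 {
    debug_assert_eq!(idx & TAG_MASK, 0, "index must fit in 30 bits");
    tag | idx
}

fn unpack(word: u32) -> (u32, u32) {
    (word & TAG_MASK, word & IDX_MASK)
}

fn main() {
    let w = pack(FULL_TAG, 1234);
    assert_eq!(unpack(w), (FULL_TAG, 1234));
    let f = pack(NEXT_TAG, 42);
    assert_eq!(unpack(f), (NEXT_TAG, 42));
}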
@@ -9,6 +26,7 @@ use atomic::Atomic;
#[repr(transparent)]
pub(crate) struct BucketIdx(pub(super) u32);

// This should always be true as `BucketIdx` is a simple newtype.
const _: () = assert!(Atomic::<BucketIdx>::is_lock_free());

impl BucketIdx {
@@ -20,8 +38,10 @@ impl BucketIdx {
const FULL_TAG: u32 = 0b10 << 30;
/// Reserved. Don't use me.
const RSVD_TAG: u32 = 0b11 << 30;

/// Invalid index within the bucket array (can be mixed with any tag).
pub const INVALID: Self = Self(0x3FFFFFFF);
/// Max index within the bucket array (can be mixed with any tag).
pub const MAX: usize = Self::INVALID.0 as usize - 1;

pub(super) fn is_marked(&self) -> bool {
@@ -45,15 +65,17 @@ impl BucketIdx {
debug_assert!(val < Self::MAX);
Self(val as u32 | Self::FULL_TAG)
}

/// Try to extract a valid index if the tag is NEXT.
pub fn next_checked(&self) -> Option<usize> {
if *self == Self::INVALID || self.is_marked() {
None
} else {
if self.0 & Self::RSVD_TAG == Self::NEXT_TAG && *self != Self::INVALID {
Some(self.0 as usize)
} else {
None
}
}

/// Try to extract an index if the tag is FULL.
pub fn full_checked(&self) -> Option<usize> {
if self.0 & Self::RSVD_TAG == Self::FULL_TAG {
Some((self.0 & Self::INVALID.0) as usize)
@@ -63,24 +85,12 @@ impl BucketIdx {
}
}

impl std::fmt::Debug for BucketIdx {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
let idx = self.get_unmarked().0;
write!(
f, "BucketIdx(marked={}, idx={})",
self.is_marked(),
match *self {
Self::INVALID => "INVALID".to_string(),
_ => format!("{idx}")
}
)
}
}

/// A storage unit within the hash table. Either empty or contains a key-value pair.
/// Always part of a chain of some kind (either a freelist if empty or a hash chain if full).
/// Entry within the bucket array. Value is only initialized if you
pub(crate) struct Bucket<V> {
pub val: MaybeUninit<V>,
// Only initialized if `next` field is tagged with FULL.
pub val: MaybeUninit<V>,
// Either points to next entry in freelist if empty or points
// to the corresponding entry in dictionary if full.
pub next: Atomic<BucketIdx>,
}

@@ -92,13 +102,6 @@ impl<V> Bucket<V> {
}
}

pub fn full(val: V) -> Self {
Self {
val: MaybeUninit::new(val),
next: Atomic::new(BucketIdx::INVALID)
}
}

pub fn as_ref(&self) -> &V {
unsafe { self.val.assume_init_ref() }
}
@@ -164,7 +167,9 @@ impl<'a, V> BucketArray<'a, V> {
pub fn len(&self) -> usize {
unsafe { (&*self.buckets.get()).len() }
}

/// Deallocate a bucket, adding it to the free list.
// Adapted from List::insert in https://timharris.uk/papers/2001-disc.pdf
pub fn dealloc_bucket(&self, pos: usize) -> V {
loop {
let free = self.free_head.load(Ordering::Relaxed);
@@ -178,6 +183,8 @@ impl<'a, V> BucketArray<'a, V> {
}
}

/// Find a usable bucket at the front of the free list.
// Adapted from List::search in https://timharris.uk/papers/2001-disc.pdf
#[allow(unused_assignments)]
fn find_bucket(&self) -> (BucketIdx, BucketIdx) {
let mut left_node = BucketIdx::INVALID;
@@ -229,9 +236,10 @@ impl<'a, V> BucketArray<'a, V> {
}
}

/// Pop a bucket from the free list.
// Adapted from List::delete in https://timharris.uk/papers/2001-disc.pdf
#[allow(unused_assignments)]
pub(crate) fn alloc_bucket(&self, value: V, key_pos: usize) -> Option<BucketIdx> {
// println!("alloc()");
let mut right_node_next = BucketIdx::INVALID;
let mut left_idx = BucketIdx::INVALID;
let mut right_idx = BucketIdx::INVALID;
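For orientation, the three routines above (dealloc_bucket, find_bucket, alloc_bucket) maintain an index-based free list threaded through the buckets' next fields. The sketch below is a deliberately simplified Treiber-style version of that idea using std atomics: push a slot on deallocation, pop one on allocation. It ignores the mark bit, tagging, and ABA concerns that the real code inherits from the Harris list linked above, so it illustrates the shape rather than the actual algorithm.

use std::sync::atomic::{AtomicU32, Ordering};

const NIL: u32 = u32::MAX; // stand-in for BucketIdx::INVALID

struct FreeList {
    next: Vec<AtomicU32>, // next[i] = index of the next free slot after slot i
    head: AtomicU32,      // index of the first free slot, or NIL
}

impl FreeList {
    /// Push slot `pos` back onto the free list (roughly what dealloc_bucket does after moving the value out).
    fn push(&self, pos: usize) {
        loop {
            let head = self.head.load(Ordering::Acquire);
            self.next[pos].store(head, Ordering::Relaxed);
            if self.head
                .compare_exchange(head, pos as u32, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                return;
            }
        }
    }

    /// Pop a free slot, or None if nothing is free (roughly what alloc_bucket does before writing the value).
    fn pop(&self) -> Option<usize> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            if head == NIL {
                return None;
            }
            let next = self.next[head as usize].load(Ordering::Relaxed);
            if self.head
                .compare_exchange(head, next, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                return Some(head as usize);
            }
        }
    }
}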

@@ -1,4 +1,8 @@
//! Simple hash table with chaining.
//! Sharded linear probing hash table.

//! NOTE/FIXME: one major bug with this design is that the current hashmap DOES NOT TRACK
//! the previous size of the hashmap and thus does lookups incorrectly/badly. This should
//! be a reasonably minor fix?

use std::cell::UnsafeCell;
use std::hash::Hash;
@@ -11,27 +15,45 @@ use crate::hash::{
bucket::{BucketArray, Bucket, BucketIdx}
};

/// Metadata tag for the type of an entry in the hashmap.
#[derive(PartialEq, Eq, Clone, Copy)]
pub(crate) enum EntryTag {
/// An occupied entry inserted after a resize operation.
Occupied,
/// An occupied entry inserted before a resize operation
/// a.k.a. an entry that needs to be rehashed at some point.
Rehash,
/// An entry that was once `Occupied`.
Tombstone,
/// An entry that was once `Rehash`.
RehashTombstone,
/// An empty entry.
Empty,
}

/// Searching the chains of a hashmap oftentimes requires interpreting
/// a set of metadata tags differently. This enum encodes the ways a
/// metadata tag can be treated during a lookup.
pub(crate) enum MapEntryType {
/// Should be treated as if it were occupied.
Occupied,
/// Should be treated as if it were a tombstone.
Tombstone,
/// Should be treated as if it were empty.
Empty,
/// Should be ignored.
Skip
}
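As a concrete example of how EntryTag and MapEntryType interact, the filter below describes a plain "current generation only" lookup, using the same closure shape (Fn(EntryTag) -> MapEntryType) that get_with_hash and entry pass to get() further down in this diff. It is a sketch written against the two enums above, not code from the commit; the arm grouping follows the comment in get_with_hash that to-be-rehashed entries are treated as tombstones.

fn current_chain_filter(tag: EntryTag) -> MapEntryType {
    match tag {
        EntryTag::Empty => MapEntryType::Empty,
        EntryTag::Occupied => MapEntryType::Occupied,
        // Entries awaiting rehash behave like tombstones in a current-generation chain,
        // so probing continues past them instead of stopping.
        EntryTag::Tombstone | EntryTag::Rehash | EntryTag::RehashTombstone => MapEntryType::Tombstone,
    }
}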

/// A key within the dictionary component of the hashmap.
pub(crate) struct EntryKey<K> {
// NOTE: This could be split out to save 3 bytes per entry!
// Wasn't sure it was worth the penalty of another shmem area.
pub(crate) tag: EntryTag,
pub(crate) val: MaybeUninit<K>,
}

/// A shard of the dictionary.
pub(crate) struct DictShard<'a, K> {
pub(crate) keys: &'a mut [EntryKey<K>],
pub(crate) idxs: &'a mut [BucketIdx],
@@ -43,7 +65,7 @@ impl<'a, K> DictShard<'a, K> {
}
}

pub(crate) struct MaybeUninitDictShard<'a, K> {
pub(crate) struct MaybeUninitDictShard<'a, K> {
pub(crate) keys: &'a mut [MaybeUninit<EntryKey<K>>],
pub(crate) idxs: &'a mut [MaybeUninit<BucketIdx>],
}
@@ -52,8 +74,11 @@ pub(crate) struct MaybeUninitDictShard<'a, K> {
pub(crate) struct CoreHashMap<'a, K, V> {
/// Dictionary used to map hashes to bucket indices.
pub(crate) dict_shards: &'a mut [RwLock<DictShard<'a, K>>],
/// Stable bucket array used to store the values.
pub(crate) bucket_arr: BucketArray<'a, V>,
pub(crate) rehash_index: AtomicUsize,
/// Index of the next entry to process for rehashing.
pub(crate) rehash_index: AtomicUsize,
/// Index of the end of the range to be rehashed.
pub(crate) rehash_end: AtomicUsize,
}

@@ -108,11 +133,14 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> {
bucket_arr: BucketArray::new(buckets_cell),
}
}

/// Get the value associated with a key (if it exists) given its hash.
pub fn get_with_hash(&'a self, key: &K, hash: u64) -> Option<ValueReadGuard<'a, V>> {
let ind = self.rehash_index.load(Ordering::Relaxed);
let end = self.rehash_end.load(Ordering::Relaxed);

// First search the chains from the current context (thus treat
// to-be-rehashed entries as tombstones within a current chain).
let res = self.get(key, hash, |tag| match tag {
EntryTag::Empty => MapEntryType::Empty,
EntryTag::Occupied => MapEntryType::Occupied,
@@ -121,8 +149,10 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> {
if res.is_some() {
return res;
}

if ind < end {
// Search chains from the previous size of the map if a rehash is in progress.
// Ignore any entries inserted since the resize operation occurred.
self.get(key, hash, |tag| match tag {
EntryTag::Empty => MapEntryType::Empty,
EntryTag::Rehash => MapEntryType::Occupied,
@@ -140,6 +170,8 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> {
let res = self.entry(key.clone(), hash, |tag| match tag {
EntryTag::Empty => MapEntryType::Empty,
EntryTag::Occupied => MapEntryType::Occupied,
// We can't treat old entries as tombstones here, as we definitely can't
// insert over them! Instead we can just skip directly over them.
EntryTag::Rehash => MapEntryType::Skip,
_ => MapEntryType::Tombstone,
});
@@ -159,7 +191,6 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> {
}
}
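The second, rehash-aware pass of get_with_hash is cut off by the hunk above. The sketch below spells out one mapping that is consistent with the surrounding comments (old Rehash entries are live, entries inserted since the resize are ignored); the Occupied and tombstone arms are assumptions, since the diff truncates those lines.

fn old_chain_filter(tag: EntryTag) -> MapEntryType {
    match tag {
        EntryTag::Empty => MapEntryType::Empty,
        // Entries still waiting to be rehashed are the live ones in an old-generation chain.
        EntryTag::Rehash => MapEntryType::Occupied,
        // Inserted after the resize began, so not part of any old chain (assumption).
        EntryTag::Occupied => MapEntryType::Skip,
        EntryTag::Tombstone | EntryTag::RehashTombstone => MapEntryType::Tombstone,
    }
}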

/// Get the value associated with a key (if it exists) given its hash.
fn get<F>(&'a self, key: &K, hash: u64, f: F) -> Option<ValueReadGuard<'a, V>>
where F: Fn(EntryTag) -> MapEntryType
{
@@ -191,26 +222,39 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> {
None
}

pub fn entry<F>(&'a self, key: K, hash: u64, f: F) -> Result<Entry<'a, K, V>, FullError>
where F: Fn(EntryTag) -> MapEntryType
{
// We need to keep holding on to the locks for each shard we process since if we don't find the
// key anywhere, we want to insert it at the earliest possible position (which may be several
// shards away). Ideally cross-shard chains are quite rare, so this shouldn't be a big deal.
//
// NB: Somewhat real chance of a deadlock! E.g. one thread has a ridiculously long chain that
// starts at block N and wraps around the hashmap to N-1, yet another thread begins a lookup at
// N-1 during this and has a chain that lasts a few shards. Then thread 1 is blocked on thread 2
// to get to shard N-1 but thread 2 is blocked on thread 1 to get to shard N. Pretty fringe case
// since chains shouldn't last very long, but still a problem with this somewhat naive sharding
// mechanism.
//
// We could fix this by either refusing to hold locks and only inserting into the earliest entry
// within the current shard (which effectively means after a while we forget about certain open
// entries at the end of shards) or by pivoting to a more involved concurrency setup?
let mut shards = Vec::new();
let mut insert_pos = None;
let mut insert_shard = None;

let num_buckets = self.get_num_buckets();
let shard_size = num_buckets / self.dict_shards.len();
let bucket_pos = hash as usize % num_buckets;
let shard_start = bucket_pos / shard_size;
let mut entry_pos = hash as usize % num_buckets;
let shard_start = entry_pos / shard_size;
for off in 0..self.dict_shards.len() {
let shard_idx = (shard_start + off) % self.dict_shards.len();
let shard = self.dict_shards[shard_idx].write();
let mut inserted = false;
let entry_start = if off == 0 { bucket_pos % shard_size } else { 0 };
let entry_start = if off == 0 { entry_pos % shard_size } else { 0 };
for entry_idx in entry_start..shard.len() {
entry_pos += 1;
match f(shard.keys[entry_idx].tag) {
MapEntryType::Skip => continue,
MapEntryType::Empty => {
@@ -243,6 +287,7 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> {
shard_pos: entry_idx,
bucket_pos,
bucket_arr: &self.bucket_arr,
key_pos: entry_pos,
}));
}
}
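The shard/offset arithmetic in the probe loop above is easier to check with concrete numbers. A small self-contained example (all sizes are made up for illustration):

fn main() {
    // Made-up sizes: 1024 dictionary slots split over 16 shards of 64 entries each.
    let num_buckets = 1024usize;
    let num_shards = 16usize;
    let shard_size = num_buckets / num_shards; // 64

    let hash = 0x1_2345_6789u64;
    let entry_pos = hash as usize % num_buckets; // global slot the probe starts at
    let shard_start = entry_pos / shard_size;    // shard holding that slot
    let entry_start = entry_pos % shard_size;    // offset of the slot inside that shard

    assert_eq!(entry_pos, shard_start * shard_size + entry_start);
    println!("start probing shard {shard_start} at offset {entry_start} (global slot {entry_pos})");
    // Later shards in the probe (off > 0) are scanned from offset 0, wrapping modulo the shard count.
}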

@@ -8,23 +8,11 @@ use crate::sync::{RwLockWriteGuard, ValueWriteGuard};

use std::hash::Hash;

use super::core::EntryKey;

pub enum Entry<'a, K, V> {
Occupied(OccupiedEntry<'a, K, V>),
Vacant(VacantEntry<'a, K, V>),
}

impl<'a, K, V> Entry<'a, K, V> {
pub fn loc(&self) -> (RwLockWriteGuard<'a, DictShard<'a, K>>, usize) {
match self {
Self::Occupied(o) => o.shard.keys[o.shard_pos].tag,
Self::Vacant(o) => o.shard.keys[o.shard_pos].tag
}
}

}

pub struct OccupiedEntry<'a, K, V> {
/// Mutable reference to the shard of the map the entry is in.
pub(crate) shard: RwLockWriteGuard<'a, DictShard<'a, K>>,
@@ -81,7 +69,7 @@ impl<'a, K: Clone + Hash + Eq, V> VacantEntry<'a, K, V> {
.expect("bucket is available if entry is");
self.shard.keys[self.shard_pos].tag = EntryTag::Occupied;
self.shard.keys[self.shard_pos].val.write(self._key);
let idx = pos.next_checkeddd().expect("position is valid");
let idx = pos.next_checked().expect("position is valid");
self.shard.idxs[self.shard_pos] = pos;

RwLockWriteGuard::map(self.shard, |_| {