mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 22:12:56 +00:00
nw: add shrinking and deletion skeletons
This commit is contained in:
@@ -292,6 +292,54 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn begin_shrink(&mut self, num_buckets: u32) {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
if num_buckets < map.inner.get_num_buckets() as u32 {
|
||||
panic!("shrink called with a larger number of buckets");
|
||||
}
|
||||
map.inner.alloc_limit = num_buckets;
|
||||
}
|
||||
|
||||
fn finish_shrink(&mut self) -> Result<(), crate::shmem::Error> {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
let inner = &mut map.inner;
|
||||
if !inner.is_shrinking() {
|
||||
panic!("called finish_shrink when no shrink is in progress");
|
||||
}
|
||||
|
||||
let new_num_buckets = inner.alloc_limit;
|
||||
|
||||
if inner.get_num_buckets() == new_num_buckets as usize {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for b in &inner.buckets[new_num_buckets as usize..] {
|
||||
if b.inner.is_some() {
|
||||
// TODO(quantumish) Do we want to treat this as a violation of an invariant
|
||||
// or a legitimate error the caller can run into? Originally I thought this
|
||||
// could return something like a UnevictedError(index) as soon as it runs
|
||||
// into something (that way a caller could clear their soon-to-be-shrinked
|
||||
// buckets by repeatedly trying to call `finish_shrink`).
|
||||
//
|
||||
// Would require making a wider error type enum with this and shmem errors.
|
||||
panic!("unevicted entries in shrinked space")
|
||||
}
|
||||
}
|
||||
|
||||
let shmem_handle = self
|
||||
.shmem_handle
|
||||
.as_ref()
|
||||
.expect("shrink called on a fixed-size hash table");
|
||||
|
||||
let size_bytes = HashMapInit::<K, V>::estimate_size(new_num_buckets);
|
||||
shmem_handle.set_size(size_bytes)?;
|
||||
let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
|
||||
|
||||
let buckets_ptr = inner.buckets.as_mut_ptr();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: Shrinking is a multi-step process that requires co-operation from the caller
|
||||
//
|
||||
// 1. The caller must first call begin_shrink(). That forbids allocation of higher-numbered
|
||||
|
||||
@@ -22,6 +22,7 @@ pub(crate) struct CoreHashMap<'a, K, V> {
|
||||
pub(crate) free_head: u32,
|
||||
|
||||
pub(crate) _user_list_head: u32,
|
||||
pub(crate) alloc_limit: u32,
|
||||
|
||||
// metrics
|
||||
pub(crate) buckets_in_use: u32,
|
||||
@@ -83,6 +84,7 @@ where
|
||||
free_head: 0,
|
||||
buckets_in_use: 0,
|
||||
_user_list_head: INVALID_POS,
|
||||
alloc_limit: INVALID_POS,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,25 +149,50 @@ where
|
||||
self.buckets.len()
|
||||
}
|
||||
|
||||
pub fn is_shrinking(&self) -> bool {
|
||||
self.alloc_limit != INVALID_POS
|
||||
}
|
||||
|
||||
pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<K, V>> {
|
||||
if pos >= self.buckets.len() {
|
||||
return None;
|
||||
}
|
||||
|
||||
todo!()
|
||||
//self.buckets[pos].inner.as_ref()
|
||||
let entry = self.buckets[pos].inner.as_ref();
|
||||
if entry.is_none() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let (key, _) = entry.unwrap();
|
||||
Some(OccupiedEntry {
|
||||
_key: key.clone(), // TODO(quantumish): clone unavoidable?
|
||||
bucket_pos: pos as u32,
|
||||
map: self,
|
||||
prev_pos: todo!(), // TODO(quantumish): possibly needs O(n) traversals to rediscover - costly!
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn alloc_bucket(&mut self, key: K, value: V) -> Result<u32, FullError> {
|
||||
let pos = self.free_head;
|
||||
if pos == INVALID_POS {
|
||||
return Err(FullError());
|
||||
}
|
||||
let mut pos = self.free_head;
|
||||
|
||||
let bucket = &mut self.buckets[pos as usize];
|
||||
self.free_head = bucket.next;
|
||||
self.buckets_in_use += 1;
|
||||
// TODO(quantumish): relies on INVALID_POS being u32::MAX by default!
|
||||
// instead add a clause `pos != INVALID_POS`?
|
||||
let mut prev = PrevPos::First(self.free_head);
|
||||
while pos < self.alloc_limit {
|
||||
if pos == INVALID_POS {
|
||||
return Err(FullError());
|
||||
}
|
||||
let bucket = &mut self.buckets[pos as usize];
|
||||
prev = PrevPos::Chained(pos);
|
||||
pos = bucket.next;
|
||||
}
|
||||
let bucket = &mut self.buckets[pos as usize];
|
||||
match prev {
|
||||
PrevPos::First(_) => self.free_head = bucket.next,
|
||||
PrevPos::Chained(p) => self.buckets[p].next = bucket.next,
|
||||
}
|
||||
|
||||
self.buckets_in_use += 1;
|
||||
bucket.next = INVALID_POS;
|
||||
bucket.inner = Some((key, value));
|
||||
|
||||
|
||||
Reference in New Issue
Block a user