diff --git a/Cargo.lock b/Cargo.lock index 052e85ed66..2e0fa0a2b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1359,7 +1359,7 @@ dependencies = [ "http 1.1.0", "libc", "metrics", - "neonart", + "neon-shmem", "nix 0.30.1", "pageserver_client_grpc", "pageserver_page_api", @@ -3930,6 +3930,9 @@ name = "neon-shmem" version = "0.1.0" dependencies = [ "nix 0.30.1", + "rand 0.9.1", + "rand_distr 0.5.1", + "spin", "tempfile", "thiserror 1.0.69", "workspace_hack", @@ -4378,12 +4381,15 @@ name = "pagebench" version = "0.1.0" dependencies = [ "anyhow", + "axum 0.8.1", "camino", "clap", "futures", "hdrhistogram", + "http 1.1.0", "humantime", "humantime-serde", + "metrics", "pageserver_api", "pageserver_client", "pageserver_client_grpc", @@ -4586,7 +4592,9 @@ dependencies = [ "http 1.1.0", "hyper 1.6.0", "hyper-util", + "metrics", "pageserver_page_api", + "priority-queue", "rand 0.8.5", "thiserror 1.0.69", "tokio", @@ -5175,6 +5183,17 @@ dependencies = [ "elliptic-curve 0.13.8", ] +[[package]] +name = "priority-queue" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef08705fa1589a1a59aa924ad77d14722cb0cd97b67dd5004ed5f4a4873fce8d" +dependencies = [ + "autocfg", + "equivalent", + "indexmap 2.9.0", +] + [[package]] name = "proc-macro2" version = "1.0.94" diff --git a/Cargo.toml b/Cargo.toml index 825d05375b..06e5bb0f7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -258,6 +258,7 @@ endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" } http-utils = { version = "0.1", path = "./libs/http-utils/" } metrics = { version = "0.1", path = "./libs/metrics/" } neonart = { version = "0.1", path = "./libs/neonart/" } +neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" } pageserver = { path = "./pageserver" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } pageserver_client = { path = "./pageserver/client" } diff --git a/libs/neon-shmem/Cargo.toml b/libs/neon-shmem/Cargo.toml index 2a636bec40..43e287f6dc 100644 --- a/libs/neon-shmem/Cargo.toml +++ b/libs/neon-shmem/Cargo.toml @@ -6,8 +6,13 @@ license.workspace = true [dependencies] thiserror.workspace = true -nix.workspace=true +nix.workspace = true +spin.workspace = true workspace_hack = { version = "0.1", path = "../../workspace_hack" } +[dev-dependencies] +rand = "0.9.1" +rand_distr = "0.5.1" + [target.'cfg(target_os = "macos")'.dependencies] tempfile = "3.14.0" diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs new file mode 100644 index 0000000000..c81d75c91d --- /dev/null +++ b/libs/neon-shmem/src/hash.rs @@ -0,0 +1,265 @@ +//! Hash table implementation on top of 'shmem' +//! +//! Features required in the long run by the communicator project: +//! +//! [X] Accessible from both Postgres processes and rust threads in the communicator process +//! [X] Low latency +//! [ ] Scalable to lots of concurrent accesses (currently uses a single spinlock) +//! 
[ ] Resizable + +use std::cmp::Eq; +use std::fmt::Debug; +use std::hash::Hash; +use std::ops::Deref; + +use crate::shmem::ShmemHandle; + +use spin; + +mod core; + +#[cfg(test)] +mod tests; + +use core::CoreHashMap; + +/// Fixed-length key type +pub trait Key: Clone + Debug + Hash + Eq { + const KEY_LEN: usize; + + fn as_bytes(&self) -> &[u8]; +} + +/// Values stored in the hash table +pub trait Value {} + +pub enum UpdateAction { + Nothing, + Insert(V), + Remove, +} + +#[derive(Debug)] +pub struct OutOfMemoryError(); + +pub struct HashMapInit<'a, K, V> +where + K: Key, + V: Value, +{ + shmem: ShmemHandle, + shared_ptr: *mut HashMapShared<'a, K, V>, +} + +pub struct HashMapAccess<'a, K: Key, V: Value> { + _shmem: ShmemHandle, + shared_ptr: *mut HashMapShared<'a, K, V>, +} + +unsafe impl<'a, K: Key + Sync, V: Value + Sync> Sync for HashMapAccess<'a, K, V> {} +unsafe impl<'a, K: Key + Send, V: Value + Send> Send for HashMapAccess<'a, K, V> {} + +impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> { + pub fn attach_writer(self) -> HashMapAccess<'a, K, V> { + HashMapAccess { + _shmem: self.shmem, + shared_ptr: self.shared_ptr, + } + } + + pub fn attach_reader(self) -> HashMapAccess<'a, K, V> { + // no difference to attach_writer currently + self.attach_writer() + } +} + +// This is stored in the shared memory area +struct HashMapShared<'a, K, V> +where + K: Key, + V: Value, +{ + inner: spin::RwLock>, +} + +impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> { + /// Initialize a new hash map in the given shared memory area + pub fn init_in_shmem(mut shmem: ShmemHandle, size: usize) -> HashMapInit<'a, K, V> { + shmem + .set_size(size) + .expect("could not resize shared memory area"); + + // carve out HashMapShared from the struct. This does not include the hashmap's dictionary + // and buckets. + let mut ptr: *mut u8 = unsafe { shmem.data_ptr.as_mut() }; + ptr = unsafe { ptr.add(ptr.align_offset(align_of::>())) }; + let shared_ptr: *mut HashMapShared = ptr.cast(); + ptr = unsafe { ptr.add(size_of::>()) }; + + // the rest of the space is given to the hash map's dictionary and buckets + let remaining_area = unsafe { + std::slice::from_raw_parts_mut( + ptr, + size - ptr.offset_from(shmem.data_ptr.as_mut()) as usize, + ) + }; + + let hashmap = CoreHashMap::new(remaining_area); + unsafe { + std::ptr::write( + shared_ptr, + HashMapShared { + inner: spin::RwLock::new(hashmap), + }, + ); + } + + HashMapInit { shmem, shared_ptr } + } +} + +impl<'a, K: Key, V: Value> HashMapAccess<'a, K, V> { + pub fn get<'e>(&'e self, key: &K) -> Option> { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + let lock_guard = map.inner.read(); + + match lock_guard.get(key) { + None => None, + Some(val_ref) => { + let val_ptr = std::ptr::from_ref(val_ref); + Some(ValueReadGuard { + _lock_guard: lock_guard, + value: val_ptr, + }) + } + } + } + + /// Insert a value + pub fn insert(&self, key: &K, value: V) -> Result { + let mut success = None; + + self.update_with_fn(key, |existing| { + if let Some(_) = existing { + success = Some(false); + UpdateAction::Nothing + } else { + success = Some(true); + UpdateAction::Insert(value) + } + })?; + Ok(success.expect("value_fn not called")) + } + + /// Remove value. 
Returns true if it existed + pub fn remove(&self, key: &K) -> bool { + let mut result = false; + self.update_with_fn(key, |existing| match existing { + Some(_) => { + result = true; + UpdateAction::Remove + } + None => UpdateAction::Nothing, + }) + .expect("out of memory while removing"); + result + } + + /// Update key using the given function. All the other modifying operations are based on this. + pub fn update_with_fn(&self, key: &K, value_fn: F) -> Result<(), OutOfMemoryError> + where + F: FnOnce(Option<&V>) -> UpdateAction, + { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + let mut lock_guard = map.inner.write(); + + let old_val = lock_guard.get(key); + let action = value_fn(old_val); + match (old_val, action) { + (_, UpdateAction::Nothing) => {} + (_, UpdateAction::Insert(new_val)) => { + let _ = lock_guard.insert(key, new_val); + } + (None, UpdateAction::Remove) => panic!("Remove action with no old value"), + (Some(_), UpdateAction::Remove) => { + let _ = lock_guard.remove(key); + } + } + + Ok(()) + } + + /// Update key using the given function. All the other modifying operations are based on this. + pub fn update_with_fn_at_bucket( + &self, + pos: usize, + value_fn: F, + ) -> Result<(), OutOfMemoryError> + where + F: FnOnce(Option<&V>) -> UpdateAction, + { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + let mut lock_guard = map.inner.write(); + + let old_val = lock_guard.get_bucket(pos); + let action = value_fn(old_val.map(|(_k, v)| v)); + match (old_val, action) { + (_, UpdateAction::Nothing) => {} + (_, UpdateAction::Insert(_new_val)) => panic!("cannot insert without key"), + (None, UpdateAction::Remove) => panic!("Remove action with no old value"), + (Some((key, _value)), UpdateAction::Remove) => { + let key = key.clone(); + let _ = lock_guard.remove(&key); + } + } + + Ok(()) + } + + pub fn get_num_buckets(&self) -> usize { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + map.inner.read().get_num_buckets() + } + + /// Return the key and value stored in bucket with given index. This can be used to + /// iterate through the hash map. (An Iterator might be nicer. The communicator's + /// clock algorithm needs to _slowly_ iterate through all buckets with its clock hand, + /// without holding a lock. If we switch to an Iterator, it must not hold the lock.) + pub fn get_bucket<'e>(&'e self, pos: usize) -> Option> { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + let lock_guard = map.inner.read(); + + match lock_guard.get_bucket(pos) { + None => None, + Some((_key, val_ref)) => { + let val_ptr = std::ptr::from_ref(val_ref); + Some(ValueReadGuard { + _lock_guard: lock_guard, + value: val_ptr, + }) + } + } + } + + // for metrics + pub fn get_num_buckets_in_use(&self) -> usize { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + map.inner.read().buckets_in_use as usize + } +} + +pub struct ValueReadGuard<'a, K: Key, V: Value> { + _lock_guard: spin::RwLockReadGuard<'a, CoreHashMap<'a, K, V>>, + value: *const V, +} + +impl<'a, K: Key, V: Value> Deref for ValueReadGuard<'a, K, V> { + type Target = V; + + fn deref(&self) -> &Self::Target { + // SAFETY: The `lock_guard` ensures that the underlying map (and thus the value pointed to + // by `value`) remains valid for the lifetime `'a`. The `value` has been obtained from a + // valid reference within the map. 
+ unsafe { &*self.value } + } +} diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs new file mode 100644 index 0000000000..0b0528d5da --- /dev/null +++ b/libs/neon-shmem/src/hash/core.rs @@ -0,0 +1,224 @@ +//! Simple hash table with chaining + +use std::hash::{DefaultHasher, Hasher}; +use std::mem::MaybeUninit; + +use crate::hash::Key; + +const INVALID_POS: u32 = u32::MAX; + +// Bucket +struct Bucket { + hash: u64, + next: u32, + inner: Option<(K, V)>, +} + +pub(crate) struct CoreHashMap<'a, K: Key, V> { + dictionary: &'a mut [u32], + buckets: &'a mut [Bucket], + free_head: u32, + + // metrics + pub(crate) buckets_in_use: u32, +} + +pub struct FullError(); + +impl<'a, K: Key, V> CoreHashMap<'a, K, V> { + const FILL_FACTOR: f32 = 0.5; + + pub fn new(area: &'a mut [u8]) -> CoreHashMap<'a, K, V> { + let len = area.len(); + + let mut ptr: *mut u8 = area.as_mut_ptr(); + let end_ptr: *mut u8 = unsafe { area.as_mut_ptr().add(len) }; + + // How much space is left? + let size_remain = unsafe { end_ptr.byte_offset_from(ptr) }; + + let num_buckets = f32::floor( + size_remain as f32 + / (size_of::>() as f32 + + size_of::() as f32 * 1.0 / Self::FILL_FACTOR), + ) as usize; + + // carve out the buckets + ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::>())) }; + let buckets_ptr = ptr; + ptr = unsafe { ptr.add(size_of::>() * num_buckets) }; + + // use remaining space for the dictionary + ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::())) }; + let dictionary_ptr = ptr; + + assert!(ptr.addr() < end_ptr.addr()); + let dictionary_size = unsafe { end_ptr.byte_offset_from(ptr) / size_of::() as isize }; + assert!(dictionary_size > 0); + + // Initialize the buckets + let buckets = { + let buckets_ptr: *mut MaybeUninit> = buckets_ptr.cast(); + let buckets = unsafe { std::slice::from_raw_parts_mut(buckets_ptr, num_buckets) }; + for i in 0..buckets.len() { + buckets[i].write(Bucket { + hash: 0, + next: if i < buckets.len() - 1 { + i as u32 + 1 + } else { + INVALID_POS + }, + inner: None, + }); + } + // TODO: use std::slice::assume_init_mut() once it stabilizes + unsafe { std::slice::from_raw_parts_mut(buckets_ptr.cast(), num_buckets) } + }; + + // Initialize the dictionary + let dictionary = { + let dictionary_ptr: *mut MaybeUninit = dictionary_ptr.cast(); + let dictionary = + unsafe { std::slice::from_raw_parts_mut(dictionary_ptr, dictionary_size as usize) }; + + for i in 0..dictionary.len() { + dictionary[i].write(INVALID_POS); + } + // TODO: use std::slice::assume_init_mut() once it stabilizes + unsafe { + std::slice::from_raw_parts_mut(dictionary_ptr.cast(), dictionary_size as usize) + } + }; + + CoreHashMap { + dictionary, + buckets, + free_head: 0, + buckets_in_use: 0, + } + } + + pub fn get(&self, key: &K) -> Option<&V> { + let mut hasher = DefaultHasher::new(); + key.hash(&mut hasher); + let hash = hasher.finish(); + + let mut next = self.dictionary[hash as usize % self.dictionary.len()]; + loop { + if next == INVALID_POS { + return None; + } + + let bucket = &self.buckets[next as usize]; + let (bucket_key, bucket_value) = bucket.inner.as_ref().expect("entry is in use"); + if bucket_key == key { + return Some(&bucket_value); + } + next = bucket.next; + } + } + + pub fn insert(&mut self, key: &K, value: V) -> Result<(), FullError> { + let mut hasher = DefaultHasher::new(); + key.hash(&mut hasher); + let hash = hasher.finish(); + + let first = self.dictionary[hash as usize % self.dictionary.len()]; + if first == INVALID_POS { + // no existing entry + let pos = 
self.alloc_bucket(key.clone(), value, hash)?; + if pos == INVALID_POS { + return Err(FullError()); + } + self.dictionary[hash as usize % self.dictionary.len()] = pos; + return Ok(()); + } + + let mut next = first; + loop { + let bucket = &mut self.buckets[next as usize]; + let (bucket_key, bucket_value) = bucket.inner.as_mut().expect("entry is in use"); + if bucket_key == key { + // found existing entry, update its value + *bucket_value = value; + return Ok(()); + } + + if bucket.next == INVALID_POS { + // No existing entry found. Append to the chain + let pos = self.alloc_bucket(key.clone(), value, hash)?; + if pos == INVALID_POS { + return Err(FullError()); + } + self.buckets[next as usize].next = pos; + return Ok(()); + } + next = bucket.next; + } + } + + pub fn remove(&mut self, key: &K) -> Result<(), FullError> { + let mut hasher = DefaultHasher::new(); + key.hash(&mut hasher); + let hash = hasher.finish(); + + let mut next = self.dictionary[hash as usize % self.dictionary.len()]; + let mut prev_pos: u32 = INVALID_POS; + loop { + if next == INVALID_POS { + // no existing entry + return Ok(()); + } + let bucket = &mut self.buckets[next as usize]; + let (bucket_key, _) = bucket.inner.as_mut().expect("entry is in use"); + if bucket_key == key { + // found existing entry, unlink it from the chain + if prev_pos == INVALID_POS { + self.dictionary[hash as usize % self.dictionary.len()] = bucket.next; + } else { + self.buckets[prev_pos as usize].next = bucket.next; + } + + // and add it to the freelist + let bucket = &mut self.buckets[next as usize]; + bucket.hash = 0; + bucket.inner = None; + bucket.next = self.free_head; + self.free_head = next; + self.buckets_in_use -= 1; + return Ok(()); + } + prev_pos = next; + next = bucket.next; + } + } + + pub fn get_num_buckets(&self) -> usize { + self.buckets.len() + } + + pub fn get_bucket(&self, pos: usize) -> Option<&(K, V)> { + if pos >= self.buckets.len() { + return None; + } + + self.buckets[pos].inner.as_ref() + } + + fn alloc_bucket(&mut self, key: K, value: V, hash: u64) -> Result { + let pos = self.free_head; + if pos == INVALID_POS { + return Err(FullError()); + } + + let bucket = &mut self.buckets[pos as usize]; + self.free_head = bucket.next; + self.buckets_in_use += 1; + + bucket.hash = hash; + bucket.next = INVALID_POS; + bucket.inner = Some((key, value)); + + return Ok(pos); + } +} diff --git a/libs/neon-shmem/src/hash/tests.rs b/libs/neon-shmem/src/hash/tests.rs new file mode 100644 index 0000000000..c212b883a3 --- /dev/null +++ b/libs/neon-shmem/src/hash/tests.rs @@ -0,0 +1,194 @@ +use std::collections::BTreeMap; +use std::collections::HashSet; +use std::fmt::{Debug, Formatter}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use crate::hash::HashMapAccess; +use crate::hash::HashMapInit; +use crate::hash::UpdateAction; +use crate::hash::{Key, Value}; +use crate::shmem::ShmemHandle; + +use rand::Rng; +use rand::seq::SliceRandom; +use rand_distr::Zipf; + +const TEST_KEY_LEN: usize = 16; + +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +struct TestKey([u8; TEST_KEY_LEN]); + +impl Key for TestKey { + const KEY_LEN: usize = TEST_KEY_LEN; + fn as_bytes(&self) -> &[u8] { + &self.0 + } +} + +impl From<&TestKey> for u128 { + fn from(val: &TestKey) -> u128 { + u128::from_be_bytes(val.0) + } +} + +impl From for TestKey { + fn from(val: u128) -> TestKey { + TestKey(val.to_be_bytes()) + } +} + +impl<'a> From<&'a [u8]> for TestKey { + fn from(bytes: &'a [u8]) -> TestKey { + TestKey(bytes.try_into().unwrap()) + } +} + 
+impl Value for usize {} + +fn test_inserts + Copy>(keys: &[K]) { + const MEM_SIZE: usize = 10000000; + let shmem = ShmemHandle::new("test_inserts", 0, MEM_SIZE).unwrap(); + + let init_struct = HashMapInit::::init_in_shmem(shmem, MEM_SIZE); + let mut w = init_struct.attach_writer(); + + for (idx, k) in keys.iter().enumerate() { + let res = w.insert(&(*k).into(), idx); + assert!(res.is_ok()); + } + + for (idx, k) in keys.iter().enumerate() { + let x = w.get(&(*k).into()); + let value = x.as_deref().copied(); + assert_eq!(value, Some(idx)); + } + + //eprintln!("stats: {:?}", tree_writer.get_statistics()); +} + +#[test] +fn dense() { + // This exercises splitting a node with prefix + let keys: &[u128] = &[0, 1, 2, 3, 256]; + test_inserts(keys); + + // Dense keys + let mut keys: Vec = (0..10000).collect(); + test_inserts(&keys); + + // Do the same in random orders + for _ in 1..10 { + keys.shuffle(&mut rand::rng()); + test_inserts(&keys); + } +} + +#[test] +fn sparse() { + // sparse keys + let mut keys: Vec = Vec::new(); + let mut used_keys = HashSet::new(); + for _ in 0..10000 { + loop { + let key = rand::random::(); + if used_keys.get(&key).is_some() { + continue; + } + used_keys.insert(key); + keys.push(key.into()); + break; + } + } + test_inserts(&keys); +} + +struct TestValue(AtomicUsize); + +impl TestValue { + fn new(val: usize) -> TestValue { + TestValue(AtomicUsize::new(val)) + } + + fn load(&self) -> usize { + self.0.load(Ordering::Relaxed) + } +} + +impl Value for TestValue {} + +impl Clone for TestValue { + fn clone(&self) -> TestValue { + TestValue::new(self.load()) + } +} + +impl Debug for TestValue { + fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "{:?}", self.load()) + } +} + +#[derive(Clone, Debug)] +struct TestOp(TestKey, Option); + +fn apply_op( + op: &TestOp, + sut: &HashMapAccess, + shadow: &mut BTreeMap, +) { + eprintln!("applying op: {op:?}"); + + // apply the change to the shadow tree first + let shadow_existing = if let Some(v) = op.1 { + shadow.insert(op.0, v) + } else { + shadow.remove(&op.0) + }; + + // apply to Art tree + sut.update_with_fn(&op.0, |existing| { + assert_eq!(existing.map(TestValue::load), shadow_existing); + + match (existing, op.1) { + (None, None) => UpdateAction::Nothing, + (None, Some(new_val)) => UpdateAction::Insert(TestValue::new(new_val)), + (Some(_old_val), None) => UpdateAction::Remove, + (Some(old_val), Some(new_val)) => { + old_val.0.store(new_val, Ordering::Relaxed); + UpdateAction::Nothing + } + } + }) + .expect("out of memory"); +} + +#[test] +fn random_ops() { + const MEM_SIZE: usize = 10000000; + let shmem = ShmemHandle::new("test_inserts", 0, MEM_SIZE).unwrap(); + + let init_struct = HashMapInit::::init_in_shmem(shmem, MEM_SIZE); + let writer = init_struct.attach_writer(); + + let mut shadow: std::collections::BTreeMap = BTreeMap::new(); + + let distribution = Zipf::new(u128::MAX as f64, 1.1).unwrap(); + let mut rng = rand::rng(); + for i in 0..100000 { + let mut key: TestKey = (rng.sample(distribution) as u128).into(); + + if rng.random_bool(0.10) { + key = TestKey::from(u128::from(&key) | 0xffffffff); + } + + let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None }); + + apply_op(&op, &writer, &mut shadow); + + if i % 1000 == 0 { + eprintln!("{i} ops processed"); + //eprintln!("stats: {:?}", tree_writer.get_statistics()); + //test_iter(&tree_writer, &shadow); + } + } +} diff --git a/libs/neon-shmem/src/lib.rs b/libs/neon-shmem/src/lib.rs index e1b14b1371..f601010122 100644 --- 
a/libs/neon-shmem/src/lib.rs +++ b/libs/neon-shmem/src/lib.rs @@ -1,418 +1,4 @@ //! Shared memory utilities for neon communicator -use std::num::NonZeroUsize; -use std::os::fd::{AsFd, BorrowedFd, OwnedFd}; -use std::ptr::NonNull; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use nix::errno::Errno; -use nix::sys::mman::MapFlags; -use nix::sys::mman::ProtFlags; -use nix::sys::mman::mmap as nix_mmap; -use nix::sys::mman::munmap as nix_munmap; -use nix::unistd::ftruncate as nix_ftruncate; - -/// ShmemHandle represents a shared memory area that can be shared by processes over fork(). -/// Unlike shared memory allocated by Postgres, this area is resizable, up to 'max_size' that's -/// specified at creation. -/// -/// The area is backed by an anonymous file created with memfd_create(). The full address space for -/// 'max_size' is reserved up-front with mmap(), but whenever you call [`ShmemHandle::set_size`], -/// the underlying file is resized. Do not access the area beyond the current size. Currently, that -/// will cause the file to be expanded, but we might use mprotect() etc. to enforce that in the -/// future. -pub struct ShmemHandle { - /// memfd file descriptor - fd: OwnedFd, - - max_size: usize, - - // Pointer to the beginning of the shared memory area. The header is stored there. - shared_ptr: NonNull, - - // Pointer to the beginning of the user data - pub data_ptr: NonNull, -} - -/// This is stored at the beginning in the shared memory area. -struct SharedStruct { - max_size: usize, - - /// Current size of the backing file. The high-order bit is used for the RESIZE_IN_PROGRESS flag - current_size: AtomicUsize, -} - -const RESIZE_IN_PROGRESS: usize = 1 << 63; - -const HEADER_SIZE: usize = std::mem::size_of::(); - -/// Error type returned by the ShmemHandle functions. -#[derive(thiserror::Error, Debug)] -#[error("{msg}: {errno}")] -pub struct Error { - pub msg: String, - pub errno: Errno, -} - -impl Error { - fn new(msg: &str, errno: Errno) -> Error { - Error { - msg: msg.to_string(), - errno, - } - } -} - -impl ShmemHandle { - /// Create a new shared memory area. To communicate between processes, the processes need to be - /// fork()'d after calling this, so that the ShmemHandle is inherited by all processes. - /// - /// If the ShmemHandle is dropped, the memory is unmapped from the current process. Other - /// processes can continue using it, however. - pub fn new(name: &str, initial_size: usize, max_size: usize) -> Result { - // create the backing anonymous file. - let fd = create_backing_file(name)?; - - Self::new_with_fd(fd, initial_size, max_size) - } - - fn new_with_fd( - fd: OwnedFd, - initial_size: usize, - max_size: usize, - ) -> Result { - // We reserve the high-order bit for the RESIZE_IN_PROGRESS flag, and the actual size - // is a little larger than this because of the SharedStruct header. Make the upper limit - // somewhat smaller than that, because with anything close to that, you'll run out of - // memory anyway. - if max_size >= 1 << 48 { - panic!("max size {} too large", max_size); - } - if initial_size > max_size { - panic!("initial size {initial_size} larger than max size {max_size}"); - } - - // The actual initial / max size is the one given by the caller, plus the size of - // 'SharedStruct'. 
- let initial_size = HEADER_SIZE + initial_size; - let max_size = NonZeroUsize::new(HEADER_SIZE + max_size).unwrap(); - - // Reserve address space for it with mmap - // - // TODO: Use MAP_HUGETLB if possible - let start_ptr = unsafe { - nix_mmap( - None, - max_size, - ProtFlags::PROT_READ | ProtFlags::PROT_WRITE, - MapFlags::MAP_SHARED, - &fd, - 0, - ) - } - .map_err(|e| Error::new("mmap failed: {e}", e))?; - - // Reserve space for the initial size - enlarge_file(fd.as_fd(), initial_size as u64)?; - - // Initialize the header - let shared: NonNull = start_ptr.cast(); - unsafe { - shared.write(SharedStruct { - max_size: max_size.into(), - current_size: AtomicUsize::new(initial_size), - }) - }; - - // The user data begins after the header - let data_ptr = unsafe { start_ptr.cast().add(HEADER_SIZE) }; - - Ok(ShmemHandle { - fd, - max_size: max_size.into(), - shared_ptr: shared, - data_ptr, - }) - } - - // return reference to the header - fn shared(&self) -> &SharedStruct { - unsafe { self.shared_ptr.as_ref() } - } - - /// Resize the shared memory area. 'new_size' must not be larger than the 'max_size' specified - /// when creating the area. - /// - /// This may only be called from one process/thread concurrently. We detect that case - /// and return an Error. - pub fn set_size(&self, new_size: usize) -> Result<(), Error> { - let new_size = new_size + HEADER_SIZE; - let shared = self.shared(); - - if new_size > self.max_size { - panic!( - "new size ({} is greater than max size ({})", - new_size, self.max_size - ); - } - assert_eq!(self.max_size, shared.max_size); - - // Lock the area by setting the bit in 'current_size' - // - // Ordering::Relaxed would probably be sufficient here, as we don't access any other memory - // and the posix_fallocate/ftruncate call is surely a synchronization point anyway. But - // since this is not performance-critical, better safe than sorry . - let mut old_size = shared.current_size.load(Ordering::Acquire); - loop { - if (old_size & RESIZE_IN_PROGRESS) != 0 { - return Err(Error::new( - "concurrent resize detected", - Errno::UnknownErrno, - )); - } - match shared.current_size.compare_exchange( - old_size, - new_size, - Ordering::Acquire, - Ordering::Relaxed, - ) { - Ok(_) => break, - Err(x) => old_size = x, - } - } - - // Ok, we got the lock. - // - // NB: If anything goes wrong, we *must* clear the bit! - let result = { - use std::cmp::Ordering::{Equal, Greater, Less}; - match new_size.cmp(&old_size) { - Less => nix_ftruncate(&self.fd, new_size as i64).map_err(|e| { - Error::new("could not shrink shmem segment, ftruncate failed: {e}", e) - }), - Equal => Ok(()), - Greater => enlarge_file(self.fd.as_fd(), new_size as u64), - } - }; - - // Unlock - shared.current_size.store( - if result.is_ok() { new_size } else { old_size }, - Ordering::Release, - ); - - result - } - - /// Returns the current user-visible size of the shared memory segment. - /// - /// NOTE: a concurrent set_size() call can change the size at any time. It is the caller's - /// responsibility not to access the area beyond the current size. - pub fn current_size(&self) -> usize { - let total_current_size = - self.shared().current_size.load(Ordering::Relaxed) & !RESIZE_IN_PROGRESS; - total_current_size - HEADER_SIZE - } -} - -impl Drop for ShmemHandle { - fn drop(&mut self) { - // SAFETY: The pointer was obtained from mmap() with the given size. - // We unmap the entire region. - let _ = unsafe { nix_munmap(self.shared_ptr.cast(), self.max_size) }; - // The fd is dropped automatically by OwnedFd. 
- } -} - -/// Create a "backing file" for the shared memory area. On Linux, use memfd_create(), to create an -/// anonymous in-memory file. One macos, fall back to a regular file. That's good enough for -/// development and testing, but in production we want the file to stay in memory. -/// -/// disable 'unused_variables' warnings, because in the macos path, 'name' is unused. -#[allow(unused_variables)] -fn create_backing_file(name: &str) -> Result { - #[cfg(not(target_os = "macos"))] - { - nix::sys::memfd::memfd_create(name, nix::sys::memfd::MFdFlags::empty()) - .map_err(|e| Error::new("memfd_create failed: {e}", e)) - } - #[cfg(target_os = "macos")] - { - let file = tempfile::tempfile().map_err(|e| { - Error::new( - "could not create temporary file to back shmem area: {e}", - nix::errno::Errno::from_raw(e.raw_os_error().unwrap_or(0)), - ) - })?; - Ok(OwnedFd::from(file)) - } -} - -fn enlarge_file(fd: BorrowedFd, size: u64) -> Result<(), Error> { - // Use posix_fallocate() to enlarge the file. It reserves the space correctly, so that - // we don't get a segfault later when trying to actually use it. - #[cfg(not(target_os = "macos"))] - { - nix::fcntl::posix_fallocate(fd, 0, size as i64).map_err(|e| { - Error::new( - "could not grow shmem segment, posix_fallocate failed: {e}", - e, - ) - }) - } - // As a fallback on macos, which doesn't have posix_fallocate, use plain 'fallocate' - #[cfg(target_os = "macos")] - { - nix::unistd::ftruncate(fd, size as i64) - .map_err(|e| Error::new("could not grow shmem segment, ftruncate failed: {e}", e)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use nix::unistd::ForkResult; - use std::ops::Range; - - /// check that all bytes in given range have the expected value. - fn assert_range(ptr: *const u8, expected: u8, range: Range) { - for i in range { - let b = unsafe { *(ptr.add(i)) }; - assert_eq!(expected, b, "unexpected byte at offset {}", i); - } - } - - /// Write 'b' to all bytes in the given range - fn write_range(ptr: *mut u8, b: u8, range: Range) { - unsafe { std::ptr::write_bytes(ptr.add(range.start), b, range.end - range.start) }; - } - - // simple single-process test of growing and shrinking - #[test] - fn test_shmem_resize() -> Result<(), Error> { - let max_size = 1024 * 1024; - let init_struct = ShmemHandle::new("test_shmem_resize", 0, max_size)?; - - assert_eq!(init_struct.current_size(), 0); - - // Initial grow - let size1 = 10000; - init_struct.set_size(size1).unwrap(); - assert_eq!(init_struct.current_size(), size1); - - // Write some data - let data_ptr = init_struct.data_ptr.as_ptr(); - write_range(data_ptr, 0xAA, 0..size1); - assert_range(data_ptr, 0xAA, 0..size1); - - // Shrink - let size2 = 5000; - init_struct.set_size(size2).unwrap(); - assert_eq!(init_struct.current_size(), size2); - - // Grow again - let size3 = 20000; - init_struct.set_size(size3).unwrap(); - assert_eq!(init_struct.current_size(), size3); - - // Try to read it. The area that was shrunk and grown again should read as all zeros now - assert_range(data_ptr, 0xAA, 0..5000); - assert_range(data_ptr, 0, 5000..size1); - - // Try to grow beyond max_size - //let size4 = max_size + 1; - //assert!(init_struct.set_size(size4).is_err()); - - // Dropping init_struct should unmap the memory - drop(init_struct); - - Ok(()) - } - - /// This is used in tests to coordinate between test processes. It's like std::sync::Barrier, - /// but is stored in the shared memory area and works across processes. It's implemented by - /// polling, because e.g. 
standard rust mutexes are not guaranteed to work across processes. - struct SimpleBarrier { - num_procs: usize, - count: AtomicUsize, - } - - impl SimpleBarrier { - unsafe fn init(ptr: *mut SimpleBarrier, num_procs: usize) { - unsafe { - *ptr = SimpleBarrier { - num_procs, - count: AtomicUsize::new(0), - } - } - } - - pub fn wait(&self) { - let old = self.count.fetch_add(1, Ordering::Relaxed); - - let generation = old / self.num_procs; - - let mut current = old + 1; - while current < (generation + 1) * self.num_procs { - std::thread::sleep(std::time::Duration::from_millis(10)); - current = self.count.load(Ordering::Relaxed); - } - } - } - - #[test] - fn test_multi_process() { - // Initialize - let max_size = 1_000_000_000_000; - let init_struct = ShmemHandle::new("test_multi_process", 0, max_size).unwrap(); - let ptr = init_struct.data_ptr.as_ptr(); - - // Store the SimpleBarrier in the first 1k of the area. - init_struct.set_size(10000).unwrap(); - let barrier_ptr: *mut SimpleBarrier = unsafe { - ptr.add(ptr.align_offset(std::mem::align_of::())) - .cast() - }; - unsafe { SimpleBarrier::init(barrier_ptr, 2) }; - let barrier = unsafe { barrier_ptr.as_ref().unwrap() }; - - // Fork another test process. The code after this runs in both processes concurrently. - let fork_result = unsafe { nix::unistd::fork().unwrap() }; - - // In the parent, fill bytes between 1000..2000. In the child, between 2000..3000 - if fork_result.is_parent() { - write_range(ptr, 0xAA, 1000..2000); - } else { - write_range(ptr, 0xBB, 2000..3000); - } - barrier.wait(); - // Verify the contents. (in both processes) - assert_range(ptr, 0xAA, 1000..2000); - assert_range(ptr, 0xBB, 2000..3000); - - // Grow, from the child this time - let size = 10_000_000; - if !fork_result.is_parent() { - init_struct.set_size(size).unwrap(); - } - barrier.wait(); - - // make some writes at the end - if fork_result.is_parent() { - write_range(ptr, 0xAA, (size - 10)..size); - } else { - write_range(ptr, 0xBB, (size - 20)..(size - 10)); - } - barrier.wait(); - - // Verify the contents. (This runs in both processes) - assert_range(ptr, 0, (size - 1000)..(size - 20)); - assert_range(ptr, 0xBB, (size - 20)..(size - 10)); - assert_range(ptr, 0xAA, (size - 10)..size); - - if let ForkResult::Parent { child } = fork_result { - nix::sys::wait::waitpid(child, None).unwrap(); - } - } -} +pub mod hash; +pub mod shmem; diff --git a/libs/neon-shmem/src/shmem.rs b/libs/neon-shmem/src/shmem.rs new file mode 100644 index 0000000000..21b1454b10 --- /dev/null +++ b/libs/neon-shmem/src/shmem.rs @@ -0,0 +1,418 @@ +//! Dynamically resizable contiguous chunk of shared memory + +use std::num::NonZeroUsize; +use std::os::fd::{AsFd, BorrowedFd, OwnedFd}; +use std::ptr::NonNull; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use nix::errno::Errno; +use nix::sys::mman::MapFlags; +use nix::sys::mman::ProtFlags; +use nix::sys::mman::mmap as nix_mmap; +use nix::sys::mman::munmap as nix_munmap; +use nix::unistd::ftruncate as nix_ftruncate; + +/// ShmemHandle represents a shared memory area that can be shared by processes over fork(). +/// Unlike shared memory allocated by Postgres, this area is resizable, up to 'max_size' that's +/// specified at creation. +/// +/// The area is backed by an anonymous file created with memfd_create(). The full address space for +/// 'max_size' is reserved up-front with mmap(), but whenever you call [`ShmemHandle::set_size`], +/// the underlying file is resized. Do not access the area beyond the current size. 
Currently, that +/// will cause the file to be expanded, but we might use mprotect() etc. to enforce that in the +/// future. +pub struct ShmemHandle { + /// memfd file descriptor + fd: OwnedFd, + + max_size: usize, + + // Pointer to the beginning of the shared memory area. The header is stored there. + shared_ptr: NonNull, + + // Pointer to the beginning of the user data + pub data_ptr: NonNull, +} + +/// This is stored at the beginning in the shared memory area. +struct SharedStruct { + max_size: usize, + + /// Current size of the backing file. The high-order bit is used for the RESIZE_IN_PROGRESS flag + current_size: AtomicUsize, +} + +const RESIZE_IN_PROGRESS: usize = 1 << 63; + +const HEADER_SIZE: usize = std::mem::size_of::(); + +/// Error type returned by the ShmemHandle functions. +#[derive(thiserror::Error, Debug)] +#[error("{msg}: {errno}")] +pub struct Error { + pub msg: String, + pub errno: Errno, +} + +impl Error { + fn new(msg: &str, errno: Errno) -> Error { + Error { + msg: msg.to_string(), + errno, + } + } +} + +impl ShmemHandle { + /// Create a new shared memory area. To communicate between processes, the processes need to be + /// fork()'d after calling this, so that the ShmemHandle is inherited by all processes. + /// + /// If the ShmemHandle is dropped, the memory is unmapped from the current process. Other + /// processes can continue using it, however. + pub fn new(name: &str, initial_size: usize, max_size: usize) -> Result { + // create the backing anonymous file. + let fd = create_backing_file(name)?; + + Self::new_with_fd(fd, initial_size, max_size) + } + + fn new_with_fd( + fd: OwnedFd, + initial_size: usize, + max_size: usize, + ) -> Result { + // We reserve the high-order bit for the RESIZE_IN_PROGRESS flag, and the actual size + // is a little larger than this because of the SharedStruct header. Make the upper limit + // somewhat smaller than that, because with anything close to that, you'll run out of + // memory anyway. + if max_size >= 1 << 48 { + panic!("max size {} too large", max_size); + } + if initial_size > max_size { + panic!("initial size {initial_size} larger than max size {max_size}"); + } + + // The actual initial / max size is the one given by the caller, plus the size of + // 'SharedStruct'. + let initial_size = HEADER_SIZE + initial_size; + let max_size = NonZeroUsize::new(HEADER_SIZE + max_size).unwrap(); + + // Reserve address space for it with mmap + // + // TODO: Use MAP_HUGETLB if possible + let start_ptr = unsafe { + nix_mmap( + None, + max_size, + ProtFlags::PROT_READ | ProtFlags::PROT_WRITE, + MapFlags::MAP_SHARED, + &fd, + 0, + ) + } + .map_err(|e| Error::new("mmap failed: {e}", e))?; + + // Reserve space for the initial size + enlarge_file(fd.as_fd(), initial_size as u64)?; + + // Initialize the header + let shared: NonNull = start_ptr.cast(); + unsafe { + shared.write(SharedStruct { + max_size: max_size.into(), + current_size: AtomicUsize::new(initial_size), + }) + }; + + // The user data begins after the header + let data_ptr = unsafe { start_ptr.cast().add(HEADER_SIZE) }; + + Ok(ShmemHandle { + fd, + max_size: max_size.into(), + shared_ptr: shared, + data_ptr, + }) + } + + // return reference to the header + fn shared(&self) -> &SharedStruct { + unsafe { self.shared_ptr.as_ref() } + } + + /// Resize the shared memory area. 'new_size' must not be larger than the 'max_size' specified + /// when creating the area. + /// + /// This may only be called from one process/thread concurrently. We detect that case + /// and return an Error. 
+ pub fn set_size(&self, new_size: usize) -> Result<(), Error> { + let new_size = new_size + HEADER_SIZE; + let shared = self.shared(); + + if new_size > self.max_size { + panic!( + "new size ({} is greater than max size ({})", + new_size, self.max_size + ); + } + assert_eq!(self.max_size, shared.max_size); + + // Lock the area by setting the bit in 'current_size' + // + // Ordering::Relaxed would probably be sufficient here, as we don't access any other memory + // and the posix_fallocate/ftruncate call is surely a synchronization point anyway. But + // since this is not performance-critical, better safe than sorry . + let mut old_size = shared.current_size.load(Ordering::Acquire); + loop { + if (old_size & RESIZE_IN_PROGRESS) != 0 { + return Err(Error::new( + "concurrent resize detected", + Errno::UnknownErrno, + )); + } + match shared.current_size.compare_exchange( + old_size, + new_size, + Ordering::Acquire, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(x) => old_size = x, + } + } + + // Ok, we got the lock. + // + // NB: If anything goes wrong, we *must* clear the bit! + let result = { + use std::cmp::Ordering::{Equal, Greater, Less}; + match new_size.cmp(&old_size) { + Less => nix_ftruncate(&self.fd, new_size as i64).map_err(|e| { + Error::new("could not shrink shmem segment, ftruncate failed: {e}", e) + }), + Equal => Ok(()), + Greater => enlarge_file(self.fd.as_fd(), new_size as u64), + } + }; + + // Unlock + shared.current_size.store( + if result.is_ok() { new_size } else { old_size }, + Ordering::Release, + ); + + result + } + + /// Returns the current user-visible size of the shared memory segment. + /// + /// NOTE: a concurrent set_size() call can change the size at any time. It is the caller's + /// responsibility not to access the area beyond the current size. + pub fn current_size(&self) -> usize { + let total_current_size = + self.shared().current_size.load(Ordering::Relaxed) & !RESIZE_IN_PROGRESS; + total_current_size - HEADER_SIZE + } +} + +impl Drop for ShmemHandle { + fn drop(&mut self) { + // SAFETY: The pointer was obtained from mmap() with the given size. + // We unmap the entire region. + let _ = unsafe { nix_munmap(self.shared_ptr.cast(), self.max_size) }; + // The fd is dropped automatically by OwnedFd. + } +} + +/// Create a "backing file" for the shared memory area. On Linux, use memfd_create(), to create an +/// anonymous in-memory file. One macos, fall back to a regular file. That's good enough for +/// development and testing, but in production we want the file to stay in memory. +/// +/// disable 'unused_variables' warnings, because in the macos path, 'name' is unused. +#[allow(unused_variables)] +fn create_backing_file(name: &str) -> Result { + #[cfg(not(target_os = "macos"))] + { + nix::sys::memfd::memfd_create(name, nix::sys::memfd::MFdFlags::empty()) + .map_err(|e| Error::new("memfd_create failed: {e}", e)) + } + #[cfg(target_os = "macos")] + { + let file = tempfile::tempfile().map_err(|e| { + Error::new( + "could not create temporary file to back shmem area: {e}", + nix::errno::Errno::from_raw(e.raw_os_error().unwrap_or(0)), + ) + })?; + Ok(OwnedFd::from(file)) + } +} + +fn enlarge_file(fd: BorrowedFd, size: u64) -> Result<(), Error> { + // Use posix_fallocate() to enlarge the file. It reserves the space correctly, so that + // we don't get a segfault later when trying to actually use it. 
+ #[cfg(not(target_os = "macos"))] + { + nix::fcntl::posix_fallocate(fd, 0, size as i64).map_err(|e| { + Error::new( + "could not grow shmem segment, posix_fallocate failed: {e}", + e, + ) + }) + } + // As a fallback on macos, which doesn't have posix_fallocate, use plain 'fallocate' + #[cfg(target_os = "macos")] + { + nix::unistd::ftruncate(fd, size as i64) + .map_err(|e| Error::new("could not grow shmem segment, ftruncate failed: {e}", e)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use nix::unistd::ForkResult; + use std::ops::Range; + + /// check that all bytes in given range have the expected value. + fn assert_range(ptr: *const u8, expected: u8, range: Range) { + for i in range { + let b = unsafe { *(ptr.add(i)) }; + assert_eq!(expected, b, "unexpected byte at offset {}", i); + } + } + + /// Write 'b' to all bytes in the given range + fn write_range(ptr: *mut u8, b: u8, range: Range) { + unsafe { std::ptr::write_bytes(ptr.add(range.start), b, range.end - range.start) }; + } + + // simple single-process test of growing and shrinking + #[test] + fn test_shmem_resize() -> Result<(), Error> { + let max_size = 1024 * 1024; + let init_struct = ShmemHandle::new("test_shmem_resize", 0, max_size)?; + + assert_eq!(init_struct.current_size(), 0); + + // Initial grow + let size1 = 10000; + init_struct.set_size(size1).unwrap(); + assert_eq!(init_struct.current_size(), size1); + + // Write some data + let data_ptr = init_struct.data_ptr.as_ptr(); + write_range(data_ptr, 0xAA, 0..size1); + assert_range(data_ptr, 0xAA, 0..size1); + + // Shrink + let size2 = 5000; + init_struct.set_size(size2).unwrap(); + assert_eq!(init_struct.current_size(), size2); + + // Grow again + let size3 = 20000; + init_struct.set_size(size3).unwrap(); + assert_eq!(init_struct.current_size(), size3); + + // Try to read it. The area that was shrunk and grown again should read as all zeros now + assert_range(data_ptr, 0xAA, 0..5000); + assert_range(data_ptr, 0, 5000..size1); + + // Try to grow beyond max_size + //let size4 = max_size + 1; + //assert!(init_struct.set_size(size4).is_err()); + + // Dropping init_struct should unmap the memory + drop(init_struct); + + Ok(()) + } + + /// This is used in tests to coordinate between test processes. It's like std::sync::Barrier, + /// but is stored in the shared memory area and works across processes. It's implemented by + /// polling, because e.g. standard rust mutexes are not guaranteed to work across processes. + struct SimpleBarrier { + num_procs: usize, + count: AtomicUsize, + } + + impl SimpleBarrier { + unsafe fn init(ptr: *mut SimpleBarrier, num_procs: usize) { + unsafe { + *ptr = SimpleBarrier { + num_procs, + count: AtomicUsize::new(0), + } + } + } + + pub fn wait(&self) { + let old = self.count.fetch_add(1, Ordering::Relaxed); + + let generation = old / self.num_procs; + + let mut current = old + 1; + while current < (generation + 1) * self.num_procs { + std::thread::sleep(std::time::Duration::from_millis(10)); + current = self.count.load(Ordering::Relaxed); + } + } + } + + #[test] + fn test_multi_process() { + // Initialize + let max_size = 1_000_000_000_000; + let init_struct = ShmemHandle::new("test_multi_process", 0, max_size).unwrap(); + let ptr = init_struct.data_ptr.as_ptr(); + + // Store the SimpleBarrier in the first 1k of the area. 
+ init_struct.set_size(10000).unwrap(); + let barrier_ptr: *mut SimpleBarrier = unsafe { + ptr.add(ptr.align_offset(std::mem::align_of::())) + .cast() + }; + unsafe { SimpleBarrier::init(barrier_ptr, 2) }; + let barrier = unsafe { barrier_ptr.as_ref().unwrap() }; + + // Fork another test process. The code after this runs in both processes concurrently. + let fork_result = unsafe { nix::unistd::fork().unwrap() }; + + // In the parent, fill bytes between 1000..2000. In the child, between 2000..3000 + if fork_result.is_parent() { + write_range(ptr, 0xAA, 1000..2000); + } else { + write_range(ptr, 0xBB, 2000..3000); + } + barrier.wait(); + // Verify the contents. (in both processes) + assert_range(ptr, 0xAA, 1000..2000); + assert_range(ptr, 0xBB, 2000..3000); + + // Grow, from the child this time + let size = 10_000_000; + if !fork_result.is_parent() { + init_struct.set_size(size).unwrap(); + } + barrier.wait(); + + // make some writes at the end + if fork_result.is_parent() { + write_range(ptr, 0xAA, (size - 10)..size); + } else { + write_range(ptr, 0xBB, (size - 20)..(size - 10)); + } + barrier.wait(); + + // Verify the contents. (This runs in both processes) + assert_range(ptr, 0, (size - 1000)..(size - 20)); + assert_range(ptr, 0xBB, (size - 20)..(size - 10)); + assert_range(ptr, 0xAA, (size - 10)..size); + + if let ForkResult::Parent { child } = fork_result { + nix::sys::wait::waitpid(child, None).unwrap(); + } + } +} diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml index d5eab85930..be6d22610a 100644 --- a/pgxn/neon/communicator/Cargo.toml +++ b/pgxn/neon/communicator/Cargo.toml @@ -31,7 +31,7 @@ uring-common = { workspace = true, features = ["bytes"] } pageserver_client_grpc.workspace = true pageserver_page_api.workspace = true -neonart.workspace = true +neon-shmem.workspace = true utils.workspace = true [build-dependencies] diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs index 224680d136..a348852f14 100644 --- a/pgxn/neon/communicator/src/backend_interface.rs +++ b/pgxn/neon/communicator/src/backend_interface.rs @@ -98,7 +98,7 @@ pub extern "C" fn bcomm_start_get_page_v_request<'t>( // Check if the request can be satisfied from the cache first let mut all_cached = true; - let read_op = bs.integrated_cache.start_read_op(); + let mut read_op = bs.integrated_cache.start_read_op(); for i in 0..get_pagev_request.nblocks { if let Some(cache_block) = read_op.get_page( &get_pagev_request.reltag(), diff --git a/pgxn/neon/communicator/src/init.rs b/pgxn/neon/communicator/src/init.rs index 1c66d287ff..c642588840 100644 --- a/pgxn/neon/communicator/src/init.rs +++ b/pgxn/neon/communicator/src/init.rs @@ -23,8 +23,6 @@ use std::mem; use std::mem::MaybeUninit; use std::os::fd::OwnedFd; -use neonart::allocator::r#static::alloc_array_from_slice; - use crate::backend_comms::NeonIOHandle; use crate::integrated_cache::IntegratedCacheInitStruct; @@ -133,3 +131,48 @@ pub extern "C" fn rcommunicator_shmem_init( cis } + +// fixme: currently unused +#[allow(dead_code)] +pub fn alloc_from_slice( + area: &mut [MaybeUninit], +) -> (&mut MaybeUninit, &mut [MaybeUninit]) { + let layout = std::alloc::Layout::new::(); + + let area_start = area.as_mut_ptr(); + + // pad to satisfy alignment requirements + let padding = area_start.align_offset(layout.align()); + if padding + layout.size() > area.len() { + panic!("out of memory"); + } + let area = &mut area[padding..]; + let (result_area, remain) = 
area.split_at_mut(layout.size()); + + let result_ptr: *mut MaybeUninit = result_area.as_mut_ptr().cast(); + let result = unsafe { result_ptr.as_mut().unwrap() }; + + (result, remain) +} + +pub fn alloc_array_from_slice( + area: &mut [MaybeUninit], + len: usize, +) -> (&mut [MaybeUninit], &mut [MaybeUninit]) { + let layout = std::alloc::Layout::new::(); + + let area_start = area.as_mut_ptr(); + + // pad to satisfy alignment requirements + let padding = area_start.align_offset(layout.align()); + if padding + layout.size() * len > area.len() { + panic!("out of memory"); + } + let area = &mut area[padding..]; + let (result_area, remain) = area.split_at_mut(layout.size() * len); + + let result_ptr: *mut MaybeUninit = result_area.as_mut_ptr().cast(); + let result = unsafe { std::slice::from_raw_parts_mut(result_ptr.as_mut().unwrap(), len) }; + + (result, remain) +} diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index c544541d17..f0b14233bf 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -18,7 +18,7 @@ // - blocks in the file cache's file. If the file grows too large, need to evict something. // Also if the cache is resized // -// - entries in the cache tree. If we run out of memory in the shmem area, need to evict +// - entries in the cache map. If we run out of memory in the shmem area, need to evict // something // @@ -33,90 +33,67 @@ use crate::file_cache::INVALID_CACHE_BLOCK; use crate::file_cache::{CacheBlock, FileCache}; use pageserver_page_api::model::RelTag; -use metrics::{IntCounter, IntGauge, IntGaugeVec}; +use metrics::{IntCounter, IntGauge}; -use neonart; -use neonart::TreeInitStruct; -use neonart::TreeIterator; -use neonart::UpdateAction; +use neon_shmem::hash::HashMapInit; +use neon_shmem::hash::UpdateAction; +use neon_shmem::shmem::ShmemHandle; const CACHE_AREA_SIZE: usize = 10 * 1024 * 1024; -type IntegratedCacheTreeInitStruct<'t> = - TreeInitStruct<'t, TreeKey, TreeEntry, neonart::ArtMultiSlabAllocator<'t, TreeEntry>>; +type IntegratedCacheMapInitStruct<'t> = HashMapInit<'t, MapKey, MapEntry>; /// This struct is initialized at postmaster startup, and passed to all the processes via fork(). pub struct IntegratedCacheInitStruct<'t> { - allocator: &'t neonart::ArtMultiSlabAllocator<'t, TreeEntry>, - handle: IntegratedCacheTreeInitStruct<'t>, + map_handle: IntegratedCacheMapInitStruct<'t>, } /// Represents write-access to the integrated cache. This is used by the communicator process. 
pub struct IntegratedCacheWriteAccess<'t> { - cache_tree: neonart::TreeWriteAccess< - 't, - TreeKey, - TreeEntry, - neonart::ArtMultiSlabAllocator<'t, TreeEntry>, - >, + cache_map: neon_shmem::hash::HashMapAccess<'t, MapKey, MapEntry>, global_lw_lsn: AtomicU64, pub(crate) file_cache: Option, // Fields for eviction - clock_hand: std::sync::Mutex>, + clock_hand: std::sync::Mutex, // Metrics page_evictions_counter: IntCounter, clock_iterations_counter: IntCounter, - nodes_total: IntGaugeVec, - nodes_leaf_total: IntGauge, - nodes_internal4_total: IntGauge, - nodes_internal16_total: IntGauge, - nodes_internal48_total: IntGauge, - nodes_internal256_total: IntGauge, - - nodes_memory_bytes: IntGaugeVec, - nodes_memory_leaf_bytes: IntGauge, - nodes_memory_internal4_bytes: IntGauge, - nodes_memory_internal16_bytes: IntGauge, - nodes_memory_internal48_bytes: IntGauge, - nodes_memory_internal256_bytes: IntGauge, - - // metrics from the art tree - cache_memory_size_bytes: IntGauge, - cache_memory_used_bytes: IntGauge, - cache_tree_epoch: IntGauge, - cache_tree_oldest_epoch: IntGauge, - cache_tree_garbage_total: IntGauge, + // metrics from the hash map + cache_map_num_buckets: IntGauge, + cache_map_num_buckets_in_use: IntGauge, } /// Represents read-only access to the integrated cache. Backend processes have this. pub struct IntegratedCacheReadAccess<'t> { - cache_tree: neonart::TreeReadAccess<'t, TreeKey, TreeEntry>, + cache_map: neon_shmem::hash::HashMapAccess<'t, MapKey, MapEntry>, } impl<'t> IntegratedCacheInitStruct<'t> { /// Return the desired size in bytes of the shared memory area to reserve for the integrated /// cache. pub fn shmem_size(_max_procs: u32) -> usize { - CACHE_AREA_SIZE + // FIXME: the map uses its own ShmemHandle now. This is just for fixed-size allocations + // in the general Postgres shared memory segment. + 0 } /// Initialize the shared memory segment. This runs once in postmaster. Returns a struct which /// will be inherited by all processes through fork. 
pub fn shmem_init( _max_procs: u32, - shmem_area: &'t mut [MaybeUninit], + _shmem_area: &'t mut [MaybeUninit], ) -> IntegratedCacheInitStruct<'t> { - let allocator = neonart::ArtMultiSlabAllocator::new(shmem_area); - - let handle = IntegratedCacheTreeInitStruct::new(allocator); + let shmem_handle = ShmemHandle::new("integrated cache", 0, CACHE_AREA_SIZE).unwrap(); // Initialize the shared memory area - IntegratedCacheInitStruct { allocator, handle } + let map_handle = + neon_shmem::hash::HashMapInit::init_in_shmem(shmem_handle, CACHE_AREA_SIZE); + IntegratedCacheInitStruct { map_handle } } pub fn worker_process_init( @@ -124,42 +101,14 @@ impl<'t> IntegratedCacheInitStruct<'t> { lsn: Lsn, file_cache: Option, ) -> IntegratedCacheWriteAccess<'t> { - let IntegratedCacheInitStruct { - allocator: _allocator, - handle, - } = self; - let tree_writer = handle.attach_writer(); - - let nodes_total = IntGaugeVec::new( - metrics::core::Opts::new("nodes_total", "Number of nodes in cache tree."), - &["node_kind"], - ) - .unwrap(); - let nodes_leaf_total = nodes_total.with_label_values(&["leaf"]); - let nodes_internal4_total = nodes_total.with_label_values(&["internal4"]); - let nodes_internal16_total = nodes_total.with_label_values(&["internal16"]); - let nodes_internal48_total = nodes_total.with_label_values(&["internal48"]); - let nodes_internal256_total = nodes_total.with_label_values(&["internal256"]); - - let nodes_memory_bytes = IntGaugeVec::new( - metrics::core::Opts::new( - "nodes_memory_bytes", - "Memory reserved for nodes in cache tree.", - ), - &["node_kind"], - ) - .unwrap(); - let nodes_memory_leaf_bytes = nodes_memory_bytes.with_label_values(&["leaf"]); - let nodes_memory_internal4_bytes = nodes_memory_bytes.with_label_values(&["internal4"]); - let nodes_memory_internal16_bytes = nodes_memory_bytes.with_label_values(&["internal16"]); - let nodes_memory_internal48_bytes = nodes_memory_bytes.with_label_values(&["internal48"]); - let nodes_memory_internal256_bytes = nodes_memory_bytes.with_label_values(&["internal256"]); + let IntegratedCacheInitStruct { map_handle } = self; + let map_writer = map_handle.attach_writer(); IntegratedCacheWriteAccess { - cache_tree: tree_writer, + cache_map: map_writer, global_lw_lsn: AtomicU64::new(lsn.0), file_cache, - clock_hand: std::sync::Mutex::new(TreeIterator::new_wrapping()), + clock_hand: std::sync::Mutex::new(0), page_evictions_counter: metrics::IntCounter::new( "integrated_cache_evictions", @@ -173,64 +122,31 @@ impl<'t> IntegratedCacheInitStruct<'t> { ) .unwrap(), - nodes_total, - nodes_leaf_total, - nodes_internal4_total, - nodes_internal16_total, - nodes_internal48_total, - nodes_internal256_total, - - nodes_memory_bytes, - nodes_memory_leaf_bytes, - nodes_memory_internal4_bytes, - nodes_memory_internal16_bytes, - nodes_memory_internal48_bytes, - nodes_memory_internal256_bytes, - - cache_memory_size_bytes: metrics::IntGauge::new( - "cache_memory_size_bytes", - "Memory reserved for cache metadata", + cache_map_num_buckets: metrics::IntGauge::new( + "cache_num_map_buckets", + "Allocated size of the cache hash map", ) .unwrap(), - cache_memory_used_bytes: metrics::IntGauge::new( - "cache_memory_size_bytes", - "Memory used for cache metadata", - ) - .unwrap(), - - cache_tree_epoch: metrics::IntGauge::new( - "cache_tree_epoch", - "Current epoch of the cache tree", - ) - .unwrap(), - cache_tree_oldest_epoch: metrics::IntGauge::new( - "cache_tree_oldest_epoch", - "Oldest active epoch of the cache tree", - ) - .unwrap(), - cache_tree_garbage_total: 
metrics::IntGauge::new( - "cache_tree_garbage_total", - "Number of obsoleted nodes in cache tree pending GC", + cache_map_num_buckets_in_use: metrics::IntGauge::new( + "cache_num_map_buckets_in_use", + "Number of buckets in use in the cache hash map", ) .unwrap(), } } pub fn backend_init(self) -> IntegratedCacheReadAccess<'t> { - let IntegratedCacheInitStruct { - allocator: _allocator, - handle, - } = self; + let IntegratedCacheInitStruct { map_handle } = self; - let tree_reader = handle.attach_reader(); + let map_reader = map_handle.attach_reader(); IntegratedCacheReadAccess { - cache_tree: tree_reader, + cache_map: map_reader, } } } -enum TreeEntry { +enum MapEntry { Rel(RelEntry), Block(BlockEntry), } @@ -239,7 +155,7 @@ struct BlockEntry { lw_lsn: AtomicLsn, cache_block: AtomicU64, - pinned: AtomicBool, + pinned: AtomicU64, // 'referenced' bit for the clock algorithm referenced: AtomicBool, @@ -251,14 +167,14 @@ struct RelEntry { nblocks: AtomicU32, } -impl std::fmt::Debug for TreeEntry { +impl std::fmt::Debug for MapEntry { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { match self { - TreeEntry::Rel(e) => fmt + MapEntry::Rel(e) => fmt .debug_struct("Rel") .field("nblocks", &e.nblocks.load(Ordering::Relaxed)) .finish(), - TreeEntry::Block(e) => fmt + MapEntry::Block(e) => fmt .debug_struct("Block") .field("lw_lsn", &e.lw_lsn.load()) .field("cache_block", &e.cache_block.load(Ordering::Relaxed)) @@ -275,37 +191,42 @@ impl std::fmt::Debug for TreeEntry { PartialEq, PartialOrd, Eq, + Hash, Ord, zerocopy_derive::IntoBytes, zerocopy_derive::Immutable, zerocopy_derive::FromBytes, )] #[repr(packed)] -// Note: the fields are stored in big-endian order, to make the radix tree more -// efficient, and to make scans over ranges of blocks work correctly. -struct TreeKey { +// Note: the fields are stored in big-endian order. If we used the keys in a radix tree, that would +// make pack the tree more tightly, and would make scans over ranges of blocks work correctly, +// i.e. return the entries in block number order. XXX: We currently use a hash map though, so it +// doesn't matter. 
+struct MapKey { spc_oid_be: u32, db_oid_be: u32, rel_number_be: u32, fork_number: u8, block_number_be: u32, } -impl<'a> From<&'a [u8]> for TreeKey { +impl<'a> From<&'a [u8]> for MapKey { fn from(bytes: &'a [u8]) -> Self { Self::read_from_bytes(bytes).expect("invalid key length") } } -fn key_range_for_rel_blocks(rel: &RelTag) -> Range { +// fixme: currently unused +#[allow(dead_code)] +fn key_range_for_rel_blocks(rel: &RelTag) -> Range { Range { - start: TreeKey::from((rel, 0)), - end: TreeKey::from((rel, u32::MAX)), + start: MapKey::from((rel, 0)), + end: MapKey::from((rel, u32::MAX)), } } -impl From<&RelTag> for TreeKey { - fn from(val: &RelTag) -> TreeKey { - TreeKey { +impl From<&RelTag> for MapKey { + fn from(val: &RelTag) -> MapKey { + MapKey { spc_oid_be: val.spc_oid.to_be(), db_oid_be: val.db_oid.to_be(), rel_number_be: val.rel_number.to_be(), @@ -315,9 +236,9 @@ impl From<&RelTag> for TreeKey { } } -impl From<(&RelTag, u32)> for TreeKey { - fn from(val: (&RelTag, u32)) -> TreeKey { - TreeKey { +impl From<(&RelTag, u32)> for MapKey { + fn from(val: (&RelTag, u32)) -> MapKey { + MapKey { spc_oid_be: val.0.spc_oid.to_be(), db_oid_be: val.0.db_oid.to_be(), rel_number_be: val.0.rel_number.to_be(), @@ -327,7 +248,7 @@ impl From<(&RelTag, u32)> for TreeKey { } } -impl neonart::Key for TreeKey { +impl neon_shmem::hash::Key for MapKey { const KEY_LEN: usize = 4 + 4 + 4 + 1 + 4; fn as_bytes(&self) -> &[u8] { @@ -335,7 +256,7 @@ impl neonart::Key for TreeKey { } } -impl neonart::Value for TreeEntry {} +impl neon_shmem::hash::Value for MapEntry {} /// Return type used in the cache's get_*() functions. 'Found' means that the page, or other /// information that was enqueried, exists in the cache. ' @@ -351,8 +272,7 @@ pub enum CacheResult { impl<'t> IntegratedCacheWriteAccess<'t> { pub fn get_rel_size(&'t self, rel: &RelTag) -> CacheResult { - let r = self.cache_tree.start_read(); - if let Some(nblocks) = get_rel_size(&r, rel) { + if let Some(nblocks) = get_rel_size(&self.cache_map, rel) { CacheResult::Found(nblocks) } else { let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); @@ -366,31 +286,39 @@ impl<'t> IntegratedCacheWriteAccess<'t> { block_number: u32, dst: impl uring_common::buf::IoBufMut + Send + Sync, ) -> Result, std::io::Error> { - let r = self.cache_tree.start_read(); - if let Some(block_tree_entry) = r.get(&TreeKey::from((rel, block_number))) { - let block_entry = if let TreeEntry::Block(e) = block_tree_entry { + let x = if let Some(entry) = + self.cache_map.get(&MapKey::from((rel, block_number))) + { + let block_entry = if let MapEntry::Block(e) = &*entry { e } else { - panic!("unexpected tree entry type for block key"); + panic!("unexpected map entry type for block key"); }; block_entry.referenced.store(true, Ordering::Relaxed); let cache_block = block_entry.cache_block.load(Ordering::Relaxed); if cache_block != INVALID_CACHE_BLOCK { - self.file_cache - .as_ref() - .unwrap() - .read_block(cache_block, dst) - .await?; + // pin it and release lock + block_entry.pinned.fetch_add(1, Ordering::Relaxed); - Ok(CacheResult::Found(())) + (cache_block, DeferredUnpin(block_entry.pinned.as_ptr())) } else { - Ok(CacheResult::NotFound(block_entry.lw_lsn.load())) + return Ok(CacheResult::NotFound(block_entry.lw_lsn.load())); } } else { let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); - Ok(CacheResult::NotFound(lsn)) - } + return Ok(CacheResult::NotFound(lsn)); + }; + + let (cache_block, _deferred_pin) = x; + self.file_cache + .as_ref() + .unwrap() + .read_block(cache_block, 
dst) + .await?; + + // unpin the entry (by implicitly dropping deferred_pin) + Ok(CacheResult::Found(())) } pub async fn page_is_cached( @@ -398,12 +326,11 @@ impl<'t> IntegratedCacheWriteAccess<'t> { rel: &RelTag, block_number: u32, ) -> Result, std::io::Error> { - let r = self.cache_tree.start_read(); - if let Some(block_tree_entry) = r.get(&TreeKey::from((rel, block_number))) { - let block_entry = if let TreeEntry::Block(e) = block_tree_entry { + if let Some(entry) = self.cache_map.get(&MapKey::from((rel, block_number))) { + let block_entry = if let MapEntry::Block(e) = &*entry { e } else { - panic!("unexpected tree entry type for block key"); + panic!("unexpected map entry type for block key"); }; // This is used for prefetch requests. Treat the probe as an 'access', to keep it @@ -427,8 +354,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { /// information, i.e. we don't know if the relation exists or not. pub fn get_rel_exists(&'t self, rel: &RelTag) -> CacheResult { // we don't currently cache negative entries, so if the relation is in the cache, it exists - let r = self.cache_tree.start_read(); - if let Some(_rel_entry) = r.get(&TreeKey::from(rel)) { + if let Some(_rel_entry) = self.cache_map.get(&MapKey::from(rel)) { CacheResult::Found(true) } else { let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); @@ -447,21 +373,22 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) { - let w = self.cache_tree.start_write(); - let result = w.update_with_fn(&TreeKey::from(rel), |existing| match existing { - None => { - tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks"); - UpdateAction::Insert(TreeEntry::Rel(RelEntry { - nblocks: AtomicU32::new(nblocks), - })) - } - Some(TreeEntry::Block(_)) => panic!("unexpected tree entry type for rel key"), - Some(TreeEntry::Rel(e)) => { - tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks"); - e.nblocks.store(nblocks, Ordering::Relaxed); - UpdateAction::Nothing - } - }); + let result = self + .cache_map + .update_with_fn(&MapKey::from(rel), |existing| match existing { + None => { + tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks"); + UpdateAction::Insert(MapEntry::Rel(RelEntry { + nblocks: AtomicU32::new(nblocks), + })) + } + Some(MapEntry::Block(_)) => panic!("unexpected map entry type for rel key"), + Some(MapEntry::Rel(e)) => { + tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks"); + e.nblocks.store(nblocks, Ordering::Relaxed); + UpdateAction::Nothing + } + }); // FIXME: what to do if we run out of memory? Evict other relation entries? Remove // block entries first? @@ -477,7 +404,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { lw_lsn: Lsn, is_write: bool, ) { - let key = TreeKey::from((rel, block_number)); + let key = MapKey::from((rel, block_number)); // FIXME: make this work when file cache is disabled. 
Or make it mandatory let file_cache = self.file_cache.as_ref().unwrap(); @@ -488,26 +415,26 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // regular POSIX filesystem read() and write() // First check if we have a block in cache already - let w = self.cache_tree.start_write(); - let mut old_cache_block = None; let mut found_existing = false; - let res = w.update_with_fn(&key, |existing| { + let res = self.cache_map.update_with_fn(&key, |existing| { if let Some(existing) = existing { - let block_entry = if let TreeEntry::Block(e) = existing { + let block_entry = if let MapEntry::Block(e) = existing { e } else { - panic!("unexpected tree entry type for block key"); + panic!("unexpected map entry type for block key"); }; found_existing = true; // Prevent this entry from being evicted - let was_pinned = block_entry.pinned.swap(true, Ordering::Relaxed); - if was_pinned { + let pin_count = block_entry.pinned.fetch_add(1, Ordering::Relaxed); + if pin_count > 0 { // this is unexpected, because the caller has obtained the io-in-progress lock, // so no one else should try to modify the page at the same time. + // XXX: and I think a read should not be happening either, because the postgres + // buffer is held locked. TODO: check these conditions and tidy this up a little. Seems fragile to just panic. panic!("block entry was unexpectedly pinned"); } @@ -547,14 +474,13 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // FIXME: unpin the block entry on error // Update the block entry - let w = self.cache_tree.start_write(); - let res = w.update_with_fn(&key, |existing| { + let res = self.cache_map.update_with_fn(&key, |existing| { assert_eq!(found_existing, existing.is_some()); if let Some(existing) = existing { - let block_entry = if let TreeEntry::Block(e) = existing { + let block_entry = if let MapEntry::Block(e) = existing { e } else { - panic!("unexpected tree entry type for block key"); + panic!("unexpected map entry type for block key"); }; // Update the cache block @@ -570,14 +496,14 @@ impl<'t> IntegratedCacheWriteAccess<'t> { block_entry.referenced.store(true, Ordering::Relaxed); - let was_pinned = block_entry.pinned.swap(false, Ordering::Relaxed); - assert!(was_pinned); + let pin_count = block_entry.pinned.fetch_sub(1, Ordering::Relaxed); + assert!(pin_count > 0); UpdateAction::Nothing } else { - UpdateAction::Insert(TreeEntry::Block(BlockEntry { + UpdateAction::Insert(MapEntry::Block(BlockEntry { lw_lsn: AtomicLsn::new(lw_lsn.0), cache_block: AtomicU64::new(cache_block), - pinned: AtomicBool::new(false), + pinned: AtomicU64::new(0), referenced: AtomicBool::new(true), })) } @@ -612,17 +538,16 @@ impl<'t> IntegratedCacheWriteAccess<'t> { .expect("error writing to cache"); // FIXME: handle errors gracefully. - let w = self.cache_tree.start_write(); - - let res = w.update_with_fn(&key, |existing| { + let res = self.cache_map.update_with_fn(&key, |existing| { if let Some(existing) = existing { - let block_entry = if let TreeEntry::Block(e) = existing { + let block_entry = if let MapEntry::Block(e) = existing { e } else { - panic!("unexpected tree entry type for block key"); + panic!("unexpected map entry type for block key"); }; - assert!(!block_entry.pinned.load(Ordering::Relaxed)); + // FIXME: could there be concurrent readers? 
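// Editorial sketch, not part of this patch, placed next to the FIXME it relates to: a
// safe-reference version of the pin/unpin protocol, for comparison with the raw-pointer
// `DeferredUnpin` guard defined near the end of this file. `PinGuard` is a hypothetical name.
// The point of switching `pinned` from an AtomicBool flag to an AtomicU64 count is that several
// readers can hold pins on the same entry concurrently, and eviction simply skips any entry
// whose count is non-zero.
use std::sync::atomic::{AtomicU64, Ordering};

struct PinGuard<'a>(&'a AtomicU64);

impl<'a> PinGuard<'a> {
    fn pin(count: &'a AtomicU64) -> Self {
        // Take a pin before releasing the map lock; the entry must not be evicted while pinned.
        count.fetch_add(1, Ordering::Relaxed);
        PinGuard(count)
    }
}

impl Drop for PinGuard<'_> {
    fn drop(&mut self) {
        // Release the pin taken in `pin()`.
        self.0.fetch_sub(1, Ordering::Relaxed);
    }
}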
+ assert!(block_entry.pinned.load(Ordering::Relaxed) == 0); let old_cache_block = block_entry.cache_block.swap(cache_block, Ordering::Relaxed); if old_cache_block != INVALID_CACHE_BLOCK { @@ -630,10 +555,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } UpdateAction::Nothing } else { - UpdateAction::Insert(TreeEntry::Block(BlockEntry { + UpdateAction::Insert(MapEntry::Block(BlockEntry { lw_lsn: AtomicLsn::new(lw_lsn.0), cache_block: AtomicU64::new(cache_block), - pinned: AtomicBool::new(false), + pinned: AtomicU64::new(0), referenced: AtomicBool::new(true), })) } @@ -648,47 +573,50 @@ impl<'t> IntegratedCacheWriteAccess<'t> { /// Forget information about given relation in the cache. (For DROP TABLE and such) pub fn forget_rel(&'t self, rel: &RelTag) { tracing::info!("forgetting rel entry for {rel:?}"); - let w = self.cache_tree.start_write(); - w.remove(&TreeKey::from(rel)); + self.cache_map.remove(&MapKey::from(rel)); // also forget all cached blocks for the relation - let mut iter = TreeIterator::new(&key_range_for_rel_blocks(rel)); - let r = self.cache_tree.start_read(); - while let Some((k, _v)) = iter.next(&r) { - let w = self.cache_tree.start_write(); + // FIXME + /* + let mut iter = MapIterator::new(&key_range_for_rel_blocks(rel)); + let r = self.cache_tree.start_read(); + while let Some((k, _v)) = iter.next(&r) { + let w = self.cache_tree.start_write(); - let mut evicted_cache_block = None; + let mut evicted_cache_block = None; - let res = w.update_with_fn(&k, |e| { - if let Some(e) = e { - let block_entry = if let TreeEntry::Block(e) = e { - e + let res = w.update_with_fn(&k, |e| { + if let Some(e) = e { + let block_entry = if let MapEntry::Block(e) = e { + e + } else { + panic!("unexpected map entry type for block key"); + }; + let cache_block = block_entry + .cache_block + .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); + if cache_block != INVALID_CACHE_BLOCK { + evicted_cache_block = Some(cache_block); + } + UpdateAction::Remove } else { - panic!("unexpected tree entry type for block key"); - }; - let cache_block = block_entry - .cache_block - .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); - if cache_block != INVALID_CACHE_BLOCK { - evicted_cache_block = Some(cache_block); + UpdateAction::Nothing } - UpdateAction::Remove - } else { - UpdateAction::Nothing + }); + + // FIXME: It's pretty surprising to run out of memory while removing. But + // maybe it can happen because of trying to shrink a node? + res.expect("out of memory"); + + if let Some(evicted_cache_block) = evicted_cache_block { + self.file_cache + .as_ref() + .unwrap() + .dealloc_block(evicted_cache_block); } - }); - - // FIXME: It's pretty surprising to run out of memory while removing. But - // maybe it can happen because of trying to shrink a node? - res.expect("out of memory"); - - if let Some(evicted_cache_block) = evicted_cache_block { - self.file_cache - .as_ref() - .unwrap() - .dealloc_block(evicted_cache_block); - } } + + */ } // Maintenance routines @@ -699,147 +627,109 @@ impl<'t> IntegratedCacheWriteAccess<'t> { pub fn try_evict_one_cache_block(&self) -> Option { let mut clock_hand = self.clock_hand.lock().unwrap(); for _ in 0..100 { - let r = self.cache_tree.start_read(); - self.clock_iterations_counter.inc(); - match clock_hand.next(&r) { + (*clock_hand) += 1; + + let mut evict_this = false; + let num_buckets = self.cache_map.get_num_buckets(); + match self + .cache_map + .get_bucket((*clock_hand) % num_buckets) + .as_deref() + { None => { - // The cache is completely empty. 
Pretty unexpected that this function - // was called then.. - break; + // This bucket was unused } - Some((_k, TreeEntry::Rel(_))) => { + Some(MapEntry::Rel(_)) => { // ignore rel entries for now. // TODO: They stick in the cache forever } - Some((k, TreeEntry::Block(blk_entry))) => { + Some(MapEntry::Block(blk_entry)) => { if !blk_entry.referenced.swap(false, Ordering::Relaxed) { // Evict this. Maybe. - let w = self.cache_tree.start_write(); - - let mut evicted_cache_block = None; - let res = w.update_with_fn(&k, |old| { - match old { - None => UpdateAction::Nothing, - Some(TreeEntry::Rel(_)) => panic!("unexpected Rel entry"), - Some(TreeEntry::Block(old)) => { - // note: all the accesses to 'pinned' currently happen - // within update_with_fn(), which protects from concurrent - // updates. Otherwise, another thread could set the 'pinned' - // flag just after we have checked it here. - if blk_entry.pinned.load(Ordering::Relaxed) { - return UpdateAction::Nothing; - } - - let _ = self - .global_lw_lsn - .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); - let cache_block = old - .cache_block - .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); - if cache_block != INVALID_CACHE_BLOCK { - evicted_cache_block = Some(cache_block); - } - // TODO: we don't evict the entry, just the block. Does it make - // sense to keep the entry? - UpdateAction::Nothing - } - } - }); - - // FIXME: what to do if we run out of memory? Evict other relation entries? Remove - // block entries first? It probably shouldn't happen here, as we're not - // actually updating the tree. - res.expect("out of memory"); - - if evicted_cache_block.is_some() { - self.page_evictions_counter.inc(); - return evicted_cache_block; - } + evict_this = true; } } + }; + + if evict_this { + // grab the write lock + let mut evicted_cache_block = None; + let res = + self.cache_map + .update_with_fn_at_bucket(*clock_hand % num_buckets, |old| { + match old { + None => UpdateAction::Nothing, + Some(MapEntry::Rel(_)) => panic!("unexpected Rel entry"), + Some(MapEntry::Block(old)) => { + // note: all the accesses to 'pinned' currently happen + // within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent + // updates. Otherwise, another thread could set the 'pinned' + // flag just after we have checked it here. + if old.pinned.load(Ordering::Relaxed) != 0 { + return UpdateAction::Nothing; + } + + let _ = self + .global_lw_lsn + .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); + let cache_block = old + .cache_block + .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); + if cache_block != INVALID_CACHE_BLOCK { + evicted_cache_block = Some(cache_block); + } + // TODO: we don't evict the entry, just the block. Does it make + // sense to keep the entry? + UpdateAction::Nothing + } + } + }); + + // Out of memory should not happen here, as we're only updating existing values, + // not inserting new entries to the map. 
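// Editorial sketch, not part of this patch: a simplified, self-contained model of the clock
// sweep implemented in this function. Each bucket carries a 'referenced' bit; the hand clears
// the bit on its first pass and only reports an entry whose bit was already clear
// (second-chance replacement). The real loop above additionally skips unused buckets and pinned
// entries, takes the bucket's write lock before evicting, and gives up after 100 iterations.
// `advance_clock` is a hypothetical standalone helper.
use std::sync::atomic::{AtomicBool, Ordering};

fn advance_clock(hand: &mut usize, referenced: &[AtomicBool]) -> Option<usize> {
    let num_buckets = referenced.len();
    if num_buckets == 0 {
        return None;
    }
    for _ in 0..num_buckets {
        *hand = (*hand + 1) % num_buckets;
        // swap(false) clears the referenced bit; if it was already clear, this bucket is an
        // eviction candidate.
        if !referenced[*hand].swap(false, Ordering::Relaxed) {
            return Some(*hand);
        }
    }
    None
}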
+ res.expect("out of memory"); + + if evicted_cache_block.is_some() { + self.page_evictions_counter.inc(); + return evicted_cache_block; + } } } // Give up if we didn't find anything None } - pub fn dump_tree(&self, dst: &mut dyn std::io::Write) { - self.cache_tree.start_read().dump(dst); + pub fn dump_map(&self, _dst: &mut dyn std::io::Write) { + //FIXME self.cache_map.start_read().dump(dst); } } impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { fn desc(&self) -> Vec<&metrics::core::Desc> { let mut descs = Vec::new(); - descs.append(&mut self.nodes_total.desc()); - descs.append(&mut self.nodes_memory_bytes.desc()); descs.append(&mut self.page_evictions_counter.desc()); descs.append(&mut self.clock_iterations_counter.desc()); - descs.append(&mut self.cache_memory_size_bytes.desc()); - descs.append(&mut self.cache_memory_used_bytes.desc()); - - descs.append(&mut self.cache_tree_epoch.desc()); - descs.append(&mut self.cache_tree_oldest_epoch.desc()); - descs.append(&mut self.cache_tree_garbage_total.desc()); + descs.append(&mut self.cache_map_num_buckets.desc()); + descs.append(&mut self.cache_map_num_buckets_in_use.desc()); descs } fn collect(&self) -> Vec { - const ALLOC_BLOCK_SIZE: i64 = neonart::allocator::block::BLOCK_SIZE as i64; - // Update gauges - let art_statistics = self.cache_tree.get_statistics(); - self.nodes_leaf_total - .set(art_statistics.slabs.num_leaf as i64); - self.nodes_internal4_total - .set(art_statistics.slabs.num_internal4 as i64); - self.nodes_internal16_total - .set(art_statistics.slabs.num_internal16 as i64); - self.nodes_internal48_total - .set(art_statistics.slabs.num_internal48 as i64); - self.nodes_internal256_total - .set(art_statistics.slabs.num_internal256 as i64); - - self.nodes_memory_leaf_bytes - .set(art_statistics.slabs.num_blocks_leaf as i64 * ALLOC_BLOCK_SIZE); - self.nodes_memory_internal4_bytes - .set(art_statistics.slabs.num_blocks_internal4 as i64 * ALLOC_BLOCK_SIZE); - self.nodes_memory_internal16_bytes - .set(art_statistics.slabs.num_blocks_internal16 as i64 * ALLOC_BLOCK_SIZE); - self.nodes_memory_internal48_bytes - .set(art_statistics.slabs.num_blocks_internal48 as i64 * ALLOC_BLOCK_SIZE); - self.nodes_memory_internal256_bytes - .set(art_statistics.slabs.num_blocks_internal256 as i64 * ALLOC_BLOCK_SIZE); - - let block_statistics = &art_statistics.blocks; - self.cache_memory_size_bytes - .set(block_statistics.num_blocks as i64 * ALLOC_BLOCK_SIZE as i64); - self.cache_memory_used_bytes.set( - (block_statistics.num_initialized as i64 - block_statistics.num_free_blocks as i64) - * ALLOC_BLOCK_SIZE as i64, - ); - - self.cache_tree_epoch.set(art_statistics.epoch as i64); - self.cache_tree_oldest_epoch - .set(art_statistics.oldest_epoch as i64); - self.cache_tree_garbage_total - .set(art_statistics.num_garbage as i64); + self.cache_map_num_buckets + .set(self.cache_map.get_num_buckets() as i64); + self.cache_map_num_buckets_in_use + .set(self.cache_map.get_num_buckets_in_use() as i64); let mut values = Vec::new(); - values.append(&mut self.nodes_total.collect()); - values.append(&mut self.nodes_memory_bytes.collect()); values.append(&mut self.page_evictions_counter.collect()); values.append(&mut self.clock_iterations_counter.collect()); - values.append(&mut self.cache_memory_size_bytes.collect()); - values.append(&mut self.cache_memory_used_bytes.collect()); - - values.append(&mut self.cache_tree_epoch.collect()); - values.append(&mut self.cache_tree_oldest_epoch.collect()); - values.append(&mut 
self.cache_tree_garbage_total.collect()); + values.append(&mut self.cache_map_num_buckets.collect()); + values.append(&mut self.cache_map_num_buckets_in_use.collect()); values } @@ -849,12 +739,15 @@ impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { /// /// This is in a separate function so that it can be shared by /// IntegratedCacheReadAccess::get_rel_size() and IntegratedCacheWriteAccess::get_rel_size() -fn get_rel_size<'t>(r: &neonart::TreeReadGuard, rel: &RelTag) -> Option { - if let Some(existing) = r.get(&TreeKey::from(rel)) { - let rel_entry = if let TreeEntry::Rel(e) = existing { +fn get_rel_size<'t>( + r: &neon_shmem::hash::HashMapAccess, + rel: &RelTag, +) -> Option { + if let Some(existing) = r.get(&MapKey::from(rel)) { + let rel_entry = if let MapEntry::Rel(ref e) = *existing { e } else { - panic!("unexpected tree entry type for rel key"); + panic!("unexpected map entry type for rel key"); }; let nblocks = rel_entry.nblocks.load(Ordering::Relaxed); @@ -874,17 +767,20 @@ fn get_rel_size<'t>(r: &neonart::TreeReadGuard, rel: &RelTag /// request to the communicator process. impl<'t> IntegratedCacheReadAccess<'t> { pub fn get_rel_size(&'t self, rel: &RelTag) -> Option { - get_rel_size(&self.cache_tree.start_read(), rel) + get_rel_size(&self.cache_map, rel) } pub fn start_read_op(&'t self) -> BackendCacheReadOp<'t> { - let r = self.cache_tree.start_read(); - BackendCacheReadOp { read_guard: r } + BackendCacheReadOp { + read_guards: Vec::new(), + map_access: self, + } } } pub struct BackendCacheReadOp<'t> { - read_guard: neonart::TreeReadGuard<'t, TreeKey, TreeEntry>, + read_guards: Vec, + map_access: &'t IntegratedCacheReadAccess<'t>, } impl<'e> BackendCacheReadOp<'e> { @@ -896,17 +792,24 @@ impl<'e> BackendCacheReadOp<'e> { /// read. It's possible that while you are performing the read, the cache block is invalidated. /// After you have completed the read, call BackendCacheReadResult::finish() to check if the /// read was in fact valid or not. If it was concurrently invalidated, you need to retry. - pub fn get_page(&self, rel: &RelTag, block_number: u32) -> Option { - if let Some(block_tree_entry) = self.read_guard.get(&TreeKey::from((rel, block_number))) { - let block_entry = if let TreeEntry::Block(e) = block_tree_entry { + pub fn get_page(&mut self, rel: &RelTag, block_number: u32) -> Option { + if let Some(entry) = self + .map_access + .cache_map + .get(&MapKey::from((rel, block_number))) + { + let block_entry = if let MapEntry::Block(ref e) = *entry { e } else { - panic!("unexpected tree entry type for block key"); + panic!("unexpected map entry type for block key"); }; block_entry.referenced.store(true, Ordering::Relaxed); let cache_block = block_entry.cache_block.load(Ordering::Relaxed); if cache_block != INVALID_CACHE_BLOCK { + block_entry.pinned.fetch_add(1, Ordering::Relaxed); + self.read_guards + .push(DeferredUnpin(block_entry.pinned.as_ptr())); Some(cache_block) } else { None @@ -917,10 +820,27 @@ impl<'e> BackendCacheReadOp<'e> { } pub fn finish(self) -> bool { - // TODO: currently, we use a spinlock to protect the in-memory tree, so concurrent - // invalidations are not possible. But the plan is to switch to optimistic locking, - // and once we do that, this would return 'false' if the optimistic locking failed and - // you need to retry. + // TODO: currently, we hold a pin on the in-memory map, so concurrent invalidations are not + // possible. 
But if we switch to optimistic locking, this would return 'false' if the + // optimistic locking failed and you need to retry. true } } + +/// A hack to decrement an AtomicU64 on drop. This is used to decrement the pin count +/// of a BlockEntry. The safety depends on the fact that the BlockEntry is not evicted +/// or moved while it's pinned. +struct DeferredUnpin(*mut u64); + +unsafe impl Sync for DeferredUnpin {} +unsafe impl Send for DeferredUnpin {} + +impl Drop for DeferredUnpin { + fn drop(&mut self) { + // unpin it + unsafe { + let pin_ref = AtomicU64::from_ptr(self.0); + pin_ref.fetch_sub(1, Ordering::Relaxed); + } + } +} diff --git a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs index 160b37fa37..b1042b928c 100644 --- a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs +++ b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs @@ -19,7 +19,7 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> { use axum::routing::get; let app = Router::new() .route("/metrics", get(get_metrics)) - .route("/dump_cache_tree", get(dump_cache_tree)) + .route("/dump_cache_map", get(dump_cache_map)) .with_state(self); // TODO: make configurable. Or listen on unix domain socket? @@ -34,11 +34,11 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> { } } -async fn dump_cache_tree( +async fn dump_cache_map( State(state): State<&CommunicatorWorkerProcessStruct<'static>>, ) -> Response { let mut buf: Vec = Vec::new(); - state.cache.dump_tree(&mut buf); + state.cache.dump_map(&mut buf); Response::builder() .status(StatusCode::OK)
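// Editorial sketch, not part of this patch: the shape of the renamed route registration in
// isolation, with a trivial stand-in handler so the snippet compiles on its own. The names
// `dump_cache_map_stub` and `router` are hypothetical; the real handler above renders the
// cache-map dump from `state.cache.dump_map(&mut buf)` into the response body.
use axum::{Router, http::StatusCode, response::Response, routing::get};

async fn dump_cache_map_stub() -> Response {
    Response::builder()
        .status(StatusCode::OK)
        .body("cache map dump goes here".into())
        .unwrap()
}

fn router() -> Router {
    Router::new().route("/dump_cache_map", get(dump_cache_map_stub))
}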