From 10b936bf031d0cd94420ae31227ff64c2f0bd8d4 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 5 Jun 2025 18:31:29 +0300 Subject: [PATCH] Use a custom Rust implementation to replace the LFC hash table The new implementation lives in a separately allocated shared memory area, which could be resized. Resizing it isn't actually implemented yet, though. It would require some co-operation from the LFC code. --- Cargo.lock | 126 +++++++- Cargo.toml | 3 + Makefile | 7 + libs/neon-shmem/Cargo.toml | 6 +- libs/neon-shmem/src/hash.rs | 304 ++++++++++++++++++ libs/neon-shmem/src/hash/core.rs | 174 ++++++++++ libs/neon-shmem/src/hash/tests.rs | 220 +++++++++++++ libs/neon-shmem/src/lib.rs | 1 + pgxn/neon/Makefile | 4 +- pgxn/neon/communicator/Cargo.toml | 13 + pgxn/neon/communicator/README.md | 8 + pgxn/neon/communicator/build.rs | 22 ++ pgxn/neon/communicator/cbindgen.toml | 4 + .../communicator/src/file_cache_hashmap.rs | 240 ++++++++++++++ pgxn/neon/communicator/src/lib.rs | 1 + pgxn/neon/file_cache.c | 127 ++++---- 16 files changed, 1186 insertions(+), 74 deletions(-) create mode 100644 libs/neon-shmem/src/hash.rs create mode 100644 libs/neon-shmem/src/hash/core.rs create mode 100644 libs/neon-shmem/src/hash/tests.rs create mode 100644 pgxn/neon/communicator/Cargo.toml create mode 100644 pgxn/neon/communicator/README.md create mode 100644 pgxn/neon/communicator/build.rs create mode 100644 pgxn/neon/communicator/cbindgen.toml create mode 100644 pgxn/neon/communicator/src/file_cache_hashmap.rs create mode 100644 pgxn/neon/communicator/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 588a63b6a3..bafcaea594 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1086,6 +1086,25 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" +[[package]] +name = "cbindgen" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eadd868a2ce9ca38de7eeafdcec9c7065ef89b42b32f0839278d55f35c54d1ff" +dependencies = [ + "clap", + "heck 0.4.1", + "indexmap 2.9.0", + "log", + "proc-macro2", + "quote", + "serde", + "serde_json", + "syn 2.0.100", + "tempfile", + "toml", +] + [[package]] name = "cc" version = "1.2.16" @@ -1212,7 +1231,7 @@ version = "4.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.100", @@ -1270,6 +1289,14 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "communicator" +version = "0.1.0" +dependencies = [ + "cbindgen", + "neon-shmem", +] + [[package]] name = "compute_api" version = "0.1.0" @@ -1936,7 +1963,7 @@ checksum = "0892a17df262a24294c382f0d5997571006e7a4348b4327557c4ff1cd4a8bccc" dependencies = [ "darling", "either", - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.100", @@ -2500,6 +2527,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + [[package]] name = "gettid" version = "0.1.3" @@ -2712,6 +2751,12 @@ dependencies = [ "http 1.1.0", ] +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "heck" version = "0.5.0" @@ -3648,7 +3693,7 @@ version = "0.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e6777fc80a575f9503d908c8b498782a6c3ee88a06cb416dc3941401e43b94" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.100", @@ -3710,7 +3755,7 @@ dependencies = [ "procfs", "prometheus", "rand 0.8.5", - "rand_distr", + "rand_distr 0.4.3", "twox-hash", ] @@ -3799,6 +3844,8 @@ name = "neon-shmem" version = "0.1.0" dependencies = [ "nix 0.30.1", + "rand 0.9.1", + "rand_distr 0.5.1", "tempfile", "thiserror 1.0.69", "workspace_hack", @@ -5092,7 +5139,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", - "heck", + "heck 0.5.0", "itertools 0.12.1", "log", "multimap", @@ -5113,7 +5160,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", - "heck", + "heck 0.5.0", "itertools 0.12.1", "log", "multimap", @@ -5238,7 +5285,7 @@ dependencies = [ "postgres_backend", "pq_proto", "rand 0.8.5", - "rand_distr", + "rand_distr 0.4.3", "rcgen", "redis", "regex", @@ -5342,6 +5389,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" + [[package]] name = "rand" version = "0.7.3" @@ -5366,6 +5419,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -5386,6 +5449,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", +] + [[package]] name = "rand_core" version = "0.5.1" @@ -5404,6 +5477,15 @@ dependencies = [ "getrandom 0.2.11", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.3", +] + [[package]] name = "rand_distr" version = "0.4.3" @@ -5414,6 +5496,16 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "rand_distr" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8615d50dcf34fa31f7ab52692afec947c4dd0ab803cc87cb3b0b4570ff7463" +dependencies = [ + "num-traits", + "rand 0.9.1", +] + [[package]] name = "rand_hc" version = "0.2.0" @@ -6900,7 +6992,7 @@ version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "rustversion", @@ -8199,6 +8291,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasite" version = "0.1.0" @@ -8556,6 +8657,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags 2.8.0", +] + [[package]] name = "workspace_hack" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index a040010fb7..df0ab04fb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ members = [ "libs/proxy/postgres-types2", "libs/proxy/tokio-postgres2", "endpoint_storage", + "pgxn/neon/communicator", ] [workspace.package] @@ -251,6 +252,7 @@ desim = { version = "0.1", path = "./libs/desim" } endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" } http-utils = { version = "0.1", path = "./libs/http-utils/" } metrics = { version = "0.1", path = "./libs/metrics/" } +neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" } pageserver = { path = "./pageserver" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } pageserver_client = { path = "./pageserver/client" } @@ -278,6 +280,7 @@ walproposer = { version = "0.1", path = "./libs/walproposer/" } workspace_hack = { version = "0.1", path = "./workspace_hack/" } ## Build dependencies +cbindgen = "0.28.0" criterion = "0.5.1" rcgen = "0.13" rstest = "0.18" diff --git a/Makefile b/Makefile index 0911465fb8..820f3c20f1 100644 --- a/Makefile +++ b/Makefile @@ -18,10 +18,12 @@ ifeq ($(BUILD_TYPE),release) PG_LDFLAGS = $(LDFLAGS) # Unfortunately, `--profile=...` is a nightly feature CARGO_BUILD_FLAGS += --release + NEON_CARGO_ARTIFACT_TARGET_DIR = $(ROOT_PROJECT_DIR)/target/release else ifeq ($(BUILD_TYPE),debug) PG_CONFIGURE_OPTS = --enable-debug --with-openssl --enable-cassert --enable-depend PG_CFLAGS += -O0 -g3 $(CFLAGS) PG_LDFLAGS = $(LDFLAGS) + NEON_CARGO_ARTIFACT_TARGET_DIR = $(ROOT_PROJECT_DIR)/target/debug else $(error Bad build type '$(BUILD_TYPE)', see Makefile for options) endif @@ -180,11 +182,16 @@ postgres-check-%: postgres-% .PHONY: neon-pg-ext-% neon-pg-ext-%: postgres-% + +@echo "Compiling communicator $*" + $(CARGO_CMD_PREFIX) cargo build -p communicator $(CARGO_BUILD_FLAGS) + +@echo "Compiling neon $*" mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ + LIBCOMMUNICATOR_PATH=$(NEON_CARGO_ARTIFACT_TARGET_DIR) \ -C $(POSTGRES_INSTALL_DIR)/build/neon-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install + +@echo "Compiling neon_walredo $*" mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ diff --git a/libs/neon-shmem/Cargo.toml b/libs/neon-shmem/Cargo.toml index 2a636bec40..de65f3e8cc 100644 --- a/libs/neon-shmem/Cargo.toml +++ b/libs/neon-shmem/Cargo.toml @@ -6,8 +6,12 @@ license.workspace = true [dependencies] thiserror.workspace = true -nix.workspace=true +nix.workspace = true workspace_hack = { version = "0.1", path = "../../workspace_hack" } +[dev-dependencies] +rand = "0.9.1" +rand_distr = "0.5.1" + [target.'cfg(target_os = "macos")'.dependencies] tempfile = "3.14.0" diff --git 
a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs new file mode 100644 index 0000000000..ae53e2ec41 --- /dev/null +++ b/libs/neon-shmem/src/hash.rs @@ -0,0 +1,304 @@ +//! Hash table implementation on top of 'shmem' +//! +//! Features required in the long run by the communicator project: +//! +//! [X] Accessible from both Postgres processes and rust threads in the communicator process +//! [X] Low latency +//! [ ] Scalable to lots of concurrent accesses (currently relies on caller for locking) +//! [ ] Resizable + +use std::fmt::Debug; +use std::hash::{DefaultHasher, Hash, Hasher}; +use std::mem::MaybeUninit; + +use crate::shmem::ShmemHandle; + +mod core; +pub mod entry; + +#[cfg(test)] +mod tests; + +use core::CoreHashMap; +use entry::{Entry, OccupiedEntry}; + +#[derive(Debug)] +pub struct OutOfMemoryError(); + +pub struct HashMapInit<'a, K, V> { + // The hash table can be allocated in a fixed memory area, or in a resizable ShmemHandle. + shmem_handle: Option<ShmemHandle>, + shared_ptr: *mut HashMapShared<'a, K, V>, +} + +pub struct HashMapAccess<'a, K, V> { + shmem_handle: Option<ShmemHandle>, + shared_ptr: *mut HashMapShared<'a, K, V>, +} + +unsafe impl<'a, K: Sync, V: Sync> Sync for HashMapAccess<'a, K, V> {} +unsafe impl<'a, K: Send, V: Send> Send for HashMapAccess<'a, K, V> {} + +impl<'a, K, V> HashMapInit<'a, K, V> { + pub fn attach_writer(self) -> HashMapAccess<'a, K, V> { + HashMapAccess { + shmem_handle: self.shmem_handle, + shared_ptr: self.shared_ptr, + } + } + + pub fn attach_reader(self) -> HashMapAccess<'a, K, V> { + // no difference to attach_writer currently + self.attach_writer() + } +} + +/// This is stored in the shared memory area +/// +/// NOTE: We carve out the parts from a contiguous chunk. Growing and shrinking the hash table +/// rely on the memory layout! The data structures are laid out in the contiguous shared memory +/// area as follows: +/// +/// HashMapShared +/// [buckets] +/// [dictionary] +/// +/// In between the above parts, there can be padding bytes to align the parts correctly. +struct HashMapShared<'a, K, V> { + inner: CoreHashMap<'a, K, V>, +} + +impl<'a, K, V> HashMapInit<'a, K, V> +where + K: Clone + Hash + Eq, +{ + pub fn estimate_size(num_buckets: u32) -> usize { + // add some margin to cover alignment etc. + CoreHashMap::<K, V>::estimate_size(num_buckets) + size_of::<HashMapShared<K, V>>() + 1000 + } + + pub fn init_in_fixed_area( + num_buckets: u32, + area: &'a mut [MaybeUninit<u8>], + ) -> HashMapInit<'a, K, V> { + Self::init_common(num_buckets, None, area.as_mut_ptr().cast(), area.len()) + } + + /// Initialize a new hash map in the given shared memory area + pub fn init_in_shmem(num_buckets: u32, mut shmem: ShmemHandle) -> HashMapInit<'a, K, V> { + let size = Self::estimate_size(num_buckets); + shmem + .set_size(size) + .expect("could not resize shared memory area"); + + let ptr = unsafe { shmem.data_ptr.as_mut() }; + Self::init_common(num_buckets, Some(shmem), ptr, size) + } + + fn init_common( + num_buckets: u32, + shmem_handle: Option<ShmemHandle>, + area_ptr: *mut u8, + area_len: usize, + ) -> HashMapInit<'a, K, V> { + // carve out the HashMapShared struct from the area.
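+ // Each part below is carved out by first advancing 'ptr' past any alignment padding (ptr.align_offset) and then reserving the part's size. The extra margin added by estimate_size() is what keeps this padding from ever pushing the dictionary past end_ptr; the assertions below double-check that.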
+ let mut ptr: *mut u8 = area_ptr; + let end_ptr: *mut u8 = unsafe { area_ptr.add(area_len) }; + ptr = unsafe { ptr.add(ptr.align_offset(align_of::<HashMapShared<K, V>>())) }; + let shared_ptr: *mut HashMapShared<K, V> = ptr.cast(); + ptr = unsafe { ptr.add(size_of::<HashMapShared<K, V>>()) }; + + // carve out the buckets + ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<core::Bucket<K, V>>())) }; + let buckets_ptr = ptr; + ptr = unsafe { ptr.add(size_of::<core::Bucket<K, V>>() * num_buckets as usize) }; + + // use remaining space for the dictionary + ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<u32>())) }; + assert!(ptr.addr() < end_ptr.addr()); + let dictionary_ptr = ptr; + let dictionary_size = unsafe { end_ptr.byte_offset_from(ptr) / size_of::<u32>() as isize }; + assert!(dictionary_size > 0); + + let buckets = + unsafe { std::slice::from_raw_parts_mut(buckets_ptr.cast(), num_buckets as usize) }; + let dictionary = unsafe { + std::slice::from_raw_parts_mut(dictionary_ptr.cast(), dictionary_size as usize) + }; + let hashmap = CoreHashMap::new(buckets, dictionary); + unsafe { + std::ptr::write(shared_ptr, HashMapShared { inner: hashmap }); + } + + HashMapInit { + shmem_handle, + shared_ptr, + } + } +} + +impl<'a, K, V> HashMapAccess<'a, K, V> +where + K: Clone + Hash + Eq, +{ + pub fn get_hash_value(&self, key: &K) -> u64 { + let mut hasher = DefaultHasher::new(); + key.hash(&mut hasher); + hasher.finish() + } + + pub fn get_with_hash<'e>(&'e self, key: &K, hash: u64) -> Option<&'e V> { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + + map.inner.get_with_hash(key, hash) + } + + pub fn entry_with_hash(&mut self, key: K, hash: u64) -> Entry<'a, '_, K, V> { + let map = unsafe { self.shared_ptr.as_mut() }.unwrap(); + + map.inner.entry_with_hash(key, hash) + } + + pub fn remove_with_hash(&mut self, key: &K, hash: u64) { + let map = unsafe { self.shared_ptr.as_mut() }.unwrap(); + + match map.inner.entry_with_hash(key.clone(), hash) { + Entry::Occupied(e) => { + e.remove(); + } + Entry::Vacant(_) => {} + }; + } + + pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<'a, '_, K, V>> { + let map = unsafe { self.shared_ptr.as_mut() }.unwrap(); + map.inner.entry_at_bucket(pos) + } + + pub fn get_num_buckets(&self) -> usize { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + map.inner.get_num_buckets() + } + + /// Return the key and value stored in the bucket with the given index. This can be used to + /// iterate through the hash map. (An Iterator might be nicer. The communicator's + /// clock algorithm needs to _slowly_ iterate through all buckets with its clock hand, + /// without holding a lock. If we switch to an Iterator, it must not hold the lock.) + pub fn get_at_bucket(&self, pos: usize) -> Option<&(K, V)> { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + + if pos >= map.inner.buckets.len() { + return None; + } + let bucket = &map.inner.buckets[pos]; + bucket.inner.as_ref() + } + + pub fn get_bucket_for_value(&self, val_ptr: *const V) -> usize { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + + let origin = map.inner.buckets.as_ptr(); + let idx = (val_ptr as usize - origin as usize) / (size_of::<core::Bucket<K, V>>() as usize); + assert!(idx < map.inner.buckets.len()); + + idx + } + + // for metrics + pub fn get_num_buckets_in_use(&self) -> usize { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + map.inner.buckets_in_use as usize + } + + /// Grow + /// + /// 1. Grow the underlying shared memory area + /// 2. Initialize new buckets. This overwrites the current dictionary + /// 3. 
Recalculate the dictionary + pub fn grow(&mut self, num_buckets: u32) -> Result<(), crate::shmem::Error> { + let map = unsafe { self.shared_ptr.as_mut() }.unwrap(); + let inner = &mut map.inner; + let old_num_buckets = inner.buckets.len() as u32; + + if num_buckets < old_num_buckets { + panic!("grow called with a smaller number of buckets"); + } + if num_buckets == old_num_buckets { + return Ok(()); + } + let shmem_handle = self + .shmem_handle + .as_ref() + .expect("grow called on a fixed-size hash table"); + + let size_bytes = HashMapInit::<K, V>::estimate_size(num_buckets); + shmem_handle.set_size(size_bytes)?; + let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) }; + + // Initialize new buckets. The new buckets are chained together into the free list, with + // the last one pointing at the old free list head. NB: This overwrites + // the dictionary! + let buckets_ptr = inner.buckets.as_mut_ptr(); + unsafe { + for i in old_num_buckets..num_buckets { + let bucket_ptr = buckets_ptr.add(i as usize); + bucket_ptr.write(core::Bucket { + next: if i + 1 < num_buckets { + i + 1 + } else { + inner.free_head + }, + inner: None, + }); + } + } + + // Recalculate the dictionary + let buckets; + let dictionary; + unsafe { + let buckets_end_ptr = buckets_ptr.add(num_buckets as usize); + let dictionary_ptr: *mut u32 = buckets_end_ptr + .byte_add(buckets_end_ptr.align_offset(align_of::<u32>())) + .cast(); + let dictionary_size: usize = + end_ptr.byte_offset_from(dictionary_ptr) as usize / size_of::<u32>(); + + buckets = std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize); + dictionary = std::slice::from_raw_parts_mut(dictionary_ptr, dictionary_size); + } + dictionary.fill(core::INVALID_POS); + + for i in 0..old_num_buckets as usize { + if buckets[i].inner.is_none() { + continue; + } + + let mut hasher = DefaultHasher::new(); + buckets[i].inner.as_ref().unwrap().0.hash(&mut hasher); + let hash = hasher.finish(); + + let pos: usize = (hash % dictionary.len() as u64) as usize; + buckets[i].next = dictionary[pos]; + dictionary[pos] = i as u32; + } + + // Finally, update the CoreHashMap struct + inner.dictionary = dictionary; + inner.buckets = buckets; + inner.free_head = old_num_buckets; + + Ok(()) + } + + // TODO: Shrinking is a multi-step process that requires co-operation from the caller + // + // 1. The caller must first call begin_shrink(). That forbids allocation of higher-numbered + // buckets. + // + // 2. Next, the caller must evict all entries in higher-numbered buckets. + // + // 3. Finally, call finish_shrink(). This recomputes the dictionary and shrinks the underlying + // shmem area +} diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs new file mode 100644 index 0000000000..f4ff8ed2c4 --- /dev/null +++ b/libs/neon-shmem/src/hash/core.rs @@ -0,0 +1,174 @@ +//! Simple hash table with chaining +//! +//! # Resizing +//! 
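+//! CoreHashMap itself never reallocates: it only borrows its bucket and dictionary slices from memory provided by the caller. Resizing is driven by the wrapper in hash.rs: grow() enlarges the underlying shared memory area, initializes the new buckets (overwriting the old dictionary, which lives directly after the bucket array), rebuilds the dictionary in the remaining space, and finally swaps the new slices into this struct.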
+ +use std::hash::Hash; +use std::mem::MaybeUninit; + +use crate::hash::entry::{Entry, OccupiedEntry, PrevPos, VacantEntry}; + +pub(crate) const INVALID_POS: u32 = u32::MAX; + +/// A single bucket in the chain +pub(crate) struct Bucket<K, V> { + pub(crate) next: u32, + pub(crate) inner: Option<(K, V)>, +} + +pub(crate) struct CoreHashMap<'a, K, V> { + pub(crate) dictionary: &'a mut [u32], + pub(crate) buckets: &'a mut [Bucket<K, V>], + pub(crate) free_head: u32, + + pub(crate) _user_list_head: u32, + + // metrics + pub(crate) buckets_in_use: u32, +} + +#[derive(Debug)] +pub struct FullError(); + +impl<'a, K, V> CoreHashMap<'a, K, V> +where + K: Clone + Hash + Eq, +{ + const FILL_FACTOR: f32 = 0.60; + + pub fn estimate_size(num_buckets: u32) -> usize { + let mut size = 0; + + // buckets + size += size_of::<Bucket<K, V>>() * num_buckets as usize; + + // dictionary + size += (f32::ceil((size_of::<u32>() * num_buckets as usize) as f32 / Self::FILL_FACTOR)) + as usize; + + size + } + + pub fn new( + buckets: &'a mut [MaybeUninit<Bucket<K, V>>], + dictionary: &'a mut [MaybeUninit<u32>], + ) -> CoreHashMap<'a, K, V> { + // Initialize the buckets + for i in 0..buckets.len() { + buckets[i].write(Bucket { + next: if i < buckets.len() - 1 { + i as u32 + 1 + } else { + INVALID_POS + }, + inner: None, + }); + } + + // Initialize the dictionary + for i in 0..dictionary.len() { + dictionary[i].write(INVALID_POS); + } + + // TODO: use std::slice::assume_init_mut() once it stabilizes + let buckets = + unsafe { std::slice::from_raw_parts_mut(buckets.as_mut_ptr().cast(), buckets.len()) }; + let dictionary = unsafe { + std::slice::from_raw_parts_mut(dictionary.as_mut_ptr().cast(), dictionary.len()) + }; + + CoreHashMap { + dictionary, + buckets, + free_head: 0, + buckets_in_use: 0, + _user_list_head: INVALID_POS, + } + } + + pub fn get_with_hash(&self, key: &K, hash: u64) -> Option<&V> { + let mut next = self.dictionary[hash as usize % self.dictionary.len()]; + loop { + if next == INVALID_POS { + return None; + } + + let bucket = &self.buckets[next as usize]; + let (bucket_key, bucket_value) = bucket.inner.as_ref().expect("entry is in use"); + if bucket_key == key { + return Some(&bucket_value); + } + next = bucket.next; + } + } + + // all updates are done through Entry + pub fn entry_with_hash(&mut self, key: K, hash: u64) -> Entry<'a, '_, K, V> { + let dict_pos = hash as usize % self.dictionary.len(); + let first = self.dictionary[dict_pos]; + if first == INVALID_POS { + // no existing entry + return Entry::Vacant(VacantEntry { + map: self, + key, + dict_pos: dict_pos as u32, + }); + } + + let mut prev_pos = PrevPos::First(dict_pos as u32); + let mut next = first; + loop { + let bucket = &mut self.buckets[next as usize]; + let (bucket_key, _bucket_value) = bucket.inner.as_mut().expect("entry is in use"); + if *bucket_key == key { + // found existing entry + return Entry::Occupied(OccupiedEntry { + map: self, + _key: key, + prev_pos, + bucket_pos: next, + }); + } + + if bucket.next == INVALID_POS { + // No existing entry + return Entry::Vacant(VacantEntry { + map: self, + key, + dict_pos: dict_pos as u32, + }); + } + prev_pos = PrevPos::Chained(next); + next = bucket.next; + } + } + + pub fn get_num_buckets(&self) -> usize { + self.buckets.len() + } + + pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<'a, '_, K, V>> { + if pos >= self.buckets.len() { + return None; + } + + todo!() + //self.buckets[pos].inner.as_ref() + } + + pub(crate) fn alloc_bucket(&mut self, key: K, value: V) -> Result<u32, FullError> { + let pos = self.free_head; + if pos == INVALID_POS { + return Err(FullError()); 
} + + let bucket = &mut self.buckets[pos as usize]; + self.free_head = bucket.next; + self.buckets_in_use += 1; + + bucket.next = INVALID_POS; + bucket.inner = Some((key, value)); + + return Ok(pos); + } +} diff --git a/libs/neon-shmem/src/hash/tests.rs b/libs/neon-shmem/src/hash/tests.rs new file mode 100644 index 0000000000..073aea5220 --- /dev/null +++ b/libs/neon-shmem/src/hash/tests.rs @@ -0,0 +1,220 @@ +use std::collections::BTreeMap; +use std::collections::HashSet; +use std::fmt::{Debug, Formatter}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use crate::hash::HashMapAccess; +use crate::hash::HashMapInit; +use crate::hash::UpdateAction; +use crate::shmem::ShmemHandle; + +use rand::seq::SliceRandom; +use rand::{Rng, RngCore}; +use rand_distr::Zipf; + +const TEST_KEY_LEN: usize = 16; + +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +struct TestKey([u8; TEST_KEY_LEN]); + +impl From<&TestKey> for u128 { + fn from(val: &TestKey) -> u128 { + u128::from_be_bytes(val.0) + } +} + +impl From<u128> for TestKey { + fn from(val: u128) -> TestKey { + TestKey(val.to_be_bytes()) + } +} + +impl<'a> From<&'a [u8]> for TestKey { + fn from(bytes: &'a [u8]) -> TestKey { + TestKey(bytes.try_into().unwrap()) + } +} + +fn test_inserts<K: Into<TestKey> + Copy>(keys: &[K]) { + const MAX_MEM_SIZE: usize = 10000000; + let shmem = ShmemHandle::new("test_inserts", 0, MAX_MEM_SIZE).unwrap(); + + let init_struct = HashMapInit::<TestKey, usize>::init_in_shmem(100000, shmem); + let w = init_struct.attach_writer(); + + for (idx, k) in keys.iter().enumerate() { + let res = w.insert(&(*k).into(), idx); + assert!(res.is_ok()); + } + + for (idx, k) in keys.iter().enumerate() { + let x = w.get(&(*k).into()); + let value = x.as_deref().copied(); + assert_eq!(value, Some(idx)); + } + + //eprintln!("stats: {:?}", tree_writer.get_statistics()); +} + +#[test] +fn dense() { + // A handful of small fixed keys first + let keys: &[u128] = &[0, 1, 2, 3, 256]; + test_inserts(keys); + + // Dense keys + let mut keys: Vec<u128> = (0..10000).collect(); + test_inserts(&keys); + + // Do the same in random orders + for _ in 1..10 { + keys.shuffle(&mut rand::rng()); + test_inserts(&keys); + } +} + +#[test] +fn sparse() { + // sparse keys + let mut keys: Vec<TestKey> = Vec::new(); + let mut used_keys = HashSet::new(); + for _ in 0..10000 { + loop { + let key = rand::random::<u128>(); + if used_keys.get(&key).is_some() { + continue; + } + used_keys.insert(key); + keys.push(key.into()); + break; + } + } + test_inserts(&keys); +} + +struct TestValue(AtomicUsize); + +impl TestValue { + fn new(val: usize) -> TestValue { + TestValue(AtomicUsize::new(val)) + } + + fn load(&self) -> usize { + self.0.load(Ordering::Relaxed) + } +} + +impl Clone for TestValue { + fn clone(&self) -> TestValue { + TestValue::new(self.load()) + } +} + +impl Debug for TestValue { + fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "{:?}", self.load()) + } +} + +#[derive(Clone, Debug)] +struct TestOp(TestKey, Option<usize>); + +fn apply_op( + op: &TestOp, + sut: &HashMapAccess<'_, TestKey, TestValue>, + shadow: &mut BTreeMap<TestKey, usize>, +) { + eprintln!("applying op: {op:?}"); + + // apply the change to the shadow tree first + let shadow_existing = if let Some(v) = op.1 { + shadow.insert(op.0, v) + } else { + shadow.remove(&op.0) + }; + + // then apply it to the hash map under test + sut.update_with_fn(&op.0, |existing| { + assert_eq!(existing.map(TestValue::load), shadow_existing); + + match (existing, op.1) { + (None, None) => UpdateAction::Nothing, + (None, Some(new_val)) => UpdateAction::Insert(TestValue::new(new_val)), 
+ (Some(_old_val), None) => UpdateAction::Remove, + (Some(old_val), Some(new_val)) => { + old_val.0.store(new_val, Ordering::Relaxed); + UpdateAction::Nothing + } + } + }) + .expect("out of memory"); +} + +#[test] +fn random_ops() { + const MAX_MEM_SIZE: usize = 10000000; + let shmem = ShmemHandle::new("test_inserts", 0, MAX_MEM_SIZE).unwrap(); + + let init_struct = HashMapInit::<TestKey, TestValue>::init_in_shmem(100000, shmem); + let writer = init_struct.attach_writer(); + + let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new(); + + let distribution = Zipf::new(u128::MAX as f64, 1.1).unwrap(); + let mut rng = rand::rng(); + for i in 0..100000 { + let key: TestKey = (rng.sample(distribution) as u128).into(); + + let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None }); + + apply_op(&op, &writer, &mut shadow); + + if i % 1000 == 0 { + eprintln!("{i} ops processed"); + //eprintln!("stats: {:?}", tree_writer.get_statistics()); + //test_iter(&tree_writer, &shadow); + } + } +} + +#[test] +fn test_grow() { + const MEM_SIZE: usize = 10000000; + let shmem = ShmemHandle::new("test_grow", 0, MEM_SIZE).unwrap(); + + let init_struct = HashMapInit::<TestKey, TestValue>::init_in_shmem(1000, shmem); + let mut writer = init_struct.attach_writer(); + + let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new(); + + let mut rng = rand::rng(); + for i in 0..10000 { + let key: TestKey = ((rng.next_u32() % 1000) as u128).into(); + + let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None }); + + apply_op(&op, &writer, &mut shadow); + + if i % 1000 == 0 { + eprintln!("{i} ops processed"); + //eprintln!("stats: {:?}", tree_writer.get_statistics()); + //test_iter(&tree_writer, &shadow); + } + } + + writer.grow(1500).unwrap(); + + for i in 0..10000 { + let key: TestKey = ((rng.next_u32() % 1500) as u128).into(); + + let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None }); + + apply_op(&op, &writer, &mut shadow); + + if i % 1000 == 0 { + eprintln!("{i} ops processed"); + //eprintln!("stats: {:?}", tree_writer.get_statistics()); + //test_iter(&tree_writer, &shadow); + } + } +} diff --git a/libs/neon-shmem/src/lib.rs b/libs/neon-shmem/src/lib.rs index d4f171ed66..f601010122 100644 --- a/libs/neon-shmem/src/lib.rs +++ b/libs/neon-shmem/src/lib.rs @@ -1,3 +1,4 @@ //! Shared memory utilities for neon communicator +pub mod hash; pub mod shmem; diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 8bcc6bf924..bc0d3cdeb7 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -1,6 +1,5 @@ # pgxs/neon/Makefile - MODULE_big = neon OBJS = \ $(WIN32RES) \ @@ -22,7 +21,8 @@ OBJS = \ walproposer.o \ walproposer_pg.o \ control_plane_connector.o \ - walsender_hooks.o + walsender_hooks.o \ + $(LIBCOMMUNICATOR_PATH)/libcommunicator.a PG_CPPFLAGS = -I$(libpq_srcdir) SHLIB_LINK_INTERNAL = $(libpq) diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml new file mode 100644 index 0000000000..f09b9d7a14 --- /dev/null +++ b/pgxn/neon/communicator/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "communicator" +version = "0.1.0" +edition = "2024" + +[lib] +crate-type = ["staticlib"] + +[dependencies] +neon-shmem.workspace = true + +[build-dependencies] +cbindgen.workspace = true diff --git a/pgxn/neon/communicator/README.md b/pgxn/neon/communicator/README.md new file mode 100644 index 0000000000..48fda68721 --- /dev/null +++ b/pgxn/neon/communicator/README.md @@ -0,0 +1,8 @@ +This package will evolve into a "compute-pageserver communicator" +process and machinery. 
For now, it just provides wrappers around the +neon-shmem Rust crate, to allow using it in the C implementation of +the LFC. + +At compilation time, pgxn/neon/communicator/ produces a static +library, libcommunicator.a. It is linked into the neon.so extension +library. diff --git a/pgxn/neon/communicator/build.rs b/pgxn/neon/communicator/build.rs new file mode 100644 index 0000000000..ef570c3d0a --- /dev/null +++ b/pgxn/neon/communicator/build.rs @@ -0,0 +1,22 @@ +use std::env; + +fn main() -> Result<(), Box<dyn std::error::Error>> { + let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap(); + + cbindgen::generate(crate_dir).map_or_else( + |error| match error { + cbindgen::Error::ParseSyntaxError { .. } => { + // This means there was a syntax error in the Rust sources. Don't panic, because + // we want the build to continue and the Rust compiler to hit the error. The + // Rust compiler produces a better error message than cbindgen. + eprintln!("Generating C bindings failed because of a Rust syntax error"); + } + e => panic!("Unable to generate C bindings: {:?}", e), + }, + |bindings| { + bindings.write_to_file("communicator_bindings.h"); + }, + ); + + Ok(()) +} diff --git a/pgxn/neon/communicator/cbindgen.toml b/pgxn/neon/communicator/cbindgen.toml new file mode 100644 index 0000000000..72e0c8174a --- /dev/null +++ b/pgxn/neon/communicator/cbindgen.toml @@ -0,0 +1,4 @@ +language = "C" + +[enum] +prefix_with_name = true diff --git a/pgxn/neon/communicator/src/file_cache_hashmap.rs b/pgxn/neon/communicator/src/file_cache_hashmap.rs new file mode 100644 index 0000000000..0a9ec3db31 --- /dev/null +++ b/pgxn/neon/communicator/src/file_cache_hashmap.rs @@ -0,0 +1,240 @@ +//! Glue code to allow using the Rust shmem hash map implementation from C code +//! +//! For convenience of adapting existing code, the interface provided somewhat resembles the dynahash +//! interface. +//! +//! NOTE: The caller is responsible for locking! The caller is expected to hold the PostgreSQL +//! LWLock, 'lfc_lock', while accessing the hash table, in shared or exclusive mode as appropriate. + +use std::ffi::c_void; +use std::marker::PhantomData; + +use neon_shmem::hash::entry::Entry; +use neon_shmem::hash::{HashMapAccess, HashMapInit}; +use neon_shmem::shmem::ShmemHandle; + +/// NB: This must match the definition of BufferTag in Postgres C headers. We could use bindgen to +/// generate this from the C headers, but prefer to not introduce dependency on bindgen for now. +/// +/// Note that there are no padding bytes. If the corresponding C struct has padding bytes, the C +/// code must clear them. +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +#[repr(C)] +pub struct FileCacheKey { + pub _spc_id: u32, + pub _db_id: u32, + pub _rel_number: u32, + pub _fork_num: u32, + pub _block_num: u32, +} + +/// Like with FileCacheKey, this must match the definition of FileCacheEntry in file_cache.c. We +/// don't look at the contents here though, it's sufficient that the size and alignment match. +#[derive(Clone, Debug, Default)] +#[repr(C)] +pub struct FileCacheEntry { + pub _offset: u32, + pub _access_count: u32, + pub _prev: *mut FileCacheEntry, + pub _next: *mut FileCacheEntry, + pub _state: [u32; 8], +} + +/// XXX: This could be just: +/// +/// ```ignore +/// type FileCacheHashMapHandle = HashMapInit<'a, FileCacheKey, FileCacheEntry> +/// ``` +/// +/// but with that, cbindgen generates a broken typedef in the C header file which doesn't +/// compile. It apparently gets confused by the generics. 
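+/// The workaround below hands C an opaque pointer and keeps the real type attached on the Rust side via a PhantomData marker.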
+#[repr(transparent)] +pub struct FileCacheHashMapHandle<'a>( + pub *mut c_void, + PhantomData<HashMapInit<'a, FileCacheKey, FileCacheEntry>>, +); +impl<'a> From<Box<HashMapInit<'a, FileCacheKey, FileCacheEntry>>> for FileCacheHashMapHandle<'a> { + fn from(x: Box<HashMapInit<'a, FileCacheKey, FileCacheEntry>>) -> Self { + FileCacheHashMapHandle(Box::into_raw(x) as *mut c_void, PhantomData::default()) + } +} +impl<'a> From<FileCacheHashMapHandle<'a>> for Box<HashMapInit<'a, FileCacheKey, FileCacheEntry>> { + fn from(x: FileCacheHashMapHandle<'a>) -> Self { + unsafe { Box::from_raw(x.0.cast()) } + } +} + +/// XXX: same for this +#[repr(transparent)] +pub struct FileCacheHashMapAccess<'a>( + pub *mut c_void, + PhantomData<HashMapAccess<'a, FileCacheKey, FileCacheEntry>>, +); +impl<'a> From<Box<HashMapAccess<'a, FileCacheKey, FileCacheEntry>>> for FileCacheHashMapAccess<'a> { + fn from(x: Box<HashMapAccess<'a, FileCacheKey, FileCacheEntry>>) -> Self { + // Convert the Box into a raw mutable pointer to the HashMapAccess itself. + // This transfers ownership of the HashMapAccess (and its contained ShmemHandle) + // to the raw pointer. The C caller is now responsible for managing this memory. + FileCacheHashMapAccess(Box::into_raw(x) as *mut c_void, PhantomData::default()) + } +} +impl<'a> FileCacheHashMapAccess<'a> { + fn as_ref(self) -> &'a HashMapAccess<'a, FileCacheKey, FileCacheEntry> { + let ptr: *mut HashMapAccess<'_, FileCacheKey, FileCacheEntry> = self.0.cast(); + unsafe { ptr.as_ref().unwrap() } + } + fn as_mut(self) -> &'a mut HashMapAccess<'a, FileCacheKey, FileCacheEntry> { + let ptr: *mut HashMapAccess<'_, FileCacheKey, FileCacheEntry> = self.0.cast(); + unsafe { ptr.as_mut().unwrap() } + } +} + +/// Initialize the shared memory area at postmaster startup. The returned handle is inherited +/// by all the backend processes across fork() +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_file_cache_shmem_init<'a>( + initial_num_buckets: u32, + max_num_buckets: u32, +) -> FileCacheHashMapHandle<'a> { + let max_bytes = HashMapInit::<FileCacheKey, FileCacheEntry>::estimate_size(max_num_buckets); + let shmem_handle = + ShmemHandle::new("lfc mapping", 0, max_bytes).expect("shmem initialization failed"); + + let handle = HashMapInit::<FileCacheKey, FileCacheEntry>::init_in_shmem( + initial_num_buckets, + shmem_handle, + ); + + Box::new(handle).into() +} + +/// Initialize the access to the shared memory area in a backend process. +/// +/// XXX: I'm not sure if this actually gets called in each process, or if the returned struct +/// is also inherited across fork(). It currently works either way but if this did more +/// initialization that needed to be done after fork(), then it would matter. +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_file_cache_shmem_access<'a>( + handle: FileCacheHashMapHandle<'a>, +) -> FileCacheHashMapAccess<'a> { + let handle: Box<HashMapInit<'a, FileCacheKey, FileCacheEntry>> = handle.into(); + Box::new(handle.attach_writer()).into() +} + +/// Return the current number of buckets in the hash table +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_file_cache_get_num_buckets<'a>( + map: FileCacheHashMapAccess<'static>, +) -> u32 { + let map = map.as_ref(); + map.get_num_buckets().try_into().unwrap() +} + +/// Look up the entry with given key and hash. +/// +/// This is similar to dynahash's hash_search(... , HASH_FIND) +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_file_cache_hash_find<'a>( + map: FileCacheHashMapAccess<'static>, + key: &FileCacheKey, + hash: u64, +) -> Option<&'static FileCacheEntry> { + let map = map.as_ref(); + map.get_with_hash(key, hash) +} + +/// Look up the entry at the given bucket position +/// +/// This has no direct equivalent in the dynahash interface, but can be used to +/// iterate through all entries in the hash table. 
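+///
+/// A sketch of the intended C-side scan, modeled on local_cache_pages() in
+/// file_cache.c (the bcomm_* names are the raw cbindgen exports; file_cache.c
+/// is assumed to reach them under its file_cache_hash_* names). cbindgen
+/// represents the Option<&FileCacheEntry> return value as a nullable pointer:
+///
+/// ```c
+/// uint32 num_buckets = bcomm_file_cache_get_num_buckets(map);
+///
+/// for (uint32 pos = 0; pos < num_buckets; pos++)
+/// {
+///     const FileCacheEntry *entry = bcomm_file_cache_hash_get_at_pos(map, pos);
+///
+///     if (entry == NULL)
+///         continue;       /* bucket is currently free */
+///     /* ... inspect *entry; the caller must hold lfc_lock throughout ... */
+/// }
+/// ```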
+#[unsafe(no_mangle)] +pub extern "C" fn bcomm_file_cache_hash_get_at_pos<'a>( + map: FileCacheHashMapAccess<'static>, + pos: u32, +) -> Option<&'static FileCacheEntry> { + let map = map.as_ref(); + map.get_at_bucket(pos as usize).map(|(_k, v)| v) +} + +/// Remove entry, given a pointer to the value. +/// +/// This is equivalent to dynahash hash_search(entry->key, HASH_REMOVE), where 'entry' +/// is an entry you have previously looked up +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_file_cache_hash_remove_entry<'a, 'b>( + map: FileCacheHashMapAccess, + entry: *mut FileCacheEntry, +) { + let map = map.as_mut(); + let pos = map.get_bucket_for_value(entry); + match map.entry_at_bucket(pos) { + Some(e) => { + e.remove(); + } + None => { + // todo: shouldn't happen, panic? + } + } +} + +/// Compute the hash for given key +/// +/// This is equivalent to dynahash get_hash_value() function. We use Rust's default hasher +/// for calculating the hash though. +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_file_cache_get_hash_value<'a, 'b>( + map: FileCacheHashMapAccess<'static>, + key: &FileCacheKey, +) -> u64 { + map.as_ref().get_hash_value(key) +} + +/// Insert a new entry to the hash table +/// +/// This is equivalent to dynahash hash_search(..., HASH_ENTER). +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_file_cache_hash_enter<'a, 'b>( + map: FileCacheHashMapAccess, + key: &FileCacheKey, + hash: u64, + found: &mut bool, +) -> *mut FileCacheEntry { + match map.as_mut().entry_with_hash(key.clone(), hash) { + Entry::Occupied(mut e) => { + *found = true; + e.get_mut() + } + Entry::Vacant(e) => { + *found = false; + let initial_value = FileCacheEntry::default(); + e.insert(initial_value).expect("TODO: hash table full") + } + } +} + +/// Get the key for a given entry, which must be present in the hash table. +/// +/// Dynahash requires the key to be part of the "value" struct, so you can always +/// access the key with something like `entry->key`. The Rust implementation however +/// stores the key separately. This function extracts the separately stored key. 
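+///
+/// Where the old dynahash-based code read `entry->key` directly, the C side
+/// now copies the key out instead, along these lines (cf. lfc_get_state() in
+/// file_cache.c; BufferTag and the Rust-side key struct are layout-compatible
+/// per the StaticAssertDecl checks there):
+///
+/// ```c
+/// const BufferTag *key = file_cache_hash_get_key_for_entry(lfc_hash, entry);
+///
+/// fcs->chunks[i] = *key;      /* copy while still holding lfc_lock */
+/// ```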
+#[unsafe(no_mangle)] +pub extern "C" fn bcomm_file_cache_hash_get_key_for_entry<'a, 'b>( + map: FileCacheHashMapAccess, + entry: *const FileCacheEntry, +) -> Option<&FileCacheKey> { + let map = map.as_ref(); + let pos = map.get_bucket_for_value(entry); + map.get_at_bucket(pos as usize).map(|(k, _v)| k) +} + +/// Remove all entries from the hash table +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_file_cache_hash_reset<'a, 'b>(map: FileCacheHashMapAccess) { + let map = map.as_mut(); + let num_buckets = map.get_num_buckets(); + for i in 0..num_buckets { + if let Some(e) = map.entry_at_bucket(i) { + e.remove(); + } + } +} diff --git a/pgxn/neon/communicator/src/lib.rs b/pgxn/neon/communicator/src/lib.rs new file mode 100644 index 0000000000..4120ce0d38 --- /dev/null +++ b/pgxn/neon/communicator/src/lib.rs @@ -0,0 +1 @@ +pub mod file_cache_hashmap; diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 485c282414..5d199716d2 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -22,7 +22,6 @@ #include "funcapi.h" #include "miscadmin.h" #include "common/file_utils.h" -#include "common/hashfn.h" #include "pgstat.h" #include "port/pg_iovec.h" #include "postmaster/bgworker.h" @@ -37,7 +36,6 @@ #include "storage/procsignal.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" -#include "utils/dynahash.h" #include "utils/guc.h" #if PG_VERSION_NUM >= 150000 @@ -47,6 +45,7 @@ #include "hll.h" #include "bitmap.h" #include "file_cache.h" +#include "file_cache_rust_hash.h" #include "neon.h" #include "neon_lwlsncache.h" #include "neon_perf_counters.h" @@ -124,14 +123,18 @@ typedef enum FileCacheBlockState typedef struct FileCacheEntry { - BufferTag key; - uint32 hash; uint32 offset; uint32 access_count; dlist_node list_node; /* LRU list node */ uint32 state[(BLOCKS_PER_CHUNK * 2 + 31) / 32]; /* two bits per block */ } FileCacheEntry; +/* Todo: alignment must be the same too */ +StaticAssertDecl(sizeof(FileCacheEntry) == sizeof(RustFileCacheEntry), + "Rust and C declarations of FileCacheEntry are incompatible"); +StaticAssertDecl(sizeof(BufferTag) == sizeof(RustFileCacheKey), + "Rust and C declarations of FileCacheKey are incompatible"); + #define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3) #define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2)) @@ -201,7 +204,8 @@ typedef struct FreeListChunk #define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * BLOCKS_PER_CHUNK)+7)/8) #define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8) -static HTAB *lfc_hash; +static FileCacheHashMapHandle lfc_hash_handle; +static FileCacheHashMapAccess lfc_hash; static int lfc_desc = -1; static LWLockId lfc_lock; static LWLockId lfc_freelist_lock; @@ -258,15 +262,9 @@ lfc_switch_off(void) if (LFC_ENABLED()) { - HASH_SEQ_STATUS status; - FileCacheEntry *entry; - /* Invalidate hash */ - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) - { - hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL); - } + file_cache_hash_reset(lfc_hash); + lfc_ctl->generation += 1; lfc_ctl->size = 0; lfc_ctl->pinned = 0; @@ -347,7 +345,6 @@ lfc_shmem_startup(void) { size_t size; bool found; - static HASHCTL info; if (prev_shmem_startup_hook) { @@ -366,17 +363,13 @@ 
lfc_shmem_startup(void) lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock"); lfc_freelist_lock = (LWLockId) GetNamedLWLockTranche("lfc_freelist_lock"); - info.keysize = sizeof(BufferTag); - info.entrysize = sizeof(FileCacheEntry); /* * n_chunks+1 because we add new element to hash table before eviction * of victim */ - lfc_hash = ShmemInitHash("lfc_hash", - n_chunks + 1, n_chunks + 1, - &info, - HASH_ELEM | HASH_BLOBS); + lfc_hash_handle = file_cache_hash_shmem_init(n_chunks + 1, n_chunks + 1); + memset(lfc_ctl, 0, offsetof(FileCacheControl, free_pages)); dlist_init(&lfc_ctl->lru); @@ -406,6 +399,8 @@ lfc_shmem_startup(void) } LWLockRelease(AddinShmemInitLock); + + lfc_hash = file_cache_hash_shmem_access(lfc_hash_handle); } static void @@ -419,7 +414,6 @@ lfc_shmem_request(void) #endif size = sizeof(FileCacheControl); - size += hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, sizeof(FileCacheEntry)); RequestAddinShmemSpace(size); RequestNamedLWLockTranche("lfc_lock", 1); @@ -504,7 +498,7 @@ lfc_change_limit_hook(int newval, void *extra) lfc_ctl->used_pages -= is_page_cached; lfc_ctl->evicted_pages += is_page_cached; } - hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); + file_cache_hash_remove_entry(lfc_hash, victim); if (!freelist_push(offset)) { @@ -678,7 +672,7 @@ lfc_get_state(size_t max_entries) dlist_reverse_foreach(iter, &lfc_ctl->lru) { FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur); - fcs->chunks[i] = entry->key; + fcs->chunks[i] = *file_cache_hash_get_key_for_entry(lfc_hash, entry); for (int j = 0; j < BLOCKS_PER_CHUNK; j++) { if (GET_STATE(entry, j) != UNAVAILABLE) @@ -967,7 +961,7 @@ lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks) { BufferTag tag; FileCacheEntry *entry; - uint32 hash; + uint64 hash; if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return; @@ -983,8 +977,8 @@ lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks) for (BlockNumber blkno = 0; blkno < nblocks; blkno += BLOCKS_PER_CHUNK) { tag.blockNum = blkno; - hash = get_hash_value(lfc_hash, &tag); - entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); + hash = file_cache_hash_get_hash_value(lfc_hash, &tag); + entry = file_cache_hash_find(lfc_hash, &tag, hash); if (entry != NULL) { for (int i = 0; i < BLOCKS_PER_CHUNK; i++) @@ -1012,7 +1006,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) FileCacheEntry *entry; int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno); bool found = false; - uint32 hash; + uint64 hash; if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ return false; @@ -1022,12 +1016,12 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) tag.blockNum = blkno - chunk_offs; CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); - hash = get_hash_value(lfc_hash, &tag); + hash = file_cache_hash_get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_SHARED); if (LFC_ENABLED()) { - entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); + entry = file_cache_hash_find(lfc_hash, &tag, hash); found = entry != NULL && GET_STATE(entry, chunk_offs) != UNAVAILABLE; } LWLockRelease(lfc_lock); @@ -1046,7 +1040,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, FileCacheEntry *entry; uint32 chunk_offs; int found = 0; - uint32 hash; + uint64 hash; int i = 0; if (lfc_maybe_disabled()) /* fast exit if file cache is 
disabled */ @@ -1059,7 +1053,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, chunk_offs = BLOCK_TO_CHUNK_OFF(blkno); tag.blockNum = blkno - chunk_offs; - hash = get_hash_value(lfc_hash, &tag); + hash = file_cache_hash_get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_SHARED); @@ -1071,7 +1065,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, while (true) { int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs); - entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); + entry = file_cache_hash_find(lfc_hash, &tag, hash); if (entry != NULL) { @@ -1101,7 +1095,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, */ chunk_offs = BLOCK_TO_CHUNK_OFF(blkno + i); tag.blockNum = (blkno + i) - chunk_offs; - hash = get_hash_value(lfc_hash, &tag); + hash = file_cache_hash_get_hash_value(lfc_hash, &tag); } LWLockRelease(lfc_lock); @@ -1150,7 +1144,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, BufferTag tag; FileCacheEntry *entry; ssize_t rc; - uint32 hash; + uint64 hash; uint64 generation; uint32 entry_offset; int blocks_read = 0; @@ -1228,7 +1222,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, Assert(iov_last_used - first_block_in_chunk_read >= n_blocks_to_read); tag.blockNum = blkno - chunk_offs; - hash = get_hash_value(lfc_hash, &tag); + hash = file_cache_hash_get_hash_value(lfc_hash, &tag); cv = &lfc_ctl->cv[hash % N_COND_VARS]; LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -1241,13 +1235,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, return blocks_read; } - entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); + entry = file_cache_hash_find(lfc_hash, &tag, hash); /* Approximate working set for the blocks assumed in this entry */ for (int i = 0; i < blocks_in_chunk; i++) { tag.blockNum = blkno + i; - addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + addSHLL(&lfc_ctl->wss_estimation, file_cache_hash_get_hash_value(lfc_hash, &tag)); } if (entry == NULL) @@ -1395,7 +1389,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, * Returns false if there are no unpinned entries and chunk can not be added. 
*/ static bool -lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) +lfc_init_new_entry(FileCacheEntry *entry) { /*----------- * If the chunk wasn't already in the LFC then we have these @@ -1451,21 +1445,18 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) CriticalAssert(victim->access_count == 0); entry->offset = victim->offset; /* grab victim's chunk */ - hash_search_with_hash_value(lfc_hash, &victim->key, - victim->hash, HASH_REMOVE, NULL); + file_cache_hash_remove_entry(lfc_hash, victim); neon_log(DEBUG2, "Swap file cache page"); } else { /* Can't add this chunk - we don't have the space for it */ - hash_search_with_hash_value(lfc_hash, &entry->key, hash, - HASH_REMOVE, NULL); + file_cache_hash_remove_entry(lfc_hash, entry); lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */ return false; } entry->access_count = 1; - entry->hash = hash; lfc_ctl->pinned += 1; for (int i = 0; i < BLOCKS_PER_CHUNK; i++) @@ -1505,7 +1496,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, FileCacheEntry *entry; ssize_t rc; bool found; - uint32 hash; + uint64 hash; uint64 generation; uint32 entry_offset; instr_time io_start, io_end; @@ -1524,7 +1515,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); tag.blockNum = blkno - chunk_offs; - hash = get_hash_value(lfc_hash, &tag); + hash = file_cache_hash_get_hash_value(lfc_hash, &tag); cv = &lfc_ctl->cv[hash % N_COND_VARS]; retry: @@ -1549,12 +1540,12 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, return false; } - entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); + entry = file_cache_hash_enter(lfc_hash, &tag, hash, &found); if (lfc_prewarm_update_ws_estimation) { tag.blockNum = blkno; - addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + addSHLL(&lfc_ctl->wss_estimation, file_cache_hash_get_hash_value(lfc_hash, &tag)); } if (found) { @@ -1576,7 +1567,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, } else { - if (!lfc_init_new_entry(entry, hash)) + if (!lfc_init_new_entry(entry)) { /* * We can't process this chunk due to lack of space in LFC, @@ -1659,7 +1650,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, FileCacheEntry *entry; ssize_t rc; bool found; - uint32 hash; + uint64 hash; uint64 generation; uint32 entry_offset; int buf_offset = 0; @@ -1711,16 +1702,16 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } tag.blockNum = blkno - chunk_offs; - hash = get_hash_value(lfc_hash, &tag); + hash = file_cache_hash_get_hash_value(lfc_hash, &tag); cv = &lfc_ctl->cv[hash % N_COND_VARS]; - entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); + entry = file_cache_hash_enter(lfc_hash, &tag, hash, &found); /* Approximate working set for the blocks assumed in this entry */ for (int i = 0; i < blocks_in_chunk; i++) { tag.blockNum = blkno + i; - addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + addSHLL(&lfc_ctl->wss_estimation, file_cache_hash_get_hash_value(lfc_hash, &tag)); } if (found) @@ -1737,7 +1728,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } else { - if (!lfc_init_new_entry(entry, hash)) + if (!lfc_init_new_entry(entry)) { /* * We can't process this chunk due to lack of space in LFC, @@ -2147,7 +2138,6 @@ local_cache_pages(PG_FUNCTION_ARGS) if 
(SRF_IS_FIRSTCALL()) { - HASH_SEQ_STATUS status; FileCacheEntry *entry; uint32 n_pages = 0; @@ -2203,9 +2193,14 @@ local_cache_pages(PG_FUNCTION_ARGS) if (LFC_ENABLED()) { - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) + uint32 num_buckets = file_cache_hash_get_num_buckets(lfc_hash); + + for (uint32 pos = 0; pos < num_buckets; pos++) { + entry = file_cache_hash_get_at_pos(lfc_hash, pos); + if (entry == NULL) + continue; + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) n_pages += GET_STATE(entry, i) == AVAILABLE; } @@ -2229,20 +2224,26 @@ local_cache_pages(PG_FUNCTION_ARGS) * in the fctx->record structure. */ uint32 n = 0; + uint32 num_buckets = file_cache_hash_get_num_buckets(lfc_hash); - hash_seq_init(&status, lfc_hash); - while ((entry = hash_seq_search(&status)) != NULL) + for (uint32 pos = 0; pos < num_buckets; pos++) { + entry = file_cache_hash_get_at_pos(lfc_hash, pos); + if (entry == NULL) + continue; + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) { + const BufferTag *key = file_cache_hash_get_key_for_entry(lfc_hash, entry); + if (GET_STATE(entry, i) == AVAILABLE) { fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i; - fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key)); - fctx->record[n].forknum = entry->key.forkNum; - fctx->record[n].blocknum = entry->key.blockNum + i; + fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(*key)); + fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(*key)); + fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(*key)); + fctx->record[n].forknum = key->forkNum; + fctx->record[n].blocknum = key->blockNum + i; fctx->record[n].accesscount = entry->access_count; n += 1; }