mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 06:52:55 +00:00
Use a custom Rust implementation to replace the LFC hash table
The new implementation lives in a separately allocated shared memory area, which could be resized. Resizing it isn't actually implemented yet, though. It would require some co-operation from the LFC code.
This commit is contained in:
126
Cargo.lock
generated
126
Cargo.lock
generated
@@ -1086,6 +1086,25 @@ version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
||||
|
||||
[[package]]
|
||||
name = "cbindgen"
|
||||
version = "0.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eadd868a2ce9ca38de7eeafdcec9c7065ef89b42b32f0839278d55f35c54d1ff"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"heck 0.4.1",
|
||||
"indexmap 2.9.0",
|
||||
"log",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"syn 2.0.100",
|
||||
"tempfile",
|
||||
"toml",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.2.16"
|
||||
@@ -1212,7 +1231,7 @@ version = "4.5.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
@@ -1270,6 +1289,14 @@ dependencies = [
|
||||
"unicode-width",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "communicator"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"cbindgen",
|
||||
"neon-shmem",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "compute_api"
|
||||
version = "0.1.0"
|
||||
@@ -1936,7 +1963,7 @@ checksum = "0892a17df262a24294c382f0d5997571006e7a4348b4327557c4ff1cd4a8bccc"
|
||||
dependencies = [
|
||||
"darling",
|
||||
"either",
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
@@ -2500,6 +2527,18 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "getrandom"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"r-efi",
|
||||
"wasi 0.14.2+wasi-0.2.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gettid"
|
||||
version = "0.1.3"
|
||||
@@ -2712,6 +2751,12 @@ dependencies = [
|
||||
"http 1.1.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.5.0"
|
||||
@@ -3648,7 +3693,7 @@ version = "0.0.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b9e6777fc80a575f9503d908c8b498782a6c3ee88a06cb416dc3941401e43b94"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
@@ -3710,7 +3755,7 @@ dependencies = [
|
||||
"procfs",
|
||||
"prometheus",
|
||||
"rand 0.8.5",
|
||||
"rand_distr",
|
||||
"rand_distr 0.4.3",
|
||||
"twox-hash",
|
||||
]
|
||||
|
||||
@@ -3799,6 +3844,8 @@ name = "neon-shmem"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"nix 0.30.1",
|
||||
"rand 0.9.1",
|
||||
"rand_distr 0.5.1",
|
||||
"tempfile",
|
||||
"thiserror 1.0.69",
|
||||
"workspace_hack",
|
||||
@@ -5092,7 +5139,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"itertools 0.12.1",
|
||||
"log",
|
||||
"multimap",
|
||||
@@ -5113,7 +5160,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"itertools 0.12.1",
|
||||
"log",
|
||||
"multimap",
|
||||
@@ -5238,7 +5285,7 @@ dependencies = [
|
||||
"postgres_backend",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"rand_distr",
|
||||
"rand_distr 0.4.3",
|
||||
"rcgen",
|
||||
"redis",
|
||||
"regex",
|
||||
@@ -5342,6 +5389,12 @@ dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "r-efi"
|
||||
version = "5.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.7.3"
|
||||
@@ -5366,6 +5419,16 @@ dependencies = [
|
||||
"rand_core 0.6.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
|
||||
dependencies = [
|
||||
"rand_chacha 0.9.0",
|
||||
"rand_core 0.9.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.2.2"
|
||||
@@ -5386,6 +5449,16 @@ dependencies = [
|
||||
"rand_core 0.6.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
|
||||
dependencies = [
|
||||
"ppv-lite86",
|
||||
"rand_core 0.9.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.5.1"
|
||||
@@ -5404,6 +5477,15 @@ dependencies = [
|
||||
"getrandom 0.2.11",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
|
||||
dependencies = [
|
||||
"getrandom 0.3.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_distr"
|
||||
version = "0.4.3"
|
||||
@@ -5414,6 +5496,16 @@ dependencies = [
|
||||
"rand 0.8.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_distr"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a8615d50dcf34fa31f7ab52692afec947c4dd0ab803cc87cb3b0b4570ff7463"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
"rand 0.9.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_hc"
|
||||
version = "0.2.0"
|
||||
@@ -6900,7 +6992,7 @@ version = "0.26.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"rustversion",
|
||||
@@ -8199,6 +8291,15 @@ version = "0.11.0+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.14.2+wasi-0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3"
|
||||
dependencies = [
|
||||
"wit-bindgen-rt",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasite"
|
||||
version = "0.1.0"
|
||||
@@ -8556,6 +8657,15 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wit-bindgen-rt"
|
||||
version = "0.39.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1"
|
||||
dependencies = [
|
||||
"bitflags 2.8.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "workspace_hack"
|
||||
version = "0.1.0"
|
||||
|
||||
@@ -44,6 +44,7 @@ members = [
|
||||
"libs/proxy/postgres-types2",
|
||||
"libs/proxy/tokio-postgres2",
|
||||
"endpoint_storage",
|
||||
"pgxn/neon/communicator",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
@@ -251,6 +252,7 @@ desim = { version = "0.1", path = "./libs/desim" }
|
||||
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
|
||||
http-utils = { version = "0.1", path = "./libs/http-utils/" }
|
||||
metrics = { version = "0.1", path = "./libs/metrics/" }
|
||||
neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
|
||||
pageserver = { path = "./pageserver" }
|
||||
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
|
||||
pageserver_client = { path = "./pageserver/client" }
|
||||
@@ -278,6 +280,7 @@ walproposer = { version = "0.1", path = "./libs/walproposer/" }
|
||||
workspace_hack = { version = "0.1", path = "./workspace_hack/" }
|
||||
|
||||
## Build dependencies
|
||||
cbindgen = "0.28.0"
|
||||
criterion = "0.5.1"
|
||||
rcgen = "0.13"
|
||||
rstest = "0.18"
|
||||
|
||||
7
Makefile
7
Makefile
@@ -18,10 +18,12 @@ ifeq ($(BUILD_TYPE),release)
|
||||
PG_LDFLAGS = $(LDFLAGS)
|
||||
# Unfortunately, `--profile=...` is a nightly feature
|
||||
CARGO_BUILD_FLAGS += --release
|
||||
NEON_CARGO_ARTIFACT_TARGET_DIR = $(ROOT_PROJECT_DIR)/target/release
|
||||
else ifeq ($(BUILD_TYPE),debug)
|
||||
PG_CONFIGURE_OPTS = --enable-debug --with-openssl --enable-cassert --enable-depend
|
||||
PG_CFLAGS += -O0 -g3 $(CFLAGS)
|
||||
PG_LDFLAGS = $(LDFLAGS)
|
||||
NEON_CARGO_ARTIFACT_TARGET_DIR = $(ROOT_PROJECT_DIR)/target/debug
|
||||
else
|
||||
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
|
||||
endif
|
||||
@@ -180,11 +182,16 @@ postgres-check-%: postgres-%
|
||||
|
||||
.PHONY: neon-pg-ext-%
|
||||
neon-pg-ext-%: postgres-%
|
||||
+@echo "Compiling communicator $*"
|
||||
$(CARGO_CMD_PREFIX) cargo build -p communicator $(CARGO_BUILD_FLAGS)
|
||||
|
||||
+@echo "Compiling neon $*"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-$*
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
|
||||
LIBCOMMUNICATOR_PATH=$(NEON_CARGO_ARTIFACT_TARGET_DIR) \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install
|
||||
|
||||
+@echo "Compiling neon_walredo $*"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$*
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
|
||||
|
||||
@@ -6,8 +6,12 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
thiserror.workspace = true
|
||||
nix.workspace=true
|
||||
nix.workspace = true
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
[dev-dependencies]
|
||||
rand = "0.9.1"
|
||||
rand_distr = "0.5.1"
|
||||
|
||||
[target.'cfg(target_os = "macos")'.dependencies]
|
||||
tempfile = "3.14.0"
|
||||
|
||||
304
libs/neon-shmem/src/hash.rs
Normal file
304
libs/neon-shmem/src/hash.rs
Normal file
@@ -0,0 +1,304 @@
|
||||
//! Hash table implementation on top of 'shmem'
|
||||
//!
|
||||
//! Features required in the long run by the communicator project:
|
||||
//!
|
||||
//! [X] Accessible from both Postgres processes and rust threads in the communicator process
|
||||
//! [X] Low latency
|
||||
//! [ ] Scalable to lots of concurrent accesses (currently relies on caller for locking)
|
||||
//! [ ] Resizable
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::hash::{DefaultHasher, Hash, Hasher};
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
use crate::shmem::ShmemHandle;
|
||||
|
||||
mod core;
|
||||
pub mod entry;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use core::CoreHashMap;
|
||||
use entry::{Entry, OccupiedEntry};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct OutOfMemoryError();
|
||||
|
||||
pub struct HashMapInit<'a, K, V> {
|
||||
// Hash table can be allocated in a fixed memory area, or in a resizeable ShmemHandle.
|
||||
shmem_handle: Option<ShmemHandle>,
|
||||
shared_ptr: *mut HashMapShared<'a, K, V>,
|
||||
}
|
||||
|
||||
pub struct HashMapAccess<'a, K, V> {
|
||||
shmem_handle: Option<ShmemHandle>,
|
||||
shared_ptr: *mut HashMapShared<'a, K, V>,
|
||||
}
|
||||
|
||||
unsafe impl<'a, K: Sync, V: Sync> Sync for HashMapAccess<'a, K, V> {}
|
||||
unsafe impl<'a, K: Send, V: Send> Send for HashMapAccess<'a, K, V> {}
|
||||
|
||||
impl<'a, K, V> HashMapInit<'a, K, V> {
|
||||
pub fn attach_writer(self) -> HashMapAccess<'a, K, V> {
|
||||
HashMapAccess {
|
||||
shmem_handle: self.shmem_handle,
|
||||
shared_ptr: self.shared_ptr,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn attach_reader(self) -> HashMapAccess<'a, K, V> {
|
||||
// no difference to attach_writer currently
|
||||
self.attach_writer()
|
||||
}
|
||||
}
|
||||
|
||||
/// This is stored in the shared memory area
|
||||
///
|
||||
/// NOTE: We carve out the parts from a contiguous chunk. Growing and shrinking the hash table
|
||||
/// relies on the memory layout! The data structures are laid out in the contiguous shared memory
|
||||
/// area as follows:
|
||||
///
|
||||
/// HashMapShared
|
||||
/// [buckets]
|
||||
/// [dictionary]
|
||||
///
|
||||
/// In between the above parts, there can be padding bytes to align the parts correctly.
|
||||
struct HashMapShared<'a, K, V> {
|
||||
inner: CoreHashMap<'a, K, V>,
|
||||
}
|
||||
|
||||
impl<'a, K, V> HashMapInit<'a, K, V>
|
||||
where
|
||||
K: Clone + Hash + Eq,
|
||||
{
|
||||
pub fn estimate_size(num_buckets: u32) -> usize {
|
||||
// add some margin to cover alignment etc.
|
||||
CoreHashMap::<K, V>::estimate_size(num_buckets) + size_of::<HashMapShared<K, V>>() + 1000
|
||||
}
|
||||
|
||||
pub fn init_in_fixed_area(
|
||||
num_buckets: u32,
|
||||
area: &'a mut [MaybeUninit<u8>],
|
||||
) -> HashMapInit<'a, K, V> {
|
||||
Self::init_common(num_buckets, None, area.as_mut_ptr().cast(), area.len())
|
||||
}
|
||||
|
||||
/// Initialize a new hash map in the given shared memory area
|
||||
pub fn init_in_shmem(num_buckets: u32, mut shmem: ShmemHandle) -> HashMapInit<'a, K, V> {
|
||||
let size = Self::estimate_size(num_buckets);
|
||||
shmem
|
||||
.set_size(size)
|
||||
.expect("could not resize shared memory area");
|
||||
|
||||
let ptr = unsafe { shmem.data_ptr.as_mut() };
|
||||
Self::init_common(num_buckets, Some(shmem), ptr, size)
|
||||
}
|
||||
|
||||
fn init_common(
|
||||
num_buckets: u32,
|
||||
shmem_handle: Option<ShmemHandle>,
|
||||
area_ptr: *mut u8,
|
||||
area_len: usize,
|
||||
) -> HashMapInit<'a, K, V> {
|
||||
// carve out the HashMapShared struct from the area.
|
||||
let mut ptr: *mut u8 = area_ptr;
|
||||
let end_ptr: *mut u8 = unsafe { area_ptr.add(area_len) };
|
||||
ptr = unsafe { ptr.add(ptr.align_offset(align_of::<HashMapShared<K, V>>())) };
|
||||
let shared_ptr: *mut HashMapShared<K, V> = ptr.cast();
|
||||
ptr = unsafe { ptr.add(size_of::<HashMapShared<K, V>>()) };
|
||||
|
||||
// carve out the buckets
|
||||
ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<core::Bucket<K, V>>())) };
|
||||
let buckets_ptr = ptr;
|
||||
ptr = unsafe { ptr.add(size_of::<core::Bucket<K, V>>() * num_buckets as usize) };
|
||||
|
||||
// use remaining space for the dictionary
|
||||
ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<u32>())) };
|
||||
assert!(ptr.addr() < end_ptr.addr());
|
||||
let dictionary_ptr = ptr;
|
||||
let dictionary_size = unsafe { end_ptr.byte_offset_from(ptr) / size_of::<u32>() as isize };
|
||||
assert!(dictionary_size > 0);
|
||||
|
||||
let buckets =
|
||||
unsafe { std::slice::from_raw_parts_mut(buckets_ptr.cast(), num_buckets as usize) };
|
||||
let dictionary = unsafe {
|
||||
std::slice::from_raw_parts_mut(dictionary_ptr.cast(), dictionary_size as usize)
|
||||
};
|
||||
let hashmap = CoreHashMap::new(buckets, dictionary);
|
||||
unsafe {
|
||||
std::ptr::write(shared_ptr, HashMapShared { inner: hashmap });
|
||||
}
|
||||
|
||||
HashMapInit {
|
||||
shmem_handle: shmem_handle,
|
||||
shared_ptr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, K, V> HashMapAccess<'a, K, V>
|
||||
where
|
||||
K: Clone + Hash + Eq,
|
||||
{
|
||||
pub fn get_hash_value(&self, key: &K) -> u64 {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
key.hash(&mut hasher);
|
||||
hasher.finish()
|
||||
}
|
||||
|
||||
pub fn get_with_hash<'e>(&'e self, key: &K, hash: u64) -> Option<&'e V> {
|
||||
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
|
||||
|
||||
map.inner.get_with_hash(key, hash)
|
||||
}
|
||||
|
||||
pub fn entry_with_hash(&mut self, key: K, hash: u64) -> Entry<'a, '_, K, V> {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
|
||||
map.inner.entry_with_hash(key, hash)
|
||||
}
|
||||
|
||||
pub fn remove_with_hash(&mut self, key: &K, hash: u64) {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
|
||||
match map.inner.entry_with_hash(key.clone(), hash) {
|
||||
Entry::Occupied(e) => {
|
||||
e.remove();
|
||||
}
|
||||
Entry::Vacant(_) => {}
|
||||
};
|
||||
}
|
||||
|
||||
pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<'a, '_, K, V>> {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
map.inner.entry_at_bucket(pos)
|
||||
}
|
||||
|
||||
pub fn get_num_buckets(&self) -> usize {
|
||||
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
|
||||
map.inner.get_num_buckets()
|
||||
}
|
||||
|
||||
/// Return the key and value stored in bucket with given index. This can be used to
|
||||
/// iterate through the hash map. (An Iterator might be nicer. The communicator's
|
||||
/// clock algorithm needs to _slowly_ iterate through all buckets with its clock hand,
|
||||
/// without holding a lock. If we switch to an Iterator, it must not hold the lock.)
|
||||
pub fn get_at_bucket(&self, pos: usize) -> Option<&(K, V)> {
|
||||
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
|
||||
|
||||
if pos >= map.inner.buckets.len() {
|
||||
return None;
|
||||
}
|
||||
let bucket = &map.inner.buckets[pos];
|
||||
bucket.inner.as_ref()
|
||||
}
|
||||
|
||||
pub fn get_bucket_for_value(&self, val_ptr: *const V) -> usize {
|
||||
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
|
||||
|
||||
let origin = map.inner.buckets.as_ptr();
|
||||
let idx = (val_ptr as usize - origin as usize) / (size_of::<V>() as usize);
|
||||
assert!(idx < map.inner.buckets.len());
|
||||
|
||||
idx
|
||||
}
|
||||
|
||||
// for metrics
|
||||
pub fn get_num_buckets_in_use(&self) -> usize {
|
||||
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
|
||||
map.inner.buckets_in_use as usize
|
||||
}
|
||||
|
||||
/// Grow
|
||||
///
|
||||
/// 1. grow the underlying shared memory area
|
||||
/// 2. Initialize new buckets. This overwrites the current dictionary
|
||||
/// 3. Recalculate the dictionary
|
||||
pub fn grow(&mut self, num_buckets: u32) -> Result<(), crate::shmem::Error> {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
let inner = &mut map.inner;
|
||||
let old_num_buckets = inner.buckets.len() as u32;
|
||||
|
||||
if num_buckets < old_num_buckets {
|
||||
panic!("grow called with a smaller number of buckets");
|
||||
}
|
||||
if num_buckets == old_num_buckets {
|
||||
return Ok(());
|
||||
}
|
||||
let shmem_handle = self
|
||||
.shmem_handle
|
||||
.as_ref()
|
||||
.expect("grow called on a fixed-size hash table");
|
||||
|
||||
let size_bytes = HashMapInit::<K, V>::estimate_size(num_buckets);
|
||||
shmem_handle.set_size(size_bytes)?;
|
||||
let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
|
||||
|
||||
// Initialize new buckets. The new buckets are linked to the free list. NB: This overwrites
|
||||
// the dictionary!
|
||||
let buckets_ptr = inner.buckets.as_mut_ptr();
|
||||
unsafe {
|
||||
for i in old_num_buckets..num_buckets {
|
||||
let bucket_ptr = buckets_ptr.add(i as usize);
|
||||
bucket_ptr.write(core::Bucket {
|
||||
next: if i < num_buckets {
|
||||
i as u32 + 1
|
||||
} else {
|
||||
inner.free_head
|
||||
},
|
||||
inner: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Recalculate the dictionary
|
||||
let buckets;
|
||||
let dictionary;
|
||||
unsafe {
|
||||
let buckets_end_ptr = buckets_ptr.add(num_buckets as usize);
|
||||
let dictionary_ptr: *mut u32 = buckets_end_ptr
|
||||
.byte_add(buckets_end_ptr.align_offset(align_of::<u32>()))
|
||||
.cast();
|
||||
let dictionary_size: usize =
|
||||
end_ptr.byte_offset_from(buckets_end_ptr) as usize / size_of::<u32>();
|
||||
|
||||
buckets = std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize);
|
||||
dictionary = std::slice::from_raw_parts_mut(dictionary_ptr, dictionary_size);
|
||||
}
|
||||
for i in 0..dictionary.len() {
|
||||
dictionary[i] = core::INVALID_POS;
|
||||
}
|
||||
|
||||
for i in 0..old_num_buckets as usize {
|
||||
if buckets[i].inner.is_none() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut hasher = DefaultHasher::new();
|
||||
buckets[i].inner.as_ref().unwrap().0.hash(&mut hasher);
|
||||
let hash = hasher.finish();
|
||||
|
||||
let pos: usize = (hash % dictionary.len() as u64) as usize;
|
||||
buckets[i].next = dictionary[pos];
|
||||
dictionary[pos] = i as u32;
|
||||
}
|
||||
|
||||
// Finally, update the CoreHashMap struct
|
||||
inner.dictionary = dictionary;
|
||||
inner.buckets = buckets;
|
||||
inner.free_head = old_num_buckets;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: Shrinking is a multi-step process that requires co-operation from the caller
|
||||
//
|
||||
// 1. The caller must first call begin_shrink(). That forbids allocation of higher-numbered
|
||||
// buckets.
|
||||
//
|
||||
// 2. Next, the caller must evict all entries in higher-numbered buckets.
|
||||
//
|
||||
// 3. Finally, call finish_shrink(). This recomputes the dictionary and shrinks the underlying
|
||||
// shmem area
|
||||
}
|
||||
174
libs/neon-shmem/src/hash/core.rs
Normal file
174
libs/neon-shmem/src/hash/core.rs
Normal file
@@ -0,0 +1,174 @@
|
||||
//! Simple hash table with chaining
|
||||
//!
|
||||
//! # Resizing
|
||||
//!
|
||||
|
||||
use std::hash::Hash;
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
use crate::hash::entry::{Entry, OccupiedEntry, PrevPos, VacantEntry};
|
||||
|
||||
pub(crate) const INVALID_POS: u32 = u32::MAX;
|
||||
|
||||
// Bucket
|
||||
pub(crate) struct Bucket<K, V> {
|
||||
pub(crate) next: u32,
|
||||
pub(crate) inner: Option<(K, V)>,
|
||||
}
|
||||
|
||||
pub(crate) struct CoreHashMap<'a, K, V> {
|
||||
pub(crate) dictionary: &'a mut [u32],
|
||||
pub(crate) buckets: &'a mut [Bucket<K, V>],
|
||||
pub(crate) free_head: u32,
|
||||
|
||||
pub(crate) _user_list_head: u32,
|
||||
|
||||
// metrics
|
||||
pub(crate) buckets_in_use: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FullError();
|
||||
|
||||
impl<'a, K: Hash + Eq, V> CoreHashMap<'a, K, V>
|
||||
where
|
||||
K: Clone + Hash + Eq,
|
||||
{
|
||||
const FILL_FACTOR: f32 = 0.60;
|
||||
|
||||
pub fn estimate_size(num_buckets: u32) -> usize {
|
||||
let mut size = 0;
|
||||
|
||||
// buckets
|
||||
size += size_of::<Bucket<K, V>>() * num_buckets as usize;
|
||||
|
||||
// dictionary
|
||||
size += (f32::ceil((size_of::<u32>() * num_buckets as usize) as f32 / Self::FILL_FACTOR))
|
||||
as usize;
|
||||
|
||||
size
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
buckets: &'a mut [MaybeUninit<Bucket<K, V>>],
|
||||
dictionary: &'a mut [MaybeUninit<u32>],
|
||||
) -> CoreHashMap<'a, K, V> {
|
||||
// Initialize the buckets
|
||||
for i in 0..buckets.len() {
|
||||
buckets[i].write(Bucket {
|
||||
next: if i < buckets.len() - 1 {
|
||||
i as u32 + 1
|
||||
} else {
|
||||
INVALID_POS
|
||||
},
|
||||
inner: None,
|
||||
});
|
||||
}
|
||||
|
||||
// Initialize the dictionary
|
||||
for i in 0..dictionary.len() {
|
||||
dictionary[i].write(INVALID_POS);
|
||||
}
|
||||
|
||||
// TODO: use std::slice::assume_init_mut() once it stabilizes
|
||||
let buckets =
|
||||
unsafe { std::slice::from_raw_parts_mut(buckets.as_mut_ptr().cast(), buckets.len()) };
|
||||
let dictionary = unsafe {
|
||||
std::slice::from_raw_parts_mut(dictionary.as_mut_ptr().cast(), dictionary.len())
|
||||
};
|
||||
|
||||
CoreHashMap {
|
||||
dictionary,
|
||||
buckets,
|
||||
free_head: 0,
|
||||
buckets_in_use: 0,
|
||||
_user_list_head: INVALID_POS,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_with_hash(&self, key: &K, hash: u64) -> Option<&V> {
|
||||
let mut next = self.dictionary[hash as usize % self.dictionary.len()];
|
||||
loop {
|
||||
if next == INVALID_POS {
|
||||
return None;
|
||||
}
|
||||
|
||||
let bucket = &self.buckets[next as usize];
|
||||
let (bucket_key, bucket_value) = bucket.inner.as_ref().expect("entry is in use");
|
||||
if bucket_key == key {
|
||||
return Some(&bucket_value);
|
||||
}
|
||||
next = bucket.next;
|
||||
}
|
||||
}
|
||||
|
||||
// all updates are done through Entry
|
||||
pub fn entry_with_hash(&mut self, key: K, hash: u64) -> Entry<'a, '_, K, V> {
|
||||
let dict_pos = hash as usize % self.dictionary.len();
|
||||
let first = self.dictionary[dict_pos];
|
||||
if first == INVALID_POS {
|
||||
// no existing entry
|
||||
return Entry::Vacant(VacantEntry {
|
||||
map: self,
|
||||
key,
|
||||
dict_pos: dict_pos as u32,
|
||||
});
|
||||
}
|
||||
|
||||
let mut prev_pos = PrevPos::First(dict_pos as u32);
|
||||
let mut next = first;
|
||||
loop {
|
||||
let bucket = &mut self.buckets[next as usize];
|
||||
let (bucket_key, _bucket_value) = bucket.inner.as_mut().expect("entry is in use");
|
||||
if *bucket_key == key {
|
||||
// found existing entry
|
||||
return Entry::Occupied(OccupiedEntry {
|
||||
map: self,
|
||||
_key: key,
|
||||
prev_pos,
|
||||
bucket_pos: next,
|
||||
});
|
||||
}
|
||||
|
||||
if bucket.next == INVALID_POS {
|
||||
// No existing entry
|
||||
return Entry::Vacant(VacantEntry {
|
||||
map: self,
|
||||
key,
|
||||
dict_pos: dict_pos as u32,
|
||||
});
|
||||
}
|
||||
prev_pos = PrevPos::Chained(next);
|
||||
next = bucket.next;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_num_buckets(&self) -> usize {
|
||||
self.buckets.len()
|
||||
}
|
||||
|
||||
pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<K, V>> {
|
||||
if pos >= self.buckets.len() {
|
||||
return None;
|
||||
}
|
||||
|
||||
todo!()
|
||||
//self.buckets[pos].inner.as_ref()
|
||||
}
|
||||
|
||||
pub(crate) fn alloc_bucket(&mut self, key: K, value: V) -> Result<u32, FullError> {
|
||||
let pos = self.free_head;
|
||||
if pos == INVALID_POS {
|
||||
return Err(FullError());
|
||||
}
|
||||
|
||||
let bucket = &mut self.buckets[pos as usize];
|
||||
self.free_head = bucket.next;
|
||||
self.buckets_in_use += 1;
|
||||
|
||||
bucket.next = INVALID_POS;
|
||||
bucket.inner = Some((key, value));
|
||||
|
||||
return Ok(pos);
|
||||
}
|
||||
}
|
||||
220
libs/neon-shmem/src/hash/tests.rs
Normal file
220
libs/neon-shmem/src/hash/tests.rs
Normal file
@@ -0,0 +1,220 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use crate::hash::HashMapAccess;
|
||||
use crate::hash::HashMapInit;
|
||||
use crate::hash::UpdateAction;
|
||||
use crate::shmem::ShmemHandle;
|
||||
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::{Rng, RngCore};
|
||||
use rand_distr::Zipf;
|
||||
|
||||
const TEST_KEY_LEN: usize = 16;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||
struct TestKey([u8; TEST_KEY_LEN]);
|
||||
|
||||
impl From<&TestKey> for u128 {
|
||||
fn from(val: &TestKey) -> u128 {
|
||||
u128::from_be_bytes(val.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<u128> for TestKey {
|
||||
fn from(val: u128) -> TestKey {
|
||||
TestKey(val.to_be_bytes())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a [u8]> for TestKey {
|
||||
fn from(bytes: &'a [u8]) -> TestKey {
|
||||
TestKey(bytes.try_into().unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
fn test_inserts<K: Into<TestKey> + Copy>(keys: &[K]) {
|
||||
const MAX_MEM_SIZE: usize = 10000000;
|
||||
let shmem = ShmemHandle::new("test_inserts", 0, MAX_MEM_SIZE).unwrap();
|
||||
|
||||
let init_struct = HashMapInit::<TestKey, usize>::init_in_shmem(100000, shmem);
|
||||
let w = init_struct.attach_writer();
|
||||
|
||||
for (idx, k) in keys.iter().enumerate() {
|
||||
let res = w.insert(&(*k).into(), idx);
|
||||
assert!(res.is_ok());
|
||||
}
|
||||
|
||||
for (idx, k) in keys.iter().enumerate() {
|
||||
let x = w.get(&(*k).into());
|
||||
let value = x.as_deref().copied();
|
||||
assert_eq!(value, Some(idx));
|
||||
}
|
||||
|
||||
//eprintln!("stats: {:?}", tree_writer.get_statistics());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dense() {
|
||||
// This exercises splitting a node with prefix
|
||||
let keys: &[u128] = &[0, 1, 2, 3, 256];
|
||||
test_inserts(keys);
|
||||
|
||||
// Dense keys
|
||||
let mut keys: Vec<u128> = (0..10000).collect();
|
||||
test_inserts(&keys);
|
||||
|
||||
// Do the same in random orders
|
||||
for _ in 1..10 {
|
||||
keys.shuffle(&mut rand::rng());
|
||||
test_inserts(&keys);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sparse() {
|
||||
// sparse keys
|
||||
let mut keys: Vec<TestKey> = Vec::new();
|
||||
let mut used_keys = HashSet::new();
|
||||
for _ in 0..10000 {
|
||||
loop {
|
||||
let key = rand::random::<u128>();
|
||||
if used_keys.get(&key).is_some() {
|
||||
continue;
|
||||
}
|
||||
used_keys.insert(key);
|
||||
keys.push(key.into());
|
||||
break;
|
||||
}
|
||||
}
|
||||
test_inserts(&keys);
|
||||
}
|
||||
|
||||
struct TestValue(AtomicUsize);
|
||||
|
||||
impl TestValue {
|
||||
fn new(val: usize) -> TestValue {
|
||||
TestValue(AtomicUsize::new(val))
|
||||
}
|
||||
|
||||
fn load(&self) -> usize {
|
||||
self.0.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for TestValue {
|
||||
fn clone(&self) -> TestValue {
|
||||
TestValue::new(self.load())
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for TestValue {
|
||||
fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
|
||||
write!(fmt, "{:?}", self.load())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct TestOp(TestKey, Option<usize>);
|
||||
|
||||
fn apply_op(
|
||||
op: &TestOp,
|
||||
sut: &HashMapAccess<TestKey, TestValue>,
|
||||
shadow: &mut BTreeMap<TestKey, usize>,
|
||||
) {
|
||||
eprintln!("applying op: {op:?}");
|
||||
|
||||
// apply the change to the shadow tree first
|
||||
let shadow_existing = if let Some(v) = op.1 {
|
||||
shadow.insert(op.0, v)
|
||||
} else {
|
||||
shadow.remove(&op.0)
|
||||
};
|
||||
|
||||
// apply to Art tree
|
||||
sut.update_with_fn(&op.0, |existing| {
|
||||
assert_eq!(existing.map(TestValue::load), shadow_existing);
|
||||
|
||||
match (existing, op.1) {
|
||||
(None, None) => UpdateAction::Nothing,
|
||||
(None, Some(new_val)) => UpdateAction::Insert(TestValue::new(new_val)),
|
||||
(Some(_old_val), None) => UpdateAction::Remove,
|
||||
(Some(old_val), Some(new_val)) => {
|
||||
old_val.0.store(new_val, Ordering::Relaxed);
|
||||
UpdateAction::Nothing
|
||||
}
|
||||
}
|
||||
})
|
||||
.expect("out of memory");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn random_ops() {
|
||||
const MAX_MEM_SIZE: usize = 10000000;
|
||||
let shmem = ShmemHandle::new("test_inserts", 0, MAX_MEM_SIZE).unwrap();
|
||||
|
||||
let init_struct = HashMapInit::<TestKey, TestValue>::init_in_shmem(100000, shmem);
|
||||
let writer = init_struct.attach_writer();
|
||||
|
||||
let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new();
|
||||
|
||||
let distribution = Zipf::new(u128::MAX as f64, 1.1).unwrap();
|
||||
let mut rng = rand::rng();
|
||||
for i in 0..100000 {
|
||||
let key: TestKey = (rng.sample(distribution) as u128).into();
|
||||
|
||||
let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });
|
||||
|
||||
apply_op(&op, &writer, &mut shadow);
|
||||
|
||||
if i % 1000 == 0 {
|
||||
eprintln!("{i} ops processed");
|
||||
//eprintln!("stats: {:?}", tree_writer.get_statistics());
|
||||
//test_iter(&tree_writer, &shadow);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_grow() {
|
||||
const MEM_SIZE: usize = 10000000;
|
||||
let shmem = ShmemHandle::new("test_grow", 0, MEM_SIZE).unwrap();
|
||||
|
||||
let init_struct = HashMapInit::<TestKey, TestValue>::init_in_shmem(1000, shmem);
|
||||
let writer = init_struct.attach_writer();
|
||||
|
||||
let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new();
|
||||
|
||||
let mut rng = rand::rng();
|
||||
for i in 0..10000 {
|
||||
let key: TestKey = ((rng.next_u32() % 1000) as u128).into();
|
||||
|
||||
let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });
|
||||
|
||||
apply_op(&op, &writer, &mut shadow);
|
||||
|
||||
if i % 1000 == 0 {
|
||||
eprintln!("{i} ops processed");
|
||||
//eprintln!("stats: {:?}", tree_writer.get_statistics());
|
||||
//test_iter(&tree_writer, &shadow);
|
||||
}
|
||||
}
|
||||
|
||||
writer.grow(1500).unwrap();
|
||||
|
||||
for i in 0..10000 {
|
||||
let key: TestKey = ((rng.next_u32() % 1500) as u128).into();
|
||||
|
||||
let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });
|
||||
|
||||
apply_op(&op, &writer, &mut shadow);
|
||||
|
||||
if i % 1000 == 0 {
|
||||
eprintln!("{i} ops processed");
|
||||
//eprintln!("stats: {:?}", tree_writer.get_statistics());
|
||||
//test_iter(&tree_writer, &shadow);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
//! Shared memory utilities for neon communicator
|
||||
|
||||
pub mod hash;
|
||||
pub mod shmem;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# pgxs/neon/Makefile
|
||||
|
||||
|
||||
MODULE_big = neon
|
||||
OBJS = \
|
||||
$(WIN32RES) \
|
||||
@@ -22,7 +21,8 @@ OBJS = \
|
||||
walproposer.o \
|
||||
walproposer_pg.o \
|
||||
control_plane_connector.o \
|
||||
walsender_hooks.o
|
||||
walsender_hooks.o \
|
||||
$(LIBCOMMUNICATOR_PATH)/libcommunicator.a
|
||||
|
||||
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||
SHLIB_LINK_INTERNAL = $(libpq)
|
||||
|
||||
13
pgxn/neon/communicator/Cargo.toml
Normal file
13
pgxn/neon/communicator/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "communicator"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[lib]
|
||||
crate-type = ["staticlib"]
|
||||
|
||||
[dependencies]
|
||||
neon-shmem.workspace = true
|
||||
|
||||
[build-dependencies]
|
||||
cbindgen.workspace = true
|
||||
8
pgxn/neon/communicator/README.md
Normal file
8
pgxn/neon/communicator/README.md
Normal file
@@ -0,0 +1,8 @@
|
||||
This package will evolve into a "compute-pageserver communicator"
|
||||
process and machinery. For now, it just provides wrappers on the
|
||||
neon-shmem Rust crate, to allow using it in the C implementation of
|
||||
the LFC.
|
||||
|
||||
At compilation time, pgxn/neon/communicator/ produces a static
|
||||
library, libcommunicator.a. It is linked to the neon.so extension
|
||||
library.
|
||||
22
pgxn/neon/communicator/build.rs
Normal file
22
pgxn/neon/communicator/build.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
use std::env;
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
|
||||
|
||||
cbindgen::generate(crate_dir).map_or_else(
|
||||
|error| match error {
|
||||
cbindgen::Error::ParseSyntaxError { .. } => {
|
||||
// This means there was a syntax error in the Rust sources. Don't panic, because
|
||||
// we want the build to continue and the Rust compiler to hit the error. The
|
||||
// Rust compiler produces a better error message than cbindgen.
|
||||
eprintln!("Generating C bindings failed because of a Rust syntax error");
|
||||
}
|
||||
e => panic!("Unable to generate C bindings: {:?}", e),
|
||||
},
|
||||
|bindings| {
|
||||
bindings.write_to_file("communicator_bindings.h");
|
||||
},
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
4
pgxn/neon/communicator/cbindgen.toml
Normal file
4
pgxn/neon/communicator/cbindgen.toml
Normal file
@@ -0,0 +1,4 @@
|
||||
language = "C"
|
||||
|
||||
[enum]
|
||||
prefix_with_name = true
|
||||
240
pgxn/neon/communicator/src/file_cache_hashmap.rs
Normal file
240
pgxn/neon/communicator/src/file_cache_hashmap.rs
Normal file
@@ -0,0 +1,240 @@
|
||||
//! Glue code to allow using the Rust shmem hash map implementation from C code
|
||||
//!
|
||||
//! For convience of adapting existing code, the interface provided somewhat resembles the dynahash
|
||||
//! interface.
|
||||
//!
|
||||
//! NOTE: The caller is responsible for locking! The caller is expected to hold the PostgreSQL
|
||||
//! LWLock, 'lfc_lock', while accessing the hash table, in shared or exclusive mode as appropriate.
|
||||
|
||||
use std::ffi::c_void;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use neon_shmem::hash::entry::Entry;
|
||||
use neon_shmem::hash::{HashMapAccess, HashMapInit};
|
||||
use neon_shmem::shmem::ShmemHandle;
|
||||
|
||||
/// NB: This must match the definition of BufferTag in Postgres C headers. We could use bindgen to
|
||||
/// generate this from the C headers, but prefer to not introduce dependency on bindgen for now.
|
||||
///
|
||||
/// Note that there are no padding bytes. If the corresponding C struct has padding bytes, the C C
|
||||
/// code must clear them.
|
||||
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
||||
#[repr(C)]
|
||||
pub struct FileCacheKey {
|
||||
pub _spc_id: u32,
|
||||
pub _db_id: u32,
|
||||
pub _rel_number: u32,
|
||||
pub _fork_num: u32,
|
||||
pub _block_num: u32,
|
||||
}
|
||||
|
||||
/// Like with FileCacheKey, this must match the definition of FileCacheEntry in file_cache.c. We
|
||||
/// don't look at the contents here though, it's sufficent that the size and alignment matches.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
#[repr(C)]
|
||||
pub struct FileCacheEntry {
|
||||
pub _offset: u32,
|
||||
pub _access_count: u32,
|
||||
pub _prev: *mut FileCacheEntry,
|
||||
pub _next: *mut FileCacheEntry,
|
||||
pub _state: [u32; 8],
|
||||
}
|
||||
|
||||
/// XXX: This could be just:
|
||||
///
|
||||
/// ```ignore
|
||||
/// type FileCacheHashMapHandle = HashMapInit<'a, FileCacheKey, FileCacheEntry>
|
||||
/// ```
|
||||
///
|
||||
/// but with that, cbindgen generates a broken typedef in the C header file which doesn't
|
||||
/// compile. It apparently gets confused by the generics.
|
||||
#[repr(transparent)]
|
||||
pub struct FileCacheHashMapHandle<'a>(
|
||||
pub *mut c_void,
|
||||
PhantomData<HashMapInit<'a, FileCacheKey, FileCacheEntry>>,
|
||||
);
|
||||
impl<'a> From<Box<HashMapInit<'a, FileCacheKey, FileCacheEntry>>> for FileCacheHashMapHandle<'a> {
|
||||
fn from(x: Box<HashMapInit<'a, FileCacheKey, FileCacheEntry>>) -> Self {
|
||||
FileCacheHashMapHandle(Box::into_raw(x) as *mut c_void, PhantomData::default())
|
||||
}
|
||||
}
|
||||
impl<'a> From<FileCacheHashMapHandle<'a>> for Box<HashMapInit<'a, FileCacheKey, FileCacheEntry>> {
|
||||
fn from(x: FileCacheHashMapHandle) -> Self {
|
||||
unsafe { Box::from_raw(x.0.cast()) }
|
||||
}
|
||||
}
|
||||
|
||||
/// XXX: same for this
|
||||
#[repr(transparent)]
|
||||
pub struct FileCacheHashMapAccess<'a>(
|
||||
pub *mut c_void,
|
||||
PhantomData<HashMapAccess<'a, FileCacheKey, FileCacheEntry>>,
|
||||
);
|
||||
impl<'a> From<Box<HashMapAccess<'a, FileCacheKey, FileCacheEntry>>> for FileCacheHashMapAccess<'a> {
|
||||
fn from(x: Box<HashMapAccess<'a, FileCacheKey, FileCacheEntry>>) -> Self {
|
||||
// Convert the Box into a raw mutable pointer to the HashMapAccess itself.
|
||||
// This transfers ownership of the HashMapAccess (and its contained ShmemHandle)
|
||||
// to the raw pointer. The C caller is now responsible for managing this memory.
|
||||
FileCacheHashMapAccess(Box::into_raw(x) as *mut c_void, PhantomData::default())
|
||||
}
|
||||
}
|
||||
impl<'a> FileCacheHashMapAccess<'a> {
|
||||
fn as_ref(self) -> &'a HashMapAccess<'a, FileCacheKey, FileCacheEntry> {
|
||||
let ptr: *mut HashMapAccess<'_, FileCacheKey, FileCacheEntry> = self.0.cast();
|
||||
unsafe { ptr.as_ref().unwrap() }
|
||||
}
|
||||
fn as_mut(self) -> &'a mut HashMapAccess<'a, FileCacheKey, FileCacheEntry> {
|
||||
let ptr: *mut HashMapAccess<'_, FileCacheKey, FileCacheEntry> = self.0.cast();
|
||||
unsafe { ptr.as_mut().unwrap() }
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize the shared memory area at postmaster startup. The returned handle is inherited
|
||||
/// by all the backend processes across fork()
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn bcomm_file_cache_shmem_init<'a>(
|
||||
initial_num_buckets: u32,
|
||||
max_num_buckets: u32,
|
||||
) -> FileCacheHashMapHandle<'a> {
|
||||
let max_bytes = HashMapInit::<FileCacheKey, FileCacheEntry>::estimate_size(max_num_buckets);
|
||||
let shmem_handle =
|
||||
ShmemHandle::new("lfc mapping", 0, max_bytes).expect("shmem initialization failed");
|
||||
|
||||
let handle = HashMapInit::<FileCacheKey, FileCacheEntry>::init_in_shmem(
|
||||
initial_num_buckets,
|
||||
shmem_handle,
|
||||
);
|
||||
|
||||
Box::new(handle).into()
|
||||
}
|
||||
|
||||
/// Initialize the access to the shared memory area in a backend process.
|
||||
///
|
||||
/// XXX: I'm not sure if this actually gets called in each process, or if the returned struct
|
||||
/// is also inherited across fork(). It currently works either way but if this did more
|
||||
/// initialization that needed to be done after fork(), then it would matter.
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn bcomm_file_cache_shmem_access<'a>(
|
||||
handle: FileCacheHashMapHandle<'a>,
|
||||
) -> FileCacheHashMapAccess<'a> {
|
||||
let handle: Box<HashMapInit<'_, FileCacheKey, FileCacheEntry>> = handle.into();
|
||||
Box::new(handle.attach_writer()).into()
|
||||
}
|
||||
|
||||
/// Return the current number of buckets in the hash table
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn bcomm_file_cache_get_num_buckets<'a>(
|
||||
map: FileCacheHashMapAccess<'static>,
|
||||
) -> u32 {
|
||||
let map = map.as_ref();
|
||||
map.get_num_buckets().try_into().unwrap()
|
||||
}
|
||||
|
||||
/// Look up the entry with given key and hash.
|
||||
///
|
||||
/// This is similar to dynahash's hash_search(... , HASH_FIND)
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn bcomm_file_cache_hash_find<'a>(
|
||||
map: FileCacheHashMapAccess<'static>,
|
||||
key: &FileCacheKey,
|
||||
hash: u64,
|
||||
) -> Option<&'static FileCacheEntry> {
|
||||
let map = map.as_ref();
|
||||
map.get_with_hash(key, hash)
|
||||
}
|
||||
|
||||
/// Look up the entry at given bucket position
|
||||
///
|
||||
/// This has no direct equivalent in the dynahash interface, but can be used to
|
||||
/// iterate through all entries in the hash table.
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn bcomm_file_cache_hash_get_at_pos<'a>(
|
||||
map: FileCacheHashMapAccess<'static>,
|
||||
pos: u32,
|
||||
) -> Option<&'static FileCacheEntry> {
|
||||
let map = map.as_ref();
|
||||
map.get_at_bucket(pos as usize).map(|(_k, v)| v)
|
||||
}
|
||||
|
||||
/// Remove entry, given a pointer to the value.
|
||||
///
|
||||
/// This is equivalent to dynahash hash_search(entry->key, HASH_REMOVE), where 'entry'
|
||||
/// is an entry you have previously looked up
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn bcomm_file_cache_hash_remove_entry<'a, 'b>(
|
||||
map: FileCacheHashMapAccess,
|
||||
entry: *mut FileCacheEntry,
|
||||
) {
|
||||
let map = map.as_mut();
|
||||
let pos = map.get_bucket_for_value(entry);
|
||||
match map.entry_at_bucket(pos) {
|
||||
Some(e) => {
|
||||
e.remove();
|
||||
}
|
||||
None => {
|
||||
// todo: shouldn't happen, panic?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Compute the hash for given key
|
||||
///
|
||||
/// This is equivalent to dynahash get_hash_value() function. We use Rust's default hasher
|
||||
/// for calculating the hash though.
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn bcomm_file_cache_get_hash_value<'a, 'b>(
|
||||
map: FileCacheHashMapAccess<'static>,
|
||||
key: &FileCacheKey,
|
||||
) -> u64 {
|
||||
map.as_ref().get_hash_value(key)
|
||||
}
|
||||
|
||||
/// Insert a new entry to the hash table
|
||||
///
|
||||
/// This is equivalent to dynahash hash_search(..., HASH_ENTER).
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn bcomm_file_cache_hash_enter<'a, 'b>(
|
||||
map: FileCacheHashMapAccess,
|
||||
key: &FileCacheKey,
|
||||
hash: u64,
|
||||
found: &mut bool,
|
||||
) -> *mut FileCacheEntry {
|
||||
match map.as_mut().entry_with_hash(key.clone(), hash) {
|
||||
Entry::Occupied(mut e) => {
|
||||
*found = true;
|
||||
e.get_mut()
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
*found = false;
|
||||
let initial_value = FileCacheEntry::default();
|
||||
e.insert(initial_value).expect("TODO: hash table full")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the key for a given entry, which must be present in the hash table.
|
||||
///
|
||||
/// Dynahash requires the key to be part of the "value" struct, so you can always
|
||||
/// access the key with something like `entry->key`. The Rust implementation however
|
||||
/// stores the key separately. This function extracts the separately stored key.
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn bcomm_file_cache_hash_get_key_for_entry<'a, 'b>(
|
||||
map: FileCacheHashMapAccess,
|
||||
entry: *const FileCacheEntry,
|
||||
) -> Option<&FileCacheKey> {
|
||||
let map = map.as_ref();
|
||||
let pos = map.get_bucket_for_value(entry);
|
||||
map.get_at_bucket(pos as usize).map(|(k, _v)| k)
|
||||
}
|
||||
|
||||
/// Remove all entries from the hash table
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn bcomm_file_cache_hash_reset<'a, 'b>(map: FileCacheHashMapAccess) {
|
||||
let map = map.as_mut();
|
||||
let num_buckets = map.get_num_buckets();
|
||||
for i in 0..num_buckets {
|
||||
if let Some(e) = map.entry_at_bucket(i) {
|
||||
e.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
1
pgxn/neon/communicator/src/lib.rs
Normal file
1
pgxn/neon/communicator/src/lib.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub mod file_cache_hashmap;
|
||||
@@ -22,7 +22,6 @@
|
||||
#include "funcapi.h"
|
||||
#include "miscadmin.h"
|
||||
#include "common/file_utils.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "pgstat.h"
|
||||
#include "port/pg_iovec.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
@@ -37,7 +36,6 @@
|
||||
#include "storage/procsignal.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/dynahash.h"
|
||||
#include "utils/guc.h"
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
@@ -47,6 +45,7 @@
|
||||
#include "hll.h"
|
||||
#include "bitmap.h"
|
||||
#include "file_cache.h"
|
||||
#include "file_cache_rust_hash.h"
|
||||
#include "neon.h"
|
||||
#include "neon_lwlsncache.h"
|
||||
#include "neon_perf_counters.h"
|
||||
@@ -124,14 +123,18 @@ typedef enum FileCacheBlockState
|
||||
|
||||
typedef struct FileCacheEntry
|
||||
{
|
||||
BufferTag key;
|
||||
uint32 hash;
|
||||
uint32 offset;
|
||||
uint32 access_count;
|
||||
dlist_node list_node; /* LRU list node */
|
||||
uint32 state[(BLOCKS_PER_CHUNK * 2 + 31) / 32]; /* two bits per block */
|
||||
} FileCacheEntry;
|
||||
|
||||
/* Todo: alignment must be the same too */
|
||||
StaticAssertDecl(sizeof(FileCacheEntry) == sizeof(RustFileCacheEntry),
|
||||
"Rust and C declarations of FileCacheEntry are incompatible");
|
||||
StaticAssertDecl(sizeof(BufferTag) == sizeof(RustFileCacheKey),
|
||||
"Rust and C declarations of FileCacheKey are incompatible");
|
||||
|
||||
#define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3)
|
||||
#define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2))
|
||||
|
||||
@@ -201,7 +204,8 @@ typedef struct FreeListChunk
|
||||
#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * BLOCKS_PER_CHUNK)+7)/8)
|
||||
#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8)
|
||||
|
||||
static HTAB *lfc_hash;
|
||||
static FileCacheHashMapHandle lfc_hash_handle;
|
||||
static FileCacheHashMapAccess lfc_hash;
|
||||
static int lfc_desc = -1;
|
||||
static LWLockId lfc_lock;
|
||||
static LWLockId lfc_freelist_lock;
|
||||
@@ -258,15 +262,9 @@ lfc_switch_off(void)
|
||||
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
HASH_SEQ_STATUS status;
|
||||
FileCacheEntry *entry;
|
||||
|
||||
/* Invalidate hash */
|
||||
hash_seq_init(&status, lfc_hash);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL);
|
||||
}
|
||||
file_cache_hash_reset(lfc_hash);
|
||||
|
||||
lfc_ctl->generation += 1;
|
||||
lfc_ctl->size = 0;
|
||||
lfc_ctl->pinned = 0;
|
||||
@@ -347,7 +345,6 @@ lfc_shmem_startup(void)
|
||||
{
|
||||
size_t size;
|
||||
bool found;
|
||||
static HASHCTL info;
|
||||
|
||||
if (prev_shmem_startup_hook)
|
||||
{
|
||||
@@ -366,17 +363,13 @@ lfc_shmem_startup(void)
|
||||
|
||||
lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock");
|
||||
lfc_freelist_lock = (LWLockId) GetNamedLWLockTranche("lfc_freelist_lock");
|
||||
info.keysize = sizeof(BufferTag);
|
||||
info.entrysize = sizeof(FileCacheEntry);
|
||||
|
||||
/*
|
||||
* n_chunks+1 because we add new element to hash table before eviction
|
||||
* of victim
|
||||
*/
|
||||
lfc_hash = ShmemInitHash("lfc_hash",
|
||||
n_chunks + 1, n_chunks + 1,
|
||||
&info,
|
||||
HASH_ELEM | HASH_BLOBS);
|
||||
lfc_hash_handle = file_cache_hash_shmem_init(n_chunks + 1, n_chunks + 1);
|
||||
|
||||
memset(lfc_ctl, 0, offsetof(FileCacheControl, free_pages));
|
||||
dlist_init(&lfc_ctl->lru);
|
||||
|
||||
@@ -406,6 +399,8 @@ lfc_shmem_startup(void)
|
||||
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
|
||||
lfc_hash = file_cache_hash_shmem_access(lfc_hash_handle);
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -419,7 +414,6 @@ lfc_shmem_request(void)
|
||||
#endif
|
||||
|
||||
size = sizeof(FileCacheControl);
|
||||
size += hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, sizeof(FileCacheEntry));
|
||||
|
||||
RequestAddinShmemSpace(size);
|
||||
RequestNamedLWLockTranche("lfc_lock", 1);
|
||||
@@ -504,7 +498,7 @@ lfc_change_limit_hook(int newval, void *extra)
|
||||
lfc_ctl->used_pages -= is_page_cached;
|
||||
lfc_ctl->evicted_pages += is_page_cached;
|
||||
}
|
||||
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
|
||||
file_cache_hash_remove_entry(lfc_hash, victim);
|
||||
|
||||
if (!freelist_push(offset))
|
||||
{
|
||||
@@ -678,7 +672,7 @@ lfc_get_state(size_t max_entries)
|
||||
dlist_reverse_foreach(iter, &lfc_ctl->lru)
|
||||
{
|
||||
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
|
||||
fcs->chunks[i] = entry->key;
|
||||
fcs->chunks[i] = *file_cache_hash_get_key_for_entry(lfc_hash, entry);
|
||||
for (int j = 0; j < BLOCKS_PER_CHUNK; j++)
|
||||
{
|
||||
if (GET_STATE(entry, j) != UNAVAILABLE)
|
||||
@@ -967,7 +961,7 @@ lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks)
|
||||
{
|
||||
BufferTag tag;
|
||||
FileCacheEntry *entry;
|
||||
uint32 hash;
|
||||
uint64 hash;
|
||||
|
||||
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||
return;
|
||||
@@ -983,8 +977,8 @@ lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks)
|
||||
for (BlockNumber blkno = 0; blkno < nblocks; blkno += BLOCKS_PER_CHUNK)
|
||||
{
|
||||
tag.blockNum = blkno;
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
hash = file_cache_hash_get_hash_value(lfc_hash, &tag);
|
||||
entry = file_cache_hash_find(lfc_hash, &tag, hash);
|
||||
if (entry != NULL)
|
||||
{
|
||||
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
|
||||
@@ -1012,7 +1006,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
||||
FileCacheEntry *entry;
|
||||
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
|
||||
bool found = false;
|
||||
uint32 hash;
|
||||
uint64 hash;
|
||||
|
||||
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||
return false;
|
||||
@@ -1022,12 +1016,12 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
||||
tag.blockNum = blkno - chunk_offs;
|
||||
|
||||
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
hash = file_cache_hash_get_hash_value(lfc_hash, &tag);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
entry = file_cache_hash_find(lfc_hash, &tag, hash);
|
||||
found = entry != NULL && GET_STATE(entry, chunk_offs) != UNAVAILABLE;
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
@@ -1046,7 +1040,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
FileCacheEntry *entry;
|
||||
uint32 chunk_offs;
|
||||
int found = 0;
|
||||
uint32 hash;
|
||||
uint64 hash;
|
||||
int i = 0;
|
||||
|
||||
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||
@@ -1059,7 +1053,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
|
||||
tag.blockNum = blkno - chunk_offs;
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
hash = file_cache_hash_get_hash_value(lfc_hash, &tag);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
|
||||
@@ -1071,7 +1065,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
while (true)
|
||||
{
|
||||
int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
entry = file_cache_hash_find(lfc_hash, &tag, hash);
|
||||
|
||||
if (entry != NULL)
|
||||
{
|
||||
@@ -1101,7 +1095,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
*/
|
||||
chunk_offs = BLOCK_TO_CHUNK_OFF(blkno + i);
|
||||
tag.blockNum = (blkno + i) - chunk_offs;
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
hash = file_cache_hash_get_hash_value(lfc_hash, &tag);
|
||||
}
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
@@ -1150,7 +1144,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
BufferTag tag;
|
||||
FileCacheEntry *entry;
|
||||
ssize_t rc;
|
||||
uint32 hash;
|
||||
uint64 hash;
|
||||
uint64 generation;
|
||||
uint32 entry_offset;
|
||||
int blocks_read = 0;
|
||||
@@ -1228,7 +1222,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
Assert(iov_last_used - first_block_in_chunk_read >= n_blocks_to_read);
|
||||
|
||||
tag.blockNum = blkno - chunk_offs;
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
hash = file_cache_hash_get_hash_value(lfc_hash, &tag);
|
||||
cv = &lfc_ctl->cv[hash % N_COND_VARS];
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
@@ -1241,13 +1235,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
return blocks_read;
|
||||
}
|
||||
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
entry = file_cache_hash_find(lfc_hash, &tag, hash);
|
||||
|
||||
/* Approximate working set for the blocks assumed in this entry */
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
{
|
||||
tag.blockNum = blkno + i;
|
||||
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
|
||||
addSHLL(&lfc_ctl->wss_estimation, file_cache_hash_get_hash_value(lfc_hash, &tag));
|
||||
}
|
||||
|
||||
if (entry == NULL)
|
||||
@@ -1395,7 +1389,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
* Returns false if there are no unpinned entries and chunk can not be added.
|
||||
*/
|
||||
static bool
|
||||
lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
|
||||
lfc_init_new_entry(FileCacheEntry *entry)
|
||||
{
|
||||
/*-----------
|
||||
* If the chunk wasn't already in the LFC then we have these
|
||||
@@ -1451,21 +1445,18 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
|
||||
|
||||
CriticalAssert(victim->access_count == 0);
|
||||
entry->offset = victim->offset; /* grab victim's chunk */
|
||||
hash_search_with_hash_value(lfc_hash, &victim->key,
|
||||
victim->hash, HASH_REMOVE, NULL);
|
||||
file_cache_hash_remove_entry(lfc_hash, victim);
|
||||
neon_log(DEBUG2, "Swap file cache page");
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Can't add this chunk - we don't have the space for it */
|
||||
hash_search_with_hash_value(lfc_hash, &entry->key, hash,
|
||||
HASH_REMOVE, NULL);
|
||||
file_cache_hash_remove_entry(lfc_hash, entry);
|
||||
lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */
|
||||
return false;
|
||||
}
|
||||
|
||||
entry->access_count = 1;
|
||||
entry->hash = hash;
|
||||
lfc_ctl->pinned += 1;
|
||||
|
||||
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
|
||||
@@ -1505,7 +1496,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
FileCacheEntry *entry;
|
||||
ssize_t rc;
|
||||
bool found;
|
||||
uint32 hash;
|
||||
uint64 hash;
|
||||
uint64 generation;
|
||||
uint32 entry_offset;
|
||||
instr_time io_start, io_end;
|
||||
@@ -1524,7 +1515,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
|
||||
|
||||
tag.blockNum = blkno - chunk_offs;
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
hash = file_cache_hash_get_hash_value(lfc_hash, &tag);
|
||||
cv = &lfc_ctl->cv[hash % N_COND_VARS];
|
||||
|
||||
retry:
|
||||
@@ -1549,12 +1540,12 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
return false;
|
||||
}
|
||||
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
|
||||
entry = file_cache_hash_enter(lfc_hash, &tag, hash, &found);
|
||||
|
||||
if (lfc_prewarm_update_ws_estimation)
|
||||
{
|
||||
tag.blockNum = blkno;
|
||||
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
|
||||
addSHLL(&lfc_ctl->wss_estimation, file_cache_hash_get_hash_value(lfc_hash, &tag));
|
||||
}
|
||||
if (found)
|
||||
{
|
||||
@@ -1576,7 +1567,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!lfc_init_new_entry(entry, hash))
|
||||
if (!lfc_init_new_entry(entry))
|
||||
{
|
||||
/*
|
||||
* We can't process this chunk due to lack of space in LFC,
|
||||
@@ -1659,7 +1650,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
FileCacheEntry *entry;
|
||||
ssize_t rc;
|
||||
bool found;
|
||||
uint32 hash;
|
||||
uint64 hash;
|
||||
uint64 generation;
|
||||
uint32 entry_offset;
|
||||
int buf_offset = 0;
|
||||
@@ -1711,16 +1702,16 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
|
||||
tag.blockNum = blkno - chunk_offs;
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
hash = file_cache_hash_get_hash_value(lfc_hash, &tag);
|
||||
cv = &lfc_ctl->cv[hash % N_COND_VARS];
|
||||
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
|
||||
entry = file_cache_hash_enter(lfc_hash, &tag, hash, &found);
|
||||
|
||||
/* Approximate working set for the blocks assumed in this entry */
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
{
|
||||
tag.blockNum = blkno + i;
|
||||
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
|
||||
addSHLL(&lfc_ctl->wss_estimation, file_cache_hash_get_hash_value(lfc_hash, &tag));
|
||||
}
|
||||
|
||||
if (found)
|
||||
@@ -1737,7 +1728,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!lfc_init_new_entry(entry, hash))
|
||||
if (!lfc_init_new_entry(entry))
|
||||
{
|
||||
/*
|
||||
* We can't process this chunk due to lack of space in LFC,
|
||||
@@ -2147,7 +2138,6 @@ local_cache_pages(PG_FUNCTION_ARGS)
|
||||
|
||||
if (SRF_IS_FIRSTCALL())
|
||||
{
|
||||
HASH_SEQ_STATUS status;
|
||||
FileCacheEntry *entry;
|
||||
uint32 n_pages = 0;
|
||||
|
||||
@@ -2203,9 +2193,14 @@ local_cache_pages(PG_FUNCTION_ARGS)
|
||||
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
hash_seq_init(&status, lfc_hash);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
uint32 num_buckets = file_cache_hash_get_num_buckets(lfc_hash);
|
||||
|
||||
for (uint32 pos = 0; pos < num_buckets; pos++)
|
||||
{
|
||||
entry = file_cache_hash_get_at_pos(lfc_hash, pos);
|
||||
if (entry == NULL)
|
||||
continue;
|
||||
|
||||
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
|
||||
n_pages += GET_STATE(entry, i) == AVAILABLE;
|
||||
}
|
||||
@@ -2229,20 +2224,26 @@ local_cache_pages(PG_FUNCTION_ARGS)
|
||||
* in the fctx->record structure.
|
||||
*/
|
||||
uint32 n = 0;
|
||||
uint32 num_buckets = file_cache_hash_get_num_buckets(lfc_hash);
|
||||
|
||||
hash_seq_init(&status, lfc_hash);
|
||||
while ((entry = hash_seq_search(&status)) != NULL)
|
||||
for (uint32 pos = 0; pos < num_buckets; pos++)
|
||||
{
|
||||
entry = file_cache_hash_get_at_pos(lfc_hash, pos);
|
||||
if (entry == NULL)
|
||||
continue;
|
||||
|
||||
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
|
||||
{
|
||||
const BufferTag *key = file_cache_hash_get_key_for_entry(lfc_hash, entry);
|
||||
|
||||
if (GET_STATE(entry, i) == AVAILABLE)
|
||||
{
|
||||
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
|
||||
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
|
||||
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
|
||||
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
|
||||
fctx->record[n].forknum = entry->key.forkNum;
|
||||
fctx->record[n].blocknum = entry->key.blockNum + i;
|
||||
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(*key));
|
||||
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(*key));
|
||||
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(*key));
|
||||
fctx->record[n].forknum = key->forkNum;
|
||||
fctx->record[n].blocknum = key->blockNum + i;
|
||||
fctx->record[n].accesscount = entry->access_count;
|
||||
n += 1;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user