Add placeholder shmem hashmap implementation

Use that instead of the half-baked Adaptive Radix Tree
implementation. ART would probably be better in the long run, but more
complicated to implement.
This commit is contained in:
Heikki Linnakangas
2025-05-27 18:12:50 +03:00
parent 7c9bd542a6
commit 009168d711
13 changed files with 1452 additions and 777 deletions

21
Cargo.lock generated
View File

@@ -1359,7 +1359,7 @@ dependencies = [
"http 1.1.0",
"libc",
"metrics",
"neonart",
"neon-shmem",
"nix 0.30.1",
"pageserver_client_grpc",
"pageserver_page_api",
@@ -3930,6 +3930,9 @@ name = "neon-shmem"
version = "0.1.0"
dependencies = [
"nix 0.30.1",
"rand 0.9.1",
"rand_distr 0.5.1",
"spin",
"tempfile",
"thiserror 1.0.69",
"workspace_hack",
@@ -4378,12 +4381,15 @@ name = "pagebench"
version = "0.1.0"
dependencies = [
"anyhow",
"axum 0.8.1",
"camino",
"clap",
"futures",
"hdrhistogram",
"http 1.1.0",
"humantime",
"humantime-serde",
"metrics",
"pageserver_api",
"pageserver_client",
"pageserver_client_grpc",
@@ -4586,7 +4592,9 @@ dependencies = [
"http 1.1.0",
"hyper 1.6.0",
"hyper-util",
"metrics",
"pageserver_page_api",
"priority-queue",
"rand 0.8.5",
"thiserror 1.0.69",
"tokio",
@@ -5175,6 +5183,17 @@ dependencies = [
"elliptic-curve 0.13.8",
]
[[package]]
name = "priority-queue"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef08705fa1589a1a59aa924ad77d14722cb0cd97b67dd5004ed5f4a4873fce8d"
dependencies = [
"autocfg",
"equivalent",
"indexmap 2.9.0",
]
[[package]]
name = "proc-macro2"
version = "1.0.94"

View File

@@ -258,6 +258,7 @@ endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
http-utils = { version = "0.1", path = "./libs/http-utils/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
neonart = { version = "0.1", path = "./libs/neonart/" }
neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }

View File

@@ -6,8 +6,13 @@ license.workspace = true
[dependencies]
thiserror.workspace = true
nix.workspace=true
nix.workspace = true
spin.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[dev-dependencies]
rand = "0.9.1"
rand_distr = "0.5.1"
[target.'cfg(target_os = "macos")'.dependencies]
tempfile = "3.14.0"

265
libs/neon-shmem/src/hash.rs Normal file
View File

@@ -0,0 +1,265 @@
//! Hash table implementation on top of 'shmem'
//!
//! Features required in the long run by the communicator project:
//!
//! [X] Accessible from both Postgres processes and rust threads in the communicator process
//! [X] Low latency
//! [ ] Scalable to lots of concurrent accesses (currently uses a single spinlock)
//! [ ] Resizable
use std::cmp::Eq;
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::Deref;
use crate::shmem::ShmemHandle;
use spin;
mod core;
#[cfg(test)]
mod tests;
use core::CoreHashMap;
/// Fixed-length key type
pub trait Key: Clone + Debug + Hash + Eq {
const KEY_LEN: usize;
fn as_bytes(&self) -> &[u8];
}
/// Values stored in the hash table
pub trait Value {}
pub enum UpdateAction<V> {
Nothing,
Insert(V),
Remove,
}
#[derive(Debug)]
pub struct OutOfMemoryError();
pub struct HashMapInit<'a, K, V>
where
K: Key,
V: Value,
{
shmem: ShmemHandle,
shared_ptr: *mut HashMapShared<'a, K, V>,
}
pub struct HashMapAccess<'a, K: Key, V: Value> {
_shmem: ShmemHandle,
shared_ptr: *mut HashMapShared<'a, K, V>,
}
unsafe impl<'a, K: Key + Sync, V: Value + Sync> Sync for HashMapAccess<'a, K, V> {}
unsafe impl<'a, K: Key + Send, V: Value + Send> Send for HashMapAccess<'a, K, V> {}
impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> {
pub fn attach_writer(self) -> HashMapAccess<'a, K, V> {
HashMapAccess {
_shmem: self.shmem,
shared_ptr: self.shared_ptr,
}
}
pub fn attach_reader(self) -> HashMapAccess<'a, K, V> {
// no difference to attach_writer currently
self.attach_writer()
}
}
// This is stored in the shared memory area
struct HashMapShared<'a, K, V>
where
K: Key,
V: Value,
{
inner: spin::RwLock<CoreHashMap<'a, K, V>>,
}
impl<'a, K: Key, V: Value> HashMapInit<'a, K, V> {
/// Initialize a new hash map in the given shared memory area
pub fn init_in_shmem(mut shmem: ShmemHandle, size: usize) -> HashMapInit<'a, K, V> {
shmem
.set_size(size)
.expect("could not resize shared memory area");
// carve out HashMapShared from the struct. This does not include the hashmap's dictionary
// and buckets.
let mut ptr: *mut u8 = unsafe { shmem.data_ptr.as_mut() };
ptr = unsafe { ptr.add(ptr.align_offset(align_of::<HashMapShared<K, V>>())) };
let shared_ptr: *mut HashMapShared<K, V> = ptr.cast();
ptr = unsafe { ptr.add(size_of::<HashMapShared<K, V>>()) };
// the rest of the space is given to the hash map's dictionary and buckets
let remaining_area = unsafe {
std::slice::from_raw_parts_mut(
ptr,
size - ptr.offset_from(shmem.data_ptr.as_mut()) as usize,
)
};
let hashmap = CoreHashMap::new(remaining_area);
unsafe {
std::ptr::write(
shared_ptr,
HashMapShared {
inner: spin::RwLock::new(hashmap),
},
);
}
HashMapInit { shmem, shared_ptr }
}
}
impl<'a, K: Key, V: Value> HashMapAccess<'a, K, V> {
pub fn get<'e>(&'e self, key: &K) -> Option<ValueReadGuard<'e, K, V>> {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
let lock_guard = map.inner.read();
match lock_guard.get(key) {
None => None,
Some(val_ref) => {
let val_ptr = std::ptr::from_ref(val_ref);
Some(ValueReadGuard {
_lock_guard: lock_guard,
value: val_ptr,
})
}
}
}
/// Insert a value
pub fn insert(&self, key: &K, value: V) -> Result<bool, OutOfMemoryError> {
let mut success = None;
self.update_with_fn(key, |existing| {
if let Some(_) = existing {
success = Some(false);
UpdateAction::Nothing
} else {
success = Some(true);
UpdateAction::Insert(value)
}
})?;
Ok(success.expect("value_fn not called"))
}
/// Remove value. Returns true if it existed
pub fn remove(&self, key: &K) -> bool {
let mut result = false;
self.update_with_fn(key, |existing| match existing {
Some(_) => {
result = true;
UpdateAction::Remove
}
None => UpdateAction::Nothing,
})
.expect("out of memory while removing");
result
}
/// Update key using the given function. All the other modifying operations are based on this.
pub fn update_with_fn<F>(&self, key: &K, value_fn: F) -> Result<(), OutOfMemoryError>
where
F: FnOnce(Option<&V>) -> UpdateAction<V>,
{
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
let mut lock_guard = map.inner.write();
let old_val = lock_guard.get(key);
let action = value_fn(old_val);
match (old_val, action) {
(_, UpdateAction::Nothing) => {}
(_, UpdateAction::Insert(new_val)) => {
let _ = lock_guard.insert(key, new_val);
}
(None, UpdateAction::Remove) => panic!("Remove action with no old value"),
(Some(_), UpdateAction::Remove) => {
let _ = lock_guard.remove(key);
}
}
Ok(())
}
/// Update key using the given function. All the other modifying operations are based on this.
pub fn update_with_fn_at_bucket<F>(
&self,
pos: usize,
value_fn: F,
) -> Result<(), OutOfMemoryError>
where
F: FnOnce(Option<&V>) -> UpdateAction<V>,
{
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
let mut lock_guard = map.inner.write();
let old_val = lock_guard.get_bucket(pos);
let action = value_fn(old_val.map(|(_k, v)| v));
match (old_val, action) {
(_, UpdateAction::Nothing) => {}
(_, UpdateAction::Insert(_new_val)) => panic!("cannot insert without key"),
(None, UpdateAction::Remove) => panic!("Remove action with no old value"),
(Some((key, _value)), UpdateAction::Remove) => {
let key = key.clone();
let _ = lock_guard.remove(&key);
}
}
Ok(())
}
pub fn get_num_buckets(&self) -> usize {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
map.inner.read().get_num_buckets()
}
/// Return the key and value stored in bucket with given index. This can be used to
/// iterate through the hash map. (An Iterator might be nicer. The communicator's
/// clock algorithm needs to _slowly_ iterate through all buckets with its clock hand,
/// without holding a lock. If we switch to an Iterator, it must not hold the lock.)
pub fn get_bucket<'e>(&'e self, pos: usize) -> Option<ValueReadGuard<'e, K, V>> {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
let lock_guard = map.inner.read();
match lock_guard.get_bucket(pos) {
None => None,
Some((_key, val_ref)) => {
let val_ptr = std::ptr::from_ref(val_ref);
Some(ValueReadGuard {
_lock_guard: lock_guard,
value: val_ptr,
})
}
}
}
// for metrics
pub fn get_num_buckets_in_use(&self) -> usize {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
map.inner.read().buckets_in_use as usize
}
}
pub struct ValueReadGuard<'a, K: Key, V: Value> {
_lock_guard: spin::RwLockReadGuard<'a, CoreHashMap<'a, K, V>>,
value: *const V,
}
impl<'a, K: Key, V: Value> Deref for ValueReadGuard<'a, K, V> {
type Target = V;
fn deref(&self) -> &Self::Target {
// SAFETY: The `lock_guard` ensures that the underlying map (and thus the value pointed to
// by `value`) remains valid for the lifetime `'a`. The `value` has been obtained from a
// valid reference within the map.
unsafe { &*self.value }
}
}

View File

@@ -0,0 +1,224 @@
//! Simple hash table with chaining
use std::hash::{DefaultHasher, Hasher};
use std::mem::MaybeUninit;
use crate::hash::Key;
const INVALID_POS: u32 = u32::MAX;
// Bucket
struct Bucket<K: Key, V> {
hash: u64,
next: u32,
inner: Option<(K, V)>,
}
pub(crate) struct CoreHashMap<'a, K: Key, V> {
dictionary: &'a mut [u32],
buckets: &'a mut [Bucket<K, V>],
free_head: u32,
// metrics
pub(crate) buckets_in_use: u32,
}
pub struct FullError();
impl<'a, K: Key, V> CoreHashMap<'a, K, V> {
const FILL_FACTOR: f32 = 0.5;
pub fn new(area: &'a mut [u8]) -> CoreHashMap<'a, K, V> {
let len = area.len();
let mut ptr: *mut u8 = area.as_mut_ptr();
let end_ptr: *mut u8 = unsafe { area.as_mut_ptr().add(len) };
// How much space is left?
let size_remain = unsafe { end_ptr.byte_offset_from(ptr) };
let num_buckets = f32::floor(
size_remain as f32
/ (size_of::<Bucket<K, V>>() as f32
+ size_of::<u32>() as f32 * 1.0 / Self::FILL_FACTOR),
) as usize;
// carve out the buckets
ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<Bucket<K, V>>())) };
let buckets_ptr = ptr;
ptr = unsafe { ptr.add(size_of::<Bucket<K, V>>() * num_buckets) };
// use remaining space for the dictionary
ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<u32>())) };
let dictionary_ptr = ptr;
assert!(ptr.addr() < end_ptr.addr());
let dictionary_size = unsafe { end_ptr.byte_offset_from(ptr) / size_of::<u32>() as isize };
assert!(dictionary_size > 0);
// Initialize the buckets
let buckets = {
let buckets_ptr: *mut MaybeUninit<Bucket<K, V>> = buckets_ptr.cast();
let buckets = unsafe { std::slice::from_raw_parts_mut(buckets_ptr, num_buckets) };
for i in 0..buckets.len() {
buckets[i].write(Bucket {
hash: 0,
next: if i < buckets.len() - 1 {
i as u32 + 1
} else {
INVALID_POS
},
inner: None,
});
}
// TODO: use std::slice::assume_init_mut() once it stabilizes
unsafe { std::slice::from_raw_parts_mut(buckets_ptr.cast(), num_buckets) }
};
// Initialize the dictionary
let dictionary = {
let dictionary_ptr: *mut MaybeUninit<u32> = dictionary_ptr.cast();
let dictionary =
unsafe { std::slice::from_raw_parts_mut(dictionary_ptr, dictionary_size as usize) };
for i in 0..dictionary.len() {
dictionary[i].write(INVALID_POS);
}
// TODO: use std::slice::assume_init_mut() once it stabilizes
unsafe {
std::slice::from_raw_parts_mut(dictionary_ptr.cast(), dictionary_size as usize)
}
};
CoreHashMap {
dictionary,
buckets,
free_head: 0,
buckets_in_use: 0,
}
}
pub fn get(&self, key: &K) -> Option<&V> {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
let hash = hasher.finish();
let mut next = self.dictionary[hash as usize % self.dictionary.len()];
loop {
if next == INVALID_POS {
return None;
}
let bucket = &self.buckets[next as usize];
let (bucket_key, bucket_value) = bucket.inner.as_ref().expect("entry is in use");
if bucket_key == key {
return Some(&bucket_value);
}
next = bucket.next;
}
}
pub fn insert(&mut self, key: &K, value: V) -> Result<(), FullError> {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
let hash = hasher.finish();
let first = self.dictionary[hash as usize % self.dictionary.len()];
if first == INVALID_POS {
// no existing entry
let pos = self.alloc_bucket(key.clone(), value, hash)?;
if pos == INVALID_POS {
return Err(FullError());
}
self.dictionary[hash as usize % self.dictionary.len()] = pos;
return Ok(());
}
let mut next = first;
loop {
let bucket = &mut self.buckets[next as usize];
let (bucket_key, bucket_value) = bucket.inner.as_mut().expect("entry is in use");
if bucket_key == key {
// found existing entry, update its value
*bucket_value = value;
return Ok(());
}
if bucket.next == INVALID_POS {
// No existing entry found. Append to the chain
let pos = self.alloc_bucket(key.clone(), value, hash)?;
if pos == INVALID_POS {
return Err(FullError());
}
self.buckets[next as usize].next = pos;
return Ok(());
}
next = bucket.next;
}
}
pub fn remove(&mut self, key: &K) -> Result<(), FullError> {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
let hash = hasher.finish();
let mut next = self.dictionary[hash as usize % self.dictionary.len()];
let mut prev_pos: u32 = INVALID_POS;
loop {
if next == INVALID_POS {
// no existing entry
return Ok(());
}
let bucket = &mut self.buckets[next as usize];
let (bucket_key, _) = bucket.inner.as_mut().expect("entry is in use");
if bucket_key == key {
// found existing entry, unlink it from the chain
if prev_pos == INVALID_POS {
self.dictionary[hash as usize % self.dictionary.len()] = bucket.next;
} else {
self.buckets[prev_pos as usize].next = bucket.next;
}
// and add it to the freelist
let bucket = &mut self.buckets[next as usize];
bucket.hash = 0;
bucket.inner = None;
bucket.next = self.free_head;
self.free_head = next;
self.buckets_in_use -= 1;
return Ok(());
}
prev_pos = next;
next = bucket.next;
}
}
pub fn get_num_buckets(&self) -> usize {
self.buckets.len()
}
pub fn get_bucket(&self, pos: usize) -> Option<&(K, V)> {
if pos >= self.buckets.len() {
return None;
}
self.buckets[pos].inner.as_ref()
}
fn alloc_bucket(&mut self, key: K, value: V, hash: u64) -> Result<u32, FullError> {
let pos = self.free_head;
if pos == INVALID_POS {
return Err(FullError());
}
let bucket = &mut self.buckets[pos as usize];
self.free_head = bucket.next;
self.buckets_in_use += 1;
bucket.hash = hash;
bucket.next = INVALID_POS;
bucket.inner = Some((key, value));
return Ok(pos);
}
}

View File

@@ -0,0 +1,194 @@
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::hash::HashMapAccess;
use crate::hash::HashMapInit;
use crate::hash::UpdateAction;
use crate::hash::{Key, Value};
use crate::shmem::ShmemHandle;
use rand::Rng;
use rand::seq::SliceRandom;
use rand_distr::Zipf;
const TEST_KEY_LEN: usize = 16;
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct TestKey([u8; TEST_KEY_LEN]);
impl Key for TestKey {
const KEY_LEN: usize = TEST_KEY_LEN;
fn as_bytes(&self) -> &[u8] {
&self.0
}
}
impl From<&TestKey> for u128 {
fn from(val: &TestKey) -> u128 {
u128::from_be_bytes(val.0)
}
}
impl From<u128> for TestKey {
fn from(val: u128) -> TestKey {
TestKey(val.to_be_bytes())
}
}
impl<'a> From<&'a [u8]> for TestKey {
fn from(bytes: &'a [u8]) -> TestKey {
TestKey(bytes.try_into().unwrap())
}
}
impl Value for usize {}
fn test_inserts<K: Into<TestKey> + Copy>(keys: &[K]) {
const MEM_SIZE: usize = 10000000;
let shmem = ShmemHandle::new("test_inserts", 0, MEM_SIZE).unwrap();
let init_struct = HashMapInit::<TestKey, usize>::init_in_shmem(shmem, MEM_SIZE);
let mut w = init_struct.attach_writer();
for (idx, k) in keys.iter().enumerate() {
let res = w.insert(&(*k).into(), idx);
assert!(res.is_ok());
}
for (idx, k) in keys.iter().enumerate() {
let x = w.get(&(*k).into());
let value = x.as_deref().copied();
assert_eq!(value, Some(idx));
}
//eprintln!("stats: {:?}", tree_writer.get_statistics());
}
#[test]
fn dense() {
// This exercises splitting a node with prefix
let keys: &[u128] = &[0, 1, 2, 3, 256];
test_inserts(keys);
// Dense keys
let mut keys: Vec<u128> = (0..10000).collect();
test_inserts(&keys);
// Do the same in random orders
for _ in 1..10 {
keys.shuffle(&mut rand::rng());
test_inserts(&keys);
}
}
#[test]
fn sparse() {
// sparse keys
let mut keys: Vec<TestKey> = Vec::new();
let mut used_keys = HashSet::new();
for _ in 0..10000 {
loop {
let key = rand::random::<u128>();
if used_keys.get(&key).is_some() {
continue;
}
used_keys.insert(key);
keys.push(key.into());
break;
}
}
test_inserts(&keys);
}
struct TestValue(AtomicUsize);
impl TestValue {
fn new(val: usize) -> TestValue {
TestValue(AtomicUsize::new(val))
}
fn load(&self) -> usize {
self.0.load(Ordering::Relaxed)
}
}
impl Value for TestValue {}
impl Clone for TestValue {
fn clone(&self) -> TestValue {
TestValue::new(self.load())
}
}
impl Debug for TestValue {
fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(fmt, "{:?}", self.load())
}
}
#[derive(Clone, Debug)]
struct TestOp(TestKey, Option<usize>);
fn apply_op(
op: &TestOp,
sut: &HashMapAccess<TestKey, TestValue>,
shadow: &mut BTreeMap<TestKey, usize>,
) {
eprintln!("applying op: {op:?}");
// apply the change to the shadow tree first
let shadow_existing = if let Some(v) = op.1 {
shadow.insert(op.0, v)
} else {
shadow.remove(&op.0)
};
// apply to Art tree
sut.update_with_fn(&op.0, |existing| {
assert_eq!(existing.map(TestValue::load), shadow_existing);
match (existing, op.1) {
(None, None) => UpdateAction::Nothing,
(None, Some(new_val)) => UpdateAction::Insert(TestValue::new(new_val)),
(Some(_old_val), None) => UpdateAction::Remove,
(Some(old_val), Some(new_val)) => {
old_val.0.store(new_val, Ordering::Relaxed);
UpdateAction::Nothing
}
}
})
.expect("out of memory");
}
#[test]
fn random_ops() {
const MEM_SIZE: usize = 10000000;
let shmem = ShmemHandle::new("test_inserts", 0, MEM_SIZE).unwrap();
let init_struct = HashMapInit::<TestKey, TestValue>::init_in_shmem(shmem, MEM_SIZE);
let writer = init_struct.attach_writer();
let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new();
let distribution = Zipf::new(u128::MAX as f64, 1.1).unwrap();
let mut rng = rand::rng();
for i in 0..100000 {
let mut key: TestKey = (rng.sample(distribution) as u128).into();
if rng.random_bool(0.10) {
key = TestKey::from(u128::from(&key) | 0xffffffff);
}
let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });
apply_op(&op, &writer, &mut shadow);
if i % 1000 == 0 {
eprintln!("{i} ops processed");
//eprintln!("stats: {:?}", tree_writer.get_statistics());
//test_iter(&tree_writer, &shadow);
}
}
}

View File

@@ -1,418 +1,4 @@
//! Shared memory utilities for neon communicator
use std::num::NonZeroUsize;
use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use nix::errno::Errno;
use nix::sys::mman::MapFlags;
use nix::sys::mman::ProtFlags;
use nix::sys::mman::mmap as nix_mmap;
use nix::sys::mman::munmap as nix_munmap;
use nix::unistd::ftruncate as nix_ftruncate;
/// ShmemHandle represents a shared memory area that can be shared by processes over fork().
/// Unlike shared memory allocated by Postgres, this area is resizable, up to 'max_size' that's
/// specified at creation.
///
/// The area is backed by an anonymous file created with memfd_create(). The full address space for
/// 'max_size' is reserved up-front with mmap(), but whenever you call [`ShmemHandle::set_size`],
/// the underlying file is resized. Do not access the area beyond the current size. Currently, that
/// will cause the file to be expanded, but we might use mprotect() etc. to enforce that in the
/// future.
pub struct ShmemHandle {
/// memfd file descriptor
fd: OwnedFd,
max_size: usize,
// Pointer to the beginning of the shared memory area. The header is stored there.
shared_ptr: NonNull<SharedStruct>,
// Pointer to the beginning of the user data
pub data_ptr: NonNull<u8>,
}
/// This is stored at the beginning in the shared memory area.
struct SharedStruct {
max_size: usize,
/// Current size of the backing file. The high-order bit is used for the RESIZE_IN_PROGRESS flag
current_size: AtomicUsize,
}
const RESIZE_IN_PROGRESS: usize = 1 << 63;
const HEADER_SIZE: usize = std::mem::size_of::<SharedStruct>();
/// Error type returned by the ShmemHandle functions.
#[derive(thiserror::Error, Debug)]
#[error("{msg}: {errno}")]
pub struct Error {
pub msg: String,
pub errno: Errno,
}
impl Error {
fn new(msg: &str, errno: Errno) -> Error {
Error {
msg: msg.to_string(),
errno,
}
}
}
impl ShmemHandle {
/// Create a new shared memory area. To communicate between processes, the processes need to be
/// fork()'d after calling this, so that the ShmemHandle is inherited by all processes.
///
/// If the ShmemHandle is dropped, the memory is unmapped from the current process. Other
/// processes can continue using it, however.
pub fn new(name: &str, initial_size: usize, max_size: usize) -> Result<ShmemHandle, Error> {
// create the backing anonymous file.
let fd = create_backing_file(name)?;
Self::new_with_fd(fd, initial_size, max_size)
}
fn new_with_fd(
fd: OwnedFd,
initial_size: usize,
max_size: usize,
) -> Result<ShmemHandle, Error> {
// We reserve the high-order bit for the RESIZE_IN_PROGRESS flag, and the actual size
// is a little larger than this because of the SharedStruct header. Make the upper limit
// somewhat smaller than that, because with anything close to that, you'll run out of
// memory anyway.
if max_size >= 1 << 48 {
panic!("max size {} too large", max_size);
}
if initial_size > max_size {
panic!("initial size {initial_size} larger than max size {max_size}");
}
// The actual initial / max size is the one given by the caller, plus the size of
// 'SharedStruct'.
let initial_size = HEADER_SIZE + initial_size;
let max_size = NonZeroUsize::new(HEADER_SIZE + max_size).unwrap();
// Reserve address space for it with mmap
//
// TODO: Use MAP_HUGETLB if possible
let start_ptr = unsafe {
nix_mmap(
None,
max_size,
ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
MapFlags::MAP_SHARED,
&fd,
0,
)
}
.map_err(|e| Error::new("mmap failed: {e}", e))?;
// Reserve space for the initial size
enlarge_file(fd.as_fd(), initial_size as u64)?;
// Initialize the header
let shared: NonNull<SharedStruct> = start_ptr.cast();
unsafe {
shared.write(SharedStruct {
max_size: max_size.into(),
current_size: AtomicUsize::new(initial_size),
})
};
// The user data begins after the header
let data_ptr = unsafe { start_ptr.cast().add(HEADER_SIZE) };
Ok(ShmemHandle {
fd,
max_size: max_size.into(),
shared_ptr: shared,
data_ptr,
})
}
// return reference to the header
fn shared(&self) -> &SharedStruct {
unsafe { self.shared_ptr.as_ref() }
}
/// Resize the shared memory area. 'new_size' must not be larger than the 'max_size' specified
/// when creating the area.
///
/// This may only be called from one process/thread concurrently. We detect that case
/// and return an Error.
pub fn set_size(&self, new_size: usize) -> Result<(), Error> {
let new_size = new_size + HEADER_SIZE;
let shared = self.shared();
if new_size > self.max_size {
panic!(
"new size ({} is greater than max size ({})",
new_size, self.max_size
);
}
assert_eq!(self.max_size, shared.max_size);
// Lock the area by setting the bit in 'current_size'
//
// Ordering::Relaxed would probably be sufficient here, as we don't access any other memory
// and the posix_fallocate/ftruncate call is surely a synchronization point anyway. But
// since this is not performance-critical, better safe than sorry .
let mut old_size = shared.current_size.load(Ordering::Acquire);
loop {
if (old_size & RESIZE_IN_PROGRESS) != 0 {
return Err(Error::new(
"concurrent resize detected",
Errno::UnknownErrno,
));
}
match shared.current_size.compare_exchange(
old_size,
new_size,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => old_size = x,
}
}
// Ok, we got the lock.
//
// NB: If anything goes wrong, we *must* clear the bit!
let result = {
use std::cmp::Ordering::{Equal, Greater, Less};
match new_size.cmp(&old_size) {
Less => nix_ftruncate(&self.fd, new_size as i64).map_err(|e| {
Error::new("could not shrink shmem segment, ftruncate failed: {e}", e)
}),
Equal => Ok(()),
Greater => enlarge_file(self.fd.as_fd(), new_size as u64),
}
};
// Unlock
shared.current_size.store(
if result.is_ok() { new_size } else { old_size },
Ordering::Release,
);
result
}
/// Returns the current user-visible size of the shared memory segment.
///
/// NOTE: a concurrent set_size() call can change the size at any time. It is the caller's
/// responsibility not to access the area beyond the current size.
pub fn current_size(&self) -> usize {
let total_current_size =
self.shared().current_size.load(Ordering::Relaxed) & !RESIZE_IN_PROGRESS;
total_current_size - HEADER_SIZE
}
}
impl Drop for ShmemHandle {
fn drop(&mut self) {
// SAFETY: The pointer was obtained from mmap() with the given size.
// We unmap the entire region.
let _ = unsafe { nix_munmap(self.shared_ptr.cast(), self.max_size) };
// The fd is dropped automatically by OwnedFd.
}
}
/// Create a "backing file" for the shared memory area. On Linux, use memfd_create(), to create an
/// anonymous in-memory file. One macos, fall back to a regular file. That's good enough for
/// development and testing, but in production we want the file to stay in memory.
///
/// disable 'unused_variables' warnings, because in the macos path, 'name' is unused.
#[allow(unused_variables)]
fn create_backing_file(name: &str) -> Result<OwnedFd, Error> {
#[cfg(not(target_os = "macos"))]
{
nix::sys::memfd::memfd_create(name, nix::sys::memfd::MFdFlags::empty())
.map_err(|e| Error::new("memfd_create failed: {e}", e))
}
#[cfg(target_os = "macos")]
{
let file = tempfile::tempfile().map_err(|e| {
Error::new(
"could not create temporary file to back shmem area: {e}",
nix::errno::Errno::from_raw(e.raw_os_error().unwrap_or(0)),
)
})?;
Ok(OwnedFd::from(file))
}
}
fn enlarge_file(fd: BorrowedFd, size: u64) -> Result<(), Error> {
// Use posix_fallocate() to enlarge the file. It reserves the space correctly, so that
// we don't get a segfault later when trying to actually use it.
#[cfg(not(target_os = "macos"))]
{
nix::fcntl::posix_fallocate(fd, 0, size as i64).map_err(|e| {
Error::new(
"could not grow shmem segment, posix_fallocate failed: {e}",
e,
)
})
}
// As a fallback on macos, which doesn't have posix_fallocate, use plain 'fallocate'
#[cfg(target_os = "macos")]
{
nix::unistd::ftruncate(fd, size as i64)
.map_err(|e| Error::new("could not grow shmem segment, ftruncate failed: {e}", e))
}
}
#[cfg(test)]
mod tests {
use super::*;
use nix::unistd::ForkResult;
use std::ops::Range;
/// check that all bytes in given range have the expected value.
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
for i in range {
let b = unsafe { *(ptr.add(i)) };
assert_eq!(expected, b, "unexpected byte at offset {}", i);
}
}
/// Write 'b' to all bytes in the given range
fn write_range(ptr: *mut u8, b: u8, range: Range<usize>) {
unsafe { std::ptr::write_bytes(ptr.add(range.start), b, range.end - range.start) };
}
// simple single-process test of growing and shrinking
#[test]
fn test_shmem_resize() -> Result<(), Error> {
let max_size = 1024 * 1024;
let init_struct = ShmemHandle::new("test_shmem_resize", 0, max_size)?;
assert_eq!(init_struct.current_size(), 0);
// Initial grow
let size1 = 10000;
init_struct.set_size(size1).unwrap();
assert_eq!(init_struct.current_size(), size1);
// Write some data
let data_ptr = init_struct.data_ptr.as_ptr();
write_range(data_ptr, 0xAA, 0..size1);
assert_range(data_ptr, 0xAA, 0..size1);
// Shrink
let size2 = 5000;
init_struct.set_size(size2).unwrap();
assert_eq!(init_struct.current_size(), size2);
// Grow again
let size3 = 20000;
init_struct.set_size(size3).unwrap();
assert_eq!(init_struct.current_size(), size3);
// Try to read it. The area that was shrunk and grown again should read as all zeros now
assert_range(data_ptr, 0xAA, 0..5000);
assert_range(data_ptr, 0, 5000..size1);
// Try to grow beyond max_size
//let size4 = max_size + 1;
//assert!(init_struct.set_size(size4).is_err());
// Dropping init_struct should unmap the memory
drop(init_struct);
Ok(())
}
/// This is used in tests to coordinate between test processes. It's like std::sync::Barrier,
/// but is stored in the shared memory area and works across processes. It's implemented by
/// polling, because e.g. standard rust mutexes are not guaranteed to work across processes.
struct SimpleBarrier {
num_procs: usize,
count: AtomicUsize,
}
impl SimpleBarrier {
unsafe fn init(ptr: *mut SimpleBarrier, num_procs: usize) {
unsafe {
*ptr = SimpleBarrier {
num_procs,
count: AtomicUsize::new(0),
}
}
}
pub fn wait(&self) {
let old = self.count.fetch_add(1, Ordering::Relaxed);
let generation = old / self.num_procs;
let mut current = old + 1;
while current < (generation + 1) * self.num_procs {
std::thread::sleep(std::time::Duration::from_millis(10));
current = self.count.load(Ordering::Relaxed);
}
}
}
#[test]
fn test_multi_process() {
// Initialize
let max_size = 1_000_000_000_000;
let init_struct = ShmemHandle::new("test_multi_process", 0, max_size).unwrap();
let ptr = init_struct.data_ptr.as_ptr();
// Store the SimpleBarrier in the first 1k of the area.
init_struct.set_size(10000).unwrap();
let barrier_ptr: *mut SimpleBarrier = unsafe {
ptr.add(ptr.align_offset(std::mem::align_of::<SimpleBarrier>()))
.cast()
};
unsafe { SimpleBarrier::init(barrier_ptr, 2) };
let barrier = unsafe { barrier_ptr.as_ref().unwrap() };
// Fork another test process. The code after this runs in both processes concurrently.
let fork_result = unsafe { nix::unistd::fork().unwrap() };
// In the parent, fill bytes between 1000..2000. In the child, between 2000..3000
if fork_result.is_parent() {
write_range(ptr, 0xAA, 1000..2000);
} else {
write_range(ptr, 0xBB, 2000..3000);
}
barrier.wait();
// Verify the contents. (in both processes)
assert_range(ptr, 0xAA, 1000..2000);
assert_range(ptr, 0xBB, 2000..3000);
// Grow, from the child this time
let size = 10_000_000;
if !fork_result.is_parent() {
init_struct.set_size(size).unwrap();
}
barrier.wait();
// make some writes at the end
if fork_result.is_parent() {
write_range(ptr, 0xAA, (size - 10)..size);
} else {
write_range(ptr, 0xBB, (size - 20)..(size - 10));
}
barrier.wait();
// Verify the contents. (This runs in both processes)
assert_range(ptr, 0, (size - 1000)..(size - 20));
assert_range(ptr, 0xBB, (size - 20)..(size - 10));
assert_range(ptr, 0xAA, (size - 10)..size);
if let ForkResult::Parent { child } = fork_result {
nix::sys::wait::waitpid(child, None).unwrap();
}
}
}
pub mod hash;
pub mod shmem;

View File

@@ -0,0 +1,418 @@
//! Dynamically resizable contiguous chunk of shared memory
use std::num::NonZeroUsize;
use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use nix::errno::Errno;
use nix::sys::mman::MapFlags;
use nix::sys::mman::ProtFlags;
use nix::sys::mman::mmap as nix_mmap;
use nix::sys::mman::munmap as nix_munmap;
use nix::unistd::ftruncate as nix_ftruncate;
/// ShmemHandle represents a shared memory area that can be shared by processes over fork().
/// Unlike shared memory allocated by Postgres, this area is resizable, up to 'max_size' that's
/// specified at creation.
///
/// The area is backed by an anonymous file created with memfd_create(). The full address space for
/// 'max_size' is reserved up-front with mmap(), but whenever you call [`ShmemHandle::set_size`],
/// the underlying file is resized. Do not access the area beyond the current size. Currently, that
/// will cause the file to be expanded, but we might use mprotect() etc. to enforce that in the
/// future.
pub struct ShmemHandle {
/// memfd file descriptor
fd: OwnedFd,
max_size: usize,
// Pointer to the beginning of the shared memory area. The header is stored there.
shared_ptr: NonNull<SharedStruct>,
// Pointer to the beginning of the user data
pub data_ptr: NonNull<u8>,
}
/// This is stored at the beginning in the shared memory area.
struct SharedStruct {
max_size: usize,
/// Current size of the backing file. The high-order bit is used for the RESIZE_IN_PROGRESS flag
current_size: AtomicUsize,
}
const RESIZE_IN_PROGRESS: usize = 1 << 63;
const HEADER_SIZE: usize = std::mem::size_of::<SharedStruct>();
/// Error type returned by the ShmemHandle functions.
#[derive(thiserror::Error, Debug)]
#[error("{msg}: {errno}")]
pub struct Error {
pub msg: String,
pub errno: Errno,
}
impl Error {
fn new(msg: &str, errno: Errno) -> Error {
Error {
msg: msg.to_string(),
errno,
}
}
}
impl ShmemHandle {
/// Create a new shared memory area. To communicate between processes, the processes need to be
/// fork()'d after calling this, so that the ShmemHandle is inherited by all processes.
///
/// If the ShmemHandle is dropped, the memory is unmapped from the current process. Other
/// processes can continue using it, however.
pub fn new(name: &str, initial_size: usize, max_size: usize) -> Result<ShmemHandle, Error> {
// create the backing anonymous file.
let fd = create_backing_file(name)?;
Self::new_with_fd(fd, initial_size, max_size)
}
fn new_with_fd(
fd: OwnedFd,
initial_size: usize,
max_size: usize,
) -> Result<ShmemHandle, Error> {
// We reserve the high-order bit for the RESIZE_IN_PROGRESS flag, and the actual size
// is a little larger than this because of the SharedStruct header. Make the upper limit
// somewhat smaller than that, because with anything close to that, you'll run out of
// memory anyway.
if max_size >= 1 << 48 {
panic!("max size {} too large", max_size);
}
if initial_size > max_size {
panic!("initial size {initial_size} larger than max size {max_size}");
}
// The actual initial / max size is the one given by the caller, plus the size of
// 'SharedStruct'.
let initial_size = HEADER_SIZE + initial_size;
let max_size = NonZeroUsize::new(HEADER_SIZE + max_size).unwrap();
// Reserve address space for it with mmap
//
// TODO: Use MAP_HUGETLB if possible
let start_ptr = unsafe {
nix_mmap(
None,
max_size,
ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
MapFlags::MAP_SHARED,
&fd,
0,
)
}
.map_err(|e| Error::new("mmap failed: {e}", e))?;
// Reserve space for the initial size
enlarge_file(fd.as_fd(), initial_size as u64)?;
// Initialize the header
let shared: NonNull<SharedStruct> = start_ptr.cast();
unsafe {
shared.write(SharedStruct {
max_size: max_size.into(),
current_size: AtomicUsize::new(initial_size),
})
};
// The user data begins after the header
let data_ptr = unsafe { start_ptr.cast().add(HEADER_SIZE) };
Ok(ShmemHandle {
fd,
max_size: max_size.into(),
shared_ptr: shared,
data_ptr,
})
}
// return reference to the header
fn shared(&self) -> &SharedStruct {
unsafe { self.shared_ptr.as_ref() }
}
/// Resize the shared memory area. 'new_size' must not be larger than the 'max_size' specified
/// when creating the area.
///
/// This may only be called from one process/thread concurrently. We detect that case
/// and return an Error.
pub fn set_size(&self, new_size: usize) -> Result<(), Error> {
let new_size = new_size + HEADER_SIZE;
let shared = self.shared();
if new_size > self.max_size {
panic!(
"new size ({} is greater than max size ({})",
new_size, self.max_size
);
}
assert_eq!(self.max_size, shared.max_size);
// Lock the area by setting the bit in 'current_size'
//
// Ordering::Relaxed would probably be sufficient here, as we don't access any other memory
// and the posix_fallocate/ftruncate call is surely a synchronization point anyway. But
// since this is not performance-critical, better safe than sorry .
let mut old_size = shared.current_size.load(Ordering::Acquire);
loop {
if (old_size & RESIZE_IN_PROGRESS) != 0 {
return Err(Error::new(
"concurrent resize detected",
Errno::UnknownErrno,
));
}
match shared.current_size.compare_exchange(
old_size,
new_size,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => old_size = x,
}
}
// Ok, we got the lock.
//
// NB: If anything goes wrong, we *must* clear the bit!
let result = {
use std::cmp::Ordering::{Equal, Greater, Less};
match new_size.cmp(&old_size) {
Less => nix_ftruncate(&self.fd, new_size as i64).map_err(|e| {
Error::new("could not shrink shmem segment, ftruncate failed: {e}", e)
}),
Equal => Ok(()),
Greater => enlarge_file(self.fd.as_fd(), new_size as u64),
}
};
// Unlock
shared.current_size.store(
if result.is_ok() { new_size } else { old_size },
Ordering::Release,
);
result
}
/// Returns the current user-visible size of the shared memory segment.
///
/// NOTE: a concurrent set_size() call can change the size at any time. It is the caller's
/// responsibility not to access the area beyond the current size.
pub fn current_size(&self) -> usize {
let total_current_size =
self.shared().current_size.load(Ordering::Relaxed) & !RESIZE_IN_PROGRESS;
total_current_size - HEADER_SIZE
}
}
impl Drop for ShmemHandle {
fn drop(&mut self) {
// SAFETY: The pointer was obtained from mmap() with the given size.
// We unmap the entire region.
let _ = unsafe { nix_munmap(self.shared_ptr.cast(), self.max_size) };
// The fd is dropped automatically by OwnedFd.
}
}
/// Create a "backing file" for the shared memory area. On Linux, use memfd_create(), to create an
/// anonymous in-memory file. One macos, fall back to a regular file. That's good enough for
/// development and testing, but in production we want the file to stay in memory.
///
/// disable 'unused_variables' warnings, because in the macos path, 'name' is unused.
#[allow(unused_variables)]
fn create_backing_file(name: &str) -> Result<OwnedFd, Error> {
#[cfg(not(target_os = "macos"))]
{
nix::sys::memfd::memfd_create(name, nix::sys::memfd::MFdFlags::empty())
.map_err(|e| Error::new("memfd_create failed: {e}", e))
}
#[cfg(target_os = "macos")]
{
let file = tempfile::tempfile().map_err(|e| {
Error::new(
"could not create temporary file to back shmem area: {e}",
nix::errno::Errno::from_raw(e.raw_os_error().unwrap_or(0)),
)
})?;
Ok(OwnedFd::from(file))
}
}
fn enlarge_file(fd: BorrowedFd, size: u64) -> Result<(), Error> {
// Use posix_fallocate() to enlarge the file. It reserves the space correctly, so that
// we don't get a segfault later when trying to actually use it.
#[cfg(not(target_os = "macos"))]
{
nix::fcntl::posix_fallocate(fd, 0, size as i64).map_err(|e| {
Error::new(
"could not grow shmem segment, posix_fallocate failed: {e}",
e,
)
})
}
// As a fallback on macos, which doesn't have posix_fallocate, use plain 'fallocate'
#[cfg(target_os = "macos")]
{
nix::unistd::ftruncate(fd, size as i64)
.map_err(|e| Error::new("could not grow shmem segment, ftruncate failed: {e}", e))
}
}
#[cfg(test)]
mod tests {
use super::*;
use nix::unistd::ForkResult;
use std::ops::Range;
/// check that all bytes in given range have the expected value.
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
for i in range {
let b = unsafe { *(ptr.add(i)) };
assert_eq!(expected, b, "unexpected byte at offset {}", i);
}
}
/// Write 'b' to all bytes in the given range
fn write_range(ptr: *mut u8, b: u8, range: Range<usize>) {
unsafe { std::ptr::write_bytes(ptr.add(range.start), b, range.end - range.start) };
}
// simple single-process test of growing and shrinking
#[test]
fn test_shmem_resize() -> Result<(), Error> {
let max_size = 1024 * 1024;
let init_struct = ShmemHandle::new("test_shmem_resize", 0, max_size)?;
assert_eq!(init_struct.current_size(), 0);
// Initial grow
let size1 = 10000;
init_struct.set_size(size1).unwrap();
assert_eq!(init_struct.current_size(), size1);
// Write some data
let data_ptr = init_struct.data_ptr.as_ptr();
write_range(data_ptr, 0xAA, 0..size1);
assert_range(data_ptr, 0xAA, 0..size1);
// Shrink
let size2 = 5000;
init_struct.set_size(size2).unwrap();
assert_eq!(init_struct.current_size(), size2);
// Grow again
let size3 = 20000;
init_struct.set_size(size3).unwrap();
assert_eq!(init_struct.current_size(), size3);
// Try to read it. The area that was shrunk and grown again should read as all zeros now
assert_range(data_ptr, 0xAA, 0..5000);
assert_range(data_ptr, 0, 5000..size1);
// Try to grow beyond max_size
//let size4 = max_size + 1;
//assert!(init_struct.set_size(size4).is_err());
// Dropping init_struct should unmap the memory
drop(init_struct);
Ok(())
}
/// This is used in tests to coordinate between test processes. It's like std::sync::Barrier,
/// but is stored in the shared memory area and works across processes. It's implemented by
/// polling, because e.g. standard rust mutexes are not guaranteed to work across processes.
struct SimpleBarrier {
num_procs: usize,
count: AtomicUsize,
}
impl SimpleBarrier {
unsafe fn init(ptr: *mut SimpleBarrier, num_procs: usize) {
unsafe {
*ptr = SimpleBarrier {
num_procs,
count: AtomicUsize::new(0),
}
}
}
pub fn wait(&self) {
let old = self.count.fetch_add(1, Ordering::Relaxed);
let generation = old / self.num_procs;
let mut current = old + 1;
while current < (generation + 1) * self.num_procs {
std::thread::sleep(std::time::Duration::from_millis(10));
current = self.count.load(Ordering::Relaxed);
}
}
}
#[test]
fn test_multi_process() {
// Initialize
let max_size = 1_000_000_000_000;
let init_struct = ShmemHandle::new("test_multi_process", 0, max_size).unwrap();
let ptr = init_struct.data_ptr.as_ptr();
// Store the SimpleBarrier in the first 1k of the area.
init_struct.set_size(10000).unwrap();
let barrier_ptr: *mut SimpleBarrier = unsafe {
ptr.add(ptr.align_offset(std::mem::align_of::<SimpleBarrier>()))
.cast()
};
unsafe { SimpleBarrier::init(barrier_ptr, 2) };
let barrier = unsafe { barrier_ptr.as_ref().unwrap() };
// Fork another test process. The code after this runs in both processes concurrently.
let fork_result = unsafe { nix::unistd::fork().unwrap() };
// In the parent, fill bytes between 1000..2000. In the child, between 2000..3000
if fork_result.is_parent() {
write_range(ptr, 0xAA, 1000..2000);
} else {
write_range(ptr, 0xBB, 2000..3000);
}
barrier.wait();
// Verify the contents. (in both processes)
assert_range(ptr, 0xAA, 1000..2000);
assert_range(ptr, 0xBB, 2000..3000);
// Grow, from the child this time
let size = 10_000_000;
if !fork_result.is_parent() {
init_struct.set_size(size).unwrap();
}
barrier.wait();
// make some writes at the end
if fork_result.is_parent() {
write_range(ptr, 0xAA, (size - 10)..size);
} else {
write_range(ptr, 0xBB, (size - 20)..(size - 10));
}
barrier.wait();
// Verify the contents. (This runs in both processes)
assert_range(ptr, 0, (size - 1000)..(size - 20));
assert_range(ptr, 0xBB, (size - 20)..(size - 10));
assert_range(ptr, 0xAA, (size - 10)..size);
if let ForkResult::Parent { child } = fork_result {
nix::sys::wait::waitpid(child, None).unwrap();
}
}
}

View File

@@ -31,7 +31,7 @@ uring-common = { workspace = true, features = ["bytes"] }
pageserver_client_grpc.workspace = true
pageserver_page_api.workspace = true
neonart.workspace = true
neon-shmem.workspace = true
utils.workspace = true
[build-dependencies]

View File

@@ -98,7 +98,7 @@ pub extern "C" fn bcomm_start_get_page_v_request<'t>(
// Check if the request can be satisfied from the cache first
let mut all_cached = true;
let read_op = bs.integrated_cache.start_read_op();
let mut read_op = bs.integrated_cache.start_read_op();
for i in 0..get_pagev_request.nblocks {
if let Some(cache_block) = read_op.get_page(
&get_pagev_request.reltag(),

View File

@@ -23,8 +23,6 @@ use std::mem;
use std::mem::MaybeUninit;
use std::os::fd::OwnedFd;
use neonart::allocator::r#static::alloc_array_from_slice;
use crate::backend_comms::NeonIOHandle;
use crate::integrated_cache::IntegratedCacheInitStruct;
@@ -133,3 +131,48 @@ pub extern "C" fn rcommunicator_shmem_init(
cis
}
// fixme: currently unused
#[allow(dead_code)]
pub fn alloc_from_slice<T>(
area: &mut [MaybeUninit<u8>],
) -> (&mut MaybeUninit<T>, &mut [MaybeUninit<u8>]) {
let layout = std::alloc::Layout::new::<T>();
let area_start = area.as_mut_ptr();
// pad to satisfy alignment requirements
let padding = area_start.align_offset(layout.align());
if padding + layout.size() > area.len() {
panic!("out of memory");
}
let area = &mut area[padding..];
let (result_area, remain) = area.split_at_mut(layout.size());
let result_ptr: *mut MaybeUninit<T> = result_area.as_mut_ptr().cast();
let result = unsafe { result_ptr.as_mut().unwrap() };
(result, remain)
}
pub fn alloc_array_from_slice<T>(
area: &mut [MaybeUninit<u8>],
len: usize,
) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<u8>]) {
let layout = std::alloc::Layout::new::<T>();
let area_start = area.as_mut_ptr();
// pad to satisfy alignment requirements
let padding = area_start.align_offset(layout.align());
if padding + layout.size() * len > area.len() {
panic!("out of memory");
}
let area = &mut area[padding..];
let (result_area, remain) = area.split_at_mut(layout.size() * len);
let result_ptr: *mut MaybeUninit<T> = result_area.as_mut_ptr().cast();
let result = unsafe { std::slice::from_raw_parts_mut(result_ptr.as_mut().unwrap(), len) };
(result, remain)
}

View File

@@ -18,7 +18,7 @@
// - blocks in the file cache's file. If the file grows too large, need to evict something.
// Also if the cache is resized
//
// - entries in the cache tree. If we run out of memory in the shmem area, need to evict
// - entries in the cache map. If we run out of memory in the shmem area, need to evict
// something
//
@@ -33,90 +33,67 @@ use crate::file_cache::INVALID_CACHE_BLOCK;
use crate::file_cache::{CacheBlock, FileCache};
use pageserver_page_api::model::RelTag;
use metrics::{IntCounter, IntGauge, IntGaugeVec};
use metrics::{IntCounter, IntGauge};
use neonart;
use neonart::TreeInitStruct;
use neonart::TreeIterator;
use neonart::UpdateAction;
use neon_shmem::hash::HashMapInit;
use neon_shmem::hash::UpdateAction;
use neon_shmem::shmem::ShmemHandle;
const CACHE_AREA_SIZE: usize = 10 * 1024 * 1024;
type IntegratedCacheTreeInitStruct<'t> =
TreeInitStruct<'t, TreeKey, TreeEntry, neonart::ArtMultiSlabAllocator<'t, TreeEntry>>;
type IntegratedCacheMapInitStruct<'t> = HashMapInit<'t, MapKey, MapEntry>;
/// This struct is initialized at postmaster startup, and passed to all the processes via fork().
pub struct IntegratedCacheInitStruct<'t> {
allocator: &'t neonart::ArtMultiSlabAllocator<'t, TreeEntry>,
handle: IntegratedCacheTreeInitStruct<'t>,
map_handle: IntegratedCacheMapInitStruct<'t>,
}
/// Represents write-access to the integrated cache. This is used by the communicator process.
pub struct IntegratedCacheWriteAccess<'t> {
cache_tree: neonart::TreeWriteAccess<
't,
TreeKey,
TreeEntry,
neonart::ArtMultiSlabAllocator<'t, TreeEntry>,
>,
cache_map: neon_shmem::hash::HashMapAccess<'t, MapKey, MapEntry>,
global_lw_lsn: AtomicU64,
pub(crate) file_cache: Option<FileCache>,
// Fields for eviction
clock_hand: std::sync::Mutex<TreeIterator<TreeKey>>,
clock_hand: std::sync::Mutex<usize>,
// Metrics
page_evictions_counter: IntCounter,
clock_iterations_counter: IntCounter,
nodes_total: IntGaugeVec,
nodes_leaf_total: IntGauge,
nodes_internal4_total: IntGauge,
nodes_internal16_total: IntGauge,
nodes_internal48_total: IntGauge,
nodes_internal256_total: IntGauge,
nodes_memory_bytes: IntGaugeVec,
nodes_memory_leaf_bytes: IntGauge,
nodes_memory_internal4_bytes: IntGauge,
nodes_memory_internal16_bytes: IntGauge,
nodes_memory_internal48_bytes: IntGauge,
nodes_memory_internal256_bytes: IntGauge,
// metrics from the art tree
cache_memory_size_bytes: IntGauge,
cache_memory_used_bytes: IntGauge,
cache_tree_epoch: IntGauge,
cache_tree_oldest_epoch: IntGauge,
cache_tree_garbage_total: IntGauge,
// metrics from the hash map
cache_map_num_buckets: IntGauge,
cache_map_num_buckets_in_use: IntGauge,
}
/// Represents read-only access to the integrated cache. Backend processes have this.
pub struct IntegratedCacheReadAccess<'t> {
cache_tree: neonart::TreeReadAccess<'t, TreeKey, TreeEntry>,
cache_map: neon_shmem::hash::HashMapAccess<'t, MapKey, MapEntry>,
}
impl<'t> IntegratedCacheInitStruct<'t> {
/// Return the desired size in bytes of the shared memory area to reserve for the integrated
/// cache.
pub fn shmem_size(_max_procs: u32) -> usize {
CACHE_AREA_SIZE
// FIXME: the map uses its own ShmemHandle now. This is just for fixed-size allocations
// in the general Postgres shared memory segment.
0
}
/// Initialize the shared memory segment. This runs once in postmaster. Returns a struct which
/// will be inherited by all processes through fork.
pub fn shmem_init(
_max_procs: u32,
shmem_area: &'t mut [MaybeUninit<u8>],
_shmem_area: &'t mut [MaybeUninit<u8>],
) -> IntegratedCacheInitStruct<'t> {
let allocator = neonart::ArtMultiSlabAllocator::new(shmem_area);
let handle = IntegratedCacheTreeInitStruct::new(allocator);
let shmem_handle = ShmemHandle::new("integrated cache", 0, CACHE_AREA_SIZE).unwrap();
// Initialize the shared memory area
IntegratedCacheInitStruct { allocator, handle }
let map_handle =
neon_shmem::hash::HashMapInit::init_in_shmem(shmem_handle, CACHE_AREA_SIZE);
IntegratedCacheInitStruct { map_handle }
}
pub fn worker_process_init(
@@ -124,42 +101,14 @@ impl<'t> IntegratedCacheInitStruct<'t> {
lsn: Lsn,
file_cache: Option<FileCache>,
) -> IntegratedCacheWriteAccess<'t> {
let IntegratedCacheInitStruct {
allocator: _allocator,
handle,
} = self;
let tree_writer = handle.attach_writer();
let nodes_total = IntGaugeVec::new(
metrics::core::Opts::new("nodes_total", "Number of nodes in cache tree."),
&["node_kind"],
)
.unwrap();
let nodes_leaf_total = nodes_total.with_label_values(&["leaf"]);
let nodes_internal4_total = nodes_total.with_label_values(&["internal4"]);
let nodes_internal16_total = nodes_total.with_label_values(&["internal16"]);
let nodes_internal48_total = nodes_total.with_label_values(&["internal48"]);
let nodes_internal256_total = nodes_total.with_label_values(&["internal256"]);
let nodes_memory_bytes = IntGaugeVec::new(
metrics::core::Opts::new(
"nodes_memory_bytes",
"Memory reserved for nodes in cache tree.",
),
&["node_kind"],
)
.unwrap();
let nodes_memory_leaf_bytes = nodes_memory_bytes.with_label_values(&["leaf"]);
let nodes_memory_internal4_bytes = nodes_memory_bytes.with_label_values(&["internal4"]);
let nodes_memory_internal16_bytes = nodes_memory_bytes.with_label_values(&["internal16"]);
let nodes_memory_internal48_bytes = nodes_memory_bytes.with_label_values(&["internal48"]);
let nodes_memory_internal256_bytes = nodes_memory_bytes.with_label_values(&["internal256"]);
let IntegratedCacheInitStruct { map_handle } = self;
let map_writer = map_handle.attach_writer();
IntegratedCacheWriteAccess {
cache_tree: tree_writer,
cache_map: map_writer,
global_lw_lsn: AtomicU64::new(lsn.0),
file_cache,
clock_hand: std::sync::Mutex::new(TreeIterator::new_wrapping()),
clock_hand: std::sync::Mutex::new(0),
page_evictions_counter: metrics::IntCounter::new(
"integrated_cache_evictions",
@@ -173,64 +122,31 @@ impl<'t> IntegratedCacheInitStruct<'t> {
)
.unwrap(),
nodes_total,
nodes_leaf_total,
nodes_internal4_total,
nodes_internal16_total,
nodes_internal48_total,
nodes_internal256_total,
nodes_memory_bytes,
nodes_memory_leaf_bytes,
nodes_memory_internal4_bytes,
nodes_memory_internal16_bytes,
nodes_memory_internal48_bytes,
nodes_memory_internal256_bytes,
cache_memory_size_bytes: metrics::IntGauge::new(
"cache_memory_size_bytes",
"Memory reserved for cache metadata",
cache_map_num_buckets: metrics::IntGauge::new(
"cache_num_map_buckets",
"Allocated size of the cache hash map",
)
.unwrap(),
cache_memory_used_bytes: metrics::IntGauge::new(
"cache_memory_size_bytes",
"Memory used for cache metadata",
)
.unwrap(),
cache_tree_epoch: metrics::IntGauge::new(
"cache_tree_epoch",
"Current epoch of the cache tree",
)
.unwrap(),
cache_tree_oldest_epoch: metrics::IntGauge::new(
"cache_tree_oldest_epoch",
"Oldest active epoch of the cache tree",
)
.unwrap(),
cache_tree_garbage_total: metrics::IntGauge::new(
"cache_tree_garbage_total",
"Number of obsoleted nodes in cache tree pending GC",
cache_map_num_buckets_in_use: metrics::IntGauge::new(
"cache_num_map_buckets_in_use",
"Number of buckets in use in the cache hash map",
)
.unwrap(),
}
}
pub fn backend_init(self) -> IntegratedCacheReadAccess<'t> {
let IntegratedCacheInitStruct {
allocator: _allocator,
handle,
} = self;
let IntegratedCacheInitStruct { map_handle } = self;
let tree_reader = handle.attach_reader();
let map_reader = map_handle.attach_reader();
IntegratedCacheReadAccess {
cache_tree: tree_reader,
cache_map: map_reader,
}
}
}
enum TreeEntry {
enum MapEntry {
Rel(RelEntry),
Block(BlockEntry),
}
@@ -239,7 +155,7 @@ struct BlockEntry {
lw_lsn: AtomicLsn,
cache_block: AtomicU64,
pinned: AtomicBool,
pinned: AtomicU64,
// 'referenced' bit for the clock algorithm
referenced: AtomicBool,
@@ -251,14 +167,14 @@ struct RelEntry {
nblocks: AtomicU32,
}
impl std::fmt::Debug for TreeEntry {
impl std::fmt::Debug for MapEntry {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
TreeEntry::Rel(e) => fmt
MapEntry::Rel(e) => fmt
.debug_struct("Rel")
.field("nblocks", &e.nblocks.load(Ordering::Relaxed))
.finish(),
TreeEntry::Block(e) => fmt
MapEntry::Block(e) => fmt
.debug_struct("Block")
.field("lw_lsn", &e.lw_lsn.load())
.field("cache_block", &e.cache_block.load(Ordering::Relaxed))
@@ -275,37 +191,42 @@ impl std::fmt::Debug for TreeEntry {
PartialEq,
PartialOrd,
Eq,
Hash,
Ord,
zerocopy_derive::IntoBytes,
zerocopy_derive::Immutable,
zerocopy_derive::FromBytes,
)]
#[repr(packed)]
// Note: the fields are stored in big-endian order, to make the radix tree more
// efficient, and to make scans over ranges of blocks work correctly.
struct TreeKey {
// Note: the fields are stored in big-endian order. If we used the keys in a radix tree, that would
// make pack the tree more tightly, and would make scans over ranges of blocks work correctly,
// i.e. return the entries in block number order. XXX: We currently use a hash map though, so it
// doesn't matter.
struct MapKey {
spc_oid_be: u32,
db_oid_be: u32,
rel_number_be: u32,
fork_number: u8,
block_number_be: u32,
}
impl<'a> From<&'a [u8]> for TreeKey {
impl<'a> From<&'a [u8]> for MapKey {
fn from(bytes: &'a [u8]) -> Self {
Self::read_from_bytes(bytes).expect("invalid key length")
}
}
fn key_range_for_rel_blocks(rel: &RelTag) -> Range<TreeKey> {
// fixme: currently unused
#[allow(dead_code)]
fn key_range_for_rel_blocks(rel: &RelTag) -> Range<MapKey> {
Range {
start: TreeKey::from((rel, 0)),
end: TreeKey::from((rel, u32::MAX)),
start: MapKey::from((rel, 0)),
end: MapKey::from((rel, u32::MAX)),
}
}
impl From<&RelTag> for TreeKey {
fn from(val: &RelTag) -> TreeKey {
TreeKey {
impl From<&RelTag> for MapKey {
fn from(val: &RelTag) -> MapKey {
MapKey {
spc_oid_be: val.spc_oid.to_be(),
db_oid_be: val.db_oid.to_be(),
rel_number_be: val.rel_number.to_be(),
@@ -315,9 +236,9 @@ impl From<&RelTag> for TreeKey {
}
}
impl From<(&RelTag, u32)> for TreeKey {
fn from(val: (&RelTag, u32)) -> TreeKey {
TreeKey {
impl From<(&RelTag, u32)> for MapKey {
fn from(val: (&RelTag, u32)) -> MapKey {
MapKey {
spc_oid_be: val.0.spc_oid.to_be(),
db_oid_be: val.0.db_oid.to_be(),
rel_number_be: val.0.rel_number.to_be(),
@@ -327,7 +248,7 @@ impl From<(&RelTag, u32)> for TreeKey {
}
}
impl neonart::Key for TreeKey {
impl neon_shmem::hash::Key for MapKey {
const KEY_LEN: usize = 4 + 4 + 4 + 1 + 4;
fn as_bytes(&self) -> &[u8] {
@@ -335,7 +256,7 @@ impl neonart::Key for TreeKey {
}
}
impl neonart::Value for TreeEntry {}
impl neon_shmem::hash::Value for MapEntry {}
/// Return type used in the cache's get_*() functions. 'Found' means that the page, or other
/// information that was enqueried, exists in the cache. '
@@ -351,8 +272,7 @@ pub enum CacheResult<V> {
impl<'t> IntegratedCacheWriteAccess<'t> {
pub fn get_rel_size(&'t self, rel: &RelTag) -> CacheResult<u32> {
let r = self.cache_tree.start_read();
if let Some(nblocks) = get_rel_size(&r, rel) {
if let Some(nblocks) = get_rel_size(&self.cache_map, rel) {
CacheResult::Found(nblocks)
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
@@ -366,31 +286,39 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
block_number: u32,
dst: impl uring_common::buf::IoBufMut + Send + Sync,
) -> Result<CacheResult<()>, std::io::Error> {
let r = self.cache_tree.start_read();
if let Some(block_tree_entry) = r.get(&TreeKey::from((rel, block_number))) {
let block_entry = if let TreeEntry::Block(e) = block_tree_entry {
let x = if let Some(entry) =
self.cache_map.get(&MapKey::from((rel, block_number)))
{
let block_entry = if let MapEntry::Block(e) = &*entry {
e
} else {
panic!("unexpected tree entry type for block key");
panic!("unexpected map entry type for block key");
};
block_entry.referenced.store(true, Ordering::Relaxed);
let cache_block = block_entry.cache_block.load(Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
self.file_cache
.as_ref()
.unwrap()
.read_block(cache_block, dst)
.await?;
// pin it and release lock
block_entry.pinned.fetch_add(1, Ordering::Relaxed);
Ok(CacheResult::Found(()))
(cache_block, DeferredUnpin(block_entry.pinned.as_ptr()))
} else {
Ok(CacheResult::NotFound(block_entry.lw_lsn.load()))
return Ok(CacheResult::NotFound(block_entry.lw_lsn.load()));
}
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
Ok(CacheResult::NotFound(lsn))
}
return Ok(CacheResult::NotFound(lsn));
};
let (cache_block, _deferred_pin) = x;
self.file_cache
.as_ref()
.unwrap()
.read_block(cache_block, dst)
.await?;
// unpin the entry (by implicitly dropping deferred_pin)
Ok(CacheResult::Found(()))
}
pub async fn page_is_cached(
@@ -398,12 +326,11 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
rel: &RelTag,
block_number: u32,
) -> Result<CacheResult<()>, std::io::Error> {
let r = self.cache_tree.start_read();
if let Some(block_tree_entry) = r.get(&TreeKey::from((rel, block_number))) {
let block_entry = if let TreeEntry::Block(e) = block_tree_entry {
if let Some(entry) = self.cache_map.get(&MapKey::from((rel, block_number))) {
let block_entry = if let MapEntry::Block(e) = &*entry {
e
} else {
panic!("unexpected tree entry type for block key");
panic!("unexpected map entry type for block key");
};
// This is used for prefetch requests. Treat the probe as an 'access', to keep it
@@ -427,8 +354,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
/// information, i.e. we don't know if the relation exists or not.
pub fn get_rel_exists(&'t self, rel: &RelTag) -> CacheResult<bool> {
// we don't currently cache negative entries, so if the relation is in the cache, it exists
let r = self.cache_tree.start_read();
if let Some(_rel_entry) = r.get(&TreeKey::from(rel)) {
if let Some(_rel_entry) = self.cache_map.get(&MapKey::from(rel)) {
CacheResult::Found(true)
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
@@ -447,21 +373,22 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
}
pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) {
let w = self.cache_tree.start_write();
let result = w.update_with_fn(&TreeKey::from(rel), |existing| match existing {
None => {
tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks");
UpdateAction::Insert(TreeEntry::Rel(RelEntry {
nblocks: AtomicU32::new(nblocks),
}))
}
Some(TreeEntry::Block(_)) => panic!("unexpected tree entry type for rel key"),
Some(TreeEntry::Rel(e)) => {
tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks");
e.nblocks.store(nblocks, Ordering::Relaxed);
UpdateAction::Nothing
}
});
let result = self
.cache_map
.update_with_fn(&MapKey::from(rel), |existing| match existing {
None => {
tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks");
UpdateAction::Insert(MapEntry::Rel(RelEntry {
nblocks: AtomicU32::new(nblocks),
}))
}
Some(MapEntry::Block(_)) => panic!("unexpected map entry type for rel key"),
Some(MapEntry::Rel(e)) => {
tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks");
e.nblocks.store(nblocks, Ordering::Relaxed);
UpdateAction::Nothing
}
});
// FIXME: what to do if we run out of memory? Evict other relation entries? Remove
// block entries first?
@@ -477,7 +404,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
lw_lsn: Lsn,
is_write: bool,
) {
let key = TreeKey::from((rel, block_number));
let key = MapKey::from((rel, block_number));
// FIXME: make this work when file cache is disabled. Or make it mandatory
let file_cache = self.file_cache.as_ref().unwrap();
@@ -488,26 +415,26 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
// regular POSIX filesystem read() and write()
// First check if we have a block in cache already
let w = self.cache_tree.start_write();
let mut old_cache_block = None;
let mut found_existing = false;
let res = w.update_with_fn(&key, |existing| {
let res = self.cache_map.update_with_fn(&key, |existing| {
if let Some(existing) = existing {
let block_entry = if let TreeEntry::Block(e) = existing {
let block_entry = if let MapEntry::Block(e) = existing {
e
} else {
panic!("unexpected tree entry type for block key");
panic!("unexpected map entry type for block key");
};
found_existing = true;
// Prevent this entry from being evicted
let was_pinned = block_entry.pinned.swap(true, Ordering::Relaxed);
if was_pinned {
let pin_count = block_entry.pinned.fetch_add(1, Ordering::Relaxed);
if pin_count > 0 {
// this is unexpected, because the caller has obtained the io-in-progress lock,
// so no one else should try to modify the page at the same time.
// XXX: and I think a read should not be happening either, because the postgres
// buffer is held locked. TODO: check these conditions and tidy this up a little. Seems fragile to just panic.
panic!("block entry was unexpectedly pinned");
}
@@ -547,14 +474,13 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
// FIXME: unpin the block entry on error
// Update the block entry
let w = self.cache_tree.start_write();
let res = w.update_with_fn(&key, |existing| {
let res = self.cache_map.update_with_fn(&key, |existing| {
assert_eq!(found_existing, existing.is_some());
if let Some(existing) = existing {
let block_entry = if let TreeEntry::Block(e) = existing {
let block_entry = if let MapEntry::Block(e) = existing {
e
} else {
panic!("unexpected tree entry type for block key");
panic!("unexpected map entry type for block key");
};
// Update the cache block
@@ -570,14 +496,14 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
block_entry.referenced.store(true, Ordering::Relaxed);
let was_pinned = block_entry.pinned.swap(false, Ordering::Relaxed);
assert!(was_pinned);
let pin_count = block_entry.pinned.fetch_sub(1, Ordering::Relaxed);
assert!(pin_count > 0);
UpdateAction::Nothing
} else {
UpdateAction::Insert(TreeEntry::Block(BlockEntry {
UpdateAction::Insert(MapEntry::Block(BlockEntry {
lw_lsn: AtomicLsn::new(lw_lsn.0),
cache_block: AtomicU64::new(cache_block),
pinned: AtomicBool::new(false),
pinned: AtomicU64::new(0),
referenced: AtomicBool::new(true),
}))
}
@@ -612,17 +538,16 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
.expect("error writing to cache");
// FIXME: handle errors gracefully.
let w = self.cache_tree.start_write();
let res = w.update_with_fn(&key, |existing| {
let res = self.cache_map.update_with_fn(&key, |existing| {
if let Some(existing) = existing {
let block_entry = if let TreeEntry::Block(e) = existing {
let block_entry = if let MapEntry::Block(e) = existing {
e
} else {
panic!("unexpected tree entry type for block key");
panic!("unexpected map entry type for block key");
};
assert!(!block_entry.pinned.load(Ordering::Relaxed));
// FIXME: could there be concurrent readers?
assert!(block_entry.pinned.load(Ordering::Relaxed) == 0);
let old_cache_block = block_entry.cache_block.swap(cache_block, Ordering::Relaxed);
if old_cache_block != INVALID_CACHE_BLOCK {
@@ -630,10 +555,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
}
UpdateAction::Nothing
} else {
UpdateAction::Insert(TreeEntry::Block(BlockEntry {
UpdateAction::Insert(MapEntry::Block(BlockEntry {
lw_lsn: AtomicLsn::new(lw_lsn.0),
cache_block: AtomicU64::new(cache_block),
pinned: AtomicBool::new(false),
pinned: AtomicU64::new(0),
referenced: AtomicBool::new(true),
}))
}
@@ -648,47 +573,50 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
/// Forget information about given relation in the cache. (For DROP TABLE and such)
pub fn forget_rel(&'t self, rel: &RelTag) {
tracing::info!("forgetting rel entry for {rel:?}");
let w = self.cache_tree.start_write();
w.remove(&TreeKey::from(rel));
self.cache_map.remove(&MapKey::from(rel));
// also forget all cached blocks for the relation
let mut iter = TreeIterator::new(&key_range_for_rel_blocks(rel));
let r = self.cache_tree.start_read();
while let Some((k, _v)) = iter.next(&r) {
let w = self.cache_tree.start_write();
// FIXME
/*
let mut iter = MapIterator::new(&key_range_for_rel_blocks(rel));
let r = self.cache_tree.start_read();
while let Some((k, _v)) = iter.next(&r) {
let w = self.cache_tree.start_write();
let mut evicted_cache_block = None;
let mut evicted_cache_block = None;
let res = w.update_with_fn(&k, |e| {
if let Some(e) = e {
let block_entry = if let TreeEntry::Block(e) = e {
e
let res = w.update_with_fn(&k, |e| {
if let Some(e) = e {
let block_entry = if let MapEntry::Block(e) = e {
e
} else {
panic!("unexpected map entry type for block key");
};
let cache_block = block_entry
.cache_block
.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
evicted_cache_block = Some(cache_block);
}
UpdateAction::Remove
} else {
panic!("unexpected tree entry type for block key");
};
let cache_block = block_entry
.cache_block
.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
evicted_cache_block = Some(cache_block);
UpdateAction::Nothing
}
UpdateAction::Remove
} else {
UpdateAction::Nothing
});
// FIXME: It's pretty surprising to run out of memory while removing. But
// maybe it can happen because of trying to shrink a node?
res.expect("out of memory");
if let Some(evicted_cache_block) = evicted_cache_block {
self.file_cache
.as_ref()
.unwrap()
.dealloc_block(evicted_cache_block);
}
});
// FIXME: It's pretty surprising to run out of memory while removing. But
// maybe it can happen because of trying to shrink a node?
res.expect("out of memory");
if let Some(evicted_cache_block) = evicted_cache_block {
self.file_cache
.as_ref()
.unwrap()
.dealloc_block(evicted_cache_block);
}
}
*/
}
// Maintenance routines
@@ -699,147 +627,109 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
pub fn try_evict_one_cache_block(&self) -> Option<CacheBlock> {
let mut clock_hand = self.clock_hand.lock().unwrap();
for _ in 0..100 {
let r = self.cache_tree.start_read();
self.clock_iterations_counter.inc();
match clock_hand.next(&r) {
(*clock_hand) += 1;
let mut evict_this = false;
let num_buckets = self.cache_map.get_num_buckets();
match self
.cache_map
.get_bucket((*clock_hand) % num_buckets)
.as_deref()
{
None => {
// The cache is completely empty. Pretty unexpected that this function
// was called then..
break;
// This bucket was unused
}
Some((_k, TreeEntry::Rel(_))) => {
Some(MapEntry::Rel(_)) => {
// ignore rel entries for now.
// TODO: They stick in the cache forever
}
Some((k, TreeEntry::Block(blk_entry))) => {
Some(MapEntry::Block(blk_entry)) => {
if !blk_entry.referenced.swap(false, Ordering::Relaxed) {
// Evict this. Maybe.
let w = self.cache_tree.start_write();
let mut evicted_cache_block = None;
let res = w.update_with_fn(&k, |old| {
match old {
None => UpdateAction::Nothing,
Some(TreeEntry::Rel(_)) => panic!("unexpected Rel entry"),
Some(TreeEntry::Block(old)) => {
// note: all the accesses to 'pinned' currently happen
// within update_with_fn(), which protects from concurrent
// updates. Otherwise, another thread could set the 'pinned'
// flag just after we have checked it here.
if blk_entry.pinned.load(Ordering::Relaxed) {
return UpdateAction::Nothing;
}
let _ = self
.global_lw_lsn
.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
let cache_block = old
.cache_block
.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
evicted_cache_block = Some(cache_block);
}
// TODO: we don't evict the entry, just the block. Does it make
// sense to keep the entry?
UpdateAction::Nothing
}
}
});
// FIXME: what to do if we run out of memory? Evict other relation entries? Remove
// block entries first? It probably shouldn't happen here, as we're not
// actually updating the tree.
res.expect("out of memory");
if evicted_cache_block.is_some() {
self.page_evictions_counter.inc();
return evicted_cache_block;
}
evict_this = true;
}
}
};
if evict_this {
// grab the write lock
let mut evicted_cache_block = None;
let res =
self.cache_map
.update_with_fn_at_bucket(*clock_hand % num_buckets, |old| {
match old {
None => UpdateAction::Nothing,
Some(MapEntry::Rel(_)) => panic!("unexpected Rel entry"),
Some(MapEntry::Block(old)) => {
// note: all the accesses to 'pinned' currently happen
// within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent
// updates. Otherwise, another thread could set the 'pinned'
// flag just after we have checked it here.
if old.pinned.load(Ordering::Relaxed) != 0 {
return UpdateAction::Nothing;
}
let _ = self
.global_lw_lsn
.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
let cache_block = old
.cache_block
.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
evicted_cache_block = Some(cache_block);
}
// TODO: we don't evict the entry, just the block. Does it make
// sense to keep the entry?
UpdateAction::Nothing
}
}
});
// Out of memory should not happen here, as we're only updating existing values,
// not inserting new entries to the map.
res.expect("out of memory");
if evicted_cache_block.is_some() {
self.page_evictions_counter.inc();
return evicted_cache_block;
}
}
}
// Give up if we didn't find anything
None
}
pub fn dump_tree(&self, dst: &mut dyn std::io::Write) {
self.cache_tree.start_read().dump(dst);
pub fn dump_map(&self, _dst: &mut dyn std::io::Write) {
//FIXME self.cache_map.start_read().dump(dst);
}
}
impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
descs.append(&mut self.nodes_total.desc());
descs.append(&mut self.nodes_memory_bytes.desc());
descs.append(&mut self.page_evictions_counter.desc());
descs.append(&mut self.clock_iterations_counter.desc());
descs.append(&mut self.cache_memory_size_bytes.desc());
descs.append(&mut self.cache_memory_used_bytes.desc());
descs.append(&mut self.cache_tree_epoch.desc());
descs.append(&mut self.cache_tree_oldest_epoch.desc());
descs.append(&mut self.cache_tree_garbage_total.desc());
descs.append(&mut self.cache_map_num_buckets.desc());
descs.append(&mut self.cache_map_num_buckets_in_use.desc());
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
const ALLOC_BLOCK_SIZE: i64 = neonart::allocator::block::BLOCK_SIZE as i64;
// Update gauges
let art_statistics = self.cache_tree.get_statistics();
self.nodes_leaf_total
.set(art_statistics.slabs.num_leaf as i64);
self.nodes_internal4_total
.set(art_statistics.slabs.num_internal4 as i64);
self.nodes_internal16_total
.set(art_statistics.slabs.num_internal16 as i64);
self.nodes_internal48_total
.set(art_statistics.slabs.num_internal48 as i64);
self.nodes_internal256_total
.set(art_statistics.slabs.num_internal256 as i64);
self.nodes_memory_leaf_bytes
.set(art_statistics.slabs.num_blocks_leaf as i64 * ALLOC_BLOCK_SIZE);
self.nodes_memory_internal4_bytes
.set(art_statistics.slabs.num_blocks_internal4 as i64 * ALLOC_BLOCK_SIZE);
self.nodes_memory_internal16_bytes
.set(art_statistics.slabs.num_blocks_internal16 as i64 * ALLOC_BLOCK_SIZE);
self.nodes_memory_internal48_bytes
.set(art_statistics.slabs.num_blocks_internal48 as i64 * ALLOC_BLOCK_SIZE);
self.nodes_memory_internal256_bytes
.set(art_statistics.slabs.num_blocks_internal256 as i64 * ALLOC_BLOCK_SIZE);
let block_statistics = &art_statistics.blocks;
self.cache_memory_size_bytes
.set(block_statistics.num_blocks as i64 * ALLOC_BLOCK_SIZE as i64);
self.cache_memory_used_bytes.set(
(block_statistics.num_initialized as i64 - block_statistics.num_free_blocks as i64)
* ALLOC_BLOCK_SIZE as i64,
);
self.cache_tree_epoch.set(art_statistics.epoch as i64);
self.cache_tree_oldest_epoch
.set(art_statistics.oldest_epoch as i64);
self.cache_tree_garbage_total
.set(art_statistics.num_garbage as i64);
self.cache_map_num_buckets
.set(self.cache_map.get_num_buckets() as i64);
self.cache_map_num_buckets_in_use
.set(self.cache_map.get_num_buckets_in_use() as i64);
let mut values = Vec::new();
values.append(&mut self.nodes_total.collect());
values.append(&mut self.nodes_memory_bytes.collect());
values.append(&mut self.page_evictions_counter.collect());
values.append(&mut self.clock_iterations_counter.collect());
values.append(&mut self.cache_memory_size_bytes.collect());
values.append(&mut self.cache_memory_used_bytes.collect());
values.append(&mut self.cache_tree_epoch.collect());
values.append(&mut self.cache_tree_oldest_epoch.collect());
values.append(&mut self.cache_tree_garbage_total.collect());
values.append(&mut self.cache_map_num_buckets.collect());
values.append(&mut self.cache_map_num_buckets_in_use.collect());
values
}
@@ -849,12 +739,15 @@ impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> {
///
/// This is in a separate function so that it can be shared by
/// IntegratedCacheReadAccess::get_rel_size() and IntegratedCacheWriteAccess::get_rel_size()
fn get_rel_size<'t>(r: &neonart::TreeReadGuard<TreeKey, TreeEntry>, rel: &RelTag) -> Option<u32> {
if let Some(existing) = r.get(&TreeKey::from(rel)) {
let rel_entry = if let TreeEntry::Rel(e) = existing {
fn get_rel_size<'t>(
r: &neon_shmem::hash::HashMapAccess<MapKey, MapEntry>,
rel: &RelTag,
) -> Option<u32> {
if let Some(existing) = r.get(&MapKey::from(rel)) {
let rel_entry = if let MapEntry::Rel(ref e) = *existing {
e
} else {
panic!("unexpected tree entry type for rel key");
panic!("unexpected map entry type for rel key");
};
let nblocks = rel_entry.nblocks.load(Ordering::Relaxed);
@@ -874,17 +767,20 @@ fn get_rel_size<'t>(r: &neonart::TreeReadGuard<TreeKey, TreeEntry>, rel: &RelTag
/// request to the communicator process.
impl<'t> IntegratedCacheReadAccess<'t> {
pub fn get_rel_size(&'t self, rel: &RelTag) -> Option<u32> {
get_rel_size(&self.cache_tree.start_read(), rel)
get_rel_size(&self.cache_map, rel)
}
pub fn start_read_op(&'t self) -> BackendCacheReadOp<'t> {
let r = self.cache_tree.start_read();
BackendCacheReadOp { read_guard: r }
BackendCacheReadOp {
read_guards: Vec::new(),
map_access: self,
}
}
}
pub struct BackendCacheReadOp<'t> {
read_guard: neonart::TreeReadGuard<'t, TreeKey, TreeEntry>,
read_guards: Vec<DeferredUnpin>,
map_access: &'t IntegratedCacheReadAccess<'t>,
}
impl<'e> BackendCacheReadOp<'e> {
@@ -896,17 +792,24 @@ impl<'e> BackendCacheReadOp<'e> {
/// read. It's possible that while you are performing the read, the cache block is invalidated.
/// After you have completed the read, call BackendCacheReadResult::finish() to check if the
/// read was in fact valid or not. If it was concurrently invalidated, you need to retry.
pub fn get_page(&self, rel: &RelTag, block_number: u32) -> Option<u64> {
if let Some(block_tree_entry) = self.read_guard.get(&TreeKey::from((rel, block_number))) {
let block_entry = if let TreeEntry::Block(e) = block_tree_entry {
pub fn get_page(&mut self, rel: &RelTag, block_number: u32) -> Option<u64> {
if let Some(entry) = self
.map_access
.cache_map
.get(&MapKey::from((rel, block_number)))
{
let block_entry = if let MapEntry::Block(ref e) = *entry {
e
} else {
panic!("unexpected tree entry type for block key");
panic!("unexpected map entry type for block key");
};
block_entry.referenced.store(true, Ordering::Relaxed);
let cache_block = block_entry.cache_block.load(Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
block_entry.pinned.fetch_add(1, Ordering::Relaxed);
self.read_guards
.push(DeferredUnpin(block_entry.pinned.as_ptr()));
Some(cache_block)
} else {
None
@@ -917,10 +820,27 @@ impl<'e> BackendCacheReadOp<'e> {
}
pub fn finish(self) -> bool {
// TODO: currently, we use a spinlock to protect the in-memory tree, so concurrent
// invalidations are not possible. But the plan is to switch to optimistic locking,
// and once we do that, this would return 'false' if the optimistic locking failed and
// you need to retry.
// TODO: currently, we hold a pin on the in-memory map, so concurrent invalidations are not
// possible. But if we switch to optimistic locking, this would return 'false' if the
// optimistic locking failed and you need to retry.
true
}
}
/// A hack to decrement an AtomicU64 on drop. This is used to decrement the pin count
/// of a BlockEntry. The safety depends on the fact that the BlockEntry is not evicted
/// or moved while it's pinned.
struct DeferredUnpin(*mut u64);
unsafe impl Sync for DeferredUnpin {}
unsafe impl Send for DeferredUnpin {}
impl Drop for DeferredUnpin {
fn drop(&mut self) {
// unpin it
unsafe {
let pin_ref = AtomicU64::from_ptr(self.0);
pin_ref.fetch_sub(1, Ordering::Relaxed);
}
}
}

View File

@@ -19,7 +19,7 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> {
use axum::routing::get;
let app = Router::new()
.route("/metrics", get(get_metrics))
.route("/dump_cache_tree", get(dump_cache_tree))
.route("/dump_cache_map", get(dump_cache_map))
.with_state(self);
// TODO: make configurable. Or listen on unix domain socket?
@@ -34,11 +34,11 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> {
}
}
async fn dump_cache_tree(
async fn dump_cache_map(
State(state): State<&CommunicatorWorkerProcessStruct<'static>>,
) -> Response {
let mut buf: Vec<u8> = Vec::new();
state.cache.dump_tree(&mut buf);
state.cache.dump_map(&mut buf);
Response::builder()
.status(StatusCode::OK)