Implement growing the hash table. Fix unit tests.

Heikki Linnakangas
2025-05-29 15:54:55 +03:00
parent b3c25418a6
commit f06bb2bbd8
14 changed files with 391 additions and 193 deletions

View File

@@ -32,15 +32,14 @@ pub enum UpdateAction<V> {
#[derive(Debug)]
pub struct OutOfMemoryError();
pub struct HashMapInit<'a, K, V>
{
pub struct HashMapInit<'a, K, V> {
// The hash table can be allocated in a fixed memory area, or in a resizable ShmemHandle.
shmem: Option<ShmemHandle>,
shmem_handle: Option<ShmemHandle>,
shared_ptr: *mut HashMapShared<'a, K, V>,
}
pub struct HashMapAccess<'a, K, V> {
_shmem: Option<ShmemHandle>,
shmem_handle: Option<ShmemHandle>,
shared_ptr: *mut HashMapShared<'a, K, V>,
}
@@ -50,7 +49,7 @@ unsafe impl<'a, K: Send, V: Send> Send for HashMapAccess<'a, K, V> {}
impl<'a, K, V> HashMapInit<'a, K, V> {
pub fn attach_writer(self) -> HashMapAccess<'a, K, V> {
HashMapAccess {
_shmem: self.shmem,
shmem_handle: self.shmem_handle,
shared_ptr: self.shared_ptr,
}
}
@@ -62,20 +61,23 @@ impl<'a, K, V> HashMapInit<'a, K, V> {
}
// This is stored in the shared memory area
struct HashMapShared<'a, K, V>
{
struct HashMapShared<'a, K, V> {
inner: spin::RwLock<CoreHashMap<'a, K, V>>,
}
impl<'a, K, V> HashMapInit<'a, K, V>
where K: Clone + Hash + Eq,
where
K: Clone + Hash + Eq,
{
pub fn estimate_size(num_buckets: u32) -> usize {
// add some margin to cover alignment etc.
CoreHashMap::<K, V>::estimate_size(num_buckets) + size_of::<HashMapShared<K, V>>() + 1000
}
pub fn init_in_fixed_area(num_buckets: u32, area: &'a mut [MaybeUninit<u8>]) -> HashMapInit<'a, K, V> {
pub fn init_in_fixed_area(
num_buckets: u32,
area: &'a mut [MaybeUninit<u8>],
) -> HashMapInit<'a, K, V> {
Self::init_common(num_buckets, None, area.as_mut_ptr().cast(), area.len())
}
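
As noted in the struct comment, the table can be backed either by a caller-provided fixed memory area or by a resizable ShmemHandle. A minimal sketch of the fixed-area path (illustrative only, not part of this commit; it assumes u64 keys and values satisfy the map's trait bounds):

use std::mem::MaybeUninit;

fn fixed_area_example() {
    let num_buckets = 1024;
    // estimate_size() already includes some margin for alignment.
    let size = HashMapInit::<u64, u64>::estimate_size(num_buckets);
    let mut area = vec![MaybeUninit::<u8>::uninit(); size];
    let init = HashMapInit::<u64, u64>::init_in_fixed_area(num_buckets, &mut area);
    // A fixed-area table cannot be resized later; grow() panics without a ShmemHandle.
    let _map = init.attach_writer();
}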
@@ -90,7 +92,12 @@ where K: Clone + Hash + Eq,
Self::init_common(num_buckets, Some(shmem), ptr, size)
}
fn init_common(num_buckets: u32, shmem_handle: Option<ShmemHandle>, area_ptr: *mut u8, area_len: usize) -> HashMapInit<'a, K, V> {
fn init_common(
num_buckets: u32,
shmem_handle: Option<ShmemHandle>,
area_ptr: *mut u8,
area_len: usize,
) -> HashMapInit<'a, K, V> {
// carve out HashMapShared from the area. This does not include the hashmap's dictionary
// and buckets.
let mut ptr: *mut u8 = area_ptr;
@@ -100,10 +107,7 @@ where K: Clone + Hash + Eq,
// the rest of the space is given to the hash map's dictionary and buckets
let remaining_area = unsafe {
std::slice::from_raw_parts_mut(
ptr,
area_len - ptr.offset_from(area_ptr) as usize,
)
std::slice::from_raw_parts_mut(ptr, area_len - ptr.offset_from(area_ptr) as usize)
};
let hashmap = CoreHashMap::new(num_buckets, remaining_area);
@@ -117,15 +121,15 @@ where K: Clone + Hash + Eq,
}
HashMapInit {
shmem: shmem_handle,
shmem_handle: shmem_handle,
shared_ptr,
}
}
}
impl<'a, K, V> HashMapAccess<'a, K, V>
where K: Clone + Hash + Eq,
where
K: Clone + Hash + Eq,
{
pub fn get<'e>(&'e self, key: &K) -> Option<ValueReadGuard<'e, K, V>> {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
@@ -254,6 +258,95 @@ impl<'a, K, V> HashMapAccess<'a, K, V>
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
map.inner.read().buckets_in_use as usize
}
/// Grow the hash table to `num_buckets` buckets.
///
/// 1. Grow the underlying shared memory area.
/// 2. Initialize the new buckets and link them into the free list. Because the dictionary is
///    laid out right after the buckets array, this overwrites the current dictionary.
/// 3. Recalculate the dictionary.
pub fn grow(&self, num_buckets: u32) -> Result<(), crate::shmem::Error> {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
let mut lock_guard = map.inner.write();
let inner = &mut *lock_guard;
let old_num_buckets = inner.buckets.len() as u32;
if num_buckets < old_num_buckets {
panic!("grow called with a smaller number of buckets");
}
if num_buckets == old_num_buckets {
return Ok(());
}
let shmem_handle = self
.shmem_handle
.as_ref()
.expect("grow called on a fixed-size hash table");
let size_bytes = HashMapInit::<K, V>::estimate_size(num_buckets);
shmem_handle.set_size(size_bytes)?;
let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
// Initialize new buckets. The new buckets are linked to the free list. NB: This overwrites
// the dictionary!
let buckets_ptr = inner.buckets.as_mut_ptr();
unsafe {
for i in old_num_buckets..num_buckets {
let bucket_ptr = buckets_ptr.add(i as usize);
bucket_ptr.write(core::Bucket {
hash: 0,
next: if i + 1 < num_buckets {
i as u32 + 1
} else {
inner.free_head
},
inner: None,
});
}
}
// Recalculate the dictionary
let buckets;
let dictionary;
unsafe {
let buckets_end_ptr = buckets_ptr.add(num_buckets as usize);
let dictionary_ptr: *mut u32 = buckets_end_ptr
.byte_add(buckets_end_ptr.align_offset(align_of::<u32>()))
.cast();
let dictionary_size: usize =
end_ptr.byte_offset_from(buckets_end_ptr) as usize / size_of::<u32>();
buckets = std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize);
dictionary = std::slice::from_raw_parts_mut(dictionary_ptr, dictionary_size);
}
for i in 0..dictionary.len() {
dictionary[i] = core::INVALID_POS;
}
for i in 0..old_num_buckets as usize {
if buckets[i].inner.is_none() {
continue;
}
let pos: usize = (buckets[i].hash % dictionary.len() as u64) as usize;
buckets[i].next = dictionary[pos];
dictionary[pos] = i as u32;
}
// Finally, update the CoreHashMap struct
inner.dictionary = dictionary;
inner.buckets = buckets;
inner.free_head = old_num_buckets;
Ok(())
}
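
A minimal usage sketch of grow() (illustrative only; it mirrors how resize_file_cache() later in this commit drives it): grow only when the requested bucket count exceeds the current one, and only on a map initialized with init_in_shmem, since grow() panics on a fixed-area map and on requests for fewer buckets.

// Illustrative helper, not part of this commit.
fn ensure_buckets<K, V>(
    map: &HashMapAccess<'_, K, V>,
    want_buckets: u32,
) -> Result<(), crate::shmem::Error>
where
    K: Clone + std::hash::Hash + Eq,
{
    if map.get_num_buckets() as u32 >= want_buckets {
        // grow() panics if asked for fewer buckets than the map already has.
        return Ok(());
    }
    map.grow(want_buckets)
}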
// TODO: Shrinking is a multi-step process that requires co-operation from the caller:
//
// 1. The caller must first call begin_shrink(). That forbids allocation of higher-numbered
// buckets.
//
// 2. Next, the caller must evict all entries in higher-numbered buckets.
//
// 3. Finally, call finish_shrink(). This recomputes the dictionary and shrinks the underlying
// shmem area.
}
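
The shrink protocol described in the TODO above is not implemented in this commit; purely as an illustration of the intended call sequence, a caller might eventually drive it like this (begin_shrink() and finish_shrink() are hypothetical names taken from the comment):

// Hypothetical caller-side flow; neither begin_shrink() nor finish_shrink() exists yet.
fn shrink_to<K, V>(
    map: &HashMapAccess<'_, K, V>,
    new_num_buckets: u32,
) -> Result<(), crate::shmem::Error>
where
    K: Clone + std::hash::Hash + Eq,
{
    // 1. Forbid allocation of buckets >= new_num_buckets.
    map.begin_shrink(new_num_buckets);

    // 2. The caller evicts every entry still living in a higher-numbered bucket,
    //    e.g. by removing those keys or re-inserting them.

    // 3. Recompute the dictionary and shrink the underlying shmem area.
    map.finish_shrink()
}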
pub struct ValueReadGuard<'a, K, V> {

View File

@@ -1,21 +1,24 @@
//! Simple hash table with chaining
//!
//! # Resizing
//!
use std::hash::{DefaultHasher, Hash, Hasher};
use std::mem::MaybeUninit;
const INVALID_POS: u32 = u32::MAX;
pub(crate) const INVALID_POS: u32 = u32::MAX;
// Bucket
struct Bucket<K, V> {
hash: u64,
next: u32,
inner: Option<(K, V)>,
pub(crate) struct Bucket<K, V> {
pub(crate) hash: u64,
pub(crate) next: u32,
pub(crate) inner: Option<(K, V)>,
}
pub(crate) struct CoreHashMap<'a, K, V> {
dictionary: &'a mut [u32],
buckets: &'a mut [Bucket<K, V>],
free_head: u32,
pub(crate) dictionary: &'a mut [u32],
pub(crate) buckets: &'a mut [Bucket<K, V>],
pub(crate) free_head: u32,
// metrics
pub(crate) buckets_in_use: u32,
@@ -24,20 +27,20 @@ pub(crate) struct CoreHashMap<'a, K, V> {
pub struct FullError();
impl<'a, K, V> CoreHashMap<'a, K, V>
where K: Clone + Hash + Eq,
where
K: Clone + Hash + Eq,
{
const FILL_FACTOR: f32 = 0.60;
pub fn estimate_size(num_buckets: u32) -> usize{
pub fn estimate_size(num_buckets: u32) -> usize {
let mut size = 0;
// buckets
size += size_of::<Bucket<K, V>>() * num_buckets as usize;
// dictionary
size += (f32::ceil(
(size_of::<u32>() * num_buckets as usize) as f32 / Self::FILL_FACTOR)
) as usize;
size += (f32::ceil((size_of::<u32>() * num_buckets as usize) as f32 / Self::FILL_FACTOR))
as usize;
size
}
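
For a rough sense of what the estimate returns, here is an illustrative stand-alone calculation following the same formula (the 48-byte bucket size is an assumption; the real size_of::<Bucket<K, V>>() depends on K and V):

fn main() {
    let num_buckets: usize = 1_000;
    let assumed_bucket_size: usize = 48; // illustration only
    let fill_factor: f32 = 0.60; // matches FILL_FACTOR above

    let buckets_bytes = assumed_bucket_size * num_buckets; // 48_000
    let dictionary_bytes =
        f32::ceil((std::mem::size_of::<u32>() * num_buckets) as f32 / fill_factor) as usize; // 6_667

    // The dictionary therefore holds roughly num_buckets / FILL_FACTOR slots
    // (about 1_666 u32 entries here), which keeps the hash chains short.
    println!("buckets: {buckets_bytes} B, dictionary: {dictionary_bytes} B");
}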
@@ -64,7 +67,8 @@ impl<'a, K, V> CoreHashMap<'a, K, V>
// Initialize the buckets
let buckets = {
let buckets_ptr: *mut MaybeUninit<Bucket<K, V>> = buckets_ptr.cast();
let buckets = unsafe { std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize) };
let buckets =
unsafe { std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize) };
for i in 0..buckets.len() {
buckets[i].write(Bucket {
hash: 0,

View File

@@ -6,11 +6,10 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use crate::hash::HashMapAccess;
use crate::hash::HashMapInit;
use crate::hash::UpdateAction;
use crate::hash::{Key, Value};
use crate::shmem::ShmemHandle;
use rand::Rng;
use rand::seq::SliceRandom;
use rand::{Rng, RngCore};
use rand_distr::Zipf;
const TEST_KEY_LEN: usize = 16;
@@ -18,13 +17,6 @@ const TEST_KEY_LEN: usize = 16;
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct TestKey([u8; TEST_KEY_LEN]);
impl Key for TestKey {
const KEY_LEN: usize = TEST_KEY_LEN;
fn as_bytes(&self) -> &[u8] {
&self.0
}
}
impl From<&TestKey> for u128 {
fn from(val: &TestKey) -> u128 {
u128::from_be_bytes(val.0)
@@ -43,14 +35,12 @@ impl<'a> From<&'a [u8]> for TestKey {
}
}
impl Value for usize {}
fn test_inserts<K: Into<TestKey> + Copy>(keys: &[K]) {
const MEM_SIZE: usize = 10000000;
let shmem = ShmemHandle::new("test_inserts", 0, MEM_SIZE).unwrap();
const MAX_MEM_SIZE: usize = 10000000;
let shmem = ShmemHandle::new("test_inserts", 0, MAX_MEM_SIZE).unwrap();
let init_struct = HashMapInit::<TestKey, usize>::init_in_shmem(shmem, MEM_SIZE);
let mut w = init_struct.attach_writer();
let init_struct = HashMapInit::<TestKey, usize>::init_in_shmem(100000, shmem);
let w = init_struct.attach_writer();
for (idx, k) in keys.iter().enumerate() {
let res = w.insert(&(*k).into(), idx);
@@ -114,8 +104,6 @@ impl TestValue {
}
}
impl Value for TestValue {}
impl Clone for TestValue {
fn clone(&self) -> TestValue {
TestValue::new(self.load())
@@ -164,10 +152,10 @@ fn apply_op(
#[test]
fn random_ops() {
const MEM_SIZE: usize = 10000000;
let shmem = ShmemHandle::new("test_inserts", 0, MEM_SIZE).unwrap();
const MAX_MEM_SIZE: usize = 10000000;
let shmem = ShmemHandle::new("test_inserts", 0, MAX_MEM_SIZE).unwrap();
let init_struct = HashMapInit::<TestKey, TestValue>::init_in_shmem(shmem, MEM_SIZE);
let init_struct = HashMapInit::<TestKey, TestValue>::init_in_shmem(100000, shmem);
let writer = init_struct.attach_writer();
let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new();
@@ -175,11 +163,49 @@ fn random_ops() {
let distribution = Zipf::new(u128::MAX as f64, 1.1).unwrap();
let mut rng = rand::rng();
for i in 0..100000 {
let mut key: TestKey = (rng.sample(distribution) as u128).into();
if rng.random_bool(0.10) {
key = TestKey::from(u128::from(&key) | 0xffffffff);
}
let key: TestKey = (rng.sample(distribution) as u128).into();
let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });
apply_op(&op, &writer, &mut shadow);
if i % 1000 == 0 {
eprintln!("{i} ops processed");
//eprintln!("stats: {:?}", tree_writer.get_statistics());
//test_iter(&tree_writer, &shadow);
}
}
}
#[test]
fn test_grow() {
const MEM_SIZE: usize = 10000000;
let shmem = ShmemHandle::new("test_grow", 0, MEM_SIZE).unwrap();
let init_struct = HashMapInit::<TestKey, TestValue>::init_in_shmem(1000, shmem);
let writer = init_struct.attach_writer();
let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new();
let mut rng = rand::rng();
for i in 0..10000 {
let key: TestKey = ((rng.next_u32() % 1000) as u128).into();
let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });
apply_op(&op, &writer, &mut shadow);
if i % 1000 == 0 {
eprintln!("{i} ops processed");
//eprintln!("stats: {:?}", tree_writer.get_statistics());
//test_iter(&tree_writer, &shadow);
}
}
writer.grow(1500).unwrap();
for i in 0..10000 {
let key: TestKey = ((rng.next_u32() % 1500) as u128).into();
let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });

View File

@@ -1,19 +1,17 @@
use std::{
collections::{HashMap},
sync::{
Arc,
},
time::{Duration, Instant},
collections::HashMap,
io::{self, Error, ErrorKind},
sync::Arc,
time::{Duration, Instant},
};
use priority_queue::PriorityQueue;
use tokio::{
sync::{Mutex, Semaphore, OwnedSemaphorePermit},
time::sleep,
net::TcpStream,
io::{AsyncRead, AsyncWrite, ReadBuf},
net::TcpStream,
sync::{Mutex, OwnedSemaphorePermit, Semaphore},
time::sleep,
};
use tonic::transport::{Channel, Endpoint};
@@ -21,24 +19,19 @@ use uuid;
use std::{
pin::Pin,
task::{Context, Poll}
task::{Context, Poll},
};
use futures::future;
use rand::{
Rng,
rngs::StdRng,
SeedableRng
};
use rand::{Rng, SeedableRng, rngs::StdRng};
use bytes::BytesMut;
use http::Uri;
use hyper_util::rt::TokioIo;
use bytes::BytesMut;
use tower::service_fn;
use tokio_util::sync::CancellationToken;
//
// The "TokioTcp" is flakey TCP network for testing purposes, in order
// to simulate network errors and delays.
@@ -233,7 +226,6 @@ impl ConnectionPool {
hang_rate: f64,
aggregate_metrics: Option<Arc<crate::PageserverClientAggregateMetrics>>,
) -> Arc<Self> {
let shutdown_token = CancellationToken::new();
let pool = Arc::new(Self {
inner: Mutex::new(Inner {
@@ -310,7 +302,10 @@ impl ConnectionPool {
// metric
match self.aggregate_metrics {
Some(ref metrics) => {
metrics.retry_counters.with_label_values(&["connection_swept"]).inc();
metrics
.retry_counters
.with_label_values(&["connection_swept"])
.inc();
}
None => {}
}
@@ -327,21 +322,25 @@ impl ConnectionPool {
}
// If we have a permit already, get a connection out of the heap
async fn get_conn_with_permit(self: Arc<Self>, permit: OwnedSemaphorePermit)
-> Option<PooledClient> {
async fn get_conn_with_permit(
self: Arc<Self>,
permit: OwnedSemaphorePermit,
) -> Option<PooledClient> {
let mut inner = self.inner.lock().await;
// Pop the highest-active-consumers connection. There are no connections
// in the heap that have more than max_consumers active consumers.
if let Some((id, _cons)) = inner.pq.pop() {
let entry = inner.entries.get_mut(&id)
let entry = inner
.entries
.get_mut(&id)
.expect("pq and entries got out of sync");
let mut active_consumers = entry.active_consumers;
entry.active_consumers += 1;
entry.last_used = Instant::now();
let client = PooledClient {
let client = PooledClient {
channel: entry.channel.clone(),
pool: Arc::clone(&self),
id,
@@ -367,7 +366,6 @@ impl ConnectionPool {
}
pub async fn get_client(self: Arc<Self>) -> Result<PooledClient, tonic::Status> {
// The pool is shutting down. Don't accept new connections.
if self.shutdown_token.is_cancelled() {
return Err(tonic::Status::unavailable("Pool is shutting down"));
@@ -395,10 +393,12 @@ impl ConnectionPool {
}
}
Err(_) => {
match self_clone.aggregate_metrics {
Some(ref metrics) => {
metrics.retry_counters.with_label_values(&["sema_acquire_failed"]).inc();
metrics
.retry_counters
.with_label_values(&["sema_acquire_failed"])
.inc();
}
None => {}
}
@@ -490,10 +490,13 @@ impl ConnectionPool {
// Generate a random backoff to add some jitter so that connections
// don't all retry at the same time.
let mut backoff_delay = Duration::from_millis(
rand::thread_rng().gen_range(0..=self.connect_backoff.as_millis() as u64));
rand::thread_rng().gen_range(0..=self.connect_backoff.as_millis() as u64),
);
loop {
if self.shutdown_token.is_cancelled() { return; }
if self.shutdown_token.is_cancelled() {
return;
}
// Back off.
// Loop because failure can occur while we are sleeping, so wait
@@ -504,8 +507,7 @@ impl ConnectionPool {
if let Some(delay) = {
let inner = self.inner.lock().await;
inner.last_connect_failure.and_then(|at| {
(at.elapsed() < backoff_delay)
.then(|| backoff_delay - at.elapsed())
(at.elapsed() < backoff_delay).then(|| backoff_delay - at.elapsed())
})
} {
sleep(delay).await;
@@ -523,7 +525,10 @@ impl ConnectionPool {
//
match self.aggregate_metrics {
Some(ref metrics) => {
metrics.retry_counters.with_label_values(&["connection_attempt"]).inc();
metrics
.retry_counters
.with_label_values(&["connection_attempt"])
.inc();
}
None => {}
}
@@ -543,7 +548,10 @@ impl ConnectionPool {
{
match self.aggregate_metrics {
Some(ref metrics) => {
metrics.retry_counters.with_label_values(&["connection_success"]).inc();
metrics
.retry_counters
.with_label_values(&["connection_success"])
.inc();
}
None => {}
}
@@ -568,7 +576,10 @@ impl ConnectionPool {
Ok(Err(_)) | Err(_) => {
match self.aggregate_metrics {
Some(ref metrics) => {
metrics.retry_counters.with_label_values(&["connect_failed"]).inc();
metrics
.retry_counters
.with_label_values(&["connect_failed"])
.inc();
}
None => {}
}
@@ -576,7 +587,8 @@ impl ConnectionPool {
inner.last_connect_failure = Some(Instant::now());
// Add some jitter so that every connection doesn't retry at once
let jitter = rand::thread_rng().gen_range(0..=backoff_delay.as_millis() as u64);
backoff_delay = Duration::from_millis(backoff_delay.as_millis() as u64 + jitter);
backoff_delay =
Duration::from_millis(backoff_delay.as_millis() as u64 + jitter);
// Do not backoff longer than one minute
if backoff_delay > Duration::from_secs(60) {
@@ -588,7 +600,6 @@ impl ConnectionPool {
}
}
/// Return client to the pool, indicating success or error.
pub async fn return_client(&self, id: uuid::Uuid, success: bool, permit: OwnedSemaphorePermit) {
let mut inner = self.inner.lock().await;
@@ -607,7 +618,10 @@ impl ConnectionPool {
if entry.consecutive_errors == self.error_threshold {
match self.aggregate_metrics {
Some(ref metrics) => {
metrics.retry_counters.with_label_values(&["connection_dropped"]).inc();
metrics
.retry_counters
.with_label_values(&["connection_dropped"])
.inc();
}
None => {}
}
@@ -657,6 +671,8 @@ impl PooledClient {
}
pub async fn finish(self, result: Result<(), tonic::Status>) {
self.pool.return_client(self.id, result.is_ok(), self.permit).await;
self.pool
.return_client(self.id, result.is_ok(), self.permit)
.await;
}
}

View File

@@ -47,14 +47,14 @@ pub struct PageserverClientAggregateMetrics {
}
impl PageserverClientAggregateMetrics {
pub fn new() -> Self {
let request_counters = IntCounterVec::new(
metrics::core::Opts::new(
"backend_requests_total",
"Number of requests from backends.",
),
&["request_kind"],
).unwrap();
)
.unwrap();
let retry_counters = IntCounterVec::new(
metrics::core::Opts::new(
@@ -62,14 +62,15 @@ impl PageserverClientAggregateMetrics {
"Number of retried requests from backends.",
),
&["request_kind"],
).unwrap();
)
.unwrap();
Self {
request_counters,
retry_counters,
}
}
pub fn collect (&self) -> Vec<metrics::proto::MetricFamily> {
pub fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut metrics = Vec::new();
metrics.append(&mut self.request_counters.collect());
metrics.append(&mut self.retry_counters.collect());
@@ -132,7 +133,6 @@ impl PageserverClient {
options: ClientCacheOptions,
metrics: Option<Arc<PageserverClientAggregateMetrics>>,
) -> Self {
Self {
_tenant_id: tenant_id.to_string(),
_timeline_id: timeline_id.to_string(),
@@ -230,7 +230,10 @@ impl PageserverClient {
match self.aggregate_metrics {
Some(ref metrics) => {
metrics.request_counters.with_label_values(&["get_page"]).inc();
metrics
.request_counters
.with_label_values(&["get_page"])
.inc();
}
None => {}
}

View File

@@ -28,7 +28,6 @@ use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
@@ -170,8 +169,9 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> {
main_impl(args, thread_local_stats)
})
}
async fn get_metrics(State(state): State<Arc<pageserver_client_grpc::PageserverClientAggregateMetrics>>) -> Response {
async fn get_metrics(
State(state): State<Arc<pageserver_client_grpc::PageserverClientAggregateMetrics>>,
) -> Response {
let metrics = state.collect();
info!("metrics: {metrics:?}");
@@ -402,7 +402,10 @@ async fn main_impl(
if args.grpc_stream {
client_grpc_stream(args, worker_id, ss, cancel, rps_period, ranges, weights).await
} else if args.grpc {
client_grpc(args, worker_id, new_value, ss, cancel, rps_period, ranges, weights).await
client_grpc(
args, worker_id, new_value, ss, cancel, rps_period, ranges, weights,
)
.await
} else {
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
}
@@ -581,8 +584,6 @@ async fn client_grpc(
let client = Arc::new(client);
shared_state.start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;

View File

@@ -62,22 +62,15 @@ pub struct MyAllocatorCollector {
impl MyAllocatorCollector {
pub fn new() -> MyAllocatorCollector {
MyAllocatorCollector {
allocations: IntGauge::new(
"allocations_total",
"Number of allocations in Rust code",
).unwrap(),
allocations: IntGauge::new("allocations_total", "Number of allocations in Rust code")
.unwrap(),
deallocations: IntGauge::new(
"deallocations_total",
"Number of deallocations in Rust code",
).unwrap(),
allocated: IntGauge::new(
"allocated_total",
"Bytes currently allocated",
).unwrap(),
high: IntGauge::new(
"allocated_high",
"High watermark of allocated bytes",
).unwrap(),
)
.unwrap(),
allocated: IntGauge::new("allocated_total", "Bytes currently allocated").unwrap(),
high: IntGauge::new("allocated_high", "High watermark of allocated bytes").unwrap(),
}
}
}
@@ -98,9 +91,12 @@ impl metrics::core::Collector for MyAllocatorCollector {
let mut values = Vec::new();
// update the gauges
self.allocations.set(GLOBAL.allocations.load(Ordering::Relaxed) as i64);
self.deallocations.set(GLOBAL.allocations.load(Ordering::Relaxed) as i64);
self.allocated.set(GLOBAL.allocated.load(Ordering::Relaxed) as i64);
self.allocations
.set(GLOBAL.allocations.load(Ordering::Relaxed) as i64);
self.deallocations
.set(GLOBAL.allocations.load(Ordering::Relaxed) as i64);
self.allocated
.set(GLOBAL.allocated.load(Ordering::Relaxed) as i64);
self.high.set(GLOBAL.high.load(Ordering::Relaxed) as i64);
values.append(&mut self.allocations.collect());

View File

@@ -83,6 +83,8 @@ pub extern "C" fn rcommunicator_shmem_init(
max_procs: u32,
shmem_area_ptr: *mut MaybeUninit<u8>,
shmem_area_len: u64,
initial_file_cache_size: u64,
max_file_cache_size: u64,
) -> &'static mut CommunicatorInitStruct {
let shmem_area: &'static mut [MaybeUninit<u8>] =
unsafe { std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize) };
@@ -107,8 +109,12 @@ pub extern "C" fn rcommunicator_shmem_init(
};
// Give the rest of the area to the integrated cache
let integrated_cache_init_struct =
IntegratedCacheInitStruct::shmem_init(max_procs, remaining_area);
let integrated_cache_init_struct = IntegratedCacheInitStruct::shmem_init(
max_procs,
remaining_area,
initial_file_cache_size,
max_file_cache_size,
);
let (submission_pipe_read_fd, submission_pipe_write_fd) = unsafe {
use std::os::fd::FromRawFd;

View File

@@ -37,14 +37,6 @@ use neon_shmem::hash::HashMapInit;
use neon_shmem::hash::UpdateAction;
use neon_shmem::shmem::ShmemHandle;
/// in bytes
/// FIXME: calculate some reasonable upper bound
const MAX_BLOCK_MAP_SIZE: usize = 1024*1024*1024;
/// # of entries in the block mapping
/// FIXME: make it resizable.
const BLOCK_MAP_SIZE: u32 = 1000;
// in # of entries
const RELSIZE_CACHE_SIZE: u32 = 64 * 1024;
@@ -84,12 +76,12 @@ pub struct IntegratedCacheReadAccess<'t> {
block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>,
}
impl<'t> IntegratedCacheInitStruct<'t> {
/// Return the desired size in bytes of the fixed-size shared memory area to reserve for the
/// integrated cache.
pub fn shmem_size(_max_procs: u32) -> usize {
// The relsize cache is fixed-size. The block map is allocated in a separate resizable
// area.
HashMapInit::<RelKey, RelEntry>::estimate_size(RELSIZE_CACHE_SIZE)
}
@@ -98,21 +90,30 @@ impl<'t> IntegratedCacheInitStruct<'t> {
pub fn shmem_init(
_max_procs: u32,
shmem_area: &'t mut [MaybeUninit<u8>],
initial_file_cache_size: u64,
max_file_cache_size: u64,
) -> IntegratedCacheInitStruct<'t> {
// Initialize the hash map
// Initialize the relsize cache in the fixed-size area
let relsize_cache_handle =
neon_shmem::hash::HashMapInit::init_in_fixed_area(RELSIZE_CACHE_SIZE, shmem_area);
let shmem_handle = ShmemHandle::new("block mapping", 0, MAX_BLOCK_MAP_SIZE).unwrap();
let max_bytes =
HashMapInit::<BlockKey, BlockEntry>::estimate_size(max_file_cache_size as u32);
let block_map_handle =
neon_shmem::hash::HashMapInit::init_in_shmem(BLOCK_MAP_SIZE, shmem_handle);
// Initialize the block map in a separate resizable shared memory area
let shmem_handle = ShmemHandle::new("block mapping", 0, max_bytes).unwrap();
let block_map_handle = neon_shmem::hash::HashMapInit::init_in_shmem(
initial_file_cache_size as u32,
shmem_handle,
);
IntegratedCacheInitStruct {
relsize_cache_handle,
block_map_handle,
}
}
/// Initialize access to the integrated cache for the communicator worker process
pub fn worker_process_init(
self,
lsn: Lsn,
@@ -165,6 +166,7 @@ impl<'t> IntegratedCacheInitStruct<'t> {
}
}
/// Initialize access to the integrated cache for a backend process
pub fn backend_init(self) -> IntegratedCacheReadAccess<'t> {
let IntegratedCacheInitStruct {
relsize_cache_handle,
@@ -198,16 +200,14 @@ struct RelEntry {
impl std::fmt::Debug for RelEntry {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
fmt
.debug_struct("Rel")
fmt.debug_struct("Rel")
.field("nblocks", &self.nblocks.load(Ordering::Relaxed))
.finish()
}
}
impl std::fmt::Debug for BlockEntry {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
fmt
.debug_struct("Block")
fmt.debug_struct("Block")
.field("lw_lsn", &self.lw_lsn.load())
.field("cache_block", &self.cache_block.load(Ordering::Relaxed))
.field("pinned", &self.pinned.load(Ordering::Relaxed))
@@ -268,8 +268,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
block_number: u32,
dst: impl uring_common::buf::IoBufMut + Send + Sync,
) -> Result<CacheResult<()>, std::io::Error> {
let x = if let Some(block_entry) =
self.block_map.get(&BlockKey::from((rel, block_number)))
let x = if let Some(block_entry) = self.block_map.get(&BlockKey::from((rel, block_number)))
{
block_entry.referenced.store(true, Ordering::Relaxed);
@@ -344,24 +343,23 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
}
pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) {
let result = self
.relsize_cache
.update_with_fn(&RelKey::from(rel), |existing| match existing {
None => {
tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks");
UpdateAction::Insert(RelEntry {
nblocks: AtomicU32::new(nblocks),
})
}
Some(e) => {
tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks");
e.nblocks.store(nblocks, Ordering::Relaxed);
UpdateAction::Nothing
}
});
let result =
self.relsize_cache
.update_with_fn(&RelKey::from(rel), |existing| match existing {
None => {
tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks");
UpdateAction::Insert(RelEntry {
nblocks: AtomicU32::new(nblocks),
})
}
Some(e) => {
tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks");
e.nblocks.store(nblocks, Ordering::Relaxed);
UpdateAction::Nothing
}
});
// FIXME: what to do if we run out of memory? Evict other relation entries? Remove
// block entries first?
// FIXME: what to do if we run out of memory? Evict other relation entries?
result.expect("out of memory");
}
@@ -606,31 +604,31 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
let mut evicted_cache_block = None;
let res =
self.block_map
.update_with_fn_at_bucket(*clock_hand % num_buckets, |old| {
match old {
None => UpdateAction::Nothing,
Some(old) => {
// note: all the accesses to 'pinned' currently happen
// within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent
// updates. Otherwise, another thread could set the 'pinned'
// flag just after we have checked it here.
if old.pinned.load(Ordering::Relaxed) != 0 {
return UpdateAction::Nothing;
}
.update_with_fn_at_bucket(*clock_hand % num_buckets, |old| {
match old {
None => UpdateAction::Nothing,
Some(old) => {
// note: all the accesses to 'pinned' currently happen
// within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent
// updates. Otherwise, another thread could set the 'pinned'
// flag just after we have checked it here.
if old.pinned.load(Ordering::Relaxed) != 0 {
return UpdateAction::Nothing;
}
let _ = self
.global_lw_lsn
.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
let cache_block = old
.cache_block
.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
evicted_cache_block = Some(cache_block);
let _ = self
.global_lw_lsn
.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
let cache_block = old
.cache_block
.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
evicted_cache_block = Some(cache_block);
}
UpdateAction::Remove
}
UpdateAction::Remove
}
}
});
});
// Out of memory should not happen here, as we're only updating existing values,
// not inserting new entries to the map.
@@ -646,6 +644,21 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
None
}
pub fn resize_file_cache(&self, num_blocks: u32) {
let old_num_blocks = self.block_map.get_num_buckets() as u32;
if old_num_blocks < num_blocks {
if let Err(err) = self.block_map.grow(num_blocks) {
tracing::warn!(
"could not grow file cache to {} blocks (old size {}): {}",
num_blocks,
old_num_blocks,
err
);
}
}
}
pub fn dump_map(&self, _dst: &mut dyn std::io::Write) {
//FIXME self.cache_map.start_read().dump(dst);
}

View File

@@ -71,13 +71,13 @@ pub(super) async fn init(
timeline_id: String,
auth_token: Option<String>,
shard_map: HashMap<utils::shard::ShardIndex, String>,
file_cache_size: u64,
initial_file_cache_size: u64,
file_cache_path: Option<PathBuf>,
) -> CommunicatorWorkerProcessStruct<'static> {
let last_lsn = get_request_lsn();
let file_cache = if let Some(path) = file_cache_path {
Some(FileCache::new(&path, file_cache_size).expect("could not create cache file"))
Some(FileCache::new(&path, initial_file_cache_size).expect("could not create cache file"))
} else {
// FIXME: temporarily for testing, use LFC even if disabled
Some(

View File

@@ -8,6 +8,7 @@ use tracing::error;
use crate::init::CommunicatorInitStruct;
use crate::worker_process::main_loop;
use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
/// Launch the communicator's tokio tasks, which do most of the work.
///
@@ -24,8 +25,8 @@ pub extern "C" fn communicator_worker_process_launch(
shard_map: *mut *mut c_char,
nshards: u32,
file_cache_path: *const c_char,
file_cache_size: u64,
) {
initial_file_cache_size: u64,
) -> &'static CommunicatorWorkerProcessStruct<'static> {
// Convert the arguments into more convenient Rust types
let tenant_id = unsafe { CStr::from_ptr(tenant_id) }.to_str().unwrap();
let timeline_id = unsafe { CStr::from_ptr(timeline_id) }.to_str().unwrap();
@@ -53,7 +54,7 @@ pub extern "C" fn communicator_worker_process_launch(
timeline_id.to_string(),
auth_token,
shard_map,
file_cache_size,
initial_file_cache_size,
file_cache_path,
));
let worker_struct = Box::leak(Box::new(worker_struct));
@@ -69,6 +70,8 @@ pub extern "C" fn communicator_worker_process_launch(
// keep the runtime running after we exit this function
Box::leak(Box::new(runtime));
worker_struct
}
/// Convert the "shard map" from an array of C strings, indexed by shard no to a rust HashMap
@@ -98,3 +101,12 @@ fn parse_shard_map(
}
result
}
/// Inform the rust code about a configuration change
#[unsafe(no_mangle)]
pub extern "C" fn communicator_worker_config_reload(
proc_handle: &'static CommunicatorWorkerProcessStruct<'static>,
file_cache_size: u64,
) {
proc_handle.cache.resize_file_cache(file_cache_size as u32);
}

View File

@@ -46,6 +46,7 @@
* here. This code shouldn't be using the C file cache for anything else than
* the GUCs.
*/
extern int lfc_max_size;
extern int lfc_size_limit;
extern char *lfc_path;
@@ -171,6 +172,8 @@ communicator_new_shmem_startup(void)
size_t communicator_size;
size_t shmem_size;
void *shmem_ptr;
uint64 initial_file_cache_size;
uint64 max_file_cache_size;
rc = pipe(pipefd);
if (rc != 0)
@@ -197,8 +200,17 @@ communicator_new_shmem_startup(void)
for (int i = 0; i < MaxProcs; i++)
InitSharedLatch(&communicator_shmem_ptr->backends[i].io_completion_latch);
/* lfc_size_limit is in MBs */
initial_file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
max_file_cache_size = lfc_max_size * (1024 * 1024 / BLCKSZ);
if (initial_file_cache_size < 100)
initial_file_cache_size = 100;
if (max_file_cache_size < 100)
max_file_cache_size = 100;
/* Initialize the rust-managed parts */
cis = rcommunicator_shmem_init(pipefd[0], pipefd[1], MaxProcs, shmem_ptr, shmem_size);
cis = rcommunicator_shmem_init(pipefd[0], pipefd[1], MaxProcs, shmem_ptr, shmem_size,
initial_file_cache_size, max_file_cache_size);
}
/**** Worker process functions. These run in the communicator worker process ****/
@@ -212,7 +224,8 @@ communicator_new_bgworker_main(Datum main_arg)
struct LoggingState *logging;
char errbuf[1000];
int elevel;
uint64 initial_file_cache_size;
uint64 file_cache_size;
const struct CommunicatorWorkerProcessStruct *proc_handle;
/*
* Pretend that this process is a WAL sender. That affects the shutdown
@@ -222,7 +235,9 @@ communicator_new_bgworker_main(Datum main_arg)
MarkPostmasterChildWalSender();
/* lfc_size_limit is in MBs */
initial_file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
if (file_cache_size < 100)
file_cache_size = 100;
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
@@ -240,7 +255,7 @@ communicator_new_bgworker_main(Datum main_arg)
logging = configure_logging();
communicator_worker_process_launch(
proc_handle = communicator_worker_process_launch(
cis,
neon_tenant,
neon_timeline,
@@ -248,7 +263,7 @@ communicator_new_bgworker_main(Datum main_arg)
connstrs,
num_shards,
lfc_path,
initial_file_cache_size);
file_cache_size);
cis = NULL;
elog(LOG, "communicator threads started");
@@ -258,6 +273,18 @@ communicator_new_bgworker_main(Datum main_arg)
CHECK_FOR_INTERRUPTS();
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
/* lfc_size_limit is in MBs */
file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
if (file_cache_size < 100)
file_cache_size = 100;
communicator_worker_config_reload(proc_handle, file_cache_size);
}
for (;;)
{
rc = pump_logging(logging, (uint8 *) errbuf, sizeof(errbuf), &elevel);

View File

@@ -183,7 +183,7 @@ typedef struct FileCacheControl
static HTAB *lfc_hash;
static int lfc_desc = -1;
static LWLockId lfc_lock;
static int lfc_max_size;
int lfc_max_size;
int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;

View File

@@ -26,6 +26,7 @@ typedef struct FileCacheState
/* GUCs */
extern bool lfc_store_prefetch_result;
extern int lfc_max_size;
extern int lfc_size_limit;
extern char *lfc_path;