From f06bb2bbd87bdae1074c545b4641ad661f12fc56 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 29 May 2025 15:54:55 +0300 Subject: [PATCH] Implement growing the hash table. Fix unit tests. --- libs/neon-shmem/src/hash.rs | 129 ++++++++++++++--- libs/neon-shmem/src/hash/core.rs | 32 +++-- libs/neon-shmem/src/hash/tests.rs | 76 ++++++---- pageserver/client_grpc/src/client_cache.rs | 88 +++++++----- pageserver/client_grpc/src/lib.rs | 15 +- .../pagebench/src/cmd/getpage_latest_lsn.rs | 13 +- .../neon/communicator/src/global_allocator.rs | 28 ++-- pgxn/neon/communicator/src/init.rs | 10 +- .../neon/communicator/src/integrated_cache.rs | 131 ++++++++++-------- .../src/worker_process/main_loop.rs | 4 +- .../src/worker_process/worker_interface.rs | 18 ++- pgxn/neon/communicator_new.c | 37 ++++- pgxn/neon/file_cache.c | 2 +- pgxn/neon/file_cache.h | 1 + 14 files changed, 391 insertions(+), 193 deletions(-) diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs index dcb5343b42..2485fb6e79 100644 --- a/libs/neon-shmem/src/hash.rs +++ b/libs/neon-shmem/src/hash.rs @@ -32,15 +32,14 @@ pub enum UpdateAction { #[derive(Debug)] pub struct OutOfMemoryError(); -pub struct HashMapInit<'a, K, V> -{ +pub struct HashMapInit<'a, K, V> { // Hash table can be allocated in a fixed memory area, or in a resizeable ShmemHandle. - shmem: Option, + shmem_handle: Option, shared_ptr: *mut HashMapShared<'a, K, V>, } pub struct HashMapAccess<'a, K, V> { - _shmem: Option, + shmem_handle: Option, shared_ptr: *mut HashMapShared<'a, K, V>, } @@ -50,7 +49,7 @@ unsafe impl<'a, K: Send, V: Send> Send for HashMapAccess<'a, K, V> {} impl<'a, K, V> HashMapInit<'a, K, V> { pub fn attach_writer(self) -> HashMapAccess<'a, K, V> { HashMapAccess { - _shmem: self.shmem, + shmem_handle: self.shmem_handle, shared_ptr: self.shared_ptr, } } @@ -62,20 +61,23 @@ impl<'a, K, V> HashMapInit<'a, K, V> { } // This is stored in the shared memory area -struct HashMapShared<'a, K, V> -{ +struct HashMapShared<'a, K, V> { inner: spin::RwLock>, } impl<'a, K, V> HashMapInit<'a, K, V> -where K: Clone + Hash + Eq, +where + K: Clone + Hash + Eq, { pub fn estimate_size(num_buckets: u32) -> usize { // add some margin to cover alignment etc. CoreHashMap::::estimate_size(num_buckets) + size_of::>() + 1000 } - - pub fn init_in_fixed_area(num_buckets: u32, area: &'a mut [MaybeUninit]) -> HashMapInit<'a, K, V> { + + pub fn init_in_fixed_area( + num_buckets: u32, + area: &'a mut [MaybeUninit], + ) -> HashMapInit<'a, K, V> { Self::init_common(num_buckets, None, area.as_mut_ptr().cast(), area.len()) } @@ -90,7 +92,12 @@ where K: Clone + Hash + Eq, Self::init_common(num_buckets, Some(shmem), ptr, size) } - fn init_common(num_buckets: u32, shmem_handle: Option, area_ptr: *mut u8, area_len: usize) -> HashMapInit<'a, K, V> { + fn init_common( + num_buckets: u32, + shmem_handle: Option, + area_ptr: *mut u8, + area_len: usize, + ) -> HashMapInit<'a, K, V> { // carve out HashMapShared from the area. This does not include the hashmap's dictionary // and buckets. 
let mut ptr: *mut u8 = area_ptr; @@ -100,10 +107,7 @@ where K: Clone + Hash + Eq, // the rest of the space is given to the hash map's dictionary and buckets let remaining_area = unsafe { - std::slice::from_raw_parts_mut( - ptr, - area_len - ptr.offset_from(area_ptr) as usize, - ) + std::slice::from_raw_parts_mut(ptr, area_len - ptr.offset_from(area_ptr) as usize) }; let hashmap = CoreHashMap::new(num_buckets, remaining_area); @@ -117,15 +121,15 @@ where K: Clone + Hash + Eq, } HashMapInit { - shmem: shmem_handle, + shmem_handle: shmem_handle, shared_ptr, } } - } impl<'a, K, V> HashMapAccess<'a, K, V> - where K: Clone + Hash + Eq, +where + K: Clone + Hash + Eq, { pub fn get<'e>(&'e self, key: &K) -> Option> { let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); @@ -254,6 +258,95 @@ impl<'a, K, V> HashMapAccess<'a, K, V> let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); map.inner.read().buckets_in_use as usize } + + /// Grow + /// + /// 1. grow the underlying shared memory area + /// 2. Initialize new buckets. This overwrites the current dictionary + /// 3. Recalculate the dictionary + pub fn grow(&self, num_buckets: u32) -> Result<(), crate::shmem::Error> { + let map = unsafe { self.shared_ptr.as_ref() }.unwrap(); + let mut lock_guard = map.inner.write(); + let inner = &mut *lock_guard; + let old_num_buckets = inner.buckets.len() as u32; + + if num_buckets < old_num_buckets { + panic!("grow called with a smaller number of buckets"); + } + if num_buckets == old_num_buckets { + return Ok(()); + } + let shmem_handle = self + .shmem_handle + .as_ref() + .expect("grow called on a fixed-size hash table"); + + let size_bytes = HashMapInit::::estimate_size(num_buckets); + shmem_handle.set_size(size_bytes)?; + let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) }; + + // Initialize new buckets. The new buckets are linked to the free list. NB: This overwrites + // the dictionary! + let buckets_ptr = inner.buckets.as_mut_ptr(); + unsafe { + for i in old_num_buckets..num_buckets { + let bucket_ptr = buckets_ptr.add(i as usize); + bucket_ptr.write(core::Bucket { + hash: 0, + next: if i < num_buckets { + i as u32 + 1 + } else { + inner.free_head + }, + inner: None, + }); + } + } + + // Recalculate the dictionary + let buckets; + let dictionary; + unsafe { + let buckets_end_ptr = buckets_ptr.add(num_buckets as usize); + let dictionary_ptr: *mut u32 = buckets_end_ptr + .byte_add(buckets_end_ptr.align_offset(align_of::())) + .cast(); + let dictionary_size: usize = + end_ptr.byte_offset_from(buckets_end_ptr) as usize / size_of::(); + + buckets = std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize); + dictionary = std::slice::from_raw_parts_mut(dictionary_ptr, dictionary_size); + } + for i in 0..dictionary.len() { + dictionary[i] = core::INVALID_POS; + } + + for i in 0..old_num_buckets as usize { + if buckets[i].inner.is_none() { + continue; + } + let pos: usize = (buckets[i].hash % dictionary.len() as u64) as usize; + buckets[i].next = dictionary[pos]; + dictionary[pos] = i as u32; + } + + // Finally, update the CoreHashMap struct + inner.dictionary = dictionary; + inner.buckets = buckets; + inner.free_head = old_num_buckets; + + Ok(()) + } + + // TODO: Shrinking is a multi-step process that requires co-operation from the caller + // + // 1. The caller must first call begin_shrink(). That forbids allocation of higher-numbered + // buckets. + // + // 2. Next, the caller must evict all entries in higher-numbered buckets. + // + // 3. 
Finally, call finish_shrink(). This recomputes the dictionary and shrinks the underlying + // shmem area } pub struct ValueReadGuard<'a, K, V> { diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs index 9729596870..8efbd4b36a 100644 --- a/libs/neon-shmem/src/hash/core.rs +++ b/libs/neon-shmem/src/hash/core.rs @@ -1,21 +1,24 @@ //! Simple hash table with chaining +//! +//! # Resizing +//! use std::hash::{DefaultHasher, Hash, Hasher}; use std::mem::MaybeUninit; -const INVALID_POS: u32 = u32::MAX; +pub(crate) const INVALID_POS: u32 = u32::MAX; // Bucket -struct Bucket { - hash: u64, - next: u32, - inner: Option<(K, V)>, +pub(crate) struct Bucket { + pub(crate) hash: u64, + pub(crate) next: u32, + pub(crate) inner: Option<(K, V)>, } pub(crate) struct CoreHashMap<'a, K, V> { - dictionary: &'a mut [u32], - buckets: &'a mut [Bucket], - free_head: u32, + pub(crate) dictionary: &'a mut [u32], + pub(crate) buckets: &'a mut [Bucket], + pub(crate) free_head: u32, // metrics pub(crate) buckets_in_use: u32, @@ -24,20 +27,20 @@ pub(crate) struct CoreHashMap<'a, K, V> { pub struct FullError(); impl<'a, K, V> CoreHashMap<'a, K, V> - where K: Clone + Hash + Eq, +where + K: Clone + Hash + Eq, { const FILL_FACTOR: f32 = 0.60; - pub fn estimate_size(num_buckets: u32) -> usize{ + pub fn estimate_size(num_buckets: u32) -> usize { let mut size = 0; // buckets size += size_of::>() * num_buckets as usize; // dictionary - size += (f32::ceil( - (size_of::() * num_buckets as usize) as f32 / Self::FILL_FACTOR) - ) as usize; + size += (f32::ceil((size_of::() * num_buckets as usize) as f32 / Self::FILL_FACTOR)) + as usize; size } @@ -64,7 +67,8 @@ impl<'a, K, V> CoreHashMap<'a, K, V> // Initialize the buckets let buckets = { let buckets_ptr: *mut MaybeUninit> = buckets_ptr.cast(); - let buckets = unsafe { std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize) }; + let buckets = + unsafe { std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize) }; for i in 0..buckets.len() { buckets[i].write(Bucket { hash: 0, diff --git a/libs/neon-shmem/src/hash/tests.rs b/libs/neon-shmem/src/hash/tests.rs index c212b883a3..073aea5220 100644 --- a/libs/neon-shmem/src/hash/tests.rs +++ b/libs/neon-shmem/src/hash/tests.rs @@ -6,11 +6,10 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use crate::hash::HashMapAccess; use crate::hash::HashMapInit; use crate::hash::UpdateAction; -use crate::hash::{Key, Value}; use crate::shmem::ShmemHandle; -use rand::Rng; use rand::seq::SliceRandom; +use rand::{Rng, RngCore}; use rand_distr::Zipf; const TEST_KEY_LEN: usize = 16; @@ -18,13 +17,6 @@ const TEST_KEY_LEN: usize = 16; #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] struct TestKey([u8; TEST_KEY_LEN]); -impl Key for TestKey { - const KEY_LEN: usize = TEST_KEY_LEN; - fn as_bytes(&self) -> &[u8] { - &self.0 - } -} - impl From<&TestKey> for u128 { fn from(val: &TestKey) -> u128 { u128::from_be_bytes(val.0) @@ -43,14 +35,12 @@ impl<'a> From<&'a [u8]> for TestKey { } } -impl Value for usize {} - fn test_inserts + Copy>(keys: &[K]) { - const MEM_SIZE: usize = 10000000; - let shmem = ShmemHandle::new("test_inserts", 0, MEM_SIZE).unwrap(); + const MAX_MEM_SIZE: usize = 10000000; + let shmem = ShmemHandle::new("test_inserts", 0, MAX_MEM_SIZE).unwrap(); - let init_struct = HashMapInit::::init_in_shmem(shmem, MEM_SIZE); - let mut w = init_struct.attach_writer(); + let init_struct = HashMapInit::::init_in_shmem(100000, shmem); + let w = init_struct.attach_writer(); for (idx, k) in 
keys.iter().enumerate() { let res = w.insert(&(*k).into(), idx); @@ -114,8 +104,6 @@ impl TestValue { } } -impl Value for TestValue {} - impl Clone for TestValue { fn clone(&self) -> TestValue { TestValue::new(self.load()) @@ -164,10 +152,10 @@ fn apply_op( #[test] fn random_ops() { - const MEM_SIZE: usize = 10000000; - let shmem = ShmemHandle::new("test_inserts", 0, MEM_SIZE).unwrap(); + const MAX_MEM_SIZE: usize = 10000000; + let shmem = ShmemHandle::new("test_inserts", 0, MAX_MEM_SIZE).unwrap(); - let init_struct = HashMapInit::::init_in_shmem(shmem, MEM_SIZE); + let init_struct = HashMapInit::::init_in_shmem(100000, shmem); let writer = init_struct.attach_writer(); let mut shadow: std::collections::BTreeMap = BTreeMap::new(); @@ -175,11 +163,49 @@ fn random_ops() { let distribution = Zipf::new(u128::MAX as f64, 1.1).unwrap(); let mut rng = rand::rng(); for i in 0..100000 { - let mut key: TestKey = (rng.sample(distribution) as u128).into(); - - if rng.random_bool(0.10) { - key = TestKey::from(u128::from(&key) | 0xffffffff); - } + let key: TestKey = (rng.sample(distribution) as u128).into(); + + let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None }); + + apply_op(&op, &writer, &mut shadow); + + if i % 1000 == 0 { + eprintln!("{i} ops processed"); + //eprintln!("stats: {:?}", tree_writer.get_statistics()); + //test_iter(&tree_writer, &shadow); + } + } +} + +#[test] +fn test_grow() { + const MEM_SIZE: usize = 10000000; + let shmem = ShmemHandle::new("test_grow", 0, MEM_SIZE).unwrap(); + + let init_struct = HashMapInit::::init_in_shmem(1000, shmem); + let writer = init_struct.attach_writer(); + + let mut shadow: std::collections::BTreeMap = BTreeMap::new(); + + let mut rng = rand::rng(); + for i in 0..10000 { + let key: TestKey = ((rng.next_u32() % 1000) as u128).into(); + + let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None }); + + apply_op(&op, &writer, &mut shadow); + + if i % 1000 == 0 { + eprintln!("{i} ops processed"); + //eprintln!("stats: {:?}", tree_writer.get_statistics()); + //test_iter(&tree_writer, &shadow); + } + } + + writer.grow(1500).unwrap(); + + for i in 0..10000 { + let key: TestKey = ((rng.next_u32() % 1500) as u128).into(); let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None }); diff --git a/pageserver/client_grpc/src/client_cache.rs b/pageserver/client_grpc/src/client_cache.rs index cb496148ee..b58a7119a4 100644 --- a/pageserver/client_grpc/src/client_cache.rs +++ b/pageserver/client_grpc/src/client_cache.rs @@ -1,19 +1,17 @@ use std::{ - collections::{HashMap}, - sync::{ - Arc, - }, - time::{Duration, Instant}, + collections::HashMap, io::{self, Error, ErrorKind}, + sync::Arc, + time::{Duration, Instant}, }; use priority_queue::PriorityQueue; use tokio::{ - sync::{Mutex, Semaphore, OwnedSemaphorePermit}, - time::sleep, - net::TcpStream, io::{AsyncRead, AsyncWrite, ReadBuf}, + net::TcpStream, + sync::{Mutex, OwnedSemaphorePermit, Semaphore}, + time::sleep, }; use tonic::transport::{Channel, Endpoint}; @@ -21,24 +19,19 @@ use uuid; use std::{ pin::Pin, - task::{Context, Poll} + task::{Context, Poll}, }; use futures::future; -use rand::{ - Rng, - rngs::StdRng, - SeedableRng -}; +use rand::{Rng, SeedableRng, rngs::StdRng}; +use bytes::BytesMut; use http::Uri; use hyper_util::rt::TokioIo; -use bytes::BytesMut; use tower::service_fn; use tokio_util::sync::CancellationToken; - // // The "TokioTcp" is flakey TCP network for testing purposes, in order // to simulate network errors and delays. 
@@ -233,7 +226,6 @@ impl ConnectionPool { hang_rate: f64, aggregate_metrics: Option>, ) -> Arc { - let shutdown_token = CancellationToken::new(); let pool = Arc::new(Self { inner: Mutex::new(Inner { @@ -310,7 +302,10 @@ impl ConnectionPool { // metric match self.aggregate_metrics { Some(ref metrics) => { - metrics.retry_counters.with_label_values(&["connection_swept"]).inc(); + metrics + .retry_counters + .with_label_values(&["connection_swept"]) + .inc(); } None => {} } @@ -327,21 +322,25 @@ impl ConnectionPool { } // If we have a permit already, get a connection out of the heap - async fn get_conn_with_permit(self: Arc, permit: OwnedSemaphorePermit) - -> Option { + async fn get_conn_with_permit( + self: Arc, + permit: OwnedSemaphorePermit, + ) -> Option { let mut inner = self.inner.lock().await; // Pop the highest-active-consumers connection. There are no connections // in the heap that have more than max_consumers active consumers. if let Some((id, _cons)) = inner.pq.pop() { - let entry = inner.entries.get_mut(&id) + let entry = inner + .entries + .get_mut(&id) .expect("pq and entries got out of sync"); let mut active_consumers = entry.active_consumers; entry.active_consumers += 1; entry.last_used = Instant::now(); - let client = PooledClient { + let client = PooledClient { channel: entry.channel.clone(), pool: Arc::clone(&self), id, @@ -367,7 +366,6 @@ impl ConnectionPool { } pub async fn get_client(self: Arc) -> Result { - // The pool is shutting down. Don't accept new connections. if self.shutdown_token.is_cancelled() { return Err(tonic::Status::unavailable("Pool is shutting down")); @@ -395,10 +393,12 @@ impl ConnectionPool { } } Err(_) => { - match self_clone.aggregate_metrics { Some(ref metrics) => { - metrics.retry_counters.with_label_values(&["sema_acquire_failed"]).inc(); + metrics + .retry_counters + .with_label_values(&["sema_acquire_failed"]) + .inc(); } None => {} } @@ -490,10 +490,13 @@ impl ConnectionPool { // Generate a random backoff to add some jitter so that connections // don't all retry at the same time. let mut backoff_delay = Duration::from_millis( - rand::thread_rng().gen_range(0..=self.connect_backoff.as_millis() as u64)); + rand::thread_rng().gen_range(0..=self.connect_backoff.as_millis() as u64), + ); loop { - if self.shutdown_token.is_cancelled() { return; } + if self.shutdown_token.is_cancelled() { + return; + } // Back off. 
// Loop because failure can occur while we are sleeping, so wait @@ -504,8 +507,7 @@ impl ConnectionPool { if let Some(delay) = { let inner = self.inner.lock().await; inner.last_connect_failure.and_then(|at| { - (at.elapsed() < backoff_delay) - .then(|| backoff_delay - at.elapsed()) + (at.elapsed() < backoff_delay).then(|| backoff_delay - at.elapsed()) }) } { sleep(delay).await; @@ -523,7 +525,10 @@ impl ConnectionPool { // match self.aggregate_metrics { Some(ref metrics) => { - metrics.retry_counters.with_label_values(&["connection_attempt"]).inc(); + metrics + .retry_counters + .with_label_values(&["connection_attempt"]) + .inc(); } None => {} } @@ -543,7 +548,10 @@ impl ConnectionPool { { match self.aggregate_metrics { Some(ref metrics) => { - metrics.retry_counters.with_label_values(&["connection_success"]).inc(); + metrics + .retry_counters + .with_label_values(&["connection_success"]) + .inc(); } None => {} } @@ -568,7 +576,10 @@ impl ConnectionPool { Ok(Err(_)) | Err(_) => { match self.aggregate_metrics { Some(ref metrics) => { - metrics.retry_counters.with_label_values(&["connect_failed"]).inc(); + metrics + .retry_counters + .with_label_values(&["connect_failed"]) + .inc(); } None => {} } @@ -576,7 +587,8 @@ impl ConnectionPool { inner.last_connect_failure = Some(Instant::now()); // Add some jitter so that every connection doesn't retry at once let jitter = rand::thread_rng().gen_range(0..=backoff_delay.as_millis() as u64); - backoff_delay = Duration::from_millis(backoff_delay.as_millis() as u64 + jitter); + backoff_delay = + Duration::from_millis(backoff_delay.as_millis() as u64 + jitter); // Do not backoff longer than one minute if backoff_delay > Duration::from_secs(60) { @@ -588,7 +600,6 @@ impl ConnectionPool { } } - /// Return client to the pool, indicating success or error. 
pub async fn return_client(&self, id: uuid::Uuid, success: bool, permit: OwnedSemaphorePermit) { let mut inner = self.inner.lock().await; @@ -607,7 +618,10 @@ impl ConnectionPool { if entry.consecutive_errors == self.error_threshold { match self.aggregate_metrics { Some(ref metrics) => { - metrics.retry_counters.with_label_values(&["connection_dropped"]).inc(); + metrics + .retry_counters + .with_label_values(&["connection_dropped"]) + .inc(); } None => {} } @@ -657,6 +671,8 @@ impl PooledClient { } pub async fn finish(self, result: Result<(), tonic::Status>) { - self.pool.return_client(self.id, result.is_ok(), self.permit).await; + self.pool + .return_client(self.id, result.is_ok(), self.permit) + .await; } } diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 4c1a4a5185..d005cddc3f 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -47,14 +47,14 @@ pub struct PageserverClientAggregateMetrics { } impl PageserverClientAggregateMetrics { pub fn new() -> Self { - let request_counters = IntCounterVec::new( metrics::core::Opts::new( "backend_requests_total", "Number of requests from backends.", ), &["request_kind"], - ).unwrap(); + ) + .unwrap(); let retry_counters = IntCounterVec::new( metrics::core::Opts::new( @@ -62,14 +62,15 @@ impl PageserverClientAggregateMetrics { "Number of retried requests from backends.", ), &["request_kind"], - ).unwrap(); + ) + .unwrap(); Self { request_counters, retry_counters, } } - pub fn collect (&self) -> Vec { + pub fn collect(&self) -> Vec { let mut metrics = Vec::new(); metrics.append(&mut self.request_counters.collect()); metrics.append(&mut self.retry_counters.collect()); @@ -132,7 +133,6 @@ impl PageserverClient { options: ClientCacheOptions, metrics: Option>, ) -> Self { - Self { _tenant_id: tenant_id.to_string(), _timeline_id: timeline_id.to_string(), @@ -230,7 +230,10 @@ impl PageserverClient { match self.aggregate_metrics { Some(ref metrics) => { - metrics.request_counters.with_label_values(&["get_page"]).inc(); + metrics + .request_counters + .with_label_values(&["get_page"]) + .inc(); } None => {} } diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index f49ad68b2c..44874f2cf2 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -28,7 +28,6 @@ use axum::body::Body; use axum::extract::State; use axum::response::Response; - use http::StatusCode; use http::header::CONTENT_TYPE; @@ -170,8 +169,9 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> { main_impl(args, thread_local_stats) }) } -async fn get_metrics(State(state): State>) -> Response { - +async fn get_metrics( + State(state): State>, +) -> Response { let metrics = state.collect(); info!("metrics: {metrics:?}"); @@ -402,7 +402,10 @@ async fn main_impl( if args.grpc_stream { client_grpc_stream(args, worker_id, ss, cancel, rps_period, ranges, weights).await } else if args.grpc { - client_grpc(args, worker_id, new_value, ss, cancel, rps_period, ranges, weights).await + client_grpc( + args, worker_id, new_value, ss, cancel, rps_period, ranges, weights, + ) + .await } else { client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await } @@ -581,8 +584,6 @@ async fn client_grpc( let client = Arc::new(client); - - shared_state.start_work_barrier.wait().await; let client_start = Instant::now(); let mut ticks_processed = 0; diff --git 
a/pgxn/neon/communicator/src/global_allocator.rs b/pgxn/neon/communicator/src/global_allocator.rs index 9009b6d464..0c8e88071f 100644 --- a/pgxn/neon/communicator/src/global_allocator.rs +++ b/pgxn/neon/communicator/src/global_allocator.rs @@ -62,22 +62,15 @@ pub struct MyAllocatorCollector { impl MyAllocatorCollector { pub fn new() -> MyAllocatorCollector { MyAllocatorCollector { - allocations: IntGauge::new( - "allocations_total", - "Number of allocations in Rust code", - ).unwrap(), + allocations: IntGauge::new("allocations_total", "Number of allocations in Rust code") + .unwrap(), deallocations: IntGauge::new( "deallocations_total", "Number of deallocations in Rust code", - ).unwrap(), - allocated: IntGauge::new( - "allocated_total", - "Bytes currently allocated", - ).unwrap(), - high: IntGauge::new( - "allocated_high", - "High watermark of allocated bytes", - ).unwrap(), + ) + .unwrap(), + allocated: IntGauge::new("allocated_total", "Bytes currently allocated").unwrap(), + high: IntGauge::new("allocated_high", "High watermark of allocated bytes").unwrap(), } } } @@ -98,9 +91,12 @@ impl metrics::core::Collector for MyAllocatorCollector { let mut values = Vec::new(); // update the gauges - self.allocations.set(GLOBAL.allocations.load(Ordering::Relaxed) as i64); - self.deallocations.set(GLOBAL.allocations.load(Ordering::Relaxed) as i64); - self.allocated.set(GLOBAL.allocated.load(Ordering::Relaxed) as i64); + self.allocations + .set(GLOBAL.allocations.load(Ordering::Relaxed) as i64); + self.deallocations + .set(GLOBAL.allocations.load(Ordering::Relaxed) as i64); + self.allocated + .set(GLOBAL.allocated.load(Ordering::Relaxed) as i64); self.high.set(GLOBAL.high.load(Ordering::Relaxed) as i64); values.append(&mut self.allocations.collect()); diff --git a/pgxn/neon/communicator/src/init.rs b/pgxn/neon/communicator/src/init.rs index c642588840..db926a944c 100644 --- a/pgxn/neon/communicator/src/init.rs +++ b/pgxn/neon/communicator/src/init.rs @@ -83,6 +83,8 @@ pub extern "C" fn rcommunicator_shmem_init( max_procs: u32, shmem_area_ptr: *mut MaybeUninit, shmem_area_len: u64, + initial_file_cache_size: u64, + max_file_cache_size: u64, ) -> &'static mut CommunicatorInitStruct { let shmem_area: &'static mut [MaybeUninit] = unsafe { std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize) }; @@ -107,8 +109,12 @@ pub extern "C" fn rcommunicator_shmem_init( }; // Give the rest of the area to the integrated cache - let integrated_cache_init_struct = - IntegratedCacheInitStruct::shmem_init(max_procs, remaining_area); + let integrated_cache_init_struct = IntegratedCacheInitStruct::shmem_init( + max_procs, + remaining_area, + initial_file_cache_size, + max_file_cache_size, + ); let (submission_pipe_read_fd, submission_pipe_write_fd) = unsafe { use std::os::fd::FromRawFd; diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index 6af5c8110b..78a99390e2 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -37,14 +37,6 @@ use neon_shmem::hash::HashMapInit; use neon_shmem::hash::UpdateAction; use neon_shmem::shmem::ShmemHandle; -/// in bytes -/// FIXME: calculate some reasonable upper bound -const MAX_BLOCK_MAP_SIZE: usize = 1024*1024*1024; - -/// # of entries in the block mapping -/// FIXME: make it resizable. 
-const BLOCK_MAP_SIZE: u32 = 1000; - // in # of entries const RELSIZE_CACHE_SIZE: u32 = 64 * 1024; @@ -84,12 +76,12 @@ pub struct IntegratedCacheReadAccess<'t> { block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>, } - - impl<'t> IntegratedCacheInitStruct<'t> { /// Return the desired size in bytes of the fixed-size shared memory area to reserve for the /// integrated cache. pub fn shmem_size(_max_procs: u32) -> usize { + // The relsize cache is fixed-size. The block map is allocated in a separate resizable + // area. HashMapInit::::estimate_size(RELSIZE_CACHE_SIZE) } @@ -98,21 +90,30 @@ impl<'t> IntegratedCacheInitStruct<'t> { pub fn shmem_init( _max_procs: u32, shmem_area: &'t mut [MaybeUninit], + initial_file_cache_size: u64, + max_file_cache_size: u64, ) -> IntegratedCacheInitStruct<'t> { - // Initialize the hash map + // Initialize the relsize cache in the fixed-size area let relsize_cache_handle = neon_shmem::hash::HashMapInit::init_in_fixed_area(RELSIZE_CACHE_SIZE, shmem_area); - let shmem_handle = ShmemHandle::new("block mapping", 0, MAX_BLOCK_MAP_SIZE).unwrap(); + let max_bytes = + HashMapInit::::estimate_size(max_file_cache_size as u32); - let block_map_handle = - neon_shmem::hash::HashMapInit::init_in_shmem(BLOCK_MAP_SIZE, shmem_handle); + // Initialize the block map in a separate resizable shared memory area + let shmem_handle = ShmemHandle::new("block mapping", 0, max_bytes).unwrap(); + + let block_map_handle = neon_shmem::hash::HashMapInit::init_in_shmem( + initial_file_cache_size as u32, + shmem_handle, + ); IntegratedCacheInitStruct { relsize_cache_handle, block_map_handle, } } + /// Initialize access to the integrated cache for the communicator worker process pub fn worker_process_init( self, lsn: Lsn, @@ -165,6 +166,7 @@ impl<'t> IntegratedCacheInitStruct<'t> { } } + /// Initialize access to the integrated cache for a backend process pub fn backend_init(self) -> IntegratedCacheReadAccess<'t> { let IntegratedCacheInitStruct { relsize_cache_handle, @@ -198,16 +200,14 @@ struct RelEntry { impl std::fmt::Debug for RelEntry { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - fmt - .debug_struct("Rel") + fmt.debug_struct("Rel") .field("nblocks", &self.nblocks.load(Ordering::Relaxed)) .finish() } } impl std::fmt::Debug for BlockEntry { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - fmt - .debug_struct("Block") + fmt.debug_struct("Block") .field("lw_lsn", &self.lw_lsn.load()) .field("cache_block", &self.cache_block.load(Ordering::Relaxed)) .field("pinned", &self.pinned.load(Ordering::Relaxed)) @@ -268,8 +268,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { block_number: u32, dst: impl uring_common::buf::IoBufMut + Send + Sync, ) -> Result, std::io::Error> { - let x = if let Some(block_entry) = - self.block_map.get(&BlockKey::from((rel, block_number))) + let x = if let Some(block_entry) = self.block_map.get(&BlockKey::from((rel, block_number))) { block_entry.referenced.store(true, Ordering::Relaxed); @@ -344,24 +343,23 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) { - let result = self - .relsize_cache - .update_with_fn(&RelKey::from(rel), |existing| match existing { - None => { - tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks"); - UpdateAction::Insert(RelEntry { - nblocks: AtomicU32::new(nblocks), - }) - } - Some(e) => { - tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks"); - 
e.nblocks.store(nblocks, Ordering::Relaxed); - UpdateAction::Nothing - } - }); + let result = + self.relsize_cache + .update_with_fn(&RelKey::from(rel), |existing| match existing { + None => { + tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks"); + UpdateAction::Insert(RelEntry { + nblocks: AtomicU32::new(nblocks), + }) + } + Some(e) => { + tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks"); + e.nblocks.store(nblocks, Ordering::Relaxed); + UpdateAction::Nothing + } + }); - // FIXME: what to do if we run out of memory? Evict other relation entries? Remove - // block entries first? + // FIXME: what to do if we run out of memory? Evict other relation entries? result.expect("out of memory"); } @@ -606,31 +604,31 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let mut evicted_cache_block = None; let res = self.block_map - .update_with_fn_at_bucket(*clock_hand % num_buckets, |old| { - match old { - None => UpdateAction::Nothing, - Some(old) => { - // note: all the accesses to 'pinned' currently happen - // within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent - // updates. Otherwise, another thread could set the 'pinned' - // flag just after we have checked it here. - if old.pinned.load(Ordering::Relaxed) != 0 { - return UpdateAction::Nothing; - } + .update_with_fn_at_bucket(*clock_hand % num_buckets, |old| { + match old { + None => UpdateAction::Nothing, + Some(old) => { + // note: all the accesses to 'pinned' currently happen + // within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent + // updates. Otherwise, another thread could set the 'pinned' + // flag just after we have checked it here. + if old.pinned.load(Ordering::Relaxed) != 0 { + return UpdateAction::Nothing; + } - let _ = self - .global_lw_lsn - .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); - let cache_block = old - .cache_block - .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); - if cache_block != INVALID_CACHE_BLOCK { - evicted_cache_block = Some(cache_block); + let _ = self + .global_lw_lsn + .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); + let cache_block = old + .cache_block + .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); + if cache_block != INVALID_CACHE_BLOCK { + evicted_cache_block = Some(cache_block); + } + UpdateAction::Remove } - UpdateAction::Remove } - } - }); + }); // Out of memory should not happen here, as we're only updating existing values, // not inserting new entries to the map. 
@@ -646,6 +644,21 @@ impl<'t> IntegratedCacheWriteAccess<'t> { None } + pub fn resize_file_cache(&self, num_blocks: u32) { + let old_num_blocks = self.block_map.get_num_buckets() as u32; + + if old_num_blocks < num_blocks { + if let Err(err) = self.block_map.grow(num_blocks) { + tracing::warn!( + "could not grow file cache to {} blocks (old size {}): {}", + num_blocks, + old_num_blocks, + err + ); + } + } + } + pub fn dump_map(&self, _dst: &mut dyn std::io::Write) { //FIXME self.cache_map.start_read().dump(dst); } diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index cb83b7d69c..5ab18c8eb8 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -71,13 +71,13 @@ pub(super) async fn init( timeline_id: String, auth_token: Option, shard_map: HashMap, - file_cache_size: u64, + initial_file_cache_size: u64, file_cache_path: Option, ) -> CommunicatorWorkerProcessStruct<'static> { let last_lsn = get_request_lsn(); let file_cache = if let Some(path) = file_cache_path { - Some(FileCache::new(&path, file_cache_size).expect("could not create cache file")) + Some(FileCache::new(&path, initial_file_cache_size).expect("could not create cache file")) } else { // FIXME: temporarily for testing, use LFC even if disabled Some( diff --git a/pgxn/neon/communicator/src/worker_process/worker_interface.rs b/pgxn/neon/communicator/src/worker_process/worker_interface.rs index 74f2711310..d91f109706 100644 --- a/pgxn/neon/communicator/src/worker_process/worker_interface.rs +++ b/pgxn/neon/communicator/src/worker_process/worker_interface.rs @@ -8,6 +8,7 @@ use tracing::error; use crate::init::CommunicatorInitStruct; use crate::worker_process::main_loop; +use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct; /// Launch the communicator's tokio tasks, which do most of the work. /// @@ -24,8 +25,8 @@ pub extern "C" fn communicator_worker_process_launch( shard_map: *mut *mut c_char, nshards: u32, file_cache_path: *const c_char, - file_cache_size: u64, -) { + initial_file_cache_size: u64, +) -> &'static CommunicatorWorkerProcessStruct<'static> { // Convert the arguments into more convenient Rust types let tenant_id = unsafe { CStr::from_ptr(tenant_id) }.to_str().unwrap(); let timeline_id = unsafe { CStr::from_ptr(timeline_id) }.to_str().unwrap(); @@ -53,7 +54,7 @@ pub extern "C" fn communicator_worker_process_launch( timeline_id.to_string(), auth_token, shard_map, - file_cache_size, + initial_file_cache_size, file_cache_path, )); let worker_struct = Box::leak(Box::new(worker_struct)); @@ -69,6 +70,8 @@ pub extern "C" fn communicator_worker_process_launch( // keep the runtime running after we exit this function Box::leak(Box::new(runtime)); + + worker_struct } /// Convert the "shard map" from an array of C strings, indexed by shard no to a rust HashMap @@ -98,3 +101,12 @@ fn parse_shard_map( } result } + +/// Inform the rust code about a configuration change +#[unsafe(no_mangle)] +pub extern "C" fn communicator_worker_config_reload( + proc_handle: &'static CommunicatorWorkerProcessStruct<'static>, + file_cache_size: u64, +) { + proc_handle.cache.resize_file_cache(file_cache_size as u32); +} diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index fed3ea274b..b06a740d2a 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -46,6 +46,7 @@ * here. 
This code shouldn't be using the C file cache for anything else than * the GUCs. */ +extern int lfc_max_size; extern int lfc_size_limit; extern char *lfc_path; @@ -171,6 +172,8 @@ communicator_new_shmem_startup(void) size_t communicator_size; size_t shmem_size; void *shmem_ptr; + uint64 initial_file_cache_size; + uint64 max_file_cache_size; rc = pipe(pipefd); if (rc != 0) @@ -197,8 +200,17 @@ communicator_new_shmem_startup(void) for (int i = 0; i < MaxProcs; i++) InitSharedLatch(&communicator_shmem_ptr->backends[i].io_completion_latch); + /* lfc_size_limit is in MBs */ + initial_file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ); + max_file_cache_size = lfc_max_size * (1024 * 1024 / BLCKSZ); + if (initial_file_cache_size < 100) + initial_file_cache_size = 100; + if (max_file_cache_size < 100) + max_file_cache_size = 100; + /* Initialize the rust-managed parts */ - cis = rcommunicator_shmem_init(pipefd[0], pipefd[1], MaxProcs, shmem_ptr, shmem_size); + cis = rcommunicator_shmem_init(pipefd[0], pipefd[1], MaxProcs, shmem_ptr, shmem_size, + initial_file_cache_size, max_file_cache_size); } /**** Worker process functions. These run in the communicator worker process ****/ @@ -212,7 +224,8 @@ communicator_new_bgworker_main(Datum main_arg) struct LoggingState *logging; char errbuf[1000]; int elevel; - uint64 initial_file_cache_size; + uint64 file_cache_size; + const struct CommunicatorWorkerProcessStruct *proc_handle; /* * Pretend that this process is a WAL sender. That affects the shutdown @@ -222,7 +235,9 @@ communicator_new_bgworker_main(Datum main_arg) MarkPostmasterChildWalSender(); /* lfc_size_limit is in MBs */ - initial_file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ); + file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ); + if (file_cache_size < 100) + file_cache_size = 100; /* Establish signal handlers. 
*/ pqsignal(SIGUSR1, procsignal_sigusr1_handler); @@ -240,7 +255,7 @@ communicator_new_bgworker_main(Datum main_arg) logging = configure_logging(); - communicator_worker_process_launch( + proc_handle = communicator_worker_process_launch( cis, neon_tenant, neon_timeline, @@ -248,7 +263,7 @@ communicator_new_bgworker_main(Datum main_arg) connstrs, num_shards, lfc_path, - initial_file_cache_size); + file_cache_size); cis = NULL; elog(LOG, "communicator threads started"); @@ -258,6 +273,18 @@ communicator_new_bgworker_main(Datum main_arg) CHECK_FOR_INTERRUPTS(); + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + /* lfc_size_limit is in MBs */ + file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ); + if (file_cache_size < 100) + file_cache_size = 100; + communicator_worker_config_reload(proc_handle, file_cache_size); + } + for (;;) { rc = pump_logging(logging, (uint8 *) errbuf, sizeof(errbuf), &elevel); diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index c930753dc0..2f9536ffd6 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -183,7 +183,7 @@ typedef struct FileCacheControl static HTAB *lfc_hash; static int lfc_desc = -1; static LWLockId lfc_lock; -static int lfc_max_size; +int lfc_max_size; int lfc_size_limit; static int lfc_prewarm_limit; static int lfc_prewarm_batch; diff --git a/pgxn/neon/file_cache.h b/pgxn/neon/file_cache.h index a392063862..1b6ff36164 100644 --- a/pgxn/neon/file_cache.h +++ b/pgxn/neon/file_cache.h @@ -26,6 +26,7 @@ typedef struct FileCacheState /* GUCs */ extern bool lfc_store_prefetch_result; +extern int lfc_max_size; extern int lfc_size_limit; extern char *lfc_path;
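
A minimal caller-side sketch of the resizable hash map API added above (not part of the diff), written against the signatures that appear in this patch: init_in_shmem(), attach_writer(), grow(), get_num_buckets(). The shared memory name, bucket counts, and key/value types below are illustrative only:

    use neon_shmem::hash::HashMapInit;
    use neon_shmem::shmem::ShmemHandle;

    fn grow_example() {
        // Reserve a resizable shared memory area. Only the initial bucket
        // count is materialized up front; grow() claims more of it later.
        const MAX_MEM_SIZE: usize = 10_000_000;
        let shmem = ShmemHandle::new("grow_example", 0, MAX_MEM_SIZE).unwrap();

        // Start with 1000 buckets and attach a writer.
        let init = HashMapInit::<u64, usize>::init_in_shmem(1000, shmem);
        let map = init.attach_writer();
        let _ = map.insert(&42u64, 1);

        // grow() enlarges the underlying ShmemHandle, links the new buckets
        // into the free list, and rebuilds the dictionary under the write lock.
        map.grow(1500).unwrap();
        assert_eq!(map.get_num_buckets(), 1500);
        assert!(map.get(&42u64).is_some());
    }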
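
For sizing the resizable area: grow() derives the new byte size from HashMapInit::estimate_size(), which per core.rs is roughly

    n * size_of::<Bucket<K, V>>()                 // bucket array
      + ceil(n * size_of::<u32>() / 0.60)         // dictionary, FILL_FACTOR = 0.60
      + size_of::<HashMapShared<K, V>>() + 1000   // header plus alignment margin

The maximum size passed to ShmemHandle::new() is likewise computed as estimate_size(max_file_cache_size) in integrated_cache.rs, and that maximum is what ultimately bounds how far grow() can take the bucket count.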