diff --git a/pgxn/neon/communicator/src/file_cache.rs b/pgxn/neon/communicator/src/file_cache.rs index d754428fa5..05bbe1a57e 100644 --- a/pgxn/neon/communicator/src/file_cache.rs +++ b/pgxn/neon/communicator/src/file_cache.rs @@ -33,6 +33,8 @@ pub struct FileCache { } // TODO: We keep track of all free blocks in this vec. That doesn't really scale. +// Idea: when free_blocks fills up with more than 1024 entries, write them all to +// one block on disk. struct FreeList { next_free_block: CacheBlock, max_blocks: u64, diff --git a/pgxn/neon/communicator/src/global_allocator.rs b/pgxn/neon/communicator/src/global_allocator.rs new file mode 100644 index 0000000000..9009b6d464 --- /dev/null +++ b/pgxn/neon/communicator/src/global_allocator.rs @@ -0,0 +1,113 @@ +//! Global allocator, for tracking memory usage of the Rust parts +//! +//! Postgres is designed to handle allocation failure (ie. malloc() returning NULL) gracefully. It +//! rolls backs the transaction and gives the user an "ERROR: out of memory" error. Rust code +//! however panics if an allocation fails. We don't want that to ever happen, because an unhandled +//! panic leads to Postgres crash and restart. Our strategy is to pre-allocate a large enough chunk +//! of memory for use by the Rust code, so that the allocations never fail. +//! +//! To pick the size for the pre-allocated chunk, we have a metric to track the high watermark +//! memory usage of all the Rust allocations in total. +//! +//! TODO: +//! +//! - Currently we just export the metrics. Actual allocations are still just passed through to +//! the system allocator. +//! - Take padding etc. overhead into account + +use std::alloc::{GlobalAlloc, Layout, System}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; + +use metrics::IntGauge; + +struct MyAllocator { + allocations: AtomicU64, + deallocations: AtomicU64, + + allocated: AtomicUsize, + high: AtomicUsize, +} + +unsafe impl GlobalAlloc for MyAllocator { + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + self.allocations.fetch_add(1, Ordering::Relaxed); + let mut allocated = self.allocated.fetch_add(layout.size(), Ordering::Relaxed); + allocated += layout.size(); + self.high.fetch_max(allocated, Ordering::Relaxed); + unsafe { System.alloc(layout) } + } + + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + self.deallocations.fetch_add(1, Ordering::Relaxed); + self.allocated.fetch_sub(layout.size(), Ordering::Relaxed); + unsafe { System.dealloc(ptr, layout) } + } +} + +#[global_allocator] +static GLOBAL: MyAllocator = MyAllocator { + allocations: AtomicU64::new(0), + deallocations: AtomicU64::new(0), + allocated: AtomicUsize::new(0), + high: AtomicUsize::new(0), +}; + +pub struct MyAllocatorCollector { + allocations: IntGauge, + deallocations: IntGauge, + allocated: IntGauge, + high: IntGauge, +} + +impl MyAllocatorCollector { + pub fn new() -> MyAllocatorCollector { + MyAllocatorCollector { + allocations: IntGauge::new( + "allocations_total", + "Number of allocations in Rust code", + ).unwrap(), + deallocations: IntGauge::new( + "deallocations_total", + "Number of deallocations in Rust code", + ).unwrap(), + allocated: IntGauge::new( + "allocated_total", + "Bytes currently allocated", + ).unwrap(), + high: IntGauge::new( + "allocated_high", + "High watermark of allocated bytes", + ).unwrap(), + } + } +} + +impl metrics::core::Collector for MyAllocatorCollector { + fn desc(&self) -> Vec<&metrics::core::Desc> { + let mut descs = Vec::new(); + + descs.append(&mut self.allocations.desc()); + descs.append(&mut self.deallocations.desc()); + descs.append(&mut self.allocated.desc()); + descs.append(&mut self.high.desc()); + + descs + } + + fn collect(&self) -> Vec { + let mut values = Vec::new(); + + // update the gauges + self.allocations.set(GLOBAL.allocations.load(Ordering::Relaxed) as i64); + self.deallocations.set(GLOBAL.allocations.load(Ordering::Relaxed) as i64); + self.allocated.set(GLOBAL.allocated.load(Ordering::Relaxed) as i64); + self.high.set(GLOBAL.high.load(Ordering::Relaxed) as i64); + + values.append(&mut self.allocations.collect()); + values.append(&mut self.deallocations.collect()); + values.append(&mut self.allocated.collect()); + values.append(&mut self.high.collect()); + + values + } +} diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index 32fe07b3fd..6af5c8110b 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -216,15 +216,7 @@ impl std::fmt::Debug for BlockEntry { } } -#[derive( - Clone, - Debug, - PartialEq, - PartialOrd, - Eq, - Hash, - Ord, -)] +#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Hash, Ord)] struct RelKey(RelTag); impl From<&RelTag> for RelKey { diff --git a/pgxn/neon/communicator/src/lib.rs b/pgxn/neon/communicator/src/lib.rs index 3e4773983a..fbe582df78 100644 --- a/pgxn/neon/communicator/src/lib.rs +++ b/pgxn/neon/communicator/src/lib.rs @@ -21,5 +21,7 @@ mod integrated_cache; mod neon_request; mod worker_process; +mod global_allocator; + // FIXME get this from postgres headers somehow pub const BLCKSZ: usize = 8192; diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 44700fe0c1..cb83b7d69c 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -6,6 +6,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use crate::backend_comms::NeonIOHandle; use crate::file_cache::FileCache; +use crate::global_allocator::MyAllocatorCollector; use crate::init::CommunicatorInitStruct; use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess}; use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest}; @@ -60,6 +61,8 @@ pub struct CommunicatorWorkerProcessStruct<'a> { request_get_pagev_nblocks_counter: IntCounter, request_prefetchv_nblocks_counter: IntCounter, request_rel_zero_extend_nblocks_counter: IntCounter, + + allocator_metrics: MyAllocatorCollector, } pub(super) async fn init( @@ -166,6 +169,8 @@ pub(super) async fn init( request_get_pagev_nblocks_counter, request_prefetchv_nblocks_counter, request_rel_zero_extend_nblocks_counter, + + allocator_metrics: MyAllocatorCollector::new(), } } @@ -578,6 +583,7 @@ impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> { descs.append(&mut file_cache.desc()); } descs.append(&mut self.cache.desc()); + descs.append(&mut self.allocator_metrics.desc()); descs } @@ -593,6 +599,7 @@ impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> { values.append(&mut file_cache.collect()); } values.append(&mut self.cache.collect()); + values.append(&mut self.allocator_metrics.collect()); values }