diff --git a/libs/neonart/src/algorithm.rs b/libs/neonart/src/algorithm.rs index f1ee34c140..c0c4b19e93 100644 --- a/libs/neonart/src/algorithm.rs +++ b/libs/neonart/src/algorithm.rs @@ -3,7 +3,6 @@ pub(crate) mod node_ptr; mod node_ref; use std::vec::Vec; -use std::sync::atomic::Ordering; use crate::algorithm::lock_and_version::ConcurrentUpdateError; use crate::algorithm::node_ptr::MAX_PREFIX_LEN; @@ -254,7 +253,6 @@ where UpdateAction::Nothing => {} UpdateAction::Insert(new_value) => { insert_split_prefix(key, new_value, &mut wnode, &mut wparent, parent_key, guard)?; - guard.tree_writer.tree.num_values.fetch_add(1, Ordering::Relaxed); } UpdateAction::Remove => { panic!("unexpected Remove action on insertion"); @@ -287,7 +285,6 @@ where // TODO: If parent has only one child left, merge it with the child, extending its // prefix wparent.delete_child(parent_key); - guard.tree_writer.tree.num_values.fetch_sub(1, Ordering::Relaxed); } } wnode.write_unlock(); @@ -313,7 +310,6 @@ where insert_and_grow(key, new_value, &wnode, &mut wparent, parent_key, guard)?; wnode.write_unlock_obsolete(); wparent.write_unlock(); - guard.tree_writer.tree.num_values.fetch_add(1, Ordering::Relaxed); } UpdateAction::Remove => { panic!("unexpected Remove action on insertion"); @@ -328,7 +324,6 @@ where UpdateAction::Nothing => {} UpdateAction::Insert(new_value) => { insert_to_node(&mut wnode, key, new_value, guard)?; - guard.tree_writer.tree.num_values.fetch_add(1, Ordering::Relaxed); } UpdateAction::Remove => { panic!("unexpected Remove action on insertion"); diff --git a/libs/neonart/src/allocator.rs b/libs/neonart/src/allocator.rs index ce3fe08c21..cb962fa33f 100644 --- a/libs/neonart/src/allocator.rs +++ b/libs/neonart/src/allocator.rs @@ -6,6 +6,7 @@ pub mod r#static; use std::alloc::Layout; use std::marker::PhantomData; use std::mem::MaybeUninit; +use std::sync::atomic::Ordering; use crate::allocator::multislab::MultiSlabAllocator; use crate::allocator::r#static::alloc_from_slice; @@ -111,3 +112,46 @@ impl<'t, V: crate::Value> ArtAllocator for ArtMultiSlabAllocator<'t, V> { self.inner.dealloc_slab(4, ptr.cast()) } } + +impl<'t, V: crate::Value> ArtMultiSlabAllocator<'t, V> { + pub(crate) fn get_statistics(&self) -> ArtMultiSlabStats { + ArtMultiSlabStats { + num_internal4: self.inner.slab_descs[0] + .num_allocated + .load(Ordering::Relaxed), + num_internal16: self.inner.slab_descs[1] + .num_allocated + .load(Ordering::Relaxed), + num_internal48: self.inner.slab_descs[2] + .num_allocated + .load(Ordering::Relaxed), + num_internal256: self.inner.slab_descs[3] + .num_allocated + .load(Ordering::Relaxed), + num_leaf: self.inner.slab_descs[4] + .num_allocated + .load(Ordering::Relaxed), + + num_blocks_internal4: self.inner.slab_descs[0].num_blocks.load(Ordering::Relaxed), + num_blocks_internal16: self.inner.slab_descs[1].num_blocks.load(Ordering::Relaxed), + num_blocks_internal48: self.inner.slab_descs[2].num_blocks.load(Ordering::Relaxed), + num_blocks_internal256: self.inner.slab_descs[3].num_blocks.load(Ordering::Relaxed), + num_blocks_leaf: self.inner.slab_descs[4].num_blocks.load(Ordering::Relaxed), + } + } +} + +#[derive(Clone, Debug)] +pub struct ArtMultiSlabStats { + pub num_internal4: u64, + pub num_internal16: u64, + pub num_internal48: u64, + pub num_internal256: u64, + pub num_leaf: u64, + + pub num_blocks_internal4: u64, + pub num_blocks_internal16: u64, + pub num_blocks_internal48: u64, + pub num_blocks_internal256: u64, + pub num_blocks_leaf: u64, +} diff --git a/libs/neonart/src/allocator/slab.rs 
b/libs/neonart/src/allocator/slab.rs index 6b69157af7..8f3dae3ed9 100644 --- a/libs/neonart/src/allocator/slab.rs +++ b/libs/neonart/src/allocator/slab.rs @@ -1,7 +1,7 @@ use std::alloc::Layout; use std::mem::MaybeUninit; use std::ops::Deref; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use spin; @@ -14,6 +14,9 @@ pub(crate) struct SlabDesc { pub(crate) layout: Layout, block_lists: spin::RwLock, + + pub(crate) num_blocks: AtomicU64, + pub(crate) num_allocated: AtomicU64, } unsafe impl Sync for SlabDesc {} @@ -75,6 +78,8 @@ impl SlabDesc { SlabDesc { layout: *layout, block_lists: spin::RwLock::new(BlockLists::default()), + num_allocated: AtomicU64::new(0), + num_blocks: AtomicU64::new(0), } } } @@ -130,6 +135,9 @@ impl SlabDesc { let result = *free_chunks_head; (*free_chunks_head) = (*result).next; (*block_ptr).num_free_chunks.fetch_sub(1, Ordering::Relaxed); + + self.num_allocated.fetch_add(1, Ordering::Relaxed); + return result.cast(); } } @@ -153,6 +161,7 @@ impl SlabDesc { // no free chunks. Allocate a new block (and the chunk from that) let (new_block, new_chunk) = self.alloc_block_and_chunk(block_allocator); + self.num_blocks.fetch_add(1, Ordering::Relaxed); // Add the block to the list in the SlabDesc unsafe { @@ -160,6 +169,7 @@ impl SlabDesc { block_lists_guard.nonfull_blocks.push_head(new_block); } + self.num_allocated.fetch_add(1, Ordering::Relaxed); new_chunk } @@ -195,6 +205,7 @@ impl SlabDesc { // the free blocks list, is it? Defer it as garbage to wait out concurrent updates? //block_allocator.release_block() } + self.num_allocated.fetch_sub(1, Ordering::Relaxed); } fn alloc_block_and_chunk( diff --git a/libs/neonart/src/lib.rs b/libs/neonart/src/lib.rs index c8ccaa9647..be78a2d37c 100644 --- a/libs/neonart/src/lib.rs +++ b/libs/neonart/src/lib.rs @@ -131,7 +131,7 @@ use std::collections::VecDeque; use std::fmt::Debug; use std::marker::PhantomData; use std::ptr::NonNull; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, Ordering}; use crate::epoch::EpochPin; @@ -164,9 +164,6 @@ pub struct Tree { writer_attached: AtomicBool, epoch: epoch::EpochShared, - - // for metrics - num_values: AtomicU64, } unsafe impl Sync for Tree {} @@ -247,7 +244,6 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator> TreeInitStruct<'t, K, V, root: algorithm::new_root(allocator), writer_attached: AtomicBool::new(false), epoch: epoch::EpochShared::new(), - num_values: AtomicU64::new(0), }; unsafe { tree_ptr.write(init) }; @@ -377,8 +373,7 @@ impl<'e, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'e, K, V, A> { } /// Remove value. 
Returns true if it existed - pub fn remove(self, key: &K) -> bool - { + pub fn remove(self, key: &K) -> bool { let mut result = false; self.update_with_fn(key, |existing| { result = existing.is_some(); @@ -557,15 +552,16 @@ impl<'e, K: Key, V: Value + Debug> TreeReadGuard<'e, K, V> { } impl<'e, K: Key, V: Value> TreeWriteAccess<'e, K, V, ArtMultiSlabAllocator<'e, V>> { pub fn get_statistics(&self) -> ArtTreeStatistics { + self.allocator.get_statistics(); ArtTreeStatistics { - num_values: self.tree.num_values.load(Ordering::Relaxed), blocks: self.allocator.inner.block_allocator.get_statistics(), + slabs: self.allocator.get_statistics(), } } } #[derive(Clone, Debug)] pub struct ArtTreeStatistics { - pub num_values: u64, pub blocks: allocator::block::BlockAllocatorStats, + pub slabs: allocator::ArtMultiSlabStats, } diff --git a/libs/neonart/src/tests.rs b/libs/neonart/src/tests.rs index 4d6d0aceed..5e9b6e7f24 100644 --- a/libs/neonart/src/tests.rs +++ b/libs/neonart/src/tests.rs @@ -68,7 +68,7 @@ fn test_inserts + Copy>(keys: &[K]) { assert_eq!(value, Some(idx).as_ref()); } - eprintln!("stats: {:?}", tree_writer.start_write().get_statistics()); + eprintln!("stats: {:?}", tree_writer.get_statistics()); } #[test] @@ -225,7 +225,7 @@ fn random_ops() { if i % 1000 == 0 { eprintln!("{i} ops processed"); - eprintln!("stats: {:?}", tree_writer.start_write().get_statistics()); + eprintln!("stats: {:?}", tree_writer.get_statistics()); test_iter(&tree_writer, &shadow); } } diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index 29eae38fc4..08da176331 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -26,17 +26,19 @@ use std::mem::MaybeUninit; use std::ops::Range; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; -use utils::lsn::{Lsn, AtomicLsn}; +use utils::lsn::{AtomicLsn, Lsn}; use zerocopy::FromBytes; -use crate::file_cache::{CacheBlock, FileCache}; use crate::file_cache::INVALID_CACHE_BLOCK; +use crate::file_cache::{CacheBlock, FileCache}; use pageserver_page_api::model::RelTag; +use metrics::{IntCounter, IntGauge, IntGaugeVec}; + use neonart; -use neonart::UpdateAction; use neonart::TreeInitStruct; use neonart::TreeIterator; +use neonart::UpdateAction; const CACHE_AREA_SIZE: usize = 10 * 1024 * 1024; @@ -66,13 +68,26 @@ pub struct IntegratedCacheWriteAccess<'t> { clock_hand: std::sync::Mutex>, // Metrics - entries_total: metrics::IntGauge, - page_evictions_counter: metrics::IntCounter, - clock_iterations_counter: metrics::IntCounter, + page_evictions_counter: IntCounter, + clock_iterations_counter: IntCounter, + + nodes_total: IntGaugeVec, + nodes_leaf_total: IntGauge, + nodes_internal4_total: IntGauge, + nodes_internal16_total: IntGauge, + nodes_internal48_total: IntGauge, + nodes_internal256_total: IntGauge, + + nodes_memory_bytes: IntGaugeVec, + nodes_memory_leaf_bytes: IntGauge, + nodes_memory_internal4_bytes: IntGauge, + nodes_memory_internal16_bytes: IntGauge, + nodes_memory_internal48_bytes: IntGauge, + nodes_memory_internal256_bytes: IntGauge, // metrics from the art tree - cache_memory_size_bytes: metrics::IntGauge, - cache_memory_used_bytes: metrics::IntGauge, + cache_memory_size_bytes: IntGauge, + cache_memory_used_bytes: IntGauge, } /// Represents read-only access to the integrated cache. Backend processes have this. 
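(Not part of the patch.) The hunks above replace the single num_values counter with per-node-kind gauges driven by the slab allocator's own counters, and the hunks below wire them up through an IntGaugeVec labelled by node_kind, caching the per-label children so collect() only has to copy numbers. A minimal sketch of that pattern follows; it assumes the metrics crate here re-exports the prometheus gauge types, so the standalone prometheus crate is used directly, and FakeSlabStats/NodeGauges are illustrative stand-ins rather than names from this change:

    use prometheus::{IntGauge, IntGaugeVec, Opts};

    /// Stand-in for neonart's ArtMultiSlabStats (illustration only).
    struct FakeSlabStats {
        num_leaf: u64,
        num_internal4: u64,
    }

    struct NodeGauges {
        /// The labelled family; this is what desc()/collect() would expose.
        vec: IntGaugeVec,
        /// Cached per-label children, updated whenever stats are sampled.
        leaf: IntGauge,
        internal4: IntGauge,
    }

    impl NodeGauges {
        fn new() -> NodeGauges {
            let vec = IntGaugeVec::new(
                Opts::new("nodes_total", "Number of nodes in cache tree."),
                &["node_kind"],
            )
            .unwrap();
            let leaf = vec.with_label_values(&["leaf"]);
            let internal4 = vec.with_label_values(&["internal4"]);
            NodeGauges { vec, leaf, internal4 }
        }

        /// Mirrors the collect() hunk: copy allocator counters into the gauges.
        fn update(&self, stats: &FakeSlabStats) {
            self.leaf.set(stats.num_leaf as i64);
            self.internal4.set(stats.num_internal4 as i64);
        }
    }

    fn main() {
        let gauges = NodeGauges::new();
        gauges.update(&FakeSlabStats { num_leaf: 3, num_internal4: 1 });
        assert_eq!(gauges.leaf.get(), 3);
        // Registering `gauges.vec` with a Registry would export
        // nodes_total{node_kind="leaf"} 3 and nodes_total{node_kind="internal4"} 1.
    }

Caching the with_label_values() handles up front, as the patch does for all five node kinds, avoids a label lookup on every scrape; only the vector itself needs to be handed to the Collector implementation.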
@@ -112,35 +127,73 @@ impl<'t> IntegratedCacheInitStruct<'t> { } = self; let tree_writer = handle.attach_writer(); + let nodes_total = IntGaugeVec::new( + metrics::core::Opts::new("nodes_total", "Number of nodes in cache tree."), + &["node_kind"], + ) + .unwrap(); + let nodes_leaf_total = nodes_total.with_label_values(&["leaf"]); + let nodes_internal4_total = nodes_total.with_label_values(&["internal4"]); + let nodes_internal16_total = nodes_total.with_label_values(&["internal16"]); + let nodes_internal48_total = nodes_total.with_label_values(&["internal48"]); + let nodes_internal256_total = nodes_total.with_label_values(&["internal256"]); + + let nodes_memory_bytes = IntGaugeVec::new( + metrics::core::Opts::new( + "nodes_memory_bytes", + "Memory reserved for nodes in cache tree.", + ), + &["node_kind"], + ) + .unwrap(); + let nodes_memory_leaf_bytes = nodes_memory_bytes.with_label_values(&["leaf"]); + let nodes_memory_internal4_bytes = nodes_memory_bytes.with_label_values(&["internal4"]); + let nodes_memory_internal16_bytes = nodes_memory_bytes.with_label_values(&["internal16"]); + let nodes_memory_internal48_bytes = nodes_memory_bytes.with_label_values(&["internal48"]); + let nodes_memory_internal256_bytes = nodes_memory_bytes.with_label_values(&["internal256"]); + IntegratedCacheWriteAccess { cache_tree: tree_writer, global_lw_lsn: AtomicU64::new(lsn.0), file_cache, clock_hand: std::sync::Mutex::new(TreeIterator::new_wrapping()), - entries_total: metrics::IntGauge::new( - "entries_total", - "Number of entries in the cache", - ).unwrap(), - page_evictions_counter: metrics::IntCounter::new( "integrated_cache_evictions", "Page evictions from the Local File Cache", - ).unwrap(), + ) + .unwrap(), clock_iterations_counter: metrics::IntCounter::new( "clock_iterations", "Number of times the clock hand has moved", - ).unwrap(), + ) + .unwrap(), + + nodes_total, + nodes_leaf_total, + nodes_internal4_total, + nodes_internal16_total, + nodes_internal48_total, + nodes_internal256_total, + + nodes_memory_bytes, + nodes_memory_leaf_bytes, + nodes_memory_internal4_bytes, + nodes_memory_internal16_bytes, + nodes_memory_internal48_bytes, + nodes_memory_internal256_bytes, cache_memory_size_bytes: metrics::IntGauge::new( "cache_memory_size_bytes", "Memory reserved for cache metadata", - ).unwrap(), + ) + .unwrap(), cache_memory_used_bytes: metrics::IntGauge::new( "cache_memory_size_bytes", "Memory used for cache metadata", - ).unwrap(), + ) + .unwrap(), } } @@ -369,17 +422,14 @@ impl<'t> IntegratedCacheWriteAccess<'t> { pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) { let w = self.cache_tree.start_write(); - w.update_with_fn(&TreeKey::from(rel), |existing| { - match existing { - None => UpdateAction::Insert( - TreeEntry::Rel(RelEntry { - nblocks: AtomicU32::new(nblocks), - })), - Some(TreeEntry::Block(_)) => panic!("unexpected tree entry type for rel key"), - Some(TreeEntry::Rel(rel)) => { - rel.nblocks.store(nblocks, Ordering::Relaxed); - UpdateAction::Nothing - } + w.update_with_fn(&TreeKey::from(rel), |existing| match existing { + None => UpdateAction::Insert(TreeEntry::Rel(RelEntry { + nblocks: AtomicU32::new(nblocks), + })), + Some(TreeEntry::Block(_)) => panic!("unexpected tree entry type for rel key"), + Some(TreeEntry::Rel(rel)) => { + rel.nblocks.store(nblocks, Ordering::Relaxed); + UpdateAction::Nothing } }); } @@ -470,7 +520,12 @@ impl<'t> IntegratedCacheWriteAccess<'t> { }; // Update the cache block - let old_blk = block_entry.cache_block.compare_exchange(INVALID_CACHE_BLOCK, 
cache_block, Ordering::Relaxed, Ordering::Relaxed); + let old_blk = block_entry.cache_block.compare_exchange( + INVALID_CACHE_BLOCK, + cache_block, + Ordering::Relaxed, + Ordering::Relaxed, + ); assert!(old_blk == Ok(INVALID_CACHE_BLOCK) || old_blk == Err(cache_block)); block_entry.lw_lsn.store(lw_lsn); @@ -480,9 +535,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let was_pinned = block_entry.pinned.swap(false, Ordering::Relaxed); assert!(was_pinned); UpdateAction::Nothing - } - else - { + } else { UpdateAction::Insert(TreeEntry::Block(BlockEntry { lw_lsn: AtomicLsn::new(lw_lsn.0), cache_block: AtomicU64::new(cache_block), @@ -593,7 +646,6 @@ impl<'t> IntegratedCacheWriteAccess<'t> { None => UpdateAction::Nothing, Some(TreeEntry::Rel(_)) => panic!("unexpected Rel entry"), Some(TreeEntry::Block(old)) => { - // note: all the accesses to 'pinned' currently happen // within update_with_fn(), which protects from concurrent // updates. Otherwise, another thread could set the 'pinned' @@ -605,7 +657,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let _ = self .global_lw_lsn .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); - let cache_block = old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); + let cache_block = old + .cache_block + .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); if cache_block != INVALID_CACHE_BLOCK { evicted_cache_block = Some(cache_block); } @@ -631,7 +685,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> { impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { fn desc(&self) -> Vec<&metrics::core::Desc> { let mut descs = Vec::new(); - descs.append(&mut self.entries_total.desc()); + descs.append(&mut self.nodes_total.desc()); + descs.append(&mut self.nodes_memory_bytes.desc()); descs.append(&mut self.page_evictions_counter.desc()); descs.append(&mut self.clock_iterations_counter.desc()); @@ -640,15 +695,43 @@ impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { descs } fn collect(&self) -> Vec { + const ALLOC_BLOCK_SIZE: i64 = neonart::allocator::block::BLOCK_SIZE as i64; + // Update gauges let art_statistics = self.cache_tree.get_statistics(); - self.entries_total.set(art_statistics.num_values as i64); + self.nodes_leaf_total + .set(art_statistics.slabs.num_leaf as i64); + self.nodes_internal4_total + .set(art_statistics.slabs.num_internal4 as i64); + self.nodes_internal16_total + .set(art_statistics.slabs.num_internal16 as i64); + self.nodes_internal48_total + .set(art_statistics.slabs.num_internal48 as i64); + self.nodes_internal256_total + .set(art_statistics.slabs.num_internal256 as i64); + + self.nodes_memory_leaf_bytes + .set(art_statistics.slabs.num_blocks_leaf as i64 * ALLOC_BLOCK_SIZE); + self.nodes_memory_internal4_bytes + .set(art_statistics.slabs.num_blocks_internal4 as i64 * ALLOC_BLOCK_SIZE); + self.nodes_memory_internal16_bytes + .set(art_statistics.slabs.num_blocks_internal16 as i64 * ALLOC_BLOCK_SIZE); + self.nodes_memory_internal48_bytes + .set(art_statistics.slabs.num_blocks_internal48 as i64 * ALLOC_BLOCK_SIZE); + self.nodes_memory_internal256_bytes + .set(art_statistics.slabs.num_blocks_internal256 as i64 * ALLOC_BLOCK_SIZE); + let block_statistics = &art_statistics.blocks; - self.cache_memory_size_bytes.set(block_statistics.num_blocks as i64 * neonart::allocator::block::BLOCK_SIZE as i64); - self.cache_memory_used_bytes.set((block_statistics.num_initialized as i64 - block_statistics.num_free_blocks as i64 ) * neonart::allocator::block::BLOCK_SIZE as i64); + self.cache_memory_size_bytes + .set(block_statistics.num_blocks as 
i64 * ALLOC_BLOCK_SIZE as i64); + self.cache_memory_used_bytes.set( + (block_statistics.num_initialized as i64 - block_statistics.num_free_blocks as i64) + * ALLOC_BLOCK_SIZE as i64, + ); let mut values = Vec::new(); - values.append(&mut self.entries_total.collect()); + values.append(&mut self.nodes_total.collect()); + values.append(&mut self.nodes_memory_bytes.collect()); values.append(&mut self.page_evictions_counter.collect()); values.append(&mut self.clock_iterations_counter.collect()); diff --git a/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs b/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs index 378f114d8d..71560b8b46 100644 --- a/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs +++ b/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs @@ -1,6 +1,6 @@ +use std::cmp::Eq; use std::hash::Hash; use std::sync::Arc; -use std::cmp::Eq; use tokio::sync::{Mutex, OwnedMutexGuard}; @@ -21,22 +21,25 @@ pub type RequestInProgressTable = MutexHashSet; // more primitive locking thingie: pub struct MutexHashSet - where K: Clone + Eq + Hash +where + K: Clone + Eq + Hash, { lock_table: ClashMap>>, } pub struct MutexHashSetGuard<'a, K> - where K: Clone + Eq + Hash +where + K: Clone + Eq + Hash, { pub key: K, - set: &'a MutexHashSet, + set: &'a MutexHashSet, mutex: Arc>, _guard: OwnedMutexGuard<()>, } impl<'a, K> Drop for MutexHashSetGuard<'a, K> - where K: Clone + Eq + Hash +where + K: Clone + Eq + Hash, { fn drop(&mut self) { let (_old_key, old_val) = self.set.lock_table.remove(&self.key).unwrap(); @@ -47,7 +50,8 @@ impl<'a, K> Drop for MutexHashSetGuard<'a, K> } impl MutexHashSet - where K: Clone + Eq + Hash +where + K: Clone + Eq + Hash, { pub fn new() -> MutexHashSet { MutexHashSet { @@ -55,8 +59,7 @@ impl MutexHashSet } } - pub async fn lock<'a>(&'a self, key: K) -> MutexHashSetGuard<'a, K> - { + pub async fn lock<'a>(&'a self, key: K) -> MutexHashSetGuard<'a, K> { let my_mutex = Arc::new(Mutex::new(())); let my_guard = Arc::clone(&my_mutex).lock_owned().await; diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 91bcf4d46d..59edbdb831 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -8,11 +8,11 @@ use crate::init::CommunicatorInitStruct; use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess}; use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest}; use crate::neon_request::{NeonIORequest, NeonIOResult}; -use crate::worker_process::in_progress_ios::{RequestInProgressTable, RequestInProgressKey}; +use crate::worker_process::in_progress_ios::{RequestInProgressKey, RequestInProgressTable}; use pageserver_client_grpc::PageserverClient; use pageserver_page_api::model; -use metrics::{IntCounterVec, IntCounter}; +use metrics::{IntCounter, IntCounterVec}; use tokio::io::AsyncReadExt; use tokio_pipe::PipeRead; @@ -60,7 +60,6 @@ pub struct CommunicatorWorkerProcessStruct<'a> { request_rel_zero_extend_nblocks_counter: IntCounter, } - pub(super) async fn init( cis: Box, tenant_id: String, @@ -90,9 +89,13 @@ pub(super) async fn init( let pageserver_client = PageserverClient::new(&tenant_id, &timeline_id, &auth_token, shard_map); let request_counters = IntCounterVec::new( - metrics::core::Opts::new("backend_requests_total", "Number of requests from backends."), + metrics::core::Opts::new( + "backend_requests_total", + "Number of requests from backends.", + ), &["request_kind"], - 
).unwrap(); + ) + .unwrap(); let request_rel_exists_counter = request_counters.with_label_values(&["rel_exists"]); let request_rel_size_counter = request_counters.with_label_values(&["rel_size"]); let request_get_pagev_counter = request_counters.with_label_values(&["get_pagev"]); @@ -106,20 +109,31 @@ pub(super) async fn init( let request_rel_unlink_counter = request_counters.with_label_values(&["rel_unlink"]); let getpage_cache_misses_counter = IntCounter::new( - "getpage_cache_misses", "Number of file cache misses in get_pagev requests." - ).unwrap(); + "getpage_cache_misses", + "Number of file cache misses in get_pagev requests.", + ) + .unwrap(); let getpage_cache_hits_counter = IntCounter::new( - "getpage_cache_hits", "Number of file cache hits in get_pagev requests." - ).unwrap(); + "getpage_cache_hits", + "Number of file cache hits in get_pagev requests.", + ) + .unwrap(); // For the requests that affect multiple blocks, have separate counters for the # of blocks affected let request_nblocks_counters = IntCounterVec::new( - metrics::core::Opts::new("request_nblocks_total", "Number of blocks in backend requests."), + metrics::core::Opts::new( + "request_nblocks_total", + "Number of blocks in backend requests.", + ), &["request_kind"], - ).unwrap(); - let request_get_pagev_nblocks_counter = request_nblocks_counters.with_label_values(&["get_pagev"]); - let request_prefetchv_nblocks_counter = request_nblocks_counters.with_label_values(&["prefetchv"]); - let request_rel_zero_extend_nblocks_counter = request_nblocks_counters.with_label_values(&["rel_zero_extend"]); + ) + .unwrap(); + let request_get_pagev_nblocks_counter = + request_nblocks_counters.with_label_values(&["get_pagev"]); + let request_prefetchv_nblocks_counter = + request_nblocks_counters.with_label_values(&["prefetchv"]); + let request_rel_zero_extend_nblocks_counter = + request_nblocks_counters.with_label_values(&["rel_zero_extend"]); CommunicatorWorkerProcessStruct { neon_request_slots: cis.neon_request_slots, @@ -221,7 +235,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { self.request_rel_exists_counter.inc(); let rel = req.reltag(); - let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone())); + let _in_progress_guard = self + .in_progress_table + .lock(RequestInProgressKey::Rel(rel.clone())); let not_modified_since = match self.cache.get_rel_exists(&rel) { CacheResult::Found(exists) => return NeonIOResult::RelExists(exists), @@ -248,7 +264,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { self.request_rel_size_counter.inc(); let rel = req.reltag(); - let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone())); + let _in_progress_guard = self + .in_progress_table + .lock(RequestInProgressKey::Rel(rel.clone())); // Check the cache first let not_modified_since = match self.cache.get_rel_size(&rel) { @@ -283,22 +301,26 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { } NeonIORequest::GetPageV(req) => { self.request_get_pagev_counter.inc(); - self.request_get_pagev_nblocks_counter.inc_by(req.nblocks as u64); + self.request_get_pagev_nblocks_counter + .inc_by(req.nblocks as u64); match self.handle_get_pagev_request(req).await { Ok(()) => NeonIOResult::GetPageV, Err(errno) => NeonIOResult::Error(errno), } - }, + } NeonIORequest::PrefetchV(req) => { self.request_prefetchv_counter.inc(); - self.request_prefetchv_nblocks_counter.inc_by(req.nblocks as u64); + self.request_prefetchv_nblocks_counter + .inc_by(req.nblocks as u64); let req = req.clone(); 
tokio::spawn(async move { self.handle_prefetchv_request(&req).await }); NeonIOResult::PrefetchVLaunched } NeonIORequest::DbSize(req) => { self.request_db_size_counter.inc(); - let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Db(req.db_oid)); + let _in_progress_guard = self + .in_progress_table + .lock(RequestInProgressKey::Db(req.db_oid)); // Check the cache first let not_modified_since = match self.cache.get_db_size(req.db_oid) { @@ -331,7 +353,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // Also store it in the LFC while we still have it let rel = req.reltag(); - let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), req.block_number)); + let _in_progress_guard = self + .in_progress_table + .lock(RequestInProgressKey::Block(rel.clone(), req.block_number)); self.cache .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true) .await; @@ -347,7 +371,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { } NeonIORequest::RelZeroExtend(req) => { self.request_rel_zero_extend_counter.inc(); - self.request_rel_zero_extend_nblocks_counter.inc_by(req.nblocks as u64); + self.request_rel_zero_extend_nblocks_counter + .inc_by(req.nblocks as u64); // TODO: need to grab an io-in-progress lock for this? I guess not self.cache @@ -396,7 +421,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // note: this is deadlock-safe even though we hold multiple locks at the same time, // because they're always acquired in the same order. - let in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), blkno)).await; + let in_progress_guard = self + .in_progress_table + .lock(RequestInProgressKey::Block(rel.clone(), blkno)) + .await; let dest = req.dest[i as usize]; let not_modified_since = match self.cache.get_page(&rel, blkno, dest).await { @@ -410,8 +438,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { }; cache_misses.push((blkno, not_modified_since, dest, in_progress_guard)); } - self.getpage_cache_misses_counter.inc_by(cache_misses.len() as u64); - self.getpage_cache_hits_counter.inc_by(req.nblocks as u64 - cache_misses.len() as u64); + self.getpage_cache_misses_counter + .inc_by(cache_misses.len() as u64); + self.getpage_cache_hits_counter + .inc_by(req.nblocks as u64 - cache_misses.len() as u64); if cache_misses.is_empty() { return Ok(()); @@ -471,7 +501,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // note: this is deadlock-safe even though we hold multiple locks at the same time, // because they're always acquired in the same order. - let in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), blkno)).await; + let in_progress_guard = self + .in_progress_table + .lock(RequestInProgressKey::Block(rel.clone(), blkno)) + .await; let not_modified_since = match self.cache.page_is_cached(&rel, blkno).await { Ok(CacheResult::Found(_)) => { @@ -486,7 +519,11 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { if cache_misses.is_empty() { return Ok(()); } - let not_modified_since = cache_misses.iter().map(|(_blkno, lsn, _guard)| *lsn).max().unwrap(); + let not_modified_since = cache_misses + .iter() + .map(|(_blkno, lsn, _guard)| *lsn) + .max() + .unwrap(); // TODO: spawn separate tasks for these. Use the integrated cache to keep track of the // in-flight requests