mirror of https://github.com/neondatabase/neon.git
better metrics of the art tree
@@ -3,7 +3,6 @@ pub(crate) mod node_ptr;
 mod node_ref;

 use std::vec::Vec;
-use std::sync::atomic::Ordering;

 use crate::algorithm::lock_and_version::ConcurrentUpdateError;
 use crate::algorithm::node_ptr::MAX_PREFIX_LEN;

@@ -254,7 +253,6 @@ where
             UpdateAction::Nothing => {}
             UpdateAction::Insert(new_value) => {
                 insert_split_prefix(key, new_value, &mut wnode, &mut wparent, parent_key, guard)?;
-                guard.tree_writer.tree.num_values.fetch_add(1, Ordering::Relaxed);
             }
             UpdateAction::Remove => {
                 panic!("unexpected Remove action on insertion");

@@ -287,7 +285,6 @@ where
                 // TODO: If parent has only one child left, merge it with the child, extending its
                 // prefix
                 wparent.delete_child(parent_key);
-                guard.tree_writer.tree.num_values.fetch_sub(1, Ordering::Relaxed);
             }
         }
         wnode.write_unlock();

@@ -313,7 +310,6 @@ where
                 insert_and_grow(key, new_value, &wnode, &mut wparent, parent_key, guard)?;
                 wnode.write_unlock_obsolete();
                 wparent.write_unlock();
-                guard.tree_writer.tree.num_values.fetch_add(1, Ordering::Relaxed);
             }
             UpdateAction::Remove => {
                 panic!("unexpected Remove action on insertion");

@@ -328,7 +324,6 @@ where
             UpdateAction::Nothing => {}
             UpdateAction::Insert(new_value) => {
                 insert_to_node(&mut wnode, key, new_value, guard)?;
-                guard.tree_writer.tree.num_values.fetch_add(1, Ordering::Relaxed);
             }
             UpdateAction::Remove => {
                 panic!("unexpected Remove action on insertion");

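Review note: the four hunks above drop the tree-global num_values counter (and its now-unused Ordering import) from the insert and delete paths; value counts are instead derived from the allocator's per-slab statistics added later in this commit. For reference, a minimal standalone sketch of the relaxed-ordering counter idiom used throughout (all names hypothetical):

    use std::sync::atomic::{AtomicU64, Ordering};

    // Statistics counters need atomicity but no ordering guarantees:
    // Relaxed updates are cheap, and metric readers tolerate slightly
    // stale values.
    struct Stats {
        num_allocated: AtomicU64,
    }

    impl Stats {
        fn on_alloc(&self) {
            self.num_allocated.fetch_add(1, Ordering::Relaxed);
        }
        fn on_dealloc(&self) {
            self.num_allocated.fetch_sub(1, Ordering::Relaxed);
        }
    }

    fn main() {
        let stats = Stats { num_allocated: AtomicU64::new(0) };
        stats.on_alloc();
        assert_eq!(stats.num_allocated.load(Ordering::Relaxed), 1);
        stats.on_dealloc();
        assert_eq!(stats.num_allocated.load(Ordering::Relaxed), 0);
    }
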
@@ -6,6 +6,7 @@ pub mod r#static;
 use std::alloc::Layout;
 use std::marker::PhantomData;
 use std::mem::MaybeUninit;
+use std::sync::atomic::Ordering;

 use crate::allocator::multislab::MultiSlabAllocator;
 use crate::allocator::r#static::alloc_from_slice;

@@ -111,3 +112,46 @@ impl<'t, V: crate::Value> ArtAllocator<V> for ArtMultiSlabAllocator<'t, V> {
         self.inner.dealloc_slab(4, ptr.cast())
     }
 }
+
+impl<'t, V: crate::Value> ArtMultiSlabAllocator<'t, V> {
+    pub(crate) fn get_statistics(&self) -> ArtMultiSlabStats {
+        ArtMultiSlabStats {
+            num_internal4: self.inner.slab_descs[0]
+                .num_allocated
+                .load(Ordering::Relaxed),
+            num_internal16: self.inner.slab_descs[1]
+                .num_allocated
+                .load(Ordering::Relaxed),
+            num_internal48: self.inner.slab_descs[2]
+                .num_allocated
+                .load(Ordering::Relaxed),
+            num_internal256: self.inner.slab_descs[3]
+                .num_allocated
+                .load(Ordering::Relaxed),
+            num_leaf: self.inner.slab_descs[4]
+                .num_allocated
+                .load(Ordering::Relaxed),
+
+            num_blocks_internal4: self.inner.slab_descs[0].num_blocks.load(Ordering::Relaxed),
+            num_blocks_internal16: self.inner.slab_descs[1].num_blocks.load(Ordering::Relaxed),
+            num_blocks_internal48: self.inner.slab_descs[2].num_blocks.load(Ordering::Relaxed),
+            num_blocks_internal256: self.inner.slab_descs[3].num_blocks.load(Ordering::Relaxed),
+            num_blocks_leaf: self.inner.slab_descs[4].num_blocks.load(Ordering::Relaxed),
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct ArtMultiSlabStats {
+    pub num_internal4: u64,
+    pub num_internal16: u64,
+    pub num_internal48: u64,
+    pub num_internal256: u64,
+    pub num_leaf: u64,
+
+    pub num_blocks_internal4: u64,
+    pub num_blocks_internal16: u64,
+    pub num_blocks_internal48: u64,
+    pub num_blocks_internal256: u64,
+    pub num_blocks_leaf: u64,
+}

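Review note: slab indices 0 through 3 correspond to the four ART internal node sizes (Node4, Node16, Node48, Node256) and index 4 to leaf nodes, so ArtMultiSlabStats doubles as a per-node-kind census. A hedged sketch of how a consumer might aggregate it (the helper and its block_size parameter are hypothetical; field names come from the struct above):

    // Hypothetical helper: total node count plus approximate reserved memory.
    fn summarize(stats: &ArtMultiSlabStats, block_size: u64) -> (u64, u64) {
        let nodes = stats.num_internal4
            + stats.num_internal16
            + stats.num_internal48
            + stats.num_internal256
            + stats.num_leaf;
        let blocks = stats.num_blocks_internal4
            + stats.num_blocks_internal16
            + stats.num_blocks_internal48
            + stats.num_blocks_internal256
            + stats.num_blocks_leaf;
        // Reserved bytes, not bytes in use: blocks are carved into chunks lazily.
        (nodes, blocks * block_size)
    }
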
@@ -1,7 +1,7 @@
 use std::alloc::Layout;
 use std::mem::MaybeUninit;
 use std::ops::Deref;
-use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

 use spin;

@@ -14,6 +14,9 @@ pub(crate) struct SlabDesc {
     pub(crate) layout: Layout,

     block_lists: spin::RwLock<BlockLists>,
+
+    pub(crate) num_blocks: AtomicU64,
+    pub(crate) num_allocated: AtomicU64,
 }

 unsafe impl Sync for SlabDesc {}

@@ -75,6 +78,8 @@ impl SlabDesc {
         SlabDesc {
             layout: *layout,
             block_lists: spin::RwLock::new(BlockLists::default()),
+            num_allocated: AtomicU64::new(0),
+            num_blocks: AtomicU64::new(0),
         }
     }
 }

@@ -130,6 +135,9 @@ impl SlabDesc {
                 let result = *free_chunks_head;
                 (*free_chunks_head) = (*result).next;
                 (*block_ptr).num_free_chunks.fetch_sub(1, Ordering::Relaxed);
+
+                self.num_allocated.fetch_add(1, Ordering::Relaxed);
+
                 return result.cast();
             }
         }

@@ -153,6 +161,7 @@ impl SlabDesc {

         // no free chunks. Allocate a new block (and the chunk from that)
         let (new_block, new_chunk) = self.alloc_block_and_chunk(block_allocator);
+        self.num_blocks.fetch_add(1, Ordering::Relaxed);

         // Add the block to the list in the SlabDesc
         unsafe {

@@ -160,6 +169,7 @@ impl SlabDesc {
             block_lists_guard.nonfull_blocks.push_head(new_block);
         }

+        self.num_allocated.fetch_add(1, Ordering::Relaxed);
         new_chunk
     }

@@ -195,6 +205,7 @@ impl SlabDesc {
             // the free blocks list, is it? Defer it as garbage to wait out concurrent updates?
             //block_allocator.release_block()
         }
+        self.num_allocated.fetch_sub(1, Ordering::Relaxed);
     }

     fn alloc_block_and_chunk(

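Review note: num_allocated is incremented on both allocation paths (reusing a free chunk, and taking the first chunk of a freshly allocated block) and decremented once in dealloc, while num_blocks only ever grows here, since blocks are not yet returned to the block allocator (see the TODO above). A sanity check a test might perform, assuming a hypothetical chunks_per_block value derived from the slab layout:

    // Hypothetical invariant check over the two counters added above.
    fn check_slab_invariants(num_blocks: u64, num_allocated: u64, chunks_per_block: u64) {
        // Every live chunk must live inside some allocated block.
        assert!(num_allocated <= num_blocks * chunks_per_block);
    }
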
@@ -131,7 +131,7 @@ use std::collections::VecDeque;
 use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::ptr::NonNull;
-use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, Ordering};

 use crate::epoch::EpochPin;

@@ -164,9 +164,6 @@ pub struct Tree<V: Value> {
     writer_attached: AtomicBool,

     epoch: epoch::EpochShared,
-
-    // for metrics
-    num_values: AtomicU64,
 }

 unsafe impl<V: Value + Sync> Sync for Tree<V> {}

@@ -247,7 +244,6 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator<V>> TreeInitStruct<'t, K, V,
             root: algorithm::new_root(allocator),
             writer_attached: AtomicBool::new(false),
             epoch: epoch::EpochShared::new(),
-            num_values: AtomicU64::new(0),
         };
         unsafe { tree_ptr.write(init) };

@@ -377,8 +373,7 @@ impl<'e, K: Key, V: Value, A: ArtAllocator<V>> TreeWriteGuard<'e, K, V, A> {
     }

     /// Remove value. Returns true if it existed
-    pub fn remove(self, key: &K) -> bool
-    {
+    pub fn remove(self, key: &K) -> bool {
         let mut result = false;
         self.update_with_fn(key, |existing| {
             result = existing.is_some();

@@ -557,15 +552,16 @@ impl<'e, K: Key, V: Value + Debug> TreeReadGuard<'e, K, V> {
 }
 impl<'e, K: Key, V: Value> TreeWriteAccess<'e, K, V, ArtMultiSlabAllocator<'e, V>> {
     pub fn get_statistics(&self) -> ArtTreeStatistics {
         self.allocator.get_statistics();
         ArtTreeStatistics {
             num_values: self.tree.num_values.load(Ordering::Relaxed),
             blocks: self.allocator.inner.block_allocator.get_statistics(),
             slabs: self.allocator.get_statistics(),
         }
     }
 }

 #[derive(Clone, Debug)]
 pub struct ArtTreeStatistics {
     pub num_values: u64,
     pub blocks: allocator::block::BlockAllocatorStats,
     pub slabs: allocator::ArtMultiSlabStats,
 }

@@ -68,7 +68,7 @@ fn test_inserts<K: Into<TestKey> + Copy>(keys: &[K]) {
         assert_eq!(value, Some(idx).as_ref());
     }

-    eprintln!("stats: {:?}", tree_writer.start_write().get_statistics());
+    eprintln!("stats: {:?}", tree_writer.get_statistics());
 }

 #[test]

@@ -225,7 +225,7 @@ fn random_ops() {

         if i % 1000 == 0 {
             eprintln!("{i} ops processed");
-            eprintln!("stats: {:?}", tree_writer.start_write().get_statistics());
+            eprintln!("stats: {:?}", tree_writer.get_statistics());
             test_iter(&tree_writer, &shadow);
         }
     }

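Review note: both test hunks reflect the same API move: get_statistics() is now available on the write handle itself, so tests no longer open a throwaway write guard just to read stats. A sketch of the call site after this commit (bounds approximated from types in this diff):

    // Sketch: logging tree statistics; before this commit the call was
    // tree_writer.start_write().get_statistics().
    fn log_stats<'e, K: Key, V: Value>(
        tree_writer: &TreeWriteAccess<'e, K, V, ArtMultiSlabAllocator<'e, V>>,
    ) {
        let stats = tree_writer.get_statistics();
        eprintln!("stats: {stats:?}"); // ArtTreeStatistics derives Debug
    }
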
@@ -26,17 +26,19 @@ use std::mem::MaybeUninit;
 use std::ops::Range;
 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};

-use utils::lsn::{Lsn, AtomicLsn};
+use utils::lsn::{AtomicLsn, Lsn};
 use zerocopy::FromBytes;

-use crate::file_cache::{CacheBlock, FileCache};
+use crate::file_cache::INVALID_CACHE_BLOCK;
+use crate::file_cache::{CacheBlock, FileCache};
 use pageserver_page_api::model::RelTag;

+use metrics::{IntCounter, IntGauge, IntGaugeVec};
+
 use neonart;
-use neonart::UpdateAction;
 use neonart::TreeInitStruct;
 use neonart::TreeIterator;
+use neonart::UpdateAction;

 const CACHE_AREA_SIZE: usize = 10 * 1024 * 1024;

@@ -66,13 +68,26 @@ pub struct IntegratedCacheWriteAccess<'t> {
     clock_hand: std::sync::Mutex<TreeIterator<TreeKey>>,

     // Metrics
     entries_total: metrics::IntGauge,
-    page_evictions_counter: metrics::IntCounter,
-    clock_iterations_counter: metrics::IntCounter,
+    page_evictions_counter: IntCounter,
+    clock_iterations_counter: IntCounter,
+
+    nodes_total: IntGaugeVec,
+    nodes_leaf_total: IntGauge,
+    nodes_internal4_total: IntGauge,
+    nodes_internal16_total: IntGauge,
+    nodes_internal48_total: IntGauge,
+    nodes_internal256_total: IntGauge,
+
+    nodes_memory_bytes: IntGaugeVec,
+    nodes_memory_leaf_bytes: IntGauge,
+    nodes_memory_internal4_bytes: IntGauge,
+    nodes_memory_internal16_bytes: IntGauge,
+    nodes_memory_internal48_bytes: IntGauge,
+    nodes_memory_internal256_bytes: IntGauge,

     // metrics from the art tree
-    cache_memory_size_bytes: metrics::IntGauge,
-    cache_memory_used_bytes: metrics::IntGauge,
+    cache_memory_size_bytes: IntGauge,
+    cache_memory_used_bytes: IntGauge,
 }

 /// Represents read-only access to the integrated cache. Backend processes have this.

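Review note: the struct keeps both the IntGaugeVec parents (nodes_total, nodes_memory_bytes) and their labeled children. The parents are what desc()/collect() walk; the children are resolved once so updates never repeat the label lookup. A minimal sketch of that pattern, mirroring the constructor below (assumes the metrics crate re-exports prometheus-style types, as this file does):

    use metrics::{IntGauge, IntGaugeVec};

    struct NodeGauges {
        nodes_total: IntGaugeVec,   // registered and collected as one family
        nodes_leaf_total: IntGauge, // cached child; updates are a plain .set()
    }

    fn new_node_gauges() -> NodeGauges {
        let nodes_total = IntGaugeVec::new(
            metrics::core::Opts::new("nodes_total", "Number of nodes in cache tree."),
            &["node_kind"],
        )
        .unwrap();
        let nodes_leaf_total = nodes_total.with_label_values(&["leaf"]);
        NodeGauges { nodes_total, nodes_leaf_total }
    }
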
@@ -112,35 +127,73 @@ impl<'t> IntegratedCacheInitStruct<'t> {
         } = self;
         let tree_writer = handle.attach_writer();

+        let nodes_total = IntGaugeVec::new(
+            metrics::core::Opts::new("nodes_total", "Number of nodes in cache tree."),
+            &["node_kind"],
+        )
+        .unwrap();
+        let nodes_leaf_total = nodes_total.with_label_values(&["leaf"]);
+        let nodes_internal4_total = nodes_total.with_label_values(&["internal4"]);
+        let nodes_internal16_total = nodes_total.with_label_values(&["internal16"]);
+        let nodes_internal48_total = nodes_total.with_label_values(&["internal48"]);
+        let nodes_internal256_total = nodes_total.with_label_values(&["internal256"]);
+
+        let nodes_memory_bytes = IntGaugeVec::new(
+            metrics::core::Opts::new(
+                "nodes_memory_bytes",
+                "Memory reserved for nodes in cache tree.",
+            ),
+            &["node_kind"],
+        )
+        .unwrap();
+        let nodes_memory_leaf_bytes = nodes_memory_bytes.with_label_values(&["leaf"]);
+        let nodes_memory_internal4_bytes = nodes_memory_bytes.with_label_values(&["internal4"]);
+        let nodes_memory_internal16_bytes = nodes_memory_bytes.with_label_values(&["internal16"]);
+        let nodes_memory_internal48_bytes = nodes_memory_bytes.with_label_values(&["internal48"]);
+        let nodes_memory_internal256_bytes = nodes_memory_bytes.with_label_values(&["internal256"]);
+
         IntegratedCacheWriteAccess {
             cache_tree: tree_writer,
             global_lw_lsn: AtomicU64::new(lsn.0),
             file_cache,
             clock_hand: std::sync::Mutex::new(TreeIterator::new_wrapping()),

             entries_total: metrics::IntGauge::new(
                 "entries_total",
                 "Number of entries in the cache",
             ).unwrap(),

             page_evictions_counter: metrics::IntCounter::new(
                 "integrated_cache_evictions",
                 "Page evictions from the Local File Cache",
-            ).unwrap(),
+            )
+            .unwrap(),

             clock_iterations_counter: metrics::IntCounter::new(
                 "clock_iterations",
                 "Number of times the clock hand has moved",
-            ).unwrap(),
+            )
+            .unwrap(),

+            nodes_total,
+            nodes_leaf_total,
+            nodes_internal4_total,
+            nodes_internal16_total,
+            nodes_internal48_total,
+            nodes_internal256_total,
+
+            nodes_memory_bytes,
+            nodes_memory_leaf_bytes,
+            nodes_memory_internal4_bytes,
+            nodes_memory_internal16_bytes,
+            nodes_memory_internal48_bytes,
+            nodes_memory_internal256_bytes,
+
             cache_memory_size_bytes: metrics::IntGauge::new(
                 "cache_memory_size_bytes",
                 "Memory reserved for cache metadata",
-            ).unwrap(),
+            )
+            .unwrap(),
             cache_memory_used_bytes: metrics::IntGauge::new(
                 "cache_memory_size_bytes",
                 "Memory used for cache metadata",
-            ).unwrap(),
+            )
+            .unwrap(),
         }
     }

@@ -369,17 +422,14 @@ impl<'t> IntegratedCacheWriteAccess<'t> {

     pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) {
         let w = self.cache_tree.start_write();
-        w.update_with_fn(&TreeKey::from(rel), |existing| {
-            match existing {
-                None => UpdateAction::Insert(
-                    TreeEntry::Rel(RelEntry {
-                        nblocks: AtomicU32::new(nblocks),
-                    })),
-                Some(TreeEntry::Block(_)) => panic!("unexpected tree entry type for rel key"),
-                Some(TreeEntry::Rel(rel)) => {
-                    rel.nblocks.store(nblocks, Ordering::Relaxed);
-                    UpdateAction::Nothing
-                }
-            }
+        w.update_with_fn(&TreeKey::from(rel), |existing| match existing {
+            None => UpdateAction::Insert(TreeEntry::Rel(RelEntry {
+                nblocks: AtomicU32::new(nblocks),
+            })),
+            Some(TreeEntry::Block(_)) => panic!("unexpected tree entry type for rel key"),
+            Some(TreeEntry::Rel(rel)) => {
+                rel.nblocks.store(nblocks, Ordering::Relaxed);
+                UpdateAction::Nothing
+            }
         });
     }

@@ -470,7 +520,12 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
         };

         // Update the cache block
-        let old_blk = block_entry.cache_block.compare_exchange(INVALID_CACHE_BLOCK, cache_block, Ordering::Relaxed, Ordering::Relaxed);
+        let old_blk = block_entry.cache_block.compare_exchange(
+            INVALID_CACHE_BLOCK,
+            cache_block,
+            Ordering::Relaxed,
+            Ordering::Relaxed,
+        );
         assert!(old_blk == Ok(INVALID_CACHE_BLOCK) || old_blk == Err(cache_block));

         block_entry.lw_lsn.store(lw_lsn);

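Review note: the compare_exchange above (only reformatted here) encodes "publish this cache block unless someone already published the same one": it either succeeds against the INVALID_CACHE_BLOCK sentinel, or fails because a concurrent writer stored exactly cache_block, and the assert admits only those two outcomes. The idiom in isolation (standalone sketch; the sentinel value is a placeholder):

    use std::sync::atomic::{AtomicU64, Ordering};

    const INVALID_CACHE_BLOCK: u64 = u64::MAX; // placeholder sentinel

    fn publish(slot: &AtomicU64, cache_block: u64) {
        let old = slot.compare_exchange(
            INVALID_CACHE_BLOCK,
            cache_block,
            Ordering::Relaxed,
            Ordering::Relaxed,
        );
        // Ok(_): we installed the block. Err(cache_block): another thread
        // installed the same block first. Anything else is a logic error.
        assert!(old == Ok(INVALID_CACHE_BLOCK) || old == Err(cache_block));
    }
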
@@ -480,9 +535,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
                 let was_pinned = block_entry.pinned.swap(false, Ordering::Relaxed);
                 assert!(was_pinned);
                 UpdateAction::Nothing
-            }
-            else
-            {
+            } else {
                 UpdateAction::Insert(TreeEntry::Block(BlockEntry {
                     lw_lsn: AtomicLsn::new(lw_lsn.0),
                     cache_block: AtomicU64::new(cache_block),

@@ -593,7 +646,6 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
             None => UpdateAction::Nothing,
             Some(TreeEntry::Rel(_)) => panic!("unexpected Rel entry"),
             Some(TreeEntry::Block(old)) => {
-
                 // note: all the accesses to 'pinned' currently happen
                 // within update_with_fn(), which protects from concurrent
                 // updates. Otherwise, another thread could set the 'pinned'

@@ -605,7 +657,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
                 let _ = self
                     .global_lw_lsn
                     .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
-                let cache_block = old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
+                let cache_block = old
+                    .cache_block
+                    .swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
                 if cache_block != INVALID_CACHE_BLOCK {
                     evicted_cache_block = Some(cache_block);
                 }

@@ -631,7 +685,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
 impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> {
     fn desc(&self) -> Vec<&metrics::core::Desc> {
         let mut descs = Vec::new();
         descs.append(&mut self.entries_total.desc());
+        descs.append(&mut self.nodes_total.desc());
+        descs.append(&mut self.nodes_memory_bytes.desc());
         descs.append(&mut self.page_evictions_counter.desc());
         descs.append(&mut self.clock_iterations_counter.desc());

@@ -640,15 +695,43 @@ impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> {
         descs
     }
     fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
+        const ALLOC_BLOCK_SIZE: i64 = neonart::allocator::block::BLOCK_SIZE as i64;
+
         // Update gauges
         let art_statistics = self.cache_tree.get_statistics();
         self.entries_total.set(art_statistics.num_values as i64);
+        self.nodes_leaf_total
+            .set(art_statistics.slabs.num_leaf as i64);
+        self.nodes_internal4_total
+            .set(art_statistics.slabs.num_internal4 as i64);
+        self.nodes_internal16_total
+            .set(art_statistics.slabs.num_internal16 as i64);
+        self.nodes_internal48_total
+            .set(art_statistics.slabs.num_internal48 as i64);
+        self.nodes_internal256_total
+            .set(art_statistics.slabs.num_internal256 as i64);
+
+        self.nodes_memory_leaf_bytes
+            .set(art_statistics.slabs.num_blocks_leaf as i64 * ALLOC_BLOCK_SIZE);
+        self.nodes_memory_internal4_bytes
+            .set(art_statistics.slabs.num_blocks_internal4 as i64 * ALLOC_BLOCK_SIZE);
+        self.nodes_memory_internal16_bytes
+            .set(art_statistics.slabs.num_blocks_internal16 as i64 * ALLOC_BLOCK_SIZE);
+        self.nodes_memory_internal48_bytes
+            .set(art_statistics.slabs.num_blocks_internal48 as i64 * ALLOC_BLOCK_SIZE);
+        self.nodes_memory_internal256_bytes
+            .set(art_statistics.slabs.num_blocks_internal256 as i64 * ALLOC_BLOCK_SIZE);

         let block_statistics = &art_statistics.blocks;
-        self.cache_memory_size_bytes.set(block_statistics.num_blocks as i64 * neonart::allocator::block::BLOCK_SIZE as i64);
-        self.cache_memory_used_bytes.set((block_statistics.num_initialized as i64 - block_statistics.num_free_blocks as i64 ) * neonart::allocator::block::BLOCK_SIZE as i64);
+        self.cache_memory_size_bytes
+            .set(block_statistics.num_blocks as i64 * ALLOC_BLOCK_SIZE as i64);
+        self.cache_memory_used_bytes.set(
+            (block_statistics.num_initialized as i64 - block_statistics.num_free_blocks as i64)
+                * ALLOC_BLOCK_SIZE as i64,
+        );

         let mut values = Vec::new();
         values.append(&mut self.entries_total.collect());
+        values.append(&mut self.nodes_total.collect());
+        values.append(&mut self.nodes_memory_bytes.collect());
         values.append(&mut self.page_evictions_counter.collect());
         values.append(&mut self.clock_iterations_counter.collect());

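Review note: collect() doubles as the refresh point: gauges are recomputed from get_statistics() at scrape time rather than on every tree operation, which is what let the hot paths shed their counters. The shape of such a pull-style collector (sketch; the Collector signatures are taken from their use in this file, the stats source is a stand-in):

    // Sketch: refresh gauges lazily, only when the metrics are scraped.
    struct CacheCollector {
        entries_total: metrics::IntGauge,
    }

    impl metrics::core::Collector for CacheCollector {
        fn desc(&self) -> Vec<&metrics::core::Desc> {
            self.entries_total.desc()
        }
        fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
            self.entries_total.set(num_values() as i64); // recompute on scrape
            self.entries_total.collect()
        }
    }

    fn num_values() -> u64 {
        0 // stand-in for cache_tree.get_statistics().num_values
    }
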
@@ -1,6 +1,6 @@
+use std::cmp::Eq;
 use std::hash::Hash;
 use std::sync::Arc;
-use std::cmp::Eq;

 use tokio::sync::{Mutex, OwnedMutexGuard};

@@ -21,22 +21,25 @@ pub type RequestInProgressTable = MutexHashSet<RequestInProgressKey>;
 // more primitive locking thingie:

 pub struct MutexHashSet<K>
-where K: Clone + Eq + Hash
+where
+    K: Clone + Eq + Hash,
 {
     lock_table: ClashMap<K, Arc<Mutex<()>>>,
 }

 pub struct MutexHashSetGuard<'a, K>
-where K: Clone + Eq + Hash
+where
+    K: Clone + Eq + Hash,
 {
     pub key: K,
     set: &'a MutexHashSet<K>,
     mutex: Arc<Mutex<()>>,
     _guard: OwnedMutexGuard<()>,
 }

 impl<'a, K> Drop for MutexHashSetGuard<'a, K>
-where K: Clone + Eq + Hash
+where
+    K: Clone + Eq + Hash,
 {
     fn drop(&mut self) {
         let (_old_key, old_val) = self.set.lock_table.remove(&self.key).unwrap();

@@ -47,7 +50,8 @@ impl<'a, K> Drop for MutexHashSetGuard<'a, K>
 }

 impl<K> MutexHashSet<K>
-where K: Clone + Eq + Hash
+where
+    K: Clone + Eq + Hash,
 {
     pub fn new() -> MutexHashSet<K> {
         MutexHashSet {

@@ -55,8 +59,7 @@ impl<K> MutexHashSet<K>
         }
     }

-    pub async fn lock<'a>(&'a self, key: K) -> MutexHashSetGuard<'a, K>
-    {
+    pub async fn lock<'a>(&'a self, key: K) -> MutexHashSetGuard<'a, K> {
        let my_mutex = Arc::new(Mutex::new(()));
        let my_guard = Arc::clone(&my_mutex).lock_owned().await;

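Review note: MutexHashSet appears to be a keyed async lock: lock(key) installs an Arc<Mutex<()>> in the ClashMap under that key and the guard's Drop removes it again, so at most one IO per key proceeds at a time. Hedged usage sketch (types from this file; the part of lock() that handles an already-present key is truncated in this diff):

    // Sketch: collapsing duplicate in-flight IOs onto one per-key guard.
    async fn fetch_once(table: &RequestInProgressTable, key: RequestInProgressKey) {
        // A second caller with the same key waits until the first guard drops.
        let _guard = table.lock(key).await;
        // ... perform the IO while holding the per-key guard ...
    }
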
@@ -8,11 +8,11 @@ use crate::init::CommunicatorInitStruct;
 use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess};
 use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest};
 use crate::neon_request::{NeonIORequest, NeonIOResult};
-use crate::worker_process::in_progress_ios::{RequestInProgressTable, RequestInProgressKey};
+use crate::worker_process::in_progress_ios::{RequestInProgressKey, RequestInProgressTable};
 use pageserver_client_grpc::PageserverClient;
 use pageserver_page_api::model;

-use metrics::{IntCounterVec, IntCounter};
+use metrics::{IntCounter, IntCounterVec};

 use tokio::io::AsyncReadExt;
 use tokio_pipe::PipeRead;

@@ -60,7 +60,6 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
     request_rel_zero_extend_nblocks_counter: IntCounter,
 }

-
 pub(super) async fn init(
     cis: Box<CommunicatorInitStruct>,
     tenant_id: String,

@@ -90,9 +89,13 @@ pub(super) async fn init(
     let pageserver_client = PageserverClient::new(&tenant_id, &timeline_id, &auth_token, shard_map);

     let request_counters = IntCounterVec::new(
-        metrics::core::Opts::new("backend_requests_total", "Number of requests from backends."),
+        metrics::core::Opts::new(
+            "backend_requests_total",
+            "Number of requests from backends.",
+        ),
         &["request_kind"],
-    ).unwrap();
+    )
+    .unwrap();
     let request_rel_exists_counter = request_counters.with_label_values(&["rel_exists"]);
     let request_rel_size_counter = request_counters.with_label_values(&["rel_size"]);
     let request_get_pagev_counter = request_counters.with_label_values(&["get_pagev"]);

@@ -106,20 +109,31 @@ pub(super) async fn init(
     let request_rel_unlink_counter = request_counters.with_label_values(&["rel_unlink"]);

     let getpage_cache_misses_counter = IntCounter::new(
-        "getpage_cache_misses", "Number of file cache misses in get_pagev requests."
-    ).unwrap();
+        "getpage_cache_misses",
+        "Number of file cache misses in get_pagev requests.",
+    )
+    .unwrap();
     let getpage_cache_hits_counter = IntCounter::new(
-        "getpage_cache_hits", "Number of file cache hits in get_pagev requests."
-    ).unwrap();
+        "getpage_cache_hits",
+        "Number of file cache hits in get_pagev requests.",
+    )
+    .unwrap();

     // For the requests that affect multiple blocks, have separate counters for the # of blocks affected
     let request_nblocks_counters = IntCounterVec::new(
-        metrics::core::Opts::new("request_nblocks_total", "Number of blocks in backend requests."),
+        metrics::core::Opts::new(
+            "request_nblocks_total",
+            "Number of blocks in backend requests.",
+        ),
         &["request_kind"],
-    ).unwrap();
-    let request_get_pagev_nblocks_counter = request_nblocks_counters.with_label_values(&["get_pagev"]);
-    let request_prefetchv_nblocks_counter = request_nblocks_counters.with_label_values(&["prefetchv"]);
-    let request_rel_zero_extend_nblocks_counter = request_nblocks_counters.with_label_values(&["rel_zero_extend"]);
+    )
+    .unwrap();
+    let request_get_pagev_nblocks_counter =
+        request_nblocks_counters.with_label_values(&["get_pagev"]);
+    let request_prefetchv_nblocks_counter =
+        request_nblocks_counters.with_label_values(&["prefetchv"]);
+    let request_rel_zero_extend_nblocks_counter =
+        request_nblocks_counters.with_label_values(&["rel_zero_extend"]);

     CommunicatorWorkerProcessStruct {
         neon_request_slots: cis.neon_request_slots,

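Review note: every counter family is created once at init and its labeled children are resolved immediately with with_label_values, so the per-request code only touches a cached child via inc()/inc_by(). Standalone sketch of the setup (prometheus-style API as used in this file):

    use metrics::{IntCounter, IntCounterVec};

    fn make_rel_exists_counter() -> IntCounter {
        let request_counters = IntCounterVec::new(
            metrics::core::Opts::new(
                "backend_requests_total",
                "Number of requests from backends.",
            ),
            &["request_kind"],
        )
        .unwrap();
        // Resolve the label once; the request path just calls .inc().
        request_counters.with_label_values(&["rel_exists"])
    }
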
@@ -221,7 +235,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                 self.request_rel_exists_counter.inc();
                 let rel = req.reltag();

-                let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone()));
+                let _in_progress_guard = self
+                    .in_progress_table
+                    .lock(RequestInProgressKey::Rel(rel.clone()));

                 let not_modified_since = match self.cache.get_rel_exists(&rel) {
                     CacheResult::Found(exists) => return NeonIOResult::RelExists(exists),

@@ -248,7 +264,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                 self.request_rel_size_counter.inc();
                 let rel = req.reltag();

-                let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone()));
+                let _in_progress_guard = self
+                    .in_progress_table
+                    .lock(RequestInProgressKey::Rel(rel.clone()));

                 // Check the cache first
                 let not_modified_since = match self.cache.get_rel_size(&rel) {

@@ -283,22 +301,26 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             }
             NeonIORequest::GetPageV(req) => {
                 self.request_get_pagev_counter.inc();
-                self.request_get_pagev_nblocks_counter.inc_by(req.nblocks as u64);
+                self.request_get_pagev_nblocks_counter
+                    .inc_by(req.nblocks as u64);
                 match self.handle_get_pagev_request(req).await {
                     Ok(()) => NeonIOResult::GetPageV,
                     Err(errno) => NeonIOResult::Error(errno),
                 }
-            },
+            }
             NeonIORequest::PrefetchV(req) => {
                 self.request_prefetchv_counter.inc();
-                self.request_prefetchv_nblocks_counter.inc_by(req.nblocks as u64);
+                self.request_prefetchv_nblocks_counter
+                    .inc_by(req.nblocks as u64);
                 let req = req.clone();
                 tokio::spawn(async move { self.handle_prefetchv_request(&req).await });
                 NeonIOResult::PrefetchVLaunched
             }
             NeonIORequest::DbSize(req) => {
                 self.request_db_size_counter.inc();
-                let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Db(req.db_oid));
+                let _in_progress_guard = self
+                    .in_progress_table
+                    .lock(RequestInProgressKey::Db(req.db_oid));

                 // Check the cache first
                 let not_modified_since = match self.cache.get_db_size(req.db_oid) {

@@ -331,7 +353,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {

                 // Also store it in the LFC while we still have it
                 let rel = req.reltag();
-                let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), req.block_number));
+                let _in_progress_guard = self
+                    .in_progress_table
+                    .lock(RequestInProgressKey::Block(rel.clone(), req.block_number));
                 self.cache
                     .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
                     .await;

@@ -347,7 +371,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             }
             NeonIORequest::RelZeroExtend(req) => {
                 self.request_rel_zero_extend_counter.inc();
-                self.request_rel_zero_extend_nblocks_counter.inc_by(req.nblocks as u64);
+                self.request_rel_zero_extend_nblocks_counter
+                    .inc_by(req.nblocks as u64);

                 // TODO: need to grab an io-in-progress lock for this? I guess not
                 self.cache

@@ -396,7 +421,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {

            // note: this is deadlock-safe even though we hold multiple locks at the same time,
            // because they're always acquired in the same order.
-           let in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), blkno)).await;
+           let in_progress_guard = self
+               .in_progress_table
+               .lock(RequestInProgressKey::Block(rel.clone(), blkno))
+               .await;

            let dest = req.dest[i as usize];
            let not_modified_since = match self.cache.get_page(&rel, blkno, dest).await {

@@ -410,8 +438,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
            };
            cache_misses.push((blkno, not_modified_since, dest, in_progress_guard));
        }
-       self.getpage_cache_misses_counter.inc_by(cache_misses.len() as u64);
-       self.getpage_cache_hits_counter.inc_by(req.nblocks as u64 - cache_misses.len() as u64);
+       self.getpage_cache_misses_counter
+           .inc_by(cache_misses.len() as u64);
+       self.getpage_cache_hits_counter
+           .inc_by(req.nblocks as u64 - cache_misses.len() as u64);

        if cache_misses.is_empty() {
            return Ok(());

@@ -471,7 +501,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {

            // note: this is deadlock-safe even though we hold multiple locks at the same time,
            // because they're always acquired in the same order.
-           let in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), blkno)).await;
+           let in_progress_guard = self
+               .in_progress_table
+               .lock(RequestInProgressKey::Block(rel.clone(), blkno))
+               .await;

            let not_modified_since = match self.cache.page_is_cached(&rel, blkno).await {
                Ok(CacheResult::Found(_)) => {

@@ -486,7 +519,11 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
        if cache_misses.is_empty() {
            return Ok(());
        }
-       let not_modified_since = cache_misses.iter().map(|(_blkno, lsn, _guard)| *lsn).max().unwrap();
+       let not_modified_since = cache_misses
+           .iter()
+           .map(|(_blkno, lsn, _guard)| *lsn)
+           .max()
+           .unwrap();

        // TODO: spawn separate tasks for these. Use the integrated cache to keep track of the
        // in-flight requests

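Review note: when several cache misses are batched into a single pageserver request, the max() above picks the newest per-page not_modified_since LSN: a read LSN late enough for the most recently modified page in the batch is necessarily valid for all the others. In miniature:

    // Sketch: one conservative LSN bound for a whole batch of misses.
    // Lsn stands in for utils::lsn::Lsn (an ordered u64 newtype).
    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
    struct Lsn(u64);

    fn batch_not_modified_since(misses: &[(u32, Lsn)]) -> Option<Lsn> {
        misses.iter().map(|(_blkno, lsn)| *lsn).max()
    }

    fn main() {
        let misses = [(1, Lsn(100)), (7, Lsn(250)), (9, Lsn(180))];
        assert_eq!(batch_not_modified_since(&misses), Some(Lsn(250)));
    }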