feat: implement debug for new metrics

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-11-25 17:33:26 +08:00
committed by shuiyisong
parent 37847a8df6
commit f9c66ba0de
6 changed files with 291 additions and 15 deletions

View File

@@ -41,12 +41,12 @@ use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;

View File

@@ -51,7 +51,7 @@ pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexAppli
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
/// Metrics for tracking bloom filter index apply operations.
#[derive(Debug, Default, Clone)]
#[derive(Default, Clone)]
pub struct BloomFilterIndexApplyMetrics {
/// Total time spent applying the index.
pub apply_elapsed: std::time::Duration,
@@ -63,6 +63,47 @@ pub struct BloomFilterIndexApplyMetrics {
pub read_metrics: BloomFilterReadMetrics,
}
/// Formats the metrics as a compact JSON-style object, for example
/// `{"apply_elapsed":"5ms", "blob_cache_miss":2}`.
///
/// Only non-zero fields are written, so an all-zero value prints as `{}`.
/// Durations are rendered with their own `Debug` output wrapped in double quotes.
impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Empty until the first entry has been written, ", " for every later entry.
        let mut sep = "";
        f.write_str("{")?;

        if !self.apply_elapsed.is_zero() {
            write!(f, "{}\"apply_elapsed\":\"{:?}\"", sep, self.apply_elapsed)?;
            sep = ", ";
        }
        if self.blob_cache_miss > 0 {
            write!(f, "{}\"blob_cache_miss\":{}", sep, self.blob_cache_miss)?;
            sep = ", ";
        }
        if self.blob_read_bytes > 0 {
            write!(f, "{}\"blob_read_bytes\":{}", sep, self.blob_read_bytes)?;
            sep = ", ";
        }

        // Sizes reported by the nested bloom filter read metrics.
        let read = &self.read_metrics;
        if read.header_size > 0 {
            write!(f, "{}\"header_size\":{}", sep, read.header_size)?;
            sep = ", ";
        }
        if read.bitset_size > 0 {
            write!(f, "{}\"bitset_size\":{}", sep, read.bitset_size)?;
        }

        f.write_str("}")
    }
}
impl BloomFilterIndexApplyMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {

View File

@@ -55,7 +55,7 @@ use crate::sst::index::puffin_manager::{
pub mod builder;
/// Metrics for tracking fulltext index apply operations.
#[derive(Debug, Default, Clone)]
#[derive(Default, Clone)]
pub struct FulltextIndexApplyMetrics {
/// Total time spent applying the index.
pub apply_elapsed: std::time::Duration,
@@ -71,6 +71,69 @@ pub struct FulltextIndexApplyMetrics {
pub bloom_filter_read_metrics: BloomFilterReadMetrics,
}
/// Formats the metrics as a compact JSON-style object, for example
/// `{"apply_elapsed":"5ms", "dir_cache_hit":3}`.
///
/// Only non-zero fields are written, so an all-zero value prints as `{}`.
/// Durations are rendered with their own `Debug` output wrapped in double quotes.
impl std::fmt::Debug for FulltextIndexApplyMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Empty until the first entry has been written, ", " for every later entry.
        let mut sep = "";
        f.write_str("{")?;

        if !self.apply_elapsed.is_zero() {
            write!(f, "{}\"apply_elapsed\":\"{:?}\"", sep, self.apply_elapsed)?;
            sep = ", ";
        }
        if self.blob_cache_miss > 0 {
            write!(f, "{}\"blob_cache_miss\":{}", sep, self.blob_cache_miss)?;
            sep = ", ";
        }
        if self.dir_cache_hit > 0 {
            write!(f, "{}\"dir_cache_hit\":{}", sep, self.dir_cache_hit)?;
            sep = ", ";
        }
        if self.dir_cache_miss > 0 {
            write!(f, "{}\"dir_cache_miss\":{}", sep, self.dir_cache_miss)?;
            sep = ", ";
        }
        if !self.dir_init_elapsed.is_zero() {
            write!(
                f,
                "{}\"dir_init_elapsed\":\"{:?}\"",
                sep, self.dir_init_elapsed
            )?;
            sep = ", ";
        }

        // Sizes reported by the nested bloom filter read metrics.
        let bloom = &self.bloom_filter_read_metrics;
        if bloom.header_size > 0 {
            write!(f, "{}\"header_size\":{}", sep, bloom.header_size)?;
            sep = ", ";
        }
        if bloom.bitset_size > 0 {
            write!(f, "{}\"bitset_size\":{}", sep, bloom.bitset_size)?;
        }

        f.write_str("}")
    }
}
impl FulltextIndexApplyMetrics {
/// Collects metrics from a directory read operation.
pub fn collect_dir_metrics(

View File

@@ -46,7 +46,7 @@ use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
/// Metrics for tracking inverted index apply operations.
#[derive(Debug, Default, Clone)]
#[derive(Default, Clone)]
pub struct InvertedIndexApplyMetrics {
/// Total time spent applying the index.
pub apply_elapsed: std::time::Duration,
@@ -58,6 +58,66 @@ pub struct InvertedIndexApplyMetrics {
pub inverted_index_read_metrics: InvertedIndexReadMetrics,
}
/// Formats the metrics as a compact JSON-style object, for example
/// `{"apply_elapsed":"5ms", "fst_size":128}`.
///
/// Only non-zero fields are written, so an all-zero value prints as `{}`.
/// Durations are rendered with their own `Debug` output wrapped in double quotes.
impl std::fmt::Debug for InvertedIndexApplyMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Empty until the first entry has been written, ", " for every later entry.
        let mut sep = "";
        f.write_str("{")?;

        if !self.apply_elapsed.is_zero() {
            write!(f, "{}\"apply_elapsed\":\"{:?}\"", sep, self.apply_elapsed)?;
            sep = ", ";
        }
        if self.blob_cache_miss > 0 {
            write!(f, "{}\"blob_cache_miss\":{}", sep, self.blob_cache_miss)?;
            sep = ", ";
        }
        if self.blob_read_bytes > 0 {
            write!(f, "{}\"blob_read_bytes\":{}", sep, self.blob_read_bytes)?;
            sep = ", ";
        }

        // Sizes reported by the nested inverted index read metrics.
        let read = &self.inverted_index_read_metrics;
        if read.fst_size > 0 {
            write!(f, "{}\"fst_size\":{}", sep, read.fst_size)?;
            sep = ", ";
        }
        if read.dict_size > 0 {
            write!(f, "{}\"dict_size\":{}", sep, read.dict_size)?;
            sep = ", ";
        }
        if read.bitmap_size > 0 {
            write!(f, "{}\"bitmap_size\":{}", sep, read.bitmap_size)?;
        }

        f.write_str("}")
    }
}
impl InvertedIndexApplyMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {

View File

@@ -1047,7 +1047,7 @@ impl ReaderFilterMetrics {
}
/// Metrics for parquet metadata cache operations.
#[derive(Debug, Default, Clone, Copy)]
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
/// Number of memory cache hits for parquet metadata.
pub(crate) mem_cache_hit: usize,
@@ -1061,6 +1061,51 @@ pub(crate) struct MetadataCacheMetrics {
pub(crate) metadata_load_cost: Duration,
}
/// Formats the metrics as a compact JSON-style object, for example
/// `{"mem_cache_hit":4, "metadata_load_cost":"2ms"}`.
///
/// Only non-zero fields are written, so an all-zero value prints as `{}`.
/// The load cost is rendered with `Duration`'s `Debug` output wrapped in double quotes.
impl std::fmt::Debug for MetadataCacheMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Empty until the first entry has been written, ", " for every later entry.
        let mut sep = "";
        f.write_str("{")?;

        if self.mem_cache_hit > 0 {
            write!(f, "{}\"mem_cache_hit\":{}", sep, self.mem_cache_hit)?;
            sep = ", ";
        }
        if self.mem_cache_miss > 0 {
            write!(f, "{}\"mem_cache_miss\":{}", sep, self.mem_cache_miss)?;
            sep = ", ";
        }
        if self.file_cache_hit > 0 {
            write!(f, "{}\"file_cache_hit\":{}", sep, self.file_cache_hit)?;
            sep = ", ";
        }
        if self.file_cache_miss > 0 {
            write!(f, "{}\"file_cache_miss\":{}", sep, self.file_cache_miss)?;
            sep = ", ";
        }
        if !self.metadata_load_cost.is_zero() {
            write!(
                f,
                "{}\"metadata_load_cost\":\"{:?}\"",
                sep, self.metadata_load_cost
            )?;
        }

        f.write_str("}")
    }
}
impl MetadataCacheMetrics {
/// Adds `other` metrics to this metrics.
pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
@@ -1348,7 +1393,11 @@ impl BatchReader for ParquetReader {
let parquet_reader = self
.context
.reader_builder()
.build(row_group_idx, Some(row_selection), Some(&self.fetch_metrics))
.build(
row_group_idx,
Some(row_selection),
Some(&self.fetch_metrics),
)
.await?;
// Resets the parquet reader.

View File

@@ -37,7 +37,7 @@ use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
/// Metrics for tracking page/row group fetch operations.
/// Uses atomic counters for thread-safe updates.
#[derive(Debug, Default)]
#[derive(Default)]
pub struct ParquetFetchMetrics {
/// Number of page cache hits.
page_cache_hit: std::sync::atomic::AtomicUsize,
@@ -53,6 +53,63 @@ pub struct ParquetFetchMetrics {
object_store_fetch_elapsed: std::sync::atomic::AtomicU64,
}
/// Formats the metrics as a compact JSON-style object, for example
/// `{"page_cache_hit":4, "write_cache_fetch_elapsed":"2ms"}`.
///
/// Only non-zero counters are written, so an all-zero value prints as `{}`.
/// The two elapsed counters are converted with `Duration::from_micros` and rendered
/// with `Duration`'s `Debug` output wrapped in double quotes.
impl std::fmt::Debug for ParquetFetchMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Read every counter once, before anything is written.
        let page_hit = self.page_cache_hit();
        let page_miss = self.page_cache_miss();
        let write_hit = self.write_cache_hit();
        let write_miss = self.write_cache_miss();
        let write_cache_micros = self.write_cache_fetch_elapsed();
        let object_store_micros = self.object_store_fetch_elapsed();

        // Empty until the first entry has been written, ", " for every later entry.
        let mut sep = "";
        f.write_str("{")?;

        if page_hit > 0 {
            write!(f, "{}\"page_cache_hit\":{}", sep, page_hit)?;
            sep = ", ";
        }
        if page_miss > 0 {
            write!(f, "{}\"page_cache_miss\":{}", sep, page_miss)?;
            sep = ", ";
        }
        if write_hit > 0 {
            write!(f, "{}\"write_cache_hit\":{}", sep, write_hit)?;
            sep = ", ";
        }
        if write_miss > 0 {
            write!(f, "{}\"write_cache_miss\":{}", sep, write_miss)?;
            sep = ", ";
        }
        if write_cache_micros > 0 {
            let elapsed = std::time::Duration::from_micros(write_cache_micros);
            write!(f, "{}\"write_cache_fetch_elapsed\":\"{:?}\"", sep, elapsed)?;
            sep = ", ";
        }
        if object_store_micros > 0 {
            let elapsed = std::time::Duration::from_micros(object_store_micros);
            write!(f, "{}\"object_store_fetch_elapsed\":\"{:?}\"", sep, elapsed)?;
        }

        f.write_str("}")
    }
}
impl ParquetFetchMetrics {
/// Increments page cache hit counter.
pub fn inc_page_cache_hit(&self) {
@@ -130,12 +187,18 @@ impl ParquetFetchMetrics {
pub fn merge_from(&self, other: &ParquetFetchMetrics) {
self.page_cache_hit
.fetch_add(other.page_cache_hit(), std::sync::atomic::Ordering::Relaxed);
self.page_cache_miss
.fetch_add(other.page_cache_miss(), std::sync::atomic::Ordering::Relaxed);
self.write_cache_hit
.fetch_add(other.write_cache_hit(), std::sync::atomic::Ordering::Relaxed);
self.write_cache_miss
.fetch_add(other.write_cache_miss(), std::sync::atomic::Ordering::Relaxed);
self.page_cache_miss.fetch_add(
other.page_cache_miss(),
std::sync::atomic::Ordering::Relaxed,
);
self.write_cache_hit.fetch_add(
other.write_cache_hit(),
std::sync::atomic::Ordering::Relaxed,
);
self.write_cache_miss.fetch_add(
other.write_cache_miss(),
std::sync::atomic::Ordering::Relaxed,
);
self.write_cache_fetch_elapsed.fetch_add(
other.write_cache_fetch_elapsed(),
std::sync::atomic::Ordering::Relaxed,