diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs
index 2982a065c5..6d550603f8 100644
--- a/src/mito2/src/read/scan_util.rs
+++ b/src/mito2/src/read/scan_util.rs
@@ -41,12 +41,12 @@ use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
 use crate::read::scan_region::StreamContext;
 use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
 use crate::sst::file::FileTimeRange;
-use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
-use crate::sst::parquet::file_range::FileRange;
-use crate::sst::parquet::flat_format::time_index_column_index;
 use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
 use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
+use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
+use crate::sst::parquet::file_range::FileRange;
+use crate::sst::parquet::flat_format::time_index_column_index;
 use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
 use crate::sst::parquet::row_group::ParquetFetchMetrics;
 
diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs
index 39e9f9bb32..54ab6e5b4a 100644
--- a/src/mito2/src/sst/index/bloom_filter/applier.rs
+++ b/src/mito2/src/sst/index/bloom_filter/applier.rs
@@ -51,7 +51,7 @@ pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexAppli
 use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
 
 /// Metrics for tracking bloom filter index apply operations.
-#[derive(Debug, Default, Clone)]
+#[derive(Default, Clone)]
 pub struct BloomFilterIndexApplyMetrics {
     /// Total time spent applying the index.
     pub apply_elapsed: std::time::Duration,
@@ -63,6 +63,47 @@ pub struct BloomFilterIndexApplyMetrics {
     pub read_metrics: BloomFilterReadMetrics,
 }
 
+impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{{")?;
+        let mut first = true;
+
+        if !self.apply_elapsed.is_zero() {
+            write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
+            first = false;
+        }
+        if self.blob_cache_miss > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
+            first = false;
+        }
+        if self.blob_read_bytes > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"blob_read_bytes\":{}", self.blob_read_bytes)?;
+            first = false;
+        }
+        if self.read_metrics.header_size > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"header_size\":{}", self.read_metrics.header_size)?;
+            first = false;
+        }
+        if self.read_metrics.bitset_size > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"bitset_size\":{}", self.read_metrics.bitset_size)?;
+        }
+
+        write!(f, "}}")
+    }
+}
+
 impl BloomFilterIndexApplyMetrics {
     /// Merges another metrics into this one.
     pub fn merge_from(&mut self, other: &Self) {
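The same conditional-serialization pattern repeats for every metrics struct in this patch: the derived `Debug` is replaced with a hand-written impl that emits a JSON-like object, skips zero-valued fields, and uses a `first` flag to decide when a comma separator is needed. Below is a minimal self-contained sketch of the pattern; `ApplyMetrics` is a hypothetical stand-in with only two fields, not the real mito2 type:

```rust
use std::time::Duration;

/// Hypothetical stand-in mirroring the shape of BloomFilterIndexApplyMetrics.
#[derive(Default, Clone)]
struct ApplyMetrics {
    apply_elapsed: Duration,
    blob_cache_miss: usize,
}

impl std::fmt::Debug for ApplyMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{{")?;
        let mut first = true;

        // Zero-valued fields are skipped entirely.
        if !self.apply_elapsed.is_zero() {
            write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
            first = false;
        }
        if self.blob_cache_miss > 0 {
            // `first` tracks whether a separator is needed before this field.
            if !first {
                write!(f, ", ")?;
            }
            write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
        }

        write!(f, "}}")
    }
}

fn main() {
    // An idle metrics value collapses to "{}" instead of a full field dump.
    assert_eq!(format!("{:?}", ApplyMetrics::default()), "{}");

    let m = ApplyMetrics {
        apply_elapsed: Duration::from_millis(3),
        blob_cache_miss: 2,
    };
    // Only the non-zero fields appear in the output.
    assert_eq!(
        format!("{:?}", m),
        r#"{"apply_elapsed":"3ms", "blob_cache_miss":2}"#
    );
}
```

The payoff is compact log lines: a scan that never consults an index contributes `{}` rather than a dump of every zeroed counter.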
diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs
index 7eb6b71b32..c9d9786252 100644
--- a/src/mito2/src/sst/index/fulltext_index/applier.rs
+++ b/src/mito2/src/sst/index/fulltext_index/applier.rs
@@ -55,7 +55,7 @@ use crate::sst::index::puffin_manager::{
 pub mod builder;
 
 /// Metrics for tracking fulltext index apply operations.
-#[derive(Debug, Default, Clone)]
+#[derive(Default, Clone)]
 pub struct FulltextIndexApplyMetrics {
     /// Total time spent applying the index.
     pub apply_elapsed: std::time::Duration,
@@ -71,6 +71,69 @@ pub struct FulltextIndexApplyMetrics {
     pub bloom_filter_read_metrics: BloomFilterReadMetrics,
 }
 
+impl std::fmt::Debug for FulltextIndexApplyMetrics {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{{")?;
+        let mut first = true;
+
+        if !self.apply_elapsed.is_zero() {
+            write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
+            first = false;
+        }
+        if self.blob_cache_miss > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
+            first = false;
+        }
+        if self.dir_cache_hit > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"dir_cache_hit\":{}", self.dir_cache_hit)?;
+            first = false;
+        }
+        if self.dir_cache_miss > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"dir_cache_miss\":{}", self.dir_cache_miss)?;
+            first = false;
+        }
+        if !self.dir_init_elapsed.is_zero() {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"dir_init_elapsed\":\"{:?}\"", self.dir_init_elapsed)?;
+            first = false;
+        }
+        if self.bloom_filter_read_metrics.header_size > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(
+                f,
+                "\"header_size\":{}",
+                self.bloom_filter_read_metrics.header_size
+            )?;
+            first = false;
+        }
+        if self.bloom_filter_read_metrics.bitset_size > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(
+                f,
+                "\"bitset_size\":{}",
+                self.bloom_filter_read_metrics.bitset_size
+            )?;
+        }
+
+        write!(f, "}}")
+    }
+}
+
 impl FulltextIndexApplyMetrics {
     /// Collects metrics from a directory read operation.
     pub fn collect_dir_metrics(
diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs
index aede343df9..0b7933d1b1 100644
--- a/src/mito2/src/sst/index/inverted_index/applier.rs
+++ b/src/mito2/src/sst/index/inverted_index/applier.rs
@@ -46,7 +46,7 @@ use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
 use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
 
 /// Metrics for tracking inverted index apply operations.
-#[derive(Debug, Default, Clone)]
+#[derive(Default, Clone)]
 pub struct InvertedIndexApplyMetrics {
     /// Total time spent applying the index.
     pub apply_elapsed: std::time::Duration,
@@ -58,6 +58,66 @@ pub struct InvertedIndexApplyMetrics {
     pub inverted_index_read_metrics: InvertedIndexReadMetrics,
 }
 
+impl std::fmt::Debug for InvertedIndexApplyMetrics {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{{")?;
+        let mut first = true;
+
+        if !self.apply_elapsed.is_zero() {
+            write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
+            first = false;
+        }
+        if self.blob_cache_miss > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
+            first = false;
+        }
+        if self.blob_read_bytes > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"blob_read_bytes\":{}", self.blob_read_bytes)?;
+            first = false;
+        }
+        if self.inverted_index_read_metrics.fst_size > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(
+                f,
+                "\"fst_size\":{}",
+                self.inverted_index_read_metrics.fst_size
+            )?;
+            first = false;
+        }
+        if self.inverted_index_read_metrics.dict_size > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(
+                f,
+                "\"dict_size\":{}",
+                self.inverted_index_read_metrics.dict_size
+            )?;
+            first = false;
+        }
+        if self.inverted_index_read_metrics.bitmap_size > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(
+                f,
+                "\"bitmap_size\":{}",
+                self.inverted_index_read_metrics.bitmap_size
+            )?;
+        }
+
+        write!(f, "}}")
+    }
+}
+
 impl InvertedIndexApplyMetrics {
     /// Merges another metrics into this one.
     pub fn merge_from(&mut self, other: &Self) {
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 7ea3f4c497..f9805df86d 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -1047,7 +1047,7 @@ impl ReaderFilterMetrics {
 }
 
 /// Metrics for parquet metadata cache operations.
-#[derive(Debug, Default, Clone, Copy)]
+#[derive(Default, Clone, Copy)]
 pub(crate) struct MetadataCacheMetrics {
     /// Number of memory cache hits for parquet metadata.
     pub(crate) mem_cache_hit: usize,
@@ -1061,6 +1061,51 @@ pub(crate) struct MetadataCacheMetrics {
     pub(crate) metadata_load_cost: Duration,
 }
 
+impl std::fmt::Debug for MetadataCacheMetrics {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{{")?;
+        let mut first = true;
+
+        if self.mem_cache_hit > 0 {
+            write!(f, "\"mem_cache_hit\":{}", self.mem_cache_hit)?;
+            first = false;
+        }
+        if self.mem_cache_miss > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"mem_cache_miss\":{}", self.mem_cache_miss)?;
+            first = false;
+        }
+        if self.file_cache_hit > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"file_cache_hit\":{}", self.file_cache_hit)?;
+            first = false;
+        }
+        if self.file_cache_miss > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"file_cache_miss\":{}", self.file_cache_miss)?;
+            first = false;
+        }
+        if !self.metadata_load_cost.is_zero() {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(
+                f,
+                "\"metadata_load_cost\":\"{:?}\"",
+                self.metadata_load_cost
+            )?;
+        }
+
+        write!(f, "}}")
+    }
+}
+
 impl MetadataCacheMetrics {
     /// Adds `other` metrics to this metrics.
     pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
@@ -1348,7 +1393,11 @@ impl BatchReader for ParquetReader {
         let parquet_reader = self
            .context
            .reader_builder()
-            .build(row_group_idx, Some(row_selection), Some(&self.fetch_metrics))
+            .build(
+                row_group_idx,
+                Some(row_selection),
+                Some(&self.fetch_metrics),
+            )
            .await?;
 
         // Resets the parquet reader.
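The `ParquetReader` hunk above threads `Some(&self.fetch_metrics)` into the row-group reader builder, so each per-row-group reader records into metrics owned by the caller rather than keeping its own copy. A rough sketch of that `Option<&Metrics>` threading pattern; `FetchMetrics` and `RowGroupReader::build` here are illustrative stand-ins, not the real mito2 builder API:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical reduced metrics holder; the real ParquetFetchMetrics has
/// more counters.
#[derive(Default)]
struct FetchMetrics {
    page_cache_hit: AtomicUsize,
}

struct RowGroupReader;

impl RowGroupReader {
    /// The callee records into caller-owned metrics when they are provided,
    /// and stays silent when `None` is passed (e.g. in tests).
    fn build(_row_group_idx: usize, metrics: Option<&FetchMetrics>) -> Self {
        if let Some(m) = metrics {
            m.page_cache_hit.fetch_add(1, Ordering::Relaxed);
        }
        RowGroupReader
    }
}

fn main() {
    let metrics = FetchMetrics::default();
    // Several readers can share one metrics value through `&` because the
    // counters are atomics updated with relaxed ordering.
    let _r1 = RowGroupReader::build(0, Some(&metrics));
    let _r2 = RowGroupReader::build(1, Some(&metrics));
    assert_eq!(metrics.page_cache_hit.load(Ordering::Relaxed), 2);
}
```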
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index 83f2ab96d3..9cbf10b573 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -37,7 +37,7 @@ use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
 
 /// Metrics for tracking page/row group fetch operations.
 /// Uses atomic counters for thread-safe updates.
-#[derive(Debug, Default)]
+#[derive(Default)]
 pub struct ParquetFetchMetrics {
     /// Number of page cache hits.
     page_cache_hit: std::sync::atomic::AtomicUsize,
@@ -53,6 +53,63 @@ pub struct ParquetFetchMetrics {
     object_store_fetch_elapsed: std::sync::atomic::AtomicU64,
 }
 
+impl std::fmt::Debug for ParquetFetchMetrics {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{{")?;
+        let mut first = true;
+
+        let page_cache_hit = self.page_cache_hit();
+        let page_cache_miss = self.page_cache_miss();
+        let write_cache_hit = self.write_cache_hit();
+        let write_cache_miss = self.write_cache_miss();
+        let write_cache_elapsed = self.write_cache_fetch_elapsed();
+        let object_store_elapsed = self.object_store_fetch_elapsed();
+
+        if page_cache_hit > 0 {
+            write!(f, "\"page_cache_hit\":{}", page_cache_hit)?;
+            first = false;
+        }
+        if page_cache_miss > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"page_cache_miss\":{}", page_cache_miss)?;
+            first = false;
+        }
+        if write_cache_hit > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"write_cache_hit\":{}", write_cache_hit)?;
+            first = false;
+        }
+        if write_cache_miss > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            write!(f, "\"write_cache_miss\":{}", write_cache_miss)?;
+            first = false;
+        }
+        if write_cache_elapsed > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            let duration = std::time::Duration::from_micros(write_cache_elapsed);
+            write!(f, "\"write_cache_fetch_elapsed\":\"{:?}\"", duration)?;
+            first = false;
+        }
+        if object_store_elapsed > 0 {
+            if !first {
+                write!(f, ", ")?;
+            }
+            let duration = std::time::Duration::from_micros(object_store_elapsed);
+            write!(f, "\"object_store_fetch_elapsed\":\"{:?}\"", duration)?;
+        }
+
+        write!(f, "}}")
+    }
+}
+
 impl ParquetFetchMetrics {
     /// Increments page cache hit counter.
     pub fn inc_page_cache_hit(&self) {
@@ -130,12 +187,18 @@ impl ParquetFetchMetrics {
     pub fn merge_from(&self, other: &ParquetFetchMetrics) {
         self.page_cache_hit
             .fetch_add(other.page_cache_hit(), std::sync::atomic::Ordering::Relaxed);
-        self.page_cache_miss
-            .fetch_add(other.page_cache_miss(), std::sync::atomic::Ordering::Relaxed);
-        self.write_cache_hit
-            .fetch_add(other.write_cache_hit(), std::sync::atomic::Ordering::Relaxed);
-        self.write_cache_miss
-            .fetch_add(other.write_cache_miss(), std::sync::atomic::Ordering::Relaxed);
+        self.page_cache_miss.fetch_add(
+            other.page_cache_miss(),
+            std::sync::atomic::Ordering::Relaxed,
+        );
+        self.write_cache_hit.fetch_add(
+            other.write_cache_hit(),
+            std::sync::atomic::Ordering::Relaxed,
+        );
+        self.write_cache_miss.fetch_add(
+            other.write_cache_miss(),
+            std::sync::atomic::Ordering::Relaxed,
+        );
         self.write_cache_fetch_elapsed.fetch_add(
             other.write_cache_fetch_elapsed(),
             std::sync::atomic::Ordering::Relaxed,
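As the `ParquetFetchMetrics` Debug impl above suggests (`Duration::from_micros(write_cache_elapsed)`), the elapsed timers in this struct are stored as microsecond counts in `AtomicU64`, which lets `&self` methods accumulate them from concurrent fetches. A small sketch of that convention; the `FetchTimer` type and its `observe` helper are hypothetical (the real struct exposes `inc_*` increment methods and plain getters instead):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Hypothetical timer mirroring the micros-in-AtomicU64 convention.
#[derive(Default)]
struct FetchTimer {
    object_store_fetch_elapsed: AtomicU64,
}

impl FetchTimer {
    /// Runs `fetch` and adds its wall time to the counter through `&self`.
    fn observe<T>(&self, fetch: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let value = fetch();
        // Accumulate as micros; Relaxed is enough for statistics counters.
        self.object_store_fetch_elapsed
            .fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed);
        value
    }

    fn object_store_fetch_elapsed(&self) -> u64 {
        self.object_store_fetch_elapsed.load(Ordering::Relaxed)
    }
}

fn main() {
    let timer = FetchTimer::default();
    timer.observe(|| std::thread::sleep(Duration::from_millis(2)));
    // Convert back to a Duration the same way the Debug impl does.
    let elapsed = Duration::from_micros(timer.object_store_fetch_elapsed());
    assert!(elapsed >= Duration::from_millis(2));
    println!("object_store_fetch_elapsed = {:?}", elapsed);
}
```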