From 7471f55c2e71925dbc74385638513bbb973237c6 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 26 Dec 2024 12:44:03 +0800 Subject: [PATCH] feat(mito): add bloom filter read metrics (#5239) Signed-off-by: Zhenchi --- src/mito2/src/read/prune.rs | 4 +- src/mito2/src/sst/parquet/reader.rs | 138 ++++++++++++++++------------ 2 files changed, 79 insertions(+), 63 deletions(-) diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 500cd14302..07177a03f1 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -113,13 +113,13 @@ impl PruneReader { let num_rows_before_filter = batch.num_rows(); let Some(batch_filtered) = self.context.precise_filter(batch)? else { // the entire batch is filtered out - self.metrics.filter_metrics.num_rows_precise_filtered += num_rows_before_filter; + self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter; return Ok(None); }; // update metric let filtered_rows = num_rows_before_filter - batch_filtered.num_rows(); - self.metrics.filter_metrics.num_rows_precise_filtered += filtered_rows; + self.metrics.filter_metrics.rows_precise_filtered += filtered_rows; if !batch_filtered.is_empty() { Ok(Some(batch_filtered)) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 5931658879..b3fb3340aa 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -350,8 +350,8 @@ impl ParquetReaderBuilder { return BTreeMap::default(); } - metrics.num_row_groups_before_filtering += num_row_groups; - metrics.num_rows_in_row_group_before_filtering += num_rows as usize; + metrics.rg_total += num_row_groups; + metrics.rows_total += num_rows as usize; let mut output = (0..num_row_groups).map(|i| (i, None)).collect(); @@ -398,7 +398,7 @@ impl ParquetReaderBuilder { Err(err) => { if cfg!(any(test, feature = "test")) { panic!( - "Failed to apply full-text index, region_id: {}, file_id: {}, err: {}", + "Failed to apply full-text index, region_id: {}, file_id: {}, err: {:?}", self.file_handle.region_id(), self.file_handle.file_id(), err @@ -420,8 +420,8 @@ impl ParquetReaderBuilder { parquet_meta, row_group_to_row_ids, output, - &mut metrics.num_row_groups_fulltext_index_filtered, - &mut metrics.num_rows_in_row_group_fulltext_index_filtered, + &mut metrics.rg_fulltext_filtered, + &mut metrics.rows_fulltext_filtered, ); true @@ -482,7 +482,7 @@ impl ParquetReaderBuilder { Err(err) => { if cfg!(any(test, feature = "test")) { panic!( - "Failed to apply inverted index, region_id: {}, file_id: {}, err: {}", + "Failed to apply inverted index, region_id: {}, file_id: {}, err: {:?}", self.file_handle.region_id(), self.file_handle.file_id(), err @@ -521,8 +521,8 @@ impl ParquetReaderBuilder { parquet_meta, ranges_in_row_groups, output, - &mut metrics.num_row_groups_inverted_index_filtered, - &mut metrics.num_rows_in_row_group_inverted_index_filtered, + &mut metrics.rg_inverted_filtered, + &mut metrics.rows_inverted_filtered, ); true @@ -564,7 +564,7 @@ impl ParquetReaderBuilder { .collect::>(); let row_groups_after = res.len(); - metrics.num_row_groups_min_max_filtered += row_groups_before - row_groups_after; + metrics.rg_minmax_filtered += row_groups_before - row_groups_after; *output = res; true @@ -627,7 +627,7 @@ impl ParquetReaderBuilder { &self, parquet_meta: &ParquetMetaData, output: &mut BTreeMap>, - _metrics: &mut ReaderFilterMetrics, + metrics: &mut ReaderFilterMetrics, ) -> bool { let Some(index_applier) = &self.bloom_filter_index_applier else { return false; @@ -637,8 +637,10 @@ impl ParquetReaderBuilder { return false; } + let before_rg = output.len(); + let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size(); - match index_applier + if let Err(err) = index_applier .apply( self.file_handle.file_id(), file_size_hint, @@ -647,26 +649,27 @@ impl ParquetReaderBuilder { ) .await { - Ok(output) => output, - Err(err) => { - if cfg!(any(test, feature = "test")) { - panic!( - "Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {}", - self.file_handle.region_id(), - self.file_handle.file_id(), - err - ); - } else { - warn!( - err; "Failed to apply bloom filter index, region_id: {}, file_id: {}", - self.file_handle.region_id(), self.file_handle.file_id() - ); - } - - return false; + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {:?}", + self.file_handle.region_id(), + self.file_handle.file_id(), + err + ); + } else { + warn!( + err; "Failed to apply bloom filter index, region_id: {}, file_id: {}", + self.file_handle.region_id(), self.file_handle.file_id() + ); } + + return false; }; + let after_rg = output.len(); + // Update metrics. + metrics.rg_bloom_filtered += before_rg - after_rg; + true } @@ -727,64 +730,77 @@ impl ParquetReaderBuilder { #[derive(Debug, Default, Clone, Copy)] pub(crate) struct ReaderFilterMetrics { /// Number of row groups before filtering. - pub(crate) num_row_groups_before_filtering: usize, + pub(crate) rg_total: usize, /// Number of row groups filtered by fulltext index. - pub(crate) num_row_groups_fulltext_index_filtered: usize, + pub(crate) rg_fulltext_filtered: usize, /// Number of row groups filtered by inverted index. - pub(crate) num_row_groups_inverted_index_filtered: usize, + pub(crate) rg_inverted_filtered: usize, /// Number of row groups filtered by min-max index. - pub(crate) num_row_groups_min_max_filtered: usize, - /// Number of rows filtered by precise filter. - pub(crate) num_rows_precise_filtered: usize, + pub(crate) rg_minmax_filtered: usize, + /// Number of row groups filtered by bloom filter index. + pub(crate) rg_bloom_filtered: usize, + /// Number of rows in row group before filtering. - pub(crate) num_rows_in_row_group_before_filtering: usize, + pub(crate) rows_total: usize, /// Number of rows in row group filtered by fulltext index. - pub(crate) num_rows_in_row_group_fulltext_index_filtered: usize, + pub(crate) rows_fulltext_filtered: usize, /// Number of rows in row group filtered by inverted index. - pub(crate) num_rows_in_row_group_inverted_index_filtered: usize, + pub(crate) rows_inverted_filtered: usize, + /// Number of rows in row group filtered by bloom filter index. + pub(crate) rows_bloom_filtered: usize, + /// Number of rows filtered by precise filter. + pub(crate) rows_precise_filtered: usize, } impl ReaderFilterMetrics { /// Adds `other` metrics to this metrics. pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) { - self.num_row_groups_before_filtering += other.num_row_groups_before_filtering; - self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered; - self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered; - self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered; - self.num_rows_precise_filtered += other.num_rows_precise_filtered; - self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering; - self.num_rows_in_row_group_fulltext_index_filtered += - other.num_rows_in_row_group_fulltext_index_filtered; - self.num_rows_in_row_group_inverted_index_filtered += - other.num_rows_in_row_group_inverted_index_filtered; + self.rg_total += other.rg_total; + self.rg_fulltext_filtered += other.rg_fulltext_filtered; + self.rg_inverted_filtered += other.rg_inverted_filtered; + self.rg_minmax_filtered += other.rg_minmax_filtered; + self.rg_bloom_filtered += other.rg_bloom_filtered; + + self.rows_total += other.rows_total; + self.rows_fulltext_filtered += other.rows_fulltext_filtered; + self.rows_inverted_filtered += other.rows_inverted_filtered; + self.rows_bloom_filtered += other.rows_bloom_filtered; + self.rows_precise_filtered += other.rows_precise_filtered; } /// Reports metrics. pub(crate) fn observe(&self) { READ_ROW_GROUPS_TOTAL .with_label_values(&["before_filtering"]) - .inc_by(self.num_row_groups_before_filtering as u64); + .inc_by(self.rg_total as u64); READ_ROW_GROUPS_TOTAL .with_label_values(&["fulltext_index_filtered"]) - .inc_by(self.num_row_groups_fulltext_index_filtered as u64); + .inc_by(self.rg_fulltext_filtered as u64); READ_ROW_GROUPS_TOTAL .with_label_values(&["inverted_index_filtered"]) - .inc_by(self.num_row_groups_inverted_index_filtered as u64); + .inc_by(self.rg_inverted_filtered as u64); READ_ROW_GROUPS_TOTAL .with_label_values(&["minmax_index_filtered"]) - .inc_by(self.num_row_groups_min_max_filtered as u64); + .inc_by(self.rg_minmax_filtered as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["bloom_filter_index_filtered"]) + .inc_by(self.rg_bloom_filtered as u64); + PRECISE_FILTER_ROWS_TOTAL .with_label_values(&["parquet"]) - .inc_by(self.num_rows_precise_filtered as u64); + .inc_by(self.rows_precise_filtered as u64); READ_ROWS_IN_ROW_GROUP_TOTAL .with_label_values(&["before_filtering"]) - .inc_by(self.num_rows_in_row_group_before_filtering as u64); + .inc_by(self.rows_total as u64); READ_ROWS_IN_ROW_GROUP_TOTAL .with_label_values(&["fulltext_index_filtered"]) - .inc_by(self.num_rows_in_row_group_fulltext_index_filtered as u64); + .inc_by(self.rows_fulltext_filtered as u64); READ_ROWS_IN_ROW_GROUP_TOTAL .with_label_values(&["inverted_index_filtered"]) - .inc_by(self.num_rows_in_row_group_inverted_index_filtered as u64); + .inc_by(self.rows_inverted_filtered as u64); + READ_ROWS_IN_ROW_GROUP_TOTAL + .with_label_values(&["bloom_filter_index_filtered"]) + .inc_by(self.rows_bloom_filtered as u64); } } @@ -1040,12 +1056,12 @@ impl Drop for ParquetReader { self.context.reader_builder().file_handle.region_id(), self.context.reader_builder().file_handle.file_id(), self.context.reader_builder().file_handle.time_range(), - metrics.filter_metrics.num_row_groups_before_filtering - - metrics - .filter_metrics - .num_row_groups_inverted_index_filtered - - metrics.filter_metrics.num_row_groups_min_max_filtered, - metrics.filter_metrics.num_row_groups_before_filtering, + metrics.filter_metrics.rg_total + - metrics.filter_metrics.rg_inverted_filtered + - metrics.filter_metrics.rg_minmax_filtered + - metrics.filter_metrics.rg_fulltext_filtered + - metrics.filter_metrics.rg_bloom_filtered, + metrics.filter_metrics.rg_total, metrics );