From a0a8e8c587ece5ea23abeb3b0070d553eb8778fc Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 28 Feb 2024 16:47:49 +0800 Subject: [PATCH] fix: some read metrics (#3404) * fix: some read metrics * chore: fix some metrics * fix --- .../src/memtable/merge_tree/partition.rs | 59 +++++++++++++------ src/mito2/src/memtable/merge_tree/tree.rs | 42 +++++++++---- 2 files changed, 74 insertions(+), 27 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 28df85ba7a..f12764a672 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -22,7 +22,6 @@ use std::time::{Duration, Instant}; use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; -use common_telemetry::tracing::log; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::storage::ColumnId; @@ -278,9 +277,7 @@ impl PartitionReader { /// # Panics /// Panics if the reader is invalid. pub fn next(&mut self) -> Result<()> { - let read_source = Instant::now(); - self.source.next()?; - self.context.metrics.read_source += read_source.elapsed(); + self.advance_source()?; self.prune_batch_by_key() } @@ -305,8 +302,14 @@ impl PartitionReader { self.context } + fn advance_source(&mut self) -> Result<()> { + let read_source = Instant::now(); + self.source.next()?; + self.context.metrics.read_source += read_source.elapsed(); + Ok(()) + } + fn prune_batch_by_key(&mut self) -> Result<()> { - let start = Instant::now(); if self.context.metadata.primary_key.is_empty() || !self.context.need_prune_key { // Nothing to prune. return Ok(()); @@ -329,23 +332,36 @@ impl PartitionReader { &self.context.filters, &self.context.row_codec, key, + &mut self.context.metrics, ) { // We need this key. self.last_yield_pk_id = Some(pk_id); self.context.metrics.keys_after_pruning += 1; break; } - self.source.next()?; + self.advance_source()?; } - self.context.metrics.prune_pk += start.elapsed(); Ok(()) } } -// TODO(yingwen): Improve performance of key prunning. Now we need to find index and +fn prune_primary_key( + metadata: &RegionMetadataRef, + filters: &[SimpleFilterEvaluator], + codec: &McmpRowCodec, + pk: &[u8], + metrics: &mut PartitionReaderMetrics, +) -> bool { + let start = Instant::now(); + let res = prune_primary_key_inner(metadata, filters, codec, pk); + metrics.prune_pk += start.elapsed(); + res +} + +// TODO(yingwen): Improve performance of key pruning. Now we need to find index and // then decode and convert each value. /// Returns true if the `pk` is still needed. -fn prune_primary_key( +fn prune_primary_key_inner( metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator], codec: &McmpRowCodec, @@ -408,20 +424,29 @@ pub(crate) struct ReadPartitionContext { impl Drop for ReadPartitionContext { fn drop(&mut self) { + let partition_prune_pk = self.metrics.prune_pk.as_secs_f64(); MERGE_TREE_READ_STAGE_ELAPSED .with_label_values(&["partition_prune_pk"]) - .observe(self.metrics.prune_pk.as_secs_f64()); + .observe(partition_prune_pk); + let partition_read_source = self.metrics.read_source.as_secs_f64(); MERGE_TREE_READ_STAGE_ELAPSED .with_label_values(&["partition_read_source"]) - .observe(self.metrics.read_source.as_secs_f64()); + .observe(partition_read_source); + let partition_data_batch_to_batch = self.metrics.data_batch_to_batch.as_secs_f64(); MERGE_TREE_READ_STAGE_ELAPSED .with_label_values(&["partition_data_batch_to_batch"]) - .observe(self.metrics.data_batch_to_batch.as_secs_f64()); - log::debug!( - "TreeIter pruning, before: {}, after: {}", - self.metrics.keys_before_pruning, - self.metrics.keys_before_pruning - ); + .observe(partition_data_batch_to_batch); + + if self.metrics.keys_before_pruning != 0 { + common_telemetry::debug!( + "TreeIter pruning, before: {}, after: {}, partition_read_source: {}s, partition_prune_pk: {}s, partition_data_batch_to_batch: {}s", + self.metrics.keys_before_pruning, + self.metrics.keys_after_pruning, + partition_read_source, + partition_prune_pk, + partition_data_batch_to_batch, + ); + } } } diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index 84c5fb09df..252d361f3b 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -20,7 +20,6 @@ use std::time::{Duration, Instant}; use api::v1::OpType; use common_recordbatch::filter::SimpleFilterEvaluator; -use common_telemetry::tracing::log; use common_time::Timestamp; use datafusion_common::ScalarValue; use snafu::ensure; @@ -37,7 +36,7 @@ use crate::memtable::merge_tree::partition::{ }; use crate::memtable::merge_tree::MergeTreeConfig; use crate::memtable::{BoxedBatchIterator, KeyValues}; -use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED; +use crate::metrics::{MERGE_TREE_READ_STAGE_ELAPSED, READ_STAGE_ELAPSED}; use crate::read::Batch; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; @@ -135,6 +134,7 @@ impl MergeTree { projection: Option<&[ColumnId]>, predicate: Option, ) -> Result { + let start = Instant::now(); // Creates the projection set. let projection: HashSet<_> = if let Some(projection) = projection { projection.iter().copied().collect() @@ -151,12 +151,13 @@ impl MergeTree { }) .unwrap_or_default(); - let partitions = self.prune_partitions(&filters); + let mut tree_iter_metric = TreeIterMetrics::default(); + let partitions = self.prune_partitions(&filters, &mut tree_iter_metric); let mut iter = TreeIter { partitions, current_reader: None, - metrics: Default::default(), + metrics: tree_iter_metric, }; let context = ReadPartitionContext::new( self.metadata.clone(), @@ -166,6 +167,7 @@ impl MergeTree { ); iter.fetch_next_partition(context)?; + iter.metrics.iter_elapsed += start.elapsed(); Ok(Box::new(iter)) } @@ -285,8 +287,13 @@ impl MergeTree { .clone() } - fn prune_partitions(&self, filters: &[SimpleFilterEvaluator]) -> VecDeque { + fn prune_partitions( + &self, + filters: &[SimpleFilterEvaluator], + metrics: &mut TreeIterMetrics, + ) -> VecDeque { let partitions = self.partitions.read().unwrap(); + metrics.partitions_total = partitions.len(); if !self.is_partitioned { return partitions.values().cloned().collect(); } @@ -312,16 +319,19 @@ impl MergeTree { pruned.push_back(partition.clone()); } } - + metrics.partitions_after_pruning = pruned.len(); pruned } } #[derive(Default)] struct TreeIterMetrics { + iter_elapsed: Duration, fetch_partition_elapsed: Duration, rows_fetched: usize, batches_fetched: usize, + partitions_total: usize, + partitions_after_pruning: usize, } struct TreeIter { @@ -335,10 +345,17 @@ impl Drop for TreeIter { MERGE_TREE_READ_STAGE_ELAPSED .with_label_values(&["fetch_next_partition"]) .observe(self.metrics.fetch_partition_elapsed.as_secs_f64()); - log::debug!( - "TreeIter rows fetched: {}, batches fetched: {}", + let scan_elapsed = self.metrics.iter_elapsed.as_secs_f64(); + READ_STAGE_ELAPSED + .with_label_values(&["scan_memtable"]) + .observe(scan_elapsed); + common_telemetry::debug!( + "TreeIter partitions total: {}, partitions after prune: {}, rows fetched: {}, batches fetched: {}, scan elapsed: {}", + self.metrics.partitions_total, + self.metrics.partitions_after_pruning, self.metrics.rows_fetched, - self.metrics.batches_fetched + self.metrics.batches_fetched, + scan_elapsed ); } } @@ -347,7 +364,10 @@ impl Iterator for TreeIter { type Item = Result; fn next(&mut self) -> Option { - self.next_batch().transpose() + let start = Instant::now(); + let res = self.next_batch().transpose(); + self.metrics.iter_elapsed += start.elapsed(); + res } } @@ -378,6 +398,8 @@ impl TreeIter { let batch = part_reader.convert_current_batch()?; part_reader.next()?; if part_reader.is_valid() { + self.metrics.rows_fetched += batch.num_rows(); + self.metrics.batches_fetched += 1; return Ok(Some(batch)); }