fix: some read metrics (#3404)

* fix: some read metrics

* chore: fix some metrics

* fix
This commit is contained in:
Lei, HUANG
2024-02-28 16:47:49 +08:00
committed by GitHub
parent c3c80b92c8
commit a0a8e8c587
2 changed files with 74 additions and 27 deletions

View File

@@ -22,7 +22,6 @@ use std::time::{Duration, Instant};
use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::tracing::log;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;
@@ -278,9 +277,7 @@ impl PartitionReader {
/// # Panics
/// Panics if the reader is invalid.
pub fn next(&mut self) -> Result<()> {
let read_source = Instant::now();
self.source.next()?;
self.context.metrics.read_source += read_source.elapsed();
self.advance_source()?;
self.prune_batch_by_key()
}
@@ -305,8 +302,14 @@ impl PartitionReader {
self.context
}
/// Advances the wrapped source by one batch, timing the call and
/// accumulating the elapsed duration into the `read_source` metric.
fn advance_source(&mut self) -> Result<()> {
let read_source = Instant::now();
self.source.next()?;
self.context.metrics.read_source += read_source.elapsed();
Ok(())
}
fn prune_batch_by_key(&mut self) -> Result<()> {
let start = Instant::now();
if self.context.metadata.primary_key.is_empty() || !self.context.need_prune_key {
// Nothing to prune.
return Ok(());
@@ -329,23 +332,36 @@ impl PartitionReader {
&self.context.filters,
&self.context.row_codec,
key,
&mut self.context.metrics,
) {
// We need this key.
self.last_yield_pk_id = Some(pk_id);
self.context.metrics.keys_after_pruning += 1;
break;
}
self.source.next()?;
self.advance_source()?;
}
self.context.metrics.prune_pk += start.elapsed();
Ok(())
}
}
// TODO(yingwen): Improve performance of key prunning. Now we need to find index and
/// Timing wrapper over `prune_primary_key_inner`: runs the actual key
/// pruning and accumulates the elapsed time into `metrics.prune_pk`.
///
/// Returns `true` if the primary key `pk` is still needed after pruning.
fn prune_primary_key(
metadata: &RegionMetadataRef,
filters: &[SimpleFilterEvaluator],
codec: &McmpRowCodec,
pk: &[u8],
metrics: &mut PartitionReaderMetrics,
) -> bool {
let start = Instant::now();
let res = prune_primary_key_inner(metadata, filters, codec, pk);
metrics.prune_pk += start.elapsed();
res
}
// TODO(yingwen): Improve performance of key pruning. Now we need to find index and
// then decode and convert each value.
/// Returns true if the `pk` is still needed.
fn prune_primary_key(
fn prune_primary_key_inner(
metadata: &RegionMetadataRef,
filters: &[SimpleFilterEvaluator],
codec: &McmpRowCodec,
@@ -408,20 +424,29 @@ pub(crate) struct ReadPartitionContext {
impl Drop for ReadPartitionContext {
fn drop(&mut self) {
let partition_prune_pk = self.metrics.prune_pk.as_secs_f64();
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["partition_prune_pk"])
.observe(self.metrics.prune_pk.as_secs_f64());
.observe(partition_prune_pk);
let partition_read_source = self.metrics.read_source.as_secs_f64();
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["partition_read_source"])
.observe(self.metrics.read_source.as_secs_f64());
.observe(partition_read_source);
let partition_data_batch_to_batch = self.metrics.data_batch_to_batch.as_secs_f64();
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["partition_data_batch_to_batch"])
.observe(self.metrics.data_batch_to_batch.as_secs_f64());
log::debug!(
"TreeIter pruning, before: {}, after: {}",
self.metrics.keys_before_pruning,
self.metrics.keys_before_pruning
);
.observe(partition_data_batch_to_batch);
if self.metrics.keys_before_pruning != 0 {
common_telemetry::debug!(
"TreeIter pruning, before: {}, after: {}, partition_read_source: {}s, partition_prune_pk: {}s, partition_data_batch_to_batch: {}s",
self.metrics.keys_before_pruning,
self.metrics.keys_after_pruning,
partition_read_source,
partition_prune_pk,
partition_data_batch_to_batch,
);
}
}
}

View File

@@ -20,7 +20,6 @@ use std::time::{Duration, Instant};
use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::tracing::log;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use snafu::ensure;
@@ -37,7 +36,7 @@ use crate::memtable::merge_tree::partition::{
};
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
use crate::metrics::{MERGE_TREE_READ_STAGE_ELAPSED, READ_STAGE_ELAPSED};
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -135,6 +134,7 @@ impl MergeTree {
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Result<BoxedBatchIterator> {
let start = Instant::now();
// Creates the projection set.
let projection: HashSet<_> = if let Some(projection) = projection {
projection.iter().copied().collect()
@@ -151,12 +151,13 @@ impl MergeTree {
})
.unwrap_or_default();
let partitions = self.prune_partitions(&filters);
let mut tree_iter_metric = TreeIterMetrics::default();
let partitions = self.prune_partitions(&filters, &mut tree_iter_metric);
let mut iter = TreeIter {
partitions,
current_reader: None,
metrics: Default::default(),
metrics: tree_iter_metric,
};
let context = ReadPartitionContext::new(
self.metadata.clone(),
@@ -166,6 +167,7 @@ impl MergeTree {
);
iter.fetch_next_partition(context)?;
iter.metrics.iter_elapsed += start.elapsed();
Ok(Box::new(iter))
}
@@ -285,8 +287,13 @@ impl MergeTree {
.clone()
}
fn prune_partitions(&self, filters: &[SimpleFilterEvaluator]) -> VecDeque<PartitionRef> {
fn prune_partitions(
&self,
filters: &[SimpleFilterEvaluator],
metrics: &mut TreeIterMetrics,
) -> VecDeque<PartitionRef> {
let partitions = self.partitions.read().unwrap();
metrics.partitions_total = partitions.len();
if !self.is_partitioned {
return partitions.values().cloned().collect();
}
@@ -312,16 +319,19 @@ impl MergeTree {
pruned.push_back(partition.clone());
}
}
metrics.partitions_after_pruning = pruned.len();
pruned
}
}
/// Metrics collected while scanning a merge tree through `TreeIter`;
/// reported (histograms + debug log) when the iterator is dropped.
#[derive(Default)]
struct TreeIterMetrics {
// Total time spent building the iterator and inside its `next()` calls.
iter_elapsed: Duration,
// Time spent fetching the next partition to read
// (observed under the "fetch_next_partition" label).
fetch_partition_elapsed: Duration,
// Number of rows yielded by the iterator so far.
rows_fetched: usize,
// Number of batches yielded by the iterator so far.
batches_fetched: usize,
// Partition count before pruning (all partitions in the tree).
partitions_total: usize,
// Partition count remaining after predicate-based pruning.
partitions_after_pruning: usize,
}
struct TreeIter {
@@ -335,10 +345,17 @@ impl Drop for TreeIter {
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["fetch_next_partition"])
.observe(self.metrics.fetch_partition_elapsed.as_secs_f64());
log::debug!(
"TreeIter rows fetched: {}, batches fetched: {}",
let scan_elapsed = self.metrics.iter_elapsed.as_secs_f64();
READ_STAGE_ELAPSED
.with_label_values(&["scan_memtable"])
.observe(scan_elapsed);
common_telemetry::debug!(
"TreeIter partitions total: {}, partitions after prune: {}, rows fetched: {}, batches fetched: {}, scan elapsed: {}",
self.metrics.partitions_total,
self.metrics.partitions_after_pruning,
self.metrics.rows_fetched,
self.metrics.batches_fetched
self.metrics.batches_fetched,
scan_elapsed
);
}
}
@@ -347,7 +364,10 @@ impl Iterator for TreeIter {
type Item = Result<Batch>;
fn next(&mut self) -> Option<Self::Item> {
self.next_batch().transpose()
let start = Instant::now();
let res = self.next_batch().transpose();
self.metrics.iter_elapsed += start.elapsed();
res
}
}
@@ -378,6 +398,8 @@ impl TreeIter {
let batch = part_reader.convert_current_batch()?;
part_reader.next()?;
if part_reader.is_valid() {
self.metrics.rows_fetched += batch.num_rows();
self.metrics.batches_fetched += 1;
return Ok(Some(batch));
}