feat: collect filter metrics for scanners (#4591)

* feat: collect filter metrics

* refactor: reuse ReaderFilterMetrics

* feat: record read rows from parquet by type

* feat: unordered scan observe rows

also fix read type

* chore: rename label
This commit is contained in:
Yingwen
2024-08-22 11:22:05 +08:00
committed by GitHub
parent 0025fa6ec7
commit d628079f4c
7 changed files with 144 additions and 70 deletions

View File

@@ -531,7 +531,10 @@ impl<'a> CompactionSstReaderBuilder<'a> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
SeqScan::new(scan_input).build_reader().await
SeqScan::new(scan_input)
.with_compaction()
.build_reader()
.await
}
}

View File

@@ -56,6 +56,7 @@ use crate::error::{
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::read::prune::PruneReader;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
/// Storage internal representation of a batch of rows for a primary key (time series).
///
@@ -752,11 +753,13 @@ pub(crate) struct ScannerMetrics {
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
/// Filter related metrics for readers.
filter_metrics: ReaderFilterMetrics,
}
impl ScannerMetrics {
/// Sets and observes metrics on initializing parts.
fn observe_init_part(&mut self, build_parts_cost: Duration) {
fn observe_init_part(&mut self, build_parts_cost: Duration, reader_metrics: &ReaderMetrics) {
self.build_parts_cost = build_parts_cost;
// Observes metrics.
@@ -766,6 +769,11 @@ impl ScannerMetrics {
READ_STAGE_ELAPSED
.with_label_values(&["build_parts"])
.observe(self.build_parts_cost.as_secs_f64());
// We only call this once so we overwrite it directly.
self.filter_metrics = reader_metrics.filter_metrics;
// Observes filter metrics.
self.filter_metrics.observe();
}
/// Observes metrics on scanner finish.

View File

@@ -97,13 +97,13 @@ impl PruneReader {
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.context.precise_filter(batch)? else {
// the entire batch is filtered out
self.metrics.num_rows_precise_filtered += num_rows_before_filter;
self.metrics.filter_metrics.num_rows_precise_filtered += num_rows_before_filter;
return Ok(None);
};
// update metric
let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
self.metrics.num_rows_precise_filtered += filtered_rows;
self.metrics.filter_metrics.num_rows_precise_filtered += filtered_rows;
if !batch_filtered.is_empty() {
Ok(Some(batch_filtered))

View File

@@ -50,6 +50,7 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;
/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
@@ -606,8 +607,9 @@ impl ScanInput {
pub(crate) async fn prune_file_ranges(
&self,
collector: &mut impl FileRangeCollector,
) -> Result<()> {
) -> Result<ReaderMetrics> {
let mut file_prune_cost = Duration::ZERO;
let mut reader_metrics = ReaderMetrics::default();
for file in &self.files {
let prune_start = Instant::now();
let res = self
@@ -620,7 +622,7 @@ impl ScanInput {
.inverted_index_applier(self.inverted_index_applier.clone())
.fulltext_index_applier(self.fulltext_index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.build_reader_input()
.build_reader_input(&mut reader_metrics)
.await;
file_prune_cost += prune_start.elapsed();
let (mut file_range_ctx, row_groups) = match res {
@@ -665,7 +667,7 @@ impl ScanInput {
file_prune_cost
);
Ok(())
Ok(reader_metrics)
}
/// Scans the input source in another task and sends batches to the sender.

View File

@@ -59,6 +59,8 @@ pub struct SeqScan {
/// Semaphore to control scan parallelism of files.
/// Streams created by the scanner share the same semaphore.
semaphore: Arc<Semaphore>,
/// The scanner is used for compaction.
compaction: bool,
}
impl SeqScan {
@@ -75,9 +77,16 @@ impl SeqScan {
properties,
stream_ctx,
semaphore: Arc::new(Semaphore::new(parallelism)),
compaction: false,
}
}
/// Marks this scanner as being used for compaction.
///
/// Consumes the scanner, sets its `compaction` flag to `true` and returns the scanner,
/// so the call can be chained right after `SeqScan::new()`. The flag defaults to `false`.
/// When it is set, rows read from files are reported under the `compaction` read type
/// instead of `seq_scan_files`.
pub(crate) fn with_compaction(mut self) -> Self {
self.compaction = true;
self
}
/// Builds a stream for the query.
///
/// The returned stream is not partitioned and will contain all the data. If want
@@ -97,9 +106,13 @@ impl SeqScan {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
let maybe_reader =
Self::build_all_merge_reader(&self.stream_ctx, self.semaphore.clone(), &mut metrics)
.await?;
let maybe_reader = Self::build_all_merge_reader(
&self.stream_ctx,
self.semaphore.clone(),
&mut metrics,
self.compaction,
)
.await?;
// Safety: `build_merge_reader()` always returns a reader if partition is None.
let reader = maybe_reader.unwrap();
Ok(Box::new(reader))
@@ -110,6 +123,7 @@ impl SeqScan {
part: &ScanPart,
sources: &mut Vec<Source>,
row_selector: Option<TimeSeriesRowSelector>,
compaction: bool,
) -> Result<()> {
sources.reserve(part.memtable_ranges.len() + part.file_ranges.len());
// Read memtables.
@@ -117,6 +131,11 @@ impl SeqScan {
let iter = mem.build_iter()?;
sources.push(Source::Iter(iter));
}
let read_type = if compaction {
"compaction"
} else {
"seq_scan_files"
};
// Read files.
for file in &part.file_ranges {
if file.is_empty() {
@@ -148,6 +167,8 @@ impl SeqScan {
"Seq scan region {}, file {}, {} ranges finished, metrics: {:?}",
region_id, file_id, range_num, reader_metrics
);
// Reports metrics.
reader_metrics.observe_rows(read_type);
};
let stream = Box::pin(stream);
sources.push(Source::Stream(stream));
@@ -161,6 +182,7 @@ impl SeqScan {
stream_ctx: &StreamContext,
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
) -> Result<Option<BoxedBatchReader>> {
// initialize parts list
let mut parts = stream_ctx.parts.lock().await;
@@ -173,7 +195,7 @@ impl SeqScan {
return Ok(None);
};
Self::build_part_sources(part, &mut sources, None)?;
Self::build_part_sources(part, &mut sources, None, compaction)?;
}
Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
@@ -187,6 +209,7 @@ impl SeqScan {
range_id: usize,
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
) -> Result<Option<BoxedBatchReader>> {
let mut sources = Vec::new();
let build_start = {
@@ -198,7 +221,12 @@ impl SeqScan {
};
let build_start = Instant::now();
Self::build_part_sources(part, &mut sources, stream_ctx.input.series_row_selector)?;
Self::build_part_sources(
part,
&mut sources,
stream_ctx.input.series_row_selector,
compaction,
)?;
build_start
};
@@ -281,12 +309,13 @@ impl SeqScan {
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();
for partition_range in partition_ranges {
let maybe_reader =
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics)
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
@@ -359,6 +388,7 @@ impl SeqScan {
};
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let compaction = self.compaction;
// build stream
let stream = try_stream! {
@@ -379,6 +409,7 @@ impl SeqScan {
id,
semaphore.clone(),
&mut metrics,
compaction,
)
.await
.map_err(BoxedError::new)
@@ -439,7 +470,7 @@ impl SeqScan {
if part_list.0.is_none() {
let now = Instant::now();
let mut distributor = SeqDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
distributor.append_mem_ranges(
&input.memtables,
Some(input.mapper.column_ids()),
@@ -451,7 +482,7 @@ impl SeqScan {
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;
metrics.observe_init_part(build_part_cost);
metrics.observe_init_part(build_part_cost, &reader_metrics);
} else {
// Updates the cost of building parts.
metrics.build_parts_cost = part_list.1;

View File

@@ -211,6 +211,7 @@ impl RegionScanner for UnorderedScan {
}
}
reader_metrics.observe_rows("unordered_scan_files");
metrics.total_cost = query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
@@ -263,7 +264,7 @@ async fn maybe_init_parts(
if part_list.0.is_none() {
let now = Instant::now();
let mut distributor = UnorderedDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
distributor.append_mem_ranges(
&input.memtables,
Some(input.mapper.column_ids()),
@@ -275,7 +276,7 @@ async fn maybe_init_parts(
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;
metrics.observe_init_part(build_part_cost);
metrics.observe_init_part(build_part_cost, &reader_metrics);
} else {
// Updates the cost of building parts.
metrics.build_parts_cost = part_list.1;

View File

@@ -174,14 +174,19 @@ impl ParquetReaderBuilder {
///
/// This needs to perform IO operation.
pub async fn build(&self) -> Result<ParquetReader> {
let (context, row_groups) = self.build_reader_input().await?;
let mut metrics = ReaderMetrics::default();
let (context, row_groups) = self.build_reader_input(&mut metrics).await?;
ParquetReader::new(Arc::new(context), row_groups).await
}
/// Builds a [FileRangeContext] and collects row groups to read.
///
/// This needs to perform IO operation.
pub(crate) async fn build_reader_input(&self) -> Result<(FileRangeContext, RowGroupMap)> {
pub(crate) async fn build_reader_input(
&self,
metrics: &mut ReaderMetrics,
) -> Result<(FileRangeContext, RowGroupMap)> {
let start = Instant::now();
let file_path = self.file_handle.file_path(&self.file_dir);
@@ -219,10 +224,8 @@ impl ParquetReaderBuilder {
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
.context(ReadParquetSnafu { path: &file_path })?;
let mut metrics = ReaderMetrics::default();
let row_groups = self
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics)
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
.await;
let reader_builder = RowGroupReaderBuilder {
@@ -336,7 +339,7 @@ impl ParquetReaderBuilder {
&self,
read_format: &ReadFormat,
parquet_meta: &ParquetMetaData,
metrics: &mut ReaderMetrics,
metrics: &mut ReaderFilterMetrics,
) -> BTreeMap<usize, Option<RowSelection>> {
let num_row_groups = parquet_meta.num_row_groups();
let num_rows = parquet_meta.file_metadata().num_rows();
@@ -382,7 +385,7 @@ impl ParquetReaderBuilder {
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
metrics: &mut ReaderMetrics,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.fulltext_index_applier else {
return false;
@@ -462,7 +465,7 @@ impl ParquetReaderBuilder {
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
metrics: &mut ReaderMetrics,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.inverted_index_applier else {
return false;
@@ -529,7 +532,7 @@ impl ParquetReaderBuilder {
read_format: &ReadFormat,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
metrics: &mut ReaderMetrics,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(predicate) = &self.predicate else {
return false;
@@ -724,9 +727,9 @@ fn time_range_to_predicate(
Ok(predicates)
}
/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderMetrics {
/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
/// Number of row groups before filtering.
pub(crate) num_row_groups_before_filtering: usize,
/// Number of row groups filtered by fulltext index.
@@ -743,6 +746,57 @@ pub(crate) struct ReaderMetrics {
pub(crate) num_rows_in_row_group_fulltext_index_filtered: usize,
/// Number of rows in row group filtered by inverted index.
pub(crate) num_rows_in_row_group_inverted_index_filtered: usize,
}
impl ReaderFilterMetrics {
    /// Accumulates every counter of `other` into the counters of `self`.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        // Row group level counters.
        self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
        self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered;
        self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered;
        self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered;

        // Counters of rows inside row groups.
        self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering;
        self.num_rows_in_row_group_fulltext_index_filtered +=
            other.num_rows_in_row_group_fulltext_index_filtered;
        self.num_rows_in_row_group_inverted_index_filtered +=
            other.num_rows_in_row_group_inverted_index_filtered;

        // Rows removed by the precise filter.
        self.num_rows_precise_filtered += other.num_rows_precise_filtered;
    }

    /// Reports the collected counters to the global metrics.
    ///
    /// Each counter is added to its metric under the label value paired with it below.
    pub(crate) fn observe(&self) {
        // Row group counts, one label value per filtering stage.
        let row_group_counts = [
            ("before_filtering", self.num_row_groups_before_filtering),
            ("fulltext_index_filtered", self.num_row_groups_fulltext_index_filtered),
            ("inverted_index_filtered", self.num_row_groups_inverted_index_filtered),
            ("minmax_index_filtered", self.num_row_groups_min_max_filtered),
        ];
        for (stage, count) in row_group_counts {
            READ_ROW_GROUPS_TOTAL
                .with_label_values(&[stage])
                .inc_by(count as u64);
        }

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.num_rows_precise_filtered as u64);

        // Row counts inside row groups, one label value per filtering stage.
        let row_counts = [
            ("before_filtering", self.num_rows_in_row_group_before_filtering),
            ("fulltext_index_filtered", self.num_rows_in_row_group_fulltext_index_filtered),
            ("inverted_index_filtered", self.num_rows_in_row_group_inverted_index_filtered),
        ];
        for (stage, count) in row_counts {
            READ_ROWS_IN_ROW_GROUP_TOTAL
                .with_label_values(&[stage])
                .inc_by(count as u64);
        }
    }
}
/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderMetrics {
/// Filtered row groups and rows metrics.
pub(crate) filter_metrics: ReaderFilterMetrics,
/// Duration to build the parquet reader.
pub(crate) build_cost: Duration,
/// Duration to scan the reader.
@@ -758,22 +812,20 @@ pub(crate) struct ReaderMetrics {
impl ReaderMetrics {
/// Adds `other` metrics to this metrics.
pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered;
self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered;
self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered;
self.num_rows_precise_filtered += other.num_rows_precise_filtered;
self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering;
self.num_rows_in_row_group_fulltext_index_filtered +=
other.num_rows_in_row_group_fulltext_index_filtered;
self.num_rows_in_row_group_inverted_index_filtered +=
other.num_rows_in_row_group_inverted_index_filtered;
self.filter_metrics.merge_from(&other.filter_metrics);
self.build_cost += other.build_cost;
self.scan_cost += other.scan_cost;
self.num_record_batches += other.num_record_batches;
self.num_batches += other.num_batches;
self.num_rows += other.num_rows;
}
/// Adds the number of rows read (`num_rows`) to the `READ_ROWS_TOTAL` metric.
///
/// `read_type` is used as the label value the rows are reported under.
pub(crate) fn observe_rows(&self, read_type: &str) {
    let rows_read = self.num_rows as u64;
    let rows_total = READ_ROWS_TOTAL.with_label_values(&[read_type]);
    rows_total.inc_by(rows_read);
}
}
/// Builder to build a [ParquetRecordBatchReader] for a row group.
@@ -1006,10 +1058,12 @@ impl Drop for ParquetReader {
self.context.reader_builder().file_handle.region_id(),
self.context.reader_builder().file_handle.file_id(),
self.context.reader_builder().file_handle.time_range(),
metrics.num_row_groups_before_filtering
- metrics.num_row_groups_inverted_index_filtered
- metrics.num_row_groups_min_max_filtered,
metrics.num_row_groups_before_filtering,
metrics.filter_metrics.num_row_groups_before_filtering
- metrics
.filter_metrics
.num_row_groups_inverted_index_filtered
- metrics.filter_metrics.num_row_groups_min_max_filtered,
metrics.filter_metrics.num_row_groups_before_filtering,
metrics
);
@@ -1020,33 +1074,8 @@ impl Drop for ParquetReader {
READ_STAGE_ELAPSED
.with_label_values(&["scan_row_groups"])
.observe(metrics.scan_cost.as_secs_f64());
READ_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(metrics.num_rows as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(metrics.num_row_groups_before_filtering as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(metrics.num_row_groups_fulltext_index_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(metrics.num_row_groups_inverted_index_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["minmax_index_filtered"])
.inc_by(metrics.num_row_groups_min_max_filtered as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(metrics.num_rows_precise_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(metrics.num_rows_in_row_group_before_filtering as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(metrics.num_rows_in_row_group_fulltext_index_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(metrics.num_rows_in_row_group_inverted_index_filtered as u64);
metrics.observe_rows("parquet_reader");
metrics.filter_metrics.observe();
}
}