@@ -14,7 +14,7 @@
 
 //! Utilities for scanners.
 
-use std::collections::{HashMap, VecDeque};
+use std::collections::VecDeque;
 use std::fmt;
 use std::pin::Pin;
 use std::sync::{Arc, Mutex};
@@ -26,7 +26,6 @@ use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder,
 use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::timestamp::timestamp_array_to_primitive;
 use futures::Stream;
-use lazy_static::lazy_static;
 use prometheus::IntGauge;
 use smallvec::SmallVec;
 use snafu::OptionExt;
@@ -43,7 +42,7 @@ use crate::read::merge::{MergeMetrics, MergeMetricsReport};
 use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
 use crate::read::scan_region::StreamContext;
 use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
-use crate::sst::file::{FileTimeRange, RegionFileId};
+use crate::sst::file::FileTimeRange;
 use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
 use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
@@ -53,70 +52,6 @@ use crate::sst::parquet::flat_format::time_index_column_index;
 use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
 use crate::sst::parquet::row_group::ParquetFetchMetrics;
 
-lazy_static! {
-    /// Threshold for slow file scan warning in milliseconds.
-    /// Can be configured via SLOW_FILE_SCAN_THRESHOLD_MS environment variable.
-    /// Defaults to 1000ms (1 second).
-    static ref SLOW_FILE_SCAN_THRESHOLD: Duration = {
-        let threshold_ms = std::env::var("SLOW_FILE_SCAN_THRESHOLD_MS")
-            .ok()
-            .and_then(|s| s.parse::<u64>().ok())
-            .unwrap_or(1000);
-        Duration::from_millis(threshold_ms)
-    };
-}
-
-/// Per-file scan metrics.
-#[derive(Default, Clone)]
-pub struct FileScanMetrics {
-    /// Number of ranges (row groups) read from this file.
-    pub num_ranges: usize,
-    /// Number of rows read from this file.
-    pub num_rows: usize,
-    /// Time spent building file ranges/parts (file-level preparation).
-    pub build_part_cost: Duration,
-    /// Time spent building readers for this file (accumulated across all ranges).
-    pub build_reader_cost: Duration,
-    /// Time spent scanning this file (accumulated across all ranges).
-    pub scan_cost: Duration,
-}
-
-impl fmt::Debug for FileScanMetrics {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
-
-        if self.num_ranges > 0 {
-            write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
-        }
-        if self.num_rows > 0 {
-            write!(f, ", \"num_rows\":{}", self.num_rows)?;
-        }
-        if !self.build_reader_cost.is_zero() {
-            write!(
-                f,
-                ", \"build_reader_cost\":\"{:?}\"",
-                self.build_reader_cost
-            )?;
-        }
-        if !self.scan_cost.is_zero() {
-            write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
-        }
-
-        write!(f, "}}")
-    }
-}
-
-impl FileScanMetrics {
-    /// Merges another FileScanMetrics into this one.
-    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
-        self.num_ranges += other.num_ranges;
-        self.num_rows += other.num_rows;
-        self.build_part_cost += other.build_part_cost;
-        self.build_reader_cost += other.build_reader_cost;
-        self.scan_cost += other.scan_cost;
-    }
-}
-
 /// Verbose scan metrics for a partition.
 #[derive(Default)]
 pub(crate) struct ScanMetricsSet {
@@ -216,8 +151,6 @@ pub(crate) struct ScanMetricsSet {
     fetch_metrics: Option<ParquetFetchMetrics>,
     /// Metadata cache metrics.
     metadata_cache_metrics: Option<MetadataCacheMetrics>,
-    /// Per-file scan metrics, only populated when explain_verbose is true.
-    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
 }
 
 impl fmt::Debug for ScanMetricsSet {
@@ -266,7 +199,6 @@ impl fmt::Debug for ScanMetricsSet {
             fulltext_index_apply_metrics,
             fetch_metrics,
             metadata_cache_metrics,
-            per_file_metrics,
         } = self;
 
         // Write core metrics
@@ -394,20 +326,6 @@ impl fmt::Debug for ScanMetricsSet {
             write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
         }
 
-        // Write per-file metrics if present and non-empty
-        if let Some(file_metrics) = per_file_metrics
-            && !file_metrics.is_empty()
-        {
-            write!(f, ", \"per_file_metrics\": {{")?;
-            for (i, (file_id, metrics)) in file_metrics.iter().enumerate() {
-                if i > 0 {
-                    write!(f, ", ")?;
-                }
-                write!(f, "\"{}\": {:?}", file_id, metrics)?;
-            }
-            write!(f, "}}")?;
-        }
-
         write!(f, ", \"stream_eof\":{stream_eof}}}")
     }
 }
@@ -514,17 +432,6 @@ impl ScanMetricsSet {
             .merge_from(metadata_cache_metrics);
     }
 
-    /// Merges per-file metrics.
-    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
-        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
-        for (file_id, metrics) in other {
-            self_file_metrics
-                .entry(*file_id)
-                .or_default()
-                .merge_from(metrics);
-        }
-    }
-
     /// Sets distributor metrics.
     fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
         let SeriesDistributorMetrics {
@@ -815,20 +722,11 @@ impl PartitionMetrics {
     }
 
     /// Merges [ReaderMetrics] and `build_reader_cost`.
-    pub fn merge_reader_metrics(
-        &self,
-        metrics: &ReaderMetrics,
-        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
-    ) {
+    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
         self.0.build_parts_cost.add_duration(metrics.build_cost);
 
         let mut metrics_set = self.0.metrics.lock().unwrap();
         metrics_set.merge_reader_metrics(metrics);
-
-        // Merge per-file metrics if provided
-        if let Some(file_metrics) = per_file_metrics {
-            metrics_set.merge_per_file_metrics(file_metrics);
-        }
     }
 
     /// Finishes the query.
@@ -1040,44 +938,13 @@ pub(crate) async fn scan_file_ranges(
         .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
         .await?;
     part_metrics.inc_num_file_ranges(ranges.len());
-    part_metrics.merge_reader_metrics(&reader_metrics, None);
-
-    // Creates initial per-file metrics with build_part_cost.
-    let init_per_file_metrics = if part_metrics.explain_verbose() {
-        let file = stream_ctx.input.file_from_index(index);
-        let file_id = file.file_id();
-
-        let mut map = HashMap::new();
-        map.insert(
-            file_id,
-            FileScanMetrics {
-                build_part_cost: reader_metrics.build_cost,
-                ..Default::default()
-            },
-        );
-        Some(map)
-    } else {
-        None
-    };
-
-    // Warn if build_part_cost exceeds threshold
-    if reader_metrics.build_cost > *SLOW_FILE_SCAN_THRESHOLD {
-        let file = stream_ctx.input.file_from_index(index);
-        let file_id = file.file_id();
-        common_telemetry::warn!(
-            "Slow file part build detected - region_id: {}, file_id: {}, build_part_cost: {:?}",
-            part_metrics.0.region_id,
-            file_id,
-            reader_metrics.build_cost
-        );
-    }
+    part_metrics.merge_reader_metrics(&reader_metrics);
 
     Ok(build_file_range_scan_stream(
         stream_ctx,
         part_metrics,
         read_type,
         ranges,
-        init_per_file_metrics,
     ))
 }
 
@@ -1097,44 +964,13 @@ pub(crate) async fn scan_flat_file_ranges(
         .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
         .await?;
     part_metrics.inc_num_file_ranges(ranges.len());
-    part_metrics.merge_reader_metrics(&reader_metrics, None);
-
-    // Creates initial per-file metrics with build_part_cost.
-    let init_per_file_metrics = if part_metrics.explain_verbose() {
-        let file = stream_ctx.input.file_from_index(index);
-        let file_id = file.file_id();
-
-        let mut map = HashMap::new();
-        map.insert(
-            file_id,
-            FileScanMetrics {
-                build_part_cost: reader_metrics.build_cost,
-                ..Default::default()
-            },
-        );
-        Some(map)
-    } else {
-        None
-    };
-
-    // Warn if build_part_cost exceeds threshold
-    if reader_metrics.build_cost > *SLOW_FILE_SCAN_THRESHOLD {
-        let file = stream_ctx.input.file_from_index(index);
-        let file_id = file.file_id();
-        common_telemetry::warn!(
-            "Slow file part build detected - region_id: {}, file_id: {}, build_part_cost: {:?}",
-            part_metrics.0.region_id,
-            file_id,
-            reader_metrics.build_cost
-        );
-    }
+    part_metrics.merge_reader_metrics(&reader_metrics);
 
     Ok(build_flat_file_range_scan_stream(
         stream_ctx,
         part_metrics,
         read_type,
         ranges,
-        init_per_file_metrics,
     ))
 }
 
@@ -1144,7 +980,6 @@ pub fn build_file_range_scan_stream(
     part_metrics: PartitionMetrics,
     read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
-    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
 ) -> impl Stream<Item = Result<Batch>> {
     try_stream! {
         let fetch_metrics = if part_metrics.explain_verbose() {
@@ -1171,34 +1006,6 @@ pub fn build_file_range_scan_stream(
             }
             if let Source::PruneReader(reader) = source {
                 let prune_metrics = reader.metrics();
-
-                // Warn if build_cost + scan_cost exceeds threshold
-                let total_cost = build_cost + prune_metrics.scan_cost;
-                if total_cost > *SLOW_FILE_SCAN_THRESHOLD {
-                    let file_id = range.file_handle().file_id();
-                    common_telemetry::warn!(
-                        "Slow file scan detected - region_id: {}, file_id: {}, total_cost: {:?} (build_reader_cost: {:?}, scan_cost: {:?})",
-                        part_metrics.0.region_id,
-                        file_id,
-                        total_cost,
-                        build_cost,
-                        prune_metrics.scan_cost
-                    );
-                }
-
-                // Update per-file metrics if tracking is enabled
-                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
-                    let file_id = range.file_handle().file_id();
-                    let file_metrics = file_metrics_map
-                        .entry(file_id)
-                        .or_insert_with(FileScanMetrics::default);
-
-                    file_metrics.num_ranges += 1;
-                    file_metrics.num_rows += prune_metrics.num_rows;
-                    file_metrics.build_reader_cost += build_cost;
-                    file_metrics.scan_cost += prune_metrics.scan_cost;
-                }
-
                 reader_metrics.merge_from(&prune_metrics);
             }
         }
@@ -1206,7 +1013,7 @@ pub fn build_file_range_scan_stream(
 
         // Reports metrics.
         reader_metrics.observe_rows(read_type);
         reader_metrics.filter_metrics.observe();
-        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
+        part_metrics.merge_reader_metrics(reader_metrics);
     }
 }
@@ -1216,7 +1023,6 @@ pub fn build_flat_file_range_scan_stream(
     part_metrics: PartitionMetrics,
     read_type: &'static str,
     ranges: SmallVec<[FileRange; 2]>,
-    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
 ) -> impl Stream<Item = Result<RecordBatch>> {
     try_stream! {
         let fetch_metrics = if part_metrics.explain_verbose() {
@@ -1252,41 +1058,13 @@ pub fn build_flat_file_range_scan_stream(
             }
 
             let prune_metrics = reader.metrics();
-
-            // Warn if build_cost + scan_cost exceeds threshold
-            let total_cost = build_cost + prune_metrics.scan_cost;
-            if total_cost > *SLOW_FILE_SCAN_THRESHOLD {
-                let file_id = range.file_handle().file_id();
-                common_telemetry::warn!(
-                    "Slow file scan detected - region_id: {}, file_id: {}, total_cost: {:?} (build_reader_cost: {:?}, scan_cost: {:?})",
-                    part_metrics.0.region_id,
-                    file_id,
-                    total_cost,
-                    build_cost,
-                    prune_metrics.scan_cost
-                );
-            }
-
-            // Update per-file metrics if tracking is enabled
-            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
-                let file_id = range.file_handle().file_id();
-                let file_metrics = file_metrics_map
-                    .entry(file_id)
-                    .or_insert_with(FileScanMetrics::default);
-
-                file_metrics.num_ranges += 1;
-                file_metrics.num_rows += prune_metrics.num_rows;
-                file_metrics.build_reader_cost += build_cost;
-                file_metrics.scan_cost += prune_metrics.scan_cost;
-            }
-
             reader_metrics.merge_from(&prune_metrics);
         }
 
         // Reports metrics.
         reader_metrics.observe_rows(read_type);
         reader_metrics.filter_metrics.observe();
-        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
+        part_metrics.merge_reader_metrics(reader_metrics);
     }
 }