From 3ed33e1aebcaf9cb4d78e043df08fd17e80e64bd Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 11 Dec 2025 17:14:05 +0800 Subject: [PATCH] feat: add slow log and configure by SLOW_FILE_SCAN_THRESHOLD Signed-off-by: evenyag --- src/mito2/src/read/scan_util.rs | 66 +++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index b6107d5af6..8f3d4fb7a5 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -26,6 +26,7 @@ use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, use datatypes::arrow::record_batch::RecordBatch; use datatypes::timestamp::timestamp_array_to_primitive; use futures::Stream; +use lazy_static::lazy_static; use prometheus::IntGauge; use smallvec::SmallVec; use snafu::OptionExt; @@ -52,6 +53,19 @@ use crate::sst::parquet::flat_format::time_index_column_index; use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics}; use crate::sst::parquet::row_group::ParquetFetchMetrics; +lazy_static! { + /// Threshold for slow file scan warning in milliseconds. + /// Can be configured via SLOW_FILE_SCAN_THRESHOLD_MS environment variable. + /// Defaults to 1000ms (1 second). + static ref SLOW_FILE_SCAN_THRESHOLD: Duration = { + let threshold_ms = std::env::var("SLOW_FILE_SCAN_THRESHOLD_MS") + .ok() + .and_then(|s| s.parse::<u64>().ok()) + .unwrap_or(1000); + Duration::from_millis(threshold_ms) + }; +} + /// Per-file scan metrics. 
#[derive(Default, Clone)] pub struct FileScanMetrics { @@ -1046,6 +1060,18 @@ pub(crate) async fn scan_file_ranges( None }; + // Warn if build_part_cost exceeds threshold + if reader_metrics.build_cost > *SLOW_FILE_SCAN_THRESHOLD { + let file = stream_ctx.input.file_from_index(index); + let file_id = file.file_id(); + common_telemetry::warn!( + "Slow file part build detected - region_id: {}, file_id: {}, build_part_cost: {:?}", + part_metrics.0.region_id, + file_id, + reader_metrics.build_cost + ); + } + Ok(build_file_range_scan_stream( stream_ctx, part_metrics, @@ -1091,6 +1117,18 @@ pub(crate) async fn scan_flat_file_ranges( None }; + // Warn if build_part_cost exceeds threshold + if reader_metrics.build_cost > *SLOW_FILE_SCAN_THRESHOLD { + let file = stream_ctx.input.file_from_index(index); + let file_id = file.file_id(); + common_telemetry::warn!( + "Slow file part build detected - region_id: {}, file_id: {}, build_part_cost: {:?}", + part_metrics.0.region_id, + file_id, + reader_metrics.build_cost + ); + } + Ok(build_flat_file_range_scan_stream( stream_ctx, part_metrics, @@ -1134,6 +1172,20 @@ pub fn build_file_range_scan_stream( if let Source::PruneReader(reader) = source { let prune_metrics = reader.metrics(); + // Warn if build_cost + scan_cost exceeds threshold + let total_cost = build_cost + prune_metrics.scan_cost; + if total_cost > *SLOW_FILE_SCAN_THRESHOLD { + let file_id = range.file_handle().file_id(); + common_telemetry::warn!( + "Slow file scan detected - region_id: {}, file_id: {}, total_cost: {:?} (build_reader_cost: {:?}, scan_cost: {:?})", + part_metrics.0.region_id, + file_id, + total_cost, + build_cost, + prune_metrics.scan_cost + ); + } + // Update per-file metrics if tracking is enabled if let Some(file_metrics_map) = per_file_metrics.as_mut() { let file_id = range.file_handle().file_id(); @@ -1201,6 +1253,20 @@ pub fn build_flat_file_range_scan_stream( let prune_metrics = reader.metrics(); + // Warn if build_cost + scan_cost exceeds 
threshold + let total_cost = build_cost + prune_metrics.scan_cost; + if total_cost > *SLOW_FILE_SCAN_THRESHOLD { + let file_id = range.file_handle().file_id(); + common_telemetry::warn!( + "Slow file scan detected - region_id: {}, file_id: {}, total_cost: {:?} (build_reader_cost: {:?}, scan_cost: {:?})", + part_metrics.0.region_id, + file_id, + total_cost, + build_cost, + prune_metrics.scan_cost + ); + } + // Update per-file metrics if tracking is enabled if let Some(file_metrics_map) = per_file_metrics.as_mut() { let file_id = range.file_handle().file_id();