feat: add slow log and configure by SLOW_FILE_SCAN_THRESHOLD

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-12-11 17:14:05 +08:00
parent 2fcaa5ebc3
commit 3ed33e1aeb

View File

@@ -26,6 +26,7 @@ use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder,
use datatypes::arrow::record_batch::RecordBatch; use datatypes::arrow::record_batch::RecordBatch;
use datatypes::timestamp::timestamp_array_to_primitive; use datatypes::timestamp::timestamp_array_to_primitive;
use futures::Stream; use futures::Stream;
use lazy_static::lazy_static;
use prometheus::IntGauge; use prometheus::IntGauge;
use smallvec::SmallVec; use smallvec::SmallVec;
use snafu::OptionExt; use snafu::OptionExt;
@@ -52,6 +53,19 @@ use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics}; use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics; use crate::sst::parquet::row_group::ParquetFetchMetrics;
lazy_static! {
/// Threshold for slow file scan warning in milliseconds.
/// Can be configured via SLOW_FILE_SCAN_THRESHOLD_MS environment variable.
/// Defaults to 1000ms (1 second).
static ref SLOW_FILE_SCAN_THRESHOLD: Duration = {
let threshold_ms = std::env::var("SLOW_FILE_SCAN_THRESHOLD_MS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(1000);
Duration::from_millis(threshold_ms)
};
}
/// Per-file scan metrics. /// Per-file scan metrics.
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct FileScanMetrics { pub struct FileScanMetrics {
@@ -1046,6 +1060,18 @@ pub(crate) async fn scan_file_ranges(
None None
}; };
// Warn if build_part_cost exceeds threshold
if reader_metrics.build_cost > *SLOW_FILE_SCAN_THRESHOLD {
let file = stream_ctx.input.file_from_index(index);
let file_id = file.file_id();
common_telemetry::warn!(
"Slow file part build detected - region_id: {}, file_id: {}, build_part_cost: {:?}",
part_metrics.0.region_id,
file_id,
reader_metrics.build_cost
);
}
Ok(build_file_range_scan_stream( Ok(build_file_range_scan_stream(
stream_ctx, stream_ctx,
part_metrics, part_metrics,
@@ -1091,6 +1117,18 @@ pub(crate) async fn scan_flat_file_ranges(
None None
}; };
// Warn if build_part_cost exceeds threshold
if reader_metrics.build_cost > *SLOW_FILE_SCAN_THRESHOLD {
let file = stream_ctx.input.file_from_index(index);
let file_id = file.file_id();
common_telemetry::warn!(
"Slow file part build detected - region_id: {}, file_id: {}, build_part_cost: {:?}",
part_metrics.0.region_id,
file_id,
reader_metrics.build_cost
);
}
Ok(build_flat_file_range_scan_stream( Ok(build_flat_file_range_scan_stream(
stream_ctx, stream_ctx,
part_metrics, part_metrics,
@@ -1134,6 +1172,20 @@ pub fn build_file_range_scan_stream(
if let Source::PruneReader(reader) = source { if let Source::PruneReader(reader) = source {
let prune_metrics = reader.metrics(); let prune_metrics = reader.metrics();
// Warn if build_cost + scan_cost exceeds threshold
let total_cost = build_cost + prune_metrics.scan_cost;
if total_cost > *SLOW_FILE_SCAN_THRESHOLD {
let file_id = range.file_handle().file_id();
common_telemetry::warn!(
"Slow file scan detected - region_id: {}, file_id: {}, total_cost: {:?} (build_reader_cost: {:?}, scan_cost: {:?})",
part_metrics.0.region_id,
file_id,
total_cost,
build_cost,
prune_metrics.scan_cost
);
}
// Update per-file metrics if tracking is enabled // Update per-file metrics if tracking is enabled
if let Some(file_metrics_map) = per_file_metrics.as_mut() { if let Some(file_metrics_map) = per_file_metrics.as_mut() {
let file_id = range.file_handle().file_id(); let file_id = range.file_handle().file_id();
@@ -1201,6 +1253,20 @@ pub fn build_flat_file_range_scan_stream(
let prune_metrics = reader.metrics(); let prune_metrics = reader.metrics();
// Warn if build_cost + scan_cost exceeds threshold
let total_cost = build_cost + prune_metrics.scan_cost;
if total_cost > *SLOW_FILE_SCAN_THRESHOLD {
let file_id = range.file_handle().file_id();
common_telemetry::warn!(
"Slow file scan detected - region_id: {}, file_id: {}, total_cost: {:?} (build_reader_cost: {:?}, scan_cost: {:?})",
part_metrics.0.region_id,
file_id,
total_cost,
build_cost,
prune_metrics.scan_cost
);
}
// Update per-file metrics if tracking is enabled // Update per-file metrics if tracking is enabled
if let Some(file_metrics_map) = per_file_metrics.as_mut() { if let Some(file_metrics_map) = per_file_metrics.as_mut() {
let file_id = range.file_handle().file_id(); let file_id = range.file_handle().file_id();