From 4a4fe749d8d75bf53bb81307896ea0b2083892fe Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 12 Jun 2026 21:22:49 +0800 Subject: [PATCH] feat: add per-partition timings to merge scan partition metrics (#8293) * feat: add per-partition timings to merge scan partition metrics Signed-off-by: evenyag * fix: stop global timers for partitions with no region Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/query/src/dist_plan/merge_scan.rs | 70 ++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 6 deletions(-) diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index daf9624374..0b03d4f53b 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -367,6 +367,11 @@ impl MergeScanExec { let mut ready_timer = metric.ready_time().timer(); let mut first_consume_timer = Some(metric.first_consume_time().timer()); + // Per-partition timings, scoped to this partition's stream for `EXPLAIN VERBOSE`. + let partition_start = Instant::now(); + let mut partition_ready_time: Option<Duration> = None; + let mut partition_first_consume_time: Option<Duration> = None; + for region_id in regions .iter() .skip(partition) @@ -423,6 +428,9 @@ impl MergeScanExec { } ready_timer.stop(); + if partition_ready_time.is_none() { + partition_ready_time = Some(partition_start.elapsed()); + } let mut poll_duration = Duration::ZERO; let mut poll_timer = Instant::now(); @@ -438,6 +446,7 @@ impl MergeScanExec { metric.record_output_batch_rows(batch.num_rows()); if let Some(mut first_consume_timer) = first_consume_timer.take() { first_consume_timer.stop(); + partition_first_consume_time = Some(partition_start.elapsed()); } if let Some(metrics) = stream.metrics() { @@ -449,6 +458,12 @@ impl MergeScanExec { // reset poll timer poll_timer = Instant::now(); } + // Also stop on an exhausted stream that yielded no batch. The `take()` + // guard ensures it only records once, on the first such region. 
+ if let Some(mut first_consume_timer) = first_consume_timer.take() { + first_consume_timer.stop(); + partition_first_consume_time = Some(partition_start.elapsed()); + } let total_cost = region_start.elapsed(); // Record region metrics and push to global partition_metrics @@ -501,10 +516,23 @@ impl MergeScanExec { MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64()); } + // Stop the global timers for partitions with no region, otherwise they keep + // running until drop and inflate the shared metrics. No-op otherwise. + ready_timer.stop(); + if let Some(mut first_consume_timer) = first_consume_timer.take() { + first_consume_timer.stop(); + } + // Finish partition metrics and log results + let partition_finish_time = partition_start.elapsed(); { let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap(); if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) { + partition_metrics.set_timings( + partition_ready_time.unwrap_or_default(), + partition_first_consume_time.unwrap_or_default(), + partition_finish_time, + ); partition_metrics.finish(); } } @@ -648,6 +676,12 @@ struct PartitionMetrics { total_poll_duration: Duration, total_do_get_cost: Duration, total_regions: usize, + /// Time until this partition's scan is ready to emit data. + ready_time: Duration, + /// Time until this partition's first stream poll resolves (a batch or exhausted). + first_consume_time: Duration, + /// Time until this partition's scan finishes execution. + finish_time: Duration, explain_verbose: bool, finished: bool, } @@ -660,6 +694,9 @@ impl PartitionMetrics { total_poll_duration: Duration::ZERO, total_do_get_cost: Duration::ZERO, total_regions: 0, + ready_time: Duration::ZERO, + first_consume_time: Duration::ZERO, + finish_time: Duration::ZERO, explain_verbose, finished: false, } @@ -672,6 +709,18 @@ impl PartitionMetrics { self.region_metrics.push(region_metrics); } + /// Set the per-partition timings captured during streaming. 
+ fn set_timings( + &mut self, + ready_time: Duration, + first_consume_time: Duration, + finish_time: Duration, + ) { + self.ready_time = ready_time; + self.first_consume_time = first_consume_time; + self.finish_time = finish_time; + } + /// Finish the partition metrics and log the results. fn finish(&mut self) { if self.finished { @@ -685,19 +734,25 @@ impl PartitionMetrics { fn log_metrics(&self) { if self.explain_verbose { common_telemetry::info!( - "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}", + "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}, ready_time: {:?}, first_consume_time: {:?}, finish_time: {:?}", self.partition, self.total_regions, self.total_poll_duration, - self.total_do_get_cost + self.total_do_get_cost, + self.ready_time, + self.first_consume_time, + self.finish_time ); } else { common_telemetry::debug!( - "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}", + "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}, ready_time: {:?}, first_consume_time: {:?}, finish_time: {:?}", self.partition, self.total_regions, self.total_poll_duration, - self.total_do_get_cost + self.total_do_get_cost, + self.ready_time, + self.first_consume_time, + self.finish_time ); } } @@ -832,11 +887,14 @@ impl DisplayAs for MergeScanExec { } write!( f, - "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[", + "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"ready_time\":\"{:?}\",\"first_consume_time\":\"{:?}\",\"finish_time\":\"{:?}\",\"region_metrics\":[", pm.partition, pm.total_regions, pm.total_poll_duration, - pm.total_do_get_cost + pm.total_do_get_cost, + pm.ready_time, + pm.first_consume_time, + pm.finish_time )?; for (j, rm) in 
pm.region_metrics.iter().enumerate() { if j > 0 {