feat: add per-partition timings to merge scan partition metrics (#8293)

* feat: add per-partition timings to merge scan partition metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: stop global timers for partitions with no region

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-06-12 21:22:49 +08:00
committed by GitHub
parent d34d4c1aba
commit 4a4fe749d8

View File

@@ -367,6 +367,11 @@ impl MergeScanExec {
let mut ready_timer = metric.ready_time().timer();
let mut first_consume_timer = Some(metric.first_consume_time().timer());
// Per-partition timings, scoped to this partition's stream for `EXPLAIN VERBOSE`.
let partition_start = Instant::now();
let mut partition_ready_time: Option<Duration> = None;
let mut partition_first_consume_time: Option<Duration> = None;
for region_id in regions
.iter()
.skip(partition)
@@ -423,6 +428,9 @@ impl MergeScanExec {
}
ready_timer.stop();
if partition_ready_time.is_none() {
partition_ready_time = Some(partition_start.elapsed());
}
let mut poll_duration = Duration::ZERO;
let mut poll_timer = Instant::now();
@@ -438,6 +446,7 @@ impl MergeScanExec {
metric.record_output_batch_rows(batch.num_rows());
if let Some(mut first_consume_timer) = first_consume_timer.take() {
first_consume_timer.stop();
partition_first_consume_time = Some(partition_start.elapsed());
}
if let Some(metrics) = stream.metrics() {
@@ -449,6 +458,12 @@ impl MergeScanExec {
// reset poll timer
poll_timer = Instant::now();
}
// Also stop on an exhausted stream that yielded no batch. The `take()`
// guard ensures it only records once, on the first such region.
if let Some(mut first_consume_timer) = first_consume_timer.take() {
first_consume_timer.stop();
partition_first_consume_time = Some(partition_start.elapsed());
}
let total_cost = region_start.elapsed();
// Record region metrics and push to global partition_metrics
@@ -501,10 +516,23 @@ impl MergeScanExec {
MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
}
// Stop the global timers for partitions with no region, otherwise they keep
// running until drop and inflate the shared metrics. No-op otherwise.
ready_timer.stop();
if let Some(mut first_consume_timer) = first_consume_timer.take() {
first_consume_timer.stop();
}
// Finish partition metrics and log results
let partition_finish_time = partition_start.elapsed();
{
let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
partition_metrics.set_timings(
partition_ready_time.unwrap_or_default(),
partition_first_consume_time.unwrap_or_default(),
partition_finish_time,
);
partition_metrics.finish();
}
}
@@ -648,6 +676,12 @@ struct PartitionMetrics {
total_poll_duration: Duration,
total_do_get_cost: Duration,
total_regions: usize,
/// Time until this partition's scan is ready to emit data.
ready_time: Duration,
/// Time until this partition's first stream poll resolves (a batch or exhausted).
first_consume_time: Duration,
/// Time until this partition's scan finishes execution.
finish_time: Duration,
explain_verbose: bool,
finished: bool,
}
@@ -660,6 +694,9 @@ impl PartitionMetrics {
total_poll_duration: Duration::ZERO,
total_do_get_cost: Duration::ZERO,
total_regions: 0,
ready_time: Duration::ZERO,
first_consume_time: Duration::ZERO,
finish_time: Duration::ZERO,
explain_verbose,
finished: false,
}
@@ -672,6 +709,18 @@ impl PartitionMetrics {
self.region_metrics.push(region_metrics);
}
/// Set the per-partition timings captured during streaming.
fn set_timings(
&mut self,
ready_time: Duration,
first_consume_time: Duration,
finish_time: Duration,
) {
self.ready_time = ready_time;
self.first_consume_time = first_consume_time;
self.finish_time = finish_time;
}
/// Finish the partition metrics and log the results.
fn finish(&mut self) {
if self.finished {
@@ -685,19 +734,25 @@ impl PartitionMetrics {
fn log_metrics(&self) {
if self.explain_verbose {
common_telemetry::info!(
"MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
"MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}, ready_time: {:?}, first_consume_time: {:?}, finish_time: {:?}",
self.partition,
self.total_regions,
self.total_poll_duration,
self.total_do_get_cost
self.total_do_get_cost,
self.ready_time,
self.first_consume_time,
self.finish_time
);
} else {
common_telemetry::debug!(
"MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
"MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}, ready_time: {:?}, first_consume_time: {:?}, finish_time: {:?}",
self.partition,
self.total_regions,
self.total_poll_duration,
self.total_do_get_cost
self.total_do_get_cost,
self.ready_time,
self.first_consume_time,
self.finish_time
);
}
}
@@ -832,11 +887,14 @@ impl DisplayAs for MergeScanExec {
}
write!(
f,
"\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[",
"\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"ready_time\":\"{:?}\",\"first_consume_time\":\"{:?}\",\"finish_time\":\"{:?}\",\"region_metrics\":[",
pm.partition,
pm.total_regions,
pm.total_poll_duration,
pm.total_do_get_cost
pm.total_do_get_cost,
pm.ready_time,
pm.first_consume_time,
pm.finish_time
)?;
for (j, rm) in pm.region_metrics.iter().enumerate() {
if j > 0 {