diff --git a/src/query/src/analyze.rs b/src/query/src/analyze.rs index 6f87e91267..54a34eaf4b 100644 --- a/src/query/src/analyze.rs +++ b/src/query/src/analyze.rs @@ -156,6 +156,9 @@ impl ExecutionPlan for DistAnalyzeExec { while let Some(batch) = input_stream.next().await.transpose()? { total_rows += batch.num_rows(); } + // must drop the input stream before starting to collect metrics to make sure + // all metrics are collected (especially for MergeScanExec, which collects metrics when its stream is dropped) + drop(input_stream); create_output_batch(total_rows, captured_input, captured_schema, format, verbose) }; diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index c98206dbc5..b00c32bd9f 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -58,6 +58,52 @@ use crate::error::ConvertSchemaSnafu; use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS}; use crate::region_query::RegionQueryHandlerRef; +/// A wrapper around SendableRecordBatchStream that records metrics +/// when the stream is dropped, so that even if the stream is canceled before it is fully consumed, +/// we can still record the metrics. 
+pub struct MeasuredStream { + inner: SendableRecordBatchStream, + partition: usize, + region_id: RegionId, + do_get_cost: Duration, + dbname: String, + sub_stage_metrics: Arc<Mutex<Vec<RecordBatchMetrics>>>, + current_channel: u8, + metric: MergeScanMetric, + poll_duration: Duration, +} + +impl Drop for MeasuredStream { + fn drop(&mut self) { + common_telemetry::debug!( + "Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}", + self.partition, self.region_id, self.poll_duration, self.metric.first_consume_time(), self.do_get_cost + ); + if let Some(metrics) = self.inner.metrics() { + let (c, s) = parse_catalog_and_schema_from_db_string(&self.dbname); + let value = read_meter!( + c, + s, + ReadItem { + cpu_time: metrics.elapsed_compute as u64, + table_scan: metrics.memory_usage as u64 + }, + self.current_channel + ); + self.metric.record_greptime_exec_cost(value as usize); + + // record metrics from sub stages + if let Ok(mut guard) = self.sub_stage_metrics.lock() { + guard.push(metrics); + } else { + common_telemetry::error!("Failed to acquire lock for sub_stage_metrics in MergeScan drop. 
Metrics for one stream might be lost."); + } + } + + MERGE_SCAN_POLL_ELAPSED.observe(self.poll_duration.as_secs_f64()); + } +} + #[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)] pub struct MergeScanLogicalPlan { /// In logical plan phase it only contains one input @@ -291,7 +337,7 @@ impl MergeScanExec { plan: plan.clone(), }; let do_get_start = Instant::now(); - let mut stream = region_query_handler + let stream = region_query_handler .do_get(read_preference, request) .await .map_err(|e| { @@ -300,14 +346,24 @@ impl MergeScanExec { }) .context(ExternalSnafu)?; let do_get_cost = do_get_start.elapsed(); + let mut stream = MeasuredStream { + inner: stream, + partition, + region_id, + do_get_cost, + dbname: dbname.clone(), + sub_stage_metrics: sub_stage_metrics_moved.clone(), + current_channel: current_channel as u8, + metric: metric.clone(), + poll_duration: Duration::ZERO, + }; ready_timer.stop(); - let mut poll_duration = Duration::ZERO; let mut poll_timer = Instant::now(); - while let Some(batch) = stream.next().await { + while let Some(batch) = stream.inner.next().await { let poll_elapsed = poll_timer.elapsed(); - poll_duration += poll_elapsed; + stream.poll_duration += poll_elapsed; let batch = batch?; // reconstruct batch using `self.schema` @@ -321,30 +377,6 @@ impl MergeScanExec { // reset poll timer poll_timer = Instant::now(); } - common_telemetry::debug!( - "Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}", - partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost - ); - - // process metrics after all data is drained. 
- if let Some(metrics) = stream.metrics() { - let (c, s) = parse_catalog_and_schema_from_db_string(&dbname); - let value = read_meter!( - c, - s, - ReadItem { - cpu_time: metrics.elapsed_compute as u64, - table_scan: metrics.memory_usage as u64 - }, - current_channel as u8 - ); - metric.record_greptime_exec_cost(value as usize); - - // record metrics from sub sgates - sub_stage_metrics_moved.lock().unwrap().push(metrics); - } - - MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64()); } }));