From df4cd157e1719c6ffbc5d2fce337cf29687a785b Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 9 Jul 2025 20:32:31 +0800 Subject: [PATCH] Revert "feat: stream drop record metrics" This reverts commit 6a16946a5b8ea37557bbb1b600847d24274d6500. Signed-off-by: discord9 --- src/query/src/dist_plan/merge_scan.rs | 76 +++++++++------------------ 1 file changed, 25 insertions(+), 51 deletions(-) diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index cc6da43bf5..c98206dbc5 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -58,45 +58,6 @@ use crate::error::ConvertSchemaSnafu; use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS}; use crate::region_query::RegionQueryHandlerRef; -/// A wrapper around SendableRecordBatchStream that records metrics -/// when the stream is dropped. So that even if stream is cannceled before it is fully consumed, -/// we can still record the metrics. -pub struct MeasuredStream { - inner: SendableRecordBatchStream, - dbname: String, - sub_stage_metrics: Arc>>, - current_channel: u8, - metric: MergeScanMetric, - poll_duration: Duration, -} - -impl Drop for MeasuredStream { - fn drop(&mut self) { - if let Some(metrics) = self.inner.metrics() { - let (c, s) = parse_catalog_and_schema_from_db_string(&self.dbname); - let value = read_meter!( - c, - s, - ReadItem { - cpu_time: metrics.elapsed_compute as u64, - table_scan: metrics.memory_usage as u64 - }, - self.current_channel - ); - self.metric.record_greptime_exec_cost(value as usize); - - // record metrics from sub stage - if let Ok(mut guard) = self.sub_stage_metrics.lock() { - guard.push(metrics); - } else { - common_telemetry::error!("Failed to acquire lock for sub_stage_metrics in MergeScan drop. Metrics for one stream might be lost."); - } - } - - MERGE_SCAN_POLL_ELAPSED.observe(self.poll_duration.as_secs_f64()); - } -} - #[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)] pub struct MergeScanLogicalPlan { /// In logical plan phase it only contains one input @@ -330,7 +291,7 @@ impl MergeScanExec { plan: plan.clone(), }; let do_get_start = Instant::now(); - let stream = region_query_handler + let mut stream = region_query_handler .do_get(read_preference, request) .await .map_err(|e| { @@ -338,22 +299,15 @@ impl MergeScanExec { BoxedError::new(e) }) .context(ExternalSnafu)?; - let mut stream = MeasuredStream { - inner: stream, - dbname: dbname.clone(), - sub_stage_metrics: sub_stage_metrics_moved.clone(), - current_channel: current_channel as u8, - metric: metric.clone(), - poll_duration: Duration::ZERO, - }; let do_get_cost = do_get_start.elapsed(); ready_timer.stop(); + let mut poll_duration = Duration::ZERO; let mut poll_timer = Instant::now(); - while let Some(batch) = stream.inner.next().await { + while let Some(batch) = stream.next().await { let poll_elapsed = poll_timer.elapsed(); - stream.poll_duration += poll_elapsed; + poll_duration += poll_elapsed; let batch = batch?; // reconstruct batch using `self.schema` @@ -369,8 +323,28 @@ impl MergeScanExec { } common_telemetry::debug!( "Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}", - partition, region_id, stream.poll_duration, metric.first_consume_time(), do_get_cost + partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost ); + + // process metrics after all data is drained. + if let Some(metrics) = stream.metrics() { + let (c, s) = parse_catalog_and_schema_from_db_string(&dbname); + let value = read_meter!( + c, + s, + ReadItem { + cpu_time: metrics.elapsed_compute as u64, + table_scan: metrics.memory_usage as u64 + }, + current_channel as u8 + ); + metric.record_greptime_exec_cost(value as usize); + + // record metrics from sub sgates + sub_stage_metrics_moved.lock().unwrap().push(metrics); + } + + MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64()); } }));