feat: record metrics on stream drop

Signed-off-by: discord9 <discord9@163.com>

refactor: move logging to drop too

Signed-off-by: discord9 <discord9@163.com>

fix: drop input stream before collecting metrics

Signed-off-by: discord9 <discord9@163.com>
Author: discord9
Date: 2025-07-09 19:52:27 +08:00
Parent: df4cd157e1
Commit: bdecdb869e
2 changed files with 63 additions and 28 deletions


@@ -156,6 +156,9 @@ impl ExecutionPlan for DistAnalyzeExec {
while let Some(batch) = input_stream.next().await.transpose()? {
total_rows += batch.num_rows();
}
// must drop the input stream before collecting metrics to make sure
// all metrics are recorded (especially for MergeScanExec, which collects metrics when its stream is dropped)
drop(input_stream);
create_output_batch(total_rows, captured_input, captured_schema, format, verbose)
};


@@ -58,6 +58,52 @@ use crate::error::ConvertSchemaSnafu;
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::region_query::RegionQueryHandlerRef;
/// A wrapper around SendableRecordBatchStream that records metrics
/// when the stream is dropped, so that even if the stream is cancelled before it is
/// fully consumed, we can still record the metrics.
pub struct MeasuredStream {
inner: SendableRecordBatchStream,
partition: usize,
region_id: RegionId,
do_get_cost: Duration,
dbname: String,
sub_stage_metrics: Arc<Mutex<Vec<RecordBatchMetrics>>>,
current_channel: u8,
metric: MergeScanMetric,
poll_duration: Duration,
}
impl Drop for MeasuredStream {
fn drop(&mut self) {
common_telemetry::debug!(
"Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
self.partition, self.region_id, self.poll_duration, self.metric.first_consume_time(), self.do_get_cost
);
if let Some(metrics) = self.inner.metrics() {
let (c, s) = parse_catalog_and_schema_from_db_string(&self.dbname);
let value = read_meter!(
c,
s,
ReadItem {
cpu_time: metrics.elapsed_compute as u64,
table_scan: metrics.memory_usage as u64
},
self.current_channel
);
self.metric.record_greptime_exec_cost(value as usize);
// record metrics from sub stages
if let Ok(mut guard) = self.sub_stage_metrics.lock() {
guard.push(metrics);
} else {
common_telemetry::error!("Failed to acquire lock for sub_stage_metrics in MergeScan drop. Metrics for one stream might be lost.");
}
}
MERGE_SCAN_POLL_ELAPSED.observe(self.poll_duration.as_secs_f64());
}
}
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
/// In logical plan phase it only contains one input
@@ -291,7 +337,7 @@ impl MergeScanExec {
plan: plan.clone(),
};
let do_get_start = Instant::now();
let mut stream = region_query_handler
let stream = region_query_handler
.do_get(read_preference, request)
.await
.map_err(|e| {
@@ -300,14 +346,24 @@ impl MergeScanExec {
})
.context(ExternalSnafu)?;
let do_get_cost = do_get_start.elapsed();
let mut stream = MeasuredStream {
inner: stream,
partition,
region_id,
do_get_cost,
dbname: dbname.clone(),
sub_stage_metrics: sub_stage_metrics_moved.clone(),
current_channel: current_channel as u8,
metric: metric.clone(),
poll_duration: Duration::ZERO,
};
ready_timer.stop();
let mut poll_duration = Duration::ZERO;
let mut poll_timer = Instant::now();
while let Some(batch) = stream.next().await {
while let Some(batch) = stream.inner.next().await {
let poll_elapsed = poll_timer.elapsed();
poll_duration += poll_elapsed;
stream.poll_duration += poll_elapsed;
let batch = batch?;
// reconstruct batch using `self.schema`
@@ -321,30 +377,6 @@ impl MergeScanExec {
// reset poll timer
poll_timer = Instant::now();
}
common_telemetry::debug!(
"Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
);
// process metrics after all data is drained.
if let Some(metrics) = stream.metrics() {
let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
let value = read_meter!(
c,
s,
ReadItem {
cpu_time: metrics.elapsed_compute as u64,
table_scan: metrics.memory_usage as u64
},
current_channel as u8
);
metric.record_greptime_exec_cost(value as usize);
// record metrics from sub stages
sub_stage_metrics_moved.lock().unwrap().push(metrics);
}
MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
}
}));