From 393047a5415b154a28e675c16ae25bf015080feb Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 14 Aug 2023 15:10:45 +0800 Subject: [PATCH] feat: implement metric for MergeScanExec (#2166) Signed-off-by: Ruihang Xia --- src/query/src/dist_plan/merge_scan.rs | 41 ++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 4edd6afbc4..08fb755ae5 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -30,6 +30,9 @@ use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{ DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream, }; +use datafusion::physical_plan::metrics::{ + Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time, +}; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning}; use datafusion_common::{DataFusionError, Result, Statistics}; use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore}; @@ -112,6 +115,7 @@ pub struct MergeScanExec { substrait_plan: Bytes, arrow_schema: ArrowSchemaRef, clients: Arc, + metric: ExecutionPlanMetricsSet, } impl MergeScanExec { @@ -130,6 +134,7 @@ impl MergeScanExec { substrait_plan, arrow_schema, clients, + metric: ExecutionPlanMetricsSet::new(), } } @@ -139,11 +144,13 @@ impl MergeScanExec { let clients = self.clients.clone(); let table = self.table.clone(); let trace_id = context.task_id().and_then(|id| id.parse().ok()); + let metric = MergeScanMetric::new(&self.metric); let stream = try_stream! { for peer in peers { let client = clients.get_client(&peer).await; let database = Database::new(&table.catalog_name, &table.schema_name, client); + let _timer = metric.grpc_time().timer(); let output: Output = database .logical_plan(substrait_plan.clone(), trace_id) .await @@ -164,12 +171,15 @@ impl MergeScanExec { } Output::RecordBatches(record_batches) => { for batch in record_batches.into_iter() { + metric.record_output_batch_rows(batch.num_rows()); yield Self::remove_metadata_from_record_batch(batch); } } Output::Stream(mut stream) => { while let Some(batch) = stream.next().await { - yield Self::remove_metadata_from_record_batch(batch?); + let batch = batch?; + metric.record_output_batch_rows(batch.num_rows()); + yield Self::remove_metadata_from_record_batch(batch); } } } @@ -241,6 +251,10 @@ impl ExecutionPlan for MergeScanExec { fn statistics(&self) -> Statistics { Statistics::default() } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } } impl DisplayAs for MergeScanExec { @@ -252,3 +266,28 @@ impl DisplayAs for MergeScanExec { write!(f, "]") } } + +#[derive(Debug, Clone)] +struct MergeScanMetric { + /// Nanosecond spent on fetching data from remote + grpc_time: Time, + /// Count of rows fetched from remote + output_rows: Count, +} + +impl MergeScanMetric { + pub fn new(metric: &ExecutionPlanMetricsSet) -> Self { + Self { + grpc_time: MetricBuilder::new(metric).subset_time("gRPC", 1), + output_rows: MetricBuilder::new(metric).output_rows(1), + } + } + + pub fn grpc_time(&self) -> &Time { + &self.grpc_time + } + + pub fn record_output_batch_rows(&self, num_rows: usize) { + self.output_rows.add(num_rows); + } +}