diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs index 144e1bcd0b..1c148020ac 100644 --- a/src/common/query/src/physical_plan.rs +++ b/src/common/query/src/physical_plan.rs @@ -22,6 +22,7 @@ use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::error::Result as DfResult; pub use datafusion::execution::context::{SessionContext, TaskContext}; use datafusion::physical_plan::expressions::PhysicalSortExpr; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; pub use datafusion::physical_plan::Partitioning; use datafusion::physical_plan::Statistics; use datatypes::schema::SchemaRef; @@ -69,6 +70,10 @@ pub trait PhysicalPlan: Debug + Send + Sync { partition: usize, context: Arc, ) -> Result; + + fn metrics(&self) -> Option { + None + } } /// Adapt DataFusion's [`ExecutionPlan`](DfPhysicalPlan) to GreptimeDB's [`PhysicalPlan`]. @@ -76,11 +81,16 @@ pub trait PhysicalPlan: Debug + Send + Sync { pub struct PhysicalPlanAdapter { schema: SchemaRef, df_plan: Arc, + metric: ExecutionPlanMetricsSet, } impl PhysicalPlanAdapter { pub fn new(schema: SchemaRef, df_plan: Arc) -> Self { - Self { schema, df_plan } + Self { + schema, + df_plan, + metric: ExecutionPlanMetricsSet::new(), + } } pub fn df_plan(&self) -> Arc { @@ -127,15 +137,21 @@ impl PhysicalPlan for PhysicalPlanAdapter { partition: usize, context: Arc, ) -> Result { + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + let df_plan = self.df_plan.clone(); let stream = df_plan .execute(partition, context) .context(error::GeneralDataFusionSnafu)?; - let adapter = RecordBatchStreamAdapter::try_new(stream) + let adapter = RecordBatchStreamAdapter::try_new_with_metrics(stream, baseline_metric) .context(error::ConvertDfRecordBatchStreamSnafu)?; Ok(Box::pin(adapter)) } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } } #[derive(Debug)] @@ -196,6 +212,10 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter { fn statistics(&self) -> Statistics { Statistics::default() } + + fn metrics(&self) -> Option { + self.0.metrics() + } } #[cfg(test)] diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 260b52fe41..24d5c2af4a 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -20,6 +20,7 @@ use std::task::{Context, Poll}; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::error::Result as DfResult; use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream}; +use datafusion::physical_plan::metrics::BaselineMetrics; use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream; use datafusion_common::DataFusionError; use datatypes::schema::{Schema, SchemaRef}; @@ -115,13 +116,31 @@ impl Stream for DfRecordBatchStreamAdapter { pub struct RecordBatchStreamAdapter { schema: SchemaRef, stream: DfSendableRecordBatchStream, + metrics: Option, } impl RecordBatchStreamAdapter { pub fn try_new(stream: DfSendableRecordBatchStream) -> Result { let schema = Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?); - Ok(Self { schema, stream }) + Ok(Self { + schema, + stream, + metrics: None, + }) + } + + pub fn try_new_with_metrics( + stream: DfSendableRecordBatchStream, + metrics: BaselineMetrics, + ) -> Result { + let schema = + Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?); + Ok(Self { + schema, + stream, + metrics: Some(metrics), + }) } } @@ -135,6 +154,12 @@ impl Stream for RecordBatchStreamAdapter { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let timer = self + .metrics + .as_ref() + .map(|m| m.elapsed_compute().clone()) + .unwrap_or_default(); + let _guard = timer.timer(); match Pin::new(&mut self.stream).poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Some(df_record_batch)) => { diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index c510dbdde8..cdc2d17b79 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -14,15 +14,20 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; +use std::pin::Pin; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use common_query::error as query_error; use common_query::error::Result as QueryResult; use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef}; -use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::error::Result as RecordBatchResult; +use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datafusion::execution::context::TaskContext; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_expr::PhysicalSortExpr; use datatypes::schema::SchemaRef; +use futures::{Stream, StreamExt}; use snafu::OptionExt; /// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan]. @@ -30,6 +35,7 @@ pub struct StreamScanAdapter { stream: Mutex>, schema: SchemaRef, output_ordering: Option>, + metric: ExecutionPlanMetricsSet, } impl Debug for StreamScanAdapter { @@ -49,6 +55,7 @@ impl StreamScanAdapter { stream: Mutex::new(Some(stream)), schema, output_ordering: None, + metric: ExecutionPlanMetricsSet::new(), } } @@ -85,11 +92,46 @@ impl PhysicalPlan for StreamScanAdapter { fn execute( &self, - _partition: usize, + partition: usize, _context: Arc, ) -> QueryResult { let mut stream = self.stream.lock().unwrap(); - stream.take().context(query_error::ExecuteRepeatedlySnafu) + let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?; + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + Ok(Box::pin(StreamWithMetricWrapper { + stream, + metric: baseline_metric, + })) + } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } +} + +pub struct StreamWithMetricWrapper { + stream: SendableRecordBatchStream, + metric: BaselineMetrics, +} + +impl Stream for StreamWithMetricWrapper { + type Item = RecordBatchResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let _timer = this.metric.elapsed_compute().timer(); + let poll = this.stream.poll_next_unpin(cx); + if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll { + this.metric.record_output(record_batch.num_rows()); + } + + poll + } +} + +impl RecordBatchStream for StreamWithMetricWrapper { + fn schema(&self) -> SchemaRef { + self.stream.schema() } }