diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index ab6446684a..cdc2d17b79 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -14,16 +14,20 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; +use std::pin::Pin; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use common_query::error as query_error; use common_query::error::Result as QueryResult; use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef}; -use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::error::Result as RecordBatchResult; +use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datafusion::execution::context::TaskContext; -use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_expr::PhysicalSortExpr; use datatypes::schema::SchemaRef; +use futures::{Stream, StreamExt}; use snafu::OptionExt; /// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan]. @@ -88,11 +92,16 @@ impl PhysicalPlan for StreamScanAdapter { fn execute( &self, - _partition: usize, + partition: usize, _context: Arc, ) -> QueryResult { let mut stream = self.stream.lock().unwrap(); - stream.take().context(query_error::ExecuteRepeatedlySnafu) + let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?; + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + Ok(Box::pin(StreamWithMetricWrapper { + stream, + metric: baseline_metric, + })) } fn metrics(&self) -> Option { @@ -100,6 +109,32 @@ impl PhysicalPlan for StreamScanAdapter { } } +pub struct StreamWithMetricWrapper { + stream: SendableRecordBatchStream, + metric: BaselineMetrics, +} + +impl Stream for StreamWithMetricWrapper { + type Item = RecordBatchResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let _timer = this.metric.elapsed_compute().timer(); + let poll = this.stream.poll_next_unpin(cx); + if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll { + this.metric.record_output(record_batch.num_rows()); + } + + poll + } +} + +impl RecordBatchStream for StreamWithMetricWrapper { + fn schema(&self) -> SchemaRef { + self.stream.schema() + } +} + #[cfg(test)] mod test { use common_recordbatch::{util, RecordBatch, RecordBatches};