calc elapsed time and rows

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-06-21 17:18:13 +08:00
committed by shuiyisong
parent 5e06be51e4
commit 7ddfa9d3e4

View File

@@ -14,16 +14,20 @@
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::OptionExt;
/// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan].
@@ -88,11 +92,16 @@ impl PhysicalPlan for StreamScanAdapter {
fn execute(
&self,
_partition: usize,
partition: usize,
_context: Arc<TaskContext>,
) -> QueryResult<SendableRecordBatchStream> {
let mut stream = self.stream.lock().unwrap();
stream.take().context(query_error::ExecuteRepeatedlySnafu)
let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?;
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {
stream,
metric: baseline_metric,
}))
}
fn metrics(&self) -> Option<MetricsSet> {
@@ -100,6 +109,32 @@ impl PhysicalPlan for StreamScanAdapter {
}
}
pub struct StreamWithMetricWrapper {
stream: SendableRecordBatchStream,
metric: BaselineMetrics,
}
impl Stream for StreamWithMetricWrapper {
type Item = RecordBatchResult<RecordBatch>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let _timer = this.metric.elapsed_compute().timer();
let poll = this.stream.poll_next_unpin(cx);
if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll {
this.metric.record_output(record_batch.num_rows());
}
poll
}
}
impl RecordBatchStream for StreamWithMetricWrapper {
fn schema(&self) -> SchemaRef {
self.stream.schema()
}
}
#[cfg(test)]
mod test {
use common_recordbatch::{util, RecordBatch, RecordBatches};