feat: implement metrics for Scan plan (#1812)

* add metrics in some interfaces

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* calc elapsed time and rows

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-06-25 14:06:50 +08:00
committed by GitHub
parent 0fb18245b8
commit 62f660e439
3 changed files with 93 additions and 6 deletions

View File

@@ -22,6 +22,7 @@ use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
pub use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
pub use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::Statistics;
use datatypes::schema::SchemaRef;
@@ -69,6 +70,10 @@ pub trait PhysicalPlan: Debug + Send + Sync {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
fn metrics(&self) -> Option<MetricsSet> {
None
}
}
/// Adapt DataFusion's [`ExecutionPlan`](DfPhysicalPlan) to GreptimeDB's [`PhysicalPlan`].
@@ -76,11 +81,16 @@ pub trait PhysicalPlan: Debug + Send + Sync {
pub struct PhysicalPlanAdapter {
schema: SchemaRef,
df_plan: Arc<dyn DfPhysicalPlan>,
metric: ExecutionPlanMetricsSet,
}
impl PhysicalPlanAdapter {
pub fn new(schema: SchemaRef, df_plan: Arc<dyn DfPhysicalPlan>) -> Self {
Self { schema, df_plan }
Self {
schema,
df_plan,
metric: ExecutionPlanMetricsSet::new(),
}
}
pub fn df_plan(&self) -> Arc<dyn DfPhysicalPlan> {
@@ -127,15 +137,21 @@ impl PhysicalPlan for PhysicalPlanAdapter {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
let df_plan = self.df_plan.clone();
let stream = df_plan
.execute(partition, context)
.context(error::GeneralDataFusionSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new(stream)
let adapter = RecordBatchStreamAdapter::try_new_with_metrics(stream, baseline_metric)
.context(error::ConvertDfRecordBatchStreamSnafu)?;
Ok(Box::pin(adapter))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
}
#[derive(Debug)]
@@ -196,6 +212,10 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
fn statistics(&self) -> Statistics {
Statistics::default()
}
fn metrics(&self) -> Option<MetricsSet> {
self.0.metrics()
}
}
#[cfg(test)]

View File

@@ -20,6 +20,7 @@ use std::task::{Context, Poll};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use datafusion::physical_plan::metrics::BaselineMetrics;
use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream;
use datafusion_common::DataFusionError;
use datatypes::schema::{Schema, SchemaRef};
@@ -115,13 +116,31 @@ impl Stream for DfRecordBatchStreamAdapter {
pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
metrics: Option<BaselineMetrics>,
}
impl RecordBatchStreamAdapter {
pub fn try_new(stream: DfSendableRecordBatchStream) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
Ok(Self { schema, stream })
Ok(Self {
schema,
stream,
metrics: None,
})
}
pub fn try_new_with_metrics(
stream: DfSendableRecordBatchStream,
metrics: BaselineMetrics,
) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
Ok(Self {
schema,
stream,
metrics: Some(metrics),
})
}
}
@@ -135,6 +154,12 @@ impl Stream for RecordBatchStreamAdapter {
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let timer = self
.metrics
.as_ref()
.map(|m| m.elapsed_compute().clone())
.unwrap_or_default();
let _guard = timer.timer();
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(df_record_batch)) => {

View File

@@ -14,15 +14,20 @@
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::OptionExt;
/// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan].
@@ -30,6 +35,7 @@ pub struct StreamScanAdapter {
stream: Mutex<Option<SendableRecordBatchStream>>,
schema: SchemaRef,
output_ordering: Option<Vec<PhysicalSortExpr>>,
metric: ExecutionPlanMetricsSet,
}
impl Debug for StreamScanAdapter {
@@ -49,6 +55,7 @@ impl StreamScanAdapter {
stream: Mutex::new(Some(stream)),
schema,
output_ordering: None,
metric: ExecutionPlanMetricsSet::new(),
}
}
@@ -85,11 +92,46 @@ impl PhysicalPlan for StreamScanAdapter {
fn execute(
&self,
_partition: usize,
partition: usize,
_context: Arc<TaskContext>,
) -> QueryResult<SendableRecordBatchStream> {
let mut stream = self.stream.lock().unwrap();
stream.take().context(query_error::ExecuteRepeatedlySnafu)
let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?;
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {
stream,
metric: baseline_metric,
}))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
}
pub struct StreamWithMetricWrapper {
stream: SendableRecordBatchStream,
metric: BaselineMetrics,
}
impl Stream for StreamWithMetricWrapper {
type Item = RecordBatchResult<RecordBatch>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let _timer = this.metric.elapsed_compute().timer();
let poll = this.stream.poll_next_unpin(cx);
if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll {
this.metric.record_output(record_batch.num_rows());
}
poll
}
}
impl RecordBatchStream for StreamWithMetricWrapper {
fn schema(&self) -> SchemaRef {
self.stream.schema()
}
}