feat: implement metric for MergeScanExec (#2166)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-08-14 15:10:45 +08:00
committed by GitHub
parent 606b489d53
commit 393047a541

View File

@@ -30,6 +30,9 @@ use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream,
};
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning};
use datafusion_common::{DataFusionError, Result, Statistics};
use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
@@ -112,6 +115,7 @@ pub struct MergeScanExec {
substrait_plan: Bytes,
arrow_schema: ArrowSchemaRef,
clients: Arc<DatanodeClients>,
metric: ExecutionPlanMetricsSet,
}
impl MergeScanExec {
@@ -130,6 +134,7 @@ impl MergeScanExec {
substrait_plan,
arrow_schema,
clients,
metric: ExecutionPlanMetricsSet::new(),
}
}
@@ -139,11 +144,13 @@ impl MergeScanExec {
let clients = self.clients.clone();
let table = self.table.clone();
let trace_id = context.task_id().and_then(|id| id.parse().ok());
let metric = MergeScanMetric::new(&self.metric);
let stream = try_stream! {
for peer in peers {
let client = clients.get_client(&peer).await;
let database = Database::new(&table.catalog_name, &table.schema_name, client);
let _timer = metric.grpc_time().timer();
let output: Output = database
.logical_plan(substrait_plan.clone(), trace_id)
.await
@@ -164,12 +171,15 @@ impl MergeScanExec {
}
Output::RecordBatches(record_batches) => {
for batch in record_batches.into_iter() {
metric.record_output_batch_rows(batch.num_rows());
yield Self::remove_metadata_from_record_batch(batch);
}
}
Output::Stream(mut stream) => {
while let Some(batch) = stream.next().await {
yield Self::remove_metadata_from_record_batch(batch?);
let batch = batch?;
metric.record_output_batch_rows(batch.num_rows());
yield Self::remove_metadata_from_record_batch(batch);
}
}
}
@@ -241,6 +251,10 @@ impl ExecutionPlan for MergeScanExec {
fn statistics(&self) -> Statistics {
Statistics::default()
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
}
impl DisplayAs for MergeScanExec {
@@ -252,3 +266,28 @@ impl DisplayAs for MergeScanExec {
write!(f, "]")
}
}
#[derive(Debug, Clone)]
struct MergeScanMetric {
/// Nanosecond spent on fetching data from remote
grpc_time: Time,
/// Count of rows fetched from remote
output_rows: Count,
}
impl MergeScanMetric {
pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
Self {
grpc_time: MetricBuilder::new(metric).subset_time("gRPC", 1),
output_rows: MetricBuilder::new(metric).output_rows(1),
}
}
pub fn grpc_time(&self) -> &Time {
&self.grpc_time
}
pub fn record_output_batch_rows(&self, num_rows: usize) {
self.output_rows.add(num_rows);
}
}