feat: support distributed EXPLAIN ANALYZE (#3908)

* feat: fetch and pass per-plan metrics

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl DistAnalyzeExec

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness results

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo again

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/query/src/analyze.rs

Co-authored-by: Jeremyhi <jiachun_feng@proton.me>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
Ruihang Xia
2024-05-10 20:31:29 +08:00
committed by GitHub
parent 06e1c43743
commit aec5cca2c7
15 changed files with 601 additions and 172 deletions

229
src/query/src/analyze.rs Normal file
View File

@@ -0,0 +1,229 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Customized `ANALYZE` plan that aware of [MergeScanExec].
//!
//! The code skeleton is taken from `datafusion/physical-plan/src/analyze.rs`
use std::any::Any;
use std::sync::Arc;
use arrow::array::{StringBuilder, UInt32Builder};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use common_query::{DfPhysicalPlan, DfPhysicalPlanRef};
use common_recordbatch::adapter::{MetricCollector, RecordBatchMetrics};
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
use datafusion::error::Result as DfResult;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
accept, DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties,
};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{internal_err, DataFusionError};
use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning};
use futures::StreamExt;
use crate::dist_plan::MergeScanExec;
const STAGE: &str = "stage";
const NODE: &str = "node";
const PLAN: &str = "plan";
#[derive(Debug)]
pub struct DistAnalyzeExec {
input: DfPhysicalPlanRef,
schema: SchemaRef,
properties: PlanProperties,
}
impl DistAnalyzeExec {
/// Create a new DistAnalyzeExec
pub fn new(input: DfPhysicalPlanRef) -> Self {
let schema = SchemaRef::new(Schema::new(vec![
Field::new(STAGE, DataType::UInt32, true),
Field::new(NODE, DataType::UInt32, true),
Field::new(PLAN, DataType::Utf8, true),
]));
let properties = Self::compute_properties(&input, schema.clone());
Self {
input,
schema,
properties,
}
}
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(input: &DfPhysicalPlanRef, schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
let output_partitioning = Partitioning::UnknownPartitioning(1);
let exec_mode = input.execution_mode();
PlanProperties::new(eq_properties, output_partitioning, exec_mode)
}
}
impl DisplayAs for DistAnalyzeExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "DistAnalyzeExec",)
}
}
}
}
impl DfPhysicalPlan for DistAnalyzeExec {
fn name(&self) -> &'static str {
"DistAnalyzeExec"
}
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<DfPhysicalPlanRef> {
vec![self.input.clone()]
}
/// AnalyzeExec is handled specially so this value is ignored
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
mut children: Vec<DfPhysicalPlanRef>,
) -> DfResult<DfPhysicalPlanRef> {
Ok(Arc::new(Self::new(children.pop().unwrap())))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> DfResult<DfSendableRecordBatchStream> {
if 0 != partition {
return internal_err!("AnalyzeExec invalid partition. Expected 0, got {partition}");
}
// Wrap the input plan using `CoalescePartitionsExec` to poll multiple
// partitions in parallel
let coalesce_partition_plan = CoalescePartitionsExec::new(self.input.clone());
// Create future that computes thefinal output
let captured_input = self.input.clone();
let captured_schema = self.schema.clone();
// Finish the input stream and create the output
let mut input_stream = coalesce_partition_plan.execute(0, context)?;
let output = async move {
let mut total_rows = 0;
while let Some(batch) = input_stream.next().await.transpose()? {
total_rows += batch.num_rows();
}
create_output_batch(total_rows, captured_input, captured_schema)
};
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
futures::stream::once(output),
)))
}
}
/// Build the result [`DfRecordBatch`] of `ANALYZE`
struct AnalyzeOutputBuilder {
stage_builder: UInt32Builder,
node_builder: UInt32Builder,
plan_builder: StringBuilder,
schema: SchemaRef,
}
impl AnalyzeOutputBuilder {
fn new(schema: SchemaRef) -> Self {
Self {
stage_builder: UInt32Builder::with_capacity(4),
node_builder: UInt32Builder::with_capacity(4),
plan_builder: StringBuilder::with_capacity(1, 1024),
schema,
}
}
fn append_metric(&mut self, stage: u32, node: u32, metric: RecordBatchMetrics) {
self.stage_builder.append_value(stage);
self.node_builder.append_value(node);
self.plan_builder.append_value(metric.to_string());
}
fn append_total_rows(&mut self, total_rows: usize) {
self.stage_builder.append_null();
self.node_builder.append_null();
self.plan_builder
.append_value(format!("Total rows: {}", total_rows));
}
fn finish(mut self) -> DfResult<DfRecordBatch> {
DfRecordBatch::try_new(
self.schema,
vec![
Arc::new(self.stage_builder.finish()),
Arc::new(self.node_builder.finish()),
Arc::new(self.plan_builder.finish()),
],
)
.map_err(DataFusionError::from)
}
}
/// Creates the output of AnalyzeExec as a RecordBatch
fn create_output_batch(
total_rows: usize,
input: DfPhysicalPlanRef,
schema: SchemaRef,
) -> DfResult<DfRecordBatch> {
let mut builder = AnalyzeOutputBuilder::new(schema);
// Treat the current stage as stage 0. Fetch its metrics
let mut collector = MetricCollector::default();
// Safety: metric collector won't return error
accept(input.as_ref(), &mut collector).unwrap();
let stage_0_metrics = collector.record_batch_metrics;
// Append the metrics of the current stage
builder.append_metric(0, 0, stage_0_metrics);
// Find merge scan and append its sub_stage_metrics
input.apply(&mut |plan| {
if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>() {
let sub_stage_metrics = merge_scan.sub_stage_metrics();
for (node, metric) in sub_stage_metrics.into_iter().enumerate() {
builder.append_metric(1, node as _, metric);
}
return Ok(TreeNodeRecursion::Stop);
}
Ok(TreeNodeRecursion::Continue)
})?;
// Write total rows
builder.append_total_rows(total_rows);
builder.finish()
}

View File

@@ -44,6 +44,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use table::requests::{DeleteRequest, InsertRequest};
use table::TableRef;
use crate::analyze::DistAnalyzeExec;
use crate::dataframe::DataFrame;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::error::{
@@ -407,9 +408,7 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
Arc::new(analyze_plan.clone())
.with_new_children(vec![new_plan])
.unwrap()
Arc::new(DistAnalyzeExec::new(new_plan))
} else {
let mut new_plan = df_plan;
for optimizer in state.physical_optimizers() {

View File

@@ -18,5 +18,5 @@ mod merge_scan;
mod planner;
pub use analyzer::DistPlannerAnalyzer;
pub use merge_scan::MergeScanLogicalPlan;
pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan};
pub use planner::DistExtensionPlanner;

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
@@ -24,7 +24,7 @@ use common_error::ext::BoxedError;
use common_meta::table_name::TableName;
use common_plugins::GREPTIME_EXEC_READ_COST;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
@@ -128,6 +128,8 @@ pub struct MergeScanExec {
region_query_handler: RegionQueryHandlerRef,
metric: ExecutionPlanMetricsSet,
properties: PlanProperties,
/// Metrics from sub stages
sub_stage_metrics: Arc<Mutex<Vec<RecordBatchMetrics>>>,
query_ctx: QueryContextRef,
}
@@ -166,6 +168,7 @@ impl MergeScanExec {
arrow_schema: arrow_schema_without_metadata,
region_query_handler,
metric: ExecutionPlanMetricsSet::new(),
sub_stage_metrics: Arc::default(),
properties,
query_ctx,
})
@@ -185,6 +188,7 @@ impl MergeScanExec {
let timezone = self.query_ctx.timezone().to_string();
let extensions = self.query_ctx.extensions();
let sub_sgate_metrics_moved = self.sub_stage_metrics.clone();
let stream = Box::pin(stream!({
MERGE_SCAN_REGIONS.observe(regions.len() as f64);
let _finish_timer = metric.finish_time().timer();
@@ -236,6 +240,8 @@ impl MergeScanExec {
// reset poll timer
poll_timer = Instant::now();
}
// process metrics after all data is drained.
if let Some(metrics) = stream.metrics() {
let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
let value = read_meter!(
@@ -247,6 +253,9 @@ impl MergeScanExec {
}
);
metric.record_greptime_exec_cost(value as usize);
// record metrics from sub sgates
sub_sgate_metrics_moved.lock().unwrap().push(metrics);
}
MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
@@ -279,6 +288,10 @@ impl MergeScanExec {
let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
Ok(Arc::new(schema))
}
pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
self.sub_stage_metrics.lock().unwrap().clone()
}
}
impl ExecutionPlan for MergeScanExec {

View File

@@ -15,6 +15,7 @@
#![feature(let_chains)]
#![feature(int_roundings)]
mod analyze;
pub mod dataframe;
pub mod datafusion;
pub mod dist_plan;