feat: support explain analyze verbose (#5763)

* Add explain_verbose to QueryContext

* feat: fmt plan by display type

* feat: update proto to use ExplainOptions

* feat: display more info in verbose mode

* chore: fix clippy

* test: add sqlness test

* test: update sqlness result

* chore: update proto version

* chore: Simplify QueryContextBuilder::explain_options using get_or_insert_default
This commit is contained in:
Yingwen
2025-03-25 11:48:36 +08:00
committed by GitHub
parent 3b547d9d13
commit d88482b996
15 changed files with 261 additions and 28 deletions

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::fmt::{self, Display};
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
@@ -28,7 +28,7 @@ use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
use datafusion::physical_plan::{
accept, displayable, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
RecordBatchStream as DfRecordBatchStream,
};
use datafusion_common::arrow::error::ArrowError;
@@ -206,13 +206,16 @@ impl Stream for DfRecordBatchStreamAdapter {
}
/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream].
/// The reverse one is [DfRecordBatchStreamAdapter]
/// The reverse one is [DfRecordBatchStreamAdapter].
/// It can collect metrics from DataFusion execution plan.
pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
metrics: Option<BaselineMetrics>,
/// Aggregated plan-level metrics. Resolved after an [ExecutionPlan] is finished.
metrics_2: Metrics,
/// Display plan and metrics in verbose mode.
explain_verbose: bool,
}
/// Json encoded metrics. Contains metric from a whole plan tree.
@@ -231,6 +234,7 @@ impl RecordBatchStreamAdapter {
stream,
metrics: None,
metrics_2: Metrics::Unavailable,
explain_verbose: false,
})
}
@@ -246,12 +250,18 @@ impl RecordBatchStreamAdapter {
stream,
metrics: Some(metrics),
metrics_2: Metrics::Unresolved(df_plan),
explain_verbose: false,
})
}
pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
self.metrics_2 = Metrics::Unresolved(plan)
}
/// Set the verbose mode for displaying plan and metrics.
pub fn set_explain_verbose(&mut self, verbose: bool) {
self.explain_verbose = verbose;
}
}
impl RecordBatchStream for RecordBatchStreamAdapter {
@@ -296,7 +306,7 @@ impl Stream for RecordBatchStreamAdapter {
}
Poll::Ready(None) => {
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
let mut metric_collector = MetricCollector::default();
let mut metric_collector = MetricCollector::new(self.explain_verbose);
accept(df_plan.as_ref(), &mut metric_collector).unwrap();
self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);
}
@@ -312,10 +322,20 @@ impl Stream for RecordBatchStreamAdapter {
}
/// An [ExecutionPlanVisitor] to collect metrics from a [ExecutionPlan].
#[derive(Default)]
pub struct MetricCollector {
current_level: usize,
pub record_batch_metrics: RecordBatchMetrics,
verbose: bool,
}
impl MetricCollector {
pub fn new(verbose: bool) -> Self {
Self {
current_level: 0,
record_batch_metrics: RecordBatchMetrics::default(),
verbose,
}
}
}
impl ExecutionPlanVisitor for MetricCollector {
@@ -339,7 +359,7 @@ impl ExecutionPlanVisitor for MetricCollector {
.sorted_for_display()
.timestamps_removed();
let mut plan_metric = PlanMetrics {
plan: displayable(plan).one_line().to_string(),
plan: one_line(plan, self.verbose).to_string(),
level: self.current_level,
metrics: Vec::with_capacity(metric.iter().size_hint().0),
};
@@ -371,6 +391,29 @@ impl ExecutionPlanVisitor for MetricCollector {
}
}
/// Returns a single-line summary of the root of the plan.
/// If the `verbose` flag is set, it will display detailed information about the plan.
fn one_line(plan: &dyn ExecutionPlan, verbose: bool) -> impl fmt::Display + '_ {
struct Wrapper<'a> {
plan: &'a dyn ExecutionPlan,
format_type: DisplayFormatType,
}
impl fmt::Display for Wrapper<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.plan.fmt_as(self.format_type, f)?;
writeln!(f)
}
}
let format_type = if verbose {
DisplayFormatType::Verbose
} else {
DisplayFormatType::Default
};
Wrapper { plan, format_type }
}
/// [`RecordBatchMetrics`] carrys metrics value
/// from datanode to frontend through gRPC
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]