feat: support generating JSON output for explain analyze in HTTP API (#5567)

* impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/servers/src/http/hints.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* refactor: use the FORMAT option for explain format

* lift some well-known metrics

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Ning Sun <sunning@greptime.com>
Author: Ruihang Xia
Date: 2025-02-20 21:13:09 -08:00
Committed by: GitHub
Parent: 76083892cd
Commit: d69e93b91a
7 changed files with 159 additions and 13 deletions
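
The integration test at the end of this diff exercises the change end to end: an `EXPLAIN ANALYZE` statement may now carry a `FORMAT JSON` option, and the HTTP API returns the analyze metrics as structured JSON instead of plain text. A minimal usage sketch (the `demo` table and its columns are taken from that test):

```sql
-- Existing behavior: text-formatted analyze output.
EXPLAIN ANALYZE SELECT cpu, ts FROM demo LIMIT 1;

-- New in this commit: JSON-formatted metrics, also reachable over HTTP as
-- GET /v1/sql?sql=explain analyze format json select cpu, ts from demo limit 1
EXPLAIN ANALYZE FORMAT JSON SELECT cpu, ts FROM demo LIMIT 1;
```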

View File

@@ -237,6 +237,13 @@ impl Instance {
let output = match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
// TODO: remove this when format is supported in datafusion
if let Statement::Explain(explain) = &stmt {
if let Some(format) = explain.format() {
query_ctx.set_explain_format(format.to_string());
}
}
let stmt = QueryStatement::Sql(stmt);
let plan = self
.statement_executor

View File

@@ -57,6 +57,8 @@ promql-parser.workspace = true
prost.workspace = true
rand.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
sql.workspace = true
@@ -81,8 +83,6 @@ num-traits = "0.2"
paste.workspace = true
pretty_assertions = "1.4.0"
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
session = { workspace = true, features = ["testing"] }
statrs = "0.16"
store-api.workspace = true

View File

@@ -17,11 +17,13 @@
//! The code skeleton is taken from `datafusion/physical-plan/src/analyze.rs`
use std::any::Any;
use std::fmt::Display;
use std::sync::Arc;
use ahash::HashMap;
use arrow::array::{StringBuilder, UInt32Builder};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use common_recordbatch::adapter::{MetricCollector, RecordBatchMetrics};
use common_recordbatch::adapter::{MetricCollector, PlanMetrics, RecordBatchMetrics};
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
use datafusion::error::Result as DfResult;
use datafusion::execution::TaskContext;
@@ -34,6 +36,8 @@ use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{internal_err, DataFusionError};
use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning};
use futures::StreamExt;
use serde::Serialize;
use sqlparser::ast::AnalyzeFormat;
use crate::dist_plan::MergeScanExec;
@@ -46,11 +50,12 @@ pub struct DistAnalyzeExec {
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
properties: PlanProperties,
format: AnalyzeFormat,
}
impl DistAnalyzeExec {
/// Create a new DistAnalyzeExec
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
pub fn new(input: Arc<dyn ExecutionPlan>, format: AnalyzeFormat) -> Self {
let schema = SchemaRef::new(Schema::new(vec![
Field::new(STAGE, DataType::UInt32, true),
Field::new(NODE, DataType::UInt32, true),
@@ -61,6 +66,7 @@ impl DistAnalyzeExec {
input,
schema,
properties,
format,
}
}
@@ -110,7 +116,7 @@ impl ExecutionPlan for DistAnalyzeExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self::new(children.pop().unwrap())))
Ok(Arc::new(Self::new(children.pop().unwrap(), self.format)))
}
fn execute(
@@ -131,6 +137,7 @@ impl ExecutionPlan for DistAnalyzeExec {
let captured_schema = self.schema.clone();
// Finish the input stream and create the output
let format = self.format;
let mut input_stream = coalesce_partition_plan.execute(0, context)?;
let output = async move {
let mut total_rows = 0;
@@ -138,7 +145,7 @@ impl ExecutionPlan for DistAnalyzeExec {
total_rows += batch.num_rows();
}
create_output_batch(total_rows, captured_input, captured_schema)
create_output_batch(total_rows, captured_input, captured_schema, format)
};
Ok(Box::pin(RecordBatchStreamAdapter::new(
@@ -166,10 +173,10 @@ impl AnalyzeOutputBuilder {
}
}
fn append_metric(&mut self, stage: u32, node: u32, metric: RecordBatchMetrics) {
fn append_metric(&mut self, stage: u32, node: u32, content: String) {
self.stage_builder.append_value(stage);
self.node_builder.append_value(node);
self.plan_builder.append_value(metric.to_string());
self.plan_builder.append_value(content);
}
fn append_total_rows(&mut self, total_rows: usize) {
@@ -197,6 +204,7 @@ fn create_output_batch(
total_rows: usize,
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
format: AnalyzeFormat,
) -> DfResult<DfRecordBatch> {
let mut builder = AnalyzeOutputBuilder::new(schema);
@@ -207,14 +215,14 @@ fn create_output_batch(
let stage_0_metrics = collector.record_batch_metrics;
// Append the metrics of the current stage
builder.append_metric(0, 0, stage_0_metrics);
builder.append_metric(0, 0, metrics_to_string(stage_0_metrics, format)?);
// Find merge scan and append its sub_stage_metrics
input.apply(|plan| {
if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>() {
let sub_stage_metrics = merge_scan.sub_stage_metrics();
for (node, metric) in sub_stage_metrics.into_iter().enumerate() {
builder.append_metric(1, node as _, metric);
builder.append_metric(1, node as _, metrics_to_string(metric, format)?);
}
return Ok(TreeNodeRecursion::Stop);
}
@@ -226,3 +234,87 @@ fn create_output_batch(
builder.finish()
}
fn metrics_to_string(metrics: RecordBatchMetrics, format: AnalyzeFormat) -> DfResult<String> {
match format {
AnalyzeFormat::JSON => Ok(JsonMetrics::from_record_batch_metrics(metrics).to_string()),
AnalyzeFormat::TEXT => Ok(metrics.to_string()),
AnalyzeFormat::GRAPHVIZ => Err(DataFusionError::NotImplemented(
"GRAPHVIZ format is not supported for metrics output".to_string(),
)),
}
}
#[derive(Debug, Default, Serialize)]
struct JsonMetrics {
name: String,
param: String,
// well-known metrics
output_rows: usize,
// busy time in nanoseconds
elapsed_compute: usize,
// other metrics
metrics: HashMap<String, usize>,
children: Vec<JsonMetrics>,
}
impl JsonMetrics {
fn from_record_batch_metrics(record_batch_metrics: RecordBatchMetrics) -> Self {
let mut layers: HashMap<usize, Vec<Self>> = HashMap::default();
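// Iterate in reverse (deepest nodes first) so every layer is complete
// before its parent is built: nodes accumulated at `level + 1` become
// the children of the next node constructed at `level`.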
for plan_metrics in record_batch_metrics.plan_metrics.into_iter().rev() {
let (level, mut metrics) = Self::from_plan_metrics(plan_metrics);
if let Some(next_layer) = layers.remove(&(level + 1)) {
metrics.children = next_layer;
}
if level == 0 {
return metrics;
}
layers.entry(level).or_default().push(metrics);
}
// Unreachable path. The metrics should contain at least one level-0 plan.
Self::default()
}
/// Convert a [`PlanMetrics`] to a [`JsonMetrics`] without children.
///
/// Returns the level of the plan and the [`JsonMetrics`].
fn from_plan_metrics(plan_metrics: PlanMetrics) -> (usize, Self) {
let raw_name = plan_metrics.plan.trim_end();
let mut elapsed_compute = 0;
let mut output_rows = 0;
let mut other_metrics = HashMap::default();
let (name, param) = raw_name.split_once(": ").unwrap_or_default();
for (name, value) in plan_metrics.metrics.into_iter() {
if name == "elapsed_compute" {
elapsed_compute = value;
} else if name == "output_rows" {
output_rows = value;
} else {
other_metrics.insert(name, value);
}
}
(
plan_metrics.level,
Self {
name: name.to_string(),
param: param.to_string(),
output_rows,
elapsed_compute,
metrics: other_metrics,
children: vec![],
},
)
}
}
impl Display for JsonMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", serde_json::to_string(self).unwrap())
}
}
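
For illustration, the JSON emitted for one plan node follows the `JsonMetrics` shape above: `name` and `param` come from splitting the plan's display line on `": "`, the two well-known metrics are lifted into dedicated fields, and everything else lands in `metrics`. The node names and numbers below are invented for the example:

```json
{
  "name": "ProjectionExec",
  "param": "expr=[cpu, ts]",
  "output_rows": 1,
  "elapsed_compute": 1200,
  "metrics": {},
  "children": [
    {
      "name": "CoalesceBatchesExec",
      "param": "target_batch_size=8192",
      "output_rows": 1,
      "elapsed_compute": 800,
      "metrics": {},
      "children": []
    }
  ]
}
```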

View File

@@ -42,6 +42,7 @@ use datatypes::schema::Schema;
use futures_util::StreamExt;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sqlparser::ast::AnalyzeFormat;
use table::requests::{DeleteRequest, InsertRequest};
use table::TableRef;
@@ -347,7 +348,7 @@ impl DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn optimize_physical_plan(
&self,
_ctx: &mut QueryEngineContext,
ctx: &mut QueryEngineContext,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();
@@ -360,7 +361,15 @@ impl DatafusionQueryEngine {
// skip optimize AnalyzeExec plan
let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
{
Arc::new(DistAnalyzeExec::new(analyze_plan.input().clone()))
let format = if let Some(format) = ctx.query_ctx().explain_format()
&& format.to_lowercase() == "json"
{
AnalyzeFormat::JSON
} else {
AnalyzeFormat::TEXT
};
Arc::new(DistAnalyzeExec::new(analyze_plan.input().clone(), format))
// let mut new_plan = analyze_plan.input().clone();
// for optimizer in state.physical_optimizers() {
// new_plan = optimizer

View File

@@ -67,6 +67,8 @@ pub struct QueryContext {
#[derive(Debug, Builder, Clone, Default)]
pub struct QueryContextMutableFields {
warning: Option<String>,
// TODO: remove this when format is supported in datafusion
explain_format: Option<String>,
}
impl Display for QueryContext {
@@ -302,6 +304,21 @@ impl QueryContext {
self.mutable_query_context_data.write().unwrap().warning = Some(msg);
}
pub fn explain_format(&self) -> Option<String> {
self.mutable_query_context_data
.read()
.unwrap()
.explain_format
.clone()
}
pub fn set_explain_format(&self, format: String) {
self.mutable_query_context_data
.write()
.unwrap()
.explain_format = Some(format);
}
pub fn query_timeout(&self) -> Option<Duration> {
self.mutable_session_data.read().unwrap().query_timeout
}
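
A minimal sketch of the new accessor pair, assuming `query_ctx` is a `QueryContext` obtained elsewhere (construction elided). The frontend writes the format when it parses `EXPLAIN ... FORMAT json`; the query engine reads it back while building the physical plan:

```rust
// Hypothetical round-trip through the mutable context data.
query_ctx.set_explain_format("json".to_string());
assert_eq!(query_ctx.explain_format(), Some("json".to_string()));
```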

View File

@@ -15,7 +15,7 @@
use std::fmt::{Display, Formatter};
use serde::Serialize;
use sqlparser::ast::Statement as SpStatement;
use sqlparser::ast::{AnalyzeFormat, Statement as SpStatement};
use sqlparser_derive::{Visit, VisitMut};
use crate::error::Error;
@@ -26,6 +26,15 @@ pub struct Explain {
pub inner: SpStatement,
}
impl Explain {
pub fn format(&self) -> Option<AnalyzeFormat> {
match self.inner {
SpStatement::Explain { format, .. } => format,
_ => None,
}
}
}
impl TryFrom<SpStatement> for Explain {
type Error = Error;

View File

@@ -413,6 +413,18 @@ pub async fn test_sql_api(store_type: StorageType) {
let body = serde_json::from_str::<ErrorResponse>(&res.text().await).unwrap();
assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32);
// test analyze format
let res = client
.get("/v1/sql?sql=explain analyze format json select cpu, ts from demo limit 1")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<GreptimedbV1Response>(&res.text().await).unwrap();
let output = body.output();
assert_eq!(output.len(), 1);
// this is something only the JSON format can show
assert!(format!("{:?}", output[0]).contains("\\\"param\\\""));
// test parse method
let res = client.get("/v1/sql/parse?sql=desc table t").send().await;
assert_eq!(res.status(), StatusCode::OK);