diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index eb3e1d997b..7cb6d24a75 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -237,6 +237,13 @@ impl Instance { let output = match stmt { Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => { + // TODO: remove this when format is supported in datafusion + if let Statement::Explain(explain) = &stmt { + if let Some(format) = explain.format() { + query_ctx.set_explain_format(format.to_string()); + } + } + let stmt = QueryStatement::Sql(stmt); let plan = self .statement_executor diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 8358a109bb..50676afacb 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -57,6 +57,8 @@ promql-parser.workspace = true prost.workspace = true rand.workspace = true regex.workspace = true +serde.workspace = true +serde_json.workspace = true session.workspace = true snafu.workspace = true sql.workspace = true @@ -81,8 +83,6 @@ num-traits = "0.2" paste.workspace = true pretty_assertions = "1.4.0" rand.workspace = true -serde.workspace = true -serde_json.workspace = true session = { workspace = true, features = ["testing"] } statrs = "0.16" store-api.workspace = true diff --git a/src/query/src/analyze.rs b/src/query/src/analyze.rs index b0158fd087..07726628cf 100644 --- a/src/query/src/analyze.rs +++ b/src/query/src/analyze.rs @@ -17,11 +17,13 @@ //! The code skeleton is taken from `datafusion/physical-plan/src/analyze.rs` use std::any::Any; +use std::fmt::Display; use std::sync::Arc; +use ahash::HashMap; use arrow::array::{StringBuilder, UInt32Builder}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; -use common_recordbatch::adapter::{MetricCollector, RecordBatchMetrics}; +use common_recordbatch::adapter::{MetricCollector, PlanMetrics, RecordBatchMetrics}; use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream}; use datafusion::error::Result as DfResult; use datafusion::execution::TaskContext; @@ -34,6 +36,8 @@ use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{internal_err, DataFusionError}; use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning}; use futures::StreamExt; +use serde::Serialize; +use sqlparser::ast::AnalyzeFormat; use crate::dist_plan::MergeScanExec; @@ -46,11 +50,12 @@ pub struct DistAnalyzeExec { input: Arc, schema: SchemaRef, properties: PlanProperties, + format: AnalyzeFormat, } impl DistAnalyzeExec { /// Create a new DistAnalyzeExec - pub fn new(input: Arc) -> Self { + pub fn new(input: Arc, format: AnalyzeFormat) -> Self { let schema = SchemaRef::new(Schema::new(vec![ Field::new(STAGE, DataType::UInt32, true), Field::new(NODE, DataType::UInt32, true), @@ -61,6 +66,7 @@ impl DistAnalyzeExec { input, schema, properties, + format, } } @@ -110,7 +116,7 @@ impl ExecutionPlan for DistAnalyzeExec { self: Arc, mut children: Vec>, ) -> DfResult> { - Ok(Arc::new(Self::new(children.pop().unwrap()))) + Ok(Arc::new(Self::new(children.pop().unwrap(), self.format))) } fn execute( @@ -131,6 +137,7 @@ impl ExecutionPlan for DistAnalyzeExec { let captured_schema = self.schema.clone(); // Finish the input stream and create the output + let format = self.format; let mut input_stream = coalesce_partition_plan.execute(0, context)?; let output = async move { let mut total_rows = 0; @@ -138,7 +145,7 @@ impl ExecutionPlan for DistAnalyzeExec { total_rows += batch.num_rows(); } - create_output_batch(total_rows, captured_input, captured_schema) + create_output_batch(total_rows, captured_input, captured_schema, format) }; Ok(Box::pin(RecordBatchStreamAdapter::new( @@ -166,10 +173,10 @@ impl AnalyzeOutputBuilder { } } - fn append_metric(&mut self, stage: u32, node: u32, metric: RecordBatchMetrics) { + fn append_metric(&mut self, stage: u32, node: u32, content: String) { self.stage_builder.append_value(stage); self.node_builder.append_value(node); - self.plan_builder.append_value(metric.to_string()); + self.plan_builder.append_value(content); } fn append_total_rows(&mut self, total_rows: usize) { @@ -197,6 +204,7 @@ fn create_output_batch( total_rows: usize, input: Arc, schema: SchemaRef, + format: AnalyzeFormat, ) -> DfResult { let mut builder = AnalyzeOutputBuilder::new(schema); @@ -207,14 +215,14 @@ fn create_output_batch( let stage_0_metrics = collector.record_batch_metrics; // Append the metrics of the current stage - builder.append_metric(0, 0, stage_0_metrics); + builder.append_metric(0, 0, metrics_to_string(stage_0_metrics, format)?); // Find merge scan and append its sub_stage_metrics input.apply(|plan| { if let Some(merge_scan) = plan.as_any().downcast_ref::() { let sub_stage_metrics = merge_scan.sub_stage_metrics(); for (node, metric) in sub_stage_metrics.into_iter().enumerate() { - builder.append_metric(1, node as _, metric); + builder.append_metric(1, node as _, metrics_to_string(metric, format)?); } return Ok(TreeNodeRecursion::Stop); } @@ -226,3 +234,87 @@ fn create_output_batch( builder.finish() } + +fn metrics_to_string(metrics: RecordBatchMetrics, format: AnalyzeFormat) -> DfResult { + match format { + AnalyzeFormat::JSON => Ok(JsonMetrics::from_record_batch_metrics(metrics).to_string()), + AnalyzeFormat::TEXT => Ok(metrics.to_string()), + AnalyzeFormat::GRAPHVIZ => Err(DataFusionError::NotImplemented( + "GRAPHVIZ format is not supported for metrics output".to_string(), + )), + } +} + +#[derive(Debug, Default, Serialize)] +struct JsonMetrics { + name: String, + param: String, + + // well-known metrics + output_rows: usize, + // busy time in nanoseconds + elapsed_compute: usize, + + // other metrics + metrics: HashMap, + children: Vec, +} + +impl JsonMetrics { + fn from_record_batch_metrics(record_batch_metrics: RecordBatchMetrics) -> Self { + let mut layers: HashMap> = HashMap::default(); + + for plan_metrics in record_batch_metrics.plan_metrics.into_iter().rev() { + let (level, mut metrics) = Self::from_plan_metrics(plan_metrics); + if let Some(next_layer) = layers.remove(&(level + 1)) { + metrics.children = next_layer; + } + if level == 0 { + return metrics; + } + layers.entry(level).or_default().push(metrics); + } + + // Unreachable path. Each metrics should contains at least one level 0. + Self::default() + } + + /// Convert a [`PlanMetrics`] to a [`JsonMetrics`] without children. + /// + /// Returns the level of the plan and the [`JsonMetrics`]. + fn from_plan_metrics(plan_metrics: PlanMetrics) -> (usize, Self) { + let raw_name = plan_metrics.plan.trim_end(); + let mut elapsed_compute = 0; + let mut output_rows = 0; + let mut other_metrics = HashMap::default(); + let (name, param) = raw_name.split_once(": ").unwrap_or_default(); + + for (name, value) in plan_metrics.metrics.into_iter() { + if name == "elapsed_compute" { + elapsed_compute = value; + } else if name == "output_rows" { + output_rows = value; + } else { + other_metrics.insert(name, value); + } + } + + ( + plan_metrics.level, + Self { + name: name.to_string(), + param: param.to_string(), + output_rows, + elapsed_compute, + metrics: other_metrics, + children: vec![], + }, + ) + } +} + +impl Display for JsonMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", serde_json::to_string(self).unwrap()) + } +} diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 8df6296a1e..fff002268a 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -42,6 +42,7 @@ use datatypes::schema::Schema; use futures_util::StreamExt; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; +use sqlparser::ast::AnalyzeFormat; use table::requests::{DeleteRequest, InsertRequest}; use table::TableRef; @@ -347,7 +348,7 @@ impl DatafusionQueryEngine { #[tracing::instrument(skip_all)] fn optimize_physical_plan( &self, - _ctx: &mut QueryEngineContext, + ctx: &mut QueryEngineContext, plan: Arc, ) -> Result> { let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer(); @@ -360,7 +361,15 @@ impl DatafusionQueryEngine { // skip optimize AnalyzeExec plan let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::() { - Arc::new(DistAnalyzeExec::new(analyze_plan.input().clone())) + let format = if let Some(format) = ctx.query_ctx().explain_format() + && format.to_lowercase() == "json" + { + AnalyzeFormat::JSON + } else { + AnalyzeFormat::TEXT + }; + + Arc::new(DistAnalyzeExec::new(analyze_plan.input().clone(), format)) // let mut new_plan = analyze_plan.input().clone(); // for optimizer in state.physical_optimizers() { // new_plan = optimizer diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 5715447dfc..42264dbf89 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -67,6 +67,8 @@ pub struct QueryContext { #[derive(Debug, Builder, Clone, Default)] pub struct QueryContextMutableFields { warning: Option, + // TODO: remove this when format is supported in datafusion + explain_format: Option, } impl Display for QueryContext { @@ -302,6 +304,21 @@ impl QueryContext { self.mutable_query_context_data.write().unwrap().warning = Some(msg); } + pub fn explain_format(&self) -> Option { + self.mutable_query_context_data + .read() + .unwrap() + .explain_format + .clone() + } + + pub fn set_explain_format(&self, format: String) { + self.mutable_query_context_data + .write() + .unwrap() + .explain_format = Some(format); + } + pub fn query_timeout(&self) -> Option { self.mutable_session_data.read().unwrap().query_timeout } diff --git a/src/sql/src/statements/explain.rs b/src/sql/src/statements/explain.rs index 96a12c7a41..c893321fdb 100644 --- a/src/sql/src/statements/explain.rs +++ b/src/sql/src/statements/explain.rs @@ -15,7 +15,7 @@ use std::fmt::{Display, Formatter}; use serde::Serialize; -use sqlparser::ast::Statement as SpStatement; +use sqlparser::ast::{AnalyzeFormat, Statement as SpStatement}; use sqlparser_derive::{Visit, VisitMut}; use crate::error::Error; @@ -26,6 +26,15 @@ pub struct Explain { pub inner: SpStatement, } +impl Explain { + pub fn format(&self) -> Option { + match self.inner { + SpStatement::Explain { format, .. } => format, + _ => None, + } + } +} + impl TryFrom for Explain { type Error = Error; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c2d6567d33..e1bb8b962b 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -413,6 +413,18 @@ pub async fn test_sql_api(store_type: StorageType) { let body = serde_json::from_str::(&res.text().await).unwrap(); assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32); + // test analyze format + let res = client + .get("/v1/sql?sql=explain analyze format json select cpu, ts from demo limit 1") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + let output = body.output(); + assert_eq!(output.len(), 1); + // this is something only json format can show + assert!(format!("{:?}", output[0]).contains("\\\"param\\\"")); + // test parse method let res = client.get("/v1/sql/parse?sql=desc table t").send().await; assert_eq!(res.status(), StatusCode::OK);