refactor: remove DfPlan wrapper (#4733)

* refactor: remove DfPlan wrapper

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test assertion

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Authored by Ruihang Xia on 2024-09-19 20:29:33 +08:00; committed by GitHub
parent 1acda74c26
commit f5cf25b0db
38 changed files with 195 additions and 294 deletions
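In short: query::plan::LogicalPlan was a single-variant enum whose only case, DfPlan, wrapped datafusion_expr::LogicalPlan, so every consumer had to destructure or unwrap it. This commit deletes the wrapper and passes DataFusion's plan type around directly. A minimal before/after sketch of a typical call site, condensed from the hunks below rather than quoted verbatim:

// Before: the planner returned the one-variant wrapper, which callers destructured.
let LogicalPlan::DfPlan(plan) = engine.planner().plan(stmt, query_ctx).await?;

// After: the planner returns datafusion_expr::LogicalPlan directly.
let plan = engine.planner().plan(stmt, query_ctx).await?;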

Cargo.lock (generated)

@@ -1014,7 +1014,7 @@ dependencies = [
"bitflags 2.5.0",
"cexpr",
"clang-sys",
"itertools 0.10.5",
"itertools 0.12.1",
"lazy_static",
"lazycell",
"log",
@@ -4019,6 +4019,7 @@ dependencies = [
"common-test-util",
"common-time",
"common-version",
"datafusion-expr",
"datanode",
"datatypes",
"futures",
@@ -4836,7 +4837,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.4.10",
"socket2 0.5.7",
"tokio",
"tower-service",
"tracing",
@@ -8439,7 +8440,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
"heck 0.5.0",
"itertools 0.10.5",
"itertools 0.12.1",
"log",
"multimap",
"once_cell",
@@ -8491,7 +8492,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
dependencies = [
"anyhow",
"itertools 0.10.5",
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.66",
@@ -8651,7 +8652,7 @@ dependencies = [
"indoc",
"libc",
"memoffset 0.9.1",
"parking_lot 0.11.2",
"parking_lot 0.12.3",
"portable-atomic",
"pyo3-build-config",
"pyo3-ffi",
@@ -10480,6 +10481,7 @@ dependencies = [
"dashmap",
"datafusion",
"datafusion-common",
"datafusion-expr",
"datatypes",
"derive_builder 0.12.0",
"futures",


@@ -35,7 +35,6 @@ use either::Either;
use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::{DefaultSerializer, QueryEngineState};
use query::QueryEngine;
use rustyline::error::ReadlineError;
@@ -179,7 +178,7 @@ impl Repl {
.await
.context(PlanStatementSnafu)?;
let LogicalPlan::DfPlan(plan) = query_engine
let plan = query_engine
.optimize(&query_engine.engine_context(query_ctx), &plan)
.context(PlanStatementSnafu)?;


@@ -843,7 +843,7 @@ impl RegionServerInner {
let result = self
.query_engine
.execute(request.plan.into(), query_ctx)
.execute(request.plan, query_ctx)
.await
.context(ExecuteLogicalPlanSnafu)?;


@@ -24,8 +24,8 @@ use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_runtime::Runtime;
use datafusion_expr::LogicalPlan;
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
use query::planner::LogicalPlanner;
use query::query_engine::{DescribeResult, QueryEngineState};
use query::{QueryEngine, QueryEngineContext};


@@ -41,7 +41,6 @@ use datafusion_expr::{
BinaryExpr, Expr, Operator, Projection, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
use snafu::ResultExt;
@@ -111,7 +110,6 @@ pub async fn sql_to_flow_plan(
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let LogicalPlan::DfPlan(plan) = plan;
let opted_plan = apply_df_optimizer(plan).await?;


@@ -163,7 +163,6 @@ mod test {
use itertools::Itertools;
use prost::Message;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
use session::context::QueryContext;
@@ -274,7 +273,6 @@ mod test {
.plan(stmt, QueryContext::arc())
.await
.unwrap();
let LogicalPlan::DfPlan(plan) = plan;
let plan = apply_df_optimizer(plan).await.unwrap();
// encode then decode so to rely on the impl of conversion from logical plan to substrait plan
@@ -297,7 +295,6 @@ mod test {
.plan(stmt, QueryContext::arc())
.await
.unwrap();
let LogicalPlan::DfPlan(plan) = plan;
let plan = apply_df_optimizer(plan).await;
assert!(plan.is_err());


@@ -37,6 +37,7 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
datafusion-expr.workspace = true
datanode.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true


@@ -40,6 +40,7 @@ use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::{debug, error, tracing};
use datafusion_expr::LogicalPlan;
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
@@ -48,7 +49,6 @@ use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::QueryEngineRef;


@@ -133,13 +133,6 @@ pub enum Error {
source: query::error::Error,
},
#[snafu(display("Failed to get schema from logical plan"))]
GetSchema {
#[snafu(implicit)]
location: Location,
source: query::error::Error,
},
#[snafu(display("Column datatype error"))]
ColumnDataType {
#[snafu(implicit)]
@@ -184,6 +177,13 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display("Failed to convert datafusion schema"))]
ConvertSchema {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to convert expr to struct"))]
InvalidExpr {
#[snafu(implicit)]
@@ -795,6 +795,7 @@ impl ErrorExt for Error {
| Error::PrepareFileTable { .. }
| Error::InferFileTableSchema { .. }
| Error::SchemaIncompatible { .. }
| Error::ConvertSchema { .. }
| Error::UnsupportedRegionRequest { .. }
| Error::InvalidTableName { .. }
| Error::InvalidViewName { .. }
@@ -872,7 +873,6 @@ impl ErrorExt for Error {
| Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),
Error::ExecuteStatement { source, .. }
| Error::GetSchema { source, .. }
| Error::ExtractTableNames { source, .. }
| Error::PlanStatement { source, .. }
| Error::ParseQuery { source, .. }


@@ -42,9 +42,9 @@ use common_query::Output;
use common_telemetry::tracing;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion_expr::LogicalPlan;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
use query::plan::LogicalPlan;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;


@@ -29,7 +29,6 @@ use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
use object_store::ObjectStore;
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::CopyTableRequest;
@@ -133,7 +132,7 @@ impl StatementExecutor {
let output = self
.query_engine
.execute(LogicalPlan::DfPlan(plan), query_ctx)
.execute(plan, query_ctx)
.await
.context(ExecLogicalPlanSnafu)?;
let stream = match output.data {


@@ -37,7 +37,7 @@ use common_query::Output;
use common_telemetry::{debug, info, tracing};
use common_time::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
@@ -69,11 +69,11 @@ use table::TableRef;
use super::StatementExecutor;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu,
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
InvalidPartitionSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu,
ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_factory;
@@ -406,9 +406,12 @@ impl StatementExecutor {
// Save the columns in the plan; they may change when the schemas of tables
// in the plan are altered.
let plan_columns: Vec<_> = logical_plan
let schema: Schema = logical_plan
.schema()
.context(error::GetSchemaSnafu)?
.clone()
.try_into()
.context(ConvertSchemaSnafu)?;
let plan_columns: Vec<_> = schema
.column_schemas()
.iter()
.map(|c| c.name.clone())
@@ -434,9 +437,8 @@ impl StatementExecutor {
// Extract the table names from the original plan
// and rewrite them as fully qualified names.
let (table_names, plan) =
extract_and_rewrite_full_table_names(logical_plan.unwrap_df_plan(), ctx.clone())
.context(ExtractTableNamesSnafu)?;
let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
.context(ExtractTableNamesSnafu)?;
let table_names = table_names.into_iter().map(|t| t.into()).collect();


@@ -16,11 +16,11 @@ use std::collections::HashMap;
use common_query::Output;
use common_telemetry::tracing;
use datafusion_expr::LogicalPlan;
use query::parser::{
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
};
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::statements::tql::Tql;


@@ -26,7 +26,7 @@ use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::logical_expr::col;
use datafusion_common::{TableReference, ToDFSchema};
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan};
use datafusion_expr::{DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
@@ -34,7 +34,6 @@ use moka::sync::Cache;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
@@ -373,7 +372,7 @@ impl PipelineTable {
Arc::new(dataframe.into_parts().1),
);
let plan = LogicalPlan::DfPlan(DfLogicalPlan::Dml(stmt));
let plan = LogicalPlan::Dml(stmt);
// 4. execute dml stmt
let output = self
@@ -427,7 +426,7 @@ impl PipelineTable {
.limit(0, Some(1))
.context(BuildDfLogicalPlanSnafu)?;
let plan = LogicalPlan::DfPlan(dataframe.into_parts().1);
let plan = dataframe.into_parts().1;
let table_info = self.table.table_info();


@@ -36,7 +36,7 @@ use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::ResolvedTableReference;
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp};
use datatypes::prelude::VectorRef;
use datatypes::schema::Schema;
use futures_util::StreamExt;
@@ -50,14 +50,13 @@ use crate::dataframe::DataFrame;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::dist_plan::MergeScanLogicalPlan;
use crate::error::{
CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, MissingTableMutationHandlerSnafu,
MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
CatalogSnafu, ConvertSchemaSnafu, CreateRecordBatchSnafu, DataFusionSnafu,
MissingTableMutationHandlerSnafu, MissingTimestampColumnSnafu, QueryExecutionSnafu, Result,
TableMutationSnafu, TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::plan::LogicalPlan;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
use crate::{metrics, QueryEngine};
@@ -119,7 +118,7 @@ impl DatafusionQueryEngine {
let table = self.find_table(&table_name, &query_ctx).await?;
let output = self
.exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()), query_ctx.clone())
.exec_query_plan((*dml.input).clone(), query_ctx.clone())
.await?;
let mut stream = match output.data {
OutputData::RecordBatches(batches) => batches.as_stream(),
@@ -265,52 +264,48 @@ impl DatafusionQueryEngine {
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
match logical_plan {
LogicalPlan::DfPlan(df_plan) => {
let state = ctx.state();
let state = ctx.state();
// special handle EXPLAIN plan
if matches!(df_plan, DfLogicalPlan::Explain(_)) {
return state
.create_physical_plan(df_plan)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu);
}
// analyze first
let analyzed_plan = state
.analyzer()
.execute_and_check(df_plan.clone(), state.config_options(), |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
// skip optimize for MergeScan
let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
&& ext.node.name() == MergeScanLogicalPlan::name()
{
analyzed_plan.clone()
} else {
state
.optimizer()
.optimize(analyzed_plan, state, |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
};
let physical_plan = state
.query_planner()
.create_physical_plan(&optimized_plan, state)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
Ok(physical_plan)
}
// special handle EXPLAIN plan
if matches!(logical_plan, DfLogicalPlan::Explain(_)) {
return state
.create_physical_plan(logical_plan)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu);
}
// analyze first
let analyzed_plan = state
.analyzer()
.execute_and_check(logical_plan.clone(), state.config_options(), |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
// skip optimize for MergeScan
let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
&& ext.node.name() == MergeScanLogicalPlan::name()
{
analyzed_plan.clone()
} else {
state
.optimizer()
.optimize(analyzed_plan, state, |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
};
let physical_plan = state
.query_planner()
.create_physical_plan(&optimized_plan, state)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
Ok(physical_plan)
}
#[tracing::instrument(skip_all)]
@@ -320,28 +315,25 @@ impl DatafusionQueryEngine {
plan: &LogicalPlan,
) -> Result<LogicalPlan> {
let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();
match plan {
LogicalPlan::DfPlan(df_plan) => {
// Optimized by extension rules
let optimized_plan = self
.state
.optimize_by_extension_rules(df_plan.clone(), context)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
// Optimized by datafusion optimizer
let optimized_plan = self
.state
.session_state()
.optimize(&optimized_plan)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
// Optimized by extension rules
let optimized_plan = self
.state
.optimize_by_extension_rules(plan.clone(), context)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
Ok(LogicalPlan::DfPlan(optimized_plan))
}
}
// Optimized by datafusion optimizer
let optimized_plan = self
.state
.session_state()
.optimize(&optimized_plan)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
Ok(optimized_plan)
}
#[tracing::instrument(skip_all)]
@@ -399,15 +391,25 @@ impl QueryEngine for DatafusionQueryEngine {
) -> Result<DescribeResult> {
let ctx = self.engine_context(query_ctx);
if let Ok(optimised_plan) = self.optimize(&ctx, &plan) {
let schema = optimised_plan
.schema()
.clone()
.try_into()
.context(ConvertSchemaSnafu)?;
Ok(DescribeResult {
schema: optimised_plan.schema()?,
schema,
logical_plan: optimised_plan,
})
} else {
// Tables like those in information_schema cannot be optimized when
// they contain parameters, so we fall back to the original plans.
let schema = plan
.schema()
.clone()
.try_into()
.context(ConvertSchemaSnafu)?;
Ok(DescribeResult {
schema: plan.schema()?,
schema,
logical_plan: plan,
})
}
@@ -415,9 +417,7 @@ impl QueryEngine for DatafusionQueryEngine {
async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
match plan {
LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => {
self.exec_dml_statement(dml, query_ctx).await
}
LogicalPlan::Dml(dml) => self.exec_dml_statement(dml, query_ctx).await,
_ => self.exec_query_plan(plan, query_ctx).await,
}
}
@@ -577,10 +577,10 @@ mod tests {
// TODO(sunng87): do not rely on to_string for compare
assert_eq!(
format!("{plan:?}"),
r#"DfPlan(Limit: skip=0, fetch=20
r#"Limit: skip=0, fetch=20
Projection: SUM(numbers.number)
Aggregate: groupBy=[[]], aggr=[[SUM(numbers.number)]]
TableScan: numbers)"#
TableScan: numbers"#
);
}


@@ -108,13 +108,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to convert Datafusion schema"))]
ConvertDatafusionSchema {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse timestamp `{}`", raw))]
ParseTimestamp {
raw: String,
@@ -228,6 +221,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unknown table type, downcast failed"))]
UnknownTable {
#[snafu(implicit)]
@@ -354,7 +348,6 @@ impl ErrorExt for Error {
QueryAccessDenied { .. } => StatusCode::AccessDenied,
Catalog { source, .. } => source.status_code(),
ConvertDatafusionSchema { source, .. } => source.status_code(),
CreateRecordBatch { source, .. } => source.status_code(),
QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(),
PlanSql { error, .. } => {


@@ -12,108 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::collections::HashSet;
use common_query::prelude::ScalarValue;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{ParamValues, TableReference};
use datafusion_common::TableReference;
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::Schema;
use session::context::QueryContextRef;
use snafu::ResultExt;
pub use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
use table::table_name::TableName;
use crate::error::{ConvertDatafusionSchemaSnafu, DataFusionSnafu, Result};
/// A LogicalPlan represents the different types of relational
/// operators (such as Projection, Filter, etc) and can be created by
/// the SQL query planner.
///
/// A LogicalPlan represents transforming an input relation (table) to
/// an output relation (table) with a (potentially) different
/// schema. A plan represents a dataflow tree where data flows
/// from leaves up to the root to produce the query result.
#[derive(Clone, Debug)]
pub enum LogicalPlan {
DfPlan(DfLogicalPlan),
}
impl LogicalPlan {
/// Get the schema for this logical plan
pub fn schema(&self) -> Result<Schema> {
match self {
Self::DfPlan(plan) => {
let df_schema = plan.schema();
df_schema
.clone()
.try_into()
.context(ConvertDatafusionSchemaSnafu)
}
}
}
/// Return a `format`able structure that produces a single line
/// per node. For example:
///
/// ```text
/// Projection: employee.id
/// Filter: employee.state Eq Utf8(\"CO\")\
/// CsvScan: employee projection=Some([0, 3])
/// ```
pub fn display_indent(&self) -> impl Display + '_ {
let LogicalPlan::DfPlan(plan) = self;
plan.display_indent()
}
/// Walk the logical plan, find any `PlaceHolder` tokens,
/// and return a map of their IDs and ConcreteDataTypes
pub fn get_param_types(&self) -> Result<HashMap<String, Option<ConcreteDataType>>> {
let LogicalPlan::DfPlan(plan) = self;
let types = plan.get_parameter_types().context(DataFusionSnafu)?;
Ok(types
.into_iter()
.map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
.collect())
}
/// Return a logical plan with all placeholders/params (e.g $1 $2,
/// ...) replaced with corresponding values provided in the
/// params_values
pub fn replace_params_with_values(&self, values: &[ScalarValue]) -> Result<LogicalPlan> {
let LogicalPlan::DfPlan(plan) = self;
plan.clone()
.replace_params_with_values(&ParamValues::List(values.to_vec()))
.context(DataFusionSnafu)
.map(LogicalPlan::DfPlan)
}
/// Unwrap the logical plan into a DataFusion logical plan
pub fn unwrap_df_plan(self) -> DfLogicalPlan {
match self {
LogicalPlan::DfPlan(plan) => plan,
}
}
/// Returns the DataFusion logical plan reference
pub fn df_plan(&self) -> &DfLogicalPlan {
match self {
LogicalPlan::DfPlan(plan) => plan,
}
}
}
impl From<DfLogicalPlan> for LogicalPlan {
fn from(plan: DfLogicalPlan) -> Self {
Self::DfPlan(plan)
}
}
use crate::error::{DataFusionSnafu, Result};
struct TableNamesExtractAndRewriter {
pub(crate) table_names: HashSet<TableName>,

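The helpers removed from the wrapper above all have direct DataFusion equivalents, which is what the call-site hunks elsewhere in this commit substitute in. A rough sketch of the mapping; the function name is illustrative only, and the snafu error contexts used at the real call sites are reduced to ? here for brevity:

use std::collections::HashMap;

use datafusion_common::{ParamValues, ScalarValue};
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::Schema;

fn wrapper_replacements(
    plan: &LogicalPlan,
    values: Vec<ScalarValue>,
) -> Result<(), Box<dyn std::error::Error>> {
    // get_param_types(): ask DataFusion for the placeholder types, then map the
    // arrow types into ConcreteDataType, as the wrapper used to do internally.
    let _param_types: HashMap<String, Option<ConcreteDataType>> = plan
        .get_parameter_types()?
        .into_iter()
        .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
        .collect();

    // replace_params_with_values(): wrap the scalar values in ParamValues first.
    let _bound = plan
        .clone()
        .replace_params_with_values(&ParamValues::List(values))?;

    // schema(): convert the DataFusion schema into datatypes::schema::Schema via TryInto.
    let _schema: Schema = plan.schema().clone().try_into()?;

    Ok(())
}

display_indent() and unwrap_df_plan() simply disappear: DataFusion's plan already provides its own display methods, and no unwrapping is needed.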

@@ -22,7 +22,7 @@ use common_telemetry::tracing;
use datafusion::common::DFSchema;
use datafusion::execution::context::SessionState;
use datafusion::sql::planner::PlannerContext;
use datafusion_expr::Expr as DfExpr;
use datafusion_expr::{Expr as DfExpr, LogicalPlan};
use datafusion_sql::planner::{ParserOptions, SqlToRel};
use promql_parser::parser::EvalStmt;
use session::context::QueryContextRef;
@@ -32,7 +32,6 @@ use sql::statements::statement::Statement;
use crate::error::{DataFusionSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu};
use crate::parser::QueryStatement;
use crate::plan::LogicalPlan;
use crate::promql::planner::PromPlanner;
use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
use crate::range_select::plan_rewrite::RangePlanRewriter;
@@ -109,7 +108,7 @@ impl DfLogicalPlanner {
.optimize_by_extension_rules(plan, &context)
.context(DataFusionSnafu)?;
Ok(LogicalPlan::DfPlan(plan))
Ok(plan)
}
/// Generate a relational expression from a SQL expression
@@ -160,7 +159,6 @@ impl DfLogicalPlanner {
);
PromPlanner::stmt_to_plan(table_provider, stmt, &self.session_state)
.await
.map(LogicalPlan::DfPlan)
.map_err(BoxedError::new)
.context(QueryPlanSnafu)
}
@@ -168,7 +166,7 @@ impl DfLogicalPlanner {
#[tracing::instrument(skip_all)]
fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.engine_state
.optimize_logical_plan(plan.unwrap_df_plan())
.optimize_logical_plan(plan)
.context(DataFusionSnafu)
.map(Into::into)
}


@@ -30,6 +30,7 @@ use common_function::handlers::{
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;
pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
use session::context::QueryContextRef;
@@ -38,7 +39,6 @@ use table::TableRef;
use crate::dataframe::DataFrame;
use crate::datafusion::DatafusionQueryEngine;
use crate::error::Result;
use crate::plan::LogicalPlan;
use crate::planner::LogicalPlanner;
pub use crate::query_engine::context::QueryEngineContext;
pub use crate::query_engine::state::QueryEngineState;

View File

@@ -556,7 +556,6 @@ mod test {
use super::*;
use crate::parser::QueryLanguageParser;
use crate::plan::LogicalPlan as GreptimeLogicalPlan;
use crate::{QueryEngineFactory, QueryEngineRef};
async fn create_test_engine() -> QueryEngineRef {
@@ -611,14 +610,14 @@ mod test {
QueryEngineFactory::new(catalog_list, None, None, None, None, false).query_engine()
}
async fn do_query(sql: &str) -> Result<crate::plan::LogicalPlan> {
async fn do_query(sql: &str) -> Result<LogicalPlan> {
let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
let engine = create_test_engine().await;
engine.planner().plan(stmt, QueryContext::arc()).await
}
async fn query_plan_compare(sql: &str, expected: String) {
let GreptimeLogicalPlan::DfPlan(plan) = do_query(sql).await.unwrap();
let plan = do_query(sql).await.unwrap();
assert_eq!(plan.display_indent_schema().to_string(), expected);
}


@@ -35,7 +35,6 @@ use table::test_util::MemTable;
use crate::error::{QueryExecutionSnafu, Result};
use crate::parser::QueryLanguageParser;
use crate::plan::LogicalPlan;
use crate::query_engine::options::QueryOptions;
use crate::query_engine::QueryEngineFactory;
use crate::tests::exec_selection;
@@ -64,18 +63,16 @@ async fn test_datafusion_query_engine() -> Result<()> {
let limit = 10;
let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
let plan = LogicalPlan::DfPlan(
LogicalPlanBuilder::scan(
"numbers",
Arc::new(DefaultTableSource { table_provider }),
None,
)
.unwrap()
.limit(0, Some(limit))
.unwrap()
.build()
.unwrap(),
);
let plan = LogicalPlanBuilder::scan(
"numbers",
Arc::new(DefaultTableSource { table_provider }),
None,
)
.unwrap()
.limit(0, Some(limit))
.unwrap()
.build()
.unwrap();
let output = engine.execute(plan, QueryContext::arc()).await?;


@@ -33,7 +33,6 @@ use datafusion_common::TableReference;
use datafusion_expr::LogicalPlanBuilder;
use datatypes::prelude::ScalarVector;
use datatypes::vectors::{StringVector, Vector};
use query::plan::LogicalPlan;
use query::QueryEngineRef;
use servers::query_handler::grpc::GrpcQueryHandlerRef;
use session::context::{QueryContextBuilder, QueryContextRef};
@@ -224,7 +223,7 @@ impl<E: ErrorExt + Send + Sync + 'static> ScriptsTable<E> {
let output = self
.query_engine
.execute(LogicalPlan::DfPlan(plan), query_ctx(&table_info))
.execute(plan, query_ctx(&table_info))
.await
.context(ExecuteInternalStatementSnafu)?;
let stream = match output.data {
@@ -279,7 +278,7 @@ impl<E: ErrorExt + Send + Sync + 'static> ScriptsTable<E> {
.context(BuildDfLogicalPlanSnafu)?;
let output = query_engine
.execute(LogicalPlan::DfPlan(plan), query_ctx(&table_info))
.execute(plan, query_ctx(&table_info))
.await
.context(ExecuteInternalStatementSnafu)?;
let stream = match output.data {


@@ -46,6 +46,7 @@ common-version = { workspace = true, features = ["codec"] }
dashmap.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
futures = "0.3"


@@ -398,13 +398,6 @@ pub enum Error {
source: query::error::Error,
},
#[snafu(display("Failed to get param types"))]
GetPreparedStmtParams {
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("{}", reason))]
UnexpectedResult {
reason: String,
@@ -452,13 +445,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to replace params with values in prepared statement"))]
ReplacePreparedStmtParams {
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to convert scalar value"))]
ConvertScalarValue {
source: datatypes::error::Error,
@@ -635,9 +621,7 @@ impl ErrorExt for Error {
InvalidUtf8Value { .. } => StatusCode::InvalidArguments,
ReplacePreparedStmtParams { source, .. }
| GetPreparedStmtParams { source, .. }
| ParsePromQL { source, .. } => source.status_code(),
ParsePromQL { source, .. } => source.status_code(),
Other { source, .. } => source.status_code(),
UnexpectedResult { .. } => StatusCode::Unexpected,


@@ -969,11 +969,11 @@ mod test {
use axum::routing::get;
use common_query::Output;
use common_recordbatch::RecordBatches;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{StringVector, UInt32Vector};
use query::parser::PromQuery;
use query::plan::LogicalPlan;
use query::query_engine::DescribeResult;
use session::context::QueryContextRef;
use tokio::sync::mpsc;


@@ -21,8 +21,8 @@ use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_query::Output;
use datafusion_expr::LogicalPlan;
use query::parser::PromQuery;
use query::plan::LogicalPlan;
use serde_json::Value;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;


@@ -18,8 +18,8 @@
#![feature(let_chains)]
#![feature(if_let_guard)]
use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;
use query::plan::LogicalPlan;
pub mod addrs;
pub mod configurator;


@@ -25,6 +25,8 @@ use common_catalog::parse_optional_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_query::Output;
use common_telemetry::{debug, error, tracing, warn};
use datafusion_common::ParamValues;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use itertools::Itertools;
use opensrv_mysql::{
@@ -32,7 +34,6 @@ use opensrv_mysql::{
StatementMetaWriter, ValueInner,
};
use parking_lot::RwLock;
use query::plan::LogicalPlan;
use query::query_engine::DescribeResult;
use rand::RngCore;
use session::context::{Channel, QueryContextRef};
@@ -43,7 +44,7 @@ use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use tokio::io::AsyncWrite;
use crate::error::{self, InvalidPrepareStatementSnafu, Result};
use crate::error::{self, DataFrameSnafu, InvalidPrepareStatementSnafu, Result};
use crate::metrics::METRIC_AUTH_FAILURE;
use crate::mysql::helper::{
self, format_placeholder, replace_placeholders, transform_placeholders,
@@ -175,8 +176,11 @@ impl MysqlInstanceShim {
let params = if let Some(plan) = &plan {
prepared_params(
&plan
.get_param_types()
.context(error::GetPreparedStmtParamsSnafu)?,
.get_parameter_types()
.context(DataFrameSnafu)?
.into_iter()
.map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
.collect(),
)?
} else {
dummy_params(param_num)?
@@ -323,8 +327,11 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
let outputs = match sql_plan.plan {
Some(plan) => {
let param_types = plan
.get_param_types()
.context(error::GetPreparedStmtParamsSnafu)?;
.get_parameter_types()
.context(DataFrameSnafu)?
.into_iter()
.map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
.collect::<HashMap<_, _>>();
if params.len() != param_types.len() {
return error::InternalSnafu {
@@ -436,8 +443,11 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
let outputs = match sql_plan.plan {
Some(plan) => {
let param_types = plan
.get_param_types()
.context(error::GetPreparedStmtParamsSnafu)?;
.get_parameter_types()
.context(DataFrameSnafu)?
.into_iter()
.map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
.collect::<HashMap<_, _>>();
if params.len() != param_types.len() {
writer
@@ -618,8 +628,9 @@ fn replace_params_with_values(
}
}
plan.replace_params_with_values(&values)
.context(error::ReplacePreparedStmtParamsSnafu)
plan.clone()
.replace_params_with_values(&ParamValues::List(values.clone()))
.context(DataFrameSnafu)
}
fn replace_params_with_exprs(
@@ -645,8 +656,9 @@ fn replace_params_with_exprs(
}
}
plan.replace_params_with_values(&values)
.context(error::ReplacePreparedStmtParamsSnafu)
plan.clone()
.replace_params_with_values(&ParamValues::List(values.clone()))
.context(DataFrameSnafu)
}
async fn validate_query(query: &str) -> Result<Statement> {


@@ -20,6 +20,8 @@ use common_query::{Output, OutputData};
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::RecordBatch;
use common_telemetry::{debug, error, tracing};
use datafusion_common::ParamValues;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::SchemaRef;
use futures::{future, stream, Stream, StreamExt};
use pgwire::api::portal::{Format, Portal};
@@ -272,7 +274,10 @@ impl ExtendedQueryHandler for PostgresServerHandler {
let output = if let Some(plan) = &sql_plan.plan {
let plan = plan
.replace_params_with_values(parameters_to_scalar_values(plan, portal)?.as_ref())
.clone()
.replace_params_with_values(&ParamValues::List(parameters_to_scalar_values(
plan, portal,
)?))
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
self.query_handler
.do_exec_plan(plan, query_ctx.clone())
@@ -306,8 +311,11 @@ impl ExtendedQueryHandler for PostgresServerHandler {
let sql_plan = &stmt.statement;
let (param_types, sql_plan, format) = if let Some(plan) = &sql_plan.plan {
let param_types = plan
.get_param_types()
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
.get_parameter_types()
.map_err(|e| PgWireError::ApiError(Box::new(e)))?
.into_iter()
.map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
.collect();
let types = param_types_to_pg_types(&param_types)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;


@@ -23,6 +23,7 @@ use std::ops::Deref;
use chrono::{NaiveDate, NaiveDateTime};
use common_time::Interval;
use datafusion_common::ScalarValue;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::Schema;
use datatypes::types::TimestampType;
@@ -30,7 +31,6 @@ use pgwire::api::portal::{Format, Portal};
use pgwire::api::results::{DataRowEncoder, FieldInfo};
use pgwire::api::Type;
use pgwire::error::{PgWireError, PgWireResult};
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use session::session_config::PGByteaOutputValue;
@@ -276,8 +276,11 @@ pub(super) fn parameters_to_scalar_values(
let client_param_types = &portal.statement.parameter_types;
let param_types = plan
.get_param_types()
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
.get_parameter_types()
.map_err(|e| PgWireError::ApiError(Box::new(e)))?
.into_iter()
.map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
.collect::<HashMap<_, _>>();
for idx in 0..param_count {
let server_type = param_types


@@ -28,10 +28,10 @@ use common_telemetry::tracing;
use common_time::timestamp::TimeUnit;
use datafusion::prelude::{col, lit, regexp_match, Expr};
use datafusion_common::ScalarValue;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::{ConcreteDataType, Value};
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
use snafu::{ensure, OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};
@@ -123,7 +123,7 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
.filter(conditions)
.context(error::DataFrameSnafu)?;
Ok(LogicalPlan::DfPlan(dataframe.into_parts().1))
Ok(dataframe.into_parts().1)
}
#[inline]


@@ -17,8 +17,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_query::Output;
use datafusion_expr::LogicalPlan;
use query::parser::PromQuery;
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::statements::statement::Statement;


@@ -21,8 +21,8 @@ use auth::tests::{DatabaseAuthInfo, MockUserProvider};
use axum::{http, Router};
use common_query::Output;
use common_test_util::ports;
use datafusion_expr::LogicalPlan;
use query::parser::PromQuery;
use query::plan::LogicalPlan;
use query::query_engine::DescribeResult;
use servers::error::{Error, Result};
use servers::http::header::constants::GREPTIME_DB_HEADER_NAME;


@@ -19,8 +19,8 @@ use async_trait::async_trait;
use axum::Router;
use common_query::Output;
use common_test_util::ports;
use datafusion_expr::LogicalPlan;
use query::parser::PromQuery;
use query::plan::LogicalPlan;
use query::query_engine::DescribeResult;
use servers::error::{self, Result};
use servers::http::test_helpers::TestClient;


@@ -23,9 +23,9 @@ use async_trait::async_trait;
use axum::Router;
use common_query::Output;
use common_test_util::ports;
use datafusion_expr::LogicalPlan;
use prost::Message;
use query::parser::PromQuery;
use query::plan::LogicalPlan;
use query::query_engine::DescribeResult;
use servers::error::{Error, Result};
use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};


@@ -21,8 +21,8 @@ use async_trait::async_trait;
use catalog::memory::MemoryCatalogManager;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use datafusion_expr::LogicalPlan;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
use query::query_engine::DescribeResult;
use query::{QueryEngineFactory, QueryEngineRef};
use script::engine::{CompileContext, EvalContext, Script, ScriptEngine};


@@ -33,7 +33,6 @@ mod test {
use common_recordbatch::RecordBatches;
use frontend::instance::Instance;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::DefaultSerializer;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContext;
@@ -540,7 +539,7 @@ CREATE TABLE {table_name} (
&QueryContext::arc(),
)
.unwrap();
let LogicalPlan::DfPlan(plan) = instance
let plan = instance
.frontend()
.statement_executor()
.plan(stmt, QueryContext::arc())


@@ -28,10 +28,10 @@ mod tests {
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
use datafusion_expr::LogicalPlan;
use frontend::error::{self, Error, Result};
use frontend::instance::Instance;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::DefaultSerializer;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::query_handler::sql::SqlQueryHandler;
@@ -233,7 +233,7 @@ mod tests {
&QueryContext::arc(),
)
.unwrap();
let LogicalPlan::DfPlan(plan) = instance
let plan = instance
.frontend()
.statement_executor()
.plan(stmt, QueryContext::arc())
@@ -317,7 +317,7 @@ mod tests {
fn pre_execute(
&self,
_statement: &Statement,
_plan: Option<&query::plan::LogicalPlan>,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<()> {
let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);