From fa971c6513d460b020eed30c09203d9d3a340fc7 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 13 Dec 2022 17:44:37 +0800 Subject: [PATCH] fix: errors in optimzer --- src/query/src/optimizer.rs | 59 ++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index c50046cd51..b81e494ebd 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -16,12 +16,13 @@ use std::str::FromStr; use std::sync::Arc; use common_time::timestamp::{TimeUnit, Timestamp}; -use datafusion::execution::context::ExecutionProps; -use datafusion_expr::{Expr, ExprSchemable, LogicalPlan, Operator, TableScan}; -use datafusion_expr::expr_rewriter::{ExprRewriter, ExprRewritable}; use datafusion::optimizer::optimizer::OptimizerRule; -use datafusion::optimizer::utils; +use datafusion::optimizer::OptimizerConfig; use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue}; +use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; +use datafusion_expr::{ + Between, BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, Operator, TableScan, +}; use datatypes::arrow::compute; use datatypes::arrow::datatypes::DataType; @@ -36,24 +37,27 @@ impl OptimizerRule for TypeConversionRule { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &mut OptimizerConfig, ) -> Result { let mut converter = TypeConverter { schemas: plan.all_schemas(), }; match plan { - LogicalPlan::Filter(Filter { predicate, input }) => Ok(LogicalPlan::Filter(Filter { - predicate: predicate.clone().rewrite(&mut converter)?, - input: Arc::new(self.optimize(input, execution_props)?), - })), + LogicalPlan::Filter(Filter { predicate, input }) => Ok(LogicalPlan::Filter( + Filter::try_new( + predicate.clone().rewrite(&mut converter)?, + Arc::new(self.optimize(input, optimizer_config)?), + ) + .unwrap(), + )), LogicalPlan::TableScan(TableScan { table_name, source, projection, projected_schema, filters, - limit, + fetch, }) => { let rewrite_filters = filters .clone() @@ -66,7 +70,7 @@ impl OptimizerRule for TypeConversionRule { projection: projection.clone(), projected_schema: projected_schema.clone(), filters: rewrite_filters, - limit: *limit, + fetch: *fetch, })) } LogicalPlan::Projection { .. } @@ -83,12 +87,15 @@ impl OptimizerRule for TypeConversionRule { | LogicalPlan::CrossJoin { .. } | LogicalPlan::CreateMemoryTable { .. } | LogicalPlan::DropTable { .. } + | LogicalPlan::DropView { .. } + | LogicalPlan::Distinct { .. } | LogicalPlan::Values { .. } + | LogicalPlan::SetVariable { .. } | LogicalPlan::Analyze { .. } => { let inputs = plan.inputs(); let new_inputs = inputs .iter() - .map(|plan| self.optimize(plan, execution_props)) + .map(|plan| self.optimize(plan, optimizer_config)) .collect::>>()?; let expr = plan @@ -97,10 +104,15 @@ impl OptimizerRule for TypeConversionRule { .map(|e| e.rewrite(&mut converter)) .collect::>>()?; - utils::from_plan(plan, &expr, &new_inputs) + datafusion_expr::utils::from_plan(plan, &expr, &new_inputs) } - LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()), + LogicalPlan::Subquery { .. } + | LogicalPlan::SubqueryAlias { .. } + | LogicalPlan::CreateView { .. } + | LogicalPlan::CreateCatalogSchema { .. } + | LogicalPlan::CreateCatalog { .. } + | LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()), } } @@ -136,8 +148,7 @@ impl<'a> TypeConverter<'a> { (target_type, value) => { let value_arr = value.to_array(); let arr = - compute::cast(&value_arr, target_type) - .map_err(DataFusionError::ArrowError)?; + compute::cast(&value_arr, target_type).map_err(DataFusionError::ArrowError)?; ScalarValue::try_from_array( &Arc::from(arr), // index: Converts a value in `array` at `index` into a ScalarValue @@ -185,7 +196,7 @@ impl<'a> TypeConverter<'a> { impl<'a> ExprRewriter for TypeConverter<'a> { fn mutate(&mut self, expr: Expr) -> Result { let new_expr = match expr { - Expr::BinaryExpr { left, op, right } => match op { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { Operator::Eq | Operator::NotEq | Operator::Lt @@ -193,28 +204,28 @@ impl<'a> ExprRewriter for TypeConverter<'a> { | Operator::Gt | Operator::GtEq => { let (left, right) = self.convert_type(&left, &right)?; - Expr::BinaryExpr { + Expr::BinaryExpr(BinaryExpr { left: Box::new(left), op, right: Box::new(right), - } + }) } - _ => Expr::BinaryExpr { left, op, right }, + _ => Expr::BinaryExpr(BinaryExpr { left, op, right }), }, - Expr::Between { + Expr::Between(Between { expr, negated, low, high, - } => { + }) => { let (expr, low) = self.convert_type(&expr, &low)?; let (expr, high) = self.convert_type(&expr, &high)?; - Expr::Between { + Expr::Between(Between { expr: Box::new(expr), negated, low: Box::new(low), high: Box::new(high), - } + }) } Expr::InList { expr,