fix: errors in optimzer

This commit is contained in:
Lei, HUANG
2022-12-13 17:44:37 +08:00
parent 36c929e1a7
commit fa971c6513

View File

@@ -16,12 +16,13 @@ use std::str::FromStr;
use std::sync::Arc;
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::execution::context::ExecutionProps;
use datafusion_expr::{Expr, ExprSchemable, LogicalPlan, Operator, TableScan};
use datafusion_expr::expr_rewriter::{ExprRewriter, ExprRewritable};
use datafusion::optimizer::optimizer::OptimizerRule;
use datafusion::optimizer::utils;
use datafusion::optimizer::OptimizerConfig;
use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::{
Between, BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, Operator, TableScan,
};
use datatypes::arrow::compute;
use datatypes::arrow::datatypes::DataType;
@@ -36,24 +37,27 @@ impl OptimizerRule for TypeConversionRule {
fn optimize(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
let mut converter = TypeConverter {
schemas: plan.all_schemas(),
};
match plan {
LogicalPlan::Filter(Filter { predicate, input }) => Ok(LogicalPlan::Filter(Filter {
predicate: predicate.clone().rewrite(&mut converter)?,
input: Arc::new(self.optimize(input, execution_props)?),
})),
LogicalPlan::Filter(Filter { predicate, input }) => Ok(LogicalPlan::Filter(
Filter::try_new(
predicate.clone().rewrite(&mut converter)?,
Arc::new(self.optimize(input, optimizer_config)?),
)
.unwrap(),
)),
LogicalPlan::TableScan(TableScan {
table_name,
source,
projection,
projected_schema,
filters,
limit,
fetch,
}) => {
let rewrite_filters = filters
.clone()
@@ -66,7 +70,7 @@ impl OptimizerRule for TypeConversionRule {
projection: projection.clone(),
projected_schema: projected_schema.clone(),
filters: rewrite_filters,
limit: *limit,
fetch: *fetch,
}))
}
LogicalPlan::Projection { .. }
@@ -83,12 +87,15 @@ impl OptimizerRule for TypeConversionRule {
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::DropTable { .. }
| LogicalPlan::DropView { .. }
| LogicalPlan::Distinct { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::SetVariable { .. }
| LogicalPlan::Analyze { .. } => {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| self.optimize(plan, execution_props))
.map(|plan| self.optimize(plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;
let expr = plan
@@ -97,10 +104,15 @@ impl OptimizerRule for TypeConversionRule {
.map(|e| e.rewrite(&mut converter))
.collect::<Result<Vec<_>>>()?;
utils::from_plan(plan, &expr, &new_inputs)
datafusion_expr::utils::from_plan(plan, &expr, &new_inputs)
}
LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
LogicalPlan::Subquery { .. }
| LogicalPlan::SubqueryAlias { .. }
| LogicalPlan::CreateView { .. }
| LogicalPlan::CreateCatalogSchema { .. }
| LogicalPlan::CreateCatalog { .. }
| LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
}
}
@@ -136,8 +148,7 @@ impl<'a> TypeConverter<'a> {
(target_type, value) => {
let value_arr = value.to_array();
let arr =
compute::cast(&value_arr, target_type)
.map_err(DataFusionError::ArrowError)?;
compute::cast(&value_arr, target_type).map_err(DataFusionError::ArrowError)?;
ScalarValue::try_from_array(
&Arc::from(arr), // index: Converts a value in `array` at `index` into a ScalarValue
@@ -185,7 +196,7 @@ impl<'a> TypeConverter<'a> {
impl<'a> ExprRewriter for TypeConverter<'a> {
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
let new_expr = match expr {
Expr::BinaryExpr { left, op, right } => match op {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
Operator::Eq
| Operator::NotEq
| Operator::Lt
@@ -193,28 +204,28 @@ impl<'a> ExprRewriter for TypeConverter<'a> {
| Operator::Gt
| Operator::GtEq => {
let (left, right) = self.convert_type(&left, &right)?;
Expr::BinaryExpr {
Expr::BinaryExpr(BinaryExpr {
left: Box::new(left),
op,
right: Box::new(right),
}
})
}
_ => Expr::BinaryExpr { left, op, right },
_ => Expr::BinaryExpr(BinaryExpr { left, op, right }),
},
Expr::Between {
Expr::Between(Between {
expr,
negated,
low,
high,
} => {
}) => {
let (expr, low) = self.convert_type(&expr, &low)?;
let (expr, high) = self.convert_type(&expr, &high)?;
Expr::Between {
Expr::Between(Between {
expr: Box::new(expr),
negated,
low: Box::new(low),
high: Box::new(high),
}
})
}
Expr::InList {
expr,