fix: don't transform Limit in TypeConversionRule, StringNormalizationRule and DistPlannerAnalyzer (#5472)

* fix: do not transform exprs in the limit plan

* chore: keep some logs for debugging

* feat: workaround for limit in other rules

* test: add sqlness tests for offset 0

* chore: add fixme
This commit is contained in:
Yingwen
2025-02-05 19:30:24 +08:00
committed by GitHub
parent 5d1761f3e5
commit 0e249f69cd
11 changed files with 129 additions and 11 deletions

View File

@@ -267,6 +267,8 @@ impl DatafusionQueryEngine {
let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
let state = ctx.state();
common_telemetry::debug!("Create physical plan, input plan: {logical_plan}");
// special handling for EXPLAIN plans
if matches!(logical_plan, DfLogicalPlan::Explain(_)) {
return state
@@ -285,6 +287,8 @@ impl DatafusionQueryEngine {
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
common_telemetry::debug!("Create physical plan, analyzed plan: {analyzed_plan}");
// skip optimize for MergeScan
let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
&& ext.node.name() == MergeScanLogicalPlan::name()
@@ -299,6 +303,8 @@ impl DatafusionQueryEngine {
.context(QueryExecutionSnafu)?
};
common_telemetry::debug!("Create physical plan, optimized plan: {optimized_plan}");
let physical_plan = state
.query_planner()
.create_physical_plan(&optimized_plan, state)

View File

@@ -65,6 +65,12 @@ impl AnalyzerRule for DistPlannerAnalyzer {
impl DistPlannerAnalyzer {
fn inspect_plan_with_subquery(plan: LogicalPlan) -> DfResult<Transformed<LogicalPlan>> {
// Workaround for https://github.com/GreptimeTeam/greptimedb/issues/5469
// FIXME(yingwen): Remove this once we update DataFusion.
if let LogicalPlan::Limit(_) = &plan {
return Ok(Transformed::no(plan));
}
let exprs = plan
.expressions_consider_join()
.into_iter()

View File

@@ -29,15 +29,41 @@ pub struct StringNormalizationRule;
impl AnalyzerRule for StringNormalizationRule {
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
plan.transform(|plan| {
let mut converter = StringNormalizationConverter;
let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();
let expr = plan
.expressions_consider_join()
.into_iter()
.map(|e| e.rewrite(&mut converter).map(|x| x.data))
.collect::<Result<Vec<_>>>()?;
plan.with_new_exprs(expr, inputs).map(Transformed::yes)
plan.transform(|plan| match plan {
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_)
| LogicalPlan::Window(_)
| LogicalPlan::Aggregate(_)
| LogicalPlan::Sort(_)
| LogicalPlan::Join(_)
| LogicalPlan::Repartition(_)
| LogicalPlan::Union(_)
| LogicalPlan::TableScan(_)
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Values(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Dml(_)
| LogicalPlan::Copy(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::RecursiveQuery(_) => {
let mut converter = StringNormalizationConverter;
let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();
let expr = plan
.expressions_consider_join()
.into_iter()
.map(|e| e.rewrite(&mut converter).map(|x| x.data))
.collect::<Result<Vec<_>>>()?;
plan.with_new_exprs(expr, inputs).map(Transformed::yes)
}
LogicalPlan::Limit(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::DescribeTable(_) => Ok(Transformed::no(plan)),
})
.map(|x| x.data)
}

View File

@@ -86,7 +86,6 @@ impl ExtensionAnalyzerRule for TypeConversionRule {
| LogicalPlan::Repartition { .. }
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Join { .. }
| LogicalPlan::Distinct { .. }
@@ -106,7 +105,8 @@ impl ExtensionAnalyzerRule for TypeConversionRule {
plan.with_new_exprs(expr, inputs).map(Transformed::yes)
}
LogicalPlan::Subquery { .. }
LogicalPlan::Limit { .. }
| LogicalPlan::Subquery { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::SubqueryAlias { .. }
| LogicalPlan::EmptyRelation(_)

View File

@@ -109,6 +109,7 @@ impl DfLogicalPlanner {
let result = sql_to_rel
.statement_to_plan(df_stmt)
.context(PlanSqlSnafu)?;
common_telemetry::debug!("Logical planner, statement to plan result: {result}");
let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
.rewrite(result)
.await?;
@@ -119,6 +120,7 @@ impl DfLogicalPlanner {
.engine_state
.optimize_by_extension_rules(plan, &context)
.context(DataFusionSnafu)?;
common_telemetry::debug!("Logical planner, optimize result: {plan}");
Ok(plan)
}