From 0e249f69cd39efa9c9cba259c867e7e4ff797671 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 5 Feb 2025 19:30:24 +0800 Subject: [PATCH] fix: don't transform Limit in TypeConversionRule, StringNormalizationRule and DistPlannerAnalyzer (#5472) * fix: do not transform exprs in the limit plan * chore: keep some logs for debug * feat: workaround for limit in other rules * test: add sqlness tests for offset 0 * chore: add fixme --- src/query/src/datafusion.rs | 6 +++ src/query/src/dist_plan/analyzer.rs | 6 +++ .../src/optimizer/string_normalization.rs | 44 +++++++++++++++---- src/query/src/optimizer/type_conversion.rs | 4 +- src/query/src/planner.rs | 2 + .../standalone/common/order/limit.result | 9 ++++ tests/cases/standalone/common/order/limit.sql | 2 + .../common/system/information_schema.result | 24 ++++++++++ .../common/system/information_schema.sql | 4 ++ tests/cases/standalone/limit/limit.result | 29 ++++++++++++ tests/cases/standalone/limit/limit.sql | 10 +++++ 11 files changed, 129 insertions(+), 11 deletions(-) diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 4bb52de316..8df6296a1e 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -267,6 +267,8 @@ impl DatafusionQueryEngine { let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer(); let state = ctx.state(); + common_telemetry::debug!("Create physical plan, input plan: {logical_plan}"); + // special handle EXPLAIN plan if matches!(logical_plan, DfLogicalPlan::Explain(_)) { return state @@ -285,6 +287,8 @@ impl DatafusionQueryEngine { .map_err(BoxedError::new) .context(QueryExecutionSnafu)?; + common_telemetry::debug!("Create physical plan, analyzed plan: {analyzed_plan}"); + // skip optimize for MergeScan let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan && ext.node.name() == MergeScanLogicalPlan::name() @@ -299,6 +303,8 @@ impl DatafusionQueryEngine { .context(QueryExecutionSnafu)? }; + common_telemetry::debug!("Create physical plan, optimized plan: {optimized_plan}"); + let physical_plan = state .query_planner() .create_physical_plan(&optimized_plan, state) diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 88ee9b2c57..0ec32bcaa2 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -65,6 +65,12 @@ impl AnalyzerRule for DistPlannerAnalyzer { impl DistPlannerAnalyzer { fn inspect_plan_with_subquery(plan: LogicalPlan) -> DfResult> { + // Workaround for https://github.com/GreptimeTeam/greptimedb/issues/5469 + // FIXME(yingwen): Remove this once we update DataFusion. + if let LogicalPlan::Limit(_) = &plan { + return Ok(Transformed::no(plan)); + } + let exprs = plan .expressions_consider_join() .into_iter() diff --git a/src/query/src/optimizer/string_normalization.rs b/src/query/src/optimizer/string_normalization.rs index 7dbba9f4f7..d9e5efe7de 100644 --- a/src/query/src/optimizer/string_normalization.rs +++ b/src/query/src/optimizer/string_normalization.rs @@ -29,15 +29,41 @@ pub struct StringNormalizationRule; impl AnalyzerRule for StringNormalizationRule { fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result { - plan.transform(|plan| { - let mut converter = StringNormalizationConverter; - let inputs = plan.inputs().into_iter().cloned().collect::>(); - let expr = plan - .expressions_consider_join() - .into_iter() - .map(|e| e.rewrite(&mut converter).map(|x| x.data)) - .collect::>>()?; - plan.with_new_exprs(expr, inputs).map(Transformed::yes) + plan.transform(|plan| match plan { + LogicalPlan::Projection(_) + | LogicalPlan::Filter(_) + | LogicalPlan::Window(_) + | LogicalPlan::Aggregate(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Join(_) + | LogicalPlan::Repartition(_) + | LogicalPlan::Union(_) + | LogicalPlan::TableScan(_) + | LogicalPlan::EmptyRelation(_) + | LogicalPlan::Subquery(_) + | LogicalPlan::SubqueryAlias(_) + | LogicalPlan::Statement(_) + | LogicalPlan::Values(_) + | LogicalPlan::Analyze(_) + | LogicalPlan::Extension(_) + | LogicalPlan::Distinct(_) + | LogicalPlan::Dml(_) + | LogicalPlan::Copy(_) + | LogicalPlan::Unnest(_) + | LogicalPlan::RecursiveQuery(_) => { + let mut converter = StringNormalizationConverter; + let inputs = plan.inputs().into_iter().cloned().collect::>(); + let expr = plan + .expressions_consider_join() + .into_iter() + .map(|e| e.rewrite(&mut converter).map(|x| x.data)) + .collect::>>()?; + plan.with_new_exprs(expr, inputs).map(Transformed::yes) + } + LogicalPlan::Limit(_) + | LogicalPlan::Explain(_) + | LogicalPlan::Ddl(_) + | LogicalPlan::DescribeTable(_) => Ok(Transformed::no(plan)), }) .map(|x| x.data) } diff --git a/src/query/src/optimizer/type_conversion.rs b/src/query/src/optimizer/type_conversion.rs index 3e6041f1e5..5f8a9f6283 100644 --- a/src/query/src/optimizer/type_conversion.rs +++ b/src/query/src/optimizer/type_conversion.rs @@ -86,7 +86,6 @@ impl ExtensionAnalyzerRule for TypeConversionRule { | LogicalPlan::Repartition { .. } | LogicalPlan::Extension { .. } | LogicalPlan::Sort { .. } - | LogicalPlan::Limit { .. } | LogicalPlan::Union { .. } | LogicalPlan::Join { .. } | LogicalPlan::Distinct { .. } @@ -106,7 +105,8 @@ impl ExtensionAnalyzerRule for TypeConversionRule { plan.with_new_exprs(expr, inputs).map(Transformed::yes) } - LogicalPlan::Subquery { .. } + LogicalPlan::Limit { .. } + | LogicalPlan::Subquery { .. } | LogicalPlan::Explain { .. } | LogicalPlan::SubqueryAlias { .. } | LogicalPlan::EmptyRelation(_) diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index 75d70e5901..a122bbf008 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -109,6 +109,7 @@ impl DfLogicalPlanner { let result = sql_to_rel .statement_to_plan(df_stmt) .context(PlanSqlSnafu)?; + common_telemetry::debug!("Logical planner, statement to plan result: {result}"); let plan = RangePlanRewriter::new(table_provider, query_ctx.clone()) .rewrite(result) .await?; @@ -119,6 +120,7 @@ impl DfLogicalPlanner { .engine_state .optimize_by_extension_rules(plan, &context) .context(DataFusionSnafu)?; + common_telemetry::debug!("Logical planner, optimize result: {plan}"); Ok(plan) } diff --git a/tests/cases/standalone/common/order/limit.result b/tests/cases/standalone/common/order/limit.result index 999a3a1087..bd2fe2ce67 100644 --- a/tests/cases/standalone/common/order/limit.result +++ b/tests/cases/standalone/common/order/limit.result @@ -14,6 +14,15 @@ SELECT a FROM test LIMIT 1; | 1970-01-01T00:00:00.011 | +-------------------------+ +SELECT b FROM test ORDER BY b LIMIT 2 OFFSET 0; + ++----+ +| b | ++----+ +| 21 | +| 22 | ++----+ + SELECT a FROM test LIMIT 1.25; Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected LIMIT to be an integer or null, but got Float64 diff --git a/tests/cases/standalone/common/order/limit.sql b/tests/cases/standalone/common/order/limit.sql index 660716be89..d644836066 100644 --- a/tests/cases/standalone/common/order/limit.sql +++ b/tests/cases/standalone/common/order/limit.sql @@ -4,6 +4,8 @@ INSERT INTO test VALUES (11, 22), (12, 21), (13, 22); SELECT a FROM test LIMIT 1; +SELECT b FROM test ORDER BY b LIMIT 2 OFFSET 0; + SELECT a FROM test LIMIT 1.25; SELECT a FROM test LIMIT 2-1; diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 18b13f0712..fe4e4ada00 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -433,6 +433,30 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | public | numbers | number | 1 | | | 10 | 0 | | | | PRI | | select,insert | | UInt32 | int unsigned | TAG | | No | int unsigned | | | +---------------+--------------------+---------------------------------------+-----------------------------------+------------------+--------------------------+------------------------+-------------------+---------------+--------------------+--------------------+----------------+------------+-------+---------------+-----------------------+----------------------+-----------------+---------------+----------------+-------------+-----------------+----------------+--------+ +select table_schema, table_name from information_schema.tables order by table_name limit 5; + ++--------------------+---------------------------------------+ +| table_schema | table_name | ++--------------------+---------------------------------------+ +| information_schema | build_info | +| information_schema | character_sets | +| information_schema | check_constraints | +| information_schema | cluster_info | +| information_schema | collation_character_set_applicability | ++--------------------+---------------------------------------+ + +select table_schema, table_name from information_schema.tables order by table_name limit 5 offset 0; + ++--------------------+---------------------------------------+ +| table_schema | table_name | ++--------------------+---------------------------------------+ +| information_schema | build_info | +| information_schema | character_sets | +| information_schema | check_constraints | +| information_schema | cluster_info | +| information_schema | collation_character_set_applicability | ++--------------------+---------------------------------------+ + create database my_db; diff --git a/tests/cases/standalone/common/system/information_schema.sql b/tests/cases/standalone/common/system/information_schema.sql index b913050e70..f5767e4cb0 100644 --- a/tests/cases/standalone/common/system/information_schema.sql +++ b/tests/cases/standalone/common/system/information_schema.sql @@ -11,6 +11,10 @@ order by table_schema, table_name; select * from information_schema.columns order by table_schema, table_name, column_name; +select table_schema, table_name from information_schema.tables order by table_name limit 5; + +select table_schema, table_name from information_schema.tables order by table_name limit 5 offset 0; + create database my_db; diff --git a/tests/cases/standalone/limit/limit.result b/tests/cases/standalone/limit/limit.result index ef2bf90a20..323b8ae78d 100644 --- a/tests/cases/standalone/limit/limit.result +++ b/tests/cases/standalone/limit/limit.result @@ -23,3 +23,32 @@ EXPLAIN SELECT * FROM (SELECT SUM(number) FROM numbers LIMIT 100000000000) WHERE | | | +---------------+----------------------------------+ +CREATE TABLE test (a TIMESTAMP TIME INDEX, b INTEGER); + +Affected Rows: 0 + +INSERT INTO test VALUES (11, 23), (12, 21), (13, 22); + +Affected Rows: 3 + +SELECT a FROM test LIMIT 1; + ++-------------------------+ +| a | ++-------------------------+ +| 1970-01-01T00:00:00.011 | ++-------------------------+ + +SELECT b FROM test ORDER BY b LIMIT 2 OFFSET 0; + ++----+ +| b | ++----+ +| 21 | +| 22 | ++----+ + +DROP TABLE test; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/limit/limit.sql b/tests/cases/standalone/limit/limit.sql index 1a3f42380f..43510067d4 100644 --- a/tests/cases/standalone/limit/limit.sql +++ b/tests/cases/standalone/limit/limit.sql @@ -3,3 +3,13 @@ SELECT * FROM (SELECT SUM(number) FROM numbers LIMIT 100000000000) LIMIT 0; EXPLAIN SELECT * FROM (SELECT SUM(number) FROM numbers LIMIT 100000000000) LIMIT 0; EXPLAIN SELECT * FROM (SELECT SUM(number) FROM numbers LIMIT 100000000000) WHERE 1=0; + +CREATE TABLE test (a TIMESTAMP TIME INDEX, b INTEGER); + +INSERT INTO test VALUES (11, 23), (12, 21), (13, 22); + +SELECT a FROM test LIMIT 1; + +SELECT b FROM test ORDER BY b LIMIT 2 OFFSET 0; + +DROP TABLE test;