From 1b2381502e53b059545cc4ee578cf4b391ddb987 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 12 Jun 2023 15:29:08 +0800 Subject: [PATCH] fix: bring EnforceSorting rule forward (#1754) * fix: bring EnforceSorting rule forward Signed-off-by: Ruihang Xia * remove duplicated rules Signed-off-by: Ruihang Xia * wrap remove logic into a method Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/query/src/query_engine/state.rs | 32 ++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index c19e0ed8b6..bd19bef0c6 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -27,6 +27,9 @@ use datafusion::catalog::catalog::MemoryCatalogList; use datafusion::error::Result as DfResult; use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState}; use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::physical_optimizer::repartition::Repartition; +use datafusion::physical_optimizer::sort_enforcement::EnforceSorting; +use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::planner::{DefaultPhysicalPlanner, ExtensionPlanner}; use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner}; use datafusion_expr::LogicalPlan as DfLogicalPlan; @@ -79,6 +82,16 @@ impl QueryEngineState { let mut optimizer = Optimizer::new(); optimizer.rules.push(Arc::new(OrderHintRule)); + let mut physical_optimizers = { + let state = SessionState::with_config_rt(session_config.clone(), runtime_env.clone()); + state.physical_optimizers().to_vec() + }; + // run the repartition and sort enforcement rules first + Self::remove_physical_optimize_rule(&mut physical_optimizers, EnforceSorting {}.name()); + Self::remove_physical_optimize_rule(&mut physical_optimizers, Repartition {}.name()); + physical_optimizers.insert(0, Arc::new(EnforceSorting {})); + 
physical_optimizers.insert(0, Arc::new(Repartition {})); + let session_state = SessionState::with_config_rt_and_catalog_list( session_config, runtime_env, @@ -90,7 +103,8 @@ impl QueryEngineState { partition_manager, datanode_clients, ))) - .with_optimizer_rules(optimizer.rules); + .with_optimizer_rules(optimizer.rules) + .with_physical_optimizer_rules(physical_optimizers); let df_context = SessionContext::with_state(session_state); @@ -102,6 +116,22 @@ impl QueryEngineState { } } + fn remove_physical_optimize_rule( + rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>, + name: &str, + ) { + let mut index_to_move = None; + for (i, rule) in rules.iter().enumerate() { + if rule.name() == name { + index_to_move = Some(i); + break; + } + } + if let Some(index) = index_to_move { + rules.remove(index); + } + } + /// Register a udf function // TODO(dennis): manage UDFs by ourself. pub fn register_udf(&self, udf: ScalarUdf) {