From b540d640cf4a269d5a878c7e9591e42813737db6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 12 Jun 2023 18:16:24 +0800 Subject: [PATCH] fix: unstable order with union operation (#1763) Signed-off-by: Ruihang Xia --- src/query/src/query_engine/state.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index bd19bef0c6..e69af3696f 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -27,6 +27,7 @@ use datafusion::catalog::catalog::MemoryCatalogList; use datafusion::error::Result as DfResult; use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState}; use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::physical_optimizer::dist_enforcement::EnforceDistribution; use datafusion::physical_optimizer::repartition::Repartition; use datafusion::physical_optimizer::sort_enforcement::EnforceSorting; use datafusion::physical_optimizer::PhysicalOptimizerRule; @@ -86,10 +87,16 @@ impl QueryEngineState { let state = SessionState::with_config_rt(session_config.clone(), runtime_env.clone()); state.physical_optimizers().to_vec() }; - // run the repartition and sort enforcement rules first + // run the repartition and sort enforcement rules first. + // And `EnforceSorting` is required to run after `EnforceDistribution`. Self::remove_physical_optimize_rule(&mut physical_optimizers, EnforceSorting {}.name()); + Self::remove_physical_optimize_rule( + &mut physical_optimizers, + EnforceDistribution {}.name(), + ); Self::remove_physical_optimize_rule(&mut physical_optimizers, Repartition {}.name()); physical_optimizers.insert(0, Arc::new(EnforceSorting {})); + physical_optimizers.insert(0, Arc::new(EnforceDistribution {})); physical_optimizers.insert(0, Arc::new(Repartition {})); let session_state = SessionState::with_config_rt_and_catalog_list(