fix: unstable order with union operation (#1763)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-06-12 18:16:24 +08:00
committed by GitHub
parent 51a4d660b7
commit b540d640cf

View File

@@ -27,6 +27,7 @@ use datafusion::catalog::catalog::MemoryCatalogList;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_optimizer::dist_enforcement::EnforceDistribution;
use datafusion::physical_optimizer::repartition::Repartition;
use datafusion::physical_optimizer::sort_enforcement::EnforceSorting;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
@@ -86,10 +87,16 @@ impl QueryEngineState {
let state = SessionState::with_config_rt(session_config.clone(), runtime_env.clone());
state.physical_optimizers().to_vec()
};
// run the repartition and sort enforcement rules first
// run the repartition and sort enforcement rules first.
// And `EnforceSorting` is required to run after `EnforceDistribution`.
Self::remove_physical_optimize_rule(&mut physical_optimizers, EnforceSorting {}.name());
Self::remove_physical_optimize_rule(
&mut physical_optimizers,
EnforceDistribution {}.name(),
);
Self::remove_physical_optimize_rule(&mut physical_optimizers, Repartition {}.name());
physical_optimizers.insert(0, Arc::new(EnforceSorting {}));
physical_optimizers.insert(0, Arc::new(EnforceDistribution {}));
physical_optimizers.insert(0, Arc::new(Repartition {}));
let session_state = SessionState::with_config_rt_and_catalog_list(