fix: bring EnforceSorting rule forward (#1754)

* fix: bring EnforceSorting rule forward

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove duplicated rules

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wrap remove logic into a method

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-06-12 15:29:08 +08:00
committed by GitHub
parent 0e937be3f5
commit 1b2381502e

View File

@@ -27,6 +27,9 @@ use datafusion::catalog::catalog::MemoryCatalogList;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_optimizer::repartition::Repartition;
use datafusion::physical_optimizer::sort_enforcement::EnforceSorting;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::planner::{DefaultPhysicalPlanner, ExtensionPlanner};
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use datafusion_expr::LogicalPlan as DfLogicalPlan;
@@ -79,6 +82,16 @@ impl QueryEngineState {
let mut optimizer = Optimizer::new();
optimizer.rules.push(Arc::new(OrderHintRule));
let mut physical_optimizers = {
let state = SessionState::with_config_rt(session_config.clone(), runtime_env.clone());
state.physical_optimizers().to_vec()
};
// run the repartition and sort enforcement rules first
Self::remove_physical_optimize_rule(&mut physical_optimizers, EnforceSorting {}.name());
Self::remove_physical_optimize_rule(&mut physical_optimizers, Repartition {}.name());
physical_optimizers.insert(0, Arc::new(EnforceSorting {}));
physical_optimizers.insert(0, Arc::new(Repartition {}));
let session_state = SessionState::with_config_rt_and_catalog_list(
session_config,
runtime_env,
@@ -90,7 +103,8 @@ impl QueryEngineState {
partition_manager,
datanode_clients,
)))
.with_optimizer_rules(optimizer.rules);
.with_optimizer_rules(optimizer.rules)
.with_physical_optimizer_rules(physical_optimizers);
let df_context = SessionContext::with_state(session_state);
@@ -102,6 +116,22 @@ impl QueryEngineState {
}
}
fn remove_physical_optimize_rule(
rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
name: &str,
) {
let mut index_to_move = None;
for (i, rule) in rules.iter().enumerate() {
if rule.name() == name {
index_to_move = Some(i);
break;
}
}
if let Some(index) = index_to_move {
rules.remove(index);
}
}
/// Register a udf function
// TODO(dennis): manage UDFs by ourself.
pub fn register_udf(&self, udf: ScalarUdf) {