refactor: Remove PhysicalOptimizer and LogicalOptimizer traits (#4426)

* refactor(query): Remove LogicalOptimizer trait

* refactor(query): Remove PhysicalOptimizer trait
This commit is contained in:
Ran Miller
2024-07-24 21:01:44 +08:00
committed by GitHub
parent f7872654cc
commit e935bf7574
5 changed files with 64 additions and 120 deletions

View File

@@ -34,7 +34,6 @@ use common_telemetry::debug;
use either::Either;
use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
use query::logical_optimizer::LogicalOptimizer;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::{DefaultSerializer, QueryEngineState};

View File

@@ -55,9 +55,7 @@ use crate::error::{
TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::physical_optimizer::PhysicalOptimizer;
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::plan::LogicalPlan;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
@@ -310,6 +308,70 @@ impl DatafusionQueryEngine {
}
}
}
/// Runs the logical optimization pipeline over `plan` and returns the rewritten plan.
///
/// The plan is first rewritten by the engine state's extension rules; the result of
/// that pass is then handed to the optimizer of the DataFusion session state.
///
/// # Errors
///
/// A failure in either pass is given `DatafusionSnafu` context, boxed into a
/// `BoxedError`, and reported through `QueryExecutionSnafu`.
#[tracing::instrument(skip_all)]
pub fn optimize(
    &self,
    context: &QueryEngineContext,
    plan: &LogicalPlan,
) -> Result<LogicalPlan> {
    // Bound to a named local so the timer guard lives until the function returns.
    let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();

    // `DfPlan` is the only variant handled here, so the pattern is irrefutable.
    let LogicalPlan::DfPlan(df_plan) = plan;
    let engine_state = &self.state;

    // Pass 1: extension rules.
    let after_extension_rules = engine_state
        .optimize_by_extension_rules(df_plan.clone(), context)
        .context(error::DatafusionSnafu)
        .map_err(BoxedError::new)
        .context(QueryExecutionSnafu)?;

    // Pass 2: the DataFusion optimizer, applied to the output of pass 1.
    let fully_optimized = engine_state
        .session_state()
        .optimize(&after_extension_rules)
        .context(error::DatafusionSnafu)
        .map_err(BoxedError::new)
        .context(QueryExecutionSnafu)?;

    Ok(LogicalPlan::DfPlan(fully_optimized))
}
/// Applies every physical optimizer registered in the context's session state to
/// `plan`, in the order `physical_optimizers()` yields them.
///
/// An `AnalyzeExec` root is never handed to the optimizers itself: its input is
/// optimized instead, and the optimized input is wrapped in a new `DistAnalyzeExec`.
/// Any other plan is optimized as-is.
///
/// # Errors
///
/// Stops at the first optimizer that fails and returns that failure with
/// `DataFusionSnafu` context.
#[tracing::instrument(skip_all)]
fn optimize_physical_plan(
    &self,
    ctx: &mut QueryEngineContext,
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Bound to a named local so the timer guard lives until the function returns.
    let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

    let state = ctx.state();
    let config = state.config_options();

    // Pick the subtree the optimizers should see, and remember whether the result
    // has to be re-wrapped as an analyze node afterwards.
    let (mut new_plan, is_analyze) = match plan.as_any().downcast_ref::<AnalyzeExec>() {
        Some(analyze_plan) => (analyze_plan.input().clone(), true),
        None => (plan, false),
    };

    for optimizer in state.physical_optimizers() {
        new_plan = optimizer
            .optimize(new_plan, config)
            .context(DataFusionSnafu)?;
    }

    let optimized_plan: Arc<dyn ExecutionPlan> = if is_analyze {
        Arc::new(DistAnalyzeExec::new(new_plan))
    } else {
        new_plan
    };
    Ok(optimized_plan)
}
}
#[async_trait]
@@ -387,70 +449,6 @@ impl QueryEngine for DatafusionQueryEngine {
}
}
impl LogicalOptimizer for DatafusionQueryEngine {
    /// Runs the logical optimization pipeline over `plan` and returns the rewritten plan.
    ///
    /// The plan is first rewritten by the engine state's extension rules; the result of
    /// that pass is then handed to the optimizer of the DataFusion session state.
    ///
    /// # Errors
    ///
    /// A failure in either pass is given `DatafusionSnafu` context, boxed into a
    /// `BoxedError`, and reported through `QueryExecutionSnafu`.
    #[tracing::instrument(skip_all)]
    fn optimize(&self, context: &QueryEngineContext, plan: &LogicalPlan) -> Result<LogicalPlan> {
        // Bound to a named local so the timer guard lives until the function returns.
        let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();

        // `DfPlan` is the only variant handled here, so the pattern is irrefutable.
        let LogicalPlan::DfPlan(df_plan) = plan;
        let engine_state = &self.state;

        // Pass 1: extension rules.
        let after_extension_rules = engine_state
            .optimize_by_extension_rules(df_plan.clone(), context)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        // Pass 2: the DataFusion optimizer, applied to the output of pass 1.
        let fully_optimized = engine_state
            .session_state()
            .optimize(&after_extension_rules)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        Ok(LogicalPlan::DfPlan(fully_optimized))
    }
}
impl PhysicalOptimizer for DatafusionQueryEngine {
    /// Applies every physical optimizer registered in the context's session state to
    /// `plan`, in the order `physical_optimizers()` yields them.
    ///
    /// An `AnalyzeExec` root is never handed to the optimizers itself: its input is
    /// optimized instead, and the optimized input is wrapped in a new `DistAnalyzeExec`.
    /// Any other plan is optimized as-is.
    ///
    /// # Errors
    ///
    /// Stops at the first optimizer that fails and returns that failure with
    /// `DataFusionSnafu` context.
    #[tracing::instrument(skip_all)]
    fn optimize_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Bound to a named local so the timer guard lives until the function returns.
        let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

        let state = ctx.state();
        let config = state.config_options();

        // Pick the subtree the optimizers should see, and remember whether the result
        // has to be re-wrapped as an analyze node afterwards.
        let (mut new_plan, is_analyze) = match plan.as_any().downcast_ref::<AnalyzeExec>() {
            Some(analyze_plan) => (analyze_plan.input().clone(), true),
            None => (plan, false),
        };

        for optimizer in state.physical_optimizers() {
            new_plan = optimizer
                .optimize(new_plan, config)
                .context(DataFusionSnafu)?;
        }

        let optimized_plan: Arc<dyn ExecutionPlan> = if is_analyze {
            Arc::new(DistAnalyzeExec::new(new_plan))
        } else {
            new_plan
        };
        Ok(optimized_plan)
    }
}
impl QueryExecutor for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn execute_stream(

View File

@@ -24,11 +24,9 @@ pub mod dist_plan;
pub mod dummy_catalog;
pub mod error;
pub mod executor;
pub mod logical_optimizer;
pub mod metrics;
mod optimizer;
pub mod parser;
pub mod physical_optimizer;
pub mod physical_wrapper;
pub mod plan;
pub mod planner;

View File

@@ -1,23 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::Result;
use crate::plan::LogicalPlan;
use crate::QueryEngineContext;
/// Logical plan optimizer: rewrites a [`LogicalPlan`] into another [`LogicalPlan`].
pub trait LogicalOptimizer {
/// Optimizes `plan` and returns the resulting plan.
///
/// `plan` is only borrowed; the optimized plan is returned as a new owned value.
/// `context` is the query engine context handed to the implementation.
///
/// # Errors
///
/// Failures are reported through the crate's [`Result`] alias.
fn optimize(&self, context: &QueryEngineContext, plan: &LogicalPlan) -> Result<LogicalPlan>;
}

View File

@@ -1,28 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
use crate::error::Result;
use crate::query_engine::QueryEngineContext;
/// Physical plan optimizer: rewrites an [`ExecutionPlan`] tree into another one.
pub trait PhysicalOptimizer {
/// Optimizes the physical `plan` and returns the resulting plan.
///
/// `plan` is taken by value as a shared [`ExecutionPlan`] handle, and a handle to
/// the optimized plan is returned. `ctx` is borrowed mutably; what, if anything, an
/// implementation changes on it is up to that implementation.
///
/// # Errors
///
/// Failures are reported through the crate's [`Result`] alias.
fn optimize_physical_plan(
&self,
ctx: &mut QueryEngineContext,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>>;
}