refactor(query): Remove PhysicalPlanner trait (#4412)

This commit is contained in:
Ran Miller
2024-07-24 11:06:46 +08:00
committed by GitHub
parent 547730a467
commit f7872654cc
3 changed files with 55 additions and 93 deletions

View File

@@ -58,7 +58,6 @@ use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::physical_optimizer::PhysicalOptimizer;
use crate::physical_planner::PhysicalPlanner;
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::plan::LogicalPlan;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
@@ -256,6 +255,61 @@ impl DatafusionQueryEngine {
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu { table: table_name })
}
#[tracing::instrument(skip_all)]
async fn create_physical_plan(
&self,
ctx: &mut QueryEngineContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
match logical_plan {
LogicalPlan::DfPlan(df_plan) => {
let state = ctx.state();
// special handle EXPLAIN plan
if matches!(df_plan, DfLogicalPlan::Explain(_)) {
return state
.create_physical_plan(df_plan)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu);
}
// analyze first
let analyzed_plan = state
.analyzer()
.execute_and_check(df_plan.clone(), state.config_options(), |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
// skip optimize for MergeScan
let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
&& ext.node.name() == MergeScanLogicalPlan::name()
{
analyzed_plan.clone()
} else {
state
.optimizer()
.optimize(analyzed_plan, state, |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
};
let physical_plan = state
.query_planner()
.create_physical_plan(&optimized_plan, state)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
Ok(physical_plan)
}
}
}
}
#[async_trait]
@@ -362,64 +416,6 @@ impl LogicalOptimizer for DatafusionQueryEngine {
}
}
#[async_trait::async_trait]
impl PhysicalPlanner for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
async fn create_physical_plan(
&self,
ctx: &mut QueryEngineContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
match logical_plan {
LogicalPlan::DfPlan(df_plan) => {
let state = ctx.state();
// special handle EXPLAIN plan
if matches!(df_plan, DfLogicalPlan::Explain(_)) {
return state
.create_physical_plan(df_plan)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu);
}
// analyze first
let analyzed_plan = state
.analyzer()
.execute_and_check(df_plan.clone(), state.config_options(), |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
// skip optimize for MergeScan
let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
&& ext.node.name() == MergeScanLogicalPlan::name()
{
analyzed_plan.clone()
} else {
state
.optimizer()
.optimize(analyzed_plan, state, |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
};
let physical_plan = state
.query_planner()
.create_physical_plan(&optimized_plan, state)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
Ok(physical_plan)
}
}
}
}
impl PhysicalOptimizer for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn optimize_physical_plan(

View File

@@ -29,7 +29,6 @@ pub mod metrics;
mod optimizer;
pub mod parser;
pub mod physical_optimizer;
pub mod physical_planner;
pub mod physical_wrapper;
pub mod plan;
pub mod planner;

View File

@@ -1,33 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
use crate::error::Result;
use crate::plan::LogicalPlan;
use crate::query_engine::QueryEngineContext;
/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
#[async_trait::async_trait]
pub trait PhysicalPlanner {
/// Create a physical plan from a logical plan
async fn create_physical_plan(
&self,
ctx: &mut QueryEngineContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>>;
}