test: add physical plan wrapper trait

This commit is contained in:
shuiyisong
2023-06-12 16:51:17 +08:00
parent 2dd86b686f
commit 91456daf99
5 changed files with 51 additions and 9 deletions

View File

@@ -19,6 +19,7 @@ use std::time::Instant;
use catalog::remote::CachedMetaKvBackend;
use client::client_manager::DatanodeClients;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_error::prelude::ErrorExt;
use common_query::Output;
use common_recordbatch::RecordBatches;
@@ -266,13 +267,14 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
partition_manager,
datanode_clients,
));
let plugins: Arc<Plugins> = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_list,
false,
None,
None,
Default::default(),
plugins.clone(),
));
Ok(DatafusionQueryEngine::new(state))
Ok(DatafusionQueryEngine::new(state, plugins))
}

View File

@@ -22,6 +22,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::prelude::BoxedError;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::udf::create_udf;
@@ -55,6 +56,7 @@ use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
use crate::physical_optimizer::PhysicalOptimizer;
use crate::physical_planner::PhysicalPlanner;
use crate::physical_wrapper::PhysicalWrapperRef;
use crate::plan::LogicalPlan;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{QueryEngineContext, QueryEngineState};
@@ -62,20 +64,31 @@ use crate::{metrics, QueryEngine};
pub struct DatafusionQueryEngine {
state: Arc<QueryEngineState>,
plugins: Arc<Plugins>,
}
impl DatafusionQueryEngine {
pub fn new(state: Arc<QueryEngineState>) -> Self {
Self { state }
pub fn new(state: Arc<QueryEngineState>, plugins: Arc<Plugins>) -> Self {
Self { state, plugins }
}
async fn exec_query_plan(&self, plan: LogicalPlan) -> Result<Output> {
async fn exec_query_plan(
&self,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
let mut ctx = QueryEngineContext::new(self.state.session_state());
// `create_physical_plan` will optimize logical plan internally
let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;
let physical_plan = if let Some(wrapper) = self.plugins.get::<PhysicalWrapperRef>() {
wrapper.wrap(physical_plan, query_ctx)
} else {
physical_plan
};
Ok(Output::Stream(self.execute_stream(&ctx, &physical_plan)?))
}
@@ -97,7 +110,7 @@ impl DatafusionQueryEngine {
let table = self.find_table(&table_name).await?;
let output = self
.exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()))
.exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()), query_ctx)
.await?;
let mut stream = match output {
Output::RecordBatches(batches) => batches.as_stream(),
@@ -233,7 +246,7 @@ impl QueryEngine for DatafusionQueryEngine {
LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => {
self.exec_dml_statement(dml, query_ctx).await
}
_ => self.exec_query_plan(plan).await,
_ => self.exec_query_plan(plan, query_ctx).await,
}
}

View File

@@ -26,6 +26,7 @@ mod optimizer;
pub mod parser;
pub mod physical_optimizer;
pub mod physical_planner;
pub mod physical_wrapper;
pub mod plan;
pub mod planner;
pub mod query_engine;

View File

@@ -0,0 +1,26 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_query::physical_plan::PhysicalPlan;
use session::context::QueryContextRef;
/// allow physical plan to be wrapped by other physical plan
/// e.g. add metrics to a physical plan
pub trait PhysicalWrapper: Send + Sync + 'static {
fn wrap(&self, origin: Arc<dyn PhysicalPlan>, ctx: QueryContextRef) -> Arc<dyn PhysicalPlan>;
}
pub type PhysicalWrapperRef = Arc<dyn PhysicalWrapper>;

View File

@@ -99,9 +99,9 @@ impl QueryEngineFactory {
with_dist_planner,
partition_manager,
clients,
plugins,
plugins.clone(),
));
let query_engine = Arc::new(DatafusionQueryEngine::new(state));
let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
register_functions(&query_engine);
Self { query_engine }
}