From 91456daf99cf21e72a95c01c0587fce0c1b4c455 Mon Sep 17 00:00:00 2001
From: shuiyisong
Date: Mon, 12 Jun 2023 16:51:17 +0800
Subject: [PATCH] test: add physical plan wrapper trait

---
 src/cmd/src/cli/repl.rs           |  6 ++++--
 src/query/src/datafusion.rs       | 23 ++++++++++++++++++-----
 src/query/src/lib.rs              |  1 +
 src/query/src/physical_wrapper.rs | 26 ++++++++++++++++++++++++++
 src/query/src/query_engine.rs     |  4 ++--
 5 files changed, 51 insertions(+), 9 deletions(-)
 create mode 100644 src/query/src/physical_wrapper.rs

diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs
index 6e4d5c645c..56cea0dd64 100644
--- a/src/cmd/src/cli/repl.rs
+++ b/src/cmd/src/cli/repl.rs
@@ -19,6 +19,7 @@ use std::time::Instant;
 use catalog::remote::CachedMetaKvBackend;
 use client::client_manager::DatanodeClients;
 use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_base::Plugins;
 use common_error::prelude::ErrorExt;
 use common_query::Output;
 use common_recordbatch::RecordBatches;
@@ -266,13 +267,14 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
         partition_manager,
         datanode_clients,
     ));
+    let plugins: Arc<Plugins> = Default::default();
     let state = Arc::new(QueryEngineState::new(
         catalog_list,
         false,
         None,
         None,
-        Default::default(),
+        plugins.clone(),
     ));
 
-    Ok(DatafusionQueryEngine::new(state))
+    Ok(DatafusionQueryEngine::new(state, plugins))
 }
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index efc868d1cd..4018fad5e4 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -22,6 +22,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use common_base::Plugins;
 use common_error::prelude::BoxedError;
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_function::scalars::udf::create_udf;
@@ -55,6 +56,7 @@ use crate::executor::QueryExecutor;
 use crate::logical_optimizer::LogicalOptimizer;
 use crate::physical_optimizer::PhysicalOptimizer;
 use crate::physical_planner::PhysicalPlanner;
+use crate::physical_wrapper::PhysicalWrapperRef;
 use crate::plan::LogicalPlan;
 use crate::planner::{DfLogicalPlanner, LogicalPlanner};
 use crate::query_engine::{QueryEngineContext, QueryEngineState};
@@ -62,20 +64,31 @@ use crate::{metrics, QueryEngine};
 
 pub struct DatafusionQueryEngine {
     state: Arc<QueryEngineState>,
+    plugins: Arc<Plugins>,
 }
 
 impl DatafusionQueryEngine {
-    pub fn new(state: Arc<QueryEngineState>) -> Self {
-        Self { state }
+    pub fn new(state: Arc<QueryEngineState>, plugins: Arc<Plugins>) -> Self {
+        Self { state, plugins }
     }
 
-    async fn exec_query_plan(&self, plan: LogicalPlan) -> Result<Output> {
+    async fn exec_query_plan(
+        &self,
+        plan: LogicalPlan,
+        query_ctx: QueryContextRef,
+    ) -> Result<Output> {
         let mut ctx = QueryEngineContext::new(self.state.session_state());
 
         // `create_physical_plan` will optimize logical plan internally
         let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
         let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;
 
+        let physical_plan = if let Some(wrapper) = self.plugins.get::<PhysicalWrapperRef>() {
+            wrapper.wrap(physical_plan, query_ctx)
+        } else {
+            physical_plan
+        };
+
         Ok(Output::Stream(self.execute_stream(&ctx, &physical_plan)?))
     }
 
@@ -97,7 +110,7 @@ impl DatafusionQueryEngine {
         let table = self.find_table(&table_name).await?;
 
         let output = self
-            .exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()))
+            .exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()), query_ctx)
             .await?;
         let mut stream = match output {
             Output::RecordBatches(batches) => batches.as_stream(),
@@ -233,7 +246,7 @@ impl QueryEngine for DatafusionQueryEngine {
             LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => {
                 self.exec_dml_statement(dml, query_ctx).await
             }
-            _ => self.exec_query_plan(plan).await,
+            _ => self.exec_query_plan(plan, query_ctx).await,
         }
     }
 
diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs
index 4e18a86c00..a71d6b88d7 100644
--- a/src/query/src/lib.rs
+++ b/src/query/src/lib.rs
@@ -26,6 +26,7 @@ mod optimizer;
 pub mod parser;
 pub mod physical_optimizer;
 pub mod physical_planner;
+pub mod physical_wrapper;
 pub mod plan;
 pub mod planner;
 pub mod query_engine;
diff --git a/src/query/src/physical_wrapper.rs b/src/query/src/physical_wrapper.rs
new file mode 100644
index 0000000000..af01b8984a
--- /dev/null
+++ b/src/query/src/physical_wrapper.rs
@@ -0,0 +1,26 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_query::physical_plan::PhysicalPlan;
+use session::context::QueryContextRef;
+
+/// allow physical plan to be wrapped by other physical plan
+/// e.g. add metrics to a physical plan
+pub trait PhysicalWrapper: Send + Sync + 'static {
+    fn wrap(&self, origin: Arc<dyn PhysicalPlan>, ctx: QueryContextRef) -> Arc<dyn PhysicalPlan>;
+}
+
+pub type PhysicalWrapperRef = Arc<dyn PhysicalWrapper>;
diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs
index 5880f405c6..153c90274c 100644
--- a/src/query/src/query_engine.rs
+++ b/src/query/src/query_engine.rs
@@ -99,9 +99,9 @@ impl QueryEngineFactory {
             with_dist_planner,
             partition_manager,
             clients,
-            plugins,
+            plugins.clone(),
         ));
-        let query_engine = Arc::new(DatafusionQueryEngine::new(state));
+        let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
         register_functions(&query_engine);
         Self { query_engine }
     }