mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 12:30:38 +00:00
1092 lines
38 KiB
Rust
1092 lines
38 KiB
Rust
// Copyright 2023 Greptime Team
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
//! Planner, QueryEngine implementations based on DataFusion.
|
|
|
|
mod error;
|
|
mod planner;
|
|
|
|
use std::any::Any;
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
|
|
use async_trait::async_trait;
|
|
use common_base::Plugins;
|
|
use common_catalog::consts::is_readonly_schema;
|
|
use common_error::ext::BoxedError;
|
|
use common_function::function::FunctionContext;
|
|
use common_function::function_factory::ScalarFunctionFactory;
|
|
use common_query::{Output, OutputData, OutputMeta};
|
|
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
|
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
|
|
use common_telemetry::tracing;
|
|
use datafusion::catalog::TableFunction;
|
|
use datafusion::dataframe::DataFrame;
|
|
use datafusion::physical_plan::ExecutionPlan;
|
|
use datafusion::physical_plan::analyze::AnalyzeExec;
|
|
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
|
|
use datafusion_common::ResolvedTableReference;
|
|
use datafusion_expr::{
|
|
AggregateUDF, DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WindowUDF, WriteOp,
|
|
};
|
|
use datatypes::prelude::VectorRef;
|
|
use datatypes::schema::Schema;
|
|
use futures_util::StreamExt;
|
|
use session::context::QueryContextRef;
|
|
use snafu::{OptionExt, ResultExt, ensure};
|
|
use sqlparser::ast::AnalyzeFormat;
|
|
use table::TableRef;
|
|
use table::requests::{DeleteRequest, InsertRequest};
|
|
use tracing::Span;
|
|
|
|
use crate::analyze::DistAnalyzeExec;
|
|
pub use crate::datafusion::planner::DfContextProviderAdapter;
|
|
use crate::dist_plan::{DistPlannerOptions, MergeScanLogicalPlan};
|
|
use crate::error::{
|
|
CatalogSnafu, ConvertSchemaSnafu, CreateRecordBatchSnafu, MissingTableMutationHandlerSnafu,
|
|
MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
|
|
TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
|
|
};
|
|
use crate::executor::QueryExecutor;
|
|
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
|
|
use crate::physical_wrapper::PhysicalPlanWrapperRef;
|
|
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
|
|
use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
|
|
use crate::{QueryEngine, metrics};
|
|
|
|
/// Query parallelism hint key.
/// This hint can be set in the query context to control the parallelism of the query execution.
/// The value is parsed as a `u64` in [`DatafusionQueryEngine::engine_context`]; values that
/// fail to parse are ignored with a warning, and `0` leaves the default parallelism untouched.
pub const QUERY_PARALLELISM_HINT: &str = "query_parallelism";

/// Whether to fallback to the original plan when failed to push down.
/// The value is parsed as a `bool` (case-insensitive) in
/// [`DatafusionQueryEngine::engine_context`]; unparsable values default to `false`.
pub const QUERY_FALLBACK_HINT: &str = "query_fallback";
|
|
|
|
/// [`QueryEngine`] implementation backed by Apache DataFusion.
pub struct DatafusionQueryEngine {
    // Shared engine state: session state, catalog manager, registered
    // scalar/aggregate/table/window functions, table mutation handler.
    state: Arc<QueryEngineState>,
    // Plugin registry; currently consulted for an optional `PhysicalPlanWrapperRef`
    // that can decorate the physical plan before execution.
    plugins: Plugins,
}
|
|
|
|
impl DatafusionQueryEngine {
|
|
    /// Creates a new engine from the shared engine state and the plugin registry.
    pub fn new(state: Arc<QueryEngineState>, plugins: Plugins) -> Self {
        Self { state, plugins }
    }
|
|
|
|
    /// Executes a logical plan as a streaming query.
    ///
    /// Builds an engine context from `query_ctx`, creates and optimizes the
    /// physical plan, optionally wraps it via a plugin-provided
    /// [`PhysicalPlanWrapperRef`], and returns the execution stream together
    /// with plan metadata.
    #[tracing::instrument(skip_all)]
    async fn exec_query_plan(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let mut ctx = self.engine_context(query_ctx.clone());

        // `create_physical_plan` will optimize logical plan internally
        let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
        let optimized_physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;

        // Give a registered plugin the chance to decorate the plan (e.g. for
        // auditing or limiting); otherwise use the optimized plan as-is.
        let physical_plan = if let Some(wrapper) = self.plugins.get::<PhysicalPlanWrapperRef>() {
            wrapper.wrap(optimized_physical_plan, query_ctx)
        } else {
            optimized_physical_plan
        };

        Ok(Output::new(
            OutputData::Stream(self.execute_stream(&ctx, &physical_plan)?),
            OutputMeta::new_with_plan(physical_plan),
        ))
    }
|
|
|
|
#[tracing::instrument(skip_all)]
|
|
async fn exec_dml_statement(
|
|
&self,
|
|
dml: DmlStatement,
|
|
query_ctx: QueryContextRef,
|
|
) -> Result<Output> {
|
|
ensure!(
|
|
matches!(dml.op, WriteOp::Insert(_) | WriteOp::Delete),
|
|
UnsupportedExprSnafu {
|
|
name: format!("DML op {}", dml.op),
|
|
}
|
|
);
|
|
|
|
let _timer = QUERY_STAGE_ELAPSED
|
|
.with_label_values(&[dml.op.name()])
|
|
.start_timer();
|
|
|
|
let default_catalog = &query_ctx.current_catalog().to_owned();
|
|
let default_schema = &query_ctx.current_schema();
|
|
let table_name = dml.table_name.resolve(default_catalog, default_schema);
|
|
let table = self.find_table(&table_name, &query_ctx).await?;
|
|
|
|
let output = self
|
|
.exec_query_plan((*dml.input).clone(), query_ctx.clone())
|
|
.await?;
|
|
let mut stream = match output.data {
|
|
OutputData::RecordBatches(batches) => batches.as_stream(),
|
|
OutputData::Stream(stream) => stream,
|
|
_ => unreachable!(),
|
|
};
|
|
|
|
let mut affected_rows = 0;
|
|
let mut insert_cost = 0;
|
|
|
|
while let Some(batch) = stream.next().await {
|
|
let batch = batch.context(CreateRecordBatchSnafu)?;
|
|
let column_vectors = batch
|
|
.column_vectors(&table_name.to_string(), table.schema())
|
|
.map_err(BoxedError::new)
|
|
.context(QueryExecutionSnafu)?;
|
|
|
|
match dml.op {
|
|
WriteOp::Insert(_) => {
|
|
// We ignore the insert op.
|
|
let output = self
|
|
.insert(&table_name, column_vectors, query_ctx.clone())
|
|
.await?;
|
|
let (rows, cost) = output.extract_rows_and_cost();
|
|
affected_rows += rows;
|
|
insert_cost += cost;
|
|
}
|
|
WriteOp::Delete => {
|
|
affected_rows += self
|
|
.delete(&table_name, &table, column_vectors, query_ctx.clone())
|
|
.await?;
|
|
}
|
|
_ => unreachable!("guarded by the 'ensure!' at the beginning"),
|
|
}
|
|
}
|
|
Ok(Output::new(
|
|
OutputData::AffectedRows(affected_rows),
|
|
OutputMeta::new_with_cost(insert_cost),
|
|
))
|
|
}
|
|
|
|
    /// Deletes rows from `table` using the key columns found in `column_vectors`.
    ///
    /// Only the timestamp column and the row-key columns are forwarded to the
    /// delete request; any other columns in `column_vectors` are dropped.
    ///
    /// # Errors
    /// Fails on read-only schemas, on tables without a timestamp column, or
    /// when no table mutation handler is configured.
    #[tracing::instrument(skip_all)]
    async fn delete(
        &self,
        table_name: &ResolvedTableReference,
        table: &TableRef,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();
        let table_schema = table.schema();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let ts_column = table_schema
            .timestamp_column()
            .map(|x| &x.name)
            .with_context(|| MissingTimestampColumnSnafu {
                table_name: table_name.clone(),
            })?;

        let table_info = table.table_info();
        let rowkey_columns = table_info
            .meta
            .row_key_column_names()
            .collect::<Vec<&String>>();
        // Keep only the columns that identify rows: the time index plus row keys.
        let column_vectors = column_vectors
            .into_iter()
            .filter(|x| &x.0 == ts_column || rowkey_columns.contains(&&x.0))
            .collect::<HashMap<_, _>>();

        let request = DeleteRequest {
            catalog_name,
            schema_name,
            table_name,
            key_column_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .delete(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }
|
|
|
|
    /// Inserts `column_vectors` into the given table via the table mutation handler.
    ///
    /// # Errors
    /// Fails on read-only schemas or when no table mutation handler is configured.
    #[tracing::instrument(skip_all)]
    async fn insert(
        &self,
        table_name: &ResolvedTableReference,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let request = InsertRequest {
            catalog_name,
            schema_name,
            table_name,
            columns_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .insert(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }
|
|
|
|
    /// Looks up a table through the catalog manager.
    ///
    /// # Errors
    /// Returns `TableNotFound` when the table does not exist, or a `Catalog`
    /// error when the lookup itself fails.
    async fn find_table(
        &self,
        table_name: &ResolvedTableReference,
        query_context: &QueryContextRef,
    ) -> Result<TableRef> {
        let catalog_name = table_name.catalog.as_ref();
        let schema_name = table_name.schema.as_ref();
        let table_name = table_name.table.as_ref();

        self.state
            .catalog_manager()
            .table(catalog_name, schema_name, table_name, Some(query_context))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu { table: table_name })
    }
|
|
|
|
    /// Creates a physical plan from a logical plan: analyze, (conditionally)
    /// optimize, then plan physically.
    ///
    /// `EXPLAIN` plans are delegated to the session state directly, and
    /// `MergeScan` extension plans skip the logical optimizer.
    #[tracing::instrument(skip_all)]
    async fn create_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        logical_plan: &LogicalPlan,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        /// Only print context on panic, to avoid cluttering logs.
        ///
        /// TODO(discord9): remove this once we catch the bug
        #[derive(Debug)]
        struct PanicLogger<'a> {
            input_logical_plan: &'a LogicalPlan,
            after_analyze: Option<LogicalPlan>,
            after_optimize: Option<LogicalPlan>,
            phy_plan: Option<Arc<dyn ExecutionPlan>>,
        }
        impl Drop for PanicLogger<'_> {
            fn drop(&mut self) {
                // Drop runs during unwinding too, so this logs whatever stage
                // had been reached when the panic occurred.
                if std::thread::panicking() {
                    common_telemetry::error!(
                        "Panic while creating physical plan, input logical plan: {:?}, after analyze: {:?}, after optimize: {:?}, final physical plan: {:?}",
                        self.input_logical_plan,
                        self.after_analyze,
                        self.after_optimize,
                        self.phy_plan
                    );
                }
            }
        }

        let mut logger = PanicLogger {
            input_logical_plan: logical_plan,
            after_analyze: None,
            after_optimize: None,
            phy_plan: None,
        };

        let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
        let state = ctx.state();

        common_telemetry::debug!("Create physical plan, input plan: {logical_plan}");

        // special handle EXPLAIN plan
        if matches!(logical_plan, DfLogicalPlan::Explain(_)) {
            return state
                .create_physical_plan(logical_plan)
                .await
                .map_err(Into::into);
        }

        // analyze first
        let analyzed_plan = state.analyzer().execute_and_check(
            logical_plan.clone(),
            state.config_options(),
            |_, _| {},
        )?;

        logger.after_analyze = Some(analyzed_plan.clone());

        common_telemetry::debug!("Create physical plan, analyzed plan: {analyzed_plan}");

        // skip optimize for MergeScan
        let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
            && ext.node.name() == MergeScanLogicalPlan::name()
        {
            analyzed_plan.clone()
        } else {
            state
                .optimizer()
                .optimize(analyzed_plan, state, |_, _| {})?
        };

        common_telemetry::debug!("Create physical plan, optimized plan: {optimized_plan}");

        logger.after_optimize = Some(optimized_plan.clone());

        let physical_plan = state
            .query_planner()
            .create_physical_plan(&optimized_plan, state)
            .await?;

        logger.phy_plan = Some(physical_plan.clone());
        // Success path: dropping the logger here is a no-op (no panic in flight).
        drop(logger);
        Ok(physical_plan)
    }
|
|
|
|
    /// Optimizes a logical plan: first with engine extension rules, then with
    /// the DataFusion optimizer of the session state.
    #[tracing::instrument(skip_all)]
    pub fn optimize(
        &self,
        context: &QueryEngineContext,
        plan: &LogicalPlan,
    ) -> Result<LogicalPlan> {
        let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();

        // Optimized by extension rules
        let optimized_plan = self
            .state
            .optimize_by_extension_rules(plan.clone(), context)?;

        // Optimized by datafusion optimizer
        let optimized_plan = self.state.session_state().optimize(&optimized_plan)?;

        Ok(optimized_plan)
    }
|
|
|
|
    /// "Optimizes" a physical plan. Currently this only rewrites a top-level
    /// [`AnalyzeExec`] into a [`DistAnalyzeExec`]; every other plan is passed
    /// through unchanged (see the TODO below about double optimization).
    #[tracing::instrument(skip_all)]
    fn optimize_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

        // TODO(ruihang): `self.create_physical_plan()` already optimize the plan, check
        // if we need to optimize it again here.
        // let state = ctx.state();
        // let config = state.config_options();

        // skip optimize AnalyzeExec plan
        let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
        {
            // Output format for ANALYZE: only "json" (case-insensitive) is
            // recognized; anything else falls back to text.
            let format = if let Some(format) = ctx.query_ctx().explain_format()
                && format.to_lowercase() == "json"
            {
                AnalyzeFormat::JSON
            } else {
                AnalyzeFormat::TEXT
            };
            // Sets the verbose flag of the query context.
            // The MergeScanExec plan uses the verbose flag to determine whether to print the plan in verbose mode.
            ctx.query_ctx().set_explain_verbose(analyze_plan.verbose());

            Arc::new(DistAnalyzeExec::new(
                analyze_plan.input().clone(),
                analyze_plan.verbose(),
                format,
            ))
            // let mut new_plan = analyze_plan.input().clone();
            // for optimizer in state.physical_optimizers() {
            //     new_plan = optimizer
            //         .optimize(new_plan, config)
            //         .context(DataFusionSnafu)?;
            // }
            // Arc::new(DistAnalyzeExec::new(new_plan))
        } else {
            plan
            // let mut new_plan = plan;
            // for optimizer in state.physical_optimizers() {
            //     new_plan = optimizer
            //         .optimize(new_plan, config)
            //         .context(DataFusionSnafu)?;
            // }
            // new_plan
        };

        Ok(optimized_plan)
    }
|
|
}
|
|
|
|
#[async_trait]
|
|
impl QueryEngine for DatafusionQueryEngine {
|
|
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns a planner bound to this engine's state.
    fn planner(&self) -> Arc<dyn LogicalPlanner> {
        Arc::new(DfLogicalPlanner::new(self.state.clone()))
    }

    fn name(&self) -> &str {
        "datafusion"
    }
|
|
|
|
    /// Describes a logical plan: converts its output schema and returns it
    /// together with the (unmodified) plan.
    async fn describe(
        &self,
        plan: LogicalPlan,
        _query_ctx: QueryContextRef,
    ) -> Result<DescribeResult> {
        let schema = plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        Ok(DescribeResult {
            schema,
            logical_plan: plan,
        })
    }
|
|
|
|
    /// Executes a logical plan, routing DML statements to the DML path and
    /// everything else to the streaming query path.
    async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        match plan {
            LogicalPlan::Dml(dml) => self.exec_dml_statement(dml, query_ctx).await,
            _ => self.exec_query_plan(plan, query_ctx).await,
        }
    }
|
|
|
|
    /// Note in SQL queries, aggregate names are looked up using
    /// lowercase unless the query uses quotes. For example,
    ///
    /// `SELECT MY_UDAF(x)...` will look for an aggregate named `"my_udaf"`
    /// `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"`
    ///
    /// So it's better to make UDAF name lowercase when creating one.
    fn register_aggregate_function(&self, func: AggregateUDF) {
        self.state.register_aggr_function(func);
    }

    /// Register an scalar function.
    /// Will override if the function with same name is already registered.
    fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        self.state.register_scalar_function(func);
    }

    /// Registers a table function (a function usable in a `FROM` clause).
    fn register_table_function(&self, func: Arc<TableFunction>) {
        self.state.register_table_function(func);
    }

    /// Registers a window function.
    fn register_window_function(&self, func: WindowUDF) {
        self.state.register_window_function(func);
    }
|
|
|
|
    /// Builds a DataFusion [`DataFrame`] reading from the given table.
    fn read_table(&self, table: TableRef) -> Result<DataFrame> {
        self.state.read_table(table).map_err(Into::into)
    }
|
|
|
|
    /// Builds a per-query [`QueryEngineContext`] from a fresh copy of the
    /// session state, applying per-query configuration from `query_ctx`:
    /// parallelism/fallback hints, time zone, planner options, and the
    /// function context extension.
    fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
        let mut state = self.state.session_state();
        state.config_mut().set_extension(query_ctx.clone());
        // note that hints in "x-greptime-hints" is automatically parsed
        // and set to query context's extension, so we can get it from query context.
        if let Some(parallelism) = query_ctx.extension(QUERY_PARALLELISM_HINT) {
            if let Ok(n) = parallelism.parse::<u64>() {
                // `0` is treated as "no override": keep the default partitions.
                if n > 0 {
                    let new_cfg = state.config().clone().with_target_partitions(n as usize);
                    *state.config_mut() = new_cfg;
                }
            } else {
                common_telemetry::warn!(
                    "Failed to parse query_parallelism: {}, using default value",
                    parallelism
                );
            }
        }

        // configure execution options
        state.config_mut().options_mut().execution.time_zone =
            Some(query_ctx.timezone().to_string());

        // usually it's impossible to have both `set variable` set by sql client and
        // hint in header by grpc client, so only need to deal with them separately
        if query_ctx.configuration_parameter().allow_query_fallback() {
            state
                .config_mut()
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        } else if let Some(fallback) = query_ctx.extension(QUERY_FALLBACK_HINT) {
            // also check the query context for fallback hint
            // if it is set, we will enable the fallback
            if fallback.to_lowercase().parse::<bool>().unwrap_or(false) {
                state
                    .config_mut()
                    .options_mut()
                    .extensions
                    .insert(DistPlannerOptions {
                        allow_query_fallback: true,
                    });
            }
        }

        // Make the query context and function state available to scalar
        // functions via the config extensions.
        state
            .config_mut()
            .options_mut()
            .extensions
            .insert(FunctionContext {
                query_ctx: query_ctx.clone(),
                state: self.engine_state().function_state(),
            });

        // Snapshot the final config into the execution props so that physical
        // execution sees the same options configured above.
        let config_options = state.config_options().clone();
        let _ = state
            .execution_props_mut()
            .config_options
            .insert(config_options);

        QueryEngineContext::new(state, query_ctx)
    }
|
|
|
|
    /// Returns the shared engine state.
    fn engine_state(&self) -> &QueryEngineState {
        &self.state
    }
|
|
}
|
|
|
|
impl QueryExecutor for DatafusionQueryEngine {
|
|
#[tracing::instrument(skip_all)]
|
|
fn execute_stream(
|
|
&self,
|
|
ctx: &QueryEngineContext,
|
|
plan: &Arc<dyn ExecutionPlan>,
|
|
) -> Result<SendableRecordBatchStream> {
|
|
let explain_verbose = ctx.query_ctx().explain_verbose();
|
|
let output_partitions = plan.properties().output_partitioning().partition_count();
|
|
if explain_verbose {
|
|
common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}");
|
|
}
|
|
|
|
let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer();
|
|
let task_ctx = ctx.build_task_ctx();
|
|
let span = Span::current();
|
|
|
|
match plan.properties().output_partitioning().partition_count() {
|
|
0 => {
|
|
let schema = Arc::new(
|
|
Schema::try_from(plan.schema())
|
|
.map_err(BoxedError::new)
|
|
.context(QueryExecutionSnafu)?,
|
|
);
|
|
Ok(Box::pin(EmptyRecordBatchStream::new(schema)))
|
|
}
|
|
1 => {
|
|
let df_stream = plan.execute(0, task_ctx)?;
|
|
let mut stream = RecordBatchStreamAdapter::try_new_with_span(df_stream, span)
|
|
.context(error::ConvertDfRecordBatchStreamSnafu)
|
|
.map_err(BoxedError::new)
|
|
.context(QueryExecutionSnafu)?;
|
|
stream.set_metrics2(plan.clone());
|
|
stream.set_explain_verbose(explain_verbose);
|
|
let stream = OnDone::new(Box::pin(stream), move || {
|
|
let exec_cost = exec_timer.stop_and_record();
|
|
if explain_verbose {
|
|
common_telemetry::info!(
|
|
"DatafusionQueryEngine execute 1 stream, cost: {:?}s",
|
|
exec_cost,
|
|
);
|
|
}
|
|
});
|
|
Ok(Box::pin(stream))
|
|
}
|
|
_ => {
|
|
// merge into a single partition
|
|
let merged_plan = CoalescePartitionsExec::new(plan.clone());
|
|
// CoalescePartitionsExec must produce a single partition
|
|
assert_eq!(
|
|
1,
|
|
merged_plan
|
|
.properties()
|
|
.output_partitioning()
|
|
.partition_count()
|
|
);
|
|
let df_stream = merged_plan.execute(0, task_ctx)?;
|
|
let mut stream = RecordBatchStreamAdapter::try_new_with_span(df_stream, span)
|
|
.context(error::ConvertDfRecordBatchStreamSnafu)
|
|
.map_err(BoxedError::new)
|
|
.context(QueryExecutionSnafu)?;
|
|
stream.set_metrics2(plan.clone());
|
|
stream.set_explain_verbose(ctx.query_ctx().explain_verbose());
|
|
let stream = OnDone::new(Box::pin(stream), move || {
|
|
let exec_cost = exec_timer.stop_and_record();
|
|
if explain_verbose {
|
|
common_telemetry::info!(
|
|
"DatafusionQueryEngine execute {output_partitions} stream, cost: {:?}s",
|
|
exec_cost
|
|
);
|
|
}
|
|
});
|
|
Ok(Box::pin(stream))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use std::fmt;
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
|
|
use api::v1::SemanticType;
|
|
use arrow::array::{ArrayRef, UInt64Array};
|
|
use arrow_schema::SortOptions;
|
|
use catalog::RegisterTableRequest;
|
|
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
|
|
use common_error::ext::BoxedError;
|
|
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream, util};
|
|
use datafusion::physical_plan::display::{DisplayAs, DisplayFormatType};
|
|
use datafusion::physical_plan::expressions::PhysicalSortExpr;
|
|
use datafusion::physical_plan::joins::{HashJoinExec, JoinOn, PartitionMode};
|
|
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
|
|
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};
|
|
use datafusion::prelude::{col, lit};
|
|
use datafusion_common::{JoinType, NullEquality};
|
|
use datafusion_physical_expr::expressions::Column;
|
|
use datatypes::prelude::ConcreteDataType;
|
|
use datatypes::schema::{ColumnSchema, SchemaRef};
|
|
use datatypes::vectors::{Helper, UInt32Vector, VectorRef};
|
|
use session::context::{QueryContext, QueryContextBuilder};
|
|
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
|
|
use store_api::region_engine::{
|
|
PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
|
|
};
|
|
use store_api::storage::{RegionId, ScanRequest};
|
|
use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};
|
|
use table::table::scan::RegionScanExec;
|
|
|
|
use super::*;
|
|
use crate::options::QueryOptions;
|
|
use crate::parser::QueryLanguageParser;
|
|
use crate::part_sort::PartSortExec;
|
|
use crate::query_engine::{QueryEngineFactory, QueryEngineRef};
|
|
|
|
    /// Test scanner that records how the physical optimizers interact with it:
    /// how many times dynamic filters were pushed down and how many filter
    /// expressions arrived in the last push.
    #[derive(Debug)]
    struct RecordingScanner {
        schema: SchemaRef,
        metadata: RegionMetadataRef,
        properties: ScannerProperties,
        // Number of `add_dyn_filter_to_predicate` calls observed.
        update_calls: Arc<AtomicUsize>,
        // Length of the filter list passed in the most recent call.
        last_filter_len: Arc<AtomicUsize>,
    }
|
|
|
|
    impl RecordingScanner {
        /// Creates a scanner that shares the given counters with the test body.
        fn new(
            schema: SchemaRef,
            metadata: RegionMetadataRef,
            update_calls: Arc<AtomicUsize>,
            last_filter_len: Arc<AtomicUsize>,
        ) -> Self {
            Self {
                schema,
                metadata,
                properties: ScannerProperties::default(),
                update_calls,
                last_filter_len,
            }
        }
    }
|
|
|
|
    impl RegionScanner for RecordingScanner {
        fn name(&self) -> &str {
            "RecordingScanner"
        }

        fn properties(&self) -> &ScannerProperties {
            &self.properties
        }

        fn schema(&self) -> SchemaRef {
            self.schema.clone()
        }

        fn metadata(&self) -> RegionMetadataRef {
            self.metadata.clone()
        }

        fn prepare(&mut self, request: PrepareRequest) -> std::result::Result<(), BoxedError> {
            self.properties.prepare(request);
            Ok(())
        }

        // Always yields an empty stream — these tests only observe pushdown,
        // not actual data.
        fn scan_partition(
            &self,
            _ctx: &QueryScanContext,
            _metrics_set: &ExecutionPlanMetricsSet,
            _partition: usize,
        ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
            Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
        }

        fn has_predicate_without_region(&self) -> bool {
            true
        }

        // Records the pushdown call and claims to accept every filter.
        fn add_dyn_filter_to_predicate(
            &mut self,
            filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
        ) -> Vec<bool> {
            self.update_calls.fetch_add(1, Ordering::Relaxed);
            self.last_filter_len
                .store(filter_exprs.len(), Ordering::Relaxed);
            vec![true; filter_exprs.len()]
        }

        fn set_logical_region(&mut self, logical_region: bool) {
            self.properties.set_logical_region(logical_region);
        }
    }
|
|
|
|
    impl DisplayAs for RecordingScanner {
        // Fixed label regardless of format type; only used in plan display.
        fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "RecordingScanner")
        }
    }
|
|
|
|
    /// Builds a query engine over an in-memory catalog that contains only the
    /// built-in `numbers` table.
    async fn create_test_engine() -> QueryEngineRef {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let req = RegisterTableRequest {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: NUMBERS_TABLE_NAME.to_string(),
            table_id: NUMBERS_TABLE_ID,
            table: NumbersTable::table(NUMBERS_TABLE_ID),
        };
        catalog_manager.register_table_sync(req).unwrap();

        QueryEngineFactory::new(
            catalog_manager,
            None,
            None,
            None,
            None,
            false,
            QueryOptions::default(),
        )
        .query_engine()
    }
|
|
|
|
    // Planning only: checks the logical plan shape for a simple aggregate query.
    // NOTE(review): the indentation inside the expected raw string is significant
    // (it must match `LogicalPlan::to_string`); reconstructed to the same 2-space
    // steps asserted in `test_describe`.
    #[tokio::test]
    async fn test_sql_to_plan() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        assert_eq!(
            plan.to_string(),
            r#"Limit: skip=0, fetch=20
  Projection: sum(numbers.number)
    Aggregate: groupBy=[[]], aggr=[[sum(numbers.number)]]
      TableScan: numbers"#
        );
    }
|
|
|
|
    // End-to-end: plans and executes the aggregate query and checks the single
    // result batch (sum of 0..100 = 4950).
    #[tokio::test]
    async fn test_execute() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let output = engine.execute(plan, QueryContext::arc()).await.unwrap();

        match output.data {
            OutputData::Stream(recordbatch) => {
                let numbers = util::collect(recordbatch).await.unwrap();
                assert_eq!(1, numbers.len());
                assert_eq!(numbers[0].num_columns(), 1);
                assert_eq!(1, numbers[0].schema.num_columns());
                assert_eq!(
                    "sum(numbers.number)",
                    numbers[0].schema.column_schemas()[0].name
                );

                let batch = &numbers[0];
                assert_eq!(1, batch.num_columns());
                assert_eq!(batch.column(0).len(), 1);

                let expected = Arc::new(UInt64Array::from_iter_values([4950])) as ArrayRef;
                assert_eq!(batch.column(0), &expected);
            }
            _ => unreachable!(),
        }
    }
|
|
|
|
    // Exercises `find_table` + `read_table`: builds a DataFrame over `numbers`,
    // filters `number < 10`, and checks the resulting column values.
    #[tokio::test]
    async fn test_read_table() {
        let engine = create_test_engine().await;

        let engine = engine
            .as_any()
            .downcast_ref::<DatafusionQueryEngine>()
            .unwrap();
        let query_ctx = Arc::new(QueryContextBuilder::default().build());
        let table = engine
            .find_table(
                &ResolvedTableReference {
                    catalog: "greptime".into(),
                    schema: "public".into(),
                    table: "numbers".into(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let df = engine.read_table(table).unwrap();
        let df = df
            .select_columns(&["number"])
            .unwrap()
            .filter(col("number").lt(lit(10)))
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(1, batches.len());
        let batch = &batches[0];

        assert_eq!(1, batch.num_columns());
        assert_eq!(batch.column(0).len(), 10);

        assert_eq!(
            Helper::try_into_vector(batch.column(0)).unwrap(),
            Arc::new(UInt32Vector::from_slice([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) as VectorRef
        );
    }
|
|
|
|
    // Checks `describe`: output schema (nullable u64 sum column) and that the
    // logical plan is returned unchanged.
    #[tokio::test]
    async fn test_describe() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();

        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let DescribeResult {
            schema,
            logical_plan,
        } = engine.describe(plan, QueryContext::arc()).await.unwrap();

        assert_eq!(
            schema.column_schemas()[0],
            ColumnSchema::new(
                "sum(numbers.number)",
                ConcreteDataType::uint64_datatype(),
                true
            )
        );
        assert_eq!(
            "Limit: skip=0, fetch=20\n  Projection: sum(numbers.number)\n    Aggregate: groupBy=[[]], aggr=[[sum(numbers.number)]]\n      TableScan: numbers",
            format!("{}", logical_plan.display_indent())
        );
    }
|
|
|
|
    // Verifies that running the physical optimizers over a TopK-style plan
    // (PartSortExec with a fetch limit over a RegionScanExec) pushes a dynamic
    // filter down into the region scanner.
    #[tokio::test]
    async fn test_topk_dynamic_filter_pushdown_reaches_region_scan() {
        let engine = create_test_engine().await;
        let engine = engine
            .as_any()
            .downcast_ref::<DatafusionQueryEngine>()
            .unwrap();
        let engine_ctx = engine.engine_context(QueryContext::arc());
        let state = engine_ctx.state();

        // Minimal one-column (time index) schema/metadata for the scan.
        let schema = Arc::new(datatypes::schema::Schema::new(vec![ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )]));

        let mut metadata_builder = RegionMetadataBuilder::new(RegionId::new(1024, 1));
        metadata_builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let metadata = Arc::new(metadata_builder.build().unwrap());

        let update_calls = Arc::new(AtomicUsize::new(0));
        let last_filter_len = Arc::new(AtomicUsize::new(0));
        let scanner = Box::new(RecordingScanner::new(
            schema,
            metadata,
            update_calls.clone(),
            last_filter_len.clone(),
        ));
        let scan = Arc::new(RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap());

        // `ts DESC` with fetch=3 — the TopK shape that should produce a
        // dynamic filter.
        let sort_expr = PhysicalSortExpr {
            expr: Arc::new(Column::new("ts", 0)),
            options: SortOptions {
                descending: true,
                ..Default::default()
            },
        };
        let partition_ranges: Vec<Vec<PartitionRange>> = vec![vec![]];
        let mut plan: Arc<dyn ExecutionPlan> =
            Arc::new(PartSortExec::try_new(sort_expr, Some(3), partition_ranges, scan).unwrap());

        for optimizer in state.physical_optimizers() {
            plan = optimizer.optimize(plan, state.config_options()).unwrap();
        }

        // The scanner must have been offered at least one dynamic filter.
        assert!(update_calls.load(Ordering::Relaxed) > 0);
        assert!(last_filter_len.load(Ordering::Relaxed) > 0);
    }
|
|
|
|
    // Verifies that a CollectLeft hash join pushes a dynamic filter to the
    // probe (right) side scanner but not to the build (left) side.
    #[tokio::test]
    async fn test_join_dynamic_filter_pushdown_reaches_region_scan() {
        let engine = create_test_engine().await;
        let engine = engine
            .as_any()
            .downcast_ref::<DatafusionQueryEngine>()
            .unwrap();
        let engine_ctx = engine.engine_context(QueryContext::arc());
        let state = engine_ctx.state();

        // Precondition: the feature under test must be enabled by default.
        assert!(
            state
                .config_options()
                .optimizer
                .enable_join_dynamic_filter_pushdown
        );

        let schema = Arc::new(datatypes::schema::Schema::new(vec![ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )]));

        let mut left_metadata_builder = RegionMetadataBuilder::new(RegionId::new(2048, 1));
        left_metadata_builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let left_metadata = Arc::new(left_metadata_builder.build().unwrap());

        let mut right_metadata_builder = RegionMetadataBuilder::new(RegionId::new(2048, 2));
        right_metadata_builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let right_metadata = Arc::new(right_metadata_builder.build().unwrap());

        // Separate counters per side so we can tell which scan got the filter.
        let left_update_calls = Arc::new(AtomicUsize::new(0));
        let left_last_filter_len = Arc::new(AtomicUsize::new(0));
        let right_update_calls = Arc::new(AtomicUsize::new(0));
        let right_last_filter_len = Arc::new(AtomicUsize::new(0));

        let left_scan = Arc::new(
            RegionScanExec::new(
                Box::new(RecordingScanner::new(
                    schema.clone(),
                    left_metadata,
                    left_update_calls.clone(),
                    left_last_filter_len.clone(),
                )),
                ScanRequest::default(),
                None,
            )
            .unwrap(),
        );
        let right_scan = Arc::new(
            RegionScanExec::new(
                Box::new(RecordingScanner::new(
                    schema,
                    right_metadata,
                    right_update_calls.clone(),
                    right_last_filter_len.clone(),
                )),
                ScanRequest::default(),
                None,
            )
            .unwrap(),
        );

        // Equi-join on `ts = ts`.
        let on: JoinOn = vec![(
            Arc::new(Column::new("ts", 0)) as Arc<dyn PhysicalExpr>,
            Arc::new(Column::new("ts", 0)) as Arc<dyn PhysicalExpr>,
        )];

        let mut plan: Arc<dyn ExecutionPlan> = Arc::new(
            HashJoinExec::try_new(
                left_scan,
                right_scan,
                on,
                None,
                &JoinType::Inner,
                None,
                PartitionMode::CollectLeft,
                NullEquality::NullEqualsNull,
                false,
            )
            .unwrap(),
        );

        for optimizer in state.physical_optimizers() {
            plan = optimizer.optimize(plan, state.config_options()).unwrap();
        }

        // Both sides are visited, but only the probe side receives a
        // non-empty dynamic filter list.
        assert!(left_update_calls.load(Ordering::Relaxed) > 0);
        assert_eq!(0, left_last_filter_len.load(Ordering::Relaxed));
        assert!(right_update_calls.load(Ordering::Relaxed) > 0);
        assert!(right_last_filter_len.load(Ordering::Relaxed) > 0);
    }
|
|
}
|