build(deps): bump datafusion 20240528 (#4061)

* build(deps): bump datafusion 20240528

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* another update

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update expected sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix first/last value

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reformat comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix remaining errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert toml format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix pyo3 feature

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove dead code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

Co-authored-by: Jeremyhi <jiachun_feng@proton.me>

* format file

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
Ruihang Xia
2024-06-01 22:03:00 +08:00
committed by GitHub
parent 45fee948e9
commit c2218f8be8
60 changed files with 1136 additions and 1179 deletions

View File

@@ -58,6 +58,7 @@ regex.workspace = true
session.workspace = true
snafu.workspace = true
sql.workspace = true
sqlparser.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true

View File

@@ -97,8 +97,8 @@ impl ExecutionPlan for DistAnalyzeExec {
&self.properties
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
/// AnalyzeExec is handled specially so this value is ignored
@@ -210,7 +210,7 @@ fn create_output_batch(
builder.append_metric(0, 0, stage_0_metrics);
// Find merge scan and append its sub_stage_metrics
input.apply(&mut |plan| {
input.apply(|plan| {
if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>() {
let sub_stage_metrics = merge_scan.sub_stage_metrics();
for (node, metric) in sub_stage_metrics.into_iter().enumerate() {

View File

@@ -23,13 +23,12 @@ use common_query::logical_plan::create_aggregate_function;
use datafusion::catalog::TableReference;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::sql::planner::ContextProvider;
use datafusion::variable::VarType;
use datafusion_common::config::ConfigOptions;
use datafusion_common::DataFusionError;
use datafusion_expr::var_provider::is_system_variables;
use datafusion_expr::{AggregateUDF, TableSource, WindowUDF};
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use datafusion_sql::parser::Statement as DfStatement;
use session::context::QueryContextRef;
use snafu::ResultExt;
@@ -128,11 +127,14 @@ impl ContextProvider for DfContextProviderAdapter {
}
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
self.engine_state.aggregate_function(name).map(|func| {
Arc::new(
create_aggregate_function(func.name(), func.args_count(), func.create()).into(),
)
})
self.engine_state.aggregate_function(name).map_or_else(
|| self.session_state.aggregate_functions().get(name).cloned(),
|func| {
Some(Arc::new(
create_aggregate_function(func.name(), func.args_count(), func.create()).into(),
))
},
)
}
fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
@@ -161,17 +163,17 @@ impl ContextProvider for DfContextProviderAdapter {
self.session_state.config_options()
}
fn udfs_names(&self) -> Vec<String> {
fn udf_names(&self) -> Vec<String> {
// TODO(LFC): Impl it.
vec![]
}
fn udafs_names(&self) -> Vec<String> {
fn udaf_names(&self) -> Vec<String> {
// TODO(LFC): Impl it.
vec![]
}
fn udwfs_names(&self) -> Vec<String> {
fn udwf_names(&self) -> Vec<String> {
// TODO(LFC): Impl it.
vec![]
}

View File

@@ -149,7 +149,6 @@ impl Categorizer {
| Expr::SimilarTo(_)
| Expr::IsUnknown(_)
| Expr::IsNotUnknown(_)
| Expr::GetIndexedField(_)
| Expr::Case(_)
| Expr::Cast(_)
| Expr::TryCast(_)

View File

@@ -86,8 +86,12 @@ impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
write!(f, "MergeScan [is_placeholder={}]", self.is_placeholder)
}
fn from_template(&self, _exprs: &[datafusion_expr::Expr], _inputs: &[LogicalPlan]) -> Self {
self.clone()
fn with_exprs_and_inputs(
&self,
_exprs: Vec<datafusion::prelude::Expr>,
_inputs: Vec<LogicalPlan>,
) -> Result<Self> {
Ok(self.clone())
}
}
@@ -306,7 +310,7 @@ impl ExecutionPlan for MergeScanExec {
&self.properties
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}

View File

@@ -171,7 +171,7 @@ struct TableNameExtractor {
pub table_name: Option<TableName>,
}
impl TreeNodeVisitor for TableNameExtractor {
impl TreeNodeVisitor<'_> for TableNameExtractor {
type Node = LogicalPlan;
fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {

View File

@@ -57,7 +57,7 @@ impl CountWildcardToTimeIndexRule {
};
plan.map_expressions(|expr| {
let original_name = name_preserver.save(&expr)?;
let transformed_expr = expr.transform_up_mut(&mut |expr| match expr {
let transformed_expr = expr.transform_up(|expr| match expr {
Expr::WindowFunction(mut window_function)
if Self::is_count_star_window_aggregate(&window_function) =>
{
@@ -135,7 +135,7 @@ struct TimeIndexFinder {
table_alias: Option<TableReference>,
}
impl TreeNodeVisitor for TimeIndexFinder {
impl TreeNodeVisitor<'_> for TimeIndexFinder {
type Node = LogicalPlan;
fn f_down(&mut self, node: &Self::Node) -> DataFusionResult<TreeNodeRecursion> {

View File

@@ -75,9 +75,9 @@ impl OrderHintRule {
{
let mut opts = Vec::with_capacity(order_expr.len());
for sort in order_expr {
let name = match sort.expr.try_into_col() {
Ok(col) => col.name,
Err(_) => return Ok(Transformed::no(plan)),
let name = match sort.expr.try_as_col() {
Some(col) => col.name.clone(),
None => return Ok(Transformed::no(plan)),
};
opts.push(OrderOption {
name,
@@ -108,7 +108,7 @@ struct OrderHintVisitor {
order_expr: Option<Vec<Sort>>,
}
impl TreeNodeVisitor for OrderHintVisitor {
impl TreeNodeVisitor<'_> for OrderHintVisitor {
type Node = LogicalPlan;
fn f_down(&mut self, node: &Self::Node) -> DataFusionResult<TreeNodeRecursion> {

View File

@@ -50,7 +50,7 @@ impl PhysicalOptimizerRule for RemoveDuplicate {
impl RemoveDuplicate {
fn do_optimize(plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
let result = plan
.transform_down_mut(&mut |plan| {
.transform_down(|plan| {
if plan.as_any().is::<CoalesceBatchesExec>()
|| plan.as_any().is::<RepartitionExec>()
{

View File

@@ -26,7 +26,7 @@ pub struct StringNormalizationRule;
impl AnalyzerRule for StringNormalizationRule {
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
plan.transform(&|plan| {
plan.transform(|plan| {
let mut converter = StringNormalizationConverter;
let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();
let expr = plan

View File

@@ -149,7 +149,7 @@ impl DfLogicalPlanner {
&query_ctx,
)?),
);
PromPlanner::stmt_to_plan(table_provider, stmt)
PromPlanner::stmt_to_plan(table_provider, stmt, &self.session_state)
.await
.map(LogicalPlan::DfPlan)
.map_err(BoxedError::new)

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::collections::{BTreeSet, HashSet, VecDeque};
use std::str::FromStr;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
@@ -22,13 +21,14 @@ use catalog::table_source::DfTableSourceProvider;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::{DFSchemaRef, Result as DfResult};
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::expr::{
AggregateFunction, AggregateFunctionDefinition, Alias, ScalarFunction,
};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, Extension,
LogicalPlan, LogicalPlanBuilder, Operator, ScalarFunctionDefinition, ScalarUDF as ScalarUdfDef,
AggregateFunction as AggregateFunctionEnum, BinaryExpr, Cast, Extension, LogicalPlan,
LogicalPlanBuilder, Operator, ScalarUDF as ScalarUdfDef,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
@@ -151,17 +151,22 @@ impl PromPlanner {
pub async fn stmt_to_plan(
table_provider: DfTableSourceProvider,
stmt: EvalStmt,
session_state: &SessionState,
) -> Result<LogicalPlan> {
let mut planner = Self {
table_provider,
ctx: PromPlannerContext::from_eval_stmt(&stmt),
};
planner.prom_expr_to_plan(stmt.expr).await
planner.prom_expr_to_plan(stmt.expr, session_state).await
}
#[async_recursion]
pub async fn prom_expr_to_plan(&mut self, prom_expr: PromExpr) -> Result<LogicalPlan> {
pub async fn prom_expr_to_plan(
&mut self,
prom_expr: PromExpr,
session_state: &SessionState,
) -> Result<LogicalPlan> {
let res = match &prom_expr {
PromExpr::Aggregate(AggregateExpr {
op,
@@ -170,7 +175,7 @@ impl PromPlanner {
param: _param,
modifier,
}) => {
let input = self.prom_expr_to_plan(*expr.clone()).await?;
let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?;
// calculate columns to group by
// Need to append time index column into group by columns
@@ -194,7 +199,7 @@ impl PromPlanner {
}
PromExpr::Unary(UnaryExpr { expr }) => {
// Unary Expr in PromQL implys the `-` operator
let input = self.prom_expr_to_plan(*expr.clone()).await?;
let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?;
self.projection_for_each_field_column(input, |col| {
Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
})?
@@ -250,7 +255,7 @@ impl PromPlanner {
}
// lhs is a literal, rhs is a column
(Some(mut expr), None) => {
let input = self.prom_expr_to_plan(*rhs.clone()).await?;
let input = self.prom_expr_to_plan(*rhs.clone(), session_state).await?;
// check if the literal is a special time expr
if let Some(time_expr) = Self::try_build_special_time_expr(
lhs,
@@ -279,7 +284,7 @@ impl PromPlanner {
}
// lhs is a column, rhs is a literal
(None, Some(mut expr)) => {
let input = self.prom_expr_to_plan(*lhs.clone()).await?;
let input = self.prom_expr_to_plan(*lhs.clone(), session_state).await?;
// check if the literal is a special time expr
if let Some(time_expr) = Self::try_build_special_time_expr(
rhs,
@@ -308,14 +313,16 @@ impl PromPlanner {
}
// both are columns. join them on time index
(None, None) => {
let left_input = self.prom_expr_to_plan(*lhs.clone()).await?;
let left_input =
self.prom_expr_to_plan(*lhs.clone(), session_state).await?;
let left_field_columns = self.ctx.field_columns.clone();
let mut left_table_ref = self
.table_ref()
.unwrap_or_else(|_| TableReference::bare(""));
let left_context = self.ctx.clone();
let right_input = self.prom_expr_to_plan(*rhs.clone()).await?;
let right_input =
self.prom_expr_to_plan(*rhs.clone(), session_state).await?;
let right_field_columns = self.ctx.field_columns.clone();
let mut right_table_ref = self
.table_ref()
@@ -399,7 +406,9 @@ impl PromPlanner {
}
}
}
PromExpr::Paren(ParenExpr { expr }) => self.prom_expr_to_plan(*expr.clone()).await?,
PromExpr::Paren(ParenExpr { expr }) => {
self.prom_expr_to_plan(*expr.clone(), session_state).await?
}
PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Subquery",
}
@@ -510,16 +519,18 @@ impl PromPlanner {
PromExpr::Call(Call { func, args }) => {
// some special functions that are not expression but a plan
match func.name {
SPECIAL_HISTOGRAM_QUANTILE => return self.create_histogram_plan(args).await,
SPECIAL_HISTOGRAM_QUANTILE => {
return self.create_histogram_plan(args, session_state).await
}
SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
SCALAR_FUNCTION => return self.create_scalar_plan(args).await,
SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
_ => {}
}
// transform function arguments
let args = self.create_function_args(&args.args)?;
let input = if let Some(prom_expr) = args.input {
self.prom_expr_to_plan(prom_expr).await?
self.prom_expr_to_plan(prom_expr, session_state).await?
} else {
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.reset_table_name_and_schema();
@@ -537,7 +548,8 @@ impl PromPlanner {
),
})
};
let mut func_exprs = self.create_function_expr(func, args.literals)?;
let mut func_exprs =
self.create_function_expr(func, args.literals, session_state)?;
func_exprs.insert(0, self.create_time_index_column_expr()?);
func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
@@ -551,7 +563,9 @@ impl PromPlanner {
}
PromExpr::Extension(promql_parser::parser::ast::Extension { expr }) => {
let children = expr.children();
let plan = self.prom_expr_to_plan(children[0].clone()).await?;
let plan = self
.prom_expr_to_plan(children[0].clone(), session_state)
.await?;
// Wrapper for the explanation/analyze of the existing plan
// https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
// if `analyze` is true, runs the actual plan and produces
@@ -1063,6 +1077,7 @@ impl PromPlanner {
&mut self,
func: &Function,
other_input_exprs: Vec<DfExpr>,
session_state: &SessionState,
) -> Result<Vec<DfExpr>> {
// TODO(ruihang): check function args list
let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
@@ -1071,30 +1086,30 @@ impl PromPlanner {
let field_column_pos = 0;
let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
let scalar_func = match func.name {
"increase" => ScalarFunc::ExtrapolateUdf(Increase::scalar_udf(
"increase" => ScalarFunc::ExtrapolateUdf(Arc::new(Increase::scalar_udf(
self.ctx.range.context(ExpectRangeSelectorSnafu)?,
)),
"rate" => ScalarFunc::ExtrapolateUdf(Rate::scalar_udf(
))),
"rate" => ScalarFunc::ExtrapolateUdf(Arc::new(Rate::scalar_udf(
self.ctx.range.context(ExpectRangeSelectorSnafu)?,
)),
"delta" => ScalarFunc::ExtrapolateUdf(Delta::scalar_udf(
))),
"delta" => ScalarFunc::ExtrapolateUdf(Arc::new(Delta::scalar_udf(
self.ctx.range.context(ExpectRangeSelectorSnafu)?,
)),
"idelta" => ScalarFunc::Udf(IDelta::<false>::scalar_udf()),
"irate" => ScalarFunc::Udf(IDelta::<true>::scalar_udf()),
"resets" => ScalarFunc::Udf(Resets::scalar_udf()),
"changes" => ScalarFunc::Udf(Changes::scalar_udf()),
"deriv" => ScalarFunc::Udf(Deriv::scalar_udf()),
"avg_over_time" => ScalarFunc::Udf(AvgOverTime::scalar_udf()),
"min_over_time" => ScalarFunc::Udf(MinOverTime::scalar_udf()),
"max_over_time" => ScalarFunc::Udf(MaxOverTime::scalar_udf()),
"sum_over_time" => ScalarFunc::Udf(SumOverTime::scalar_udf()),
"count_over_time" => ScalarFunc::Udf(CountOverTime::scalar_udf()),
"last_over_time" => ScalarFunc::Udf(LastOverTime::scalar_udf()),
"absent_over_time" => ScalarFunc::Udf(AbsentOverTime::scalar_udf()),
"present_over_time" => ScalarFunc::Udf(PresentOverTime::scalar_udf()),
"stddev_over_time" => ScalarFunc::Udf(StddevOverTime::scalar_udf()),
"stdvar_over_time" => ScalarFunc::Udf(StdvarOverTime::scalar_udf()),
))),
"idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
"irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
"resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
"changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
"deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
"avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
"min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
"max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
"sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
"count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
"last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
"absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
"present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
"stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
"stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
"quantile_over_time" => {
let quantile_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => quantile,
@@ -1103,7 +1118,7 @@ impl PromPlanner {
}
.fail()?,
};
ScalarFunc::Udf(QuantileOverTime::scalar_udf(quantile_expr))
ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf(quantile_expr)))
}
"predict_linear" => {
let t_expr = match other_input_exprs.pop_front() {
@@ -1114,7 +1129,7 @@ impl PromPlanner {
}
.fail()?,
};
ScalarFunc::Udf(PredictLinear::scalar_udf(t_expr))
ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf(t_expr)))
}
"holt_winters" => {
let sf_exp = match other_input_exprs.pop_front() {
@@ -1134,7 +1149,7 @@ impl PromPlanner {
}
.fail()?,
};
ScalarFunc::Udf(HoltWinters::scalar_udf(sf_exp, tf_exp))
ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf(sf_exp, tf_exp)))
}
"time" => {
exprs.push(build_special_time_expr(
@@ -1201,9 +1216,7 @@ impl PromPlanner {
right: Box::new(interval_1day_lit_expr),
});
let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(
datafusion_functions::datetime::date_trunc(),
),
func: datafusion_functions::datetime::date_trunc(),
args: vec![month_lit_expr, self.create_time_index_column_expr()?],
});
let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
@@ -1212,9 +1225,7 @@ impl PromPlanner {
right: Box::new(the_1month_minus_1day_expr),
});
let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(
datafusion_functions::datetime::date_part(),
),
func: datafusion_functions::datetime::date_part(),
args: vec![day_lit_expr, date_trunc_plus_interval_expr],
});
@@ -1222,8 +1233,8 @@ impl PromPlanner {
ScalarFunc::GeneratedExpr
}
_ => {
if let Ok(f) = BuiltinScalarFunction::from_str(func.name) {
ScalarFunc::DataFusionBuiltin(f)
if let Some(f) = session_state.scalar_functions().get(func.name) {
ScalarFunc::DataFusionBuiltin(f.clone())
} else if let Some(f) = datafusion_functions::math::functions()
.iter()
.find(|f| f.name() == func.name)
@@ -1242,28 +1253,25 @@ impl PromPlanner {
let col_expr = DfExpr::Column(Column::from_name(value));
match scalar_func.clone() {
ScalarFunc::DataFusionBuiltin(fun) => {
ScalarFunc::DataFusionBuiltin(func) => {
other_input_exprs.insert(field_column_pos, col_expr);
let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::BuiltIn(fun),
func,
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
let _ = other_input_exprs.remove(field_column_pos);
}
ScalarFunc::DataFusionUdf(f) => {
ScalarFunc::DataFusionUdf(func) => {
let args = itertools::chain!(
other_input_exprs.iter().take(field_column_pos).cloned(),
std::iter::once(col_expr),
other_input_exprs.iter().skip(field_column_pos).cloned()
)
.collect_vec();
exprs.push(DfExpr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(f),
args,
}))
exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
}
ScalarFunc::Udf(fun) => {
ScalarFunc::Udf(func) => {
let ts_range_expr = DfExpr::Column(Column::from_name(
RangeManipulate::build_timestamp_range_name(
self.ctx.time_index_column.as_ref().unwrap(),
@@ -1272,14 +1280,14 @@ impl PromPlanner {
other_input_exprs.insert(field_column_pos, ts_range_expr);
other_input_exprs.insert(field_column_pos + 1, col_expr);
let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(Arc::new(fun)),
func,
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
let _ = other_input_exprs.remove(field_column_pos + 1);
let _ = other_input_exprs.remove(field_column_pos);
}
ScalarFunc::ExtrapolateUdf(fun) => {
ScalarFunc::ExtrapolateUdf(func) => {
let ts_range_expr = DfExpr::Column(Column::from_name(
RangeManipulate::build_timestamp_range_name(
self.ctx.time_index_column.as_ref().unwrap(),
@@ -1290,7 +1298,7 @@ impl PromPlanner {
other_input_exprs
.insert(field_column_pos + 2, self.create_time_index_column_expr()?);
let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(Arc::new(fun)),
func,
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
@@ -1418,7 +1426,11 @@ impl PromPlanner {
}
/// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
async fn create_histogram_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
async fn create_histogram_plan(
&mut self,
args: &PromFunctionArgs,
session_state: &SessionState,
) -> Result<LogicalPlan> {
if args.args.len() != 2 {
return FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
@@ -1431,7 +1443,7 @@ impl PromPlanner {
}
})?;
let input = args.args[1].as_ref().clone();
let input_plan = self.prom_expr_to_plan(input).await?;
let input_plan = self.prom_expr_to_plan(input, session_state).await?;
if !self.ctx.has_le_tag() {
return ColumnNotFoundSnafu {
@@ -1505,7 +1517,11 @@ impl PromPlanner {
}
/// Create a [SCALAR_FUNCTION] plan
async fn create_scalar_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
async fn create_scalar_plan(
&mut self,
args: &PromFunctionArgs,
session_state: &SessionState,
) -> Result<LogicalPlan> {
ensure!(
args.len() == 1,
FunctionInvalidArgumentSnafu {
@@ -1513,7 +1529,7 @@ impl PromPlanner {
}
);
let input = self
.prom_expr_to_plan(args.args[0].as_ref().clone())
.prom_expr_to_plan(args.args[0].as_ref().clone(), session_state)
.await?;
ensure!(
self.ctx.field_columns.len() == 1,
@@ -1653,16 +1669,13 @@ impl PromPlanner {
token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
token::T_POW => Ok(Box::new(|lhs, rhs| {
Ok(DfExpr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(datafusion_functions::math::power()),
func: datafusion_functions::math::power(),
args: vec![lhs, rhs],
}))
})),
token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
Ok(DfExpr::ScalarFunction(ScalarFunction {
// func_def: ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::Atan2),
func_def: datafusion_expr::ScalarFunctionDefinition::UDF(
datafusion_functions::math::atan2(),
),
func: datafusion_functions::math::atan2(),
args: vec![lhs, rhs],
}))
})),
@@ -2153,7 +2166,7 @@ impl PromPlanner {
})?,
);
let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(datafusion_functions::datetime::date_part()),
func: datafusion_functions::datetime::date_part(),
args: vec![lit_expr, input_expr],
});
Ok(fn_expr)
@@ -2168,13 +2181,13 @@ struct FunctionArgs {
#[derive(Debug, Clone)]
enum ScalarFunc {
DataFusionBuiltin(BuiltinScalarFunction),
DataFusionBuiltin(Arc<ScalarUdfDef>),
/// The UDF that is defined by Datafusion itself.
DataFusionUdf(Arc<ScalarUdfDef>),
Udf(ScalarUdfDef),
Udf(Arc<ScalarUdfDef>),
// todo(ruihang): maybe merge with Udf later
/// UDF that require extra information like range length to be evaluated.
ExtrapolateUdf(ScalarUdfDef),
ExtrapolateUdf(Arc<ScalarUdfDef>),
/// Func that doesn't require input, like `time()`.
GeneratedExpr,
}
@@ -2187,8 +2200,10 @@ mod test {
use catalog::RegisterTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::test_util::DummyDecoder;
use datafusion::execution::runtime_env::RuntimeEnv;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use df_prelude::SessionConfig;
use promql_parser::label::Labels;
use promql_parser::parser;
use session::context::QueryContext;
@@ -2197,6 +2212,10 @@ mod test {
use super::*;
fn build_session_state() -> SessionState {
SessionState::new_with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()))
}
async fn build_test_table_provider(
table_name_tuples: &[(String, String)],
num_tag: usize,
@@ -2295,7 +2314,7 @@ mod test {
1,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt)
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state())
.await
.unwrap();
@@ -2505,9 +2524,10 @@ mod test {
2,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone())
.await
.unwrap();
let plan =
PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone(), &build_session_state())
.await
.unwrap();
let expected_no_without = String::from(
"Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
@@ -2535,7 +2555,7 @@ mod test {
2,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt)
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state())
.await
.unwrap();
let expected_without = String::from(
@@ -2660,7 +2680,7 @@ mod test {
1,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt)
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state())
.await
.unwrap();
@@ -2710,7 +2730,7 @@ mod test {
1,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt)
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state())
.await
.unwrap();
@@ -2954,9 +2974,13 @@ mod test {
3,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone())
.await
.unwrap();
let plan = PromPlanner::stmt_to_plan(
table_provider,
eval_stmt.clone(),
&build_session_state(),
)
.await
.unwrap();
let mut fields = plan.schema().field_names();
let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
fields.sort();
@@ -2978,7 +3002,12 @@ mod test {
3,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone()).await;
let plan = PromPlanner::stmt_to_plan(
table_provider,
eval_stmt.clone(),
&build_session_state(),
)
.await;
assert!(plan.is_err(), "case: {:?}", case);
}
}
@@ -3030,7 +3059,8 @@ mod test {
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt).await;
let plan =
PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state()).await;
assert!(plan.is_err(), "query: {:?}", query);
}
}
@@ -3096,6 +3126,7 @@ mod test {
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
},
&build_session_state(),
)
.await
.unwrap();
@@ -3124,6 +3155,7 @@ mod test {
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
},
&build_session_state(),
)
.await
.unwrap();

View File

@@ -496,8 +496,9 @@ impl RangeSelect {
DFSchema::new_with_metadata(by_fields, input.schema().metadata().clone())
.context(DataFusionSnafu)?,
);
// If the results of project plan can be obtained directly from range plan without any additional calculations, no project plan is required.
// We can simply project the final output of the range plan to produce the final result.
// If the results of project plan can be obtained directly from range plan without any additional
// calculations, no project plan is required. We can simply project the final output of the range
// plan to produce the final result.
let schema_project = projection_expr
.iter()
.map(|project_expr| {
@@ -506,7 +507,12 @@ impl RangeSelect {
.index_of_column_by_name(column.relation.as_ref(), &column.name)
.ok_or(())
} else {
Err(())
let (qualifier, field) = project_expr
.to_field(input.schema().as_ref())
.map_err(|_| ())?;
schema_before_project
.index_of_column_by_name(qualifier.as_ref(), field.name())
.ok_or(())
}
})
.collect::<std::result::Result<Vec<usize>, ()>>()
@@ -584,9 +590,22 @@ impl UserDefinedLogicalNodeCore for RangeSelect {
)
}
fn from_template(&self, exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert!(!inputs.is_empty());
assert!(exprs.len() == self.range_expr.len() + self.by.len() + 1);
fn with_exprs_and_inputs(
&self,
exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> DataFusionResult<Self> {
if inputs.is_empty() {
return Err(DataFusionError::Plan(
"RangeSelect: inputs is empty".to_string(),
));
}
if exprs.len() != self.range_expr.len() + self.by.len() + 1 {
return Err(DataFusionError::Plan(
"RangeSelect: exprs length not match".to_string(),
));
}
let range_expr = exprs
.iter()
.zip(self.range_expr.iter())
@@ -601,7 +620,7 @@ impl UserDefinedLogicalNodeCore for RangeSelect {
.collect();
let time_expr = exprs[self.range_expr.len()].clone();
let by = exprs[self.range_expr.len() + 1..].to_vec();
Self {
Ok(Self {
align: self.align,
align_to: self.align_to,
range_expr,
@@ -613,7 +632,7 @@ impl UserDefinedLogicalNodeCore for RangeSelect {
by_schema: self.by_schema.clone(),
schema_project: self.schema_project.clone(),
schema_before_project: self.schema_before_project.clone(),
}
})
}
}
@@ -674,24 +693,11 @@ impl RangeSelect {
};
let expr = match &range_expr {
Expr::AggregateFunction(
aggr @ datafusion_expr::expr::AggregateFunction {
func_def:
AggregateFunctionDefinition::BuiltIn(AggregateFunction::FirstValue),
..
},
)
| Expr::AggregateFunction(
aggr @ datafusion_expr::expr::AggregateFunction {
func_def:
AggregateFunctionDefinition::BuiltIn(AggregateFunction::LastValue),
..
},
) => {
let is_last_value_func = matches!(
aggr.func_def,
AggregateFunctionDefinition::BuiltIn(AggregateFunction::LastValue)
);
Expr::AggregateFunction(aggr)
if (aggr.func_def.name() == "last_value"
|| aggr.func_def.name() == "first_value") =>
{
let is_last_value_func = aggr.func_def.name() == "last_value";
// Because we only need to find the first_value/last_value,
// the complexity of sorting the entire batch is O(nlogn).
@@ -795,10 +801,8 @@ impl RangeSelect {
&input_schema,
name,
false,
false,
),
f => Err(DataFusionError::NotImplemented(format!(
"Range function from {f:?}"
))),
}
}
_ => Err(DataFusionError::Plan(format!(
@@ -930,8 +934,8 @@ impl ExecutionPlan for RangeSelectExec {
&self.cache
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(

View File

@@ -510,7 +510,7 @@ impl RangePlanRewriter {
fn have_range_in_exprs(exprs: &[Expr]) -> bool {
exprs.iter().any(|expr| {
let mut find_range = false;
let _ = expr.apply(&mut |expr| {
let _ = expr.apply(|expr| {
Ok(match expr {
Expr::ScalarFunction(func) if func.name() == "range_fn" => {
find_range = true;
@@ -525,7 +525,7 @@ fn have_range_in_exprs(exprs: &[Expr]) -> bool {
fn interval_only_in_expr(expr: &Expr) -> bool {
let mut all_interval = true;
let _ = expr.apply(&mut |expr| {
let _ = expr.apply(|expr| {
if !matches!(
expr,
Expr::Literal(ScalarValue::IntervalDayTime(_))
@@ -651,8 +651,8 @@ mod test {
let query =
r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#;
let expected = String::from(
"Projection: COVAR(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4) [COVAR(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[COVAR(test.field_0 + test.field_1,test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [COVAR(test.field_0 + test.field_1,test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
"Projection: covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4) [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;

View File

@@ -24,6 +24,7 @@ use sql::dialect::GreptimeDbDialect;
use sql::parser::ParserContext;
use sql::statements::create::{CreateTable, TIME_INDEX};
use sql::statements::{self, OptionMap};
use sqlparser::ast::KeyOrIndexDisplay;
use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
use table::metadata::{TableInfoRef, TableMeta};
use table::requests::{FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY};
@@ -108,8 +109,11 @@ fn create_table_constraints(
constraints.push(TableConstraint::Unique {
name: Some(TIME_INDEX.into()),
columns: vec![Ident::with_quote(quote_style, column_name)],
is_primary: false,
characteristics: None,
index_name: None,
index_type_display: KeyOrIndexDisplay::None,
index_type: None,
index_options: vec![],
});
}
if !table_meta.primary_key_indices.is_empty() {
@@ -124,11 +128,13 @@ fn create_table_constraints(
}
})
.collect();
constraints.push(TableConstraint::Unique {
constraints.push(TableConstraint::PrimaryKey {
name: None,
columns,
is_primary: true,
characteristics: None,
index_name: None,
index_type: None,
index_options: vec![],
});
}