refactor: split match arms in prom_expr_to_plan into smaller methods (#4317)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-07-08 19:59:59 +08:00
committed by GitHub
parent c0e9b3dbe2
commit e5730a3745
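
The diff below replaces the inlined bodies of the `match` arms in `prom_expr_to_plan` with calls to per-variant helper methods (`prom_aggr_expr_to_plan`, `prom_binary_expr_to_plan`, and so on). A minimal, self-contained sketch of that dispatch pattern follows; the types here (Expr, Plan, Planner) are simplified stand-ins for illustration only, not the real PromExpr/PromPlanner/LogicalPlan types from the crate, and the sketch is synchronous where the real methods are async.

// Sketch: a large match whose arms only dispatch to small per-variant helpers.
enum Expr {
    Number(f64),
    Unary(Box<Expr>),
    Paren(Box<Expr>),
}

#[derive(Debug)]
enum Plan {
    Literal(f64),
    Negate(Box<Plan>),
}

struct Planner;

impl Planner {
    // Before the refactor: every arm's body lived inside this method.
    // After: the match only routes each variant to its own helper.
    fn expr_to_plan(&mut self, expr: &Expr) -> Result<Plan, String> {
        match expr {
            Expr::Number(v) => self.number_to_plan(*v),
            Expr::Unary(inner) => self.unary_to_plan(inner),
            Expr::Paren(inner) => self.expr_to_plan(inner),
        }
    }

    fn number_to_plan(&mut self, val: f64) -> Result<Plan, String> {
        Ok(Plan::Literal(val))
    }

    fn unary_to_plan(&mut self, inner: &Expr) -> Result<Plan, String> {
        let input = self.expr_to_plan(inner)?;
        Ok(Plan::Negate(Box::new(input)))
    }
}

fn main() {
    let mut planner = Planner;
    let expr = Expr::Unary(Box::new(Expr::Paren(Box::new(Expr::Number(1.0)))));
    println!("{:?}", planner.expr_to_plan(&expr).unwrap());
}
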


@@ -169,244 +169,9 @@ impl PromPlanner {
session_state: &SessionState,
) -> Result<LogicalPlan> {
let res = match &prom_expr {
PromExpr::Aggregate(AggregateExpr {
op,
expr,
// TODO(ruihang): support param
param: _param,
modifier,
}) => {
let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?;
// calculate columns to group by
// Need to append time index column into group by columns
let group_exprs = self.agg_modifier_to_col(input.schema(), modifier)?;
// convert op and value columns to aggregate exprs
let aggr_exprs = self.create_aggregate_exprs(*op, &input)?;
// create plan
let group_sort_expr = group_exprs
.clone()
.into_iter()
.map(|expr| expr.sort(true, false));
LogicalPlanBuilder::from(input)
.aggregate(group_exprs, aggr_exprs)
.context(DataFusionPlanningSnafu)?
.sort(group_sort_expr)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?
}
PromExpr::Unary(UnaryExpr { expr }) => {
// Unary Expr in PromQL implies the `-` operator
let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?;
self.projection_for_each_field_column(input, |col| {
Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
})?
}
PromExpr::Binary(PromBinaryExpr {
lhs,
rhs,
op,
modifier,
}) => {
// if set to true, the comparison operator will return 0/1 (for true/false) instead of
// filtering on the result column
let should_return_bool = if let Some(m) = modifier {
m.return_bool
} else {
false
};
let is_comparison_op = Self::is_token_a_comparison_op(*op);
// build a filter plan here if the op is a comparison op and we don't need
// to return 0/1. Otherwise, build a projection plan
match (
Self::try_build_literal_expr(lhs),
Self::try_build_literal_expr(rhs),
) {
(Some(lhs), Some(rhs)) => {
self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
self.ctx.reset_table_name_and_schema();
let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut field_expr = field_expr_builder(lhs, rhs)?;
if is_comparison_op && should_return_bool {
field_expr = DfExpr::Cast(Cast {
expr: Box::new(field_expr),
data_type: ArrowDataType::Float64,
});
}
LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
DEFAULT_FIELD_COLUMN.to_string(),
Some(field_expr),
)
.context(DataFusionPlanningSnafu)?,
),
})
}
// lhs is a literal, rhs is a column
(Some(mut expr), None) => {
let input = self.prom_expr_to_plan(*rhs.clone(), session_state).await?;
// check if the literal is a special time expr
if let Some(time_expr) = Self::try_build_special_time_expr(
lhs,
self.ctx.time_index_column.as_ref().unwrap(),
) {
expr = time_expr
}
let bin_expr_builder = |col: &String| {
let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut binary_expr =
binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;
if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
};
if is_comparison_op && !should_return_bool {
self.filter_on_field_column(input, bin_expr_builder)?
} else {
self.projection_for_each_field_column(input, bin_expr_builder)?
}
}
// lhs is a column, rhs is a literal
(None, Some(mut expr)) => {
let input = self.prom_expr_to_plan(*lhs.clone(), session_state).await?;
// check if the literal is a special time expr
if let Some(time_expr) = Self::try_build_special_time_expr(
rhs,
self.ctx.time_index_column.as_ref().unwrap(),
) {
expr = time_expr
}
let bin_expr_builder = |col: &String| {
let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut binary_expr =
binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;
if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
};
if is_comparison_op && !should_return_bool {
self.filter_on_field_column(input, bin_expr_builder)?
} else {
self.projection_for_each_field_column(input, bin_expr_builder)?
}
}
// both are columns. join them on time index
(None, None) => {
let left_input =
self.prom_expr_to_plan(*lhs.clone(), session_state).await?;
let left_field_columns = self.ctx.field_columns.clone();
let mut left_table_ref = self
.table_ref()
.unwrap_or_else(|_| TableReference::bare(""));
let left_context = self.ctx.clone();
let right_input =
self.prom_expr_to_plan(*rhs.clone(), session_state).await?;
let right_field_columns = self.ctx.field_columns.clone();
let mut right_table_ref = self
.table_ref()
.unwrap_or_else(|_| TableReference::bare(""));
let right_context = self.ctx.clone();
// TODO(ruihang): avoid join if left and right are the same table
// set op has "special" join semantics
if Self::is_token_a_set_op(*op) {
return self.set_op_on_non_field_columns(
left_input,
right_input,
left_context,
right_context,
*op,
modifier,
);
}
// normal join
if left_table_ref == right_table_ref {
// rename table references to avoid ambiguity
left_table_ref = TableReference::bare("lhs");
right_table_ref = TableReference::bare("rhs");
// `self.ctx` holds the context of the right plan. If the right plan has no tag
// columns, use the left plan's context for subsequent calculations, so that in
// cases like `host + scalar(...)` the tag columns of the `host` table (which
// only appear in the left plan's context) are preserved in later projections.
if self.ctx.tag_columns.is_empty() {
self.ctx = left_context.clone();
self.ctx.table_name = Some("lhs".to_string());
} else {
self.ctx.table_name = Some("rhs".to_string());
}
}
let mut field_columns =
left_field_columns.iter().zip(right_field_columns.iter());
let join_plan = self.join_on_non_field_columns(
left_input,
right_input,
left_table_ref.clone(),
right_table_ref.clone(),
// if either plan has no tag columns (cases like `scalar(...) + host` or
// `host + scalar(...)`), only join on the time index
left_context.tag_columns.is_empty()
|| right_context.tag_columns.is_empty(),
)?;
let join_plan_schema = join_plan.schema().clone();
let bin_expr_builder = |_: &String| {
let (left_col_name, right_col_name) = field_columns.next().unwrap();
let left_col = join_plan_schema
.qualified_field_with_name(Some(&left_table_ref), left_col_name)
.context(DataFusionPlanningSnafu)?
.into();
let right_col = join_plan_schema
.qualified_field_with_name(Some(&right_table_ref), right_col_name)
.context(DataFusionPlanningSnafu)?
.into();
let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut binary_expr = binary_expr_builder(
DfExpr::Column(left_col),
DfExpr::Column(right_col),
)?;
if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
};
if is_comparison_op && !should_return_bool {
self.filter_on_field_column(join_plan, bin_expr_builder)?
} else {
self.projection_for_each_field_column(join_plan, bin_expr_builder)?
}
}
}
}
PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?,
PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?,
PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?,
PromExpr::Paren(ParenExpr { expr }) => {
self.prom_expr_to_plan(*expr.clone(), session_state).await?
}
@@ -414,194 +179,484 @@ impl PromPlanner {
name: "Prom Subquery",
}
.fail()?,
PromExpr::NumberLiteral(NumberLiteral { val }) => {
self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
self.ctx.reset_table_name_and_schema();
let literal_expr = df_prelude::lit(*val);
LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
DEFAULT_FIELD_COLUMN.to_string(),
Some(literal_expr),
)
.context(DataFusionPlanningSnafu)?,
),
})
PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
PromExpr::VectorSelector(selector) => {
self.prom_vector_selector_to_plan(selector).await?
}
PromExpr::StringLiteral(StringLiteral { val }) => {
self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
self.ctx.reset_table_name_and_schema();
let literal_expr = df_prelude::lit(val.to_string());
LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
DEFAULT_FIELD_COLUMN.to_string(),
Some(literal_expr),
)
.context(DataFusionPlanningSnafu)?,
),
})
}
PromExpr::VectorSelector(VectorSelector {
name,
offset,
matchers,
at: _,
}) => {
let matchers = self.preprocess_label_matchers(matchers, name)?;
self.setup_context().await?;
let normalize = self
.selector_to_series_normalize_plan(offset, matchers, false)
.await?;
let manipulate = InstantManipulate::new(
self.ctx.start,
self.ctx.end,
self.ctx.lookback_delta,
self.ctx.interval,
self.ctx
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
self.ctx.field_columns.first().cloned(),
normalize,
);
LogicalPlan::Extension(Extension {
node: Arc::new(manipulate),
})
}
PromExpr::MatrixSelector(MatrixSelector { vs, range }) => {
let VectorSelector {
name,
offset,
matchers,
..
} = vs;
let matchers = self.preprocess_label_matchers(matchers, name)?;
self.setup_context().await?;
ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
let range_ms = range.as_millis() as _;
self.ctx.range = Some(range_ms);
let normalize = self
.selector_to_series_normalize_plan(offset, matchers, true)
.await?;
let manipulate = RangeManipulate::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
// TODO(ruihang): convert via Timestamp datatypes to support different time units
range_ms,
self.ctx
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
self.ctx.field_columns.clone(),
normalize,
)
.context(DataFusionPlanningSnafu)?;
LogicalPlan::Extension(Extension {
node: Arc::new(manipulate),
})
}
PromExpr::Call(Call { func, args }) => {
// some special functions are not expressions but whole plans
match func.name {
SPECIAL_HISTOGRAM_QUANTILE => {
return self.create_histogram_plan(args, session_state).await
}
SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
_ => {}
}
// transform function arguments
let args = self.create_function_args(&args.args)?;
let input = if let Some(prom_expr) = args.input {
self.prom_expr_to_plan(prom_expr, session_state).await?
} else {
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.reset_table_name_and_schema();
LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
DEFAULT_FIELD_COLUMN.to_string(),
None,
)
.context(DataFusionPlanningSnafu)?,
),
})
};
let mut func_exprs =
self.create_function_expr(func, args.literals, session_state)?;
func_exprs.insert(0, self.create_time_index_column_expr()?);
func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
LogicalPlanBuilder::from(input)
.project(func_exprs)
.context(DataFusionPlanningSnafu)?
.filter(self.create_empty_values_filter_expr()?)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?
}
PromExpr::Extension(promql_parser::parser::ast::Extension { expr }) => {
let children = expr.children();
let plan = self
.prom_expr_to_plan(children[0].clone(), session_state)
.await?;
// Wrapper for the explanation/analyze of the existing plan
// https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
// if `analyze` is true, runs the actual plan and produces
// information about metrics during the run.
// if `verbose` is true, prints out additional details when the VERBOSE keyword is specified
match expr.name() {
"ANALYZE" => LogicalPlanBuilder::from(plan)
.explain(false, true)
.unwrap()
.build()
.context(DataFusionPlanningSnafu)?,
"ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
.explain(true, true)
.unwrap()
.build()
.context(DataFusionPlanningSnafu)?,
"EXPLAIN" => LogicalPlanBuilder::from(plan)
.explain(false, false)
.unwrap()
.build()
.context(DataFusionPlanningSnafu)?,
"EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
.explain(true, false)
.unwrap()
.build()
.context(DataFusionPlanningSnafu)?,
_ => LogicalPlanBuilder::empty(true)
.build()
.context(DataFusionPlanningSnafu)?,
}
PromExpr::MatrixSelector(selector) => {
self.prom_matrix_selector_to_plan(selector).await?
}
PromExpr::Call(expr) => self.prom_call_expr_to_plan(session_state, expr).await?,
PromExpr::Extension(expr) => self.prom_ext_expr_to_plan(session_state, expr).await?,
};
Ok(res)
}
async fn prom_aggr_expr_to_plan(
&mut self,
session_state: &SessionState,
aggr_expr: &AggregateExpr,
) -> Result<LogicalPlan> {
let AggregateExpr {
op,
expr,
// TODO(ruihang): support param
param: _param,
modifier,
} = aggr_expr;
let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?;
// calculate columns to group by
// Need to append time index column into group by columns
let group_exprs = self.agg_modifier_to_col(input.schema(), modifier)?;
// convert op and value columns to aggregate exprs
let aggr_exprs = self.create_aggregate_exprs(*op, &input)?;
// create plan
let group_sort_expr = group_exprs
.clone()
.into_iter()
.map(|expr| expr.sort(true, false));
LogicalPlanBuilder::from(input)
.aggregate(group_exprs, aggr_exprs)
.context(DataFusionPlanningSnafu)?
.sort(group_sort_expr)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)
}
async fn prom_unary_expr_to_plan(
&mut self,
session_state: &SessionState,
unary_expr: &UnaryExpr,
) -> Result<LogicalPlan> {
let UnaryExpr { expr } = unary_expr;
// Unary Expr in PromQL implies the `-` operator
let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?;
self.projection_for_each_field_column(input, |col| {
Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
})
}
async fn prom_binary_expr_to_plan(
&mut self,
session_state: &SessionState,
binary_expr: &PromBinaryExpr,
) -> Result<LogicalPlan> {
let PromBinaryExpr {
lhs,
rhs,
op,
modifier,
} = binary_expr;
// if set to true, the comparison operator will return 0/1 (for true/false) instead of
// filtering on the result column
let should_return_bool = if let Some(m) = modifier {
m.return_bool
} else {
false
};
let is_comparison_op = Self::is_token_a_comparison_op(*op);
// build a filter plan here if the op is a comparison op and we don't need
// to return 0/1. Otherwise, build a projection plan
match (
Self::try_build_literal_expr(lhs),
Self::try_build_literal_expr(rhs),
) {
(Some(lhs), Some(rhs)) => {
self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
self.ctx.reset_table_name_and_schema();
let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut field_expr = field_expr_builder(lhs, rhs)?;
if is_comparison_op && should_return_bool {
field_expr = DfExpr::Cast(Cast {
expr: Box::new(field_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
DEFAULT_FIELD_COLUMN.to_string(),
Some(field_expr),
)
.context(DataFusionPlanningSnafu)?,
),
}))
}
// lhs is a literal, rhs is a column
(Some(mut expr), None) => {
let input = self.prom_expr_to_plan(*rhs.clone(), session_state).await?;
// check if the literal is a special time expr
if let Some(time_expr) = Self::try_build_special_time_expr(
lhs,
self.ctx.time_index_column.as_ref().unwrap(),
) {
expr = time_expr
}
let bin_expr_builder = |col: &String| {
let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut binary_expr =
binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;
if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
};
if is_comparison_op && !should_return_bool {
self.filter_on_field_column(input, bin_expr_builder)
} else {
self.projection_for_each_field_column(input, bin_expr_builder)
}
}
// lhs is a column, rhs is a literal
(None, Some(mut expr)) => {
let input = self.prom_expr_to_plan(*lhs.clone(), session_state).await?;
// check if the literal is a special time expr
if let Some(time_expr) = Self::try_build_special_time_expr(
rhs,
self.ctx.time_index_column.as_ref().unwrap(),
) {
expr = time_expr
}
let bin_expr_builder = |col: &String| {
let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut binary_expr =
binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;
if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
};
if is_comparison_op && !should_return_bool {
self.filter_on_field_column(input, bin_expr_builder)
} else {
self.projection_for_each_field_column(input, bin_expr_builder)
}
}
// both are columns. join them on time index
(None, None) => {
let left_input = self.prom_expr_to_plan(*lhs.clone(), session_state).await?;
let left_field_columns = self.ctx.field_columns.clone();
let mut left_table_ref = self
.table_ref()
.unwrap_or_else(|_| TableReference::bare(""));
let left_context = self.ctx.clone();
let right_input = self.prom_expr_to_plan(*rhs.clone(), session_state).await?;
let right_field_columns = self.ctx.field_columns.clone();
let mut right_table_ref = self
.table_ref()
.unwrap_or_else(|_| TableReference::bare(""));
let right_context = self.ctx.clone();
// TODO(ruihang): avoid join if left and right are the same table
// set op has "special" join semantics
if Self::is_token_a_set_op(*op) {
return self.set_op_on_non_field_columns(
left_input,
right_input,
left_context,
right_context,
*op,
modifier,
);
}
// normal join
if left_table_ref == right_table_ref {
// rename table references to avoid ambiguity
left_table_ref = TableReference::bare("lhs");
right_table_ref = TableReference::bare("rhs");
// `self.ctx` holds the context of the right plan. If the right plan has no tag
// columns, use the left plan's context for subsequent calculations, so that in
// cases like `host + scalar(...)` the tag columns of the `host` table (which
// only appear in the left plan's context) are preserved in later projections.
if self.ctx.tag_columns.is_empty() {
self.ctx = left_context.clone();
self.ctx.table_name = Some("lhs".to_string());
} else {
self.ctx.table_name = Some("rhs".to_string());
}
}
let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());
let join_plan = self.join_on_non_field_columns(
left_input,
right_input,
left_table_ref.clone(),
right_table_ref.clone(),
// if either plan has no tag columns (cases like `scalar(...) + host` or
// `host + scalar(...)`), only join on the time index
left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
)?;
let join_plan_schema = join_plan.schema().clone();
let bin_expr_builder = |_: &String| {
let (left_col_name, right_col_name) = field_columns.next().unwrap();
let left_col = join_plan_schema
.qualified_field_with_name(Some(&left_table_ref), left_col_name)
.context(DataFusionPlanningSnafu)?
.into();
let right_col = join_plan_schema
.qualified_field_with_name(Some(&right_table_ref), right_col_name)
.context(DataFusionPlanningSnafu)?
.into();
let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut binary_expr =
binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
};
if is_comparison_op && !should_return_bool {
self.filter_on_field_column(join_plan, bin_expr_builder)
} else {
self.projection_for_each_field_column(join_plan, bin_expr_builder)
}
}
}
}
fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
let NumberLiteral { val } = number_literal;
self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
self.ctx.reset_table_name_and_schema();
let literal_expr = df_prelude::lit(*val);
let plan = LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
DEFAULT_FIELD_COLUMN.to_string(),
Some(literal_expr),
)
.context(DataFusionPlanningSnafu)?,
),
});
Ok(plan)
}
fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
let StringLiteral { val } = string_literal;
self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
self.ctx.reset_table_name_and_schema();
let literal_expr = df_prelude::lit(val.to_string());
let plan = LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
DEFAULT_FIELD_COLUMN.to_string(),
Some(literal_expr),
)
.context(DataFusionPlanningSnafu)?,
),
});
Ok(plan)
}
async fn prom_vector_selector_to_plan(
&mut self,
vector_selector: &VectorSelector,
) -> Result<LogicalPlan> {
let VectorSelector {
name,
offset,
matchers,
at: _,
} = vector_selector;
let matchers = self.preprocess_label_matchers(matchers, name)?;
self.setup_context().await?;
let normalize = self
.selector_to_series_normalize_plan(offset, matchers, false)
.await?;
let manipulate = InstantManipulate::new(
self.ctx.start,
self.ctx.end,
self.ctx.lookback_delta,
self.ctx.interval,
self.ctx
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
self.ctx.field_columns.first().cloned(),
normalize,
);
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(manipulate),
}))
}
async fn prom_matrix_selector_to_plan(
&mut self,
matrix_selector: &MatrixSelector,
) -> Result<LogicalPlan> {
let MatrixSelector { vs, range } = matrix_selector;
let VectorSelector {
name,
offset,
matchers,
..
} = vs;
let matchers = self.preprocess_label_matchers(matchers, name)?;
self.setup_context().await?;
ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
let range_ms = range.as_millis() as _;
self.ctx.range = Some(range_ms);
let normalize = self
.selector_to_series_normalize_plan(offset, matchers, true)
.await?;
let manipulate = RangeManipulate::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
// TODO(ruihang): convert via Timestamp datatypes to support different time units
range_ms,
self.ctx
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
self.ctx.field_columns.clone(),
normalize,
)
.context(DataFusionPlanningSnafu)?;
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(manipulate),
}))
}
async fn prom_call_expr_to_plan(
&mut self,
session_state: &SessionState,
call_expr: &Call,
) -> Result<LogicalPlan> {
let Call { func, args } = call_expr;
// some special functions are not expressions but whole plans
match func.name {
SPECIAL_HISTOGRAM_QUANTILE => {
return self.create_histogram_plan(args, session_state).await
}
SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
_ => {}
}
// transform function arguments
let args = self.create_function_args(&args.args)?;
let input = if let Some(prom_expr) = args.input {
self.prom_expr_to_plan(prom_expr, session_state).await?
} else {
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.reset_table_name_and_schema();
LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
DEFAULT_FIELD_COLUMN.to_string(),
None,
)
.context(DataFusionPlanningSnafu)?,
),
})
};
let mut func_exprs = self.create_function_expr(func, args.literals, session_state)?;
func_exprs.insert(0, self.create_time_index_column_expr()?);
func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
LogicalPlanBuilder::from(input)
.project(func_exprs)
.context(DataFusionPlanningSnafu)?
.filter(self.create_empty_values_filter_expr()?)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)
}
async fn prom_ext_expr_to_plan(
&mut self,
session_state: &SessionState,
ext_expr: &promql_parser::parser::ast::Extension,
) -> Result<LogicalPlan> {
// let promql_parser::parser::ast::Extension { expr } = ext_expr;
let expr = &ext_expr.expr;
let children = expr.children();
let plan = self
.prom_expr_to_plan(children[0].clone(), session_state)
.await?;
// Wrapper for the explanation/analyze of the existing plan
// https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
// if `analyze` is true, runs the actual plan and produces
// information about metrics during the run.
// if `verbose` is true, prints out additional details when the VERBOSE keyword is specified
match expr.name() {
"ANALYZE" => LogicalPlanBuilder::from(plan)
.explain(false, true)
.unwrap()
.build()
.context(DataFusionPlanningSnafu),
"ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
.explain(true, true)
.unwrap()
.build()
.context(DataFusionPlanningSnafu),
"EXPLAIN" => LogicalPlanBuilder::from(plan)
.explain(false, false)
.unwrap()
.build()
.context(DataFusionPlanningSnafu),
"EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
.explain(true, false)
.unwrap()
.build()
.context(DataFusionPlanningSnafu),
_ => LogicalPlanBuilder::empty(true)
.build()
.context(DataFusionPlanningSnafu),
}
}
/// Extract metric name from `__name__` matcher and set it into [PromPlannerContext].
/// Returns a new [Matchers] that doesn't contain metric name matcher.
///