feat: absent function in PromQL (#6618)

* feat: absent function in PromQL

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl serde

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* ai suggests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve PR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* comment out some tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-08-03 23:59:58 -07:00
committed by GitHub
parent a3e55565dc
commit 865ca44dbd
9 changed files with 924 additions and 6 deletions

View File

@@ -27,6 +27,7 @@ use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
@@ -50,7 +51,7 @@ use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTi
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql::extension_plan::{
build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
build_special_time_expr, Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
@@ -87,6 +88,8 @@ use crate::query_engine::QueryEngineState;
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `absent()` function in PromQL
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// `histogram_quantile` function in PromQL
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector` function in PromQL
@@ -125,7 +128,10 @@ struct PromPlannerContext {
time_index_column: Option<String>,
field_columns: Vec<String>,
tag_columns: Vec<String>,
/// The matcher for field columns `__field__`.
field_column_matcher: Option<Vec<Matcher>>,
/// The matcher for selectors (normal matchers).
selector_matcher: Vec<Matcher>,
schema_name: Option<String>,
/// The range in millisecond of range selector. None if there is no range selector.
range: Option<Millisecond>,
@@ -149,6 +155,7 @@ impl PromPlannerContext {
self.field_columns = vec![];
self.tag_columns = vec![];
self.field_column_matcher = None;
self.selector_matcher.clear();
self.schema_name = None;
self.range = None;
}
@@ -848,6 +855,9 @@ impl PromPlanner {
}
SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
SPECIAL_ABSENT_FUNCTION => {
return self.create_absent_plan(args, query_engine_state).await
}
_ => {}
}
@@ -1024,6 +1034,7 @@ impl PromPlanner {
);
self.ctx.schema_name = Some(matcher.value.clone());
} else if matcher.name != METRIC_NAME {
self.ctx.selector_matcher.push(matcher.clone());
let _ = matchers.insert(matcher.clone());
}
}
@@ -2500,6 +2511,71 @@ impl PromPlanner {
Ok(scalar_plan)
}
/// Create a [SPECIAL_ABSENT_FUNCTION] plan
async fn create_absent_plan(
&mut self,
args: &PromFunctionArgs,
query_engine_state: &QueryEngineState,
) -> Result<LogicalPlan> {
if args.args.len() != 1 {
return FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
}
.fail();
}
let input = self
.prom_expr_to_plan(&args.args[0], query_engine_state)
.await?;
let time_index_expr = self.create_time_index_column_expr()?;
let first_field_expr =
self.create_field_column_exprs()?
.pop()
.with_context(|| ValueNotFoundSnafu {
table: self.ctx.table_name.clone().unwrap_or_default(),
})?;
let first_value_expr = first_value(first_field_expr, None);
let ordered_aggregated_input = LogicalPlanBuilder::from(input)
.aggregate(
vec![time_index_expr.clone()],
vec![first_value_expr.clone()],
)
.context(DataFusionPlanningSnafu)?
.sort(vec![time_index_expr.sort(true, false)])
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
let fake_labels = self
.ctx
.selector_matcher
.iter()
.filter_map(|matcher| match matcher.op {
MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
_ => None,
})
.collect::<Vec<_>>();
// Create the absent plan
let absent_plan = LogicalPlan::Extension(Extension {
node: Arc::new(
Absent::try_new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
self.ctx.time_index_column.as_ref().unwrap().clone(),
self.ctx.field_columns[0].clone(),
fake_labels,
ordered_aggregated_input,
)
.context(DataFusionPlanningSnafu)?,
),
});
Ok(absent_plan)
}
/// Try to build a DataFusion Literal Expression from PromQL Expr, return
/// `None` if the input is not a literal expression.
fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {