fix: label_replace and label_join functions when used as sub‐expressions (#6443)

* fix: label_replace and label_join functions in expressions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove update_fields

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: tql eval -> TQL EVAL

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: empty regex and not existing source label

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: simplfy test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
dennis zhuang
2025-07-03 13:34:22 +08:00
committed by Yingwen
parent 60835afb47
commit 7ab9b335a1
6 changed files with 316 additions and 89 deletions

View File

@@ -201,6 +201,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("vector cannot contain metrics with the same labelset"))]
SameLabelSet {
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -221,6 +227,7 @@ impl ErrorExt for Error {
| CombineTableColumnMismatch { .. }
| UnexpectedPlanExpr { .. }
| UnsupportedMatcherOp { .. }
| SameLabelSet { .. }
| TimestampOutOfRange { .. } => StatusCode::InvalidArguments,
UnknownTable { .. } => StatusCode::Internal,

View File

@@ -76,7 +76,7 @@ use crate::promql::error::{
CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidTimeRangeSnafu,
MultiFieldsNotSupportedSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu,
NoMetricMatcherSnafu, PromqlPlanNodeSnafu, Result, TableNameNotFoundSnafu,
NoMetricMatcherSnafu, PromqlPlanNodeSnafu, Result, SameLabelSetSnafu, TableNameNotFoundSnafu,
TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu,
ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
@@ -786,7 +786,7 @@ impl PromPlanner {
),
})
};
let mut func_exprs =
let (mut func_exprs, new_tags) =
self.create_function_expr(func, args.literals.clone(), session_state)?;
func_exprs.insert(0, self.create_time_index_column_expr()?);
func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
@@ -822,6 +822,12 @@ impl PromPlanner {
_ => builder,
};
// Update context tags after building plan
// We can't push them before planning, because they won't exist until projection.
for tag in new_tags {
self.ctx.tag_columns.push(tag);
}
builder.build().context(DataFusionPlanningSnafu)
}
@@ -1458,21 +1464,25 @@ impl PromPlanner {
Ok(result)
}
/// Creates function expressions for projection and returns the expressions and new tags.
///
/// # Side Effects
///
/// This method will update [PromPlannerContext]'s value fields.
/// This method will update [PromPlannerContext]'s fields and tags if needed.
fn create_function_expr(
&mut self,
func: &Function,
other_input_exprs: Vec<DfExpr>,
session_state: &SessionState,
) -> Result<Vec<DfExpr>> {
) -> Result<(Vec<DfExpr>, Vec<String>)> {
// TODO(ruihang): check function args list
let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
// TODO(ruihang): set this according to in-param list
let field_column_pos = 0;
let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
// New labels after executing the function, e.g. `label_replace` etc.
let mut new_tags = vec![];
let scalar_func = match func.name {
"increase" => ScalarFunc::ExtrapolateUdf(
Arc::new(Increase::scalar_udf()),
@@ -1605,32 +1615,41 @@ impl PromPlanner {
}
}
// Remove it from tag columns
// Remove it from tag columns if exists to avoid duplicated column names
self.ctx.tag_columns.retain(|tag| *tag != dst_label);
// Add the new label expr
new_tags.push(dst_label);
// Add the new label expr to evaluate
exprs.push(concat_expr);
ScalarFunc::GeneratedExpr
}
"label_replace" => {
let (replace_expr, dst_label) =
Self::build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?;
if let Some((replace_expr, dst_label)) =
self.build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?
{
// Reserve the current field columns except the `dst_label`.
for value in &self.ctx.field_columns {
if *value != dst_label {
let expr = DfExpr::Column(Column::from_name(value));
exprs.push(expr);
}
}
// Reserve the current field columns except the `dst_label`.
for value in &self.ctx.field_columns {
if *value != dst_label {
ensure!(
!self.ctx.tag_columns.contains(&dst_label),
SameLabelSetSnafu
);
new_tags.push(dst_label);
// Add the new label expr to evaluate
exprs.push(replace_expr);
} else {
// Keep the current field columns
for value in &self.ctx.field_columns {
let expr = DfExpr::Column(Column::from_name(value));
exprs.push(expr);
}
}
// Remove it from tag columns
self.ctx.tag_columns.retain(|tag| *tag != dst_label);
// Add the new label expr
exprs.push(replace_expr);
ScalarFunc::GeneratedExpr
}
"sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" => {
@@ -1730,29 +1749,34 @@ impl PromPlanner {
}
}
// update value columns' name, and alias them to remove qualifiers
let mut new_field_columns = Vec::with_capacity(exprs.len());
// Update value columns' name, and alias them to remove qualifiers
// For label functions such as `label_join`, `label_replace`, etc.,
// we keep the fields unchanged.
if !matches!(func.name, "label_join" | "label_replace") {
let mut new_field_columns = Vec::with_capacity(exprs.len());
exprs = exprs
.into_iter()
.map(|expr| {
let display_name = expr.schema_name().to_string();
new_field_columns.push(display_name.clone());
Ok(expr.alias(display_name))
})
.collect::<std::result::Result<Vec<_>, _>>()
.context(DataFusionPlanningSnafu)?;
exprs = exprs
.into_iter()
.map(|expr| {
let display_name = expr.schema_name().to_string();
new_field_columns.push(display_name.clone());
Ok(expr.alias(display_name))
})
.collect::<std::result::Result<Vec<_>, _>>()
.context(DataFusionPlanningSnafu)?;
self.ctx.field_columns = new_field_columns;
self.ctx.field_columns = new_field_columns;
}
Ok(exprs)
Ok((exprs, new_tags))
}
/// Build expr for `label_replace` function
fn build_regexp_replace_label_expr(
&self,
other_input_exprs: &mut VecDeque<DfExpr>,
session_state: &SessionState,
) -> Result<(DfExpr, String)> {
) -> Result<Option<(DfExpr, String)>> {
// label_replace(vector, dst_label, replacement, src_label, regex)
let dst_label = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
@@ -1775,6 +1799,7 @@ impl PromPlanner {
}
.fail()?,
};
let regex = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
other => UnexpectedPlanExprSnafu {
@@ -1783,6 +1808,30 @@ impl PromPlanner {
.fail()?,
};
// If the src_label exists and regex is empty, keep everything unchanged.
if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
return Ok(None);
}
// If the src_label doesn't exists, and
if !self.ctx.tag_columns.contains(&src_label) {
if replacement.is_empty() {
// the replacement is empty, keep everything unchanged.
return Ok(None);
} else {
// the replacement is not empty, always adds dst_label with replacement value.
return Ok(Some((
// alias literal `replacement` as dst_label
DfExpr::Literal(ScalarValue::Utf8(Some(replacement))).alias(&dst_label),
dst_label,
)));
}
}
// Preprocess the regex:
// https://github.com/prometheus/prometheus/blob/d902abc50d6652ba8fe9a81ff8e5cce936114eba/promql/functions.go#L1575C32-L1575C37
let regex = format!("^(?s:{regex})$");
let func = session_state
.scalar_functions()
.get("regexp_replace")
@@ -1801,14 +1850,14 @@ impl PromPlanner {
DfExpr::Literal(ScalarValue::Utf8(Some(replacement))),
];
Ok((
Ok(Some((
DfExpr::ScalarFunction(ScalarFunction {
func: func.clone(),
args,
})
.alias(&dst_label),
dst_label,
))
)))
}
/// Build expr for `label_join` function
@@ -3763,7 +3812,7 @@ mod test {
lookback_delta: Duration::from_secs(1),
};
let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
let prom_expr = parser::parse(case).unwrap();
eval_stmt.expr = prom_expr;
@@ -4339,15 +4388,17 @@ mod test {
.await
.unwrap();
let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
\n Projection: up.timestamp, up.field_0 AS field_0, concat_ws(Utf8(\",\"), up.tag_1, up.tag_2, up.tag_3) AS foo AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\", \"tag_3\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: up.tag_0 = Utf8(\"api-server\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";
let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
assert_eq!(plan.display_indent_schema().to_string(), expected);
let ret = plan.display_indent_schema().to_string();
assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
}
#[tokio::test]
@@ -4373,15 +4424,17 @@ mod test {
.await
.unwrap();
let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
\n Projection: up.timestamp, up.field_0 AS field_0, regexp_replace(up.tag_0, Utf8(\"(.*):.*\"), Utf8(\"$1\")) AS foo AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: up.tag_0 = Utf8(\"a:c\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";
let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
assert_eq!(plan.display_indent_schema().to_string(), expected);
let ret = plan.display_indent_schema().to_string();
assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
}
#[tokio::test]