fix: correct promql behavior on nonexistent columns (#5547)

* Revert "fix(promql): ignore filters for non-existent labels (#5519)"

This reverts commit 33a2485f54.

* reimplement

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* state safety

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-02-17 10:43:50 -08:00
committed by GitHub
parent deb9520970
commit 4ef038d098
5 changed files with 126 additions and 114 deletions

View File

@@ -44,7 +44,6 @@ use datafusion_expr::utils::conjunction;
use datafusion_expr::SortExpr;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::Schema;
use itertools::Itertools;
use promql::extension_plan::{
build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
@@ -759,31 +758,26 @@ impl PromPlanner {
label_matchers: Matchers,
is_range_selector: bool,
) -> Result<LogicalPlan> {
// make table scan plan
let table_ref = self.table_ref()?;
let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
let table_schema = table_scan.schema();
// make filter exprs
let offset_duration = match offset {
Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
None => 0,
};
let time_index_filter = self.build_time_index_filter(offset_duration)?;
// make table scan with filter exprs
let table_ref = self.table_ref()?;
let moved_label_matchers = label_matchers.clone();
let mut table_scan = self
.create_table_scan_plan(table_ref.clone(), |schema| {
let mut scan_filters =
PromPlanner::matchers_to_expr(moved_label_matchers, |name| {
schema.column_index_by_name(name).is_some()
})?;
if let Some(time_index_filter) = time_index_filter {
scan_filters.push(time_index_filter);
}
Ok(scan_filters)
})
.await?;
let mut scan_filters = self.matchers_to_expr(label_matchers.clone(), table_schema)?;
if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
scan_filters.push(time_index_filter);
}
table_scan = LogicalPlanBuilder::from(table_scan)
.filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
// make a projection plan if there is any `__field__` matcher
if let Some(field_matchers) = &self.ctx.field_column_matcher {
@@ -963,20 +957,22 @@ impl PromPlanner {
}
}
/// Convert [`Matchers`] to [`DfExpr`]s.
///
/// This method will filter out the matchers that don't match the filter function.
fn matchers_to_expr<F>(label_matchers: Matchers, filter: F) -> Result<Vec<DfExpr>>
where
F: Fn(&str) -> bool,
{
// TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
fn matchers_to_expr(
&self,
label_matchers: Matchers,
table_schema: &DFSchemaRef,
) -> Result<Vec<DfExpr>> {
let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
for matcher in label_matchers.matchers {
// ignores the matchers that don't match the filter function
if !filter(&matcher.name) {
continue;
}
let col = DfExpr::Column(Column::from_name(matcher.name));
let col = if table_schema
.field_with_unqualified_name(&matcher.name)
.is_err()
{
DfExpr::Literal(ScalarValue::Utf8(Some(String::new()))).alias(matcher.name)
} else {
DfExpr::Column(Column::from_name(matcher.name))
};
let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)));
let expr = match matcher.op {
MatchOp::Equal => col.eq(lit),
@@ -1076,21 +1072,14 @@ impl PromPlanner {
///
/// # Panic
/// If the filter is empty
async fn create_table_scan_plan<F>(
&mut self,
table_ref: TableReference,
filter_builder: F,
) -> Result<LogicalPlan>
where
F: FnOnce(&Schema) -> Result<Vec<DfExpr>>,
{
async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
let provider = self
.table_provider
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?;
let schema = provider
let is_time_index_ms = provider
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
@@ -1099,9 +1088,7 @@ impl PromPlanner {
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table()
.schema();
let is_time_index_ms = schema
.schema()
.timestamp_column()
.with_context(|| TimeIndexNotFoundSnafu {
table: table_ref.to_quoted_string(),
@@ -1109,7 +1096,6 @@ impl PromPlanner {
.data_type
== ConcreteDataType::timestamp_millisecond_datatype();
let filter = filter_builder(schema.as_ref())?;
let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
.context(DataFusionPlanningSnafu)?
.build()
@@ -1146,14 +1132,9 @@ impl PromPlanner {
.context(DataFusionPlanningSnafu)?;
}
let mut builder = LogicalPlanBuilder::from(scan_plan);
if !filter.is_empty() {
// Safety: filter is not empty, checked above
builder = builder
.filter(conjunction(filter).unwrap())
.context(DataFusionPlanningSnafu)?;
}
let result = builder.build().context(DataFusionPlanningSnafu)?;
let result = LogicalPlanBuilder::from(scan_plan)
.build()
.context(DataFusionPlanningSnafu)?;
Ok(result)
}

View File

@@ -197,3 +197,67 @@ DROP TABLE test;
Affected Rows: 0
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
val BIGINT,
PRIMARY KEY(host),
);
Affected Rows: 0
INSERT INTO TABLE test VALUES
(0, 'host1', 1),
(0, 'host2', 2);
Affected Rows: 2
SELECT * FROM test;
+---------------------+-------+-----+
| ts | host | val |
+---------------------+-------+-----+
| 1970-01-01T00:00:00 | host1 | 1 |
| 1970-01-01T00:00:00 | host2 | 2 |
+---------------------+-------+-----+
-- test the non-existent matchers --
TQL EVAL (0, 1, '5s') test{job=~"host1|host3"};
++
++
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=~".*"};
+---------------------+-------+-----+
| ts | host | val |
+---------------------+-------+-----+
| 1970-01-01T00:00:00 | host1 | 1 |
| 1970-01-01T00:00:00 | host2 | 2 |
+---------------------+-------+-----+
TQL EVAL (0, 1, '5s') test{job=~".+"};
++
++
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=""};
+---------------------+-------+-----+
| ts | host | val |
+---------------------+-------+-----+
| 1970-01-01T00:00:00 | host1 | 1 |
| 1970-01-01T00:00:00 | host2 | 2 |
+---------------------+-------+-----+
TQL EVAL (0, 1, '5s') test{job!=""};
++
++
DROP TABLE test;
Affected Rows: 0

View File

@@ -53,3 +53,31 @@ TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "$2", "idc", "(.
TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "", "", "");
DROP TABLE test;
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
val BIGINT,
PRIMARY KEY(host),
);
INSERT INTO TABLE test VALUES
(0, 'host1', 1),
(0, 'host2', 2);
SELECT * FROM test;
-- test the non-existent matchers --
TQL EVAL (0, 1, '5s') test{job=~"host1|host3"};
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=~".*"};
TQL EVAL (0, 1, '5s') test{job=~".+"};
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=""};
TQL EVAL (0, 1, '5s') test{job!=""};
DROP TABLE test;

View File

@@ -1,44 +0,0 @@
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
val BIGINT,
PRIMARY KEY(host),
);
Affected Rows: 0
INSERT INTO TABLE test VALUES
(0, 'host1', 1),
(0, 'host2', 2);
Affected Rows: 2
SELECT * FROM test;
+---------------------+-------+-----+
| ts | host | val |
+---------------------+-------+-----+
| 1970-01-01T00:00:00 | host1 | 1 |
| 1970-01-01T00:00:00 | host2 | 2 |
+---------------------+-------+-----+
-- test the non-existent matchers --
TQL EVAL (0, 15, '5s') test{job=~"host1|host3"};
+---------------------+-------+-----+
| ts | host | val |
+---------------------+-------+-----+
| 1970-01-01T00:00:00 | host1 | 1 |
| 1970-01-01T00:00:05 | host1 | 1 |
| 1970-01-01T00:00:10 | host1 | 1 |
| 1970-01-01T00:00:15 | host1 | 1 |
| 1970-01-01T00:00:00 | host2 | 2 |
| 1970-01-01T00:00:05 | host2 | 2 |
| 1970-01-01T00:00:10 | host2 | 2 |
| 1970-01-01T00:00:15 | host2 | 2 |
+---------------------+-------+-----+
DROP TABLE test;
Affected Rows: 0

View File

@@ -1,17 +0,0 @@
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
val BIGINT,
PRIMARY KEY(host),
);
INSERT INTO TABLE test VALUES
(0, 'host1', 1),
(0, 'host2', 2);
SELECT * FROM test;
-- test the non-existent matchers --
TQL EVAL (0, 15, '5s') test{job=~"host1|host3"};
DROP TABLE test;