From 33a2485f54a35161b0f85da0cd423a61d5739fbc Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 14 Feb 2025 15:40:15 +0900 Subject: [PATCH] fix(promql): ignore filters for non-existent labels (#5519) * fix(promql): ignore filters for non-existent labels * chore: add comments * test: add sqlness test --- src/query/src/promql/planner.rs | 92 +++++++++++++++---- .../promql/non_existent_matchers.result | 44 +++++++++ .../common/promql/non_existent_matchers.sql | 17 ++++ 3 files changed, 135 insertions(+), 18 deletions(-) create mode 100644 tests/cases/standalone/common/promql/non_existent_matchers.result create mode 100644 tests/cases/standalone/common/promql/non_existent_matchers.sql diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 27c9f36770..e64e67183b 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -44,6 +44,7 @@ use datafusion_expr::utils::conjunction; use datafusion_expr::SortExpr; use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit}; use datatypes::data_type::ConcreteDataType; +use datatypes::schema::Schema; use itertools::Itertools; use promql::extension_plan::{ build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, @@ -764,14 +765,24 @@ impl PromPlanner { Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond), None => 0, }; - let mut scan_filters = self.matchers_to_expr(label_matchers.clone())?; - if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? { - scan_filters.push(time_index_filter); - } + + let time_index_filter = self.build_time_index_filter(offset_duration)?; // make table scan with filter exprs let table_ref = self.table_ref()?; + + let moved_label_matchers = label_matchers.clone(); let mut table_scan = self - .create_table_scan_plan(table_ref.clone(), scan_filters.clone()) + .create_table_scan_plan(table_ref.clone(), |schema| { + let mut scan_filters = + PromPlanner::matchers_to_expr(moved_label_matchers, |name| { + schema.column_index_by_name(name).is_some() + })?; + if let Some(time_index_filter) = time_index_filter { + scan_filters.push(time_index_filter); + } + + Ok(scan_filters) + }) .await?; // make a projection plan if there is any `__field__` matcher @@ -952,10 +963,19 @@ impl PromPlanner { } } - // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher - fn matchers_to_expr(&self, label_matchers: Matchers) -> Result> { + /// Convert [`Matchers`] to [`DfExpr`]s. + /// + /// This method will filter out the matchers that don't match the filter function. + fn matchers_to_expr(label_matchers: Matchers, filter: F) -> Result> + where + F: Fn(&str) -> bool, + { let mut exprs = Vec::with_capacity(label_matchers.matchers.len()); for matcher in label_matchers.matchers { + // ignores the matchers that don't match the filter function + if !filter(&matcher.name) { + continue; + } let col = DfExpr::Column(Column::from_name(matcher.name)); let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value))); let expr = match matcher.op { @@ -1050,18 +1070,21 @@ impl PromPlanner { /// /// # Panic /// If the filter is empty - async fn create_table_scan_plan( + async fn create_table_scan_plan( &mut self, table_ref: TableReference, - filter: Vec, - ) -> Result { + filter_builder: F, + ) -> Result + where + F: FnOnce(&Schema) -> Result>, + { let provider = self .table_provider .resolve_table(table_ref.clone()) .await .context(CatalogSnafu)?; - let is_time_index_ms = provider + let schema = provider .as_any() .downcast_ref::() .context(UnknownTableSnafu)? @@ -1070,7 +1093,9 @@ impl PromPlanner { .downcast_ref::() .context(UnknownTableSnafu)? .table() - .schema() + .schema(); + + let is_time_index_ms = schema .timestamp_column() .with_context(|| TimeIndexNotFoundSnafu { table: table_ref.to_quoted_string(), @@ -1078,6 +1103,7 @@ impl PromPlanner { .data_type == ConcreteDataType::timestamp_millisecond_datatype(); + let filter = filter_builder(schema.as_ref())?; let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None) .context(DataFusionPlanningSnafu)? .build() @@ -1114,12 +1140,14 @@ impl PromPlanner { .context(DataFusionPlanningSnafu)?; } - // Safety: `scan_filters` is not empty. - let result = LogicalPlanBuilder::from(scan_plan) - .filter(conjunction(filter).unwrap()) - .context(DataFusionPlanningSnafu)? - .build() - .context(DataFusionPlanningSnafu)?; + let mut builder = LogicalPlanBuilder::from(scan_plan); + if !filter.is_empty() { + // Safety: filter is not empty, checked above + builder = builder + .filter(conjunction(filter).unwrap()) + .context(DataFusionPlanningSnafu)?; + } + let result = builder.build().context(DataFusionPlanningSnafu)?; Ok(result) } @@ -3497,6 +3525,34 @@ mod test { ); } + #[tokio::test] + async fn test_nonexistent_label() { + // template + let mut eval_stmt = EvalStmt { + expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }), + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let case = r#"some_metric{nonexistent="hi"}"#; + let prom_expr = parser::parse(case).unwrap(); + eval_stmt.expr = prom_expr; + let table_provider = build_test_table_provider( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 3, + 3, + ) + .await; + // Should be ok + let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) + .await + .unwrap(); + } + #[tokio::test] async fn test_label_join() { let prom_expr = parser::parse( diff --git a/tests/cases/standalone/common/promql/non_existent_matchers.result b/tests/cases/standalone/common/promql/non_existent_matchers.result new file mode 100644 index 0000000000..10a6bf4704 --- /dev/null +++ b/tests/cases/standalone/common/promql/non_existent_matchers.result @@ -0,0 +1,44 @@ +CREATE TABLE test ( + ts timestamp(3) time index, + host STRING, + val BIGINT, + PRIMARY KEY(host), + ); + +Affected Rows: 0 + +INSERT INTO TABLE test VALUES + (0, 'host1', 1), + (0, 'host2', 2); + +Affected Rows: 2 + +SELECT * FROM test; + ++---------------------+-------+-----+ +| ts | host | val | ++---------------------+-------+-----+ +| 1970-01-01T00:00:00 | host1 | 1 | +| 1970-01-01T00:00:00 | host2 | 2 | ++---------------------+-------+-----+ + +-- test the non-existent matchers -- +TQL EVAL (0, 15, '5s') test{job=~"host1|host3"}; + ++---------------------+-------+-----+ +| ts | host | val | ++---------------------+-------+-----+ +| 1970-01-01T00:00:00 | host1 | 1 | +| 1970-01-01T00:00:05 | host1 | 1 | +| 1970-01-01T00:00:10 | host1 | 1 | +| 1970-01-01T00:00:15 | host1 | 1 | +| 1970-01-01T00:00:00 | host2 | 2 | +| 1970-01-01T00:00:05 | host2 | 2 | +| 1970-01-01T00:00:10 | host2 | 2 | +| 1970-01-01T00:00:15 | host2 | 2 | ++---------------------+-------+-----+ + +DROP TABLE test; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/non_existent_matchers.sql b/tests/cases/standalone/common/promql/non_existent_matchers.sql new file mode 100644 index 0000000000..ce04207e26 --- /dev/null +++ b/tests/cases/standalone/common/promql/non_existent_matchers.sql @@ -0,0 +1,17 @@ +CREATE TABLE test ( + ts timestamp(3) time index, + host STRING, + val BIGINT, + PRIMARY KEY(host), + ); + +INSERT INTO TABLE test VALUES + (0, 'host1', 1), + (0, 'host2', 2); + +SELECT * FROM test; + +-- test the non-existent matchers -- +TQL EVAL (0, 15, '5s') test{job=~"host1|host3"}; + +DROP TABLE test;