mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-18 22:10:42 +00:00
fix(promql): ignore filters for non-existent labels (#5519)
* fix(promql): ignore filters for non-existent labels
* chore: add comments
* test: add sqlness test
This commit is contained in:
@@ -44,6 +44,7 @@ use datafusion_expr::utils::conjunction;
|
||||
use datafusion_expr::SortExpr;
|
||||
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::Schema;
|
||||
use itertools::Itertools;
|
||||
use promql::extension_plan::{
|
||||
build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
|
||||
@@ -764,14 +765,24 @@ impl PromPlanner {
|
||||
Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
|
||||
None => 0,
|
||||
};
|
||||
let mut scan_filters = self.matchers_to_expr(label_matchers.clone())?;
|
||||
if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
|
||||
scan_filters.push(time_index_filter);
|
||||
}
|
||||
|
||||
let time_index_filter = self.build_time_index_filter(offset_duration)?;
|
||||
// make table scan with filter exprs
|
||||
let table_ref = self.table_ref()?;
|
||||
|
||||
let moved_label_matchers = label_matchers.clone();
|
||||
let mut table_scan = self
|
||||
.create_table_scan_plan(table_ref.clone(), scan_filters.clone())
|
||||
.create_table_scan_plan(table_ref.clone(), |schema| {
|
||||
let mut scan_filters =
|
||||
PromPlanner::matchers_to_expr(moved_label_matchers, |name| {
|
||||
schema.column_index_by_name(name).is_some()
|
||||
})?;
|
||||
if let Some(time_index_filter) = time_index_filter {
|
||||
scan_filters.push(time_index_filter);
|
||||
}
|
||||
|
||||
Ok(scan_filters)
|
||||
})
|
||||
.await?;
|
||||
|
||||
// make a projection plan if there is any `__field__` matcher
|
||||
@@ -952,10 +963,19 @@ impl PromPlanner {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
|
||||
fn matchers_to_expr(&self, label_matchers: Matchers) -> Result<Vec<DfExpr>> {
|
||||
/// Convert [`Matchers`] to [`DfExpr`]s.
|
||||
///
|
||||
/// A matcher is skipped (and produces no expression) when `filter` returns `false` for its label name.
|
||||
fn matchers_to_expr<F>(label_matchers: Matchers, filter: F) -> Result<Vec<DfExpr>>
|
||||
where
|
||||
F: Fn(&str) -> bool,
|
||||
{
|
||||
let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
|
||||
for matcher in label_matchers.matchers {
|
||||
// ignores the matchers that don't match the filter function
|
||||
if !filter(&matcher.name) {
|
||||
continue;
|
||||
}
|
||||
let col = DfExpr::Column(Column::from_name(matcher.name));
|
||||
let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)));
|
||||
let expr = match matcher.op {
|
||||
@@ -1050,18 +1070,21 @@ impl PromPlanner {
|
||||
///
|
||||
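/// # Empty filters
|
||||
/// If `filter_builder` yields no expressions, no filter node is added to the scan plan (an empty conjunction is never unwrapped).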
|
||||
async fn create_table_scan_plan(
|
||||
async fn create_table_scan_plan<F>(
|
||||
&mut self,
|
||||
table_ref: TableReference,
|
||||
filter: Vec<DfExpr>,
|
||||
) -> Result<LogicalPlan> {
|
||||
filter_builder: F,
|
||||
) -> Result<LogicalPlan>
|
||||
where
|
||||
F: FnOnce(&Schema) -> Result<Vec<DfExpr>>,
|
||||
{
|
||||
let provider = self
|
||||
.table_provider
|
||||
.resolve_table(table_ref.clone())
|
||||
.await
|
||||
.context(CatalogSnafu)?;
|
||||
|
||||
let is_time_index_ms = provider
|
||||
let schema = provider
|
||||
.as_any()
|
||||
.downcast_ref::<DefaultTableSource>()
|
||||
.context(UnknownTableSnafu)?
|
||||
@@ -1070,7 +1093,9 @@ impl PromPlanner {
|
||||
.downcast_ref::<DfTableProviderAdapter>()
|
||||
.context(UnknownTableSnafu)?
|
||||
.table()
|
||||
.schema()
|
||||
.schema();
|
||||
|
||||
let is_time_index_ms = schema
|
||||
.timestamp_column()
|
||||
.with_context(|| TimeIndexNotFoundSnafu {
|
||||
table: table_ref.to_quoted_string(),
|
||||
@@ -1078,6 +1103,7 @@ impl PromPlanner {
|
||||
.data_type
|
||||
== ConcreteDataType::timestamp_millisecond_datatype();
|
||||
|
||||
let filter = filter_builder(schema.as_ref())?;
|
||||
let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
@@ -1114,12 +1140,14 @@ impl PromPlanner {
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
}
|
||||
|
||||
// Safety: `scan_filters` is not empty.
|
||||
let result = LogicalPlanBuilder::from(scan_plan)
|
||||
.filter(conjunction(filter).unwrap())
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
let mut builder = LogicalPlanBuilder::from(scan_plan);
|
||||
if !filter.is_empty() {
|
||||
// Safety: filter is not empty, checked above
|
||||
builder = builder
|
||||
.filter(conjunction(filter).unwrap())
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
}
|
||||
let result = builder.build().context(DataFusionPlanningSnafu)?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
@@ -3497,6 +3525,34 @@ mod test {
|
||||
);
|
||||
}
|
||||
|
||||
/// Regression test: planning the selector `some_metric{nonexistent="hi"}` must
/// succeed instead of returning an error.
///
/// NOTE(review): assumes the table created by `build_test_table_provider` has no
/// column named `nonexistent` — confirm against that helper.
#[tokio::test]
|
||||
async fn test_nonexistent_label() {
|
||||
// Template statement: `expr` is only a placeholder and is overwritten with the parsed query below.
|
||||
let mut eval_stmt = EvalStmt {
|
||||
expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
|
|
||||
let case = r#"some_metric{nonexistent="hi"}"#;
|
||||
let prom_expr = parser::parse(case).unwrap();
|
||||
eval_stmt.expr = prom_expr;
|
||||
let table_provider = build_test_table_provider(
|
||||
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
|
||||
3,
|
||||
3,
|
||||
)
|
||||
.await;
|
||||
// Planning must succeed: the `unwrap` below fails the test if `stmt_to_plan` returns an error.
|
||||
let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_label_join() {
|
||||
let prom_expr = parser::parse(
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
CREATE TABLE test (
|
||||
ts timestamp(3) time index,
|
||||
host STRING,
|
||||
val BIGINT,
|
||||
PRIMARY KEY(host),
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO TABLE test VALUES
|
||||
(0, 'host1', 1),
|
||||
(0, 'host2', 2);
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
SELECT * FROM test;
|
||||
|
||||
+---------------------+-------+-----+
|
||||
| ts | host | val |
|
||||
+---------------------+-------+-----+
|
||||
| 1970-01-01T00:00:00 | host1 | 1 |
|
||||
| 1970-01-01T00:00:00 | host2 | 2 |
|
||||
+---------------------+-------+-----+
|
||||
|
||||
-- test the non-existent matchers --
|
||||
TQL EVAL (0, 15, '5s') test{job=~"host1|host3"};
|
||||
|
||||
+---------------------+-------+-----+
|
||||
| ts | host | val |
|
||||
+---------------------+-------+-----+
|
||||
| 1970-01-01T00:00:00 | host1 | 1 |
|
||||
| 1970-01-01T00:00:05 | host1 | 1 |
|
||||
| 1970-01-01T00:00:10 | host1 | 1 |
|
||||
| 1970-01-01T00:00:15 | host1 | 1 |
|
||||
| 1970-01-01T00:00:00 | host2 | 2 |
|
||||
| 1970-01-01T00:00:05 | host2 | 2 |
|
||||
| 1970-01-01T00:00:10 | host2 | 2 |
|
||||
| 1970-01-01T00:00:15 | host2 | 2 |
|
||||
+---------------------+-------+-----+
|
||||
|
||||
DROP TABLE test;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
CREATE TABLE test (
|
||||
ts timestamp(3) time index,
|
||||
host STRING,
|
||||
val BIGINT,
|
||||
PRIMARY KEY(host),
|
||||
);
|
||||
|
||||
INSERT INTO TABLE test VALUES
|
||||
(0, 'host1', 1),
|
||||
(0, 'host2', 2);
|
||||
|
||||
SELECT * FROM test;
|
||||
|
||||
-- test the non-existent matchers --
|
||||
TQL EVAL (0, 15, '5s') test{job=~"host1|host3"};
|
||||
|
||||
DROP TABLE test;
|
||||
Reference in New Issue
Block a user