feat: impl __field__ special matcher to project value columns (#1320)

* plan new functions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement __value__ matcher

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change __value__ to __field__

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add bad-case tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename variables

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2023-04-04 17:08:50 +08:00
Committed by: GitHub
Parent: 637a4a2a58
Commit: a2d8804129
5 changed files with 233 additions and 11 deletions
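In short: the new `__field__` matcher lets a PromQL selector choose which value (field) columns to project when a table has multiple field columns. Illustrative queries, taken from the test cases added at the bottom of this diff (the metric and column names come from the test fixture):

    some_metric{__field__="field_1"}             # keep only field_1, plus tags and the time index
    some_metric{__field__!="field_1"}            # keep every field column except field_1
    some_metric{__field__=~"field_1|field_2"}    # regex opt-in
    some_metric{__field__="field_2", __field__!="field_2"}   # opt-out wins; no field column is left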


@@ -100,6 +100,9 @@ pub enum Error {
#[snafu(display("Zero range in range selector"))]
ZeroRangeSelector { backtrace: Backtrace },
#[snafu(display("Cannot find column {col}"))]
ColumnNotFound { col: String, backtrace: Backtrace },
}
impl ErrorExt for Error {
@@ -113,7 +116,8 @@ impl ErrorExt for Error {
| MultipleVector { .. }
| ExpectExpr { .. }
| ExpectRangeSelector { .. }
| ZeroRangeSelector { .. } => StatusCode::InvalidArguments,
| ZeroRangeSelector { .. }
| ColumnNotFound { .. } => StatusCode::InvalidArguments,
UnknownTable { .. }
| DataFusionPlanning { .. }


@@ -26,12 +26,14 @@ pub use aggr_over_time::{
AbsentOverTime, AvgOverTime, CountOverTime, LastOverTime, MaxOverTime, MinOverTime,
PresentOverTime, StddevOverTime, StdvarOverTime, SumOverTime,
};
pub use changes::Changes;
use datafusion::arrow::array::ArrayRef;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ColumnarValue;
pub use extrapolate_rate::{Delta, Increase, Rate};
pub use idelta::IDelta;
pub use quantile::QuantileOverTime;
pub use resets::Resets;
pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
if let ColumnarValue::Array(array) = columnar_value {


@@ -183,8 +183,6 @@ pub fn stddev_over_time(_: &TimestampMillisecondArray, values: &Float64Array) ->
}
}
// TODO(ruihang): support quantile_over_time
#[cfg(test)]
mod test {
use super::*;


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(option_get_or_insert_default)]
pub mod error;
pub mod extension_plan;
pub mod functions;


@@ -32,7 +32,7 @@ use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use promql_parser::label::{MatchOp, Matchers, METRIC_NAME};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::{
token, AggModifier, AggregateExpr, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
Expr as PromExpr, Function, MatrixSelector, NumberLiteral, Offset, ParenExpr, StringLiteral,
@@ -42,17 +42,18 @@ use snafu::{ensure, OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
CatalogSnafu, DataFusionPlanningSnafu, ExpectExprSnafu, ExpectRangeSelectorSnafu,
MultipleVectorSnafu, Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
CatalogSnafu, ColumnNotFoundSnafu, DataFusionPlanningSnafu, ExpectExprSnafu,
ExpectRangeSelectorSnafu, MultipleVectorSnafu, Result, TableNameNotFoundSnafu,
TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
UnsupportedExprSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
};
use crate::extension_plan::{
EmptyMetric, InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
};
use crate::functions::{
AbsentOverTime, AvgOverTime, CountOverTime, Delta, IDelta, Increase, LastOverTime, MaxOverTime,
MinOverTime, PresentOverTime, QuantileOverTime, Rate, SumOverTime,
AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, IDelta, Increase, LastOverTime,
MaxOverTime, MinOverTime, PresentOverTime, QuantileOverTime, Rate, Resets, StddevOverTime,
StdvarOverTime, SumOverTime,
};
const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";
@@ -63,6 +64,9 @@ const SPECIAL_TIME_FUNCTION: &str = "time";
/// default value column name for empty metric
const DEFAULT_VALUE_COLUMN: &str = "value";
/// Special modifier to project field columns under multi-field mode
const FIELD_COLUMN_MATCHER: &str = "__field__";
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
// query parameters
@@ -76,6 +80,7 @@ struct PromPlannerContext {
time_index_column: Option<String>,
value_columns: Vec<String>,
tag_columns: Vec<String>,
field_column_matcher: Option<Vec<Matcher>>,
/// The range in millisecond of range selector. None if there is no range selector.
range: Option<Millisecond>,
}
@@ -399,6 +404,11 @@ impl PromPlanner {
// TODO(ruihang): support other metric match ops
if matcher.name == METRIC_NAME && matches!(matcher.op, MatchOp::Equal) {
self.ctx.table_name = Some(matcher.value.clone());
} else if matcher.name == FIELD_COLUMN_MATCHER {
self.ctx
.field_column_matcher
.get_or_insert_default()
.push(matcher.clone());
} else {
matchers.insert(matcher.clone());
}
@@ -431,10 +441,78 @@ impl PromPlanner {
)));
// make table scan with filter exprs
let table_scan = self
let mut table_scan = self
.create_table_scan_plan(&table_name, filters.clone())
.await?;
// make a projection plan if there is any `__field__` matcher
if let Some(field_matchers) = &self.ctx.field_column_matcher {
let col_set = self.ctx.value_columns.iter().collect::<HashSet<_>>();
// opt-in set
let mut result_set = HashSet::new();
// opt-out set
let mut reverse_set = HashSet::new();
for matcher in field_matchers {
match &matcher.op {
MatchOp::Equal => {
if col_set.contains(&matcher.value) {
result_set.insert(matcher.value.clone());
} else {
return Err(ColumnNotFoundSnafu {
col: self.ctx.table_name.clone().unwrap(),
}
.build());
}
}
MatchOp::NotEqual => {
if col_set.contains(&matcher.value) {
reverse_set.insert(matcher.value.clone());
} else {
return Err(ValueNotFoundSnafu {
table: self.ctx.table_name.clone().unwrap(),
}
.build());
}
}
MatchOp::Re(regex) => {
for col in &self.ctx.value_columns {
if regex.is_match(col) {
result_set.insert(col.clone());
}
}
}
MatchOp::NotRe(regex) => {
for col in &self.ctx.value_columns {
if regex.is_match(col) {
reverse_set.insert(col.clone());
}
}
}
}
}
// merge the two sets
if result_set.is_empty() {
result_set = col_set.into_iter().cloned().collect();
}
for col in reverse_set {
result_set.remove(&col);
}
self.ctx.value_columns = result_set.iter().cloned().collect();
let exprs = result_set
.into_iter()
.map(|col| DfExpr::Column(col.into()))
.chain(self.create_tag_column_exprs()?.into_iter())
.chain(Some(self.create_time_index_column_expr()?))
.collect::<Vec<_>>();
// reuse this variable for simplicity
table_scan = LogicalPlanBuilder::from(table_scan)
.project(exprs)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
}
// make filter and sort plan
let sort_plan = LogicalPlanBuilder::from(table_scan)
.filter(utils::conjunction(filters.into_iter()).unwrap())
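
For reference, the merge rule in the hunk above: `=` and `=~` matchers put field columns into an opt-in set, `!=` and `!~` matchers put them into an opt-out set, an empty opt-in set defaults to all field columns, and the opt-out set is subtracted last. A standalone sketch of that rule (simplified to exact matches only; the type and function names here are illustrative, not from the codebase):

    use std::collections::HashSet;

    // Simplified stand-in for the `__field__` matcher; regex variants omitted.
    enum FieldMatch {
        Equal(&'static str),    // opt-in
        NotEqual(&'static str), // opt-out
    }

    fn project_fields(all_fields: &[&str], matchers: &[FieldMatch]) -> Vec<String> {
        let mut opt_in = HashSet::new();
        let mut opt_out = HashSet::new();
        for m in matchers {
            match m {
                FieldMatch::Equal(col) => {
                    opt_in.insert(col.to_string());
                }
                FieldMatch::NotEqual(col) => {
                    opt_out.insert(col.to_string());
                }
            }
        }
        // No positive matcher means "start from every field column".
        if opt_in.is_empty() {
            opt_in = all_fields.iter().map(|col| col.to_string()).collect();
        }
        // Opt-out is applied last, so it wins over opt-in.
        for col in opt_out {
            opt_in.remove(&col);
        }
        let mut picked: Vec<String> = opt_in.into_iter().collect();
        picked.sort();
        picked
    }

    fn main() {
        let fields = ["field_0", "field_1", "field_2"];
        // Mirrors the "conflict" test case below: {__field__="field_2", __field__!="field_2"}
        let picked = project_fields(
            &fields,
            &[FieldMatch::Equal("field_2"), FieldMatch::NotEqual("field_2")],
        );
        assert!(picked.is_empty());
    }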
@@ -684,6 +762,8 @@ impl PromPlanner {
)),
"idelta" => ScalarFunc::Udf(IDelta::<false>::scalar_udf()),
"irate" => ScalarFunc::Udf(IDelta::<true>::scalar_udf()),
"resets" => ScalarFunc::Udf(Resets::scalar_udf()),
"changes" => ScalarFunc::Udf(Changes::scalar_udf()),
"avg_over_time" => ScalarFunc::Udf(AvgOverTime::scalar_udf()),
"min_over_time" => ScalarFunc::Udf(MinOverTime::scalar_udf()),
"max_over_time" => ScalarFunc::Udf(MaxOverTime::scalar_udf()),
@@ -692,6 +772,8 @@ impl PromPlanner {
"last_over_time" => ScalarFunc::Udf(LastOverTime::scalar_udf()),
"absent_over_time" => ScalarFunc::Udf(AbsentOverTime::scalar_udf()),
"present_over_time" => ScalarFunc::Udf(PresentOverTime::scalar_udf()),
"stddev_over_time" => ScalarFunc::Udf(StddevOverTime::scalar_udf()),
"stdvar_over_time" => ScalarFunc::Udf(StdvarOverTime::scalar_udf()),
"quantile_over_time" => {
let quantile_expr = match other_input_exprs.get(0) {
Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => *quantile,
@@ -1690,4 +1772,138 @@ mod test {
indie_query_plan_compare(query, expected).await;
}
#[tokio::test]
async fn value_matcher() {
// template
let mut eval_stmt = EvalStmt {
expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let cases = [
// single equal matcher
(
r#"some_metric{__field__="field_1"}"#,
vec![
"some_metric.field_1",
"some_metric.tag_0",
"some_metric.tag_1",
"some_metric.tag_2",
"some_metric.timestamp",
],
),
// two equal matchers
(
r#"some_metric{__field__="field_1", __field__="field_0"}"#,
vec![
"some_metric.field_0",
"some_metric.field_1",
"some_metric.tag_0",
"some_metric.tag_1",
"some_metric.tag_2",
"some_metric.timestamp",
],
),
// single not_eq matcher
(
r#"some_metric{__field__!="field_1"}"#,
vec![
"some_metric.field_0",
"some_metric.field_2",
"some_metric.tag_0",
"some_metric.tag_1",
"some_metric.tag_2",
"some_metric.timestamp",
],
),
// two not_eq matchers
(
r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
vec![
"some_metric.field_0",
"some_metric.tag_0",
"some_metric.tag_1",
"some_metric.tag_2",
"some_metric.timestamp",
],
),
// equal and not_eq matchers (no conflict)
(
r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
vec![
"some_metric.field_1",
"some_metric.tag_0",
"some_metric.tag_1",
"some_metric.tag_2",
"some_metric.timestamp",
],
),
// equal and not_eq matchers (conflict)
(
r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
vec![
"some_metric.tag_0",
"some_metric.tag_1",
"some_metric.tag_2",
"some_metric.timestamp",
],
),
// single regex eq matcher
(
r#"some_metric{__field__=~"field_1|field_2"}"#,
vec![
"some_metric.field_1",
"some_metric.field_2",
"some_metric.tag_0",
"some_metric.tag_1",
"some_metric.tag_2",
"some_metric.timestamp",
],
),
// single regex not_eq matcher
(
r#"some_metric{__field__!~"field_1|field_2"}"#,
vec![
"some_metric.field_0",
"some_metric.tag_0",
"some_metric.tag_1",
"some_metric.tag_2",
"some_metric.timestamp",
],
),
];
for case in cases {
let prom_expr = parser::parse(case.0).unwrap();
eval_stmt.expr = prom_expr;
let table_provider = build_test_table_provider("some_metric".to_string(), 3, 3).await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone())
.await
.unwrap();
let mut fields = plan.schema().field_names();
let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
fields.sort();
expected.sort();
assert_eq!(fields, expected, "case: {:?}", case.0);
}
let bad_cases = [
r#"some_metric{__field__="nonexistent"}"#,
r#"some_metric{__field__!="nonexistent"}"#,
];
for case in bad_cases {
let prom_expr = parser::parse(case).unwrap();
eval_stmt.expr = prom_expr;
let table_provider = build_test_table_provider("some_metric".to_string(), 3, 3).await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone()).await;
assert!(plan.is_err(), "case: {:?}", case);
}
}
}