mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 05:42:57 +00:00
feat(promql-planner): introduce vector matching binary operation (#5578)
* feat(promql-planner): support vector matching for binary operation * test: add sqlness tests
This commit is contained in:
@@ -482,6 +482,7 @@ impl PromPlanner {
|
||||
// if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)`
|
||||
// under this case we only join on time index
|
||||
left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
|
||||
modifier,
|
||||
)?;
|
||||
let join_plan_schema = join_plan.schema().clone();
|
||||
|
||||
@@ -2176,24 +2177,49 @@ impl PromPlanner {
|
||||
left_time_index_column: Option<String>,
|
||||
right_time_index_column: Option<String>,
|
||||
only_join_time_index: bool,
|
||||
modifier: &Option<BinModifier>,
|
||||
) -> Result<LogicalPlan> {
|
||||
let mut left_tag_columns = if only_join_time_index {
|
||||
vec![]
|
||||
BTreeSet::new()
|
||||
} else {
|
||||
self.ctx
|
||||
.tag_columns
|
||||
.iter()
|
||||
.map(Column::from_name)
|
||||
.collect::<Vec<_>>()
|
||||
.cloned()
|
||||
.collect::<BTreeSet<_>>()
|
||||
};
|
||||
let mut right_tag_columns = left_tag_columns.clone();
|
||||
|
||||
// apply modifier
|
||||
if let Some(modifier) = modifier {
|
||||
// apply label modifier
|
||||
if let Some(matching) = &modifier.matching {
|
||||
match matching {
|
||||
// keeps columns mentioned in `on`
|
||||
LabelModifier::Include(on) => {
|
||||
let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
|
||||
left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
|
||||
right_tag_columns =
|
||||
right_tag_columns.intersection(&mask).cloned().collect();
|
||||
}
|
||||
// removes columns memtioned in `ignoring`
|
||||
LabelModifier::Exclude(ignoring) => {
|
||||
// doesn't check existence of label
|
||||
for label in &ignoring.labels {
|
||||
let _ = left_tag_columns.remove(label);
|
||||
let _ = right_tag_columns.remove(label);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// push time index column if it exists
|
||||
if let (Some(left_time_index_column), Some(right_time_index_column)) =
|
||||
(left_time_index_column, right_time_index_column)
|
||||
{
|
||||
left_tag_columns.push(Column::from_name(left_time_index_column));
|
||||
right_tag_columns.push(Column::from_name(right_time_index_column));
|
||||
left_tag_columns.insert(left_time_index_column);
|
||||
right_tag_columns.insert(right_time_index_column);
|
||||
}
|
||||
|
||||
let right = LogicalPlanBuilder::from(right)
|
||||
@@ -2209,7 +2235,16 @@ impl PromPlanner {
|
||||
.join(
|
||||
right,
|
||||
JoinType::Inner,
|
||||
(left_tag_columns, right_tag_columns),
|
||||
(
|
||||
left_tag_columns
|
||||
.into_iter()
|
||||
.map(Column::from_name)
|
||||
.collect::<Vec<_>>(),
|
||||
right_tag_columns
|
||||
.into_iter()
|
||||
.map(Column::from_name)
|
||||
.collect::<Vec<_>>(),
|
||||
),
|
||||
None,
|
||||
)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
@@ -3395,6 +3430,59 @@ mod test {
|
||||
indie_query_plan_compare(query, expected).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_hash_join() {
|
||||
let mut eval_stmt = EvalStmt {
|
||||
expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
|
||||
let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
|
||||
|
||||
let prom_expr = parser::parse(case).unwrap();
|
||||
eval_stmt.expr = prom_expr;
|
||||
let table_provider = build_test_table_provider_with_fields(
|
||||
&[
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"http_server_requests_seconds_sum".to_string(),
|
||||
),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"http_server_requests_seconds_count".to_string(),
|
||||
),
|
||||
],
|
||||
&["uri", "kubernetes_namespace", "kubernetes_pod_name"],
|
||||
)
|
||||
.await;
|
||||
// Should be ok
|
||||
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
|
||||
.await
|
||||
.unwrap();
|
||||
let expected = r#"Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value
|
||||
Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri
|
||||
SubqueryAlias: http_server_requests_seconds_sum
|
||||
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
|
||||
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
|
||||
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
|
||||
Sort: http_server_requests_seconds_sum.uri DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_sum.greptime_timestamp DESC NULLS LAST
|
||||
Filter: http_server_requests_seconds_sum.uri = Utf8("/accounts/login") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)
|
||||
TableScan: http_server_requests_seconds_sum
|
||||
SubqueryAlias: http_server_requests_seconds_count
|
||||
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
|
||||
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
|
||||
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
|
||||
Sort: http_server_requests_seconds_count.uri DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_count.greptime_timestamp DESC NULLS LAST
|
||||
Filter: http_server_requests_seconds_count.uri = Utf8("/accounts/login") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)
|
||||
TableScan: http_server_requests_seconds_count"#;
|
||||
assert_eq!(plan.to_string(), expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_nested_histogram_quantile() {
|
||||
let mut eval_stmt = EvalStmt {
|
||||
|
||||
@@ -638,3 +638,78 @@ drop table cache_miss;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
create table cache_hit_with_null_label (
|
||||
ts timestamp time index,
|
||||
job string,
|
||||
null_label string null,
|
||||
greptime_value double,
|
||||
primary key (job, null_label)
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
create table cache_miss_with_null_label (
|
||||
ts timestamp time index,
|
||||
job string,
|
||||
null_label string null,
|
||||
greptime_value double,
|
||||
primary key (job, null_label)
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
insert into cache_hit_with_null_label values
|
||||
(3000, "read", null, 1.0),
|
||||
(3000, "write", null, 2.0),
|
||||
(4000, "read", null, 3.0),
|
||||
(4000, "write", null, 4.0);
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
insert into cache_miss_with_null_label values
|
||||
(3000, "read", null, 1.0),
|
||||
(3000, "write", null, 2.0),
|
||||
(4000, "read", null, 1.0),
|
||||
(4000, "write", null, 2.0);
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
-- null!=null, so it will returns the empty set.
|
||||
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
|
||||
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
| read | | 1970-01-01T00:00:03 | 0.5 |
|
||||
| read | | 1970-01-01T00:00:04 | 0.75 |
|
||||
| write | | 1970-01-01T00:00:03 | 0.5 |
|
||||
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);
|
||||
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
| read | | 1970-01-01T00:00:03 | 0.5 |
|
||||
| read | | 1970-01-01T00:00:04 | 0.75 |
|
||||
| write | | 1970-01-01T00:00:03 | 0.5 |
|
||||
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
drop table cache_hit_with_null_label;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
drop table cache_miss_with_null_label;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
|
||||
@@ -295,3 +295,45 @@ tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit);
|
||||
drop table cache_hit;
|
||||
|
||||
drop table cache_miss;
|
||||
|
||||
create table cache_hit_with_null_label (
|
||||
ts timestamp time index,
|
||||
job string,
|
||||
null_label string null,
|
||||
greptime_value double,
|
||||
primary key (job, null_label)
|
||||
);
|
||||
|
||||
create table cache_miss_with_null_label (
|
||||
ts timestamp time index,
|
||||
job string,
|
||||
null_label string null,
|
||||
greptime_value double,
|
||||
primary key (job, null_label)
|
||||
);
|
||||
|
||||
insert into cache_hit_with_null_label values
|
||||
(3000, "read", null, 1.0),
|
||||
(3000, "write", null, 2.0),
|
||||
(4000, "read", null, 3.0),
|
||||
(4000, "write", null, 4.0);
|
||||
|
||||
insert into cache_miss_with_null_label values
|
||||
(3000, "read", null, 1.0),
|
||||
(3000, "write", null, 2.0),
|
||||
(4000, "read", null, 1.0),
|
||||
(4000, "write", null, 2.0);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
-- null!=null, so it will returns the empty set.
|
||||
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);
|
||||
|
||||
drop table cache_hit_with_null_label;
|
||||
|
||||
drop table cache_miss_with_null_label;
|
||||
|
||||
Reference in New Issue
Block a user