feat: support PromQL operations over the same metric (#3124)

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update ut cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove deadcode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-01-12 07:07:17 +08:00
committed by GitHub
parent 8ec1e42754
commit 0882da4d01
5 changed files with 143 additions and 69 deletions

View File

@@ -291,13 +291,13 @@ impl PromPlanner {
(None, None) => {
let left_input = self.prom_expr_to_plan(*lhs.clone()).await?;
let left_field_columns = self.ctx.field_columns.clone();
let left_table_ref: OwnedTableReference =
let mut left_table_ref: OwnedTableReference =
self.ctx.table_name.clone().unwrap_or_default().into();
let left_context = self.ctx.clone();
let right_input = self.prom_expr_to_plan(*rhs.clone()).await?;
let right_field_columns = self.ctx.field_columns.clone();
let right_table_ref: OwnedTableReference =
let mut right_table_ref: OwnedTableReference =
self.ctx.table_name.clone().unwrap_or_default().into();
let right_context = self.ctx.clone();
@@ -316,6 +316,12 @@ impl PromPlanner {
}
// normal join
if left_table_ref == right_table_ref {
// rename table references to avoid ambiguity
left_table_ref = OwnedTableReference::bare("lhs");
right_table_ref = OwnedTableReference::bare("rhs");
self.ctx.table_name = Some("lhs".to_string());
}
let mut field_columns =
left_field_columns.iter().zip(right_field_columns.iter());
let join_plan = self.join_on_non_field_columns(
@@ -1847,7 +1853,7 @@ impl PromPlanner {
.zip(self.ctx.field_columns.iter())
.map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, name.to_string()))));
// chain non-value columns (unchanged) and value columns (applied computation then alias)
// chain non-field columns (unchanged) and field columns (applied computation then alias)
let project_fields = non_field_columns_iter
.chain(field_columns_iter)
.collect::<Result<Vec<_>>>()?;
@@ -2379,16 +2385,16 @@ mod test {
.unwrap();
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + some_metric.field_0 AS some_metric.field_0 + some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), some_metric.field_0 + some_metric.field_0:Float64;N]\
\n Inner Join: some_metric.tag_0 = some_metric.tag_0, some_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
"Projection: lhs.tag_0, lhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
\n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"foo\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\