From a12ee5cab8b5e4ff4bccac5ea4000d39a017676d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 31 Aug 2023 22:51:34 -0500 Subject: [PATCH] fix: qualify inputs on handling join in promql (#2297) * add qualifier to join inputs Signed-off-by: Ruihang Xia * add one more case Signed-off-by: Ruihang Xia * update test results Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/promql/src/planner.rs | 78 +++++++++----- tests-integration/src/tests/promql_test.rs | 24 ++--- tests/cases/standalone/common/tql/join.result | 102 ++++++++++++++++++ tests/cases/standalone/common/tql/join.sql | 43 ++++++++ 4 files changed, 207 insertions(+), 40 deletions(-) create mode 100644 tests/cases/standalone/common/tql/join.result create mode 100644 tests/cases/standalone/common/tql/join.sql diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index bf8c8eeb6c..b8f92f26ea 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -142,11 +142,7 @@ impl PromPlanner { // calculate columns to group by // Need to append time index column into group by columns - let group_exprs = modifier - .as_ref() - .map_or(Ok(vec![self.create_time_index_column_expr()?]), |m| { - self.agg_modifier_to_col(input.schema(), m) - })?; + let group_exprs = self.agg_modifier_to_col(input.schema(), modifier)?; // convert op and value columns to aggregate exprs let aggr_exprs = self.create_aggregate_exprs(*op, &input)?; @@ -261,24 +257,34 @@ impl PromPlanner { (None, None) => { let left_input = self.prom_expr_to_plan(*lhs.clone()).await?; let left_field_columns = self.ctx.field_columns.clone(); - let left_schema = left_input.schema().clone(); + let left_table_ref: OwnedTableReference = + self.ctx.table_name.clone().unwrap_or_default().into(); let right_input = self.prom_expr_to_plan(*rhs.clone()).await?; let right_field_columns = self.ctx.field_columns.clone(); - let right_schema = right_input.schema().clone(); + let right_table_ref: OwnedTableReference = + self.ctx.table_name.clone().unwrap_or_default().into(); + + // TODO(ruihang): avoid join if left and right are the same table let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter()); - // the new ctx.field_columns for the generated join plan - let join_plan = self.join_on_non_field_columns(left_input, right_input)?; + let join_plan = self.join_on_non_field_columns( + left_input, + right_input, + left_table_ref.clone(), + right_table_ref.clone(), + )?; + let join_plan_schema = join_plan.schema().clone(); + let bin_expr_builder = |_: &String| { let (left_col_name, right_col_name) = field_columns.next().unwrap(); - let left_col = left_schema - .field_with_name(None, left_col_name) + let left_col = join_plan_schema + .field_with_name(Some(&left_table_ref), left_col_name) .context(DataFusionPlanningSnafu)? .qualified_column(); - let right_col = right_schema - .field_with_name(None, right_col_name) + let right_col = join_plan_schema + .field_with_name(Some(&right_table_ref), right_col_name) .context(DataFusionPlanningSnafu)? 
                     .qualified_column();
@@ -681,10 +687,14 @@ impl PromPlanner {
     fn agg_modifier_to_col(
         &mut self,
         input_schema: &DFSchemaRef,
-        modifier: &LabelModifier,
+        modifier: &Option<LabelModifier>,
     ) -> Result<Vec<DfExpr>> {
         match modifier {
-            LabelModifier::Include(labels) => {
+            None => {
+                self.ctx.tag_columns = vec![];
+                Ok(vec![self.create_time_index_column_expr()?])
+            }
+            Some(LabelModifier::Include(labels)) => {
                 let mut exprs = Vec::with_capacity(labels.labels.len());
                 for label in &labels.labels {
                     // nonexistence label will be ignored
@@ -701,7 +711,7 @@ impl PromPlanner {

                 Ok(exprs)
             }
-            LabelModifier::Exclude(labels) => {
+            Some(LabelModifier::Exclude(labels)) => {
                 let mut all_fields = input_schema
                     .fields()
                     .iter()
@@ -1225,6 +1235,8 @@ impl PromPlanner {
         &self,
         left: LogicalPlan,
         right: LogicalPlan,
+        left_table_ref: OwnedTableReference,
+        right_table_ref: OwnedTableReference,
     ) -> Result<LogicalPlan> {
         let mut tag_columns = self
             .ctx
             .tag_columns
@@ -1238,8 +1250,16 @@ impl PromPlanner {
             tag_columns.push(Column::from_name(time_index_column));
         }

+        let right = LogicalPlanBuilder::from(right)
+            .alias(right_table_ref)
+            .context(DataFusionPlanningSnafu)?
+            .build()
+            .context(DataFusionPlanningSnafu)?;
+
         // Inner Join on time index column to concat two operator
         LogicalPlanBuilder::from(left)
+            .alias(left_table_ref)
+            .context(DataFusionPlanningSnafu)?
             .join(
                 right,
                 JoinType::Inner,
@@ -1810,18 +1830,20 @@ mod test {
         let expected = String::from(
             "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + some_metric.field_0 AS some_metric.field_0 + some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), some_metric.field_0 + some_metric.field_0:Float64;N]\
             \n  Inner Join: some_metric.tag_0 = some_metric.tag_0, some_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n          Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n              TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n          Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n              TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
+            \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n        PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n          PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n            Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n              Filter: some_metric.tag_0 = Utf8(\"foo\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n                TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n        PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n          PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n            Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n              Filter: some_metric.tag_0 = Utf8(\"bar\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n                TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
         );
         assert_eq!(plan.display_indent_schema().to_string(), expected);
diff --git a/tests-integration/src/tests/promql_test.rs b/tests-integration/src/tests/promql_test.rs
index 4bf6d85067..52aa567793 100644
--- a/tests-integration/src/tests/promql_test.rs
+++ b/tests-integration/src/tests/promql_test.rs
@@ -417,12 +417,12 @@ async fn aggregators_complex_combined_aggrs(instance: Arc<dyn MockInstance>) {
         unix_epoch_plus_100s(),
         Duration::from_secs(60),
         Duration::from_secs(0),
-        "+------------+---------------------+-----------------------------------------------------------------------------------------------------------+\
-        \n| job        | ts                  | SUM(http_requests.value) + MIN(http_requests.value) + MAX(http_requests.value) + AVG(http_requests.value) |\
-        \n+------------+---------------------+-----------------------------------------------------------------------------------------------------------+\
-        \n| api-server | 1970-01-01T00:00:00 | 1750.0                                                                                                    |\
-        \n| app-server | 1970-01-01T00:00:00 | 4550.0                                                                                                    |\
-        \n+------------+---------------------+-----------------------------------------------------------------------------------------------------------+",
+        "+------------+---------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\
+        \n| job        | ts                  | http_requests.http_requests.http_requests.SUM(http_requests.value) + http_requests.MIN(http_requests.value) + http_requests.MAX(http_requests.value) + http_requests.AVG(http_requests.value) |\
+        \n+------------+---------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\
+        \n| api-server | 1970-01-01T00:00:00 | 1750.0                                                                                                                                                                                        |\
+        \n| app-server | 1970-01-01T00:00:00 | 4550.0                                                                                                                                                                                        |\
+        \n+------------+---------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
     )
     .await;
 }
@@ -442,12 +442,12 @@ async fn two_aggregators_combined_aggrs(instance: Arc<dyn MockInstance>) {
         unix_epoch_plus_100s(),
         Duration::from_secs(60),
         Duration::from_secs(0),
-        "+------------+---------------------+-----------------------------------------------------+\
-        \n| job        | ts                  | SUM(http_requests.value) + MIN(http_requests.value) |\
-        \n+------------+---------------------+-----------------------------------------------------+\
-        \n| api-server | 1970-01-01T00:00:00 | 1100.0                                              |\
-        \n| app-server | 1970-01-01T00:00:00 | 3100.0                                              |\
-        \n+------------+---------------------+-----------------------------------------------------+",
+        "+------------+---------------------+---------------------------------------------------------------------------------+\
+        \n| job        | ts                  | http_requests.SUM(http_requests.value) + http_requests.MIN(http_requests.value) |\
+        \n+------------+---------------------+---------------------------------------------------------------------------------+\
+        \n| api-server | 1970-01-01T00:00:00 | 1100.0                                                                          |\
+        \n| app-server | 1970-01-01T00:00:00 | 3100.0                                                                          |\
+        \n+------------+---------------------+---------------------------------------------------------------------------------+",
     )
     .await;
 }
diff --git a/tests/cases/standalone/common/tql/join.result b/tests/cases/standalone/common/tql/join.result
new file mode 100644
index 0000000000..f1085a1c33
--- /dev/null
+++ b/tests/cases/standalone/common/tql/join.result
@@ -0,0 +1,102 @@
+create table completion(
+    ts timestamp time index,
+    model string primary key,
+    val double
+);
+
+Affected Rows: 0
+
+insert into completion values
+    (0, 'model-a', 10),
+    (5000, 'model-b', 20),
+    (10000, 'model-a', 30);
+
+Affected Rows: 3
+
+create table prompt(
+    ts timestamp time index,
+    model string primary key,
+    val double
+);
+
+Affected Rows: 0
+
+insert into prompt values
+    (0, 'model-a', 100),
+    (5000, 'model-b', 200),
+    (10000, 'model-a', 300);
+
+Affected Rows: 3
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) + sum(prompt / 1000 * 0.0015);
+
++---------------------+-----------------------------------------------------------------------------------------------------------+
+| ts                  | completion.SUM(val * Float64(0.0015) / Float64(1000)) + prompt.SUM(val / Float64(1000) * Float64(0.0015)) |
++---------------------+-----------------------------------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | 0.000165 | +| 1970-01-01T00:00:05 | 0.000495 | +| 1970-01-01T00:00:10 | 0.000825 | ++---------------------+-----------------------------------------------------------------------------------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) + sum(prompt * 0.0015 / 1000); + ++---------------------+-----------------------------------------------------------------------------------------------------------+ +| ts | completion.SUM(val * Float64(0.0015) / Float64(1000)) + prompt.SUM(val * Float64(0.0015) / Float64(1000)) | ++---------------------+-----------------------------------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | 0.000165 | +| 1970-01-01T00:00:05 | 0.000495 | +| 1970-01-01T00:00:10 | 0.000825 | ++---------------------+-----------------------------------------------------------------------------------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt * 0.0015 / 1000) by (model); + ++---------+---------------------+-----------------------------------------------------------------------------------------------------------+ +| model | ts | completion.SUM(val * Float64(0.0015) / Float64(1000)) + prompt.SUM(val * Float64(0.0015) / Float64(1000)) | ++---------+---------------------+-----------------------------------------------------------------------------------------------------------+ +| model-a | 1970-01-01T00:00:00 | 0.000825 | +| model-b | 1970-01-01T00:00:05 | 0.00066 | ++---------+---------------------+-----------------------------------------------------------------------------------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt * 0.0015 / 1000); + ++---------------------+-----------------------------------------------------------------------------------------------------------+ +| ts | completion.SUM(val * Float64(0.0015) / Float64(1000)) + prompt.SUM(val * Float64(0.0015) / Float64(1000)) | ++---------------------+-----------------------------------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | 0.000225 | +| 1970-01-01T00:00:05 | 0.00051 | ++---------------------+-----------------------------------------------------------------------------------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval(0, 10, '5s') sum(completion / 1000) + max(completion / 1000); + ++---------------------+---------------------------------------------------------------------------+ +| ts | completion.SUM(val / Float64(1000)) + completion.MAX(val / Float64(1000)) | ++---------------------+---------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | 0.02 | +| 1970-01-01T00:00:05 | 0.05 | +| 1970-01-01T00:00:10 | 0.08 | ++---------------------+---------------------------------------------------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval(0, 10, '5s') sum(completion / 1000) + sum(completion / 1000); + ++---------------------+---------------------------------------------------------------------------+ +| ts | completion.SUM(val / Float64(1000)) + completion.SUM(val / Float64(1000)) | 
++---------------------+---------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | 0.02 | +| 1970-01-01T00:00:05 | 0.06 | +| 1970-01-01T00:00:10 | 0.1 | ++---------------------+---------------------------------------------------------------------------+ + +drop table completion; + +Affected Rows: 1 + +drop table prompt; + +Affected Rows: 1 + diff --git a/tests/cases/standalone/common/tql/join.sql b/tests/cases/standalone/common/tql/join.sql new file mode 100644 index 0000000000..f67b4772a5 --- /dev/null +++ b/tests/cases/standalone/common/tql/join.sql @@ -0,0 +1,43 @@ +create table completion( + ts timestamp time index, + model string primary key, + val double +); + +insert into completion values + (0, 'model-a', 10), + (5000, 'model-b', 20), + (10000, 'model-a', 30); + +create table prompt( + ts timestamp time index, + model string primary key, + val double +); + +insert into prompt values + (0, 'model-a', 100), + (5000, 'model-b', 200), + (10000, 'model-a', 300); + +-- SQLNESS SORT_RESULT 3 1 +tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) + sum(prompt / 1000 * 0.0015); + +-- SQLNESS SORT_RESULT 3 1 +tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) + sum(prompt * 0.0015 / 1000); + +-- SQLNESS SORT_RESULT 3 1 +tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt * 0.0015 / 1000) by (model); + +-- SQLNESS SORT_RESULT 3 1 +tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt * 0.0015 / 1000); + +-- SQLNESS SORT_RESULT 3 1 +tql eval(0, 10, '5s') sum(completion / 1000) + max(completion / 1000); + +-- SQLNESS SORT_RESULT 3 1 +tql eval(0, 10, '5s') sum(completion / 1000) + sum(completion / 1000); + +drop table completion; + +drop table prompt;