From 5d1f2310043ab8ece5c69d5b3982a7c6e0c033f0 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Wed, 15 Feb 2023 16:52:14 +0800
Subject: [PATCH] fix: update planner state according to output plan (#1005)

* fix: update context according to planner phase

Signed-off-by: Ruihang Xia

* alias out qualifier

Signed-off-by: Ruihang Xia

* remove ignore

Signed-off-by: Ruihang Xia

* fix clippy

Signed-off-by: Ruihang Xia

* fix typo

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
---
 src/datanode/src/tests/promql_test.rs |  56 ++++++++-
 src/promql/src/planner.rs             | 156 ++++++++++++++++++++------
 2 files changed, 178 insertions(+), 34 deletions(-)

diff --git a/src/datanode/src/tests/promql_test.rs b/src/datanode/src/tests/promql_test.rs
index a6133be39f..c26f8f584d 100644
--- a/src/datanode/src/tests/promql_test.rs
+++ b/src/datanode/src/tests/promql_test.rs
@@ -302,7 +302,6 @@ async fn aggregators_empty_without() {
 // {job="app-server"} 4550
 // {job="api-server"} 1750
 #[tokio::test(flavor = "multi_thread")]
-#[ignore = "binary expr on aggr result is not supported"]
 async fn aggregators_complex_combined_aggrs() {
     create_insert_query_assert(
         AGGREGATORS_CREATE_TABLE,
@@ -312,7 +311,33 @@ async fn aggregators_complex_combined_aggrs() {
         unix_epoch_plus_100s(),
         Duration::from_secs(60),
         Duration::from_secs(0),
-        "",
+        "+------------+-----------------------------------------------------------------------------------------------------------+\
+        \n| job        | SUM(http_requests.value) + MIN(http_requests.value) + MAX(http_requests.value) + AVG(http_requests.value) |\
+        \n+------------+-----------------------------------------------------------------------------------------------------------+\
+        \n| api-server | 1750                                                                                                      |\
+        \n| app-server | 4550                                                                                                      |\
+        \n+------------+-----------------------------------------------------------------------------------------------------------+",
     )
     .await;
 }
+
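// Standalone sanity check for the expected values in the restored test above.
// It assumes the standard Prometheus aggregator fixture loaded by
// AGGREGATORS_INSERT_DATA: api-server series at 100, 200, 300, 400 and
// app-server series at 500, 600, 700, 800 (consistent with the documented
// results {job="api-server"} 1750 and {job="app-server"} 4550).
fn sum_min_max_avg(values: &[f64]) -> f64 {
    let sum: f64 = values.iter().sum();
    let min = values.iter().cloned().fold(f64::INFINITY, f64::min);
    let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
    let avg = sum / values.len() as f64;
    sum + min + max + avg
}

fn main() {
    // SUM + MIN + MAX + AVG per job, as in the restored query.
    assert_eq!(sum_min_max_avg(&[100.0, 200.0, 300.0, 400.0]), 1750.0);
    assert_eq!(sum_min_max_avg(&[500.0, 600.0, 700.0, 800.0]), 4550.0);
}
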
+// This is not from the Prometheus test set.
+// It's derived from `aggregators_complex_combined_aggrs()`.
+#[tokio::test(flavor = "multi_thread")]
+async fn two_aggregators_combined_aggrs() {
+    create_insert_query_assert(
+        AGGREGATORS_CREATE_TABLE,
+        AGGREGATORS_INSERT_DATA,
+        "sum(http_requests) by (job) + min(http_requests) by (job) ",
+        UNIX_EPOCH,
+        unix_epoch_plus_100s(),
+        Duration::from_secs(60),
+        Duration::from_secs(0),
+        "+------------+-----------------------------------------------------+\
+        \n| job        | SUM(http_requests.value) + MIN(http_requests.value) |\
+        \n+------------+-----------------------------------------------------+\
+        \n| api-server | 1100                                                |\
+        \n| app-server | 3100                                                |\
+        \n+------------+-----------------------------------------------------+",
+    )
+    .await;
+}
@@ -339,3 +364,30 @@ async fn stddev_by_label() {
     )
     .await;
 }
+
+// This is not derived from Prometheus.
+#[tokio::test(flavor = "multi_thread")]
+async fn binary_op_plain_columns() {
+    create_insert_query_assert(
+        AGGREGATORS_CREATE_TABLE,
+        AGGREGATORS_INSERT_DATA,
+        r#"http_requests - http_requests"#,
+        UNIX_EPOCH,
+        unix_epoch_plus_100s(),
+        Duration::from_secs(60),
+        Duration::from_secs(0),
+        "+------------+----------+------------+---------------------+--------------------------------------------+\
+        \n| job        | instance | group      | ts                  | http_requests.value - http_requests.value |\
+        \n+------------+----------+------------+---------------------+--------------------------------------------+\
+        \n| api-server | 0        | canary     | 1970-01-01T00:00:00 | 0                                          |\
+        \n| api-server | 0        | production | 1970-01-01T00:00:00 | 0                                          |\
+        \n| api-server | 1        | canary     | 1970-01-01T00:00:00 | 0                                          |\
+        \n| api-server | 1        | production | 1970-01-01T00:00:00 | 0                                          |\
+        \n| app-server | 0        | canary     | 1970-01-01T00:00:00 | 0                                          |\
+        \n| app-server | 0        | production | 1970-01-01T00:00:00 | 0                                          |\
+        \n| app-server | 1        | canary     | 1970-01-01T00:00:00 | 0                                          |\
+        \n| app-server | 1        | production | 1970-01-01T00:00:00 | 0                                          |\
+        \n+------------+----------+------------+---------------------+--------------------------------------------+",
+    )
+    .await;
+}
diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs
index 4582943e25..c7ab57c89d 100644
--- a/src/promql/src/planner.rs
+++ b/src/promql/src/planner.rs
@@ -17,9 +17,10 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::UNIX_EPOCH;
 
-use datafusion::common::DFSchemaRef;
+use datafusion::common::{DFSchemaRef, Result as DfResult};
 use datafusion::datasource::DefaultTableSource;
 use datafusion::logical_expr::expr::AggregateFunction;
+use datafusion::logical_expr::expr_rewriter::normalize_cols;
 use datafusion::logical_expr::{
     AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Extension,
     LogicalPlan, LogicalPlanBuilder, Operator,
@@ -104,7 +105,10 @@ impl PromPlanner {
                 // calculate columns to group by
                 let group_exprs = self.agg_modifier_to_col(input.schema(), modifier)?;
                 // convert op and value columns to aggregate exprs
-                let aggr_exprs = self.create_aggregate_exprs(*op)?;
+                let aggr_exprs = self.create_aggregate_exprs(*op, &input)?;
+
+                // remove time index column from context
+                self.ctx.time_index_column = None;
 
                 // create plan
                 LogicalPlanBuilder::from(input)
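// Minimal sketch of the planner-state bookkeeping this hunk fixes: after an
// aggregate is planned, the context must describe the aggregate's output
// schema, not the input table. Field names mirror those used in planner.rs
// (`time_index_column`, `tag_columns`, `value_columns`); the struct layout
// itself is illustrative, not the actual definition.
#[derive(Debug)]
struct PromPlannerContext {
    time_index_column: Option<String>,
    tag_columns: Vec<String>,
    value_columns: Vec<String>,
}

fn main() {
    let mut ctx = PromPlannerContext {
        time_index_column: Some("ts".to_string()),
        tag_columns: vec!["job".into(), "instance".into(), "group".into()],
        value_columns: vec!["value".into()],
    };

    // Planning `sum by (job) (http_requests)` should leave the context like so:
    ctx.time_index_column = None; // the aggregate output carries no time index
    ctx.tag_columns = vec!["job".into()]; // only grouped-by labels remain
    ctx.value_columns = vec!["SUM(http_requests.value)".into()]; // renamed output

    assert!(ctx.time_index_column.is_none());
    assert_eq!(ctx.tag_columns, ["job"]);
    println!("{ctx:?}");
}
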
@@ -152,19 +156,32 @@
-            // both are columns. join them on time index
+            // both are columns. join them on the non-value columns (tags and the time index)
             (None, None) => {
                 let left_input = self.prom_expr_to_plan(*lhs.clone())?;
+                let left_value_columns = self.ctx.value_columns.clone();
+                let left_schema = left_input.schema().clone();
+
                 let right_input = self.prom_expr_to_plan(*rhs.clone())?;
-                let join_plan = self.join_on_time_index(left_input, right_input)?;
-                self.projection_for_each_value_column(join_plan, |col| {
+                let right_value_columns = self.ctx.value_columns.clone();
+                let right_schema = right_input.schema().clone();
+
+                let mut value_columns =
+                    left_value_columns.iter().zip(right_value_columns.iter());
+                // the projection below also computes the new ctx.value_columns for the join plan
+                let join_plan = self.join_on_non_value_columns(left_input, right_input)?;
+                self.projection_for_each_value_column(join_plan, |_| {
+                    let (left_col_name, right_col_name) = value_columns.next().unwrap();
+                    let left_col = left_schema
+                        .field_with_name(None, left_col_name)
+                        .context(DataFusionPlanningSnafu)?
+                        .qualified_column();
+                    let right_col = right_schema
+                        .field_with_name(None, right_col_name)
+                        .context(DataFusionPlanningSnafu)?
+                        .qualified_column();
+
                     Ok(DfExpr::BinaryExpr(BinaryExpr {
-                        left: Box::new(DfExpr::Column(Column::new(
-                            Some(LEFT_PLAN_JOIN_ALIAS),
-                            col,
-                        ))),
+                        left: Box::new(DfExpr::Column(left_col)),
                         op: Self::prom_token_to_binary_op(*op)?,
-                        right: Box::new(DfExpr::Column(Column::new(
-                            self.ctx.table_name.as_ref(),
-                            col,
-                        ))),
+                        right: Box::new(DfExpr::Column(right_col)),
                     }))
                 })?
             }
@@ -328,8 +345,12 @@
     }
 
     /// Convert [AggModifier] to [Column] exprs for aggregation.
+    ///
+    /// # Side effect
+    ///
+    /// This method will also change the tag columns in ctx.
     fn agg_modifier_to_col(
-        &self,
+        &mut self,
         input_schema: &DFSchemaRef,
         modifier: &AggModifier,
     ) -> Result<Vec<DfExpr>> {
@@ -352,6 +373,10 @@
                     })?;
                     exprs.push(DfExpr::Column(Column::from(field.name())));
                 }
+
+                // change the tag columns in context
+                self.ctx.tag_columns = labels.iter().cloned().collect();
+
                 Ok(exprs)
             }
             AggModifier::Without(labels) => {
@@ -382,6 +407,10 @@
                 for value in &self.ctx.value_columns {
                     all_fields.remove(value);
                 }
+
+                // change the tag columns in context
+                self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
+
                 // collect remaining fields and convert to col expr
                 let exprs = all_fields
                     .into_iter()
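// Sketch of the tag-column bookkeeping added to `agg_modifier_to_col` above:
// `by (labels)` keeps exactly the listed labels as the context's tags, while
// `without (labels)` keeps every remaining non-value, non-time-index column.
// `AggModifier` here is a local stand-in for the parser type of that name,
// and plain strings stand in for schema fields.
use std::collections::HashSet;

enum AggModifier {
    By(Vec<String>),
    Without(Vec<String>),
}

fn tags_after_modifier(all_tags: &[&str], modifier: &AggModifier) -> Vec<String> {
    match modifier {
        AggModifier::By(labels) => labels.clone(),
        AggModifier::Without(labels) => {
            let excluded: HashSet<&str> = labels.iter().map(String::as_str).collect();
            all_tags
                .iter()
                .filter(|tag| !excluded.contains(**tag))
                .map(|tag| tag.to_string())
                .collect()
        }
    }
}

fn main() {
    let tags = ["job", "instance", "group"];
    let by = AggModifier::By(vec!["job".to_string()]);
    assert_eq!(tags_after_modifier(&tags, &by), ["job"]);

    let without = AggModifier::Without(vec!["instance".to_string()]);
    assert_eq!(tags_after_modifier(&tags, &without), ["job", "group"]);
}
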
@@ -578,7 +607,17 @@
         })
     }
 
-    fn create_aggregate_exprs(&self, op: TokenType) -> Result<Vec<DfExpr>> {
+    /// Create a [DfExpr::AggregateFunction] expr for each value column with the given aggregate function.
+    ///
+    /// # Side effect
+    ///
+    /// This method will update the value columns in the context to the new value columns created
+    /// by the aggregate function.
+    fn create_aggregate_exprs(
+        &mut self,
+        op: TokenType,
+        input_plan: &LogicalPlan,
+    ) -> Result<Vec<DfExpr>> {
         let aggr = match op.id() {
             token::T_SUM => AggregateFunctionEnum::Sum,
             token::T_AVG => AggregateFunctionEnum::Avg,
@@ -597,7 +636,8 @@
             _ => UnexpectedTokenSnafu { token: op }.fail()?,
         };
 
-        let exprs = self
+        // apply the aggregate operation to each value column
+        let exprs: Vec<DfExpr> = self
             .ctx
             .value_columns
             .iter()
@@ -610,6 +650,16 @@
                 })
             })
             .collect();
+
+        // update the value column names according to the aggregators
+        let mut new_value_columns = Vec::with_capacity(self.ctx.value_columns.len());
+        let normalized_exprs =
+            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
+        for expr in normalized_exprs {
+            new_value_columns.push(expr.display_name().context(DataFusionPlanningSnafu)?);
+        }
+        self.ctx.value_columns = new_value_columns;
+
         Ok(exprs)
     }
@@ -666,15 +716,25 @@
         }
     }
 
-    /// Build a inner join on time index column to concat two logical plans.
-    /// The left plan will be alised as [`LEFT_PLAN_JOIN_ALIAS`].
+    /// Build an inner join on tag columns and the time index column to concat two logical plans.
+    /// The left plan will be aliased as [`LEFT_PLAN_JOIN_ALIAS`].
-    fn join_on_time_index(&self, left: LogicalPlan, right: LogicalPlan) -> Result<LogicalPlan> {
-        let time_index_column = Column::from_name(
-            self.ctx
-                .time_index_column
-                .clone()
-                .context(TimeIndexNotFoundSnafu { table: "unknown" })?,
-        );
+    fn join_on_non_value_columns(
+        &self,
+        left: LogicalPlan,
+        right: LogicalPlan,
+    ) -> Result<LogicalPlan> {
+        let mut tag_columns = self
+            .ctx
+            .tag_columns
+            .iter()
+            .map(Column::from_name)
+            .collect::<Vec<_>>();
+
+        // push the time index column if it exists
+        if let Some(time_index_column) = &self.ctx.time_index_column {
+            tag_columns.push(Column::from_name(time_index_column));
+        }
+
-        // Inner Join on time index column to concat two operator
+        // inner join on tag columns and the time index column to concat the two plans
         LogicalPlanBuilder::from(left)
             .alias(LEFT_PLAN_JOIN_ALIAS)
             .context(DataFusionPlanningSnafu)?
@@ -682,7 +742,7 @@
             .join(
                 right,
                 JoinType::Inner,
-                (vec![time_index_column.clone()], vec![time_index_column]),
+                (tag_columns.clone(), tag_columns),
                 None,
             )
             .context(DataFusionPlanningSnafu)?
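// Sketch of the join-key selection implemented by `join_on_non_value_columns`
// above: the two sub-plans are joined on every tag column, plus the time
// index column when one is still tracked (after an aggregation the context
// may carry no time index at all). Plain strings stand in for DataFusion's
// `Column`.
fn join_keys(tag_columns: &[&str], time_index_column: Option<&str>) -> Vec<String> {
    let mut keys: Vec<String> = tag_columns.iter().map(|c| c.to_string()).collect();
    // push the time index column if it exists, mirroring the patch
    if let Some(time_index) = time_index_column {
        keys.push(time_index.to_string());
    }
    keys
}

fn main() {
    // plain column arithmetic: all tags and the time index are join keys
    assert_eq!(
        join_keys(&["job", "instance", "group"], Some("ts")),
        ["job", "instance", "group", "ts"]
    );
    // after `sum by (job)` on both sides, only the grouping label remains
    assert_eq!(join_keys(&["job"], None), ["job"]);
}
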
@@ -690,23 +751,54 @@
             .build()
             .context(DataFusionPlanningSnafu)
     }
 
-    // Build a projection that project and perform operation expr for every value columns.
+    /// Build a projection that projects and applies the operation expr to every value column.
+    /// Non-value columns (tag and timestamp) will be preserved in the projection.
+    ///
+    /// # Side effect
+    ///
+    /// This function will update the value columns in the context. The new column names
+    /// don't contain qualifiers.
     fn projection_for_each_value_column<F>(
-        &self,
+        &mut self,
         input: LogicalPlan,
         name_to_expr: F,
     ) -> Result<LogicalPlan>
     where
-        F: Fn(&String) -> Result<DfExpr>,
+        F: FnMut(&String) -> Result<DfExpr>,
     {
-        let value_columns = self
+        let non_value_columns_iter = self
+            .ctx
+            .tag_columns
+            .iter()
+            .chain(self.ctx.time_index_column.iter())
+            .map(|col| Ok(DfExpr::Column(Column::from(col))));
+
+        // build computation exprs
+        let result_value_columns = self
             .ctx
             .value_columns
             .iter()
             .map(name_to_expr)
             .collect::<Result<Vec<_>>>()?;
+
+        // alias the computation exprs to remove qualifiers
+        self.ctx.value_columns = result_value_columns
+            .iter()
+            .map(|expr| expr.display_name())
+            .collect::<DfResult<Vec<_>>>()
+            .context(DataFusionPlanningSnafu)?;
+        let value_columns_iter = result_value_columns
+            .into_iter()
+            .zip(self.ctx.value_columns.iter())
+            .map(|(expr, name)| Ok(DfExpr::Alias(Box::new(expr), name.to_string())));
+
+        // chain non-value columns (unchanged) and value columns (computation applied, then aliased)
+        let project_fields = non_value_columns_iter
+            .chain(value_columns_iter)
+            .collect::<Result<Vec<_>>>()?;
+
         LogicalPlanBuilder::from(input)
-            .project(value_columns)
+            .project(project_fields)
             .context(DataFusionPlanningSnafu)?
             .build()
             .context(DataFusionPlanningSnafu)
@@ -1167,8 +1259,8 @@
 
         let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();
         let expected = String::from(
-            "Projection: lhs.field_0 + some_metric.field_0 [lhs.field_0 + some_metric.field_0:Float64;N]\
-            \n  Inner Join: lhs.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            "Projection: lhs.tag_0, lhs.timestamp, some_metric.field_0 + some_metric.field_0 AS some_metric.field_0 + some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), some_metric.field_0 + some_metric.field_0:Float64;N]\
+            \n  Inner Join: lhs.tag_0 = some_metric.tag_0, lhs.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n        PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
@@ -1204,7 +1296,7 @@
 
         let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();
         let expected = String::from(
-            "Projection: Float64(1) + some_metric.field_0 [Float64(1) + some_metric.field_0:Float64;N]\
+            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
             \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n    PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\