diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index 7202aa5a7a..1dcddcd126 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -129,6 +129,12 @@ pub enum Error { right: Vec, location: Location, }, + + #[snafu(display("Multi fields calculation is not supported in {}", operator))] + MultiFieldsNotSupported { + operator: String, + location: Location, + }, } impl ErrorExt for Error { @@ -149,6 +155,7 @@ impl ErrorExt for Error { | UnsupportedVectorMatch { .. } | CombineTableColumnMismatch { .. } | DataFusionPlanning { .. } + | MultiFieldsNotSupported { .. } | UnexpectedPlanExpr { .. } | IllegalRange { .. } => StatusCode::InvalidArguments, diff --git a/src/promql/src/extension_plan/union_distinct_on.rs b/src/promql/src/extension_plan/union_distinct_on.rs index 22551b73f8..20e40415c9 100644 --- a/src/promql/src/extension_plan/union_distinct_on.rs +++ b/src/promql/src/extension_plan/union_distinct_on.rs @@ -362,6 +362,7 @@ impl HashedData { ) -> DataFusionResult { // Collect all batches from the input stream let initial = (Vec::new(), 0); + let schema = input.schema(); let (batches, _num_rows) = input .try_fold(initial, |mut acc, batch| async { // Update rowcount @@ -399,7 +400,7 @@ impl HashedData { } // Finilize the hash map - let batch = interleave_batches(batches, interleave_indices)?; + let batch = interleave_batches(schema, batches, interleave_indices)?; Ok(Self { hash_map, @@ -442,10 +443,19 @@ impl HashedData { /// Utility function to interleave batches. Based on [interleave](datafusion::arrow::compute::interleave) fn interleave_batches( + schema: SchemaRef, batches: Vec, indices: Vec<(usize, usize)>, ) -> DataFusionResult { - let schema = batches[0].schema(); + if batches.is_empty() { + if indices.is_empty() { + return Ok(RecordBatch::new_empty(schema)); + } else { + return Err(DataFusionError::Internal( + "Cannot interleave empty batches with non-empty indices".to_string(), + )); + } + } // transform batches into arrays let mut arrays = vec![vec![]; schema.fields().len()]; @@ -488,6 +498,8 @@ fn take_batch(batch: &RecordBatch, indices: &[usize]) -> DataFusionResult>(); + self.ctx.time_index_column = Some(left_time_index.clone()); + + // alias right time index column if necessary + if left_context.time_index_column != right_context.time_index_column { + let right_project_exprs = right + .schema() + .fields() + .iter() + .map(|field| { + if field.name() == &right_time_index { + DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index) + } else { + DfExpr::Column(Column::from_name(field.name())) + } + }) + .collect::>(); + + right = LogicalPlanBuilder::from(right) + .project(right_project_exprs) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + } // Generate join plan. // All set operations in PromQL are "distinct" @@ -1675,6 +1701,21 @@ impl PromPlanner { right_context: PromPlannerContext, modifier: &Option, ) -> Result { + // checks + ensure!( + left_context.field_columns.len() == right_context.field_columns.len(), + CombineTableColumnMismatchSnafu { + left: left_context.field_columns.clone(), + right: right_context.field_columns.clone() + } + ); + ensure!( + left_context.field_columns.len() == 1, + MultiFieldsNotSupportedSnafu { + operator: "OR operator" + } + ); + // prepare hash sets let all_tags = left_tag_cols_set .union(&right_tag_cols_set) @@ -1712,6 +1753,9 @@ impl PromPlanner { .with_context(|| TimeIndexNotFoundSnafu { table: right_qualifier_string.clone(), })?; + // Take the name of first field column. The length is checked above. + let left_field_col = left_context.field_columns.first().unwrap(); + let right_field_col = right_context.field_columns.first().unwrap(); // step 0: fill all columns in output schema let mut all_columns_set = left @@ -1724,6 +1768,10 @@ impl PromPlanner { // remove time index column all_columns_set.remove(&left_time_index_column); all_columns_set.remove(&right_time_index_column); + // remove field column in the right + if left_field_col != right_field_col { + all_columns_set.remove(right_field_col); + } let mut all_columns = all_columns_set.into_iter().collect::>(); // sort to ensure the generated schema is not volatile all_columns.sort_unstable(); @@ -1735,7 +1783,7 @@ impl PromPlanner { if tags_not_in_left.contains(col) { DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string()) } else { - DfExpr::Column(Column::new(left_qualifier.clone(), col)) + DfExpr::Column(Column::new(None::, col)) } }); let right_time_index_expr = DfExpr::Column(Column::new( @@ -1743,11 +1791,16 @@ impl PromPlanner { right_time_index_column, )) .alias(left_time_index_column.clone()); + // `skip(1)` to skip the time index column let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| { - if tags_not_in_right.contains(col) { + // expr + if col == left_field_col && left_field_col != right_field_col { + // alias field in right side if necessary to handle different field name + DfExpr::Column(Column::new(right_qualifier.clone(), right_field_col)) + } else if tags_not_in_right.contains(col) { DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string()) } else { - DfExpr::Column(Column::new(right_qualifier.clone(), col)) + DfExpr::Column(Column::new(None::, col)) } }); let right_proj_exprs = [right_time_index_expr] diff --git a/tests/cases/standalone/common/promql/set_operation.result b/tests/cases/standalone/common/promql/set_operation.result index 31a0a6638f..230ee5b566 100644 --- a/tests/cases/standalone/common/promql/set_operation.result +++ b/tests/cases/standalone/common/promql/set_operation.result @@ -25,7 +25,7 @@ insert into http_requests values Affected Rows: 8 -- empty metric -create table cpu_count(ts timestamp time index); +create table cpu_count(ts timestamp time index, greptime_value double); Affected Rows: 0 @@ -38,10 +38,9 @@ create table vector_matching_a( Affected Rows: 0 insert into vector_matching_a values - (3000000, "x", 10), - (3000000, "y", 20); + (3000000, "x", 10); -Affected Rows: 2 +Affected Rows: 1 -- eval instant at 50m http_requests{group="canary"} and http_requests{instance="0"} -- http_requests{group="canary", instance="0", job="api-server"} 300 @@ -157,7 +156,16 @@ tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="produc -- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{instance="1"}; -Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named http_requests.greptime_value. Valid fields are http_requests.job, http_requests.instance, http_requests.g, http_requests.ts, "greptime_value + Float64(1)". ++---------------------+------------+-----------------------------+----------+-----+ +| ts | g | greptime_value + Float64(1) | instance | job | ++---------------------+------------+-----------------------------+----------+-----+ +| 1970-01-01T00:50:00 | canary | 301.0 | 0 | api | +| 1970-01-01T00:50:00 | canary | 401.0 | 1 | api | +| 1970-01-01T00:50:00 | canary | 701.0 | 0 | app | +| 1970-01-01T00:50:00 | canary | 801.0 | 1 | app | +| 1970-01-01T00:50:00 | production | 200.0 | 1 | api | +| 1970-01-01T00:50:00 | production | 600.0 | 1 | app | ++---------------------+------------+-----------------------------+----------+-----+ -- # Matching only on instance excludes everything that has instance=0/1 but includes -- # entries without the instance label. @@ -168,11 +176,18 @@ Error: 1004(InvalidArguments), Internal error during building DataFusion plan: N -- {group="canary", instance="1", job="app-server"} 801 -- vector_matching_a{l="x"} 10 -- vector_matching_a{l="y"} 20 --- NOT SUPPORTED: union on different schemas --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or on(instance) (http_requests or cpu_count or vector_matching_a); -Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.greptime_value. Valid fields are cpu_count.ts. ++---------------------+--------+-----------------------------+----------+-----+---+ +| ts | g | greptime_value + Float64(1) | instance | job | l | ++---------------------+--------+-----------------------------+----------+-----+---+ +| 1970-01-01T00:50:00 | | 10.0 | | | x | +| 1970-01-01T00:50:00 | canary | 301.0 | 0 | api | | +| 1970-01-01T00:50:00 | canary | 401.0 | 1 | api | | +| 1970-01-01T00:50:00 | canary | 701.0 | 0 | app | | +| 1970-01-01T00:50:00 | canary | 801.0 | 1 | app | | ++---------------------+--------+-----------------------------+----------+-----+---+ -- eval instant at 50m (http_requests{group="canary"} + 1) or ignoring(l, group, job) (http_requests or cpu_count or vector_matching_a) -- {group="canary", instance="0", job="api-server"} 301 @@ -181,11 +196,18 @@ Error: 1004(InvalidArguments), Internal error during building DataFusion plan: N -- {group="canary", instance="1", job="app-server"} 801 -- vector_matching_a{l="x"} 10 -- vector_matching_a{l="y"} 20 --- NOT SUPPORTED: union on different schemas --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or ignoring(l, g, job) (http_requests or cpu_count or vector_matching_a); -Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.greptime_value. Valid fields are cpu_count.ts. ++---------------------+--------+-----------------------------+----------+-----+---+ +| ts | g | greptime_value + Float64(1) | instance | job | l | ++---------------------+--------+-----------------------------+----------+-----+---+ +| 1970-01-01T00:50:00 | | 10.0 | | | x | +| 1970-01-01T00:50:00 | canary | 301.0 | 0 | api | | +| 1970-01-01T00:50:00 | canary | 401.0 | 1 | api | | +| 1970-01-01T00:50:00 | canary | 701.0 | 0 | app | | +| 1970-01-01T00:50:00 | canary | 801.0 | 1 | app | | ++---------------------+--------+-----------------------------+----------+-----+---+ -- eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"} -- http_requests{group="canary", instance="1", job="api-server"} 400 @@ -248,10 +270,21 @@ tql eval (3000, 3000, '1s') http_requests{g="canary"} unless ignoring(g) http_re -- http_requests{group="production", instance="0", job="app-server"} 500 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `vector()` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') http_requests AND ON (dummy) vector(1); -Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named time. Valid fields are http_requests.ts, http_requests.job, http_requests.instance, http_requests.g, http_requests.greptime_value. ++---------------------+-----+----------+------------+----------------+ +| ts | job | instance | g | greptime_value | ++---------------------+-----+----------+------------+----------------+ +| 1970-01-01T00:50:00 | api | 0 | canary | 300.0 | +| 1970-01-01T00:50:00 | api | 0 | production | 100.0 | +| 1970-01-01T00:50:00 | api | 1 | canary | 400.0 | +| 1970-01-01T00:50:00 | api | 1 | production | 200.0 | +| 1970-01-01T00:50:00 | app | 0 | canary | 700.0 | +| 1970-01-01T00:50:00 | app | 0 | production | 500.0 | +| 1970-01-01T00:50:00 | app | 1 | canary | 800.0 | +| 1970-01-01T00:50:00 | app | 1 | production | 600.0 | ++---------------------+-----+----------+------------+----------------+ -- eval instant at 50m http_requests AND IGNORING (group, instance, job) vector(1) -- http_requests{group="canary", instance="0", job="api-server"} 300 @@ -262,10 +295,21 @@ Error: 1004(InvalidArguments), Internal error during building DataFusion plan: N -- http_requests{group="production", instance="0", job="app-server"} 500 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `vector()` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') http_requests AND IGNORING (g, instance, job) vector(1); -Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named time. Valid fields are http_requests.ts, http_requests.job, http_requests.instance, http_requests.g, http_requests.greptime_value. ++---------------------+-----+----------+------------+----------------+ +| ts | job | instance | g | greptime_value | ++---------------------+-----+----------+------------+----------------+ +| 1970-01-01T00:50:00 | api | 0 | canary | 300.0 | +| 1970-01-01T00:50:00 | api | 0 | production | 100.0 | +| 1970-01-01T00:50:00 | api | 1 | canary | 400.0 | +| 1970-01-01T00:50:00 | api | 1 | production | 200.0 | +| 1970-01-01T00:50:00 | app | 0 | canary | 700.0 | +| 1970-01-01T00:50:00 | app | 0 | production | 500.0 | +| 1970-01-01T00:50:00 | app | 1 | canary | 800.0 | +| 1970-01-01T00:50:00 | app | 1 | production | 600.0 | ++---------------------+-----+----------+------------+----------------+ drop table http_requests; diff --git a/tests/cases/standalone/common/promql/set_operation.sql b/tests/cases/standalone/common/promql/set_operation.sql index 521791a02c..a386ca96c4 100644 --- a/tests/cases/standalone/common/promql/set_operation.sql +++ b/tests/cases/standalone/common/promql/set_operation.sql @@ -22,7 +22,7 @@ insert into http_requests values (3000000, "app", "1", "canary", 800); -- empty metric -create table cpu_count(ts timestamp time index); +create table cpu_count(ts timestamp time index, greptime_value double); create table vector_matching_a( ts timestamp time index, @@ -31,8 +31,7 @@ create table vector_matching_a( ); insert into vector_matching_a values - (3000000, "x", 10), - (3000000, "y", 20); + (3000000, "x", 10); -- eval instant at 50m http_requests{group="canary"} and http_requests{instance="0"} -- http_requests{group="canary", instance="0", job="api-server"} 300 @@ -103,8 +102,7 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{ins -- {group="canary", instance="1", job="app-server"} 801 -- vector_matching_a{l="x"} 10 -- vector_matching_a{l="y"} 20 --- NOT SUPPORTED: union on different schemas --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or on(instance) (http_requests or cpu_count or vector_matching_a); -- eval instant at 50m (http_requests{group="canary"} + 1) or ignoring(l, group, job) (http_requests or cpu_count or vector_matching_a) @@ -114,8 +112,7 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or on(instance) (htt -- {group="canary", instance="1", job="app-server"} 801 -- vector_matching_a{l="x"} 10 -- vector_matching_a{l="y"} 20 --- NOT SUPPORTED: union on different schemas --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or ignoring(l, g, job) (http_requests or cpu_count or vector_matching_a); -- eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"} @@ -153,7 +150,7 @@ tql eval (3000, 3000, '1s') http_requests{g="canary"} unless ignoring(g) http_re -- http_requests{group="production", instance="0", job="app-server"} 500 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `vector()` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') http_requests AND ON (dummy) vector(1); -- eval instant at 50m http_requests AND IGNORING (group, instance, job) vector(1) @@ -165,7 +162,7 @@ tql eval (3000, 3000, '1s') http_requests AND ON (dummy) vector(1); -- http_requests{group="production", instance="0", job="app-server"} 500 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `vector()` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') http_requests AND IGNORING (g, instance, job) vector(1); drop table http_requests;