From 113ca5b7febd110be8cc2412671df729698f3eda Mon Sep 17 00:00:00 2001 From: discord9 Date: Sat, 24 Jan 2026 14:58:11 +0800 Subject: [PATCH] fix: ts column dedup? Signed-off-by: discord9 --- src/query/src/promql/planner.rs | 38 ++++- .../flow-tql/bug_named_schema.result | 147 ++++++++++++++++++ .../distributed/flow-tql/bug_named_schema.sql | 63 ++++++++ 3 files changed, 245 insertions(+), 3 deletions(-) create mode 100644 tests/cases/distributed/flow-tql/bug_named_schema.result create mode 100644 tests/cases/distributed/flow-tql/bug_named_schema.sql diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 91deb507ce..a011ab838a 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -407,7 +407,11 @@ impl PromPlanner { }; // calculate columns to group by // Need to append time index column into group by columns - let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?; + let mut group_exprs = Self::dedup_group_exprs_by_name(self.agg_modifier_to_col( + input.schema(), + modifier, + true, + )?); // convert op and value columns to aggregate exprs let (mut aggr_exprs, prev_field_exprs) = self.create_aggregate_exprs(*op, param, &input)?; @@ -435,6 +439,7 @@ impl PromPlanner { // `count_values` must be grouped by fields, // and project the fields to the new label. group_exprs.extend(prev_field_exprs.clone()); + group_exprs = Self::dedup_group_exprs_by_name(group_exprs); let project_fields = self .create_field_column_exprs()? .into_iter() @@ -483,7 +488,11 @@ impl PromPlanner { }); self.ctx.use_tsid = input_has_tsid; - let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?; + let group_exprs = Self::dedup_group_exprs_by_name(self.agg_modifier_to_col( + input.schema(), + modifier, + false, + )?); let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?; @@ -1354,7 +1363,15 @@ impl PromPlanner { } } // add timestamp column - exprs.push(self.create_time_index_column_expr()?); + let time_index = self.create_time_index_column_expr()?; + let time_index_name = time_index.schema_name().to_string(); + let exists = exprs + .iter() + .any(|expr| expr.schema_name().to_string() == time_index_name); + + if !exists { + exprs.push(time_index); + } Ok(exprs) } @@ -1402,6 +1419,21 @@ impl PromPlanner { } } + /// Deduplicates group expressions by their schema names while preserving the first occurrence order. + fn dedup_group_exprs_by_name(exprs: Vec) -> Vec { + let mut seen = HashSet::new(); + let mut deduped = Vec::with_capacity(exprs.len()); + + for expr in exprs { + let name = expr.schema_name().to_string(); + if seen.insert(name.clone()) { + deduped.push(expr); + } + } + + deduped + } + // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher pub fn matchers_to_expr( label_matchers: Matchers, diff --git a/tests/cases/distributed/flow-tql/bug_named_schema.result b/tests/cases/distributed/flow-tql/bug_named_schema.result new file mode 100644 index 0000000000..3b4a6c630d --- /dev/null +++ b/tests/cases/distributed/flow-tql/bug_named_schema.result @@ -0,0 +1,147 @@ +-- Minimalized repro for tsid/table_id Substrait flow path +CREATE TABLE phy_metric_min ( + ts timestamp(3) time index, + tag_a STRING, + v DOUBLE NULL, + PRIMARY KEY (tag_a) +) ENGINE = metric WITH ("physical_metric_table" = ""); + +Affected Rows: 0 + +SHOW CREATE TABLE phy_metric_min; + ++----------------+-----------------------------------------------+ +| Table | Create Table | ++----------------+-----------------------------------------------+ +| phy_metric_min | CREATE TABLE IF NOT EXISTS "phy_metric_min" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "tag_a" STRING NULL, | +| | "v" DOUBLE NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("tag_a") | +| | ) | +| | | +| | ENGINE=metric | +| | WITH( | +| | physical_metric_table = '' | +| | ) | ++----------------+-----------------------------------------------+ + +CREATE TABLE IF NOT EXISTS metric_min ( + tag_a STRING, + ts TIMESTAMP(3) NOT NULL, + v DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY (tag_a) +) ENGINE=metric WITH( + on_physical_table = 'phy_metric_min' +); + +Affected Rows: 0 + +INSERT INTO metric_min + (ts, v, tag_a) +VALUES + ('2026-01-23T03:40:00Z', 1.0, 'alpha'), + ('2026-01-23T03:41:00Z', 2.0, 'alpha'), + ('2026-01-23T03:41:00Z', 4.0, 'beta'); + +Affected Rows: 3 + +select ts, ts from metric_min limit 1; + +Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Projections require unique expression names but the expression "metric_min.ts" at position 0 and "metric_min.ts" at position 1 have the same name. Consider aliasing ("AS") one of them. + +-- Substrait encode/decode check via TQL pushdown on metric_min +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED) +TQL EXPLAIN ( + timestamp '2026-01-23 03:38:00+00', + timestamp '2026-01-23 03:44:00+00', + '1m' +) +sum by (tag_a, ts) ( + sum_over_time(metric_min{tag_a!=""}[2m]) +); + ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Sort: metric_min.tag_a ASC NULLS LAST, metric_min.ts ASC NULLS LAST | +| | Aggregate: groupBy=[[metric_min.tag_a, metric_min.ts]], aggr=[[sum(prom_sum_over_time(ts_range,v))]] | +| | Filter: prom_sum_over_time(ts_range,v) IS NOT NULL | +| | Projection: metric_min.ts, prom_sum_over_time(ts_range, v) AS prom_sum_over_time(ts_range,v), metric_min.tag_a | +| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[120000], time index=[ts], values=["v"] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | +| | PromSeriesDivide: tags=["__tsid"] | +| | Sort: metric_min.__tsid ASC NULLS FIRST, metric_min.ts ASC NULLS FIRST | +| | Filter: metric_min.tag_a != Utf8("") AND metric_min.ts >= TimestampMillisecond(-420000, None) AND metric_min.ts <= TimestampMillisecond(300000, None) | +| | Projection: metric_min.v, metric_min.tag_a, metric_min.__tsid, metric_min.ts | +| | Filter: metric_min.__table_id = UInt32(1025) | +| | TableScan: metric_min projection=[ts, tag_a, v, __table_id, __tsid] | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED) +TQL EXPLAIN ( + timestamp '2026-01-23 03:38:00+00', + timestamp '2026-01-23 03:44:00+00', + '1m' +) +sum_over_time(metric_min{tag_a!=""}[2m]) +; + ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Filter: prom_sum_over_time(ts_range,v) IS NOT NULL | +| | Projection: metric_min.ts, prom_sum_over_time(ts_range, v) AS prom_sum_over_time(ts_range,v), metric_min.tag_a | +| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[120000], time index=[ts], values=["v"] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | +| | PromSeriesDivide: tags=["__tsid"] | +| | Sort: metric_min.__tsid ASC NULLS FIRST, metric_min.ts ASC NULLS FIRST | +| | Filter: metric_min.tag_a != Utf8("") AND metric_min.ts >= TimestampMillisecond(-420000, None) AND metric_min.ts <= TimestampMillisecond(300000, None) | +| | Projection: metric_min.v, metric_min.tag_a, metric_min.__tsid, metric_min.ts | +| | Filter: metric_min.__table_id = UInt32(1025) | +| | TableScan: metric_min projection=[ts, tag_a, v, __table_id, __tsid] | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +TQL EVAL ( + timestamp '2026-01-23 03:38:00+00', + timestamp '2026-01-23 03:44:00+00', + '1m' +) +sum by (tag_a, ts) ( + sum_over_time(metric_min{tag_a!=""}[2m]) +); + ++-------+---------------------+-------------------------------------+ +| tag_a | ts | sum(prom_sum_over_time(ts_range,v)) | ++-------+---------------------+-------------------------------------+ +| alpha | 2026-01-23T03:40:00 | 1.0 | +| alpha | 2026-01-23T03:41:00 | 3.0 | +| alpha | 2026-01-23T03:42:00 | 3.0 | +| alpha | 2026-01-23T03:43:00 | 2.0 | +| beta | 2026-01-23T03:41:00 | 4.0 | +| beta | 2026-01-23T03:42:00 | 4.0 | +| beta | 2026-01-23T03:43:00 | 4.0 | ++-------+---------------------+-------------------------------------+ + +DROP TABLE metric_min; + +Affected Rows: 0 + +DROP TABLE phy_metric_min; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/flow-tql/bug_named_schema.sql b/tests/cases/distributed/flow-tql/bug_named_schema.sql new file mode 100644 index 0000000000..5ea1389223 --- /dev/null +++ b/tests/cases/distributed/flow-tql/bug_named_schema.sql @@ -0,0 +1,63 @@ +-- Minimalized repro for tsid/table_id Substrait flow path + +CREATE TABLE phy_metric_min ( + ts timestamp(3) time index, + tag_a STRING, + v DOUBLE NULL, + PRIMARY KEY (tag_a) +) ENGINE = metric WITH ("physical_metric_table" = ""); + +SHOW CREATE TABLE phy_metric_min; + +CREATE TABLE IF NOT EXISTS metric_min ( + tag_a STRING, + ts TIMESTAMP(3) NOT NULL, + v DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY (tag_a) +) ENGINE=metric WITH( + on_physical_table = 'phy_metric_min' +); + +INSERT INTO metric_min + (ts, v, tag_a) +VALUES + ('2026-01-23T03:40:00Z', 1.0, 'alpha'), + ('2026-01-23T03:41:00Z', 2.0, 'alpha'), + ('2026-01-23T03:41:00Z', 4.0, 'beta'); + +select ts, ts from metric_min limit 1; + +-- Substrait encode/decode check via TQL pushdown on metric_min +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED) +TQL EXPLAIN ( + timestamp '2026-01-23 03:38:00+00', + timestamp '2026-01-23 03:44:00+00', + '1m' +) +sum by (tag_a, ts) ( + sum_over_time(metric_min{tag_a!=""}[2m]) +); + +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED) +TQL EXPLAIN ( + timestamp '2026-01-23 03:38:00+00', + timestamp '2026-01-23 03:44:00+00', + '1m' +) +sum_over_time(metric_min{tag_a!=""}[2m]) +; + +TQL EVAL ( + timestamp '2026-01-23 03:38:00+00', + timestamp '2026-01-23 03:44:00+00', + '1m' +) +sum by (tag_a, ts) ( + sum_over_time(metric_min{tag_a!=""}[2m]) +); + +DROP TABLE metric_min; +DROP TABLE phy_metric_min;