fix: ts column dedup?

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-01-24 14:58:11 +08:00
parent 2f82e7525f
commit 113ca5b7fe
3 changed files with 245 additions and 3 deletions

View File

@@ -407,7 +407,11 @@ impl PromPlanner {
};
// calculate columns to group by
// Need to append time index column into group by columns
let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
let mut group_exprs = Self::dedup_group_exprs_by_name(self.agg_modifier_to_col(
input.schema(),
modifier,
true,
)?);
// convert op and value columns to aggregate exprs
let (mut aggr_exprs, prev_field_exprs) =
self.create_aggregate_exprs(*op, param, &input)?;
@@ -435,6 +439,7 @@ impl PromPlanner {
// `count_values` must be grouped by fields,
// and project the fields to the new label.
group_exprs.extend(prev_field_exprs.clone());
group_exprs = Self::dedup_group_exprs_by_name(group_exprs);
let project_fields = self
.create_field_column_exprs()?
.into_iter()
@@ -483,7 +488,11 @@ impl PromPlanner {
});
self.ctx.use_tsid = input_has_tsid;
let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
let group_exprs = Self::dedup_group_exprs_by_name(self.agg_modifier_to_col(
input.schema(),
modifier,
false,
)?);
let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;
@@ -1354,7 +1363,15 @@ impl PromPlanner {
}
}
// add timestamp column
exprs.push(self.create_time_index_column_expr()?);
let time_index = self.create_time_index_column_expr()?;
let time_index_name = time_index.schema_name().to_string();
let exists = exprs
.iter()
.any(|expr| expr.schema_name().to_string() == time_index_name);
if !exists {
exprs.push(time_index);
}
Ok(exprs)
}
@@ -1402,6 +1419,21 @@ impl PromPlanner {
}
}
/// Deduplicates group expressions by their schema names while preserving the first occurrence order.
fn dedup_group_exprs_by_name(exprs: Vec<DfExpr>) -> Vec<DfExpr> {
let mut seen = HashSet::new();
let mut deduped = Vec::with_capacity(exprs.len());
for expr in exprs {
let name = expr.schema_name().to_string();
if seen.insert(name.clone()) {
deduped.push(expr);
}
}
deduped
}
// TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
pub fn matchers_to_expr(
label_matchers: Matchers,

View File

@@ -0,0 +1,147 @@
-- Minimalized repro for tsid/table_id Substrait flow path
CREATE TABLE phy_metric_min (
ts timestamp(3) time index,
tag_a STRING,
v DOUBLE NULL,
PRIMARY KEY (tag_a)
) ENGINE = metric WITH ("physical_metric_table" = "");
Affected Rows: 0
SHOW CREATE TABLE phy_metric_min;
+----------------+-----------------------------------------------+
| Table | Create Table |
+----------------+-----------------------------------------------+
| phy_metric_min | CREATE TABLE IF NOT EXISTS "phy_metric_min" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "tag_a" STRING NULL, |
| | "v" DOUBLE NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("tag_a") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+----------------+-----------------------------------------------+
CREATE TABLE IF NOT EXISTS metric_min (
tag_a STRING,
ts TIMESTAMP(3) NOT NULL,
v DOUBLE NULL,
TIME INDEX (ts),
PRIMARY KEY (tag_a)
) ENGINE=metric WITH(
on_physical_table = 'phy_metric_min'
);
Affected Rows: 0
INSERT INTO metric_min
(ts, v, tag_a)
VALUES
('2026-01-23T03:40:00Z', 1.0, 'alpha'),
('2026-01-23T03:41:00Z', 2.0, 'alpha'),
('2026-01-23T03:41:00Z', 4.0, 'beta');
Affected Rows: 3
select ts, ts from metric_min limit 1;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Projections require unique expression names but the expression "metric_min.ts" at position 0 and "metric_min.ts" at position 1 have the same name. Consider aliasing ("AS") one of them.
-- Substrait encode/decode check via TQL pushdown on metric_min
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED)
TQL EXPLAIN (
timestamp '2026-01-23 03:38:00+00',
timestamp '2026-01-23 03:44:00+00',
'1m'
)
sum by (tag_a, ts) (
sum_over_time(metric_min{tag_a!=""}[2m])
);
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: metric_min.tag_a ASC NULLS LAST, metric_min.ts ASC NULLS LAST |
| | Aggregate: groupBy=[[metric_min.tag_a, metric_min.ts]], aggr=[[sum(prom_sum_over_time(ts_range,v))]] |
| | Filter: prom_sum_over_time(ts_range,v) IS NOT NULL |
| | Projection: metric_min.ts, prom_sum_over_time(ts_range, v) AS prom_sum_over_time(ts_range,v), metric_min.tag_a |
| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[120000], time index=[ts], values=["v"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["__tsid"] |
| | Sort: metric_min.__tsid ASC NULLS FIRST, metric_min.ts ASC NULLS FIRST |
| | Filter: metric_min.tag_a != Utf8("") AND metric_min.ts >= TimestampMillisecond(-420000, None) AND metric_min.ts <= TimestampMillisecond(300000, None) |
| | Projection: metric_min.v, metric_min.tag_a, metric_min.__tsid, metric_min.ts |
| | Filter: metric_min.__table_id = UInt32(1025) |
| | TableScan: metric_min projection=[ts, tag_a, v, __table_id, __tsid] |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED)
TQL EXPLAIN (
timestamp '2026-01-23 03:38:00+00',
timestamp '2026-01-23 03:44:00+00',
'1m'
)
sum_over_time(metric_min{tag_a!=""}[2m])
;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: prom_sum_over_time(ts_range,v) IS NOT NULL |
| | Projection: metric_min.ts, prom_sum_over_time(ts_range, v) AS prom_sum_over_time(ts_range,v), metric_min.tag_a |
| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[120000], time index=[ts], values=["v"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["__tsid"] |
| | Sort: metric_min.__tsid ASC NULLS FIRST, metric_min.ts ASC NULLS FIRST |
| | Filter: metric_min.tag_a != Utf8("") AND metric_min.ts >= TimestampMillisecond(-420000, None) AND metric_min.ts <= TimestampMillisecond(300000, None) |
| | Projection: metric_min.v, metric_min.tag_a, metric_min.__tsid, metric_min.ts |
| | Filter: metric_min.__table_id = UInt32(1025) |
| | TableScan: metric_min projection=[ts, tag_a, v, __table_id, __tsid] |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
TQL EVAL (
timestamp '2026-01-23 03:38:00+00',
timestamp '2026-01-23 03:44:00+00',
'1m'
)
sum by (tag_a, ts) (
sum_over_time(metric_min{tag_a!=""}[2m])
);
+-------+---------------------+-------------------------------------+
| tag_a | ts | sum(prom_sum_over_time(ts_range,v)) |
+-------+---------------------+-------------------------------------+
| alpha | 2026-01-23T03:40:00 | 1.0 |
| alpha | 2026-01-23T03:41:00 | 3.0 |
| alpha | 2026-01-23T03:42:00 | 3.0 |
| alpha | 2026-01-23T03:43:00 | 2.0 |
| beta | 2026-01-23T03:41:00 | 4.0 |
| beta | 2026-01-23T03:42:00 | 4.0 |
| beta | 2026-01-23T03:43:00 | 4.0 |
+-------+---------------------+-------------------------------------+
DROP TABLE metric_min;
Affected Rows: 0
DROP TABLE phy_metric_min;
Affected Rows: 0

View File

@@ -0,0 +1,63 @@
-- Minimalized repro for tsid/table_id Substrait flow path
CREATE TABLE phy_metric_min (
ts timestamp(3) time index,
tag_a STRING,
v DOUBLE NULL,
PRIMARY KEY (tag_a)
) ENGINE = metric WITH ("physical_metric_table" = "");
SHOW CREATE TABLE phy_metric_min;
CREATE TABLE IF NOT EXISTS metric_min (
tag_a STRING,
ts TIMESTAMP(3) NOT NULL,
v DOUBLE NULL,
TIME INDEX (ts),
PRIMARY KEY (tag_a)
) ENGINE=metric WITH(
on_physical_table = 'phy_metric_min'
);
INSERT INTO metric_min
(ts, v, tag_a)
VALUES
('2026-01-23T03:40:00Z', 1.0, 'alpha'),
('2026-01-23T03:41:00Z', 2.0, 'alpha'),
('2026-01-23T03:41:00Z', 4.0, 'beta');
select ts, ts from metric_min limit 1;
-- Substrait encode/decode check via TQL pushdown on metric_min
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED)
TQL EXPLAIN (
timestamp '2026-01-23 03:38:00+00',
timestamp '2026-01-23 03:44:00+00',
'1m'
)
sum by (tag_a, ts) (
sum_over_time(metric_min{tag_a!=""}[2m])
);
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED)
TQL EXPLAIN (
timestamp '2026-01-23 03:38:00+00',
timestamp '2026-01-23 03:44:00+00',
'1m'
)
sum_over_time(metric_min{tag_a!=""}[2m])
;
TQL EVAL (
timestamp '2026-01-23 03:38:00+00',
timestamp '2026-01-23 03:44:00+00',
'1m'
)
sum by (tag_a, ts) (
sum_over_time(metric_min{tag_a!=""}[2m])
);
DROP TABLE metric_min;
DROP TABLE phy_metric_min;