diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index cc0febb89d..1a5e12d1ac 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -44,10 +44,13 @@ use datafusion::prelude as df_prelude; use datafusion::prelude::{Column, Expr as DfExpr, JoinType}; use datafusion::scalar::ScalarValue; use datafusion::sql::TableReference; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_common::{DFSchema, NullEquality}; use datafusion_expr::expr::WindowFunctionParams; use datafusion_expr::utils::conjunction; -use datafusion_expr::{ExprSchemable, Literal, SortExpr, TableSource, col, lit}; +use datafusion_expr::{ + ExprSchemable, Literal, Projection, SortExpr, TableScan, TableSource, col, lit, +}; use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit}; use datatypes::data_type::ConcreteDataType; use itertools::Itertools; @@ -353,18 +356,55 @@ impl PromPlanner { param, } = aggr_expr; - let input = self.prom_expr_to_plan(expr, query_engine_state).await?; + let mut input = self.prom_expr_to_plan(expr, query_engine_state).await?; let input_has_tsid = input.schema().fields().iter().any(|field| { field.name() == DATA_SCHEMA_TSID_COLUMN_NAME && field.data_type() == &ArrowDataType::UInt64 }); + // `__tsid` based scan projection may prune tag columns. Ensure tags referenced in + // aggregation modifiers (`by`/`without`) are available before planning group keys. + let required_group_tags = match modifier { + None => BTreeSet::new(), + Some(LabelModifier::Include(labels)) => labels + .labels + .iter() + .filter(|label| !is_metric_engine_internal_column(label.as_str())) + .cloned() + .collect(), + Some(LabelModifier::Exclude(labels)) => { + let mut all_tags = self.collect_row_key_tag_columns_from_plan(&input)?; + for label in &labels.labels { + let _ = all_tags.remove(label); + } + all_tags + } + }; + + if !required_group_tags.is_empty() + && required_group_tags + .iter() + .any(|tag| Self::find_case_sensitive_column(input.schema(), tag.as_str()).is_none()) + { + input = self.ensure_tag_columns_available(input, &required_group_tags)?; + self.refresh_tag_columns_from_schema(input.schema()); + } + match (*op).id() { token::T_TOPK | token::T_BOTTOMK => { self.prom_topk_bottomk_to_plan(aggr_expr, input).await } _ => { - let input_tag_columns = self.ctx.tag_columns.clone(); + // When `__tsid` is available, tag columns may have been pruned from the input plan. + // For `keep_tsid` decision we should compare against the full row-key label set, + // otherwise we may incorrectly treat label-reducing aggregates as preserving labels. + let input_tag_columns = if input_has_tsid { + self.collect_row_key_tag_columns_from_plan(&input)? + .into_iter() + .collect::>() + } else { + self.ctx.tag_columns.clone() + }; // calculate columns to group by // Need to append time index column into group by columns let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?; @@ -1613,6 +1653,25 @@ impl PromPlanner { .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_))); self.ctx.use_tsid = use_tsid; + let all_table_tags = self.ctx.tag_columns.clone(); + + let scan_tag_columns = if use_tsid { + let mut scan_tags = self.ctx.tag_columns.clone(); + for matcher in &self.ctx.selector_matcher { + if is_metric_engine_internal_column(&matcher.name) { + continue; + } + if all_table_tags.iter().any(|tag| tag == &matcher.name) { + scan_tags.push(matcher.name.clone()); + } + } + scan_tags.sort_unstable(); + scan_tags.dedup(); + scan_tags + } else { + self.ctx.tag_columns.clone() + }; + let is_time_index_ms = scan_table .schema() .timestamp_column() @@ -1630,7 +1689,7 @@ impl PromPlanner { table: scan_table_ref.to_quoted_string(), } })?); - for col in &self.ctx.tag_columns { + for col in &scan_tag_columns { required_columns.insert(col.clone()); } for col in &self.ctx.field_columns { @@ -1676,7 +1735,11 @@ impl PromPlanner { let expr: Vec<_> = self .create_field_column_exprs()? .into_iter() - .chain(self.create_tag_column_exprs()?) + .chain( + scan_tag_columns + .iter() + .map(|tag| DfExpr::Column(Column::from_name(tag))), + ) .chain( self.ctx .use_tsid @@ -1713,7 +1776,11 @@ impl PromPlanner { let project_exprs = self .create_field_column_exprs()? .into_iter() - .chain(self.create_tag_column_exprs()?) + .chain( + scan_tag_columns + .iter() + .map(|tag| DfExpr::Column(Column::from_name(tag))), + ) .chain( self.ctx .use_tsid @@ -1738,6 +1805,163 @@ impl PromPlanner { Ok(result) } + fn collect_row_key_tag_columns_from_plan( + &self, + plan: &LogicalPlan, + ) -> Result> { + fn walk( + planner: &PromPlanner, + plan: &LogicalPlan, + out: &mut BTreeSet, + ) -> Result<()> { + if let LogicalPlan::TableScan(scan) = plan { + let table = planner.table_from_source(&scan.source)?; + for col in table.table_info().meta.row_key_column_names() { + if col != DATA_SCHEMA_TABLE_ID_COLUMN_NAME + && col != DATA_SCHEMA_TSID_COLUMN_NAME + && !is_metric_engine_internal_column(col) + { + out.insert(col.clone()); + } + } + } + + for input in plan.inputs() { + walk(planner, input, out)?; + } + Ok(()) + } + + let mut out = BTreeSet::new(); + walk(self, plan, &mut out)?; + Ok(out) + } + + fn ensure_tag_columns_available( + &self, + plan: LogicalPlan, + required_tags: &BTreeSet, + ) -> Result { + if required_tags.is_empty() { + return Ok(plan); + } + + struct Rewriter { + required_tags: BTreeSet, + } + + impl TreeNodeRewriter for Rewriter { + type Node = LogicalPlan; + + fn f_up( + &mut self, + node: Self::Node, + ) -> datafusion_common::Result> { + match node { + LogicalPlan::TableScan(scan) => { + let schema = scan.source.schema(); + let mut projection = match scan.projection.clone() { + Some(p) => p, + None => { + // Scanning all columns already covers required tags. + return Ok(Transformed::no(LogicalPlan::TableScan(scan))); + } + }; + + let mut changed = false; + for tag in &self.required_tags { + if let Some((idx, _)) = schema + .fields() + .iter() + .enumerate() + .find(|(_, field)| field.name() == tag) + && !projection.contains(&idx) + { + projection.push(idx); + changed = true; + } + } + + if !changed { + return Ok(Transformed::no(LogicalPlan::TableScan(scan))); + } + + projection.sort_unstable(); + projection.dedup(); + + let new_scan = TableScan::try_new( + scan.table_name.clone(), + scan.source.clone(), + Some(projection), + scan.filters, + scan.fetch, + )?; + Ok(Transformed::yes(LogicalPlan::TableScan(new_scan))) + } + LogicalPlan::Projection(proj) => { + let input_schema = proj.input.schema(); + + let existing = proj + .schema + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect::>(); + + let mut expr = proj.expr.clone(); + let mut has_changed = false; + for tag in &self.required_tags { + if existing.contains(tag.as_str()) { + continue; + } + + if let Some(idx) = input_schema.index_of_column_by_name(None, tag) { + expr.push(DfExpr::Column(Column::from( + input_schema.qualified_field(idx), + ))); + has_changed = true; + } + } + + if !has_changed { + return Ok(Transformed::no(LogicalPlan::Projection(proj))); + } + + let new_proj = Projection::try_new(expr, proj.input)?; + Ok(Transformed::yes(LogicalPlan::Projection(new_proj))) + } + other => Ok(Transformed::no(other)), + } + } + } + + let mut rewriter = Rewriter { + required_tags: required_tags.clone(), + }; + let rewritten = plan + .rewrite(&mut rewriter) + .context(DataFusionPlanningSnafu)?; + Ok(rewritten.data) + } + + fn refresh_tag_columns_from_schema(&mut self, schema: &DFSchemaRef) { + let time_index = self.ctx.time_index_column.as_deref(); + let field_columns = self.ctx.field_columns.iter().collect::>(); + + let mut tags = schema + .fields() + .iter() + .map(|f| f.name()) + .filter(|name| Some(name.as_str()) != time_index) + .filter(|name| !field_columns.contains(name)) + .filter(|name| !is_metric_engine_internal_column(name)) + .cloned() + .collect::>(); + tags.sort_unstable(); + tags.dedup(); + self.ctx.tag_columns = tags; + } + /// Setup [PromPlannerContext]'s state fields. /// /// Returns a logical plan for an empty metric. diff --git a/tests/cases/standalone/tql-explain-analyze/tsid_column.result b/tests/cases/standalone/tql-explain-analyze/tsid_column.result index d816c8eb16..a13efa138e 100644 --- a/tests/cases/standalone/tql-explain-analyze/tsid_column.result +++ b/tests/cases/standalone/tql-explain-analyze/tsid_column.result @@ -97,6 +97,71 @@ TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric); |_|_| Total rows: 6_| +-+-+-+ +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid_metric) by (job))); + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[ts@1 as ts, sum(prom_irate(ts_range,val))@2 / scalar(count(count(tsid_metric.val)))@0 as lhs.sum(prom_irate(ts_range,val)) / rhs.scalar(count(count(tsid_metric.val)))] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_REDACTED +|_|_|_ScalarCalculateExec: tags=[] REDACTED +|_|_|_CoalescePartitionsExec REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED +|_|_|_ProjectionExec: expr=[ts@1 as ts, count(tsid_metric.val)@2 as count(tsid_metric.val)] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, ts@1 as ts], aggr=[count(tsid_metric.val)] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[job@1 as job, ts@2 as ts], aggr=[count(tsid_metric.val)] REDACTED +|_|_|_ProjectionExec: expr=[val@0 as val, job@1 as job, ts@3 as ts] REDACTED +|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_SortExec: expr=[__tsid@2 ASC, ts@3 ASC], preserve_partitioning=[true] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED +|_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED +|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[3600000], time index=[ts] REDACTED +|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [true] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_SortExec: expr=[__tsid@1 ASC, ts@2 ASC], preserve_partitioning=[true] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_ProjectionExec: expr=[val@1 as val, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 2_| ++-+-+-+ + DROP TABLE tsid_metric; Affected Rows: 0 diff --git a/tests/cases/standalone/tql-explain-analyze/tsid_column.sql b/tests/cases/standalone/tql-explain-analyze/tsid_column.sql index 5fd35505d5..7b3de23f33 100644 --- a/tests/cases/standalone/tql-explain-analyze/tsid_column.sql +++ b/tests/cases/standalone/tql-explain-analyze/tsid_column.sql @@ -42,6 +42,15 @@ TQL ANALYZE (0, 10, '5s') sum(tsid_metric); -- SQLNESS REPLACE (Hash.*) REDACTED TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric); +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid_metric) by (job))); + DROP TABLE tsid_metric; DROP TABLE tsid_physical;