perf: skip unnecessary label columns in promql query (#7602)

* feat: fully install tsid to promql planner

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused field

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-01-21 21:31:23 +08:00
committed by GitHub
parent c34e9970e7
commit cc1dbd108b
3 changed files with 304 additions and 6 deletions

View File

@@ -44,10 +44,13 @@ use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{DFSchema, NullEquality};
use datafusion_expr::expr::WindowFunctionParams;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{ExprSchemable, Literal, SortExpr, TableSource, col, lit};
use datafusion_expr::{
ExprSchemable, Literal, Projection, SortExpr, TableScan, TableSource, col, lit,
};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
@@ -353,18 +356,55 @@ impl PromPlanner {
param,
} = aggr_expr;
let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
let mut input = self.prom_expr_to_plan(expr, query_engine_state).await?;
let input_has_tsid = input.schema().fields().iter().any(|field| {
field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
&& field.data_type() == &ArrowDataType::UInt64
});
// `__tsid` based scan projection may prune tag columns. Ensure tags referenced in
// aggregation modifiers (`by`/`without`) are available before planning group keys.
let required_group_tags = match modifier {
None => BTreeSet::new(),
Some(LabelModifier::Include(labels)) => labels
.labels
.iter()
.filter(|label| !is_metric_engine_internal_column(label.as_str()))
.cloned()
.collect(),
Some(LabelModifier::Exclude(labels)) => {
let mut all_tags = self.collect_row_key_tag_columns_from_plan(&input)?;
for label in &labels.labels {
let _ = all_tags.remove(label);
}
all_tags
}
};
if !required_group_tags.is_empty()
&& required_group_tags
.iter()
.any(|tag| Self::find_case_sensitive_column(input.schema(), tag.as_str()).is_none())
{
input = self.ensure_tag_columns_available(input, &required_group_tags)?;
self.refresh_tag_columns_from_schema(input.schema());
}
match (*op).id() {
token::T_TOPK | token::T_BOTTOMK => {
self.prom_topk_bottomk_to_plan(aggr_expr, input).await
}
_ => {
let input_tag_columns = self.ctx.tag_columns.clone();
// When `__tsid` is available, tag columns may have been pruned from the input plan.
// For `keep_tsid` decision we should compare against the full row-key label set,
// otherwise we may incorrectly treat label-reducing aggregates as preserving labels.
let input_tag_columns = if input_has_tsid {
self.collect_row_key_tag_columns_from_plan(&input)?
.into_iter()
.collect::<Vec<_>>()
} else {
self.ctx.tag_columns.clone()
};
// calculate columns to group by
// Need to append time index column into group by columns
let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
@@ -1613,6 +1653,25 @@ impl PromPlanner {
.is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
self.ctx.use_tsid = use_tsid;
let all_table_tags = self.ctx.tag_columns.clone();
let scan_tag_columns = if use_tsid {
let mut scan_tags = self.ctx.tag_columns.clone();
for matcher in &self.ctx.selector_matcher {
if is_metric_engine_internal_column(&matcher.name) {
continue;
}
if all_table_tags.iter().any(|tag| tag == &matcher.name) {
scan_tags.push(matcher.name.clone());
}
}
scan_tags.sort_unstable();
scan_tags.dedup();
scan_tags
} else {
self.ctx.tag_columns.clone()
};
let is_time_index_ms = scan_table
.schema()
.timestamp_column()
@@ -1630,7 +1689,7 @@ impl PromPlanner {
table: scan_table_ref.to_quoted_string(),
}
})?);
for col in &self.ctx.tag_columns {
for col in &scan_tag_columns {
required_columns.insert(col.clone());
}
for col in &self.ctx.field_columns {
@@ -1676,7 +1735,11 @@ impl PromPlanner {
let expr: Vec<_> = self
.create_field_column_exprs()?
.into_iter()
.chain(self.create_tag_column_exprs()?)
.chain(
scan_tag_columns
.iter()
.map(|tag| DfExpr::Column(Column::from_name(tag))),
)
.chain(
self.ctx
.use_tsid
@@ -1713,7 +1776,11 @@ impl PromPlanner {
let project_exprs = self
.create_field_column_exprs()?
.into_iter()
.chain(self.create_tag_column_exprs()?)
.chain(
scan_tag_columns
.iter()
.map(|tag| DfExpr::Column(Column::from_name(tag))),
)
.chain(
self.ctx
.use_tsid
@@ -1738,6 +1805,163 @@ impl PromPlanner {
Ok(result)
}
/// Collects the row-key (tag) column names of every table scanned by `plan`.
///
/// Each `TableScan` node reachable from `plan` is resolved through
/// `table_from_source`, and the row-key column names of that table are gathered
/// into one sorted set. The table-id column, the tsid column and every name
/// accepted by `is_metric_engine_internal_column` are left out.
///
/// # Errors
///
/// Returns the error produced by `table_from_source` for the first scan (in
/// pre-order) whose source cannot be resolved.
fn collect_row_key_tag_columns_from_plan(
    &self,
    plan: &LogicalPlan,
) -> Result<BTreeSet<String>> {
    let mut tags = BTreeSet::new();

    // Explicit work list instead of recursion. Popping from the back and pushing
    // the children in reverse keeps the pre-order, left-to-right visit sequence.
    let mut pending = vec![plan];
    while let Some(node) = pending.pop() {
        if let LogicalPlan::TableScan(scan) = node {
            let table = self.table_from_source(&scan.source)?;
            for col in table.table_info().meta.row_key_column_names() {
                let is_internal = col == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
                    || col == DATA_SCHEMA_TSID_COLUMN_NAME
                    || is_metric_engine_internal_column(col);
                if !is_internal {
                    tags.insert(col.clone());
                }
            }
        }
        pending.extend(node.inputs().into_iter().rev());
    }

    Ok(tags)
}
/// Rewrites `plan` so that the columns named in `required_tags` are carried
/// through its table scans and projections.
///
/// The plan is rewritten bottom-up:
/// * a `TableScan` with an explicit projection gets the source-schema index of
///   every required tag it does not already project; the index list is then
///   sorted and deduplicated and the scan is rebuilt;
/// * a `Projection` gets an extra column expression for every required tag that
///   is absent from its output schema but present in its input schema;
/// * every other node is returned unchanged.
///
/// Tags that cannot be found in a scan's source schema or a projection's input
/// schema are silently skipped at that node.
///
/// # Errors
///
/// Fails with a `DataFusionPlanningSnafu` context error when rebuilding a
/// `TableScan` or a `Projection` fails.
fn ensure_tag_columns_available(
    &self,
    plan: LogicalPlan,
    required_tags: &BTreeSet<String>,
) -> Result<LogicalPlan> {
    // Nothing to add: hand the plan back untouched.
    if required_tags.is_empty() {
        return Ok(plan);
    }

    /// Bottom-up rewriter that widens scans and projections with the wanted tags.
    struct TagColumnInjector {
        required_tags: BTreeSet<String>,
    }

    impl TreeNodeRewriter for TagColumnInjector {
        type Node = LogicalPlan;

        fn f_up(
            &mut self,
            node: Self::Node,
        ) -> datafusion_common::Result<Transformed<Self::Node>> {
            match node {
                LogicalPlan::TableScan(scan) => {
                    // A scan without an explicit projection reads every column,
                    // so the required tags are already covered.
                    let Some(mut projection) = scan.projection.clone() else {
                        return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                    };

                    let source_schema = scan.source.schema();
                    let original_len = projection.len();
                    for tag in &self.required_tags {
                        let position = source_schema
                            .fields()
                            .iter()
                            .position(|field| field.name() == tag);
                        match position {
                            Some(idx) if !projection.contains(&idx) => projection.push(idx),
                            _ => {}
                        }
                    }

                    // Indices are only ever appended, so an unchanged length
                    // means no tag was missing from the projection.
                    if projection.len() == original_len {
                        return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                    }

                    projection.sort_unstable();
                    projection.dedup();

                    let widened_scan = TableScan::try_new(
                        scan.table_name.clone(),
                        scan.source.clone(),
                        Some(projection),
                        scan.filters,
                        scan.fetch,
                    )?;
                    Ok(Transformed::yes(LogicalPlan::TableScan(widened_scan)))
                }
                LogicalPlan::Projection(proj) => {
                    let input_schema = proj.input.schema();
                    let already_projected = proj
                        .schema
                        .fields()
                        .iter()
                        .map(|f| f.name().as_str())
                        .collect::<HashSet<_>>();

                    // Column expressions for tags the projection drops although
                    // its input still provides them.
                    let missing = self
                        .required_tags
                        .iter()
                        .filter(|tag| !already_projected.contains(tag.as_str()))
                        .filter_map(|tag| input_schema.index_of_column_by_name(None, tag))
                        .map(|idx| {
                            DfExpr::Column(Column::from(input_schema.qualified_field(idx)))
                        })
                        .collect::<Vec<_>>();

                    if missing.is_empty() {
                        return Ok(Transformed::no(LogicalPlan::Projection(proj)));
                    }

                    let mut expr = proj.expr.clone();
                    expr.extend(missing);
                    let widened_proj = Projection::try_new(expr, proj.input)?;
                    Ok(Transformed::yes(LogicalPlan::Projection(widened_proj)))
                }
                other => Ok(Transformed::no(other)),
            }
        }
    }

    let mut injector = TagColumnInjector {
        required_tags: required_tags.clone(),
    };
    let rewritten = plan
        .rewrite(&mut injector)
        .context(DataFusionPlanningSnafu)?;
    Ok(rewritten.data)
}
/// Recomputes `self.ctx.tag_columns` from the field names of `schema`.
///
/// A field name is kept as a tag unless it is the context's time index column,
/// one of the context's field columns, or a name accepted by
/// `is_metric_engine_internal_column`. The resulting list is sorted and
/// deduplicated before it replaces the previous tag list.
fn refresh_tag_columns_from_schema(&mut self, schema: &DFSchemaRef) {
    let time_index = self.ctx.time_index_column.as_deref();
    let field_columns = self.ctx.field_columns.iter().collect::<HashSet<_>>();

    let mut refreshed = Vec::new();
    for field in schema.fields().iter() {
        let name = field.name();
        let excluded = time_index == Some(name.as_str())
            || field_columns.contains(&name)
            || is_metric_engine_internal_column(name);
        if !excluded {
            refreshed.push(name.clone());
        }
    }

    refreshed.sort_unstable();
    refreshed.dedup();
    self.ctx.tag_columns = refreshed;
}
/// Setup [PromPlannerContext]'s state fields.
///
/// Returns a logical plan for an empty metric.

View File

@@ -97,6 +97,71 @@ TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric);
|_|_| Total rows: 6_|
+-+-+-+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid_metric) by (job)));
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[ts@1 as ts, sum(prom_irate(ts_range,val))@2 / scalar(count(count(tsid_metric.val)))@0 as lhs.sum(prom_irate(ts_range,val)) / rhs.scalar(count(count(tsid_metric.val)))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_REDACTED
|_|_|_ScalarCalculateExec: tags=[] REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED
|_|_|_ProjectionExec: expr=[ts@1 as ts, count(tsid_metric.val)@2 as count(tsid_metric.val)] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, ts@1 as ts], aggr=[count(tsid_metric.val)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[job@1 as job, ts@2 as ts], aggr=[count(tsid_metric.val)] REDACTED
|_|_|_ProjectionExec: expr=[val@0 as val, job@1 as job, ts@3 as ts] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|_|_|_SortExec: expr=[__tsid@2 ASC, ts@3 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[3600000], time index=[ts] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|_|_|_SortExec: expr=[__tsid@1 ASC, ts@2 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_ProjectionExec: expr=[val@1 as val, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 2_|
+-+-+-+
DROP TABLE tsid_metric;
Affected Rows: 0

View File

@@ -42,6 +42,15 @@ TQL ANALYZE (0, 10, '5s') sum(tsid_metric);
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid_metric) by (job)));
DROP TABLE tsid_metric;
DROP TABLE tsid_physical;