From cbfdeca64cd806a9179a2fd744dc24850aaa8cc0 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Fri, 12 Dec 2025 15:32:04 +0800
Subject: [PATCH] fix: promql histogram with aggregation (#7393)

* fix: promql histogram with aggregation

Signed-off-by: Ruihang Xia

* update test constructors

Signed-off-by: Ruihang Xia

* sqlness tests

Signed-off-by: Ruihang Xia

* update sqlness result

Signed-off-by: Ruihang Xia

* redact partition number

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
---
 .../src/extension_plan/histogram_fold.rs      | 169 ++++++++++++------
 .../promql/histogram_multi_partition.result   |  91 ++++++++++
 .../promql/histogram_multi_partition.sql      |  40 +++++
 .../common/promql/simple_histogram.result     |   3 +
 .../common/promql/simple_histogram.sql        |   4 +-
 5 files changed, 250 insertions(+), 57 deletions(-)
 create mode 100644 tests/cases/standalone/common/promql/histogram_multi_partition.result
 create mode 100644 tests/cases/standalone/common/promql/histogram_multi_partition.sql

diff --git a/src/promql/src/extension_plan/histogram_fold.rs b/src/promql/src/extension_plan/histogram_fold.rs
index 7c657e6c58..f4637e36f0 100644
--- a/src/promql/src/extension_plan/histogram_fold.rs
+++ b/src/promql/src/extension_plan/histogram_fold.rs
@@ -36,8 +36,8 @@ use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::expressions::{CastExpr as PhyCast, Column as PhyColumn};
 use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
-    PlanProperties, RecordBatchStream, SendableRecordBatchStream,
+    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
+    Partitioning, PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
 };
 use datafusion::prelude::{Column, Expr};
 use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
@@ -180,10 +180,33 @@ impl HistogramFold {
             .index_of_column_by_name(None, &self.ts_column)
             .unwrap();
 
+        let tag_columns = exec_input
+            .schema()
+            .fields()
+            .iter()
+            .enumerate()
+            .filter_map(|(idx, field)| {
+                if idx == le_column_index || idx == field_column_index || idx == ts_column_index {
+                    None
+                } else {
+                    Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let mut partition_exprs = tag_columns.clone();
+        partition_exprs.push(Arc::new(PhyColumn::new(
+            self.input.schema().field(ts_column_index).name(),
+            ts_column_index,
+        )) as _);
+
         let output_schema: SchemaRef = self.output_schema.inner().clone();
         let properties = PlanProperties::new(
             EquivalenceProperties::new(output_schema.clone()),
-            Partitioning::UnknownPartitioning(1),
+            Partitioning::Hash(
+                partition_exprs.clone(),
+                exec_input.output_partitioning().partition_count(),
+            ),
             EmissionType::Incremental,
             Boundedness::Bounded,
         );
@@ -192,6 +215,8 @@ impl HistogramFold {
             field_column_index,
             ts_column_index,
             input: exec_input,
+            tag_columns,
+            partition_exprs,
             quantile: self.quantile.into(),
             output_schema,
             metric: ExecutionPlanMetricsSet::new(),
@@ -253,6 +278,9 @@ pub struct HistogramFoldExec {
     /// Index for field column in the schema of input.
     field_column_index: usize,
     ts_column_index: usize,
+    /// Tag columns are all columns except `le`, `field` and `ts` columns.
+    tag_columns: Vec<Arc<dyn PhysicalExpr>>,
+    partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
     quantile: f64,
     metric: ExecutionPlanMetricsSet,
     properties: PlanProperties,
@@ -269,10 +297,10 @@ impl ExecutionPlan for HistogramFoldExec {
 
     fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
         let mut cols = self
-            .tag_col_exprs()
-            .into_iter()
+            .tag_columns
+            .iter()
             .map(|expr| PhysicalSortRequirement {
-                expr,
+                expr: expr.clone(),
                 options: None,
             })
             .collect::<Vec<_>>();
@@ -307,7 +335,7 @@ impl ExecutionPlan for HistogramFoldExec {
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
-        self.input.required_input_distribution()
+        vec![Distribution::HashPartitioned(self.partition_exprs.clone())]
     }
 
     fn maintains_input_order(&self) -> Vec<bool> {
@@ -324,15 +352,27 @@ impl ExecutionPlan for HistogramFoldExec {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
         assert!(!children.is_empty());
+        let new_input = children[0].clone();
+        let properties = PlanProperties::new(
+            EquivalenceProperties::new(self.output_schema.clone()),
+            Partitioning::Hash(
+                self.partition_exprs.clone(),
+                new_input.output_partitioning().partition_count(),
+            ),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
         Ok(Arc::new(Self {
-            input: children[0].clone(),
+            input: new_input,
             metric: self.metric.clone(),
             le_column_index: self.le_column_index,
             ts_column_index: self.ts_column_index,
+            tag_columns: self.tag_columns.clone(),
+            partition_exprs: self.partition_exprs.clone(),
             quantile: self.quantile,
             output_schema: self.output_schema.clone(),
             field_column_index: self.field_column_index,
-            properties: self.properties.clone(),
+            properties,
         }))
     }
@@ -394,30 +434,6 @@ impl ExecutionPlan for HistogramFoldExec {
     }
 }
 
-impl HistogramFoldExec {
-    /// Return all the [PhysicalExpr] of tag columns in order.
-    ///
-    /// Tag columns are all columns except `le`, `field` and `ts` columns.
-    pub fn tag_col_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        self.input
-            .schema()
-            .fields()
-            .iter()
-            .enumerate()
-            .filter_map(|(idx, field)| {
-                if idx == self.le_column_index
-                    || idx == self.field_column_index
-                    || idx == self.ts_column_index
-                {
-                    None
-                } else {
-                    Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
-                }
-            })
-            .collect()
-    }
-}
-
 impl DisplayAs for HistogramFoldExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match t {
@@ -1051,40 +1067,83 @@ mod test {
         quantile: f64,
         ts_column_index: usize,
     ) -> Arc<HistogramFoldExec> {
-        let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
+        let input: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
             MemorySourceConfig::try_new(&[batches], schema.clone(), None).unwrap(),
         )));
         let output_schema: SchemaRef = Arc::new(
-            HistogramFold::convert_schema(
-                &Arc::new(memory_exec.schema().to_dfschema().unwrap()),
-                "le",
-            )
-            .unwrap()
-            .as_arrow()
-            .clone(),
-        );
-        let properties = PlanProperties::new(
-            EquivalenceProperties::new(output_schema.clone()),
-            Partitioning::UnknownPartitioning(1),
-            EmissionType::Incremental,
-            Boundedness::Bounded,
+            HistogramFold::convert_schema(&Arc::new(input.schema().to_dfschema().unwrap()), "le")
+                .unwrap()
+                .as_arrow()
+                .clone(),
         );
+        let (tag_columns, partition_exprs, properties) =
+            build_test_plan_properties(&input, output_schema.clone(), ts_column_index);
+
         Arc::new(HistogramFoldExec {
             le_column_index: 1,
             field_column_index: 2,
             quantile,
             ts_column_index,
-            input: memory_exec,
+            input,
             output_schema,
+            tag_columns,
+            partition_exprs,
             metric: ExecutionPlanMetricsSet::new(),
             properties,
         })
     }
 
+    type PlanPropsResult = (
+        Vec<Arc<dyn PhysicalExpr>>,
+        Vec<Arc<dyn PhysicalExpr>>,
+        PlanProperties,
+    );
+
+    fn build_test_plan_properties(
+        input: &Arc<dyn ExecutionPlan>,
+        output_schema: SchemaRef,
+        ts_column_index: usize,
+    ) -> PlanPropsResult {
+        let tag_columns = input
+            .schema()
+            .fields()
+            .iter()
+            .enumerate()
+            .filter_map(|(idx, field)| {
+                if idx == 1 || idx == 2 || idx == ts_column_index {
+                    None
+                } else {
+                    Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let partition_exprs = if tag_columns.is_empty() {
+            vec![Arc::new(PhyColumn::new(
+                input.schema().field(ts_column_index).name(),
+                ts_column_index,
+            )) as _]
+        } else {
+            tag_columns.clone()
+        };
+
+        let properties = PlanProperties::new(
+            EquivalenceProperties::new(output_schema.clone()),
+            Partitioning::Hash(
+                partition_exprs.clone(),
+                input.output_partitioning().partition_count(),
+            ),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+
+        (tag_columns, partition_exprs, properties)
+    }
+
     #[tokio::test]
     async fn fold_overall() {
-        let memory_exec = Arc::new(prepare_test_data());
+        let memory_exec: Arc<dyn ExecutionPlan> = Arc::new(prepare_test_data());
         let output_schema: SchemaRef = Arc::new(
             HistogramFold::convert_schema(
                 &Arc::new(memory_exec.schema().to_dfschema().unwrap()),
@@ -1094,19 +1153,17 @@ mod test {
             .as_arrow()
             .clone(),
         );
-        let properties = PlanProperties::new(
-            EquivalenceProperties::new(output_schema.clone()),
-            Partitioning::UnknownPartitioning(1),
-            EmissionType::Incremental,
-            Boundedness::Bounded,
-        );
+        let (tag_columns, partition_exprs, properties) =
+            build_test_plan_properties(&memory_exec, output_schema.clone(), 0);
         let fold_exec = Arc::new(HistogramFoldExec {
             le_column_index: 1,
             field_column_index: 2,
             quantile: 0.4,
-            ts_column_index: 9999, // not exist but doesn't matter
+            ts_column_index: 0,
            input: memory_exec,
             output_schema,
+            tag_columns,
+            partition_exprs,
             metric: ExecutionPlanMetricsSet::new(),
             properties,
         });
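Note on the change above: HistogramFoldExec now advertises Partitioning::Hash over the tag + time columns and requires Distribution::HashPartitioned from its input, instead of reporting UnknownPartitioning(1) and passing the input's distribution requirement through. With an aggregation like sum by (le) below it, the old behavior could leave one series/timestamp group's `le` bucket rows split across partitions, so the fold saw incomplete histograms. The snippet below is a minimal, self-contained illustration of the invariant the new distribution enforces; `Row` and `fold_groups` are illustrative names, not part of this codebase:

use std::collections::HashMap;

/// One flattened bucket row: series tags, timestamp, `le` upper bound,
/// cumulative count. Illustrative only.
struct Row {
    tags: Vec<String>,
    ts: i64,
    le: f64,
    count: f64,
}

/// Group rows by (tags, ts) and sort each group's buckets by `le`, mirroring
/// the operator's required distribution and ordering. If the input were
/// partitioned on anything other than (tags, ts), one group's buckets could
/// land in different partitions and be folded incompletely.
fn fold_groups(rows: Vec<Row>) -> HashMap<(Vec<String>, i64), Vec<(f64, f64)>> {
    let mut groups: HashMap<(Vec<String>, i64), Vec<(f64, f64)>> = HashMap::new();
    for row in rows {
        groups
            .entry((row.tags, row.ts))
            .or_default()
            .push((row.le, row.count));
    }
    for buckets in groups.values_mut() {
        // ascending `le`, +Inf last -- the same order required_input_ordering asks for
        buckets.sort_by(|a, b| a.0.total_cmp(&b.0));
    }
    groups
}

fn main() {
    let rows = vec![
        Row { tags: vec!["a".into()], ts: 0, le: f64::INFINITY, count: 2.0 },
        Row { tags: vec!["a".into()], ts: 0, le: 0.5, count: 1.0 },
        Row { tags: vec!["a".into()], ts: 0, le: 1.0, count: 2.0 },
    ];
    // All three bucket rows of the single series/timestamp end up in one group.
    assert_eq!(fold_groups(rows).len(), 1);
}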
diff --git a/tests/cases/standalone/common/promql/histogram_multi_partition.result b/tests/cases/standalone/common/promql/histogram_multi_partition.result
new file mode 100644
index 0000000000..c0b146ecd6
--- /dev/null
+++ b/tests/cases/standalone/common/promql/histogram_multi_partition.result
@@ -0,0 +1,91 @@
+-- Minimal repro for histogram quantile over multi-partition input.
+create table histogram_gap_bucket (
+    ts timestamp time index,
+    le string,
+    shard string,
+    val double,
+    primary key (shard, le)
+) partition on columns (shard) (
+    shard < 'n',
+    shard >= 'n'
+);
+
+Affected Rows: 0
+
+insert into histogram_gap_bucket values
+    (0, '0.5', 'a', 1),
+    (0, '1', 'a', 2),
+    (0, '+Inf', 'a', 2),
+    (0, '0.5', 'z', 2),
+    (0, '1', 'z', 4),
+    (0, '+Inf', 'z', 4),
+    (10000, '0.5', 'a', 1),
+    (10000, '1', 'a', 2),
+    (10000, '+Inf', 'a', 2),
+    (10000, '0.5', 'z', 1),
+    (10000, '1', 'z', 3),
+    (10000, '+Inf', 'z', 3);
+
+Affected Rows: 12
+
+-- Ensure the physical plan keeps the required repartition/order before folding buckets.
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE Hash\(\[ts@1\],.* Hash([ts@1],REDACTED
+-- SQLNESS REPLACE Hash\(\[le@0,\sts@1\],.* Hash([le@0, ts@1],REDACTED
+tql analyze (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
+
++-+-+-+
+| stage | node | plan_|
++-+-+-+
+| 0_| 0_|_HistogramFoldExec: le=@0, field=@2, quantile=0.5 REDACTED
+|_|_|_SortExec: expr=[ts@1 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([ts@1],REDACTED
+|_|_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[sum(histogram_gap_bucket.val)] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED
+|_|_|_AggregateExec: mode=Partial, gby=[le@0 as le, ts@1 as ts], aggr=[sum(histogram_gap_bucket.val)] REDACTED
+|_|_|_CooperativeExec REDACTED
+|_|_|_MergeScanExec: REDACTED
+|_|_|_|
+| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED
+|_|_|_AggregateExec: mode=Partial, gby=[le@1 as le, ts@0 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
+|_|_|_ProjectionExec: expr=[ts@0 as ts, le@1 as le, val@3 as val] REDACTED
+|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[10000], time index=[ts] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["shard", "le"] REDACTED
+|_|_|_CooperativeExec REDACTED
+|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
+|_|_|_|
+| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED
+|_|_|_AggregateExec: mode=Partial, gby=[le@1 as le, ts@0 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
+|_|_|_ProjectionExec: expr=[ts@0 as ts, le@1 as le, val@3 as val] REDACTED
+|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[10000], time index=[ts] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["shard", "le"] REDACTED
+|_|_|_CooperativeExec REDACTED
+|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
+|_|_|_|
+|_|_| Total rows: 2_|
++-+-+-+
+
+-- SQLNESS SORT_RESULT 2 1
+tql eval (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
+
++---------------------+-------------------------------+
+| ts                  | sum(histogram_gap_bucket.val) |
++---------------------+-------------------------------+
+| 1970-01-01T00:00:00 | 0.5                           |
+| 1970-01-01T00:00:10 | 0.5833333333333334            |
++---------------------+-------------------------------+
+
+drop table histogram_gap_bucket;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/promql/histogram_multi_partition.sql b/tests/cases/standalone/common/promql/histogram_multi_partition.sql
new file mode 100644
index 0000000000..b360999fcf
--- /dev/null
+++ b/tests/cases/standalone/common/promql/histogram_multi_partition.sql
@@ -0,0 +1,40 @@
+-- Minimal repro for histogram quantile over multi-partition input.
+create table histogram_gap_bucket (
+    ts timestamp time index,
+    le string,
+    shard string,
+    val double,
+    primary key (shard, le)
+) partition on columns (shard) (
+    shard < 'n',
+    shard >= 'n'
+);
+
+insert into histogram_gap_bucket values
+    (0, '0.5', 'a', 1),
+    (0, '1', 'a', 2),
+    (0, '+Inf', 'a', 2),
+    (0, '0.5', 'z', 2),
+    (0, '1', 'z', 4),
+    (0, '+Inf', 'z', 4),
+    (10000, '0.5', 'a', 1),
+    (10000, '1', 'a', 2),
+    (10000, '+Inf', 'a', 2),
+    (10000, '0.5', 'z', 1),
+    (10000, '1', 'z', 3),
+    (10000, '+Inf', 'z', 3);
+
+-- Ensure the physical plan keeps the required repartition/order before folding buckets.
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE Hash\(\[ts@1\],.* Hash([ts@1],REDACTED
+-- SQLNESS REPLACE Hash\(\[le@0,\sts@1\],.* Hash([le@0, ts@1],REDACTED
+tql analyze (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
+
+-- SQLNESS SORT_RESULT 2 1
+tql eval (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
+
+drop table histogram_gap_bucket;
diff --git a/tests/cases/standalone/common/promql/simple_histogram.result b/tests/cases/standalone/common/promql/simple_histogram.result
index 60a1d42b68..39baf6ea82 100644
--- a/tests/cases/standalone/common/promql/simple_histogram.result
+++ b/tests/cases/standalone/common/promql/simple_histogram.result
@@ -319,6 +319,7 @@ insert into histogram4_bucket values
 
 Affected Rows: 7
 
+-- SQLNESS SORT_RESULT 3 1
 tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
 
 +---------------------+---+-----+
@@ -332,6 +333,7 @@ drop table histogram4_bucket;
 
 Affected Rows: 0
 
+-- SQLNESS SORT_RESULT 3 1
 tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, fff) (rate(greptime_servers_postgres_query_elapsed_bucket{instance=~"xxx"}[1m])));
 
 ++
@@ -395,6 +397,7 @@ insert into histogram5_bucket values
 
 Affected Rows: 12
 
+-- SQLNESS SORT_RESULT 3 1
 tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);
 
 +---------------------+---+--------------------+
diff --git a/tests/cases/standalone/common/promql/simple_histogram.sql b/tests/cases/standalone/common/promql/simple_histogram.sql
index daeae79254..e6b07aaca6 100644
--- a/tests/cases/standalone/common/promql/simple_histogram.sql
+++ b/tests/cases/standalone/common/promql/simple_histogram.sql
@@ -184,10 +184,12 @@ insert into histogram4_bucket values
     -- INF here is missing
 ;
 
+-- SQLNESS SORT_RESULT 3 1
 tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
 
 drop table histogram4_bucket;
 
+-- SQLNESS SORT_RESULT 3 1
 tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, fff) (rate(greptime_servers_postgres_query_elapsed_bucket{instance=~"xxx"}[1m])));
 
 -- test case where table exists but doesn't have 'le' column should raise error
@@ -233,7 +235,7 @@ insert into histogram5_bucket values
     (3015000, "5", "a", 30),
     (3015000, "+Inf", "a", 50);
 
-
+-- SQLNESS SORT_RESULT 3 1
 tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);
 
 drop table histogram5_bucket;
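For reference, the expected values in histogram_multi_partition.result follow from standard PromQL bucket interpolation once the per-`le` sums are merged across both shards: at ts=0 the combined cumulative buckets are {0.5: 3, 1: 6, +Inf: 6}, and at ts=10s they are {0.5: 2, 1: 5, +Inf: 5}. Below is a small self-contained sketch of that interpolation in plain Rust; it is the textbook Prometheus formula, simplified so that the lowest bucket starts at 0 and NaN/edge cases are ignored, not GreptimeDB's internal implementation:

/// Prometheus-style histogram_quantile over cumulative buckets of
/// (upper_bound, cumulative_count), assumed sorted ascending with +Inf last.
fn histogram_quantile(q: f64, buckets: &[(f64, f64)]) -> f64 {
    let total = buckets.last().expect("at least one bucket").1;
    let rank = q * total;
    let mut prev_bound = 0.0; // simplification: lowest bucket interpolates from 0
    let mut prev_count = 0.0;
    for &(bound, count) in buckets {
        if count >= rank {
            if bound.is_infinite() {
                return prev_bound; // quantile falls in the +Inf bucket
            }
            // linear interpolation within the bucket that crosses the rank
            let in_bucket = count - prev_count;
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / in_bucket;
        }
        prev_bound = bound;
        prev_count = count;
    }
    f64::NAN
}

fn main() {
    // merged sums at ts=0: p50 rank is 3, which the 0.5 bucket exactly covers
    let t0 = [(0.5, 3.0), (1.0, 6.0), (f64::INFINITY, 6.0)];
    // merged sums at ts=10s: p50 rank 2.5 falls inside the (0.5, 1] bucket
    let t10 = [(0.5, 2.0), (1.0, 5.0), (f64::INFINITY, 5.0)];
    assert_eq!(histogram_quantile(0.5, &t0), 0.5);
    assert!((histogram_quantile(0.5, &t10) - 0.5833333333333334).abs() < 1e-12);
}

These are exactly the two rows the sqlness test expects, which is also why the fix matters: each interpolation is only correct when all `le` buckets of a timestamp are folded together in one partition.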