fix: promql histogram with aggregation (#7393)

* fix: promql histogram with aggregation

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update test constructors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* redact partition number

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2025-12-12 15:32:04 +08:00
Committed by: GitHub
Parent: baffed8c6a
Commit: cbfdeca64c
5 changed files with 250 additions and 57 deletions
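The core of the change, visible in the first file's hunks below: HistogramFoldExec previously advertised Partitioning::UnknownPartitioning(1) and forwarded its input's required_input_distribution, so when the folded buckets came from a multi-partition aggregation, rows of one series could be folded separately per partition. It now declares Partitioning::Hash over the tag columns plus the time index and requires a hash-partitioned input, which lets DataFusion insert the needed RepartitionExec. A minimal, self-contained sketch of building such a hash partitioning from physical column expressions (illustration only, with made-up column names and partition count, not the GreptimeDB code):

use std::sync::Arc;

use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::{Partitioning, PhysicalExpr};

fn main() {
    // Hypothetical input layout: [host, le, val, ts]. Hash on the tag column
    // plus the time index so all rows of one series land in one partition
    // before the buckets are folded.
    let partition_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
        Arc::new(Column::new("host", 0)),
        Arc::new(Column::new("ts", 3)),
    ];
    // 8 stands in for the input plan's actual output partition count.
    let partitioning = Partitioning::Hash(partition_exprs, 8);
    println!("{partitioning:?}");
}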


@@ -36,8 +36,8 @@ use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::expressions::{CastExpr as PhyCast, Column as PhyColumn};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
PlanProperties, RecordBatchStream, SendableRecordBatchStream,
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
};
use datafusion::prelude::{Column, Expr};
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
@@ -180,10 +180,33 @@ impl HistogramFold {
.index_of_column_by_name(None, &self.ts_column)
.unwrap();
let tag_columns = exec_input
.schema()
.fields()
.iter()
.enumerate()
.filter_map(|(idx, field)| {
if idx == le_column_index || idx == field_column_index || idx == ts_column_index {
None
} else {
Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
}
})
.collect::<Vec<_>>();
let mut partition_exprs = tag_columns.clone();
partition_exprs.push(Arc::new(PhyColumn::new(
self.input.schema().field(ts_column_index).name(),
ts_column_index,
)) as _);
let output_schema: SchemaRef = self.output_schema.inner().clone();
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::UnknownPartitioning(1),
Partitioning::Hash(
partition_exprs.clone(),
exec_input.output_partitioning().partition_count(),
),
EmissionType::Incremental,
Boundedness::Bounded,
);
@@ -192,6 +215,8 @@ impl HistogramFold {
field_column_index,
ts_column_index,
input: exec_input,
tag_columns,
partition_exprs,
quantile: self.quantile.into(),
output_schema,
metric: ExecutionPlanMetricsSet::new(),
@@ -253,6 +278,9 @@ pub struct HistogramFoldExec {
/// Index for field column in the schema of input.
field_column_index: usize,
ts_column_index: usize,
/// Tag columns are all columns except `le`, `field` and `ts` columns.
tag_columns: Vec<Arc<dyn PhysicalExpr>>,
partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
quantile: f64,
metric: ExecutionPlanMetricsSet,
properties: PlanProperties,
@@ -269,10 +297,10 @@ impl ExecutionPlan for HistogramFoldExec {
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
let mut cols = self
.tag_col_exprs()
.into_iter()
.tag_columns
.iter()
.map(|expr| PhysicalSortRequirement {
expr,
expr: expr.clone(),
options: None,
})
.collect::<Vec<PhysicalSortRequirement>>();
@@ -307,7 +335,7 @@ impl ExecutionPlan for HistogramFoldExec {
}
fn required_input_distribution(&self) -> Vec<Distribution> {
self.input.required_input_distribution()
vec![Distribution::HashPartitioned(self.partition_exprs.clone())]
}
fn maintains_input_order(&self) -> Vec<bool> {
@@ -324,15 +352,27 @@ impl ExecutionPlan for HistogramFoldExec {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
assert!(!children.is_empty());
let new_input = children[0].clone();
let properties = PlanProperties::new(
EquivalenceProperties::new(self.output_schema.clone()),
Partitioning::Hash(
self.partition_exprs.clone(),
new_input.output_partitioning().partition_count(),
),
EmissionType::Incremental,
Boundedness::Bounded,
);
Ok(Arc::new(Self {
input: children[0].clone(),
input: new_input,
metric: self.metric.clone(),
le_column_index: self.le_column_index,
ts_column_index: self.ts_column_index,
tag_columns: self.tag_columns.clone(),
partition_exprs: self.partition_exprs.clone(),
quantile: self.quantile,
output_schema: self.output_schema.clone(),
field_column_index: self.field_column_index,
properties: self.properties.clone(),
properties,
}))
}
@@ -394,30 +434,6 @@ impl ExecutionPlan for HistogramFoldExec {
}
}
impl HistogramFoldExec {
/// Return all the [PhysicalExpr] of tag columns in order.
///
/// Tag columns are all columns except `le`, `field` and `ts` columns.
pub fn tag_col_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
self.input
.schema()
.fields()
.iter()
.enumerate()
.filter_map(|(idx, field)| {
if idx == self.le_column_index
|| idx == self.field_column_index
|| idx == self.ts_column_index
{
None
} else {
Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
}
})
.collect()
}
}
impl DisplayAs for HistogramFoldExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
@@ -1051,40 +1067,83 @@ mod test {
quantile: f64,
ts_column_index: usize,
) -> Arc<HistogramFoldExec> {
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
let input: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[batches], schema.clone(), None).unwrap(),
)));
let output_schema: SchemaRef = Arc::new(
HistogramFold::convert_schema(
&Arc::new(memory_exec.schema().to_dfschema().unwrap()),
"le",
)
.unwrap()
.as_arrow()
.clone(),
);
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
HistogramFold::convert_schema(&Arc::new(input.schema().to_dfschema().unwrap()), "le")
.unwrap()
.as_arrow()
.clone(),
);
let (tag_columns, partition_exprs, properties) =
build_test_plan_properties(&input, output_schema.clone(), ts_column_index);
Arc::new(HistogramFoldExec {
le_column_index: 1,
field_column_index: 2,
quantile,
ts_column_index,
input: memory_exec,
input,
output_schema,
tag_columns,
partition_exprs,
metric: ExecutionPlanMetricsSet::new(),
properties,
})
}
type PlanPropsResult = (
Vec<Arc<dyn PhysicalExpr>>,
Vec<Arc<dyn PhysicalExpr>>,
PlanProperties,
);
fn build_test_plan_properties(
input: &Arc<dyn ExecutionPlan>,
output_schema: SchemaRef,
ts_column_index: usize,
) -> PlanPropsResult {
let tag_columns = input
.schema()
.fields()
.iter()
.enumerate()
.filter_map(|(idx, field)| {
if idx == 1 || idx == 2 || idx == ts_column_index {
None
} else {
Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
}
})
.collect::<Vec<_>>();
let partition_exprs = if tag_columns.is_empty() {
vec![Arc::new(PhyColumn::new(
input.schema().field(ts_column_index).name(),
ts_column_index,
)) as _]
} else {
tag_columns.clone()
};
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::Hash(
partition_exprs.clone(),
input.output_partitioning().partition_count(),
),
EmissionType::Incremental,
Boundedness::Bounded,
);
(tag_columns, partition_exprs, properties)
}
#[tokio::test]
async fn fold_overall() {
let memory_exec = Arc::new(prepare_test_data());
let memory_exec: Arc<dyn ExecutionPlan> = Arc::new(prepare_test_data());
let output_schema: SchemaRef = Arc::new(
HistogramFold::convert_schema(
&Arc::new(memory_exec.schema().to_dfschema().unwrap()),
@@ -1094,19 +1153,17 @@ mod test {
.as_arrow()
.clone(),
);
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
let (tag_columns, partition_exprs, properties) =
build_test_plan_properties(&memory_exec, output_schema.clone(), 0);
let fold_exec = Arc::new(HistogramFoldExec {
le_column_index: 1,
field_column_index: 2,
quantile: 0.4,
ts_column_index: 9999, // not exist but doesn't matter
ts_column_index: 0,
input: memory_exec,
output_schema,
tag_columns,
partition_exprs,
metric: ExecutionPlanMetricsSet::new(),
properties,
});


@@ -0,0 +1,91 @@
-- Minimal repro for histogram quantile over multi-partition input.
create table histogram_gap_bucket (
ts timestamp time index,
le string,
shard string,
val double,
primary key (shard, le)
) partition on columns (shard) (
shard < 'n',
shard >= 'n'
);
Affected Rows: 0
insert into histogram_gap_bucket values
(0, '0.5', 'a', 1),
(0, '1', 'a', 2),
(0, '+Inf', 'a', 2),
(0, '0.5', 'z', 2),
(0, '1', 'z', 4),
(0, '+Inf', 'z', 4),
(10000, '0.5', 'a', 1),
(10000, '1', 'a', 2),
(10000, '+Inf', 'a', 2),
(10000, '0.5', 'z', 1),
(10000, '1', 'z', 3),
(10000, '+Inf', 'z', 3);
Affected Rows: 12
-- Ensure the physical plan keeps the required repartition/order before folding buckets.
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE Hash\(\[ts@1\],.* Hash([ts@1],REDACTED
-- SQLNESS REPLACE Hash\(\[le@0,\sts@1\],.* Hash([le@0, ts@1],REDACTED
tql analyze (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_HistogramFoldExec: le=@0, field=@2, quantile=0.5 REDACTED
|_|_|_SortExec: expr=[ts@1 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([ts@1],REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[sum(histogram_gap_bucket.val)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[le@0 as le, ts@1 as ts], aggr=[sum(histogram_gap_bucket.val)] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[le@1 as le, ts@0 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts, le@1 as le, val@3 as val] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[10000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard", "le"] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[le@1 as le, ts@0 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts, le@1 as le, val@3 as val] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[10000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard", "le"] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 2_|
+-+-+-+
-- SQLNESS SORT_RESULT 2 1
tql eval (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
+---------------------+-------------------------------+
| ts | sum(histogram_gap_bucket.val) |
+---------------------+-------------------------------+
| 1970-01-01T00:00:00 | 0.5 |
| 1970-01-01T00:00:10 | 0.5833333333333334 |
+---------------------+-------------------------------+
drop table histogram_gap_bucket;
Affected Rows: 0
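As a sanity check on the expected values above (worked out by hand under the standard PromQL bucket interpolation; not part of the test output): after sum by (le), the buckets at 1970-01-01T00:00:00 are 0.5 => 3, 1 => 6, +Inf => 6, so the median rank is 0.5 * 6 = 3 and falls in the lowest bucket: 0 + (0.5 - 0) * 3 / 3 = 0.5. At 00:00:10 the buckets are 0.5 => 2, 1 => 5, +Inf => 5, so rank 2.5 lands in the (0.5, 1] bucket: 0.5 + (1 - 0.5) * (2.5 - 2) / (5 - 2) = 0.5833..., matching the two rows.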


@@ -0,0 +1,40 @@
-- Minimal repro for histogram quantile over multi-partition input.
create table histogram_gap_bucket (
ts timestamp time index,
le string,
shard string,
val double,
primary key (shard, le)
) partition on columns (shard) (
shard < 'n',
shard >= 'n'
);
insert into histogram_gap_bucket values
(0, '0.5', 'a', 1),
(0, '1', 'a', 2),
(0, '+Inf', 'a', 2),
(0, '0.5', 'z', 2),
(0, '1', 'z', 4),
(0, '+Inf', 'z', 4),
(10000, '0.5', 'a', 1),
(10000, '1', 'a', 2),
(10000, '+Inf', 'a', 2),
(10000, '0.5', 'z', 1),
(10000, '1', 'z', 3),
(10000, '+Inf', 'z', 3);
-- Ensure the physical plan keeps the required repartition/order before folding buckets.
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE Hash\(\[ts@1\],.* Hash([ts@1],REDACTED
-- SQLNESS REPLACE Hash\(\[le@0,\sts@1\],.* Hash([le@0, ts@1],REDACTED
tql analyze (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
-- SQLNESS SORT_RESULT 2 1
tql eval (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
drop table histogram_gap_bucket;


@@ -319,6 +319,7 @@ insert into histogram4_bucket values
Affected Rows: 7
-- SQLNESS SORT_RESULT 3 1
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
+---------------------+---+-----+
@@ -332,6 +333,7 @@ drop table histogram4_bucket;
Affected Rows: 0
-- SQLNESS SORT_RESULT 3 1
tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, fff) (rate(greptime_servers_postgres_query_elapsed_bucket{instance=~"xxx"}[1m])));
++
@@ -395,6 +397,7 @@ insert into histogram5_bucket values
Affected Rows: 12
-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);
+---------------------+---+--------------------+


@@ -184,10 +184,12 @@ insert into histogram4_bucket values
-- INF here is missing
;
-- SQLNESS SORT_RESULT 3 1
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
drop table histogram4_bucket;
-- SQLNESS SORT_RESULT 3 1
tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, fff) (rate(greptime_servers_postgres_query_elapsed_bucket{instance=~"xxx"}[1m])));
-- test case where table exists but doesn't have 'le' column should raise error
@@ -233,7 +235,7 @@ insert into histogram5_bucket values
(3015000, "5", "a", 30),
(3015000, "+Inf", "a", 50);
-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);
drop table histogram5_bucket;