diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index f696c8b53e..0052c98063 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -49,7 +49,9 @@ use datafusion_optimizer::Analyzer; use datafusion_optimizer::analyzer::function_rewrite::ApplyFunctionRewrites; use datafusion_optimizer::optimizer::Optimizer; use partition::manager::PartitionRuleManagerRef; -use promql::extension_plan::PromExtensionPlanner; +use promql::extension_plan::{ + InstantManipulate, PromExtensionPlanner, RangeManipulate, SeriesDivide, SeriesNormalize, +}; use table::TableRef; use table::table::adapter::DfTableProviderAdapter; @@ -435,12 +437,50 @@ impl QueryPlanner for DfQueryPlanner { logical_plan: &DfLogicalPlan, session_state: &SessionState, ) -> DfResult> { + let scoped_session_state; + let session_state = if should_disable_repartitioned_aggregations(logical_plan) { + scoped_session_state = { + let mut session_state = session_state.clone(); + *session_state.config_mut() = session_state + .config() + .clone() + .with_repartition_aggregations(false); + session_state + }; + &scoped_session_state + } else { + session_state + }; + self.physical_planner .create_physical_plan(logical_plan, session_state) .await } } +fn should_disable_repartitioned_aggregations(plan: &DfLogicalPlan) -> bool { + match plan { + DfLogicalPlan::Aggregate(aggregate) => contains_promql_vector_node(&aggregate.input), + _ => plan + .inputs() + .into_iter() + .any(should_disable_repartitioned_aggregations), + } +} + +fn contains_promql_vector_node(plan: &DfLogicalPlan) -> bool { + match plan { + DfLogicalPlan::Extension(extension) => { + let node = extension.node.as_any(); + node.is::() + || node.is::() + || node.is::() + || node.is::() + } + _ => plan.inputs().into_iter().any(contains_promql_vector_node), + } +} + /// MySQL-compatible scalar function aliases: (target_name, alias) const SCALAR_FUNCTION_ALIASES: &[(&str, &str)] = &[ ("upper", "ucase"), diff --git a/tests/cases/distributed/explain/step_aggr_advance.result b/tests/cases/distributed/explain/step_aggr_advance.result index 5938fa202d..1e060ec310 100644 --- a/tests/cases/distributed/explain/step_aggr_advance.result +++ b/tests/cases/distributed/explain/step_aggr_advance.result @@ -57,10 +57,9 @@ tql analyze (1752591864, 1752592164, '30s') max by (a, b, c) (max_over_time(aggr | 0_| 0_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c] REDACTED @@ -69,10 +68,9 @@ tql analyze (1752591864, 1752592164, '30s') max by (a, b, c) (max_over_time(aggr |_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 1_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c] REDACTED @@ -130,8 +128,8 @@ tql analyze (1752591864, 1752592164, '30s') sum by (a, b) (max_over_time(aggr_op |_|_|_AggregateExec: mode=SinglePartitioned, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[sum(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b] REDACTED @@ -140,8 +138,8 @@ tql analyze (1752591864, 1752592164, '30s') sum by (a, b) (max_over_time(aggr_op |_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 1_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b] REDACTED @@ -199,8 +197,8 @@ tql analyze (1752591864, 1752592164, '30s') avg by (a) (max_over_time(aggr_optim |_|_|_AggregateExec: mode=SinglePartitioned, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[avg(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_AggregateExec: mode=Final, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, greptime_timestamp@0 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a] REDACTED @@ -209,8 +207,8 @@ tql analyze (1752591864, 1752592164, '30s') avg by (a) (max_over_time(aggr_optim |_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 1_|_AggregateExec: mode=Final, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, greptime_timestamp@0 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a] REDACTED @@ -264,9 +262,10 @@ tql analyze (1752591864, 1752592164, '30s') count by (a, b, c, d) (max_over_time | 0_| 0_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST, greptime_timestamp@4 ASC NULLS LAST] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST, greptime_timestamp@4 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST, greptime_timestamp@4 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=SinglePartitioned, gby=[a@2 as a, b@3 as b, c@4 as c, d@5 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[count(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +| 1_| 0_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST, greptime_timestamp@4 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, d@3 as d, greptime_timestamp@4 as greptime_timestamp], aggr=[count(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, d@5 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[count(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED |_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED @@ -274,9 +273,10 @@ tql analyze (1752591864, 1752592164, '30s') count by (a, b, c, d) (max_over_time |_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST, greptime_timestamp@4 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST, greptime_timestamp@4 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=SinglePartitioned, gby=[a@2 as a, b@3 as b, c@4 as c, d@5 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[count(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +| 1_| 1_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST, greptime_timestamp@4 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, d@3 as d, greptime_timestamp@4 as greptime_timestamp], aggr=[count(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, d@5 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[count(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c, d@3 as d] REDACTED |_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED @@ -333,8 +333,8 @@ tql analyze (1752591864, 1752592164, '30s') min by (b, c, d) (max_over_time(aggr |_|_|_AggregateExec: mode=SinglePartitioned, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[min(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_AggregateExec: mode=Final, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[b@2 as b, c@3 as c, d@4 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), b@1 as b, c@2 as c, d@3 as d] REDACTED @@ -343,8 +343,8 @@ tql analyze (1752591864, 1752592164, '30s') min by (b, c, d) (max_over_time(aggr |_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 1_|_AggregateExec: mode=Final, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[b@2 as b, c@3 as c, d@4 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), b@1 as b, c@2 as c, d@3 as d] REDACTED @@ -403,16 +403,16 @@ tql analyze sum(aggr_optimize_not); |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(aggr_optimize_not.greptime_value)] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_AggregateExec: mode=Final, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, greptime_value@5 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED |_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 1_|_AggregateExec: mode=Final, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, greptime_value@5 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED @@ -476,15 +476,16 @@ tql explain (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize | | ]] | | physical_plan | ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp, sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@5 / sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@4 as aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) / aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] | | | REDACTED -| | CoalescePartitionsExec | -| | AggregateExec: mode=SinglePartitioned, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] | -| | FilterExec: prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))@1 IS NOT NULL | -| | ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c] | -| | PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] | -| | PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] | -| | PromSeriesDivideExec: tags=["a", "b", "c", "d"] | -| | SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC, greptime_timestamp@4 ASC], preserve_partitioning=[true] | -| | MergeScanExec: REDACTED +| | AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] | +| | FilterExec: prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))@1 IS NOT NULL | +| | ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c] | +| | PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] | +| | PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] | +| | PromSeriesDivideExec: tags=["a", "b", "c", "d"] | +| | SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC, greptime_timestamp@4 ASC], preserve_partitioning=[true] | +| | MergeScanExec: REDACTED | | SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] | | | CooperativeExec | | | MergeScanExec: REDACTED @@ -505,8 +506,9 @@ tql analyze (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize +-+-+-+ | 0_| 0_|_ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp, sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@5 / sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@4 as aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) / aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] REDACTED |_|_|_REDACTED +|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] REDACTED |_|_|_CoalescePartitionsExec REDACTED -|_|_|_AggregateExec: mode=SinglePartitioned, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] REDACTED |_|_|_FilterExec: prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c] REDACTED |_|_|_PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] REDACTED @@ -525,10 +527,9 @@ tql analyze (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED |_|_|_| | 1_| 0_|_ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp, sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))@4 as sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] REDACTED -|_|_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED |_|_|_FilterExec: prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000))@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c] REDACTED @@ -538,10 +539,9 @@ tql analyze (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| | 1_| 1_|_ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp, sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))@4 as sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] REDACTED -|_|_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED |_|_|_FilterExec: prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000))@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c] REDACTED diff --git a/tests/cases/standalone/common/promql/histogram_multi_partition.result b/tests/cases/standalone/common/promql/histogram_multi_partition.result index 091ed7bfcc..a88daddc39 100644 --- a/tests/cases/standalone/common/promql/histogram_multi_partition.result +++ b/tests/cases/standalone/common/promql/histogram_multi_partition.result @@ -49,16 +49,16 @@ tql analyze (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bu |_|_|_AggregateExec: mode=Partial, gby=[le@0 as le, ts@1 as ts], aggr=[sum(histogram_gap_bucket.val)] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED -|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED +| 1_| 0_|_AggregateExec: mode=Final, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[le@1 as le, ts@0 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts, le@1 as le, val@3 as val] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[10000], time index=[ts] REDACTED |_|_|_PromSeriesDivideExec: tags=["shard", "le"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED -|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED +| 1_| 1_|_AggregateExec: mode=Final, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[le@1 as le, ts@0 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts, le@1 as le, val@3 as val] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[10000], time index=[ts] REDACTED diff --git a/tests/cases/standalone/common/tql/general_table.result b/tests/cases/standalone/common/tql/general_table.result index 84cc662657..5c36d5b9df 100644 --- a/tests/cases/standalone/common/tql/general_table.result +++ b/tests/cases/standalone/common/tql/general_table.result @@ -27,9 +27,10 @@ TQL analyze (0, 10, '1s') sum by(job) (irate(cpu_usage{job="fire"}[5s])) / 1e9; +-+-+-+ | 0_| 0_|_ProjectionExec: expr=[job@0 as job, ts@1 as ts, sum(prom_irate(ts_range,value))@2 / 1000000000 as sum(prom_irate(ts_range,value)) / Float64(1000000000)] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, ts@1 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[job@0 ASC NULLS LAST, ts@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=SinglePartitioned, gby=[job@2 as job, ts@0 as ts], aggr=[sum(prom_irate(ts_range,value))] REDACTED +|_|_|_SortExec: expr=[job@0 ASC NULLS LAST, ts@1 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[job@0 as job, ts@1 as ts], aggr=[sum(prom_irate(ts_range,value))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, ts@0 as ts], aggr=[sum(prom_irate(ts_range,value))] REDACTED |_|_|_FilterExec: prom_irate(ts_range,value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, value@0) as prom_irate(ts_range,value), job@1 as job] REDACTED |_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[5000], time index=[ts] REDACTED diff --git a/tests/cases/standalone/common/tql/partition.result b/tests/cases/standalone/common/tql/partition.result index 5d0fd98123..c9f7f4857d 100644 --- a/tests/cases/standalone/common/tql/partition.result +++ b/tests/cases/standalone/common/tql/partition.result @@ -24,9 +24,10 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); |_|_|_| | 1_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=SinglePartitioned, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED +|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED |_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@3, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED |_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED @@ -71,8 +72,8 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); |_|_|_AggregateExec: mode=SinglePartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_AggregateExec: mode=Final, gby=[k@0 as k, j@1 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED |_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED @@ -81,8 +82,8 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); |_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 1_|_AggregateExec: mode=Final, gby=[k@0 as k, j@1 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED |_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED @@ -123,10 +124,9 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); +-+-+-+ | 0_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED |_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED diff --git a/tests/cases/standalone/optimizer/windowed_sort.result b/tests/cases/standalone/optimizer/windowed_sort.result index 544563ad1a..075f324236 100644 --- a/tests/cases/standalone/optimizer/windowed_sort.result +++ b/tests/cases/standalone/optimizer/windowed_sort.result @@ -282,10 +282,9 @@ TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIM | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[1696118400000..1696118405000], lookback=[300000], interval=[1000], time index=[greptime_timestamp] REDACTED diff --git a/tests/cases/standalone/tql-explain-analyze/analyze.result b/tests/cases/standalone/tql-explain-analyze/analyze.result index 6eb6991995..8927aaf864 100644 --- a/tests/cases/standalone/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/tql-explain-analyze/analyze.result @@ -290,16 +290,16 @@ TQL ANALYZE sum(test2); |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_AggregateExec: mode=Final, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED |_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 1_|_AggregateExec: mode=Final, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED diff --git a/tests/cases/standalone/tql-explain-analyze/column_pruning.result b/tests/cases/standalone/tql-explain-analyze/column_pruning.result index 4bce77d783..a9f7510353 100644 --- a/tests/cases/standalone/tql-explain-analyze/column_pruning.result +++ b/tests/cases/standalone/tql-explain-analyze/column_pruning.result @@ -37,10 +37,9 @@ TQL ANALYZE (0, 10, '5s') sum(promql_column_pruning); | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(promql_column_pruning.greptime_value)] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[sum(promql_column_pruning.greptime_value)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(promql_column_pruning.greptime_value)] REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts, greptime_value@4 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED @@ -65,10 +64,9 @@ TQL ANALYZE (0, 10, '5s') sum(rate(promql_column_pruning[5s])); | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[sum(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED |_|_|_FilterExec: prom_rate(ts_range,greptime_value,ts,Int64(5000))@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts, prom_rate(ts_range@5, greptime_value@4, ts@0, 5000) as prom_rate(ts_range,greptime_value,ts,Int64(5000))] REDACTED diff --git a/tests/cases/standalone/tql-explain-analyze/tsid_column.result b/tests/cases/standalone/tql-explain-analyze/tsid_column.result index 4a7a875060..1eb0210c82 100644 --- a/tests/cases/standalone/tql-explain-analyze/tsid_column.result +++ b/tests/cases/standalone/tql-explain-analyze/tsid_column.result @@ -45,10 +45,9 @@ TQL ANALYZE (0, 10, '5s') sum(tsid_metric); | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[sum(tsid_metric.val)] REDACTED |_|_|_ProjectionExec: expr=[val@0 as val, ts@2 as ts] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED @@ -74,10 +73,9 @@ TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric); | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED |_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED @@ -107,14 +105,14 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid |_|_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED -|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_ProjectionExec: expr=[ts@3 as ts, job@1 as job] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED @@ -122,10 +120,9 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid |_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED |_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED @@ -158,14 +155,14 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m |_|_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED -|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_ProjectionExec: expr=[ts@1 as ts, job@0 as job] REDACTED |_|_|_FilterExec: val@0 IS NOT NULL, projection=[job@1, ts@2] REDACTED @@ -175,10 +172,9 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m |_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED |_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED