From c2f8445483225d198a0c9fa7fb30d0aec366fa99 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 15 Apr 2026 08:44:53 +0800 Subject: [PATCH] generalize Signed-off-by: Ruihang Xia --- .../optimizer/reduce_aggregate_repartition.rs | 79 +++++++++++-------- .../explain/step_aggr_advance.result | 60 +++++++------- .../promql/histogram_multi_partition.result | 8 +- .../standalone/common/tql/partition.result | 15 ++-- .../standalone/optimizer/windowed_sort.result | 7 +- .../tql-explain-analyze/analyze.result | 8 +- .../tql-explain-analyze/column_pruning.result | 14 ++-- .../tql-explain-analyze/tsid_column.result | 56 ++++++------- 8 files changed, 134 insertions(+), 113 deletions(-) diff --git a/src/query/src/optimizer/reduce_aggregate_repartition.rs b/src/query/src/optimizer/reduce_aggregate_repartition.rs index 8da3b6b9c3..d29ceb56b5 100644 --- a/src/query/src/optimizer/reduce_aggregate_repartition.rs +++ b/src/query/src/optimizer/reduce_aggregate_repartition.rs @@ -23,9 +23,6 @@ use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrd use datafusion_common::Result as DfResult; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_expr::{Distribution, Partitioning}; -use promql::extension_plan::{ - InstantManipulateExec, RangeManipulateExec, SeriesDivideExec, SeriesNormalizeExec, -}; /// Replaces a redundant hash repartition before a coarser aggregate with a /// single fan-in. @@ -120,16 +117,6 @@ impl ReduceAggregateRepartition { } fn can_reduce_repartition(repartition_exec: &RepartitionExec) -> bool { - let has_direct_promql_input = matches!( - repartition_exec.input().as_any().downcast_ref::(), - Some(partial_agg) - if partial_agg.mode() == &AggregateMode::Partial - && Self::contains_promql_exec(partial_agg.input(), true) - ); - if Self::contains_promql_exec(repartition_exec.input(), false) { - return has_direct_promql_input; - } - let Partitioning::Hash(finer_partition_exprs, _) = repartition_exec.input().output_partitioning() else { @@ -146,25 +133,6 @@ impl ReduceAggregateRepartition { ); coarsening_satisfaction.is_subset() } - - fn contains_promql_exec(plan: &Arc, stop_at_aggregate: bool) -> bool { - let plan_any = plan.as_any(); - if plan_any.is::() - || plan_any.is::() - || plan_any.is::() - || plan_any.is::() - { - return true; - } - - if stop_at_aggregate && plan_any.is::() { - return false; - } - - plan.children() - .into_iter() - .any(|child| Self::contains_promql_exec(child, stop_at_aggregate)) - } } #[cfg(test)] @@ -174,16 +142,18 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; + use datafusion::logical_expr::{EmptyRelation, LogicalPlan}; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::{ExecutionPlan, displayable}; - use datafusion_common::Result; + use datafusion_common::{DFSchema, Result}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::{Partitioning, PhysicalExpr}; use pretty_assertions::assert_eq; + use promql::extension_plan::SeriesNormalize; use super::ReduceAggregateRepartition; @@ -243,6 +213,13 @@ mod tests { Ok(displayable(optimized.as_ref()).indent(true).to_string()) } + fn empty_logical_plan() -> LogicalPlan { + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }) + } + fn project_with_aliases( input: Arc, aliases: &[(&str, &str)], @@ -254,6 +231,17 @@ mod tests { Ok(Arc::new(ProjectionExec::try_new(exprs?, input)?)) } + fn promql_normalize(input: Arc) -> Arc { + SeriesNormalize::new( + 0, + "a", + false, + vec!["b".to_string(), "c".to_string()], + empty_logical_plan(), + ) + .to_execution_plan(input) + } + #[test] fn rewrites_final_partitioned_subset_repartition() -> Result<()> { let raw = input_exec(); @@ -450,6 +438,31 @@ mod tests { Ok(()) } + #[test] + fn rewrites_promql_subset_partitioning_through_projection() -> Result<()> { + let raw = input_exec(); + let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?; + let normalized = promql_normalize(finer); + let projected = project_with_aliases(normalized, &[("a", "ts"), ("b", "x"), ("c", "y")])?; + let final_repartition = repartition(projected.clone(), &["x", "y"], &projected.schema())?; + let final_agg = aggregate( + AggregateMode::SinglePartitioned, + final_repartition, + group_by(&["x", "y"], &projected.schema())?, + projected.schema(), + vec![], + )?; + + let optimized = optimize(final_agg)?; + assert!( + optimized.contains("AggregateExec: mode=Single, gby=[x@1 as x, y@2 as y], aggr=[]"), + "{optimized}" + ); + assert!(optimized.contains("CoalescePartitionsExec"), "{optimized}"); + assert!(optimized.contains("PromSeriesNormalizeExec"), "{optimized}"); + Ok(()) + } + #[test] fn keeps_non_subset_repartition() -> Result<()> { let raw = input_exec(); diff --git a/tests/cases/distributed/explain/step_aggr_advance.result b/tests/cases/distributed/explain/step_aggr_advance.result index 6c7a20fa9d..5938fa202d 100644 --- a/tests/cases/distributed/explain/step_aggr_advance.result +++ b/tests/cases/distributed/explain/step_aggr_advance.result @@ -57,9 +57,10 @@ tql analyze (1752591864, 1752592164, '30s') max by (a, b, c) (max_over_time(aggr | 0_| 0_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c] REDACTED @@ -68,9 +69,10 @@ tql analyze (1752591864, 1752592164, '30s') max by (a, b, c) (max_over_time(aggr |_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 1_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[max(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b, c@2 as c] REDACTED @@ -128,8 +130,8 @@ tql analyze (1752591864, 1752592164, '30s') sum by (a, b) (max_over_time(aggr_op |_|_|_AggregateExec: mode=SinglePartitioned, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[sum(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b] REDACTED @@ -138,8 +140,8 @@ tql analyze (1752591864, 1752592164, '30s') sum by (a, b) (max_over_time(aggr_op |_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, greptime_timestamp@2 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a, b@1 as b] REDACTED @@ -197,8 +199,8 @@ tql analyze (1752591864, 1752592164, '30s') avg by (a) (max_over_time(aggr_optim |_|_|_AggregateExec: mode=SinglePartitioned, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[avg(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, greptime_timestamp@0 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a] REDACTED @@ -207,8 +209,8 @@ tql analyze (1752591864, 1752592164, '30s') avg by (a) (max_over_time(aggr_optim |_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=Final, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, greptime_timestamp@1 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, greptime_timestamp@0 as greptime_timestamp], aggr=[__avg_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), a@0 as a] REDACTED @@ -331,8 +333,8 @@ tql analyze (1752591864, 1752592164, '30s') min by (b, c, d) (max_over_time(aggr |_|_|_AggregateExec: mode=SinglePartitioned, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[min(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[b@2 as b, c@3 as c, d@4 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), b@1 as b, c@2 as c, d@3 as d] REDACTED @@ -341,8 +343,8 @@ tql analyze (1752591864, 1752592164, '30s') min by (b, c, d) (max_over_time(aggr |_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=Final, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, c@1 as c, d@2 as d, greptime_timestamp@3 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[b@2 as b, c@3 as c, d@4 as d, greptime_timestamp@0 as greptime_timestamp], aggr=[__min_state(prom_max_over_time(greptime_timestamp_range,greptime_value))] REDACTED |_|_|_FilterExec: prom_max_over_time(greptime_timestamp_range,greptime_value)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_max_over_time(greptime_timestamp_range@6, greptime_value@5) as prom_max_over_time(greptime_timestamp_range,greptime_value), b@1 as b, c@2 as c, d@3 as d] REDACTED @@ -401,16 +403,16 @@ tql analyze sum(aggr_optimize_not); |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(aggr_optimize_not.greptime_value)] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, greptime_value@5 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED |_|_|_PromSeriesDivideExec: tags=["a", "b", "c", "d"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=Final, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(aggr_optimize_not.greptime_value)] REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, greptime_value@5 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED @@ -523,9 +525,10 @@ tql analyze (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED |_|_|_| | 1_| 0_|_ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp, sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))@4 as sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] REDACTED -|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +|_|_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED |_|_|_FilterExec: prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000))@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c] REDACTED @@ -535,9 +538,10 @@ tql analyze (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| | 1_| 1_|_ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp, sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))@4 as sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] REDACTED -|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +|_|_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)))] REDACTED |_|_|_FilterExec: prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000))@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,aggr_optimize_not.greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c] REDACTED diff --git a/tests/cases/standalone/common/promql/histogram_multi_partition.result b/tests/cases/standalone/common/promql/histogram_multi_partition.result index a88daddc39..091ed7bfcc 100644 --- a/tests/cases/standalone/common/promql/histogram_multi_partition.result +++ b/tests/cases/standalone/common/promql/histogram_multi_partition.result @@ -49,16 +49,16 @@ tql analyze (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bu |_|_|_AggregateExec: mode=Partial, gby=[le@0 as le, ts@1 as ts], aggr=[sum(histogram_gap_bucket.val)] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED +|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED |_|_|_AggregateExec: mode=Partial, gby=[le@1 as le, ts@0 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts, le@1 as le, val@3 as val] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[10000], time index=[ts] REDACTED |_|_|_PromSeriesDivideExec: tags=["shard", "le"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=Final, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED +|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED |_|_|_AggregateExec: mode=Partial, gby=[le@1 as le, ts@0 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts, le@1 as le, val@3 as val] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[10000], time index=[ts] REDACTED diff --git a/tests/cases/standalone/common/tql/partition.result b/tests/cases/standalone/common/tql/partition.result index bea610aa95..5d0fd98123 100644 --- a/tests/cases/standalone/common/tql/partition.result +++ b/tests/cases/standalone/common/tql/partition.result @@ -71,8 +71,8 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); |_|_|_AggregateExec: mode=SinglePartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[k@0 as k, j@1 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED |_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED @@ -81,8 +81,8 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); |_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=Final, gby=[k@0 as k, j@1 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[__avg_state(prom_irate(j_range,i))] REDACTED |_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED @@ -123,9 +123,10 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); +-+-+-+ | 0_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED |_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED diff --git a/tests/cases/standalone/optimizer/windowed_sort.result b/tests/cases/standalone/optimizer/windowed_sort.result index 075f324236..544563ad1a 100644 --- a/tests/cases/standalone/optimizer/windowed_sort.result +++ b/tests/cases/standalone/optimizer/windowed_sort.result @@ -282,9 +282,10 @@ TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIM | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[1696118400000..1696118405000], lookback=[300000], interval=[1000], time index=[greptime_timestamp] REDACTED diff --git a/tests/cases/standalone/tql-explain-analyze/analyze.result b/tests/cases/standalone/tql-explain-analyze/analyze.result index 8927aaf864..6eb6991995 100644 --- a/tests/cases/standalone/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/tql-explain-analyze/analyze.result @@ -290,16 +290,16 @@ TQL ANALYZE sum(test2); |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED |_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 1_|_AggregateExec: mode=Final, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[__sum_state(test2.greptime_value)] REDACTED |_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED diff --git a/tests/cases/standalone/tql-explain-analyze/column_pruning.result b/tests/cases/standalone/tql-explain-analyze/column_pruning.result index a9f7510353..4bce77d783 100644 --- a/tests/cases/standalone/tql-explain-analyze/column_pruning.result +++ b/tests/cases/standalone/tql-explain-analyze/column_pruning.result @@ -37,9 +37,10 @@ TQL ANALYZE (0, 10, '5s') sum(promql_column_pruning); | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[sum(promql_column_pruning.greptime_value)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(promql_column_pruning.greptime_value)] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(promql_column_pruning.greptime_value)] REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts, greptime_value@4 as greptime_value] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED @@ -64,9 +65,10 @@ TQL ANALYZE (0, 10, '5s') sum(rate(promql_column_pruning[5s])); | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[sum(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED |_|_|_FilterExec: prom_rate(ts_range,greptime_value,ts,Int64(5000))@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[ts@0 as ts, prom_rate(ts_range@5, greptime_value@4, ts@0, 5000) as prom_rate(ts_range,greptime_value,ts,Int64(5000))] REDACTED diff --git a/tests/cases/standalone/tql-explain-analyze/tsid_column.result b/tests/cases/standalone/tql-explain-analyze/tsid_column.result index 4305992a43..86769d03db 100644 --- a/tests/cases/standalone/tql-explain-analyze/tsid_column.result +++ b/tests/cases/standalone/tql-explain-analyze/tsid_column.result @@ -45,9 +45,10 @@ TQL ANALYZE (0, 10, '5s') sum(tsid_metric); | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[sum(tsid_metric.val)] REDACTED |_|_|_ProjectionExec: expr=[val@0 as val, ts@2 as ts] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED @@ -73,9 +74,10 @@ TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric); | 0_| 0_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED |_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED @@ -105,15 +107,13 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid |_|_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED +| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED |_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(count(tsid_metric.val))] REDACTED +|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_ProjectionExec: expr=[ts@3 as ts, job@1 as job] REDACTED |_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED @@ -121,9 +121,10 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(count(tsid |_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED |_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED @@ -156,15 +157,13 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m |_|_|_CooperativeExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED +| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED |_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(sum(tsid_metric.val))] REDACTED +|_|_|_ProjectionExec: expr=[ts@0 as ts] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts, job@1 as job], aggr=[] REDACTED |_|_|_ProjectionExec: expr=[ts@1 as ts, job@0 as job] REDACTED |_|_|_FilterExec: val@0 IS NOT NULL, projection=[job@1, ts@2] REDACTED @@ -174,9 +173,10 @@ TQL ANALYZE (0, 10, '5s') sum(irate(tsid_metric[1h])) / scalar(count(sum(tsid_m |_|_|_ProjectionExec: expr=[val@1 as val, job@3 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED |_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED |_|_|_| -| 1_| 0_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED -|_|_|_AggregateExec: mode=Final, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED -|_|_|_CoalescePartitionsExec REDACTED +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_irate(ts_range,val))] REDACTED |_|_|_FilterExec: prom_irate(ts_range,val)@1 IS NOT NULL REDACTED |_|_|_ProjectionExec: expr=[ts@2 as ts, prom_irate(ts_range@3, val@0) as prom_irate(ts_range,val)] REDACTED