From 170f94fc0834e3d05e2711a3f00002129e17af2c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 14 Jan 2026 16:32:51 +0800 Subject: [PATCH] feat: enable pruning for manipulate plans (#7565) * feat: enable pruning for manipulate plans Signed-off-by: Ruihang Xia * apply to other plans and add sqlness case Signed-off-by: Ruihang Xia * fix scalar manipulate and histogram fold for missing some columns Signed-off-by: Ruihang Xia * don't drop every columns Signed-off-by: Ruihang Xia * remove unrelated part Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/promql/src/extension_plan/absent.rs | 18 ++- .../src/extension_plan/histogram_fold.rs | 150 ++++++++++++++++- .../src/extension_plan/instant_manipulate.rs | 63 +++++++- src/promql/src/extension_plan/normalize.rs | 53 +++++- .../src/extension_plan/range_manipulate.rs | 152 +++++++++++++++++- .../src/extension_plan/scalar_calculate.rs | 143 +++++++++++++++- .../src/extension_plan/series_divide.rs | 56 ++++++- .../src/extension_plan/union_distinct_on.rs | 50 +++++- .../explain/step_aggr_advance.result | 3 +- .../tql-explain-analyze/column_pruning.result | 91 +++++++++++ .../tql-explain-analyze/column_pruning.sql | 39 +++++ 11 files changed, 797 insertions(+), 21 deletions(-) create mode 100644 tests/cases/standalone/tql-explain-analyze/column_pruning.result create mode 100644 tests/cases/standalone/tql-explain-analyze/column_pruning.sql diff --git a/src/promql/src/extension_plan/absent.rs b/src/promql/src/extension_plan/absent.rs index 2c01a6f570..037c975ce8 100644 --- a/src/promql/src/extension_plan/absent.rs +++ b/src/promql/src/extension_plan/absent.rs @@ -34,7 +34,7 @@ use datafusion::physical_plan::{ RecordBatchStream, SendableRecordBatchStream, }; use datafusion_common::DFSchema; -use datafusion_expr::EmptyRelation; +use datafusion_expr::{EmptyRelation, col}; use datatypes::arrow; use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray}; use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit}; @@ -107,7 +107,21 @@ impl UserDefinedLogicalNodeCore for Absent { } fn expressions(&self) -> Vec { - vec![] + if self.unfix.is_some() { + return vec![]; + } + + vec![col(&self.time_index_column)] + } + + fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option>> { + if self.unfix.is_some() { + return None; + } + + let input_schema = self.input.schema(); + let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index_column)?; + Some(vec![vec![time_index_idx]]) } fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { diff --git a/src/promql/src/extension_plan/histogram_fold.rs b/src/promql/src/extension_plan/histogram_fold.rs index f4637e36f0..b8c04e597a 100644 --- a/src/promql/src/extension_plan/histogram_fold.rs +++ b/src/promql/src/extension_plan/histogram_fold.rs @@ -40,6 +40,7 @@ use datafusion::physical_plan::{ Partitioning, PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, }; use datafusion::prelude::{Column, Expr}; +use datafusion_expr::col; use datatypes::prelude::{ConcreteDataType, DataType as GtDataType}; use datatypes::value::{OrderedF64, Value, ValueRef}; use datatypes::vectors::{Helper, MutableVector, VectorRef}; @@ -88,7 +89,45 @@ impl UserDefinedLogicalNodeCore for HistogramFold { } fn expressions(&self) -> Vec { - vec![] + let mut exprs = vec![ + col(&self.le_column), + col(&self.ts_column), + col(&self.field_column), + ]; + exprs.extend(self.input.schema().fields().iter().filter_map(|f| { + let name = f.name(); + if name != &self.le_column && name != &self.ts_column && name != &self.field_column { + Some(col(name)) + } else { + None + } + })); + exprs + } + + fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option>> { + let input_schema = self.input.schema(); + let le_column_index = input_schema.index_of_column_by_name(None, &self.le_column)?; + + if output_columns.is_empty() { + let indices = (0..input_schema.fields().len()).collect::>(); + return Some(vec![indices]); + } + + let mut necessary_indices = output_columns + .iter() + .map(|&output_column| { + if output_column < le_column_index { + output_column + } else { + output_column + 1 + } + }) + .collect::>(); + necessary_indices.push(le_column_index); + necessary_indices.sort_unstable(); + necessary_indices.dedup(); + Some(vec![necessary_indices]) } fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -998,11 +1037,26 @@ mod test { use datafusion::common::ToDFSchema; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; + use datafusion::logical_expr::EmptyRelation; use datafusion::prelude::SessionContext; use datatypes::arrow_array::StringArray; + use futures::FutureExt; use super::*; + fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch { + let fields = indices + .iter() + .map(|&idx| batch.schema().field(idx).clone()) + .collect::>(); + let columns = indices + .iter() + .map(|&idx| batch.column(idx).clone()) + .collect::>(); + let schema = Arc::new(Schema::new(fields)); + RecordBatch::try_new(schema, columns).unwrap() + } + fn prepare_test_data() -> DataSourceExec { let schema = Arc::new(Schema::new(vec![ Field::new("host", DataType::Utf8, true), @@ -1190,6 +1244,100 @@ mod test { assert_eq!(result_literal, expected); } + #[tokio::test] + async fn pruning_should_keep_le_column_for_exec() { + let schema = Arc::new(Schema::new(vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true), + Field::new("le", DataType::Utf8, true), + Field::new("val", DataType::Float64, true), + ])); + let df_schema = schema.clone().to_dfschema_ref().unwrap(); + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: df_schema, + }); + let plan = HistogramFold::new( + "le".to_string(), + "val".to_string(), + "ts".to_string(), + 0.5, + input, + ) + .unwrap(); + + let output_columns = [0usize, 1usize]; + let required = plan.necessary_children_exprs(&output_columns).unwrap(); + let required = &required[0]; + assert_eq!(required.as_slice(), &[0, 1, 2]); + + let input_batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(TimestampMillisecondArray::from(vec![0, 0])), + Arc::new(StringArray::from(vec!["0.1", "+Inf"])), + Arc::new(Float64Array::from(vec![1.0, 2.0])), + ], + ) + .unwrap(); + let projected = project_batch(&input_batch, required); + let projected_schema = projected.schema(); + let memory_exec = Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![projected]], projected_schema, None).unwrap(), + ))); + + let fold_exec = plan.to_execution_plan(memory_exec); + let session_context = SessionContext::default(); + let output_batches = + datafusion::physical_plan::collect(fold_exec, session_context.task_ctx()) + .await + .unwrap(); + assert_eq!(output_batches.len(), 1); + + let output_batch = &output_batches[0]; + assert_eq!(output_batch.num_rows(), 1); + + let ts = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ts.values(), &[0i64]); + + let values = output_batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert!((values.value(0) - 0.1).abs() < 1e-12); + + // Simulate the pre-fix pruning behavior: omit the `le` column from the child input. + let le_index = 1usize; + let broken_required = output_columns + .iter() + .map(|&output_column| { + if output_column < le_index { + output_column + } else { + output_column + 1 + } + }) + .collect::>(); + + let broken = project_batch(&input_batch, &broken_required); + let broken_schema = broken.schema(); + let broken_exec = Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![broken]], broken_schema, None).unwrap(), + ))); + let broken_fold_exec = plan.to_execution_plan(broken_exec); + let session_context = SessionContext::default(); + let broken_result = std::panic::AssertUnwindSafe(async { + datafusion::physical_plan::collect(broken_fold_exec, session_context.task_ctx()).await + }) + .catch_unwind() + .await; + assert!(broken_result.is_err()); + } + #[test] fn confirm_schema() { let input_schema = Schema::new(vec![ diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index aa4cd6d184..de5264b561 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -33,6 +33,7 @@ use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use datafusion_expr::col; use datatypes::arrow::compute; use datatypes::arrow::error::Result as ArrowResult; use futures::{Stream, StreamExt, ready}; @@ -84,7 +85,37 @@ impl UserDefinedLogicalNodeCore for InstantManipulate { } fn expressions(&self) -> Vec { - vec![] + if self.unfix.is_some() { + return vec![]; + } + + let mut exprs = vec![col(&self.time_index_column)]; + if let Some(field) = &self.field_column { + exprs.push(col(field)); + } + exprs + } + + fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option>> { + if self.unfix.is_some() { + return None; + } + + let input_schema = self.input.schema(); + if output_columns.is_empty() { + let indices = (0..input_schema.fields().len()).collect::>(); + return Some(vec![indices]); + } + + let mut required = output_columns.to_vec(); + required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?); + if let Some(field) = &self.field_column { + required.push(input_schema.index_of_column_by_name(None, field)?); + } + + required.sort_unstable(); + required.dedup(); + Some(vec![required]) } fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -440,8 +471,6 @@ impl InstantManipulateStream { // refer to Go version: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1571 // and the function `vectorSelectorSingle` pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult { - let mut take_indices = vec![]; - let ts_column = input .column(self.time_index) .as_any() @@ -473,6 +502,8 @@ impl InstantManipulateStream { let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval; let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval; + let mut take_indices = vec![]; + let mut cursor = 0; let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize); @@ -570,6 +601,8 @@ impl InstantManipulateStream { #[cfg(test)] mod test { + use datafusion::common::ToDFSchema; + use datafusion::logical_expr::{EmptyRelation, LogicalPlan}; use datafusion::prelude::SessionContext; use super::*; @@ -611,6 +644,30 @@ mod test { assert_eq!(result_literal, expected); } + #[test] + fn pruning_should_keep_time_and_field_columns_for_exec() { + let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap(); + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: df_schema, + }); + let plan = InstantManipulate::new( + 0, + 0, + 0, + 0, + TIME_INDEX_COLUMN.to_string(), + Some("value".to_string()), + input, + ); + + // Simulate a parent projection requesting only the `path` column. + let output_columns = [2usize]; + let required = plan.necessary_children_exprs(&output_columns).unwrap(); + let required = &required[0]; + assert_eq!(required.as_slice(), &[0, 1, 2]); + } + #[tokio::test] async fn lookback_10s_interval_30s() { let expected = String::from( diff --git a/src/promql/src/extension_plan/normalize.rs b/src/promql/src/extension_plan/normalize.rs index 9466508607..8fc1f55788 100644 --- a/src/promql/src/extension_plan/normalize.rs +++ b/src/promql/src/extension_plan/normalize.rs @@ -31,6 +31,7 @@ use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream, SendableRecordBatchStream, }; +use datafusion_expr::col; use datatypes::arrow::array::TimestampMillisecondArray; use datatypes::arrow::datatypes::SchemaRef; use datatypes::arrow::record_batch::RecordBatch; @@ -83,7 +84,38 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize { } fn expressions(&self) -> Vec { - vec![] + if self.unfix.is_some() { + return vec![]; + } + + self.tag_columns + .iter() + .map(col) + .chain(std::iter::once(col(&self.time_index_column_name))) + .collect() + } + + fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option>> { + if self.unfix.is_some() { + return None; + } + + let input_schema = self.input.schema(); + if output_columns.is_empty() { + let indices = (0..input_schema.fields().len()).collect::>(); + return Some(vec![indices]); + } + + let mut required = Vec::with_capacity(output_columns.len() + 1 + self.tag_columns.len()); + required.extend_from_slice(output_columns); + required.push(input_schema.index_of_column_by_name(None, &self.time_index_column_name)?); + for tag in &self.tag_columns { + required.push(input_schema.index_of_column_by_name(None, tag)?); + } + + required.sort_unstable(); + required.dedup(); + Some(vec![required]) } fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -429,8 +461,10 @@ mod test { use datafusion::arrow::datatypes::{ ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType, }; + use datafusion::common::ToDFSchema; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; + use datafusion::logical_expr::{EmptyRelation, LogicalPlan}; use datafusion::prelude::SessionContext; use datatypes::arrow::array::TimestampMillisecondArray; use datatypes::arrow_array::StringArray; @@ -461,6 +495,23 @@ mod test { )) } + #[test] + fn pruning_should_keep_time_and_tag_columns_for_exec() { + let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap(); + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: df_schema, + }); + let plan = + SeriesNormalize::new(0, TIME_INDEX_COLUMN, true, vec!["path".to_string()], input); + + // Simulate a parent projection requesting only the `value` column. + let output_columns = [1usize]; + let required = plan.necessary_children_exprs(&output_columns).unwrap(); + let required = &required[0]; + assert_eq!(required.as_slice(), &[0, 1, 2]); + } + #[tokio::test] async fn test_sort_record_batch() { let memory_exec = Arc::new(prepare_test_data()); diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index 9c0586adef..f8ffd87782 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -18,7 +18,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use common_telemetry::debug; +use common_telemetry::{debug, warn}; use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray}; use datafusion::arrow::compute; use datafusion::arrow::datatypes::{Field, SchemaRef}; @@ -38,6 +38,7 @@ use datafusion::physical_plan::{ SendableRecordBatchStream, Statistics, }; use datafusion::sql::TableReference; +use datafusion_expr::col; use futures::{Stream, StreamExt, ready}; use greptime_proto::substrait_extension as pb; use prost::Message; @@ -288,7 +289,53 @@ impl UserDefinedLogicalNodeCore for RangeManipulate { } fn expressions(&self) -> Vec { - vec![] + if self.unfix.is_some() { + return vec![]; + } + + let mut exprs = Vec::with_capacity(1 + self.field_columns.len()); + exprs.push(col(&self.time_index)); + exprs.extend(self.field_columns.iter().map(col)); + exprs + } + + fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option>> { + if self.unfix.is_some() { + return None; + } + + let input_schema = self.input.schema(); + let input_len = input_schema.fields().len(); + let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index)?; + + if output_columns.is_empty() { + let indices = (0..input_len).collect::>(); + return Some(vec![indices]); + } + + let mut required = Vec::with_capacity(output_columns.len() + 1 + self.field_columns.len()); + required.push(time_index_idx); + for value_column in &self.field_columns { + required.push(input_schema.index_of_column_by_name(None, value_column)?); + } + for &idx in output_columns { + if idx < input_len { + required.push(idx); + } else if idx == input_len { + // Derived timestamp range column. + required.push(time_index_idx); + } else { + warn!( + "Output column index {} is out of bounds for input schema with length {}", + idx, input_len + ); + return None; + } + } + + required.sort_unstable(); + required.dedup(); + Some(vec![required]) } fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -734,16 +781,31 @@ mod test { use datafusion::common::ToDFSchema; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; + use datafusion::logical_expr::{EmptyRelation, LogicalPlan}; use datafusion::physical_expr::Partitioning; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::memory::MemoryStream; use datafusion::prelude::SessionContext; use datatypes::arrow::array::TimestampMillisecondArray; + use futures::FutureExt; use super::*; const TIME_INDEX_COLUMN: &str = "timestamp"; + fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch { + let fields = indices + .iter() + .map(|&idx| batch.schema().field(idx).clone()) + .collect::>(); + let columns = indices + .iter() + .map(|&idx| batch.column(idx).clone()) + .collect::>(); + let schema = Arc::new(Schema::new(fields)); + RecordBatch::try_new(schema, columns).unwrap() + } + fn prepare_test_data() -> DataSourceExec { let schema = Arc::new(Schema::new(vec![ Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), @@ -844,6 +906,92 @@ mod test { assert_eq!(result_literal, expected); } + #[tokio::test] + async fn pruning_should_keep_time_and_value_columns_for_exec() { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("value_1", DataType::Float64, true), + Field::new("value_2", DataType::Float64, true), + Field::new("path", DataType::Utf8, true), + ])); + let df_schema = schema.clone().to_dfschema_ref().unwrap(); + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: df_schema, + }); + let plan = RangeManipulate::new( + 0, + 310_000, + 30_000, + 90_000, + TIME_INDEX_COLUMN.to_string(), + vec!["value_1".to_string(), "value_2".to_string()], + input, + ) + .unwrap(); + + // Simulate a parent projection requesting only the `path` column. + let output_columns = [3usize]; + let required = plan.necessary_children_exprs(&output_columns).unwrap(); + let required = &required[0]; + assert_eq!(required.as_slice(), &[0, 1, 2, 3]); + + let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ + 0, 30_000, 60_000, 90_000, 120_000, // every 30s + 180_000, 240_000, // every 60s + 241_000, 271_000, 291_000, // others + ])) as _; + let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _; + let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _; + let input_batch = RecordBatch::try_new( + schema, + vec![ + timestamp_column, + field_column.clone(), + field_column, + path_column, + ], + ) + .unwrap(); + + let projected = project_batch(&input_batch, required); + let projected_schema = projected.schema(); + let memory_exec = Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![projected]], projected_schema, None).unwrap(), + ))); + let range_exec = plan.to_execution_plan(memory_exec); + let session_context = SessionContext::default(); + let output_batches = + datafusion::physical_plan::collect(range_exec, session_context.task_ctx()) + .await + .unwrap(); + assert_eq!(output_batches.len(), 1); + + let output_batch = &output_batches[0]; + let path = output_batch + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(path.iter().all(|v| v == Some("foo"))); + + // Simulate the pre-fix pruning behavior: omit the timestamp/value columns from the child. + let broken_required = [3usize]; + let broken = project_batch(&input_batch, &broken_required); + let broken_schema = broken.schema(); + let broken_exec = Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![broken]], broken_schema, None).unwrap(), + ))); + let broken_range_exec = plan.to_execution_plan(broken_exec); + let session_context = SessionContext::default(); + let broken_result = std::panic::AssertUnwindSafe(async { + datafusion::physical_plan::collect(broken_range_exec, session_context.task_ctx()).await + }) + .catch_unwind() + .await; + assert!(broken_result.is_err()); + } + #[tokio::test] async fn interval_30s_range_90s() { let expected = String::from( diff --git a/src/promql/src/extension_plan/scalar_calculate.rs b/src/promql/src/extension_plan/scalar_calculate.rs index 8619e79387..6a5f0fb0d8 100644 --- a/src/promql/src/extension_plan/scalar_calculate.rs +++ b/src/promql/src/extension_plan/scalar_calculate.rs @@ -31,6 +31,7 @@ use datafusion::physical_plan::{ }; use datafusion::prelude::Expr; use datafusion::sql::TableReference; +use datafusion_expr::col; use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray}; use datatypes::arrow::compute::{CastOptions, cast_with_options, concat_batches}; use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; @@ -266,7 +267,36 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate { } fn expressions(&self) -> Vec { - vec![] + if self.unfix.is_some() { + return vec![]; + } + + self.tag_columns + .iter() + .map(col) + .chain(std::iter::once(col(&self.time_index))) + .chain(std::iter::once(col(&self.field_column))) + .collect() + } + + fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option>> { + if self.unfix.is_some() { + return None; + } + + let input_schema = self.input.schema(); + let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index)?; + let field_column_idx = input_schema.index_of_column_by_name(None, &self.field_column)?; + + let mut required = Vec::with_capacity(2 + self.tag_columns.len()); + required.extend([time_index_idx, field_column_idx]); + for tag in &self.tag_columns { + required.push(input_schema.index_of_column_by_name(None, tag)?); + } + + required.sort_unstable(); + required.dedup(); + Some(vec![required]) } fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -275,15 +305,9 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate { fn with_exprs_and_inputs( &self, - exprs: Vec, + _exprs: Vec, inputs: Vec, ) -> DataFusionResult { - if !exprs.is_empty() { - return Err(DataFusionError::Internal( - "ScalarCalculate should not have any expressions".to_string(), - )); - } - let input: LogicalPlan = inputs.into_iter().next().unwrap(); let input_schema = input.schema(); @@ -624,6 +648,109 @@ mod test { use super::*; + fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch { + let fields = indices + .iter() + .map(|&idx| batch.schema().field(idx).clone()) + .collect::>(); + let columns = indices + .iter() + .map(|&idx| batch.column(idx).clone()) + .collect::>(); + let schema = Arc::new(Schema::new(fields)); + RecordBatch::try_new(schema, columns).unwrap() + } + + #[test] + fn necessary_children_exprs_preserve_tag_columns() { + let schema = Arc::new(Schema::new(vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true), + Field::new("tag1", DataType::Utf8, true), + Field::new("tag2", DataType::Utf8, true), + Field::new("val", DataType::Float64, true), + Field::new("extra", DataType::Utf8, true), + ])); + let schema = Arc::new(DFSchema::try_from(schema).unwrap()); + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema, + }); + let tag_columns = vec!["tag1".to_string(), "tag2".to_string()]; + let plan = ScalarCalculate::new(0, 1, 1, input, "ts", &tag_columns, "val", None).unwrap(); + + let required = plan.necessary_children_exprs(&[0, 1]).unwrap(); + assert_eq!(required, vec![vec![0, 1, 2, 3]]); + } + + #[tokio::test] + async fn pruning_should_keep_tag_columns_for_exec() { + let schema = Arc::new(Schema::new(vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true), + Field::new("tag1", DataType::Utf8, true), + Field::new("tag2", DataType::Utf8, true), + Field::new("val", DataType::Float64, true), + Field::new("extra", DataType::Utf8, true), + ])); + let df_schema = Arc::new(DFSchema::try_from(schema.clone()).unwrap()); + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: df_schema, + }); + let tag_columns = vec!["tag1".to_string(), "tag2".to_string()]; + let plan = + ScalarCalculate::new(0, 15_000, 5000, input, "ts", &tag_columns, "val", None).unwrap(); + + let required = plan.necessary_children_exprs(&[0, 1]).unwrap(); + let required = &required[0]; + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(TimestampMillisecondArray::from(vec![ + 0, 5_000, 10_000, 15_000, + ])), + Arc::new(StringArray::from(vec!["foo", "foo", "foo", "foo"])), + Arc::new(StringArray::from(vec!["bar", "bar", "bar", "bar"])), + Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])), + Arc::new(StringArray::from(vec!["x", "x", "x", "x"])), + ], + ) + .unwrap(); + + let projected_batch = project_batch(&batch, required); + let projected_schema = projected_batch.schema(); + let memory_exec = Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![projected_batch]], projected_schema, None).unwrap(), + ))); + let scalar_exec = plan.to_execution_plan(memory_exec).unwrap(); + + let session_context = SessionContext::default(); + let result = datafusion::physical_plan::collect(scalar_exec, session_context.task_ctx()) + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 4); + assert_eq!(batch.schema().field(0).name(), "ts"); + assert_eq!(batch.schema().field(1).name(), "scalar(val)"); + + let ts = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ts.values(), &[0i64, 5_000, 10_000, 15_000]); + + let values = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(values.values(), &[1.0f64, 2.0, 3.0, 4.0]); + } + fn prepare_test_data(series: Vec) -> DataSourceExec { let schema = Arc::new(Schema::new(vec![ Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true), diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index 8e50da113b..4085f0d44f 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -33,6 +33,7 @@ use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream, SendableRecordBatchStream, }; +use datafusion_expr::col; use datatypes::arrow::compute; use datatypes::compute::SortOptions; use futures::{Stream, StreamExt, ready}; @@ -76,7 +77,38 @@ impl UserDefinedLogicalNodeCore for SeriesDivide { } fn expressions(&self) -> Vec { - vec![] + if self.unfix.is_some() { + return vec![]; + } + + self.tag_columns + .iter() + .map(col) + .chain(std::iter::once(col(&self.time_index_column))) + .collect() + } + + fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option>> { + if self.unfix.is_some() { + return None; + } + + let input_schema = self.input.schema(); + if output_columns.is_empty() { + let indices = (0..input_schema.fields().len()).collect::>(); + return Some(vec![indices]); + } + + let mut required = Vec::with_capacity(output_columns.len() + 1 + self.tag_columns.len()); + required.extend_from_slice(output_columns); + for tag in &self.tag_columns { + required.push(input_schema.index_of_column_by_name(None, tag)?); + } + required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?); + + required.sort_unstable(); + required.dedup(); + Some(vec![required]) } fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -544,8 +576,10 @@ impl SeriesDivideStream { #[cfg(test)] mod test { use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::common::ToDFSchema; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; + use datafusion::logical_expr::{EmptyRelation, LogicalPlan}; use datafusion::prelude::SessionContext; use super::*; @@ -611,6 +645,26 @@ mod test { )) } + #[test] + fn pruning_should_keep_tags_and_time_index_columns_for_exec() { + let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap(); + let input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: df_schema, + }); + let plan = SeriesDivide::new( + vec!["host".to_string(), "path".to_string()], + "time_index".to_string(), + input, + ); + + // Simulate a parent projection requesting only the `host` column. + let output_columns = [0usize]; + let required = plan.necessary_children_exprs(&output_columns).unwrap(); + let required = &required[0]; + assert_eq!(required.as_slice(), &[0, 1, 2]); + } + #[tokio::test] async fn overall_data() { let memory_exec = Arc::new(prepare_test_data()); diff --git a/src/promql/src/extension_plan/union_distinct_on.rs b/src/promql/src/extension_plan/union_distinct_on.rs index 795669a4e9..a8a1fbb391 100644 --- a/src/promql/src/extension_plan/union_distinct_on.rs +++ b/src/promql/src/extension_plan/union_distinct_on.rs @@ -32,6 +32,7 @@ use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, hash_utils, }; +use datafusion_expr::col; use datatypes::arrow::compute; use futures::future::BoxFuture; use futures::{Stream, StreamExt, TryStreamExt, ready}; @@ -145,7 +146,20 @@ impl UserDefinedLogicalNodeCore for UnionDistinctOn { } fn expressions(&self) -> Vec { - vec![] + let mut exprs: Vec = self.compare_keys.iter().map(col).collect(); + if !self.compare_keys.iter().any(|key| key == &self.ts_col) { + exprs.push(col(&self.ts_col)); + } + exprs + } + + fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option>> { + let left_len = self.left.schema().fields().len(); + let right_len = self.right.schema().fields().len(); + Some(vec![ + (0..left_len).collect::>(), + (0..right_len).collect::>(), + ]) } fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -540,9 +554,43 @@ mod test { use datafusion::arrow::array::Int32Array; use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::common::ToDFSchema; + use datafusion::logical_expr::{EmptyRelation, LogicalPlan}; use super::*; + #[test] + fn pruning_should_keep_all_columns_for_exec() { + let schema = Arc::new(Schema::new(vec![ + Field::new("ts", DataType::Int32, false), + Field::new("k", DataType::Int32, false), + Field::new("v", DataType::Int32, false), + ])); + let df_schema = schema.to_dfschema_ref().unwrap(); + let left = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: df_schema.clone(), + }); + let right = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: df_schema.clone(), + }); + let plan = UnionDistinctOn::new( + left, + right, + vec!["k".to_string()], + "ts".to_string(), + df_schema, + ); + + // Simulate a parent projection requesting only one output column. + let output_columns = [2usize]; + let required = plan.necessary_children_exprs(&output_columns).unwrap(); + assert_eq!(required.len(), 2); + assert_eq!(required[0].as_slice(), &[0, 1, 2]); + assert_eq!(required[1].as_slice(), &[0, 1, 2]); + } + #[test] fn test_interleave_batches() { let schema = Schema::new(vec![ diff --git a/tests/cases/distributed/explain/step_aggr_advance.result b/tests/cases/distributed/explain/step_aggr_advance.result index 20407ddd73..bceb036978 100644 --- a/tests/cases/distributed/explain/step_aggr_advance.result +++ b/tests/cases/distributed/explain/step_aggr_advance.result @@ -544,8 +544,7 @@ tql explain (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize | | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] | | | PromSeriesDivide: tags=["a", "b", "c", "d"] | | | Sort: aggr_optimize_not_count.a ASC NULLS FIRST, aggr_optimize_not_count.b ASC NULLS FIRST, aggr_optimize_not_count.c ASC NULLS FIRST, aggr_optimize_not_count.d ASC NULLS FIRST, aggr_optimize_not_count.greptime_timestamp ASC NULLS FIRST | -| | Projection: aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c, aggr_optimize_not_count.d, aggr_optimize_not_count.greptime_timestamp, aggr_optimize_not_count.greptime_value | -| | MergeScan [is_placeholder=false, remote_input=[ | +| | MergeScan [is_placeholder=false, remote_input=[ | | | Filter: aggr_optimize_not_count.greptime_timestamp >= TimestampMillisecond(-420000, None) AND aggr_optimize_not_count.greptime_timestamp <= TimestampMillisecond(300000, None) | | | TableScan: aggr_optimize_not_count | | | ]] | diff --git a/tests/cases/standalone/tql-explain-analyze/column_pruning.result b/tests/cases/standalone/tql-explain-analyze/column_pruning.result new file mode 100644 index 0000000000..09424dc61c --- /dev/null +++ b/tests/cases/standalone/tql-explain-analyze/column_pruning.result @@ -0,0 +1,91 @@ +CREATE TABLE promql_column_pruning ( + ts TIMESTAMP(3) TIME INDEX, + job STRING, + instance STRING, + region STRING, + greptime_value DOUBLE, + PRIMARY KEY(job, instance, region), +); + +Affected Rows: 0 + +INSERT INTO promql_column_pruning VALUES + (0, 'job1', 'instance1', 'region1', 1), + (0, 'job1', 'instance2', 'region1', 2), + (0, 'job2', 'instance1', 'region1', 3), + (5000, 'job1', 'instance1', 'region1', 4), + (5000, 'job1', 'instance2', 'region1', 5), + (5000, 'job2', 'instance1', 'region1', 6), + (10000, 'job1', 'instance1', 'region1', 7), + (10000, 'job1', 'instance2', 'region1', 8), + (10000, 'job2', 'instance1', 'region1', 9); + +Affected Rows: 9 + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +TQL ANALYZE (0, 10, '5s') sum(promql_column_pruning); + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(promql_column_pruning.greptime_value)] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(promql_column_pruning.greptime_value)] REDACTED +|_|_|_ProjectionExec: expr=[ts@0 as ts, greptime_value@4 as greptime_value] REDACTED +|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["job", "instance", "region"] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED +|_|_|_| +|_|_| Total rows: 3_| ++-+-+-+ + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +TQL ANALYZE (0, 10, '5s') sum(rate(promql_column_pruning[5s])); + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: prom_rate(ts_range,greptime_value,ts,Int64(5000))@1 IS NOT NULL REDACTED +|_|_|_ProjectionExec: expr=[ts@0 as ts, prom_rate(ts_range@5, greptime_value@4, ts@0, 5000) as prom_rate(ts_range,greptime_value,ts,Int64(5000))] REDACTED +|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [true] REDACTED +|_|_|_PromSeriesDivideExec: tags=["job", "instance", "region"] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED +|_|_|_| +|_|_| Total rows: 2_| ++-+-+-+ + +DROP TABLE promql_column_pruning; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/tql-explain-analyze/column_pruning.sql b/tests/cases/standalone/tql-explain-analyze/column_pruning.sql new file mode 100644 index 0000000000..3ddfb1ce0d --- /dev/null +++ b/tests/cases/standalone/tql-explain-analyze/column_pruning.sql @@ -0,0 +1,39 @@ +CREATE TABLE promql_column_pruning ( + ts TIMESTAMP(3) TIME INDEX, + job STRING, + instance STRING, + region STRING, + greptime_value DOUBLE, + PRIMARY KEY(job, instance, region), +); + +INSERT INTO promql_column_pruning VALUES + (0, 'job1', 'instance1', 'region1', 1), + (0, 'job1', 'instance2', 'region1', 2), + (0, 'job2', 'instance1', 'region1', 3), + (5000, 'job1', 'instance1', 'region1', 4), + (5000, 'job1', 'instance2', 'region1', 5), + (5000, 'job2', 'instance1', 'region1', 6), + (10000, 'job1', 'instance1', 'region1', 7), + (10000, 'job1', 'instance2', 'region1', 8), + (10000, 'job2', 'instance1', 'region1', 9); + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +TQL ANALYZE (0, 10, '5s') sum(promql_column_pruning); + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +TQL ANALYZE (0, 10, '5s') sum(rate(promql_column_pruning[5s])); + +DROP TABLE promql_column_pruning;