feat: enable pruning for manipulate plans (#7565)

* feat: enable pruning for manipulate plans

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply to other plans and add sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix scalar manipulate and histogram fold for missing some columns

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* don't drop every columns

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unrelated part

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-01-14 16:32:51 +08:00
committed by GitHub
parent 1c9aa59317
commit 170f94fc08
11 changed files with 797 additions and 21 deletions

View File

@@ -34,7 +34,7 @@ use datafusion::physical_plan::{
RecordBatchStream, SendableRecordBatchStream,
};
use datafusion_common::DFSchema;
use datafusion_expr::EmptyRelation;
use datafusion_expr::{EmptyRelation, col};
use datatypes::arrow;
use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
@@ -107,7 +107,21 @@ impl UserDefinedLogicalNodeCore for Absent {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
vec![col(&self.time_index_column)]
}
fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index_column)?;
Some(vec![vec![time_index_idx]])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {

View File

@@ -40,6 +40,7 @@ use datafusion::physical_plan::{
Partitioning, PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
};
use datafusion::prelude::{Column, Expr};
use datafusion_expr::col;
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
use datatypes::value::{OrderedF64, Value, ValueRef};
use datatypes::vectors::{Helper, MutableVector, VectorRef};
@@ -88,7 +89,45 @@ impl UserDefinedLogicalNodeCore for HistogramFold {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
let mut exprs = vec![
col(&self.le_column),
col(&self.ts_column),
col(&self.field_column),
];
exprs.extend(self.input.schema().fields().iter().filter_map(|f| {
let name = f.name();
if name != &self.le_column && name != &self.ts_column && name != &self.field_column {
Some(col(name))
} else {
None
}
}));
exprs
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
let input_schema = self.input.schema();
let le_column_index = input_schema.index_of_column_by_name(None, &self.le_column)?;
if output_columns.is_empty() {
let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut necessary_indices = output_columns
.iter()
.map(|&output_column| {
if output_column < le_column_index {
output_column
} else {
output_column + 1
}
})
.collect::<Vec<_>>();
necessary_indices.push(le_column_index);
necessary_indices.sort_unstable();
necessary_indices.dedup();
Some(vec![necessary_indices])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -998,11 +1037,26 @@ mod test {
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::EmptyRelation;
use datafusion::prelude::SessionContext;
use datatypes::arrow_array::StringArray;
use futures::FutureExt;
use super::*;
fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch {
let fields = indices
.iter()
.map(|&idx| batch.schema().field(idx).clone())
.collect::<Vec<_>>();
let columns = indices
.iter()
.map(|&idx| batch.column(idx).clone())
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, columns).unwrap()
}
fn prepare_test_data() -> DataSourceExec {
let schema = Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
@@ -1190,6 +1244,100 @@ mod test {
assert_eq!(result_literal, expected);
}
#[tokio::test]
async fn pruning_should_keep_le_column_for_exec() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("le", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
]));
let df_schema = schema.clone().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan = HistogramFold::new(
"le".to_string(),
"val".to_string(),
"ts".to_string(),
0.5,
input,
)
.unwrap();
let output_columns = [0usize, 1usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2]);
let input_batch = RecordBatch::try_new(
schema,
vec![
Arc::new(TimestampMillisecondArray::from(vec![0, 0])),
Arc::new(StringArray::from(vec!["0.1", "+Inf"])),
Arc::new(Float64Array::from(vec![1.0, 2.0])),
],
)
.unwrap();
let projected = project_batch(&input_batch, required);
let projected_schema = projected.schema();
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![projected]], projected_schema, None).unwrap(),
)));
let fold_exec = plan.to_execution_plan(memory_exec);
let session_context = SessionContext::default();
let output_batches =
datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
.await
.unwrap();
assert_eq!(output_batches.len(), 1);
let output_batch = &output_batches[0];
assert_eq!(output_batch.num_rows(), 1);
let ts = output_batch
.column(0)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
assert_eq!(ts.values(), &[0i64]);
let values = output_batch
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert!((values.value(0) - 0.1).abs() < 1e-12);
// Simulate the pre-fix pruning behavior: omit the `le` column from the child input.
let le_index = 1usize;
let broken_required = output_columns
.iter()
.map(|&output_column| {
if output_column < le_index {
output_column
} else {
output_column + 1
}
})
.collect::<Vec<_>>();
let broken = project_batch(&input_batch, &broken_required);
let broken_schema = broken.schema();
let broken_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![broken]], broken_schema, None).unwrap(),
)));
let broken_fold_exec = plan.to_execution_plan(broken_exec);
let session_context = SessionContext::default();
let broken_result = std::panic::AssertUnwindSafe(async {
datafusion::physical_plan::collect(broken_fold_exec, session_context.task_ctx()).await
})
.catch_unwind()
.await;
assert!(broken_result.is_err());
}
#[test]
fn confirm_schema() {
let input_schema = Schema::new(vec![

View File

@@ -33,6 +33,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datafusion_expr::col;
use datatypes::arrow::compute;
use datatypes::arrow::error::Result as ArrowResult;
use futures::{Stream, StreamExt, ready};
@@ -84,7 +85,37 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
let mut exprs = vec![col(&self.time_index_column)];
if let Some(field) = &self.field_column {
exprs.push(col(field));
}
exprs
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
if output_columns.is_empty() {
let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut required = output_columns.to_vec();
required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
if let Some(field) = &self.field_column {
required.push(input_schema.index_of_column_by_name(None, field)?);
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -440,8 +471,6 @@ impl InstantManipulateStream {
// refer to Go version: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1571
// and the function `vectorSelectorSingle`
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
let mut take_indices = vec![];
let ts_column = input
.column(self.time_index)
.as_any()
@@ -473,6 +502,8 @@ impl InstantManipulateStream {
let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
let mut take_indices = vec![];
let mut cursor = 0;
let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
@@ -570,6 +601,8 @@ impl InstantManipulateStream {
#[cfg(test)]
mod test {
use datafusion::common::ToDFSchema;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
use super::*;
@@ -611,6 +644,30 @@ mod test {
assert_eq!(result_literal, expected);
}
#[test]
fn pruning_should_keep_time_and_field_columns_for_exec() {
let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan = InstantManipulate::new(
0,
0,
0,
0,
TIME_INDEX_COLUMN.to_string(),
Some("value".to_string()),
input,
);
// Simulate a parent projection requesting only the `path` column.
let output_columns = [2usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
#[tokio::test]
async fn lookback_10s_interval_30s() {
let expected = String::from(

View File

@@ -31,6 +31,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
SendableRecordBatchStream,
};
use datafusion_expr::col;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
@@ -83,7 +84,38 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
}
fn expressions(&self) -> Vec<datafusion::logical_expr::Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
self.tag_columns
.iter()
.map(col)
.chain(std::iter::once(col(&self.time_index_column_name)))
.collect()
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
if output_columns.is_empty() {
let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut required = Vec::with_capacity(output_columns.len() + 1 + self.tag_columns.len());
required.extend_from_slice(output_columns);
required.push(input_schema.index_of_column_by_name(None, &self.time_index_column_name)?);
for tag in &self.tag_columns {
required.push(input_schema.index_of_column_by_name(None, tag)?);
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -429,8 +461,10 @@ mod test {
use datafusion::arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
};
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow_array::StringArray;
@@ -461,6 +495,23 @@ mod test {
))
}
#[test]
fn pruning_should_keep_time_and_tag_columns_for_exec() {
let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan =
SeriesNormalize::new(0, TIME_INDEX_COLUMN, true, vec!["path".to_string()], input);
// Simulate a parent projection requesting only the `value` column.
let output_columns = [1usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
#[tokio::test]
async fn test_sort_record_batch() {
let memory_exec = Arc::new(prepare_test_data());

View File

@@ -18,7 +18,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use common_telemetry::debug;
use common_telemetry::{debug, warn};
use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
use datafusion::arrow::compute;
use datafusion::arrow::datatypes::{Field, SchemaRef};
@@ -38,6 +38,7 @@ use datafusion::physical_plan::{
SendableRecordBatchStream, Statistics,
};
use datafusion::sql::TableReference;
use datafusion_expr::col;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
@@ -288,7 +289,53 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
let mut exprs = Vec::with_capacity(1 + self.field_columns.len());
exprs.push(col(&self.time_index));
exprs.extend(self.field_columns.iter().map(col));
exprs
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
let input_len = input_schema.fields().len();
let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index)?;
if output_columns.is_empty() {
let indices = (0..input_len).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut required = Vec::with_capacity(output_columns.len() + 1 + self.field_columns.len());
required.push(time_index_idx);
for value_column in &self.field_columns {
required.push(input_schema.index_of_column_by_name(None, value_column)?);
}
for &idx in output_columns {
if idx < input_len {
required.push(idx);
} else if idx == input_len {
// Derived timestamp range column.
required.push(time_index_idx);
} else {
warn!(
"Output column index {} is out of bounds for input schema with length {}",
idx, input_len
);
return None;
}
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -734,16 +781,31 @@ mod test {
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::TimestampMillisecondArray;
use futures::FutureExt;
use super::*;
const TIME_INDEX_COLUMN: &str = "timestamp";
fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch {
let fields = indices
.iter()
.map(|&idx| batch.schema().field(idx).clone())
.collect::<Vec<_>>();
let columns = indices
.iter()
.map(|&idx| batch.column(idx).clone())
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, columns).unwrap()
}
fn prepare_test_data() -> DataSourceExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
@@ -844,6 +906,92 @@ mod test {
assert_eq!(result_literal, expected);
}
#[tokio::test]
async fn pruning_should_keep_time_and_value_columns_for_exec() {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value_1", DataType::Float64, true),
Field::new("value_2", DataType::Float64, true),
Field::new("path", DataType::Utf8, true),
]));
let df_schema = schema.clone().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan = RangeManipulate::new(
0,
310_000,
30_000,
90_000,
TIME_INDEX_COLUMN.to_string(),
vec!["value_1".to_string(), "value_2".to_string()],
input,
)
.unwrap();
// Simulate a parent projection requesting only the `path` column.
let output_columns = [3usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2, 3]);
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
180_000, 240_000, // every 60s
241_000, 271_000, 291_000, // others
])) as _;
let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
let input_batch = RecordBatch::try_new(
schema,
vec![
timestamp_column,
field_column.clone(),
field_column,
path_column,
],
)
.unwrap();
let projected = project_batch(&input_batch, required);
let projected_schema = projected.schema();
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![projected]], projected_schema, None).unwrap(),
)));
let range_exec = plan.to_execution_plan(memory_exec);
let session_context = SessionContext::default();
let output_batches =
datafusion::physical_plan::collect(range_exec, session_context.task_ctx())
.await
.unwrap();
assert_eq!(output_batches.len(), 1);
let output_batch = &output_batches[0];
let path = output_batch
.column(3)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert!(path.iter().all(|v| v == Some("foo")));
// Simulate the pre-fix pruning behavior: omit the timestamp/value columns from the child.
let broken_required = [3usize];
let broken = project_batch(&input_batch, &broken_required);
let broken_schema = broken.schema();
let broken_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![broken]], broken_schema, None).unwrap(),
)));
let broken_range_exec = plan.to_execution_plan(broken_exec);
let session_context = SessionContext::default();
let broken_result = std::panic::AssertUnwindSafe(async {
datafusion::physical_plan::collect(broken_range_exec, session_context.task_ctx()).await
})
.catch_unwind()
.await;
assert!(broken_result.is_err());
}
#[tokio::test]
async fn interval_30s_range_90s() {
let expected = String::from(

View File

@@ -31,6 +31,7 @@ use datafusion::physical_plan::{
};
use datafusion::prelude::Expr;
use datafusion::sql::TableReference;
use datafusion_expr::col;
use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::compute::{CastOptions, cast_with_options, concat_batches};
use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
@@ -266,7 +267,36 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
self.tag_columns
.iter()
.map(col)
.chain(std::iter::once(col(&self.time_index)))
.chain(std::iter::once(col(&self.field_column)))
.collect()
}
fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index)?;
let field_column_idx = input_schema.index_of_column_by_name(None, &self.field_column)?;
let mut required = Vec::with_capacity(2 + self.tag_columns.len());
required.extend([time_index_idx, field_column_idx]);
for tag in &self.tag_columns {
required.push(input_schema.index_of_column_by_name(None, tag)?);
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -275,15 +305,9 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate {
fn with_exprs_and_inputs(
&self,
exprs: Vec<Expr>,
_exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> DataFusionResult<Self> {
if !exprs.is_empty() {
return Err(DataFusionError::Internal(
"ScalarCalculate should not have any expressions".to_string(),
));
}
let input: LogicalPlan = inputs.into_iter().next().unwrap();
let input_schema = input.schema();
@@ -624,6 +648,109 @@ mod test {
use super::*;
fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch {
let fields = indices
.iter()
.map(|&idx| batch.schema().field(idx).clone())
.collect::<Vec<_>>();
let columns = indices
.iter()
.map(|&idx| batch.column(idx).clone())
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, columns).unwrap()
}
#[test]
fn necessary_children_exprs_preserve_tag_columns() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("tag1", DataType::Utf8, true),
Field::new("tag2", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
Field::new("extra", DataType::Utf8, true),
]));
let schema = Arc::new(DFSchema::try_from(schema).unwrap());
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema,
});
let tag_columns = vec!["tag1".to_string(), "tag2".to_string()];
let plan = ScalarCalculate::new(0, 1, 1, input, "ts", &tag_columns, "val", None).unwrap();
let required = plan.necessary_children_exprs(&[0, 1]).unwrap();
assert_eq!(required, vec![vec![0, 1, 2, 3]]);
}
#[tokio::test]
async fn pruning_should_keep_tag_columns_for_exec() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("tag1", DataType::Utf8, true),
Field::new("tag2", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
Field::new("extra", DataType::Utf8, true),
]));
let df_schema = Arc::new(DFSchema::try_from(schema.clone()).unwrap());
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let tag_columns = vec!["tag1".to_string(), "tag2".to_string()];
let plan =
ScalarCalculate::new(0, 15_000, 5000, input, "ts", &tag_columns, "val", None).unwrap();
let required = plan.necessary_children_exprs(&[0, 1]).unwrap();
let required = &required[0];
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(TimestampMillisecondArray::from(vec![
0, 5_000, 10_000, 15_000,
])),
Arc::new(StringArray::from(vec!["foo", "foo", "foo", "foo"])),
Arc::new(StringArray::from(vec!["bar", "bar", "bar", "bar"])),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
Arc::new(StringArray::from(vec!["x", "x", "x", "x"])),
],
)
.unwrap();
let projected_batch = project_batch(&batch, required);
let projected_schema = projected_batch.schema();
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![projected_batch]], projected_schema, None).unwrap(),
)));
let scalar_exec = plan.to_execution_plan(memory_exec).unwrap();
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(scalar_exec, session_context.task_ctx())
.await
.unwrap();
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 4);
assert_eq!(batch.schema().field(0).name(), "ts");
assert_eq!(batch.schema().field(1).name(), "scalar(val)");
let ts = batch
.column(0)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
assert_eq!(ts.values(), &[0i64, 5_000, 10_000, 15_000]);
let values = batch
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert_eq!(values.values(), &[1.0f64, 2.0, 3.0, 4.0]);
}
fn prepare_test_data(series: Vec<RecordBatch>) -> DataSourceExec {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),

View File

@@ -33,6 +33,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
SendableRecordBatchStream,
};
use datafusion_expr::col;
use datatypes::arrow::compute;
use datatypes::compute::SortOptions;
use futures::{Stream, StreamExt, ready};
@@ -76,7 +77,38 @@ impl UserDefinedLogicalNodeCore for SeriesDivide {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
self.tag_columns
.iter()
.map(col)
.chain(std::iter::once(col(&self.time_index_column)))
.collect()
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
if output_columns.is_empty() {
let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut required = Vec::with_capacity(output_columns.len() + 1 + self.tag_columns.len());
required.extend_from_slice(output_columns);
for tag in &self.tag_columns {
required.push(input_schema.index_of_column_by_name(None, tag)?);
}
required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -544,8 +576,10 @@ impl SeriesDivideStream {
#[cfg(test)]
mod test {
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
use super::*;
@@ -611,6 +645,26 @@ mod test {
))
}
#[test]
fn pruning_should_keep_tags_and_time_index_columns_for_exec() {
let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan = SeriesDivide::new(
vec!["host".to_string(), "path".to_string()],
"time_index".to_string(),
input,
);
// Simulate a parent projection requesting only the `host` column.
let output_columns = [0usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
#[tokio::test]
async fn overall_data() {
let memory_exec = Arc::new(prepare_test_data());

View File

@@ -32,6 +32,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, hash_utils,
};
use datafusion_expr::col;
use datatypes::arrow::compute;
use futures::future::BoxFuture;
use futures::{Stream, StreamExt, TryStreamExt, ready};
@@ -145,7 +146,20 @@ impl UserDefinedLogicalNodeCore for UnionDistinctOn {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
let mut exprs: Vec<Expr> = self.compare_keys.iter().map(col).collect();
if !self.compare_keys.iter().any(|key| key == &self.ts_col) {
exprs.push(col(&self.ts_col));
}
exprs
}
fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
let left_len = self.left.schema().fields().len();
let right_len = self.right.schema().fields().len();
Some(vec![
(0..left_len).collect::<Vec<_>>(),
(0..right_len).collect::<Vec<_>>(),
])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -540,9 +554,43 @@ mod test {
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::ToDFSchema;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use super::*;
#[test]
fn pruning_should_keep_all_columns_for_exec() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Int32, false),
Field::new("k", DataType::Int32, false),
Field::new("v", DataType::Int32, false),
]));
let df_schema = schema.to_dfschema_ref().unwrap();
let left = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema.clone(),
});
let right = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema.clone(),
});
let plan = UnionDistinctOn::new(
left,
right,
vec!["k".to_string()],
"ts".to_string(),
df_schema,
);
// Simulate a parent projection requesting only one output column.
let output_columns = [2usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
assert_eq!(required.len(), 2);
assert_eq!(required[0].as_slice(), &[0, 1, 2]);
assert_eq!(required[1].as_slice(), &[0, 1, 2]);
}
#[test]
fn test_interleave_batches() {
let schema = Schema::new(vec![

View File

@@ -544,8 +544,7 @@ tql explain (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not_count.a ASC NULLS FIRST, aggr_optimize_not_count.b ASC NULLS FIRST, aggr_optimize_not_count.c ASC NULLS FIRST, aggr_optimize_not_count.d ASC NULLS FIRST, aggr_optimize_not_count.greptime_timestamp ASC NULLS FIRST |
| | Projection: aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c, aggr_optimize_not_count.d, aggr_optimize_not_count.greptime_timestamp, aggr_optimize_not_count.greptime_value |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: aggr_optimize_not_count.greptime_timestamp >= TimestampMillisecond(-420000, None) AND aggr_optimize_not_count.greptime_timestamp <= TimestampMillisecond(300000, None) |
| | TableScan: aggr_optimize_not_count |
| | ]] |

View File

@@ -0,0 +1,91 @@
CREATE TABLE promql_column_pruning (
ts TIMESTAMP(3) TIME INDEX,
job STRING,
instance STRING,
region STRING,
greptime_value DOUBLE,
PRIMARY KEY(job, instance, region),
);
Affected Rows: 0
INSERT INTO promql_column_pruning VALUES
(0, 'job1', 'instance1', 'region1', 1),
(0, 'job1', 'instance2', 'region1', 2),
(0, 'job2', 'instance1', 'region1', 3),
(5000, 'job1', 'instance1', 'region1', 4),
(5000, 'job1', 'instance2', 'region1', 5),
(5000, 'job2', 'instance1', 'region1', 6),
(10000, 'job1', 'instance1', 'region1', 7),
(10000, 'job1', 'instance2', 'region1', 8),
(10000, 'job2', 'instance1', 'region1', 9);
Affected Rows: 9
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s') sum(promql_column_pruning);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(promql_column_pruning.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(promql_column_pruning.greptime_value)] REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts, greptime_value@4 as greptime_value] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["job", "instance", "region"] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 3_|
+-+-+-+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s') sum(rate(promql_column_pruning[5s]));
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(prom_rate(ts_range,greptime_value,ts,Int64(5000)))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_rate(ts_range,greptime_value,ts,Int64(5000))@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[ts@0 as ts, prom_rate(ts_range@5, greptime_value@4, ts@0, 5000) as prom_rate(ts_range,greptime_value,ts,Int64(5000))] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["job", "instance", "region"] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 2_|
+-+-+-+
DROP TABLE promql_column_pruning;
Affected Rows: 0

View File

@@ -0,0 +1,39 @@
CREATE TABLE promql_column_pruning (
ts TIMESTAMP(3) TIME INDEX,
job STRING,
instance STRING,
region STRING,
greptime_value DOUBLE,
PRIMARY KEY(job, instance, region),
);
INSERT INTO promql_column_pruning VALUES
(0, 'job1', 'instance1', 'region1', 1),
(0, 'job1', 'instance2', 'region1', 2),
(0, 'job2', 'instance1', 'region1', 3),
(5000, 'job1', 'instance1', 'region1', 4),
(5000, 'job1', 'instance2', 'region1', 5),
(5000, 'job2', 'instance1', 'region1', 6),
(10000, 'job1', 'instance1', 'region1', 7),
(10000, 'job1', 'instance2', 'region1', 8),
(10000, 'job2', 'instance1', 'region1', 9);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s') sum(promql_column_pruning);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s') sum(rate(promql_column_pruning[5s]));
DROP TABLE promql_column_pruning;