diff --git a/src/query/src/range_select/plan.rs b/src/query/src/range_select/plan.rs index 83b47bc1b1..d83514f435 100644 --- a/src/query/src/range_select/plan.rs +++ b/src/query/src/range_select/plan.rs @@ -1165,8 +1165,9 @@ impl RangeSelectStream { let num_rows = self.output_batch.as_ref().unwrap().num_rows(); if num_rows == 0 { + self.output_batch = None; self.output_batch_offset = 0; - return Ok(self.output_batch.take()); + return Ok(None); } if self.output_batch_offset == 0 && num_rows <= self.batch_size { @@ -1236,7 +1237,10 @@ impl Stream for RangeSelectStream { } Poll::Ready(Some(Ok(batch))) } - Ok(None) => Poll::Ready(None), + Ok(None) => { + self.exec_state = ExecutionState::Done; + Poll::Ready(None) + } // error making output Err(error) => Poll::Ready(Some(Err(error))), }; @@ -1290,7 +1294,7 @@ mod test { use datafusion::prelude::SessionContext; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr::expressions::Column; - use datatypes::arrow::array::TimestampMillisecondArray; + use datatypes::arrow::array::{Float64Array, Int64Array, TimestampMillisecondArray}; use datatypes::arrow_array::StringArray; use super::*; @@ -1352,6 +1356,40 @@ mod test { )) } + fn prepare_empty_test_data(is_float: bool) -> DataSourceExec { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new( + "value", + if is_float { + DataType::Float64 + } else { + DataType::Int64 + }, + true, + ), + Field::new("host", DataType::Utf8, true), + ])); + let timestamp_column: Arc = + Arc::new(TimestampMillisecondArray::from(Vec::::new())) as _; + let value_column: Arc = if is_float { + Arc::new(Float64Array::from(Vec::>::new())) as _ + } else { + Arc::new(Int64Array::from(Vec::>::new())) as _ + }; + let host_column: Arc = + Arc::new(StringArray::from(Vec::>::new())) as _; + let data = RecordBatch::try_new( + schema.clone(), + vec![timestamp_column, value_column, host_column], + ) + .unwrap(); + + DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(), + )) + } + async fn collect_range_select_test( range1: Millisecond, range2: Millisecond, @@ -1765,6 +1803,76 @@ mod test { assert_eq!(vec![3, 3, 3, 3], row_counts); } + #[tokio::test] + async fn range_select_skips_empty_output_batch() { + let memory_exec = Arc::new(prepare_empty_test_data(true)); + let schema = Arc::new(Schema::new(vec![ + Field::new("MIN(value)", DataType::Float64, true), + Field::new("MAX(value)", DataType::Float64, true), + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("host", DataType::Utf8, true), + ])); + let cache = Arc::new(PlanProperties::new( + EquivalenceProperties::new(schema.clone()), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + )); + let input_schema = memory_exec.schema().clone(); + let range_select_exec = Arc::new(RangeSelectExec { + input: memory_exec, + range_exec: vec![ + RangeFnExec { + expr: Arc::new( + AggregateExprBuilder::new( + min_max::min_udaf(), + vec![Arc::new(Column::new("value", 1))], + ) + .schema(input_schema.clone()) + .alias("MIN(value)") + .build() + .unwrap(), + ), + range: 10_000, + fill: Some(Fill::Null), + need_cast: None, + }, + RangeFnExec { + expr: Arc::new( + AggregateExprBuilder::new( + min_max::max_udaf(), + vec![Arc::new(Column::new("value", 1))], + ) + .schema(input_schema) + .alias("MAX(value)") + .build() + .unwrap(), + ), + range: 5_000, + fill: Some(Fill::Null), + need_cast: None, + }, + ], + align: 5_000, + align_to: 0, + by: vec![Arc::new(Column::new("host", 2))], + time_index: TIME_INDEX_COLUMN.to_string(), + schema: schema.clone(), + schema_before_project: schema.clone(), + schema_project: None, + by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])), + metric: ExecutionPlanMetricsSet::new(), + cache, + }); + let session_context = SessionContext::new(); + let result = + datafusion::physical_plan::collect(range_select_exec, session_context.task_ctx()) + .await + .unwrap(); + + assert!(result.is_empty()); + } + #[test] fn fill_test() { assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());