handle empty input

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-28 16:39:08 +08:00
parent b1330a8413
commit c8675226ff

View File

@@ -1165,8 +1165,9 @@ impl RangeSelectStream {
let num_rows = self.output_batch.as_ref().unwrap().num_rows();
if num_rows == 0 {
self.output_batch = None;
self.output_batch_offset = 0;
return Ok(self.output_batch.take());
return Ok(None);
}
if self.output_batch_offset == 0 && num_rows <= self.batch_size {
@@ -1236,7 +1237,10 @@ impl Stream for RangeSelectStream {
}
Poll::Ready(Some(Ok(batch)))
}
Ok(None) => Poll::Ready(None),
Ok(None) => {
self.exec_state = ExecutionState::Done;
Poll::Ready(None)
}
// error making output
Err(error) => Poll::Ready(Some(Err(error))),
};
@@ -1290,7 +1294,7 @@ mod test {
use datafusion::prelude::SessionContext;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::expressions::Column;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::array::{Float64Array, Int64Array, TimestampMillisecondArray};
use datatypes::arrow_array::StringArray;
use super::*;
@@ -1352,6 +1356,40 @@ mod test {
))
}
fn prepare_empty_test_data(is_float: bool) -> DataSourceExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new(
"value",
if is_float {
DataType::Float64
} else {
DataType::Int64
},
true,
),
Field::new("host", DataType::Utf8, true),
]));
let timestamp_column: Arc<dyn Array> =
Arc::new(TimestampMillisecondArray::from(Vec::<i64>::new())) as _;
let value_column: Arc<dyn Array> = if is_float {
Arc::new(Float64Array::from(Vec::<Option<f64>>::new())) as _
} else {
Arc::new(Int64Array::from(Vec::<Option<i64>>::new())) as _
};
let host_column: Arc<dyn Array> =
Arc::new(StringArray::from(Vec::<Option<&str>>::new())) as _;
let data = RecordBatch::try_new(
schema.clone(),
vec![timestamp_column, value_column, host_column],
)
.unwrap();
DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
))
}
async fn collect_range_select_test(
range1: Millisecond,
range2: Millisecond,
@@ -1765,6 +1803,76 @@ mod test {
assert_eq!(vec![3, 3, 3, 3], row_counts);
}
#[tokio::test]
async fn range_select_skips_empty_output_batch() {
let memory_exec = Arc::new(prepare_empty_test_data(true));
let schema = Arc::new(Schema::new(vec![
Field::new("MIN(value)", DataType::Float64, true),
Field::new("MAX(value)", DataType::Float64, true),
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("host", DataType::Utf8, true),
]));
let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
));
let input_schema = memory_exec.schema().clone();
let range_select_exec = Arc::new(RangeSelectExec {
input: memory_exec,
range_exec: vec![
RangeFnExec {
expr: Arc::new(
AggregateExprBuilder::new(
min_max::min_udaf(),
vec![Arc::new(Column::new("value", 1))],
)
.schema(input_schema.clone())
.alias("MIN(value)")
.build()
.unwrap(),
),
range: 10_000,
fill: Some(Fill::Null),
need_cast: None,
},
RangeFnExec {
expr: Arc::new(
AggregateExprBuilder::new(
min_max::max_udaf(),
vec![Arc::new(Column::new("value", 1))],
)
.schema(input_schema)
.alias("MAX(value)")
.build()
.unwrap(),
),
range: 5_000,
fill: Some(Fill::Null),
need_cast: None,
},
],
align: 5_000,
align_to: 0,
by: vec![Arc::new(Column::new("host", 2))],
time_index: TIME_INDEX_COLUMN.to_string(),
schema: schema.clone(),
schema_before_project: schema.clone(),
schema_project: None,
by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])),
metric: ExecutionPlanMetricsSet::new(),
cache,
});
let session_context = SessionContext::new();
let result =
datafusion::physical_plan::collect(range_select_exec, session_context.task_ctx())
.await
.unwrap();
assert!(result.is_empty());
}
#[test]
fn fill_test() {
assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());