diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 3fcafb6886..ac9ea039cb 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -535,6 +535,7 @@ impl PartSortStream { } fn push_buffer(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> { + rb_sanity_check(&batch)?; match &mut self.buffer { PartSortBuffer::All(v) => v.push(batch), PartSortBuffer::Top(top, cnt) => { @@ -559,7 +560,11 @@ impl PartSortStream { } // else check if last value in topk is not in next group range let topk_buffer = self.sort_top_buffer()?; + assert_eq!(topk_buffer.num_rows(), self.limit.unwrap()); + assert!(topk_buffer.num_rows() >= 1); + rb_sanity_check(&topk_buffer)?; let min_batch = topk_buffer.slice(topk_buffer.num_rows() - 1, 1); + rb_sanity_check(&min_batch)?; let min_sort_column = self.expression.evaluate_to_sort_column(&min_batch)?.values; let last_val = downcast_ts_array!( min_sort_column.data_type() => (ts_to_timestamp, min_sort_column), @@ -1049,6 +1054,22 @@ impl RecordBatchStream for PartSortStream { } } +fn rb_sanity_check(batch: &DfRecordBatch) -> datafusion_common::Result<()> { + let row_cnt = batch.num_rows(); + for column in batch.columns() { + if column.len() != row_cnt { + internal_err!( + "RecordBatch column length mismatch: expected {}, found {} at {} for column {:?}", + row_cnt, + column.len(), + snafu::location!(), + column + )?; + } + } + Ok(()) +} + #[cfg(test)] mod test { use std::sync::Arc;