mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 12:00:40 +00:00
@@ -535,6 +535,7 @@ impl PartSortStream {
|
||||
}
|
||||
|
||||
fn push_buffer(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> {
|
||||
rb_sanity_check(&batch)?;
|
||||
match &mut self.buffer {
|
||||
PartSortBuffer::All(v) => v.push(batch),
|
||||
PartSortBuffer::Top(top, cnt) => {
|
||||
@@ -559,7 +560,11 @@ impl PartSortStream {
|
||||
}
|
||||
// else check if last value in topk is not in next group range
|
||||
let topk_buffer = self.sort_top_buffer()?;
|
||||
assert_eq!(topk_buffer.num_rows(), self.limit.unwrap());
|
||||
assert!(topk_buffer.num_rows() >= 1);
|
||||
rb_sanity_check(&topk_buffer)?;
|
||||
let min_batch = topk_buffer.slice(topk_buffer.num_rows() - 1, 1);
|
||||
rb_sanity_check(&min_batch)?;
|
||||
let min_sort_column = self.expression.evaluate_to_sort_column(&min_batch)?.values;
|
||||
let last_val = downcast_ts_array!(
|
||||
min_sort_column.data_type() => (ts_to_timestamp, min_sort_column),
|
||||
@@ -1049,6 +1054,22 @@ impl RecordBatchStream for PartSortStream {
|
||||
}
|
||||
}
|
||||
|
||||
fn rb_sanity_check(batch: &DfRecordBatch) -> datafusion_common::Result<()> {
|
||||
let row_cnt = batch.num_rows();
|
||||
for column in batch.columns() {
|
||||
if column.len() != row_cnt {
|
||||
internal_err!(
|
||||
"RecordBatch column length mismatch: expected {}, found {} at {} for column {:?}",
|
||||
row_cnt,
|
||||
column.len(),
|
||||
snafu::location!(),
|
||||
column
|
||||
)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::sync::Arc;
|
||||
|
||||
Reference in New Issue
Block a user