From a23ff4d589266a23c372de52cca48fa1bee40ee8 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 26 May 2026 16:19:52 +0800 Subject: [PATCH] perf(mito): split record batches on equal timestamps (#8163) * perf(mito): split record batches on equal timestamps Signed-off-by: evenyag * test(mito): cover equal-timestamp runs at batch boundaries Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/src/read/scan_util.rs | 76 ++++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 1d97b2eb76..4cf2179430 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -1662,7 +1662,7 @@ where } } -/// Splits the batch by timestamps. +/// Splits the batch so each sub-batch has strictly increasing timestamps. /// /// # Panics /// Panics if the timestamp array is invalid. @@ -1683,7 +1683,7 @@ pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeq offsets.push(0); let values = ts_values.values(); for (i, &value) in values.iter().take(batch_rows - 1).enumerate() { - if value > values[i + 1] { + if value >= values[i + 1] { offsets.push(i + 1); } } @@ -1951,4 +1951,76 @@ mod tests { compute_average_batch_size(std::iter::empty()) ); } + + /// Builds a flat-format record batch whose time index column holds `timestamps`. + fn flat_ts_batch(timestamps: &[i64]) -> RecordBatch { + use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array}; + use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; + + let num_rows = timestamps.len(); + let schema = Arc::new(Schema::new(vec![ + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("pk", DataType::UInt64, false), + Field::new("seq", DataType::UInt64, false), + Field::new("op", DataType::UInt8, false), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(TimestampMillisecondArray::from(timestamps.to_vec())), + Arc::new(UInt64Array::from(vec![0u64; num_rows])), + Arc::new(UInt64Array::from(vec![0u64; num_rows])), + Arc::new(UInt8Array::from(vec![0u8; num_rows])), + ], + ) + .unwrap() + } + + /// Splits `timestamps` and returns the time index values of each sub-batch. + fn split_ts(timestamps: &[i64]) -> Vec> { + let mut batches = VecDeque::new(); + split_record_batch(flat_ts_batch(timestamps), &mut batches); + batches + .iter() + .map(|batch| { + let pos = time_index_column_index(batch.num_columns()); + let (values, _) = timestamp_array_to_primitive(batch.column(pos)).unwrap(); + values.values().to_vec() + }) + .collect() + } + + #[test] + fn test_split_record_batch_on_equal_timestamps() { + // Splits on both decreasing and equal timestamps. + assert_eq!( + split_ts(&[1, 2, 2, 3, 1]), + vec![vec![1, 2], vec![2, 3], vec![1]] + ); + // A run of equal timestamps yields single-row sub-batches. + assert_eq!(split_ts(&[5, 5, 5]), vec![vec![5], vec![5], vec![5]]); + // Equal-ts run at the leading edge of the batch. + assert_eq!(split_ts(&[5, 5, 1, 2]), vec![vec![5], vec![5], vec![1, 2]]); + // Equal-ts run at the trailing edge of the batch. + assert_eq!(split_ts(&[1, 2, 5, 5]), vec![vec![1, 2, 5], vec![5]]); + } + + #[test] + fn test_split_record_batch_on_decreasing_timestamps() { + assert_eq!(split_ts(&[1, 2, 3]), vec![vec![1, 2, 3]]); + assert_eq!(split_ts(&[1, 3, 2, 4]), vec![vec![1, 3], vec![2, 4]]); + } + + #[test] + fn test_split_record_batch_empty_and_single_row() { + let mut batches = VecDeque::new(); + split_record_batch(flat_ts_batch(&[]), &mut batches); + assert!(batches.is_empty()); + + assert_eq!(split_ts(&[42]), vec![vec![42]]); + } }