perf(mito): split record batches on equal timestamps (#8163)

* perf(mito): split record batches on equal timestamps

Signed-off-by: evenyag <realevenyag@gmail.com>

* test(mito): cover equal-timestamp runs at batch boundaries

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-05-26 16:19:52 +08:00
committed by GitHub
parent 44f1804b5e
commit a23ff4d589

View File

@@ -1662,7 +1662,7 @@ where
}
}
/// Splits the batch by timestamps.
/// Splits the batch so each sub-batch has strictly increasing timestamps.
///
/// # Panics
/// Panics if the timestamp array is invalid.
@@ -1683,7 +1683,7 @@ pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeq
offsets.push(0);
let values = ts_values.values();
for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
if value > values[i + 1] {
if value >= values[i + 1] {
offsets.push(i + 1);
}
}
@@ -1951,4 +1951,76 @@ mod tests {
compute_average_batch_size(std::iter::empty())
);
}
/// Creates a flat-format record batch for tests, using `timestamps` as the
/// values of its millisecond time index column `ts`.
///
/// The batch has one row per timestamp and four non-nullable columns:
/// `ts`, `pk`, `seq` and `op`. The last three are filled with zeros.
///
/// # Panics
/// Panics if the record batch cannot be constructed.
fn flat_ts_batch(timestamps: &[i64]) -> RecordBatch {
    use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};

    let row_count = timestamps.len();

    // Column layout: time index first, then the zero-filled columns.
    let fields = vec![
        Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("pk", DataType::UInt64, false),
        Field::new("seq", DataType::UInt64, false),
        Field::new("op", DataType::UInt8, false),
    ];
    let ts_column = TimestampMillisecondArray::from(timestamps.to_vec());

    RecordBatch::try_new(
        Arc::new(Schema::new(fields)),
        vec![
            Arc::new(ts_column),
            Arc::new(UInt64Array::from(vec![0_u64; row_count])),
            Arc::new(UInt64Array::from(vec![0_u64; row_count])),
            Arc::new(UInt8Array::from(vec![0_u8; row_count])),
        ],
    )
    .unwrap()
}
/// Splits `timestamps` and returns the time index values of each sub-batch.
fn split_ts(timestamps: &[i64]) -> Vec<Vec<i64>> {
let mut batches = VecDeque::new();
split_record_batch(flat_ts_batch(timestamps), &mut batches);
batches
.iter()
.map(|batch| {
let pos = time_index_column_index(batch.num_columns());
let (values, _) = timestamp_array_to_primitive(batch.column(pos)).unwrap();
values.values().to_vec()
})
.collect()
}
#[test]
fn test_split_record_batch_on_equal_timestamps() {
    // Each case pairs the input timestamps with the expected sub-batches.
    let cases: [(&[i64], Vec<Vec<i64>>); 4] = [
        // Both a decrease and a repeated timestamp start a new sub-batch.
        (&[1, 2, 2, 3, 1], vec![vec![1, 2], vec![2, 3], vec![1]]),
        // A run made only of equal timestamps becomes single-row sub-batches.
        (&[5, 5, 5], vec![vec![5], vec![5], vec![5]]),
        // Equal timestamps at the start of the batch.
        (&[5, 5, 1, 2], vec![vec![5], vec![5], vec![1, 2]]),
        // Equal timestamps at the end of the batch.
        (&[1, 2, 5, 5], vec![vec![1, 2, 5], vec![5]]),
    ];

    for (input, expected) in cases {
        assert_eq!(split_ts(input), expected);
    }
}
#[test]
fn test_split_record_batch_on_decreasing_timestamps() {
    // Strictly increasing input stays in one sub-batch.
    let increasing = split_ts(&[1, 2, 3]);
    assert_eq!(increasing, vec![vec![1, 2, 3]]);

    // A single decrease (3 -> 2) cuts the batch at that position.
    let with_drop = split_ts(&[1, 3, 2, 4]);
    assert_eq!(with_drop, vec![vec![1, 3], vec![2, 4]]);
}
#[test]
fn test_split_record_batch_empty_and_single_row() {
    // Splitting a batch with no rows pushes nothing into the queue.
    let mut sub_batches = VecDeque::new();
    let empty_batch = flat_ts_batch(&[]);
    split_record_batch(empty_batch, &mut sub_batches);
    assert!(sub_batches.is_empty());

    // A one-row batch comes back as exactly one one-row sub-batch.
    let single_row = split_ts(&[42]);
    assert_eq!(single_row, vec![vec![42]]);
}
}