From 506dc20765f892b3d7ad77af841f6bbf7c1a3892 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 6 Sep 2024 12:13:23 +0800 Subject: [PATCH] fix: last non null iter not init (#4687) --- src/mito2/src/read/dedup.rs | 61 ++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs index 52ff05fd12..ddc96049e7 100644 --- a/src/mito2/src/read/dedup.rs +++ b/src/mito2/src/read/dedup.rs @@ -258,13 +258,18 @@ impl LastFieldsBuilder { fn maybe_init(&mut self, batch: &Batch) { debug_assert!(!batch.is_empty()); - if self.initialized || batch.fields().is_empty() { + if self.initialized { // Already initialized or no fields to merge. return; } self.initialized = true; + if batch.fields().is_empty() { + // No fields to merge. + return; + } + let last_idx = batch.num_rows() - 1; let fields = batch.fields(); // Safety: The last_idx is valid. @@ -1165,4 +1170,58 @@ mod tests { ]; assert_eq!(&expect, &actual[..]); } + + /// Returns a new [Batch] without fields. 
+    fn new_batch_no_fields(
+        primary_key: &[u8],
+        timestamps: &[i64], // stored in a `TimestampMillisecondArray`
+        sequences: &[u64], // stored in a `UInt64Array`
+        op_types: &[OpType], // each value is cast to `u8` and stored in a `UInt8Array`
+    ) -> Batch {
+        let mut builder = BatchBuilder::new(primary_key.to_vec()); // builder gets an owned copy of the key
+        builder
+            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
+                timestamps.iter().copied(),
+            )))
+            .unwrap()
+            .sequences_array(Arc::new(UInt64Array::from_iter_values(
+                sequences.iter().copied(),
+            )))
+            .unwrap()
+            .op_types_array(Arc::new(UInt8Array::from_iter_values(
+                op_types.iter().map(|v| *v as u8),
+            )))
+            .unwrap(); // only timestamps, sequences and op types are set: no field column is added
+        builder.build().unwrap() // test helper: panics if `build()` returns an error
+    }
+
+    #[test]
+    fn test_last_non_null_iter_no_batch() { // NOTE(review): name says "no_batch", but the inputs are batches without fields
+        let input = [ // three field-less batches covering keys k1 and k2
+            new_batch_no_fields(
+                b"k1",
+                &[1, 1, 2], // timestamp 1 appears twice...
+                &[13, 12, 13], // ...with sequences 13 and 12
+                &[OpType::Put, OpType::Put, OpType::Put],
+            ), // the next batch continues key k1 and repeats timestamp 2 with sequence 12
+            new_batch_no_fields(b"k1", &[2, 3], &[12, 13], &[OpType::Put, OpType::Delete]),
+            new_batch_no_fields(
+                b"k2",
+                &[1, 1, 2],
+                &[13, 12, 13],
+                &[OpType::Put, OpType::Put, OpType::Put],
+            ),
+        ];
+        let iter = input.into_iter().map(Ok); // wrap every batch in `Ok` before feeding the iterator
+        let iter = LastNonNullIter::new(iter); // presumably exercises the `maybe_init` change from this patch - confirm
+        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); // panics if the iterator yields an error
+        let expect = [ // one single-row batch per (key, timestamp); every sequence-12 row is absent
+            new_batch_no_fields(b"k1", &[1], &[13], &[OpType::Put]),
+            new_batch_no_fields(b"k1", &[2], &[13], &[OpType::Put]),
+            new_batch_no_fields(b"k1", &[3], &[13], &[OpType::Delete]), // the Delete row is expected to be kept
+            new_batch_no_fields(b"k2", &[1], &[13], &[OpType::Put]),
+            new_batch_no_fields(b"k2", &[2], &[13], &[OpType::Put]),
+        ];
+        assert_eq!(&expect, &actual[..]);
+    }
 }