diff --git a/src/mito2/src/read/flat_dedup.rs b/src/mito2/src/read/flat_dedup.rs index d22a103131..aa9b232824 100644 --- a/src/mito2/src/read/flat_dedup.rs +++ b/src/mito2/src/read/flat_dedup.rs @@ -14,6 +14,8 @@ //! Dedup implementation for flat format. +use std::ops::Range; + use api::v1::OpType; use datatypes::arrow::array::{ make_comparator, Array, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder, UInt64Array, @@ -22,18 +24,21 @@ use datatypes::arrow::array::{ use datatypes::arrow::buffer::BooleanBuffer; use datatypes::arrow::compute::kernels::cmp::distinct; use datatypes::arrow::compute::kernels::partition::{partition, Partitions}; -use datatypes::arrow::compute::{filter_record_batch, take_record_batch, SortOptions}; +use datatypes::arrow::compute::kernels::take::take; +use datatypes::arrow::compute::{ + concat_batches, filter_record_batch, take_record_batch, SortOptions, TakeOptions, +}; use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use snafu::ResultExt; -use crate::error::{ComputeArrowSnafu, Result}; +use crate::error::{ComputeArrowSnafu, NewRecordBatchSnafu, Result}; use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice; use crate::read::dedup::DedupMetrics; use crate::sst::parquet::flat_format::{ op_type_column_index, primary_key_column_index, time_index_column_index, }; -use crate::sst::parquet::format::PrimaryKeyArray; +use crate::sst::parquet::format::{PrimaryKeyArray, FIXED_POS_COLUMN_NUM}; /// An iterator to dedup sorted batches from an iterator based on the dedup strategy. pub struct DedupIterator { @@ -205,6 +210,261 @@ impl RecordBatchDedupStrategy for FlatLastRow { } } +/// Dedup strategy that keeps the last non-null field for the same key. +pub struct FlatLastNonNull { + /// The start index of field columns: + field_column_start: usize, + /// Filter deleted rows. + filter_deleted: bool, + /// Buffered batch to check whether the next batch have duplicated rows with this batch. + /// Fields in the last row of this batch may be updated by the next batch. + /// The buffered batch should contain no duplication. + buffer: Option, + /// Whether the last row range contains a delete operation. + /// If so, we don't need to update null fields. + contains_delete: bool, +} + +impl RecordBatchDedupStrategy for FlatLastNonNull { + fn push_batch( + &mut self, + batch: RecordBatch, + metrics: &mut DedupMetrics, + ) -> Result> { + if batch.num_rows() == 0 { + return Ok(None); + } + + let row_before_dedup = batch.num_rows(); + + let Some(buffer) = self.buffer.take() else { + // If the buffer is None, dedup the batch, put the batch into the buffer and return. + // There is no previous batch with the same key, we can pass contains_delete as false. + let (record_batch, contains_delete) = + Self::dedup_one_batch(batch, self.field_column_start, false)?; + metrics.num_unselected_rows += row_before_dedup - record_batch.num_rows(); + self.buffer = BatchLastRow::try_new(record_batch); + self.contains_delete = contains_delete; + + return Ok(None); + }; + + if !buffer.is_last_row_duplicated(&batch) { + // The first row of batch has different key from the buffer. + // We can replace the buffer with the new batch. + // Dedup the batch. + // There is no previous batch with the same key, we can pass contains_delete as false. + let (record_batch, contains_delete) = + Self::dedup_one_batch(batch, self.field_column_start, false)?; + metrics.num_unselected_rows += row_before_dedup - record_batch.num_rows(); + debug_assert!(record_batch.num_rows() > 0); + self.buffer = BatchLastRow::try_new(record_batch); + self.contains_delete = contains_delete; + + return maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics); + } + + // The next batch has duplicated rows. + // We can return rows except the last row in the buffer. + let output = if buffer.last_batch.num_rows() > 1 { + let dedup_batch = buffer.last_batch.slice(0, buffer.last_batch.num_rows() - 1); + debug_assert_eq!(buffer.last_batch.num_rows() - 1, dedup_batch.num_rows()); + + maybe_filter_deleted(dedup_batch, self.filter_deleted, metrics)? + } else { + None + }; + let last_row = buffer.last_batch.slice(buffer.last_batch.num_rows() - 1, 1); + + // We concat the last row with the next batch. + let schema = batch.schema(); + let merged = concat_batches(&schema, &[last_row, batch]).context(ComputeArrowSnafu)?; + let merged_row_count = merged.num_rows(); + // Dedup the merged batch and update the buffer. + let (record_batch, contains_delete) = + Self::dedup_one_batch(merged, self.field_column_start, self.contains_delete)?; + metrics.num_unselected_rows += merged_row_count - record_batch.num_rows(); + debug_assert!(record_batch.num_rows() > 0); + self.buffer = BatchLastRow::try_new(record_batch); + self.contains_delete = contains_delete; + + Ok(output) + } + + fn finish(&mut self, metrics: &mut DedupMetrics) -> Result> { + let Some(buffer) = self.buffer.take() else { + return Ok(None); + }; + + maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics) + } +} + +impl FlatLastNonNull { + /// Creates a new strategy with the given `filter_deleted` flag. + pub fn new(field_column_start: usize, filter_deleted: bool) -> Self { + Self { + field_column_start, + filter_deleted, + buffer: None, + contains_delete: false, + } + } + + /// Remove duplications from the batch without considering the previous and next rows. + /// Returns a tuple containing the deduplicated batch and a boolean indicating whether the last range contains deleted rows. + fn dedup_one_batch( + batch: RecordBatch, + field_column_start: usize, + prev_batch_contains_delete: bool, + ) -> Result<(RecordBatch, bool)> { + // Get op type array for checking delete operations + let op_type_column = batch + .column(op_type_column_index(batch.num_columns())) + .clone(); + let op_types = op_type_column + .as_any() + .downcast_ref::() + .unwrap(); + let num_rows = batch.num_rows(); + if num_rows < 2 { + let contains_delete = if num_rows > 0 { + op_types.value(0) == OpType::Delete as u8 + } else { + false + }; + return Ok((batch, contains_delete)); + } + + let num_columns = batch.num_columns(); + let timestamps = batch.column(time_index_column_index(num_columns)); + // Checks duplications based on the timestamp. + let mask = find_boundaries(timestamps).context(ComputeArrowSnafu)?; + if mask.count_set_bits() == num_rows - 1 { + let contains_delete = op_types.value(num_rows - 1) == OpType::Delete as u8; + // Fast path: No duplication. + return Ok((batch, contains_delete)); + } + + // The batch has duplicated timestamps, but it doesn't mean it must + // has duplicated rows. + // Partitions the batch by the primary key and time index. + let columns: Vec<_> = [ + primary_key_column_index(num_columns), + time_index_column_index(num_columns), + ] + .iter() + .map(|index| batch.column(*index).clone()) + .collect(); + let partitions = partition(&columns).context(ComputeArrowSnafu)?; + + Self::dedup_by_partitions( + batch, + &partitions, + field_column_start, + op_types, + prev_batch_contains_delete, + ) + } + + /// Remove depulications for each partition. + /// Returns a tuple containing the deduplicated batch and a boolean indicating whether the last range contains deleted rows. + fn dedup_by_partitions( + batch: RecordBatch, + partitions: &Partitions, + field_column_start: usize, + op_types: &UInt8Array, + first_range_contains_delete: bool, + ) -> Result<(RecordBatch, bool)> { + let ranges = partitions.ranges(); + let contains_delete = Self::last_range_has_delete(&ranges, op_types); + + // Each range at least has 1 row. + let num_duplications: usize = ranges.iter().map(|r| r.end - r.start - 1).sum(); + if num_duplications == 0 { + // Fast path, no duplication. + return Ok((batch, contains_delete)); + } + + let field_column_end = batch.num_columns() - FIXED_POS_COLUMN_NUM; + let take_options = Some(TakeOptions { + check_bounds: false, + }); + // Always takes the first value for non-field columns in each range. + let non_field_indices: UInt64Array = ranges.iter().map(|r| Some(r.start as u64)).collect(); + let new_columns = batch + .columns() + .iter() + .enumerate() + .map(|(col_idx, column)| { + if col_idx >= field_column_start && col_idx < field_column_end { + let field_indices = Self::compute_field_indices( + &ranges, + column, + op_types, + first_range_contains_delete, + ); + take(column, &field_indices, take_options.clone()).context(ComputeArrowSnafu) + } else { + take(column, &non_field_indices, take_options.clone()) + .context(ComputeArrowSnafu) + } + }) + .collect::>>()?; + + let record_batch = + RecordBatch::try_new(batch.schema(), new_columns).context(NewRecordBatchSnafu)?; + Ok((record_batch, contains_delete)) + } + + /// Returns an array of indices of the latest non null value for + /// each input range. + /// If all values in a range are null, the returned index is unspecific. + /// Stops when encountering a delete operation and ignores all subsequent rows. + fn compute_field_indices( + ranges: &[Range], + field_array: &ArrayRef, + op_types: &UInt8Array, + first_range_contains_delete: bool, + ) -> UInt64Array { + ranges + .iter() + .enumerate() + .map(|(range_idx, r)| { + let mut value_index = r.start as u64; + if range_idx == 0 && first_range_contains_delete { + return Some(value_index); + } + + // Iterate through the range to find the first valid non-null value + // but stop if we encounter a delete operation. + for i in r.clone() { + if op_types.value(i) == OpType::Delete as u8 { + break; + } + if field_array.is_valid(i) { + value_index = i as u64; + break; + } + } + + Some(value_index) + }) + .collect() + } + + /// Checks whether the last range contains a delete operation. + fn last_range_has_delete(ranges: &[Range], op_types: &UInt8Array) -> bool { + if let Some(last_range) = ranges.last() { + last_range + .clone() + .any(|i| op_types.value(i) == OpType::Delete as u8) + } else { + false + } + } +} + /// State of the batch with the last row for dedup. struct BatchLastRow { /// The record batch that contains the last row. @@ -296,6 +556,10 @@ fn maybe_filter_deleted( return Ok(Some(record_batch)); } let batch = filter_deleted_from_batch(record_batch, metrics)?; + // Skips empty batches. + if batch.num_rows() == 0 { + return Ok(None); + } Ok(Some(batch)) } @@ -362,8 +626,8 @@ mod tests { use api::v1::OpType; use datatypes::arrow::array::{ - ArrayRef, BinaryDictionaryBuilder, Int64Array, TimestampMillisecondArray, UInt64Array, - UInt8Array, + ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder, + TimestampMillisecondArray, UInt64Array, UInt8Array, }; use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type}; use datatypes::arrow::record_batch::RecordBatch; @@ -385,6 +649,8 @@ mod tests { debug_assert_eq!(primary_keys.len(), num_rows); let columns: Vec = vec![ + // k0 column (primary key as string dictionary) + build_test_pk_string_dict_array(primary_keys), // field0 column Arc::new(Int64Array::from_iter( fields.iter().map(|v| Some(*v as i64)), @@ -406,6 +672,58 @@ mod tests { RecordBatch::try_new(build_test_flat_schema(), columns).unwrap() } + /// Creates a test RecordBatch in flat format with multiple fields for testing FlatLastNonNull. + fn new_record_batch_multi_fields( + primary_keys: &[&[u8]], + timestamps: &[i64], + sequences: &[u64], + op_types: &[OpType], + fields: &[(Option, Option)], + ) -> RecordBatch { + let num_rows = timestamps.len(); + debug_assert_eq!(sequences.len(), num_rows); + debug_assert_eq!(op_types.len(), num_rows); + debug_assert_eq!(fields.len(), num_rows); + debug_assert_eq!(primary_keys.len(), num_rows); + + let columns: Vec = vec![ + // k0 column (primary key as string dictionary) + build_test_pk_string_dict_array(primary_keys), + // field0 column + Arc::new(Int64Array::from_iter( + fields.iter().map(|field| field.0.map(|v| v as i64)), + )), + // field1 column + Arc::new(Int64Array::from_iter( + fields.iter().map(|field| field.1.map(|v| v as i64)), + )), + // ts column (time index) + Arc::new(TimestampMillisecondArray::from_iter_values( + timestamps.iter().copied(), + )), + // __primary_key column + build_test_pk_array(primary_keys), + // __sequence column + Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())), + // __op_type column + Arc::new(UInt8Array::from_iter_values( + op_types.iter().map(|v| *v as u8), + )), + ]; + + RecordBatch::try_new(build_test_multi_field_schema(), columns).unwrap() + } + + /// Creates a test string dictionary primary key array for given primary keys. + fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef { + let mut builder = StringDictionaryBuilder::::new(); + for &pk in primary_keys { + let pk_str = std::str::from_utf8(pk).unwrap(); + builder.append(pk_str).unwrap(); + } + Arc::new(builder.finish()) + } + /// Creates a test primary key array for given primary keys. fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef { let mut builder = BinaryDictionaryBuilder::::new(); @@ -418,6 +736,11 @@ mod tests { /// Builds the arrow schema for test flat format. fn build_test_flat_schema() -> SchemaRef { let fields = vec![ + Field::new( + "k0", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + false, + ), Field::new("field0", DataType::Int64, true), Field::new( "ts", @@ -435,20 +758,42 @@ mod tests { Arc::new(Schema::new(fields)) } + /// Builds the arrow schema for test flat format with multiple fields. + fn build_test_multi_field_schema() -> SchemaRef { + let fields = vec![ + Field::new( + "k0", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + false, + ), + Field::new("field0", DataType::Int64, true), + Field::new("field1", DataType::Int64, true), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new( + "__primary_key", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)), + false, + ), + Field::new("__sequence", DataType::UInt64, false), + Field::new("__op_type", DataType::UInt8, false), + ]; + Arc::new(Schema::new(fields)) + } + /// Asserts that two RecordBatch vectors are equal. fn check_record_batches_equal(expected: &[RecordBatch], actual: &[RecordBatch]) { + for (i, (exp, act)) in expected.iter().zip(actual.iter()).enumerate() { + assert_eq!(exp, act, "RecordBatch {} differs", i); + } assert_eq!( expected.len(), actual.len(), "Number of batches don't match" ); - for (i, (exp, act)) in expected.iter().zip(actual.iter()).enumerate() { - assert_eq!( - exp, act, - "RecordBatch {} differs:\nExpected: {:?}\nActual: {:?}", - i, exp, act - ); - } } /// Helper function to collect iterator results. @@ -584,4 +929,567 @@ mod tests { assert_eq!(3, dedup_iter.metrics.num_unselected_rows); assert_eq!(0, dedup_iter.metrics.num_deleted_rows); } + + #[test] + fn test_flat_last_non_null_no_duplications() { + let input = vec![ + new_record_batch( + &[b"k1", b"k1"], + &[1, 2], + &[11, 12], + &[OpType::Put, OpType::Put], + &[21, 22], + ), + new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[23]), + new_record_batch( + &[b"k2", b"k2"], + &[1, 2], + &[111, 112], + &[OpType::Put, OpType::Put], + &[31, 32], + ), + ]; + + // Test with filter_deleted = true + let iter = input.clone().into_iter().map(Ok); + let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true)); + let result = collect_iterator_results(&mut dedup_iter); + check_record_batches_equal(&input, &result); + assert_eq!(0, dedup_iter.metrics.num_unselected_rows); + assert_eq!(0, dedup_iter.metrics.num_deleted_rows); + + // Test with filter_deleted = false + let iter = input.clone().into_iter().map(Ok); + let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, false)); + let result = collect_iterator_results(&mut dedup_iter); + check_record_batches_equal(&input, &result); + assert_eq!(0, dedup_iter.metrics.num_unselected_rows); + assert_eq!(0, dedup_iter.metrics.num_deleted_rows); + } + + #[test] + fn test_flat_last_non_null_field_merging() { + let input = vec![ + new_record_batch_multi_fields( + &[b"k1", b"k1"], + &[1, 2], + &[13, 11], + &[OpType::Put, OpType::Put], + &[(Some(11), Some(11)), (None, None)], + ), + // empty batch + new_record_batch_multi_fields(&[], &[], &[], &[], &[]), + // Duplicate with the previous batch - should merge fields + new_record_batch_multi_fields( + &[b"k1"], + &[2], + &[10], + &[OpType::Put], + &[(Some(12), None)], + ), + new_record_batch_multi_fields( + &[b"k1", b"k1", b"k1"], + &[2, 3, 4], + &[10, 13, 13], + &[OpType::Put, OpType::Put, OpType::Delete], + &[(Some(2), Some(22)), (Some(13), None), (None, Some(14))], + ), + new_record_batch_multi_fields( + &[b"k2", b"k2"], + &[1, 2], + &[20, 20], + &[OpType::Put, OpType::Delete], + &[(Some(101), Some(101)), (None, None)], + ), + new_record_batch_multi_fields( + &[b"k2"], + &[2], + &[19], + &[OpType::Put], + &[(Some(102), Some(102))], + ), + new_record_batch_multi_fields( + &[b"k3"], + &[2], + &[20], + &[OpType::Put], + &[(Some(202), Some(202))], + ), + // This batch won't increase the deleted rows count as it + // is filtered out by the previous batch. (All fields are null). + new_record_batch_multi_fields( + &[b"k3"], + &[2], + &[19], + &[OpType::Delete], + &[(None, None)], + ), + ]; + + // Test with filter_deleted = true + let expected_filter_deleted = vec![ + new_record_batch_multi_fields( + &[b"k1"], + &[1], + &[13], + &[OpType::Put], + &[(Some(11), Some(11))], + ), + new_record_batch_multi_fields( + &[b"k1", b"k1"], + &[2, 3], + &[11, 13], + &[OpType::Put, OpType::Put], + &[(Some(12), Some(22)), (Some(13), None)], + ), + new_record_batch_multi_fields( + &[b"k2"], + &[1], + &[20], + &[OpType::Put], + &[(Some(101), Some(101))], + ), + new_record_batch_multi_fields( + &[b"k3"], + &[2], + &[20], + &[OpType::Put], + &[(Some(202), Some(202))], + ), + ]; + + let iter = input.clone().into_iter().map(Ok); + let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true)); + let result = collect_iterator_results(&mut dedup_iter); + check_record_batches_equal(&expected_filter_deleted, &result); + assert_eq!(6, dedup_iter.metrics.num_unselected_rows); + assert_eq!(2, dedup_iter.metrics.num_deleted_rows); + + // Test with filter_deleted = false + let expected_no_filter = vec![ + new_record_batch_multi_fields( + &[b"k1"], + &[1], + &[13], + &[OpType::Put], + &[(Some(11), Some(11))], + ), + new_record_batch_multi_fields( + &[b"k1", b"k1", b"k1"], + &[2, 3, 4], + &[11, 13, 13], + &[OpType::Put, OpType::Put, OpType::Delete], + &[(Some(12), Some(22)), (Some(13), None), (None, Some(14))], + ), + new_record_batch_multi_fields( + &[b"k2"], + &[1], + &[20], + &[OpType::Put], + &[(Some(101), Some(101))], + ), + new_record_batch_multi_fields( + &[b"k2"], + &[2], + &[20], + &[OpType::Delete], + &[(None, None)], + ), + new_record_batch_multi_fields( + &[b"k3"], + &[2], + &[20], + &[OpType::Put], + &[(Some(202), Some(202))], + ), + ]; + + let iter = input.clone().into_iter().map(Ok); + let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, false)); + let result = collect_iterator_results(&mut dedup_iter); + check_record_batches_equal(&expected_no_filter, &result); + assert_eq!(4, dedup_iter.metrics.num_unselected_rows); + assert_eq!(0, dedup_iter.metrics.num_deleted_rows); + } + + #[test] + fn test_flat_last_non_null_skip_merge_no_null() { + let input = vec![ + new_record_batch_multi_fields( + &[b"k1", b"k1"], + &[1, 2], + &[13, 11], + &[OpType::Put, OpType::Put], + &[(Some(11), Some(11)), (Some(12), Some(12))], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[2], + &[10], + &[OpType::Put], + &[(None, Some(22))], + ), + new_record_batch_multi_fields( + &[b"k1", b"k1"], + &[2, 3], + &[9, 13], + &[OpType::Put, OpType::Put], + &[(Some(32), None), (Some(13), Some(13))], + ), + ]; + + let expected = vec![ + new_record_batch_multi_fields( + &[b"k1"], + &[1], + &[13], + &[OpType::Put], + &[(Some(11), Some(11))], + ), + new_record_batch_multi_fields( + &[b"k1", b"k1"], + &[2, 3], + &[11, 13], + &[OpType::Put, OpType::Put], + &[(Some(12), Some(12)), (Some(13), Some(13))], + ), + ]; + + let iter = input.into_iter().map(Ok); + let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true)); + let result = collect_iterator_results(&mut dedup_iter); + check_record_batches_equal(&expected, &result); + assert_eq!(2, dedup_iter.metrics.num_unselected_rows); + assert_eq!(0, dedup_iter.metrics.num_deleted_rows); + } + + #[test] + fn test_flat_last_non_null_merge_null() { + let input = vec![ + new_record_batch_multi_fields( + &[b"k1", b"k1"], + &[1, 2], + &[13, 11], + &[OpType::Put, OpType::Put], + &[(Some(11), Some(11)), (None, None)], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[2], + &[10], + &[OpType::Put], + &[(None, Some(22))], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[3], + &[13], + &[OpType::Put], + &[(Some(33), None)], + ), + ]; + + let expected = vec![ + new_record_batch_multi_fields( + &[b"k1"], + &[1], + &[13], + &[OpType::Put], + &[(Some(11), Some(11))], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[2], + &[11], + &[OpType::Put], + &[(None, Some(22))], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[3], + &[13], + &[OpType::Put], + &[(Some(33), None)], + ), + ]; + + let iter = input.into_iter().map(Ok); + let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true)); + let result = collect_iterator_results(&mut dedup_iter); + check_record_batches_equal(&expected, &result); + assert_eq!(1, dedup_iter.metrics.num_unselected_rows); + assert_eq!(0, dedup_iter.metrics.num_deleted_rows); + } + + /// Helper function to check dedup strategy behavior directly. + fn check_flat_dedup_strategy( + input: &[RecordBatch], + strategy: &mut dyn RecordBatchDedupStrategy, + expect: &[RecordBatch], + ) { + let mut actual = Vec::new(); + let mut metrics = DedupMetrics::default(); + for batch in input { + if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() { + actual.push(out); + } + } + if let Some(out) = strategy.finish(&mut metrics).unwrap() { + actual.push(out); + } + + check_record_batches_equal(expect, &actual); + } + + #[test] + fn test_flat_last_non_null_strategy_delete_last() { + let input = vec![ + new_record_batch_multi_fields( + &[b"k1"], + &[1], + &[6], + &[OpType::Put], + &[(Some(11), None)], + ), + new_record_batch_multi_fields( + &[b"k1", b"k1"], + &[1, 2], + &[1, 7], + &[OpType::Put, OpType::Put], + &[(Some(1), None), (Some(22), Some(222))], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[2], + &[4], + &[OpType::Put], + &[(Some(12), None)], + ), + new_record_batch_multi_fields( + &[b"k2", b"k2"], + &[2, 3], + &[2, 5], + &[OpType::Put, OpType::Delete], + &[(None, None), (Some(13), None)], + ), + new_record_batch_multi_fields(&[b"k2"], &[3], &[3], &[OpType::Put], &[(None, Some(3))]), + ]; + + let mut strategy = FlatLastNonNull::new(1, true); + check_flat_dedup_strategy( + &input, + &mut strategy, + &[ + new_record_batch_multi_fields( + &[b"k1"], + &[1], + &[6], + &[OpType::Put], + &[(Some(11), None)], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[2], + &[7], + &[OpType::Put], + &[(Some(22), Some(222))], + ), + new_record_batch_multi_fields( + &[b"k2"], + &[2], + &[2], + &[OpType::Put], + &[(None, None)], + ), + ], + ); + } + + #[test] + fn test_flat_last_non_null_strategy_delete_one() { + let input = vec![ + new_record_batch_multi_fields(&[b"k1"], &[1], &[1], &[OpType::Delete], &[(None, None)]), + new_record_batch_multi_fields( + &[b"k2"], + &[1], + &[6], + &[OpType::Put], + &[(Some(11), None)], + ), + ]; + + let mut strategy = FlatLastNonNull::new(1, true); + check_flat_dedup_strategy( + &input, + &mut strategy, + &[new_record_batch_multi_fields( + &[b"k2"], + &[1], + &[6], + &[OpType::Put], + &[(Some(11), None)], + )], + ); + } + + #[test] + fn test_flat_last_non_null_strategy_delete_all() { + let input = vec![ + new_record_batch_multi_fields(&[b"k1"], &[1], &[1], &[OpType::Delete], &[(None, None)]), + new_record_batch_multi_fields( + &[b"k2"], + &[1], + &[6], + &[OpType::Delete], + &[(Some(11), None)], + ), + ]; + + let mut strategy = FlatLastNonNull::new(1, true); + check_flat_dedup_strategy(&input, &mut strategy, &[]); + } + + #[test] + fn test_flat_last_non_null_strategy_same_batch() { + let input = vec![ + new_record_batch_multi_fields( + &[b"k1"], + &[1], + &[6], + &[OpType::Put], + &[(Some(11), None)], + ), + new_record_batch_multi_fields( + &[b"k1", b"k1"], + &[1, 2], + &[1, 7], + &[OpType::Put, OpType::Put], + &[(Some(1), None), (Some(22), Some(222))], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[2], + &[4], + &[OpType::Put], + &[(Some(12), None)], + ), + new_record_batch_multi_fields( + &[b"k1", b"k1"], + &[2, 3], + &[2, 5], + &[OpType::Put, OpType::Put], + &[(None, None), (Some(13), None)], + ), + new_record_batch_multi_fields(&[b"k1"], &[3], &[3], &[OpType::Put], &[(None, Some(3))]), + ]; + + let mut strategy = FlatLastNonNull::new(1, true); + check_flat_dedup_strategy( + &input, + &mut strategy, + &[ + new_record_batch_multi_fields( + &[b"k1"], + &[1], + &[6], + &[OpType::Put], + &[(Some(11), None)], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[2], + &[7], + &[OpType::Put], + &[(Some(22), Some(222))], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[3], + &[5], + &[OpType::Put], + &[(Some(13), Some(3))], + ), + ], + ); + } + + #[test] + fn test_flat_last_non_null_strategy_delete_middle() { + let input = vec![ + new_record_batch_multi_fields( + &[b"k1"], + &[1], + &[7], + &[OpType::Put], + &[(Some(11), None)], + ), + new_record_batch_multi_fields(&[b"k1"], &[1], &[4], &[OpType::Delete], &[(None, None)]), + new_record_batch_multi_fields( + &[b"k1"], + &[1], + &[1], + &[OpType::Put], + &[(Some(12), Some(1))], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[2], + &[8], + &[OpType::Put], + &[(Some(21), None)], + ), + new_record_batch_multi_fields(&[b"k1"], &[2], &[5], &[OpType::Delete], &[(None, None)]), + new_record_batch_multi_fields( + &[b"k1"], + &[2], + &[2], + &[OpType::Put], + &[(Some(22), Some(2))], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[3], + &[9], + &[OpType::Put], + &[(Some(31), None)], + ), + new_record_batch_multi_fields(&[b"k1"], &[3], &[6], &[OpType::Delete], &[(None, None)]), + new_record_batch_multi_fields( + &[b"k1"], + &[3], + &[3], + &[OpType::Put], + &[(Some(32), Some(3))], + ), + ]; + + let mut strategy = FlatLastNonNull::new(1, true); + check_flat_dedup_strategy( + &input, + &mut strategy, + &[ + new_record_batch_multi_fields( + &[b"k1"], + &[1], + &[7], + &[OpType::Put], + &[(Some(11), None)], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[2], + &[8], + &[OpType::Put], + &[(Some(21), None)], + ), + new_record_batch_multi_fields( + &[b"k1"], + &[3], + &[9], + &[OpType::Put], + &[(Some(31), None)], + ), + ], + ); + } }