diff --git a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs index 0ba6f86113..c8709edcd9 100644 --- a/src/mito2/src/read/dedup.rs +++ b/src/mito2/src/read/dedup.rs @@ -14,13 +14,18 @@ //! Utilities to remove duplicate rows from a sorted batch. +use api::v1::OpType; use async_trait::async_trait; use common_telemetry::debug; use common_time::Timestamp; +use datatypes::data_type::DataType; +use datatypes::prelude::ScalarVector; +use datatypes::value::Value; +use datatypes::vectors::MutableVector; use crate::error::Result; use crate::metrics::MERGE_FILTER_ROWS_TOTAL; -use crate::read::{Batch, BatchReader}; +use crate::read::{Batch, BatchColumn, BatchReader}; /// A reader that dedup sorted batches from a source based on the /// dedup strategy. @@ -181,12 +186,7 @@ impl DedupStrategy for LastRow { // Filters deleted rows. if self.filter_deleted { - let num_rows = batch.num_rows(); - batch.filter_deleted()?; - let num_rows_after_filter = batch.num_rows(); - let num_deleted = num_rows - num_rows_after_filter; - metrics.num_deleted_rows += num_deleted; - metrics.num_unselected_rows += num_deleted; + filter_deleted_from_batch(&mut batch, metrics)?; } // The batch can become empty if all rows are deleted. @@ -202,6 +202,18 @@ impl DedupStrategy for LastRow { } } +/// Removes deleted rows from the batch and updates metrics. +fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> Result<()> { + let num_rows = batch.num_rows(); + batch.filter_deleted()?; + let num_rows_after_filter = batch.num_rows(); + let num_deleted = num_rows - num_rows_after_filter; + metrics.num_deleted_rows += num_deleted; + metrics.num_unselected_rows += num_deleted; + + Ok(()) +} + /// Metrics for deduplication. #[derive(Debug, Default)] pub(crate) struct DedupMetrics { @@ -211,11 +223,260 @@ pub(crate) struct DedupMetrics { pub(crate) num_deleted_rows: usize, } +/// Buffer to store fields in the last row to merge. +struct LastFieldsBuilder { + /// Filter deleted rows. 
+    filter_deleted: bool,
+    /// Fields builders, lazy initialized.
+    builders: Vec<Box<dyn MutableVector>>,
+    /// Last fields to merge, lazy initialized.
+    /// Only initializes this field when `skip_merge()` is false.
+    last_fields: Vec<Value>,
+    /// Whether the last row (including `last_fields`) has null field.
+    /// Only sets this field when `contains_deletion` is false.
+    contains_null: bool,
+    /// Whether the last row has delete op. If true, skips merging fields.
+    contains_deletion: bool,
+    /// Whether the builder is initialized.
+    initialized: bool,
+}
+
+impl LastFieldsBuilder {
+    /// Returns a new builder with the given `filter_deleted` flag.
+    fn new(filter_deleted: bool) -> Self {
+        Self {
+            filter_deleted,
+            builders: Vec::new(),
+            last_fields: Vec::new(),
+            contains_null: false,
+            contains_deletion: false,
+            initialized: false,
+        }
+    }
+
+    /// Initializes the builders with the last row of the batch.
+    fn maybe_init(&mut self, batch: &Batch) {
+        debug_assert!(!batch.is_empty());
+
+        if self.initialized || batch.fields().is_empty() {
+            // Already initialized or no fields to merge.
+            return;
+        }
+
+        self.initialized = true;
+
+        let last_idx = batch.num_rows() - 1;
+        let fields = batch.fields();
+        // Safety: The last_idx is valid.
+        self.contains_deletion =
+            batch.op_types().get_data(last_idx).unwrap() == OpType::Delete as u8;
+        // If the row has been deleted, then we don't need to merge fields.
+        if !self.contains_deletion {
+            self.contains_null = fields.iter().any(|col| col.data.is_null(last_idx));
+        }
+
+        if self.skip_merge() {
+            // No null field or the row has been deleted, no need to merge.
+            return;
+        }
+        if self.builders.is_empty() {
+            self.builders = fields
+                .iter()
+                .map(|col| col.data.data_type().create_mutable_vector(1))
+                .collect();
+        }
+        self.last_fields = fields.iter().map(|col| col.data.get(last_idx)).collect();
+    }
+
+    /// Returns true if the builder doesn't need to merge the rows.
+    fn skip_merge(&self) -> bool {
+        debug_assert!(self.initialized);
+
+        // No null field or the row has been deleted, no need to merge.
+        self.contains_deletion || !self.contains_null
+    }
+
+    /// Pushes first row of a batch to the builder.
+    fn push_first_row(&mut self, batch: &Batch) {
+        debug_assert!(self.initialized);
+        debug_assert!(!batch.is_empty());
+
+        if self.skip_merge() {
+            // No remaining null field, skips this batch.
+            return;
+        }
+
+        let fields = batch.fields();
+        for (idx, value) in self.last_fields.iter_mut().enumerate() {
+            if value.is_null() && !fields[idx].data.is_null(0) {
+                // Updates the value.
+                *value = fields[idx].data.get(0);
+            }
+        }
+        // Updates the flag.
+        self.contains_null = self.last_fields.iter().any(Value::is_null);
+    }
+
+    /// Merges last not null fields, builds a new batch and resets the builder.
+    /// It may overwrite the last row of the `buffer`.
+    fn merge_last_not_null(
+        &mut self,
+        buffer: Batch,
+        metrics: &mut DedupMetrics,
+    ) -> Result<Option<Batch>> {
+        debug_assert!(self.initialized);
+
+        let mut output = if self.last_fields.is_empty() {
+            // No need to overwrite the last row.
+            buffer
+        } else {
+            // Builds last fields.
+            for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
+                // Safety: Vectors of the batch have the same type.
+                builder.push_value_ref(value.as_value_ref());
+            }
+            let fields = self
+                .builders
+                .iter_mut()
+                .zip(buffer.fields())
+                .map(|(builder, col)| BatchColumn {
+                    column_id: col.column_id,
+                    data: builder.to_vector(),
+                })
+                .collect();
+
+            if buffer.num_rows() == 1 {
+                // Replaces the buffer directly if it only has one row.
+                buffer.with_fields(fields)?
+            } else {
+                // Replaces the last row of the buffer.
+                let front = buffer.slice(0, buffer.num_rows() - 1);
+                let last = buffer.slice(buffer.num_rows() - 1, 1);
+                let last = last.with_fields(fields)?;
+                Batch::concat(vec![front, last])?
+            }
+        };
+
+        // Resets itself. `self.builders` is already reset in `to_vector()`.
+        self.clear();
+
+        if self.filter_deleted {
+            filter_deleted_from_batch(&mut output, metrics)?;
+        }
+
+        if output.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(output))
+        }
+    }
+
+    /// Clears the builder.
+    fn clear(&mut self) {
+        self.last_fields.clear();
+        self.contains_null = false;
+        self.contains_deletion = false;
+        self.initialized = false;
+    }
+}
+
+/// Dedup strategy that keeps the last not null field for the same key.
+///
+/// It assumes that batches from files and memtables don't contain duplicate rows
+/// and the merge reader never concatenates batches from different sources.
+///
+/// We might implement a new strategy if we need to process files with duplicate rows.
+pub(crate) struct LastNotNull {
+    /// Buffered batch whose last row's fields may be updated.
+    buffer: Option<Batch>,
+    /// Fields that overlap with the last row of the `buffer`.
+    last_fields: LastFieldsBuilder,
+}
+
+impl LastNotNull {
+    /// Creates a new strategy with the given `filter_deleted` flag.
+    #[allow(dead_code)]
+    pub(crate) fn new(filter_deleted: bool) -> Self {
+        Self {
+            buffer: None,
+            last_fields: LastFieldsBuilder::new(filter_deleted),
+        }
+    }
+}
+
+impl DedupStrategy for LastNotNull {
+    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
+        if batch.is_empty() {
+            return Ok(None);
+        }
+
+        let Some(buffer) = self.buffer.as_mut() else {
+            // The buffer is empty, store the batch and return. We need to observe the next batch.
+            self.buffer = Some(batch);
+            return Ok(None);
+        };
+
+        // Initializes last fields with the first buffer.
+        self.last_fields.maybe_init(buffer);
+
+        if buffer.primary_key() != batch.primary_key() {
+            // Next key is different.
+            let buffer = std::mem::replace(buffer, batch);
+            let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
+            return Ok(merged);
+        }
+
+        if buffer.last_timestamp() != batch.first_timestamp() {
+            // The next batch has a different timestamp.
+            let buffer = std::mem::replace(buffer, batch);
+            let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
+            return Ok(merged);
+        }
+
+        // The next batch has the same key and timestamp.
+
+        metrics.num_unselected_rows += 1;
+        // We assume each batch doesn't contain duplicate rows so we only need to check the first row.
+        if batch.num_rows() == 1 {
+            self.last_fields.push_first_row(&batch);
+            return Ok(None);
+        }
+
+        // The next batch has the same key and timestamp but contains multiple rows.
+        // We can merge the first row and buffer the remaining rows.
+        let first = batch.slice(0, 1);
+        self.last_fields.push_first_row(&first);
+        // Moves the remaining rows to the buffer.
+        let batch = batch.slice(1, batch.num_rows() - 1);
+        let buffer = std::mem::replace(buffer, batch);
+        let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
+
+        Ok(merged)
+    }
+
+    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
+        let Some(buffer) = self.buffer.take() else {
+            return Ok(None);
+        };
+
+        // Initializes last fields with the first buffer.
+        self.last_fields.maybe_init(&buffer);
+
+        let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
+
+        Ok(merged)
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use api::v1::OpType;
+    use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
 
     use super::*;
+    use crate::read::BatchBuilder;
     use crate::test_util::{check_reader_result, new_batch, VecBatchReader};
 
     #[tokio::test]
@@ -237,11 +498,20 @@ mod tests {
                 &[31, 32],
             ),
         ];
+
+        // Test last row.
         let reader = VecBatchReader::new(&input);
         let mut reader = DedupReader::new(reader, LastRow::new(true));
        check_reader_result(&mut reader, &input).await;
         assert_eq!(0, reader.metrics().num_unselected_rows);
         assert_eq!(0, reader.metrics().num_deleted_rows);
+
+        // Test last not null.
+        let reader = VecBatchReader::new(&input);
+        let mut reader = DedupReader::new(reader, LastNotNull::new(true));
+        check_reader_result(&mut reader, &input).await;
+        assert_eq!(0, reader.metrics().num_unselected_rows);
+        assert_eq!(0, reader.metrics().num_deleted_rows);
     }
 
     #[tokio::test]
@@ -277,8 +547,8 @@ mod tests {
             // is filtered out by the previous batch.
             new_batch(b"k3", &[2], &[19], &[OpType::Delete], &[0]),
         ];
-        let reader = VecBatchReader::new(&input);
         // Filter deleted.
+        let reader = VecBatchReader::new(&input);
         let mut reader = DedupReader::new(reader, LastRow::new(true));
         check_reader_result(
             &mut reader,
@@ -333,4 +603,383 @@ mod tests {
         assert_eq!(3, reader.metrics().num_unselected_rows);
         assert_eq!(0, reader.metrics().num_deleted_rows);
     }
+
+    /// Returns a new [Batch] whose field has column id 1, 2.
+    fn new_batch_multi_fields(
+        primary_key: &[u8],
+        timestamps: &[i64],
+        sequences: &[u64],
+        op_types: &[OpType],
+        fields: &[(Option<u64>, Option<u64>)],
+    ) -> Batch {
+        let mut builder = BatchBuilder::new(primary_key.to_vec());
+        builder
+            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
+                timestamps.iter().copied(),
+            )))
+            .unwrap()
+            .sequences_array(Arc::new(UInt64Array::from_iter_values(
+                sequences.iter().copied(),
+            )))
+            .unwrap()
+            .op_types_array(Arc::new(UInt8Array::from_iter_values(
+                op_types.iter().map(|v| *v as u8),
+            )))
+            .unwrap()
+            .push_field_array(
+                1,
+                Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.0))),
+            )
+            .unwrap()
+            .push_field_array(
+                2,
+                Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.1))),
+            )
+            .unwrap();
+        builder.build().unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_last_not_null_merge() {
+        let input = [
+            new_batch_multi_fields(
+                b"k1",
+                &[1, 2],
+                &[13, 11],
+                &[OpType::Put, OpType::Put],
+                &[(Some(11), Some(11)), (None, None)],
+            ),
+            // empty batch.
+            new_batch_multi_fields(b"k1", &[], &[], &[], &[]),
+            // Duplicate with the previous batch.
+ new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(Some(12), None)]), + new_batch_multi_fields( + b"k1", + &[2, 3, 4], + &[10, 13, 13], + &[OpType::Put, OpType::Put, OpType::Delete], + &[(Some(2), Some(22)), (Some(13), None), (None, Some(14))], + ), + new_batch_multi_fields( + b"k2", + &[1, 2], + &[20, 20], + &[OpType::Put, OpType::Delete], + &[(Some(101), Some(101)), (None, None)], + ), + new_batch_multi_fields( + b"k2", + &[2], + &[19], + &[OpType::Put], + &[(Some(102), Some(102))], + ), + new_batch_multi_fields( + b"k3", + &[2], + &[20], + &[OpType::Put], + &[(Some(202), Some(202))], + ), + // This batch won't increase the deleted rows count as it + // is filtered out by the previous batch. (All fields are null). + new_batch_multi_fields(b"k3", &[2], &[19], &[OpType::Delete], &[(None, None)]), + ]; + + // Filter deleted. + let reader = VecBatchReader::new(&input); + let mut reader = DedupReader::new(reader, LastNotNull::new(true)); + check_reader_result( + &mut reader, + &[ + new_batch_multi_fields( + b"k1", + &[1, 2], + &[13, 11], + &[OpType::Put, OpType::Put], + &[(Some(11), Some(11)), (Some(12), Some(22))], + ), + new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), None)]), + new_batch_multi_fields( + b"k2", + &[1], + &[20], + &[OpType::Put], + &[(Some(101), Some(101))], + ), + new_batch_multi_fields( + b"k3", + &[2], + &[20], + &[OpType::Put], + &[(Some(202), Some(202))], + ), + ], + ) + .await; + assert_eq!(6, reader.metrics().num_unselected_rows); + assert_eq!(2, reader.metrics().num_deleted_rows); + + // Does not filter deleted. 
+ let reader = VecBatchReader::new(&input); + let mut reader = DedupReader::new(reader, LastNotNull::new(false)); + check_reader_result( + &mut reader, + &[ + new_batch_multi_fields( + b"k1", + &[1, 2], + &[13, 11], + &[OpType::Put, OpType::Put], + &[(Some(11), Some(11)), (Some(12), Some(22))], + ), + new_batch_multi_fields( + b"k1", + &[3, 4], + &[13, 13], + &[OpType::Put, OpType::Delete], + &[(Some(13), None), (None, Some(14))], + ), + new_batch_multi_fields( + b"k2", + &[1, 2], + &[20, 20], + &[OpType::Put, OpType::Delete], + &[(Some(101), Some(101)), (None, None)], + ), + new_batch_multi_fields( + b"k3", + &[2], + &[20], + &[OpType::Put], + &[(Some(202), Some(202))], + ), + ], + ) + .await; + assert_eq!(4, reader.metrics().num_unselected_rows); + assert_eq!(0, reader.metrics().num_deleted_rows); + } + + #[tokio::test] + async fn test_last_not_null_skip_merge_single() { + let input = [new_batch_multi_fields( + b"k1", + &[1, 2, 3], + &[13, 11, 13], + &[OpType::Put, OpType::Delete, OpType::Put], + &[(Some(11), Some(11)), (None, None), (Some(13), Some(13))], + )]; + + let reader = VecBatchReader::new(&input); + let mut reader = DedupReader::new(reader, LastNotNull::new(true)); + check_reader_result( + &mut reader, + &[new_batch_multi_fields( + b"k1", + &[1, 3], + &[13, 13], + &[OpType::Put, OpType::Put], + &[(Some(11), Some(11)), (Some(13), Some(13))], + )], + ) + .await; + assert_eq!(1, reader.metrics().num_unselected_rows); + assert_eq!(1, reader.metrics().num_deleted_rows); + + let reader = VecBatchReader::new(&input); + let mut reader = DedupReader::new(reader, LastNotNull::new(false)); + check_reader_result(&mut reader, &input).await; + assert_eq!(0, reader.metrics().num_unselected_rows); + assert_eq!(0, reader.metrics().num_deleted_rows); + } + + #[tokio::test] + async fn test_last_not_null_skip_merge_no_null() { + let input = [ + new_batch_multi_fields( + b"k1", + &[1, 2], + &[13, 11], + &[OpType::Put, OpType::Put], + &[(Some(11), Some(11)), (Some(12), 
Some(12))], + ), + new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]), + new_batch_multi_fields( + b"k1", + &[2, 3], + &[9, 13], + &[OpType::Put, OpType::Put], + &[(Some(32), None), (Some(13), Some(13))], + ), + ]; + + let reader = VecBatchReader::new(&input); + let mut reader = DedupReader::new(reader, LastNotNull::new(true)); + check_reader_result( + &mut reader, + &[ + new_batch_multi_fields( + b"k1", + &[1, 2], + &[13, 11], + &[OpType::Put, OpType::Put], + &[(Some(11), Some(11)), (Some(12), Some(12))], + ), + new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), Some(13))]), + ], + ) + .await; + assert_eq!(2, reader.metrics().num_unselected_rows); + assert_eq!(0, reader.metrics().num_deleted_rows); + } + + #[tokio::test] + async fn test_last_not_null_merge_null() { + let input = [ + new_batch_multi_fields( + b"k1", + &[1, 2], + &[13, 11], + &[OpType::Put, OpType::Put], + &[(Some(11), Some(11)), (None, None)], + ), + new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]), + new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]), + ]; + + let reader = VecBatchReader::new(&input); + let mut reader = DedupReader::new(reader, LastNotNull::new(true)); + check_reader_result( + &mut reader, + &[ + new_batch_multi_fields( + b"k1", + &[1, 2], + &[13, 11], + &[OpType::Put, OpType::Put], + &[(Some(11), Some(11)), (None, Some(22))], + ), + new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]), + ], + ) + .await; + assert_eq!(1, reader.metrics().num_unselected_rows); + assert_eq!(0, reader.metrics().num_deleted_rows); + } + + fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) { + let mut actual = Vec::new(); + let mut metrics = DedupMetrics::default(); + for batch in input { + if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() { + actual.push(out); + } + } + if let Some(out) = 
strategy.finish(&mut metrics).unwrap() { + actual.push(out); + } + + assert_eq!(expect, actual); + } + + #[test] + fn test_last_not_null_strategy_delete_last() { + let input = [ + new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), + new_batch_multi_fields( + b"k1", + &[1, 2], + &[1, 7], + &[OpType::Put, OpType::Put], + &[(Some(1), None), (Some(22), Some(222))], + ), + new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]), + new_batch_multi_fields( + b"k2", + &[2, 3], + &[2, 5], + &[OpType::Put, OpType::Delete], + &[(None, None), (Some(13), None)], + ), + new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]), + ]; + + let mut strategy = LastNotNull::new(true); + check_dedup_strategy( + &input, + &mut strategy, + &[ + new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), + new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]), + new_batch_multi_fields(b"k2", &[2], &[2], &[OpType::Put], &[(None, None)]), + ], + ); + } + + #[test] + fn test_last_not_null_strategy_delete_one() { + let input = [ + new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]), + new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), + ]; + + let mut strategy = LastNotNull::new(true); + check_dedup_strategy( + &input, + &mut strategy, + &[new_batch_multi_fields( + b"k2", + &[1], + &[6], + &[OpType::Put], + &[(Some(11), None)], + )], + ); + } + + #[test] + fn test_last_not_null_strategy_delete_all() { + let input = [ + new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]), + new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]), + ]; + + let mut strategy = LastNotNull::new(true); + check_dedup_strategy(&input, &mut strategy, &[]); + } + + #[test] + fn test_last_not_null_strategy_same_batch() { + let input = [ + new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], 
&[(Some(11), None)]), + new_batch_multi_fields( + b"k1", + &[1, 2], + &[1, 7], + &[OpType::Put, OpType::Put], + &[(Some(1), None), (Some(22), Some(222))], + ), + new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]), + new_batch_multi_fields( + b"k1", + &[2, 3], + &[2, 5], + &[OpType::Put, OpType::Put], + &[(None, None), (Some(13), None)], + ), + new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]), + ]; + + let mut strategy = LastNotNull::new(true); + check_dedup_strategy( + &input, + &mut strategy, + &[ + new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), + new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]), + new_batch_multi_fields(b"k1", &[3], &[5], &[OpType::Put], &[(Some(13), Some(3))]), + ], + ); + } } diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index 9377bb447c..3a6173360b 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -33,6 +33,10 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader, Source}; /// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can /// ignore op type as sequence is already unique). /// 2. Batches from sources **must** not be empty. +/// +/// The reader won't concatenate batches. Each batch returned by the reader also doesn't +/// contain duplicate rows. But the last (primary key, timestamp) of a batch may be the same +/// as the first one in the next batch. pub struct MergeReader { /// Holds [Node]s whose key range of current batch **is** overlapped with the merge window. /// Each node yields batches from a `source`.