diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 695f919979..058c5272c2 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -774,7 +774,12 @@ fn memtable_flat_sources(
                 let iter = only_range.build_record_batch_iter(None)?;
                 // Dedup according to append mode and merge mode.
                 // Even a single range may have duplicate rows.
-                let iter = maybe_dedup_one(options, field_column_start, iter);
+                let iter = maybe_dedup_one(
+                    options.append_mode,
+                    options.merge_mode(),
+                    field_column_start,
+                    iter,
+                );
                 flat_sources.sources.push(FlatSource::Iter(iter));
             };
         } else {
@@ -842,17 +847,18 @@ fn merge_and_dedup(
     Ok(maybe_dedup)
 }

-fn maybe_dedup_one(
-    options: &RegionOptions,
+pub fn maybe_dedup_one(
+    append_mode: bool,
+    merge_mode: MergeMode,
     field_column_start: usize,
     input_iter: BoxedRecordBatchIterator,
 ) -> BoxedRecordBatchIterator {
-    if options.append_mode {
+    if append_mode {
         // No dedup in append mode
         input_iter
     } else {
         // Dedup according to merge mode.
-        match options.merge_mode() {
+        match merge_mode {
             MergeMode::LastRow => {
                 Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
             }
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index e95d5e348f..c9ff2c0a98 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -55,10 +55,8 @@ pub mod time_partition;
 pub mod time_series;
 pub(crate) mod version;

-#[cfg(any(test, feature = "test"))]
-pub use bulk::part::BulkPart;
 pub use bulk::part::{
-    BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
+    BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
     sort_primary_key_record_batch,
 };
 #[cfg(any(test, feature = "test"))]
diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs
index 2c26410ca6..beae618520 100644
--- a/src/mito2/src/memtable/bulk.rs
+++ b/src/mito2/src/memtable/bulk.rs
@@ -668,10 +668,10 @@ impl BulkMemtable {
 }

 /// Iterator builder for bulk range
-struct BulkRangeIterBuilder {
-    part: BulkPart,
-    context: Arc<BulkIterContext>,
-    sequence: Option<SequenceNumber>,
+pub struct BulkRangeIterBuilder {
+    pub part: BulkPart,
+    pub context: Arc<BulkIterContext>,
+    pub sequence: Option<SequenceNumber>,
 }

 impl IterBuilder for BulkRangeIterBuilder {
@@ -1188,7 +1188,6 @@ impl MemtableBuilder for BulkMemtableBuilder {

 #[cfg(test)]
 mod tests {
-    use mito_codec::row_converter::build_primary_key_codec;

     use super::*;

diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index 4bf6d0fadb..e79d1d83b8 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -1211,343 +1211,24 @@ impl BulkPartEncoder {
     }
 }

-/// Converts mutations to record batches.
-fn mutations_to_record_batch(
-    mutations: &[Mutation],
-    metadata: &RegionMetadataRef,
-    pk_encoder: &DensePrimaryKeyCodec,
-    dedup: bool,
-) -> Result<Option<(RecordBatch, i64, i64)>> {
-    let total_rows: usize = mutations
-        .iter()
-        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
-        .sum();
-
-    if total_rows == 0 {
-        return Ok(None);
-    }
-
-    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
-
-    let mut ts_vector: Box<dyn MutableVector> = metadata
-        .time_index_column()
-        .column_schema
-        .data_type
-        .create_mutable_vector(total_rows);
-    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
-    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
-
-    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
-        .field_columns()
-        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
-        .collect();
-
-    let mut pk_buffer = vec![];
-    for m in mutations {
-        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
-            continue;
-        };
-
-        for row in key_values.iter() {
-            pk_buffer.clear();
-            pk_encoder
-                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
-                .context(EncodeSnafu)?;
-            pk_builder.append_value(pk_buffer.as_bytes());
-            ts_vector.push_value_ref(&row.timestamp());
-            sequence_builder.append_value(row.sequence());
-            op_type_builder.append_value(row.op_type() as u8);
-            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
-                builder.push_value_ref(&field);
-            }
-        }
-    }
-
-    let arrow_schema = to_sst_arrow_schema(metadata);
-    // safety: timestamp column must be valid, and values must not be None.
-    let timestamp_unit = metadata
-        .time_index_column()
-        .column_schema
-        .data_type
-        .as_timestamp()
-        .unwrap()
-        .unit();
-    let sorter = ArraysSorter {
-        encoded_primary_keys: pk_builder.finish(),
-        timestamp_unit,
-        timestamp: ts_vector.to_vector().to_arrow_array(),
-        sequence: sequence_builder.finish(),
-        op_type: op_type_builder.finish(),
-        fields: field_builders
-            .iter_mut()
-            .map(|f| f.to_vector().to_arrow_array()),
-        dedup,
-        arrow_schema,
-    };
-
-    sorter.sort().map(Some)
-}
-
-struct ArraysSorter<I> {
-    encoded_primary_keys: BinaryArray,
-    timestamp_unit: TimeUnit,
-    timestamp: ArrayRef,
-    sequence: UInt64Array,
-    op_type: UInt8Array,
-    fields: I,
-    dedup: bool,
-    arrow_schema: SchemaRef,
-}
-
-impl<I> ArraysSorter<I>
-where
-    I: Iterator<Item = ArrayRef>,
-{
-    /// Converts arrays to record batch.
-    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
-        debug_assert!(!self.timestamp.is_empty());
-        debug_assert!(self.timestamp.len() == self.sequence.len());
-        debug_assert!(self.timestamp.len() == self.op_type.len());
-        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
-
-        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
-        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
-        let mut to_sort = self
-            .encoded_primary_keys
-            .iter()
-            .zip(timestamp_iter)
-            .zip(self.sequence.iter())
-            .map(|((pk, timestamp), sequence)| {
-                max_timestamp = max_timestamp.max(*timestamp);
-                min_timestamp = min_timestamp.min(*timestamp);
-                (pk, timestamp, sequence)
-            })
-            .enumerate()
-            .collect::<Vec<_>>();
-
-        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
-            l_pk.cmp(r_pk)
-                .then(l_ts.cmp(r_ts))
-                .then(l_seq.cmp(r_seq).reverse())
-        });
-
-        if self.dedup {
-            // Dedup by timestamps while ignoring sequence.
-            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
-                l_pk == r_pk && l_ts == r_ts
-            });
-        }
-
-        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
-
-        let pk_dictionary = Arc::new(binary_array_to_dictionary(
-            // safety: pk must be BinaryArray
-            arrow::compute::take(
-                &self.encoded_primary_keys,
-                &indices,
-                Some(TakeOptions {
-                    check_bounds: false,
-                }),
-            )
-            .context(ComputeArrowSnafu)?
-            .as_any()
-            .downcast_ref::<BinaryArray>()
-            .unwrap(),
-        )?) as ArrayRef;
-
-        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
-        for arr in self.fields {
-            arrays.push(
-                arrow::compute::take(
-                    &arr,
-                    &indices,
-                    Some(TakeOptions {
-                        check_bounds: false,
-                    }),
-                )
-                .context(ComputeArrowSnafu)?,
-            );
-        }
-
-        let timestamp = arrow::compute::take(
-            &self.timestamp,
-            &indices,
-            Some(TakeOptions {
-                check_bounds: false,
-            }),
-        )
-        .context(ComputeArrowSnafu)?;
-
-        arrays.push(timestamp);
-        arrays.push(pk_dictionary);
-        arrays.push(
-            arrow::compute::take(
-                &self.sequence,
-                &indices,
-                Some(TakeOptions {
-                    check_bounds: false,
-                }),
-            )
-            .context(ComputeArrowSnafu)?,
-        );
-
-        arrays.push(
-            arrow::compute::take(
-                &self.op_type,
-                &indices,
-                Some(TakeOptions {
-                    check_bounds: false,
-                }),
-            )
-            .context(ComputeArrowSnafu)?,
-        );
-
-        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
-        Ok((batch, min_timestamp, max_timestamp))
-    }
-}
-
-/// Converts timestamp array to an iter of i64 values.
-fn timestamp_array_to_iter(
-    timestamp_unit: TimeUnit,
-    timestamp: &ArrayRef,
-) -> impl Iterator<Item = &i64> {
-    match timestamp_unit {
-        // safety: timestamp column must be valid.
-        TimeUnit::Second => timestamp
-            .as_any()
-            .downcast_ref::<TimestampSecondArray>()
-            .unwrap()
-            .values()
-            .iter(),
-        TimeUnit::Millisecond => timestamp
-            .as_any()
-            .downcast_ref::<TimestampMillisecondArray>()
-            .unwrap()
-            .values()
-            .iter(),
-        TimeUnit::Microsecond => timestamp
-            .as_any()
-            .downcast_ref::<TimestampMicrosecondArray>()
-            .unwrap()
-            .values()
-            .iter(),
-        TimeUnit::Nanosecond => timestamp
-            .as_any()
-            .downcast_ref::<TimestampNanosecondArray>()
-            .unwrap()
-            .values()
-            .iter(),
-    }
-}
-
-/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
-fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UInt32Type>> {
-    if input.is_empty() {
-        return Ok(DictionaryArray::new(
-            UInt32Array::from(Vec::<u32>::new()),
-            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
-        ));
-    }
-    let mut keys = Vec::with_capacity(16);
-    let mut values = BinaryBuilder::new();
-    let mut prev: usize = 0;
-    keys.push(prev as u32);
-    values.append_value(input.value(prev));
-
-    for current_bytes in input.iter().skip(1) {
-        // safety: encoded pk must be present.
-        let current_bytes = current_bytes.unwrap();
-        let prev_bytes = input.value(prev);
-        if current_bytes != prev_bytes {
-            values.append_value(current_bytes);
-            prev += 1;
-        }
-        keys.push(prev as u32);
-    }
-
-    Ok(DictionaryArray::new(
-        UInt32Array::from(keys),
-        Arc::new(values.finish()) as ArrayRef,
-    ))
-}
-
 #[cfg(test)]
 mod tests {
-    use std::collections::VecDeque;
-
     use api::v1::{Row, SemanticType, WriteHint};
     use datafusion_common::ScalarValue;
     use datatypes::arrow::array::Float64Array;
-    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
+    use datatypes::prelude::{ConcreteDataType, Value};
     use datatypes::schema::ColumnSchema;
-    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
     use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
     use store_api::storage::RegionId;
     use store_api::storage::consts::ReservedColumnId;

     use super::*;
     use crate::memtable::bulk::context::BulkIterContext;
-    use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
     use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
     use crate::test_util::memtable_util::{
         build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
     };

-    fn check_binary_array_to_dictionary(
-        input: &[&[u8]],
-        expected_keys: &[u32],
-        expected_values: &[&[u8]],
-    ) {
-        let input = BinaryArray::from_iter_values(input.iter());
-        let array = binary_array_to_dictionary(&input).unwrap();
-        assert_eq!(
-            &expected_keys,
-            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
-        );
-        assert_eq!(
-            expected_values,
-            &array
-                .values()
-                .as_any()
-                .downcast_ref::<BinaryArray>()
-                .unwrap()
-                .iter()
-                .map(|v| v.unwrap())
-                .collect::<Vec<_>>()
-        );
-    }
-
-    #[test]
-    fn test_binary_array_to_dictionary() {
-        check_binary_array_to_dictionary(&[], &[], &[]);
-
-        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);
-
-        check_binary_array_to_dictionary(
-            &["a".as_bytes(), "a".as_bytes()],
-            &[0, 0],
-            &["a".as_bytes()],
-        );
-
-        check_binary_array_to_dictionary(
-            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
-            &[0, 0, 1],
-            &["a".as_bytes(), "b".as_bytes()],
-        );
-
-        check_binary_array_to_dictionary(
-            &[
-                "a".as_bytes(),
-                "a".as_bytes(),
-                "b".as_bytes(),
-                "c".as_bytes(),
-            ],
-            &[0, 0, 1, 2],
-            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
-        );
-    }
-
     struct MutationInput<'a> {
         k0: &'a str,
         k1: u32,
@@ -1563,232 +1244,6 @@ mod tests {
         v1: &'a [Option<f64>],
     }

-    fn check_mutations_to_record_batches(
-        input: &[MutationInput],
-        expected: &[BatchOutput],
-        expected_timestamp: (i64, i64),
-        dedup: bool,
-    ) {
-        let metadata = metadata_for_test();
-        let mutations = input
-            .iter()
-            .map(|m| {
-                build_key_values_with_ts_seq_values(
-                    &metadata,
-                    m.k0.to_string(),
-                    m.k1,
-                    m.timestamps.iter().copied(),
-                    m.v1.iter().copied(),
-                    m.sequence,
-                )
-                .mutation
-            })
-            .collect::<Vec<_>>();
-        let total_rows: usize = mutations
-            .iter()
-            .flat_map(|m| m.rows.iter())
-            .map(|r| r.rows.len())
-            .sum();
-
-        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
-
-        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
-            .unwrap()
-            .unwrap();
-        let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
-        let mut batches = VecDeque::new();
-        read_format
-            .convert_record_batch(&batch, None, &mut batches)
-            .unwrap();
-        if !dedup {
-            assert_eq!(
-                total_rows,
-                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
-            );
-        }
-        let batch_values = batches
-            .into_iter()
-            .map(|b| {
-                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
-                let timestamps = b
-                    .timestamps()
-                    .as_any()
-                    .downcast_ref::<TimestampMillisecondVector>()
-                    .unwrap()
-                    .iter_data()
-                    .map(|v| v.unwrap().0.value())
-                    .collect::<Vec<_>>();
-                let float_values = b.fields()[1]
-                    .data
-                    .as_any()
-                    .downcast_ref::<Float64Vector>()
-                    .unwrap()
-                    .iter_data()
-                    .collect::<Vec<_>>();
-
-                (pk_values, timestamps, float_values)
-            })
-            .collect::<Vec<_>>();
-        assert_eq!(expected.len(), batch_values.len());
-
-        for idx in 0..expected.len() {
-            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
-            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
-            assert_eq!(expected[idx].v1, &batch_values[idx].2);
-        }
-    }
-
-    #[test]
-    fn test_mutations_to_record_batch() {
-        check_mutations_to_record_batches(
-            &[MutationInput {
-                k0: "a",
-                k1: 0,
-                timestamps: &[0],
-                v1: &[Some(0.1)],
-                sequence: 0,
-            }],
-            &[BatchOutput {
-                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
-                timestamps: &[0],
-                v1: &[Some(0.1)],
-            }],
-            (0, 0),
-            true,
-        );
-
-        check_mutations_to_record_batches(
-            &[
-                MutationInput {
-                    k0: "a",
-                    k1: 0,
-                    timestamps: &[0],
-                    v1: &[Some(0.1)],
-                    sequence: 0,
-                },
-                MutationInput {
-                    k0: "b",
-                    k1: 0,
-                    timestamps: &[0],
-                    v1: &[Some(0.0)],
-                    sequence: 0,
-                },
-                MutationInput {
-                    k0: "a",
-                    k1: 0,
-                    timestamps: &[1],
-                    v1: &[Some(0.2)],
-                    sequence: 1,
-                },
-                MutationInput {
-                    k0: "a",
-                    k1: 1,
-                    timestamps: &[1],
-                    v1: &[Some(0.3)],
-                    sequence: 2,
-                },
-            ],
-            &[
-                BatchOutput {
-                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
-                    timestamps: &[0, 1],
-                    v1: &[Some(0.1), Some(0.2)],
-                },
-                BatchOutput {
-                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
-                    timestamps: &[1],
-                    v1: &[Some(0.3)],
-                },
-                BatchOutput {
-                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
-                    timestamps: &[0],
-                    v1: &[Some(0.0)],
-                },
-            ],
-            (0, 1),
-            true,
-        );
-
-        check_mutations_to_record_batches(
-            &[
-                MutationInput {
-                    k0: "a",
-                    k1: 0,
-                    timestamps: &[0],
-                    v1: &[Some(0.1)],
-                    sequence: 0,
-                },
-                MutationInput {
-                    k0: "b",
-                    k1: 0,
-                    timestamps: &[0],
-                    v1: &[Some(0.0)],
-                    sequence: 0,
-                },
-                MutationInput {
-                    k0: "a",
-                    k1: 0,
-                    timestamps: &[0],
-                    v1: &[Some(0.2)],
-                    sequence: 1,
-                },
-            ],
-            &[
-                BatchOutput {
-                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
-                    timestamps: &[0],
-                    v1: &[Some(0.2)],
-                },
-                BatchOutput {
-                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
-                    timestamps: &[0],
-                    v1: &[Some(0.0)],
-                },
-            ],
-            (0, 0),
-            true,
-        );
-        check_mutations_to_record_batches(
-            &[
-                MutationInput {
-                    k0: "a",
-                    k1: 0,
-                    timestamps: &[0],
-                    v1: &[Some(0.1)],
-                    sequence: 0,
-                },
-                MutationInput {
-                    k0: "b",
-                    k1: 0,
-                    timestamps: &[0],
-                    v1: &[Some(0.0)],
-                    sequence: 0,
-                },
-                MutationInput {
-                    k0: "a",
-                    k1: 0,
-                    timestamps: &[0],
-                    v1: &[Some(0.2)],
-                    sequence: 1,
-                },
-            ],
-            &[
-                BatchOutput {
-                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
-                    timestamps: &[0, 0],
-                    v1: &[Some(0.2), Some(0.1)],
-                },
-                BatchOutput {
-                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
-                    timestamps: &[0],
-                    v1: &[Some(0.0)],
-                },
-            ],
-            (0, 0),
-            false,
-        );
-    }
-
     fn encode(input: &[MutationInput]) -> EncodedBulkPart {
         let metadata = metadata_for_test();
         let kvs = input
diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs
index f3f51bdc08..78e4c563b1 100644
--- a/src/mito2/src/sst.rs
+++ b/src/mito2/src/sst.rs
@@ -121,7 +121,7 @@ impl FlatSchemaOptions {
 ///
 /// The schema is:
 /// ```text
-/// primary key columns, field columns, time index, __prmary_key, __sequence, __op_type
+/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
 /// ```
 ///
 /// # Panics
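
Reviewer note: `maybe_dedup_one` is now `pub` and takes `append_mode` and `merge_mode` by value instead of borrowing a whole `&RegionOptions`, so callers that only hold the two option values can reuse it. A minimal caller sketch; the import paths for `MergeMode` and `BoxedRecordBatchIterator` are assumptions (whatever `flush.rs` already imports), and `dedup_range` is a hypothetical helper, not part of this patch:

```rust
use crate::flush::maybe_dedup_one;
// Assumed paths; these types are not introduced by this patch.
use crate::read::BoxedRecordBatchIterator;
use crate::region::options::MergeMode;

/// Hypothetical helper: dedups one range given just the two option values.
fn dedup_range(
    input: BoxedRecordBatchIterator,
    append_mode: bool,
    merge_mode: MergeMode,
    field_column_start: usize,
) -> BoxedRecordBatchIterator {
    // In append mode the input passes through unchanged; otherwise it is
    // wrapped in a dedup iterator selected by the merge mode.
    maybe_dedup_one(append_mode, merge_mode, field_column_start, input)
}
```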
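For context on the deletion: the removed `ArraysSorter` ordered rows by (primary key, timestamp, sequence DESC) and, when `dedup` was set, collapsed rows with equal (primary key, timestamp) to the first one, so the highest-sequence row wins. A standalone sketch of that ordering on plain tuples (illustrative only, not the removed code):

```rust
/// Sorts rows by (encoded pk, timestamp, sequence DESC); with `dedup`,
/// keeps only the highest-sequence row per (pk, timestamp).
fn sort_and_maybe_dedup(
    mut rows: Vec<(Vec<u8>, i64, u64)>, // (encoded pk, timestamp, sequence)
    dedup: bool,
) -> Vec<(Vec<u8>, i64, u64)> {
    rows.sort_unstable_by(|(l_pk, l_ts, l_seq), (r_pk, r_ts, r_seq)| {
        l_pk.cmp(r_pk)
            .then(l_ts.cmp(r_ts))
            .then(l_seq.cmp(r_seq).reverse())
    });
    if dedup {
        // dedup_by keeps the first row of each run, i.e. the highest sequence.
        rows.dedup_by(|(l_pk, l_ts, _), (r_pk, r_ts, _)| l_pk == r_pk && l_ts == r_ts);
    }
    rows
}
```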
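Likewise, the removed `binary_array_to_dictionary` dictionary-encoded an already **sorted** primary-key column so that runs of equal keys share one dictionary entry. A self-contained arrow-rs sketch of the same idea; the function name is illustrative and this is not the deleted implementation:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array};
use arrow::datatypes::UInt32Type;

/// Dictionary-encodes a sorted BinaryArray: equal adjacent values map to one key.
fn sorted_binary_to_dictionary(input: &BinaryArray) -> DictionaryArray<UInt32Type> {
    let mut keys = Vec::with_capacity(input.len());
    let mut values: Vec<&[u8]> = Vec::new();
    for value in input.iter() {
        // Encoded primary keys are never null.
        let value = value.expect("null primary key");
        // Sorted input: a new distinct value starts a new dictionary entry.
        if values.last() != Some(&value) {
            values.push(value);
        }
        keys.push((values.len() - 1) as u32);
    }
    DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(BinaryArray::from_vec(values)) as ArrayRef,
    )
}
```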