diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index 5be155eedb..d57e76ea4e 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -61,7 +61,7 @@ use crate::memtable::bulk::context::BulkIterContextRef;
 use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
 use crate::memtable::time_series::{ValueBuilder, Values};
 use crate::memtable::BoxedRecordBatchIterator;
-use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
+use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
 use crate::sst::parquet::helper::parse_parquet_metadata;
 use crate::sst::to_sst_arrow_schema;
 
@@ -217,9 +217,6 @@ impl BulkPart {
     }
 }
 
-/// Builder type for primary key dictionary array.
-type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;
-
 /// Primary key column builder for handling strings specially.
 enum PrimaryKeyColumnBuilder {
     /// String dictionary builder for string types.
diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs
index 839a68a8f0..601d69964f 100644
--- a/src/mito2/src/read/compat.rs
+++ b/src/mito2/src/read/compat.rs
@@ -17,7 +17,15 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
+use api::v1::SemanticType;
+use datatypes::arrow::array::{
+    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
+};
+use datatypes::arrow::compute::{take, TakeOptions};
+use datatypes::arrow::datatypes::{Schema, SchemaRef};
+use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::data_type::ConcreteDataType;
+use datatypes::prelude::DataType;
 use datatypes::value::Value;
 use datatypes::vectors::VectorRef;
 use mito_codec::row_converter::{
@@ -29,17 +37,22 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
 use store_api::storage::ColumnId;
 
 use crate::error::{
-    CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result, UnexpectedSnafu,
+    CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
+    NewRecordBatchSnafu, Result, UnexpectedSnafu,
 };
+use crate::read::flat_projection::{flat_projected_columns, FlatProjectionMapper};
 use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
 use crate::read::{Batch, BatchColumn, BatchReader};
+use crate::sst::parquet::flat_format::primary_key_column_index;
+use crate::sst::parquet::format::{FormatProjection, PrimaryKeyArray, INTERNAL_COLUMN_NUM};
+use crate::sst::{internal_fields, to_dictionary_field};
 
 /// Reader to adapt schema of underlying reader to expected schema.
 pub struct CompatReader<R> {
     /// Underlying reader.
     reader: R,
     /// Helper to compat batches.
-    compat: CompatBatch,
+    compat: PrimaryKeyCompatBatch,
 }
 
 impl<R> CompatReader<R> {
@@ -54,7 +67,7 @@
     ) -> Result<CompatReader<R>> {
         Ok(CompatReader {
             reader,
-            compat: CompatBatch::new(mapper, reader_meta)?,
+            compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
         })
     }
 }
@@ -72,8 +85,36 @@ impl<R: BatchReader> BatchReader for CompatReader<R> {
     }
 }
 
+/// Helper to adapt schema of the batch to an expected schema.
+pub(crate) enum CompatBatch {
+    /// Adapter for primary key format.
+    PrimaryKey(PrimaryKeyCompatBatch),
+    /// Adapter for flat format.
+    #[allow(dead_code)]
+    Flat(FlatCompatBatch),
+}
+
+impl CompatBatch {
+    /// Returns the inner primary key batch adapter if this is a PrimaryKey format.
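+    ///
+    /// A sketch of the intended call site (cf. `build_file_range_scan_stream`
+    /// in `scan_util.rs`); the variable names here are illustrative:
+    /// ```ignore
+    /// if let Some(pk_compat) = compat.as_primary_key() {
+    ///     batch = pk_compat.compat_batch(batch)?;
+    /// }
+    /// ```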
+    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
+        match self {
+            CompatBatch::PrimaryKey(batch) => Some(batch),
+            _ => None,
+        }
+    }
+
+    /// Returns the inner flat batch adapter if this is a Flat format.
+    #[allow(dead_code)]
+    pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
+        match self {
+            CompatBatch::Flat(batch) => Some(batch),
+            _ => None,
+        }
+    }
+}
+
 /// A helper struct to adapt schema of the batch to an expected schema.
-pub(crate) struct CompatBatch {
+pub(crate) struct PrimaryKeyCompatBatch {
     /// Optional primary key adapter.
     rewrite_pk: Option<RewritePrimaryKey>,
     /// Optional primary key adapter.
@@ -82,7 +123,7 @@
     compat_fields: Option<CompatFields>,
 }
 
-impl CompatBatch {
+impl PrimaryKeyCompatBatch {
     /// Creates a new [CompatBatch].
     /// - `mapper` is built from the metadata users expect to see.
     /// - `reader_meta` is the metadata of the input reader.
@@ -144,6 +185,170 @@ pub(crate) fn has_same_columns_and_pk_encoding(
     true
 }
 
+/// A helper struct to adapt schema of the batch to an expected schema.
+#[allow(dead_code)]
+pub(crate) struct FlatCompatBatch {
+    /// Indices to convert actual fields to expected fields.
+    index_or_defaults: Vec<IndexOrDefault>,
+    /// Expected arrow schema.
+    arrow_schema: SchemaRef,
+    /// Primary key adapter.
+    compat_pk: FlatCompatPrimaryKey,
+}
+
+impl FlatCompatBatch {
+    /// Creates a [FlatCompatBatch].
+    ///
+    /// - `mapper` is built from the metadata users expect to see.
+    /// - `actual` is the [RegionMetadata] of the input parquet.
+    /// - `format_projection` is the projection of the read format for the input parquet.
+    pub(crate) fn try_new(
+        mapper: &FlatProjectionMapper,
+        actual: &RegionMetadataRef,
+        format_projection: &FormatProjection,
+    ) -> Result<Self> {
+        let actual_schema = flat_projected_columns(actual, format_projection);
+        let expect_schema = mapper.batch_schema();
+        // has_same_columns_and_pk_encoding() already checks columns and encodings.
+        debug_assert_ne!(expect_schema, actual_schema);
+
+        // Maps column id to the index and data type in the actual schema.
+        let actual_schema_index: HashMap<_, _> = actual_schema
+            .iter()
+            .enumerate()
+            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
+            .collect();
+
+        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
+        let mut fields = Vec::with_capacity(expect_schema.len());
+        for (column_id, expect_data_type) in expect_schema {
+            // Safety: expect_schema comes from the same mapper.
+            let column_index = mapper.metadata().column_index_by_id(*column_id).unwrap();
+            let expect_column = &mapper.metadata().column_metadatas[column_index];
+            let column_field = &mapper.metadata().schema.arrow_schema().fields()[column_index];
+            // For tag columns, we need to create a dictionary field.
+            if expect_column.semantic_type == SemanticType::Tag {
+                fields.push(Arc::new(to_dictionary_field(column_field)));
+            } else {
+                fields.push(column_field.clone());
+            };
+
+            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
+                let mut cast_type = None;
+
+                // Same column different type.
+                if expect_data_type != *actual_data_type {
+                    cast_type = Some(expect_data_type.clone())
+                }
+                // Source has this column.
+                index_or_defaults.push(IndexOrDefault::Index {
+                    pos: *index,
+                    cast_type,
+                });
+            } else {
+                // Create a default vector with 1 element for that column.
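+                // The single element is repeated to the batch length later by
+                // `repeat_vector()` in `compat()`, so every row of a missing
+                // column reads as its default value.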
+                let default_vector = expect_column
+                    .column_schema
+                    .create_default_vector(1)
+                    .context(CreateDefaultSnafu {
+                        region_id: mapper.metadata().region_id,
+                        column: &expect_column.column_schema.name,
+                    })?
+                    .with_context(|| CompatReaderSnafu {
+                        region_id: mapper.metadata().region_id,
+                        reason: format!(
+                            "column {} does not have a default value to read",
+                            expect_column.column_schema.name
+                        ),
+                    })?;
+                index_or_defaults.push(IndexOrDefault::DefaultValue {
+                    column_id: expect_column.column_id,
+                    default_vector,
+                    semantic_type: expect_column.semantic_type,
+                });
+            };
+        }
+        fields.extend_from_slice(&internal_fields());
+
+        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;
+
+        Ok(Self {
+            index_or_defaults,
+            arrow_schema: Arc::new(Schema::new(fields)),
+            compat_pk,
+        })
+    }
+
+    /// Make columns of the `batch` compatible.
+    #[allow(dead_code)]
+    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        let len = batch.num_rows();
+        let columns = self
+            .index_or_defaults
+            .iter()
+            .map(|index_or_default| match index_or_default {
+                IndexOrDefault::Index { pos, cast_type } => {
+                    let old_column = batch.column(*pos);
+
+                    if let Some(ty) = cast_type {
+                        // Safety: We ensure the type can be converted and the new batch is valid.
+                        // Note: `safe` must be true in `CastOptions` so that values which cannot
+                        // be converted are replaced with null.
+                        let casted =
+                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
+                                .context(ComputeArrowSnafu)?;
+                        Ok(casted)
+                    } else {
+                        Ok(old_column.clone())
+                    }
+                }
+                IndexOrDefault::DefaultValue {
+                    column_id: _,
+                    default_vector,
+                    semantic_type,
+                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
+            })
+            .chain(
+                // Adds internal columns.
+                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
+                    .iter()
+                    .map(|col| Ok(col.clone())),
+            )
+            .collect::<Result<Vec<_>>>()?;
+
+        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
+            .context(NewRecordBatchSnafu)?;
+
+        // Handles primary keys.
+        self.compat_pk.compat(compat_batch)
+    }
+}
+
+/// Repeats the vector value `to_len` times.
+fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
+    assert_eq!(1, vector.len());
+    if is_tag {
+        let values = vector.to_arrow_array();
+        if values.is_null(0) {
+            // Creates a dictionary array with `to_len` null keys.
+            let keys = UInt32Array::new_null(to_len);
+            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
+        } else {
+            let keys = UInt32Array::from_value(0, to_len);
+            Ok(Arc::new(DictionaryArray::new(keys, values)))
+        }
+    } else {
+        let keys = UInt32Array::from_value(0, to_len);
+        take(
+            &vector.to_arrow_array(),
+            &keys,
+            Some(TakeOptions {
+                check_bounds: false,
+            }),
+        )
+        .context(ComputeArrowSnafu)
+    }
+}
+
 /// Helper to make primary key compatible.
 #[derive(Debug)]
 struct CompatPrimaryKey {
@@ -218,6 +423,7 @@ impl CompatFields {
             IndexOrDefault::DefaultValue {
                 column_id,
                 default_vector,
+                semantic_type: _,
             } => {
                 let data = default_vector.replicate(&[len]);
                 BatchColumn {
@@ -252,11 +458,8 @@ fn may_rewrite_primary_key(
     })
 }
 
-/// Creates a [CompatPrimaryKey] if needed.
-fn may_compat_primary_key(
-    expect: &RegionMetadata,
-    actual: &RegionMetadata,
-) -> Result<Option<CompatPrimaryKey>> {
+/// Returns true if the actual primary key is the same as the expected one.
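+///
+/// Returns an error if the actual key cannot be adapted to the expected key,
+/// e.g. when the actual key has more columns than the expected one.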
+fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
     ensure!(
         actual.primary_key.len() <= expect.primary_key.len(),
         CompatReaderSnafu {
@@ -278,7 +481,16 @@
             ),
         }
     );
-    if actual.primary_key.len() == expect.primary_key.len() {
+
+    Ok(actual.primary_key.len() == expect.primary_key.len())
+}
+
+/// Creates a [CompatPrimaryKey] if needed.
+fn may_compat_primary_key(
+    expect: &RegionMetadata,
+    actual: &RegionMetadata,
+) -> Result<Option<CompatPrimaryKey>> {
+    if is_primary_key_same(expect, actual)? {
         return Ok(None);
     }
 
@@ -368,6 +580,7 @@ fn may_compat_fields(
             Ok(IndexOrDefault::DefaultValue {
                 column_id: column.column_id,
                 default_vector,
+                semantic_type: SemanticType::Field,
             })
         }
     })
@@ -393,6 +606,8 @@ enum IndexOrDefault {
         column_id: ColumnId,
        /// Default value. The vector has only 1 element.
         default_vector: VectorRef,
+        /// Semantic type of the column.
+        semantic_type: SemanticType,
     },
 }
 
@@ -450,11 +665,248 @@ impl RewritePrimaryKey {
     }
 }
 
+/// Helper to rewrite primary key to another encoding for flat format.
+struct FlatRewritePrimaryKey {
+    /// New primary key encoder.
+    codec: Arc<dyn PrimaryKeyCodec>,
+    /// Metadata of the expected region.
+    metadata: RegionMetadataRef,
+    /// Original primary key codec, used to decode existing keys before
+    /// re-encoding them.
+    old_codec: Arc<dyn PrimaryKeyCodec>,
+}
+
+impl FlatRewritePrimaryKey {
+    fn new(
+        expect: &RegionMetadataRef,
+        actual: &RegionMetadataRef,
+    ) -> Option<FlatRewritePrimaryKey> {
+        if expect.primary_key_encoding == actual.primary_key_encoding {
+            return None;
+        }
+        let codec = build_primary_key_codec(expect);
+        let old_codec = build_primary_key_codec(actual);
+
+        Some(FlatRewritePrimaryKey {
+            codec,
+            metadata: expect.clone(),
+            old_codec,
+        })
+    }
+
+    /// Rewrites the primary key of the `batch`.
+    /// It also appends the values to the primary key.
+    fn rewrite_key(
+        &self,
+        append_values: &[(ColumnId, Value)],
+        batch: RecordBatch,
+    ) -> Result<RecordBatch> {
+        let old_pk_dict_array = batch
+            .column(primary_key_column_index(batch.num_columns()))
+            .as_any()
+            .downcast_ref::<PrimaryKeyArray>()
+            .unwrap();
+        let old_pk_values_array = old_pk_dict_array
+            .values()
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        let mut builder = BinaryBuilder::with_capacity(
+            old_pk_values_array.len(),
+            old_pk_values_array.value_data().len(),
+        );
+
+        // Binary buffer for the primary key.
+        let mut buffer = Vec::with_capacity(
+            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
+        );
+        let mut column_id_values = Vec::new();
+        // Iterates the binary array and rewrites the primary key.
+        for value in old_pk_values_array.iter() {
+            let Some(old_pk) = value else {
+                builder.append_null();
+                continue;
+            };
+            // Decodes the old primary key.
+            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
+            pk_values.extend(append_values);
+
+            buffer.clear();
+            column_id_values.clear();
+            // Encodes the new primary key.
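+            // Dense values can be encoded as-is, while the sparse codec
+            // expects a (column id, value) pair per key column; columns
+            // missing from the old key fall back to null via `get_or_null()`.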
+            match pk_values {
+                CompositeValues::Dense(dense_values) => {
+                    self.codec
+                        .encode_values(dense_values.as_slice(), &mut buffer)
+                        .context(EncodeSnafu)?;
+                }
+                CompositeValues::Sparse(sparse_values) => {
+                    for id in &self.metadata.primary_key {
+                        let value = sparse_values.get_or_null(*id);
+                        column_id_values.push((*id, value.clone()));
+                    }
+                    self.codec
+                        .encode_values(&column_id_values, &mut buffer)
+                        .context(EncodeSnafu)?;
+                }
+            }
+            builder.append_value(&buffer);
+        }
+        let new_pk_values_array = Arc::new(builder.finish());
+        let new_pk_dict_array =
+            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
+
+        let mut columns = batch.columns().to_vec();
+        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
+
+        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
+    }
+}
+
+/// Helper to make primary key compatible for flat format.
+struct FlatCompatPrimaryKey {
+    /// Primary key rewriter.
+    rewriter: Option<FlatRewritePrimaryKey>,
+    /// Converter to append values to primary keys.
+    converter: Option<Arc<dyn PrimaryKeyCodec>>,
+    /// Default values to append.
+    values: Vec<(ColumnId, Value)>,
+}
+
+impl FlatCompatPrimaryKey {
+    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
+        let rewriter = FlatRewritePrimaryKey::new(expect, actual);
+
+        if is_primary_key_same(expect, actual)? {
+            return Ok(Self {
+                rewriter,
+                converter: None,
+                values: Vec::new(),
+            });
+        }
+
+        // We need to append default values to the primary key.
+        let to_add = &expect.primary_key[actual.primary_key.len()..];
+        let mut values = Vec::with_capacity(to_add.len());
+        let mut fields = Vec::with_capacity(to_add.len());
+        for column_id in to_add {
+            // Safety: The id comes from expect region metadata.
+            let column = expect.column_by_id(*column_id).unwrap();
+            fields.push((
+                *column_id,
+                SortField::new(column.column_schema.data_type.clone()),
+            ));
+            let default_value = column
+                .column_schema
+                .create_default()
+                .context(CreateDefaultSnafu {
+                    region_id: expect.region_id,
+                    column: &column.column_schema.name,
+                })?
+                .with_context(|| CompatReaderSnafu {
+                    region_id: expect.region_id,
+                    reason: format!(
+                        "key column {} does not have a default value to read",
+                        column.column_schema.name
+                    ),
+                })?;
+            values.push((*column_id, default_value));
+        }
+        // is_primary_key_same() is false so we have different numbers of primary key columns.
+        debug_assert!(!fields.is_empty());
+
+        // Create converter to append values.
+        let converter = Some(build_primary_key_codec_with_fields(
+            expect.primary_key_encoding,
+            fields.into_iter(),
+        ));
+
+        Ok(Self {
+            rewriter,
+            converter,
+            values,
+        })
+    }
+
+    /// Makes primary key of the `batch` compatible.
+    ///
+    /// Callers must ensure all columns except the `__primary_key` column are
+    /// already compatible.
+    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        if let Some(rewriter) = &self.rewriter {
+            // If we have a different encoding, rewrite the whole primary key.
+            return rewriter.rewrite_key(&self.values, batch);
+        }
+
+        self.append_key(batch)
+    }
+
+    /// Appends values to the primary key of the `batch`.
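+    ///
+    /// This is the fast path for batches that share the expected encoding:
+    /// the encoded default values are appended to each existing key instead
+    /// of decoding and re-encoding it.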
+    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        let Some(converter) = &self.converter else {
+            return Ok(batch);
+        };
+
+        let old_pk_dict_array = batch
+            .column(primary_key_column_index(batch.num_columns()))
+            .as_any()
+            .downcast_ref::<PrimaryKeyArray>()
+            .unwrap();
+        let old_pk_values_array = old_pk_dict_array
+            .values()
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        let mut builder = BinaryBuilder::with_capacity(
+            old_pk_values_array.len(),
+            old_pk_values_array.value_data().len()
+                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
+        );
+
+        // Binary buffer for the primary key.
+        let mut buffer = Vec::with_capacity(
+            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
+                + converter.estimated_size().unwrap_or_default(),
+        );
+
+        // Iterates the binary array and appends values to the primary key.
+        for value in old_pk_values_array.iter() {
+            let Some(old_pk) = value else {
+                builder.append_null();
+                continue;
+            };
+
+            buffer.clear();
+            buffer.extend_from_slice(old_pk);
+            converter
+                .encode_values(&self.values, &mut buffer)
+                .context(EncodeSnafu)?;
+
+            builder.append_value(&buffer);
+        }
+
+        let new_pk_values_array = Arc::new(builder.finish());
+        let new_pk_dict_array =
+            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
+
+        // Overrides the primary key column.
+        let mut columns = batch.columns().to_vec();
+        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
+
+        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
 
     use api::v1::{OpType, SemanticType};
+    use datatypes::arrow::array::{
+        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
+        TimestampMillisecondArray, UInt64Array, UInt8Array,
+    };
+    use datatypes::arrow::datatypes::UInt32Type;
+    use datatypes::arrow::record_batch::RecordBatch;
     use datatypes::prelude::ConcreteDataType;
     use datatypes::schema::ColumnSchema;
     use datatypes::value::ValueRef;
@@ -467,6 +919,9 @@ mod tests {
     use store_api::storage::RegionId;
 
     use super::*;
+    use crate::read::flat_projection::FlatProjectionMapper;
+    use crate::sst::parquet::flat_format::FlatReadFormat;
+    use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
     use crate::test_util::{check_reader_result, VecBatchReader};
 
     /// Creates a new [RegionMetadata].
@@ -941,4 +1396,180 @@
         )
         .await;
     }
+
+    /// Creates a primary key array for flat format testing.
+    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
+        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
+        for &pk in primary_keys {
+            builder.append(pk).unwrap();
+        }
+        Arc::new(builder.finish())
+    }
+
+    #[test]
+    fn test_flat_compat_batch_with_missing_columns() {
+        let actual_metadata = Arc::new(new_metadata(
+            &[
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
+            ],
+            &[1],
+        ));
+
+        let expected_metadata = Arc::new(new_metadata(
+            &[
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
+                // Adds a new field.
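+                // Column 3 is absent from `actual_metadata`, so the adapter
+                // has to backfill it with its default (null) value.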
+                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
+            ],
+            &[1],
+        ));
+
+        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
+        let read_format = FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter());
+        let format_projection = read_format.format_projection();
+
+        let compat_batch =
+            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();
+
+        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
+        tag_builder.append_value("tag1");
+        tag_builder.append_value("tag1");
+        let tag_dict_array = Arc::new(tag_builder.finish());
+
+        let k1 = encode_key(&[Some("tag1")]);
+        let input_columns: Vec<ArrayRef> = vec![
+            tag_dict_array.clone(),
+            Arc::new(Int64Array::from(vec![100, 200])),
+            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
+            build_flat_test_pk_array(&[&k1, &k1]),
+            Arc::new(UInt64Array::from_iter_values([1, 2])),
+            Arc::new(UInt8Array::from_iter_values([
+                OpType::Put as u8,
+                OpType::Put as u8,
+            ])),
+        ];
+        let input_schema =
+            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
+        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
+
+        let result = compat_batch.compat(input_batch).unwrap();
+
+        let expected_schema =
+            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
+
+        let expected_columns: Vec<ArrayRef> = vec![
+            tag_dict_array.clone(),
+            Arc::new(Int64Array::from(vec![100, 200])),
+            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
+            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
+            build_flat_test_pk_array(&[&k1, &k1]),
+            Arc::new(UInt64Array::from_iter_values([1, 2])),
+            Arc::new(UInt8Array::from_iter_values([
+                OpType::Put as u8,
+                OpType::Put as u8,
+            ])),
+        ];
+        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();
+
+        assert_eq!(expected_batch, result);
+    }
+
+    #[test]
+    fn test_flat_compat_batch_with_different_pk_encoding() {
+        let mut actual_metadata = new_metadata(
+            &[
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
+            ],
+            &[1],
+        );
+        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
+        let actual_metadata = Arc::new(actual_metadata);
+
+        let mut expected_metadata = new_metadata(
+            &[
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
+                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
+            ],
+            &[1, 3],
+        );
+        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
+        let expected_metadata = Arc::new(expected_metadata);
+
+        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
+        let read_format = FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter());
+        let format_projection = read_format.format_projection();
+
+        let compat_batch =
+            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();
+
+        // Tag array.
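+        // String tags are dictionary encoded in the flat format (see
+        // `to_dictionary_field()`), so the input column is built as a
+        // dictionary array.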
+        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
+        tag1_builder.append_value("tag1");
+        tag1_builder.append_value("tag1");
+        let tag1_dict_array = Arc::new(tag1_builder.finish());
+
+        let k1 = encode_key(&[Some("tag1")]);
+        let input_columns: Vec<ArrayRef> = vec![
+            tag1_dict_array.clone(),
+            Arc::new(Int64Array::from(vec![100, 200])),
+            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
+            build_flat_test_pk_array(&[&k1, &k1]),
+            Arc::new(UInt64Array::from_iter_values([1, 2])),
+            Arc::new(UInt8Array::from_iter_values([
+                OpType::Put as u8,
+                OpType::Put as u8,
+            ])),
+        ];
+        let input_schema =
+            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
+        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
+
+        let result = compat_batch.compat(input_batch).unwrap();
+
+        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
+        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
+        null_tag_builder.append_nulls(2);
+        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
+        let expected_columns: Vec<ArrayRef> = vec![
+            tag1_dict_array.clone(),
+            null_tag_dict_array,
+            Arc::new(Int64Array::from(vec![100, 200])),
+            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
+            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
+            Arc::new(UInt64Array::from_iter_values([1, 2])),
+            Arc::new(UInt8Array::from_iter_values([
+                OpType::Put as u8,
+                OpType::Put as u8,
+            ])),
+        ];
+        let output_schema =
+            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
+        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
+
+        assert_eq!(expected_batch, result);
+    }
 }
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 363a6ea503..be7854f0db 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -45,7 +45,7 @@ use crate::error::Result;
 use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
 use crate::memtable::MemtableRange;
 use crate::metrics::READ_SST_COUNT;
-use crate::read::compat::{self, CompatBatch};
+use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
 use crate::read::projection::ProjectionMapper;
 use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
 use crate::read::seq_scan::SeqScan;
@@ -907,17 +907,29 @@ impl ScanInput {
             }
         }
     };
-        if !compat::has_same_columns_and_pk_encoding(
+
+        let need_compat = !compat::has_same_columns_and_pk_encoding(
             self.mapper.metadata(),
             file_range_ctx.read_format().metadata(),
-        ) {
+        );
+        if need_compat {
             // They have different schema. We need to adapt the batch first so the
             // mapper can convert it.
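+            // The flat format adapts `RecordBatch`es via `FlatCompatBatch`,
+            // while the primary key format keeps using the `Batch` adapter.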
-            let compat = CompatBatch::new(
-                &self.mapper,
-                file_range_ctx.read_format().metadata().clone(),
-            )?;
-            file_range_ctx.set_compat_batch(Some(compat));
+            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
+                let mapper = self.mapper.as_flat().unwrap();
+                Some(CompatBatch::Flat(FlatCompatBatch::try_new(
+                    mapper,
+                    flat_format.metadata(),
+                    flat_format.format_projection(),
+                )?))
+            } else {
+                let compact_batch = PrimaryKeyCompatBatch::new(
+                    &self.mapper,
+                    file_range_ctx.read_format().metadata().clone(),
+                )?;
+                Some(CompatBatch::PrimaryKey(compact_batch))
+            };
+            file_range_ctx.set_compat_batch(compat);
         }
         Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
     }
diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs
index a752301456..820fe5a1cd 100644
--- a/src/mito2/src/read/scan_util.rs
+++ b/src/mito2/src/read/scan_util.rs
@@ -687,7 +687,7 @@ pub fn build_file_range_scan_stream(
     let mut source = Source::PruneReader(reader);
     while let Some(mut batch) = source.next_batch().await? {
         if let Some(compact_batch) = compat_batch {
-            batch = compact_batch.compat_batch(batch)?;
+            batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
         }
         yield batch;
     }
diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs
index d7f7d28f5b..9c7d56c9d5 100644
--- a/src/mito2/src/sst.rs
+++ b/src/mito2/src/sst.rs
@@ -127,12 +127,7 @@ pub fn to_flat_sst_arrow_schema(
             .is_string()
         {
             let field = &schema.fields[pk_index];
-            let field = Arc::new(Field::new_dictionary(
-                field.name(),
-                datatypes::arrow::datatypes::DataType::UInt32,
-                field.data_type().clone(),
-                field.is_nullable(),
-            ));
+            let field = Arc::new(to_dictionary_field(field));
             fields.push(field);
         } else {
             fields.push(schema.fields[pk_index].clone());
@@ -159,8 +154,18 @@ pub fn to_flat_sst_arrow_schema(
     Arc::new(Schema::new(fields))
 }
 
+/// Helper function to create a dictionary field from a field.
+pub(crate) fn to_dictionary_field(field: &Field) -> Field {
+    Field::new_dictionary(
+        field.name(),
+        datatypes::arrow::datatypes::DataType::UInt32,
+        field.data_type().clone(),
+        field.is_nullable(),
+    )
+}
+
 /// Fields for internal columns.
-fn internal_fields() -> [FieldRef; 3] {
+pub(crate) fn internal_fields() -> [FieldRef; 3] {
     // Internal columns are always not null.
     [
         Arc::new(Field::new_dictionary(
diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs
index 5e5b2d033f..534f8e99e2 100644
--- a/src/mito2/src/sst/parquet/flat_format.rs
+++ b/src/mito2/src/sst/parquet/flat_format.rs
@@ -128,11 +128,8 @@ pub struct FlatReadFormat {
     metadata: RegionMetadataRef,
     /// SST file schema.
     arrow_schema: SchemaRef,
-    /// Indices of columns to read from the SST. It contains all internal columns.
-    projection_indices: Vec<usize>,
-    /// Column id to their index in the projected schema (
-    /// the schema after projection).
-    column_id_to_projected_index: HashMap<ColumnId, usize>,
+    /// Projection of the read format.
+    format_projection: FormatProjection,
     /// Column id to index in SST.
     column_id_to_sst_index: HashMap<ColumnId, usize>,
     /// Sequence number to override the sequence read from the SST.
@@ -159,8 +156,7 @@ impl FlatReadFormat {
         FlatReadFormat {
             metadata,
             arrow_schema,
-            projection_indices: format_projection.projection_indices,
-            column_id_to_projected_index: format_projection.column_id_to_projected_index,
+            format_projection,
             column_id_to_sst_index: id_to_index,
             override_sequence: None,
         }
@@ -174,7 +170,10 @@
 
     /// Index of a column in the projected batch by its column id.
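+    ///
+    /// Returns `None` if the column is not in the projection.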
     pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
-        self.column_id_to_projected_index.get(&column_id).copied()
+        self.format_projection
+            .column_id_to_projected_index
+            .get(&column_id)
+            .copied()
     }
 
     /// Returns min values of specific column in row groups.
@@ -225,7 +224,7 @@
 
     /// Gets sorted projection indices to read.
     pub(crate) fn projection_indices(&self) -> &[usize] {
-        &self.projection_indices
+        &self.format_projection.projection_indices
     }
 
     /// Creates a sequence array to override.
@@ -263,6 +262,11 @@ impl FlatReadFormat {
         RecordBatch::try_new(record_batch.schema(), columns).context(NewRecordBatchSnafu)
     }
 
+    /// Returns the format projection.
+    pub(crate) fn format_projection(&self) -> &FormatProjection {
+        &self.format_projection
+    }
+
     fn get_stat_values(
         &self,
         row_groups: &[impl Borrow<RowGroupMetaData>],
diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index e1005bd57d..52e1d67dfe 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -33,7 +33,9 @@ use std::sync::Arc;
 use api::v1::SemanticType;
 use common_time::Timestamp;
 use datafusion_common::ScalarValue;
-use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array};
+use datatypes::arrow::array::{
+    ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array,
+};
 use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
 use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::prelude::DataType;
@@ -55,11 +57,15 @@ use crate::sst::to_sst_arrow_schema;
 
 /// Arrow array type for the primary key dictionary.
 pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
+/// Builder type for primary key dictionary array.
+pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;
 
 /// Number of columns that have fixed positions.
 ///
 /// Contains: time index and internal columns.
 pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
+/// Number of internal columns.
+pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;
 
 /// Helper for writing the SST format with primary key.
 pub(crate) struct PrimaryKeyWriteFormat {