diff --git a/src/mito-codec/src/row_converter/sparse.rs b/src/mito-codec/src/row_converter/sparse.rs index f80d6b8987..b858058506 100644 --- a/src/mito-codec/src/row_converter/sparse.rs +++ b/src/mito-codec/src/row_converter/sparse.rs @@ -93,6 +93,8 @@ pub const COLUMN_ID_ENCODE_SIZE: usize = 4; impl SparsePrimaryKeyCodec { /// Creates a new [`SparsePrimaryKeyCodec`] instance. + /// + /// The `region_metadata` should be the metadata of the logical region. pub fn new(region_metadata: &RegionMetadataRef) -> Self { Self { inner: Arc::new(SparsePrimaryKeyCodecInner { @@ -123,6 +125,7 @@ impl SparsePrimaryKeyCodec { } } + /// Creates a new [`SparsePrimaryKeyCodec`] instance with additional label `fields`. pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self { Self { inner: Arc::new(SparsePrimaryKeyCodecInner { diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index 601d69964f..ebede80cf7 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -45,7 +45,7 @@ use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper}; use crate::read::{Batch, BatchColumn, BatchReader}; use crate::sst::parquet::flat_format::primary_key_column_index; use crate::sst::parquet::format::{FormatProjection, PrimaryKeyArray, INTERNAL_COLUMN_NUM}; -use crate::sst::{internal_fields, to_dictionary_field}; +use crate::sst::{internal_fields, tag_maybe_to_dictionary_field}; /// Reader to adapt schema of underlying reader to expected schema. pub struct CompatReader { @@ -228,7 +228,10 @@ impl FlatCompatBatch { let column_field = &mapper.metadata().schema.arrow_schema().fields()[column_index]; // For tag columns, we need to create a dictionary field. if expect_column.semantic_type == SemanticType::Tag { - fields.push(Arc::new(to_dictionary_field(column_field))); + fields.push(tag_maybe_to_dictionary_field( + &expect_column.column_schema.data_type, + column_field, + )); } else { fields.push(column_field.clone()); }; @@ -1437,7 +1440,8 @@ mod tests { )); let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap(); - let read_format = FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter()); + let read_format = + FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false); let format_projection = read_format.format_projection(); let compat_batch = @@ -1520,7 +1524,8 @@ mod tests { let expected_metadata = Arc::new(expected_metadata); let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap(); - let read_format = FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter()); + let read_format = + FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false); let format_projection = read_format.format_projection(); let compat_batch = diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 9c7d56c9d5..5ff9c0f0e0 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -21,6 +21,7 @@ use common_base::readable_size::ReadableSize; use datatypes::arrow::datatypes::{ DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef, }; +use datatypes::prelude::ConcreteDataType; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::RegionMetadata; use store_api::storage::consts::{ @@ -120,17 +121,13 @@ pub fn to_flat_sst_arrow_schema( if options.raw_pk_columns { for pk_id in &metadata.primary_key { let pk_index = metadata.column_index_by_id(*pk_id).unwrap(); - if options.string_pk_use_dict - && metadata.column_metadatas[pk_index] - .column_schema - .data_type - .is_string() - { - let field = &schema.fields[pk_index]; - let field = Arc::new(to_dictionary_field(field)); - fields.push(field); - } else { - fields.push(schema.fields[pk_index].clone()); + if options.string_pk_use_dict { + let old_field = &schema.fields[pk_index]; + let new_field = tag_maybe_to_dictionary_field( + &metadata.column_metadatas[pk_index].column_schema.data_type, + old_field, + ); + fields.push(new_field); } } } @@ -155,7 +152,7 @@ pub fn to_flat_sst_arrow_schema( } /// Helper function to create a dictionary field from a field. -pub(crate) fn to_dictionary_field(field: &Field) -> Field { +fn to_dictionary_field(field: &Field) -> Field { Field::new_dictionary( field.name(), datatypes::arrow::datatypes::DataType::UInt32, @@ -164,6 +161,18 @@ pub(crate) fn to_dictionary_field(field: &Field) -> Field { ) } +/// Helper function to create a dictionary field from a field if it is a string column. +pub(crate) fn tag_maybe_to_dictionary_field( + data_type: &ConcreteDataType, + field: &Arc, +) -> Arc { + if data_type.is_string() { + Arc::new(to_dictionary_field(field)) + } else { + field.clone() + } +} + /// Fields for internal columns. pub(crate) fn internal_fields() -> [FieldRef; 3] { // Internal columns are always not null. diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index 534f8e99e2..0e60d95bd3 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -33,17 +33,27 @@ use std::collections::HashMap; use std::sync::Arc; use api::v1::SemanticType; -use datatypes::arrow::array::{ArrayRef, UInt64Array}; -use datatypes::arrow::datatypes::SchemaRef; +use datatypes::arrow::array::{ + Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array, +}; +use datatypes::arrow::compute::kernels::take::take; +use datatypes::arrow::datatypes::{Schema, SchemaRef}; use datatypes::arrow::record_batch::RecordBatch; +use datatypes::prelude::{ConcreteDataType, DataType}; +use mito_codec::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec}; use parquet::file::metadata::RowGroupMetaData; -use snafu::ResultExt; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::{ColumnId, SequenceNumber}; -use crate::error::{NewRecordBatchSnafu, Result}; -use crate::sst::parquet::format::{FormatProjection, ReadFormat, StatValues}; -use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions}; +use crate::error::{ + ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu, + NewRecordBatchSnafu, Result, +}; +use crate::sst::parquet::format::{ + FormatProjection, PrimaryKeyArray, ReadFormat, StatValues, INTERNAL_COLUMN_NUM, +}; +use crate::sst::{tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema, FlatSchemaOptions}; /// Helper for writing the SST format. #[allow(dead_code)] @@ -128,12 +138,14 @@ pub struct FlatReadFormat { metadata: RegionMetadataRef, /// SST file schema. arrow_schema: SchemaRef, - /// Projection. + /// Projection computed for the format. format_projection: FormatProjection, /// Column id to index in SST. column_id_to_sst_index: HashMap, /// Sequence number to override the sequence read from the SST. override_sequence: Option, + /// Optional format converter for handling flat format conversion. + convert_format: Option, } impl FlatReadFormat { @@ -141,6 +153,7 @@ impl FlatReadFormat { pub fn new( metadata: RegionMetadataRef, column_ids: impl Iterator, + convert_to_flat: bool, ) -> FlatReadFormat { let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); @@ -153,12 +166,20 @@ impl FlatReadFormat { column_ids, ); + let convert_format = if convert_to_flat { + let codec = build_primary_key_codec(&metadata); + FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec) + } else { + None + }; + FlatReadFormat { metadata, arrow_schema, format_projection, column_id_to_sst_index: id_to_index, override_sequence: None, + convert_format, } } @@ -227,44 +248,101 @@ impl FlatReadFormat { &self.format_projection.projection_indices } + /// Gets the projection. + pub(crate) fn format_projection(&self) -> &FormatProjection { + &self.format_projection + } + /// Creates a sequence array to override. - #[allow(dead_code)] pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option { self.override_sequence .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef) } - /// Convert a record batch to apply override sequence array. + /// Convert a record batch to apply flat format conversion and override sequence array. /// - /// Returns a new RecordBatch with the sequence column replaced by the override sequence array. + /// Returns a new RecordBatch with flat format conversion applied first (if enabled), + /// then the sequence column replaced by the override sequence array. #[allow(dead_code)] pub(crate) fn convert_batch( &self, - record_batch: &RecordBatch, + record_batch: RecordBatch, override_sequence_array: Option<&ArrayRef>, ) -> Result { - let Some(override_array) = override_sequence_array else { - return Ok(record_batch.clone()); + // First, apply flat format conversion if enabled + let batch = if let Some(ref convert_format) = self.convert_format { + convert_format.convert(record_batch)? + } else { + record_batch }; - let mut columns = record_batch.columns().to_vec(); - let sequence_column_idx = sequence_column_index(record_batch.num_columns()); + // Then apply sequence override if provided + let Some(override_array) = override_sequence_array else { + return Ok(batch); + }; + + let mut columns = batch.columns().to_vec(); + let sequence_column_idx = sequence_column_index(batch.num_columns()); // Use the provided override sequence array, slicing if necessary to match batch length - let sequence_array = if override_array.len() > record_batch.num_rows() { - override_array.slice(0, record_batch.num_rows()) + let sequence_array = if override_array.len() > batch.num_rows() { + override_array.slice(0, batch.num_rows()) } else { override_array.clone() }; columns[sequence_column_idx] = sequence_array; - RecordBatch::try_new(record_batch.schema(), columns).context(NewRecordBatchSnafu) + RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu) } - /// Returns the format projection. - pub(crate) fn format_projection(&self) -> &FormatProjection { - &self.format_projection + /// Checks whether the batch from the parquet file needs to be converted to match the flat format. + /// + /// * `file_path` is the path to the parquet file, for error message. + /// * `num_columns` is the number of columns in the parquet file. + /// * `metadata` is the region metadata (always assumes flat format). + #[allow(dead_code)] + pub(crate) fn need_convert_to_flat( + file_path: &str, + num_columns: usize, + metadata: &RegionMetadata, + ) -> Result { + // For flat format, compute expected column number: + // all columns + internal columns (pk, sequence, op_type) + let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM; + + if expected_columns == num_columns { + // Same number of columns, no conversion needed + Ok(false) + } else { + ensure!( + expected_columns >= num_columns, + InvalidParquetSnafu { + file: file_path, + reason: format!( + "Expected columns {} should be >= actual columns {}", + expected_columns, num_columns + ) + } + ); + + // Different number of columns, check if the difference matches primary key count + let column_diff = expected_columns - num_columns; + + ensure!( + column_diff == metadata.primary_key.len(), + InvalidParquetSnafu { + file: file_path, + reason: format!( + "Column number difference {} does not match primary key count {}", + column_diff, + metadata.primary_key.len() + ) + } + ); + + Ok(true) + } } fn get_stat_values( @@ -309,6 +387,179 @@ pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap, + /// Projected primary key column information: (column_id, pk_index, column_index in metadata). + projected_primary_keys: Vec<(ColumnId, usize, usize)>, +} + +impl FlatConvertFormat { + /// Creates a new `FlatConvertFormat`. + /// + /// The `format_projection` is the projection computed in the [FlatReadFormat] with the `metadata`. + /// The `codec` is the primary key codec of the `metadata`. + /// + /// Returns `None` if there is no primary key. + pub(crate) fn new( + metadata: RegionMetadataRef, + format_projection: &FormatProjection, + codec: Arc, + ) -> Option { + if metadata.primary_key.is_empty() { + return None; + } + + // Builds projected primary keys list maintaining the order of RegionMetadata::primary_key + let mut projected_primary_keys = Vec::new(); + for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() { + if format_projection + .column_id_to_projected_index + .contains_key(&column_id) + { + // We expect the format_projection is built from the metadata. + let column_index = metadata.column_index_by_id(column_id).unwrap(); + projected_primary_keys.push((column_id, pk_index, column_index)); + } + } + + Some(Self { + metadata, + codec, + projected_primary_keys, + }) + } + + /// Converts a batch to have decoded primary key columns in flat format. + /// + /// The primary key array in the batch is a dictionary array. We decode each value which is a + /// primary key and reuse the keys array to build a dictionary array for each tag column. + /// The decoded columns are inserted in front of other columns. + pub(crate) fn convert(&self, batch: RecordBatch) -> Result { + if self.projected_primary_keys.is_empty() { + return Ok(batch); + } + + let primary_key_index = primary_key_column_index(batch.num_columns()); + let pk_dict_array = batch + .column(primary_key_index) + .as_any() + .downcast_ref::() + .with_context(|| InvalidRecordBatchSnafu { + reason: "Primary key column is not a dictionary array".to_string(), + })?; + + let pk_values_array = pk_dict_array + .values() + .as_any() + .downcast_ref::() + .with_context(|| InvalidRecordBatchSnafu { + reason: "Primary key values are not binary array".to_string(), + })?; + + // Decodes all primary key values + let mut decoded_pk_values = Vec::with_capacity(pk_values_array.len()); + for i in 0..pk_values_array.len() { + if pk_values_array.is_null(i) { + decoded_pk_values.push(None); + } else { + let pk_bytes = pk_values_array.value(i); + let decoded = self.codec.decode(pk_bytes).context(DecodeSnafu)?; + decoded_pk_values.push(Some(decoded)); + } + } + + // Builds decoded tag column arrays. + let mut decoded_columns = Vec::new(); + for (column_id, pk_index, column_index) in &self.projected_primary_keys { + let column_metadata = &self.metadata.column_metadatas[*column_index]; + let tag_column = self.build_primary_key_column( + *column_id, + *pk_index, + &column_metadata.column_schema.data_type, + pk_dict_array.keys(), + &decoded_pk_values, + )?; + decoded_columns.push(tag_column); + } + + // Builds new columns: decoded tag columns first, then original columns + let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len()); + new_columns.extend(decoded_columns); + new_columns.extend_from_slice(batch.columns()); + + // Builds new schema + let mut new_fields = + Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len()); + for (_, _, column_index) in &self.projected_primary_keys { + let column_metadata = &self.metadata.column_metadatas[*column_index]; + let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index]; + let field = + tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field); + new_fields.push(field); + } + new_fields.extend(batch.schema().fields().iter().cloned()); + + let new_schema = Arc::new(Schema::new(new_fields)); + RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu) + } + + /// Builds an array for a specific tag column. + /// + /// It may build a dictionary array if the type is string. Note that the dictionary + /// array may have null values, although keys are not null. + fn build_primary_key_column( + &self, + column_id: ColumnId, + pk_index: usize, + column_type: &ConcreteDataType, + keys: &UInt32Array, + decoded_pk_values: &[Option], + ) -> Result { + // Gets values from the primary key. + let mut builder = column_type.create_mutable_vector(decoded_pk_values.len()); + for decoded_opt in decoded_pk_values { + match decoded_opt { + Some(decoded) => { + match decoded { + CompositeValues::Dense(dense) => { + if pk_index < dense.len() { + builder.push_value_ref(dense[pk_index].1.as_value_ref()); + } else { + builder.push_null(); + } + } + CompositeValues::Sparse(sparse) => { + let value = sparse.get_or_null(column_id); + builder.push_value_ref(value.as_value_ref()); + } + }; + } + None => builder.push_null(), + } + } + + let values_vector = builder.to_vector(); + let values_array = values_vector.to_arrow_array(); + + // Only creates dictionary array for string types, otherwise take values by keys + if matches!(column_type, ConcreteDataType::String(_)) { + // Creates dictionary array using the same keys for string types + // Note that the dictionary values may have nulls. + let dict_array = DictionaryArray::new(keys.clone(), values_array); + Ok(Arc::new(dict_array)) + } else { + // For non-string types, takes values by keys indices to create a regular array + let taken_array = take(&values_array, keys, None).context(ComputeArrowSnafu)?; + Ok(taken_array) + } + } +} + #[cfg(test)] impl FlatReadFormat { /// Creates a helper with existing `metadata` and all columns. @@ -316,6 +567,7 @@ impl FlatReadFormat { Self::new( Arc::clone(&metadata), metadata.column_metadatas.iter().map(|c| c.column_id), + false, ) } } diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 52e1d67dfe..211e47ea43 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -152,7 +152,7 @@ impl ReadFormat { flat_format: bool, ) -> Self { if flat_format { - Self::new_flat(metadata, column_ids) + Self::new_flat(metadata, column_ids, false) } else { Self::new_primary_key(metadata, column_ids) } @@ -170,8 +170,9 @@ impl ReadFormat { pub fn new_flat( metadata: RegionMetadataRef, column_ids: impl Iterator, + convert_to_flat: bool, ) -> Self { - ReadFormat::Flat(FlatReadFormat::new(metadata, column_ids)) + ReadFormat::Flat(FlatReadFormat::new(metadata, column_ids, convert_to_flat)) } pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> { @@ -925,17 +926,25 @@ mod tests { use std::sync::Arc; use api::v1::OpType; - use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array}; + use datatypes::arrow::array::{ + Int64Array, StringArray, TimestampMillisecondArray, UInt64Array, UInt8Array, + }; use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; + use datatypes::value::ValueRef; use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector}; + use mito_codec::row_converter::{ + DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec, + }; + use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; use super::*; - use crate::sst::parquet::flat_format::FlatWriteFormat; - use crate::sst::FlatSchemaOptions; + use crate::sst::parquet::flat_format::{sequence_column_index, FlatWriteFormat}; + use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions}; const TEST_SEQUENCE: u64 = 1; const TEST_OP_TYPE: u8 = OpType::Put as u8; @@ -1346,19 +1355,19 @@ mod tests { // The projection includes all "fixed position" columns: ts(4), __primary_key(5), __sequence(6), __op_type(7) // Only read tag1 (column_id=3, index=1) + fixed columns - let read_format = ReadFormat::new_flat(metadata.clone(), [3].iter().copied()); + let read_format = ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), false); assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices()); // Only read field1 (column_id=4, index=2) + fixed columns - let read_format = ReadFormat::new_flat(metadata.clone(), [4].iter().copied()); + let read_format = ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), false); assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices()); // Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed) - let read_format = ReadFormat::new_flat(metadata.clone(), [5].iter().copied()); + let read_format = ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), false); assert_eq!(&[4, 5, 6, 7], read_format.projection_indices()); // Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns - let read_format = ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied()); + let read_format = ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), false); assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices()); } @@ -1368,6 +1377,7 @@ mod tests { let mut format = FlatReadFormat::new( metadata, std::iter::once(1), // Just read tag0 + false, ); let num_rows = 4; @@ -1383,10 +1393,8 @@ mod tests { RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap(); // Test without override sequence - should return clone - let result = format.convert_batch(&record_batch, None).unwrap(); - let sequence_column = result.column( - crate::sst::parquet::flat_format::sequence_column_index(result.num_columns()), - ); + let result = format.convert_batch(record_batch.clone(), None).unwrap(); + let sequence_column = result.column(sequence_column_index(result.num_columns())); let sequence_array = sequence_column .as_any() .downcast_ref::() @@ -1399,11 +1407,9 @@ mod tests { format.set_override_sequence(Some(override_sequence)); let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap(); let result = format - .convert_batch(&record_batch, Some(&override_sequence_array)) + .convert_batch(record_batch, Some(&override_sequence_array)) .unwrap(); - let sequence_column = result.column( - crate::sst::parquet::flat_format::sequence_column_index(result.num_columns()), - ); + let sequence_column = result.column(sequence_column_index(result.num_columns())); let sequence_array = sequence_column .as_any() .downcast_ref::() @@ -1412,4 +1418,333 @@ mod tests { let expected_override = UInt64Array::from(vec![override_sequence; num_rows]); assert_eq!(sequence_array, &expected_override); } + + #[test] + fn test_need_convert_to_flat() { + let metadata = build_test_region_metadata(); + + // Test case 1: Same number of columns, no conversion needed + // For flat format: all columns (5) + internal columns (3) + let expected_columns = metadata.column_metadatas.len() + 3; + let result = + FlatReadFormat::need_convert_to_flat("test.parquet", expected_columns, &metadata) + .unwrap(); + assert!( + !result, + "Should not need conversion when column counts match" + ); + + // Test case 2: Different number of columns, need conversion + // Missing primary key columns (2 primary keys in test metadata) + let num_columns_without_pk = expected_columns - metadata.primary_key.len(); + let result = + FlatReadFormat::need_convert_to_flat("test.parquet", num_columns_without_pk, &metadata) + .unwrap(); + assert!( + result, + "Should need conversion when primary key columns are missing" + ); + + // Test case 3: Invalid case - actual columns more than expected + let too_many_columns = expected_columns + 1; + let err = FlatReadFormat::need_convert_to_flat("test.parquet", too_many_columns, &metadata) + .unwrap_err(); + assert!(err.to_string().contains("Expected columns"), "{err:?}"); + + // Test case 4: Invalid case - column difference doesn't match primary key count + let wrong_diff_columns = expected_columns - 1; // Difference of 1, but we have 2 primary keys + let err = + FlatReadFormat::need_convert_to_flat("test.parquet", wrong_diff_columns, &metadata) + .unwrap_err(); + assert!( + err.to_string().contains("Column number difference"), + "{err:?}" + ); + } + + fn build_test_dense_pk_array( + codec: &DensePrimaryKeyCodec, + pk_values_per_row: &[&[Option]], + ) -> Arc { + let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0); + + for pk_values_row in pk_values_per_row { + let values: Vec = pk_values_row + .iter() + .map(|opt| match opt { + Some(val) => ValueRef::Int64(*val), + None => ValueRef::Null, + }) + .collect(); + + let encoded = codec.encode(values.into_iter()).unwrap(); + builder.append_value(&encoded); + } + + Arc::new(builder.finish()) + } + + fn build_test_sparse_region_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "__table_id", + ConcreteDataType::uint32_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: ReservedColumnId::table_id(), + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "__tsid", + ConcreteDataType::uint64_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: ReservedColumnId::tsid(), + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 3, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field1", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 4, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field0", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 5, + }) + .primary_key(vec![ + ReservedColumnId::table_id(), + ReservedColumnId::tsid(), + 1, + 3, + ]) + .primary_key_encoding(PrimaryKeyEncoding::Sparse); + Arc::new(builder.build().unwrap()) + } + + fn build_test_sparse_pk_array( + codec: &SparsePrimaryKeyCodec, + pk_values_per_row: &[SparseTestRow], + ) -> Arc { + let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0); + for row in pk_values_per_row { + let values = vec![ + (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)), + (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)), + (1, ValueRef::String(&row.tag0)), + (3, ValueRef::String(&row.tag1)), + ]; + + let mut buffer = Vec::new(); + codec.encode_value_refs(&values, &mut buffer).unwrap(); + builder.append_value(&buffer); + } + + Arc::new(builder.finish()) + } + + #[derive(Clone)] + struct SparseTestRow { + table_id: u32, + tsid: u64, + tag0: String, + tag1: String, + } + + #[test] + fn test_flat_read_format_convert_format_with_dense_encoding() { + let metadata = build_test_region_metadata(); + + let column_ids: Vec<_> = metadata + .column_metadatas + .iter() + .map(|c| c.column_id) + .collect(); + let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true); + + let num_rows = 4; + let original_sequence = 100u64; + + // Create primary key values for each row: tag0=1, tag1=1 for all rows + let pk_values_per_row = vec![ + &[Some(1i64), Some(1i64)][..]; num_rows // All rows have same primary key values + ]; + + // Create a test record batch in old format using dense encoding + let codec = DensePrimaryKeyCodec::new(&metadata); + let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row); + let columns: Vec = vec![ + Arc::new(Int64Array::from(vec![2; num_rows])), // field1 + Arc::new(Int64Array::from(vec![3; num_rows])), // field0 + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts + dense_pk_array.clone(), // __primary_key (dense encoding) + Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence + Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type + ]; + + // Create schema for old format (without primary key columns) + let old_format_fields = vec![ + Field::new("field1", ArrowDataType::Int64, true), + Field::new("field0", ArrowDataType::Int64, true), + Field::new( + "ts", + ArrowDataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new( + "__primary_key", + ArrowDataType::Dictionary( + Box::new(ArrowDataType::UInt32), + Box::new(ArrowDataType::Binary), + ), + false, + ), + Field::new("__sequence", ArrowDataType::UInt64, false), + Field::new("__op_type", ArrowDataType::UInt8, false), + ]; + let old_schema = Arc::new(Schema::new(old_format_fields)); + let record_batch = RecordBatch::try_new(old_schema, columns).unwrap(); + + // Test conversion with dense encoding + let result = format.convert_batch(record_batch, None).unwrap(); + + // Construct expected RecordBatch in flat format with decoded primary key columns + let expected_columns: Vec = vec![ + Arc::new(Int64Array::from(vec![1; num_rows])), // tag0 (decoded from primary key) + Arc::new(Int64Array::from(vec![1; num_rows])), // tag1 (decoded from primary key) + Arc::new(Int64Array::from(vec![2; num_rows])), // field1 + Arc::new(Int64Array::from(vec![3; num_rows])), // field0 + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts + dense_pk_array, // __primary_key (preserved) + Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence + Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type + ]; + let expected_record_batch = + RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap(); + + // Compare the actual result with the expected record batch + assert_eq!(expected_record_batch, result); + } + + #[test] + fn test_flat_read_format_convert_format_with_sparse_encoding() { + let metadata = build_test_sparse_region_metadata(); + + let column_ids: Vec<_> = metadata + .column_metadatas + .iter() + .map(|c| c.column_id) + .collect(); + let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true); + + let num_rows = 4; + let original_sequence = 100u64; + + // Create sparse test data with table_id, tsid and string tags + let pk_test_rows = vec![ + SparseTestRow { + table_id: 1, + tsid: 123, + tag0: "frontend".to_string(), + tag1: "pod1".to_string(), + }; + num_rows + ]; + + let codec = SparsePrimaryKeyCodec::new(&metadata); + let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows); + // Create a test record batch in old format using sparse encoding + let columns: Vec = vec![ + Arc::new(Int64Array::from(vec![2; num_rows])), // field1 + Arc::new(Int64Array::from(vec![3; num_rows])), // field0 + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts + sparse_pk_array.clone(), // __primary_key (sparse encoding) + Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence + Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type + ]; + + // Create schema for old format (without primary key columns) + let old_format_fields = vec![ + Field::new("field1", ArrowDataType::Int64, true), + Field::new("field0", ArrowDataType::Int64, true), + Field::new( + "ts", + ArrowDataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new( + "__primary_key", + ArrowDataType::Dictionary( + Box::new(ArrowDataType::UInt32), + Box::new(ArrowDataType::Binary), + ), + false, + ), + Field::new("__sequence", ArrowDataType::UInt64, false), + Field::new("__op_type", ArrowDataType::UInt8, false), + ]; + let old_schema = Arc::new(Schema::new(old_format_fields)); + let record_batch = RecordBatch::try_new(old_schema, columns).unwrap(); + + // Test conversion with sparse encoding + let result = format.convert_batch(record_batch, None).unwrap(); + + // Construct expected RecordBatch in flat format with decoded primary key columns + let tag0_array = Arc::new(DictionaryArray::new( + UInt32Array::from(vec![0; num_rows]), + Arc::new(StringArray::from(vec!["frontend"])), + )); + let tag1_array = Arc::new(DictionaryArray::new( + UInt32Array::from(vec![0; num_rows]), + Arc::new(StringArray::from(vec!["pod1"])), + )); + let expected_columns: Vec = vec![ + Arc::new(UInt32Array::from(vec![1; num_rows])), // __table_id (decoded from primary key) + Arc::new(UInt64Array::from(vec![123; num_rows])), // __tsid (decoded from primary key) + tag0_array, // tag0 (decoded from primary key) + tag1_array, // tag1 (decoded from primary key) + Arc::new(Int64Array::from(vec![2; num_rows])), // field1 + Arc::new(Int64Array::from(vec![3; num_rows])), // field0 + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts + sparse_pk_array, // __primary_key (preserved) + Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence + Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type + ]; + let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); + let expected_record_batch = + RecordBatch::try_new(expected_schema, expected_columns).unwrap(); + + // Compare the actual result with the expected record batch + assert_eq!(expected_record_batch, result); + } }