diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index 5e8debe3c6..dda150485e 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -140,6 +140,8 @@ pub struct FlatReadFormat { impl FlatReadFormat { /// Creates a helper with existing `metadata` and `column_ids` to read. + /// + /// If `skip_auto_convert` is true, skips auto conversion of format when the encoding is sparse encoding. pub fn new( metadata: RegionMetadataRef, column_ids: impl Iterator, @@ -154,11 +156,18 @@ impl FlatReadFormat { let parquet_adapter = if is_legacy { // Safety: is_legacy_format() ensures primary_key is not empty. - ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new( - metadata, - column_ids, - skip_auto_convert, - )) + if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse { + // Only skip auto convert when the primary key encoding is sparse. + ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new( + metadata, + column_ids, + skip_auto_convert, + )) + } else { + ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new( + metadata, column_ids, false, + )) + } } else { ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids)) }; @@ -370,6 +379,12 @@ impl ParquetPrimaryKeyToFlat { column_ids: impl Iterator, skip_auto_convert: bool, ) -> ParquetPrimaryKeyToFlat { + assert!(if skip_auto_convert { + metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse + } else { + true + }); + let column_ids: Vec<_> = column_ids.collect(); // Creates a map to lookup index based on the new format. diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 48eca5879a..4297071e4d 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -1740,7 +1740,7 @@ mod tests { .collect(); let format = FlatReadFormat::new( metadata.clone(), - column_ids.into_iter(), + column_ids.clone().into_iter(), None, "test", false, @@ -1797,7 +1797,7 @@ mod tests { let record_batch = RecordBatch::try_new(old_schema, columns).unwrap(); // Test conversion with sparse encoding - let result = format.convert_batch(record_batch, None).unwrap(); + let result = format.convert_batch(record_batch.clone(), None).unwrap(); // Construct expected RecordBatch in flat format with decoded primary key columns let tag0_array = Arc::new(DictionaryArray::new( @@ -1826,5 +1826,12 @@ mod tests { // Compare the actual result with the expected record batch assert_eq!(expected_record_batch, result); + + let format = + FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), None, "test", true) + .unwrap(); + // Test conversion with sparse encoding and skip convert. + let result = format.convert_batch(record_batch.clone(), None).unwrap(); + assert_eq!(record_batch, result); } }