Skip to main content

mito2/read/
compat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to adapt readers with different schema.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use api::v1::SemanticType;
21use datatypes::arrow::array::{
22    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
23};
24use datatypes::arrow::compute::{TakeOptions, take};
25use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::extension::json::is_json_extension_type;
29use datatypes::prelude::DataType;
30use datatypes::schema::ext::ArrowSchemaExt;
31use datatypes::value::Value;
32use datatypes::vectors::VectorRef;
33use datatypes::vectors::json::array::JsonArray;
34use mito_codec::row_converter::{
35    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
36    build_primary_key_codec_with_fields,
37};
38use snafu::{OptionExt, ResultExt, ensure};
39use store_api::codec::PrimaryKeyEncoding;
40use store_api::metadata::{RegionMetadata, RegionMetadataRef};
41use store_api::storage::ColumnId;
42
43use crate::error::{
44    CompatReaderSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu, DecodeSnafu,
45    EncodeSnafu, NewRecordBatchSnafu, Result, UnsupportedOperationSnafu,
46};
47use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
48use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
49use crate::sst::parquet::format::{INTERNAL_COLUMN_NUM, PrimaryKeyArray};
50use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};
51
52/// Returns true if the columns in the `projection_mapper` and `read_format` have same data types
53/// and primary key encodings.
54pub(crate) fn has_same_columns_and_pk_encoding(
55    projection_mapper: &FlatProjectionMapper,
56    read_format: &FlatReadFormat,
57    compaction: bool,
58) -> bool {
59    let left = projection_mapper.metadata();
60    let right = read_format.metadata();
61    if left.primary_key_encoding != right.primary_key_encoding {
62        return false;
63    }
64
65    if left.column_metadatas.len() != right.column_metadatas.len() {
66        return false;
67    }
68
69    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
70        if left_col.column_id != right_col.column_id {
71            return false;
72        }
73        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
74    }
75
76    &projection_mapper.input_arrow_schema(compaction) == read_format.arrow_schema()
77}
78
79/// A helper struct to adapt schema of the batch to an expected schema.
80pub(crate) struct FlatCompatBatch {
81    /// Indices to convert actual fields to expect fields.
82    index_or_defaults: Vec<IndexOrDefault>,
83    /// Expected arrow schema.
84    arrow_schema: SchemaRef,
85    /// Primary key adapter.
86    compat_pk: FlatCompatPrimaryKey,
87}
88
89impl FlatCompatBatch {
90    /// Creates a [FlatCompatBatch].
91    ///
92    /// - `mapper` is built from the metadata users expect to see.
93    /// - `read_format` is the [FlatReadFormat] of the input parquet.
94    /// - `compaction` indicates whether the reader is for compaction.
95    pub(crate) fn try_new(
96        mapper: &FlatProjectionMapper,
97        read_format: &FlatReadFormat,
98        compaction: bool,
99    ) -> Result<Option<Self>> {
100        let actual = read_format.metadata();
101        let format_projection = read_format.format_projection();
102        let mut actual_schema = flat_projected_columns(actual, format_projection);
103        if read_format.arrow_schema().has_json_extension_field() {
104            for field in read_format.arrow_schema().fields() {
105                if is_json_extension_type(field)
106                    && let Some(column_id) =
107                        actual.column_by_name(field.name()).map(|x| x.column_id)
108                    && let Some(i) = actual_schema.iter().position(|x| x.0 == column_id)
109                {
110                    actual_schema[i].1 = ConcreteDataType::from_arrow_type(field.data_type());
111                }
112            }
113        }
114
115        let expect_schema = mapper.batch_schema();
116        if expect_schema == actual_schema {
117            // Although the SST has a different schema, but the schema after projection is the same
118            // as expected schema.
119            return Ok(None);
120        }
121
122        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
123            // Special handling for sparse encoding in compaction.
124            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
125        }
126
127        let (index_or_defaults, fields) =
128            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;
129
130        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;
131
132        Ok(Some(Self {
133            index_or_defaults,
134            arrow_schema: Arc::new(Schema::new(fields)),
135            compat_pk,
136        }))
137    }
138
139    fn compute_index_and_fields(
140        actual_schema: &[(ColumnId, ConcreteDataType)],
141        expect_schema: &[(ColumnId, ConcreteDataType)],
142        expect_metadata: &RegionMetadata,
143    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
144        // Maps column id to the index and data type in the actual schema.
145        let actual_schema_index: HashMap<_, _> = actual_schema
146            .iter()
147            .enumerate()
148            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
149            .collect();
150
151        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
152        let mut fields = Vec::with_capacity(expect_schema.len());
153        for (column_id, expect_data_type) in expect_schema {
154            // Safety: expect_schema comes from the same mapper.
155            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
156            let expect_column = &expect_metadata.column_metadatas[column_index];
157            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
158            // For tag columns, we need to create a dictionary field.
159            if expect_column.semantic_type == SemanticType::Tag {
160                let field = tag_maybe_to_dictionary_field(
161                    &expect_column.column_schema.data_type,
162                    column_field,
163                );
164                fields.push(Arc::new(with_field_id(
165                    (*field).clone(),
166                    expect_column.column_id,
167                )));
168            } else {
169                let field = with_field_id(
170                    Arc::unwrap_or_clone(column_field.clone()),
171                    expect_column.column_id,
172                )
173                .with_data_type(expect_data_type.as_arrow_type());
174                fields.push(Arc::new(field))
175            };
176
177            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
178                let mut cast_type = None;
179
180                // Same column different type.
181                if expect_data_type != *actual_data_type {
182                    cast_type = Some(expect_data_type.clone())
183                }
184                // Source has this column.
185                index_or_defaults.push(IndexOrDefault::Index {
186                    pos: *index,
187                    cast_type,
188                });
189            } else {
190                // Create a default vector with 1 element for that column.
191                let default_vector = expect_column
192                    .column_schema
193                    .create_default_vector(1)
194                    .context(CreateDefaultSnafu {
195                        region_id: expect_metadata.region_id,
196                        column: &expect_column.column_schema.name,
197                    })?
198                    .with_context(|| CompatReaderSnafu {
199                        region_id: expect_metadata.region_id,
200                        reason: format!(
201                            "column {} does not have a default value to read",
202                            expect_column.column_schema.name
203                        ),
204                    })?;
205                index_or_defaults.push(IndexOrDefault::DefaultValue {
206                    default_vector,
207                    semantic_type: expect_column.semantic_type,
208                });
209            };
210        }
211        fields.extend_from_slice(&internal_fields());
212
213        Ok((index_or_defaults, fields))
214    }
215
216    fn try_new_compact_sparse(
217        mapper: &FlatProjectionMapper,
218        actual: &RegionMetadataRef,
219    ) -> Result<Option<Self>> {
220        // Currently, we don't support converting sparse encoding back to dense encoding in
221        // flat format.
222        ensure!(
223            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
224            UnsupportedOperationSnafu {
225                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
226            }
227        );
228
229        // For sparse encoding, we don't need to check the primary keys.
230        // Since this is for compaction, we always read all columns.
231        let actual_schema: Vec<_> = actual
232            .field_columns()
233            .chain([actual.time_index_column()])
234            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
235            .collect();
236        let expect_schema: Vec<_> = mapper
237            .metadata()
238            .field_columns()
239            .chain([mapper.metadata().time_index_column()])
240            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
241            .collect();
242
243        let (index_or_defaults, fields) =
244            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;
245
246        let compat_pk = FlatCompatPrimaryKey::default();
247
248        Ok(Some(Self {
249            index_or_defaults,
250            arrow_schema: Arc::new(Schema::new(fields)),
251            compat_pk,
252        }))
253    }
254
255    /// Make columns of the `batch` compatible.
256    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
257        let len = batch.num_rows();
258        let columns = self
259            .index_or_defaults
260            .iter()
261            .map(|index_or_default| match index_or_default {
262                IndexOrDefault::Index { pos, cast_type } => {
263                    let old_column = batch.column(*pos);
264
265                    if let Some(ty) = cast_type {
266                        let casted = if let Some(json_type) = ty.as_json()
267                            && json_type.is_json2()
268                        {
269                            JsonArray::from(old_column)
270                                .try_align(&json_type.as_arrow_type())
271                                .context(ConvertValueSnafu)?
272                        } else {
273                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
274                                .context(ComputeArrowSnafu)?
275                        };
276                        Ok(casted)
277                    } else {
278                        Ok(old_column.clone())
279                    }
280                }
281                IndexOrDefault::DefaultValue {
282                    default_vector,
283                    semantic_type,
284                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
285            })
286            .chain(
287                // Adds internal columns.
288                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
289                    .iter()
290                    .map(|col| Ok(col.clone())),
291            )
292            .collect::<Result<Vec<_>>>()?;
293
294        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
295            .context(NewRecordBatchSnafu)?;
296
297        // Handles primary keys.
298        self.compat_pk.compat(compat_batch)
299    }
300}
301
302/// Repeats the vector value `to_len` times.
303fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
304    assert_eq!(1, vector.len());
305    let data_type = vector.data_type();
306    if is_tag && data_type.is_string() {
307        let values = vector.to_arrow_array();
308        if values.is_null(0) {
309            // Creates a dictionary array with `to_len` null keys.
310            let keys = UInt32Array::new_null(to_len);
311            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
312        } else {
313            let keys = UInt32Array::from_value(0, to_len);
314            Ok(Arc::new(DictionaryArray::new(keys, values)))
315        }
316    } else {
317        let keys = UInt32Array::from_value(0, to_len);
318        take(
319            &vector.to_arrow_array(),
320            &keys,
321            Some(TakeOptions {
322                check_bounds: false,
323            }),
324        )
325        .context(ComputeArrowSnafu)
326    }
327}
328
329/// Returns true if the actual primary keys is the same as expected.
330fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
331    ensure!(
332        actual.primary_key.len() <= expect.primary_key.len(),
333        CompatReaderSnafu {
334            region_id: expect.region_id,
335            reason: format!(
336                "primary key has more columns {} than expect {}",
337                actual.primary_key.len(),
338                expect.primary_key.len()
339            ),
340        }
341    );
342    ensure!(
343        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
344        CompatReaderSnafu {
345            region_id: expect.region_id,
346            reason: format!(
347                "primary key has different prefix, expect: {:?}, actual: {:?}",
348                expect.primary_key, actual.primary_key
349            ),
350        }
351    );
352
353    Ok(actual.primary_key.len() == expect.primary_key.len())
354}
355
356/// Index in source batch or a default value to fill a column.
357#[derive(Debug)]
358enum IndexOrDefault {
359    /// Index of the column in source batch.
360    Index {
361        pos: usize,
362        cast_type: Option<ConcreteDataType>,
363    },
364    /// Default value for the column.
365    DefaultValue {
366        /// Default value. The vector has only 1 element.
367        default_vector: VectorRef,
368        /// Semantic type of the column.
369        semantic_type: SemanticType,
370    },
371}
372
373/// Helper to rewrite primary key to another encoding for flat format.
374struct FlatRewritePrimaryKey {
375    /// New primary key encoder.
376    codec: Arc<dyn PrimaryKeyCodec>,
377    /// Metadata of the expected region.
378    metadata: RegionMetadataRef,
379    /// Original primary key codec.
380    /// If we need to rewrite the primary key.
381    old_codec: Arc<dyn PrimaryKeyCodec>,
382}
383
384impl FlatRewritePrimaryKey {
385    fn new(
386        expect: &RegionMetadataRef,
387        actual: &RegionMetadataRef,
388    ) -> Option<FlatRewritePrimaryKey> {
389        if expect.primary_key_encoding == actual.primary_key_encoding {
390            return None;
391        }
392        let codec = build_primary_key_codec(expect);
393        let old_codec = build_primary_key_codec(actual);
394
395        Some(FlatRewritePrimaryKey {
396            codec,
397            metadata: expect.clone(),
398            old_codec,
399        })
400    }
401
402    /// Rewrites the primary key of the `batch`.
403    /// It also appends the values to the primary key.
404    fn rewrite_key(
405        &self,
406        append_values: &[(ColumnId, Value)],
407        batch: RecordBatch,
408    ) -> Result<RecordBatch> {
409        let old_pk_dict_array = batch
410            .column(primary_key_column_index(batch.num_columns()))
411            .as_any()
412            .downcast_ref::<PrimaryKeyArray>()
413            .unwrap();
414        let old_pk_values_array = old_pk_dict_array
415            .values()
416            .as_any()
417            .downcast_ref::<BinaryArray>()
418            .unwrap();
419        let mut builder = BinaryBuilder::with_capacity(
420            old_pk_values_array.len(),
421            old_pk_values_array.value_data().len(),
422        );
423
424        // Binary buffer for the primary key.
425        let mut buffer = Vec::with_capacity(
426            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
427        );
428        let mut column_id_values = Vec::new();
429        // Iterates the binary array and rewrites the primary key.
430        for value in old_pk_values_array.iter() {
431            let Some(old_pk) = value else {
432                builder.append_null();
433                continue;
434            };
435            // Decodes the old primary key.
436            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
437            pk_values.extend(append_values);
438
439            buffer.clear();
440            column_id_values.clear();
441            // Encodes the new primary key.
442            match pk_values {
443                CompositeValues::Dense(dense_values) => {
444                    self.codec
445                        .encode_values(dense_values.as_slice(), &mut buffer)
446                        .context(EncodeSnafu)?;
447                }
448                CompositeValues::Sparse(sparse_values) => {
449                    for id in &self.metadata.primary_key {
450                        let value = sparse_values.get_or_null(*id);
451                        column_id_values.push((*id, value.clone()));
452                    }
453                    self.codec
454                        .encode_values(&column_id_values, &mut buffer)
455                        .context(EncodeSnafu)?;
456                }
457            }
458            builder.append_value(&buffer);
459        }
460        let new_pk_values_array = Arc::new(builder.finish());
461        let new_pk_dict_array =
462            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
463
464        let mut columns = batch.columns().to_vec();
465        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
466
467        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
468    }
469}
470
471/// Helper to make primary key compatible for flat format.
472#[derive(Default)]
473struct FlatCompatPrimaryKey {
474    /// Primary key rewriter.
475    rewriter: Option<FlatRewritePrimaryKey>,
476    /// Converter to append values to primary keys.
477    converter: Option<Arc<dyn PrimaryKeyCodec>>,
478    /// Default values to append.
479    values: Vec<(ColumnId, Value)>,
480}
481
482impl FlatCompatPrimaryKey {
483    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
484        let rewriter = FlatRewritePrimaryKey::new(expect, actual);
485
486        if is_primary_key_same(expect, actual)? {
487            return Ok(Self {
488                rewriter,
489                converter: None,
490                values: Vec::new(),
491            });
492        }
493
494        // We need to append default values to the primary key.
495        let to_add = &expect.primary_key[actual.primary_key.len()..];
496        let mut values = Vec::with_capacity(to_add.len());
497        let mut fields = Vec::with_capacity(to_add.len());
498        for column_id in to_add {
499            // Safety: The id comes from expect region metadata.
500            let column = expect.column_by_id(*column_id).unwrap();
501            fields.push((
502                *column_id,
503                SortField::new(column.column_schema.data_type.clone()),
504            ));
505            let default_value = column
506                .column_schema
507                .create_default()
508                .context(CreateDefaultSnafu {
509                    region_id: expect.region_id,
510                    column: &column.column_schema.name,
511                })?
512                .with_context(|| CompatReaderSnafu {
513                    region_id: expect.region_id,
514                    reason: format!(
515                        "key column {} does not have a default value to read",
516                        column.column_schema.name
517                    ),
518                })?;
519            values.push((*column_id, default_value));
520        }
521        // is_primary_key_same() is false so we have different number of primary key columns.
522        debug_assert!(!fields.is_empty());
523
524        // Create converter to append values.
525        let converter = Some(build_primary_key_codec_with_fields(
526            expect.primary_key_encoding,
527            fields.into_iter(),
528        ));
529
530        Ok(Self {
531            rewriter,
532            converter,
533            values,
534        })
535    }
536
537    /// Makes primary key of the `batch` compatible.
538    ///
539    /// Callers must ensure other columns except the `__primary_key` column is compatible.
540    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
541        if let Some(rewriter) = &self.rewriter {
542            // If we have different encoding, rewrite the whole primary key.
543            return rewriter.rewrite_key(&self.values, batch);
544        }
545
546        self.append_key(batch)
547    }
548
549    /// Appends values to the primary key of the `batch`.
550    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
551        let Some(converter) = &self.converter else {
552            return Ok(batch);
553        };
554
555        let old_pk_dict_array = batch
556            .column(primary_key_column_index(batch.num_columns()))
557            .as_any()
558            .downcast_ref::<PrimaryKeyArray>()
559            .unwrap();
560        let old_pk_values_array = old_pk_dict_array
561            .values()
562            .as_any()
563            .downcast_ref::<BinaryArray>()
564            .unwrap();
565        let mut builder = BinaryBuilder::with_capacity(
566            old_pk_values_array.len(),
567            old_pk_values_array.value_data().len()
568                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
569        );
570
571        // Binary buffer for the primary key.
572        let mut buffer = Vec::with_capacity(
573            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
574                + converter.estimated_size().unwrap_or_default(),
575        );
576
577        // Iterates the binary array and appends values to the primary key.
578        for value in old_pk_values_array.iter() {
579            let Some(old_pk) = value else {
580                builder.append_null();
581                continue;
582            };
583
584            buffer.clear();
585            buffer.extend_from_slice(old_pk);
586            converter
587                .encode_values(&self.values, &mut buffer)
588                .context(EncodeSnafu)?;
589
590            builder.append_value(&buffer);
591        }
592
593        let new_pk_values_array = Arc::new(builder.finish());
594        let new_pk_dict_array =
595            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
596
597        // Overrides the primary key column.
598        let mut columns = batch.columns().to_vec();
599        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
600
601        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
602    }
603}
604
605#[cfg(test)]
606mod tests {
607    use std::sync::Arc;
608
609    use api::v1::{OpType, SemanticType};
610    use datatypes::arrow::array::{
611        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
612        TimestampMillisecondArray, UInt8Array, UInt64Array,
613    };
614    use datatypes::arrow::datatypes::UInt32Type;
615    use datatypes::arrow::record_batch::RecordBatch;
616    use datatypes::prelude::ConcreteDataType;
617    use datatypes::schema::ColumnSchema;
618    use datatypes::value::ValueRef;
619    use mito_codec::row_converter::{
620        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
621    };
622    use store_api::codec::PrimaryKeyEncoding;
623    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
624    use store_api::storage::RegionId;
625
626    use super::*;
627    use crate::read::flat_projection::FlatProjectionMapper;
628    use crate::read::read_columns::ReadColumns;
629    use crate::sst::parquet::flat_format::FlatReadFormat;
630    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
631
632    /// Creates a new [RegionMetadata].
633    fn new_metadata(
634        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
635        primary_key: &[ColumnId],
636    ) -> RegionMetadata {
637        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
638        for (id, semantic_type, data_type) in semantic_types {
639            let column_schema = match semantic_type {
640                SemanticType::Tag => {
641                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
642                }
643                SemanticType::Field => {
644                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
645                }
646                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
647            };
648
649            builder.push_column_metadata(ColumnMetadata {
650                column_schema,
651                semantic_type: *semantic_type,
652                column_id: *id,
653            });
654        }
655        builder.primary_key(primary_key.to_vec());
656        builder.build().unwrap()
657    }
658
659    /// Encode primary key.
660    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
661        let fields = (0..keys.len())
662            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
663            .collect();
664        let converter = DensePrimaryKeyCodec::with_fields(fields);
665        let row = keys.iter().map(|str_opt| match str_opt {
666            Some(v) => ValueRef::String(v),
667            None => ValueRef::Null,
668        });
669
670        converter.encode(row).unwrap()
671    }
672
673    /// Encode sparse primary key.
674    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
675        let fields = (0..keys.len())
676            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
677            .collect();
678        let converter = SparsePrimaryKeyCodec::with_fields(fields);
679        let row = keys
680            .iter()
681            .map(|(id, str_opt)| match str_opt {
682                Some(v) => (*id, ValueRef::String(v)),
683                None => (*id, ValueRef::Null),
684            })
685            .collect::<Vec<_>>();
686        let mut buffer = vec![];
687        converter.encode_value_refs(&row, &mut buffer).unwrap();
688        buffer
689    }
690
691    /// Creates a primary key array for flat format testing.
692    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
693        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
694        for &pk in primary_keys {
695            builder.append(pk).unwrap();
696        }
697        Arc::new(builder.finish())
698    }
699
700    #[test]
701    fn test_flat_compat_batch_with_missing_columns() {
702        let actual_metadata = Arc::new(new_metadata(
703            &[
704                (
705                    0,
706                    SemanticType::Timestamp,
707                    ConcreteDataType::timestamp_millisecond_datatype(),
708                ),
709                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
710                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
711            ],
712            &[1],
713        ));
714
715        let expected_metadata = Arc::new(new_metadata(
716            &[
717                (
718                    0,
719                    SemanticType::Timestamp,
720                    ConcreteDataType::timestamp_millisecond_datatype(),
721                ),
722                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
723                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
724                // Adds a new field.
725                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
726            ],
727            &[1],
728        ));
729
730        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
731        let read_format = FlatReadFormat::new(
732            actual_metadata.clone(),
733            ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
734            None,
735            "test",
736            false,
737        )
738        .unwrap();
739
740        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
741            .unwrap()
742            .unwrap();
743
744        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
745        tag_builder.append_value("tag1");
746        tag_builder.append_value("tag1");
747        let tag_dict_array = Arc::new(tag_builder.finish());
748
749        let k1 = encode_key(&[Some("tag1")]);
750        let input_columns: Vec<ArrayRef> = vec![
751            tag_dict_array.clone(),
752            Arc::new(Int64Array::from(vec![100, 200])),
753            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
754            build_flat_test_pk_array(&[&k1, &k1]),
755            Arc::new(UInt64Array::from_iter_values([1, 2])),
756            Arc::new(UInt8Array::from_iter_values([
757                OpType::Put as u8,
758                OpType::Put as u8,
759            ])),
760        ];
761        let input_schema =
762            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
763        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
764
765        let result = compat_batch.compat(input_batch).unwrap();
766
767        let expected_schema =
768            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
769
770        let expected_columns: Vec<ArrayRef> = vec![
771            tag_dict_array.clone(),
772            Arc::new(Int64Array::from(vec![100, 200])),
773            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
774            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
775            build_flat_test_pk_array(&[&k1, &k1]),
776            Arc::new(UInt64Array::from_iter_values([1, 2])),
777            Arc::new(UInt8Array::from_iter_values([
778                OpType::Put as u8,
779                OpType::Put as u8,
780            ])),
781        ];
782        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();
783
784        assert_eq!(expected_batch, result);
785    }
786
787    #[test]
788    fn test_flat_compat_batch_with_read_projection_superset() {
789        let actual_metadata = Arc::new(new_metadata(
790            &[
791                (
792                    0,
793                    SemanticType::Timestamp,
794                    ConcreteDataType::timestamp_millisecond_datatype(),
795                ),
796                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
797                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
798            ],
799            &[1],
800        ));
801
802        let expected_metadata = Arc::new(new_metadata(
803            &[
804                (
805                    0,
806                    SemanticType::Timestamp,
807                    ConcreteDataType::timestamp_millisecond_datatype(),
808                ),
809                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
810                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
811                // Adds a new field.
812                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
813            ],
814            &[1],
815        ));
816
817        let mapper = FlatProjectionMapper::new_with_read_columns(
818            &expected_metadata,
819            vec![1, 2],
820            ReadColumns::from_deduped_column_ids([1, 2, 3]),
821            None,
822        )
823        .unwrap();
824        let read_format = FlatReadFormat::new(
825            actual_metadata.clone(),
826            ReadColumns::from_deduped_column_ids([1, 2, 3]),
827            None,
828            "test",
829            false,
830        )
831        .unwrap();
832
833        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
834            .unwrap()
835            .unwrap();
836
837        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
838        tag_builder.append_value("tag1");
839        tag_builder.append_value("tag1");
840        let tag_dict_array = Arc::new(tag_builder.finish());
841
842        let k1 = encode_key(&[Some("tag1")]);
843        let input_columns: Vec<ArrayRef> = vec![
844            tag_dict_array.clone(),
845            Arc::new(Int64Array::from(vec![100, 200])),
846            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
847            build_flat_test_pk_array(&[&k1, &k1]),
848            Arc::new(UInt64Array::from_iter_values([1, 2])),
849            Arc::new(UInt8Array::from_iter_values([
850                OpType::Put as u8,
851                OpType::Put as u8,
852            ])),
853        ];
854        let input_schema =
855            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
856        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
857
858        let result = compat_batch.compat(input_batch).unwrap();
859
860        let expected_schema =
861            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
862        let expected_columns: Vec<ArrayRef> = vec![
863            tag_dict_array.clone(),
864            Arc::new(Int64Array::from(vec![100, 200])),
865            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
866            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
867            build_flat_test_pk_array(&[&k1, &k1]),
868            Arc::new(UInt64Array::from_iter_values([1, 2])),
869            Arc::new(UInt8Array::from_iter_values([
870                OpType::Put as u8,
871                OpType::Put as u8,
872            ])),
873        ];
874        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();
875
876        assert_eq!(expected_batch, result);
877    }
878
879    #[test]
880    fn test_flat_compat_batch_with_different_pk_encoding() {
881        let mut actual_metadata = new_metadata(
882            &[
883                (
884                    0,
885                    SemanticType::Timestamp,
886                    ConcreteDataType::timestamp_millisecond_datatype(),
887                ),
888                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
889                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
890            ],
891            &[1],
892        );
893        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
894        let actual_metadata = Arc::new(actual_metadata);
895
896        let mut expected_metadata = new_metadata(
897            &[
898                (
899                    0,
900                    SemanticType::Timestamp,
901                    ConcreteDataType::timestamp_millisecond_datatype(),
902                ),
903                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
904                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
905                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
906            ],
907            &[1, 3],
908        );
909        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
910        let expected_metadata = Arc::new(expected_metadata);
911
912        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
913        let read_format = FlatReadFormat::new(
914            actual_metadata.clone(),
915            ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
916            None,
917            "test",
918            false,
919        )
920        .unwrap();
921
922        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
923            .unwrap()
924            .unwrap();
925
926        // Tag array.
927        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
928        tag1_builder.append_value("tag1");
929        tag1_builder.append_value("tag1");
930        let tag1_dict_array = Arc::new(tag1_builder.finish());
931
932        let k1 = encode_key(&[Some("tag1")]);
933        let input_columns: Vec<ArrayRef> = vec![
934            tag1_dict_array.clone(),
935            Arc::new(Int64Array::from(vec![100, 200])),
936            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
937            build_flat_test_pk_array(&[&k1, &k1]),
938            Arc::new(UInt64Array::from_iter_values([1, 2])),
939            Arc::new(UInt8Array::from_iter_values([
940                OpType::Put as u8,
941                OpType::Put as u8,
942            ])),
943        ];
944        let input_schema =
945            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
946        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
947
948        let result = compat_batch.compat(input_batch).unwrap();
949
950        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
951        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
952        null_tag_builder.append_nulls(2);
953        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
954        let expected_columns: Vec<ArrayRef> = vec![
955            tag1_dict_array.clone(),
956            null_tag_dict_array,
957            Arc::new(Int64Array::from(vec![100, 200])),
958            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
959            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
960            Arc::new(UInt64Array::from_iter_values([1, 2])),
961            Arc::new(UInt8Array::from_iter_values([
962                OpType::Put as u8,
963                OpType::Put as u8,
964            ])),
965        ];
966        let output_schema =
967            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
968        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
969
970        assert_eq!(expected_batch, result);
971    }
972
973    #[test]
974    fn test_flat_compat_batch_compact_sparse() {
975        let mut actual_metadata = new_metadata(
976            &[
977                (
978                    0,
979                    SemanticType::Timestamp,
980                    ConcreteDataType::timestamp_millisecond_datatype(),
981                ),
982                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
983            ],
984            &[],
985        );
986        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
987        let actual_metadata = Arc::new(actual_metadata);
988
989        let mut expected_metadata = new_metadata(
990            &[
991                (
992                    0,
993                    SemanticType::Timestamp,
994                    ConcreteDataType::timestamp_millisecond_datatype(),
995                ),
996                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
997                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
998            ],
999            &[],
1000        );
1001        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
1002        let expected_metadata = Arc::new(expected_metadata);
1003
1004        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
1005        let read_format = FlatReadFormat::new(
1006            actual_metadata.clone(),
1007            ReadColumns::from_deduped_column_ids([0, 2, 3]),
1008            None,
1009            "test",
1010            true,
1011        )
1012        .unwrap();
1013
1014        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, true)
1015            .unwrap()
1016            .unwrap();
1017
1018        let sparse_k1 = encode_sparse_key(&[]);
1019        let input_columns: Vec<ArrayRef> = vec![
1020            Arc::new(Int64Array::from(vec![100, 200])),
1021            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1022            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
1023            Arc::new(UInt64Array::from_iter_values([1, 2])),
1024            Arc::new(UInt8Array::from_iter_values([
1025                OpType::Put as u8,
1026                OpType::Put as u8,
1027            ])),
1028        ];
1029        let input_schema =
1030            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
1031        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
1032
1033        let result = compat_batch.compat(input_batch).unwrap();
1034
1035        let expected_columns: Vec<ArrayRef> = vec![
1036            Arc::new(Int64Array::from(vec![100, 200])),
1037            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
1038            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1039            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
1040            Arc::new(UInt64Array::from_iter_values([1, 2])),
1041            Arc::new(UInt8Array::from_iter_values([
1042                OpType::Put as u8,
1043                OpType::Put as u8,
1044            ])),
1045        ];
1046        let output_schema =
1047            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
1048        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
1049
1050        assert_eq!(expected_batch, result);
1051    }
1052}