feat: add FlatConvertFormat to convert record batches in old format to the flat format (#6786)

* feat: add convert format to FlatReadFormat

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: test convert format

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: only convert string pks to dictionary

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-08-25 14:47:06 +08:00
committed by GitHub
parent 83a65a81c0
commit d5575d3fa4
5 changed files with 658 additions and 54 deletions

View File

@@ -93,6 +93,8 @@ pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
impl SparsePrimaryKeyCodec {
/// Creates a new [`SparsePrimaryKeyCodec`] instance.
///
/// The `region_metadata` should be the metadata of the logical region.
pub fn new(region_metadata: &RegionMetadataRef) -> Self {
Self {
inner: Arc::new(SparsePrimaryKeyCodecInner {
@@ -123,6 +125,7 @@ impl SparsePrimaryKeyCodec {
}
}
/// Creates a new [`SparsePrimaryKeyCodec`] instance with additional label `fields`.
pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
Self {
inner: Arc::new(SparsePrimaryKeyCodecInner {

View File

@@ -45,7 +45,7 @@ use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, PrimaryKeyArray, INTERNAL_COLUMN_NUM};
use crate::sst::{internal_fields, to_dictionary_field};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};
/// Reader to adapt schema of underlying reader to expected schema.
pub struct CompatReader<R> {
@@ -228,7 +228,10 @@ impl FlatCompatBatch {
let column_field = &mapper.metadata().schema.arrow_schema().fields()[column_index];
// For tag columns, we need to create a dictionary field.
if expect_column.semantic_type == SemanticType::Tag {
fields.push(Arc::new(to_dictionary_field(column_field)));
fields.push(tag_maybe_to_dictionary_field(
&expect_column.column_schema.data_type,
column_field,
));
} else {
fields.push(column_field.clone());
};
@@ -1437,7 +1440,8 @@ mod tests {
));
let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
let read_format = FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter());
let read_format =
FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false);
let format_projection = read_format.format_projection();
let compat_batch =
@@ -1520,7 +1524,8 @@ mod tests {
let expected_metadata = Arc::new(expected_metadata);
let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
let read_format = FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter());
let read_format =
FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false);
let format_projection = read_format.format_projection();
let compat_batch =

View File

@@ -21,6 +21,7 @@ use common_base::readable_size::ReadableSize;
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
};
use datatypes::prelude::ConcreteDataType;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadata;
use store_api::storage::consts::{
@@ -120,17 +121,13 @@ pub fn to_flat_sst_arrow_schema(
if options.raw_pk_columns {
for pk_id in &metadata.primary_key {
let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
if options.string_pk_use_dict
&& metadata.column_metadatas[pk_index]
.column_schema
.data_type
.is_string()
{
let field = &schema.fields[pk_index];
let field = Arc::new(to_dictionary_field(field));
fields.push(field);
} else {
fields.push(schema.fields[pk_index].clone());
if options.string_pk_use_dict {
let old_field = &schema.fields[pk_index];
let new_field = tag_maybe_to_dictionary_field(
&metadata.column_metadatas[pk_index].column_schema.data_type,
old_field,
);
fields.push(new_field);
}
}
}
@@ -155,7 +152,7 @@ pub fn to_flat_sst_arrow_schema(
}
/// Helper function to create a dictionary field from a field.
pub(crate) fn to_dictionary_field(field: &Field) -> Field {
fn to_dictionary_field(field: &Field) -> Field {
Field::new_dictionary(
field.name(),
datatypes::arrow::datatypes::DataType::UInt32,
@@ -164,6 +161,18 @@ pub(crate) fn to_dictionary_field(field: &Field) -> Field {
)
}
/// Helper function to create a dictionary field from a field if it is a string column.
pub(crate) fn tag_maybe_to_dictionary_field(
data_type: &ConcreteDataType,
field: &Arc<Field>,
) -> Arc<Field> {
if data_type.is_string() {
Arc::new(to_dictionary_field(field))
} else {
field.clone()
}
}
/// Fields for internal columns.
pub(crate) fn internal_fields() -> [FieldRef; 3] {
// Internal columns are always not null.

View File

@@ -33,17 +33,27 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::arrow::array::{ArrayRef, UInt64Array};
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::compute::kernels::take::take;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::{ConcreteDataType, DataType};
use mito_codec::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
use parquet::file::metadata::RowGroupMetaData;
use snafu::ResultExt;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{NewRecordBatchSnafu, Result};
use crate::sst::parquet::format::{FormatProjection, ReadFormat, StatValues};
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
use crate::error::{
ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
NewRecordBatchSnafu, Result,
};
use crate::sst::parquet::format::{
FormatProjection, PrimaryKeyArray, ReadFormat, StatValues, INTERNAL_COLUMN_NUM,
};
use crate::sst::{tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema, FlatSchemaOptions};
/// Helper for writing the SST format.
#[allow(dead_code)]
@@ -128,12 +138,14 @@ pub struct FlatReadFormat {
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
/// Projection.
/// Projection computed for the format.
format_projection: FormatProjection,
/// Column id to index in SST.
column_id_to_sst_index: HashMap<ColumnId, usize>,
/// Sequence number to override the sequence read from the SST.
override_sequence: Option<SequenceNumber>,
/// Optional format converter for handling flat format conversion.
convert_format: Option<FlatConvertFormat>,
}
impl FlatReadFormat {
@@ -141,6 +153,7 @@ impl FlatReadFormat {
pub fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
convert_to_flat: bool,
) -> FlatReadFormat {
let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
@@ -153,12 +166,20 @@ impl FlatReadFormat {
column_ids,
);
let convert_format = if convert_to_flat {
let codec = build_primary_key_codec(&metadata);
FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
} else {
None
};
FlatReadFormat {
metadata,
arrow_schema,
format_projection,
column_id_to_sst_index: id_to_index,
override_sequence: None,
convert_format,
}
}
@@ -227,44 +248,101 @@ impl FlatReadFormat {
&self.format_projection.projection_indices
}
/// Gets the projection.
pub(crate) fn format_projection(&self) -> &FormatProjection {
&self.format_projection
}
/// Creates a sequence array to override.
#[allow(dead_code)]
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
self.override_sequence
.map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
}
/// Convert a record batch to apply override sequence array.
/// Convert a record batch to apply flat format conversion and override sequence array.
///
/// Returns a new RecordBatch with the sequence column replaced by the override sequence array.
/// Returns a new RecordBatch with flat format conversion applied first (if enabled),
/// then the sequence column replaced by the override sequence array.
#[allow(dead_code)]
pub(crate) fn convert_batch(
&self,
record_batch: &RecordBatch,
record_batch: RecordBatch,
override_sequence_array: Option<&ArrayRef>,
) -> Result<RecordBatch> {
let Some(override_array) = override_sequence_array else {
return Ok(record_batch.clone());
// First, apply flat format conversion if enabled
let batch = if let Some(ref convert_format) = self.convert_format {
convert_format.convert(record_batch)?
} else {
record_batch
};
let mut columns = record_batch.columns().to_vec();
let sequence_column_idx = sequence_column_index(record_batch.num_columns());
// Then apply sequence override if provided
let Some(override_array) = override_sequence_array else {
return Ok(batch);
};
let mut columns = batch.columns().to_vec();
let sequence_column_idx = sequence_column_index(batch.num_columns());
// Use the provided override sequence array, slicing if necessary to match batch length
let sequence_array = if override_array.len() > record_batch.num_rows() {
override_array.slice(0, record_batch.num_rows())
let sequence_array = if override_array.len() > batch.num_rows() {
override_array.slice(0, batch.num_rows())
} else {
override_array.clone()
};
columns[sequence_column_idx] = sequence_array;
RecordBatch::try_new(record_batch.schema(), columns).context(NewRecordBatchSnafu)
RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
}
/// Returns the format projection.
pub(crate) fn format_projection(&self) -> &FormatProjection {
&self.format_projection
/// Checks whether the batch from the parquet file needs to be converted to match the flat format.
///
/// * `file_path` is the path to the parquet file, for error message.
/// * `num_columns` is the number of columns in the parquet file.
/// * `metadata` is the region metadata (always assumes flat format).
#[allow(dead_code)]
pub(crate) fn need_convert_to_flat(
file_path: &str,
num_columns: usize,
metadata: &RegionMetadata,
) -> Result<bool> {
// For flat format, compute expected column number:
// all columns + internal columns (pk, sequence, op_type)
let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;
if expected_columns == num_columns {
// Same number of columns, no conversion needed
Ok(false)
} else {
ensure!(
expected_columns >= num_columns,
InvalidParquetSnafu {
file: file_path,
reason: format!(
"Expected columns {} should be >= actual columns {}",
expected_columns, num_columns
)
}
);
// Different number of columns, check if the difference matches primary key count
let column_diff = expected_columns - num_columns;
ensure!(
column_diff == metadata.primary_key.len(),
InvalidParquetSnafu {
file: file_path,
reason: format!(
"Column number difference {} does not match primary key count {}",
column_diff,
metadata.primary_key.len()
)
}
);
Ok(true)
}
}
fn get_stat_values(
@@ -309,6 +387,179 @@ pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<Column
id_to_index
}
/// Converts a batch that doesn't have decoded primary key columns into a batch that has decoded
/// primary key columns in flat format.
pub(crate) struct FlatConvertFormat {
/// Metadata of the region.
metadata: RegionMetadataRef,
/// Primary key codec to decode primary keys.
codec: Arc<dyn PrimaryKeyCodec>,
/// Projected primary key column information: (column_id, pk_index, column_index in metadata).
projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}
impl FlatConvertFormat {
/// Creates a new `FlatConvertFormat`.
///
/// The `format_projection` is the projection computed in the [FlatReadFormat] with the `metadata`.
/// The `codec` is the primary key codec of the `metadata`.
///
/// Returns `None` if there is no primary key.
pub(crate) fn new(
metadata: RegionMetadataRef,
format_projection: &FormatProjection,
codec: Arc<dyn PrimaryKeyCodec>,
) -> Option<Self> {
if metadata.primary_key.is_empty() {
return None;
}
// Builds projected primary keys list maintaining the order of RegionMetadata::primary_key
let mut projected_primary_keys = Vec::new();
for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
if format_projection
.column_id_to_projected_index
.contains_key(&column_id)
{
// We expect the format_projection is built from the metadata.
let column_index = metadata.column_index_by_id(column_id).unwrap();
projected_primary_keys.push((column_id, pk_index, column_index));
}
}
Some(Self {
metadata,
codec,
projected_primary_keys,
})
}
/// Converts a batch to have decoded primary key columns in flat format.
///
/// The primary key array in the batch is a dictionary array. We decode each value which is a
/// primary key and reuse the keys array to build a dictionary array for each tag column.
/// The decoded columns are inserted in front of other columns.
pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
if self.projected_primary_keys.is_empty() {
return Ok(batch);
}
let primary_key_index = primary_key_column_index(batch.num_columns());
let pk_dict_array = batch
.column(primary_key_index)
.as_any()
.downcast_ref::<PrimaryKeyArray>()
.with_context(|| InvalidRecordBatchSnafu {
reason: "Primary key column is not a dictionary array".to_string(),
})?;
let pk_values_array = pk_dict_array
.values()
.as_any()
.downcast_ref::<BinaryArray>()
.with_context(|| InvalidRecordBatchSnafu {
reason: "Primary key values are not binary array".to_string(),
})?;
// Decodes all primary key values
let mut decoded_pk_values = Vec::with_capacity(pk_values_array.len());
for i in 0..pk_values_array.len() {
if pk_values_array.is_null(i) {
decoded_pk_values.push(None);
} else {
let pk_bytes = pk_values_array.value(i);
let decoded = self.codec.decode(pk_bytes).context(DecodeSnafu)?;
decoded_pk_values.push(Some(decoded));
}
}
// Builds decoded tag column arrays.
let mut decoded_columns = Vec::new();
for (column_id, pk_index, column_index) in &self.projected_primary_keys {
let column_metadata = &self.metadata.column_metadatas[*column_index];
let tag_column = self.build_primary_key_column(
*column_id,
*pk_index,
&column_metadata.column_schema.data_type,
pk_dict_array.keys(),
&decoded_pk_values,
)?;
decoded_columns.push(tag_column);
}
// Builds new columns: decoded tag columns first, then original columns
let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
new_columns.extend(decoded_columns);
new_columns.extend_from_slice(batch.columns());
// Builds new schema
let mut new_fields =
Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
for (_, _, column_index) in &self.projected_primary_keys {
let column_metadata = &self.metadata.column_metadatas[*column_index];
let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
let field =
tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
new_fields.push(field);
}
new_fields.extend(batch.schema().fields().iter().cloned());
let new_schema = Arc::new(Schema::new(new_fields));
RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
}
/// Builds an array for a specific tag column.
///
/// It may build a dictionary array if the type is string. Note that the dictionary
/// array may have null values, although keys are not null.
fn build_primary_key_column(
&self,
column_id: ColumnId,
pk_index: usize,
column_type: &ConcreteDataType,
keys: &UInt32Array,
decoded_pk_values: &[Option<CompositeValues>],
) -> Result<ArrayRef> {
// Gets values from the primary key.
let mut builder = column_type.create_mutable_vector(decoded_pk_values.len());
for decoded_opt in decoded_pk_values {
match decoded_opt {
Some(decoded) => {
match decoded {
CompositeValues::Dense(dense) => {
if pk_index < dense.len() {
builder.push_value_ref(dense[pk_index].1.as_value_ref());
} else {
builder.push_null();
}
}
CompositeValues::Sparse(sparse) => {
let value = sparse.get_or_null(column_id);
builder.push_value_ref(value.as_value_ref());
}
};
}
None => builder.push_null(),
}
}
let values_vector = builder.to_vector();
let values_array = values_vector.to_arrow_array();
// Only creates dictionary array for string types, otherwise take values by keys
if matches!(column_type, ConcreteDataType::String(_)) {
// Creates dictionary array using the same keys for string types
// Note that the dictionary values may have nulls.
let dict_array = DictionaryArray::new(keys.clone(), values_array);
Ok(Arc::new(dict_array))
} else {
// For non-string types, takes values by keys indices to create a regular array
let taken_array = take(&values_array, keys, None).context(ComputeArrowSnafu)?;
Ok(taken_array)
}
}
}
#[cfg(test)]
impl FlatReadFormat {
/// Creates a helper with existing `metadata` and all columns.
@@ -316,6 +567,7 @@ impl FlatReadFormat {
Self::new(
Arc::clone(&metadata),
metadata.column_metadatas.iter().map(|c| c.column_id),
false,
)
}
}

View File

@@ -152,7 +152,7 @@ impl ReadFormat {
flat_format: bool,
) -> Self {
if flat_format {
Self::new_flat(metadata, column_ids)
Self::new_flat(metadata, column_ids, false)
} else {
Self::new_primary_key(metadata, column_ids)
}
@@ -170,8 +170,9 @@ impl ReadFormat {
pub fn new_flat(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
convert_to_flat: bool,
) -> Self {
ReadFormat::Flat(FlatReadFormat::new(metadata, column_ids))
ReadFormat::Flat(FlatReadFormat::new(metadata, column_ids, convert_to_flat))
}
pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
@@ -925,17 +926,25 @@ mod tests {
use std::sync::Arc;
use api::v1::OpType;
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::arrow::array::{
Int64Array, StringArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
use mito_codec::row_converter::{
DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;
use super::*;
use crate::sst::parquet::flat_format::FlatWriteFormat;
use crate::sst::FlatSchemaOptions;
use crate::sst::parquet::flat_format::{sequence_column_index, FlatWriteFormat};
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
const TEST_SEQUENCE: u64 = 1;
const TEST_OP_TYPE: u8 = OpType::Put as u8;
@@ -1346,19 +1355,19 @@ mod tests {
// The projection includes all "fixed position" columns: ts(4), __primary_key(5), __sequence(6), __op_type(7)
// Only read tag1 (column_id=3, index=1) + fixed columns
let read_format = ReadFormat::new_flat(metadata.clone(), [3].iter().copied());
let read_format = ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), false);
assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());
// Only read field1 (column_id=4, index=2) + fixed columns
let read_format = ReadFormat::new_flat(metadata.clone(), [4].iter().copied());
let read_format = ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), false);
assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());
// Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed)
let read_format = ReadFormat::new_flat(metadata.clone(), [5].iter().copied());
let read_format = ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), false);
assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());
// Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns
let read_format = ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied());
let read_format = ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), false);
assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
}
@@ -1368,6 +1377,7 @@ mod tests {
let mut format = FlatReadFormat::new(
metadata,
std::iter::once(1), // Just read tag0
false,
);
let num_rows = 4;
@@ -1383,10 +1393,8 @@ mod tests {
RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();
// Test without override sequence - should return clone
let result = format.convert_batch(&record_batch, None).unwrap();
let sequence_column = result.column(
crate::sst::parquet::flat_format::sequence_column_index(result.num_columns()),
);
let result = format.convert_batch(record_batch.clone(), None).unwrap();
let sequence_column = result.column(sequence_column_index(result.num_columns()));
let sequence_array = sequence_column
.as_any()
.downcast_ref::<UInt64Array>()
@@ -1399,11 +1407,9 @@ mod tests {
format.set_override_sequence(Some(override_sequence));
let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
let result = format
.convert_batch(&record_batch, Some(&override_sequence_array))
.convert_batch(record_batch, Some(&override_sequence_array))
.unwrap();
let sequence_column = result.column(
crate::sst::parquet::flat_format::sequence_column_index(result.num_columns()),
);
let sequence_column = result.column(sequence_column_index(result.num_columns()));
let sequence_array = sequence_column
.as_any()
.downcast_ref::<UInt64Array>()
@@ -1412,4 +1418,333 @@ mod tests {
let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
assert_eq!(sequence_array, &expected_override);
}
#[test]
fn test_need_convert_to_flat() {
let metadata = build_test_region_metadata();
// Test case 1: Same number of columns, no conversion needed
// For flat format: all columns (5) + internal columns (3)
let expected_columns = metadata.column_metadatas.len() + 3;
let result =
FlatReadFormat::need_convert_to_flat("test.parquet", expected_columns, &metadata)
.unwrap();
assert!(
!result,
"Should not need conversion when column counts match"
);
// Test case 2: Different number of columns, need conversion
// Missing primary key columns (2 primary keys in test metadata)
let num_columns_without_pk = expected_columns - metadata.primary_key.len();
let result =
FlatReadFormat::need_convert_to_flat("test.parquet", num_columns_without_pk, &metadata)
.unwrap();
assert!(
result,
"Should need conversion when primary key columns are missing"
);
// Test case 3: Invalid case - actual columns more than expected
let too_many_columns = expected_columns + 1;
let err = FlatReadFormat::need_convert_to_flat("test.parquet", too_many_columns, &metadata)
.unwrap_err();
assert!(err.to_string().contains("Expected columns"), "{err:?}");
// Test case 4: Invalid case - column difference doesn't match primary key count
let wrong_diff_columns = expected_columns - 1; // Difference of 1, but we have 2 primary keys
let err =
FlatReadFormat::need_convert_to_flat("test.parquet", wrong_diff_columns, &metadata)
.unwrap_err();
assert!(
err.to_string().contains("Column number difference"),
"{err:?}"
);
}
fn build_test_dense_pk_array(
codec: &DensePrimaryKeyCodec,
pk_values_per_row: &[&[Option<i64>]],
) -> Arc<PrimaryKeyArray> {
let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
for pk_values_row in pk_values_per_row {
let values: Vec<ValueRef> = pk_values_row
.iter()
.map(|opt| match opt {
Some(val) => ValueRef::Int64(*val),
None => ValueRef::Null,
})
.collect();
let encoded = codec.encode(values.into_iter()).unwrap();
builder.append_value(&encoded);
}
Arc::new(builder.finish())
}
fn build_test_sparse_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"__table_id",
ConcreteDataType::uint32_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: ReservedColumnId::table_id(),
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"__tsid",
ConcreteDataType::uint64_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: ReservedColumnId::tsid(),
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field1",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 4,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field0",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 5,
})
.primary_key(vec![
ReservedColumnId::table_id(),
ReservedColumnId::tsid(),
1,
3,
])
.primary_key_encoding(PrimaryKeyEncoding::Sparse);
Arc::new(builder.build().unwrap())
}
fn build_test_sparse_pk_array(
codec: &SparsePrimaryKeyCodec,
pk_values_per_row: &[SparseTestRow],
) -> Arc<PrimaryKeyArray> {
let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
for row in pk_values_per_row {
let values = vec![
(ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
(ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
(1, ValueRef::String(&row.tag0)),
(3, ValueRef::String(&row.tag1)),
];
let mut buffer = Vec::new();
codec.encode_value_refs(&values, &mut buffer).unwrap();
builder.append_value(&buffer);
}
Arc::new(builder.finish())
}
#[derive(Clone)]
struct SparseTestRow {
table_id: u32,
tsid: u64,
tag0: String,
tag1: String,
}
#[test]
fn test_flat_read_format_convert_format_with_dense_encoding() {
let metadata = build_test_region_metadata();
let column_ids: Vec<_> = metadata
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect();
let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true);
let num_rows = 4;
let original_sequence = 100u64;
// Create primary key values for each row: tag0=1, tag1=1 for all rows
let pk_values_per_row = vec![
&[Some(1i64), Some(1i64)][..]; num_rows // All rows have same primary key values
];
// Create a test record batch in old format using dense encoding
let codec = DensePrimaryKeyCodec::new(&metadata);
let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![2; num_rows])), // field1
Arc::new(Int64Array::from(vec![3; num_rows])), // field0
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
dense_pk_array.clone(), // __primary_key (dense encoding)
Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
];
// Create schema for old format (without primary key columns)
let old_format_fields = vec![
Field::new("field1", ArrowDataType::Int64, true),
Field::new("field0", ArrowDataType::Int64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new(
"__primary_key",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Binary),
),
false,
),
Field::new("__sequence", ArrowDataType::UInt64, false),
Field::new("__op_type", ArrowDataType::UInt8, false),
];
let old_schema = Arc::new(Schema::new(old_format_fields));
let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();
// Test conversion with dense encoding
let result = format.convert_batch(record_batch, None).unwrap();
// Construct expected RecordBatch in flat format with decoded primary key columns
let expected_columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![1; num_rows])), // tag0 (decoded from primary key)
Arc::new(Int64Array::from(vec![1; num_rows])), // tag1 (decoded from primary key)
Arc::new(Int64Array::from(vec![2; num_rows])), // field1
Arc::new(Int64Array::from(vec![3; num_rows])), // field0
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
dense_pk_array, // __primary_key (preserved)
Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
];
let expected_record_batch =
RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();
// Compare the actual result with the expected record batch
assert_eq!(expected_record_batch, result);
}
#[test]
fn test_flat_read_format_convert_format_with_sparse_encoding() {
let metadata = build_test_sparse_region_metadata();
let column_ids: Vec<_> = metadata
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect();
let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true);
let num_rows = 4;
let original_sequence = 100u64;
// Create sparse test data with table_id, tsid and string tags
let pk_test_rows = vec![
SparseTestRow {
table_id: 1,
tsid: 123,
tag0: "frontend".to_string(),
tag1: "pod1".to_string(),
};
num_rows
];
let codec = SparsePrimaryKeyCodec::new(&metadata);
let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
// Create a test record batch in old format using sparse encoding
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![2; num_rows])), // field1
Arc::new(Int64Array::from(vec![3; num_rows])), // field0
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
sparse_pk_array.clone(), // __primary_key (sparse encoding)
Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
];
// Create schema for old format (without primary key columns)
let old_format_fields = vec![
Field::new("field1", ArrowDataType::Int64, true),
Field::new("field0", ArrowDataType::Int64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new(
"__primary_key",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Binary),
),
false,
),
Field::new("__sequence", ArrowDataType::UInt64, false),
Field::new("__op_type", ArrowDataType::UInt8, false),
];
let old_schema = Arc::new(Schema::new(old_format_fields));
let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();
// Test conversion with sparse encoding
let result = format.convert_batch(record_batch, None).unwrap();
// Construct expected RecordBatch in flat format with decoded primary key columns
let tag0_array = Arc::new(DictionaryArray::new(
UInt32Array::from(vec![0; num_rows]),
Arc::new(StringArray::from(vec!["frontend"])),
));
let tag1_array = Arc::new(DictionaryArray::new(
UInt32Array::from(vec![0; num_rows]),
Arc::new(StringArray::from(vec!["pod1"])),
));
let expected_columns: Vec<ArrayRef> = vec![
Arc::new(UInt32Array::from(vec![1; num_rows])), // __table_id (decoded from primary key)
Arc::new(UInt64Array::from(vec![123; num_rows])), // __tsid (decoded from primary key)
tag0_array, // tag0 (decoded from primary key)
tag1_array, // tag1 (decoded from primary key)
Arc::new(Int64Array::from(vec![2; num_rows])), // field1
Arc::new(Int64Array::from(vec![3; num_rows])), // field0
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
sparse_pk_array, // __primary_key (preserved)
Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
];
let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
let expected_record_batch =
RecordBatch::try_new(expected_schema, expected_columns).unwrap();
// Compare the actual result with the expected record batch
assert_eq!(expected_record_batch, result);
}
}