feat: persist our column_id to parquet field_id (#8032)

* feat: persist our column_id to parquet field_id

* refactor: avoid clone field when possible

* chore: fmt

* chore: address style suggestions
Author: Ning Sun
Date: 2026-04-30 23:40:24 +08:00
Committed by: GitHub
Parent: d7310244a5
Commit: b8951a3514
8 changed files with 157 additions and 98 deletions


@@ -136,9 +136,9 @@ mod tests {
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3000, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3000, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 4000, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 4000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3000, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 4000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
);
// list from storage
let storage_entries = mito


@@ -865,9 +865,9 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) {
#[tokio::test]
async fn test_list_ssts() {
test_list_ssts_with_format(false, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2701, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2701, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2701, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,
r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
@@ -876,9 +876,9 @@ StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_s
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
test_list_ssts_with_format(true, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 3099, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 3099, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 3099, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }


@@ -35,7 +35,7 @@ use crate::error::{
};
use crate::memtable::BoxedBatchIterator;
use crate::read::Batch;
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};
/// Adapts a [`BoxedBatchIterator`] into an `Iterator<Item = Result<RecordBatch>>`
/// producing flat-format record batches.
@@ -212,38 +212,45 @@ fn compute_output_arrow_schema(
if !read_column_id_set.contains(&column_metadata.column_id) {
continue;
}
let field = Arc::new(Field::new(
let field = Field::new(
&column_metadata.column_schema.name,
column_metadata.column_schema.data_type.as_arrow_type(),
column_metadata.column_schema.is_nullable(),
));
let field = if column_metadata.semantic_type == SemanticType::Tag {
tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
);
let field = with_field_id(field, column_metadata.column_id);
if column_metadata.semantic_type == SemanticType::Tag {
fields.push(tag_maybe_to_dictionary_field(
&column_metadata.column_schema.data_type,
&Arc::new(field),
));
} else {
field
};
fields.push(field);
fields.push(Arc::new(field));
}
}
for column_metadata in metadata.field_columns() {
if !read_column_id_set.contains(&column_metadata.column_id) {
continue;
}
let field = Arc::new(Field::new(
let field = Field::new(
&column_metadata.column_schema.name,
column_metadata.column_schema.data_type.as_arrow_type(),
column_metadata.column_schema.is_nullable(),
));
fields.push(field);
);
fields.push(Arc::new(with_field_id(field, column_metadata.column_id)));
}
let time_index = metadata.time_index_column();
let time_index_field = Arc::new(Field::new(
let time_index_field = Field::new(
&time_index.column_schema.name,
time_index.column_schema.data_type.as_arrow_type(),
time_index.column_schema.is_nullable(),
));
fields.push(time_index_field);
);
fields.push(Arc::new(with_field_id(
time_index_field,
time_index.column_id,
)));
fields.extend(internal_fields().iter().cloned());
Arc::new(datatypes::arrow::datatypes::Schema::new(fields))
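In the hunk above, each tag field gets its field_id attached before being wrapped into a dictionary field, so the dictionary conversion must not drop that metadata. A small sketch of the pitfall (assuming the arrow crate's Field API directly; not part of this commit) that the to_dictionary_field change further down guards against:

use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};

fn main() {
    // A tag column carrying a hypothetical column id of 7.
    let tag = Field::new("host", DataType::Utf8, true).with_metadata(HashMap::from([(
        "PARQUET:field_id".to_string(),
        "7".to_string(),
    )]));

    // Field::new_dictionary builds a fresh field and does not inherit metadata.
    let dict = Field::new_dictionary(
        tag.name(),
        DataType::UInt32,
        tag.data_type().clone(),
        tag.is_nullable(),
    );
    assert!(dict.metadata().is_empty());

    // The id has to be carried over explicitly, which is what the commit does.
    let dict = dict.with_metadata(tag.metadata().clone());
    assert_eq!(dict.metadata().get("PARQUET:field_id"), Some(&"7".to_string()));
}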


@@ -45,7 +45,7 @@ use crate::error::{
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};
/// Returns true if `left` and `right` have same columns and primary key encoding.
pub(crate) fn has_same_columns_and_pk_encoding(
@@ -143,12 +143,19 @@ impl FlatCompatBatch {
let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
// For tag columns, we need to create a dictionary field.
if expect_column.semantic_type == SemanticType::Tag {
fields.push(tag_maybe_to_dictionary_field(
let field = tag_maybe_to_dictionary_field(
&expect_column.column_schema.data_type,
column_field,
));
);
fields.push(Arc::new(with_field_id(
(*field).clone(),
expect_column.column_id,
)));
} else {
fields.push(column_field.clone());
fields.push(Arc::new(with_field_id(
(**column_field).clone(),
expect_column.column_id,
)));
};
if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {


@@ -37,6 +37,7 @@ use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
use crate::sst::{
FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
with_field_id,
};
/// Handles projection and converts batches in flat format with correct schema.
@@ -395,17 +396,20 @@ pub(crate) fn compute_input_arrow_schema(
let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
for (column_id, _) in batch_schema {
let column_metadata = metadata.column_by_id(*column_id).unwrap();
let field = Arc::new(Field::new(
let field = Field::new(
&column_metadata.column_schema.name,
column_metadata.column_schema.data_type.as_arrow_type(),
column_metadata.column_schema.is_nullable(),
));
let field = if column_metadata.semantic_type == SemanticType::Tag {
tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
);
let field = with_field_id(field, *column_id);
if column_metadata.semantic_type == SemanticType::Tag {
new_fields.push(tag_maybe_to_dictionary_field(
&column_metadata.column_schema.data_type,
&Arc::new(field),
));
} else {
field
};
new_fields.push(field);
new_fields.push(Arc::new(field));
}
}
new_fields.extend_from_slice(&internal_fields());


@@ -59,6 +59,17 @@ pub enum FormatType {
Flat,
}
/// Iceberg-compatible column field ID key stored in Parquet column metadata.
pub const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";
/// Adds `PARQUET:field_id` metadata to an Arrow field.
pub fn with_field_id(mut field: Field, column_id: u32) -> Field {
field
.metadata_mut()
.insert(PARQUET_FIELD_ID_KEY.to_string(), column_id.to_string());
field
}
/// Gets the arrow schema to store in parquet.
pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
let fields = Fields::from_iter(
@@ -70,13 +81,19 @@ pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
.zip(&metadata.column_metadatas)
.filter_map(|(field, column_meta)| {
if column_meta.semantic_type == SemanticType::Field {
Some(field.clone())
Some(Arc::new(with_field_id(
(**field).clone(),
column_meta.column_id,
)))
} else {
// We have fixed positions for tags (primary key) and time index.
None
}
})
.chain([metadata.time_index_field()])
.chain([Arc::new(with_field_id(
(*metadata.time_index_field()).clone(),
metadata.time_index_column().column_id,
))])
.chain(internal_fields()),
);
@@ -135,13 +152,14 @@ pub fn to_flat_sst_arrow_schema(
if options.raw_pk_columns {
for pk_id in &metadata.primary_key {
let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
let column_id = metadata.column_metadatas[pk_index].column_id;
if options.string_pk_use_dict {
let old_field = &schema.fields[pk_index];
let new_field = tag_maybe_to_dictionary_field(
&metadata.column_metadatas[pk_index].column_schema.data_type,
old_field,
);
fields.push(new_field);
fields.push(Arc::new(with_field_id((*new_field).clone(), column_id)));
}
}
}
@@ -151,12 +169,18 @@ pub fn to_flat_sst_arrow_schema(
.zip(&metadata.column_metadatas)
.filter_map(|(field, column_meta)| {
if column_meta.semantic_type == SemanticType::Field {
Some(field.clone())
Some(Arc::new(with_field_id(
(**field).clone(),
column_meta.column_id,
)))
} else {
None
}
})
.chain([metadata.time_index_field()])
.chain([Arc::new(with_field_id(
(*metadata.time_index_field()).clone(),
metadata.time_index_column().column_id,
))])
.chain(internal_fields());
for field in remaining_fields {
fields.push(field);
@@ -179,12 +203,21 @@ pub fn flat_sst_arrow_schema_column_num(
/// Helper function to create a dictionary field from a field.
fn to_dictionary_field(field: &Field) -> Field {
Field::new_dictionary(
let mut new_field = Field::new_dictionary(
field.name(),
datatypes::arrow::datatypes::DataType::UInt32,
field.data_type().clone(),
field.is_nullable(),
)
);
// retain field_id metadata
if let Some(field_id) = field.metadata().get(PARQUET_FIELD_ID_KEY) {
new_field
.metadata_mut()
.insert(PARQUET_FIELD_ID_KEY.to_string(), field_id.clone());
}
new_field
}
/// Helper function to create a dictionary field from a field if it is a string column.
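The PARQUET:field_id key introduced above matches the parquet crate's parquet::arrow::PARQUET_FIELD_ID_META_KEY: when an Arrow field carries this metadata entry, the Parquet writer persists the value as the column's field_id, and the Arrow reader restores it on read. A minimal round-trip sketch (outside this commit, assuming the arrow, parquet, and bytes crates are used directly):

use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Tag a field with a hypothetical column id of 42 via the metadata key.
    let field = Field::new("value", DataType::Int64, true).with_metadata(HashMap::from([(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        "42".to_string(),
    )]));
    let schema = Arc::new(Schema::new(vec![field]));
    let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![1i64, 2, 3]))];
    let batch = RecordBatch::try_new(schema.clone(), columns)?;

    // Write a Parquet file into memory; the writer turns the metadata entry
    // into the Parquet schema's field_id for this column.
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Reading it back, the reconstructed Arrow schema carries the id again.
    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))?;
    assert_eq!(
        reader.schema().field(0).metadata().get(PARQUET_FIELD_ID_META_KEY),
        Some(&"42".to_string())
    );
    Ok(())
}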


@@ -57,7 +57,7 @@ use crate::sst::parquet::format::{
};
use crate::sst::{
FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
to_flat_sst_arrow_schema,
to_flat_sst_arrow_schema, with_field_id,
};
/// Helper for writing the SST format.
@@ -769,12 +769,12 @@ impl FlatConvertFormat {
// Builds new schema
let mut new_fields =
Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
for (_, _, column_index) in &self.projected_primary_keys {
for (column_id, _, column_index) in &self.projected_primary_keys {
let column_metadata = &self.metadata.column_metadatas[*column_index];
let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
let field =
tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
new_fields.push(field);
new_fields.push(Arc::new(with_field_id((*field).clone(), *column_id)));
}
new_fields.extend(batch.schema().fields().iter().cloned());
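The repeated (*field).clone() / (**field).clone() calls follow from schema fields being Arc'd (FieldRef): attaching metadata needs an owned Field, so the code clones out of the Arc, mutates, and re-wraps, which is also what the "avoid clone field when possible" refactor keeps to a minimum. A sketch of the pattern (hypothetical helper name, assuming the arrow crate directly):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, FieldRef};

// Hypothetical helper mirroring what the hunks above do inline.
fn tag_with_id(field: &FieldRef, id: u32) -> FieldRef {
    let mut owned: Field = (**field).clone();
    owned
        .metadata_mut()
        .insert("PARQUET:field_id".to_string(), id.to_string());
    Arc::new(owned)
}

fn main() {
    let ts: FieldRef = Arc::new(Field::new("ts", DataType::Int64, false));
    let tagged = tag_with_id(&ts, 5);
    assert_eq!(tagged.metadata().get("PARQUET:field_id"), Some(&"5".to_string()));
}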


@@ -800,7 +800,7 @@ mod tests {
use crate::sst::parquet::flat_format::{
FlatReadFormat, FlatWriteFormat, sequence_column_index,
};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema, with_field_id};
const TEST_SEQUENCE: u64 = 1;
const TEST_OP_TYPE: u8 = OpType::Put as u8;
@@ -851,23 +851,25 @@ mod tests {
fn build_test_arrow_schema() -> SchemaRef {
let fields = vec![
Field::new("field1", ArrowDataType::Int64, true),
Field::new("field0", ArrowDataType::Int64, true),
Field::new(
make_field("field1", ArrowDataType::Int64, true, Some(4)),
make_field("field0", ArrowDataType::Int64, true, Some(2)),
make_field(
"ts",
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
false,
Some(5),
),
Field::new(
make_field(
"__primary_key",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Binary),
),
false,
None,
),
Field::new("__sequence", ArrowDataType::UInt64, false),
Field::new("__op_type", ArrowDataType::UInt8, false),
make_field("__sequence", ArrowDataType::UInt64, false, None),
make_field("__op_type", ArrowDataType::UInt8, false, None),
];
Arc::new(Schema::new(fields))
}
@@ -1066,6 +1068,14 @@ mod tests {
);
}
fn make_field(name: &str, dt: ArrowDataType, nullable: bool, field_id: Option<u32>) -> Field {
let mut field = Field::new(name, dt, nullable);
if let Some(id) = field_id {
field = with_field_id(field, id);
}
field
}
fn build_test_flat_sst_schema() -> SchemaRef {
let fields = vec![
Field::new("tag0", ArrowDataType::Int64, true), // primary key columns first
@@ -1091,11 +1101,37 @@ mod tests {
Arc::new(Schema::new(fields))
}
fn build_test_flat_sst_schema_with_field_ids() -> SchemaRef {
let ids = [
Some(1u32),
Some(3),
Some(4),
Some(2),
Some(5),
None,
None,
None,
];
let fields: Vec<_> = build_test_flat_sst_schema()
.fields()
.iter()
.zip(ids)
.map(|(f, id)| match id {
Some(id) => Arc::new(with_field_id((**f).clone(), id)) as _,
None => f.clone(),
})
.collect();
Arc::new(Schema::new(fields))
}
#[test]
fn test_flat_to_sst_arrow_schema() {
let metadata = build_test_region_metadata();
let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema());
assert_eq!(
&build_test_flat_sst_schema_with_field_ids(),
format.arrow_schema()
);
}
fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
@@ -1118,8 +1154,11 @@ mod tests {
let num_rows = 4;
let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap();
let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();
let batch =
RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns.clone())
.unwrap();
let expect_record =
RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns).unwrap();
let actual = format.convert_batch(&batch).unwrap();
assert_eq!(expect_record, actual);
@@ -1133,7 +1172,8 @@ mod tests {
let num_rows = 4;
let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();
let batch =
RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns).unwrap();
let expected_columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
@@ -1145,8 +1185,11 @@ mod tests {
Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
];
let expected_record =
RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();
let expected_record = RecordBatch::try_new(
build_test_flat_sst_schema_with_field_ids(),
expected_columns,
)
.unwrap();
let actual = format.convert_batch(&batch).unwrap();
assert_eq!(expected_record, actual);
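For assertions like the ones in these tests, a hypothetical helper (not in the commit) that pulls the ids back out of an Arrow schema can make failures easier to read:

use arrow::datatypes::SchemaRef;

// Collects the PARQUET:field_id value (if any) of every field, in order.
fn field_ids(schema: &SchemaRef) -> Vec<Option<u32>> {
    schema
        .fields()
        .iter()
        .map(|f| {
            f.metadata()
                .get("PARQUET:field_id")
                .and_then(|v| v.parse::<u32>().ok())
        })
        .collect()
}

Applied to the expected schema above, it would return the same id list that build_test_flat_sst_schema_with_field_ids encodes.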
@@ -1432,26 +1475,7 @@ mod tests {
];
// Create schema for old format (without primary key columns)
let old_format_fields = vec![
Field::new("field1", ArrowDataType::Int64, true),
Field::new("field0", ArrowDataType::Int64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new(
"__primary_key",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Binary),
),
false,
),
Field::new("__sequence", ArrowDataType::UInt64, false),
Field::new("__op_type", ArrowDataType::UInt8, false),
];
let old_schema = Arc::new(Schema::new(old_format_fields));
let old_schema = build_test_arrow_schema();
let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();
// Test conversion with dense encoding
@@ -1468,8 +1492,11 @@ mod tests {
Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
];
let expected_record_batch =
RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();
let expected_record_batch = RecordBatch::try_new(
build_test_flat_sst_schema_with_field_ids(),
expected_columns,
)
.unwrap();
// Compare the actual result with the expected record batch
assert_eq!(expected_record_batch, result);
@@ -1520,26 +1547,7 @@ mod tests {
];
// Create schema for old format (without primary key columns)
let old_format_fields = vec![
Field::new("field1", ArrowDataType::Int64, true),
Field::new("field0", ArrowDataType::Int64, true),
Field::new(
"ts",
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new(
"__primary_key",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Binary),
),
false,
),
Field::new("__sequence", ArrowDataType::UInt64, false),
Field::new("__op_type", ArrowDataType::UInt8, false),
];
let old_schema = Arc::new(Schema::new(old_format_fields));
let old_schema = build_test_arrow_schema();
let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();
// Test conversion with sparse encoding