diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs index 47120555ff..5953859cf0 100644 --- a/src/metric-engine/src/engine/flush.rs +++ b/src/metric-engine/src/engine/flush.rs @@ -136,9 +136,9 @@ mod tests { ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3000, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3000, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 4000, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 4000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true } ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3000, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#, +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 4000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#, ); // list from storage let storage_entries = mito diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 7b8def8207..8493dc6a36 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -865,9 +865,9 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) { #[tokio::test] async fn test_list_ssts() { test_list_ssts_with_format(false, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# , +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2701, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2701, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2701, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# , r#" StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } @@ -876,9 +876,9 @@ StorageSstEntry { file_path: "test/11_0000000002/index/.puffin", file_s StorageSstEntry { file_path: "test/22_0000000042/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/22_0000000042/index/.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await; test_list_ssts_with_format(true, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000001/.parquet", file_size: 3099, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000002/.parquet", file_size: 3099, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test/22_0000000042/.parquet", file_size: 3099, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#" StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } diff --git a/src/mito2/src/read/batch_adapter.rs b/src/mito2/src/read/batch_adapter.rs index 4698229c5b..ddb6c8d6bd 100644 --- a/src/mito2/src/read/batch_adapter.rs +++ b/src/mito2/src/read/batch_adapter.rs @@ -35,7 +35,7 @@ use crate::error::{ }; use crate::memtable::BoxedBatchIterator; use crate::read::Batch; -use crate::sst::{internal_fields, tag_maybe_to_dictionary_field}; +use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id}; /// Adapts a [`BoxedBatchIterator`] into an `Iterator>` /// producing flat-format record batches. @@ -212,38 +212,45 @@ fn compute_output_arrow_schema( if !read_column_id_set.contains(&column_metadata.column_id) { continue; } - let field = Arc::new(Field::new( + let field = Field::new( &column_metadata.column_schema.name, column_metadata.column_schema.data_type.as_arrow_type(), column_metadata.column_schema.is_nullable(), - )); - let field = if column_metadata.semantic_type == SemanticType::Tag { - tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field) + ); + let field = with_field_id(field, column_metadata.column_id); + + if column_metadata.semantic_type == SemanticType::Tag { + fields.push(tag_maybe_to_dictionary_field( + &column_metadata.column_schema.data_type, + &Arc::new(field), + )); } else { - field - }; - fields.push(field); + fields.push(Arc::new(field)); + } } for column_metadata in metadata.field_columns() { if !read_column_id_set.contains(&column_metadata.column_id) { continue; } - let field = Arc::new(Field::new( + let field = Field::new( &column_metadata.column_schema.name, column_metadata.column_schema.data_type.as_arrow_type(), column_metadata.column_schema.is_nullable(), - )); - fields.push(field); + ); + fields.push(Arc::new(with_field_id(field, column_metadata.column_id))); } let time_index = metadata.time_index_column(); - let time_index_field = Arc::new(Field::new( + let time_index_field = Field::new( &time_index.column_schema.name, time_index.column_schema.data_type.as_arrow_type(), time_index.column_schema.is_nullable(), - )); - fields.push(time_index_field); + ); + fields.push(Arc::new(with_field_id( + time_index_field, + time_index.column_id, + ))); fields.extend(internal_fields().iter().cloned()); Arc::new(datatypes::arrow::datatypes::Schema::new(fields)) diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index 4577d7fe4e..d6aa5c52a0 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -45,7 +45,7 @@ use crate::error::{ use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns}; use crate::sst::parquet::flat_format::primary_key_column_index; use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray}; -use crate::sst::{internal_fields, tag_maybe_to_dictionary_field}; +use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id}; /// Returns true if `left` and `right` have same columns and primary key encoding. pub(crate) fn has_same_columns_and_pk_encoding( @@ -143,12 +143,19 @@ impl FlatCompatBatch { let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index]; // For tag columns, we need to create a dictionary field. if expect_column.semantic_type == SemanticType::Tag { - fields.push(tag_maybe_to_dictionary_field( + let field = tag_maybe_to_dictionary_field( &expect_column.column_schema.data_type, column_field, - )); + ); + fields.push(Arc::new(with_field_id( + (*field).clone(), + expect_column.column_id, + ))); } else { - fields.push(column_field.clone()); + fields.push(Arc::new(with_field_id( + (**column_field).clone(), + expect_column.column_id, + ))); }; if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) { diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index d73bb5a205..294cd8bbb0 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -37,6 +37,7 @@ use crate::sst::parquet::flat_format::sst_column_id_indices; use crate::sst::parquet::format::FormatProjection; use crate::sst::{ FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema, + with_field_id, }; /// Handles projection and converts batches in flat format with correct schema. @@ -395,17 +396,20 @@ pub(crate) fn compute_input_arrow_schema( let mut new_fields = Vec::with_capacity(batch_schema.len() + 3); for (column_id, _) in batch_schema { let column_metadata = metadata.column_by_id(*column_id).unwrap(); - let field = Arc::new(Field::new( + let field = Field::new( &column_metadata.column_schema.name, column_metadata.column_schema.data_type.as_arrow_type(), column_metadata.column_schema.is_nullable(), - )); - let field = if column_metadata.semantic_type == SemanticType::Tag { - tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field) + ); + let field = with_field_id(field, *column_id); + if column_metadata.semantic_type == SemanticType::Tag { + new_fields.push(tag_maybe_to_dictionary_field( + &column_metadata.column_schema.data_type, + &Arc::new(field), + )); } else { - field - }; - new_fields.push(field); + new_fields.push(Arc::new(field)); + } } new_fields.extend_from_slice(&internal_fields()); diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index c769f78c6c..1007b57668 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -59,6 +59,17 @@ pub enum FormatType { Flat, } +/// Iceberg-compatible column field ID key stored in Parquet column metadata. +pub const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id"; + +/// Adds `PARQUET:field_id` metadata to an Arrow field. +pub fn with_field_id(mut field: Field, column_id: u32) -> Field { + field + .metadata_mut() + .insert(PARQUET_FIELD_ID_KEY.to_string(), column_id.to_string()); + field +} + /// Gets the arrow schema to store in parquet. pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef { let fields = Fields::from_iter( @@ -70,13 +81,19 @@ pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef { .zip(&metadata.column_metadatas) .filter_map(|(field, column_meta)| { if column_meta.semantic_type == SemanticType::Field { - Some(field.clone()) + Some(Arc::new(with_field_id( + (**field).clone(), + column_meta.column_id, + ))) } else { // We have fixed positions for tags (primary key) and time index. None } }) - .chain([metadata.time_index_field()]) + .chain([Arc::new(with_field_id( + (*metadata.time_index_field()).clone(), + metadata.time_index_column().column_id, + ))]) .chain(internal_fields()), ); @@ -135,13 +152,14 @@ pub fn to_flat_sst_arrow_schema( if options.raw_pk_columns { for pk_id in &metadata.primary_key { let pk_index = metadata.column_index_by_id(*pk_id).unwrap(); + let column_id = metadata.column_metadatas[pk_index].column_id; if options.string_pk_use_dict { let old_field = &schema.fields[pk_index]; let new_field = tag_maybe_to_dictionary_field( &metadata.column_metadatas[pk_index].column_schema.data_type, old_field, ); - fields.push(new_field); + fields.push(Arc::new(with_field_id((*new_field).clone(), column_id))); } } } @@ -151,12 +169,18 @@ pub fn to_flat_sst_arrow_schema( .zip(&metadata.column_metadatas) .filter_map(|(field, column_meta)| { if column_meta.semantic_type == SemanticType::Field { - Some(field.clone()) + Some(Arc::new(with_field_id( + (**field).clone(), + column_meta.column_id, + ))) } else { None } }) - .chain([metadata.time_index_field()]) + .chain([Arc::new(with_field_id( + (*metadata.time_index_field()).clone(), + metadata.time_index_column().column_id, + ))]) .chain(internal_fields()); for field in remaining_fields { fields.push(field); @@ -179,12 +203,21 @@ pub fn flat_sst_arrow_schema_column_num( /// Helper function to create a dictionary field from a field. fn to_dictionary_field(field: &Field) -> Field { - Field::new_dictionary( + let mut new_field = Field::new_dictionary( field.name(), datatypes::arrow::datatypes::DataType::UInt32, field.data_type().clone(), field.is_nullable(), - ) + ); + + // retain field_id metadata + if let Some(field_id) = field.metadata().get(PARQUET_FIELD_ID_KEY) { + new_field + .metadata_mut() + .insert(PARQUET_FIELD_ID_KEY.to_string(), field_id.clone()); + } + + new_field } /// Helper function to create a dictionary field from a field if it is a string column. diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index 958dc0c8b2..e23d6c85cc 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -57,7 +57,7 @@ use crate::sst::parquet::format::{ }; use crate::sst::{ FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field, - to_flat_sst_arrow_schema, + to_flat_sst_arrow_schema, with_field_id, }; /// Helper for writing the SST format. @@ -769,12 +769,12 @@ impl FlatConvertFormat { // Builds new schema let mut new_fields = Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len()); - for (_, _, column_index) in &self.projected_primary_keys { + for (column_id, _, column_index) in &self.projected_primary_keys { let column_metadata = &self.metadata.column_metadatas[*column_index]; let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index]; let field = tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field); - new_fields.push(field); + new_fields.push(Arc::new(with_field_id((*field).clone(), *column_id))); } new_fields.extend(batch.schema().fields().iter().cloned()); diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index a5c5fa68ab..e8b2cfe647 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -800,7 +800,7 @@ mod tests { use crate::sst::parquet::flat_format::{ FlatReadFormat, FlatWriteFormat, sequence_column_index, }; - use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; + use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema, with_field_id}; const TEST_SEQUENCE: u64 = 1; const TEST_OP_TYPE: u8 = OpType::Put as u8; @@ -851,23 +851,25 @@ mod tests { fn build_test_arrow_schema() -> SchemaRef { let fields = vec![ - Field::new("field1", ArrowDataType::Int64, true), - Field::new("field0", ArrowDataType::Int64, true), - Field::new( + make_field("field1", ArrowDataType::Int64, true, Some(4)), + make_field("field0", ArrowDataType::Int64, true, Some(2)), + make_field( "ts", ArrowDataType::Timestamp(TimeUnit::Millisecond, None), false, + Some(5), ), - Field::new( + make_field( "__primary_key", ArrowDataType::Dictionary( Box::new(ArrowDataType::UInt32), Box::new(ArrowDataType::Binary), ), false, + None, ), - Field::new("__sequence", ArrowDataType::UInt64, false), - Field::new("__op_type", ArrowDataType::UInt8, false), + make_field("__sequence", ArrowDataType::UInt64, false, None), + make_field("__op_type", ArrowDataType::UInt8, false, None), ]; Arc::new(Schema::new(fields)) } @@ -1066,6 +1068,14 @@ mod tests { ); } + fn make_field(name: &str, dt: ArrowDataType, nullable: bool, field_id: Option) -> Field { + let mut field = Field::new(name, dt, nullable); + if let Some(id) = field_id { + field = with_field_id(field, id); + } + field + } + fn build_test_flat_sst_schema() -> SchemaRef { let fields = vec![ Field::new("tag0", ArrowDataType::Int64, true), // primary key columns first @@ -1091,11 +1101,37 @@ mod tests { Arc::new(Schema::new(fields)) } + fn build_test_flat_sst_schema_with_field_ids() -> SchemaRef { + let ids = [ + Some(1u32), + Some(3), + Some(4), + Some(2), + Some(5), + None, + None, + None, + ]; + let fields: Vec<_> = build_test_flat_sst_schema() + .fields() + .iter() + .zip(ids) + .map(|(f, id)| match id { + Some(id) => Arc::new(with_field_id((**f).clone(), id)) as _, + None => f.clone(), + }) + .collect(); + Arc::new(Schema::new(fields)) + } + #[test] fn test_flat_to_sst_arrow_schema() { let metadata = build_test_region_metadata(); let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default()); - assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema()); + assert_eq!( + &build_test_flat_sst_schema_with_field_ids(), + format.arrow_schema() + ); } fn input_columns_for_flat_batch(num_rows: usize) -> Vec { @@ -1118,8 +1154,11 @@ mod tests { let num_rows = 4; let columns: Vec = input_columns_for_flat_batch(num_rows); - let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap(); - let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap(); + let batch = + RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns.clone()) + .unwrap(); + let expect_record = + RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns).unwrap(); let actual = format.convert_batch(&batch).unwrap(); assert_eq!(expect_record, actual); @@ -1133,7 +1172,8 @@ mod tests { let num_rows = 4; let columns: Vec = input_columns_for_flat_batch(num_rows); - let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap(); + let batch = + RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns).unwrap(); let expected_columns: Vec = vec![ Arc::new(Int64Array::from(vec![1; num_rows])), // tag0 @@ -1145,8 +1185,11 @@ mod tests { Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type ]; - let expected_record = - RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap(); + let expected_record = RecordBatch::try_new( + build_test_flat_sst_schema_with_field_ids(), + expected_columns, + ) + .unwrap(); let actual = format.convert_batch(&batch).unwrap(); assert_eq!(expected_record, actual); @@ -1432,26 +1475,7 @@ mod tests { ]; // Create schema for old format (without primary key columns) - let old_format_fields = vec![ - Field::new("field1", ArrowDataType::Int64, true), - Field::new("field0", ArrowDataType::Int64, true), - Field::new( - "ts", - ArrowDataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), - Field::new( - "__primary_key", - ArrowDataType::Dictionary( - Box::new(ArrowDataType::UInt32), - Box::new(ArrowDataType::Binary), - ), - false, - ), - Field::new("__sequence", ArrowDataType::UInt64, false), - Field::new("__op_type", ArrowDataType::UInt8, false), - ]; - let old_schema = Arc::new(Schema::new(old_format_fields)); + let old_schema = build_test_arrow_schema(); let record_batch = RecordBatch::try_new(old_schema, columns).unwrap(); // Test conversion with dense encoding @@ -1468,8 +1492,11 @@ mod tests { Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type ]; - let expected_record_batch = - RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap(); + let expected_record_batch = RecordBatch::try_new( + build_test_flat_sst_schema_with_field_ids(), + expected_columns, + ) + .unwrap(); // Compare the actual result with the expected record batch assert_eq!(expected_record_batch, result); @@ -1520,26 +1547,7 @@ mod tests { ]; // Create schema for old format (without primary key columns) - let old_format_fields = vec![ - Field::new("field1", ArrowDataType::Int64, true), - Field::new("field0", ArrowDataType::Int64, true), - Field::new( - "ts", - ArrowDataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), - Field::new( - "__primary_key", - ArrowDataType::Dictionary( - Box::new(ArrowDataType::UInt32), - Box::new(ArrowDataType::Binary), - ), - false, - ), - Field::new("__sequence", ArrowDataType::UInt64, false), - Field::new("__op_type", ArrowDataType::UInt8, false), - ]; - let old_schema = Arc::new(Schema::new(old_format_fields)); + let old_schema = build_test_arrow_schema(); let record_batch = RecordBatch::try_new(old_schema, columns).unwrap(); // Test conversion with sparse encoding