diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 38b4df7a9e..39f2366659 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -972,17 +972,17 @@ async fn test_list_ssts_with_format( #[tokio::test] async fn test_all_index_metas_list_all_types() { test_all_index_metas_list_all_types_with_format(false, r#" -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "bloom_filter", target_type: "column", target_key: "3", target_json: "{\"column\":3}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "fulltext_bloom", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 87, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "fulltext_tantivy", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 1104, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "0", target_json: "{\"column\":0}", blob_size: 70, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":44,\"inverted_index_size\":70,\"null_bitmap_size\":8,\"relative_fst_offset\":26,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await; +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "bloom_filter", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 751, meta_json: 
Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "fulltext_bloom", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "fulltext_tantivy", target_type: "column", target_key: "5", target_json: "{\"column\":5}", blob_size: 1100, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 518, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":150,\"inverted_index_size\":518,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await; test_all_index_metas_list_all_types_with_format(true, r#" -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6144), index_type: "bloom_filter", target_type: "column", target_key: "3", target_json: "{\"column\":3}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6144), index_type: "fulltext_bloom", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: 
"test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6144), index_type: "fulltext_tantivy", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 1104, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6144), index_type: "inverted", target_type: "column", target_key: "0", target_json: "{\"column\":0}", blob_size: 92, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":66,\"inverted_index_size\":92,\"null_bitmap_size\":8,\"relative_fst_offset\":26,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6144), index_type: "inverted", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await; +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "bloom_filter", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "fulltext_bloom", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "fulltext_tantivy", target_type: "column", target_key: "5", target_json: "{\"column\":5}", blob_size: 1100, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 518, 
meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":150,\"inverted_index_size\":518,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }
+PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await;
 }
 
 async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expect_format: &str) {
@@ -1001,12 +1001,33 @@ async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expe
     // One region with both fulltext backends and inverted index enabled, plus bloom skipping index
     let region_id = RegionId::new(11, 1);
-    let mut request = CreateRequestBuilder::new().tag_num(3).field_num(2).build();
-    // inverted index on tag_0
-    request.column_metadatas[0]
+    let mut request = CreateRequestBuilder::new().tag_num(1).field_num(2).build();
+    // bloom filter skipping index on field_0
+    let skipping = SkippingIndexOptions::new_unchecked(2, 0.01, SkippingIndexType::BloomFilter);
+    request.column_metadatas[1]
+        .column_schema
+        .set_skipping_options(&skipping)
+        .unwrap();
+
+    // inverted index on field_1
+    request.column_metadatas[2]
         .column_schema
         .set_inverted_index(true);
-    // fulltext bloom on tag_1
+    // inverted index on field_0
+    request.column_metadatas[1]
+        .column_schema
+        .set_inverted_index(true);
+
+    request.column_metadatas.push(ColumnMetadata {
+        column_schema: ColumnSchema::new(
+            "field_2".to_string(),
+            ConcreteDataType::string_datatype(),
+            true,
+        ),
+        semantic_type: SemanticType::Field,
+        column_id: 4,
+    });
+    // fulltext bloom on field_2
     let ft_bloom = FulltextOptions::new_unchecked(
         true,
         FulltextAnalyzer::English,
@@ -1015,11 +1036,24 @@ async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expe
         4,
         0.001,
     );
-    request.column_metadatas[1]
+    request
+        .column_metadatas
+        .last_mut()
+        .unwrap()
         .column_schema
         .set_fulltext_options(&ft_bloom)
         .unwrap();
-    // fulltext tantivy on tag_2
+
+    request.column_metadatas.push(ColumnMetadata {
+        column_schema: ColumnSchema::new(
+            "field_3".to_string(),
+            ConcreteDataType::string_datatype(),
+            true,
+        ),
+        semantic_type: SemanticType::Field,
+        column_id: 5,
+    });
+    // fulltext tantivy on field_3
     let ft_tantivy = FulltextOptions::new_unchecked(
         true,
         FulltextAnalyzer::Chinese,
@@ -1028,28 +1062,20 @@ async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expe
         2,
         0.01,
     );
-    request.column_metadatas[2]
+    request
+        .column_metadatas
+        .last_mut()
+        .unwrap()
         .column_schema
         .set_fulltext_options(&ft_tantivy)
         .unwrap();
-    // bloom filter skipping index on field_1 (which is at index 3)
-    let skipping = SkippingIndexOptions::new_unchecked(2, 0.01, SkippingIndexType::BloomFilter);
-    request.column_metadatas[3]
-        .column_schema
-        .set_skipping_options(&skipping)
-        .unwrap();
-
-    // inverted index on field_1
-    request.column_metadatas[4]
-        .column_schema
-        .set_inverted_index(true);
 engine
        .handle_request(region_id, RegionRequest::Create(request.clone()))
        .await
        .unwrap();
 
-    // write some rows (schema: tag_0, tag_1, tag_2, field_0, field_1, ts)
+    // write some rows (schema: tag_0, field_0, field_1, ts, field_2, field_3)
     let column_schemas = rows_schema(&request);
     let rows_vec: Vec<api::v1::Row> = (0..20)
         .map(|ts| api::v1::Row {
@@ -1057,12 +1083,6 @@ async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expe
                 api::v1::Value {
                     value_data: Some(api::v1::value::ValueData::StringValue("x".to_string())),
                 },
-                api::v1::Value {
-                    value_data: Some(api::v1::value::ValueData::StringValue("y".to_string())),
-                },
-                api::v1::Value {
-                    value_data: Some(api::v1::value::ValueData::StringValue("z".to_string())),
-                },
                 api::v1::Value {
                     value_data: Some(api::v1::value::ValueData::F64Value(ts as f64)),
                 },
@@ -1074,6 +1094,12 @@ async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expe
                         ts as i64 * 1000,
                     )),
                 },
+                api::v1::Value {
+                    value_data: Some(api::v1::value::ValueData::StringValue("y".to_string())),
+                },
+                api::v1::Value {
+                    value_data: Some(api::v1::value::ValueData::StringValue("z".to_string())),
+                },
             ],
         })
         .collect();
@@ -1095,7 +1121,7 @@ async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expe
         .unwrap();
 
     fn bucket_size(size: u64) -> u64 {
-        if size < 512 { size } else { (size / 16) * 16 }
+        if size < 512 { size } else { (size / 100) * 100 }
     }
 
     let mut metas = engine.all_index_metas().await;
@@ -1125,5 +1151,5 @@ async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expe
         .map(|entry| format!("\n{:?}", entry))
         .collect::<String>();
 
-    assert_eq!(debug_format, expect_format);
+    assert_eq!(expect_format, debug_format);
 }
diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs
index b14ff05dfe..5578018a8d 100644
--- a/src/mito2/src/memtable/bulk/part_reader.rs
+++ b/src/mito2/src/memtable/bulk/part_reader.rs
@@ -13,12 +13,10 @@
 // limitations under the License.
 
 use std::collections::VecDeque;
-use std::ops::BitAnd;
 use std::sync::Arc;
 
 use bytes::Bytes;
 use datatypes::arrow::array::BooleanArray;
-use datatypes::arrow::buffer::BooleanBuffer;
 use datatypes::arrow::record_batch::RecordBatch;
 use parquet::arrow::ProjectionMask;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
@@ -30,7 +28,7 @@ use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
 use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
 use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
 use crate::sst::parquet::flat_format::sequence_column_index;
-use crate::sst::parquet::reader::{MaybeFilter, RowGroupReaderContext};
+use crate::sst::parquet::reader::RowGroupReaderContext;
 
 /// Iterator for reading data inside a bulk part.
 pub struct EncodedBulkPartIter {
@@ -191,38 +189,13 @@ fn apply_combined_filters(
     let num_rows = record_batch.num_rows();
     let mut combined_filter = None;
 
-    // First, apply predicate filters.
+    // First, apply predicate filters using the shared method.
     if !context.base.filters.is_empty() {
-        let num_rows = record_batch.num_rows();
-        let mut mask = BooleanBuffer::new_set(num_rows);
-
-        // Run filter one by one and combine them result, similar to RangeBase::precise_filter
-        for filter_ctx in &context.base.filters {
-            let filter = match filter_ctx.filter() {
-                MaybeFilter::Filter(f) => f,
-                // Column matches.
-                MaybeFilter::Matched => continue,
-                // Column doesn't match, filter the entire batch.
-                MaybeFilter::Pruned => return Ok(None),
-            };
-
-            // Safety: We checked the format type in new().
-            let Some(column_index) = context
-                .read_format()
-                .as_flat()
-                .unwrap()
-                .projected_index_by_id(filter_ctx.column_id())
-            else {
-                continue;
-            };
-            let array = record_batch.column(column_index);
-            let result = filter
-                .evaluate_array(array)
-                .context(crate::error::RecordBatchSnafu)?;
-
-            mask = mask.bitand(&result);
-        }
-        // Convert the mask to BooleanArray
+        let predicate_mask = context.base.compute_filter_mask_flat(&record_batch)?;
+        // If predicate filters out the entire batch, return None early
+        let Some(mask) = predicate_mask else {
+            return Ok(None);
+        };
         combined_filter = Some(BooleanArray::from(mask));
     }
diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs
index f3fb2f6c96..8bc24a4953 100644
--- a/src/mito2/src/read/compat.rs
+++ b/src/mito2/src/read/compat.rs
@@ -386,7 +386,8 @@ impl FlatCompatBatch {
 /// Repeats the vector value `to_len` times.
 fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<VectorRef> {
     assert_eq!(1, vector.len());
-    if is_tag {
+    let data_type = vector.data_type();
+    if is_tag && data_type.is_string() {
         let values = vector.to_arrow_array();
         if values.is_null(0) {
             // Creates a dictionary array with `to_len` null keys.
diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs
index ddad8e772f..23257ef649 100644
--- a/src/mito2/src/read/flat_projection.rs
+++ b/src/mito2/src/read/flat_projection.rs
@@ -48,6 +48,8 @@ pub struct FlatProjectionMapper {
     /// Ids of columns to project. It keeps ids in the same order as the `projection`
     /// indices to build the mapper.
     /// The mapper won't deduplicate the column ids.
+    ///
+    /// Note that this doesn't contain `__table_id` or `__tsid`.
     column_ids: Vec<ColumnId>,
     /// Ids and DataTypes of columns of the expected batch.
     /// We can use this to check if the batch is compatible with the expected schema.
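The next file, `src/mito2/src/sst/index.rs`, adds `decode_primary_keys_with_counts`, which walks the dictionary-encoded primary key column and decodes a key only when it changes, pairing each decoded value with the length of its run. As a standalone sketch of that run-length pattern (a minimal illustration assuming the `arrow` crate directly; the literal keys and values here are invented, not part of the patch):

```rust
use std::sync::Arc;

use arrow::array::{Array, BinaryArray, DictionaryArray, UInt32Array};

fn main() {
    // A dictionary-encoded "primary key" column: rows with the same key
    // sit in consecutive runs because SST rows are sorted by primary key.
    let values = BinaryArray::from_vec(vec![b"pk-a".as_slice(), b"pk-b".as_slice()]);
    let keys = UInt32Array::from(vec![0u32, 0, 0, 1, 1]);
    let dict = DictionaryArray::new(keys, Arc::new(values));

    // Decode a key only when it differs from the previous one and count
    // how many consecutive rows reuse it.
    let keys = dict.keys();
    let dict_values = dict
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();
    let mut runs: Vec<(Vec<u8>, usize)> = Vec::new();
    let mut prev_key: Option<u32> = None;
    for i in 0..keys.len() {
        let key = keys.value(i);
        if prev_key == Some(key) {
            // Same run: bump the count instead of decoding again.
            runs.last_mut().unwrap().1 += 1;
            continue;
        }
        // New run: a real implementation would decode the pk bytes here.
        runs.push((dict_values.value(key as usize).to_vec(), 1));
        prev_key = Some(key);
    }
    assert_eq!(runs, vec![(b"pk-a".to_vec(), 3), (b"pk-b".to_vec(), 2)]);
}
```

Producing one `(value, count)` pair per run is what lets the bloom filter and inverted indexers below push a decoded tag once per run (`push_n_row_elems` / `push_with_name_n`) instead of once per row.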
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index 20e8cb66d3..8ad7f6ef01 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -26,10 +26,13 @@ use std::sync::Arc;
 
 use bloom_filter::creator::BloomFilterIndexer;
 use common_telemetry::{debug, info, warn};
+use datatypes::arrow::array::BinaryArray;
 use datatypes::arrow::record_batch::RecordBatch;
+use mito_codec::index::IndexValuesCodec;
+use mito_codec::row_converter::CompositeValues;
 use puffin_manager::SstPuffinManager;
 use smallvec::{SmallVec, smallvec};
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use statistics::{ByteCount, RowCount};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::{ColumnId, FileId, RegionId};
@@ -40,7 +43,7 @@ use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, Regio
 use crate::cache::file_cache::{FileType, IndexKey};
 use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
 use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
-use crate::error::{BuildIndexAsyncSnafu, Error, Result};
+use crate::error::{BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, Result};
 use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
 use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
 use crate::read::{Batch, BatchReader};
@@ -57,6 +60,8 @@ use crate::sst::index::fulltext_index::creator::FulltextIndexer;
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::inverted_index::creator::InvertedIndexer;
 use crate::sst::parquet::SstInfo;
+use crate::sst::parquet::flat_format::primary_key_column_index;
+use crate::sst::parquet::format::PrimaryKeyArray;
 
 pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
 pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
@@ -698,6 +703,56 @@ impl IndexBuildScheduler {
     }
 }
 
+/// Decodes primary keys from a flat format RecordBatch.
+/// Returns a list of (decoded_pk_value, count) tuples where count is the number of consecutive occurrences.
+pub(crate) fn decode_primary_keys_with_counts(
+    batch: &RecordBatch,
+    codec: &IndexValuesCodec,
+) -> Result<Vec<(CompositeValues, usize)>> {
+    let primary_key_index = primary_key_column_index(batch.num_columns());
+    let pk_dict_array = batch
+        .column(primary_key_index)
+        .as_any()
+        .downcast_ref::<PrimaryKeyArray>()
+        .context(InvalidRecordBatchSnafu {
+            reason: "Primary key column is not a dictionary array",
+        })?;
+    let pk_values_array = pk_dict_array
+        .values()
+        .as_any()
+        .downcast_ref::<BinaryArray>()
+        .context(InvalidRecordBatchSnafu {
+            reason: "Primary key values are not binary array",
+        })?;
+    let keys = pk_dict_array.keys();
+
+    // Decodes primary keys and counts consecutive occurrences
+    let mut result: Vec<(CompositeValues, usize)> = Vec::new();
+    let mut prev_key: Option<u32> = None;
+
+    for i in 0..keys.len() {
+        let current_key = keys.value(i);
+
+        // Checks if current key is the same as previous key
+        if let Some(prev) = prev_key
+            && prev == current_key
+        {
+            // Safety: We already have a key in the result vector.
+            result.last_mut().unwrap().1 += 1;
+            continue;
+        }
+
+        // New key, decodes it.
+        let pk_bytes = pk_values_array.value(current_key as usize);
+        let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
+
+        result.push((decoded_value, 1));
+        prev_key = Some(current_key);
+    }
+
+    Ok(result)
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs
index a48898902f..0d16a21d7c 100644
--- a/src/mito2/src/sst/index/bloom_filter/creator.rs
+++ b/src/mito2/src/sst/index/bloom_filter/creator.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicUsize;
 
+use api::v1::SemanticType;
 use common_telemetry::{debug, warn};
 use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::schema::SkippingIndexType;
@@ -23,9 +24,10 @@ use datatypes::vectors::Helper;
 use index::bloom_filter::creator::BloomFilterCreator;
 use index::target::IndexTarget;
 use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
-use mito_codec::row_converter::SortField;
+use mito_codec::row_converter::{CompositeValues, SortField};
 use puffin::puffin_manager::{PuffinWriter, PutOptions};
 use snafu::{ResultExt, ensure};
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::{ColumnId, FileId};
 use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
@@ -35,13 +37,13 @@ use crate::error::{
     OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
 };
 use crate::read::Batch;
-use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
 use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
 use crate::sst::index::intermediate::{
     IntermediateLocation, IntermediateManager, TempFileProvider,
 };
 use crate::sst::index::puffin_manager::SstPuffinWriter;
 use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
+use crate::sst::index::{TYPE_BLOOM_FILTER_INDEX, decode_primary_keys_with_counts};
 
 /// The buffer size for the pipe used to send index data to the puffin blob.
 const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
@@ -289,47 +291,81 @@ impl BloomFilterIndexer {
         let n = batch.num_rows();
         guard.inc_row_count(n);
 
+        let is_sparse = self.metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
+        let mut decoded_pks: Option<Vec<(CompositeValues, usize)>> = None;
+
         for (col_id, creator) in &mut self.creators {
-            // Get the column name from metadata
-            if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
-                let column_name = &column_meta.column_schema.name;
+            // Safety: `creators` are created from the metadata so it won't be None.
+            let column_meta = self.metadata.column_by_id(*col_id).unwrap();
+            let column_name = &column_meta.column_schema.name;
+            if let Some(column_array) = batch.column_by_name(column_name) {
+                // Convert Arrow array to VectorRef
+                let vector = Helper::try_into_vector(column_array.clone())
+                    .context(crate::error::ConvertVectorSnafu)?;
+                let sort_field = SortField::new(vector.data_type());
 
-                // Find the column in the RecordBatch by name
-                if let Some(column_array) = batch.column_by_name(column_name) {
-                    // Convert Arrow array to VectorRef
-                    let vector = Helper::try_into_vector(column_array.clone())
-                        .context(crate::error::ConvertVectorSnafu)?;
-                    let sort_field = SortField::new(vector.data_type());
+                for i in 0..n {
+                    let value = vector.get_ref(i);
+                    let elems = (!value.is_null())
+                        .then(|| {
+                            let mut buf = vec![];
+                            IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
+                                .context(EncodeSnafu)?;
+                            Ok(buf)
+                        })
+                        .transpose()?;
 
-                    for i in 0..n {
-                        let value = vector.get_ref(i);
-                        let elems = (!value.is_null())
-                            .then(|| {
-                                let mut buf = vec![];
-                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
-                                    .context(EncodeSnafu)?;
-                                Ok(buf)
-                            })
-                            .transpose()?;
+                    creator
+                        .push_row_elems(elems)
+                        .await
+                        .context(PushBloomFilterValueSnafu)?;
+                }
+            } else if is_sparse && column_meta.semantic_type == SemanticType::Tag {
+                // Column not found in batch, tries to decode from primary keys for sparse encoding.
+                if decoded_pks.is_none() {
+                    decoded_pks = Some(decode_primary_keys_with_counts(batch, &self.codec)?);
+                }
 
-                        creator
-                            .push_row_elems(elems)
-                            .await
-                            .context(PushBloomFilterValueSnafu)?;
-                    }
-                } else {
+                let pk_values_with_counts = decoded_pks.as_ref().unwrap();
+                let Some(col_info) = self.codec.pk_col_info(*col_id) else {
                     debug!(
-                        "Column {} not found in the batch during building bloom filter index",
+                        "Column {} not found in primary key during building bloom filter index",
                         column_name
                     );
-                    // Push empty elements to maintain alignment
-                    for _ in 0..n {
-                        creator
-                            .push_row_elems(None)
-                            .await
-                            .context(PushBloomFilterValueSnafu)?;
-                    }
+                    continue;
+                };
+                let pk_index = col_info.idx;
+                let field = &col_info.field;
+                for (decoded, count) in pk_values_with_counts {
+                    let value = match decoded {
+                        CompositeValues::Dense(dense) => dense.get(pk_index).map(|v| &v.1),
+                        CompositeValues::Sparse(sparse) => sparse.get(col_id),
+                    };
+
+                    let elems = value
+                        .filter(|v| !v.is_null())
+                        .map(|v| {
+                            let mut buf = vec![];
+                            IndexValueCodec::encode_nonnull_value(
+                                v.as_value_ref(),
+                                field,
+                                &mut buf,
+                            )
+                            .context(EncodeSnafu)?;
+                            Ok(buf)
+                        })
+                        .transpose()?;
+
+                    creator
+                        .push_n_row_elems(*count, elems)
+                        .await
+                        .context(PushBloomFilterValueSnafu)?;
                 }
+            } else {
+                debug!(
+                    "Column {} not found in the batch during building bloom filter index",
+                    column_name
+                );
             }
         }
diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs
index 15e8870441..2efa154ec4 100644
--- a/src/mito2/src/sst/index/fulltext_index/creator.rs
+++ b/src/mito2/src/sst/index/fulltext_index/creator.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicUsize;
 
+use api::v1::SemanticType;
 use common_telemetry::warn;
 use datatypes::arrow::array::{Array, LargeStringArray, StringArray};
 use datatypes::arrow::datatypes::DataType;
@@ -69,6 +70,17 @@ impl FulltextIndexer {
         let mut creators = HashMap::new();
 
         for column in &metadata.column_metadatas {
+            // Tag columns don't support fulltext index for now.
+            // If we need to support fulltext index for tag columns, we also need to parse
+            // the codec and handle sparse encoding for flat format specially.
+            if column.semantic_type == SemanticType::Tag {
+                common_telemetry::debug!(
+                    "Skip creating fulltext index for tag column {}",
+                    column.column_schema.name
+                );
+                continue;
+            }
+
             let options = column
                 .column_schema
                 .fulltext_options()
diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs
index b7019422f8..f31cfaf1dc 100644
--- a/src/mito2/src/sst/index/inverted_index/creator.rs
+++ b/src/mito2/src/sst/index/inverted_index/creator.rs
@@ -17,6 +17,7 @@ use std::num::NonZeroUsize;
 use std::sync::Arc;
 use std::sync::atomic::AtomicUsize;
 
+use api::v1::SemanticType;
 use common_telemetry::{debug, warn};
 use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::vectors::Helper;
@@ -26,9 +27,10 @@ use index::inverted_index::create::sort_create::SortIndexCreator;
 use index::inverted_index::format::writer::InvertedIndexBlobWriter;
 use index::target::IndexTarget;
 use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
-use mito_codec::row_converter::SortField;
+use mito_codec::row_converter::{CompositeValues, SortField};
 use puffin::puffin_manager::{PuffinWriter, PutOptions};
 use snafu::{ResultExt, ensure};
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::{ColumnId, FileId};
 use tokio::io::duplex;
@@ -39,13 +41,13 @@ use crate::error::{
     PushIndexValueSnafu, Result,
 };
 use crate::read::Batch;
-use crate::sst::index::TYPE_INVERTED_INDEX;
 use crate::sst::index::intermediate::{
     IntermediateLocation, IntermediateManager, TempFileProvider,
 };
 use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
 use crate::sst::index::puffin_manager::SstPuffinWriter;
 use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
+use crate::sst::index::{TYPE_INVERTED_INDEX, decode_primary_keys_with_counts};
 
 /// The minimum memory usage threshold for one column.
 const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB
@@ -78,9 +80,6 @@ pub struct InvertedIndexer {
 
     /// Region metadata for column lookups.
     metadata: RegionMetadataRef,
-    /// Cache for mapping indexed column positions to their indices in the RecordBatch.
-    /// Aligns with indexed_column_ids. Initialized lazily when first batch is processed.
-    column_index_cache: Option<Vec<Option<usize>>>,
 }
 
 impl InvertedIndexer {
@@ -130,7 +129,6 @@ impl InvertedIndexer {
             memory_usage,
             indexed_column_ids,
             metadata: metadata.clone(),
-            column_index_cache: None,
         }
     }
 
@@ -170,29 +168,29 @@ impl InvertedIndexer {
     }
 
     async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
-        // Initialize column index cache if not already done
-        if self.column_index_cache.is_none() {
-            self.initialize_column_index_cache(batch);
-        }
-
         let mut guard = self.stats.record_update();
-        let n = batch.num_rows();
-        guard.inc_row_count(n);
+        guard.inc_row_count(batch.num_rows());
 
-        let column_indices = self.column_index_cache.as_ref().unwrap();
+        let is_sparse = self.metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
+        let mut decoded_pks: Option<Vec<(CompositeValues, usize)>> = None;
 
-        for ((col_id, target_key), &column_index) in
-            self.indexed_column_ids.iter().zip(column_indices.iter())
-        {
-            if let Some(index) = column_index {
-                let column_array = batch.column(index);
+        for (col_id, target_key) in &self.indexed_column_ids {
+            let Some(column_meta) = self.metadata.column_by_id(*col_id) else {
+                debug!(
+                    "Column {} not found in the metadata during building inverted index",
+                    col_id
+                );
+                continue;
+            };
+            let column_name = &column_meta.column_schema.name;
+            if let Some(column_array) = batch.column_by_name(column_name) {
                 // Convert Arrow array to VectorRef using Helper
                 let vector = Helper::try_into_vector(column_array.clone())
                     .context(crate::error::ConvertVectorSnafu)?;
                 let sort_field = SortField::new(vector.data_type());
 
-                for row in 0..n {
+                for row in 0..batch.num_rows() {
                     self.value_buf.clear();
 
                     let value_ref = vector.get_ref(row);
@@ -214,6 +212,47 @@ impl InvertedIndexer {
                             .context(PushIndexValueSnafu)?;
                     }
                 }
+            } else if is_sparse && column_meta.semantic_type == SemanticType::Tag {
+                // Column not found in batch, tries to decode from primary keys for sparse encoding.
+                if decoded_pks.is_none() {
+                    decoded_pks = Some(decode_primary_keys_with_counts(batch, &self.codec)?);
+                }
+
+                let pk_values_with_counts = decoded_pks.as_ref().unwrap();
+                let Some(col_info) = self.codec.pk_col_info(*col_id) else {
+                    debug!(
+                        "Column {} not found in primary key during building inverted index",
+                        column_name
+                    );
+                    continue;
+                };
+                let pk_index = col_info.idx;
+                let field = &col_info.field;
+                for (decoded, count) in pk_values_with_counts {
+                    let value = match decoded {
+                        CompositeValues::Dense(dense) => dense.get(pk_index).map(|v| &v.1),
+                        CompositeValues::Sparse(sparse) => sparse.get(col_id),
+                    };
+
+                    let elem = value
+                        .filter(|v| !v.is_null())
+                        .map(|v| {
+                            self.value_buf.clear();
+                            IndexValueCodec::encode_nonnull_value(
+                                v.as_value_ref(),
+                                field,
+                                &mut self.value_buf,
+                            )
+                            .context(EncodeSnafu)?;
+                            Ok(self.value_buf.as_slice())
+                        })
+                        .transpose()?;
+
+                    self.index_creator
+                        .push_with_name_n(target_key, elem, *count)
+                        .await
+                        .context(PushIndexValueSnafu)?;
+                }
             } else {
                 debug!(
                     "Column {} not found in the batch during building inverted index",
@@ -225,26 +264,6 @@ impl InvertedIndexer {
 
         Ok(())
     }
 
-    /// Initializes the column index cache by mapping indexed column ids to their positions in the RecordBatch.
-    fn initialize_column_index_cache(&mut self, batch: &RecordBatch) {
-        let mut column_indices = Vec::with_capacity(self.indexed_column_ids.len());
-
-        for (col_id, _) in &self.indexed_column_ids {
-            let column_index = if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
-                let column_name = &column_meta.column_schema.name;
-                batch
-                    .schema()
-                    .column_with_name(column_name)
-                    .map(|(index, _)| index)
-            } else {
-                None
-            };
-            column_indices.push(column_index);
-        }
-
-        self.column_index_cache = Some(column_indices);
-    }
-
     /// Finishes index creation and cleans up garbage.
     /// Returns the number of rows and bytes written.
     pub async fn finish(
diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs
index d216f1e132..268391135b 100644
--- a/src/mito2/src/sst/parquet/file_range.rs
+++ b/src/mito2/src/sst/parquet/file_range.rs
@@ -15,18 +15,20 @@
 //! Structs and functions for reading ranges from a parquet file. A file range
 //! is usually a row group in a parquet file.
 
+use std::collections::HashMap;
 use std::ops::BitAnd;
 use std::sync::Arc;
 
 use api::v1::{OpType, SemanticType};
 use common_telemetry::error;
-use datatypes::arrow::array::BooleanArray;
+use datatypes::arrow::array::{ArrayRef, BooleanArray};
 use datatypes::arrow::buffer::BooleanBuffer;
 use datatypes::arrow::record_batch::RecordBatch;
 use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
 use parquet::arrow::arrow_reader::RowSelection;
 use snafu::{OptionExt, ResultExt};
-use store_api::storage::TimeSeriesRowSelector;
+use store_api::codec::PrimaryKeyEncoding;
+use store_api::storage::{ColumnId, TimeSeriesRowSelector};
 
 use crate::error::{
     ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
@@ -37,11 +39,11 @@ use crate::read::compat::CompatBatch;
 use crate::read::last_row::RowGroupLastRowCachedReader;
 use crate::read::prune::{FlatPruneReader, PruneReader};
 use crate::sst::file::FileHandle;
+use crate::sst::parquet::flat_format::{DecodedPrimaryKeys, decode_primary_keys};
 use crate::sst::parquet::format::ReadFormat;
 use crate::sst::parquet::reader::{
     FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
 };
-
 /// A range of a parquet SST. Now it is a row group.
 /// We can read different file ranges in parallel.
 #[derive(Clone)]
@@ -357,7 +359,34 @@ impl RangeBase {
     }
 
     /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
+    ///
+    /// It assumes all necessary tags are already decoded from the primary key.
     pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result<Option<RecordBatch>> {
+        let mask = self.compute_filter_mask_flat(&input)?;
+
+        // If mask is None, the entire batch is filtered out
+        let Some(mask) = mask else {
+            return Ok(None);
+        };
+
+        let filtered_batch =
+            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
+                .context(ComputeArrowSnafu)?;
+
+        if filtered_batch.num_rows() > 0 {
+            Ok(Some(filtered_batch))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Computes the filter mask for the input RecordBatch based on pushed down predicates.
+    ///
+    /// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask.
+    pub(crate) fn compute_filter_mask_flat(
+        &self,
+        input: &RecordBatch,
+    ) -> Result<Option<BooleanBuffer>> {
         let mut mask = BooleanBuffer::new_set(input.num_rows());
 
         let flat_format = self
@@ -367,6 +396,11 @@ impl RangeBase {
             reason: "Expected flat format for precise_filter_flat",
         })?;
 
+        // Decodes primary keys once if we have any tag filters not in projection
+        let mut decoded_pks: Option<DecodedPrimaryKeys> = None;
+        // Cache decoded tag arrays by column id to avoid redundant decoding
+        let mut decoded_tag_cache: HashMap<ColumnId, ArrayRef> = HashMap::new();
+
         // Run filter one by one and combine them result
         for filter_ctx in &self.filters {
             let filter = match filter_ctx.filter() {
@@ -383,20 +417,53 @@ impl RangeBase {
                 let column = &input.columns()[idx];
                 let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                 mask = mask.bitand(&result);
-            } else {
-                // Column not found in projection, continue
-                continue;
+            } else if filter_ctx.semantic_type() == SemanticType::Tag {
+                // Column not found in projection, it may be a tag column.
+                // Decodes primary keys if not already decoded.
+                if decoded_pks.is_none() {
+                    decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
+                }
+
+                let metadata = flat_format.metadata();
+                let column_id = filter_ctx.column_id();
+
+                // Check cache first
+                let tag_column = if let Some(cached_column) = decoded_tag_cache.get(&column_id) {
+                    cached_column.clone()
+                } else {
+                    // For dense encoding, we need pk_index. For sparse encoding, pk_index is None.
+                    let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
+                        None
+                    } else {
+                        metadata.primary_key_index(column_id)
+                    };
+                    let column_index = metadata.column_index_by_id(column_id);
+
+                    if let (Some(column_index), Some(decoded)) =
+                        (column_index, decoded_pks.as_ref())
+                    {
+                        let column_metadata = &metadata.column_metadatas[column_index];
+                        let tag_column = decoded.get_tag_column(
+                            column_id,
+                            pk_index,
+                            &column_metadata.column_schema.data_type,
+                        )?;
+                        // Cache the decoded tag column
+                        decoded_tag_cache.insert(column_id, tag_column.clone());
+                        tag_column
+                    } else {
+                        continue;
+                    }
+                };
+
+                let result = filter
+                    .evaluate_array(&tag_column)
+                    .context(RecordBatchSnafu)?;
+                mask = mask.bitand(&result);
             }
+            // Non-tag column not found in projection.
         }
 
-        let filtered_batch =
-            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
-                .context(ComputeArrowSnafu)?;
-
-        if filtered_batch.num_rows() > 0 {
-            Ok(Some(filtered_batch))
-        } else {
-            Ok(None)
-        }
+        Ok(Some(mask))
     }
 }
diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs
index bcf1d8694c..cd13dfea01 100644
--- a/src/mito2/src/sst/parquet/flat_format.rs
+++ b/src/mito2/src/sst/parquet/flat_format.rs
@@ -127,7 +127,9 @@ pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
     num_columns - 1
 }
 
-// TODO(yingwen): Add an option to skip reading internal columns.
+// TODO(yingwen): Add an option to skip reading internal columns if the region is
+// append only and doesn't use sparse encoding (We need to check the table id under
+// sparse encoding).
 /// Helper for reading the flat SST format with projection.
 ///
 /// It only supports flat format that stores primary keys additionally.
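The hunk below introduces `DecodedPrimaryKeys::get_tag_column`, which rebuilds a tag column from one decoded value per unique primary key: string tags reuse the prebuilt keys array so the column stays dictionary-encoded, while other types are expanded into plain arrays with `take`. A minimal sketch of those two strategies (assuming the `arrow` crate directly; the tag names and values are invented for illustration):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, DictionaryArray, Int64Array, StringArray, UInt32Array};
use arrow::compute::take;

fn main() {
    // One entry per row, each pointing at one decoded value per unique primary key.
    let keys = UInt32Array::from(vec![0u32, 0, 1, 1, 1]);

    // String tag: wrap the per-key values in a dictionary, reusing `keys`,
    // so the column stays dictionary-encoded end to end.
    let hosts: ArrayRef = Arc::new(StringArray::from(vec!["host-a", "host-b"]));
    let host_col: ArrayRef = Arc::new(DictionaryArray::new(keys.clone(), hosts));
    assert_eq!(host_col.len(), 5);

    // Non-string tag: expand the per-key values into a plain array with `take`.
    let shards: ArrayRef = Arc::new(Int64Array::from(vec![10_i64, 20]));
    let shard_col = take(&shards, &keys, None).unwrap();
    assert_eq!(shard_col.len(), 5);
}
```

In the patch, the prebuilt keys array comes from `decode_primary_keys`, which also avoids decoding dictionary values that never appear in the batch (the parquet reader may load a whole dictionary page).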
@@ -528,6 +530,125 @@ pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
 
+/// Decodes primary keys from a flat format RecordBatch.
+pub(crate) fn decode_primary_keys(
+    codec: &dyn PrimaryKeyCodec,
+    batch: &RecordBatch,
+) -> Result<DecodedPrimaryKeys> {
+    let primary_key_index = primary_key_column_index(batch.num_columns());
+    let pk_dict_array = batch
+        .column(primary_key_index)
+        .as_any()
+        .downcast_ref::<PrimaryKeyArray>()
+        .with_context(|| InvalidRecordBatchSnafu {
+            reason: "Primary key column is not a dictionary array".to_string(),
+        })?;
+    let pk_values_array = pk_dict_array
+        .values()
+        .as_any()
+        .downcast_ref::<BinaryArray>()
+        .with_context(|| InvalidRecordBatchSnafu {
+            reason: "Primary key values are not binary array".to_string(),
+        })?;
+
+    let keys = pk_dict_array.keys();
+
+    // Decodes primary key values by iterating through keys, reusing decoded values for duplicate keys.
+    // Maps original key index -> new decoded value index
+    let mut key_to_decoded_index = Vec::with_capacity(keys.len());
+    let mut decoded_pk_values = Vec::new();
+    let mut prev_key: Option<u32> = None;
+
+    // The parquet reader may read the whole dictionary page into the dictionary values, so
+    // we may decode many primary keys not in this batch if we decode the values array directly.
+    for i in 0..keys.len() {
+        let current_key = keys.value(i);
+
+        // Check if current key is the same as previous key
+        if let Some(prev) = prev_key
+            && prev == current_key
+        {
+            // Reuse the last decoded index
+            key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
+            continue;
+        }
+
+        // New key, decodes the value
+        let pk_bytes = pk_values_array.value(current_key as usize);
+        let decoded_value = codec.decode(pk_bytes).context(DecodeSnafu)?;
+
+        decoded_pk_values.push(decoded_value);
+        key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
+        prev_key = Some(current_key);
+    }
+
+    // Create the keys array from key_to_decoded_index
+    let keys_array = UInt32Array::from(key_to_decoded_index);
+
+    Ok(DecodedPrimaryKeys {
+        decoded_pk_values,
+        keys_array,
+    })
+}
+
+/// Holds decoded primary key values and their indices.
+pub(crate) struct DecodedPrimaryKeys {
+    /// Decoded primary key values for unique keys in the dictionary.
+    decoded_pk_values: Vec<CompositeValues>,
+    /// Prebuilt keys array for creating dictionary arrays.
+    keys_array: UInt32Array,
+}
+
+impl DecodedPrimaryKeys {
+    /// Gets a tag column array by column id and data type.
+    ///
+    /// For sparse encoding, uses column_id to lookup values.
+    /// For dense encoding, uses pk_index to get values.
+    pub(crate) fn get_tag_column(
+        &self,
+        column_id: ColumnId,
+        pk_index: Option<usize>,
+        column_type: &ConcreteDataType,
+    ) -> Result<ArrayRef> {
+        // Gets values from the primary key.
+        let mut builder = column_type.create_mutable_vector(self.decoded_pk_values.len());
+        for decoded in &self.decoded_pk_values {
+            match decoded {
+                CompositeValues::Dense(dense) => {
+                    let pk_idx = pk_index.expect("pk_index required for dense encoding");
+                    if pk_idx < dense.len() {
+                        builder.push_value_ref(&dense[pk_idx].1.as_value_ref());
+                    } else {
+                        builder.push_null();
+                    }
+                }
+                CompositeValues::Sparse(sparse) => {
+                    let value = sparse.get_or_null(column_id);
+                    builder.push_value_ref(&value.as_value_ref());
+                }
+            };
+        }
+
+        let values_vector = builder.to_vector();
+        let values_array = values_vector.to_arrow_array();
+
+        // Only creates dictionary array for string types, otherwise take values by keys
+        if column_type.is_string() {
+            // Creates dictionary array using the same keys for string types
+            // Note that the dictionary values may have nulls.
+            let dict_array = DictionaryArray::new(self.keys_array.clone(), values_array);
+            Ok(Arc::new(dict_array))
+        } else {
+            // For non-string types, takes values by keys indices to create a regular array
+            let taken_array =
+                take(&values_array, &self.keys_array, None).context(ComputeArrowSnafu)?;
+            Ok(taken_array)
+        }
+    }
+}
+
 /// Converts a batch that doesn't have decoded primary key columns into a batch that has decoded
 /// primary key columns in flat format.
 pub(crate) struct FlatConvertFormat {
@@ -577,53 +698,22 @@ impl FlatConvertFormat {
 
     /// Converts a batch to have decoded primary key columns in flat format.
     ///
-    /// The primary key array in the batch is a dictionary array. We decode each value which is a
-    /// primary key and reuse the keys array to build a dictionary array for each tag column.
-    /// The decoded columns are inserted in front of other columns.
+    /// The primary key array in the batch is a dictionary array.
    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if self.projected_primary_keys.is_empty() {
            return Ok(batch);
        }
 
-        let primary_key_index = primary_key_column_index(batch.num_columns());
-        let pk_dict_array = batch
-            .column(primary_key_index)
-            .as_any()
-            .downcast_ref::<PrimaryKeyArray>()
-            .with_context(|| InvalidRecordBatchSnafu {
-                reason: "Primary key column is not a dictionary array".to_string(),
-            })?;
-
-        let pk_values_array = pk_dict_array
-            .values()
-            .as_any()
-            .downcast_ref::<BinaryArray>()
-            .with_context(|| InvalidRecordBatchSnafu {
-                reason: "Primary key values are not binary array".to_string(),
-            })?;
-
-        // Decodes all primary key values
-        let mut decoded_pk_values = Vec::with_capacity(pk_values_array.len());
-        for i in 0..pk_values_array.len() {
-            if pk_values_array.is_null(i) {
-                decoded_pk_values.push(None);
-            } else {
-                let pk_bytes = pk_values_array.value(i);
-                let decoded = self.codec.decode(pk_bytes).context(DecodeSnafu)?;
-                decoded_pk_values.push(Some(decoded));
-            }
-        }
+        let decoded_pks = decode_primary_keys(self.codec.as_ref(), &batch)?;
 
         // Builds decoded tag column arrays.
         let mut decoded_columns = Vec::new();
         for (column_id, pk_index, column_index) in &self.projected_primary_keys {
             let column_metadata = &self.metadata.column_metadatas[*column_index];
-            let tag_column = self.build_primary_key_column(
+            let tag_column = decoded_pks.get_tag_column(
                 *column_id,
-                *pk_index,
+                Some(*pk_index),
                 &column_metadata.column_schema.data_type,
-                pk_dict_array.keys(),
-                &decoded_pk_values,
             )?;
             decoded_columns.push(tag_column);
         }
@@ -648,57 +738,6 @@ impl FlatConvertFormat {
         let new_schema = Arc::new(Schema::new(new_fields));
         RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
     }
-
-    /// Builds an array for a specific tag column.
-    ///
-    /// It may build a dictionary array if the type is string. Note that the dictionary
-    /// array may have null values, although keys are not null.
-    fn build_primary_key_column(
-        &self,
-        column_id: ColumnId,
-        pk_index: usize,
-        column_type: &ConcreteDataType,
-        keys: &UInt32Array,
-        decoded_pk_values: &[Option<CompositeValues>],
-    ) -> Result<ArrayRef> {
-        // Gets values from the primary key.
-        let mut builder = column_type.create_mutable_vector(decoded_pk_values.len());
-        for decoded_opt in decoded_pk_values {
-            match decoded_opt {
-                Some(decoded) => {
-                    match decoded {
-                        CompositeValues::Dense(dense) => {
-                            if pk_index < dense.len() {
-                                builder.push_value_ref(&dense[pk_index].1.as_value_ref());
-                            } else {
-                                builder.push_null();
-                            }
-                        }
-                        CompositeValues::Sparse(sparse) => {
-                            let value = sparse.get_or_null(column_id);
-                            builder.push_value_ref(&value.as_value_ref());
-                        }
-                    };
-                }
-                None => builder.push_null(),
-            }
-        }
-
-        let values_vector = builder.to_vector();
-        let values_array = values_vector.to_arrow_array();
-
-        // Only creates dictionary array for string types, otherwise take values by keys
-        if column_type.is_string() {
-            // Creates dictionary array using the same keys for string types
-            // Note that the dictionary values may have nulls.
-            let dict_array = DictionaryArray::new(keys.clone(), values_array);
-            Ok(Arc::new(dict_array))
-        } else {
-            // For non-string types, takes values by keys indices to create a regular array
-            let taken_array = take(&values_array, keys, None).context(ComputeArrowSnafu)?;
-            Ok(taken_array)
-        }
-    }
 }
 
 #[cfg(test)]
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index d02786455e..60cf654380 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -1397,6 +1397,7 @@ impl FlatRowGroupReader {
         let record_batch = batch_result.context(ArrowReaderSnafu {
             path: self.context.file_path(),
         })?;
+        // Safety: Only flat format uses FlatRowGroupReader.
         let flat_format = self.context.read_format().as_flat().unwrap();
         let record_batch =