diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 8eb477195a..3682e26fc8 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -553,7 +553,12 @@ impl RangeBase { /// Builds an encoded primary-key filter for flat scan pre-filtering. pub(crate) fn new_primary_key_filter(&self) -> Option> { - if self.read_format.metadata().primary_key.is_empty() { + if self.read_format.metadata().primary_key.is_empty() + || !self + .read_format + .as_flat() + .is_some_and(|format| format.raw_batch_has_primary_key_dictionary()) + { return None; } let filters = self.usable_primary_key_filters()?; @@ -1129,18 +1134,23 @@ mod tests { Array, ArrayRef, BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array, }; - use datatypes::arrow::datatypes::UInt32Type; + use datatypes::arrow::datatypes::{Schema, UInt32Type}; use datatypes::schema::ColumnSchema; use mito_codec::row_converter::build_primary_key_codec; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; use super::*; + use crate::sst::internal_fields; use crate::test_util::sst_util::{ new_primary_key, new_sparse_primary_key, sst_region_metadata, sst_region_metadata_with_encoding, }; + fn flat_file_num_columns(metadata: &RegionMetadata) -> usize { + metadata.column_metadatas.len() + 3 + } + fn new_test_range_base_with_metadata( metadata: Arc, expected_metadata: Option>, @@ -1149,7 +1159,7 @@ mod tests { let read_format = ReadFormat::new_flat( metadata.clone(), metadata.column_metadatas.iter().map(|c| c.column_id), - Some(5), + Some(flat_file_num_columns(&metadata)), "test", false, ) @@ -1240,16 +1250,20 @@ mod tests { ) -> RecordBatch { assert_eq!(primary_keys.len(), field_values.len()); - let schema = ReadFormat::new_flat( - metadata.clone(), - metadata.column_metadatas.iter().map(|c| c.column_id), - Some(5), - "test", - false, - ) - .unwrap() - .arrow_schema() - .clone(); + let arrow_schema = metadata.schema.arrow_schema(); + let field_column = arrow_schema + .field(arrow_schema.index_of("field_0").unwrap()) + .clone(); + let time_index_column = arrow_schema + .field(arrow_schema.index_of("ts").unwrap()) + .clone(); + let mut fields = vec![field_column, time_index_column]; + fields.extend( + internal_fields() + .into_iter() + .map(|field| field.as_ref().clone()), + ); + let schema = Arc::new(Schema::new(fields)); let mut dict_values = Vec::new(); let mut keys = Vec::with_capacity(primary_keys.len()); @@ -1318,6 +1332,38 @@ mod tests { assert!(base.new_primary_key_filter().is_none()); } + #[test] + fn test_new_primary_key_filter_skips_legacy_primary_key_batches() { + let metadata = Arc::new(sst_region_metadata_with_encoding( + PrimaryKeyEncoding::Sparse, + )); + let read_format = ReadFormat::new_flat( + metadata.clone(), + metadata.column_metadatas.iter().map(|c| c.column_id), + None, + "test", + true, + ) + .unwrap(); + let primary_key_filters = + vec![SimpleFilterEvaluator::try_new(&col("tag_0").eq(lit("b"))).unwrap()]; + let base = RangeBase { + filters: vec![], + primary_key_filters: Some(Arc::new(primary_key_filters)), + dyn_filters: vec![], + read_format, + expected_metadata: None, + prune_schema: metadata.schema.clone(), + codec: build_primary_key_codec(metadata.as_ref()), + compat_batch: None, + compaction_projection_mapper: None, + pre_filter_mode: PreFilterMode::All, + partition_filter: None, + }; + + assert!(base.new_primary_key_filter().is_none()); + } + #[test] fn test_prefilter_primary_key_ignores_reused_expected_tag_name() { let metadata = Arc::new(sst_region_metadata()); diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index d6b061e468..ad9ddf0280 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -267,6 +267,15 @@ impl FlatReadFormat { } } + /// Returns true if raw parquet batches already use the flat layout with an encoded + /// `__primary_key` dictionary column. + pub(crate) fn raw_batch_has_primary_key_dictionary(&self) -> bool { + match &self.parquet_adapter { + ParquetAdapter::Flat(_) => true, + ParquetAdapter::PrimaryKeyToFlat(_) => false, + } + } + /// Creates a sequence array to override. pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option { self.override_sequence