diff --git a/src/common/recordbatch/src/filter.rs b/src/common/recordbatch/src/filter.rs index d7c522e656..a9605173a3 100644 --- a/src/common/recordbatch/src/filter.rs +++ b/src/common/recordbatch/src/filter.rs @@ -48,7 +48,7 @@ use crate::error::{ArrowComputeSnafu, Result, ToArrowScalarSnafu, UnsupportedOpe /// /// This struct contains normalized predicate expr. In the form of /// `col` `op` `literal` where the `col` is provided from input. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct SimpleFilterEvaluator { /// Name of the referenced column. column_name: String, diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 64e894da0e..8eb477195a 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -35,7 +35,7 @@ use parquet::arrow::arrow_reader::RowSelection; use parquet::file::metadata::ParquetMetaData; use snafu::{OptionExt, ResultExt}; use store_api::codec::PrimaryKeyEncoding; -use store_api::metadata::RegionMetadataRef; +use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::{ColumnId, TimeSeriesRowSelector}; use table::predicate::Predicate; @@ -492,35 +492,75 @@ impl TagDecodeState { } impl RangeBase { - fn has_usable_primary_key_filter(&self) -> bool { + fn usable_primary_key_filters(&self) -> Option>> { let Some(filters) = &self.primary_key_filters else { - return false; + return None; }; - let metadata = self.read_format.metadata(); + let sst_metadata = self.read_format.metadata(); + let expected_metadata = self.expected_metadata.as_deref(); + let usable_filters = filters + .iter() + .filter(|filter| { + Self::is_usable_primary_key_filter(sst_metadata, expected_metadata, filter) + }) + .cloned() + .collect::>(); - filters.iter().any(|filter| { - !is_partition_column(filter.column_name()) - && metadata - .column_by_name(filter.column_name()) - .is_some_and(|column| { - column.semantic_type == SemanticType::Tag - && metadata.primary_key_index(column.column_id).is_some() - }) - }) + (!usable_filters.is_empty()).then_some(Arc::new(usable_filters)) + } + + fn is_usable_primary_key_filter( + sst_metadata: &RegionMetadataRef, + expected_metadata: Option<&RegionMetadata>, + filter: &SimpleFilterEvaluator, + ) -> bool { + if is_partition_column(filter.column_name()) { + return false; + } + + let sst_column = match expected_metadata { + Some(expected_metadata) => { + let Some(expected_column) = expected_metadata.column_by_name(filter.column_name()) + else { + return false; + }; + let Some(sst_column) = sst_metadata.column_by_id(expected_column.column_id) else { + return false; + }; + + if sst_column.column_schema.name != expected_column.column_schema.name + || sst_column.semantic_type != expected_column.semantic_type + || sst_column.column_schema.data_type != expected_column.column_schema.data_type + { + return false; + } + + sst_column + } + None => { + let Some(sst_column) = sst_metadata.column_by_name(filter.column_name()) else { + return false; + }; + sst_column + } + }; + + sst_column.semantic_type == SemanticType::Tag + && sst_metadata + .primary_key_index(sst_column.column_id) + .is_some() } /// Builds an encoded primary-key filter for flat scan pre-filtering. pub(crate) fn new_primary_key_filter(&self) -> Option> { - if self.read_format.metadata().primary_key.is_empty() - || !self.has_usable_primary_key_filter() - { + if self.read_format.metadata().primary_key.is_empty() { return None; } - let filters = self.primary_key_filters.as_ref()?; + let filters = self.usable_primary_key_filters()?; Some( self.codec - .primary_key_filter(self.read_format.metadata(), filters.clone()), + .primary_key_filter(self.read_format.metadata(), filters), ) } @@ -1090,9 +1130,10 @@ mod tests { UInt32Array, UInt64Array, }; use datatypes::arrow::datatypes::UInt32Type; + use datatypes::schema::ColumnSchema; use mito_codec::row_converter::build_primary_key_codec; use store_api::codec::PrimaryKeyEncoding; - use store_api::metadata::RegionMetadata; + use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; use super::*; use crate::test_util::sst_util::{ @@ -1102,6 +1143,7 @@ mod tests { fn new_test_range_base_with_metadata( metadata: Arc, + expected_metadata: Option>, exprs: &[Expr], ) -> RangeBase { let read_format = ReadFormat::new_flat( @@ -1123,7 +1165,7 @@ mod tests { .then_some(Arc::new(primary_key_filters)), dyn_filters: vec![], read_format, - expected_metadata: None, + expected_metadata, prune_schema: metadata.schema.clone(), codec: build_primary_key_codec(metadata.as_ref()), compat_batch: None, @@ -1134,7 +1176,61 @@ mod tests { } fn new_test_range_base(exprs: &[Expr]) -> RangeBase { - new_test_range_base_with_metadata(Arc::new(sst_region_metadata()), exprs) + new_test_range_base_with_metadata(Arc::new(sst_region_metadata()), None, exprs) + } + + fn new_test_range_base_with_expected_metadata( + metadata: Arc, + expected_metadata: Arc, + exprs: &[Expr], + ) -> RangeBase { + new_test_range_base_with_metadata(metadata, Some(expected_metadata), exprs) + } + + fn expected_metadata_with_reused_tag_name( + old_metadata: &RegionMetadata, + ) -> Arc { + let mut builder = RegionMetadataBuilder::new(old_metadata.region_id); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 10, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_0".to_string(), + ConcreteDataType::uint64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }) + .primary_key(vec![10, 1]); + + Arc::new(builder.build().unwrap()) } fn new_raw_batch_with_metadata( @@ -1209,6 +1305,46 @@ mod tests { assert!(base.new_primary_key_filter().is_none()); } + #[test] + fn test_new_primary_key_filter_skips_reused_expected_tag_name() { + let metadata = Arc::new(sst_region_metadata()); + let expected_metadata = expected_metadata_with_reused_tag_name(&metadata); + let base = new_test_range_base_with_expected_metadata( + metadata, + expected_metadata, + &[col("tag_0").eq(lit("b"))], + ); + + assert!(base.new_primary_key_filter().is_none()); + } + + #[test] + fn test_prefilter_primary_key_ignores_reused_expected_tag_name() { + let metadata = Arc::new(sst_region_metadata()); + let expected_metadata = expected_metadata_with_reused_tag_name(&metadata); + let pk_ax = new_primary_key(&["a", "x"]); + let pk_by = new_primary_key(&["b", "y"]); + let batch = new_raw_batch_with_metadata( + metadata.clone(), + &[pk_ax.as_slice(), pk_by.as_slice()], + &[10, 11], + ); + let base = new_test_range_base_with_expected_metadata( + metadata, + expected_metadata, + &[col("tag_0").eq(lit("b")), col("tag_1").eq(lit("x"))], + ); + let mut primary_key_filter = base.new_primary_key_filter().unwrap(); + + let filtered = base + .prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut()) + .unwrap() + .unwrap(); + + assert_eq!(filtered.num_rows(), 1); + assert_eq!(field_values(&filtered), vec![10]); + } + #[test] fn test_prefilter_primary_key_drops_single_dictionary_batch() { let pk_a = new_primary_key(&["a", "x"]); @@ -1302,7 +1438,7 @@ mod tests { ], &[10, 11, 12, 13], ); - let base = new_test_range_base_with_metadata(metadata, &[col("tag_0").eq(lit("b"))]); + let base = new_test_range_base_with_metadata(metadata, None, &[col("tag_0").eq(lit("b"))]); let mut primary_key_filter = base.new_primary_key_filter().unwrap(); let filtered = base