From 8d3ebde6524ad140195cb51fd90e52ae129def9b Mon Sep 17 00:00:00 2001 From: fys <40801205+fengys1996@users.noreply.github.com> Date: Mon, 25 May 2026 19:45:42 +0800 Subject: [PATCH] fix(mito2): schema-safe skipping index pruning (#8122) * fix: schema-safe skipping index pruning * fix: cargo clippy * fix: sqlness test * remove default plan in BloomFilterIndexApplier * fix comment of plan_for_sst * add fast path for default_plan * minor refactor * some rename * fix: cr by ai --- .../src/sst/index/bloom_filter/applier.rs | 177 ++++++++++++++++-- .../sst/index/bloom_filter/applier/builder.rs | 25 ++- src/mito2/src/sst/parquet.rs | 23 ++- src/mito2/src/sst/parquet/reader.rs | 20 +- .../change_col_type_skipping_index.result | 47 +++++ .../alter/change_col_type_skipping_index.sql | 26 +++ 6 files changed, 286 insertions(+), 32 deletions(-) create mode 100644 tests/cases/standalone/common/alter/change_col_type_skipping_index.result create mode 100644 tests/cases/standalone/common/alter/change_col_type_skipping_index.sql diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index e36874e97d..b1f6032630 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -21,6 +21,7 @@ use std::time::Instant; use common_base::range_read::RangeReader; use common_telemetry::{tracing, warn}; +use datatypes::data_type::ConcreteDataType; use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate}; use index::bloom_filter::reader::{ BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl, @@ -30,6 +31,7 @@ use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use puffin::puffin_manager::{PuffinManager, PuffinReader}; use snafu::ResultExt; +use store_api::metadata::RegionMetadataRef; use store_api::region_request::PathType; use store_api::storage::ColumnId; @@ -38,7 +40,6 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::index::bloom_filter_index::{ BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag, }; -use crate::cache::index::result_cache::PredicateKey; use crate::error::{ ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result, @@ -133,10 +134,10 @@ pub struct BloomFilterIndexApplier { /// Bloom filter predicates. /// For each column, the value will be retained only if it contains __all__ predicates. - predicates: Arc>>, + default_predicates: Arc>>, - /// Predicate key. Used to identify the predicate and fetch result from cache. - predicate_key: PredicateKey, + /// Expected predicate column types from the latest region metadata. + expected_predicate_col_types: BTreeMap, } impl BloomFilterIndexApplier { @@ -149,8 +150,9 @@ impl BloomFilterIndexApplier { object_store: ObjectStore, puffin_manager_factory: PuffinManagerFactory, predicates: BTreeMap>, + expected_predicate_col_types: BTreeMap, ) -> Self { - let predicates = Arc::new(predicates); + let default_predicates = Arc::new(predicates); Self { table_dir, path_type, @@ -159,8 +161,8 @@ impl BloomFilterIndexApplier { puffin_manager_factory, puffin_metadata_cache: None, bloom_filter_index_cache: None, - predicate_key: PredicateKey::new_bloom(predicates.clone()), - predicates, + default_predicates, + expected_predicate_col_types, } } @@ -207,6 +209,7 @@ impl BloomFilterIndexApplier { &self, file_id: RegionIndexId, file_size_hint: Option, + predicates: &BTreeMap>, row_groups: impl Iterator, mut metrics: Option<&mut BloomFilterIndexApplyMetrics>, ) -> Result>)>> { @@ -230,7 +233,7 @@ impl BloomFilterIndexApplier { .map(|(i, range)| (*i, vec![range.clone()])) .collect::>(); - for (column_id, predicates) in self.predicates.iter() { + for (column_id, predicates) in predicates { let blob = match self .blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut()) .await? @@ -438,9 +441,46 @@ impl BloomFilterIndexApplier { Ok(()) } - /// Returns the predicate key. - pub fn predicate_key(&self) -> &PredicateKey { - &self.predicate_key + /// Returns compatible bloom filter predicates with the given SST metadata. + /// + /// Returns `None` when no compatible predicate remains for this SST. + pub fn compatible_predicate_for_sst( + &self, + sst_metadata: &RegionMetadataRef, + ) -> Option>>> { + let mut has_type_mismatch = false; + let mut compatible_col_ids = Vec::new(); + + for (col_id, expected) in &self.expected_predicate_col_types { + let Some(sst_col) = sst_metadata.column_by_id(*col_id) else { + has_type_mismatch = true; + continue; + }; + + if sst_col.column_schema.data_type != *expected { + has_type_mismatch = true; + continue; + } + + compatible_col_ids.push(*col_id); + } + + if compatible_col_ids.is_empty() { + return None; + } + + if !has_type_mismatch { + return Some(self.default_predicates.clone()); + } + + let mut compatible_predicates = BTreeMap::new(); + for col_id in compatible_col_ids { + if let Some(predicates) = self.default_predicates.get(&col_id) { + compatible_predicates.insert(col_id, predicates.clone()); + } + } + + Some(Arc::new(compatible_predicates)) } } @@ -456,9 +496,12 @@ fn is_blob_not_found(err: &Error) -> bool { #[cfg(test)] mod tests { + use std::collections::BTreeSet; use datafusion_expr::{Expr, col, lit}; use futures::future::BoxFuture; + use index::Bytes; + use object_store::services::Memory; use puffin::puffin_manager::PuffinWriter; use store_api::metadata::RegionMetadata; use store_api::storage::FileId; @@ -470,6 +513,113 @@ mod tests { mock_object_store, mock_region_metadata, new_batch, new_intm_mgr, }; + #[tokio::test] + async fn test_compatible_predicate_for_sst() { + let (_d, puffin_manager_factory) = + PuffinManagerFactory::new_for_test_async("test_plan_for_sst_basic_").await; + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let table_dir = "table_dir".to_string(); + + let predicates = BTreeMap::from_iter([( + 1, + vec![InListPredicate { + list: BTreeSet::from_iter([Bytes::from("foo")]), + }], + )]); + let expected_predicate_col_types = + BTreeMap::from_iter([(1, ConcreteDataType::string_datatype())]); + + let applier = BloomFilterIndexApplier::new( + table_dir, + PathType::Bare, + object_store, + puffin_manager_factory, + predicates, + expected_predicate_col_types, + ); + let predicates = applier.compatible_predicate_for_sst(&mock_region_metadata()); + assert!(predicates.is_some()); + } + + #[tokio::test] + async fn test_compatible_predicate_for_sst_type_mismatch() { + let (_d, puffin_manager_factory) = + PuffinManagerFactory::new_for_test_async("test_plan_for_sst_type_mismatch_").await; + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let table_dir = "table_dir".to_string(); + + let predicates = BTreeMap::from_iter([( + 1, + vec![InListPredicate { + list: BTreeSet::from_iter([Bytes::from("foo")]), + }], + )]); + let expected_predicate_col_types = + BTreeMap::from_iter([(1, ConcreteDataType::int64_datatype())]); + + let applier = BloomFilterIndexApplier::new( + table_dir, + PathType::Bare, + object_store, + puffin_manager_factory, + predicates, + expected_predicate_col_types, + ); + let predicates = applier.compatible_predicate_for_sst(&mock_region_metadata()); + assert!(predicates.is_none()); + } + + #[tokio::test] + async fn test_compatible_predicate_for_sst_partial_type_mismatch() { + let (_d, puffin_manager_factory) = + PuffinManagerFactory::new_for_test_async("test_plan_for_sst_partial_mismatch_").await; + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let table_dir = "table_dir".to_string(); + + // Column 1 (tag_str): expected string — matches SST (compatible). + // Column 3 (field_u64): expected int64 — SST has uint64 (mismatched). + let predicates = BTreeMap::from_iter([ + ( + 1, + vec![InListPredicate { + list: BTreeSet::from_iter([Bytes::from("foo")]), + }], + ), + ( + 3, + vec![InListPredicate { + list: BTreeSet::from_iter([Bytes::from("bar")]), + }], + ), + ]); + let expected_predicate_col_types = BTreeMap::from_iter([ + (1, ConcreteDataType::string_datatype()), + (3, ConcreteDataType::int64_datatype()), // intentional mismatch + ]); + + let applier = BloomFilterIndexApplier::new( + table_dir, + PathType::Bare, + object_store, + puffin_manager_factory, + predicates, + expected_predicate_col_types, + ); + let result = applier.compatible_predicate_for_sst(&mock_region_metadata()); + + // The subset containing only the compatible column must be returned. + let result = result.expect("expected Some with compatible subset"); + assert!( + result.contains_key(&1), + "compatible column 1 must be present" + ); + assert!( + !result.contains_key(&3), + "mismatched column 3 must be absent" + ); + assert_eq!(result.len(), 1, "only the compatible predicate must remain"); + } + #[allow(clippy::type_complexity)] fn tester( table_dir: String, @@ -496,8 +646,11 @@ mod tests { ); let applier = builder.build(&exprs).unwrap().unwrap(); + let predicates = applier + .compatible_predicate_for_sst(&Arc::new(metadata.clone())) + .unwrap(); applier - .apply(file_id, None, row_groups.into_iter(), None) + .apply(file_id, None, &predicates, row_groups.into_iter(), None) .await .unwrap() .into_iter() diff --git a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs index a2930d4075..beb26d1bf7 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs @@ -101,12 +101,14 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { return Ok(None); } + let expected_predicate_column_types = self.expected_predicate_column_types(); let applier = BloomFilterIndexApplier::new( self.table_dir, self.path_type, self.object_store, self.puffin_manager_factory, self.predicates, + expected_predicate_column_types, ) .with_file_cache(self.file_cache) .with_puffin_metadata_cache(self.puffin_metadata_cache) @@ -137,6 +139,17 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { } } + /// Returns `(column_id, data_type)` pairs for predicate columns. + fn expected_predicate_column_types(&self) -> BTreeMap { + self.predicates + .keys() + .filter_map(|col_id| { + let col = self.metadata.column_by_id(*col_id)?; + Some((*col_id, col.column_schema.data_type.clone())) + }) + .collect() + } + /// Helper function to get the column id and type fn column_id_and_type( &self, @@ -404,7 +417,7 @@ mod tests { let result = builder.build(&exprs).unwrap(); assert!(result.is_some()); - let predicates = result.unwrap().predicates; + let predicates = result.unwrap().default_predicates; assert_eq!(predicates.len(), 1); let column_predicates = predicates.get(&1).unwrap(); @@ -443,7 +456,7 @@ mod tests { let result = builder.build(&exprs).unwrap(); assert!(result.is_some()); - let predicates = result.unwrap().predicates; + let predicates = result.unwrap().default_predicates; let column_predicates = predicates.get(&2).unwrap(); assert_eq!(column_predicates.len(), 1); assert_eq!(column_predicates[0].list.len(), 3); @@ -473,7 +486,7 @@ mod tests { let result = builder().build(&[expr]).unwrap(); assert!(result.is_some()); - let predicates = result.unwrap().predicates; + let predicates = result.unwrap().default_predicates; let column_predicates = predicates.get(&1).unwrap(); assert_eq!(column_predicates.len(), 1); assert_eq!(column_predicates[0].list.len(), 4); @@ -537,7 +550,7 @@ mod tests { let result = builder.build(&exprs).unwrap(); assert!(result.is_some()); - let predicates = result.unwrap().predicates; + let predicates = result.unwrap().default_predicates; assert_eq!(predicates.len(), 2); assert!(predicates.contains_key(&1)); assert!(predicates.contains_key(&2)); @@ -575,7 +588,7 @@ mod tests { let result = builder.build(&exprs).unwrap(); assert!(result.is_some()); - let predicates = result.unwrap().predicates; + let predicates = result.unwrap().default_predicates; assert!(!predicates.contains_key(&1)); // Null equality should be ignored let column2_predicates = predicates.get(&2).unwrap(); assert_eq!(column2_predicates[0].list.len(), 2); @@ -644,7 +657,7 @@ mod tests { let result = builder.build(&exprs).unwrap(); assert!(result.is_some()); - let predicates = result.unwrap().predicates; + let predicates = result.unwrap().default_predicates; let column_predicates = predicates.get(&1).unwrap(); assert_eq!(column_predicates.len(), 2); } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index a3fe32cbe3..48474d9fe4 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -138,6 +138,7 @@ mod tests { use super::*; use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType}; + use crate::cache::index::result_cache::PredicateKey; use crate::cache::test_util::assert_parquet_metadata_equal; use crate::cache::{CacheManager, CacheStrategy, PageKey}; use crate::config::IndexConfig; @@ -985,11 +986,14 @@ mod tests { assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2); assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2); assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100); + let bloom_predicates = bloom_filter_applier + .as_ref() + .unwrap() + .compatible_predicate_for_sst(&metadata) + .unwrap(); + let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates); let cached = index_result_cache - .get( - bloom_filter_applier.unwrap().predicate_key(), - handle.file_id().file_id(), - ) + .get(&bloom_predicate_key, handle.file_id().file_id()) .unwrap(); assert!(cached.contains_row_group(2)); assert!(cached.contains_row_group(3)); @@ -1055,11 +1059,14 @@ mod tests { assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2); assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140); + let bloom_predicates = bloom_filter_applier + .as_ref() + .unwrap() + .compatible_predicate_for_sst(&metadata) + .unwrap(); + let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates); let cached = index_result_cache - .get( - bloom_filter_applier.unwrap().predicate_key(), - handle.file_id().file_id(), - ) + .get(&bloom_predicate_key, handle.file_id().file_id()) .unwrap(); assert!(cached.contains_row_group(0)); assert!(cached.contains_row_group(1)); diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 5d812f6307..02db70fa88 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -741,6 +741,7 @@ impl ParquetReaderBuilder { } self.prune_row_groups_by_bloom_filter( + read_format.metadata(), row_group_size, parquet_meta, &mut output, @@ -935,6 +936,7 @@ impl ParquetReaderBuilder { async fn prune_row_groups_by_bloom_filter( &self, + sst_metadata: &RegionMetadataRef, row_group_size: usize, parquet_meta: &ParquetMetaData, output: &mut RowGroupSelection, @@ -953,12 +955,17 @@ impl ParquetReaderBuilder { &self.bloom_filter_index_appliers[..] }; for index_applier in appliers.iter().flatten() { - let predicate_key = index_applier.predicate_key(); + let Some(compatible_predicates) = + index_applier.compatible_predicate_for_sst(sst_metadata) + else { + continue; + }; + let predicate_key = PredicateKey::new_bloom(compatible_predicates.clone()); // Fast path: return early if the result is in the cache. - let cached = self - .cache_strategy - .index_result_cache() - .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id())); + let cached = self.cache_strategy.index_result_cache().and_then(|cache| { + let file_id = self.file_handle.file_id().file_id(); + cache.get(&predicate_key, file_id) + }); if let Some(result) = cached.as_ref() && all_required_row_groups_searched(output, result) { @@ -986,6 +993,7 @@ impl ParquetReaderBuilder { .apply( self.file_handle.index_id(), Some(file_size_hint), + &compatible_predicates, rgs, metrics.bloom_filter_apply_metrics.as_mut(), ) @@ -1006,7 +1014,7 @@ impl ParquetReaderBuilder { } self.apply_index_result_and_update_cache( - predicate_key, + &predicate_key, self.file_handle.file_id().file_id(), selection, output, diff --git a/tests/cases/standalone/common/alter/change_col_type_skipping_index.result b/tests/cases/standalone/common/alter/change_col_type_skipping_index.result new file mode 100644 index 0000000000..e19cb455a5 --- /dev/null +++ b/tests/cases/standalone/common/alter/change_col_type_skipping_index.result @@ -0,0 +1,47 @@ +-- Regression test for skip index with column type change. +CREATE TABLE monitoring_data_skip ( + host STRING SKIPPING INDEX, + `region` STRING, + cpu_usage DOUBLE SKIPPING INDEX, + `timestamp` TIMESTAMP TIME INDEX +) WITH ('append_mode'='true'); + +Affected Rows: 0 + +INSERT INTO monitoring_data_skip (host, region, cpu_usage, `timestamp`) VALUES +('web-01', 'us-east', 12.5, '2026-05-06 10:00:00'), +('web-01', 'us-east', 15.2, '2026-05-06 10:01:00'), +('web-02', 'us-east', 23.7, '2026-05-06 10:01:00'), +('db-01', 'us-east', 45.0, '2026-05-06 10:02:00'), +('db-02', 'us-west', 82.2, '2026-05-06 10:02:00'), +('cache-01', 'eu-central', 55.4, '2026-05-06 10:02:00'), +('queue-01', 'ap-south', 99.1, '2026-05-06 10:02:00'); + +Affected Rows: 7 + +ADMIN FLUSH_TABLE('monitoring_data_skip'); + ++-------------------------------------------+ +| ADMIN FLUSH_TABLE('monitoring_data_skip') | ++-------------------------------------------+ +| 0 | ++-------------------------------------------+ + +ALTER TABLE monitoring_data_skip +MODIFY COLUMN cpu_usage STRING; + +Affected Rows: 0 + +SELECT host, region, cpu_usage, `timestamp` FROM monitoring_data_skip +WHERE cpu_usage = '23.7'; + ++--------+---------+-----------+---------------------+ +| host | region | cpu_usage | timestamp | ++--------+---------+-----------+---------------------+ +| web-02 | us-east | 23.7 | 2026-05-06T10:01:00 | ++--------+---------+-----------+---------------------+ + +DROP TABLE monitoring_data_skip; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/change_col_type_skipping_index.sql b/tests/cases/standalone/common/alter/change_col_type_skipping_index.sql new file mode 100644 index 0000000000..8a7e3b431d --- /dev/null +++ b/tests/cases/standalone/common/alter/change_col_type_skipping_index.sql @@ -0,0 +1,26 @@ +-- Regression test for skip index with column type change. +CREATE TABLE monitoring_data_skip ( + host STRING SKIPPING INDEX, + `region` STRING, + cpu_usage DOUBLE SKIPPING INDEX, + `timestamp` TIMESTAMP TIME INDEX +) WITH ('append_mode'='true'); + +INSERT INTO monitoring_data_skip (host, region, cpu_usage, `timestamp`) VALUES +('web-01', 'us-east', 12.5, '2026-05-06 10:00:00'), +('web-01', 'us-east', 15.2, '2026-05-06 10:01:00'), +('web-02', 'us-east', 23.7, '2026-05-06 10:01:00'), +('db-01', 'us-east', 45.0, '2026-05-06 10:02:00'), +('db-02', 'us-west', 82.2, '2026-05-06 10:02:00'), +('cache-01', 'eu-central', 55.4, '2026-05-06 10:02:00'), +('queue-01', 'ap-south', 99.1, '2026-05-06 10:02:00'); + +ADMIN FLUSH_TABLE('monitoring_data_skip'); + +ALTER TABLE monitoring_data_skip +MODIFY COLUMN cpu_usage STRING; + +SELECT host, region, cpu_usage, `timestamp` FROM monitoring_data_skip +WHERE cpu_usage = '23.7'; + +DROP TABLE monitoring_data_skip;