fix(mito2): schema-safe skipping index pruning (#8122)

* fix: schema-safe skipping index pruning

* fix: cargo clippy

* fix: sqlness test

* remove default plan in BloomFilterIndexApplier

* fix comment of plan_for_sst

* add fast path for default_plan

* minor refactor

* some rename

* fix: cr by ai
This commit is contained in:
fys
2026-05-25 19:45:42 +08:00
committed by GitHub
parent eb264d9adf
commit 8d3ebde652
6 changed files with 286 additions and 32 deletions

View File

@@ -21,6 +21,7 @@ use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::{tracing, warn};
use datatypes::data_type::ConcreteDataType;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{
BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
@@ -30,6 +31,7 @@ use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::PathType;
use store_api::storage::ColumnId;
@@ -38,7 +40,6 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
Result,
@@ -133,10 +134,10 @@ pub struct BloomFilterIndexApplier {
/// Bloom filter predicates.
/// For each column, the value will be retained only if it contains __all__ predicates.
predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
default_predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
/// Predicate key. Used to identify the predicate and fetch result from cache.
predicate_key: PredicateKey,
/// Expected predicate column types from the latest region metadata.
expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
}
impl BloomFilterIndexApplier {
@@ -149,8 +150,9 @@ impl BloomFilterIndexApplier {
object_store: ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
) -> Self {
let predicates = Arc::new(predicates);
let default_predicates = Arc::new(predicates);
Self {
table_dir,
path_type,
@@ -159,8 +161,8 @@ impl BloomFilterIndexApplier {
puffin_manager_factory,
puffin_metadata_cache: None,
bloom_filter_index_cache: None,
predicate_key: PredicateKey::new_bloom(predicates.clone()),
predicates,
default_predicates,
expected_predicate_col_types,
}
}
@@ -207,6 +209,7 @@ impl BloomFilterIndexApplier {
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
predicates: &BTreeMap<ColumnId, Vec<InListPredicate>>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
@@ -230,7 +233,7 @@ impl BloomFilterIndexApplier {
.map(|(i, range)| (*i, vec![range.clone()]))
.collect::<Vec<_>>();
for (column_id, predicates) in self.predicates.iter() {
for (column_id, predicates) in predicates {
let blob = match self
.blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
.await?
@@ -438,9 +441,46 @@ impl BloomFilterIndexApplier {
Ok(())
}
/// Returns the predicate key.
pub fn predicate_key(&self) -> &PredicateKey {
&self.predicate_key
/// Returns compatible bloom filter predicates with the given SST metadata.
///
/// Returns `None` when no compatible predicate remains for this SST.
pub fn compatible_predicate_for_sst(
&self,
sst_metadata: &RegionMetadataRef,
) -> Option<Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>> {
let mut has_type_mismatch = false;
let mut compatible_col_ids = Vec::new();
for (col_id, expected) in &self.expected_predicate_col_types {
let Some(sst_col) = sst_metadata.column_by_id(*col_id) else {
has_type_mismatch = true;
continue;
};
if sst_col.column_schema.data_type != *expected {
has_type_mismatch = true;
continue;
}
compatible_col_ids.push(*col_id);
}
if compatible_col_ids.is_empty() {
return None;
}
if !has_type_mismatch {
return Some(self.default_predicates.clone());
}
let mut compatible_predicates = BTreeMap::new();
for col_id in compatible_col_ids {
if let Some(predicates) = self.default_predicates.get(&col_id) {
compatible_predicates.insert(col_id, predicates.clone());
}
}
Some(Arc::new(compatible_predicates))
}
}
@@ -456,9 +496,12 @@ fn is_blob_not_found(err: &Error) -> bool {
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use datafusion_expr::{Expr, col, lit};
use futures::future::BoxFuture;
use index::Bytes;
use object_store::services::Memory;
use puffin::puffin_manager::PuffinWriter;
use store_api::metadata::RegionMetadata;
use store_api::storage::FileId;
@@ -470,6 +513,113 @@ mod tests {
mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
};
#[tokio::test]
async fn test_compatible_predicate_for_sst() {
let (_d, puffin_manager_factory) =
PuffinManagerFactory::new_for_test_async("test_plan_for_sst_basic_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let table_dir = "table_dir".to_string();
let predicates = BTreeMap::from_iter([(
1,
vec![InListPredicate {
list: BTreeSet::from_iter([Bytes::from("foo")]),
}],
)]);
let expected_predicate_col_types =
BTreeMap::from_iter([(1, ConcreteDataType::string_datatype())]);
let applier = BloomFilterIndexApplier::new(
table_dir,
PathType::Bare,
object_store,
puffin_manager_factory,
predicates,
expected_predicate_col_types,
);
let predicates = applier.compatible_predicate_for_sst(&mock_region_metadata());
assert!(predicates.is_some());
}
#[tokio::test]
async fn test_compatible_predicate_for_sst_type_mismatch() {
let (_d, puffin_manager_factory) =
PuffinManagerFactory::new_for_test_async("test_plan_for_sst_type_mismatch_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let table_dir = "table_dir".to_string();
let predicates = BTreeMap::from_iter([(
1,
vec![InListPredicate {
list: BTreeSet::from_iter([Bytes::from("foo")]),
}],
)]);
let expected_predicate_col_types =
BTreeMap::from_iter([(1, ConcreteDataType::int64_datatype())]);
let applier = BloomFilterIndexApplier::new(
table_dir,
PathType::Bare,
object_store,
puffin_manager_factory,
predicates,
expected_predicate_col_types,
);
let predicates = applier.compatible_predicate_for_sst(&mock_region_metadata());
assert!(predicates.is_none());
}
#[tokio::test]
async fn test_compatible_predicate_for_sst_partial_type_mismatch() {
let (_d, puffin_manager_factory) =
PuffinManagerFactory::new_for_test_async("test_plan_for_sst_partial_mismatch_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let table_dir = "table_dir".to_string();
// Column 1 (tag_str): expected string — matches SST (compatible).
// Column 3 (field_u64): expected int64 — SST has uint64 (mismatched).
let predicates = BTreeMap::from_iter([
(
1,
vec![InListPredicate {
list: BTreeSet::from_iter([Bytes::from("foo")]),
}],
),
(
3,
vec![InListPredicate {
list: BTreeSet::from_iter([Bytes::from("bar")]),
}],
),
]);
let expected_predicate_col_types = BTreeMap::from_iter([
(1, ConcreteDataType::string_datatype()),
(3, ConcreteDataType::int64_datatype()), // intentional mismatch
]);
let applier = BloomFilterIndexApplier::new(
table_dir,
PathType::Bare,
object_store,
puffin_manager_factory,
predicates,
expected_predicate_col_types,
);
let result = applier.compatible_predicate_for_sst(&mock_region_metadata());
// The subset containing only the compatible column must be returned.
let result = result.expect("expected Some with compatible subset");
assert!(
result.contains_key(&1),
"compatible column 1 must be present"
);
assert!(
!result.contains_key(&3),
"mismatched column 3 must be absent"
);
assert_eq!(result.len(), 1, "only the compatible predicate must remain");
}
#[allow(clippy::type_complexity)]
fn tester(
table_dir: String,
@@ -496,8 +646,11 @@ mod tests {
);
let applier = builder.build(&exprs).unwrap().unwrap();
let predicates = applier
.compatible_predicate_for_sst(&Arc::new(metadata.clone()))
.unwrap();
applier
.apply(file_id, None, row_groups.into_iter(), None)
.apply(file_id, None, &predicates, row_groups.into_iter(), None)
.await
.unwrap()
.into_iter()

View File

@@ -101,12 +101,14 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
return Ok(None);
}
let expected_predicate_column_types = self.expected_predicate_column_types();
let applier = BloomFilterIndexApplier::new(
self.table_dir,
self.path_type,
self.object_store,
self.puffin_manager_factory,
self.predicates,
expected_predicate_column_types,
)
.with_file_cache(self.file_cache)
.with_puffin_metadata_cache(self.puffin_metadata_cache)
@@ -137,6 +139,17 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
}
}
/// Returns `(column_id, data_type)` pairs for predicate columns.
fn expected_predicate_column_types(&self) -> BTreeMap<ColumnId, ConcreteDataType> {
self.predicates
.keys()
.filter_map(|col_id| {
let col = self.metadata.column_by_id(*col_id)?;
Some((*col_id, col.column_schema.data_type.clone()))
})
.collect()
}
/// Helper function to get the column id and type
fn column_id_and_type(
&self,
@@ -404,7 +417,7 @@ mod tests {
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
assert_eq!(predicates.len(), 1);
let column_predicates = predicates.get(&1).unwrap();
@@ -443,7 +456,7 @@ mod tests {
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
let column_predicates = predicates.get(&2).unwrap();
assert_eq!(column_predicates.len(), 1);
assert_eq!(column_predicates[0].list.len(), 3);
@@ -473,7 +486,7 @@ mod tests {
let result = builder().build(&[expr]).unwrap();
assert!(result.is_some());
let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
let column_predicates = predicates.get(&1).unwrap();
assert_eq!(column_predicates.len(), 1);
assert_eq!(column_predicates[0].list.len(), 4);
@@ -537,7 +550,7 @@ mod tests {
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
assert_eq!(predicates.len(), 2);
assert!(predicates.contains_key(&1));
assert!(predicates.contains_key(&2));
@@ -575,7 +588,7 @@ mod tests {
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
assert!(!predicates.contains_key(&1)); // Null equality should be ignored
let column2_predicates = predicates.get(&2).unwrap();
assert_eq!(column2_predicates[0].list.len(), 2);
@@ -644,7 +657,7 @@ mod tests {
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
let column_predicates = predicates.get(&1).unwrap();
assert_eq!(column_predicates.len(), 2);
}

View File

@@ -138,6 +138,7 @@ mod tests {
use super::*;
use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
use crate::cache::index::result_cache::PredicateKey;
use crate::cache::test_util::assert_parquet_metadata_equal;
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::config::IndexConfig;
@@ -985,11 +986,14 @@ mod tests {
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
let bloom_predicates = bloom_filter_applier
.as_ref()
.unwrap()
.compatible_predicate_for_sst(&metadata)
.unwrap();
let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
let cached = index_result_cache
.get(
bloom_filter_applier.unwrap().predicate_key(),
handle.file_id().file_id(),
)
.get(&bloom_predicate_key, handle.file_id().file_id())
.unwrap();
assert!(cached.contains_row_group(2));
assert!(cached.contains_row_group(3));
@@ -1055,11 +1059,14 @@ mod tests {
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
let bloom_predicates = bloom_filter_applier
.as_ref()
.unwrap()
.compatible_predicate_for_sst(&metadata)
.unwrap();
let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
let cached = index_result_cache
.get(
bloom_filter_applier.unwrap().predicate_key(),
handle.file_id().file_id(),
)
.get(&bloom_predicate_key, handle.file_id().file_id())
.unwrap();
assert!(cached.contains_row_group(0));
assert!(cached.contains_row_group(1));

View File

@@ -741,6 +741,7 @@ impl ParquetReaderBuilder {
}
self.prune_row_groups_by_bloom_filter(
read_format.metadata(),
row_group_size,
parquet_meta,
&mut output,
@@ -935,6 +936,7 @@ impl ParquetReaderBuilder {
async fn prune_row_groups_by_bloom_filter(
&self,
sst_metadata: &RegionMetadataRef,
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut RowGroupSelection,
@@ -953,12 +955,17 @@ impl ParquetReaderBuilder {
&self.bloom_filter_index_appliers[..]
};
for index_applier in appliers.iter().flatten() {
let predicate_key = index_applier.predicate_key();
let Some(compatible_predicates) =
index_applier.compatible_predicate_for_sst(sst_metadata)
else {
continue;
};
let predicate_key = PredicateKey::new_bloom(compatible_predicates.clone());
// Fast path: return early if the result is in the cache.
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
let file_id = self.file_handle.file_id().file_id();
cache.get(&predicate_key, file_id)
});
if let Some(result) = cached.as_ref()
&& all_required_row_groups_searched(output, result)
{
@@ -986,6 +993,7 @@ impl ParquetReaderBuilder {
.apply(
self.file_handle.index_id(),
Some(file_size_hint),
&compatible_predicates,
rgs,
metrics.bloom_filter_apply_metrics.as_mut(),
)
@@ -1006,7 +1014,7 @@ impl ParquetReaderBuilder {
}
self.apply_index_result_and_update_cache(
predicate_key,
&predicate_key,
self.file_handle.file_id().file_id(),
selection,
output,

View File

@@ -0,0 +1,47 @@
-- Regression test for skip index with column type change.
CREATE TABLE monitoring_data_skip (
host STRING SKIPPING INDEX,
`region` STRING,
cpu_usage DOUBLE SKIPPING INDEX,
`timestamp` TIMESTAMP TIME INDEX
) WITH ('append_mode'='true');
Affected Rows: 0
INSERT INTO monitoring_data_skip (host, region, cpu_usage, `timestamp`) VALUES
('web-01', 'us-east', 12.5, '2026-05-06 10:00:00'),
('web-01', 'us-east', 15.2, '2026-05-06 10:01:00'),
('web-02', 'us-east', 23.7, '2026-05-06 10:01:00'),
('db-01', 'us-east', 45.0, '2026-05-06 10:02:00'),
('db-02', 'us-west', 82.2, '2026-05-06 10:02:00'),
('cache-01', 'eu-central', 55.4, '2026-05-06 10:02:00'),
('queue-01', 'ap-south', 99.1, '2026-05-06 10:02:00');
Affected Rows: 7
ADMIN FLUSH_TABLE('monitoring_data_skip');
+-------------------------------------------+
| ADMIN FLUSH_TABLE('monitoring_data_skip') |
+-------------------------------------------+
| 0 |
+-------------------------------------------+
ALTER TABLE monitoring_data_skip
MODIFY COLUMN cpu_usage STRING;
Affected Rows: 0
SELECT host, region, cpu_usage, `timestamp` FROM monitoring_data_skip
WHERE cpu_usage = '23.7';
+--------+---------+-----------+---------------------+
| host | region | cpu_usage | timestamp |
+--------+---------+-----------+---------------------+
| web-02 | us-east | 23.7 | 2026-05-06T10:01:00 |
+--------+---------+-----------+---------------------+
DROP TABLE monitoring_data_skip;
Affected Rows: 0

View File

@@ -0,0 +1,26 @@
-- Regression test for skip index with column type change.
CREATE TABLE monitoring_data_skip (
host STRING SKIPPING INDEX,
`region` STRING,
cpu_usage DOUBLE SKIPPING INDEX,
`timestamp` TIMESTAMP TIME INDEX
) WITH ('append_mode'='true');
INSERT INTO monitoring_data_skip (host, region, cpu_usage, `timestamp`) VALUES
('web-01', 'us-east', 12.5, '2026-05-06 10:00:00'),
('web-01', 'us-east', 15.2, '2026-05-06 10:01:00'),
('web-02', 'us-east', 23.7, '2026-05-06 10:01:00'),
('db-01', 'us-east', 45.0, '2026-05-06 10:02:00'),
('db-02', 'us-west', 82.2, '2026-05-06 10:02:00'),
('cache-01', 'eu-central', 55.4, '2026-05-06 10:02:00'),
('queue-01', 'ap-south', 99.1, '2026-05-06 10:02:00');
ADMIN FLUSH_TABLE('monitoring_data_skip');
ALTER TABLE monitoring_data_skip
MODIFY COLUMN cpu_usage STRING;
SELECT host, region, cpu_usage, `timestamp` FROM monitoring_data_skip
WHERE cpu_usage = '23.7';
DROP TABLE monitoring_data_skip;