From 078afb2bd65d6f7bd666a815676e155d0dd4418c Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 3 Jun 2025 16:08:19 +0800 Subject: [PATCH] feat: bloom filter index applier support or eq chain (#6227) * feat: bloom filter index applier support or eq chain Signed-off-by: Zhenchi * address comments Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- .../sst/index/bloom_filter/applier/builder.rs | 160 +++++++++++++++++- 1 file changed, 155 insertions(+), 5 deletions(-) diff --git a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs index a9e43ebe32..fcf9e0f768 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs @@ -15,7 +15,7 @@ use std::collections::{BTreeMap, BTreeSet}; use common_telemetry::warn; -use datafusion_common::ScalarValue; +use datafusion_common::{Column, ScalarValue}; use datafusion_expr::expr::InList; use datafusion_expr::{BinaryExpr, Expr, Operator}; use datatypes::data_type::ConcreteDataType; @@ -121,6 +121,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { Ok(()) } Operator::Eq => self.collect_eq(left, right), + Operator::Or => self.collect_or_eq_list(left, right), _ => Ok(()), }, Expr::InList(in_list) => self.collect_in_list(in_list), @@ -152,10 +153,8 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { /// Collects an equality expression (column = value) fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> { - let (col, lit) = match (left, right) { - (Expr::Column(col), Expr::Literal(lit)) => (col, lit), - (Expr::Literal(lit), Expr::Column(col)) => (col, lit), - _ => return Ok(()), + let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else { + return Ok(()); }; if lit.is_null() { return Ok(()); @@ -218,6 +217,83 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { Ok(()) } + /// Collects an or expression in the form of `column = lit OR column = lit OR ...`. + fn collect_or_eq_list(&mut self, left: &Expr, right: &Expr) -> Result<()> { + let (eq_left, eq_right, or_list) = if let Expr::BinaryExpr(BinaryExpr { + left: l, + op: Operator::Eq, + right: r, + }) = left + { + (l, r, right) + } else if let Expr::BinaryExpr(BinaryExpr { + left: l, + op: Operator::Eq, + right: r, + }) = right + { + (l, r, left) + } else { + return Ok(()); + }; + + let Some((col, lit)) = Self::eq_expr_col_lit(eq_left, eq_right)? else { + return Ok(()); + }; + if lit.is_null() { + return Ok(()); + } + let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else { + return Ok(()); + }; + + let mut inlist = BTreeSet::new(); + inlist.insert(encode_lit(lit, data_type.clone())?); + if Self::collect_or_eq_list_rec(&col.name, &data_type, or_list, &mut inlist)? { + self.predicates + .entry(column_id) + .or_default() + .push(InListPredicate { list: inlist }); + } + + Ok(()) + } + + fn collect_or_eq_list_rec( + column_name: &str, + data_type: &ConcreteDataType, + expr: &Expr, + inlist: &mut BTreeSet, + ) -> Result { + if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr { + match op { + Operator::Or => { + let r = Self::collect_or_eq_list_rec(column_name, data_type, left, inlist)? + .then(|| { + Self::collect_or_eq_list_rec(column_name, data_type, right, inlist) + }) + .transpose()? + .unwrap_or(false); + return Ok(r); + } + Operator::Eq => { + let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else { + return Ok(false); + }; + if lit.is_null() || column_name != col.name { + return Ok(false); + } + let bytes = encode_lit(lit, data_type.clone())?; + inlist.insert(bytes); + return Ok(true); + } + _ => {} + } + } + + Ok(false) + } + /// Helper function to get non-null literal value fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> { match expr { @@ -225,6 +301,19 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { _ => None, } } + + /// Helper function to get the column and literal value from an equality expr (column = lit) + fn eq_expr_col_lit<'b>( + left: &'b Expr, + right: &'b Expr, + ) -> Result> { + let (col, lit) = match (left, right) { + (Expr::Column(col), Expr::Literal(lit)) => (col, lit), + (Expr::Literal(lit), Expr::Column(col)) => (col, lit), + _ => return Ok(None), + }; + Ok(Some((col, lit))) + } } // TODO(ruihang): extract this and the one under inverted_index into a common util mod. @@ -241,6 +330,7 @@ fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result { mod tests { use api::v1::SemanticType; use datafusion_common::Column; + use datafusion_expr::{col, lit}; use datatypes::schema::ColumnSchema; use object_store::services::Memory; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; @@ -356,6 +446,66 @@ mod tests { assert_eq!(column_predicates[0].list.len(), 3); } + #[test] + fn test_build_with_or_chain() { + let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_or_chain_"); + let metadata = test_region_metadata(); + let builder = || { + BloomFilterIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + &metadata, + factory.clone(), + ) + }; + + let expr = col("column1") + .eq(lit("value1")) + .or(col("column1") + .eq(lit("value2")) + .or(col("column1").eq(lit("value4")))) + .or(col("column1").eq(lit("value3"))); + + let result = builder().build(&[expr]).unwrap(); + assert!(result.is_some()); + + let predicates = result.unwrap().predicates; + let column_predicates = predicates.get(&1).unwrap(); + assert_eq!(column_predicates.len(), 1); + assert_eq!(column_predicates[0].list.len(), 4); + let or_chain_predicates = &column_predicates[0].list; + let encode_str = |s: &str| { + encode_lit( + &ScalarValue::Utf8(Some(s.to_string())), + ConcreteDataType::string_datatype(), + ) + .unwrap() + }; + assert!(or_chain_predicates.contains(&encode_str("value1"))); + assert!(or_chain_predicates.contains(&encode_str("value2"))); + assert!(or_chain_predicates.contains(&encode_str("value3"))); + assert!(or_chain_predicates.contains(&encode_str("value4"))); + + // Test with null value + let expr = col("column1").eq(Expr::Literal(ScalarValue::Utf8(None))); + let result = builder().build(&[expr]).unwrap(); + assert!(result.is_none()); + + // Test with different column + let expr = col("column1") + .eq(lit("value1")) + .or(col("column2").eq(lit("value2"))); + let result = builder().build(&[expr]).unwrap(); + assert!(result.is_none()); + + // Test with non or chain + let expr = col("column1") + .eq(lit("value1")) + .or(col("column1").gt_eq(lit("value2"))); + let result = builder().build(&[expr]).unwrap(); + assert!(result.is_none()); + } + #[test] fn test_build_with_and_expressions() { let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");