feat: bloom filter index applier support or eq chain (#6227)

* feat: bloom filter index applier support or eq chain

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-06-03 16:08:19 +08:00
committed by GitHub
parent 477e4cc344
commit 078afb2bd6

View File

@@ -15,7 +15,7 @@
use std::collections::{BTreeMap, BTreeSet};
use common_telemetry::warn;
use datafusion_common::ScalarValue;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::expr::InList;
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::data_type::ConcreteDataType;
@@ -121,6 +121,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
Ok(())
}
Operator::Eq => self.collect_eq(left, right),
Operator::Or => self.collect_or_eq_list(left, right),
_ => Ok(()),
},
Expr::InList(in_list) => self.collect_in_list(in_list),
@@ -152,10 +153,8 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
/// Collects an equality expression (column = value)
fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
let (col, lit) = match (left, right) {
(Expr::Column(col), Expr::Literal(lit)) => (col, lit),
(Expr::Literal(lit), Expr::Column(col)) => (col, lit),
_ => return Ok(()),
let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
return Ok(());
};
if lit.is_null() {
return Ok(());
@@ -218,6 +217,83 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
Ok(())
}
/// Collects an or expression in the form of `column = lit OR column = lit OR ...`.
///
/// `left` and `right` are the two operands of the top-level `OR`. A single
/// `InListPredicate` is recorded for the column only when every leaf of the
/// OR tree is an equality between that same column and a non-null literal;
/// in every other case nothing is recorded and `Ok(())` is returned.
///
/// The OR tree may have any shape: besides chains where one direct operand is
/// an equality (e.g. `(a = 1 OR a = 2) OR a = 3`), trees in which both
/// operands are themselves `OR`s (e.g. `(a = 1 OR a = 2) OR (a = 3 OR a = 4)`)
/// are handled as well.
///
/// # Errors
///
/// Propagates errors from the column lookup and from literal encoding.
fn collect_or_eq_list(&mut self, left: &Expr, right: &Expr) -> Result<()> {
    // Pick the equality that decides which column the chain is about. A direct
    // `Eq` operand wins (left before right); otherwise fall back to the
    // left-most equality reachable through nested `OR`s on the left operand.
    let seed: Option<(&Expr, &Expr)> = match (left, right) {
        (
            Expr::BinaryExpr(BinaryExpr {
                left: l,
                op: Operator::Eq,
                right: r,
            }),
            _,
        ) => Some((l.as_ref(), r.as_ref())),
        (
            _,
            Expr::BinaryExpr(BinaryExpr {
                left: l,
                op: Operator::Eq,
                right: r,
            }),
        ) => Some((l.as_ref(), r.as_ref())),
        _ => Self::leftmost_or_eq_operands(left),
    };
    let Some((eq_left, eq_right)) = seed else {
        return Ok(());
    };

    let Some((col, lit)) = Self::eq_expr_col_lit(eq_left, eq_right)? else {
        return Ok(());
    };
    // A null literal in the seed equality disqualifies the whole chain.
    if lit.is_null() {
        return Ok(());
    }
    let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
        return Ok(());
    };

    // Validate and collect both operands. The seed equality is visited again
    // here, which is harmless because the values are gathered into a set.
    let mut inlist = BTreeSet::new();
    let whole_chain_matches =
        Self::collect_or_eq_list_rec(&col.name, &data_type, left, &mut inlist)?
            && Self::collect_or_eq_list_rec(&col.name, &data_type, right, &mut inlist)?;
    if whole_chain_matches {
        self.predicates
            .entry(column_id)
            .or_default()
            .push(InListPredicate { list: inlist });
    }

    Ok(())
}

/// Returns the operands of the left-most `Eq` reachable from `expr` by
/// following the left operand of nested `OR` expressions, or `None` when a
/// node that is neither `OR` nor `Eq` is met on the way.
fn leftmost_or_eq_operands(expr: &Expr) -> Option<(&Expr, &Expr)> {
    let mut current = expr;
    loop {
        match current {
            Expr::BinaryExpr(BinaryExpr {
                left,
                op: Operator::Eq,
                right,
            }) => return Some((left.as_ref(), right.as_ref())),
            Expr::BinaryExpr(BinaryExpr {
                left,
                op: Operator::Or,
                ..
            }) => current = left.as_ref(),
            _ => return None,
        }
    }
}
/// Walks an OR tree and gathers the encoded literal of every `column = lit`
/// leaf into `inlist`.
///
/// Returns `Ok(true)` only when `expr` consists solely of `OR` nodes whose
/// leaves are equalities between `column_name` and a non-null literal.
/// As soon as a leaf fails that check, `Ok(false)` is returned and the
/// remaining operands are not visited; values inserted before that point
/// stay in `inlist`, so callers must discard the set on `false`.
///
/// Errors from literal encoding are propagated.
fn collect_or_eq_list_rec(
    column_name: &str,
    data_type: &ConcreteDataType,
    expr: &Expr,
    inlist: &mut BTreeSet<Bytes>,
) -> Result<bool> {
    // Anything that is not a binary expression cannot be part of the chain.
    let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr else {
        return Ok(false);
    };

    match op {
        Operator::Or => {
            // Short-circuit: the right operand is only visited when the left
            // operand fully matched.
            let both_match = Self::collect_or_eq_list_rec(column_name, data_type, left, inlist)?
                && Self::collect_or_eq_list_rec(column_name, data_type, right, inlist)?;
            Ok(both_match)
        }
        Operator::Eq => match Self::eq_expr_col_lit(left, right)? {
            Some((col, lit)) if !lit.is_null() && column_name == col.name => {
                let encoded = encode_lit(lit, data_type.clone())?;
                inlist.insert(encoded);
                Ok(true)
            }
            // Not `column = lit`, a null literal, or a different column.
            _ => Ok(false),
        },
        _ => Ok(false),
    }
}
/// Helper function to get non-null literal value
fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
match expr {
@@ -225,6 +301,19 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
_ => None,
}
}
/// Extracts the column and the literal from the operands of an equality.
///
/// Accepts both operand orders (`column = lit` and `lit = column`) and yields
/// `Ok(None)` for any other combination. The literal is returned as-is, so it
/// may still be null; callers check that themselves.
fn eq_expr_col_lit<'b>(
    left: &'b Expr,
    right: &'b Expr,
) -> Result<Option<(&'b Column, &'b ScalarValue)>> {
    let pair = match (left, right) {
        (Expr::Column(col), Expr::Literal(lit)) | (Expr::Literal(lit), Expr::Column(col)) => {
            Some((col, lit))
        }
        _ => None,
    };
    Ok(pair)
}
}
// TODO(ruihang): extract this and the one under inverted_index into a common util mod.
@@ -241,6 +330,7 @@ fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
mod tests {
use api::v1::SemanticType;
use datafusion_common::Column;
use datafusion_expr::{col, lit};
use datatypes::schema::ColumnSchema;
use object_store::services::Memory;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
@@ -356,6 +446,66 @@ mod tests {
assert_eq!(column_predicates[0].list.len(), 3);
}
/// Checks that an OR chain of equalities on one column becomes a single
/// in-list predicate, and that chains which do not qualify yield no applier.
#[test]
fn test_build_with_or_chain() {
    let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_or_chain_");
    let metadata = test_region_metadata();
    // A fresh builder per case so predicates never leak between cases.
    let builder = || {
        BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            factory.clone(),
        )
    };

    // Nested OR chain on a single column: all four values end up in one list.
    let expr = col("column1")
        .eq(lit("value1"))
        .or(col("column1")
            .eq(lit("value2"))
            .or(col("column1").eq(lit("value4"))))
        .or(col("column1").eq(lit("value3")));
    let result = builder().build(&[expr]).unwrap();
    assert!(result.is_some());

    let predicates = result.unwrap().predicates;
    let column_predicates = predicates.get(&1).unwrap();
    assert_eq!(column_predicates.len(), 1);
    assert_eq!(column_predicates[0].list.len(), 4);
    let or_chain_predicates = &column_predicates[0].list;
    let encode_str = |s: &str| {
        encode_lit(
            &ScalarValue::Utf8(Some(s.to_string())),
            ConcreteDataType::string_datatype(),
        )
        .unwrap()
    };
    assert!(or_chain_predicates.contains(&encode_str("value1")));
    assert!(or_chain_predicates.contains(&encode_str("value2")));
    assert!(or_chain_predicates.contains(&encode_str("value3")));
    assert!(or_chain_predicates.contains(&encode_str("value4")));

    // Test with null value
    let expr = col("column1").eq(Expr::Literal(ScalarValue::Utf8(None)));
    let result = builder().build(&[expr]).unwrap();
    assert!(result.is_none());

    // Test with null value inside an or chain (null on the right operand)
    let expr = col("column1")
        .eq(lit("value1"))
        .or(col("column1").eq(Expr::Literal(ScalarValue::Utf8(None))));
    let result = builder().build(&[expr]).unwrap();
    assert!(result.is_none());

    // Test with null value inside an or chain (null on the left operand)
    let expr = col("column1")
        .eq(Expr::Literal(ScalarValue::Utf8(None)))
        .or(col("column1").eq(lit("value1")));
    let result = builder().build(&[expr]).unwrap();
    assert!(result.is_none());

    // Test with different column
    let expr = col("column1")
        .eq(lit("value1"))
        .or(col("column2").eq(lit("value2")));
    let result = builder().build(&[expr]).unwrap();
    assert!(result.is_none());

    // Test with non or chain
    let expr = col("column1")
        .eq(lit("value1"))
        .or(col("column1").gt_eq(lit("value2")));
    let result = builder().build(&[expr]).unwrap();
    assert!(result.is_none());
}
#[test]
fn test_build_with_and_expressions() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");