diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index 273c5413b5..f625283607 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -15,6 +15,7 @@ use std::collections::HashSet; use std::ops::Range; +use fastbloom::BloomFilter; use greptime_proto::v1::index::BloomFilterMeta; use itertools::Itertools; @@ -22,6 +23,14 @@ use crate::bloom_filter::error::Result; use crate::bloom_filter::reader::BloomFilterReader; use crate::Bytes; +/// `InListPredicate` contains a list of acceptable values. A value needs to match at least +/// one of the elements (logical OR semantic) for the predicate to be satisfied. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct InListPredicate { + /// List of acceptable values. + pub list: HashSet, +} + pub struct BloomFilterApplier { reader: Box, meta: BloomFilterMeta, @@ -34,66 +43,161 @@ impl BloomFilterApplier { Ok(Self { reader, meta }) } - /// Searches ranges of rows that match the given probes in the given search range. + /// Searches ranges of rows that match all the given predicates in the search ranges. + /// Each predicate represents an OR condition of probes, and all predicates must match (AND semantics). + /// The logic is: (probe1 OR probe2 OR ...) AND (probe3 OR probe4 OR ...) pub async fn search( &mut self, - probes: &HashSet, - search_range: Range, + predicates: &[InListPredicate], + search_ranges: &[Range], ) -> Result>> { - let rows_per_segment = self.meta.rows_per_segment as usize; - let start_seg = search_range.start / rows_per_segment; - let mut end_seg = search_range.end.div_ceil(rows_per_segment); - - if end_seg == self.meta.segment_loc_indices.len() + 1 { - // In a previous version, there was a bug where if the last segment was all null, - // this segment would not be written into the index. This caused the slice - // `self.meta.segment_loc_indices[start_seg..end_seg]` to go out of bounds due to - // the missing segment. Since the `search` function does not search for nulls, - // we can simply ignore the last segment in this buggy scenario. - end_seg -= 1; + if predicates.is_empty() { + // If no predicates, return empty result + return Ok(Vec::new()); } - let locs = &self.meta.segment_loc_indices[start_seg..end_seg]; + let segments = self.row_ranges_to_segments(search_ranges); + let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?; + let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates); + Ok(intersect_ranges(search_ranges, &matching_row_ranges)) + } - // dedup locs - let deduped_locs = locs + /// Converts row ranges to segment ranges and returns unique segments + fn row_ranges_to_segments(&self, row_ranges: &[Range]) -> Vec { + let rows_per_segment = self.meta.rows_per_segment as usize; + + let mut segments = vec![]; + for range in row_ranges { + let start_seg = range.start / rows_per_segment; + let mut end_seg = range.end.div_ceil(rows_per_segment); + + if end_seg == self.meta.segment_loc_indices.len() + 1 { + // Handle legacy bug with missing last segment + // + // In a previous version, there was a bug where if the last segment was all null, + // this segment would not be written into the index. This caused the slice + // `self.meta.segment_loc_indices[start_seg..end_seg]` to go out of bounds due to + // the missing segment. Since the `search` function does not search for nulls, + // we can simply ignore the last segment in this buggy scenario. + end_seg -= 1; + } + segments.extend(start_seg..end_seg); + } + + // Ensure segments are unique and sorted + segments.sort_unstable(); + segments.dedup(); + + segments + } + + /// Loads bloom filters for the given segments and returns the segment locations and bloom filters + async fn load_bloom_filters( + &mut self, + segments: &[usize], + ) -> Result<(Vec<(u64, usize)>, Vec)> { + let segment_locations = segments .iter() - .dedup() - .map(|i| self.meta.bloom_filter_locs[*i as usize]) + .map(|&seg| (self.meta.segment_loc_indices[seg], seg)) .collect::>(); - let bfs = self.reader.bloom_filter_vec(&deduped_locs).await?; - let mut ranges: Vec> = Vec::with_capacity(bfs.len()); - for ((_, mut group), bloom) in locs + let bloom_filter_locs = segment_locations .iter() - .zip(start_seg..end_seg) - .chunk_by(|(x, _)| **x) + .map(|(loc, _)| *loc) + .dedup() + .map(|i| self.meta.bloom_filter_locs[i as usize]) + .collect::>(); + + let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?; + + Ok((segment_locations, bloom_filters)) + } + + /// Finds segments that match all predicates and converts them to row ranges + fn find_matching_rows( + &self, + segment_locations: Vec<(u64, usize)>, + bloom_filters: Vec, + predicates: &[InListPredicate], + ) -> Vec> { + let rows_per_segment = self.meta.rows_per_segment as usize; + let mut matching_row_ranges = Vec::with_capacity(bloom_filters.len()); + + // Group segments by their location index (since they have the same bloom filter) and check if they match all predicates + for ((_loc_index, group), bloom_filter) in segment_locations .into_iter() - .zip(bfs.iter()) + .chunk_by(|(loc, _)| *loc) + .into_iter() + .zip(bloom_filters.iter()) { - let start = group.next().unwrap().1 * rows_per_segment; // SAFETY: group is not empty - let end = group.last().map_or(start + rows_per_segment, |(_, end)| { - (end + 1) * rows_per_segment + // Check if this bloom filter matches each predicate (AND semantics) + let matches_all_predicates = predicates.iter().all(|predicate| { + // For each predicate, at least one probe must match (OR semantics) + predicate + .list + .iter() + .any(|probe| bloom_filter.contains(probe)) }); - let actual_start = start.max(search_range.start); - let actual_end = end.min(search_range.end); - for probe in probes { - if bloom.contains(probe) { - match ranges.last_mut() { - Some(last) if last.end == actual_start => { - last.end = actual_end; - } - _ => { - ranges.push(actual_start..actual_end); - } - } - break; - } + + if !matches_all_predicates { + continue; + } + + // For each matching segment, convert to row range + for (_, segment) in group { + let start_row = segment * rows_per_segment; + let end_row = (segment + 1) * rows_per_segment; + matching_row_ranges.push(start_row..end_row); } } - Ok(ranges) + self.merge_adjacent_ranges(matching_row_ranges) } + + /// Merges adjacent row ranges to reduce the number of ranges + fn merge_adjacent_ranges(&self, ranges: Vec>) -> Vec> { + ranges + .into_iter() + .coalesce(|prev, next| { + if prev.end == next.start { + Ok(prev.start..next.end) + } else { + Err((prev, next)) + } + }) + .collect::>() + } +} + +/// Intersects two lists of ranges and returns the intersection. +/// +/// The input lists are assumed to be sorted and non-overlapping. +fn intersect_ranges(lhs: &[Range], rhs: &[Range]) -> Vec> { + let mut i = 0; + let mut j = 0; + + let mut output = Vec::new(); + while i < lhs.len() && j < rhs.len() { + let r1 = &lhs[i]; + let r2 = &rhs[j]; + + // Find intersection if exists + let start = r1.start.max(r2.start); + let end = r1.end.min(r2.end); + + if start < end { + output.push(start..end); + } + + // Move forward the range that ends first + if r1.end < r2.end { + i += 1; + } else { + j += 1; + } + } + + output } #[cfg(test)] @@ -158,37 +262,6 @@ mod tests { vec![b"dup".to_vec()], ]; - let cases = vec![ - (vec![b"row00".to_vec()], 0..28, vec![0..4]), // search one row in full range - (vec![b"row05".to_vec()], 4..8, vec![4..8]), // search one row in partial range - (vec![b"row03".to_vec()], 4..8, vec![]), // search for a row that doesn't exist in the partial range - ( - vec![b"row01".to_vec(), b"row06".to_vec()], - 0..28, - vec![0..8], - ), // search multiple rows in multiple ranges - ( - vec![b"row01".to_vec(), b"row11".to_vec()], - 0..28, - vec![0..4, 8..12], - ), // search multiple rows in multiple ranges - (vec![b"row99".to_vec()], 0..28, vec![]), // search for a row that doesn't exist in the full range - (vec![b"row00".to_vec()], 12..12, vec![]), // search in an empty range - ( - vec![b"row04".to_vec(), b"row05".to_vec()], - 0..12, - vec![4..8], - ), // search multiple rows in same segment - (vec![b"seg01".to_vec()], 0..28, vec![4..8]), // search rows in a segment - (vec![b"seg01".to_vec()], 6..28, vec![6..8]), // search rows in a segment in partial range - (vec![b"overl".to_vec()], 0..28, vec![0..8]), // search rows in multiple segments - (vec![b"overl".to_vec()], 2..28, vec![2..8]), // search range starts from the middle of a segment - (vec![b"overp".to_vec()], 0..10, vec![4..10]), // search range ends at the middle of a segment - (vec![b"dup".to_vec()], 0..12, vec![]), // search for a duplicate row not in the range - (vec![b"dup".to_vec()], 0..16, vec![12..16]), // search for a duplicate row in the range - (vec![b"dup".to_vec()], 0..28, vec![12..28]), // search for a duplicate row in the full range - ]; - for row in rows { creator.push_row_elems(row).await.unwrap(); } @@ -196,15 +269,215 @@ mod tests { creator.finish(&mut writer).await.unwrap(); let bytes = writer.into_inner(); - let reader = BloomFilterReaderImpl::new(bytes); - let mut applier = BloomFilterApplier::new(Box::new(reader)).await.unwrap(); - for (probes, search_range, expected) in cases { - let probes: HashSet = probes.into_iter().collect(); - let ranges = applier.search(&probes, search_range).await.unwrap(); - assert_eq!(ranges, expected); + // Test cases for predicates + let cases = vec![ + // Single value predicates + ( + vec![InListPredicate { + list: HashSet::from_iter([b"row00".to_vec()]), + }], + 0..28, + vec![0..4], + ), + ( + vec![InListPredicate { + list: HashSet::from_iter([b"row05".to_vec()]), + }], + 4..8, + vec![4..8], + ), + ( + vec![InListPredicate { + list: HashSet::from_iter([b"row03".to_vec()]), + }], + 4..8, + vec![], + ), + // Multiple values in a single predicate (OR logic) + ( + vec![InListPredicate { + list: HashSet::from_iter([b"overl".to_vec(), b"row06".to_vec()]), + }], + 0..28, + vec![0..8], + ), + ( + vec![InListPredicate { + list: HashSet::from_iter([b"seg01".to_vec(), b"overp".to_vec()]), + }], + 0..28, + vec![4..12], + ), + // Non-existent values + ( + vec![InListPredicate { + list: HashSet::from_iter([b"row99".to_vec()]), + }], + 0..28, + vec![], + ), + // Empty range + ( + vec![InListPredicate { + list: HashSet::from_iter([b"row00".to_vec()]), + }], + 12..12, + vec![], + ), + // Multiple values in a single predicate within specific ranges + ( + vec![InListPredicate { + list: HashSet::from_iter([b"row04".to_vec(), b"row05".to_vec()]), + }], + 0..12, + vec![4..8], + ), + ( + vec![InListPredicate { + list: HashSet::from_iter([b"seg01".to_vec()]), + }], + 0..28, + vec![4..8], + ), + ( + vec![InListPredicate { + list: HashSet::from_iter([b"seg01".to_vec()]), + }], + 6..28, + vec![6..8], + ), + // Values spanning multiple segments + ( + vec![InListPredicate { + list: HashSet::from_iter([b"overl".to_vec()]), + }], + 0..28, + vec![0..8], + ), + ( + vec![InListPredicate { + list: HashSet::from_iter([b"overl".to_vec()]), + }], + 2..28, + vec![2..8], + ), + ( + vec![InListPredicate { + list: HashSet::from_iter([b"overp".to_vec()]), + }], + 0..10, + vec![4..10], + ), + // Duplicate values + ( + vec![InListPredicate { + list: HashSet::from_iter([b"dup".to_vec()]), + }], + 0..12, + vec![], + ), + ( + vec![InListPredicate { + list: HashSet::from_iter([b"dup".to_vec()]), + }], + 0..16, + vec![12..16], + ), + ( + vec![InListPredicate { + list: HashSet::from_iter([b"dup".to_vec()]), + }], + 0..28, + vec![12..28], + ), + // Multiple predicates (AND logic) + ( + vec![ + InListPredicate { + list: HashSet::from_iter([b"row00".to_vec(), b"row01".to_vec()]), + }, + InListPredicate { + list: HashSet::from_iter([b"seg00".to_vec()]), + }, + ], + 0..28, + vec![0..4], + ), + ( + vec![ + InListPredicate { + list: HashSet::from_iter([b"overl".to_vec()]), + }, + InListPredicate { + list: HashSet::from_iter([b"seg01".to_vec()]), + }, + ], + 0..28, + vec![4..8], + ), + ]; + + for (predicates, search_range, expected) in cases { + let result = applier.search(&predicates, &[search_range]).await.unwrap(); + assert_eq!( + result, expected, + "Expected {:?}, got {:?}", + expected, result + ); } } + + #[test] + #[allow(clippy::single_range_in_vec_init)] + fn test_intersect_ranges() { + // empty inputs + assert_eq!(intersect_ranges(&[], &[]), Vec::>::new()); + assert_eq!(intersect_ranges(&[1..5], &[]), Vec::>::new()); + assert_eq!(intersect_ranges(&[], &[1..5]), Vec::>::new()); + + // no overlap + assert_eq!( + intersect_ranges(&[1..3, 5..7], &[3..5, 7..9]), + Vec::>::new() + ); + + // single overlap + assert_eq!(intersect_ranges(&[1..5], &[3..7]), vec![3..5]); + + // multiple overlaps + assert_eq!( + intersect_ranges(&[1..5, 7..10, 12..15], &[2..6, 8..13]), + vec![2..5, 8..10, 12..13] + ); + + // exact overlap + assert_eq!( + intersect_ranges(&[1..3, 5..7], &[1..3, 5..7]), + vec![1..3, 5..7] + ); + + // contained ranges + assert_eq!( + intersect_ranges(&[1..10], &[2..4, 5..7, 8..9]), + vec![2..4, 5..7, 8..9] + ); + + // partial overlaps + assert_eq!( + intersect_ranges(&[1..4, 6..9], &[2..7, 8..10]), + vec![2..4, 6..7, 8..9] + ); + + // single point overlap + assert_eq!( + intersect_ranges(&[1..3], &[3..5]), + Vec::>::new() + ); + + // large ranges + assert_eq!(intersect_ranges(&[0..100], &[50..150]), vec![50..100]); + } } diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index f7597773a1..afd5cc16cd 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use common_base::range_read::RangeReader; use common_telemetry::warn; -use index::bloom_filter::applier::BloomFilterApplier; +use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate}; use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; @@ -40,7 +40,6 @@ use crate::error::{ use crate::metrics::INDEX_APPLY_ELAPSED; use crate::sst::file::FileId; pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder; -use crate::sst::index::bloom_filter::applier::builder::Predicate; use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; use crate::sst::index::TYPE_BLOOM_FILTER_INDEX; @@ -71,17 +70,20 @@ pub struct BloomFilterIndexApplier { bloom_filter_index_cache: Option, /// Bloom filter predicates. - filters: HashMap>, + /// For each column, the value will be retained only if it contains __all__ predicates. + predicates: HashMap>, } impl BloomFilterIndexApplier { /// Creates a new `BloomFilterIndexApplier`. + /// + /// For each column, the value will be retained only if it contains __all__ predicates. pub fn new( region_dir: String, region_id: RegionId, object_store: ObjectStore, puffin_manager_factory: PuffinManagerFactory, - filters: HashMap>, + predicates: HashMap>, ) -> Self { Self { region_dir, @@ -91,7 +93,7 @@ impl BloomFilterIndexApplier { puffin_manager_factory, puffin_metadata_cache: None, bloom_filter_index_cache: None, - filters, + predicates, } } @@ -148,7 +150,7 @@ impl BloomFilterIndexApplier { .map(|(i, range)| (*i, vec![range.clone()])) .collect::>(); - for (column_id, predicates) in &self.filters { + for (column_id, predicates) in &self.predicates { let blob = match self .blob_reader(file_id, *column_id, file_size_hint) .await? @@ -167,12 +169,12 @@ impl BloomFilterIndexApplier { BloomFilterReaderImpl::new(blob), bloom_filter_cache.clone(), ); - self.apply_filters(reader, predicates, &input, &mut output) + self.apply_predicates(reader, predicates, &mut output) .await .context(ApplyBloomFilterIndexSnafu)?; } else { let reader = BloomFilterReaderImpl::new(blob); - self.apply_filters(reader, predicates, &input, &mut output) + self.apply_predicates(reader, predicates, &mut output) .await .context(ApplyBloomFilterIndexSnafu)?; } @@ -298,74 +300,27 @@ impl BloomFilterIndexApplier { .context(PuffinBuildReaderSnafu) } - async fn apply_filters( + async fn apply_predicates( &self, reader: R, - predicates: &[Predicate], - input: &[(usize, Range)], + predicates: &[InListPredicate], output: &mut [(usize, Vec>)], ) -> std::result::Result<(), index::bloom_filter::error::Error> { let mut applier = BloomFilterApplier::new(Box::new(reader)).await?; - for ((_, r), (_, output)) in input.iter().zip(output.iter_mut()) { + for (_, output) in output.iter_mut() { // All rows are filtered out, skip the search if output.is_empty() { continue; } - for predicate in predicates { - match predicate { - Predicate::InList(in_list) => { - let res = applier.search(&in_list.list, r.clone()).await?; - if res.is_empty() { - output.clear(); - break; - } - - *output = intersect_ranges(output, &res); - if output.is_empty() { - break; - } - } - } - } + *output = applier.search(predicates, output).await?; } Ok(()) } } -/// Intersects two lists of ranges and returns the intersection. -/// -/// The input lists are assumed to be sorted and non-overlapping. -fn intersect_ranges(lhs: &[Range], rhs: &[Range]) -> Vec> { - let mut i = 0; - let mut j = 0; - - let mut output = Vec::new(); - while i < lhs.len() && j < rhs.len() { - let r1 = &lhs[i]; - let r2 = &rhs[j]; - - // Find intersection if exists - let start = r1.start.max(r2.start); - let end = r1.end.min(r2.end); - - if start < end { - output.push(start..end); - } - - // Move forward the range that ends first - if r1.end < r2.end { - i += 1; - } else { - j += 1; - } - } - - output -} - fn is_blob_not_found(err: &Error) -> bool { matches!( err, @@ -523,55 +478,4 @@ mod tests { .await; assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]); } - - #[test] - #[allow(clippy::single_range_in_vec_init)] - fn test_intersect_ranges() { - // empty inputs - assert_eq!(intersect_ranges(&[], &[]), Vec::>::new()); - assert_eq!(intersect_ranges(&[1..5], &[]), Vec::>::new()); - assert_eq!(intersect_ranges(&[], &[1..5]), Vec::>::new()); - - // no overlap - assert_eq!( - intersect_ranges(&[1..3, 5..7], &[3..5, 7..9]), - Vec::>::new() - ); - - // single overlap - assert_eq!(intersect_ranges(&[1..5], &[3..7]), vec![3..5]); - - // multiple overlaps - assert_eq!( - intersect_ranges(&[1..5, 7..10, 12..15], &[2..6, 8..13]), - vec![2..5, 8..10, 12..13] - ); - - // exact overlap - assert_eq!( - intersect_ranges(&[1..3, 5..7], &[1..3, 5..7]), - vec![1..3, 5..7] - ); - - // contained ranges - assert_eq!( - intersect_ranges(&[1..10], &[2..4, 5..7, 8..9]), - vec![2..4, 5..7, 8..9] - ); - - // partial overlaps - assert_eq!( - intersect_ranges(&[1..4, 6..9], &[2..7, 8..10]), - vec![2..4, 6..7, 8..9] - ); - - // single point overlap - assert_eq!( - intersect_ranges(&[1..3], &[3..5]), - Vec::>::new() - ); - - // large ranges - assert_eq!(intersect_ranges(&[0..100], &[50..150]), vec![50..100]); - } } diff --git a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs index 956c5ce38e..fa58d7976e 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs @@ -20,6 +20,7 @@ use datafusion_expr::expr::InList; use datafusion_expr::{BinaryExpr, Expr, Operator}; use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; +use index::bloom_filter::applier::InListPredicate; use index::Bytes; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; @@ -35,21 +36,6 @@ use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier; use crate::sst::index::codec::IndexValueCodec; use crate::sst::index::puffin_manager::PuffinManagerFactory; -/// Enumerates types of predicates for value filtering. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Predicate { - /// Predicate for matching values in a list. - InList(InListPredicate), -} - -/// `InListPredicate` contains a list of acceptable values. A value needs to match at least -/// one of the elements (logical OR semantic) for the predicate to be satisfied. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct InListPredicate { - /// List of acceptable values. - pub list: HashSet, -} - pub struct BloomFilterIndexApplierBuilder<'a> { region_dir: String, object_store: ObjectStore, @@ -58,7 +44,7 @@ pub struct BloomFilterIndexApplierBuilder<'a> { file_cache: Option, puffin_metadata_cache: Option, bloom_filter_index_cache: Option, - output: HashMap>, + predicates: HashMap>, } impl<'a> BloomFilterIndexApplierBuilder<'a> { @@ -76,7 +62,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { file_cache: None, puffin_metadata_cache: None, bloom_filter_index_cache: None, - output: HashMap::default(), + predicates: HashMap::default(), } } @@ -107,7 +93,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { self.traverse_and_collect(expr); } - if self.output.is_empty() { + if self.predicates.is_empty() { return Ok(None); } @@ -116,7 +102,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { self.metadata.region_id, self.object_store, self.puffin_manager_factory, - self.output, + self.predicates, ) .with_file_cache(self.file_cache) .with_puffin_metadata_cache(self.puffin_metadata_cache) @@ -178,14 +164,12 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { return Ok(()); }; let value = encode_lit(lit, data_type)?; - - // Create bloom filter predicate - let mut set = HashSet::new(); - set.insert(value); - let predicate = Predicate::InList(InListPredicate { list: set }); - - // Add to output predicates - self.output.entry(column_id).or_default().push(predicate); + self.predicates + .entry(column_id) + .or_default() + .push(InListPredicate { + list: HashSet::from([value]), + }); Ok(()) } @@ -223,12 +207,12 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { } if !valid_predicates.is_empty() { - self.output + self.predicates .entry(column_id) .or_default() - .push(Predicate::InList(InListPredicate { + .push(InListPredicate { list: valid_predicates, - })); + }); } Ok(()) @@ -245,7 +229,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { // TODO(ruihang): extract this and the one under inverted_index into a common util mod. /// Helper function to encode a literal into bytes. -fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result> { +fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result { let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?; let mut bytes = vec![]; let field = SortField::new(data_type); @@ -323,20 +307,18 @@ mod tests { &metadata, factory, ); - let exprs = vec![Expr::BinaryExpr(BinaryExpr { left: Box::new(column("column1")), op: Operator::Eq, right: Box::new(string_lit("value1")), })]; - let result = builder.build(&exprs).unwrap(); assert!(result.is_some()); - let filters = result.unwrap().filters; - assert_eq!(filters.len(), 1); + let predicates = result.unwrap().predicates; + assert_eq!(predicates.len(), 1); - let column_predicates = filters.get(&1).unwrap(); + let column_predicates = predicates.get(&1).unwrap(); assert_eq!(column_predicates.len(), 1); let expected = encode_lit( @@ -344,11 +326,7 @@ mod tests { ConcreteDataType::string_datatype(), ) .unwrap(); - match &column_predicates[0] { - Predicate::InList(p) => { - assert_eq!(p.list.iter().next().unwrap(), &expected); - } - } + assert_eq!(column_predicates[0].list, HashSet::from([expected])); } fn int64_lit(i: i64) -> Expr { @@ -375,15 +353,10 @@ mod tests { let result = builder.build(&exprs).unwrap(); assert!(result.is_some()); - let filters = result.unwrap().filters; - let column_predicates = filters.get(&2).unwrap(); + let predicates = result.unwrap().predicates; + let column_predicates = predicates.get(&2).unwrap(); assert_eq!(column_predicates.len(), 1); - - match &column_predicates[0] { - Predicate::InList(p) => { - assert_eq!(p.list.len(), 3); - } - } + assert_eq!(column_predicates[0].list.len(), 3); } #[test] @@ -396,7 +369,6 @@ mod tests { &metadata, factory, ); - let exprs = vec![Expr::BinaryExpr(BinaryExpr { left: Box::new(Expr::BinaryExpr(BinaryExpr { left: Box::new(column("column1")), @@ -410,14 +382,13 @@ mod tests { right: Box::new(int64_lit(42)), })), })]; - let result = builder.build(&exprs).unwrap(); assert!(result.is_some()); - let filters = result.unwrap().filters; - assert_eq!(filters.len(), 2); - assert!(filters.contains_key(&1)); - assert!(filters.contains_key(&2)); + let predicates = result.unwrap().predicates; + assert_eq!(predicates.len(), 2); + assert!(predicates.contains_key(&1)); + assert!(predicates.contains_key(&2)); } #[test] @@ -451,14 +422,10 @@ mod tests { let result = builder.build(&exprs).unwrap(); assert!(result.is_some()); - let filters = result.unwrap().filters; - assert!(!filters.contains_key(&1)); // Null equality should be ignored - let column2_predicates = filters.get(&2).unwrap(); - match &column2_predicates[0] { - Predicate::InList(p) => { - assert_eq!(p.list.len(), 2); // Only non-null values should be included - } - } + let predicates = result.unwrap().predicates; + assert!(!predicates.contains_key(&1)); // Null equality should be ignored + let column2_predicates = predicates.get(&2).unwrap(); + assert_eq!(column2_predicates[0].list.len(), 2); } #[test] @@ -471,7 +438,6 @@ mod tests { &metadata, factory, ); - let exprs = vec![ // Non-equality operator Expr::BinaryExpr(BinaryExpr { @@ -507,7 +473,6 @@ mod tests { &metadata, factory, ); - let exprs = vec![ Expr::BinaryExpr(BinaryExpr { left: Box::new(column("column1")), @@ -524,8 +489,8 @@ mod tests { let result = builder.build(&exprs).unwrap(); assert!(result.is_some()); - let filters = result.unwrap().filters; - let column_predicates = filters.get(&1).unwrap(); + let predicates = result.unwrap().predicates; + let column_predicates = predicates.get(&1).unwrap(); assert_eq!(column_predicates.len(), 2); } }