Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-04 12:22:55 +00:00
refactor: allow bloom filter search to apply AND conjunction (#5770)
* refactor: change bloom filter search from any to all match
* polish
* place back in list
* nit

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
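To illustrate the new matching rule (a minimal sketch, not code from the patch): within a single `InListPredicate` the probes are combined with OR, while the predicates themselves are combined with AND. The sketch uses a `HashSet` as a stand-in for one bloom filter segment; in the real code the membership check is `fastbloom::BloomFilter::contains`, which may report false positives, so the search over-approximates but never misses a match.

use std::collections::HashSet;

// Returns true if the segment can satisfy every predicate.
// AND across predicates, OR within one predicate's probe list.
fn segment_matches(segment: &HashSet<Vec<u8>>, predicates: &[HashSet<Vec<u8>>]) -> bool {
    predicates.iter().all(|predicate| {
        predicate.iter().any(|probe| segment.contains(probe))
    })
}

fn main() {
    let segment: HashSet<Vec<u8>> = [b"a".to_vec(), b"b".to_vec()].into_iter().collect();
    let p1: HashSet<Vec<u8>> = [b"a".to_vec(), b"x".to_vec()].into_iter().collect();
    let p2: HashSet<Vec<u8>> = [b"b".to_vec()].into_iter().collect();
    let p3: HashSet<Vec<u8>> = [b"z".to_vec()].into_iter().collect();
    assert!(segment_matches(&segment, &[p1.clone(), p2])); // (a OR x) AND (b)
    assert!(!segment_matches(&segment, &[p1, p3])); // (a OR x) AND (z) fails
}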
@@ -15,6 +15,7 @@
use std::collections::HashSet;
use std::ops::Range;

use fastbloom::BloomFilter;
use greptime_proto::v1::index::BloomFilterMeta;
use itertools::Itertools;

@@ -22,6 +23,14 @@ use crate::bloom_filter::error::Result;
use crate::bloom_filter::reader::BloomFilterReader;
use crate::Bytes;

/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InListPredicate {
    /// List of acceptable values.
    pub list: HashSet<Bytes>,
}

pub struct BloomFilterApplier {
    reader: Box<dyn BloomFilterReader + Send>,
    meta: BloomFilterMeta,
@@ -34,66 +43,161 @@ impl BloomFilterApplier {
        Ok(Self { reader, meta })
    }

    /// Searches ranges of rows that match the given probes in the given search range.
    /// Searches ranges of rows that match all the given predicates in the search ranges.
    /// Each predicate represents an OR condition of probes, and all predicates must match (AND semantics).
    /// The logic is: (probe1 OR probe2 OR ...) AND (probe3 OR probe4 OR ...)
    pub async fn search(
        &mut self,
        probes: &HashSet<Bytes>,
        search_range: Range<usize>,
        predicates: &[InListPredicate],
        search_ranges: &[Range<usize>],
    ) -> Result<Vec<Range<usize>>> {
        let rows_per_segment = self.meta.rows_per_segment as usize;
        let start_seg = search_range.start / rows_per_segment;
        let mut end_seg = search_range.end.div_ceil(rows_per_segment);

        if end_seg == self.meta.segment_loc_indices.len() + 1 {
            // In a previous version, there was a bug where if the last segment was all null,
            // this segment would not be written into the index. This caused the slice
            // `self.meta.segment_loc_indices[start_seg..end_seg]` to go out of bounds due to
            // the missing segment. Since the `search` function does not search for nulls,
            // we can simply ignore the last segment in this buggy scenario.
            end_seg -= 1;
        if predicates.is_empty() {
            // If no predicates, return empty result
            return Ok(Vec::new());
        }

        let locs = &self.meta.segment_loc_indices[start_seg..end_seg];
        let segments = self.row_ranges_to_segments(search_ranges);
        let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?;
        let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates);
        Ok(intersect_ranges(search_ranges, &matching_row_ranges))
    }

        // dedup locs
        let deduped_locs = locs
    /// Converts row ranges to segment ranges and returns unique segments
    fn row_ranges_to_segments(&self, row_ranges: &[Range<usize>]) -> Vec<usize> {
        let rows_per_segment = self.meta.rows_per_segment as usize;

        let mut segments = vec![];
        for range in row_ranges {
            let start_seg = range.start / rows_per_segment;
            let mut end_seg = range.end.div_ceil(rows_per_segment);

            if end_seg == self.meta.segment_loc_indices.len() + 1 {
                // Handle legacy bug with missing last segment
                //
                // In a previous version, there was a bug where if the last segment was all null,
                // this segment would not be written into the index. This caused the slice
                // `self.meta.segment_loc_indices[start_seg..end_seg]` to go out of bounds due to
                // the missing segment. Since the `search` function does not search for nulls,
                // we can simply ignore the last segment in this buggy scenario.
                end_seg -= 1;
            }
            segments.extend(start_seg..end_seg);
        }

        // Ensure segments are unique and sorted
        segments.sort_unstable();
        segments.dedup();

        segments
    }

    /// Loads bloom filters for the given segments and returns the segment locations and bloom filters
    async fn load_bloom_filters(
        &mut self,
        segments: &[usize],
    ) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> {
        let segment_locations = segments
            .iter()
            .dedup()
            .map(|i| self.meta.bloom_filter_locs[*i as usize])
            .map(|&seg| (self.meta.segment_loc_indices[seg], seg))
            .collect::<Vec<_>>();
        let bfs = self.reader.bloom_filter_vec(&deduped_locs).await?;

        let mut ranges: Vec<Range<usize>> = Vec::with_capacity(bfs.len());
        for ((_, mut group), bloom) in locs
        let bloom_filter_locs = segment_locations
            .iter()
            .zip(start_seg..end_seg)
            .chunk_by(|(x, _)| **x)
            .map(|(loc, _)| *loc)
            .dedup()
            .map(|i| self.meta.bloom_filter_locs[i as usize])
            .collect::<Vec<_>>();

        let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?;

        Ok((segment_locations, bloom_filters))
    }

    /// Finds segments that match all predicates and converts them to row ranges
    fn find_matching_rows(
        &self,
        segment_locations: Vec<(u64, usize)>,
        bloom_filters: Vec<BloomFilter>,
        predicates: &[InListPredicate],
    ) -> Vec<Range<usize>> {
        let rows_per_segment = self.meta.rows_per_segment as usize;
        let mut matching_row_ranges = Vec::with_capacity(bloom_filters.len());

        // Group segments by their location index (since they have the same bloom filter) and check if they match all predicates
        for ((_loc_index, group), bloom_filter) in segment_locations
            .into_iter()
            .zip(bfs.iter())
            .chunk_by(|(loc, _)| *loc)
            .into_iter()
            .zip(bloom_filters.iter())
        {
            let start = group.next().unwrap().1 * rows_per_segment; // SAFETY: group is not empty
            let end = group.last().map_or(start + rows_per_segment, |(_, end)| {
                (end + 1) * rows_per_segment
            // Check if this bloom filter matches each predicate (AND semantics)
            let matches_all_predicates = predicates.iter().all(|predicate| {
                // For each predicate, at least one probe must match (OR semantics)
                predicate
                    .list
                    .iter()
                    .any(|probe| bloom_filter.contains(probe))
            });
            let actual_start = start.max(search_range.start);
            let actual_end = end.min(search_range.end);
            for probe in probes {
                if bloom.contains(probe) {
                    match ranges.last_mut() {
                        Some(last) if last.end == actual_start => {
                            last.end = actual_end;
                        }
                        _ => {
                            ranges.push(actual_start..actual_end);
                        }
                    }
                    break;
                }

            if !matches_all_predicates {
                continue;
            }

            // For each matching segment, convert to row range
            for (_, segment) in group {
                let start_row = segment * rows_per_segment;
                let end_row = (segment + 1) * rows_per_segment;
                matching_row_ranges.push(start_row..end_row);
            }
        }

        Ok(ranges)
        self.merge_adjacent_ranges(matching_row_ranges)
    }

    /// Merges adjacent row ranges to reduce the number of ranges
    fn merge_adjacent_ranges(&self, ranges: Vec<Range<usize>>) -> Vec<Range<usize>> {
        ranges
            .into_iter()
            .coalesce(|prev, next| {
                if prev.end == next.start {
                    Ok(prev.start..next.end)
                } else {
                    Err((prev, next))
                }
            })
            .collect::<Vec<_>>()
    }
}

/// Intersects two lists of ranges and returns the intersection.
///
/// The input lists are assumed to be sorted and non-overlapping.
fn intersect_ranges(lhs: &[Range<usize>], rhs: &[Range<usize>]) -> Vec<Range<usize>> {
    let mut i = 0;
    let mut j = 0;

    let mut output = Vec::new();
    while i < lhs.len() && j < rhs.len() {
        let r1 = &lhs[i];
        let r2 = &rhs[j];

        // Find intersection if exists
        let start = r1.start.max(r2.start);
        let end = r1.end.min(r2.end);

        if start < end {
            output.push(start..end);
        }

        // Move forward the range that ends first
        if r1.end < r2.end {
            i += 1;
        } else {
            j += 1;
        }
    }

    output
}

#[cfg(test)]
@@ -158,37 +262,6 @@ mod tests {
            vec![b"dup".to_vec()],
        ];

        let cases = vec![
            (vec![b"row00".to_vec()], 0..28, vec![0..4]), // search one row in full range
            (vec![b"row05".to_vec()], 4..8, vec![4..8]), // search one row in partial range
            (vec![b"row03".to_vec()], 4..8, vec![]), // search for a row that doesn't exist in the partial range
            (
                vec![b"row01".to_vec(), b"row06".to_vec()],
                0..28,
                vec![0..8],
            ), // search multiple rows in multiple ranges
            (
                vec![b"row01".to_vec(), b"row11".to_vec()],
                0..28,
                vec![0..4, 8..12],
            ), // search multiple rows in multiple ranges
            (vec![b"row99".to_vec()], 0..28, vec![]), // search for a row that doesn't exist in the full range
            (vec![b"row00".to_vec()], 12..12, vec![]), // search in an empty range
            (
                vec![b"row04".to_vec(), b"row05".to_vec()],
                0..12,
                vec![4..8],
            ), // search multiple rows in same segment
            (vec![b"seg01".to_vec()], 0..28, vec![4..8]), // search rows in a segment
            (vec![b"seg01".to_vec()], 6..28, vec![6..8]), // search rows in a segment in partial range
            (vec![b"overl".to_vec()], 0..28, vec![0..8]), // search rows in multiple segments
            (vec![b"overl".to_vec()], 2..28, vec![2..8]), // search range starts from the middle of a segment
            (vec![b"overp".to_vec()], 0..10, vec![4..10]), // search range ends at the middle of a segment
            (vec![b"dup".to_vec()], 0..12, vec![]), // search for a duplicate row not in the range
            (vec![b"dup".to_vec()], 0..16, vec![12..16]), // search for a duplicate row in the range
            (vec![b"dup".to_vec()], 0..28, vec![12..28]), // search for a duplicate row in the full range
        ];

        for row in rows {
            creator.push_row_elems(row).await.unwrap();
        }
@@ -196,15 +269,215 @@ mod tests {
        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();

        let reader = BloomFilterReaderImpl::new(bytes);

        let mut applier = BloomFilterApplier::new(Box::new(reader)).await.unwrap();

        for (probes, search_range, expected) in cases {
            let probes: HashSet<Bytes> = probes.into_iter().collect();
            let ranges = applier.search(&probes, search_range).await.unwrap();
            assert_eq!(ranges, expected);
        // Test cases for predicates
        let cases = vec![
            // Single value predicates
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row00".to_vec()]),
                }],
                0..28,
                vec![0..4],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row05".to_vec()]),
                }],
                4..8,
                vec![4..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row03".to_vec()]),
                }],
                4..8,
                vec![],
            ),
            // Multiple values in a single predicate (OR logic)
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"overl".to_vec(), b"row06".to_vec()]),
                }],
                0..28,
                vec![0..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"seg01".to_vec(), b"overp".to_vec()]),
                }],
                0..28,
                vec![4..12],
            ),
            // Non-existent values
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row99".to_vec()]),
                }],
                0..28,
                vec![],
            ),
            // Empty range
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row00".to_vec()]),
                }],
                12..12,
                vec![],
            ),
            // Multiple values in a single predicate within specific ranges
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row04".to_vec(), b"row05".to_vec()]),
                }],
                0..12,
                vec![4..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"seg01".to_vec()]),
                }],
                0..28,
                vec![4..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"seg01".to_vec()]),
                }],
                6..28,
                vec![6..8],
            ),
            // Values spanning multiple segments
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"overl".to_vec()]),
                }],
                0..28,
                vec![0..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"overl".to_vec()]),
                }],
                2..28,
                vec![2..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"overp".to_vec()]),
                }],
                0..10,
                vec![4..10],
            ),
            // Duplicate values
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"dup".to_vec()]),
                }],
                0..12,
                vec![],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"dup".to_vec()]),
                }],
                0..16,
                vec![12..16],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"dup".to_vec()]),
                }],
                0..28,
                vec![12..28],
            ),
            // Multiple predicates (AND logic)
            (
                vec![
                    InListPredicate {
                        list: HashSet::from_iter([b"row00".to_vec(), b"row01".to_vec()]),
                    },
                    InListPredicate {
                        list: HashSet::from_iter([b"seg00".to_vec()]),
                    },
                ],
                0..28,
                vec![0..4],
            ),
            (
                vec![
                    InListPredicate {
                        list: HashSet::from_iter([b"overl".to_vec()]),
                    },
                    InListPredicate {
                        list: HashSet::from_iter([b"seg01".to_vec()]),
                    },
                ],
                0..28,
                vec![4..8],
            ),
        ];

        for (predicates, search_range, expected) in cases {
            let result = applier.search(&predicates, &[search_range]).await.unwrap();
            assert_eq!(
                result, expected,
                "Expected {:?}, got {:?}",
                expected, result
            );
        }
    }

    #[test]
    #[allow(clippy::single_range_in_vec_init)]
    fn test_intersect_ranges() {
        // empty inputs
        assert_eq!(intersect_ranges(&[], &[]), Vec::<Range<usize>>::new());
        assert_eq!(intersect_ranges(&[1..5], &[]), Vec::<Range<usize>>::new());
        assert_eq!(intersect_ranges(&[], &[1..5]), Vec::<Range<usize>>::new());

        // no overlap
        assert_eq!(
            intersect_ranges(&[1..3, 5..7], &[3..5, 7..9]),
            Vec::<Range<usize>>::new()
        );

        // single overlap
        assert_eq!(intersect_ranges(&[1..5], &[3..7]), vec![3..5]);

        // multiple overlaps
        assert_eq!(
            intersect_ranges(&[1..5, 7..10, 12..15], &[2..6, 8..13]),
            vec![2..5, 8..10, 12..13]
        );

        // exact overlap
        assert_eq!(
            intersect_ranges(&[1..3, 5..7], &[1..3, 5..7]),
            vec![1..3, 5..7]
        );

        // contained ranges
        assert_eq!(
            intersect_ranges(&[1..10], &[2..4, 5..7, 8..9]),
            vec![2..4, 5..7, 8..9]
        );

        // partial overlaps
        assert_eq!(
            intersect_ranges(&[1..4, 6..9], &[2..7, 8..10]),
            vec![2..4, 6..7, 8..9]
        );

        // single point overlap
        assert_eq!(
            intersect_ranges(&[1..3], &[3..5]),
            Vec::<Range<usize>>::new()
        );

        // large ranges
        assert_eq!(intersect_ranges(&[0..100], &[50..150]), vec![50..100]);
    }
}
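A caller-side sketch of the new signature, modeled on the test above. The reader construction is elided, and the `index::...` paths mirror the imports used by the mito2 applier below; treat the exact wiring (in particular the argument to `new`) as an assumption rather than the definitive API.

use std::collections::HashSet;
use std::ops::Range;

use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::BloomFilterReader;

// `reader` is assumed to be built elsewhere, e.g. a `BloomFilterReaderImpl`
// over the serialized index bytes, as in the test above.
async fn example(reader: Box<dyn BloomFilterReader + Send>) -> Result<Vec<Range<usize>>> {
    let mut applier = BloomFilterApplier::new(reader).await?;

    // (value IN ("row00", "row01")) AND (value IN ("seg00"))
    let predicates = vec![
        InListPredicate {
            list: HashSet::from_iter([b"row00".to_vec(), b"row01".to_vec()]),
        },
        InListPredicate {
            list: HashSet::from_iter([b"seg00".to_vec()]),
        },
    ];

    // Only rows inside the given search ranges are considered; the matching
    // segment ranges are intersected with `0..28` before being returned.
    applier.search(&predicates, &[0..28]).await
}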

@@ -20,7 +20,7 @@ use std::sync::Arc;

use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::BloomFilterApplier;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
@@ -40,7 +40,6 @@ use crate::error::{
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::FileId;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
use crate::sst::index::bloom_filter::applier::builder::Predicate;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
@@ -71,17 +70,20 @@ pub struct BloomFilterIndexApplier {
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Bloom filter predicates.
    filters: HashMap<ColumnId, Vec<Predicate>>,
    /// For each column, the value will be retained only if it contains __all__ predicates.
    predicates: HashMap<ColumnId, Vec<InListPredicate>>,
}

impl BloomFilterIndexApplier {
    /// Creates a new `BloomFilterIndexApplier`.
    ///
    /// For each column, the value will be retained only if it contains __all__ predicates.
    pub fn new(
        region_dir: String,
        region_id: RegionId,
        object_store: ObjectStore,
        puffin_manager_factory: PuffinManagerFactory,
        filters: HashMap<ColumnId, Vec<Predicate>>,
        predicates: HashMap<ColumnId, Vec<InListPredicate>>,
    ) -> Self {
        Self {
            region_dir,
@@ -91,7 +93,7 @@ impl BloomFilterIndexApplier {
            puffin_manager_factory,
            puffin_metadata_cache: None,
            bloom_filter_index_cache: None,
            filters,
            predicates,
        }
    }

@@ -148,7 +150,7 @@ impl BloomFilterIndexApplier {
            .map(|(i, range)| (*i, vec![range.clone()]))
            .collect::<Vec<_>>();

        for (column_id, predicates) in &self.filters {
        for (column_id, predicates) in &self.predicates {
            let blob = match self
                .blob_reader(file_id, *column_id, file_size_hint)
                .await?
@@ -167,12 +169,12 @@ impl BloomFilterIndexApplier {
                    BloomFilterReaderImpl::new(blob),
                    bloom_filter_cache.clone(),
                );
                self.apply_filters(reader, predicates, &input, &mut output)
                self.apply_predicates(reader, predicates, &mut output)
                    .await
                    .context(ApplyBloomFilterIndexSnafu)?;
            } else {
                let reader = BloomFilterReaderImpl::new(blob);
                self.apply_filters(reader, predicates, &input, &mut output)
                self.apply_predicates(reader, predicates, &mut output)
                    .await
                    .context(ApplyBloomFilterIndexSnafu)?;
            }
@@ -298,74 +300,27 @@ impl BloomFilterIndexApplier {
            .context(PuffinBuildReaderSnafu)
    }

    async fn apply_filters<R: BloomFilterReader + Send + 'static>(
    async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
        &self,
        reader: R,
        predicates: &[Predicate],
        input: &[(usize, Range<usize>)],
        predicates: &[InListPredicate],
        output: &mut [(usize, Vec<Range<usize>>)],
    ) -> std::result::Result<(), index::bloom_filter::error::Error> {
        let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;

        for ((_, r), (_, output)) in input.iter().zip(output.iter_mut()) {
        for (_, output) in output.iter_mut() {
            // All rows are filtered out, skip the search
            if output.is_empty() {
                continue;
            }

            for predicate in predicates {
                match predicate {
                    Predicate::InList(in_list) => {
                        let res = applier.search(&in_list.list, r.clone()).await?;
                        if res.is_empty() {
                            output.clear();
                            break;
                        }

                        *output = intersect_ranges(output, &res);
                        if output.is_empty() {
                            break;
                        }
                    }
                }
            }
            *output = applier.search(predicates, output).await?;
        }

        Ok(())
    }
}

/// Intersects two lists of ranges and returns the intersection.
///
/// The input lists are assumed to be sorted and non-overlapping.
fn intersect_ranges(lhs: &[Range<usize>], rhs: &[Range<usize>]) -> Vec<Range<usize>> {
    let mut i = 0;
    let mut j = 0;

    let mut output = Vec::new();
    while i < lhs.len() && j < rhs.len() {
        let r1 = &lhs[i];
        let r2 = &rhs[j];

        // Find intersection if exists
        let start = r1.start.max(r2.start);
        let end = r1.end.min(r2.end);

        if start < end {
            output.push(start..end);
        }

        // Move forward the range that ends first
        if r1.end < r2.end {
            i += 1;
        } else {
            j += 1;
        }
    }

    output
}

fn is_blob_not_found(err: &Error) -> bool {
    matches!(
        err,
@@ -523,55 +478,4 @@ mod tests {
            .await;
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }

    #[test]
    #[allow(clippy::single_range_in_vec_init)]
    fn test_intersect_ranges() {
        // empty inputs
        assert_eq!(intersect_ranges(&[], &[]), Vec::<Range<usize>>::new());
        assert_eq!(intersect_ranges(&[1..5], &[]), Vec::<Range<usize>>::new());
        assert_eq!(intersect_ranges(&[], &[1..5]), Vec::<Range<usize>>::new());

        // no overlap
        assert_eq!(
            intersect_ranges(&[1..3, 5..7], &[3..5, 7..9]),
            Vec::<Range<usize>>::new()
        );

        // single overlap
        assert_eq!(intersect_ranges(&[1..5], &[3..7]), vec![3..5]);

        // multiple overlaps
        assert_eq!(
            intersect_ranges(&[1..5, 7..10, 12..15], &[2..6, 8..13]),
            vec![2..5, 8..10, 12..13]
        );

        // exact overlap
        assert_eq!(
            intersect_ranges(&[1..3, 5..7], &[1..3, 5..7]),
            vec![1..3, 5..7]
        );

        // contained ranges
        assert_eq!(
            intersect_ranges(&[1..10], &[2..4, 5..7, 8..9]),
            vec![2..4, 5..7, 8..9]
        );

        // partial overlaps
        assert_eq!(
            intersect_ranges(&[1..4, 6..9], &[2..7, 8..10]),
            vec![2..4, 6..7, 8..9]
        );

        // single point overlap
        assert_eq!(
            intersect_ranges(&[1..3], &[3..5]),
            Vec::<Range<usize>>::new()
        );

        // large ranges
        assert_eq!(intersect_ranges(&[0..100], &[50..150]), vec![50..100]);
    }
}

@@ -20,6 +20,7 @@ use datafusion_expr::expr::InList;
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::bloom_filter::applier::InListPredicate;
use index::Bytes;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
@@ -35,21 +36,6 @@ use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::PuffinManagerFactory;

/// Enumerates types of predicates for value filtering.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    /// Predicate for matching values in a list.
    InList(InListPredicate),
}

/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InListPredicate {
    /// List of acceptable values.
    pub list: HashSet<Bytes>,
}

pub struct BloomFilterIndexApplierBuilder<'a> {
    region_dir: String,
    object_store: ObjectStore,
@@ -58,7 +44,7 @@ pub struct BloomFilterIndexApplierBuilder<'a> {
    file_cache: Option<FileCacheRef>,
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    output: HashMap<ColumnId, Vec<Predicate>>,
    predicates: HashMap<ColumnId, Vec<InListPredicate>>,
}

impl<'a> BloomFilterIndexApplierBuilder<'a> {
@@ -76,7 +62,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
            file_cache: None,
            puffin_metadata_cache: None,
            bloom_filter_index_cache: None,
            output: HashMap::default(),
            predicates: HashMap::default(),
        }
    }

@@ -107,7 +93,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
            self.traverse_and_collect(expr);
        }

        if self.output.is_empty() {
        if self.predicates.is_empty() {
            return Ok(None);
        }

@@ -116,7 +102,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
            self.metadata.region_id,
            self.object_store,
            self.puffin_manager_factory,
            self.output,
            self.predicates,
        )
        .with_file_cache(self.file_cache)
        .with_puffin_metadata_cache(self.puffin_metadata_cache)
@@ -178,14 +164,12 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
            return Ok(());
        };
        let value = encode_lit(lit, data_type)?;

        // Create bloom filter predicate
        let mut set = HashSet::new();
        set.insert(value);
        let predicate = Predicate::InList(InListPredicate { list: set });

        // Add to output predicates
        self.output.entry(column_id).or_default().push(predicate);
        self.predicates
            .entry(column_id)
            .or_default()
            .push(InListPredicate {
                list: HashSet::from([value]),
            });

        Ok(())
    }
@@ -223,12 +207,12 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
        }

        if !valid_predicates.is_empty() {
            self.output
            self.predicates
                .entry(column_id)
                .or_default()
                .push(Predicate::InList(InListPredicate {
                .push(InListPredicate {
                    list: valid_predicates,
                }));
                });
        }

        Ok(())
@@ -245,7 +229,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {

// TODO(ruihang): extract this and the one under inverted_index into a common util mod.
/// Helper function to encode a literal into bytes.
fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
    let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
    let mut bytes = vec![];
    let field = SortField::new(data_type);
@@ -323,20 +307,18 @@ mod tests {
            &metadata,
            factory,
        );

        let exprs = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(column("column1")),
            op: Operator::Eq,
            right: Box::new(string_lit("value1")),
        })];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let filters = result.unwrap().filters;
        assert_eq!(filters.len(), 1);
        let predicates = result.unwrap().predicates;
        assert_eq!(predicates.len(), 1);

        let column_predicates = filters.get(&1).unwrap();
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);

        let expected = encode_lit(
@@ -344,11 +326,7 @@ mod tests {
            ConcreteDataType::string_datatype(),
        )
        .unwrap();
        match &column_predicates[0] {
            Predicate::InList(p) => {
                assert_eq!(p.list.iter().next().unwrap(), &expected);
            }
        }
        assert_eq!(column_predicates[0].list, HashSet::from([expected]));
    }

    fn int64_lit(i: i64) -> Expr {
@@ -375,15 +353,10 @@ mod tests {
        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let filters = result.unwrap().filters;
        let column_predicates = filters.get(&2).unwrap();
        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&2).unwrap();
        assert_eq!(column_predicates.len(), 1);

        match &column_predicates[0] {
            Predicate::InList(p) => {
                assert_eq!(p.list.len(), 3);
            }
        }
        assert_eq!(column_predicates[0].list.len(), 3);
    }

    #[test]
@@ -396,7 +369,6 @@ mod tests {
            &metadata,
            factory,
        );

        let exprs = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
@@ -410,14 +382,13 @@
                right: Box::new(int64_lit(42)),
            })),
        })];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let filters = result.unwrap().filters;
        assert_eq!(filters.len(), 2);
        assert!(filters.contains_key(&1));
        assert!(filters.contains_key(&2));
        let predicates = result.unwrap().predicates;
        assert_eq!(predicates.len(), 2);
        assert!(predicates.contains_key(&1));
        assert!(predicates.contains_key(&2));
    }

    #[test]
@@ -451,14 +422,10 @@ mod tests {
        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let filters = result.unwrap().filters;
        assert!(!filters.contains_key(&1)); // Null equality should be ignored
        let column2_predicates = filters.get(&2).unwrap();
        match &column2_predicates[0] {
            Predicate::InList(p) => {
                assert_eq!(p.list.len(), 2); // Only non-null values should be included
            }
        }
        let predicates = result.unwrap().predicates;
        assert!(!predicates.contains_key(&1)); // Null equality should be ignored
        let column2_predicates = predicates.get(&2).unwrap();
        assert_eq!(column2_predicates[0].list.len(), 2);
    }

    #[test]
@@ -471,7 +438,6 @@ mod tests {
            &metadata,
            factory,
        );

        let exprs = vec![
            // Non-equality operator
            Expr::BinaryExpr(BinaryExpr {
@@ -507,7 +473,6 @@ mod tests {
            &metadata,
            factory,
        );

        let exprs = vec![
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
@@ -524,8 +489,8 @@ mod tests {
        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let filters = result.unwrap().filters;
        let column_predicates = filters.get(&1).unwrap();
        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 2);
    }
}
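To show how the builder's per-column map encodes the conjunction (a sketch with made-up values, not code from the patch): a filter like `tag = 'a' AND tag IN ('a', 'b')` produces two `InListPredicate`s under the same column id, and the applier later keeps a row range only if the column can satisfy all of them. The real builder encodes literals with `encode_lit`; plain bytes, local stand-in types, and a hypothetical column id are used here instead.

use std::collections::{HashMap, HashSet};

// Assumptions for the sketch: `ColumnId` is a plain integer id and `Bytes` is `Vec<u8>`.
type ColumnId = u32;
type Bytes = Vec<u8>;

struct InListPredicate {
    list: HashSet<Bytes>,
}

fn main() {
    let mut predicates: HashMap<ColumnId, Vec<InListPredicate>> = HashMap::new();
    let column_id: ColumnId = 1; // hypothetical id for the `tag` column

    // `tag = 'a'` becomes a single-value predicate.
    predicates.entry(column_id).or_default().push(InListPredicate {
        list: HashSet::from([b"a".to_vec()]),
    });

    // `tag IN ('a', 'b')` becomes one predicate carrying the whole OR list.
    predicates.entry(column_id).or_default().push(InListPredicate {
        list: HashSet::from([b"a".to_vec(), b"b".to_vec()]),
    });

    // Two entries for the column: both must match for a range to be retained.
    assert_eq!(predicates[&column_id].len(), 2);
}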