fix(bloom-filter): filter rows with segment precision (#5286)

* fix(bloom-filter): filter rows with segment precision

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* add case

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address TODO

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-01-06 19:45:15 +08:00
committed by GitHub
parent a1cd194d0c
commit 5cf9d7b6ca
12 changed files with 1042 additions and 627 deletions

1
Cargo.lock generated
View File

@@ -5336,7 +5336,6 @@ dependencies = [
"futures",
"greptime-proto",
"mockall",
"parquet",
"pin-project",
"prost 0.12.6",
"rand",

View File

@@ -22,7 +22,6 @@ fst.workspace = true
futures.workspace = true
greptime-proto.workspace = true
mockall.workspace = true
parquet.workspace = true
pin-project.workspace = true
prost.workspace = true
regex.workspace = true

View File

@@ -45,7 +45,7 @@ pub struct BloomFilterMeta {
}
/// The location of the bloom filter segment in the file.
#[derive(Debug, Serialize, Deserialize, Clone, Hash, PartialEq, Eq)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Hash, PartialEq, Eq)]
pub struct BloomFilterSegmentLocation {
/// The offset of the bloom filter segment in the file.
pub offset: u64,

View File

@@ -12,29 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashSet};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::RowGroupMetaData;
use std::collections::HashSet;
use std::ops::Range;
use crate::bloom_filter::error::Result;
use crate::bloom_filter::reader::BloomFilterReader;
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes};
/// Enumerates types of predicates for value filtering.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
/// Predicate for matching values in a list.
InList(InListPredicate),
}
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InListPredicate {
/// List of acceptable values.
pub list: HashSet<Bytes>,
}
use crate::bloom_filter::{BloomFilterMeta, Bytes};
pub struct BloomFilterApplier {
reader: Box<dyn BloomFilterReader + Send>,
@@ -48,86 +31,126 @@ impl BloomFilterApplier {
Ok(Self { reader, meta })
}
/// Searches for matching row groups using bloom filters.
///
/// This method applies bloom filter index to eliminate row groups that definitely
/// don't contain the searched values. It works by:
///
/// 1. Computing prefix sums for row counts
/// 2. Calculating bloom filter segment locations for each row group
/// 1. A row group may span multiple bloom filter segments
/// 3. Probing bloom filter segments
/// 4. Removing non-matching row groups from the basement
/// 1. If a row group doesn't match any bloom filter segment with any probe, it is removed
///
/// # Note
/// The method modifies the `basement` map in-place by removing row groups that
/// don't match the bloom filter criteria.
/// Searches ranges of rows that match the given probes in the given search range.
pub async fn search(
&mut self,
probes: &HashSet<Bytes>,
row_group_metas: &[RowGroupMetaData],
basement: &mut BTreeMap<usize, Option<RowSelection>>,
) -> Result<()> {
// 0. Fast path - if basement is empty return empty vec
if basement.is_empty() {
return Ok(());
}
search_range: Range<usize>,
) -> Result<Vec<Range<usize>>> {
let rows_per_segment = self.meta.rows_per_segment;
let start_seg = search_range.start / rows_per_segment;
let end_seg = search_range.end.div_ceil(rows_per_segment);
// 1. Compute prefix sum for row counts
let mut sum = 0usize;
let mut prefix_sum = Vec::with_capacity(row_group_metas.len() + 1);
prefix_sum.push(0usize);
for meta in row_group_metas {
sum += meta.num_rows() as usize;
prefix_sum.push(sum);
}
let locs = &self.meta.bloom_filter_segments[start_seg..end_seg];
let bfs = self.reader.bloom_filter_vec(locs).await?;
// 2. Calculate bloom filter segment locations
let mut row_groups_to_remove = HashSet::new();
for &row_group_idx in basement.keys() {
// TODO(ruihang): support further filter over row selection
let mut ranges: Vec<Range<usize>> = Vec::with_capacity(end_seg - start_seg);
for (seg_id, bloom) in (start_seg..end_seg).zip(bfs) {
let start = seg_id * rows_per_segment;
for probe in probes {
if bloom.contains(probe) {
let end = (start + rows_per_segment).min(search_range.end);
let start = start.max(search_range.start);
// todo: dedup & overlap
let rows_range_start = prefix_sum[row_group_idx] / self.meta.rows_per_segment;
let rows_range_end = (prefix_sum[row_group_idx + 1] as f64
/ self.meta.rows_per_segment as f64)
.ceil() as usize;
let mut is_any_range_hit = false;
for i in rows_range_start..rows_range_end {
// 3. Probe each bloom filter segment
let loc = BloomFilterSegmentLocation {
offset: self.meta.bloom_filter_segments[i].offset,
size: self.meta.bloom_filter_segments[i].size,
elem_count: self.meta.bloom_filter_segments[i].elem_count,
};
let bloom = self.reader.bloom_filter(&loc).await?;
// Check if any probe exists in bloom filter
let mut matches = false;
for probe in probes {
if bloom.contains(probe) {
matches = true;
break;
match ranges.last_mut() {
Some(last) if last.end == start => {
last.end = end;
}
_ => {
ranges.push(start..end);
}
}
}
is_any_range_hit |= matches;
if matches {
break;
}
}
if !is_any_range_hit {
row_groups_to_remove.insert(row_group_idx);
}
}
// 4. Remove row groups that do not match any bloom filter segment
for row_group_idx in row_groups_to_remove {
basement.remove(&row_group_idx);
}
Ok(())
Ok(ranges)
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use futures::io::Cursor;
use super::*;
use crate::bloom_filter::creator::BloomFilterCreator;
use crate::bloom_filter::reader::BloomFilterReaderImpl;
use crate::external_provider::MockExternalTempFileProvider;
#[tokio::test]
#[allow(clippy::single_range_in_vec_init)]
async fn test_appliter() {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
4,
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);
let rows = vec![
vec![b"row00".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
vec![b"row01".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
vec![b"row02".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
vec![b"row03".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
vec![b"row04".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
vec![b"row05".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
vec![b"row06".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
vec![b"row07".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
vec![b"row08".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
vec![b"row09".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
vec![b"row10".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
vec![b"row11".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
];
let cases = vec![
(vec![b"row00".to_vec()], 0..12, vec![0..4]), // search one row in full range
(vec![b"row05".to_vec()], 4..8, vec![4..8]), // search one row in partial range
(vec![b"row03".to_vec()], 4..8, vec![]), // search for a row that doesn't exist in the partial range
(
vec![b"row01".to_vec(), b"row06".to_vec()],
0..12,
vec![0..8],
), // search multiple rows in multiple ranges
(
vec![b"row01".to_vec(), b"row11".to_vec()],
0..12,
vec![0..4, 8..12],
), // search multiple rows in multiple ranges
(vec![b"row99".to_vec()], 0..12, vec![]), // search for a row that doesn't exist in the full range
(vec![b"row00".to_vec()], 12..12, vec![]), // search in an empty range
(
vec![b"row04".to_vec(), b"row05".to_vec()],
0..12,
vec![4..8],
), // search multiple rows in same segment
(vec![b"seg01".to_vec()], 0..12, vec![4..8]), // search rows in a segment
(vec![b"seg01".to_vec()], 6..12, vec![6..8]), // search rows in a segment in partial range
(vec![b"overl".to_vec()], 0..12, vec![0..8]), // search rows in multiple segments
(vec![b"overl".to_vec()], 2..12, vec![2..8]), // search range starts from the middle of a segment
(vec![b"overp".to_vec()], 0..10, vec![4..10]), // search range ends at the middle of a segment
];
for row in rows {
creator.push_row_elems(row).await.unwrap();
}
creator.finish(&mut writer).await.unwrap();
let bytes = writer.into_inner();
let reader = BloomFilterReaderImpl::new(bytes);
let mut applier = BloomFilterApplier::new(Box::new(reader)).await.unwrap();
for (probes, search_range, expected) in cases {
let probes: HashSet<Bytes> = probes.into_iter().collect();
let ranges = applier.search(&probes, search_range).await.unwrap();
assert_eq!(ranges, expected);
}
}
}

View File

@@ -63,6 +63,31 @@ pub trait BloomFilterReader: Sync {
.expected_items(loc.elem_count);
Ok(bm)
}
async fn bloom_filter_vec(
&self,
locs: &[BloomFilterSegmentLocation],
) -> Result<Vec<BloomFilter>> {
let ranges = locs
.iter()
.map(|l| l.offset..l.offset + l.size)
.collect::<Vec<_>>();
let bss = self.read_vec(&ranges).await?;
let mut result = Vec::with_capacity(bss.len());
for (bs, loc) in bss.into_iter().zip(locs.iter()) {
let vec = bs
.chunks_exact(std::mem::size_of::<u64>())
.map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
.collect();
let bm = BloomFilter::from_vec(vec)
.seed(&SEED)
.expected_items(loc.elem_count);
result.push(bm);
}
Ok(result)
}
}
/// `BloomFilterReaderImpl` reads the bloom filter from the file.

View File

@@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::try_join_all;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::BloomFilterReader;
use index::bloom_filter::BloomFilterMeta;
@@ -101,6 +103,24 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
.map(|b| b.into())
}
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let fetch = ranges.iter().map(|range| {
let inner = &self.inner;
self.cache.get_or_load(
(self.file_id, self.column_id),
self.blob_size,
range.start,
(range.end - range.start) as u32,
move |ranges| async move { inner.read_vec(&ranges).await },
)
});
Ok(try_join_all(fetch)
.await?
.into_iter()
.map(Bytes::from)
.collect::<Vec<_>>())
}
/// Reads the meta information of the bloom filter.
async fn metadata(&self) -> Result<BloomFilterMeta> {
if let Some(cached) = self.cache.get_metadata((self.file_id, self.column_id)) {

View File

@@ -12,25 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap, HashSet};
mod builder;
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use datafusion_common::ScalarValue;
use datafusion_expr::expr::InList;
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate, Predicate};
use index::bloom_filter::applier::BloomFilterApplier;
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::RowGroupMetaData;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use snafu::ResultExt;
use store_api::storage::{ColumnId, RegionId};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
@@ -38,32 +33,49 @@ use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader,
};
use crate::error::{
ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Error, MetadataSnafu,
PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::row_converter::SortField;
use crate::sst::file::FileId;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
use crate::sst::index::bloom_filter::applier::builder::Predicate;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
use crate::sst::location;
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
pub struct BloomFilterIndexApplier {
/// Directory of the region.
region_dir: String,
/// ID of the region.
region_id: RegionId,
/// Object store to read the index file.
object_store: ObjectStore,
/// File cache to read the index file.
file_cache: Option<FileCacheRef>,
/// Factory to create puffin manager.
puffin_manager_factory: PuffinManagerFactory,
/// Cache for puffin metadata.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
/// Cache for bloom filter index.
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
/// Bloom filter predicates.
filters: HashMap<ColumnId, Vec<Predicate>>,
}
impl BloomFilterIndexApplier {
/// Creates a new `BloomFilterIndexApplier`.
pub fn new(
region_dir: String,
region_id: RegionId,
@@ -104,19 +116,38 @@ impl BloomFilterIndexApplier {
self
}
/// Applies bloom filter predicates to the provided SST file and returns a bitmap
/// indicating which segments may contain matching rows
/// Applies bloom filter predicates to the provided SST file and returns a
/// list of row group ranges that match the predicates.
///
/// The `row_groups` iterator provides the row group lengths and whether to search in the row group.
pub async fn apply(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
row_group_metas: &[RowGroupMetaData],
basement: &mut BTreeMap<usize, Option<RowSelection>>,
) -> Result<()> {
row_groups: impl Iterator<Item = (usize, bool)>,
) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
.start_timer();
// Calculates row groups' ranges based on start of the file.
let mut input = Vec::with_capacity(row_groups.size_hint().0);
let mut start = 0;
for (i, (len, to_search)) in row_groups.enumerate() {
let end = start + len;
if to_search {
input.push((i, start..end));
}
start = end;
}
// Initializes output with input ranges, but ranges are based on start of the file not the row group,
// so we need to adjust them later.
let mut output = input
.iter()
.map(|(i, range)| (*i, vec![range.clone()]))
.collect::<Vec<_>>();
for (column_id, predicates) in &self.filters {
let blob = match self
.blob_reader(file_id, *column_id, file_size_hint)
@@ -136,18 +167,28 @@ impl BloomFilterIndexApplier {
BloomFilterReaderImpl::new(blob),
bloom_filter_cache.clone(),
);
self.apply_filters(reader, predicates, row_group_metas, basement)
self.apply_filters(reader, predicates, &input, &mut output)
.await
.context(ApplyBloomFilterIndexSnafu)?;
} else {
let reader = BloomFilterReaderImpl::new(blob);
self.apply_filters(reader, predicates, row_group_metas, basement)
self.apply_filters(reader, predicates, &input, &mut output)
.await
.context(ApplyBloomFilterIndexSnafu)?;
}
}
Ok(())
// adjust ranges to be based on row group
for ((_, output), (_, input)) in output.iter_mut().zip(input) {
let start = input.start;
for range in output.iter_mut() {
range.start -= start;
range.end -= start;
}
}
output.retain(|(_, ranges)| !ranges.is_empty());
Ok(output)
}
/// Creates a blob reader from the cached or remote index file.
@@ -159,7 +200,10 @@ impl BloomFilterIndexApplier {
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<Option<BlobReader>> {
let reader = match self.cached_blob_reader(file_id, column_id).await {
let reader = match self
.cached_blob_reader(file_id, column_id, file_size_hint)
.await
{
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Err(err) = other {
@@ -192,6 +236,7 @@ impl BloomFilterIndexApplier {
&self,
file_id: FileId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<Option<BlobReader>> {
let Some(file_cache) = &self.file_cache else {
return Ok(None);
@@ -209,6 +254,7 @@ impl BloomFilterIndexApplier {
.reader(&puffin_file_name)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.blob(&Self::column_blob_name(column_id))
.await
.context(PuffinReadBlobSnafu)?
@@ -253,17 +299,31 @@ impl BloomFilterIndexApplier {
&self,
reader: R,
predicates: &[Predicate],
row_group_metas: &[RowGroupMetaData],
basement: &mut BTreeMap<usize, Option<RowSelection>>,
input: &[(usize, Range<usize>)],
output: &mut [(usize, Vec<Range<usize>>)],
) -> std::result::Result<(), index::bloom_filter::error::Error> {
let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
for predicate in predicates {
match predicate {
Predicate::InList(in_list) => {
applier
.search(&in_list.list, row_group_metas, basement)
.await?;
for ((_, r), (_, output)) in input.iter().zip(output.iter_mut()) {
// All rows are filtered out, skip the search
if output.is_empty() {
continue;
}
for predicate in predicates {
match predicate {
Predicate::InList(in_list) => {
let res = applier.search(&in_list.list, r.clone()).await?;
if res.is_empty() {
output.clear();
break;
}
*output = intersect_ranges(output, &res);
if output.is_empty() {
break;
}
}
}
}
}
@@ -272,6 +332,37 @@ impl BloomFilterIndexApplier {
}
}
/// Intersects two lists of ranges and returns the intersection.
///
/// The input lists are assumed to be sorted and non-overlapping.
fn intersect_ranges(lhs: &[Range<usize>], rhs: &[Range<usize>]) -> Vec<Range<usize>> {
let mut i = 0;
let mut j = 0;
let mut output = Vec::new();
while i < lhs.len() && j < rhs.len() {
let r1 = &lhs[i];
let r2 = &rhs[j];
// Find intersection if exists
let start = r1.start.max(r2.start);
let end = r1.end.min(r2.end);
if start < end {
output.push(start..end);
}
// Move forward the range that ends first
if r1.end < r2.end {
i += 1;
} else {
j += 1;
}
}
output
}
fn is_blob_not_found(err: &Error) -> bool {
matches!(
err,
@@ -282,481 +373,200 @@ fn is_blob_not_found(err: &Error) -> bool {
)
}
pub struct BloomFilterIndexApplierBuilder<'a> {
region_dir: String,
object_store: ObjectStore,
metadata: &'a RegionMetadata,
puffin_manager_factory: PuffinManagerFactory,
file_cache: Option<FileCacheRef>,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
output: HashMap<ColumnId, Vec<Predicate>>,
}
impl<'a> BloomFilterIndexApplierBuilder<'a> {
pub fn new(
region_dir: String,
object_store: ObjectStore,
metadata: &'a RegionMetadata,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Self {
region_dir,
object_store,
metadata,
puffin_manager_factory,
file_cache: None,
puffin_metadata_cache: None,
bloom_filter_index_cache: None,
output: HashMap::default(),
}
}
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
pub fn with_bloom_filter_index_cache(
mut self,
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
) -> Self {
self.bloom_filter_index_cache = bloom_filter_index_cache;
self
}
/// Builds the applier with given filter expressions
pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
for expr in exprs {
self.traverse_and_collect(expr);
}
if self.output.is_empty() {
return Ok(None);
}
let applier = BloomFilterIndexApplier::new(
self.region_dir,
self.metadata.region_id,
self.object_store,
self.puffin_manager_factory,
self.output,
)
.with_file_cache(self.file_cache)
.with_puffin_metadata_cache(self.puffin_metadata_cache)
.with_bloom_filter_cache(self.bloom_filter_index_cache);
Ok(Some(applier))
}
/// Recursively traverses expressions to collect bloom filter predicates
fn traverse_and_collect(&mut self, expr: &Expr) {
let res = match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
Operator::And => {
self.traverse_and_collect(left);
self.traverse_and_collect(right);
Ok(())
}
Operator::Eq => self.collect_eq(left, right),
_ => Ok(()),
},
Expr::InList(in_list) => self.collect_in_list(in_list),
_ => Ok(()),
};
if let Err(err) = res {
warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
}
}
/// Helper function to get the column id and type
fn column_id_and_type(
&self,
column_name: &str,
) -> Result<Option<(ColumnId, ConcreteDataType)>> {
let column = self
.metadata
.column_by_name(column_name)
.context(ColumnNotFoundSnafu {
column: column_name,
})?;
Ok(Some((
column.column_id,
column.column_schema.data_type.clone(),
)))
}
/// Collects an equality expression (column = value)
fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
let (col, lit) = match (left, right) {
(Expr::Column(col), Expr::Literal(lit)) => (col, lit),
(Expr::Literal(lit), Expr::Column(col)) => (col, lit),
_ => return Ok(()),
};
if lit.is_null() {
return Ok(());
}
let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
return Ok(());
};
let value = encode_lit(lit, data_type)?;
// Create bloom filter predicate
let mut set = HashSet::new();
set.insert(value);
let predicate = Predicate::InList(InListPredicate { list: set });
// Add to output predicates
self.output.entry(column_id).or_default().push(predicate);
Ok(())
}
/// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
// Only collect InList predicates if they reference a column
let Expr::Column(column) = &in_list.expr.as_ref() else {
return Ok(());
};
if in_list.list.is_empty() || in_list.negated {
return Ok(());
}
let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
return Ok(());
};
// Convert all non-null literals to predicates
let predicates = in_list
.list
.iter()
.filter_map(Self::nonnull_lit)
.map(|lit| encode_lit(lit, data_type.clone()));
// Collect successful conversions
let mut valid_predicates = HashSet::new();
for predicate in predicates {
match predicate {
Ok(p) => {
valid_predicates.insert(p);
}
Err(e) => warn!(e; "Failed to convert value in InList"),
}
}
if !valid_predicates.is_empty() {
self.output
.entry(column_id)
.or_default()
.push(Predicate::InList(InListPredicate {
list: valid_predicates,
}));
}
Ok(())
}
/// Helper function to get non-null literal value
fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
match expr {
Expr::Literal(lit) if !lit.is_null() => Some(lit),
_ => None,
}
}
}
// TODO(ruihang): extract this and the one under inverted_index into a common util mod.
/// Helper function to encode a literal into bytes.
fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
let mut bytes = vec![];
let field = SortField::new(data_type);
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
Ok(bytes)
}
#[cfg(test)]
mod tests {
use api::v1::SemanticType;
use datafusion_common::Column;
use datatypes::schema::ColumnSchema;
use object_store::services::Memory;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use datafusion_expr::{col, lit, Expr};
use futures::future::BoxFuture;
use puffin::puffin_manager::PuffinWriter;
use store_api::metadata::RegionMetadata;
use super::*;
use crate::sst::index::bloom_filter::creator::tests::{
mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
};
use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
fn test_region_metadata() -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"column1",
ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 1,
#[allow(clippy::type_complexity)]
fn tester(
region_dir: String,
object_store: ObjectStore,
metadata: &RegionMetadata,
puffin_manager_factory: PuffinManagerFactory,
file_id: FileId,
) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
+ use<'_> {
move |exprs, row_groups| {
let region_dir = region_dir.clone();
let object_store = object_store.clone();
let metadata = metadata.clone();
let puffin_manager_factory = puffin_manager_factory.clone();
let exprs = exprs.to_vec();
Box::pin(async move {
let builder = BloomFilterIndexApplierBuilder::new(
region_dir,
object_store,
&metadata,
puffin_manager_factory,
);
let applier = builder.build(&exprs).unwrap().unwrap();
applier
.apply(file_id, None, row_groups.into_iter())
.await
.unwrap()
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"column2",
ConcreteDataType::int64_datatype(),
false,
),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"column3",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 3,
})
.primary_key(vec![1]);
builder.build().unwrap()
}
}
fn test_object_store() -> ObjectStore {
ObjectStore::new(Memory::default()).unwrap().finish()
}
#[tokio::test]
#[allow(clippy::single_range_in_vec_init)]
async fn test_bloom_filter_applier() {
// tag_str:
// - type: string
// - index: bloom filter
// - granularity: 2
// - column_id: 1
//
// ts:
// - type: timestamp
// - index: time index
// - column_id: 2
//
// field_u64:
// - type: uint64
// - index: bloom filter
// - granularity: 4
// - column_id: 3
let region_metadata = mock_region_metadata();
let prefix = "test_bloom_filter_applier_";
let object_store = mock_object_store();
let intm_mgr = new_intm_mgr(prefix).await;
let memory_usage_threshold = Some(1024);
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
fn column(name: &str) -> Expr {
Expr::Column(Column {
relation: None,
name: name.to_string(),
})
}
let mut indexer =
BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
.unwrap()
.unwrap();
fn string_lit(s: impl Into<String>) -> Expr {
Expr::Literal(ScalarValue::Utf8(Some(s.into())))
}
// push 20 rows
let batch = new_batch("tag1", 0..10);
indexer.update(&batch).await.unwrap();
let batch = new_batch("tag2", 10..20);
indexer.update(&batch).await.unwrap();
#[test]
fn test_build_with_exprs() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let puffin_manager = factory.build(object_store.clone());
let mut puffin_writer = puffin_manager.writer(&path).await.unwrap();
indexer.finish(&mut puffin_writer).await.unwrap();
puffin_writer.finish().await.unwrap();
let tester = tester(
region_dir.clone(),
object_store.clone(),
&region_metadata,
factory.clone(),
file_id,
);
let exprs = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column1")),
op: Operator::Eq,
right: Box::new(string_lit("value1")),
})];
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let filters = result.unwrap().filters;
assert_eq!(filters.len(), 1);
let column_predicates = filters.get(&1).unwrap();
assert_eq!(column_predicates.len(), 1);
let expected = encode_lit(
&ScalarValue::Utf8(Some("value1".to_string())),
ConcreteDataType::string_datatype(),
// rows 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
// row group: | o row group | o row group | o row group | o row group |
// tag_str: | o pred | x pred |
let res = tester(
&[col("tag_str").eq(lit("tag1"))],
vec![(5, true), (5, true), (5, true), (5, true)],
)
.unwrap();
match &column_predicates[0] {
Predicate::InList(p) => {
assert_eq!(p.list.iter().next().unwrap(), &expected);
}
}
}
.await;
assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);
fn int64_lit(i: i64) -> Expr {
Expr::Literal(ScalarValue::Int64(Some(i)))
// rows 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
// row group: | o row group | x row group | o row group | o row group |
// tag_str: | o pred | x pred |
let res = tester(
&[col("tag_str").eq(lit("tag1"))],
vec![(5, true), (5, false), (5, true), (5, true)],
)
.await;
assert_eq!(res, vec![(0, vec![0..5])]);
// rows 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
// row group: | o row group | o row group | o row group | o row group |
// tag_str: | o pred | x pred |
// field_u64: | o pred | x pred | x pred | x pred | x pred |
let res = tester(
&[
col("tag_str").eq(lit("tag1")),
col("field_u64").eq(lit(1u64)),
],
vec![(5, true), (5, true), (5, true), (5, true)],
)
.await;
assert_eq!(res, vec![(0, vec![0..4])]);
// rows 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
// row group: | o row group | o row group | x row group | o row group |
// field_u64: | o pred | x pred | o pred | x pred | x pred |
let res = tester(
&[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
vec![(5, true), (5, true), (5, false), (5, true)],
)
.await;
assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
}
#[test]
fn test_build_with_in_list() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
#[allow(clippy::single_range_in_vec_init)]
fn test_intersect_ranges() {
// empty inputs
assert_eq!(intersect_ranges(&[], &[]), Vec::<Range<usize>>::new());
assert_eq!(intersect_ranges(&[1..5], &[]), Vec::<Range<usize>>::new());
assert_eq!(intersect_ranges(&[], &[1..5]), Vec::<Range<usize>>::new());
// no overlap
assert_eq!(
intersect_ranges(&[1..3, 5..7], &[3..5, 7..9]),
Vec::<Range<usize>>::new()
);
let exprs = vec![Expr::InList(InList {
expr: Box::new(column("column2")),
list: vec![int64_lit(1), int64_lit(2), int64_lit(3)],
negated: false,
})];
// single overlap
assert_eq!(intersect_ranges(&[1..5], &[3..7]), vec![3..5]);
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let filters = result.unwrap().filters;
let column_predicates = filters.get(&2).unwrap();
assert_eq!(column_predicates.len(), 1);
match &column_predicates[0] {
Predicate::InList(p) => {
assert_eq!(p.list.len(), 3);
}
}
}
#[test]
fn test_build_with_and_expressions() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
// multiple overlaps
assert_eq!(
intersect_ranges(&[1..5, 7..10, 12..15], &[2..6, 8..13]),
vec![2..5, 8..10, 12..13]
);
let exprs = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column1")),
op: Operator::Eq,
right: Box::new(string_lit("value1")),
})),
op: Operator::And,
right: Box::new(Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column2")),
op: Operator::Eq,
right: Box::new(int64_lit(42)),
})),
})];
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let filters = result.unwrap().filters;
assert_eq!(filters.len(), 2);
assert!(filters.contains_key(&1));
assert!(filters.contains_key(&2));
}
#[test]
fn test_build_with_null_values() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
// exact overlap
assert_eq!(
intersect_ranges(&[1..3, 5..7], &[1..3, 5..7]),
vec![1..3, 5..7]
);
let exprs = vec![
Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column1")),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Utf8(None))),
}),
Expr::InList(InList {
expr: Box::new(column("column2")),
list: vec![
int64_lit(1),
Expr::Literal(ScalarValue::Int64(None)),
int64_lit(3),
],
negated: false,
}),
];
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let filters = result.unwrap().filters;
assert!(!filters.contains_key(&1)); // Null equality should be ignored
let column2_predicates = filters.get(&2).unwrap();
match &column2_predicates[0] {
Predicate::InList(p) => {
assert_eq!(p.list.len(), 2); // Only non-null values should be included
}
}
}
#[test]
fn test_build_with_invalid_expressions() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
// contained ranges
assert_eq!(
intersect_ranges(&[1..10], &[2..4, 5..7, 8..9]),
vec![2..4, 5..7, 8..9]
);
let exprs = vec![
// Non-equality operator
Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column1")),
op: Operator::Gt,
right: Box::new(string_lit("value1")),
}),
// Non-existent column
Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("non_existent")),
op: Operator::Eq,
right: Box::new(string_lit("value")),
}),
// Negated IN list
Expr::InList(InList {
expr: Box::new(column("column2")),
list: vec![int64_lit(1), int64_lit(2)],
negated: true,
}),
];
let result = builder.build(&exprs).unwrap();
assert!(result.is_none());
}
#[test]
fn test_build_with_multiple_predicates_same_column() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
// partial overlaps
assert_eq!(
intersect_ranges(&[1..4, 6..9], &[2..7, 8..10]),
vec![2..4, 6..7, 8..9]
);
let exprs = vec![
Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column1")),
op: Operator::Eq,
right: Box::new(string_lit("value1")),
}),
Expr::InList(InList {
expr: Box::new(column("column1")),
list: vec![string_lit("value2"), string_lit("value3")],
negated: false,
}),
];
// single point overlap
assert_eq!(
intersect_ranges(&[1..3], &[3..5]),
Vec::<Range<usize>>::new()
);
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let filters = result.unwrap().filters;
let column_predicates = filters.get(&1).unwrap();
assert_eq!(column_predicates.len(), 2);
// large ranges
assert_eq!(intersect_ranges(&[0..100], &[50..150]), vec![50..100]);
}
}

View File

@@ -0,0 +1,531 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use common_telemetry::warn;
use datafusion_common::ScalarValue;
use datafusion_expr::expr::InList;
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::bloom_filter::Bytes;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::ColumnId;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, Result};
use crate::row_converter::SortField;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
/// Enumerates types of predicates for value filtering.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
/// Predicate for matching values in a list.
InList(InListPredicate),
}
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InListPredicate {
/// List of acceptable values.
pub list: HashSet<Bytes>,
}
pub struct BloomFilterIndexApplierBuilder<'a> {
region_dir: String,
object_store: ObjectStore,
metadata: &'a RegionMetadata,
puffin_manager_factory: PuffinManagerFactory,
file_cache: Option<FileCacheRef>,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
output: HashMap<ColumnId, Vec<Predicate>>,
}
impl<'a> BloomFilterIndexApplierBuilder<'a> {
pub fn new(
region_dir: String,
object_store: ObjectStore,
metadata: &'a RegionMetadata,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Self {
region_dir,
object_store,
metadata,
puffin_manager_factory,
file_cache: None,
puffin_metadata_cache: None,
bloom_filter_index_cache: None,
output: HashMap::default(),
}
}
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
pub fn with_bloom_filter_index_cache(
mut self,
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
) -> Self {
self.bloom_filter_index_cache = bloom_filter_index_cache;
self
}
/// Builds the applier with given filter expressions
pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
for expr in exprs {
self.traverse_and_collect(expr);
}
if self.output.is_empty() {
return Ok(None);
}
let applier = BloomFilterIndexApplier::new(
self.region_dir,
self.metadata.region_id,
self.object_store,
self.puffin_manager_factory,
self.output,
)
.with_file_cache(self.file_cache)
.with_puffin_metadata_cache(self.puffin_metadata_cache)
.with_bloom_filter_cache(self.bloom_filter_index_cache);
Ok(Some(applier))
}
/// Recursively traverses expressions to collect bloom filter predicates
fn traverse_and_collect(&mut self, expr: &Expr) {
let res = match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
Operator::And => {
self.traverse_and_collect(left);
self.traverse_and_collect(right);
Ok(())
}
Operator::Eq => self.collect_eq(left, right),
_ => Ok(()),
},
Expr::InList(in_list) => self.collect_in_list(in_list),
_ => Ok(()),
};
if let Err(err) = res {
warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
}
}
/// Helper function to get the column id and type
fn column_id_and_type(
&self,
column_name: &str,
) -> Result<Option<(ColumnId, ConcreteDataType)>> {
let column = self
.metadata
.column_by_name(column_name)
.context(ColumnNotFoundSnafu {
column: column_name,
})?;
Ok(Some((
column.column_id,
column.column_schema.data_type.clone(),
)))
}
/// Collects an equality expression (column = value)
fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
let (col, lit) = match (left, right) {
(Expr::Column(col), Expr::Literal(lit)) => (col, lit),
(Expr::Literal(lit), Expr::Column(col)) => (col, lit),
_ => return Ok(()),
};
if lit.is_null() {
return Ok(());
}
let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
return Ok(());
};
let value = encode_lit(lit, data_type)?;
// Create bloom filter predicate
let mut set = HashSet::new();
set.insert(value);
let predicate = Predicate::InList(InListPredicate { list: set });
// Add to output predicates
self.output.entry(column_id).or_default().push(predicate);
Ok(())
}
/// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
// Only collect InList predicates if they reference a column
let Expr::Column(column) = &in_list.expr.as_ref() else {
return Ok(());
};
if in_list.list.is_empty() || in_list.negated {
return Ok(());
}
let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
return Ok(());
};
// Convert all non-null literals to predicates
let predicates = in_list
.list
.iter()
.filter_map(Self::nonnull_lit)
.map(|lit| encode_lit(lit, data_type.clone()));
// Collect successful conversions
let mut valid_predicates = HashSet::new();
for predicate in predicates {
match predicate {
Ok(p) => {
valid_predicates.insert(p);
}
Err(e) => warn!(e; "Failed to convert value in InList"),
}
}
if !valid_predicates.is_empty() {
self.output
.entry(column_id)
.or_default()
.push(Predicate::InList(InListPredicate {
list: valid_predicates,
}));
}
Ok(())
}
/// Helper function to get non-null literal value
fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
match expr {
Expr::Literal(lit) if !lit.is_null() => Some(lit),
_ => None,
}
}
}
// TODO(ruihang): extract this and the one under inverted_index into a common util mod.
/// Helper function to encode a literal into bytes.
fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
let mut bytes = vec![];
let field = SortField::new(data_type);
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
Ok(bytes)
}
#[cfg(test)]
mod tests {
use api::v1::SemanticType;
use datafusion_common::Column;
use datatypes::schema::ColumnSchema;
use object_store::services::Memory;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
fn test_region_metadata() -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"column1",
ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"column2",
ConcreteDataType::int64_datatype(),
false,
),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"column3",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 3,
})
.primary_key(vec![1]);
builder.build().unwrap()
}
fn test_object_store() -> ObjectStore {
ObjectStore::new(Memory::default()).unwrap().finish()
}
fn column(name: &str) -> Expr {
Expr::Column(Column {
relation: None,
name: name.to_string(),
})
}
fn string_lit(s: impl Into<String>) -> Expr {
Expr::Literal(ScalarValue::Utf8(Some(s.into())))
}
#[test]
fn test_build_with_exprs() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
);
let exprs = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column1")),
op: Operator::Eq,
right: Box::new(string_lit("value1")),
})];
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let filters = result.unwrap().filters;
assert_eq!(filters.len(), 1);
let column_predicates = filters.get(&1).unwrap();
assert_eq!(column_predicates.len(), 1);
let expected = encode_lit(
&ScalarValue::Utf8(Some("value1".to_string())),
ConcreteDataType::string_datatype(),
)
.unwrap();
match &column_predicates[0] {
Predicate::InList(p) => {
assert_eq!(p.list.iter().next().unwrap(), &expected);
}
}
}
fn int64_lit(i: i64) -> Expr {
Expr::Literal(ScalarValue::Int64(Some(i)))
}
#[test]
fn test_build_with_in_list() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
);
let exprs = vec![Expr::InList(InList {
expr: Box::new(column("column2")),
list: vec![int64_lit(1), int64_lit(2), int64_lit(3)],
negated: false,
})];
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let filters = result.unwrap().filters;
let column_predicates = filters.get(&2).unwrap();
assert_eq!(column_predicates.len(), 1);
match &column_predicates[0] {
Predicate::InList(p) => {
assert_eq!(p.list.len(), 3);
}
}
}
#[test]
fn test_build_with_and_expressions() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
);
let exprs = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column1")),
op: Operator::Eq,
right: Box::new(string_lit("value1")),
})),
op: Operator::And,
right: Box::new(Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column2")),
op: Operator::Eq,
right: Box::new(int64_lit(42)),
})),
})];
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let filters = result.unwrap().filters;
assert_eq!(filters.len(), 2);
assert!(filters.contains_key(&1));
assert!(filters.contains_key(&2));
}
#[test]
fn test_build_with_null_values() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
);
let exprs = vec![
Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column1")),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Utf8(None))),
}),
Expr::InList(InList {
expr: Box::new(column("column2")),
list: vec![
int64_lit(1),
Expr::Literal(ScalarValue::Int64(None)),
int64_lit(3),
],
negated: false,
}),
];
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let filters = result.unwrap().filters;
assert!(!filters.contains_key(&1)); // Null equality should be ignored
let column2_predicates = filters.get(&2).unwrap();
match &column2_predicates[0] {
Predicate::InList(p) => {
assert_eq!(p.list.len(), 2); // Only non-null values should be included
}
}
}
#[test]
fn test_build_with_invalid_expressions() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
);
let exprs = vec![
// Non-equality operator
Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column1")),
op: Operator::Gt,
right: Box::new(string_lit("value1")),
}),
// Non-existent column
Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("non_existent")),
op: Operator::Eq,
right: Box::new(string_lit("value")),
}),
// Negated IN list
Expr::InList(InList {
expr: Box::new(column("column2")),
list: vec![int64_lit(1), int64_lit(2)],
negated: true,
}),
];
let result = builder.build(&exprs).unwrap();
assert!(result.is_none());
}
#[test]
fn test_build_with_multiple_predicates_same_column() {
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
let metadata = test_region_metadata();
let builder = BloomFilterIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
&metadata,
factory,
);
let exprs = vec![
Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("column1")),
op: Operator::Eq,
right: Box::new(string_lit("value1")),
}),
Expr::InList(InList {
expr: Box::new(column("column1")),
list: vec![string_lit("value2"), string_lit("value3")],
negated: false,
}),
];
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());
let filters = result.unwrap().filters;
let column_predicates = filters.get(&1).unwrap();
assert_eq!(column_predicates.len(), 2);
}
}

View File

@@ -321,7 +321,7 @@ impl BloomFilterIndexer {
}
#[cfg(test)]
mod tests {
pub(crate) mod tests {
use std::iter;
use api::v1::SemanticType;
@@ -341,11 +341,11 @@ mod tests {
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
fn mock_object_store() -> ObjectStore {
pub fn mock_object_store() -> ObjectStore {
ObjectStore::new(Memory::default()).unwrap().finish()
}
async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
pub async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
IntermediateManager::init_fs(path).await.unwrap()
}
@@ -365,7 +365,7 @@ mod tests {
/// - index: bloom filter
/// - granularity: 4
/// - column_id: 3
fn mock_region_metadata() -> RegionMetadataRef {
pub fn mock_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
builder
.push_column_metadata(ColumnMetadata {
@@ -410,7 +410,7 @@ mod tests {
Arc::new(builder.build().unwrap())
}
fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
let fields = vec![SortField::new(ConcreteDataType::string_datatype())];
let codec = McmpRowCodec::new(fields);
let row: [ValueRef; 1] = [str_tag.as_ref().into()];

View File

@@ -570,6 +570,66 @@ impl ParquetReaderBuilder {
true
}
async fn prune_row_groups_by_bloom_filter(
&self,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.bloom_filter_index_applier else {
return false;
};
if !self.file_handle.meta_ref().bloom_filter_index_available() {
return false;
}
let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size();
let apply_output = match index_applier
.apply(
self.file_handle.file_id(),
file_size_hint,
parquet_meta
.row_groups()
.iter()
.enumerate()
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))),
)
.await
{
Ok(apply_output) => apply_output,
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {:?}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
);
} else {
warn!(
err; "Failed to apply bloom filter index, region_id: {}, file_id: {}",
self.file_handle.region_id(), self.file_handle.file_id()
);
}
return false;
}
};
Self::prune_row_groups_by_ranges(
parquet_meta,
apply_output
.into_iter()
.map(|(rg, ranges)| (rg, ranges.into_iter())),
output,
&mut metrics.rg_bloom_filtered,
&mut metrics.rows_bloom_filtered,
);
true
}
/// Prunes row groups by rows. The `rows_in_row_groups` is like a map from row group to
/// a list of row ids to keep.
fn prune_row_groups_by_rows(
@@ -623,56 +683,6 @@ impl ParquetReaderBuilder {
*output = res;
}
async fn prune_row_groups_by_bloom_filter(
&self,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.bloom_filter_index_applier else {
return false;
};
if !self.file_handle.meta_ref().bloom_filter_index_available() {
return false;
}
let before_rg = output.len();
let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size();
if let Err(err) = index_applier
.apply(
self.file_handle.file_id(),
file_size_hint,
parquet_meta.row_groups(),
output,
)
.await
{
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {:?}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
);
} else {
warn!(
err; "Failed to apply bloom filter index, region_id: {}, file_id: {}",
self.file_handle.region_id(), self.file_handle.file_id()
);
}
return false;
};
let after_rg = output.len();
// Update metrics.
metrics.rg_bloom_filtered += before_rg - after_rg;
true
}
/// Prunes row groups by ranges. The `ranges_in_row_groups` is like a map from row group to
/// a list of row ranges to keep.
fn prune_row_groups_by_ranges(

View File

@@ -14,7 +14,6 @@
//! Utilities for testing SSTs.
use std::num::NonZeroU64;
use std::sync::Arc;
use api::v1::{OpType, SemanticType};

View File

@@ -15,7 +15,6 @@
//! Utilities to mock version.
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use api::v1::value::ValueData;