mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-26 08:00:01 +00:00
feat(bloom-filter): bloom filter applier (#5220)
* wip
* draft search logic
* use defined BloomFilterReader
* fix clippy
* round the range end
* finish index applier
* integrate applier into mito2 with cache layer
* fix cache key and add unit test
* provide bloom filter index size hint
* revert BloomFilterReaderImpl::read_vec
* remove dead code
* ignore null on eq
* add more tests and fix bloom filter logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Cargo.lock (generated): 1 change
@@ -5288,6 +5288,7 @@ dependencies = [
 "futures",
 "greptime-proto",
+ "mockall",
 "parquet",
 "pin-project",
 "prost 0.12.6",
 "rand",
@@ -22,6 +22,7 @@ fst.workspace = true
 futures.workspace = true
 greptime-proto.workspace = true
+mockall.workspace = true
 parquet.workspace = true
 pin-project.workspace = true
 prost.workspace = true
 regex.workspace = true
@@ -14,6 +14,7 @@

 use serde::{Deserialize, Serialize};

+pub mod applier;
 pub mod creator;
 pub mod error;
 pub mod reader;
@@ -25,7 +26,7 @@ pub type BytesRef<'a> = &'a [u8];
 pub const SEED: u128 = 42;

 /// The Meta information of the bloom filter stored in the file.
-#[derive(Debug, Default, Serialize, Deserialize)]
+#[derive(Debug, Default, Serialize, Deserialize, Clone)]
 pub struct BloomFilterMeta {
     /// The number of rows per segment.
     pub rows_per_segment: usize,
@@ -44,7 +45,7 @@ pub struct BloomFilterMeta {
 }

 /// The location of the bloom filter segment in the file.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, Clone, Hash, PartialEq, Eq)]
 pub struct BloomFilterSegmentLocation {
     /// The offset of the bloom filter segment in the file.
     pub offset: u64,
src/index/src/bloom_filter/applier.rs (new file, 133 lines)
@@ -0,0 +1,133 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashSet};

use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::RowGroupMetaData;

use crate::bloom_filter::error::Result;
use crate::bloom_filter::reader::BloomFilterReader;
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes};

/// Enumerates types of predicates for value filtering.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    /// Predicate for matching values in a list.
    InList(InListPredicate),
}

/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
/// one of the elements (logical OR semantics) for the predicate to be satisfied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InListPredicate {
    /// List of acceptable values.
    pub list: HashSet<Bytes>,
}

pub struct BloomFilterApplier {
    reader: Box<dyn BloomFilterReader + Send>,
    meta: BloomFilterMeta,
}

impl BloomFilterApplier {
    pub async fn new(mut reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
        let meta = reader.metadata().await?;

        Ok(Self { reader, meta })
    }

    /// Searches for matching row groups using bloom filters.
    ///
    /// This method applies the bloom filter index to eliminate row groups that definitely
    /// don't contain the searched values. It works by:
    ///
    /// 1. Computing prefix sums for row counts
    /// 2. Calculating bloom filter segment locations for each row group
    ///     1. A row group may span multiple bloom filter segments
    /// 3. Probing bloom filter segments
    /// 4. Removing non-matching row groups from the basement
    ///     1. If a row group doesn't match any bloom filter segment with any probe, it is removed
    ///
    /// # Note
    /// The method modifies the `basement` map in-place by removing row groups that
    /// don't match the bloom filter criteria.
    pub async fn search(
        &mut self,
        probes: &HashSet<Bytes>,
        row_group_metas: &[RowGroupMetaData],
        basement: &mut BTreeMap<usize, Option<RowSelection>>,
    ) -> Result<()> {
        // 0. Fast path - return early if the basement is empty
        if basement.is_empty() {
            return Ok(());
        }

        // 1. Compute prefix sum for row counts
        let mut sum = 0usize;
        let mut prefix_sum = Vec::with_capacity(row_group_metas.len() + 1);
        prefix_sum.push(0usize);
        for meta in row_group_metas {
            sum += meta.num_rows() as usize;
            prefix_sum.push(sum);
        }

        // 2. Calculate bloom filter segment locations
        let mut row_groups_to_remove = HashSet::new();
        for &row_group_idx in basement.keys() {
            // TODO(ruihang): support further filter over row selection

            // todo: dedup & overlap
            let rows_range_start = prefix_sum[row_group_idx] / self.meta.rows_per_segment;
            let rows_range_end = (prefix_sum[row_group_idx + 1] as f64
                / self.meta.rows_per_segment as f64)
                .ceil() as usize;

            let mut is_any_range_hit = false;
            for i in rows_range_start..rows_range_end {
                // 3. Probe each bloom filter segment
                let loc = BloomFilterSegmentLocation {
                    offset: self.meta.bloom_filter_segments[i].offset,
                    size: self.meta.bloom_filter_segments[i].size,
                    elem_count: self.meta.bloom_filter_segments[i].elem_count,
                };
                let bloom = self.reader.bloom_filter(&loc).await?;

                // Check if any probe exists in the bloom filter
                let mut matches = false;
                for probe in probes {
                    if bloom.contains(probe) {
                        matches = true;
                        break;
                    }
                }

                is_any_range_hit |= matches;
                if matches {
                    break;
                }
            }
            if !is_any_range_hit {
                row_groups_to_remove.insert(row_group_idx);
            }
        }

        // 4. Remove row groups that do not match any bloom filter segment
        for row_group_idx in row_groups_to_remove {
            basement.remove(&row_group_idx);
        }

        Ok(())
    }
}
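As a freestanding illustration of step 2 above (not part of the patch), this sketch reproduces the prefix-sum-to-segment-range mapping, using an integer ceiling in place of the patch's f64 round-trip; the row counts and segment size are made-up values:

/// Maps each row group to the half-open range of bloom filter segments it
/// spans, mirroring the arithmetic in `BloomFilterApplier::search`.
fn segment_ranges(row_counts: &[usize], rows_per_segment: usize) -> Vec<(usize, usize)> {
    let mut prefix_sum = vec![0usize];
    for &n in row_counts {
        let last = *prefix_sum.last().unwrap();
        prefix_sum.push(last + n);
    }
    (0..row_counts.len())
        .map(|i| {
            let start = prefix_sum[i] / rows_per_segment;
            // Round the end up so a partially covered segment is still probed.
            let end = (prefix_sum[i + 1] + rows_per_segment - 1) / rows_per_segment;
            (start, end)
        })
        .collect()
}

fn main() {
    // Three row groups of 100 rows each, 64 rows per bloom filter segment.
    // Row group 1 holds rows 100..200, which overlap segments 1, 2, and 3.
    assert_eq!(segment_ranges(&[100, 100, 100], 64), vec![(0, 2), (1, 4), (3, 5)]);
}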
@@ -38,7 +38,15 @@ pub trait BloomFilterReader {
    async fn range_read(&mut self, offset: u64, size: u32) -> Result<Bytes>;

    /// Reads a bunch of ranges from the file.
-    async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>>;
+    async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
+        let mut results = Vec::with_capacity(ranges.len());
+        for range in ranges {
+            let size = (range.end - range.start) as u32;
+            let data = self.range_read(range.start, size).await?;
+            results.push(data);
+        }
+        Ok(results)
+    }

    /// Reads the meta information of the bloom filter.
    async fn metadata(&mut self) -> Result<BloomFilterMeta>;
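With this change, implementors only have to supply `range_read` (plus the other required methods); `read_vec` falls back to one `range_read` per range. A self-contained sketch of the default-method pattern itself, with a local stand-in trait (the real `BloomFilterReader` is async and has more methods; `Bytes = Vec<u8>` is an assumption matching the crate's alias):

use std::ops::Range;

type Bytes = Vec<u8>;

/// Stand-in for the trait's shape: implementors supply `range_read`,
/// and `read_vec` defaults to one `range_read` per range.
trait RangeSource {
    fn range_read(&mut self, offset: u64, size: u32) -> Bytes;

    fn read_vec(&mut self, ranges: &[Range<u64>]) -> Vec<Bytes> {
        ranges
            .iter()
            .map(|r| {
                let size = (r.end - r.start) as u32;
                self.range_read(r.start, size)
            })
            .collect()
    }
}

struct InMemory(Vec<u8>);

impl RangeSource for InMemory {
    fn range_read(&mut self, offset: u64, size: u32) -> Bytes {
        self.0[offset as usize..(offset + size as u64) as usize].to_vec()
    }
    // `read_vec` intentionally not overridden: the default loops over `range_read`.
}

fn main() {
    let mut src = InMemory((0u8..16).collect());
    let chunks = src.read_vec(&[0..4, 8..10]);
    assert_eq!(chunks, vec![vec![0, 1, 2, 3], vec![8, 9]]);
}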
@@ -28,6 +28,7 @@ use std::sync::Arc;
 use bytes::Bytes;
 use datatypes::value::Value;
 use datatypes::vectors::VectorRef;
+use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
 use moka::notification::RemovalCause;
 use moka::sync::Cache;
 use parquet::column::page::Page;
@@ -69,6 +70,8 @@ pub struct CacheManager {
     write_cache: Option<WriteCacheRef>,
     /// Cache for inverted index.
     index_cache: Option<InvertedIndexCacheRef>,
+    /// Cache for bloom filter index.
+    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
     /// Puffin metadata cache.
     puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
     /// Cache for time series selectors.
@@ -221,6 +224,10 @@ impl CacheManager {
         self.index_cache.as_ref()
     }

+    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
+        self.bloom_filter_index_cache.as_ref()
+    }
+
     pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
         self.puffin_metadata_cache.as_ref()
     }
@@ -364,6 +371,12 @@ impl CacheManagerBuilder {
             self.index_content_size,
             self.index_content_page_size,
         );
+        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
+        let bloom_filter_index_cache = BloomFilterIndexCache::new(
+            self.index_metadata_size,
+            self.index_content_size,
+            self.index_content_page_size,
+        );
         let puffin_metadata_cache =
             PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
         let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
@@ -387,6 +400,7 @@ impl CacheManagerBuilder {
             page_cache,
             write_cache: self.write_cache,
             index_cache: Some(Arc::new(inverted_index_cache)),
+            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
             puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
             selector_result_cache,
         }
src/mito2/src/cache/index.rs (vendored): 1 change
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+pub mod bloom_filter_index;
 pub mod inverted_index;

 use std::future::Future;
src/mito2/src/cache/index/bloom_filter_index.rs (vendored, new file, 167 lines)
@@ -0,0 +1,167 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::BloomFilterReader;
use index::bloom_filter::BloomFilterMeta;
use store_api::storage::ColumnId;

use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;

const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";

/// Cache for bloom filter index.
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId), BloomFilterMeta>;
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;

impl BloomFilterIndexCache {
    /// Creates a new bloom filter index cache.
    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
        Self::new_with_weighter(
            index_metadata_cap,
            index_content_cap,
            page_size,
            INDEX_TYPE_BLOOM_FILTER_INDEX,
            bloom_filter_index_metadata_weight,
            bloom_filter_index_content_weight,
        )
    }
}

/// Calculates weight for bloom filter index metadata.
fn bloom_filter_index_metadata_weight(k: &(FileId, ColumnId), _: &Arc<BloomFilterMeta>) -> u32 {
    (k.0.as_bytes().len()
        + std::mem::size_of::<ColumnId>()
        + std::mem::size_of::<BloomFilterMeta>()) as u32
}

/// Calculates weight for bloom filter index content.
fn bloom_filter_index_content_weight((k, _): &((FileId, ColumnId), PageKey), v: &Bytes) -> u32 {
    (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
}

/// Bloom filter index blob reader with cache.
pub struct CachedBloomFilterIndexBlobReader<R> {
    file_id: FileId,
    column_id: ColumnId,
    file_size: u64,
    inner: R,
    cache: BloomFilterIndexCacheRef,
}

impl<R> CachedBloomFilterIndexBlobReader<R> {
    /// Creates a new bloom filter index blob reader with cache.
    pub fn new(
        file_id: FileId,
        column_id: ColumnId,
        file_size: u64,
        inner: R,
        cache: BloomFilterIndexCacheRef,
    ) -> Self {
        Self {
            file_id,
            column_id,
            file_size,
            inner,
            cache,
        }
    }
}

#[async_trait]
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
    async fn range_read(&mut self, offset: u64, size: u32) -> Result<Bytes> {
        let inner = &mut self.inner;
        self.cache
            .get_or_load(
                (self.file_id, self.column_id),
                self.file_size,
                offset,
                size,
                move |ranges| async move { inner.read_vec(&ranges).await },
            )
            .await
            .map(|b| b.into())
    }

    /// Reads the meta information of the bloom filter.
    async fn metadata(&mut self) -> Result<BloomFilterMeta> {
        if let Some(cached) = self.cache.get_metadata((self.file_id, self.column_id)) {
            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok((*cached).clone())
        } else {
            let meta = self.inner.metadata().await?;
            self.cache
                .put_metadata((self.file_id, self.column_id), Arc::new(meta.clone()));
            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok(meta)
        }
    }
}

#[cfg(test)]
mod test {
    use rand::{Rng, RngCore};

    use super::*;

    const FUZZ_REPEAT_TIMES: usize = 100;

    #[test]
    fn fuzz_index_calculation() {
        let mut rng = rand::thread_rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.gen_range(0..data.len() as u64);
            let size = rng.gen_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.gen_range(1..1024);

            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes.into_iter() {
                let start = key.page_id as usize * page_size;
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            assert_eq!(
                read,
                data.get(expected_range).unwrap(),
                "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                size as usize,
                PageKey::calculate_range(offset, size, page_size as u64),
                page_num
            );
        }
    }
}
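The two weight functions above charge each cache entry by an approximate in-memory footprint: the key (a 16-byte file id plus a `u32` column id) plus either the metadata struct size or the cached page's payload length. A freestanding sketch of the same arithmetic (the 72-byte metadata size is a made-up figure, not the real `size_of::<BloomFilterMeta>()`):

use std::mem::size_of;

type ColumnId = u32; // matches store_api::storage::ColumnId

const FILE_ID_LEN: usize = 16; // FileId is a UUID, 16 bytes

fn metadata_weight(meta_struct_size: usize) -> u32 {
    (FILE_ID_LEN + size_of::<ColumnId>() + meta_struct_size) as u32
}

fn content_weight(page_len: usize) -> u32 {
    (FILE_ID_LEN + size_of::<ColumnId>() + page_len) as u32
}

fn main() {
    // A hypothetical 72-byte BloomFilterMeta and a 4 KiB cached page.
    assert_eq!(metadata_weight(72), 92);
    assert_eq!(content_weight(4096), 4116);
}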
@@ -433,6 +433,7 @@ impl EngineInner {
             .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
             .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
             .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
+            // .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled()) // TODO(ruihang): wait for #5237
             .with_start_time(query_start);

         Ok(scan_region)
@@ -576,6 +576,13 @@ pub enum Error {
         location: Location,
     },

+    #[snafu(display("Failed to apply bloom filter index"))]
+    ApplyBloomFilterIndex {
+        source: index::bloom_filter::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Failed to push index value"))]
     PushIndexValue {
         source: index::inverted_index::error::Error,
@@ -1022,6 +1029,7 @@ impl ErrorExt for Error {
             EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
             ArrowReader { .. } => StatusCode::StorageUnavailable,
             ConvertValue { source, .. } => source.status_code(),
+            ApplyBloomFilterIndex { source, .. } => source.status_code(),
             BuildIndexApplier { source, .. }
             | PushIndexValue { source, .. }
             | ApplyInvertedIndex { source, .. }
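The new variant follows the crate's snafu pattern: `.context(ApplyBloomFilterIndexSnafu)` wraps the low-level index error, and `status_code` delegates to that source. A simplified, self-contained sketch of the same pattern (using `std::io::Error` as the source and omitting the `location` field the real enum carries):

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to apply bloom filter index"))]
    ApplyBloomFilterIndex { source: std::io::Error },
}

fn apply() -> Result<(), std::io::Error> {
    Err(std::io::Error::new(std::io::ErrorKind::Other, "boom"))
}

fn main() {
    // `.context` attaches the variant, mirroring how mito2 wraps
    // index::bloom_filter::error::Error at the call sites in this patch.
    let err = apply().context(ApplyBloomFilterIndexSnafu).unwrap_err();
    println!("{err}");
}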
@@ -47,6 +47,9 @@ use crate::read::{Batch, Source};
 use crate::region::options::MergeMode;
 use crate::region::version::VersionRef;
 use crate::sst::file::FileHandle;
+use crate::sst::index::bloom_filter::applier::{
+    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
+};
 use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
 use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
@@ -175,6 +178,8 @@ pub(crate) struct ScanRegion {
     ignore_inverted_index: bool,
     /// Whether to ignore fulltext index.
     ignore_fulltext_index: bool,
+    /// Whether to ignore bloom filter.
+    ignore_bloom_filter: bool,
     /// Start time of the scan task.
     start_time: Option<Instant>,
 }
@@ -195,6 +200,7 @@ impl ScanRegion {
             parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
             ignore_inverted_index: false,
             ignore_fulltext_index: false,
+            ignore_bloom_filter: false,
             start_time: None,
         }
     }
@@ -223,6 +229,14 @@ impl ScanRegion {
         self
     }

+    /// Sets whether to ignore bloom filter.
+    #[must_use]
+    #[allow(dead_code)] // TODO(ruihang): waiting for #5237
+    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
+        self.ignore_bloom_filter = ignore;
+        self
+    }
+
     #[must_use]
     pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
         self.start_time = Some(now);
@@ -322,6 +336,7 @@ impl ScanRegion {
         self.maybe_remove_field_filters();

         let inverted_index_applier = self.build_invereted_index_applier();
+        let bloom_filter_applier = self.build_bloom_filter_applier();
         let fulltext_index_applier = self.build_fulltext_index_applier();
         let predicate = Predicate::new(self.request.filters.clone());
         // The mapper always computes projected column ids as the schema of SSTs may change.
@@ -345,6 +360,7 @@ impl ScanRegion {
             .with_files(files)
             .with_cache(self.cache_manager)
             .with_inverted_index_applier(inverted_index_applier)
+            .with_bloom_filter_index_applier(bloom_filter_applier)
             .with_fulltext_index_applier(fulltext_index_applier)
             .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
             .with_start_time(self.start_time)
@@ -448,6 +464,47 @@ impl ScanRegion {
         .map(Arc::new)
     }

+    /// Use the latest schema to build the bloom filter index applier.
+    fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
+        if self.ignore_bloom_filter {
+            return None;
+        }
+
+        let file_cache = || -> Option<FileCacheRef> {
+            let cache_manager = self.cache_manager.as_ref()?;
+            let write_cache = cache_manager.write_cache()?;
+            let file_cache = write_cache.file_cache();
+            Some(file_cache)
+        }();
+
+        let index_cache = self
+            .cache_manager
+            .as_ref()
+            .and_then(|c| c.bloom_filter_index_cache())
+            .cloned();
+
+        let puffin_metadata_cache = self
+            .cache_manager
+            .as_ref()
+            .and_then(|c| c.puffin_metadata_cache())
+            .cloned();
+
+        BloomFilterIndexApplierBuilder::new(
+            self.access_layer.region_dir().to_string(),
+            self.access_layer.object_store().clone(),
+            self.version.metadata.as_ref(),
+            self.access_layer.puffin_manager_factory().clone(),
+        )
+        .with_file_cache(file_cache)
+        .with_bloom_filter_index_cache(index_cache)
+        .with_puffin_metadata_cache(puffin_metadata_cache)
+        .build(&self.request.filters)
+        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
+        .ok()
+        .flatten()
+        .map(Arc::new)
+    }
+
     /// Use the latest schema to build the fulltext index applier.
     fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
         if self.ignore_fulltext_index {
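The immediately invoked closure above is just a way to use `?` over a chain of `Option`s without returning early from the enclosing function. A freestanding sketch of the pattern:

fn main() {
    let a: Option<u32> = Some(1);
    let b: Option<u32> = None;

    // `?` inside the closure short-circuits the closure, not the caller.
    let sum = || -> Option<u32> {
        let x = a?;
        let y = b?;
        Some(x + y)
    }();

    assert_eq!(sum, None);
}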
@@ -501,6 +558,7 @@ pub(crate) struct ScanInput {
     pub(crate) parallel_scan_channel_size: usize,
     /// Index appliers.
     inverted_index_applier: Option<InvertedIndexApplierRef>,
+    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
     fulltext_index_applier: Option<FulltextIndexApplierRef>,
     /// Start time of the query.
     pub(crate) query_start: Option<Instant>,
@@ -529,6 +587,7 @@ impl ScanInput {
             ignore_file_not_found: false,
             parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
             inverted_index_applier: None,
+            bloom_filter_index_applier: None,
             fulltext_index_applier: None,
             query_start: None,
             append_mode: false,
@@ -600,6 +659,16 @@ impl ScanInput {
         self
     }

+    /// Sets bloom filter applier.
+    #[must_use]
+    pub(crate) fn with_bloom_filter_index_applier(
+        mut self,
+        applier: Option<BloomFilterIndexApplierRef>,
+    ) -> Self {
+        self.bloom_filter_index_applier = applier;
+        self
+    }
+
     /// Sets fulltext index applier.
     #[must_use]
     pub(crate) fn with_fulltext_index_applier(
@@ -694,6 +763,7 @@ impl ScanInput {
             .projection(Some(self.mapper.column_ids().to_vec()))
             .cache(self.cache_manager.clone())
             .inverted_index_applier(self.inverted_index_applier.clone())
+            .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
             .fulltext_index_applier(self.fulltext_index_applier.clone())
             .expected_metadata(Some(self.mapper.metadata().clone()))
             .build_reader_input(reader_metrics)
@@ -143,8 +143,8 @@ pub enum IndexType {
     InvertedIndex,
     /// Full-text index.
     FulltextIndex,
-    /// Bloom filter.
-    BloomFilter,
+    /// Bloom Filter index
+    BloomFilterIndex,
 }

 impl FileMeta {
@@ -158,9 +158,10 @@ impl FileMeta {
         self.available_indexes.contains(&IndexType::FulltextIndex)
     }

-    /// Returns true if the file has a bloom filter
-    pub fn bloom_filter_available(&self) -> bool {
-        self.available_indexes.contains(&IndexType::BloomFilter)
+    /// Returns true if the file has a bloom filter index.
+    pub fn bloom_filter_index_available(&self) -> bool {
+        self.available_indexes
+            .contains(&IndexType::BloomFilterIndex)
     }

     /// Returns the size of the inverted index file
@@ -180,6 +181,15 @@ impl FileMeta {
             None
         }
     }

+    /// Returns the size of the bloom filter index file
+    pub fn bloom_filter_index_size(&self) -> Option<u64> {
+        if self.available_indexes.len() == 1 && self.bloom_filter_index_available() {
+            Some(self.index_file_size)
+        } else {
+            None
+        }
+    }
 }

 /// Handle to a SST file.
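The `len() == 1` guard matters because all index types share one puffin file per SST: `index_file_size` covers the whole file, so it is a valid bloom filter size hint only when the bloom filter is the sole index. A freestanding sketch of that guard with stand-in types (not the crate's real definitions):

#[allow(dead_code)]
#[derive(PartialEq)]
enum IndexType { InvertedIndex, FulltextIndex, BloomFilterIndex }

struct FileMeta { available_indexes: Vec<IndexType>, index_file_size: u64 }

impl FileMeta {
    /// The whole index file is the bloom filter only when no other index shares it.
    fn bloom_filter_index_size(&self) -> Option<u64> {
        (self.available_indexes == [IndexType::BloomFilterIndex])
            .then_some(self.index_file_size)
    }
}

fn main() {
    let meta = FileMeta {
        available_indexes: vec![IndexType::BloomFilterIndex],
        index_file_size: 4096,
    };
    assert_eq!(meta.bloom_filter_index_size(), Some(4096));
}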
@@ -44,7 +44,7 @@ use crate::sst::index::inverted_index::creator::InvertedIndexer;

 pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
 pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
-pub(crate) const TYPE_BLOOM_FILTER: &str = "bloom_filter";
+pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";

 /// Output of the index creation.
 #[derive(Debug, Clone, Default)]
@@ -69,7 +69,7 @@ impl IndexOutput {
             indexes.push(IndexType::FulltextIndex);
         }
         if self.bloom_filter.is_available() {
-            indexes.push(IndexType::BloomFilter);
+            indexes.push(IndexType::BloomFilterIndex);
         }
         indexes
     }
@@ -162,7 +162,7 @@ impl Indexer {
             .as_ref()
             .map_or(0, |creator| creator.memory_usage());
         INDEX_CREATE_MEMORY_USAGE
-            .with_label_values(&[TYPE_BLOOM_FILTER])
+            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
             .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
         self.last_mem_bloom_filter = bloom_filter_mem;
     }
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+pub(crate) mod applier;
 pub(crate) mod creator;

 const INDEX_BLOB_TYPE: &str = "greptime-bloom-filter-v1";
src/mito2/src/sst/index/bloom_filter/applier.rs (new file, 722 lines)
@@ -0,0 +1,722 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use common_base::range_read::RangeReader;
use common_telemetry::warn;
use datafusion_common::ScalarValue;
use datafusion_expr::expr::InList;
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate, Predicate};
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::RowGroupMetaData;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, RegionId};

use super::INDEX_BLOB_TYPE;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader,
};
use crate::error::{
    ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, MetadataSnafu,
    PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::row_converter::SortField;
use crate::sst::file::FileId;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
use crate::sst::location;

pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;

pub struct BloomFilterIndexApplier {
    region_dir: String,
    region_id: RegionId,
    object_store: ObjectStore,
    file_cache: Option<FileCacheRef>,
    puffin_manager_factory: PuffinManagerFactory,
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    filters: HashMap<ColumnId, Vec<Predicate>>,
}

impl BloomFilterIndexApplier {
    pub fn new(
        region_dir: String,
        region_id: RegionId,
        object_store: ObjectStore,
        puffin_manager_factory: PuffinManagerFactory,
        filters: HashMap<ColumnId, Vec<Predicate>>,
    ) -> Self {
        Self {
            region_dir,
            region_id,
            object_store,
            file_cache: None,
            puffin_manager_factory,
            puffin_metadata_cache: None,
            bloom_filter_index_cache: None,
            filters,
        }
    }

    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
        self.file_cache = file_cache;
        self
    }

    pub fn with_puffin_metadata_cache(
        mut self,
        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    ) -> Self {
        self.puffin_metadata_cache = puffin_metadata_cache;
        self
    }

    pub fn with_bloom_filter_cache(
        mut self,
        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    ) -> Self {
        self.bloom_filter_index_cache = bloom_filter_index_cache;
        self
    }

    /// Applies bloom filter predicates to the provided SST file, removing row groups
    /// from `basement` that cannot contain any matching rows.
    pub async fn apply(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
        row_group_metas: &[RowGroupMetaData],
        basement: &mut BTreeMap<usize, Option<RowSelection>>,
    ) -> Result<()> {
        let _timer = INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
            .start_timer();

        for (column_id, predicates) in &self.filters {
            let mut blob = match self.cached_blob_reader(file_id, *column_id).await {
                Ok(Some(puffin_reader)) => puffin_reader,
                other => {
                    if let Err(err) = other {
                        warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
                    }
                    self.remote_blob_reader(file_id, *column_id, file_size_hint)
                        .await?
                }
            };

            // Create the appropriate reader based on whether caching is enabled
            if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
                let file_size = if let Some(file_size) = file_size_hint {
                    file_size
                } else {
                    blob.metadata().await.context(MetadataSnafu)?.content_length
                };
                let reader = CachedBloomFilterIndexBlobReader::new(
                    file_id,
                    *column_id,
                    file_size,
                    BloomFilterReaderImpl::new(blob),
                    bloom_filter_cache.clone(),
                );
                self.apply_filters(reader, predicates, row_group_metas, basement)
                    .await
                    .context(ApplyBloomFilterIndexSnafu)?;
            } else {
                let reader = BloomFilterReaderImpl::new(blob);
                self.apply_filters(reader, predicates, row_group_metas, basement)
                    .await
                    .context(ApplyBloomFilterIndexSnafu)?;
            }
        }

        Ok(())
    }

    /// Creates a blob reader from the cached index file
    async fn cached_blob_reader(
        &self,
        file_id: FileId,
        column_id: ColumnId,
    ) -> Result<Option<BlobReader>> {
        let Some(file_cache) = &self.file_cache else {
            return Ok(None);
        };

        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
        if file_cache.get(index_key).await.is_none() {
            return Ok(None);
        };

        let puffin_manager = self.puffin_manager_factory.build(file_cache.local_store());
        let puffin_file_name = file_cache.cache_file_path(index_key);

        let reader = puffin_manager
            .reader(&puffin_file_name)
            .await
            .context(PuffinBuildReaderSnafu)?
            .blob(&Self::column_blob_name(column_id))
            .await
            .context(PuffinReadBlobSnafu)?
            .reader()
            .await
            .context(PuffinBuildReaderSnafu)?;
        Ok(Some(reader))
    }

    // TODO(ruihang): use the same util with the code in creator
    fn column_blob_name(column_id: ColumnId) -> String {
        format!("{INDEX_BLOB_TYPE}-{column_id}")
    }

    /// Creates a blob reader from the remote index file
    async fn remote_blob_reader(
        &self,
        file_id: FileId,
        column_id: ColumnId,
        file_size_hint: Option<u64>,
    ) -> Result<BlobReader> {
        let puffin_manager = self
            .puffin_manager_factory
            .build(self.object_store.clone())
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());

        let file_path = location::index_file_path(&self.region_dir, file_id);
        puffin_manager
            .reader(&file_path)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint)
            .blob(&Self::column_blob_name(column_id))
            .await
            .context(PuffinReadBlobSnafu)?
            .reader()
            .await
            .context(PuffinBuildReaderSnafu)
    }

    async fn apply_filters<R: BloomFilterReader + Send + 'static>(
        &self,
        reader: R,
        predicates: &[Predicate],
        row_group_metas: &[RowGroupMetaData],
        basement: &mut BTreeMap<usize, Option<RowSelection>>,
    ) -> std::result::Result<(), index::bloom_filter::error::Error> {
        let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;

        for predicate in predicates {
            match predicate {
                Predicate::InList(in_list) => {
                    applier
                        .search(&in_list.list, row_group_metas, basement)
                        .await?;
                }
            }
        }

        Ok(())
    }
}

pub struct BloomFilterIndexApplierBuilder<'a> {
    region_dir: String,
    object_store: ObjectStore,
    metadata: &'a RegionMetadata,
    puffin_manager_factory: PuffinManagerFactory,
    file_cache: Option<FileCacheRef>,
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    output: HashMap<ColumnId, Vec<Predicate>>,
}

impl<'a> BloomFilterIndexApplierBuilder<'a> {
    pub fn new(
        region_dir: String,
        object_store: ObjectStore,
        metadata: &'a RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
    ) -> Self {
        Self {
            region_dir,
            object_store,
            metadata,
            puffin_manager_factory,
            file_cache: None,
            puffin_metadata_cache: None,
            bloom_filter_index_cache: None,
            output: HashMap::default(),
        }
    }

    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
        self.file_cache = file_cache;
        self
    }

    pub fn with_puffin_metadata_cache(
        mut self,
        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    ) -> Self {
        self.puffin_metadata_cache = puffin_metadata_cache;
        self
    }

    pub fn with_bloom_filter_index_cache(
        mut self,
        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    ) -> Self {
        self.bloom_filter_index_cache = bloom_filter_index_cache;
        self
    }

    /// Builds the applier with the given filter expressions
    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
        for expr in exprs {
            self.traverse_and_collect(expr);
        }

        if self.output.is_empty() {
            return Ok(None);
        }

        let applier = BloomFilterIndexApplier::new(
            self.region_dir,
            self.metadata.region_id,
            self.object_store,
            self.puffin_manager_factory,
            self.output,
        )
        .with_file_cache(self.file_cache)
        .with_puffin_metadata_cache(self.puffin_metadata_cache)
        .with_bloom_filter_cache(self.bloom_filter_index_cache);

        Ok(Some(applier))
    }

    /// Recursively traverses expressions to collect bloom filter predicates
    fn traverse_and_collect(&mut self, expr: &Expr) {
        let res = match expr {
            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
                Operator::And => {
                    self.traverse_and_collect(left);
                    self.traverse_and_collect(right);
                    Ok(())
                }
                Operator::Eq => self.collect_eq(left, right),
                _ => Ok(()),
            },
            Expr::InList(in_list) => self.collect_in_list(in_list),
            _ => Ok(()),
        };

        if let Err(err) = res {
            warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
        }
    }

    /// Helper function to get the column id and type
    fn column_id_and_type(
        &self,
        column_name: &str,
    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
        let column = self
            .metadata
            .column_by_name(column_name)
            .context(ColumnNotFoundSnafu {
                column: column_name,
            })?;

        Ok(Some((
            column.column_id,
            column.column_schema.data_type.clone(),
        )))
    }

    /// Collects an equality expression (`column = value`)
    fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
        let (col, lit) = match (left, right) {
            (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
            (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
            _ => return Ok(()),
        };
        if lit.is_null() {
            return Ok(());
        }
        let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
            return Ok(());
        };
        let value = encode_lit(lit, data_type)?;

        // Create bloom filter predicate
        let mut set = HashSet::new();
        set.insert(value);
        let predicate = Predicate::InList(InListPredicate { list: set });

        // Add to output predicates
        self.output.entry(column_id).or_default().push(predicate);

        Ok(())
    }

    /// Collects an in-list expression in the form of `column IN (lit, lit, ...)`.
    fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
        // Only collect InList predicates if they reference a column
        let Expr::Column(column) = &in_list.expr.as_ref() else {
            return Ok(());
        };
        if in_list.list.is_empty() || in_list.negated {
            return Ok(());
        }

        let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
            return Ok(());
        };

        // Convert all non-null literals to predicates
        let predicates = in_list
            .list
            .iter()
            .filter_map(Self::nonnull_lit)
            .map(|lit| encode_lit(lit, data_type.clone()));

        // Collect successful conversions
        let mut valid_predicates = HashSet::new();
        for predicate in predicates {
            match predicate {
                Ok(p) => {
                    valid_predicates.insert(p);
                }
                Err(e) => warn!(e; "Failed to convert value in InList"),
            }
        }

        if !valid_predicates.is_empty() {
            self.output
                .entry(column_id)
                .or_default()
                .push(Predicate::InList(InListPredicate {
                    list: valid_predicates,
                }));
        }

        Ok(())
    }

    /// Helper function to get a non-null literal value
    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
        match expr {
            Expr::Literal(lit) if !lit.is_null() => Some(lit),
            _ => None,
        }
    }
}

// TODO(ruihang): extract this and the one under inverted_index into a common util mod.
/// Helper function to encode a literal into bytes.
fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
    let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
    let mut bytes = vec![];
    let field = SortField::new(data_type);
    IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
    Ok(bytes)
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};

    use super::*;

    fn test_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "column1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "column2",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "column3",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .primary_key(vec![1]);
        builder.build().unwrap()
    }

    fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    fn column(name: &str) -> Expr {
        Expr::Column(Column {
            relation: None,
            name: name.to_string(),
        })
    }

    fn string_lit(s: impl Into<String>) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(s.into())))
    }

    #[test]
    fn test_build_with_exprs() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(column("column1")),
            op: Operator::Eq,
            right: Box::new(string_lit("value1")),
        })];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let filters = result.unwrap().filters;
        assert_eq!(filters.len(), 1);

        let column_predicates = filters.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);

        let expected = encode_lit(
            &ScalarValue::Utf8(Some("value1".to_string())),
            ConcreteDataType::string_datatype(),
        )
        .unwrap();
        match &column_predicates[0] {
            Predicate::InList(p) => {
                assert_eq!(p.list.iter().next().unwrap(), &expected);
            }
        }
    }

    fn int64_lit(i: i64) -> Expr {
        Expr::Literal(ScalarValue::Int64(Some(i)))
    }

    #[test]
    fn test_build_with_in_list() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![Expr::InList(InList {
            expr: Box::new(column("column2")),
            list: vec![int64_lit(1), int64_lit(2), int64_lit(3)],
            negated: false,
        })];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let filters = result.unwrap().filters;
        let column_predicates = filters.get(&2).unwrap();
        assert_eq!(column_predicates.len(), 1);

        match &column_predicates[0] {
            Predicate::InList(p) => {
                assert_eq!(p.list.len(), 3);
            }
        }
    }

    #[test]
    fn test_build_with_and_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Eq,
                right: Box::new(string_lit("value1")),
            })),
            op: Operator::And,
            right: Box::new(Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column2")),
                op: Operator::Eq,
                right: Box::new(int64_lit(42)),
            })),
        })];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let filters = result.unwrap().filters;
        assert_eq!(filters.len(), 2);
        assert!(filters.contains_key(&1));
        assert!(filters.contains_key(&2));
    }

    #[test]
    fn test_build_with_null_values() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Eq,
                right: Box::new(Expr::Literal(ScalarValue::Utf8(None))),
            }),
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![
                    int64_lit(1),
                    Expr::Literal(ScalarValue::Int64(None)),
                    int64_lit(3),
                ],
                negated: false,
            }),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let filters = result.unwrap().filters;
        assert!(!filters.contains_key(&1)); // Null equality should be ignored
        let column2_predicates = filters.get(&2).unwrap();
        match &column2_predicates[0] {
            Predicate::InList(p) => {
                assert_eq!(p.list.len(), 2); // Only non-null values should be included
            }
        }
    }

    #[test]
    fn test_build_with_invalid_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![
            // Non-equality operator
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Gt,
                right: Box::new(string_lit("value1")),
            }),
            // Non-existent column
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("non_existent")),
                op: Operator::Eq,
                right: Box::new(string_lit("value")),
            }),
            // Negated IN list
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![int64_lit(1), int64_lit(2)],
                negated: true,
            }),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_build_with_multiple_predicates_same_column() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Eq,
                right: Box::new(string_lit("value1")),
            }),
            Expr::InList(InList {
                expr: Box::new(column("column1")),
                list: vec![string_lit("value2"), string_lit("value3")],
                negated: false,
            }),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let filters = result.unwrap().filters;
        let column_predicates = filters.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 2);
    }
}
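For orientation, the builder above only harvests predicates it can evaluate as an over-approximation: equality against a non-null literal, both sides of an `AND`, and non-negated `IN` lists; everything else falls through untouched. A sketch of which expressions would and would not be collected, built with `datafusion_expr` helpers (illustrative; the asserts merely count the examples):

use datafusion_expr::{col, lit, Expr};

fn main() {
    // Collected: equality against a literal, both sides of an AND, positive IN list.
    let collected: Vec<Expr> = vec![
        col("tag").eq(lit("a")),
        col("tag").eq(lit("a")).and(col("field").eq(lit(1_i64))),
        col("tag").in_list(vec![lit("a"), lit("b")], false),
    ];

    // Skipped: ranges, negated IN lists, OR — a bloom filter cannot prove
    // absence for these, so they are left to later pruning stages.
    let skipped: Vec<Expr> = vec![
        col("field").gt(lit(1_i64)),
        col("tag").in_list(vec![lit("a")], true),
        col("tag").eq(lit("a")).or(col("tag").eq(lit("b"))),
    ];

    assert_eq!(collected.len(), 3);
    assert_eq!(skipped.len(), 3);
}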
@@ -39,7 +39,7 @@ use crate::sst::index::intermediate::{
 };
 use crate::sst::index::puffin_manager::SstPuffinWriter;
 use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
-use crate::sst::index::TYPE_BLOOM_FILTER;
+use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;

 /// The buffer size for the pipe used to send index data to the puffin blob.
 const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
@@ -114,7 +114,7 @@ impl BloomFilterIndexer {
             temp_file_provider,
             codec,
             aborted: false,
-            stats: Statistics::new(TYPE_BLOOM_FILTER),
+            stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX),
             global_memory_usage,
         };
         Ok(Some(indexer))
@@ -51,6 +51,7 @@ use crate::read::prune::{PruneReader, Source};
 use crate::read::{Batch, BatchReader};
 use crate::row_converter::{McmpRowCodec, SortField};
 use crate::sst::file::FileHandle;
+use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
 use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
 use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
@@ -80,6 +81,7 @@ pub struct ParquetReaderBuilder {
     cache_manager: Option<CacheManagerRef>,
     /// Index appliers.
     inverted_index_applier: Option<InvertedIndexApplierRef>,
+    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
     fulltext_index_applier: Option<FulltextIndexApplierRef>,
     /// Expected metadata of the region while reading the SST.
     /// This is usually the latest metadata of the region. The reader use
@@ -102,6 +104,7 @@ impl ParquetReaderBuilder {
             projection: None,
             cache_manager: None,
             inverted_index_applier: None,
+            bloom_filter_index_applier: None,
             fulltext_index_applier: None,
             expected_metadata: None,
         }
@@ -140,6 +143,16 @@ impl ParquetReaderBuilder {
         self
     }

+    /// Attaches the bloom filter index applier to the builder.
+    #[must_use]
+    pub(crate) fn bloom_filter_index_applier(
+        mut self,
+        index_applier: Option<BloomFilterIndexApplierRef>,
+    ) -> Self {
+        self.bloom_filter_index_applier = index_applier;
+        self
+    }
+
     /// Attaches the fulltext index applier to the builder.
     #[must_use]
     pub(crate) fn fulltext_index_applier(
@@ -359,6 +372,9 @@ impl ParquetReaderBuilder {
             self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
         }

+        self.prune_row_groups_by_bloom_filter(parquet_meta, &mut output, metrics)
+            .await;
+
         output
     }
@@ -607,6 +623,53 @@ impl ParquetReaderBuilder {
         *output = res;
     }

+    async fn prune_row_groups_by_bloom_filter(
+        &self,
+        parquet_meta: &ParquetMetaData,
+        output: &mut BTreeMap<usize, Option<RowSelection>>,
+        _metrics: &mut ReaderFilterMetrics,
+    ) -> bool {
+        let Some(index_applier) = &self.bloom_filter_index_applier else {
+            return false;
+        };
+
+        if !self.file_handle.meta_ref().bloom_filter_index_available() {
+            return false;
+        }
+
+        let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size();
+        match index_applier
+            .apply(
+                self.file_handle.file_id(),
+                file_size_hint,
+                parquet_meta.row_groups(),
+                output,
+            )
+            .await
+        {
+            Ok(output) => output,
+            Err(err) => {
+                if cfg!(any(test, feature = "test")) {
+                    panic!(
+                        "Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {}",
+                        self.file_handle.region_id(),
+                        self.file_handle.file_id(),
+                        err
+                    );
+                } else {
+                    warn!(
+                        err; "Failed to apply bloom filter index, region_id: {}, file_id: {}",
+                        self.file_handle.region_id(), self.file_handle.file_id()
+                    );
+                }
+
+                return false;
+            }
+        };
+
+        true
+    }
+
     /// Prunes row groups by ranges. The `ranges_in_row_groups` is like a map from row group to
     /// a list of row ranges to keep.
     fn prune_row_groups_by_ranges(
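Since each pruner only removes entries from `output`, min-max pruning and bloom filter pruning compose as successive over-approximation filters. A freestanding sketch of that composition, with a plain map standing in for the real `RowSelection` machinery:

use std::collections::BTreeMap;

/// Each pruner removes row groups it can prove contain no matches;
/// surviving entries are still only candidates.
fn prune<F: Fn(usize) -> bool>(candidates: &mut BTreeMap<usize, ()>, may_match: F) {
    candidates.retain(|&rg, _| may_match(rg));
}

fn main() {
    // Start with all four row groups as candidates.
    let mut output: BTreeMap<usize, ()> = (0..4).map(|i| (i, ())).collect();

    // Min-max pruning proves row group 3 is out of range.
    prune(&mut output, |rg| rg != 3);
    // Bloom filter pruning then proves row groups 0 and 2 lack the probe.
    prune(&mut output, |rg| rg == 1);

    assert_eq!(output.into_keys().collect::<Vec<_>>(), vec![1]);
}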
@@ -643,7 +643,6 @@ impl TestEnv {
             .await
             .unwrap();

-        let object_store_manager = self.get_object_store_manager().unwrap();
         let write_cache = WriteCache::new(local_store, capacity, None, puffin_mgr, intm_mgr)
             .await
             .unwrap();