mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
feat: apply terms with fulltext bloom backend (#5884)
* feat: apply terms with fulltext bloom backend Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * perf: preload jieba Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * polish doc Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5591,6 +5591,7 @@ dependencies = [
|
||||
"greptime-proto",
|
||||
"itertools 0.14.0",
|
||||
"jieba-rs",
|
||||
"lazy_static",
|
||||
"mockall",
|
||||
"pin-project",
|
||||
"prost 0.13.5",
|
||||
|
||||
@@ -23,6 +23,7 @@ futures.workspace = true
|
||||
greptime-proto.workspace = true
|
||||
itertools.workspace = true
|
||||
jieba-rs = "0.7"
|
||||
lazy_static.workspace = true
|
||||
mockall.workspace = true
|
||||
pin-project.workspace = true
|
||||
prost.workspace = true
|
||||
|
||||
@@ -12,11 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use jieba_rs::Jieba;
|
||||
|
||||
use crate::fulltext_index::error::Result;
|
||||
use crate::Bytes;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref JIEBA: jieba_rs::Jieba = jieba_rs::Jieba::new();
|
||||
}
|
||||
|
||||
/// `Tokenizer` tokenizes a text into a list of tokens.
|
||||
pub trait Tokenizer: Send {
|
||||
fn tokenize<'a>(&self, text: &'a str) -> Vec<&'a str>;
|
||||
@@ -44,8 +46,7 @@ pub struct ChineseTokenizer;
|
||||
|
||||
impl Tokenizer for ChineseTokenizer {
|
||||
fn tokenize<'a>(&self, text: &'a str) -> Vec<&'a str> {
|
||||
let jieba = Jieba::new();
|
||||
jieba.cut(text, false)
|
||||
JIEBA.cut(text, false)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
37
src/mito2/src/cache/index/bloom_filter_index.rs
vendored
37
src/mito2/src/cache/index/bloom_filter_index.rs
vendored
@@ -29,8 +29,15 @@ use crate::sst::file::FileId;
|
||||
|
||||
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
|
||||
|
||||
/// Tag for bloom filter index cache.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum Tag {
|
||||
Skipping,
|
||||
Fulltext,
|
||||
}
|
||||
|
||||
/// Cache for bloom filter index.
|
||||
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId), BloomFilterMeta>;
|
||||
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
|
||||
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
|
||||
|
||||
impl BloomFilterIndexCache {
|
||||
@@ -48,14 +55,20 @@ impl BloomFilterIndexCache {
|
||||
}
|
||||
|
||||
/// Calculates weight for bloom filter index metadata.
|
||||
fn bloom_filter_index_metadata_weight(k: &(FileId, ColumnId), _: &Arc<BloomFilterMeta>) -> u32 {
|
||||
fn bloom_filter_index_metadata_weight(
|
||||
k: &(FileId, ColumnId, Tag),
|
||||
_: &Arc<BloomFilterMeta>,
|
||||
) -> u32 {
|
||||
(k.0.as_bytes().len()
|
||||
+ std::mem::size_of::<ColumnId>()
|
||||
+ std::mem::size_of::<BloomFilterMeta>()) as u32
|
||||
}
|
||||
|
||||
/// Calculates weight for bloom filter index content.
|
||||
fn bloom_filter_index_content_weight((k, _): &((FileId, ColumnId), PageKey), v: &Bytes) -> u32 {
|
||||
fn bloom_filter_index_content_weight(
|
||||
(k, _): &((FileId, ColumnId, Tag), PageKey),
|
||||
v: &Bytes,
|
||||
) -> u32 {
|
||||
(k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
|
||||
}
|
||||
|
||||
@@ -63,6 +76,7 @@ fn bloom_filter_index_content_weight((k, _): &((FileId, ColumnId), PageKey), v:
|
||||
pub struct CachedBloomFilterIndexBlobReader<R> {
|
||||
file_id: FileId,
|
||||
column_id: ColumnId,
|
||||
tag: Tag,
|
||||
blob_size: u64,
|
||||
inner: R,
|
||||
cache: BloomFilterIndexCacheRef,
|
||||
@@ -73,6 +87,7 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
|
||||
pub fn new(
|
||||
file_id: FileId,
|
||||
column_id: ColumnId,
|
||||
tag: Tag,
|
||||
blob_size: u64,
|
||||
inner: R,
|
||||
cache: BloomFilterIndexCacheRef,
|
||||
@@ -80,6 +95,7 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
|
||||
Self {
|
||||
file_id,
|
||||
column_id,
|
||||
tag,
|
||||
blob_size,
|
||||
inner,
|
||||
cache,
|
||||
@@ -93,7 +109,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
|
||||
let inner = &self.inner;
|
||||
self.cache
|
||||
.get_or_load(
|
||||
(self.file_id, self.column_id),
|
||||
(self.file_id, self.column_id, self.tag),
|
||||
self.blob_size,
|
||||
offset,
|
||||
size,
|
||||
@@ -107,7 +123,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
|
||||
let fetch = ranges.iter().map(|range| {
|
||||
let inner = &self.inner;
|
||||
self.cache.get_or_load(
|
||||
(self.file_id, self.column_id),
|
||||
(self.file_id, self.column_id, self.tag),
|
||||
self.blob_size,
|
||||
range.start,
|
||||
(range.end - range.start) as u32,
|
||||
@@ -123,13 +139,18 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
|
||||
|
||||
/// Reads the meta information of the bloom filter.
|
||||
async fn metadata(&self) -> Result<BloomFilterMeta> {
|
||||
if let Some(cached) = self.cache.get_metadata((self.file_id, self.column_id)) {
|
||||
if let Some(cached) = self
|
||||
.cache
|
||||
.get_metadata((self.file_id, self.column_id, self.tag))
|
||||
{
|
||||
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
|
||||
Ok((*cached).clone())
|
||||
} else {
|
||||
let meta = self.inner.metadata().await?;
|
||||
self.cache
|
||||
.put_metadata((self.file_id, self.column_id), Arc::new(meta.clone()));
|
||||
self.cache.put_metadata(
|
||||
(self.file_id, self.column_id, self.tag),
|
||||
Arc::new(meta.clone()),
|
||||
);
|
||||
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
|
||||
Ok(meta)
|
||||
}
|
||||
|
||||
@@ -502,7 +502,7 @@ impl ScanRegion {
|
||||
|
||||
let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
|
||||
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
|
||||
|
||||
let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
|
||||
FulltextIndexApplierBuilder::new(
|
||||
self.access_layer.region_dir().to_string(),
|
||||
self.version.metadata.region_id,
|
||||
@@ -512,6 +512,7 @@ impl ScanRegion {
|
||||
)
|
||||
.with_file_cache(file_cache)
|
||||
.with_puffin_metadata_cache(puffin_metadata_cache)
|
||||
.with_bloom_filter_cache(bloom_filter_index_cache)
|
||||
.build(&self.request.filters)
|
||||
.inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
|
||||
.ok()
|
||||
|
||||
@@ -505,6 +505,7 @@ pub(crate) fn scan_file_ranges(
|
||||
|
||||
// Reports metrics.
|
||||
reader_metrics.observe_rows(read_type);
|
||||
reader_metrics.filter_metrics.observe();
|
||||
part_metrics.merge_reader_metrics(&reader_metrics);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ use store_api::storage::{ColumnId, RegionId};
|
||||
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
|
||||
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
|
||||
use crate::cache::index::bloom_filter_index::{
|
||||
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader,
|
||||
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
|
||||
};
|
||||
use crate::error::{
|
||||
ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
|
||||
@@ -165,6 +165,7 @@ impl BloomFilterIndexApplier {
|
||||
let reader = CachedBloomFilterIndexBlobReader::new(
|
||||
file_id,
|
||||
*column_id,
|
||||
Tag::Skipping,
|
||||
blob_size,
|
||||
BloomFilterReaderImpl::new(blob),
|
||||
bloom_filter_cache.clone(),
|
||||
@@ -308,13 +309,13 @@ impl BloomFilterIndexApplier {
|
||||
) -> std::result::Result<(), index::bloom_filter::error::Error> {
|
||||
let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
|
||||
|
||||
for (_, output) in output.iter_mut() {
|
||||
for (_, row_group_output) in output.iter_mut() {
|
||||
// All rows are filtered out, skip the search
|
||||
if output.is_empty() {
|
||||
if row_group_output.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
*output = applier.search(predicates, output).await?;
|
||||
*row_group_output = applier.search(predicates, row_group_output).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -12,10 +12,15 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||
use std::iter;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_base::range_read::RangeReader;
|
||||
use common_telemetry::warn;
|
||||
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
|
||||
use index::bloom_filter::reader::BloomFilterReaderImpl;
|
||||
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
|
||||
use index::fulltext_index::Config;
|
||||
use object_store::ObjectStore;
|
||||
@@ -26,11 +31,17 @@ use store_api::storage::{ColumnId, RegionId};
|
||||
|
||||
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
|
||||
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
|
||||
use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
|
||||
use crate::cache::index::bloom_filter_index::{
|
||||
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
|
||||
};
|
||||
use crate::error::{
|
||||
ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
|
||||
PuffinReadBlobSnafu, Result,
|
||||
};
|
||||
use crate::metrics::INDEX_APPLY_ELAPSED;
|
||||
use crate::sst::file::FileId;
|
||||
use crate::sst::index::fulltext_index::applier::builder::FulltextRequest;
|
||||
use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY;
|
||||
use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
|
||||
use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
|
||||
use crate::sst::index::puffin_manager::{
|
||||
PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
|
||||
};
|
||||
@@ -45,6 +56,9 @@ pub struct FulltextIndexApplier {
|
||||
|
||||
/// The source of the index.
|
||||
index_source: IndexSource,
|
||||
|
||||
/// Cache for bloom filter index.
|
||||
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
|
||||
}
|
||||
|
||||
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
|
||||
@@ -63,6 +77,7 @@ impl FulltextIndexApplier {
|
||||
Self {
|
||||
requests,
|
||||
index_source,
|
||||
bloom_filter_index_cache: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,13 +97,25 @@ impl FulltextIndexApplier {
|
||||
self
|
||||
}
|
||||
|
||||
/// Applies the queries to the fulltext index of the specified SST file.
|
||||
pub async fn apply(
|
||||
/// Sets the bloom filter cache.
|
||||
pub fn with_bloom_filter_cache(
|
||||
mut self,
|
||||
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
|
||||
) -> Self {
|
||||
self.bloom_filter_index_cache = bloom_filter_index_cache;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl FulltextIndexApplier {
|
||||
/// Applies fine-grained fulltext index to the specified SST file.
|
||||
/// Returns the row ids that match the queries.
|
||||
pub async fn apply_fine(
|
||||
&self,
|
||||
file_id: FileId,
|
||||
file_size_hint: Option<u64>,
|
||||
) -> Result<Option<BTreeSet<RowId>>> {
|
||||
let _timer = INDEX_APPLY_ELAPSED
|
||||
let timer = INDEX_APPLY_ELAPSED
|
||||
.with_label_values(&[TYPE_FULLTEXT_INDEX])
|
||||
.start_timer();
|
||||
|
||||
@@ -99,7 +126,7 @@ impl FulltextIndexApplier {
|
||||
}
|
||||
|
||||
let Some(result) = self
|
||||
.apply_one_column(file_size_hint, file_id, *column_id, request)
|
||||
.apply_fine_one_column(file_size_hint, file_id, *column_id, request)
|
||||
.await?
|
||||
else {
|
||||
continue;
|
||||
@@ -118,10 +145,13 @@ impl FulltextIndexApplier {
|
||||
}
|
||||
}
|
||||
|
||||
if row_ids.is_none() {
|
||||
timer.stop_and_discard();
|
||||
}
|
||||
Ok(row_ids)
|
||||
}
|
||||
|
||||
async fn apply_one_column(
|
||||
async fn apply_fine_one_column(
|
||||
&self,
|
||||
file_size_hint: Option<u64>,
|
||||
file_id: FileId,
|
||||
@@ -187,6 +217,195 @@ impl FulltextIndexApplier {
|
||||
}
|
||||
}
|
||||
|
||||
impl FulltextIndexApplier {
|
||||
/// Applies coarse-grained fulltext index to the specified SST file.
|
||||
/// Returns (row group id -> ranges) that match the queries.
|
||||
pub async fn apply_coarse(
|
||||
&self,
|
||||
file_id: FileId,
|
||||
file_size_hint: Option<u64>,
|
||||
row_groups: impl Iterator<Item = (usize, bool)>,
|
||||
) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
|
||||
let timer = INDEX_APPLY_ELAPSED
|
||||
.with_label_values(&[TYPE_FULLTEXT_INDEX])
|
||||
.start_timer();
|
||||
|
||||
let (input, mut output) = Self::init_coarse_output(row_groups);
|
||||
let mut applied = false;
|
||||
|
||||
for (column_id, request) in &self.requests {
|
||||
if request.terms.is_empty() {
|
||||
// only apply terms
|
||||
continue;
|
||||
}
|
||||
|
||||
applied |= self
|
||||
.apply_coarse_one_column(
|
||||
file_id,
|
||||
file_size_hint,
|
||||
*column_id,
|
||||
&request.terms,
|
||||
&mut output,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if !applied {
|
||||
timer.stop_and_discard();
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Self::adjust_coarse_output(input, &mut output);
|
||||
Ok(Some(output))
|
||||
}
|
||||
|
||||
async fn apply_coarse_one_column(
|
||||
&self,
|
||||
file_id: FileId,
|
||||
file_size_hint: Option<u64>,
|
||||
column_id: ColumnId,
|
||||
terms: &[FulltextTerm],
|
||||
output: &mut [(usize, Vec<Range<usize>>)],
|
||||
) -> Result<bool> {
|
||||
let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}");
|
||||
let Some(reader) = self
|
||||
.index_source
|
||||
.blob(file_id, &blob_key, file_size_hint)
|
||||
.await?
|
||||
else {
|
||||
return Ok(false);
|
||||
};
|
||||
let config =
|
||||
Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?;
|
||||
|
||||
let predicates = Self::terms_to_predicates(terms, &config);
|
||||
if predicates.is_empty() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?;
|
||||
let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
|
||||
let blob_size = range_reader
|
||||
.metadata()
|
||||
.await
|
||||
.context(MetadataSnafu)?
|
||||
.content_length;
|
||||
let reader = CachedBloomFilterIndexBlobReader::new(
|
||||
file_id,
|
||||
column_id,
|
||||
Tag::Fulltext,
|
||||
blob_size,
|
||||
BloomFilterReaderImpl::new(range_reader),
|
||||
bloom_filter_cache.clone(),
|
||||
);
|
||||
Box::new(reader) as _
|
||||
} else {
|
||||
Box::new(BloomFilterReaderImpl::new(range_reader)) as _
|
||||
};
|
||||
|
||||
let mut applier = BloomFilterApplier::new(reader)
|
||||
.await
|
||||
.context(ApplyBloomFilterIndexSnafu)?;
|
||||
for (_, row_group_output) in output.iter_mut() {
|
||||
// All rows are filtered out, skip the search
|
||||
if row_group_output.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
*row_group_output = applier
|
||||
.search(&predicates, row_group_output)
|
||||
.await
|
||||
.context(ApplyBloomFilterIndexSnafu)?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Initializes the coarse output. Must call `adjust_coarse_output` after applying bloom filters.
|
||||
///
|
||||
/// `row_groups` is a list of (row group length, whether to search).
|
||||
///
|
||||
/// Returns (`input`, `output`):
|
||||
/// * `input` is a list of (row group index to search, row group range based on start of the file).
|
||||
/// * `output` is a list of (row group index to search, row group ranges based on start of the file).
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn init_coarse_output(
|
||||
row_groups: impl Iterator<Item = (usize, bool)>,
|
||||
) -> (Vec<(usize, Range<usize>)>, Vec<(usize, Vec<Range<usize>>)>) {
|
||||
// Calculates row groups' ranges based on start of the file.
|
||||
let mut input = Vec::with_capacity(row_groups.size_hint().0);
|
||||
let mut start = 0;
|
||||
for (i, (len, to_search)) in row_groups.enumerate() {
|
||||
let end = start + len;
|
||||
if to_search {
|
||||
input.push((i, start..end));
|
||||
}
|
||||
start = end;
|
||||
}
|
||||
|
||||
// Initializes output with input ranges, but ranges are based on start of the file not the row group,
|
||||
// so we need to adjust them later.
|
||||
let output = input
|
||||
.iter()
|
||||
.map(|(i, range)| (*i, vec![range.clone()]))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
(input, output)
|
||||
}
|
||||
|
||||
/// Adjusts the coarse output. Makes the output ranges based on row group start.
|
||||
fn adjust_coarse_output(
|
||||
input: Vec<(usize, Range<usize>)>,
|
||||
output: &mut Vec<(usize, Vec<Range<usize>>)>,
|
||||
) {
|
||||
// adjust ranges to be based on row group
|
||||
for ((_, output), (_, input)) in output.iter_mut().zip(input) {
|
||||
let start = input.start;
|
||||
for range in output.iter_mut() {
|
||||
range.start -= start;
|
||||
range.end -= start;
|
||||
}
|
||||
}
|
||||
output.retain(|(_, ranges)| !ranges.is_empty());
|
||||
}
|
||||
|
||||
/// Converts terms to predicates.
|
||||
///
|
||||
/// Split terms by non-alphanumeric characters and convert them to lowercase if case-insensitive.
|
||||
/// Multiple terms are combined with AND semantics.
|
||||
fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec<InListPredicate> {
|
||||
let mut probes = HashSet::new();
|
||||
for term in terms {
|
||||
if config.case_sensitive && term.col_lowered {
|
||||
// lowercased terms are not indexed
|
||||
continue;
|
||||
}
|
||||
|
||||
let ts = term
|
||||
.term
|
||||
.split(|c: char| !c.is_alphanumeric())
|
||||
.filter(|&t| !t.is_empty())
|
||||
.map(|t| {
|
||||
if !config.case_sensitive {
|
||||
t.to_lowercase()
|
||||
} else {
|
||||
t.to_string()
|
||||
}
|
||||
.into_bytes()
|
||||
});
|
||||
|
||||
probes.extend(ts);
|
||||
}
|
||||
|
||||
probes
|
||||
.into_iter()
|
||||
.map(|p| InListPredicate {
|
||||
list: iter::once(p).collect(),
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
}
|
||||
|
||||
/// The source of the index.
|
||||
struct IndexSource {
|
||||
region_dir: String,
|
||||
|
||||
@@ -23,6 +23,7 @@ use store_api::metadata::RegionMetadata;
|
||||
use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
|
||||
|
||||
use crate::cache::file_cache::FileCacheRef;
|
||||
use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
|
||||
use crate::error::Result;
|
||||
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
@@ -86,6 +87,7 @@ pub struct FulltextIndexApplierBuilder<'a> {
|
||||
metadata: &'a RegionMetadata,
|
||||
file_cache: Option<FileCacheRef>,
|
||||
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
|
||||
bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
|
||||
}
|
||||
|
||||
impl<'a> FulltextIndexApplierBuilder<'a> {
|
||||
@@ -105,6 +107,7 @@ impl<'a> FulltextIndexApplierBuilder<'a> {
|
||||
metadata,
|
||||
file_cache: None,
|
||||
puffin_metadata_cache: None,
|
||||
bloom_filter_cache: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,6 +126,15 @@ impl<'a> FulltextIndexApplierBuilder<'a> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the bloom filter cache to be used by the `FulltextIndexApplier`.
|
||||
pub fn with_bloom_filter_cache(
|
||||
mut self,
|
||||
bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
|
||||
) -> Self {
|
||||
self.bloom_filter_cache = bloom_filter_cache;
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds `SstIndexApplier` from the given expressions.
|
||||
pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
|
||||
let mut requests = HashMap::new();
|
||||
@@ -145,6 +157,7 @@ impl<'a> FulltextIndexApplierBuilder<'a> {
|
||||
)
|
||||
.with_file_cache(self.file_cache)
|
||||
.with_puffin_metadata_cache(self.puffin_metadata_cache)
|
||||
.with_bloom_filter_cache(self.bloom_filter_cache)
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -360,6 +360,7 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use common_base::BitVec;
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextOptions};
|
||||
use datatypes::vectors::{UInt64Vector, UInt8Vector};
|
||||
@@ -390,7 +391,7 @@ mod tests {
|
||||
IntermediateManager::init_fs(path).await.unwrap()
|
||||
}
|
||||
|
||||
fn mock_region_metadata() -> RegionMetadataRef {
|
||||
fn mock_region_metadata(backend: FulltextBackend) -> RegionMetadataRef {
|
||||
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
|
||||
builder
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
@@ -403,7 +404,7 @@ mod tests {
|
||||
enable: true,
|
||||
analyzer: FulltextAnalyzer::English,
|
||||
case_sensitive: true,
|
||||
backend: FulltextBackend::Tantivy,
|
||||
backend: backend.clone(),
|
||||
})
|
||||
.unwrap(),
|
||||
semantic_type: SemanticType::Field,
|
||||
@@ -419,7 +420,7 @@ mod tests {
|
||||
enable: true,
|
||||
analyzer: FulltextAnalyzer::English,
|
||||
case_sensitive: false,
|
||||
backend: FulltextBackend::Tantivy,
|
||||
backend: backend.clone(),
|
||||
})
|
||||
.unwrap(),
|
||||
semantic_type: SemanticType::Field,
|
||||
@@ -435,7 +436,7 @@ mod tests {
|
||||
enable: true,
|
||||
analyzer: FulltextAnalyzer::Chinese,
|
||||
case_sensitive: false,
|
||||
backend: FulltextBackend::Tantivy,
|
||||
backend: backend.clone(),
|
||||
})
|
||||
.unwrap(),
|
||||
semantic_type: SemanticType::Field,
|
||||
@@ -522,6 +523,7 @@ mod tests {
|
||||
/// - `terms`: A list of (ColumnId, [(bool, String)]) for fulltext terms, where bool indicates if term is lowercased
|
||||
async fn build_fulltext_applier_factory(
|
||||
prefix: &str,
|
||||
backend: FulltextBackend,
|
||||
rows: &[(
|
||||
Option<&str>, // text_english_case_sensitive
|
||||
Option<&str>, // text_english_case_insensitive
|
||||
@@ -530,12 +532,13 @@ mod tests {
|
||||
) -> impl Fn(
|
||||
Vec<(ColumnId, &str)>,
|
||||
Vec<(ColumnId, Vec<(bool, &str)>)>,
|
||||
Option<BitVec>,
|
||||
) -> BoxFuture<'static, Option<BTreeSet<RowId>>> {
|
||||
let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
|
||||
let region_dir = "region0".to_string();
|
||||
let sst_file_id = FileId::random();
|
||||
let object_store = mock_object_store();
|
||||
let region_metadata = mock_region_metadata();
|
||||
let region_metadata = mock_region_metadata(backend.clone());
|
||||
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
|
||||
|
||||
let mut indexer = FulltextIndexer::new(
|
||||
@@ -544,7 +547,7 @@ mod tests {
|
||||
&intm_mgr,
|
||||
®ion_metadata,
|
||||
true,
|
||||
8096,
|
||||
1,
|
||||
1024,
|
||||
)
|
||||
.await
|
||||
@@ -562,7 +565,9 @@ mod tests {
|
||||
let _ = indexer.finish(&mut writer).await.unwrap();
|
||||
writer.finish().await.unwrap();
|
||||
|
||||
move |queries: Vec<(ColumnId, &str)>, terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>| {
|
||||
move |queries: Vec<(ColumnId, &str)>,
|
||||
terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>,
|
||||
coarse_mask: Option<BitVec>| {
|
||||
let _d = &d;
|
||||
let region_dir = region_dir.clone();
|
||||
let object_store = object_store.clone();
|
||||
@@ -604,7 +609,29 @@ mod tests {
|
||||
factory,
|
||||
);
|
||||
|
||||
async move { applier.apply(sst_file_id, None).await.unwrap() }.boxed()
|
||||
let backend = backend.clone();
|
||||
async move {
|
||||
match backend {
|
||||
FulltextBackend::Tantivy => {
|
||||
applier.apply_fine(sst_file_id, None).await.unwrap()
|
||||
}
|
||||
FulltextBackend::Bloom => {
|
||||
let coarse_mask = coarse_mask.unwrap_or_default();
|
||||
let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
|
||||
// row group id == row id
|
||||
let resp = applier
|
||||
.apply_coarse(sst_file_id, None, row_groups)
|
||||
.await
|
||||
.unwrap();
|
||||
resp.map(|r| {
|
||||
r.into_iter()
|
||||
.map(|(row_group_id, _)| row_group_id as RowId)
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -613,9 +640,10 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_basic_case_sensitive() {
|
||||
async fn test_fulltext_index_basic_case_sensitive_tantivy() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_basic_case_sensitive_",
|
||||
"test_fulltext_index_basic_case_sensitive_tantivy_",
|
||||
FulltextBackend::Tantivy,
|
||||
&[
|
||||
(Some("hello"), None, None),
|
||||
(Some("world"), None, None),
|
||||
@@ -625,47 +653,159 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
|
||||
let row_ids = applier_factory(vec![(1, "hello")], vec![]).await;
|
||||
let row_ids = applier_factory(vec![(1, "hello")], vec![], None).await;
|
||||
assert_eq!(row_ids, Some(rows([0])));
|
||||
|
||||
let row_ids = applier_factory(vec![(1, "world")], vec![]).await;
|
||||
let row_ids = applier_factory(vec![(1, "world")], vec![], None).await;
|
||||
assert_eq!(row_ids, Some(rows([1])));
|
||||
|
||||
let row_ids = applier_factory(vec![(1, "Hello")], vec![]).await;
|
||||
let row_ids = applier_factory(vec![(1, "Hello")], vec![], None).await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(vec![(1, "World")], vec![]).await;
|
||||
let row_ids = applier_factory(vec![(1, "World")], vec![], None).await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([0])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])], None).await;
|
||||
assert_eq!(row_ids, None);
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([1])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])], None).await;
|
||||
assert_eq!(row_ids, None);
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])], None).await;
|
||||
assert_eq!(row_ids, None);
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])], None).await;
|
||||
assert_eq!(row_ids, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_basic_case_insensitive() {
|
||||
async fn test_fulltext_index_basic_case_sensitive_bloom() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_basic_case_insensitive_",
|
||||
"test_fulltext_index_basic_case_sensitive_bloom_",
|
||||
FulltextBackend::Bloom,
|
||||
&[
|
||||
(Some("hello"), None, None),
|
||||
(Some("world"), None, None),
|
||||
(None, None, None),
|
||||
(Some("Hello, World"), None, None),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "hello")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([0])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "hello")])],
|
||||
Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(true, "hello")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, None);
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([1])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1101])), // row 1 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(true, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, None);
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "Hello")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "Hello")])],
|
||||
Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(true, "Hello")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, None);
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "Hello, World")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "Hello, World")])],
|
||||
Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(true, "Hello, World")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_basic_case_insensitive_tantivy() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_basic_case_insensitive_tantivy_",
|
||||
FulltextBackend::Tantivy,
|
||||
&[
|
||||
(None, Some("hello"), None),
|
||||
(None, None, None),
|
||||
@@ -675,47 +815,191 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
|
||||
let row_ids = applier_factory(vec![(2, "hello")], vec![]).await;
|
||||
let row_ids = applier_factory(vec![(2, "hello")], vec![], None).await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![(2, "world")], vec![]).await;
|
||||
let row_ids = applier_factory(vec![(2, "world")], vec![], None).await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![(2, "Hello")], vec![]).await;
|
||||
let row_ids = applier_factory(vec![(2, "Hello")], vec![], None).await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![(2, "World")], vec![]).await;
|
||||
let row_ids = applier_factory(vec![(2, "World")], vec![], None).await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_basic_chinese() {
|
||||
async fn test_fulltext_index_basic_case_insensitive_bloom() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_basic_chinese_",
|
||||
"test_fulltext_index_basic_case_insensitive_bloom_",
|
||||
FulltextBackend::Bloom,
|
||||
&[
|
||||
(None, Some("hello"), None),
|
||||
(None, None, None),
|
||||
(None, Some("world"), None),
|
||||
(None, Some("Hello, World"), None),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "hello")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "hello")])],
|
||||
Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "hello")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "hello")])],
|
||||
Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "Hello")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "Hello")])],
|
||||
Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([0])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "Hello")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "Hello")])],
|
||||
Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "World")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "World")])],
|
||||
Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([2])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "World")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "World")])],
|
||||
Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_basic_chinese_tantivy() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_basic_chinese_tantivy_",
|
||||
FulltextBackend::Tantivy,
|
||||
&[
|
||||
(None, None, Some("你好")),
|
||||
(None, None, None),
|
||||
@@ -725,23 +1009,71 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
|
||||
let row_ids = applier_factory(vec![(3, "你好")], vec![]).await;
|
||||
let row_ids = applier_factory(vec![(3, "你好")], vec![], None).await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![(3, "世界")], vec![]).await;
|
||||
let row_ids = applier_factory(vec![(3, "世界")], vec![], None).await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])]).await;
|
||||
let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_multi_terms_case_sensitive() {
|
||||
async fn test_fulltext_index_basic_chinese_bloom() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_multi_terms_case_sensitive_",
|
||||
"test_fulltext_index_basic_chinese_bloom_",
|
||||
FulltextBackend::Bloom,
|
||||
&[
|
||||
(None, None, Some("你好")),
|
||||
(None, None, None),
|
||||
(None, None, Some("世界")),
|
||||
(None, None, Some("你好,世界")),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(3, vec![(false, "你好")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(3, vec![(false, "你好")])],
|
||||
Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(3, vec![(false, "世界")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([2, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(3, vec![(false, "世界")])],
|
||||
Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_multi_terms_case_sensitive_tantivy() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_multi_terms_case_sensitive_tantivy_",
|
||||
FulltextBackend::Tantivy,
|
||||
&[
|
||||
(Some("Hello"), None, None),
|
||||
(Some("World"), None, None),
|
||||
@@ -751,31 +1083,107 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
|
||||
let row_ids =
|
||||
applier_factory(vec![], vec![(1, vec![(false, "hello"), (false, "world")])]).await;
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "hello"), (false, "world")])],
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([])));
|
||||
|
||||
let row_ids =
|
||||
applier_factory(vec![], vec![(1, vec![(false, "Hello"), (false, "World")])]).await;
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "Hello"), (false, "World")])],
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids =
|
||||
applier_factory(vec![], vec![(1, vec![(true, "Hello"), (false, "World")])]).await;
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(true, "Hello"), (false, "World")])],
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([1, 3])));
|
||||
|
||||
let row_ids =
|
||||
applier_factory(vec![], vec![(1, vec![(false, "Hello"), (true, "World")])]).await;
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "Hello"), (true, "World")])],
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids =
|
||||
applier_factory(vec![], vec![(1, vec![(true, "Hello"), (true, "World")])]).await;
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(true, "Hello"), (true, "World")])],
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_multi_terms_case_insensitive() {
|
||||
async fn test_fulltext_index_multi_terms_case_sensitive_bloom() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_multi_terms_case_insensitive_",
|
||||
"test_fulltext_index_multi_terms_case_sensitive_bloom_",
|
||||
FulltextBackend::Bloom,
|
||||
&[
|
||||
(Some("Hello"), None, None),
|
||||
(Some("World"), None, None),
|
||||
(None, None, None),
|
||||
(Some("Hello, World"), None, None),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "hello"), (false, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "Hello"), (false, "World")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(true, "Hello"), (false, "World")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([1, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "Hello"), (true, "World")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([0, 3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(true, "Hello"), (true, "World")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_multi_terms_case_insensitive_tantivy() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_multi_terms_case_insensitive_tantivy_",
|
||||
FulltextBackend::Tantivy,
|
||||
&[
|
||||
(None, Some("hello"), None),
|
||||
(None, None, None),
|
||||
@@ -785,27 +1193,91 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
|
||||
let row_ids =
|
||||
applier_factory(vec![], vec![(2, vec![(false, "hello"), (false, "world")])]).await;
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "hello"), (false, "world")])],
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids =
|
||||
applier_factory(vec![], vec![(2, vec![(true, "hello"), (false, "world")])]).await;
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "hello"), (false, "world")])],
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids =
|
||||
applier_factory(vec![], vec![(2, vec![(false, "hello"), (true, "world")])]).await;
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "hello"), (true, "world")])],
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids =
|
||||
applier_factory(vec![], vec![(2, vec![(true, "hello"), (true, "world")])]).await;
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "hello"), (true, "world")])],
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_multi_columns() {
|
||||
async fn test_fulltext_index_multi_terms_case_insensitive_bloom() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_multi_columns_",
|
||||
"test_fulltext_index_multi_terms_case_insensitive_bloom_",
|
||||
FulltextBackend::Bloom,
|
||||
&[
|
||||
(None, Some("hello"), None),
|
||||
(None, None, None),
|
||||
(None, Some("world"), None),
|
||||
(None, Some("Hello, World"), None),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "hello"), (false, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "hello"), (false, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(false, "hello"), (true, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(2, vec![(true, "hello"), (true, "world")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_multi_columns_tantivy() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_multi_columns_tantivy_",
|
||||
FulltextBackend::Tantivy,
|
||||
&[
|
||||
(Some("Hello"), None, Some("你好")),
|
||||
(Some("World"), Some("world"), None),
|
||||
@@ -822,11 +1294,52 @@ mod tests {
|
||||
let row_ids = applier_factory(
|
||||
vec![(1, "Hello"), (3, "你好")],
|
||||
vec![(2, vec![(false, "world")])],
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])]).await;
|
||||
let row_ids =
|
||||
applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])], None).await;
|
||||
assert_eq!(row_ids, Some(rows([1, 3])));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fulltext_index_multi_columns_bloom() {
|
||||
let applier_factory = build_fulltext_applier_factory(
|
||||
"test_fulltext_index_multi_columns_bloom_",
|
||||
FulltextBackend::Bloom,
|
||||
&[
|
||||
(Some("Hello"), None, Some("你好")),
|
||||
(Some("World"), Some("world"), None),
|
||||
(None, Some("World"), Some("世界")),
|
||||
(
|
||||
Some("Hello, World"),
|
||||
Some("Hello, World"),
|
||||
Some("你好,世界"),
|
||||
),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![
|
||||
(1, vec![(false, "Hello")]),
|
||||
(2, vec![(false, "world")]),
|
||||
(3, vec![(false, "你好")]),
|
||||
],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([3])));
|
||||
|
||||
let row_ids = applier_factory(
|
||||
vec![],
|
||||
vec![(1, vec![(false, "World")]), (2, vec![(false, "World")])],
|
||||
Some(BitVec::from_slice(&[0b1111])),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(row_ids, Some(rows([1, 3])));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -369,6 +369,9 @@ impl ParquetReaderBuilder {
|
||||
self.prune_row_groups_by_bloom_filter(parquet_meta, &mut output, metrics)
|
||||
.await;
|
||||
|
||||
self.prune_row_groups_by_fulltext_bloom(parquet_meta, &mut output, metrics)
|
||||
.await;
|
||||
|
||||
output
|
||||
}
|
||||
|
||||
@@ -389,7 +392,7 @@ impl ParquetReaderBuilder {
|
||||
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_res = match index_applier
|
||||
.apply(self.file_handle.file_id(), Some(file_size_hint))
|
||||
.apply_fine(self.file_handle.file_id(), Some(file_size_hint))
|
||||
.await
|
||||
{
|
||||
Ok(Some(res)) => res,
|
||||
@@ -631,6 +634,67 @@ impl ParquetReaderBuilder {
|
||||
true
|
||||
}
|
||||
|
||||
async fn prune_row_groups_by_fulltext_bloom(
|
||||
&self,
|
||||
parquet_meta: &ParquetMetaData,
|
||||
output: &mut BTreeMap<usize, Option<RowSelection>>,
|
||||
metrics: &mut ReaderFilterMetrics,
|
||||
) -> bool {
|
||||
let Some(index_applier) = &self.fulltext_index_applier else {
|
||||
return false;
|
||||
};
|
||||
|
||||
if !self.file_handle.meta_ref().fulltext_index_available() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_output = match index_applier
|
||||
.apply_coarse(
|
||||
self.file_handle.file_id(),
|
||||
Some(file_size_hint),
|
||||
parquet_meta
|
||||
.row_groups()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(apply_output)) => apply_output,
|
||||
Ok(None) => return false,
|
||||
Err(err) => {
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!(
|
||||
"Failed to apply fulltext index, region_id: {}, file_id: {}, err: {:?}",
|
||||
self.file_handle.region_id(),
|
||||
self.file_handle.file_id(),
|
||||
err
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
err; "Failed to apply fulltext index, region_id: {}, file_id: {}",
|
||||
self.file_handle.region_id(), self.file_handle.file_id()
|
||||
);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
Self::prune_row_groups_by_ranges(
|
||||
parquet_meta,
|
||||
apply_output
|
||||
.into_iter()
|
||||
.map(|(rg, ranges)| (rg, ranges.into_iter())),
|
||||
output,
|
||||
&mut metrics.rg_fulltext_filtered,
|
||||
&mut metrics.rows_fulltext_filtered,
|
||||
);
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
/// Prunes row groups by rows. The `rows_in_row_groups` is like a map from row group to
|
||||
/// a list of row ids to keep.
|
||||
fn prune_row_groups_by_rows(
|
||||
|
||||
Reference in New Issue
Block a user