diff --git a/Cargo.lock b/Cargo.lock index 1b108a7546..2ab3200029 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5591,6 +5591,7 @@ dependencies = [ "greptime-proto", "itertools 0.14.0", "jieba-rs", + "lazy_static", "mockall", "pin-project", "prost 0.13.5", diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index dc6e394ef4..c4b7057895 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -23,6 +23,7 @@ futures.workspace = true greptime-proto.workspace = true itertools.workspace = true jieba-rs = "0.7" +lazy_static.workspace = true mockall.workspace = true pin-project.workspace = true prost.workspace = true diff --git a/src/index/src/fulltext_index/tokenizer.rs b/src/index/src/fulltext_index/tokenizer.rs index 721ffdd3b9..b00e7fda9c 100644 --- a/src/index/src/fulltext_index/tokenizer.rs +++ b/src/index/src/fulltext_index/tokenizer.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use jieba_rs::Jieba; - use crate::fulltext_index::error::Result; use crate::Bytes; +lazy_static::lazy_static! { + static ref JIEBA: jieba_rs::Jieba = jieba_rs::Jieba::new(); +} + /// `Tokenizer` tokenizes a text into a list of tokens. pub trait Tokenizer: Send { fn tokenize<'a>(&self, text: &'a str) -> Vec<&'a str>; @@ -44,8 +46,7 @@ pub struct ChineseTokenizer; impl Tokenizer for ChineseTokenizer { fn tokenize<'a>(&self, text: &'a str) -> Vec<&'a str> { - let jieba = Jieba::new(); - jieba.cut(text, false) + JIEBA.cut(text, false) } } diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index 097acd6367..61df853573 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -29,8 +29,15 @@ use crate::sst::file::FileId; const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index"; +/// Tag for bloom filter index cache. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Tag { + Skipping, + Fulltext, +} + /// Cache for bloom filter index. -pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId), BloomFilterMeta>; +pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>; pub type BloomFilterIndexCacheRef = Arc; impl BloomFilterIndexCache { @@ -48,14 +55,20 @@ impl BloomFilterIndexCache { } /// Calculates weight for bloom filter index metadata. -fn bloom_filter_index_metadata_weight(k: &(FileId, ColumnId), _: &Arc) -> u32 { +fn bloom_filter_index_metadata_weight( + k: &(FileId, ColumnId, Tag), + _: &Arc, +) -> u32 { (k.0.as_bytes().len() + std::mem::size_of::() + std::mem::size_of::()) as u32 } /// Calculates weight for bloom filter index content. -fn bloom_filter_index_content_weight((k, _): &((FileId, ColumnId), PageKey), v: &Bytes) -> u32 { +fn bloom_filter_index_content_weight( + (k, _): &((FileId, ColumnId, Tag), PageKey), + v: &Bytes, +) -> u32 { (k.0.as_bytes().len() + std::mem::size_of::() + v.len()) as u32 } @@ -63,6 +76,7 @@ fn bloom_filter_index_content_weight((k, _): &((FileId, ColumnId), PageKey), v: pub struct CachedBloomFilterIndexBlobReader { file_id: FileId, column_id: ColumnId, + tag: Tag, blob_size: u64, inner: R, cache: BloomFilterIndexCacheRef, @@ -73,6 +87,7 @@ impl CachedBloomFilterIndexBlobReader { pub fn new( file_id: FileId, column_id: ColumnId, + tag: Tag, blob_size: u64, inner: R, cache: BloomFilterIndexCacheRef, @@ -80,6 +95,7 @@ impl CachedBloomFilterIndexBlobReader { Self { file_id, column_id, + tag, blob_size, inner, cache, @@ -93,7 +109,7 @@ impl BloomFilterReader for CachedBloomFilterIndexBl let inner = &self.inner; self.cache .get_or_load( - (self.file_id, self.column_id), + (self.file_id, self.column_id, self.tag), self.blob_size, offset, size, @@ -107,7 +123,7 @@ impl BloomFilterReader for CachedBloomFilterIndexBl let fetch = ranges.iter().map(|range| { let inner = &self.inner; self.cache.get_or_load( - (self.file_id, self.column_id), + (self.file_id, self.column_id, self.tag), self.blob_size, range.start, (range.end - range.start) as u32, @@ -123,13 +139,18 @@ impl BloomFilterReader for CachedBloomFilterIndexBl /// Reads the meta information of the bloom filter. async fn metadata(&self) -> Result { - if let Some(cached) = self.cache.get_metadata((self.file_id, self.column_id)) { + if let Some(cached) = self + .cache + .get_metadata((self.file_id, self.column_id, self.tag)) + { CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok((*cached).clone()) } else { let meta = self.inner.metadata().await?; - self.cache - .put_metadata((self.file_id, self.column_id), Arc::new(meta.clone())); + self.cache.put_metadata( + (self.file_id, self.column_id, self.tag), + Arc::new(meta.clone()), + ); CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok(meta) } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 855455453f..8b9efb9ae8 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -502,7 +502,7 @@ impl ScanRegion { let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache()); let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned(); - + let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned(); FulltextIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), self.version.metadata.region_id, @@ -512,6 +512,7 @@ impl ScanRegion { ) .with_file_cache(file_cache) .with_puffin_metadata_cache(puffin_metadata_cache) + .with_bloom_filter_cache(bloom_filter_index_cache) .build(&self.request.filters) .inspect_err(|err| warn!(err; "Failed to build fulltext index applier")) .ok() diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 4a211f7117..36899796b0 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -505,6 +505,7 @@ pub(crate) fn scan_file_ranges( // Reports metrics. reader_metrics.observe_rows(read_type); + reader_metrics.filter_metrics.observe(); part_metrics.merge_reader_metrics(&reader_metrics); } } diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index afd5cc16cd..fac5db5405 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -31,7 +31,7 @@ use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider}; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::index::bloom_filter_index::{ - BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, + BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag, }; use crate::error::{ ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, @@ -165,6 +165,7 @@ impl BloomFilterIndexApplier { let reader = CachedBloomFilterIndexBlobReader::new( file_id, *column_id, + Tag::Skipping, blob_size, BloomFilterReaderImpl::new(blob), bloom_filter_cache.clone(), @@ -308,13 +309,13 @@ impl BloomFilterIndexApplier { ) -> std::result::Result<(), index::bloom_filter::error::Error> { let mut applier = BloomFilterApplier::new(Box::new(reader)).await?; - for (_, output) in output.iter_mut() { + for (_, row_group_output) in output.iter_mut() { // All rows are filtered out, skip the search - if output.is_empty() { + if row_group_output.is_empty() { continue; } - *output = applier.search(predicates, output).await?; + *row_group_output = applier.search(predicates, row_group_output).await?; } Ok(()) diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 94ceda6891..063227a89f 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -12,10 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, HashMap, HashSet}; +use std::iter; +use std::ops::Range; use std::sync::Arc; +use common_base::range_read::RangeReader; use common_telemetry::warn; +use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate}; +use index::bloom_filter::reader::BloomFilterReaderImpl; use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher}; use index::fulltext_index::Config; use object_store::ObjectStore; @@ -26,11 +31,17 @@ use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider}; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; -use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result}; +use crate::cache::index::bloom_filter_index::{ + BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag, +}; +use crate::error::{ + ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, + PuffinReadBlobSnafu, Result, +}; use crate::metrics::INDEX_APPLY_ELAPSED; use crate::sst::file::FileId; -use crate::sst::index::fulltext_index::applier::builder::FulltextRequest; -use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY; +use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm}; +use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY}; use crate::sst::index::puffin_manager::{ PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader, }; @@ -45,6 +56,9 @@ pub struct FulltextIndexApplier { /// The source of the index. index_source: IndexSource, + + /// Cache for bloom filter index. + bloom_filter_index_cache: Option, } pub type FulltextIndexApplierRef = Arc; @@ -63,6 +77,7 @@ impl FulltextIndexApplier { Self { requests, index_source, + bloom_filter_index_cache: None, } } @@ -82,13 +97,25 @@ impl FulltextIndexApplier { self } - /// Applies the queries to the fulltext index of the specified SST file. - pub async fn apply( + /// Sets the bloom filter cache. + pub fn with_bloom_filter_cache( + mut self, + bloom_filter_index_cache: Option, + ) -> Self { + self.bloom_filter_index_cache = bloom_filter_index_cache; + self + } +} + +impl FulltextIndexApplier { + /// Applies fine-grained fulltext index to the specified SST file. + /// Returns the row ids that match the queries. + pub async fn apply_fine( &self, file_id: FileId, file_size_hint: Option, ) -> Result>> { - let _timer = INDEX_APPLY_ELAPSED + let timer = INDEX_APPLY_ELAPSED .with_label_values(&[TYPE_FULLTEXT_INDEX]) .start_timer(); @@ -99,7 +126,7 @@ impl FulltextIndexApplier { } let Some(result) = self - .apply_one_column(file_size_hint, file_id, *column_id, request) + .apply_fine_one_column(file_size_hint, file_id, *column_id, request) .await? else { continue; @@ -118,10 +145,13 @@ impl FulltextIndexApplier { } } + if row_ids.is_none() { + timer.stop_and_discard(); + } Ok(row_ids) } - async fn apply_one_column( + async fn apply_fine_one_column( &self, file_size_hint: Option, file_id: FileId, @@ -187,6 +217,195 @@ impl FulltextIndexApplier { } } +impl FulltextIndexApplier { + /// Applies coarse-grained fulltext index to the specified SST file. + /// Returns (row group id -> ranges) that match the queries. + pub async fn apply_coarse( + &self, + file_id: FileId, + file_size_hint: Option, + row_groups: impl Iterator, + ) -> Result>)>>> { + let timer = INDEX_APPLY_ELAPSED + .with_label_values(&[TYPE_FULLTEXT_INDEX]) + .start_timer(); + + let (input, mut output) = Self::init_coarse_output(row_groups); + let mut applied = false; + + for (column_id, request) in &self.requests { + if request.terms.is_empty() { + // only apply terms + continue; + } + + applied |= self + .apply_coarse_one_column( + file_id, + file_size_hint, + *column_id, + &request.terms, + &mut output, + ) + .await?; + } + + if !applied { + timer.stop_and_discard(); + return Ok(None); + } + + Self::adjust_coarse_output(input, &mut output); + Ok(Some(output)) + } + + async fn apply_coarse_one_column( + &self, + file_id: FileId, + file_size_hint: Option, + column_id: ColumnId, + terms: &[FulltextTerm], + output: &mut [(usize, Vec>)], + ) -> Result { + let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}"); + let Some(reader) = self + .index_source + .blob(file_id, &blob_key, file_size_hint) + .await? + else { + return Ok(false); + }; + let config = + Config::from_blob_metadata(reader.metadata()).context(ApplyFulltextIndexSnafu)?; + + let predicates = Self::terms_to_predicates(terms, &config); + if predicates.is_empty() { + return Ok(false); + } + + let range_reader = reader.reader().await.context(PuffinBuildReaderSnafu)?; + let reader = if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache { + let blob_size = range_reader + .metadata() + .await + .context(MetadataSnafu)? + .content_length; + let reader = CachedBloomFilterIndexBlobReader::new( + file_id, + column_id, + Tag::Fulltext, + blob_size, + BloomFilterReaderImpl::new(range_reader), + bloom_filter_cache.clone(), + ); + Box::new(reader) as _ + } else { + Box::new(BloomFilterReaderImpl::new(range_reader)) as _ + }; + + let mut applier = BloomFilterApplier::new(reader) + .await + .context(ApplyBloomFilterIndexSnafu)?; + for (_, row_group_output) in output.iter_mut() { + // All rows are filtered out, skip the search + if row_group_output.is_empty() { + continue; + } + + *row_group_output = applier + .search(&predicates, row_group_output) + .await + .context(ApplyBloomFilterIndexSnafu)?; + } + + Ok(true) + } + + /// Initializes the coarse output. Must call `adjust_coarse_output` after applying bloom filters. + /// + /// `row_groups` is a list of (row group length, whether to search). + /// + /// Returns (`input`, `output`): + /// * `input` is a list of (row group index to search, row group range based on start of the file). + /// * `output` is a list of (row group index to search, row group ranges based on start of the file). + #[allow(clippy::type_complexity)] + fn init_coarse_output( + row_groups: impl Iterator, + ) -> (Vec<(usize, Range)>, Vec<(usize, Vec>)>) { + // Calculates row groups' ranges based on start of the file. + let mut input = Vec::with_capacity(row_groups.size_hint().0); + let mut start = 0; + for (i, (len, to_search)) in row_groups.enumerate() { + let end = start + len; + if to_search { + input.push((i, start..end)); + } + start = end; + } + + // Initializes output with input ranges, but ranges are based on start of the file not the row group, + // so we need to adjust them later. + let output = input + .iter() + .map(|(i, range)| (*i, vec![range.clone()])) + .collect::>(); + + (input, output) + } + + /// Adjusts the coarse output. Makes the output ranges based on row group start. + fn adjust_coarse_output( + input: Vec<(usize, Range)>, + output: &mut Vec<(usize, Vec>)>, + ) { + // adjust ranges to be based on row group + for ((_, output), (_, input)) in output.iter_mut().zip(input) { + let start = input.start; + for range in output.iter_mut() { + range.start -= start; + range.end -= start; + } + } + output.retain(|(_, ranges)| !ranges.is_empty()); + } + + /// Converts terms to predicates. + /// + /// Split terms by non-alphanumeric characters and convert them to lowercase if case-insensitive. + /// Multiple terms are combined with AND semantics. + fn terms_to_predicates(terms: &[FulltextTerm], config: &Config) -> Vec { + let mut probes = HashSet::new(); + for term in terms { + if config.case_sensitive && term.col_lowered { + // lowercased terms are not indexed + continue; + } + + let ts = term + .term + .split(|c: char| !c.is_alphanumeric()) + .filter(|&t| !t.is_empty()) + .map(|t| { + if !config.case_sensitive { + t.to_lowercase() + } else { + t.to_string() + } + .into_bytes() + }); + + probes.extend(ts); + } + + probes + .into_iter() + .map(|p| InListPredicate { + list: iter::once(p).collect(), + }) + .collect::>() + } +} + /// The source of the index. struct IndexSource { region_dir: String, diff --git a/src/mito2/src/sst/index/fulltext_index/applier/builder.rs b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs index 14f5936a01..3297275f26 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier/builder.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs @@ -23,6 +23,7 @@ use store_api::metadata::RegionMetadata; use store_api::storage::{ColumnId, ConcreteDataType, RegionId}; use crate::cache::file_cache::FileCacheRef; +use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef; use crate::error::Result; use crate::sst::index::fulltext_index::applier::FulltextIndexApplier; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -86,6 +87,7 @@ pub struct FulltextIndexApplierBuilder<'a> { metadata: &'a RegionMetadata, file_cache: Option, puffin_metadata_cache: Option, + bloom_filter_cache: Option, } impl<'a> FulltextIndexApplierBuilder<'a> { @@ -105,6 +107,7 @@ impl<'a> FulltextIndexApplierBuilder<'a> { metadata, file_cache: None, puffin_metadata_cache: None, + bloom_filter_cache: None, } } @@ -123,6 +126,15 @@ impl<'a> FulltextIndexApplierBuilder<'a> { self } + /// Sets the bloom filter cache to be used by the `FulltextIndexApplier`. + pub fn with_bloom_filter_cache( + mut self, + bloom_filter_cache: Option, + ) -> Self { + self.bloom_filter_cache = bloom_filter_cache; + self + } + /// Builds `SstIndexApplier` from the given expressions. pub fn build(self, exprs: &[Expr]) -> Result> { let mut requests = HashMap::new(); @@ -145,6 +157,7 @@ impl<'a> FulltextIndexApplierBuilder<'a> { ) .with_file_cache(self.file_cache) .with_puffin_metadata_cache(self.puffin_metadata_cache) + .with_bloom_filter_cache(self.bloom_filter_cache) })) } diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index 12b83e39d0..1d884ac3a5 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -360,6 +360,7 @@ mod tests { use std::sync::Arc; use api::v1::SemanticType; + use common_base::BitVec; use datatypes::data_type::DataType; use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextOptions}; use datatypes::vectors::{UInt64Vector, UInt8Vector}; @@ -390,7 +391,7 @@ mod tests { IntermediateManager::init_fs(path).await.unwrap() } - fn mock_region_metadata() -> RegionMetadataRef { + fn mock_region_metadata(backend: FulltextBackend) -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); builder .push_column_metadata(ColumnMetadata { @@ -403,7 +404,7 @@ mod tests { enable: true, analyzer: FulltextAnalyzer::English, case_sensitive: true, - backend: FulltextBackend::Tantivy, + backend: backend.clone(), }) .unwrap(), semantic_type: SemanticType::Field, @@ -419,7 +420,7 @@ mod tests { enable: true, analyzer: FulltextAnalyzer::English, case_sensitive: false, - backend: FulltextBackend::Tantivy, + backend: backend.clone(), }) .unwrap(), semantic_type: SemanticType::Field, @@ -435,7 +436,7 @@ mod tests { enable: true, analyzer: FulltextAnalyzer::Chinese, case_sensitive: false, - backend: FulltextBackend::Tantivy, + backend: backend.clone(), }) .unwrap(), semantic_type: SemanticType::Field, @@ -522,6 +523,7 @@ mod tests { /// - `terms`: A list of (ColumnId, [(bool, String)]) for fulltext terms, where bool indicates if term is lowercased async fn build_fulltext_applier_factory( prefix: &str, + backend: FulltextBackend, rows: &[( Option<&str>, // text_english_case_sensitive Option<&str>, // text_english_case_insensitive @@ -530,12 +532,13 @@ mod tests { ) -> impl Fn( Vec<(ColumnId, &str)>, Vec<(ColumnId, Vec<(bool, &str)>)>, + Option, ) -> BoxFuture<'static, Option>> { let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await; let region_dir = "region0".to_string(); let sst_file_id = FileId::random(); let object_store = mock_object_store(); - let region_metadata = mock_region_metadata(); + let region_metadata = mock_region_metadata(backend.clone()); let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await; let mut indexer = FulltextIndexer::new( @@ -544,7 +547,7 @@ mod tests { &intm_mgr, ®ion_metadata, true, - 8096, + 1, 1024, ) .await @@ -562,7 +565,9 @@ mod tests { let _ = indexer.finish(&mut writer).await.unwrap(); writer.finish().await.unwrap(); - move |queries: Vec<(ColumnId, &str)>, terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>| { + move |queries: Vec<(ColumnId, &str)>, + terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>, + coarse_mask: Option| { let _d = &d; let region_dir = region_dir.clone(); let object_store = object_store.clone(); @@ -604,7 +609,29 @@ mod tests { factory, ); - async move { applier.apply(sst_file_id, None).await.unwrap() }.boxed() + let backend = backend.clone(); + async move { + match backend { + FulltextBackend::Tantivy => { + applier.apply_fine(sst_file_id, None).await.unwrap() + } + FulltextBackend::Bloom => { + let coarse_mask = coarse_mask.unwrap_or_default(); + let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i])); + // row group id == row id + let resp = applier + .apply_coarse(sst_file_id, None, row_groups) + .await + .unwrap(); + resp.map(|r| { + r.into_iter() + .map(|(row_group_id, _)| row_group_id as RowId) + .collect() + }) + } + } + } + .boxed() } } @@ -613,9 +640,10 @@ mod tests { } #[tokio::test] - async fn test_fulltext_index_basic_case_sensitive() { + async fn test_fulltext_index_basic_case_sensitive_tantivy() { let applier_factory = build_fulltext_applier_factory( - "test_fulltext_index_basic_case_sensitive_", + "test_fulltext_index_basic_case_sensitive_tantivy_", + FulltextBackend::Tantivy, &[ (Some("hello"), None, None), (Some("world"), None, None), @@ -625,47 +653,159 @@ mod tests { ) .await; - let row_ids = applier_factory(vec![(1, "hello")], vec![]).await; + let row_ids = applier_factory(vec![(1, "hello")], vec![], None).await; assert_eq!(row_ids, Some(rows([0]))); - let row_ids = applier_factory(vec![(1, "world")], vec![]).await; + let row_ids = applier_factory(vec![(1, "world")], vec![], None).await; assert_eq!(row_ids, Some(rows([1]))); - let row_ids = applier_factory(vec![(1, "Hello")], vec![]).await; + let row_ids = applier_factory(vec![(1, "Hello")], vec![], None).await; assert_eq!(row_ids, Some(rows([3]))); - let row_ids = applier_factory(vec![(1, "World")], vec![]).await; + let row_ids = applier_factory(vec![(1, "World")], vec![], None).await; assert_eq!(row_ids, Some(rows([3]))); - let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])]).await; + let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])], None).await; assert_eq!(row_ids, Some(rows([0]))); - let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])]).await; + let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])], None).await; assert_eq!(row_ids, None); - let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])]).await; + let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])], None).await; assert_eq!(row_ids, Some(rows([1]))); - let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])]).await; + let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])], None).await; assert_eq!(row_ids, None); - let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])]).await; + let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])], None).await; assert_eq!(row_ids, Some(rows([3]))); - let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])]).await; + let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])], None).await; assert_eq!(row_ids, None); - let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])]).await; + let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])], None).await; assert_eq!(row_ids, Some(rows([3]))); - let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])]).await; + let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])], None).await; assert_eq!(row_ids, None); } #[tokio::test] - async fn test_fulltext_index_basic_case_insensitive() { + async fn test_fulltext_index_basic_case_sensitive_bloom() { let applier_factory = build_fulltext_applier_factory( - "test_fulltext_index_basic_case_insensitive_", + "test_fulltext_index_basic_case_sensitive_bloom_", + FulltextBackend::Bloom, + &[ + (Some("hello"), None, None), + (Some("world"), None, None), + (None, None, None), + (Some("Hello, World"), None, None), + ], + ) + .await; + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "hello")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([0]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "hello")])], + Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(true, "hello")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, None); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "world")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([1]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "world")])], + Some(BitVec::from_slice(&[0b1101])), // row 1 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(true, "world")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, None); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "Hello")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "Hello")])], + Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(true, "Hello")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, None); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "Hello, World")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "Hello, World")])], + Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(true, "Hello, World")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, None); + } + + #[tokio::test] + async fn test_fulltext_index_basic_case_insensitive_tantivy() { + let applier_factory = build_fulltext_applier_factory( + "test_fulltext_index_basic_case_insensitive_tantivy_", + FulltextBackend::Tantivy, &[ (None, Some("hello"), None), (None, None, None), @@ -675,47 +815,191 @@ mod tests { ) .await; - let row_ids = applier_factory(vec![(2, "hello")], vec![]).await; + let row_ids = applier_factory(vec![(2, "hello")], vec![], None).await; assert_eq!(row_ids, Some(rows([0, 3]))); - let row_ids = applier_factory(vec![(2, "world")], vec![]).await; + let row_ids = applier_factory(vec![(2, "world")], vec![], None).await; assert_eq!(row_ids, Some(rows([2, 3]))); - let row_ids = applier_factory(vec![(2, "Hello")], vec![]).await; + let row_ids = applier_factory(vec![(2, "Hello")], vec![], None).await; assert_eq!(row_ids, Some(rows([0, 3]))); - let row_ids = applier_factory(vec![(2, "World")], vec![]).await; + let row_ids = applier_factory(vec![(2, "World")], vec![], None).await; assert_eq!(row_ids, Some(rows([2, 3]))); - let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])]).await; + let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])], None).await; assert_eq!(row_ids, Some(rows([0, 3]))); - let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])]).await; + let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])], None).await; assert_eq!(row_ids, Some(rows([0, 3]))); - let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])]).await; + let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])], None).await; assert_eq!(row_ids, Some(rows([2, 3]))); - let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])]).await; + let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])], None).await; assert_eq!(row_ids, Some(rows([2, 3]))); - let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])]).await; + let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])], None).await; assert_eq!(row_ids, Some(rows([0, 3]))); - let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])]).await; + let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])], None).await; assert_eq!(row_ids, Some(rows([0, 3]))); - let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])]).await; + let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])], None).await; assert_eq!(row_ids, Some(rows([2, 3]))); - let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])]).await; + let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])], None).await; assert_eq!(row_ids, Some(rows([2, 3]))); } #[tokio::test] - async fn test_fulltext_index_basic_chinese() { + async fn test_fulltext_index_basic_case_insensitive_bloom() { let applier_factory = build_fulltext_applier_factory( - "test_fulltext_index_basic_chinese_", + "test_fulltext_index_basic_case_insensitive_bloom_", + FulltextBackend::Bloom, + &[ + (None, Some("hello"), None), + (None, None, None), + (None, Some("world"), None), + (None, Some("Hello, World"), None), + ], + ) + .await; + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "hello")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([0, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "hello")])], + Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "hello")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([0, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "hello")])], + Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "world")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([2, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "world")])], + Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "world")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([2, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "world")])], + Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "Hello")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([0, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "Hello")])], + Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([0]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "Hello")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([0, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "Hello")])], + Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "World")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([2, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "World")])], + Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([2]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "World")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([2, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "World")])], + Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + } + + #[tokio::test] + async fn test_fulltext_index_basic_chinese_tantivy() { + let applier_factory = build_fulltext_applier_factory( + "test_fulltext_index_basic_chinese_tantivy_", + FulltextBackend::Tantivy, &[ (None, None, Some("你好")), (None, None, None), @@ -725,23 +1009,71 @@ mod tests { ) .await; - let row_ids = applier_factory(vec![(3, "你好")], vec![]).await; + let row_ids = applier_factory(vec![(3, "你好")], vec![], None).await; assert_eq!(row_ids, Some(rows([0, 3]))); - let row_ids = applier_factory(vec![(3, "世界")], vec![]).await; + let row_ids = applier_factory(vec![(3, "世界")], vec![], None).await; assert_eq!(row_ids, Some(rows([2, 3]))); - let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])]).await; + let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])], None).await; assert_eq!(row_ids, Some(rows([0, 3]))); - let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])]).await; + let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])], None).await; assert_eq!(row_ids, Some(rows([2, 3]))); } #[tokio::test] - async fn test_fulltext_index_multi_terms_case_sensitive() { + async fn test_fulltext_index_basic_chinese_bloom() { let applier_factory = build_fulltext_applier_factory( - "test_fulltext_index_multi_terms_case_sensitive_", + "test_fulltext_index_basic_chinese_bloom_", + FulltextBackend::Bloom, + &[ + (None, None, Some("你好")), + (None, None, None), + (None, None, Some("世界")), + (None, None, Some("你好,世界")), + ], + ) + .await; + + let row_ids = applier_factory( + vec![], + vec![(3, vec![(false, "你好")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([0, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(3, vec![(false, "你好")])], + Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(3, vec![(false, "世界")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([2, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(3, vec![(false, "世界")])], + Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + } + + #[tokio::test] + async fn test_fulltext_index_multi_terms_case_sensitive_tantivy() { + let applier_factory = build_fulltext_applier_factory( + "test_fulltext_index_multi_terms_case_sensitive_tantivy_", + FulltextBackend::Tantivy, &[ (Some("Hello"), None, None), (Some("World"), None, None), @@ -751,31 +1083,107 @@ mod tests { ) .await; - let row_ids = - applier_factory(vec![], vec![(1, vec![(false, "hello"), (false, "world")])]).await; + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "hello"), (false, "world")])], + None, + ) + .await; assert_eq!(row_ids, Some(rows([]))); - let row_ids = - applier_factory(vec![], vec![(1, vec![(false, "Hello"), (false, "World")])]).await; + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "Hello"), (false, "World")])], + None, + ) + .await; assert_eq!(row_ids, Some(rows([3]))); - let row_ids = - applier_factory(vec![], vec![(1, vec![(true, "Hello"), (false, "World")])]).await; + let row_ids = applier_factory( + vec![], + vec![(1, vec![(true, "Hello"), (false, "World")])], + None, + ) + .await; assert_eq!(row_ids, Some(rows([1, 3]))); - let row_ids = - applier_factory(vec![], vec![(1, vec![(false, "Hello"), (true, "World")])]).await; + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "Hello"), (true, "World")])], + None, + ) + .await; assert_eq!(row_ids, Some(rows([0, 3]))); - let row_ids = - applier_factory(vec![], vec![(1, vec![(true, "Hello"), (true, "World")])]).await; + let row_ids = applier_factory( + vec![], + vec![(1, vec![(true, "Hello"), (true, "World")])], + None, + ) + .await; assert_eq!(row_ids, None); } #[tokio::test] - async fn test_fulltext_index_multi_terms_case_insensitive() { + async fn test_fulltext_index_multi_terms_case_sensitive_bloom() { let applier_factory = build_fulltext_applier_factory( - "test_fulltext_index_multi_terms_case_insensitive_", + "test_fulltext_index_multi_terms_case_sensitive_bloom_", + FulltextBackend::Bloom, + &[ + (Some("Hello"), None, None), + (Some("World"), None, None), + (None, None, None), + (Some("Hello, World"), None, None), + ], + ) + .await; + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "hello"), (false, "world")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "Hello"), (false, "World")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(true, "Hello"), (false, "World")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([1, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "Hello"), (true, "World")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([0, 3]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(true, "Hello"), (true, "World")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, None); + } + + #[tokio::test] + async fn test_fulltext_index_multi_terms_case_insensitive_tantivy() { + let applier_factory = build_fulltext_applier_factory( + "test_fulltext_index_multi_terms_case_insensitive_tantivy_", + FulltextBackend::Tantivy, &[ (None, Some("hello"), None), (None, None, None), @@ -785,27 +1193,91 @@ mod tests { ) .await; - let row_ids = - applier_factory(vec![], vec![(2, vec![(false, "hello"), (false, "world")])]).await; + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "hello"), (false, "world")])], + None, + ) + .await; assert_eq!(row_ids, Some(rows([3]))); - let row_ids = - applier_factory(vec![], vec![(2, vec![(true, "hello"), (false, "world")])]).await; + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "hello"), (false, "world")])], + None, + ) + .await; assert_eq!(row_ids, Some(rows([3]))); - let row_ids = - applier_factory(vec![], vec![(2, vec![(false, "hello"), (true, "world")])]).await; + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "hello"), (true, "world")])], + None, + ) + .await; assert_eq!(row_ids, Some(rows([3]))); - let row_ids = - applier_factory(vec![], vec![(2, vec![(true, "hello"), (true, "world")])]).await; + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "hello"), (true, "world")])], + None, + ) + .await; assert_eq!(row_ids, Some(rows([3]))); } #[tokio::test] - async fn test_fulltext_index_multi_columns() { + async fn test_fulltext_index_multi_terms_case_insensitive_bloom() { let applier_factory = build_fulltext_applier_factory( - "test_fulltext_index_multi_columns_", + "test_fulltext_index_multi_terms_case_insensitive_bloom_", + FulltextBackend::Bloom, + &[ + (None, Some("hello"), None), + (None, None, None), + (None, Some("world"), None), + (None, Some("Hello, World"), None), + ], + ) + .await; + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "hello"), (false, "world")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "hello"), (false, "world")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(false, "hello"), (true, "world")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(2, vec![(true, "hello"), (true, "world")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + } + + #[tokio::test] + async fn test_fulltext_index_multi_columns_tantivy() { + let applier_factory = build_fulltext_applier_factory( + "test_fulltext_index_multi_columns_tantivy_", + FulltextBackend::Tantivy, &[ (Some("Hello"), None, Some("你好")), (Some("World"), Some("world"), None), @@ -822,11 +1294,52 @@ mod tests { let row_ids = applier_factory( vec![(1, "Hello"), (3, "你好")], vec![(2, vec![(false, "world")])], + None, ) .await; assert_eq!(row_ids, Some(rows([3]))); - let row_ids = applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])]).await; + let row_ids = + applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])], None).await; + assert_eq!(row_ids, Some(rows([1, 3]))); + } + + #[tokio::test] + async fn test_fulltext_index_multi_columns_bloom() { + let applier_factory = build_fulltext_applier_factory( + "test_fulltext_index_multi_columns_bloom_", + FulltextBackend::Bloom, + &[ + (Some("Hello"), None, Some("你好")), + (Some("World"), Some("world"), None), + (None, Some("World"), Some("世界")), + ( + Some("Hello, World"), + Some("Hello, World"), + Some("你好,世界"), + ), + ], + ) + .await; + + let row_ids = applier_factory( + vec![], + vec![ + (1, vec![(false, "Hello")]), + (2, vec![(false, "world")]), + (3, vec![(false, "你好")]), + ], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = applier_factory( + vec![], + vec![(1, vec![(false, "World")]), (2, vec![(false, "World")])], + Some(BitVec::from_slice(&[0b1111])), + ) + .await; assert_eq!(row_ids, Some(rows([1, 3]))); } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 069e10344c..5c2ab17591 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -369,6 +369,9 @@ impl ParquetReaderBuilder { self.prune_row_groups_by_bloom_filter(parquet_meta, &mut output, metrics) .await; + self.prune_row_groups_by_fulltext_bloom(parquet_meta, &mut output, metrics) + .await; + output } @@ -389,7 +392,7 @@ impl ParquetReaderBuilder { let file_size_hint = self.file_handle.meta_ref().index_file_size(); let apply_res = match index_applier - .apply(self.file_handle.file_id(), Some(file_size_hint)) + .apply_fine(self.file_handle.file_id(), Some(file_size_hint)) .await { Ok(Some(res)) => res, @@ -631,6 +634,67 @@ impl ParquetReaderBuilder { true } + async fn prune_row_groups_by_fulltext_bloom( + &self, + parquet_meta: &ParquetMetaData, + output: &mut BTreeMap>, + metrics: &mut ReaderFilterMetrics, + ) -> bool { + let Some(index_applier) = &self.fulltext_index_applier else { + return false; + }; + + if !self.file_handle.meta_ref().fulltext_index_available() { + return false; + } + + let file_size_hint = self.file_handle.meta_ref().index_file_size(); + let apply_output = match index_applier + .apply_coarse( + self.file_handle.file_id(), + Some(file_size_hint), + parquet_meta + .row_groups() + .iter() + .enumerate() + .map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))), + ) + .await + { + Ok(Some(apply_output)) => apply_output, + Ok(None) => return false, + Err(err) => { + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to apply fulltext index, region_id: {}, file_id: {}, err: {:?}", + self.file_handle.region_id(), + self.file_handle.file_id(), + err + ); + } else { + warn!( + err; "Failed to apply fulltext index, region_id: {}, file_id: {}", + self.file_handle.region_id(), self.file_handle.file_id() + ); + } + + return false; + } + }; + + Self::prune_row_groups_by_ranges( + parquet_meta, + apply_output + .into_iter() + .map(|(rg, ranges)| (rg, ranges.into_iter())), + output, + &mut metrics.rg_fulltext_filtered, + &mut metrics.rows_fulltext_filtered, + ); + + true + } + /// Prunes row groups by rows. The `rows_in_row_groups` is like a map from row group to /// a list of row ids to keep. fn prune_row_groups_by_rows(