diff --git a/src/index/src/fulltext_index.rs b/src/index/src/fulltext_index.rs
index 3a7f58c8ab..4cbbbdf477 100644
--- a/src/index/src/fulltext_index.rs
+++ b/src/index/src/fulltext_index.rs
@@ -12,18 +12,25 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use puffin::blob_metadata::BlobMetadata;
 use serde::{Deserialize, Serialize};
-
+use snafu::ResultExt;
+use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager};
+use tantivy_jieba::JiebaTokenizer;
 
 pub mod create;
 pub mod error;
 pub mod search;
 pub mod tokenizer;
 
+pub const KEY_FULLTEXT_CONFIG: &str = "fulltext_config";
+
+use crate::fulltext_index::error::{DeserializeFromJsonSnafu, Result};
+
 #[cfg(test)]
 mod tests;
 
 /// Configuration for fulltext index.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
 pub struct Config {
     /// Analyzer to use for tokenization.
     pub analyzer: Analyzer,
@@ -33,10 +40,38 @@ pub struct Config {
 }
 
 /// Analyzer to use for tokenization.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
 pub enum Analyzer {
     #[default]
     English,
 
     Chinese,
 }
+
+impl Config {
+    fn build_tantivy_tokenizer(&self) -> TokenizerManager {
+        let mut builder = match self.analyzer {
+            Analyzer::English => TextAnalyzer::builder(SimpleTokenizer::default()).dynamic(),
+            Analyzer::Chinese => TextAnalyzer::builder(JiebaTokenizer {}).dynamic(),
+        };
+
+        if !self.case_sensitive {
+            builder = builder.filter_dynamic(LowerCaser);
+        }
+
+        let tokenizer = builder.build();
+        let tokenizer_manager = TokenizerManager::new();
+        tokenizer_manager.register("default", tokenizer);
+        tokenizer_manager
+    }
+
+    /// Extracts the fulltext index configuration from the blob metadata.
+    pub fn from_blob_metadata(metadata: &BlobMetadata) -> Result<Self> {
+        if let Some(config) = metadata.properties.get(KEY_FULLTEXT_CONFIG) {
+            let config = serde_json::from_str(config).context(DeserializeFromJsonSnafu)?;
+            return Ok(config);
+        }
+
+        Ok(Self::default())
+    }
+}
diff --git a/src/index/src/fulltext_index/create/bloom_filter.rs b/src/index/src/fulltext_index/create/bloom_filter.rs
index 970f89d65d..127464db71 100644
--- a/src/index/src/fulltext_index/create/bloom_filter.rs
+++ b/src/index/src/fulltext_index/create/bloom_filter.rs
@@ -30,12 +30,10 @@ use crate::fulltext_index::error::{
     SerializeToJsonSnafu,
 };
 use crate::fulltext_index::tokenizer::{Analyzer, ChineseTokenizer, EnglishTokenizer};
-use crate::fulltext_index::Config;
+use crate::fulltext_index::{Config, KEY_FULLTEXT_CONFIG};
 
 const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
 
-pub const KEY_FULLTEXT_CONFIG: &str = "fulltext_config";
-
 /// `BloomFilterFulltextIndexCreator` is for creating a fulltext index using a bloom filter.
 pub struct BloomFilterFulltextIndexCreator {
     inner: Option<BloomFilterCreator>,
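The `Config` changes above are the whole persistence story: the creator (next file) serializes the config to JSON under `KEY_FULLTEXT_CONFIG` in the puffin blob properties, and `Config::from_blob_metadata` parses it back, falling back to the default when the property is absent. A minimal sketch of the stored shape, illustrative only: it round-trips `Config` through `serde_json` without touching any puffin types, assuming the `index` crate is in scope.

```rust
use index::fulltext_index::{Analyzer, Config, KEY_FULLTEXT_CONFIG};

fn main() {
    let config = Config {
        analyzer: Analyzer::Chinese,
        case_sensitive: true,
    };

    // What the creator stores in the blob properties map:
    // "fulltext_config" => {"analyzer":"Chinese","case_sensitive":true}
    let json = serde_json::to_string(&config).unwrap();
    println!("{KEY_FULLTEXT_CONFIG} => {json}");

    // What the searcher recovers before rebuilding the tokenizer.
    let parsed: Config = serde_json::from_str(&json).unwrap();
    assert_eq!(parsed, config);
}
```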
diff --git a/src/index/src/fulltext_index/create/tantivy.rs b/src/index/src/fulltext_index/create/tantivy.rs
index 6b09c1f0fb..274fea596e 100644
--- a/src/index/src/fulltext_index/create/tantivy.rs
+++ b/src/index/src/fulltext_index/create/tantivy.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::path::{Path, PathBuf};
 
 use async_trait::async_trait;
@@ -21,15 +22,13 @@ use snafu::{OptionExt, ResultExt};
 use tantivy::indexer::NoMergePolicy;
 use tantivy::schema::{Schema, STORED, TEXT};
 use tantivy::store::{Compressor, ZstdCompressor};
-use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager};
 use tantivy::{doc, Index, IndexWriter};
-use tantivy_jieba::JiebaTokenizer;
 
 use crate::fulltext_index::create::FulltextIndexCreator;
 use crate::fulltext_index::error::{
-    ExternalSnafu, FinishedSnafu, IoSnafu, JoinSnafu, Result, TantivySnafu,
+    ExternalSnafu, FinishedSnafu, IoSnafu, JoinSnafu, Result, SerializeToJsonSnafu, TantivySnafu,
 };
-use crate::fulltext_index::{Analyzer, Config};
+use crate::fulltext_index::{Config, KEY_FULLTEXT_CONFIG};
 
 pub const TEXT_FIELD_NAME: &str = "greptime_fulltext_text";
 pub const ROWID_FIELD_NAME: &str = "greptime_fulltext_rowid";
@@ -50,6 +49,9 @@ pub struct TantivyFulltextIndexCreator {
 
     /// The directory path in filesystem to store the index.
     path: PathBuf,
+
+    /// The configuration of the fulltext index.
+    config: Config,
 }
 
 impl TantivyFulltextIndexCreator {
@@ -68,7 +70,7 @@ impl TantivyFulltextIndexCreator {
         let mut index = Index::create_in_dir(&path, schema).context(TantivySnafu)?;
         index.settings_mut().docstore_compression = Compressor::Zstd(ZstdCompressor::default());
-        index.set_tokenizers(Self::build_tokenizer(&config));
+        index.set_tokenizers(config.build_tantivy_tokenizer());
 
         let memory_limit = Self::sanitize_memory_limit(memory_limit);
 
@@ -84,25 +86,10 @@ impl TantivyFulltextIndexCreator {
             rowid_field,
             max_rowid: 0,
             path: path.as_ref().to_path_buf(),
+            config,
         })
     }
 
-    fn build_tokenizer(config: &Config) -> TokenizerManager {
-        let mut builder = match config.analyzer {
-            Analyzer::English => TextAnalyzer::builder(SimpleTokenizer::default()).dynamic(),
-            Analyzer::Chinese => TextAnalyzer::builder(JiebaTokenizer {}).dynamic(),
-        };
-
-        if !config.case_sensitive {
-            builder = builder.filter_dynamic(LowerCaser);
-        }
-
-        let tokenizer = builder.build();
-        let tokenizer_manager = TokenizerManager::new();
-        tokenizer_manager.register("default", tokenizer);
-        tokenizer_manager
-    }
-
     fn sanitize_memory_limit(memory_limit: usize) -> usize {
         // Port from tantivy::indexer::index_writer::{MEMORY_BUDGET_NUM_BYTES_MIN, MEMORY_BUDGET_NUM_BYTES_MAX}
         const MARGIN_IN_BYTES: usize = 1_000_000;
@@ -137,8 +124,16 @@ impl FulltextIndexCreator for TantivyFulltextIndexCreator {
             .await
             .context(JoinSnafu)??;
 
+        let property_key = KEY_FULLTEXT_CONFIG.to_string();
+        let property_value = serde_json::to_string(&self.config).context(SerializeToJsonSnafu)?;
+
         puffin_writer
-            .put_dir(blob_key, self.path.clone(), put_options)
+            .put_dir(
+                blob_key,
+                self.path.clone(),
+                put_options,
+                HashMap::from([(property_key, property_value)]),
+            )
             .await
             .map_err(BoxedError::new)
            .context(ExternalSnafu)
@@ -174,6 +169,7 @@ mod tests {
     use tantivy::TantivyDocument;
 
     use super::*;
+    use crate::fulltext_index::Analyzer;
 
     struct MockPuffinWriter;
 
@@ -197,6 +193,7 @@ mod tests {
             _key: &str,
             _dir: PathBuf,
             _options: PutOptions,
+            _properties: HashMap<String, String>,
         ) -> puffin::error::Result<u64> {
             Ok(0)
         }
@@ -226,7 +223,7 @@ mod tests {
                 ("foo", vec![3]),
                 ("bar", vec![4]),
             ];
-            query_and_check(temp_dir.path(), &cases).await;
+            query_and_check(temp_dir.path(), config, &cases).await;
         }
     }
 
@@ -248,9 +245,13 @@ mod tests {
                 ("hello", vec![0u32, 2]),
                 ("world", vec![1, 2]),
                 ("foo", vec![3]),
+                ("Foo", vec![]),
+                ("FOO", vec![]),
                 ("bar", vec![]),
+                ("Bar", vec![4]),
+                ("BAR", vec![]),
             ];
-            query_and_check(temp_dir.path(), &cases).await;
+            query_and_check(temp_dir.path(), config, &cases).await;
         }
     }
 
@@ -274,7 +275,7 @@ mod tests {
                 ("foo", vec![4]),
                 ("bar", vec![5]),
             ];
-            query_and_check(temp_dir.path(), &cases).await;
+            query_and_check(temp_dir.path(), config, &cases).await;
         }
     }
 
@@ -297,8 +298,12 @@ mod tests {
                 ("世界", vec![1, 2, 3]),
                 ("foo", vec![4]),
                 ("bar", vec![]),
+                ("Foo", vec![]),
+                ("FOO", vec![]),
+                ("Bar", vec![5]),
+                ("BAR", vec![]),
             ];
-            query_and_check(temp_dir.path(), &cases).await;
+            query_and_check(temp_dir.path(), config, &cases).await;
         }
     }
 
@@ -315,8 +320,9 @@ mod tests {
             .unwrap();
     }
 
-    async fn query_and_check(path: &Path, cases: &[(&str, Vec<u32>)]) {
-        let index = Index::open_in_dir(path).unwrap();
+    async fn query_and_check(path: &Path, config: Config, cases: &[(&str, Vec<u32>)]) {
+        let mut index = Index::open_in_dir(path).unwrap();
+        index.set_tokenizers(config.build_tantivy_tokenizer());
         let reader = index.reader().unwrap();
         let searcher = reader.searcher();
         for (query, expected) in cases {
diff --git a/src/index/src/fulltext_index/search/tantivy.rs b/src/index/src/fulltext_index/search/tantivy.rs
index 61c87e863f..a55b599d21 100644
--- a/src/index/src/fulltext_index/search/tantivy.rs
+++ b/src/index/src/fulltext_index/search/tantivy.rs
@@ -29,6 +29,7 @@ use crate::fulltext_index::error::{
     Result, TantivyDocNotFoundSnafu, TantivyParserSnafu, TantivySnafu,
 };
 use crate::fulltext_index::search::{FulltextIndexSearcher, RowId};
+use crate::fulltext_index::Config;
 
 /// `TantivyFulltextIndexSearcher` is a searcher using Tantivy.
 pub struct TantivyFulltextIndexSearcher {
@@ -42,10 +43,11 @@ pub struct TantivyFulltextIndexSearcher {
 impl TantivyFulltextIndexSearcher {
     /// Creates a new `TantivyFulltextIndexSearcher`.
-    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
+    pub fn new(path: impl AsRef<Path>, config: Config) -> Result<Self> {
         let now = Instant::now();
 
-        let index = Index::open_in_dir(path.as_ref()).context(TantivySnafu)?;
+        let mut index = Index::open_in_dir(path.as_ref()).context(TantivySnafu)?;
+        index.set_tokenizers(config.build_tantivy_tokenizer());
         let reader = index
             .reader_builder()
             .reload_policy(ReloadPolicy::Manual)
diff --git a/src/index/src/fulltext_index/tests.rs b/src/index/src/fulltext_index/tests.rs
index d3491a7e9d..a2a87a645a 100644
--- a/src/index/src/fulltext_index/tests.rs
+++ b/src/index/src/fulltext_index/tests.rs
@@ -19,7 +19,7 @@ use common_test_util::temp_dir::{create_temp_dir, TempDir};
 use puffin::puffin_manager::file_accessor::MockFileAccessor;
 use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
 use puffin::puffin_manager::stager::BoundedStager;
-use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions};
+use puffin::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};
 
 use crate::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCreator};
 use crate::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
@@ -61,8 +61,7 @@ async fn test_search(
     prefix: &str,
     config: Config,
     texts: Vec<&str>,
-    query: &str,
-    expected: impl IntoIterator<Item = RowId>,
+    query_expected: Vec<(&str, impl IntoIterator<Item = RowId>)>,
 ) {
     let (_staging_dir, stager) = new_bounded_stager(prefix).await;
     let file_accessor = Arc::new(MockFileAccessor::new(prefix));
@@ -72,14 +71,16 @@ async fn test_search(
     let blob_key = "fulltext_index".to_string();
     let mut writer = puffin_manager.writer(&file_name).await.unwrap();
     create_index(prefix, &mut writer, &blob_key, texts, config).await;
+    writer.finish().await.unwrap();
 
     let reader = puffin_manager.reader(&file_name).await.unwrap();
     let index_dir = reader.dir(&blob_key).await.unwrap();
-    let searcher = TantivyFulltextIndexSearcher::new(index_dir.path()).unwrap();
-    let results = searcher.search(query).await.unwrap();
-
-    let expected = expected.into_iter().collect::<BTreeSet<_>>();
-    assert_eq!(results, expected);
+    let searcher = TantivyFulltextIndexSearcher::new(index_dir.path(), config).unwrap();
+    for (query, expected) in query_expected {
+        let results = searcher.search(query).await.unwrap();
+        let expected = expected.into_iter().collect::<BTreeSet<_>>();
+        assert_eq!(results, expected);
+    }
 }
 
 #[tokio::test]
@@ -91,8 +92,7 @@ async fn test_simple_term() {
             "This is a sample text containing Barack Obama",
             "Another document mentioning Barack",
         ],
-        "Barack Obama",
-        [0, 1],
+        vec![("Barack Obama", [0, 1])],
     )
     .await;
 }
 
@@ -103,8 +103,7 @@ async fn test_negative_term() {
         "test_negative_term_",
         Config::default(),
         vec!["apple is a fruit", "I like apple", "fruit is healthy"],
-        "apple -fruit",
-        [1],
+        vec![("apple -fruit", [1])],
    )
    .await;
 }
 
@@ -119,8 +118,7 @@ async fn test_must_term() {
            "I love apples and fruits",
            "apple and fruit are good",
         ],
-        "+apple +fruit",
-        [2],
+        vec![("+apple +fruit", [2])],
     )
     .await;
 }
 
@@ -131,8 +129,7 @@ async fn test_boolean_operators() {
         "test_boolean_operators_",
         Config::default(),
         vec!["a b c", "a b", "b c", "c"],
-        "a AND b OR c",
-        [0, 1, 2, 3],
+        vec![("a AND b OR c", [0, 1, 2, 3])],
     )
     .await;
 }
 
@@ -146,8 +143,7 @@ async fn test_phrase_term() {
            "This is a sample text containing Barack Obama",
            "Another document mentioning Barack",
         ],
-        "\"Barack Obama\"",
-        [0],
+        vec![("\"Barack Obama\"", [0])],
     )
     .await;
 }
 
@@ -161,8 +157,7 @@ async fn test_config_english_analyzer_case_insensitive() {
             ..Config::default()
         },
         vec!["Banana is a fruit", "I like apple", "Fruit is healthy"],
-        "banana",
-        [0],
+        vec![("banana", [0]), ("Banana", [0]), ("BANANA", [0])],
     )
     .await;
 }
 
@@ -175,9 +170,8 @@ async fn test_config_english_analyzer_case_sensitive() {
             case_sensitive: true,
             ..Config::default()
         },
-        vec!["Banana is a fruit", "I like apple", "Fruit is healthy"],
-        "banana",
-        [],
+        vec!["Banana is a fruit", "I like banana", "Fruit is healthy"],
+        vec![("banana", [1]), ("Banana", [0])],
     )
     .await;
 }
 
@@ -191,8 +185,7 @@ async fn test_config_chinese_analyzer() {
             ..Default::default()
         },
         vec!["苹果是一种水果", "我喜欢苹果", "水果很健康"],
-        "苹果",
-        [0, 1],
+        vec![("苹果", [0, 1])],
     )
     .await;
 }
diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs
index e463bd0ee8..94ceda6891 100644
--- a/src/mito2/src/sst/index/fulltext_index/applier.rs
+++ b/src/mito2/src/sst/index/fulltext_index/applier.rs
@@ -17,9 +17,10 @@ use std::sync::Arc;
 
 use common_telemetry::warn;
 use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
+use index::fulltext_index::Config;
 use object_store::ObjectStore;
 use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
-use puffin::puffin_manager::{BlobWithMetadata, DirGuard, PuffinManager, PuffinReader};
+use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
 use snafu::ResultExt;
 use store_api::storage::{ColumnId, RegionId};
 
@@ -93,7 +94,7 @@ impl FulltextIndexApplier {
         let mut row_ids: Option<BTreeSet<RowId>> = None;
         for (column_id, request) in &self.requests {
-            if request.queries.is_empty() {
+            if request.queries.is_empty() && request.terms.is_empty() {
                 continue;
             }
 
@@ -133,15 +134,21 @@ impl FulltextIndexApplier {
             .dir(file_id, &blob_key, file_size_hint)
             .await?;
 
-        let path = match &dir {
-            Some(dir) => dir.path(),
+        let dir = match &dir {
+            Some(dir) => dir,
             None => {
                 return Ok(None);
             }
         };
 
-        let searcher = TantivyFulltextIndexSearcher::new(path).context(ApplyFulltextIndexSnafu)?;
+        let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
+        let path = dir.path();
+
+        let searcher =
+            TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
         let mut row_ids: Option<BTreeSet<RowId>> = None;
+
+        // 1. Apply queries
         for query in &request.queries {
             let result = searcher
                 .search(&query.0)
@@ -161,6 +168,21 @@
             }
         }
 
+        // 2. Apply terms
+        let query = request.terms_as_query(config.case_sensitive);
+        if !query.0.is_empty() {
+            let result = searcher
+                .search(&query.0)
+                .await
+                .context(ApplyFulltextIndexSnafu)?;
+
+            if let Some(ids) = row_ids.as_mut() {
+                ids.retain(|id| result.contains(id));
+            } else {
+                row_ids = Some(result);
+            }
+        }
+
         Ok(row_ids)
     }
 }
@@ -217,7 +239,7 @@ impl IndexSource {
         file_id: FileId,
         key: &str,
         file_size_hint: Option<u64>,
-    ) -> Result<Option<BlobWithMetadata<SstPuffinBlob>>> {
+    ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
         let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
         let res = reader.blob(key).await;
         match res {
@@ -248,7 +270,7 @@ impl IndexSource {
         file_id: FileId,
         key: &str,
         file_size_hint: Option<u64>,
-    ) -> Result<Option<SstPuffinDir>> {
+    ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
         let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
         let res = reader.dir(key).await;
         match res {
diff --git a/src/mito2/src/sst/index/fulltext_index/applier/builder.rs b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs
index e5cb6cf765..14f5936a01 100644
--- a/src/mito2/src/sst/index/fulltext_index/applier/builder.rs
+++ b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs
@@ -30,12 +30,37 @@ use crate::sst::index::puffin_manager::PuffinManagerFactory;
 /// A request for fulltext index.
 ///
 /// It contains all the queries and terms for a column.
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct FulltextRequest {
     pub queries: Vec<FulltextQuery>,
     pub terms: Vec<FulltextTerm>,
 }
 
+impl FulltextRequest {
+    /// Converts the terms to a single query string.
+    ///
+    /// For example, if the terms are ["foo", "bar"], the query string will be `r#"+"foo" +"bar""#`.
+    /// Any `"` inside a term is escaped.
+    ///
+    /// `skip_lowercased` skips terms that have already been lowercased; it is used
+    /// when lowercased terms are not indexed (i.e. the index is case-sensitive).
+    pub fn terms_as_query(&self, skip_lowercased: bool) -> FulltextQuery {
+        let mut query = String::new();
+        for term in &self.terms {
+            if skip_lowercased && term.col_lowered {
+                continue;
+            }
+            // Escape the `"` in the term.
+            let escaped_term = term.term.replace("\"", "\\\"");
+            if query.is_empty() {
+                query = format!("+\"{escaped_term}\"");
+            } else {
+                query.push_str(&format!(" +\"{escaped_term}\""));
+            }
+        }
+        FulltextQuery(query)
+    }
+}
+
 /// A query to be matched in fulltext index.
 ///
 /// `query` is the query to be matched, e.g. "+foo -bar" in `SELECT * FROM t WHERE matches(text, "+foo -bar")`.
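For context on how the two passes in the applier combine: every query, plus the single conjunctive query built from the terms, produces a set of matching row ids, and the applier keeps only their intersection. `None` means no predicate has been applied yet, which lets `apply` distinguish "no index consulted" from "empty match". A standalone sketch of that accumulation (names here are illustrative, not part of the applier API):

```rust
use std::collections::BTreeSet;

// Each fulltext query (and the terms-derived query) must all match, so each
// result set is intersected into the accumulator, mirroring the `retain`
// pattern in `apply_fulltext_index` above.
fn intersect(acc: &mut Option<BTreeSet<u32>>, result: BTreeSet<u32>) {
    match acc {
        Some(ids) => ids.retain(|id| result.contains(id)),
        None => *acc = Some(result),
    }
}

fn main() {
    let mut row_ids = None;
    intersect(&mut row_ids, BTreeSet::from([0, 1, 3])); // e.g. rows matching a query
    intersect(&mut row_ids, BTreeSet::from([1, 2, 3])); // e.g. rows matching the terms
    assert_eq!(row_ids, Some(BTreeSet::from([1, 3])));
}
```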
@@ -543,4 +568,92 @@ mod tests {
             }
         );
     }
+
+    #[test]
+    fn test_terms_as_query() {
+        // Test with empty terms
+        let request = FulltextRequest::default();
+        assert_eq!(request.terms_as_query(false), FulltextQuery(String::new()));
+        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new()));
+
+        // Test with a single term (not lowercased)
+        let mut request = FulltextRequest::default();
+        request.terms.push(FulltextTerm {
+            col_lowered: false,
+            term: "foo".to_string(),
+        });
+        assert_eq!(
+            request.terms_as_query(false),
+            FulltextQuery("+\"foo\"".to_string())
+        );
+        assert_eq!(
+            request.terms_as_query(true),
+            FulltextQuery("+\"foo\"".to_string())
+        );
+
+        // Test with a single lowercased term and skip_lowercased=true
+        let mut request = FulltextRequest::default();
+        request.terms.push(FulltextTerm {
+            col_lowered: true,
+            term: "foo".to_string(),
+        });
+        assert_eq!(
+            request.terms_as_query(false),
+            FulltextQuery("+\"foo\"".to_string())
+        );
+        assert_eq!(request.terms_as_query(true), FulltextQuery(String::new())); // Should skip lowercased term
+
+        // Test with multiple terms, mix of lowercased and not
+        let mut request = FulltextRequest::default();
+        request.terms.push(FulltextTerm {
+            col_lowered: false,
+            term: "foo".to_string(),
+        });
+        request.terms.push(FulltextTerm {
+            col_lowered: true,
+            term: "bar".to_string(),
+        });
+        assert_eq!(
+            request.terms_as_query(false),
+            FulltextQuery("+\"foo\" +\"bar\"".to_string())
+        );
+        assert_eq!(
+            request.terms_as_query(true),
+            FulltextQuery("+\"foo\"".to_string()) // Only the non-lowercased term
+        );
+
+        // Test with term containing quotes that need escaping
+        let mut request = FulltextRequest::default();
+        request.terms.push(FulltextTerm {
+            col_lowered: false,
+            term: "foo\"bar".to_string(),
+        });
+        assert_eq!(
+            request.terms_as_query(false),
+            FulltextQuery("+\"foo\\\"bar\"".to_string())
+        );
+
+        // Test with a complex mix of terms
+        let mut request = FulltextRequest::default();
+        request.terms.push(FulltextTerm {
+            col_lowered: false,
+            term: "foo".to_string(),
+        });
+        request.terms.push(FulltextTerm {
+            col_lowered: true,
+            term: "bar\"quoted\"".to_string(),
+        });
+        request.terms.push(FulltextTerm {
+            col_lowered: false,
+            term: "baz\\escape".to_string(),
+        });
+        assert_eq!(
+            request.terms_as_query(false),
+            FulltextQuery("+\"foo\" +\"bar\\\"quoted\\\"\" +\"baz\\escape\"".to_string())
+        );
+        assert_eq!(
+            request.terms_as_query(true),
+            FulltextQuery("+\"foo\" +\"baz\\escape\"".to_string()) // Skips the lowercased term
+        );
+    }
 }
diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs
index b6eab05bfa..12b83e39d0 100644
--- a/src/mito2/src/sst/index/fulltext_index/creator.rs
+++ b/src/mito2/src/sst/index/fulltext_index/creator.rs
@@ -376,7 +376,9 @@ mod tests {
     use crate::access_layer::RegionFilePathFactory;
     use crate::read::{Batch, BatchColumn};
     use crate::sst::file::FileId;
-    use crate::sst::index::fulltext_index::applier::builder::{FulltextQuery, FulltextRequest};
+    use crate::sst::index::fulltext_index::applier::builder::{
+        FulltextQuery, FulltextRequest, FulltextTerm,
+    };
     use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
     use crate::sst::index::puffin_manager::PuffinManagerFactory;
 
@@ -510,14 +512,25 @@ mod tests {
             .unwrap()
     }
 
-    async fn build_applier_factory(
+    /// Applier factory that can handle both queries and terms.
+    ///
+    /// It builds a fulltext index with the given data rows, and returns a function
+    /// that can handle both queries and terms in a single request.
+    ///
+    /// The function takes two parameters:
+    /// - `queries`: a list of (ColumnId, query string) pairs for fulltext queries
+    /// - `terms`: a list of (ColumnId, [(bool, String)]) for fulltext terms, where
+    ///   the bool indicates whether the term is already lowercased
+    async fn build_fulltext_applier_factory(
         prefix: &str,
         rows: &[(
             Option<&str>, // text_english_case_sensitive
             Option<&str>, // text_english_case_insensitive
             Option<&str>, // text_chinese
         )],
-    ) -> impl Fn(Vec<(ColumnId, &str)>) -> BoxFuture<'static, BTreeSet<RowId>> {
+    ) -> impl Fn(
+        Vec<(ColumnId, &str)>,
+        Vec<(ColumnId, Vec<(bool, &str)>)>,
+    ) -> BoxFuture<'static, Option<BTreeSet<RowId>>> {
         let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
         let region_dir = "region0".to_string();
         let sst_file_id = FileId::random();
@@ -549,74 +562,253 @@ mod tests {
         let _ = indexer.finish(&mut writer).await.unwrap();
         writer.finish().await.unwrap();
 
-        move |queries| {
+        move |queries: Vec<(ColumnId, &str)>, terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>| {
             let _d = &d;
-            let applier = FulltextIndexApplier::new(
-                region_dir.clone(),
-                region_metadata.region_id,
-                object_store.clone(),
-                queries
+            let region_dir = region_dir.clone();
+            let object_store = object_store.clone();
+            let factory = factory.clone();
+
+            let mut requests: HashMap<ColumnId, FulltextRequest> = HashMap::new();
+
+            // Add queries
+            for (column_id, query) in queries {
+                requests
+                    .entry(column_id)
+                    .or_default()
+                    .queries
+                    .push(FulltextQuery(query.to_string()));
+            }
+
+            // Add terms
+            for (column_id, terms) in terms_requests {
+                let fulltext_terms = terms
                     .into_iter()
-                    .map(|(a, b)| {
-                        (
-                            a,
-                            FulltextRequest {
-                                queries: vec![FulltextQuery(b.to_string())],
-                                terms: vec![],
-                            },
-                        )
+                    .map(|(col_lowered, term)| FulltextTerm {
+                        col_lowered,
+                        term: term.to_string(),
                     })
-                    .collect(),
-                factory.clone(),
+                    .collect::<Vec<_>>();
+
+                requests
+                    .entry(column_id)
+                    .or_default()
+                    .terms
+                    .extend(fulltext_terms);
+            }
+
+            let applier = FulltextIndexApplier::new(
+                region_dir,
+                region_metadata.region_id,
+                object_store,
+                requests,
+                factory,
             );
 
-            async move { applier.apply(sst_file_id, None).await.unwrap().unwrap() }.boxed()
+            async move { applier.apply(sst_file_id, None).await.unwrap() }.boxed()
         }
     }
 
+    fn rows(row_ids: impl IntoIterator<Item = RowId>) -> BTreeSet<RowId> {
+        row_ids.into_iter().collect()
+    }
+
     #[tokio::test]
-    async fn test_fulltext_index_basic() {
-        let applier_factory = build_applier_factory(
-            "test_fulltext_index_basic_",
+    async fn test_fulltext_index_basic_case_sensitive() {
+        let applier_factory = build_fulltext_applier_factory(
+            "test_fulltext_index_basic_case_sensitive_",
             &[
-                (Some("hello"), None, Some("你好")),
-                (Some("world"), Some("world"), None),
-                (None, Some("World"), Some("世界")),
-                (
-                    Some("Hello, World"),
-                    Some("Hello, World"),
-                    Some("你好,世界"),
-                ),
+                (Some("hello"), None, None),
+                (Some("world"), None, None),
+                (None, None, None),
+                (Some("Hello, World"), None, None),
             ],
         )
         .await;
 
-        let row_ids = applier_factory(vec![(1, "hello")]).await;
-        assert_eq!(row_ids, vec![0].into_iter().collect());
+        let row_ids = applier_factory(vec![(1, "hello")], vec![]).await;
+        assert_eq!(row_ids, Some(rows([0])));
 
-        let row_ids = applier_factory(vec![(1, "world")]).await;
-        assert_eq!(row_ids, vec![1].into_iter().collect());
+        let row_ids = applier_factory(vec![(1, "world")], vec![]).await;
+        assert_eq!(row_ids, Some(rows([1])));
 
-        let row_ids = applier_factory(vec![(2, "hello")]).await;
-        assert_eq!(row_ids, vec![3].into_iter().collect());
+        let row_ids = applier_factory(vec![(1, "Hello")], vec![]).await;
+        assert_eq!(row_ids, Some(rows([3])));
 
-        let row_ids = applier_factory(vec![(2, "world")]).await;
-        assert_eq!(row_ids, vec![1, 2, 3].into_iter().collect());
+        let row_ids = applier_factory(vec![(1, "World")], vec![]).await;
+        assert_eq!(row_ids, Some(rows([3])));
 
-        let row_ids = applier_factory(vec![(3, "你好")]).await;
-        assert_eq!(row_ids, vec![0, 3].into_iter().collect());
+        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])]).await;
+        assert_eq!(row_ids, Some(rows([0])));
 
-        let row_ids = applier_factory(vec![(3, "世界")]).await;
-        assert_eq!(row_ids, vec![2, 3].into_iter().collect());
+        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])]).await;
+        assert_eq!(row_ids, None);
+
+        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])]).await;
+        assert_eq!(row_ids, Some(rows([1])));
+
+        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])]).await;
+        assert_eq!(row_ids, None);
+
+        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])]).await;
+        assert_eq!(row_ids, Some(rows([3])));
+
+        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])]).await;
+        assert_eq!(row_ids, None);
+
+        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])]).await;
+        assert_eq!(row_ids, Some(rows([3])));
+
+        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])]).await;
+        assert_eq!(row_ids, None);
+    }
+
+    #[tokio::test]
+    async fn test_fulltext_index_basic_case_insensitive() {
+        let applier_factory = build_fulltext_applier_factory(
+            "test_fulltext_index_basic_case_insensitive_",
+            &[
+                (None, Some("hello"), None),
+                (None, None, None),
+                (None, Some("world"), None),
+                (None, Some("Hello, World"), None),
+            ],
+        )
+        .await;
+
+        let row_ids = applier_factory(vec![(2, "hello")], vec![]).await;
+        assert_eq!(row_ids, Some(rows([0, 3])));
+
+        let row_ids = applier_factory(vec![(2, "world")], vec![]).await;
+        assert_eq!(row_ids, Some(rows([2, 3])));
+
+        let row_ids = applier_factory(vec![(2, "Hello")], vec![]).await;
+        assert_eq!(row_ids, Some(rows([0, 3])));
+
+        let row_ids = applier_factory(vec![(2, "World")], vec![]).await;
+        assert_eq!(row_ids, Some(rows([2, 3])));
+
+        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])]).await;
+        assert_eq!(row_ids, Some(rows([0, 3])));
+
+        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])]).await;
+        assert_eq!(row_ids, Some(rows([0, 3])));
+
+        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])]).await;
+        assert_eq!(row_ids, Some(rows([2, 3])));
+
+        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])]).await;
+        assert_eq!(row_ids, Some(rows([2, 3])));
+
+        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])]).await;
+        assert_eq!(row_ids, Some(rows([0, 3])));
+
+        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])]).await;
+        assert_eq!(row_ids, Some(rows([0, 3])));
+
+        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])]).await;
+        assert_eq!(row_ids, Some(rows([2, 3])));
+
+        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])]).await;
+        assert_eq!(row_ids, Some(rows([2, 3])));
+    }
+
+    #[tokio::test]
+    async fn test_fulltext_index_basic_chinese() {
+        let applier_factory = build_fulltext_applier_factory(
"test_fulltext_index_basic_chinese_", + &[ + (None, None, Some("你好")), + (None, None, None), + (None, None, Some("世界")), + (None, None, Some("你好,世界")), + ], + ) + .await; + + let row_ids = applier_factory(vec![(3, "你好")], vec![]).await; + assert_eq!(row_ids, Some(rows([0, 3]))); + + let row_ids = applier_factory(vec![(3, "世界")], vec![]).await; + assert_eq!(row_ids, Some(rows([2, 3]))); + + let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])]).await; + assert_eq!(row_ids, Some(rows([0, 3]))); + + let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])]).await; + assert_eq!(row_ids, Some(rows([2, 3]))); + } + + #[tokio::test] + async fn test_fulltext_index_multi_terms_case_sensitive() { + let applier_factory = build_fulltext_applier_factory( + "test_fulltext_index_multi_terms_case_sensitive_", + &[ + (Some("Hello"), None, None), + (Some("World"), None, None), + (None, None, None), + (Some("Hello, World"), None, None), + ], + ) + .await; + + let row_ids = + applier_factory(vec![], vec![(1, vec![(false, "hello"), (false, "world")])]).await; + assert_eq!(row_ids, Some(rows([]))); + + let row_ids = + applier_factory(vec![], vec![(1, vec![(false, "Hello"), (false, "World")])]).await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = + applier_factory(vec![], vec![(1, vec![(true, "Hello"), (false, "World")])]).await; + assert_eq!(row_ids, Some(rows([1, 3]))); + + let row_ids = + applier_factory(vec![], vec![(1, vec![(false, "Hello"), (true, "World")])]).await; + assert_eq!(row_ids, Some(rows([0, 3]))); + + let row_ids = + applier_factory(vec![], vec![(1, vec![(true, "Hello"), (true, "World")])]).await; + assert_eq!(row_ids, None); + } + + #[tokio::test] + async fn test_fulltext_index_multi_terms_case_insensitive() { + let applier_factory = build_fulltext_applier_factory( + "test_fulltext_index_multi_terms_case_insensitive_", + &[ + (None, Some("hello"), None), + (None, None, None), + (None, Some("world"), None), + (None, Some("Hello, World"), None), + ], + ) + .await; + + let row_ids = + applier_factory(vec![], vec![(2, vec![(false, "hello"), (false, "world")])]).await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = + applier_factory(vec![], vec![(2, vec![(true, "hello"), (false, "world")])]).await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = + applier_factory(vec![], vec![(2, vec![(false, "hello"), (true, "world")])]).await; + assert_eq!(row_ids, Some(rows([3]))); + + let row_ids = + applier_factory(vec![], vec![(2, vec![(true, "hello"), (true, "world")])]).await; + assert_eq!(row_ids, Some(rows([3]))); } #[tokio::test] async fn test_fulltext_index_multi_columns() { - let applier_factory = build_applier_factory( + let applier_factory = build_fulltext_applier_factory( "test_fulltext_index_multi_columns_", &[ - (Some("hello"), None, Some("你好")), - (Some("world"), Some("world"), None), + (Some("Hello"), None, Some("你好")), + (Some("World"), Some("world"), None), (None, Some("World"), Some("世界")), ( Some("Hello, World"), @@ -627,13 +819,14 @@ mod tests { ) .await; - let row_ids = applier_factory(vec![(1, "hello"), (3, "你好")]).await; - assert_eq!(row_ids, vec![0].into_iter().collect()); + let row_ids = applier_factory( + vec![(1, "Hello"), (3, "你好")], + vec![(2, vec![(false, "world")])], + ) + .await; + assert_eq!(row_ids, Some(rows([3]))); - let row_ids = applier_factory(vec![(1, "world"), (3, "世界")]).await; - assert_eq!(row_ids, vec![].into_iter().collect()); - - let row_ids = applier_factory(vec![(2, "world"), (3, "世界")]).await; - 
-        assert_eq!(row_ids, vec![2, 3].into_iter().collect());
+        let row_ids = applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])]).await;
+        assert_eq!(row_ids, Some(rows([1, 3])));
     }
 }
diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs
index 9f288ecd16..cf4bb185a8 100644
--- a/src/mito2/src/sst/index/puffin_manager.rs
+++ b/src/mito2/src/sst/index/puffin_manager.rs
@@ -174,12 +174,13 @@ impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
 
 #[cfg(test)]
 mod tests {
+    use common_base::range_read::RangeReader;
     use common_test_util::temp_dir::create_temp_dir;
     use futures::io::Cursor;
     use object_store::services::Memory;
     use puffin::blob_metadata::CompressionCodec;
-    use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions};
+    use puffin::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};
 
     use super::*;
 
@@ -229,6 +230,7 @@ mod tests {
             PutOptions {
                 compression: Some(CompressionCodec::Zstd),
             },
+            Default::default(),
         )
         .await
         .unwrap();
diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs
index 1dfec58f5b..9f287128c1 100644
--- a/src/puffin/src/puffin_manager.rs
+++ b/src/puffin/src/puffin_manager.rs
@@ -65,7 +65,13 @@ pub trait PuffinWriter {
     /// Returns the number of bytes written.
     ///
     /// The specified `dir` should be accessible from the filesystem.
-    async fn put_dir(&mut self, key: &str, dir: PathBuf, options: PutOptions) -> Result<u64>;
+    async fn put_dir(
+        &mut self,
+        key: &str,
+        dir: PathBuf,
+        options: PutOptions,
+        properties: HashMap<String, String>,
+    ) -> Result<u64>;
 
     /// Sets whether the footer should be LZ4 compressed.
     fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool);
@@ -94,15 +100,15 @@ pub trait PuffinReader {
 
     /// Reads a blob from the Puffin file.
     ///
-    /// The returned `BlobWithMetadata` is used to access the blob data and its metadata.
-    /// Users should hold the `BlobWithMetadata` until they are done with the blob data.
-    async fn blob(&self, key: &str) -> Result<BlobWithMetadata<Self::Blob>>;
+    /// The returned `GuardWithMetadata` is used to access the blob data and its metadata.
+    /// Users should hold the `GuardWithMetadata` until they are done with the blob data.
+    async fn blob(&self, key: &str) -> Result<GuardWithMetadata<Self::Blob>>;
 
     /// Reads a directory from the Puffin file.
     ///
-    /// The returned `DirGuard` is used to access the directory in the filesystem.
-    /// The caller is responsible for holding the `DirGuard` until they are done with the directory.
-    async fn dir(&self, key: &str) -> Result<Self::Dir>;
+    /// The returned `GuardWithMetadata` is used to access the directory data and its metadata.
+    /// Users should hold the `GuardWithMetadata` until they are done with the directory data.
+    async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>>;
 }
 
 /// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
@@ -114,32 +120,41 @@ pub trait BlobGuard {
     async fn reader(&self) -> Result<Self::Reader>;
 }
 
-/// `BlobWithMetadata` provides access to the blob data and its metadata.
-pub struct BlobWithMetadata<B> {
-    blob: B,
-    metadata: BlobMetadata,
-}
-
-impl<B: BlobGuard> BlobWithMetadata<B> {
-    /// Creates a new `BlobWithMetadata` instance.
-    pub fn new(blob: B, metadata: BlobMetadata) -> Self {
-        Self { blob, metadata }
-    }
-
-    /// Returns the reader for the blob data.
-    pub async fn reader(&self) -> Result<B::Reader> {
-        self.blob.reader().await
-    }
-
-    /// Returns the metadata of the blob.
-    pub fn metadata(&self) -> &BlobMetadata {
-        &self.metadata
-    }
-}
-
 /// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
 /// Users should hold the `DirGuard` until they are done with the directory.
 #[auto_impl::auto_impl(Arc)]
 pub trait DirGuard {
     fn path(&self) -> &PathBuf;
 }
+
+/// `GuardWithMetadata` provides access to the blob or directory data and its metadata.
+pub struct GuardWithMetadata<G> {
+    guard: G,
+    metadata: BlobMetadata,
+}
+
+impl<G> GuardWithMetadata<G> {
+    /// Creates a new `GuardWithMetadata` instance.
+    pub fn new(guard: G, metadata: BlobMetadata) -> Self {
+        Self { guard, metadata }
+    }
+
+    /// Returns the metadata of the blob or directory.
+    pub fn metadata(&self) -> &BlobMetadata {
+        &self.metadata
+    }
+}
+
+impl<G: BlobGuard> GuardWithMetadata<G> {
+    /// Returns the reader for the blob data.
+    pub async fn reader(&self) -> Result<G::Reader> {
+        self.guard.reader().await
+    }
+}
+
+impl<G: DirGuard> GuardWithMetadata<G> {
+    /// Returns the path of the directory.
+    pub fn path(&self) -> &PathBuf {
+        self.guard.path()
+    }
+}
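The `GuardWithMetadata<G>` wrapper above replaces both `BlobWithMetadata` and the bare `DirGuard` return: one unconditional impl carries the metadata accessor, and two trait-bounded impls expose `reader()` only for blob guards and `path()` only for dir guards. A minimal standalone reproduction of that pattern (all names below are illustrative, none of this is puffin API):

```rust
// Methods become available only when the wrapped guard satisfies the
// corresponding trait bound, so one wrapper type serves both blobs and dirs.
trait BlobLike {
    fn bytes(&self) -> &[u8];
}

trait DirLike {
    fn path(&self) -> &str;
}

struct WithMeta<G> {
    guard: G,
    meta: String,
}

impl<G> WithMeta<G> {
    // Always available, regardless of what G is.
    fn meta(&self) -> &str {
        &self.meta
    }
}

impl<G: BlobLike> WithMeta<G> {
    fn bytes(&self) -> &[u8] {
        self.guard.bytes()
    }
}

impl<G: DirLike> WithMeta<G> {
    fn path(&self) -> &str {
        self.guard.path()
    }
}

struct MemBlob(Vec<u8>);

impl BlobLike for MemBlob {
    fn bytes(&self) -> &[u8] {
        &self.0
    }
}

fn main() {
    let guard = WithMeta {
        guard: MemBlob(vec![1, 2]),
        meta: "properties".to_string(),
    };
    assert_eq!(guard.meta(), "properties");
    assert_eq!(guard.bytes(), &[1, 2]);
    // `guard.path()` would not compile: `MemBlob` does not implement `DirLike`.
}
```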
diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
index 5d2033e2e9..2c616578f6 100644
--- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
+++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
@@ -36,7 +36,7 @@ use crate::puffin_manager::file_accessor::PuffinFileAccessor;
 use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
 use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef;
 use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
-use crate::puffin_manager::{BlobGuard, BlobWithMetadata, PuffinReader};
+use crate::puffin_manager::{BlobGuard, GuardWithMetadata, PuffinReader};
 
 /// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
 pub struct FsPuffinReader<S, F>
 where
@@ -96,26 +96,13 @@ where
     }
 
     async fn metadata(&self) -> Result<Arc<FileMetadata>> {
-        let reader = self.puffin_file_accessor.reader(&self.handle).await?;
-        let mut file = PuffinFileReader::new(reader);
+        let mut file = self.puffin_reader().await?;
         self.get_puffin_file_metadata(&mut file).await
     }
 
-    async fn blob(&self, key: &str) -> Result<BlobWithMetadata<Self::Blob>> {
-        let mut reader = self.puffin_file_accessor.reader(&self.handle).await?;
-        if let Some(file_size_hint) = self.file_size_hint {
-            reader.with_file_size_hint(file_size_hint);
-        }
-        let mut file = PuffinFileReader::new(reader);
-
-        let metadata = self.get_puffin_file_metadata(&mut file).await?;
-        let blob_metadata = metadata
-            .blobs
-            .iter()
-            .find(|m| m.blob_type == key)
-            .context(BlobNotFoundSnafu { blob: key })?
-            .clone();
-
+    async fn blob(&self, key: &str) -> Result<GuardWithMetadata<Self::Blob>> {
+        let mut file = self.puffin_reader().await?;
+        let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
         let blob = if blob_metadata.compression_codec.is_none() {
             // If the blob is not compressed, we can directly read it from the puffin file.
             Either::L(RandomReadBlob {
@@ -140,28 +127,33 @@ where
             Either::R(staged_blob)
         };
 
-        Ok(BlobWithMetadata::new(blob, blob_metadata))
+        Ok(GuardWithMetadata::new(blob, blob_metadata))
     }
 
-    async fn dir(&self, key: &str) -> Result<Self::Dir> {
-        self.stager
+    async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>> {
+        let mut file = self.puffin_reader().await?;
+        let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
+        let dir = self
+            .stager
             .get_dir(
                 &self.handle,
                 key,
                 Box::new(|writer_provider| {
                     let accessor = self.puffin_file_accessor.clone();
                     let handle = self.handle.clone();
-                    let key = key.to_string();
+                    let blob_metadata = blob_metadata.clone();
                     Box::pin(Self::init_dir_to_stager(
+                        file,
+                        blob_metadata,
                         handle,
-                        key,
                         writer_provider,
                         accessor,
-                        self.file_size_hint,
                     ))
                 }),
             )
-            .await
+            .await?;
+
+        Ok(GuardWithMetadata::new(dir, blob_metadata))
     }
 }
 
@@ -188,6 +180,30 @@ where
         Ok(metadata)
     }
 
+    async fn get_blob_metadata(
+        &self,
+        key: &str,
+        file: &mut PuffinFileReader<F::Reader>,
+    ) -> Result<BlobMetadata> {
+        let metadata = self.get_puffin_file_metadata(file).await?;
+        let blob_metadata = metadata
+            .blobs
+            .iter()
+            .find(|m| m.blob_type == key)
+            .context(BlobNotFoundSnafu { blob: key })?
+            .clone();
+
+        Ok(blob_metadata)
+    }
+
+    async fn puffin_reader(&self) -> Result<PuffinFileReader<F::Reader>> {
+        let mut reader = self.puffin_file_accessor.reader(&self.handle).await?;
+        if let Some(file_size_hint) = self.file_size_hint {
+            reader.with_file_size_hint(file_size_hint);
+        }
+        Ok(PuffinFileReader::new(reader))
+    }
+
     async fn init_blob_to_stager(
         reader: PuffinFileReader<F::Reader>,
         blob_metadata: BlobMetadata,
@@ -201,26 +217,14 @@ where
     }
 
     async fn init_dir_to_stager(
+        mut file: PuffinFileReader<F::Reader>,
+        blob_metadata: BlobMetadata,
         handle: F::FileHandle,
-        key: String,
         writer_provider: DirWriterProviderRef,
         accessor: F,
-        file_size_hint: Option<u64>,
     ) -> Result<u64> {
-        let mut reader = accessor.reader(&handle).await?;
-        if let Some(file_size_hint) = file_size_hint {
-            reader.with_file_size_hint(file_size_hint);
-        }
-        let mut file = PuffinFileReader::new(reader);
-
-        let puffin_metadata = file.metadata().await?;
-        let blob_metadata = puffin_metadata
-            .blobs
-            .iter()
-            .find(|m| m.blob_type == key.as_str())
-            .context(BlobNotFoundSnafu { blob: key })?;
-
-        let reader = file.blob_reader(blob_metadata)?;
+        let reader = file.blob_reader(&blob_metadata)?;
         let meta = reader.metadata().await.context(MetadataSnafu)?;
         let buf = reader
             .read(0..meta.content_length)
diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs
index 61d9df52f0..feb7678756 100644
--- a/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs
+++ b/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs
@@ -88,7 +88,13 @@ where
         Ok(written_bytes)
     }
 
-    async fn put_dir(&mut self, key: &str, dir_path: PathBuf, options: PutOptions) -> Result<u64> {
+    async fn put_dir(
+        &mut self,
+        key: &str,
+        dir_path: PathBuf,
+        options: PutOptions,
+        properties: HashMap<String, String>,
+    ) -> Result<u64> {
         ensure!(
             !self.blob_keys.contains(key),
             DuplicateBlobSnafu { blob: key }
@@ -150,7 +156,7 @@ where
             blob_type: key.to_string(),
             compressed_data: encoded.as_slice(),
             compression_codec: None,
-            properties: Default::default(),
+            properties,
         };
 
         written_bytes += self.puffin_file_writer.add_blob(dir_meta_blob).await?;
diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs
index e2f32e9498..bd3ec9d5a5 100644
--- a/src/puffin/src/puffin_manager/tests.rs
+++ b/src/puffin/src/puffin_manager/tests.rs
@@ -23,7 +23,7 @@ use crate::blob_metadata::CompressionCodec;
 use crate::puffin_manager::file_accessor::MockFileAccessor;
 use crate::puffin_manager::fs_puffin_manager::FsPuffinManager;
 use crate::puffin_manager::stager::BoundedStager;
-use crate::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions};
+use crate::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};
 
 async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc<BoundedStager<String>>) {
     let staging_dir = create_temp_dir(prefix);
@@ -343,6 +343,7 @@ async fn put_dir(
         PutOptions {
             compression: compression_codec,
         },
+        HashMap::from_iter([("test_key".to_string(), "test_value".to_string())]),
     )
     .await
     .unwrap();
@@ -356,6 +357,11 @@ async fn check_dir(
     puffin_reader: &impl PuffinReader,
 ) {
     let res_dir = puffin_reader.dir(key).await.unwrap();
+    let metadata = res_dir.metadata();
+    assert_eq!(
+        metadata.properties,
+        HashMap::from_iter([("test_key".to_string(), "test_value".to_string())])
+    );
     for (file_name, raw_data) in files_in_dir {
         let file_path = if cfg!(windows) {
             res_dir.path().join(file_name.replace('/', "\\"))
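Taken together, the new plumbing looks roughly like this end to end. This is a hedged sketch modeled on the tests above, reusing their mock helpers (`MockFileAccessor`, `BoundedStager`, `FsPuffinManager`); the generic parameter on `BoundedStager`, the constructor arguments, and the `String` file handle are assumptions inferred from the surrounding test code, not a spec.

```rust
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use puffin::puffin_manager::file_accessor::MockFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::BoundedStager;
use puffin::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};

// Write a directory blob with custom properties, then read the properties
// back through `GuardWithMetadata::metadata()`. `stager` and `index_dir` are
// assumed to be prepared as in the test helpers above.
async fn property_round_trip(stager: Arc<BoundedStager<String>>, index_dir: PathBuf) {
    let manager = FsPuffinManager::new(stager, Arc::new(MockFileAccessor::new("demo_")));

    let mut writer = manager.writer(&"demo_file".to_string()).await.unwrap();
    let properties = HashMap::from([("fulltext_config".to_string(), "{}".to_string())]);
    writer
        .put_dir("index_key", index_dir, PutOptions { compression: None }, properties)
        .await
        .unwrap();
    writer.finish().await.unwrap();

    let reader = manager.reader(&"demo_file".to_string()).await.unwrap();
    let dir = reader.dir("index_key").await.unwrap();
    // The same properties come back attached to the guard...
    assert_eq!(dir.metadata().properties["fulltext_config"], "{}");
    // ...and the staged directory itself is reachable on the filesystem.
    let _fs_path = dir.path();
}
```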