From 73352939830f2e8b7dcbc3469f5705ba443010e4 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 8 Apr 2025 19:19:32 +0800 Subject: [PATCH] feat: add term as fulltext index request (#5843) * feat: add term as fulltext index request Signed-off-by: Zhenchi * fix fmt Signed-off-by: Zhenchi * address comments Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- .../src/sst/index/fulltext_index/applier.rs | 40 +- .../index/fulltext_index/applier/builder.rs | 384 ++++++++++++++++-- .../src/sst/index/fulltext_index/creator.rs | 11 +- 3 files changed, 389 insertions(+), 46 deletions(-) diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index c6b773eb47..8de040ae66 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; use common_telemetry::warn; @@ -28,6 +28,7 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result}; use crate::metrics::INDEX_APPLY_ELAPSED; use crate::sst::file::FileId; +use crate::sst::index::fulltext_index::applier::builder::FulltextRequest; use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY; use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir}; use crate::sst::index::TYPE_FULLTEXT_INDEX; @@ -42,8 +43,8 @@ pub struct FulltextIndexApplier { /// The region ID. region_id: RegionId, - /// Queries to apply to the index. - queries: Vec<(ColumnId, String)>, + /// Requests to be applied. + requests: HashMap, /// The puffin manager factory. puffin_manager_factory: PuffinManagerFactory, @@ -66,14 +67,14 @@ impl FulltextIndexApplier { region_dir: String, region_id: RegionId, store: ObjectStore, - queries: Vec<(ColumnId, String)>, + requests: HashMap, puffin_manager_factory: PuffinManagerFactory, ) -> Self { Self { region_dir, region_id, store, - queries, + requests, puffin_manager_factory, file_cache: None, puffin_metadata_cache: None, @@ -108,7 +109,7 @@ impl FulltextIndexApplier { let mut inited = false; let mut row_ids = BTreeSet::new(); - for (column_id, query) in &self.queries { + 'outer: for (column_id, request) in &self.requests { let dir = self .index_dir_path(file_id, *column_id, file_size_hint) .await?; @@ -122,20 +123,23 @@ impl FulltextIndexApplier { let searcher = TantivyFulltextIndexSearcher::new(path).context(ApplyFulltextIndexSnafu)?; - let result = searcher - .search(query) - .await - .context(ApplyFulltextIndexSnafu)?; - if !inited { - row_ids = result; - inited = true; - continue; - } + for query in &request.queries { + let result = searcher + .search(&query.0) + .await + .context(ApplyFulltextIndexSnafu)?; - row_ids.retain(|id| result.contains(id)); - if row_ids.is_empty() { - break; + if !inited { + row_ids = result; + inited = true; + continue; + } + + row_ids.retain(|id| result.contains(id)); + if row_ids.is_empty() { + break 'outer; + } } } diff --git a/src/mito2/src/sst/index/fulltext_index/applier/builder.rs b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs index 5e91bbfac4..a2de41da66 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier/builder.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use datafusion_common::ScalarValue; -use datafusion_expr::Expr; +use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::{BinaryExpr, Expr, Operator}; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use store_api::metadata::RegionMetadata; @@ -24,6 +27,31 @@ use crate::error::Result; use crate::sst::index::fulltext_index::applier::FulltextIndexApplier; use crate::sst::index::puffin_manager::PuffinManagerFactory; +/// A request for fulltext index. +/// +/// It contains all the queries and terms for a column. +#[derive(Default)] +pub struct FulltextRequest { + pub queries: Vec, + pub terms: Vec, +} + +/// A query to be matched in fulltext index. +/// +/// `query` is the query to be matched, e.g. "+foo -bar" in `SELECT * FROM t WHERE matches(text, "+foo -bar")`. +#[derive(Debug, PartialEq, Eq)] +pub struct FulltextQuery(pub String); + +/// A term to be matched in fulltext index. +/// +/// `term` is the term to be matched, e.g. "foo" in `SELECT * FROM t WHERE matches_term(text, "foo")`. +/// `col_lowered` indicates whether the column is lowercased, e.g. `col_lowered = true` when `matches_term(lower(text), "foo")`. +#[derive(Debug, PartialEq, Eq)] +pub struct FulltextTerm { + pub col_lowered: bool, + pub term: String, +} + /// `FulltextIndexApplierBuilder` is a builder for `FulltextIndexApplier`. pub struct FulltextIndexApplierBuilder<'a> { region_dir: String, @@ -72,19 +100,22 @@ impl<'a> FulltextIndexApplierBuilder<'a> { /// Builds `SstIndexApplier` from the given expressions. pub fn build(self, exprs: &[Expr]) -> Result> { - let mut queries = Vec::with_capacity(exprs.len()); + let mut requests = HashMap::new(); for expr in exprs { - if let Some((column_id, query)) = Self::expr_to_query(self.metadata, expr) { - queries.push((column_id, query)); - } + Self::extract_requests(expr, self.metadata, &mut requests); } - Ok((!queries.is_empty()).then(|| { + // Check if any requests have queries or terms + let has_requests = requests + .iter() + .any(|(_, request)| !request.queries.is_empty() || !request.terms.is_empty()); + + Ok(has_requests.then(|| { FulltextIndexApplier::new( self.region_dir, self.region_id, self.store, - queries, + requests, self.puffin_manager_factory, ) .with_file_cache(self.file_cache) @@ -92,10 +123,35 @@ impl<'a> FulltextIndexApplierBuilder<'a> { })) } - fn expr_to_query(metadata: &RegionMetadata, expr: &Expr) -> Option<(ColumnId, String)> { - let Expr::ScalarFunction(f) = expr else { - return None; - }; + fn extract_requests( + expr: &Expr, + metadata: &'a RegionMetadata, + requests: &mut HashMap, + ) { + match expr { + Expr::BinaryExpr(BinaryExpr { + left, + op: Operator::And, + right, + }) => { + Self::extract_requests(left, metadata, requests); + Self::extract_requests(right, metadata, requests); + } + Expr::ScalarFunction(func) => { + if let Some((column_id, query)) = Self::expr_to_query(metadata, func) { + requests.entry(column_id).or_default().queries.push(query); + } else if let Some((column_id, term)) = Self::expr_to_term(metadata, func) { + requests.entry(column_id).or_default().terms.push(term); + } + } + _ => {} + } + } + + fn expr_to_query( + metadata: &RegionMetadata, + f: &ScalarFunction, + ) -> Option<(ColumnId, FulltextQuery)> { if f.name() != "matches" { return None; } @@ -116,7 +172,70 @@ impl<'a> FulltextIndexApplierBuilder<'a> { return None; }; - Some((column.column_id, query.to_string())) + Some((column.column_id, FulltextQuery(query.to_string()))) + } + + fn expr_to_term( + metadata: &RegionMetadata, + f: &ScalarFunction, + ) -> Option<(ColumnId, FulltextTerm)> { + if f.name() != "matches_term" { + return None; + } + if f.args.len() != 2 { + return None; + } + + let mut lowered = false; + let column; + match &f.args[0] { + Expr::Column(c) => { + column = c; + } + Expr::ScalarFunction(f) => { + let lower_arg = Self::extract_lower_arg(f)?; + lowered = true; + if let Expr::Column(c) = lower_arg { + column = c; + } else { + return None; + } + } + _ => return None, + } + + let column = metadata.column_by_name(&column.name)?; + if column.column_schema.data_type != ConcreteDataType::string_datatype() { + return None; + } + + let Expr::Literal(ScalarValue::Utf8(Some(term))) = &f.args[1] else { + return None; + }; + + Some(( + column.column_id, + FulltextTerm { + col_lowered: lowered, + term: term.to_string(), + }, + )) + } + + fn extract_lower_arg(lower_func: &ScalarFunction) -> Option<&Expr> { + if lower_func.args.len() != 1 { + return None; + } + + if lower_func.name() != "lower" { + return None; + } + + if lower_func.args.len() != 1 { + return None; + } + + Some(&lower_func.args[0]) } } @@ -127,6 +246,7 @@ mod tests { use api::v1::SemanticType; use common_function::function_registry::FUNCTION_REGISTRY; use common_function::scalars::udf::create_udf; + use datafusion::functions::string::lower; use datafusion_common::Column; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::ScalarUDF; @@ -166,11 +286,19 @@ mod tests { )) } + fn matches_term_func() -> Arc { + Arc::new(create_udf( + FUNCTION_REGISTRY.get_function("matches_term").unwrap(), + QueryContext::arc(), + Default::default(), + )) + } + #[test] fn test_expr_to_query_basic() { let metadata = mock_metadata(); - let expr = Expr::ScalarFunction(ScalarFunction { + let func = ScalarFunction { args: vec![ Expr::Column(Column { name: "text".to_string(), @@ -179,34 +307,34 @@ mod tests { Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))), ], func: matches_func(), - }); + }; let (column_id, query) = - FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).unwrap(); + FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).unwrap(); assert_eq!(column_id, 1); - assert_eq!(query, "foo".to_string()); + assert_eq!(query, FulltextQuery("foo".to_string())); } #[test] fn test_expr_to_query_wrong_num_args() { let metadata = mock_metadata(); - let expr = Expr::ScalarFunction(ScalarFunction { + let func = ScalarFunction { args: vec![Expr::Column(Column { name: "text".to_string(), relation: None, })], func: matches_func(), - }); + }; - assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none()); + assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none()); } #[test] fn test_expr_to_query_not_found_column() { let metadata = mock_metadata(); - let expr = Expr::ScalarFunction(ScalarFunction { + let func = ScalarFunction { args: vec![ Expr::Column(Column { name: "not_found".to_string(), @@ -215,16 +343,16 @@ mod tests { Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))), ], func: matches_func(), - }); + }; - assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none()); + assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none()); } #[test] fn test_expr_to_query_column_wrong_data_type() { let metadata = mock_metadata(); - let expr = Expr::ScalarFunction(ScalarFunction { + let func = ScalarFunction { args: vec![ Expr::Column(Column { name: "ts".to_string(), @@ -233,16 +361,16 @@ mod tests { Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))), ], func: matches_func(), - }); + }; - assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none()); + assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none()); } #[test] fn test_expr_to_query_pattern_not_string() { let metadata = mock_metadata(); - let expr = Expr::ScalarFunction(ScalarFunction { + let func = ScalarFunction { args: vec![ Expr::Column(Column { name: "text".to_string(), @@ -251,8 +379,210 @@ mod tests { Expr::Literal(ScalarValue::Int64(Some(42))), ], func: matches_func(), + }; + + assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none()); + } + + #[test] + fn test_expr_to_term_basic() { + let metadata = mock_metadata(); + + let func = ScalarFunction { + args: vec![ + Expr::Column(Column { + name: "text".to_string(), + relation: None, + }), + Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))), + ], + func: matches_term_func(), + }; + + let (column_id, term) = + FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap(); + assert_eq!(column_id, 1); + assert_eq!( + term, + FulltextTerm { + col_lowered: false, + term: "foo".to_string(), + } + ); + } + + #[test] + fn test_expr_to_term_with_lower() { + let metadata = mock_metadata(); + + let lower_func_expr = ScalarFunction { + args: vec![Expr::Column(Column { + name: "text".to_string(), + relation: None, + })], + func: lower(), + }; + + let func = ScalarFunction { + args: vec![ + Expr::ScalarFunction(lower_func_expr), + Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))), + ], + func: matches_term_func(), + }; + + let (column_id, term) = + FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap(); + assert_eq!(column_id, 1); + assert_eq!( + term, + FulltextTerm { + col_lowered: true, + term: "foo".to_string(), + } + ); + } + + #[test] + fn test_expr_to_term_wrong_num_args() { + let metadata = mock_metadata(); + + let func = ScalarFunction { + args: vec![Expr::Column(Column { + name: "text".to_string(), + relation: None, + })], + func: matches_term_func(), + }; + + assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none()); + } + + #[test] + fn test_expr_to_term_wrong_function_name() { + let metadata = mock_metadata(); + + let func = ScalarFunction { + args: vec![ + Expr::Column(Column { + name: "text".to_string(), + relation: None, + }), + Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))), + ], + func: matches_func(), // Using 'matches' instead of 'matches_term' + }; + + assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none()); + } + + #[test] + fn test_extract_lower_arg() { + let func = ScalarFunction { + args: vec![Expr::Column(Column { + name: "text".to_string(), + relation: None, + })], + func: lower(), + }; + + let arg = FulltextIndexApplierBuilder::extract_lower_arg(&func).unwrap(); + match arg { + Expr::Column(c) => { + assert_eq!(c.name, "text"); + } + _ => panic!("Expected Column expression"), + } + } + + #[test] + fn test_extract_lower_arg_wrong_function() { + let func = ScalarFunction { + args: vec![Expr::Column(Column { + name: "text".to_string(), + relation: None, + })], + func: matches_func(), // Not 'lower' + }; + + assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none()); + } + + #[test] + fn test_extract_requests() { + let metadata = mock_metadata(); + + // Create a matches expression + let matches_expr = Expr::ScalarFunction(ScalarFunction { + args: vec![ + Expr::Column(Column { + name: "text".to_string(), + relation: None, + }), + Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))), + ], + func: matches_func(), }); - assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none()); + let mut requests = HashMap::new(); + FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests); + + assert_eq!(requests.len(), 1); + let request = requests.get(&1).unwrap(); + assert_eq!(request.queries.len(), 1); + assert_eq!(request.terms.len(), 0); + assert_eq!(request.queries[0], FulltextQuery("foo".to_string())); + } + + #[test] + fn test_extract_multiple_requests() { + let metadata = mock_metadata(); + + // Create a matches expression + let matches_expr = Expr::ScalarFunction(ScalarFunction { + args: vec![ + Expr::Column(Column { + name: "text".to_string(), + relation: None, + }), + Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))), + ], + func: matches_func(), + }); + + // Create a matches_term expression + let matches_term_expr = Expr::ScalarFunction(ScalarFunction { + args: vec![ + Expr::Column(Column { + name: "text".to_string(), + relation: None, + }), + Expr::Literal(ScalarValue::Utf8(Some("bar".to_string()))), + ], + func: matches_term_func(), + }); + + // Create a binary expression combining both + let binary_expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(matches_expr), + op: Operator::And, + right: Box::new(matches_term_expr), + }); + + let mut requests = HashMap::new(); + FulltextIndexApplierBuilder::extract_requests(&binary_expr, &metadata, &mut requests); + + assert_eq!(requests.len(), 1); + let request = requests.get(&1).unwrap(); + assert_eq!(request.queries.len(), 1); + assert_eq!(request.terms.len(), 1); + assert_eq!(request.queries[0], FulltextQuery("foo".to_string())); + assert_eq!( + request.terms[0], + FulltextTerm { + col_lowered: false, + term: "bar".to_string(), + } + ); } } diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index bd35770eef..2d71892874 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -376,6 +376,7 @@ mod tests { use crate::access_layer::RegionFilePathFactory; use crate::read::{Batch, BatchColumn}; use crate::sst::file::FileId; + use crate::sst::index::fulltext_index::applier::builder::{FulltextQuery, FulltextRequest}; use crate::sst::index::fulltext_index::applier::FulltextIndexApplier; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -556,7 +557,15 @@ mod tests { object_store.clone(), queries .into_iter() - .map(|(a, b)| (a, b.to_string())) + .map(|(a, b)| { + ( + a, + FulltextRequest { + queries: vec![FulltextQuery(b.to_string())], + terms: vec![], + }, + ) + }) .collect(), factory.clone(), );