feat: add term as fulltext index request (#5843)

* feat: add term as fulltext index request

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-04-08 19:19:32 +08:00
committed by GitHub
parent 609e228852
commit 7335293983
3 changed files with 389 additions and 46 deletions

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use common_telemetry::warn;
@@ -28,6 +28,7 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::applier::builder::FulltextRequest;
use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir};
use crate::sst::index::TYPE_FULLTEXT_INDEX;
@@ -42,8 +43,8 @@ pub struct FulltextIndexApplier {
/// The region ID.
region_id: RegionId,
/// Queries to apply to the index.
queries: Vec<(ColumnId, String)>,
/// Requests to be applied.
requests: HashMap<ColumnId, FulltextRequest>,
/// The puffin manager factory.
puffin_manager_factory: PuffinManagerFactory,
@@ -66,14 +67,14 @@ impl FulltextIndexApplier {
region_dir: String,
region_id: RegionId,
store: ObjectStore,
queries: Vec<(ColumnId, String)>,
requests: HashMap<ColumnId, FulltextRequest>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Self {
region_dir,
region_id,
store,
queries,
requests,
puffin_manager_factory,
file_cache: None,
puffin_metadata_cache: None,
@@ -108,7 +109,7 @@ impl FulltextIndexApplier {
let mut inited = false;
let mut row_ids = BTreeSet::new();
for (column_id, query) in &self.queries {
'outer: for (column_id, request) in &self.requests {
let dir = self
.index_dir_path(file_id, *column_id, file_size_hint)
.await?;
@@ -122,20 +123,23 @@ impl FulltextIndexApplier {
let searcher =
TantivyFulltextIndexSearcher::new(path).context(ApplyFulltextIndexSnafu)?;
let result = searcher
.search(query)
.await
.context(ApplyFulltextIndexSnafu)?;
if !inited {
row_ids = result;
inited = true;
continue;
}
for query in &request.queries {
let result = searcher
.search(&query.0)
.await
.context(ApplyFulltextIndexSnafu)?;
row_ids.retain(|id| result.contains(id));
if row_ids.is_empty() {
break;
if !inited {
row_ids = result;
inited = true;
continue;
}
row_ids.retain(|id| result.contains(id));
if row_ids.is_empty() {
break 'outer;
}
}
}

View File

@@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{BinaryExpr, Expr, Operator};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use store_api::metadata::RegionMetadata;
@@ -24,6 +27,31 @@ use crate::error::Result;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
/// A request for fulltext index.
///
/// It contains all the queries and terms for a column.
#[derive(Default)]
pub struct FulltextRequest {
pub queries: Vec<FulltextQuery>,
pub terms: Vec<FulltextTerm>,
}
/// A query to be matched in fulltext index.
///
/// `query` is the query to be matched, e.g. "+foo -bar" in `SELECT * FROM t WHERE matches(text, "+foo -bar")`.
#[derive(Debug, PartialEq, Eq)]
pub struct FulltextQuery(pub String);
/// A term to be matched in fulltext index.
///
/// `term` is the term to be matched, e.g. "foo" in `SELECT * FROM t WHERE matches_term(text, "foo")`.
/// `col_lowered` indicates whether the column is lowercased, e.g. `col_lowered = true` when `matches_term(lower(text), "foo")`.
#[derive(Debug, PartialEq, Eq)]
pub struct FulltextTerm {
pub col_lowered: bool,
pub term: String,
}
/// `FulltextIndexApplierBuilder` is a builder for `FulltextIndexApplier`.
pub struct FulltextIndexApplierBuilder<'a> {
region_dir: String,
@@ -72,19 +100,22 @@ impl<'a> FulltextIndexApplierBuilder<'a> {
/// Builds `SstIndexApplier` from the given expressions.
pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
let mut queries = Vec::with_capacity(exprs.len());
let mut requests = HashMap::new();
for expr in exprs {
if let Some((column_id, query)) = Self::expr_to_query(self.metadata, expr) {
queries.push((column_id, query));
}
Self::extract_requests(expr, self.metadata, &mut requests);
}
Ok((!queries.is_empty()).then(|| {
// Check if any requests have queries or terms
let has_requests = requests
.iter()
.any(|(_, request)| !request.queries.is_empty() || !request.terms.is_empty());
Ok(has_requests.then(|| {
FulltextIndexApplier::new(
self.region_dir,
self.region_id,
self.store,
queries,
requests,
self.puffin_manager_factory,
)
.with_file_cache(self.file_cache)
@@ -92,10 +123,35 @@ impl<'a> FulltextIndexApplierBuilder<'a> {
}))
}
fn expr_to_query(metadata: &RegionMetadata, expr: &Expr) -> Option<(ColumnId, String)> {
let Expr::ScalarFunction(f) = expr else {
return None;
};
fn extract_requests(
expr: &Expr,
metadata: &'a RegionMetadata,
requests: &mut HashMap<ColumnId, FulltextRequest>,
) {
match expr {
Expr::BinaryExpr(BinaryExpr {
left,
op: Operator::And,
right,
}) => {
Self::extract_requests(left, metadata, requests);
Self::extract_requests(right, metadata, requests);
}
Expr::ScalarFunction(func) => {
if let Some((column_id, query)) = Self::expr_to_query(metadata, func) {
requests.entry(column_id).or_default().queries.push(query);
} else if let Some((column_id, term)) = Self::expr_to_term(metadata, func) {
requests.entry(column_id).or_default().terms.push(term);
}
}
_ => {}
}
}
fn expr_to_query(
metadata: &RegionMetadata,
f: &ScalarFunction,
) -> Option<(ColumnId, FulltextQuery)> {
if f.name() != "matches" {
return None;
}
@@ -116,7 +172,70 @@ impl<'a> FulltextIndexApplierBuilder<'a> {
return None;
};
Some((column.column_id, query.to_string()))
Some((column.column_id, FulltextQuery(query.to_string())))
}
fn expr_to_term(
metadata: &RegionMetadata,
f: &ScalarFunction,
) -> Option<(ColumnId, FulltextTerm)> {
if f.name() != "matches_term" {
return None;
}
if f.args.len() != 2 {
return None;
}
let mut lowered = false;
let column;
match &f.args[0] {
Expr::Column(c) => {
column = c;
}
Expr::ScalarFunction(f) => {
let lower_arg = Self::extract_lower_arg(f)?;
lowered = true;
if let Expr::Column(c) = lower_arg {
column = c;
} else {
return None;
}
}
_ => return None,
}
let column = metadata.column_by_name(&column.name)?;
if column.column_schema.data_type != ConcreteDataType::string_datatype() {
return None;
}
let Expr::Literal(ScalarValue::Utf8(Some(term))) = &f.args[1] else {
return None;
};
Some((
column.column_id,
FulltextTerm {
col_lowered: lowered,
term: term.to_string(),
},
))
}
fn extract_lower_arg(lower_func: &ScalarFunction) -> Option<&Expr> {
if lower_func.args.len() != 1 {
return None;
}
if lower_func.name() != "lower" {
return None;
}
if lower_func.args.len() != 1 {
return None;
}
Some(&lower_func.args[0])
}
}
@@ -127,6 +246,7 @@ mod tests {
use api::v1::SemanticType;
use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::scalars::udf::create_udf;
use datafusion::functions::string::lower;
use datafusion_common::Column;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::ScalarUDF;
@@ -166,11 +286,19 @@ mod tests {
))
}
fn matches_term_func() -> Arc<ScalarUDF> {
Arc::new(create_udf(
FUNCTION_REGISTRY.get_function("matches_term").unwrap(),
QueryContext::arc(),
Default::default(),
))
}
#[test]
fn test_expr_to_query_basic() {
let metadata = mock_metadata();
let expr = Expr::ScalarFunction(ScalarFunction {
let func = ScalarFunction {
args: vec![
Expr::Column(Column {
name: "text".to_string(),
@@ -179,34 +307,34 @@ mod tests {
Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
],
func: matches_func(),
});
};
let (column_id, query) =
FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).unwrap();
FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).unwrap();
assert_eq!(column_id, 1);
assert_eq!(query, "foo".to_string());
assert_eq!(query, FulltextQuery("foo".to_string()));
}
#[test]
fn test_expr_to_query_wrong_num_args() {
let metadata = mock_metadata();
let expr = Expr::ScalarFunction(ScalarFunction {
let func = ScalarFunction {
args: vec![Expr::Column(Column {
name: "text".to_string(),
relation: None,
})],
func: matches_func(),
});
};
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none());
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
}
#[test]
fn test_expr_to_query_not_found_column() {
let metadata = mock_metadata();
let expr = Expr::ScalarFunction(ScalarFunction {
let func = ScalarFunction {
args: vec![
Expr::Column(Column {
name: "not_found".to_string(),
@@ -215,16 +343,16 @@ mod tests {
Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
],
func: matches_func(),
});
};
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none());
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
}
#[test]
fn test_expr_to_query_column_wrong_data_type() {
let metadata = mock_metadata();
let expr = Expr::ScalarFunction(ScalarFunction {
let func = ScalarFunction {
args: vec![
Expr::Column(Column {
name: "ts".to_string(),
@@ -233,16 +361,16 @@ mod tests {
Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
],
func: matches_func(),
});
};
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none());
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
}
#[test]
fn test_expr_to_query_pattern_not_string() {
let metadata = mock_metadata();
let expr = Expr::ScalarFunction(ScalarFunction {
let func = ScalarFunction {
args: vec![
Expr::Column(Column {
name: "text".to_string(),
@@ -251,8 +379,210 @@ mod tests {
Expr::Literal(ScalarValue::Int64(Some(42))),
],
func: matches_func(),
};
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &func).is_none());
}
#[test]
fn test_expr_to_term_basic() {
let metadata = mock_metadata();
let func = ScalarFunction {
args: vec![
Expr::Column(Column {
name: "text".to_string(),
relation: None,
}),
Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
],
func: matches_term_func(),
};
let (column_id, term) =
FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
assert_eq!(column_id, 1);
assert_eq!(
term,
FulltextTerm {
col_lowered: false,
term: "foo".to_string(),
}
);
}
#[test]
fn test_expr_to_term_with_lower() {
let metadata = mock_metadata();
let lower_func_expr = ScalarFunction {
args: vec![Expr::Column(Column {
name: "text".to_string(),
relation: None,
})],
func: lower(),
};
let func = ScalarFunction {
args: vec![
Expr::ScalarFunction(lower_func_expr),
Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
],
func: matches_term_func(),
};
let (column_id, term) =
FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).unwrap();
assert_eq!(column_id, 1);
assert_eq!(
term,
FulltextTerm {
col_lowered: true,
term: "foo".to_string(),
}
);
}
#[test]
fn test_expr_to_term_wrong_num_args() {
let metadata = mock_metadata();
let func = ScalarFunction {
args: vec![Expr::Column(Column {
name: "text".to_string(),
relation: None,
})],
func: matches_term_func(),
};
assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
}
#[test]
fn test_expr_to_term_wrong_function_name() {
let metadata = mock_metadata();
let func = ScalarFunction {
args: vec![
Expr::Column(Column {
name: "text".to_string(),
relation: None,
}),
Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
],
func: matches_func(), // Using 'matches' instead of 'matches_term'
};
assert!(FulltextIndexApplierBuilder::expr_to_term(&metadata, &func).is_none());
}
#[test]
fn test_extract_lower_arg() {
let func = ScalarFunction {
args: vec![Expr::Column(Column {
name: "text".to_string(),
relation: None,
})],
func: lower(),
};
let arg = FulltextIndexApplierBuilder::extract_lower_arg(&func).unwrap();
match arg {
Expr::Column(c) => {
assert_eq!(c.name, "text");
}
_ => panic!("Expected Column expression"),
}
}
#[test]
fn test_extract_lower_arg_wrong_function() {
let func = ScalarFunction {
args: vec![Expr::Column(Column {
name: "text".to_string(),
relation: None,
})],
func: matches_func(), // Not 'lower'
};
assert!(FulltextIndexApplierBuilder::extract_lower_arg(&func).is_none());
}
#[test]
fn test_extract_requests() {
let metadata = mock_metadata();
// Create a matches expression
let matches_expr = Expr::ScalarFunction(ScalarFunction {
args: vec![
Expr::Column(Column {
name: "text".to_string(),
relation: None,
}),
Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
],
func: matches_func(),
});
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none());
let mut requests = HashMap::new();
FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests);
assert_eq!(requests.len(), 1);
let request = requests.get(&1).unwrap();
assert_eq!(request.queries.len(), 1);
assert_eq!(request.terms.len(), 0);
assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
}
#[test]
fn test_extract_multiple_requests() {
let metadata = mock_metadata();
// Create a matches expression
let matches_expr = Expr::ScalarFunction(ScalarFunction {
args: vec![
Expr::Column(Column {
name: "text".to_string(),
relation: None,
}),
Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
],
func: matches_func(),
});
// Create a matches_term expression
let matches_term_expr = Expr::ScalarFunction(ScalarFunction {
args: vec![
Expr::Column(Column {
name: "text".to_string(),
relation: None,
}),
Expr::Literal(ScalarValue::Utf8(Some("bar".to_string()))),
],
func: matches_term_func(),
});
// Create a binary expression combining both
let binary_expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(matches_expr),
op: Operator::And,
right: Box::new(matches_term_expr),
});
let mut requests = HashMap::new();
FulltextIndexApplierBuilder::extract_requests(&binary_expr, &metadata, &mut requests);
assert_eq!(requests.len(), 1);
let request = requests.get(&1).unwrap();
assert_eq!(request.queries.len(), 1);
assert_eq!(request.terms.len(), 1);
assert_eq!(request.queries[0], FulltextQuery("foo".to_string()));
assert_eq!(
request.terms[0],
FulltextTerm {
col_lowered: false,
term: "bar".to_string(),
}
);
}
}

View File

@@ -376,6 +376,7 @@ mod tests {
use crate::access_layer::RegionFilePathFactory;
use crate::read::{Batch, BatchColumn};
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::applier::builder::{FulltextQuery, FulltextRequest};
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -556,7 +557,15 @@ mod tests {
object_store.clone(),
queries
.into_iter()
.map(|(a, b)| (a, b.to_string()))
.map(|(a, b)| {
(
a,
FulltextRequest {
queries: vec![FulltextQuery(b.to_string())],
terms: vec![],
},
)
})
.collect(),
factory.clone(),
);