feat(inverted_index): Add applier builder to convert Expr to Predicates (Part 2) (#3068)

* feat(inverted_index.integration): Add applier builder to convert Expr to Predicates (Part 1)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(inverted_index.integration): Add applier builder to convert Expr to Predicates (Part 2)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: add comparison unit tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: add eq_list unit tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: add in_list unit tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: add and unit tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: strip tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-01-03 13:14:40 +08:00
committed by GitHub
parent 2e686fe053
commit b9302e4f0d
5 changed files with 947 additions and 31 deletions

View File

@@ -13,13 +13,10 @@
// limitations under the License.
mod between;
// TODO(zhongzc): This PR is too large. The following modules are coming soon.
// mod comparison;
// mod eq_list;
// mod in_list;
// mod regex_match;
mod comparison;
mod eq_list;
mod in_list;
mod regex_match;
use std::collections::HashMap;
@@ -27,7 +24,7 @@ use api::v1::SemanticType;
use common_query::logical_plan::Expr;
use common_telemetry::warn;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr as DfExpr;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
@@ -99,23 +96,21 @@ impl<'a> SstIndexApplierBuilder<'a> {
let res = match expr {
DfExpr::Between(between) => self.collect_between(between),
// TODO(zhongzc): This PR is too large. The following arms are coming soon.
// DfExpr::InList(in_list) => self.collect_inlist(in_list),
// DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
// Operator::And => {
// self.traverse_and_collect(left);
// self.traverse_and_collect(right);
// Ok(())
// }
// Operator::Or => self.collect_or_eq_list(left, right),
// Operator::Eq => self.collect_eq(left, right),
// Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
// self.collect_comparison_expr(left, op, right)
// }
// Operator::RegexMatch => self.collect_regex_match(left, right),
// _ => Ok(()),
// },
DfExpr::InList(in_list) => self.collect_inlist(in_list),
DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
Operator::And => {
self.traverse_and_collect(left);
self.traverse_and_collect(right);
Ok(())
}
Operator::Or => self.collect_or_eq_list(left, right),
Operator::Eq => self.collect_eq(left, right),
Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
self.collect_comparison_expr(left, op, right)
}
Operator::RegexMatch => self.collect_regex_match(left, right),
_ => Ok(()),
},
// TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ...
_ => Ok(()),
@@ -180,8 +175,12 @@ impl<'a> SstIndexApplierBuilder<'a> {
mod tests {
use api::v1::SemanticType;
use datafusion_common::Column;
use datafusion_expr::Between;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use index::inverted_index::search::predicate::{
Bound, Range, RangePredicate, RegexMatchPredicate,
};
use object_store::services::Memory;
use object_store::ObjectStore;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
@@ -198,20 +197,25 @@ mod tests {
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Field,
column_schema: ColumnSchema::new("b", ConcreteDataType::int64_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("c", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"c",
"d",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 3,
column_id: 4,
})
.primary_key(vec![1]);
.primary_key(vec![1, 2]);
builder.build().unwrap()
}
@@ -226,13 +230,20 @@ mod tests {
})
}
pub(crate) fn field_column() -> DfExpr {
pub(crate) fn tag_column2() -> DfExpr {
DfExpr::Column(Column {
relation: None,
name: "b".to_string(),
})
}
pub(crate) fn field_column() -> DfExpr {
DfExpr::Column(Column {
relation: None,
name: "c".to_string(),
})
}
pub(crate) fn nonexistent_column() -> DfExpr {
DfExpr::Column(Column {
relation: None,
@@ -258,4 +269,64 @@ mod tests {
.unwrap();
bytes
}
pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
let mut bytes = vec![];
IndexValueCodec::encode_value(
Value::from(s.into()).as_value_ref(),
&SortField::new(ConcreteDataType::int64_datatype()),
&mut bytes,
)
.unwrap();
bytes
}
#[test]
fn test_collect_and_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
op: Operator::RegexMatch,
right: Box::new(string_lit("bar")),
})),
op: Operator::And,
right: Box::new(DfExpr::Between(Between {
expr: Box::new(tag_column2()),
negated: false,
low: Box::new(int64_lit(123)),
high: Box::new(int64_lit(456)),
})),
});
builder.traverse_and_collect(&expr);
let predicates = builder.output.get("a").unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],
Predicate::RegexMatch(RegexMatchPredicate {
pattern: "bar".to_string()
})
);
let predicates = builder.output.get("b").unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],
Predicate::Range(RangePredicate {
range: Range {
lower: Some(Bound {
inclusive: true,
value: encoded_int64(123),
}),
upper: Some(Bound {
inclusive: true,
value: encoded_int64(456),
}),
}
})
);
}
}

View File

@@ -0,0 +1,280 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use datafusion_expr::{Expr as DfExpr, Operator};
use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate};
use index::inverted_index::Bytes;
use crate::error::Result;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
/// Collects a comparison expression in the form of
/// `column < lit`, `column > lit`, `column <= lit`, `column >= lit`,
/// `lit < column`, `lit > column`, `lit <= column`, `lit >= column`.
pub(crate) fn collect_comparison_expr(
&mut self,
left: &DfExpr,
op: &Operator,
right: &DfExpr,
) -> Result<()> {
match op {
Operator::Lt => {
if matches!(right, DfExpr::Column(_)) {
self.collect_column_gt_lit(right, left)
} else {
self.collect_column_lt_lit(left, right)
}
}
Operator::LtEq => {
if matches!(right, DfExpr::Column(_)) {
self.collect_column_ge_lit(right, left)
} else {
self.collect_column_le_lit(left, right)
}
}
Operator::Gt => {
if matches!(right, DfExpr::Column(_)) {
self.collect_column_lt_lit(right, left)
} else {
self.collect_column_gt_lit(left, right)
}
}
Operator::GtEq => {
if matches!(right, DfExpr::Column(_)) {
self.collect_column_le_lit(right, left)
} else {
self.collect_column_ge_lit(left, right)
}
}
_ => Ok(()),
}
}
fn collect_column_lt_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
self.collect_column_cmp_lit(left, right, |value| Range {
lower: None,
upper: Some(Bound {
inclusive: false,
value,
}),
})
}
fn collect_column_gt_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
self.collect_column_cmp_lit(left, right, |value| Range {
lower: Some(Bound {
inclusive: false,
value,
}),
upper: None,
})
}
fn collect_column_le_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
self.collect_column_cmp_lit(left, right, |value| Range {
lower: None,
upper: Some(Bound {
inclusive: true,
value,
}),
})
}
fn collect_column_ge_lit(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
self.collect_column_cmp_lit(left, right, |value| Range {
lower: Some(Bound {
inclusive: true,
value,
}),
upper: None,
})
}
fn collect_column_cmp_lit(
&mut self,
column: &DfExpr,
literal: &DfExpr,
range_builder: impl FnOnce(Bytes) -> Range,
) -> Result<()> {
let Some(column_name) = Self::column_name(column) else {
return Ok(());
};
let Some(lit) = Self::nonnull_lit(literal) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
return Ok(());
};
let predicate = Predicate::Range(RangePredicate {
range: range_builder(Self::encode_lit(lit, data_type)?),
});
self.add_predicate(column_name, predicate);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Error;
use crate::sst::index::applier::builder::tests::{
encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
test_object_store, test_region_metadata,
};
#[test]
fn test_collect_comparison_basic() {
let cases = [
(
(&tag_column(), &Operator::Lt, &string_lit("123")),
Range {
lower: None,
upper: Some(Bound {
inclusive: false,
value: encoded_string("123"),
}),
},
),
(
(&string_lit("123"), &Operator::Lt, &tag_column()),
Range {
lower: Some(Bound {
inclusive: false,
value: encoded_string("123"),
}),
upper: None,
},
),
(
(&tag_column(), &Operator::LtEq, &string_lit("123")),
Range {
lower: None,
upper: Some(Bound {
inclusive: true,
value: encoded_string("123"),
}),
},
),
(
(&string_lit("123"), &Operator::LtEq, &tag_column()),
Range {
lower: Some(Bound {
inclusive: true,
value: encoded_string("123"),
}),
upper: None,
},
),
(
(&tag_column(), &Operator::Gt, &string_lit("123")),
Range {
lower: Some(Bound {
inclusive: false,
value: encoded_string("123"),
}),
upper: None,
},
),
(
(&string_lit("123"), &Operator::Gt, &tag_column()),
Range {
lower: None,
upper: Some(Bound {
inclusive: false,
value: encoded_string("123"),
}),
},
),
(
(&tag_column(), &Operator::GtEq, &string_lit("123")),
Range {
lower: Some(Bound {
inclusive: true,
value: encoded_string("123"),
}),
upper: None,
},
),
(
(&string_lit("123"), &Operator::GtEq, &tag_column()),
Range {
lower: None,
upper: Some(Bound {
inclusive: true,
value: encoded_string("123"),
}),
},
),
];
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
for ((left, op, right), _) in &cases {
builder.collect_comparison_expr(left, op, right).unwrap();
}
let predicates = builder.output.get("a").unwrap();
assert_eq!(predicates.len(), cases.len());
for ((_, expected), actual) in cases.into_iter().zip(predicates) {
assert_eq!(
actual,
&Predicate::Range(RangePredicate { range: expected })
);
}
}
#[test]
fn test_collect_comparison_type_mismatch() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
assert!(builder.output.is_empty());
}
#[test]
fn test_collect_comparison_field_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
builder
.collect_comparison_expr(&field_column(), &Operator::Lt, &string_lit("abc"))
.unwrap();
assert!(builder.output.is_empty());
}
#[test]
fn test_collect_comparison_nonexistent_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let res = builder.collect_comparison_expr(
&nonexistent_column(),
&Operator::Lt,
&string_lit("abc"),
);
assert!(matches!(res, Err(Error::ColumnNotFound { .. })));
assert!(builder.output.is_empty());
}
}

View File

@@ -0,0 +1,302 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
use datatypes::data_type::ConcreteDataType;
use index::inverted_index::search::predicate::{InListPredicate, Predicate};
use index::inverted_index::Bytes;
use crate::error::Result;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
/// Collects an eq expression in the form of `column = lit`.
pub(crate) fn collect_eq(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
return Ok(());
};
let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
return Ok(());
};
let predicate = Predicate::InList(InListPredicate {
list: HashSet::from_iter([Self::encode_lit(lit, data_type)?]),
});
self.add_predicate(column_name, predicate);
Ok(())
}
/// Collects eq list in the form of `column = lit OR column = lit OR ...`.
pub(crate) fn collect_or_eq_list(&mut self, eq_expr: &DfExpr, or_list: &DfExpr) -> Result<()> {
let DfExpr::BinaryExpr(BinaryExpr {
left,
op: Operator::Eq,
right,
}) = eq_expr
else {
return Ok(());
};
let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
return Ok(());
};
let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
return Ok(());
};
let bytes = Self::encode_lit(lit, data_type.clone())?;
let mut inlist = HashSet::from_iter([bytes]);
if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? {
let predicate = Predicate::InList(InListPredicate { list: inlist });
self.add_predicate(column_name, predicate);
}
Ok(())
}
/// Recursively collects eq list.
///
/// Returns false if the expression doesn't match the form then
/// caller can safely ignore the expression.
fn collect_eq_list_inner(
column_name: &str,
data_type: &ConcreteDataType,
expr: &DfExpr,
inlist: &mut HashSet<Bytes>,
) -> Result<bool> {
let DfExpr::BinaryExpr(BinaryExpr {
left,
op: op @ (Operator::Eq | Operator::Or),
right,
}) = expr
else {
return Ok(false);
};
if op == &Operator::Or {
let r = Self::collect_eq_list_inner(column_name, data_type, left, inlist)?
.then(|| Self::collect_eq_list_inner(column_name, data_type, right, inlist))
.transpose()?
.unwrap_or(false);
return Ok(r);
}
if op == &Operator::Eq {
let Some(name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
return Ok(false);
};
if column_name != name {
return Ok(false);
}
let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
return Ok(false);
};
inlist.insert(Self::encode_lit(lit, data_type.clone())?);
return Ok(true);
}
Ok(false)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Error;
use crate::sst::index::applier::builder::tests::{
encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
tag_column2, test_object_store, test_region_metadata,
};
#[test]
fn test_collect_eq_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
builder
.collect_eq(&tag_column(), &string_lit("foo"))
.unwrap();
builder
.collect_eq(&string_lit("bar"), &tag_column())
.unwrap();
let predicates = builder.output.get("a").unwrap();
assert_eq!(predicates.len(), 2);
assert_eq!(
predicates[0],
Predicate::InList(InListPredicate {
list: HashSet::from_iter([encoded_string("foo")])
})
);
assert_eq!(
predicates[1],
Predicate::InList(InListPredicate {
list: HashSet::from_iter([encoded_string("bar")])
})
);
}
#[test]
fn test_collect_eq_field_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
builder
.collect_eq(&field_column(), &string_lit("abc"))
.unwrap();
assert!(builder.output.is_empty());
}
#[test]
fn test_collect_eq_nonexistent_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));
assert!(matches!(res, Err(Error::ColumnNotFound { .. })));
assert!(builder.output.is_empty());
}
#[test]
fn test_collect_eq_type_mismatch() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let res = builder.collect_eq(&tag_column(), &int64_lit(1));
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
assert!(builder.output.is_empty());
}
#[test]
fn test_collect_or_eq_list_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
op: Operator::Eq,
right: Box::new(string_lit("abc")),
});
let or_eq_list = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
op: Operator::Eq,
right: Box::new(string_lit("foo")),
})),
op: Operator::Or,
right: Box::new(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
op: Operator::Eq,
right: Box::new(string_lit("bar")),
})),
op: Operator::Or,
right: Box::new(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
op: Operator::Eq,
right: Box::new(string_lit("baz")),
})),
})),
});
builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
let predicates = builder.output.get("a").unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],
Predicate::InList(InListPredicate {
list: HashSet::from_iter([
encoded_string("abc"),
encoded_string("foo"),
encoded_string("bar"),
encoded_string("baz")
])
})
);
}
#[test]
fn test_collect_or_eq_list_invalid_op() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
op: Operator::Eq,
right: Box::new(string_lit("abc")),
});
let or_eq_list = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
op: Operator::Eq,
right: Box::new(string_lit("foo")),
})),
op: Operator::Or,
right: Box::new(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
op: Operator::Gt, // invalid op
right: Box::new(string_lit("foo")),
})),
});
builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
assert!(builder.output.is_empty());
}
#[test]
fn test_collect_or_eq_list_multiple_columns() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
op: Operator::Eq,
right: Box::new(string_lit("abc")),
});
let or_eq_list = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
op: Operator::Eq,
right: Box::new(string_lit("foo")),
})),
op: Operator::Or,
right: Box::new(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column2()), // different column
op: Operator::Eq,
right: Box::new(string_lit("bar")),
})),
});
builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
assert!(builder.output.is_empty());
}
}

View File

@@ -0,0 +1,152 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use datafusion_expr::expr::InList;
use index::inverted_index::search::predicate::{InListPredicate, Predicate};
use crate::error::Result;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
/// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
pub(crate) fn collect_inlist(&mut self, inlist: &InList) -> Result<()> {
if inlist.negated {
return Ok(());
}
let Some(column_name) = Self::column_name(&inlist.expr) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
return Ok(());
};
let mut predicate = InListPredicate {
list: HashSet::with_capacity(inlist.list.len()),
};
for lit in &inlist.list {
let Some(lit) = Self::nonnull_lit(lit) else {
return Ok(());
};
predicate
.list
.insert(Self::encode_lit(lit, data_type.clone())?);
}
self.add_predicate(column_name, Predicate::InList(predicate));
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Error;
use crate::sst::index::applier::builder::tests::{
encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
test_object_store, test_region_metadata,
};
#[test]
fn test_collect_in_list_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let in_list = InList {
expr: Box::new(tag_column()),
list: vec![string_lit("foo"), string_lit("bar")],
negated: false,
};
builder.collect_inlist(&in_list).unwrap();
let predicates = builder.output.get("a").unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],
Predicate::InList(InListPredicate {
list: HashSet::from_iter([encoded_string("foo"), encoded_string("bar")])
})
);
}
#[test]
fn test_collect_in_list_negated() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let in_list = InList {
expr: Box::new(tag_column()),
list: vec![string_lit("foo"), string_lit("bar")],
negated: true,
};
builder.collect_inlist(&in_list).unwrap();
assert!(builder.output.is_empty());
}
#[test]
fn test_collect_in_list_field_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let in_list = InList {
expr: Box::new(field_column()),
list: vec![string_lit("foo"), string_lit("bar")],
negated: false,
};
builder.collect_inlist(&in_list).unwrap();
assert!(builder.output.is_empty());
}
#[test]
fn test_collect_in_list_type_mismatch() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let in_list = InList {
expr: Box::new(tag_column()),
list: vec![int64_lit(123), int64_lit(456)],
negated: false,
};
let res = builder.collect_inlist(&in_list);
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
assert!(builder.output.is_empty());
}
#[test]
fn test_collect_in_list_nonexistent_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let in_list = InList {
expr: Box::new(nonexistent_column()),
list: vec![string_lit("foo"), string_lit("bar")],
negated: false,
};
let res = builder.collect_inlist(&in_list);
assert!(matches!(res, Err(Error::ColumnNotFound { .. })));
assert!(builder.output.is_empty());
}
}

View File

@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use datafusion_common::ScalarValue;
use datafusion_expr::Expr as DfExpr;
use index::inverted_index::search::predicate::{Predicate, RegexMatchPredicate};
use crate::error::Result;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
/// Collects a regex match expression in the form of `column ~ pattern`.
pub(crate) fn collect_regex_match(&mut self, column: &DfExpr, pattern: &DfExpr) -> Result<()> {
let Some(column_name) = Self::column_name(column) else {
return Ok(());
};
let Some(data_type) = self.tag_column_type(column_name)? else {
return Ok(());
};
if !data_type.is_string() {
return Ok(());
}
let DfExpr::Literal(ScalarValue::Utf8(Some(pattern))) = pattern else {
return Ok(());
};
let predicate = Predicate::RegexMatch(RegexMatchPredicate {
pattern: pattern.clone(),
});
self.add_predicate(column_name, predicate);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Error;
use crate::sst::index::applier::builder::tests::{
field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store,
test_region_metadata,
};
#[test]
fn test_regex_match_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
builder
.collect_regex_match(&tag_column(), &string_lit("abc"))
.unwrap();
let predicates = builder.output.get("a").unwrap();
assert_eq!(predicates.len(), 1);
assert_eq!(
predicates[0],
Predicate::RegexMatch(RegexMatchPredicate {
pattern: "abc".to_string()
})
);
}
#[test]
fn test_regex_match_field_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
builder
.collect_regex_match(&field_column(), &string_lit("abc"))
.unwrap();
assert!(builder.output.is_empty());
}
#[test]
fn test_regex_match_type_mismatch() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
builder
.collect_regex_match(&tag_column(), &int64_lit(123))
.unwrap();
assert!(builder.output.is_empty());
}
#[test]
fn test_regex_match_type_nonexist_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc"));
assert!(matches!(res, Err(Error::ColumnNotFound { .. })));
assert!(builder.output.is_empty());
}
}