mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
feat(inverted_index.search): add fst applier (#2851)
* feat(inverted_index.search): add fst applier Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * fix: typos Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -3920,6 +3920,8 @@ dependencies = [
|
||||
"futures",
|
||||
"greptime-proto",
|
||||
"prost 0.12.2",
|
||||
"regex",
|
||||
"regex-automata 0.1.10",
|
||||
"snafu",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -6953,6 +6955,7 @@ version = "0.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
|
||||
dependencies = [
|
||||
"fst",
|
||||
"regex-syntax 0.6.29",
|
||||
]
|
||||
|
||||
|
||||
@@ -107,6 +107,7 @@ prost = "0.12"
|
||||
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }
|
||||
rand = "0.8"
|
||||
regex = "1.8"
|
||||
regex-automata = { version = "0.1", features = ["transducer"] }
|
||||
reqwest = { version = "0.11", default-features = false, features = [
|
||||
"json",
|
||||
"rustls-tls-native-roots",
|
||||
|
||||
@@ -13,6 +13,8 @@ fst.workspace = true
|
||||
futures.workspace = true
|
||||
greptime-proto.workspace = true
|
||||
prost.workspace = true
|
||||
regex-automata.workspace = true
|
||||
regex.workspace = true
|
||||
snafu.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -14,3 +14,7 @@
|
||||
|
||||
pub mod error;
|
||||
pub mod format;
|
||||
pub mod search;
|
||||
|
||||
pub type FstMap = fst::Map<Vec<u8>>;
|
||||
pub type Bytes = Vec<u8>;
|
||||
|
||||
@@ -20,6 +20,8 @@ use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use snafu::{Location, Snafu};
|
||||
|
||||
use crate::inverted_index::search::predicate::Predicate;
|
||||
|
||||
#[derive(Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
@@ -75,6 +77,38 @@ pub enum Error {
|
||||
error: prost::DecodeError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse regex pattern: {pattern}"))]
|
||||
ParseRegex {
|
||||
#[snafu(source)]
|
||||
error: regex::Error,
|
||||
pattern: String,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse regex DFA"))]
|
||||
ParseDFA {
|
||||
#[snafu(source)]
|
||||
error: regex_automata::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Unexpected empty predicates to construct fst applier"))]
|
||||
EmptyPredicates { location: Location },
|
||||
|
||||
#[snafu(display("Failed to construct intersection fst applier with InList predicate"))]
|
||||
IntersectionApplierWithInList { location: Location },
|
||||
|
||||
#[snafu(display("Failed to construct keys fst applier without InList predicate"))]
|
||||
KeysApplierWithoutInList { location: Location },
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to construct keys fst applier with unexpected predicates: {predicates:?}"
|
||||
))]
|
||||
KeysApplierUnexpectedPredicates {
|
||||
location: Location,
|
||||
predicates: Vec<Predicate>,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
@@ -87,7 +121,14 @@ impl ErrorExt for Error {
|
||||
| UnexpectedOffsetSize { .. }
|
||||
| UnexpectedBlobSize { .. }
|
||||
| DecodeProto { .. }
|
||||
| DecodeFst { .. } => StatusCode::Unexpected,
|
||||
| DecodeFst { .. }
|
||||
| KeysApplierUnexpectedPredicates { .. } => StatusCode::Unexpected,
|
||||
|
||||
ParseRegex { .. }
|
||||
| ParseDFA { .. }
|
||||
| KeysApplierWithoutInList { .. }
|
||||
| IntersectionApplierWithInList { .. }
|
||||
| EmptyPredicates { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,12 +17,10 @@ mod footer;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
use fst::Map;
|
||||
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
|
||||
|
||||
use crate::inverted_index::error::Result;
|
||||
|
||||
pub type FstMap = Map<Vec<u8>>;
|
||||
use crate::inverted_index::FstMap;
|
||||
|
||||
/// InvertedIndexReader defines an asynchronous reader of inverted index data
|
||||
#[async_trait]
|
||||
|
||||
16
src/index/src/inverted_index/search.rs
Normal file
16
src/index/src/inverted_index/search.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod fst_apply;
|
||||
pub mod predicate;
|
||||
32
src/index/src/inverted_index/search/fst_apply.rs
Normal file
32
src/index/src/inverted_index/search/fst_apply.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod intersection_apply;
|
||||
mod keys_apply;
|
||||
|
||||
pub use intersection_apply::IntersectionFstApplier;
|
||||
pub use keys_apply::KeysFstApplier;
|
||||
|
||||
use crate::inverted_index::FstMap;
|
||||
|
||||
/// A trait for objects that can process a finite state transducer (FstMap) and return
|
||||
/// associated values.
|
||||
pub trait FstApplier: Send + Sync {
|
||||
/// Retrieves values from an FstMap.
|
||||
///
|
||||
/// * `fst`: A reference to the FstMap from which the values will be fetched.
|
||||
///
|
||||
/// Returns a `Vec<u64>`, with each u64 being a value from the FstMap.
|
||||
fn apply(&self, fst: &FstMap) -> Vec<u64>;
|
||||
}
|
||||
@@ -0,0 +1,325 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use fst::map::OpBuilder;
|
||||
use fst::{IntoStreamer, Streamer};
|
||||
use regex_automata::DenseDFA;
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::inverted_index::error::{
|
||||
EmptyPredicatesSnafu, IntersectionApplierWithInListSnafu, ParseDFASnafu, Result,
|
||||
};
|
||||
use crate::inverted_index::search::fst_apply::FstApplier;
|
||||
use crate::inverted_index::search::predicate::{Predicate, Range};
|
||||
use crate::inverted_index::FstMap;
|
||||
|
||||
type Dfa = DenseDFA<Vec<usize>, usize>;
|
||||
|
||||
/// `IntersectionFstApplier` applies intersection operations on an FstMap using specified ranges and regex patterns.
|
||||
pub struct IntersectionFstApplier {
|
||||
/// A list of `Range` which define inclusive or exclusive ranges for keys to be queried in the FstMap.
|
||||
ranges: Vec<Range>,
|
||||
|
||||
/// A list of `Dfa` compiled from regular expression patterns.
|
||||
dfas: Vec<Dfa>,
|
||||
}
|
||||
|
||||
impl FstApplier for IntersectionFstApplier {
|
||||
fn apply(&self, fst: &FstMap) -> Vec<u64> {
|
||||
let mut op = OpBuilder::new();
|
||||
|
||||
for range in &self.ranges {
|
||||
match (range.lower.as_ref(), range.upper.as_ref()) {
|
||||
(Some(lower), Some(upper)) => match (lower.inclusive, upper.inclusive) {
|
||||
(true, true) => op.push(fst.range().ge(&lower.value).le(&upper.value)),
|
||||
(true, false) => op.push(fst.range().ge(&lower.value).lt(&upper.value)),
|
||||
(false, true) => op.push(fst.range().gt(&lower.value).le(&upper.value)),
|
||||
(false, false) => op.push(fst.range().gt(&lower.value).lt(&upper.value)),
|
||||
},
|
||||
(Some(lower), None) => match lower.inclusive {
|
||||
true => op.push(fst.range().ge(&lower.value)),
|
||||
false => op.push(fst.range().gt(&lower.value)),
|
||||
},
|
||||
(None, Some(upper)) => match upper.inclusive {
|
||||
true => op.push(fst.range().le(&upper.value)),
|
||||
false => op.push(fst.range().lt(&upper.value)),
|
||||
},
|
||||
(None, None) => op.push(fst),
|
||||
}
|
||||
}
|
||||
|
||||
for dfa in &self.dfas {
|
||||
op.push(fst.search(dfa));
|
||||
}
|
||||
|
||||
let mut stream = op.intersection().into_stream();
|
||||
let mut values = Vec::new();
|
||||
while let Some((_, v)) = stream.next() {
|
||||
values.push(v[0].value)
|
||||
}
|
||||
values
|
||||
}
|
||||
}
|
||||
|
||||
impl IntersectionFstApplier {
|
||||
/// Attempts to create an `IntersectionFstApplier` from a list of `Predicate`.
|
||||
///
|
||||
/// This function only accepts predicates of the variants `Range` and `RegexMatch`.
|
||||
/// It does not accept `InList` predicates and will return an error if any are found.
|
||||
/// `InList` predicates are handled by `KeysFstApplier`.
|
||||
pub fn try_from(predicates: Vec<Predicate>) -> Result<Self> {
|
||||
ensure!(!predicates.is_empty(), EmptyPredicatesSnafu);
|
||||
|
||||
let mut dfas = Vec::with_capacity(predicates.len());
|
||||
let mut ranges = Vec::with_capacity(predicates.len());
|
||||
|
||||
for predicate in predicates {
|
||||
match predicate {
|
||||
Predicate::Range(range) => ranges.push(range.range),
|
||||
Predicate::RegexMatch(regex) => {
|
||||
let dfa = DenseDFA::new(®ex.pattern);
|
||||
let dfa = dfa.context(ParseDFASnafu)?;
|
||||
dfas.push(dfa);
|
||||
}
|
||||
// Rejection of `InList` predicates is enforced here.
|
||||
Predicate::InList(_) => {
|
||||
return IntersectionApplierWithInListSnafu.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self { dfas, ranges })
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Vec<Predicate>> for IntersectionFstApplier {
|
||||
type Error = crate::inverted_index::error::Error;
|
||||
|
||||
fn try_from(predicates: Vec<Predicate>) -> Result<Self> {
|
||||
Self::try_from(predicates)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use crate::inverted_index::error::Error;
    use crate::inverted_index::search::predicate::{
        Bound, InListPredicate, RangePredicate, RegexMatchPredicate,
    };

    /// Map shared by the tests: "aa" -> 1, "bb" -> 2, "cc" -> 3.
    fn sample_fst() -> FstMap {
        FstMap::from_iter([("aa", 1), ("bb", 2), ("cc", 3)]).unwrap()
    }

    /// Builds a present bound with the given key and inclusiveness.
    fn bound(value: &str, inclusive: bool) -> Option<Bound> {
        Some(Bound {
            value: value.as_bytes().to_vec(),
            inclusive,
        })
    }

    /// Wraps a pair of optional bounds into a `Range` predicate.
    fn range_predicate(lower: Option<Bound>, upper: Option<Bound>) -> Predicate {
        Predicate::Range(RangePredicate {
            range: Range { lower, upper },
        })
    }

    /// Wraps a pattern into a `RegexMatch` predicate.
    fn regex_predicate(pattern: &str) -> Predicate {
        Predicate::RegexMatch(RegexMatchPredicate {
            pattern: pattern.to_string(),
        })
    }

    fn create_applier_from_range(range: Range) -> Result<IntersectionFstApplier> {
        let predicates = vec![Predicate::Range(RangePredicate { range })];
        IntersectionFstApplier::try_from(predicates)
    }

    fn create_applier_from_pattern(pattern: &str) -> Result<IntersectionFstApplier> {
        IntersectionFstApplier::try_from(vec![regex_predicate(pattern)])
    }

    #[test]
    fn test_intersection_fst_applier_with_ranges() {
        let test_fst = sample_fst();

        // (lower bound, upper bound, expected values)
        let cases: Vec<(Option<Bound>, Option<Bound>, Vec<u64>)> = vec![
            (bound("bb", true), None, vec![2, 3]),
            (bound("bb", false), None, vec![3]),
            (None, bound("bb", true), vec![1, 2]),
            (None, bound("bb", false), vec![1]),
            (bound("aa", true), bound("cc", true), vec![1, 2, 3]),
            (bound("aa", false), bound("cc", false), vec![2]),
        ];

        for (lower, upper, expected) in cases {
            let applier = create_applier_from_range(Range { lower, upper }).unwrap();
            assert_eq!(applier.apply(&test_fst), expected);
        }
    }

    #[test]
    fn test_intersection_fst_applier_with_valid_pattern() {
        let test_fst = sample_fst();

        // (pattern, expected values)
        let cases: [(&str, &[u64]); 10] = [
            ("a.?", &[1]),
            ("b.?", &[2]),
            ("c.?", &[3]),
            ("a.*", &[1]),
            ("b.*", &[2]),
            ("c.*", &[3]),
            ("d.?", &[]),
            ("a.?|b.?", &[1, 2]),
            ("d.?|a.?", &[1]),
            (".*", &[1, 2, 3]),
        ];

        for (pattern, expected) in cases {
            let applier = create_applier_from_pattern(pattern).unwrap();
            assert_eq!(applier.apply(&test_fst), expected);
        }
    }

    #[test]
    fn test_intersection_fst_applier_with_composite_predicates() {
        let test_fst = sample_fst();

        // "aa" is inside the inclusive range and matches the pattern.
        let applier = IntersectionFstApplier::try_from(vec![
            range_predicate(bound("aa", true), bound("cc", true)),
            regex_predicate("a.?"),
        ])
        .unwrap();
        assert_eq!(applier.apply(&test_fst), vec![1]);

        // With an exclusive lower bound "aa" drops out of the range, leaving nothing.
        let applier = IntersectionFstApplier::try_from(vec![
            range_predicate(bound("aa", false), bound("cc", true)),
            regex_predicate("a.?"),
        ])
        .unwrap();
        assert!(applier.apply(&test_fst).is_empty());
    }

    #[test]
    fn test_intersection_fst_applier_with_invalid_pattern() {
        let outcome = create_applier_from_pattern("a(");
        assert!(matches!(outcome, Err(Error::ParseDFA { .. })));
    }

    #[test]
    fn test_intersection_fst_applier_with_empty_predicates() {
        let outcome = IntersectionFstApplier::try_from(vec![]);
        assert!(matches!(outcome, Err(Error::EmptyPredicates { .. })));
    }

    #[test]
    fn test_intersection_fst_applier_with_in_list_predicate() {
        let in_list = Predicate::InList(InListPredicate {
            list: HashSet::from_iter([b"one".to_vec(), b"two".to_vec()]),
        });
        let outcome = IntersectionFstApplier::try_from(vec![in_list]);
        assert!(matches!(
            outcome,
            Err(Error::IntersectionApplierWithInList { .. })
        ));
    }
}
|
||||
302
src/index/src/inverted_index/search/fst_apply/keys_apply.rs
Normal file
302
src/index/src/inverted_index/search/fst_apply/keys_apply.rs
Normal file
@@ -0,0 +1,302 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::inverted_index::error::{
|
||||
EmptyPredicatesSnafu, KeysApplierUnexpectedPredicatesSnafu, KeysApplierWithoutInListSnafu,
|
||||
ParseRegexSnafu, Result,
|
||||
};
|
||||
use crate::inverted_index::search::fst_apply::FstApplier;
|
||||
use crate::inverted_index::search::predicate::Predicate;
|
||||
use crate::inverted_index::{Bytes, FstMap};
|
||||
|
||||
/// `KeysFstApplier` is responsible for applying a search using a set of predefined keys
|
||||
/// against an FstMap to fetch associated values.
|
||||
pub struct KeysFstApplier {
|
||||
/// A list of keys to be fetched directly from the FstMap.
|
||||
keys: Vec<Bytes>,
|
||||
}
|
||||
|
||||
impl FstApplier for KeysFstApplier {
|
||||
fn apply(&self, fst: &FstMap) -> Vec<u64> {
|
||||
self.keys.iter().filter_map(|k| fst.get(k)).collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl KeysFstApplier {
|
||||
/// Tries to create a `KeysFstApplier` from a list of predicates.
|
||||
///
|
||||
/// This function constructs the applier by intersecting keys from one or more `InList` predicates,
|
||||
/// which are required. It then optionally refines this set using any additional `Range` and `RegexMatch`
|
||||
/// predicates provided.
|
||||
pub fn try_from(mut predicates: Vec<Predicate>) -> Result<Self> {
|
||||
ensure!(!predicates.is_empty(), EmptyPredicatesSnafu);
|
||||
|
||||
let (in_lists, others) = Self::split_at_in_lists(&mut predicates);
|
||||
let (ranges, regexes) = Self::split_at_ranges(others);
|
||||
Self::ensure_all_regexes(regexes)?;
|
||||
|
||||
ensure!(!in_lists.is_empty(), KeysApplierWithoutInListSnafu);
|
||||
let intersected_keys = Self::intersect_with_lists(in_lists);
|
||||
let range_matched_keys = Self::filter_by_ranges(intersected_keys, ranges);
|
||||
let regex_matched_keys = Self::filter_by_regexes(range_matched_keys, regexes)?;
|
||||
|
||||
Ok(Self {
|
||||
keys: regex_matched_keys,
|
||||
})
|
||||
}
|
||||
|
||||
fn split_at_in_lists(predicates: &mut [Predicate]) -> (&mut [Predicate], &mut [Predicate]) {
|
||||
let in_list_index = predicates
|
||||
.iter_mut()
|
||||
.partition_in_place(|p| matches!(p, Predicate::InList(_)));
|
||||
predicates.split_at_mut(in_list_index)
|
||||
}
|
||||
|
||||
fn split_at_ranges(predicates: &mut [Predicate]) -> (&mut [Predicate], &mut [Predicate]) {
|
||||
let range_index = predicates
|
||||
.iter_mut()
|
||||
.partition_in_place(|p| matches!(p, Predicate::Range(_)));
|
||||
predicates.split_at_mut(range_index)
|
||||
}
|
||||
|
||||
fn ensure_all_regexes(ps: &[Predicate]) -> Result<()> {
|
||||
ensure!(
|
||||
ps.iter().all(|p| matches!(p, Predicate::RegexMatch(_))),
|
||||
KeysApplierUnexpectedPredicatesSnafu {
|
||||
predicates: ps.to_vec()
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn intersect_with_lists(in_lists: &mut [Predicate]) -> Vec<Bytes> {
|
||||
#[inline]
|
||||
fn get_list(p: &Predicate) -> &HashSet<Bytes> {
|
||||
match p {
|
||||
Predicate::InList(i) => &i.list,
|
||||
_ => unreachable!(), // `in_lists` is filtered by `split_at_in_lists
|
||||
}
|
||||
}
|
||||
|
||||
in_lists.sort_unstable_by_key(|p| get_list(p).len());
|
||||
get_list(&in_lists[0])
|
||||
.iter()
|
||||
.filter(|c| in_lists[1..].iter().all(|s| get_list(s).contains(*c)))
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn filter_by_ranges(mut keys: Vec<Bytes>, ranges: &[Predicate]) -> Vec<Bytes> {
|
||||
#[inline]
|
||||
fn range_contains(p: &Predicate, key: &Bytes) -> bool {
|
||||
let (lower, upper) = match p {
|
||||
Predicate::Range(r) => (&r.range.lower, &r.range.upper),
|
||||
_ => unreachable!(), // `ranges` is filtered by `split_at_ranges`
|
||||
};
|
||||
|
||||
match (lower, upper) {
|
||||
(Some(lower), Some(upper)) => match (lower.inclusive, upper.inclusive) {
|
||||
(true, true) => &lower.value <= key && key <= &upper.value,
|
||||
(true, false) => &lower.value <= key && key < &upper.value,
|
||||
(false, true) => &lower.value < key && key <= &upper.value,
|
||||
(false, false) => &lower.value < key && key < &upper.value,
|
||||
},
|
||||
(Some(lower), None) => match lower.inclusive {
|
||||
true => &lower.value <= key,
|
||||
false => &lower.value < key,
|
||||
},
|
||||
(None, Some(upper)) => match upper.inclusive {
|
||||
true => key <= &upper.value,
|
||||
false => key < &upper.value,
|
||||
},
|
||||
(None, None) => true,
|
||||
}
|
||||
}
|
||||
|
||||
keys.retain(|k| ranges.iter().all(|r| range_contains(r, k)));
|
||||
keys
|
||||
}
|
||||
|
||||
fn filter_by_regexes(mut keys: Vec<Bytes>, regexes: &[Predicate]) -> Result<Vec<Bytes>> {
|
||||
for p in regexes {
|
||||
let pattern = match p {
|
||||
Predicate::RegexMatch(r) => &r.pattern,
|
||||
_ => unreachable!(), // checked by `ensure_all_regexes`
|
||||
};
|
||||
|
||||
let regex = regex::Regex::new(pattern).with_context(|_| ParseRegexSnafu {
|
||||
pattern: pattern.to_owned(),
|
||||
})?;
|
||||
|
||||
keys.retain(|k| {
|
||||
std::str::from_utf8(k)
|
||||
.map(|k| regex.is_match(k))
|
||||
.unwrap_or_default()
|
||||
});
|
||||
if keys.is_empty() {
|
||||
return Ok(keys);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(keys)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Vec<Predicate>> for KeysFstApplier {
|
||||
type Error = crate::inverted_index::error::Error;
|
||||
fn try_from(predicates: Vec<Predicate>) -> Result<Self> {
|
||||
Self::try_from(predicates)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use fst::Map as FstMap;

    use super::*;
    use crate::inverted_index::error::Error;
    use crate::inverted_index::search::predicate::{
        Bound, InListPredicate, Predicate, Range, RangePredicate, RegexMatchPredicate,
    };

    /// Builds an FST map from the given pairs, sorting them first.
    fn create_fst_map(items: &[(&[u8], u64)]) -> FstMap<Vec<u8>> {
        let mut sorted: Vec<(Vec<u8>, u64)> = Vec::with_capacity(items.len());
        for (key, value) in items {
            sorted.push((key.to_vec(), *value));
        }
        sorted.sort();
        FstMap::from_iter(sorted).unwrap()
    }

    fn b(s: &str) -> Vec<u8> {
        s.as_bytes().to_vec()
    }

    /// Map shared by the `apply` tests: "foo" -> 1, "bar" -> 2, "baz" -> 3.
    fn sample_fst() -> FstMap<Vec<u8>> {
        create_fst_map(&[(b"foo", 1), (b"bar", 2), (b"baz", 3)])
    }

    /// `InList` predicate over the given keys.
    fn in_list(keys: &[&str]) -> Predicate {
        Predicate::InList(InListPredicate {
            list: HashSet::from_iter(keys.iter().map(|k| b(k))),
        })
    }

    /// `Range` predicate with an inclusive lower bound and no upper bound.
    fn at_least(lower: &str) -> Predicate {
        Predicate::Range(RangePredicate {
            range: Range {
                lower: Some(Bound {
                    value: b(lower),
                    inclusive: true,
                }),
                upper: None,
            },
        })
    }

    /// `RegexMatch` predicate for the given pattern.
    fn regex_match(pattern: &str) -> Predicate {
        Predicate::RegexMatch(RegexMatchPredicate {
            pattern: pattern.to_string(),
        })
    }

    #[test]
    fn test_keys_fst_applier_apply() {
        let test_fst = sample_fst();
        let applier = KeysFstApplier {
            keys: vec![b("foo"), b("baz")],
        };

        assert_eq!(applier.apply(&test_fst), vec![1, 3]);
    }

    #[test]
    fn test_keys_fst_applier_with_empty_keys() {
        let test_fst = sample_fst();
        let applier = KeysFstApplier { keys: vec![] };

        assert!(applier.apply(&test_fst).is_empty());
    }

    #[test]
    fn test_keys_fst_applier_with_unmatched_keys() {
        let test_fst = sample_fst();
        let applier = KeysFstApplier {
            keys: vec![b("qux"), b("quux")],
        };

        assert!(applier.apply(&test_fst).is_empty());
    }

    #[test]
    fn test_keys_fst_applier_try_from() {
        let predicates = vec![in_list(&["foo", "bar"]), at_least("bar"), regex_match(".*r")];

        let applier = KeysFstApplier::try_from(predicates).unwrap();
        assert_eq!(applier.keys, vec![b("bar")]);
    }

    #[test]
    fn test_keys_fst_applier_try_from_filter_out_unmatched_keys() {
        let predicates = vec![in_list(&["foo", "bar"]), at_least("f"), regex_match(".*o")];

        let applier = KeysFstApplier::try_from(predicates).unwrap();
        assert_eq!(applier.keys, vec![b("foo")]);
    }

    #[test]
    fn test_keys_fst_applier_try_from_empty_predicates() {
        let outcome = KeysFstApplier::try_from(vec![]);
        assert!(matches!(outcome, Err(Error::EmptyPredicates { .. })));
    }

    #[test]
    fn test_keys_fst_applier_try_from_without_in_list() {
        let outcome = KeysFstApplier::try_from(vec![at_least("bar")]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_keys_fst_applier_try_from_with_invalid_regex() {
        let predicates = vec![in_list(&["foo", "bar"]), regex_match("*invalid regex")];

        let outcome = KeysFstApplier::try_from(predicates);
        assert!(matches!(outcome, Err(Error::ParseRegex { .. })));
    }
}
|
||||
73
src/index/src/inverted_index/search/predicate.rs
Normal file
73
src/index/src/inverted_index/search/predicate.rs
Normal file
@@ -0,0 +1,73 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use crate::inverted_index::Bytes;
|
||||
|
||||
/// Enumerates types of predicates for value filtering.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum Predicate {
|
||||
/// Predicate for matching values in a list.
|
||||
InList(InListPredicate),
|
||||
|
||||
/// Predicate for matching values within a range.
|
||||
Range(RangePredicate),
|
||||
|
||||
/// Predicate for matching values against a regex pattern.
|
||||
RegexMatch(RegexMatchPredicate),
|
||||
}
|
||||
|
||||
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
|
||||
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct InListPredicate {
|
||||
/// List of acceptable values.
|
||||
pub list: HashSet<Bytes>,
|
||||
}
|
||||
|
||||
/// `Bound` is a sub-component of a range, representing a single-sided limit that could be inclusive or exclusive.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Bound {
|
||||
/// Whether the bound is inclusive or exclusive.
|
||||
pub inclusive: bool,
|
||||
/// The value of the bound.
|
||||
pub value: Bytes,
|
||||
}
|
||||
|
||||
/// `Range` defines a single continuous range which can optionally have a lower and/or upper limit.
|
||||
/// Both the lower and upper bounds must be satisfied for the range condition to be true.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Range {
|
||||
/// The lower bound of the range.
|
||||
pub lower: Option<Bound>,
|
||||
/// The upper bound of the range.
|
||||
pub upper: Option<Bound>,
|
||||
}
|
||||
|
||||
/// `RangePredicate` encapsulates a range condition that must be satisfied
|
||||
/// for the predicate to hold true (logical AND semantic between the bounds).
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RangePredicate {
|
||||
/// The range condition.
|
||||
pub range: Range,
|
||||
}
|
||||
|
||||
/// `RegexMatchPredicate` wraps a single regex pattern; the predicate is satisfied when
/// a value matches that pattern.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RegexMatchPredicate {
    /// The regex pattern to match against.
    pub pattern: String,
}
|
||||
@@ -12,4 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#![feature(iter_partition_in_place)]
|
||||
|
||||
pub mod inverted_index;
|
||||
|
||||
@@ -304,7 +304,7 @@ async fn test_engine_truncate_during_flush() {
|
||||
let entry_id = version_data.last_entry_id;
|
||||
let sequence = version_data.committed_sequence;
|
||||
|
||||
// Flush reigon.
|
||||
// Flush region.
|
||||
let engine_cloned = engine.clone();
|
||||
let flush_task = tokio::spawn(async move {
|
||||
info!("do flush task!!!!");
|
||||
|
||||
@@ -177,7 +177,7 @@ fn may_compat_primary_key(
|
||||
CompatReaderSnafu {
|
||||
region_id: expect.region_id,
|
||||
reason: format!(
|
||||
"primary key has more columns {} than exepct {}",
|
||||
"primary key has more columns {} than expect {}",
|
||||
actual.primary_key.len(),
|
||||
expect.primary_key.len()
|
||||
),
|
||||
|
||||
@@ -114,7 +114,7 @@ impl<C: Accessor + Clone> ReadCache<C> {
|
||||
(self.mem_cache.entry_count(), self.mem_cache.weighted_size())
|
||||
}
|
||||
|
||||
/// Invalidte all cache items which key starts with `prefix`.
|
||||
/// Invalidate all cache items which key starts with `prefix`.
|
||||
pub(crate) async fn invalidate_entries_with_prefix(&self, prefix: String) {
|
||||
// Safety: always ok when building cache with `support_invalidation_closures`.
|
||||
self.mem_cache
|
||||
|
||||
@@ -2228,7 +2228,7 @@ mod test {
|
||||
"some_metric.timestamp",
|
||||
],
|
||||
),
|
||||
// single not_eq mathcer
|
||||
// single not_eq matcher
|
||||
(
|
||||
r#"some_metric{__field__!="field_1"}"#,
|
||||
vec![
|
||||
@@ -2240,7 +2240,7 @@ mod test {
|
||||
"some_metric.timestamp",
|
||||
],
|
||||
),
|
||||
// two not_eq mathcers
|
||||
// two not_eq matchers
|
||||
(
|
||||
r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
|
||||
vec![
|
||||
|
||||
@@ -200,7 +200,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tls_option_verifiy_ca() {
|
||||
fn test_tls_option_verify_ca() {
|
||||
let s = r#"
|
||||
{
|
||||
"mode": "verify_ca",
|
||||
@@ -219,7 +219,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tls_option_verifiy_full() {
|
||||
fn test_tls_option_verify_full() {
|
||||
let s = r#"
|
||||
{
|
||||
"mode": "verify_full",
|
||||
|
||||
Reference in New Issue
Block a user