feat(inverted_index.search): add index applier (#2868)

* feat(inverted_index.search): add fst applier

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: typos

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(inverted_index.search): add fst values mapper

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: remove meta check

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: fmt & clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: one expect for test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(inverted_index.search): add index applier

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: bitmap_full -> bitmap_full_range

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add check for segment_row_count

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove redundant code

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: reader test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: match error in test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: add helper function to construct fst value

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: polish unit tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: bytemuck to extract offset and size

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: toml format

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: use bytemuck

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: reorg value in unit tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: update proto

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: add a TODO reminder to consider optimizing the order of apply

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: InList predicates are applied first to benefit from higher selectivity

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: update proto

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add read options to control the behavior of index not found

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: move read options to implementation instead of trait

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add SearchContext, refine doc comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: move index_not_found_strategy as a field of SearchContext

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: rename varient

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2023-12-05 16:24:24 +08:00
committed by GitHub
parent aa89d9deef
commit 0b421b5177
12 changed files with 434 additions and 12 deletions

2
Cargo.lock generated
View File

@@ -3570,7 +3570,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2aaee38de81047537dfa42af9df63bcfb866e06c#2aaee38de81047537dfa42af9df63bcfb866e06c"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b1d403088f02136bcebde53d604f491c260ca8e2#b1d403088f02136bcebde53d604f491c260ca8e2"
dependencies = [
"prost 0.12.2",
"serde",

View File

@@ -88,7 +88,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2aaee38de81047537dfa42af9df63bcfb866e06c" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b1d403088f02136bcebde53d604f491c260ca8e2" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"

View File

@@ -64,6 +64,9 @@ pub enum Error {
payload_size: u64,
},
#[snafu(display("Unexpected zero segment row count"))]
UnexpectedZeroSegmentRowCount { location: Location },
#[snafu(display("Failed to decode fst"))]
DecodeFst {
#[snafu(source)]
@@ -109,6 +112,9 @@ pub enum Error {
location: Location,
predicates: Vec<Predicate>,
},
#[snafu(display("index not found, name: {name}"))]
IndexNotFound { name: String, location: Location },
}
impl ErrorExt for Error {
@@ -118,6 +124,7 @@ impl ErrorExt for Error {
Seek { .. }
| Read { .. }
| UnexpectedFooterPayloadSize { .. }
| UnexpectedZeroSegmentRowCount { .. }
| UnexpectedOffsetSize { .. }
| UnexpectedBlobSize { .. }
| DecodeProto { .. }
@@ -128,7 +135,8 @@ impl ErrorExt for Error {
| ParseDFA { .. }
| KeysApplierWithoutInList { .. }
| IntersectionApplierWithInList { .. }
| EmptyPredicates { .. } => StatusCode::InvalidArguments,
| EmptyPredicates { .. }
| IndexNotFound { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -25,7 +25,7 @@ use crate::inverted_index::FstMap;
/// InvertedIndexReader defines an asynchronous reader of inverted index data
#[mockall::automock]
#[async_trait]
pub trait InvertedIndexReader {
pub trait InvertedIndexReader: Send {
/// Retrieve metadata of all inverted indices stored within the blob.
async fn metadata(&mut self) -> Result<InvertedIndexMetas>;

View File

@@ -143,7 +143,11 @@ mod tests {
};
// metas
let mut metas = InvertedIndexMetas::default();
let mut metas = InvertedIndexMetas {
total_row_count: 10,
segment_row_count: 1,
..Default::default()
};
metas.metas.insert(meta.name.clone(), meta);
metas.metas.insert(meta1.name.clone(), meta1);
let mut meta_buf = Vec::new();

View File

@@ -21,7 +21,7 @@ use snafu::{ensure, ResultExt};
use crate::inverted_index::error::{
DecodeProtoSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu,
UnexpectedOffsetSizeSnafu,
UnexpectedOffsetSizeSnafu, UnexpectedZeroSegmentRowCountSnafu,
};
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
@@ -85,6 +85,11 @@ impl<R: AsyncRead + AsyncSeek + Unpin> InvertedIndeFooterReader<R> {
/// Check if the read metadata is consistent with expected sizes and offsets.
fn validate_metas(&self, metas: &InvertedIndexMetas, payload_size: u64) -> Result<()> {
ensure!(
metas.segment_row_count > 0,
UnexpectedZeroSegmentRowCountSnafu
);
for meta in metas.metas.values() {
let InvertedIndexMeta {
base_offset,
@@ -116,7 +121,10 @@ mod tests {
use super::*;
fn create_test_payload(meta: InvertedIndexMeta) -> Vec<u8> {
let mut metas = InvertedIndexMetas::default();
let mut metas = InvertedIndexMetas {
segment_row_count: 1,
..Default::default()
};
metas.metas.insert("test".to_string(), meta);
let mut payload_buf = vec![];
@@ -131,7 +139,6 @@ mod tests {
async fn test_read_payload() {
let meta = InvertedIndexMeta {
name: "test".to_string(),
segment_row_count: 4096,
..Default::default()
};
@@ -145,14 +152,12 @@ mod tests {
assert_eq!(metas.metas.len(), 1);
let index_meta = &metas.metas.get("test").unwrap();
assert_eq!(index_meta.name, "test");
assert_eq!(index_meta.segment_row_count, 4096);
}
#[tokio::test]
async fn test_invalid_footer_payload_size() {
let meta = InvertedIndexMeta {
name: "test".to_string(),
segment_row_count: 4096,
..Default::default()
};
@@ -171,7 +176,6 @@ mod tests {
name: "test".to_string(),
base_offset: 0,
inverted_index_size: 1, // Set size to 1 to make ecceed the blob size
segment_row_count: 4096,
..Default::default()
};

View File

@@ -14,4 +14,5 @@
pub mod fst_apply;
pub mod fst_values_mapper;
pub mod index_apply;
pub mod predicate;

View File

@@ -22,6 +22,7 @@ use crate::inverted_index::FstMap;
/// A trait for objects that can process a finite state transducer (FstMap) and return
/// associated values.
#[mockall::automock]
pub trait FstApplier: Send + Sync {
/// Retrieves values from an FstMap.
///

View File

@@ -89,7 +89,7 @@ impl KeysFstApplier {
fn get_list(p: &Predicate) -> &HashSet<Bytes> {
match p {
Predicate::InList(i) => &i.list,
_ => unreachable!(), // `in_lists` is filtered by `split_at_in_lists
_ => unreachable!(), // `in_lists` is filtered by `split_at_in_lists`
}
}

View File

@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod predicates_apply;
use async_trait::async_trait;
pub use predicates_apply::PredicatesIndexApplier;
use crate::inverted_index::error::Result;
use crate::inverted_index::format::reader::InvertedIndexReader;
/// A trait for processing and transforming indices obtained from an inverted index.
///
/// Applier instances are reusable and work with various `InvertedIndexReader` instances,
/// avoiding repeated compilation of fixed predicates such as regex patterns.
#[async_trait]
pub trait IndexApplier {
/// Applies the predefined predicates to the data read by the given index reader, returning
/// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
async fn apply(
&self,
context: SearchContext,
reader: &mut dyn InvertedIndexReader,
) -> Result<Vec<usize>>;
}
/// A context for searching the inverted index.
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub struct SearchContext {
/// `index_not_found_strategy` controls the behavior of the applier when the index is not found.
pub index_not_found_strategy: IndexNotFoundStrategy,
}
/// Defines the behavior of an applier when the index is not found.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum IndexNotFoundStrategy {
/// Return an empty list of indices.
#[default]
ReturnEmpty,
/// Ignore the index and continue.
Ignore,
/// Throw an error.
ThrowError,
}

View File

@@ -0,0 +1,346 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_base::BitVec;
use greptime_proto::v1::index::InvertedIndexMetas;
use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::search::fst_apply::{
FstApplier, IntersectionFstApplier, KeysFstApplier,
};
use crate::inverted_index::search::fst_values_mapper::FstValuesMapper;
use crate::inverted_index::search::index_apply::{
IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use crate::inverted_index::search::predicate::Predicate;
type IndexName = String;
/// `PredicatesIndexApplier` contains a collection of `FstApplier`s, each associated with an index name,
/// to process and filter index data based on compiled predicates.
pub struct PredicatesIndexApplier {
/// A list of `FstApplier`s, each associated with a specific index name
/// (e.g. a tag field uses its column name as index name)
fst_appliers: Vec<(IndexName, Box<dyn FstApplier>)>,
}
#[async_trait]
impl IndexApplier for PredicatesIndexApplier {
/// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual
/// bitmaps obtained for each index to result in a final set of indices.
async fn apply(
&self,
context: SearchContext,
reader: &mut dyn InvertedIndexReader,
) -> Result<Vec<usize>> {
let metadata = reader.metadata().await?;
let mut bitmap = Self::bitmap_full_range(&metadata);
// TODO(zhongzc): optimize the order of applying to make it quicker to return empty.
for (name, fst_applier) in &self.fst_appliers {
if bitmap.count_ones() == 0 {
break;
}
let Some(meta) = metadata.metas.get(name) else {
match context.index_not_found_strategy {
IndexNotFoundStrategy::ReturnEmpty => {
return Ok(vec![]);
}
IndexNotFoundStrategy::Ignore => {
continue;
}
IndexNotFoundStrategy::ThrowError => {
return IndexNotFoundSnafu { name }.fail();
}
}
};
let fst = reader.fst(meta).await?;
let values = fst_applier.apply(&fst);
let mut mapper = FstValuesMapper::new(&mut *reader, meta);
let bm = mapper.map_values(&values).await?;
bitmap &= bm;
}
Ok(bitmap.iter_ones().collect())
}
}
impl PredicatesIndexApplier {
/// Constructs an instance of `PredicatesIndexApplier` based on a list of tag predicates.
/// Chooses an appropriate `FstApplier` for each index name based on the nature of its predicates.
pub fn try_from(mut predicates: Vec<(IndexName, Vec<Predicate>)>) -> Result<Self> {
let mut fst_appliers = Vec::with_capacity(predicates.len());
// InList predicates are applied first to benefit from higher selectivity.
let in_list_index = predicates
.iter_mut()
.partition_in_place(|(_, ps)| ps.iter().any(|p| matches!(p, Predicate::InList(_))));
let mut iter = predicates.into_iter();
for _ in 0..in_list_index {
let (tag_name, predicates) = iter.next().unwrap();
let fst_applier = Box::new(KeysFstApplier::try_from(predicates)?) as _;
fst_appliers.push((tag_name, fst_applier));
}
for (tag_name, predicates) in iter {
if predicates.is_empty() {
continue;
}
let fst_applier = Box::new(IntersectionFstApplier::try_from(predicates)?) as _;
fst_appliers.push((tag_name, fst_applier));
}
Ok(PredicatesIndexApplier { fst_appliers })
}
/// Creates a `BitVec` representing the full range of data in the index for initial scanning.
fn bitmap_full_range(metadata: &InvertedIndexMetas) -> BitVec {
let total_count = metadata.total_row_count;
let segment_count = metadata.segment_row_count;
let len = (total_count + segment_count - 1) / segment_count;
BitVec::repeat(true, len as _)
}
}
impl TryFrom<Vec<(String, Vec<Predicate>)>> for PredicatesIndexApplier {
type Error = crate::inverted_index::error::Error;
fn try_from(predicates: Vec<(String, Vec<Predicate>)>) -> Result<Self> {
Self::try_from(predicates)
}
}
#[cfg(test)]
mod tests {
use common_base::bit_vec::prelude::*;
use greptime_proto::v1::index::InvertedIndexMeta;
use super::*;
use crate::inverted_index::error::Error;
use crate::inverted_index::format::reader::MockInvertedIndexReader;
use crate::inverted_index::search::fst_apply::MockFstApplier;
use crate::inverted_index::FstMap;
fn s(s: &'static str) -> String {
s.to_owned()
}
fn mock_metas(tags: impl IntoIterator<Item = &'static str>) -> InvertedIndexMetas {
let mut metas = InvertedIndexMetas {
total_row_count: 8,
segment_row_count: 1,
..Default::default()
};
for tag in tags.into_iter() {
let meta = InvertedIndexMeta {
name: s(tag),
..Default::default()
};
metas.metas.insert(s(tag), meta);
}
metas
}
fn key_fst_applier(value: &'static str) -> Box<dyn FstApplier> {
let mut mock_fst_applier = MockFstApplier::new();
mock_fst_applier
.expect_apply()
.returning(move |fst| fst.get(value).into_iter().collect());
Box::new(mock_fst_applier)
}
fn fst_value(offset: u32, size: u32) -> u64 {
bytemuck::cast::<_, u64>([offset, size])
}
#[tokio::test]
async fn test_index_applier_apply_get_key() {
// An index applier that point-gets "tag-0_value-0" on tag "tag-0"
let applier = PredicatesIndexApplier {
fst_appliers: vec![(s("tag-0"), key_fst_applier("tag-0_value-0"))],
};
// An index reader with a single tag "tag-0" and a corresponding value "tag-0_value-0"
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas(["tag-0"])));
mock_reader
.expect_fst()
.returning(|meta| match meta.name.as_str() {
"tag-0" => Ok(FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap()),
_ => unreachable!(),
});
mock_reader.expect_bitmap().returning(|meta, offset, size| {
match (meta.name.as_str(), offset, size) {
("tag-0", 2, 1) => Ok(bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]),
_ => unreachable!(),
}
});
let indices = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 2, 4, 6]);
// An index reader with a single tag "tag-0" but without value "tag-0_value-0"
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas(["tag-0"])));
mock_reader
.expect_fst()
.returning(|meta| match meta.name.as_str() {
"tag-0" => Ok(FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap()),
_ => unreachable!(),
});
let indices = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert!(indices.is_empty());
}
#[tokio::test]
async fn test_index_applier_apply_intersection_with_two_tags() {
// An index applier that intersects "tag-0_value-0" on tag "tag-0" and "tag-1_value-a" on tag "tag-1"
let applier = PredicatesIndexApplier {
fst_appliers: vec![
(s("tag-0"), key_fst_applier("tag-0_value-0")),
(s("tag-1"), key_fst_applier("tag-1_value-a")),
],
};
// An index reader with two tags "tag-0" and "tag-1" and respective values "tag-0_value-0" and "tag-1_value-a"
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas(["tag-0", "tag-1"])));
mock_reader
.expect_fst()
.returning(|meta| match meta.name.as_str() {
"tag-0" => Ok(FstMap::from_iter([(b"tag-0_value-0", fst_value(1, 1))]).unwrap()),
"tag-1" => Ok(FstMap::from_iter([(b"tag-1_value-a", fst_value(2, 1))]).unwrap()),
_ => unreachable!(),
});
mock_reader.expect_bitmap().returning(|meta, offset, size| {
match (meta.name.as_str(), offset, size) {
("tag-0", 1, 1) => Ok(bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]),
("tag-1", 2, 1) => Ok(bitvec![u8, Lsb0; 1, 1, 0, 1, 1, 0, 1, 1]),
_ => unreachable!(),
}
});
let indices = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 4, 6]);
}
#[tokio::test]
async fn test_index_applier_without_predicates() {
let applier = PredicatesIndexApplier {
fst_appliers: vec![],
};
let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas(["tag-0"])));
let indices = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]); // full range to scan
}
#[tokio::test]
async fn test_index_applier_with_empty_index() {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader.expect_metadata().returning(move || {
Ok(InvertedIndexMetas {
total_row_count: 0, // No rows
segment_row_count: 1,
..Default::default()
})
});
let mut mock_fst_applier = MockFstApplier::new();
mock_fst_applier.expect_apply().never();
let applier = PredicatesIndexApplier {
fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
};
let indices = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert!(indices.is_empty());
}
#[tokio::test]
async fn test_index_applier_with_nonexistent_index() {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas(vec![])));
let mut mock_fst_applier = MockFstApplier::new();
mock_fst_applier.expect_apply().never();
let applier = PredicatesIndexApplier {
fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
};
let result = applier
.apply(
SearchContext {
index_not_found_strategy: IndexNotFoundStrategy::ThrowError,
},
&mut mock_reader,
)
.await;
assert!(matches!(result, Err(Error::IndexNotFound { .. })));
let indices = applier
.apply(
SearchContext {
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
},
&mut mock_reader,
)
.await
.unwrap();
assert!(indices.is_empty());
let indices = applier
.apply(
SearchContext {
index_not_found_strategy: IndexNotFoundStrategy::Ignore,
},
&mut mock_reader,
)
.await
.unwrap();
assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]);
}
}

View File

@@ -167,6 +167,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
.collect::<Vec<_>>(),
duration_since_epoch: req.duration_since_epoch,
lease_seconds: self.region_lease_seconds,
closeable_region_ids: vec![],
});
Ok(HandleControl::Continue)