mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 11:52:54 +00:00
feat(index): support building inverted index for the field column on Mito (#4887)
feat(index): support building inverted index for the field column Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
@@ -114,17 +114,17 @@ impl PredicatesIndexApplier {
|
||||
.partition_in_place(|(_, ps)| ps.iter().any(|p| matches!(p, Predicate::InList(_))));
|
||||
let mut iter = predicates.into_iter();
|
||||
for _ in 0..in_list_index {
|
||||
let (tag_name, predicates) = iter.next().unwrap();
|
||||
let (column_name, predicates) = iter.next().unwrap();
|
||||
let fst_applier = Box::new(KeysFstApplier::try_from(predicates)?) as _;
|
||||
fst_appliers.push((tag_name, fst_applier));
|
||||
fst_appliers.push((column_name, fst_applier));
|
||||
}
|
||||
|
||||
for (tag_name, predicates) in iter {
|
||||
for (column_name, predicates) in iter {
|
||||
if predicates.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let fst_applier = Box::new(IntersectionFstApplier::try_from(predicates)?) as _;
|
||||
fst_appliers.push((tag_name, fst_applier));
|
||||
fst_appliers.push((column_name, fst_applier));
|
||||
}
|
||||
|
||||
Ok(PredicatesIndexApplier { fst_appliers })
|
||||
|
||||
@@ -393,20 +393,29 @@ impl ScanRegion {
|
||||
.and_then(|c| c.index_cache())
|
||||
.cloned();
|
||||
|
||||
// TODO(zhongzc): currently we only index tag columns, need to support field columns.
|
||||
let ignore_column_ids = &self
|
||||
.version
|
||||
.options
|
||||
.index_options
|
||||
.inverted_index
|
||||
.ignore_column_ids;
|
||||
let indexed_column_ids = self
|
||||
.version
|
||||
.metadata
|
||||
.primary_key
|
||||
.iter()
|
||||
.filter(|id| !ignore_column_ids.contains(id))
|
||||
.copied()
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
InvertedIndexApplierBuilder::new(
|
||||
self.access_layer.region_dir().to_string(),
|
||||
self.access_layer.object_store().clone(),
|
||||
file_cache,
|
||||
index_cache,
|
||||
self.version.metadata.as_ref(),
|
||||
self.version
|
||||
.options
|
||||
.index_options
|
||||
.inverted_index
|
||||
.ignore_column_ids
|
||||
.iter()
|
||||
.copied()
|
||||
.collect(),
|
||||
indexed_column_ids,
|
||||
self.access_layer.puffin_manager_factory().clone(),
|
||||
)
|
||||
.build(&self.request.filters)
|
||||
|
||||
@@ -20,6 +20,7 @@ pub(crate) mod puffin_manager;
|
||||
mod statistics;
|
||||
mod store;
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use common_telemetry::{debug, warn};
|
||||
@@ -212,13 +213,28 @@ impl<'a> IndexerBuilder<'a> {
|
||||
segment_row_count = row_group_size;
|
||||
}
|
||||
|
||||
// TODO(zhongzc): currently we only index tag columns, need to support field columns.
|
||||
let indexed_column_ids = self
|
||||
.metadata
|
||||
.primary_key
|
||||
.iter()
|
||||
.filter(|id| {
|
||||
!self
|
||||
.index_options
|
||||
.inverted_index
|
||||
.ignore_column_ids
|
||||
.contains(id)
|
||||
})
|
||||
.copied()
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let indexer = InvertedIndexer::new(
|
||||
self.file_id,
|
||||
self.metadata,
|
||||
self.intermediate_manager.clone(),
|
||||
self.inverted_index_config.mem_threshold_on_create(),
|
||||
segment_row_count,
|
||||
&self.index_options.inverted_index.ignore_column_ids,
|
||||
indexed_column_ids,
|
||||
);
|
||||
|
||||
Some(indexer)
|
||||
|
||||
@@ -20,7 +20,6 @@ mod regex_match;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use common_telemetry::warn;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::{BinaryExpr, Expr, Operator};
|
||||
@@ -55,8 +54,8 @@ pub(crate) struct InvertedIndexApplierBuilder<'a> {
|
||||
/// Metadata of the region, used to get metadata like column type.
|
||||
metadata: &'a RegionMetadata,
|
||||
|
||||
/// Column ids to ignore.
|
||||
ignore_column_ids: HashSet<ColumnId>,
|
||||
/// Column ids of the columns that are indexed.
|
||||
indexed_column_ids: HashSet<ColumnId>,
|
||||
|
||||
/// Stores predicates during traversal on the Expr tree.
|
||||
output: HashMap<ColumnId, Vec<Predicate>>,
|
||||
@@ -76,7 +75,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
|
||||
file_cache: Option<FileCacheRef>,
|
||||
index_cache: Option<InvertedIndexCacheRef>,
|
||||
metadata: &'a RegionMetadata,
|
||||
ignore_column_ids: HashSet<ColumnId>,
|
||||
indexed_column_ids: HashSet<ColumnId>,
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -84,7 +83,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
|
||||
object_store,
|
||||
file_cache,
|
||||
metadata,
|
||||
ignore_column_ids,
|
||||
indexed_column_ids,
|
||||
output: HashMap::default(),
|
||||
index_cache,
|
||||
puffin_manager_factory,
|
||||
@@ -156,9 +155,9 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
|
||||
self.output.entry(column_id).or_default().push(predicate);
|
||||
}
|
||||
|
||||
/// Helper function to get the column id and the column type of a tag column.
|
||||
/// Helper function to get the column id and the column type of a column.
|
||||
/// Returns `None` if the column is not a tag column or if the column is ignored.
|
||||
fn tag_column_id_and_type(
|
||||
fn column_id_and_type(
|
||||
&self,
|
||||
column_name: &str,
|
||||
) -> Result<Option<(ColumnId, ConcreteDataType)>> {
|
||||
@@ -169,11 +168,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
|
||||
column: column_name,
|
||||
})?;
|
||||
|
||||
if self.ignore_column_ids.contains(&column.column_id) {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if column.semantic_type != SemanticType::Tag {
|
||||
if !self.indexed_column_ids.contains(&column.column_id) {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -330,7 +325,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
let Some(column_name) = Self::column_name(&between.expr) else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
|
||||
let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some(low) = Self::nonnull_lit(&between.low) else {
|
||||
@@ -78,7 +78,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -121,7 +121,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -147,7 +147,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -159,7 +159,24 @@ mod tests {
|
||||
};
|
||||
|
||||
builder.collect_between(&between).unwrap();
|
||||
assert!(builder.output.is_empty());
|
||||
|
||||
let predicates = builder.output.get(&3).unwrap();
|
||||
assert_eq!(predicates.len(), 1);
|
||||
assert_eq!(
|
||||
predicates[0],
|
||||
Predicate::Range(RangePredicate {
|
||||
range: Range {
|
||||
lower: Some(Bound {
|
||||
inclusive: true,
|
||||
value: encoded_string("abc"),
|
||||
}),
|
||||
upper: Some(Bound {
|
||||
inclusive: true,
|
||||
value: encoded_string("def"),
|
||||
}),
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -173,7 +190,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -200,7 +217,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
|
||||
@@ -114,7 +114,7 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
let Some(lit) = Self::nonnull_lit(literal) else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
|
||||
let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
@@ -234,7 +234,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -263,7 +263,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -283,14 +283,28 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
builder
|
||||
.collect_comparison_expr(&field_column(), &Operator::Lt, &string_lit("abc"))
|
||||
.unwrap();
|
||||
assert!(builder.output.is_empty());
|
||||
|
||||
let predicates = builder.output.get(&3).unwrap();
|
||||
assert_eq!(predicates.len(), 1);
|
||||
assert_eq!(
|
||||
predicates[0],
|
||||
Predicate::Range(RangePredicate {
|
||||
range: Range {
|
||||
lower: None,
|
||||
upper: Some(Bound {
|
||||
inclusive: false,
|
||||
value: encoded_string("abc"),
|
||||
}),
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -304,7 +318,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
|
||||
let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
@@ -59,7 +59,7 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
|
||||
let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
@@ -140,7 +140,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -178,14 +178,22 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
builder
|
||||
.collect_eq(&field_column(), &string_lit("abc"))
|
||||
.unwrap();
|
||||
assert!(builder.output.is_empty());
|
||||
|
||||
let predicates = builder.output.get(&3).unwrap();
|
||||
assert_eq!(predicates.len(), 1);
|
||||
assert_eq!(
|
||||
predicates[0],
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter([encoded_string("abc")])
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -199,7 +207,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -219,7 +227,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -239,7 +247,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -298,7 +306,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -336,7 +344,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
let Some(column_name) = Self::column_name(&inlist.expr) else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
|
||||
let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
@@ -71,7 +71,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -104,7 +104,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -129,7 +129,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -140,7 +140,15 @@ mod tests {
|
||||
};
|
||||
|
||||
builder.collect_inlist(&in_list).unwrap();
|
||||
assert!(builder.output.is_empty());
|
||||
|
||||
let predicates = builder.output.get(&3).unwrap();
|
||||
assert_eq!(predicates.len(), 1);
|
||||
assert_eq!(
|
||||
predicates[0],
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter([encoded_string("foo"), encoded_string("bar")])
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -154,7 +162,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -181,7 +189,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
let Some(column_name) = Self::column_name(column) else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some((column_id, data_type)) = self.tag_column_id_and_type(column_name)? else {
|
||||
let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
|
||||
return Ok(());
|
||||
};
|
||||
if !data_type.is_string() {
|
||||
@@ -65,7 +65,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -94,7 +94,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -102,7 +102,14 @@ mod tests {
|
||||
.collect_regex_match(&field_column(), &string_lit("abc"))
|
||||
.unwrap();
|
||||
|
||||
assert!(builder.output.is_empty());
|
||||
let predicates = builder.output.get(&3).unwrap();
|
||||
assert_eq!(predicates.len(), 1);
|
||||
assert_eq!(
|
||||
predicates[0],
|
||||
Predicate::RegexMatch(RegexMatchPredicate {
|
||||
pattern: "abc".to_string()
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -116,7 +123,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
@@ -138,7 +145,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
HashSet::from_iter([1, 2, 3]),
|
||||
facotry,
|
||||
);
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ use crate::error::{
|
||||
PushIndexValueSnafu, Result,
|
||||
};
|
||||
use crate::read::Batch;
|
||||
use crate::row_converter::SortField;
|
||||
use crate::sst::file::FileId;
|
||||
use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager};
|
||||
use crate::sst::index::inverted_index::codec::{IndexValueCodec, IndexValuesCodec};
|
||||
@@ -72,7 +73,7 @@ pub struct InvertedIndexer {
|
||||
memory_usage: Arc<AtomicUsize>,
|
||||
|
||||
/// Ids of indexed columns.
|
||||
column_ids: HashSet<ColumnId>,
|
||||
indexed_column_ids: HashSet<ColumnId>,
|
||||
}
|
||||
|
||||
impl InvertedIndexer {
|
||||
@@ -84,7 +85,7 @@ impl InvertedIndexer {
|
||||
intermediate_manager: IntermediateManager,
|
||||
memory_usage_threshold: Option<usize>,
|
||||
segment_row_count: NonZeroUsize,
|
||||
ignore_column_ids: &[ColumnId],
|
||||
indexed_column_ids: HashSet<ColumnId>,
|
||||
) -> Self {
|
||||
let temp_file_provider = Arc::new(TempFileProvider::new(
|
||||
IntermediateLocation::new(&metadata.region_id, &sst_file_id),
|
||||
@@ -102,14 +103,6 @@ impl InvertedIndexer {
|
||||
let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
|
||||
|
||||
let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
|
||||
let mut column_ids = metadata
|
||||
.primary_key_columns()
|
||||
.map(|c| c.column_id)
|
||||
.collect::<HashSet<_>>();
|
||||
for id in ignore_column_ids {
|
||||
column_ids.remove(id);
|
||||
}
|
||||
|
||||
Self {
|
||||
codec,
|
||||
index_creator,
|
||||
@@ -118,7 +111,7 @@ impl InvertedIndexer {
|
||||
stats: Statistics::new(TYPE_INVERTED_INDEX),
|
||||
aborted: false,
|
||||
memory_usage,
|
||||
column_ids,
|
||||
indexed_column_ids,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,7 +182,7 @@ impl InvertedIndexer {
|
||||
guard.inc_row_count(n);
|
||||
|
||||
for ((col_id, col_id_str), field, value) in self.codec.decode(batch.primary_key())? {
|
||||
if !self.column_ids.contains(col_id) {
|
||||
if !self.indexed_column_ids.contains(col_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -210,6 +203,32 @@ impl InvertedIndexer {
|
||||
.context(PushIndexValueSnafu)?;
|
||||
}
|
||||
|
||||
for field in batch.fields() {
|
||||
if !self.indexed_column_ids.contains(&field.column_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let sort_field = SortField::new(field.data.data_type());
|
||||
let col_id_str = field.column_id.to_string();
|
||||
for i in 0..n {
|
||||
self.value_buf.clear();
|
||||
let value = field.data.get_ref(i);
|
||||
|
||||
if value.is_null() {
|
||||
self.index_creator
|
||||
.push_with_name(&col_id_str, None)
|
||||
.await
|
||||
.context(PushIndexValueSnafu)?;
|
||||
} else {
|
||||
IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut self.value_buf)?;
|
||||
self.index_creator
|
||||
.push_with_name(&col_id_str, Some(&self.value_buf))
|
||||
.await
|
||||
.context(PushIndexValueSnafu)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -269,7 +288,7 @@ impl InvertedIndexer {
|
||||
}
|
||||
|
||||
pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
|
||||
self.column_ids.iter().copied()
|
||||
self.indexed_column_ids.iter().copied()
|
||||
}
|
||||
|
||||
pub fn memory_usage(&self) -> usize {
|
||||
@@ -297,6 +316,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::cache::index::InvertedIndexCache;
|
||||
use crate::read::BatchColumn;
|
||||
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
|
||||
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
@@ -340,12 +360,25 @@ mod tests {
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 3,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"field_u64",
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 4,
|
||||
})
|
||||
.primary_key(vec![1, 2]);
|
||||
|
||||
Arc::new(builder.build().unwrap())
|
||||
}
|
||||
|
||||
fn new_batch(num_rows: usize, str_tag: impl AsRef<str>, i32_tag: impl Into<i32>) -> Batch {
|
||||
fn new_batch(
|
||||
str_tag: impl AsRef<str>,
|
||||
i32_tag: impl Into<i32>,
|
||||
u64_field: impl IntoIterator<Item = u64>,
|
||||
) -> Batch {
|
||||
let fields = vec![
|
||||
SortField::new(ConcreteDataType::string_datatype()),
|
||||
SortField::new(ConcreteDataType::int32_datatype()),
|
||||
@@ -354,6 +387,12 @@ mod tests {
|
||||
let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
|
||||
let primary_key = codec.encode(row.into_iter()).unwrap();
|
||||
|
||||
let u64_field = BatchColumn {
|
||||
column_id: 4,
|
||||
data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
|
||||
};
|
||||
let num_rows = u64_field.data.len();
|
||||
|
||||
Batch::new(
|
||||
primary_key,
|
||||
Arc::new(UInt64Vector::from_iter_values(
|
||||
@@ -365,14 +404,14 @@ mod tests {
|
||||
Arc::new(UInt8Vector::from_iter_values(
|
||||
iter::repeat(1).take(num_rows),
|
||||
)),
|
||||
vec![],
|
||||
vec![u64_field],
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn build_applier_factory(
|
||||
prefix: &str,
|
||||
tags: BTreeSet<(&'static str, i32)>,
|
||||
rows: BTreeSet<(&'static str, i32, [u64; 2])>,
|
||||
) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
|
||||
let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
|
||||
let region_dir = "region0".to_string();
|
||||
@@ -383,6 +422,7 @@ mod tests {
|
||||
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
|
||||
let memory_threshold = None;
|
||||
let segment_row_count = 2;
|
||||
let indexed_column_ids = HashSet::from_iter([1, 2, 4]);
|
||||
|
||||
let mut creator = InvertedIndexer::new(
|
||||
sst_file_id,
|
||||
@@ -390,18 +430,18 @@ mod tests {
|
||||
intm_mgr,
|
||||
memory_threshold,
|
||||
NonZeroUsize::new(segment_row_count).unwrap(),
|
||||
&[],
|
||||
indexed_column_ids.clone(),
|
||||
);
|
||||
|
||||
for (str_tag, i32_tag) in &tags {
|
||||
let batch = new_batch(segment_row_count, str_tag, *i32_tag);
|
||||
for (str_tag, i32_tag, u64_field) in &rows {
|
||||
let batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
|
||||
creator.update(&batch).await.unwrap();
|
||||
}
|
||||
|
||||
let puffin_manager = factory.build(object_store.clone());
|
||||
let mut writer = puffin_manager.writer(&file_path).await.unwrap();
|
||||
let (row_count, _) = creator.finish(&mut writer).await.unwrap();
|
||||
assert_eq!(row_count, tags.len() * segment_row_count);
|
||||
assert_eq!(row_count, rows.len() * segment_row_count);
|
||||
writer.finish().await.unwrap();
|
||||
|
||||
move |expr| {
|
||||
@@ -413,7 +453,7 @@ mod tests {
|
||||
None,
|
||||
Some(cache),
|
||||
®ion_metadata,
|
||||
Default::default(),
|
||||
indexed_column_ids.clone(),
|
||||
factory.clone(),
|
||||
)
|
||||
.build(&[expr])
|
||||
@@ -433,19 +473,19 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_and_query_get_key() {
|
||||
let tags = BTreeSet::from_iter([
|
||||
("aaa", 1),
|
||||
("aaa", 2),
|
||||
("aaa", 3),
|
||||
("aab", 1),
|
||||
("aab", 2),
|
||||
("aab", 3),
|
||||
("abc", 1),
|
||||
("abc", 2),
|
||||
("abc", 3),
|
||||
let rows = BTreeSet::from_iter([
|
||||
("aaa", 1, [1, 2]),
|
||||
("aaa", 2, [2, 3]),
|
||||
("aaa", 3, [3, 4]),
|
||||
("aab", 1, [4, 5]),
|
||||
("aab", 2, [5, 6]),
|
||||
("aab", 3, [6, 7]),
|
||||
("abc", 1, [7, 8]),
|
||||
("abc", 2, [8, 9]),
|
||||
("abc", 3, [9, 10]),
|
||||
]);
|
||||
|
||||
let applier_factory = build_applier_factory("test_create_and_query_get_key_", tags).await;
|
||||
let applier_factory = build_applier_factory("test_create_and_query_get_key_", rows).await;
|
||||
|
||||
let expr = col("tag_str").eq(lit("aaa"));
|
||||
let res = applier_factory(expr).await;
|
||||
@@ -468,23 +508,27 @@ mod tests {
|
||||
let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);
|
||||
|
||||
let expr = col("field_u64").eq(lit(2u64));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![0, 1]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_and_query_range() {
|
||||
let tags = BTreeSet::from_iter([
|
||||
("aaa", 1),
|
||||
("aaa", 2),
|
||||
("aaa", 3),
|
||||
("aab", 1),
|
||||
("aab", 2),
|
||||
("aab", 3),
|
||||
("abc", 1),
|
||||
("abc", 2),
|
||||
("abc", 3),
|
||||
let rows = BTreeSet::from_iter([
|
||||
("aaa", 1, [1, 2]),
|
||||
("aaa", 2, [2, 3]),
|
||||
("aaa", 3, [3, 4]),
|
||||
("aab", 1, [4, 5]),
|
||||
("aab", 2, [5, 6]),
|
||||
("aab", 3, [6, 7]),
|
||||
("abc", 1, [7, 8]),
|
||||
("abc", 2, [8, 9]),
|
||||
("abc", 3, [9, 10]),
|
||||
]);
|
||||
|
||||
let applier_factory = build_applier_factory("test_create_and_query_range_", tags).await;
|
||||
let applier_factory = build_applier_factory("test_create_and_query_range_", rows).await;
|
||||
|
||||
let expr = col("tag_str").between(lit("aaa"), lit("aab"));
|
||||
let res = applier_factory(expr).await;
|
||||
@@ -501,24 +545,28 @@ mod tests {
|
||||
let expr = col("tag_i32").between(lit(2), lit(2));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![1, 4, 7]);
|
||||
|
||||
let expr = col("field_u64").between(lit(2u64), lit(5u64));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![0, 1, 2, 3, 4]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_and_query_comparison() {
|
||||
let tags = BTreeSet::from_iter([
|
||||
("aaa", 1),
|
||||
("aaa", 2),
|
||||
("aaa", 3),
|
||||
("aab", 1),
|
||||
("aab", 2),
|
||||
("aab", 3),
|
||||
("abc", 1),
|
||||
("abc", 2),
|
||||
("abc", 3),
|
||||
let rows = BTreeSet::from_iter([
|
||||
("aaa", 1, [1, 2]),
|
||||
("aaa", 2, [2, 3]),
|
||||
("aaa", 3, [3, 4]),
|
||||
("aab", 1, [4, 5]),
|
||||
("aab", 2, [5, 6]),
|
||||
("aab", 3, [6, 7]),
|
||||
("abc", 1, [7, 8]),
|
||||
("abc", 2, [8, 9]),
|
||||
("abc", 3, [9, 10]),
|
||||
]);
|
||||
|
||||
let applier_factory =
|
||||
build_applier_factory("test_create_and_query_comparison_", tags).await;
|
||||
build_applier_factory("test_create_and_query_comparison_", rows).await;
|
||||
|
||||
let expr = col("tag_str").lt(lit("aab"));
|
||||
let res = applier_factory(expr).await;
|
||||
@@ -528,6 +576,10 @@ mod tests {
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![0, 3, 6]);
|
||||
|
||||
let expr = col("field_u64").lt(lit(2u64));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![0]);
|
||||
|
||||
let expr = col("tag_str").gt(lit("aab"));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![6, 7, 8]);
|
||||
@@ -536,6 +588,10 @@ mod tests {
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![2, 5, 8]);
|
||||
|
||||
let expr = col("field_u64").gt(lit(8u64));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![7, 8]);
|
||||
|
||||
let expr = col("tag_str").lt_eq(lit("aab"));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
|
||||
@@ -544,6 +600,10 @@ mod tests {
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);
|
||||
|
||||
let expr = col("field_u64").lt_eq(lit(2u64));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![0, 1]);
|
||||
|
||||
let expr = col("tag_str").gt_eq(lit("aab"));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);
|
||||
@@ -552,6 +612,10 @@ mod tests {
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);
|
||||
|
||||
let expr = col("field_u64").gt_eq(lit(8u64));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![6, 7, 8]);
|
||||
|
||||
let expr = col("tag_str")
|
||||
.gt(lit("aaa"))
|
||||
.and(col("tag_str").lt(lit("abc")));
|
||||
@@ -561,23 +625,29 @@ mod tests {
|
||||
let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![1, 4, 7]);
|
||||
|
||||
let expr = col("field_u64")
|
||||
.gt(lit(2u64))
|
||||
.and(col("field_u64").lt(lit(9u64)));
|
||||
let res = applier_factory(expr).await;
|
||||
assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_and_query_regex() {
|
||||
let tags = BTreeSet::from_iter([
|
||||
("aaa", 1),
|
||||
("aaa", 2),
|
||||
("aaa", 3),
|
||||
("aab", 1),
|
||||
("aab", 2),
|
||||
("aab", 3),
|
||||
("abc", 1),
|
||||
("abc", 2),
|
||||
("abc", 3),
|
||||
let rows = BTreeSet::from_iter([
|
||||
("aaa", 1, [1, 2]),
|
||||
("aaa", 2, [2, 3]),
|
||||
("aaa", 3, [3, 4]),
|
||||
("aab", 1, [4, 5]),
|
||||
("aab", 2, [5, 6]),
|
||||
("aab", 3, [6, 7]),
|
||||
("abc", 1, [7, 8]),
|
||||
("abc", 2, [8, 9]),
|
||||
("abc", 3, [9, 10]),
|
||||
]);
|
||||
|
||||
let applier_factory = build_applier_factory("test_create_and_query_regex_", tags).await;
|
||||
let applier_factory = build_applier_factory("test_create_and_query_regex_", rows).await;
|
||||
|
||||
let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
|
||||
let res = applier_factory(expr).await;
|
||||
|
||||
Reference in New Issue
Block a user