mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-27 21:50:41 +00:00
perf: Optimize TermSet for very large sets of terms. (#75)
* Removes allocation in a bunch of places * Removes sorting of terms if we're going to use the fast field execution method * Adds back the (accidentally dropped) cardinality threshold * Removes `bool` support -- using the posting lists is always more efficient for a `bool`, since there are at most two of them * More eagerly constructs the term `HashSet` so that it happens once, rather than once per segment
This commit is contained in:
@@ -9,6 +9,10 @@ use crate::query::{AutomatonWeight, BooleanWeight, EnableScoring, Occur, Query,
|
||||
use crate::schema::{Field, Schema, Type};
|
||||
use crate::{SegmentReader, Term};
|
||||
|
||||
/// The term set query will use the fast field implementation if the number of terms is larger than
|
||||
/// this threshold.
|
||||
const TERM_SET_FAST_FIELD_CARDINALITY_THRESHOLD: usize = 1024;
|
||||
|
||||
/// A Term Set Query matches all of the documents containing any of the Term provided
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TermSetQuery {
|
||||
@@ -23,11 +27,6 @@ impl TermSetQuery {
|
||||
terms_map.entry(term.field()).or_default().push(term);
|
||||
}
|
||||
|
||||
for terms in terms_map.values_mut() {
|
||||
terms.sort_unstable();
|
||||
terms.dedup();
|
||||
}
|
||||
|
||||
TermSetQuery { terms_map }
|
||||
}
|
||||
|
||||
@@ -37,7 +36,7 @@ impl TermSetQuery {
|
||||
) -> crate::Result<BooleanWeight<DoNothingCombiner>> {
|
||||
let mut sub_queries: Vec<(_, Box<dyn Weight>)> = Vec::with_capacity(self.terms_map.len());
|
||||
|
||||
for (&field, sorted_terms) in self.terms_map.iter() {
|
||||
for (&field, terms) in self.terms_map.iter() {
|
||||
let field_entry = schema.get_field_entry(field);
|
||||
let field_type = field_entry.field_type();
|
||||
if !field_type.is_indexed() {
|
||||
@@ -46,10 +45,15 @@ impl TermSetQuery {
|
||||
}
|
||||
|
||||
let supported_for_ff = match field_type.value_type() {
|
||||
Type::U64 | Type::I64 | Type::F64 | Type::Bool | Type::Date | Type::IpAddr => {
|
||||
Type::U64 | Type::I64 | Type::F64 | Type::Date | Type::IpAddr => {
|
||||
// NOTE: Keep in sync with `FastFieldTermSetWeight::scorer`.
|
||||
true
|
||||
}
|
||||
Type::Bool => {
|
||||
// Guaranteed to be low cardinality, so always more efficient to use posting
|
||||
// lists.
|
||||
false
|
||||
}
|
||||
Type::Json | Type::Str => {
|
||||
// Explicitly not supported yet: see `term_set_query_fastfield.rs`.
|
||||
false
|
||||
@@ -57,21 +61,29 @@ impl TermSetQuery {
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if field_type.is_fast() && supported_for_ff {
|
||||
// NOTE: At this point, the terms have not been deduped, and so this threshold may not
|
||||
// be perfectly accurate. But in the case of very large input sets, it's worth avoiding
|
||||
// sorting/deduping the terms until after we've determined their type.
|
||||
if field_type.is_fast()
|
||||
&& supported_for_ff
|
||||
&& terms.len() > TERM_SET_FAST_FIELD_CARDINALITY_THRESHOLD
|
||||
{
|
||||
sub_queries.push((
|
||||
Occur::Should,
|
||||
Box::new(FastFieldTermSetWeight::new(field, sorted_terms.to_vec())),
|
||||
Box::new(FastFieldTermSetWeight::new(field, terms.iter())?),
|
||||
));
|
||||
} else {
|
||||
let mut sorted_terms: Vec<(&[u8], u64)> = terms
|
||||
.iter()
|
||||
.map(|key| (key.serialized_value_bytes(), 0))
|
||||
.collect::<Vec<_>>();
|
||||
sorted_terms.sort_unstable();
|
||||
sorted_terms.dedup();
|
||||
// In practice this won't fail because:
|
||||
// - we are writing to memory, so no IoError
|
||||
// - Terms are ordered
|
||||
let map = Map::from_iter(
|
||||
sorted_terms
|
||||
.iter()
|
||||
.map(|key| (key.serialized_value_bytes(), 0)),
|
||||
)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
|
||||
// - `sorted_terms` are ordered
|
||||
let map = Map::from_iter(sorted_terms.into_iter())
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
|
||||
|
||||
sub_queries.push((
|
||||
Occur::Should,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::net::Ipv6Addr;
|
||||
|
||||
use columnar::{Column, ColumnType, MonotonicallyMappableToU64};
|
||||
use rustc_hash::{FxHashMap, FxHashSet};
|
||||
|
||||
use crate::query::score_combiner::DoNothingCombiner;
|
||||
use crate::query::{
|
||||
@@ -14,22 +14,17 @@ use crate::{DocId, DocSet, Score, SegmentReader, TantivyError, Term, TERMINATED}
|
||||
#[derive(Debug, Clone)]
|
||||
/// `FastFieldTermSetQuery` is the same as [TermSetQuery] but only uses the fast field.
|
||||
pub struct FastFieldTermSetQuery {
|
||||
terms_map: HashMap<crate::schema::Field, Vec<Term>>,
|
||||
terms_map: FxHashMap<crate::schema::Field, Vec<Term>>,
|
||||
}
|
||||
|
||||
impl FastFieldTermSetQuery {
|
||||
/// Create a new `FastFieldTermSetQuery`.
|
||||
pub fn new<T: IntoIterator<Item = Term>>(terms: T) -> Self {
|
||||
let mut terms_map: HashMap<_, Vec<_>> = HashMap::new();
|
||||
let mut terms_map: FxHashMap<_, Vec<_>> = FxHashMap::default();
|
||||
for term in terms {
|
||||
terms_map.entry(term.field()).or_default().push(term);
|
||||
}
|
||||
|
||||
for terms in terms_map.values_mut() {
|
||||
terms.sort_unstable();
|
||||
terms.dedup();
|
||||
}
|
||||
|
||||
FastFieldTermSetQuery { terms_map }
|
||||
}
|
||||
}
|
||||
@@ -37,10 +32,10 @@ impl FastFieldTermSetQuery {
|
||||
impl Query for FastFieldTermSetQuery {
|
||||
fn weight(&self, _enable_scoring: EnableScoring<'_>) -> crate::Result<Box<dyn Weight>> {
|
||||
let mut sub_queries: Vec<(_, Box<dyn Weight>)> = Vec::with_capacity(self.terms_map.len());
|
||||
for (&field, sorted_terms) in &self.terms_map {
|
||||
for (&field, terms) in &self.terms_map {
|
||||
sub_queries.push((
|
||||
Occur::Should,
|
||||
Box::new(FastFieldTermSetWeight::new(field, sorted_terms.clone())),
|
||||
Box::new(FastFieldTermSetWeight::new(field, terms)?),
|
||||
));
|
||||
}
|
||||
Ok(Box::new(BooleanWeight::new(
|
||||
@@ -53,23 +48,76 @@ impl Query for FastFieldTermSetQuery {
|
||||
|
||||
// --- FastFieldTermSetWeight ---
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
enum TermSet {
|
||||
U64(FxHashSet<u64>),
|
||||
Ipv6Addr(FxHashSet<Ipv6Addr>),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct FastFieldTermSetWeight {
|
||||
field: crate::schema::Field,
|
||||
terms: Vec<Term>,
|
||||
term_set: Option<TermSet>,
|
||||
}
|
||||
|
||||
impl FastFieldTermSetWeight {
|
||||
pub fn new(field: crate::schema::Field, terms: Vec<Term>) -> Self {
|
||||
Self { field, terms }
|
||||
pub fn new<'a>(
|
||||
field: crate::schema::Field,
|
||||
terms: impl IntoIterator<Item = &'a Term>,
|
||||
) -> crate::Result<Self> {
|
||||
let mut terms_iter = terms.into_iter().peekable();
|
||||
|
||||
if terms_iter.peek().is_none() {
|
||||
return Ok(Self {
|
||||
field,
|
||||
term_set: None,
|
||||
});
|
||||
}
|
||||
|
||||
let first_term_value = terms_iter.peek().unwrap().value();
|
||||
let term_set = if first_term_value.as_ip_addr().is_some() {
|
||||
let mut values = FxHashSet::default();
|
||||
for term in terms_iter {
|
||||
let value = term.value();
|
||||
if let Some(val) = value.as_ip_addr() {
|
||||
values.insert(val);
|
||||
} else {
|
||||
return Err(crate::TantivyError::InvalidArgument(format!(
|
||||
"Expected term with ip address, but got {:?}",
|
||||
term
|
||||
)));
|
||||
}
|
||||
}
|
||||
TermSet::Ipv6Addr(values)
|
||||
} else {
|
||||
// Numeric types.
|
||||
//
|
||||
// NOTE: Keep in sync with `TermSetQuery::specialized_weight`.
|
||||
let mut values = FxHashSet::default();
|
||||
for term in terms_iter {
|
||||
let Some(val_u64) = term.value().as_u64_lenient() else {
|
||||
return Err(crate::TantivyError::InvalidArgument(format!(
|
||||
"Expected term with u64, i64, f64, or date, but got {:?}",
|
||||
term
|
||||
)));
|
||||
};
|
||||
values.insert(val_u64);
|
||||
}
|
||||
TermSet::U64(values)
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
field,
|
||||
term_set: Some(term_set),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Weight for FastFieldTermSetWeight {
|
||||
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
|
||||
if self.terms.is_empty() {
|
||||
let Some(term_set) = &self.term_set else {
|
||||
return Ok(Box::new(EmptyScorer));
|
||||
}
|
||||
};
|
||||
|
||||
let field_entry = reader.schema().get_field_entry(self.field);
|
||||
let field_type = field_entry.field_type();
|
||||
@@ -77,72 +125,59 @@ impl Weight for FastFieldTermSetWeight {
|
||||
|
||||
if field_type.is_json() {
|
||||
// TODO: Handle JSON fields.
|
||||
Err(crate::TantivyError::InvalidArgument(format!(
|
||||
return Err(crate::TantivyError::InvalidArgument(format!(
|
||||
"unsupported type for fast fields TermSet {field_type:?}",
|
||||
)))
|
||||
)));
|
||||
} else if field_type.is_str() {
|
||||
// TODO: Handle Str fields. They are superficially simple, because we can convert all
|
||||
// input terms to TermOrdinals, and then use the numeric codepath. But it would require
|
||||
// a batch operation for looking up many terms, because otherwise each term lookup would
|
||||
// involve decompressing a term dictionary block (some of them repeatedly).
|
||||
Err(crate::TantivyError::InvalidArgument(format!(
|
||||
// involve decompressing a term dictionary block (some of them repeatedly). And those
|
||||
// lookups would need to happen per-segment, unlike with numeric types.
|
||||
return Err(crate::TantivyError::InvalidArgument(format!(
|
||||
"unsupported type for fast fields TermSet {field_type:?}",
|
||||
)))
|
||||
} else if field_type.is_ip_addr() {
|
||||
let mut values = HashSet::new();
|
||||
for term in &self.terms {
|
||||
values.insert(term.value().as_ip_addr().unwrap());
|
||||
}
|
||||
)));
|
||||
}
|
||||
|
||||
let Some(ip_addr_column): Option<Column<Ipv6Addr>> =
|
||||
reader.fast_fields().column_opt(field_name)?
|
||||
else {
|
||||
return Ok(Box::new(EmptyScorer));
|
||||
};
|
||||
let docset = TermSetDocSet::new(ip_addr_column, values);
|
||||
Ok(Box::new(ConstScorer::new(docset, boost)))
|
||||
} else {
|
||||
// Numeric types.
|
||||
//
|
||||
// NOTE: Keep in sync with `TermSetQuery::specialized_weight`.
|
||||
let mut values = HashSet::new();
|
||||
for term in &self.terms {
|
||||
let value = term.value();
|
||||
let val_u64 = if let Some(val) = value.as_u64() {
|
||||
val
|
||||
} else if let Some(val) = value.as_i64() {
|
||||
val.to_u64()
|
||||
} else if let Some(val) = value.as_f64() {
|
||||
val.to_u64()
|
||||
} else if let Some(val) = value.as_bool() {
|
||||
val.to_u64()
|
||||
} else if let Some(val) = value.as_date() {
|
||||
val.to_u64()
|
||||
} else {
|
||||
match term_set {
|
||||
TermSet::Ipv6Addr(values) => {
|
||||
if !field_type.is_ip_addr() {
|
||||
return Err(crate::TantivyError::InvalidArgument(format!(
|
||||
"Expected term with u64, i64, f64, bool, or date, but got {:?}",
|
||||
term
|
||||
"fast fields TermSet for field `{field_name}` contains IP addresses, but \
|
||||
the field type is {field_type:?}"
|
||||
)));
|
||||
}
|
||||
let Some(ip_addr_column): Option<Column<Ipv6Addr>> =
|
||||
reader.fast_fields().column_opt(field_name)?
|
||||
else {
|
||||
return Ok(Box::new(EmptyScorer));
|
||||
};
|
||||
values.insert(val_u64);
|
||||
let docset = TermSetDocSet::new(ip_addr_column, values.clone());
|
||||
Ok(Box::new(ConstScorer::new(docset, boost)))
|
||||
}
|
||||
TermSet::U64(values) => {
|
||||
if field_type.is_ip_addr() {
|
||||
return Err(crate::TantivyError::InvalidArgument(format!(
|
||||
"fast fields TermSet for field `{field_name}` contains numeric values, \
|
||||
but the field type is {field_type:?}"
|
||||
)));
|
||||
}
|
||||
let fast_field_reader = reader.fast_fields();
|
||||
let Some((column, _col_type)) = fast_field_reader.u64_lenient_for_type(
|
||||
Some(&[
|
||||
ColumnType::U64,
|
||||
ColumnType::I64,
|
||||
ColumnType::F64,
|
||||
ColumnType::DateTime,
|
||||
]),
|
||||
field_name,
|
||||
)?
|
||||
else {
|
||||
return Ok(Box::new(EmptyScorer));
|
||||
};
|
||||
let docset = TermSetDocSet::new(column, values.clone());
|
||||
Ok(Box::new(ConstScorer::new(docset, boost)))
|
||||
}
|
||||
|
||||
let fast_field_reader = reader.fast_fields();
|
||||
let Some((column, _col_type)) = fast_field_reader.u64_lenient_for_type(
|
||||
Some(&[
|
||||
ColumnType::U64,
|
||||
ColumnType::I64,
|
||||
ColumnType::F64,
|
||||
ColumnType::Bool,
|
||||
ColumnType::DateTime,
|
||||
]),
|
||||
field_name,
|
||||
)?
|
||||
else {
|
||||
return Ok(Box::new(EmptyScorer));
|
||||
};
|
||||
let docset = TermSetDocSet::new(column, values);
|
||||
Ok(Box::new(ConstScorer::new(docset, boost)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -163,7 +198,7 @@ impl Weight for FastFieldTermSetWeight {
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct TermSetDocSet<T: Copy + Eq + std::hash::Hash> {
|
||||
column: Column<T>,
|
||||
values: HashSet<T>,
|
||||
values: FxHashSet<T>,
|
||||
doc_id: DocId,
|
||||
max_doc: DocId,
|
||||
}
|
||||
@@ -171,7 +206,7 @@ pub(crate) struct TermSetDocSet<T: Copy + Eq + std::hash::Hash> {
|
||||
impl<T: Copy + Eq + std::hash::Hash + PartialOrd + std::fmt::Debug + Send + Sync + 'static>
|
||||
TermSetDocSet<T>
|
||||
{
|
||||
pub fn new(column: Column<T>, values: HashSet<T>) -> Self {
|
||||
pub fn new(column: Column<T>, values: FxHashSet<T>) -> Self {
|
||||
let max_doc = column.num_docs();
|
||||
let mut doc_set = Self {
|
||||
column,
|
||||
@@ -323,26 +358,6 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_term_set_query_fast_field_bool() -> crate::Result<()> {
|
||||
let index = create_test_index()?;
|
||||
let reader = index.reader()?;
|
||||
let searcher = reader.searcher();
|
||||
let bool_field_fast = index.schema().get_field("bool_fast").unwrap();
|
||||
|
||||
let query = FastFieldTermSetQuery::new(vec![Term::from_field_bool(bool_field_fast, true)]);
|
||||
|
||||
let count = searcher.search(&query, &Count)?;
|
||||
assert_eq!(count, 2);
|
||||
|
||||
let query = FastFieldTermSetQuery::new(vec![Term::from_field_bool(bool_field_fast, false)]);
|
||||
|
||||
let count = searcher.search(&query, &Count)?;
|
||||
assert_eq!(count, 1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_term_set_query_fast_field_no_match() -> crate::Result<()> {
|
||||
let index = create_test_index()?;
|
||||
|
||||
@@ -332,6 +332,21 @@ where B: AsRef<[u8]>
|
||||
self.get_fast_type::<u64>()
|
||||
}
|
||||
|
||||
/// Returns the `u64` representation of a FastValue stored in a term.
|
||||
///
|
||||
/// Returns `None` if the term is not of a FastValue type, or if the term byte representation
|
||||
/// is invalid.
|
||||
pub fn as_u64_lenient(&self) -> Option<u64> {
|
||||
if !matches!(
|
||||
self.typ(),
|
||||
Type::U64 | Type::I64 | Type::F64 | Type::Bool | Type::Date
|
||||
) {
|
||||
return None;
|
||||
}
|
||||
let value_bytes = self.raw_value_bytes_payload();
|
||||
Some(u64::from_be_bytes(value_bytes.try_into().ok()?))
|
||||
}
|
||||
|
||||
fn get_fast_type<T: FastValue>(&self) -> Option<T> {
|
||||
if self.typ() != T::to_type() {
|
||||
return None;
|
||||
|
||||
Reference in New Issue
Block a user