mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-05 01:50:42 +00:00
fix: deduplicate doc counts in term aggregation for multi-valued fields (#2854)
* fix: deduplicate doc counts in term aggregation for multi-valued fields Term aggregation was counting term occurrences instead of documents for multi-valued fields. A document with the same value appearing multiple times would inflate doc_count. Add `fetch_block_with_missing_unique_per_doc` to ColumnBlockAccessor that deduplicates (doc_id, value) pairs, and use it in term aggregation. Fixes #2721 * refactor: only deduplicate for multivalue cardinality Duplicates can only occur with multivalue columns, so narrow the check from !is_full() to is_multivalue(). * fix: handle non-consecutive duplicate values in dedup Sort values within each doc_id group before deduplicating, so that non-adjacent duplicates are correctly handled. Add unit tests for dedup_docid_val_pairs: consecutive duplicates, non-consecutive duplicates, multi-doc groups, no duplicates, and single element. * perf: skip dedup when block has no multivalue entries Add early return when no consecutive doc_ids are equal, avoiding unnecessary sort and dedup passes. Remove the 2-element swap optimization as it is not needed by the dedup algorithm. --------- Co-authored-by: nryoo <nryoo@nryooui-MacBookPro.local>
This commit is contained in:
@@ -58,6 +58,78 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
|
||||
}
|
||||
}
|
||||
|
||||
/// Like `fetch_block_with_missing`, but deduplicates (doc_id, value) pairs
|
||||
/// so that each unique value per document is returned only once.
|
||||
///
|
||||
/// This is necessary for correct document counting in aggregations,
|
||||
/// where multi-valued fields can produce duplicate entries that inflate counts.
|
||||
#[inline]
|
||||
pub fn fetch_block_with_missing_unique_per_doc(
|
||||
&mut self,
|
||||
docs: &[u32],
|
||||
accessor: &Column<T>,
|
||||
missing: Option<T>,
|
||||
) where
|
||||
T: Ord,
|
||||
{
|
||||
self.fetch_block_with_missing(docs, accessor, missing);
|
||||
if accessor.index.get_cardinality().is_multivalue() {
|
||||
self.dedup_docid_val_pairs();
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes duplicate (doc_id, value) pairs from the caches.
|
||||
///
|
||||
/// After `fetch_block`, entries are sorted by doc_id, but values within
|
||||
/// the same doc may not be sorted (e.g. `(0,1), (0,2), (0,1)`).
|
||||
/// We group consecutive entries by doc_id, sort values within each group
|
||||
/// if it has more than 2 elements, then deduplicate adjacent pairs.
|
||||
///
|
||||
/// Skips entirely if no doc_id appears more than once in the block.
|
||||
fn dedup_docid_val_pairs(&mut self)
|
||||
where T: Ord {
|
||||
if self.docid_cache.len() <= 1 {
|
||||
return;
|
||||
}
|
||||
|
||||
// Quick check: if no consecutive doc_ids are equal, no dedup needed.
|
||||
let has_multivalue = self.docid_cache.windows(2).any(|w| w[0] == w[1]);
|
||||
if !has_multivalue {
|
||||
return;
|
||||
}
|
||||
|
||||
// Sort values within each doc_id group so duplicates become adjacent.
|
||||
let mut start = 0;
|
||||
while start < self.docid_cache.len() {
|
||||
let doc = self.docid_cache[start];
|
||||
let mut end = start + 1;
|
||||
while end < self.docid_cache.len() && self.docid_cache[end] == doc {
|
||||
end += 1;
|
||||
}
|
||||
if end - start > 2 {
|
||||
self.val_cache[start..end].sort();
|
||||
}
|
||||
start = end;
|
||||
}
|
||||
|
||||
// Now duplicates are adjacent — deduplicate in place.
|
||||
let mut write = 0;
|
||||
for read in 1..self.docid_cache.len() {
|
||||
if self.docid_cache[read] != self.docid_cache[write]
|
||||
|| self.val_cache[read] != self.val_cache[write]
|
||||
{
|
||||
write += 1;
|
||||
if write != read {
|
||||
self.docid_cache[write] = self.docid_cache[read];
|
||||
self.val_cache[write] = self.val_cache[read];
|
||||
}
|
||||
}
|
||||
}
|
||||
let new_len = write + 1;
|
||||
self.docid_cache.truncate(new_len);
|
||||
self.val_cache.truncate(new_len);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn iter_vals(&self) -> impl Iterator<Item = T> + '_ {
|
||||
self.val_cache.iter().cloned()
|
||||
@@ -163,4 +235,56 @@ mod tests {
|
||||
|
||||
assert_eq!(missing_docs, vec![1, 2, 3, 4, 5]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dedup_docid_val_pairs_consecutive() {
|
||||
let mut accessor = ColumnBlockAccessor::<u64>::default();
|
||||
accessor.docid_cache = vec![0, 0, 2, 3];
|
||||
accessor.val_cache = vec![10, 10, 10, 10];
|
||||
accessor.dedup_docid_val_pairs();
|
||||
assert_eq!(accessor.docid_cache, vec![0, 2, 3]);
|
||||
assert_eq!(accessor.val_cache, vec![10, 10, 10]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dedup_docid_val_pairs_non_consecutive() {
|
||||
// (0,1), (0,2), (0,1) — duplicate value not adjacent
|
||||
let mut accessor = ColumnBlockAccessor::<u64>::default();
|
||||
accessor.docid_cache = vec![0, 0, 0];
|
||||
accessor.val_cache = vec![1, 2, 1];
|
||||
accessor.dedup_docid_val_pairs();
|
||||
assert_eq!(accessor.docid_cache, vec![0, 0]);
|
||||
assert_eq!(accessor.val_cache, vec![1, 2]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dedup_docid_val_pairs_multi_doc() {
|
||||
// doc 0: values [3, 1, 3], doc 1: values [5, 5]
|
||||
let mut accessor = ColumnBlockAccessor::<u64>::default();
|
||||
accessor.docid_cache = vec![0, 0, 0, 1, 1];
|
||||
accessor.val_cache = vec![3, 1, 3, 5, 5];
|
||||
accessor.dedup_docid_val_pairs();
|
||||
assert_eq!(accessor.docid_cache, vec![0, 0, 1]);
|
||||
assert_eq!(accessor.val_cache, vec![1, 3, 5]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dedup_docid_val_pairs_no_duplicates() {
|
||||
let mut accessor = ColumnBlockAccessor::<u64>::default();
|
||||
accessor.docid_cache = vec![0, 0, 1];
|
||||
accessor.val_cache = vec![1, 2, 3];
|
||||
accessor.dedup_docid_val_pairs();
|
||||
assert_eq!(accessor.docid_cache, vec![0, 0, 1]);
|
||||
assert_eq!(accessor.val_cache, vec![1, 2, 3]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dedup_docid_val_pairs_single_element() {
|
||||
let mut accessor = ColumnBlockAccessor::<u64>::default();
|
||||
accessor.docid_cache = vec![0];
|
||||
accessor.val_cache = vec![1];
|
||||
accessor.dedup_docid_val_pairs();
|
||||
assert_eq!(accessor.docid_cache, vec![0]);
|
||||
assert_eq!(accessor.val_cache, vec![1]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -807,11 +807,13 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
|
||||
|
||||
let req_data = &mut self.terms_req_data;
|
||||
|
||||
agg_data.column_block_accessor.fetch_block_with_missing(
|
||||
docs,
|
||||
&req_data.accessor,
|
||||
req_data.missing_value_for_accessor,
|
||||
);
|
||||
agg_data
|
||||
.column_block_accessor
|
||||
.fetch_block_with_missing_unique_per_doc(
|
||||
docs,
|
||||
&req_data.accessor,
|
||||
req_data.missing_value_for_accessor,
|
||||
);
|
||||
|
||||
if let Some(sub_agg) = &mut self.sub_agg {
|
||||
let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize];
|
||||
@@ -2347,7 +2349,7 @@ mod tests {
|
||||
|
||||
// text field
|
||||
assert_eq!(res["my_texts"]["buckets"][0]["key"], "Hello Hello");
|
||||
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 5);
|
||||
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
|
||||
assert_eq!(res["my_texts"]["buckets"][1]["key"], "Empty");
|
||||
assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 2);
|
||||
assert_eq!(
|
||||
@@ -2356,7 +2358,7 @@ mod tests {
|
||||
);
|
||||
// text field with number as missing fallback
|
||||
assert_eq!(res["my_texts2"]["buckets"][0]["key"], "Hello Hello");
|
||||
assert_eq!(res["my_texts2"]["buckets"][0]["doc_count"], 5);
|
||||
assert_eq!(res["my_texts2"]["buckets"][0]["doc_count"], 4);
|
||||
assert_eq!(res["my_texts2"]["buckets"][1]["key"], 1337.0);
|
||||
assert_eq!(res["my_texts2"]["buckets"][1]["doc_count"], 2);
|
||||
assert_eq!(
|
||||
@@ -2370,7 +2372,7 @@ mod tests {
|
||||
assert_eq!(res["my_ids"]["buckets"][0]["key"], 1337.0);
|
||||
assert_eq!(res["my_ids"]["buckets"][0]["doc_count"], 4);
|
||||
assert_eq!(res["my_ids"]["buckets"][1]["key"], 1.0);
|
||||
assert_eq!(res["my_ids"]["buckets"][1]["doc_count"], 3);
|
||||
assert_eq!(res["my_ids"]["buckets"][1]["doc_count"], 2);
|
||||
assert_eq!(res["my_ids"]["buckets"][2]["key"], serde_json::Value::Null);
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user