diff --git a/columnar/src/block_accessor.rs b/columnar/src/block_accessor.rs
index 9926553a8..227cf804d 100644
--- a/columnar/src/block_accessor.rs
+++ b/columnar/src/block_accessor.rs
@@ -58,6 +58,88 @@ impl
         }
     }
 
+    /// Like `fetch_block_with_missing`, but deduplicates (doc_id, value) pairs
+    /// so that each unique value per document is returned only once.
+    ///
+    /// This is necessary for correct document counting in aggregations,
+    /// where multi-valued fields can produce duplicate entries that inflate counts.
+    ///
+    /// Deduplication only runs when the column is multi-valued; otherwise this
+    /// behaves exactly like `fetch_block_with_missing`. `T: Ord` is required
+    /// because values are sorted within each document to find duplicates.
+    #[inline]
+    pub fn fetch_block_with_missing_unique_per_doc(
+        &mut self,
+        docs: &[u32],
+        accessor: &Column<T>,
+        missing: Option<T>,
+    ) where
+        T: Ord,
+    {
+        self.fetch_block_with_missing(docs, accessor, missing);
+        if accessor.index.get_cardinality().is_multivalue() {
+            self.dedup_docid_val_pairs();
+        }
+    }
+
+    /// Removes duplicate (doc_id, value) pairs from the caches.
+    ///
+    /// After `fetch_block`, entries are sorted by doc_id, but values within
+    /// the same doc may not be sorted (e.g. `(0,1), (0,2), (0,1)`).
+    /// We group consecutive entries by doc_id, sort values within each group
+    /// if it has more than 2 elements, then deduplicate adjacent pairs.
+    ///
+    /// Skips entirely if no doc_id appears more than once in the block.
+    ///
+    /// Side effect: values of a document with more than 2 entries end up sorted.
+    fn dedup_docid_val_pairs(&mut self)
+    where T: Ord {
+        if self.docid_cache.len() <= 1 {
+            return;
+        }
+
+        // Quick check: if no consecutive doc_ids are equal, no dedup needed.
+        let has_multivalue = self.docid_cache.windows(2).any(|w| w[0] == w[1]);
+        if !has_multivalue {
+            return;
+        }
+
+        // Sort values within each doc_id group so duplicates become adjacent.
+        let mut start = 0;
+        while start < self.docid_cache.len() {
+            let doc = self.docid_cache[start];
+            let mut end = start + 1;
+            while end < self.docid_cache.len() && self.docid_cache[end] == doc {
+                end += 1;
+            }
+            // Groups of one or two entries need no sorting: a duplicate in such
+            // a group is necessarily adjacent already.
+            if end - start > 2 {
+                // Equal values are collapsed right below, so stability does not
+                // matter and the allocation-free unstable sort is sufficient.
+                self.val_cache[start..end].sort_unstable();
+            }
+            start = end;
+        }
+
+        // Now duplicates are adjacent — deduplicate in place.
+        let mut write = 0;
+        for read in 1..self.docid_cache.len() {
+            if self.docid_cache[read] != self.docid_cache[write]
+                || self.val_cache[read] != self.val_cache[write]
+            {
+                write += 1;
+                if write != read {
+                    self.docid_cache[write] = self.docid_cache[read];
+                    self.val_cache[write] = self.val_cache[read];
+                }
+            }
+        }
+        let new_len = write + 1;
+        self.docid_cache.truncate(new_len);
+        self.val_cache.truncate(new_len);
+    }
+
     #[inline]
     pub fn iter_vals(&self) -> impl Iterator<Item = T> + '_ {
         self.val_cache.iter().cloned()
@@ -163,4 +245,56 @@ mod tests {
 
         assert_eq!(missing_docs, vec![1, 2, 3, 4, 5]);
     }
+
+    #[test]
+    fn test_dedup_docid_val_pairs_consecutive() {
+        let mut accessor = ColumnBlockAccessor::<u64>::default();
+        accessor.docid_cache = vec![0, 0, 2, 3];
+        accessor.val_cache = vec![10, 10, 10, 10];
+        accessor.dedup_docid_val_pairs();
+        assert_eq!(accessor.docid_cache, vec![0, 2, 3]);
+        assert_eq!(accessor.val_cache, vec![10, 10, 10]);
+    }
+
+    #[test]
+    fn test_dedup_docid_val_pairs_non_consecutive() {
+        // (0,1), (0,2), (0,1) — duplicate value not adjacent
+        let mut accessor = ColumnBlockAccessor::<u64>::default();
+        accessor.docid_cache = vec![0, 0, 0];
+        accessor.val_cache = vec![1, 2, 1];
+        accessor.dedup_docid_val_pairs();
+        assert_eq!(accessor.docid_cache, vec![0, 0]);
+        assert_eq!(accessor.val_cache, vec![1, 2]);
+    }
+
+    #[test]
+    fn test_dedup_docid_val_pairs_multi_doc() {
+        // doc 0: values [3, 1, 3], doc 1: values [5, 5]
+        let mut accessor = ColumnBlockAccessor::<u64>::default();
+        accessor.docid_cache = vec![0, 0, 0, 1, 1];
+        accessor.val_cache = vec![3, 1, 3, 5, 5];
+        accessor.dedup_docid_val_pairs();
+        assert_eq!(accessor.docid_cache, vec![0, 0, 1]);
+        assert_eq!(accessor.val_cache, vec![1, 3, 5]);
+    }
+
+    #[test]
+    fn test_dedup_docid_val_pairs_no_duplicates() {
+        let mut accessor = ColumnBlockAccessor::<u64>::default();
+        accessor.docid_cache = vec![0, 0, 1];
+        accessor.val_cache = vec![1, 2, 3];
+        accessor.dedup_docid_val_pairs();
+        assert_eq!(accessor.docid_cache, vec![0, 0, 1]);
+        assert_eq!(accessor.val_cache, vec![1, 2, 3]);
+    }
+
+    #[test]
+    fn test_dedup_docid_val_pairs_single_element() {
+        let mut accessor = ColumnBlockAccessor::<u64>::default();
+        accessor.docid_cache = vec![0];
+        accessor.val_cache = vec![1];
+        accessor.dedup_docid_val_pairs();
+        assert_eq!(accessor.docid_cache, vec![0]);
+        assert_eq!(accessor.val_cache, vec![1]);
+    }
 }
diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs
index ed2793bd1..b254b79ee 100644
--- a/src/aggregation/bucket/term_agg.rs
+++ b/src/aggregation/bucket/term_agg.rs
@@ -807,11 +807,13 @@ impl SegmentAggregationCollector
 
         let req_data = &mut self.terms_req_data;
 
-        agg_data.column_block_accessor.fetch_block_with_missing(
-            docs,
-            &req_data.accessor,
-            req_data.missing_value_for_accessor,
-        );
+        agg_data
+            .column_block_accessor
+            .fetch_block_with_missing_unique_per_doc(
+                docs,
+                &req_data.accessor,
+                req_data.missing_value_for_accessor,
+            );
 
         if let Some(sub_agg) = &mut self.sub_agg {
             let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize];
@@ -2347,7 +2349,7 @@ mod tests {
 
         // text field
         assert_eq!(res["my_texts"]["buckets"][0]["key"], "Hello Hello");
-        assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 5);
+        assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
         assert_eq!(res["my_texts"]["buckets"][1]["key"], "Empty");
         assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 2);
         assert_eq!(
@@ -2356,7 +2358,7 @@ mod tests {
         );
         // text field with number as missing fallback
         assert_eq!(res["my_texts2"]["buckets"][0]["key"], "Hello Hello");
-        assert_eq!(res["my_texts2"]["buckets"][0]["doc_count"], 5);
+        assert_eq!(res["my_texts2"]["buckets"][0]["doc_count"], 4);
         assert_eq!(res["my_texts2"]["buckets"][1]["key"], 1337.0);
         assert_eq!(res["my_texts2"]["buckets"][1]["doc_count"], 2);
         assert_eq!(
@@ -2370,7 +2372,7 @@ mod tests {
         assert_eq!(res["my_ids"]["buckets"][0]["key"], 1337.0);
         assert_eq!(res["my_ids"]["buckets"][0]["doc_count"], 4);
         assert_eq!(res["my_ids"]["buckets"][1]["key"], 1.0);
-        assert_eq!(res["my_ids"]["buckets"][1]["doc_count"], 3);
+        assert_eq!(res["my_ids"]["buckets"][1]["doc_count"], 2);
         assert_eq!(res["my_ids"]["buckets"][2]["key"], serde_json::Value::Null);
 
         Ok(())