diff --git a/columnar/src/block_accessor.rs b/columnar/src/block_accessor.rs index 227cf804d..19aa980cc 100644 --- a/columnar/src/block_accessor.rs +++ b/columnar/src/block_accessor.rs @@ -33,14 +33,14 @@ impl &mut self, docs: &[u32], accessor: &Column, - missing: Option, + missing_opt: Option, ) { self.fetch_block(docs, accessor); // no missing values if accessor.index.get_cardinality().is_full() { return; } - let Some(missing) = missing else { + let Some(missing) = missing_opt else { return; }; diff --git a/src/aggregation/metric/cardinality.rs b/src/aggregation/metric/cardinality.rs index eb6b4f8e9..0f6fec5a0 100644 --- a/src/aggregation/metric/cardinality.rs +++ b/src/aggregation/metric/cardinality.rs @@ -4,7 +4,6 @@ use std::io; use columnar::column_values::CompactSpaceU64Accessor; use columnar::{Column, ColumnType, Dictionary, StrColumn}; -use common::f64_to_u64; use datasketches::hll::{HllSketch, HllType, HllUnion}; use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -21,8 +20,6 @@ use crate::TantivyError; /// 2^11 = 2048 registers, giving ~2.3% relative error and ~1KB per sketch (Hll4). const LG_K: u8 = 11; -const MISSING_TERM_SENTINEL: u64 = u64::MAX; - /// # Cardinality /// /// The cardinality aggregation allows for computing an estimate @@ -136,7 +133,10 @@ enum CouponCache { coupon_map: Vec, missing_coupon_opt: Option, }, - Sparse(FxHashMap), + Sparse { + coupon_map: FxHashMap, + missing_coupon_opt: Option, + }, } impl CouponCache { @@ -154,7 +154,15 @@ impl CouponCache { }; } let highest_term_ord = term_ords.last().copied().unwrap_or(0u64); - if highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 10u64 { + // We prefer the dense implementation, if it is not too wasteful. + // There are two cases for which we can use it. + // 1- if the data is small. 
+ // 2- if the data is not necessarily small, but due to a high occupancy ratio, the RAM usage + // is not that much bigger than if we had used a HashSet. (occupancy ratio + extra + // metadata ~ x2.25) + let should_use_dense = + highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 3u64; + if should_use_dense { let mut coupon_map: Vec = vec![0; highest_term_ord as usize + 1]; for (term_ord, coupon) in term_ords.into_iter().zip(coupons.into_iter()) { coupon_map[term_ord as usize] = coupon; @@ -164,12 +172,12 @@ impl CouponCache { missing_coupon_opt, } } else { - let mut coupon_cache_sparse: FxHashMap = + let coupon_map: FxHashMap = term_ords.into_iter().zip(coupons.into_iter()).collect(); - if let Some(missing_coupon) = missing_coupon_opt { - coupon_cache_sparse.insert(MISSING_TERM_SENTINEL, missing_coupon); + CouponCache::Sparse { + coupon_map, + missing_coupon_opt, } - CouponCache::Sparse(coupon_cache_sparse) } } } @@ -239,15 +247,14 @@ fn build_coupon_cache( .flatten() .map(|bucket| bucket.entries.len()) .sum(); - let mut term_ords_set = - FxHashSet::with_capacity_and_hasher(term_ords_capacity, FxBuildHasher); + let mut term_ords_set = FxHashSet::with_capacity_and_hasher(term_ords_capacity, FxBuildHasher); for bucket in buckets.iter().flatten() { term_ords_set.extend(bucket.entries.iter().copied()); } let mut term_ords: Vec = term_ords_set.into_iter().collect(); term_ords.sort_unstable(); - term_ords.pop_if(|highest_term_ord| *highest_term_ord == MISSING_TERM_SENTINEL); + term_ords.pop_if(|highest_term_ord| *highest_term_ord >= dictionary.num_terms() as u64); let mut coupons: Vec = Vec::with_capacity(term_ords.capacity()); dictionary.sorted_ords_to_term_cb(&term_ords, |term_bytes| { @@ -257,11 +264,16 @@ fn build_coupon_cache( // Regardless of whether or not there is effectively a missing value in one of the buckets, // we populate the cache with the missing key too (if any). 
- let missing_coupon_opt: Option = missing_value_opt.and_then(|missing_key| { + let missing_coupon_opt: Option = missing_value_opt.map(|missing_key| { if let Key::Str(missing_value_str) = missing_key { - Some(murmurhash32::murmurhash2(missing_value_str.as_bytes())) + murmurhash32::murmurhash2(missing_value_str.as_bytes()) } else { - None + // See https://github.com/quickwit-oss/tantivy/issues/2891 + // A missing key with a type different from Str will not work as intended + // for the moment. + // + // Right now this is just a partial workaround. + 35679954u32 } }); Ok(CouponCache::new(term_ords, coupons, missing_coupon_opt)) @@ -287,9 +299,12 @@ fn append_to_sketch( } } } - CouponCache::Sparse(hash_map) => { + CouponCache::Sparse { + coupon_map, + missing_coupon_opt, + } => { for term_ord in term_ords { - if let Some(coupon) = hash_map.get(term_ord).copied() { + if let Some(coupon) = coupon_map.get(term_ord).copied().or(*missing_coupon_opt) { sketch.insert_coupon(coupon); } } @@ -327,7 +342,11 @@ impl SegmentCardinalityCollector { { return Ok(()); } - self.coupon_cache = Some(build_coupon_cache(&self.buckets, dictionary, missing_value_opt)?); + self.coupon_cache = Some(build_coupon_cache( + &self.buckets, + dictionary, + missing_value_opt, + )?); Ok(()) } @@ -522,7 +541,7 @@ mod tests { use crate::aggregation::agg_req::Aggregations; use crate::aggregation::tests::{exec_request, get_test_index_from_terms}; - use crate::schema::{IntoIpv6Addr, Schema, FAST}; + use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING}; use crate::Index; #[test] @@ -705,6 +724,30 @@ mod tests { assert_eq!(estimate, 3.0); } + /// Verifies that merging two small sketches (both in List/Set coupon mode) + /// produces an exact result — i.e. the HllUnion does not unnecessarily + /// promote to the full HLL array when the combined cardinality is small. 
+ #[test] + fn cardinality_collector_merge_stays_exact_for_small_sets() { + use super::CardinalityCollector; + + let mut left = CardinalityCollector::default(); + for i in 0u64..50 { + left.insert(i); + } + + let mut right = CardinalityCollector::default(); + for i in 30u64..100 { + right.insert(i); + } + + left.merge_fruits(right).unwrap(); + let estimate = left.finalize().unwrap(); + // 100 distinct values (0..100). Both sketches are in Set mode (< 192 coupons), + // so the union should stay in coupon mode and give an exact count. + assert_eq!(estimate, 100.0); + } + #[test] fn cardinality_collector_serialize_deserialize_binary() { use datasketches::hll::HllSketch; @@ -721,6 +764,98 @@ mod tests { assert!((deserialized.estimate() - 3.0).abs() < 0.01); } + /// Tests that the `missing` parameter correctly counts a single empty document + /// for both u64 and str columns. + #[test] + fn cardinality_aggregation_missing_value_single_empty_doc() { + let mut schema_builder = Schema::builder(); + let id_field = schema_builder.add_u64_field("id", FAST); + let name_field = schema_builder.add_text_field("name", STRING | FAST); + let index = Index::create_in_ram(schema_builder.build()); + let mut writer = index.writer_for_tests().unwrap(); + writer + .add_document(doc!(id_field=>1u64,name_field=>"some_name")) + .unwrap(); + writer.add_document(doc!()).unwrap(); + writer.commit().unwrap(); + + { + // int column with missing value non redundant + let agg_req: Aggregations = serde_json::from_value(json!({ + "cardinality": { + "cardinality": { + "field": "id", + "missing": 42u64 + }, + } + })) + .unwrap(); + let res = exec_request(agg_req, &index).unwrap(); + assert_eq!(res["cardinality"]["value"], 2.0); + } + + { + // int column with missing value redundant + let agg_req: Aggregations = serde_json::from_value(json!({ + "cardinality": { + "cardinality": { + "field": "id", + "missing": 1u64 + }, + } + })) + .unwrap(); + let res = exec_request(agg_req, &index).unwrap(); 
+ assert_eq!(res["cardinality"]["value"], 1.0); + } + + { + // str column with missing value non redundant + // With more than one segment, this is not well handled. + let agg_req: Aggregations = serde_json::from_value(json!({ + "cardinality": { + "cardinality": { + "field": "name", + "missing": "other_name" + }, + } + })) + .unwrap(); + let res = exec_request(agg_req, &index).unwrap(); + assert_eq!(res["cardinality"]["value"], 2.0); + } + + { + // str column with missing value redundant + let agg_req: Aggregations = serde_json::from_value(json!({ + "cardinality": { + "cardinality": { + "field": "name", + "missing": "some_name" + }, + } + })) + .unwrap(); + let res = exec_request(agg_req, &index).unwrap(); + assert_eq!(res["cardinality"]["value"], 1.0); + } + + { + // str column with missing value with a number type. + let agg_req: Aggregations = serde_json::from_value(json!({ + "cardinality": { + "cardinality": { + "field": "name", + "missing": 3, + }, + } + })) + .unwrap(); + let res = exec_request(agg_req, &index).unwrap(); + assert_eq!(res["cardinality"]["value"], 2.0); + } + } + #[test] fn cardinality_collector_salt_differentiates_types() { use super::CardinalityCollector;